1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
//! Django-style synchronous signal dispatcher
//!
//! This module provides a synchronous signal system compatible with Django's dispatch pattern.
use parking_lot::RwLock;
use std::any::Any;
use std::collections::HashMap;
use std::sync::{Arc, Weak};
/// Unsized callable signature shared by every synchronous receiver.
///
/// A receiver is called with the optional sender (type-erased behind
/// `Arc<dyn Any + Send + Sync>`) and the keyword arguments of the signal, and
/// returns a `String` result. This alias is the single source of truth for the
/// signature; it is used directly where an unsized target is needed (for
/// example inside `Weak<..>`).
type ReceiverFn =
    dyn Fn(Option<Arc<dyn Any + Send + Sync>>, &HashMap<String, String>) -> String + Send + Sync;

/// Receiver function type for synchronous signals.
///
/// A shared, reference-counted handle to a [`ReceiverFn`] trait object. Defined
/// in terms of `ReceiverFn` so the two aliases cannot drift apart.
pub type SyncReceiverFn = Arc<ReceiverFn>;
/// Synchronous signal that mimics Django's Signal class
///
/// The receiver list lives behind an `Arc<RwLock<..>>`, and `Clone` is derived,
/// so every clone of a `SyncSignal` refers to the same receiver list.
#[derive(Clone)]
pub struct SyncSignal {
/// Registered receivers, in connection order; shared by all clones of this signal.
receivers: Arc<RwLock<Vec<SignalReceiver>>>,
/// Caching flag reserved for future optimization
///
/// The flag is stored (`false` from `new()`, `true` from `new_with_caching()`)
/// but nothing in this module reads it: the caching mechanism is designed
/// but not used by the current `send()` implementation.
///
/// Planned usage:
/// - Cache resolved receiver functions to avoid repeated lookups
/// - Optimize signal dispatch performance for frequently-fired signals
/// - Implement cache invalidation on receiver registration/deregistration
///
/// Implementation requires:
/// - Cache storage design (per-signal or global)
/// - Cache invalidation strategy
/// - Thread-safety considerations for cached function pointers
// Allow dead_code: caching flag reserved for planned dispatch optimization; not yet used in current send() implementation
#[allow(dead_code)]
use_caching: bool,
}
/// One registration entry in a signal's receiver list.
struct SignalReceiver {
/// Weak handle to the callable. The entry is treated as dead (and is pruned)
/// once no strong reference to the callable remains.
receiver: Weak<ReceiverFn>,
/// When `Some`, the receiver is only invoked for senders whose concrete
/// `TypeId` equals this value; `None` accepts any sender (including no sender).
sender_type_id: Option<std::any::TypeId>,
/// Optional caller-chosen identifier. Connecting with an already-used uid
/// replaces the earlier entry, and `disconnect` removes entries by this value.
dispatch_uid: Option<String>,
// Keep a strong reference to prevent premature deallocation (when caller transfers ownership).
// `None` when the caller still held its own strong reference at connect time.
_strong_ref: Option<SyncReceiverFn>,
}
impl SyncSignal {
/// Create a new synchronous signal
pub fn new() -> Self {
Self {
receivers: Arc::new(RwLock::new(Vec::new())),
use_caching: false,
}
}
/// Create a new synchronous signal with caching
pub fn new_with_caching() -> Self {
Self {
receivers: Arc::new(RwLock::new(Vec::new())),
use_caching: true,
}
}
/// Connect a receiver to this signal
pub fn connect<F>(
&self,
receiver: Arc<F>,
sender_type_id: Option<std::any::TypeId>,
dispatch_uid: Option<String>,
) -> Result<(), String>
where
F: Fn(Option<Arc<dyn Any + Send + Sync>>, &HashMap<String, String>) -> String
+ Send
+ Sync
+ 'static,
{
// Check if caller has other references before converting
let should_store_strong = Arc::strong_count(&receiver) == 1;
// Store the Arc as a trait object
let receiver_arc: SyncReceiverFn = receiver;
let weak_receiver = Arc::downgrade(&receiver_arc);
let mut receivers = self.receivers.write();
// Remove existing receiver with same dispatch_uid
if let Some(ref uid) = dispatch_uid {
receivers.retain(|r| r.dispatch_uid.as_ref() != Some(uid));
}
// Prevent duplicate registrations
let receiver_ptr = weak_receiver.as_ptr();
receivers.retain(|r| !std::ptr::addr_eq(r.receiver.as_ptr(), receiver_ptr));
receivers.push(SignalReceiver {
receiver: weak_receiver,
sender_type_id,
dispatch_uid,
// Only store strong ref if caller has no other references (ownership transfer)
_strong_ref: if should_store_strong {
Some(receiver_arc)
} else {
None
},
});
Ok(())
}
/// Disconnect a receiver by dispatch_uid
/// If dispatch_uid is None, disconnects all receivers
pub fn disconnect(&self, dispatch_uid: Option<&str>) -> bool {
let mut receivers = self.receivers.write();
let original_len = receivers.len();
if let Some(uid) = dispatch_uid {
receivers.retain(|r| r.dispatch_uid.as_deref() != Some(uid));
} else {
// If no dispatch_uid provided, clear all receivers
receivers.clear();
}
receivers.len() < original_len
}
/// Send signal to all connected receivers
///
/// Receivers are collected under the lock, then the lock is released before
/// invoking callbacks to prevent deadlock if a callback tries to modify
/// the signal (connect/disconnect/send).
pub fn send(
&self,
sender: Option<Arc<dyn Any + Send + Sync>>,
kwargs: &HashMap<String, String>,
) -> Vec<(String, String)> {
self.clear_dead_receivers();
// Collect live receivers under the lock, then release it before invocation
let live_receivers: Vec<(Option<std::any::TypeId>, SyncReceiverFn)> = {
let receivers = self.receivers.read();
receivers
.iter()
.filter_map(|r| r.receiver.upgrade().map(|recv| (r.sender_type_id, recv)))
.collect()
};
// Lock is now released; safe to invoke callbacks without deadlock risk
let mut results = Vec::new();
for (sender_type_id, receiver) in &live_receivers {
// Check sender type match
if let Some(expected_type_id) = sender_type_id {
if let Some(ref actual_sender) = sender {
// Must explicitly dereference Arc to get the underlying TypeId
if (**actual_sender).type_id() != *expected_type_id {
continue; // Type mismatch
}
} else {
continue; // Receiver expects a specific sender, but None was provided
}
}
let result = receiver(sender.clone(), kwargs);
results.push(("receiver".to_string(), result));
}
results
}
/// Send signal robustly (catching panics)
///
/// Receivers are collected under the lock, then the lock is released before
/// invoking callbacks to prevent deadlock if a callback tries to modify
/// the signal (connect/disconnect/send).
pub fn send_robust(
&self,
sender: Option<Arc<dyn Any + Send + Sync>>,
kwargs: &HashMap<String, String>,
) -> Vec<(String, Result<String, String>)> {
self.clear_dead_receivers();
// Collect live receivers under the lock, then release it before invocation
let live_receivers: Vec<(Option<std::any::TypeId>, SyncReceiverFn)> = {
let receivers = self.receivers.read();
receivers
.iter()
.filter_map(|r| r.receiver.upgrade().map(|recv| (r.sender_type_id, recv)))
.collect()
};
// Lock is now released; safe to invoke callbacks without deadlock risk
let mut results = Vec::new();
for (sender_type_id, receiver) in &live_receivers {
// Check sender type match
if let Some(expected_type_id) = sender_type_id {
if let Some(ref actual_sender) = sender {
// Must explicitly dereference Arc to get the underlying TypeId
if (**actual_sender).type_id() != *expected_type_id {
continue; // Type mismatch
}
} else {
continue; // Receiver expects a specific sender, but None was provided
}
}
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
receiver(sender.clone(), kwargs)
}));
match result {
Ok(val) => results.push(("receiver".to_string(), Ok(val))),
Err(_) => results.push(("receiver".to_string(), Err("panic".to_string()))),
}
}
results
}
/// Check if signal has any listeners
pub fn has_listeners(&self) -> bool {
self.clear_dead_receivers();
let receivers = self.receivers.read();
!receivers.is_empty()
}
/// Get receiver count
pub fn receivers_count(&self) -> usize {
self.receivers.read().len()
}
/// Clear dead (garbage collected) receivers
pub fn clear_dead_receivers(&self) {
let mut receivers = self.receivers.write();
receivers.retain(|r| r.receiver.strong_count() > 0);
}
}
impl Default for SyncSignal {
fn default() -> Self {
Self::new()
}
}