1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
use crate::compat::HashMap;
use crate::compat::{Mutex, RwLock};
use alloc::sync::Arc;
use core::sync::atomic::{AtomicU64, Ordering};
/// Slot execution priority. Higher-priority slots fire before lower-priority ones.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum Priority {
/// High priority — fired first.
High,
/// Normal priority — default.
#[default]
Normal,
/// Low priority — fired last.
Low,
}
impl Priority {
fn rank(&self) -> u8 {
match self {
Priority::High => 0,
Priority::Normal => 1,
Priority::Low => 2,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
/// Opaque connection handle used to disconnect a slot.
pub struct ConnectionHandle(pub u64);
static NEXT_HANDLE: AtomicU64 = AtomicU64::new(1);
type SlotFn<T> = Box<dyn FnMut(Arc<T>) + Send + Sync + 'static>;
struct SlotEntry<T: Clone + Send + 'static> {
callback: Option<SlotFn<T>>,
once: bool,
blocked: bool,
priority: Priority,
}
struct SignalInner<T: Clone + Send + 'static> {
slots: RwLock<HashMap<ConnectionHandle, SlotEntry<T>>>,
}
impl<T: Clone + Send + 'static> SignalInner<T> {
fn disconnect(&self, handle: ConnectionHandle) -> bool {
self.slots.write().expect("signal lock poisoned").remove(&handle).is_some()
}
fn block(&self, handle: ConnectionHandle) -> bool {
if let Some(entry) = self.slots.write().expect("signal lock poisoned").get_mut(&handle) {
entry.blocked = true;
true
} else {
false
}
}
fn unblock(&self, handle: ConnectionHandle) -> bool {
if let Some(entry) = self.slots.write().expect("signal lock poisoned").get_mut(&handle) {
entry.blocked = false;
true
} else {
false
}
}
fn is_blocked(&self, handle: ConnectionHandle) -> Option<bool> {
self.slots.read().expect("signal lock poisoned").get(&handle).map(|entry| entry.blocked)
}
fn set_priority(&self, handle: ConnectionHandle, priority: Priority) -> bool {
if let Some(entry) = self.slots.write().expect("signal lock poisoned").get_mut(&handle) {
entry.priority = priority;
true
} else {
false
}
}
}
/// Owner scope that automatically disconnects tracked signal connections on drop.
#[derive(Default)]
pub struct ConnectionScope {
disconnectors: Mutex<Vec<Box<dyn FnOnce() + Send + 'static>>>,
}
impl ConnectionScope {
/// Create an empty connection scope.
pub fn new() -> Self {
Self::default()
}
/// Manually clear all tracked connections without dropping the scope.
pub fn clear(&self) {
let mut disconnectors = self.disconnectors.lock().unwrap_or_else(|e| e.into_inner());
while let Some(disconnector) = disconnectors.pop() {
disconnector();
}
}
/// Returns the number of connections currently tracked by this scope.
pub fn disconnect_count(&self) -> usize {
self.disconnectors.lock().unwrap_or_else(|e| e.into_inner()).len()
}
fn track(&self, disconnector: Box<dyn FnOnce() + Send + 'static>) {
self.disconnectors.lock().unwrap_or_else(|e| e.into_inner()).push(disconnector);
}
}
impl Drop for ConnectionScope {
fn drop(&mut self) {
let mut disconnectors = self.disconnectors.lock().unwrap_or_else(|e| e.into_inner());
while let Some(disconnector) = disconnectors.pop() {
disconnector();
}
}
}
/// Generic signal type with typed payload, `once` slots, and scoped auto-disconnect.
#[derive(Clone)]
pub struct Signal<T: Clone + Send + 'static> {
inner: Arc<SignalInner<T>>,
}
impl<T: Clone + Send + 'static> Signal<T> {
/// Create an empty signal.
pub fn new() -> Self {
Self { inner: Arc::new(SignalInner { slots: RwLock::new(HashMap::new()) }) }
}
/// Connect a slot and return its connection handle.
pub fn connect<F>(&self, slot: F) -> ConnectionHandle
where
F: FnMut(Arc<T>) + Send + Sync + 'static,
{
self.connect_with_priority(slot, Priority::Normal)
}
/// Connect a slot with a specific priority.
/// High-priority slots fire before Normal, Normal before Low.
pub fn connect_with_priority<F>(&self, slot: F, priority: Priority) -> ConnectionHandle
where
F: FnMut(Arc<T>) + Send + Sync + 'static,
{
let handle = ConnectionHandle(NEXT_HANDLE.fetch_add(1, Ordering::Relaxed));
self.inner.slots.write().expect("signal lock poisoned").insert(
handle,
SlotEntry { callback: Some(Box::new(slot)), once: false, blocked: false, priority },
);
handle
}
/// Connect a slot that is invoked once and then disconnected automatically.
pub fn connect_once<F>(&self, slot: F) -> ConnectionHandle
where
F: FnMut(Arc<T>) + Send + Sync + 'static,
{
let handle = ConnectionHandle(NEXT_HANDLE.fetch_add(1, Ordering::Relaxed));
self.inner.slots.write().expect("signal lock poisoned").insert(
handle,
SlotEntry {
callback: Some(Box::new(slot)),
once: true,
blocked: false,
priority: Priority::Normal,
},
);
handle
}
/// Connect a slot bound to a connection scope. It disconnects when the scope is dropped.
pub fn connect_scoped<F>(&self, owner: &ConnectionScope, slot: F) -> ConnectionHandle
where
F: FnMut(Arc<T>) + Send + Sync + 'static,
{
let handle = self.connect(slot);
self.track_owner(owner, handle);
handle
}
/// Connect a once-slot bound to a connection scope.
pub fn connect_once_scoped<F>(&self, owner: &ConnectionScope, slot: F) -> ConnectionHandle
where
F: FnMut(Arc<T>) + Send + Sync + 'static,
{
let handle = self.connect_once(slot);
self.track_owner(owner, handle);
handle
}
/// Disconnect slot by handle. Returns true if the handle was valid.
pub fn disconnect(&self, handle: ConnectionHandle) -> bool {
self.inner.disconnect(handle)
}
/// Disconnect all slots registered on this signal.
pub fn disconnect_all(&self) {
self.inner.slots.write().expect("signal lock poisoned").clear();
}
/// Temporarily block a slot without disconnecting it. Returns true if the handle was valid.
pub fn block(&self, handle: ConnectionHandle) -> bool {
self.inner.block(handle)
}
/// Unblock a previously blocked slot. Returns true if the handle was valid.
pub fn unblock(&self, handle: ConnectionHandle) -> bool {
self.inner.unblock(handle)
}
/// Returns `Some(true/false)` if the handle exists, `None` if invalid.
pub fn is_blocked(&self, handle: ConnectionHandle) -> Option<bool> {
self.inner.is_blocked(handle)
}
/// Returns `true` if the handle is still connected (valid).
pub fn is_connected(&self, handle: ConnectionHandle) -> bool {
self.inner.slots.read().expect("signal lock poisoned").contains_key(&handle)
}
/// Change the priority of an existing connection. Returns true if the handle was valid.
pub fn set_priority(&self, handle: ConnectionHandle, priority: Priority) -> bool {
self.inner.set_priority(handle, priority)
}
/// Emit a cloned value to all connected (non-blocked) slots.
///
/// This method safely processes slots **one at a time** by temporarily
/// taking each slot's callback (via `Option::take`) under a write lock,
/// leaving the handle **in** the HashMap so that `disconnect(own_handle)`
/// can find and remove it. The callback is invoked **outside** the lock,
/// and if the handle still exists afterward (i.e., was not self-disconnected),
/// the callback is restored. Once-slots are removed unconditionally after
/// invocation. Callbacks may safely call `connect`, `disconnect`,
/// `disconnect_all`, `block`, `unblock`, or `emit` on **the same Signal**
/// without deadlocking. Self-disconnect from within a callback is now
/// correctly honored and does not get undone by a stale re-insertion.
///
/// Slots are invoked in priority order (High → Normal → Low).
/// Blocked slots are skipped entirely.
pub fn emit(&self, value: T) {
let arc_value = Arc::new(value);
// 1. Snapshot handles and priorities under a read lock.
let snapshot: Vec<(ConnectionHandle, Priority)> = {
let slots = self.inner.slots.read().expect("signal lock poisoned");
slots.iter().map(|(h, e)| (*h, e.priority)).collect()
};
// 2. Sort by priority (High first).
let mut snapshot = snapshot;
snapshot.sort_by_key(|a| a.1.rank());
// 3. Process each slot individually against the real HashMap.
// The callback is temporarily taken (via Option::take) under a write
// lock, leaving the handle in the map so that if the callback calls
// `disconnect(own_handle)`, the disconnect can find and remove the
// handle. After invocation, if the handle still exists in the map
// (i.e., was not self-disconnected), the callback is restored.
// Once-slots are removed unconditionally after invocation.
for (handle, _priority) in snapshot {
// Temporarily take the callback under a write lock, leaving None.
// The handle stays in the HashMap so disconnect() can find it.
let taken = {
let mut slots = self.inner.slots.write().expect("signal lock poisoned");
if let Some(entry) = slots.get_mut(&handle) {
if entry.blocked {
None
} else {
entry.callback.take()
}
} else {
// Handle was disconnected by a prior callback in this emit loop.
None
}
};
if let Some(mut callback) = taken {
callback(arc_value.clone());
// After callback: if it was a once-slot, remove the entry.
// Otherwise, re-install the callback only if the handle still
// exists (i.e., the callback did not call disconnect on itself).
let mut slots = self.inner.slots.write().expect("signal lock poisoned");
if let Some(entry) = slots.get_mut(&handle) {
if entry.once {
// Once-slot: remove the entry entirely.
slots.remove(&handle);
} else {
// Non-once, still connected: restore the callback.
entry.callback = Some(callback);
}
}
// If handle was removed by self-disconnect, callback is dropped.
}
}
}
/// Return number of currently connected slots.
pub fn slot_count(&self) -> usize {
self.inner.slots.read().expect("signal lock poisoned").len()
}
fn track_owner(&self, owner: &ConnectionScope, handle: ConnectionHandle) {
let weak = Arc::downgrade(&self.inner);
owner.track(Box::new(move || {
if let Some(inner) = weak.upgrade() {
let _ = inner.disconnect(handle);
}
}));
}
}
impl<T: Clone + Send + 'static> Default for Signal<T> {
fn default() -> Self {
Self::new()
}
}