1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
//! Local wait registry for `QUEUE READ … WAIT <duration>` (PRD #718, slice C).
//!
//! Keyed by `(scope, queue)`. Each slot holds a parking_lot::Condvar plus
//! a monotonic generation counter; waiters snapshot the generation, do a
//! second non-blocking probe, then park on the condvar until either a
//! notify bumps the generation, the timeout elapses, or `cancel_all`
//! sets the shutdown flag.
//!
//! Wake-all semantics (first cut): every notify wakes every waiter on
//! the slot. Normal delivery arbitration decides winners — losers
//! re-wait or time out. This is intentionally simple; targeted wake
//! lands in a later slice once arbitration is observable.
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use parking_lot::{Condvar, Mutex};
/// What happened to a waiter when it returned from `wait_until`.
///
/// `wait_until` checks for cancellation first, then for a generation
/// change, then for the deadline, so when several conditions hold at
/// once `Cancelled` wins over `Woken`, which wins over `Timeout`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WaitOutcome {
/// A producer notified this `(scope, queue)` slot after the snapshot
/// was taken — the caller should re-probe the queue for available
/// work. Because every notify wakes every waiter, being woken does
/// not guarantee that work is still there.
Woken,
/// The caller's wait budget elapsed without a notify.
Timeout,
/// The registry was cancelled via `cancel_all` (shutdown). Intended
/// to surface as an explicit cancellation error to the caller.
Cancelled,
}
/// One slot per `(scope, queue)`. The mutex guards the generation
/// counter; the condvar is the parking primitive. Slots are reused
/// across waiters — the registry never removes a slot once it has been
/// inserted, so a slot lives as long as the registry (the set is
/// bounded by the number of distinct `(scope, queue)` pairs seen).
struct Slot {
/// Generation counter. `notify` increments it (wrapping) while
/// holding this lock; waiters compare it against their snapshot.
state: Mutex<u64>,
/// Condvar paired with `state`. Waiters park on it in `wait_until`;
/// `notify` and `cancel_all` wake every parked waiter.
cond: Condvar,
}
impl Slot {
fn new() -> Self {
Self {
state: Mutex::new(0),
cond: Condvar::new(),
}
}
}
/// Registry of per-`(scope, queue)` wait slots plus one registry-wide
/// cancellation flag used for shutdown.
pub struct QueueWaitRegistry {
/// Slot table keyed by `(scope, queue)`. Entries are created lazily
/// the first time a pair is passed to `snapshot` or `notify` and are
/// never removed.
slots: Mutex<HashMap<(String, String), Arc<Slot>>>,
/// Shutdown flag. Cleared once at construction; set by `cancel_all`
/// and cleared again by `reset_cancelled`. Waiters re-check it after
/// every wake to honour cancellation independently of which slot
/// they were parked on.
cancelled: AtomicBool,
/// Registry-wide condvar, notified by `cancel_all` while holding
/// `cancel_mu`.
/// NOTE(review): nothing in this file waits on this condvar —
/// presumably reserved for a registry-wide waiter; confirm before
/// relying on it or removing it.
cancel_cond: Condvar,
/// Mutex paired with `cancel_cond`; it guards no data of its own.
cancel_mu: Mutex<()>,
}
impl Default for QueueWaitRegistry {
fn default() -> Self {
Self::new()
}
}
impl QueueWaitRegistry {
    /// Creates an empty registry: no slots, cancellation flag cleared.
    pub fn new() -> Self {
        Self {
            slots: Mutex::new(HashMap::new()),
            cancelled: AtomicBool::new(false),
            cancel_cond: Condvar::new(),
            cancel_mu: Mutex::new(()),
        }
    }

    /// Returns the current cancellation flag without resetting it.
    pub fn is_cancelled(&self) -> bool {
        self.cancelled.load(Ordering::Acquire)
    }

    /// Drop the shutdown flag back to false. Used by tests that share
    /// a process-wide registry instance across cases.
    pub fn reset_cancelled(&self) {
        self.cancelled.store(false, Ordering::Release);
    }

    /// Returns the slot for `(scope, queue)`, inserting a fresh one the
    /// first time the pair is seen.
    fn slot(&self, scope: &str, queue: &str) -> Arc<Slot> {
        // Build the owned key once, before taking the map lock, so the
        // allocations stay outside the critical section and a miss does
        // not have to allocate the key a second time for the insert.
        let key = (scope.to_owned(), queue.to_owned());
        let mut slots = self.slots.lock();
        slots
            .entry(key)
            .or_insert_with(|| Arc::new(Slot::new()))
            .clone()
    }

    /// Snapshot the current generation for `(scope, queue)`. Callers
    /// take this BEFORE their second non-blocking probe so a notify
    /// that fires between the probe and `wait_until` bumps the
    /// generation and the condvar wait returns immediately
    /// (lost-wake-free).
    pub fn snapshot(&self, scope: &str, queue: &str) -> Snapshot {
        let slot = self.slot(scope, queue);
        let gen = *slot.state.lock();
        Snapshot { slot, gen }
    }

    /// Park on the snapshot's slot until the generation moves past
    /// `snapshot.gen`, the deadline elapses, or `cancel_all` fires.
    ///
    /// Outcomes are checked in a fixed order — cancellation, then
    /// generation change, then deadline — both before parking and after
    /// every wake-up, so spurious wake-ups simply park again.
    pub fn wait_until(&self, snapshot: &Snapshot, deadline: Instant) -> WaitOutcome {
        // Fast path: during shutdown, do not even touch the slot lock.
        if self.is_cancelled() {
            return WaitOutcome::Cancelled;
        }
        let slot = &snapshot.slot;
        let mut generation = slot.state.lock();
        loop {
            // The flag is re-read while holding the slot lock; `cancel_all`
            // takes the same lock before notifying, so a waiter either sees
            // the flag here or is already parked when the wake-up arrives.
            if self.is_cancelled() {
                return WaitOutcome::Cancelled;
            }
            if *generation != snapshot.gen {
                return WaitOutcome::Woken;
            }
            if Instant::now() >= deadline {
                return WaitOutcome::Timeout;
            }
            // The timed-out flag is deliberately ignored: the checks at the
            // top of the loop re-derive the outcome from the actual state.
            let _ = slot.cond.wait_until(&mut generation, deadline);
        }
    }

    /// Bump the generation on `(scope, queue)` and wake every parked
    /// waiter. Safe to call when nobody is waiting: the call then only
    /// advances the generation (creating the slot if needed), and a
    /// waiter that snapshots afterwards starts from the new value.
    pub fn notify(&self, scope: &str, queue: &str) {
        let slot = self.slot(scope, queue);
        {
            let mut generation = slot.state.lock();
            *generation = generation.wrapping_add(1);
        }
        // Notify after releasing the lock so woken waiters can re-acquire
        // it without immediately blocking on the notifier.
        slot.cond.notify_all();
    }

    /// Shutdown drain: set the cancellation flag and wake every slot's
    /// condvar so parked waiters return `Cancelled` immediately.
    pub fn cancel_all(&self) {
        // Publish the flag first so every waiter woken below observes it.
        self.cancelled.store(true, Ordering::Release);
        {
            let slots = self.slots.lock();
            for slot in slots.values() {
                // Holding the slot lock while notifying closes the window
                // between a waiter's flag check and its park.
                let _state = slot.state.lock();
                slot.cond.notify_all();
            }
        }
        let _cancel = self.cancel_mu.lock();
        self.cancel_cond.notify_all();
    }
}
/// Opaque token captured before the second non-blocking probe. Holding
/// onto the slot keeps it alive even if the registry is dropped
/// between operations (which doesn't happen in production but keeps
/// tests safe).
pub struct Snapshot {
/// Shared handle to the slot the snapshot was taken from; `wait_until`
/// parks on this slot's condvar.
slot: Arc<Slot>,
/// Generation observed when the snapshot was taken; `wait_until`
/// reports `Woken` as soon as the slot's generation differs from it.
/// NOTE(review): `gen` is a reserved keyword from the Rust 2024 edition
/// onward — this field would need `r#gen` or a rename on an edition bump.
gen: u64,
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    /// A notify issued from another thread wakes a waiter parked on the
    /// same `(scope, queue)`.
    #[test]
    fn notify_wakes_parked_waiter() {
        let reg = Arc::new(QueueWaitRegistry::new());
        let snap = reg.snapshot("default", "q");
        let notifier = {
            let reg = Arc::clone(&reg);
            thread::spawn(move || {
                thread::sleep(Duration::from_millis(50));
                reg.notify("default", "q");
            })
        };
        let outcome = reg.wait_until(&snap, Instant::now() + Duration::from_secs(2));
        notifier.join().unwrap();
        assert_eq!(outcome, WaitOutcome::Woken);
    }

    /// Without any notify the wait ends with `Timeout`, and not before
    /// (roughly) the requested budget has elapsed.
    #[test]
    fn timeout_returns_when_no_notify() {
        let reg = QueueWaitRegistry::new();
        let snap = reg.snapshot("default", "q");
        let start = Instant::now();
        let outcome = reg.wait_until(&snap, start + Duration::from_millis(120));
        assert_eq!(outcome, WaitOutcome::Timeout);
        let elapsed = start.elapsed();
        assert!(elapsed >= Duration::from_millis(100), "elapsed={elapsed:?}");
    }

    /// `cancel_all` from another thread makes a parked waiter return
    /// `Cancelled` well before its deadline.
    #[test]
    fn cancel_returns_cancelled_to_parked_waiters() {
        let reg = Arc::new(QueueWaitRegistry::new());
        let snap = reg.snapshot("default", "q");
        let canceller = {
            let reg = Arc::clone(&reg);
            thread::spawn(move || {
                thread::sleep(Duration::from_millis(50));
                reg.cancel_all();
            })
        };
        let outcome = reg.wait_until(&snap, Instant::now() + Duration::from_secs(5));
        canceller.join().unwrap();
        assert_eq!(outcome, WaitOutcome::Cancelled);
    }

    #[test]
    fn notify_before_wait_is_observed_through_generation() {
        let reg = QueueWaitRegistry::new();
        let snap = reg.snapshot("default", "q");
        // notify fires BEFORE wait_until — the generation bump must
        // still make wait_until return Woken without parking.
        reg.notify("default", "q");
        let outcome = reg.wait_until(&snap, Instant::now() + Duration::from_secs(5));
        assert_eq!(outcome, WaitOutcome::Woken);
    }

    /// A notify on a different queue must not disturb this queue's waiters.
    #[test]
    fn notify_on_unrelated_queue_does_not_wake() {
        let reg = QueueWaitRegistry::new();
        let snap = reg.snapshot("default", "q1");
        reg.notify("default", "q2");
        let outcome = reg.wait_until(&snap, Instant::now() + Duration::from_millis(60));
        assert_eq!(outcome, WaitOutcome::Timeout);
    }

    /// A single notify releases every waiter on the slot.
    #[test]
    fn wake_all_releases_every_parked_waiter() {
        let reg = Arc::new(QueueWaitRegistry::new());
        let handles: Vec<_> = (0..5)
            .map(|_| {
                let reg = Arc::clone(&reg);
                // Snapshot on the main thread, before spawning: a worker
                // that is scheduled late would otherwise snapshot AFTER the
                // notify below and time out, making the test flaky.
                let snap = reg.snapshot("default", "q");
                thread::spawn(move || {
                    reg.wait_until(&snap, Instant::now() + Duration::from_secs(2))
                })
            })
            .collect();
        // Give the workers time to actually park so the notify exercises
        // the condvar wake-up path; correctness no longer depends on it.
        thread::sleep(Duration::from_millis(80));
        reg.notify("default", "q");
        for handle in handles {
            assert_eq!(handle.join().unwrap(), WaitOutcome::Woken);
        }
    }
}