1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
//! Shared capacity-wake primitive for bridge poll loops.
//!
//! Translated from openclaudecode/src/bridge/capacityWake.ts
//!
//! Both replBridge.ts and bridgeMain.ts need to sleep while "at capacity"
//! but wake early when either (a) the outer loop signal aborts (shutdown),
//! or (b) capacity frees up (session done / transport lost). This module
//! encapsulates the mutable wake-controller + two-signal merger that both
//! poll loops previously duplicated byte-for-byte.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
/// Capacity signal with cleanup
pub struct CapacitySignal {
pub signal: Arc<AtomicBool>,
pub cleanup: Box<dyn Fn() + Send + Sync>,
}
/// Capacity wake controller
pub struct CapacityWake {
/// Create a signal that aborts when either the outer loop signal or the
/// capacity-wake controller fires. Returns the merged signal and a cleanup
/// function that removes listeners when the sleep resolves normally
/// (without abort).
signal: Arc<AtomicBool>,
/// Abort the current at-capacity sleep and arm a fresh controller so the
/// poll loop immediately re-checks for new work.
wake: Arc<AtomicBool>,
/// Outer signal (from the loop that owns this capacity wake)
outer_signal: Arc<AtomicBool>,
/// Whether the current signal has been armed
armed: Arc<AtomicBool>,
}
impl CapacityWake {
/// Create a new capacity wake with the given outer signal.
pub fn new(outer_signal: Arc<AtomicBool>) -> Self {
Self {
signal: Arc::new(AtomicBool::new(false)),
wake: Arc::new(AtomicBool::new(false)),
outer_signal,
armed: Arc::new(AtomicBool::new(false)),
}
}
/// Get the capacity signal. When triggered (from either outer abort
/// or wake() call), the poll loop should re-check for work.
pub fn get_signal(&self) -> CapacitySignal {
// Check if already triggered
let triggered =
self.outer_signal.load(Ordering::SeqCst) || self.wake.load(Ordering::SeqCst);
// Reset wake flag after reading (so subsequent calls return untriggered)
if self.wake.load(Ordering::SeqCst) {
self.wake.store(false, Ordering::SeqCst);
}
// Arm the signal
self.armed.store(true, Ordering::SeqCst);
// Reset signal for next wait cycle
self.signal.store(false, Ordering::SeqCst);
CapacitySignal {
signal: if triggered {
Arc::new(AtomicBool::new(true))
} else {
Arc::clone(&self.signal)
},
cleanup: Box::new(move || {
// Cleanup is handled by drop
}),
}
}
/// Wake up the capacity wait. This causes get_signal() to return
/// a triggered signal, and re-arms for the next wait.
pub fn wake(&self) {
// Set wake trigger - get_signal() will check this and return triggered
self.wake.store(true, Ordering::SeqCst);
}
/// Check if the outer signal has been triggered
pub fn is_outer_aborted(&self) -> bool {
self.outer_signal.load(Ordering::SeqCst)
}
}
/// Create a capacity wake primitive for bridge poll loops.
pub fn create_capacity_wake(outer_signal: Arc<AtomicBool>) -> CapacityWake {
CapacityWake::new(outer_signal)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_capacity_wake_basic() {
let outer = Arc::new(AtomicBool::new(false));
let wake = create_capacity_wake(Arc::clone(&outer));
// Initially not triggered
let signal = wake.get_signal();
assert!(!signal.signal.load(Ordering::SeqCst));
// Wake should trigger
wake.wake();
let signal2 = wake.get_signal();
// After wake, next signal should be triggered
assert!(signal2.signal.load(Ordering::SeqCst));
}
#[test]
fn test_capacity_wake_outer_abort() {
let outer = Arc::new(AtomicBool::new(false));
let wake = create_capacity_wake(Arc::clone(&outer));
// Trigger outer
outer.store(true, Ordering::SeqCst);
// Should return triggered signal
let signal = wake.get_signal();
assert!(signal.signal.load(Ordering::SeqCst));
}
}