ai_agent/bridge/capacity_wake.rs
1//! Shared capacity-wake primitive for bridge poll loops.
2//!
3//! Translated from openclaudecode/src/bridge/capacityWake.ts
4//!
5//! Both replBridge.ts and bridgeMain.ts need to sleep while "at capacity"
6//! but wake early when either (a) the outer loop signal aborts (shutdown),
7//! or (b) capacity frees up (session done / transport lost). This module
8//! encapsulates the mutable wake-controller + two-signal merger that both
9//! poll loops previously duplicated byte-for-byte.
10
11use std::sync::Arc;
12use std::sync::atomic::{AtomicBool, Ordering};
13
14/// Capacity signal with cleanup
15pub struct CapacitySignal {
16 pub signal: Arc<AtomicBool>,
17 pub cleanup: Box<dyn Fn() + Send + Sync>,
18}
19
20/// Capacity wake controller
21pub struct CapacityWake {
22 /// Create a signal that aborts when either the outer loop signal or the
23 /// capacity-wake controller fires. Returns the merged signal and a cleanup
24 /// function that removes listeners when the sleep resolves normally
25 /// (without abort).
26 signal: Arc<AtomicBool>,
27 /// Abort the current at-capacity sleep and arm a fresh controller so the
28 /// poll loop immediately re-checks for new work.
29 wake: Arc<AtomicBool>,
30 /// Outer signal (from the loop that owns this capacity wake)
31 outer_signal: Arc<AtomicBool>,
32 /// Whether the current signal has been armed
33 armed: Arc<AtomicBool>,
34}
35
36impl CapacityWake {
37 /// Create a new capacity wake with the given outer signal.
38 pub fn new(outer_signal: Arc<AtomicBool>) -> Self {
39 Self {
40 signal: Arc::new(AtomicBool::new(false)),
41 wake: Arc::new(AtomicBool::new(false)),
42 outer_signal,
43 armed: Arc::new(AtomicBool::new(false)),
44 }
45 }
46
47 /// Get the capacity signal. When triggered (from either outer abort
48 /// or wake() call), the poll loop should re-check for work.
49 pub fn get_signal(&self) -> CapacitySignal {
50 // Check if already triggered
51 let triggered =
52 self.outer_signal.load(Ordering::SeqCst) || self.wake.load(Ordering::SeqCst);
53
54 // Reset wake flag after reading (so subsequent calls return untriggered)
55 if self.wake.load(Ordering::SeqCst) {
56 self.wake.store(false, Ordering::SeqCst);
57 }
58
59 // Arm the signal
60 self.armed.store(true, Ordering::SeqCst);
61 // Reset signal for next wait cycle
62 self.signal.store(false, Ordering::SeqCst);
63
64 CapacitySignal {
65 signal: if triggered {
66 Arc::new(AtomicBool::new(true))
67 } else {
68 Arc::clone(&self.signal)
69 },
70 cleanup: Box::new(move || {
71 // Cleanup is handled by drop
72 }),
73 }
74 }
75
76 /// Wake up the capacity wait. This causes get_signal() to return
77 /// a triggered signal, and re-arms for the next wait.
78 pub fn wake(&self) {
79 // Set wake trigger - get_signal() will check this and return triggered
80 self.wake.store(true, Ordering::SeqCst);
81 }
82
83 /// Check if the outer signal has been triggered
84 pub fn is_outer_aborted(&self) -> bool {
85 self.outer_signal.load(Ordering::SeqCst)
86 }
87}
88
89/// Create a capacity wake primitive for bridge poll loops.
90pub fn create_capacity_wake(outer_signal: Arc<AtomicBool>) -> CapacityWake {
91 CapacityWake::new(outer_signal)
92}
93
94#[cfg(test)]
95mod tests {
96 use super::*;
97
98 #[test]
99 fn test_capacity_wake_basic() {
100 let outer = Arc::new(AtomicBool::new(false));
101 let wake = create_capacity_wake(Arc::clone(&outer));
102
103 // Initially not triggered
104 let signal = wake.get_signal();
105 assert!(!signal.signal.load(Ordering::SeqCst));
106
107 // Wake should trigger
108 wake.wake();
109 let signal2 = wake.get_signal();
110 // After wake, next signal should be triggered
111 assert!(signal2.signal.load(Ordering::SeqCst));
112 }
113
114 #[test]
115 fn test_capacity_wake_outer_abort() {
116 let outer = Arc::new(AtomicBool::new(false));
117 let wake = create_capacity_wake(Arc::clone(&outer));
118
119 // Trigger outer
120 outer.store(true, Ordering::SeqCst);
121
122 // Should return triggered signal
123 let signal = wake.get_signal();
124 assert!(signal.signal.load(Ordering::SeqCst));
125 }
126}