Skip to main content

ai_agent/bridge/
capacity_wake.rs

1//! Shared capacity-wake primitive for bridge poll loops.
2//!
3//! Translated from openclaudecode/src/bridge/capacityWake.ts
4//!
5//! Both replBridge.ts and bridgeMain.ts need to sleep while "at capacity"
6//! but wake early when either (a) the outer loop signal aborts (shutdown),
7//! or (b) capacity frees up (session done / transport lost). This module
8//! encapsulates the mutable wake-controller + two-signal merger that both
9//! poll loops previously duplicated byte-for-byte.
10
11use std::sync::Arc;
12use std::sync::atomic::{AtomicBool, Ordering};
13
/// Wake flag handed to a poll loop for a single at-capacity wait, together
/// with a cleanup callback.
///
/// Values of this type are built by `CapacityWake::get_signal`.
pub struct CapacitySignal {
    /// Flag the waiting loop polls; `true` means "stop waiting and re-check
    /// for work".
    pub signal: Arc<AtomicBool>,
    /// Callback for the caller to run once the wait is over. The callbacks
    /// built in this file are no-ops.
    pub cleanup: Box<dyn Fn() + Send + Sync>,
}

// `Debug` cannot be derived because the boxed closure is not `Debug`; print
// the flag and mark the callback as elided instead.
impl std::fmt::Debug for CapacitySignal {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("CapacitySignal")
            .field("signal", &self.signal)
            .finish_non_exhaustive()
    }
}
19
/// Capacity wake controller: the state shared between a poll loop that waits
/// while at capacity and the code that calls `wake()` on it.
#[derive(Debug)]
pub struct CapacityWake {
    /// Flag shared with the `CapacitySignal` that `get_signal()` hands out
    /// when nothing has triggered yet.
    signal: Arc<AtomicBool>,
    /// Pending-wake flag: set by `wake()`, cleared again by `get_signal()`.
    wake: Arc<AtomicBool>,
    /// Outer signal (from the loop that owns this capacity wake).
    outer_signal: Arc<AtomicBool>,
    /// Set to `true` by `get_signal()`; nothing in this file reads it back.
    armed: Arc<AtomicBool>,
}
35
36impl CapacityWake {
37    /// Create a new capacity wake with the given outer signal.
38    pub fn new(outer_signal: Arc<AtomicBool>) -> Self {
39        Self {
40            signal: Arc::new(AtomicBool::new(false)),
41            wake: Arc::new(AtomicBool::new(false)),
42            outer_signal,
43            armed: Arc::new(AtomicBool::new(false)),
44        }
45    }
46
47    /// Get the capacity signal. When triggered (from either outer abort
48    /// or wake() call), the poll loop should re-check for work.
49    pub fn get_signal(&self) -> CapacitySignal {
50        // Check if already triggered
51        let triggered =
52            self.outer_signal.load(Ordering::SeqCst) || self.wake.load(Ordering::SeqCst);
53
54        // Reset wake flag after reading (so subsequent calls return untriggered)
55        if self.wake.load(Ordering::SeqCst) {
56            self.wake.store(false, Ordering::SeqCst);
57        }
58
59        // Arm the signal
60        self.armed.store(true, Ordering::SeqCst);
61        // Reset signal for next wait cycle
62        self.signal.store(false, Ordering::SeqCst);
63
64        CapacitySignal {
65            signal: if triggered {
66                Arc::new(AtomicBool::new(true))
67            } else {
68                Arc::clone(&self.signal)
69            },
70            cleanup: Box::new(move || {
71                // Cleanup is handled by drop
72            }),
73        }
74    }
75
76    /// Wake up the capacity wait. This causes get_signal() to return
77    /// a triggered signal, and re-arms for the next wait.
78    pub fn wake(&self) {
79        // Set wake trigger - get_signal() will check this and return triggered
80        self.wake.store(true, Ordering::SeqCst);
81    }
82
83    /// Check if the outer signal has been triggered
84    pub fn is_outer_aborted(&self) -> bool {
85        self.outer_signal.load(Ordering::SeqCst)
86    }
87}
88
89/// Create a capacity wake primitive for bridge poll loops.
90pub fn create_capacity_wake(outer_signal: Arc<AtomicBool>) -> CapacityWake {
91    CapacityWake::new(outer_signal)
92}
93
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh controller hands out an untriggered flag; once `wake()` has
    /// been called, the next flag handed out is triggered.
    #[test]
    fn test_capacity_wake_basic() {
        let outer_flag = Arc::new(AtomicBool::new(false));
        let controller = create_capacity_wake(Arc::clone(&outer_flag));

        // Nothing has fired yet.
        let before_wake = controller.get_signal();
        let fired_before = before_wake.signal.load(Ordering::SeqCst);
        assert!(!fired_before);

        // Request a wake, then fetch the following signal.
        controller.wake();
        let after_wake = controller.get_signal();
        let fired_after = after_wake.signal.load(Ordering::SeqCst);
        assert!(fired_after);
    }

    /// Setting the outer flag makes the next signal come back triggered.
    #[test]
    fn test_capacity_wake_outer_abort() {
        let outer_flag = Arc::new(AtomicBool::new(false));
        let controller = create_capacity_wake(Arc::clone(&outer_flag));

        // Abort from the outer loop.
        outer_flag.store(true, Ordering::SeqCst);

        let after_abort = controller.get_signal();
        let fired = after_abort.signal.load(Ordering::SeqCst);
        assert!(fired);
    }
}