Skip to main content

ai_agent/bridge/
flush_gate.rs

1//! State machine for gating message writes during an initial flush.
2//!
3//! Translated from openclaudecode/src/bridge/flushGate.ts
4//!
5//! When a bridge session starts, historical messages are flushed to the
6//! server via a single HTTP POST. During that flush, new messages must
7//! be queued to prevent them from arriving at the server interleaved
8//! with the historical messages.
9//!
10//! Lifecycle:
11//!   start() -> enqueue() returns true, items are queued
12//!   end()   -> returns queued items for draining, enqueue() returns false
13//!   drop()  -> discards queued items (permanent transport close)
14//!   deactivate() -> clears active flag without dropping items
15//!                   (transport replacement - new transport will drain)
16
17use std::mem;
18
19/// FlushGate is a state machine for gating message writes during an initial flush.
20#[derive(Debug, Clone)]
21pub struct FlushGate<T> {
22    active: bool,
23    pending: Vec<T>,
24}
25
26impl<T> Default for FlushGate<T> {
27    fn default() -> Self {
28        Self::new()
29    }
30}
31
32impl<T> FlushGate<T> {
33    /// Create a new FlushGate.
34    pub fn new() -> Self {
35        Self {
36            active: false,
37            pending: Vec::new(),
38        }
39    }
40
41    /// Whether the flush is currently active.
42    pub fn active(&self) -> bool {
43        self.active
44    }
45
46    /// Number of pending items in the queue.
47    pub fn pending_count(&self) -> usize {
48        self.pending.len()
49    }
50
51    /// Mark flush as in-progress. enqueue() will start queuing items.
52    pub fn start(&mut self) {
53        self.active = true;
54    }
55
56    /// End the flush and return any queued items for draining.
57    /// Caller is responsible for sending the returned items.
58    pub fn end(&mut self) -> Vec<T> {
59        self.active = false;
60        mem::take(&mut self.pending)
61    }
62
63    /// If flush is active, queue the items and return true.
64    /// If flush is not active, return false (caller should send directly).
65    pub fn enqueue(&mut self, items: Vec<T>) -> bool {
66        if !self.active {
67            return false;
68        }
69        self.pending.extend(items);
70        true
71    }
72
73    /// Enqueue a single item.
74    pub fn enqueue_one(&mut self, item: T) -> bool {
75        if !self.active {
76            return false;
77        }
78        self.pending.push(item);
79        true
80    }
81
82    /// Discard all queued items (permanent transport close).
83    /// Returns the number of items dropped.
84    pub fn drop(&mut self) -> usize {
85        self.active = false;
86        let count = self.pending.len();
87        self.pending.clear();
88        count
89    }
90
91    /// Clear the active flag without dropping queued items.
92    /// Used when the transport is replaced - the new
93    /// transport's flush will drain the pending items.
94    pub fn deactivate(&mut self) {
95        self.active = false;
96    }
97}
98
99#[cfg(test)]
100mod tests {
101    use super::*;
102
103    #[test]
104    fn test_flush_gate_basic() {
105        let mut gate: FlushGate<i32> = FlushGate::new();
106
107        // Initially not active
108        assert!(!gate.active());
109        assert!(!gate.enqueue_one(1));
110
111        // Start flush
112        gate.start();
113        assert!(gate.active());
114        assert!(gate.enqueue_one(1));
115        assert!(gate.enqueue(vec![2, 3]));
116        assert_eq!(gate.pending_count(), 3);
117
118        // End flush
119        let items = gate.end();
120        assert!(!gate.active());
121        assert_eq!(items, vec![1, 2, 3]);
122        assert_eq!(gate.pending_count(), 0);
123
124        // After end, enqueue should return false
125        assert!(!gate.enqueue_one(4));
126    }
127
128    #[test]
129    fn test_flush_gate_drop() {
130        let mut gate: FlushGate<i32> = FlushGate::new();
131        gate.start();
132        gate.enqueue(vec![1, 2, 3]);
133
134        let dropped = gate.drop();
135        assert_eq!(dropped, 3);
136        assert!(!gate.active());
137        assert_eq!(gate.pending_count(), 0);
138    }
139
140    #[test]
141    fn test_flush_gate_deactivate() {
142        let mut gate: FlushGate<i32> = FlushGate::new();
143        gate.start();
144        gate.enqueue(vec![1, 2, 3]);
145
146        gate.deactivate();
147        assert!(!gate.active());
148        // Pending items should remain
149        assert_eq!(gate.pending_count(), 3);
150    }
151}