ai_agent/bridge/flush_gate.rs
1//! State machine for gating message writes during an initial flush.
2//!
3//! Translated from openclaudecode/src/bridge/flushGate.ts
4//!
5//! When a bridge session starts, historical messages are flushed to the
6//! server via a single HTTP POST. During that flush, new messages must
7//! be queued to prevent them from arriving at the server interleaved
8//! with the historical messages.
9//!
10//! Lifecycle:
11//! start() -> enqueue() returns true, items are queued
12//! end() -> returns queued items for draining, enqueue() returns false
13//! drop() -> discards queued items (permanent transport close)
14//! deactivate() -> clears active flag without dropping items
15//! (transport replacement - new transport will drain)
16
17use std::mem;
18
19/// FlushGate is a state machine for gating message writes during an initial flush.
20#[derive(Debug, Clone)]
21pub struct FlushGate<T> {
22 active: bool,
23 pending: Vec<T>,
24}
25
26impl<T> Default for FlushGate<T> {
27 fn default() -> Self {
28 Self::new()
29 }
30}
31
32impl<T> FlushGate<T> {
33 /// Create a new FlushGate.
34 pub fn new() -> Self {
35 Self {
36 active: false,
37 pending: Vec::new(),
38 }
39 }
40
41 /// Whether the flush is currently active.
42 pub fn active(&self) -> bool {
43 self.active
44 }
45
46 /// Number of pending items in the queue.
47 pub fn pending_count(&self) -> usize {
48 self.pending.len()
49 }
50
51 /// Mark flush as in-progress. enqueue() will start queuing items.
52 pub fn start(&mut self) {
53 self.active = true;
54 }
55
56 /// End the flush and return any queued items for draining.
57 /// Caller is responsible for sending the returned items.
58 pub fn end(&mut self) -> Vec<T> {
59 self.active = false;
60 mem::take(&mut self.pending)
61 }
62
63 /// If flush is active, queue the items and return true.
64 /// If flush is not active, return false (caller should send directly).
65 pub fn enqueue(&mut self, items: Vec<T>) -> bool {
66 if !self.active {
67 return false;
68 }
69 self.pending.extend(items);
70 true
71 }
72
73 /// Enqueue a single item.
74 pub fn enqueue_one(&mut self, item: T) -> bool {
75 if !self.active {
76 return false;
77 }
78 self.pending.push(item);
79 true
80 }
81
82 /// Discard all queued items (permanent transport close).
83 /// Returns the number of items dropped.
84 pub fn drop(&mut self) -> usize {
85 self.active = false;
86 let count = self.pending.len();
87 self.pending.clear();
88 count
89 }
90
91 /// Clear the active flag without dropping queued items.
92 /// Used when the transport is replaced - the new
93 /// transport's flush will drain the pending items.
94 pub fn deactivate(&mut self) {
95 self.active = false;
96 }
97}
98
99#[cfg(test)]
100mod tests {
101 use super::*;
102
103 #[test]
104 fn test_flush_gate_basic() {
105 let mut gate: FlushGate<i32> = FlushGate::new();
106
107 // Initially not active
108 assert!(!gate.active());
109 assert!(!gate.enqueue_one(1));
110
111 // Start flush
112 gate.start();
113 assert!(gate.active());
114 assert!(gate.enqueue_one(1));
115 assert!(gate.enqueue(vec![2, 3]));
116 assert_eq!(gate.pending_count(), 3);
117
118 // End flush
119 let items = gate.end();
120 assert!(!gate.active());
121 assert_eq!(items, vec![1, 2, 3]);
122 assert_eq!(gate.pending_count(), 0);
123
124 // After end, enqueue should return false
125 assert!(!gate.enqueue_one(4));
126 }
127
128 #[test]
129 fn test_flush_gate_drop() {
130 let mut gate: FlushGate<i32> = FlushGate::new();
131 gate.start();
132 gate.enqueue(vec![1, 2, 3]);
133
134 let dropped = gate.drop();
135 assert_eq!(dropped, 3);
136 assert!(!gate.active());
137 assert_eq!(gate.pending_count(), 0);
138 }
139
140 #[test]
141 fn test_flush_gate_deactivate() {
142 let mut gate: FlushGate<i32> = FlushGate::new();
143 gate.start();
144 gate.enqueue(vec![1, 2, 3]);
145
146 gate.deactivate();
147 assert!(!gate.active());
148 // Pending items should remain
149 assert_eq!(gate.pending_count(), 3);
150 }
151}