1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
//! State machine for gating message writes during an initial flush.
//!
//! Translated from openclaudecode/src/bridge/flushGate.ts
//!
//! When a bridge session starts, historical messages are flushed to the
//! server via a single HTTP POST. During that flush, new messages must
//! be queued to prevent them from arriving at the server interleaved
//! with the historical messages.
//!
//! Lifecycle:
//! start() -> enqueue() returns true, items are queued
//! end() -> returns queued items for draining, enqueue() returns false
//! drop() -> discards queued items (permanent transport close)
//! deactivate() -> clears active flag without dropping items
//! (transport replacement - new transport will drain)
use std::mem;
/// FlushGate is a state machine for gating message writes during an initial flush.
#[derive(Debug, Clone)]
pub struct FlushGate<T> {
active: bool,
pending: Vec<T>,
}
impl<T> Default for FlushGate<T> {
fn default() -> Self {
Self::new()
}
}
impl<T> FlushGate<T> {
/// Create a new FlushGate.
pub fn new() -> Self {
Self {
active: false,
pending: Vec::new(),
}
}
/// Whether the flush is currently active.
pub fn active(&self) -> bool {
self.active
}
/// Number of pending items in the queue.
pub fn pending_count(&self) -> usize {
self.pending.len()
}
/// Mark flush as in-progress. enqueue() will start queuing items.
pub fn start(&mut self) {
self.active = true;
}
/// End the flush and return any queued items for draining.
/// Caller is responsible for sending the returned items.
pub fn end(&mut self) -> Vec<T> {
self.active = false;
mem::take(&mut self.pending)
}
/// If flush is active, queue the items and return true.
/// If flush is not active, return false (caller should send directly).
pub fn enqueue(&mut self, items: Vec<T>) -> bool {
if !self.active {
return false;
}
self.pending.extend(items);
true
}
/// Enqueue a single item.
pub fn enqueue_one(&mut self, item: T) -> bool {
if !self.active {
return false;
}
self.pending.push(item);
true
}
/// Discard all queued items (permanent transport close).
/// Returns the number of items dropped.
pub fn drop(&mut self) -> usize {
self.active = false;
let count = self.pending.len();
self.pending.clear();
count
}
/// Clear the active flag without dropping queued items.
/// Used when the transport is replaced - the new
/// transport's flush will drain the pending items.
pub fn deactivate(&mut self) {
self.active = false;
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_flush_gate_basic() {
let mut gate: FlushGate<i32> = FlushGate::new();
// Initially not active
assert!(!gate.active());
assert!(!gate.enqueue_one(1));
// Start flush
gate.start();
assert!(gate.active());
assert!(gate.enqueue_one(1));
assert!(gate.enqueue(vec![2, 3]));
assert_eq!(gate.pending_count(), 3);
// End flush
let items = gate.end();
assert!(!gate.active());
assert_eq!(items, vec![1, 2, 3]);
assert_eq!(gate.pending_count(), 0);
// After end, enqueue should return false
assert!(!gate.enqueue_one(4));
}
#[test]
fn test_flush_gate_drop() {
let mut gate: FlushGate<i32> = FlushGate::new();
gate.start();
gate.enqueue(vec![1, 2, 3]);
let dropped = gate.drop();
assert_eq!(dropped, 3);
assert!(!gate.active());
assert_eq!(gate.pending_count(), 0);
}
#[test]
fn test_flush_gate_deactivate() {
let mut gate: FlushGate<i32> = FlushGate::new();
gate.start();
gate.enqueue(vec![1, 2, 3]);
gate.deactivate();
assert!(!gate.active());
// Pending items should remain
assert_eq!(gate.pending_count(), 3);
}
}