Skip to main content

limit_cli/tui/
input_queue.rs

1// Input queue system for managing messages during async operations
2
3use std::collections::VecDeque;
4
5const DEFAULT_MAX_QUEUED_MESSAGES: usize = 50;
6const DEFAULT_MAX_PENDING_STEERS: usize = 10;
7
8#[derive(Debug, Clone, Copy, PartialEq, Eq)]
9pub struct QueueConfig {
10    pub max_queued_messages: usize,
11    pub max_pending_steers: usize,
12}
13
14impl Default for QueueConfig {
15    fn default() -> Self {
16        Self {
17            max_queued_messages: DEFAULT_MAX_QUEUED_MESSAGES,
18            max_pending_steers: DEFAULT_MAX_PENDING_STEERS,
19        }
20    }
21}
22
23#[derive(Debug, Clone, PartialEq)]
24pub struct QueuedMessage {
25    pub text: String,
26    pub is_steer: bool,
27}
28
29impl QueuedMessage {
30    pub fn new(text: String) -> Self {
31        Self {
32            text,
33            is_steer: false,
34        }
35    }
36
37    pub fn new_steer(text: String) -> Self {
38        Self {
39            text,
40            is_steer: true,
41        }
42    }
43}
44
45#[derive(Debug, Clone)]
46pub struct ThreadInputState {
47    pub queued_messages: Vec<QueuedMessage>,
48    pub pending_steers: Vec<QueuedMessage>,
49    pub submit_after_interrupt: bool,
50    pub suppress_autosend: bool,
51}
52
53impl ThreadInputState {
54    pub fn has_content(&self) -> bool {
55        !self.queued_messages.is_empty() || !self.pending_steers.is_empty()
56    }
57}
58
59/// Manages input messages during async operations
60#[derive(Debug, Clone)]
61pub struct InputQueue {
62    queued_messages: VecDeque<QueuedMessage>,
63    pending_steers: VecDeque<QueuedMessage>,
64    submit_pending_steers_after_interrupt: bool,
65    suppress_autosend: bool,
66    config: QueueConfig,
67}
68
69impl Default for InputQueue {
70    fn default() -> Self {
71        Self::new()
72    }
73}
74
75impl InputQueue {
76    pub fn new() -> Self {
77        Self::with_config(QueueConfig::default())
78    }
79
80    pub fn with_config(config: QueueConfig) -> Self {
81        Self {
82            queued_messages: VecDeque::with_capacity(config.max_queued_messages),
83            pending_steers: VecDeque::with_capacity(config.max_pending_steers),
84            submit_pending_steers_after_interrupt: false,
85            suppress_autosend: false,
86            config,
87        }
88    }
89
90    pub fn queue_message(&mut self, text: String) {
91        if self.queued_messages.len() >= self.config.max_queued_messages {
92            self.queued_messages.pop_front();
93        }
94        self.queued_messages.push_back(QueuedMessage::new(text));
95    }
96
97    pub fn add_steer(&mut self, text: String) {
98        if self.pending_steers.len() >= self.config.max_pending_steers {
99            self.pending_steers.pop_front();
100        }
101        self.pending_steers
102            .push_back(QueuedMessage::new_steer(text));
103    }
104
105    /// Check if there are any queued messages
106    pub fn has_queued_messages(&self) -> bool {
107        !self.queued_messages.is_empty()
108    }
109
110    /// Check if there are any pending steers
111    pub fn has_pending_steers(&self) -> bool {
112        !self.pending_steers.is_empty()
113    }
114
115    /// Check if queue is empty (no queued or pending messages)
116    pub fn is_empty(&self) -> bool {
117        self.queued_messages.is_empty() && self.pending_steers.is_empty()
118    }
119
120    /// Get number of queued messages
121    pub fn queued_count(&self) -> usize {
122        self.queued_messages.len()
123    }
124
125    /// Get number of pending steers
126    pub fn steer_count(&self) -> usize {
127        self.pending_steers.len()
128    }
129
130    /// Pop the next queued message (FIFO)
131    pub fn pop_queued(&mut self) -> Option<QueuedMessage> {
132        self.queued_messages.pop_front()
133    }
134
135    /// Get all queued message texts for preview
136    pub fn queued_texts(&self) -> Vec<String> {
137        self.queued_messages
138            .iter()
139            .map(|m| m.text.clone())
140            .collect()
141    }
142
143    /// Get all pending steer texts for preview
144    pub fn steer_texts(&self) -> Vec<String> {
145        self.pending_steers.iter().map(|m| m.text.clone()).collect()
146    }
147
148    /// Drain all pending steers
149    pub fn drain_steers(&mut self) -> Vec<QueuedMessage> {
150        self.pending_steers.drain(..).collect()
151    }
152
153    /// Drain all queued messages
154    pub fn drain_queued(&mut self) -> Vec<QueuedMessage> {
155        self.queued_messages.drain(..).collect()
156    }
157
158    /// Pop the last queued message for editing
159    pub fn pop_last_queued(&mut self) -> Option<QueuedMessage> {
160        self.queued_messages.pop_back()
161    }
162
163    /// Mark that next interrupt should submit steers
164    pub fn set_submit_after_interrupt(&mut self, value: bool) {
165        self.submit_pending_steers_after_interrupt = value;
166    }
167
168    /// Check if should submit steers after interrupt
169    pub fn should_submit_after_interrupt(&self) -> bool {
170        self.submit_pending_steers_after_interrupt
171    }
172
173    /// Set suppress autosend flag
174    pub fn set_suppress_autosend(&mut self, value: bool) {
175        self.suppress_autosend = value;
176    }
177
178    /// Check if autosend is suppressed
179    pub fn is_autosend_suppressed(&self) -> bool {
180        self.suppress_autosend
181    }
182
183    /// Merge all pending messages (steers + queued) into a single text
184    pub fn merge_all(&mut self) -> Option<String> {
185        if self.is_empty() {
186            return None;
187        }
188
189        let mut texts: Vec<String> = Vec::new();
190
191        for steer in self.pending_steers.drain(..) {
192            texts.push(steer.text);
193        }
194
195        for msg in self.queued_messages.drain(..) {
196            texts.push(msg.text);
197        }
198
199        self.submit_pending_steers_after_interrupt = false;
200
201        Some(texts.join("\n\n"))
202    }
203
204    /// Clear all queued and pending messages
205    pub fn clear(&mut self) {
206        self.queued_messages.clear();
207        self.pending_steers.clear();
208        self.submit_pending_steers_after_interrupt = false;
209    }
210
211    pub fn save_thread_state(&self) -> Option<ThreadInputState> {
212        if self.queued_messages.is_empty() && self.pending_steers.is_empty() {
213            return None;
214        }
215
216        Some(ThreadInputState {
217            queued_messages: self.queued_messages.iter().cloned().collect(),
218            pending_steers: self.pending_steers.iter().cloned().collect(),
219            submit_after_interrupt: self.submit_pending_steers_after_interrupt,
220            suppress_autosend: self.suppress_autosend,
221        })
222    }
223
224    pub fn restore_thread_state(&mut self, state: Option<ThreadInputState>) {
225        if let Some(state) = state {
226            self.queued_messages.clear();
227            for msg in state.queued_messages {
228                self.queued_messages.push_back(msg);
229            }
230            for msg in state.pending_steers {
231                self.pending_steers.push_back(msg);
232            }
233            self.submit_pending_steers_after_interrupt = state.submit_after_interrupt;
234            self.suppress_autosend = state.suppress_autosend;
235        }
236    }
237}
238
239#[cfg(test)]
240mod tests {
241    use super::*;
242
243    #[test]
244    fn test_queue_message() {
245        let mut queue = InputQueue::new();
246        assert!(!queue.has_queued_messages());
247
248        queue.queue_message("Hello".to_string());
249        assert!(queue.has_queued_messages());
250        assert_eq!(queue.queued_count(), 1);
251    }
252
253    #[test]
254    fn test_add_steer() {
255        let mut queue = InputQueue::new();
256        assert!(!queue.has_pending_steers());
257
258        queue.add_steer("Continue".to_string());
259        assert!(queue.has_pending_steers());
260        assert_eq!(queue.steer_count(), 1);
261    }
262
263    #[test]
264    fn test_pop_queued() {
265        let mut queue = InputQueue::new();
266        queue.queue_message("First".to_string());
267        queue.queue_message("Second".to_string());
268
269        let msg = queue.pop_queued();
270        assert_eq!(msg.unwrap().text, "First");
271        assert_eq!(queue.queued_count(), 1);
272    }
273
274    #[test]
275    fn test_pop_last_queued() {
276        let mut queue = InputQueue::new();
277        queue.queue_message("First".to_string());
278        queue.queue_message("Second".to_string());
279
280        let msg = queue.pop_last_queued();
281        assert_eq!(msg.unwrap().text, "Second");
282        assert_eq!(queue.queued_count(), 1);
283    }
284
285    #[test]
286    fn test_merge_all() {
287        let mut queue = InputQueue::new();
288        queue.add_steer("Steer1".to_string());
289        queue.queue_message("Queue1".to_string());
290
291        let merged = queue.merge_all();
292        assert_eq!(merged, Some("Steer1\n\nQueue1".to_string()));
293        assert!(queue.is_empty());
294    }
295
296    #[test]
297    fn test_interrupt_flag() {
298        let mut queue = InputQueue::new();
299        assert!(!queue.should_submit_after_interrupt());
300
301        queue.set_submit_after_interrupt(true);
302        assert!(queue.should_submit_after_interrupt());
303    }
304
305    #[test]
306    fn test_suppress_autosend() {
307        let mut queue = InputQueue::new();
308        assert!(!queue.is_autosend_suppressed());
309
310        queue.set_suppress_autosend(true);
311        assert!(queue.is_autosend_suppressed());
312    }
313
314    #[test]
315    fn test_clear() {
316        let mut queue = InputQueue::new();
317        queue.queue_message("Test".to_string());
318        queue.add_steer("Steer".to_string());
319        queue.set_submit_after_interrupt(true);
320
321        queue.clear();
322        assert!(queue.is_empty());
323        assert!(!queue.should_submit_after_interrupt());
324    }
325
326    #[test]
327    fn test_thread_state_roundtrip() {
328        let mut queue = InputQueue::new();
329        queue.queue_message("Queued".to_string());
330        queue.add_steer("Steer".to_string());
331        queue.set_submit_after_interrupt(true);
332        queue.set_suppress_autosend(true);
333
334        let state = queue.save_thread_state();
335        assert!(state.is_some());
336        let state = state.unwrap();
337        assert!(state.has_content());
338        assert_eq!(state.queued_messages.len(), 1);
339        assert_eq!(state.pending_steers.len(), 1);
340
341        queue.clear();
342        assert!(queue.is_empty());
343
344        queue.restore_thread_state(Some(state));
345        assert_eq!(queue.queued_count(), 1);
346        assert_eq!(queue.steer_count(), 1);
347        assert!(queue.should_submit_after_interrupt());
348        assert!(queue.is_autosend_suppressed());
349    }
350
351    #[test]
352    fn test_thread_state_empty() {
353        let queue = InputQueue::new();
354        assert!(queue.save_thread_state().is_none());
355    }
356
357    #[test]
358    fn test_backpressure_queued() {
359        let config = QueueConfig {
360            max_queued_messages: 3,
361            max_pending_steers: 10,
362        };
363        let mut queue = InputQueue::with_config(config);
364
365        for i in 0..5 {
366            queue.queue_message(format!("Message {}", i));
367        }
368
369        assert_eq!(queue.queued_count(), 3);
370        let first = queue.pop_queued().unwrap();
371        assert_eq!(first.text, "Message 2");
372    }
373
374    #[test]
375    fn test_backpressure_steers() {
376        let config = QueueConfig {
377            max_queued_messages: 50,
378            max_pending_steers: 2,
379        };
380        let mut queue = InputQueue::with_config(config);
381
382        for i in 0..4 {
383            queue.add_steer(format!("Steer {}", i));
384        }
385
386        assert_eq!(queue.steer_count(), 2);
387    }
388}