Skip to main content

meerkat_runtime/
queue.rs

//! InputQueue — FIFO queue with scope filtering.
2
3use std::collections::VecDeque;
4
5use meerkat_core::lifecycle::InputId;
6
7use crate::input::Input;
8
9/// A queued input entry.
10#[derive(Debug, Clone)]
11pub struct QueuedInput {
12    pub input_id: InputId,
13    pub input: Input,
14}
15
16/// FIFO input queue.
17#[derive(Debug, Default, Clone)]
18pub struct InputQueue {
19    queue: VecDeque<QueuedInput>,
20}
21
22impl InputQueue {
23    /// Create a new empty queue.
24    pub fn new() -> Self {
25        Self::default()
26    }
27
28    /// Enqueue an input.
29    pub fn enqueue(&mut self, input_id: InputId, input: Input) {
30        self.queue.push_back(QueuedInput { input_id, input });
31    }
32
33    /// Dequeue the next input (FIFO).
34    pub fn dequeue(&mut self) -> Option<QueuedInput> {
35        self.queue.pop_front()
36    }
37
38    /// Peek at the next input without removing it.
39    pub fn peek(&self) -> Option<&QueuedInput> {
40        self.queue.front()
41    }
42
43    /// Check if the queue is empty.
44    pub fn is_empty(&self) -> bool {
45        self.queue.is_empty()
46    }
47
48    /// Number of entries in the queue.
49    pub fn len(&self) -> usize {
50        self.queue.len()
51    }
52
53    /// Remove a specific input by ID (e.g., for supersession).
54    pub fn remove(&mut self, input_id: &InputId) -> Option<QueuedInput> {
55        if let Some(pos) = self.queue.iter().position(|q| q.input_id == *input_id) {
56            self.queue.remove(pos)
57        } else {
58            None
59        }
60    }
61
62    /// Drain all entries from the queue.
63    pub fn drain(&mut self) -> Vec<QueuedInput> {
64        self.queue.drain(..).collect()
65    }
66
67    /// Get all input IDs in the queue (in order).
68    pub fn input_ids(&self) -> Vec<InputId> {
69        self.queue.iter().map(|q| q.input_id.clone()).collect()
70    }
71}
72
#[cfg(test)]
// Tests unwrap freely: a panic on `None` is exactly the failure signal we want.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::input::*;
    use chrono::Utc;

    /// Build a minimal ephemeral operator prompt whose header carries `id`.
    fn make_prompt(id: InputId) -> Input {
        Input::Prompt(PromptInput {
            header: InputHeader {
                id,
                timestamp: Utc::now(),
                source: InputOrigin::Operator,
                durability: InputDurability::Ephemeral,
                visibility: InputVisibility::default(),
                idempotency_key: None,
                supersession_key: None,
                correlation_id: None,
            },
            text: "test".into(),
            blocks: None,
            turn_metadata: None,
        })
    }

    #[test]
    fn fifo_ordering() {
        let mut queue = InputQueue::new();
        let id1 = InputId::new();
        let id2 = InputId::new();
        let id3 = InputId::new();

        queue.enqueue(id1.clone(), make_prompt(id1.clone()));
        queue.enqueue(id2.clone(), make_prompt(id2.clone()));
        queue.enqueue(id3.clone(), make_prompt(id3.clone()));

        assert_eq!(queue.len(), 3);
        assert_eq!(queue.dequeue().unwrap().input_id, id1);
        assert_eq!(queue.dequeue().unwrap().input_id, id2);
        assert_eq!(queue.dequeue().unwrap().input_id, id3);
        assert!(queue.is_empty());
    }

    #[test]
    fn remove_by_id() {
        let mut queue = InputQueue::new();
        let id1 = InputId::new();
        let id2 = InputId::new();

        queue.enqueue(id1.clone(), make_prompt(id1.clone()));
        queue.enqueue(id2.clone(), make_prompt(id2.clone()));

        // The entry handed back must be the one that was asked for.
        let removed = queue.remove(&id1);
        assert_eq!(removed.unwrap().input_id, id1);
        assert_eq!(queue.len(), 1);
        assert_eq!(queue.dequeue().unwrap().input_id, id2);
    }

    #[test]
    fn remove_nonexistent() {
        let mut queue = InputQueue::new();
        let result = queue.remove(&InputId::new());
        assert!(result.is_none());
    }

    #[test]
    fn peek_does_not_remove() {
        let mut queue = InputQueue::new();
        let id = InputId::new();
        queue.enqueue(id.clone(), make_prompt(id.clone()));

        assert_eq!(queue.peek().unwrap().input_id, id);
        assert_eq!(queue.len(), 1); // Still there
    }

    #[test]
    fn drain_empties_queue() {
        let mut queue = InputQueue::new();
        let id1 = InputId::new();
        let id2 = InputId::new();

        // Use the same ID for the queue entry and the input header, as the
        // other tests do.
        queue.enqueue(id1.clone(), make_prompt(id1.clone()));
        queue.enqueue(id2.clone(), make_prompt(id2.clone()));

        let drained_ids: Vec<InputId> = queue.drain().into_iter().map(|q| q.input_id).collect();
        assert_eq!(drained_ids, vec![id1, id2]);
        assert!(queue.is_empty());
    }

    #[test]
    fn input_ids_in_queue_order() {
        let mut queue = InputQueue::new();
        let id1 = InputId::new();
        let id2 = InputId::new();

        queue.enqueue(id1.clone(), make_prompt(id1.clone()));
        queue.enqueue(id2.clone(), make_prompt(id2.clone()));

        assert_eq!(queue.input_ids(), vec![id1, id2]);
        assert_eq!(queue.len(), 2); // Listing IDs does not consume entries
    }
}
158}