Skip to main content

meerkat_runtime/
queue.rs

//! InputQueue — FIFO queue of pending inputs, with front insertion and removal by ID.
2
3use std::collections::VecDeque;
4
5use meerkat_core::lifecycle::InputId;
6
7use crate::input::Input;
8
9/// A queued input entry.
10#[derive(Debug, Clone)]
11pub struct QueuedInput {
12    pub input_id: InputId,
13    pub input: Input,
14}
15
16/// FIFO input queue.
17#[derive(Debug, Default, Clone)]
18pub struct InputQueue {
19    queue: VecDeque<QueuedInput>,
20}
21
22impl InputQueue {
23    /// Create a new empty queue.
24    pub fn new() -> Self {
25        Self::default()
26    }
27
28    /// Enqueue an input.
29    pub fn enqueue(&mut self, input_id: InputId, input: Input) {
30        self.queue.push_back(QueuedInput { input_id, input });
31    }
32
33    /// Enqueue an input at the front of the queue.
34    pub fn enqueue_front(&mut self, input_id: InputId, input: Input) {
35        self.queue.push_front(QueuedInput { input_id, input });
36    }
37
38    /// Dequeue the next input (FIFO).
39    pub fn dequeue(&mut self) -> Option<QueuedInput> {
40        self.queue.pop_front()
41    }
42
43    /// Peek at the next input without removing it.
44    pub fn peek(&self) -> Option<&QueuedInput> {
45        self.queue.front()
46    }
47
48    /// Check if the queue is empty.
49    pub fn is_empty(&self) -> bool {
50        self.queue.is_empty()
51    }
52
53    /// Number of entries in the queue.
54    pub fn len(&self) -> usize {
55        self.queue.len()
56    }
57
58    /// Remove a specific input by ID and return it as (InputId, Input).
59    ///
60    /// Used by the batching policy to dequeue specific IDs determined by the authority.
61    pub fn dequeue_by_id(&mut self, input_id: &InputId) -> Option<(InputId, crate::input::Input)> {
62        self.remove(input_id).map(|q| (q.input_id, q.input))
63    }
64
65    /// Remove a specific input by ID (e.g., for supersession).
66    pub fn remove(&mut self, input_id: &InputId) -> Option<QueuedInput> {
67        if let Some(pos) = self.queue.iter().position(|q| q.input_id == *input_id) {
68            self.queue.remove(pos)
69        } else {
70            None
71        }
72    }
73
74    /// Drain all entries from the queue.
75    pub fn drain(&mut self) -> Vec<QueuedInput> {
76        self.queue.drain(..).collect()
77    }
78
79    /// Get all input IDs in the queue (in order).
80    pub fn input_ids(&self) -> Vec<InputId> {
81        self.queue.iter().map(|q| q.input_id.clone()).collect()
82    }
83}
84
#[cfg(test)]
// Tests unwrap freely: a panic on `None` is exactly the failure signal we want.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::input::*;
    use chrono::Utc;

    /// Build a minimal operator prompt whose header carries `id`.
    fn make_prompt(id: InputId) -> Input {
        Input::Prompt(PromptInput {
            header: InputHeader {
                id,
                timestamp: Utc::now(),
                source: InputOrigin::Operator,
                durability: InputDurability::Ephemeral,
                visibility: InputVisibility::default(),
                idempotency_key: None,
                supersession_key: None,
                correlation_id: None,
            },
            text: "test".into(),
            blocks: None,
            typed_turn_appends: Vec::new(),
            turn_metadata: None,
        })
    }

    #[test]
    fn fifo_ordering() {
        let mut queue = InputQueue::new();
        let id1 = InputId::new();
        let id2 = InputId::new();
        let id3 = InputId::new();

        queue.enqueue(id1.clone(), make_prompt(id1.clone()));
        queue.enqueue(id2.clone(), make_prompt(id2.clone()));
        queue.enqueue(id3.clone(), make_prompt(id3.clone()));

        assert_eq!(queue.len(), 3);
        assert_eq!(queue.dequeue().unwrap().input_id, id1);
        assert_eq!(queue.dequeue().unwrap().input_id, id2);
        assert_eq!(queue.dequeue().unwrap().input_id, id3);
        assert!(queue.is_empty());
    }

    #[test]
    fn remove_by_id() {
        let mut queue = InputQueue::new();
        let id1 = InputId::new();
        let id2 = InputId::new();

        queue.enqueue(id1.clone(), make_prompt(id1.clone()));
        queue.enqueue(id2.clone(), make_prompt(id2.clone()));

        // The removed entry must be the one that was asked for, not merely "something".
        let removed = queue.remove(&id1).unwrap();
        assert_eq!(removed.input_id, id1);
        assert_eq!(queue.len(), 1);
        assert_eq!(queue.dequeue().unwrap().input_id, id2);
    }

    #[test]
    fn remove_nonexistent() {
        let mut queue = InputQueue::new();
        let result = queue.remove(&InputId::new());
        assert!(result.is_none());
    }

    #[test]
    fn dequeue_by_id_returns_matching_pair() {
        let mut queue = InputQueue::new();
        let id1 = InputId::new();
        let id2 = InputId::new();
        let id3 = InputId::new();

        queue.enqueue(id1.clone(), make_prompt(id1.clone()));
        queue.enqueue(id2.clone(), make_prompt(id2.clone()));
        queue.enqueue(id3.clone(), make_prompt(id3.clone()));

        let (returned_id, _input) = queue.dequeue_by_id(&id2).unwrap();
        assert_eq!(returned_id, id2);

        // A second attempt finds nothing, and the survivors keep their order.
        assert!(queue.dequeue_by_id(&id2).is_none());
        assert_eq!(queue.input_ids(), vec![id1, id3]);
    }

    #[test]
    fn peek_does_not_remove() {
        let mut queue = InputQueue::new();
        let id = InputId::new();
        queue.enqueue(id.clone(), make_prompt(id.clone()));

        assert_eq!(queue.peek().unwrap().input_id, id);
        assert_eq!(queue.len(), 1); // Still there
    }

    #[test]
    fn drain_empties_queue() {
        let mut queue = InputQueue::new();
        let id1 = InputId::new();
        let id2 = InputId::new();

        // Queue key and header ID are kept identical, as in every other test.
        queue.enqueue(id1.clone(), make_prompt(id1.clone()));
        queue.enqueue(id2.clone(), make_prompt(id2.clone()));

        let drained = queue.drain();
        assert_eq!(drained.len(), 2);
        assert_eq!(drained[0].input_id, id1);
        assert_eq!(drained[1].input_id, id2);
        assert!(queue.is_empty());
    }

    #[test]
    fn input_ids_reports_queue_order() {
        let mut queue = InputQueue::new();
        assert!(queue.input_ids().is_empty());

        let id1 = InputId::new();
        let id2 = InputId::new();
        queue.enqueue(id1.clone(), make_prompt(id1.clone()));
        queue.enqueue(id2.clone(), make_prompt(id2.clone()));

        assert_eq!(queue.input_ids(), vec![id1, id2]);
        assert_eq!(queue.len(), 2); // Listing IDs does not consume entries
    }

    #[test]
    fn enqueue_front_wins_ordering() {
        let mut queue = InputQueue::new();
        let back = InputId::new();
        let front = InputId::new();

        queue.enqueue(back.clone(), make_prompt(back.clone()));
        queue.enqueue_front(front.clone(), make_prompt(front.clone()));

        assert_eq!(queue.dequeue().unwrap().input_id, front);
        assert_eq!(queue.dequeue().unwrap().input_id, back);
    }
}