Skip to main content

meerkat_runtime/
queue.rs

1//! InputQueue — FIFO queue with scope filtering.
2
3use std::collections::VecDeque;
4
5use meerkat_core::lifecycle::InputId;
6
7use crate::input::Input;
8
9/// A queued input entry.
10#[derive(Debug, Clone)]
11pub struct QueuedInput {
12    pub input_id: InputId,
13    pub input: Input,
14}
15
16/// FIFO input queue.
17#[derive(Debug, Default, Clone)]
18pub struct InputQueue {
19    queue: VecDeque<QueuedInput>,
20}
21
22impl InputQueue {
23    /// Create a new empty queue.
24    pub fn new() -> Self {
25        Self::default()
26    }
27
28    /// Enqueue an input.
29    pub fn enqueue(&mut self, input_id: InputId, input: Input) {
30        self.queue.push_back(QueuedInput { input_id, input });
31    }
32
33    /// Enqueue an input at the front of the queue.
34    pub fn enqueue_front(&mut self, input_id: InputId, input: Input) {
35        self.queue.push_front(QueuedInput { input_id, input });
36    }
37
38    /// Dequeue the next input (FIFO).
39    pub fn dequeue(&mut self) -> Option<QueuedInput> {
40        self.queue.pop_front()
41    }
42
43    /// Peek at the next input without removing it.
44    pub fn peek(&self) -> Option<&QueuedInput> {
45        self.queue.front()
46    }
47
48    /// Check if the queue is empty.
49    pub fn is_empty(&self) -> bool {
50        self.queue.is_empty()
51    }
52
53    /// Number of entries in the queue.
54    pub fn len(&self) -> usize {
55        self.queue.len()
56    }
57
58    /// Remove a specific input by ID and return it as (InputId, Input).
59    ///
60    /// Used by the batching policy to dequeue specific IDs determined by the authority.
61    pub fn dequeue_by_id(&mut self, input_id: &InputId) -> Option<(InputId, crate::input::Input)> {
62        self.remove(input_id).map(|q| (q.input_id, q.input))
63    }
64
65    /// Remove a specific input by ID (e.g., for supersession).
66    pub fn remove(&mut self, input_id: &InputId) -> Option<QueuedInput> {
67        if let Some(pos) = self.queue.iter().position(|q| q.input_id == *input_id) {
68            self.queue.remove(pos)
69        } else {
70            None
71        }
72    }
73
74    /// Drain all entries from the queue.
75    pub fn drain(&mut self) -> Vec<QueuedInput> {
76        self.queue.drain(..).collect()
77    }
78
79    /// Get all input IDs in the queue (in order).
80    pub fn input_ids(&self) -> Vec<InputId> {
81        self.queue.iter().map(|q| q.input_id.clone()).collect()
82    }
83}
84
85#[cfg(test)]
86#[allow(clippy::unwrap_used)]
87mod tests {
88    use super::*;
89    use crate::input::*;
90    use chrono::Utc;
91
92    fn make_prompt(id: InputId) -> Input {
93        Input::Prompt(PromptInput {
94            header: InputHeader {
95                id,
96                timestamp: Utc::now(),
97                source: InputOrigin::Operator,
98                durability: InputDurability::Ephemeral,
99                visibility: InputVisibility::default(),
100                idempotency_key: None,
101                supersession_key: None,
102                correlation_id: None,
103            },
104            text: "test".into(),
105            blocks: None,
106            turn_metadata: None,
107        })
108    }
109
110    #[test]
111    fn fifo_ordering() {
112        let mut queue = InputQueue::new();
113        let id1 = InputId::new();
114        let id2 = InputId::new();
115        let id3 = InputId::new();
116
117        queue.enqueue(id1.clone(), make_prompt(id1.clone()));
118        queue.enqueue(id2.clone(), make_prompt(id2.clone()));
119        queue.enqueue(id3.clone(), make_prompt(id3.clone()));
120
121        assert_eq!(queue.len(), 3);
122        assert_eq!(queue.dequeue().unwrap().input_id, id1);
123        assert_eq!(queue.dequeue().unwrap().input_id, id2);
124        assert_eq!(queue.dequeue().unwrap().input_id, id3);
125        assert!(queue.is_empty());
126    }
127
128    #[test]
129    fn remove_by_id() {
130        let mut queue = InputQueue::new();
131        let id1 = InputId::new();
132        let id2 = InputId::new();
133
134        queue.enqueue(id1.clone(), make_prompt(id1.clone()));
135        queue.enqueue(id2.clone(), make_prompt(id2.clone()));
136
137        let removed = queue.remove(&id1);
138        assert!(removed.is_some());
139        assert_eq!(queue.len(), 1);
140        assert_eq!(queue.dequeue().unwrap().input_id, id2);
141    }
142
143    #[test]
144    fn remove_nonexistent() {
145        let mut queue = InputQueue::new();
146        let result = queue.remove(&InputId::new());
147        assert!(result.is_none());
148    }
149
150    #[test]
151    fn peek_does_not_remove() {
152        let mut queue = InputQueue::new();
153        let id = InputId::new();
154        queue.enqueue(id.clone(), make_prompt(id.clone()));
155
156        assert_eq!(queue.peek().unwrap().input_id, id);
157        assert_eq!(queue.len(), 1); // Still there
158    }
159
160    #[test]
161    fn drain_empties_queue() {
162        let mut queue = InputQueue::new();
163        queue.enqueue(InputId::new(), make_prompt(InputId::new()));
164        queue.enqueue(InputId::new(), make_prompt(InputId::new()));
165
166        let drained = queue.drain();
167        assert_eq!(drained.len(), 2);
168        assert!(queue.is_empty());
169    }
170
171    #[test]
172    fn enqueue_front_wins_ordering() {
173        let mut queue = InputQueue::new();
174        let back = InputId::new();
175        let front = InputId::new();
176
177        queue.enqueue(back.clone(), make_prompt(back.clone()));
178        queue.enqueue_front(front.clone(), make_prompt(front.clone()));
179
180        assert_eq!(queue.dequeue().unwrap().input_id, front);
181        assert_eq!(queue.dequeue().unwrap().input_id, back);
182    }
183}