Skip to main content

meerkat_runtime/
queue.rs

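//! InputQueue — FIFO queue of inputs, with removal by ID and bulk draining.
//!
//! NOTE(review): this header previously advertised "scope filtering", but no
//! scope-aware API is defined in this module — confirm whether it lives elsewhere.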
2
3use std::collections::VecDeque;
4
5use meerkat_core::lifecycle::InputId;
6
7use crate::input::Input;
8
/// A single entry held by [`InputQueue`].
///
/// Pairs an [`Input`] with the [`InputId`] it was enqueued under, so the
/// entry can be found again by ID without inspecting the input itself.
#[derive(Debug, Clone)]
pub struct QueuedInput {
    /// Identifier passed to `InputQueue::enqueue`; `InputQueue::remove` matches on it.
    pub input_id: InputId,
    /// The input that was queued.
    pub input: Input,
}
15
/// FIFO queue of [`QueuedInput`] entries.
///
/// Entries are appended at the back and taken from the front; an entry can
/// also be removed from any position by its [`InputId`].
#[derive(Debug, Default, Clone)]
pub struct InputQueue {
    /// Backing storage; the front of the deque is the next entry to dequeue.
    queue: VecDeque<QueuedInput>,
}
21
22impl InputQueue {
23    /// Create a new empty queue.
24    pub fn new() -> Self {
25        Self::default()
26    }
27
28    /// Enqueue an input.
29    pub fn enqueue(&mut self, input_id: InputId, input: Input) {
30        self.queue.push_back(QueuedInput { input_id, input });
31    }
32
33    /// Dequeue the next input (FIFO).
34    pub fn dequeue(&mut self) -> Option<QueuedInput> {
35        self.queue.pop_front()
36    }
37
38    /// Peek at the next input without removing it.
39    pub fn peek(&self) -> Option<&QueuedInput> {
40        self.queue.front()
41    }
42
43    /// Check if the queue is empty.
44    pub fn is_empty(&self) -> bool {
45        self.queue.is_empty()
46    }
47
48    /// Number of entries in the queue.
49    pub fn len(&self) -> usize {
50        self.queue.len()
51    }
52
53    /// Remove a specific input by ID (e.g., for supersession).
54    pub fn remove(&mut self, input_id: &InputId) -> Option<QueuedInput> {
55        if let Some(pos) = self.queue.iter().position(|q| q.input_id == *input_id) {
56            self.queue.remove(pos)
57        } else {
58            None
59        }
60    }
61
62    /// Drain all entries from the queue.
63    pub fn drain(&mut self) -> Vec<QueuedInput> {
64        self.queue.drain(..).collect()
65    }
66
67    /// Get all input IDs in the queue (in order).
68    pub fn input_ids(&self) -> Vec<InputId> {
69        self.queue.iter().map(|q| q.input_id.clone()).collect()
70    }
71}
72
#[cfg(test)]
// Tests unwrap freely: an unexpected `None` should fail the test on the spot.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::input::*;
    use chrono::Utc;

    /// Build a minimal ephemeral operator prompt whose header carries `id`.
    fn make_prompt(id: InputId) -> Input {
        Input::Prompt(PromptInput {
            header: InputHeader {
                id,
                timestamp: Utc::now(),
                source: InputOrigin::Operator,
                durability: InputDurability::Ephemeral,
                visibility: InputVisibility::default(),
                idempotency_key: None,
                supersession_key: None,
                correlation_id: None,
            },
            text: "test".into(),
            turn_metadata: None,
        })
    }

    /// Enqueue a fresh prompt and return the ID it was queued under.
    ///
    /// The same ID is used for the queue entry and for the prompt header, so
    /// every entry built by the tests is internally consistent.
    fn enqueue_new(queue: &mut InputQueue) -> InputId {
        let id = InputId::new();
        queue.enqueue(id.clone(), make_prompt(id.clone()));
        id
    }

    /// Entries come out in the order they went in.
    #[test]
    fn fifo_ordering() {
        let mut queue = InputQueue::new();
        let id1 = enqueue_new(&mut queue);
        let id2 = enqueue_new(&mut queue);
        let id3 = enqueue_new(&mut queue);

        assert_eq!(queue.len(), 3);
        assert_eq!(queue.dequeue().unwrap().input_id, id1);
        assert_eq!(queue.dequeue().unwrap().input_id, id2);
        assert_eq!(queue.dequeue().unwrap().input_id, id3);
        assert!(queue.is_empty());
    }

    /// Removing by ID returns that entry and leaves the others queued.
    #[test]
    fn remove_by_id() {
        let mut queue = InputQueue::new();
        let id1 = enqueue_new(&mut queue);
        let id2 = enqueue_new(&mut queue);

        let removed = queue.remove(&id1);
        assert_eq!(removed.unwrap().input_id, id1);
        assert_eq!(queue.len(), 1);
        assert_eq!(queue.dequeue().unwrap().input_id, id2);
    }

    /// Removing from the middle keeps the surrounding entries in order.
    #[test]
    fn remove_from_middle_preserves_order() {
        let mut queue = InputQueue::new();
        let id1 = enqueue_new(&mut queue);
        let id2 = enqueue_new(&mut queue);
        let id3 = enqueue_new(&mut queue);

        assert!(queue.remove(&id2).is_some());
        assert_eq!(queue.input_ids(), vec![id1, id3]);
    }

    /// Removing an ID that was never queued is a no-op returning `None`.
    #[test]
    fn remove_nonexistent() {
        let mut queue = InputQueue::new();
        let result = queue.remove(&InputId::new());
        assert!(result.is_none());
        assert!(queue.is_empty());
    }

    /// `peek` exposes the front entry without consuming it.
    #[test]
    fn peek_does_not_remove() {
        let mut queue = InputQueue::new();
        assert!(queue.peek().is_none());

        let id = enqueue_new(&mut queue);

        assert_eq!(queue.peek().unwrap().input_id, id);
        assert_eq!(queue.len(), 1); // Still there
    }

    /// `drain` hands back every entry in FIFO order and leaves the queue empty.
    #[test]
    fn drain_empties_queue() {
        let mut queue = InputQueue::new();
        let id1 = enqueue_new(&mut queue);
        let id2 = enqueue_new(&mut queue);

        let drained = queue.drain();
        assert_eq!(drained.len(), 2);
        assert_eq!(drained[0].input_id, id1);
        assert_eq!(drained[1].input_id, id2);
        assert!(queue.is_empty());
    }

    /// `input_ids` lists IDs front-to-back and does not modify the queue.
    #[test]
    fn input_ids_reports_queue_order() {
        let mut queue = InputQueue::new();
        assert!(queue.input_ids().is_empty());

        let id1 = enqueue_new(&mut queue);
        let id2 = enqueue_new(&mut queue);

        assert_eq!(queue.input_ids(), vec![id1, id2]);
        assert_eq!(queue.len(), 2);
    }
}
157}