1use std::collections::VecDeque;
4
5use meerkat_core::lifecycle::InputId;
6
7use crate::input::Input;
8
9#[derive(Debug, Clone)]
11pub struct QueuedInput {
12 pub input_id: InputId,
13 pub input: Input,
14}
15
16#[derive(Debug, Default, Clone)]
18pub struct InputQueue {
19 queue: VecDeque<QueuedInput>,
20}
21
22impl InputQueue {
23 pub fn new() -> Self {
25 Self::default()
26 }
27
28 pub fn enqueue(&mut self, input_id: InputId, input: Input) {
30 self.queue.push_back(QueuedInput { input_id, input });
31 }
32
33 pub fn enqueue_front(&mut self, input_id: InputId, input: Input) {
35 self.queue.push_front(QueuedInput { input_id, input });
36 }
37
38 pub fn dequeue(&mut self) -> Option<QueuedInput> {
40 self.queue.pop_front()
41 }
42
43 pub fn peek(&self) -> Option<&QueuedInput> {
45 self.queue.front()
46 }
47
48 pub fn is_empty(&self) -> bool {
50 self.queue.is_empty()
51 }
52
53 pub fn len(&self) -> usize {
55 self.queue.len()
56 }
57
58 pub fn dequeue_by_id(&mut self, input_id: &InputId) -> Option<(InputId, crate::input::Input)> {
62 self.remove(input_id).map(|q| (q.input_id, q.input))
63 }
64
65 pub fn remove(&mut self, input_id: &InputId) -> Option<QueuedInput> {
67 if let Some(pos) = self.queue.iter().position(|q| q.input_id == *input_id) {
68 self.queue.remove(pos)
69 } else {
70 None
71 }
72 }
73
74 pub fn drain(&mut self) -> Vec<QueuedInput> {
76 self.queue.drain(..).collect()
77 }
78
79 pub fn input_ids(&self) -> Vec<InputId> {
81 self.queue.iter().map(|q| q.input_id.clone()).collect()
82 }
83}
84
#[cfg(test)]
// Tests unwrap freely: a failed unwrap is simply a failed assertion here.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::input::*;
    use chrono::Utc;

    /// Builds a minimal operator-originated, ephemeral prompt whose header carries `id`.
    fn make_prompt(id: InputId) -> Input {
        Input::Prompt(PromptInput {
            header: InputHeader {
                id,
                timestamp: Utc::now(),
                source: InputOrigin::Operator,
                durability: InputDurability::Ephemeral,
                visibility: InputVisibility::default(),
                idempotency_key: None,
                supersession_key: None,
                correlation_id: None,
            },
            text: "test".into(),
            blocks: None,
            turn_metadata: None,
        })
    }

    /// Enqueues a prompt at the back under a fresh id (the same id is used for
    /// the queue entry and the prompt header) and returns that id.
    fn push_new(queue: &mut InputQueue) -> InputId {
        let id = InputId::new();
        queue.enqueue(id.clone(), make_prompt(id.clone()));
        id
    }

    #[test]
    fn fifo_ordering() {
        let mut queue = InputQueue::new();
        let id1 = push_new(&mut queue);
        let id2 = push_new(&mut queue);
        let id3 = push_new(&mut queue);

        assert_eq!(queue.len(), 3);
        assert_eq!(queue.dequeue().unwrap().input_id, id1);
        assert_eq!(queue.dequeue().unwrap().input_id, id2);
        assert_eq!(queue.dequeue().unwrap().input_id, id3);
        assert!(queue.is_empty());
    }

    #[test]
    fn remove_by_id() {
        let mut queue = InputQueue::new();
        let id1 = push_new(&mut queue);
        let id2 = push_new(&mut queue);

        // The entry handed back must be the one that was asked for.
        let removed = queue.remove(&id1).unwrap();
        assert_eq!(removed.input_id, id1);
        assert_eq!(queue.len(), 1);
        assert_eq!(queue.dequeue().unwrap().input_id, id2);
    }

    #[test]
    fn remove_from_middle_preserves_order() {
        let mut queue = InputQueue::new();
        let id1 = push_new(&mut queue);
        let id2 = push_new(&mut queue);
        let id3 = push_new(&mut queue);

        assert_eq!(queue.remove(&id2).unwrap().input_id, id2);
        assert_eq!(queue.input_ids(), vec![id1, id3]);
    }

    #[test]
    fn remove_nonexistent() {
        let mut queue = InputQueue::new();
        let result = queue.remove(&InputId::new());
        assert!(result.is_none());
    }

    #[test]
    fn dequeue_by_id_returns_pair_and_removes_entry() {
        let mut queue = InputQueue::new();
        let id1 = push_new(&mut queue);
        let id2 = push_new(&mut queue);
        let id3 = push_new(&mut queue);

        let (id, _input) = queue.dequeue_by_id(&id2).unwrap();
        assert_eq!(id, id2);
        assert_eq!(queue.len(), 2);
        assert_eq!(queue.input_ids(), vec![id1, id3]);
    }

    #[test]
    fn dequeue_by_id_on_empty_queue() {
        let mut queue = InputQueue::new();
        assert!(queue.dequeue_by_id(&InputId::new()).is_none());
    }

    #[test]
    fn peek_does_not_remove() {
        let mut queue = InputQueue::new();
        let id = push_new(&mut queue);

        assert_eq!(queue.peek().unwrap().input_id, id);
        // Peeking must leave the entry in place.
        assert_eq!(queue.len(), 1);
    }

    #[test]
    fn peek_and_dequeue_on_empty_queue() {
        let mut queue = InputQueue::new();
        assert!(queue.peek().is_none());
        assert!(queue.dequeue().is_none());
        assert_eq!(queue.len(), 0);
    }

    #[test]
    fn drain_empties_queue() {
        let mut queue = InputQueue::new();
        let id1 = push_new(&mut queue);
        let id2 = push_new(&mut queue);

        let drained = queue.drain();
        assert_eq!(drained.len(), 2);
        assert!(queue.is_empty());

        // Drained entries come out front-to-back.
        let ids: Vec<InputId> = drained.into_iter().map(|q| q.input_id).collect();
        assert_eq!(ids, vec![id1, id2]);
    }

    #[test]
    fn drain_on_empty_queue() {
        let mut queue = InputQueue::new();
        assert!(queue.drain().is_empty());
        assert!(queue.is_empty());
    }

    #[test]
    fn input_ids_reflects_queue_order() {
        let mut queue = InputQueue::new();
        assert!(queue.input_ids().is_empty());

        let back = push_new(&mut queue);
        let front = InputId::new();
        queue.enqueue_front(front.clone(), make_prompt(front.clone()));

        assert_eq!(queue.input_ids(), vec![front, back]);
        // Listing ids must not consume anything.
        assert_eq!(queue.len(), 2);
    }

    #[test]
    fn enqueue_front_wins_ordering() {
        let mut queue = InputQueue::new();
        let back = InputId::new();
        let front = InputId::new();

        queue.enqueue(back.clone(), make_prompt(back.clone()));
        queue.enqueue_front(front.clone(), make_prompt(front.clone()));

        assert_eq!(queue.dequeue().unwrap().input_id, front);
        assert_eq!(queue.dequeue().unwrap().input_id, back);
    }
}