1use std::collections::VecDeque;
4
5use meerkat_core::lifecycle::InputId;
6
7use crate::input::Input;
8
9#[derive(Debug, Clone)]
11pub struct QueuedInput {
12 pub input_id: InputId,
13 pub input: Input,
14}
15
16#[derive(Debug, Default, Clone)]
18pub struct InputQueue {
19 queue: VecDeque<QueuedInput>,
20}
21
22impl InputQueue {
23 pub fn new() -> Self {
25 Self::default()
26 }
27
28 pub fn enqueue(&mut self, input_id: InputId, input: Input) {
30 self.queue.push_back(QueuedInput { input_id, input });
31 }
32
33 pub fn enqueue_front(&mut self, input_id: InputId, input: Input) {
35 self.queue.push_front(QueuedInput { input_id, input });
36 }
37
38 pub fn dequeue(&mut self) -> Option<QueuedInput> {
40 self.queue.pop_front()
41 }
42
43 pub fn peek(&self) -> Option<&QueuedInput> {
45 self.queue.front()
46 }
47
48 pub fn is_empty(&self) -> bool {
50 self.queue.is_empty()
51 }
52
53 pub fn len(&self) -> usize {
55 self.queue.len()
56 }
57
58 pub fn dequeue_by_id(&mut self, input_id: &InputId) -> Option<(InputId, crate::input::Input)> {
62 self.remove(input_id).map(|q| (q.input_id, q.input))
63 }
64
65 pub fn remove(&mut self, input_id: &InputId) -> Option<QueuedInput> {
67 if let Some(pos) = self.queue.iter().position(|q| q.input_id == *input_id) {
68 self.queue.remove(pos)
69 } else {
70 None
71 }
72 }
73
74 pub fn drain(&mut self) -> Vec<QueuedInput> {
76 self.queue.drain(..).collect()
77 }
78
79 pub fn input_ids(&self) -> Vec<InputId> {
81 self.queue.iter().map(|q| q.input_id.clone()).collect()
82 }
83}
84
85#[cfg(test)]
86#[allow(clippy::unwrap_used)]
87mod tests {
88 use super::*;
89 use crate::input::*;
90 use chrono::Utc;
91
92 fn make_prompt(id: InputId) -> Input {
93 Input::Prompt(PromptInput {
94 header: InputHeader {
95 id,
96 timestamp: Utc::now(),
97 source: InputOrigin::Operator,
98 durability: InputDurability::Ephemeral,
99 visibility: InputVisibility::default(),
100 idempotency_key: None,
101 supersession_key: None,
102 correlation_id: None,
103 },
104 text: "test".into(),
105 blocks: None,
106 typed_turn_appends: Vec::new(),
107 turn_metadata: None,
108 })
109 }
110
111 #[test]
112 fn fifo_ordering() {
113 let mut queue = InputQueue::new();
114 let id1 = InputId::new();
115 let id2 = InputId::new();
116 let id3 = InputId::new();
117
118 queue.enqueue(id1.clone(), make_prompt(id1.clone()));
119 queue.enqueue(id2.clone(), make_prompt(id2.clone()));
120 queue.enqueue(id3.clone(), make_prompt(id3.clone()));
121
122 assert_eq!(queue.len(), 3);
123 assert_eq!(queue.dequeue().unwrap().input_id, id1);
124 assert_eq!(queue.dequeue().unwrap().input_id, id2);
125 assert_eq!(queue.dequeue().unwrap().input_id, id3);
126 assert!(queue.is_empty());
127 }
128
129 #[test]
130 fn remove_by_id() {
131 let mut queue = InputQueue::new();
132 let id1 = InputId::new();
133 let id2 = InputId::new();
134
135 queue.enqueue(id1.clone(), make_prompt(id1.clone()));
136 queue.enqueue(id2.clone(), make_prompt(id2.clone()));
137
138 let removed = queue.remove(&id1);
139 assert!(removed.is_some());
140 assert_eq!(queue.len(), 1);
141 assert_eq!(queue.dequeue().unwrap().input_id, id2);
142 }
143
144 #[test]
145 fn remove_nonexistent() {
146 let mut queue = InputQueue::new();
147 let result = queue.remove(&InputId::new());
148 assert!(result.is_none());
149 }
150
151 #[test]
152 fn peek_does_not_remove() {
153 let mut queue = InputQueue::new();
154 let id = InputId::new();
155 queue.enqueue(id.clone(), make_prompt(id.clone()));
156
157 assert_eq!(queue.peek().unwrap().input_id, id);
158 assert_eq!(queue.len(), 1); }
160
161 #[test]
162 fn drain_empties_queue() {
163 let mut queue = InputQueue::new();
164 queue.enqueue(InputId::new(), make_prompt(InputId::new()));
165 queue.enqueue(InputId::new(), make_prompt(InputId::new()));
166
167 let drained = queue.drain();
168 assert_eq!(drained.len(), 2);
169 assert!(queue.is_empty());
170 }
171
172 #[test]
173 fn enqueue_front_wins_ordering() {
174 let mut queue = InputQueue::new();
175 let back = InputId::new();
176 let front = InputId::new();
177
178 queue.enqueue(back.clone(), make_prompt(back.clone()));
179 queue.enqueue_front(front.clone(), make_prompt(front.clone()));
180
181 assert_eq!(queue.dequeue().unwrap().input_id, front);
182 assert_eq!(queue.dequeue().unwrap().input_id, back);
183 }
184}