1use std::collections::VecDeque;
4
5use meerkat_core::lifecycle::InputId;
6
7use crate::input::Input;
8
9#[derive(Debug, Clone)]
11pub struct QueuedInput {
12 pub input_id: InputId,
13 pub input: Input,
14}
15
16#[derive(Debug, Default, Clone)]
18pub struct InputQueue {
19 queue: VecDeque<QueuedInput>,
20}
21
22impl InputQueue {
23 pub fn new() -> Self {
25 Self::default()
26 }
27
28 pub fn enqueue(&mut self, input_id: InputId, input: Input) {
30 self.queue.push_back(QueuedInput { input_id, input });
31 }
32
33 pub fn dequeue(&mut self) -> Option<QueuedInput> {
35 self.queue.pop_front()
36 }
37
38 pub fn peek(&self) -> Option<&QueuedInput> {
40 self.queue.front()
41 }
42
43 pub fn is_empty(&self) -> bool {
45 self.queue.is_empty()
46 }
47
48 pub fn len(&self) -> usize {
50 self.queue.len()
51 }
52
53 pub fn remove(&mut self, input_id: &InputId) -> Option<QueuedInput> {
55 if let Some(pos) = self.queue.iter().position(|q| q.input_id == *input_id) {
56 self.queue.remove(pos)
57 } else {
58 None
59 }
60 }
61
62 pub fn drain(&mut self) -> Vec<QueuedInput> {
64 self.queue.drain(..).collect()
65 }
66
67 pub fn input_ids(&self) -> Vec<InputId> {
69 self.queue.iter().map(|q| q.input_id.clone()).collect()
70 }
71}
72
73#[cfg(test)]
74#[allow(clippy::unwrap_used)]
75mod tests {
76 use super::*;
77 use crate::input::*;
78 use chrono::Utc;
79
80 fn make_prompt(id: InputId) -> Input {
81 Input::Prompt(PromptInput {
82 header: InputHeader {
83 id,
84 timestamp: Utc::now(),
85 source: InputOrigin::Operator,
86 durability: InputDurability::Ephemeral,
87 visibility: InputVisibility::default(),
88 idempotency_key: None,
89 supersession_key: None,
90 correlation_id: None,
91 },
92 text: "test".into(),
93 blocks: None,
94 turn_metadata: None,
95 })
96 }
97
98 #[test]
99 fn fifo_ordering() {
100 let mut queue = InputQueue::new();
101 let id1 = InputId::new();
102 let id2 = InputId::new();
103 let id3 = InputId::new();
104
105 queue.enqueue(id1.clone(), make_prompt(id1.clone()));
106 queue.enqueue(id2.clone(), make_prompt(id2.clone()));
107 queue.enqueue(id3.clone(), make_prompt(id3.clone()));
108
109 assert_eq!(queue.len(), 3);
110 assert_eq!(queue.dequeue().unwrap().input_id, id1);
111 assert_eq!(queue.dequeue().unwrap().input_id, id2);
112 assert_eq!(queue.dequeue().unwrap().input_id, id3);
113 assert!(queue.is_empty());
114 }
115
116 #[test]
117 fn remove_by_id() {
118 let mut queue = InputQueue::new();
119 let id1 = InputId::new();
120 let id2 = InputId::new();
121
122 queue.enqueue(id1.clone(), make_prompt(id1.clone()));
123 queue.enqueue(id2.clone(), make_prompt(id2.clone()));
124
125 let removed = queue.remove(&id1);
126 assert!(removed.is_some());
127 assert_eq!(queue.len(), 1);
128 assert_eq!(queue.dequeue().unwrap().input_id, id2);
129 }
130
131 #[test]
132 fn remove_nonexistent() {
133 let mut queue = InputQueue::new();
134 let result = queue.remove(&InputId::new());
135 assert!(result.is_none());
136 }
137
138 #[test]
139 fn peek_does_not_remove() {
140 let mut queue = InputQueue::new();
141 let id = InputId::new();
142 queue.enqueue(id.clone(), make_prompt(id.clone()));
143
144 assert_eq!(queue.peek().unwrap().input_id, id);
145 assert_eq!(queue.len(), 1); }
147
148 #[test]
149 fn drain_empties_queue() {
150 let mut queue = InputQueue::new();
151 queue.enqueue(InputId::new(), make_prompt(InputId::new()));
152 queue.enqueue(InputId::new(), make_prompt(InputId::new()));
153
154 let drained = queue.drain();
155 assert_eq!(drained.len(), 2);
156 assert!(queue.is_empty());
157 }
158}