1use std::collections::VecDeque;
4
5use meerkat_core::lifecycle::InputId;
6
7use crate::input::Input;
8
9#[derive(Debug, Clone)]
11pub struct QueuedInput {
12 pub input_id: InputId,
13 pub input: Input,
14}
15
16#[derive(Debug, Default, Clone)]
18pub struct InputQueue {
19 queue: VecDeque<QueuedInput>,
20}
21
22impl InputQueue {
23 pub fn new() -> Self {
25 Self::default()
26 }
27
28 pub fn enqueue(&mut self, input_id: InputId, input: Input) {
30 self.queue.push_back(QueuedInput { input_id, input });
31 }
32
33 pub fn dequeue(&mut self) -> Option<QueuedInput> {
35 self.queue.pop_front()
36 }
37
38 pub fn peek(&self) -> Option<&QueuedInput> {
40 self.queue.front()
41 }
42
43 pub fn is_empty(&self) -> bool {
45 self.queue.is_empty()
46 }
47
48 pub fn len(&self) -> usize {
50 self.queue.len()
51 }
52
53 pub fn remove(&mut self, input_id: &InputId) -> Option<QueuedInput> {
55 if let Some(pos) = self.queue.iter().position(|q| q.input_id == *input_id) {
56 self.queue.remove(pos)
57 } else {
58 None
59 }
60 }
61
62 pub fn drain(&mut self) -> Vec<QueuedInput> {
64 self.queue.drain(..).collect()
65 }
66
67 pub fn input_ids(&self) -> Vec<InputId> {
69 self.queue.iter().map(|q| q.input_id.clone()).collect()
70 }
71}
72
#[cfg(test)]
// Unwrapping is fine in tests: a panic is reported as a test failure.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::input::*;
    use chrono::Utc;

    /// Builds a prompt input whose header carries `id`.
    ///
    /// The timestamp is the current time; every other header field is a fixed
    /// or default value, and the prompt text is always `"test"`.
    fn make_prompt(id: InputId) -> Input {
        Input::Prompt(PromptInput {
            header: InputHeader {
                id,
                timestamp: Utc::now(),
                source: InputOrigin::Operator,
                durability: InputDurability::Ephemeral,
                visibility: InputVisibility::default(),
                idempotency_key: None,
                supersession_key: None,
                correlation_id: None,
            },
            text: "test".into(),
            turn_metadata: None,
        })
    }

    /// Entries come out in the order they were enqueued.
    #[test]
    fn fifo_ordering() {
        let mut queue = InputQueue::new();
        let id1 = InputId::new();
        let id2 = InputId::new();
        let id3 = InputId::new();

        queue.enqueue(id1.clone(), make_prompt(id1.clone()));
        queue.enqueue(id2.clone(), make_prompt(id2.clone()));
        queue.enqueue(id3.clone(), make_prompt(id3.clone()));

        assert_eq!(queue.len(), 3);
        assert_eq!(queue.dequeue().unwrap().input_id, id1);
        assert_eq!(queue.dequeue().unwrap().input_id, id2);
        assert_eq!(queue.dequeue().unwrap().input_id, id3);
        assert!(queue.is_empty());
    }

    /// Removing by id returns the matching entry and leaves the others queued.
    #[test]
    fn remove_by_id() {
        let mut queue = InputQueue::new();
        let id1 = InputId::new();
        let id2 = InputId::new();

        queue.enqueue(id1.clone(), make_prompt(id1.clone()));
        queue.enqueue(id2.clone(), make_prompt(id2.clone()));

        let removed = queue.remove(&id1);
        assert!(removed.is_some());
        assert_eq!(removed.unwrap().input_id, id1);
        assert_eq!(queue.len(), 1);
        assert_eq!(queue.dequeue().unwrap().input_id, id2);
    }

    /// Removing an unknown id yields `None`, on an empty queue and on a
    /// populated one, and does not disturb existing entries.
    #[test]
    fn remove_nonexistent() {
        let mut queue = InputQueue::new();
        let result = queue.remove(&InputId::new());
        assert!(result.is_none());

        let id = InputId::new();
        queue.enqueue(id.clone(), make_prompt(id.clone()));
        assert!(queue.remove(&InputId::new()).is_none());
        assert_eq!(queue.len(), 1);
        assert_eq!(queue.peek().unwrap().input_id, id);
    }

    /// Peeking exposes the front entry but leaves it in the queue.
    #[test]
    fn peek_does_not_remove() {
        let mut queue = InputQueue::new();
        let id = InputId::new();
        queue.enqueue(id.clone(), make_prompt(id.clone()));

        assert_eq!(queue.peek().unwrap().input_id, id);
        assert_eq!(queue.len(), 1);
    }

    /// Draining returns every entry front-first and leaves the queue empty.
    #[test]
    fn drain_empties_queue() {
        let mut queue = InputQueue::new();
        let id1 = InputId::new();
        let id2 = InputId::new();
        // Use the same id for the queue entry and the input header, as the
        // other tests do.
        queue.enqueue(id1.clone(), make_prompt(id1.clone()));
        queue.enqueue(id2.clone(), make_prompt(id2.clone()));

        let drained = queue.drain();
        assert_eq!(drained.len(), 2);
        assert_eq!(drained[0].input_id, id1);
        assert_eq!(drained[1].input_id, id2);
        assert!(queue.is_empty());
    }

    /// `input_ids` lists the queued ids front-first without consuming them.
    #[test]
    fn input_ids_reports_queue_order() {
        let mut queue = InputQueue::new();
        let id1 = InputId::new();
        let id2 = InputId::new();

        queue.enqueue(id1.clone(), make_prompt(id1.clone()));
        queue.enqueue(id2.clone(), make_prompt(id2.clone()));

        assert_eq!(queue.input_ids(), vec![id1, id2]);
        assert_eq!(queue.len(), 2);
    }
}