1use crate::signals::{NodeId, QueueId, SystemSignal};
8use crate::task::Task;
9use std::collections::VecDeque;
10use std::fmt;
11use std::sync::mpsc;
12use std::sync::{
13 Arc, Mutex,
14 atomic::{AtomicBool, Ordering},
15};
16
17#[derive(Debug, Clone, Copy, PartialEq, Eq)]
19pub enum QueueError {
20 Full,
22 Empty,
24 Closed,
26 SendError,
28}
29
30impl fmt::Display for QueueError {
31 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
32 match self {
33 QueueError::Full => write!(f, "Queue is full"),
34 QueueError::Empty => write!(f, "Queue is empty"),
35 QueueError::Closed => write!(f, "Queue is closed"),
36 QueueError::SendError => write!(f, "Failed to send system signal"),
37 }
38 }
39}
40
41impl std::error::Error for QueueError {}
42
43pub struct VibeQueue<T> {
49 id: QueueId,
51 node_id: NodeId,
53 tasks: Arc<Mutex<VecDeque<T>>>,
55 capacity: usize,
57 low_watermark_percentage: f32,
59 high_watermark_percentage: f32,
61 signal_tx: mpsc::Sender<SystemSignal>,
63 was_overloaded: Arc<AtomicBool>,
65 is_closed: Arc<AtomicBool>,
67}
68
69impl<T> fmt::Debug for VibeQueue<T> {
70 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
71 f.debug_struct("VibeQueue")
72 .field("id", &self.id)
73 .field("node_id", &self.node_id)
74 .field("capacity", &self.capacity)
75 .field("current_len", &self.len())
76 .field("is_closed", &self.is_closed.load(Ordering::Relaxed))
77 .finish()
78 }
79}
80
81impl<T> VibeQueue<T> {
82 pub fn len(&self) -> usize {
84 self.tasks
85 .lock()
86 .expect("Queue mutex poisoned for len")
87 .len()
88 }
89
90 pub fn capacity(&self) -> usize {
92 self.capacity
93 }
94
95 pub fn is_empty(&self) -> bool {
97 self.len() == 0
98 }
99
100 pub fn is_full(&self) -> bool {
102 self.len() >= self.capacity
103 }
104
105 pub fn id(&self) -> QueueId {
107 self.id
108 }
109
110 pub fn close(&self) {
114 self.is_closed.store(true, Ordering::SeqCst);
115 }
116
117 pub fn is_closed(&self) -> bool {
119 self.is_closed.load(Ordering::Relaxed)
120 }
121}
122
123impl VibeQueue<Task> {
124 pub fn with_watermarks_and_signal(
129 node_id: NodeId,
130 capacity: usize,
131 low_watermark_percentage: f32,
132 high_watermark_percentage: f32,
133 signal_tx: mpsc::Sender<SystemSignal>,
134 ) -> Self {
135 if capacity == 0 {
136 panic!("VibeQueue capacity cannot be 0");
137 }
138 if !(0.0 < low_watermark_percentage && low_watermark_percentage < high_watermark_percentage)
139 {
140 panic!("Invalid low_watermark_percentage");
141 }
142 if !(low_watermark_percentage < high_watermark_percentage
143 && high_watermark_percentage < 1.0)
144 {
145 panic!("Invalid high_watermark_percentage");
146 }
147
148 VibeQueue {
149 id: QueueId::new(),
150 node_id,
151 tasks: Arc::new(Mutex::new(VecDeque::with_capacity(capacity))),
152 capacity,
153 low_watermark_percentage,
154 high_watermark_percentage,
155 signal_tx,
156 was_overloaded: Arc::new(AtomicBool::new(false)),
157 is_closed: Arc::new(AtomicBool::new(false)),
158 }
159 }
160
161 pub fn new_with_signal(
163 node_id: NodeId,
164 capacity: usize,
165 signal_tx: mpsc::Sender<SystemSignal>,
166 ) -> Self {
167 Self::with_watermarks_and_signal(node_id, capacity, 0.25, 0.75, signal_tx)
168 }
169
170 pub fn enqueue(&self, task: Task) -> Result<(), QueueError> {
175 if self.is_closed.load(Ordering::Relaxed) {
176 return Err(QueueError::Closed);
177 }
178
179 let mut tasks_guard = self.tasks.lock().expect("Queue mutex poisoned for enqueue");
180
181 if tasks_guard.len() >= self.capacity {
182 if !self.was_overloaded.swap(true, Ordering::SeqCst) {
183 let signal = SystemSignal::NodeOverloaded {
184 node_id: self.node_id,
185 queue_id: Some(self.id),
186 };
187 if self.signal_tx.send(signal).is_err() {
188 self.was_overloaded.store(false, Ordering::SeqCst);
189 return Err(QueueError::SendError);
190 }
191 }
192 return Err(QueueError::Full);
193 }
194
195 tasks_guard.push_back(task);
196 Ok(())
197 }
198
199 pub fn dequeue(&self) -> Option<Task> {
204 let mut tasks_guard = self.tasks.lock().expect("Queue mutex poisoned for dequeue");
205 let task_option = tasks_guard.pop_front();
206
207 if task_option.is_some() {
208 let current_len = tasks_guard.len();
209 let low_watermark_count =
210 (self.capacity as f32 * self.low_watermark_percentage) as usize;
211
212 if self.was_overloaded.load(Ordering::Relaxed)
213 && current_len <= low_watermark_count
214 && self
215 .was_overloaded
216 .compare_exchange(true, false, Ordering::SeqCst, Ordering::Relaxed)
217 .is_ok()
218 {
219 let signal = SystemSignal::NodeIdle {
220 node_id: self.node_id,
221 queue_id: Some(self.id),
222 };
223 if self.signal_tx.send(signal).is_err() {
224 self.was_overloaded.store(true, Ordering::SeqCst);
225 }
226 }
227 } else if self.is_closed.load(Ordering::Relaxed) && tasks_guard.is_empty() {
228 if self
229 .was_overloaded
230 .compare_exchange(true, false, Ordering::SeqCst, Ordering::Relaxed)
231 .is_ok()
232 {
233 let signal = SystemSignal::NodeIdle {
234 node_id: self.node_id,
235 queue_id: Some(self.id),
236 };
237 let _ = self.signal_tx.send(signal);
238 }
239 return None;
240 }
241
242 task_option
243 }
244}
245
246impl<T> Clone for VibeQueue<T> {
247 fn clone(&self) -> Self {
252 VibeQueue {
253 id: self.id,
254 node_id: self.node_id,
255 tasks: Arc::clone(&self.tasks),
256 capacity: self.capacity,
257 low_watermark_percentage: self.low_watermark_percentage,
258 high_watermark_percentage: self.high_watermark_percentage,
259 signal_tx: self.signal_tx.clone(),
260 was_overloaded: Arc::clone(&self.was_overloaded),
261 is_closed: Arc::clone(&self.is_closed),
262 }
263 }
264}