vibe_code/
queue.rs

1//! Implements `VibeQueue`, a thread-safe, bounded queue for tasks.
2//!
3//! This module provides the core queuing mechanism for a `VibeNode`. It manages
4//! task backpressure and signals the node when it becomes overloaded or idle,
5//! based on configurable high and low watermarks.
6
7use crate::signals::{NodeId, QueueId, SystemSignal};
8use crate::task::Task;
9use std::collections::VecDeque;
10use std::fmt;
11use std::sync::mpsc;
12use std::sync::{
13    Arc, Mutex,
14    atomic::{AtomicBool, Ordering},
15};
16
/// Failure modes reported by queue operations.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum QueueError {
    /// The queue already holds as many items as its capacity allows.
    Full,
    /// The queue holds no items.
    Empty,
    /// The queue was closed and can no longer be used.
    Closed,
    /// A system signal describing the queue's state could not be sent.
    SendError,
}
29
30impl fmt::Display for QueueError {
31    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
32        match self {
33            QueueError::Full => write!(f, "Queue is full"),
34            QueueError::Empty => write!(f, "Queue is empty"),
35            QueueError::Closed => write!(f, "Queue is closed"),
36            QueueError::SendError => write!(f, "Failed to send system signal"),
37        }
38    }
39}
40
41impl std::error::Error for QueueError {}
42
43/// A bounded, thread-safe queue for holding tasks.
44///
45/// `VibeQueue` is used within each `VibeNode` to buffer incoming tasks. It sends
46/// `NodeOverloaded` and `NodeIdle` signals to the system when its size crosses
47/// defined percentage thresholds (watermarks).
48pub struct VibeQueue<T> {
49    /// A unique identifier for this queue.
50    id: QueueId,
51    /// The ID of the node that owns this queue.
52    node_id: NodeId,
53    /// The internal queue storage, protected by a `Mutex`.
54    tasks: Arc<Mutex<VecDeque<T>>>,
55    /// The maximum number of items the queue can hold.
56    capacity: usize,
57    /// The percentage of capacity at or below which the queue is considered "idle".
58    low_watermark_percentage: f32,
59    /// The percentage of capacity at or above which the queue is considered "overloaded".
60    high_watermark_percentage: f32,
61    /// A channel to send signals to the central system.
62    signal_tx: mpsc::Sender<SystemSignal>,
63    /// A flag to track if the queue is currently in an overloaded state.
64    was_overloaded: Arc<AtomicBool>,
65    /// A flag to indicate if the queue has been permanently closed.
66    is_closed: Arc<AtomicBool>,
67}
68
69impl<T> fmt::Debug for VibeQueue<T> {
70    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
71        f.debug_struct("VibeQueue")
72            .field("id", &self.id)
73            .field("node_id", &self.node_id)
74            .field("capacity", &self.capacity)
75            .field("current_len", &self.len())
76            .field("is_closed", &self.is_closed.load(Ordering::Relaxed))
77            .finish()
78    }
79}
80
81impl<T> VibeQueue<T> {
82    /// Returns the current number of items in the queue.
83    pub fn len(&self) -> usize {
84        self.tasks
85            .lock()
86            .expect("Queue mutex poisoned for len")
87            .len()
88    }
89
90    /// Returns the maximum capacity of the queue.
91    pub fn capacity(&self) -> usize {
92        self.capacity
93    }
94
95    /// Returns `true` if the queue contains no items.
96    pub fn is_empty(&self) -> bool {
97        self.len() == 0
98    }
99
100    /// Returns `true` if the queue has reached its maximum capacity.
101    pub fn is_full(&self) -> bool {
102        self.len() >= self.capacity
103    }
104
105    /// Returns the unique ID of this queue.
106    pub fn id(&self) -> QueueId {
107        self.id
108    }
109
110    /// Closes the queue, preventing any new items from being added.
111    ///
112    /// Items already in the queue can still be removed.
113    pub fn close(&self) {
114        self.is_closed.store(true, Ordering::SeqCst);
115    }
116
117    /// Returns `true` if the queue has been closed.
118    pub fn is_closed(&self) -> bool {
119        self.is_closed.load(Ordering::Relaxed)
120    }
121}
122
123impl VibeQueue<Task> {
124    /// Creates a new `VibeQueue` with custom watermarks for signaling.
125    ///
126    /// # Panics
127    /// Panics if `capacity` is 0 or if watermark percentages are invalid.
128    pub fn with_watermarks_and_signal(
129        node_id: NodeId,
130        capacity: usize,
131        low_watermark_percentage: f32,
132        high_watermark_percentage: f32,
133        signal_tx: mpsc::Sender<SystemSignal>,
134    ) -> Self {
135        if capacity == 0 {
136            panic!("VibeQueue capacity cannot be 0");
137        }
138        if !(0.0 < low_watermark_percentage && low_watermark_percentage < high_watermark_percentage)
139        {
140            panic!("Invalid low_watermark_percentage");
141        }
142        if !(low_watermark_percentage < high_watermark_percentage
143            && high_watermark_percentage < 1.0)
144        {
145            panic!("Invalid high_watermark_percentage");
146        }
147
148        VibeQueue {
149            id: QueueId::new(),
150            node_id,
151            tasks: Arc::new(Mutex::new(VecDeque::with_capacity(capacity))),
152            capacity,
153            low_watermark_percentage,
154            high_watermark_percentage,
155            signal_tx,
156            was_overloaded: Arc::new(AtomicBool::new(false)),
157            is_closed: Arc::new(AtomicBool::new(false)),
158        }
159    }
160
161    /// Creates a new `VibeQueue` with default watermarks (25% and 75%).
162    pub fn new_with_signal(
163        node_id: NodeId,
164        capacity: usize,
165        signal_tx: mpsc::Sender<SystemSignal>,
166    ) -> Self {
167        Self::with_watermarks_and_signal(node_id, capacity, 0.25, 0.75, signal_tx)
168    }
169
170    /// Adds a task to the back of the queue.
171    ///
172    /// If adding the task causes the queue to become full, it sends a
173    /// `NodeOverloaded` signal.
174    pub fn enqueue(&self, task: Task) -> Result<(), QueueError> {
175        if self.is_closed.load(Ordering::Relaxed) {
176            return Err(QueueError::Closed);
177        }
178
179        let mut tasks_guard = self.tasks.lock().expect("Queue mutex poisoned for enqueue");
180
181        if tasks_guard.len() >= self.capacity {
182            if !self.was_overloaded.swap(true, Ordering::SeqCst) {
183                let signal = SystemSignal::NodeOverloaded {
184                    node_id: self.node_id,
185                    queue_id: Some(self.id),
186                };
187                if self.signal_tx.send(signal).is_err() {
188                    self.was_overloaded.store(false, Ordering::SeqCst);
189                    return Err(QueueError::SendError);
190                }
191            }
192            return Err(QueueError::Full);
193        }
194
195        tasks_guard.push_back(task);
196        Ok(())
197    }
198
199    /// Removes a task from the front of the queue.
200    ///
201    /// If removing the task causes a previously overloaded queue to drop below
202    /// its low watermark, it sends a `NodeIdle` signal.
203    pub fn dequeue(&self) -> Option<Task> {
204        let mut tasks_guard = self.tasks.lock().expect("Queue mutex poisoned for dequeue");
205        let task_option = tasks_guard.pop_front();
206
207        if task_option.is_some() {
208            let current_len = tasks_guard.len();
209            let low_watermark_count =
210                (self.capacity as f32 * self.low_watermark_percentage) as usize;
211
212            if self.was_overloaded.load(Ordering::Relaxed)
213                && current_len <= low_watermark_count
214                && self
215                    .was_overloaded
216                    .compare_exchange(true, false, Ordering::SeqCst, Ordering::Relaxed)
217                    .is_ok()
218            {
219                let signal = SystemSignal::NodeIdle {
220                    node_id: self.node_id,
221                    queue_id: Some(self.id),
222                };
223                if self.signal_tx.send(signal).is_err() {
224                    self.was_overloaded.store(true, Ordering::SeqCst);
225                }
226            }
227        } else if self.is_closed.load(Ordering::Relaxed) && tasks_guard.is_empty() {
228            if self
229                .was_overloaded
230                .compare_exchange(true, false, Ordering::SeqCst, Ordering::Relaxed)
231                .is_ok()
232            {
233                let signal = SystemSignal::NodeIdle {
234                    node_id: self.node_id,
235                    queue_id: Some(self.id),
236                };
237                let _ = self.signal_tx.send(signal);
238            }
239            return None;
240        }
241
242        task_option
243    }
244}
245
246impl<T> Clone for VibeQueue<T> {
247    /// Clones the queue.
248    ///
249    /// This is a cheap operation, as it only clones the `Arc` pointers to the
250    /// underlying queue data, not the data itself.
251    fn clone(&self) -> Self {
252        VibeQueue {
253            id: self.id,
254            node_id: self.node_id,
255            tasks: Arc::clone(&self.tasks),
256            capacity: self.capacity,
257            low_watermark_percentage: self.low_watermark_percentage,
258            high_watermark_percentage: self.high_watermark_percentage,
259            signal_tx: self.signal_tx.clone(),
260            was_overloaded: Arc::clone(&self.was_overloaded),
261            is_closed: Arc::clone(&self.is_closed),
262        }
263    }
264}