styx_core/
queue.rs

1use crossbeam_queue::ArrayQueue;
2use std::sync::{
3    Arc,
4    atomic::{AtomicBool, Ordering},
5};
6
/// Result of attempting to enqueue a value.
///
/// Returned by the `send` methods of the sender handles in this module
/// ([`BoundedTx`] and [`NewestTx`]).
///
/// # Example
/// ```rust
/// use styx_core::prelude::{bounded, RecvOutcome, SendOutcome};
///
/// let (tx, _rx) = bounded::<u8>(1);
/// assert_eq!(tx.send(1), SendOutcome::Ok);
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SendOutcome {
    /// Value was accepted and stored.
    Ok,
    /// Queue is full; the value was not stored.
    Full,
    /// Queue is closed; the value was not stored.
    Closed,
}
25
/// Result of attempting to dequeue.
///
/// Returned by the `recv` methods of the receiver handles in this module.
/// Like [`SendOutcome`], outcomes can be cloned and compared (whenever `T`
/// itself supports it), which keeps assertions and tests straightforward.
///
/// # Example
/// ```rust
/// use styx_core::prelude::{bounded, RecvOutcome};
///
/// let (_tx, rx) = bounded::<u8>(1);
/// match rx.recv() {
///     RecvOutcome::Empty | RecvOutcome::Closed | RecvOutcome::Data(_) => {}
/// }
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RecvOutcome<T> {
    /// Received value.
    Data(T),
    /// Queue has been closed and drained.
    Closed,
    /// Queue currently empty.
    Empty,
}
46
47/// Bounded sender handle.
48///
49/// # Example
50/// ```rust
51/// use styx_core::prelude::{bounded, RecvOutcome, SendOutcome};
52///
53/// let (tx, _rx) = bounded::<u8>(1);
54/// assert_eq!(tx.send(1), SendOutcome::Ok);
55/// ```
56#[derive(Clone)]
57pub struct BoundedTx<T> {
58    inner: Arc<QueueInner<T>>,
59}
60
61impl<T> BoundedTx<T> {
62    /// Attempt to send without blocking.
63    pub fn send(&self, value: T) -> SendOutcome {
64        if self.inner.closed.load(Ordering::Acquire) {
65            return SendOutcome::Closed;
66        }
67        self.inner
68            .queue
69            .push(value)
70            .map(|_| SendOutcome::Ok)
71            .unwrap_or(SendOutcome::Full)
72    }
73
74    /// Close the queue to further sends.
75    pub fn close(&self) {
76        self.inner.closed.store(true, Ordering::Release);
77    }
78}
79
#[cfg(feature = "async")]
impl<T> BoundedTx<T> {
    /// Async variant of `send` that retries while the queue is full.
    ///
    /// Each round first checks the closed flag (returning
    /// [`SendOutcome::Closed`] if set), then tries to push. When the push is
    /// rejected the value is kept and the task yields via
    /// `tokio::task::yield_now` before trying again, so this is a polling
    /// loop rather than a wake-up driven wait. It never returns
    /// [`SendOutcome::Full`].
    pub async fn send_async(&self, value: T) -> SendOutcome {
        let mut pending = value;
        while !self.inner.closed.load(Ordering::Acquire) {
            // A failed push hands the value back so it can be retried.
            pending = match self.inner.queue.push(pending) {
                Ok(()) => return SendOutcome::Ok,
                Err(rejected) => rejected,
            };
            tokio::task::yield_now().await;
        }
        SendOutcome::Closed
    }
}
98
99/// Bounded receiver handle.
100///
101/// # Example
102/// ```rust
103/// use styx_core::prelude::{bounded, RecvOutcome};
104///
105/// let (_tx, rx) = bounded::<u8>(1);
106/// assert!(matches!(rx.recv(), RecvOutcome::Empty | RecvOutcome::Closed));
107/// ```
108#[derive(Clone)]
109pub struct BoundedRx<T> {
110    inner: Arc<QueueInner<T>>,
111}
112
113impl<T> BoundedRx<T> {
114    /// Attempt to receive without blocking.
115    pub fn recv(&self) -> RecvOutcome<T> {
116        match self.inner.queue.pop() {
117            Some(value) => RecvOutcome::Data(value),
118            None => {
119                if self.inner.closed.load(Ordering::Acquire) {
120                    RecvOutcome::Closed
121                } else {
122                    RecvOutcome::Empty
123                }
124            }
125        }
126    }
127
128    /// Mark the queue as closed; senders will see `Closed` and exit.
129    pub fn close(&self) {
130        self.inner.closed.store(true, Ordering::Release);
131    }
132}
133
#[cfg(feature = "async")]
impl<T> BoundedRx<T> {
    /// Async variant of `recv` that keeps polling while the queue is empty.
    ///
    /// Calls `recv` in a loop and yields via `tokio::task::yield_now` after
    /// every [`RecvOutcome::Empty`]; the first non-empty outcome (data or
    /// closure) is returned as-is. It never returns [`RecvOutcome::Empty`].
    pub async fn recv_async(&self) -> RecvOutcome<T> {
        loop {
            let outcome = self.recv();
            if !matches!(outcome, RecvOutcome::Empty) {
                return outcome;
            }
            tokio::task::yield_now().await;
        }
    }
}
148
/// State shared, through an `Arc`, by the `BoundedTx` and `BoundedRx` handles
/// of one queue.
struct QueueInner<T> {
    /// Fixed-capacity buffer holding the queued values.
    queue: ArrayQueue<T>,
    /// Set to `true` by `close` on either handle; nothing visible in this
    /// file resets it.
    closed: AtomicBool,
}
153
154/// Create a bounded queue with the given capacity.
155///
156/// # Example
157/// ```rust
158/// use styx_core::prelude::{bounded, RecvOutcome, SendOutcome};
159///
160/// let (tx, rx) = bounded::<u8>(1);
161/// assert_eq!(tx.send(1), SendOutcome::Ok);
162/// match rx.recv() {
163///     RecvOutcome::Data(_) | RecvOutcome::Empty | RecvOutcome::Closed => {}
164/// }
165/// ```
166pub fn bounded<T>(capacity: usize) -> (BoundedTx<T>, BoundedRx<T>) {
167    let inner = Arc::new(QueueInner {
168        queue: ArrayQueue::new(capacity),
169        closed: AtomicBool::new(false),
170    });
171    (
172        BoundedTx {
173            inner: inner.clone(),
174        },
175        BoundedRx { inner },
176    )
177}
178
179/// Create an unbounded queue using a generous default capacity.
180///
181/// # Example
182/// ```rust
183/// use styx_core::prelude::unbounded;
184///
185/// let (_tx, _rx) = unbounded::<u8>();
186/// ```
187pub fn unbounded<T>() -> (BoundedTx<T>, BoundedRx<T>) {
188    bounded(1024)
189}
190
191/// Newest-value queue: always returns the latest value without backpressure.
192///
193/// # Example
194/// ```rust
195/// use styx_core::prelude::{newest, RecvOutcome};
196///
197/// let (tx, rx) = newest::<u8>();
198/// let _ = tx.send(5);
199/// assert!(matches!(rx.recv(), RecvOutcome::Data(_)));
200/// ```
201pub fn newest<T>() -> (NewestTx<T>, NewestRx<T>)
202where
203    T: Clone,
204{
205    let shared = Arc::new(NewestInner {
206        slot: parking_lot::RwLock::new(None),
207        closed: AtomicBool::new(false),
208    });
209    (
210        NewestTx {
211            inner: shared.clone(),
212        },
213        NewestRx { inner: shared },
214    )
215}
216
/// Sender for newest-value queue.
///
/// Cloned handles share the same slot and closed flag through an `Arc`.
///
/// # Example
/// ```rust
/// use styx_core::prelude::newest;
///
/// let (tx, _rx) = newest::<u8>();
/// let _ = tx.send(1);
/// ```
#[derive(Clone)]
pub struct NewestTx<T> {
    /// Slot and closed flag shared with the receiver side.
    inner: Arc<NewestInner<T>>,
}
230
231impl<T: Clone> NewestTx<T> {
232    /// Overwrite with the latest value.
233    pub fn send(&self, value: T) -> SendOutcome {
234        if self.inner.closed.load(Ordering::Acquire) {
235            return SendOutcome::Closed;
236        }
237        *self.inner.slot.write() = Some(value);
238        SendOutcome::Ok
239    }
240
241    /// Close the queue.
242    pub fn close(&self) {
243        self.inner.closed.store(true, Ordering::Release);
244    }
245}
246
/// Receiver for newest-value queue.
///
/// Cloned handles share the same slot and closed flag through an `Arc`.
///
/// # Example
/// ```rust
/// use styx_core::prelude::{newest, RecvOutcome};
///
/// let (_tx, rx) = newest::<u8>();
/// assert!(matches!(rx.recv(), RecvOutcome::Empty | RecvOutcome::Closed));
/// ```
#[derive(Clone)]
pub struct NewestRx<T> {
    /// Slot and closed flag shared with the sender side.
    inner: Arc<NewestInner<T>>,
}
260
261impl<T: Clone> NewestRx<T> {
262    /// Get the latest value if present.
263    pub fn recv(&self) -> RecvOutcome<T> {
264        let read = self.inner.slot.read();
265        if let Some(value) = read.as_ref() {
266            RecvOutcome::Data(value.clone())
267        } else if self.inner.closed.load(Ordering::Acquire) {
268            RecvOutcome::Closed
269        } else {
270            RecvOutcome::Empty
271        }
272    }
273}
274
/// State shared, through an `Arc`, by the `NewestTx` and `NewestRx` handles
/// of one newest-value queue.
struct NewestInner<T> {
    /// Most recently sent value; `None` until the first successful send.
    slot: parking_lot::RwLock<Option<T>>,
    /// Set to `true` by `NewestTx::close`; nothing visible in this file
    /// resets it.
    closed: AtomicBool,
}