// styx_core/queue.rs
1use crossbeam_queue::ArrayQueue;
2use std::sync::{
3 Arc,
4 atomic::{AtomicBool, Ordering},
5};
6
/// Outcome reported by a non-blocking enqueue attempt.
///
/// # Example
/// ```rust
/// use styx_core::prelude::{bounded, SendOutcome};
///
/// let (tx, _rx) = bounded::<u8>(1);
/// assert_eq!(tx.send(1), SendOutcome::Ok);
/// // The single slot is taken now, so a second value is rejected.
/// assert_eq!(tx.send(2), SendOutcome::Full);
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SendOutcome {
    /// The value was stored.
    Ok,
    /// No free slot was available; the value was not stored.
    Full,
    /// The queue had been closed; the value was not stored.
    Closed,
}
25
/// Outcome reported by a non-blocking dequeue attempt.
///
/// `Clone`, `PartialEq` and `Eq` are derived (available whenever `T` provides
/// them) so that outcomes can be compared directly, the same way
/// `SendOutcome` values can.
///
/// # Example
/// ```rust
/// use styx_core::prelude::{bounded, RecvOutcome};
///
/// let (tx, rx) = bounded::<u8>(1);
/// assert_eq!(rx.recv(), RecvOutcome::Empty);
/// let _ = tx.send(7);
/// assert_eq!(rx.recv(), RecvOutcome::Data(7));
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RecvOutcome<T> {
    /// A value was received.
    Data(T),
    /// The queue is closed and no value is left to hand out.
    Closed,
    /// No value is available right now and the queue is still open.
    Empty,
}
46
47/// Bounded sender handle.
48///
49/// # Example
50/// ```rust
51/// use styx_core::prelude::{bounded, RecvOutcome, SendOutcome};
52///
53/// let (tx, _rx) = bounded::<u8>(1);
54/// assert_eq!(tx.send(1), SendOutcome::Ok);
55/// ```
56#[derive(Clone)]
57pub struct BoundedTx<T> {
58 inner: Arc<QueueInner<T>>,
59}
60
61impl<T> BoundedTx<T> {
62 /// Attempt to send without blocking.
63 pub fn send(&self, value: T) -> SendOutcome {
64 if self.inner.closed.load(Ordering::Acquire) {
65 return SendOutcome::Closed;
66 }
67 self.inner
68 .queue
69 .push(value)
70 .map(|_| SendOutcome::Ok)
71 .unwrap_or(SendOutcome::Full)
72 }
73
74 /// Close the queue to further sends.
75 pub fn close(&self) {
76 self.inner.closed.store(true, Ordering::Release);
77 }
78}
79
#[cfg(feature = "async")]
impl<T> BoundedTx<T> {
    /// Enqueue `value`, retrying while the queue is full.
    ///
    /// Each failed attempt is followed by `tokio::task::yield_now()` before
    /// trying again. Resolves to [`SendOutcome::Ok`] once the value is
    /// stored, or to [`SendOutcome::Closed`] (dropping the value) as soon as
    /// the closed flag is observed.
    pub async fn send_async(&self, mut value: T) -> SendOutcome {
        while !self.inner.closed.load(Ordering::Acquire) {
            value = match self.inner.queue.push(value) {
                Ok(()) => return SendOutcome::Ok,
                // Queue full: take the value back and retry after yielding.
                Err(rejected) => rejected,
            };
            tokio::task::yield_now().await;
        }
        SendOutcome::Closed
    }
}
98
99/// Bounded receiver handle.
100///
101/// # Example
102/// ```rust
103/// use styx_core::prelude::{bounded, RecvOutcome};
104///
105/// let (_tx, rx) = bounded::<u8>(1);
106/// assert!(matches!(rx.recv(), RecvOutcome::Empty | RecvOutcome::Closed));
107/// ```
108#[derive(Clone)]
109pub struct BoundedRx<T> {
110 inner: Arc<QueueInner<T>>,
111}
112
113impl<T> BoundedRx<T> {
114 /// Attempt to receive without blocking.
115 pub fn recv(&self) -> RecvOutcome<T> {
116 match self.inner.queue.pop() {
117 Some(value) => RecvOutcome::Data(value),
118 None => {
119 if self.inner.closed.load(Ordering::Acquire) {
120 RecvOutcome::Closed
121 } else {
122 RecvOutcome::Empty
123 }
124 }
125 }
126 }
127
128 /// Mark the queue as closed; senders will see `Closed` and exit.
129 pub fn close(&self) {
130 self.inner.closed.store(true, Ordering::Release);
131 }
132}
133
#[cfg(feature = "async")]
impl<T> BoundedRx<T> {
    /// Poll [`recv`](Self::recv) until it reports something other than
    /// [`RecvOutcome::Empty`].
    ///
    /// Every empty poll is followed by `tokio::task::yield_now()` before the
    /// next attempt. Resolves to either `Data` or `Closed`.
    pub async fn recv_async(&self) -> RecvOutcome<T> {
        loop {
            let outcome = self.recv();
            if !matches!(outcome, RecvOutcome::Empty) {
                return outcome;
            }
            tokio::task::yield_now().await;
        }
    }
}
148
149struct QueueInner<T> {
150 queue: ArrayQueue<T>,
151 closed: AtomicBool,
152}
153
154/// Create a bounded queue with the given capacity.
155///
156/// # Example
157/// ```rust
158/// use styx_core::prelude::{bounded, RecvOutcome, SendOutcome};
159///
160/// let (tx, rx) = bounded::<u8>(1);
161/// assert_eq!(tx.send(1), SendOutcome::Ok);
162/// match rx.recv() {
163/// RecvOutcome::Data(_) | RecvOutcome::Empty | RecvOutcome::Closed => {}
164/// }
165/// ```
166pub fn bounded<T>(capacity: usize) -> (BoundedTx<T>, BoundedRx<T>) {
167 let inner = Arc::new(QueueInner {
168 queue: ArrayQueue::new(capacity),
169 closed: AtomicBool::new(false),
170 });
171 (
172 BoundedTx {
173 inner: inner.clone(),
174 },
175 BoundedRx { inner },
176 )
177}
178
179/// Create an unbounded queue using a generous default capacity.
180///
181/// # Example
182/// ```rust
183/// use styx_core::prelude::unbounded;
184///
185/// let (_tx, _rx) = unbounded::<u8>();
186/// ```
187pub fn unbounded<T>() -> (BoundedTx<T>, BoundedRx<T>) {
188 bounded(1024)
189}
190
191/// Newest-value queue: always returns the latest value without backpressure.
192///
193/// # Example
194/// ```rust
195/// use styx_core::prelude::{newest, RecvOutcome};
196///
197/// let (tx, rx) = newest::<u8>();
198/// let _ = tx.send(5);
199/// assert!(matches!(rx.recv(), RecvOutcome::Data(_)));
200/// ```
201pub fn newest<T>() -> (NewestTx<T>, NewestRx<T>)
202where
203 T: Clone,
204{
205 let shared = Arc::new(NewestInner {
206 slot: parking_lot::RwLock::new(None),
207 closed: AtomicBool::new(false),
208 });
209 (
210 NewestTx {
211 inner: shared.clone(),
212 },
213 NewestRx { inner: shared },
214 )
215}
216
217/// Sender for newest-value queue.
218///
219/// # Example
220/// ```rust
221/// use styx_core::prelude::newest;
222///
223/// let (tx, _rx) = newest::<u8>();
224/// let _ = tx.send(1);
225/// ```
226#[derive(Clone)]
227pub struct NewestTx<T> {
228 inner: Arc<NewestInner<T>>,
229}
230
231impl<T: Clone> NewestTx<T> {
232 /// Overwrite with the latest value.
233 pub fn send(&self, value: T) -> SendOutcome {
234 if self.inner.closed.load(Ordering::Acquire) {
235 return SendOutcome::Closed;
236 }
237 *self.inner.slot.write() = Some(value);
238 SendOutcome::Ok
239 }
240
241 /// Close the queue.
242 pub fn close(&self) {
243 self.inner.closed.store(true, Ordering::Release);
244 }
245}
246
247/// Receiver for newest-value queue.
248///
249/// # Example
250/// ```rust
251/// use styx_core::prelude::{newest, RecvOutcome};
252///
253/// let (_tx, rx) = newest::<u8>();
254/// assert!(matches!(rx.recv(), RecvOutcome::Empty | RecvOutcome::Closed));
255/// ```
256#[derive(Clone)]
257pub struct NewestRx<T> {
258 inner: Arc<NewestInner<T>>,
259}
260
261impl<T: Clone> NewestRx<T> {
262 /// Get the latest value if present.
263 pub fn recv(&self) -> RecvOutcome<T> {
264 let read = self.inner.slot.read();
265 if let Some(value) = read.as_ref() {
266 RecvOutcome::Data(value.clone())
267 } else if self.inner.closed.load(Ordering::Acquire) {
268 RecvOutcome::Closed
269 } else {
270 RecvOutcome::Empty
271 }
272 }
273}
274
275struct NewestInner<T> {
276 slot: parking_lot::RwLock<Option<T>>,
277 closed: AtomicBool,
278}