Skip to main content

limen_core/edge/
spsc_concurrent.rs

1//! Thread-safe ring buffer implementing [`Edge`], backed by `Arc<Mutex<…>>`.
2//!
3//! `ConcurrentEdge` is the `std`-feature drop-in for `StaticRing<N>` in graphs
4//! executed by `run_scoped()` on multiple threads. Cloning yields another handle
5//! to the **same** underlying ring — all clones share one `Arc<Mutex<…>>`.
6//!
7//! # Locking discipline
8//! Every method locks `inner` exactly once, does all work on the locked data, and
9//! releases the lock before returning. No method calls another `Edge` method while
10//! holding the lock — this prevents deadlocks. `try_pop_batch` materialises its
11//! result tokens into an owned `Vec` before releasing, so no borrowed reference
12//! escapes the critical section.
13
14use alloc::vec::Vec;
15use std::sync::{Arc, Mutex};
16
17use crate::edge::{Edge, EdgeOccupancy, EnqueueResult};
18use crate::errors::QueueError;
19use crate::message::batch::BatchView;
20use crate::policy::{AdmissionDecision, BatchingPolicy, EdgePolicy, WindowKind};
21use crate::prelude::HeaderStore;
22use crate::types::MessageToken;
23
24// ---------------------------------------------------------------------------
25// Inner state
26// ---------------------------------------------------------------------------
27
28struct ConcurrentEdgeInner {
29    buf: Vec<MessageToken>,
30    head: usize,
31    tail: usize,
32    len: usize,
33    capacity: usize,
34    bytes: usize,
35}
36
37impl ConcurrentEdgeInner {
38    fn new(capacity: usize) -> Self {
39        Self {
40            buf: alloc::vec![MessageToken::default(); capacity],
41            head: 0,
42            tail: 0,
43            len: 0,
44            capacity,
45            bytes: 0,
46        }
47    }
48
49    #[inline]
50    fn is_full(&self) -> bool {
51        self.len == self.capacity
52    }
53
54    #[inline]
55    fn push_raw(&mut self, token: MessageToken) {
56        self.buf[self.tail] = token;
57        self.tail = (self.tail + 1) % self.capacity;
58        self.len += 1;
59    }
60
61    #[inline]
62    fn pop_raw(&mut self) -> MessageToken {
63        let token = core::mem::take(&mut self.buf[self.head]);
64        self.head = (self.head + 1) % self.capacity;
65        self.len -= 1;
66        token
67    }
68
69    /// Rotate so live items are contiguous at `buf[0..len]`.
70    fn normalize(&mut self) {
71        if self.len == 0 {
72            self.head = 0;
73            self.tail = 0;
74            return;
75        }
76
77        if self.head == 0 {
78            self.tail = self.len % self.capacity;
79            return;
80        }
81
82        let cap = self.capacity;
83        let mut live_tokens = Vec::with_capacity(self.len);
84
85        for i in 0..self.len {
86            let src = (self.head + i) % cap;
87            live_tokens.push(core::mem::take(&mut self.buf[src]));
88        }
89
90        self.buf[..self.len].copy_from_slice(&live_tokens[..self.len]);
91
92        for i in self.len..cap {
93            self.buf[i] = MessageToken::default();
94        }
95
96        self.head = 0;
97        self.tail = self.len % cap;
98    }
99}
100
101// ---------------------------------------------------------------------------
102// Public handle
103// ---------------------------------------------------------------------------
104
105/// A thread-safe edge handle. All clones share the same underlying ring buffer.
106///
107/// Use `ConcurrentEdge::new(capacity)` to create; clone for each worker that
108/// needs read or write access. Intended for use in codegen-generated `run_scoped()`
109/// methods as the `std`-feature replacement for `StaticRing<N>`.
110#[derive(Clone)]
111pub struct ConcurrentEdge {
112    inner: Arc<Mutex<ConcurrentEdgeInner>>,
113}
114
115impl ConcurrentEdge {
116    /// Create a new ring buffer with the given item capacity.
117    pub fn new(capacity: usize) -> Self {
118        Self {
119            inner: Arc::new(Mutex::new(ConcurrentEdgeInner::new(capacity))),
120        }
121    }
122}
123
124// ---------------------------------------------------------------------------
125// Edge impl
126// ---------------------------------------------------------------------------
127
128impl Edge for ConcurrentEdge {
129    fn try_push<H: HeaderStore>(
130        &mut self,
131        token: MessageToken,
132        policy: &EdgePolicy,
133        headers: &H,
134    ) -> EnqueueResult {
135        let mut inner = match self.inner.lock() {
136            Ok(g) => g,
137            Err(_) => return EnqueueResult::Rejected,
138        };
139
140        // Inline get_admission_decision — do NOT call self.get_admission_decision()
141        // as that calls self.occupancy() which would re-lock and deadlock.
142        let (decision, item_bytes) = match headers.peek_header(token) {
143            Ok(h) => {
144                let b = *h.payload_size_bytes();
145                let d = policy.decide(inner.len, inner.bytes, b, *h.deadline_ns(), *h.qos());
146                (d, b)
147            }
148            Err(_) => return EnqueueResult::Rejected,
149        };
150
151        match decision {
152            AdmissionDecision::Admit => {
153                if inner.is_full() || policy.caps.at_or_above_hard(inner.len, inner.bytes) {
154                    return EnqueueResult::Rejected;
155                }
156                inner.bytes = inner.bytes.saturating_add(item_bytes);
157                inner.push_raw(token);
158                EnqueueResult::Enqueued
159            }
160            AdmissionDecision::DropNewest => EnqueueResult::DroppedNewest,
161            AdmissionDecision::Reject | AdmissionDecision::Block => EnqueueResult::Rejected,
162            AdmissionDecision::Evict(_) | AdmissionDecision::EvictUntilBelowHard => {
163                if inner.is_full() || policy.caps.at_or_above_hard(inner.len, inner.bytes) {
164                    return EnqueueResult::Rejected;
165                }
166                inner.bytes = inner.bytes.saturating_add(item_bytes);
167                inner.push_raw(token);
168                EnqueueResult::Enqueued
169            }
170        }
171    }
172
173    fn try_pop<H: HeaderStore>(&mut self, headers: &H) -> Result<MessageToken, QueueError> {
174        let mut inner = self.inner.lock().map_err(|_| QueueError::Poisoned)?;
175        if inner.len == 0 {
176            return Err(QueueError::Empty);
177        }
178        let front_token = inner.buf[inner.head];
179        let front_bytes = headers
180            .peek_header(front_token)
181            .map(|h| *h.payload_size_bytes())
182            .unwrap_or(0);
183        let token = inner.pop_raw();
184        inner.bytes = inner.bytes.saturating_sub(front_bytes);
185        Ok(token)
186    }
187
188    fn occupancy(&self, policy: &EdgePolicy) -> EdgeOccupancy {
189        match self.inner.lock() {
190            Ok(inner) => {
191                let watermark = policy.watermark(inner.len, inner.bytes);
192                EdgeOccupancy::new(inner.len, inner.bytes, watermark)
193            }
194            Err(_) => EdgeOccupancy::new(0, 0, policy.watermark(0, 0)),
195        }
196    }
197
198    fn is_empty(&self) -> bool {
199        self.inner.lock().map(|g| g.len == 0).unwrap_or(true)
200    }
201
202    fn try_peek(&self) -> Result<MessageToken, QueueError> {
203        let inner = self.inner.lock().map_err(|_| QueueError::Poisoned)?;
204        if inner.len == 0 {
205            return Err(QueueError::Empty);
206        }
207        Ok(inner.buf[inner.head])
208    }
209
210    fn try_peek_at(&self, index: usize) -> Result<MessageToken, QueueError> {
211        let inner = self.inner.lock().map_err(|_| QueueError::Poisoned)?;
212        if inner.len == 0 || index >= inner.len {
213            return Err(QueueError::Empty);
214        }
215        let pos = (inner.head + index) % inner.capacity;
216        Ok(inner.buf[pos])
217    }
218
219    fn try_pop_batch<H: HeaderStore>(
220        &mut self,
221        policy: &BatchingPolicy,
222        headers: &H,
223    ) -> Result<BatchView<'_, MessageToken>, QueueError> {
224        let mut inner = self.inner.lock().map_err(|_| QueueError::Poisoned)?;
225
226        if inner.len == 0 {
227            return Err(QueueError::Empty);
228        }
229
230        inner.normalize();
231        let old_len = inner.len;
232
233        let fixed_opt = *policy.fixed_n();
234        let delta_t_opt = *policy.max_delta_t();
235        let window_kind = policy.window_kind();
236
237        let effective_fixed: Option<usize> = if fixed_opt.is_none() && delta_t_opt.is_none() {
238            Some(1)
239        } else {
240            fixed_opt
241        };
242
243        // Delta-t window: count how many consecutive items fall within cap ticks.
244        let mut delta_count = inner.len;
245        if let Some(cap) = delta_t_opt {
246            if let Ok(front_header) = headers.peek_header(inner.buf[0]) {
247                let front_ticks = *front_header.creation_tick();
248                let mut c = 0usize;
249                while c < inner.len {
250                    if let Ok(h) = headers.peek_header(inner.buf[c]) {
251                        let tick = *h.creation_tick();
252                        let delta = tick.saturating_sub(front_ticks);
253                        if delta <= cap {
254                            c += 1;
255                        } else {
256                            break;
257                        }
258                    } else {
259                        break;
260                    }
261                }
262                delta_count = c;
263            }
264        }
265
266        let apply_fixed = |limit: usize| -> usize {
267            if let Some(n) = effective_fixed {
268                core::cmp::min(limit, n)
269            } else {
270                limit
271            }
272        };
273
274        // --- Disjoint window
275        if let WindowKind::Disjoint = window_kind {
276            let take_n = apply_fixed(core::cmp::min(inner.len, delta_count));
277            if take_n == 0 {
278                return Err(QueueError::Empty);
279            }
280            let mut dropped_bytes = 0usize;
281            for i in 0..take_n {
282                if let Ok(h) = headers.peek_header(inner.buf[i]) {
283                    dropped_bytes = dropped_bytes.saturating_add(*h.payload_size_bytes());
284                }
285            }
286            inner.bytes = inner.bytes.saturating_sub(dropped_bytes);
287            inner.len = old_len - take_n;
288            inner.head = take_n % inner.capacity;
289            inner.tail = (inner.head + inner.len) % inner.capacity;
290
291            let mut owned = Vec::with_capacity(take_n);
292            for &tok in &inner.buf[..take_n] {
293                owned.push(tok);
294            }
295            return Ok(BatchView::from_owned(owned));
296        }
297
298        // --- Sliding window
299        if let WindowKind::Sliding(sw) = window_kind {
300            let stride = *sw.stride();
301            let size = effective_fixed.unwrap_or(1);
302            let max_present =
303                apply_fixed(core::cmp::min(core::cmp::min(inner.len, size), delta_count));
304            let stride_to_pop = core::cmp::min(stride, inner.len);
305
306            if max_present == 0 {
307                return Err(QueueError::Empty);
308            }
309
310            let mut popped_bytes = 0usize;
311            for i in 0..stride_to_pop {
312                if let Ok(h) = headers.peek_header(inner.buf[i]) {
313                    popped_bytes = popped_bytes.saturating_add(*h.payload_size_bytes());
314                }
315            }
316            inner.bytes = inner.bytes.saturating_sub(popped_bytes);
317            inner.len = old_len - stride_to_pop;
318            inner.head = stride_to_pop % inner.capacity;
319            inner.tail = (inner.head + inner.len) % inner.capacity;
320
321            let mut owned = Vec::with_capacity(max_present);
322            for &tok in &inner.buf[..max_present] {
323                owned.push(tok);
324            }
325            return Ok(BatchView::from_owned(owned));
326        }
327
328        // --- Default (future WindowKind variants or non-exhaustive fallback)
329        let take_n = apply_fixed(core::cmp::min(inner.len, delta_count));
330        if take_n == 0 {
331            return Err(QueueError::Empty);
332        }
333        let mut dropped_bytes = 0usize;
334        for i in 0..take_n {
335            if let Ok(h) = headers.peek_header(inner.buf[i]) {
336                dropped_bytes = dropped_bytes.saturating_add(*h.payload_size_bytes());
337            }
338        }
339        inner.bytes = inner.bytes.saturating_sub(dropped_bytes);
340        inner.len = old_len - take_n;
341        inner.head = take_n % inner.capacity;
342        inner.tail = (inner.head + inner.len) % inner.capacity;
343
344        let mut owned = Vec::with_capacity(take_n);
345        for &tok in &inner.buf[..take_n] {
346            owned.push(tok);
347        }
348        Ok(BatchView::from_owned(owned))
349    }
350}
351
#[cfg(feature = "std")]
impl crate::edge::ScopedEdge for ConcurrentEdge {
    /// The scoped handle is simply another `ConcurrentEdge`.
    type Handle<'a>
        = ConcurrentEdge
    where
        Self: 'a;

    /// Return a new handle sharing this edge's ring.
    ///
    /// The requested handle kind is ignored: every handle is a full
    /// `ConcurrentEdge` pointing at the same `Arc<Mutex<…>>`.
    fn scoped_handle<'a>(&'a self, _kind: crate::edge::EdgeHandleKind) -> Self::Handle<'a>
    where
        Self: 'a,
    {
        ConcurrentEdge {
            inner: Arc::clone(&self.inner),
        }
    }
}
366
#[cfg(test)]
mod tests {
    use super::*;

    // Invokes `crate::run_edge_contract_tests!` with the module name
    // `concurrent_edge_contract` and a factory closure producing a
    // 16-slot `ConcurrentEdge`.
    // NOTE(review): presumably this expands to the shared `Edge` contract
    // test suite run against each freshly built edge; confirm against the
    // macro definition.
    crate::run_edge_contract_tests!(concurrent_edge_contract, || ConcurrentEdge::new(16));
}