// limen_core/edge/spsc_array.rs
//! Static SPSC ring buffer for P0 (no_std + no_alloc), **safe version**.
//!
//! Stores `MessageToken` values directly in a fixed-size `[MessageToken; N]`
//! ring. Header metadata is accessed via `HeaderStore` for admission and
//! batching decisions.

use crate::edge::{Edge, EdgeOccupancy, EnqueueResult};
use crate::errors::QueueError;
use crate::policy::{AdmissionDecision, EdgePolicy};
use crate::prelude::{BatchView, HeaderStore};
use crate::types::MessageToken;

use core::mem;
/// A fixed-capacity ring buffer storing [`MessageToken`] values.
///
/// Backed entirely by an inline `[MessageToken; N]` array — no heap use,
/// suitable for `no_std` + `no_alloc` targets.
pub struct StaticRing<const N: usize> {
    /// Backing storage for the ring.
    buf: [MessageToken; N],
    /// Index of the oldest live element (next pop position).
    head: usize,
    /// Index one past the newest live element (next push position).
    tail: usize,
    /// Number of live elements. Tracked explicitly because `head`/`tail`
    /// alone cannot distinguish an empty ring from a full one.
    len: usize,
    /// Running byte total, updated on push/pop via HeaderStore.
    bytes: usize,
}
24
25impl<const N: usize> StaticRing<N> {
26    /// Create a new empty ring.
27    #[inline]
28    pub fn new() -> Self {
29        Self {
30            buf: [MessageToken::default(); N],
31            head: 0,
32            tail: 0,
33            len: 0,
34            bytes: 0,
35        }
36    }
37
38    #[inline]
39    fn is_full(&self) -> bool {
40        self.len == N
41    }
42
43    #[inline]
44    fn push_raw(&mut self, token: MessageToken) {
45        self.buf[self.tail] = token;
46        self.tail = (self.tail + 1) % N;
47        self.len += 1;
48    }
49
50    #[inline]
51    fn pop_raw(&mut self) -> MessageToken {
52        let token = mem::take(&mut self.buf[self.head]);
53        self.head = (self.head + 1) % N;
54        self.len -= 1;
55        token
56    }
57
58    /// Normalize so live items are contiguous at buf[0..len].
59    fn normalize(&mut self) {
60        if self.len == 0 {
61            self.head = 0;
62            self.tail = 0;
63            return;
64        }
65        if self.head == 0 {
66            self.tail = (self.head + self.len) % N;
67            return;
68        }
69        for i in 0..self.len {
70            let src_idx = (self.head + i) % N;
71            let tmp = mem::take(&mut self.buf[src_idx]);
72            self.buf[i] = tmp;
73        }
74        for i in self.len..N {
75            self.buf[i] = MessageToken::default();
76        }
77        self.head = 0;
78        self.tail = (self.head + self.len) % N;
79    }
80}
81
82impl<const N: usize> Default for StaticRing<N> {
83    fn default() -> Self {
84        Self::new()
85    }
86}
87
impl<const N: usize> Edge for StaticRing<N> {
    /// Attempt to enqueue `token`, consulting `policy` for an admission
    /// decision first.
    ///
    /// Byte accounting looks the token's payload size up via `headers`; a
    /// missing header is counted as zero bytes (best-effort tracking).
    fn try_push<H: HeaderStore>(
        &mut self,
        token: MessageToken,
        policy: &EdgePolicy,
        headers: &H,
    ) -> EnqueueResult {
        let decision = self.get_admission_decision(policy, token, headers);

        // Look up the incoming token's byte size via HeaderStore.
        let item_bytes = headers
            .peek_header(token)
            .map(|h| *h.payload_size_bytes())
            .unwrap_or(0);

        match decision {
            AdmissionDecision::Admit => {
                // Even an admitted message is bounded by physical capacity
                // and the policy's hard len/byte caps.
                if self.is_full() || policy.caps.at_or_above_hard(self.len, self.bytes) {
                    return EnqueueResult::Rejected;
                }
                self.bytes = self.bytes.saturating_add(item_bytes);
                self.push_raw(token);
                EnqueueResult::Enqueued
            }
            AdmissionDecision::DropNewest => EnqueueResult::DroppedNewest,
            AdmissionDecision::Reject => EnqueueResult::Rejected,
            // This push is non-blocking, so Block is reported as Rejected.
            AdmissionDecision::Block => EnqueueResult::Rejected,
            AdmissionDecision::Evict(_) | AdmissionDecision::EvictUntilBelowHard => {
                // Eviction is the caller's responsibility. push_output /
                // out_try_push calls get_admission_decision, pre-evicts via
                // try_pop, then calls try_push. Push if physically possible.
                if self.is_full() || policy.caps.at_or_above_hard(self.len, self.bytes) {
                    return EnqueueResult::Rejected;
                }
                self.bytes = self.bytes.saturating_add(item_bytes);
                self.push_raw(token);
                EnqueueResult::Enqueued
            }
        }
    }

    /// Pop the front token, decrementing the running byte total by the
    /// popped token's payload size (zero if its header is unavailable).
    fn try_pop<H: HeaderStore>(&mut self, headers: &H) -> Result<MessageToken, QueueError> {
        if self.len == 0 {
            return Err(QueueError::Empty);
        }
        // Peek header before popping to update byte tracking.
        let front_token = self.buf[self.head];
        let front_bytes = headers
            .peek_header(front_token)
            .map(|h| *h.payload_size_bytes())
            .unwrap_or(0);

        let token = self.pop_raw();
        self.bytes = self.bytes.saturating_sub(front_bytes);
        Ok(token)
    }

    /// Report current occupancy (item count, byte total, and the watermark
    /// the policy derives from them).
    fn occupancy(&self, policy: &EdgePolicy) -> EdgeOccupancy {
        let watermark = policy.watermark(self.len, self.bytes);
        EdgeOccupancy::new(self.len, self.bytes, watermark)
    }

    fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Return (a copy of) the front token without removing it.
    fn try_peek(&self) -> Result<MessageToken, QueueError> {
        if self.len == 0 {
            return Err(QueueError::Empty);
        }
        Ok(self.buf[self.head])
    }

    /// Return (a copy of) the token `index` positions behind the front,
    /// without removing it. `Err(Empty)` for any out-of-range index.
    fn try_peek_at(&self, index: usize) -> Result<MessageToken, QueueError> {
        if self.len == 0 || index >= self.len {
            return Err(QueueError::Empty);
        }
        let pos = (self.head + index) % N;
        Ok(self.buf[pos])
    }

    /// Pop a batch of tokens according to `policy`.
    ///
    /// The ring is normalized first so live items sit at `buf[0..len]`; the
    /// returned [`BatchView`] then borrows a prefix of the backing array.
    /// Window behavior:
    /// - `Disjoint` (and the default fall-through): every item in the view
    ///   is removed from the queue.
    /// - `Sliding`: up to `size` items are exposed in the view but only
    ///   `stride` items are removed, so consecutive windows may overlap.
    ///
    /// With neither `fixed_n` nor `max_delta_t` set, the batch size
    /// defaults to 1. Byte tracking is decremented only for items actually
    /// removed (not for overlap items left in a sliding window).
    fn try_pop_batch<H: HeaderStore>(
        &mut self,
        policy: &crate::policy::BatchingPolicy,
        headers: &H,
    ) -> Result<BatchView<'_, MessageToken>, QueueError> {
        use crate::policy::WindowKind;

        if self.len == 0 {
            return Err(QueueError::Empty);
        }

        // After normalize(), live items occupy buf[0..len] and head == 0,
        // so all indexing below is plain prefix indexing.
        self.normalize();
        let old_len = self.len;

        let fixed_opt = *policy.fixed_n();
        let delta_t_opt = *policy.max_delta_t();
        let window_kind = policy.window_kind();

        // No batching knobs configured at all => single-item batches.
        let effective_fixed: Option<usize> = if fixed_opt.is_none() && delta_t_opt.is_none() {
            Some(1)
        } else {
            fixed_opt
        };

        // Delta-t check via HeaderStore.
        // delta_count = length of the longest prefix whose creation_tick is
        // within `cap` ticks of the front item's. If the front header cannot
        // be read, the cap is skipped (best-effort: full length stands).
        let mut delta_count = self.len;
        if let Some(cap) = delta_t_opt {
            if let Ok(front_header) = headers.peek_header(self.buf[0]) {
                let front_ticks = *front_header.creation_tick();
                let mut c = 0usize;
                while c < self.len {
                    if let Ok(h) = headers.peek_header(self.buf[c]) {
                        let tick = *h.creation_tick();
                        let delta = tick.saturating_sub(front_ticks);
                        if delta <= cap {
                            c += 1;
                        } else {
                            break;
                        }
                    } else {
                        // Unreadable header ends the window early.
                        break;
                    }
                }
                delta_count = c;
            }
        }

        // Clamp a candidate batch size by the fixed-N cap, when one applies.
        let apply_fixed = |limit: usize| -> usize {
            if let Some(n) = effective_fixed {
                core::cmp::min(limit, n)
            } else {
                limit
            }
        };

        // --- Disjoint windows
        if let WindowKind::Disjoint = window_kind {
            let take_n = apply_fixed(core::cmp::min(self.len, delta_count));
            if take_n == 0 {
                return Err(QueueError::Empty);
            }

            // Update byte tracking for popped items.
            let mut dropped_bytes = 0usize;
            for i in 0..take_n {
                if let Ok(h) = headers.peek_header(self.buf[i]) {
                    dropped_bytes = dropped_bytes.saturating_add(*h.payload_size_bytes());
                }
            }
            self.bytes = self.bytes.saturating_sub(dropped_bytes);

            // Logically remove the batch: head moves past it. The slots stay
            // physically intact so the returned view can still borrow them.
            let new_head = take_n % N;
            self.len = old_len - take_n;
            self.head = new_head;
            self.tail = (self.head + self.len) % N;

            let slice = &mut self.buf[..take_n];
            return Ok(BatchView::from_borrowed(slice, take_n));
        }

        // --- Sliding windows
        if let WindowKind::Sliding(sw) = window_kind {
            let stride = *sw.stride();
            let size = effective_fixed.unwrap_or(1);

            // Window contents: up to `size` items, further clamped by the
            // delta-t prefix and the fixed-N cap.
            let mut max_present = core::cmp::min(self.len, size);
            max_present = apply_fixed(core::cmp::min(max_present, delta_count));
            // Only `stride` items actually leave the queue.
            let stride_to_pop = core::cmp::min(stride, self.len);

            if max_present == 0 {
                return Err(QueueError::Empty);
            }

            // Byte tracking reflects only the removed (strided-past) items.
            let mut popped_bytes = 0usize;
            for i in 0..stride_to_pop {
                if let Ok(h) = headers.peek_header(self.buf[i]) {
                    popped_bytes = popped_bytes.saturating_add(*h.payload_size_bytes());
                }
            }
            self.bytes = self.bytes.saturating_sub(popped_bytes);

            let new_head = stride_to_pop % N;
            self.len = old_len - stride_to_pop;
            self.head = new_head;
            self.tail = (self.head + self.len) % N;

            // NOTE: the view may include items that remain queued (overlap
            // between consecutive windows) — this is the sliding semantics.
            let slice = &mut self.buf[..max_present];
            return Ok(BatchView::from_borrowed(slice, max_present));
        }

        // --- Default (non-sliding, non-disjoint): same removal semantics
        // as a disjoint window.
        let mut take_n = core::cmp::min(self.len, delta_count);
        take_n = apply_fixed(take_n);
        if take_n == 0 {
            return Err(QueueError::Empty);
        }

        let mut dropped_bytes = 0usize;
        for i in 0..take_n {
            if let Ok(h) = headers.peek_header(self.buf[i]) {
                dropped_bytes = dropped_bytes.saturating_add(*h.payload_size_bytes());
            }
        }
        self.bytes = self.bytes.saturating_sub(dropped_bytes);

        let new_head = take_n % N;
        self.len = old_len - take_n;
        self.head = new_head;
        self.tail = (self.head + self.len) % N;

        let slice = &mut self.buf[..take_n];
        Ok(BatchView::from_borrowed(slice, take_n))
    }
}
303
#[cfg(test)]
mod tests {
    use super::*;

    // Runs the full Edge contract suite against StaticRing::<16>
    // (comment previously referenced a stale `StaticRing<Message<u32>, 16>`
    // signature; the type takes a single const capacity parameter).
    crate::run_edge_contract_tests!(static_ring_contract, || { StaticRing::<16>::new() });
}