limen_core/edge/
spsc_array.rs1use crate::edge::{Edge, EdgeOccupancy, EnqueueResult};
8use crate::errors::QueueError;
9use crate::policy::{AdmissionDecision, EdgePolicy};
10use crate::prelude::{BatchView, HeaderStore};
11use crate::types::MessageToken;
12
13use core::mem;
14
15pub struct StaticRing<const N: usize> {
17 buf: [MessageToken; N],
18 head: usize,
19 tail: usize,
20 len: usize,
21 bytes: usize,
23}
24
25impl<const N: usize> StaticRing<N> {
26 #[inline]
28 pub fn new() -> Self {
29 Self {
30 buf: [MessageToken::default(); N],
31 head: 0,
32 tail: 0,
33 len: 0,
34 bytes: 0,
35 }
36 }
37
38 #[inline]
39 fn is_full(&self) -> bool {
40 self.len == N
41 }
42
43 #[inline]
44 fn push_raw(&mut self, token: MessageToken) {
45 self.buf[self.tail] = token;
46 self.tail = (self.tail + 1) % N;
47 self.len += 1;
48 }
49
50 #[inline]
51 fn pop_raw(&mut self) -> MessageToken {
52 let token = mem::take(&mut self.buf[self.head]);
53 self.head = (self.head + 1) % N;
54 self.len -= 1;
55 token
56 }
57
58 fn normalize(&mut self) {
60 if self.len == 0 {
61 self.head = 0;
62 self.tail = 0;
63 return;
64 }
65 if self.head == 0 {
66 self.tail = (self.head + self.len) % N;
67 return;
68 }
69 for i in 0..self.len {
70 let src_idx = (self.head + i) % N;
71 let tmp = mem::take(&mut self.buf[src_idx]);
72 self.buf[i] = tmp;
73 }
74 for i in self.len..N {
75 self.buf[i] = MessageToken::default();
76 }
77 self.head = 0;
78 self.tail = (self.head + self.len) % N;
79 }
80}
81
82impl<const N: usize> Default for StaticRing<N> {
83 fn default() -> Self {
84 Self::new()
85 }
86}
87
88impl<const N: usize> Edge for StaticRing<N> {
89 fn try_push<H: HeaderStore>(
90 &mut self,
91 token: MessageToken,
92 policy: &EdgePolicy,
93 headers: &H,
94 ) -> EnqueueResult {
95 let decision = self.get_admission_decision(policy, token, headers);
96
97 let item_bytes = headers
99 .peek_header(token)
100 .map(|h| *h.payload_size_bytes())
101 .unwrap_or(0);
102
103 match decision {
104 AdmissionDecision::Admit => {
105 if self.is_full() || policy.caps.at_or_above_hard(self.len, self.bytes) {
106 return EnqueueResult::Rejected;
107 }
108 self.bytes = self.bytes.saturating_add(item_bytes);
109 self.push_raw(token);
110 EnqueueResult::Enqueued
111 }
112 AdmissionDecision::DropNewest => EnqueueResult::DroppedNewest,
113 AdmissionDecision::Reject => EnqueueResult::Rejected,
114 AdmissionDecision::Block => EnqueueResult::Rejected,
115 AdmissionDecision::Evict(_) | AdmissionDecision::EvictUntilBelowHard => {
116 if self.is_full() || policy.caps.at_or_above_hard(self.len, self.bytes) {
120 return EnqueueResult::Rejected;
121 }
122 self.bytes = self.bytes.saturating_add(item_bytes);
123 self.push_raw(token);
124 EnqueueResult::Enqueued
125 }
126 }
127 }
128
129 fn try_pop<H: HeaderStore>(&mut self, headers: &H) -> Result<MessageToken, QueueError> {
130 if self.len == 0 {
131 return Err(QueueError::Empty);
132 }
133 let front_token = self.buf[self.head];
135 let front_bytes = headers
136 .peek_header(front_token)
137 .map(|h| *h.payload_size_bytes())
138 .unwrap_or(0);
139
140 let token = self.pop_raw();
141 self.bytes = self.bytes.saturating_sub(front_bytes);
142 Ok(token)
143 }
144
145 fn occupancy(&self, policy: &EdgePolicy) -> EdgeOccupancy {
146 let watermark = policy.watermark(self.len, self.bytes);
147 EdgeOccupancy::new(self.len, self.bytes, watermark)
148 }
149
150 fn is_empty(&self) -> bool {
151 self.len == 0
152 }
153
154 fn try_peek(&self) -> Result<MessageToken, QueueError> {
155 if self.len == 0 {
156 return Err(QueueError::Empty);
157 }
158 Ok(self.buf[self.head])
159 }
160
161 fn try_peek_at(&self, index: usize) -> Result<MessageToken, QueueError> {
162 if self.len == 0 || index >= self.len {
163 return Err(QueueError::Empty);
164 }
165 let pos = (self.head + index) % N;
166 Ok(self.buf[pos])
167 }
168
169 fn try_pop_batch<H: HeaderStore>(
170 &mut self,
171 policy: &crate::policy::BatchingPolicy,
172 headers: &H,
173 ) -> Result<BatchView<'_, MessageToken>, QueueError> {
174 use crate::policy::WindowKind;
175
176 if self.len == 0 {
177 return Err(QueueError::Empty);
178 }
179
180 self.normalize();
181 let old_len = self.len;
182
183 let fixed_opt = *policy.fixed_n();
184 let delta_t_opt = *policy.max_delta_t();
185 let window_kind = policy.window_kind();
186
187 let effective_fixed: Option<usize> = if fixed_opt.is_none() && delta_t_opt.is_none() {
188 Some(1)
189 } else {
190 fixed_opt
191 };
192
193 let mut delta_count = self.len;
195 if let Some(cap) = delta_t_opt {
196 if let Ok(front_header) = headers.peek_header(self.buf[0]) {
197 let front_ticks = *front_header.creation_tick();
198 let mut c = 0usize;
199 while c < self.len {
200 if let Ok(h) = headers.peek_header(self.buf[c]) {
201 let tick = *h.creation_tick();
202 let delta = tick.saturating_sub(front_ticks);
203 if delta <= cap {
204 c += 1;
205 } else {
206 break;
207 }
208 } else {
209 break;
210 }
211 }
212 delta_count = c;
213 }
214 }
215
216 let apply_fixed = |limit: usize| -> usize {
217 if let Some(n) = effective_fixed {
218 core::cmp::min(limit, n)
219 } else {
220 limit
221 }
222 };
223
224 if let WindowKind::Disjoint = window_kind {
226 let take_n = apply_fixed(core::cmp::min(self.len, delta_count));
227 if take_n == 0 {
228 return Err(QueueError::Empty);
229 }
230
231 let mut dropped_bytes = 0usize;
233 for i in 0..take_n {
234 if let Ok(h) = headers.peek_header(self.buf[i]) {
235 dropped_bytes = dropped_bytes.saturating_add(*h.payload_size_bytes());
236 }
237 }
238 self.bytes = self.bytes.saturating_sub(dropped_bytes);
239
240 let new_head = take_n % N;
241 self.len = old_len - take_n;
242 self.head = new_head;
243 self.tail = (self.head + self.len) % N;
244
245 let slice = &mut self.buf[..take_n];
246 return Ok(BatchView::from_borrowed(slice, take_n));
247 }
248
249 if let WindowKind::Sliding(sw) = window_kind {
251 let stride = *sw.stride();
252 let size = effective_fixed.unwrap_or(1);
253
254 let mut max_present = core::cmp::min(self.len, size);
255 max_present = apply_fixed(core::cmp::min(max_present, delta_count));
256 let stride_to_pop = core::cmp::min(stride, self.len);
257
258 if max_present == 0 {
259 return Err(QueueError::Empty);
260 }
261
262 let mut popped_bytes = 0usize;
263 for i in 0..stride_to_pop {
264 if let Ok(h) = headers.peek_header(self.buf[i]) {
265 popped_bytes = popped_bytes.saturating_add(*h.payload_size_bytes());
266 }
267 }
268 self.bytes = self.bytes.saturating_sub(popped_bytes);
269
270 let new_head = stride_to_pop % N;
271 self.len = old_len - stride_to_pop;
272 self.head = new_head;
273 self.tail = (self.head + self.len) % N;
274
275 let slice = &mut self.buf[..max_present];
276 return Ok(BatchView::from_borrowed(slice, max_present));
277 }
278
279 let mut take_n = core::cmp::min(self.len, delta_count);
281 take_n = apply_fixed(take_n);
282 if take_n == 0 {
283 return Err(QueueError::Empty);
284 }
285
286 let mut dropped_bytes = 0usize;
287 for i in 0..take_n {
288 if let Ok(h) = headers.peek_header(self.buf[i]) {
289 dropped_bytes = dropped_bytes.saturating_add(*h.payload_size_bytes());
290 }
291 }
292 self.bytes = self.bytes.saturating_sub(dropped_bytes);
293
294 let new_head = take_n % N;
295 self.len = old_len - take_n;
296 self.head = new_head;
297 self.tail = (self.head + self.len) % N;
298
299 let slice = &mut self.buf[..take_n];
300 Ok(BatchView::from_borrowed(slice, take_n))
301 }
302}
303
304#[cfg(test)]
305mod tests {
306 use super::*;
307
308 crate::run_edge_contract_tests!(static_ring_contract, || { StaticRing::<16>::new() });
310}