// limen_core/edge/spsc_concurrent.rs

use alloc::vec::Vec;
use std::sync::{Arc, Mutex};

use crate::edge::{Edge, EdgeOccupancy, EnqueueResult};
use crate::errors::QueueError;
use crate::message::batch::BatchView;
use crate::policy::{AdmissionDecision, BatchingPolicy, EdgePolicy, WindowKind};
use crate::prelude::HeaderStore;
use crate::types::MessageToken;

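/// Ring-buffer state guarded by the mutex in [`ConcurrentEdge`].
///
/// `head` indexes the oldest live token, `tail` the next write slot, `len`
/// counts live tokens, and `bytes` tracks the running payload-byte total that
/// feeds policy decisions.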
struct ConcurrentEdgeInner {
    buf: Vec<MessageToken>,
    head: usize,
    tail: usize,
    len: usize,
    capacity: usize,
    bytes: usize,
}

impl ConcurrentEdgeInner {
    fn new(capacity: usize) -> Self {
        Self {
            buf: alloc::vec![MessageToken::default(); capacity],
            head: 0,
            tail: 0,
            len: 0,
            capacity,
            bytes: 0,
        }
    }

    #[inline]
    fn is_full(&self) -> bool {
        self.len == self.capacity
    }

    #[inline]
    fn push_raw(&mut self, token: MessageToken) {
        self.buf[self.tail] = token;
        self.tail = (self.tail + 1) % self.capacity;
        self.len += 1;
    }

    #[inline]
    fn pop_raw(&mut self) -> MessageToken {
        let token = core::mem::take(&mut self.buf[self.head]);
        self.head = (self.head + 1) % self.capacity;
        self.len -= 1;
        token
    }

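    /// Compacts the ring so that the live tokens occupy `buf[..len]` with
    /// `head == 0`. Batch pops rely on this to hand out a contiguous prefix.
    /// Slots beyond `len` are reset to the default token.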
    fn normalize(&mut self) {
        if self.len == 0 {
            self.head = 0;
            self.tail = 0;
            return;
        }

        if self.head == 0 {
            self.tail = self.len % self.capacity;
            return;
        }

        let cap = self.capacity;
        let mut live_tokens = Vec::with_capacity(self.len);

        for i in 0..self.len {
            let src = (self.head + i) % cap;
            live_tokens.push(core::mem::take(&mut self.buf[src]));
        }

        self.buf[..self.len].copy_from_slice(&live_tokens[..self.len]);

        for i in self.len..cap {
            self.buf[i] = MessageToken::default();
        }

        self.head = 0;
        self.tail = self.len % cap;
    }
}

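/// A mutex-protected ring-buffer edge. Cloning is cheap and every clone shares
/// the same underlying queue, so a producer and a consumer on different
/// threads can each hold a handle.
///
/// A minimal sketch of handle sharing (only constructors defined in this file
/// are used):
///
/// ```ignore
/// let edge = ConcurrentEdge::new(16);
/// let consumer_handle = edge.clone(); // same queue, different handle
/// ```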
#[derive(Clone)]
pub struct ConcurrentEdge {
    inner: Arc<Mutex<ConcurrentEdgeInner>>,
}

impl ConcurrentEdge {
    pub fn new(capacity: usize) -> Self {
        Self {
            inner: Arc::new(Mutex::new(ConcurrentEdgeInner::new(capacity))),
        }
    }
}

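// Every trait method takes the mutex for its full duration. A poisoned lock is
// surfaced as `QueueError::Poisoned` where the signature allows; methods that
// cannot return an error degrade to `Rejected` or empty readings instead.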
impl Edge for ConcurrentEdge {
    fn try_push<H: HeaderStore>(
        &mut self,
        token: MessageToken,
        policy: &EdgePolicy,
        headers: &H,
    ) -> EnqueueResult {
        let mut inner = match self.inner.lock() {
            Ok(g) => g,
            Err(_) => return EnqueueResult::Rejected,
        };

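        // Peek the header to size the admission decision; a token whose header
        // cannot be read is rejected outright.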
        let (decision, item_bytes) = match headers.peek_header(token) {
            Ok(h) => {
                let b = *h.payload_size_bytes();
                let d = policy.decide(inner.len, inner.bytes, b, *h.deadline_ns(), *h.qos());
                (d, b)
            }
            Err(_) => return EnqueueResult::Rejected,
        };

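        // Apply the decision. This edge performs no eviction of its own:
        // `Evict` decisions fall back to a plain admit, subject to the same
        // capacity and hard-cap checks as `Admit`.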
        match decision {
            AdmissionDecision::Admit => {
                if inner.is_full() || policy.caps.at_or_above_hard(inner.len, inner.bytes) {
                    return EnqueueResult::Rejected;
                }
                inner.bytes = inner.bytes.saturating_add(item_bytes);
                inner.push_raw(token);
                EnqueueResult::Enqueued
            }
            AdmissionDecision::DropNewest => EnqueueResult::DroppedNewest,
            AdmissionDecision::Reject | AdmissionDecision::Block => EnqueueResult::Rejected,
            AdmissionDecision::Evict(_) | AdmissionDecision::EvictUntilBelowHard => {
                if inner.is_full() || policy.caps.at_or_above_hard(inner.len, inner.bytes) {
                    return EnqueueResult::Rejected;
                }
                inner.bytes = inner.bytes.saturating_add(item_bytes);
                inner.push_raw(token);
                EnqueueResult::Enqueued
            }
        }
    }

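    // The front header is read before popping so the byte total can be
    // adjusted; a token whose header is missing contributes zero bytes.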
    fn try_pop<H: HeaderStore>(&mut self, headers: &H) -> Result<MessageToken, QueueError> {
        let mut inner = self.inner.lock().map_err(|_| QueueError::Poisoned)?;
        if inner.len == 0 {
            return Err(QueueError::Empty);
        }
        let front_token = inner.buf[inner.head];
        let front_bytes = headers
            .peek_header(front_token)
            .map(|h| *h.payload_size_bytes())
            .unwrap_or(0);
        let token = inner.pop_raw();
        inner.bytes = inner.bytes.saturating_sub(front_bytes);
        Ok(token)
    }

    fn occupancy(&self, policy: &EdgePolicy) -> EdgeOccupancy {
        match self.inner.lock() {
            Ok(inner) => {
                let watermark = policy.watermark(inner.len, inner.bytes);
                EdgeOccupancy::new(inner.len, inner.bytes, watermark)
            }
            Err(_) => EdgeOccupancy::new(0, 0, policy.watermark(0, 0)),
        }
    }

    fn is_empty(&self) -> bool {
        self.inner.lock().map(|g| g.len == 0).unwrap_or(true)
    }

    fn try_peek(&self) -> Result<MessageToken, QueueError> {
        let inner = self.inner.lock().map_err(|_| QueueError::Poisoned)?;
        if inner.len == 0 {
            return Err(QueueError::Empty);
        }
        Ok(inner.buf[inner.head])
    }

    fn try_peek_at(&self, index: usize) -> Result<MessageToken, QueueError> {
        let inner = self.inner.lock().map_err(|_| QueueError::Poisoned)?;
        if inner.len == 0 || index >= inner.len {
            return Err(QueueError::Empty);
        }
        let pos = (inner.head + index) % inner.capacity;
        Ok(inner.buf[pos])
    }

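    // Batched pop. The batching policy supplies an optional fixed batch size
    // (`fixed_n`), an optional creation-tick window (`max_delta_t`), and a
    // window kind; when neither limit is set the batch defaults to a single
    // token.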
    fn try_pop_batch<H: HeaderStore>(
        &mut self,
        policy: &BatchingPolicy,
        headers: &H,
    ) -> Result<BatchView<'_, MessageToken>, QueueError> {
        let mut inner = self.inner.lock().map_err(|_| QueueError::Poisoned)?;

        if inner.len == 0 {
            return Err(QueueError::Empty);
        }

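        // Compact live tokens to the front so the batch can be copied straight
        // out of `buf[..n]`.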
        inner.normalize();
        let old_len = inner.len;

        let fixed_opt = *policy.fixed_n();
        let delta_t_opt = *policy.max_delta_t();
        let window_kind = policy.window_kind();

        let effective_fixed: Option<usize> = if fixed_opt.is_none() && delta_t_opt.is_none() {
            Some(1)
        } else {
            fixed_opt
        };

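        // With a time window configured, count the contiguous prefix of tokens
        // whose creation tick lies within `cap` of the front token's tick.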
        let mut delta_count = inner.len;
        if let Some(cap) = delta_t_opt {
            if let Ok(front_header) = headers.peek_header(inner.buf[0]) {
                let front_tick = *front_header.creation_tick();
                let mut c = 0usize;
                while c < inner.len {
                    if let Ok(h) = headers.peek_header(inner.buf[c]) {
                        let tick = *h.creation_tick();
                        let delta = tick.saturating_sub(front_tick);
                        if delta <= cap {
                            c += 1;
                        } else {
                            break;
                        }
                    } else {
                        break;
                    }
                }
                delta_count = c;
            }
        }

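        // Clamp a candidate batch size by the effective fixed-n limit, if any.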
        let apply_fixed = |limit: usize| -> usize {
            if let Some(n) = effective_fixed {
                core::cmp::min(limit, n)
            } else {
                limit
            }
        };

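        // Disjoint windows: pop and return the same `take_n` tokens, leaving
        // no overlap for the next batch.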
        if let WindowKind::Disjoint = window_kind {
            let take_n = apply_fixed(core::cmp::min(inner.len, delta_count));
            if take_n == 0 {
                return Err(QueueError::Empty);
            }
            let mut dropped_bytes = 0usize;
            for i in 0..take_n {
                if let Ok(h) = headers.peek_header(inner.buf[i]) {
                    dropped_bytes = dropped_bytes.saturating_add(*h.payload_size_bytes());
                }
            }
            inner.bytes = inner.bytes.saturating_sub(dropped_bytes);
            inner.len = old_len - take_n;
            inner.head = take_n % inner.capacity;
            inner.tail = (inner.head + inner.len) % inner.capacity;

            let mut owned = Vec::with_capacity(take_n);
            for &tok in &inner.buf[..take_n] {
                owned.push(tok);
            }
            return Ok(BatchView::from_owned(owned));
        }

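        // Sliding windows: return up to `size` tokens but advance the head by
        // only `stride`, so successive batches overlap when stride < size.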
        if let WindowKind::Sliding(sw) = window_kind {
            let stride = *sw.stride();
            let size = effective_fixed.unwrap_or(1);
            let max_present =
                apply_fixed(core::cmp::min(core::cmp::min(inner.len, size), delta_count));
            let stride_to_pop = core::cmp::min(stride, inner.len);

            if max_present == 0 {
                return Err(QueueError::Empty);
            }

            let mut popped_bytes = 0usize;
            for i in 0..stride_to_pop {
                if let Ok(h) = headers.peek_header(inner.buf[i]) {
                    popped_bytes = popped_bytes.saturating_add(*h.payload_size_bytes());
                }
            }
            inner.bytes = inner.bytes.saturating_sub(popped_bytes);
            inner.len = old_len - stride_to_pop;
            inner.head = stride_to_pop % inner.capacity;
            inner.tail = (inner.head + inner.len) % inner.capacity;

            let mut owned = Vec::with_capacity(max_present);
            for &tok in &inner.buf[..max_present] {
                owned.push(tok);
            }
            return Ok(BatchView::from_owned(owned));
        }

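        // Any other window kind pops the batch in full, mirroring the disjoint
        // case.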
        let take_n = apply_fixed(core::cmp::min(inner.len, delta_count));
        if take_n == 0 {
            return Err(QueueError::Empty);
        }
        let mut dropped_bytes = 0usize;
        for i in 0..take_n {
            if let Ok(h) = headers.peek_header(inner.buf[i]) {
                dropped_bytes = dropped_bytes.saturating_add(*h.payload_size_bytes());
            }
        }
        inner.bytes = inner.bytes.saturating_sub(dropped_bytes);
        inner.len = old_len - take_n;
        inner.head = take_n % inner.capacity;
        inner.tail = (inner.head + inner.len) % inner.capacity;

        let mut owned = Vec::with_capacity(take_n);
        for &tok in &inner.buf[..take_n] {
            owned.push(tok);
        }
        Ok(BatchView::from_owned(owned))
    }
}

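// A scoped handle is just a clone: the handle and the original edge share the
// same `Arc`-wrapped queue.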
#[cfg(feature = "std")]
impl crate::edge::ScopedEdge for ConcurrentEdge {
    type Handle<'a>
        = ConcurrentEdge
    where
        Self: 'a;

    fn scoped_handle<'a>(&'a self, _kind: crate::edge::EdgeHandleKind) -> Self::Handle<'a>
    where
        Self: 'a,
    {
        self.clone()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    crate::run_edge_contract_tests!(concurrent_edge_contract, || ConcurrentEdge::new(16));
}