limen_core/edge.rs
//! SPSC edge trait, occupancy types, and queue implementations.
//!
//! Edges are the typed SPSC queues that connect nodes. They store
//! [`MessageToken`] handles rather than full messages; actual message data
//! lives in a [`MemoryManager`](crate::memory::manager::MemoryManager).
//! Header metadata needed for admission and batching is accessed through a
//! [`HeaderStore`] parameter — statically dispatched, no `dyn`.
//!
//! Key types:
//! - [`Edge`] — the core SPSC contract (`try_push`, `try_pop`, `occupancy`, etc.).
//! - [`EdgeOccupancy`] — occupancy snapshot used for scheduling and telemetry.
//! - [`EnqueueResult`] — outcome of a push attempt.
//! - [`NoQueue`] — a no-op placeholder for unconnected ports.
//! - `ScopedEdge` (`std`) — factory for per-worker handles in concurrent execution.
//!
//! Implementations: [`spsc_array`] (`no_std`), `spsc_vecdeque` (`alloc`),
//! `spsc_concurrent` (`std`), [`spsc_priority2`], `spsc_raw` (`spsc_raw` feature).
19
20use crate::errors::QueueError;
21use crate::message::Message;
22use crate::policy::{AdmissionDecision, BatchingPolicy, EdgePolicy, WatermarkState};
23use crate::prelude::{BatchView, HeaderStore, Payload};
24use crate::types::MessageToken;
25
26pub mod link;
27
28pub mod spsc_array;
29
30#[cfg(feature = "alloc")]
31pub mod spsc_vecdeque;
32
33#[cfg(feature = "std")]
34pub mod spsc_concurrent;
35
36#[cfg(feature = "spsc_raw")]
37pub mod spsc_raw;
38
39pub mod spsc_priority2;
40
41#[cfg(any(test, feature = "bench"))]
42pub mod bench;
43
44#[cfg(any(test, feature = "bench"))]
45pub mod contract_tests;
46#[cfg(any(test, feature = "bench"))]
47pub use contract_tests::*;
48
/// Outcome reported by an edge for a single enqueue attempt.
///
/// The enum is `#[non_exhaustive]`, so code outside this crate must keep a
/// wildcard arm when matching on it.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum EnqueueResult {
    /// The item was accepted and is now in the queue.
    Enqueued,
    /// The incoming item was discarded by policy (`DropNewest`).
    DroppedNewest,
    /// The item was not enqueued because of backpressure or a full queue.
    Rejected,
}
60
61/// Queue occupancy snapshot used for decisions.
62#[non_exhaustive]
63#[derive(Debug, Clone, Copy, PartialEq, Eq)]
64pub struct EdgeOccupancy {
65 /// Number of items currently in the queue.
66 items: usize,
67 /// Estimated bytes currently in the queue.
68 bytes: usize,
69 /// Watermark state derived from capacities.
70 watermark: WatermarkState,
71}
72
73impl EdgeOccupancy {
74 /// Create a new `EdgeOccupancy`.
75 #[inline]
76 pub const fn new(items: usize, bytes: usize, watermark: WatermarkState) -> Self {
77 Self {
78 items,
79 bytes,
80 watermark,
81 }
82 }
83
84 /// Number of items currently in the queue.
85 #[inline]
86 pub fn items(&self) -> &usize {
87 &self.items
88 }
89
90 /// Estimated bytes currently in the queue.
91 #[inline]
92 pub fn bytes(&self) -> &usize {
93 &self.bytes
94 }
95
96 /// Watermark state derived from capacities.
97 #[inline]
98 pub fn watermark(&self) -> &WatermarkState {
99 &self.watermark
100 }
101}
102
103/// A single-producer, single-consumer queue contract.
104///
105/// Edges store [`MessageToken`] handles. Actual message data (header + payload)
106/// resides in a [`MemoryManager`](crate::memory::manager::MemoryManager).
107/// Edge methods that need header metadata (admission, batching, peek) receive
108/// a `&impl HeaderStore` parameter — statically dispatched, no `dyn`.
109pub trait Edge {
110 /// Attempt to push a token onto the queue using the given edge policy.
111 ///
112 /// The implementation uses `headers` to look up the token's message header
113 /// for admission decisions (byte size, QoS, deadline).
114 ///
115 /// For `DropOldest` policies, the caller must pre-evict via
116 /// `get_admission_decision` + `try_pop` before calling `try_push`.
117 /// `try_push` itself never evicts.
118 fn try_push<H: HeaderStore>(
119 &mut self,
120 token: MessageToken,
121 policy: &EdgePolicy,
122 headers: &H,
123 ) -> EnqueueResult;
124
125 /// Attempt to pop the front token from the queue.
126 ///
127 /// Uses `headers` to look up the popped token's byte size for internal
128 /// byte tracking.
129 fn try_pop<H: HeaderStore>(&mut self, headers: &H) -> Result<MessageToken, QueueError>;
130
131 /// Return a snapshot of occupancy used for telemetry and admission.
132 ///
133 /// Uses internal counters (items + total_bytes) — no HeaderStore needed.
134 fn occupancy(&self, policy: &EdgePolicy) -> EdgeOccupancy;
135
136 /// Return `true` if the queue is empty.
137 fn is_empty(&self) -> bool;
138
139 /// Peek at the front token without removing it.
140 fn try_peek(&self) -> Result<MessageToken, QueueError>;
141
142 /// Peek at the token at logical position `index` from the front.
143 ///
144 /// - `index = 0` is equivalent to `try_peek`.
145 /// - Returns `QueueError::Empty` if `index` is out of range.
146 fn try_peek_at(&self, index: usize) -> Result<MessageToken, QueueError>;
147
148 /// Peek the front message header via `HeaderStore` (convenience).
149 ///
150 /// Returns the `HeaderGuard` associated to `H`, which dereferences to
151 /// `MessageHeader`. This allows both single-threaded managers (which can
152 /// return `&MessageHeader`) and concurrent managers (which return a
153 /// guard holding a slot-level lock).
154 ///
155 /// The returned guard keeps the underlying header valid for the lifetime
156 /// of the guard.
157 fn peek_header<'h, H: HeaderStore>(
158 &self,
159 headers: &'h H,
160 ) -> Result<<H as HeaderStore>::HeaderGuard<'h>, QueueError> {
161 let token = self.try_peek()?;
162 headers.peek_header(token).map_err(|_| QueueError::Empty)
163 }
164
165 /// Pop a batch of tokens according to the provided batching policy.
166 ///
167 /// Uses `headers` for delta-t readiness checks (peeks `creation_tick`
168 /// on tokens in the queue via HeaderStore).
169 fn try_pop_batch<H: HeaderStore>(
170 &mut self,
171 policy: &BatchingPolicy,
172 headers: &H,
173 ) -> Result<BatchView<'_, MessageToken>, QueueError>;
174
175 /// Return an `AdmissionDecision` for the given token according to
176 /// `policy` and the current occupancy snapshot.
177 ///
178 /// Pure: does not mutate the queue.
179 fn get_admission_decision<H: HeaderStore>(
180 &self,
181 policy: &EdgePolicy,
182 token: MessageToken,
183 headers: &H,
184 ) -> AdmissionDecision {
185 let occ = self.occupancy(policy);
186 match headers.peek_header(token) {
187 Ok(h) => policy.decide(
188 occ.items,
189 occ.bytes,
190 *h.payload_size_bytes(),
191 *h.deadline_ns(),
192 *h.qos(),
193 ),
194 Err(_) => AdmissionDecision::Reject,
195 }
196 }
197
198 /// Return an `AdmissionDecision` for the given token according to
199 /// `policy` and the current occupancy snapshot.
200 ///
201 /// Pure: does not mutate the queue.
202 fn get_admission_decision_from_message<P: Payload>(
203 &self,
204 policy: &EdgePolicy,
205 message: &Message<P>,
206 ) -> AdmissionDecision {
207 let occ = self.occupancy(policy);
208 let h = message.header();
209 policy.decide(
210 occ.items,
211 occ.bytes,
212 *h.payload_size_bytes(),
213 *h.deadline_ns(),
214 *h.qos(),
215 )
216 }
217}
218
/// Role requested for a scoped edge handle.
///
/// Arc-based edges such as `ConcurrentEdge` ignore the role, because a clone
/// of the edge is full-duplex. Future lock-free split-handle edges (e.g.
/// `SpscAtomicRing`) will hand out a producer-only or a consumer-only handle
/// according to the requested kind.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum EdgeHandleKind {
    /// The handle pushes messages: it is the output side of a node.
    Producer,
    /// The handle pops messages: it is the input side of a node.
    Consumer,
}
231
/// Factory for per-worker edge handles used during concurrent execution.
///
/// `Handle<'a>` is a generic associated type, so an implementation can hand
/// out either:
/// - an owned clone (Arc-based edges such as `ConcurrentEdge`), or
/// - a borrowed split handle (a producer or consumer view, for future
///   lock-free edges).
///
/// The lifetime `'a` is tied to `std::thread::scope`: every handle is
/// guaranteed to be dropped before the scope exits.
#[cfg(feature = "std")]
pub trait ScopedEdge: Edge {
    /// Handle type given to a worker. It must be `Edge + Send` so that it
    /// can be moved into a scoped thread and used for stepping.
    type Handle<'a>: Edge + Send + 'a
    where
        Self: 'a;

    /// Build a scoped handle for one worker thread.
    ///
    /// `kind` says whether the worker will push (producer) or pop (consumer)
    /// through the handle. Arc-based implementations may ignore it;
    /// split-handle implementations use it to pick the matching half.
    fn scoped_handle<'a>(&'a self, kind: EdgeHandleKind) -> Self::Handle<'a>
    where
        Self: 'a;
}
258
259/// A no-op queue implementation used for phantom inputs and outputs.
260///
261/// `NoQueue` acts as a placeholder in the graph where a queue is required by
262/// type but no actual buffering or message transfer is desired. All enqueue
263/// attempts are rejected, and all dequeue or peek attempts return empty.
264///
265/// This is primarily useful for:
266/// - Phantom or unconnected ports in a graph.
267/// - Simplifying generic code that expects a queue type, without allocating
268/// unnecessary resources.
269/// - Static analysis or testing scenarios where message flow is disabled.
270///
271/// # Type Parameters
272/// - `P`: Payload type of the [`Message`] carried by this queue.
273///
274/// # Behavior
275/// - [`Edge::try_push`] always returns [`EnqueueResult::Rejected`].
276/// - [`Edge::try_pop`] always returns [`QueueError::Empty`].
277/// - [`Edge::try_peek`] always returns [`QueueError::Empty`].
278/// - [`Edge::occupancy`] always reports zero items, zero bytes, and
279/// [`WatermarkState::AtOrAboveHard`] (fully saturated, disallowing admission).
280pub struct NoQueue;
281
282impl Edge for NoQueue {
283 #[inline]
284 fn try_push<H: HeaderStore>(
285 &mut self,
286 _token: MessageToken,
287 _policy: &EdgePolicy,
288 _headers: &H,
289 ) -> EnqueueResult {
290 EnqueueResult::Rejected
291 }
292
293 #[inline]
294 fn try_pop<H: HeaderStore>(&mut self, _headers: &H) -> Result<MessageToken, QueueError> {
295 Err(QueueError::Empty)
296 }
297
298 #[inline]
299 fn occupancy(&self, _policy: &EdgePolicy) -> EdgeOccupancy {
300 EdgeOccupancy::new(0, 0, WatermarkState::AtOrAboveHard)
301 }
302
303 fn is_empty(&self) -> bool {
304 true
305 }
306
307 fn try_peek(&self) -> Result<MessageToken, QueueError> {
308 Err(QueueError::Empty)
309 }
310
311 fn try_peek_at(&self, _index: usize) -> Result<MessageToken, QueueError> {
312 Err(QueueError::Empty)
313 }
314
315 #[inline]
316 fn try_pop_batch<H: HeaderStore>(
317 &mut self,
318 _policy: &BatchingPolicy,
319 _headers: &H,
320 ) -> Result<BatchView<'_, MessageToken>, QueueError> {
321 Err(QueueError::Empty)
322 }
323}