Skip to main content

limen_core/
edge.rs

1//! SPSC edge trait, occupancy types, and queue implementations.
2//!
3//! Edges are the typed SPSC queues that connect nodes. They store
4//! [`MessageToken`] handles rather than full messages; actual message data
5//! lives in a [`MemoryManager`](crate::memory::manager::MemoryManager).
6//! Header metadata needed for admission and batching is accessed through a
7//! [`HeaderStore`] parameter —
8//! statically dispatched, no `dyn`.
9//!
10//! Key types:
11//! - [`Edge`] — the core SPSC contract (`try_push`, `try_pop`, `occupancy`, etc.).
12//! - [`EdgeOccupancy`] — occupancy snapshot used for scheduling and telemetry.
13//! - [`EnqueueResult`] — outcome of a push attempt.
14//! - [`NoQueue`] — a no-op placeholder for unconnected ports.
15//! - `ScopedEdge` (`std`) — factory for per-worker handles in concurrent execution.
16//!
17//! Implementations: [`spsc_array`] (`no_std`), `spsc_vecdeque` (`alloc`),
18//! `spsc_concurrent` (`std`), [`spsc_priority2`], `spsc_raw` (`spsc_raw` feature).
19
20use crate::errors::QueueError;
21use crate::message::Message;
22use crate::policy::{AdmissionDecision, BatchingPolicy, EdgePolicy, WatermarkState};
23use crate::prelude::{BatchView, HeaderStore, Payload};
24use crate::types::MessageToken;
25
// Submodule declarations. Per the module docs above, the `spsc_*` modules are
// the edge implementations; each is gated on the feature it needs.

// NOTE(review): purpose of `link` is not described in this file — see the module itself.
pub mod link;

// Listed in the module docs as the `no_std` implementation; always compiled.
pub mod spsc_array;

// Listed in the module docs as the `alloc` implementation.
#[cfg(feature = "alloc")]
pub mod spsc_vecdeque;

// Listed in the module docs as the `std` implementation.
#[cfg(feature = "std")]
pub mod spsc_concurrent;

// Only compiled when the `spsc_raw` feature is enabled.
#[cfg(feature = "spsc_raw")]
pub mod spsc_raw;

// Listed among the implementations in the module docs; always compiled.
pub mod spsc_priority2;

// Compiled only for test builds or when the `bench` feature is enabled.
#[cfg(any(test, feature = "bench"))]
pub mod bench;

// Compiled under the same gate as `bench`; everything public in it is
// re-exported from this module by the glob `use` below.
#[cfg(any(test, feature = "bench"))]
pub mod contract_tests;
#[cfg(any(test, feature = "bench"))]
pub use contract_tests::*;
48
/// Outcome reported by a single enqueue attempt on an edge.
///
/// Marked `#[non_exhaustive]`, so code outside this crate must keep a
/// wildcard arm when matching on it.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EnqueueResult {
    /// The item was accepted and is now in the queue.
    Enqueued,
    /// The policy (DropNewest) discarded the incoming item.
    DroppedNewest,
    /// The item was refused because of backpressure or because the queue is
    /// at full capacity.
    Rejected,
}
60
61/// Queue occupancy snapshot used for decisions.
62#[non_exhaustive]
63#[derive(Debug, Clone, Copy, PartialEq, Eq)]
64pub struct EdgeOccupancy {
65    /// Number of items currently in the queue.
66    items: usize,
67    /// Estimated bytes currently in the queue.
68    bytes: usize,
69    /// Watermark state derived from capacities.
70    watermark: WatermarkState,
71}
72
73impl EdgeOccupancy {
74    /// Create a new `EdgeOccupancy`.
75    #[inline]
76    pub const fn new(items: usize, bytes: usize, watermark: WatermarkState) -> Self {
77        Self {
78            items,
79            bytes,
80            watermark,
81        }
82    }
83
84    /// Number of items currently in the queue.
85    #[inline]
86    pub fn items(&self) -> &usize {
87        &self.items
88    }
89
90    /// Estimated bytes currently in the queue.
91    #[inline]
92    pub fn bytes(&self) -> &usize {
93        &self.bytes
94    }
95
96    /// Watermark state derived from capacities.
97    #[inline]
98    pub fn watermark(&self) -> &WatermarkState {
99        &self.watermark
100    }
101}
102
103/// A single-producer, single-consumer queue contract.
104///
105/// Edges store [`MessageToken`] handles. Actual message data (header + payload)
106/// resides in a [`MemoryManager`](crate::memory::manager::MemoryManager).
107/// Edge methods that need header metadata (admission, batching, peek) receive
108/// a `&impl HeaderStore` parameter — statically dispatched, no `dyn`.
109pub trait Edge {
110    /// Attempt to push a token onto the queue using the given edge policy.
111    ///
112    /// The implementation uses `headers` to look up the token's message header
113    /// for admission decisions (byte size, QoS, deadline).
114    ///
115    /// For `DropOldest` policies, the caller must pre-evict via
116    /// `get_admission_decision` + `try_pop` before calling `try_push`.
117    /// `try_push` itself never evicts.
118    fn try_push<H: HeaderStore>(
119        &mut self,
120        token: MessageToken,
121        policy: &EdgePolicy,
122        headers: &H,
123    ) -> EnqueueResult;
124
125    /// Attempt to pop the front token from the queue.
126    ///
127    /// Uses `headers` to look up the popped token's byte size for internal
128    /// byte tracking.
129    fn try_pop<H: HeaderStore>(&mut self, headers: &H) -> Result<MessageToken, QueueError>;
130
131    /// Return a snapshot of occupancy used for telemetry and admission.
132    ///
133    /// Uses internal counters (items + total_bytes) — no HeaderStore needed.
134    fn occupancy(&self, policy: &EdgePolicy) -> EdgeOccupancy;
135
136    /// Return `true` if the queue is empty.
137    fn is_empty(&self) -> bool;
138
139    /// Peek at the front token without removing it.
140    fn try_peek(&self) -> Result<MessageToken, QueueError>;
141
142    /// Peek at the token at logical position `index` from the front.
143    ///
144    /// - `index = 0` is equivalent to `try_peek`.
145    /// - Returns `QueueError::Empty` if `index` is out of range.
146    fn try_peek_at(&self, index: usize) -> Result<MessageToken, QueueError>;
147
148    /// Peek the front message header via `HeaderStore` (convenience).
149    ///
150    /// Returns the `HeaderGuard` associated to `H`, which dereferences to
151    /// `MessageHeader`. This allows both single-threaded managers (which can
152    /// return `&MessageHeader`) and concurrent managers (which return a
153    /// guard holding a slot-level lock).
154    ///
155    /// The returned guard keeps the underlying header valid for the lifetime
156    /// of the guard.
157    fn peek_header<'h, H: HeaderStore>(
158        &self,
159        headers: &'h H,
160    ) -> Result<<H as HeaderStore>::HeaderGuard<'h>, QueueError> {
161        let token = self.try_peek()?;
162        headers.peek_header(token).map_err(|_| QueueError::Empty)
163    }
164
165    /// Pop a batch of tokens according to the provided batching policy.
166    ///
167    /// Uses `headers` for delta-t readiness checks (peeks `creation_tick`
168    /// on tokens in the queue via HeaderStore).
169    fn try_pop_batch<H: HeaderStore>(
170        &mut self,
171        policy: &BatchingPolicy,
172        headers: &H,
173    ) -> Result<BatchView<'_, MessageToken>, QueueError>;
174
175    /// Return an `AdmissionDecision` for the given token according to
176    /// `policy` and the current occupancy snapshot.
177    ///
178    /// Pure: does not mutate the queue.
179    fn get_admission_decision<H: HeaderStore>(
180        &self,
181        policy: &EdgePolicy,
182        token: MessageToken,
183        headers: &H,
184    ) -> AdmissionDecision {
185        let occ = self.occupancy(policy);
186        match headers.peek_header(token) {
187            Ok(h) => policy.decide(
188                occ.items,
189                occ.bytes,
190                *h.payload_size_bytes(),
191                *h.deadline_ns(),
192                *h.qos(),
193            ),
194            Err(_) => AdmissionDecision::Reject,
195        }
196    }
197
198    /// Return an `AdmissionDecision` for the given token according to
199    /// `policy` and the current occupancy snapshot.
200    ///
201    /// Pure: does not mutate the queue.
202    fn get_admission_decision_from_message<P: Payload>(
203        &self,
204        policy: &EdgePolicy,
205        message: &Message<P>,
206    ) -> AdmissionDecision {
207        let occ = self.occupancy(policy);
208        let h = message.header();
209        policy.decide(
210            occ.items,
211            occ.bytes,
212            *h.payload_size_bytes(),
213            *h.deadline_ns(),
214            *h.qos(),
215        )
216    }
217}
218
/// Role that a scoped edge handle is requested for.
///
/// Arc-based edges (e.g. `ConcurrentEdge`) ignore this value because their
/// clone is full-duplex. Future lock-free split-handle edges (e.g.
/// `SpscAtomicRing`) are expected to hand back a producer-only or
/// consumer-only handle depending on the kind requested.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EdgeHandleKind {
    /// The handle pushes messages (output side of a node).
    Producer,
    /// The handle pops messages (input side of a node).
    Consumer,
}
231
/// Factory for per-worker edge handles used in concurrent execution.
///
/// `Handle<'a>` is a generic associated type, so an implementation is free to
/// return either:
/// - an owned clone (Arc-based edges such as `ConcurrentEdge`), or
/// - a borrowed split handle (future lock-free edges: a producer or a
///   consumer view).
///
/// The lifetime `'a` is meant to be tied to `std::thread::scope`, which
/// guarantees every handle is dropped before the scope exits.
#[cfg(feature = "std")]
pub trait ScopedEdge: Edge {
    /// Handle type given to each worker. It must be `Edge + Send` so it can
    /// be moved into a scoped thread and used for stepping there.
    type Handle<'a>: Edge + Send + 'a
    where
        Self: 'a;

    /// Produce a scoped handle for one worker thread.
    ///
    /// `kind` states whether the worker will push (producer) or pop
    /// (consumer) through the handle. Arc-based implementations may ignore
    /// it; split-handle implementations use it to pick the matching half.
    fn scoped_handle<'a>(&'a self, kind: EdgeHandleKind) -> Self::Handle<'a>
    where
        Self: 'a;
}
258
/// A no-op queue implementation used for phantom inputs and outputs.
///
/// `NoQueue` acts as a placeholder in the graph where a queue is required by
/// type but no actual buffering or message transfer is desired. All enqueue
/// attempts are rejected, and all dequeue or peek attempts return empty.
///
/// It is a stateless unit struct, so it is `Copy` and can be created with
/// `NoQueue` or `NoQueue::default()`.
///
/// This is primarily useful for:
/// - Phantom or unconnected ports in a graph.
/// - Simplifying generic code that expects a queue type, without allocating
///   unnecessary resources.
/// - Static analysis or testing scenarios where message flow is disabled.
///
/// # Behavior
/// - [`Edge::try_push`] always returns [`EnqueueResult::Rejected`].
/// - [`Edge::try_pop`] and [`Edge::try_pop_batch`] always return
///   [`QueueError::Empty`].
/// - [`Edge::try_peek`] and [`Edge::try_peek_at`] always return
///   [`QueueError::Empty`].
/// - [`Edge::is_empty`] always returns `true`.
/// - [`Edge::occupancy`] always reports zero items, zero bytes, and
///   [`WatermarkState::AtOrAboveHard`] (fully saturated, disallowing admission).
#[derive(Debug, Default, Clone, Copy)]
pub struct NoQueue;
281
282impl Edge for NoQueue {
283    #[inline]
284    fn try_push<H: HeaderStore>(
285        &mut self,
286        _token: MessageToken,
287        _policy: &EdgePolicy,
288        _headers: &H,
289    ) -> EnqueueResult {
290        EnqueueResult::Rejected
291    }
292
293    #[inline]
294    fn try_pop<H: HeaderStore>(&mut self, _headers: &H) -> Result<MessageToken, QueueError> {
295        Err(QueueError::Empty)
296    }
297
298    #[inline]
299    fn occupancy(&self, _policy: &EdgePolicy) -> EdgeOccupancy {
300        EdgeOccupancy::new(0, 0, WatermarkState::AtOrAboveHard)
301    }
302
303    fn is_empty(&self) -> bool {
304        true
305    }
306
307    fn try_peek(&self) -> Result<MessageToken, QueueError> {
308        Err(QueueError::Empty)
309    }
310
311    fn try_peek_at(&self, _index: usize) -> Result<MessageToken, QueueError> {
312        Err(QueueError::Empty)
313    }
314
315    #[inline]
316    fn try_pop_batch<H: HeaderStore>(
317        &mut self,
318        _policy: &BatchingPolicy,
319        _headers: &H,
320    ) -> Result<BatchView<'_, MessageToken>, QueueError> {
321        Err(QueueError::Empty)
322    }
323}