Skip to main content

limen_core/edge/
link.rs

1//! Edge graph-link and descriptor types.
2
3use crate::{
4    edge::{Edge, EdgeOccupancy, EnqueueResult},
5    errors::QueueError,
6    policy::EdgePolicy,
7    prelude::{BatchView, HeaderStore},
8    types::{EdgeIndex, MessageToken, PortId},
9};
10
11/// A lightweight descriptor that **links to** the concrete queue instance
12/// backing a graph edge, along with its routing and policy metadata.
13///
14/// Unlike a pure descriptor, `EdgeLink` **owns** the queue
15/// implementation. This keeps it zero-alloc and allows direct, policy-aware
16/// operations on the buffer.
17///
18/// - `Q`: concrete queue type implementing `Edge`
19#[non_exhaustive]
20#[derive(Debug)]
21pub struct EdgeLink<Q>
22where
23    Q: Edge,
24{
25    /// Owned handle to the concrete queue instance for this edge.
26    queue: Q,
27
28    /// Unique identifier of this edge in the graph.
29    id: EdgeIndex,
30
31    /// Upstream node's output port.
32    upstream_port: PortId,
33
34    /// Downstream node's input port.
35    downstream_port: PortId,
36
37    /// Admission and scheduling policy applied to this edge.
38    policy: EdgePolicy,
39
40    /// Optional static name used for diagnostics or graph tooling.
41    name: Option<&'static str>,
42}
43
44impl<Q> EdgeLink<Q>
45where
46    Q: Edge,
47{
48    /// Construct a new `EdgeLink` that owns the given queue and records its metadata.
49    #[inline]
50    pub fn new(
51        queue: Q,
52        id: EdgeIndex,
53        upstream_port: PortId,
54        downstream_port: PortId,
55        policy: EdgePolicy,
56        name: Option<&'static str>,
57    ) -> Self {
58        Self {
59            queue,
60            id,
61            upstream_port,
62            downstream_port,
63            policy,
64            name,
65        }
66    }
67
68    /// Get a reference to the inner queue.
69    #[inline]
70    pub fn queue(&self) -> &Q {
71        &self.queue
72    }
73
74    /// Get a mutable reference to the inner queue.
75    #[inline]
76    pub fn queue_mut(&mut self) -> &mut Q {
77        &mut self.queue
78    }
79
80    /// Get the unique identifier of this edge.
81    #[inline]
82    pub fn id(&self) -> &EdgeIndex {
83        &self.id
84    }
85
86    /// Get the upstream output port index.
87    #[inline]
88    pub fn upstream_port(&self) -> &PortId {
89        &self.upstream_port
90    }
91
92    /// Get the downstream input port index.
93    #[inline]
94    pub fn downstream_port(&self) -> &PortId {
95        &self.downstream_port
96    }
97
98    /// Get the edge policy applied to this queue.
99    #[inline]
100    pub fn policy(&self) -> &EdgePolicy {
101        &self.policy
102    }
103
104    /// Get the optional static name of this queue link.
105    #[inline]
106    pub fn name(&self) -> Option<&'static str> {
107        self.name
108    }
109
110    /// Return the `EdgeDescriptor` for this `EdgeLink`.
111    #[inline]
112    pub fn descriptor(&self) -> EdgeDescriptor {
113        EdgeDescriptor {
114            id: self.id,
115            upstream: self.upstream_port,
116            downstream: self.downstream_port,
117            name: self.name,
118        }
119    }
120}
121
122impl<Q> Edge for EdgeLink<Q>
123where
124    Q: Edge,
125{
126    fn try_push<H: HeaderStore>(
127        &mut self,
128        token: MessageToken,
129        policy: &EdgePolicy,
130        headers: &H,
131    ) -> EnqueueResult {
132        self.queue.try_push(token, policy, headers)
133    }
134
135    fn try_pop<H: HeaderStore>(&mut self, headers: &H) -> Result<MessageToken, QueueError> {
136        self.queue.try_pop(headers)
137    }
138
139    fn occupancy(&self, policy: &EdgePolicy) -> EdgeOccupancy {
140        self.queue.occupancy(policy)
141    }
142
143    fn is_empty(&self) -> bool {
144        self.queue.is_empty()
145    }
146
147    fn try_peek(&self) -> Result<MessageToken, QueueError> {
148        self.queue.try_peek()
149    }
150
151    fn try_peek_at(&self, index: usize) -> Result<MessageToken, QueueError> {
152        self.queue.try_peek_at(index)
153    }
154
155    fn try_pop_batch<H: HeaderStore>(
156        &mut self,
157        policy: &crate::policy::BatchingPolicy,
158        headers: &H,
159    ) -> Result<BatchView<'_, MessageToken>, QueueError> {
160        self.queue.try_pop_batch(policy, headers)
161    }
162}
163
164/// An edge couples one output port to one input port with an admission policy.
165#[non_exhaustive]
166#[derive(Debug, Clone, Copy, PartialEq, Eq)]
167pub struct EdgeDescriptor {
168    /// Unique identifier of this edge in the graph.
169    id: EdgeIndex,
170    /// Identifier of the upstream node / port.
171    upstream: PortId,
172    /// Identifier of the downstream node / port.
173    downstream: PortId,
174    /// Optional static name (for diagnostics or graph tooling).
175    name: Option<&'static str>,
176}
177
178impl EdgeDescriptor {
179    /// Construct a new `EdgeDescriptor`.
180    #[inline]
181    pub fn new(
182        id: EdgeIndex,
183        upstream: PortId,
184        downstream: PortId,
185        name: Option<&'static str>,
186    ) -> Self {
187        Self {
188            id,
189            upstream,
190            downstream,
191            name,
192        }
193    }
194
195    /// Unique identifier of this edge in the graph.
196    #[inline]
197    pub fn id(&self) -> &EdgeIndex {
198        &self.id
199    }
200
201    /// Identifier of the upstream node / port.
202    #[inline]
203    pub fn upstream(&self) -> &PortId {
204        &self.upstream
205    }
206
207    /// Identifier of the downstream node / port.
208    #[inline]
209    pub fn downstream(&self) -> &PortId {
210        &self.downstream
211    }
212
213    /// Optional static name (for diagnostics or graph tooling).
214    #[inline]
215    pub fn name(&self) -> Option<&'static str> {
216        self.name
217    }
218}
219
#[cfg(test)]
mod tests {
    use super::*;

    use crate::edge::bench::TestSpscRingBuf;
    use crate::edge::{Edge, EnqueueResult};
    use crate::errors::QueueError;
    use crate::memory::manager::MemoryManager;
    use crate::memory::static_manager::StaticMemoryManager;
    use crate::message::{Message, MessageHeader};
    use crate::policy::{AdmissionPolicy, EdgePolicy, OverBudgetAction, QueueCaps};
    use crate::prelude::{create_test_tensor_filled_with, TestTensor};
    use crate::types::{EdgeIndex, NodeIndex, PortId, PortIndex, Ticks};

    /// Slot count of the memory manager used by the tests below.
    const MGR_DEPTH: usize = 32;

    /// Edge policy shared by every test in this module.
    const POLICY: EdgePolicy = EdgePolicy::new(
        QueueCaps::new(8, 6, None, None),
        AdmissionPolicy::DropNewest,
        OverBudgetAction::Drop,
    );

    /// Build a tensor message whose header carries `tick` as its creation tick.
    fn make_msg_tensor(tick: u64) -> Message<TestTensor> {
        let mut header = MessageHeader::empty();
        header.set_creation_tick(Ticks::new(tick));
        Message::new(header, create_test_tensor_filled_with(0))
    }

    /// Build the link under test: edge 0, from node 0 / port 0 to
    /// node 1 / port 0, named "edge:hi", using `POLICY`.
    fn make_link() -> EdgeLink<TestSpscRingBuf<16>> {
        EdgeLink::new(
            TestSpscRingBuf::<16>::new(),
            EdgeIndex::new(0),
            PortId::new(NodeIndex::new(0), PortIndex::new(0)),
            PortId::new(NodeIndex::new(1), PortIndex::new(0)),
            POLICY,
            Some("edge:hi"),
        )
    }

    // Exercise the shared `Edge` contract suite with `EdgeLink` as the subject.
    crate::run_edge_contract_tests!(edge_link_contract, || make_link());

    /// Accessors and `descriptor()` report exactly what `make_link` supplied.
    #[test]
    fn edge_link_metadata_accessors_and_descriptor() {
        let link = make_link();

        let expected_id = EdgeIndex::new(0);
        let expected_up = PortId::new(NodeIndex::new(0), PortIndex::new(0));
        let expected_down = PortId::new(NodeIndex::new(1), PortIndex::new(0));

        // Accessors on the link itself.
        assert_eq!(link.id(), &expected_id);
        assert_eq!(link.upstream_port(), &expected_up);
        assert_eq!(link.downstream_port(), &expected_down);
        assert_eq!(link.policy(), &POLICY);
        assert_eq!(link.name(), Some("edge:hi"));

        // The descriptor derived from the link carries the same metadata.
        let descriptor = link.descriptor();
        assert_eq!(descriptor.id(), &expected_id);
        assert_eq!(descriptor.upstream(), &expected_up);
        assert_eq!(descriptor.downstream(), &expected_down);
        assert_eq!(descriptor.name(), Some("edge:hi"));
    }

    /// Push, peek, and pop through the link behave like the inner queue.
    #[test]
    fn edge_link_forwards_to_inner_queue() {
        let mut link = make_link();
        let mut mgr = StaticMemoryManager::<TestTensor, MGR_DEPTH>::new();

        // Enqueue one token through the link.
        let token = mgr.store(make_msg_tensor(42)).expect("store");
        let pushed = link.try_push(token, &POLICY, &mgr);
        assert_eq!(pushed, EnqueueResult::Enqueued);

        // Peeking yields the same token, and its header is still readable.
        let peeked = link.try_peek().expect("peek");
        assert_eq!(peeked, token);
        let peeked_header = mgr.peek_header(peeked).expect("peek header");
        assert_eq!(*peeked_header.creation_tick(), Ticks::new(42));

        // Popping yields the same token again.
        let popped = link.try_pop(&mgr).expect("pop");
        assert_eq!(popped, token);
        let popped_header = mgr.peek_header(popped).expect("popped header");
        assert_eq!(*popped_header.creation_tick(), Ticks::new(42));

        // The link is empty afterwards and a further pop reports `Empty`.
        assert!(link.is_empty());
        assert!(matches!(link.try_pop(&mgr), Err(QueueError::Empty)));
    }

    /// `occupancy` reflects the number of items pushed through the link.
    #[test]
    fn edge_link_occupancy_delegates() {
        let mut link = make_link();
        let mut mgr = StaticMemoryManager::<TestTensor, MGR_DEPTH>::new();

        let before = link.occupancy(&POLICY);
        assert_eq!(*before.items(), 0usize);

        let first = mgr.store(make_msg_tensor(1)).expect("store");
        let second = mgr.store(make_msg_tensor(2)).expect("store");
        assert_eq!(link.try_push(first, &POLICY, &mgr), EnqueueResult::Enqueued);
        assert_eq!(link.try_push(second, &POLICY, &mgr), EnqueueResult::Enqueued);

        let after = link.occupancy(&POLICY);
        assert_eq!(*after.items(), 2usize);
    }

    /// `queue()` / `queue_mut()` expose the same queue the link pushes into.
    #[test]
    fn edge_link_queue_accessor() {
        let mut link = make_link();
        let mut mgr = StaticMemoryManager::<TestTensor, MGR_DEPTH>::new();

        // Enqueue through the link, then observe via the shared accessor.
        let token = mgr.store(make_msg_tensor(7)).expect("store");
        let pushed = link.try_push(token, &POLICY, &mgr);
        assert_eq!(pushed, EnqueueResult::Enqueued);

        assert!(!link.queue().is_empty());

        // Dequeue directly from the inner queue via the mutable accessor.
        let inner = link.queue_mut();
        let popped = inner.try_pop(&mgr).expect("pop via queue_mut");
        assert_eq!(popped, token);
        assert!(link.queue().is_empty());
    }
}
356}