1use crate::{
4 edge::{Edge, EdgeOccupancy, EnqueueResult},
5 errors::QueueError,
6 policy::EdgePolicy,
7 prelude::{BatchView, HeaderStore},
8 types::{EdgeIndex, MessageToken, PortId},
9};
10
11#[non_exhaustive]
20#[derive(Debug)]
21pub struct EdgeLink<Q>
22where
23 Q: Edge,
24{
25 queue: Q,
27
28 id: EdgeIndex,
30
31 upstream_port: PortId,
33
34 downstream_port: PortId,
36
37 policy: EdgePolicy,
39
40 name: Option<&'static str>,
42}
43
44impl<Q> EdgeLink<Q>
45where
46 Q: Edge,
47{
48 #[inline]
50 pub fn new(
51 queue: Q,
52 id: EdgeIndex,
53 upstream_port: PortId,
54 downstream_port: PortId,
55 policy: EdgePolicy,
56 name: Option<&'static str>,
57 ) -> Self {
58 Self {
59 queue,
60 id,
61 upstream_port,
62 downstream_port,
63 policy,
64 name,
65 }
66 }
67
68 #[inline]
70 pub fn queue(&self) -> &Q {
71 &self.queue
72 }
73
74 #[inline]
76 pub fn queue_mut(&mut self) -> &mut Q {
77 &mut self.queue
78 }
79
80 #[inline]
82 pub fn id(&self) -> &EdgeIndex {
83 &self.id
84 }
85
86 #[inline]
88 pub fn upstream_port(&self) -> &PortId {
89 &self.upstream_port
90 }
91
92 #[inline]
94 pub fn downstream_port(&self) -> &PortId {
95 &self.downstream_port
96 }
97
98 #[inline]
100 pub fn policy(&self) -> &EdgePolicy {
101 &self.policy
102 }
103
104 #[inline]
106 pub fn name(&self) -> Option<&'static str> {
107 self.name
108 }
109
110 #[inline]
112 pub fn descriptor(&self) -> EdgeDescriptor {
113 EdgeDescriptor {
114 id: self.id,
115 upstream: self.upstream_port,
116 downstream: self.downstream_port,
117 name: self.name,
118 }
119 }
120}
121
122impl<Q> Edge for EdgeLink<Q>
123where
124 Q: Edge,
125{
126 fn try_push<H: HeaderStore>(
127 &mut self,
128 token: MessageToken,
129 policy: &EdgePolicy,
130 headers: &H,
131 ) -> EnqueueResult {
132 self.queue.try_push(token, policy, headers)
133 }
134
135 fn try_pop<H: HeaderStore>(&mut self, headers: &H) -> Result<MessageToken, QueueError> {
136 self.queue.try_pop(headers)
137 }
138
139 fn occupancy(&self, policy: &EdgePolicy) -> EdgeOccupancy {
140 self.queue.occupancy(policy)
141 }
142
143 fn is_empty(&self) -> bool {
144 self.queue.is_empty()
145 }
146
147 fn try_peek(&self) -> Result<MessageToken, QueueError> {
148 self.queue.try_peek()
149 }
150
151 fn try_peek_at(&self, index: usize) -> Result<MessageToken, QueueError> {
152 self.queue.try_peek_at(index)
153 }
154
155 fn try_pop_batch<H: HeaderStore>(
156 &mut self,
157 policy: &crate::policy::BatchingPolicy,
158 headers: &H,
159 ) -> Result<BatchView<'_, MessageToken>, QueueError> {
160 self.queue.try_pop_batch(policy, headers)
161 }
162}
163
164#[non_exhaustive]
166#[derive(Debug, Clone, Copy, PartialEq, Eq)]
167pub struct EdgeDescriptor {
168 id: EdgeIndex,
170 upstream: PortId,
172 downstream: PortId,
174 name: Option<&'static str>,
176}
177
178impl EdgeDescriptor {
179 #[inline]
181 pub fn new(
182 id: EdgeIndex,
183 upstream: PortId,
184 downstream: PortId,
185 name: Option<&'static str>,
186 ) -> Self {
187 Self {
188 id,
189 upstream,
190 downstream,
191 name,
192 }
193 }
194
195 #[inline]
197 pub fn id(&self) -> &EdgeIndex {
198 &self.id
199 }
200
201 #[inline]
203 pub fn upstream(&self) -> &PortId {
204 &self.upstream
205 }
206
207 #[inline]
209 pub fn downstream(&self) -> &PortId {
210 &self.downstream
211 }
212
213 #[inline]
215 pub fn name(&self) -> Option<&'static str> {
216 self.name
217 }
218}
219
#[cfg(test)]
mod tests {
    use super::*;

    use crate::edge::bench::TestSpscRingBuf;
    use crate::edge::{Edge, EnqueueResult};
    use crate::errors::QueueError;
    use crate::memory::manager::MemoryManager;
    use crate::memory::static_manager::StaticMemoryManager;
    use crate::message::{Message, MessageHeader};
    use crate::policy::{AdmissionPolicy, EdgePolicy, OverBudgetAction, QueueCaps};
    use crate::prelude::{create_test_tensor_filled_with, TestTensor};
    use crate::types::{EdgeIndex, NodeIndex, PortId, PortIndex, Ticks};

    /// Policy used for the link under test and for every queue call below.
    const POLICY: EdgePolicy = EdgePolicy::new(
        QueueCaps::new(8, 6, None, None),
        AdmissionPolicy::DropNewest,
        OverBudgetAction::Drop,
    );

    /// Const-generic size argument given to the `StaticMemoryManager` in these tests.
    const MGR_DEPTH: usize = 32;

    /// Builds a test message whose header carries `tick` as its creation tick.
    fn make_msg_tensor(tick: u64) -> Message<TestTensor> {
        let mut header = MessageHeader::empty();
        header.set_creation_tick(Ticks::new(tick));
        let payload = create_test_tensor_filled_with(0);
        Message::new(header, payload)
    }

    /// Builds the link fixture: edge 0 from node 0 / port 0 to node 1 / port 0.
    fn make_link() -> EdgeLink<TestSpscRingBuf<16>> {
        EdgeLink::new(
            TestSpscRingBuf::<16>::new(),
            EdgeIndex::new(0),
            PortId::new(NodeIndex::new(0), PortIndex::new(0)),
            PortId::new(NodeIndex::new(1), PortIndex::new(0)),
            POLICY,
            Some("edge:hi"),
        )
    }

    // Run the shared edge contract suite against the link wrapper.
    crate::run_edge_contract_tests!(edge_link_contract, || make_link());

    #[test]
    fn edge_link_metadata_accessors_and_descriptor() {
        let expected_id = EdgeIndex::new(0);
        let expected_upstream = PortId::new(NodeIndex::new(0), PortIndex::new(0));
        let expected_downstream = PortId::new(NodeIndex::new(1), PortIndex::new(0));

        let link = make_link();

        // Accessors on the link itself.
        assert_eq!(link.id(), &expected_id);
        assert_eq!(link.upstream_port(), &expected_upstream);
        assert_eq!(link.downstream_port(), &expected_downstream);
        assert_eq!(link.policy(), &POLICY);
        assert_eq!(link.name(), Some("edge:hi"));

        // The descriptor must mirror the same metadata.
        let descriptor = link.descriptor();
        assert_eq!(descriptor.id(), &expected_id);
        assert_eq!(descriptor.upstream(), &expected_upstream);
        assert_eq!(descriptor.downstream(), &expected_downstream);
        assert_eq!(descriptor.name(), Some("edge:hi"));
    }

    #[test]
    fn edge_link_forwards_to_inner_queue() {
        let mut link = make_link();
        let mut manager = StaticMemoryManager::<TestTensor, MGR_DEPTH>::new();

        // Store one message and push its token through the link.
        let message = make_msg_tensor(42);
        let token = manager.store(message).expect("store");
        assert_eq!(
            link.try_push(token, &POLICY, &manager),
            EnqueueResult::Enqueued
        );

        // Peeking yields the same token, and its header is still readable.
        let peeked = link.try_peek().expect("peek");
        assert_eq!(peeked, token);
        let peeked_header = manager.peek_header(peeked).expect("peek header");
        assert_eq!(*peeked_header.creation_tick(), Ticks::new(42));

        // Popping yields the same token again.
        let popped = link.try_pop(&manager).expect("pop");
        assert_eq!(popped, token);
        let popped_header = manager.peek_header(popped).expect("popped header");
        assert_eq!(*popped_header.creation_tick(), Ticks::new(42));

        // Nothing is left, so a second pop reports an empty queue.
        assert!(link.is_empty());
        assert!(matches!(link.try_pop(&manager), Err(QueueError::Empty)));
    }

    #[test]
    fn edge_link_occupancy_delegates() {
        let mut link = make_link();
        let mut manager = StaticMemoryManager::<TestTensor, MGR_DEPTH>::new();

        let before = link.occupancy(&POLICY);
        assert_eq!(*before.items(), 0usize);

        let first = manager.store(make_msg_tensor(1)).expect("store");
        let second = manager.store(make_msg_tensor(2)).expect("store");
        assert_eq!(
            link.try_push(first, &POLICY, &manager),
            EnqueueResult::Enqueued
        );
        assert_eq!(
            link.try_push(second, &POLICY, &manager),
            EnqueueResult::Enqueued
        );

        let after = link.occupancy(&POLICY);
        assert_eq!(*after.items(), 2usize);
    }

    #[test]
    fn edge_link_queue_accessor() {
        let mut link = make_link();
        let mut manager = StaticMemoryManager::<TestTensor, MGR_DEPTH>::new();

        let token = manager.store(make_msg_tensor(7)).expect("store");
        assert_eq!(
            link.try_push(token, &POLICY, &manager),
            EnqueueResult::Enqueued
        );

        // The shared accessor sees the pushed item.
        assert!(!link.queue().is_empty());

        // The exclusive accessor can drain it directly from the inner queue.
        let popped = link
            .queue_mut()
            .try_pop(&manager)
            .expect("pop via queue_mut");
        assert_eq!(popped, token);
        assert!(link.queue().is_empty());
    }
}