1use super::*;
4
5use crate::compute::{BackendCapabilities, ComputeBackend, ComputeModel, ModelMetadata};
6use crate::errors::{InferenceError, InferenceErrorKind};
7use crate::memory::{MemoryClass, PlacementAcceptance};
8use crate::message::Message;
9use crate::message::{MessageFlags, MessageHeader};
10use crate::node::model::InferenceModel;
11use crate::node::sink::Sink;
12use crate::node::source::Source;
13use crate::prelude::{create_test_tensor_from_array, TestTensor, TEST_TENSOR_BYTE_COUNT};
14use crate::types::{DeadlineNs, QoSClass, SequenceNumber, TraceId};
15
16#[cfg(feature = "std")]
17use crate::node::source::probe::{SourceIngressProbe, SourceIngressUpdater};
18
19use core::fmt::Write;
20
/// Busy-waits for a pseudo-random number of microseconds in
/// `1..=max_delay_microseconds`, advancing `random_state` with one xorshift32
/// step. A `max_delay_microseconds` of 0 is a no-op (state untouched).
///
/// The wait is a calibrated spin loop (no clock reads), so the actual wall
/// time is only approximate — fine for injecting jitter into tests.
fn random_test_node_delay(random_state: &mut u32, max_delay_microseconds: u32) {
    if max_delay_microseconds == 0 {
        return;
    }

    // xorshift32 is stuck at zero forever if seeded with zero.
    if *random_state == 0 {
        *random_state = 1;
    }

    // One xorshift32 step (Marsaglia's 13/17/5 triple).
    let mut x = *random_state;
    x ^= x << 13;
    x ^= x >> 17;
    x ^= x << 5;
    *random_state = x;

    let delay_microseconds = (x % max_delay_microseconds) + 1;

    // Rough calibration: assume a 2 GHz core and ~8 cycles per spin-loop
    // iteration; clamp to at least one iteration per microsecond.
    const ASSUMED_CPU_FREQUENCY_HERTZ: u32 = 2_000_000_000;
    const ESTIMATED_CYCLES_PER_ITERATION: u32 = 8;
    let cycles_per_microsecond = ASSUMED_CPU_FREQUENCY_HERTZ / 1_000_000;
    let iterations_per_microsecond =
        (cycles_per_microsecond / ESTIMATED_CYCLES_PER_ITERATION).max(1);

    let mut remaining = delay_microseconds.saturating_mul(iterations_per_microsecond);
    while remaining > 0 {
        core::hint::spin_loop();
        remaining -= 1;
    }
}
66
67#[inline]
81fn create_test_tensor_from_counter(counter: u32) -> TestTensor {
82 let counter_modulo_nine_digits = counter % 1_000_000_000;
83
84 let digit_0 = (counter_modulo_nine_digits / 100_000_000) % 10;
85 let digit_1 = (counter_modulo_nine_digits / 10_000_000) % 10;
86 let digit_2 = (counter_modulo_nine_digits / 1_000_000) % 10;
87 let digit_3 = (counter_modulo_nine_digits / 100_000) % 10;
88 let digit_4 = (counter_modulo_nine_digits / 10_000) % 10;
89 let digit_5 = (counter_modulo_nine_digits / 1_000) % 10;
90 let digit_6 = (counter_modulo_nine_digits / 100) % 10;
91 let digit_7 = (counter_modulo_nine_digits / 10) % 10;
92 let digit_8 = counter_modulo_nine_digits % 10;
93
94 create_test_tensor_from_array([
95 [digit_0, digit_1, digit_2],
96 [digit_3, digit_4, digit_5],
97 [digit_6, digit_7, digit_8],
98 ])
99}
100
/// Test source node emitting `Message<TestTensor>` payloads that encode a
/// monotonically increasing counter.
///
/// Produced messages are staged in a fixed-capacity ring buffer (the
/// "backlog") of `BACKLOG_CAP` slots before being popped by `try_produce`.
pub struct TestCounterSourceTensor<Clock, const BACKLOG_CAP: usize>
where
    Clock: PlatformClock,
{
    // Clock used to stamp message-creation ticks.
    clock: Clock,

    // Counter value encoded into the next produced tensor payload.
    next_counter_value_to_emit: u32,

    // Header fields applied to every emitted message.
    trace_id: TraceId,
    next_sequence: SequenceNumber,
    deadline_ns: Option<DeadlineNs>,
    qos: QoSClass,
    flags: MessageFlags,

    // Node-level capabilities/policies reported through the `Source` trait.
    node_capabilities: NodeCapabilities,
    node_policy: NodePolicy,
    output_placement_acceptance: [PlacementAcceptance; 1],
    ingress_policy: EdgePolicy,

    // Ring-buffer state: `backlog_head` is the oldest occupied slot,
    // `backlog_len` the number of occupied slots, `backlog_bytes` the total
    // payload bytes currently buffered.
    backlog: [Option<Message<TestTensor>>; BACKLOG_CAP],
    backlog_head: usize,
    backlog_len: usize,
    backlog_bytes: usize,

    // Optional externally injected ingress probe (std-only). When set,
    // `ingress_occupancy` reads from the probe instead of the backlog.
    #[cfg(feature = "std")]
    ingress_probe: Option<SourceIngressProbe>,
    // NOTE(review): `ingress_updater` is stored via `with_probe` but never
    // read in this file — presumably kept alive for the probe's lifetime;
    // confirm against `SourceIngressUpdater`'s docs.
    #[cfg(feature = "std")]
    ingress_updater: Option<SourceIngressUpdater>,
}
141
142impl<Clock, const BACKLOG_CAP: usize> TestCounterSourceTensor<Clock, BACKLOG_CAP>
143where
144 Clock: PlatformClock,
145{
146 #[allow(clippy::too_many_arguments)]
148 pub const fn new(
149 clock: Clock,
150 starting_value_inclusive: u32,
151 trace_id: TraceId,
152 starting_sequence: SequenceNumber,
153 deadline_ns: Option<DeadlineNs>,
154 qos: QoSClass,
155 flags: MessageFlags,
156 node_capabilities: NodeCapabilities,
157 node_policy: NodePolicy,
158 output_placement_acceptance: [PlacementAcceptance; 1],
159 ingress_policy: EdgePolicy,
160 ) -> Self {
161 if BACKLOG_CAP < ingress_policy.caps.max_items {
164 panic!(
165 "TestCounterSourceTensor: backlog capacity must be >= ingress_policy.caps.max_items"
166 );
167 }
168
169 Self {
170 clock,
171 next_counter_value_to_emit: starting_value_inclusive,
172 trace_id,
173 next_sequence: starting_sequence,
174 deadline_ns,
175 qos,
176 flags,
177 node_capabilities,
178 node_policy,
179 output_placement_acceptance,
180 ingress_policy,
181 backlog: [None; BACKLOG_CAP],
182 backlog_head: 0usize,
183 backlog_len: 0usize,
184 backlog_bytes: 0usize,
185 #[cfg(feature = "std")]
186 ingress_probe: None,
187 #[cfg(feature = "std")]
188 ingress_updater: None,
189 }
190 }
191
192 #[cfg(feature = "std")]
194 pub fn with_probe(mut self, probe: SourceIngressProbe, updater: SourceIngressUpdater) -> Self {
195 self.ingress_probe = Some(probe);
196 self.ingress_updater = Some(updater);
197 self
198 }
199
200 #[inline]
201 fn make_message(&self) -> Message<TestTensor> {
202 Message::new(
203 MessageHeader::new(
204 self.trace_id,
205 self.next_sequence,
206 self.clock.now_ticks(),
207 self.deadline_ns,
208 self.qos,
209 TEST_TENSOR_BYTE_COUNT,
210 self.flags,
211 MemoryClass::Host,
212 ),
213 create_test_tensor_from_counter(self.next_counter_value_to_emit),
214 )
215 }
216
217 #[inline]
219 pub fn produce_n_items_in_backlog(&mut self, n: usize) {
220 let mut to_add = n;
223 while to_add > 0 && self.backlog_len < BACKLOG_CAP {
224 let tail = (self.backlog_head + self.backlog_len) % BACKLOG_CAP;
225 self.backlog[tail] = Some(self.make_message());
226
227 self.backlog_len += 1;
228 to_add = to_add.saturating_sub(1);
229 self.backlog_bytes = self.backlog_len * TEST_TENSOR_BYTE_COUNT;
230
231 self.next_counter_value_to_emit = self.next_counter_value_to_emit.wrapping_add(1);
232 self.next_sequence = SequenceNumber::new(self.next_sequence.as_u64().wrapping_add(1));
233 }
234 }
235
236 #[inline]
241 fn try_pop_from_backlog(&mut self) -> Option<Message<TestTensor>> {
242 if self.backlog_len == 0 {
243 return None;
244 }
245
246 let head_index = self.backlog_head;
247
248 let message = self.backlog[head_index].take();
250
251 self.backlog_head = (self.backlog_head + 1) % BACKLOG_CAP;
253 self.backlog_len = self.backlog_len.saturating_sub(1);
254
255 self.backlog_bytes = self.backlog_len * TEST_TENSOR_BYTE_COUNT;
257
258 message
259 }
260
261 #[inline]
262 fn random_backlog_add_count(&self) -> usize {
263 let now_ticks_u64 = *self.clock.now_ticks().as_u64();
266 if (now_ticks_u64 & 1) == 0 {
267 1
268 } else {
269 2
270 }
271 }
272}
273
274impl<Clock, const BACKLOG_CAP: usize> Source<TestTensor, 1>
275 for TestCounterSourceTensor<Clock, BACKLOG_CAP>
276where
277 Clock: PlatformClock,
278{
279 type Error = core::convert::Infallible;
280
281 #[inline]
282 fn open(&mut self) -> Result<(), Self::Error> {
283 Ok(())
284 }
285
286 #[inline]
287 fn try_produce(&mut self) -> Option<(usize, Message<TestTensor>)> {
288 #[cfg(feature = "std")]
290 let mut random_seed: u32 = {
291 let now = std::time::SystemTime::now()
292 .duration_since(std::time::UNIX_EPOCH)
293 .unwrap_or_else(|e| e.duration());
294 (now.as_nanos() & 0xFFFF_FFFF) as u32
295 };
296 #[cfg(not(feature = "std"))]
297 let mut random_seed = 1;
298 random_test_node_delay(&mut random_seed, 250);
299
300 self.produce_n_items_in_backlog(self.random_backlog_add_count());
302
303 self.try_pop_from_backlog().map(|message| (0, message))
305 }
306
307 #[inline]
308 fn ingress_occupancy(&self) -> EdgeOccupancy {
309 #[cfg(feature = "std")]
310 if let Some(probe) = &self.ingress_probe {
311 return probe.occupancy(&self.ingress_policy());
312 }
313
314 let items = self.backlog_len;
316 let bytes = self.backlog_bytes;
317 EdgeOccupancy::new(items, bytes, self.ingress_policy.watermark(items, bytes))
318 }
319
320 #[inline]
321 fn output_acceptance(&self) -> [PlacementAcceptance; 1] {
322 self.output_placement_acceptance
323 }
324
325 #[inline]
326 fn capabilities(&self) -> NodeCapabilities {
327 self.node_capabilities
328 }
329
330 #[inline]
331 fn policy(&self) -> NodePolicy {
332 self.node_policy
333 }
334
335 fn ingress_policy(&self) -> EdgePolicy {
336 self.ingress_policy
337 }
338
339 #[inline]
343 fn peek_ingress_creation_tick(&self, item_index: usize) -> Option<u64> {
344 if (self.backlog_len == 0) || (item_index >= self.backlog_len) {
346 return None;
347 }
348
349 Some(
350 *self.backlog[item_index]
351 .unwrap()
352 .header()
353 .creation_tick()
354 .as_u64(),
355 )
356 }
357}
358
359pub struct TestTensorModel;
365
366impl ComputeModel<TestTensor, TestTensor> for TestTensorModel {
367 #[inline]
368 fn init(&mut self) -> Result<(), InferenceError> {
369 Ok(())
370 }
371
372 #[inline]
373 fn infer_one(&mut self, inp: &TestTensor, out: &mut TestTensor) -> Result<(), InferenceError> {
374 #[cfg(feature = "std")]
375 let mut random_seed: u32 = {
376 let now = std::time::SystemTime::now()
377 .duration_since(std::time::UNIX_EPOCH)
378 .unwrap_or_else(|e| e.duration());
379 (now.as_nanos() & 0xFFFF_FFFF) as u32
380 };
381 #[cfg(not(feature = "std"))]
382 let mut random_seed = 1;
383 random_test_node_delay(&mut random_seed, 500);
384
385 *out = *inp;
386 Ok(())
387 }
388
389 #[inline]
390 fn infer_batch(
391 &mut self,
392 inputs: crate::message::batch::Batch<'_, TestTensor>,
393 outputs: &mut [TestTensor],
394 ) -> Result<(), InferenceError> {
395 #[cfg(feature = "std")]
396 let mut random_seed: u32 = {
397 let now = std::time::SystemTime::now()
398 .duration_since(std::time::UNIX_EPOCH)
399 .unwrap_or_else(|e| e.duration());
400 (now.as_nanos() & 0xFFFF_FFFF) as u32
401 };
402 #[cfg(not(feature = "std"))]
403 let mut random_seed = 1;
404 random_test_node_delay(&mut random_seed, 1000);
405
406 let in_msgs = inputs.messages();
407 let in_len = in_msgs.len();
408
409 if outputs.len() < in_len {
410 return Err(InferenceError::new(InferenceErrorKind::ExecutionFailed, 0));
411 }
412
413 for (o, m) in outputs.iter_mut().zip(in_msgs.iter()) {
415 *o = *m.payload();
416 }
417
418 Ok(())
419 }
420
421 #[inline]
422 fn drain(&mut self) -> Result<(), InferenceError> {
423 Ok(())
424 }
425
426 #[inline]
427 fn reset(&mut self) -> Result<(), InferenceError> {
428 Ok(())
429 }
430
431 #[inline]
432 fn metadata(&self) -> ModelMetadata {
433 ModelMetadata::new(MemoryClass::Host, MemoryClass::Host, None, None)
434 }
435}
436
/// Trivial compute backend whose only model is the identity [`TestTensorModel`].
#[derive(Clone, Copy, Debug, Default)]
pub struct TestTensorBackend;
440
441impl ComputeBackend<TestTensor, TestTensor> for TestTensorBackend {
442 type Model = TestTensorModel;
443 type Error = InferenceError;
444
445 type ModelDescriptor<'d> = ();
447
448 #[inline]
449 fn capabilities(&self) -> BackendCapabilities {
450 BackendCapabilities::new(false, Some(usize::MAX), 0)
451 }
452
453 #[inline]
454 fn load_model<'d>(&self, _desc: Self::ModelDescriptor<'d>) -> Result<Self::Model, Self::Error> {
455 Ok(TestTensorModel)
456 }
457}
458
/// Identity model node over `TestTensor`, backed by [`TestTensorBackend`].
pub type TestIdentityModelNodeTensor<const MAX_BATCH: usize> =
    InferenceModel<TestTensorBackend, TestTensor, TestTensor, MAX_BATCH>;
462
463impl<const MAX_BATCH: usize> TestIdentityModelNodeTensor<MAX_BATCH> {
464 #[inline]
466 pub fn new_identity(
467 node_capabilities: NodeCapabilities,
468 node_policy: NodePolicy,
469 input_placement_acceptance: [PlacementAcceptance; 1],
470 output_placement_acceptance: [PlacementAcceptance; 1],
471 ) -> Result<Self, InferenceError> {
472 let backend = TestTensorBackend;
473 InferenceModel::new(
474 backend,
475 (),
476 node_policy,
477 node_capabilities,
478 input_placement_acceptance,
479 output_placement_acceptance,
480 )
481 }
482
483 #[inline]
485 pub fn kind() -> NodeKind {
486 NodeKind::Model
487 }
488}
489
/// Test sink that Debug-formats every consumed message and forwards the text
/// to a caller-supplied `printer`, counting consumed messages.
pub struct TestSinkNodeTensor {
    node_capabilities: NodeCapabilities,
    node_policy: NodePolicy,
    input_placement_acceptance: [PlacementAcceptance; 1],
    // Callback receiving the Debug-formatted message text.
    printer: fn(&str),
    // Number of messages consumed so far.
    processed: u32,
}
503
504impl TestSinkNodeTensor {
505 pub const fn new(
507 node_capabilities: NodeCapabilities,
508 node_policy: NodePolicy,
509 input_placement_acceptance: [PlacementAcceptance; 1],
510 printer: fn(&str),
511 ) -> Self {
512 Self {
513 node_capabilities,
514 node_policy,
515 input_placement_acceptance,
516 printer,
517 processed: 0,
518 }
519 }
520
521 pub fn processed(&self) -> &u32 {
523 &self.processed
524 }
525}
526
/// Fixed-capacity, stack-allocated UTF-8 text buffer used to Debug-format
/// messages without heap allocation. Writes that exceed the capacity are
/// silently truncated at a character boundary.
struct FixedBuf<const N: usize> {
    buf: [u8; N],
    len: usize,
}

impl<const N: usize> FixedBuf<N> {
    #[inline]
    const fn new() -> Self {
        Self {
            buf: [0; N],
            len: 0,
        }
    }

    /// Returns the buffered text. `write_str` only ever stores complete
    /// UTF-8 sequences, so the conversion cannot fail in practice;
    /// `unwrap_or_default` remains as a defensive fallback.
    #[inline]
    fn as_str(&self) -> &str {
        core::str::from_utf8(&self.buf[..self.len]).unwrap_or_default()
    }
}

impl<const N: usize> core::fmt::Write for FixedBuf<N> {
    fn write_str(&mut self, s: &str) -> core::fmt::Result {
        let remaining = N.saturating_sub(self.len);
        if remaining == 0 {
            // Buffer full: silently drop the rest (truncation is acceptable
            // for test output).
            return Ok(());
        }

        // Copy at most `remaining` bytes, but never split a multi-byte
        // character: back off to the nearest char boundary.
        //
        // Bug fix: the previous code only stripped trailing *continuation*
        // bytes (0b10xxxxxx), which could leave the dangling *lead* byte of
        // a truncated character in the buffer. That made the whole buffer
        // invalid UTF-8, so `as_str` silently returned "" for everything.
        // `str::is_char_boundary` handles both cases correctly.
        let mut to_copy = core::cmp::min(s.len(), remaining);
        while to_copy > 0 && !s.is_char_boundary(to_copy) {
            to_copy -= 1;
        }

        if to_copy == 0 {
            return Ok(());
        }

        self.buf[self.len..self.len + to_copy].copy_from_slice(&s.as_bytes()[..to_copy]);
        self.len += to_copy;
        Ok(())
    }
}
598
599impl Sink<TestTensor, 1> for TestSinkNodeTensor {
600 type Error = core::convert::Infallible;
601
602 #[inline]
603 fn open(&mut self) -> Result<(), Self::Error> {
604 Ok(())
605 }
606
607 #[inline]
608 fn consume(&mut self, msg: &Message<TestTensor>) -> Result<(), Self::Error> {
609 #[cfg(feature = "std")]
610 let mut random_seed: u32 = {
611 let now = std::time::SystemTime::now()
612 .duration_since(std::time::UNIX_EPOCH)
613 .unwrap_or_else(|e| e.duration());
614 (now.as_nanos() & 0xFFFF_FFFF) as u32
615 };
616 #[cfg(not(feature = "std"))]
617 let mut random_seed = 1;
618 random_test_node_delay(&mut random_seed, 100);
619
620 let mut buf: FixedBuf<1024> = FixedBuf::new();
621 let _ = core::write!(&mut buf, "{:?}", msg);
622 (self.printer)(buf.as_str());
623
624 self.processed += 1;
625
626 Ok(())
627 }
628
629 #[inline]
630 fn input_acceptance(&self) -> [PlacementAcceptance; 1] {
631 self.input_placement_acceptance
632 }
633
634 #[inline]
635 fn capabilities(&self) -> NodeCapabilities {
636 self.node_capabilities
637 }
638
639 #[inline]
640 fn policy(&self) -> NodePolicy {
641 self.node_policy
642 }
643}
644
#[cfg(test)]
mod tests {
    use super::*;
    use crate::message::MessageFlags;
    use crate::policy::{AdmissionPolicy, OverBudgetAction, QueueCaps};
    use crate::prelude::NoStdLinuxMonotonicClock;
    use crate::types::{NodeIndex, SequenceNumber, TraceId};

    // Shared ingress policy for the source contract test.
    // NOTE(review): the meaning of the `QueueCaps::new(16, 14, ...)` args is
    // inferred from names (item cap / watermark?) — confirm against the
    // `QueueCaps` docs. Drops newest on admission overflow and drops when
    // over budget.
    const TEST_INGRESS_POLICY: EdgePolicy = EdgePolicy::new(
        QueueCaps::new(16, 14, None, None),
        AdmissionPolicy::DropNewest,
        OverBudgetAction::Drop,
    );

    // Generic node-contract tests against the counter source. The source's
    // BACKLOG_CAP (16) matches TEST_INGRESS_POLICY's max_items, satisfying
    // the capacity check in `TestCounterSourceTensor::new`.
    crate::run_node_contract_tests!(test_counter_source_contract, {
        make_nodelink: || {
            let clock = NoStdLinuxMonotonicClock::new();

            let start_value = 0u32;
            let trace_id = TraceId::new(1);
            let seq = SequenceNumber::new(1);
            let deadline = None;
            let qos = crate::types::QoSClass::BestEffort;
            let flags = MessageFlags::empty();
            let node_caps = crate::node::NodeCapabilities::default();
            let node_policy = crate::policy::NodePolicy::default();
            let output_accept = [crate::memory::PlacementAcceptance::default(); 1];
            let ingress_policy = TEST_INGRESS_POLICY;

            let src: TestCounterSourceTensor<_, 16> = TestCounterSourceTensor::new(
                clock,
                start_value,
                trace_id,
                seq,
                deadline,
                qos,
                flags,
                node_caps,
                node_policy,
                output_accept,
                ingress_policy,
            );

            let src_node = src.into_sourcenode(crate::policy::NodePolicy::default());

            crate::node::link::NodeLink::new(src_node, NodeIndex::new(0), Some("test-counter-source"))
        }
    });

    // Contract tests for the identity model node with a max batch of 8.
    crate::run_node_contract_tests!(test_identity_model_contract, {
        make_nodelink: || {
            let node_caps = crate::node::NodeCapabilities::default();
            let node_policy = crate::policy::NodePolicy::default();
            let input_accept = [crate::memory::PlacementAcceptance::default(); 1];
            let output_accept = [crate::memory::PlacementAcceptance::default(); 1];

            let node = TestIdentityModelNodeTensor::<8>::new_identity(
                node_caps,
                node_policy,
                input_accept,
                output_accept,
            )
            .expect("create identity model node");

            crate::node::link::NodeLink::new(node, NodeIndex::new(0), Some("test-identity-model"))
        }
    });

    // Contract tests for the sink node; the printer callback discards output.
    crate::run_node_contract_tests!(test_sink_node_contract, {
        make_nodelink: || {
            let node_caps = crate::node::NodeCapabilities::default();
            let node_policy = crate::policy::NodePolicy::default();
            let input_accept = [crate::memory::PlacementAcceptance::default(); 1];

            let sink = TestSinkNodeTensor::new(node_caps, node_policy, input_accept, |_s| {});

            let sink_node = crate::node::sink::SinkNode::from(sink);

            crate::node::link::NodeLink::new(sink_node, NodeIndex::new(0), Some("test-sink"))
        }
    });
}