use super::*;
use crate::compute::{BackendCapabilities, ComputeBackend, ComputeModel, ModelMetadata};
use crate::errors::{InferenceError, InferenceErrorKind};
use crate::memory::{MemoryClass, PlacementAcceptance};
use crate::message::Message;
use crate::message::{MessageFlags, MessageHeader};
use crate::node::model::InferenceModel;
use crate::node::sink::Sink;
use crate::node::source::Source;
use crate::prelude::{create_test_tensor_from_array, TestTensor, TEST_TENSOR_BYTE_COUNT};
use crate::types::{DeadlineNs, QoSClass, SequenceNumber, TraceId};
#[cfg(feature = "std")]
use crate::node::source::probe::{SourceIngressProbe, SourceIngressUpdater};
use core::fmt::Write;
/// Busy-waits for a pseudo-random number of microseconds in
/// `1..=max_delay_microseconds`, advancing `random_state` by one xorshift32
/// step. A zero `max_delay_microseconds` is a no-op (state untouched); a zero
/// seed is promoted to 1 so the generator cannot get stuck at its fixed point.
fn random_test_node_delay(random_state: &mut u32, max_delay_microseconds: u32) {
    if max_delay_microseconds == 0 {
        return;
    }
    // xorshift32 maps 0 -> 0 forever; nudge the seed off the fixed point.
    if *random_state == 0 {
        *random_state = 1;
    }
    // One step of Marsaglia's xorshift32 (shifts 13 / 17 / 5).
    let mut x = *random_state;
    x ^= x << 13;
    x ^= x >> 17;
    x ^= x << 5;
    *random_state = x;
    let delay_microseconds = (x % max_delay_microseconds) + 1;
    // Calibrate the spin count from a rough CPU model: a 2 GHz core spending
    // ~8 cycles per spin-loop iteration.
    const ASSUMED_CPU_FREQUENCY_HERTZ: u32 = 2_000_000_000;
    const ESTIMATED_CPU_CYCLES_PER_LOOP_ITERATION: u32 = 8;
    let cycles_per_microsecond = ASSUMED_CPU_FREQUENCY_HERTZ / 1_000_000;
    let iterations_per_microsecond =
        (cycles_per_microsecond / ESTIMATED_CPU_CYCLES_PER_LOOP_ITERATION).max(1);
    for _ in 0..delay_microseconds.saturating_mul(iterations_per_microsecond) {
        core::hint::spin_loop();
    }
}
/// Encodes `counter` (modulo one billion) as nine decimal digits, most
/// significant first, laid out row-major in a 3x3 test tensor.
#[inline]
fn create_test_tensor_from_counter(counter: u32) -> TestTensor {
    let mut remaining = counter % 1_000_000_000;
    let mut digits = [0u32; 9];
    // Fill from the least significant slot backwards so digits[0] ends up
    // as the most significant digit.
    for slot in digits.iter_mut().rev() {
        *slot = remaining % 10;
        remaining /= 10;
    }
    create_test_tensor_from_array([
        [digits[0], digits[1], digits[2]],
        [digits[3], digits[4], digits[5]],
        [digits[6], digits[7], digits[8]],
    ])
}
/// Test source node that emits 3x3 tensors derived from an incrementing
/// counter. Produced messages are staged in a fixed-capacity ring buffer
/// before being popped by `try_produce`.
pub struct TestCounterSourceTensor<Clock, const BACKLOG_CAP: usize>
where
    Clock: PlatformClock,
{
    // Clock used to stamp creation ticks on message headers.
    clock: Clock,
    // Counter value encoded into the next emitted tensor payload.
    next_counter_value_to_emit: u32,
    // Trace id copied into every emitted message header.
    trace_id: TraceId,
    // Sequence number for the next emitted message; wraps on overflow.
    next_sequence: SequenceNumber,
    // Optional deadline copied into each message header.
    deadline_ns: Option<DeadlineNs>,
    qos: QoSClass,
    flags: MessageFlags,
    node_capabilities: NodeCapabilities,
    node_policy: NodePolicy,
    output_placement_acceptance: [PlacementAcceptance; 1],
    ingress_policy: EdgePolicy,
    // Ring buffer of pending messages: `backlog_head` is the index of the
    // oldest item, `backlog_len` the item count, and `backlog_bytes` is kept
    // equal to `backlog_len * TEST_TENSOR_BYTE_COUNT`.
    backlog: [Option<Message<TestTensor>>; BACKLOG_CAP],
    backlog_head: usize,
    backlog_len: usize,
    backlog_bytes: usize,
    // Optional external probe/updater pair; when set, the probe overrides
    // local occupancy reporting (std builds only).
    #[cfg(feature = "std")]
    ingress_probe: Option<SourceIngressProbe>,
    #[cfg(feature = "std")]
    ingress_updater: Option<SourceIngressUpdater>,
}
impl<Clock, const BACKLOG_CAP: usize> TestCounterSourceTensor<Clock, BACKLOG_CAP>
where
    Clock: PlatformClock,
{
    /// Creates the source with an empty backlog.
    ///
    /// # Panics
    /// Panics when `BACKLOG_CAP` is smaller than
    /// `ingress_policy.caps.max_items`: the backlog must be able to hold
    /// everything the ingress policy admits.
    #[allow(clippy::too_many_arguments)]
    pub const fn new(
        clock: Clock,
        starting_value_inclusive: u32,
        trace_id: TraceId,
        starting_sequence: SequenceNumber,
        deadline_ns: Option<DeadlineNs>,
        qos: QoSClass,
        flags: MessageFlags,
        node_capabilities: NodeCapabilities,
        node_policy: NodePolicy,
        output_placement_acceptance: [PlacementAcceptance; 1],
        ingress_policy: EdgePolicy,
    ) -> Self {
        assert!(
            BACKLOG_CAP >= ingress_policy.caps.max_items,
            "TestCounterSourceTensor: backlog capacity must be >= ingress_policy.caps.max_items"
        );
        Self {
            clock,
            next_counter_value_to_emit: starting_value_inclusive,
            trace_id,
            next_sequence: starting_sequence,
            deadline_ns,
            qos,
            flags,
            node_capabilities,
            node_policy,
            output_placement_acceptance,
            ingress_policy,
            backlog: [None; BACKLOG_CAP],
            backlog_head: 0usize,
            backlog_len: 0usize,
            backlog_bytes: 0usize,
            #[cfg(feature = "std")]
            ingress_probe: None,
            #[cfg(feature = "std")]
            ingress_updater: None,
        }
    }

    /// Attaches an external ingress probe/updater pair (std builds only).
    #[cfg(feature = "std")]
    pub fn with_probe(mut self, probe: SourceIngressProbe, updater: SourceIngressUpdater) -> Self {
        self.ingress_probe = Some(probe);
        self.ingress_updater = Some(updater);
        self
    }

    /// Builds a message for the current counter/sequence state without
    /// advancing either.
    #[inline]
    fn make_message(&self) -> Message<TestTensor> {
        let header = MessageHeader::new(
            self.trace_id,
            self.next_sequence,
            self.clock.now_ticks(),
            self.deadline_ns,
            self.qos,
            TEST_TENSOR_BYTE_COUNT,
            self.flags,
            MemoryClass::Host,
        );
        let payload = create_test_tensor_from_counter(self.next_counter_value_to_emit);
        Message::new(header, payload)
    }

    /// Pushes up to `n` freshly built messages onto the ring-buffer tail,
    /// stopping early when the buffer is full. Counter and sequence advance
    /// (wrapping) once per item actually stored.
    #[inline]
    pub fn produce_n_items_in_backlog(&mut self, n: usize) {
        let free_slots = BACKLOG_CAP - self.backlog_len;
        let count = if n < free_slots { n } else { free_slots };
        for _ in 0..count {
            let tail = (self.backlog_head + self.backlog_len) % BACKLOG_CAP;
            self.backlog[tail] = Some(self.make_message());
            self.backlog_len += 1;
            self.next_counter_value_to_emit = self.next_counter_value_to_emit.wrapping_add(1);
            self.next_sequence =
                SequenceNumber::new(self.next_sequence.as_u64().wrapping_add(1));
        }
        // Invariant: the byte total always tracks the item count exactly.
        self.backlog_bytes = self.backlog_len * TEST_TENSOR_BYTE_COUNT;
    }

    /// Removes and returns the oldest backlog entry, if any.
    #[inline]
    fn try_pop_from_backlog(&mut self) -> Option<Message<TestTensor>> {
        if self.backlog_len == 0 {
            return None;
        }
        let message = self.backlog[self.backlog_head].take();
        self.backlog_head = (self.backlog_head + 1) % BACKLOG_CAP;
        self.backlog_len -= 1;
        self.backlog_bytes = self.backlog_len * TEST_TENSOR_BYTE_COUNT;
        message
    }

    /// Chooses how many items to enqueue this round: 1 when the current
    /// clock tick is even, 2 when it is odd.
    #[inline]
    fn random_backlog_add_count(&self) -> usize {
        let tick = *self.clock.now_ticks().as_u64();
        1 + (tick & 1) as usize
    }
}
impl<Clock, const BACKLOG_CAP: usize> Source<TestTensor, 1>
    for TestCounterSourceTensor<Clock, BACKLOG_CAP>
where
    Clock: PlatformClock,
{
    type Error = core::convert::Infallible;

    #[inline]
    fn open(&mut self) -> Result<(), Self::Error> {
        // No resources to acquire for the in-memory test source.
        Ok(())
    }

    /// Spins for a short pseudo-random delay (seeded from wall time on std
    /// builds), enqueues 1-2 fresh items, then pops the oldest backlog item.
    /// Always emits on output port 0.
    #[inline]
    fn try_produce(&mut self) -> Option<(usize, Message<TestTensor>)> {
        #[cfg(feature = "std")]
        let mut random_seed: u32 = {
            let now = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_else(|e| e.duration());
            (now.as_nanos() & 0xFFFF_FFFF) as u32
        };
        #[cfg(not(feature = "std"))]
        let mut random_seed = 1;
        random_test_node_delay(&mut random_seed, 250);
        self.produce_n_items_in_backlog(self.random_backlog_add_count());
        self.try_pop_from_backlog().map(|message| (0, message))
    }

    #[inline]
    fn ingress_occupancy(&self) -> EdgeOccupancy {
        // When a probe is attached (std builds) it is the authority on
        // occupancy; otherwise report the local backlog counters.
        #[cfg(feature = "std")]
        if let Some(probe) = &self.ingress_probe {
            return probe.occupancy(&self.ingress_policy());
        }
        let items = self.backlog_len;
        let bytes = self.backlog_bytes;
        EdgeOccupancy::new(items, bytes, self.ingress_policy.watermark(items, bytes))
    }

    #[inline]
    fn output_acceptance(&self) -> [PlacementAcceptance; 1] {
        self.output_placement_acceptance
    }

    #[inline]
    fn capabilities(&self) -> NodeCapabilities {
        self.node_capabilities
    }

    #[inline]
    fn policy(&self) -> NodePolicy {
        self.node_policy
    }

    fn ingress_policy(&self) -> EdgePolicy {
        self.ingress_policy
    }

    /// Returns the creation tick of the `item_index`-th oldest backlog item,
    /// or `None` when the index is out of range.
    #[inline]
    fn peek_ingress_creation_tick(&self, item_index: usize) -> Option<u64> {
        if item_index >= self.backlog_len {
            return None;
        }
        // BUG FIX: the backlog is a ring buffer, so logical item `i` lives at
        // (head + i) % BACKLOG_CAP. The previous code indexed the storage
        // array directly with `item_index`, which returned the wrong item (or
        // unwrapped an empty slot) once the head had wrapped past zero.
        let slot = (self.backlog_head + item_index) % BACKLOG_CAP;
        self.backlog[slot]
            .as_ref()
            .map(|message| *message.header().creation_tick().as_u64())
    }
}
/// Identity "model" used by tests: inference copies each input tensor
/// unchanged to the corresponding output slot.
pub struct TestTensorModel;
impl ComputeModel<TestTensor, TestTensor> for TestTensorModel {
    #[inline]
    fn init(&mut self) -> Result<(), InferenceError> {
        // The identity model has no state to initialize.
        Ok(())
    }

    /// Copies `inp` into `out` after simulating up to 500us of compute with
    /// a short busy-wait (seeded from wall time on std builds).
    #[inline]
    fn infer_one(&mut self, inp: &TestTensor, out: &mut TestTensor) -> Result<(), InferenceError> {
        #[cfg(feature = "std")]
        let mut seed: u32 = {
            let since_epoch = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_else(|err| err.duration());
            (since_epoch.as_nanos() & 0xFFFF_FFFF) as u32
        };
        #[cfg(not(feature = "std"))]
        let mut seed = 1;
        random_test_node_delay(&mut seed, 500);
        *out = *inp;
        Ok(())
    }

    /// Copies every input payload to the matching output slot after
    /// simulating up to 1ms of batch compute.
    ///
    /// # Errors
    /// Returns `ExecutionFailed` when `outputs` is shorter than the batch.
    #[inline]
    fn infer_batch(
        &mut self,
        inputs: crate::message::batch::Batch<'_, TestTensor>,
        outputs: &mut [TestTensor],
    ) -> Result<(), InferenceError> {
        #[cfg(feature = "std")]
        let mut seed: u32 = {
            let since_epoch = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_else(|err| err.duration());
            (since_epoch.as_nanos() & 0xFFFF_FFFF) as u32
        };
        #[cfg(not(feature = "std"))]
        let mut seed = 1;
        random_test_node_delay(&mut seed, 1000);
        let input_messages = inputs.messages();
        if outputs.len() < input_messages.len() {
            return Err(InferenceError::new(InferenceErrorKind::ExecutionFailed, 0));
        }
        for (slot, message) in outputs.iter_mut().zip(input_messages.iter()) {
            *slot = *message.payload();
        }
        Ok(())
    }

    #[inline]
    fn drain(&mut self) -> Result<(), InferenceError> {
        // Nothing is ever buffered inside the model.
        Ok(())
    }

    #[inline]
    fn reset(&mut self) -> Result<(), InferenceError> {
        Ok(())
    }

    #[inline]
    fn metadata(&self) -> ModelMetadata {
        // Host-resident input and output; no extra metadata.
        ModelMetadata::new(MemoryClass::Host, MemoryClass::Host, None, None)
    }
}
/// Zero-sized backend that hands out `TestTensorModel` instances.
#[derive(Clone, Copy, Debug, Default)]
pub struct TestTensorBackend;
impl ComputeBackend<TestTensor, TestTensor> for TestTensorBackend {
    type Model = TestTensorModel;
    type Error = InferenceError;
    // The identity model needs no descriptor data to load.
    type ModelDescriptor<'d> = ();

    #[inline]
    fn capabilities(&self) -> BackendCapabilities {
        // NOTE(review): argument meanings inferred from position only
        // (flag=false, max batch unbounded, 0) — confirm against
        // `BackendCapabilities::new`.
        BackendCapabilities::new(false, Some(usize::MAX), 0)
    }

    /// Always succeeds: the model is a zero-sized identity transform.
    #[inline]
    fn load_model<'d>(&self, _desc: Self::ModelDescriptor<'d>) -> Result<Self::Model, Self::Error> {
        Ok(TestTensorModel)
    }
}
/// Identity model node over the test backend with a compile-time batch cap.
pub type TestIdentityModelNodeTensor<const MAX_BATCH: usize> =
    InferenceModel<TestTensorBackend, TestTensor, TestTensor, MAX_BATCH>;

impl<const MAX_BATCH: usize> TestIdentityModelNodeTensor<MAX_BATCH> {
    /// Builds an identity model node from a fresh `TestTensorBackend`.
    ///
    /// # Errors
    /// Propagates any error from `InferenceModel::new`.
    #[inline]
    pub fn new_identity(
        node_capabilities: NodeCapabilities,
        node_policy: NodePolicy,
        input_placement_acceptance: [PlacementAcceptance; 1],
        output_placement_acceptance: [PlacementAcceptance; 1],
    ) -> Result<Self, InferenceError> {
        let backend = TestTensorBackend;
        InferenceModel::new(
            backend,
            (),
            node_policy,
            node_capabilities,
            input_placement_acceptance,
            output_placement_acceptance,
        )
    }

    /// This node participates in the graph as a model node.
    #[inline]
    pub fn kind() -> NodeKind {
        NodeKind::Model
    }
}
/// Test sink that Debug-formats each consumed message into a fixed-size
/// buffer and forwards the text to an injected printer callback.
pub struct TestSinkNodeTensor {
    node_capabilities: NodeCapabilities,
    node_policy: NodePolicy,
    input_placement_acceptance: [PlacementAcceptance; 1],
    // Callback receiving the formatted (possibly truncated) message text.
    printer: fn(&str),
    // Number of messages consumed so far.
    processed: u32,
}
impl TestSinkNodeTensor {
    /// Creates a sink with a zeroed processed-message counter.
    pub const fn new(
        node_capabilities: NodeCapabilities,
        node_policy: NodePolicy,
        input_placement_acceptance: [PlacementAcceptance; 1],
        printer: fn(&str),
    ) -> Self {
        Self {
            node_capabilities,
            node_policy,
            input_placement_acceptance,
            printer,
            processed: 0,
        }
    }

    /// Number of messages consumed so far.
    // NOTE(review): returning `&u32` for a Copy type is unusual (plain `u32`
    // would be idiomatic), but the signature is kept for caller compatibility.
    pub fn processed(&self) -> &u32 {
        &self.processed
    }
}
/// Fixed-capacity UTF-8 text buffer used to format sink output without heap
/// allocation. Writes past capacity are silently truncated on a character
/// boundary, so the stored bytes are always valid UTF-8.
struct FixedBuf<const N: usize> {
    buf: [u8; N],
    len: usize,
}

impl<const N: usize> FixedBuf<N> {
    /// Creates an empty buffer.
    #[inline]
    const fn new() -> Self {
        Self {
            buf: [0; N],
            len: 0,
        }
    }

    /// Returns the buffered text. The write path only appends whole
    /// characters, so the bytes are valid UTF-8; the empty-string fallback
    /// is purely defensive.
    #[inline]
    fn as_str(&self) -> &str {
        core::str::from_utf8(&self.buf[..self.len]).unwrap_or_default()
    }
}

impl<const N: usize> core::fmt::Write for FixedBuf<N> {
    /// Appends as much of `s` as fits, truncating on a char boundary.
    /// Never reports an error: overflow is silently dropped.
    fn write_str(&mut self, s: &str) -> core::fmt::Result {
        let remaining = N.saturating_sub(self.len);
        if remaining == 0 {
            return Ok(());
        }
        // BUG FIX: copy the longest prefix of `s` that both fits and ends on
        // a character boundary. The previous version backed off only over
        // UTF-8 continuation bytes (0b10xxxxxx), which could leave a dangling
        // multi-byte *leader* in the buffer — invalid UTF-8 that made
        // `as_str` collapse the entire contents to "".
        let mut to_copy = core::cmp::min(s.len(), remaining);
        while to_copy > 0 && !s.is_char_boundary(to_copy) {
            to_copy -= 1;
        }
        self.buf[self.len..self.len + to_copy].copy_from_slice(&s.as_bytes()[..to_copy]);
        self.len += to_copy;
        Ok(())
    }
}
impl Sink<TestTensor, 1> for TestSinkNodeTensor {
    type Error = core::convert::Infallible;

    #[inline]
    fn open(&mut self) -> Result<(), Self::Error> {
        // No resources to acquire for the test sink.
        Ok(())
    }

    /// Simulates up to 100us of sink-side work, Debug-formats the message
    /// into a 1 KiB stack buffer (truncating on overflow), hands the text to
    /// the injected printer, and bumps the processed counter.
    #[inline]
    fn consume(&mut self, msg: &Message<TestTensor>) -> Result<(), Self::Error> {
        #[cfg(feature = "std")]
        let mut seed: u32 = {
            let since_epoch = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_else(|err| err.duration());
            (since_epoch.as_nanos() & 0xFFFF_FFFF) as u32
        };
        #[cfg(not(feature = "std"))]
        let mut seed = 1;
        random_test_node_delay(&mut seed, 100);
        // Formatting failures (buffer overflow) are deliberately ignored:
        // the sink is best-effort diagnostics.
        let mut line: FixedBuf<1024> = FixedBuf::new();
        let _ = core::write!(&mut line, "{:?}", msg);
        (self.printer)(line.as_str());
        self.processed += 1;
        Ok(())
    }

    #[inline]
    fn input_acceptance(&self) -> [PlacementAcceptance; 1] {
        self.input_placement_acceptance
    }

    #[inline]
    fn capabilities(&self) -> NodeCapabilities {
        self.node_capabilities
    }

    #[inline]
    fn policy(&self) -> NodePolicy {
        self.node_policy
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::message::MessageFlags;
    use crate::policy::{AdmissionPolicy, OverBudgetAction, QueueCaps};
    use crate::prelude::NoStdLinuxMonotonicClock;
    use crate::types::{NodeIndex, SequenceNumber, TraceId};

    // Ingress policy shared by the contract tests: drop-newest admission and
    // drop on over-budget. NOTE(review): the `QueueCaps::new(16, 14, ..)`
    // argument meanings (16 presumably max_items; 14 unconfirmed) should be
    // checked against `QueueCaps::new` — the source ctor only requires
    // BACKLOG_CAP >= caps.max_items, satisfied by the 16-slot backlog below.
    const TEST_INGRESS_POLICY: EdgePolicy = EdgePolicy::new(
        QueueCaps::new(16, 14, None, None),
        AdmissionPolicy::DropNewest,
        OverBudgetAction::Drop,
    );

    // Runs the shared node-contract suite against the counter source wrapped
    // in a NodeLink.
    crate::run_node_contract_tests!(test_counter_source_contract, {
        make_nodelink: || {
            let clock = NoStdLinuxMonotonicClock::new();
            let start_value = 0u32;
            let trace_id = TraceId::new(1);
            let seq = SequenceNumber::new(1);
            let deadline = None;
            let qos = crate::types::QoSClass::BestEffort;
            let flags = MessageFlags::empty();
            let node_caps = crate::node::NodeCapabilities::default();
            let node_policy = crate::policy::NodePolicy::default();
            let output_accept = [crate::memory::PlacementAcceptance::default(); 1];
            let ingress_policy = TEST_INGRESS_POLICY;
            // Backlog capacity 16 matches TEST_INGRESS_POLICY's max_items cap.
            let src: TestCounterSourceTensor<_, 16> = TestCounterSourceTensor::new(
                clock,
                start_value,
                trace_id,
                seq,
                deadline,
                qos,
                flags,
                node_caps,
                node_policy,
                output_accept,
                ingress_policy,
            );
            let src_node = src.into_sourcenode(crate::policy::NodePolicy::default());
            crate::node::link::NodeLink::new(src_node, NodeIndex::new(0), Some("test-counter-source"))
        }
    });

    // Contract suite for the identity model node (batch cap of 8).
    crate::run_node_contract_tests!(test_identity_model_contract, {
        make_nodelink: || {
            let node_caps = crate::node::NodeCapabilities::default();
            let node_policy = crate::policy::NodePolicy::default();
            let input_accept = [crate::memory::PlacementAcceptance::default(); 1];
            let output_accept = [crate::memory::PlacementAcceptance::default(); 1];
            let node = TestIdentityModelNodeTensor::<8>::new_identity(
                node_caps,
                node_policy,
                input_accept,
                output_accept,
            )
            .expect("create identity model node");
            crate::node::link::NodeLink::new(node, NodeIndex::new(0), Some("test-identity-model"))
        }
    });

    // Contract suite for the sink node; the printer callback discards output.
    crate::run_node_contract_tests!(test_sink_node_contract, {
        make_nodelink: || {
            let node_caps = crate::node::NodeCapabilities::default();
            let node_policy = crate::policy::NodePolicy::default();
            let input_accept = [crate::memory::PlacementAcceptance::default(); 1];
            let sink = TestSinkNodeTensor::new(node_caps, node_policy, input_accept, |_s| {});
            let sink_node = crate::node::sink::SinkNode::from(sink);
            crate::node::link::NodeLink::new(sink_node, NodeIndex::new(0), Some("test-sink"))
        }
    });
}