use crate::edge::EdgeOccupancy;
use crate::graph::bench::TestPipeline;
use crate::graph::GraphApi;
use crate::memory::PlacementAcceptance;
use crate::message::MessageFlags;
use crate::node::bench::{
TestCounterSourceTensor, TestIdentityModelNodeTensor, TestSinkNodeTensor, TestTensorBackend,
};
use crate::node::NodeCapabilities;
use crate::policy::{
BatchingPolicy, BudgetPolicy, DeadlinePolicy, EdgePolicy, NodePolicy, SlidingWindow,
WatermarkState, WindowKind,
};
use crate::prelude::graph_telemetry::GraphTelemetry;
use crate::prelude::linux::NoStdLinuxMonotonicClock;
use crate::prelude::sink::{fixed_buffer_line_writer, FixedBuffer, FmtLineWriter};
use crate::prelude::TestTensor;
use crate::runtime::bench::TestNoStdRuntime;
use crate::runtime::LimenRuntime;
use crate::types::{QoSClass, SequenceNumber, Ticks, TraceId};
type Q32 = crate::edge::bench::TestSpscRingBuf<32>;
const INGRESS_POLICY: EdgePolicy = EdgePolicy {
caps: crate::policy::QueueCaps {
max_items: 32,
soft_items: 32,
max_bytes: None,
soft_bytes: None,
},
over_budget: crate::policy::OverBudgetAction::Drop,
admission: crate::policy::AdmissionPolicy::DropOldest,
};
const TEST_MAX_BATCH: usize = 32;
type MapNode = TestIdentityModelNodeTensor<TEST_MAX_BATCH>;
const LARGE_DELTA_T: Ticks = Ticks::new(1_000_000_000_000u64);
type NoStdTestTelemetry = GraphTelemetry<3, 3, FmtLineWriter<FixedBuffer<2048>>>;
type NoStdTestClock = NoStdLinuxMonotonicClock;
#[test]
fn core_pipeline_runs_with_nostd_runtime() {
let printer: fn(&str) = {
#[cfg(feature = "std")]
{
fn print_fn(s: &str) {
println!("--- [***Sink Output***] --- {}", s);
}
print_fn
}
#[cfg(not(feature = "std"))]
{
fn noop(_: &str) {}
noop
}
};
let node_policy = NodePolicy::new(
BatchingPolicy::none(),
BudgetPolicy::new(None, None),
DeadlinePolicy::new(false, None, None),
);
let clock = NoStdLinuxMonotonicClock::new();
let mut src = TestCounterSourceTensor::new(
clock,
0,
TraceId::new(0u64),
SequenceNumber::new(0u64),
None,
QoSClass::BestEffort,
MessageFlags::empty(),
NodeCapabilities::default(),
node_policy,
[PlacementAcceptance::default()],
INGRESS_POLICY,
);
src.produce_n_items_in_backlog(16);
let map = MapNode::new(
TestTensorBackend,
(),
node_policy,
NodeCapabilities::default(),
[PlacementAcceptance::default()],
[PlacementAcceptance::default()],
)
.unwrap();
let snk = TestSinkNodeTensor::new(
NodeCapabilities::default(),
node_policy,
[PlacementAcceptance::default()],
printer,
);
let q0: Q32 = Q32::default();
let q1: Q32 = Q32::default();
let sink = fixed_buffer_line_writer::<2048>();
let telemetry: NoStdTestTelemetry = NoStdTestTelemetry::new(0, true, sink);
let mgr0 = crate::memory::static_manager::StaticMemoryManager::<TestTensor, 35>::new();
let mgr1 = crate::memory::static_manager::StaticMemoryManager::<TestTensor, 35>::new();
let mut graph = TestPipeline::new(src, map, snk, q0, q1, mgr0, mgr1);
let mut runtime: TestNoStdRuntime<NoStdTestClock, NoStdTestTelemetry, 3, 3> =
TestNoStdRuntime::new();
runtime.init(&mut graph, clock, telemetry).unwrap();
graph.validate_graph().unwrap();
let mut occ: [EdgeOccupancy; 3] = [EdgeOccupancy::new(0, 0, WatermarkState::AtOrAboveHard); 3];
graph.write_all_edge_occupancies(&mut occ).unwrap();
#[cfg(feature = "std")]
println!(
"--- [initial_graph_occupancies] --- {:?}\n",
<TestNoStdRuntime<NoStdTestClock, NoStdTestTelemetry, 3, 3> as crate::runtime::LimenRuntime<
crate::graph::bench::TestPipeline<NoStdTestClock>,
3,
3,
>>::occupancies(&runtime)
);
for _ in 0..9 {
let _ = runtime.step(&mut graph).unwrap();
#[cfg(feature = "std")]
println!(
"--- [graph_occupancies] --- {:?}",
<TestNoStdRuntime<NoStdTestClock, NoStdTestTelemetry, 3, 3> as crate::runtime::LimenRuntime<
crate::graph::bench::TestPipeline<NoStdTestClock>,
3,
3,
>>::occupancies(&runtime)
);
}
graph.validate_graph().unwrap();
assert!(
!<TestNoStdRuntime<NoStdTestClock, NoStdTestTelemetry, 3, 3> as LimenRuntime<
crate::graph::bench::TestPipeline<NoStdTestClock>,
3,
3,
>>::is_stopping(&runtime)
);
<TestNoStdRuntime<
NoStdTestClock,
GraphTelemetry<3, 3, FmtLineWriter<FixedBuffer<2048>>>,
3,
3,
> as LimenRuntime<TestPipeline<NoStdTestClock>, 3, 3>>::request_stop(&mut runtime);
#[cfg(feature = "std")]
{
let _ = runtime.with_telemetry(|telemetry| {
use crate::prelude::Telemetry as _;
telemetry.push_metrics();
telemetry.flush();
let sink_ref = telemetry.writer();
let buffer_ref = sink_ref.inner();
println!("\n--- [telemetry buffer] ---\n{}", buffer_ref.as_str());
});
}
}
#[test]
fn batch_nostd_disjoint_fixed_n() {
let printer: fn(&str) = {
#[cfg(feature = "std")]
{
fn print_fn(s: &str) {
println!("--- [***Sink Output***] --- {}", s);
}
print_fn
}
#[cfg(not(feature = "std"))]
{
fn noop(_: &str) {}
noop
}
};
let node_policy_src = NodePolicy::new(
BatchingPolicy::fixed(3),
BudgetPolicy::new(None, None),
DeadlinePolicy::new(false, None, None),
);
let node_policy_map = NodePolicy::new(
BatchingPolicy::fixed(3),
BudgetPolicy::new(None, None),
DeadlinePolicy::new(false, None, None),
);
let node_policy_snk = NodePolicy::new(
BatchingPolicy::none(),
BudgetPolicy::new(None, None),
DeadlinePolicy::new(false, None, None),
);
let clock = NoStdLinuxMonotonicClock::new();
let mut src = TestCounterSourceTensor::new(
clock,
0,
TraceId::new(0u64),
SequenceNumber::new(0u64),
None,
QoSClass::BestEffort,
MessageFlags::empty(),
NodeCapabilities::default(),
node_policy_src,
[PlacementAcceptance::default()],
INGRESS_POLICY,
);
src.produce_n_items_in_backlog(16);
let map = TestIdentityModelNodeTensor::<TEST_MAX_BATCH>::new(
TestTensorBackend,
(),
node_policy_map,
NodeCapabilities::default(),
[PlacementAcceptance::default()],
[PlacementAcceptance::default()],
)
.unwrap();
let snk = TestSinkNodeTensor::new(
NodeCapabilities::default(),
node_policy_snk,
[PlacementAcceptance::default()],
printer,
);
let q0: Q32 = Q32::default();
let q1: Q32 = Q32::default();
let sink = fixed_buffer_line_writer::<2048>();
let telemetry: NoStdTestTelemetry = NoStdTestTelemetry::new(0, true, sink);
let mgr0 = crate::memory::static_manager::StaticMemoryManager::<TestTensor, 35>::new();
let mgr1 = crate::memory::static_manager::StaticMemoryManager::<TestTensor, 35>::new();
let mut graph = TestPipeline::new(src, map, snk, q0, q1, mgr0, mgr1);
let mut runtime: TestNoStdRuntime<NoStdTestClock, NoStdTestTelemetry, 3, 3> =
TestNoStdRuntime::new();
runtime.init(&mut graph, clock, telemetry).unwrap();
for _ in 0..10 {
let _ = runtime.step(&mut graph).unwrap();
#[cfg(feature = "std")]
println!(
"--- [graph_occupancies] --- {:?}",
<TestNoStdRuntime<NoStdTestClock, NoStdTestTelemetry, 3, 3> as crate::runtime::LimenRuntime<
crate::graph::bench::TestPipeline<NoStdTestClock>,
3,
3,
>>::occupancies(&runtime)
);
}
let occ = LimenRuntime::<TestPipeline<NoStdTestClock>, 3, 3>::occupancies(&runtime);
assert_eq!(*occ[1].items(), 3, "edge1 (src->map) should have 3 items");
assert_eq!(*occ[2].items(), 6, "edge2 (map->snk) should have 6 items");
#[cfg(feature = "std")]
{
let _ = runtime.with_telemetry(|telemetry| {
use crate::prelude::Telemetry as _;
telemetry.push_metrics();
telemetry.flush();
let sink_ref = telemetry.writer();
let buffer_ref = sink_ref.inner();
println!("\n--- [telemetry buffer] ---\n{}", buffer_ref.as_str());
});
}
}
#[test]
fn batch_nostd_disjoint_max_delta_t() {
let printer: fn(&str) = {
#[cfg(feature = "std")]
{
fn print_fn(s: &str) {
println!("--- [***Sink Output***] --- {}", s);
}
print_fn
}
#[cfg(not(feature = "std"))]
{
fn noop(_: &str) {}
noop
}
};
let node_policy_src = NodePolicy::new(
BatchingPolicy::none(),
BudgetPolicy::new(None, None),
DeadlinePolicy::new(false, None, None),
);
let node_policy_map = NodePolicy::new(
BatchingPolicy::delta_t(LARGE_DELTA_T),
BudgetPolicy::new(None, None),
DeadlinePolicy::new(false, None, None),
);
let node_policy_snk = NodePolicy::new(
BatchingPolicy::none(),
BudgetPolicy::new(None, None),
DeadlinePolicy::new(false, None, None),
);
let clock = NoStdLinuxMonotonicClock::new();
let mut src = TestCounterSourceTensor::new(
clock,
0,
TraceId::new(0u64),
SequenceNumber::new(0u64),
None,
QoSClass::BestEffort,
MessageFlags::empty(),
NodeCapabilities::default(),
node_policy_src,
[PlacementAcceptance::default()],
INGRESS_POLICY,
);
src.produce_n_items_in_backlog(16);
let map = TestIdentityModelNodeTensor::<TEST_MAX_BATCH>::new(
TestTensorBackend,
(),
node_policy_map,
NodeCapabilities::default(),
[PlacementAcceptance::default()],
[PlacementAcceptance::default()],
)
.unwrap();
let snk = TestSinkNodeTensor::new(
NodeCapabilities::default(),
node_policy_snk,
[PlacementAcceptance::default()],
printer,
);
let q0: Q32 = Q32::default();
let q1: Q32 = Q32::default();
let sink = fixed_buffer_line_writer::<2048>();
let telemetry: NoStdTestTelemetry = NoStdTestTelemetry::new(0, true, sink);
let mgr0 = crate::memory::static_manager::StaticMemoryManager::<TestTensor, 35>::new();
let mgr1 = crate::memory::static_manager::StaticMemoryManager::<TestTensor, 35>::new();
let mut graph = TestPipeline::new(src, map, snk, q0, q1, mgr0, mgr1);
let mut runtime: TestNoStdRuntime<NoStdTestClock, NoStdTestTelemetry, 3, 3> =
TestNoStdRuntime::new();
runtime.init(&mut graph, clock, telemetry).unwrap();
for _ in 0..10 {
let _ = runtime.step(&mut graph).unwrap();
#[cfg(feature = "std")]
println!(
"--- [graph_occupancies] --- {:?}",
<TestNoStdRuntime<NoStdTestClock, NoStdTestTelemetry, 3, 3> as crate::runtime::LimenRuntime<
crate::graph::bench::TestPipeline<NoStdTestClock>,
3,
3,
>>::occupancies(&runtime)
);
}
let occ = LimenRuntime::<TestPipeline<NoStdTestClock>, 3, 3>::occupancies(&runtime);
assert_eq!(*occ[1].items(), 1, "edge1 should have 1 item");
assert_eq!(*occ[2].items(), 0, "edge2 should have 0 items");
#[cfg(feature = "std")]
{
let _ = runtime.with_telemetry(|telemetry| {
use crate::prelude::Telemetry as _;
telemetry.push_metrics();
telemetry.flush();
let sink_ref = telemetry.writer();
let buffer_ref = sink_ref.inner();
println!("\n--- [telemetry buffer] ---\n{}", buffer_ref.as_str());
});
}
}
#[test]
fn batch_nostd_disjoint_fixed_n_and_max_delta_t() {
let printer: fn(&str) = {
#[cfg(feature = "std")]
{
fn print_fn(s: &str) {
println!("--- [***Sink Output***] --- {}", s);
}
print_fn
}
#[cfg(not(feature = "std"))]
{
fn noop(_: &str) {}
noop
}
};
let node_policy_src = NodePolicy::new(
BatchingPolicy::fixed(3),
BudgetPolicy::new(None, None),
DeadlinePolicy::new(false, None, None),
);
let node_policy_map = NodePolicy::new(
BatchingPolicy::fixed_and_delta_t(3, LARGE_DELTA_T),
BudgetPolicy::new(None, None),
DeadlinePolicy::new(false, None, None),
);
let node_policy_snk = NodePolicy::new(
BatchingPolicy::none(),
BudgetPolicy::new(None, None),
DeadlinePolicy::new(false, None, None),
);
let clock = NoStdLinuxMonotonicClock::new();
let mut src = TestCounterSourceTensor::new(
clock,
0,
TraceId::new(0u64),
SequenceNumber::new(0u64),
None,
QoSClass::BestEffort,
MessageFlags::empty(),
NodeCapabilities::default(),
node_policy_src,
[PlacementAcceptance::default()],
INGRESS_POLICY,
);
src.produce_n_items_in_backlog(16);
let map = TestIdentityModelNodeTensor::<TEST_MAX_BATCH>::new(
TestTensorBackend,
(),
node_policy_map,
NodeCapabilities::default(),
[PlacementAcceptance::default()],
[PlacementAcceptance::default()],
)
.unwrap();
let snk = TestSinkNodeTensor::new(
NodeCapabilities::default(),
node_policy_snk,
[PlacementAcceptance::default()],
printer,
);
let q0: Q32 = Q32::default();
let q1: Q32 = Q32::default();
let sink = fixed_buffer_line_writer::<2048>();
let telemetry: NoStdTestTelemetry = NoStdTestTelemetry::new(0, true, sink);
let mgr0 = crate::memory::static_manager::StaticMemoryManager::<TestTensor, 35>::new();
let mgr1 = crate::memory::static_manager::StaticMemoryManager::<TestTensor, 35>::new();
let mut graph = TestPipeline::new(src, map, snk, q0, q1, mgr0, mgr1);
let mut runtime: TestNoStdRuntime<NoStdTestClock, NoStdTestTelemetry, 3, 3> =
TestNoStdRuntime::new();
runtime.init(&mut graph, clock, telemetry).unwrap();
for _ in 0..10 {
let _ = runtime.step(&mut graph).unwrap();
#[cfg(feature = "std")]
println!(
"--- [graph_occupancies] --- {:?}",
<TestNoStdRuntime<NoStdTestClock, NoStdTestTelemetry, 3, 3> as crate::runtime::LimenRuntime<
crate::graph::bench::TestPipeline<NoStdTestClock>,
3,
3,
>>::occupancies(&runtime)
);
}
let occ = LimenRuntime::<TestPipeline<NoStdTestClock>, 3, 3>::occupancies(&runtime);
assert_eq!(*occ[1].items(), 3, "edge1 (src->map) should have 3 items");
assert_eq!(*occ[2].items(), 6, "edge2 (map->snk) should have 6 items");
#[cfg(feature = "std")]
{
let _ = runtime.with_telemetry(|telemetry| {
use crate::prelude::Telemetry as _;
telemetry.push_metrics();
telemetry.flush();
let sink_ref = telemetry.writer();
let buffer_ref = sink_ref.inner();
println!("\n--- [telemetry buffer] ---\n{}", buffer_ref.as_str());
});
}
}
#[test]
fn batch_nostd_sliding_fixed_n() {
let printer: fn(&str) = {
#[cfg(feature = "std")]
{
fn print_fn(s: &str) {
println!("--- [***Sink Output***] --- {}", s);
}
print_fn
}
#[cfg(not(feature = "std"))]
{
fn noop(_: &str) {}
noop
}
};
let node_policy_src = NodePolicy::new(
BatchingPolicy::fixed(2),
BudgetPolicy::new(None, None),
DeadlinePolicy::new(false, None, None),
);
let node_policy_map = NodePolicy::new(
BatchingPolicy::fixed_with_window(3, WindowKind::Sliding(SlidingWindow::new(1))),
BudgetPolicy::new(None, None),
DeadlinePolicy::new(false, None, None),
);
let node_policy_snk = NodePolicy::new(
BatchingPolicy::none(),
BudgetPolicy::new(None, None),
DeadlinePolicy::new(false, None, None),
);
let clock = NoStdLinuxMonotonicClock::new();
let mut src = TestCounterSourceTensor::new(
clock,
0,
TraceId::new(0u64),
SequenceNumber::new(0u64),
None,
QoSClass::BestEffort,
MessageFlags::empty(),
NodeCapabilities::default(),
node_policy_src,
[PlacementAcceptance::default()],
INGRESS_POLICY,
);
src.produce_n_items_in_backlog(16);
let map = TestIdentityModelNodeTensor::<TEST_MAX_BATCH>::new(
TestTensorBackend,
(),
node_policy_map,
NodeCapabilities::default(),
[PlacementAcceptance::default()],
[PlacementAcceptance::default()],
)
.unwrap();
let snk = TestSinkNodeTensor::new(
NodeCapabilities::default(),
node_policy_snk,
[PlacementAcceptance::default()],
printer,
);
let q0: Q32 = Q32::default();
let q1: Q32 = Q32::default();
let sink = fixed_buffer_line_writer::<2048>();
let telemetry: NoStdTestTelemetry = NoStdTestTelemetry::new(0, true, sink);
let mgr0 = crate::memory::static_manager::StaticMemoryManager::<TestTensor, 35>::new();
let mgr1 = crate::memory::static_manager::StaticMemoryManager::<TestTensor, 35>::new();
let mut graph = TestPipeline::new(src, map, snk, q0, q1, mgr0, mgr1);
let mut runtime: TestNoStdRuntime<NoStdTestClock, NoStdTestTelemetry, 3, 3> =
TestNoStdRuntime::new();
runtime.init(&mut graph, clock, telemetry).unwrap();
for _ in 0..10 {
let _ = runtime.step(&mut graph).unwrap();
#[cfg(feature = "std")]
println!(
"--- [graph_occupancies] --- {:?}",
<TestNoStdRuntime<NoStdTestClock, NoStdTestTelemetry, 3, 3> as crate::runtime::LimenRuntime<
crate::graph::bench::TestPipeline<NoStdTestClock>,
3,
3,
>>::occupancies(&runtime)
);
}
let occ = LimenRuntime::<TestPipeline<NoStdTestClock>, 3, 3>::occupancies(&runtime);
assert_eq!(*occ[1].items(), 5, "edge1 (src->map) should have 5 items");
assert_eq!(*occ[2].items(), 5, "edge2 (map->snk) should have 5 items");
#[cfg(feature = "std")]
{
let _ = runtime.with_telemetry(|telemetry| {
use crate::prelude::Telemetry as _;
telemetry.push_metrics();
telemetry.flush();
let sink_ref = telemetry.writer();
let buffer_ref = sink_ref.inner();
println!("\n--- [telemetry buffer] ---\n{}", buffer_ref.as_str());
});
}
}
#[test]
fn batch_nostd_sliding_max_delta_t() {
let printer: fn(&str) = {
#[cfg(feature = "std")]
{
fn print_fn(s: &str) {
println!("--- [***Sink Output***] --- {}", s);
}
print_fn
}
#[cfg(not(feature = "std"))]
{
fn noop(_: &str) {}
noop
}
};
let node_policy_src = NodePolicy::new(
BatchingPolicy::none(),
BudgetPolicy::new(None, None),
DeadlinePolicy::new(false, None, None),
);
let node_policy_map = NodePolicy::new(
BatchingPolicy::delta_t_with_window(
LARGE_DELTA_T,
WindowKind::Sliding(SlidingWindow::new(1)),
),
BudgetPolicy::new(None, None),
DeadlinePolicy::new(false, None, None),
);
let node_policy_snk = NodePolicy::new(
BatchingPolicy::none(),
BudgetPolicy::new(None, None),
DeadlinePolicy::new(false, None, None),
);
let clock = NoStdLinuxMonotonicClock::new();
let mut src = TestCounterSourceTensor::new(
clock,
0,
TraceId::new(0u64),
SequenceNumber::new(0u64),
None,
QoSClass::BestEffort,
MessageFlags::empty(),
NodeCapabilities::default(),
node_policy_src,
[PlacementAcceptance::default()],
INGRESS_POLICY,
);
src.produce_n_items_in_backlog(16);
let map = TestIdentityModelNodeTensor::<TEST_MAX_BATCH>::new(
TestTensorBackend,
(),
node_policy_map,
NodeCapabilities::default(),
[PlacementAcceptance::default()],
[PlacementAcceptance::default()],
)
.unwrap();
let snk = TestSinkNodeTensor::new(
NodeCapabilities::default(),
node_policy_snk,
[PlacementAcceptance::default()],
printer,
);
let q0: Q32 = Q32::default();
let q1: Q32 = Q32::default();
let sink = fixed_buffer_line_writer::<2048>();
let telemetry: NoStdTestTelemetry = NoStdTestTelemetry::new(0, true, sink);
let mgr0 = crate::memory::static_manager::StaticMemoryManager::<TestTensor, 35>::new();
let mgr1 = crate::memory::static_manager::StaticMemoryManager::<TestTensor, 35>::new();
let mut graph = TestPipeline::new(src, map, snk, q0, q1, mgr0, mgr1);
let mut runtime: TestNoStdRuntime<NoStdTestClock, NoStdTestTelemetry, 3, 3> =
TestNoStdRuntime::new();
runtime.init(&mut graph, clock, telemetry).unwrap();
for _ in 0..10 {
let _ = runtime.step(&mut graph).unwrap();
#[cfg(feature = "std")]
println!(
"--- [graph_occupancies] --- {:?}",
<TestNoStdRuntime<NoStdTestClock, NoStdTestTelemetry, 3, 3> as crate::runtime::LimenRuntime<
crate::graph::bench::TestPipeline<NoStdTestClock>,
3,
3,
>>::occupancies(&runtime)
);
}
let occ = LimenRuntime::<TestPipeline<NoStdTestClock>, 3, 3>::occupancies(&runtime);
assert_eq!(*occ[1].items(), 1, "edge1 should have 1 item");
assert_eq!(*occ[2].items(), 0, "edge2 should have 0 items");
#[cfg(feature = "std")]
{
let _ = runtime.with_telemetry(|telemetry| {
use crate::prelude::Telemetry as _;
telemetry.push_metrics();
telemetry.flush();
let sink_ref = telemetry.writer();
let buffer_ref = sink_ref.inner();
println!("\n--- [telemetry buffer] ---\n{}", buffer_ref.as_str());
});
}
}
#[test]
fn batch_nostd_sliding_fixed_n_and_max_delta_t() {
let printer: fn(&str) = {
#[cfg(feature = "std")]
{
fn print_fn(s: &str) {
println!("--- [***Sink Output***] --- {}", s);
}
print_fn
}
#[cfg(not(feature = "std"))]
{
fn noop(_: &str) {}
noop
}
};
let node_policy_src = NodePolicy::new(
BatchingPolicy::fixed(3),
BudgetPolicy::new(None, None),
DeadlinePolicy::new(false, None, None),
);
let node_policy_map = NodePolicy::new(
BatchingPolicy::with_window(
Some(3),
Some(LARGE_DELTA_T),
WindowKind::Sliding(SlidingWindow::new(2)),
),
BudgetPolicy::new(None, None),
DeadlinePolicy::new(false, None, None),
);
let node_policy_snk = NodePolicy::new(
BatchingPolicy::none(),
BudgetPolicy::new(None, None),
DeadlinePolicy::new(false, None, None),
);
let clock = NoStdLinuxMonotonicClock::new();
let mut src = TestCounterSourceTensor::new(
clock,
0,
TraceId::new(0u64),
SequenceNumber::new(0u64),
None,
QoSClass::BestEffort,
MessageFlags::empty(),
NodeCapabilities::default(),
node_policy_src,
[PlacementAcceptance::default()],
INGRESS_POLICY,
);
src.produce_n_items_in_backlog(16);
let map = TestIdentityModelNodeTensor::<TEST_MAX_BATCH>::new(
TestTensorBackend,
(),
node_policy_map,
NodeCapabilities::default(),
[PlacementAcceptance::default()],
[PlacementAcceptance::default()],
)
.unwrap();
let snk = TestSinkNodeTensor::new(
NodeCapabilities::default(),
node_policy_snk,
[PlacementAcceptance::default()],
printer,
);
let q0: Q32 = Q32::default();
let q1: Q32 = Q32::default();
let sink = fixed_buffer_line_writer::<2048>();
let telemetry: NoStdTestTelemetry = NoStdTestTelemetry::new(0, true, sink);
let mgr0 = crate::memory::static_manager::StaticMemoryManager::<TestTensor, 35>::new();
let mgr1 = crate::memory::static_manager::StaticMemoryManager::<TestTensor, 35>::new();
let mut graph = TestPipeline::new(src, map, snk, q0, q1, mgr0, mgr1);
let mut runtime: TestNoStdRuntime<NoStdTestClock, NoStdTestTelemetry, 3, 3> =
TestNoStdRuntime::new();
runtime.init(&mut graph, clock, telemetry).unwrap();
for _ in 0..10 {
let _ = runtime.step(&mut graph).unwrap();
#[cfg(feature = "std")]
println!(
"--- [graph_occupancies] --- {:?}",
<TestNoStdRuntime<NoStdTestClock, NoStdTestTelemetry, 3, 3> as crate::runtime::LimenRuntime<
crate::graph::bench::TestPipeline<NoStdTestClock>,
3,
3,
>>::occupancies(&runtime)
);
}
let occ = LimenRuntime::<TestPipeline<NoStdTestClock>, 3, 3>::occupancies(&runtime);
assert_eq!(*occ[1].items(), 6, "edge1 (src->map) should have 6 items");
assert_eq!(*occ[2].items(), 6, "edge2 (map->snk) should have 6 items");
#[cfg(feature = "std")]
{
let _ = runtime.with_telemetry(|telemetry| {
use crate::prelude::Telemetry as _;
telemetry.push_metrics();
telemetry.flush();
let sink_ref = telemetry.writer();
let buffer_ref = sink_ref.inner();
println!("\n--- [telemetry buffer] ---\n{}", buffer_ref.as_str());
});
}
}