use std::marker::PhantomData;
use std::rc::Rc;
use crate::types::*;
pub use wingfoil_derive::latency_stages;
/// A fixed set of per-stage timestamps carried alongside a traced value.
///
/// Stages are addressed by a zero-based index in declaration order. Implementations can be
/// generated with [`latency_stages!`].
pub trait Latency: Element + Copy {
    /// Number of stages (and therefore timestamps) this record tracks.
    const N: usize;

    /// Stage names in index order: `stage_names()[i]` labels `stamps()[i]`.
    fn stage_names() -> &'static [&'static str];

    /// Read-only view of every stage timestamp, in index order.
    fn stamps(&self) -> &[u64];

    /// Mutable access to the timestamp slot of stage `idx`.
    ///
    /// # Panics
    /// The `latency_stages!`-generated implementation panics with
    /// "stage index out of bounds" when `idx >= N`.
    fn stamp_mut(&mut self, idx: usize) -> &mut u64;
}
/// Type-level marker naming one stage of the latency record `L`.
///
/// Stamping nodes are parameterised by a `Stage` so the target slot is fixed at compile time.
pub trait Stage<L: Latency> {
    /// Human-readable stage name (e.g. `"strategy"`).
    const NAME: &'static str;
    /// Zero-based slot of this stage inside `L`.
    const INDEX: usize;

    /// Overwrites this stage's slot in `latency` with the timestamp `t`.
    #[inline]
    fn stamp(latency: &mut L, t: u64) {
        let slot: &mut u64 = latency.stamp_mut(Self::INDEX);
        *slot = t;
    }
}
/// Implemented by values that carry a [`Latency`] record next to their payload.
pub trait HasLatency {
    /// The latency record type carried by this value.
    type L: Latency;

    /// Shared access to the carried latency record.
    fn latency(&self) -> &Self::L;

    /// Exclusive access to the carried latency record (used by the stamping nodes).
    fn latency_mut(&mut self) -> &mut Self::L;
}
/// A payload `T` travelling together with its latency record `L`.
///
/// `#[repr(C)]` gives the struct a defined layout with `payload` first and `latency`
/// second. The layout tests in this file pin those offsets, so do not reorder the fields.
#[repr(C)]
#[derive(
    Clone, Copy, Debug, Default, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize,
)]
pub struct Traced<T, L> {
    /// The application value being traced.
    pub payload: T,
    /// Per-stage timestamps for `payload` (e.g. a type generated by `latency_stages!`).
    pub latency: L,
}
impl<T, L> Traced<T, L> {
#[inline]
pub fn new(payload: T) -> Self
where
L: Default,
{
Self {
payload,
latency: L::default(),
}
}
#[inline]
pub fn with_latency(payload: T, latency: L) -> Self {
Self { payload, latency }
}
}
impl<T, L: Latency> HasLatency for Traced<T, L> {
type L = L;
#[inline]
fn latency(&self) -> &L {
&self.latency
}
#[inline]
fn latency_mut(&mut self) -> &mut L {
&mut self.latency
}
}
// SAFETY: `Traced` is `#[repr(C)]` and the bounds below require both of its fields to be
// `ZeroCopySend`. This assumes iceoryx2's contract is satisfied by a C-layout struct whose
// fields are all `ZeroCopySend` — NOTE(review): confirm against the iceoryx2 `ZeroCopySend`
// documentation.
#[cfg(feature = "iceoryx2")]
unsafe impl<T, L> iceoryx2::prelude::ZeroCopySend for Traced<T, L>
where
    T: iceoryx2::prelude::ZeroCopySend,
    L: iceoryx2::prelude::ZeroCopySend,
{
    /// Composes `"wingfoil::Traced<{T name}, {L name}>"` from the field types' own names.
    ///
    /// This means type-name overrides on `T`/`L` propagate into the composite name. The
    /// composite is leaked once per distinct pair and cached by `traced_type_name`.
    unsafe fn type_name() -> &'static str {
        // The field types' `type_name` calls are made under the same obligations our own
        // caller accepted by calling this `unsafe fn` (presumably none beyond iceoryx2's
        // documented contract — confirm).
        traced_type_name(unsafe { T::type_name() }, unsafe { L::type_name() })
    }
}
/// Returns the interned name `"wingfoil::Traced<{t}, {l}>"` for the pair `(t, l)`.
///
/// `ZeroCopySend::type_name` must hand out a `&'static str`. Each distinct composed name is
/// therefore leaked exactly once and cached in a process-wide map. Later calls with the same
/// pair return the cached string instead of leaking again.
#[cfg(feature = "iceoryx2")]
fn traced_type_name(t: &'static str, l: &'static str) -> &'static str {
    use std::collections::HashMap;
    use std::sync::{Mutex, OnceLock};

    type NameCache = Mutex<HashMap<(&'static str, &'static str), &'static str>>;
    static NAMES: OnceLock<NameCache> = OnceLock::new();

    let mut names = NAMES
        .get_or_init(|| Mutex::new(HashMap::new()))
        .lock()
        .unwrap();
    // One lookup: reuse the cached name or leak-and-store a freshly formatted one.
    let name: &'static str = *names.entry((t, l)).or_insert_with(|| -> &'static str {
        Box::leak(format!("wingfoil::Traced<{t}, {l}>").into_boxed_str())
    });
    name
}
/// Node that writes the graph's per-cycle wall time into stage `S` of every upstream value.
///
/// Each tick it reads the upstream value with `peek_value` and overwrites stage `S` with
/// `GraphState::wall_time()`. All other stages pass through unchanged.
pub struct StampStream<P, S>
where
    P: Element + HasLatency,
    S: Stage<P::L> + 'static,
{
    upstream: Rc<dyn Stream<P>>,
    value: P,
    // `S` is only a type-level marker; `fn() -> S` records it without storing an `S`.
    _stage: PhantomData<fn() -> S>,
}

impl<P, S> StampStream<P, S>
where
    P: Element + HasLatency,
    S: Stage<P::L> + 'static,
{
    /// Creates a stamping node over `upstream`; the output starts as `P::default()`.
    pub fn new(upstream: Rc<dyn Stream<P>>) -> Self {
        let value = P::default();
        Self {
            upstream,
            value,
            _stage: PhantomData,
        }
    }
}

#[node(active = [upstream], output = value: P)]
impl<P, S> MutableNode for StampStream<P, S>
where
    P: Element + HasLatency,
    S: Stage<P::L> + 'static,
{
    fn cycle(&mut self, state: &mut GraphState) -> anyhow::Result<bool> {
        let mut stamped = self.upstream.peek_value();
        let now: u64 = state.wall_time().into();
        S::stamp(stamped.latency_mut(), now);
        self.value = stamped;
        Ok(true)
    }
}
/// Node that writes `GraphState::wall_time_precise()` into stage `S` of every upstream value.
///
/// Same shape as the per-cycle stamping node, but it reads the precise clock instead of the
/// per-cycle `wall_time`. All other stages pass through unchanged.
pub struct StampPreciseStream<P, S>
where
    P: Element + HasLatency,
    S: Stage<P::L> + 'static,
{
    upstream: Rc<dyn Stream<P>>,
    value: P,
    // `S` is only a type-level marker; `fn() -> S` records it without storing an `S`.
    _stage: PhantomData<fn() -> S>,
}

impl<P, S> StampPreciseStream<P, S>
where
    P: Element + HasLatency,
    S: Stage<P::L> + 'static,
{
    /// Creates a precise stamping node over `upstream`; the output starts as `P::default()`.
    pub fn new(upstream: Rc<dyn Stream<P>>) -> Self {
        let value = P::default();
        Self {
            upstream,
            value,
            _stage: PhantomData,
        }
    }
}

#[node(active = [upstream], output = value: P)]
impl<P, S> MutableNode for StampPreciseStream<P, S>
where
    P: Element + HasLatency,
    S: Stage<P::L> + 'static,
{
    fn cycle(&mut self, state: &mut GraphState) -> anyhow::Result<bool> {
        let mut stamped = self.upstream.peek_value();
        let now: u64 = state.wall_time_precise().into();
        S::stamp(stamped.latency_mut(), now);
        self.value = stamped;
        Ok(true)
    }
}
/// Stream combinators that stamp one latency stage on every tick.
pub trait LatencyStreamOps<P>
where
    P: Element + HasLatency,
{
    /// Appends a node writing the per-cycle `GraphState::wall_time` into stage `S`.
    #[must_use]
    fn stamp<S>(self: &Rc<Self>) -> Rc<dyn Stream<P>>
    where
        S: Stage<P::L> + 'static;

    /// [`stamp`](Self::stamp) when `enabled`, otherwise returns this same stream (no node added).
    #[must_use]
    fn stamp_if<S>(self: &Rc<Self>, enabled: bool) -> Rc<dyn Stream<P>>
    where
        S: Stage<P::L> + 'static;

    /// Appends a node writing `GraphState::wall_time_precise` into stage `S`.
    #[must_use]
    fn stamp_precise<S>(self: &Rc<Self>) -> Rc<dyn Stream<P>>
    where
        S: Stage<P::L> + 'static;

    /// [`stamp_precise`](Self::stamp_precise) when `enabled`, otherwise returns this same stream.
    #[must_use]
    fn stamp_precise_if<S>(self: &Rc<Self>, enabled: bool) -> Rc<dyn Stream<P>>
    where
        S: Stage<P::L> + 'static;
}
impl<P> LatencyStreamOps<P> for dyn Stream<P>
where
    P: Element + HasLatency + 'static,
{
    fn stamp<S>(self: &Rc<Self>) -> Rc<dyn Stream<P>>
    where
        S: Stage<P::L> + 'static,
    {
        let node = StampStream::<P, S>::new(Rc::clone(self));
        node.into_stream()
    }

    fn stamp_if<S>(self: &Rc<Self>, enabled: bool) -> Rc<dyn Stream<P>>
    where
        S: Stage<P::L> + 'static,
    {
        // Disabled: hand back the very same `Rc`, so nothing is inserted into the graph.
        if !enabled {
            return Rc::clone(self);
        }
        self.stamp::<S>()
    }

    fn stamp_precise<S>(self: &Rc<Self>) -> Rc<dyn Stream<P>>
    where
        S: Stage<P::L> + 'static,
    {
        let node = StampPreciseStream::<P, S>::new(Rc::clone(self));
        node.into_stream()
    }

    fn stamp_precise_if<S>(self: &Rc<Self>, enabled: bool) -> Rc<dyn Stream<P>>
    where
        S: Stage<P::L> + 'static,
    {
        // Disabled: hand back the very same `Rc`, so nothing is inserted into the graph.
        if !enabled {
            return Rc::clone(self);
        }
        self.stamp_precise::<S>()
    }
}
/// Number of log2 histogram buckets kept per stage.
///
/// Bucket `i` counts deltas `d` with `ilog2(d + 1) == i`, so 64 buckets span the full `u64`
/// range.
const HISTOGRAM_BUCKETS: usize = 64;

/// Aggregated statistics for one stage transition; all values are nanoseconds.
#[derive(Clone, Copy, Debug)]
pub struct StageStats {
    /// Number of recorded samples.
    pub count: u64,
    /// Saturating sum of all recorded deltas.
    pub sum_ns: u64,
    /// Smallest recorded delta (`u64::MAX` while empty).
    pub min_ns: u64,
    /// Largest recorded delta (`0` while empty).
    pub max_ns: u64,
    /// Log2 histogram: bucket `i` counts deltas `d` with `ilog2(d + 1) == i`.
    ///
    /// `d == u64::MAX` lands in the last bucket.
    pub histogram: [u64; HISTOGRAM_BUCKETS],
}

impl Default for StageStats {
    fn default() -> Self {
        Self {
            count: 0,
            sum_ns: 0,
            // Sentinel so the first recorded sample always becomes the minimum.
            min_ns: u64::MAX,
            max_ns: 0,
            histogram: [0; HISTOGRAM_BUCKETS],
        }
    }
}

impl StageStats {
    /// Records one latency sample of `delta_ns` nanoseconds.
    #[inline]
    pub fn record(&mut self, delta_ns: u64) {
        self.count += 1;
        self.sum_ns = self.sum_ns.saturating_add(delta_ns);
        self.min_ns = self.min_ns.min(delta_ns);
        self.max_ns = self.max_ns.max(delta_ns);
        // `saturating_add` keeps `delta_ns == u64::MAX` from overflowing. Without it, debug
        // builds panic and release builds wrap to 0, which then makes `ilog2` panic.
        let bucket = (delta_ns.saturating_add(1).ilog2() as usize).min(HISTOGRAM_BUCKETS - 1);
        self.histogram[bucket] += 1;
    }

    /// Integer mean of the recorded samples, or 0 when nothing has been recorded.
    pub fn mean_ns(&self) -> u64 {
        self.sum_ns.checked_div(self.count).unwrap_or(0)
    }

    /// Approximate `q`-quantile (e.g. `0.5`, `0.99`) estimated from the log2 histogram.
    ///
    /// The estimate is the inclusive upper bound of the bucket holding the target rank. It is
    /// clamped to `[min_ns, max_ns]`, so it may overstate the true quantile by up to about
    /// 2x, but it never reports a value outside the observed range. `q <= 0` yields `min_ns`,
    /// and `q > 1` yields `max_ns`. Returns 0 when nothing has been recorded.
    pub fn quantile_ns(&self, q: f64) -> u64 {
        if self.count == 0 {
            return 0;
        }
        let target = ((self.count as f64) * q).ceil() as u64;
        let mut cumulative = 0u64;
        for (bucket, &n) in self.histogram.iter().enumerate() {
            cumulative += n;
            if cumulative >= target {
                // `min`/`max` instead of `clamp`: `clamp` panics if the public fields were
                // mutated into `min_ns > max_ns`.
                return Self::bucket_upper_bound(bucket)
                    .min(self.max_ns)
                    .max(self.min_ns);
            }
        }
        self.max_ns
    }

    /// Largest delta that can land in `bucket`.
    ///
    /// Bucket `i` holds `d` with `2^i <= d + 1 < 2^(i+1)`, i.e. `d <= 2^(i+1) - 2`. The last
    /// bucket's bound does not fit in a `u64`, so it saturates to `u64::MAX`.
    fn bucket_upper_bound(bucket: usize) -> u64 {
        1u64.checked_shl((bucket + 1) as u32)
            .map_or(u64::MAX, |limit| limit - 2)
    }
}
/// Per-transition statistics for a latency record type `L`.
///
/// `stages[i]` aggregates the delta from stage `i - 1` to stage `i`. `stages[0]` has no
/// predecessor and is never populated.
pub struct LatencyStats<L: Latency> {
    pub stages: Vec<StageStats>,
    _phantom: PhantomData<L>,
}

impl<L: Latency> Default for LatencyStats<L> {
    fn default() -> Self {
        let stages: Vec<StageStats> = (0..L::N).map(|_| StageStats::default()).collect();
        Self {
            stages,
            _phantom: PhantomData,
        }
    }
}

impl<L: Latency> LatencyStats<L> {
    /// Creates statistics with one empty [`StageStats`] per stage of `L`.
    pub fn new() -> Self {
        Self::default()
    }

    /// Folds one latency record into the per-transition statistics.
    ///
    /// A transition is skipped when either adjacent stamp is 0 (never stamped), or when the
    /// later stamp is smaller than the earlier one.
    pub fn observe(&mut self, latency: &L) {
        let stamps = latency.stamps();
        for stage in 1..L::N {
            let (start, end) = (stamps[stage - 1], stamps[stage]);
            let complete = start != 0 && end != 0 && end >= start;
            if complete {
                self.stages[stage].record(end - start);
            }
        }
    }

    /// Renders a fixed-width text table with one row per stage transition.
    ///
    /// Transitions with no samples are shown as "(no samples)".
    pub fn format_report(&self) -> String {
        use std::fmt::Write as _;

        let names = L::stage_names();
        let mut report =
            String::from("latency report (delta from previous stage, nanoseconds):\n");
        // `fmt::Write` for `String` cannot fail, so the returned `fmt::Result`s are discarded.
        let _ = writeln!(
            report,
            " {:<24} {:>10} {:>12} {:>12} {:>12} {:>12} {:>12}",
            "stage", "count", "min", "mean", "p50", "p99", "max"
        );
        for idx in 1..L::N {
            let stats = &self.stages[idx];
            let label = format!("{} -> {}", names[idx - 1], names[idx]);
            if stats.count == 0 {
                let _ = writeln!(report, " {label:<24} {:>10}", "(no samples)");
            } else {
                let _ = writeln!(
                    report,
                    " {:<24} {:>10} {:>12} {:>12} {:>12} {:>12} {:>12}",
                    label,
                    stats.count,
                    stats.min_ns,
                    stats.mean_ns(),
                    stats.quantile_ns(0.5),
                    stats.quantile_ns(0.99),
                    stats.max_ns,
                );
            }
        }
        report
    }
}
/// Sink node that folds every upstream tick's latency record into [`LatencyStats`].
///
/// The statistics live behind a shared `Rc<RefCell<_>>` (see [`LatencyReport::stats`]), so
/// they can be inspected after the graph has run. The formatted report can optionally be
/// printed to stdout when the node stops.
pub struct LatencyReport<P>
where
    P: Element + HasLatency,
{
    upstream: Rc<dyn Stream<P>>,
    stats: Rc<std::cell::RefCell<LatencyStats<P::L>>>,
    print_on_teardown: bool,
}

impl<P> LatencyReport<P>
where
    P: Element + HasLatency,
{
    /// Creates a report over `upstream` with empty statistics and printing disabled.
    pub fn new(upstream: Rc<dyn Stream<P>>) -> Self {
        let stats = Rc::new(std::cell::RefCell::new(LatencyStats::new()));
        Self {
            upstream,
            stats,
            print_on_teardown: false,
        }
    }

    /// Builder toggle: when `yes` is true, `stop` prints the formatted report.
    pub fn print_on_teardown(self, yes: bool) -> Self {
        Self {
            print_on_teardown: yes,
            ..self
        }
    }

    /// Returns a shared handle to the statistics this node accumulates.
    pub fn stats(&self) -> Rc<std::cell::RefCell<LatencyStats<P::L>>> {
        Rc::clone(&self.stats)
    }
}

#[node(active = [upstream])]
impl<P> MutableNode for LatencyReport<P>
where
    P: Element + HasLatency,
{
    fn cycle(&mut self, _state: &mut GraphState) -> anyhow::Result<bool> {
        let tick = self.upstream.peek_value();
        let mut stats = self.stats.borrow_mut();
        stats.observe(tick.latency());
        Ok(true)
    }

    fn stop(&mut self, _state: &mut GraphState) -> anyhow::Result<()> {
        if !self.print_on_teardown {
            return Ok(());
        }
        let report = self.stats.borrow().format_report();
        print!("{report}");
        Ok(())
    }
}
/// Operators that attach a [`LatencyReport`] sink to a stream of latency-carrying values.
pub trait LatencyReportOps<P>
where
    P: Element + HasLatency,
{
    /// Builds a report node over this stream.
    ///
    /// Returns the node to run (or wire into a graph) together with a shared handle to the
    /// statistics it accumulates. When `print_on_teardown` is true, the formatted report is
    /// printed when the node stops. Discarding the returned node means the report never runs,
    /// hence `#[must_use]` (matching the stamping operators).
    #[must_use]
    fn latency_report(
        self: &Rc<Self>,
        print_on_teardown: bool,
    ) -> (Rc<dyn Node>, Rc<std::cell::RefCell<LatencyStats<P::L>>>);

    /// Like [`latency_report`](Self::latency_report) when `enabled`.
    ///
    /// Otherwise returns this stream itself as the node, plus a stats handle that is never
    /// updated.
    #[must_use]
    fn latency_report_if(
        self: &Rc<Self>,
        enabled: bool,
        print_on_teardown: bool,
    ) -> (Rc<dyn Node>, Rc<std::cell::RefCell<LatencyStats<P::L>>>);
}
impl<P> LatencyReportOps<P> for dyn Stream<P>
where
    P: Element + HasLatency + 'static,
{
    fn latency_report(
        self: &Rc<Self>,
        print_on_teardown: bool,
    ) -> (Rc<dyn Node>, Rc<std::cell::RefCell<LatencyStats<P::L>>>) {
        let node = LatencyReport::new(Rc::clone(self)).print_on_teardown(print_on_teardown);
        let handle = node.stats();
        (node.into_node(), handle)
    }

    fn latency_report_if(
        self: &Rc<Self>,
        enabled: bool,
        print_on_teardown: bool,
    ) -> (Rc<dyn Node>, Rc<std::cell::RefCell<LatencyStats<P::L>>>) {
        if !enabled {
            // No report node is built: the upstream stream stands in as the node, and the
            // returned stats stay empty.
            let unused = Rc::new(std::cell::RefCell::new(LatencyStats::<P::L>::new()));
            return (Rc::clone(self).as_node(), unused);
        }
        self.latency_report(print_on_teardown)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::nodes::{CallBackStream, NodeOperators, StreamOperators};
    use crate::queue::ValueAt;
    use std::cell::RefCell;
    use std::mem::{align_of, offset_of, size_of};

    // Four-stage latency type used throughout. The macro also generates the
    // `trade_latency::{ingest, decode, strategy, publish}` stage markers.
    latency_stages! {
        pub TradeLatency {
            ingest,
            decode,
            strategy,
            publish,
        }
    }

    // The generated struct must be a dense run of u64s in declaration order.
    #[test]
    fn latency_struct_layout_is_packed_u64s() {
        assert_eq!(size_of::<TradeLatency>(), 4 * size_of::<u64>());
        assert_eq!(align_of::<TradeLatency>(), align_of::<u64>());
        assert_eq!(offset_of!(TradeLatency, ingest), 0);
        assert_eq!(offset_of!(TradeLatency, decode), 8);
        assert_eq!(offset_of!(TradeLatency, strategy), 16);
        assert_eq!(offset_of!(TradeLatency, publish), 24);
    }

    #[test]
    fn latency_trait_reports_n_and_names() {
        assert_eq!(TradeLatency::N, 4);
        assert_eq!(
            TradeLatency::stage_names(),
            &["ingest", "decode", "strategy", "publish"]
        );
    }

    #[test]
    fn stamps_slice_view_matches_named_fields() {
        let l = TradeLatency {
            ingest: 1,
            decode: 2,
            strategy: 3,
            publish: 4,
        };
        assert_eq!(l.stamps(), &[1u64, 2, 3, 4]);
    }

    #[test]
    fn stamp_mut_writes_named_field() {
        let mut l = TradeLatency::default();
        *l.stamp_mut(2) = 99;
        assert_eq!(l.strategy, 99);
    }

    #[test]
    #[should_panic(expected = "stage index out of bounds")]
    fn stamp_mut_panics_out_of_bounds() {
        let mut l = TradeLatency::default();
        *l.stamp_mut(4) = 0;
    }

    #[test]
    fn stage_markers_have_correct_index_and_name() {
        assert_eq!(<trade_latency::ingest as Stage<TradeLatency>>::INDEX, 0);
        assert_eq!(<trade_latency::publish as Stage<TradeLatency>>::INDEX, 3);
        assert_eq!(
            <trade_latency::strategy as Stage<TradeLatency>>::NAME,
            "strategy"
        );
    }

    // Stamping one stage must leave every other slot untouched.
    #[test]
    fn stage_stamp_writes_correct_field() {
        let mut l = TradeLatency::default();
        <trade_latency::strategy as Stage<TradeLatency>>::stamp(&mut l, 1234);
        assert_eq!(l.strategy, 1234);
        assert_eq!(l.ingest, 0);
        assert_eq!(l.decode, 0);
        assert_eq!(l.publish, 0);
    }

    // With a u64-aligned payload, `Traced` is payload then latency with no padding.
    #[test]
    fn traced_layout_payload_first_no_padding_for_aligned_payload() {
        let s = size_of::<Traced<u64, TradeLatency>>();
        assert_eq!(s, size_of::<u64>() + size_of::<TradeLatency>());
        assert_eq!(offset_of!(Traced<u64, TradeLatency>, payload), 0);
        assert_eq!(
            offset_of!(Traced<u64, TradeLatency>, latency),
            size_of::<u64>()
        );
    }

    #[test]
    fn has_latency_round_trip() {
        let mut t: Traced<u64, TradeLatency> = Traced::new(7);
        t.latency_mut().strategy = 42;
        assert_eq!(t.latency().strategy, 42);
        assert_eq!(t.payload, 7);
    }

    // `stamp` fills only its own stage; payloads and other stages pass through.
    #[test]
    fn stamp_stream_writes_wall_time_into_named_stage() {
        let cb = Rc::new(RefCell::new(
            CallBackStream::<Traced<u64, TradeLatency>>::new(),
        ));
        cb.borrow_mut().push(ValueAt::new(
            Traced::new(11u64),
            crate::time::NanoTime::new(100),
        ));
        cb.borrow_mut().push(ValueAt::new(
            Traced::new(22u64),
            crate::time::NanoTime::new(250),
        ));
        let stamped = cb
            .clone()
            .as_stream()
            .stamp::<trade_latency::strategy>()
            .collect();
        stamped
            .run(
                crate::graph::RunMode::HistoricalFrom(crate::time::NanoTime::ZERO),
                crate::graph::RunFor::Forever,
            )
            .unwrap();
        let collected = stamped.peek_value();
        assert_eq!(collected.len(), 2);
        assert_eq!(collected[0].value.payload, 11);
        assert_eq!(collected[1].value.payload, 22);
        assert_eq!(collected[0].value.latency.ingest, 0);
        assert_eq!(collected[1].value.latency.ingest, 0);
        assert!(collected[0].value.latency.strategy > 0);
        assert!(collected[1].value.latency.strategy >= collected[0].value.latency.strategy);
    }

    // Even in historical mode the stamps are real wall-clock values (well past 1s since epoch).
    #[test]
    fn stamp_works_identically_in_historical_and_realtime() {
        use std::time::Duration;
        fn run_one(mode: crate::graph::RunMode) -> crate::time::NanoTime {
            let stream = crate::nodes::ticker(Duration::from_millis(1))
                .count()
                .map(|seq: u64| Traced::<u64, TradeLatency>::new(seq))
                .stamp::<trade_latency::ingest>()
                .stamp_precise::<trade_latency::publish>()
                .collect();
            stream.run(mode, crate::graph::RunFor::Cycles(3)).unwrap();
            let values = stream.peek_value();
            assert!(!values.is_empty());
            let l = values[0].value.latency;
            assert!(l.ingest > 0, "ingest stamp should be populated");
            assert!(l.publish >= l.ingest, "publish >= ingest");
            crate::time::NanoTime::new(l.ingest)
        }
        let historical = run_one(crate::graph::RunMode::HistoricalFrom(
            crate::time::NanoTime::ZERO,
        ));
        let realtime = run_one(crate::graph::RunMode::RealTime);
        assert!(u64::from(historical) > 1_000_000_000);
        assert!(u64::from(realtime) > 1_000_000_000);
    }

    #[test]
    fn traced_serializes_via_serde_json() {
        let original = Traced::with_latency(
            42u32,
            TradeLatency {
                ingest: 100,
                decode: 200,
                strategy: 300,
                publish: 400,
            },
        );
        let bytes = serde_json::to_vec(&original).unwrap();
        let round: Traced<u32, TradeLatency> = serde_json::from_slice(&bytes).unwrap();
        assert_eq!(round, original);
    }

    // A disabled `stamp_if` must return the very same stream object.
    #[test]
    fn stamp_if_disabled_inserts_no_node() {
        let cb = Rc::new(RefCell::new(
            CallBackStream::<Traced<u64, TradeLatency>>::new(),
        ));
        let upstream = cb.clone().as_stream();
        let stamped = upstream.stamp_if::<trade_latency::strategy>(false);
        assert!(
            Rc::ptr_eq(&upstream, &stamped),
            "stamp_if(false) should be identity"
        );
    }

    #[test]
    fn stamp_precise_writes_fresh_timestamps() {
        let cb = Rc::new(RefCell::new(
            CallBackStream::<Traced<u64, TradeLatency>>::new(),
        ));
        cb.borrow_mut().push(ValueAt::new(
            Traced::new(1u64),
            crate::time::NanoTime::new(100),
        ));
        let stamped = cb
            .clone()
            .as_stream()
            .stamp_precise::<trade_latency::ingest>()
            .stamp_precise::<trade_latency::publish>()
            .collect();
        stamped
            .run(
                crate::graph::RunMode::HistoricalFrom(crate::time::NanoTime::ZERO),
                crate::graph::RunFor::Forever,
            )
            .unwrap();
        let collected = stamped.peek_value();
        assert_eq!(collected.len(), 1);
        let l = collected[0].value.latency;
        assert!(l.ingest > 0);
        assert!(l.publish >= l.ingest);
    }

    #[test]
    fn stage_stats_records_min_mean_max() {
        let mut s = StageStats::default();
        s.record(10);
        s.record(20);
        s.record(30);
        assert_eq!(s.count, 3);
        assert_eq!(s.min_ns, 10);
        assert_eq!(s.max_ns, 30);
        assert_eq!(s.mean_ns(), 20);
    }

    #[test]
    fn stage_stats_quantile_zero_when_empty() {
        let s = StageStats::default();
        assert_eq!(s.quantile_ns(0.5), 0);
        assert_eq!(s.mean_ns(), 0);
    }

    // Bucket index is ilog2(delta + 1): 0 -> 0, 1..=2 -> 1, 3 -> 2.
    #[test]
    fn stage_stats_histogram_uses_log2_buckets() {
        let mut s = StageStats::default();
        s.record(0);
        s.record(1);
        s.record(2);
        s.record(3);
        assert_eq!(s.histogram[0], 1);
        assert_eq!(s.histogram[1], 2);
        assert_eq!(s.histogram[2], 1);
        assert_eq!(s.histogram.iter().sum::<u64>(), s.count);
    }

    // stages[i] holds the delta from stage i-1 to stage i; stages[0] is never populated.
    #[test]
    fn latency_stats_observes_deltas_between_adjacent_stages() {
        let mut stats = LatencyStats::<TradeLatency>::new();
        stats.observe(&TradeLatency {
            ingest: 100,
            decode: 150,
            strategy: 200,
            publish: 400,
        });
        assert_eq!(stats.stages[0].count, 0);
        assert_eq!(stats.stages[1].count, 1);
        assert_eq!(stats.stages[1].sum_ns, 50);
        assert_eq!(stats.stages[2].sum_ns, 50);
        assert_eq!(stats.stages[3].sum_ns, 200);
    }

    // An unstamped (zero) stage invalidates both transitions that touch it.
    #[test]
    fn latency_stats_skips_partial_stamps() {
        let mut stats = LatencyStats::<TradeLatency>::new();
        stats.observe(&TradeLatency {
            ingest: 100,
            decode: 0,
            strategy: 200,
            publish: 0,
        });
        for i in 1..TradeLatency::N {
            assert_eq!(stats.stages[i].count, 0, "stage {i} should be skipped");
        }
    }

    #[test]
    fn latency_report_aggregates_across_ticks() {
        let cb = Rc::new(RefCell::new(
            CallBackStream::<Traced<u64, TradeLatency>>::new(),
        ));
        for (i, base) in [100u64, 200, 300].iter().enumerate() {
            cb.borrow_mut().push(ValueAt::new(
                Traced::with_latency(
                    i as u64,
                    TradeLatency {
                        ingest: *base,
                        decode: *base + 10,
                        strategy: *base + 30,
                        publish: *base + 60,
                    },
                ),
                crate::time::NanoTime::new(*base),
            ));
        }
        let stream = cb.clone().as_stream();
        let (sink, stats) = stream.latency_report(false);
        sink.run(
            crate::graph::RunMode::HistoricalFrom(crate::time::NanoTime::ZERO),
            crate::graph::RunFor::Forever,
        )
        .unwrap();
        let s = stats.borrow();
        assert_eq!(s.stages[1].count, 3);
        assert_eq!(s.stages[1].mean_ns(), 10);
        assert_eq!(s.stages[2].count, 3);
        assert_eq!(s.stages[2].mean_ns(), 20);
        assert_eq!(s.stages[3].count, 3);
        assert_eq!(s.stages[3].mean_ns(), 30);
    }

    #[test]
    fn format_report_renders_named_stages() {
        let mut stats = LatencyStats::<TradeLatency>::new();
        stats.observe(&TradeLatency {
            ingest: 100,
            decode: 200,
            strategy: 400,
            publish: 800,
        });
        let report = stats.format_report();
        assert!(report.contains("ingest -> decode"));
        assert!(report.contains("decode -> strategy"));
        assert!(report.contains("strategy -> publish"));
    }

    // Transitions with no observations are rendered with a placeholder instead of numbers.
    #[test]
    fn format_report_marks_empty_stages() {
        let stats = LatencyStats::<TradeLatency>::new();
        let report = stats.format_report();
        assert!(report.contains("ingest -> decode"));
        assert!(report.contains("(no samples)"));
    }

    // Non-precise stamps within one cycle share the same wall time.
    #[test]
    fn multiple_stamps_compose() {
        let cb = Rc::new(RefCell::new(
            CallBackStream::<Traced<u64, TradeLatency>>::new(),
        ));
        cb.borrow_mut().push(ValueAt::new(
            Traced::new(1u64),
            crate::time::NanoTime::new(50),
        ));
        let stamped = cb
            .clone()
            .as_stream()
            .stamp::<trade_latency::ingest>()
            .stamp::<trade_latency::strategy>()
            .stamp::<trade_latency::publish>()
            .collect();
        stamped
            .run(
                crate::graph::RunMode::HistoricalFrom(crate::time::NanoTime::ZERO),
                crate::graph::RunFor::Forever,
            )
            .unwrap();
        let collected = stamped.peek_value();
        assert_eq!(collected.len(), 1);
        let l = collected[0].value.latency;
        assert!(l.ingest > 0);
        assert_eq!(l.strategy, l.ingest);
        assert_eq!(l.publish, l.ingest);
        assert_eq!(l.decode, 0);
    }

    #[cfg(feature = "iceoryx2")]
    mod type_name_propagation {
        use super::*;
        use iceoryx2::prelude::ZeroCopySend;

        #[repr(C)]
        #[derive(Debug, Clone, Copy, Default, ZeroCopySend)]
        #[type_name("test::Payload")]
        struct Payload {
            v: u64,
        }

        latency_stages! {
            #[type_name("test::PinnedLatency")]
            pub PinnedLatency {
                a,
                b,
            }
        }

        // Leaf `type_name` overrides must show up inside the composed `Traced` name.
        #[test]
        fn leaf_overrides_propagate_through_traced() {
            assert_eq!(unsafe { Payload::type_name() }, "test::Payload");
            assert_eq!(unsafe { PinnedLatency::type_name() }, "test::PinnedLatency");
            assert_eq!(
                unsafe { Traced::<Payload, PinnedLatency>::type_name() },
                "wingfoil::Traced<test::Payload, test::PinnedLatency>",
            );
        }
    }
}