Skip to main content

wingfoil/
latency.rs

1//! Latency measurement primitives.
2//!
3//! Stamp engine timestamps onto messages as they hop through nodes (and across
4//! processes), then aggregate the deltas at the end of the pipeline.
5//!
6//! # Concepts
7//!
8//! - [`Latency`] — a `#[repr(C)]` POD record of `u64` nanosecond timestamps,
9//!   one per pipeline stage. Generated by the [`latency_stages!`] macro.
10//! - [`Stage`] — a zero-sized type identifying a single stage at compile time.
11//!   The macro generates one per field in a snake_case sub-module.
12//! - [`Traced<T, L>`] — a `#[repr(C)]` wrapper pairing a payload `T` with its
13//!   latency record `L`. Drop-in for any node graph and safe to ship through
14//!   `iceoryx2` (when both `T` and `L` are `ZeroCopySend`).
15//! - [`HasLatency`] — trait letting generic instrumentation nodes (e.g.
16//!   [`StampStream`], `LatencyReport`) read/write the embedded record.
17//!
18//! # Time source
19//!
20//! Stamps always read the wall clock (never [`GraphState::time`], which is
21//! source-driven in historical mode and would be useless for latency).
22//! Two variants:
23//!
24//! - [`LatencyStreamOps::stamp`] reads [`GraphState::wall_time`] — a
25//!   cycle-start wall-clock snap, one `u64` load, cheap. Stages sharing an
26//!   engine cycle get the same stamp.
27//! - [`LatencyStreamOps::stamp_precise`] reads
28//!   [`GraphState::wall_time_precise`] — a fresh TSC read (~5-10 ns), giving
29//!   distinct stamps to stages that run in the same engine cycle.
30//!
31//! Both variants behave identically in realtime and historical mode — the
32//! graph wiring stays the same across environments and the numbers mean
33//! "wall-clock time spent between stages". In historical mode this
34//! measures backtest replay performance; in realtime it measures production
35//! latency.
36//!
37//! # Toggling
38//!
39//! Each stamping and reporting method has an `_if` variant that takes a
40//! boolean and returns the upstream unchanged when disabled — zero runtime
41//! cost, no node inserted into the graph. Thread a single config flag
42//! through your pipeline builder to disable stamping for ultra-hot paths or
43//! backtests.
44//!
45//! # Example
46//!
47//! ```ignore
48//! use wingfoil::*;
49//!
50//! latency_stages! {
51//!     pub TradeLatency {
52//!         ingest,
53//!         decode,
54//!         strategy,
55//!         publish,
56//!     }
57//! }
58//!
59//! // Markers live in a snake_case sub-module named after the struct.
60//! fn build(stream: std::rc::Rc<dyn Stream<Traced<u64, TradeLatency>>>)
61//!     -> std::rc::Rc<dyn Stream<Traced<u64, TradeLatency>>>
62//! {
63//!     stream
64//!         .stamp::<trade_latency::ingest>()
65//!         .stamp::<trade_latency::strategy>()
66//! }
67//! ```
68
69use std::marker::PhantomData;
70use std::rc::Rc;
71
72use crate::types::*;
73
74/// Declarative macro that generates a `#[repr(C)]` named-field latency record
75/// plus per-stage marker types. See the [module docs](self) for usage.
76pub use wingfoil_derive::latency_stages;
77
78// ---------------------------------------------------------------------------
79// Core traits
80// ---------------------------------------------------------------------------
81
82/// A fixed-size, named-field collection of `u64` nanosecond timestamps.
83///
84/// Implementors must be `#[repr(C)]` packed `u64` fields (or strictly
85/// equivalent), so that the in-memory layout matches `[u64; N]` and the
86/// generated `stamps`/`stamp_mut` slice views are sound.
87///
88/// Use the [`latency_stages!`] macro to generate an implementation; rolling
89/// your own is supported but you must uphold the layout invariant.
90pub trait Latency: Element + Copy {
91    /// Number of stages.
92    const N: usize;
93    /// Stage names, in stamp order.
94    fn stage_names() -> &'static [&'static str];
95    /// Borrow the raw `[u64; N]` view of the stamps.
96    fn stamps(&self) -> &[u64];
97    /// Borrow a single stamp mutably by index. Panics if `idx >= N`.
98    fn stamp_mut(&mut self, idx: usize) -> &mut u64;
99}
100
101/// A compile-time marker identifying one stage within a [`Latency`] record.
102///
103/// The [`latency_stages!`] macro emits one zero-sized `Stage` impl per field.
104pub trait Stage<L: Latency> {
105    /// Stage name (matches the field identifier).
106    const NAME: &'static str;
107    /// Index into `L::stamps()`.
108    const INDEX: usize;
109
110    /// Write `t` (nanos) into the stage's slot.
111    #[inline]
112    fn stamp(latency: &mut L, t: u64) {
113        *latency.stamp_mut(Self::INDEX) = t;
114    }
115}
116
117/// A payload that carries an embedded [`Latency`] record.
118///
119/// Implemented automatically for [`Traced<T, L>`]. Hand-roll if you embed a
120/// latency record as a sub-field of a richer payload.
121pub trait HasLatency {
122    type L: Latency;
123    fn latency(&self) -> &Self::L;
124    fn latency_mut(&mut self) -> &mut Self::L;
125}
126
127// ---------------------------------------------------------------------------
128// Traced wrapper
129// ---------------------------------------------------------------------------
130
131/// A payload `T` paired with a latency record `L`.
132///
133/// `#[repr(C)]` with `payload` first so that, for typical payloads (alignment
134/// ≤ 8) and `L: Latency` (all `u64` fields, alignment 8), no padding is
135/// inserted between the two.
136#[repr(C)]
137#[derive(
138    Clone, Copy, Debug, Default, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize,
139)]
140pub struct Traced<T, L> {
141    pub payload: T,
142    pub latency: L,
143}
144
145impl<T, L> Traced<T, L> {
146    /// Construct a `Traced<T, L>` from a payload, defaulting the latency record.
147    #[inline]
148    pub fn new(payload: T) -> Self
149    where
150        L: Default,
151    {
152        Self {
153            payload,
154            latency: L::default(),
155        }
156    }
157
158    /// Construct from explicit payload + latency.
159    #[inline]
160    pub fn with_latency(payload: T, latency: L) -> Self {
161        Self { payload, latency }
162    }
163}
164
165impl<T, L: Latency> HasLatency for Traced<T, L> {
166    type L = L;
167    #[inline]
168    fn latency(&self) -> &L {
169        &self.latency
170    }
171    #[inline]
172    fn latency_mut(&mut self) -> &mut L {
173        &mut self.latency
174    }
175}
176
177// SAFETY: `Traced<T, L>` is `#[repr(C)]` with two fields. When both `T` and
178// `L` are themselves `ZeroCopySend`, the composite is self-contained and has
179// a uniform memory representation, satisfying the trait's invariants.
180//
181// The default `ZeroCopySend::type_name()` returns `core::any::type_name::<Self>()`,
182// which embeds the absolute Rust paths of `T` and `L`. When the same struct is
183// declared via `#[path = "..."] mod ...;` from two different binary crates (a
184// common pattern for sharing an iceoryx2 payload between two example binaries),
185// the paths differ and iceoryx2 reports `IncompatibleTypes` even though the
186// memory layouts are identical. We compose the name from `T::type_name()` and
187// `L::type_name()` so leaf overrides via `#[type_name(...)]` propagate up.
#[cfg(feature = "iceoryx2")]
unsafe impl<T, L> iceoryx2::prelude::ZeroCopySend for Traced<T, L>
where
    T: iceoryx2::prelude::ZeroCopySend,
    L: iceoryx2::prelude::ZeroCopySend,
{
    /// Name built from the payload's and the latency record's own
    /// `type_name()` values (via `traced_type_name`) instead of from the Rust
    /// path of `Self`.
    unsafe fn type_name() -> &'static str {
        // NOTE(review): `type_name` is an `unsafe fn` on the trait; these two
        // calls only forward to the component types' implementations —
        // confirm against iceoryx2's documented safety contract.
        let payload_name = unsafe { T::type_name() };
        let latency_name = unsafe { L::type_name() };
        traced_type_name(payload_name, latency_name)
    }
}
198
199#[cfg(feature = "iceoryx2")]
/// Returns the `'static` name `wingfoil::Traced<{t}, {l}>` for the given
/// component type names.
///
/// Composed names are interned in a process-wide cache keyed by `(t, l)`:
/// the first request for a pair formats and leaks one string, and every
/// later request for the same pair returns that same string.
fn traced_type_name(t: &'static str, l: &'static str) -> &'static str {
    use std::collections::HashMap;
    use std::sync::{Mutex, OnceLock, PoisonError};

    static CACHE: OnceLock<Mutex<HashMap<(&'static str, &'static str), &'static str>>> =
        OnceLock::new();

    let mut names = CACHE
        .get_or_init(|| Mutex::new(HashMap::new()))
        .lock()
        // Poisoning only records that some thread panicked while holding the
        // lock. Every update here is a single `entry` insertion, so the map
        // is still consistent; keep serving names instead of panicking on
        // every later call.
        .unwrap_or_else(PoisonError::into_inner);

    // Single lookup: reuse the interned name or create and leak it once.
    let name = *names.entry((t, l)).or_insert_with(|| {
        let composed: &'static str =
            Box::leak(format!("wingfoil::Traced<{t}, {l}>").into_boxed_str());
        composed
    });
    name
}
214
215// ---------------------------------------------------------------------------
216// StampStream / StampPreciseStream — wrapper nodes that stamp one stage
217// ---------------------------------------------------------------------------
218
219/// A node that forwards its upstream value while stamping
220/// [`GraphState::wall_time`] (cycle-start wall-clock snap) into a single
221/// named stage of the embedded [`Latency`] record.
222///
223/// One `u64` store per tick, no allocation. Stages that tick in the same
224/// engine cycle share the same timestamp — use [`StampPreciseStream`] for
225/// intra-cycle resolution.
226pub struct StampStream<P, S>
227where
228    P: Element + HasLatency,
229    S: Stage<P::L> + 'static,
230{
231    upstream: Rc<dyn Stream<P>>,
232    value: P,
233    _stage: PhantomData<fn() -> S>,
234}
235
236impl<P, S> StampStream<P, S>
237where
238    P: Element + HasLatency,
239    S: Stage<P::L> + 'static,
240{
241    pub fn new(upstream: Rc<dyn Stream<P>>) -> Self {
242        Self {
243            upstream,
244            value: P::default(),
245            _stage: PhantomData,
246        }
247    }
248}
249
250#[node(active = [upstream], output = value: P)]
251impl<P, S> MutableNode for StampStream<P, S>
252where
253    P: Element + HasLatency,
254    S: Stage<P::L> + 'static,
255{
256    fn cycle(&mut self, state: &mut GraphState) -> anyhow::Result<bool> {
257        self.value = self.upstream.peek_value();
258        S::stamp(self.value.latency_mut(), state.wall_time().into());
259        Ok(true)
260    }
261}
262
263/// Like [`StampStream`] but reads [`GraphState::wall_time_precise`] — a fresh
264/// TSC snap on every tick. Costs ~5-10 ns per stamp on x86 but gives
265/// intra-cycle resolution, so stages running in the same engine cycle get
266/// distinct timestamps.
267pub struct StampPreciseStream<P, S>
268where
269    P: Element + HasLatency,
270    S: Stage<P::L> + 'static,
271{
272    upstream: Rc<dyn Stream<P>>,
273    value: P,
274    _stage: PhantomData<fn() -> S>,
275}
276
277impl<P, S> StampPreciseStream<P, S>
278where
279    P: Element + HasLatency,
280    S: Stage<P::L> + 'static,
281{
282    pub fn new(upstream: Rc<dyn Stream<P>>) -> Self {
283        Self {
284            upstream,
285            value: P::default(),
286            _stage: PhantomData,
287        }
288    }
289}
290
291#[node(active = [upstream], output = value: P)]
292impl<P, S> MutableNode for StampPreciseStream<P, S>
293where
294    P: Element + HasLatency,
295    S: Stage<P::L> + 'static,
296{
297    fn cycle(&mut self, state: &mut GraphState) -> anyhow::Result<bool> {
298        self.value = self.upstream.peek_value();
299        S::stamp(self.value.latency_mut(), state.wall_time_precise().into());
300        Ok(true)
301    }
302}
303
304// ---------------------------------------------------------------------------
305// Public ergonomics: .stamp::<Stage>() on Stream<P> where P: HasLatency
306// ---------------------------------------------------------------------------
307
308/// Extension trait adding `.stamp::<Stage>()` and friends to streams whose
309/// values carry a [`Latency`] record.
310pub trait LatencyStreamOps<P>
311where
312    P: Element + HasLatency,
313{
314    /// Wrap this stream in a [`StampStream`] for stage `S`. Each tick writes
315    /// [`GraphState::wall_time`] (cycle-start snap, one `u64` store) into the
316    /// stage's slot before forwarding.
317    #[must_use]
318    fn stamp<S>(self: &Rc<Self>) -> Rc<dyn Stream<P>>
319    where
320        S: Stage<P::L> + 'static;
321
322    /// Conditional variant of [`stamp`](Self::stamp). When `enabled` is false,
323    /// returns `self` unchanged — no node is inserted into the graph and
324    /// there is zero runtime cost. Useful for flipping stamping on/off at
325    /// graph-construction time via a config flag.
326    #[must_use]
327    fn stamp_if<S>(self: &Rc<Self>, enabled: bool) -> Rc<dyn Stream<P>>
328    where
329        S: Stage<P::L> + 'static;
330
331    /// Like [`stamp`](Self::stamp) but uses [`GraphState::wall_time_precise`]
332    /// (fresh TSC read, ~5-10ns) for intra-cycle resolution.
333    #[must_use]
334    fn stamp_precise<S>(self: &Rc<Self>) -> Rc<dyn Stream<P>>
335    where
336        S: Stage<P::L> + 'static;
337
338    /// Conditional variant of [`stamp_precise`](Self::stamp_precise).
339    #[must_use]
340    fn stamp_precise_if<S>(self: &Rc<Self>, enabled: bool) -> Rc<dyn Stream<P>>
341    where
342        S: Stage<P::L> + 'static;
343}
344
345impl<P> LatencyStreamOps<P> for dyn Stream<P>
346where
347    P: Element + HasLatency + 'static,
348{
349    fn stamp<S>(self: &Rc<Self>) -> Rc<dyn Stream<P>>
350    where
351        S: Stage<P::L> + 'static,
352    {
353        StampStream::<P, S>::new(self.clone()).into_stream()
354    }
355
356    fn stamp_if<S>(self: &Rc<Self>, enabled: bool) -> Rc<dyn Stream<P>>
357    where
358        S: Stage<P::L> + 'static,
359    {
360        if enabled {
361            self.stamp::<S>()
362        } else {
363            self.clone()
364        }
365    }
366
367    fn stamp_precise<S>(self: &Rc<Self>) -> Rc<dyn Stream<P>>
368    where
369        S: Stage<P::L> + 'static,
370    {
371        StampPreciseStream::<P, S>::new(self.clone()).into_stream()
372    }
373
374    fn stamp_precise_if<S>(self: &Rc<Self>, enabled: bool) -> Rc<dyn Stream<P>>
375    where
376        S: Stage<P::L> + 'static,
377    {
378        if enabled {
379            self.stamp_precise::<S>()
380        } else {
381            self.clone()
382        }
383    }
384}
385
386// ---------------------------------------------------------------------------
387// LatencyReport — sink that aggregates per-stage delta statistics
388// ---------------------------------------------------------------------------
389
/// Number of log2 histogram buckets: one per possible `ilog2` result of a
/// `u64` (0..=63).
const HISTOGRAM_BUCKETS: usize = 64;

/// Fixed-size, non-allocating per-stage statistics: count, total, min, max,
/// plus a log2-bucketed histogram for percentile estimation.
#[derive(Clone, Copy, Debug)]
pub struct StageStats {
    /// Number of deltas recorded so far.
    pub count: u64,
    /// Sum of all recorded deltas; saturates at `u64::MAX`.
    pub sum_ns: u64,
    /// Smallest recorded delta (`u64::MAX` while no sample was recorded).
    pub min_ns: u64,
    /// Largest recorded delta (`0` while no sample was recorded).
    pub max_ns: u64,
    /// `histogram[i]` counts deltas `d` with `ilog2(d + 1) == i`, i.e. `d` in
    /// `[2^i - 1, 2^(i+1) - 1)`. Index 0 therefore holds only `d == 0`, and
    /// index 63 holds everything from `2^63 - 1` up to `u64::MAX`.
    pub histogram: [u64; HISTOGRAM_BUCKETS],
}

impl Default for StageStats {
    /// Empty statistics: zero counts, `min_ns` primed with `u64::MAX` so the
    /// first recorded sample always replaces it.
    fn default() -> Self {
        Self {
            count: 0,
            sum_ns: 0,
            min_ns: u64::MAX,
            max_ns: 0,
            histogram: [0; HISTOGRAM_BUCKETS],
        }
    }
}

impl StageStats {
    /// Fold one delta (in nanoseconds) into the statistics.
    ///
    /// Accepts the full `u64` range, including `u64::MAX`.
    #[inline]
    pub fn record(&mut self, delta_ns: u64) {
        self.count += 1;
        self.sum_ns = self.sum_ns.saturating_add(delta_ns);
        self.min_ns = self.min_ns.min(delta_ns);
        self.max_ns = self.max_ns.max(delta_ns);
        // Log2 bucket: ilog2(delta_ns + 1). The `+ 1` keeps the argument
        // non-zero for `delta_ns == 0`; it saturates so that `u64::MAX`
        // lands in the last bucket instead of overflowing.
        let bucket =
            (delta_ns.saturating_add(1).ilog2() as usize).min(HISTOGRAM_BUCKETS - 1);
        self.histogram[bucket] += 1;
    }

    /// Mean delta in nanoseconds, or 0 if no samples recorded.
    ///
    /// Computed from the saturating `sum_ns`, so it under-reports once the
    /// sum has saturated.
    pub fn mean_ns(&self) -> u64 {
        self.sum_ns.checked_div(self.count).unwrap_or(0)
    }

    /// Estimate the value at quantile `q` from the histogram.
    ///
    /// `q` is expected in `[0.0, 1.0]`; values outside that range (and NaN)
    /// are treated as the nearest end of the range, with NaN treated as 0.
    ///
    /// Returns 0 if no samples have been recorded. Otherwise returns the
    /// power-of-two upper bound of the bucket containing the quantile,
    /// capped at `max_ns` so the estimate never exceeds the largest value
    /// actually observed.
    pub fn quantile_ns(&self, q: f64) -> u64 {
        if self.count == 0 {
            return 0;
        }
        // 1-based rank of the sample to locate. The float-to-int cast
        // saturates (NaN becomes 0), and the clamp keeps the rank on a real
        // sample so an empty leading bucket is never reported.
        let rank = (((self.count as f64) * q).ceil() as u64).clamp(1, self.count);
        let mut seen = 0u64;
        for (i, &n) in self.histogram.iter().enumerate() {
            seen += n;
            if seen >= rank {
                // Upper bound of bucket i is 2^(i+1); the last bucket has no
                // representable power-of-two bound, so use `u64::MAX`.
                let upper = if i + 1 < HISTOGRAM_BUCKETS {
                    1u64 << (i + 1)
                } else {
                    u64::MAX
                };
                return upper.min(self.max_ns);
            }
        }
        // Only reachable if the public fields were edited inconsistently.
        self.max_ns
    }
}
457
458/// Aggregated per-stage statistics for a [`Latency`] type.
459///
460/// Records the **delta from the previous stage** for stages 1..N. Stage 0 has
461/// no predecessor and is not aggregated (its absolute timestamp is observable
462/// directly on the message).
463pub struct LatencyStats<L: Latency> {
464    /// One slot per stage; `stages[0]` is unused (no predecessor).
465    pub stages: Vec<StageStats>,
466    _phantom: PhantomData<L>,
467}
468
469impl<L: Latency> Default for LatencyStats<L> {
470    fn default() -> Self {
471        Self {
472            stages: vec![StageStats::default(); L::N],
473            _phantom: PhantomData,
474        }
475    }
476}
477
478impl<L: Latency> LatencyStats<L> {
479    pub fn new() -> Self {
480        Self::default()
481    }
482
483    /// Record one observation. Computes deltas between adjacent stages and
484    /// updates each stage's stats. Stages whose stamp is zero (unset) or
485    /// whose predecessor is zero are skipped, so partial pipelines still
486    /// produce useful numbers for the stages that did record stamps.
487    pub fn observe(&mut self, latency: &L) {
488        let stamps = latency.stamps();
489        for i in 1..L::N {
490            let prev = stamps[i - 1];
491            let cur = stamps[i];
492            if prev == 0 || cur == 0 || cur < prev {
493                continue;
494            }
495            self.stages[i].record(cur - prev);
496        }
497    }
498
499    /// Render a multi-line summary suitable for printing on shutdown.
500    pub fn format_report(&self) -> String {
501        let names = L::stage_names();
502        let mut out = String::new();
503        out.push_str("latency report (delta from previous stage, nanoseconds):\n");
504        out.push_str(&format!(
505            "  {:<24} {:>10} {:>12} {:>12} {:>12} {:>12} {:>12}\n",
506            "stage", "count", "min", "mean", "p50", "p99", "max"
507        ));
508        for i in 1..L::N {
509            let s = &self.stages[i];
510            let label = format!("{} -> {}", names[i - 1], names[i]);
511            if s.count == 0 {
512                out.push_str(&format!("  {label:<24} {:>10}\n", "(no samples)"));
513                continue;
514            }
515            out.push_str(&format!(
516                "  {:<24} {:>10} {:>12} {:>12} {:>12} {:>12} {:>12}\n",
517                label,
518                s.count,
519                s.min_ns,
520                s.mean_ns(),
521                s.quantile_ns(0.5),
522                s.quantile_ns(0.99),
523                s.max_ns,
524            ));
525        }
526        out
527    }
528}
529
530/// A sink node that consumes a stream of `P: HasLatency` and accumulates
531/// per-stage delta statistics.
532///
533/// At graph teardown the statistics are available via [`LatencyReport::stats`]
534/// or printed via [`LatencyReport::print_on_drop`] (see constructor flag).
535pub struct LatencyReport<P>
536where
537    P: Element + HasLatency,
538{
539    upstream: Rc<dyn Stream<P>>,
540    stats: Rc<std::cell::RefCell<LatencyStats<P::L>>>,
541    print_on_teardown: bool,
542}
543
544impl<P> LatencyReport<P>
545where
546    P: Element + HasLatency,
547{
548    pub fn new(upstream: Rc<dyn Stream<P>>) -> Self {
549        Self {
550            upstream,
551            stats: Rc::new(std::cell::RefCell::new(LatencyStats::new())),
552            print_on_teardown: false,
553        }
554    }
555
556    /// Print the report to stdout when the graph stops.
557    pub fn print_on_teardown(mut self, yes: bool) -> Self {
558        self.print_on_teardown = yes;
559        self
560    }
561
562    /// Shared handle to the running stats — clone before installing in the
563    /// graph if you want to read them after the run.
564    pub fn stats(&self) -> Rc<std::cell::RefCell<LatencyStats<P::L>>> {
565        self.stats.clone()
566    }
567}
568
569#[node(active = [upstream])]
570impl<P> MutableNode for LatencyReport<P>
571where
572    P: Element + HasLatency,
573{
574    fn cycle(&mut self, _state: &mut GraphState) -> anyhow::Result<bool> {
575        let value = self.upstream.peek_value();
576        self.stats.borrow_mut().observe(value.latency());
577        Ok(true)
578    }
579
580    fn stop(&mut self, _state: &mut GraphState) -> anyhow::Result<()> {
581        if self.print_on_teardown {
582            print!("{}", self.stats.borrow().format_report());
583        }
584        Ok(())
585    }
586}
587
588/// Extension methods to install a [`LatencyReport`] sink. Returns the
589/// report's stats handle so the caller can inspect numbers after the graph
590/// exits.
591pub trait LatencyReportOps<P>
592where
593    P: Element + HasLatency,
594{
595    /// Install a [`LatencyReport`] sink on this stream.
596    /// `print_on_teardown` controls whether a summary is printed at shutdown.
597    /// Returns `(sink_node, stats_handle)`.
598    fn latency_report(
599        self: &Rc<Self>,
600        print_on_teardown: bool,
601    ) -> (Rc<dyn Node>, Rc<std::cell::RefCell<LatencyStats<P::L>>>);
602
603    /// Conditional variant. When `enabled` is false, returns
604    /// `(NeverNode, empty_stats)` — the sink never ticks and the stats handle
605    /// stays at zero counts. Lets a single config flag toggle aggregation on
606    /// or off without edits to the wiring.
607    fn latency_report_if(
608        self: &Rc<Self>,
609        enabled: bool,
610        print_on_teardown: bool,
611    ) -> (Rc<dyn Node>, Rc<std::cell::RefCell<LatencyStats<P::L>>>);
612}
613
614impl<P> LatencyReportOps<P> for dyn Stream<P>
615where
616    P: Element + HasLatency + 'static,
617{
618    fn latency_report(
619        self: &Rc<Self>,
620        print_on_teardown: bool,
621    ) -> (Rc<dyn Node>, Rc<std::cell::RefCell<LatencyStats<P::L>>>) {
622        let report = LatencyReport::new(self.clone()).print_on_teardown(print_on_teardown);
623        let stats = report.stats();
624        (report.into_node(), stats)
625    }
626
627    fn latency_report_if(
628        self: &Rc<Self>,
629        enabled: bool,
630        print_on_teardown: bool,
631    ) -> (Rc<dyn Node>, Rc<std::cell::RefCell<LatencyStats<P::L>>>) {
632        if enabled {
633            self.latency_report(print_on_teardown)
634        } else {
635            let stats = Rc::new(std::cell::RefCell::new(LatencyStats::<P::L>::new()));
636            // A no-op sink: use the upstream itself as a node reference so
637            // the graph stays valid; it ticks but does nothing observable
638            // with respect to `stats`.
639            (self.clone().as_node(), stats)
640        }
641    }
642}
643
644// ---------------------------------------------------------------------------
645// Tests
646// ---------------------------------------------------------------------------
647
648#[cfg(test)]
649mod tests {
650    use super::*;
651    use crate::nodes::{CallBackStream, NodeOperators, StreamOperators};
652    use crate::queue::ValueAt;
653    use std::cell::RefCell;
654    use std::mem::{align_of, offset_of, size_of};
655
656    latency_stages! {
657        pub TradeLatency {
658            ingest,
659            decode,
660            strategy,
661            publish,
662        }
663    }
664
665    // ── Layout checks ────────────────────────────────────────────────────────
666
667    #[test]
668    fn latency_struct_layout_is_packed_u64s() {
669        assert_eq!(size_of::<TradeLatency>(), 4 * size_of::<u64>());
670        assert_eq!(align_of::<TradeLatency>(), align_of::<u64>());
671        assert_eq!(offset_of!(TradeLatency, ingest), 0);
672        assert_eq!(offset_of!(TradeLatency, decode), 8);
673        assert_eq!(offset_of!(TradeLatency, strategy), 16);
674        assert_eq!(offset_of!(TradeLatency, publish), 24);
675    }
676
677    #[test]
678    fn latency_trait_reports_n_and_names() {
679        assert_eq!(TradeLatency::N, 4);
680        assert_eq!(
681            TradeLatency::stage_names(),
682            &["ingest", "decode", "strategy", "publish"]
683        );
684    }
685
686    #[test]
687    fn stamps_slice_view_matches_named_fields() {
688        let l = TradeLatency {
689            ingest: 1,
690            decode: 2,
691            strategy: 3,
692            publish: 4,
693        };
694        assert_eq!(l.stamps(), &[1u64, 2, 3, 4]);
695    }
696
697    #[test]
698    fn stamp_mut_writes_named_field() {
699        let mut l = TradeLatency::default();
700        *l.stamp_mut(2) = 99;
701        assert_eq!(l.strategy, 99);
702    }
703
704    #[test]
705    #[should_panic(expected = "stage index out of bounds")]
706    fn stamp_mut_panics_out_of_bounds() {
707        let mut l = TradeLatency::default();
708        *l.stamp_mut(4) = 0;
709    }
710
711    // ── Stage markers ────────────────────────────────────────────────────────
712
713    #[test]
714    fn stage_markers_have_correct_index_and_name() {
715        assert_eq!(<trade_latency::ingest as Stage<TradeLatency>>::INDEX, 0);
716        assert_eq!(<trade_latency::publish as Stage<TradeLatency>>::INDEX, 3);
717        assert_eq!(
718            <trade_latency::strategy as Stage<TradeLatency>>::NAME,
719            "strategy"
720        );
721    }
722
723    #[test]
724    fn stage_stamp_writes_correct_field() {
725        let mut l = TradeLatency::default();
726        <trade_latency::strategy as Stage<TradeLatency>>::stamp(&mut l, 1234);
727        assert_eq!(l.strategy, 1234);
728        assert_eq!(l.ingest, 0);
729        assert_eq!(l.decode, 0);
730        assert_eq!(l.publish, 0);
731    }
732
733    // ── Traced wrapper ───────────────────────────────────────────────────────
734
735    #[test]
736    fn traced_layout_payload_first_no_padding_for_aligned_payload() {
737        // u64 payload + u64-aligned latency: should be densely packed.
738        let s = size_of::<Traced<u64, TradeLatency>>();
739        assert_eq!(s, size_of::<u64>() + size_of::<TradeLatency>());
740        assert_eq!(offset_of!(Traced<u64, TradeLatency>, payload), 0);
741        assert_eq!(
742            offset_of!(Traced<u64, TradeLatency>, latency),
743            size_of::<u64>()
744        );
745    }
746
747    #[test]
748    fn has_latency_round_trip() {
749        let mut t: Traced<u64, TradeLatency> = Traced::new(7);
750        t.latency_mut().strategy = 42;
751        assert_eq!(t.latency().strategy, 42);
752        assert_eq!(t.payload, 7);
753    }
754
755    // ── StampStream node ─────────────────────────────────────────────────────
756
757    #[test]
758    fn stamp_stream_writes_wall_time_into_named_stage() {
759        // Stamps use wall-clock time (state.wall_time()), so in historical
760        // mode we assert monotonicity rather than exact values.
761        let cb = Rc::new(RefCell::new(
762            CallBackStream::<Traced<u64, TradeLatency>>::new(),
763        ));
764        cb.borrow_mut().push(ValueAt::new(
765            Traced::new(11u64),
766            crate::time::NanoTime::new(100),
767        ));
768        cb.borrow_mut().push(ValueAt::new(
769            Traced::new(22u64),
770            crate::time::NanoTime::new(250),
771        ));
772
773        let stamped = cb
774            .clone()
775            .as_stream()
776            .stamp::<trade_latency::strategy>()
777            .collect();
778
779        stamped
780            .run(
781                crate::graph::RunMode::HistoricalFrom(crate::time::NanoTime::ZERO),
782                crate::graph::RunFor::Forever,
783            )
784            .unwrap();
785
786        let collected = stamped.peek_value();
787        assert_eq!(collected.len(), 2);
788        assert_eq!(collected[0].value.payload, 11);
789        assert_eq!(collected[1].value.payload, 22);
790        // Unused stages remain zero.
791        assert_eq!(collected[0].value.latency.ingest, 0);
792        assert_eq!(collected[1].value.latency.ingest, 0);
793        // Stamps are real wall-clock times: both non-zero, second >= first.
794        assert!(collected[0].value.latency.strategy > 0);
795        assert!(collected[1].value.latency.strategy >= collected[0].value.latency.strategy);
796    }
797
798    #[test]
799    fn stamp_works_identically_in_historical_and_realtime() {
800        // "Same wiring, swap IO adapters" — the stamp wrappers use wall_time
801        // in both modes, so the pipeline produces non-zero, monotonic stamps
802        // regardless of RunMode.
803        use std::time::Duration;
804        fn run_one(mode: crate::graph::RunMode) -> crate::time::NanoTime {
805            let stream = crate::nodes::ticker(Duration::from_millis(1))
806                .count()
807                .map(|seq: u64| Traced::<u64, TradeLatency>::new(seq))
808                .stamp::<trade_latency::ingest>()
809                .stamp_precise::<trade_latency::publish>()
810                .collect();
811            stream.run(mode, crate::graph::RunFor::Cycles(3)).unwrap();
812            let values = stream.peek_value();
813            assert!(!values.is_empty());
814            let l = values[0].value.latency;
815            assert!(l.ingest > 0, "ingest stamp should be populated");
816            assert!(l.publish >= l.ingest, "publish >= ingest");
817            crate::time::NanoTime::new(l.ingest)
818        }
819        let historical = run_one(crate::graph::RunMode::HistoricalFrom(
820            crate::time::NanoTime::ZERO,
821        ));
822        let realtime = run_one(crate::graph::RunMode::RealTime);
823        // Both modes produce wall-clock stamps — they should be within the
824        // same order of magnitude (current nanoseconds-since-epoch range).
825        assert!(u64::from(historical) > 1_000_000_000);
826        assert!(u64::from(realtime) > 1_000_000_000);
827    }
828
829    #[test]
830    fn traced_serializes_via_serde_json() {
831        // Transport-agnostic: prove the derive lets us serialize a Traced
832        // payload through a generic serde layer (serde_json here; bincode,
833        // CBOR, postcard etc all follow the same contract).
834        let original = Traced::with_latency(
835            42u32,
836            TradeLatency {
837                ingest: 100,
838                decode: 200,
839                strategy: 300,
840                publish: 400,
841            },
842        );
843        let bytes = serde_json::to_vec(&original).unwrap();
844        let round: Traced<u32, TradeLatency> = serde_json::from_slice(&bytes).unwrap();
845        assert_eq!(round, original);
846    }
847
848    #[test]
849    fn stamp_if_disabled_inserts_no_node() {
850        // stamp_if(false) must return the upstream unchanged.
851        let cb = Rc::new(RefCell::new(
852            CallBackStream::<Traced<u64, TradeLatency>>::new(),
853        ));
854        let upstream = cb.clone().as_stream();
855        let stamped = upstream.stamp_if::<trade_latency::strategy>(false);
856        assert!(
857            Rc::ptr_eq(&upstream, &stamped),
858            "stamp_if(false) should be identity"
859        );
860    }
861
862    #[test]
863    fn stamp_precise_writes_fresh_timestamps() {
864        // Two stamp_precise wrappers in series should give different stamps
865        // even in the same engine cycle — that is the whole point of _precise.
866        let cb = Rc::new(RefCell::new(
867            CallBackStream::<Traced<u64, TradeLatency>>::new(),
868        ));
869        cb.borrow_mut().push(ValueAt::new(
870            Traced::new(1u64),
871            crate::time::NanoTime::new(100),
872        ));
873
874        let stamped = cb
875            .clone()
876            .as_stream()
877            .stamp_precise::<trade_latency::ingest>()
878            .stamp_precise::<trade_latency::publish>()
879            .collect();
880
881        stamped
882            .run(
883                crate::graph::RunMode::HistoricalFrom(crate::time::NanoTime::ZERO),
884                crate::graph::RunFor::Forever,
885            )
886            .unwrap();
887
888        let collected = stamped.peek_value();
889        assert_eq!(collected.len(), 1);
890        let l = collected[0].value.latency;
891        assert!(l.ingest > 0);
892        assert!(l.publish >= l.ingest);
893    }
894
895    // ── LatencyStats / LatencyReport ────────────────────────────────────────
896
897    #[test]
898    fn stage_stats_records_min_mean_max() {
899        let mut s = StageStats::default();
900        s.record(10);
901        s.record(20);
902        s.record(30);
903        assert_eq!(s.count, 3);
904        assert_eq!(s.min_ns, 10);
905        assert_eq!(s.max_ns, 30);
906        assert_eq!(s.mean_ns(), 20);
907    }
908
909    #[test]
910    fn stage_stats_quantile_zero_when_empty() {
911        let s = StageStats::default();
912        assert_eq!(s.quantile_ns(0.5), 0);
913        assert_eq!(s.mean_ns(), 0);
914    }
915
916    #[test]
917    fn latency_stats_observes_deltas_between_adjacent_stages() {
918        let mut stats = LatencyStats::<TradeLatency>::new();
919        // ingest=100, decode=150, strategy=200, publish=400 → deltas 50, 50, 200
920        stats.observe(&TradeLatency {
921            ingest: 100,
922            decode: 150,
923            strategy: 200,
924            publish: 400,
925        });
926        // ingest stage has no predecessor — index 0 is unused.
927        assert_eq!(stats.stages[0].count, 0);
928        assert_eq!(stats.stages[1].count, 1);
929        assert_eq!(stats.stages[1].sum_ns, 50);
930        assert_eq!(stats.stages[2].sum_ns, 50);
931        assert_eq!(stats.stages[3].sum_ns, 200);
932    }
933
934    #[test]
935    fn latency_stats_skips_partial_stamps() {
936        // A pipeline that only stamped ingest+strategy (decode and publish unset).
937        let mut stats = LatencyStats::<TradeLatency>::new();
938        stats.observe(&TradeLatency {
939            ingest: 100,
940            decode: 0,
941            strategy: 200,
942            publish: 0,
943        });
944        // Adjacent pairs with a zero stamp are skipped → all stage counts == 0.
945        for i in 1..TradeLatency::N {
946            assert_eq!(stats.stages[i].count, 0, "stage {i} should be skipped");
947        }
948    }
949
950    #[test]
951    fn latency_report_aggregates_across_ticks() {
952        let cb = Rc::new(RefCell::new(
953            CallBackStream::<Traced<u64, TradeLatency>>::new(),
954        ));
955        // Three messages, each with full ingest→publish stamps.
956        for (i, base) in [100u64, 200, 300].iter().enumerate() {
957            cb.borrow_mut().push(ValueAt::new(
958                Traced::with_latency(
959                    i as u64,
960                    TradeLatency {
961                        ingest: *base,
962                        decode: *base + 10,
963                        strategy: *base + 30,
964                        publish: *base + 60,
965                    },
966                ),
967                crate::time::NanoTime::new(*base),
968            ));
969        }
970
971        let stream = cb.clone().as_stream();
972        let (sink, stats) = stream.latency_report(false);
973
974        sink.run(
975            crate::graph::RunMode::HistoricalFrom(crate::time::NanoTime::ZERO),
976            crate::graph::RunFor::Forever,
977        )
978        .unwrap();
979
980        let s = stats.borrow();
981        // 3 messages, each contributes one delta per non-zero stage transition.
982        assert_eq!(s.stages[1].count, 3); // ingest → decode (10ns each)
983        assert_eq!(s.stages[1].mean_ns(), 10);
984        assert_eq!(s.stages[2].count, 3); // decode → strategy (20ns each)
985        assert_eq!(s.stages[2].mean_ns(), 20);
986        assert_eq!(s.stages[3].count, 3); // strategy → publish (30ns each)
987        assert_eq!(s.stages[3].mean_ns(), 30);
988    }
989
990    #[test]
991    fn format_report_renders_named_stages() {
992        let mut stats = LatencyStats::<TradeLatency>::new();
993        stats.observe(&TradeLatency {
994            ingest: 100,
995            decode: 200,
996            strategy: 400,
997            publish: 800,
998        });
999        let report = stats.format_report();
1000        assert!(report.contains("ingest -> decode"));
1001        assert!(report.contains("decode -> strategy"));
1002        assert!(report.contains("strategy -> publish"));
1003    }
1004
1005    #[test]
1006    fn multiple_stamps_compose() {
1007        let cb = Rc::new(RefCell::new(
1008            CallBackStream::<Traced<u64, TradeLatency>>::new(),
1009        ));
1010        cb.borrow_mut().push(ValueAt::new(
1011            Traced::new(1u64),
1012            crate::time::NanoTime::new(50),
1013        ));
1014
1015        let stamped = cb
1016            .clone()
1017            .as_stream()
1018            .stamp::<trade_latency::ingest>()
1019            .stamp::<trade_latency::strategy>()
1020            .stamp::<trade_latency::publish>()
1021            .collect();
1022
1023        stamped
1024            .run(
1025                crate::graph::RunMode::HistoricalFrom(crate::time::NanoTime::ZERO),
1026                crate::graph::RunFor::Forever,
1027            )
1028            .unwrap();
1029
1030        let collected = stamped.peek_value();
1031        assert_eq!(collected.len(), 1);
1032        let l = collected[0].value.latency;
1033        // All three cached-stamp wrappers run in the same engine cycle and
1034        // share the cycle-start wall_time snap.
1035        assert!(l.ingest > 0);
1036        assert_eq!(l.strategy, l.ingest);
1037        assert_eq!(l.publish, l.ingest);
1038        assert_eq!(l.decode, 0);
1039    }
1040
1041    // The Traced<T, L>::type_name() override propagates leaf overrides up so
1042    // that two binaries declaring the same payload via `#[path]`-included
1043    // modules can still match an iceoryx2 service. This test pins the
1044    // composed-name format and confirms the macro's `#[type_name(...)]`
1045    // attribute is honoured.
1046    #[cfg(feature = "iceoryx2")]
1047    mod type_name_propagation {
1048        use super::*;
1049        use iceoryx2::prelude::ZeroCopySend;
1050
1051        #[repr(C)]
1052        #[derive(Debug, Clone, Copy, Default, ZeroCopySend)]
1053        #[type_name("test::Payload")]
1054        struct Payload {
1055            v: u64,
1056        }
1057
1058        latency_stages! {
1059            #[type_name("test::PinnedLatency")]
1060            pub PinnedLatency {
1061                a,
1062                b,
1063            }
1064        }
1065
1066        #[test]
1067        fn leaf_overrides_propagate_through_traced() {
1068            assert_eq!(unsafe { Payload::type_name() }, "test::Payload");
1069            assert_eq!(unsafe { PinnedLatency::type_name() }, "test::PinnedLatency");
1070            assert_eq!(
1071                unsafe { Traced::<Payload, PinnedLatency>::type_name() },
1072                "wingfoil::Traced<test::Payload, test::PinnedLatency>",
1073            );
1074        }
1075    }
1076}