1use std::marker::PhantomData;
70use std::rc::Rc;
71
72use crate::types::*;
73
74pub use wingfoil_derive::latency_stages;
77
78pub trait Latency: Element + Copy {
91 const N: usize;
93 fn stage_names() -> &'static [&'static str];
95 fn stamps(&self) -> &[u64];
97 fn stamp_mut(&mut self, idx: usize) -> &mut u64;
99}
100
101pub trait Stage<L: Latency> {
105 const NAME: &'static str;
107 const INDEX: usize;
109
110 #[inline]
112 fn stamp(latency: &mut L, t: u64) {
113 *latency.stamp_mut(Self::INDEX) = t;
114 }
115}
116
117pub trait HasLatency {
122 type L: Latency;
123 fn latency(&self) -> &Self::L;
124 fn latency_mut(&mut self) -> &mut Self::L;
125}
126
127#[repr(C)]
137#[derive(
138 Clone, Copy, Debug, Default, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize,
139)]
140pub struct Traced<T, L> {
141 pub payload: T,
142 pub latency: L,
143}
144
145impl<T, L> Traced<T, L> {
146 #[inline]
148 pub fn new(payload: T) -> Self
149 where
150 L: Default,
151 {
152 Self {
153 payload,
154 latency: L::default(),
155 }
156 }
157
158 #[inline]
160 pub fn with_latency(payload: T, latency: L) -> Self {
161 Self { payload, latency }
162 }
163}
164
165impl<T, L: Latency> HasLatency for Traced<T, L> {
166 type L = L;
167 #[inline]
168 fn latency(&self) -> &L {
169 &self.latency
170 }
171 #[inline]
172 fn latency_mut(&mut self) -> &mut L {
173 &mut self.latency
174 }
175}
176
177#[cfg(feature = "iceoryx2")]
189unsafe impl<T, L> iceoryx2::prelude::ZeroCopySend for Traced<T, L>
190where
191 T: iceoryx2::prelude::ZeroCopySend,
192 L: iceoryx2::prelude::ZeroCopySend,
193{
194 unsafe fn type_name() -> &'static str {
195 traced_type_name(unsafe { T::type_name() }, unsafe { L::type_name() })
196 }
197}
198
199#[cfg(feature = "iceoryx2")]
200fn traced_type_name(t: &'static str, l: &'static str) -> &'static str {
201 use std::collections::HashMap;
202 use std::sync::{Mutex, OnceLock};
203 static CACHE: OnceLock<Mutex<HashMap<(&'static str, &'static str), &'static str>>> =
204 OnceLock::new();
205 let cache = CACHE.get_or_init(|| Mutex::new(HashMap::new()));
206 let mut guard = cache.lock().unwrap();
207 if let Some(s) = guard.get(&(t, l)) {
208 return s;
209 }
210 let composed: &'static str = Box::leak(format!("wingfoil::Traced<{t}, {l}>").into_boxed_str());
211 guard.insert((t, l), composed);
212 composed
213}
214
215pub struct StampStream<P, S>
227where
228 P: Element + HasLatency,
229 S: Stage<P::L> + 'static,
230{
231 upstream: Rc<dyn Stream<P>>,
232 value: P,
233 _stage: PhantomData<fn() -> S>,
234}
235
236impl<P, S> StampStream<P, S>
237where
238 P: Element + HasLatency,
239 S: Stage<P::L> + 'static,
240{
241 pub fn new(upstream: Rc<dyn Stream<P>>) -> Self {
242 Self {
243 upstream,
244 value: P::default(),
245 _stage: PhantomData,
246 }
247 }
248}
249
250#[node(active = [upstream], output = value: P)]
251impl<P, S> MutableNode for StampStream<P, S>
252where
253 P: Element + HasLatency,
254 S: Stage<P::L> + 'static,
255{
256 fn cycle(&mut self, state: &mut GraphState) -> anyhow::Result<bool> {
257 self.value = self.upstream.peek_value();
258 S::stamp(self.value.latency_mut(), state.wall_time().into());
259 Ok(true)
260 }
261}
262
263pub struct StampPreciseStream<P, S>
268where
269 P: Element + HasLatency,
270 S: Stage<P::L> + 'static,
271{
272 upstream: Rc<dyn Stream<P>>,
273 value: P,
274 _stage: PhantomData<fn() -> S>,
275}
276
277impl<P, S> StampPreciseStream<P, S>
278where
279 P: Element + HasLatency,
280 S: Stage<P::L> + 'static,
281{
282 pub fn new(upstream: Rc<dyn Stream<P>>) -> Self {
283 Self {
284 upstream,
285 value: P::default(),
286 _stage: PhantomData,
287 }
288 }
289}
290
291#[node(active = [upstream], output = value: P)]
292impl<P, S> MutableNode for StampPreciseStream<P, S>
293where
294 P: Element + HasLatency,
295 S: Stage<P::L> + 'static,
296{
297 fn cycle(&mut self, state: &mut GraphState) -> anyhow::Result<bool> {
298 self.value = self.upstream.peek_value();
299 S::stamp(self.value.latency_mut(), state.wall_time_precise().into());
300 Ok(true)
301 }
302}
303
304pub trait LatencyStreamOps<P>
311where
312 P: Element + HasLatency,
313{
314 #[must_use]
318 fn stamp<S>(self: &Rc<Self>) -> Rc<dyn Stream<P>>
319 where
320 S: Stage<P::L> + 'static;
321
322 #[must_use]
327 fn stamp_if<S>(self: &Rc<Self>, enabled: bool) -> Rc<dyn Stream<P>>
328 where
329 S: Stage<P::L> + 'static;
330
331 #[must_use]
334 fn stamp_precise<S>(self: &Rc<Self>) -> Rc<dyn Stream<P>>
335 where
336 S: Stage<P::L> + 'static;
337
338 #[must_use]
340 fn stamp_precise_if<S>(self: &Rc<Self>, enabled: bool) -> Rc<dyn Stream<P>>
341 where
342 S: Stage<P::L> + 'static;
343}
344
345impl<P> LatencyStreamOps<P> for dyn Stream<P>
346where
347 P: Element + HasLatency + 'static,
348{
349 fn stamp<S>(self: &Rc<Self>) -> Rc<dyn Stream<P>>
350 where
351 S: Stage<P::L> + 'static,
352 {
353 StampStream::<P, S>::new(self.clone()).into_stream()
354 }
355
356 fn stamp_if<S>(self: &Rc<Self>, enabled: bool) -> Rc<dyn Stream<P>>
357 where
358 S: Stage<P::L> + 'static,
359 {
360 if enabled {
361 self.stamp::<S>()
362 } else {
363 self.clone()
364 }
365 }
366
367 fn stamp_precise<S>(self: &Rc<Self>) -> Rc<dyn Stream<P>>
368 where
369 S: Stage<P::L> + 'static,
370 {
371 StampPreciseStream::<P, S>::new(self.clone()).into_stream()
372 }
373
374 fn stamp_precise_if<S>(self: &Rc<Self>, enabled: bool) -> Rc<dyn Stream<P>>
375 where
376 S: Stage<P::L> + 'static,
377 {
378 if enabled {
379 self.stamp_precise::<S>()
380 } else {
381 self.clone()
382 }
383 }
384}
385
386const HISTOGRAM_BUCKETS: usize = 64;
391
392#[derive(Clone, Copy, Debug)]
395pub struct StageStats {
396 pub count: u64,
397 pub sum_ns: u64,
398 pub min_ns: u64,
399 pub max_ns: u64,
400 pub histogram: [u64; HISTOGRAM_BUCKETS],
403}
404
405impl Default for StageStats {
406 fn default() -> Self {
407 Self {
408 count: 0,
409 sum_ns: 0,
410 min_ns: u64::MAX,
411 max_ns: 0,
412 histogram: [0; HISTOGRAM_BUCKETS],
413 }
414 }
415}
416
417impl StageStats {
418 #[inline]
419 pub fn record(&mut self, delta_ns: u64) {
420 self.count += 1;
421 self.sum_ns = self.sum_ns.saturating_add(delta_ns);
422 if delta_ns < self.min_ns {
423 self.min_ns = delta_ns;
424 }
425 if delta_ns > self.max_ns {
426 self.max_ns = delta_ns;
427 }
428 let bucket = ((delta_ns + 1).ilog2() as usize).min(HISTOGRAM_BUCKETS - 1);
430 self.histogram[bucket] += 1;
431 }
432
433 pub fn mean_ns(&self) -> u64 {
435 self.sum_ns.checked_div(self.count).unwrap_or(0)
436 }
437
438 pub fn quantile_ns(&self, q: f64) -> u64 {
442 if self.count == 0 {
443 return 0;
444 }
445 let target = ((self.count as f64) * q).ceil() as u64;
446 let mut cum = 0u64;
447 for (i, &n) in self.histogram.iter().enumerate() {
448 cum += n;
449 if cum >= target {
450 return 1u64 << (i + 1).min(63);
452 }
453 }
454 self.max_ns
455 }
456}
457
458pub struct LatencyStats<L: Latency> {
464 pub stages: Vec<StageStats>,
466 _phantom: PhantomData<L>,
467}
468
469impl<L: Latency> Default for LatencyStats<L> {
470 fn default() -> Self {
471 Self {
472 stages: vec![StageStats::default(); L::N],
473 _phantom: PhantomData,
474 }
475 }
476}
477
478impl<L: Latency> LatencyStats<L> {
479 pub fn new() -> Self {
480 Self::default()
481 }
482
483 pub fn observe(&mut self, latency: &L) {
488 let stamps = latency.stamps();
489 for i in 1..L::N {
490 let prev = stamps[i - 1];
491 let cur = stamps[i];
492 if prev == 0 || cur == 0 || cur < prev {
493 continue;
494 }
495 self.stages[i].record(cur - prev);
496 }
497 }
498
499 pub fn format_report(&self) -> String {
501 let names = L::stage_names();
502 let mut out = String::new();
503 out.push_str("latency report (delta from previous stage, nanoseconds):\n");
504 out.push_str(&format!(
505 " {:<24} {:>10} {:>12} {:>12} {:>12} {:>12} {:>12}\n",
506 "stage", "count", "min", "mean", "p50", "p99", "max"
507 ));
508 for i in 1..L::N {
509 let s = &self.stages[i];
510 let label = format!("{} -> {}", names[i - 1], names[i]);
511 if s.count == 0 {
512 out.push_str(&format!(" {label:<24} {:>10}\n", "(no samples)"));
513 continue;
514 }
515 out.push_str(&format!(
516 " {:<24} {:>10} {:>12} {:>12} {:>12} {:>12} {:>12}\n",
517 label,
518 s.count,
519 s.min_ns,
520 s.mean_ns(),
521 s.quantile_ns(0.5),
522 s.quantile_ns(0.99),
523 s.max_ns,
524 ));
525 }
526 out
527 }
528}
529
530pub struct LatencyReport<P>
536where
537 P: Element + HasLatency,
538{
539 upstream: Rc<dyn Stream<P>>,
540 stats: Rc<std::cell::RefCell<LatencyStats<P::L>>>,
541 print_on_teardown: bool,
542}
543
544impl<P> LatencyReport<P>
545where
546 P: Element + HasLatency,
547{
548 pub fn new(upstream: Rc<dyn Stream<P>>) -> Self {
549 Self {
550 upstream,
551 stats: Rc::new(std::cell::RefCell::new(LatencyStats::new())),
552 print_on_teardown: false,
553 }
554 }
555
556 pub fn print_on_teardown(mut self, yes: bool) -> Self {
558 self.print_on_teardown = yes;
559 self
560 }
561
562 pub fn stats(&self) -> Rc<std::cell::RefCell<LatencyStats<P::L>>> {
565 self.stats.clone()
566 }
567}
568
569#[node(active = [upstream])]
570impl<P> MutableNode for LatencyReport<P>
571where
572 P: Element + HasLatency,
573{
574 fn cycle(&mut self, _state: &mut GraphState) -> anyhow::Result<bool> {
575 let value = self.upstream.peek_value();
576 self.stats.borrow_mut().observe(value.latency());
577 Ok(true)
578 }
579
580 fn stop(&mut self, _state: &mut GraphState) -> anyhow::Result<()> {
581 if self.print_on_teardown {
582 print!("{}", self.stats.borrow().format_report());
583 }
584 Ok(())
585 }
586}
587
588pub trait LatencyReportOps<P>
592where
593 P: Element + HasLatency,
594{
595 fn latency_report(
599 self: &Rc<Self>,
600 print_on_teardown: bool,
601 ) -> (Rc<dyn Node>, Rc<std::cell::RefCell<LatencyStats<P::L>>>);
602
603 fn latency_report_if(
608 self: &Rc<Self>,
609 enabled: bool,
610 print_on_teardown: bool,
611 ) -> (Rc<dyn Node>, Rc<std::cell::RefCell<LatencyStats<P::L>>>);
612}
613
614impl<P> LatencyReportOps<P> for dyn Stream<P>
615where
616 P: Element + HasLatency + 'static,
617{
618 fn latency_report(
619 self: &Rc<Self>,
620 print_on_teardown: bool,
621 ) -> (Rc<dyn Node>, Rc<std::cell::RefCell<LatencyStats<P::L>>>) {
622 let report = LatencyReport::new(self.clone()).print_on_teardown(print_on_teardown);
623 let stats = report.stats();
624 (report.into_node(), stats)
625 }
626
627 fn latency_report_if(
628 self: &Rc<Self>,
629 enabled: bool,
630 print_on_teardown: bool,
631 ) -> (Rc<dyn Node>, Rc<std::cell::RefCell<LatencyStats<P::L>>>) {
632 if enabled {
633 self.latency_report(print_on_teardown)
634 } else {
635 let stats = Rc::new(std::cell::RefCell::new(LatencyStats::<P::L>::new()));
636 (self.clone().as_node(), stats)
640 }
641 }
642}
643
644#[cfg(test)]
649mod tests {
650 use super::*;
651 use crate::nodes::{CallBackStream, NodeOperators, StreamOperators};
652 use crate::queue::ValueAt;
653 use std::cell::RefCell;
654 use std::mem::{align_of, offset_of, size_of};
655
656 latency_stages! {
657 pub TradeLatency {
658 ingest,
659 decode,
660 strategy,
661 publish,
662 }
663 }
664
665 #[test]
668 fn latency_struct_layout_is_packed_u64s() {
669 assert_eq!(size_of::<TradeLatency>(), 4 * size_of::<u64>());
670 assert_eq!(align_of::<TradeLatency>(), align_of::<u64>());
671 assert_eq!(offset_of!(TradeLatency, ingest), 0);
672 assert_eq!(offset_of!(TradeLatency, decode), 8);
673 assert_eq!(offset_of!(TradeLatency, strategy), 16);
674 assert_eq!(offset_of!(TradeLatency, publish), 24);
675 }
676
677 #[test]
678 fn latency_trait_reports_n_and_names() {
679 assert_eq!(TradeLatency::N, 4);
680 assert_eq!(
681 TradeLatency::stage_names(),
682 &["ingest", "decode", "strategy", "publish"]
683 );
684 }
685
686 #[test]
687 fn stamps_slice_view_matches_named_fields() {
688 let l = TradeLatency {
689 ingest: 1,
690 decode: 2,
691 strategy: 3,
692 publish: 4,
693 };
694 assert_eq!(l.stamps(), &[1u64, 2, 3, 4]);
695 }
696
697 #[test]
698 fn stamp_mut_writes_named_field() {
699 let mut l = TradeLatency::default();
700 *l.stamp_mut(2) = 99;
701 assert_eq!(l.strategy, 99);
702 }
703
704 #[test]
705 #[should_panic(expected = "stage index out of bounds")]
706 fn stamp_mut_panics_out_of_bounds() {
707 let mut l = TradeLatency::default();
708 *l.stamp_mut(4) = 0;
709 }
710
711 #[test]
714 fn stage_markers_have_correct_index_and_name() {
715 assert_eq!(<trade_latency::ingest as Stage<TradeLatency>>::INDEX, 0);
716 assert_eq!(<trade_latency::publish as Stage<TradeLatency>>::INDEX, 3);
717 assert_eq!(
718 <trade_latency::strategy as Stage<TradeLatency>>::NAME,
719 "strategy"
720 );
721 }
722
723 #[test]
724 fn stage_stamp_writes_correct_field() {
725 let mut l = TradeLatency::default();
726 <trade_latency::strategy as Stage<TradeLatency>>::stamp(&mut l, 1234);
727 assert_eq!(l.strategy, 1234);
728 assert_eq!(l.ingest, 0);
729 assert_eq!(l.decode, 0);
730 assert_eq!(l.publish, 0);
731 }
732
733 #[test]
736 fn traced_layout_payload_first_no_padding_for_aligned_payload() {
737 let s = size_of::<Traced<u64, TradeLatency>>();
739 assert_eq!(s, size_of::<u64>() + size_of::<TradeLatency>());
740 assert_eq!(offset_of!(Traced<u64, TradeLatency>, payload), 0);
741 assert_eq!(
742 offset_of!(Traced<u64, TradeLatency>, latency),
743 size_of::<u64>()
744 );
745 }
746
747 #[test]
748 fn has_latency_round_trip() {
749 let mut t: Traced<u64, TradeLatency> = Traced::new(7);
750 t.latency_mut().strategy = 42;
751 assert_eq!(t.latency().strategy, 42);
752 assert_eq!(t.payload, 7);
753 }
754
755 #[test]
758 fn stamp_stream_writes_wall_time_into_named_stage() {
759 let cb = Rc::new(RefCell::new(
762 CallBackStream::<Traced<u64, TradeLatency>>::new(),
763 ));
764 cb.borrow_mut().push(ValueAt::new(
765 Traced::new(11u64),
766 crate::time::NanoTime::new(100),
767 ));
768 cb.borrow_mut().push(ValueAt::new(
769 Traced::new(22u64),
770 crate::time::NanoTime::new(250),
771 ));
772
773 let stamped = cb
774 .clone()
775 .as_stream()
776 .stamp::<trade_latency::strategy>()
777 .collect();
778
779 stamped
780 .run(
781 crate::graph::RunMode::HistoricalFrom(crate::time::NanoTime::ZERO),
782 crate::graph::RunFor::Forever,
783 )
784 .unwrap();
785
786 let collected = stamped.peek_value();
787 assert_eq!(collected.len(), 2);
788 assert_eq!(collected[0].value.payload, 11);
789 assert_eq!(collected[1].value.payload, 22);
790 assert_eq!(collected[0].value.latency.ingest, 0);
792 assert_eq!(collected[1].value.latency.ingest, 0);
793 assert!(collected[0].value.latency.strategy > 0);
795 assert!(collected[1].value.latency.strategy >= collected[0].value.latency.strategy);
796 }
797
798 #[test]
799 fn stamp_works_identically_in_historical_and_realtime() {
800 use std::time::Duration;
804 fn run_one(mode: crate::graph::RunMode) -> crate::time::NanoTime {
805 let stream = crate::nodes::ticker(Duration::from_millis(1))
806 .count()
807 .map(|seq: u64| Traced::<u64, TradeLatency>::new(seq))
808 .stamp::<trade_latency::ingest>()
809 .stamp_precise::<trade_latency::publish>()
810 .collect();
811 stream.run(mode, crate::graph::RunFor::Cycles(3)).unwrap();
812 let values = stream.peek_value();
813 assert!(!values.is_empty());
814 let l = values[0].value.latency;
815 assert!(l.ingest > 0, "ingest stamp should be populated");
816 assert!(l.publish >= l.ingest, "publish >= ingest");
817 crate::time::NanoTime::new(l.ingest)
818 }
819 let historical = run_one(crate::graph::RunMode::HistoricalFrom(
820 crate::time::NanoTime::ZERO,
821 ));
822 let realtime = run_one(crate::graph::RunMode::RealTime);
823 assert!(u64::from(historical) > 1_000_000_000);
826 assert!(u64::from(realtime) > 1_000_000_000);
827 }
828
829 #[test]
830 fn traced_serializes_via_serde_json() {
831 let original = Traced::with_latency(
835 42u32,
836 TradeLatency {
837 ingest: 100,
838 decode: 200,
839 strategy: 300,
840 publish: 400,
841 },
842 );
843 let bytes = serde_json::to_vec(&original).unwrap();
844 let round: Traced<u32, TradeLatency> = serde_json::from_slice(&bytes).unwrap();
845 assert_eq!(round, original);
846 }
847
848 #[test]
849 fn stamp_if_disabled_inserts_no_node() {
850 let cb = Rc::new(RefCell::new(
852 CallBackStream::<Traced<u64, TradeLatency>>::new(),
853 ));
854 let upstream = cb.clone().as_stream();
855 let stamped = upstream.stamp_if::<trade_latency::strategy>(false);
856 assert!(
857 Rc::ptr_eq(&upstream, &stamped),
858 "stamp_if(false) should be identity"
859 );
860 }
861
862 #[test]
863 fn stamp_precise_writes_fresh_timestamps() {
864 let cb = Rc::new(RefCell::new(
867 CallBackStream::<Traced<u64, TradeLatency>>::new(),
868 ));
869 cb.borrow_mut().push(ValueAt::new(
870 Traced::new(1u64),
871 crate::time::NanoTime::new(100),
872 ));
873
874 let stamped = cb
875 .clone()
876 .as_stream()
877 .stamp_precise::<trade_latency::ingest>()
878 .stamp_precise::<trade_latency::publish>()
879 .collect();
880
881 stamped
882 .run(
883 crate::graph::RunMode::HistoricalFrom(crate::time::NanoTime::ZERO),
884 crate::graph::RunFor::Forever,
885 )
886 .unwrap();
887
888 let collected = stamped.peek_value();
889 assert_eq!(collected.len(), 1);
890 let l = collected[0].value.latency;
891 assert!(l.ingest > 0);
892 assert!(l.publish >= l.ingest);
893 }
894
895 #[test]
898 fn stage_stats_records_min_mean_max() {
899 let mut s = StageStats::default();
900 s.record(10);
901 s.record(20);
902 s.record(30);
903 assert_eq!(s.count, 3);
904 assert_eq!(s.min_ns, 10);
905 assert_eq!(s.max_ns, 30);
906 assert_eq!(s.mean_ns(), 20);
907 }
908
909 #[test]
910 fn stage_stats_quantile_zero_when_empty() {
911 let s = StageStats::default();
912 assert_eq!(s.quantile_ns(0.5), 0);
913 assert_eq!(s.mean_ns(), 0);
914 }
915
916 #[test]
917 fn latency_stats_observes_deltas_between_adjacent_stages() {
918 let mut stats = LatencyStats::<TradeLatency>::new();
919 stats.observe(&TradeLatency {
921 ingest: 100,
922 decode: 150,
923 strategy: 200,
924 publish: 400,
925 });
926 assert_eq!(stats.stages[0].count, 0);
928 assert_eq!(stats.stages[1].count, 1);
929 assert_eq!(stats.stages[1].sum_ns, 50);
930 assert_eq!(stats.stages[2].sum_ns, 50);
931 assert_eq!(stats.stages[3].sum_ns, 200);
932 }
933
934 #[test]
935 fn latency_stats_skips_partial_stamps() {
936 let mut stats = LatencyStats::<TradeLatency>::new();
938 stats.observe(&TradeLatency {
939 ingest: 100,
940 decode: 0,
941 strategy: 200,
942 publish: 0,
943 });
944 for i in 1..TradeLatency::N {
946 assert_eq!(stats.stages[i].count, 0, "stage {i} should be skipped");
947 }
948 }
949
950 #[test]
951 fn latency_report_aggregates_across_ticks() {
952 let cb = Rc::new(RefCell::new(
953 CallBackStream::<Traced<u64, TradeLatency>>::new(),
954 ));
955 for (i, base) in [100u64, 200, 300].iter().enumerate() {
957 cb.borrow_mut().push(ValueAt::new(
958 Traced::with_latency(
959 i as u64,
960 TradeLatency {
961 ingest: *base,
962 decode: *base + 10,
963 strategy: *base + 30,
964 publish: *base + 60,
965 },
966 ),
967 crate::time::NanoTime::new(*base),
968 ));
969 }
970
971 let stream = cb.clone().as_stream();
972 let (sink, stats) = stream.latency_report(false);
973
974 sink.run(
975 crate::graph::RunMode::HistoricalFrom(crate::time::NanoTime::ZERO),
976 crate::graph::RunFor::Forever,
977 )
978 .unwrap();
979
980 let s = stats.borrow();
981 assert_eq!(s.stages[1].count, 3); assert_eq!(s.stages[1].mean_ns(), 10);
984 assert_eq!(s.stages[2].count, 3); assert_eq!(s.stages[2].mean_ns(), 20);
986 assert_eq!(s.stages[3].count, 3); assert_eq!(s.stages[3].mean_ns(), 30);
988 }
989
990 #[test]
991 fn format_report_renders_named_stages() {
992 let mut stats = LatencyStats::<TradeLatency>::new();
993 stats.observe(&TradeLatency {
994 ingest: 100,
995 decode: 200,
996 strategy: 400,
997 publish: 800,
998 });
999 let report = stats.format_report();
1000 assert!(report.contains("ingest -> decode"));
1001 assert!(report.contains("decode -> strategy"));
1002 assert!(report.contains("strategy -> publish"));
1003 }
1004
1005 #[test]
1006 fn multiple_stamps_compose() {
1007 let cb = Rc::new(RefCell::new(
1008 CallBackStream::<Traced<u64, TradeLatency>>::new(),
1009 ));
1010 cb.borrow_mut().push(ValueAt::new(
1011 Traced::new(1u64),
1012 crate::time::NanoTime::new(50),
1013 ));
1014
1015 let stamped = cb
1016 .clone()
1017 .as_stream()
1018 .stamp::<trade_latency::ingest>()
1019 .stamp::<trade_latency::strategy>()
1020 .stamp::<trade_latency::publish>()
1021 .collect();
1022
1023 stamped
1024 .run(
1025 crate::graph::RunMode::HistoricalFrom(crate::time::NanoTime::ZERO),
1026 crate::graph::RunFor::Forever,
1027 )
1028 .unwrap();
1029
1030 let collected = stamped.peek_value();
1031 assert_eq!(collected.len(), 1);
1032 let l = collected[0].value.latency;
1033 assert!(l.ingest > 0);
1036 assert_eq!(l.strategy, l.ingest);
1037 assert_eq!(l.publish, l.ingest);
1038 assert_eq!(l.decode, 0);
1039 }
1040
1041 #[cfg(feature = "iceoryx2")]
1047 mod type_name_propagation {
1048 use super::*;
1049 use iceoryx2::prelude::ZeroCopySend;
1050
1051 #[repr(C)]
1052 #[derive(Debug, Clone, Copy, Default, ZeroCopySend)]
1053 #[type_name("test::Payload")]
1054 struct Payload {
1055 v: u64,
1056 }
1057
1058 latency_stages! {
1059 #[type_name("test::PinnedLatency")]
1060 pub PinnedLatency {
1061 a,
1062 b,
1063 }
1064 }
1065
1066 #[test]
1067 fn leaf_overrides_propagate_through_traced() {
1068 assert_eq!(unsafe { Payload::type_name() }, "test::Payload");
1069 assert_eq!(unsafe { PinnedLatency::type_name() }, "test::PinnedLatency");
1070 assert_eq!(
1071 unsafe { Traced::<Payload, PinnedLatency>::type_name() },
1072 "wingfoil::Traced<test::Payload, test::PinnedLatency>",
1073 );
1074 }
1075 }
1076}