simple_profiler/
simple_profiler.rs

1use dpc_pariter::IteratorExt;
2
3use std::time;
4
5/// Custom [`readahead::Profiler`] implementation
6///
7/// This implementation will reports on `stderr` that
8/// a given side of a pipeline is blocked, waiting for
9/// the other side.
10///
11/// In a real code this might be logging using `log`,
12/// capturing metrics, etc.
13struct StderrMsgProfiler {
14    name: String,
15    start: time::Instant,
16    duration: time::Duration,
17}
18
19impl StderrMsgProfiler {
20    fn new(name: &str) -> Self {
21        Self {
22            start: time::Instant::now(),
23            duration: time::Duration::default(),
24            name: name.to_string(),
25        }
26    }
27}
28
29impl dpc_pariter::Profiler for StderrMsgProfiler {
30    fn start(&mut self) {
31        self.start = time::Instant::now();
32    }
33
34    fn end(&mut self) {
35        self.duration += time::Instant::now()
36            .duration_since(self.start)
37            // Even with absolutely no delay waiting for
38            // the other side of the channel a send/recv will take some time.
39            // Substract some tiny value to account for it, to prevent
40            // rare but spurious and confusing messages.
41            .saturating_sub(time::Duration::from_millis(1));
42
43        let min_duration_to_report = time::Duration::from_millis(100);
44        if min_duration_to_report <= self.duration {
45            eprintln!(
46                "{} was blocked waiting for {}ms",
47                self.name,
48                self.duration.as_millis()
49            );
50            self.duration -= min_duration_to_report;
51        }
52    }
53}
54
55fn main() {
56    dpc_pariter::scope(|scope| {
57        (0..22)
58            .map(|i| {
59                // make producting values slow
60                std::thread::sleep(time::Duration::from_millis(10));
61                i
62            })
63            .readahead_scoped_profiled(
64                scope,
65                dpc_pariter::TotalTimeProfiler::periodically_millis(2_000, || {
66                    eprintln!("Blocked on sending")
67                }),
68                dpc_pariter::TotalTimeProfiler::new(|stat| {
69                    eprintln!(
70                        "Sending receiving wait time: {}ms",
71                        stat.total().as_millis()
72                    )
73                }),
74            )
75            .for_each(|i| {
76                println!("{i}");
77            })
78    })
79    .expect("thread panicked");
80
81    (0..22)
82        .profile_egress(StderrMsgProfiler::new("sending"))
83        .readahead()
84        .profile_ingress(StderrMsgProfiler::new("receiving"))
85        .for_each(|i| {
86            println!("{i}");
87            // make consuming values slow
88            std::thread::sleep(time::Duration::from_millis(10));
89        });
90
91    dpc_pariter::scope(|scope| {
92        (0..22)
93            .map(|i| {
94                // make producting values slow
95                std::thread::sleep(time::Duration::from_millis(10));
96                i
97            })
98            .readahead_scoped_profiled(
99                scope,
100                StderrMsgProfiler::new("sending2"),
101                StderrMsgProfiler::new("receiving2"),
102            )
103            .for_each(|i| {
104                println!("{i}");
105            })
106    })
107    .expect("thread panicked");
108}