dpc-pariter 0.5.1

Parallel iterator processing
Documentation
use dpc_pariter::IteratorExt;

use std::time;

/// A hand-written [`dpc_pariter::Profiler`] used by this example.
///
/// Once one side of a pipeline has spent long enough blocked,
/// waiting for the other side, this profiler says so on `stderr`.
///
/// Real code would more likely emit a `log` record,
/// update a metric, etc.
struct StderrMsgProfiler {
    /// Instant at which the wait currently being measured began.
    start: time::Instant,
    /// Accumulated blocked time, compared against the reporting threshold.
    duration: time::Duration,
    /// Label printed in the message to identify the pipeline side.
    name: String,
}

impl StderrMsgProfiler {
    /// Creates a profiler labelled `name`.
    ///
    /// The start instant is initialised to "now" and the accumulated
    /// duration to zero; `name` is copied into an owned `String`.
    fn new(name: &str) -> Self {
        let start = time::Instant::now();
        let duration = time::Duration::ZERO;
        let name = name.to_owned();

        Self {
            name,
            start,
            duration,
        }
    }
}

impl dpc_pariter::Profiler for StderrMsgProfiler {
    /// Marks the beginning of a wait by recording the current instant.
    fn start(&mut self) {
        self.start = time::Instant::now();
    }

    /// Marks the end of a wait.
    ///
    /// The time elapsed since the last `start` (minus a small allowance)
    /// is added to the running total. Once the total reaches the reporting
    /// threshold it is printed to `stderr` and the total starts over from
    /// zero, so every message covers time not reported before.
    fn end(&mut self) {
        // Even with absolutely no delay waiting for the other side of the
        // channel, a send/recv will take some time. Subtract a tiny value
        // to account for it, to prevent rare but spurious and confusing
        // messages.
        const SEND_RECV_ALLOWANCE: time::Duration = time::Duration::from_millis(1);
        // Accumulated blocked time below this is not worth a message.
        const MIN_DURATION_TO_REPORT: time::Duration = time::Duration::from_millis(100);

        self.duration += self.start.elapsed().saturating_sub(SEND_RECV_ALLOWANCE);

        if MIN_DURATION_TO_REPORT <= self.duration {
            eprintln!(
                "{} was blocked waiting for {}ms",
                self.name,
                self.duration.as_millis()
            );
            // The whole accumulated time was just reported, so clear it.
            // Deducting only the threshold would carry the excess over and
            // report the same blocked time again on later calls.
            self.duration = time::Duration::ZERO;
        }
    }
}

fn main() {
    dpc_pariter::scope(|scope| {
        (0..22)
            .map(|i| {
                // make producting values slow
                std::thread::sleep(time::Duration::from_millis(10));
                i
            })
            .readahead_scoped_profiled(
                scope,
                dpc_pariter::TotalTimeProfiler::periodically_millis(2_000, || {
                    eprintln!("Blocked on sending")
                }),
                dpc_pariter::TotalTimeProfiler::new(|stat| {
                    eprintln!(
                        "Sending receiving wait time: {}ms",
                        stat.total().as_millis()
                    )
                }),
            )
            .for_each(|i| {
                println!("{i}");
            })
    })
    .expect("thread panicked");

    (0..22)
        .profile_egress(StderrMsgProfiler::new("sending"))
        .readahead()
        .profile_ingress(StderrMsgProfiler::new("receiving"))
        .for_each(|i| {
            println!("{i}");
            // make consuming values slow
            std::thread::sleep(time::Duration::from_millis(10));
        });

    dpc_pariter::scope(|scope| {
        (0..22)
            .map(|i| {
                // make producting values slow
                std::thread::sleep(time::Duration::from_millis(10));
                i
            })
            .readahead_scoped_profiled(
                scope,
                StderrMsgProfiler::new("sending2"),
                StderrMsgProfiler::new("receiving2"),
            )
            .for_each(|i| {
                println!("{i}");
            })
    })
    .expect("thread panicked");
}