ddshow 0.2.2

Timely and Differential dataflow log analysis and vizualization
use differential_dataflow::{difference::Semigroup, AsCollection, Collection, Data};
use std::{fmt::Debug, panic::Location};
use timely::dataflow::{
    channels::pact::Pipeline,
    operators::{Inspect, Operator},
    Scope, Stream,
};
use tracing::{level_filters::STATIC_MAX_LEVEL, metadata::LevelFilter};

pub trait InspectExt {
    type Value;

    fn debug_inspect<F>(&self, inspect: F) -> Self
    where
        F: FnMut(&Self::Value) + 'static;

    #[track_caller]
    fn debug(&self) -> Self
    where
        Self: Sized,
        Self::Value: Debug,
    {
        let location = Location::caller();
        self.debug_inspect(move |value| {
            println!(
                "[{}:{}:{}]: {:?}",
                location.file(),
                location.line(),
                location.column(),
                value,
            );
        })
    }

    #[track_caller]
    fn debug_with(&self, name: &str) -> Self
    where
        Self: Sized,
        Self::Value: Debug,
    {
        let name = name.to_owned();
        self.debug_inspect(move |value| {
            println!("[{}]: {:?}", name, value);
        })
    }

    fn debug_frontier(&self) -> Self;

    fn debug_frontier_with(&self, with: &str) -> Self;
}

impl<S, D, R> InspectExt for Collection<S, D, R>
where
    S: Scope,
    D: Data,
    R: Semigroup,
{
    type Value = (D, S::Timestamp, R);

    #[track_caller]
    fn debug_inspect<F>(&self, inspect: F) -> Self
    where
        F: FnMut(&Self::Value) + 'static,
    {
        if cfg!(debug_assertions) {
            self.inspect(inspect)
        } else {
            self.clone()
        }
    }

    #[track_caller]
    fn debug_frontier(&self) -> Self {
        self.inner.debug_frontier().as_collection()
    }

    #[track_caller]
    fn debug_frontier_with(&self, with: &str) -> Self {
        self.inner.debug_frontier_with(with).as_collection()
    }
}

impl<S, D> InspectExt for Stream<S, D>
where
    S: Scope,
    D: Data,
{
    type Value = D;

    #[track_caller]
    fn debug_inspect<F>(&self, inspect: F) -> Self
    where
        F: FnMut(&Self::Value) + 'static,
    {
        if cfg!(debug_assertions) {
            self.inspect(inspect)
        } else {
            self.clone()
        }
    }

    #[track_caller]
    fn debug_frontier(&self) -> Self {
        if STATIC_MAX_LEVEL >= LevelFilter::TRACE {
            let worker = self.scope().index();
            let caller = Location::caller();
            let mut buffer = Vec::new();

            self.unary_frontier(Pipeline, &located!("InspectFrontier"), move |_, _| {
                move |input, output| {
                    input.for_each(|time, data| {
                        data.swap(&mut buffer);
                        output.session(&time).give_vec(&mut buffer);
                    });

                    tracing::trace!(
                        target: "inspect_frontier",
                        worker = worker,
                        frontier = ?input.frontier().frontier(),
                        "frontier at {}:{}:{}",
                        caller.file(),
                        caller.line(),
                        caller.column(),
                    );
                }
            })
        } else {
            self.clone()
        }
    }

    #[track_caller]
    fn debug_frontier_with(&self, with: &str) -> Self {
        if STATIC_MAX_LEVEL == LevelFilter::TRACE {
            let with = with.to_owned();
            let worker = self.scope().index();
            let caller = Location::caller();
            let mut buffer = Vec::new();

            self.unary_frontier(Pipeline, &located!("InspectFrontier"), move |_, _| {
                move |input, output| {
                    input.for_each(|time, data| {
                        data.swap(&mut buffer);
                        output.session(&time).give_vec(&mut buffer);
                    });

                    tracing::trace!(
                        target: "inspect_frontier",
                        worker = worker,
                        frontier = ?input.frontier().frontier(),
                        "{} frontier at {}:{}:{}",
                        with,
                        caller.file(),
                        caller.line(),
                        caller.column(),
                    );
                }
            })
        } else {
            self.clone()
        }
    }
}

impl<S> InspectExt for Option<S>
where
    S: InspectExt,
{
    type Value = <S as InspectExt>::Value;

    #[track_caller]
    fn debug_inspect<F>(&self, inspect: F) -> Self
    where
        F: FnMut(&Self::Value) + 'static,
    {
        self.as_ref().map(|stream| stream.debug_inspect(inspect))
    }

    #[track_caller]
    fn debug_frontier(&self) -> Self {
        self.as_ref().map(|stream| stream.debug_frontier())
    }

    #[track_caller]
    fn debug_frontier_with(&self, with: &str) -> Self {
        self.as_ref().map(|stream| stream.debug_frontier_with(with))
    }
}