1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
use std::{marker::PhantomData, time::Duration};
use timely::dataflow::operators::capture::{Event as RawEvent, EventPusher};

/// Logs events from a timely stream, including progress information
/// and logging messages
#[derive(Debug)]
pub struct BatchLogger<T, D, P>
where
    P: EventPusher<Duration, (Duration, D, T)>,
{
    // None when the logging stream is closed
    time: Duration,
    event_pusher: P,
    __type: PhantomData<(D, T)>,
}

impl<T, D, P> BatchLogger<T, D, P>
where
    P: EventPusher<Duration, (Duration, D, T)>,
{
    /// Creates a new batch logger.
    pub fn new(event_pusher: P) -> Self {
        BatchLogger {
            time: Default::default(),
            event_pusher,
            __type: PhantomData,
        }
    }

    /// Publishes a batch of logged events and advances the capability.
    pub fn publish_batch<D2, T2>(&mut self, &time: &Duration, data: &mut Vec<(Duration, D2, T2)>)
    where
        D: From<D2>,
        T: From<T2>,
    {
        if !data.is_empty() {
            self.event_pusher.push(RawEvent::Messages(
                self.time,
                data.drain(..)
                    .map(|(time, worker, data)| (time, D::from(worker), T::from(data)))
                    .collect(),
            ));
        }

        if self.time < time {
            let new_frontier = time;
            let old_frontier = self.time;

            self.event_pusher.push(RawEvent::Progress(vec![
                (new_frontier, 1),
                (old_frontier, -1),
            ]));
        }

        self.time = time;
    }
}

impl<T, E, P> Drop for BatchLogger<T, E, P>
where
    P: EventPusher<Duration, (Duration, E, T)>,
{
    fn drop(&mut self) {
        self.event_pusher
            .push(RawEvent::Progress(vec![(self.time, -1)]));
    }
}