timely/dataflow/channels/pushers/
counter.rs

1//! A wrapper which counts the number of records pushed past and updates a shared count map.
2
3use std::marker::PhantomData;
4use std::rc::Rc;
5use std::cell::RefCell;
6
7use crate::progress::{ChangeBatch, Timestamp};
8use crate::dataflow::channels::BundleCore;
9use crate::communication::Push;
10use crate::Container;
11
12/// A wrapper which updates shared `produced` based on the number of records pushed.
13#[derive(Debug)]
14pub struct CounterCore<T, D, P: Push<BundleCore<T, D>>> {
15    pushee: P,
16    produced: Rc<RefCell<ChangeBatch<T>>>,
17    phantom: PhantomData<D>,
18}
19
20/// A counter specialized to vector.
21pub type Counter<T, D, P> = CounterCore<T, Vec<D>, P>;
22
23impl<T: Timestamp, D: Container, P> Push<BundleCore<T, D>> for CounterCore<T, D, P> where P: Push<BundleCore<T, D>> {
24    #[inline]
25    fn push(&mut self, message: &mut Option<BundleCore<T, D>>) {
26        if let Some(message) = message {
27            self.produced.borrow_mut().update(message.time.clone(), message.data.len() as i64);
28        }
29
30        // only propagate `None` if dirty (indicates flush)
31        if message.is_some() || !self.produced.borrow_mut().is_empty() {
32            self.pushee.push(message);
33        }
34    }
35}
36
37impl<T, D, P: Push<BundleCore<T, D>>> CounterCore<T, D, P> where T : Ord+Clone+'static {
38    /// Allocates a new `Counter` from a pushee and shared counts.
39    pub fn new(pushee: P) -> CounterCore<T, D, P> {
40        CounterCore {
41            pushee,
42            produced: Rc::new(RefCell::new(ChangeBatch::new())),
43            phantom: PhantomData,
44        }
45    }
46    /// A references to shared changes in counts, for cloning or draining.
47    #[inline]
48    pub fn produced(&self) -> &Rc<RefCell<ChangeBatch<T>>> {
49        &self.produced
50    }
51}