timely/dataflow/channels/pushers/
counter.rs1use std::marker::PhantomData;
4use std::rc::Rc;
5use std::cell::RefCell;
6
7use crate::progress::{ChangeBatch, Timestamp};
8use crate::dataflow::channels::BundleCore;
9use crate::communication::Push;
10use crate::Container;
11
12#[derive(Debug)]
14pub struct CounterCore<T, D, P: Push<BundleCore<T, D>>> {
15 pushee: P,
16 produced: Rc<RefCell<ChangeBatch<T>>>,
17 phantom: PhantomData<D>,
18}
19
20pub type Counter<T, D, P> = CounterCore<T, Vec<D>, P>;
22
23impl<T: Timestamp, D: Container, P> Push<BundleCore<T, D>> for CounterCore<T, D, P> where P: Push<BundleCore<T, D>> {
24 #[inline]
25 fn push(&mut self, message: &mut Option<BundleCore<T, D>>) {
26 if let Some(message) = message {
27 self.produced.borrow_mut().update(message.time.clone(), message.data.len() as i64);
28 }
29
30 if message.is_some() || !self.produced.borrow_mut().is_empty() {
32 self.pushee.push(message);
33 }
34 }
35}
36
37impl<T, D, P: Push<BundleCore<T, D>>> CounterCore<T, D, P> where T : Ord+Clone+'static {
38 pub fn new(pushee: P) -> CounterCore<T, D, P> {
40 CounterCore {
41 pushee,
42 produced: Rc::new(RefCell::new(ChangeBatch::new())),
43 phantom: PhantomData,
44 }
45 }
46 #[inline]
48 pub fn produced(&self) -> &Rc<RefCell<ChangeBatch<T>>> {
49 &self.produced
50 }
51}