timely/dataflow/channels/pushers/
tee.rs

//! A `Push` implementor that forwards every push to a shared list of `Box<dyn Push>` recipients.
2
3use std::cell::RefCell;
4use std::fmt::{self, Debug};
5use std::rc::Rc;
6
7use crate::dataflow::channels::{BundleCore, Message};
8
9use crate::communication::Push;
10use crate::{Container, Data};
11
12type PushList<T, D> = Rc<RefCell<Vec<Box<dyn Push<BundleCore<T, D>>>>>>;
13
14/// Wraps a shared list of `Box<Push>` to forward pushes to. Owned by `Stream`.
15pub struct TeeCore<T, D> {
16    buffer: D,
17    shared: PushList<T, D>,
18}
19
20/// [TeeCore] specialized to `Vec`-based container.
21pub type Tee<T, D> = TeeCore<T, Vec<D>>;
22
23impl<T: Data, D: Container> Push<BundleCore<T, D>> for TeeCore<T, D> {
24    #[inline]
25    fn push(&mut self, message: &mut Option<BundleCore<T, D>>) {
26        let mut pushers = self.shared.borrow_mut();
27        if let Some(message) = message {
28            for index in 1..pushers.len() {
29                self.buffer.clone_from(&message.data);
30                Message::push_at(&mut self.buffer, message.time.clone(), &mut pushers[index-1]);
31            }
32        }
33        else {
34            for index in 1..pushers.len() {
35                pushers[index-1].push(&mut None);
36            }
37        }
38        if pushers.len() > 0 {
39            let last = pushers.len() - 1;
40            pushers[last].push(message);
41        }
42    }
43}
44
45impl<T, D: Container> TeeCore<T, D> {
46    /// Allocates a new pair of `Tee` and `TeeHelper`.
47    pub fn new() -> (TeeCore<T, D>, TeeHelper<T, D>) {
48        let shared = Rc::new(RefCell::new(Vec::new()));
49        let port = TeeCore {
50            buffer: Default::default(),
51            shared: shared.clone(),
52        };
53
54        (port, TeeHelper { shared })
55    }
56}
57
58impl<T, D: Container> Clone for TeeCore<T, D> {
59    fn clone(&self) -> Self {
60        Self {
61            buffer: Default::default(),
62            shared: self.shared.clone(),
63        }
64    }
65}
66
67impl<T, D> Debug for TeeCore<T, D>
68where
69    D: Debug,
70{
71    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
72        let mut debug = f.debug_struct("Tee");
73        debug.field("buffer", &self.buffer);
74
75        if let Ok(shared) = self.shared.try_borrow() {
76            debug.field("shared", &format!("{} pushers", shared.len()));
77        } else {
78            debug.field("shared", &"...");
79        }
80
81        debug.finish()
82    }
83}
84
85/// A shared list of `Box<Push>` used to add `Push` implementors.
86pub struct TeeHelper<T, D> {
87    shared: PushList<T, D>,
88}
89
90impl<T, D> TeeHelper<T, D> {
91    /// Adds a new `Push` implementor to the list of recipients shared with a `Stream`.
92    pub fn add_pusher<P: Push<BundleCore<T, D>>+'static>(&self, pusher: P) {
93        self.shared.borrow_mut().push(Box::new(pusher));
94    }
95}
96
97impl<T, D> Clone for TeeHelper<T, D> {
98    fn clone(&self) -> Self {
99        TeeHelper {
100            shared: self.shared.clone(),
101        }
102    }
103}
104
105impl<T, D> Debug for TeeHelper<T, D> {
106    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
107        let mut debug = f.debug_struct("TeeHelper");
108
109        if let Ok(shared) = self.shared.try_borrow() {
110            debug.field("shared", &format!("{} pushers", shared.len()));
111        } else {
112            debug.field("shared", &"...");
113        }
114
115        debug.finish()
116    }
117}