timely/dataflow/channels/pushers/
tee.rs1use std::cell::RefCell;
4use std::fmt::{self, Debug};
5use std::rc::Rc;
6
7use crate::dataflow::channels::{BundleCore, Message};
8
9use crate::communication::Push;
10use crate::{Container, Data};
11
12type PushList<T, D> = Rc<RefCell<Vec<Box<dyn Push<BundleCore<T, D>>>>>>;
13
14pub struct TeeCore<T, D> {
16 buffer: D,
17 shared: PushList<T, D>,
18}
19
20pub type Tee<T, D> = TeeCore<T, Vec<D>>;
22
23impl<T: Data, D: Container> Push<BundleCore<T, D>> for TeeCore<T, D> {
24 #[inline]
25 fn push(&mut self, message: &mut Option<BundleCore<T, D>>) {
26 let mut pushers = self.shared.borrow_mut();
27 if let Some(message) = message {
28 for index in 1..pushers.len() {
29 self.buffer.clone_from(&message.data);
30 Message::push_at(&mut self.buffer, message.time.clone(), &mut pushers[index-1]);
31 }
32 }
33 else {
34 for index in 1..pushers.len() {
35 pushers[index-1].push(&mut None);
36 }
37 }
38 if pushers.len() > 0 {
39 let last = pushers.len() - 1;
40 pushers[last].push(message);
41 }
42 }
43}
44
45impl<T, D: Container> TeeCore<T, D> {
46 pub fn new() -> (TeeCore<T, D>, TeeHelper<T, D>) {
48 let shared = Rc::new(RefCell::new(Vec::new()));
49 let port = TeeCore {
50 buffer: Default::default(),
51 shared: shared.clone(),
52 };
53
54 (port, TeeHelper { shared })
55 }
56}
57
58impl<T, D: Container> Clone for TeeCore<T, D> {
59 fn clone(&self) -> Self {
60 Self {
61 buffer: Default::default(),
62 shared: self.shared.clone(),
63 }
64 }
65}
66
67impl<T, D> Debug for TeeCore<T, D>
68where
69 D: Debug,
70{
71 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
72 let mut debug = f.debug_struct("Tee");
73 debug.field("buffer", &self.buffer);
74
75 if let Ok(shared) = self.shared.try_borrow() {
76 debug.field("shared", &format!("{} pushers", shared.len()));
77 } else {
78 debug.field("shared", &"...");
79 }
80
81 debug.finish()
82 }
83}
84
85pub struct TeeHelper<T, D> {
87 shared: PushList<T, D>,
88}
89
90impl<T, D> TeeHelper<T, D> {
91 pub fn add_pusher<P: Push<BundleCore<T, D>>+'static>(&self, pusher: P) {
93 self.shared.borrow_mut().push(Box::new(pusher));
94 }
95}
96
97impl<T, D> Clone for TeeHelper<T, D> {
98 fn clone(&self) -> Self {
99 TeeHelper {
100 shared: self.shared.clone(),
101 }
102 }
103}
104
105impl<T, D> Debug for TeeHelper<T, D> {
106 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
107 let mut debug = f.debug_struct("TeeHelper");
108
109 if let Ok(shared) = self.shared.try_borrow() {
110 debug.field("shared", &format!("{} pushers", shared.len()));
111 } else {
112 debug.field("shared", &"...");
113 }
114
115 debug.finish()
116 }
117}