// timely/dataflow/operators/capture/event.rs

//! Traits and types describing timely dataflow events.
//!
//! The `Event` type describes the information an operator can observe about a timely dataflow
//! stream. There are two types of events, (i) the receipt of data and (ii) reports of progress
//! of timestamps.

7/// Data and progress events of the captured stream.
8#[derive(Debug, Clone, Abomonation, Hash, Ord, PartialOrd, Eq, PartialEq, Deserialize, Serialize)]
9pub enum EventCore<T, D> {
10    /// Progress received via `push_external_progress`.
11    Progress(Vec<(T, i64)>),
12    /// Messages received via the data stream.
13    Messages(T, D),
14}
15
16/// Data and progress events of the captured stream, specialized to vector-based containers.
17pub type Event<T, D> = EventCore<T, Vec<D>>;
18
19/// Iterates over contained `EventCore<T, D>`.
20///
21/// The `EventIterator` trait describes types that can iterate over references to events,
22/// and which can be used to replay a stream into a new timely dataflow computation.
23///
24/// This method is not simply an iterator because of the lifetime in the result.
25pub trait EventIteratorCore<T, D> {
26    /// Iterates over references to `EventCore<T, D>` elements.
27    fn next(&mut self) -> Option<&EventCore<T, D>>;
28}
29
30/// A [EventIteratorCore] specialized to vector-based containers.
31// TODO: use trait aliases once stable.
32pub trait EventIterator<T, D>: EventIteratorCore<T, Vec<D>> {
33    /// Iterates over references to `Event<T, D>` elements.
34    fn next(&mut self) -> Option<&Event<T, D>>;
35}
36impl<T, D, E: EventIteratorCore<T, Vec<D>>> EventIterator<T, D> for E {
37    fn next(&mut self) -> Option<&Event<T, D>> {
38        <Self as EventIteratorCore<_, _>>::next(self)
39    }
40}
41
42
43/// Receives `EventCore<T, D>` events.
44pub trait EventPusherCore<T, D> {
45    /// Provides a new `Event<T, D>` to the pusher.
46    fn push(&mut self, event: EventCore<T, D>);
47}
48
49/// A [EventPusherCore] specialized to vector-based containers.
50// TODO: use trait aliases once stable.
51pub trait EventPusher<T, D>: EventPusherCore<T, Vec<D>> {}
52impl<T, D, E: EventPusherCore<T, Vec<D>>> EventPusher<T, D> for E {}
53
54
55// implementation for the linked list behind a `Handle`.
56impl<T, D> EventPusherCore<T, D> for ::std::sync::mpsc::Sender<EventCore<T, D>> {
57    fn push(&mut self, event: EventCore<T, D>) {
58        // NOTE: An Err(x) result just means "data not accepted" most likely
59        //       because the receiver is gone. No need to panic.
60        let _ = self.send(event);
61    }
62}
63
64/// A linked-list event pusher and iterator.
65pub mod link {
66
67    use std::rc::Rc;
68    use std::cell::RefCell;
69
70    use super::{EventCore, EventPusherCore, EventIteratorCore};
71
72    /// A linked list of EventCore<T, D>.
73    pub struct EventLinkCore<T, D> {
74        /// An event, if one exists.
75        ///
76        /// An event might not exist, if either we want to insert a `None` and have the output iterator pause,
77        /// or in the case of the very first linked list element, which has no event when constructed.
78        pub event: Option<EventCore<T, D>>,
79        /// The next event, if it exists.
80        pub next: RefCell<Option<Rc<EventLinkCore<T, D>>>>,
81    }
82
83    /// A [EventLinkCore] specialized to vector-based containers.
84    pub type EventLink<T, D> = EventLinkCore<T, Vec<D>>;
85
86    impl<T, D> EventLinkCore<T, D> {
87        /// Allocates a new `EventLink`.
88        pub fn new() -> EventLinkCore<T, D> {
89            EventLinkCore { event: None, next: RefCell::new(None) }
90        }
91    }
92
93    // implementation for the linked list behind a `Handle`.
94    impl<T, D> EventPusherCore<T, D> for Rc<EventLinkCore<T, D>> {
95        fn push(&mut self, event: EventCore<T, D>) {
96            *self.next.borrow_mut() = Some(Rc::new(EventLinkCore { event: Some(event), next: RefCell::new(None) }));
97            let next = self.next.borrow().as_ref().unwrap().clone();
98            *self = next;
99        }
100    }
101
102    impl<T, D> EventIteratorCore<T, D> for Rc<EventLinkCore<T, D>> {
103        fn next(&mut self) -> Option<&EventCore<T, D>> {
104            let is_some = self.next.borrow().is_some();
105            if is_some {
106                let next = self.next.borrow().as_ref().unwrap().clone();
107                *self = next;
108                self.event.as_ref()
109            }
110            else {
111                None
112            }
113        }
114    }
115
116    // Drop implementation to prevent stack overflow through naive drop impl.
117    impl<T, D> Drop for EventLinkCore<T, D> {
118        fn drop(&mut self) {
119            while let Some(link) = self.next.replace(None) {
120                if let Ok(head) = Rc::try_unwrap(link) {
121                    *self = head;
122                }
123            }
124        }
125    }
126
127    impl<T, D> Default for EventLinkCore<T, D> {
128        fn default() -> Self {
129            Self::new()
130        }
131    }
132
133    #[test]
134    fn avoid_stack_overflow_in_drop() {
135        let mut event1 = Rc::new(EventLinkCore::<(),()>::new());
136        let _event2 = event1.clone();
137        for _ in 0 .. 1_000_000 {
138            event1.push(EventCore::Progress(vec![]));
139        }
140    }
141}
142
143/// A binary event pusher and iterator.
144pub mod binary {
145
146    use std::io::Write;
147    use abomonation::Abomonation;
148    use super::{EventCore, EventPusherCore, EventIteratorCore};
149
150    /// A wrapper for `W: Write` implementing `EventPusherCore<T, D>`.
151    pub struct EventWriterCore<T, D, W: ::std::io::Write> {
152        stream: W,
153        phant: ::std::marker::PhantomData<(T,D)>,
154    }
155
156    /// [EventWriterCore] specialized to vector-based containers.
157    pub type EventWriter<T, D, W> = EventWriterCore<T, Vec<D>, W>;
158
159    impl<T, D, W: ::std::io::Write> EventWriterCore<T, D, W> {
160        /// Allocates a new `EventWriter` wrapping a supplied writer.
161        pub fn new(w: W) -> Self {
162            Self {
163                stream: w,
164                phant: ::std::marker::PhantomData,
165            }
166        }
167    }
168
169    impl<T: Abomonation, D: Abomonation, W: ::std::io::Write> EventPusherCore<T, D> for EventWriterCore<T, D, W> {
170        fn push(&mut self, event: EventCore<T, D>) {
171            // TODO: `push` has no mechanism to report errors, so we `unwrap`.
172            unsafe { ::abomonation::encode(&event, &mut self.stream).expect("Event abomonation/write failed"); }
173        }
174    }
175
176    /// A Wrapper for `R: Read` implementing `EventIterator<T, D>`.
177    pub struct EventReaderCore<T, D, R: ::std::io::Read> {
178        reader: R,
179        bytes: Vec<u8>,
180        buff1: Vec<u8>,
181        buff2: Vec<u8>,
182        consumed: usize,
183        valid: usize,
184        phant: ::std::marker::PhantomData<(T,D)>,
185    }
186
187    /// [EventReaderCore] specialized to vector-based containers.
188    pub type EventReader<T, D, R> = EventReaderCore<T, Vec<D>, R>;
189
190    impl<T, D, R: ::std::io::Read> EventReaderCore<T, D, R> {
191        /// Allocates a new `EventReader` wrapping a supplied reader.
192        pub fn new(r: R) -> Self {
193            Self {
194                reader: r,
195                bytes: vec![0u8; 1 << 20],
196                buff1: vec![],
197                buff2: vec![],
198                consumed: 0,
199                valid: 0,
200                phant: ::std::marker::PhantomData,
201            }
202        }
203    }
204
205    impl<T: Abomonation, D: Abomonation, R: ::std::io::Read> EventIteratorCore<T, D> for EventReaderCore<T, D, R> {
206        fn next(&mut self) -> Option<&EventCore<T, D>> {
207
208            // if we can decode something, we should just return it! :D
209            if unsafe { ::abomonation::decode::<EventCore<T,D>>(&mut self.buff1[self.consumed..]) }.is_some() {
210                let (item, rest) = unsafe { ::abomonation::decode::<EventCore<T,D>>(&mut self.buff1[self.consumed..]) }.unwrap();
211                self.consumed = self.valid - rest.len();
212                return Some(item);
213            }
214            // if we exhaust data we should shift back (if any shifting to do)
215            if self.consumed > 0 {
216                self.buff2.clear();
217                self.buff2.write_all(&self.buff1[self.consumed..]).unwrap();
218                ::std::mem::swap(&mut self.buff1, &mut self.buff2);
219                self.valid = self.buff1.len();
220                self.consumed = 0;
221            }
222
223            if let Ok(len) = self.reader.read(&mut self.bytes[..]) {
224                self.buff1.write_all(&self.bytes[..len]).unwrap();
225                self.valid = self.buff1.len();
226            }
227
228            None
229        }
230    }
231}