// timely/dataflow/operators/capture/event.rs
event.rs1#[derive(Debug, Clone, Abomonation, Hash, Ord, PartialOrd, Eq, PartialEq, Deserialize, Serialize)]
9pub enum EventCore<T, D> {
10 Progress(Vec<(T, i64)>),
12 Messages(T, D),
14}
15
16pub type Event<T, D> = EventCore<T, Vec<D>>;
18
19pub trait EventIteratorCore<T, D> {
26 fn next(&mut self) -> Option<&EventCore<T, D>>;
28}
29
30pub trait EventIterator<T, D>: EventIteratorCore<T, Vec<D>> {
33 fn next(&mut self) -> Option<&Event<T, D>>;
35}
36impl<T, D, E: EventIteratorCore<T, Vec<D>>> EventIterator<T, D> for E {
37 fn next(&mut self) -> Option<&Event<T, D>> {
38 <Self as EventIteratorCore<_, _>>::next(self)
39 }
40}
41
42
43pub trait EventPusherCore<T, D> {
45 fn push(&mut self, event: EventCore<T, D>);
47}
48
49pub trait EventPusher<T, D>: EventPusherCore<T, Vec<D>> {}
52impl<T, D, E: EventPusherCore<T, Vec<D>>> EventPusher<T, D> for E {}
53
54
55impl<T, D> EventPusherCore<T, D> for ::std::sync::mpsc::Sender<EventCore<T, D>> {
57 fn push(&mut self, event: EventCore<T, D>) {
58 let _ = self.send(event);
61 }
62}
63
64pub mod link {
66
67 use std::rc::Rc;
68 use std::cell::RefCell;
69
70 use super::{EventCore, EventPusherCore, EventIteratorCore};
71
72 pub struct EventLinkCore<T, D> {
74 pub event: Option<EventCore<T, D>>,
79 pub next: RefCell<Option<Rc<EventLinkCore<T, D>>>>,
81 }
82
83 pub type EventLink<T, D> = EventLinkCore<T, Vec<D>>;
85
86 impl<T, D> EventLinkCore<T, D> {
87 pub fn new() -> EventLinkCore<T, D> {
89 EventLinkCore { event: None, next: RefCell::new(None) }
90 }
91 }
92
93 impl<T, D> EventPusherCore<T, D> for Rc<EventLinkCore<T, D>> {
95 fn push(&mut self, event: EventCore<T, D>) {
96 *self.next.borrow_mut() = Some(Rc::new(EventLinkCore { event: Some(event), next: RefCell::new(None) }));
97 let next = self.next.borrow().as_ref().unwrap().clone();
98 *self = next;
99 }
100 }
101
102 impl<T, D> EventIteratorCore<T, D> for Rc<EventLinkCore<T, D>> {
103 fn next(&mut self) -> Option<&EventCore<T, D>> {
104 let is_some = self.next.borrow().is_some();
105 if is_some {
106 let next = self.next.borrow().as_ref().unwrap().clone();
107 *self = next;
108 self.event.as_ref()
109 }
110 else {
111 None
112 }
113 }
114 }
115
116 impl<T, D> Drop for EventLinkCore<T, D> {
118 fn drop(&mut self) {
119 while let Some(link) = self.next.replace(None) {
120 if let Ok(head) = Rc::try_unwrap(link) {
121 *self = head;
122 }
123 }
124 }
125 }
126
127 impl<T, D> Default for EventLinkCore<T, D> {
128 fn default() -> Self {
129 Self::new()
130 }
131 }
132
133 #[test]
134 fn avoid_stack_overflow_in_drop() {
135 let mut event1 = Rc::new(EventLinkCore::<(),()>::new());
136 let _event2 = event1.clone();
137 for _ in 0 .. 1_000_000 {
138 event1.push(EventCore::Progress(vec![]));
139 }
140 }
141}
142
143pub mod binary {
145
146 use std::io::Write;
147 use abomonation::Abomonation;
148 use super::{EventCore, EventPusherCore, EventIteratorCore};
149
150 pub struct EventWriterCore<T, D, W: ::std::io::Write> {
152 stream: W,
153 phant: ::std::marker::PhantomData<(T,D)>,
154 }
155
156 pub type EventWriter<T, D, W> = EventWriterCore<T, Vec<D>, W>;
158
159 impl<T, D, W: ::std::io::Write> EventWriterCore<T, D, W> {
160 pub fn new(w: W) -> Self {
162 Self {
163 stream: w,
164 phant: ::std::marker::PhantomData,
165 }
166 }
167 }
168
169 impl<T: Abomonation, D: Abomonation, W: ::std::io::Write> EventPusherCore<T, D> for EventWriterCore<T, D, W> {
170 fn push(&mut self, event: EventCore<T, D>) {
171 unsafe { ::abomonation::encode(&event, &mut self.stream).expect("Event abomonation/write failed"); }
173 }
174 }
175
176 pub struct EventReaderCore<T, D, R: ::std::io::Read> {
178 reader: R,
179 bytes: Vec<u8>,
180 buff1: Vec<u8>,
181 buff2: Vec<u8>,
182 consumed: usize,
183 valid: usize,
184 phant: ::std::marker::PhantomData<(T,D)>,
185 }
186
187 pub type EventReader<T, D, R> = EventReaderCore<T, Vec<D>, R>;
189
190 impl<T, D, R: ::std::io::Read> EventReaderCore<T, D, R> {
191 pub fn new(r: R) -> Self {
193 Self {
194 reader: r,
195 bytes: vec![0u8; 1 << 20],
196 buff1: vec![],
197 buff2: vec![],
198 consumed: 0,
199 valid: 0,
200 phant: ::std::marker::PhantomData,
201 }
202 }
203 }
204
205 impl<T: Abomonation, D: Abomonation, R: ::std::io::Read> EventIteratorCore<T, D> for EventReaderCore<T, D, R> {
206 fn next(&mut self) -> Option<&EventCore<T, D>> {
207
208 if unsafe { ::abomonation::decode::<EventCore<T,D>>(&mut self.buff1[self.consumed..]) }.is_some() {
210 let (item, rest) = unsafe { ::abomonation::decode::<EventCore<T,D>>(&mut self.buff1[self.consumed..]) }.unwrap();
211 self.consumed = self.valid - rest.len();
212 return Some(item);
213 }
214 if self.consumed > 0 {
216 self.buff2.clear();
217 self.buff2.write_all(&self.buff1[self.consumed..]).unwrap();
218 ::std::mem::swap(&mut self.buff1, &mut self.buff2);
219 self.valid = self.buff1.len();
220 self.consumed = 0;
221 }
222
223 if let Ok(len) = self.reader.read(&mut self.bytes[..]) {
224 self.buff1.write_all(&self.bytes[..len]).unwrap();
225 self.valid = self.buff1.len();
226 }
227
228 None
229 }
230 }
231}