timely/dataflow/channels/pushers/
buffer.rs1use crate::dataflow::channels::{Bundle, BundleCore, Message};
5use crate::progress::Timestamp;
6use crate::dataflow::operators::Capability;
7use crate::communication::Push;
8use crate::{Container, Data};
9
10#[derive(Debug)]
15pub struct BufferCore<T, D: Container, P: Push<BundleCore<T, D>>> {
16 time: Option<T>,
18 buffer: D,
20 pusher: P,
21}
22
23pub type Buffer<T, D, P> = BufferCore<T, Vec<D>, P>;
25
26impl<T, C: Container, P: Push<BundleCore<T, C>>> BufferCore<T, C, P> where T: Eq+Clone {
27
28 pub fn new(pusher: P) -> Self {
30 Self {
31 time: None,
32 buffer: Default::default(),
33 pusher,
34 }
35 }
36
37 pub fn session(&mut self, time: &T) -> Session<T, C, P> {
39 if let Some(true) = self.time.as_ref().map(|x| x != time) { self.flush(); }
40 self.time = Some(time.clone());
41 Session { buffer: self }
42 }
43 pub fn autoflush_session(&mut self, cap: Capability<T>) -> AutoflushSessionCore<T, C, P> where T: Timestamp {
45 if let Some(true) = self.time.as_ref().map(|x| x != cap.time()) { self.flush(); }
46 self.time = Some(cap.time().clone());
47 AutoflushSessionCore {
48 buffer: self,
49 _capability: cap,
50 }
51 }
52
53 pub fn inner(&mut self) -> &mut P { &mut self.pusher }
57
58 pub fn cease(&mut self) {
60 self.flush();
61 self.pusher.push(&mut None);
62 }
63
64 fn flush(&mut self) {
66 if !self.buffer.is_empty() {
67 let time = self.time.as_ref().unwrap().clone();
68 Message::push_at(&mut self.buffer, time, &mut self.pusher);
69 }
70 }
71
72 fn give_container(&mut self, vector: &mut C) {
74 if !vector.is_empty() {
75 self.flush();
77
78 let time = self.time.as_ref().expect("Buffer::give_container(): time is None.").clone();
79 Message::push_at(vector, time, &mut self.pusher);
80 }
81 }
82}
83
84impl<T, D: Data, P: Push<Bundle<T, D>>> Buffer<T, D, P> where T: Eq+Clone {
85 #[inline]
87 fn give(&mut self, data: D) {
88 if self.buffer.capacity() < crate::container::buffer::default_capacity::<D>() {
89 let to_reserve = crate::container::buffer::default_capacity::<D>() - self.buffer.capacity();
90 self.buffer.reserve(to_reserve);
91 }
92 self.buffer.push(data);
93 if self.buffer.len() == self.buffer.capacity() {
95 self.flush();
96 }
97 }
98
99 fn give_vec(&mut self, vector: &mut Vec<D>) {
101 self.flush();
103
104 let time = self.time.as_ref().expect("Buffer::give_vec(): time is None.").clone();
105 Message::push_at(vector, time, &mut self.pusher);
106 }
107}
108
109
110pub struct Session<'a, T, C: Container, P: Push<BundleCore<T, C>>+'a> where T: Eq+Clone+'a, C: 'a {
116 buffer: &'a mut BufferCore<T, C, P>,
117}
118
119impl<'a, T, C: Container, P: Push<BundleCore<T, C>>+'a> Session<'a, T, C, P> where T: Eq+Clone+'a, C: 'a {
120 pub fn give_container(&mut self, container: &mut C) {
122 self.buffer.give_container(container)
123 }
124}
125
126impl<'a, T, D: Data, P: Push<BundleCore<T, Vec<D>>>+'a> Session<'a, T, Vec<D>, P> where T: Eq+Clone+'a, D: 'a {
127 #[inline]
129 pub fn give(&mut self, data: D) {
130 self.buffer.give(data);
131 }
132 #[inline]
134 pub fn give_iterator<I: Iterator<Item=D>>(&mut self, iter: I) {
135 for item in iter {
136 self.give(item);
137 }
138 }
139 #[inline]
145 pub fn give_vec(&mut self, message: &mut Vec<D>) {
146 if !message.is_empty() {
147 self.buffer.give_vec(message);
148 }
149 }
150}
151
152pub struct AutoflushSessionCore<'a, T: Timestamp, C: Container, P: Push<BundleCore<T, C>>+'a> where
154 T: Eq+Clone+'a, C: 'a {
155 buffer: &'a mut BufferCore<T, C, P>,
157 _capability: Capability<T>,
159}
160
161pub type AutoflushSession<'a, T, D, P> = AutoflushSessionCore<'a, T, Vec<D>, P>;
163
164impl<'a, T: Timestamp, D: Data, P: Push<BundleCore<T, Vec<D>>>+'a> AutoflushSessionCore<'a, T, Vec<D>, P> where T: Eq+Clone+'a, D: 'a {
165 #[inline]
167 pub fn give(&mut self, data: D) {
168 self.buffer.give(data);
169 }
170 #[inline]
172 pub fn give_iterator<I: Iterator<Item=D>>(&mut self, iter: I) {
173 for item in iter {
174 self.give(item);
175 }
176 }
177 #[inline]
179 pub fn give_content(&mut self, message: &mut Vec<D>) {
180 if !message.is_empty() {
181 self.buffer.give_vec(message);
182 }
183 }
184}
185
186impl<'a, T: Timestamp, C: Container, P: Push<BundleCore<T, C>>+'a> Drop for AutoflushSessionCore<'a, T, C, P> where T: Eq+Clone+'a, C: 'a {
187 fn drop(&mut self) {
188 self.buffer.cease();
189 }
190}