timely/dataflow/channels/pushers/
buffer.rs

1//! Buffering and session mechanisms to provide the appearance of record-at-a-time sending,
2//! with the performance of batched sends.
3
4use crate::dataflow::channels::{Bundle, BundleCore, Message};
5use crate::progress::Timestamp;
6use crate::dataflow::operators::Capability;
7use crate::communication::Push;
8use crate::{Container, Data};
9
10/// Buffers data sent at the same time, for efficient communication.
11///
12/// The `Buffer` type should be used by calling `session` with a time, which checks whether
13/// data must be flushed and creates a `Session` object which allows sending at the given time.
14#[derive(Debug)]
15pub struct BufferCore<T, D: Container, P: Push<BundleCore<T, D>>> {
16    /// the currently open time, if it is open
17    time: Option<T>,
18    /// a buffer for records, to send at self.time
19    buffer: D,
20    pusher: P,
21}
22
23/// A buffer specialized to vector-based containers.
24pub type Buffer<T, D, P> = BufferCore<T, Vec<D>, P>;
25
26impl<T, C: Container, P: Push<BundleCore<T, C>>> BufferCore<T, C, P> where T: Eq+Clone {
27
28    /// Creates a new `Buffer`.
29    pub fn new(pusher: P) -> Self {
30        Self {
31            time: None,
32            buffer: Default::default(),
33            pusher,
34        }
35    }
36
37    /// Returns a `Session`, which accepts data to send at the associated time
38    pub fn session(&mut self, time: &T) -> Session<T, C, P> {
39        if let Some(true) = self.time.as_ref().map(|x| x != time) { self.flush(); }
40        self.time = Some(time.clone());
41        Session { buffer: self }
42    }
43    /// Allocates a new `AutoflushSession` which flushes itself on drop.
44    pub fn autoflush_session(&mut self, cap: Capability<T>) -> AutoflushSessionCore<T, C, P> where T: Timestamp {
45        if let Some(true) = self.time.as_ref().map(|x| x != cap.time()) { self.flush(); }
46        self.time = Some(cap.time().clone());
47        AutoflushSessionCore {
48            buffer: self,
49            _capability: cap,
50        }
51    }
52
53    /// Returns a reference to the inner `P: Push` type.
54    ///
55    /// This is currently used internally, and should not be used without some care.
56    pub fn inner(&mut self) -> &mut P { &mut self.pusher }
57
58    /// Flushes all data and pushes a `None` to `self.pusher`, indicating a flush.
59    pub fn cease(&mut self) {
60        self.flush();
61        self.pusher.push(&mut None);
62    }
63
64    /// moves the contents of
65    fn flush(&mut self) {
66        if !self.buffer.is_empty() {
67            let time = self.time.as_ref().unwrap().clone();
68            Message::push_at(&mut self.buffer, time, &mut self.pusher);
69        }
70    }
71
72    // Gives an entire container at a specific time.
73    fn give_container(&mut self, vector: &mut C) {
74        if !vector.is_empty() {
75            // flush to ensure fifo-ness
76            self.flush();
77
78            let time = self.time.as_ref().expect("Buffer::give_container(): time is None.").clone();
79            Message::push_at(vector, time, &mut self.pusher);
80        }
81    }
82}
83
84impl<T, D: Data, P: Push<Bundle<T, D>>> Buffer<T, D, P> where T: Eq+Clone {
85    // internal method for use by `Session`.
86    #[inline]
87    fn give(&mut self, data: D) {
88        if self.buffer.capacity() < crate::container::buffer::default_capacity::<D>() {
89            let to_reserve = crate::container::buffer::default_capacity::<D>() - self.buffer.capacity();
90            self.buffer.reserve(to_reserve);
91        }
92        self.buffer.push(data);
93        // assert!(self.buffer.capacity() == Message::<O::Data>::default_length());
94        if self.buffer.len() == self.buffer.capacity() {
95            self.flush();
96        }
97    }
98
99    // Gives an entire message at a specific time.
100    fn give_vec(&mut self, vector: &mut Vec<D>) {
101        // flush to ensure fifo-ness
102        self.flush();
103
104        let time = self.time.as_ref().expect("Buffer::give_vec(): time is None.").clone();
105        Message::push_at(vector, time, &mut self.pusher);
106    }
107}
108
109
110/// An output session for sending records at a specified time.
111///
112/// The `Session` struct provides the user-facing interface to an operator output, namely
113/// the `Buffer` type. A `Session` wraps a session of output at a specified time, and
114/// avoids what would otherwise be a constant cost of checking timestamp equality.
115pub struct Session<'a, T, C: Container, P: Push<BundleCore<T, C>>+'a> where T: Eq+Clone+'a, C: 'a {
116    buffer: &'a mut BufferCore<T, C, P>,
117}
118
119impl<'a, T, C: Container, P: Push<BundleCore<T, C>>+'a> Session<'a, T, C, P>  where T: Eq+Clone+'a, C: 'a {
120    /// Provide a container at the time specified by the [Session].
121    pub fn give_container(&mut self, container: &mut C) {
122        self.buffer.give_container(container)
123    }
124}
125
126impl<'a, T, D: Data, P: Push<BundleCore<T, Vec<D>>>+'a> Session<'a, T, Vec<D>, P>  where T: Eq+Clone+'a, D: 'a {
127    /// Provides one record at the time specified by the `Session`.
128    #[inline]
129    pub fn give(&mut self, data: D) {
130        self.buffer.give(data);
131    }
132    /// Provides an iterator of records at the time specified by the `Session`.
133    #[inline]
134    pub fn give_iterator<I: Iterator<Item=D>>(&mut self, iter: I) {
135        for item in iter {
136            self.give(item);
137        }
138    }
139    /// Provides a fully formed `Content<D>` message for senders which can use this type.
140    ///
141    /// The `Content` type is the backing memory for communication in timely, and it can
142    /// often be more efficient to reuse this memory rather than have timely allocate
143    /// new backing memory.
144    #[inline]
145    pub fn give_vec(&mut self, message: &mut Vec<D>) {
146        if !message.is_empty() {
147            self.buffer.give_vec(message);
148        }
149    }
150}
151
152/// A session which will flush itself when dropped.
153pub struct AutoflushSessionCore<'a, T: Timestamp, C: Container, P: Push<BundleCore<T, C>>+'a> where
154    T: Eq+Clone+'a, C: 'a {
155    /// A reference to the underlying buffer.
156    buffer: &'a mut BufferCore<T, C, P>,
157    /// The capability being used to send the data.
158    _capability: Capability<T>,
159}
160
161/// Auto-flush session specialized to vector-based containers.
162pub type AutoflushSession<'a, T, D, P> = AutoflushSessionCore<'a, T, Vec<D>, P>;
163
164impl<'a, T: Timestamp, D: Data, P: Push<BundleCore<T, Vec<D>>>+'a> AutoflushSessionCore<'a, T, Vec<D>, P> where T: Eq+Clone+'a, D: 'a {
165    /// Transmits a single record.
166    #[inline]
167    pub fn give(&mut self, data: D) {
168        self.buffer.give(data);
169    }
170    /// Transmits records produced by an iterator.
171    #[inline]
172    pub fn give_iterator<I: Iterator<Item=D>>(&mut self, iter: I) {
173        for item in iter {
174            self.give(item);
175        }
176    }
177    /// Transmits a pre-packed batch of data.
178    #[inline]
179    pub fn give_content(&mut self, message: &mut Vec<D>) {
180        if !message.is_empty() {
181            self.buffer.give_vec(message);
182        }
183    }
184}
185
186impl<'a, T: Timestamp, C: Container, P: Push<BundleCore<T, C>>+'a> Drop for AutoflushSessionCore<'a, T, C, P> where T: Eq+Clone+'a, C: 'a {
187    fn drop(&mut self) {
188        self.buffer.cease();
189    }
190}