timely/dataflow/operators/
to_stream.rs

1//! Conversion to the `Stream` type from iterators.
2
3use std::pin::Pin;
4use std::sync::Arc;
5use std::task::{Context, Poll};
6use crate::Container;
7
8use crate::dataflow::operators::generic::operator::source;
9use crate::dataflow::operators::CapabilitySet;
10use crate::dataflow::{StreamCore, Scope, Stream};
11use crate::progress::Timestamp;
12use crate::Data;
13
14/// Converts to a timely `Stream`.
15pub trait ToStream<T: Timestamp, D: Data> {
16    /// Converts to a timely `Stream`.
17    ///
18    /// # Examples
19    ///
20    /// ```
21    /// use timely::dataflow::operators::{ToStream, Capture};
22    /// use timely::dataflow::operators::capture::Extract;
23    ///
24    /// let (data1, data2) = timely::example(|scope| {
25    ///     let data1 = (0..3).to_stream(scope).capture();
26    ///     let data2 = vec![0,1,2].to_stream(scope).capture();
27    ///     (data1, data2)
28    /// });
29    ///
30    /// assert_eq!(data1.extract(), data2.extract());
31    /// ```
32    fn to_stream<S: Scope<Timestamp=T>>(self, scope: &mut S) -> Stream<S, D>;
33}
34
35impl<T: Timestamp, I: IntoIterator+'static> ToStream<T, I::Item> for I where I::Item: Data {
36    fn to_stream<S: Scope<Timestamp=T>>(self, scope: &mut S) -> Stream<S, I::Item> {
37
38        source(scope, "ToStream", |capability, info| {
39
40            // Acquire an activator, so that the operator can rescheduled itself.
41            let activator = scope.activator_for(&info.address[..]);
42
43            let mut iterator = self.into_iter().fuse();
44            let mut capability = Some(capability);
45
46            move |output| {
47
48                if let Some(element) = iterator.next() {
49                    let mut session = output.session(capability.as_ref().unwrap());
50                    session.give(element);
51                    let n = 256 * crate::container::buffer::default_capacity::<I::Item>();
52                    for element in iterator.by_ref().take(n - 1) {
53                        session.give(element);
54                    }
55                    activator.activate();
56                }
57                else {
58                    capability = None;
59                }
60            }
61        })
62    }
63}
64
65/// Converts to a timely [StreamCore].
66pub trait ToStreamCore<T: Timestamp, C: Container> {
67    /// Converts to a timely [StreamCore].
68    ///
69    /// # Examples
70    ///
71    /// ```
72    /// use timely::dataflow::operators::{ToStreamCore, Capture};
73    /// use timely::dataflow::operators::capture::Extract;
74    ///
75    /// let (data1, data2) = timely::example(|scope| {
76    ///     let data1 = Some((0..3).collect::<Vec<_>>()).to_stream_core(scope).capture();
77    ///     let data2 = Some(vec![0,1,2]).to_stream_core(scope).capture();
78    ///     (data1, data2)
79    /// });
80    ///
81    /// assert_eq!(data1.extract(), data2.extract());
82    /// ```
83    fn to_stream_core<S: Scope<Timestamp=T>>(self, scope: &mut S) -> StreamCore<S, C>;
84}
85
86impl<T: Timestamp, I: IntoIterator+'static> ToStreamCore<T, I::Item> for I where I::Item: Container {
87    fn to_stream_core<S: Scope<Timestamp=T>>(self, scope: &mut S) -> StreamCore<S, I::Item> {
88
89        source(scope, "ToStreamCore", |capability, info| {
90
91            // Acquire an activator, so that the operator can rescheduled itself.
92            let activator = scope.activator_for(&info.address[..]);
93
94            let mut iterator = self.into_iter().fuse();
95            let mut capability = Some(capability);
96
97            move |output| {
98
99                if let Some(mut element) = iterator.next() {
100                    let mut session = output.session(capability.as_ref().unwrap());
101                    session.give_container(&mut element);
102                    let n = 256;
103                    for mut element in iterator.by_ref().take(n - 1) {
104                        session.give_container(&mut element);
105                    }
106                    activator.activate();
107                }
108                else {
109                    capability = None;
110                }
111            }
112        })
113    }
114}
115
/// Data and progress events of the native stream.
///
/// `F` is the frontier representation: any `IntoIterator` whose items are the
/// timestamps. `D` is the type of the data records.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Event<F: IntoIterator, D> {
    /// Indicates that timestamps have advanced to the frontier held in the `F` value.
    Progress(F),
    /// Indicates that the record of type `D` happened at the time given by the
    /// `F::Item` value.
    Message(F::Item, D),
}
123
124/// Converts to a timely `Stream`.
125pub trait ToStreamAsync<T: Timestamp, D: Data> {
126    /// Converts a [native `Stream`](futures_util::stream::Stream) of [`Event`s](Event) into a [timely
127    /// `Stream`](crate::dataflow::Stream).
128    ///
129    /// # Examples
130    ///
131    /// ```
132    /// use futures_util::stream;
133    ///
134    /// use timely::dataflow::operators::{Capture, Event, ToStream, ToStreamAsync};
135    /// use timely::dataflow::operators::capture::Extract;
136    ///
137    /// let native_stream = stream::iter(vec![
138    ///     Event::Message(0, 0),
139    ///     Event::Message(0, 1),
140    ///     Event::Message(0, 2),
141    ///     Event::Progress(Some(0)),
142    /// ]);
143    ///
144    /// let (data1, data2) = timely::example(|scope| {
145    ///     let data1 = native_stream.to_stream(scope).capture();
146    ///     let data2 = vec![0,1,2].to_stream(scope).capture();
147    ///
148    ///     (data1, data2)
149    /// });
150    ///
151    /// assert_eq!(data1.extract(), data2.extract());
152    /// ```
153    fn to_stream<S: Scope<Timestamp = T>>(self, scope: &S) -> Stream<S, D>;
154}
155
156impl<T, D, F, I> ToStreamAsync<T, D> for I
157where
158    D: Data,
159    T: Timestamp,
160    F: IntoIterator<Item = T>,
161    I: futures_util::stream::Stream<Item = Event<F, D>> + Unpin + 'static,
162{
163    fn to_stream<S: Scope<Timestamp = T>>(mut self, scope: &S) -> Stream<S, D> {
164        source(scope, "ToStreamAsync", move |capability, info| {
165            let activator = Arc::new(scope.sync_activator_for(&info.address[..]));
166
167            let mut cap_set = CapabilitySet::from_elem(capability);
168
169            move |output| {
170                let waker = futures_util::task::waker_ref(&activator);
171                let mut context = Context::from_waker(&waker);
172
173                // Consume all the ready items of the source_stream and issue them to the operator
174                while let Poll::Ready(item) = Pin::new(&mut self).poll_next(&mut context) {
175                    match item {
176                        Some(Event::Progress(time)) => {
177                            cap_set.downgrade(time);
178                        }
179                        Some(Event::Message(time, data)) => {
180                            output.session(&cap_set.delayed(&time)).give(data);
181                        }
182                        None => {
183                            cap_set.downgrade(&[]);
184                            break;
185                        }
186                    }
187                }
188            }
189        })
190    }
191}