timely/dataflow/operators/to_stream.rs
1//! Conversion to the `Stream` type from iterators.
2
3use std::pin::Pin;
4use std::sync::Arc;
5use std::task::{Context, Poll};
6use crate::Container;
7
8use crate::dataflow::operators::generic::operator::source;
9use crate::dataflow::operators::CapabilitySet;
10use crate::dataflow::{StreamCore, Scope, Stream};
11use crate::progress::Timestamp;
12use crate::Data;
13
14/// Converts to a timely `Stream`.
15pub trait ToStream<T: Timestamp, D: Data> {
16 /// Converts to a timely `Stream`.
17 ///
18 /// # Examples
19 ///
20 /// ```
21 /// use timely::dataflow::operators::{ToStream, Capture};
22 /// use timely::dataflow::operators::capture::Extract;
23 ///
24 /// let (data1, data2) = timely::example(|scope| {
25 /// let data1 = (0..3).to_stream(scope).capture();
26 /// let data2 = vec![0,1,2].to_stream(scope).capture();
27 /// (data1, data2)
28 /// });
29 ///
30 /// assert_eq!(data1.extract(), data2.extract());
31 /// ```
32 fn to_stream<S: Scope<Timestamp=T>>(self, scope: &mut S) -> Stream<S, D>;
33}
34
35impl<T: Timestamp, I: IntoIterator+'static> ToStream<T, I::Item> for I where I::Item: Data {
36 fn to_stream<S: Scope<Timestamp=T>>(self, scope: &mut S) -> Stream<S, I::Item> {
37
38 source(scope, "ToStream", |capability, info| {
39
40 // Acquire an activator, so that the operator can rescheduled itself.
41 let activator = scope.activator_for(&info.address[..]);
42
43 let mut iterator = self.into_iter().fuse();
44 let mut capability = Some(capability);
45
46 move |output| {
47
48 if let Some(element) = iterator.next() {
49 let mut session = output.session(capability.as_ref().unwrap());
50 session.give(element);
51 let n = 256 * crate::container::buffer::default_capacity::<I::Item>();
52 for element in iterator.by_ref().take(n - 1) {
53 session.give(element);
54 }
55 activator.activate();
56 }
57 else {
58 capability = None;
59 }
60 }
61 })
62 }
63}
64
65/// Converts to a timely [StreamCore].
66pub trait ToStreamCore<T: Timestamp, C: Container> {
67 /// Converts to a timely [StreamCore].
68 ///
69 /// # Examples
70 ///
71 /// ```
72 /// use timely::dataflow::operators::{ToStreamCore, Capture};
73 /// use timely::dataflow::operators::capture::Extract;
74 ///
75 /// let (data1, data2) = timely::example(|scope| {
76 /// let data1 = Some((0..3).collect::<Vec<_>>()).to_stream_core(scope).capture();
77 /// let data2 = Some(vec![0,1,2]).to_stream_core(scope).capture();
78 /// (data1, data2)
79 /// });
80 ///
81 /// assert_eq!(data1.extract(), data2.extract());
82 /// ```
83 fn to_stream_core<S: Scope<Timestamp=T>>(self, scope: &mut S) -> StreamCore<S, C>;
84}
85
86impl<T: Timestamp, I: IntoIterator+'static> ToStreamCore<T, I::Item> for I where I::Item: Container {
87 fn to_stream_core<S: Scope<Timestamp=T>>(self, scope: &mut S) -> StreamCore<S, I::Item> {
88
89 source(scope, "ToStreamCore", |capability, info| {
90
91 // Acquire an activator, so that the operator can rescheduled itself.
92 let activator = scope.activator_for(&info.address[..]);
93
94 let mut iterator = self.into_iter().fuse();
95 let mut capability = Some(capability);
96
97 move |output| {
98
99 if let Some(mut element) = iterator.next() {
100 let mut session = output.session(capability.as_ref().unwrap());
101 session.give_container(&mut element);
102 let n = 256;
103 for mut element in iterator.by_ref().take(n - 1) {
104 session.give_container(&mut element);
105 }
106 activator.activate();
107 }
108 else {
109 capability = None;
110 }
111 }
112 })
113 }
114}
115
116/// Data and progress events of the native stream.
117pub enum Event<F: IntoIterator, D> {
118 /// Indicates that timestamps have advanced to frontier F
119 Progress(F),
120 /// Indicates that event D happened at time T
121 Message(F::Item, D),
122}
123
124/// Converts to a timely `Stream`.
125pub trait ToStreamAsync<T: Timestamp, D: Data> {
126 /// Converts a [native `Stream`](futures_util::stream::Stream) of [`Event`s](Event) into a [timely
127 /// `Stream`](crate::dataflow::Stream).
128 ///
129 /// # Examples
130 ///
131 /// ```
132 /// use futures_util::stream;
133 ///
134 /// use timely::dataflow::operators::{Capture, Event, ToStream, ToStreamAsync};
135 /// use timely::dataflow::operators::capture::Extract;
136 ///
137 /// let native_stream = stream::iter(vec![
138 /// Event::Message(0, 0),
139 /// Event::Message(0, 1),
140 /// Event::Message(0, 2),
141 /// Event::Progress(Some(0)),
142 /// ]);
143 ///
144 /// let (data1, data2) = timely::example(|scope| {
145 /// let data1 = native_stream.to_stream(scope).capture();
146 /// let data2 = vec![0,1,2].to_stream(scope).capture();
147 ///
148 /// (data1, data2)
149 /// });
150 ///
151 /// assert_eq!(data1.extract(), data2.extract());
152 /// ```
153 fn to_stream<S: Scope<Timestamp = T>>(self, scope: &S) -> Stream<S, D>;
154}
155
156impl<T, D, F, I> ToStreamAsync<T, D> for I
157where
158 D: Data,
159 T: Timestamp,
160 F: IntoIterator<Item = T>,
161 I: futures_util::stream::Stream<Item = Event<F, D>> + Unpin + 'static,
162{
163 fn to_stream<S: Scope<Timestamp = T>>(mut self, scope: &S) -> Stream<S, D> {
164 source(scope, "ToStreamAsync", move |capability, info| {
165 let activator = Arc::new(scope.sync_activator_for(&info.address[..]));
166
167 let mut cap_set = CapabilitySet::from_elem(capability);
168
169 move |output| {
170 let waker = futures_util::task::waker_ref(&activator);
171 let mut context = Context::from_waker(&waker);
172
173 // Consume all the ready items of the source_stream and issue them to the operator
174 while let Poll::Ready(item) = Pin::new(&mut self).poll_next(&mut context) {
175 match item {
176 Some(Event::Progress(time)) => {
177 cap_set.downgrade(time);
178 }
179 Some(Event::Message(time, data)) => {
180 output.session(&cap_set.delayed(&time)).give(data);
181 }
182 None => {
183 cap_set.downgrade(&[]);
184 break;
185 }
186 }
187 }
188 }
189 })
190 }
191}