noir_compute/
stream.rs

1use parking_lot::Mutex;
2
3use std::marker::PhantomData;
4use std::sync::Arc;
5
6use crate::block::{BatchMode, Block, NextStrategy, Scheduling};
7use crate::environment::StreamContextInner;
8use crate::operator::end::End;
9use crate::operator::iteration::IterationStateLock;
10use crate::operator::source::Source;
11use crate::operator::window::WindowDescription;
12use crate::operator::DataKey;
13use crate::operator::Start;
14use crate::operator::{Data, ExchangeData, KeyerFn, Operator};
15use crate::scheduler::BlockId;
16
17/// A Stream represents a chain of operators that work on a flow of data. The type of the elements
18/// that is leaving the stream is `Out`.
19///
20/// Internally a stream is composed by a chain of blocks, each of which can be seen as a simpler
21/// stream with input and output types.
22///
23/// A block is internally composed of a chain of operators, nested like the `Iterator` from `std`.
24/// The type of the chain inside the block is `OperatorChain` and it's required as type argument of
25/// the stream. This type only represents the chain inside the last block of the stream, not all the
26/// blocks inside of it.
27pub struct Stream<Op>
28where
29    Op: Operator,
30{
31    /// The last block inside the stream.
32    pub(crate) block: Block<Op>,
33    /// A reference to the environment this stream lives in.
34    pub(crate) ctx: Arc<Mutex<StreamContextInner>>,
35}
36
37pub trait KeyedItem {
38    type Key: DataKey;
39    type Value;
40    fn key(&self) -> &Self::Key;
41    fn value(&self) -> &Self::Value;
42    fn into_kv(self) -> (Self::Key, Self::Value);
43}
44
45impl<K: DataKey, V> KeyedItem for (K, V) {
46    type Key = K;
47    type Value = V;
48    fn key(&self) -> &Self::Key {
49        &self.0
50    }
51    fn value(&self) -> &Self::Value {
52        &self.1
53    }
54    fn into_kv(self) -> (Self::Key, Self::Value) {
55        self
56    }
57}
58
59/// A [`KeyedStream`] is like a set of [`Stream`]s, each of which partitioned by some `Key`. Internally
60/// it's just a stream whose elements are `(K, V)` pairs and the operators behave following the
61/// [`KeyedStream`] semantics.
62///
63/// The type of the `Key` must be a valid key inside an hashmap.
64pub struct KeyedStream<OperatorChain>(pub Stream<OperatorChain>)
65where
66    OperatorChain: Operator,
67    OperatorChain::Out: KeyedItem;
68
69/// A [`WindowedStream`] is a data stream partitioned by `Key`, where elements of each partition
70/// are divided in groups called windows.
71/// Each element can be assigned to one or multiple windows.
72///
73/// Windows are handled independently for each partition of the stream.
74/// Each partition may be processed in parallel.
75///
76/// The windowing logic is implemented through 3 traits:
77/// - A [`WindowDescription`] contains the parameters and logic that characterize the windowing strategy,
78///   when given a `WindowAccumulator` it instantiates a `WindowManager`.
79/// - A [`WindowManger`](crate::operator::window::WindowManager) is the struct responsible for creating
80///   the windows and forwarding the input elements to the correct window which will should pass it to
81///   its `WindowAccumulator`.
82/// - A [`WindowAccumulator`](crate::operator::window::WindowAccumulator) contains the logic that should
83///   be applied to the elements of each window.
84///
85/// There are a set of provided window descriptions with their respective managers:
86///  - [`EventTimeWindow`][crate::operator::window::EventTimeWindow]
87///  - [`ProcessingTimeWindow`][crate::operator::window::ProcessingTimeWindow]
88///  - [`CountWindow`][crate::operator::window::CountWindow]
89///  - [`SessionWindow`][crate::operator::window::SessionWindow]
90///  - [`TransactionWindow`][crate::operator::window::TransactionWindow]
91///
92pub struct WindowedStream<Op, O: Data, WinDescr>
93where
94    Op: Operator,
95    Op::Out: KeyedItem,
96    WinDescr: WindowDescription<<Op::Out as KeyedItem>::Value>,
97{
98    pub(crate) inner: KeyedStream<Op>,
99    pub(crate) descr: WinDescr,
100    pub(crate) _win_out: PhantomData<O>,
101}
102
103impl<Op> Stream<Op>
104where
105    Op: Operator,
106{
107    pub(crate) fn new(ctx: Arc<Mutex<StreamContextInner>>, block: Block<Op>) -> Self {
108        Self { block, ctx }
109    }
110
111    /// Add a new operator to the current chain inside the stream. This consumes the stream and
112    /// returns a new one with the operator added.
113    ///
114    /// `get_operator` is a function that is given the previous chain of operators and should return
115    /// the new chain of operators. The new chain cannot be simply passed as argument since it is
116    /// required to do a partial move of the `InnerBlock` structure.
117    ///
118    /// **Note**: this is an advanced function that manipulates the block structure. Probably it is
119    /// not what you are looking for.
120    pub fn add_operator<Op2, GetOp>(self, get_operator: GetOp) -> Stream<Op2>
121    where
122        Op2: Operator,
123        GetOp: FnOnce(Op) -> Op2,
124    {
125        Stream::new(self.ctx, self.block.add_operator(get_operator))
126    }
127
128    /// Add a new block to the stream, closing and registering the previous one. The new block is
129    /// connected to the previous one.
130    ///
131    /// `get_end_operator` is used to extend the operator chain of the old block with the last
132    /// operator (e.g. `operator::End`, `operator::GroupByEndOperator`). The end operator must
133    /// be an `Operator<()>`.
134    ///
135    /// The new block is initialized with a `Start`.
136    pub(crate) fn split_block<GetEndOp, OpEnd, IndexFn>(
137        self,
138        get_end_operator: GetEndOp,
139        next_strategy: NextStrategy<Op::Out, IndexFn>,
140    ) -> Stream<impl Operator<Out = Op::Out>>
141    where
142        IndexFn: KeyerFn<u64, Op::Out>,
143        Op::Out: ExchangeData,
144        OpEnd: Operator<Out = ()> + 'static,
145        GetEndOp: FnOnce(Op, NextStrategy<Op::Out, IndexFn>, BatchMode) -> OpEnd,
146    {
147        let Stream { block, ctx } = self;
148        // Clone parameters for new block
149        let batch_mode = block.batch_mode;
150        let iteration_ctx = block.iteration_ctx.clone();
151        // Add end operator
152        let mut block =
153            block.add_operator(|prev| get_end_operator(prev, next_strategy.clone(), batch_mode));
154        block.is_only_one_strategy = matches!(next_strategy, NextStrategy::OnlyOne);
155
156        // Close old block
157        let mut env_lock = ctx.lock();
158        let prev_id = env_lock.close_block(block);
159        // Create new block
160        let source = Start::single(prev_id, iteration_ctx.last().cloned());
161        let new_block = env_lock.new_block(source, batch_mode, iteration_ctx);
162        // Connect blocks
163        env_lock.connect_blocks::<Op::Out>(prev_id, new_block.id);
164
165        drop(env_lock);
166        Stream::new(ctx, new_block)
167    }
168
169    /// Similar to `.add_block`, but with 2 incoming blocks.
170    ///
171    /// This will add a new Y connection between two blocks. The two incoming blocks will be closed
172    /// and a new one will be created with the 2 previous ones coming into it.
173    ///
174    /// This won't add any network shuffle, hence the next strategy will be `OnlyOne`. For this
175    /// reason the 2 input streams must have the same parallelism, otherwise this function panics.
176    ///
177    /// The start operator of the new block must support multiple inputs: the provided function
178    /// will be called with the ids of the 2 input blocks and should return the new start operator
179    /// of the new block.
180    pub(crate) fn binary_connection<Op2, S, Fs, F1, F2>(
181        self,
182        oth: Stream<Op2>,
183        get_start_operator: Fs,
184        next_strategy1: NextStrategy<Op::Out, F1>,
185        next_strategy2: NextStrategy<Op2::Out, F2>,
186    ) -> Stream<S>
187    where
188        Op: 'static,
189        Op2: Operator + 'static,
190        Op::Out: ExchangeData,
191        Op2::Out: ExchangeData,
192        F1: KeyerFn<u64, Op::Out>,
193        F2: KeyerFn<u64, Op2::Out>,
194        S: Operator + Source,
195        Fs: FnOnce(BlockId, BlockId, bool, bool, Option<Arc<IterationStateLock>>) -> S,
196    {
197        let Stream { block: b1, ctx } = self;
198        let Stream { block: b2, .. } = oth;
199
200        let batch_mode = b1.batch_mode;
201        let is_one_1 = matches!(next_strategy1, NextStrategy::OnlyOne);
202        let is_one_2 = matches!(next_strategy2, NextStrategy::OnlyOne);
203        let sched_1 = b1.scheduling.clone();
204        let sched_2 = b2.scheduling.clone();
205        if is_one_1 && is_one_2 && sched_1.replication != sched_2.replication {
206            panic!(
207                "The parallelism of the 2 blocks coming inside a Y connection must be equal. \
208                On the left ({}) is {:?}, on the right ({}) is {:?}",
209                b1, sched_1.replication, b2, sched_2.replication
210            );
211        }
212
213        let iter_ctx_1 = b1.iteration_ctx();
214        let iter_ctx_2 = b2.iteration_ctx();
215        let (iteration_ctx, left_cache, right_cache) = if iter_ctx_1 == iter_ctx_2 {
216            (b1.iteration_ctx.clone(), false, false)
217        } else {
218            if !iter_ctx_1.is_empty() && !iter_ctx_2.is_empty() {
219                panic!("Side inputs are supported only if one of the streams is coming from outside any iteration");
220            }
221            if iter_ctx_1.is_empty() {
222                // self is the side input, cache it
223                (b2.iteration_ctx.clone(), true, false)
224            } else {
225                // oth is the side input, cache it
226                (b1.iteration_ctx.clone(), false, true)
227            }
228        };
229
230        // close previous blocks
231
232        let mut b1 = b1.add_operator(|prev| End::new(prev, next_strategy1, batch_mode));
233        let mut b2 = b2.add_operator(|prev| End::new(prev, next_strategy2, batch_mode));
234        b1.is_only_one_strategy = is_one_1;
235        b2.is_only_one_strategy = is_one_2;
236
237        let mut env_lock = ctx.lock();
238        let id_1 = b1.id;
239        let id_2 = b2.id;
240
241        env_lock.close_block(b1);
242        env_lock.close_block(b2);
243
244        let source = get_start_operator(
245            id_1,
246            id_2,
247            left_cache,
248            right_cache,
249            iteration_ctx.last().cloned(),
250        );
251
252        let mut new_block = env_lock.new_block(source, batch_mode, iteration_ctx);
253        let id_new = new_block.id;
254
255        env_lock.connect_blocks::<Op::Out>(id_1, id_new);
256        env_lock.connect_blocks::<Op2::Out>(id_2, id_new);
257
258        drop(env_lock);
259
260        // make sure the new block has the same parallelism of the previous one with OnlyOne
261        // strategy
262        new_block.scheduling = match (is_one_1, is_one_2) {
263            (true, _) => sched_1,
264            (_, true) => sched_2,
265            _ => Scheduling::default(),
266        };
267
268        Stream::new(ctx, new_block)
269    }
270
271    /// Clone the given block, taking care of connecting the new block to the same previous blocks
272    /// of the original one.
273    pub(crate) fn clone(&mut self) -> Self {
274        let new_block = self.ctx.lock().clone_block(&self.block);
275        Stream::new(self.ctx.clone(), new_block)
276    }
277
278    /// Like `add_block` but without creating a new block. Therefore this closes the current stream
279    /// and just add the last block to the scheduler.
280    pub(crate) fn finalize_block(self)
281    where
282        Op: 'static,
283        Op::Out: Send,
284    {
285        let mut env = self.ctx.lock();
286        env.scheduler_mut().schedule_block(self.block);
287    }
288}
289
290impl<OperatorChain> KeyedStream<OperatorChain>
291where
292    OperatorChain: Operator + 'static,
293    OperatorChain::Out: KeyedItem,
294{
295    pub(crate) fn add_operator<Op2, GetOp>(self, get_operator: GetOp) -> KeyedStream<Op2>
296    where
297        Op2: Operator,
298        GetOp: FnOnce(OperatorChain) -> Op2,
299        Op2::Out: KeyedItem<Key = <OperatorChain::Out as KeyedItem>::Key>,
300    {
301        KeyedStream(self.0.add_operator(get_operator))
302    }
303}
304
305impl<OperatorChain> Stream<OperatorChain>
306where
307    OperatorChain: Operator,
308    OperatorChain::Out: KeyedItem,
309{
310    /// TODO DOCS
311    pub fn to_keyed(self) -> KeyedStream<OperatorChain> {
312        KeyedStream(self)
313    }
314}