1use parking_lot::Mutex;
2
3use std::marker::PhantomData;
4use std::sync::Arc;
5
6use crate::block::{BatchMode, Block, NextStrategy, Scheduling};
7use crate::environment::StreamContextInner;
8use crate::operator::end::End;
9use crate::operator::iteration::IterationStateLock;
10use crate::operator::source::Source;
11use crate::operator::window::WindowDescription;
12use crate::operator::DataKey;
13use crate::operator::Start;
14use crate::operator::{Data, ExchangeData, KeyerFn, Operator};
15use crate::scheduler::BlockId;
16
17pub struct Stream<Op>
28where
29 Op: Operator,
30{
31 pub(crate) block: Block<Op>,
33 pub(crate) ctx: Arc<Mutex<StreamContextInner>>,
35}
36
37pub trait KeyedItem {
38 type Key: DataKey;
39 type Value;
40 fn key(&self) -> &Self::Key;
41 fn value(&self) -> &Self::Value;
42 fn into_kv(self) -> (Self::Key, Self::Value);
43}
44
45impl<K: DataKey, V> KeyedItem for (K, V) {
46 type Key = K;
47 type Value = V;
48 fn key(&self) -> &Self::Key {
49 &self.0
50 }
51 fn value(&self) -> &Self::Value {
52 &self.1
53 }
54 fn into_kv(self) -> (Self::Key, Self::Value) {
55 self
56 }
57}
58
59pub struct KeyedStream<OperatorChain>(pub Stream<OperatorChain>)
65where
66 OperatorChain: Operator,
67 OperatorChain::Out: KeyedItem;
68
69pub struct WindowedStream<Op, O: Data, WinDescr>
93where
94 Op: Operator,
95 Op::Out: KeyedItem,
96 WinDescr: WindowDescription<<Op::Out as KeyedItem>::Value>,
97{
98 pub(crate) inner: KeyedStream<Op>,
99 pub(crate) descr: WinDescr,
100 pub(crate) _win_out: PhantomData<O>,
101}
102
103impl<Op> Stream<Op>
104where
105 Op: Operator,
106{
107 pub(crate) fn new(ctx: Arc<Mutex<StreamContextInner>>, block: Block<Op>) -> Self {
108 Self { block, ctx }
109 }
110
111 pub fn add_operator<Op2, GetOp>(self, get_operator: GetOp) -> Stream<Op2>
121 where
122 Op2: Operator,
123 GetOp: FnOnce(Op) -> Op2,
124 {
125 Stream::new(self.ctx, self.block.add_operator(get_operator))
126 }
127
128 pub(crate) fn split_block<GetEndOp, OpEnd, IndexFn>(
137 self,
138 get_end_operator: GetEndOp,
139 next_strategy: NextStrategy<Op::Out, IndexFn>,
140 ) -> Stream<impl Operator<Out = Op::Out>>
141 where
142 IndexFn: KeyerFn<u64, Op::Out>,
143 Op::Out: ExchangeData,
144 OpEnd: Operator<Out = ()> + 'static,
145 GetEndOp: FnOnce(Op, NextStrategy<Op::Out, IndexFn>, BatchMode) -> OpEnd,
146 {
147 let Stream { block, ctx } = self;
148 let batch_mode = block.batch_mode;
150 let iteration_ctx = block.iteration_ctx.clone();
151 let mut block =
153 block.add_operator(|prev| get_end_operator(prev, next_strategy.clone(), batch_mode));
154 block.is_only_one_strategy = matches!(next_strategy, NextStrategy::OnlyOne);
155
156 let mut env_lock = ctx.lock();
158 let prev_id = env_lock.close_block(block);
159 let source = Start::single(prev_id, iteration_ctx.last().cloned());
161 let new_block = env_lock.new_block(source, batch_mode, iteration_ctx);
162 env_lock.connect_blocks::<Op::Out>(prev_id, new_block.id);
164
165 drop(env_lock);
166 Stream::new(ctx, new_block)
167 }
168
169 pub(crate) fn binary_connection<Op2, S, Fs, F1, F2>(
181 self,
182 oth: Stream<Op2>,
183 get_start_operator: Fs,
184 next_strategy1: NextStrategy<Op::Out, F1>,
185 next_strategy2: NextStrategy<Op2::Out, F2>,
186 ) -> Stream<S>
187 where
188 Op: 'static,
189 Op2: Operator + 'static,
190 Op::Out: ExchangeData,
191 Op2::Out: ExchangeData,
192 F1: KeyerFn<u64, Op::Out>,
193 F2: KeyerFn<u64, Op2::Out>,
194 S: Operator + Source,
195 Fs: FnOnce(BlockId, BlockId, bool, bool, Option<Arc<IterationStateLock>>) -> S,
196 {
197 let Stream { block: b1, ctx } = self;
198 let Stream { block: b2, .. } = oth;
199
200 let batch_mode = b1.batch_mode;
201 let is_one_1 = matches!(next_strategy1, NextStrategy::OnlyOne);
202 let is_one_2 = matches!(next_strategy2, NextStrategy::OnlyOne);
203 let sched_1 = b1.scheduling.clone();
204 let sched_2 = b2.scheduling.clone();
205 if is_one_1 && is_one_2 && sched_1.replication != sched_2.replication {
206 panic!(
207 "The parallelism of the 2 blocks coming inside a Y connection must be equal. \
208 On the left ({}) is {:?}, on the right ({}) is {:?}",
209 b1, sched_1.replication, b2, sched_2.replication
210 );
211 }
212
213 let iter_ctx_1 = b1.iteration_ctx();
214 let iter_ctx_2 = b2.iteration_ctx();
215 let (iteration_ctx, left_cache, right_cache) = if iter_ctx_1 == iter_ctx_2 {
216 (b1.iteration_ctx.clone(), false, false)
217 } else {
218 if !iter_ctx_1.is_empty() && !iter_ctx_2.is_empty() {
219 panic!("Side inputs are supported only if one of the streams is coming from outside any iteration");
220 }
221 if iter_ctx_1.is_empty() {
222 (b2.iteration_ctx.clone(), true, false)
224 } else {
225 (b1.iteration_ctx.clone(), false, true)
227 }
228 };
229
230 let mut b1 = b1.add_operator(|prev| End::new(prev, next_strategy1, batch_mode));
233 let mut b2 = b2.add_operator(|prev| End::new(prev, next_strategy2, batch_mode));
234 b1.is_only_one_strategy = is_one_1;
235 b2.is_only_one_strategy = is_one_2;
236
237 let mut env_lock = ctx.lock();
238 let id_1 = b1.id;
239 let id_2 = b2.id;
240
241 env_lock.close_block(b1);
242 env_lock.close_block(b2);
243
244 let source = get_start_operator(
245 id_1,
246 id_2,
247 left_cache,
248 right_cache,
249 iteration_ctx.last().cloned(),
250 );
251
252 let mut new_block = env_lock.new_block(source, batch_mode, iteration_ctx);
253 let id_new = new_block.id;
254
255 env_lock.connect_blocks::<Op::Out>(id_1, id_new);
256 env_lock.connect_blocks::<Op2::Out>(id_2, id_new);
257
258 drop(env_lock);
259
260 new_block.scheduling = match (is_one_1, is_one_2) {
263 (true, _) => sched_1,
264 (_, true) => sched_2,
265 _ => Scheduling::default(),
266 };
267
268 Stream::new(ctx, new_block)
269 }
270
271 pub(crate) fn clone(&mut self) -> Self {
274 let new_block = self.ctx.lock().clone_block(&self.block);
275 Stream::new(self.ctx.clone(), new_block)
276 }
277
278 pub(crate) fn finalize_block(self)
281 where
282 Op: 'static,
283 Op::Out: Send,
284 {
285 let mut env = self.ctx.lock();
286 env.scheduler_mut().schedule_block(self.block);
287 }
288}
289
290impl<OperatorChain> KeyedStream<OperatorChain>
291where
292 OperatorChain: Operator + 'static,
293 OperatorChain::Out: KeyedItem,
294{
295 pub(crate) fn add_operator<Op2, GetOp>(self, get_operator: GetOp) -> KeyedStream<Op2>
296 where
297 Op2: Operator,
298 GetOp: FnOnce(OperatorChain) -> Op2,
299 Op2::Out: KeyedItem<Key = <OperatorChain::Out as KeyedItem>::Key>,
300 {
301 KeyedStream(self.0.add_operator(get_operator))
302 }
303}
304
305impl<OperatorChain> Stream<OperatorChain>
306where
307 OperatorChain: Operator,
308 OperatorChain::Out: KeyedItem,
309{
310 pub fn to_keyed(self) -> KeyedStream<OperatorChain> {
312 KeyedStream(self)
313 }
314}