1use super::super::{
4 stream_executor::StreamExecutor,
5 mutiny_stream::MutinyStream,
6 types::FullDuplexMultiChannel,
7};
8use std::{
9 sync::Arc,
10 fmt::Debug,
11 time::Duration,
12 future::Future,
13 marker::PhantomData,
14};
15use indexmap::IndexMap;
16use futures::Stream;
17use tokio::sync::RwLock;
18
19
20pub struct Multi<ItemType: Debug + Sync + Send + 'static,
30 MultiChannelType: FullDuplexMultiChannel<ItemType=ItemType, DerivedItemType=DerivedItemType> + Sync + Send + 'static,
31 const INSTRUMENTS: usize,
32 DerivedItemType: Debug + Sync + Send + 'static> {
33 pub multi_name: String,
34 pub channel: Arc<MultiChannelType>,
35 pub executor_infos: RwLock<IndexMap<String, ExecutorInfo>>,
36 _phantom: PhantomData<(ItemType, MultiChannelType, DerivedItemType)>,
37}
38
39impl<ItemType: Debug + Send + Sync + 'static,
40 MultiChannelType: FullDuplexMultiChannel<ItemType=ItemType, DerivedItemType=DerivedItemType> + Sync + Send + 'static,
41 const INSTRUMENTS: usize,
42 DerivedItemType: Debug + Sync + Send + 'static>
43GenericMulti<INSTRUMENTS> for
44Multi<ItemType, MultiChannelType, INSTRUMENTS, DerivedItemType> {
45 const INSTRUMENTS: usize = INSTRUMENTS;
46 type ItemType = ItemType;
47 type MultiChannelType = MultiChannelType;
48 type DerivedItemType = DerivedItemType;
49 type MutinyStreamType = MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>;
50
51 fn new<IntoString: Into<String>>(multi_name: IntoString) -> Self {
52 Self::new(multi_name)
53 }
54 fn to_multi(self) -> Multi<ItemType, MultiChannelType, INSTRUMENTS, DerivedItemType> {
55 self
56 }
57}
58
59impl<ItemType: Debug + Send + Sync + 'static,
60 MultiChannelType: FullDuplexMultiChannel<ItemType=ItemType, DerivedItemType=DerivedItemType> + Sync + Send + 'static,
61 const INSTRUMENTS: usize,
62 DerivedItemType: Debug + Sync + Send + 'static>
63Multi<ItemType, MultiChannelType, INSTRUMENTS, DerivedItemType> {
64
65 pub fn new<IntoString: Into<String>>(multi_name: IntoString) -> Self {
75 let multi_name = multi_name.into();
76 Multi {
77 multi_name: multi_name.clone(),
78 channel: MultiChannelType::new(multi_name.clone()),
79 executor_infos: RwLock::new(IndexMap::new()),
80 _phantom: PhantomData,
81 }
82 }
83
84 pub fn name(&self) -> &str {
86 &self.multi_name
87 }
88
/// Forwards `item` to the channel's `send()` and returns its outcome untouched.
/// The outcome is `#[must_use]`, as it tells whether a retry is needed.
#[inline(always)]
#[must_use = "The return type should be examined in case retrying is needed -- or call map(...).into() to transform it into a `Result<(), ItemType>`"]
pub fn send(&self, item: ItemType) -> keen_retry::RetryConsumerResult<(), ItemType, ()> {
    self.channel.send(item)
}
94
/// Forwards `setter` to the channel's `send_with()` and returns its outcome untouched.
/// `setter` receives a `&mut ItemType`.
/// NOTE(review): presumably the channel hands over the slot to be filled in place -- confirm against the channel implementation.
#[inline(always)]
#[must_use = "The return type should be examined in case retrying is needed -- or call map(...).into() to transform it into a `Result<(), F>`"]
pub fn send_with<F: FnOnce(&mut ItemType)>(&self, setter: F) -> keen_retry::RetryConsumerResult<(), F, ()> {
    self.channel.send_with(setter)
}
100
/// Forwards `arc_item` to the channel's `send_derived()` and returns its `bool` untouched.
/// NOTE(review): presumably `false` means the item was not accepted and a retry is needed -- confirm against the channel implementation.
#[inline(always)]
#[must_use = "The return type should be examined in case retrying is needed"]
pub fn send_derived(&self, arc_item: &DerivedItemType) -> bool {
    self.channel.send_derived(arc_item)
}
106
/// Returns whatever the channel's `buffer_size()` reports
#[inline(always)]
#[must_use]
pub fn buffer_size(&self) -> u32 {
    self.channel.buffer_size()
}
112
/// Returns whatever the channel's `pending_items_count()` reports
#[inline(always)]
#[must_use]
pub fn pending_items_count(&self) -> u32 {
    self.channel.pending_items_count()
}
118
119 pub async fn spawn_executor<IntoString: Into<String>,
122 OutItemType: Send + Debug,
123 OutStreamType: Stream<Item=OutType> + Send + 'static,
124 OutType: Future<Output=Result<OutItemType, Box<dyn std::error::Error + Send + Sync>>> + Send,
125 ErrVoidAsyncType: Future<Output=()> + Send + 'static,
126 CloseVoidAsyncType: Future<Output=()> + Send + 'static>
127
128 (&self,
129 concurrency_limit: u32,
130 futures_timeout: Duration,
131 pipeline_name: IntoString,
132 pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OutStreamType,
133 on_err_callback: impl Fn(Box<dyn std::error::Error + Send + Sync>) -> ErrVoidAsyncType + Send + Sync + 'static,
134 on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static)
135
136 -> Result<(), Box<dyn std::error::Error>> {
137
138 let (in_stream, in_stream_id) = self.channel.create_stream_for_new_events();
139 let out_stream = pipeline_builder(in_stream);
140 self.spawn_executor_from_stream(concurrency_limit, futures_timeout, pipeline_name, in_stream_id, out_stream, on_err_callback, on_close_callback).await
141 }
142
/// Spawns two executors out of the pair of streams returned by the channel's
/// `create_streams_for_old_and_new_events()` -- referred to here as "oldies" and "newies".
///
/// Both pipelines are built right away, with their respective builders. Then:
///   - if `sequential_transition` is `true`, only the oldies executor is started now; the newies executor
///     is started from inside the close callback given to the oldies executor, just before
///     `oldies_on_close_callback` is invoked;
///   - otherwise, both executors are started immediately -- oldies first, then newies.
///
/// `on_err_callback` is shared (through an `Arc`) by both executors.
///
/// # Errors
/// Fails if any executor started directly by this call could not be registered
/// (e.g., another executor with the same resulting name already exists).
///
/// # Panics
/// In sequential mode, the close callback of the oldies executor panics if the newies
/// executor cannot be started at that point.
pub async fn spawn_oldies_executor<IntoString:               Into<String>,
                                   OutItemType:              Send + Debug,
                                   OldiesOutStreamType:      Stream<Item=OldiesOutType> + Sync + Send + 'static,
                                   NewiesOutStreamType:      Stream<Item=NewiesOutType> + Sync + Send + 'static,
                                   OldiesOutType:            Future<Output=Result<OutItemType, Box<dyn std::error::Error + Send + Sync>>> + Send,
                                   NewiesOutType:            Future<Output=Result<OutItemType, Box<dyn std::error::Error + Send + Sync>>> + Send,
                                   ErrVoidAsyncType:         Future<Output=()> + Send + 'static,
                                   OldiesCloseVoidAsyncType: Future<Output=()> + Send + 'static,
                                   NewiesCloseVoidAsyncType: Future<Output=()> + Send + 'static>

                                  (self: &Arc<Self>,
                                   concurrency_limit: u32,
                                   sequential_transition: bool,
                                   futures_timeout: Duration,
                                   oldies_pipeline_name: IntoString,
                                   oldies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OldiesOutStreamType,
                                   oldies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> OldiesCloseVoidAsyncType + Send + Sync + 'static,
                                   newies_pipeline_name: IntoString,
                                   newies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> NewiesOutStreamType + Send + Sync + 'static,
                                   newies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> NewiesCloseVoidAsyncType + Send + Sync + 'static,
                                   on_err_callback: impl Fn(Box<dyn std::error::Error + Send + Sync>) -> ErrVoidAsyncType + Send + Sync + 'static)

                                  -> Result<(), Box<dyn std::error::Error>> {

    let ((oldies_in_stream, oldies_in_stream_id),
         (newies_in_stream, newies_in_stream_id)) = self.channel.create_streams_for_old_and_new_events();

    // owned handles, so the `'static` close callback of the sequential mode may refer to this `Multi`,
    // to the newies name and to the (shared) error callback
    let cloned_self = Arc::clone(self);
    let oldies_pipeline_name = oldies_pipeline_name.into();
    let newies_pipeline_name = Arc::new(newies_pipeline_name.into());
    let on_err_callback_ref1 = Arc::new(on_err_callback);
    let on_err_callback_ref2 = Arc::clone(&on_err_callback_ref1);
    // both pipelines are built upfront, regardless of when their executors will start
    let oldies_out_stream = oldies_pipeline_builder(oldies_in_stream);
    let newies_out_stream = newies_pipeline_builder(newies_in_stream);

    match sequential_transition {
        true => {
            self.spawn_executor_from_stream(concurrency_limit, futures_timeout, oldies_pipeline_name, oldies_in_stream_id, oldies_out_stream,
                                            move |err| on_err_callback_ref1(err),
                                            move |executor| {
                                                let cloned_self = Arc::clone(&cloned_self);
                                                let on_err_callback_ref2 = Arc::clone(&on_err_callback_ref2);
                                                let newies_pipeline_name = Arc::clone(&newies_pipeline_name);
                                                async move {
                                                    // the newies executor is started before the caller's oldies close callback runs
                                                    cloned_self.spawn_executor_from_stream(concurrency_limit, futures_timeout, newies_pipeline_name.as_str(), newies_in_stream_id, newies_out_stream,
                                                                                           move |err| on_err_callback_ref2(err),
                                                                                           newies_on_close_callback).await
                                                        .map_err(|err| format!("Multi::spawn_oldies_executor(): could not start `newies`/sequential executor: {:?}", err))
                                                        // there is no caller to report the error to at this point
                                                        .expect("CANNOT SPAWN NEWIES EXECUTOR AFTER OLDIES HAD COMPLETE");
                                                    oldies_on_close_callback(executor).await;
                                                }
                                            } ).await
                .map_err(|err| format!("Multi::spawn_oldies_executor(): could not start `oldies`/sequential executor: {:?}", err))?;
        },
        false => {
            self.spawn_executor_from_stream(concurrency_limit, futures_timeout, oldies_pipeline_name, oldies_in_stream_id, oldies_out_stream,
                                            move |err| on_err_callback_ref1(err),
                                            oldies_on_close_callback).await
                .map_err(|err| format!("Multi::spawn_oldies_executor(): could not start `oldies` executor: {:?}", err))?;
            self.spawn_executor_from_stream(concurrency_limit, futures_timeout, newies_pipeline_name.as_str(), newies_in_stream_id, newies_out_stream,
                                            move |err| on_err_callback_ref2(err),
                                            newies_on_close_callback).await
                .map_err(|err| format!("Multi::spawn_oldies_executor(): could not start `newies` executor: {:?}", err))?;
        },
    }
    Ok(())
}
217
218 async fn spawn_executor_from_stream<IntoString: Into<String>,
220 OutItemType: Send + Debug,
221 OutStreamType: Stream<Item=OutType> + Send + 'static,
222 OutType: Future<Output=Result<OutItemType, Box<dyn std::error::Error + Send + Sync>>> + Send,
223 ErrVoidAsyncType: Future<Output=()> + Send + 'static,
224 CloseVoidAsyncType: Future<Output=()> + Send + 'static>
225
226 (&self,
227 concurrency_limit: u32,
228 futures_timeout: Duration,
229 pipeline_name: IntoString,
230 stream_id: u32,
231 pipelined_stream: OutStreamType,
232 on_err_callback: impl Fn(Box<dyn std::error::Error + Send + Sync>) -> ErrVoidAsyncType + Send + Sync + 'static,
233 on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static)
234
235 -> Result<(), Box<dyn std::error::Error>> {
236
237 let executor = StreamExecutor::<INSTRUMENTS>::with_futures_timeout(format!("{}: {}", self.name(), pipeline_name.into()), futures_timeout);
238 self.add_executor(executor.clone(), stream_id).await?;
239 executor
240 .spawn_executor::<_, _, _, _>(
241 concurrency_limit,
242 on_err_callback,
243 on_close_callback,
244 pipelined_stream
245 );
246 Ok(())
247 }
248
249 pub async fn spawn_futures_executor<IntoString: Into<String>,
252 OutItemType: Send + Debug,
253 OutStreamType: Stream<Item=OutType> + Send + 'static,
254 OutType: Future<Output=OutItemType> + Send,
255 CloseVoidAsyncType: Future<Output=()> + Send + 'static>
256
257 (&self,
258 concurrency_limit: u32,
259 futures_timeout: Duration,
260 pipeline_name: IntoString,
261 pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OutStreamType,
262 on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static)
263
264 -> Result<(), Box<dyn std::error::Error>> {
265
266 let (in_stream, in_stream_id) = self.channel.create_stream_for_new_events();
267 let out_stream = pipeline_builder(in_stream);
268 self.spawn_futures_executor_from_stream(concurrency_limit, futures_timeout, pipeline_name, in_stream_id, out_stream, on_close_callback).await
269 }
270
271 pub async fn spawn_futures_oldies_executor<IntoString: Into<String>,
279 OutItemType: Send + Debug,
280 OldiesOutStreamType: Stream<Item=OldiesOutType> + Sync + Send + 'static,
281 NewiesOutStreamType: Stream<Item=NewiesOutType> + Sync + Send + 'static,
282 OldiesOutType: Future<Output=OutItemType> + Send,
283 NewiesOutType: Future<Output=OutItemType> + Send,
284 OldiesCloseVoidAsyncType: Future<Output=()> + Send + 'static,
285 NewiesCloseVoidAsyncType: Future<Output=()> + Send + 'static>
286
287 (self: &Arc<Self>,
288 concurrency_limit: u32,
289 sequential_transition: bool,
290 futures_timeout: Duration,
291 oldies_pipeline_name: IntoString,
292 oldies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OldiesOutStreamType,
293 oldies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> OldiesCloseVoidAsyncType + Send + Sync + 'static,
294 newies_pipeline_name: IntoString,
295 newies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> NewiesOutStreamType + Send + Sync + 'static,
296 newies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> NewiesCloseVoidAsyncType + Send + Sync + 'static)
297
298 -> Result<(), Box<dyn std::error::Error>> {
299
300 let ((oldies_in_stream, oldies_in_stream_id),
301 (newies_in_stream, newies_in_stream_id)) = self.channel.create_streams_for_old_and_new_events();
302
303 let cloned_self = Arc::clone(self);
304 let oldies_pipeline_name = oldies_pipeline_name.into();
305 let newies_pipeline_name = Arc::new(newies_pipeline_name.into());
306 let oldies_out_stream = oldies_pipeline_builder(oldies_in_stream);
307 let newies_out_stream = newies_pipeline_builder(newies_in_stream);
308
309 match sequential_transition {
310 true => {
311 self.spawn_futures_executor_from_stream(concurrency_limit, futures_timeout, oldies_pipeline_name, oldies_in_stream_id, oldies_out_stream,
312 move |executor| {
313 let cloned_self = Arc::clone(&cloned_self);
314 let newies_pipeline_name = Arc::clone(&newies_pipeline_name);
315 async move {
316 cloned_self.spawn_futures_executor_from_stream(concurrency_limit, futures_timeout, newies_pipeline_name.as_str(), newies_in_stream_id, newies_out_stream,
317 newies_on_close_callback).await
318 .map_err(|err| format!("Multi::spawn_oldies_executor(): could not start `newies`/sequential executor: {:?}", err))
319 .expect("CANNOT SPAWN NEWIES EXECUTOR AFTER OLDIES HAD COMPLETE");
320 oldies_on_close_callback(executor).await;
321 }
322 } ).await
323 .map_err(|err| format!("Multi::spawn_oldies_executor(): could not start `oldies`/sequential executor: {:?}", err))?;
324 },
325 false => {
326 self.spawn_futures_executor_from_stream(concurrency_limit, futures_timeout, oldies_pipeline_name, oldies_in_stream_id, oldies_out_stream,
327 oldies_on_close_callback).await
328 .map_err(|err| format!("Multi::spawn_oldies_executor(): could not start `oldies` executor: {:?}", err))?;
329 self.spawn_futures_executor_from_stream(concurrency_limit, futures_timeout, newies_pipeline_name.as_str(), newies_in_stream_id, newies_out_stream,
330 newies_on_close_callback).await
331 .map_err(|err| format!("Multi::spawn_oldies_executor(): could not start `newies` executor: {:?}", err))?;
332 },
333 }
334 Ok(())
335 }
336
337 async fn spawn_futures_executor_from_stream<IntoString: Into<String>,
339 OutItemType: Send + Debug,
340 OutStreamType: Stream<Item=OutType> + Send + 'static,
341 OutType: Future<Output=OutItemType> + Send,
342 CloseVoidAsyncType: Future<Output=()> + Send + 'static>
343
344 (&self,
345 concurrency_limit: u32,
346 futures_timeout: Duration,
347 pipeline_name: IntoString,
348 stream_id: u32,
349 pipelined_stream: OutStreamType,
350 on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static)
351
352 -> Result<(), Box<dyn std::error::Error>> {
353
354 let executor = StreamExecutor::<INSTRUMENTS>::with_futures_timeout(format!("{}: {}", self.name(), pipeline_name.into()), futures_timeout);
355 self.add_executor(executor.clone(), stream_id).await?;
356 executor
357 .spawn_futures_executor::<_, _, _>(
358 concurrency_limit,
359 on_close_callback,
360 pipelined_stream
361 );
362 Ok(())
363 }
364
365 pub async fn spawn_fallibles_executor<IntoString: Into<String>,
368 OutItemType: Send + Debug,
369 OutStreamType: Stream<Item=Result<OutItemType, Box<dyn std::error::Error + Send + Sync>>> + Send + 'static,
370 CloseVoidAsyncType: Future<Output=()> + Send + 'static>
371
372 (&self,
373 concurrency_limit: u32,
374 pipeline_name: IntoString,
375 pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OutStreamType,
376 on_err_callback: impl Fn(Box<dyn std::error::Error + Send + Sync>) + Send + Sync + 'static,
377 on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static)
378
379 -> Result<(), Box<dyn std::error::Error>> {
380
381 let (in_stream, in_stream_id) = self.channel.create_stream_for_new_events();
382 let out_stream = pipeline_builder(in_stream);
383 self.spawn_fallibles_executor_from_stream(concurrency_limit, pipeline_name, in_stream_id, out_stream, on_err_callback, on_close_callback).await
384 }
385
386 pub async fn spawn_fallibles_oldies_executor<IntoString: Into<String>,
394 OutItemType: Send + Debug,
395 OldiesOutStreamType: Stream<Item=Result<OutItemType, Box<dyn std::error::Error + Send + Sync>>> + Sync + Send + 'static,
396 NewiesOutStreamType: Stream<Item=Result<OutItemType, Box<dyn std::error::Error + Send + Sync>>> + Sync + Send + 'static,
397 OldiesCloseVoidAsyncType: Future<Output=()> + Send + 'static,
398 NewiesCloseVoidAsyncType: Future<Output=()> + Send + 'static>
399
400 (self: &Arc<Self>,
401 concurrency_limit: u32,
402 sequential_transition: bool,
403 oldies_pipeline_name: IntoString,
404 oldies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OldiesOutStreamType,
405 oldies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> OldiesCloseVoidAsyncType + Send + Sync + 'static,
406 newies_pipeline_name: IntoString,
407 newies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> NewiesOutStreamType + Send + Sync + 'static,
408 newies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> NewiesCloseVoidAsyncType + Send + Sync + 'static,
409 on_err_callback: impl Fn(Box<dyn std::error::Error + Send + Sync>) + Send + Sync + 'static)
410
411 -> Result<(), Box<dyn std::error::Error>> {
412
413 let ((oldies_in_stream, oldies_in_stream_id),
414 (newies_in_stream, newies_in_stream_id)) = self.channel.create_streams_for_old_and_new_events();
415
416 let cloned_self = Arc::clone(self);
417 let oldies_pipeline_name = oldies_pipeline_name.into();
418 let newies_pipeline_name = Arc::new(newies_pipeline_name.into());
419 let on_err_callback_ref1 = Arc::new(on_err_callback);
420 let on_err_callback_ref2 = Arc::clone(&on_err_callback_ref1);
421 let oldies_out_stream = oldies_pipeline_builder(oldies_in_stream);
422 let newies_out_stream = newies_pipeline_builder(newies_in_stream);
423
424 match sequential_transition {
425 true => {
426 self.spawn_fallibles_executor_from_stream(concurrency_limit, oldies_pipeline_name, oldies_in_stream_id, oldies_out_stream,
427 move |err| on_err_callback_ref1(err),
428 move |executor| {
429 let cloned_self = Arc::clone(&cloned_self);
430 let on_err_callback_ref2 = Arc::clone(&on_err_callback_ref2);
431 let newies_pipeline_name = Arc::clone(&newies_pipeline_name);
432 async move {
433 cloned_self.spawn_fallibles_executor_from_stream(concurrency_limit, newies_pipeline_name.as_str(), newies_in_stream_id, newies_out_stream,
434 move |err| on_err_callback_ref2(err),
435 newies_on_close_callback).await
436 .map_err(|err| format!("Multi::spawn_oldies_executor(): could not start `newies`/sequential executor: {:?}", err))
437 .expect("CANNOT SPAWN NEWIES EXECUTOR AFTER OLDIES HAD COMPLETE");
438 oldies_on_close_callback(executor).await;
439 }
440 } ).await
441 .map_err(|err| format!("Multi::spawn_oldies_executor(): could not start `oldies`/sequential executor: {:?}", err))?;
442 },
443 false => {
444 self.spawn_fallibles_executor_from_stream(concurrency_limit, oldies_pipeline_name, oldies_in_stream_id, oldies_out_stream,
445 move |err| on_err_callback_ref1(err),
446 oldies_on_close_callback).await
447 .map_err(|err| format!("Multi::spawn_oldies_executor(): could not start `oldies` executor: {:?}", err))?;
448 self.spawn_fallibles_executor_from_stream(concurrency_limit, newies_pipeline_name.as_str(), newies_in_stream_id, newies_out_stream,
449 move |err| on_err_callback_ref2(err),
450 newies_on_close_callback).await
451 .map_err(|err| format!("Multi::spawn_oldies_executor(): could not start `newies` executor: {:?}", err))?;
452 },
453 }
454 Ok(())
455 }
456
457 async fn spawn_fallibles_executor_from_stream<IntoString: Into<String>,
459 OutItemType: Send + Debug,
460 OutStreamType: Stream<Item=Result<OutItemType, Box<dyn std::error::Error + Send + Sync>>> + Send + 'static,
461 CloseVoidAsyncType: Future<Output=()> + Send + 'static>
462
463 (&self,
464 concurrency_limit: u32,
465 pipeline_name: IntoString,
466 stream_id: u32,
467 pipelined_stream: OutStreamType,
468 on_err_callback: impl Fn(Box<dyn std::error::Error + Send + Sync>) + Send + Sync + 'static,
469 on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static)
470
471 -> Result<(), Box<dyn std::error::Error>> {
472
473 let executor = StreamExecutor::<INSTRUMENTS>::new(format!("{}: {}", self.name(), pipeline_name.into()));
474 self.add_executor(executor.clone(), stream_id).await?;
475 executor
476 .spawn_fallibles_executor::<_, _>(
477 concurrency_limit,
478 on_err_callback,
479 on_close_callback,
480 pipelined_stream
481 );
482 Ok(())
483 }
484
485 pub async fn spawn_non_futures_non_fallible_executor<IntoString: Into<String>,
488 OutItemType: Send + Debug,
489 OutStreamType: Stream<Item=OutItemType> + Send + 'static,
490 CloseVoidAsyncType: Future<Output=()> + Send + 'static>
491
492 (&self,
493 concurrency_limit: u32,
494 pipeline_name: IntoString,
495 pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OutStreamType,
496 on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static)
497
498 -> Result<(), Box<dyn std::error::Error>> {
499
500 let (in_stream, in_stream_id) = self.channel.create_stream_for_new_events();
501 let out_stream = pipeline_builder(in_stream);
502 self.spawn_non_futures_non_fallible_executor_from_stream(concurrency_limit, pipeline_name, in_stream_id, out_stream, on_close_callback).await
503 }
504
505 pub async fn spawn_non_futures_non_fallible_oldies_executor<IntoString: Into<String>,
513 OldiesOutItemType: Send + Debug,
514 NewiesOutItemType: Send + Debug,
515 OldiesOutStreamType: Stream<Item=OldiesOutItemType> + Sync + Send + 'static,
516 NewiesOutStreamType: Stream<Item=NewiesOutItemType> + Sync + Send + 'static,
517 OldiesCloseVoidAsyncType: Future<Output=()> + Send + 'static,
518 NewiesCloseVoidAsyncType: Future<Output=()> + Send + 'static>
519
520 (self: &Arc<Self>,
521 concurrency_limit: u32,
522 sequential_transition: bool,
523 oldies_pipeline_name: IntoString,
524 oldies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OldiesOutStreamType,
525 oldies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> OldiesCloseVoidAsyncType + Send + Sync + 'static,
526 newies_pipeline_name: IntoString,
527 newies_pipeline_builder: impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> NewiesOutStreamType,
528 newies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> NewiesCloseVoidAsyncType + Send + Sync + 'static)
529
530 -> Result<(), Box<dyn std::error::Error>> {
531
532 let ((oldies_in_stream, oldies_in_stream_id),
533 (newies_in_stream, newies_in_stream_id)) = self.channel.create_streams_for_old_and_new_events();
534
535 let cloned_self = Arc::clone(self);
536 let oldies_pipeline_name = oldies_pipeline_name.into();
537 let newies_pipeline_name = Arc::new(newies_pipeline_name.into());
538 let oldies_out_stream = oldies_pipeline_builder(oldies_in_stream);
539 let newies_out_stream = newies_pipeline_builder(newies_in_stream);
540
541 match sequential_transition {
542 true => {
543 self.spawn_non_futures_non_fallible_executor_from_stream(concurrency_limit, oldies_pipeline_name, oldies_in_stream_id, oldies_out_stream,
544 move |executor| {
545 let cloned_self = Arc::clone(&cloned_self);
546 let newies_pipeline_name = Arc::clone(&newies_pipeline_name);
547 async move {
548 cloned_self.spawn_non_futures_non_fallible_executor_from_stream(concurrency_limit, newies_pipeline_name.as_str(), newies_in_stream_id, newies_out_stream,
549 newies_on_close_callback).await
550 .map_err(|err| format!("Multi::spawn_non_futures_non_fallible_oldies_executor(): could not start `newies` executor: {:?}", err))
551 .expect("CANNOT SPAWN NEWIES EXECUTOR AFTER OLDIES HAD COMPLETE");
552 oldies_on_close_callback(executor).await;
553 }
554 }).await
555 .map_err(|err| format!("Multi::spawn_non_futures_non_fallible_oldies_executor(): could not start `oldies`/sequential executor: {:?}", err))?;
556
557 },
558 false => {
559 self.spawn_non_futures_non_fallible_executor_from_stream(concurrency_limit, oldies_pipeline_name, oldies_in_stream_id, oldies_out_stream,
560 oldies_on_close_callback).await
561 .map_err(|err| format!("Multi::spawn_non_futures_non_fallible_oldies_executor(): could not start `oldies` executor: {:?}", err))?;
562 self.spawn_non_futures_non_fallible_executor_from_stream(concurrency_limit, newies_pipeline_name.as_str(), newies_in_stream_id, newies_out_stream,
563 newies_on_close_callback).await
564 .map_err(|err| format!("Multi::spawn_non_futures_non_fallible_oldies_executor(): could not start `newies` executor: {:?}", err))?;
565 },
566 }
567 Ok(())
568 }
569
570 async fn spawn_non_futures_non_fallible_executor_from_stream<IntoString: Into<String>,
572 OutItemType: Send + Debug,
573 OutStreamType: Stream<Item=OutItemType> + Send + 'static,
574 CloseVoidAsyncType: Future<Output=()> + Send + 'static>
575
576 (&self,
577 concurrency_limit: u32,
578 pipeline_name: IntoString,
579 stream_id: u32,
580 pipelined_stream: OutStreamType,
581 on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static)
582
583 -> Result<(), Box<dyn std::error::Error>> {
584
585 let executor = StreamExecutor::<INSTRUMENTS>::new(format!("{}: {}", self.name(), pipeline_name.into()));
586 self.add_executor(executor.clone(), stream_id).await?;
587 executor
588 .spawn_non_futures_non_fallibles_executor::<_, _>(
589 concurrency_limit,
590 on_close_callback,
591 pipelined_stream
592 );
593 Ok(())
594 }
595
596 pub async fn close(&self, timeout: Duration) -> bool {
602 self.channel.gracefully_end_all_streams(timeout).await == 0
603 }
604
605 #[must_use = "futures do nothing unless you `.await` or poll them"]
616 pub async fn flush_and_cancel_executor<IntoString: Into<String>>
617 (&self,
618 pipeline_name: IntoString,
619 timeout: Duration) -> bool {
620
621 let executor_name = format!("{}: {}", self.multi_name, pipeline_name.into());
622 let mut executor_infos = self.executor_infos.write().await;
624 let executor_info = match executor_infos.swap_remove(&executor_name) {
625 Some(executor) => executor,
626 None => return false,
627 };
628 drop(executor_infos);
629
630 executor_info.executor_stats.report_scheduled_to_finish();
632 self.channel.gracefully_end_stream(executor_info.stream_id, timeout).await;
633 true
634 }
635
636 async fn add_executor(&self, stream_executor: Arc<dyn StreamExecutorStats + Send + Sync>, stream_id: u32) -> Result<(), Box<dyn std::error::Error>> {
638 let mut internal_multis = self.executor_infos.write().await;
639 if internal_multis.contains_key(stream_executor.executor_name()) {
640 Err(Box::from(format!("an executor with the same name is already present: '{}'", stream_executor.executor_name())))
641 } else {
642 internal_multis.insert(stream_executor.executor_name().to_owned(), ExecutorInfo { executor_stats: stream_executor, stream_id });
643 Ok(())
644 }
645 }
646
647}
648
649
/// Exposes the generic parameters of a [`Multi`] as associated items, allowing code to be
/// written against "some `Multi`" without spelling out each of its type parameters.
pub trait GenericMulti<const INSTRUMENTS: usize> {
    /// Mirrors the `INSTRUMENTS` const generic parameter
    const INSTRUMENTS: usize;
    /// The type of the items sent through the channel
    type ItemType: Debug + Sync + Send + 'static;
    /// The channel's `DerivedItemType`
    type DerivedItemType: Debug + Sync + Send + 'static;
    /// The channel implementation, tied to the two item types above
    type MultiChannelType: FullDuplexMultiChannel<ItemType=Self::ItemType, DerivedItemType=Self::DerivedItemType> + Sync + Send;
    /// The stream type handed to the pipeline builders
    type MutinyStreamType;

    /// Creates an instance named `multi_name`
    fn new<IntoString: Into<String>>(multi_name: IntoString) -> Self;
    /// Consumes `self`, yielding the concrete [`Multi`] with the generic parameters spelled out
    fn to_multi(self) -> Multi<Self::ItemType, Self::MultiChannelType, INSTRUMENTS, Self::DerivedItemType>;
}
674
675
/// Closes several multis together: takes a timeout expression followed by one or more
/// comma-separated multi expressions (no trailing comma is accepted).
///
/// Expands to two consecutive `tokio::join!` invocations: the first awaits `channel.flush($timeout)`
/// for every given multi; the second awaits `channel.gracefully_end_all_streams($timeout)` for every one of them.
/// The results of both joins are discarded.
///
/// Notes:
///   - as it expands to `.await`ing code, it must be used inside an async context;
///   - `$timeout` and each `$multi` are plain token substitutions, so each expression is
///     expanded -- and evaluated -- more than once.
#[macro_export]
macro_rules! multis_close_async {
    ($timeout: expr,
     $($multi: expr),+) => {
        {
            // phase 1: flush every channel
            tokio::join!( $( $multi.channel.flush($timeout), )+ );
            // phase 2: only then, end the streams of every channel
            tokio::join!( $( $multi.channel.gracefully_end_all_streams($timeout), )+ );
        }
    }
}
687pub use multis_close_async;
688use crate::stream_executor::StreamExecutorStats;
689pub use crate::types::ChannelCommon;
690
/// The bookkeeping entry `Multi` keeps, in its `executor_infos` map, for each registered executor
pub struct ExecutorInfo {
    /// Handle to the executor, seen through its [`StreamExecutorStats`] interface
    pub executor_stats: Arc<dyn StreamExecutorStats + Send + Sync>,
    /// Id of the channel stream this executor was registered with
    pub stream_id: u32,
}