1use crate::stream_executor::StreamExecutorStats;
4
5use super::super::{
6 stream_executor::StreamExecutor,
7 mutiny_stream::MutinyStream,
8 types::FullDuplexUniChannel,
9};
10use std::{fmt::Debug, time::Duration, sync::{Arc, atomic::{AtomicU32, Ordering::Relaxed}}};
11use std::future::Future;
12use std::marker::PhantomData;
13use futures::future::BoxFuture;
14use futures::Stream;
15use keen_retry::RetryConsumerResult;
16use tokio::sync::Mutex;
17
18
19pub struct Uni<ItemType: Send + Sync + Debug + 'static,
22 UniChannelType: FullDuplexUniChannel<ItemType=ItemType, DerivedItemType=DerivedItemType> + Send + Sync + 'static,
23 const INSTRUMENTS: usize,
24 DerivedItemType: Send + Sync + Debug + 'static = ItemType> {
25 pub channel: Arc<UniChannelType>,
26 pub stream_executors: Vec<Arc<StreamExecutor<INSTRUMENTS>>>,
27 pub finished_executors_count: AtomicU32,
28 _phantom: PhantomData<(&'static ItemType, &'static DerivedItemType)>,
29}
30
31impl<ItemType: Send + Sync + Debug + 'static,
32 UniChannelType: FullDuplexUniChannel<ItemType=ItemType, DerivedItemType=DerivedItemType> + Send + Sync + 'static,
33 const INSTRUMENTS: usize,
34 DerivedItemType: Send + Sync + Debug + 'static>
35GenericUni for
36Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> {
37 const INSTRUMENTS: usize = INSTRUMENTS;
38 type ItemType = ItemType;
39 type UniChannelType = UniChannelType;
40 type DerivedItemType = DerivedItemType;
41 type MutinyStreamType = MutinyStream<'static, ItemType, UniChannelType, DerivedItemType>;
42
43
44 fn new<IntoString:Into<String> >(uni_name: IntoString) -> Self {
45 Uni {
46 channel: UniChannelType::new(uni_name),
47 stream_executors: vec![],
48 finished_executors_count: AtomicU32::new(0),
49 _phantom: PhantomData,
50 }
51 }
52
53 #[inline(always)]
54 fn send(&self, item:Self::ItemType) -> keen_retry::RetryConsumerResult<(),Self::ItemType,()> {
55 self.channel.send(item)
56 }
57
58 #[inline(always)]
59 fn send_with<F:FnOnce(&mut Self::ItemType)>(&self, setter:F) -> keen_retry::RetryConsumerResult<(),F,()> {
60 self.channel.send_with(setter)
61 }
62
63 #[inline(always)]
64 fn send_with_async<F: FnOnce(&'static mut Self::ItemType) -> Fut + Send,
65 Fut: Future<Output=&'static mut Self::ItemType> + Send>
66 (&'static self,
67 setter: F)
68 -> impl Future<Output=RetryConsumerResult<(), F, ()>> + Send {
69 self.channel.send_with_async(setter)
70 }
71
72 #[inline(always)]
73 fn reserve_slot(&self) -> Option<&mut Self::ItemType> {
74 self.channel.reserve_slot()
75 }
76
77 #[inline(always)]
78 fn try_send_reserved(&self, reserved_slot: &mut Self::ItemType) -> bool {
79 self.channel.try_send_reserved(reserved_slot)
80 }
81
82 #[inline(always)]
83 fn try_cancel_slot_reserve(&self, reserved_slot: &mut Self::ItemType) -> bool {
84 self.channel.try_cancel_slot_reserve(reserved_slot)
85 }
86
87 fn consumer_stream(self) -> (Arc<Self> ,Vec<MutinyStream<'static,Self::ItemType,Self::UniChannelType,Self::DerivedItemType> >) {
88 let streams = self.consumer_stream_internal();
89 let arc_self = Arc::new(self);
90 (arc_self, streams)
91 }
92
93 #[inline(always)]
94 fn pending_items_count(&self) -> u32 {
95 self.channel.pending_items_count()
96 }
97
98 #[inline(always)]
99 fn buffer_size(&self) -> u32 {
100 self.channel.buffer_size()
101 }
102
103 async fn flush(&self, duration: Duration) -> u32 {
104 self.channel.flush(duration).await
105 }
106
107 async fn close(&self, timeout: Duration) -> bool {
108 self.channel.gracefully_end_all_streams(timeout).await == 0
109 }
110
111 fn spawn_executors<OutItemType: Send + Debug,
112 OutStreamType: Stream<Item=OutType> + Send + 'static,
113 OutType: Future<Output=Result<OutItemType, Box<dyn std::error::Error + Send + Sync>>> + Send,
114 ErrVoidAsyncType: Future<Output=()> + Send + 'static,
115 CloseVoidAsyncType: Future<Output=()> + Send + 'static>
116
117 (mut self,
118 concurrency_limit: u32,
119 futures_timeout: Duration,
120 pipeline_builder: impl Fn(MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>) -> OutStreamType,
121 on_err_callback: impl Fn(Box<dyn std::error::Error + Send + Sync>) -> ErrVoidAsyncType + Send + Sync + 'static,
122 on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static)
123
124 -> Arc<Self> {
125
126 let on_close_callback = Arc::new(latch_callback_1p(UniChannelType::MAX_STREAMS as u32, on_close_callback));
127 let on_err_callback = Arc::new(on_err_callback);
128 let in_streams = self.consumer_stream_internal();
129 for i in 0..=in_streams.len() {
130 let pipeline_name = format!("Consumer #{i} for Uni '{}'", self.channel.name());
131 let executor = StreamExecutor::<INSTRUMENTS>::with_futures_timeout(pipeline_name, futures_timeout);
132 self.stream_executors.push(executor);
133 }
134 let arc_self = Arc::new(self);
135 let arc_self_ref = Arc::clone(&arc_self);
136 arc_self.stream_executors.iter().zip(in_streams)
137 .for_each(|(executor, in_stream)| {
138 let arc_self = Arc::clone(&arc_self);
139 let on_close_callback = Arc::clone(&on_close_callback);
140 let on_err_callback = Arc::clone(&on_err_callback);
141 let out_stream = pipeline_builder(in_stream);
142 Arc::clone(executor)
143 .spawn_executor::<_, _, _, _>(
144 concurrency_limit,
145 move |err| on_err_callback(err),
146 move |executor| {
147 async move {
148 arc_self.finished_executors_count.fetch_add(1, Relaxed);
149 on_close_callback(executor).await;
150 }
151 },
152 out_stream
153 );
154 });
155 arc_self_ref
156 }
157
158 fn spawn_fallibles_executors<OutItemType: Send + Debug,
159 OutStreamType: Stream<Item=Result<OutItemType, Box<dyn std::error::Error + Send + Sync>>> + Send + 'static,
160 CloseVoidAsyncType: Future<Output=()> + Send + 'static>
161
162 (mut self,
163 concurrency_limit: u32,
164 pipeline_builder: impl Fn(MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>) -> OutStreamType,
165 on_err_callback: impl Fn(Box<dyn std::error::Error + Send + Sync>) + Send + Sync + 'static,
166 on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static)
167
168 -> Arc<Self> {
169
170 let on_close_callback = Arc::new(latch_callback_1p(UniChannelType::MAX_STREAMS as u32, on_close_callback));
171 let on_err_callback = Arc::new(on_err_callback);
172 let in_streams = self.consumer_stream_internal();
173 for i in 0..=in_streams.len() {
174 let pipeline_name = format!("Consumer #{i} for Uni '{}'", self.channel.name());
175 let executor = StreamExecutor::<INSTRUMENTS>::new(pipeline_name);
176 self.stream_executors.push(executor);
177 }
178 let arc_self = Arc::new(self);
179 let arc_self_ref = Arc::clone(&arc_self);
180 arc_self.stream_executors.iter().zip(in_streams)
181 .for_each(|(executor, in_stream)| {
182 let arc_self = Arc::clone(&arc_self);
183 let on_close_callback = Arc::clone(&on_close_callback);
184 let on_err_callback = Arc::clone(&on_err_callback);
185 let out_stream = pipeline_builder(in_stream);
186 Arc::clone(executor)
187 .spawn_fallibles_executor::<_, _>(
188 concurrency_limit,
189 move |err| on_err_callback(err),
190 move |executor| {
191 let arc_self = Arc::clone(&arc_self);
192 async move {
193 arc_self.finished_executors_count.fetch_add(1, Relaxed);
194 on_close_callback(executor).await;
195 }
196 },
197 out_stream
198 );
199 });
200 arc_self_ref
201 }
202
203 fn spawn_futures_executors<OutItemType: Send + Debug,
204 OutStreamType: Stream<Item=OutType> + Send + 'static,
205 OutType: Future<Output=OutItemType> + Send,
206 CloseVoidAsyncType: Future<Output=()> + Send + 'static>
207
208 (mut self,
209 concurrency_limit: u32,
210 futures_timeout: Duration,
211 pipeline_builder: impl Fn(MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>) -> OutStreamType,
212 on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static)
213
214 -> Arc<Self> {
215
216 let on_close_callback = Arc::new(latch_callback_1p(UniChannelType::MAX_STREAMS as u32, on_close_callback));
217 let in_streams= self.consumer_stream_internal();
218 for i in 0..=in_streams.len() {
219 let pipeline_name = format!("Consumer #{i} for Uni '{}'", self.channel.name());
220 let executor = StreamExecutor::<INSTRUMENTS>::with_futures_timeout(pipeline_name, futures_timeout);
221 self.stream_executors.push(executor);
222 }
223 let arc_self = Arc::new(self);
224 let arc_self_ref = Arc::clone(&arc_self);
225 arc_self.stream_executors.iter().zip(in_streams)
226 .for_each(|(executor, in_stream)| {
227 let arc_self = Arc::clone(&arc_self);
228 let on_close_callback = Arc::clone(&on_close_callback);
229 let out_stream = pipeline_builder(in_stream);
230 Arc::clone(executor)
231 .spawn_futures_executor(
232 concurrency_limit,
233 move |executor| {
234 let arc_self = Arc::clone(&arc_self);
235 async move {
236 arc_self.finished_executors_count.fetch_add(1, Relaxed);
237 on_close_callback(executor).await;
238 }
239 },
240 out_stream
241 );
242 });
243 arc_self_ref
244 }
245
246 fn spawn_non_futures_non_fallibles_executors<OutItemType: Send + Debug,
247 OutStreamType: Stream<Item=OutItemType> + Send + 'static,
248 CloseVoidAsyncType: Future<Output=()> + Send + 'static>
249
250 (mut self,
251 concurrency_limit: u32,
252 pipeline_builder: impl Fn(MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>) -> OutStreamType,
253 on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static)
254
255 -> Arc<Self> {
256
257 let on_close_callback = Arc::new(latch_callback_1p(UniChannelType::MAX_STREAMS as u32, on_close_callback));
258 let in_streams = self.consumer_stream_internal();
259 for i in 0..=in_streams.len() {
260 let pipeline_name = format!("Consumer #{i} for Uni '{}'", self.channel.name());
261 let executor = StreamExecutor::<INSTRUMENTS>::new(pipeline_name);
262 self.stream_executors.push(executor);
263 }
264 let arc_self = Arc::new(self);
265 let arc_self_ref = Arc::clone(&arc_self);
266 arc_self.stream_executors.iter().zip(in_streams)
267 .for_each(|(executor, in_stream)| {
268 let arc_self = Arc::clone(&arc_self);
269 let on_close_callback = Arc::clone(&on_close_callback);
270 let out_stream = pipeline_builder(in_stream);
271 Arc::clone(executor)
272 .spawn_non_futures_non_fallibles_executor(
273 concurrency_limit,
274 move |executor| {
275 let arc_self = Arc::clone(&arc_self);
276 async move {
277 arc_self.finished_executors_count.fetch_add(1, Relaxed);
278 on_close_callback(executor).await;
279 }
280 },
281 out_stream
282 );
283 });
284 arc_self_ref
285 }
286}
287
288impl<ItemType: Send + Sync + Debug + 'static,
289 UniChannelType: FullDuplexUniChannel<ItemType=ItemType, DerivedItemType=DerivedItemType> + Send + Sync + 'static,
290 const INSTRUMENTS: usize,
291 DerivedItemType: Send + Sync + Debug + 'static>
292Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> {
293
294 #[must_use]
296 fn consumer_stream_internal(&self) -> Vec<MutinyStream<'static, ItemType, UniChannelType, DerivedItemType>> {
297 (0..UniChannelType::MAX_STREAMS)
298 .map(|_| {
299 let (stream, _stream_id) = self.channel.create_stream();
300 stream
301 })
302 .collect()
303 }
304}
305
306
307pub trait GenericUni {
316 const INSTRUMENTS: usize;
318 type ItemType: Send + Sync + Debug + 'static;
320 type DerivedItemType: Send + Sync + Debug + 'static;
322 type UniChannelType: FullDuplexUniChannel<ItemType=Self::ItemType, DerivedItemType=Self::DerivedItemType> + Send + Sync + 'static;
324 type MutinyStreamType;
327
328 fn new<IntoString: Into<String>>(uni_name: IntoString) -> Self;
338
339 #[must_use = "The return type should be examined in case retrying is needed -- or call map(...).into() to transform it into a `Result<(), ItemType>`"]
341 fn send(&self, item: Self::ItemType) -> keen_retry::RetryConsumerResult<(), Self::ItemType, ()>;
342
343 #[must_use = "The return type should be examined in case retrying is needed -- or call map(...).into() to transform it into a `Result<(), F>`"]
345 fn send_with<F: FnOnce(&mut Self::ItemType)>(&self, setter: F) -> keen_retry::RetryConsumerResult<(), F, ()>;
346
347 fn send_with_async<F: FnOnce(&'static mut Self::ItemType) -> Fut + Send,
349 Fut: Future<Output=&'static mut Self::ItemType> + Send>
350 (&'static self,
351 setter: F)
352 -> impl Future<Output=keen_retry::RetryConsumerResult<(), F, ()>> + Send;
353
354 fn reserve_slot(&self) -> Option<&mut Self::ItemType>;
356
357 fn try_send_reserved(&self, reserved_slot: &mut Self::ItemType) -> bool;
359
360 fn try_cancel_slot_reserve(&self, reserved_slot: &mut Self::ItemType) -> bool;
362
363 #[must_use = "By calling this method, the Uni gets converted into only providing Streams (rather than executing them) -- so the returned values of (self, Streams) must be used"]
365 fn consumer_stream(self) -> (Arc<Self>, Vec<MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>>);
366
367 fn buffer_size(&self) -> u32;
370
371 fn pending_items_count(&self) -> u32;
374
375 fn flush(&self, timeout: Duration) -> impl Future<Output=u32> + Send;
378
379 #[must_use = "Returns true if the Uni could be closed within the given time"]
385 fn close(&self, timeout: Duration) -> impl Future<Output=bool> + Send;
386
387 #[must_use = "`Arc<self>` is returned back, so the return value must be used to send data to this `Uni` and to close it"]
392 fn spawn_executors<OutItemType: Send + Debug,
393 OutStreamType: Stream<Item=OutType> + Send + 'static,
394 OutType: Future<Output=Result<OutItemType, Box<dyn std::error::Error + Send + Sync>>> + Send,
395 ErrVoidAsyncType: Future<Output=()> + Send + 'static,
396 CloseVoidAsyncType: Future<Output=()> + Send + 'static>
397
398 (self,
399 concurrency_limit: u32,
400 futures_timeout: Duration,
401 pipeline_builder: impl Fn(MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>) -> OutStreamType,
402 on_err_callback: impl Fn(Box<dyn std::error::Error + Send + Sync>) -> ErrVoidAsyncType + Send + Sync + 'static,
403 on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static)
404
405 -> Arc<Self>;
406
407 #[must_use = "`Arc<self>` is returned back, so the return value must be used to send data to this `Uni` and to close it"]
412 fn spawn_fallibles_executors<OutItemType: Send + Debug,
413 OutStreamType: Stream<Item=Result<OutItemType, Box<dyn std::error::Error + Send + Sync>>> + Send + 'static,
414 CloseVoidAsyncType: Future<Output=()> + Send + 'static>
415
416 (self,
417 concurrency_limit: u32,
418 pipeline_builder: impl Fn(MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>) -> OutStreamType,
419 on_err_callback: impl Fn(Box<dyn std::error::Error + Send + Sync>) + Send + Sync + 'static,
420 on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static)
421
422 -> Arc<Self>;
423
424 #[must_use = "`Arc<self>` is returned back, so the return value must be used to send data to this `Uni` and to close it"]
428 fn spawn_futures_executors<OutItemType: Send + Debug,
429 OutStreamType: Stream<Item=OutType> + Send + 'static,
430 OutType: Future<Output=OutItemType> + Send,
431 CloseVoidAsyncType: Future<Output=()> + Send + 'static>
432
433 (self,
434 concurrency_limit: u32,
435 futures_timeout: Duration,
436 pipeline_builder: impl Fn(MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>) -> OutStreamType,
437 on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static)
438
439 -> Arc<Self>;
440
441 #[must_use = "`Arc<self>` is returned back, so the return value must be used to send data to this `Uni` and to close it"]
445 fn spawn_non_futures_non_fallibles_executors<OutItemType: Send + Debug,
446 OutStreamType: Stream<Item=OutItemType> + Send + 'static,
447 CloseVoidAsyncType: Future<Output=()> + Send + 'static>
448
449 (self,
450 concurrency_limit: u32,
451 pipeline_builder: impl Fn(MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>) -> OutStreamType,
452 on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static)
453
454 -> Arc<Self>;
455}
456
457
458#[macro_export]
460macro_rules! unis_close_async {
461 ($timeout: expr,
462 $($uni: tt),+) => {
463 {
464 tokio::join!( $( $uni.channel.flush($timeout), )+ );
465 tokio::join!( $( $uni.channel.gracefully_end_all_streams($timeout), )+);
466 }
467 }
468}
469pub use unis_close_async;
470
471
472fn latch_callback_1p<CallbackParameterType: Send + 'static,
474 CallbackAsyncType: Send + Future<Output=()>>
475 (latch_count: u32,
476 async_callback: impl FnOnce(CallbackParameterType) -> CallbackAsyncType + Send + Sync + 'static)
477 -> impl Fn(CallbackParameterType) -> BoxFuture<'static, ()> {
478 let async_callback = Arc::new(Mutex::new(Some(async_callback)));
479 let latch_counter = Arc::new(AtomicU32::new(latch_count));
480 move |p1| {
481 let async_callback = Arc::clone(&async_callback);
482 let latch_counter = Arc::clone(&latch_counter);
483 Box::pin(async move {
484 if latch_counter.fetch_sub(1, Relaxed) == 1 {
485 let mut async_callback = async_callback.lock().await;
486 (async_callback.take().expect("Uni::latch_callback_1p(): BUG! FnOnce() not honored by the algorithm"))(p1).await;
487 }
488 })
489 }
490}