use crate::stream_executor::StreamExecutorStats;
use super::super::{
stream_executor::StreamExecutor,
mutiny_stream::MutinyStream,
types::FullDuplexUniChannel,
};
use std::{fmt::Debug, time::Duration, sync::{Arc, atomic::{AtomicU32, Ordering::Relaxed}}};
use std::future::Future;
use std::marker::PhantomData;
use async_trait::async_trait;
use futures::future::BoxFuture;
use futures::Stream;
use tokio::sync::Mutex;
pub struct Uni<ItemType: Send + Sync + Debug + 'static,
UniChannelType: FullDuplexUniChannel<ItemType=ItemType, DerivedItemType=DerivedItemType> + Send + Sync + 'static,
const INSTRUMENTS: usize,
DerivedItemType: Send + Sync + Debug + 'static = ItemType> {
pub channel: Arc<UniChannelType>,
pub stream_executors: Vec<Arc<StreamExecutor<INSTRUMENTS>>>,
pub finished_executors_count: AtomicU32,
_phantom: PhantomData<(&'static ItemType, &'static DerivedItemType)>,
}
#[async_trait]
impl<ItemType: Send + Sync + Debug + 'static,
UniChannelType: FullDuplexUniChannel<ItemType=ItemType, DerivedItemType=DerivedItemType> + Send + Sync + 'static,
const INSTRUMENTS: usize,
DerivedItemType: Send + Sync + Debug + 'static>
GenericUni for
Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> {
const INSTRUMENTS: usize = INSTRUMENTS;
type ItemType = ItemType;
type UniChannelType = UniChannelType;
type DerivedItemType = DerivedItemType;
type MutinyStreamType = MutinyStream<'static, ItemType, UniChannelType, DerivedItemType>;
fn new<IntoString:Into<String> >(uni_name: IntoString) -> Self {
Uni {
channel: UniChannelType::new(uni_name),
stream_executors: vec![],
finished_executors_count: AtomicU32::new(0),
_phantom: PhantomData,
}
}
#[inline(always)]
fn send(&self, item:Self::ItemType) -> keen_retry::RetryConsumerResult<(),Self::ItemType,()> {
self.channel.send(item)
}
#[inline(always)]
fn send_with<F:FnOnce(&mut Self::ItemType)>(&self, setter:F) -> keen_retry::RetryConsumerResult<(),F,()> {
self.channel.send_with(setter)
}
fn consumer_stream(self) -> (Arc<Self> ,Vec<MutinyStream<'static,Self::ItemType,Self::UniChannelType,Self::DerivedItemType> >) {
let streams = self.consumer_stream_internal();
let arc_self = Arc::new(self);
(arc_self, streams)
}
#[inline(always)]
fn pending_items_count(&self) -> u32 {
self.channel.pending_items_count()
}
#[inline(always)]
fn buffer_size(&self) -> u32 {
self.channel.buffer_size()
}
async fn flush(&self, duration: Duration) -> u32 {
self.channel.flush(duration).await
}
async fn close(&self, timeout: Duration) -> bool {
self.channel.gracefully_end_all_streams(timeout).await == 0
}
fn spawn_executors<OutItemType: Send + Debug,
OutStreamType: Stream<Item=OutType> + Send + 'static,
OutType: Future<Output=Result<OutItemType, Box<dyn std::error::Error + Send + Sync>>> + Send,
ErrVoidAsyncType: Future<Output=()> + Send + 'static,
CloseVoidAsyncType: Future<Output=()> + Send + 'static>
(mut self,
concurrency_limit: u32,
futures_timeout: Duration,
pipeline_builder: impl Fn(MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>) -> OutStreamType,
on_err_callback: impl Fn(Box<dyn std::error::Error + Send + Sync>) -> ErrVoidAsyncType + Send + Sync + 'static,
on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static)
-> Arc<Self> {
let on_close_callback = Arc::new(latch_callback_1p(UniChannelType::MAX_STREAMS as u32, on_close_callback));
let on_err_callback = Arc::new(on_err_callback);
let in_streams = self.consumer_stream_internal();
for i in 0..=in_streams.len() {
let pipeline_name = format!("Consumer #{i} for Uni '{}'", self.channel.name());
let executor = StreamExecutor::<INSTRUMENTS>::with_futures_timeout(pipeline_name, futures_timeout);
self.stream_executors.push(executor);
}
let arc_self = Arc::new(self);
let arc_self_ref = Arc::clone(&arc_self);
arc_self.stream_executors.iter().zip(in_streams)
.for_each(|(executor, in_stream)| {
let arc_self = Arc::clone(&arc_self);
let on_close_callback = Arc::clone(&on_close_callback);
let on_err_callback = Arc::clone(&on_err_callback);
let out_stream = pipeline_builder(in_stream);
Arc::clone(executor)
.spawn_executor::<_, _, _, _>(
concurrency_limit,
move |err| on_err_callback(err),
move |executor| {
async move {
arc_self.finished_executors_count.fetch_add(1, Relaxed);
on_close_callback(executor).await;
}
},
out_stream
);
});
arc_self_ref
}
fn spawn_fallibles_executors<OutItemType: Send + Debug,
OutStreamType: Stream<Item=Result<OutItemType, Box<dyn std::error::Error + Send + Sync>>> + Send + 'static,
CloseVoidAsyncType: Future<Output=()> + Send + 'static>
(mut self,
concurrency_limit: u32,
pipeline_builder: impl Fn(MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>) -> OutStreamType,
on_err_callback: impl Fn(Box<dyn std::error::Error + Send + Sync>) + Send + Sync + 'static,
on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static)
-> Arc<Self> {
let on_close_callback = Arc::new(latch_callback_1p(UniChannelType::MAX_STREAMS as u32, on_close_callback));
let on_err_callback = Arc::new(on_err_callback);
let in_streams = self.consumer_stream_internal();
for i in 0..=in_streams.len() {
let pipeline_name = format!("Consumer #{i} for Uni '{}'", self.channel.name());
let executor = StreamExecutor::<INSTRUMENTS>::new(pipeline_name);
self.stream_executors.push(executor);
}
let arc_self = Arc::new(self);
let arc_self_ref = Arc::clone(&arc_self);
arc_self.stream_executors.iter().zip(in_streams)
.for_each(|(executor, in_stream)| {
let arc_self = Arc::clone(&arc_self);
let on_close_callback = Arc::clone(&on_close_callback);
let on_err_callback = Arc::clone(&on_err_callback);
let out_stream = pipeline_builder(in_stream);
Arc::clone(executor)
.spawn_fallibles_executor::<_, _>(
concurrency_limit,
move |err| on_err_callback(err),
move |executor| {
let arc_self = Arc::clone(&arc_self);
async move {
arc_self.finished_executors_count.fetch_add(1, Relaxed);
on_close_callback(executor).await;
}
},
out_stream
);
});
arc_self_ref
}
fn spawn_futures_executors<OutItemType: Send + Debug,
OutStreamType: Stream<Item=OutType> + Send + 'static,
OutType: Future<Output=OutItemType> + Send,
CloseVoidAsyncType: Future<Output=()> + Send + 'static>
(mut self,
concurrency_limit: u32,
futures_timeout: Duration,
pipeline_builder: impl Fn(MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>) -> OutStreamType,
on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static)
-> Arc<Self> {
let on_close_callback = Arc::new(latch_callback_1p(UniChannelType::MAX_STREAMS as u32, on_close_callback));
let in_streams= self.consumer_stream_internal();
for i in 0..=in_streams.len() {
let pipeline_name = format!("Consumer #{i} for Uni '{}'", self.channel.name());
let executor = StreamExecutor::<INSTRUMENTS>::with_futures_timeout(pipeline_name, futures_timeout);
self.stream_executors.push(executor);
}
let arc_self = Arc::new(self);
let arc_self_ref = Arc::clone(&arc_self);
arc_self.stream_executors.iter().zip(in_streams)
.for_each(|(executor, in_stream)| {
let arc_self = Arc::clone(&arc_self);
let on_close_callback = Arc::clone(&on_close_callback);
let out_stream = pipeline_builder(in_stream);
Arc::clone(executor)
.spawn_futures_executor(
concurrency_limit,
move |executor| {
let arc_self = Arc::clone(&arc_self);
async move {
arc_self.finished_executors_count.fetch_add(1, Relaxed);
on_close_callback(executor).await;
}
},
out_stream
);
});
arc_self_ref
}
fn spawn_non_futures_non_fallibles_executors<OutItemType: Send + Debug,
OutStreamType: Stream<Item=OutItemType> + Send + 'static,
CloseVoidAsyncType: Future<Output=()> + Send + 'static>
(mut self,
concurrency_limit: u32,
pipeline_builder: impl Fn(MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>) -> OutStreamType,
on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static)
-> Arc<Self> {
let on_close_callback = Arc::new(latch_callback_1p(UniChannelType::MAX_STREAMS as u32, on_close_callback));
let in_streams = self.consumer_stream_internal();
for i in 0..=in_streams.len() {
let pipeline_name = format!("Consumer #{i} for Uni '{}'", self.channel.name());
let executor = StreamExecutor::<INSTRUMENTS>::new(pipeline_name);
self.stream_executors.push(executor);
}
let arc_self = Arc::new(self);
let arc_self_ref = Arc::clone(&arc_self);
arc_self.stream_executors.iter().zip(in_streams)
.for_each(|(executor, in_stream)| {
let arc_self = Arc::clone(&arc_self);
let on_close_callback = Arc::clone(&on_close_callback);
let out_stream = pipeline_builder(in_stream);
Arc::clone(executor)
.spawn_non_futures_non_fallibles_executor(
concurrency_limit,
move |executor| {
let arc_self = Arc::clone(&arc_self);
async move {
arc_self.finished_executors_count.fetch_add(1, Relaxed);
on_close_callback(executor).await;
}
},
out_stream
);
});
arc_self_ref
}
}
impl<ItemType: Send + Sync + Debug + 'static,
UniChannelType: FullDuplexUniChannel<ItemType=ItemType, DerivedItemType=DerivedItemType> + Send + Sync + 'static,
const INSTRUMENTS: usize,
DerivedItemType: Send + Sync + Debug + 'static>
Uni<ItemType, UniChannelType, INSTRUMENTS, DerivedItemType> {
#[must_use]
fn consumer_stream_internal(&self) -> Vec<MutinyStream<'static, ItemType, UniChannelType, DerivedItemType>> {
(0..UniChannelType::MAX_STREAMS)
.map(|_| {
let (stream, _stream_id) = self.channel.create_stream();
stream
})
.collect()
}
}
#[async_trait]
pub trait GenericUni {
const INSTRUMENTS: usize;
type ItemType: Send + Sync + Debug + 'static;
type DerivedItemType: Send + Sync + Debug + 'static;
type UniChannelType: FullDuplexUniChannel<ItemType=Self::ItemType, DerivedItemType=Self::DerivedItemType> + Send + Sync + 'static;
type MutinyStreamType;
fn new<IntoString: Into<String>>(uni_name: IntoString) -> Self;
#[must_use = "The return type should be examined in case retrying is needed -- or call map(...).into() to transform it into a `Result<(), ItemType>`"]
fn send(&self, item: Self::ItemType) -> keen_retry::RetryConsumerResult<(), Self::ItemType, ()>;
#[must_use = "The return type should be examined in case retrying is needed -- or call map(...).into() to transform it into a `Result<(), F>`"]
fn send_with<F: FnOnce(&mut Self::ItemType)>(&self, setter: F) -> keen_retry::RetryConsumerResult<(), F, ()>;
#[must_use = "By calling this method, the Uni gets converted into only providing Streams (rather than executing them) -- so the returned values of (self, Streams) must be used"]
fn consumer_stream(self) -> (Arc<Self>, Vec<MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>>);
fn buffer_size(&self) -> u32;
fn pending_items_count(&self) -> u32;
async fn flush(&self, timeout: Duration) -> u32;
#[must_use = "Returns true if the Uni could be closed within the given time"]
async fn close(&self, timeout: Duration) -> bool;
#[must_use = "`Arc<self>` is returned back, so the return value must be used to send data to this `Uni` and to close it"]
fn spawn_executors<OutItemType: Send + Debug,
OutStreamType: Stream<Item=OutType> + Send + 'static,
OutType: Future<Output=Result<OutItemType, Box<dyn std::error::Error + Send + Sync>>> + Send,
ErrVoidAsyncType: Future<Output=()> + Send + 'static,
CloseVoidAsyncType: Future<Output=()> + Send + 'static>
(self,
concurrency_limit: u32,
futures_timeout: Duration,
pipeline_builder: impl Fn(MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>) -> OutStreamType,
on_err_callback: impl Fn(Box<dyn std::error::Error + Send + Sync>) -> ErrVoidAsyncType + Send + Sync + 'static,
on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static)
-> Arc<Self>;
#[must_use = "`Arc<self>` is returned back, so the return value must be used to send data to this `Uni` and to close it"]
fn spawn_fallibles_executors<OutItemType: Send + Debug,
OutStreamType: Stream<Item=Result<OutItemType, Box<dyn std::error::Error + Send + Sync>>> + Send + 'static,
CloseVoidAsyncType: Future<Output=()> + Send + 'static>
(self,
concurrency_limit: u32,
pipeline_builder: impl Fn(MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>) -> OutStreamType,
on_err_callback: impl Fn(Box<dyn std::error::Error + Send + Sync>) + Send + Sync + 'static,
on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static)
-> Arc<Self>;
#[must_use = "`Arc<self>` is returned back, so the return value must be used to send data to this `Uni` and to close it"]
fn spawn_futures_executors<OutItemType: Send + Debug,
OutStreamType: Stream<Item=OutType> + Send + 'static,
OutType: Future<Output=OutItemType> + Send,
CloseVoidAsyncType: Future<Output=()> + Send + 'static>
(self,
concurrency_limit: u32,
futures_timeout: Duration,
pipeline_builder: impl Fn(MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>) -> OutStreamType,
on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static)
-> Arc<Self>;
#[must_use = "`Arc<self>` is returned back, so the return value must be used to send data to this `Uni` and to close it"]
fn spawn_non_futures_non_fallibles_executors<OutItemType: Send + Debug,
OutStreamType: Stream<Item=OutItemType> + Send + 'static,
CloseVoidAsyncType: Future<Output=()> + Send + 'static>
(self,
concurrency_limit: u32,
pipeline_builder: impl Fn(MutinyStream<'static, Self::ItemType, Self::UniChannelType, Self::DerivedItemType>) -> OutStreamType,
on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static)
-> Arc<Self>;
}
#[macro_export]
macro_rules! unis_close_async {
($timeout: expr,
$($uni: tt),+) => {
{
tokio::join!( $( $uni.channel.flush($timeout), )+ );
tokio::join!( $( $uni.channel.gracefully_end_all_streams($timeout), )+);
}
}
}
pub use unis_close_async;
fn latch_callback_1p<CallbackParameterType: Send + 'static,
CallbackAsyncType: Send + Future<Output=()>>
(latch_count: u32,
async_callback: impl FnOnce(CallbackParameterType) -> CallbackAsyncType + Send + Sync + 'static)
-> impl Fn(CallbackParameterType) -> BoxFuture<'static, ()> {
let async_callback = Arc::new(Mutex::new(Some(async_callback)));
let latch_counter = Arc::new(AtomicU32::new(latch_count));
move |p1| {
let async_callback = Arc::clone(&async_callback);
let latch_counter = Arc::clone(&latch_counter);
Box::pin(async move {
if latch_counter.fetch_sub(1, Relaxed) == 1 {
let mut async_callback = async_callback.lock().await;
(async_callback.take().expect("Uni::latch_callback_1p(): BUG! FnOnce() not honored by the algorithm"))(p1).await;
}
})
}
}