use std::sync::Arc;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use crate::{BatchSubscriber, Broker, Subscriber, SubscriptionSource};
use crate::runtime::batch::BatchHandler;
use crate::runtime::context::State;
use crate::runtime::dispatch::{
Delivery, Workers, spawn_batch_dispatch, spawn_dispatch, spawn_dispatch_workers,
};
use crate::runtime::failure::{DispatchFailure, ErrorShutdown, FailurePolicies};
use crate::runtime::handler::Handler;
use crate::runtime::lifecycle::{BoxError, BoxFuture};
use crate::runtime::metadata::HandlerMetadata;
use super::SourceMessage;
/// A boxed, one-shot start-up routine for a single registered handler.
///
/// The closure is called with, in order: the shared broker (`Arc<B>`), the
/// shared runtime state, the shared delivery handle, the error-shutdown
/// handle, and a cancellation token. It returns a boxed `'static` future
/// that resolves to the `JoinHandle` of the task it started, or to a
/// `BoxError` if start-up failed.
///
/// It is `FnOnce`, so each starter can be invoked at most once, and `Send`,
/// so it can be moved to another thread before being called.
pub(crate) type BoundStarter<B> = Box<
dyn FnOnce(
Arc<B>,
Arc<State>,
Arc<Delivery>,
ErrorShutdown,
CancellationToken,
) -> BoxFuture<'static, Result<JoinHandle<()>, BoxError>>
+ Send,
>;
/// Accumulates handler registrations for a broker of type `B`.
///
/// Every `push_*` method appends exactly one entry to each of the two
/// vectors, so the starter at index `i` belongs to the metadata at index `i`.
pub struct RouterSink<B> {
/// Deferred start-up closures, one per registered handler, in registration order.
starters: Vec<BoundStarter<B>>,
/// Metadata describing each registered handler, in registration order.
handlers: Vec<HandlerMetadata>,
}
impl<B> std::fmt::Debug for RouterSink<B> {
    /// Renders the sink as a non-exhaustive struct showing only how many
    /// handlers have been registered; the boxed starters are not printable
    /// and are therefore omitted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let handler_count = self.handlers.len();
        let mut repr = f.debug_struct("RouterSink");
        repr.field("handlers", &handler_count);
        repr.finish_non_exhaustive()
    }
}
impl<B: Broker + 'static> RouterSink<B> {
pub(crate) fn new() -> Self {
Self {
starters: Vec::new(),
handlers: Vec::new(),
}
}
pub(crate) fn push_handle<S, H>(
&mut self,
subscriber: S,
handler: H,
meta: HandlerMetadata,
policies: FailurePolicies,
) where
S: Subscriber + Send + 'static,
H: Handler<S::Message> + 'static,
{
let handler = Arc::new(handler);
let name: Arc<str> = Arc::from(meta.name.as_ref());
self.starters.push(Box::new(
move |_broker, state, delivery, shutdown, token| {
Box::pin(async move {
let failure = DispatchFailure::new(policies, shutdown);
Ok(spawn_dispatch(
subscriber, handler, token, name, state, delivery, failure,
))
})
},
));
self.handlers.push(meta);
}
pub(crate) fn push_subscribe_batch<S, H>(
&mut self,
source: S,
handler: H,
meta: HandlerMetadata,
policies: FailurePolicies,
workers: Workers,
) where
S: SubscriptionSource<B> + Send + 'static,
S::Subscriber: BatchSubscriber + Send + 'static,
SourceMessage<B, S>: Send + 'static,
H: BatchHandler<SourceMessage<B, S>> + 'static,
{
let handler = Arc::new(handler);
let name: Arc<str> = Arc::from(meta.name.as_ref());
self.starters.push(Box::new(
move |broker: Arc<B>, state, delivery, shutdown, token| {
Box::pin(async move {
let subscriber = source
.subscribe(broker.as_ref())
.await
.map_err(|e| Box::new(e) as BoxError)?;
let failure = DispatchFailure::new(policies, shutdown);
Ok(spawn_batch_dispatch(
subscriber, handler, token, name, state, delivery, failure, workers,
))
})
},
));
self.handlers.push(meta);
}
pub(crate) fn push_subscribe_workers<S, H>(
&mut self,
source: S,
handler: H,
meta: HandlerMetadata,
policies: FailurePolicies,
workers: Workers,
) where
S: SubscriptionSource<B> + Send + 'static,
S::Subscriber: Send + 'static,
SourceMessage<B, S>: Send + Sync + 'static,
H: Handler<SourceMessage<B, S>> + 'static,
{
let handler = Arc::new(handler);
let name: Arc<str> = Arc::from(meta.name.as_ref());
self.starters.push(Box::new(
move |broker: Arc<B>, state, delivery, shutdown, token| {
Box::pin(async move {
let subscriber = source
.subscribe(broker.as_ref())
.await
.map_err(|e| Box::new(e) as BoxError)?;
let failure = DispatchFailure::new(policies, shutdown);
Ok(spawn_dispatch_workers(
subscriber, handler, token, name, state, delivery, failure, workers,
))
})
},
));
self.handlers.push(meta);
}
pub(crate) fn push_subscribe<S, H>(
&mut self,
source: S,
handler: H,
meta: HandlerMetadata,
policies: FailurePolicies,
) where
S: SubscriptionSource<B> + Send + 'static,
S::Subscriber: Send + 'static,
H: Handler<SourceMessage<B, S>> + 'static,
{
let handler = Arc::new(handler);
let name: Arc<str> = Arc::from(meta.name.as_ref());
self.starters.push(Box::new(
move |broker: Arc<B>, state, delivery, shutdown, token| {
Box::pin(async move {
let subscriber = source
.subscribe(broker.as_ref())
.await
.map_err(|e| Box::new(e) as BoxError)?;
let failure = DispatchFailure::new(policies, shutdown);
Ok(spawn_dispatch(
subscriber, handler, token, name, state, delivery, failure,
))
})
},
));
self.handlers.push(meta);
}
pub(crate) fn into_parts(self) -> (Vec<BoundStarter<B>>, Vec<HandlerMetadata>) {
(self.starters, self.handlers)
}
}