use std::sync::Arc;
use serde::Serialize;
use serde::de::DeserializeOwned;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use crate::codec::Codec;
use crate::{Broker, Publisher, Subscriber, SubscriptionSource};
use super::context::State;
use super::dispatch::{Delivery, spawn_dispatch};
use super::handler::Handler;
use super::lifecycle::{BoxError, BoxFuture};
use super::metadata::HandlerMetadata;
use super::publish::{PublishLayer, PublishMiddleware, TypedPublisher};
use super::publishing::{PublishingDef, PublishingHandler};
use super::subscriber_def::SubscriberDef;
use super::typed::typed;
/// Boxed, one-shot starter closure stored by [`Router`] for each registration.
///
/// The closure receives the broker, an `Arc<State>`, an `Arc<Delivery>` and a
/// `CancellationToken`, and returns a boxed future. That future resolves to the
/// `JoinHandle<()>` of the started task, or to a `BoxError` if starting failed.
/// The closure itself must be `Send`.
pub(crate) type BoundStarter<B> = Box<
dyn FnOnce(
Arc<B>,
Arc<State>,
Arc<Delivery>,
CancellationToken,
) -> BoxFuture<'static, Result<JoinHandle<()>, BoxError>>
+ Send,
>;
/// Collection of handler registrations for a broker of type `B`.
///
/// `handle` and `subscribe` each push one starter closure and one metadata
/// record, so the two vectors grow together.
pub struct Router<B> {
/// Deferred start-up closures, one per registration.
starters: Vec<BoundStarter<B>>,
/// Metadata records, one per registration, exposed through `handlers()`.
handlers: Vec<HandlerMetadata>,
}
/// Builds a router with no starters and no handler metadata.
///
/// Implemented by hand because `#[derive(Default)]` would add a `B: Default`
/// bound, which an empty router does not need.
impl<B> Default for Router<B> {
    fn default() -> Self {
        let starters = Vec::new();
        let handlers = Vec::new();
        Self { starters, handlers }
    }
}
/// Debug output shows only how many handlers are registered.
///
/// The starters are boxed closures and cannot be formatted, so the struct is
/// printed as non-exhaustive.
impl<B> std::fmt::Debug for Router<B> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let handler_count = self.handlers.len();
        let mut out = f.debug_struct("Router");
        out.field("handlers", &handler_count);
        out.finish_non_exhaustive()
    }
}
impl<B: Broker + 'static> Router<B> {
#[must_use]
pub fn new() -> Self {
Self::default()
}
pub fn handle<S, H>(&mut self, subscriber: S, handler: H, meta: HandlerMetadata)
where
S: Subscriber + Send + 'static,
H: Handler<S::Message> + 'static,
{
let handler = Arc::new(handler);
let name: Arc<str> = Arc::from(meta.name.as_ref());
self.starters
.push(Box::new(move |_broker, state, delivery, token| {
Box::pin(async move {
Ok(spawn_dispatch(
subscriber, handler, token, name, state, delivery,
))
})
}));
self.handlers.push(meta);
}
pub fn subscribe<S, H>(&mut self, source: S, handler: H, meta: HandlerMetadata)
where
S: SubscriptionSource<B> + Send + 'static,
S::Subscriber: Send + 'static,
H: Handler<<S::Subscriber as Subscriber>::Message> + 'static,
{
let handler = Arc::new(handler);
let name: Arc<str> = Arc::from(meta.name.as_ref());
self.starters
.push(Box::new(move |broker: Arc<B>, state, delivery, token| {
Box::pin(async move {
let subscriber = source
.subscribe(broker.as_ref())
.await
.map_err(|e| Box::new(e) as BoxError)?;
Ok(spawn_dispatch(
subscriber, handler, token, name, state, delivery,
))
})
}));
self.handlers.push(meta);
}
/// Registers a subscriber definition using `crate::codec::DefaultCodec`.
///
/// Only compiled when at least one of the `json`, `cbor` or `msgpack`
/// features is enabled. Use [`Router::with_codec`] to register a definition
/// with an explicitly chosen codec instead.
#[cfg(any(feature = "json", feature = "cbor", feature = "msgpack"))]
pub fn include<D>(&mut self, def: D)
where
    D: SubscriberDef,
    D::Source: SubscriptionSource<B> + Send + 'static,
    <D::Source as SubscriptionSource<B>>::Subscriber: Send + 'static,
    <<D::Source as SubscriptionSource<B>>::Subscriber as Subscriber>::Message: 'static,
    D::Input: DeserializeOwned + Send + Sync + 'static,
    D::Handler: 'static,
{
    let default_codec = crate::codec::DefaultCodec::default();
    self.mount(def, default_codec);
}
/// Shared implementation behind the `include` entry points.
///
/// Builds typed metadata named after the definition's source, attaches the
/// description and payload schema when the definition provides them, wraps
/// the definition's handler with `typed(codec, ..)`, and registers the result
/// through [`Router::subscribe`].
fn mount<D, C>(&mut self, def: D, codec: C)
where
    D: SubscriberDef,
    D::Source: SubscriptionSource<B> + Send + 'static,
    <D::Source as SubscriptionSource<B>>::Subscriber: Send + 'static,
    <<D::Source as SubscriptionSource<B>>::Subscriber as Subscriber>::Message: 'static,
    D::Input: DeserializeOwned + Send + Sync + 'static,
    D::Handler: 'static,
    C: Codec + 'static,
{
    let source = def.source();

    let base = HandlerMetadata::typed::<D::Input>(source.name().to_owned());
    let described = match def.description() {
        Some(text) => base.with_description(text.to_owned()),
        None => base,
    };
    let meta = match def.input_schema() {
        Some(schema) => described.with_payload_schema(schema),
        None => described,
    };

    // `into_handler` consumes the definition, so it must come after the metadata reads.
    let typed_handler = typed(codec, def.into_handler());
    self.subscribe(source, typed_handler, meta);
}
/// Registers a publishing definition together with the publisher it replies through.
///
/// The handler's codec is a clone of the publisher's own codec (hence the
/// `PC: Clone` bound). Use [`Router::with_codec`] to supply a different one.
pub fn include_publishing<D, P, PC, PL>(&mut self, def: D, publisher: TypedPublisher<P, PC, PL>)
where
    D: PublishingDef + 'static,
    D::Source: SubscriptionSource<B> + Send + 'static,
    <D::Source as SubscriptionSource<B>>::Subscriber: Send + 'static,
    <<D::Source as SubscriptionSource<B>>::Subscriber as Subscriber>::Message: 'static,
    D::Input: DeserializeOwned + Send + Sync + 'static,
    D::Reply: Serialize + Send + Sync + 'static,
    P: Publisher + 'static,
    PC: Codec + Clone + 'static,
    PL: PublishLayer + 'static,
{
    // Clone first: `publisher` is moved into `mount_publishing` by the same call.
    let publisher_codec = publisher.codec().clone();
    self.mount_publishing(def, publisher_codec, publisher);
}
/// Shared implementation behind the `include_publishing` entry points.
///
/// Builds typed metadata named after the definition's source, records the
/// reply type name as the output type, and attaches the description and
/// payload schema when present. It then registers a `PublishingHandler`
/// holding the definition, `codec`, `publisher` and an empty middleware
/// pipeline through [`Router::subscribe`].
fn mount_publishing<D, C, P, PC, PL>(
    &mut self,
    def: D,
    codec: C,
    publisher: TypedPublisher<P, PC, PL>,
) where
    D: PublishingDef + 'static,
    D::Source: SubscriptionSource<B> + Send + 'static,
    <D::Source as SubscriptionSource<B>>::Subscriber: Send + 'static,
    <<D::Source as SubscriptionSource<B>>::Subscriber as Subscriber>::Message: 'static,
    D::Input: DeserializeOwned + Send + Sync + 'static,
    D::Reply: Serialize + Send + Sync + 'static,
    C: Codec + 'static,
    P: Publisher + 'static,
    PC: Codec + 'static,
    PL: PublishLayer + 'static,
{
    // Gather everything the metadata needs from the definition up front.
    let source = def.source();
    let owned_description = def.description().map(str::to_owned);
    let payload_schema = def.input_schema();

    let base = HandlerMetadata::typed::<D::Input>(source.name().to_owned())
        .with_output_type(std::any::type_name::<D::Reply>());
    let described = match owned_description {
        Some(text) => base.with_description(text),
        None => base,
    };
    let meta = match payload_schema {
        Some(schema) => described.with_payload_schema(schema),
        None => described,
    };

    // No publish middleware is installed at this level.
    let empty_pipeline: Arc<[Arc<dyn PublishMiddleware>]> = Arc::from([]);
    let publishing_handler = PublishingHandler {
        def,
        codec,
        publisher,
        pipeline: empty_pipeline,
    };
    self.subscribe(source, publishing_handler, meta);
}
/// Returns a scoped registrar that mounts definitions with an explicit `codec`.
///
/// The returned [`RouterCodec`] mutably borrows this router and hands a clone
/// of `codec` to each definition registered through its `include` and
/// `include_publishing` methods.
///
/// This call registers nothing on its own, so dropping the result unused is
/// almost certainly a mistake; hence the `#[must_use]`.
#[must_use = "the returned `RouterCodec` does nothing until a definition is included through it"]
pub fn with_codec<C>(&mut self, codec: C) -> RouterCodec<'_, B, C>
where
    C: Codec + Clone + 'static,
{
    RouterCodec {
        router: self,
        codec,
    }
}
pub fn merge(&mut self, other: Self) {
self.starters.extend(other.starters);
self.handlers.extend(other.handlers);
}
/// Returns the metadata of every registered handler, in registration order.
#[must_use]
pub fn handlers(&self) -> &[HandlerMetadata] {
    self.handlers.as_slice()
}
pub(crate) fn into_parts(self) -> (Vec<BoundStarter<B>>, Vec<HandlerMetadata>) {
(self.starters, self.handlers)
}
}
/// Scoped registrar returned by `Router::with_codec`.
///
/// Holds a mutable borrow of the router plus the codec that is cloned for
/// each definition registered through it.
pub struct RouterCodec<'a, B, C> {
/// Router that receives the registrations.
router: &'a mut Router<B>,
/// Codec cloned into every definition mounted through this value.
codec: C,
}
/// Opaque debug output: prints the type name with no fields.
///
/// No bounds are placed on `B` or `C`, so neither can be formatted here.
impl<B, C> std::fmt::Debug for RouterCodec<'_, B, C> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("RouterCodec");
        out.finish_non_exhaustive()
    }
}
impl<B: Broker + 'static, C: Codec + Clone + 'static> RouterCodec<'_, B, C> {
/// Registers a subscriber definition on the borrowed router, using a clone
/// of this registrar's codec.
pub fn include<D>(&mut self, def: D)
where
    D: SubscriberDef,
    D::Source: SubscriptionSource<B> + Send + 'static,
    <D::Source as SubscriptionSource<B>>::Subscriber: Send + 'static,
    <<D::Source as SubscriptionSource<B>>::Subscriber as Subscriber>::Message: 'static,
    D::Input: DeserializeOwned + Send + Sync + 'static,
    D::Handler: 'static,
{
    let codec = self.codec.clone();
    self.router.mount(def, codec);
}
/// Registers a publishing definition on the borrowed router.
///
/// The handler is given a clone of this registrar's codec. The publisher
/// keeps its own codec type `PC`, which need not be the same as `C`.
pub fn include_publishing<D, P, PC, PL>(&mut self, def: D, publisher: TypedPublisher<P, PC, PL>)
where
    D: PublishingDef + 'static,
    D::Source: SubscriptionSource<B> + Send + 'static,
    <D::Source as SubscriptionSource<B>>::Subscriber: Send + 'static,
    <<D::Source as SubscriptionSource<B>>::Subscriber as Subscriber>::Message: 'static,
    D::Input: DeserializeOwned + Send + Sync + 'static,
    D::Reply: Serialize + Send + Sync + 'static,
    P: Publisher + 'static,
    PC: Codec + 'static,
    PL: PublishLayer + 'static,
{
    let codec = self.codec.clone();
    self.router.mount_publishing(def, codec, publisher);
}
}