use std::marker::PhantomData;
use std::sync::Arc;
use serde::Serialize;
use serde::de::DeserializeOwned;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use crate::codec::Codec;
use crate::{Broker, Publisher, Subscriber, SubscriptionSource};
use super::context::State;
use super::dispatch::{Delivery, spawn_dispatch};
use super::handler::Handler;
use super::lifecycle::{BoxError, BoxFuture};
use super::metadata::HandlerMetadata;
use super::middleware::{BlanketLayer, Identity, Stack};
use super::publish::{PublishLayer, PublishMiddleware, TypedPublisher};
use super::publishing::{PublishingDef, PublishingHandler, publishing_metadata};
use super::subscriber_def::{SubscriberDef, subscriber_metadata};
use super::typed::{Typed, typed};
pub(crate) type BoundStarter<B> = Box<
dyn FnOnce(
Arc<B>,
Arc<State>,
Arc<Delivery>,
CancellationToken,
) -> BoxFuture<'static, Result<JoinHandle<()>, BoxError>>
+ Send,
>;
type SourceMessage<B, S> = <<S as SubscriptionSource<B>>::Subscriber as Subscriber>::Message;
type TypedRoute<B, S, D, C> = SubscribeRoute<
S,
Typed<SourceMessage<B, S>, <D as SubscriberDef>::Input, C, <D as SubscriberDef>::Handler>,
>;
type IncludedRouter<B, S, D, C, RC, RL, R> = Router<B, (TypedRoute<B, S, D, C>, R), RC, RL>;
type PublishingRouter<B, S, D, C, P, PC, PL, RC, RL, R> =
Router<B, (SubscribeRoute<S, PublishingHandler<D, C, P, PC, PL>>, R), RC, RL>;
type MergedRouter<B, R2, C2, L2, RC, RL, R> = Router<B, (Router<B, R2, C2, L2>, R), RC, RL>;
pub struct RouterSink<B> {
starters: Vec<BoundStarter<B>>,
handlers: Vec<HandlerMetadata>,
}
impl<B> std::fmt::Debug for RouterSink<B> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RouterSink")
.field("handlers", &self.handlers.len())
.finish_non_exhaustive()
}
}
impl<B: Broker + 'static> RouterSink<B> {
pub(crate) fn new() -> Self {
Self {
starters: Vec::new(),
handlers: Vec::new(),
}
}
pub(crate) fn push_handle<S, H>(&mut self, subscriber: S, handler: H, meta: HandlerMetadata)
where
S: Subscriber + Send + 'static,
H: Handler<S::Message> + 'static,
{
let handler = Arc::new(handler);
let name: Arc<str> = Arc::from(meta.name.as_ref());
self.starters
.push(Box::new(move |_broker, state, delivery, token| {
Box::pin(async move {
Ok(spawn_dispatch(
subscriber, handler, token, name, state, delivery,
))
})
}));
self.handlers.push(meta);
}
pub(crate) fn push_subscribe<S, H>(&mut self, source: S, handler: H, meta: HandlerMetadata)
where
S: SubscriptionSource<B> + Send + 'static,
S::Subscriber: Send + 'static,
H: Handler<SourceMessage<B, S>> + 'static,
{
let handler = Arc::new(handler);
let name: Arc<str> = Arc::from(meta.name.as_ref());
self.starters
.push(Box::new(move |broker: Arc<B>, state, delivery, token| {
Box::pin(async move {
let subscriber = source
.subscribe(broker.as_ref())
.await
.map_err(|e| Box::new(e) as BoxError)?;
Ok(spawn_dispatch(
subscriber, handler, token, name, state, delivery,
))
})
}));
self.handlers.push(meta);
}
pub(crate) fn into_parts(self) -> (Vec<BoundStarter<B>>, Vec<HandlerMetadata>) {
(self.starters, self.handlers)
}
}
#[doc(hidden)]
#[derive(Debug)]
pub struct SubscribeRoute<S, H> {
source: S,
handler: H,
meta: HandlerMetadata,
}
#[doc(hidden)]
#[derive(Debug)]
pub struct HandleRoute<S, H> {
subscriber: S,
handler: H,
meta: HandlerMetadata,
}
trait MountRoute<B> {
fn mount_one<G: BlanketLayer>(self, global: &G, sink: &mut RouterSink<B>);
fn collect(&self, out: &mut Vec<HandlerMetadata>);
}
impl<B, S, H> MountRoute<B> for SubscribeRoute<S, H>
where
B: Broker + 'static,
S: SubscriptionSource<B> + Send + 'static,
S::Subscriber: Send + 'static,
SourceMessage<B, S>: Send + Sync + 'static,
H: Handler<SourceMessage<B, S>> + 'static,
{
fn mount_one<G: BlanketLayer>(self, global: &G, sink: &mut RouterSink<B>) {
let handler = global.apply::<SourceMessage<B, S>, H>(self.handler);
sink.push_subscribe(self.source, handler, self.meta);
}
fn collect(&self, out: &mut Vec<HandlerMetadata>) {
out.push(self.meta.clone());
}
}
impl<B, S, H> MountRoute<B> for HandleRoute<S, H>
where
B: Broker + 'static,
S: Subscriber + Send + 'static,
S::Message: Send + Sync + 'static,
H: Handler<S::Message> + 'static,
{
fn mount_one<G: BlanketLayer>(self, global: &G, sink: &mut RouterSink<B>) {
let handler = global.apply::<S::Message, H>(self.handler);
sink.push_handle(self.subscriber, handler, self.meta);
}
fn collect(&self, out: &mut Vec<HandlerMetadata>) {
out.push(self.meta.clone());
}
}
pub trait RouterDef<B> {
#[doc(hidden)]
fn mount<G: BlanketLayer>(self, global: &G, sink: &mut RouterSink<B>);
#[doc(hidden)]
fn collect_handlers(&self, out: &mut Vec<HandlerMetadata>);
}
impl<B: Broker + 'static> RouterDef<B> for () {
fn mount<G: BlanketLayer>(self, _global: &G, _sink: &mut RouterSink<B>) {}
fn collect_handlers(&self, _out: &mut Vec<HandlerMetadata>) {}
}
impl<B, Head, Tail> RouterDef<B> for (Head, Tail)
where
B: Broker + 'static,
Head: MountRoute<B>,
Tail: RouterDef<B>,
{
fn mount<G: BlanketLayer>(self, global: &G, sink: &mut RouterSink<B>) {
self.1.mount(global, sink);
self.0.mount_one(global, sink);
}
fn collect_handlers(&self, out: &mut Vec<HandlerMetadata>) {
self.1.collect_handlers(out);
self.0.collect(out);
}
}
pub struct Router<B, R = (), C = (), L = Identity> {
routes: R,
codec: C,
layers: L,
_broker: PhantomData<fn() -> B>,
}
impl<B: Broker + 'static> Default for Router<B, (), (), Identity> {
fn default() -> Self {
Self {
routes: (),
codec: (),
layers: Identity,
_broker: PhantomData,
}
}
}
impl<B, R, C, L> std::fmt::Debug for Router<B, R, C, L> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Router").finish_non_exhaustive()
}
}
impl<B: Broker + 'static> Router<B, ()> {
#[must_use]
pub fn new() -> Self {
Self::default()
}
}
impl<B: Broker + 'static, R, RC, RL> Router<B, R, RC, RL> {
#[must_use]
pub fn with_codec<C>(self, codec: C) -> Router<B, R, C, RL> {
Router {
routes: self.routes,
codec,
layers: self.layers,
_broker: PhantomData,
}
}
#[must_use]
pub fn layer<N>(self, layer: N) -> Router<B, R, RC, Stack<N, RL>> {
Router {
routes: self.routes,
codec: self.codec,
layers: Stack::new(layer, self.layers),
_broker: PhantomData,
}
}
#[must_use]
pub fn merge<R2, C2, L2>(
self,
other: Router<B, R2, C2, L2>,
) -> MergedRouter<B, R2, C2, L2, RC, RL, R>
where
R2: RouterDef<B>,
L2: BlanketLayer,
{
Router {
routes: (other, self.routes),
codec: self.codec,
layers: self.layers,
_broker: PhantomData,
}
}
pub fn handle<S, H>(
self,
subscriber: S,
handler: H,
meta: HandlerMetadata,
) -> Router<B, (HandleRoute<S, H>, R), RC, RL>
where
S: Subscriber + Send + 'static,
H: Handler<S::Message> + 'static,
{
Router {
routes: (
HandleRoute {
subscriber,
handler,
meta,
},
self.routes,
),
codec: self.codec,
layers: self.layers,
_broker: PhantomData,
}
}
pub fn subscribe<S, H>(
self,
source: S,
handler: H,
meta: HandlerMetadata,
) -> Router<B, (SubscribeRoute<S, H>, R), RC, RL>
where
S: SubscriptionSource<B> + Send + 'static,
S::Subscriber: Send + 'static,
H: Handler<SourceMessage<B, S>> + 'static,
{
Router {
routes: (
SubscribeRoute {
source,
handler,
meta,
},
self.routes,
),
codec: self.codec,
layers: self.layers,
_broker: PhantomData,
}
}
fn mount_subscriber<S, D, C>(
self,
source: S,
def: D,
codec: C,
) -> IncludedRouter<B, S, D, C, RC, RL, R>
where
S: SubscriptionSource<B> + Send + 'static,
S::Subscriber: Send + 'static,
D: SubscriberDef,
D::Input: DeserializeOwned + Send + Sync + 'static,
D::Handler: 'static,
C: Codec + 'static,
{
let meta = subscriber_metadata(source.name().to_owned(), &def);
let handler = typed(codec, def.into_handler());
self.subscribe(source, handler, meta)
}
fn mount_publishing<S, D, C, P, PC, PL>(
self,
source: S,
def: D,
codec: C,
publisher: TypedPublisher<P, PC, PL>,
) -> PublishingRouter<B, S, D, C, P, PC, PL, RC, RL, R>
where
S: SubscriptionSource<B> + Send + 'static,
S::Subscriber: Send + 'static,
D: PublishingDef + 'static,
D::Input: DeserializeOwned + Send + Sync + 'static,
D::Reply: Serialize + Send + Sync + 'static,
C: Codec + 'static,
P: Publisher + 'static,
PC: Codec + 'static,
PL: PublishLayer + 'static,
{
let meta = publishing_metadata(source.name().to_owned(), &def);
let pipeline: Arc<[Arc<dyn PublishMiddleware>]> = Arc::from([]);
let handler = PublishingHandler {
def,
codec,
publisher,
pipeline,
};
self.subscribe(source, handler, meta)
}
}
impl<B: Broker + 'static, R, RL> Router<B, R, (), RL> {
#[cfg(any(feature = "json", feature = "cbor", feature = "msgpack"))]
pub fn include<D>(
self,
def: D,
) -> IncludedRouter<B, D::Source, D, crate::codec::DefaultCodec, (), RL, R>
where
D: SubscriberDef,
D::Source: SubscriptionSource<B> + Send + 'static,
<D::Source as SubscriptionSource<B>>::Subscriber: Send + 'static,
D::Input: DeserializeOwned + Send + Sync + 'static,
D::Handler: 'static,
{
let source = def.source();
self.mount_subscriber(source, def, crate::codec::DefaultCodec::default())
}
#[cfg(any(feature = "json", feature = "cbor", feature = "msgpack"))]
pub fn include_on<S, D>(
self,
source: S,
def: D,
) -> IncludedRouter<B, S, D, crate::codec::DefaultCodec, (), RL, R>
where
S: SubscriptionSource<B> + Send + 'static,
S::Subscriber: Send + 'static,
D: SubscriberDef,
D::Input: DeserializeOwned + Send + Sync + 'static,
D::Handler: 'static,
{
self.mount_subscriber(source, def, crate::codec::DefaultCodec::default())
}
pub fn include_publishing<D, P, PC, PL>(
self,
def: D,
publisher: TypedPublisher<P, PC, PL>,
) -> PublishingRouter<B, D::Source, D, PC, P, PC, PL, (), RL, R>
where
D: PublishingDef + 'static,
D::Source: SubscriptionSource<B> + Send + 'static,
<D::Source as SubscriptionSource<B>>::Subscriber: Send + 'static,
D::Input: DeserializeOwned + Send + Sync + 'static,
D::Reply: Serialize + Send + Sync + 'static,
P: Publisher + 'static,
PC: Codec + Clone + 'static,
PL: PublishLayer + 'static,
{
let codec = publisher.codec().clone();
let source = def.source();
self.mount_publishing(source, def, codec, publisher)
}
pub fn include_publishing_on<S, D, P, PC, PL>(
self,
source: S,
def: D,
publisher: TypedPublisher<P, PC, PL>,
) -> PublishingRouter<B, S, D, PC, P, PC, PL, (), RL, R>
where
S: SubscriptionSource<B> + Send + 'static,
S::Subscriber: Send + 'static,
D: PublishingDef + 'static,
D::Input: DeserializeOwned + Send + Sync + 'static,
D::Reply: Serialize + Send + Sync + 'static,
P: Publisher + 'static,
PC: Codec + Clone + 'static,
PL: PublishLayer + 'static,
{
let codec = publisher.codec().clone();
self.mount_publishing(source, def, codec, publisher)
}
}
impl<B: Broker + 'static, R, C: Codec + Clone + 'static, RL> Router<B, R, C, RL> {
pub fn include<D>(self, def: D) -> IncludedRouter<B, D::Source, D, C, C, RL, R>
where
D: SubscriberDef,
D::Source: SubscriptionSource<B> + Send + 'static,
<D::Source as SubscriptionSource<B>>::Subscriber: Send + 'static,
D::Input: DeserializeOwned + Send + Sync + 'static,
D::Handler: 'static,
{
let codec = self.codec.clone();
let source = def.source();
self.mount_subscriber(source, def, codec)
}
pub fn include_on<S, D>(self, source: S, def: D) -> IncludedRouter<B, S, D, C, C, RL, R>
where
S: SubscriptionSource<B> + Send + 'static,
S::Subscriber: Send + 'static,
D: SubscriberDef,
D::Input: DeserializeOwned + Send + Sync + 'static,
D::Handler: 'static,
{
let codec = self.codec.clone();
self.mount_subscriber(source, def, codec)
}
pub fn include_publishing<D, P, PC, PL>(
self,
def: D,
publisher: TypedPublisher<P, PC, PL>,
) -> PublishingRouter<B, D::Source, D, C, P, PC, PL, C, RL, R>
where
D: PublishingDef + 'static,
D::Source: SubscriptionSource<B> + Send + 'static,
<D::Source as SubscriptionSource<B>>::Subscriber: Send + 'static,
D::Input: DeserializeOwned + Send + Sync + 'static,
D::Reply: Serialize + Send + Sync + 'static,
P: Publisher + 'static,
PC: Codec + 'static,
PL: PublishLayer + 'static,
{
let codec = self.codec.clone();
let source = def.source();
self.mount_publishing(source, def, codec, publisher)
}
pub fn include_publishing_on<S, D, P, PC, PL>(
self,
source: S,
def: D,
publisher: TypedPublisher<P, PC, PL>,
) -> PublishingRouter<B, S, D, C, P, PC, PL, C, RL, R>
where
S: SubscriptionSource<B> + Send + 'static,
S::Subscriber: Send + 'static,
D: PublishingDef + 'static,
D::Input: DeserializeOwned + Send + Sync + 'static,
D::Reply: Serialize + Send + Sync + 'static,
P: Publisher + 'static,
PC: Codec + 'static,
PL: PublishLayer + 'static,
{
let codec = self.codec.clone();
self.mount_publishing(source, def, codec, publisher)
}
}
impl<B: Broker + 'static, R: RouterDef<B>, C, L> Router<B, R, C, L> {
#[must_use]
pub fn handlers(&self) -> Vec<HandlerMetadata> {
let mut out = Vec::new();
self.routes.collect_handlers(&mut out);
out
}
}
struct ComposedBlanket<'a, Outer, Inner> {
outer: &'a Outer,
inner: &'a Inner,
}
impl<Outer: BlanketLayer, Inner: BlanketLayer> BlanketLayer for ComposedBlanket<'_, Outer, Inner> {
fn apply<M, H>(&self, handler: H) -> impl Handler<M> + 'static
where
M: Send + Sync + 'static,
H: Handler<M> + 'static,
{
self.outer.apply::<M, _>(self.inner.apply::<M, _>(handler))
}
}
impl<B, R, C, L> RouterDef<B> for Router<B, R, C, L>
where
B: Broker + 'static,
R: RouterDef<B>,
L: BlanketLayer,
{
fn mount<G: BlanketLayer>(self, global: &G, sink: &mut RouterSink<B>) {
let composed = ComposedBlanket {
outer: global,
inner: &self.layers,
};
self.routes.mount(&composed, sink);
}
fn collect_handlers(&self, out: &mut Vec<HandlerMetadata>) {
self.routes.collect_handlers(out);
}
}
impl<B, R, C, L> MountRoute<B> for Router<B, R, C, L>
where
B: Broker + 'static,
R: RouterDef<B>,
L: BlanketLayer,
{
fn mount_one<G: BlanketLayer>(self, global: &G, sink: &mut RouterSink<B>) {
RouterDef::mount(self, global, sink);
}
fn collect(&self, out: &mut Vec<HandlerMetadata>) {
self.routes.collect_handlers(out);
}
}