use std::marker::PhantomData;
use std::sync::Arc;
use serde::Serialize;
use serde::de::DeserializeOwned;
use crate::codec::Codec;
use crate::{BatchSubscriber, Broker, Publisher, Subscriber, SubscriptionSource};
use crate::runtime::batch::{BatchDef, SliceHandler, batch_metadata, typed_batch};
use crate::runtime::batch_publishing::{
BatchPublishingDef, BatchPublishingHandler, batch_publishing_metadata,
};
use crate::runtime::dispatch::Workers;
use crate::runtime::failure::FailurePolicies;
use crate::runtime::handler::Handler;
use crate::runtime::metadata::HandlerMetadata;
use crate::runtime::middleware::{BlanketLayer, Identity, Stack};
use crate::runtime::publish::{PublishLayer, PublishMiddleware, ReplyPublisher, TypedPublisher};
use crate::runtime::publishing::{PublishingDef, PublishingHandler, publishing_metadata};
use crate::runtime::subscriber_def::{SubscriberDef, subscriber_metadata};
use crate::runtime::typed::typed;
use super::routes::{BatchRoute, HandleRoute, MountRoute, RouterDef, SubscribeRoute};
use super::sink::RouterSink;
use super::{
BatchPublishingRouter, IncludedBatchRouter, IncludedRouter, MergedRouter, PublishingRouter,
SourceMessage, SubscribedBatchRouter,
};
/// Builder that accumulates routes, a codec value and a layer stack for broker type `B`.
///
/// `R` is the route list: it starts as `()` and every registration wraps it in a
/// tuple `(new_route, previous_routes)`. `C` is the codec slot (`()` by default)
/// and `L` is the layer stack (`Identity` by default).
pub struct Router<B, R = (), C = (), L = Identity> {
/// Routes registered so far, newest entry at the head of the nested tuple.
pub(super) routes: R,
/// Codec value installed by `with_codec`.
pub(super) codec: C,
/// Layer stack; each `layer` call wraps the previous stack in a new `Stack`.
pub(super) layers: L,
/// Ties the router to broker type `B` without storing a value of that type.
pub(super) _broker: PhantomData<fn() -> B>,
}
impl<B: Broker + 'static> Default for Router<B, (), (), Identity> {
fn default() -> Self {
Self {
routes: (),
codec: (),
layers: Identity,
_broker: PhantomData,
}
}
}
// `Debug` is implemented without bounds on the type parameters, so it prints
// only the type name and hides every field.
impl<B, R, C, L> std::fmt::Debug for Router<B, R, C, L> {
    /// Writes `Router { .. }` regardless of the router's contents.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("Router");
        repr.finish_non_exhaustive()
    }
}
impl<B> Router<B, ()>
where
    B: Broker + 'static,
{
    /// Creates an empty router; this is the same value `Default::default()` yields.
    #[must_use]
    pub fn new() -> Self {
        <Self as Default>::default()
    }
}
impl<B: Broker + 'static, R, RC, RL> Router<B, R, RC, RL> {
/// Replaces the codec slot with `codec`, keeping the routes and the layer stack.
///
/// The codec type of the returned router becomes `C`.
#[must_use]
pub fn with_codec<C>(self, codec: C) -> Router<B, R, C, RL> {
    // The previous codec is left behind in `self` and discarded with it.
    let Router { routes, layers, .. } = self;
    Router {
        routes,
        codec,
        layers,
        _broker: PhantomData,
    }
}
/// Adds `layer` by wrapping the current layer stack in a new `Stack`.
///
/// Routes and codec are carried over unchanged.
#[must_use]
pub fn layer<N>(self, layer: N) -> Router<B, R, RC, Stack<N, RL>> {
    let Router {
        routes,
        codec,
        layers: existing,
        ..
    } = self;
    Router {
        routes,
        codec,
        layers: Stack::new(layer, existing),
        _broker: PhantomData,
    }
}
/// Nests `other` into this router as a single entry at the head of the route list.
///
/// `other` is stored whole, so it keeps its own codec and layers; this router's
/// codec and layer stack are carried over unchanged.
#[must_use]
pub fn merge<R2, C2, L2>(
    self,
    other: Router<B, R2, C2, L2>,
) -> MergedRouter<B, R2, C2, L2, RC, RL, R>
where
    R2: RouterDef<B>,
    L2: BlanketLayer,
{
    let Router {
        routes: existing,
        codec,
        layers,
        ..
    } = self;
    Router {
        routes: (other, existing),
        codec,
        layers,
        _broker: PhantomData,
    }
}
/// Registers `handler` for the messages of `subscriber`.
///
/// The new route is placed at the head of the route list with
/// `FailurePolicies::default()`; `meta` is stored as the route's handler metadata.
/// Codec and layer stack are carried over unchanged.
///
/// The router is consumed and returned with a new route-list type, so the
/// result must be used: dropping it discards every route registered so far.
#[must_use]
pub fn handle<S, H>(
    self,
    subscriber: S,
    handler: H,
    meta: HandlerMetadata,
) -> Router<B, (HandleRoute<S, H>, R), RC, RL>
where
    S: Subscriber + Send + 'static,
    H: Handler<S::Message> + 'static,
{
    let route = HandleRoute {
        subscriber,
        handler,
        meta,
        policies: FailurePolicies::default(),
    };
    Router {
        routes: (route, self.routes),
        codec: self.codec,
        layers: self.layers,
        _broker: PhantomData,
    }
}
/// Registers `handler` for the messages produced by subscription `source`.
///
/// The new route is placed at the head of the route list with
/// `FailurePolicies::default()` and `Workers::sequential()`; `meta` is stored as
/// the route's handler metadata. The worker setting of this newest route can be
/// replaced by calling `workers` on the returned router.
///
/// The router is consumed and returned with a new route-list type, so the
/// result must be used: dropping it discards every route registered so far.
#[must_use]
pub fn subscribe<S, H>(
    self,
    source: S,
    handler: H,
    meta: HandlerMetadata,
) -> Router<B, (SubscribeRoute<S, H>, R), RC, RL>
where
    S: SubscriptionSource<B> + Send + 'static,
    S::Subscriber: Send + 'static,
    H: Handler<SourceMessage<B, S>> + 'static,
{
    let route = SubscribeRoute {
        source,
        handler,
        meta,
        policies: FailurePolicies::default(),
        workers: Workers::sequential(),
    };
    Router {
        routes: (route, self.routes),
        codec: self.codec,
        layers: self.layers,
        _broker: PhantomData,
    }
}
/// Pushes a batch route for `source` whose handler is `typed_batch(codec, handler)`.
///
/// The route uses `FailurePolicies::default()` and `Workers::sequential()`; `meta`
/// is stored as its handler metadata. The router's own codec and layers are kept.
pub(super) fn push_batch_route<S, T, C, H>(
    self,
    source: S,
    handler: H,
    codec: C,
    meta: HandlerMetadata,
) -> SubscribedBatchRouter<B, S, T, C, H, RC, RL, R>
where
    S: SubscriptionSource<B> + Send + 'static,
    S::Subscriber: BatchSubscriber + Send + 'static,
    T: DeserializeOwned + Send + Sync + 'static,
    C: Codec + 'static,
    H: SliceHandler<T> + 'static,
{
    let route = BatchRoute {
        source,
        handler: typed_batch(codec, handler),
        meta,
        policies: FailurePolicies::default(),
        workers: Workers::sequential(),
    };
    let Router {
        routes: earlier,
        codec: router_codec,
        layers,
        ..
    } = self;
    Router {
        routes: (route, earlier),
        codec: router_codec,
        layers,
        _broker: PhantomData,
    }
}
/// Mounts the subscriber definition `def` on `source`.
///
/// Metadata comes from `subscriber_metadata` (source name plus the definition);
/// failure policies and worker configuration are read from `def`. The
/// definition's handler is wrapped with `typed(codec, ..)` and given the decode
/// policy through `on_decode_failure`.
pub(super) fn mount_subscriber<S, D, C>(
    self,
    source: S,
    def: D,
    codec: C,
) -> IncludedRouter<B, S, D, C, RC, RL, R>
where
    S: SubscriptionSource<B> + Send + 'static,
    S::Subscriber: Send + 'static,
    D: SubscriberDef,
    D::Input: DeserializeOwned + Send + Sync + 'static,
    D::Handler: 'static,
    C: Codec + 'static,
{
    // Everything that only borrows `def` is read before `into_handler` consumes it.
    let metadata = subscriber_metadata(source.name().to_owned(), &def);
    let failure = def.failure_policies();
    let concurrency = def.workers();
    let decoding_handler = typed(codec, def.into_handler()).on_decode_failure(failure.decode);

    let route = SubscribeRoute {
        source,
        handler: decoding_handler,
        meta: metadata,
        policies: failure,
        workers: concurrency,
    };
    let Router {
        routes: earlier,
        codec: router_codec,
        layers,
        ..
    } = self;
    Router {
        routes: (route, earlier),
        codec: router_codec,
        layers,
        _broker: PhantomData,
    }
}
/// Mounts the batch definition `def` on `source`.
///
/// Metadata comes from `batch_metadata` (source name plus the definition);
/// failure policies and worker configuration are read from `def`. The
/// definition's handler is wrapped with `typed_batch(codec, ..)` and given the
/// decode policy through `with_decode`.
pub(super) fn mount_batch<S, D, C>(
    self,
    source: S,
    def: D,
    codec: C,
) -> IncludedBatchRouter<B, S, D, C, RC, RL, R>
where
    S: SubscriptionSource<B> + Send + 'static,
    S::Subscriber: BatchSubscriber + Send + 'static,
    D: BatchDef,
    D::Input: DeserializeOwned + Send + Sync + 'static,
    D::Handler: 'static,
    C: Codec + 'static,
{
    // Everything that only borrows `def` is read before `into_handler` consumes it.
    let metadata = batch_metadata(source.name().to_owned(), &def);
    let failure = def.failure_policies();
    let concurrency = def.workers();
    let decoding_handler = typed_batch(codec, def.into_handler()).with_decode(failure.decode);

    let route = BatchRoute {
        source,
        handler: decoding_handler,
        meta: metadata,
        policies: failure,
        workers: concurrency,
    };
    let Router {
        routes: earlier,
        codec: router_codec,
        layers,
        ..
    } = self;
    Router {
        routes: (route, earlier),
        codec: router_codec,
        layers,
        _broker: PhantomData,
    }
}
/// Mounts the batch-publishing definition `def` on `source`, with replies sent
/// through `publisher`.
///
/// Metadata comes from `batch_publishing_metadata`; failure policies and worker
/// configuration are read from `def`. The route's handler is a
/// `BatchPublishingHandler` owning `def`, `codec`, `publisher`, an empty
/// publish-middleware pipeline and the decode policy.
pub(super) fn mount_batch_publishing<S, D, C, RP>(
    self,
    source: S,
    def: D,
    codec: C,
    publisher: RP,
) -> BatchPublishingRouter<B, S, D, C, RP, RC, RL, R>
where
    S: SubscriptionSource<B> + Send + 'static,
    S::Subscriber: BatchSubscriber + Send + 'static,
    D: BatchPublishingDef + 'static,
    D::Input: DeserializeOwned + Send + Sync + 'static,
    D::Reply: Serialize + Send + Sync + 'static,
    C: Codec + 'static,
    RP: ReplyPublisher + 'static,
{
    let metadata = batch_publishing_metadata(source.name().to_owned(), &def);
    let failure = def.failure_policies();
    let concurrency = def.workers();
    // The pipeline starts with no publish middleware attached.
    let pipeline: Arc<[Arc<dyn PublishMiddleware>]> = Arc::from([]);

    let route = BatchRoute {
        source,
        handler: BatchPublishingHandler {
            def,
            codec,
            publisher,
            pipeline,
            decode: failure.decode,
        },
        meta: metadata,
        policies: failure,
        workers: concurrency,
    };
    let Router {
        routes: earlier,
        codec: router_codec,
        layers,
        ..
    } = self;
    Router {
        routes: (route, earlier),
        codec: router_codec,
        layers,
        _broker: PhantomData,
    }
}
/// Mounts the publishing definition `def` on `source`, with replies sent through
/// the typed `publisher`.
///
/// Metadata comes from `publishing_metadata`; failure policies and worker
/// configuration are read from `def`. The route's handler is a
/// `PublishingHandler` owning `def`, `codec`, `publisher`, an empty
/// publish-middleware pipeline and the decode policy.
pub(super) fn mount_publishing<S, D, C, P, PC, PL>(
    self,
    source: S,
    def: D,
    codec: C,
    publisher: TypedPublisher<P, PC, PL>,
) -> PublishingRouter<B, S, D, C, P, PC, PL, RC, RL, R>
where
    S: SubscriptionSource<B> + Send + 'static,
    S::Subscriber: Send + 'static,
    D: PublishingDef + 'static,
    D::Input: DeserializeOwned + Send + Sync + 'static,
    D::Reply: Serialize + Send + Sync + 'static,
    C: Codec + 'static,
    P: Publisher + 'static,
    PC: Codec + 'static,
    PL: PublishLayer + 'static,
{
    let metadata = publishing_metadata(source.name().to_owned(), &def);
    let failure = def.failure_policies();
    let concurrency = def.workers();
    // The pipeline starts with no publish middleware attached.
    let pipeline: Arc<[Arc<dyn PublishMiddleware>]> = Arc::from([]);

    let route = SubscribeRoute {
        source,
        handler: PublishingHandler {
            def,
            codec,
            publisher,
            pipeline,
            decode: failure.decode,
        },
        meta: metadata,
        policies: failure,
        workers: concurrency,
    };
    let Router {
        routes: earlier,
        codec: router_codec,
        layers,
        ..
    } = self;
    Router {
        routes: (route, earlier),
        codec: router_codec,
        layers,
        _broker: PhantomData,
    }
}
}
// Available only while the newest route (head of the route list) is a `SubscribeRoute`.
impl<B, S, H, R, RC, RL> Router<B, (SubscribeRoute<S, H>, R), RC, RL> {
    /// Replaces the worker configuration of the most recently added route.
    ///
    /// Older routes, the codec and the layers are left untouched.
    #[must_use]
    pub fn workers(mut self, workers: Workers) -> Self {
        let (head, _) = &mut self.routes;
        head.workers = workers;
        self
    }
}
// Available only while the newest route (head of the route list) is a `BatchRoute`.
impl<B, S, H, R, RC, RL> Router<B, (BatchRoute<S, H>, R), RC, RL> {
    /// Replaces the worker configuration of the most recently added batch route.
    ///
    /// Older routes, the codec and the layers are left untouched.
    #[must_use]
    pub fn workers(mut self, workers: Workers) -> Self {
        let newest = &mut self.routes.0;
        newest.workers = workers;
        self
    }
}
impl<B, R, C, L> Router<B, R, C, L>
where
    B: Broker + 'static,
    R: RouterDef<B>,
{
    /// Returns the metadata the route list reports through `collect_handlers`,
    /// gathered into a fresh vector.
    #[must_use]
    pub fn handlers(&self) -> Vec<HandlerMetadata> {
        let mut collected = Vec::new();
        <R as RouterDef<B>>::collect_handlers(&self.routes, &mut collected);
        collected
    }
}
/// Pairs two borrowed blanket layers so they can be applied as a single layer.
///
/// Its `BlanketLayer` impl applies `inner` to the handler first and then `outer`
/// to the result.
struct ComposedBlanket<'a, Outer, Inner> {
/// Layer applied last, around the output of `inner`.
outer: &'a Outer,
/// Layer applied first, directly to the handler.
inner: &'a Inner,
}
impl<Outer, Inner> BlanketLayer for ComposedBlanket<'_, Outer, Inner>
where
    Outer: BlanketLayer,
    Inner: BlanketLayer,
{
    /// Wraps `handler` with the inner layer, then wraps that result with the outer layer.
    fn apply<M, H>(&self, handler: H) -> impl Handler<M> + 'static
    where
        M: Send + Sync + 'static,
        H: Handler<M> + 'static,
    {
        let inner_wrapped = self.inner.apply::<M, _>(handler);
        self.outer.apply::<M, _>(inner_wrapped)
    }
}
// A router is itself a route definition, which is what lets `merge` nest one
// router inside another.
impl<B, R, C, L> RouterDef<B> for Router<B, R, C, L>
where
    B: Broker + 'static,
    R: RouterDef<B>,
    L: BlanketLayer,
{
    /// Mounts this router's routes into `sink`.
    ///
    /// The routes see a combined layer in which this router's own layers are
    /// applied to a handler first and `global` is applied around them.
    fn mount<G: BlanketLayer>(self, global: &G, sink: &mut RouterSink<B>) {
        let layered = ComposedBlanket {
            outer: global,
            inner: &self.layers,
        };
        <R as RouterDef<B>>::mount(self.routes, &layered, sink);
    }

    /// Appends the metadata of this router's routes to `out`.
    fn collect_handlers(&self, out: &mut Vec<HandlerMetadata>) {
        <R as RouterDef<B>>::collect_handlers(&self.routes, out);
    }
}
// Lets a whole router sit in a route list as a single entry.
impl<B, R, C, L> MountRoute<B> for Router<B, R, C, L>
where
    B: Broker + 'static,
    R: RouterDef<B>,
    L: BlanketLayer,
{
    /// Mounts the router by delegating to its `RouterDef::mount` implementation.
    fn mount_one<G: BlanketLayer>(self, global: &G, sink: &mut RouterSink<B>) {
        <Self as RouterDef<B>>::mount(self, global, sink);
    }

    /// Appends the metadata of this router's routes to `out`.
    fn collect(&self, out: &mut Vec<HandlerMetadata>) {
        <R as RouterDef<B>>::collect_handlers(&self.routes, out);
    }
}