use serde::Serialize;
use serde::de::DeserializeOwned;
use crate::codec::Codec;
use crate::{BatchSubscriber, Broker, Publisher, Subscriber, SubscriptionSource};
use crate::runtime::batch::BatchHandler;
use crate::runtime::batch_publishing::{BatchPublishingCall, BatchPublishingHandler};
use crate::runtime::dispatch::Workers;
use crate::runtime::failure::FailurePolicies;
use crate::runtime::handler::Handler;
use crate::runtime::metadata::HandlerMetadata;
use crate::runtime::middleware::BlanketLayer;
use crate::runtime::publish::{PublishPipeline, PublishTransform, ReplyPublisher, TypedPublisher};
use crate::runtime::publishing::{PublishingCall, PublishingHandler};
use super::SourceMessage;
use super::sink::RouterSink;
/// Route description pairing a subscription source with a handler.
///
/// The `MountRoute` impl for this type wraps `handler` with the global layer and then
/// hands every field to `RouterSink::push_subscribe_workers`.
#[doc(hidden)]
#[derive(Debug)]
pub struct SubscribeRoute<S, H> {
/// Subscription source passed to the sink when the route is mounted.
pub(super) source: S,
/// Handler that the global layer is applied to at mount time.
pub(super) handler: H,
/// Metadata reported through `RouteMeta::collect` and passed to the sink.
pub(super) meta: HandlerMetadata,
/// Failure policies passed to the sink unchanged.
pub(super) policies: FailurePolicies,
/// Worker configuration passed to the sink unchanged.
pub(super) workers: Workers,
}
/// Route description pairing an already-built subscriber with a handler.
///
/// The `MountRoute` impl for this type wraps `handler` with the global layer and then
/// hands every field to `RouterSink::push_handle`. Unlike `SubscribeRoute` it carries no
/// `Workers` value.
#[doc(hidden)]
#[derive(Debug)]
pub struct HandleRoute<S, H> {
/// Subscriber passed to the sink when the route is mounted.
pub(super) subscriber: S,
/// Handler that the global layer is applied to at mount time.
pub(super) handler: H,
/// Metadata reported through `RouteMeta::collect` and passed to the sink.
pub(super) meta: HandlerMetadata,
/// Failure policies passed to the sink unchanged.
pub(super) policies: FailurePolicies,
}
/// Route description pairing a subscription source with a batch handler.
///
/// The `MountRoute` impl for this type hands every field to
/// `RouterSink::push_subscribe_batch`; the global layer is not applied to the handler.
#[doc(hidden)]
#[derive(Debug)]
pub struct BatchRoute<S, H> {
/// Subscription source passed to the sink when the route is mounted.
pub(super) source: S,
/// Batch handler passed to the sink as-is.
pub(super) handler: H,
/// Metadata reported through `RouteMeta::collect` and passed to the sink.
pub(super) meta: HandlerMetadata,
/// Failure policies passed to the sink unchanged.
pub(super) policies: FailurePolicies,
/// Worker configuration passed to the sink unchanged.
pub(super) workers: Workers,
}
/// A single route that knows how to register itself into a `RouterSink`.
///
/// `B` and `St` are the type parameters of the sink the route is mounted into.
pub(super) trait MountRoute<B, St> {
    /// Consumes the route and pushes it into `sink`.
    ///
    /// `global` is the blanket layer and `pipeline` the publish pipeline shared by the
    /// router; individual route kinds may ignore either of them.
    fn mount_one<G, PP>(self, global: &G, pipeline: &PP, sink: &mut RouterSink<B, St>)
    where
        G: BlanketLayer,
        PP: PublishPipeline + Clone + 'static;
}
/// Exposes the metadata of a single route.
///
/// Every route type in this file implements it by pushing a clone of its `meta` field.
pub(super) trait RouteMeta {
/// Appends this route's `HandlerMetadata` to `out`.
fn collect(&self, out: &mut Vec<HandlerMetadata>);
}
/// Metadata access for `SubscribeRoute`.
impl<S, H> RouteMeta for SubscribeRoute<S, H> {
    /// Appends a clone of the route's metadata to `out`.
    fn collect(&self, out: &mut Vec<HandlerMetadata>) {
        let Self { meta, .. } = self;
        out.push(meta.clone());
    }
}
/// Metadata access for `BatchRoute`.
impl<S, H> RouteMeta for BatchRoute<S, H> {
    /// Appends a clone of the route's metadata to `out`.
    fn collect(&self, out: &mut Vec<HandlerMetadata>) {
        let Self { meta, .. } = self;
        out.push(meta.clone());
    }
}
/// Metadata access for `HandleRoute`.
impl<S, H> RouteMeta for HandleRoute<S, H> {
    /// Appends a clone of the route's metadata to `out`.
    fn collect(&self, out: &mut Vec<HandlerMetadata>) {
        let Self { meta, .. } = self;
        out.push(meta.clone());
    }
}
/// Mounting for `SubscribeRoute`: the handler is wrapped by the global layer and the
/// route is registered through `RouterSink::push_subscribe_workers`.
impl<B, S, H, St> MountRoute<B, St> for SubscribeRoute<S, H>
where
    B: Broker + 'static,
    S: SubscriptionSource<B> + Send + 'static,
    S::Subscriber: Send + 'static,
    SourceMessage<B, S>: Send + Sync + 'static,
    St: Send + Sync + 'static,
    H: Handler<SourceMessage<B, S>, (), St> + 'static,
{
    /// Consumes the route and pushes it into `sink`; the publish pipeline is unused.
    fn mount_one<G, PP>(self, global: &G, _pipeline: &PP, sink: &mut RouterSink<B, St>)
    where
        G: BlanketLayer,
        PP: PublishPipeline + Clone + 'static,
    {
        let Self {
            source,
            handler,
            meta,
            policies,
            workers,
        } = self;

        // Apply the router-wide layer before the handler reaches the sink.
        let layered = global.apply::<SourceMessage<B, S>, (), St, H>(handler);
        sink.push_subscribe_workers(source, layered, meta, policies, workers);
    }
}
/// Mounting for `BatchRoute`: the route is registered through
/// `RouterSink::push_subscribe_batch` with its handler untouched.
impl<B, S, H, St> MountRoute<B, St> for BatchRoute<S, H>
where
    B: Broker + 'static,
    S: SubscriptionSource<B> + Send + 'static,
    S::Subscriber: BatchSubscriber + Send + 'static,
    SourceMessage<B, S>: Send + 'static,
    St: Send + Sync + 'static,
    H: BatchHandler<SourceMessage<B, S>, St> + 'static,
{
    /// Consumes the route and pushes it into `sink`; neither the global layer nor the
    /// publish pipeline is used.
    fn mount_one<G, PP>(self, _global: &G, _pipeline: &PP, sink: &mut RouterSink<B, St>)
    where
        G: BlanketLayer,
        PP: PublishPipeline + Clone + 'static,
    {
        let Self {
            source,
            handler,
            meta,
            policies,
            workers,
        } = self;

        sink.push_subscribe_batch(source, handler, meta, policies, workers);
    }
}
/// Mounting for `HandleRoute`: the handler is wrapped by the global layer and the route
/// is registered through `RouterSink::push_handle`.
impl<B, S, H, St> MountRoute<B, St> for HandleRoute<S, H>
where
    B: Broker + 'static,
    S: Subscriber + Send + 'static,
    S::Message: Send + Sync + 'static,
    St: Send + Sync + 'static,
    H: Handler<S::Message, (), St> + 'static,
{
    /// Consumes the route and pushes it into `sink`; the publish pipeline is unused.
    fn mount_one<G, PP>(self, global: &G, _pipeline: &PP, sink: &mut RouterSink<B, St>)
    where
        G: BlanketLayer,
        PP: PublishPipeline + Clone + 'static,
    {
        let Self {
            subscriber,
            handler,
            meta,
            policies,
        } = self;

        // Apply the router-wide layer before the handler reaches the sink.
        let layered = global.apply::<S::Message, (), St, H>(handler);
        sink.push_handle(subscriber, layered, meta, policies);
    }
}
/// Route description for a subscription whose handler also publishes.
///
/// The `MountRoute` impl for this type assembles a `PublishingHandler` from `def`,
/// `codec`, `publisher`, a clone of the router's publish pipeline and `policies.decode`,
/// wraps it with the global layer and registers it through
/// `RouterSink::push_subscribe_workers`.
#[doc(hidden)]
pub struct PublishingRoute<S, D, C, P, PC, PL> {
/// Subscription source passed to the sink when the route is mounted.
pub(super) source: S,
/// Call definition moved into the `PublishingHandler`.
pub(super) def: D,
/// Codec moved into the `PublishingHandler`.
pub(super) codec: C,
/// Typed publisher moved into the `PublishingHandler`.
pub(super) publisher: TypedPublisher<P, PC, PL>,
/// Metadata reported through `RouteMeta::collect` and passed to the sink.
pub(super) meta: HandlerMetadata,
/// Failure policies; `decode` is copied into the handler, the whole value goes to the sink.
pub(super) policies: FailurePolicies,
/// Worker configuration passed to the sink unchanged.
pub(super) workers: Workers,
}
/// Hand-written `Debug` that places no bounds on the type parameters and prints only
/// the `meta` field, marking the remaining fields as elided.
impl<S, D, C, P, PC, PL> std::fmt::Debug for PublishingRoute<S, D, C, P, PC, PL> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("PublishingRoute");
        repr.field("meta", &self.meta);
        repr.finish_non_exhaustive()
    }
}
/// Route description for a batch subscription whose handler also publishes.
///
/// The `MountRoute` impl for this type assembles a `BatchPublishingHandler` from `def`,
/// `codec`, `publisher`, a clone of the router's publish pipeline and `policies.decode`,
/// and registers it through `RouterSink::push_subscribe_batch`.
#[doc(hidden)]
pub struct BatchPublishingRoute<S, D, C, R> {
/// Subscription source passed to the sink when the route is mounted.
pub(super) source: S,
/// Call definition moved into the `BatchPublishingHandler`.
pub(super) def: D,
/// Codec moved into the `BatchPublishingHandler`.
pub(super) codec: C,
/// Reply publisher moved into the `BatchPublishingHandler`.
pub(super) publisher: R,
/// Metadata reported through `RouteMeta::collect` and passed to the sink.
pub(super) meta: HandlerMetadata,
/// Failure policies; `decode` is copied into the handler, the whole value goes to the sink.
pub(super) policies: FailurePolicies,
/// Worker configuration passed to the sink unchanged.
pub(super) workers: Workers,
}
/// Hand-written `Debug` that places no bounds on the type parameters and prints only
/// the `meta` field, marking the remaining fields as elided.
impl<S, D, C, R> std::fmt::Debug for BatchPublishingRoute<S, D, C, R> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("BatchPublishingRoute");
        repr.field("meta", &self.meta);
        repr.finish_non_exhaustive()
    }
}
/// Metadata access for `PublishingRoute`.
impl<S, D, C, P, PC, PL> RouteMeta for PublishingRoute<S, D, C, P, PC, PL> {
    /// Appends a clone of the route's metadata to `out`.
    fn collect(&self, out: &mut Vec<HandlerMetadata>) {
        let Self { meta, .. } = self;
        out.push(meta.clone());
    }
}
/// Metadata access for `BatchPublishingRoute`.
impl<S, D, C, R> RouteMeta for BatchPublishingRoute<S, D, C, R> {
    /// Appends a clone of the route's metadata to `out`.
    fn collect(&self, out: &mut Vec<HandlerMetadata>) {
        let Self { meta, .. } = self;
        out.push(meta.clone());
    }
}
/// Mounting for `PublishingRoute`: a `PublishingHandler` is assembled from the route's
/// parts, wrapped by the global layer and registered through
/// `RouterSink::push_subscribe_workers`.
impl<B, S, D, C, P, PC, PL, St> MountRoute<B, St> for PublishingRoute<S, D, C, P, PC, PL>
where
    B: Broker + 'static,
    S: SubscriptionSource<B> + Send + 'static,
    S::Subscriber: Send + 'static,
    SourceMessage<B, S>: Send + Sync + 'static,
    St: Send + Sync + 'static,
    D: PublishingCall<St> + 'static,
    D::Input: DeserializeOwned + Send + Sync + 'static,
    D::Reply: Serialize + Send + Sync + 'static,
    D::Context: crate::BuildContext<SourceMessage<B, S>> + Send + Sync + 'static,
    C: Codec + 'static,
    P: Publisher + 'static,
    PC: Codec + 'static,
    PL: PublishTransform<D::Context> + 'static,
{
    /// Consumes the route and pushes it into `sink`.
    ///
    /// The handler receives its own clone of `pipeline`.
    fn mount_one<G, PP>(self, global: &G, pipeline: &PP, sink: &mut RouterSink<B, St>)
    where
        G: BlanketLayer,
        PP: PublishPipeline + Clone + 'static,
    {
        let Self {
            source,
            def,
            codec,
            publisher,
            meta,
            policies,
            workers,
        } = self;

        // `policies.decode` is read here and `policies` is still handed to the sink
        // below, exactly as in the field-access form.
        let publishing = PublishingHandler {
            def,
            codec,
            publisher,
            pipeline: pipeline.clone(),
            decode: policies.decode,
        };

        let layered = global.apply::<SourceMessage<B, S>, D::Context, St, _>(publishing);
        sink.push_subscribe_workers(source, layered, meta, policies, workers);
    }
}
/// Mounting for `BatchPublishingRoute`: a `BatchPublishingHandler` is assembled from the
/// route's parts and registered through `RouterSink::push_subscribe_batch`.
impl<B, S, D, C, R, St> MountRoute<B, St> for BatchPublishingRoute<S, D, C, R>
where
    B: Broker + 'static,
    S: SubscriptionSource<B> + Send + 'static,
    S::Subscriber: BatchSubscriber + Send + 'static,
    SourceMessage<B, S>: Send + Sync + 'static,
    St: Send + Sync + 'static,
    D: BatchPublishingCall<St> + 'static,
    D::Input: DeserializeOwned + Send + Sync + 'static,
    D::Reply: Serialize + Send + Sync + 'static,
    C: Codec + 'static,
    R: ReplyPublisher + 'static,
{
    /// Consumes the route and pushes it into `sink`.
    ///
    /// The handler receives its own clone of `pipeline`; the global layer is not applied.
    fn mount_one<G, PP>(self, _global: &G, pipeline: &PP, sink: &mut RouterSink<B, St>)
    where
        G: BlanketLayer,
        PP: PublishPipeline + Clone + 'static,
    {
        let Self {
            source,
            def,
            codec,
            publisher,
            meta,
            policies,
            workers,
        } = self;

        // `policies.decode` is read here and `policies` is still handed to the sink
        // below, exactly as in the field-access form.
        let batch_publishing = BatchPublishingHandler {
            def,
            codec,
            publisher,
            pipeline: pipeline.clone(),
            decode: policies.decode,
        };

        sink.push_subscribe_batch(source, batch_publishing, meta, policies, workers);
    }
}
/// A complete set of routes that can be mounted into a `RouterSink<B, St>`.
///
/// This file implements it for `()` (nothing to mount) and for `(Head, Tail)` pairs
/// whose head is a single route and whose tail is another `RouterDef`.
pub trait RouterDef<B, St = ()> {
    /// Consumes the definition and registers every route it contains into `sink`,
    /// passing `global` and `pipeline` along to each route.
    #[doc(hidden)]
    fn mount<G, PP>(self, global: &G, pipeline: &PP, sink: &mut RouterSink<B, St>)
    where
        G: BlanketLayer,
        PP: PublishPipeline + Clone + 'static;
}
/// Metadata view over a whole router definition.
///
/// This file implements it for `()` and for `(Head, Tail)` pairs.
pub trait RouterHandlers {
/// Appends the `HandlerMetadata` of every contained route to `out`.
#[doc(hidden)]
fn collect_handlers(&self, out: &mut Vec<HandlerMetadata>);
}
/// The empty route list: mounting it registers nothing.
impl<B, St> RouterDef<B, St> for ()
where
    B: Broker + 'static,
{
    fn mount<G, PP>(self, _global: &G, _pipeline: &PP, _sink: &mut RouterSink<B, St>)
    where
        G: BlanketLayer,
        PP: PublishPipeline + Clone + 'static,
    {
        // Nothing to mount: this is the terminator of a `(Head, Tail)` chain.
    }
}
/// The empty route list contributes no handler metadata.
impl RouterHandlers for () {
    fn collect_handlers(&self, _out: &mut Vec<HandlerMetadata>) {
        // Intentionally empty: there are no routes to report.
    }
}
/// A non-empty route list: one route (`Head`) followed by the rest (`Tail`).
impl<B, Head, Tail, St> RouterDef<B, St> for (Head, Tail)
where
    B: Broker + 'static,
    Head: MountRoute<B, St>,
    Tail: RouterDef<B, St>,
{
    /// Mounts the tail first and the head last.
    fn mount<G, PP>(self, global: &G, pipeline: &PP, sink: &mut RouterSink<B, St>)
    where
        G: BlanketLayer,
        PP: PublishPipeline + Clone + 'static,
    {
        let (head, rest) = self;

        // Order matters: the tail is mounted before the head.
        // NOTE(review): presumably routes are prepended as they are added, so this
        // yields registration order — confirm against the builder.
        rest.mount(global, pipeline, sink);
        head.mount_one(global, pipeline, sink);
    }
}
/// Metadata collection for a non-empty route list.
impl<Head, Tail> RouterHandlers for (Head, Tail)
where
    Head: RouteMeta,
    Tail: RouterHandlers,
{
    /// Appends the tail's metadata first, then the head's — the same order in which
    /// the `RouterDef` impl for `(Head, Tail)` mounts routes.
    fn collect_handlers(&self, out: &mut Vec<HandlerMetadata>) {
        let (head, rest) = self;
        rest.collect_handlers(out);
        head.collect(out);
    }
}