use serde::Serialize;
use serde::de::DeserializeOwned;
use crate::codec::Codec;
use crate::{BatchSubscriber, Broker, Publisher, SubscriptionSource};
use crate::runtime::batch::{BatchDef, SliceHandler};
use crate::runtime::batch_publishing::BatchPublishingDef;
use crate::runtime::metadata::HandlerMetadata;
use crate::runtime::publish::{PublishLayer, ReplyPublisher, TypedPublisher};
use crate::runtime::publishing::PublishingDef;
use crate::runtime::subscriber_def::SubscriberDef;
use super::builder::Router;
use super::{
BatchPublishingRouter, IncludedBatchRouter, IncludedRouter, PublishingRouter,
SubscribedBatchRouter,
};
/// Route-mounting methods for a [`Router`] whose codec type parameter is `()`.
///
/// The consume-only methods build a `crate::codec::DefaultCodec` via `default()` and are
/// compiled only when at least one of the `json`, `cbor` or `msgpack` features is enabled.
/// The publishing methods are always available and clone the codec exposed by the
/// publisher they are given.
impl<B, R, RL> Router<B, R, (), RL>
where
    B: Broker + 'static,
{
    /// Includes the subscriber definition `def` on the source returned by `def.source()`,
    /// handing a default-constructed `DefaultCodec` to `mount_subscriber`.
    #[cfg(any(feature = "json", feature = "cbor", feature = "msgpack"))]
    pub fn include<D>(
        self,
        def: D,
    ) -> IncludedRouter<B, D::Source, D, crate::codec::DefaultCodec, (), RL, R>
    where
        D: SubscriberDef,
        D::Source: SubscriptionSource<B> + Send + 'static,
        <D::Source as SubscriptionSource<B>>::Subscriber: Send + 'static,
        D::Input: DeserializeOwned + Send + Sync + 'static,
        D::Handler: 'static,
    {
        // `def.source()` is evaluated before `def` itself is moved into the call.
        self.mount_subscriber(def.source(), def, crate::codec::DefaultCodec::default())
    }

    /// Same as `include`, except that the caller-supplied `source` is used and
    /// `def.source()` is never consulted.
    #[cfg(any(feature = "json", feature = "cbor", feature = "msgpack"))]
    pub fn include_on<S, D>(
        self,
        source: S,
        def: D,
    ) -> IncludedRouter<B, S, D, crate::codec::DefaultCodec, (), RL, R>
    where
        S: SubscriptionSource<B> + Send + 'static,
        S::Subscriber: Send + 'static,
        D: SubscriberDef,
        D::Input: DeserializeOwned + Send + Sync + 'static,
        D::Handler: 'static,
    {
        let default_codec = crate::codec::DefaultCodec::default();
        self.mount_subscriber(source, def, default_codec)
    }

    /// Includes the batch definition `def` on the source returned by `def.source()`,
    /// handing a default-constructed `DefaultCodec` to `mount_batch`.
    ///
    /// The source's subscriber type must implement [`BatchSubscriber`].
    #[cfg(any(feature = "json", feature = "cbor", feature = "msgpack"))]
    pub fn include_batch<D>(
        self,
        def: D,
    ) -> IncludedBatchRouter<B, D::Source, D, crate::codec::DefaultCodec, (), RL, R>
    where
        D: BatchDef,
        D::Source: SubscriptionSource<B> + Send + 'static,
        <D::Source as SubscriptionSource<B>>::Subscriber: BatchSubscriber + Send + 'static,
        D::Input: DeserializeOwned + Send + Sync + 'static,
        D::Handler: 'static,
    {
        // `def.source()` is evaluated before `def` itself is moved into the call.
        self.mount_batch(def.source(), def, crate::codec::DefaultCodec::default())
    }

    /// Same as `include_batch`, except that the caller-supplied `source` is used and
    /// `def.source()` is never consulted.
    #[cfg(any(feature = "json", feature = "cbor", feature = "msgpack"))]
    pub fn include_batch_on<S, D>(
        self,
        source: S,
        def: D,
    ) -> IncludedBatchRouter<B, S, D, crate::codec::DefaultCodec, (), RL, R>
    where
        S: SubscriptionSource<B> + Send + 'static,
        S::Subscriber: BatchSubscriber + Send + 'static,
        D: BatchDef,
        D::Input: DeserializeOwned + Send + Sync + 'static,
        D::Handler: 'static,
    {
        let default_codec = crate::codec::DefaultCodec::default();
        self.mount_batch(source, def, default_codec)
    }

    /// Adds a batch route built from a bare `handler` plus its `meta`, rather than from a
    /// definition type, forwarding them with a default-constructed `DefaultCodec` to
    /// `push_batch_route`.
    #[cfg(any(feature = "json", feature = "cbor", feature = "msgpack"))]
    pub fn subscribe_batch<S, T, H>(
        self,
        source: S,
        handler: H,
        meta: HandlerMetadata,
    ) -> SubscribedBatchRouter<B, S, T, crate::codec::DefaultCodec, H, (), RL, R>
    where
        S: SubscriptionSource<B> + Send + 'static,
        S::Subscriber: BatchSubscriber + Send + 'static,
        T: DeserializeOwned + Send + Sync + 'static,
        H: SliceHandler<T> + 'static,
    {
        let default_codec = crate::codec::DefaultCodec::default();
        self.push_batch_route(source, handler, default_codec, meta)
    }

    /// Includes the batch-publishing definition `def` on the source returned by
    /// `def.source()`.
    ///
    /// The route codec is a clone of `publisher.reply_codec()`, so no default codec (and
    /// therefore no codec feature) is required here.
    pub fn include_batch_publishing<D, RP>(
        self,
        def: D,
        publisher: RP,
    ) -> BatchPublishingRouter<B, D::Source, D, RP::Codec, RP, (), RL, R>
    where
        D: BatchPublishingDef + 'static,
        D::Source: SubscriptionSource<B> + Send + 'static,
        <D::Source as SubscriptionSource<B>>::Subscriber: BatchSubscriber + Send + 'static,
        D::Input: DeserializeOwned + Send + Sync + 'static,
        D::Reply: Serialize + Send + Sync + 'static,
        RP: ReplyPublisher + 'static,
        RP::Codec: Clone + 'static,
    {
        // Clone first: `publisher` is moved into the call below.
        let publisher_codec = publisher.reply_codec().clone();
        self.mount_batch_publishing(def.source(), def, publisher_codec, publisher)
    }

    /// Same as `include_batch_publishing`, except that the caller-supplied `source` is
    /// used and `def.source()` is never consulted.
    pub fn include_batch_publishing_on<S, D, RP>(
        self,
        source: S,
        def: D,
        publisher: RP,
    ) -> BatchPublishingRouter<B, S, D, RP::Codec, RP, (), RL, R>
    where
        S: SubscriptionSource<B> + Send + 'static,
        S::Subscriber: BatchSubscriber + Send + 'static,
        D: BatchPublishingDef + 'static,
        D::Input: DeserializeOwned + Send + Sync + 'static,
        D::Reply: Serialize + Send + Sync + 'static,
        RP: ReplyPublisher + 'static,
        RP::Codec: Clone + 'static,
    {
        // Clone first: `publisher` is moved into the call below.
        let publisher_codec = publisher.reply_codec().clone();
        self.mount_batch_publishing(source, def, publisher_codec, publisher)
    }

    /// Includes the publishing definition `def` on the source returned by `def.source()`.
    ///
    /// The route codec is a clone of `publisher.codec()`; the same type `PC` therefore
    /// appears both as the route codec and as the publisher codec in the returned router.
    pub fn include_publishing<D, P, PC, PL>(
        self,
        def: D,
        publisher: TypedPublisher<P, PC, PL>,
    ) -> PublishingRouter<B, D::Source, D, PC, P, PC, PL, (), RL, R>
    where
        D: PublishingDef + 'static,
        D::Source: SubscriptionSource<B> + Send + 'static,
        <D::Source as SubscriptionSource<B>>::Subscriber: Send + 'static,
        D::Input: DeserializeOwned + Send + Sync + 'static,
        D::Reply: Serialize + Send + Sync + 'static,
        P: Publisher + 'static,
        PC: Codec + Clone + 'static,
        PL: PublishLayer + 'static,
    {
        // Clone first: `publisher` is moved into the call below.
        let publisher_codec = publisher.codec().clone();
        self.mount_publishing(def.source(), def, publisher_codec, publisher)
    }

    /// Same as `include_publishing`, except that the caller-supplied `source` is used and
    /// `def.source()` is never consulted.
    pub fn include_publishing_on<S, D, P, PC, PL>(
        self,
        source: S,
        def: D,
        publisher: TypedPublisher<P, PC, PL>,
    ) -> PublishingRouter<B, S, D, PC, P, PC, PL, (), RL, R>
    where
        S: SubscriptionSource<B> + Send + 'static,
        S::Subscriber: Send + 'static,
        D: PublishingDef + 'static,
        D::Input: DeserializeOwned + Send + Sync + 'static,
        D::Reply: Serialize + Send + Sync + 'static,
        P: Publisher + 'static,
        PC: Codec + Clone + 'static,
        PL: PublishLayer + 'static,
    {
        // Clone first: `publisher` is moved into the call below.
        let publisher_codec = publisher.codec().clone();
        self.mount_publishing(source, def, publisher_codec, publisher)
    }
}
/// Route-mounting methods for a [`Router`] that carries a codec of type `C`.
///
/// Every method here clones `self.codec` and passes that clone to the corresponding
/// mount helper, so the resulting route type is parameterised by `C`. For the publishing
/// variants the publisher keeps its own, independent codec type.
impl<B, R, C, RL> Router<B, R, C, RL>
where
    B: Broker + 'static,
    C: Codec + Clone + 'static,
{
    /// Includes the subscriber definition `def` on the source returned by `def.source()`,
    /// using a clone of the router's codec.
    pub fn include<D>(self, def: D) -> IncludedRouter<B, D::Source, D, C, C, RL, R>
    where
        D: SubscriberDef,
        D::Source: SubscriptionSource<B> + Send + 'static,
        <D::Source as SubscriptionSource<B>>::Subscriber: Send + 'static,
        D::Input: DeserializeOwned + Send + Sync + 'static,
        D::Handler: 'static,
    {
        // The codec must be cloned out before `self` is consumed by the mount call.
        let router_codec = self.codec.clone();
        self.mount_subscriber(def.source(), def, router_codec)
    }

    /// Same as `include`, except that the caller-supplied `source` is used and
    /// `def.source()` is never consulted.
    pub fn include_on<S, D>(self, source: S, def: D) -> IncludedRouter<B, S, D, C, C, RL, R>
    where
        S: SubscriptionSource<B> + Send + 'static,
        S::Subscriber: Send + 'static,
        D: SubscriberDef,
        D::Input: DeserializeOwned + Send + Sync + 'static,
        D::Handler: 'static,
    {
        // The codec must be cloned out before `self` is consumed by the mount call.
        let router_codec = self.codec.clone();
        self.mount_subscriber(source, def, router_codec)
    }

    /// Includes the batch definition `def` on the source returned by `def.source()`,
    /// using a clone of the router's codec.
    ///
    /// The source's subscriber type must implement [`BatchSubscriber`].
    pub fn include_batch<D>(self, def: D) -> IncludedBatchRouter<B, D::Source, D, C, C, RL, R>
    where
        D: BatchDef,
        D::Source: SubscriptionSource<B> + Send + 'static,
        <D::Source as SubscriptionSource<B>>::Subscriber: BatchSubscriber + Send + 'static,
        D::Input: DeserializeOwned + Send + Sync + 'static,
        D::Handler: 'static,
    {
        // The codec must be cloned out before `self` is consumed by the mount call.
        let router_codec = self.codec.clone();
        self.mount_batch(def.source(), def, router_codec)
    }

    /// Same as `include_batch`, except that the caller-supplied `source` is used and
    /// `def.source()` is never consulted.
    pub fn include_batch_on<S, D>(
        self,
        source: S,
        def: D,
    ) -> IncludedBatchRouter<B, S, D, C, C, RL, R>
    where
        S: SubscriptionSource<B> + Send + 'static,
        S::Subscriber: BatchSubscriber + Send + 'static,
        D: BatchDef,
        D::Input: DeserializeOwned + Send + Sync + 'static,
        D::Handler: 'static,
    {
        // The codec must be cloned out before `self` is consumed by the mount call.
        let router_codec = self.codec.clone();
        self.mount_batch(source, def, router_codec)
    }

    /// Adds a batch route built from a bare `handler` plus its `meta`, rather than from a
    /// definition type, forwarding them with a clone of the router's codec to
    /// `push_batch_route`.
    pub fn subscribe_batch<S, T, H>(
        self,
        source: S,
        handler: H,
        meta: HandlerMetadata,
    ) -> SubscribedBatchRouter<B, S, T, C, H, C, RL, R>
    where
        S: SubscriptionSource<B> + Send + 'static,
        S::Subscriber: BatchSubscriber + Send + 'static,
        T: DeserializeOwned + Send + Sync + 'static,
        H: SliceHandler<T> + 'static,
    {
        // The codec must be cloned out before `self` is consumed by the call below.
        let router_codec = self.codec.clone();
        self.push_batch_route(source, handler, router_codec, meta)
    }

    /// Includes the batch-publishing definition `def` on the source returned by
    /// `def.source()`.
    ///
    /// The route codec is a clone of the router's codec; `publisher` is forwarded
    /// untouched and its own codec is not read here.
    pub fn include_batch_publishing<D, RP>(
        self,
        def: D,
        publisher: RP,
    ) -> BatchPublishingRouter<B, D::Source, D, C, RP, C, RL, R>
    where
        D: BatchPublishingDef + 'static,
        D::Source: SubscriptionSource<B> + Send + 'static,
        <D::Source as SubscriptionSource<B>>::Subscriber: BatchSubscriber + Send + 'static,
        D::Input: DeserializeOwned + Send + Sync + 'static,
        D::Reply: Serialize + Send + Sync + 'static,
        RP: ReplyPublisher + 'static,
    {
        // The codec must be cloned out before `self` is consumed by the mount call.
        let router_codec = self.codec.clone();
        self.mount_batch_publishing(def.source(), def, router_codec, publisher)
    }

    /// Same as `include_batch_publishing`, except that the caller-supplied `source` is
    /// used and `def.source()` is never consulted.
    pub fn include_batch_publishing_on<S, D, RP>(
        self,
        source: S,
        def: D,
        publisher: RP,
    ) -> BatchPublishingRouter<B, S, D, C, RP, C, RL, R>
    where
        S: SubscriptionSource<B> + Send + 'static,
        S::Subscriber: BatchSubscriber + Send + 'static,
        D: BatchPublishingDef + 'static,
        D::Input: DeserializeOwned + Send + Sync + 'static,
        D::Reply: Serialize + Send + Sync + 'static,
        RP: ReplyPublisher + 'static,
    {
        // The codec must be cloned out before `self` is consumed by the mount call.
        let router_codec = self.codec.clone();
        self.mount_batch_publishing(source, def, router_codec, publisher)
    }

    /// Includes the publishing definition `def` on the source returned by `def.source()`.
    ///
    /// The route codec is a clone of the router's codec (`C`); the publisher's codec type
    /// `PC` is independent of it and is not read here.
    pub fn include_publishing<D, P, PC, PL>(
        self,
        def: D,
        publisher: TypedPublisher<P, PC, PL>,
    ) -> PublishingRouter<B, D::Source, D, C, P, PC, PL, C, RL, R>
    where
        D: PublishingDef + 'static,
        D::Source: SubscriptionSource<B> + Send + 'static,
        <D::Source as SubscriptionSource<B>>::Subscriber: Send + 'static,
        D::Input: DeserializeOwned + Send + Sync + 'static,
        D::Reply: Serialize + Send + Sync + 'static,
        P: Publisher + 'static,
        PC: Codec + 'static,
        PL: PublishLayer + 'static,
    {
        // The codec must be cloned out before `self` is consumed by the mount call.
        let router_codec = self.codec.clone();
        self.mount_publishing(def.source(), def, router_codec, publisher)
    }

    /// Same as `include_publishing`, except that the caller-supplied `source` is used and
    /// `def.source()` is never consulted.
    pub fn include_publishing_on<S, D, P, PC, PL>(
        self,
        source: S,
        def: D,
        publisher: TypedPublisher<P, PC, PL>,
    ) -> PublishingRouter<B, S, D, C, P, PC, PL, C, RL, R>
    where
        S: SubscriptionSource<B> + Send + 'static,
        S::Subscriber: Send + 'static,
        D: PublishingDef + 'static,
        D::Input: DeserializeOwned + Send + Sync + 'static,
        D::Reply: Serialize + Send + Sync + 'static,
        P: Publisher + 'static,
        PC: Codec + 'static,
        PL: PublishLayer + 'static,
    {
        // The codec must be cloned out before `self` is consumed by the mount call.
        let router_codec = self.codec.clone();
        self.mount_publishing(source, def, router_codec, publisher)
    }
}