use serde::Serialize;
use serde::de::DeserializeOwned;
use crate::codec::Codec;
use crate::{BatchSubscriber, Broker, Publisher, Subscriber, SubscriptionSource};
use crate::runtime::batch::BatchDef;
use crate::runtime::batch_publishing::BatchPublishingDef;
use crate::runtime::handler::Handler;
use crate::runtime::middleware::Layer;
use crate::runtime::publish::{PublishLayer, ReplyPublisher, TypedPublisher};
use crate::runtime::publishing::{PublishingDef, PublishingHandler};
use crate::runtime::subscriber_def::SubscriberDef;
use crate::runtime::typed::Typed;
use super::scope::BrokerScope;
impl<B: Broker + 'static, L> BrokerScope<B, L, ()> {
#[cfg(any(feature = "json", feature = "cbor", feature = "msgpack"))]
pub fn include<D>(&mut self, def: D)
where
D: SubscriberDef,
D::Source: SubscriptionSource<B> + Send + 'static,
<D::Source as SubscriptionSource<B>>::Subscriber: Send + 'static,
<<D::Source as SubscriptionSource<B>>::Subscriber as Subscriber>::Message: 'static,
D::Input: DeserializeOwned + Send + Sync + 'static,
D::Handler: 'static,
L: Layer<
Typed<
<<D::Source as SubscriptionSource<B>>::Subscriber as Subscriber>::Message,
D::Input,
crate::codec::DefaultCodec,
D::Handler,
>,
>,
L::Handler: Handler<<<D::Source as SubscriptionSource<B>>::Subscriber as Subscriber>::Message>
+ 'static,
{
let source = def.source();
self.mount_subscriber(source, def, crate::codec::DefaultCodec::default());
}
#[cfg(any(feature = "json", feature = "cbor", feature = "msgpack"))]
pub fn include_on<S, D>(&mut self, source: S, def: D)
where
S: SubscriptionSource<B> + Send + 'static,
S::Subscriber: Send + 'static,
<S::Subscriber as Subscriber>::Message: 'static,
D: SubscriberDef,
D::Input: DeserializeOwned + Send + Sync + 'static,
D::Handler: 'static,
L: Layer<
Typed<
<S::Subscriber as Subscriber>::Message,
D::Input,
crate::codec::DefaultCodec,
D::Handler,
>,
>,
L::Handler: Handler<<S::Subscriber as Subscriber>::Message> + 'static,
{
self.mount_subscriber(source, def, crate::codec::DefaultCodec::default());
}
#[cfg(any(feature = "json", feature = "cbor", feature = "msgpack"))]
pub fn include_batch<D>(&mut self, def: D)
where
D: BatchDef,
D::Source: SubscriptionSource<B> + Send + 'static,
<D::Source as SubscriptionSource<B>>::Subscriber: BatchSubscriber + Send + 'static,
D::Input: DeserializeOwned + Send + Sync + 'static,
D::Handler: 'static,
{
let source = def.source();
self.mount_batch(source, def, crate::codec::DefaultCodec::default());
}
#[cfg(any(feature = "json", feature = "cbor", feature = "msgpack"))]
pub fn include_batch_on<S, D>(&mut self, source: S, def: D)
where
S: SubscriptionSource<B> + Send + 'static,
S::Subscriber: BatchSubscriber + Send + 'static,
D: BatchDef,
D::Input: DeserializeOwned + Send + Sync + 'static,
D::Handler: 'static,
{
self.mount_batch(source, def, crate::codec::DefaultCodec::default());
}
pub fn include_batch_publishing<D, RP>(&mut self, def: D, publisher: RP)
where
D: BatchPublishingDef + 'static,
D::Source: SubscriptionSource<B> + Send + 'static,
<D::Source as SubscriptionSource<B>>::Subscriber: BatchSubscriber + Send + 'static,
D::Input: DeserializeOwned + Send + Sync + 'static,
D::Reply: Serialize + Send + Sync + 'static,
RP: ReplyPublisher + 'static,
RP::Codec: Clone + 'static,
{
let codec = publisher.reply_codec().clone();
let source = def.source();
self.mount_batch_publishing(source, def, codec, publisher);
}
pub fn include_batch_publishing_on<S, D, RP>(&mut self, source: S, def: D, publisher: RP)
where
S: SubscriptionSource<B> + Send + 'static,
S::Subscriber: BatchSubscriber + Send + 'static,
D: BatchPublishingDef + 'static,
D::Input: DeserializeOwned + Send + Sync + 'static,
D::Reply: Serialize + Send + Sync + 'static,
RP: ReplyPublisher + 'static,
RP::Codec: Clone + 'static,
{
let codec = publisher.reply_codec().clone();
self.mount_batch_publishing(source, def, codec, publisher);
}
pub fn include_publishing<D, P, PC, PL>(&mut self, def: D, publisher: TypedPublisher<P, PC, PL>)
where
D: PublishingDef + 'static,
D::Source: SubscriptionSource<B> + Send + 'static,
<D::Source as SubscriptionSource<B>>::Subscriber: Send + 'static,
<<D::Source as SubscriptionSource<B>>::Subscriber as Subscriber>::Message: 'static,
D::Input: DeserializeOwned + Send + Sync + 'static,
D::Reply: Serialize + Send + Sync + 'static,
P: Publisher + 'static,
PC: Codec + Clone + 'static,
PL: PublishLayer + 'static,
L: Layer<PublishingHandler<D, PC, P, PC, PL>>,
L::Handler: Handler<<<D::Source as SubscriptionSource<B>>::Subscriber as Subscriber>::Message>
+ 'static,
{
let codec = publisher.codec().clone();
let source = def.source();
self.mount_publishing(source, def, codec, publisher);
}
pub fn include_publishing_on<S, D, P, PC, PL>(
&mut self,
source: S,
def: D,
publisher: TypedPublisher<P, PC, PL>,
) where
S: SubscriptionSource<B> + Send + 'static,
S::Subscriber: Send + 'static,
<S::Subscriber as Subscriber>::Message: 'static,
D: PublishingDef + 'static,
D::Input: DeserializeOwned + Send + Sync + 'static,
D::Reply: Serialize + Send + Sync + 'static,
P: Publisher + 'static,
PC: Codec + Clone + 'static,
PL: PublishLayer + 'static,
L: Layer<PublishingHandler<D, PC, P, PC, PL>>,
L::Handler: Handler<<S::Subscriber as Subscriber>::Message> + 'static,
{
let codec = publisher.codec().clone();
self.mount_publishing(source, def, codec, publisher);
}
}
/// Registration methods for a `BrokerScope` that carries its own codec `C`.
///
/// Every method here clones the scope's `codec` field and passes that clone
/// to the matching `mount_*` helper; the publishing variants additionally
/// forward the supplied publisher unchanged.
impl<B: Broker + 'static, L, C: Codec + Clone + 'static> BrokerScope<B, L, C> {
    /// Mounts `def` on the source returned by `def.source()`, passing a clone
    /// of the scope codec to `mount_subscriber`.
    pub fn include<D>(&mut self, def: D)
    where
        D: SubscriberDef,
        D::Source: SubscriptionSource<B> + Send + 'static,
        <D::Source as SubscriptionSource<B>>::Subscriber: Send + 'static,
        <<D::Source as SubscriptionSource<B>>::Subscriber as Subscriber>::Message: 'static,
        D::Input: DeserializeOwned + Send + Sync + 'static,
        D::Handler: 'static,
        L: Layer<
            Typed<
                <<D::Source as SubscriptionSource<B>>::Subscriber as Subscriber>::Message,
                D::Input,
                C,
                D::Handler,
            >,
        >,
        L::Handler: Handler<<<D::Source as SubscriptionSource<B>>::Subscriber as Subscriber>::Message>
            + 'static,
    {
        let scope_codec = self.codec.clone();
        // `def.source()` is evaluated before `def` is moved into the call.
        self.mount_subscriber(def.source(), def, scope_codec);
    }

    /// Mounts `def` on the caller-supplied `source`, passing a clone of the
    /// scope codec to `mount_subscriber`.
    pub fn include_on<S, D>(&mut self, source: S, def: D)
    where
        S: SubscriptionSource<B> + Send + 'static,
        S::Subscriber: Send + 'static,
        <S::Subscriber as Subscriber>::Message: 'static,
        D: SubscriberDef,
        D::Input: DeserializeOwned + Send + Sync + 'static,
        D::Handler: 'static,
        L: Layer<Typed<<S::Subscriber as Subscriber>::Message, D::Input, C, D::Handler>>,
        L::Handler: Handler<<S::Subscriber as Subscriber>::Message> + 'static,
    {
        let scope_codec = self.codec.clone();
        self.mount_subscriber(source, def, scope_codec);
    }

    /// Mounts the batch definition `def` on the source returned by
    /// `def.source()`, passing a clone of the scope codec to `mount_batch`.
    pub fn include_batch<D>(&mut self, def: D)
    where
        D: BatchDef,
        D::Source: SubscriptionSource<B> + Send + 'static,
        <D::Source as SubscriptionSource<B>>::Subscriber: BatchSubscriber + Send + 'static,
        D::Input: DeserializeOwned + Send + Sync + 'static,
        D::Handler: 'static,
    {
        let scope_codec = self.codec.clone();
        // `def.source()` is evaluated before `def` is moved into the call.
        self.mount_batch(def.source(), def, scope_codec);
    }

    /// Mounts the batch definition `def` on the caller-supplied `source`,
    /// passing a clone of the scope codec to `mount_batch`.
    pub fn include_batch_on<S, D>(&mut self, source: S, def: D)
    where
        S: SubscriptionSource<B> + Send + 'static,
        S::Subscriber: BatchSubscriber + Send + 'static,
        D: BatchDef,
        D::Input: DeserializeOwned + Send + Sync + 'static,
        D::Handler: 'static,
    {
        let scope_codec = self.codec.clone();
        self.mount_batch(source, def, scope_codec);
    }

    /// Mounts the batch-publishing definition `def` on the source returned by
    /// `def.source()`.
    ///
    /// A clone of the scope codec and the `publisher` are both handed to
    /// `mount_batch_publishing`.
    pub fn include_batch_publishing<D, RP>(&mut self, def: D, publisher: RP)
    where
        D: BatchPublishingDef + 'static,
        D::Source: SubscriptionSource<B> + Send + 'static,
        <D::Source as SubscriptionSource<B>>::Subscriber: BatchSubscriber + Send + 'static,
        D::Input: DeserializeOwned + Send + Sync + 'static,
        D::Reply: Serialize + Send + Sync + 'static,
        RP: ReplyPublisher + 'static,
    {
        let scope_codec = self.codec.clone();
        // `def.source()` is evaluated before `def` is moved into the call.
        self.mount_batch_publishing(def.source(), def, scope_codec, publisher);
    }

    /// Mounts the batch-publishing definition `def` on the caller-supplied
    /// `source`.
    ///
    /// A clone of the scope codec and the `publisher` are both handed to
    /// `mount_batch_publishing`.
    pub fn include_batch_publishing_on<S, D, RP>(&mut self, source: S, def: D, publisher: RP)
    where
        S: SubscriptionSource<B> + Send + 'static,
        S::Subscriber: BatchSubscriber + Send + 'static,
        D: BatchPublishingDef + 'static,
        D::Input: DeserializeOwned + Send + Sync + 'static,
        D::Reply: Serialize + Send + Sync + 'static,
        RP: ReplyPublisher + 'static,
    {
        let scope_codec = self.codec.clone();
        self.mount_batch_publishing(source, def, scope_codec, publisher);
    }

    /// Mounts the publishing definition `def` on the source returned by
    /// `def.source()`.
    ///
    /// A clone of the scope codec `C` is passed to `mount_publishing`; the
    /// publisher keeps its own, possibly different, codec type `PC`.
    pub fn include_publishing<D, P, PC, PL>(&mut self, def: D, publisher: TypedPublisher<P, PC, PL>)
    where
        D: PublishingDef + 'static,
        D::Source: SubscriptionSource<B> + Send + 'static,
        <D::Source as SubscriptionSource<B>>::Subscriber: Send + 'static,
        <<D::Source as SubscriptionSource<B>>::Subscriber as Subscriber>::Message: 'static,
        D::Input: DeserializeOwned + Send + Sync + 'static,
        D::Reply: Serialize + Send + Sync + 'static,
        P: Publisher + 'static,
        PC: Codec + 'static,
        PL: PublishLayer + 'static,
        L: Layer<PublishingHandler<D, C, P, PC, PL>>,
        L::Handler: Handler<<<D::Source as SubscriptionSource<B>>::Subscriber as Subscriber>::Message>
            + 'static,
    {
        let scope_codec = self.codec.clone();
        // `def.source()` is evaluated before `def` is moved into the call.
        self.mount_publishing(def.source(), def, scope_codec, publisher);
    }

    /// Mounts the publishing definition `def` on the caller-supplied `source`.
    ///
    /// A clone of the scope codec `C` is passed to `mount_publishing`; the
    /// publisher keeps its own, possibly different, codec type `PC`.
    pub fn include_publishing_on<S, D, P, PC, PL>(
        &mut self,
        source: S,
        def: D,
        publisher: TypedPublisher<P, PC, PL>,
    ) where
        S: SubscriptionSource<B> + Send + 'static,
        S::Subscriber: Send + 'static,
        <S::Subscriber as Subscriber>::Message: 'static,
        D: PublishingDef + 'static,
        D::Input: DeserializeOwned + Send + Sync + 'static,
        D::Reply: Serialize + Send + Sync + 'static,
        P: Publisher + 'static,
        PC: Codec + 'static,
        PL: PublishLayer + 'static,
        L: Layer<PublishingHandler<D, C, P, PC, PL>>,
        L::Handler: Handler<<S::Subscriber as Subscriber>::Message> + 'static,
    {
        let scope_codec = self.codec.clone();
        self.mount_publishing(source, def, scope_codec, publisher);
    }
}