use std::sync::Arc;
use serde::Serialize;
use serde::de::DeserializeOwned;
use crate::codec::Codec;
use crate::{BatchSubscriber, Broker, Publisher, Subscriber, SubscriptionSource};
use crate::runtime::batch::{BatchDef, batch_metadata, typed_batch};
use crate::runtime::batch_publishing::{
BatchPublishingDef, BatchPublishingHandler, batch_publishing_metadata,
};
use crate::runtime::dispatch::Publishers;
use crate::runtime::failure::FailurePolicies;
use crate::runtime::handler::Handler;
use crate::runtime::metadata::HandlerMetadata;
use crate::runtime::middleware::{BlanketLayer, Identity, Layer};
use crate::runtime::publish::{PublishLayer, PublishMiddleware, ReplyPublisher, TypedPublisher};
use crate::runtime::publisher_registry::ErasedPublisher;
use crate::runtime::publishing::{PublishingDef, PublishingHandler, publishing_metadata};
use crate::runtime::router::{RouterDef, RouterSink};
use crate::runtime::subscriber_def::{SubscriberDef, subscriber_metadata};
use crate::runtime::typed::{Typed, typed};
pub struct BrokerScope<B, L = Identity, C = ()> {
pub(super) broker: Arc<B>,
pub(super) sink: RouterSink<B>,
pub(super) publishers: Publishers,
pub(super) pipeline: Arc<[Arc<dyn PublishMiddleware>]>,
pub(super) retry_publisher: Option<Arc<dyn ErasedPublisher>>,
pub(super) global: L,
pub(super) codec: C,
}
impl<B: Broker + 'static, L, C> BrokerScope<B, L, C> {
#[must_use]
pub fn broker(&self) -> &B {
&self.broker
}
#[must_use]
pub fn publisher(&self, name: &str) -> Option<Arc<dyn ErasedPublisher>> {
self.publishers.get(name).cloned()
}
pub fn retry_via<P>(&mut self, publisher: P)
where
P: Publisher + 'static,
{
self.retry_publisher = Some(Arc::new(publisher));
}
pub fn handle<S, H>(&mut self, subscriber: S, handler: H, meta: HandlerMetadata)
where
S: Subscriber + Send + 'static,
H: Handler<S::Message> + 'static,
L: Layer<H>,
L::Handler: Handler<S::Message> + 'static,
{
let handler = self.global.layer(handler);
self.sink
.push_handle(subscriber, handler, meta, FailurePolicies::default());
}
pub fn subscribe<S, H>(&mut self, source: S, handler: H, meta: HandlerMetadata)
where
S: SubscriptionSource<B> + Send + 'static,
S::Subscriber: Send + 'static,
H: Handler<<S::Subscriber as Subscriber>::Message> + 'static,
L: Layer<H>,
L::Handler: Handler<<S::Subscriber as Subscriber>::Message> + 'static,
{
let handler = self.global.layer(handler);
self.sink
.push_subscribe(source, handler, meta, FailurePolicies::default());
}
pub fn include_router<R>(&mut self, router: R)
where
R: RouterDef<B>,
L: BlanketLayer,
{
router.mount(&self.global, &mut self.sink);
}
}
impl<B: Broker + 'static, L, SC> BrokerScope<B, L, SC> {
pub(super) fn mount_subscriber<S, D, C>(&mut self, source: S, def: D, codec: C)
where
S: SubscriptionSource<B> + Send + 'static,
S::Subscriber: Send + 'static,
<S::Subscriber as Subscriber>::Message: 'static,
D: SubscriberDef,
D::Input: DeserializeOwned + Send + Sync + 'static,
D::Handler: 'static,
C: Codec + 'static,
L: Layer<Typed<<S::Subscriber as Subscriber>::Message, D::Input, C, D::Handler>>,
L::Handler: Handler<<S::Subscriber as Subscriber>::Message> + 'static,
{
let meta = subscriber_metadata(source.name().to_owned(), &def);
let policies = def.failure_policies();
let workers = def.workers();
let handler = self
.global
.layer(typed(codec, def.into_handler()).on_decode_failure(policies.decode));
self.sink
.push_subscribe_workers(source, handler, meta, policies, workers);
}
pub(super) fn mount_batch<S, D, C>(&mut self, source: S, def: D, codec: C)
where
S: SubscriptionSource<B> + Send + 'static,
S::Subscriber: BatchSubscriber + Send + 'static,
D: BatchDef,
D::Input: DeserializeOwned + Send + Sync + 'static,
D::Handler: 'static,
C: Codec + 'static,
{
let meta = batch_metadata(source.name().to_owned(), &def);
let policies = def.failure_policies();
let workers = def.workers();
let handler = typed_batch(codec, def.into_handler()).with_decode(policies.decode);
self.sink
.push_subscribe_batch(source, handler, meta, policies, workers);
}
pub(super) fn mount_batch_publishing<S, D, C, RP>(
&mut self,
source: S,
def: D,
codec: C,
publisher: RP,
) where
S: SubscriptionSource<B> + Send + 'static,
S::Subscriber: BatchSubscriber + Send + 'static,
D: BatchPublishingDef + 'static,
D::Input: DeserializeOwned + Send + Sync + 'static,
D::Reply: Serialize + Send + Sync + 'static,
C: Codec + 'static,
RP: ReplyPublisher + 'static,
{
let meta = batch_publishing_metadata(source.name().to_owned(), &def);
let policies = def.failure_policies();
let workers = def.workers();
let handler = BatchPublishingHandler {
def,
codec,
publisher,
pipeline: self.pipeline.clone(),
decode: policies.decode,
};
self.sink
.push_subscribe_batch(source, handler, meta, policies, workers);
}
pub(super) fn mount_publishing<S, D, C, P, PC, PL>(
&mut self,
source: S,
def: D,
codec: C,
publisher: TypedPublisher<P, PC, PL>,
) where
S: SubscriptionSource<B> + Send + 'static,
S::Subscriber: Send + 'static,
<S::Subscriber as Subscriber>::Message: 'static,
D: PublishingDef + 'static,
D::Input: DeserializeOwned + Send + Sync + 'static,
D::Reply: Serialize + Send + Sync + 'static,
C: Codec + 'static,
P: Publisher + 'static,
PC: Codec + 'static,
PL: PublishLayer + 'static,
L: Layer<PublishingHandler<D, C, P, PC, PL>>,
L::Handler: Handler<<S::Subscriber as Subscriber>::Message> + 'static,
{
let meta = publishing_metadata(source.name().to_owned(), &def);
let policies = def.failure_policies();
let workers = def.workers();
let handler = self.global.layer(PublishingHandler {
def,
codec,
publisher,
pipeline: self.pipeline.clone(),
decode: policies.decode,
});
self.sink
.push_subscribe_workers(source, handler, meta, policies, workers);
}
}
impl<B, L, C> std::fmt::Debug for BrokerScope<B, L, C> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("BrokerScope")
.field("sink", &self.sink)
.finish_non_exhaustive()
}
}