use std::{future::Future, pin::Pin, sync::Arc};
use serde::Serialize;
use crate::codec::Codec;
use crate::{Headers, Publisher};
use super::lifecycle::BoxError;
use super::publisher_registry::ErasedPublisher;
/// Boxed, `Send` future resolving to `Result<(), BoxError>`.
///
/// This is the return type of every publish stage in this module
/// (`PublishMiddleware::on_publish`, `PublishNext::run`, `run_publish`).
/// The future may borrow data for the lifetime `'a`.
type PublishFut<'a> = Pin<Box<dyn Future<Output = Result<(), BoxError>> + Send + 'a>>;
/// A message on its way out: a name, a raw byte payload and a set of headers.
///
/// All fields are private; they are read and modified through the accessor
/// methods on this type.
#[derive(Debug, Clone)]
pub struct Outgoing {
/// Name passed to the publisher as the first argument of `publish_message`.
name: String,
/// Raw payload bytes passed to the publisher.
payload: Vec<u8>,
/// Headers passed to the publisher alongside the payload.
headers: Headers,
}
impl Outgoing {
    /// Builds a message from `name` and `payload`, with headers initialised
    /// via `Headers::new()`.
    #[must_use]
    pub fn new(name: impl Into<String>, payload: impl Into<Vec<u8>>) -> Self {
        let name: String = name.into();
        let payload: Vec<u8> = payload.into();
        let headers = Headers::new();
        Self {
            name,
            payload,
            headers,
        }
    }

    /// Returns the message name.
    #[must_use]
    pub fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Replaces the message name.
    pub fn set_name(&mut self, name: impl Into<String>) {
        let replacement: String = name.into();
        self.name = replacement;
    }

    /// Returns the payload bytes.
    #[must_use]
    pub fn payload(&self) -> &[u8] {
        self.payload.as_slice()
    }

    /// Returns the payload buffer for in-place modification.
    pub fn payload_mut(&mut self) -> &mut Vec<u8> {
        let Self { payload, .. } = self;
        payload
    }

    /// Replaces the whole payload.
    pub fn set_payload(&mut self, payload: impl Into<Vec<u8>>) {
        let replacement: Vec<u8> = payload.into();
        self.payload = replacement;
    }

    /// Returns the headers.
    #[must_use]
    pub fn headers(&self) -> &Headers {
        let Self { headers, .. } = self;
        headers
    }

    /// Returns the headers for in-place modification.
    pub fn headers_mut(&mut self) -> &mut Headers {
        let Self { headers, .. } = self;
        headers
    }
}
/// A hook that takes part in every publish that runs through a pipeline.
///
/// Implementations receive the mutable [`Outgoing`] message and a
/// [`PublishNext`] handle for the remainder of the chain. Calling
/// `next.run(out)` passes the message to the remaining middleware and, once
/// none are left, to the publisher. Nothing in this module continues the
/// chain on the middleware's behalf.
pub trait PublishMiddleware: Send + Sync {
/// Handles `out`, optionally continuing the chain through `next`.
///
/// The returned future may borrow `self`, `out` and `next` for `'a`.
fn on_publish<'a>(&'a self, out: &'a mut Outgoing, next: PublishNext<'a>) -> PublishFut<'a>;
}
/// Handle to the part of the publish chain that has not run yet.
///
/// Consumed by [`PublishNext::run`].
pub struct PublishNext<'a> {
/// Middleware that still has to run, in order.
rest: &'a [Arc<dyn PublishMiddleware>],
/// Publisher that receives the message once `rest` is empty.
publisher: &'a dyn ErasedPublisher,
}
impl<'a> PublishNext<'a> {
    /// Continues the chain with `out`.
    ///
    /// If middleware remains, the first one is invoked with a `PublishNext`
    /// covering the others. Otherwise the message's name, payload and
    /// headers are handed to the publisher's `publish_message`.
    #[must_use]
    pub fn run(self, out: &'a mut Outgoing) -> PublishFut<'a> {
        let Self { rest, publisher } = self;

        if let Some((head, tail)) = rest.split_first() {
            let remaining = PublishNext {
                rest: tail,
                publisher,
            };
            return head.on_publish(out, remaining);
        }

        // End of the chain: deliver to the publisher.
        publisher.publish_message(out.name(), out.payload(), out.headers())
    }
}
impl std::fmt::Debug for PublishNext<'_> {
    /// Shows only how many middleware entries are left; the trait objects
    /// themselves are elided.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let remaining = self.rest.len();
        let mut builder = f.debug_struct("PublishNext");
        builder.field("remaining", &remaining);
        builder.finish_non_exhaustive()
    }
}
/// Runs `out` through every middleware in `pipeline`, in order, and then
/// through `publisher`.
///
/// This builds a [`PublishNext`] covering the whole pipeline and runs it.
pub(crate) fn run_publish<'a>(
    pipeline: &'a [Arc<dyn PublishMiddleware>],
    publisher: &'a dyn ErasedPublisher,
    out: &'a mut Outgoing,
) -> PublishFut<'a> {
    let chain = PublishNext {
        rest: pipeline,
        publisher,
    };
    chain.run(out)
}
/// A borrowed publisher paired with a borrowed middleware pipeline.
///
/// Messages sent through [`ScopedPublisher::publish`] pass through the
/// pipeline before reaching the publisher.
pub struct ScopedPublisher<'a> {
/// Publisher that receives the message at the end of the chain.
publisher: &'a dyn ErasedPublisher,
/// Middleware run, in order, before the publisher.
pipeline: &'a [Arc<dyn PublishMiddleware>],
}
impl<'a> ScopedPublisher<'a> {
    /// Pairs `publisher` with the middleware `pipeline` that wraps it.
    pub(crate) fn new(
        publisher: &'a dyn ErasedPublisher,
        pipeline: &'a [Arc<dyn PublishMiddleware>],
    ) -> Self {
        Self {
            pipeline,
            publisher,
        }
    }

    /// Sends `out` through the pipeline and then the publisher.
    ///
    /// # Errors
    ///
    /// Returns whatever error the publish chain resolves to.
    pub async fn publish(&self, mut out: Outgoing) -> Result<(), BoxError> {
        let message = &mut out;
        let publishing = run_publish(self.pipeline, self.publisher, message);
        publishing.await
    }
}
impl std::fmt::Debug for ScopedPublisher<'_> {
    /// Shows only the number of middleware entries in the pipeline.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let layers = self.pipeline.len();
        let mut builder = f.debug_struct("ScopedPublisher");
        builder.field("layers", &layers);
        builder.finish_non_exhaustive()
    }
}
/// A synchronous, infallible in-place edit of an [`Outgoing`] message.
///
/// `TypedPublisher::publish` applies its layers after encoding the value and
/// before running the middleware pipeline.
pub trait PublishLayer: Send + Sync {
/// Modifies `out` in place.
fn apply(&self, out: &mut Outgoing);
}
/// Layer that leaves the message untouched.
///
/// This is the default layer type parameter of `TypedPublisher`.
#[derive(Debug, Clone, Copy, Default)]
pub struct PublishIdentity;
impl PublishLayer for PublishIdentity {
    /// Does nothing; the message passes through unchanged.
    fn apply(&self, _: &mut Outgoing) {
        // Intentionally empty: the identity layer makes no edits.
    }
}
/// Two layers composed into one.
///
/// When applied, `inner` runs first and `outer` second. Values of this type
/// are built by `TypedPublisher::layer`.
#[derive(Debug, Clone, Copy, Default)]
pub struct PublishStack<Inner, Outer> {
/// Layer applied first.
inner: Inner,
/// Layer applied second.
outer: Outer,
}
impl<Inner, Outer> PublishLayer for PublishStack<Inner, Outer>
where
    Inner: PublishLayer,
    Outer: PublishLayer,
{
    /// Applies the inner layer, then the outer layer, to the same message.
    fn apply(&self, out: &mut Outgoing) {
        let Self { inner, outer } = self;
        inner.apply(out);
        outer.apply(out);
    }
}
/// A publisher bundled with a codec and a stack of publish layers.
///
/// `P` is the underlying publisher, `C` the codec used to encode values and
/// `PL` the layer stack, which defaults to [`PublishIdentity`].
pub struct TypedPublisher<P, C, PL = PublishIdentity> {
/// Underlying publisher that receives the encoded message.
publisher: P,
/// Codec used to encode values into payload bytes.
codec: C,
/// Layers applied to each message after encoding.
layers: PL,
}
impl<P, C> TypedPublisher<P, C, PublishIdentity> {
    /// Wraps `publisher` with an explicit `codec` and no layers.
    #[must_use]
    pub fn with_codec(publisher: P, codec: C) -> Self {
        let layers = PublishIdentity;
        Self {
            publisher,
            codec,
            layers,
        }
    }
}
#[cfg(any(feature = "json", feature = "cbor", feature = "msgpack"))]
impl<P> TypedPublisher<P, crate::codec::DefaultCodec, PublishIdentity> {
    /// Wraps `publisher` with the crate's default codec and no layers.
    ///
    /// Only available when at least one of the `json`, `cbor` or `msgpack`
    /// features is enabled.
    #[must_use]
    pub fn new(publisher: P) -> Self {
        let codec = crate::codec::DefaultCodec::default();
        Self::with_codec(publisher, codec)
    }
}
impl<P, C, PL> TypedPublisher<P, C, PL> {
    /// Returns the codec this publisher encodes with.
    pub(crate) const fn codec(&self) -> &C {
        &self.codec
    }

    /// Adds `layer` on top of the existing layers.
    ///
    /// The existing layers become the `inner` part of the new
    /// [`PublishStack`] and `layer` becomes the `outer` part, so `layer`
    /// is applied after the layers already present.
    #[must_use]
    pub fn layer<N>(self, layer: N) -> TypedPublisher<P, C, PublishStack<PL, N>> {
        let Self {
            publisher,
            codec,
            layers: inner,
        } = self;
        let layers = PublishStack {
            inner,
            outer: layer,
        };
        TypedPublisher {
            publisher,
            codec,
            layers,
        }
    }
}
impl<P, C, PL> TypedPublisher<P, C, PL>
where
    P: Publisher,
    C: Codec,
    PL: PublishLayer,
{
    /// Encodes `value`, wraps it in an [`Outgoing`] named `name`, applies
    /// this publisher's layers, then sends it through `pipeline` and the
    /// underlying publisher.
    ///
    /// # Errors
    ///
    /// Returns the boxed codec error if encoding fails, otherwise whatever
    /// error the publish chain resolves to.
    pub(crate) async fn publish<T>(
        &self,
        name: &str,
        value: &T,
        pipeline: &[Arc<dyn PublishMiddleware>],
    ) -> Result<(), BoxError>
    where
        T: Serialize + Sync,
    {
        let encoded = match self.codec.encode(value) {
            Ok(bytes) => bytes,
            Err(err) => return Err(Box::new(err) as BoxError),
        };

        let mut out = Outgoing::new(name, encoded.to_vec());
        // Layers edit the message before any middleware sees it.
        self.layers.apply(&mut out);

        run_publish(pipeline, &self.publisher, &mut out).await
    }
}
impl<P, C, PL> std::fmt::Debug for TypedPublisher<P, C, PL> {
    /// Prints only the type name; no field is required to implement
    /// `Debug`, so all of them are elided.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("TypedPublisher");
        builder.finish_non_exhaustive()
    }
}