use std::{borrow::Cow, future::Future, pin::Pin, sync::Arc};
use bytes::BytesMut;
use serde::Serialize;
use tracing::warn;
use super::lifecycle::BoxError;
use super::publisher_registry::ErasedPublisher;
use crate::codec::Codec;
#[cfg(any(feature = "json", feature = "cbor", feature = "msgpack"))]
use crate::codec::DefaultCodec;
use crate::runtime::publish::sealed::Sealed;
use crate::{Headers, Publisher, TransactionalPublisher};
/// Boxed, `Send` future returned by the publish pipeline stages in this module.
///
/// It resolves to `Ok(())` or a [`BoxError`]; the `'a` bound lets the future borrow from the
/// arguments it was created with.
type PublishFut<'a> = Pin<Box<dyn Future<Output = Result<(), BoxError>> + Send + 'a>>;
/// A message about to be published: a name, an encoded payload and a set of headers.
///
/// The name is a [`Cow`], so it may either borrow a `&'a str` or own a `String`.
#[derive(Debug, Clone)]
pub struct Outgoing<'a> {
    /// Name the message is published under.
    name: Cow<'a, str>,
    /// Encoded message body.
    payload: BytesMut,
    /// Headers attached to the message.
    headers: Headers,
}

impl<'a> Outgoing<'a> {
    /// Builds a message from `name` and `payload`; headers come from [`Headers::new`].
    #[must_use]
    pub fn new(name: impl Into<Cow<'a, str>>, payload: impl Into<BytesMut>) -> Self {
        let name = name.into();
        let payload = payload.into();
        let headers = Headers::new();
        Outgoing {
            name,
            payload,
            headers,
        }
    }

    /// Returns the current message name.
    #[must_use]
    pub fn name(&self) -> &str {
        self.name.as_ref()
    }

    /// Replaces the message name.
    pub fn set_name(&mut self, name: impl Into<Cow<'a, str>>) {
        let replacement: Cow<'a, str> = name.into();
        self.name = replacement;
    }

    /// Returns the payload bytes.
    #[must_use]
    pub fn payload(&self) -> &[u8] {
        &self.payload[..]
    }

    /// Gives mutable access to the payload buffer for in-place edits.
    pub fn payload_mut(&mut self) -> &mut BytesMut {
        &mut self.payload
    }

    /// Replaces the whole payload.
    pub fn set_payload(&mut self, payload: impl Into<BytesMut>) {
        let replacement: BytesMut = payload.into();
        self.payload = replacement;
    }

    /// Returns the headers.
    #[must_use]
    pub fn headers(&self) -> &Headers {
        &self.headers
    }

    /// Gives mutable access to the headers.
    pub fn headers_mut(&mut self) -> &mut Headers {
        &mut self.headers
    }
}
/// A complete publish pipeline: what happens between a built [`Outgoing`] and the publisher.
///
/// In this file it is implemented by [`PublishIdentity`], which forwards straight to `send`,
/// and by [`PublishStack`], which runs a layer and hands it the remaining pipeline.
pub trait PublishPipeline: Send + Sync {
/// Runs the pipeline for `out`, with `send` as the publisher at the end of the chain.
///
/// The returned future may borrow `self`, `out` and `send` for `'a`.
fn run<'a>(
&'a self,
out: &'a mut Outgoing<'a>,
send: &'a dyn ErasedPublisher,
) -> PublishFut<'a>;
}
/// Terminal pipeline: hands the message to the publisher with no further processing.
#[derive(Debug, Clone, Copy, Default)]
pub struct PublishIdentity;

impl PublishPipeline for PublishIdentity {
    /// Forwards the message's name, payload and headers to `send.publish_message`.
    fn run<'a>(
        &'a self,
        out: &'a mut Outgoing<'a>,
        send: &'a dyn ErasedPublisher,
    ) -> PublishFut<'a> {
        // Nothing here mutates the message, so a shared view is all that is needed.
        let message: &'a Outgoing<'a> = out;
        send.publish_message(message.name(), message.payload(), message.headers())
    }
}
/// A pipeline made of one layer (`head`) placed in front of the rest of the pipeline (`tail`).
#[derive(Debug, Clone, Copy, Default)]
pub struct PublishStack<Head, Tail> {
    /// Layer that sees the message first.
    head: Head,
    /// Pipeline the head layer continues into.
    tail: Tail,
}

impl<Head, Tail> PublishStack<Head, Tail> {
    /// Pairs `head` with the pipeline `tail` that follows it.
    pub(crate) const fn new(head: Head, tail: Tail) -> Self {
        PublishStack { head, tail }
    }
}
impl<Head, Tail> PublishPipeline for PublishStack<Head, Tail>
where
    Head: PublishLayer,
    Tail: PublishPipeline,
{
    /// Invokes the head layer, giving it a [`PublishNext`] that continues into the tail.
    fn run<'a>(
        &'a self,
        out: &'a mut Outgoing<'a>,
        send: &'a dyn ErasedPublisher,
    ) -> PublishFut<'a> {
        // The continuation bundles the remaining pipeline with the final publisher.
        let next = PublishNext {
            tail: &self.tail,
            send,
        };
        self.head.on_publish(out, next)
    }
}
/// A statically composed middleware layer around publishing.
///
/// `on_publish` is generic over the rest of the pipeline (`N`); see [`PublishDynLayer`] for the
/// variant used behind `dyn`.
pub trait PublishLayer: Send + Sync {
/// Handles `out`; calling `next.run(out)` continues with the rest of the pipeline.
fn on_publish<'a, N: PublishPipeline>(
&'a self,
out: &'a mut Outgoing<'a>,
next: PublishNext<'a, N>,
) -> PublishFut<'a>;
}
pub struct PublishNext<'a, N> {
tail: &'a N,
send: &'a dyn ErasedPublisher,
}
impl<'a, N: PublishPipeline> PublishNext<'a, N> {
#[must_use]
pub fn run(self, out: &'a mut Outgoing<'a>) -> PublishFut<'a> {
self.tail.run(out, self.send)
}
}
impl<N> std::fmt::Debug for PublishNext<'_, N> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PublishNext").finish_non_exhaustive()
}
}
/// Middleware layer usable as a trait object (`dyn PublishDynLayer`).
///
/// Unlike [`PublishLayer`], `on_publish` has no generic parameter; the continuation is the
/// concrete [`PublishDynNext`].
pub trait PublishDynLayer: Send + Sync {
/// Handles `out`; calling `next.run(out)` continues with the remaining layers.
fn on_publish<'a>(
&'a self,
out: &'a mut Outgoing<'a>,
next: PublishDynNext<'a>,
) -> PublishFut<'a>;
}
/// Continuation passed to a [`PublishDynLayer`]: the layers still to run plus a final callback.
pub struct PublishDynNext<'a> {
    /// Layers that have not run yet, in execution order.
    rest: &'a [Arc<dyn PublishDynLayer>],
    /// Invoked once no layers remain.
    tail: Box<dyn FnOnce(&'a mut Outgoing<'a>) -> PublishFut<'a> + Send + 'a>,
}

impl<'a> PublishDynNext<'a> {
    /// Runs the next remaining layer, or the final callback when none is left.
    #[must_use]
    pub fn run(self, out: &'a mut Outgoing<'a>) -> PublishFut<'a> {
        let PublishDynNext { rest, tail } = self;
        let Some((layer, remaining)) = rest.split_first() else {
            // Every layer has run: fall through to the end of the chain.
            return tail(out);
        };
        layer.on_publish(
            out,
            PublishDynNext {
                rest: remaining,
                tail,
            },
        )
    }
}

impl std::fmt::Debug for PublishDynNext<'_> {
    /// Shows how many layers remain; the callback itself is not printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let remaining = self.rest.len();
        f.debug_struct("PublishDynNext")
            .field("remaining", &remaining)
            .finish_non_exhaustive()
    }
}
/// An ordered, shared list of [`PublishDynLayer`]s.
///
/// The list lives behind an `Arc`, so cloning the stack only clones that `Arc`.
#[derive(Clone)]
pub struct PublishDynStack(Arc<[Arc<dyn PublishDynLayer>]>);

impl PublishDynStack {
    /// Collects `middleware` into a stack, keeping the iteration order.
    #[must_use]
    pub fn new(middleware: impl IntoIterator<Item = Arc<dyn PublishDynLayer>>) -> Self {
        let layers: Arc<[Arc<dyn PublishDynLayer>]> = middleware.into_iter().collect();
        Self(layers)
    }
}

impl std::fmt::Debug for PublishDynStack {
    /// Shows the number of layers; the layers themselves are not printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let count = self.0.len();
        f.debug_struct("PublishDynStack")
            .field("middleware", &count)
            .finish_non_exhaustive()
    }
}
impl PublishLayer for PublishDynStack {
fn on_publish<'a, N: PublishPipeline>(
&'a self,
out: &'a mut Outgoing<'a>,
next: PublishNext<'a, N>,
) -> PublishFut<'a> {
PublishDynNext {
rest: &self.0,
tail: Box::new(move |out| next.run(out)),
}
.run(out)
}
}
/// Read-only view handed to publish transforms: a name, a set of headers and a
/// caller-supplied context value of type `C`.
///
/// NOTE(review): the name and headers are presumably those of the message being handled when
/// the publish happens — confirm against the callers of `new`.
pub struct PublishContext<'a, C = ()> {
    name: &'a str,
    headers: &'a Headers,
    cx: &'a C,
}

impl<'a, C> PublishContext<'a, C> {
    /// Bundles the three borrowed parts into a context.
    pub(crate) const fn new(name: &'a str, headers: &'a Headers, cx: &'a C) -> Self {
        Self { name, headers, cx }
    }

    /// Returns the name stored in this context.
    ///
    /// The borrow is tied to the underlying data (`'a`), not to this `PublishContext` value,
    /// so it stays usable after the context itself goes out of scope.
    #[must_use]
    pub fn name(&self) -> &'a str {
        self.name
    }

    /// Returns the headers stored in this context.
    ///
    /// As with [`name`](Self::name), the borrow lives for `'a` rather than for `&self`.
    #[must_use]
    pub fn headers(&self) -> &'a Headers {
        self.headers
    }

    /// Reads a value out of the context by calling `key.get` on the stored context value.
    pub fn context<K: crate::Field<C>>(&self, key: K) -> K::Value<'_> {
        key.get(self.cx)
    }
}

impl<C> std::fmt::Debug for PublishContext<'_, C> {
    /// Prints only the name; headers and the context value are omitted (`C` need not be `Debug`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("PublishContext")
            .field("name", &self.name)
            .finish_non_exhaustive()
    }
}
/// Synchronous hook that may edit an outgoing message before it enters the pipeline.
///
/// `TypedPublisher::publish` applies its transform right after encoding the payload.
pub trait PublishTransform<C = ()>: Send + Sync {
/// Edits `out` in place; `cx` provides read-only context.
fn apply(&self, out: &mut Outgoing<'_>, cx: &PublishContext<'_, C>);
}
/// Transform that leaves the message untouched; the default `PL` of `TypedPublisher`.
#[derive(Debug, Clone, Copy, Default)]
pub struct PublishTransformIdentity;
impl<C> PublishTransform<C> for PublishTransformIdentity {
/// Does nothing.
fn apply(&self, _out: &mut Outgoing<'_>, _cx: &PublishContext<'_, C>) {}
}
/// Two publish transforms run back to back: `inner` first, then `outer`.
#[derive(Debug, Clone, Copy, Default)]
pub struct PublishTransformStack<Inner, Outer> {
    /// Transform applied first.
    inner: Inner,
    /// Transform applied second, so it sees the result of `inner`.
    outer: Outer,
}

impl<C, Inner, Outer> PublishTransform<C> for PublishTransformStack<Inner, Outer>
where
    Inner: PublishTransform<C>,
    Outer: PublishTransform<C>,
{
    /// Applies `inner`, then `outer`, to the same message.
    fn apply(&self, out: &mut Outgoing<'_>, cx: &PublishContext<'_, C>) {
        let Self { inner, outer } = self;
        inner.apply(out, cx);
        outer.apply(out, cx);
    }
}
/// Counterpart of [`PublishTransform`] for the batched publish path.
///
/// `TypedPublisher::publish_batched` applies its batch transform right after encoding.
pub trait BatchPublishTransform<C = ()>: Send + Sync {
/// Edits `out` in place; `cx` provides read-only context.
fn apply(&self, out: &mut Outgoing<'_>, cx: &PublishContext<'_, C>);
}
/// Batch transform that leaves the message untouched; the default `BL` of `TypedPublisher`.
#[derive(Debug, Clone, Copy, Default)]
pub struct BatchTransformIdentity;
impl<C> BatchPublishTransform<C> for BatchTransformIdentity {
/// Does nothing.
fn apply(&self, _out: &mut Outgoing<'_>, _cx: &PublishContext<'_, C>) {}
}
/// Two batch transforms run back to back: `inner` first, then `outer`.
#[derive(Debug, Clone, Copy, Default)]
pub struct BatchPublishTransformStack<Inner, Outer> {
    /// Transform applied first.
    inner: Inner,
    /// Transform applied second, so it sees the result of `inner`.
    outer: Outer,
}

impl<C, Inner, Outer> BatchPublishTransform<C> for BatchPublishTransformStack<Inner, Outer>
where
    Inner: BatchPublishTransform<C>,
    Outer: BatchPublishTransform<C>,
{
    /// Applies `inner`, then `outer`, to the same message.
    fn apply(&self, out: &mut Outgoing<'_>, cx: &PublishContext<'_, C>) {
        let Self { inner, outer } = self;
        inner.apply(out, cx);
        outer.apply(out, cx);
    }
}
/// Adapter that lets a [`PublishTransform`] be used where a [`BatchPublishTransform`] is
/// expected.
#[derive(Debug, Clone, Copy, Default)]
pub struct ForBatch<L>(L);

impl<C, L> BatchPublishTransform<C> for ForBatch<L>
where
    L: PublishTransform<C>,
{
    /// Delegates to the wrapped transform's `PublishTransform::apply`.
    fn apply(&self, out: &mut Outgoing<'_>, cx: &PublishContext<'_, C>) {
        let ForBatch(transform) = self;
        <L as PublishTransform<C>>::apply(transform, out, cx);
    }
}

/// Wraps `transform` in [`ForBatch`] so it can be installed as a batch transform.
#[must_use]
pub fn for_batch<L>(transform: L) -> ForBatch<L> {
    ForBatch(transform)
}
/// A publisher paired with a codec and the transforms to apply to each encoded message.
///
/// `PL` is used by the single-message path and `BL` by the batched path; both default to the
/// identity transforms.
pub struct TypedPublisher<P, C, PL = PublishTransformIdentity, BL = BatchTransformIdentity> {
    /// Underlying publisher that receives the final message.
    publisher: P,
    /// Codec used to encode values into payload bytes.
    codec: C,
    /// Transform applied by `publish`.
    layers: PL,
    /// Transform applied by `publish_batched`.
    batch_layers: BL,
}

impl<P, C> TypedPublisher<P, C, PublishTransformIdentity, BatchTransformIdentity> {
    /// Creates a publisher that encodes with `codec` and applies no transforms.
    #[must_use]
    pub fn with_codec(publisher: P, codec: C) -> Self {
        let layers = PublishTransformIdentity;
        let batch_layers = BatchTransformIdentity;
        TypedPublisher {
            publisher,
            codec,
            layers,
            batch_layers,
        }
    }
}

#[cfg(any(feature = "json", feature = "cbor", feature = "msgpack"))]
impl<P> TypedPublisher<P, DefaultCodec, PublishTransformIdentity, BatchTransformIdentity> {
    /// Creates a publisher that uses `DefaultCodec::default()` and applies no transforms.
    #[must_use]
    pub fn new(publisher: P) -> Self {
        let codec = DefaultCodec::default();
        Self::with_codec(publisher, codec)
    }
}
impl<P, C, PL, BL> TypedPublisher<P, C, PL, BL> {
    /// Borrows the configured codec.
    pub(crate) const fn codec(&self) -> &C {
        &self.codec
    }

    /// Appends `transform` to the single-message transforms.
    ///
    /// The new transform becomes the `outer` half of a [`PublishTransformStack`], so it runs
    /// after every transform installed earlier.
    #[must_use]
    pub fn transform<N>(
        self,
        transform: N,
    ) -> TypedPublisher<P, C, PublishTransformStack<PL, N>, BL> {
        let Self {
            publisher,
            codec,
            layers: existing,
            batch_layers,
        } = self;
        let layers = PublishTransformStack {
            inner: existing,
            outer: transform,
        };
        TypedPublisher {
            publisher,
            codec,
            layers,
            batch_layers,
        }
    }

    /// Appends `transform` to the batch transforms.
    ///
    /// The new transform becomes the `outer` half of a [`BatchPublishTransformStack`], so it
    /// runs after every batch transform installed earlier.
    #[must_use]
    pub fn batch_transform<N>(
        self,
        transform: N,
    ) -> TypedPublisher<P, C, PL, BatchPublishTransformStack<BL, N>> {
        let Self {
            publisher,
            codec,
            layers,
            batch_layers: existing,
        } = self;
        let batch_layers = BatchPublishTransformStack {
            inner: existing,
            outer: transform,
        };
        TypedPublisher {
            publisher,
            codec,
            layers,
            batch_layers,
        }
    }

    /// Wraps this publisher in [`Transactional`]; requires a [`TransactionalPublisher`].
    #[must_use]
    pub fn transactional(self) -> Transactional<P, C, PL, BL>
    where
        P: TransactionalPublisher,
    {
        Transactional { inner: self }
    }
}
impl<P: Publisher, C: Codec, PL, BL> TypedPublisher<P, C, PL, BL> {
pub(crate) async fn publish<T: Serialize + Sync, Cx, PP>(
&self,
name: &str,
value: &T,
pipeline: &PP,
cx: &PublishContext<'_, Cx>,
) -> Result<(), BoxError>
where
PL: PublishTransform<Cx>,
BL: Sync,
Cx: Sync,
PP: PublishPipeline,
{
let payload = self
.codec
.encode(value)
.map_err(|e| Box::new(e) as BoxError)?;
let mut out = Outgoing::new(name, payload);
self.layers.apply(&mut out, cx);
pipeline.run(&mut out, &self.publisher).await
}
pub(crate) async fn publish_batched<T: Serialize + Sync, Cx, PP>(
&self,
name: &str,
value: &T,
pipeline: &PP,
cx: &PublishContext<'_, Cx>,
) -> Result<(), BoxError>
where
PL: Sync,
BL: BatchPublishTransform<Cx>,
Cx: Sync,
PP: PublishPipeline,
{
let payload = self
.codec
.encode(value)
.map_err(|e| Box::new(e) as BoxError)?;
let mut out = Outgoing::new(name, payload);
self.batch_layers.apply(&mut out, cx);
pipeline.run(&mut out, &self.publisher).await
}
}
impl<P, C, PL, BL> std::fmt::Debug for TypedPublisher<P, C, PL, BL> {
    /// Prints only the type name, so none of the type parameters has to be `Debug`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("TypedPublisher");
        repr.finish_non_exhaustive()
    }
}
/// A [`TypedPublisher`] whose `ReplyPublisher` implementation publishes each batch inside a
/// transaction on the underlying publisher.
pub struct Transactional<P, C, PL = PublishTransformIdentity, BL = BatchTransformIdentity> {
    /// The wrapped publisher; it supplies the codec, the transforms and the transport.
    inner: TypedPublisher<P, C, PL, BL>,
}

impl<P, C, PL, BL> std::fmt::Debug for Transactional<P, C, PL, BL> {
    /// Prints only the type name, so none of the type parameters has to be `Debug`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("Transactional");
        repr.finish_non_exhaustive()
    }
}
// Private module holding the `Sealed` marker trait. `ReplyPublisher` has `Sealed` as a
// supertrait, and this module is not `pub`, so `ReplyPublisher` can only be implemented for
// the types listed here.
mod sealed {
// Marker with no methods; it exists only to restrict who can implement `ReplyPublisher`.
pub trait Sealed {}
impl<P, C, PL, BL> Sealed for super::TypedPublisher<P, C, PL, BL> {}
impl<P, C, PL, BL> Sealed for super::Transactional<P, C, PL, BL> {}
}
/// Publisher that can send a batch of replies.
///
/// The trait is sealed: it requires `Sealed`, which in this file is implemented only for
/// `TypedPublisher` and `Transactional`.
pub trait ReplyPublisher<Cx = ()>: Sealed + Send + Sync {
/// Codec type exposed through `reply_codec`.
type Codec: Codec;
/// Borrows the codec of this publisher.
#[doc(hidden)]
fn reply_codec(&self) -> &Self::Codec;
/// Publishes every element of `replies` under `name` through `pipeline`.
///
/// The returned future is `Send` and resolves to the first error encountered, if any.
#[doc(hidden)]
fn publish_batch<'a, T, PP>(
&'a self,
name: &'a str,
replies: &'a [T],
pipeline: &'a PP,
cx: &'a PublishContext<'a, Cx>,
) -> impl Future<Output = Result<(), BoxError>> + Send
where
T: Serialize + Sync,
PP: PublishPipeline;
}
impl<P, C, PL, BL, Cx> ReplyPublisher<Cx> for TypedPublisher<P, C, PL, BL>
where
    P: Publisher,
    C: Codec,
    PL: Send + Sync,
    BL: BatchPublishTransform<Cx>,
    Cx: Sync,
{
    type Codec = C;

    /// Borrows the codec this publisher encodes with.
    fn reply_codec(&self) -> &C {
        &self.codec
    }

    /// Publishes the replies one after another, stopping at the first failure.
    ///
    /// Nothing is rolled back: replies published before a failure stay published.
    async fn publish_batch<'a, T, PP>(
        &'a self,
        name: &'a str,
        replies: &'a [T],
        pipeline: &'a PP,
        cx: &'a PublishContext<'a, Cx>,
    ) -> Result<(), BoxError>
    where
        T: Serialize + Sync,
        PP: PublishPipeline,
    {
        for item in replies {
            self.publish_batched(name, item, pipeline, cx).await?;
        }
        Ok(())
    }
}
impl<P, C, PL, BL, Cx> ReplyPublisher<Cx> for Transactional<P, C, PL, BL>
where
    P: TransactionalPublisher,
    C: Codec,
    PL: Send + Sync,
    BL: BatchPublishTransform<Cx>,
    Cx: Sync,
{
    type Codec = C;

    /// Borrows the codec of the wrapped publisher.
    fn reply_codec(&self) -> &C {
        &self.inner.codec
    }

    /// Publishes all replies inside one transaction.
    ///
    /// The sequence is: begin a transaction, publish each reply in order, commit. If a publish
    /// or the commit fails, the transaction is aborted (an abort failure is only logged) and
    /// the original error is returned. A failure to begin the transaction is returned as is.
    async fn publish_batch<'a, T, PP>(
        &'a self,
        name: &'a str,
        replies: &'a [T],
        pipeline: &'a PP,
        cx: &'a PublishContext<'a, Cx>,
    ) -> Result<(), BoxError>
    where
        T: Serialize + Sync,
        PP: PublishPipeline,
    {
        let publisher = &self.inner.publisher;

        if let Err(err) = publisher.begin_transaction().await {
            return Err(Box::new(err) as BoxError);
        }

        for item in replies {
            match self.inner.publish_batched(name, item, pipeline, cx).await {
                Ok(()) => {}
                Err(err) => {
                    abort_quietly(publisher).await;
                    return Err(err);
                }
            }
        }

        match publisher.commit().await {
            Ok(_) => Ok(()),
            Err(err) => {
                abort_quietly(publisher).await;
                Err(Box::new(err) as BoxError)
            }
        }
    }
}
/// Aborts the current transaction on `publisher`, logging a failure instead of returning it.
///
/// Used on error paths where the caller already has a more relevant error to report.
async fn abort_quietly<P: TransactionalPublisher>(publisher: &P) {
    if let Some(err) = publisher.abort().await.err() {
        warn!(target: "ruststream::dispatch", error = %err, "transaction abort failed");
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A string-literal name is kept as a borrow rather than copied.
    #[test]
    fn borrowed_name_is_not_owned() {
        let message = Outgoing::new("orders.created", &b"payload"[..]);
        assert!(matches!(message.name, Cow::Borrowed(_)));
        assert_eq!(message.name(), "orders.created");
        assert_eq!(message.payload(), b"payload");
    }

    /// A `String` name is moved into the message as an owned value.
    #[test]
    fn owned_name_moves_in() {
        let topic = format!("orders.{}", 42);
        let message = Outgoing::new(topic, BytesMut::from(&b"x"[..]));
        assert!(matches!(message.name, Cow::Owned(_)));
        assert_eq!(message.name(), "orders.42");
    }

    /// The payload can be extended in place and replaced wholesale.
    #[test]
    fn payload_mutates_in_place() {
        let mut message = Outgoing::new("t", BytesMut::from(&b"body"[..]));

        message.payload_mut().extend_from_slice(b"!");
        assert_eq!(message.payload(), b"body!");

        message.set_payload(&b"fresh"[..]);
        assert_eq!(message.payload(), b"fresh");
    }

    /// The name and the headers are both editable after construction.
    #[test]
    fn set_name_and_headers() {
        let mut message = Outgoing::new("a", &b""[..]);

        message.set_name("b");
        message.headers_mut().insert("k", "v");

        assert_eq!(message.name(), "b");
        assert_eq!(message.headers().get_str("k"), Some("v"));
    }
}