use std::{future::Future, sync::Arc};
use serde::{Serialize, de::DeserializeOwned};
use tracing::warn;
use crate::codec::Codec;
use crate::{IncomingMessage, Publisher};
use super::context::Context;
use super::dispatch::Workers;
use super::failure::{FailurePolicies, FailurePolicy};
use super::handler::{Handler, HandlerResult, Settle};
use super::metadata::HandlerMetadata;
use super::publish::{PublishLayer, PublishMiddleware, TypedPublisher};
pub trait PublishingDef: Send + Sync {
type Input;
type Reply;
type Source;
fn source(&self) -> Self::Source;
fn reply_name(&self) -> &str;
fn workers(&self) -> Workers {
Workers::sequential()
}
fn failure_policies(&self) -> FailurePolicies {
FailurePolicies::default()
}
fn description(&self) -> Option<&str> {
None
}
fn input_schema(&self) -> Option<String> {
None
}
fn message_name(&self) -> Option<&'static str> {
None
}
fn message_description(&self) -> Option<&'static str> {
None
}
fn call(
&self,
input: &Self::Input,
ctx: &mut Context<'_>,
) -> impl Future<Output = Result<Self::Reply, HandlerResult>> + Send;
}
pub(crate) fn publishing_metadata<D: PublishingDef>(name: String, def: &D) -> HandlerMetadata {
HandlerMetadata::typed::<D::Input>(name)
.with_output_type(std::any::type_name::<D::Reply>())
.with_def_details(
def.description(),
def.input_schema(),
def.message_name(),
def.message_description(),
)
}
pub struct PublishingHandler<D, C, P, PC, PL> {
pub(crate) def: D,
pub(crate) codec: C,
pub(crate) publisher: TypedPublisher<P, PC, PL>,
pub(crate) pipeline: Arc<[Arc<dyn PublishMiddleware>]>,
pub(crate) decode: FailurePolicy,
}
impl<D, C, P, PC, PL> std::fmt::Debug for PublishingHandler<D, C, P, PC, PL> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PublishingHandler")
.field("publisher", &self.publisher)
.finish_non_exhaustive()
}
}
impl<M, D, C, P, PC, PL> Handler<M> for PublishingHandler<D, C, P, PC, PL>
where
M: IncomingMessage,
D: PublishingDef,
D::Input: DeserializeOwned + Send + Sync,
D::Reply: Serialize + Send + Sync,
C: Codec,
P: Publisher,
PC: Codec,
PL: PublishLayer,
{
async fn handle(&self, msg: &M, ctx: &mut Context<'_>) -> Settle {
let input = match self.codec.decode::<D::Input>(msg.payload()) {
Ok(value) => value,
Err(err) => {
warn!(
target: "ruststream::dispatch",
subscription = %ctx.name(),
message_type = std::any::type_name::<D::Input>(),
error = %err,
"codec decode failed",
);
return match self.decode {
FailurePolicy::FailFast => {
ctx.fail_fast(&format!("decode failed: {err}"));
HandlerResult::drop().into()
}
other => other
.settlement()
.unwrap_or_else(HandlerResult::drop)
.into(),
};
}
};
let reply = match self.def.call(&input, ctx).await {
Ok(reply) => reply,
Err(result) => return result.into(),
};
let name = self.def.reply_name();
let publish = self
.publisher
.publish(name, &reply, &self.pipeline, ctx.extensions());
if let Err(err) = publish.await {
warn!(
target: "ruststream::dispatch",
subscription = %ctx.name(),
reply = %name,
reply_type = std::any::type_name::<D::Reply>(),
error = %err,
"reply publish failed",
);
return HandlerResult::retry().into();
}
HandlerResult::Ack.into()
}
}
#[cfg(test)]
mod tests {
use super::{PublishingDef, publishing_metadata};
use crate::Headers;
use crate::Name;
use crate::runtime::context::{Context, State};
use crate::runtime::dispatch::{Delivery, Workers};
use crate::runtime::handler::HandlerResult;
struct ManualPub;
impl PublishingDef for ManualPub {
type Input = u32;
type Reply = u32;
type Source = Name;
fn source(&self) -> Name {
Name::new("in")
}
#[allow(clippy::unnecessary_literal_bound)]
fn reply_name(&self) -> &str {
"out"
}
async fn call(&self, input: &u32, _ctx: &mut Context<'_>) -> Result<u32, HandlerResult> {
Ok(*input)
}
}
#[tokio::test]
async fn defaults_metadata_and_call() {
let def = ManualPub;
assert_eq!(def.workers(), Workers::sequential());
assert!(def.description().is_none());
assert!(def.input_schema().is_none());
assert!(def.message_name().is_none());
assert!(def.message_description().is_none());
assert_eq!(def.reply_name(), "out");
let _source = def.source();
let meta = publishing_metadata("in".to_owned(), &def);
assert_eq!(meta.name, "in");
let state = State::default();
let delivery = Delivery::empty();
let headers = Headers::new();
let mut ctx = Context::new("in", &headers, &state, &delivery);
assert_eq!(def.call(&5, &mut ctx).await.unwrap(), 5);
}
}