use std::{future::Future, sync::Arc};
use serde::{Serialize, de::DeserializeOwned};
use tracing::warn;
use crate::codec::Codec;
use crate::{IncomingMessage, Publisher};
use super::context::Context;
use super::handler::{Handler, HandlerResult};
use super::metadata::HandlerMetadata;
use super::publish::{PublishLayer, PublishMiddleware, TypedPublisher};
pub trait PublishingDef: Send + Sync {
type Input;
type Reply;
type Source;
fn source(&self) -> Self::Source;
fn reply_name(&self) -> &str;
fn description(&self) -> Option<&str> {
None
}
fn input_schema(&self) -> Option<String> {
None
}
fn message_name(&self) -> Option<&'static str> {
None
}
fn message_description(&self) -> Option<&'static str> {
None
}
fn call(
&self,
input: &Self::Input,
ctx: &mut Context<'_>,
) -> impl Future<Output = Result<Self::Reply, HandlerResult>> + Send;
}
pub(crate) fn publishing_metadata<D: PublishingDef>(name: String, def: &D) -> HandlerMetadata {
let mut meta = HandlerMetadata::typed::<D::Input>(name)
.with_output_type(std::any::type_name::<D::Reply>());
if let Some(description) = def.description() {
meta = meta.with_description(description.to_owned());
}
if let Some(schema) = def.input_schema() {
meta = meta.with_payload_schema(schema);
}
if let Some(message_name) = def.message_name() {
meta = meta.with_message_name(message_name);
}
if let Some(message_description) = def.message_description() {
meta = meta.with_message_description(message_description);
}
meta
}
pub struct PublishingHandler<D, C, P, PC, PL> {
pub(crate) def: D,
pub(crate) codec: C,
pub(crate) publisher: TypedPublisher<P, PC, PL>,
pub(crate) pipeline: Arc<[Arc<dyn PublishMiddleware>]>,
}
impl<D, C, P, PC, PL> std::fmt::Debug for PublishingHandler<D, C, P, PC, PL> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PublishingHandler")
.field("publisher", &self.publisher)
.finish_non_exhaustive()
}
}
impl<M, D, C, P, PC, PL> Handler<M> for PublishingHandler<D, C, P, PC, PL>
where
M: IncomingMessage,
D: PublishingDef,
D::Input: DeserializeOwned + Send + Sync,
D::Reply: Serialize + Send + Sync,
C: Codec,
P: Publisher,
PC: Codec,
PL: PublishLayer,
{
async fn handle(&self, msg: &M, ctx: &mut Context<'_>) -> HandlerResult {
let input = match self.codec.decode::<D::Input>(msg.payload()) {
Ok(value) => value,
Err(err) => {
warn!(target: "ruststream::dispatch", error = %err, "codec decode failed");
return HandlerResult::drop();
}
};
let reply = match self.def.call(&input, ctx).await {
Ok(reply) => reply,
Err(result) => return result,
};
let name = self.def.reply_name();
if let Err(err) = self.publisher.publish(name, &reply, &self.pipeline).await {
warn!(target: "ruststream::dispatch", error = %err, "reply publish failed");
return HandlerResult::retry();
}
HandlerResult::Ack
}
}