use std::{future::Future, sync::Arc};
use serde::{Serialize, de::DeserializeOwned};
use tracing::warn;
use crate::codec::Codec;
use crate::{IncomingMessage, Publisher};
use super::context::Context;
use super::handler::{Handler, HandlerResult};
use super::publish::{PublishLayer, PublishMiddleware, TypedPublisher};
pub trait PublishingDef: Send + Sync {
type Input;
type Reply;
type Source;
fn source(&self) -> Self::Source;
fn reply_name(&self) -> &str;
fn description(&self) -> Option<&str> {
None
}
fn input_schema(&self) -> Option<String> {
None
}
fn call(&self, input: &Self::Input) -> impl Future<Output = Self::Reply> + Send;
}
pub struct PublishingHandler<D, C, P, PC, PL> {
pub(crate) def: D,
pub(crate) codec: C,
pub(crate) publisher: TypedPublisher<P, PC, PL>,
pub(crate) pipeline: Arc<[Arc<dyn PublishMiddleware>]>,
}
impl<D, C, P, PC, PL> std::fmt::Debug for PublishingHandler<D, C, P, PC, PL> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PublishingHandler")
.field("publisher", &self.publisher)
.finish_non_exhaustive()
}
}
impl<M, D, C, P, PC, PL> Handler<M> for PublishingHandler<D, C, P, PC, PL>
where
M: IncomingMessage,
D: PublishingDef,
D::Input: DeserializeOwned + Send + Sync,
D::Reply: Serialize + Send + Sync,
C: Codec,
P: Publisher,
PC: Codec,
PL: PublishLayer,
{
async fn handle(&self, msg: &M, _ctx: &mut Context<'_>) -> HandlerResult {
let input = match self.codec.decode::<D::Input>(msg.payload()) {
Ok(value) => value,
Err(err) => {
warn!(target: "ruststream::dispatch", error = %err, "codec decode failed");
return HandlerResult::drop();
}
};
let reply = self.def.call(&input).await;
let name = self.def.reply_name();
if let Err(err) = self.publisher.publish(name, &reply, &self.pipeline).await {
warn!(target: "ruststream::dispatch", error = %err, "reply publish failed");
}
HandlerResult::Ack
}
}