use std::time::Instant;
use ruststream::runtime::{BlanketLayer, Context, Handler, Layer, Outgoing, PublishLayer, Settle};
/// Layer marker that wraps handlers in [`Observed`].
///
/// Carries no state of its own; both the `Layer` and `BlanketLayer`
/// implementations simply construct an `Observed` around the given handler.
#[derive(Clone)]
pub(crate) struct Observe;
/// Handler wrapper produced by [`Observe`].
///
/// Holds the wrapped handler as its only field. Its `Handler` implementation
/// delegates to that handler and logs the channel name and elapsed time.
pub(crate) struct Observed<H>(H);
/// Per-handler layering: turns any `H` into an [`Observed<H>`].
impl<H> Layer<H> for Observe {
    type Handler = Observed<H>;

    /// Wraps `wrapped` in [`Observed`] without touching it otherwise.
    fn layer(&self, wrapped: H) -> Self::Handler {
        Observed(wrapped)
    }
}
/// Blanket layering: applies the same [`Observed`] wrapper to a handler of
/// any message type `M`.
impl BlanketLayer for Observe {
    /// Wraps `inner` in [`Observed`] and returns it as an opaque `Handler<M>`.
    fn apply<M, H>(&self, inner: H) -> impl Handler<M> + 'static
    where
        M: Send + Sync + 'static,
        H: Handler<M> + 'static,
    {
        Observed(inner)
    }
}
impl<M: Send + Sync, H: Handler<M>> Handler<M> for Observed<H> {
async fn handle(&self, msg: &M, ctx: &mut Context<'_>) -> Settle {
let channel = ctx.name().to_owned();
let started = Instant::now();
let settle = self.0.handle(msg, ctx).await;
tracing::info!(channel = %channel, elapsed = ?started.elapsed(), "handled");
settle
}
}
/// Publish-side layer that tags outgoing messages with the originating
/// service via the `x-source-service` header.
pub(crate) struct StampSource;
/// Stamps every outgoing message with a fixed source-service header.
impl PublishLayer for StampSource {
    /// Inserts `x-source-service: orders-service` into the headers of `out`.
    ///
    /// NOTE(review): whether an existing header of the same name is replaced
    /// depends on the header map's `insert` semantics — confirm.
    fn apply(&self, out: &mut Outgoing<'_>) {
        const HEADER_NAME: &str = "x-source-service";
        const SERVICE_NAME: &[u8] = b"orders-service";

        let headers = out.headers_mut();
        headers.insert(HEADER_NAME, SERVICE_NAME.to_vec());
    }
}