use crate::{Message, MessageProcessor, ProcessorCapabilities, Response};
use async_trait::async_trait;
use std::sync::Arc;
#[cfg(feature = "prometheus")]
pub mod prometheus;
#[cfg(feature = "opentelemetry")]
pub mod tracing;
pub mod macros;
#[doc(inline)]
pub use crate::observable_setup;
use crate::logger::Logger;
pub struct ObservableProcessor {
inner: Arc<dyn MessageProcessor + Send + Sync>,
#[cfg(feature = "prometheus")]
metrics: Option<Arc<prometheus::PrometheusMetrics>>,
#[cfg(feature = "opentelemetry")]
tracer: Option<Arc<tracing::TracingProcessor>>,
#[cfg(feature = "logging")]
logger: Option<Arc<dyn Logger>>,
}
impl ObservableProcessor {
pub fn builder(processor: Arc<dyn MessageProcessor + Send + Sync>) -> ObservabilityBuilder {
ObservabilityBuilder {
processor,
#[cfg(feature = "prometheus")]
metrics: None,
#[cfg(feature = "opentelemetry")]
tracer: None,
#[cfg(feature = "logging")]
logger: None,
}
}
}
#[async_trait]
impl MessageProcessor for ObservableProcessor {
async fn process_message(&self, message: Message) -> Option<Response> {
#[cfg(feature = "logging")]
if let Some(logger) = &self.logger {
match &message {
Message::Request(req) => {
logger.debug(
"Processing request",
&[("method", &req.method), ("has_id", &req.id.is_some())],
);
}
Message::Notification(notif) => {
logger.debug("Processing notification", &[("method", ¬if.method)]);
}
Message::Response(_) => {
logger.debug("Received response", &[]);
}
}
}
#[cfg(feature = "prometheus")]
let start = std::time::Instant::now();
#[cfg(feature = "opentelemetry")]
let span_guard = if let Some(tracer) = &self.tracer {
tracer.start_span(&message)
} else {
None
};
let response = self.inner.process_message(message.clone()).await;
#[cfg(feature = "prometheus")]
if let Some(metrics) = &self.metrics {
let duration = start.elapsed();
let method = match &message {
Message::Request(req) => &req.method,
Message::Notification(notif) => ¬if.method,
Message::Response(_) => "response",
};
metrics.record_request(
method,
duration,
response
.as_ref()
.is_none_or(super::types::Response::is_success),
);
}
#[cfg(feature = "opentelemetry")]
if let Some(mut guard) = span_guard
&& let Some(resp) = &response
&& !resp.is_success()
{
guard.record_error();
}
#[cfg(feature = "logging")]
if let Some(logger) = &self.logger
&& let Some(resp) = &response
{
if resp.is_success() {
logger.debug("Request succeeded", &[]);
} else {
logger.warn("Request failed", &[]);
}
}
response
}
fn get_capabilities(&self) -> ProcessorCapabilities {
self.inner.get_capabilities()
}
}
pub struct ObservabilityBuilder {
processor: Arc<dyn MessageProcessor + Send + Sync>,
#[cfg(feature = "prometheus")]
metrics: Option<Arc<prometheus::PrometheusMetrics>>,
#[cfg(feature = "opentelemetry")]
tracer: Option<Arc<tracing::TracingProcessor>>,
#[cfg(feature = "logging")]
logger: Option<Arc<dyn Logger>>,
}
impl ObservabilityBuilder {
#[cfg(feature = "prometheus")]
#[must_use]
pub fn with_metrics(mut self, metrics: Arc<prometheus::PrometheusMetrics>) -> Self {
self.metrics = Some(metrics);
self
}
#[cfg(feature = "opentelemetry")]
#[must_use]
pub fn with_tracing(mut self, tracer: Arc<tracing::TracingProcessor>) -> Self {
self.tracer = Some(tracer);
self
}
#[cfg(feature = "logging")]
#[must_use]
pub fn with_logger(mut self, logger: Arc<dyn Logger>) -> Self {
self.logger = Some(logger);
self
}
#[must_use]
pub fn build(self) -> ObservableProcessor {
ObservableProcessor {
inner: self.processor,
#[cfg(feature = "prometheus")]
metrics: self.metrics,
#[cfg(feature = "opentelemetry")]
tracer: self.tracer,
#[cfg(feature = "logging")]
logger: self.logger,
}
}
}