Skip to main content

ash_rpc/observability/
mod.rs

1//! Observability features for ash-rpc
2//!
3//! Provides metrics collection, distributed tracing, and unified observability wrapper.
4
5use crate::{Message, MessageProcessor, ProcessorCapabilities, Response};
6use async_trait::async_trait;
7use std::sync::Arc;
8
9#[cfg(feature = "prometheus")]
10pub mod prometheus;
11
12#[cfg(feature = "opentelemetry")]
13pub mod tracing;
14
15pub mod macros;
16
17// Re-export the observable_setup macro from the crate root
18#[doc(inline)]
19pub use crate::observable_setup;
20
21use crate::logger::Logger;
22
23/// Unified observability processor wrapping metrics, tracing, and logging
24pub struct ObservableProcessor {
25    inner: Arc<dyn MessageProcessor + Send + Sync>,
26    #[cfg(feature = "prometheus")]
27    metrics: Option<Arc<prometheus::PrometheusMetrics>>,
28    #[cfg(feature = "opentelemetry")]
29    tracer: Option<Arc<tracing::TracingProcessor>>,
30    #[cfg(feature = "logging")]
31    logger: Option<Arc<dyn Logger>>,
32}
33
34impl ObservableProcessor {
35    /// Create a new builder for observable processor
36    pub fn builder(processor: Arc<dyn MessageProcessor + Send + Sync>) -> ObservabilityBuilder {
37        ObservabilityBuilder {
38            processor,
39            #[cfg(feature = "prometheus")]
40            metrics: None,
41            #[cfg(feature = "opentelemetry")]
42            tracer: None,
43            #[cfg(feature = "logging")]
44            logger: None,
45        }
46    }
47}
48
49#[async_trait]
50impl MessageProcessor for ObservableProcessor {
51    async fn process_message(&self, message: Message) -> Option<Response> {
52        #[cfg(feature = "logging")]
53        if let Some(logger) = &self.logger {
54            match &message {
55                Message::Request(req) => {
56                    logger.debug(
57                        "Processing request",
58                        &[("method", &req.method), ("has_id", &req.id.is_some())],
59                    );
60                }
61                Message::Notification(notif) => {
62                    logger.debug("Processing notification", &[("method", &notif.method)]);
63                }
64                Message::Response(_) => {
65                    logger.debug("Received response", &[]);
66                }
67            }
68        }
69
70        #[cfg(feature = "prometheus")]
71        let start = std::time::Instant::now();
72
73        #[cfg(feature = "opentelemetry")]
74        let span_guard = if let Some(tracer) = &self.tracer {
75            tracer.start_span(&message)
76        } else {
77            None
78        };
79
80        let response = self.inner.process_message(message.clone()).await;
81
82        #[cfg(feature = "prometheus")]
83        if let Some(metrics) = &self.metrics {
84            let duration = start.elapsed();
85            let method = match &message {
86                Message::Request(req) => &req.method,
87                Message::Notification(notif) => &notif.method,
88                Message::Response(_) => "response",
89            };
90
91            metrics.record_request(
92                method,
93                duration,
94                response
95                    .as_ref()
96                    .is_none_or(super::types::Response::is_success),
97            );
98        }
99
100        #[cfg(feature = "opentelemetry")]
101        if let Some(mut guard) = span_guard
102            && let Some(resp) = &response
103            && !resp.is_success()
104        {
105            guard.record_error();
106        }
107
108        #[cfg(feature = "logging")]
109        if let Some(logger) = &self.logger
110            && let Some(resp) = &response
111        {
112            if resp.is_success() {
113                logger.debug("Request succeeded", &[]);
114            } else {
115                logger.warn("Request failed", &[]);
116            }
117        }
118
119        response
120    }
121
122    fn get_capabilities(&self) -> ProcessorCapabilities {
123        self.inner.get_capabilities()
124    }
125}
126
127/// Builder for creating observable processors
128pub struct ObservabilityBuilder {
129    processor: Arc<dyn MessageProcessor + Send + Sync>,
130    #[cfg(feature = "prometheus")]
131    metrics: Option<Arc<prometheus::PrometheusMetrics>>,
132    #[cfg(feature = "opentelemetry")]
133    tracer: Option<Arc<tracing::TracingProcessor>>,
134    #[cfg(feature = "logging")]
135    logger: Option<Arc<dyn Logger>>,
136}
137
138impl ObservabilityBuilder {
139    /// Add Prometheus metrics collection
140    #[cfg(feature = "prometheus")]
141    #[must_use]
142    pub fn with_metrics(mut self, metrics: Arc<prometheus::PrometheusMetrics>) -> Self {
143        self.metrics = Some(metrics);
144        self
145    }
146
147    /// Add OpenTelemetry tracing
148    #[cfg(feature = "opentelemetry")]
149    #[must_use]
150    pub fn with_tracing(mut self, tracer: Arc<tracing::TracingProcessor>) -> Self {
151        self.tracer = Some(tracer);
152        self
153    }
154
155    /// Add structured logging
156    #[cfg(feature = "logging")]
157    #[must_use]
158    pub fn with_logger(mut self, logger: Arc<dyn Logger>) -> Self {
159        self.logger = Some(logger);
160        self
161    }
162
163    /// Build the observable processor
164    #[must_use]
165    pub fn build(self) -> ObservableProcessor {
166        ObservableProcessor {
167            inner: self.processor,
168            #[cfg(feature = "prometheus")]
169            metrics: self.metrics,
170            #[cfg(feature = "opentelemetry")]
171            tracer: self.tracer,
172            #[cfg(feature = "logging")]
173            logger: self.logger,
174        }
175    }
176}