ash_rpc/observability/
mod.rs1use crate::{Message, MessageProcessor, ProcessorCapabilities, Response};
6use async_trait::async_trait;
7use std::sync::Arc;
8
9#[cfg(feature = "prometheus")]
10pub mod prometheus;
11
12#[cfg(feature = "opentelemetry")]
13pub mod tracing;
14
15pub mod macros;
16
17#[doc(inline)]
19pub use crate::observable_setup;
20
21use crate::logger::Logger;
22
23pub struct ObservableProcessor {
25 inner: Arc<dyn MessageProcessor + Send + Sync>,
26 #[cfg(feature = "prometheus")]
27 metrics: Option<Arc<prometheus::PrometheusMetrics>>,
28 #[cfg(feature = "opentelemetry")]
29 tracer: Option<Arc<tracing::TracingProcessor>>,
30 #[cfg(feature = "logging")]
31 logger: Option<Arc<dyn Logger>>,
32}
33
34impl ObservableProcessor {
35 pub fn builder(processor: Arc<dyn MessageProcessor + Send + Sync>) -> ObservabilityBuilder {
37 ObservabilityBuilder {
38 processor,
39 #[cfg(feature = "prometheus")]
40 metrics: None,
41 #[cfg(feature = "opentelemetry")]
42 tracer: None,
43 #[cfg(feature = "logging")]
44 logger: None,
45 }
46 }
47}
48
49#[async_trait]
50impl MessageProcessor for ObservableProcessor {
51 async fn process_message(&self, message: Message) -> Option<Response> {
52 #[cfg(feature = "logging")]
53 if let Some(logger) = &self.logger {
54 match &message {
55 Message::Request(req) => {
56 logger.debug(
57 "Processing request",
58 &[("method", &req.method), ("has_id", &req.id.is_some())],
59 );
60 }
61 Message::Notification(notif) => {
62 logger.debug("Processing notification", &[("method", ¬if.method)]);
63 }
64 Message::Response(_) => {
65 logger.debug("Received response", &[]);
66 }
67 }
68 }
69
70 #[cfg(feature = "prometheus")]
71 let start = std::time::Instant::now();
72
73 #[cfg(feature = "opentelemetry")]
74 let span_guard = if let Some(tracer) = &self.tracer {
75 tracer.start_span(&message)
76 } else {
77 None
78 };
79
80 let response = self.inner.process_message(message.clone()).await;
81
82 #[cfg(feature = "prometheus")]
83 if let Some(metrics) = &self.metrics {
84 let duration = start.elapsed();
85 let method = match &message {
86 Message::Request(req) => &req.method,
87 Message::Notification(notif) => ¬if.method,
88 Message::Response(_) => "response",
89 };
90
91 metrics.record_request(
92 method,
93 duration,
94 response
95 .as_ref()
96 .is_none_or(super::types::Response::is_success),
97 );
98 }
99
100 #[cfg(feature = "opentelemetry")]
101 if let Some(mut guard) = span_guard
102 && let Some(resp) = &response
103 && !resp.is_success()
104 {
105 guard.record_error();
106 }
107
108 #[cfg(feature = "logging")]
109 if let Some(logger) = &self.logger
110 && let Some(resp) = &response
111 {
112 if resp.is_success() {
113 logger.debug("Request succeeded", &[]);
114 } else {
115 logger.warn("Request failed", &[]);
116 }
117 }
118
119 response
120 }
121
122 fn get_capabilities(&self) -> ProcessorCapabilities {
123 self.inner.get_capabilities()
124 }
125}
126
127pub struct ObservabilityBuilder {
129 processor: Arc<dyn MessageProcessor + Send + Sync>,
130 #[cfg(feature = "prometheus")]
131 metrics: Option<Arc<prometheus::PrometheusMetrics>>,
132 #[cfg(feature = "opentelemetry")]
133 tracer: Option<Arc<tracing::TracingProcessor>>,
134 #[cfg(feature = "logging")]
135 logger: Option<Arc<dyn Logger>>,
136}
137
138impl ObservabilityBuilder {
139 #[cfg(feature = "prometheus")]
141 #[must_use]
142 pub fn with_metrics(mut self, metrics: Arc<prometheus::PrometheusMetrics>) -> Self {
143 self.metrics = Some(metrics);
144 self
145 }
146
147 #[cfg(feature = "opentelemetry")]
149 #[must_use]
150 pub fn with_tracing(mut self, tracer: Arc<tracing::TracingProcessor>) -> Self {
151 self.tracer = Some(tracer);
152 self
153 }
154
155 #[cfg(feature = "logging")]
157 #[must_use]
158 pub fn with_logger(mut self, logger: Arc<dyn Logger>) -> Self {
159 self.logger = Some(logger);
160 self
161 }
162
163 #[must_use]
165 pub fn build(self) -> ObservableProcessor {
166 ObservableProcessor {
167 inner: self.processor,
168 #[cfg(feature = "prometheus")]
169 metrics: self.metrics,
170 #[cfg(feature = "opentelemetry")]
171 tracer: self.tracer,
172 #[cfg(feature = "logging")]
173 logger: self.logger,
174 }
175 }
176}