rama_net/stream/layer/
opentelemetry.rs

//! Network (transport level) OpenTelemetry [`Layer`] support for Rama.
//!
//! [`Layer`]: rama_core::Layer
4
5use crate::stream::SocketInfo;
6use rama_core::telemetry::opentelemetry::semantic_conventions::resource::{
7    SERVICE_NAME, SERVICE_VERSION,
8};
9use rama_core::telemetry::opentelemetry::semantic_conventions::trace::{
10    NETWORK_TRANSPORT, NETWORK_TYPE,
11};
12use rama_core::telemetry::opentelemetry::{
13    global,
14    metrics::{Counter, Histogram, Meter},
15    semantic_conventions, InstrumentationScope, KeyValue,
16};
17use rama_core::telemetry::opentelemetry::{AttributesFactory, MeterOptions, ServiceInfo};
18use rama_core::{Context, Layer, Service};
19use rama_utils::macros::define_inner_service_accessors;
20use std::borrow::Cow;
21use std::net::IpAddr;
22use std::{fmt, sync::Arc, time::SystemTime};
23
24const NETWORK_CONNECTION_DURATION: &str = "network.server.connection_duration";
25const NETWORK_SERVER_TOTAL_CONNECTIONS: &str = "network.server.total_connections";
26
27/// Records network server metrics
28#[derive(Clone, Debug)]
29struct Metrics {
30    network_connection_duration: Histogram<f64>,
31    network_total_connections: Counter<u64>,
32}
33
34impl Metrics {
35    /// Create a new [`NetworkMetrics`]
36    fn new(meter: Meter, prefix: Option<String>) -> Self {
37        let network_connection_duration = meter
38            .f64_histogram(match &prefix {
39                Some(prefix) => Cow::Owned(format!("{prefix}.{NETWORK_CONNECTION_DURATION}")),
40                None => Cow::Borrowed(NETWORK_CONNECTION_DURATION),
41            })
42            .with_description("Measures the duration of inbound network connections.")
43            .with_unit("s")
44            .build();
45
46        let network_total_connections = meter
47            .u64_counter(match &prefix {
48                Some(prefix) => Cow::Owned(format!("{prefix}.{NETWORK_SERVER_TOTAL_CONNECTIONS}")),
49                None => Cow::Borrowed(NETWORK_SERVER_TOTAL_CONNECTIONS),
50            })
51            .with_description(
52                "measures the number of total network connections that have been established so far",
53            )
54            .build();
55
56        Metrics {
57            network_connection_duration,
58            network_total_connections,
59        }
60    }
61}
62
63/// A layer that records network server metrics using OpenTelemetry.
64pub struct NetworkMetricsLayer<F = ()> {
65    metrics: Arc<Metrics>,
66    base_attributes: Vec<KeyValue>,
67    attributes_factory: F,
68}
69
70impl<F: fmt::Debug> fmt::Debug for NetworkMetricsLayer<F> {
71    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
72        f.debug_struct("NetworkMetricsLayer")
73            .field("metrics", &self.metrics)
74            .field("base_attributes", &self.base_attributes)
75            .field("attributes_factory", &self.attributes_factory)
76            .finish()
77    }
78}
79
80impl<F: Clone> Clone for NetworkMetricsLayer<F> {
81    fn clone(&self) -> Self {
82        NetworkMetricsLayer {
83            metrics: self.metrics.clone(),
84            base_attributes: self.base_attributes.clone(),
85            attributes_factory: self.attributes_factory.clone(),
86        }
87    }
88}
89
90impl NetworkMetricsLayer {
91    /// Create a new [`NetworkMetricsLayer`] using the global [`Meter`] provider,
92    /// with the default name and version.
93    pub fn new() -> Self {
94        Self::custom(MeterOptions::default())
95    }
96
97    /// Create a new [`NetworkMetricsLayer`] using the global [`Meter`] provider,
98    /// with a custom name and version.
99    pub fn custom(opts: MeterOptions) -> Self {
100        let service_info = opts.service.unwrap_or_else(|| ServiceInfo {
101            name: rama_utils::info::NAME.to_owned(),
102            version: rama_utils::info::VERSION.to_owned(),
103        });
104
105        let mut attributes = opts.attributes.unwrap_or_else(|| Vec::with_capacity(2));
106        attributes.push(KeyValue::new(SERVICE_NAME, service_info.name.clone()));
107        attributes.push(KeyValue::new(SERVICE_VERSION, service_info.version.clone()));
108
109        let meter = get_versioned_meter();
110        let metrics = Metrics::new(meter, opts.metric_prefix);
111
112        Self {
113            metrics: Arc::new(metrics),
114            base_attributes: attributes,
115            attributes_factory: (),
116        }
117    }
118
119    /// Attach an [`AttributesFactory`] to this [`NetworkMetricsLayer`], allowing
120    /// you to inject custom attributes.
121    pub fn with_attributes<F>(self, attributes: F) -> NetworkMetricsLayer<F> {
122        NetworkMetricsLayer {
123            metrics: self.metrics,
124            base_attributes: self.base_attributes,
125            attributes_factory: attributes,
126        }
127    }
128}
129
130impl Default for NetworkMetricsLayer {
131    fn default() -> Self {
132        Self::new()
133    }
134}
135
136fn get_versioned_meter() -> Meter {
137    global::meter_with_scope(
138        InstrumentationScope::builder(const_format::formatcp!(
139            "{}-network-transport",
140            rama_utils::info::NAME
141        ))
142        .with_version(rama_utils::info::VERSION)
143        .with_schema_url(semantic_conventions::SCHEMA_URL)
144        .build(),
145    )
146}
147
148impl<S, F: Clone> Layer<S> for NetworkMetricsLayer<F> {
149    type Service = NetworkMetricsService<S, F>;
150
151    fn layer(&self, inner: S) -> Self::Service {
152        NetworkMetricsService {
153            inner,
154            metrics: self.metrics.clone(),
155            base_attributes: self.base_attributes.clone(),
156            attributes_factory: self.attributes_factory.clone(),
157        }
158    }
159}
160
161/// A [`Service`] that records network server metrics using OpenTelemetry.
162pub struct NetworkMetricsService<S, F = ()> {
163    inner: S,
164    metrics: Arc<Metrics>,
165    base_attributes: Vec<KeyValue>,
166    attributes_factory: F,
167}
168
169impl<S> NetworkMetricsService<S, ()> {
170    /// Create a new [`NetworkMetricsService`].
171    pub fn new(inner: S) -> Self {
172        NetworkMetricsLayer::new().layer(inner)
173    }
174
175    define_inner_service_accessors!();
176}
177
178impl<S: fmt::Debug, F: fmt::Debug> fmt::Debug for NetworkMetricsService<S, F> {
179    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
180        f.debug_struct("NetworkMetricsService")
181            .field("inner", &self.inner)
182            .field("metrics", &self.metrics)
183            .field("base_attributes", &self.base_attributes)
184            .field("attributes_factory", &self.attributes_factory)
185            .finish()
186    }
187}
188
189impl<S: Clone, F: Clone> Clone for NetworkMetricsService<S, F> {
190    fn clone(&self) -> Self {
191        Self {
192            inner: self.inner.clone(),
193            metrics: self.metrics.clone(),
194            base_attributes: self.base_attributes.clone(),
195            attributes_factory: self.attributes_factory.clone(),
196        }
197    }
198}
199
200impl<S, F> NetworkMetricsService<S, F> {
201    fn compute_attributes<State>(&self, ctx: &Context<State>) -> Vec<KeyValue>
202    where
203        F: AttributesFactory<State>,
204    {
205        let mut attributes = self
206            .attributes_factory
207            .attributes(2 + self.base_attributes.len(), ctx);
208        attributes.extend(self.base_attributes.iter().cloned());
209
210        // client info
211        if let Some(socket_info) = ctx.get::<SocketInfo>() {
212            let peer_addr = socket_info.peer_addr();
213            attributes.push(KeyValue::new(
214                NETWORK_TYPE,
215                match peer_addr.ip() {
216                    IpAddr::V4(_) => "ipv4",
217                    IpAddr::V6(_) => "ipv6",
218                },
219            ));
220        }
221
222        // connection info
223        attributes.push(KeyValue::new(NETWORK_TRANSPORT, "tcp")); // TODO: do not hardcode this once we support UDP
224
225        attributes
226    }
227}
228
229impl<S, F, State, Stream> Service<State, Stream> for NetworkMetricsService<S, F>
230where
231    S: Service<State, Stream>,
232    F: AttributesFactory<State>,
233    State: Clone + Send + Sync + 'static,
234    Stream: crate::stream::Stream,
235{
236    type Response = S::Response;
237    type Error = S::Error;
238
239    async fn serve(
240        &self,
241        ctx: Context<State>,
242        stream: Stream,
243    ) -> Result<Self::Response, Self::Error> {
244        let attributes: Vec<KeyValue> = self.compute_attributes(&ctx);
245
246        self.metrics.network_total_connections.add(1, &attributes);
247
248        // used to compute the duration of the connection
249        let timer = SystemTime::now();
250
251        let result = self.inner.serve(ctx, stream).await;
252
253        match result {
254            Ok(res) => {
255                self.metrics.network_connection_duration.record(
256                    timer.elapsed().map(|t| t.as_secs_f64()).unwrap_or_default(),
257                    &attributes,
258                );
259                Ok(res)
260            }
261            Err(err) => Err(err),
262        }
263    }
264}
265
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns `true` when any attribute carries the given key.
    fn has_key(attributes: &[KeyValue], key: &str) -> bool {
        attributes.iter().any(|attr| attr.key.as_str() == key)
    }

    /// Returns `true` when any attribute carries the given key and string value.
    fn has_key_value(attributes: &[KeyValue], key: &str, value: &str) -> bool {
        attributes
            .iter()
            .any(|attr| attr.key.as_str() == key && attr.value.as_str() == value)
    }

    /// Layer with the custom service info and metric prefix shared by the
    /// "custom" tests below.
    fn custom_layer() -> NetworkMetricsLayer {
        NetworkMetricsLayer::custom(MeterOptions {
            service: Some(ServiceInfo {
                name: "test".to_owned(),
                version: "42".to_owned(),
            }),
            metric_prefix: Some("foo".to_owned()),
            ..Default::default()
        })
    }

    #[test]
    fn test_default_svc_compute_attributes_default() {
        let svc = NetworkMetricsService::new(());
        let attributes = svc.compute_attributes(&Context::default());

        assert!(has_key(&attributes, SERVICE_NAME));
        assert!(has_key(&attributes, SERVICE_VERSION));
        assert!(has_key(&attributes, NETWORK_TRANSPORT));
    }

    #[test]
    fn test_custom_svc_compute_attributes_default() {
        let svc = custom_layer().layer(());
        let attributes = svc.compute_attributes(&Context::default());

        assert!(has_key_value(&attributes, SERVICE_NAME, "test"));
        assert!(has_key_value(&attributes, SERVICE_VERSION, "42"));
        assert!(has_key(&attributes, NETWORK_TRANSPORT));
    }

    #[test]
    fn test_custom_svc_compute_attributes_attributes_vec() {
        let svc = custom_layer()
            .with_attributes(vec![KeyValue::new("test", "attribute_fn")])
            .layer(());
        let attributes = svc.compute_attributes(&Context::default());

        assert!(has_key_value(&attributes, SERVICE_NAME, "test"));
        assert!(has_key_value(&attributes, SERVICE_VERSION, "42"));
        assert!(has_key(&attributes, NETWORK_TRANSPORT));
        assert!(has_key_value(&attributes, "test", "attribute_fn"));
    }

    #[test]
    fn test_custom_svc_compute_attributes_attribute_fn() {
        let svc = custom_layer()
            .with_attributes(|size_hint: usize, _ctx: &Context<()>| {
                let mut attributes = Vec::with_capacity(size_hint + 1);
                attributes.push(KeyValue::new("test", "attribute_fn"));
                attributes
            })
            .layer(());
        let attributes = svc.compute_attributes(&Context::default());

        assert!(has_key_value(&attributes, SERVICE_NAME, "test"));
        assert!(has_key_value(&attributes, SERVICE_VERSION, "42"));
        assert!(has_key(&attributes, NETWORK_TRANSPORT));
        assert!(has_key_value(&attributes, "test", "attribute_fn"));
    }
}