rama_net/stream/layer/
opentelemetry.rs

//! Network (transport) OpenTelemetry [`Layer`] support for Rama.
2//!
3//! [`Layer`]: rama_core::Layer
4
5use crate::stream::SocketInfo;
6use rama_core::telemetry::opentelemetry::semantic_conventions::resource::{
7    SERVICE_NAME, SERVICE_VERSION,
8};
9use rama_core::telemetry::opentelemetry::semantic_conventions::trace::{
10    NETWORK_TRANSPORT, NETWORK_TYPE,
11};
12use rama_core::telemetry::opentelemetry::{AttributesFactory, MeterOptions, ServiceInfo};
13use rama_core::telemetry::opentelemetry::{
14    InstrumentationScope, KeyValue, global,
15    metrics::{Counter, Histogram, Meter},
16    semantic_conventions,
17};
18use rama_core::{Context, Layer, Service};
19use rama_utils::macros::define_inner_service_accessors;
20use std::borrow::Cow;
21use std::net::IpAddr;
22use std::{fmt, sync::Arc, time::SystemTime};
23
/// Base instrument name of the connection-duration histogram
/// (an optional user-supplied prefix may be prepended when the instrument is built).
const NETWORK_CONNECTION_DURATION: &str = "network.server.connection_duration";
/// Base instrument name of the total-connections counter
/// (an optional user-supplied prefix may be prepended when the instrument is built).
const NETWORK_SERVER_TOTAL_CONNECTIONS: &str = "network.server.total_connections";
26
/// Records network server metrics
///
/// Bundles the OpenTelemetry instruments that are shared by the layer
/// and the services it produces.
#[derive(Clone, Debug)]
struct Metrics {
    /// Histogram of connection durations, built with unit `"s"`.
    network_connection_duration: Histogram<f64>,
    /// Counter incremented once for every connection that gets served.
    network_total_connections: Counter<u64>,
}
33
34impl Metrics {
35    /// Create a new [`NetworkMetrics`]
36    fn new(meter: Meter, prefix: Option<String>) -> Self {
37        let network_connection_duration = meter
38            .f64_histogram(match &prefix {
39                Some(prefix) => Cow::Owned(format!("{prefix}.{NETWORK_CONNECTION_DURATION}")),
40                None => Cow::Borrowed(NETWORK_CONNECTION_DURATION),
41            })
42            .with_description("Measures the duration of inbound network connections.")
43            .with_unit("s")
44            .build();
45
46        let network_total_connections = meter
47            .u64_counter(match &prefix {
48                Some(prefix) => Cow::Owned(format!("{prefix}.{NETWORK_SERVER_TOTAL_CONNECTIONS}")),
49                None => Cow::Borrowed(NETWORK_SERVER_TOTAL_CONNECTIONS),
50            })
51            .with_description(
52                "measures the number of total network connections that have been established so far",
53            )
54            .build();
55
56        Metrics {
57            network_connection_duration,
58            network_total_connections,
59        }
60    }
61}
62
/// A layer that records network server metrics using OpenTelemetry.
///
/// The type parameter `F` is the attributes factory; it defaults to `()`
/// and can be replaced through `with_attributes`.
pub struct NetworkMetricsLayer<F = ()> {
    /// Instruments shared (via [`Arc`]) with every service produced by this layer.
    metrics: Arc<Metrics>,
    /// Attributes attached to every recorded measurement
    /// (user-provided ones plus service name and version).
    base_attributes: Vec<KeyValue>,
    /// Source of additional, per-connection attributes.
    attributes_factory: F,
}
69
70impl<F: fmt::Debug> fmt::Debug for NetworkMetricsLayer<F> {
71    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
72        f.debug_struct("NetworkMetricsLayer")
73            .field("metrics", &self.metrics)
74            .field("base_attributes", &self.base_attributes)
75            .field("attributes_factory", &self.attributes_factory)
76            .finish()
77    }
78}
79
80impl<F: Clone> Clone for NetworkMetricsLayer<F> {
81    fn clone(&self) -> Self {
82        NetworkMetricsLayer {
83            metrics: self.metrics.clone(),
84            base_attributes: self.base_attributes.clone(),
85            attributes_factory: self.attributes_factory.clone(),
86        }
87    }
88}
89
90impl NetworkMetricsLayer {
91    /// Create a new [`NetworkMetricsLayer`] using the global [`Meter`] provider,
92    /// with the default name and version.
93    pub fn new() -> Self {
94        Self::custom(MeterOptions::default())
95    }
96
97    /// Create a new [`NetworkMetricsLayer`] using the global [`Meter`] provider,
98    /// with a custom name and version.
99    pub fn custom(opts: MeterOptions) -> Self {
100        let service_info = opts.service.unwrap_or_else(|| ServiceInfo {
101            name: rama_utils::info::NAME.to_owned(),
102            version: rama_utils::info::VERSION.to_owned(),
103        });
104
105        let mut attributes = opts.attributes.unwrap_or_else(|| Vec::with_capacity(2));
106        attributes.push(KeyValue::new(SERVICE_NAME, service_info.name.clone()));
107        attributes.push(KeyValue::new(SERVICE_VERSION, service_info.version.clone()));
108
109        let meter = get_versioned_meter();
110        let metrics = Metrics::new(meter, opts.metric_prefix);
111
112        Self {
113            metrics: Arc::new(metrics),
114            base_attributes: attributes,
115            attributes_factory: (),
116        }
117    }
118
119    /// Attach an [`AttributesFactory`] to this [`NetworkMetricsLayer`], allowing
120    /// you to inject custom attributes.
121    pub fn with_attributes<F>(self, attributes: F) -> NetworkMetricsLayer<F> {
122        NetworkMetricsLayer {
123            metrics: self.metrics,
124            base_attributes: self.base_attributes,
125            attributes_factory: attributes,
126        }
127    }
128}
129
130impl Default for NetworkMetricsLayer {
131    fn default() -> Self {
132        Self::new()
133    }
134}
135
136fn get_versioned_meter() -> Meter {
137    global::meter_with_scope(
138        InstrumentationScope::builder(const_format::formatcp!(
139            "{}-network-transport",
140            rama_utils::info::NAME
141        ))
142        .with_version(rama_utils::info::VERSION)
143        .with_schema_url(semantic_conventions::SCHEMA_URL)
144        .build(),
145    )
146}
147
148impl<S, F: Clone> Layer<S> for NetworkMetricsLayer<F> {
149    type Service = NetworkMetricsService<S, F>;
150
151    fn layer(&self, inner: S) -> Self::Service {
152        NetworkMetricsService {
153            inner,
154            metrics: self.metrics.clone(),
155            base_attributes: self.base_attributes.clone(),
156            attributes_factory: self.attributes_factory.clone(),
157        }
158    }
159
160    fn into_layer(self, inner: S) -> Self::Service {
161        NetworkMetricsService {
162            inner,
163            metrics: self.metrics,
164            base_attributes: self.base_attributes,
165            attributes_factory: self.attributes_factory,
166        }
167    }
168}
169
/// A [`Service`] that records network server metrics using OpenTelemetry.
///
/// The type parameter `F` is the attributes factory; it defaults to `()`.
pub struct NetworkMetricsService<S, F = ()> {
    /// The wrapped service that actually serves the stream.
    inner: S,
    /// Instruments used to record the measurements.
    metrics: Arc<Metrics>,
    /// Attributes attached to every recorded measurement.
    base_attributes: Vec<KeyValue>,
    /// Source of additional, per-connection attributes.
    attributes_factory: F,
}
177
178impl<S> NetworkMetricsService<S, ()> {
179    /// Create a new [`NetworkMetricsService`].
180    pub fn new(inner: S) -> Self {
181        NetworkMetricsLayer::new().into_layer(inner)
182    }
183
184    define_inner_service_accessors!();
185}
186
187impl<S: fmt::Debug, F: fmt::Debug> fmt::Debug for NetworkMetricsService<S, F> {
188    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
189        f.debug_struct("NetworkMetricsService")
190            .field("inner", &self.inner)
191            .field("metrics", &self.metrics)
192            .field("base_attributes", &self.base_attributes)
193            .field("attributes_factory", &self.attributes_factory)
194            .finish()
195    }
196}
197
198impl<S: Clone, F: Clone> Clone for NetworkMetricsService<S, F> {
199    fn clone(&self) -> Self {
200        Self {
201            inner: self.inner.clone(),
202            metrics: self.metrics.clone(),
203            base_attributes: self.base_attributes.clone(),
204            attributes_factory: self.attributes_factory.clone(),
205        }
206    }
207}
208
209impl<S, F> NetworkMetricsService<S, F> {
210    fn compute_attributes<State>(&self, ctx: &Context<State>) -> Vec<KeyValue>
211    where
212        F: AttributesFactory<State>,
213    {
214        let mut attributes = self
215            .attributes_factory
216            .attributes(2 + self.base_attributes.len(), ctx);
217        attributes.extend(self.base_attributes.iter().cloned());
218
219        // client info
220        if let Some(socket_info) = ctx.get::<SocketInfo>() {
221            let peer_addr = socket_info.peer_addr();
222            attributes.push(KeyValue::new(
223                NETWORK_TYPE,
224                match peer_addr.ip() {
225                    IpAddr::V4(_) => "ipv4",
226                    IpAddr::V6(_) => "ipv6",
227                },
228            ));
229        }
230
231        // connection info
232        attributes.push(KeyValue::new(NETWORK_TRANSPORT, "tcp")); // TODO: do not hardcode this once we support UDP
233
234        attributes
235    }
236}
237
238impl<S, F, State, Stream> Service<State, Stream> for NetworkMetricsService<S, F>
239where
240    S: Service<State, Stream>,
241    F: AttributesFactory<State>,
242    State: Clone + Send + Sync + 'static,
243    Stream: crate::stream::Stream,
244{
245    type Response = S::Response;
246    type Error = S::Error;
247
248    async fn serve(
249        &self,
250        ctx: Context<State>,
251        stream: Stream,
252    ) -> Result<Self::Response, Self::Error> {
253        let attributes: Vec<KeyValue> = self.compute_attributes(&ctx);
254
255        self.metrics.network_total_connections.add(1, &attributes);
256
257        // used to compute the duration of the connection
258        let timer = SystemTime::now();
259
260        let result = self.inner.serve(ctx, stream).await;
261
262        match result {
263            Ok(res) => {
264                self.metrics.network_connection_duration.record(
265                    timer.elapsed().map(|t| t.as_secs_f64()).unwrap_or_default(),
266                    &attributes,
267                );
268                Ok(res)
269            }
270            Err(err) => Err(err),
271        }
272    }
273}
274
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns `true` if any attribute uses the given key.
    fn has_key(attributes: &[KeyValue], key: &str) -> bool {
        attributes.iter().any(|attr| attr.key.as_str() == key)
    }

    /// Returns `true` if any attribute matches both the given key and value.
    fn has_pair(attributes: &[KeyValue], key: &str, value: &str) -> bool {
        attributes
            .iter()
            .any(|attr| attr.key.as_str() == key && attr.value.as_str() == value)
    }

    /// Layer configured with a custom service (`test` / `42`) and metric prefix `foo`.
    fn custom_layer() -> NetworkMetricsLayer {
        NetworkMetricsLayer::custom(MeterOptions {
            service: Some(ServiceInfo {
                name: "test".to_owned(),
                version: "42".to_owned(),
            }),
            metric_prefix: Some("foo".to_owned()),
            ..Default::default()
        })
    }

    #[test]
    fn test_default_svc_compute_attributes_default() {
        let svc = NetworkMetricsService::new(());
        let attributes = svc.compute_attributes(&Context::default());

        assert!(has_key(&attributes, SERVICE_NAME));
        assert!(has_key(&attributes, SERVICE_VERSION));
        assert!(has_key(&attributes, NETWORK_TRANSPORT));
    }

    #[test]
    fn test_custom_svc_compute_attributes_default() {
        let svc = custom_layer().into_layer(());
        let attributes = svc.compute_attributes(&Context::default());

        assert!(has_pair(&attributes, SERVICE_NAME, "test"));
        assert!(has_pair(&attributes, SERVICE_VERSION, "42"));
        assert!(has_key(&attributes, NETWORK_TRANSPORT));
    }

    #[test]
    fn test_custom_svc_compute_attributes_attributes_vec() {
        let svc = custom_layer()
            .with_attributes(vec![KeyValue::new("test", "attribute_fn")])
            .into_layer(());
        let attributes = svc.compute_attributes(&Context::default());

        assert!(has_pair(&attributes, SERVICE_NAME, "test"));
        assert!(has_pair(&attributes, SERVICE_VERSION, "42"));
        assert!(has_key(&attributes, NETWORK_TRANSPORT));
        assert!(has_pair(&attributes, "test", "attribute_fn"));
    }

    #[test]
    fn test_custom_svc_compute_attributes_attribute_fn() {
        let svc = custom_layer()
            .with_attributes(|size_hint: usize, _ctx: &Context<()>| {
                let mut attributes = Vec::with_capacity(size_hint + 1);
                attributes.push(KeyValue::new("test", "attribute_fn"));
                attributes
            })
            .into_layer(());
        let attributes = svc.compute_attributes(&Context::default());

        assert!(has_pair(&attributes, SERVICE_NAME, "test"));
        assert!(has_pair(&attributes, SERVICE_VERSION, "42"));
        assert!(has_key(&attributes, NETWORK_TRANSPORT));
        assert!(has_pair(&attributes, "test", "attribute_fn"));
    }
}
403        );
404    }
405}