1use crate::stream::SocketInfo;
6use rama_core::telemetry::opentelemetry::semantic_conventions::resource::{
7 SERVICE_NAME, SERVICE_VERSION,
8};
9use rama_core::telemetry::opentelemetry::semantic_conventions::trace::{
10 NETWORK_TRANSPORT, NETWORK_TYPE,
11};
12use rama_core::telemetry::opentelemetry::{
13 global,
14 metrics::{Counter, Histogram, Meter},
15 semantic_conventions, InstrumentationScope, KeyValue,
16};
17use rama_core::telemetry::opentelemetry::{AttributesFactory, MeterOptions, ServiceInfo};
18use rama_core::{Context, Layer, Service};
19use rama_utils::macros::define_inner_service_accessors;
20use std::borrow::Cow;
21use std::net::IpAddr;
22use std::{fmt, sync::Arc, time::SystemTime};
23
24const NETWORK_CONNECTION_DURATION: &str = "network.server.connection_duration";
25const NETWORK_SERVER_TOTAL_CONNECTIONS: &str = "network.server.total_connections";
26
27#[derive(Clone, Debug)]
29struct Metrics {
30 network_connection_duration: Histogram<f64>,
31 network_total_connections: Counter<u64>,
32}
33
34impl Metrics {
35 fn new(meter: Meter, prefix: Option<String>) -> Self {
37 let network_connection_duration = meter
38 .f64_histogram(match &prefix {
39 Some(prefix) => Cow::Owned(format!("{prefix}.{NETWORK_CONNECTION_DURATION}")),
40 None => Cow::Borrowed(NETWORK_CONNECTION_DURATION),
41 })
42 .with_description("Measures the duration of inbound network connections.")
43 .with_unit("s")
44 .build();
45
46 let network_total_connections = meter
47 .u64_counter(match &prefix {
48 Some(prefix) => Cow::Owned(format!("{prefix}.{NETWORK_SERVER_TOTAL_CONNECTIONS}")),
49 None => Cow::Borrowed(NETWORK_SERVER_TOTAL_CONNECTIONS),
50 })
51 .with_description(
52 "measures the number of total network connections that have been established so far",
53 )
54 .build();
55
56 Metrics {
57 network_connection_duration,
58 network_total_connections,
59 }
60 }
61}
62
63pub struct NetworkMetricsLayer<F = ()> {
65 metrics: Arc<Metrics>,
66 base_attributes: Vec<KeyValue>,
67 attributes_factory: F,
68}
69
70impl<F: fmt::Debug> fmt::Debug for NetworkMetricsLayer<F> {
71 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
72 f.debug_struct("NetworkMetricsLayer")
73 .field("metrics", &self.metrics)
74 .field("base_attributes", &self.base_attributes)
75 .field("attributes_factory", &self.attributes_factory)
76 .finish()
77 }
78}
79
80impl<F: Clone> Clone for NetworkMetricsLayer<F> {
81 fn clone(&self) -> Self {
82 NetworkMetricsLayer {
83 metrics: self.metrics.clone(),
84 base_attributes: self.base_attributes.clone(),
85 attributes_factory: self.attributes_factory.clone(),
86 }
87 }
88}
89
90impl NetworkMetricsLayer {
91 pub fn new() -> Self {
94 Self::custom(MeterOptions::default())
95 }
96
97 pub fn custom(opts: MeterOptions) -> Self {
100 let service_info = opts.service.unwrap_or_else(|| ServiceInfo {
101 name: rama_utils::info::NAME.to_owned(),
102 version: rama_utils::info::VERSION.to_owned(),
103 });
104
105 let mut attributes = opts.attributes.unwrap_or_else(|| Vec::with_capacity(2));
106 attributes.push(KeyValue::new(SERVICE_NAME, service_info.name.clone()));
107 attributes.push(KeyValue::new(SERVICE_VERSION, service_info.version.clone()));
108
109 let meter = get_versioned_meter();
110 let metrics = Metrics::new(meter, opts.metric_prefix);
111
112 Self {
113 metrics: Arc::new(metrics),
114 base_attributes: attributes,
115 attributes_factory: (),
116 }
117 }
118
119 pub fn with_attributes<F>(self, attributes: F) -> NetworkMetricsLayer<F> {
122 NetworkMetricsLayer {
123 metrics: self.metrics,
124 base_attributes: self.base_attributes,
125 attributes_factory: attributes,
126 }
127 }
128}
129
130impl Default for NetworkMetricsLayer {
131 fn default() -> Self {
132 Self::new()
133 }
134}
135
136fn get_versioned_meter() -> Meter {
137 global::meter_with_scope(
138 InstrumentationScope::builder(const_format::formatcp!(
139 "{}-network-transport",
140 rama_utils::info::NAME
141 ))
142 .with_version(rama_utils::info::VERSION)
143 .with_schema_url(semantic_conventions::SCHEMA_URL)
144 .build(),
145 )
146}
147
148impl<S, F: Clone> Layer<S> for NetworkMetricsLayer<F> {
149 type Service = NetworkMetricsService<S, F>;
150
151 fn layer(&self, inner: S) -> Self::Service {
152 NetworkMetricsService {
153 inner,
154 metrics: self.metrics.clone(),
155 base_attributes: self.base_attributes.clone(),
156 attributes_factory: self.attributes_factory.clone(),
157 }
158 }
159}
160
161pub struct NetworkMetricsService<S, F = ()> {
163 inner: S,
164 metrics: Arc<Metrics>,
165 base_attributes: Vec<KeyValue>,
166 attributes_factory: F,
167}
168
169impl<S> NetworkMetricsService<S, ()> {
170 pub fn new(inner: S) -> Self {
172 NetworkMetricsLayer::new().layer(inner)
173 }
174
175 define_inner_service_accessors!();
176}
177
178impl<S: fmt::Debug, F: fmt::Debug> fmt::Debug for NetworkMetricsService<S, F> {
179 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
180 f.debug_struct("NetworkMetricsService")
181 .field("inner", &self.inner)
182 .field("metrics", &self.metrics)
183 .field("base_attributes", &self.base_attributes)
184 .field("attributes_factory", &self.attributes_factory)
185 .finish()
186 }
187}
188
189impl<S: Clone, F: Clone> Clone for NetworkMetricsService<S, F> {
190 fn clone(&self) -> Self {
191 Self {
192 inner: self.inner.clone(),
193 metrics: self.metrics.clone(),
194 base_attributes: self.base_attributes.clone(),
195 attributes_factory: self.attributes_factory.clone(),
196 }
197 }
198}
199
200impl<S, F> NetworkMetricsService<S, F> {
201 fn compute_attributes<State>(&self, ctx: &Context<State>) -> Vec<KeyValue>
202 where
203 F: AttributesFactory<State>,
204 {
205 let mut attributes = self
206 .attributes_factory
207 .attributes(2 + self.base_attributes.len(), ctx);
208 attributes.extend(self.base_attributes.iter().cloned());
209
210 if let Some(socket_info) = ctx.get::<SocketInfo>() {
212 let peer_addr = socket_info.peer_addr();
213 attributes.push(KeyValue::new(
214 NETWORK_TYPE,
215 match peer_addr.ip() {
216 IpAddr::V4(_) => "ipv4",
217 IpAddr::V6(_) => "ipv6",
218 },
219 ));
220 }
221
222 attributes.push(KeyValue::new(NETWORK_TRANSPORT, "tcp")); attributes
226 }
227}
228
229impl<S, F, State, Stream> Service<State, Stream> for NetworkMetricsService<S, F>
230where
231 S: Service<State, Stream>,
232 F: AttributesFactory<State>,
233 State: Clone + Send + Sync + 'static,
234 Stream: crate::stream::Stream,
235{
236 type Response = S::Response;
237 type Error = S::Error;
238
239 async fn serve(
240 &self,
241 ctx: Context<State>,
242 stream: Stream,
243 ) -> Result<Self::Response, Self::Error> {
244 let attributes: Vec<KeyValue> = self.compute_attributes(&ctx);
245
246 self.metrics.network_total_connections.add(1, &attributes);
247
248 let timer = SystemTime::now();
250
251 let result = self.inner.serve(ctx, stream).await;
252
253 match result {
254 Ok(res) => {
255 self.metrics.network_connection_duration.record(
256 timer.elapsed().map(|t| t.as_secs_f64()).unwrap_or_default(),
257 &attributes,
258 );
259 Ok(res)
260 }
261 Err(err) => Err(err),
262 }
263 }
264}
265
266#[cfg(test)]
267mod tests {
268 use super::*;
269
270 #[test]
271 fn test_default_svc_compute_attributes_default() {
272 let svc = NetworkMetricsService::new(());
273 let attributes = svc.compute_attributes(&Context::default());
274 assert!(attributes
275 .iter()
276 .any(|attr| attr.key.as_str() == SERVICE_NAME));
277 assert!(attributes
278 .iter()
279 .any(|attr| attr.key.as_str() == SERVICE_VERSION));
280 assert!(attributes
281 .iter()
282 .any(|attr| attr.key.as_str() == NETWORK_TRANSPORT));
283 }
284
285 #[test]
286 fn test_custom_svc_compute_attributes_default() {
287 let svc = NetworkMetricsLayer::custom(MeterOptions {
288 service: Some(ServiceInfo {
289 name: "test".to_owned(),
290 version: "42".to_owned(),
291 }),
292 metric_prefix: Some("foo".to_owned()),
293 ..Default::default()
294 })
295 .layer(());
296
297 let attributes = svc.compute_attributes(&Context::default());
298 assert!(attributes
299 .iter()
300 .any(|attr| attr.key.as_str() == SERVICE_NAME && attr.value.as_str() == "test"));
301 assert!(attributes
302 .iter()
303 .any(|attr| attr.key.as_str() == SERVICE_VERSION && attr.value.as_str() == "42"));
304 assert!(attributes
305 .iter()
306 .any(|attr| attr.key.as_str() == NETWORK_TRANSPORT));
307 }
308
309 #[test]
310 fn test_custom_svc_compute_attributes_attributes_vec() {
311 let svc = NetworkMetricsLayer::custom(MeterOptions {
312 service: Some(ServiceInfo {
313 name: "test".to_owned(),
314 version: "42".to_owned(),
315 }),
316 metric_prefix: Some("foo".to_owned()),
317 ..Default::default()
318 })
319 .with_attributes(vec![KeyValue::new("test", "attribute_fn")])
320 .layer(());
321
322 let attributes = svc.compute_attributes(&Context::default());
323 assert!(attributes
324 .iter()
325 .any(|attr| attr.key.as_str() == SERVICE_NAME && attr.value.as_str() == "test"));
326 assert!(attributes
327 .iter()
328 .any(|attr| attr.key.as_str() == SERVICE_VERSION && attr.value.as_str() == "42"));
329 assert!(attributes
330 .iter()
331 .any(|attr| attr.key.as_str() == NETWORK_TRANSPORT));
332 assert!(attributes
333 .iter()
334 .any(|attr| attr.key.as_str() == "test" && attr.value.as_str() == "attribute_fn"));
335 }
336
337 #[test]
338 fn test_custom_svc_compute_attributes_attribute_fn() {
339 let svc = NetworkMetricsLayer::custom(MeterOptions {
340 service: Some(ServiceInfo {
341 name: "test".to_owned(),
342 version: "42".to_owned(),
343 }),
344 metric_prefix: Some("foo".to_owned()),
345 ..Default::default()
346 })
347 .with_attributes(|size_hint: usize, _ctx: &Context<()>| {
348 let mut attributes = Vec::with_capacity(size_hint + 1);
349 attributes.push(KeyValue::new("test", "attribute_fn"));
350 attributes
351 })
352 .layer(());
353
354 let attributes = svc.compute_attributes(&Context::default());
355 assert!(attributes
356 .iter()
357 .any(|attr| attr.key.as_str() == SERVICE_NAME && attr.value.as_str() == "test"));
358 assert!(attributes
359 .iter()
360 .any(|attr| attr.key.as_str() == SERVICE_VERSION && attr.value.as_str() == "42"));
361 assert!(attributes
362 .iter()
363 .any(|attr| attr.key.as_str() == NETWORK_TRANSPORT));
364 assert!(attributes
365 .iter()
366 .any(|attr| attr.key.as_str() == "test" && attr.value.as_str() == "attribute_fn"));
367 }
368}