1use crate::stream::SocketInfo;
6use rama_core::telemetry::opentelemetry::semantic_conventions::resource::{
7 SERVICE_NAME, SERVICE_VERSION,
8};
9use rama_core::telemetry::opentelemetry::semantic_conventions::trace::{
10 NETWORK_TRANSPORT, NETWORK_TYPE,
11};
12use rama_core::telemetry::opentelemetry::{AttributesFactory, MeterOptions, ServiceInfo};
13use rama_core::telemetry::opentelemetry::{
14 InstrumentationScope, KeyValue, global,
15 metrics::{Counter, Histogram, Meter},
16 semantic_conventions,
17};
18use rama_core::{Context, Layer, Service};
19use rama_utils::macros::define_inner_service_accessors;
20use std::borrow::Cow;
21use std::net::IpAddr;
22use std::{fmt, sync::Arc, time::SystemTime};
23
24const NETWORK_CONNECTION_DURATION: &str = "network.server.connection_duration";
25const NETWORK_SERVER_TOTAL_CONNECTIONS: &str = "network.server.total_connections";
26
27#[derive(Clone, Debug)]
29struct Metrics {
30 network_connection_duration: Histogram<f64>,
31 network_total_connections: Counter<u64>,
32}
33
34impl Metrics {
35 fn new(meter: Meter, prefix: Option<String>) -> Self {
37 let network_connection_duration = meter
38 .f64_histogram(match &prefix {
39 Some(prefix) => Cow::Owned(format!("{prefix}.{NETWORK_CONNECTION_DURATION}")),
40 None => Cow::Borrowed(NETWORK_CONNECTION_DURATION),
41 })
42 .with_description("Measures the duration of inbound network connections.")
43 .with_unit("s")
44 .build();
45
46 let network_total_connections = meter
47 .u64_counter(match &prefix {
48 Some(prefix) => Cow::Owned(format!("{prefix}.{NETWORK_SERVER_TOTAL_CONNECTIONS}")),
49 None => Cow::Borrowed(NETWORK_SERVER_TOTAL_CONNECTIONS),
50 })
51 .with_description(
52 "measures the number of total network connections that have been established so far",
53 )
54 .build();
55
56 Metrics {
57 network_connection_duration,
58 network_total_connections,
59 }
60 }
61}
62
63pub struct NetworkMetricsLayer<F = ()> {
65 metrics: Arc<Metrics>,
66 base_attributes: Vec<KeyValue>,
67 attributes_factory: F,
68}
69
70impl<F: fmt::Debug> fmt::Debug for NetworkMetricsLayer<F> {
71 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
72 f.debug_struct("NetworkMetricsLayer")
73 .field("metrics", &self.metrics)
74 .field("base_attributes", &self.base_attributes)
75 .field("attributes_factory", &self.attributes_factory)
76 .finish()
77 }
78}
79
80impl<F: Clone> Clone for NetworkMetricsLayer<F> {
81 fn clone(&self) -> Self {
82 NetworkMetricsLayer {
83 metrics: self.metrics.clone(),
84 base_attributes: self.base_attributes.clone(),
85 attributes_factory: self.attributes_factory.clone(),
86 }
87 }
88}
89
90impl NetworkMetricsLayer {
91 pub fn new() -> Self {
94 Self::custom(MeterOptions::default())
95 }
96
97 pub fn custom(opts: MeterOptions) -> Self {
100 let service_info = opts.service.unwrap_or_else(|| ServiceInfo {
101 name: rama_utils::info::NAME.to_owned(),
102 version: rama_utils::info::VERSION.to_owned(),
103 });
104
105 let mut attributes = opts.attributes.unwrap_or_else(|| Vec::with_capacity(2));
106 attributes.push(KeyValue::new(SERVICE_NAME, service_info.name.clone()));
107 attributes.push(KeyValue::new(SERVICE_VERSION, service_info.version.clone()));
108
109 let meter = get_versioned_meter();
110 let metrics = Metrics::new(meter, opts.metric_prefix);
111
112 Self {
113 metrics: Arc::new(metrics),
114 base_attributes: attributes,
115 attributes_factory: (),
116 }
117 }
118
119 pub fn with_attributes<F>(self, attributes: F) -> NetworkMetricsLayer<F> {
122 NetworkMetricsLayer {
123 metrics: self.metrics,
124 base_attributes: self.base_attributes,
125 attributes_factory: attributes,
126 }
127 }
128}
129
130impl Default for NetworkMetricsLayer {
131 fn default() -> Self {
132 Self::new()
133 }
134}
135
136fn get_versioned_meter() -> Meter {
137 global::meter_with_scope(
138 InstrumentationScope::builder(const_format::formatcp!(
139 "{}-network-transport",
140 rama_utils::info::NAME
141 ))
142 .with_version(rama_utils::info::VERSION)
143 .with_schema_url(semantic_conventions::SCHEMA_URL)
144 .build(),
145 )
146}
147
148impl<S, F: Clone> Layer<S> for NetworkMetricsLayer<F> {
149 type Service = NetworkMetricsService<S, F>;
150
151 fn layer(&self, inner: S) -> Self::Service {
152 NetworkMetricsService {
153 inner,
154 metrics: self.metrics.clone(),
155 base_attributes: self.base_attributes.clone(),
156 attributes_factory: self.attributes_factory.clone(),
157 }
158 }
159
160 fn into_layer(self, inner: S) -> Self::Service {
161 NetworkMetricsService {
162 inner,
163 metrics: self.metrics,
164 base_attributes: self.base_attributes,
165 attributes_factory: self.attributes_factory,
166 }
167 }
168}
169
170pub struct NetworkMetricsService<S, F = ()> {
172 inner: S,
173 metrics: Arc<Metrics>,
174 base_attributes: Vec<KeyValue>,
175 attributes_factory: F,
176}
177
178impl<S> NetworkMetricsService<S, ()> {
179 pub fn new(inner: S) -> Self {
181 NetworkMetricsLayer::new().into_layer(inner)
182 }
183
184 define_inner_service_accessors!();
185}
186
187impl<S: fmt::Debug, F: fmt::Debug> fmt::Debug for NetworkMetricsService<S, F> {
188 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
189 f.debug_struct("NetworkMetricsService")
190 .field("inner", &self.inner)
191 .field("metrics", &self.metrics)
192 .field("base_attributes", &self.base_attributes)
193 .field("attributes_factory", &self.attributes_factory)
194 .finish()
195 }
196}
197
198impl<S: Clone, F: Clone> Clone for NetworkMetricsService<S, F> {
199 fn clone(&self) -> Self {
200 Self {
201 inner: self.inner.clone(),
202 metrics: self.metrics.clone(),
203 base_attributes: self.base_attributes.clone(),
204 attributes_factory: self.attributes_factory.clone(),
205 }
206 }
207}
208
209impl<S, F> NetworkMetricsService<S, F> {
210 fn compute_attributes<State>(&self, ctx: &Context<State>) -> Vec<KeyValue>
211 where
212 F: AttributesFactory<State>,
213 {
214 let mut attributes = self
215 .attributes_factory
216 .attributes(2 + self.base_attributes.len(), ctx);
217 attributes.extend(self.base_attributes.iter().cloned());
218
219 if let Some(socket_info) = ctx.get::<SocketInfo>() {
221 let peer_addr = socket_info.peer_addr();
222 attributes.push(KeyValue::new(
223 NETWORK_TYPE,
224 match peer_addr.ip() {
225 IpAddr::V4(_) => "ipv4",
226 IpAddr::V6(_) => "ipv6",
227 },
228 ));
229 }
230
231 attributes.push(KeyValue::new(NETWORK_TRANSPORT, "tcp")); attributes
235 }
236}
237
238impl<S, F, State, Stream> Service<State, Stream> for NetworkMetricsService<S, F>
239where
240 S: Service<State, Stream>,
241 F: AttributesFactory<State>,
242 State: Clone + Send + Sync + 'static,
243 Stream: crate::stream::Stream,
244{
245 type Response = S::Response;
246 type Error = S::Error;
247
248 async fn serve(
249 &self,
250 ctx: Context<State>,
251 stream: Stream,
252 ) -> Result<Self::Response, Self::Error> {
253 let attributes: Vec<KeyValue> = self.compute_attributes(&ctx);
254
255 self.metrics.network_total_connections.add(1, &attributes);
256
257 let timer = SystemTime::now();
259
260 let result = self.inner.serve(ctx, stream).await;
261
262 match result {
263 Ok(res) => {
264 self.metrics.network_connection_duration.record(
265 timer.elapsed().map(|t| t.as_secs_f64()).unwrap_or_default(),
266 &attributes,
267 );
268 Ok(res)
269 }
270 Err(err) => Err(err),
271 }
272 }
273}
274
275#[cfg(test)]
276mod tests {
277 use super::*;
278
279 #[test]
280 fn test_default_svc_compute_attributes_default() {
281 let svc = NetworkMetricsService::new(());
282 let attributes = svc.compute_attributes(&Context::default());
283 assert!(
284 attributes
285 .iter()
286 .any(|attr| attr.key.as_str() == SERVICE_NAME)
287 );
288 assert!(
289 attributes
290 .iter()
291 .any(|attr| attr.key.as_str() == SERVICE_VERSION)
292 );
293 assert!(
294 attributes
295 .iter()
296 .any(|attr| attr.key.as_str() == NETWORK_TRANSPORT)
297 );
298 }
299
300 #[test]
301 fn test_custom_svc_compute_attributes_default() {
302 let svc = NetworkMetricsLayer::custom(MeterOptions {
303 service: Some(ServiceInfo {
304 name: "test".to_owned(),
305 version: "42".to_owned(),
306 }),
307 metric_prefix: Some("foo".to_owned()),
308 ..Default::default()
309 })
310 .into_layer(());
311
312 let attributes = svc.compute_attributes(&Context::default());
313 assert!(
314 attributes
315 .iter()
316 .any(|attr| attr.key.as_str() == SERVICE_NAME && attr.value.as_str() == "test")
317 );
318 assert!(
319 attributes
320 .iter()
321 .any(|attr| attr.key.as_str() == SERVICE_VERSION && attr.value.as_str() == "42")
322 );
323 assert!(
324 attributes
325 .iter()
326 .any(|attr| attr.key.as_str() == NETWORK_TRANSPORT)
327 );
328 }
329
330 #[test]
331 fn test_custom_svc_compute_attributes_attributes_vec() {
332 let svc = NetworkMetricsLayer::custom(MeterOptions {
333 service: Some(ServiceInfo {
334 name: "test".to_owned(),
335 version: "42".to_owned(),
336 }),
337 metric_prefix: Some("foo".to_owned()),
338 ..Default::default()
339 })
340 .with_attributes(vec![KeyValue::new("test", "attribute_fn")])
341 .into_layer(());
342
343 let attributes = svc.compute_attributes(&Context::default());
344 assert!(
345 attributes
346 .iter()
347 .any(|attr| attr.key.as_str() == SERVICE_NAME && attr.value.as_str() == "test")
348 );
349 assert!(
350 attributes
351 .iter()
352 .any(|attr| attr.key.as_str() == SERVICE_VERSION && attr.value.as_str() == "42")
353 );
354 assert!(
355 attributes
356 .iter()
357 .any(|attr| attr.key.as_str() == NETWORK_TRANSPORT)
358 );
359 assert!(
360 attributes
361 .iter()
362 .any(|attr| attr.key.as_str() == "test" && attr.value.as_str() == "attribute_fn")
363 );
364 }
365
366 #[test]
367 fn test_custom_svc_compute_attributes_attribute_fn() {
368 let svc = NetworkMetricsLayer::custom(MeterOptions {
369 service: Some(ServiceInfo {
370 name: "test".to_owned(),
371 version: "42".to_owned(),
372 }),
373 metric_prefix: Some("foo".to_owned()),
374 ..Default::default()
375 })
376 .with_attributes(|size_hint: usize, _ctx: &Context<()>| {
377 let mut attributes = Vec::with_capacity(size_hint + 1);
378 attributes.push(KeyValue::new("test", "attribute_fn"));
379 attributes
380 })
381 .into_layer(());
382
383 let attributes = svc.compute_attributes(&Context::default());
384 assert!(
385 attributes
386 .iter()
387 .any(|attr| attr.key.as_str() == SERVICE_NAME && attr.value.as_str() == "test")
388 );
389 assert!(
390 attributes
391 .iter()
392 .any(|attr| attr.key.as_str() == SERVICE_VERSION && attr.value.as_str() == "42")
393 );
394 assert!(
395 attributes
396 .iter()
397 .any(|attr| attr.key.as_str() == NETWORK_TRANSPORT)
398 );
399 assert!(
400 attributes
401 .iter()
402 .any(|attr| attr.key.as_str() == "test" && attr.value.as_str() == "attribute_fn")
403 );
404 }
405}