use std::{
    collections::HashMap,
    hash::{Hash, Hasher},
    pin::Pin,
    sync::Arc,
    task::{Context as TaskContext, Poll},
    time::{Duration, Instant},
};

use async_trait::async_trait;
use futures_core::Stream;
use tracing::{Instrument, Span};
use tracing_opentelemetry::OpenTelemetrySpanExt;

use crate::telemetry::capability_metrics::{
    SlowThresholds, record_cache_op, record_messaging_publish,
};
use crate::telemetry::propagate::{extract_context_from_map, inject_current_context_map};

use crate::Error;
use crate::traits::{
    Cache, Config, Database, DeliveredMessage, EventBus, MessageId, SecretStore, SubscribeOptions,
    Subscription,
};
40
/// Decorator around a capability backend (`Cache`, `Database`, `SecretStore`,
/// `Config`, `EventBus`) that delegates every call to `inner` while adding
/// tracing spans and, depending on the capability, metrics and slow/failure
/// warnings.
pub struct Instrumented<T: ?Sized> {
    /// The wrapped backend that does the real work.
    inner: Arc<T>,
    /// Latency thresholds: `cache_op` and `messaging_publish` are compared
    /// against measured call durations to decide when to emit a warning.
    slow: SlowThresholds,
}
72
73impl<T: ?Sized> Clone for Instrumented<T> {
74 fn clone(&self) -> Self {
75 Self {
76 inner: self.inner.clone(),
77 slow: self.slow.clone(),
78 }
79 }
80}
81
82impl<T: ?Sized> Instrumented<T> {
83 pub fn new(inner: Arc<T>, slow: SlowThresholds) -> Self {
84 Self { inner, slow }
85 }
86
87 pub fn with_defaults(inner: Arc<T>) -> Self {
90 Self::new(inner, SlowThresholds::default())
91 }
92}
93
94#[async_trait]
97impl<T: Cache + ?Sized> Cache for Instrumented<T> {
98 async fn get(&self, key: &str) -> Result<Option<Vec<u8>>, Error> {
99 let span = tracing::info_span!(
100 "cache.get",
101 cache.system = self.inner.system(),
102 cache.op = "get",
103 cache.key.hash = key_hash(key),
104 cache.hit = tracing::field::Empty,
105 );
106 let _enter = span.enter();
107 let start = Instant::now();
108 let result = self.inner.get(key).await;
109 let elapsed = start.elapsed();
110 let outcome = match &result {
111 Ok(Some(_)) => "hit",
112 Ok(None) => "miss",
113 Err(_) => "error",
114 };
115 if let Ok(opt) = &result {
116 Span::current().record("cache.hit", opt.is_some());
117 }
118 record_cache_op(self.inner.system(), "get", outcome, elapsed);
119 slow_or_error_log(elapsed, self.slow.cache_op, &result, "cache.get", key);
120 result
121 }
122
123 async fn set(&self, key: &str, value: &[u8], ttl: Option<Duration>) -> Result<(), Error> {
124 let span = tracing::info_span!(
125 "cache.set",
126 cache.system = self.inner.system(),
127 cache.op = "set",
128 cache.key.hash = key_hash(key),
129 ttl_ms = ttl.map(|d| d.as_millis() as u64),
130 );
131 let _enter = span.enter();
132 let start = Instant::now();
133 let result = self.inner.set(key, value, ttl).await;
134 let elapsed = start.elapsed();
135 let outcome = if result.is_ok() { "ok" } else { "error" };
136 record_cache_op(self.inner.system(), "set", outcome, elapsed);
137 slow_or_error_log(elapsed, self.slow.cache_op, &result, "cache.set", key);
138 result
139 }
140
141 async fn del(&self, key: &str) -> Result<(), Error> {
142 let span = tracing::info_span!(
143 "cache.del",
144 cache.system = self.inner.system(),
145 cache.op = "del",
146 cache.key.hash = key_hash(key),
147 );
148 let _enter = span.enter();
149 let start = Instant::now();
150 let result = self.inner.del(key).await;
151 let elapsed = start.elapsed();
152 let outcome = if result.is_ok() { "ok" } else { "error" };
153 record_cache_op(self.inner.system(), "del", outcome, elapsed);
154 slow_or_error_log(elapsed, self.slow.cache_op, &result, "cache.del", key);
155 result
156 }
157
158 async fn set_nx(&self, key: &str, value: &[u8], ttl: Option<Duration>) -> Result<bool, Error> {
159 let span = tracing::info_span!(
160 "cache.set_nx",
161 cache.system = self.inner.system(),
162 cache.op = "setnx",
163 cache.key.hash = key_hash(key),
164 ttl_ms = ttl.map(|d| d.as_millis() as u64),
165 cache.created = tracing::field::Empty,
166 );
167 let _enter = span.enter();
168 let start = Instant::now();
169 let result = self.inner.set_nx(key, value, ttl).await;
170 let elapsed = start.elapsed();
171 let outcome = match &result {
172 Ok(true) => "ok",
173 Ok(false) => "exists",
174 Err(_) => "error",
175 };
176 if let Ok(created) = &result {
177 Span::current().record("cache.created", *created);
178 }
179 record_cache_op(self.inner.system(), "setnx", outcome, elapsed);
180 slow_or_error_log(elapsed, self.slow.cache_op, &result, "cache.set_nx", key);
181 result
182 }
183
184 fn system(&self) -> &'static str {
185 self.inner.system()
186 }
187}
188
189#[async_trait]
197impl<T: Database + ?Sized> Database for Instrumented<T> {
198 fn url(&self) -> &str {
199 self.inner.url()
200 }
201 fn system(&self) -> &'static str {
202 self.inner.system()
203 }
204}
205
206#[async_trait]
209impl<T: SecretStore + ?Sized> SecretStore for Instrumented<T> {
210 async fn get(&self, key: &str) -> Result<String, Error> {
211 let span = tracing::info_span!("secret.get", secret.provider = self.inner.provider(),);
214 let _enter = span.enter();
215 let start = Instant::now();
216 let result = self.inner.get(key).await;
217 let elapsed = start.elapsed();
218 if result.is_err() {
222 tracing::warn!(
223 elapsed_ms = elapsed.as_millis() as u64,
224 provider = self.inner.provider(),
225 "secret.get failed",
226 );
227 }
228 result
229 }
230
231 fn provider(&self) -> &'static str {
232 self.inner.provider()
233 }
234}
235
236#[async_trait]
239impl<T: Config + ?Sized> Config for Instrumented<T> {
240 async fn get(&self, path: &str) -> Result<Option<Vec<u8>>, Error> {
241 let span = tracing::info_span!(
245 "config.get",
246 config.source = self.inner.source(),
247 config.path = path,
248 );
249 let _enter = span.enter();
250 let start = Instant::now();
251 let result = self.inner.get(path).await;
252 let elapsed = start.elapsed();
253 if result.is_err() {
254 tracing::warn!(
255 elapsed_ms = elapsed.as_millis() as u64,
256 source = self.inner.source(),
257 path,
258 "config.get failed",
259 );
260 }
261 result
262 }
263
264 fn watch(
265 &self,
266 path: &str,
267 interval: std::time::Duration,
268 ) -> tokio::sync::watch::Receiver<Option<Vec<u8>>> {
269 self.inner.watch(path, interval)
273 }
274
275 fn source(&self) -> &'static str {
276 self.inner.source()
277 }
278}
279
280#[async_trait]
283impl<T: EventBus + ?Sized> EventBus for Instrumented<T> {
284 async fn publish(&self, subject: &str, payload: &[u8]) -> Result<MessageId, Error> {
285 let mut headers = HashMap::new();
286 inject_current_context_map(&mut headers);
287 self.publish_inner(subject, payload, headers).await
288 }
289
290 async fn publish_with_headers(
291 &self,
292 subject: &str,
293 payload: &[u8],
294 mut headers: HashMap<String, String>,
295 ) -> Result<MessageId, Error> {
296 if !headers.contains_key("traceparent") {
299 inject_current_context_map(&mut headers);
300 }
301 self.publish_inner(subject, payload, headers).await
302 }
303
304 async fn subscribe(
305 &self,
306 subject_pattern: &str,
307 group: &str,
308 opts: SubscribeOptions,
309 ) -> Result<Subscription, Error> {
310 let inner_sub = self.inner.subscribe(subject_pattern, group, opts).await?;
311 let system: &'static str = self.inner.system();
312 let group_owned = group.to_string();
313 let subject_owned = subject_pattern.to_string();
314 Ok(Subscription::new(InstrumentedSubscription {
315 inner: inner_sub,
316 system,
317 group: group_owned,
318 subject: subject_owned,
319 }))
320 }
321
322 fn system(&self) -> &'static str {
323 self.inner.system()
324 }
325}
326
327impl<T: EventBus + ?Sized> Instrumented<T> {
328 async fn publish_inner(
329 &self,
330 subject: &str,
331 payload: &[u8],
332 headers: HashMap<String, String>,
333 ) -> Result<MessageId, Error> {
334 let span = tracing::info_span!(
335 "messaging.publish",
336 messaging.system = self.inner.system(),
337 messaging.destination.name = subject,
338 messaging.destination.kind = "topic",
339 messaging.message.body.size = payload.len(),
340 );
341 let _enter = span.enter();
342 let start = Instant::now();
343 let result = self
344 .inner
345 .publish_with_headers(subject, payload, headers)
346 .await;
347 let elapsed = start.elapsed();
348 let outcome = if result.is_ok() { "ok" } else { "error" };
349 record_messaging_publish(self.inner.system(), subject, outcome, elapsed);
350 if elapsed > self.slow.messaging_publish || result.is_err() {
351 tracing::warn!(
352 elapsed_ms = elapsed.as_millis() as u64,
353 subject,
354 error = result.as_ref().err().map(|e| e.to_string()),
355 "slow or failing messaging.publish",
356 );
357 }
358 result
359 }
360}
361
/// Stream adapter produced by `Instrumented::subscribe`: yields the inner
/// subscription's messages unchanged while emitting a `messaging.process`
/// span for each one.
struct InstrumentedSubscription {
    /// The subscription returned by the wrapped event bus.
    inner: Subscription,
    /// Messaging system name, taken from the wrapped bus's `system()`.
    system: &'static str,
    /// Consumer group passed to `subscribe`; recorded as `messaging.consumer.id`.
    group: String,
    /// Subject pattern passed to `subscribe`; recorded as
    /// `subscription.subject_pattern`.
    subject: String,
}
373
374impl Stream for InstrumentedSubscription {
375 type Item = DeliveredMessage;
376
377 fn poll_next(mut self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll<Option<Self::Item>> {
378 let this = self.as_mut().get_mut();
379 match Pin::new(&mut this.inner).poll_next(cx) {
380 Poll::Ready(Some(msg)) => {
381 let span = tracing::info_span!(
387 "messaging.process",
388 messaging.system = this.system,
389 messaging.destination.name = %msg.subject,
390 messaging.consumer.id = %this.group,
391 messaging.message.id = %msg.id,
392 messaging.delivery_attempt = msg.delivery_attempt,
393 subscription.subject_pattern = %this.subject,
394 );
395 let parent_cx = extract_context_from_map(&msg.headers);
396 span.set_parent(parent_cx);
397 drop(span);
398 Poll::Ready(Some(msg))
399 }
400 Poll::Ready(None) => Poll::Ready(None),
401 Poll::Pending => Poll::Pending,
402 }
403 }
404}
405
406fn slow_or_error_log<T>(
409 elapsed: Duration,
410 threshold: Duration,
411 result: &Result<T, Error>,
412 op: &'static str,
413 key_for_hash: &str,
414) {
415 if elapsed > threshold || result.is_err() {
416 tracing::warn!(
417 elapsed_ms = elapsed.as_millis() as u64,
418 key.hash = key_hash(key_for_hash),
419 error = result.as_ref().err().map(|e| e.to_string()),
420 "slow or failing {op}",
421 op = op,
422 );
423 }
424}
425
/// Stable, non-reversible-by-inspection fingerprint of a cache key for
/// telemetry (64-bit FNV-1a over the key's UTF-8 bytes).
///
/// A fixed, specified algorithm is used instead of `DefaultHasher`, whose
/// algorithm (and `str`'s `Hash` encoding) the standard library explicitly
/// leaves unspecified across Rust releases. That matters here because the
/// value is emitted to spans and logs to correlate the same key across builds
/// and services. This is not a cryptographic hash: low-entropy keys can be
/// recovered by brute force.
fn key_hash(key: &str) -> u64 {
    const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;
    key.bytes().fold(FNV_OFFSET_BASIS, |acc, byte| {
        (acc ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
    })
}