Skip to main content

tonin_core/
instrumented.rs

//! `Instrumented<T>` — the single decorator that gives every capability
//! call a span, metric, and slow-op WARN log.
//!
//! Service authors never see this directly. Phase 5 will wrap each
//! capability returned from `Service` accessors in `Instrumented`, so
//! `svc.cache().get("k")` automatically gets:
//!
//! - a `cache.get` span as a child of the current request span,
//! - a `cache_op_duration_ms` histogram observation,
//! - a `cache_op_total` counter increment,
//! - a WARN log if the call exceeds `SlowThresholds::cache_op` or fails.
//!
//! New backend impls (`tonin-redis`, future `tonin-postgres`) get
//! all of this for free — they only implement the trait methods.
15
16use std::{
17    collections::HashMap,
18    hash::{Hash, Hasher},
19    pin::Pin,
20    sync::Arc,
21    task::{Context as TaskContext, Poll},
22    time::{Duration, Instant},
23};
24
25use async_trait::async_trait;
26use futures_core::Stream;
27use tracing::Span;
28use tracing_opentelemetry::OpenTelemetrySpanExt;
29
30use crate::telemetry::capability_metrics::{
31    SlowThresholds, record_cache_op, record_messaging_publish,
32};
33use crate::telemetry::propagate::{extract_context_from_map, inject_current_context_map};
34
35use crate::Error;
36use crate::traits::{
37    Cache, Config, Database, DeliveredMessage, EventBus, MessageId, SecretStore, SubscribeOptions,
38    Subscription,
39};
40
41/// Decorator that wraps any capability impl with telemetry. Cheap to
42/// clone — internal state is an `Arc` to the inner impl plus a small
43/// `SlowThresholds`.
44///
45/// ```
46/// # use std::sync::Arc;
47/// # use std::time::Duration;
48/// # use async_trait::async_trait;
49/// # use tonin_core::{instrumented::Instrumented, traits::Cache, Error};
50/// struct MockCache;
51/// #[async_trait]
52/// impl Cache for MockCache {
53///     fn system(&self) -> &'static str { "mock" }
54///     async fn get(&self, _k: &str) -> Result<Option<Vec<u8>>, Error> {
55///         Ok(Some(b"hi".to_vec()))
56///     }
57///     async fn set(&self, _k: &str, _v: &[u8], _t: Option<Duration>) -> Result<(), Error> { Ok(()) }
58///     async fn del(&self, _k: &str) -> Result<(), Error> { Ok(()) }
59///     async fn set_nx(&self, _k: &str, _v: &[u8], _t: Option<Duration>) -> Result<bool, Error> { Ok(true) }
60/// }
61///
62/// # tokio_test::block_on(async {
63/// let cache = Instrumented::with_defaults(Arc::new(MockCache));
64/// let v = Cache::get(&cache, "hello").await.unwrap();
65/// assert_eq!(v, Some(b"hi".to_vec()));
66/// # });
67/// ```
68pub struct Instrumented<T: ?Sized> {
69    inner: Arc<T>,
70    slow: SlowThresholds,
71}
72
73impl<T: ?Sized> Clone for Instrumented<T> {
74    fn clone(&self) -> Self {
75        Self {
76            inner: self.inner.clone(),
77            slow: self.slow.clone(),
78        }
79    }
80}
81
82impl<T: ?Sized> Instrumented<T> {
83    pub fn new(inner: Arc<T>, slow: SlowThresholds) -> Self {
84        Self { inner, slow }
85    }
86
87    /// Use the framework's default thresholds — handy for tests and the
88    /// Phase 5 wiring path when no `[telemetry.slow]` was provided.
89    pub fn with_defaults(inner: Arc<T>) -> Self {
90        Self::new(inner, SlowThresholds::default())
91    }
92}
93
94// ---------- Cache impl ----------
95
96#[async_trait]
97impl<T: Cache + ?Sized> Cache for Instrumented<T> {
98    async fn get(&self, key: &str) -> Result<Option<Vec<u8>>, Error> {
99        let span = tracing::info_span!(
100            "cache.get",
101            cache.system = self.inner.system(),
102            cache.op = "get",
103            cache.key.hash = key_hash(key),
104            cache.hit = tracing::field::Empty,
105        );
106        let _enter = span.enter();
107        let start = Instant::now();
108        let result = self.inner.get(key).await;
109        let elapsed = start.elapsed();
110        let outcome = match &result {
111            Ok(Some(_)) => "hit",
112            Ok(None) => "miss",
113            Err(_) => "error",
114        };
115        if let Ok(opt) = &result {
116            Span::current().record("cache.hit", opt.is_some());
117        }
118        record_cache_op(self.inner.system(), "get", outcome, elapsed);
119        slow_or_error_log(elapsed, self.slow.cache_op, &result, "cache.get", key);
120        result
121    }
122
123    async fn set(&self, key: &str, value: &[u8], ttl: Option<Duration>) -> Result<(), Error> {
124        let span = tracing::info_span!(
125            "cache.set",
126            cache.system = self.inner.system(),
127            cache.op = "set",
128            cache.key.hash = key_hash(key),
129            ttl_ms = ttl.map(|d| d.as_millis() as u64),
130        );
131        let _enter = span.enter();
132        let start = Instant::now();
133        let result = self.inner.set(key, value, ttl).await;
134        let elapsed = start.elapsed();
135        let outcome = if result.is_ok() { "ok" } else { "error" };
136        record_cache_op(self.inner.system(), "set", outcome, elapsed);
137        slow_or_error_log(elapsed, self.slow.cache_op, &result, "cache.set", key);
138        result
139    }
140
141    async fn del(&self, key: &str) -> Result<(), Error> {
142        let span = tracing::info_span!(
143            "cache.del",
144            cache.system = self.inner.system(),
145            cache.op = "del",
146            cache.key.hash = key_hash(key),
147        );
148        let _enter = span.enter();
149        let start = Instant::now();
150        let result = self.inner.del(key).await;
151        let elapsed = start.elapsed();
152        let outcome = if result.is_ok() { "ok" } else { "error" };
153        record_cache_op(self.inner.system(), "del", outcome, elapsed);
154        slow_or_error_log(elapsed, self.slow.cache_op, &result, "cache.del", key);
155        result
156    }
157
158    async fn set_nx(&self, key: &str, value: &[u8], ttl: Option<Duration>) -> Result<bool, Error> {
159        let span = tracing::info_span!(
160            "cache.set_nx",
161            cache.system = self.inner.system(),
162            cache.op = "setnx",
163            cache.key.hash = key_hash(key),
164            ttl_ms = ttl.map(|d| d.as_millis() as u64),
165            cache.created = tracing::field::Empty,
166        );
167        let _enter = span.enter();
168        let start = Instant::now();
169        let result = self.inner.set_nx(key, value, ttl).await;
170        let elapsed = start.elapsed();
171        let outcome = match &result {
172            Ok(true) => "ok",
173            Ok(false) => "exists",
174            Err(_) => "error",
175        };
176        if let Ok(created) = &result {
177            Span::current().record("cache.created", *created);
178        }
179        record_cache_op(self.inner.system(), "setnx", outcome, elapsed);
180        slow_or_error_log(elapsed, self.slow.cache_op, &result, "cache.set_nx", key);
181        result
182    }
183
184    fn system(&self) -> &'static str {
185        self.inner.system()
186    }
187}
188
189// ---------- Database impl ----------
190//
191// Pass-through. The trait has no methods worth wrapping (url() is sync
192// and infallible; pool() is a borrow). Query spans come from the user's
193// chosen DB client (sqlx, sea-orm), which emits OTel spans via the
194// global tracer that `crate::telemetry::init` installs.
195
196#[async_trait]
197impl<T: Database + ?Sized> Database for Instrumented<T> {
198    fn url(&self) -> &str {
199        self.inner.url()
200    }
201    fn system(&self) -> &'static str {
202        self.inner.system()
203    }
204}
205
206// ---------- SecretStore impl ----------
207
208#[async_trait]
209impl<T: SecretStore + ?Sized> SecretStore for Instrumented<T> {
210    async fn get(&self, key: &str) -> Result<String, Error> {
211        // The key name is intentionally NOT a span attribute — secret
212        // names can be sensitive. Only the provider is recorded.
213        let span = tracing::info_span!("secret.get", secret.provider = self.inner.provider(),);
214        let _enter = span.enter();
215        let start = Instant::now();
216        let result = self.inner.get(key).await;
217        let elapsed = start.elapsed();
218        // No metric for SecretStore in Phase 3 — it's hot-path-cached
219        // and uninteresting volumetrically. Add later if a backend
220        // makes per-call network hops.
221        if result.is_err() {
222            tracing::warn!(
223                elapsed_ms = elapsed.as_millis() as u64,
224                provider = self.inner.provider(),
225                "secret.get failed",
226            );
227        }
228        result
229    }
230
231    fn provider(&self) -> &'static str {
232        self.inner.provider()
233    }
234}
235
236// ---------- Config impl ----------
237
238#[async_trait]
239impl<T: Config + ?Sized> Config for Instrumented<T> {
240    async fn get(&self, path: &str) -> Result<Option<Vec<u8>>, Error> {
241        // Path IS recorded as a span attribute — unlike SecretStore::get
242        // where the key name can be sensitive, config paths are application
243        // structure (db.pool.max, feature.flag) and are safe to record.
244        let span = tracing::info_span!(
245            "config.get",
246            config.source = self.inner.source(),
247            config.path = path,
248        );
249        let _enter = span.enter();
250        let start = Instant::now();
251        let result = self.inner.get(path).await;
252        let elapsed = start.elapsed();
253        if result.is_err() {
254            tracing::warn!(
255                elapsed_ms = elapsed.as_millis() as u64,
256                source = self.inner.source(),
257                path,
258                "config.get failed",
259            );
260        }
261        result
262    }
263
264    fn watch(
265        &self,
266        path: &str,
267        interval: std::time::Duration,
268    ) -> tokio::sync::watch::Receiver<Option<Vec<u8>>> {
269        // No span around watch() — the channel lives much longer than
270        // any single span and emits a stream of values; instrumenting
271        // each emission lives in the consumer's processing loop.
272        self.inner.watch(path, interval)
273    }
274
275    fn source(&self) -> &'static str {
276        self.inner.source()
277    }
278}
279
280// ---------- EventBus impl ----------
281
282#[async_trait]
283impl<T: EventBus + ?Sized> EventBus for Instrumented<T> {
284    async fn publish(&self, subject: &str, payload: &[u8]) -> Result<MessageId, Error> {
285        let mut headers = HashMap::new();
286        inject_current_context_map(&mut headers);
287        self.publish_inner(subject, payload, headers).await
288    }
289
290    async fn publish_with_headers(
291        &self,
292        subject: &str,
293        payload: &[u8],
294        mut headers: HashMap<String, String>,
295    ) -> Result<MessageId, Error> {
296        // Only inject if the caller didn't already provide one — caller-
297        // supplied `traceparent` wins (rare, but matters for testing).
298        if !headers.contains_key("traceparent") {
299            inject_current_context_map(&mut headers);
300        }
301        self.publish_inner(subject, payload, headers).await
302    }
303
304    async fn subscribe(
305        &self,
306        subject_pattern: &str,
307        group: &str,
308        opts: SubscribeOptions,
309    ) -> Result<Subscription, Error> {
310        let inner_sub = self.inner.subscribe(subject_pattern, group, opts).await?;
311        let system: &'static str = self.inner.system();
312        let group_owned = group.to_string();
313        let subject_owned = subject_pattern.to_string();
314        Ok(Subscription::new(InstrumentedSubscription {
315            inner: inner_sub,
316            system,
317            group: group_owned,
318            subject: subject_owned,
319        }))
320    }
321
322    fn system(&self) -> &'static str {
323        self.inner.system()
324    }
325}
326
327impl<T: EventBus + ?Sized> Instrumented<T> {
328    async fn publish_inner(
329        &self,
330        subject: &str,
331        payload: &[u8],
332        headers: HashMap<String, String>,
333    ) -> Result<MessageId, Error> {
334        let span = tracing::info_span!(
335            "messaging.publish",
336            messaging.system = self.inner.system(),
337            messaging.destination.name = subject,
338            messaging.destination.kind = "topic",
339            messaging.message.body.size = payload.len(),
340        );
341        let _enter = span.enter();
342        let start = Instant::now();
343        let result = self
344            .inner
345            .publish_with_headers(subject, payload, headers)
346            .await;
347        let elapsed = start.elapsed();
348        let outcome = if result.is_ok() { "ok" } else { "error" };
349        record_messaging_publish(self.inner.system(), subject, outcome, elapsed);
350        if elapsed > self.slow.messaging_publish || result.is_err() {
351            tracing::warn!(
352                elapsed_ms = elapsed.as_millis() as u64,
353                subject,
354                error = result.as_ref().err().map(|e| e.to_string()),
355                "slow or failing messaging.publish",
356            );
357        }
358        result
359    }
360}
361
362/// Wraps the backend `Subscription` and creates a `messaging.process`
363/// span per yielded message. The span's parent is the publisher's
364/// context, extracted from `msg.headers`. If the header is missing or
365/// malformed, the span becomes a new root — never grafted onto
366/// `Span::current()`, which would lie about causality.
367struct InstrumentedSubscription {
368    inner: Subscription,
369    system: &'static str,
370    group: String,
371    subject: String,
372}
373
374impl Stream for InstrumentedSubscription {
375    type Item = DeliveredMessage;
376
377    fn poll_next(mut self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll<Option<Self::Item>> {
378        let this = self.as_mut().get_mut();
379        match Pin::new(&mut this.inner).poll_next(cx) {
380            Poll::Ready(Some(msg)) => {
381                // Emit messaging.process parented to the publisher's
382                // context. Consumer handlers create their own spans;
383                // those are linked to this trace via the propagator if
384                // the handler enters this span. We don't enter it here
385                // — the consumer owns processing-time accounting.
386                let span = tracing::info_span!(
387                    "messaging.process",
388                    messaging.system = this.system,
389                    messaging.destination.name = %msg.subject,
390                    messaging.consumer.id = %this.group,
391                    messaging.message.id = %msg.id,
392                    messaging.delivery_attempt = msg.delivery_attempt,
393                    subscription.subject_pattern = %this.subject,
394                );
395                let parent_cx = extract_context_from_map(&msg.headers);
396                span.set_parent(parent_cx);
397                drop(span);
398                Poll::Ready(Some(msg))
399            }
400            Poll::Ready(None) => Poll::Ready(None),
401            Poll::Pending => Poll::Pending,
402        }
403    }
404}
405
406// ---------- helpers ----------
407
408fn slow_or_error_log<T>(
409    elapsed: Duration,
410    threshold: Duration,
411    result: &Result<T, Error>,
412    op: &'static str,
413    key_for_hash: &str,
414) {
415    if elapsed > threshold || result.is_err() {
416        tracing::warn!(
417            elapsed_ms = elapsed.as_millis() as u64,
418            key.hash = key_hash(key_for_hash),
419            error = result.as_ref().err().map(|e| e.to_string()),
420            "slow or failing {op}",
421            op = op,
422        );
423    }
424}
425
426/// Stable, low-cardinality hash. Not cryptographic — bounded label
427/// cardinality, that's all.
/// Hashes `key` to a `u64` with the standard library's `DefaultHasher`,
/// so telemetry can carry a fixed-size number instead of the key text.
///
/// Not cryptographic. The same key always maps to the same value within
/// one build, but std does not specify the hasher's algorithm, so values
/// may differ between Rust releases — do not persist them or compare them
/// across builds.
fn key_hash(key: &str) -> u64 {
    use std::collections::hash_map::DefaultHasher;

    let mut hasher = DefaultHasher::new();
    Hash::hash(key, &mut hasher);
    hasher.finish()
}