Skip to main content

obs_core/
instrumented.rs

1//! `Instrumented<F>` — async scope + observer adapter.
2//!
3//! `obs::scope!` returns a `ScopeGuard` that pushes a frame onto the
4//! per-task stack on construction and pops it on `Drop`. For futures
5//! that cross `tokio::spawn` we cannot rely on a single-poll RAII
6//! guard, so we wrap the future in `Instrumented<F>` which re-enters
7//! the scope on every `poll`.
8//!
9//! Spec 13 § 3.
10
11use std::{
12    future::Future,
13    pin::Pin,
14    sync::Arc,
15    task::{Context, Poll},
16};
17
18use pin_project_lite::pin_project;
19
20use crate::{
21    observer::{Observer, with_observer_task_sync},
22    scope::{ScopeField, ScopeFrame, ScopeKind, pop_frame, push_frame},
23};
24
25pin_project! {
26    /// Future adapter that re-enters an `obs::scope!` frame and an
27    /// `Arc<dyn Observer>` override on every poll. Constructed via
28    /// [`Instrument::instrument`] / [`WithObserver::with_observer`].
29    #[must_use = "Instrumented<F> is a future; await it to drive the inner future"]
30    pub struct Instrumented<F> {
31        #[pin]
32        inner: F,
33        scope_seed: Option<ScopeSeed>,
34        observer: Option<Arc<dyn Observer>>,
35    }
36}
37
38impl<F> std::fmt::Debug for Instrumented<F> {
39    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40        f.debug_struct("Instrumented")
41            .field("has_scope", &self.scope_seed.is_some())
42            .field("has_observer", &self.observer.is_some())
43            .finish()
44    }
45}
46
47/// Frame-shaped seed cloned on every poll. Holding the frame as a seed
48/// (rather than the live `ScopeFrame`) means each poll gets a fresh
49/// tail buffer, which is what the spec requires for safe re-entry.
50#[derive(Debug, Clone)]
51struct ScopeSeed {
52    fields: Vec<ScopeField>,
53    kind: ScopeKind,
54    tail_capacity: u16,
55}
56
57impl ScopeSeed {
58    fn into_frame(self) -> ScopeFrame {
59        ScopeFrame::new(self.fields, self.kind, self.tail_capacity)
60    }
61
62    fn from_frame(f: &ScopeFrame) -> Self {
63        Self {
64            fields: f.fields().to_vec(),
65            kind: f.kind(),
66            tail_capacity: f.tail_capacity(),
67        }
68    }
69}
70
71impl<F: Future> Future for Instrumented<F> {
72    type Output = F::Output;
73
74    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> {
75        let this = self.project();
76        let _scope_guard = this
77            .scope_seed
78            .as_ref()
79            .map(|seed| PollScopeGuard::push(seed.clone().into_frame()));
80        match this.observer.as_ref() {
81            Some(o) => with_observer_task_sync(o.clone(), || this.inner.poll(cx)),
82            None => this.inner.poll(cx),
83        }
84    }
85}
86
87/// Per-poll scope guard: pushes the frame at poll-start, pops at
88/// poll-end. Kept private to this module so callers cannot bypass the
89/// guarantee that `Instrumented::poll` is the only path that mutates
90/// the per-task scope stack.
91struct PollScopeGuard;
92
93impl PollScopeGuard {
94    fn push(frame: ScopeFrame) -> Self {
95        let _ = push_frame(frame);
96        Self
97    }
98}
99
100impl Drop for PollScopeGuard {
101    fn drop(&mut self) {
102        let _ = pop_frame();
103    }
104}
105
106/// Public extension trait — `.instrument(scope!(...))` on every future
107/// owned by user code.
108pub trait Instrument: Future + Sized {
109    /// Attach an `obs::scope!`-built frame to the future. The frame
110    /// is re-entered on every poll, so suspended futures keep their
111    /// scope across `.await` and `tokio::spawn` boundaries.
112    fn instrument(self, scope: ScopeFrame) -> Instrumented<Self> {
113        Instrumented {
114            inner: self,
115            scope_seed: Some(ScopeSeed::from_frame(&scope)),
116            observer: None,
117        }
118    }
119}
120
121impl<F: Future> Instrument for F {}
122
123/// Public extension trait — `.with_observer(o)` on a future binds an
124/// observer override that follows the future across thread migration
125/// (per-task tier; spec 11 § 3.1).
126pub trait WithObserver: Future + Sized {
127    /// Bind a per-task observer override to the future.
128    fn with_observer(self, observer: Arc<dyn Observer>) -> Instrumented<Self> {
129        Instrumented {
130            inner: self,
131            scope_seed: None,
132            observer: Some(observer),
133        }
134    }
135}
136
137impl<F: Future> WithObserver for F {}
138
139impl<F: Future> Instrumented<F> {
140    /// Layer a scope on top of an `Instrumented` that already carries
141    /// an observer — supports both call orders described in spec 13.
142    pub fn instrument(mut self, scope: ScopeFrame) -> Self {
143        self.scope_seed = Some(ScopeSeed::from_frame(&scope));
144        self
145    }
146
147    /// Layer an observer on top of an `Instrumented` that already
148    /// carries a scope.
149    pub fn with_observer(mut self, observer: Arc<dyn Observer>) -> Self {
150        self.observer = Some(observer);
151        self
152    }
153}