Skip to main content

obs_core/
instrumented.rs

1//! `Instrumented<F>` — async scope + observer adapter.
2//!
3//! `obs::scope!` returns a `ScopeGuard` that pushes a frame onto the
4//! per-task stack on construction and pops it on `Drop`. For futures
5//! that cross `tokio::spawn` we cannot rely on a single-poll RAII
6//! guard, so we wrap the future in `Instrumented<F>` which re-enters
7//! the scope on every `poll`.
8//!
9//! Spec 13 § 3.
10
11use std::{
12    future::Future,
13    pin::Pin,
14    sync::Arc,
15    task::{Context, Poll},
16};
17
18use pin_project_lite::pin_project;
19
20use crate::{
21    observer::{Observer, with_observer_task_sync},
22    scope::{ScopeFrame, finish_scope_frame, pop_frame, push_frame},
23};
24
25pin_project! {
26    /// Future adapter that re-enters an `obs::scope!` frame and an
27    /// `Arc<dyn Observer>` override on every poll. Constructed via
28    /// [`Instrument::instrument`] / [`WithObserver::with_observer`].
29    #[must_use = "Instrumented<F> is a future; await it to drive the inner future"]
30    pub struct Instrumented<F> {
31        #[pin]
32        inner: F,
33        scope_frame: Option<ScopeFrame>,
34        observer: Option<Arc<dyn Observer>>,
35    }
36}
37
38impl<F> std::fmt::Debug for Instrumented<F> {
39    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40        f.debug_struct("Instrumented")
41            .field("has_scope", &self.scope_frame.is_some())
42            .field("has_observer", &self.observer.is_some())
43            .finish()
44    }
45}
46
47impl<F: Future> Future for Instrumented<F> {
48    type Output = F::Output;
49
50    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> {
51        let this = self.project();
52        let mut scope_guard = PollScopeGuard::enter(this.scope_frame);
53        let result = match this.observer.as_ref() {
54            Some(o) => with_observer_task_sync(o.clone(), || this.inner.poll(cx)),
55            None => this.inner.poll(cx),
56        };
57        if result.is_ready() {
58            scope_guard.finish_on_drop();
59        }
60        result
61    }
62}
63
64/// Per-poll scope guard: pushes the frame at poll-start, pops at
65/// poll-end. Kept private to this module so callers cannot bypass the
66/// guarantee that `Instrumented::poll` is the only path that mutates
67/// the per-task scope stack.
68struct PollScopeGuard<'a> {
69    slot: &'a mut Option<ScopeFrame>,
70    active: bool,
71    finished: bool,
72}
73
74impl<'a> PollScopeGuard<'a> {
75    fn enter(slot: &'a mut Option<ScopeFrame>) -> Self {
76        let Some(frame) = slot.take() else {
77            return Self {
78                slot,
79                active: false,
80                finished: false,
81            };
82        };
83        let _ = push_frame(frame);
84        Self {
85            slot,
86            active: true,
87            finished: false,
88        }
89    }
90
91    fn finish_on_drop(&mut self) {
92        self.finished = true;
93    }
94}
95
96impl Drop for PollScopeGuard<'_> {
97    fn drop(&mut self) {
98        if self.active
99            && let Some(frame) = pop_frame()
100        {
101            if self.finished {
102                finish_scope_frame(frame);
103            } else {
104                *self.slot = Some(frame);
105            }
106        }
107    }
108}
109
110/// Public extension trait — `.instrument(scope!(...))` on every future
111/// owned by user code.
112pub trait Instrument: Future + Sized {
113    /// Attach an `obs::scope!`-built frame to the future. The frame
114    /// is re-entered on every poll, so suspended futures keep their
115    /// scope across `.await` and `tokio::spawn` boundaries.
116    fn instrument(self, scope: ScopeFrame) -> Instrumented<Self> {
117        Instrumented {
118            inner: self,
119            scope_frame: Some(scope),
120            observer: None,
121        }
122    }
123}
124
125impl<F: Future> Instrument for F {}
126
127/// Public extension trait — `.with_observer(o)` on a future binds an
128/// observer override that follows the future across thread migration
129/// (per-task tier; spec 11 § 3.1).
130pub trait WithObserver: Future + Sized {
131    /// Bind a per-task observer override to the future.
132    fn with_observer(self, observer: Arc<dyn Observer>) -> Instrumented<Self> {
133        Instrumented {
134            inner: self,
135            scope_frame: None,
136            observer: Some(observer),
137        }
138    }
139}
140
141impl<F: Future> WithObserver for F {}
142
143impl<F: Future> Instrumented<F> {
144    /// Layer a scope on top of an `Instrumented` that already carries
145    /// an observer — supports both call orders described in spec 13.
146    pub fn instrument(mut self, scope: ScopeFrame) -> Self {
147        self.scope_frame = Some(scope);
148        self
149    }
150
151    /// Layer an observer on top of an `Instrumented` that already
152    /// carries a scope.
153    pub fn with_observer(mut self, observer: Arc<dyn Observer>) -> Self {
154        self.observer = Some(observer);
155        self
156    }
157}