Skip to main content

moire_runtime/
futures.rs

1use moire_trace_types::BacktraceId;
2use moire_types::{EdgeKind, EntityId, FutureEntity};
3use std::cell::RefCell;
4use std::future::{Future, IntoFuture};
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
8use super::FUTURE_CAUSAL_STACK;
9use super::db::runtime_db;
10use super::handles::{EntityHandle, EntityRef, current_causal_target_from_stack};
11
12pub struct OperationFuture<F> {
13    inner: F,
14    actor_id: Option<EntityId>,
15    resource_id: EntityId,
16    current_edge: Option<EdgeKind>,
17    backtrace: BacktraceId,
18}
19
20impl<F> OperationFuture<F> {
21    fn new(inner: F, resource_id: EntityId) -> Self {
22        Self::new_with_actor(
23            inner,
24            resource_id,
25            current_causal_target_from_stack().map(|target| target.id().clone()),
26        )
27    }
28
29    fn new_with_actor(inner: F, resource_id: EntityId, actor_id: Option<EntityId>) -> Self {
30        Self {
31            inner,
32            actor_id,
33            resource_id,
34            current_edge: None,
35            backtrace: super::capture_backtrace_id(),
36        }
37    }
38
39    fn transition_edge(&mut self, next: Option<EdgeKind>) {
40        if self.current_edge == next {
41            return;
42        }
43        let Some(actor_id) = self.actor_id.as_ref() else {
44            self.current_edge = next;
45            return;
46        };
47        if let Ok(mut db) = runtime_db().lock() {
48            if let Some(current) = self.current_edge {
49                db.remove_edge(actor_id, &self.resource_id, current);
50            }
51            if let Some(edge) = next {
52                db.upsert_edge(actor_id, &self.resource_id, edge, self.backtrace);
53            }
54        }
55        self.current_edge = next;
56    }
57}
58
59impl<F> Future for OperationFuture<F>
60where
61    F: Future,
62{
63    type Output = F::Output;
64
65    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
66        let this = unsafe { self.get_unchecked_mut() };
67        if this.current_edge.is_none() {
68            this.transition_edge(Some(EdgeKind::Polls));
69        }
70
71        match unsafe { Pin::new_unchecked(&mut this.inner) }.poll(cx) {
72            Poll::Pending => {
73                this.transition_edge(Some(EdgeKind::WaitingOn));
74                Poll::Pending
75            }
76            Poll::Ready(output) => {
77                this.transition_edge(None);
78                Poll::Ready(output)
79            }
80        }
81    }
82}
83
84impl<F> Drop for OperationFuture<F> {
85    fn drop(&mut self) {
86        self.transition_edge(None);
87    }
88}
89
90pub fn instrument_operation_on<F, S>(on: &EntityHandle<S>, fut: F) -> OperationFuture<F::IntoFuture>
91where
92    F: IntoFuture,
93{
94    OperationFuture::new(fut.into_future(), EntityId::new(on.id().as_str()))
95}
96
97pub fn instrument_operation_on_with_actor<F, S>(
98    on: &EntityHandle<S>,
99    actor: Option<&EntityRef>,
100    fut: F,
101) -> OperationFuture<F::IntoFuture>
102where
103    F: IntoFuture,
104{
105    OperationFuture::new_with_actor(
106        fut.into_future(),
107        EntityId::new(on.id().as_str()),
108        actor.map(|target| target.id().clone()),
109    )
110}
111
112pub struct InstrumentedFuture<F> {
113    inner: F,
114    pub(super) future_handle: EntityHandle<FutureEntity>,
115    backtrace: BacktraceId,
116    awaited_by: Option<FutureEdgeRelation>,
117    waits_on: Option<FutureEdgeRelation>,
118}
119
/// Which end of a relation's edge the instrumented future sits on.
#[derive(Clone, Copy)]
enum FutureEdgeDirection {
    /// Edge runs from the relation's target to the future.
    ParentToChild,
    /// Edge runs from the future to the relation's target.
    ChildToTarget,
}
125
126struct FutureEdgeRelation {
127    target: EntityRef,
128    direction: FutureEdgeDirection,
129    current_edge: Option<EdgeKind>,
130}
131
132impl FutureEdgeRelation {
133    fn new(target: EntityRef, direction: FutureEdgeDirection) -> Self {
134        Self {
135            target,
136            direction,
137            current_edge: None,
138        }
139    }
140}
141
142impl<F> InstrumentedFuture<F> {
143    fn new(inner: F, future_handle: EntityHandle<FutureEntity>, target: Option<EntityRef>) -> Self {
144        let awaited_by = current_causal_target_from_stack().and_then(|parent| {
145            if parent.id().as_str() == future_handle.id().as_str() {
146                None
147            } else {
148                Some(FutureEdgeRelation::new(
149                    parent,
150                    FutureEdgeDirection::ParentToChild,
151                ))
152            }
153        });
154        let waits_on = target
155            .map(|target| FutureEdgeRelation::new(target, FutureEdgeDirection::ChildToTarget));
156        Self {
157            inner,
158            future_handle,
159            backtrace: super::capture_backtrace_id(),
160            awaited_by,
161            waits_on,
162        }
163    }
164
165    /// Sets how many entry frames to skip when displaying this future in the dashboard.
166    pub fn skip_entry_frames(self, n: u8) -> Self {
167        self.future_handle.mutate(|f| f.skip_entry_frames = Some(n));
168        self
169    }
170
171    /// Sets the entity this future is waiting on, for dashboard edge display.
172    pub fn on(mut self, target: EntityRef) -> Self {
173        self.waits_on = Some(FutureEdgeRelation::new(
174            target,
175            FutureEdgeDirection::ChildToTarget,
176        ));
177        self
178    }
179}
180
181fn transition_relation_edge(
182    future_id: &EntityId,
183    backtrace: BacktraceId,
184    relation: &mut FutureEdgeRelation,
185    next_edge: Option<EdgeKind>,
186) {
187    if relation.current_edge == next_edge {
188        return;
189    }
190
191    let (src, dst) = match relation.direction {
192        FutureEdgeDirection::ParentToChild => (
193            EntityId::new(relation.target.id().as_str()),
194            EntityId::new(future_id.as_str()),
195        ),
196        FutureEdgeDirection::ChildToTarget => (
197            EntityId::new(future_id.as_str()),
198            EntityId::new(relation.target.id().as_str()),
199        ),
200    };
201    if let Ok(mut db) = runtime_db().lock() {
202        if let Some(current_edge) = relation.current_edge {
203            db.remove_edge(&src, &dst, current_edge);
204        }
205        if let Some(edge) = next_edge {
206            db.upsert_edge(&src, &dst, edge, backtrace);
207        }
208    }
209    relation.current_edge = next_edge;
210}
211
212impl<F: Future> InstrumentedFuture<F> {
213    fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<F::Output> {
214        let future_id = EntityId::new(self.future_handle.id().as_str());
215        if let Ok(mut db) = runtime_db().lock() {
216            let _ = db.link_entity_to_current_task_scope(&future_id);
217        }
218        FUTURE_CAUSAL_STACK.with(|stack| {
219            stack.borrow_mut().push(EntityId::new(future_id.as_str()));
220        });
221
222        if let Some(relation) = self.awaited_by.as_mut() {
223            transition_relation_edge(&future_id, self.backtrace, relation, Some(EdgeKind::Polls));
224        }
225        if let Some(relation) = self.waits_on.as_mut() {
226            transition_relation_edge(&future_id, self.backtrace, relation, Some(EdgeKind::Polls));
227        }
228
229        let poll = unsafe { Pin::new_unchecked(&mut self.inner) }.poll(cx);
230        FUTURE_CAUSAL_STACK.with(|stack| {
231            stack.borrow_mut().pop();
232        });
233
234        match poll {
235            Poll::Pending => {
236                if let Some(relation) = self.awaited_by.as_mut() {
237                    transition_relation_edge(
238                        &future_id,
239                        self.backtrace,
240                        relation,
241                        Some(EdgeKind::WaitingOn),
242                    );
243                }
244                if let Some(relation) = self.waits_on.as_mut() {
245                    transition_relation_edge(
246                        &future_id,
247                        self.backtrace,
248                        relation,
249                        Some(EdgeKind::WaitingOn),
250                    );
251                }
252                Poll::Pending
253            }
254            Poll::Ready(output) => {
255                if let Some(relation) = self.awaited_by.as_mut() {
256                    transition_relation_edge(&future_id, self.backtrace, relation, None);
257                }
258                if let Some(relation) = self.waits_on.as_mut() {
259                    transition_relation_edge(&future_id, self.backtrace, relation, None);
260                }
261                Poll::Ready(output)
262            }
263        }
264    }
265}
266
267impl<F> Future for InstrumentedFuture<F>
268where
269    F: Future,
270{
271    type Output = F::Output;
272
273    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
274        let this = unsafe { self.get_unchecked_mut() };
275        let has_stack = FUTURE_CAUSAL_STACK.try_with(|_| ()).is_ok();
276        if has_stack {
277            this.poll_inner(cx)
278        } else {
279            FUTURE_CAUSAL_STACK.sync_scope(RefCell::new(Vec::new()), || this.poll_inner(cx))
280        }
281    }
282}
283
284impl<F> Drop for InstrumentedFuture<F> {
285    fn drop(&mut self) {
286        let future_id = EntityId::new(self.future_handle.id().as_str());
287        if let Some(relation) = self.awaited_by.as_mut() {
288            transition_relation_edge(&future_id, self.backtrace, relation, None);
289        }
290        if let Some(relation) = self.waits_on.as_mut() {
291            transition_relation_edge(&future_id, self.backtrace, relation, None);
292        }
293    }
294}
295
296pub fn instrument_future<F>(
297    name: impl Into<String>,
298    fut: F,
299    on: Option<EntityRef>,
300    _meta: Option<facet_value::Value>,
301) -> InstrumentedFuture<F::IntoFuture>
302where
303    F: IntoFuture,
304{
305    let handle = EntityHandle::new(name, FutureEntity::default());
306    instrument_future_with_handle(handle, fut, on, None)
307}
308
309pub fn instrument_future_with_handle<F>(
310    handle: EntityHandle<FutureEntity>,
311    fut: F,
312    on: Option<EntityRef>,
313    _meta: Option<facet_value::Value>,
314) -> InstrumentedFuture<F::IntoFuture>
315where
316    F: IntoFuture,
317{
318    InstrumentedFuture::new(fut.into_future(), handle, on)
319}