1use moire_trace_types::BacktraceId;
2use moire_types::{EdgeKind, EntityId, FutureEntity};
3use std::cell::RefCell;
4use std::future::{Future, IntoFuture};
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
8use super::FUTURE_CAUSAL_STACK;
9use super::db::runtime_db;
10use super::handles::{EntityHandle, EntityRef, current_causal_target_from_stack};
11
/// Future wrapper that tracks an edge from an optional actor entity to a
/// resource entity in the runtime database while the wrapped future runs.
///
/// The edge is created on the first poll, updated as polls return, and removed
/// when the future completes or is dropped.
pub struct OperationFuture<F> {
    /// The wrapped future whose progress drives the edge state.
    inner: F,
    /// Source entity of the recorded edge. When `None`, edge state is only
    /// tracked locally and the runtime database is never touched.
    actor_id: Option<EntityId>,
    /// Destination entity of the recorded edge.
    resource_id: EntityId,
    /// Edge kind currently recorded for this operation, if any.
    current_edge: Option<EdgeKind>,
    /// Backtrace id captured at construction and attached to every upserted edge.
    backtrace: BacktraceId,
}
19
20impl<F> OperationFuture<F> {
21 fn new(inner: F, resource_id: EntityId) -> Self {
22 Self::new_with_actor(
23 inner,
24 resource_id,
25 current_causal_target_from_stack().map(|target| target.id().clone()),
26 )
27 }
28
29 fn new_with_actor(inner: F, resource_id: EntityId, actor_id: Option<EntityId>) -> Self {
30 Self {
31 inner,
32 actor_id,
33 resource_id,
34 current_edge: None,
35 backtrace: super::capture_backtrace_id(),
36 }
37 }
38
39 fn transition_edge(&mut self, next: Option<EdgeKind>) {
40 if self.current_edge == next {
41 return;
42 }
43 let Some(actor_id) = self.actor_id.as_ref() else {
44 self.current_edge = next;
45 return;
46 };
47 if let Ok(mut db) = runtime_db().lock() {
48 if let Some(current) = self.current_edge {
49 db.remove_edge(actor_id, &self.resource_id, current);
50 }
51 if let Some(edge) = next {
52 db.upsert_edge(actor_id, &self.resource_id, edge, self.backtrace);
53 }
54 }
55 self.current_edge = next;
56 }
57}
58
59impl<F> Future for OperationFuture<F>
60where
61 F: Future,
62{
63 type Output = F::Output;
64
65 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
66 let this = unsafe { self.get_unchecked_mut() };
67 if this.current_edge.is_none() {
68 this.transition_edge(Some(EdgeKind::Polls));
69 }
70
71 match unsafe { Pin::new_unchecked(&mut this.inner) }.poll(cx) {
72 Poll::Pending => {
73 this.transition_edge(Some(EdgeKind::WaitingOn));
74 Poll::Pending
75 }
76 Poll::Ready(output) => {
77 this.transition_edge(None);
78 Poll::Ready(output)
79 }
80 }
81 }
82}
83
84impl<F> Drop for OperationFuture<F> {
85 fn drop(&mut self) {
86 self.transition_edge(None);
87 }
88}
89
90pub fn instrument_operation_on<F, S>(on: &EntityHandle<S>, fut: F) -> OperationFuture<F::IntoFuture>
91where
92 F: IntoFuture,
93{
94 OperationFuture::new(fut.into_future(), EntityId::new(on.id().as_str()))
95}
96
97pub fn instrument_operation_on_with_actor<F, S>(
98 on: &EntityHandle<S>,
99 actor: Option<&EntityRef>,
100 fut: F,
101) -> OperationFuture<F::IntoFuture>
102where
103 F: IntoFuture,
104{
105 OperationFuture::new_with_actor(
106 fut.into_future(),
107 EntityId::new(on.id().as_str()),
108 actor.map(|target| target.id().clone()),
109 )
110}
111
/// Future wrapper that registers itself as an entity and records edges to the
/// future that awaits it and, optionally, to a target it waits on.
pub struct InstrumentedFuture<F> {
    /// The wrapped future.
    inner: F,
    /// Entity handle identifying this future in the runtime database.
    pub(super) future_handle: EntityHandle<FutureEntity>,
    /// Backtrace id captured at construction and attached to every upserted edge.
    backtrace: BacktraceId,
    /// Relation to the causal parent found on the stack at construction, if any.
    awaited_by: Option<FutureEdgeRelation>,
    /// Relation to the entity this future waits on, if one was supplied.
    waits_on: Option<FutureEdgeRelation>,
}
119
/// Which end of a recorded edge the instrumented future occupies.
#[derive(Clone, Copy)]
enum FutureEdgeDirection {
    /// Edge runs from the relation's target (the parent) to this future.
    ParentToChild,
    /// Edge runs from this future to the relation's target.
    ChildToTarget,
}
125
/// One edge tracked on behalf of an instrumented future.
struct FutureEdgeRelation {
    /// The entity at the other end of the edge.
    target: EntityRef,
    /// Determines which of the future and `target` is the edge's source.
    direction: FutureEdgeDirection,
    /// Edge kind currently recorded for this relation, if any.
    current_edge: Option<EdgeKind>,
}
131
132impl FutureEdgeRelation {
133 fn new(target: EntityRef, direction: FutureEdgeDirection) -> Self {
134 Self {
135 target,
136 direction,
137 current_edge: None,
138 }
139 }
140}
141
142impl<F> InstrumentedFuture<F> {
143 fn new(inner: F, future_handle: EntityHandle<FutureEntity>, target: Option<EntityRef>) -> Self {
144 let awaited_by = current_causal_target_from_stack().and_then(|parent| {
145 if parent.id().as_str() == future_handle.id().as_str() {
146 None
147 } else {
148 Some(FutureEdgeRelation::new(
149 parent,
150 FutureEdgeDirection::ParentToChild,
151 ))
152 }
153 });
154 let waits_on = target
155 .map(|target| FutureEdgeRelation::new(target, FutureEdgeDirection::ChildToTarget));
156 Self {
157 inner,
158 future_handle,
159 backtrace: super::capture_backtrace_id(),
160 awaited_by,
161 waits_on,
162 }
163 }
164
165 pub fn skip_entry_frames(self, n: u8) -> Self {
167 self.future_handle.mutate(|f| f.skip_entry_frames = Some(n));
168 self
169 }
170
171 pub fn on(mut self, target: EntityRef) -> Self {
173 self.waits_on = Some(FutureEdgeRelation::new(
174 target,
175 FutureEdgeDirection::ChildToTarget,
176 ));
177 self
178 }
179}
180
181fn transition_relation_edge(
182 future_id: &EntityId,
183 backtrace: BacktraceId,
184 relation: &mut FutureEdgeRelation,
185 next_edge: Option<EdgeKind>,
186) {
187 if relation.current_edge == next_edge {
188 return;
189 }
190
191 let (src, dst) = match relation.direction {
192 FutureEdgeDirection::ParentToChild => (
193 EntityId::new(relation.target.id().as_str()),
194 EntityId::new(future_id.as_str()),
195 ),
196 FutureEdgeDirection::ChildToTarget => (
197 EntityId::new(future_id.as_str()),
198 EntityId::new(relation.target.id().as_str()),
199 ),
200 };
201 if let Ok(mut db) = runtime_db().lock() {
202 if let Some(current_edge) = relation.current_edge {
203 db.remove_edge(&src, &dst, current_edge);
204 }
205 if let Some(edge) = next_edge {
206 db.upsert_edge(&src, &dst, edge, backtrace);
207 }
208 }
209 relation.current_edge = next_edge;
210}
211
212impl<F: Future> InstrumentedFuture<F> {
213 fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<F::Output> {
214 let future_id = EntityId::new(self.future_handle.id().as_str());
215 if let Ok(mut db) = runtime_db().lock() {
216 let _ = db.link_entity_to_current_task_scope(&future_id);
217 }
218 FUTURE_CAUSAL_STACK.with(|stack| {
219 stack.borrow_mut().push(EntityId::new(future_id.as_str()));
220 });
221
222 if let Some(relation) = self.awaited_by.as_mut() {
223 transition_relation_edge(&future_id, self.backtrace, relation, Some(EdgeKind::Polls));
224 }
225 if let Some(relation) = self.waits_on.as_mut() {
226 transition_relation_edge(&future_id, self.backtrace, relation, Some(EdgeKind::Polls));
227 }
228
229 let poll = unsafe { Pin::new_unchecked(&mut self.inner) }.poll(cx);
230 FUTURE_CAUSAL_STACK.with(|stack| {
231 stack.borrow_mut().pop();
232 });
233
234 match poll {
235 Poll::Pending => {
236 if let Some(relation) = self.awaited_by.as_mut() {
237 transition_relation_edge(
238 &future_id,
239 self.backtrace,
240 relation,
241 Some(EdgeKind::WaitingOn),
242 );
243 }
244 if let Some(relation) = self.waits_on.as_mut() {
245 transition_relation_edge(
246 &future_id,
247 self.backtrace,
248 relation,
249 Some(EdgeKind::WaitingOn),
250 );
251 }
252 Poll::Pending
253 }
254 Poll::Ready(output) => {
255 if let Some(relation) = self.awaited_by.as_mut() {
256 transition_relation_edge(&future_id, self.backtrace, relation, None);
257 }
258 if let Some(relation) = self.waits_on.as_mut() {
259 transition_relation_edge(&future_id, self.backtrace, relation, None);
260 }
261 Poll::Ready(output)
262 }
263 }
264 }
265}
266
267impl<F> Future for InstrumentedFuture<F>
268where
269 F: Future,
270{
271 type Output = F::Output;
272
273 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
274 let this = unsafe { self.get_unchecked_mut() };
275 let has_stack = FUTURE_CAUSAL_STACK.try_with(|_| ()).is_ok();
276 if has_stack {
277 this.poll_inner(cx)
278 } else {
279 FUTURE_CAUSAL_STACK.sync_scope(RefCell::new(Vec::new()), || this.poll_inner(cx))
280 }
281 }
282}
283
284impl<F> Drop for InstrumentedFuture<F> {
285 fn drop(&mut self) {
286 let future_id = EntityId::new(self.future_handle.id().as_str());
287 if let Some(relation) = self.awaited_by.as_mut() {
288 transition_relation_edge(&future_id, self.backtrace, relation, None);
289 }
290 if let Some(relation) = self.waits_on.as_mut() {
291 transition_relation_edge(&future_id, self.backtrace, relation, None);
292 }
293 }
294}
295
296pub fn instrument_future<F>(
297 name: impl Into<String>,
298 fut: F,
299 on: Option<EntityRef>,
300 _meta: Option<facet_value::Value>,
301) -> InstrumentedFuture<F::IntoFuture>
302where
303 F: IntoFuture,
304{
305 let handle = EntityHandle::new(name, FutureEntity::default());
306 instrument_future_with_handle(handle, fut, on, None)
307}
308
309pub fn instrument_future_with_handle<F>(
310 handle: EntityHandle<FutureEntity>,
311 fut: F,
312 on: Option<EntityRef>,
313 _meta: Option<facet_value::Value>,
314) -> InstrumentedFuture<F::IntoFuture>
315where
316 F: IntoFuture,
317{
318 InstrumentedFuture::new(fut.into_future(), handle, on)
319}