async_inspect/runtime/
smol.rs1use crate::inspector::Inspector;
7use crate::instrument::{clear_current_task_id, set_current_task_id};
8use crate::task::TaskId;
9use std::future::Future;
10use std::pin::Pin;
11use std::task::{Context, Poll};
12use std::time::Instant;
13
14pub fn spawn_tracked<F, T>(name: T, future: F) -> smol::Task<F::Output>
34where
35 F: Future + Send + 'static,
36 F::Output: Send + 'static,
37 T: Into<String>,
38{
39 use crate::instrument::current_task_id;
40
41 let task_name = name.into();
42
43 let task_id = if let Some(parent_id) = current_task_id() {
45 Inspector::global().register_child_task(task_name, parent_id)
46 } else {
47 Inspector::global().register_task(task_name)
48 };
49
50 smol::spawn(async move {
51 set_current_task_id(task_id);
53
54 let result = future.await;
56
57 Inspector::global().task_completed(task_id);
59
60 clear_current_task_id();
62
63 result
64 })
65}
66
67pub struct TrackedFuture<F> {
69 future: F,
70 task_id: TaskId,
71 started: bool,
72 poll_start: Option<Instant>,
73}
74
75impl<F> TrackedFuture<F> {
76 pub fn new(future: F, name: String) -> Self {
78 let task_id = Inspector::global().register_task(name);
79
80 Self {
81 future,
82 task_id,
83 started: false,
84 poll_start: None,
85 }
86 }
87
88 pub fn task_id(&self) -> TaskId {
90 self.task_id
91 }
92}
93
94impl<F: Future> Future for TrackedFuture<F> {
95 type Output = F::Output;
96
97 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
98 let this = unsafe { self.get_unchecked_mut() };
100
101 set_current_task_id(this.task_id);
103
104 if !this.started {
106 this.started = true;
107 }
108
109 let poll_start = Instant::now();
110 this.poll_start = Some(poll_start);
111
112 Inspector::global().poll_started(this.task_id);
113
114 let result = unsafe { Pin::new_unchecked(&mut this.future).poll(cx) };
117
118 let poll_duration = poll_start.elapsed();
120 Inspector::global().poll_ended(this.task_id, poll_duration);
121
122 match result {
123 Poll::Ready(output) => {
124 Inspector::global().task_completed(this.task_id);
126 clear_current_task_id();
127 Poll::Ready(output)
128 }
129 Poll::Pending => {
130 Poll::Pending
132 }
133 }
134 }
135}
136
137pub trait InspectExt: Future + Sized {
154 fn inspect(self, name: impl Into<String>) -> TrackedFuture<Self> {
156 TrackedFuture::new(self, name.into())
157 }
158
159 fn spawn_tracked(self, name: impl Into<String>) -> smol::Task<Self::Output>
161 where
162 Self: Send + 'static,
163 Self::Output: Send + 'static,
164 {
165 spawn_tracked(name, self)
166 }
167}
168
169impl<F: Future> InspectExt for F {}
171
#[cfg(test)]
mod tests {
    use super::*;

    /// A spawned, tracked task hands its output back through the task handle.
    #[test]
    fn test_spawn_tracked() {
        let value = smol::block_on(async { spawn_tracked("test_task", async { 42 }).await });
        assert_eq!(value, 42);
    }

    /// Wrapping a future with `inspect` does not alter its output.
    #[test]
    fn test_inspect_ext() {
        let value = smol::block_on(async {
            let tracked = async { 42 }.inspect("test_operation");
            tracked.await
        });
        assert_eq!(value, 42);
    }
}