async_inspect/runtime/
async_std.rs1use crate::inspector::Inspector;
7use crate::instrument::{clear_current_task_id, set_current_task_id};
8use crate::task::TaskId;
9use std::future::Future;
10use std::pin::Pin;
11use std::task::{Context, Poll};
12use std::time::Instant;
13
14pub fn spawn_tracked<F, T>(name: T, future: F) -> async_std::task::JoinHandle<F::Output>
34where
35 F: Future + Send + 'static,
36 F::Output: Send + 'static,
37 T: Into<String>,
38{
39 use crate::instrument::current_task_id;
40
41 let task_name = name.into();
42
43 let task_id = if let Some(parent_id) = current_task_id() {
45 Inspector::global().register_child_task(task_name, parent_id)
46 } else {
47 Inspector::global().register_task(task_name)
48 };
49
50 async_std::task::spawn(async move {
51 set_current_task_id(task_id);
53
54 let result = future.await;
56
57 Inspector::global().task_completed(task_id);
59
60 clear_current_task_id();
62
63 result
64 })
65}
66
/// Spawns a non-`Send` `future` on the current thread via
/// `async_std::task::spawn_local`, registered with the global [`Inspector`].
///
/// Mirrors `spawn_tracked`: when the caller is running under a tracked task
/// (`current_task_id()` returns `Some`), the new task is registered as a child
/// of that task; otherwise it is registered as a task without a parent.
///
/// Inside the spawned task the current-task id is set, `future` is awaited,
/// the task is reported as completed and the current-task id is cleared.
///
/// Only available with the `unstable` feature enabled.
#[cfg(feature = "unstable")]
pub fn spawn_local_tracked<F, T>(name: T, future: F) -> async_std::task::JoinHandle<F::Output>
where
    F: Future + 'static,
    F::Output: 'static,
    T: Into<String>,
{
    // Imported locally so the name is only pulled in when this feature-gated
    // function is compiled.
    use crate::instrument::current_task_id;

    let task_name = name.into();

    // Keep parent/child linkage consistent with `spawn_tracked`.
    let task_id = match current_task_id() {
        Some(parent_id) => Inspector::global().register_child_task(task_name, parent_id),
        None => Inspector::global().register_task(task_name),
    };

    async_std::task::spawn_local(async move {
        set_current_task_id(task_id);

        let result = future.await;

        Inspector::global().task_completed(task_id);
        clear_current_task_id();

        result
    })
}
110
111pub struct TrackedFuture<F> {
113 future: F,
114 task_id: TaskId,
115 started: bool,
116 poll_start: Option<Instant>,
117}
118
119impl<F> TrackedFuture<F> {
120 pub fn new(future: F, name: String) -> Self {
122 let task_id = Inspector::global().register_task(name);
123
124 Self {
125 future,
126 task_id,
127 started: false,
128 poll_start: None,
129 }
130 }
131
132 pub fn task_id(&self) -> TaskId {
134 self.task_id
135 }
136}
137
138impl<F: Future> Future for TrackedFuture<F> {
139 type Output = F::Output;
140
141 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
142 let this = unsafe { self.get_unchecked_mut() };
144
145 set_current_task_id(this.task_id);
147
148 if !this.started {
150 this.started = true;
151 }
152
153 let poll_start = Instant::now();
154 this.poll_start = Some(poll_start);
155
156 Inspector::global().poll_started(this.task_id);
157
158 let result = unsafe { Pin::new_unchecked(&mut this.future).poll(cx) };
161
162 let poll_duration = poll_start.elapsed();
164 Inspector::global().poll_ended(this.task_id, poll_duration);
165
166 match result {
167 Poll::Ready(output) => {
168 Inspector::global().task_completed(this.task_id);
170 clear_current_task_id();
171 Poll::Ready(output)
172 }
173 Poll::Pending => {
174 Poll::Pending
176 }
177 }
178 }
179}
180
181pub trait InspectExt: Future + Sized {
198 fn inspect(self, name: impl Into<String>) -> TrackedFuture<Self> {
200 TrackedFuture::new(self, name.into())
201 }
202
203 fn spawn_tracked(self, name: impl Into<String>) -> async_std::task::JoinHandle<Self::Output>
205 where
206 Self: Send + 'static,
207 Self::Output: Send + 'static,
208 {
209 spawn_tracked(name, self)
210 }
211}
212
213impl<F: Future> InspectExt for F {}
215
#[cfg(test)]
mod tests {
    use super::*;

    /// A spawned tracked task yields the wrapped future's output.
    #[test]
    fn test_spawn_tracked() {
        let value = async_std::task::block_on(async {
            spawn_tracked("test_task", async { 42 }).await
        });

        assert_eq!(value, 42);
    }

    /// `inspect` passes the wrapped future's output through unchanged.
    #[test]
    fn test_inspect_ext() {
        let value = async_std::task::block_on(async {
            let tracked = async { 42 }.inspect("test_operation");
            tracked.await
        });

        assert_eq!(value, 42);
    }
}