async_inspect/integrations/
tracing_layer.rs1use crate::inspector::Inspector;
7use crate::task::{TaskId, TaskState};
8use crate::timeline::EventKind;
9use std::collections::HashMap;
10use std::sync::{Arc, Mutex};
11use tracing::span::{Attributes, Id};
12use tracing::{Event as TracingEvent, Subscriber};
13use tracing_subscriber::layer::{Context, Layer};
14use tracing_subscriber::registry::LookupSpan;
15
16pub struct AsyncInspectLayer {
32 span_map: Arc<Mutex<HashMap<Id, TaskId>>>,
33}
34
35impl AsyncInspectLayer {
36 #[must_use]
38 pub fn new() -> Self {
39 Self {
40 span_map: Arc::new(Mutex::new(HashMap::new())),
41 }
42 }
43}
44
45impl Default for AsyncInspectLayer {
46 fn default() -> Self {
47 Self::new()
48 }
49}
50
51impl<S> Layer<S> for AsyncInspectLayer
52where
53 S: Subscriber + for<'a> LookupSpan<'a>,
54{
55 fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, _ctx: Context<'_, S>) {
56 let metadata = attrs.metadata();
57 let name = metadata.name();
58
59 if metadata.is_span() && (name.starts_with("async") || name.contains("task")) {
61 let task_id = Inspector::global().register_task(name.to_string());
63
64 if let Ok(mut map) = self.span_map.lock() {
66 map.insert(id.clone(), task_id);
67 }
68
69 Inspector::global().add_event(
71 task_id,
72 EventKind::StateChanged {
73 old_state: TaskState::Pending,
74 new_state: TaskState::Pending,
75 },
76 );
77 }
78 }
79
80 fn on_enter(&self, id: &Id, _ctx: Context<'_, S>) {
81 if let Ok(map) = self.span_map.lock() {
82 if let Some(&task_id) = map.get(id) {
83 let old_state = Inspector::global()
85 .get_task(task_id)
86 .map_or(TaskState::Pending, |t| t.state);
87
88 Inspector::global().update_task_state(task_id, TaskState::Running);
89
90 Inspector::global().add_event(
92 task_id,
93 EventKind::StateChanged {
94 old_state,
95 new_state: TaskState::Running,
96 },
97 );
98 }
99 }
100 }
101
102 fn on_exit(&self, id: &Id, _ctx: Context<'_, S>) {
103 if let Ok(map) = self.span_map.lock() {
104 if let Some(&task_id) = map.get(id) {
105 let old_state = Inspector::global()
107 .get_task(task_id)
108 .map_or(TaskState::Running, |t| t.state);
109
110 if !matches!(old_state, TaskState::Completed | TaskState::Failed) {
112 Inspector::global().update_task_state(task_id, TaskState::Pending);
113
114 Inspector::global().add_event(
115 task_id,
116 EventKind::StateChanged {
117 old_state,
118 new_state: TaskState::Pending,
119 },
120 );
121 }
122 }
123 }
124 }
125
126 fn on_close(&self, id: Id, _ctx: Context<'_, S>) {
127 if let Ok(mut map) = self.span_map.lock() {
128 if let Some(task_id) = map.remove(&id) {
129 let duration = Inspector::global()
131 .get_task(task_id)
132 .map(|t| t.created_at.elapsed())
133 .unwrap_or_default();
134
135 Inspector::global().update_task_state(task_id, TaskState::Completed);
136
137 Inspector::global().add_event(task_id, EventKind::TaskCompleted { duration });
138 }
139 }
140 }
141
142 fn on_event(&self, event: &TracingEvent<'_>, _ctx: Context<'_, S>) {
143 let metadata = event.metadata();
145
146 if let Some(id) = _ctx.current_span().id() {
148 if let Ok(map) = self.span_map.lock() {
149 if let Some(&task_id) = map.get(id) {
150 Inspector::global().add_event(
151 task_id,
152 EventKind::InspectionPoint {
153 label: metadata.name().to_string(),
154 message: Some(format!("{event:?}")),
155 },
156 );
157 }
158 }
159 }
160 }
161}
162
163#[cfg(test)]
164mod tests {
165 use super::*;
166
167 #[test]
168 fn test_layer_creation() {
169 let _layer = AsyncInspectLayer::new();
170 }
171}