xs/
trace.rs

1use tracing::span::{Attributes, Id};
2use tracing::{field::Visit, Event, Level, Subscriber};
3use tracing_subscriber::layer::Context;
4use tracing_subscriber::prelude::*;
5use tracing_subscriber::registry::LookupSpan;
6use tracing_subscriber::Layer;
7use tracing_subscriber::Registry;
8
9use chrono::{Local, Utc};
10use console::{style, Term};
11use std::collections::HashMap;
12use std::sync::{Arc, Mutex};
13use std::time::{Duration, Instant};
14
15use crate::store::{FollowOption, ReadOptions, Store};
16
17const INITIAL_BACKOFF: Duration = Duration::from_secs(10);
18const MAX_BACKOFF: Duration = Duration::from_secs(1800); // 30 minutes
19
20#[derive(Debug, Clone)]
21struct TraceNode {
22    level: Level,
23    name: String,
24    parent_id: Option<Id>,
25    children: Vec<Child>,
26    module_path: Option<String>,
27    line: Option<u32>,
28    fields: HashMap<String, String>,
29    start_time: Option<Instant>,
30    took: Option<u128>, // Duration in microseconds
31}
32
33#[derive(Debug, Clone)]
34enum Child {
35    Event(TraceNode),
36    Span(Id),
37}
38
39impl Visit for TraceNode {
40    fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
41        self.fields
42            .insert(field.name().to_string(), format!("{value:?}"));
43    }
44}
45
46impl TraceNode {
47    fn new(
48        level: Level,
49        name: String,
50        parent_id: Option<Id>,
51        module_path: Option<String>,
52        line: Option<u32>,
53    ) -> Self {
54        Self {
55            level,
56            name,
57            parent_id,
58            children: Vec::new(),
59            module_path,
60            line,
61            fields: HashMap::new(),
62            start_time: None,
63            took: None,
64        }
65    }
66
67    fn duration_text(&self) -> String {
68        match self.took {
69            Some(micros) if micros >= 1000 => format!("{ms}ms", ms = micros / 1000),
70            _ => String::new(),
71        }
72    }
73
74    fn format_message(&self) -> String {
75        let mut parts = Vec::new();
76
77        // Name is styled in cyan for spans (which have took value)
78        if self.took.is_some() {
79            parts.push(style(&self.name).cyan().to_string());
80        } else {
81            parts.push(self.name.clone());
82        }
83
84        // Message field doesn't get key=value treatment
85        if let Some(msg) = self.fields.get("message") {
86            parts.push(style(msg.trim_matches('"')).italic().to_string());
87        }
88
89        // Other fields get key=value format
90        let fields: String = self
91            .fields
92            .iter()
93            .filter(|(k, _)| *k != "message")
94            .map(|(k, v)| format!("{k}={value}", k = k, value = v.trim_matches('"')))
95            .collect::<Vec<_>>()
96            .join(" ");
97
98        if !fields.is_empty() {
99            parts.push(fields);
100        }
101
102        parts.join(" ")
103    }
104}
105
106#[derive(Clone)]
107pub struct HierarchicalSubscriber {
108    spans: Arc<Mutex<HashMap<Id, TraceNode>>>,
109    next_log_delta: Arc<Mutex<HashMap<Id, Duration>>>,
110}
111
112impl Default for HierarchicalSubscriber {
113    fn default() -> Self {
114        Self::new()
115    }
116}
117
118impl HierarchicalSubscriber {
119    pub fn new() -> Self {
120        HierarchicalSubscriber {
121            spans: Arc::new(Mutex::new(HashMap::new())),
122            next_log_delta: Arc::new(Mutex::new(HashMap::new())),
123        }
124    }
125
126    fn format_trace_node(&self, node: &TraceNode, depth: usize, is_last: bool) -> String {
127        let now = Utc::now().with_timezone(&Local);
128        let formatted_time = now.format("%H:%M:%S%.3f").to_string();
129
130        // Format location info using module_path instead of file
131        let loc = if let Some(module_path) = &node.module_path {
132            if let Some(line) = node.line {
133                format!("{module_path}:{line}")
134            } else {
135                module_path.clone()
136            }
137        } else {
138            String::new()
139        };
140
141        // Build the tree visualization
142        let mut prefix = String::new();
143        if depth > 0 {
144            prefix.push_str(&"│   ".repeat(depth - 1));
145            prefix.push_str(if is_last { "└─ " } else { "├─ " });
146        }
147
148        // Format duration with proper alignment
149        let duration_text = format!("{dur:>7}", dur = node.duration_text());
150
151        // Build the message content
152        let mut message = format!(
153            "{time} {level:>5} {duration} {prefix}{msg}",
154            time = formatted_time,
155            level = node.level,
156            duration = duration_text,
157            prefix = prefix,
158            msg = node.format_message()
159        );
160
161        // Add right-aligned module path
162        let terminal_width = Term::stdout().size().1 as usize;
163        let content_width =
164            console::measure_text_width(&message) + console::measure_text_width(&loc);
165        let padding = " ".repeat(terminal_width.saturating_sub(content_width));
166        message.push_str(&padding);
167        message.push_str(&loc);
168
169        message
170    }
171
172    fn print_span_tree(&self, span_id: &Id, depth: usize, spans: &HashMap<Id, TraceNode>) {
173        if let Some(node) = spans.get(span_id) {
174            eprintln!("{}", self.format_trace_node(node, depth, false));
175            let children_count = node.children.len();
176            for (idx, child) in node.children.iter().enumerate() {
177                let is_last = idx == children_count - 1;
178                match child {
179                    Child::Event(event_node) => {
180                        eprintln!("{}", self.format_trace_node(event_node, depth + 1, is_last));
181                    }
182                    Child::Span(child_id) => {
183                        self.print_span_tree(child_id, depth + 1, spans);
184                    }
185                }
186            }
187        }
188    }
189
190    pub fn monitor_long_spans(&self) {
191        let spans = self.spans.lock().unwrap();
192        let mut next_log_delta = self.next_log_delta.lock().unwrap();
193        let now = Instant::now();
194        for (span_id, node) in spans.iter() {
195            if let Some(start_time) = node.start_time {
196                let next_delta = next_log_delta
197                    .entry(span_id.clone())
198                    .or_insert_with(|| INITIAL_BACKOFF);
199                if now >= start_time + *next_delta {
200                    eprintln!(
201                        "{}",
202                        self.format_trace_node_with_incomplete(
203                            node,
204                            now.duration_since(start_time)
205                        )
206                    );
207                    self.print_span_tree(span_id, 1, &spans);
208                    *next_delta = calculate_next_backoff(*next_delta);
209                }
210            }
211        }
212    }
213
214    fn format_trace_node_with_incomplete(&self, node: &TraceNode, duration: Duration) -> String {
215        let now = Utc::now().with_timezone(&Local);
216        let formatted_time = now.format("%H:%M:%S%.3f").to_string();
217
218        let loc = if let Some(module_path) = &node.module_path {
219            if let Some(line) = node.line {
220                format!("{module_path}:{line}")
221            } else {
222                module_path.clone()
223            }
224        } else {
225            String::new()
226        };
227
228        // Highlight incomplete spans
229        let duration_text = format!(
230            "{arrow}{millis:>7}ms",
231            arrow = style(">").yellow(),
232            millis = style(duration.as_millis()).yellow()
233        );
234
235        let mut message = format!(
236            "{time} {level:>5} {duration} {name}",
237            time = formatted_time,
238            level = node.level,
239            duration = duration_text,
240            name = style(&node.name).yellow()
241        );
242
243        let terminal_width = Term::stdout().size().1 as usize;
244        let content_width =
245            console::measure_text_width(&message) + console::measure_text_width(&loc);
246        let padding = " ".repeat(terminal_width.saturating_sub(content_width));
247        message.push_str(&padding);
248        message.push_str(&loc);
249
250        message
251    }
252}
253
254impl<S> Layer<S> for HierarchicalSubscriber
255where
256    S: Subscriber + for<'a> LookupSpan<'a>,
257{
258    fn on_enter(&self, id: &Id, _ctx: Context<'_, S>) {
259        let mut spans = self.spans.lock().unwrap();
260        if let Some(node) = spans.get_mut(id) {
261            node.start_time = Some(Instant::now());
262        }
263    }
264
265    fn on_exit(&self, id: &Id, _ctx: Context<'_, S>) {
266        let mut spans = self.spans.lock().unwrap();
267        if let Some(node) = spans.get_mut(id) {
268            if let Some(start_time) = node.start_time.take() {
269                let elapsed = start_time.elapsed().as_micros();
270                node.took = Some(node.took.unwrap_or(0) + elapsed);
271            }
272        }
273    }
274
275    fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
276        let metadata = event.metadata();
277
278        let mut event_node = TraceNode::new(
279            *metadata.level(),
280            metadata.name().to_string(),
281            None,
282            metadata.module_path().map(ToString::to_string),
283            metadata.line(),
284        );
285
286        event.record(&mut event_node);
287
288        let mut spans = self.spans.lock().unwrap();
289
290        if let Some(span) = ctx.lookup_current() {
291            let id = span.id();
292            if let Some(parent_span) = spans.get_mut(&id) {
293                parent_span.children.push(Child::Event(event_node.clone()));
294            }
295        } else {
296            eprintln!("{}", self.format_trace_node(&event_node, 0, true));
297        }
298    }
299
300    fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
301        let metadata = attrs.metadata();
302
303        let curr = ctx.current_span();
304        let parent_id = curr.id();
305
306        let mut node = TraceNode::new(
307            *metadata.level(),
308            metadata.name().to_string(),
309            parent_id.cloned(),
310            metadata.module_path().map(ToString::to_string),
311            metadata.line(),
312        );
313        attrs.record(&mut node);
314
315        let mut spans = self.spans.lock().unwrap();
316
317        if let Some(parent_id) = &parent_id {
318            if let Some(parent_node) = spans.get_mut(parent_id) {
319                parent_node.children.push(Child::Span(id.clone()));
320            }
321        }
322
323        spans.insert(id.clone(), node);
324    }
325
326    fn on_close(&self, id: Id, _ctx: Context<'_, S>) {
327        let spans = self.spans.lock().unwrap();
328        if let Some(node) = spans.get(&id) {
329            // Only print when a root span closes
330            if node.parent_id.is_none() {
331                self.print_span_tree(&id, 0, &spans);
332            }
333        } else {
334            eprintln!("DEBUG: No node found for closing span");
335        }
336    }
337}
338
339fn calculate_next_backoff(current_backoff: Duration) -> Duration {
340    if current_backoff > MAX_BACKOFF {
341        current_backoff + MAX_BACKOFF
342    } else {
343        current_backoff * 2
344    }
345}
346
347pub async fn log_stream(store: Store) {
348    let options = ReadOptions::builder()
349        .follow(FollowOption::On)
350        .tail(true)
351        .build();
352    let mut recver = store.read(options).await;
353    while let Some(frame) = recver.recv().await {
354        let now = Utc::now().with_timezone(&Local);
355        let formatted_time = now.format("%H:%M:%S%.3f").to_string();
356        let id = frame.id.to_string();
357        let id = &id[id.len() - 5..];
358        eprintln!("{} {:>5} {}", formatted_time, id, frame.topic);
359    }
360}
361
362pub fn init() {
363    let subscriber = HierarchicalSubscriber::new();
364
365    // Clone the subscriber for monitoring
366    let monitor_subscriber = Arc::new(subscriber.clone());
367    std::thread::spawn(move || loop {
368        std::thread::sleep(Duration::from_secs(1));
369        monitor_subscriber.monitor_long_spans();
370    });
371
372    // Register the subscriber directly
373    let registry = Registry::default().with(subscriber);
374    tracing::subscriber::set_global_default(registry).expect("setting tracing default failed");
375}