1use tracing::span::{Attributes, Id};
2use tracing::{field::Visit, Event, Level, Subscriber};
3use tracing_subscriber::layer::Context;
4use tracing_subscriber::prelude::*;
5use tracing_subscriber::registry::LookupSpan;
6use tracing_subscriber::Layer;
7use tracing_subscriber::Registry;
8
9use chrono::{Local, Utc};
10use console::{style, Term};
11use std::collections::HashMap;
12use std::sync::{Arc, Mutex};
13use std::time::{Duration, Instant};
14
15use crate::store::{FollowOption, ReadOptions, Store};
16
17const INITIAL_BACKOFF: Duration = Duration::from_secs(10);
18const MAX_BACKOFF: Duration = Duration::from_secs(1800); #[derive(Debug, Clone)]
21struct TraceNode {
22 level: Level,
23 name: String,
24 parent_id: Option<Id>,
25 children: Vec<Child>,
26 module_path: Option<String>,
27 line: Option<u32>,
28 fields: HashMap<String, String>,
29 start_time: Option<Instant>,
30 took: Option<u128>, }
32
33#[derive(Debug, Clone)]
34enum Child {
35 Event(TraceNode),
36 Span(Id),
37}
38
39impl Visit for TraceNode {
40 fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
41 self.fields
42 .insert(field.name().to_string(), format!("{value:?}"));
43 }
44}
45
46impl TraceNode {
47 fn new(
48 level: Level,
49 name: String,
50 parent_id: Option<Id>,
51 module_path: Option<String>,
52 line: Option<u32>,
53 ) -> Self {
54 Self {
55 level,
56 name,
57 parent_id,
58 children: Vec::new(),
59 module_path,
60 line,
61 fields: HashMap::new(),
62 start_time: None,
63 took: None,
64 }
65 }
66
67 fn duration_text(&self) -> String {
68 match self.took {
69 Some(micros) if micros >= 1000 => format!("{ms}ms", ms = micros / 1000),
70 _ => String::new(),
71 }
72 }
73
74 fn format_message(&self) -> String {
75 let mut parts = Vec::new();
76
77 if self.took.is_some() {
79 parts.push(style(&self.name).cyan().to_string());
80 } else {
81 parts.push(self.name.clone());
82 }
83
84 if let Some(msg) = self.fields.get("message") {
86 parts.push(style(msg.trim_matches('"')).italic().to_string());
87 }
88
89 let fields: String = self
91 .fields
92 .iter()
93 .filter(|(k, _)| *k != "message")
94 .map(|(k, v)| format!("{k}={value}", k = k, value = v.trim_matches('"')))
95 .collect::<Vec<_>>()
96 .join(" ");
97
98 if !fields.is_empty() {
99 parts.push(fields);
100 }
101
102 parts.join(" ")
103 }
104}
105
106#[derive(Clone)]
107pub struct HierarchicalSubscriber {
108 spans: Arc<Mutex<HashMap<Id, TraceNode>>>,
109 next_log_delta: Arc<Mutex<HashMap<Id, Duration>>>,
110}
111
112impl Default for HierarchicalSubscriber {
113 fn default() -> Self {
114 Self::new()
115 }
116}
117
118impl HierarchicalSubscriber {
119 pub fn new() -> Self {
120 HierarchicalSubscriber {
121 spans: Arc::new(Mutex::new(HashMap::new())),
122 next_log_delta: Arc::new(Mutex::new(HashMap::new())),
123 }
124 }
125
126 fn format_trace_node(&self, node: &TraceNode, depth: usize, is_last: bool) -> String {
127 let now = Utc::now().with_timezone(&Local);
128 let formatted_time = now.format("%H:%M:%S%.3f").to_string();
129
130 let loc = if let Some(module_path) = &node.module_path {
132 if let Some(line) = node.line {
133 format!("{module_path}:{line}")
134 } else {
135 module_path.clone()
136 }
137 } else {
138 String::new()
139 };
140
141 let mut prefix = String::new();
143 if depth > 0 {
144 prefix.push_str(&"│ ".repeat(depth - 1));
145 prefix.push_str(if is_last { "└─ " } else { "├─ " });
146 }
147
148 let duration_text = format!("{dur:>7}", dur = node.duration_text());
150
151 let mut message = format!(
153 "{time} {level:>5} {duration} {prefix}{msg}",
154 time = formatted_time,
155 level = node.level,
156 duration = duration_text,
157 prefix = prefix,
158 msg = node.format_message()
159 );
160
161 let terminal_width = Term::stdout().size().1 as usize;
163 let content_width =
164 console::measure_text_width(&message) + console::measure_text_width(&loc);
165 let padding = " ".repeat(terminal_width.saturating_sub(content_width));
166 message.push_str(&padding);
167 message.push_str(&loc);
168
169 message
170 }
171
172 fn print_span_tree(&self, span_id: &Id, depth: usize, spans: &HashMap<Id, TraceNode>) {
173 if let Some(node) = spans.get(span_id) {
174 eprintln!("{}", self.format_trace_node(node, depth, false));
175 let children_count = node.children.len();
176 for (idx, child) in node.children.iter().enumerate() {
177 let is_last = idx == children_count - 1;
178 match child {
179 Child::Event(event_node) => {
180 eprintln!("{}", self.format_trace_node(event_node, depth + 1, is_last));
181 }
182 Child::Span(child_id) => {
183 self.print_span_tree(child_id, depth + 1, spans);
184 }
185 }
186 }
187 }
188 }
189
190 pub fn monitor_long_spans(&self) {
191 let spans = self.spans.lock().unwrap();
192 let mut next_log_delta = self.next_log_delta.lock().unwrap();
193 let now = Instant::now();
194 for (span_id, node) in spans.iter() {
195 if let Some(start_time) = node.start_time {
196 let next_delta = next_log_delta
197 .entry(span_id.clone())
198 .or_insert_with(|| INITIAL_BACKOFF);
199 if now >= start_time + *next_delta {
200 eprintln!(
201 "{}",
202 self.format_trace_node_with_incomplete(
203 node,
204 now.duration_since(start_time)
205 )
206 );
207 self.print_span_tree(span_id, 1, &spans);
208 *next_delta = calculate_next_backoff(*next_delta);
209 }
210 }
211 }
212 }
213
214 fn format_trace_node_with_incomplete(&self, node: &TraceNode, duration: Duration) -> String {
215 let now = Utc::now().with_timezone(&Local);
216 let formatted_time = now.format("%H:%M:%S%.3f").to_string();
217
218 let loc = if let Some(module_path) = &node.module_path {
219 if let Some(line) = node.line {
220 format!("{module_path}:{line}")
221 } else {
222 module_path.clone()
223 }
224 } else {
225 String::new()
226 };
227
228 let duration_text = format!(
230 "{arrow}{millis:>7}ms",
231 arrow = style(">").yellow(),
232 millis = style(duration.as_millis()).yellow()
233 );
234
235 let mut message = format!(
236 "{time} {level:>5} {duration} {name}",
237 time = formatted_time,
238 level = node.level,
239 duration = duration_text,
240 name = style(&node.name).yellow()
241 );
242
243 let terminal_width = Term::stdout().size().1 as usize;
244 let content_width =
245 console::measure_text_width(&message) + console::measure_text_width(&loc);
246 let padding = " ".repeat(terminal_width.saturating_sub(content_width));
247 message.push_str(&padding);
248 message.push_str(&loc);
249
250 message
251 }
252}
253
254impl<S> Layer<S> for HierarchicalSubscriber
255where
256 S: Subscriber + for<'a> LookupSpan<'a>,
257{
258 fn on_enter(&self, id: &Id, _ctx: Context<'_, S>) {
259 let mut spans = self.spans.lock().unwrap();
260 if let Some(node) = spans.get_mut(id) {
261 node.start_time = Some(Instant::now());
262 }
263 }
264
265 fn on_exit(&self, id: &Id, _ctx: Context<'_, S>) {
266 let mut spans = self.spans.lock().unwrap();
267 if let Some(node) = spans.get_mut(id) {
268 if let Some(start_time) = node.start_time.take() {
269 let elapsed = start_time.elapsed().as_micros();
270 node.took = Some(node.took.unwrap_or(0) + elapsed);
271 }
272 }
273 }
274
275 fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
276 let metadata = event.metadata();
277
278 let mut event_node = TraceNode::new(
279 *metadata.level(),
280 metadata.name().to_string(),
281 None,
282 metadata.module_path().map(ToString::to_string),
283 metadata.line(),
284 );
285
286 event.record(&mut event_node);
287
288 let mut spans = self.spans.lock().unwrap();
289
290 if let Some(span) = ctx.lookup_current() {
291 let id = span.id();
292 if let Some(parent_span) = spans.get_mut(&id) {
293 parent_span.children.push(Child::Event(event_node.clone()));
294 }
295 } else {
296 eprintln!("{}", self.format_trace_node(&event_node, 0, true));
297 }
298 }
299
300 fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
301 let metadata = attrs.metadata();
302
303 let curr = ctx.current_span();
304 let parent_id = curr.id();
305
306 let mut node = TraceNode::new(
307 *metadata.level(),
308 metadata.name().to_string(),
309 parent_id.cloned(),
310 metadata.module_path().map(ToString::to_string),
311 metadata.line(),
312 );
313 attrs.record(&mut node);
314
315 let mut spans = self.spans.lock().unwrap();
316
317 if let Some(parent_id) = &parent_id {
318 if let Some(parent_node) = spans.get_mut(parent_id) {
319 parent_node.children.push(Child::Span(id.clone()));
320 }
321 }
322
323 spans.insert(id.clone(), node);
324 }
325
326 fn on_close(&self, id: Id, _ctx: Context<'_, S>) {
327 let spans = self.spans.lock().unwrap();
328 if let Some(node) = spans.get(&id) {
329 if node.parent_id.is_none() {
331 self.print_span_tree(&id, 0, &spans);
332 }
333 } else {
334 eprintln!("DEBUG: No node found for closing span");
335 }
336 }
337}
338
339fn calculate_next_backoff(current_backoff: Duration) -> Duration {
340 if current_backoff > MAX_BACKOFF {
341 current_backoff + MAX_BACKOFF
342 } else {
343 current_backoff * 2
344 }
345}
346
347pub async fn log_stream(store: Store) {
348 let options = ReadOptions::builder()
349 .follow(FollowOption::On)
350 .tail(true)
351 .build();
352 let mut recver = store.read(options).await;
353 while let Some(frame) = recver.recv().await {
354 let now = Utc::now().with_timezone(&Local);
355 let formatted_time = now.format("%H:%M:%S%.3f").to_string();
356 let id = frame.id.to_string();
357 let id = &id[id.len() - 5..];
358 eprintln!("{} {:>5} {}", formatted_time, id, frame.topic);
359 }
360}
361
362pub fn init() {
363 let subscriber = HierarchicalSubscriber::new();
364
365 let monitor_subscriber = Arc::new(subscriber.clone());
367 std::thread::spawn(move || loop {
368 std::thread::sleep(Duration::from_secs(1));
369 monitor_subscriber.monitor_long_spans();
370 });
371
372 let registry = Registry::default().with(subscriber);
374 tracing::subscriber::set_global_default(registry).expect("setting tracing default failed");
375}