use tracing::span::{Attributes, Id};
use tracing::{field::Visit, Event, Level, Subscriber};
use tracing_subscriber::layer::Context;
use tracing_subscriber::prelude::*;
use tracing_subscriber::registry::LookupSpan;
use tracing_subscriber::Layer;
use tracing_subscriber::Registry;
use chrono::{Local, Utc};
use console::{style, Term};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use crate::store::{FollowOption, ReadOptions, Store};
const INITIAL_BACKOFF: Duration = Duration::from_secs(10);
const MAX_BACKOFF: Duration = Duration::from_secs(1800);
#[derive(Debug, Clone)]
struct TraceNode {
level: Level,
name: String,
parent_id: Option<Id>,
children: Vec<Child>,
module_path: Option<String>,
line: Option<u32>,
fields: HashMap<String, String>,
start_time: Option<Instant>,
took: Option<u128>, }
#[derive(Debug, Clone)]
enum Child {
Event(TraceNode),
Span(Id),
}
impl Visit for TraceNode {
fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
self.fields
.insert(field.name().to_string(), format!("{value:?}"));
}
}
impl TraceNode {
fn new(
level: Level,
name: String,
parent_id: Option<Id>,
module_path: Option<String>,
line: Option<u32>,
) -> Self {
Self {
level,
name,
parent_id,
children: Vec::new(),
module_path,
line,
fields: HashMap::new(),
start_time: None,
took: None,
}
}
fn duration_text(&self) -> String {
match self.took {
Some(micros) if micros >= 1000 => format!("{ms}ms", ms = micros / 1000),
_ => String::new(),
}
}
fn format_message(&self) -> String {
let mut parts = Vec::new();
if self.took.is_some() {
parts.push(style(&self.name).cyan().to_string());
} else {
parts.push(self.name.clone());
}
if let Some(msg) = self.fields.get("message") {
parts.push(style(msg.trim_matches('"')).italic().to_string());
}
let fields: String = self
.fields
.iter()
.filter(|(k, _)| *k != "message")
.map(|(k, v)| format!("{k}={value}", k = k, value = v.trim_matches('"')))
.collect::<Vec<_>>()
.join(" ");
if !fields.is_empty() {
parts.push(fields);
}
parts.join(" ")
}
}
#[derive(Clone)]
pub struct HierarchicalSubscriber {
spans: Arc<Mutex<HashMap<Id, TraceNode>>>,
next_log_delta: Arc<Mutex<HashMap<Id, Duration>>>,
}
impl Default for HierarchicalSubscriber {
fn default() -> Self {
Self::new()
}
}
impl HierarchicalSubscriber {
pub fn new() -> Self {
HierarchicalSubscriber {
spans: Arc::new(Mutex::new(HashMap::new())),
next_log_delta: Arc::new(Mutex::new(HashMap::new())),
}
}
fn format_trace_node(&self, node: &TraceNode, depth: usize, is_last: bool) -> String {
let now = Utc::now().with_timezone(&Local);
let formatted_time = now.format("%H:%M:%S%.3f").to_string();
let loc = if let Some(module_path) = &node.module_path {
if let Some(line) = node.line {
format!("{module_path}:{line}")
} else {
module_path.clone()
}
} else {
String::new()
};
let mut prefix = String::new();
if depth > 0 {
prefix.push_str(&"│ ".repeat(depth - 1));
prefix.push_str(if is_last { "└─ " } else { "├─ " });
}
let duration_text = format!("{dur:>7}", dur = node.duration_text());
let mut message = format!(
"{time} {level:>5} {duration} {prefix}{msg}",
time = formatted_time,
level = node.level,
duration = duration_text,
prefix = prefix,
msg = node.format_message()
);
let terminal_width = Term::stdout().size().1 as usize;
let content_width =
console::measure_text_width(&message) + console::measure_text_width(&loc);
let padding = " ".repeat(terminal_width.saturating_sub(content_width));
message.push_str(&padding);
message.push_str(&loc);
message
}
fn print_span_tree(&self, span_id: &Id, depth: usize, spans: &HashMap<Id, TraceNode>) {
if let Some(node) = spans.get(span_id) {
eprintln!("{}", self.format_trace_node(node, depth, false));
let children_count = node.children.len();
for (idx, child) in node.children.iter().enumerate() {
let is_last = idx == children_count - 1;
match child {
Child::Event(event_node) => {
eprintln!("{}", self.format_trace_node(event_node, depth + 1, is_last));
}
Child::Span(child_id) => {
self.print_span_tree(child_id, depth + 1, spans);
}
}
}
}
}
pub fn monitor_long_spans(&self) {
let spans = self.spans.lock().unwrap();
let mut next_log_delta = self.next_log_delta.lock().unwrap();
let now = Instant::now();
for (span_id, node) in spans.iter() {
if let Some(start_time) = node.start_time {
let next_delta = next_log_delta
.entry(span_id.clone())
.or_insert_with(|| INITIAL_BACKOFF);
if now >= start_time + *next_delta {
eprintln!(
"{}",
self.format_trace_node_with_incomplete(
node,
now.duration_since(start_time)
)
);
self.print_span_tree(span_id, 1, &spans);
*next_delta = calculate_next_backoff(*next_delta);
}
}
}
}
fn format_trace_node_with_incomplete(&self, node: &TraceNode, duration: Duration) -> String {
let now = Utc::now().with_timezone(&Local);
let formatted_time = now.format("%H:%M:%S%.3f").to_string();
let loc = if let Some(module_path) = &node.module_path {
if let Some(line) = node.line {
format!("{module_path}:{line}")
} else {
module_path.clone()
}
} else {
String::new()
};
let duration_text = format!(
"{arrow}{millis:>7}ms",
arrow = style(">").yellow(),
millis = style(duration.as_millis()).yellow()
);
let mut message = format!(
"{time} {level:>5} {duration} {name}",
time = formatted_time,
level = node.level,
duration = duration_text,
name = style(&node.name).yellow()
);
let terminal_width = Term::stdout().size().1 as usize;
let content_width =
console::measure_text_width(&message) + console::measure_text_width(&loc);
let padding = " ".repeat(terminal_width.saturating_sub(content_width));
message.push_str(&padding);
message.push_str(&loc);
message
}
}
impl<S> Layer<S> for HierarchicalSubscriber
where
S: Subscriber + for<'a> LookupSpan<'a>,
{
fn on_enter(&self, id: &Id, _ctx: Context<'_, S>) {
let mut spans = self.spans.lock().unwrap();
if let Some(node) = spans.get_mut(id) {
node.start_time = Some(Instant::now());
}
}
fn on_exit(&self, id: &Id, _ctx: Context<'_, S>) {
let mut spans = self.spans.lock().unwrap();
if let Some(node) = spans.get_mut(id) {
if let Some(start_time) = node.start_time.take() {
let elapsed = start_time.elapsed().as_micros();
node.took = Some(node.took.unwrap_or(0) + elapsed);
}
}
}
fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
let metadata = event.metadata();
let mut event_node = TraceNode::new(
*metadata.level(),
metadata.name().to_string(),
None,
metadata.module_path().map(ToString::to_string),
metadata.line(),
);
event.record(&mut event_node);
let mut spans = self.spans.lock().unwrap();
if let Some(span) = ctx.lookup_current() {
let id = span.id();
if let Some(parent_span) = spans.get_mut(&id) {
parent_span.children.push(Child::Event(event_node.clone()));
}
} else {
eprintln!("{}", self.format_trace_node(&event_node, 0, true));
}
}
fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
let metadata = attrs.metadata();
let curr = ctx.current_span();
let parent_id = curr.id();
let mut node = TraceNode::new(
*metadata.level(),
metadata.name().to_string(),
parent_id.cloned(),
metadata.module_path().map(ToString::to_string),
metadata.line(),
);
attrs.record(&mut node);
let mut spans = self.spans.lock().unwrap();
if let Some(parent_id) = &parent_id {
if let Some(parent_node) = spans.get_mut(parent_id) {
parent_node.children.push(Child::Span(id.clone()));
}
}
spans.insert(id.clone(), node);
}
fn on_close(&self, id: Id, _ctx: Context<'_, S>) {
let spans = self.spans.lock().unwrap();
if let Some(node) = spans.get(&id) {
if node.parent_id.is_none() {
self.print_span_tree(&id, 0, &spans);
}
} else {
eprintln!("DEBUG: No node found for closing span");
}
}
}
fn calculate_next_backoff(current_backoff: Duration) -> Duration {
if current_backoff > MAX_BACKOFF {
current_backoff + MAX_BACKOFF
} else {
current_backoff * 2
}
}
pub async fn log_stream(store: Store) {
let options = ReadOptions::builder()
.follow(FollowOption::On)
.new(true)
.build();
let mut recver = store.read(options).await;
while let Some(frame) = recver.recv().await {
let now = Utc::now().with_timezone(&Local);
let formatted_time = now.format("%H:%M:%S%.3f").to_string();
let id = frame.id.to_string();
let id = &id[id.len() - 5..];
eprintln!("{} {:>5} {}", formatted_time, id, frame.topic);
}
}
pub fn init() {
let subscriber = HierarchicalSubscriber::new();
let monitor_subscriber = Arc::new(subscriber.clone());
std::thread::spawn(move || loop {
std::thread::sleep(Duration::from_secs(1));
monitor_subscriber.monitor_long_spans();
});
let registry = Registry::default().with(subscriber);
tracing::subscriber::set_global_default(registry).expect("setting tracing default failed");
}