use std::panic::UnwindSafe;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tracing_appender::non_blocking::WorkerGuard;
use tracing_subscriber::prelude::*;
use tracing_subscriber::{fmt, EnvFilter, Layer, Registry};
use crate::events::{emit_to_sinks, EngineEvent, EventSink};
/// A [`Layer`] that re-dispatches every event it receives to another [`tracing::Dispatch`].
///
/// The per-run subscriber uses it to keep feeding events to the dispatcher that was the
/// default when the run started. That way, installing a per-run subscriber does not hide
/// the run's events from the caller's logging setup. Only events enabled by the
/// subscriber this layer belongs to ever reach `on_event`.
struct GlobalForwardLayer(tracing::Dispatch);

impl<S: tracing::Subscriber> Layer<S> for GlobalForwardLayer {
    fn on_event(
        &self,
        event: &tracing::Event<'_>,
        _ctx: tracing_subscriber::layer::Context<'_, S>,
    ) {
        // `Dispatch::event` does not consult the target's `enabled`; the `tracing` macros
        // normally call it first. Without this check the forwarded events would bypass the
        // target subscriber's own filtering (level/target filters and per-layer filters).
        if self.0.enabled(event.metadata()) {
            self.0.event(event);
        }
    }
}
/// Runs `f` with a per-run tracing subscriber installed as this thread's default.
///
/// While `f` runs, enabled events are written to the per-run log file under `log_dir`.
/// They are also forwarded to the dispatcher that was the default when this function was
/// called (both handled by `setup_per_run_tracing`).
///
/// If `f` panics, this function:
/// 1. logs the panic message and a backtrace at `error` level;
/// 2. emits an `EngineEvent::Panicked` to every sink in `sinks`;
/// 3. flushes the log writer by dropping its guard;
/// 4. re-raises the original payload with `std::panic::resume_unwind`, so callers still
///    observe the panic.
///
/// NOTE(review): the backtrace is captured after `catch_unwind` has returned. It therefore
/// shows this function's stack rather than the frame that panicked; capturing the panic
/// site would require a panic hook.
pub fn run_with_per_run_log<F: FnOnce() + UnwindSafe>(
    run_id: &str,
    log_dir: PathBuf,
    sinks: Vec<Arc<dyn EventSink>>,
    f: F,
) {
    let (subscriber, flush_guard) = setup_per_run_tracing(run_id, &log_dir);

    let caught = tracing::subscriber::with_default(subscriber, || {
        let Err(payload) = std::panic::catch_unwind(f) else {
            return None;
        };

        let msg = extract_panic_msg(&payload);
        let bt = std::backtrace::Backtrace::force_capture();
        tracing::error!(run_id = %run_id, "engine panic: {msg}\n{bt}");
        emit_to_sinks(
            run_id,
            EngineEvent::Panicked {
                message: msg,
                backtrace: bt.to_string(),
            },
            &sinks,
        );
        Some(payload)
    });

    // Flush pending log lines before re-raising, so the panic report is written out
    // before the unwind continues into the caller.
    drop(flush_guard);

    if let Some(payload) = caught {
        std::panic::resume_unwind(payload);
    }
}
/// Builds the subscriber for a single run, plus the guard that flushes its log file.
///
/// Log file location:
/// - `<log_dir>/<run_id>.log` when `run_id` parses as a ULID;
/// - `<log_dir>/invalid-run-id.log` for any other `run_id`.
///
/// The file is opened in append mode. This function never fails. If the directory cannot
/// be created or the file cannot be opened, it warns on the currently active dispatcher
/// and per-run output is discarded via `std::io::sink()`.
///
/// The returned subscriber behaves as follows:
/// - It filters events with an `EnvFilter` taken from `RUST_LOG`, falling back to `info`
///   when the variable is unset or invalid.
/// - It writes each enabled event to the log file without ANSI colours.
/// - It forwards each enabled event to the dispatcher that was the default when this
///   function was called.
///
/// The returned `WorkerGuard` must stay alive while the subscriber is in use. Dropping it
/// flushes pending log lines.
fn setup_per_run_tracing(
    run_id: &str,
    log_dir: &Path,
) -> (impl tracing::Subscriber + Send + Sync, WorkerGuard) {
    // Best-effort: a failure here resurfaces when opening the file below. Logging it
    // separately records the root cause (e.g. a path component that is a regular file).
    if let Err(e) = std::fs::create_dir_all(log_dir) {
        tracing::warn!(
            run_id = %run_id,
            "diagnostics: cannot create log directory {}: {e}",
            log_dir.display()
        );
    }

    // Only a string that parses as a ULID is used in the file name, so arbitrary
    // `run_id` values cannot inject path separators or `..` components.
    let log_basename = match ulid::Ulid::from_string(run_id) {
        Ok(_) => format!("{run_id}.log"),
        Err(e) => {
            tracing::warn!(
                run_id = %run_id,
                "diagnostics: invalid run_id, using fallback log path: {e}"
            );
            "invalid-run-id.log".to_string()
        }
    };
    let log_path = log_dir.join(log_basename);

    // Logging must never abort a run: fall back to discarding output if the file
    // cannot be opened.
    let writer: Box<dyn std::io::Write + Send + 'static> = match std::fs::OpenOptions::new()
        .create(true)
        .append(true)
        .open(&log_path)
    {
        Ok(f) => Box::new(f),
        Err(e) => {
            tracing::warn!(run_id = %run_id, "diagnostics: cannot open log file {}: {e}", log_path.display());
            Box::new(std::io::sink())
        }
    };
    let (non_blocking, guard) = tracing_appender::non_blocking(writer);

    // Added with `.with(...)` (not as a per-layer filter), so it gates the whole
    // subscriber: both the file output and the forwarding layer.
    let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
    // Captured before the per-run subscriber is installed, so this is whatever dispatcher
    // the caller currently has as its default.
    let global_dispatch = tracing::dispatcher::get_default(|d| d.clone());
    let subscriber = Registry::default()
        .with(filter)
        .with(fmt::Layer::new().with_writer(non_blocking).with_ansi(false))
        .with(GlobalForwardLayer(global_dispatch));
    (subscriber, guard)
}
/// Returns the human-readable message carried by a panic payload.
///
/// Payloads from `panic!` are either `&'static str` or `String`. Any other payload type
/// yields the placeholder `"unknown panic payload"`.
///
/// The parameter is deliberately `&Box<..>` rather than `&(dyn Any + Send)`. Passing
/// `&payload` to a `&dyn Any` parameter would coerce the `Box` itself into the trait
/// object, and every downcast would then miss.
fn extract_panic_msg(payload: &Box<dyn std::any::Any + Send>) -> String {
    payload
        .downcast_ref::<&str>()
        .map(|text| text.to_string())
        .or_else(|| payload.downcast_ref::<String>().cloned())
        .unwrap_or_else(|| "unknown panic payload".to_string())
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn log_file_created_on_normal_run() {
        let dir = tempfile::tempdir().unwrap();
        let run_id = ulid::Ulid::new().to_string();
        let log_dir = dir.path().join("workflow-logs");
        let run_id_clone = run_id.clone();
        run_with_per_run_log(&run_id, log_dir.clone(), vec![], move || {
            tracing::info!(run_id = %run_id_clone, "normal engine event");
        });
        let log_path = log_dir.join(format!("{run_id}.log"));
        assert!(
            log_path.exists(),
            "per-run log file should be created on normal run"
        );
        // The writer guard is dropped before `run_with_per_run_log` returns, so the event
        // should already be on disk. Only check content under the default `info` filter,
        // because a RUST_LOG set in the environment could legitimately filter it out.
        if std::env::var_os("RUST_LOG").is_none() {
            let contents = std::fs::read_to_string(&log_path).unwrap();
            assert!(
                contents.contains("normal engine event"),
                "per-run log should contain the event emitted during the run"
            );
        }
    }

    #[test]
    fn unwritable_log_dir_does_not_panic() {
        let dir = tempfile::tempdir().unwrap();
        // A regular file where a directory is expected makes both directory creation and
        // the log file open fail.
        let blocker_file = dir.path().join("not-a-dir");
        std::fs::write(&blocker_file, b"").unwrap();
        let log_dir = blocker_file.join("workflow-logs");
        let run_id = ulid::Ulid::new().to_string();
        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            run_with_per_run_log(&run_id, log_dir, vec![], || {
                panic!("no log file but still panicking")
            });
        }));
        let payload = result.expect_err("panic must still propagate");
        // The logging setup itself must not panic: the payload that propagates has to be
        // the one raised by the run closure.
        assert_eq!(
            extract_panic_msg(&payload),
            "no log file but still panicking"
        );
    }
}