Skip to main content

tandem_observability/
lib.rs

1use chrono::{DateTime, Utc};
2use serde::Serialize;
3use std::fs;
4use std::path::{Path, PathBuf};
5use tracing::Level;
6use tracing_appender::non_blocking::WorkerGuard;
7use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
8
9#[derive(Debug, Clone, Copy, Serialize)]
10#[serde(rename_all = "snake_case")]
11pub enum ProcessKind {
12    Engine,
13    Desktop,
14    Tui,
15}
16
17impl ProcessKind {
18    pub fn as_str(self) -> &'static str {
19        match self {
20            ProcessKind::Engine => "engine",
21            ProcessKind::Desktop => "desktop",
22            ProcessKind::Tui => "tui",
23        }
24    }
25}
26
/// Summary of the logging setup, returned by `init_process_logging` alongside the
/// writer guard so callers can report where logs are going.
#[derive(Debug, Clone, Serialize)]
pub struct LoggingInitInfo {
    /// Process name, i.e. `ProcessKind::as_str()` of the process that initialized logging.
    pub process: String,
    /// Log directory rendered with `Path::display` (lossy for non-UTF-8 paths).
    pub logs_dir: String,
    /// Log file name prefix, `tandem.<process>`.
    pub prefix: String,
    /// Retention window, in days, that was applied when pruning old log files at startup.
    pub retention_days: u64,
    /// UTC time at which `init_process_logging` built this summary.
    pub initialized_at: DateTime<Utc>,
}
35
36#[derive(Debug, Clone, Serialize)]
37pub struct ObservabilityEvent<'a> {
38    pub event: &'a str,
39    pub component: &'a str,
40    pub correlation_id: Option<&'a str>,
41    pub session_id: Option<&'a str>,
42    pub run_id: Option<&'a str>,
43    pub message_id: Option<&'a str>,
44    pub provider_id: Option<&'a str>,
45    pub model_id: Option<&'a str>,
46    pub status: Option<&'a str>,
47    pub error_code: Option<&'a str>,
48    pub detail: Option<&'a str>,
49}
50
/// Replaces free-form text with a non-reversible summary that is safe to log.
///
/// Surrounding whitespace is ignored. Empty or whitespace-only input yields an empty
/// string. Any other input yields `[redacted len=<bytes> hash=<16 hex digits>]`, where `len`
/// is the UTF-8 byte length of the trimmed text and `hash` is [`short_hash`] of it. Equal
/// inputs therefore produce equal summaries, so occurrences can be correlated across logs
/// without revealing the content.
pub fn redact_text(input: &str) -> String {
    let trimmed = input.trim();
    if trimmed.is_empty() {
        return String::new();
    }
    // Labelled generically: the fingerprint is a fast 64-bit hash, not a SHA-256 digest.
    format!("[redacted len={} hash={}]", trimmed.len(), short_hash(trimmed))
}

/// Returns a 16-hex-digit (64-bit) fingerprint of `input`.
///
/// Uses 64-bit FNV-1a. Its algorithm is fully specified, so the same input yields the same
/// fingerprint in every process, on every platform and across Rust releases.
/// `std::collections::hash_map::DefaultHasher`'s algorithm is explicitly unspecified and
/// may change between releases.
///
/// This is NOT a cryptographic hash. Low-entropy secrets can be recovered by brute force,
/// so treat the fingerprint as a correlation aid, not as protection.
pub fn short_hash(input: &str) -> String {
    const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;

    let hash = input.bytes().fold(FNV_OFFSET_BASIS, |acc, byte| {
        (acc ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
    });
    format!("{hash:016x}")
}
69
70pub fn emit_event(level: Level, process: ProcessKind, event: ObservabilityEvent<'_>) {
71    match level {
72        Level::ERROR => tracing::error!(
73            target: "tandem.obs",
74            process = process.as_str(),
75            component = event.component,
76            event = event.event,
77            correlation_id = event.correlation_id.unwrap_or(""),
78            session_id = event.session_id.unwrap_or(""),
79            run_id = event.run_id.unwrap_or(""),
80            message_id = event.message_id.unwrap_or(""),
81            provider_id = event.provider_id.unwrap_or(""),
82            model_id = event.model_id.unwrap_or(""),
83            status = event.status.unwrap_or(""),
84            error_code = event.error_code.unwrap_or(""),
85            detail = event.detail.unwrap_or(""),
86            "observability_event"
87        ),
88        Level::WARN => tracing::warn!(
89            target: "tandem.obs",
90            process = process.as_str(),
91            component = event.component,
92            event = event.event,
93            correlation_id = event.correlation_id.unwrap_or(""),
94            session_id = event.session_id.unwrap_or(""),
95            run_id = event.run_id.unwrap_or(""),
96            message_id = event.message_id.unwrap_or(""),
97            provider_id = event.provider_id.unwrap_or(""),
98            model_id = event.model_id.unwrap_or(""),
99            status = event.status.unwrap_or(""),
100            error_code = event.error_code.unwrap_or(""),
101            detail = event.detail.unwrap_or(""),
102            "observability_event"
103        ),
104        _ => tracing::info!(
105            target: "tandem.obs",
106            process = process.as_str(),
107            component = event.component,
108            event = event.event,
109            correlation_id = event.correlation_id.unwrap_or(""),
110            session_id = event.session_id.unwrap_or(""),
111            run_id = event.run_id.unwrap_or(""),
112            message_id = event.message_id.unwrap_or(""),
113            provider_id = event.provider_id.unwrap_or(""),
114            model_id = event.model_id.unwrap_or(""),
115            status = event.status.unwrap_or(""),
116            error_code = event.error_code.unwrap_or(""),
117            detail = event.detail.unwrap_or(""),
118            "observability_event"
119        ),
120    }
121}
122
123pub fn init_process_logging(
124    process: ProcessKind,
125    logs_dir: &Path,
126    retention_days: u64,
127) -> anyhow::Result<(WorkerGuard, LoggingInitInfo)> {
128    fs::create_dir_all(logs_dir)?;
129    cleanup_old_jsonl(logs_dir, process.as_str(), retention_days)?;
130
131    let file_appender = tracing_appender::rolling::Builder::new()
132        .rotation(tracing_appender::rolling::Rotation::DAILY)
133        .filename_prefix(format!("tandem.{}", process.as_str()))
134        .filename_suffix("jsonl")
135        .build(logs_dir)?;
136
137    let (non_blocking, guard) = tracing_appender::non_blocking(file_appender);
138
139    let file_layer = tracing_subscriber::fmt::layer()
140        .json()
141        .with_writer(non_blocking)
142        .with_ansi(false)
143        .with_current_span(false)
144        .with_span_list(false);
145
146    let console_layer = tracing_subscriber::fmt::layer()
147        .compact()
148        .with_target(true)
149        .with_ansi(true);
150
151    let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
152
153    tracing_subscriber::registry()
154        .with(filter)
155        .with(console_layer)
156        .with(file_layer)
157        .try_init()
158        .ok();
159
160    let info = LoggingInitInfo {
161        process: process.as_str().to_string(),
162        logs_dir: logs_dir.display().to_string(),
163        prefix: format!("tandem.{}", process.as_str()),
164        retention_days,
165        initialized_at: Utc::now(),
166    };
167
168    Ok((guard, info))
169}
170
171fn cleanup_old_jsonl(logs_dir: &Path, process: &str, retention_days: u64) -> anyhow::Result<()> {
172    let cutoff = Utc::now() - chrono::Duration::days(retention_days as i64);
173    let prefix = format!("tandem.{}.", process);
174
175    for entry in fs::read_dir(logs_dir)? {
176        let Ok(entry) = entry else { continue };
177        let path = entry.path();
178        if !path.is_file() {
179            continue;
180        }
181
182        let Some(name) = path.file_name().and_then(|n| n.to_str()) else {
183            continue;
184        };
185
186        if !name.starts_with(&prefix) || !name.ends_with(".jsonl") {
187            continue;
188        }
189
190        // expected: tandem.<proc>.YYYY-MM-DD.jsonl
191        let date_part = name.trim_start_matches(&prefix).trim_end_matches(".jsonl");
192
193        let Ok(date) = chrono::NaiveDate::parse_from_str(date_part, "%Y-%m-%d") else {
194            continue;
195        };
196
197        let Some(dt) = date.and_hms_opt(0, 0, 0) else {
198            continue;
199        };
200
201        if DateTime::<Utc>::from_naive_utc_and_offset(dt, Utc) < cutoff {
202            let _ = fs::remove_file(path);
203        }
204    }
205
206    Ok(())
207}
208
/// Returns the conventional log directory beneath a Tandem data root, i.e. `<root>/logs`.
pub fn canonical_logs_dir_from_root(root: &Path) -> PathBuf {
    let mut logs_dir = root.to_path_buf();
    logs_dir.push("logs");
    logs_dir
}
212
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn redact_text_masks_content() {
        let raw = "super-secret-token-123";
        let redacted = redact_text(raw);
        assert!(redacted.contains("[redacted len="));
        assert!(!redacted.contains("super-secret-token-123"));
    }

    #[test]
    fn redact_text_reports_trimmed_byte_length() {
        // Surrounding whitespace is excluded from the reported length.
        assert!(redact_text("  abc  ").starts_with("[redacted len=3 "));
    }

    #[test]
    fn redact_text_returns_empty_for_blank_input() {
        assert_eq!(redact_text(""), "");
        assert_eq!(redact_text(" \t\n "), "");
    }

    #[test]
    fn redact_text_is_deterministic_and_input_sensitive() {
        assert_eq!(redact_text("token"), redact_text("  token  "));
        assert_ne!(redact_text("token-a"), redact_text("token-b"));
    }

    #[test]
    fn short_hash_is_sixteen_hex_digits() {
        let hash = short_hash("anything");
        assert_eq!(hash.len(), 16);
        assert!(hash.chars().all(|c| c.is_ascii_hexdigit()));
    }

    #[test]
    fn process_kind_names_are_stable() {
        // These names are baked into log file names, so they must not drift.
        assert_eq!(ProcessKind::Engine.as_str(), "engine");
        assert_eq!(ProcessKind::Desktop.as_str(), "desktop");
        assert_eq!(ProcessKind::Tui.as_str(), "tui");
    }

    #[test]
    fn canonical_logs_dir_joins_logs_folder() {
        let root = PathBuf::from("C:/tmp/tandem");
        let logs = canonical_logs_dir_from_root(&root);
        assert_eq!(logs, PathBuf::from("C:/tmp/tandem").join("logs"));
    }
}