Skip to main content

sc_observability/
lib.rs

1//! Shared observability primitives for ATM ecosystem tools.
2//!
3//! AH.1 scope:
4//! - `Logger` + `emit()` with JSONL rotation
5//! - `LogConfig` with environment-driven defaults
6//! - spool write/merge semantics
7//! - socket error-code constants for the `log-event` contract
8
9use agent_team_mail_core::logging_event::{LogEventV1, ValidationError};
10use std::fs::{self, OpenOptions};
11use std::io::Write;
12use std::path::{Path, PathBuf};
13use std::str::FromStr;
14use std::sync::{Arc, Mutex, OnceLock};
15use std::time::Duration;
16use thiserror::Error;
17
// ---- LogConfig defaults -------------------------------------------------

/// Default for [`LogConfig::queue_capacity`].
pub const DEFAULT_QUEUE_CAPACITY: usize = 4_096;
/// Default for [`LogConfig::max_event_bytes`]: 64 KiB per serialized event line.
pub const DEFAULT_MAX_EVENT_BYTES: usize = 64 << 10;
/// Default for [`LogConfig::max_bytes`]: the canonical log rotates at 50 MiB.
pub const DEFAULT_MAX_BYTES: u64 = 50 << 20;
/// Default for [`LogConfig::max_files`]: number of rotated files kept.
pub const DEFAULT_MAX_FILES: u32 = 5;
/// Default for [`LogConfig::retention_days`].
pub const DEFAULT_RETENTION_DAYS: u32 = 7;

// ---- OtelConfig defaults ------------------------------------------------

/// Default for [`OtelConfig::max_retries`]: retries after the first attempt.
pub const DEFAULT_OTEL_MAX_RETRIES: u32 = 2;
/// Default for [`OtelConfig::initial_backoff_ms`].
pub const DEFAULT_OTEL_INITIAL_BACKOFF_MS: u64 = 25;
/// Default for [`OtelConfig::max_backoff_ms`].
pub const DEFAULT_OTEL_MAX_BACKOFF_MS: u64 = 250;

// ---- `log-event` socket contract error codes ----------------------------

/// Error code: the peer spoke an unsupported protocol version.
pub const SOCKET_ERROR_VERSION_MISMATCH: &str = "VERSION_MISMATCH";
/// Error code: the submitted payload was rejected.
pub const SOCKET_ERROR_INVALID_PAYLOAD: &str = "INVALID_PAYLOAD";
/// Error code: the receiver failed internally.
pub const SOCKET_ERROR_INTERNAL_ERROR: &str = "INTERNAL_ERROR";
30
31#[derive(Debug, Clone)]
32pub struct OtelConfig {
33    pub enabled: bool,
34    pub max_retries: u32,
35    pub initial_backoff_ms: u64,
36    pub max_backoff_ms: u64,
37}
38
39impl Default for OtelConfig {
40    fn default() -> Self {
41        Self {
42            enabled: true,
43            max_retries: DEFAULT_OTEL_MAX_RETRIES,
44            initial_backoff_ms: DEFAULT_OTEL_INITIAL_BACKOFF_MS,
45            max_backoff_ms: DEFAULT_OTEL_MAX_BACKOFF_MS,
46        }
47    }
48}
49
50impl OtelConfig {
51    pub fn from_env() -> Self {
52        let mut cfg = Self::default();
53
54        if let Ok(raw) = std::env::var("ATM_OTEL_ENABLED") {
55            let norm = raw.trim().to_ascii_lowercase();
56            cfg.enabled = !matches!(norm.as_str(), "0" | "false" | "off" | "no");
57        }
58        if let Ok(raw) = std::env::var("ATM_OTEL_MAX_RETRIES")
59            && let Ok(parsed) = raw.parse::<u32>()
60        {
61            cfg.max_retries = parsed;
62        }
63        if let Ok(raw) = std::env::var("ATM_OTEL_INITIAL_BACKOFF_MS")
64            && let Ok(parsed) = raw.parse::<u64>()
65        {
66            cfg.initial_backoff_ms = parsed;
67        }
68        if let Ok(raw) = std::env::var("ATM_OTEL_MAX_BACKOFF_MS")
69            && let Ok(parsed) = raw.parse::<u64>()
70        {
71            cfg.max_backoff_ms = parsed;
72        }
73        if cfg.max_backoff_ms < cfg.initial_backoff_ms {
74            cfg.max_backoff_ms = cfg.initial_backoff_ms;
75        }
76        cfg
77    }
78}
79
/// Errors produced while building or exporting an [`OtelRecord`].
#[derive(Debug, Error, PartialEq, Eq)]
pub enum OtelError {
    /// A correlation field required by the event's scope is absent or blank;
    /// `field` names the first such field found.
    #[error("missing required correlation field '{field}'")]
    MissingRequiredField { field: &'static str },
    /// Exactly one of `trace_id` / `span_id` is non-blank.
    #[error(
        "invalid span context: trace_id and span_id must either both be present or both be absent"
    )]
    InvalidSpanContext,
    /// The exporter failed; carries the underlying error's text.
    #[error("export failed: {0}")]
    ExportFailed(String),
}
91
/// Destination for OTel records.
///
/// Implementations must be `Send + Sync`; the pipeline holds them as
/// `Arc<dyn OtelExporter>`. Returning an error makes the retry loop call
/// `export` again with the same record until the configured retries run out.
pub trait OtelExporter: Send + Sync {
    /// Exports one record.
    fn export(&self, record: &OtelRecord) -> Result<(), OtelError>;
}
95
/// One OTel record derived from a [`LogEventV1`]; [`FileOtelExporter`] writes
/// each one as a single JSON line.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq)]
pub struct OtelRecord {
    /// Record name, taken from the event's `action`.
    pub name: String,
    /// Trace id from the source event, if any.
    pub trace_id: Option<String>,
    /// Span id from the source event, if any.
    pub span_id: Option<String>,
    /// Correlation attributes: `team`, `agent`, `runtime`, `session_id`,
    /// `subagent_id` when present, plus `source_binary`, `target`, `action`.
    pub attributes: serde_json::Map<String, serde_json::Value>,
}
103
104#[derive(Debug, Clone)]
105pub struct FileOtelExporter {
106    path: PathBuf,
107}
108
109impl FileOtelExporter {
110    pub fn new(path: PathBuf) -> Self {
111        Self { path }
112    }
113}
114
115impl OtelExporter for FileOtelExporter {
116    fn export(&self, record: &OtelRecord) -> Result<(), OtelError> {
117        if let Some(parent) = self.path.parent()
118            && let Err(err) = fs::create_dir_all(parent)
119        {
120            return Err(OtelError::ExportFailed(err.to_string()));
121        }
122        let mut file = OpenOptions::new()
123            .create(true)
124            .append(true)
125            .open(&self.path)
126            .map_err(|err| OtelError::ExportFailed(err.to_string()))?;
127        let line = serde_json::to_string(record)
128            .map_err(|err| OtelError::ExportFailed(err.to_string()))?;
129        writeln!(file, "{line}").map_err(|err| OtelError::ExportFailed(err.to_string()))
130    }
131}
132
133#[derive(Clone)]
134struct OtelPipeline {
135    config: OtelConfig,
136    exporter: Arc<dyn OtelExporter>,
137    sleeper: fn(Duration),
138}
139
140impl std::fmt::Debug for OtelPipeline {
141    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
142        f.debug_struct("OtelPipeline")
143            .field("config", &self.config)
144            .finish()
145    }
146}
147
148impl OtelPipeline {
149    fn new_default(log_path: &Path) -> Self {
150        let mut otel_path = log_path.to_path_buf();
151        let stem = log_path
152            .file_stem()
153            .and_then(|s| s.to_str())
154            .unwrap_or("telemetry");
155        otel_path.set_file_name(format!("{stem}.otel.jsonl"));
156        Self {
157            config: OtelConfig::from_env(),
158            exporter: Arc::new(FileOtelExporter::new(otel_path)),
159            sleeper: std::thread::sleep,
160        }
161    }
162
163    fn export_event(&self, event: &LogEventV1) -> Result<(), OtelError> {
164        export_otel_with_retry(event, &self.config, self.exporter.as_ref(), self.sleeper)
165    }
166}
167
168fn export_otel_with_retry(
169    event: &LogEventV1,
170    config: &OtelConfig,
171    exporter: &dyn OtelExporter,
172    sleeper: fn(Duration),
173) -> Result<(), OtelError> {
174    if !config.enabled {
175        return Ok(());
176    }
177    let record = build_otel_record(event)?;
178
179    let mut attempt: u32 = 0;
180    let mut backoff = config.initial_backoff_ms;
181    loop {
182        match exporter.export(&record) {
183            Ok(()) => return Ok(()),
184            Err(err) => {
185                if attempt >= config.max_retries {
186                    return Err(err);
187                }
188                sleeper(Duration::from_millis(backoff));
189                backoff = backoff.saturating_mul(2).min(config.max_backoff_ms);
190                attempt = attempt.saturating_add(1);
191            }
192        }
193    }
194}
195
196/// Export to OTel without allowing exporter failures to fail the caller.
197///
198/// This is the public fail-open helper for producer-only code paths that do
199/// not own a full [`Logger`] instance.
200pub fn export_otel_best_effort(
201    event: &LogEventV1,
202    config: &OtelConfig,
203    exporter: &dyn OtelExporter,
204) {
205    let _ = export_otel_with_retry(event, config, exporter, std::thread::sleep);
206}
207
208fn build_otel_record(event: &LogEventV1) -> Result<OtelRecord, OtelError> {
209    let runtime_scoped = event.team.is_some()
210        || event.agent.is_some()
211        || event.runtime.is_some()
212        || event.session_id.is_some();
213    if runtime_scoped {
214        for (field, value) in [
215            ("team", event.team.as_deref()),
216            ("agent", event.agent.as_deref()),
217            ("runtime", event.runtime.as_deref()),
218            ("session_id", event.session_id.as_deref()),
219        ] {
220            if value.is_none_or(|v| v.trim().is_empty()) {
221                return Err(OtelError::MissingRequiredField { field });
222            }
223        }
224    }
225
226    let has_trace = event
227        .trace_id
228        .as_deref()
229        .is_some_and(|value| !value.trim().is_empty());
230    let has_span = event
231        .span_id
232        .as_deref()
233        .is_some_and(|value| !value.trim().is_empty());
234    if has_trace != has_span {
235        return Err(OtelError::InvalidSpanContext);
236    }
237
238    let subagent_scoped = event.subagent_id.is_some() || event.action.starts_with("subagent.");
239    if subagent_scoped {
240        if event
241            .subagent_id
242            .as_deref()
243            .is_none_or(|value| value.trim().is_empty())
244        {
245            return Err(OtelError::MissingRequiredField {
246                field: "subagent_id",
247            });
248        }
249        for (field, value) in [
250            ("team", event.team.as_deref()),
251            ("agent", event.agent.as_deref()),
252            ("runtime", event.runtime.as_deref()),
253            ("session_id", event.session_id.as_deref()),
254            ("trace_id", event.trace_id.as_deref()),
255            ("span_id", event.span_id.as_deref()),
256        ] {
257            if value.is_none_or(|v| v.trim().is_empty()) {
258                return Err(OtelError::MissingRequiredField { field });
259            }
260        }
261    }
262
263    let mut attributes = serde_json::Map::new();
264    if let Some(team) = event.team.as_ref() {
265        attributes.insert("team".to_string(), serde_json::Value::String(team.clone()));
266    }
267    if let Some(agent) = event.agent.as_ref() {
268        attributes.insert(
269            "agent".to_string(),
270            serde_json::Value::String(agent.clone()),
271        );
272    }
273    if let Some(runtime) = event.runtime.as_ref() {
274        attributes.insert(
275            "runtime".to_string(),
276            serde_json::Value::String(runtime.clone()),
277        );
278    }
279    if let Some(session_id) = event.session_id.as_ref() {
280        attributes.insert(
281            "session_id".to_string(),
282            serde_json::Value::String(session_id.clone()),
283        );
284    }
285    if let Some(subagent_id) = event.subagent_id.as_ref() {
286        attributes.insert(
287            "subagent_id".to_string(),
288            serde_json::Value::String(subagent_id.clone()),
289        );
290    }
291    attributes.insert(
292        "source_binary".to_string(),
293        serde_json::Value::String(event.source_binary.clone()),
294    );
295    attributes.insert(
296        "target".to_string(),
297        serde_json::Value::String(event.target.clone()),
298    );
299    attributes.insert(
300        "action".to_string(),
301        serde_json::Value::String(event.action.clone()),
302    );
303
304    Ok(OtelRecord {
305        name: event.action.clone(),
306        trace_id: event.trace_id.clone(),
307        span_id: event.span_id.clone(),
308        attributes,
309    })
310}
311
312pub use agent_team_mail_core::logging_event::SpanRefV1;
313
/// Severity attached to a log event.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LogLevel {
    Trace,
    Debug,
    Info,
    Warn,
    Error,
}

impl LogLevel {
    /// Lowercase name of the level (`"trace"` through `"error"`).
    ///
    /// These are exactly the spellings the [`FromStr`] impl accepts.
    pub fn as_str(self) -> &'static str {
        match self {
            LogLevel::Trace => "trace",
            LogLevel::Debug => "debug",
            LogLevel::Info => "info",
            LogLevel::Warn => "warn",
            LogLevel::Error => "error",
        }
    }
}

impl FromStr for LogLevel {
    /// An unrecognized name carries no further detail.
    type Err = ();

    /// Parses a level name case-insensitively, ignoring surrounding whitespace.
    ///
    /// Only the five names produced by [`LogLevel::as_str`] are accepted.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let wanted = s.trim().to_ascii_lowercase();
        [
            LogLevel::Trace,
            LogLevel::Debug,
            LogLevel::Info,
            LogLevel::Warn,
            LogLevel::Error,
        ]
        .iter()
        .copied()
        .find(|level| level.as_str() == wanted)
        .ok_or(())
    }
}
349
350#[derive(Debug, Clone)]
351pub struct LogConfig {
352    pub log_path: PathBuf,
353    pub spool_dir: PathBuf,
354    pub level: LogLevel,
355    pub message_preview_enabled: bool,
356    pub max_bytes: u64,
357    pub max_files: u32,
358    pub retention_days: u32,
359    pub queue_capacity: usize,
360    pub max_event_bytes: usize,
361}
362
363impl LogConfig {
364    fn normalize_tool_name(tool: &str) -> String {
365        let trimmed = tool.trim();
366        if trimmed.is_empty() {
367            return "atm".to_string();
368        }
369        trimmed
370            .chars()
371            .map(|ch| {
372                if ch.is_ascii_alphanumeric() || ch == '-' || ch == '_' {
373                    ch
374                } else {
375                    '_'
376                }
377            })
378            .collect()
379    }
380
381    fn canonical_log_path(home_dir: &Path, tool: &str) -> PathBuf {
382        let tool = Self::normalize_tool_name(tool);
383        home_dir
384            .join(".config")
385            .join("atm")
386            .join("logs")
387            .join(&tool)
388            .join(format!("{tool}.log.jsonl"))
389    }
390
391    fn canonical_spool_dir(home_dir: &Path, tool: &str) -> PathBuf {
392        let tool = Self::normalize_tool_name(tool);
393        home_dir
394            .join(".config")
395            .join("atm")
396            .join("logs")
397            .join(tool)
398            .join("spool")
399    }
400
401    fn spool_dir_from_log_path(log_path: &Path) -> PathBuf {
402        log_path
403            .parent()
404            .unwrap_or_else(|| Path::new("."))
405            .join("spool")
406    }
407
408    pub fn from_home(home_dir: &Path) -> Self {
409        Self::from_home_for_tool(home_dir, "atm")
410    }
411
412    pub fn from_home_for_tool(home_dir: &Path, tool: &str) -> Self {
413        let log_path = std::env::var("ATM_LOG_FILE")
414            .or_else(|_| std::env::var("ATM_LOG_PATH"))
415            .map(PathBuf::from)
416            .unwrap_or_else(|_| Self::canonical_log_path(home_dir, tool));
417
418        let spool_dir =
419            if std::env::var("ATM_LOG_FILE").is_ok() || std::env::var("ATM_LOG_PATH").is_ok() {
420                Self::spool_dir_from_log_path(&log_path)
421            } else {
422                Self::canonical_spool_dir(home_dir, tool)
423            };
424        let level = std::env::var("ATM_LOG")
425            .ok()
426            .and_then(|v| LogLevel::from_str(&v).ok())
427            .unwrap_or(LogLevel::Info);
428        let message_preview_enabled = std::env::var("ATM_LOG_MSG")
429            .ok()
430            .map(|v| v.trim() == "1")
431            .unwrap_or(false);
432        let max_bytes = std::env::var("ATM_LOG_MAX_BYTES")
433            .ok()
434            .and_then(|v| v.parse::<u64>().ok())
435            .unwrap_or(DEFAULT_MAX_BYTES);
436        let max_files = std::env::var("ATM_LOG_MAX_FILES")
437            .ok()
438            .and_then(|v| v.parse::<u32>().ok())
439            .unwrap_or(DEFAULT_MAX_FILES);
440        let retention_days = std::env::var("ATM_LOG_RETENTION_DAYS")
441            .ok()
442            .and_then(|v| v.parse::<u32>().ok())
443            .filter(|days| *days > 0)
444            .unwrap_or(DEFAULT_RETENTION_DAYS);
445
446        Self {
447            log_path,
448            spool_dir,
449            level,
450            message_preview_enabled,
451            max_bytes,
452            max_files,
453            retention_days,
454            queue_capacity: DEFAULT_QUEUE_CAPACITY,
455            max_event_bytes: DEFAULT_MAX_EVENT_BYTES,
456        }
457    }
458}
459
/// Errors returned by [`Logger`] operations.
#[derive(Debug, Error)]
pub enum LoggerError {
    /// The event failed `LogEventV1::validate`.
    #[error("event validation failed: {0}")]
    Validation(#[from] ValidationError),
    /// The event could not be serialized to JSON.
    #[error("failed to serialize log event: {0}")]
    Serialize(#[from] serde_json::Error),
    /// Creating directories, rotating, reading, opening, or appending to a
    /// log/spool file failed.
    #[error("i/o error: {0}")]
    Io(#[from] std::io::Error),
    /// The serialized event (`size` bytes) exceeds `LogConfig::max_event_bytes`
    /// (`max`).
    #[error("event exceeds configured size guard: {size} > {max}")]
    EventTooLarge { size: usize, max: usize },
}
471
/// Writes validated, redacted [`LogEventV1`] records to a rotating JSONL log,
/// supports spool-file fan-in, and mirrors emitted events to an OTel pipeline.
#[derive(Debug, Clone)]
pub struct Logger {
    /// Paths, level, and size/rotation limits.
    config: LogConfig,
    /// OTel exporter and retry settings used by `emit`.
    otel: OtelPipeline,
}
477
478/// Apply canonical redaction rules to a logging event.
479pub fn redact_event(event: &mut LogEventV1) {
480    event.redact();
481}
482
483impl Logger {
484    pub fn new(config: LogConfig) -> Self {
485        let otel = OtelPipeline::new_default(&config.log_path);
486        Self { config, otel }
487    }
488
489    pub fn with_otel_exporter(
490        config: LogConfig,
491        otel_config: OtelConfig,
492        exporter: Arc<dyn OtelExporter>,
493    ) -> Self {
494        Self {
495            config,
496            otel: OtelPipeline {
497                config: otel_config,
498                exporter,
499                sleeper: std::thread::sleep,
500            },
501        }
502    }
503
504    pub fn config(&self) -> &LogConfig {
505        &self.config
506    }
507
508    /// Validate, redact, and append an event to the canonical JSONL log.
509    ///
510    /// # Errors
511    ///
512    /// Returns an error when validation fails, serialization fails, the event
513    /// exceeds `max_event_bytes`, or filesystem writes fail.
514    pub fn emit(&self, event: &LogEventV1) -> Result<(), LoggerError> {
515        let line = self.prepare_line(event)?;
516        self.append_line_to_canonical(&line)?;
517        let _ = self.otel.export_event(event);
518        Ok(())
519    }
520
521    /// Convenience helper for tools that only need action/outcome + fields.
522    ///
523    /// This builds a [`LogEventV1`] with the configured log level and emits it
524    /// through the same validation/redaction/path pipeline as [`Self::emit`].
525    pub fn emit_action(
526        &self,
527        source_binary: &str,
528        target: &str,
529        action: &str,
530        outcome: Option<&str>,
531        fields: serde_json::Value,
532    ) -> Result<(), LoggerError> {
533        let mut event = LogEventV1::builder(source_binary, action, target)
534            .level(self.config.level.as_str())
535            .build();
536        event.outcome = outcome.map(ToOwned::to_owned);
537        event.fields = value_to_map(fields);
538        self.emit(&event)
539    }
540    /// Write a human-readable log line to the canonical log path.
541    ///
542    /// Produces `<timestamp> level=<level> action=<action> outcome=<outcome> fields=<json>`
543    /// format, sharing the same file-path and directory-creation logic as
544    /// [`Self::emit`]. This routes Human-mode output through SharedLogger rather
545    /// than a parallel per-tool implementation.
546    ///
547    /// # Errors
548    ///
549    /// Returns an error when directory creation or file appending fails.
550    pub fn emit_human(
551        &self,
552        level: &str,
553        action: &str,
554        outcome: &str,
555        fields: &serde_json::Value,
556    ) -> Result<(), LoggerError> {
557        use chrono::{SecondsFormat, Utc};
558        let ts = Utc::now().to_rfc3339_opts(SecondsFormat::Millis, true);
559        let fields_json = serde_json::to_string(fields).unwrap_or_else(|_| "{}".to_string());
560        let line =
561            format!("{ts} level={level} action={action} outcome={outcome} fields={fields_json}");
562        self.append_line_to_canonical(&line)?;
563        Ok(())
564    }
565
566    /// Write one event to a per-source spool file for deferred fan-in merge.
567    ///
568    /// # Errors
569    ///
570    /// Returns an error when validation/serialization fails, the event exceeds
571    /// `max_event_bytes`, or spool file creation/appending fails.
572    pub fn write_to_spool(
573        &self,
574        event: &LogEventV1,
575        unix_millis: u128,
576    ) -> Result<PathBuf, LoggerError> {
577        let line = self.prepare_line(event)?;
578        fs::create_dir_all(&self.config.spool_dir)?;
579
580        let name = spool_file_name(&event.source_binary, event.pid, unix_millis);
581        let path = self.config.spool_dir.join(name);
582        let mut file = OpenOptions::new().create(true).append(true).open(&path)?;
583        writeln!(file, "{line}")?;
584        Ok(path)
585    }
586
587    /// Merge spool fragments into the canonical log in deterministic order.
588    ///
589    /// Supports crash-recovery of stale `.claiming` files from interrupted
590    /// prior merges.
591    ///
592    /// # Errors
593    ///
594    /// Returns an error when reading the spool directory or writing to the
595    /// canonical log fails.
596    pub fn merge_spool(&self) -> Result<u64, LoggerError> {
597        if !self.config.spool_dir.exists() {
598            return Ok(0);
599        }
600
601        let mut spool_files: Vec<PathBuf> = fs::read_dir(&self.config.spool_dir)?
602            .filter_map(|entry| entry.ok().map(|e| e.path()))
603            .filter(|path| {
604                path.is_file()
605                    && path
606                        .extension()
607                        .and_then(|ext| ext.to_str())
608                        .map(|ext| ext == "jsonl" || ext == "claiming")
609                        .unwrap_or(false)
610            })
611            .collect();
612        spool_files.sort();
613
614        let mut claimed_files: Vec<PathBuf> = Vec::new();
615        let mut events: Vec<(LogEventV1, String)> = Vec::new();
616
617        for path in spool_files {
618            let claiming = if path
619                .extension()
620                .and_then(|ext| ext.to_str())
621                .is_some_and(|ext| ext == "claiming")
622            {
623                path.clone()
624            } else {
625                let claiming = path.with_extension("claiming");
626                if fs::rename(&path, &claiming).is_err() {
627                    continue;
628                }
629                claiming
630            };
631            let ordering_key = claiming
632                .file_name()
633                .and_then(|n| n.to_str())
634                .unwrap_or_default()
635                .to_string();
636
637            let content = match fs::read_to_string(&claiming) {
638                Ok(content) => content,
639                Err(_) => {
640                    let _ = fs::remove_file(&claiming);
641                    continue;
642                }
643            };
644            for line in content.lines() {
645                let trimmed = line.trim();
646                if trimmed.is_empty() {
647                    continue;
648                }
649                if let Ok(event) = serde_json::from_str::<LogEventV1>(trimmed) {
650                    events.push((event, ordering_key.clone()));
651                }
652            }
653            claimed_files.push(claiming);
654        }
655
656        events.sort_by(|(a, file_a), (b, file_b)| a.ts.cmp(&b.ts).then(file_a.cmp(file_b)));
657
658        let mut merged = 0_u64;
659        for (event, _) in events {
660            let line = serde_json::to_string(&event)?;
661            if line.len() > self.config.max_event_bytes {
662                continue;
663            }
664            self.append_line_to_canonical(&line)?;
665            merged += 1;
666        }
667
668        for claimed in claimed_files {
669            let _ = fs::remove_file(claimed);
670        }
671
672        Ok(merged)
673    }
674
675    fn prepare_line(&self, event: &LogEventV1) -> Result<String, LoggerError> {
676        let mut event = event.clone();
677        event.validate()?;
678        redact_event(&mut event);
679        let line = serde_json::to_string(&event)?;
680        let size = line.len();
681        if size > self.config.max_event_bytes {
682            return Err(LoggerError::EventTooLarge {
683                size,
684                max: self.config.max_event_bytes,
685            });
686        }
687        Ok(line)
688    }
689
690    fn append_line_to_canonical(&self, line: &str) -> Result<(), LoggerError> {
691        static CANONICAL_APPEND_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
692        let lock = CANONICAL_APPEND_LOCK.get_or_init(|| Mutex::new(()));
693        let _guard = lock.lock().expect("canonical append lock poisoned");
694
695        if let Some(parent) = self.config.log_path.parent() {
696            fs::create_dir_all(parent)?;
697        }
698
699        self.rotate_if_needed()?;
700
701        let mut file = OpenOptions::new()
702            .create(true)
703            .append(true)
704            .open(&self.config.log_path)?;
705        writeln!(file, "{line}")?;
706        Ok(())
707    }
708
709    fn rotate_if_needed(&self) -> Result<(), LoggerError> {
710        let current_size = fs::metadata(&self.config.log_path)
711            .map(|m| m.len())
712            .unwrap_or(0);
713        if current_size < self.config.max_bytes {
714            return Ok(());
715        }
716        rotate_log_files(&self.config.log_path, self.config.max_files)?;
717        Ok(())
718    }
719}
720
721/// Export a single event to OTel using default pipeline settings (fail-open).
722///
723/// This helper is intended for producers that already own canonical JSONL
724/// writing and only need shared OTel export semantics. It creates an
725/// [`OtelPipeline`] from the given log path using default configuration.
726///
727/// For callers that already have an exporter, use [`export_otel_best_effort`]
728/// instead.
729pub fn export_otel_best_effort_from_path(log_path: &Path, event: &LogEventV1) {
730    let pipeline = OtelPipeline::new_default(log_path);
731    export_otel_best_effort_with_pipeline(&pipeline, event);
732}
733
734fn export_otel_best_effort_with_pipeline(pipeline: &OtelPipeline, event: &LogEventV1) {
735    let _ = pipeline.export_event(event);
736}
737
738pub fn spool_file_name(source_binary: &str, pid: u32, unix_millis: u128) -> String {
739    let sanitized = sanitize_source_binary(source_binary);
740    format!("{}-{}-{}.jsonl", sanitized, pid, unix_millis)
741}
742
/// Makes `source_binary` safe to embed in a file name.
///
/// ASCII alphanumerics, `-`, `_` and `.` are kept. Every other character,
/// including path separators and non-ASCII characters, becomes `_`. An empty
/// input maps to `unknown`.
fn sanitize_source_binary(source_binary: &str) -> String {
    if source_binary.is_empty() {
        return "unknown".to_string();
    }
    source_binary
        .chars()
        .map(|c| {
            let allowed = c.is_ascii_alphanumeric() || matches!(c, '-' | '_' | '.');
            if allowed { c } else { '_' }
        })
        .collect()
}
758
759fn rotate_log_files(base: &Path, max_files: u32) -> Result<(), LoggerError> {
760    if max_files == 0 {
761        let _ = fs::remove_file(base);
762        return Ok(());
763    }
764
765    let oldest = rotation_path(base, max_files);
766    let _ = fs::remove_file(&oldest);
767
768    for idx in (1..max_files).rev() {
769        let from = rotation_path(base, idx);
770        let to = rotation_path(base, idx + 1);
771        if from.exists() {
772            let _ = fs::rename(&from, &to);
773        }
774    }
775
776    if base.exists() {
777        let first = rotation_path(base, 1);
778        fs::rename(base, first)?;
779    }
780    Ok(())
781}
782
/// Path of rotated file number `n`: `base` with `.{n}` appended to the full
/// file name (e.g. `atm.log.jsonl` -> `atm.log.jsonl.2`).
///
/// Works on the raw OS string, so non-UTF-8 paths are preserved.
fn rotation_path(base: &Path, n: u32) -> PathBuf {
    let mut rotated = base.to_path_buf().into_os_string();
    rotated.push(".");
    rotated.push(n.to_string());
    PathBuf::from(rotated)
}
788
789fn value_to_map(value: serde_json::Value) -> serde_json::Map<String, serde_json::Value> {
790    match value {
791        serde_json::Value::Object(map) => map,
792        other => {
793            let mut map = serde_json::Map::new();
794            map.insert("value".to_string(), other);
795            map
796        }
797    }
798}
799
800#[cfg(test)]
801mod tests {
802    use super::*;
803    use agent_team_mail_core::logging_event::new_log_event;
804    use serial_test::serial;
805    use std::sync::Mutex;
806    use std::sync::OnceLock;
807    use std::sync::atomic::{AtomicUsize, Ordering};
808    use tempfile::TempDir;
809
810    #[derive(Default)]
811    struct CountingExporter {
812        attempts: AtomicUsize,
813        fail_for: AtomicUsize,
814        records: Mutex<Vec<OtelRecord>>,
815    }
816
817    impl CountingExporter {
818        fn with_failures(failures: usize) -> Self {
819            Self {
820                attempts: AtomicUsize::new(0),
821                fail_for: AtomicUsize::new(failures),
822                records: Mutex::new(Vec::new()),
823            }
824        }
825    }
826
827    impl OtelExporter for CountingExporter {
828        fn export(&self, record: &OtelRecord) -> Result<(), OtelError> {
829            let attempt = self.attempts.fetch_add(1, Ordering::SeqCst) + 1;
830            let fail_for = self.fail_for.load(Ordering::SeqCst);
831            if attempt <= fail_for {
832                return Err(OtelError::ExportFailed(
833                    "simulated transport outage".to_string(),
834                ));
835            }
836            self.records
837                .lock()
838                .expect("records lock")
839                .push(record.clone());
840            Ok(())
841        }
842    }
843
844    fn make_event(ts: &str) -> LogEventV1 {
845        let mut event = new_log_event("atm", "test_action", "atm::test", "info");
846        event.ts = ts.to_string();
847        event
848    }
849
850    static BACKOFF_SLEEPS_MS: OnceLock<Mutex<Vec<u64>>> = OnceLock::new();
851
852    fn record_sleep(duration: Duration) {
853        BACKOFF_SLEEPS_MS
854            .get_or_init(|| Mutex::new(Vec::new()))
855            .lock()
856            .expect("backoff sleeps lock")
857            .push(duration.as_millis() as u64);
858    }
859
860    #[test]
861    #[serial]
862    fn config_defaults_and_env_overrides() {
863        let tmp = TempDir::new().expect("temp dir");
864        let custom_log = tmp.path().join("custom-atm.log");
865        let home_root = tmp.path().join("home-root");
866        // SAFETY: test-scoped env mutation.
867        unsafe {
868            std::env::set_var("ATM_LOG", "debug");
869            std::env::set_var("ATM_LOG_MSG", "1");
870            std::env::set_var("ATM_LOG_FILE", &custom_log);
871            std::env::set_var("ATM_LOG_MAX_BYTES", "1024");
872            std::env::set_var("ATM_LOG_MAX_FILES", "7");
873            std::env::set_var("ATM_LOG_RETENTION_DAYS", "9");
874        }
875        let cfg = LogConfig::from_home(&home_root);
876        assert_eq!(cfg.level, LogLevel::Debug);
877        assert!(cfg.message_preview_enabled);
878        assert_eq!(cfg.log_path, custom_log);
879        assert_eq!(cfg.spool_dir, tmp.path().join("spool"));
880        assert_eq!(cfg.max_bytes, 1024);
881        assert_eq!(cfg.max_files, 7);
882        assert_eq!(cfg.retention_days, 9);
883        assert_eq!(cfg.queue_capacity, DEFAULT_QUEUE_CAPACITY);
884        assert_eq!(cfg.max_event_bytes, DEFAULT_MAX_EVENT_BYTES);
885        // SAFETY: cleanup after test.
886        unsafe {
887            std::env::remove_var("ATM_LOG");
888            std::env::remove_var("ATM_LOG_MSG");
889            std::env::remove_var("ATM_LOG_FILE");
890            std::env::remove_var("ATM_LOG_MAX_BYTES");
891            std::env::remove_var("ATM_LOG_MAX_FILES");
892            std::env::remove_var("ATM_LOG_RETENTION_DAYS");
893        }
894    }
895
896    #[test]
897    #[serial]
898    fn config_default_paths_follow_tool_scoped_contract() {
899        let tmp = TempDir::new().expect("temp dir");
900        // SAFETY: test-scoped env cleanup to force default path resolution.
901        unsafe {
902            std::env::remove_var("ATM_LOG_FILE");
903            std::env::remove_var("ATM_LOG_PATH");
904        }
905
906        let cfg = LogConfig::from_home_for_tool(tmp.path(), "atm-daemon");
907        assert_eq!(
908            cfg.log_path,
909            tmp.path()
910                .join(".config/atm/logs/atm-daemon/atm-daemon.log.jsonl")
911        );
912        assert_eq!(
913            cfg.spool_dir,
914            tmp.path().join(".config/atm/logs/atm-daemon/spool")
915        );
916    }
917
918    #[test]
919    fn span_ref_v1_round_trip_serialization() {
920        let span = SpanRefV1 {
921            name: "compose".to_string(),
922            trace_id: "trace-123".to_string(),
923            span_id: "span-456".to_string(),
924            parent_span_id: None,
925            fields: serde_json::Map::new(),
926        };
927        let json = serde_json::to_string(&span).expect("serialize span");
928        let decoded: SpanRefV1 = serde_json::from_str(&json).expect("deserialize span");
929        assert_eq!(decoded, span);
930    }
931
932    #[test]
933    fn span_ref_v1_fields_are_non_empty_after_construction() {
934        let span = SpanRefV1 {
935            name: "compose".to_string(),
936            trace_id: "trace-abc".to_string(),
937            span_id: "span-def".to_string(),
938            parent_span_id: None,
939            fields: serde_json::Map::new(),
940        };
941        assert!(!span.trace_id.is_empty());
942        assert!(!span.span_id.is_empty());
943    }
944
945    #[test]
946    fn spool_filename_format_matches_contract() {
947        let name = spool_file_name("atm-daemon", 44201, 123456789);
948        assert_eq!(name, "atm-daemon-44201-123456789.jsonl");
949    }
950
951    #[test]
952    fn spool_filename_sanitizes_windows_unsafe_chars() {
953        let name = spool_file_name(r"atm\daemon:core?*", 44201, 123456789);
954        assert_eq!(name, "atm_daemon_core__-44201-123456789.jsonl");
955    }
956
957    #[test]
958    fn emit_rotates_file() {
959        let tmp = TempDir::new().expect("temp dir");
960        let cfg = LogConfig {
961            log_path: tmp.path().join("atm.log.jsonl"),
962            spool_dir: tmp.path().join("log-spool"),
963            level: LogLevel::Info,
964            message_preview_enabled: false,
965            max_bytes: 1,
966            max_files: 2,
967            retention_days: DEFAULT_RETENTION_DAYS,
968            queue_capacity: DEFAULT_QUEUE_CAPACITY,
969            max_event_bytes: DEFAULT_MAX_EVENT_BYTES,
970        };
971        let logger = Logger::new(cfg);
972
973        let ev1 = make_event("2026-03-09T00:00:01Z");
974        logger.emit(&ev1).expect("first emit");
975        let ev2 = make_event("2026-03-09T00:00:02Z");
976        logger.emit(&ev2).expect("second emit");
977
978        assert!(logger.config.log_path.exists());
979        assert!(rotation_path(&logger.config.log_path, 1).exists());
980    }
981
982    #[test]
983    fn emit_rejects_event_larger_than_configured_guard() {
984        let tmp = TempDir::new().expect("temp dir");
985        let cfg = LogConfig {
986            log_path: tmp.path().join("atm.log.jsonl"),
987            spool_dir: tmp.path().join("log-spool"),
988            level: LogLevel::Info,
989            message_preview_enabled: false,
990            max_bytes: DEFAULT_MAX_BYTES,
991            max_files: DEFAULT_MAX_FILES,
992            retention_days: DEFAULT_RETENTION_DAYS,
993            queue_capacity: DEFAULT_QUEUE_CAPACITY,
994            max_event_bytes: 256,
995        };
996        let logger = Logger::new(cfg);
997
998        let mut event = make_event("2026-03-09T00:00:01Z");
999        event.fields.insert(
1000            "blob".to_string(),
1001            serde_json::Value::String("x".repeat(2048)),
1002        );
1003        let err = logger.emit(&event).expect_err("expected size guard error");
1004        assert!(matches!(err, LoggerError::EventTooLarge { .. }));
1005    }
1006
1007    #[test]
1008    fn merge_spool_sorts_by_timestamp_and_deletes_claimed_files() {
1009        let tmp = TempDir::new().expect("temp dir");
1010        let cfg = LogConfig {
1011            log_path: tmp.path().join("atm.log.jsonl"),
1012            spool_dir: tmp.path().join("log-spool"),
1013            level: LogLevel::Info,
1014            message_preview_enabled: false,
1015            max_bytes: DEFAULT_MAX_BYTES,
1016            max_files: DEFAULT_MAX_FILES,
1017            retention_days: DEFAULT_RETENTION_DAYS,
1018            queue_capacity: DEFAULT_QUEUE_CAPACITY,
1019            max_event_bytes: DEFAULT_MAX_EVENT_BYTES,
1020        };
1021        let logger = Logger::new(cfg.clone());
1022
1023        let ev_late = make_event("2026-03-09T00:00:05Z");
1024        let ev_early = make_event("2026-03-09T00:00:01Z");
1025        logger
1026            .write_to_spool(&ev_late, 2000)
1027            .expect("write late spool");
1028        logger
1029            .write_to_spool(&ev_early, 1000)
1030            .expect("write early spool");
1031
1032        let merged = logger.merge_spool().expect("merge spool");
1033        assert_eq!(merged, 2);
1034
1035        let lines: Vec<String> = fs::read_to_string(&cfg.log_path)
1036            .expect("read canonical log")
1037            .lines()
1038            .map(str::to_string)
1039            .collect();
1040        assert_eq!(lines.len(), 2);
1041        let parsed0: LogEventV1 = serde_json::from_str(&lines[0]).expect("line 0 parse");
1042        let parsed1: LogEventV1 = serde_json::from_str(&lines[1]).expect("line 1 parse");
1043        assert_eq!(parsed0.ts, "2026-03-09T00:00:01Z");
1044        assert_eq!(parsed1.ts, "2026-03-09T00:00:05Z");
1045
1046        let leftover: Vec<_> = fs::read_dir(&cfg.spool_dir)
1047            .expect("spool dir")
1048            .filter_map(|e| e.ok())
1049            .collect();
1050        assert!(
1051            leftover.is_empty(),
1052            "spool files should be deleted after merge"
1053        );
1054    }
1055
1056    #[test]
1057    fn merge_spool_recovers_stale_claiming_files() {
1058        let tmp = TempDir::new().expect("temp dir");
1059        let cfg = LogConfig {
1060            log_path: tmp.path().join("atm.log.jsonl"),
1061            spool_dir: tmp.path().join("log-spool"),
1062            level: LogLevel::Info,
1063            message_preview_enabled: false,
1064            max_bytes: DEFAULT_MAX_BYTES,
1065            max_files: DEFAULT_MAX_FILES,
1066            retention_days: DEFAULT_RETENTION_DAYS,
1067            queue_capacity: DEFAULT_QUEUE_CAPACITY,
1068            max_event_bytes: DEFAULT_MAX_EVENT_BYTES,
1069        };
1070        fs::create_dir_all(&cfg.spool_dir).expect("create spool dir");
1071        let stale_claiming = cfg.spool_dir.join("atm-44201-1000.claiming");
1072        let ev = make_event("2026-03-09T00:00:01Z");
1073        fs::write(
1074            &stale_claiming,
1075            format!("{}\n", serde_json::to_string(&ev).expect("serialize")),
1076        )
1077        .expect("write stale claiming");
1078
1079        let logger = Logger::new(cfg.clone());
1080        let merged = logger.merge_spool().expect("merge spool");
1081        assert_eq!(merged, 1);
1082        assert!(!stale_claiming.exists());
1083
1084        let log_content = fs::read_to_string(&cfg.log_path).expect("read log");
1085        let lines: Vec<_> = log_content.lines().collect();
1086        assert_eq!(lines.len(), 1);
1087    }
1088
1089    #[test]
1090    fn write_to_spool_creates_dir_and_appends() {
1091        let tmp = TempDir::new().expect("temp dir");
1092        let cfg = LogConfig {
1093            log_path: tmp.path().join("atm.log.jsonl"),
1094            spool_dir: tmp.path().join("log-spool"),
1095            level: LogLevel::Info,
1096            message_preview_enabled: false,
1097            max_bytes: DEFAULT_MAX_BYTES,
1098            max_files: DEFAULT_MAX_FILES,
1099            retention_days: DEFAULT_RETENTION_DAYS,
1100            queue_capacity: DEFAULT_QUEUE_CAPACITY,
1101            max_event_bytes: DEFAULT_MAX_EVENT_BYTES,
1102        };
1103        let logger = Logger::new(cfg);
1104        let ev = make_event("2026-03-09T00:00:01Z");
1105        let path1 = logger.write_to_spool(&ev, 1000).expect("spool write 1");
1106        let path2 = logger.write_to_spool(&ev, 1000).expect("spool write 2");
1107        assert_eq!(path1, path2);
1108        let spool_content = fs::read_to_string(path1).expect("read spool");
1109        let lines: Vec<_> = spool_content.lines().collect();
1110        assert_eq!(lines.len(), 2);
1111    }
1112
1113    #[test]
1114    fn rotate_log_files_max_files_zero_removes_base() {
1115        let tmp = TempDir::new().expect("temp dir");
1116        let base = tmp.path().join("atm.log.jsonl");
1117        fs::write(&base, "line\n").expect("write base");
1118        rotate_log_files(&base, 0).expect("rotate");
1119        assert!(!base.exists());
1120    }
1121
1122    #[test]
1123    fn rotate_log_files_evicts_oldest_when_limit_reached() {
1124        let tmp = TempDir::new().expect("temp dir");
1125        let base = tmp.path().join("atm.log.jsonl");
1126        fs::write(&base, "base\n").expect("write base");
1127        fs::write(rotation_path(&base, 1), "one\n").expect("write .1");
1128        fs::write(rotation_path(&base, 2), "two\n").expect("write .2");
1129
1130        rotate_log_files(&base, 2).expect("rotate");
1131
1132        assert_eq!(
1133            fs::read_to_string(rotation_path(&base, 1)).expect("read .1"),
1134            "base\n"
1135        );
1136        assert_eq!(
1137            fs::read_to_string(rotation_path(&base, 2)).expect("read .2"),
1138            "one\n"
1139        );
1140        assert!(!rotation_path(&base, 3).exists());
1141    }
1142
1143    #[test]
1144    fn socket_error_codes_match_contract() {
1145        assert_eq!(SOCKET_ERROR_VERSION_MISMATCH, "VERSION_MISMATCH");
1146        assert_eq!(SOCKET_ERROR_INVALID_PAYLOAD, "INVALID_PAYLOAD");
1147        assert_eq!(SOCKET_ERROR_INTERNAL_ERROR, "INTERNAL_ERROR");
1148    }
1149
1150    #[test]
1151    fn emit_action_writes_schema_compatible_event() {
1152        let tmp = TempDir::new().expect("temp dir");
1153        let cfg = LogConfig {
1154            log_path: tmp.path().join("atm.log.jsonl"),
1155            spool_dir: tmp.path().join("log-spool"),
1156            level: LogLevel::Info,
1157            message_preview_enabled: false,
1158            max_bytes: DEFAULT_MAX_BYTES,
1159            max_files: DEFAULT_MAX_FILES,
1160            queue_capacity: DEFAULT_QUEUE_CAPACITY,
1161            max_event_bytes: DEFAULT_MAX_EVENT_BYTES,
1162            retention_days: DEFAULT_RETENTION_DAYS,
1163        };
1164        let logger = Logger::new(cfg.clone());
1165
1166        logger
1167            .emit_action(
1168                "sc-compose",
1169                "sc_compose::cli",
1170                "command_end",
1171                Some("success"),
1172                serde_json::json!({"code": 0}),
1173            )
1174            .expect("emit action");
1175
1176        let lines: Vec<_> = fs::read_to_string(&cfg.log_path)
1177            .expect("read log")
1178            .lines()
1179            .map(str::to_string)
1180            .collect();
1181        assert_eq!(lines.len(), 1);
1182        let parsed: LogEventV1 = serde_json::from_str(&lines[0]).expect("parse event");
1183        assert_eq!(parsed.source_binary, "sc-compose");
1184        assert_eq!(parsed.action, "command_end");
1185        assert_eq!(parsed.outcome.as_deref(), Some("success"));
1186        assert_eq!(parsed.fields.get("code").and_then(|v| v.as_u64()), Some(0));
1187    }
1188
1189    #[test]
1190    #[serial]
1191    fn otel_default_on_env_override_supported() {
1192        // SAFETY: test-scoped environment mutation.
1193        unsafe {
1194            std::env::remove_var("ATM_OTEL_ENABLED");
1195        }
1196        let default_cfg = OtelConfig::from_env();
1197        assert!(default_cfg.enabled, "OTel should be enabled by default");
1198
1199        // SAFETY: test-scoped environment mutation.
1200        unsafe {
1201            std::env::set_var("ATM_OTEL_ENABLED", "false");
1202        }
1203        let disabled_cfg = OtelConfig::from_env();
1204        assert!(
1205            !disabled_cfg.enabled,
1206            "ATM_OTEL_ENABLED=false should disable exporter"
1207        );
1208        // SAFETY: cleanup after test.
1209        unsafe {
1210            std::env::remove_var("ATM_OTEL_ENABLED");
1211        }
1212    }
1213
1214    #[test]
1215    fn emit_is_fail_open_when_otel_exporter_fails() {
1216        let tmp = TempDir::new().expect("temp dir");
1217        let cfg = LogConfig {
1218            log_path: tmp.path().join("atm.log.jsonl"),
1219            spool_dir: tmp.path().join("log-spool"),
1220            level: LogLevel::Info,
1221            message_preview_enabled: false,
1222            max_bytes: DEFAULT_MAX_BYTES,
1223            max_files: DEFAULT_MAX_FILES,
1224            retention_days: DEFAULT_RETENTION_DAYS,
1225            queue_capacity: DEFAULT_QUEUE_CAPACITY,
1226            max_event_bytes: DEFAULT_MAX_EVENT_BYTES,
1227        };
1228        let exporter = Arc::new(CountingExporter::with_failures(10));
1229        let logger = Logger::with_otel_exporter(
1230            cfg.clone(),
1231            OtelConfig {
1232                enabled: true,
1233                max_retries: 2,
1234                initial_backoff_ms: 0,
1235                max_backoff_ms: 0,
1236            },
1237            exporter.clone(),
1238        );
1239
1240        let event = new_log_event("atm", "send_message", "atm::send", "info");
1241        logger.emit(&event).expect("emit should not fail");
1242
1243        let log_lines = fs::read_to_string(&cfg.log_path).expect("canonical log should exist");
1244        assert!(
1245            !log_lines.trim().is_empty(),
1246            "canonical log should be written"
1247        );
1248        assert_eq!(
1249            exporter.attempts.load(Ordering::SeqCst),
1250            3,
1251            "initial attempt + 2 retries"
1252        );
1253        assert!(
1254            exporter.records.lock().expect("records lock").is_empty(),
1255            "all export attempts should fail in this test"
1256        );
1257    }
1258
1259    #[test]
1260    fn export_otel_best_effort_from_path_is_fail_open_when_export_fails() {
1261        let tmp = TempDir::new().expect("temp dir");
1262        let parent_file = tmp.path().join("not-a-directory");
1263        std::fs::write(&parent_file, "occupied").expect("create parent file");
1264        let log_path = parent_file.join("atm.log.jsonl");
1265        let event = new_log_event("atm-daemon", "register_hint", "atm_daemon::socket", "info");
1266
1267        // Must not panic or propagate errors when exporter cannot create its output path.
1268        export_otel_best_effort_from_path(&log_path, &event);
1269    }
1270
1271    #[test]
1272    fn otel_exporter_retries_then_succeeds() {
1273        let tmp = TempDir::new().expect("temp dir");
1274        let cfg = LogConfig {
1275            log_path: tmp.path().join("atm.log.jsonl"),
1276            spool_dir: tmp.path().join("log-spool"),
1277            level: LogLevel::Info,
1278            message_preview_enabled: false,
1279            max_bytes: DEFAULT_MAX_BYTES,
1280            max_files: DEFAULT_MAX_FILES,
1281            retention_days: DEFAULT_RETENTION_DAYS,
1282            queue_capacity: DEFAULT_QUEUE_CAPACITY,
1283            max_event_bytes: DEFAULT_MAX_EVENT_BYTES,
1284        };
1285        let exporter = Arc::new(CountingExporter::with_failures(2));
1286        let logger = Logger::with_otel_exporter(
1287            cfg,
1288            OtelConfig {
1289                enabled: true,
1290                max_retries: 4,
1291                initial_backoff_ms: 0,
1292                max_backoff_ms: 0,
1293            },
1294            exporter.clone(),
1295        );
1296
1297        let mut event = new_log_event("atm", "subagent.run", "atm::runtime", "info");
1298        event.team = Some("atm-dev".to_string());
1299        event.agent = Some("arch-ctm".to_string());
1300        event.runtime = Some("codex".to_string());
1301        event.session_id = Some("local:arch-ctm:123".to_string());
1302        event.trace_id = Some("trace-123".to_string());
1303        event.span_id = Some("span-456".to_string());
1304        event.subagent_id = Some("subagent-7".to_string());
1305
1306        logger.emit(&event).expect("emit should succeed");
1307        assert_eq!(
1308            exporter.attempts.load(Ordering::SeqCst),
1309            3,
1310            "2 failures + 1 success"
1311        );
1312        assert_eq!(
1313            exporter.records.lock().expect("records lock").len(),
1314            1,
1315            "record should be exported after retries"
1316        );
1317    }
1318
1319    #[test]
1320    fn producer_events_export_through_pipeline_with_counting_exporter() {
1321        let tmp = TempDir::new().expect("temp dir");
1322        let cfg = LogConfig {
1323            log_path: tmp.path().join("atm.log.jsonl"),
1324            spool_dir: tmp.path().join("log-spool"),
1325            level: LogLevel::Info,
1326            message_preview_enabled: false,
1327            max_bytes: DEFAULT_MAX_BYTES,
1328            max_files: DEFAULT_MAX_FILES,
1329            retention_days: DEFAULT_RETENTION_DAYS,
1330            queue_capacity: DEFAULT_QUEUE_CAPACITY,
1331            max_event_bytes: DEFAULT_MAX_EVENT_BYTES,
1332        };
1333        let exporter = Arc::new(CountingExporter::with_failures(0));
1334        let logger = Logger::with_otel_exporter(
1335            cfg,
1336            OtelConfig {
1337                enabled: true,
1338                max_retries: 0,
1339                initial_backoff_ms: 0,
1340                max_backoff_ms: 0,
1341            },
1342            exporter.clone(),
1343        );
1344
1345        for (idx, source, action) in [
1346            (1u8, "atm", "send"),
1347            (2u8, "atm-daemon", "register_hint"),
1348            (3u8, "sc-composer", "compose"),
1349        ] {
1350            let mut event = new_log_event(source, action, "atm::test", "info");
1351            event.team = Some("atm-dev".to_string());
1352            event.agent = Some("arch-ctm".to_string());
1353            event.runtime = Some("codex".to_string());
1354            event.session_id = Some("sess-123".to_string());
1355            event.trace_id = Some("trace-123".to_string());
1356            event.span_id = Some(format!("span-{idx}"));
1357            logger.emit(&event).expect("emit should succeed");
1358        }
1359
1360        let records = exporter.records.lock().expect("records lock");
1361        assert_eq!(records.len(), 3, "all producer events should export");
1362        assert_eq!(
1363            records.iter().map(|r| r.name.clone()).collect::<Vec<_>>(),
1364            vec![
1365                "send".to_string(),
1366                "register_hint".to_string(),
1367                "compose".to_string()
1368            ]
1369        );
1370    }
1371
1372    #[test]
1373    fn export_otel_best_effort_is_public_and_fail_open() {
1374        let exporter = CountingExporter::with_failures(10);
1375        let event = new_log_event("atm", "send_message", "atm::send", "info");
1376        export_otel_best_effort(
1377            &event,
1378            &OtelConfig {
1379                enabled: true,
1380                max_retries: 2,
1381                initial_backoff_ms: 0,
1382                max_backoff_ms: 0,
1383            },
1384            &exporter,
1385        );
1386
1387        assert_eq!(
1388            exporter.attempts.load(Ordering::SeqCst),
1389            3,
1390            "initial attempt + 2 retries"
1391        );
1392    }
1393
1394    #[test]
1395    fn otel_retry_backoff_is_bounded_by_max_backoff() {
1396        let sleeps = BACKOFF_SLEEPS_MS.get_or_init(|| Mutex::new(Vec::new()));
1397        sleeps.lock().expect("backoff sleeps lock").clear();
1398
1399        let exporter = CountingExporter::with_failures(10);
1400        let event = new_log_event("atm", "send_message", "atm::send", "info");
1401        let err = export_otel_with_retry(
1402            &event,
1403            &OtelConfig {
1404                enabled: true,
1405                max_retries: 4,
1406                initial_backoff_ms: 5,
1407                max_backoff_ms: 12,
1408            },
1409            &exporter,
1410            record_sleep,
1411        )
1412        .expect_err("should return final export error");
1413        assert!(matches!(err, OtelError::ExportFailed(_)));
1414
1415        let sleeps = sleeps.lock().expect("backoff sleeps lock").clone();
1416        assert_eq!(sleeps, vec![5, 10, 12, 12]);
1417        assert!(
1418            sleeps.iter().all(|v| *v <= 12),
1419            "sleep exceeded max_backoff"
1420        );
1421    }
1422
1423    #[test]
1424    fn build_otel_record_requires_runtime_for_runtime_scoped_events() {
1425        let mut event = new_log_event("atm", "send_message", "atm::send", "info");
1426        event.team = Some("atm-dev".to_string());
1427        event.agent = Some("arch-ctm".to_string());
1428        event.session_id = Some("local:arch-ctm".to_string());
1429        // runtime intentionally missing
1430
1431        let err = build_otel_record(&event).expect_err("runtime should be required");
1432        assert_eq!(err, OtelError::MissingRequiredField { field: "runtime" });
1433    }
1434
1435    #[test]
1436    fn build_otel_record_requires_subagent_id_for_subagent_actions() {
1437        let mut event = new_log_event("atm", "subagent.run", "atm::runtime", "info");
1438        event.team = Some("atm-dev".to_string());
1439        event.agent = Some("arch-ctm".to_string());
1440        event.runtime = Some("codex".to_string());
1441        event.session_id = Some("local:arch-ctm".to_string());
1442        event.trace_id = Some("trace-123".to_string());
1443        event.span_id = Some("span-456".to_string());
1444        // subagent_id intentionally missing
1445
1446        let err = build_otel_record(&event).expect_err("subagent_id should be required");
1447        assert_eq!(
1448            err,
1449            OtelError::MissingRequiredField {
1450                field: "subagent_id"
1451            }
1452        );
1453    }
1454
1455    #[test]
1456    fn build_otel_record_requires_full_span_context_when_partial() {
1457        let mut event = new_log_event("atm", "send_message", "atm::send", "info");
1458        event.trace_id = Some("trace-123".to_string());
1459        // span_id intentionally missing
1460        let err = build_otel_record(&event).expect_err("partial span context should fail");
1461        assert_eq!(err, OtelError::InvalidSpanContext);
1462    }
1463
1464    #[test]
1465    fn build_otel_record_includes_required_correlation_attributes() {
1466        let mut event = new_log_event("atm", "subagent.run", "atm::runtime", "info");
1467        event.team = Some("atm-dev".to_string());
1468        event.agent = Some("arch-ctm".to_string());
1469        event.runtime = Some("codex".to_string());
1470        event.session_id = Some("local:arch-ctm".to_string());
1471        event.trace_id = Some("trace-123".to_string());
1472        event.span_id = Some("span-456".to_string());
1473        event.subagent_id = Some("subagent-7".to_string());
1474
1475        let record = build_otel_record(&event).expect("record should build");
1476        assert_eq!(record.trace_id.as_deref(), Some("trace-123"));
1477        assert_eq!(record.span_id.as_deref(), Some("span-456"));
1478        assert_eq!(
1479            record.attributes.get("team").and_then(|v| v.as_str()),
1480            Some("atm-dev")
1481        );
1482        assert_eq!(
1483            record.attributes.get("agent").and_then(|v| v.as_str()),
1484            Some("arch-ctm")
1485        );
1486        assert_eq!(
1487            record.attributes.get("runtime").and_then(|v| v.as_str()),
1488            Some("codex")
1489        );
1490        assert_eq!(
1491            record.attributes.get("session_id").and_then(|v| v.as_str()),
1492            Some("local:arch-ctm")
1493        );
1494        assert_eq!(
1495            record
1496                .attributes
1497                .get("subagent_id")
1498                .and_then(|v| v.as_str()),
1499            Some("subagent-7")
1500        );
1501    }
1502}