1use agent_team_mail_core::logging_event::{LogEventV1, ValidationError};
10use std::fs::{self, OpenOptions};
11use std::io::Write;
12use std::path::{Path, PathBuf};
13use std::str::FromStr;
14use std::sync::{Arc, Mutex, OnceLock};
15use std::time::Duration;
16use thiserror::Error;
17
/// Default capacity of the in-memory event queue.
pub const DEFAULT_QUEUE_CAPACITY: usize = 4096;
/// Default per-event serialized size guard (64 KiB).
pub const DEFAULT_MAX_EVENT_BYTES: usize = 64 * 1024;
/// Default size at which the canonical log file is rotated (50 MiB).
pub const DEFAULT_MAX_BYTES: u64 = 50 * 1024 * 1024;
/// Default number of rotated log files to keep (`.1` … `.N`).
pub const DEFAULT_MAX_FILES: u32 = 5;
/// Default log retention period, in days.
pub const DEFAULT_RETENTION_DAYS: u32 = 7;
/// Default number of OTel export retries after the first failed attempt.
pub const DEFAULT_OTEL_MAX_RETRIES: u32 = 2;
/// Default backoff before the first OTel retry, in milliseconds.
pub const DEFAULT_OTEL_INITIAL_BACKOFF_MS: u64 = 25;
/// Default ceiling for the doubling OTel backoff, in milliseconds.
pub const DEFAULT_OTEL_MAX_BACKOFF_MS: u64 = 250;

// Error-code strings kept stable as part of the external socket contract.
pub const SOCKET_ERROR_VERSION_MISMATCH: &str = "VERSION_MISMATCH";
pub const SOCKET_ERROR_INVALID_PAYLOAD: &str = "INVALID_PAYLOAD";
pub const SOCKET_ERROR_INTERNAL_ERROR: &str = "INTERNAL_ERROR";
30
/// Runtime configuration for the OTel export pipeline.
#[derive(Debug, Clone)]
pub struct OtelConfig {
    /// When `false`, export is a no-op (see `export_otel_with_retry`).
    pub enabled: bool,
    /// Number of retries after the first failed export attempt.
    pub max_retries: u32,
    /// Backoff before the first retry, in milliseconds.
    pub initial_backoff_ms: u64,
    /// Upper bound for the doubling backoff, in milliseconds.
    pub max_backoff_ms: u64,
}
38
impl Default for OtelConfig {
    /// Enabled, with the compiled-in retry/backoff defaults.
    fn default() -> Self {
        Self {
            enabled: true,
            max_retries: DEFAULT_OTEL_MAX_RETRIES,
            initial_backoff_ms: DEFAULT_OTEL_INITIAL_BACKOFF_MS,
            max_backoff_ms: DEFAULT_OTEL_MAX_BACKOFF_MS,
        }
    }
}
49
50impl OtelConfig {
51 pub fn from_env() -> Self {
52 let mut cfg = Self::default();
53
54 if let Ok(raw) = std::env::var("ATM_OTEL_ENABLED") {
55 let norm = raw.trim().to_ascii_lowercase();
56 cfg.enabled = !matches!(norm.as_str(), "0" | "false" | "off" | "no");
57 }
58 if let Ok(raw) = std::env::var("ATM_OTEL_MAX_RETRIES")
59 && let Ok(parsed) = raw.parse::<u32>()
60 {
61 cfg.max_retries = parsed;
62 }
63 if let Ok(raw) = std::env::var("ATM_OTEL_INITIAL_BACKOFF_MS")
64 && let Ok(parsed) = raw.parse::<u64>()
65 {
66 cfg.initial_backoff_ms = parsed;
67 }
68 if let Ok(raw) = std::env::var("ATM_OTEL_MAX_BACKOFF_MS")
69 && let Ok(parsed) = raw.parse::<u64>()
70 {
71 cfg.max_backoff_ms = parsed;
72 }
73 if cfg.max_backoff_ms < cfg.initial_backoff_ms {
74 cfg.max_backoff_ms = cfg.initial_backoff_ms;
75 }
76 cfg
77 }
78}
79
/// Failures produced while building or exporting an OTel record.
#[derive(Debug, Error, PartialEq, Eq)]
pub enum OtelError {
    /// A correlation field required by the event's scope is absent or blank.
    #[error("missing required correlation field '{field}'")]
    MissingRequiredField { field: &'static str },
    /// Exactly one of `trace_id`/`span_id` was provided.
    #[error(
        "invalid span context: trace_id and span_id must either both be present or both be absent"
    )]
    InvalidSpanContext,
    /// The exporter failed to persist or transmit the record.
    #[error("export failed: {0}")]
    ExportFailed(String),
}
91
/// Sink for OTel records; implementations must be shareable across threads
/// (the pipeline stores them behind `Arc<dyn OtelExporter>`).
pub trait OtelExporter: Send + Sync {
    /// Exports a single record; errors are retried by the caller.
    fn export(&self, record: &OtelRecord) -> Result<(), OtelError>;
}
95
/// Flattened telemetry record derived from a `LogEventV1`
/// (see `build_otel_record`).
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq)]
pub struct OtelRecord {
    /// Record name; mirrors the source event's `action`.
    pub name: String,
    /// Optional trace correlation id (paired with `span_id`).
    pub trace_id: Option<String>,
    /// Optional span id (paired with `trace_id`).
    pub span_id: Option<String>,
    /// String attributes copied from the event's correlation fields.
    pub attributes: serde_json::Map<String, serde_json::Value>,
}
103
/// OTel exporter that appends each record as one JSON line to a file.
#[derive(Debug, Clone)]
pub struct FileOtelExporter {
    // Destination JSONL file; parent directories are created on export.
    path: PathBuf,
}
108
impl FileOtelExporter {
    /// Creates an exporter targeting `path`. The file is opened lazily on
    /// each `export` call, not here.
    pub fn new(path: PathBuf) -> Self {
        Self { path }
    }
}
114
115impl OtelExporter for FileOtelExporter {
116 fn export(&self, record: &OtelRecord) -> Result<(), OtelError> {
117 if let Some(parent) = self.path.parent()
118 && let Err(err) = fs::create_dir_all(parent)
119 {
120 return Err(OtelError::ExportFailed(err.to_string()));
121 }
122 let mut file = OpenOptions::new()
123 .create(true)
124 .append(true)
125 .open(&self.path)
126 .map_err(|err| OtelError::ExportFailed(err.to_string()))?;
127 let line = serde_json::to_string(record)
128 .map_err(|err| OtelError::ExportFailed(err.to_string()))?;
129 writeln!(file, "{line}").map_err(|err| OtelError::ExportFailed(err.to_string()))
130 }
131}
132
/// Bundles the OTel config, exporter, and retry-sleep hook.
#[derive(Clone)]
struct OtelPipeline {
    config: OtelConfig,
    exporter: Arc<dyn OtelExporter>,
    // fn pointer so the sleep between retries can be substituted.
    sleeper: fn(Duration),
}
139
// Manual impl: `exporter` is a trait object with no `Debug` bound, so the
// derive is unavailable; only the config is rendered.
impl std::fmt::Debug for OtelPipeline {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("OtelPipeline")
            .field("config", &self.config)
            .finish()
    }
}
147
148impl OtelPipeline {
149 fn new_default(log_path: &Path) -> Self {
150 let mut otel_path = log_path.to_path_buf();
151 let stem = log_path
152 .file_stem()
153 .and_then(|s| s.to_str())
154 .unwrap_or("telemetry");
155 otel_path.set_file_name(format!("{stem}.otel.jsonl"));
156 Self {
157 config: OtelConfig::from_env(),
158 exporter: Arc::new(FileOtelExporter::new(otel_path)),
159 sleeper: std::thread::sleep,
160 }
161 }
162
163 fn export_event(&self, event: &LogEventV1) -> Result<(), OtelError> {
164 export_otel_with_retry(event, &self.config, self.exporter.as_ref(), self.sleeper)
165 }
166}
167
168fn export_otel_with_retry(
169 event: &LogEventV1,
170 config: &OtelConfig,
171 exporter: &dyn OtelExporter,
172 sleeper: fn(Duration),
173) -> Result<(), OtelError> {
174 if !config.enabled {
175 return Ok(());
176 }
177 let record = build_otel_record(event)?;
178
179 let mut attempt: u32 = 0;
180 let mut backoff = config.initial_backoff_ms;
181 loop {
182 match exporter.export(&record) {
183 Ok(()) => return Ok(()),
184 Err(err) => {
185 if attempt >= config.max_retries {
186 return Err(err);
187 }
188 sleeper(Duration::from_millis(backoff));
189 backoff = backoff.saturating_mul(2).min(config.max_backoff_ms);
190 attempt = attempt.saturating_add(1);
191 }
192 }
193 }
194}
195
/// Fire-and-forget variant of `export_otel_with_retry`: any error is
/// swallowed so callers on the logging hot path never fail on telemetry.
/// Retries sleep with the real `std::thread::sleep`.
pub fn export_otel_best_effort(
    event: &LogEventV1,
    config: &OtelConfig,
    exporter: &dyn OtelExporter,
) {
    let _ = export_otel_with_retry(event, config, exporter, std::thread::sleep);
}
207
208fn build_otel_record(event: &LogEventV1) -> Result<OtelRecord, OtelError> {
209 let runtime_scoped = event.team.is_some()
210 || event.agent.is_some()
211 || event.runtime.is_some()
212 || event.session_id.is_some();
213 if runtime_scoped {
214 for (field, value) in [
215 ("team", event.team.as_deref()),
216 ("agent", event.agent.as_deref()),
217 ("runtime", event.runtime.as_deref()),
218 ("session_id", event.session_id.as_deref()),
219 ] {
220 if value.is_none_or(|v| v.trim().is_empty()) {
221 return Err(OtelError::MissingRequiredField { field });
222 }
223 }
224 }
225
226 let has_trace = event
227 .trace_id
228 .as_deref()
229 .is_some_and(|value| !value.trim().is_empty());
230 let has_span = event
231 .span_id
232 .as_deref()
233 .is_some_and(|value| !value.trim().is_empty());
234 if has_trace != has_span {
235 return Err(OtelError::InvalidSpanContext);
236 }
237
238 let subagent_scoped = event.subagent_id.is_some() || event.action.starts_with("subagent.");
239 if subagent_scoped {
240 if event
241 .subagent_id
242 .as_deref()
243 .is_none_or(|value| value.trim().is_empty())
244 {
245 return Err(OtelError::MissingRequiredField {
246 field: "subagent_id",
247 });
248 }
249 for (field, value) in [
250 ("team", event.team.as_deref()),
251 ("agent", event.agent.as_deref()),
252 ("runtime", event.runtime.as_deref()),
253 ("session_id", event.session_id.as_deref()),
254 ("trace_id", event.trace_id.as_deref()),
255 ("span_id", event.span_id.as_deref()),
256 ] {
257 if value.is_none_or(|v| v.trim().is_empty()) {
258 return Err(OtelError::MissingRequiredField { field });
259 }
260 }
261 }
262
263 let mut attributes = serde_json::Map::new();
264 if let Some(team) = event.team.as_ref() {
265 attributes.insert("team".to_string(), serde_json::Value::String(team.clone()));
266 }
267 if let Some(agent) = event.agent.as_ref() {
268 attributes.insert(
269 "agent".to_string(),
270 serde_json::Value::String(agent.clone()),
271 );
272 }
273 if let Some(runtime) = event.runtime.as_ref() {
274 attributes.insert(
275 "runtime".to_string(),
276 serde_json::Value::String(runtime.clone()),
277 );
278 }
279 if let Some(session_id) = event.session_id.as_ref() {
280 attributes.insert(
281 "session_id".to_string(),
282 serde_json::Value::String(session_id.clone()),
283 );
284 }
285 if let Some(subagent_id) = event.subagent_id.as_ref() {
286 attributes.insert(
287 "subagent_id".to_string(),
288 serde_json::Value::String(subagent_id.clone()),
289 );
290 }
291 attributes.insert(
292 "source_binary".to_string(),
293 serde_json::Value::String(event.source_binary.clone()),
294 );
295 attributes.insert(
296 "target".to_string(),
297 serde_json::Value::String(event.target.clone()),
298 );
299 attributes.insert(
300 "action".to_string(),
301 serde_json::Value::String(event.action.clone()),
302 );
303
304 Ok(OtelRecord {
305 name: event.action.clone(),
306 trace_id: event.trace_id.clone(),
307 span_id: event.span_id.clone(),
308 attributes,
309 })
310}
311
// Re-export of the core crate's span reference type.
pub use agent_team_mail_core::logging_event::SpanRefV1;
313
/// Log severity, ordered from most to least verbose.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LogLevel {
    Trace,
    Debug,
    Info,
    Warn,
    Error,
}

impl LogLevel {
    /// Canonical lowercase name of the level.
    pub fn as_str(self) -> &'static str {
        match self {
            LogLevel::Trace => "trace",
            LogLevel::Debug => "debug",
            LogLevel::Info => "info",
            LogLevel::Warn => "warn",
            LogLevel::Error => "error",
        }
    }
}

impl FromStr for LogLevel {
    type Err = ();

    /// Parses a level name, ignoring surrounding whitespace and ASCII case.
    /// Unknown names yield `Err(())`.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let needle = s.trim().to_ascii_lowercase();
        [
            Self::Trace,
            Self::Debug,
            Self::Info,
            Self::Warn,
            Self::Error,
        ]
        .into_iter()
        .find(|level| level.as_str() == needle)
        .ok_or(())
    }
}
349
/// File-logging configuration: paths, level, rotation, and size guards.
#[derive(Debug, Clone)]
pub struct LogConfig {
    /// Canonical JSONL log file.
    pub log_path: PathBuf,
    /// Directory holding per-process spool files awaiting merge.
    pub spool_dir: PathBuf,
    /// Configured level; stamped onto events built by `Logger::emit_action`.
    pub level: LogLevel,
    /// Whether message previews may be included (driven by `ATM_LOG_MSG=1`).
    pub message_preview_enabled: bool,
    /// Rotation threshold for the canonical file, in bytes.
    pub max_bytes: u64,
    /// Number of rotated files retained (`.1` … `.N`).
    pub max_files: u32,
    /// Retention period in days (env parsing rejects 0).
    pub retention_days: u32,
    /// Capacity hint for the event queue.
    pub queue_capacity: usize,
    /// Per-event serialized size guard, in bytes.
    pub max_event_bytes: usize,
}
362
363impl LogConfig {
364 fn normalize_tool_name(tool: &str) -> String {
365 let trimmed = tool.trim();
366 if trimmed.is_empty() {
367 return "atm".to_string();
368 }
369 trimmed
370 .chars()
371 .map(|ch| {
372 if ch.is_ascii_alphanumeric() || ch == '-' || ch == '_' {
373 ch
374 } else {
375 '_'
376 }
377 })
378 .collect()
379 }
380
381 fn canonical_log_path(home_dir: &Path, tool: &str) -> PathBuf {
382 let tool = Self::normalize_tool_name(tool);
383 home_dir
384 .join(".config")
385 .join("atm")
386 .join("logs")
387 .join(&tool)
388 .join(format!("{tool}.log.jsonl"))
389 }
390
391 fn canonical_spool_dir(home_dir: &Path, tool: &str) -> PathBuf {
392 let tool = Self::normalize_tool_name(tool);
393 home_dir
394 .join(".config")
395 .join("atm")
396 .join("logs")
397 .join(tool)
398 .join("spool")
399 }
400
401 fn spool_dir_from_log_path(log_path: &Path) -> PathBuf {
402 log_path
403 .parent()
404 .unwrap_or_else(|| Path::new("."))
405 .join("spool")
406 }
407
408 pub fn from_home(home_dir: &Path) -> Self {
409 Self::from_home_for_tool(home_dir, "atm")
410 }
411
412 pub fn from_home_for_tool(home_dir: &Path, tool: &str) -> Self {
413 let log_path = std::env::var("ATM_LOG_FILE")
414 .or_else(|_| std::env::var("ATM_LOG_PATH"))
415 .map(PathBuf::from)
416 .unwrap_or_else(|_| Self::canonical_log_path(home_dir, tool));
417
418 let spool_dir =
419 if std::env::var("ATM_LOG_FILE").is_ok() || std::env::var("ATM_LOG_PATH").is_ok() {
420 Self::spool_dir_from_log_path(&log_path)
421 } else {
422 Self::canonical_spool_dir(home_dir, tool)
423 };
424 let level = std::env::var("ATM_LOG")
425 .ok()
426 .and_then(|v| LogLevel::from_str(&v).ok())
427 .unwrap_or(LogLevel::Info);
428 let message_preview_enabled = std::env::var("ATM_LOG_MSG")
429 .ok()
430 .map(|v| v.trim() == "1")
431 .unwrap_or(false);
432 let max_bytes = std::env::var("ATM_LOG_MAX_BYTES")
433 .ok()
434 .and_then(|v| v.parse::<u64>().ok())
435 .unwrap_or(DEFAULT_MAX_BYTES);
436 let max_files = std::env::var("ATM_LOG_MAX_FILES")
437 .ok()
438 .and_then(|v| v.parse::<u32>().ok())
439 .unwrap_or(DEFAULT_MAX_FILES);
440 let retention_days = std::env::var("ATM_LOG_RETENTION_DAYS")
441 .ok()
442 .and_then(|v| v.parse::<u32>().ok())
443 .filter(|days| *days > 0)
444 .unwrap_or(DEFAULT_RETENTION_DAYS);
445
446 Self {
447 log_path,
448 spool_dir,
449 level,
450 message_preview_enabled,
451 max_bytes,
452 max_files,
453 retention_days,
454 queue_capacity: DEFAULT_QUEUE_CAPACITY,
455 max_event_bytes: DEFAULT_MAX_EVENT_BYTES,
456 }
457 }
458}
459
/// Errors surfaced by `Logger` operations.
#[derive(Debug, Error)]
pub enum LoggerError {
    /// The event failed `LogEventV1` schema validation.
    #[error("event validation failed: {0}")]
    Validation(#[from] ValidationError),
    /// The event could not be serialized to JSON.
    #[error("failed to serialize log event: {0}")]
    Serialize(#[from] serde_json::Error),
    /// Filesystem or write failure.
    #[error("i/o error: {0}")]
    Io(#[from] std::io::Error),
    /// The serialized event exceeds the configured `max_event_bytes`.
    #[error("event exceeds configured size guard: {size} > {max}")]
    EventTooLarge { size: usize, max: usize },
}
471
/// Synchronous JSONL logger with rotation, spooling, and fail-open OTel
/// export.
#[derive(Debug, Clone)]
pub struct Logger {
    config: LogConfig,
    otel: OtelPipeline,
}
477
/// Applies the event's own redaction pass in place (delegates to
/// `LogEventV1::redact`).
pub fn redact_event(event: &mut LogEventV1) {
    event.redact();
}
482
impl Logger {
    /// Builds a logger from `config`, wiring the default OTel pipeline
    /// (env-derived config plus a `<log stem>.otel.jsonl` file exporter next
    /// to the canonical log file).
    pub fn new(config: LogConfig) -> Self {
        let otel = OtelPipeline::new_default(&config.log_path);
        Self { config, otel }
    }

    /// Builds a logger with an explicitly supplied OTel configuration and
    /// exporter instead of the env/file defaults.
    pub fn with_otel_exporter(
        config: LogConfig,
        otel_config: OtelConfig,
        exporter: Arc<dyn OtelExporter>,
    ) -> Self {
        Self {
            config,
            otel: OtelPipeline {
                config: otel_config,
                exporter,
                sleeper: std::thread::sleep,
            },
        }
    }

    /// Read-only access to the effective configuration.
    pub fn config(&self) -> &LogConfig {
        &self.config
    }

    /// Validates, redacts, and size-guards `event`, then appends it as one
    /// JSON line to the canonical log file. OTel export runs afterwards and
    /// is fail-open: its result is discarded so a telemetry outage can never
    /// make logging fail.
    pub fn emit(&self, event: &LogEventV1) -> Result<(), LoggerError> {
        let line = self.prepare_line(event)?;
        self.append_line_to_canonical(&line)?;
        // Deliberately ignored: canonical logging already succeeded.
        let _ = self.otel.export_event(event);
        Ok(())
    }

    /// Convenience wrapper: builds a schema event from the given parts and
    /// emits it. The event's level mirrors the logger's configured level;
    /// non-object `fields` values are wrapped under a `"value"` key by
    /// `value_to_map`.
    pub fn emit_action(
        &self,
        source_binary: &str,
        target: &str,
        action: &str,
        outcome: Option<&str>,
        fields: serde_json::Value,
    ) -> Result<(), LoggerError> {
        let mut event = LogEventV1::builder(source_binary, action, target)
            .level(self.config.level.as_str())
            .build();
        event.outcome = outcome.map(ToOwned::to_owned);
        event.fields = value_to_map(fields);
        self.emit(&event)
    }
    /// Appends a human-readable (non-schema) line to the canonical log.
    /// NOTE: this path bypasses validation, redaction, and the event size
    /// guard — callers are responsible for the content they pass in. A
    /// `fields` serialization failure degrades to `{}` rather than erroring.
    pub fn emit_human(
        &self,
        level: &str,
        action: &str,
        outcome: &str,
        fields: &serde_json::Value,
    ) -> Result<(), LoggerError> {
        use chrono::{SecondsFormat, Utc};
        let ts = Utc::now().to_rfc3339_opts(SecondsFormat::Millis, true);
        let fields_json = serde_json::to_string(fields).unwrap_or_else(|_| "{}".to_string());
        let line =
            format!("{ts} level={level} action={action} outcome={outcome} fields={fields_json}");
        self.append_line_to_canonical(&line)?;
        Ok(())
    }

    /// Validates/redacts `event` and appends it to a per-process spool file
    /// named `<source_binary>-<pid>-<unix_millis>.jsonl` inside the spool
    /// directory (created on demand). Returns the spool file path. Spooled
    /// events are later folded into the canonical log by `merge_spool`.
    pub fn write_to_spool(
        &self,
        event: &LogEventV1,
        unix_millis: u128,
    ) -> Result<PathBuf, LoggerError> {
        let line = self.prepare_line(event)?;
        fs::create_dir_all(&self.config.spool_dir)?;

        let name = spool_file_name(&event.source_binary, event.pid, unix_millis);
        let path = self.config.spool_dir.join(name);
        let mut file = OpenOptions::new().create(true).append(true).open(&path)?;
        writeln!(file, "{line}")?;
        Ok(path)
    }

    /// Merges spooled events into the canonical log and returns how many
    /// lines were appended.
    ///
    /// Files are claimed by renaming `*.jsonl` to `*.claiming` so concurrent
    /// mergers cannot double-process them; pre-existing `*.claiming` files
    /// (e.g. left behind by an interrupted merge) are picked up as well.
    /// Parsed events are sorted by timestamp, ties broken by claiming-file
    /// name; lines exceeding the size guard are skipped; claimed files are
    /// deleted only after the whole append loop succeeds. Unreadable claimed
    /// files are removed and skipped; unparsable lines are ignored.
    pub fn merge_spool(&self) -> Result<u64, LoggerError> {
        if !self.config.spool_dir.exists() {
            return Ok(0);
        }

        let mut spool_files: Vec<PathBuf> = fs::read_dir(&self.config.spool_dir)?
            .filter_map(|entry| entry.ok().map(|e| e.path()))
            .filter(|path| {
                path.is_file()
                    && path
                        .extension()
                        .and_then(|ext| ext.to_str())
                        .map(|ext| ext == "jsonl" || ext == "claiming")
                        .unwrap_or(false)
            })
            .collect();
        spool_files.sort();

        let mut claimed_files: Vec<PathBuf> = Vec::new();
        let mut events: Vec<(LogEventV1, String)> = Vec::new();

        for path in spool_files {
            let claiming = if path
                .extension()
                .and_then(|ext| ext.to_str())
                .is_some_and(|ext| ext == "claiming")
            {
                path.clone()
            } else {
                let claiming = path.with_extension("claiming");
                if fs::rename(&path, &claiming).is_err() {
                    // Rename failed — presumably another merger claimed the
                    // file first; skip it.
                    continue;
                }
                claiming
            };
            let ordering_key = claiming
                .file_name()
                .and_then(|n| n.to_str())
                .unwrap_or_default()
                .to_string();

            let content = match fs::read_to_string(&claiming) {
                Ok(content) => content,
                Err(_) => {
                    let _ = fs::remove_file(&claiming);
                    continue;
                }
            };
            for line in content.lines() {
                let trimmed = line.trim();
                if trimmed.is_empty() {
                    continue;
                }
                if let Ok(event) = serde_json::from_str::<LogEventV1>(trimmed) {
                    events.push((event, ordering_key.clone()));
                }
            }
            claimed_files.push(claiming);
        }

        events.sort_by(|(a, file_a), (b, file_b)| a.ts.cmp(&b.ts).then(file_a.cmp(file_b)));

        let mut merged = 0_u64;
        for (event, _) in events {
            let line = serde_json::to_string(&event)?;
            if line.len() > self.config.max_event_bytes {
                // Oversized events are dropped rather than poisoning the log.
                continue;
            }
            self.append_line_to_canonical(&line)?;
            merged += 1;
        }

        // Reaching here means every append succeeded; an earlier `?` would
        // have left the `.claiming` files in place for a later merge to
        // recover.
        for claimed in claimed_files {
            let _ = fs::remove_file(claimed);
        }

        Ok(merged)
    }

    /// Clones `event`, validates it, applies redaction, serializes to a
    /// single JSON line, and enforces the `max_event_bytes` guard.
    fn prepare_line(&self, event: &LogEventV1) -> Result<String, LoggerError> {
        let mut event = event.clone();
        event.validate()?;
        redact_event(&mut event);
        let line = serde_json::to_string(&event)?;
        let size = line.len();
        if size > self.config.max_event_bytes {
            return Err(LoggerError::EventTooLarge {
                size,
                max: self.config.max_event_bytes,
            });
        }
        Ok(line)
    }

    /// Appends one pre-serialized line to the canonical log, creating parent
    /// directories and rotating first when needed. A process-wide mutex
    /// (shared by every `Logger` instance) serializes the rotate-then-append
    /// sequence so concurrent writers cannot interleave rotation and appends.
    fn append_line_to_canonical(&self, line: &str) -> Result<(), LoggerError> {
        static CANONICAL_APPEND_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
        let lock = CANONICAL_APPEND_LOCK.get_or_init(|| Mutex::new(()));
        let _guard = lock.lock().expect("canonical append lock poisoned");

        if let Some(parent) = self.config.log_path.parent() {
            fs::create_dir_all(parent)?;
        }

        self.rotate_if_needed()?;

        let mut file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(&self.config.log_path)?;
        writeln!(file, "{line}")?;
        Ok(())
    }

    /// Rotates when the canonical file has reached `max_bytes`. A missing
    /// file counts as size 0 (no rotation).
    fn rotate_if_needed(&self) -> Result<(), LoggerError> {
        let current_size = fs::metadata(&self.config.log_path)
            .map(|m| m.len())
            .unwrap_or(0);
        if current_size < self.config.max_bytes {
            return Ok(());
        }
        rotate_log_files(&self.config.log_path, self.config.max_files)?;
        Ok(())
    }
}
720
/// Builds the default OTel pipeline for `log_path` and exports `event`
/// best-effort (any error is swallowed).
pub fn export_otel_best_effort_from_path(log_path: &Path, event: &LogEventV1) {
    let pipeline = OtelPipeline::new_default(log_path);
    export_otel_best_effort_with_pipeline(&pipeline, event);
}
733
/// Best-effort export through an existing pipeline; the result is discarded.
fn export_otel_best_effort_with_pipeline(pipeline: &OtelPipeline, event: &LogEventV1) {
    let _ = pipeline.export_event(event);
}
737
738pub fn spool_file_name(source_binary: &str, pid: u32, unix_millis: u128) -> String {
739 let sanitized = sanitize_source_binary(source_binary);
740 format!("{}-{}-{}.jsonl", sanitized, pid, unix_millis)
741}
742
/// Makes a binary name safe for use in file names by replacing every
/// character outside `[A-Za-z0-9._-]` with `_`. An empty input becomes
/// `"unknown"`.
fn sanitize_source_binary(source_binary: &str) -> String {
    let cleaned: String = source_binary
        .chars()
        .map(|ch| {
            if ch.is_ascii_alphanumeric() || matches!(ch, '-' | '_' | '.') {
                ch
            } else {
                '_'
            }
        })
        .collect();
    if cleaned.is_empty() {
        "unknown".to_string()
    } else {
        cleaned
    }
}
758
/// Performs one rotation step: drops the oldest rotated copy, shifts each
/// `.i` to `.i+1` (highest index first so nothing is overwritten), then
/// renames the live file to `.1`. With `max_files == 0` the live file is
/// simply deleted. Deletions and shifts are best-effort (errors ignored);
/// only the final rename of the live file is fallible.
fn rotate_log_files(base: &Path, max_files: u32) -> Result<(), LoggerError> {
    if max_files == 0 {
        // No rotated copies are kept: drop the live file and stop.
        let _ = fs::remove_file(base);
        return Ok(());
    }

    // Delete the slot that would fall off the end of the rotation window.
    let oldest = rotation_path(base, max_files);
    let _ = fs::remove_file(&oldest);

    for idx in (1..max_files).rev() {
        let from = rotation_path(base, idx);
        let to = rotation_path(base, idx + 1);
        if from.exists() {
            let _ = fs::rename(&from, &to);
        }
    }

    if base.exists() {
        let first = rotation_path(base, 1);
        fs::rename(base, first)?;
    }
    Ok(())
}
782
/// Path of the `n`-th rotated copy: the base path with `.{n}` appended to
/// the full file name (e.g. `atm.log.jsonl` → `atm.log.jsonl.2`).
fn rotation_path(base: &Path, n: u32) -> PathBuf {
    let mut name = base.as_os_str().to_os_string();
    name.push(format!(".{n}"));
    name.into()
}
788
789fn value_to_map(value: serde_json::Value) -> serde_json::Map<String, serde_json::Value> {
790 match value {
791 serde_json::Value::Object(map) => map,
792 other => {
793 let mut map = serde_json::Map::new();
794 map.insert("value".to_string(), other);
795 map
796 }
797 }
798}
799
800#[cfg(test)]
801mod tests {
802 use super::*;
803 use agent_team_mail_core::logging_event::new_log_event;
804 use serial_test::serial;
805 use std::sync::Mutex;
806 use std::sync::OnceLock;
807 use std::sync::atomic::{AtomicUsize, Ordering};
808 use tempfile::TempDir;
809
810 #[derive(Default)]
811 struct CountingExporter {
812 attempts: AtomicUsize,
813 fail_for: AtomicUsize,
814 records: Mutex<Vec<OtelRecord>>,
815 }
816
817 impl CountingExporter {
818 fn with_failures(failures: usize) -> Self {
819 Self {
820 attempts: AtomicUsize::new(0),
821 fail_for: AtomicUsize::new(failures),
822 records: Mutex::new(Vec::new()),
823 }
824 }
825 }
826
827 impl OtelExporter for CountingExporter {
828 fn export(&self, record: &OtelRecord) -> Result<(), OtelError> {
829 let attempt = self.attempts.fetch_add(1, Ordering::SeqCst) + 1;
830 let fail_for = self.fail_for.load(Ordering::SeqCst);
831 if attempt <= fail_for {
832 return Err(OtelError::ExportFailed(
833 "simulated transport outage".to_string(),
834 ));
835 }
836 self.records
837 .lock()
838 .expect("records lock")
839 .push(record.clone());
840 Ok(())
841 }
842 }
843
844 fn make_event(ts: &str) -> LogEventV1 {
845 let mut event = new_log_event("atm", "test_action", "atm::test", "info");
846 event.ts = ts.to_string();
847 event
848 }
849
850 static BACKOFF_SLEEPS_MS: OnceLock<Mutex<Vec<u64>>> = OnceLock::new();
851
852 fn record_sleep(duration: Duration) {
853 BACKOFF_SLEEPS_MS
854 .get_or_init(|| Mutex::new(Vec::new()))
855 .lock()
856 .expect("backoff sleeps lock")
857 .push(duration.as_millis() as u64);
858 }
859
860 #[test]
861 #[serial]
862 fn config_defaults_and_env_overrides() {
863 let tmp = TempDir::new().expect("temp dir");
864 let custom_log = tmp.path().join("custom-atm.log");
865 let home_root = tmp.path().join("home-root");
866 unsafe {
868 std::env::set_var("ATM_LOG", "debug");
869 std::env::set_var("ATM_LOG_MSG", "1");
870 std::env::set_var("ATM_LOG_FILE", &custom_log);
871 std::env::set_var("ATM_LOG_MAX_BYTES", "1024");
872 std::env::set_var("ATM_LOG_MAX_FILES", "7");
873 std::env::set_var("ATM_LOG_RETENTION_DAYS", "9");
874 }
875 let cfg = LogConfig::from_home(&home_root);
876 assert_eq!(cfg.level, LogLevel::Debug);
877 assert!(cfg.message_preview_enabled);
878 assert_eq!(cfg.log_path, custom_log);
879 assert_eq!(cfg.spool_dir, tmp.path().join("spool"));
880 assert_eq!(cfg.max_bytes, 1024);
881 assert_eq!(cfg.max_files, 7);
882 assert_eq!(cfg.retention_days, 9);
883 assert_eq!(cfg.queue_capacity, DEFAULT_QUEUE_CAPACITY);
884 assert_eq!(cfg.max_event_bytes, DEFAULT_MAX_EVENT_BYTES);
885 unsafe {
887 std::env::remove_var("ATM_LOG");
888 std::env::remove_var("ATM_LOG_MSG");
889 std::env::remove_var("ATM_LOG_FILE");
890 std::env::remove_var("ATM_LOG_MAX_BYTES");
891 std::env::remove_var("ATM_LOG_MAX_FILES");
892 std::env::remove_var("ATM_LOG_RETENTION_DAYS");
893 }
894 }
895
896 #[test]
897 #[serial]
898 fn config_default_paths_follow_tool_scoped_contract() {
899 let tmp = TempDir::new().expect("temp dir");
900 unsafe {
902 std::env::remove_var("ATM_LOG_FILE");
903 std::env::remove_var("ATM_LOG_PATH");
904 }
905
906 let cfg = LogConfig::from_home_for_tool(tmp.path(), "atm-daemon");
907 assert_eq!(
908 cfg.log_path,
909 tmp.path()
910 .join(".config/atm/logs/atm-daemon/atm-daemon.log.jsonl")
911 );
912 assert_eq!(
913 cfg.spool_dir,
914 tmp.path().join(".config/atm/logs/atm-daemon/spool")
915 );
916 }
917
918 #[test]
919 fn span_ref_v1_round_trip_serialization() {
920 let span = SpanRefV1 {
921 name: "compose".to_string(),
922 trace_id: "trace-123".to_string(),
923 span_id: "span-456".to_string(),
924 parent_span_id: None,
925 fields: serde_json::Map::new(),
926 };
927 let json = serde_json::to_string(&span).expect("serialize span");
928 let decoded: SpanRefV1 = serde_json::from_str(&json).expect("deserialize span");
929 assert_eq!(decoded, span);
930 }
931
932 #[test]
933 fn span_ref_v1_fields_are_non_empty_after_construction() {
934 let span = SpanRefV1 {
935 name: "compose".to_string(),
936 trace_id: "trace-abc".to_string(),
937 span_id: "span-def".to_string(),
938 parent_span_id: None,
939 fields: serde_json::Map::new(),
940 };
941 assert!(!span.trace_id.is_empty());
942 assert!(!span.span_id.is_empty());
943 }
944
945 #[test]
946 fn spool_filename_format_matches_contract() {
947 let name = spool_file_name("atm-daemon", 44201, 123456789);
948 assert_eq!(name, "atm-daemon-44201-123456789.jsonl");
949 }
950
951 #[test]
952 fn spool_filename_sanitizes_windows_unsafe_chars() {
953 let name = spool_file_name(r"atm\daemon:core?*", 44201, 123456789);
954 assert_eq!(name, "atm_daemon_core__-44201-123456789.jsonl");
955 }
956
957 #[test]
958 fn emit_rotates_file() {
959 let tmp = TempDir::new().expect("temp dir");
960 let cfg = LogConfig {
961 log_path: tmp.path().join("atm.log.jsonl"),
962 spool_dir: tmp.path().join("log-spool"),
963 level: LogLevel::Info,
964 message_preview_enabled: false,
965 max_bytes: 1,
966 max_files: 2,
967 retention_days: DEFAULT_RETENTION_DAYS,
968 queue_capacity: DEFAULT_QUEUE_CAPACITY,
969 max_event_bytes: DEFAULT_MAX_EVENT_BYTES,
970 };
971 let logger = Logger::new(cfg);
972
973 let ev1 = make_event("2026-03-09T00:00:01Z");
974 logger.emit(&ev1).expect("first emit");
975 let ev2 = make_event("2026-03-09T00:00:02Z");
976 logger.emit(&ev2).expect("second emit");
977
978 assert!(logger.config.log_path.exists());
979 assert!(rotation_path(&logger.config.log_path, 1).exists());
980 }
981
982 #[test]
983 fn emit_rejects_event_larger_than_configured_guard() {
984 let tmp = TempDir::new().expect("temp dir");
985 let cfg = LogConfig {
986 log_path: tmp.path().join("atm.log.jsonl"),
987 spool_dir: tmp.path().join("log-spool"),
988 level: LogLevel::Info,
989 message_preview_enabled: false,
990 max_bytes: DEFAULT_MAX_BYTES,
991 max_files: DEFAULT_MAX_FILES,
992 retention_days: DEFAULT_RETENTION_DAYS,
993 queue_capacity: DEFAULT_QUEUE_CAPACITY,
994 max_event_bytes: 256,
995 };
996 let logger = Logger::new(cfg);
997
998 let mut event = make_event("2026-03-09T00:00:01Z");
999 event.fields.insert(
1000 "blob".to_string(),
1001 serde_json::Value::String("x".repeat(2048)),
1002 );
1003 let err = logger.emit(&event).expect_err("expected size guard error");
1004 assert!(matches!(err, LoggerError::EventTooLarge { .. }));
1005 }
1006
1007 #[test]
1008 fn merge_spool_sorts_by_timestamp_and_deletes_claimed_files() {
1009 let tmp = TempDir::new().expect("temp dir");
1010 let cfg = LogConfig {
1011 log_path: tmp.path().join("atm.log.jsonl"),
1012 spool_dir: tmp.path().join("log-spool"),
1013 level: LogLevel::Info,
1014 message_preview_enabled: false,
1015 max_bytes: DEFAULT_MAX_BYTES,
1016 max_files: DEFAULT_MAX_FILES,
1017 retention_days: DEFAULT_RETENTION_DAYS,
1018 queue_capacity: DEFAULT_QUEUE_CAPACITY,
1019 max_event_bytes: DEFAULT_MAX_EVENT_BYTES,
1020 };
1021 let logger = Logger::new(cfg.clone());
1022
1023 let ev_late = make_event("2026-03-09T00:00:05Z");
1024 let ev_early = make_event("2026-03-09T00:00:01Z");
1025 logger
1026 .write_to_spool(&ev_late, 2000)
1027 .expect("write late spool");
1028 logger
1029 .write_to_spool(&ev_early, 1000)
1030 .expect("write early spool");
1031
1032 let merged = logger.merge_spool().expect("merge spool");
1033 assert_eq!(merged, 2);
1034
1035 let lines: Vec<String> = fs::read_to_string(&cfg.log_path)
1036 .expect("read canonical log")
1037 .lines()
1038 .map(str::to_string)
1039 .collect();
1040 assert_eq!(lines.len(), 2);
1041 let parsed0: LogEventV1 = serde_json::from_str(&lines[0]).expect("line 0 parse");
1042 let parsed1: LogEventV1 = serde_json::from_str(&lines[1]).expect("line 1 parse");
1043 assert_eq!(parsed0.ts, "2026-03-09T00:00:01Z");
1044 assert_eq!(parsed1.ts, "2026-03-09T00:00:05Z");
1045
1046 let leftover: Vec<_> = fs::read_dir(&cfg.spool_dir)
1047 .expect("spool dir")
1048 .filter_map(|e| e.ok())
1049 .collect();
1050 assert!(
1051 leftover.is_empty(),
1052 "spool files should be deleted after merge"
1053 );
1054 }
1055
1056 #[test]
1057 fn merge_spool_recovers_stale_claiming_files() {
1058 let tmp = TempDir::new().expect("temp dir");
1059 let cfg = LogConfig {
1060 log_path: tmp.path().join("atm.log.jsonl"),
1061 spool_dir: tmp.path().join("log-spool"),
1062 level: LogLevel::Info,
1063 message_preview_enabled: false,
1064 max_bytes: DEFAULT_MAX_BYTES,
1065 max_files: DEFAULT_MAX_FILES,
1066 retention_days: DEFAULT_RETENTION_DAYS,
1067 queue_capacity: DEFAULT_QUEUE_CAPACITY,
1068 max_event_bytes: DEFAULT_MAX_EVENT_BYTES,
1069 };
1070 fs::create_dir_all(&cfg.spool_dir).expect("create spool dir");
1071 let stale_claiming = cfg.spool_dir.join("atm-44201-1000.claiming");
1072 let ev = make_event("2026-03-09T00:00:01Z");
1073 fs::write(
1074 &stale_claiming,
1075 format!("{}\n", serde_json::to_string(&ev).expect("serialize")),
1076 )
1077 .expect("write stale claiming");
1078
1079 let logger = Logger::new(cfg.clone());
1080 let merged = logger.merge_spool().expect("merge spool");
1081 assert_eq!(merged, 1);
1082 assert!(!stale_claiming.exists());
1083
1084 let log_content = fs::read_to_string(&cfg.log_path).expect("read log");
1085 let lines: Vec<_> = log_content.lines().collect();
1086 assert_eq!(lines.len(), 1);
1087 }
1088
1089 #[test]
1090 fn write_to_spool_creates_dir_and_appends() {
1091 let tmp = TempDir::new().expect("temp dir");
1092 let cfg = LogConfig {
1093 log_path: tmp.path().join("atm.log.jsonl"),
1094 spool_dir: tmp.path().join("log-spool"),
1095 level: LogLevel::Info,
1096 message_preview_enabled: false,
1097 max_bytes: DEFAULT_MAX_BYTES,
1098 max_files: DEFAULT_MAX_FILES,
1099 retention_days: DEFAULT_RETENTION_DAYS,
1100 queue_capacity: DEFAULT_QUEUE_CAPACITY,
1101 max_event_bytes: DEFAULT_MAX_EVENT_BYTES,
1102 };
1103 let logger = Logger::new(cfg);
1104 let ev = make_event("2026-03-09T00:00:01Z");
1105 let path1 = logger.write_to_spool(&ev, 1000).expect("spool write 1");
1106 let path2 = logger.write_to_spool(&ev, 1000).expect("spool write 2");
1107 assert_eq!(path1, path2);
1108 let spool_content = fs::read_to_string(path1).expect("read spool");
1109 let lines: Vec<_> = spool_content.lines().collect();
1110 assert_eq!(lines.len(), 2);
1111 }
1112
1113 #[test]
1114 fn rotate_log_files_max_files_zero_removes_base() {
1115 let tmp = TempDir::new().expect("temp dir");
1116 let base = tmp.path().join("atm.log.jsonl");
1117 fs::write(&base, "line\n").expect("write base");
1118 rotate_log_files(&base, 0).expect("rotate");
1119 assert!(!base.exists());
1120 }
1121
1122 #[test]
1123 fn rotate_log_files_evicts_oldest_when_limit_reached() {
1124 let tmp = TempDir::new().expect("temp dir");
1125 let base = tmp.path().join("atm.log.jsonl");
1126 fs::write(&base, "base\n").expect("write base");
1127 fs::write(rotation_path(&base, 1), "one\n").expect("write .1");
1128 fs::write(rotation_path(&base, 2), "two\n").expect("write .2");
1129
1130 rotate_log_files(&base, 2).expect("rotate");
1131
1132 assert_eq!(
1133 fs::read_to_string(rotation_path(&base, 1)).expect("read .1"),
1134 "base\n"
1135 );
1136 assert_eq!(
1137 fs::read_to_string(rotation_path(&base, 2)).expect("read .2"),
1138 "one\n"
1139 );
1140 assert!(!rotation_path(&base, 3).exists());
1141 }
1142
1143 #[test]
1144 fn socket_error_codes_match_contract() {
1145 assert_eq!(SOCKET_ERROR_VERSION_MISMATCH, "VERSION_MISMATCH");
1146 assert_eq!(SOCKET_ERROR_INVALID_PAYLOAD, "INVALID_PAYLOAD");
1147 assert_eq!(SOCKET_ERROR_INTERNAL_ERROR, "INTERNAL_ERROR");
1148 }
1149
1150 #[test]
1151 fn emit_action_writes_schema_compatible_event() {
1152 let tmp = TempDir::new().expect("temp dir");
1153 let cfg = LogConfig {
1154 log_path: tmp.path().join("atm.log.jsonl"),
1155 spool_dir: tmp.path().join("log-spool"),
1156 level: LogLevel::Info,
1157 message_preview_enabled: false,
1158 max_bytes: DEFAULT_MAX_BYTES,
1159 max_files: DEFAULT_MAX_FILES,
1160 queue_capacity: DEFAULT_QUEUE_CAPACITY,
1161 max_event_bytes: DEFAULT_MAX_EVENT_BYTES,
1162 retention_days: DEFAULT_RETENTION_DAYS,
1163 };
1164 let logger = Logger::new(cfg.clone());
1165
1166 logger
1167 .emit_action(
1168 "sc-compose",
1169 "sc_compose::cli",
1170 "command_end",
1171 Some("success"),
1172 serde_json::json!({"code": 0}),
1173 )
1174 .expect("emit action");
1175
1176 let lines: Vec<_> = fs::read_to_string(&cfg.log_path)
1177 .expect("read log")
1178 .lines()
1179 .map(str::to_string)
1180 .collect();
1181 assert_eq!(lines.len(), 1);
1182 let parsed: LogEventV1 = serde_json::from_str(&lines[0]).expect("parse event");
1183 assert_eq!(parsed.source_binary, "sc-compose");
1184 assert_eq!(parsed.action, "command_end");
1185 assert_eq!(parsed.outcome.as_deref(), Some("success"));
1186 assert_eq!(parsed.fields.get("code").and_then(|v| v.as_u64()), Some(0));
1187 }
1188
1189 #[test]
1190 #[serial]
1191 fn otel_default_on_env_override_supported() {
1192 unsafe {
1194 std::env::remove_var("ATM_OTEL_ENABLED");
1195 }
1196 let default_cfg = OtelConfig::from_env();
1197 assert!(default_cfg.enabled, "OTel should be enabled by default");
1198
1199 unsafe {
1201 std::env::set_var("ATM_OTEL_ENABLED", "false");
1202 }
1203 let disabled_cfg = OtelConfig::from_env();
1204 assert!(
1205 !disabled_cfg.enabled,
1206 "ATM_OTEL_ENABLED=false should disable exporter"
1207 );
1208 unsafe {
1210 std::env::remove_var("ATM_OTEL_ENABLED");
1211 }
1212 }
1213
1214 #[test]
1215 fn emit_is_fail_open_when_otel_exporter_fails() {
1216 let tmp = TempDir::new().expect("temp dir");
1217 let cfg = LogConfig {
1218 log_path: tmp.path().join("atm.log.jsonl"),
1219 spool_dir: tmp.path().join("log-spool"),
1220 level: LogLevel::Info,
1221 message_preview_enabled: false,
1222 max_bytes: DEFAULT_MAX_BYTES,
1223 max_files: DEFAULT_MAX_FILES,
1224 retention_days: DEFAULT_RETENTION_DAYS,
1225 queue_capacity: DEFAULT_QUEUE_CAPACITY,
1226 max_event_bytes: DEFAULT_MAX_EVENT_BYTES,
1227 };
1228 let exporter = Arc::new(CountingExporter::with_failures(10));
1229 let logger = Logger::with_otel_exporter(
1230 cfg.clone(),
1231 OtelConfig {
1232 enabled: true,
1233 max_retries: 2,
1234 initial_backoff_ms: 0,
1235 max_backoff_ms: 0,
1236 },
1237 exporter.clone(),
1238 );
1239
1240 let event = new_log_event("atm", "send_message", "atm::send", "info");
1241 logger.emit(&event).expect("emit should not fail");
1242
1243 let log_lines = fs::read_to_string(&cfg.log_path).expect("canonical log should exist");
1244 assert!(
1245 !log_lines.trim().is_empty(),
1246 "canonical log should be written"
1247 );
1248 assert_eq!(
1249 exporter.attempts.load(Ordering::SeqCst),
1250 3,
1251 "initial attempt + 2 retries"
1252 );
1253 assert!(
1254 exporter.records.lock().expect("records lock").is_empty(),
1255 "all export attempts should fail in this test"
1256 );
1257 }
1258
1259 #[test]
1260 fn export_otel_best_effort_from_path_is_fail_open_when_export_fails() {
1261 let tmp = TempDir::new().expect("temp dir");
1262 let parent_file = tmp.path().join("not-a-directory");
1263 std::fs::write(&parent_file, "occupied").expect("create parent file");
1264 let log_path = parent_file.join("atm.log.jsonl");
1265 let event = new_log_event("atm-daemon", "register_hint", "atm_daemon::socket", "info");
1266
1267 export_otel_best_effort_from_path(&log_path, &event);
1269 }
1270
1271 #[test]
1272 fn otel_exporter_retries_then_succeeds() {
1273 let tmp = TempDir::new().expect("temp dir");
1274 let cfg = LogConfig {
1275 log_path: tmp.path().join("atm.log.jsonl"),
1276 spool_dir: tmp.path().join("log-spool"),
1277 level: LogLevel::Info,
1278 message_preview_enabled: false,
1279 max_bytes: DEFAULT_MAX_BYTES,
1280 max_files: DEFAULT_MAX_FILES,
1281 retention_days: DEFAULT_RETENTION_DAYS,
1282 queue_capacity: DEFAULT_QUEUE_CAPACITY,
1283 max_event_bytes: DEFAULT_MAX_EVENT_BYTES,
1284 };
1285 let exporter = Arc::new(CountingExporter::with_failures(2));
1286 let logger = Logger::with_otel_exporter(
1287 cfg,
1288 OtelConfig {
1289 enabled: true,
1290 max_retries: 4,
1291 initial_backoff_ms: 0,
1292 max_backoff_ms: 0,
1293 },
1294 exporter.clone(),
1295 );
1296
1297 let mut event = new_log_event("atm", "subagent.run", "atm::runtime", "info");
1298 event.team = Some("atm-dev".to_string());
1299 event.agent = Some("arch-ctm".to_string());
1300 event.runtime = Some("codex".to_string());
1301 event.session_id = Some("local:arch-ctm:123".to_string());
1302 event.trace_id = Some("trace-123".to_string());
1303 event.span_id = Some("span-456".to_string());
1304 event.subagent_id = Some("subagent-7".to_string());
1305
1306 logger.emit(&event).expect("emit should succeed");
1307 assert_eq!(
1308 exporter.attempts.load(Ordering::SeqCst),
1309 3,
1310 "2 failures + 1 success"
1311 );
1312 assert_eq!(
1313 exporter.records.lock().expect("records lock").len(),
1314 1,
1315 "record should be exported after retries"
1316 );
1317 }
1318
1319 #[test]
1320 fn producer_events_export_through_pipeline_with_counting_exporter() {
1321 let tmp = TempDir::new().expect("temp dir");
1322 let cfg = LogConfig {
1323 log_path: tmp.path().join("atm.log.jsonl"),
1324 spool_dir: tmp.path().join("log-spool"),
1325 level: LogLevel::Info,
1326 message_preview_enabled: false,
1327 max_bytes: DEFAULT_MAX_BYTES,
1328 max_files: DEFAULT_MAX_FILES,
1329 retention_days: DEFAULT_RETENTION_DAYS,
1330 queue_capacity: DEFAULT_QUEUE_CAPACITY,
1331 max_event_bytes: DEFAULT_MAX_EVENT_BYTES,
1332 };
1333 let exporter = Arc::new(CountingExporter::with_failures(0));
1334 let logger = Logger::with_otel_exporter(
1335 cfg,
1336 OtelConfig {
1337 enabled: true,
1338 max_retries: 0,
1339 initial_backoff_ms: 0,
1340 max_backoff_ms: 0,
1341 },
1342 exporter.clone(),
1343 );
1344
1345 for (idx, source, action) in [
1346 (1u8, "atm", "send"),
1347 (2u8, "atm-daemon", "register_hint"),
1348 (3u8, "sc-composer", "compose"),
1349 ] {
1350 let mut event = new_log_event(source, action, "atm::test", "info");
1351 event.team = Some("atm-dev".to_string());
1352 event.agent = Some("arch-ctm".to_string());
1353 event.runtime = Some("codex".to_string());
1354 event.session_id = Some("sess-123".to_string());
1355 event.trace_id = Some("trace-123".to_string());
1356 event.span_id = Some(format!("span-{idx}"));
1357 logger.emit(&event).expect("emit should succeed");
1358 }
1359
1360 let records = exporter.records.lock().expect("records lock");
1361 assert_eq!(records.len(), 3, "all producer events should export");
1362 assert_eq!(
1363 records.iter().map(|r| r.name.clone()).collect::<Vec<_>>(),
1364 vec![
1365 "send".to_string(),
1366 "register_hint".to_string(),
1367 "compose".to_string()
1368 ]
1369 );
1370 }
1371
1372 #[test]
1373 fn export_otel_best_effort_is_public_and_fail_open() {
1374 let exporter = CountingExporter::with_failures(10);
1375 let event = new_log_event("atm", "send_message", "atm::send", "info");
1376 export_otel_best_effort(
1377 &event,
1378 &OtelConfig {
1379 enabled: true,
1380 max_retries: 2,
1381 initial_backoff_ms: 0,
1382 max_backoff_ms: 0,
1383 },
1384 &exporter,
1385 );
1386
1387 assert_eq!(
1388 exporter.attempts.load(Ordering::SeqCst),
1389 3,
1390 "initial attempt + 2 retries"
1391 );
1392 }
1393
1394 #[test]
1395 fn otel_retry_backoff_is_bounded_by_max_backoff() {
1396 let sleeps = BACKOFF_SLEEPS_MS.get_or_init(|| Mutex::new(Vec::new()));
1397 sleeps.lock().expect("backoff sleeps lock").clear();
1398
1399 let exporter = CountingExporter::with_failures(10);
1400 let event = new_log_event("atm", "send_message", "atm::send", "info");
1401 let err = export_otel_with_retry(
1402 &event,
1403 &OtelConfig {
1404 enabled: true,
1405 max_retries: 4,
1406 initial_backoff_ms: 5,
1407 max_backoff_ms: 12,
1408 },
1409 &exporter,
1410 record_sleep,
1411 )
1412 .expect_err("should return final export error");
1413 assert!(matches!(err, OtelError::ExportFailed(_)));
1414
1415 let sleeps = sleeps.lock().expect("backoff sleeps lock").clone();
1416 assert_eq!(sleeps, vec![5, 10, 12, 12]);
1417 assert!(
1418 sleeps.iter().all(|v| *v <= 12),
1419 "sleep exceeded max_backoff"
1420 );
1421 }
1422
1423 #[test]
1424 fn build_otel_record_requires_runtime_for_runtime_scoped_events() {
1425 let mut event = new_log_event("atm", "send_message", "atm::send", "info");
1426 event.team = Some("atm-dev".to_string());
1427 event.agent = Some("arch-ctm".to_string());
1428 event.session_id = Some("local:arch-ctm".to_string());
1429 let err = build_otel_record(&event).expect_err("runtime should be required");
1432 assert_eq!(err, OtelError::MissingRequiredField { field: "runtime" });
1433 }
1434
1435 #[test]
1436 fn build_otel_record_requires_subagent_id_for_subagent_actions() {
1437 let mut event = new_log_event("atm", "subagent.run", "atm::runtime", "info");
1438 event.team = Some("atm-dev".to_string());
1439 event.agent = Some("arch-ctm".to_string());
1440 event.runtime = Some("codex".to_string());
1441 event.session_id = Some("local:arch-ctm".to_string());
1442 event.trace_id = Some("trace-123".to_string());
1443 event.span_id = Some("span-456".to_string());
1444 let err = build_otel_record(&event).expect_err("subagent_id should be required");
1447 assert_eq!(
1448 err,
1449 OtelError::MissingRequiredField {
1450 field: "subagent_id"
1451 }
1452 );
1453 }
1454
1455 #[test]
1456 fn build_otel_record_requires_full_span_context_when_partial() {
1457 let mut event = new_log_event("atm", "send_message", "atm::send", "info");
1458 event.trace_id = Some("trace-123".to_string());
1459 let err = build_otel_record(&event).expect_err("partial span context should fail");
1461 assert_eq!(err, OtelError::InvalidSpanContext);
1462 }
1463
1464 #[test]
1465 fn build_otel_record_includes_required_correlation_attributes() {
1466 let mut event = new_log_event("atm", "subagent.run", "atm::runtime", "info");
1467 event.team = Some("atm-dev".to_string());
1468 event.agent = Some("arch-ctm".to_string());
1469 event.runtime = Some("codex".to_string());
1470 event.session_id = Some("local:arch-ctm".to_string());
1471 event.trace_id = Some("trace-123".to_string());
1472 event.span_id = Some("span-456".to_string());
1473 event.subagent_id = Some("subagent-7".to_string());
1474
1475 let record = build_otel_record(&event).expect("record should build");
1476 assert_eq!(record.trace_id.as_deref(), Some("trace-123"));
1477 assert_eq!(record.span_id.as_deref(), Some("span-456"));
1478 assert_eq!(
1479 record.attributes.get("team").and_then(|v| v.as_str()),
1480 Some("atm-dev")
1481 );
1482 assert_eq!(
1483 record.attributes.get("agent").and_then(|v| v.as_str()),
1484 Some("arch-ctm")
1485 );
1486 assert_eq!(
1487 record.attributes.get("runtime").and_then(|v| v.as_str()),
1488 Some("codex")
1489 );
1490 assert_eq!(
1491 record.attributes.get("session_id").and_then(|v| v.as_str()),
1492 Some("local:arch-ctm")
1493 );
1494 assert_eq!(
1495 record
1496 .attributes
1497 .get("subagent_id")
1498 .and_then(|v| v.as_str()),
1499 Some("subagent-7")
1500 );
1501 }
1502}