1use aptu_coder_core::lang::language_for_extension;
11use serde::{Deserialize, Serialize};
12use std::path::{Path, PathBuf};
13use std::time::{SystemTime, UNIX_EPOCH};
14use tokio::io::AsyncWriteExt;
15use tokio::sync::mpsc;
16
17#[derive(Debug, Clone, Default, Serialize, Deserialize)]
19#[serde(default)]
20pub struct MetricEvent {
21 pub ts: u64,
22 pub tool: &'static str,
23 pub duration_ms: u64,
24 pub output_chars: usize,
25 pub param_path_depth: usize,
26 pub max_depth: Option<u32>,
27 pub result: &'static str,
28 pub error_type: Option<String>,
29 #[serde(default, skip_serializing_if = "Option::is_none")]
30 pub error_subtype: Option<String>,
31 #[serde(default)]
32 pub session_id: Option<String>,
33 #[serde(default)]
34 pub seq: Option<u32>,
35 #[serde(default)]
36 #[serde(skip_serializing_if = "Option::is_none")]
37 pub cache_hit: Option<bool>,
38 #[serde(default, skip_serializing_if = "Option::is_none")]
39 pub cache_tier: Option<&'static str>,
40 #[serde(default, skip_serializing_if = "Option::is_none")]
43 pub cache_write_failure: Option<bool>,
44 #[serde(default, skip_serializing_if = "Option::is_none")]
45 pub exit_code: Option<i32>,
46 #[serde(default, skip_serializing_if = "std::ops::Not::not")]
47 pub timed_out: bool,
48 #[serde(default, skip_serializing_if = "Option::is_none")]
49 pub output_truncated: Option<bool>,
50 #[serde(default, skip_serializing_if = "std::ops::Not::not")]
54 pub chars_threshold_breach: bool,
55 #[serde(default, skip_serializing_if = "Option::is_none")]
59 pub file_ext: Option<&'static str>,
60}
61
62#[derive(Clone)]
64pub struct MetricsSender(pub tokio::sync::mpsc::UnboundedSender<MetricEvent>);
65
66impl MetricsSender {
67 pub fn send(&self, event: MetricEvent) {
68 let _ = self.0.send(event);
69 }
70}
71
72pub struct MetricsWriter {
74 rx: tokio::sync::mpsc::UnboundedReceiver<MetricEvent>,
75 base_dir: PathBuf,
76 dir_created: bool,
77}
78
79impl MetricsWriter {
80 pub fn new(
81 rx: tokio::sync::mpsc::UnboundedReceiver<MetricEvent>,
82 base_dir: Option<PathBuf>,
83 ) -> Self {
84 let dir = base_dir.unwrap_or_else(xdg_metrics_dir);
85 Self {
86 rx,
87 base_dir: dir,
88 dir_created: false,
89 }
90 }
91
92 fn accumulate_event(
94 tool_counts: &mut std::collections::HashMap<&'static str, (u64, u64)>,
95 export_session_id: &mut Option<String>,
96 event: &MetricEvent,
97 ) {
98 let entry = tool_counts.entry(event.tool).or_insert((0, 0));
99 entry.0 += 1;
100 entry.1 += event.duration_ms;
101 if export_session_id.is_none() {
102 *export_session_id = event.session_id.clone();
103 }
104 }
105
106 async fn flush_batch(file: &mut tokio::fs::File, batch: Vec<MetricEvent>) {
108 for event in batch {
109 record_otel_metrics(&event);
111
112 if let Ok(mut json) = serde_json::to_string(&event) {
114 json.push('\n');
115 let _ = file.write_all(json.as_bytes()).await;
116 }
117 }
118 let _ = file.flush().await;
119 }
120
121 fn rotate_metrics_file(
124 base_dir: &std::path::Path,
125 current_date: &mut String,
126 current_file: &mut Option<PathBuf>,
127 dir_created: &mut bool,
128 ) -> PathBuf {
129 let new_date = current_date_str();
130 if new_date != *current_date {
131 *current_date = new_date;
132 *current_file = None;
133 *dir_created = false;
134 }
135
136 if current_file.is_none() {
137 *current_file = Some(base_dir.join(format!("metrics-{}.jsonl", current_date)));
138 }
139
140 current_file
141 .as_ref()
142 .expect("current_file is guaranteed Some after check above")
143 .clone()
144 }
145
146 async fn receive_batch(
148 rx: &mut tokio::sync::mpsc::UnboundedReceiver<MetricEvent>,
149 tool_counts: &mut std::collections::HashMap<&'static str, (u64, u64)>,
150 export_session_id: &mut Option<String>,
151 ) -> Option<Vec<MetricEvent>> {
152 let mut batch = Vec::new();
153 if let Some(event) = rx.recv().await {
154 Self::accumulate_event(tool_counts, export_session_id, &event);
155 batch.push(event);
156 for _ in 0..99 {
157 match rx.try_recv() {
158 Ok(e) => {
159 Self::accumulate_event(tool_counts, export_session_id, &e);
160 batch.push(e);
161 }
162 Err(
163 mpsc::error::TryRecvError::Empty | mpsc::error::TryRecvError::Disconnected,
164 ) => break,
165 }
166 }
167 Some(batch)
168 } else {
169 None
170 }
171 }
172
173 async fn ensure_metrics_dir(path: &std::path::Path, dir_created: &mut bool) {
175 if !*dir_created
176 && let Some(parent) = path.parent()
177 && !parent.as_os_str().is_empty()
178 {
179 match tokio::fs::create_dir_all(parent).await {
180 Ok(()) => {
181 *dir_created = true;
182 }
183 Err(e) => {
184 tracing::warn!(
185 error = %e,
186 path = %parent.display(),
187 "metrics: failed to create directory; will retry next batch"
188 );
189 }
190 }
191 }
192 }
193
194 pub async fn run(mut self) {
195 cleanup_old_files(&self.base_dir).await;
196 let mut current_date = current_date_str();
197 let mut current_file: Option<PathBuf> = None;
198
199 let mut tool_counts: std::collections::HashMap<&'static str, (u64, u64)> =
201 std::collections::HashMap::new();
202 let mut export_session_id: Option<String> = None;
203
204 loop {
205 let Some(batch) =
206 Self::receive_batch(&mut self.rx, &mut tool_counts, &mut export_session_id).await
207 else {
208 break;
209 };
210
211 let path = Self::rotate_metrics_file(
212 &self.base_dir,
213 &mut current_date,
214 &mut current_file,
215 &mut self.dir_created,
216 );
217
218 Self::ensure_metrics_dir(&path, &mut self.dir_created).await;
219
220 let file = tokio::fs::OpenOptions::new()
222 .create(true)
223 .append(true)
224 .open(&path)
225 .await;
226
227 if let Ok(mut file) = file {
228 Self::flush_batch(&mut file, batch).await;
229 }
230 }
231
232 if let Ok(export_path) = std::env::var("APTU_CODER_METRICS_EXPORT_FILE") {
234 if !std::path::Path::new(&export_path).is_absolute() {
235 tracing::warn!(
236 path = %export_path,
237 "metrics: APTU_CODER_METRICS_EXPORT_FILE must be an absolute path; skipping export"
238 );
239 } else {
240 let mut tool_calls = Vec::new();
241 let mut total_duration_ms = 0u64;
242 for (tool_name, (count, duration)) in tool_counts {
243 tool_calls.push(serde_json::json!({
244 "tool": tool_name,
245 "call_count": count,
246 "total_duration_ms": duration
247 }));
248 total_duration_ms += duration;
249 }
250 let summary = serde_json::json!({
251 "session_id": export_session_id.unwrap_or_default(),
252 "tool_calls": tool_calls,
253 "total_duration_ms": total_duration_ms
254 });
255 if let Ok(json_str) = serde_json::to_string(&summary)
256 && let Err(e) = tokio::fs::write(&export_path, json_str).await
257 {
258 tracing::warn!(
259 error = %e,
260 path = %export_path,
261 "metrics: failed to write export file"
262 );
263 }
264 }
265 }
266 }
267}
268
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// A clock set before the epoch yields `0`; a value that does not fit in `u64`
/// saturates to `u64::MAX`.
#[must_use]
pub fn unix_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX)
}
279
/// Counts the components of `path` as produced by [`Path`]'s component iterator.
///
/// A leading root counts as one component; repeated separators, interior `.`
/// segments and a trailing separator add nothing.
#[must_use]
pub fn path_component_count(path: &str) -> usize {
    let parsed = Path::new(path);
    // `Path::iter` yields the same components as `components()`, as `OsStr`s.
    parsed.iter().count()
}
285
286#[must_use]
292pub fn path_file_ext(path: &str) -> Option<&'static str> {
293 let ext_os = Path::new(path).extension()?;
294 let ext_str = ext_os.to_str()?;
295 if ext_str.is_empty() {
296 return None;
297 }
298 if language_for_extension(ext_str).is_some() {
301 aptu_coder_core::lang::supported_extensions()
302 .into_iter()
303 .find(|e| e.eq_ignore_ascii_case(ext_str))
304 } else {
305 Some("other")
306 }
307}
308
/// Resolves the default metrics directory.
///
/// Resolution order:
/// 1. `$XDG_DATA_HOME/aptu-coder` when `XDG_DATA_HOME` holds an absolute path.
///    The XDG Base Directory Specification requires relative (and therefore
///    also empty) values to be treated as invalid and ignored.
/// 2. `$HOME/.local/share/aptu-coder` when `HOME` is set.
/// 3. `.` as a last resort.
///
/// Values are read with `var_os`, so non-UTF-8 paths are accepted.
fn xdg_metrics_dir() -> PathBuf {
    const APP_DIR: &str = "aptu-coder";

    let xdg_data_home = std::env::var_os("XDG_DATA_HOME")
        .map(PathBuf::from)
        .filter(|dir| dir.is_absolute());
    if let Some(data_home) = xdg_data_home {
        return data_home.join(APP_DIR);
    }

    match std::env::var_os("HOME") {
        Some(home) => PathBuf::from(home)
            .join(".local")
            .join("share")
            .join(APP_DIR),
        None => PathBuf::from("."),
    }
}
325
326async fn cleanup_old_files(base_dir: &Path) {
327 let now_days = u32::try_from(unix_ms() / 86_400_000).unwrap_or(u32::MAX);
328
329 let Ok(mut entries) = tokio::fs::read_dir(base_dir).await else {
330 return;
331 };
332
333 loop {
334 match entries.next_entry().await {
335 Ok(Some(entry)) => {
336 let path = entry.path();
337 let file_name = match path.file_name() {
338 Some(n) => n.to_string_lossy().into_owned(),
339 None => continue,
340 };
341
342 if !file_name.starts_with("metrics-")
344 || std::path::Path::new(&*file_name)
345 .extension()
346 .is_none_or(|e| !e.eq_ignore_ascii_case("jsonl"))
347 {
348 continue;
349 }
350 let date_part = &file_name[8..file_name.len() - 6];
351 if date_part.len() != 10
352 || date_part.as_bytes().get(4) != Some(&b'-')
353 || date_part.as_bytes().get(7) != Some(&b'-')
354 {
355 continue;
356 }
357 let Ok(year) = date_part[0..4].parse::<u32>() else {
358 continue;
359 };
360 let Ok(month) = date_part[5..7].parse::<u32>() else {
361 continue;
362 };
363 let Ok(day) = date_part[8..10].parse::<u32>() else {
364 continue;
365 };
366 if month == 0 || month > 12 || day == 0 || day > 31 {
367 continue;
368 }
369
370 let file_days = date_to_days_since_epoch(year, month, day);
371 if now_days > file_days && (now_days - file_days) > 30 {
372 let _ = tokio::fs::remove_file(&path).await;
373 }
374 }
375 Ok(None) => break,
376 Err(e) => {
377 tracing::warn!("error reading metrics directory entry: {e}");
378 }
379 }
380 }
381}
382
/// Converts a proleptic Gregorian date to days since 1970-01-01.
///
/// The year is shifted so that it starts in March, which puts the leap day at
/// the end of the shifted year. The arithmetic is done in `i64` so that year 0
/// with a January/February month (which shifts to year -1) and very large
/// inputs cannot underflow or overflow. Dates before the epoch clamp to `0`;
/// results beyond `u32::MAX` clamp to `u32::MAX`.
///
/// `m` is expected in `1..=12` and `d` in `1..=31`; other values are not
/// rejected, they just produce a day count with no calendar meaning.
fn date_to_days_since_epoch(y: u32, m: u32, d: u32) -> u32 {
    let (y, m, d) = (i64::from(y), i64::from(m), i64::from(d));
    // Shift to a March-based year: Jan/Feb belong to the previous year.
    let (y, m) = if m <= 2 { (y - 1, m + 9) } else { (y, m - 3) };

    let era = y.div_euclid(400);
    let yoe = y.rem_euclid(400);
    let doy = (153 * m + 2) / 5 + d - 1;
    let doe = yoe * 365 + yoe / 4 - yoe / 100 + doy;
    // 719_468 is the number of days from 0000-03-01 to 1970-01-01.
    let days = era * 146_097 + doe - 719_468;

    u32::try_from(days.max(0)).unwrap_or(u32::MAX)
}
395
396#[must_use]
398pub fn current_date_str() -> String {
399 let days = u32::try_from(unix_ms() / 86_400_000).unwrap_or(u32::MAX);
400 let z = days + 719_468;
401 let era = z / 146_097;
402 let doe = z - era * 146_097;
403 let yoe = (doe - doe / 1460 + doe / 36524 - doe / 146_096) / 365;
404 let y = yoe + era * 400;
405 let doy = doe - (365 * yoe + yoe / 4 - yoe / 100);
406 let mp = (5 * doy + 2) / 153;
407 let d = doy - (153 * mp + 2) / 5 + 1;
408 let m = if mp < 10 { mp + 3 } else { mp - 9 };
409 let y = if m <= 2 { y + 1 } else { y };
410 format!("{y:04}-{m:02}-{d:02}")
411}
412
413pub fn migrate_legacy_metrics_dir() -> std::io::Result<()> {
421 let home =
422 std::env::var("HOME").map_err(|e| std::io::Error::new(std::io::ErrorKind::NotFound, e))?;
423 migrate_legacy_metrics_dir_impl(&home)
424}
425
426#[allow(dead_code)]
427fn migrate_legacy_metrics_dir_impl(home: &str) -> std::io::Result<()> {
428 let old_dir = PathBuf::from(home).join(".local/share/code-analyze-mcp");
429 let new_dir = PathBuf::from(home).join(".local/share/aptu-coder");
430
431 let old_exists = old_dir.is_dir();
432 let new_exists = new_dir.is_dir();
433
434 if old_exists && !new_exists {
435 std::fs::rename(&old_dir, &new_dir)?;
436 tracing::info!(
437 "Migrated legacy metrics directory from {:?} to {:?}",
438 old_dir,
439 new_dir
440 );
441 } else if old_exists && new_exists {
442 tracing::warn!("Both legacy and new metrics directories exist; not migrating");
443 }
444 Ok(())
446}
447
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use std::sync::{Mutex, MutexGuard, OnceLock};
    use tempfile::TempDir;

    /// Environment variable that `MetricsWriter::run` reads for the export path.
    const EXPORT_ENV: &str = "APTU_CODER_METRICS_EXPORT_FILE";

    /// Serializes every test that runs a `MetricsWriter`, because `run` reads
    /// the process-wide export variable. A poisoned lock is recovered so one
    /// failing test does not cascade into the others.
    fn metrics_export_lock() -> MutexGuard<'static, ()> {
        static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
        LOCK.get_or_init(|| Mutex::new(()))
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// Holds the export lock and puts the export variable into a known state.
    /// The variable is removed again on drop, including when the test panics.
    struct ExportEnv {
        _lock: MutexGuard<'static, ()>,
    }

    impl ExportEnv {
        /// Takes the lock and sets the export variable to `value`.
        fn set(value: &str) -> Self {
            let lock = metrics_export_lock();
            // SAFETY: every access to this variable in this module happens
            // while `metrics_export_lock` is held.
            // NOTE(review): confirm no other test in the binary touches it.
            unsafe {
                std::env::set_var(EXPORT_ENV, value);
            }
            Self { _lock: lock }
        }

        /// Takes the lock and makes sure the export variable is not set.
        fn unset() -> Self {
            let lock = metrics_export_lock();
            // SAFETY: see `ExportEnv::set`.
            unsafe {
                std::env::remove_var(EXPORT_ENV);
            }
            Self { _lock: lock }
        }
    }

    impl Drop for ExportEnv {
        fn drop(&mut self) {
            // SAFETY: the lock is still held here; fields drop after this body.
            unsafe {
                std::env::remove_var(EXPORT_ENV);
            }
        }
    }

    /// Sends `events` through a fresh writer rooted at `dir` and runs it to completion.
    async fn run_writer(dir: &std::path::Path, events: Vec<MetricEvent>) {
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<MetricEvent>();
        let writer = MetricsWriter::new(rx, Some(dir.to_path_buf()));
        for event in events {
            tx.send(event).unwrap();
        }
        drop(tx);
        writer.run().await;
    }

    /// Names of all entries directly inside `dir`.
    fn entry_names(dir: &std::path::Path) -> Vec<String> {
        fs::read_dir(dir)
            .unwrap()
            .filter_map(Result::ok)
            .map(|entry| entry.file_name().to_string_lossy().into_owned())
            .collect()
    }

    #[test]
    fn test_migrate_legacy_only_old_exists() {
        let tmp_home = TempDir::new().unwrap();
        let home_str = tmp_home.path().to_str().unwrap();
        let old_path = tmp_home.path().join(".local/share/code-analyze-mcp");
        let new_path = tmp_home.path().join(".local/share/aptu-coder");
        fs::create_dir_all(&old_path).unwrap();
        assert!(!new_path.exists());

        let result = migrate_legacy_metrics_dir_impl(home_str);

        assert!(result.is_ok());
        assert!(!old_path.exists(), "old dir should be moved");
        assert!(new_path.is_dir(), "new dir should exist");
    }

    #[test]
    fn test_migrate_legacy_both_exist() {
        let tmp_home = TempDir::new().unwrap();
        let home_str = tmp_home.path().to_str().unwrap();
        let old_path = tmp_home.path().join(".local/share/code-analyze-mcp");
        let new_path = tmp_home.path().join(".local/share/aptu-coder");
        fs::create_dir_all(&old_path).unwrap();
        fs::create_dir_all(&new_path).unwrap();

        let result = migrate_legacy_metrics_dir_impl(home_str);

        assert!(result.is_ok());
        assert!(old_path.is_dir(), "old dir should remain");
        assert!(new_path.is_dir(), "new dir should remain");
    }

    #[test]
    fn test_migrate_legacy_neither_exists() {
        let tmp_home = TempDir::new().unwrap();
        let home_str = tmp_home.path().to_str().unwrap();
        let old_path = tmp_home.path().join(".local/share/code-analyze-mcp");
        let new_path = tmp_home.path().join(".local/share/aptu-coder");

        let result = migrate_legacy_metrics_dir_impl(home_str);

        assert!(result.is_ok());
        assert!(!old_path.exists(), "old dir should not exist");
        assert!(!new_path.exists(), "new dir should not exist");
    }

    #[test]
    fn test_date_to_days_since_epoch_known_dates() {
        assert_eq!(date_to_days_since_epoch(1970, 1, 1), 0);
        assert_eq!(date_to_days_since_epoch(2020, 1, 1), 18_262);
        assert_eq!(date_to_days_since_epoch(2000, 2, 29), 11_016);
    }

    #[test]
    fn test_current_date_str_format() {
        let s = current_date_str();
        assert_eq!(s.len(), 10);
        assert_eq!(s.as_bytes()[4], b'-');
        assert_eq!(s.as_bytes()[7], b'-');
        let year: u32 = s[0..4].parse().expect("year must be numeric");
        assert!((2020..=2100).contains(&year));
    }

    #[tokio::test]
    async fn test_metrics_writer_batching() {
        // `run` reads the export variable, so this test must not overlap with
        // the export tests or it could overwrite their export file.
        let _env = ExportEnv::unset();
        let dir = TempDir::new().unwrap();
        let make_event = || MetricEvent {
            ts: unix_ms(),
            tool: "analyze_directory",
            duration_ms: 1,
            output_chars: 10,
            param_path_depth: 1,
            result: "ok",
            ..MetricEvent::default()
        };

        run_writer(dir.path(), vec![make_event(), make_event(), make_event()]).await;

        let jsonl_files: Vec<_> = fs::read_dir(dir.path())
            .unwrap()
            .filter_map(Result::ok)
            .map(|entry| entry.path())
            .filter(|path| {
                path.extension()
                    .is_some_and(|ext| ext.eq_ignore_ascii_case("jsonl"))
            })
            .collect();
        assert_eq!(jsonl_files.len(), 1);
        let content = fs::read_to_string(&jsonl_files[0]).unwrap();
        assert_eq!(content.lines().count(), 3);
    }

    #[tokio::test]
    async fn test_cleanup_old_files_deletes_old_keeps_recent() {
        let dir = TempDir::new().unwrap();
        let old_file = dir.path().join("metrics-1970-01-01.jsonl");
        let today = current_date_str();
        let recent_file = dir.path().join(format!("metrics-{}.jsonl", today));
        fs::write(&old_file, "old\n").unwrap();
        fs::write(&recent_file, "recent\n").unwrap();
        cleanup_old_files(dir.path()).await;
        assert!(!old_file.exists());
        assert!(recent_file.exists());
    }

    #[test]
    fn test_metric_event_serialization() {
        let event = MetricEvent {
            ts: 1_700_000_000_000,
            tool: "analyze_directory",
            duration_ms: 42,
            output_chars: 100,
            param_path_depth: 3,
            max_depth: Some(2),
            result: "ok",
            ..MetricEvent::default()
        };
        let json = serde_json::to_string(&event).unwrap();
        assert!(json.contains("analyze_directory"));
        assert!(json.contains(r#""result":"ok""#));
        assert!(json.contains(r#""output_chars":100"#));
        assert!(!json.contains("error_subtype"));
    }

    #[test]
    fn test_metric_event_serialization_error() {
        let event = MetricEvent {
            ts: 1_700_000_000_000,
            tool: "analyze_directory",
            duration_ms: 5,
            output_chars: 0,
            param_path_depth: 3,
            max_depth: Some(3),
            result: "error",
            error_type: Some("invalid_params".to_string()),
            ..MetricEvent::default()
        };
        let json = serde_json::to_string(&event).unwrap();
        assert!(json.contains(r#""result":"error""#));
        assert!(json.contains(r#""error_type":"invalid_params""#));
        assert!(json.contains(r#""output_chars":0"#));
        assert!(!json.contains("error_subtype"));
    }

    /// An `edit_replace` error event with the given subtype.
    fn edit_replace_error(subtype: &str) -> MetricEvent {
        MetricEvent {
            ts: 1_700_000_000_000,
            tool: "edit_replace",
            duration_ms: 10,
            output_chars: 0,
            param_path_depth: 2,
            result: "error",
            error_type: Some("invalid_params".to_string()),
            error_subtype: Some(subtype.to_string()),
            ..MetricEvent::default()
        }
    }

    #[test]
    fn test_metric_event_error_subtype_some_serializes() {
        let json = serde_json::to_string(&edit_replace_error("not_found")).unwrap();
        assert!(json.contains(r#""error_subtype":"not_found""#));
    }

    #[test]
    fn test_metric_event_error_subtype_ambiguous() {
        let json = serde_json::to_string(&edit_replace_error("ambiguous")).unwrap();
        assert!(json.contains(r#""error_subtype":"ambiguous""#));
    }

    #[test]
    fn test_metric_event_new_fields_round_trip() {
        let event = MetricEvent {
            ts: 1_700_000_000_000,
            tool: "analyze_file",
            duration_ms: 100,
            output_chars: 500,
            param_path_depth: 2,
            max_depth: Some(3),
            result: "ok",
            session_id: Some("1742468880123-42".to_string()),
            seq: Some(5),
            ..MetricEvent::default()
        };
        let serialized = serde_json::to_string(&event).unwrap();
        let json_str = r#"{"ts":1700000000000,"tool":"analyze_file","duration_ms":100,"output_chars":500,"param_path_depth":2,"max_depth":3,"result":"ok","error_type":null,"session_id":"1742468880123-42","seq":5}"#;
        assert_eq!(serialized, json_str);
    }

    #[test]
    fn test_path_file_ext_known() {
        assert_eq!(path_file_ext("src/main.rs"), Some("rs"));
    }

    #[test]
    fn test_path_file_ext_unknown() {
        assert_eq!(path_file_ext("file.xyz"), Some("other"));
    }

    #[test]
    fn test_path_file_ext_no_ext() {
        assert_eq!(path_file_ext("Makefile"), None);
    }

    #[test]
    fn test_path_file_ext_case_insensitive() {
        assert_eq!(path_file_ext("src/main.RS"), Some("rs"));
    }

    #[test]
    fn test_path_file_ext_multi_dot() {
        assert_eq!(path_file_ext("file.test.rs"), Some("rs"));
    }

    #[tokio::test]
    async fn test_metrics_export_file_created() {
        let dir = TempDir::new().unwrap();
        let export_file = dir.path().join("metrics_export.json");
        let _env = ExportEnv::set(&export_file.to_string_lossy());

        let events = vec![
            MetricEvent {
                ts: unix_ms(),
                tool: "analyze_directory",
                duration_ms: 100,
                output_chars: 50,
                param_path_depth: 1,
                result: "ok",
                session_id: Some("test-session-123".to_string()),
                ..MetricEvent::default()
            },
            MetricEvent {
                ts: unix_ms(),
                tool: "analyze_file",
                duration_ms: 50,
                output_chars: 100,
                param_path_depth: 2,
                max_depth: Some(3),
                result: "ok",
                session_id: Some("test-session-123".to_string()),
                ..MetricEvent::default()
            },
        ];
        run_writer(dir.path(), events).await;

        assert!(
            export_file.exists(),
            "export file should be created at {:?}",
            export_file
        );
        let content = fs::read_to_string(&export_file).unwrap();
        let json: serde_json::Value = serde_json::from_str(&content).unwrap();

        assert_eq!(
            json["session_id"], "test-session-123",
            "export should contain correct session_id"
        );
        assert!(
            json["tool_calls"].is_array(),
            "export should contain tool_calls array"
        );
        let tool_calls = json["tool_calls"].as_array().unwrap();
        assert_eq!(tool_calls.len(), 2, "should have 2 tool calls");
        assert!(
            json["total_duration_ms"].is_number(),
            "export should contain total_duration_ms"
        );
        assert_eq!(
            json["total_duration_ms"], 150,
            "total_duration_ms should be sum of all durations"
        );
    }

    #[tokio::test]
    async fn test_metrics_export_env_var_unset() {
        let _env = ExportEnv::unset();
        let dir = TempDir::new().unwrap();

        let event = MetricEvent {
            ts: unix_ms(),
            tool: "analyze_directory",
            duration_ms: 100,
            output_chars: 50,
            param_path_depth: 1,
            result: "ok",
            session_id: Some("test-session-456".to_string()),
            ..MetricEvent::default()
        };
        run_writer(dir.path(), vec![event]).await;

        // Only the daily JSONL file may exist; anything else would be an export artifact.
        let unexpected: Vec<String> = entry_names(dir.path())
            .into_iter()
            .filter(|name| !(name.starts_with("metrics-") && name.ends_with(".jsonl")))
            .collect();
        assert!(
            unexpected.is_empty(),
            "no export file should be created when env var is unset, found {:?}",
            unexpected
        );
    }

    #[tokio::test]
    async fn test_metrics_export_relative_path_rejected() {
        let relative_path = "relative/path/metrics.json";
        let _env = ExportEnv::set(relative_path);

        let dir = TempDir::new().unwrap();
        let marker = "metrics_export_relative_test";

        let event = MetricEvent {
            ts: unix_ms(),
            tool: "analyze_directory",
            duration_ms: 100,
            output_chars: 50,
            param_path_depth: 1,
            result: "ok",
            session_id: Some(marker.to_string()),
            ..MetricEvent::default()
        };
        run_writer(dir.path(), vec![event]).await;

        let exported = entry_names(dir.path())
            .into_iter()
            .filter(|name| name.contains("metrics.json"))
            .count();
        assert_eq!(
            exported, 0,
            "no export file should be created for relative path"
        );
        // A relative path would resolve against the working directory, so check there too.
        assert!(
            !std::path::Path::new(relative_path).exists(),
            "relative export path must not be written relative to the working directory"
        );
    }
}
995
996fn record_otel_metrics(event: &MetricEvent) {
1006 use opentelemetry::metrics::{Counter, Histogram};
1007 use opentelemetry::{KeyValue, global};
1008 use std::sync::OnceLock;
1009
1010 static DURATION_HISTOGRAM: OnceLock<Histogram<f64>> = OnceLock::new();
1011 static CALL_COUNTER: OnceLock<Counter<u64>> = OnceLock::new();
1012 static CACHE_HITS_COUNTER: OnceLock<Counter<u64>> = OnceLock::new();
1013 static CACHE_WRITE_FAILURES_COUNTER: OnceLock<Counter<u64>> = OnceLock::new();
1014
1015 let histogram = DURATION_HISTOGRAM.get_or_init(|| {
1016 global::meter("aptu-coder")
1017 .f64_histogram("mcp.server.operation.duration")
1018 .with_unit("s")
1019 .with_boundaries(vec![
1020 0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0,
1021 ])
1022 .build()
1023 });
1024
1025 let counter = CALL_COUNTER.get_or_init(|| {
1026 global::meter("aptu-coder")
1027 .u64_counter("mcp.server.tool.calls")
1028 .build()
1029 });
1030
1031 let cache_hits_counter = CACHE_HITS_COUNTER.get_or_init(|| {
1032 global::meter("aptu-coder")
1033 .u64_counter("mcp.server.tool.cache_hits_total")
1034 .with_description("Number of tool responses served from cache (l1_memory or l2_disk)")
1035 .build()
1036 });
1037
1038 let cache_write_failures_counter = CACHE_WRITE_FAILURES_COUNTER.get_or_init(|| {
1039 global::meter("aptu-coder")
1040 .u64_counter("mcp.server.tool.cache_write_failures_total")
1041 .with_description(
1042 "Number of L2 disk cache write failures (dir, tempfile, write, rename)",
1043 )
1044 .build()
1045 });
1046
1047 let error_type = event.error_type.as_deref().unwrap_or("success");
1048 let attributes = [
1049 KeyValue::new("gen_ai.tool.name", event.tool.to_string()),
1050 KeyValue::new("error.type", error_type.to_string()),
1051 KeyValue::new("mcp.method.name", "tools/call"),
1052 KeyValue::new("mcp.protocol.version", "2025-11-25"),
1053 KeyValue::new("network.transport", "pipe"),
1054 ];
1055
1056 histogram.record(event.duration_ms as f64 / 1000.0, &attributes);
1057 counter.add(1, &attributes);
1058
1059 if event.cache_hit == Some(true) {
1060 let tier = event.cache_tier.unwrap_or("unknown");
1061 cache_hits_counter.add(
1062 1,
1063 &[
1064 KeyValue::new("gen_ai.tool.name", event.tool.to_string()),
1065 KeyValue::new("cache_tier", tier.to_string()),
1066 ],
1067 );
1068 }
1069
1070 if event.cache_write_failure == Some(true) {
1071 cache_write_failures_counter.add(
1072 1,
1073 &[KeyValue::new("gen_ai.tool.name", event.tool.to_string())],
1074 );
1075 }
1076}