1use aptu_coder_core::lang::language_for_extension;
11use serde::{Deserialize, Serialize};
12use std::path::{Path, PathBuf};
13use std::time::{SystemTime, UNIX_EPOCH};
14use tokio::io::AsyncWriteExt;
15use tokio::sync::mpsc;
16
17#[derive(Debug, Clone, Default, Serialize, Deserialize)]
19#[serde(default)]
20pub struct MetricEvent {
21 pub ts: u64,
22 pub tool: &'static str,
23 pub duration_ms: u64,
24 pub output_chars: usize,
25 pub param_path_depth: usize,
26 pub max_depth: Option<u32>,
27 pub result: &'static str,
28 pub error_type: Option<String>,
29 #[serde(default, skip_serializing_if = "Option::is_none")]
30 pub error_subtype: Option<String>,
31 #[serde(default)]
32 pub session_id: Option<String>,
33 #[serde(default)]
34 pub seq: Option<u32>,
35 #[serde(default)]
36 #[serde(skip_serializing_if = "Option::is_none")]
37 pub cache_hit: Option<bool>,
38 #[serde(default, skip_serializing_if = "Option::is_none")]
39 pub cache_tier: Option<&'static str>,
40 #[serde(default, skip_serializing_if = "Option::is_none")]
43 pub cache_write_failure: Option<bool>,
44 #[serde(default, skip_serializing_if = "Option::is_none")]
45 pub exit_code: Option<i32>,
46 #[serde(default, skip_serializing_if = "std::ops::Not::not")]
47 pub timed_out: bool,
48 #[serde(default, skip_serializing_if = "Option::is_none")]
49 pub output_truncated: Option<bool>,
50 #[serde(default, skip_serializing_if = "std::ops::Not::not")]
54 pub chars_threshold_breach: bool,
55 #[serde(default, skip_serializing_if = "Option::is_none")]
59 pub file_ext: Option<&'static str>,
60}
61
62#[derive(Clone)]
64pub struct MetricsSender(pub tokio::sync::mpsc::UnboundedSender<MetricEvent>);
65
66impl MetricsSender {
67 pub fn send(&self, event: MetricEvent) {
68 let _ = self.0.send(event);
69 }
70}
71
72pub struct MetricsWriter {
74 rx: tokio::sync::mpsc::UnboundedReceiver<MetricEvent>,
75 base_dir: PathBuf,
76 dir_created: bool,
77}
78
/// Running per-tool totals used to build the end-of-session export summary.
#[derive(Debug, Default)]
struct ToolMetrics {
    /// Number of events seen for the tool.
    count: u64,
    /// Sum of `duration_ms` over those events.
    duration_ms: u64,
    /// Sum of `output_chars` over those events.
    output_chars: u64,
}
86
87impl MetricsWriter {
88 pub fn new(
89 rx: tokio::sync::mpsc::UnboundedReceiver<MetricEvent>,
90 base_dir: Option<PathBuf>,
91 ) -> Self {
92 let dir = base_dir.unwrap_or_else(xdg_metrics_dir);
93 Self {
94 rx,
95 base_dir: dir,
96 dir_created: false,
97 }
98 }
99
100 fn accumulate_event(
102 tool_counts: &mut std::collections::HashMap<&'static str, ToolMetrics>,
103 export_session_id: &mut Option<String>,
104 event: &MetricEvent,
105 ) {
106 let entry = tool_counts.entry(event.tool).or_default();
107 entry.count += 1;
108 entry.duration_ms += event.duration_ms;
109 entry.output_chars += event.output_chars as u64;
111 if export_session_id.is_none() {
112 *export_session_id = event.session_id.clone();
113 }
114 }
115
116 async fn flush_batch(file: &mut tokio::fs::File, batch: Vec<MetricEvent>) {
118 for event in batch {
119 record_otel_metrics(&event);
121
122 if let Ok(mut json) = serde_json::to_string(&event) {
124 json.push('\n');
125 let _ = file.write_all(json.as_bytes()).await;
126 }
127 }
128 let _ = file.flush().await;
129 }
130
131 fn rotate_metrics_file(
134 base_dir: &std::path::Path,
135 current_date: &mut String,
136 current_file: &mut Option<PathBuf>,
137 dir_created: &mut bool,
138 ) -> PathBuf {
139 let new_date = current_date_str();
140 if new_date != *current_date {
141 *current_date = new_date;
142 *current_file = None;
143 *dir_created = false;
144 }
145
146 if current_file.is_none() {
147 *current_file = Some(base_dir.join(format!("metrics-{}.jsonl", current_date)));
148 }
149
150 current_file
151 .as_ref()
152 .expect("current_file is guaranteed Some after check above")
153 .clone()
154 }
155
156 async fn receive_batch(
158 rx: &mut tokio::sync::mpsc::UnboundedReceiver<MetricEvent>,
159 tool_counts: &mut std::collections::HashMap<&'static str, ToolMetrics>,
160 export_session_id: &mut Option<String>,
161 ) -> Option<Vec<MetricEvent>> {
162 let mut batch = Vec::new();
163 if let Some(event) = rx.recv().await {
164 Self::accumulate_event(tool_counts, export_session_id, &event);
165 batch.push(event);
166 for _ in 0..99 {
167 match rx.try_recv() {
168 Ok(e) => {
169 Self::accumulate_event(tool_counts, export_session_id, &e);
170 batch.push(e);
171 }
172 Err(
173 mpsc::error::TryRecvError::Empty | mpsc::error::TryRecvError::Disconnected,
174 ) => break,
175 }
176 }
177 Some(batch)
178 } else {
179 None
180 }
181 }
182
183 async fn ensure_metrics_dir(path: &std::path::Path, dir_created: &mut bool) {
185 if !*dir_created
186 && let Some(parent) = path.parent()
187 && !parent.as_os_str().is_empty()
188 {
189 match tokio::fs::create_dir_all(parent).await {
190 Ok(()) => {
191 *dir_created = true;
192 }
193 Err(e) => {
194 tracing::warn!(
195 error = %e,
196 path = %parent.display(),
197 "metrics: failed to create directory; will retry next batch"
198 );
199 }
200 }
201 }
202 }
203
204 pub async fn run(mut self) {
205 cleanup_old_files(&self.base_dir).await;
206 let mut current_date = current_date_str();
207 let mut current_file: Option<PathBuf> = None;
208
209 let mut tool_counts: std::collections::HashMap<&'static str, ToolMetrics> =
211 std::collections::HashMap::new();
212 let mut export_session_id: Option<String> = None;
213
214 loop {
215 let Some(batch) =
216 Self::receive_batch(&mut self.rx, &mut tool_counts, &mut export_session_id).await
217 else {
218 break;
219 };
220
221 let path = Self::rotate_metrics_file(
222 &self.base_dir,
223 &mut current_date,
224 &mut current_file,
225 &mut self.dir_created,
226 );
227
228 Self::ensure_metrics_dir(&path, &mut self.dir_created).await;
229
230 let file = tokio::fs::OpenOptions::new()
232 .create(true)
233 .append(true)
234 .open(&path)
235 .await;
236
237 if let Ok(mut file) = file {
238 Self::flush_batch(&mut file, batch).await;
239 }
240 }
241
242 if let Ok(export_path) = std::env::var("APTU_CODER_METRICS_EXPORT_FILE") {
244 if !std::path::Path::new(&export_path).is_absolute() {
245 tracing::warn!(
246 path = %export_path,
247 "metrics: APTU_CODER_METRICS_EXPORT_FILE must be an absolute path; skipping export"
248 );
249 } else {
250 let mut tool_calls = Vec::new();
251 let mut total_duration_ms = 0u64;
252 let mut total_output_chars_sum = 0u64;
253 let mut sorted_tools: Vec<_> = tool_counts.iter().collect();
255 sorted_tools.sort_by_key(|&(name, _)| name);
256 for (tool_name, metrics) in sorted_tools {
257 tool_calls.push(serde_json::json!({
258 "tool": tool_name,
259 "call_count": metrics.count,
260 "total_duration_ms": metrics.duration_ms,
261 "total_output_chars": metrics.output_chars
262 }));
263 total_duration_ms += metrics.duration_ms;
264 total_output_chars_sum += metrics.output_chars;
265 }
266 let summary = serde_json::json!({
267 "session_id": export_session_id.unwrap_or_default(),
268 "tool_calls": tool_calls,
269 "total_duration_ms": total_duration_ms,
270 "total_output_chars": total_output_chars_sum
271 });
272 if let Ok(json_str) = serde_json::to_string(&summary)
273 && let Err(e) = tokio::fs::write(&export_path, json_str).await
274 {
275 tracing::warn!(
276 error = %e,
277 path = %export_path,
278 "metrics: failed to write export file"
279 );
280 }
281 }
282 }
283 }
284}
285
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// A clock set before the epoch yields `0`; a value that does not fit in
/// `u64` yields `u64::MAX`.
#[must_use]
pub fn unix_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX)
}
296
/// Returns how many components `path` has, as defined by [`Path::components`].
#[must_use]
pub fn path_component_count(path: &str) -> usize {
    let components = Path::new(path).components();
    components.count()
}
302
303#[must_use]
309pub fn path_file_ext(path: &str) -> Option<&'static str> {
310 let ext_os = Path::new(path).extension()?;
311 let ext_str = ext_os.to_str()?;
312 if ext_str.is_empty() {
313 return None;
314 }
315 if language_for_extension(ext_str).is_some() {
318 aptu_coder_core::lang::supported_extensions()
319 .into_iter()
320 .find(|e| e.eq_ignore_ascii_case(ext_str))
321 } else {
322 Some("other")
323 }
324}
325
/// Resolves the default metrics directory from the environment.
///
/// Order of preference: `$XDG_DATA_HOME/aptu-coder` (when the variable is set
/// and non-empty), then `$HOME/.local/share/aptu-coder`, and finally `.` when
/// `HOME` cannot be read.
fn xdg_metrics_dir() -> PathBuf {
    let xdg_data_home = std::env::var("XDG_DATA_HOME")
        .ok()
        .filter(|dir| !dir.is_empty());
    if let Some(dir) = xdg_data_home {
        return PathBuf::from(dir).join("aptu-coder");
    }

    match std::env::var("HOME") {
        Ok(home) => {
            let mut dir = PathBuf::from(home);
            dir.push(".local");
            dir.push("share");
            dir.push("aptu-coder");
            dir
        }
        Err(_) => PathBuf::from("."),
    }
}
342
343async fn cleanup_old_files(base_dir: &Path) {
344 let now_days = u32::try_from(unix_ms() / 86_400_000).unwrap_or(u32::MAX);
345
346 let Ok(mut entries) = tokio::fs::read_dir(base_dir).await else {
347 return;
348 };
349
350 loop {
351 match entries.next_entry().await {
352 Ok(Some(entry)) => {
353 let path = entry.path();
354 let file_name = match path.file_name() {
355 Some(n) => n.to_string_lossy().into_owned(),
356 None => continue,
357 };
358
359 if !file_name.starts_with("metrics-")
361 || std::path::Path::new(&*file_name)
362 .extension()
363 .is_none_or(|e| !e.eq_ignore_ascii_case("jsonl"))
364 {
365 continue;
366 }
367 let date_part = &file_name[8..file_name.len() - 6];
368 if date_part.len() != 10
369 || date_part.as_bytes().get(4) != Some(&b'-')
370 || date_part.as_bytes().get(7) != Some(&b'-')
371 {
372 continue;
373 }
374 let Ok(year) = date_part[0..4].parse::<u32>() else {
375 continue;
376 };
377 let Ok(month) = date_part[5..7].parse::<u32>() else {
378 continue;
379 };
380 let Ok(day) = date_part[8..10].parse::<u32>() else {
381 continue;
382 };
383 if month == 0 || month > 12 || day == 0 || day > 31 {
384 continue;
385 }
386
387 let file_days = date_to_days_since_epoch(year, month, day);
388 if now_days > file_days && (now_days - file_days) > 30 {
389 let _ = tokio::fs::remove_file(&path).await;
390 }
391 }
392 Ok(None) => break,
393 Err(e) => {
394 tracing::warn!("error reading metrics directory entry: {e}");
395 }
396 }
397 }
398}
399
/// Converts a proleptic Gregorian calendar date to days since 1970-01-01.
///
/// Dates before the epoch clamp to `0`; results too large for `u32` clamp to
/// `u32::MAX`. The arithmetic is carried out in `i64` so that no input can
/// underflow or overflow an intermediate value — in particular year `0` with
/// a January/February date, or a day of `0`, which would wrap in `u32`.
///
/// `m` is expected in `1..=12` and `d` in `1..=31`; other values never panic
/// but do not produce a meaningful day number.
fn date_to_days_since_epoch(y: u32, m: u32, d: u32) -> u32 {
    let (y, m, d) = (i64::from(y), i64::from(m), i64::from(d));

    // Shift the year start to March so the leap day falls at the end of the year.
    let (y, m) = if m <= 2 { (y - 1, m + 9) } else { (y, m - 3) };

    // Euclidean division keeps `yoe` in 0..400 even when `y` is -1.
    let era = y.div_euclid(400);
    let yoe = y.rem_euclid(400);
    let doy = (153 * m + 2) / 5 + d - 1;
    let doe = yoe * 365 + yoe / 4 - yoe / 100 + doy;

    // 719_468 is the day count from 0000-03-01 to 1970-01-01.
    let days = era * 146_097 + doe - 719_468;
    u32::try_from(days.max(0)).unwrap_or(u32::MAX)
}
412
413#[must_use]
415pub fn current_date_str() -> String {
416 let days = u32::try_from(unix_ms() / 86_400_000).unwrap_or(u32::MAX);
417 let z = days + 719_468;
418 let era = z / 146_097;
419 let doe = z - era * 146_097;
420 let yoe = (doe - doe / 1460 + doe / 36524 - doe / 146_096) / 365;
421 let y = yoe + era * 400;
422 let doy = doe - (365 * yoe + yoe / 4 - yoe / 100);
423 let mp = (5 * doy + 2) / 153;
424 let d = doy - (153 * mp + 2) / 5 + 1;
425 let m = if mp < 10 { mp + 3 } else { mp - 9 };
426 let y = if m <= 2 { y + 1 } else { y };
427 format!("{y:04}-{m:02}-{d:02}")
428}
429
430pub fn migrate_legacy_metrics_dir() -> std::io::Result<()> {
438 let home =
439 std::env::var("HOME").map_err(|e| std::io::Error::new(std::io::ErrorKind::NotFound, e))?;
440 migrate_legacy_metrics_dir_impl(&home)
441}
442
443#[allow(dead_code)]
444fn migrate_legacy_metrics_dir_impl(home: &str) -> std::io::Result<()> {
445 let old_dir = PathBuf::from(home).join(".local/share/code-analyze-mcp");
446 let new_dir = PathBuf::from(home).join(".local/share/aptu-coder");
447
448 let old_exists = old_dir.is_dir();
449 let new_exists = new_dir.is_dir();
450
451 if old_exists && !new_exists {
452 std::fs::rename(&old_dir, &new_dir)?;
453 tracing::info!(
454 "Migrated legacy metrics directory from {:?} to {:?}",
455 old_dir,
456 new_dir
457 );
458 } else if old_exists && new_exists {
459 tracing::warn!("Both legacy and new metrics directories exist; not migrating");
460 }
461 Ok(())
463}
464
465#[cfg(test)]
466mod tests {
467 use super::*;
468 use std::fs;
469 use std::sync::{Mutex, OnceLock};
470 use tempfile::TempDir;
471
472 fn metrics_export_lock() -> std::sync::MutexGuard<'static, ()> {
475 static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
476 let m = LOCK.get_or_init(|| Mutex::new(()));
477 m.lock().unwrap_or_else(|e| e.into_inner())
478 }
479
480 #[test]
481 fn test_migrate_legacy_only_old_exists() {
482 let tmp_home = TempDir::new().unwrap();
484 let home_str = tmp_home.path().to_str().unwrap();
485 let old_path = tmp_home.path().join(".local/share/code-analyze-mcp");
486 let new_path = tmp_home.path().join(".local/share/aptu-coder");
487 fs::create_dir_all(&old_path).unwrap();
488 assert!(!new_path.exists());
489
490 let result = migrate_legacy_metrics_dir_impl(home_str);
492
493 assert!(result.is_ok());
495 assert!(!old_path.exists(), "old dir should be moved");
496 assert!(new_path.is_dir(), "new dir should exist");
497 }
498
499 #[test]
500 fn test_migrate_legacy_both_exist() {
501 let tmp_home = TempDir::new().unwrap();
503 let home_str = tmp_home.path().to_str().unwrap();
504 let old_path = tmp_home.path().join(".local/share/code-analyze-mcp");
505 let new_path = tmp_home.path().join(".local/share/aptu-coder");
506 fs::create_dir_all(&old_path).unwrap();
507 fs::create_dir_all(&new_path).unwrap();
508
509 let result = migrate_legacy_metrics_dir_impl(home_str);
511
512 assert!(result.is_ok());
514 assert!(old_path.is_dir(), "old dir should remain");
515 assert!(new_path.is_dir(), "new dir should remain");
516 }
517
518 #[test]
519 fn test_migrate_legacy_neither_exists() {
520 let tmp_home = TempDir::new().unwrap();
522 let home_str = tmp_home.path().to_str().unwrap();
523 let old_path = tmp_home.path().join(".local/share/code-analyze-mcp");
524 let new_path = tmp_home.path().join(".local/share/aptu-coder");
525
526 let result = migrate_legacy_metrics_dir_impl(home_str);
528
529 assert!(result.is_ok());
531 assert!(!old_path.exists(), "old dir should not exist");
532 assert!(!new_path.exists(), "new dir should not exist");
533 }
534
535 #[test]
536 fn test_date_to_days_since_epoch_known_dates() {
537 assert_eq!(date_to_days_since_epoch(1970, 1, 1), 0);
538 assert_eq!(date_to_days_since_epoch(2020, 1, 1), 18_262);
539 assert_eq!(date_to_days_since_epoch(2000, 2, 29), 11_016);
540 }
541
542 #[test]
543 fn test_current_date_str_format() {
544 let s = current_date_str();
545 assert_eq!(s.len(), 10);
546 assert_eq!(s.as_bytes()[4], b'-');
547 assert_eq!(s.as_bytes()[7], b'-');
548 let year: u32 = s[0..4].parse().expect("year must be numeric");
549 assert!(year >= 2020 && year <= 2100);
550 }
551
552 #[tokio::test]
553 async fn test_metrics_writer_batching() {
554 let dir = TempDir::new().unwrap();
555 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<MetricEvent>();
556 let writer = MetricsWriter::new(rx, Some(dir.path().to_path_buf()));
557 let make_event = || MetricEvent {
558 ts: unix_ms(),
559 tool: "analyze_directory",
560 duration_ms: 1,
561 output_chars: 10,
562 param_path_depth: 1,
563 max_depth: None,
564 result: "ok",
565 error_type: None,
566 error_subtype: None,
567 session_id: None,
568 seq: None,
569 cache_hit: None,
570 cache_write_failure: None,
571 exit_code: None,
572 timed_out: false,
573 cache_tier: None,
574 output_truncated: None,
575 chars_threshold_breach: false,
576 file_ext: None,
577 };
578 tx.send(make_event()).unwrap();
579 tx.send(make_event()).unwrap();
580 tx.send(make_event()).unwrap();
581 drop(tx);
582 writer.run().await;
583 let entries: Vec<_> = std::fs::read_dir(dir.path())
584 .unwrap()
585 .filter_map(|e| e.ok())
586 .filter(|e| {
587 e.path()
588 .extension()
589 .and_then(|x| x.to_str())
590 .map(|x| x.eq_ignore_ascii_case("jsonl"))
591 .unwrap_or(false)
592 })
593 .collect();
594 assert_eq!(entries.len(), 1);
595 let content = std::fs::read_to_string(entries[0].path()).unwrap();
596 let lines: Vec<&str> = content.lines().collect();
597 assert_eq!(lines.len(), 3);
598 }
599
600 #[tokio::test]
601 async fn test_cleanup_old_files_deletes_old_keeps_recent() {
602 let dir = TempDir::new().unwrap();
603 let old_file = dir.path().join("metrics-1970-01-01.jsonl");
604 let today = current_date_str();
605 let recent_file = dir.path().join(format!("metrics-{}.jsonl", today));
606 std::fs::write(&old_file, "old\n").unwrap();
607 std::fs::write(&recent_file, "recent\n").unwrap();
608 cleanup_old_files(dir.path()).await;
609 assert!(!old_file.exists());
610 assert!(recent_file.exists());
611 }
612
613 #[test]
614 fn test_metric_event_serialization() {
615 let event = MetricEvent {
616 ts: 1_700_000_000_000,
617 tool: "analyze_directory",
618 duration_ms: 42,
619 output_chars: 100,
620 param_path_depth: 3,
621 max_depth: Some(2),
622 result: "ok",
623 error_type: None,
624 error_subtype: None,
625 session_id: None,
626 seq: None,
627 cache_hit: None,
628 cache_write_failure: None,
629 exit_code: None,
630 timed_out: false,
631 cache_tier: None,
632 output_truncated: None,
633 chars_threshold_breach: false,
634 file_ext: None,
635 };
636 let json = serde_json::to_string(&event).unwrap();
637 assert!(json.contains("analyze_directory"));
638 assert!(json.contains(r#""result":"ok""#));
639 assert!(json.contains(r#""output_chars":100"#));
640 assert!(!json.contains("error_subtype"));
642 }
643
644 #[test]
645 fn test_metric_event_serialization_error() {
646 let event = MetricEvent {
647 ts: 1_700_000_000_000,
648 tool: "analyze_directory",
649 duration_ms: 5,
650 output_chars: 0,
651 param_path_depth: 3,
652 max_depth: Some(3),
653 result: "error",
654 error_type: Some("invalid_params".to_string()),
655 error_subtype: None,
656 session_id: None,
657 seq: None,
658 cache_hit: None,
659 cache_write_failure: None,
660 exit_code: None,
661 timed_out: false,
662 cache_tier: None,
663 output_truncated: None,
664 chars_threshold_breach: false,
665 file_ext: None,
666 };
667 let json = serde_json::to_string(&event).unwrap();
668 assert!(json.contains(r#""result":"error""#));
669 assert!(json.contains(r#""error_type":"invalid_params""#));
670 assert!(json.contains(r#""output_chars":0"#));
671 assert!(!json.contains("error_subtype"));
673 }
674
675 #[test]
676 fn test_metric_event_error_subtype_some_serializes() {
677 let event = MetricEvent {
678 ts: 1_700_000_000_000,
679 tool: "edit_replace",
680 duration_ms: 10,
681 output_chars: 0,
682 param_path_depth: 2,
683 max_depth: None,
684 result: "error",
685 error_type: Some("invalid_params".to_string()),
686 error_subtype: Some("not_found".to_string()),
687 session_id: None,
688 seq: None,
689 cache_hit: None,
690 cache_write_failure: None,
691 exit_code: None,
692 timed_out: false,
693 cache_tier: None,
694 output_truncated: None,
695 chars_threshold_breach: false,
696 file_ext: None,
697 };
698 let json = serde_json::to_string(&event).unwrap();
699 assert!(json.contains(r#""error_subtype":"not_found""#));
700 }
701
702 #[test]
703 fn test_metric_event_error_subtype_ambiguous() {
704 let event = MetricEvent {
705 ts: 1_700_000_000_000,
706 tool: "edit_replace",
707 duration_ms: 10,
708 output_chars: 0,
709 param_path_depth: 2,
710 max_depth: None,
711 result: "error",
712 error_type: Some("invalid_params".to_string()),
713 error_subtype: Some("ambiguous".to_string()),
714 session_id: None,
715 seq: None,
716 cache_hit: None,
717 cache_write_failure: None,
718 exit_code: None,
719 timed_out: false,
720 cache_tier: None,
721 output_truncated: None,
722 chars_threshold_breach: false,
723 file_ext: None,
724 };
725 let json = serde_json::to_string(&event).unwrap();
726 assert!(json.contains(r#""error_subtype":"ambiguous""#));
727 }
728
729 #[test]
730 fn test_metric_event_new_fields_round_trip() {
731 let event = MetricEvent {
732 ts: 1_700_000_000_000,
733 tool: "analyze_file",
734 duration_ms: 100,
735 output_chars: 500,
736 param_path_depth: 2,
737 max_depth: Some(3),
738 result: "ok",
739 error_type: None,
740 error_subtype: None,
741 session_id: Some("1742468880123-42".to_string()),
742 seq: Some(5),
743 cache_hit: None,
744 cache_write_failure: None,
745 exit_code: None,
746 timed_out: false,
747 cache_tier: None,
748 output_truncated: None,
749 chars_threshold_breach: false,
750 file_ext: None,
751 };
752 let serialized = serde_json::to_string(&event).unwrap();
753 let json_str = r#"{"ts":1700000000000,"tool":"analyze_file","duration_ms":100,"output_chars":500,"param_path_depth":2,"max_depth":3,"result":"ok","error_type":null,"session_id":"1742468880123-42","seq":5}"#;
754 assert_eq!(serialized, json_str);
755 }
756
757 #[test]
758 fn test_path_file_ext_known() {
759 assert_eq!(path_file_ext("src/main.rs"), Some("rs"));
761 }
762
763 #[test]
764 fn test_path_file_ext_unknown() {
765 assert_eq!(path_file_ext("file.xyz"), Some("other"));
767 }
768
769 #[test]
770 fn test_path_file_ext_no_ext() {
771 assert_eq!(path_file_ext("Makefile"), None);
773 }
774
775 #[test]
776 fn test_path_file_ext_case_insensitive() {
777 assert_eq!(path_file_ext("src/main.RS"), Some("rs"));
779 }
780
781 #[test]
782 fn test_path_file_ext_multi_dot() {
783 assert_eq!(path_file_ext("file.test.rs"), Some("rs"));
785 }
786
787 #[tokio::test]
788 async fn test_metrics_export_file_created() {
789 let _guard = metrics_export_lock();
790 let dir = TempDir::new().unwrap();
792 let export_file = dir.path().join("metrics_export.json");
793 let export_path_str = export_file.to_string_lossy().to_string();
794
795 unsafe {
796 std::env::set_var("APTU_CODER_METRICS_EXPORT_FILE", &export_path_str);
797 }
798
799 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<MetricEvent>();
801 let writer = MetricsWriter::new(rx, Some(dir.path().to_path_buf()));
802
803 tx.send(MetricEvent {
805 ts: unix_ms(),
806 tool: "analyze_directory",
807 duration_ms: 100,
808 output_chars: 50,
809 param_path_depth: 1,
810 max_depth: None,
811 result: "ok",
812 error_type: None,
813 error_subtype: None,
814 session_id: Some("test-session-123".to_string()),
815 seq: None,
816 cache_hit: None,
817 cache_write_failure: None,
818 exit_code: None,
819 timed_out: false,
820 cache_tier: None,
821 output_truncated: None,
822 chars_threshold_breach: false,
823 file_ext: None,
824 })
825 .unwrap();
826 tx.send(MetricEvent {
827 ts: unix_ms(),
828 tool: "analyze_file",
829 duration_ms: 50,
830 output_chars: 100,
831 param_path_depth: 2,
832 max_depth: Some(3),
833 result: "ok",
834 error_type: None,
835 error_subtype: None,
836 session_id: Some("test-session-123".to_string()),
837 seq: None,
838 cache_hit: None,
839 cache_write_failure: None,
840 exit_code: None,
841 timed_out: false,
842 cache_tier: None,
843 output_truncated: None,
844 chars_threshold_breach: false,
845 file_ext: None,
846 })
847 .unwrap();
848 drop(tx);
849 writer.run().await;
850
851 assert!(
853 export_file.exists(),
854 "export file should be created at {:?}",
855 export_file
856 );
857 let content = std::fs::read_to_string(&export_file).unwrap();
858 let json: serde_json::Value = serde_json::from_str(&content).unwrap();
859
860 assert_eq!(
861 json["session_id"], "test-session-123",
862 "export should contain correct session_id"
863 );
864 assert!(
865 json["tool_calls"].is_array(),
866 "export should contain tool_calls array"
867 );
868 let tool_calls = json["tool_calls"].as_array().unwrap();
869 assert_eq!(tool_calls.len(), 2, "should have 2 tool calls");
870 assert!(
871 json["total_duration_ms"].is_number(),
872 "export should contain total_duration_ms"
873 );
874 assert_eq!(
875 json["total_duration_ms"], 150,
876 "total_duration_ms should be sum of all durations"
877 );
878 assert_eq!(
879 json["tool_calls"][0]["total_output_chars"], 50,
880 "first tool call should have total_output_chars=50"
881 );
882 assert_eq!(
883 json["tool_calls"][1]["total_output_chars"], 100,
884 "second tool call should have total_output_chars=100"
885 );
886 assert_eq!(
887 json["total_output_chars"], 150,
888 "total_output_chars should be sum of all output_chars"
889 );
890
891 unsafe {
893 std::env::remove_var("APTU_CODER_METRICS_EXPORT_FILE");
894 }
895 }
896
897 #[tokio::test]
898 async fn test_metrics_export_env_var_unset() {
899 let _guard = metrics_export_lock();
900 unsafe {
902 std::env::remove_var("APTU_CODER_METRICS_EXPORT_FILE");
903 }
904 let dir = TempDir::new().unwrap();
905 let marker = "metrics_export_unset_test";
907
908 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<MetricEvent>();
910 let writer = MetricsWriter::new(rx, Some(dir.path().to_path_buf()));
911
912 tx.send(MetricEvent {
914 ts: unix_ms(),
915 tool: "analyze_directory",
916 duration_ms: 100,
917 output_chars: 50,
918 param_path_depth: 1,
919 max_depth: None,
920 result: "ok",
921 error_type: None,
922 error_subtype: None,
923 session_id: Some("test-session-456".to_string()),
924 seq: None,
925 cache_hit: None,
926 cache_write_failure: None,
927 exit_code: None,
928 timed_out: false,
929 cache_tier: None,
930 output_truncated: None,
931 chars_threshold_breach: false,
932 file_ext: None,
933 })
934 .unwrap();
935 drop(tx);
936 writer.run().await;
937
938 let entries: Vec<_> = std::fs::read_dir(dir.path())
940 .unwrap()
941 .filter_map(|e| e.ok())
942 .filter(|e| {
943 e.path()
944 .file_name()
945 .and_then(|n| n.to_str())
946 .map(|n| n.contains(marker))
947 .unwrap_or(false)
948 })
949 .collect();
950 assert_eq!(
951 entries.len(),
952 0,
953 "no export file should be created when env var is unset"
954 );
955 }
956
957 #[tokio::test]
958 async fn test_metrics_export_relative_path_rejected() {
959 let _guard = metrics_export_lock();
960 let relative_path = "relative/path/metrics.json";
962 unsafe {
963 std::env::set_var("APTU_CODER_METRICS_EXPORT_FILE", relative_path);
964 }
965
966 let dir = TempDir::new().unwrap();
967 let marker = "metrics_export_relative_test";
969
970 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<MetricEvent>();
972 let writer = MetricsWriter::new(rx, Some(dir.path().to_path_buf()));
973
974 tx.send(MetricEvent {
976 ts: unix_ms(),
977 tool: "analyze_directory",
978 duration_ms: 100,
979 output_chars: 50,
980 param_path_depth: 1,
981 max_depth: None,
982 result: "ok",
983 error_type: None,
984 error_subtype: None,
985 session_id: Some(marker.to_string()),
986 seq: None,
987 cache_hit: None,
988 cache_write_failure: None,
989 exit_code: None,
990 timed_out: false,
991 cache_tier: None,
992 output_truncated: None,
993 chars_threshold_breach: false,
994 file_ext: None,
995 })
996 .unwrap();
997 drop(tx);
998 writer.run().await;
999
1000 let entries: Vec<_> = std::fs::read_dir(dir.path())
1002 .unwrap()
1003 .filter_map(|e| e.ok())
1004 .filter(|e| {
1005 e.path()
1006 .file_name()
1007 .and_then(|n| n.to_str())
1008 .map(|n| n.contains("metrics.json"))
1009 .unwrap_or(false)
1010 })
1011 .collect();
1012 assert_eq!(
1013 entries.len(),
1014 0,
1015 "no export file should be created for relative path"
1016 );
1017
1018 unsafe {
1020 std::env::remove_var("APTU_CODER_METRICS_EXPORT_FILE");
1021 }
1022 }
1023}
1024
1025fn record_otel_metrics(event: &MetricEvent) {
1035 use opentelemetry::metrics::{Counter, Histogram};
1036 use opentelemetry::{KeyValue, global};
1037 use std::sync::OnceLock;
1038
1039 static DURATION_HISTOGRAM: OnceLock<Histogram<f64>> = OnceLock::new();
1040 static CALL_COUNTER: OnceLock<Counter<u64>> = OnceLock::new();
1041 static CACHE_HITS_COUNTER: OnceLock<Counter<u64>> = OnceLock::new();
1042 static CACHE_WRITE_FAILURES_COUNTER: OnceLock<Counter<u64>> = OnceLock::new();
1043
1044 let histogram = DURATION_HISTOGRAM.get_or_init(|| {
1045 global::meter("aptu-coder")
1046 .f64_histogram("mcp.server.operation.duration")
1047 .with_unit("s")
1048 .with_boundaries(vec![
1049 0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0,
1050 ])
1051 .build()
1052 });
1053
1054 let counter = CALL_COUNTER.get_or_init(|| {
1055 global::meter("aptu-coder")
1056 .u64_counter("mcp.server.tool.calls")
1057 .build()
1058 });
1059
1060 let cache_hits_counter = CACHE_HITS_COUNTER.get_or_init(|| {
1061 global::meter("aptu-coder")
1062 .u64_counter("mcp.server.tool.cache_hits_total")
1063 .with_description("Number of tool responses served from cache (l1_memory or l2_disk)")
1064 .build()
1065 });
1066
1067 let cache_write_failures_counter = CACHE_WRITE_FAILURES_COUNTER.get_or_init(|| {
1068 global::meter("aptu-coder")
1069 .u64_counter("mcp.server.tool.cache_write_failures_total")
1070 .with_description(
1071 "Number of L2 disk cache write failures (dir, tempfile, write, rename)",
1072 )
1073 .build()
1074 });
1075
1076 let error_type = event.error_type.as_deref().unwrap_or("success");
1077 let attributes = [
1078 KeyValue::new("gen_ai.tool.name", event.tool.to_string()),
1079 KeyValue::new("error.type", error_type.to_string()),
1080 KeyValue::new("mcp.method.name", "tools/call"),
1081 KeyValue::new("mcp.protocol.version", "2025-11-25"),
1082 KeyValue::new("network.transport", "pipe"),
1083 ];
1084
1085 histogram.record(event.duration_ms as f64 / 1000.0, &attributes);
1086 counter.add(1, &attributes);
1087
1088 if event.cache_hit == Some(true) {
1089 let tier = event.cache_tier.unwrap_or("unknown");
1090 cache_hits_counter.add(
1091 1,
1092 &[
1093 KeyValue::new("gen_ai.tool.name", event.tool.to_string()),
1094 KeyValue::new("cache_tier", tier.to_string()),
1095 ],
1096 );
1097 }
1098
1099 if event.cache_write_failure == Some(true) {
1100 cache_write_failures_counter.add(
1101 1,
1102 &[KeyValue::new("gen_ai.tool.name", event.tool.to_string())],
1103 );
1104 }
1105}