1use serde::{Deserialize, Serialize};
11use std::path::{Path, PathBuf};
12use std::time::{SystemTime, UNIX_EPOCH};
13use tokio::io::AsyncWriteExt;
14use tokio::sync::mpsc;
15
16#[derive(Debug, Clone, Default, Serialize, Deserialize)]
18#[serde(default)]
19pub struct MetricEvent {
20 pub ts: u64,
21 pub tool: &'static str,
22 pub duration_ms: u64,
23 pub output_chars: usize,
24 pub param_path_depth: usize,
25 pub max_depth: Option<u32>,
26 pub result: &'static str,
27 pub error_type: Option<String>,
28 #[serde(default)]
29 pub session_id: Option<String>,
30 #[serde(default)]
31 pub seq: Option<u32>,
32 #[serde(default)]
33 #[serde(skip_serializing_if = "Option::is_none")]
34 pub cache_hit: Option<bool>,
35 #[serde(default, skip_serializing_if = "Option::is_none")]
36 pub cache_tier: Option<&'static str>,
37 #[serde(default, skip_serializing_if = "Option::is_none")]
40 pub cache_write_failure: Option<bool>,
41 #[serde(default, skip_serializing_if = "Option::is_none")]
42 pub exit_code: Option<i32>,
43 #[serde(default, skip_serializing_if = "std::ops::Not::not")]
44 pub timed_out: bool,
45 #[serde(default, skip_serializing_if = "Option::is_none")]
46 pub output_truncated: Option<bool>,
47 #[serde(default, skip_serializing_if = "std::ops::Not::not")]
51 pub chars_threshold_breach: bool,
52}
53
54#[derive(Clone)]
56pub struct MetricsSender(pub tokio::sync::mpsc::UnboundedSender<MetricEvent>);
57
58impl MetricsSender {
59 pub fn send(&self, event: MetricEvent) {
60 let _ = self.0.send(event);
61 }
62}
63
64pub struct MetricsWriter {
66 rx: tokio::sync::mpsc::UnboundedReceiver<MetricEvent>,
67 base_dir: PathBuf,
68 dir_created: bool,
69}
70
71impl MetricsWriter {
72 pub fn new(
73 rx: tokio::sync::mpsc::UnboundedReceiver<MetricEvent>,
74 base_dir: Option<PathBuf>,
75 ) -> Self {
76 let dir = base_dir.unwrap_or_else(xdg_metrics_dir);
77 Self {
78 rx,
79 base_dir: dir,
80 dir_created: false,
81 }
82 }
83
84 fn accumulate_event(
86 tool_counts: &mut std::collections::HashMap<&'static str, (u64, u64)>,
87 export_session_id: &mut Option<String>,
88 event: &MetricEvent,
89 ) {
90 let entry = tool_counts.entry(event.tool).or_insert((0, 0));
91 entry.0 += 1;
92 entry.1 += event.duration_ms;
93 if export_session_id.is_none() {
94 *export_session_id = event.session_id.clone();
95 }
96 }
97
98 async fn flush_batch(file: &mut tokio::fs::File, batch: Vec<MetricEvent>) {
100 for event in batch {
101 record_otel_metrics(&event);
103
104 if let Ok(mut json) = serde_json::to_string(&event) {
106 json.push('\n');
107 let _ = file.write_all(json.as_bytes()).await;
108 }
109 }
110 let _ = file.flush().await;
111 }
112
113 fn rotate_metrics_file(
116 base_dir: &std::path::Path,
117 current_date: &mut String,
118 current_file: &mut Option<PathBuf>,
119 dir_created: &mut bool,
120 ) -> PathBuf {
121 let new_date = current_date_str();
122 if new_date != *current_date {
123 *current_date = new_date;
124 *current_file = None;
125 *dir_created = false;
126 }
127
128 if current_file.is_none() {
129 *current_file = Some(base_dir.join(format!("metrics-{}.jsonl", current_date)));
130 }
131
132 current_file
133 .as_ref()
134 .expect("current_file is guaranteed Some after check above")
135 .clone()
136 }
137
138 async fn receive_batch(
140 rx: &mut tokio::sync::mpsc::UnboundedReceiver<MetricEvent>,
141 tool_counts: &mut std::collections::HashMap<&'static str, (u64, u64)>,
142 export_session_id: &mut Option<String>,
143 ) -> Option<Vec<MetricEvent>> {
144 let mut batch = Vec::new();
145 if let Some(event) = rx.recv().await {
146 Self::accumulate_event(tool_counts, export_session_id, &event);
147 batch.push(event);
148 for _ in 0..99 {
149 match rx.try_recv() {
150 Ok(e) => {
151 Self::accumulate_event(tool_counts, export_session_id, &e);
152 batch.push(e);
153 }
154 Err(
155 mpsc::error::TryRecvError::Empty | mpsc::error::TryRecvError::Disconnected,
156 ) => break,
157 }
158 }
159 Some(batch)
160 } else {
161 None
162 }
163 }
164
165 async fn ensure_metrics_dir(path: &std::path::Path, dir_created: &mut bool) {
167 if !*dir_created
168 && let Some(parent) = path.parent()
169 && !parent.as_os_str().is_empty()
170 {
171 match tokio::fs::create_dir_all(parent).await {
172 Ok(()) => {
173 *dir_created = true;
174 }
175 Err(e) => {
176 tracing::warn!(
177 error = %e,
178 path = %parent.display(),
179 "metrics: failed to create directory; will retry next batch"
180 );
181 }
182 }
183 }
184 }
185
186 pub async fn run(mut self) {
187 cleanup_old_files(&self.base_dir).await;
188 let mut current_date = current_date_str();
189 let mut current_file: Option<PathBuf> = None;
190
191 let mut tool_counts: std::collections::HashMap<&'static str, (u64, u64)> =
193 std::collections::HashMap::new();
194 let mut export_session_id: Option<String> = None;
195
196 loop {
197 let Some(batch) =
198 Self::receive_batch(&mut self.rx, &mut tool_counts, &mut export_session_id).await
199 else {
200 break;
201 };
202
203 let path = Self::rotate_metrics_file(
204 &self.base_dir,
205 &mut current_date,
206 &mut current_file,
207 &mut self.dir_created,
208 );
209
210 Self::ensure_metrics_dir(&path, &mut self.dir_created).await;
211
212 let file = tokio::fs::OpenOptions::new()
214 .create(true)
215 .append(true)
216 .open(&path)
217 .await;
218
219 if let Ok(mut file) = file {
220 Self::flush_batch(&mut file, batch).await;
221 }
222 }
223
224 if let Ok(export_path) = std::env::var("APTU_CODER_METRICS_EXPORT_FILE") {
226 if !std::path::Path::new(&export_path).is_absolute() {
227 tracing::warn!(
228 path = %export_path,
229 "metrics: APTU_CODER_METRICS_EXPORT_FILE must be an absolute path; skipping export"
230 );
231 } else {
232 let mut tool_calls = Vec::new();
233 let mut total_duration_ms = 0u64;
234 for (tool_name, (count, duration)) in tool_counts {
235 tool_calls.push(serde_json::json!({
236 "tool": tool_name,
237 "call_count": count,
238 "total_duration_ms": duration
239 }));
240 total_duration_ms += duration;
241 }
242 let summary = serde_json::json!({
243 "session_id": export_session_id.unwrap_or_default(),
244 "tool_calls": tool_calls,
245 "total_duration_ms": total_duration_ms
246 });
247 if let Ok(json_str) = serde_json::to_string(&summary)
248 && let Err(e) = tokio::fs::write(&export_path, json_str).await
249 {
250 tracing::warn!(
251 error = %e,
252 path = %export_path,
253 "metrics: failed to write export file"
254 );
255 }
256 }
257 }
258 }
259}
260
261#[must_use]
263pub fn unix_ms() -> u64 {
264 SystemTime::now()
265 .duration_since(UNIX_EPOCH)
266 .unwrap_or_default()
267 .as_millis()
268 .try_into()
269 .unwrap_or(u64::MAX)
270}
271
272#[must_use]
274pub fn path_component_count(path: &str) -> usize {
275 Path::new(path).components().count()
276}
277
278fn xdg_metrics_dir() -> PathBuf {
279 if let Ok(xdg_data_home) = std::env::var("XDG_DATA_HOME")
280 && !xdg_data_home.is_empty()
281 {
282 return PathBuf::from(xdg_data_home).join("aptu-coder");
283 }
284
285 if let Ok(home) = std::env::var("HOME") {
286 PathBuf::from(home)
287 .join(".local")
288 .join("share")
289 .join("aptu-coder")
290 } else {
291 PathBuf::from(".")
292 }
293}
294
295async fn cleanup_old_files(base_dir: &Path) {
296 let now_days = u32::try_from(unix_ms() / 86_400_000).unwrap_or(u32::MAX);
297
298 let Ok(mut entries) = tokio::fs::read_dir(base_dir).await else {
299 return;
300 };
301
302 loop {
303 match entries.next_entry().await {
304 Ok(Some(entry)) => {
305 let path = entry.path();
306 let file_name = match path.file_name() {
307 Some(n) => n.to_string_lossy().into_owned(),
308 None => continue,
309 };
310
311 if !file_name.starts_with("metrics-")
313 || std::path::Path::new(&*file_name)
314 .extension()
315 .is_none_or(|e| !e.eq_ignore_ascii_case("jsonl"))
316 {
317 continue;
318 }
319 let date_part = &file_name[8..file_name.len() - 6];
320 if date_part.len() != 10
321 || date_part.as_bytes().get(4) != Some(&b'-')
322 || date_part.as_bytes().get(7) != Some(&b'-')
323 {
324 continue;
325 }
326 let Ok(year) = date_part[0..4].parse::<u32>() else {
327 continue;
328 };
329 let Ok(month) = date_part[5..7].parse::<u32>() else {
330 continue;
331 };
332 let Ok(day) = date_part[8..10].parse::<u32>() else {
333 continue;
334 };
335 if month == 0 || month > 12 || day == 0 || day > 31 {
336 continue;
337 }
338
339 let file_days = date_to_days_since_epoch(year, month, day);
340 if now_days > file_days && (now_days - file_days) > 30 {
341 let _ = tokio::fs::remove_file(&path).await;
342 }
343 }
344 Ok(None) => break,
345 Err(e) => {
346 tracing::warn!("error reading metrics directory entry: {e}");
347 }
348 }
349 }
350}
351
352fn date_to_days_since_epoch(y: u32, m: u32, d: u32) -> u32 {
353 let (y, m) = if m <= 2 { (y - 1, m + 9) } else { (y, m - 3) };
355 let era = y / 400;
356 let yoe = y - era * 400;
357 let doy = (153 * m + 2) / 5 + d - 1;
358 let doe = yoe * 365 + yoe / 4 - yoe / 100 + doy;
359 (era * 146_097 + doe).saturating_sub(719_468)
363}
364
365#[must_use]
367pub fn current_date_str() -> String {
368 let days = u32::try_from(unix_ms() / 86_400_000).unwrap_or(u32::MAX);
369 let z = days + 719_468;
370 let era = z / 146_097;
371 let doe = z - era * 146_097;
372 let yoe = (doe - doe / 1460 + doe / 36524 - doe / 146_096) / 365;
373 let y = yoe + era * 400;
374 let doy = doe - (365 * yoe + yoe / 4 - yoe / 100);
375 let mp = (5 * doy + 2) / 153;
376 let d = doy - (153 * mp + 2) / 5 + 1;
377 let m = if mp < 10 { mp + 3 } else { mp - 9 };
378 let y = if m <= 2 { y + 1 } else { y };
379 format!("{y:04}-{m:02}-{d:02}")
380}
381
382pub fn migrate_legacy_metrics_dir() -> std::io::Result<()> {
390 let home =
391 std::env::var("HOME").map_err(|e| std::io::Error::new(std::io::ErrorKind::NotFound, e))?;
392 migrate_legacy_metrics_dir_impl(&home)
393}
394
395#[allow(dead_code)]
396fn migrate_legacy_metrics_dir_impl(home: &str) -> std::io::Result<()> {
397 let old_dir = PathBuf::from(home).join(".local/share/code-analyze-mcp");
398 let new_dir = PathBuf::from(home).join(".local/share/aptu-coder");
399
400 let old_exists = old_dir.is_dir();
401 let new_exists = new_dir.is_dir();
402
403 if old_exists && !new_exists {
404 std::fs::rename(&old_dir, &new_dir)?;
405 tracing::info!(
406 "Migrated legacy metrics directory from {:?} to {:?}",
407 old_dir,
408 new_dir
409 );
410 } else if old_exists && new_exists {
411 tracing::warn!("Both legacy and new metrics directories exist; not migrating");
412 }
413 Ok(())
415}
416
417#[cfg(test)]
418mod tests {
419 use super::*;
420 use std::fs;
421 use std::sync::{Mutex, OnceLock};
422 use tempfile::TempDir;
423
424 fn metrics_export_lock() -> std::sync::MutexGuard<'static, ()> {
427 static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
428 let m = LOCK.get_or_init(|| Mutex::new(()));
429 m.lock().unwrap_or_else(|e| e.into_inner())
430 }
431
432 #[test]
433 fn test_migrate_legacy_only_old_exists() {
434 let tmp_home = TempDir::new().unwrap();
436 let home_str = tmp_home.path().to_str().unwrap();
437 let old_path = tmp_home.path().join(".local/share/code-analyze-mcp");
438 let new_path = tmp_home.path().join(".local/share/aptu-coder");
439 fs::create_dir_all(&old_path).unwrap();
440 assert!(!new_path.exists());
441
442 let result = migrate_legacy_metrics_dir_impl(home_str);
444
445 assert!(result.is_ok());
447 assert!(!old_path.exists(), "old dir should be moved");
448 assert!(new_path.is_dir(), "new dir should exist");
449 }
450
451 #[test]
452 fn test_migrate_legacy_both_exist() {
453 let tmp_home = TempDir::new().unwrap();
455 let home_str = tmp_home.path().to_str().unwrap();
456 let old_path = tmp_home.path().join(".local/share/code-analyze-mcp");
457 let new_path = tmp_home.path().join(".local/share/aptu-coder");
458 fs::create_dir_all(&old_path).unwrap();
459 fs::create_dir_all(&new_path).unwrap();
460
461 let result = migrate_legacy_metrics_dir_impl(home_str);
463
464 assert!(result.is_ok());
466 assert!(old_path.is_dir(), "old dir should remain");
467 assert!(new_path.is_dir(), "new dir should remain");
468 }
469
470 #[test]
471 fn test_migrate_legacy_neither_exists() {
472 let tmp_home = TempDir::new().unwrap();
474 let home_str = tmp_home.path().to_str().unwrap();
475 let old_path = tmp_home.path().join(".local/share/code-analyze-mcp");
476 let new_path = tmp_home.path().join(".local/share/aptu-coder");
477
478 let result = migrate_legacy_metrics_dir_impl(home_str);
480
481 assert!(result.is_ok());
483 assert!(!old_path.exists(), "old dir should not exist");
484 assert!(!new_path.exists(), "new dir should not exist");
485 }
486
487 #[test]
488 fn test_date_to_days_since_epoch_known_dates() {
489 assert_eq!(date_to_days_since_epoch(1970, 1, 1), 0);
490 assert_eq!(date_to_days_since_epoch(2020, 1, 1), 18_262);
491 assert_eq!(date_to_days_since_epoch(2000, 2, 29), 11_016);
492 }
493
494 #[test]
495 fn test_current_date_str_format() {
496 let s = current_date_str();
497 assert_eq!(s.len(), 10);
498 assert_eq!(s.as_bytes()[4], b'-');
499 assert_eq!(s.as_bytes()[7], b'-');
500 let year: u32 = s[0..4].parse().expect("year must be numeric");
501 assert!(year >= 2020 && year <= 2100);
502 }
503
504 #[tokio::test]
505 async fn test_metrics_writer_batching() {
506 let dir = TempDir::new().unwrap();
507 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<MetricEvent>();
508 let writer = MetricsWriter::new(rx, Some(dir.path().to_path_buf()));
509 let make_event = || MetricEvent {
510 ts: unix_ms(),
511 tool: "analyze_directory",
512 duration_ms: 1,
513 output_chars: 10,
514 param_path_depth: 1,
515 max_depth: None,
516 result: "ok",
517 error_type: None,
518 session_id: None,
519 seq: None,
520 cache_hit: None,
521 cache_write_failure: None,
522 exit_code: None,
523 timed_out: false,
524 cache_tier: None,
525 output_truncated: None,
526 chars_threshold_breach: false,
527 };
528 tx.send(make_event()).unwrap();
529 tx.send(make_event()).unwrap();
530 tx.send(make_event()).unwrap();
531 drop(tx);
532 writer.run().await;
533 let entries: Vec<_> = std::fs::read_dir(dir.path())
534 .unwrap()
535 .filter_map(|e| e.ok())
536 .filter(|e| {
537 e.path()
538 .extension()
539 .and_then(|x| x.to_str())
540 .map(|x| x.eq_ignore_ascii_case("jsonl"))
541 .unwrap_or(false)
542 })
543 .collect();
544 assert_eq!(entries.len(), 1);
545 let content = std::fs::read_to_string(entries[0].path()).unwrap();
546 let lines: Vec<&str> = content.lines().collect();
547 assert_eq!(lines.len(), 3);
548 }
549
550 #[tokio::test]
551 async fn test_cleanup_old_files_deletes_old_keeps_recent() {
552 let dir = TempDir::new().unwrap();
553 let old_file = dir.path().join("metrics-1970-01-01.jsonl");
554 let today = current_date_str();
555 let recent_file = dir.path().join(format!("metrics-{}.jsonl", today));
556 std::fs::write(&old_file, "old\n").unwrap();
557 std::fs::write(&recent_file, "recent\n").unwrap();
558 cleanup_old_files(dir.path()).await;
559 assert!(!old_file.exists());
560 assert!(recent_file.exists());
561 }
562
563 #[test]
564 fn test_metric_event_serialization() {
565 let event = MetricEvent {
566 ts: 1_700_000_000_000,
567 tool: "analyze_directory",
568 duration_ms: 42,
569 output_chars: 100,
570 param_path_depth: 3,
571 max_depth: Some(2),
572 result: "ok",
573 error_type: None,
574 session_id: None,
575 seq: None,
576 cache_hit: None,
577 cache_write_failure: None,
578 exit_code: None,
579 timed_out: false,
580 cache_tier: None,
581 output_truncated: None,
582 chars_threshold_breach: false,
583 };
584 let json = serde_json::to_string(&event).unwrap();
585 assert!(json.contains("analyze_directory"));
586 assert!(json.contains(r#""result":"ok""#));
587 assert!(json.contains(r#""output_chars":100"#));
588 }
589
590 #[test]
591 fn test_metric_event_serialization_error() {
592 let event = MetricEvent {
593 ts: 1_700_000_000_000,
594 tool: "analyze_directory",
595 duration_ms: 5,
596 output_chars: 0,
597 param_path_depth: 3,
598 max_depth: Some(3),
599 result: "error",
600 error_type: Some("invalid_params".to_string()),
601 session_id: None,
602 seq: None,
603 cache_hit: None,
604 cache_write_failure: None,
605 exit_code: None,
606 timed_out: false,
607 cache_tier: None,
608 output_truncated: None,
609 chars_threshold_breach: false,
610 };
611 let json = serde_json::to_string(&event).unwrap();
612 assert!(json.contains(r#""result":"error""#));
613 assert!(json.contains(r#""error_type":"invalid_params""#));
614 assert!(json.contains(r#""output_chars":0"#));
615 assert!(json.contains(r#""max_depth":3"#));
616 }
617
618 #[test]
619 fn test_metric_event_new_fields_round_trip() {
620 let event = MetricEvent {
621 ts: 1_700_000_000_000,
622 tool: "analyze_file",
623 duration_ms: 100,
624 output_chars: 500,
625 param_path_depth: 2,
626 max_depth: Some(3),
627 result: "ok",
628 error_type: None,
629 session_id: Some("1742468880123-42".to_string()),
630 seq: Some(5),
631 cache_hit: None,
632 cache_write_failure: None,
633 exit_code: None,
634 timed_out: false,
635 cache_tier: None,
636 output_truncated: None,
637 chars_threshold_breach: false,
638 };
639 let serialized = serde_json::to_string(&event).unwrap();
640 let json_str = r#"{"ts":1700000000000,"tool":"analyze_file","duration_ms":100,"output_chars":500,"param_path_depth":2,"max_depth":3,"result":"ok","error_type":null,"session_id":"1742468880123-42","seq":5}"#;
641 assert_eq!(serialized, json_str);
642 }
643
644 #[tokio::test]
645 async fn test_metrics_export_file_created() {
646 let _guard = metrics_export_lock();
647 let dir = TempDir::new().unwrap();
649 let export_file = dir.path().join("metrics_export.json");
650 let export_path_str = export_file.to_string_lossy().to_string();
651
652 unsafe {
653 std::env::set_var("APTU_CODER_METRICS_EXPORT_FILE", &export_path_str);
654 }
655
656 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<MetricEvent>();
658 let writer = MetricsWriter::new(rx, Some(dir.path().to_path_buf()));
659
660 tx.send(MetricEvent {
662 ts: unix_ms(),
663 tool: "analyze_directory",
664 duration_ms: 100,
665 output_chars: 50,
666 param_path_depth: 1,
667 max_depth: None,
668 result: "ok",
669 error_type: None,
670 session_id: Some("test-session-123".to_string()),
671 seq: None,
672 cache_hit: None,
673 cache_write_failure: None,
674 exit_code: None,
675 timed_out: false,
676 cache_tier: None,
677 output_truncated: None,
678 chars_threshold_breach: false,
679 })
680 .unwrap();
681 tx.send(MetricEvent {
682 ts: unix_ms(),
683 tool: "analyze_file",
684 duration_ms: 50,
685 output_chars: 100,
686 param_path_depth: 2,
687 max_depth: Some(3),
688 result: "ok",
689 error_type: None,
690 session_id: Some("test-session-123".to_string()),
691 seq: None,
692 cache_hit: None,
693 cache_write_failure: None,
694 exit_code: None,
695 timed_out: false,
696 cache_tier: None,
697 output_truncated: None,
698 chars_threshold_breach: false,
699 })
700 .unwrap();
701 drop(tx);
702 writer.run().await;
703
704 assert!(
706 export_file.exists(),
707 "export file should be created at {:?}",
708 export_file
709 );
710 let content = std::fs::read_to_string(&export_file).unwrap();
711 let json: serde_json::Value = serde_json::from_str(&content).unwrap();
712
713 assert_eq!(
714 json["session_id"], "test-session-123",
715 "export should contain correct session_id"
716 );
717 assert!(
718 json["tool_calls"].is_array(),
719 "export should contain tool_calls array"
720 );
721 let tool_calls = json["tool_calls"].as_array().unwrap();
722 assert_eq!(tool_calls.len(), 2, "should have 2 tool calls");
723 assert!(
724 json["total_duration_ms"].is_number(),
725 "export should contain total_duration_ms"
726 );
727 assert_eq!(
728 json["total_duration_ms"], 150,
729 "total_duration_ms should be sum of all durations"
730 );
731
732 unsafe {
734 std::env::remove_var("APTU_CODER_METRICS_EXPORT_FILE");
735 }
736 }
737
738 #[tokio::test]
739 async fn test_metrics_export_env_var_unset() {
740 let _guard = metrics_export_lock();
741 unsafe {
743 std::env::remove_var("APTU_CODER_METRICS_EXPORT_FILE");
744 }
745 let dir = TempDir::new().unwrap();
746 let marker = "metrics_export_unset_test";
748
749 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<MetricEvent>();
751 let writer = MetricsWriter::new(rx, Some(dir.path().to_path_buf()));
752
753 tx.send(MetricEvent {
755 ts: unix_ms(),
756 tool: "analyze_directory",
757 duration_ms: 100,
758 output_chars: 50,
759 param_path_depth: 1,
760 max_depth: None,
761 result: "ok",
762 error_type: None,
763 session_id: Some("test-session-456".to_string()),
764 seq: None,
765 cache_hit: None,
766 cache_write_failure: None,
767 exit_code: None,
768 timed_out: false,
769 cache_tier: None,
770 output_truncated: None,
771 chars_threshold_breach: false,
772 })
773 .unwrap();
774 drop(tx);
775 writer.run().await;
776
777 let entries: Vec<_> = std::fs::read_dir(dir.path())
779 .unwrap()
780 .filter_map(|e| e.ok())
781 .filter(|e| {
782 e.path()
783 .file_name()
784 .and_then(|n| n.to_str())
785 .map(|n| n.contains(marker))
786 .unwrap_or(false)
787 })
788 .collect();
789 assert_eq!(
790 entries.len(),
791 0,
792 "no export file should be created when env var is unset"
793 );
794 }
795
796 #[tokio::test]
797 async fn test_metrics_export_relative_path_rejected() {
798 let _guard = metrics_export_lock();
799 let relative_path = "relative/path/metrics.json";
801 unsafe {
802 std::env::set_var("APTU_CODER_METRICS_EXPORT_FILE", relative_path);
803 }
804
805 let dir = TempDir::new().unwrap();
806 let marker = "metrics_export_relative_test";
808
809 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<MetricEvent>();
811 let writer = MetricsWriter::new(rx, Some(dir.path().to_path_buf()));
812
813 tx.send(MetricEvent {
815 ts: unix_ms(),
816 tool: "analyze_directory",
817 duration_ms: 100,
818 output_chars: 50,
819 param_path_depth: 1,
820 max_depth: None,
821 result: "ok",
822 error_type: None,
823 session_id: Some(marker.to_string()),
824 seq: None,
825 cache_hit: None,
826 cache_write_failure: None,
827 exit_code: None,
828 timed_out: false,
829 cache_tier: None,
830 output_truncated: None,
831 chars_threshold_breach: false,
832 })
833 .unwrap();
834 drop(tx);
835 writer.run().await;
836
837 let entries: Vec<_> = std::fs::read_dir(dir.path())
839 .unwrap()
840 .filter_map(|e| e.ok())
841 .filter(|e| {
842 e.path()
843 .file_name()
844 .and_then(|n| n.to_str())
845 .map(|n| n.contains("metrics.json"))
846 .unwrap_or(false)
847 })
848 .collect();
849 assert_eq!(
850 entries.len(),
851 0,
852 "no export file should be created for relative path"
853 );
854
855 unsafe {
857 std::env::remove_var("APTU_CODER_METRICS_EXPORT_FILE");
858 }
859 }
860}
861
862fn record_otel_metrics(event: &MetricEvent) {
872 use opentelemetry::metrics::{Counter, Histogram};
873 use opentelemetry::{KeyValue, global};
874 use std::sync::OnceLock;
875
876 static DURATION_HISTOGRAM: OnceLock<Histogram<f64>> = OnceLock::new();
877 static CALL_COUNTER: OnceLock<Counter<u64>> = OnceLock::new();
878 static CACHE_HITS_COUNTER: OnceLock<Counter<u64>> = OnceLock::new();
879 static CACHE_WRITE_FAILURES_COUNTER: OnceLock<Counter<u64>> = OnceLock::new();
880
881 let histogram = DURATION_HISTOGRAM.get_or_init(|| {
882 global::meter("aptu-coder")
883 .f64_histogram("mcp.server.operation.duration")
884 .with_unit("s")
885 .with_boundaries(vec![
886 0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0,
887 ])
888 .build()
889 });
890
891 let counter = CALL_COUNTER.get_or_init(|| {
892 global::meter("aptu-coder")
893 .u64_counter("mcp.server.tool.calls")
894 .build()
895 });
896
897 let cache_hits_counter = CACHE_HITS_COUNTER.get_or_init(|| {
898 global::meter("aptu-coder")
899 .u64_counter("mcp.server.tool.cache_hits_total")
900 .with_description("Number of tool responses served from cache (l1_memory or l2_disk)")
901 .build()
902 });
903
904 let cache_write_failures_counter = CACHE_WRITE_FAILURES_COUNTER.get_or_init(|| {
905 global::meter("aptu-coder")
906 .u64_counter("mcp.server.tool.cache_write_failures_total")
907 .with_description(
908 "Number of L2 disk cache write failures (dir, tempfile, write, rename)",
909 )
910 .build()
911 });
912
913 let error_type = event.error_type.as_deref().unwrap_or("success");
914 let attributes = [
915 KeyValue::new("gen_ai.tool.name", event.tool.to_string()),
916 KeyValue::new("error.type", error_type.to_string()),
917 KeyValue::new("mcp.method.name", "tools/call"),
918 KeyValue::new("mcp.protocol.version", "2025-11-25"),
919 KeyValue::new("network.transport", "pipe"),
920 ];
921
922 histogram.record(event.duration_ms as f64 / 1000.0, &attributes);
923 counter.add(1, &attributes);
924
925 if event.cache_hit == Some(true) {
926 let tier = event.cache_tier.unwrap_or("unknown");
927 cache_hits_counter.add(
928 1,
929 &[
930 KeyValue::new("gen_ai.tool.name", event.tool.to_string()),
931 KeyValue::new("cache_tier", tier.to_string()),
932 ],
933 );
934 }
935
936 if event.cache_write_failure == Some(true) {
937 cache_write_failures_counter.add(
938 1,
939 &[KeyValue::new("gen_ai.tool.name", event.tool.to_string())],
940 );
941 }
942}