1use serde::{Deserialize, Serialize};
11use std::path::{Path, PathBuf};
12use std::time::{SystemTime, UNIX_EPOCH};
13use tokio::io::AsyncWriteExt;
14use tokio::sync::mpsc;
15
16#[derive(Debug, Clone, Serialize, Deserialize)]
18pub struct MetricEvent {
19 pub ts: u64,
20 pub tool: &'static str,
21 pub duration_ms: u64,
22 pub output_chars: usize,
23 pub param_path_depth: usize,
24 pub max_depth: Option<u32>,
25 pub result: &'static str,
26 pub error_type: Option<String>,
27 #[serde(default)]
28 pub session_id: Option<String>,
29 #[serde(default)]
30 pub seq: Option<u32>,
31 #[serde(default)]
32 #[serde(skip_serializing_if = "Option::is_none")]
33 pub cache_hit: Option<bool>,
34}
35
36#[derive(Clone)]
38pub struct MetricsSender(pub tokio::sync::mpsc::UnboundedSender<MetricEvent>);
39
40impl MetricsSender {
41 pub fn send(&self, event: MetricEvent) {
42 let _ = self.0.send(event);
43 }
44}
45
46pub struct MetricsWriter {
48 rx: tokio::sync::mpsc::UnboundedReceiver<MetricEvent>,
49 base_dir: PathBuf,
50 dir_created: bool,
51}
52
53impl MetricsWriter {
54 pub fn new(
55 rx: tokio::sync::mpsc::UnboundedReceiver<MetricEvent>,
56 base_dir: Option<PathBuf>,
57 ) -> Self {
58 let dir = base_dir.unwrap_or_else(xdg_metrics_dir);
59 Self {
60 rx,
61 base_dir: dir,
62 dir_created: false,
63 }
64 }
65
66 fn accumulate_event(
68 tool_counts: &mut std::collections::HashMap<&'static str, (u64, u64)>,
69 export_session_id: &mut Option<String>,
70 event: &MetricEvent,
71 ) {
72 let entry = tool_counts.entry(event.tool).or_insert((0, 0));
73 entry.0 += 1;
74 entry.1 += event.duration_ms;
75 if export_session_id.is_none() {
76 *export_session_id = event.session_id.clone();
77 }
78 }
79
80 pub async fn run(mut self) {
81 cleanup_old_files(&self.base_dir).await;
82 let mut current_date = current_date_str();
83 let mut current_file: Option<PathBuf> = None;
84
85 let mut tool_counts: std::collections::HashMap<&'static str, (u64, u64)> =
87 std::collections::HashMap::new();
88 let mut export_session_id: Option<String> = None;
89
90 loop {
91 let mut batch = Vec::new();
92 if let Some(event) = self.rx.recv().await {
93 Self::accumulate_event(&mut tool_counts, &mut export_session_id, &event);
94 batch.push(event);
95 for _ in 0..99 {
96 match self.rx.try_recv() {
97 Ok(e) => {
98 Self::accumulate_event(&mut tool_counts, &mut export_session_id, &e);
99 batch.push(e);
100 }
101 Err(
102 mpsc::error::TryRecvError::Empty
103 | mpsc::error::TryRecvError::Disconnected,
104 ) => break,
105 }
106 }
107 } else {
108 break;
109 }
110
111 let new_date = current_date_str();
112 if new_date != current_date {
113 current_date = new_date;
114 current_file = None;
115 self.dir_created = false;
116 }
117
118 if current_file.is_none() {
119 current_file = Some(
120 self.base_dir
121 .join(format!("metrics-{}.jsonl", current_date)),
122 );
123 }
124
125 let path = current_file.as_ref().unwrap();
126
127 if !self.dir_created
129 && let Some(parent) = path.parent()
130 && !parent.as_os_str().is_empty()
131 {
132 match tokio::fs::create_dir_all(parent).await {
133 Ok(()) => {
134 self.dir_created = true;
135 }
136 Err(e) => {
137 tracing::warn!(
138 error = %e,
139 path = %parent.display(),
140 "metrics: failed to create directory; will retry next batch"
141 );
142 }
143 }
144 }
145
146 let file = tokio::fs::OpenOptions::new()
148 .create(true)
149 .append(true)
150 .open(path)
151 .await;
152
153 if let Ok(mut file) = file {
154 for event in batch {
155 record_otel_metrics(&event);
157
158 if let Ok(mut json) = serde_json::to_string(&event) {
160 json.push('\n');
161 let _ = file.write_all(json.as_bytes()).await;
162 }
163 }
164 let _ = file.flush().await;
165 }
166 }
167
168 if let Ok(export_path) = std::env::var("APTU_CODER_METRICS_EXPORT_FILE") {
170 if !std::path::Path::new(&export_path).is_absolute() {
171 tracing::warn!(
172 path = %export_path,
173 "metrics: APTU_CODER_METRICS_EXPORT_FILE must be an absolute path; skipping export"
174 );
175 } else {
176 let mut tool_calls = Vec::new();
177 let mut total_duration_ms = 0u64;
178 for (tool_name, (count, duration)) in tool_counts {
179 tool_calls.push(serde_json::json!({
180 "tool": tool_name,
181 "call_count": count,
182 "total_duration_ms": duration
183 }));
184 total_duration_ms += duration;
185 }
186 let summary = serde_json::json!({
187 "session_id": export_session_id.unwrap_or_default(),
188 "tool_calls": tool_calls,
189 "total_duration_ms": total_duration_ms
190 });
191 if let Ok(json_str) = serde_json::to_string(&summary)
192 && let Err(e) = tokio::fs::write(&export_path, json_str).await
193 {
194 tracing::warn!(
195 error = %e,
196 path = %export_path,
197 "metrics: failed to write export file"
198 );
199 }
200 }
201 }
202 }
203}
204
205#[must_use]
207pub fn unix_ms() -> u64 {
208 SystemTime::now()
209 .duration_since(UNIX_EPOCH)
210 .unwrap_or_default()
211 .as_millis()
212 .try_into()
213 .unwrap_or(u64::MAX)
214}
215
216#[must_use]
218pub fn path_component_count(path: &str) -> usize {
219 Path::new(path).components().count()
220}
221
222fn xdg_metrics_dir() -> PathBuf {
223 if let Ok(xdg_data_home) = std::env::var("XDG_DATA_HOME")
224 && !xdg_data_home.is_empty()
225 {
226 return PathBuf::from(xdg_data_home).join("aptu-coder");
227 }
228
229 if let Ok(home) = std::env::var("HOME") {
230 PathBuf::from(home)
231 .join(".local")
232 .join("share")
233 .join("aptu-coder")
234 } else {
235 PathBuf::from(".")
236 }
237}
238
239async fn cleanup_old_files(base_dir: &Path) {
240 let now_days = u32::try_from(unix_ms() / 86_400_000).unwrap_or(u32::MAX);
241
242 let Ok(mut entries) = tokio::fs::read_dir(base_dir).await else {
243 return;
244 };
245
246 loop {
247 match entries.next_entry().await {
248 Ok(Some(entry)) => {
249 let path = entry.path();
250 let file_name = match path.file_name() {
251 Some(n) => n.to_string_lossy().into_owned(),
252 None => continue,
253 };
254
255 if !file_name.starts_with("metrics-")
257 || std::path::Path::new(&*file_name)
258 .extension()
259 .is_none_or(|e| !e.eq_ignore_ascii_case("jsonl"))
260 {
261 continue;
262 }
263 let date_part = &file_name[8..file_name.len() - 6];
264 if date_part.len() != 10
265 || date_part.as_bytes().get(4) != Some(&b'-')
266 || date_part.as_bytes().get(7) != Some(&b'-')
267 {
268 continue;
269 }
270 let Ok(year) = date_part[0..4].parse::<u32>() else {
271 continue;
272 };
273 let Ok(month) = date_part[5..7].parse::<u32>() else {
274 continue;
275 };
276 let Ok(day) = date_part[8..10].parse::<u32>() else {
277 continue;
278 };
279 if month == 0 || month > 12 || day == 0 || day > 31 {
280 continue;
281 }
282
283 let file_days = date_to_days_since_epoch(year, month, day);
284 if now_days > file_days && (now_days - file_days) > 30 {
285 let _ = tokio::fs::remove_file(&path).await;
286 }
287 }
288 Ok(None) => break,
289 Err(e) => {
290 tracing::warn!("error reading metrics directory entry: {e}");
291 }
292 }
293 }
294}
295
296fn date_to_days_since_epoch(y: u32, m: u32, d: u32) -> u32 {
297 let (y, m) = if m <= 2 { (y - 1, m + 9) } else { (y, m - 3) };
299 let era = y / 400;
300 let yoe = y - era * 400;
301 let doy = (153 * m + 2) / 5 + d - 1;
302 let doe = yoe * 365 + yoe / 4 - yoe / 100 + doy;
303 (era * 146_097 + doe).saturating_sub(719_468)
307}
308
309#[must_use]
311pub fn current_date_str() -> String {
312 let days = u32::try_from(unix_ms() / 86_400_000).unwrap_or(u32::MAX);
313 let z = days + 719_468;
314 let era = z / 146_097;
315 let doe = z - era * 146_097;
316 let yoe = (doe - doe / 1460 + doe / 36524 - doe / 146_096) / 365;
317 let y = yoe + era * 400;
318 let doy = doe - (365 * yoe + yoe / 4 - yoe / 100);
319 let mp = (5 * doy + 2) / 153;
320 let d = doy - (153 * mp + 2) / 5 + 1;
321 let m = if mp < 10 { mp + 3 } else { mp - 9 };
322 let y = if m <= 2 { y + 1 } else { y };
323 format!("{y:04}-{m:02}-{d:02}")
324}
325
326pub fn migrate_legacy_metrics_dir() -> std::io::Result<()> {
334 let home =
335 std::env::var("HOME").map_err(|e| std::io::Error::new(std::io::ErrorKind::NotFound, e))?;
336 migrate_legacy_metrics_dir_impl(&home)
337}
338
339#[allow(dead_code)]
340fn migrate_legacy_metrics_dir_impl(home: &str) -> std::io::Result<()> {
341 let old_dir = PathBuf::from(home).join(".local/share/code-analyze-mcp");
342 let new_dir = PathBuf::from(home).join(".local/share/aptu-coder");
343
344 let old_exists = old_dir.is_dir();
345 let new_exists = new_dir.is_dir();
346
347 if old_exists && !new_exists {
348 std::fs::rename(&old_dir, &new_dir)?;
349 tracing::info!(
350 "Migrated legacy metrics directory from {:?} to {:?}",
351 old_dir,
352 new_dir
353 );
354 } else if old_exists && new_exists {
355 tracing::warn!("Both legacy and new metrics directories exist; not migrating");
356 }
357 Ok(())
359}
360
361#[cfg(test)]
362mod tests {
363 use super::*;
364 use std::fs;
365 use std::sync::{Mutex, OnceLock};
366 use tempfile::TempDir;
367
368 fn metrics_export_lock() -> std::sync::MutexGuard<'static, ()> {
371 static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
372 let m = LOCK.get_or_init(|| Mutex::new(()));
373 m.lock().unwrap_or_else(|e| e.into_inner())
374 }
375
376 #[test]
377 fn test_migrate_legacy_only_old_exists() {
378 let tmp_home = TempDir::new().unwrap();
380 let home_str = tmp_home.path().to_str().unwrap();
381 let old_path = tmp_home.path().join(".local/share/code-analyze-mcp");
382 let new_path = tmp_home.path().join(".local/share/aptu-coder");
383 fs::create_dir_all(&old_path).unwrap();
384 assert!(!new_path.exists());
385
386 let result = migrate_legacy_metrics_dir_impl(home_str);
388
389 assert!(result.is_ok());
391 assert!(!old_path.exists(), "old dir should be moved");
392 assert!(new_path.is_dir(), "new dir should exist");
393 }
394
395 #[test]
396 fn test_migrate_legacy_both_exist() {
397 let tmp_home = TempDir::new().unwrap();
399 let home_str = tmp_home.path().to_str().unwrap();
400 let old_path = tmp_home.path().join(".local/share/code-analyze-mcp");
401 let new_path = tmp_home.path().join(".local/share/aptu-coder");
402 fs::create_dir_all(&old_path).unwrap();
403 fs::create_dir_all(&new_path).unwrap();
404
405 let result = migrate_legacy_metrics_dir_impl(home_str);
407
408 assert!(result.is_ok());
410 assert!(old_path.is_dir(), "old dir should remain");
411 assert!(new_path.is_dir(), "new dir should remain");
412 }
413
414 #[test]
415 fn test_migrate_legacy_neither_exists() {
416 let tmp_home = TempDir::new().unwrap();
418 let home_str = tmp_home.path().to_str().unwrap();
419 let old_path = tmp_home.path().join(".local/share/code-analyze-mcp");
420 let new_path = tmp_home.path().join(".local/share/aptu-coder");
421
422 let result = migrate_legacy_metrics_dir_impl(home_str);
424
425 assert!(result.is_ok());
427 assert!(!old_path.exists(), "old dir should not exist");
428 assert!(!new_path.exists(), "new dir should not exist");
429 }
430
431 #[test]
432 fn test_date_to_days_since_epoch_known_dates() {
433 assert_eq!(date_to_days_since_epoch(1970, 1, 1), 0);
434 assert_eq!(date_to_days_since_epoch(2020, 1, 1), 18_262);
435 assert_eq!(date_to_days_since_epoch(2000, 2, 29), 11_016);
436 }
437
438 #[test]
439 fn test_current_date_str_format() {
440 let s = current_date_str();
441 assert_eq!(s.len(), 10);
442 assert_eq!(s.as_bytes()[4], b'-');
443 assert_eq!(s.as_bytes()[7], b'-');
444 let year: u32 = s[0..4].parse().expect("year must be numeric");
445 assert!(year >= 2020 && year <= 2100);
446 }
447
448 #[tokio::test]
449 async fn test_metrics_writer_batching() {
450 let dir = TempDir::new().unwrap();
451 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<MetricEvent>();
452 let writer = MetricsWriter::new(rx, Some(dir.path().to_path_buf()));
453 let make_event = || MetricEvent {
454 ts: unix_ms(),
455 tool: "analyze_directory",
456 duration_ms: 1,
457 output_chars: 10,
458 param_path_depth: 1,
459 max_depth: None,
460 result: "ok",
461 error_type: None,
462 session_id: None,
463 seq: None,
464 cache_hit: None,
465 };
466 tx.send(make_event()).unwrap();
467 tx.send(make_event()).unwrap();
468 tx.send(make_event()).unwrap();
469 drop(tx);
470 writer.run().await;
471 let entries: Vec<_> = std::fs::read_dir(dir.path())
472 .unwrap()
473 .filter_map(|e| e.ok())
474 .filter(|e| {
475 e.path()
476 .extension()
477 .and_then(|x| x.to_str())
478 .map(|x| x.eq_ignore_ascii_case("jsonl"))
479 .unwrap_or(false)
480 })
481 .collect();
482 assert_eq!(entries.len(), 1);
483 let content = std::fs::read_to_string(entries[0].path()).unwrap();
484 let lines: Vec<&str> = content.lines().collect();
485 assert_eq!(lines.len(), 3);
486 }
487
488 #[tokio::test]
489 async fn test_cleanup_old_files_deletes_old_keeps_recent() {
490 let dir = TempDir::new().unwrap();
491 let old_file = dir.path().join("metrics-1970-01-01.jsonl");
492 let today = current_date_str();
493 let recent_file = dir.path().join(format!("metrics-{}.jsonl", today));
494 std::fs::write(&old_file, "old\n").unwrap();
495 std::fs::write(&recent_file, "recent\n").unwrap();
496 cleanup_old_files(dir.path()).await;
497 assert!(!old_file.exists());
498 assert!(recent_file.exists());
499 }
500
501 #[test]
502 fn test_metric_event_serialization() {
503 let event = MetricEvent {
504 ts: 1_700_000_000_000,
505 tool: "analyze_directory",
506 duration_ms: 42,
507 output_chars: 100,
508 param_path_depth: 3,
509 max_depth: Some(2),
510 result: "ok",
511 error_type: None,
512 session_id: None,
513 seq: None,
514 cache_hit: None,
515 };
516 let json = serde_json::to_string(&event).unwrap();
517 assert!(json.contains("analyze_directory"));
518 assert!(json.contains(r#""result":"ok""#));
519 assert!(json.contains(r#""output_chars":100"#));
520 }
521
522 #[test]
523 fn test_metric_event_serialization_error() {
524 let event = MetricEvent {
525 ts: 1_700_000_000_000,
526 tool: "analyze_directory",
527 duration_ms: 5,
528 output_chars: 0,
529 param_path_depth: 3,
530 max_depth: Some(3),
531 result: "error",
532 error_type: Some("invalid_params".to_string()),
533 session_id: None,
534 seq: None,
535 cache_hit: None,
536 };
537 let json = serde_json::to_string(&event).unwrap();
538 assert!(json.contains(r#""result":"error""#));
539 assert!(json.contains(r#""error_type":"invalid_params""#));
540 assert!(json.contains(r#""output_chars":0"#));
541 assert!(json.contains(r#""max_depth":3"#));
542 }
543
544 #[test]
545 fn test_metric_event_new_fields_round_trip() {
546 let event = MetricEvent {
547 ts: 1_700_000_000_000,
548 tool: "analyze_file",
549 duration_ms: 100,
550 output_chars: 500,
551 param_path_depth: 2,
552 max_depth: Some(3),
553 result: "ok",
554 error_type: None,
555 session_id: Some("1742468880123-42".to_string()),
556 seq: Some(5),
557 cache_hit: None,
558 };
559 let serialized = serde_json::to_string(&event).unwrap();
560 let json_str = r#"{"ts":1700000000000,"tool":"analyze_file","duration_ms":100,"output_chars":500,"param_path_depth":2,"max_depth":3,"result":"ok","error_type":null,"session_id":"1742468880123-42","seq":5}"#;
561 assert_eq!(serialized, json_str);
562 }
563
564 #[tokio::test]
565 async fn test_metrics_export_file_created() {
566 let _guard = metrics_export_lock();
567 let dir = TempDir::new().unwrap();
569 let export_file = dir.path().join("metrics_export.json");
570 let export_path_str = export_file.to_string_lossy().to_string();
571
572 unsafe {
573 std::env::set_var("APTU_CODER_METRICS_EXPORT_FILE", &export_path_str);
574 }
575
576 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<MetricEvent>();
578 let writer = MetricsWriter::new(rx, Some(dir.path().to_path_buf()));
579
580 tx.send(MetricEvent {
582 ts: unix_ms(),
583 tool: "analyze_directory",
584 duration_ms: 100,
585 output_chars: 50,
586 param_path_depth: 1,
587 max_depth: None,
588 result: "ok",
589 error_type: None,
590 session_id: Some("test-session-123".to_string()),
591 seq: None,
592 cache_hit: None,
593 })
594 .unwrap();
595 tx.send(MetricEvent {
596 ts: unix_ms(),
597 tool: "analyze_file",
598 duration_ms: 50,
599 output_chars: 100,
600 param_path_depth: 2,
601 max_depth: Some(3),
602 result: "ok",
603 error_type: None,
604 session_id: Some("test-session-123".to_string()),
605 seq: None,
606 cache_hit: None,
607 })
608 .unwrap();
609 drop(tx);
610 writer.run().await;
611
612 assert!(
614 export_file.exists(),
615 "export file should be created at {:?}",
616 export_file
617 );
618 let content = std::fs::read_to_string(&export_file).unwrap();
619 let json: serde_json::Value = serde_json::from_str(&content).unwrap();
620
621 assert_eq!(
622 json["session_id"], "test-session-123",
623 "export should contain correct session_id"
624 );
625 assert!(
626 json["tool_calls"].is_array(),
627 "export should contain tool_calls array"
628 );
629 let tool_calls = json["tool_calls"].as_array().unwrap();
630 assert_eq!(tool_calls.len(), 2, "should have 2 tool calls");
631 assert!(
632 json["total_duration_ms"].is_number(),
633 "export should contain total_duration_ms"
634 );
635 assert_eq!(
636 json["total_duration_ms"], 150,
637 "total_duration_ms should be sum of all durations"
638 );
639
640 unsafe {
642 std::env::remove_var("APTU_CODER_METRICS_EXPORT_FILE");
643 }
644 }
645
646 #[tokio::test]
647 async fn test_metrics_export_env_var_unset() {
648 let _guard = metrics_export_lock();
649 unsafe {
651 std::env::remove_var("APTU_CODER_METRICS_EXPORT_FILE");
652 }
653 let dir = TempDir::new().unwrap();
654 let marker = "metrics_export_unset_test";
656
657 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<MetricEvent>();
659 let writer = MetricsWriter::new(rx, Some(dir.path().to_path_buf()));
660
661 tx.send(MetricEvent {
663 ts: unix_ms(),
664 tool: "analyze_directory",
665 duration_ms: 100,
666 output_chars: 50,
667 param_path_depth: 1,
668 max_depth: None,
669 result: "ok",
670 error_type: None,
671 session_id: Some("test-session-456".to_string()),
672 seq: None,
673 cache_hit: None,
674 })
675 .unwrap();
676 drop(tx);
677 writer.run().await;
678
679 let entries: Vec<_> = std::fs::read_dir(dir.path())
681 .unwrap()
682 .filter_map(|e| e.ok())
683 .filter(|e| {
684 e.path()
685 .file_name()
686 .and_then(|n| n.to_str())
687 .map(|n| n.contains(marker))
688 .unwrap_or(false)
689 })
690 .collect();
691 assert_eq!(
692 entries.len(),
693 0,
694 "no export file should be created when env var is unset"
695 );
696 }
697
698 #[tokio::test]
699 async fn test_metrics_export_relative_path_rejected() {
700 let _guard = metrics_export_lock();
701 let relative_path = "relative/path/metrics.json";
703 unsafe {
704 std::env::set_var("APTU_CODER_METRICS_EXPORT_FILE", relative_path);
705 }
706
707 let dir = TempDir::new().unwrap();
708 let marker = "metrics_export_relative_test";
710
711 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<MetricEvent>();
713 let writer = MetricsWriter::new(rx, Some(dir.path().to_path_buf()));
714
715 tx.send(MetricEvent {
717 ts: unix_ms(),
718 tool: "analyze_directory",
719 duration_ms: 100,
720 output_chars: 50,
721 param_path_depth: 1,
722 max_depth: None,
723 result: "ok",
724 error_type: None,
725 session_id: Some(marker.to_string()),
726 seq: None,
727 cache_hit: None,
728 })
729 .unwrap();
730 drop(tx);
731 writer.run().await;
732
733 let entries: Vec<_> = std::fs::read_dir(dir.path())
735 .unwrap()
736 .filter_map(|e| e.ok())
737 .filter(|e| {
738 e.path()
739 .file_name()
740 .and_then(|n| n.to_str())
741 .map(|n| n.contains("metrics.json"))
742 .unwrap_or(false)
743 })
744 .collect();
745 assert_eq!(
746 entries.len(),
747 0,
748 "no export file should be created for relative path"
749 );
750
751 unsafe {
753 std::env::remove_var("APTU_CODER_METRICS_EXPORT_FILE");
754 }
755 }
756}
757
758fn record_otel_metrics(event: &MetricEvent) {
768 use opentelemetry::metrics::{Counter, Histogram};
769 use opentelemetry::{KeyValue, global};
770 use std::sync::OnceLock;
771
772 static DURATION_HISTOGRAM: OnceLock<Histogram<f64>> = OnceLock::new();
773 static CALL_COUNTER: OnceLock<Counter<u64>> = OnceLock::new();
774
775 let histogram = DURATION_HISTOGRAM.get_or_init(|| {
776 global::meter("aptu-coder")
777 .f64_histogram("mcp.server.operation.duration")
778 .with_unit("ms")
779 .build()
780 });
781
782 let counter = CALL_COUNTER.get_or_init(|| {
783 global::meter("aptu-coder")
784 .u64_counter("mcp.server.tool.calls")
785 .build()
786 });
787
788 let error_type = event.error_type.as_deref().unwrap_or("success");
789 let attributes = [
790 KeyValue::new("gen_ai.tool.name", event.tool.to_string()),
791 KeyValue::new("error.type", error_type.to_string()),
792 ];
793
794 histogram.record(event.duration_ms as f64, &attributes);
795 counter.add(1, &attributes);
796}