1use serde::{Deserialize, Serialize};
11use std::path::{Path, PathBuf};
12use std::time::{SystemTime, UNIX_EPOCH};
13use tokio::io::AsyncWriteExt;
14use tokio::sync::mpsc;
15
16#[derive(Debug, Clone, Default, Serialize, Deserialize)]
18#[serde(default)]
19pub struct MetricEvent {
20 pub ts: u64,
21 pub tool: &'static str,
22 pub duration_ms: u64,
23 pub output_chars: usize,
24 pub param_path_depth: usize,
25 pub max_depth: Option<u32>,
26 pub result: &'static str,
27 pub error_type: Option<String>,
28 #[serde(default)]
29 pub session_id: Option<String>,
30 #[serde(default)]
31 pub seq: Option<u32>,
32 #[serde(default)]
33 #[serde(skip_serializing_if = "Option::is_none")]
34 pub cache_hit: Option<bool>,
35 #[serde(default, skip_serializing_if = "Option::is_none")]
36 pub cache_tier: Option<&'static str>,
37 #[serde(default, skip_serializing_if = "Option::is_none")]
40 pub cache_write_failure: Option<bool>,
41 #[serde(default, skip_serializing_if = "Option::is_none")]
42 pub exit_code: Option<i32>,
43 #[serde(default, skip_serializing_if = "std::ops::Not::not")]
44 pub timed_out: bool,
45}
46
47#[derive(Clone)]
49pub struct MetricsSender(pub tokio::sync::mpsc::UnboundedSender<MetricEvent>);
50
51impl MetricsSender {
52 pub fn send(&self, event: MetricEvent) {
53 let _ = self.0.send(event);
54 }
55}
56
57pub struct MetricsWriter {
59 rx: tokio::sync::mpsc::UnboundedReceiver<MetricEvent>,
60 base_dir: PathBuf,
61 dir_created: bool,
62}
63
64impl MetricsWriter {
65 pub fn new(
66 rx: tokio::sync::mpsc::UnboundedReceiver<MetricEvent>,
67 base_dir: Option<PathBuf>,
68 ) -> Self {
69 let dir = base_dir.unwrap_or_else(xdg_metrics_dir);
70 Self {
71 rx,
72 base_dir: dir,
73 dir_created: false,
74 }
75 }
76
77 fn accumulate_event(
79 tool_counts: &mut std::collections::HashMap<&'static str, (u64, u64)>,
80 export_session_id: &mut Option<String>,
81 event: &MetricEvent,
82 ) {
83 let entry = tool_counts.entry(event.tool).or_insert((0, 0));
84 entry.0 += 1;
85 entry.1 += event.duration_ms;
86 if export_session_id.is_none() {
87 *export_session_id = event.session_id.clone();
88 }
89 }
90
91 async fn flush_batch(file: &mut tokio::fs::File, batch: Vec<MetricEvent>) {
93 for event in batch {
94 record_otel_metrics(&event);
96
97 if let Ok(mut json) = serde_json::to_string(&event) {
99 json.push('\n');
100 let _ = file.write_all(json.as_bytes()).await;
101 }
102 }
103 let _ = file.flush().await;
104 }
105
106 fn rotate_metrics_file(
109 base_dir: &std::path::Path,
110 current_date: &mut String,
111 current_file: &mut Option<PathBuf>,
112 dir_created: &mut bool,
113 ) -> PathBuf {
114 let new_date = current_date_str();
115 if new_date != *current_date {
116 *current_date = new_date;
117 *current_file = None;
118 *dir_created = false;
119 }
120
121 if current_file.is_none() {
122 *current_file = Some(base_dir.join(format!("metrics-{}.jsonl", current_date)));
123 }
124
125 current_file
126 .as_ref()
127 .expect("current_file is guaranteed Some after check above")
128 .clone()
129 }
130
131 async fn receive_batch(
133 rx: &mut tokio::sync::mpsc::UnboundedReceiver<MetricEvent>,
134 tool_counts: &mut std::collections::HashMap<&'static str, (u64, u64)>,
135 export_session_id: &mut Option<String>,
136 ) -> Option<Vec<MetricEvent>> {
137 let mut batch = Vec::new();
138 if let Some(event) = rx.recv().await {
139 Self::accumulate_event(tool_counts, export_session_id, &event);
140 batch.push(event);
141 for _ in 0..99 {
142 match rx.try_recv() {
143 Ok(e) => {
144 Self::accumulate_event(tool_counts, export_session_id, &e);
145 batch.push(e);
146 }
147 Err(
148 mpsc::error::TryRecvError::Empty | mpsc::error::TryRecvError::Disconnected,
149 ) => break,
150 }
151 }
152 Some(batch)
153 } else {
154 None
155 }
156 }
157
158 async fn ensure_metrics_dir(path: &std::path::Path, dir_created: &mut bool) {
160 if !*dir_created
161 && let Some(parent) = path.parent()
162 && !parent.as_os_str().is_empty()
163 {
164 match tokio::fs::create_dir_all(parent).await {
165 Ok(()) => {
166 *dir_created = true;
167 }
168 Err(e) => {
169 tracing::warn!(
170 error = %e,
171 path = %parent.display(),
172 "metrics: failed to create directory; will retry next batch"
173 );
174 }
175 }
176 }
177 }
178
179 pub async fn run(mut self) {
180 cleanup_old_files(&self.base_dir).await;
181 let mut current_date = current_date_str();
182 let mut current_file: Option<PathBuf> = None;
183
184 let mut tool_counts: std::collections::HashMap<&'static str, (u64, u64)> =
186 std::collections::HashMap::new();
187 let mut export_session_id: Option<String> = None;
188
189 loop {
190 let Some(batch) =
191 Self::receive_batch(&mut self.rx, &mut tool_counts, &mut export_session_id).await
192 else {
193 break;
194 };
195
196 let path = Self::rotate_metrics_file(
197 &self.base_dir,
198 &mut current_date,
199 &mut current_file,
200 &mut self.dir_created,
201 );
202
203 Self::ensure_metrics_dir(&path, &mut self.dir_created).await;
204
205 let file = tokio::fs::OpenOptions::new()
207 .create(true)
208 .append(true)
209 .open(&path)
210 .await;
211
212 if let Ok(mut file) = file {
213 Self::flush_batch(&mut file, batch).await;
214 }
215 }
216
217 if let Ok(export_path) = std::env::var("APTU_CODER_METRICS_EXPORT_FILE") {
219 if !std::path::Path::new(&export_path).is_absolute() {
220 tracing::warn!(
221 path = %export_path,
222 "metrics: APTU_CODER_METRICS_EXPORT_FILE must be an absolute path; skipping export"
223 );
224 } else {
225 let mut tool_calls = Vec::new();
226 let mut total_duration_ms = 0u64;
227 for (tool_name, (count, duration)) in tool_counts {
228 tool_calls.push(serde_json::json!({
229 "tool": tool_name,
230 "call_count": count,
231 "total_duration_ms": duration
232 }));
233 total_duration_ms += duration;
234 }
235 let summary = serde_json::json!({
236 "session_id": export_session_id.unwrap_or_default(),
237 "tool_calls": tool_calls,
238 "total_duration_ms": total_duration_ms
239 });
240 if let Ok(json_str) = serde_json::to_string(&summary)
241 && let Err(e) = tokio::fs::write(&export_path, json_str).await
242 {
243 tracing::warn!(
244 error = %e,
245 path = %export_path,
246 "metrics: failed to write export file"
247 );
248 }
249 }
250 }
251 }
252}
253
254#[must_use]
256pub fn unix_ms() -> u64 {
257 SystemTime::now()
258 .duration_since(UNIX_EPOCH)
259 .unwrap_or_default()
260 .as_millis()
261 .try_into()
262 .unwrap_or(u64::MAX)
263}
264
265#[must_use]
267pub fn path_component_count(path: &str) -> usize {
268 Path::new(path).components().count()
269}
270
271fn xdg_metrics_dir() -> PathBuf {
272 if let Ok(xdg_data_home) = std::env::var("XDG_DATA_HOME")
273 && !xdg_data_home.is_empty()
274 {
275 return PathBuf::from(xdg_data_home).join("aptu-coder");
276 }
277
278 if let Ok(home) = std::env::var("HOME") {
279 PathBuf::from(home)
280 .join(".local")
281 .join("share")
282 .join("aptu-coder")
283 } else {
284 PathBuf::from(".")
285 }
286}
287
288async fn cleanup_old_files(base_dir: &Path) {
289 let now_days = u32::try_from(unix_ms() / 86_400_000).unwrap_or(u32::MAX);
290
291 let Ok(mut entries) = tokio::fs::read_dir(base_dir).await else {
292 return;
293 };
294
295 loop {
296 match entries.next_entry().await {
297 Ok(Some(entry)) => {
298 let path = entry.path();
299 let file_name = match path.file_name() {
300 Some(n) => n.to_string_lossy().into_owned(),
301 None => continue,
302 };
303
304 if !file_name.starts_with("metrics-")
306 || std::path::Path::new(&*file_name)
307 .extension()
308 .is_none_or(|e| !e.eq_ignore_ascii_case("jsonl"))
309 {
310 continue;
311 }
312 let date_part = &file_name[8..file_name.len() - 6];
313 if date_part.len() != 10
314 || date_part.as_bytes().get(4) != Some(&b'-')
315 || date_part.as_bytes().get(7) != Some(&b'-')
316 {
317 continue;
318 }
319 let Ok(year) = date_part[0..4].parse::<u32>() else {
320 continue;
321 };
322 let Ok(month) = date_part[5..7].parse::<u32>() else {
323 continue;
324 };
325 let Ok(day) = date_part[8..10].parse::<u32>() else {
326 continue;
327 };
328 if month == 0 || month > 12 || day == 0 || day > 31 {
329 continue;
330 }
331
332 let file_days = date_to_days_since_epoch(year, month, day);
333 if now_days > file_days && (now_days - file_days) > 30 {
334 let _ = tokio::fs::remove_file(&path).await;
335 }
336 }
337 Ok(None) => break,
338 Err(e) => {
339 tracing::warn!("error reading metrics directory entry: {e}");
340 }
341 }
342 }
343}
344
345fn date_to_days_since_epoch(y: u32, m: u32, d: u32) -> u32 {
346 let (y, m) = if m <= 2 { (y - 1, m + 9) } else { (y, m - 3) };
348 let era = y / 400;
349 let yoe = y - era * 400;
350 let doy = (153 * m + 2) / 5 + d - 1;
351 let doe = yoe * 365 + yoe / 4 - yoe / 100 + doy;
352 (era * 146_097 + doe).saturating_sub(719_468)
356}
357
358#[must_use]
360pub fn current_date_str() -> String {
361 let days = u32::try_from(unix_ms() / 86_400_000).unwrap_or(u32::MAX);
362 let z = days + 719_468;
363 let era = z / 146_097;
364 let doe = z - era * 146_097;
365 let yoe = (doe - doe / 1460 + doe / 36524 - doe / 146_096) / 365;
366 let y = yoe + era * 400;
367 let doy = doe - (365 * yoe + yoe / 4 - yoe / 100);
368 let mp = (5 * doy + 2) / 153;
369 let d = doy - (153 * mp + 2) / 5 + 1;
370 let m = if mp < 10 { mp + 3 } else { mp - 9 };
371 let y = if m <= 2 { y + 1 } else { y };
372 format!("{y:04}-{m:02}-{d:02}")
373}
374
375pub fn migrate_legacy_metrics_dir() -> std::io::Result<()> {
383 let home =
384 std::env::var("HOME").map_err(|e| std::io::Error::new(std::io::ErrorKind::NotFound, e))?;
385 migrate_legacy_metrics_dir_impl(&home)
386}
387
388#[allow(dead_code)]
389fn migrate_legacy_metrics_dir_impl(home: &str) -> std::io::Result<()> {
390 let old_dir = PathBuf::from(home).join(".local/share/code-analyze-mcp");
391 let new_dir = PathBuf::from(home).join(".local/share/aptu-coder");
392
393 let old_exists = old_dir.is_dir();
394 let new_exists = new_dir.is_dir();
395
396 if old_exists && !new_exists {
397 std::fs::rename(&old_dir, &new_dir)?;
398 tracing::info!(
399 "Migrated legacy metrics directory from {:?} to {:?}",
400 old_dir,
401 new_dir
402 );
403 } else if old_exists && new_exists {
404 tracing::warn!("Both legacy and new metrics directories exist; not migrating");
405 }
406 Ok(())
408}
409
410#[cfg(test)]
411mod tests {
412 use super::*;
413 use std::fs;
414 use std::sync::{Mutex, OnceLock};
415 use tempfile::TempDir;
416
417 fn metrics_export_lock() -> std::sync::MutexGuard<'static, ()> {
420 static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
421 let m = LOCK.get_or_init(|| Mutex::new(()));
422 m.lock().unwrap_or_else(|e| e.into_inner())
423 }
424
425 #[test]
426 fn test_migrate_legacy_only_old_exists() {
427 let tmp_home = TempDir::new().unwrap();
429 let home_str = tmp_home.path().to_str().unwrap();
430 let old_path = tmp_home.path().join(".local/share/code-analyze-mcp");
431 let new_path = tmp_home.path().join(".local/share/aptu-coder");
432 fs::create_dir_all(&old_path).unwrap();
433 assert!(!new_path.exists());
434
435 let result = migrate_legacy_metrics_dir_impl(home_str);
437
438 assert!(result.is_ok());
440 assert!(!old_path.exists(), "old dir should be moved");
441 assert!(new_path.is_dir(), "new dir should exist");
442 }
443
444 #[test]
445 fn test_migrate_legacy_both_exist() {
446 let tmp_home = TempDir::new().unwrap();
448 let home_str = tmp_home.path().to_str().unwrap();
449 let old_path = tmp_home.path().join(".local/share/code-analyze-mcp");
450 let new_path = tmp_home.path().join(".local/share/aptu-coder");
451 fs::create_dir_all(&old_path).unwrap();
452 fs::create_dir_all(&new_path).unwrap();
453
454 let result = migrate_legacy_metrics_dir_impl(home_str);
456
457 assert!(result.is_ok());
459 assert!(old_path.is_dir(), "old dir should remain");
460 assert!(new_path.is_dir(), "new dir should remain");
461 }
462
463 #[test]
464 fn test_migrate_legacy_neither_exists() {
465 let tmp_home = TempDir::new().unwrap();
467 let home_str = tmp_home.path().to_str().unwrap();
468 let old_path = tmp_home.path().join(".local/share/code-analyze-mcp");
469 let new_path = tmp_home.path().join(".local/share/aptu-coder");
470
471 let result = migrate_legacy_metrics_dir_impl(home_str);
473
474 assert!(result.is_ok());
476 assert!(!old_path.exists(), "old dir should not exist");
477 assert!(!new_path.exists(), "new dir should not exist");
478 }
479
480 #[test]
481 fn test_date_to_days_since_epoch_known_dates() {
482 assert_eq!(date_to_days_since_epoch(1970, 1, 1), 0);
483 assert_eq!(date_to_days_since_epoch(2020, 1, 1), 18_262);
484 assert_eq!(date_to_days_since_epoch(2000, 2, 29), 11_016);
485 }
486
487 #[test]
488 fn test_current_date_str_format() {
489 let s = current_date_str();
490 assert_eq!(s.len(), 10);
491 assert_eq!(s.as_bytes()[4], b'-');
492 assert_eq!(s.as_bytes()[7], b'-');
493 let year: u32 = s[0..4].parse().expect("year must be numeric");
494 assert!(year >= 2020 && year <= 2100);
495 }
496
497 #[tokio::test]
498 async fn test_metrics_writer_batching() {
499 let dir = TempDir::new().unwrap();
500 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<MetricEvent>();
501 let writer = MetricsWriter::new(rx, Some(dir.path().to_path_buf()));
502 let make_event = || MetricEvent {
503 ts: unix_ms(),
504 tool: "analyze_directory",
505 duration_ms: 1,
506 output_chars: 10,
507 param_path_depth: 1,
508 max_depth: None,
509 result: "ok",
510 error_type: None,
511 session_id: None,
512 seq: None,
513 cache_hit: None,
514 cache_write_failure: None,
515 exit_code: None,
516 timed_out: false,
517 cache_tier: None,
518 };
519 tx.send(make_event()).unwrap();
520 tx.send(make_event()).unwrap();
521 tx.send(make_event()).unwrap();
522 drop(tx);
523 writer.run().await;
524 let entries: Vec<_> = std::fs::read_dir(dir.path())
525 .unwrap()
526 .filter_map(|e| e.ok())
527 .filter(|e| {
528 e.path()
529 .extension()
530 .and_then(|x| x.to_str())
531 .map(|x| x.eq_ignore_ascii_case("jsonl"))
532 .unwrap_or(false)
533 })
534 .collect();
535 assert_eq!(entries.len(), 1);
536 let content = std::fs::read_to_string(entries[0].path()).unwrap();
537 let lines: Vec<&str> = content.lines().collect();
538 assert_eq!(lines.len(), 3);
539 }
540
541 #[tokio::test]
542 async fn test_cleanup_old_files_deletes_old_keeps_recent() {
543 let dir = TempDir::new().unwrap();
544 let old_file = dir.path().join("metrics-1970-01-01.jsonl");
545 let today = current_date_str();
546 let recent_file = dir.path().join(format!("metrics-{}.jsonl", today));
547 std::fs::write(&old_file, "old\n").unwrap();
548 std::fs::write(&recent_file, "recent\n").unwrap();
549 cleanup_old_files(dir.path()).await;
550 assert!(!old_file.exists());
551 assert!(recent_file.exists());
552 }
553
554 #[test]
555 fn test_metric_event_serialization() {
556 let event = MetricEvent {
557 ts: 1_700_000_000_000,
558 tool: "analyze_directory",
559 duration_ms: 42,
560 output_chars: 100,
561 param_path_depth: 3,
562 max_depth: Some(2),
563 result: "ok",
564 error_type: None,
565 session_id: None,
566 seq: None,
567 cache_hit: None,
568 cache_write_failure: None,
569 exit_code: None,
570 timed_out: false,
571 cache_tier: None,
572 };
573 let json = serde_json::to_string(&event).unwrap();
574 assert!(json.contains("analyze_directory"));
575 assert!(json.contains(r#""result":"ok""#));
576 assert!(json.contains(r#""output_chars":100"#));
577 }
578
579 #[test]
580 fn test_metric_event_serialization_error() {
581 let event = MetricEvent {
582 ts: 1_700_000_000_000,
583 tool: "analyze_directory",
584 duration_ms: 5,
585 output_chars: 0,
586 param_path_depth: 3,
587 max_depth: Some(3),
588 result: "error",
589 error_type: Some("invalid_params".to_string()),
590 session_id: None,
591 seq: None,
592 cache_hit: None,
593 cache_write_failure: None,
594 exit_code: None,
595 timed_out: false,
596 cache_tier: None,
597 };
598 let json = serde_json::to_string(&event).unwrap();
599 assert!(json.contains(r#""result":"error""#));
600 assert!(json.contains(r#""error_type":"invalid_params""#));
601 assert!(json.contains(r#""output_chars":0"#));
602 assert!(json.contains(r#""max_depth":3"#));
603 }
604
605 #[test]
606 fn test_metric_event_new_fields_round_trip() {
607 let event = MetricEvent {
608 ts: 1_700_000_000_000,
609 tool: "analyze_file",
610 duration_ms: 100,
611 output_chars: 500,
612 param_path_depth: 2,
613 max_depth: Some(3),
614 result: "ok",
615 error_type: None,
616 session_id: Some("1742468880123-42".to_string()),
617 seq: Some(5),
618 cache_hit: None,
619 cache_write_failure: None,
620 exit_code: None,
621 timed_out: false,
622 cache_tier: None,
623 };
624 let serialized = serde_json::to_string(&event).unwrap();
625 let json_str = r#"{"ts":1700000000000,"tool":"analyze_file","duration_ms":100,"output_chars":500,"param_path_depth":2,"max_depth":3,"result":"ok","error_type":null,"session_id":"1742468880123-42","seq":5}"#;
626 assert_eq!(serialized, json_str);
627 }
628
629 #[tokio::test]
630 async fn test_metrics_export_file_created() {
631 let _guard = metrics_export_lock();
632 let dir = TempDir::new().unwrap();
634 let export_file = dir.path().join("metrics_export.json");
635 let export_path_str = export_file.to_string_lossy().to_string();
636
637 unsafe {
638 std::env::set_var("APTU_CODER_METRICS_EXPORT_FILE", &export_path_str);
639 }
640
641 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<MetricEvent>();
643 let writer = MetricsWriter::new(rx, Some(dir.path().to_path_buf()));
644
645 tx.send(MetricEvent {
647 ts: unix_ms(),
648 tool: "analyze_directory",
649 duration_ms: 100,
650 output_chars: 50,
651 param_path_depth: 1,
652 max_depth: None,
653 result: "ok",
654 error_type: None,
655 session_id: Some("test-session-123".to_string()),
656 seq: None,
657 cache_hit: None,
658 cache_write_failure: None,
659 exit_code: None,
660 timed_out: false,
661 cache_tier: None,
662 })
663 .unwrap();
664 tx.send(MetricEvent {
665 ts: unix_ms(),
666 tool: "analyze_file",
667 duration_ms: 50,
668 output_chars: 100,
669 param_path_depth: 2,
670 max_depth: Some(3),
671 result: "ok",
672 error_type: None,
673 session_id: Some("test-session-123".to_string()),
674 seq: None,
675 cache_hit: None,
676 cache_write_failure: None,
677 exit_code: None,
678 timed_out: false,
679 cache_tier: None,
680 })
681 .unwrap();
682 drop(tx);
683 writer.run().await;
684
685 assert!(
687 export_file.exists(),
688 "export file should be created at {:?}",
689 export_file
690 );
691 let content = std::fs::read_to_string(&export_file).unwrap();
692 let json: serde_json::Value = serde_json::from_str(&content).unwrap();
693
694 assert_eq!(
695 json["session_id"], "test-session-123",
696 "export should contain correct session_id"
697 );
698 assert!(
699 json["tool_calls"].is_array(),
700 "export should contain tool_calls array"
701 );
702 let tool_calls = json["tool_calls"].as_array().unwrap();
703 assert_eq!(tool_calls.len(), 2, "should have 2 tool calls");
704 assert!(
705 json["total_duration_ms"].is_number(),
706 "export should contain total_duration_ms"
707 );
708 assert_eq!(
709 json["total_duration_ms"], 150,
710 "total_duration_ms should be sum of all durations"
711 );
712
713 unsafe {
715 std::env::remove_var("APTU_CODER_METRICS_EXPORT_FILE");
716 }
717 }
718
719 #[tokio::test]
720 async fn test_metrics_export_env_var_unset() {
721 let _guard = metrics_export_lock();
722 unsafe {
724 std::env::remove_var("APTU_CODER_METRICS_EXPORT_FILE");
725 }
726 let dir = TempDir::new().unwrap();
727 let marker = "metrics_export_unset_test";
729
730 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<MetricEvent>();
732 let writer = MetricsWriter::new(rx, Some(dir.path().to_path_buf()));
733
734 tx.send(MetricEvent {
736 ts: unix_ms(),
737 tool: "analyze_directory",
738 duration_ms: 100,
739 output_chars: 50,
740 param_path_depth: 1,
741 max_depth: None,
742 result: "ok",
743 error_type: None,
744 session_id: Some("test-session-456".to_string()),
745 seq: None,
746 cache_hit: None,
747 cache_write_failure: None,
748 exit_code: None,
749 timed_out: false,
750 cache_tier: None,
751 })
752 .unwrap();
753 drop(tx);
754 writer.run().await;
755
756 let entries: Vec<_> = std::fs::read_dir(dir.path())
758 .unwrap()
759 .filter_map(|e| e.ok())
760 .filter(|e| {
761 e.path()
762 .file_name()
763 .and_then(|n| n.to_str())
764 .map(|n| n.contains(marker))
765 .unwrap_or(false)
766 })
767 .collect();
768 assert_eq!(
769 entries.len(),
770 0,
771 "no export file should be created when env var is unset"
772 );
773 }
774
775 #[tokio::test]
776 async fn test_metrics_export_relative_path_rejected() {
777 let _guard = metrics_export_lock();
778 let relative_path = "relative/path/metrics.json";
780 unsafe {
781 std::env::set_var("APTU_CODER_METRICS_EXPORT_FILE", relative_path);
782 }
783
784 let dir = TempDir::new().unwrap();
785 let marker = "metrics_export_relative_test";
787
788 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<MetricEvent>();
790 let writer = MetricsWriter::new(rx, Some(dir.path().to_path_buf()));
791
792 tx.send(MetricEvent {
794 ts: unix_ms(),
795 tool: "analyze_directory",
796 duration_ms: 100,
797 output_chars: 50,
798 param_path_depth: 1,
799 max_depth: None,
800 result: "ok",
801 error_type: None,
802 session_id: Some(marker.to_string()),
803 seq: None,
804 cache_hit: None,
805 cache_write_failure: None,
806 exit_code: None,
807 timed_out: false,
808 cache_tier: None,
809 })
810 .unwrap();
811 drop(tx);
812 writer.run().await;
813
814 let entries: Vec<_> = std::fs::read_dir(dir.path())
816 .unwrap()
817 .filter_map(|e| e.ok())
818 .filter(|e| {
819 e.path()
820 .file_name()
821 .and_then(|n| n.to_str())
822 .map(|n| n.contains("metrics.json"))
823 .unwrap_or(false)
824 })
825 .collect();
826 assert_eq!(
827 entries.len(),
828 0,
829 "no export file should be created for relative path"
830 );
831
832 unsafe {
834 std::env::remove_var("APTU_CODER_METRICS_EXPORT_FILE");
835 }
836 }
837}
838
839fn record_otel_metrics(event: &MetricEvent) {
849 use opentelemetry::metrics::{Counter, Histogram};
850 use opentelemetry::{KeyValue, global};
851 use std::sync::OnceLock;
852
853 static DURATION_HISTOGRAM: OnceLock<Histogram<f64>> = OnceLock::new();
854 static CALL_COUNTER: OnceLock<Counter<u64>> = OnceLock::new();
855 static CACHE_HITS_COUNTER: OnceLock<Counter<u64>> = OnceLock::new();
856 static CACHE_WRITE_FAILURES_COUNTER: OnceLock<Counter<u64>> = OnceLock::new();
857
858 let histogram = DURATION_HISTOGRAM.get_or_init(|| {
859 global::meter("aptu-coder")
860 .f64_histogram("mcp.server.operation.duration")
861 .with_unit("s")
862 .with_boundaries(vec![
863 0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0,
864 ])
865 .build()
866 });
867
868 let counter = CALL_COUNTER.get_or_init(|| {
869 global::meter("aptu-coder")
870 .u64_counter("mcp.server.tool.calls")
871 .build()
872 });
873
874 let cache_hits_counter = CACHE_HITS_COUNTER.get_or_init(|| {
875 global::meter("aptu-coder")
876 .u64_counter("mcp.server.tool.cache_hits_total")
877 .with_description("Number of tool responses served from cache (l1_memory or l2_disk)")
878 .build()
879 });
880
881 let cache_write_failures_counter = CACHE_WRITE_FAILURES_COUNTER.get_or_init(|| {
882 global::meter("aptu-coder")
883 .u64_counter("mcp.server.tool.cache_write_failures_total")
884 .with_description(
885 "Number of L2 disk cache write failures (dir, tempfile, write, rename)",
886 )
887 .build()
888 });
889
890 let error_type = event.error_type.as_deref().unwrap_or("success");
891 let attributes = [
892 KeyValue::new("gen_ai.tool.name", event.tool.to_string()),
893 KeyValue::new("error.type", error_type.to_string()),
894 KeyValue::new("mcp.method.name", "tools/call"),
895 KeyValue::new("mcp.protocol.version", "2025-11-25"),
896 KeyValue::new("network.transport", "pipe"),
897 ];
898
899 histogram.record(event.duration_ms as f64 / 1000.0, &attributes);
900 counter.add(1, &attributes);
901
902 if event.cache_hit == Some(true) {
903 let tier = event.cache_tier.unwrap_or("unknown");
904 cache_hits_counter.add(
905 1,
906 &[
907 KeyValue::new("gen_ai.tool.name", event.tool.to_string()),
908 KeyValue::new("cache_tier", tier.to_string()),
909 ],
910 );
911 }
912
913 if event.cache_write_failure == Some(true) {
914 cache_write_failures_counter.add(
915 1,
916 &[KeyValue::new("gen_ai.tool.name", event.tool.to_string())],
917 );
918 }
919}