use anyhow::Result;
use chrono::{DateTime, Utc};
use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use tokio::sync::Mutex;
13
/// Process-wide token usage counter shared by all providers.
pub static TOKEN_USAGE: Lazy<Arc<AtomicTokenCounter>> =
    Lazy::new(|| Arc::new(AtomicTokenCounter::new()));

/// Process-wide tool execution counter (total runs and failures).
pub static TOOL_EXECUTIONS: Lazy<Arc<AtomicToolCounter>> =
    Lazy::new(|| Arc::new(AtomicToolCounter::new()));

/// Process-wide rolling window of per-provider request metrics.
pub static PROVIDER_METRICS: Lazy<Arc<ProviderMetrics>> =
    Lazy::new(|| Arc::new(ProviderMetrics::new()));
23
24#[derive(Debug, Clone, Default, Serialize, Deserialize)]
26pub struct TokenTotals {
27 pub input: u64,
28 pub output: u64,
29}
30
31impl TokenTotals {
32 pub fn new(input: u64, output: u64) -> Self {
33 Self { input, output }
34 }
35
36 pub fn total(&self) -> u64 {
37 self.input + self.output
38 }
39}
40
41#[derive(Debug, Clone, Default)]
43pub struct GlobalTokenSnapshot {
44 pub input: u64,
45 pub output: u64,
46 pub total: TokenTotals,
47 pub totals: TokenTotals,
48 pub request_count: u64,
49}
50
51impl GlobalTokenSnapshot {
52 pub fn new(input: u64, output: u64, _total: u64) -> Self {
53 Self {
54 input,
55 output,
56 total: TokenTotals::new(input, output),
57 totals: TokenTotals::new(input, output),
58 request_count: 0,
59 }
60 }
61
62 pub fn summary(&self) -> String {
63 format!(
64 "{} total tokens ({} input, {} output)",
65 self.totals.total(),
66 self.input,
67 self.output
68 )
69 }
70}
71
72#[derive(Debug)]
74pub struct AtomicTokenCounter {
75 prompt_tokens: AtomicU64,
76 completion_tokens: AtomicU64,
77 total_tokens: AtomicU64,
78 request_count: AtomicU64,
79 model_usage: Mutex<HashMap<String, (u64, u64)>>,
80}
81
82impl AtomicTokenCounter {
83 pub fn new() -> Self {
84 Self {
85 prompt_tokens: AtomicU64::new(0),
86 completion_tokens: AtomicU64::new(0),
87 total_tokens: AtomicU64::new(0),
88 request_count: AtomicU64::new(0),
89 model_usage: Mutex::new(HashMap::new()),
90 }
91 }
92
93 pub fn record(&self, prompt: u64, completion: u64) {
94 self.prompt_tokens.fetch_add(prompt, Ordering::Relaxed);
95 self.completion_tokens
96 .fetch_add(completion, Ordering::Relaxed);
97 self.total_tokens
98 .fetch_add(prompt + completion, Ordering::Relaxed);
99 self.request_count.fetch_add(1, Ordering::Relaxed);
100 }
101
102 pub fn get(&self) -> (u64, u64, u64) {
103 (
104 self.prompt_tokens.load(Ordering::Relaxed),
105 self.completion_tokens.load(Ordering::Relaxed),
106 self.total_tokens.load(Ordering::Relaxed),
107 )
108 }
109
110 pub fn record_model_usage(&self, model: &str, prompt: u64, completion: u64) {
111 tracing::debug!(model, prompt, completion, "Recording model usage");
112 self.record(prompt, completion);
113
114 if let Ok(mut usage) = self.model_usage.try_lock() {
116 let entry = usage.entry(model.to_string()).or_insert((0, 0));
117 entry.0 += prompt;
118 entry.1 += completion;
119 }
120 }
121
122 pub fn global_snapshot(&self) -> GlobalTokenSnapshot {
123 let (prompt, completion, total) = self.get();
124 let mut snapshot = GlobalTokenSnapshot::new(prompt, completion, total);
125 snapshot.request_count = self.request_count.load(Ordering::Relaxed);
126 snapshot
127 }
128
129 pub fn model_snapshots(&self) -> Vec<TokenUsageSnapshot> {
130 if let Ok(usage) = self.model_usage.try_lock() {
131 usage
132 .iter()
133 .map(|(name, (input, output))| {
134 TokenUsageSnapshot {
135 name: name.clone(),
136 prompt_tokens: *input,
137 completion_tokens: *output,
138 total_tokens: input + output,
139 totals: TokenTotals::new(*input, *output),
140 timestamp: Utc::now(),
141 request_count: 0, }
143 })
144 .collect()
145 } else {
146 Vec::new()
147 }
148 }
149}
150
151impl Default for AtomicTokenCounter {
152 fn default() -> Self {
153 Self::new()
154 }
155}
156
/// Cheap global counter for tool executions and how many of them failed.
#[derive(Debug)]
pub struct AtomicToolCounter {
    count: AtomicU64,
    failures: AtomicU64,
}

impl AtomicToolCounter {
    /// Creates a counter with both tallies at zero.
    pub fn new() -> Self {
        Self {
            count: AtomicU64::new(0),
            failures: AtomicU64::new(0),
        }
    }

    /// Counts one execution; an unsuccessful run also bumps the failure tally.
    pub fn record(&self, success: bool) {
        self.count.fetch_add(1, Ordering::Relaxed);
        if !success {
            self.failures.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Returns `(total_executions, failures)` observed so far.
    pub fn get(&self) -> (u64, u64) {
        let total = self.count.load(Ordering::Relaxed);
        let failed = self.failures.load(Ordering::Relaxed);
        (total, failed)
    }
}

impl Default for AtomicToolCounter {
    fn default() -> Self {
        Self::new()
    }
}
192
193#[derive(Debug, Clone, Serialize, Deserialize)]
195pub struct ToolExecution {
196 pub id: String,
197 pub tool_name: String,
198 pub timestamp: DateTime<Utc>,
199 pub duration_ms: u64,
200 pub success: bool,
201 pub error: Option<String>,
202 pub tokens_used: Option<u64>,
203 pub session_id: Option<String>,
204 pub input: Option<serde_json::Value>,
205 #[serde(default)]
206 pub file_changes: Vec<FileChange>,
207}
208
209impl ToolExecution {
210 pub fn start(tool_name: &str, input: serde_json::Value) -> Self {
212 Self {
213 id: uuid::Uuid::new_v4().to_string(),
214 tool_name: tool_name.to_string(),
215 timestamp: Utc::now(),
216 duration_ms: 0,
217 success: false,
218 error: None,
219 tokens_used: None,
220 session_id: None,
221 input: Some(input),
222 file_changes: Vec::new(),
223 }
224 }
225
226 pub fn add_file_change(&mut self, change: FileChange) {
228 self.file_changes.push(change);
229 }
230
231 pub fn with_session(mut self, session_id: String) -> Self {
233 self.session_id = Some(session_id);
234 self
235 }
236
237 pub fn complete(&mut self, success: bool, duration_ms: u64) {
239 self.success = success;
240 self.duration_ms = duration_ms;
241 }
242
243 pub fn fail(&mut self, error: String, duration_ms: u64) {
245 self.success = false;
246 self.error = Some(error);
247 self.duration_ms = duration_ms;
248 }
249
250 pub fn complete_success(mut self, _output: String, duration: std::time::Duration) -> Self {
252 self.success = true;
253 self.duration_ms = duration.as_millis() as u64;
254 self
255 }
256
257 pub fn complete_error(mut self, error: String, duration: std::time::Duration) -> Self {
259 self.success = false;
260 self.error = Some(error);
261 self.duration_ms = duration.as_millis() as u64;
262 self
263 }
264}
265
/// Record of one agent-to-agent (A2A) tool message: what was sent, how long
/// it took, and the outcome.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct A2AMessageRecord {
    // Name of the tool that dispatched the message.
    pub tool_name: String,
    // Identifier of the task the message belongs to.
    pub task_id: String,
    // NOTE(review): presumably whether the sender waited for completion —
    // confirm against the call sites.
    pub blocking: bool,
    // Prompt text sent to the peer agent.
    pub prompt: String,
    pub duration_ms: u64,
    pub success: bool,
    // Peer output, when available.
    pub output: Option<String>,
    // Error text, when the exchange failed.
    pub error: Option<String>,
    pub timestamp: DateTime<Utc>,
}
279
280#[derive(Debug, Clone, Serialize, Deserialize)]
282pub struct FileChange {
283 pub path: String,
284 pub operation: String,
285 pub timestamp: DateTime<Utc>,
286 pub size_bytes: Option<u64>,
287 pub line_range: Option<(u32, u32)>,
288 pub diff: Option<String>,
289}
290
291impl FileChange {
292 pub fn read(path: &str, line_range: Option<(u32, u32)>) -> Self {
294 Self {
295 path: path.to_string(),
296 operation: "read".to_string(),
297 timestamp: Utc::now(),
298 size_bytes: None,
299 line_range,
300 diff: None,
301 }
302 }
303
304 pub fn create(path: &str, content: &str) -> Self {
306 Self {
307 path: path.to_string(),
308 operation: "create".to_string(),
309 timestamp: Utc::now(),
310 size_bytes: Some(content.len() as u64),
311 line_range: None,
312 diff: None,
313 }
314 }
315
316 pub fn modify(
318 path: &str,
319 old_content: &str,
320 new_content: &str,
321 line_range: Option<(u32, u32)>,
322 ) -> Self {
323 Self {
324 path: path.to_string(),
325 operation: "modify".to_string(),
326 timestamp: Utc::now(),
327 size_bytes: Some(new_content.len() as u64),
328 line_range,
329 diff: Some(format!(
330 "-{} bytes +{} bytes",
331 old_content.len(),
332 new_content.len()
333 )),
334 }
335 }
336
337 pub fn modify_with_diff(
339 path: &str,
340 diff: &str,
341 new_size: usize,
342 line_range: Option<(u32, u32)>,
343 ) -> Self {
344 Self {
345 path: path.to_string(),
346 operation: "modify".to_string(),
347 timestamp: Utc::now(),
348 size_bytes: Some(new_size as u64),
349 line_range,
350 diff: Some(diff.to_string()),
351 }
352 }
353 pub fn summary(&self) -> String {
354 format!("{} ({})", self.path, self.operation)
355 }
356}
357
358#[derive(Debug, Clone, Serialize, Deserialize)]
360pub struct ProviderRequestRecord {
361 pub provider: String,
362 pub model: String,
363 pub timestamp: DateTime<Utc>,
364 pub prompt_tokens: u64,
365 pub completion_tokens: u64,
366 pub input_tokens: u64,
367 pub output_tokens: u64,
368 pub latency_ms: u64,
369 pub ttft_ms: Option<u64>,
370 pub success: bool,
371}
372
373impl ProviderRequestRecord {
374 pub fn tokens_per_second(&self) -> f64 {
376 if self.latency_ms == 0 {
377 return 0.0;
378 }
379 (self.output_tokens as f64) / (self.latency_ms as f64 / 1000.0)
380 }
381}
382
383#[derive(Debug, Clone, Serialize, Deserialize)]
385pub struct TokenUsageSnapshot {
386 pub name: String,
387 pub prompt_tokens: u64,
388 pub completion_tokens: u64,
389 pub total_tokens: u64,
390 pub totals: TokenTotals,
391 pub timestamp: DateTime<Utc>,
392 pub request_count: u64,
393}
394
395impl TokenUsageSnapshot {
396 pub fn current() -> Self {
397 let (prompt, comp, total) = TOKEN_USAGE.get();
398 Self {
399 name: "global".to_string(),
400 prompt_tokens: prompt,
401 completion_tokens: comp,
402 total_tokens: total,
403 totals: TokenTotals::new(prompt, comp),
404 timestamp: Utc::now(),
405 request_count: 0,
406 }
407 }
408
409 pub fn summary(&self) -> String {
410 format!(
411 "{} total tokens ({} input, {} output)",
412 self.totals.total(),
413 self.prompt_tokens,
414 self.completion_tokens
415 )
416 }
417}
418
419#[derive(Debug, Clone, Serialize, Deserialize)]
421pub struct TokenCounts {
422 pub input_tokens: u64,
423 pub output_tokens: u64,
424}
425
426impl TokenCounts {
427 pub fn new(input_tokens: u64, output_tokens: u64) -> Self {
428 Self {
429 input_tokens,
430 output_tokens,
431 }
432 }
433}
434
435#[derive(Debug, Clone, Serialize, Deserialize)]
437pub struct ContextLimit {
438 pub max_tokens: u64,
439 pub used_tokens: u64,
440 pub remaining_tokens: u64,
441 pub percentage_used: f64,
442 pub percentage: f64,
444}
445
446impl ContextLimit {
447 pub fn new(used_tokens: u64, max_tokens: u64) -> Self {
448 let remaining = max_tokens.saturating_sub(used_tokens);
449 let percentage = if max_tokens > 0 {
450 (used_tokens as f64 / max_tokens as f64) * 100.0
451 } else {
452 0.0
453 };
454 Self {
455 max_tokens,
456 used_tokens,
457 remaining_tokens: remaining,
458 percentage_used: percentage,
459 percentage,
460 }
461 }
462
463 pub fn percentage(&self) -> f64 {
465 self.percentage_used
466 }
467}
468
469#[derive(Debug, Clone, Serialize, Deserialize)]
471pub struct CostEstimate {
472 pub input_cost: f64,
473 pub output_cost: f64,
474 pub total_cost: f64,
475 pub currency: String,
476}
477
478impl Default for CostEstimate {
479 fn default() -> Self {
480 Self {
481 input_cost: 0.0,
482 output_cost: 0.0,
483 total_cost: 0.0,
484 currency: "USD".to_string(),
485 }
486 }
487}
488
489impl CostEstimate {
490 pub fn from_tokens(tokens: &TokenCounts, input_price: f64, output_price: f64) -> Self {
491 let input_cost = (tokens.input_tokens as f64 / 1_000_000.0) * input_price;
492 let output_cost = (tokens.output_tokens as f64 / 1_000_000.0) * output_price;
493 Self {
494 input_cost,
495 output_cost,
496 total_cost: input_cost + output_cost,
497 currency: "USD".to_string(),
498 }
499 }
500
501 pub fn format_currency(&self) -> String {
502 format!("${:.4}", self.total_cost)
503 }
504
505 pub fn format_smart(&self) -> String {
506 if self.total_cost < 0.01 {
507 format!("${:.4}", self.total_cost)
508 } else {
509 format!("${:.2}", self.total_cost)
510 }
511 }
512}
513
/// Aggregated performance statistics for one provider over the retained
/// request window.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProviderSnapshot {
    pub provider: String,
    // Number of requests in the aggregation window.
    pub request_count: usize,
    pub total_input_tokens: u64,
    pub total_output_tokens: u64,
    // Mean output tokens/second across requests.
    pub avg_tps: f64,
    // Mean request latency in milliseconds.
    pub avg_latency_ms: f64,
    // Median (p50) and 95th-percentile throughput / latency figures.
    pub p50_tps: f64,
    pub p50_latency_ms: f64,
    pub p95_tps: f64,
    pub p95_latency_ms: f64,
}
527
528#[derive(Debug, Default)]
530pub struct ProviderMetrics {
531 requests: Mutex<Vec<ProviderRequestRecord>>,
532}
533
534impl ProviderMetrics {
535 pub fn new() -> Self {
536 Self::default()
537 }
538
539 pub async fn record(&self, record: ProviderRequestRecord) {
540 let mut requests = self.requests.lock().await;
541 requests.push(record);
542 if requests.len() > 1000 {
544 requests.remove(0);
545 }
546 }
547
548 pub async fn get_recent(&self, limit: usize) -> Vec<ProviderRequestRecord> {
549 let requests = self.requests.lock().await;
550 requests.iter().rev().take(limit).cloned().collect()
551 }
552
553 pub fn all_snapshots(&self) -> Vec<ProviderSnapshot> {
554 let requests = match self.requests.try_lock() {
555 Ok(guard) => guard.clone(),
556 Err(_) => return Vec::new(),
557 };
558
559 if requests.is_empty() {
560 return Vec::new();
561 }
562
563 let mut by_provider: HashMap<String, Vec<ProviderRequestRecord>> = HashMap::new();
564 for req in requests {
565 by_provider
566 .entry(req.provider.clone())
567 .or_default()
568 .push(req);
569 }
570
571 let mut snapshots = Vec::new();
572 for (provider, reqs) in by_provider {
573 if reqs.is_empty() {
574 continue;
575 }
576
577 let request_count = reqs.len();
578 let total_input_tokens: u64 = reqs.iter().map(|r| r.input_tokens).sum();
579 let total_output_tokens: u64 = reqs.iter().map(|r| r.output_tokens).sum();
580 let total_latency: u64 = reqs.iter().map(|r| r.latency_ms).sum();
581
582 let avg_latency_ms = total_latency as f64 / request_count as f64;
583
584 let mut tps_values: Vec<f64> = reqs.iter().map(|r| r.tokens_per_second()).collect();
585 tps_values.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
586
587 let mut latency_values: Vec<f64> = reqs.iter().map(|r| r.latency_ms as f64).collect();
588 latency_values.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
589
590 let p50_idx = (request_count as f64 * 0.50) as usize;
591 let p95_idx = (request_count as f64 * 0.95) as usize;
592
593 let p50_tps = tps_values.get(p50_idx).cloned().unwrap_or(0.0);
594 let p95_tps = tps_values.get(p95_idx).cloned().unwrap_or(0.0);
595
596 let p50_latency_ms = latency_values.get(p50_idx).cloned().unwrap_or(0.0);
597 let p95_latency_ms = latency_values.get(p95_idx).cloned().unwrap_or(0.0);
598
599 let avg_tps = tps_values.iter().sum::<f64>() / request_count as f64;
600
601 snapshots.push(ProviderSnapshot {
602 provider,
603 request_count,
604 total_input_tokens,
605 total_output_tokens,
606 avg_tps,
607 avg_latency_ms,
608 p50_tps,
609 p50_latency_ms,
610 p95_tps,
611 p95_latency_ms,
612 });
613 }
614
615 snapshots
616 }
617}
618
/// Placeholder hook for persisting a telemetry event.
///
/// Currently only logs the payload at debug level and always succeeds;
/// durable storage is presumably handled elsewhere or not yet implemented.
pub fn record_persistent(category: &str, data: &serde_json::Value) -> Result<()> {
    tracing::debug!(category, data = ?data, "Recording persistent telemetry");
    Ok(())
}
625
626#[derive(Debug, Default)]
628pub struct SwarmTelemetryCollector {
629 task_id: Mutex<Option<String>>,
630 agent_count: Mutex<usize>,
631 completed: Mutex<usize>,
632 total: Mutex<usize>,
633 start_time: Mutex<Option<DateTime<Utc>>>,
634}
635
636impl SwarmTelemetryCollector {
637 pub fn new() -> Self {
638 Self::default()
639 }
640
641 pub async fn start_swarm(&self, task_id: &str, agent_count: usize, _strategy: &str) {
642 let mut id = self.task_id.lock().await;
643 *id = Some(task_id.to_string());
644 let mut count = self.agent_count.lock().await;
645 *count = agent_count;
646 let mut start = self.start_time.lock().await;
647 *start = Some(Utc::now());
648 tracing::info!(task_id, agent_count, "Swarm started");
649 }
650
651 pub async fn record_progress(&self, completed: usize, total: usize) {
652 let mut c = self.completed.lock().await;
653 *c = completed;
654 let mut t = self.total.lock().await;
655 *t = total;
656 }
657
658 pub async fn record_swarm_latency(&self, _label: &str, duration: std::time::Duration) {
659 tracing::debug!(
660 label = _label,
661 duration_ms = duration.as_millis(),
662 "Swarm latency recorded"
663 );
664 }
665
666 pub async fn complete_swarm(&self, success: bool) -> TelemetryMetrics {
667 let start = self.start_time.lock().await;
668 let duration = start
669 .map(|s| (Utc::now() - s).num_milliseconds() as u64)
670 .unwrap_or(0);
671 drop(start);
672
673 let completed = *self.completed.lock().await;
674 let total = *self.total.lock().await;
675
676 tracing::info!(
677 success,
678 completed,
679 total,
680 duration_ms = duration,
681 "Swarm completed"
682 );
683
684 TelemetryMetrics {
685 tool_invocations: total as u64,
686 successful_operations: if success { completed as u64 } else { 0 },
687 failed_operations: if !success {
688 (total.saturating_sub(completed)) as u64
689 } else {
690 0
691 },
692 total_tokens: 0,
693 avg_latency_ms: duration as f64,
694 }
695 }
696}
697
/// Aggregate counters produced by the telemetry subsystem.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct TelemetryMetrics {
    // Total number of tool invocations recorded.
    pub tool_invocations: u64,
    // Invocations (or swarm operations) that succeeded.
    pub successful_operations: u64,
    // Invocations (or swarm operations) that failed.
    pub failed_operations: u64,
    // Tokens consumed across the recorded invocations.
    pub total_tokens: u64,
    // Running mean latency per invocation, in milliseconds.
    pub avg_latency_ms: f64,
}
712
713#[derive(Debug)]
715pub struct Telemetry {
716 metrics: Mutex<TelemetryMetrics>,
717 pub metadata: HashMap<String, String>,
719}
720
721impl Telemetry {
722 pub fn new() -> Self {
724 Self {
725 metrics: Mutex::new(TelemetryMetrics::default()),
726 metadata: HashMap::new(),
727 }
728 }
729
730 pub async fn record_tool_invocation(&self, success: bool, latency_ms: u64, tokens: u64) {
732 let mut metrics = self.metrics.lock().await;
733 metrics.tool_invocations += 1;
734 if success {
735 metrics.successful_operations += 1;
736 } else {
737 metrics.failed_operations += 1;
738 }
739 metrics.total_tokens += tokens;
740 let n = metrics.tool_invocations as f64;
742 metrics.avg_latency_ms = metrics.avg_latency_ms * (n - 1.0) / n + latency_ms as f64 / n;
743 }
744
745 pub async fn get_metrics(&self) -> TelemetryMetrics {
747 self.metrics.lock().await.clone()
748 }
749
750 pub async fn start_swarm(&self, _task_id: &str, _agent_count: usize) {
752 }
754
755 pub async fn record_swarm_progress(&self, _task_id: &str, _completed: usize, _total: usize) {
757 }
759
760 pub async fn complete_swarm(&self, _success: bool) -> TelemetryMetrics {
762 self.metrics.lock().await.clone()
763 }
764}
765
766impl Default for Telemetry {
767 fn default() -> Self {
768 Self::new()
769 }
770}
771
772#[derive(Debug, Clone, Default, Serialize, Deserialize)]
774pub struct PersistentStats {
775 pub stats: PersistentStatsInner,
776}
777
778#[derive(Debug, Clone, Default, Serialize, Deserialize)]
779pub struct PersistentStatsInner {
780 pub total_input_tokens: u64,
781 pub total_output_tokens: u64,
782 pub total_requests: u64,
783 pub executions_by_tool: HashMap<String, u64>,
784 pub files_modified: HashMap<String, u64>,
785}
786
787impl PersistentStats {
788 pub fn recent(&self, _limit: usize) -> Vec<ToolExecution> {
789 Vec::new()
790 }
791
792 pub fn all_file_changes(&self) -> Vec<(String, FileChange)> {
793 Vec::new()
794 }
795
796 pub fn by_tool(&self, _tool_name: &str) -> Vec<ToolExecution> {
797 Vec::new()
798 }
799
800 pub fn by_file(&self, _file_path: &str) -> Vec<ToolExecution> {
801 Vec::new()
802 }
803
804 pub fn summary(&self) -> String {
805 "0 total executions".to_string()
806 }
807}
808
809pub fn get_persistent_stats() -> PersistentStats {
811 PersistentStats::default()
812}