1use anyhow::Result;
6use chrono::{DateTime, Utc};
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::Arc;
11use tokio::sync::Mutex;
12use once_cell::sync::Lazy;
13
/// Process-wide token usage counter; wrapped in `Arc` so subsystems can hold
/// their own handle to the same counter.
pub static TOKEN_USAGE: Lazy<Arc<AtomicTokenCounter>> = Lazy::new(|| {
    Arc::new(AtomicTokenCounter::new())
});
18
/// Process-wide counter of tool executions and failures.
pub static TOOL_EXECUTIONS: Lazy<Arc<AtomicToolCounter>> = Lazy::new(|| {
    Arc::new(AtomicToolCounter::new())
});
22
/// Process-wide store of per-provider request metrics (latency, throughput).
pub static PROVIDER_METRICS: Lazy<Arc<ProviderMetrics>> = Lazy::new(|| {
    Arc::new(ProviderMetrics::new())
});
26
/// A pair of input (prompt) and output (completion) token counts.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct TokenTotals {
    /// Input / prompt tokens.
    pub input: u64,
    /// Output / completion tokens.
    pub output: u64,
}
33
34impl TokenTotals {
35 pub fn new(input: u64, output: u64) -> Self {
36 Self { input, output }
37 }
38
39 pub fn total(&self) -> u64 {
40 self.input + self.output
41 }
42}
43
/// Point-in-time view of the global token totals.
#[derive(Debug, Clone, Default)]
pub struct GlobalTokenSnapshot {
    pub input: u64,
    pub output: u64,
    /// NOTE(review): `total` and `totals` always carry identical data (see
    /// `new`); presumably one is kept for backward compatibility with older
    /// callers — confirm before removing either.
    pub total: TokenTotals,
    pub totals: TokenTotals,
    /// Number of requests recorded at snapshot time.
    pub request_count: u64,
}
53
54impl GlobalTokenSnapshot {
55 pub fn new(input: u64, output: u64, _total: u64) -> Self {
56 Self {
57 input,
58 output,
59 total: TokenTotals::new(input, output),
60 totals: TokenTotals::new(input, output),
61 request_count: 0,
62 }
63 }
64
65 pub fn summary(&self) -> String {
66 format!("{} total tokens ({} input, {} output)", self.totals.total(), self.input, self.output)
67 }
68}
69
/// Thread-safe token usage counter: global totals in relaxed atomics plus a
/// per-model breakdown behind a tokio mutex.
#[derive(Debug)]
pub struct AtomicTokenCounter {
    prompt_tokens: AtomicU64,
    completion_tokens: AtomicU64,
    total_tokens: AtomicU64,
    request_count: AtomicU64,
    // model name -> (prompt tokens, completion tokens)
    model_usage: Mutex<HashMap<String, (u64, u64)>>,
}
79
80impl AtomicTokenCounter {
81 pub fn new() -> Self {
82 Self {
83 prompt_tokens: AtomicU64::new(0),
84 completion_tokens: AtomicU64::new(0),
85 total_tokens: AtomicU64::new(0),
86 request_count: AtomicU64::new(0),
87 model_usage: Mutex::new(HashMap::new()),
88 }
89 }
90
91 pub fn record(&self, prompt: u64, completion: u64) {
92 self.prompt_tokens.fetch_add(prompt, Ordering::Relaxed);
93 self.completion_tokens.fetch_add(completion, Ordering::Relaxed);
94 self.total_tokens.fetch_add(prompt + completion, Ordering::Relaxed);
95 self.request_count.fetch_add(1, Ordering::Relaxed);
96 }
97
98 pub fn get(&self) -> (u64, u64, u64) {
99 (
100 self.prompt_tokens.load(Ordering::Relaxed),
101 self.completion_tokens.load(Ordering::Relaxed),
102 self.total_tokens.load(Ordering::Relaxed),
103 )
104 }
105
106 pub fn record_model_usage(&self, model: &str, prompt: u64, completion: u64) {
107 tracing::debug!(model, prompt, completion, "Recording model usage");
108 self.record(prompt, completion);
109
110 if let Ok(mut usage) = self.model_usage.try_lock() {
112 let entry = usage.entry(model.to_string()).or_insert((0, 0));
113 entry.0 += prompt;
114 entry.1 += completion;
115 }
116 }
117
118 pub fn global_snapshot(&self) -> GlobalTokenSnapshot {
119 let (prompt, completion, total) = self.get();
120 let mut snapshot = GlobalTokenSnapshot::new(prompt, completion, total);
121 snapshot.request_count = self.request_count.load(Ordering::Relaxed);
122 snapshot
123 }
124
125 pub fn model_snapshots(&self) -> Vec<TokenUsageSnapshot> {
126 if let Ok(usage) = self.model_usage.try_lock() {
127 usage.iter().map(|(name, (input, output))| {
128 TokenUsageSnapshot {
129 name: name.clone(),
130 prompt_tokens: *input,
131 completion_tokens: *output,
132 total_tokens: input + output,
133 totals: TokenTotals::new(*input, *output),
134 timestamp: Utc::now(),
135 request_count: 0, }
137 }).collect()
138 } else {
139 Vec::new()
140 }
141 }
142}
143
impl Default for AtomicTokenCounter {
    /// Equivalent to [`AtomicTokenCounter::new`]: all counts start at zero.
    fn default() -> Self {
        Self::new()
    }
}
149
/// Lock-free counter of tool executions and how many of them failed.
#[derive(Debug)]
pub struct AtomicToolCounter {
    count: AtomicU64,
    failures: AtomicU64,
}

impl AtomicToolCounter {
    /// Creates a counter with zero executions and zero failures.
    pub fn new() -> Self {
        Self {
            count: AtomicU64::new(0),
            failures: AtomicU64::new(0),
        }
    }

    /// Counts one execution; an unsuccessful run also bumps the failure tally.
    pub fn record(&self, success: bool) {
        self.count.fetch_add(1, Ordering::Relaxed);
        if success {
            return;
        }
        self.failures.fetch_add(1, Ordering::Relaxed);
    }

    /// Returns `(total_executions, failures)`.
    pub fn get(&self) -> (u64, u64) {
        let total = self.count.load(Ordering::Relaxed);
        let failed = self.failures.load(Ordering::Relaxed);
        (total, failed)
    }
}

impl Default for AtomicToolCounter {
    /// Equivalent to [`AtomicToolCounter::new`].
    fn default() -> Self {
        Self::new()
    }
}
185
/// Record of a single tool invocation: identity, timing, outcome, and any
/// file changes observed along the way.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ToolExecution {
    /// Unique id assigned at `start` (UUID v4 string).
    pub id: String,
    pub tool_name: String,
    /// When the execution started.
    pub timestamp: DateTime<Utc>,
    pub duration_ms: u64,
    pub success: bool,
    /// Error message when the execution failed.
    pub error: Option<String>,
    pub tokens_used: Option<u64>,
    pub session_id: Option<String>,
    /// Tool input captured at `start`.
    pub input: Option<serde_json::Value>,
    #[serde(default)]
    pub file_changes: Vec<FileChange>,
}
201
202impl ToolExecution {
203 pub fn start(tool_name: &str, input: serde_json::Value) -> Self {
205 Self {
206 id: uuid::Uuid::new_v4().to_string(),
207 tool_name: tool_name.to_string(),
208 timestamp: Utc::now(),
209 duration_ms: 0,
210 success: false,
211 error: None,
212 tokens_used: None,
213 session_id: None,
214 input: Some(input),
215 file_changes: Vec::new(),
216 }
217 }
218
219 pub fn add_file_change(&mut self, change: FileChange) {
221 self.file_changes.push(change);
222 }
223
224 pub fn with_session(mut self, session_id: String) -> Self {
226 self.session_id = Some(session_id);
227 self
228 }
229
230 pub fn complete(&mut self, success: bool, duration_ms: u64) {
232 self.success = success;
233 self.duration_ms = duration_ms;
234 }
235
236 pub fn fail(&mut self, error: String, duration_ms: u64) {
238 self.success = false;
239 self.error = Some(error);
240 self.duration_ms = duration_ms;
241 }
242
243 pub fn complete_success(mut self, _output: String, duration: std::time::Duration) -> Self {
245 self.success = true;
246 self.duration_ms = duration.as_millis() as u64;
247 self
248 }
249
250 pub fn complete_error(mut self, error: String, duration: std::time::Duration) -> Self {
252 self.success = false;
253 self.error = Some(error);
254 self.duration_ms = duration.as_millis() as u64;
255 self
256 }
257}
258
/// Record of a single agent-to-agent (A2A) message exchange.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct A2AMessageRecord {
    pub tool_name: String,
    pub task_id: String,
    /// Whether the call blocked waiting for the peer's response.
    pub blocking: bool,
    pub prompt: String,
    pub duration_ms: u64,
    pub success: bool,
    pub output: Option<String>,
    pub error: Option<String>,
    pub timestamp: DateTime<Utc>,
}
272
/// A file operation observed during a tool execution.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileChange {
    pub path: String,
    /// Operation kind as a string: the constructors use "read", "create",
    /// or "modify".
    pub operation: String,
    pub timestamp: DateTime<Utc>,
    /// Size of the (new) content in bytes, when known.
    pub size_bytes: Option<u64>,
    /// Affected line range, when known.
    pub line_range: Option<(u32, u32)>,
    /// Textual diff or a coarse byte-delta description.
    pub diff: Option<String>,
}
283
284impl FileChange {
285 pub fn read(path: &str, line_range: Option<(u32, u32)>) -> Self {
287 Self {
288 path: path.to_string(),
289 operation: "read".to_string(),
290 timestamp: Utc::now(),
291 size_bytes: None,
292 line_range,
293 diff: None,
294 }
295 }
296
297 pub fn create(path: &str, content: &str) -> Self {
299 Self {
300 path: path.to_string(),
301 operation: "create".to_string(),
302 timestamp: Utc::now(),
303 size_bytes: Some(content.len() as u64),
304 line_range: None,
305 diff: None,
306 }
307 }
308
309 pub fn modify(path: &str, old_content: &str, new_content: &str, line_range: Option<(u32, u32)>) -> Self {
311 Self {
312 path: path.to_string(),
313 operation: "modify".to_string(),
314 timestamp: Utc::now(),
315 size_bytes: Some(new_content.len() as u64),
316 line_range,
317 diff: Some(format!("-{} bytes +{} bytes", old_content.len(), new_content.len())),
318 }
319 }
320
321 pub fn modify_with_diff(path: &str, diff: &str, new_size: usize, line_range: Option<(u32, u32)>) -> Self {
323 Self {
324 path: path.to_string(),
325 operation: "modify".to_string(),
326 timestamp: Utc::now(),
327 size_bytes: Some(new_size as u64),
328 line_range,
329 diff: Some(diff.to_string()),
330 }
331 }
332 pub fn summary(&self) -> String {
333 format!("{} ({})", self.path, self.operation)
334 }
335}
336
/// One LLM provider request with token counts and timing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProviderRequestRecord {
    pub provider: String,
    pub model: String,
    pub timestamp: DateTime<Utc>,
    /// NOTE(review): `prompt_tokens`/`completion_tokens` appear to duplicate
    /// `input_tokens`/`output_tokens` — nothing in this file keeps them in
    /// sync, and the aggregation code reads only the `input_`/`output_`
    /// pair. Presumably a legacy naming migration; confirm with callers.
    pub prompt_tokens: u64,
    pub completion_tokens: u64,
    pub input_tokens: u64,
    pub output_tokens: u64,
    /// Total request latency in milliseconds.
    pub latency_ms: u64,
    /// Time to first token, if measured.
    pub ttft_ms: Option<u64>,
    pub success: bool,
}
351
352impl ProviderRequestRecord {
353 pub fn tokens_per_second(&self) -> f64 {
355 if self.latency_ms == 0 {
356 return 0.0;
357 }
358 (self.output_tokens as f64) / (self.latency_ms as f64 / 1000.0)
359 }
360}
361
/// Labelled point-in-time token usage (global or per-model).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TokenUsageSnapshot {
    /// `"global"` for the process-wide snapshot, otherwise a model name.
    pub name: String,
    pub prompt_tokens: u64,
    pub completion_tokens: u64,
    pub total_tokens: u64,
    pub totals: TokenTotals,
    pub timestamp: DateTime<Utc>,
    pub request_count: u64,
}
373
374impl TokenUsageSnapshot {
375 pub fn current() -> Self {
376 let (prompt, comp, total) = TOKEN_USAGE.get();
377 Self {
378 name: "global".to_string(),
379 prompt_tokens: prompt,
380 completion_tokens: comp,
381 total_tokens: total,
382 totals: TokenTotals::new(prompt, comp),
383 timestamp: Utc::now(),
384 request_count: 0,
385 }
386 }
387
388 pub fn summary(&self) -> String {
389 format!("{} total tokens ({} input, {} output)", self.totals.total(), self.prompt_tokens, self.completion_tokens)
390 }
391}
392
/// Input/output token counts for a single request or billing unit.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TokenCounts {
    pub input_tokens: u64,
    pub output_tokens: u64,
}
399
impl TokenCounts {
    /// Creates a new input/output token pair.
    pub fn new(input_tokens: u64, output_tokens: u64) -> Self {
        Self {
            input_tokens,
            output_tokens,
        }
    }
}
408
/// Context-window utilisation: how much of a model's token budget is used.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ContextLimit {
    pub max_tokens: u64,
    pub used_tokens: u64,
    pub remaining_tokens: u64,
    /// NOTE(review): `percentage_used` and `percentage` are always set to the
    /// same value (see `new`); presumably one is kept for serialization
    /// compatibility — confirm before removing either.
    pub percentage_used: f64,
    pub percentage: f64,
}
419
420impl ContextLimit {
421 pub fn new(used_tokens: u64, max_tokens: u64) -> Self {
422 let remaining = max_tokens.saturating_sub(used_tokens);
423 let percentage = if max_tokens > 0 {
424 (used_tokens as f64 / max_tokens as f64) * 100.0
425 } else {
426 0.0
427 };
428 Self {
429 max_tokens,
430 used_tokens,
431 remaining_tokens: remaining,
432 percentage_used: percentage,
433 percentage,
434 }
435 }
436
437 pub fn percentage(&self) -> f64 {
439 self.percentage_used
440 }
441}
442
/// Estimated monetary cost of token usage, split into input/output parts.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CostEstimate {
    pub input_cost: f64,
    pub output_cost: f64,
    pub total_cost: f64,
    /// ISO currency code; the constructors always use "USD".
    pub currency: String,
}
451
452impl Default for CostEstimate {
453 fn default() -> Self {
454 Self {
455 input_cost: 0.0,
456 output_cost: 0.0,
457 total_cost: 0.0,
458 currency: "USD".to_string(),
459 }
460 }
461}
462
463impl CostEstimate {
464 pub fn from_tokens(tokens: &TokenCounts, input_price: f64, output_price: f64) -> Self {
465 let input_cost = (tokens.input_tokens as f64 / 1_000_000.0) * input_price;
466 let output_cost = (tokens.output_tokens as f64 / 1_000_000.0) * output_price;
467 Self {
468 input_cost,
469 output_cost,
470 total_cost: input_cost + output_cost,
471 currency: "USD".to_string(),
472 }
473 }
474
475 pub fn format_currency(&self) -> String {
476 format!("${:.4}", self.total_cost)
477 }
478
479 pub fn format_smart(&self) -> String {
480 if self.total_cost < 0.01 {
481 format!("${:.4}", self.total_cost)
482 } else if self.total_cost < 1.0 {
483 format!("${:.2}", self.total_cost)
484 } else {
485 format!("${:.2}", self.total_cost)
486 }
487 }
488}
489
/// Aggregated statistics for one provider over the recorded request window.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProviderSnapshot {
    pub provider: String,
    pub request_count: usize,
    pub total_input_tokens: u64,
    pub total_output_tokens: u64,
    /// Mean tokens/second across requests.
    pub avg_tps: f64,
    pub avg_latency_ms: f64,
    /// Median (50th percentile) tokens/second.
    pub p50_tps: f64,
    pub p50_latency_ms: f64,
    /// 95th percentile tokens/second.
    pub p95_tps: f64,
    pub p95_latency_ms: f64,
}
503
/// Rolling buffer of provider request records (capped at 1000 by `record`).
#[derive(Debug, Default)]
pub struct ProviderMetrics {
    requests: Mutex<Vec<ProviderRequestRecord>>,
}
509
510impl ProviderMetrics {
511 pub fn new() -> Self {
512 Self::default()
513 }
514
515 pub async fn record(&self, record: ProviderRequestRecord) {
516 let mut requests = self.requests.lock().await;
517 requests.push(record);
518 if requests.len() > 1000 {
520 requests.remove(0);
521 }
522 }
523
524 pub async fn get_recent(&self, limit: usize) -> Vec<ProviderRequestRecord> {
525 let requests = self.requests.lock().await;
526 requests.iter().rev().take(limit).cloned().collect()
527 }
528
529 pub fn all_snapshots(&self) -> Vec<ProviderSnapshot> {
530 let requests = match self.requests.try_lock() {
531 Ok(guard) => guard.clone(),
532 Err(_) => return Vec::new(),
533 };
534
535 if requests.is_empty() {
536 return Vec::new();
537 }
538
539 let mut by_provider: HashMap<String, Vec<ProviderRequestRecord>> = HashMap::new();
540 for req in requests {
541 by_provider.entry(req.provider.clone()).or_default().push(req);
542 }
543
544 let mut snapshots = Vec::new();
545 for (provider, mut reqs) in by_provider {
546 if reqs.is_empty() {
547 continue;
548 }
549
550 let request_count = reqs.len();
551 let total_input_tokens: u64 = reqs.iter().map(|r| r.input_tokens).sum();
552 let total_output_tokens: u64 = reqs.iter().map(|r| r.output_tokens).sum();
553 let total_latency: u64 = reqs.iter().map(|r| r.latency_ms).sum();
554
555 let avg_latency_ms = total_latency as f64 / request_count as f64;
556
557 let mut tps_values: Vec<f64> = reqs.iter().map(|r| r.tokens_per_second()).collect();
558 tps_values.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
559
560 let mut latency_values: Vec<f64> = reqs.iter().map(|r| r.latency_ms as f64).collect();
561 latency_values.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
562
563 let p50_idx = (request_count as f64 * 0.50) as usize;
564 let p95_idx = (request_count as f64 * 0.95) as usize;
565
566 let p50_tps = tps_values.get(p50_idx).cloned().unwrap_or(0.0);
567 let p95_tps = tps_values.get(p95_idx).cloned().unwrap_or(0.0);
568
569 let p50_latency_ms = latency_values.get(p50_idx).cloned().unwrap_or(0.0);
570 let p95_latency_ms = latency_values.get(p95_idx).cloned().unwrap_or(0.0);
571
572 let avg_tps = tps_values.iter().sum::<f64>() / request_count as f64;
573
574 snapshots.push(ProviderSnapshot {
575 provider,
576 request_count,
577 total_input_tokens,
578 total_output_tokens,
579 avg_tps,
580 avg_latency_ms,
581 p50_tps,
582 p50_latency_ms,
583 p95_tps,
584 p95_latency_ms,
585 });
586 }
587
588 snapshots
589 }
590}
591
/// Records a telemetry event for later persistence.
///
/// Stub: currently only emits a debug log — nothing is written anywhere.
/// Always returns `Ok(())`.
pub fn record_persistent(category: &str, data: &serde_json::Value) -> Result<()> {
    tracing::debug!(category, data = ?data, "Recording persistent telemetry");
    Ok(())
}
598
/// Tracks identity, size, progress, and timing of a single swarm run.
///
/// NOTE(review): each field has its own `Mutex`, so a reader can observe a
/// mix of values from different updates; presumably acceptable for telemetry,
/// but a single `Mutex<State>` would make updates atomic — confirm.
#[derive(Debug, Default)]
pub struct SwarmTelemetryCollector {
    task_id: Mutex<Option<String>>,
    agent_count: Mutex<usize>,
    completed: Mutex<usize>,
    total: Mutex<usize>,
    start_time: Mutex<Option<DateTime<Utc>>>,
}
608
609impl SwarmTelemetryCollector {
610 pub fn new() -> Self {
611 Self::default()
612 }
613
614 pub async fn start_swarm(&self, task_id: &str, agent_count: usize, _strategy: &str) {
615 let mut id = self.task_id.lock().await;
616 *id = Some(task_id.to_string());
617 let mut count = self.agent_count.lock().await;
618 *count = agent_count;
619 let mut start = self.start_time.lock().await;
620 *start = Some(Utc::now());
621 tracing::info!(task_id, agent_count, "Swarm started");
622 }
623
624 pub async fn record_progress(&self, completed: usize, total: usize) {
625 let mut c = self.completed.lock().await;
626 *c = completed;
627 let mut t = self.total.lock().await;
628 *t = total;
629 }
630
631 pub async fn record_swarm_latency(&self, _label: &str, duration: std::time::Duration) {
632 tracing::debug!(label = _label, duration_ms = duration.as_millis(), "Swarm latency recorded");
633 }
634
635 pub async fn complete_swarm(&self, success: bool) -> TelemetryMetrics {
636 let start = self.start_time.lock().await;
637 let duration = start.map(|s| (Utc::now() - s).num_milliseconds() as u64).unwrap_or(0);
638 drop(start);
639
640 let completed = *self.completed.lock().await;
641 let total = *self.total.lock().await;
642
643 tracing::info!(
644 success,
645 completed,
646 total,
647 duration_ms = duration,
648 "Swarm completed"
649 );
650
651 TelemetryMetrics {
652 tool_invocations: total as u64,
653 successful_operations: if success { completed as u64 } else { 0 },
654 failed_operations: if !success { (total.saturating_sub(completed)) as u64 } else { 0 },
655 total_tokens: 0,
656 avg_latency_ms: duration as f64,
657 }
658 }
659}
660
/// Aggregate counters produced by [`Telemetry`] and swarm runs.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct TelemetryMetrics {
    /// Total number of tool invocations recorded.
    pub tool_invocations: u64,
    /// Invocations that completed successfully.
    pub successful_operations: u64,
    /// Invocations that failed.
    pub failed_operations: u64,
    /// Tokens consumed across all invocations.
    pub total_tokens: u64,
    /// Running average latency per invocation, in milliseconds.
    pub avg_latency_ms: f64,
}
675
/// Async-safe telemetry aggregator: locked running metrics plus free-form
/// metadata annotations.
#[derive(Debug)]
pub struct Telemetry {
    metrics: Mutex<TelemetryMetrics>,
    /// Arbitrary key/value annotations; starts empty (see `new`).
    pub metadata: HashMap<String, String>,
}
683
684impl Telemetry {
685 pub fn new() -> Self {
687 Self {
688 metrics: Mutex::new(TelemetryMetrics::default()),
689 metadata: HashMap::new(),
690 }
691 }
692
693 pub async fn record_tool_invocation(&self, success: bool, latency_ms: u64, tokens: u64) {
695 let mut metrics = self.metrics.lock().await;
696 metrics.tool_invocations += 1;
697 if success {
698 metrics.successful_operations += 1;
699 } else {
700 metrics.failed_operations += 1;
701 }
702 metrics.total_tokens += tokens;
703 let n = metrics.tool_invocations as f64;
705 metrics.avg_latency_ms = metrics.avg_latency_ms * (n - 1.0) / n + latency_ms as f64 / n;
706 }
707
708 pub async fn get_metrics(&self) -> TelemetryMetrics {
710 self.metrics.lock().await.clone()
711 }
712
713 pub async fn start_swarm(&self, _task_id: &str, _agent_count: usize) {
715 }
717
718 pub async fn record_swarm_progress(&self, _task_id: &str, _completed: usize, _total: usize) {
720 }
722
723 pub async fn complete_swarm(&self, _success: bool) -> TelemetryMetrics {
725 self.metrics.lock().await.clone()
726 }
727}
728
impl Default for Telemetry {
    /// Equivalent to [`Telemetry::new`].
    fn default() -> Self {
        Self::new()
    }
}
734
/// Wrapper around persisted telemetry statistics.
///
/// Currently a placeholder: all accessors return empty data (see the impl).
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct PersistentStats {
    pub stats: PersistentStatsInner,
}
740
/// Aggregate persisted counters: token totals plus per-tool and per-file tallies.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct PersistentStatsInner {
    pub total_input_tokens: u64,
    pub total_output_tokens: u64,
    pub total_requests: u64,
    /// tool name -> execution count
    pub executions_by_tool: HashMap<String, u64>,
    /// file path -> modification count
    pub files_modified: HashMap<String, u64>,
}
749
impl PersistentStats {
    /// Most recent executions, newest first.
    /// Stub: persistence is not wired up, so this always returns an empty vec.
    pub fn recent(&self, _limit: usize) -> Vec<ToolExecution> {
        Vec::new()
    }

    /// All recorded file changes paired with their tool name.
    /// Stub: always empty.
    pub fn all_file_changes(&self) -> Vec<(String, FileChange)> {
        Vec::new()
    }

    /// Executions filtered by tool name. Stub: always empty.
    pub fn by_tool(&self, _tool_name: &str) -> Vec<ToolExecution> {
        Vec::new()
    }

    /// Executions that touched the given file. Stub: always empty.
    pub fn by_file(&self, _file_path: &str) -> Vec<ToolExecution> {
        Vec::new()
    }

    /// One-line summary. Stub: always reports zero executions.
    pub fn summary(&self) -> String {
        "0 total executions".to_string()
    }
}
771
772pub fn get_persistent_stats() -> PersistentStats {
774 PersistentStats::default()
775}