1use std::collections::VecDeque;
5
6use tokio::sync::watch;
7
/// Classification of a recorded [`SecurityEvent`].
///
/// The type is `Copy`, so it is passed by value everywhere.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SecurityEventCategory {
    /// Rendered as `"injection"` by [`SecurityEventCategory::as_str`].
    InjectionFlag,
    /// Rendered as `"exfil"` by [`SecurityEventCategory::as_str`].
    ExfiltrationBlock,
    /// Rendered as `"quarantine"` by [`SecurityEventCategory::as_str`].
    Quarantine,
    /// Rendered as `"truncation"` by [`SecurityEventCategory::as_str`].
    Truncation,
}

impl SecurityEventCategory {
    /// Returns the short, static, lowercase label for this category.
    ///
    /// The match is exhaustive on purpose: adding a variant without a label
    /// is a compile error.
    #[must_use]
    pub fn as_str(self) -> &'static str {
        use SecurityEventCategory::{ExfiltrationBlock, InjectionFlag, Quarantine, Truncation};

        match self {
            Truncation => "truncation",
            Quarantine => "quarantine",
            ExfiltrationBlock => "exfil",
            InjectionFlag => "injection",
        }
    }
}
28
29#[derive(Debug, Clone)]
31pub struct SecurityEvent {
32 pub timestamp: u64,
34 pub category: SecurityEventCategory,
35 pub source: String,
37 pub detail: String,
39}
40
41impl SecurityEvent {
42 #[must_use]
43 pub fn new(
44 category: SecurityEventCategory,
45 source: impl Into<String>,
46 detail: impl Into<String>,
47 ) -> Self {
48 let source: String = source
50 .into()
51 .chars()
52 .filter(|c| !c.is_ascii_control())
53 .take(64)
54 .collect();
55 let detail = detail.into();
57 let detail = if detail.len() > 128 {
58 let end = detail.floor_char_boundary(127);
59 format!("{}…", &detail[..end])
60 } else {
61 detail
62 };
63 Self {
64 timestamp: std::time::SystemTime::now()
65 .duration_since(std::time::UNIX_EPOCH)
66 .unwrap_or_default()
67 .as_secs(),
68 category,
69 source,
70 detail,
71 }
72 }
73}
74
/// Upper bound on the number of [`SecurityEvent`]s retained in
/// `MetricsSnapshot::security_events`.
///
/// The queue type does not enforce this by itself. The test in this file trims
/// with `pop_front` before each `push_back`; producers elsewhere presumably do
/// the same — verify against callers.
pub const SECURITY_EVENT_CAP: usize = 100;
77
/// Display-oriented copy of a single task inside a [`TaskGraphSnapshot`].
///
/// Rows are produced by `TaskGraphSnapshot::from(&TaskGraph)` in this file,
/// which passes the text fields through `strip_ctrl`.
#[derive(Debug, Clone)]
pub struct TaskSnapshotRow {
    /// Numeric task id (the inner value of the task's id newtype).
    pub id: u32,
    /// Task title with control characters and CSI escape sequences removed.
    pub title: String,
    /// `Display` rendering of the task status.
    pub status: String,
    /// Sanitised name of the assigned agent, if any.
    pub agent: Option<String>,
    /// Duration taken from the task result; `0` when there is no result yet.
    pub duration_ms: u64,
    /// Sanitised, length-limited output of a failed task; `None` when the task
    /// did not fail or has no output to show.
    pub error: Option<String>,
}
92
93#[derive(Debug, Clone, Default)]
95pub struct TaskGraphSnapshot {
96 pub graph_id: String,
97 pub goal: String,
98 pub status: String,
100 pub tasks: Vec<TaskSnapshotRow>,
101 pub completed_at: Option<std::time::Instant>,
102}
103
104impl TaskGraphSnapshot {
105 #[must_use]
108 pub fn is_stale(&self) -> bool {
109 self.completed_at
110 .is_some_and(|t| t.elapsed().as_secs() > 30)
111 }
112}
113
/// Counters for the orchestration subsystem, embedded in
/// `MetricsSnapshot::orchestration`.
///
/// All counters start at zero via `Default`. This file only stores them; the
/// per-field notes are inferred from the names — confirm with the code that
/// increments them.
#[derive(Debug, Clone, Default)]
pub struct OrchestrationMetrics {
    /// Presumably the number of plans created so far.
    pub plans_total: u64,
    /// Presumably the number of tasks across all plans.
    pub tasks_total: u64,
    /// Presumably tasks that finished successfully.
    pub tasks_completed: u64,
    /// Presumably tasks that ended in failure.
    pub tasks_failed: u64,
    /// Presumably tasks that were skipped.
    pub tasks_skipped: u64,
}
125
/// Per-skill confidence entry carried in `MetricsSnapshot::skill_confidence`.
#[derive(Debug, Clone, Default)]
pub struct SkillConfidence {
    /// Skill name.
    pub name: String,
    /// Confidence value for the skill. NOTE(review): presumably a posterior
    /// mean in `[0, 1]`; the range is not enforced here — confirm with the producer.
    pub posterior: f64,
    /// Number of recorded uses behind `posterior` (inferred from the name).
    pub total_uses: u32,
}
133
/// Status row for one sub-agent, carried in `MetricsSnapshot::sub_agents`.
///
/// This file only stores the values; the field notes are inferred from names
/// and types — confirm with the code that fills them in.
#[derive(Debug, Clone, Default)]
pub struct SubAgentMetrics {
    /// Sub-agent identifier.
    pub id: String,
    /// Human-readable sub-agent name.
    pub name: String,
    /// Current state as text; the set of possible values is not defined in this file.
    pub state: String,
    /// Turns consumed so far.
    pub turns_used: u32,
    /// Turn budget for the sub-agent.
    pub max_turns: u32,
    /// Presumably whether the sub-agent runs in the background.
    pub background: bool,
    /// Presumably seconds elapsed since the sub-agent started.
    pub elapsed_secs: u64,
    /// Permission mode as text; the set of possible values is not defined in this file.
    pub permission_mode: String,
}
149
/// Aggregate of every runtime counter and status value published through
/// [`MetricsCollector`].
///
/// `MetricsSnapshot::default()` (all counters zero, all collections empty) is
/// the initial value placed on the watch channel by `MetricsCollector::new`.
///
/// This file only stores these values. Unless a field comment says otherwise,
/// the group headings below are inferred from field names — confirm with the
/// producing code.
#[derive(Debug, Clone, Default)]
pub struct MetricsSnapshot {
    // --- LLM token and call accounting ---
    pub prompt_tokens: u64,
    pub completion_tokens: u64,
    pub total_tokens: u64,
    pub context_tokens: u64,
    pub api_calls: u64,

    // --- Skills and MCP tooling ---
    pub active_skills: Vec<String>,
    pub total_skills: usize,
    pub mcp_server_count: usize,
    pub mcp_tool_count: usize,
    pub active_mcp_tools: Vec<String>,

    // --- Memory backends ---
    pub sqlite_message_count: u64,
    pub sqlite_conversation_id: Option<zeph_memory::ConversationId>,
    pub qdrant_available: bool,
    pub vector_backend: String,
    pub embeddings_generated: u64,

    // --- Provider and runtime status ---
    pub last_llm_latency_ms: u64,
    pub uptime_seconds: u64,
    pub provider_name: String,
    pub model_name: String,

    // --- Context management ---
    pub summaries_count: u64,
    pub context_compactions: u64,
    pub compression_events: u64,
    pub compression_tokens_saved: u64,
    pub tool_output_prunes: u64,

    // --- Prompt cache and cost ---
    pub cache_read_tokens: u64,
    pub cache_creation_tokens: u64,
    /// NOTE(review): fractional cents stored as `f64`; the unit is taken from the name.
    pub cost_spent_cents: f64,

    // --- Output filter statistics ---
    pub filter_raw_tokens: u64,
    pub filter_saved_tokens: u64,
    pub filter_applications: u64,
    pub filter_total_commands: u64,
    pub filter_filtered_commands: u64,
    pub filter_confidence_full: u64,
    pub filter_confidence_partial: u64,
    pub filter_confidence_fallback: u64,

    // --- Cancellation / server-side compaction ---
    pub cancellations: u64,
    pub server_compaction_events: u64,

    // --- Security counters ---
    pub sanitizer_runs: u64,
    pub sanitizer_injection_flags: u64,
    pub sanitizer_truncations: u64,
    pub quarantine_invocations: u64,
    pub quarantine_failures: u64,
    pub exfiltration_images_blocked: u64,
    pub exfiltration_tool_urls_flagged: u64,
    pub exfiltration_memory_guards: u64,

    // --- Structured sub-reports ---
    pub sub_agents: Vec<SubAgentMetrics>,
    pub skill_confidence: Vec<SkillConfidence>,
    /// Four string columns per scheduled task; the column meaning is not
    /// defined in this file.
    pub scheduled_tasks: Vec<[String; 4]>,
    /// One `(String, f64, f64)` tuple per entry. NOTE(review): presumably
    /// `(provider, alpha, beta)` of a Thompson-sampling router — confirm.
    pub router_thompson_stats: Vec<(String, f64, f64)>,
    /// Recent security events, oldest at the front. The queue is not
    /// self-limiting; the test in this file pops the front before pushing once
    /// `SECURITY_EVENT_CAP` entries are held.
    pub security_events: VecDeque<SecurityEvent>,
    pub orchestration: OrchestrationMetrics,
    /// Latest task-graph snapshot, if one has been published.
    pub orchestration_graph: Option<TaskGraphSnapshot>,

    // --- Knowledge-graph statistics ---
    pub graph_community_detection_failures: u64,
    pub graph_entities_total: u64,
    pub graph_edges_total: u64,
    pub graph_communities_total: u64,
    pub graph_extraction_count: u64,
    pub graph_extraction_failures: u64,
}
215
/// Returns a copy of `s` that is safe to print on a terminal.
///
/// * An ESC (`\x1b`) is always dropped. When it is immediately followed by
///   `[`, everything up to and including the next character in
///   `\x40..=\x7e` (the CSI final byte) is dropped as well; if no such
///   character exists, the rest of the input is consumed.
/// * Every other control character is dropped, except tab, line feed and
///   carriage return.
/// * All remaining characters are copied unchanged.
fn strip_ctrl(s: &str) -> String {
    let mut cleaned = String::with_capacity(s.len());
    let mut it = s.chars().peekable();

    while let Some(ch) = it.next() {
        match ch {
            '\x1b' => {
                // Only `ESC [` opens a sequence we skip; a lone ESC loses just itself.
                if it.next_if_eq(&'[').is_some() {
                    // Discard parameter/intermediate bytes through the final byte.
                    let _ = it.find(|tail| ('\x40'..='\x7e').contains(tail));
                }
            }
            // Whitespace controls are harmless and kept.
            '\t' | '\n' | '\r' => cleaned.push(ch),
            other if other.is_control() => {}
            other => cleaned.push(other),
        }
    }

    cleaned
}
244
245impl From<&crate::orchestration::TaskGraph> for TaskGraphSnapshot {
247 fn from(graph: &crate::orchestration::TaskGraph) -> Self {
248 let tasks = graph
249 .tasks
250 .iter()
251 .map(|t| {
252 let error = t
253 .result
254 .as_ref()
255 .filter(|_| t.status == crate::orchestration::TaskStatus::Failed)
256 .and_then(|r| {
257 if r.output.is_empty() {
258 None
259 } else {
260 let s = strip_ctrl(&r.output);
262 if s.len() > 80 {
263 let end = s.floor_char_boundary(79);
264 Some(format!("{}…", &s[..end]))
265 } else {
266 Some(s)
267 }
268 }
269 });
270 let duration_ms = t.result.as_ref().map_or(0, |r| r.duration_ms);
271 TaskSnapshotRow {
272 id: t.id.0,
273 title: strip_ctrl(&t.title),
274 status: t.status.to_string(),
275 agent: t.assigned_agent.as_deref().map(strip_ctrl),
276 duration_ms,
277 error,
278 }
279 })
280 .collect();
281 Self {
282 graph_id: graph.id.to_string(),
283 goal: strip_ctrl(&graph.goal),
284 status: graph.status.to_string(),
285 tasks,
286 completed_at: None,
287 }
288 }
289}
290
/// Write side of the metrics channel.
///
/// Owns the `tokio::sync::watch::Sender` through which the current
/// [`MetricsSnapshot`] is published; readers hold the matching receiver
/// returned by [`MetricsCollector::new`].
pub struct MetricsCollector {
    // Sender half of the watch channel carrying the latest snapshot.
    tx: watch::Sender<MetricsSnapshot>,
}

impl MetricsCollector {
    /// Creates a collector together with its receiver.
    ///
    /// The channel starts out holding `MetricsSnapshot::default()`.
    #[must_use]
    pub fn new() -> (Self, watch::Receiver<MetricsSnapshot>) {
        let (tx, rx) = watch::channel(MetricsSnapshot::default());
        (Self { tx }, rx)
    }

    /// Mutates the published snapshot in place by running `f` on it.
    ///
    /// Delegates to `watch::Sender::send_modify`; the tests in this file show
    /// the change is visible to a receiver's `borrow()` right after the call.
    pub fn update(&self, f: impl FnOnce(&mut MetricsSnapshot)) {
        self.tx.send_modify(f);
    }
}
306
// Unit tests for the snapshot types, the collector, and the sanitising helpers.
#[cfg(test)]
mod tests {
    // Several tests build a default snapshot and then assign individual fields.
    #![allow(clippy::field_reassign_with_default)]

    use super::*;

    // A default snapshot has zeroed counters and empty collections/strings.
    #[test]
    fn default_metrics_snapshot() {
        let m = MetricsSnapshot::default();
        assert_eq!(m.total_tokens, 0);
        assert_eq!(m.api_calls, 0);
        assert!(m.active_skills.is_empty());
        assert!(m.active_mcp_tools.is_empty());
        assert_eq!(m.mcp_tool_count, 0);
        assert_eq!(m.mcp_server_count, 0);
        assert!(m.provider_name.is_empty());
        assert_eq!(m.summaries_count, 0);
    }

    // A single `update` is visible through the receiver.
    #[test]
    fn metrics_collector_update() {
        let (collector, rx) = MetricsCollector::new();
        collector.update(|m| {
            m.api_calls = 5;
            m.total_tokens = 1000;
        });
        let snapshot = rx.borrow().clone();
        assert_eq!(snapshot.api_calls, 5);
        assert_eq!(snapshot.total_tokens, 1000);
    }

    // Consecutive updates operate on the previously published value.
    #[test]
    fn metrics_collector_multiple_updates() {
        let (collector, rx) = MetricsCollector::new();
        collector.update(|m| m.api_calls = 1);
        collector.update(|m| m.api_calls += 1);
        assert_eq!(rx.borrow().api_calls, 2);
    }

    // Cloning preserves field values.
    #[test]
    fn metrics_snapshot_clone() {
        let mut m = MetricsSnapshot::default();
        m.provider_name = "ollama".into();
        let cloned = m.clone();
        assert_eq!(cloned.provider_name, "ollama");
    }

    // Filter token counters accumulate across updates.
    #[test]
    fn filter_metrics_tracking() {
        let (collector, rx) = MetricsCollector::new();
        collector.update(|m| {
            m.filter_raw_tokens += 250;
            m.filter_saved_tokens += 200;
            m.filter_applications += 1;
        });
        collector.update(|m| {
            m.filter_raw_tokens += 100;
            m.filter_saved_tokens += 80;
            m.filter_applications += 1;
        });
        let s = rx.borrow();
        assert_eq!(s.filter_raw_tokens, 350);
        assert_eq!(s.filter_saved_tokens, 280);
        assert_eq!(s.filter_applications, 2);
    }

    // Filter command/confidence counters are independent of each other.
    #[test]
    fn filter_confidence_and_command_metrics() {
        let (collector, rx) = MetricsCollector::new();
        collector.update(|m| {
            m.filter_total_commands += 1;
            m.filter_filtered_commands += 1;
            m.filter_confidence_full += 1;
        });
        collector.update(|m| {
            m.filter_total_commands += 1;
            m.filter_confidence_partial += 1;
        });
        let s = rx.borrow();
        assert_eq!(s.filter_total_commands, 2);
        assert_eq!(s.filter_filtered_commands, 1);
        assert_eq!(s.filter_confidence_full, 1);
        assert_eq!(s.filter_confidence_partial, 1);
        assert_eq!(s.filter_confidence_fallback, 0);
    }

    // `summaries_count` increments once per update.
    #[test]
    fn summaries_count_tracks_summarizations() {
        let (collector, rx) = MetricsCollector::new();
        collector.update(|m| m.summaries_count += 1);
        collector.update(|m| m.summaries_count += 1);
        assert_eq!(rx.borrow().summaries_count, 2);
    }

    // `cancellations` starts at zero and increments once per update.
    #[test]
    fn cancellations_counter_increments() {
        let (collector, rx) = MetricsCollector::new();
        assert_eq!(rx.borrow().cancellations, 0);
        collector.update(|m| m.cancellations += 1);
        collector.update(|m| m.cancellations += 1);
        assert_eq!(rx.borrow().cancellations, 2);
    }

    // Boundary: exactly 128 bytes is kept verbatim.
    #[test]
    fn security_event_detail_exact_128_not_truncated() {
        let s = "a".repeat(128);
        let ev = SecurityEvent::new(SecurityEventCategory::InjectionFlag, "src", s.clone());
        assert_eq!(ev.detail, s, "128-char string must not be truncated");
    }

    // Boundary: 129 bytes is cut to 127 bytes plus the 3-byte ellipsis.
    #[test]
    fn security_event_detail_129_is_truncated() {
        let s = "a".repeat(129);
        let ev = SecurityEvent::new(SecurityEventCategory::InjectionFlag, "src", s);
        assert!(
            ev.detail.ends_with('…'),
            "129-char string must end with ellipsis"
        );
        assert!(
            ev.detail.len() <= 130,
            "truncated detail must be at most 130 bytes"
        );
    }

    // "中" is 3 bytes in UTF-8, so 43 of them are 129 bytes and byte 127 falls
    // inside a character; truncation must not slice mid-character.
    #[test]
    fn security_event_detail_multibyte_utf8_no_panic() {
        let s = "中".repeat(43);
        let ev = SecurityEvent::new(SecurityEventCategory::InjectionFlag, "src", s);
        assert!(ev.detail.ends_with('…'));
    }

    // `source` keeps at most 64 characters (ASCII here, so 64 bytes).
    #[test]
    fn security_event_source_capped_at_64_chars() {
        let long_source = "x".repeat(200);
        let ev = SecurityEvent::new(SecurityEventCategory::InjectionFlag, long_source, "detail");
        assert_eq!(ev.source.len(), 64);
    }

    // NUL and ESC are removed from `source`.
    #[test]
    fn security_event_source_strips_control_chars() {
        let source = "tool\x00name\x1b[31m";
        let ev = SecurityEvent::new(SecurityEventCategory::InjectionFlag, source, "detail");
        assert!(!ev.source.contains('\x00'));
        assert!(!ev.source.contains('\x1b'));
    }

    // Pins the label of every category.
    #[test]
    fn security_event_category_as_str() {
        assert_eq!(SecurityEventCategory::InjectionFlag.as_str(), "injection");
        assert_eq!(SecurityEventCategory::ExfiltrationBlock.as_str(), "exfil");
        assert_eq!(SecurityEventCategory::Quarantine.as_str(), "quarantine");
        assert_eq!(SecurityEventCategory::Truncation.as_str(), "truncation");
    }

    // Pushes 110 events using the pop-front-then-push pattern and checks that
    // only the newest `SECURITY_EVENT_CAP` remain, ending with "event 109".
    #[test]
    fn ring_buffer_respects_cap_via_update() {
        let (collector, rx) = MetricsCollector::new();
        for i in 0..110u64 {
            let event = SecurityEvent::new(
                SecurityEventCategory::InjectionFlag,
                "src",
                format!("event {i}"),
            );
            collector.update(|m| {
                if m.security_events.len() >= SECURITY_EVENT_CAP {
                    m.security_events.pop_front();
                }
                m.security_events.push_back(event);
            });
        }
        let snap = rx.borrow();
        assert_eq!(snap.security_events.len(), SECURITY_EVENT_CAP);
        assert!(snap.security_events.back().unwrap().detail.contains("109"));
    }

    // The event queue starts empty.
    #[test]
    fn security_events_empty_by_default() {
        let m = MetricsSnapshot::default();
        assert!(m.security_events.is_empty());
    }

    // All orchestration counters default to zero.
    #[test]
    fn orchestration_metrics_default_zero() {
        let m = OrchestrationMetrics::default();
        assert_eq!(m.plans_total, 0);
        assert_eq!(m.tasks_total, 0);
        assert_eq!(m.tasks_completed, 0);
        assert_eq!(m.tasks_failed, 0);
        assert_eq!(m.tasks_skipped, 0);
    }

    // The embedded orchestration counters default to zero as well.
    #[test]
    fn metrics_snapshot_includes_orchestration_default_zero() {
        let m = MetricsSnapshot::default();
        assert_eq!(m.orchestration.plans_total, 0);
        assert_eq!(m.orchestration.tasks_total, 0);
        assert_eq!(m.orchestration.tasks_completed, 0);
    }

    // Nested orchestration counters can be updated through the collector.
    #[test]
    fn orchestration_metrics_update_via_collector() {
        let (collector, rx) = MetricsCollector::new();
        collector.update(|m| {
            m.orchestration.plans_total += 1;
            m.orchestration.tasks_total += 5;
            m.orchestration.tasks_completed += 3;
            m.orchestration.tasks_failed += 1;
            m.orchestration.tasks_skipped += 1;
        });
        let s = rx.borrow();
        assert_eq!(s.orchestration.plans_total, 1);
        assert_eq!(s.orchestration.tasks_total, 5);
        assert_eq!(s.orchestration.tasks_completed, 3);
        assert_eq!(s.orchestration.tasks_failed, 1);
        assert_eq!(s.orchestration.tasks_skipped, 1);
    }

    // A full CSI sequence (`ESC [ 3 1 m`) and a NUL are both removed.
    #[test]
    fn strip_ctrl_removes_escape_sequences() {
        let input = "hello\x1b[31mworld\x00end";
        let result = strip_ctrl(input);
        assert_eq!(result, "helloworldend");
    }

    // Tab, LF and CR are the control characters that survive.
    #[test]
    fn strip_ctrl_allows_tab_lf_cr() {
        let input = "a\tb\nc\rd";
        let result = strip_ctrl(input);
        assert_eq!(result, "a\tb\nc\rd");
    }

    // Not stale without a completion time, not stale right after completion,
    // stale once 31 s have passed.
    // NOTE(review): `checked_sub(..).unwrap()` panics if the platform's
    // monotonic clock cannot represent "now - 31 s" — confirm this is
    // acceptable for CI.
    #[test]
    fn task_graph_snapshot_is_stale_after_30s() {
        let mut snap = TaskGraphSnapshot::default();
        assert!(!snap.is_stale());
        snap.completed_at = Some(std::time::Instant::now());
        assert!(!snap.is_stale());
        snap.completed_at = Some(
            std::time::Instant::now()
                .checked_sub(std::time::Duration::from_secs(31))
                .unwrap(),
        );
        assert!(snap.is_stale());
    }

    // End-to-end field mapping for a graph with one failed task.
    #[test]
    fn task_graph_snapshot_from_task_graph_maps_fields() {
        use crate::orchestration::{GraphStatus, TaskGraph, TaskNode, TaskResult, TaskStatus};

        let mut graph = TaskGraph::new("My goal");
        let mut task = TaskNode::new(0, "Do work", "description");
        task.status = TaskStatus::Failed;
        task.assigned_agent = Some("agent-1".into());
        task.result = Some(TaskResult {
            output: "error occurred here".into(),
            artifacts: vec![],
            duration_ms: 1234,
            agent_id: None,
            agent_def: None,
        });
        graph.tasks.push(task);
        graph.status = GraphStatus::Failed;

        let snap = TaskGraphSnapshot::from(&graph);
        assert_eq!(snap.goal, "My goal");
        assert_eq!(snap.status, "failed");
        assert_eq!(snap.tasks.len(), 1);
        let row = &snap.tasks[0];
        assert_eq!(row.title, "Do work");
        assert_eq!(row.status, "failed");
        assert_eq!(row.agent.as_deref(), Some("agent-1"));
        assert_eq!(row.duration_ms, 1234);
        assert!(row.error.as_deref().unwrap().contains("error occurred"));
    }

    // Converting an empty graph yields an empty, non-stale snapshot.
    #[test]
    fn task_graph_snapshot_from_compiles_with_feature() {
        use crate::orchestration::TaskGraph;
        let graph = TaskGraph::new("feature flag test");
        let snap = TaskGraphSnapshot::from(&graph);
        assert_eq!(snap.goal, "feature flag test");
        assert!(snap.tasks.is_empty());
        assert!(!snap.is_stale());
    }

    // A 100-byte error is shortened and ends with the ellipsis; the bound of
    // 83 bytes allows for the 3-byte `…`.
    #[test]
    fn task_graph_snapshot_error_truncated_at_80_chars() {
        use crate::orchestration::{TaskGraph, TaskNode, TaskResult, TaskStatus};

        let mut graph = TaskGraph::new("goal");
        let mut task = TaskNode::new(0, "t", "d");
        task.status = TaskStatus::Failed;
        task.result = Some(TaskResult {
            output: "e".repeat(100),
            artifacts: vec![],
            duration_ms: 0,
            agent_id: None,
            agent_def: None,
        });
        graph.tasks.push(task);

        let snap = TaskGraphSnapshot::from(&graph);
        let err = snap.tasks[0].error.as_ref().unwrap();
        assert!(err.ends_with('…'), "truncated error must end with ellipsis");
        assert!(
            err.len() <= 83,
            "truncated error must not exceed 80 chars + ellipsis"
        );
    }

    // Goal and title text is sanitised during conversion.
    #[test]
    fn task_graph_snapshot_strips_control_chars_from_title() {
        use crate::orchestration::{TaskGraph, TaskNode};

        let mut graph = TaskGraph::new("goal\x1b[31m");
        let task = TaskNode::new(0, "title\x00injected", "d");
        graph.tasks.push(task);

        let snap = TaskGraphSnapshot::from(&graph);
        assert!(!snap.goal.contains('\x1b'), "goal must not contain escape");
        assert!(
            !snap.tasks[0].title.contains('\x00'),
            "title must not contain null byte"
        );
    }

    // Knowledge-graph counters default to zero.
    #[test]
    fn graph_metrics_default_zero() {
        let m = MetricsSnapshot::default();
        assert_eq!(m.graph_entities_total, 0);
        assert_eq!(m.graph_edges_total, 0);
        assert_eq!(m.graph_communities_total, 0);
        assert_eq!(m.graph_extraction_count, 0);
        assert_eq!(m.graph_extraction_failures, 0);
    }

    // Knowledge-graph counters can be set through the collector.
    #[test]
    fn graph_metrics_update_via_collector() {
        let (collector, rx) = MetricsCollector::new();
        collector.update(|m| {
            m.graph_entities_total = 5;
            m.graph_edges_total = 10;
            m.graph_communities_total = 2;
            m.graph_extraction_count = 7;
            m.graph_extraction_failures = 1;
        });
        let snapshot = rx.borrow().clone();
        assert_eq!(snapshot.graph_entities_total, 5);
        assert_eq!(snapshot.graph_edges_total, 10);
        assert_eq!(snapshot.graph_communities_total, 2);
        assert_eq!(snapshot.graph_extraction_count, 7);
        assert_eq!(snapshot.graph_extraction_failures, 1);
    }
}