1use std::collections::VecDeque;
5
6use tokio::sync::watch;
7
/// Classification attached to every [`SecurityEvent`].
///
/// Each variant maps to a fixed lowercase label via
/// [`SecurityEventCategory::as_str`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SecurityEventCategory {
    /// Labelled `"injection"`.
    InjectionFlag,
    /// Labelled `"exfil"`.
    ExfiltrationBlock,
    /// Labelled `"quarantine"`.
    Quarantine,
    /// Labelled `"truncation"`.
    Truncation,
}

impl SecurityEventCategory {
    /// Returns the static, lowercase label for this category.
    ///
    /// The match is exhaustive on purpose: adding a variant without a label
    /// is a compile error.
    #[must_use]
    pub fn as_str(self) -> &'static str {
        match self {
            SecurityEventCategory::Truncation => "truncation",
            SecurityEventCategory::Quarantine => "quarantine",
            SecurityEventCategory::ExfiltrationBlock => "exfil",
            SecurityEventCategory::InjectionFlag => "injection",
        }
    }
}
28
29#[derive(Debug, Clone)]
31pub struct SecurityEvent {
32 pub timestamp: u64,
34 pub category: SecurityEventCategory,
35 pub source: String,
37 pub detail: String,
39}
40
41impl SecurityEvent {
42 #[must_use]
43 pub fn new(
44 category: SecurityEventCategory,
45 source: impl Into<String>,
46 detail: impl Into<String>,
47 ) -> Self {
48 let source: String = source
50 .into()
51 .chars()
52 .filter(|c| !c.is_ascii_control())
53 .take(64)
54 .collect();
55 let detail = detail.into();
57 let detail = if detail.len() > 128 {
58 let end = detail.floor_char_boundary(127);
59 format!("{}…", &detail[..end])
60 } else {
61 detail
62 };
63 Self {
64 timestamp: std::time::SystemTime::now()
65 .duration_since(std::time::UNIX_EPOCH)
66 .unwrap_or_default()
67 .as_secs(),
68 category,
69 source,
70 detail,
71 }
72 }
73}
74
/// Intended upper bound on the number of entries kept in
/// `MetricsSnapshot::security_events`.
///
/// Nothing visible in this file enforces the bound on its own: the code that
/// pushes an event is expected to pop the oldest entry first once the queue
/// has reached this length (the ring-buffer unit test shows the pattern).
pub const SECURITY_EVENT_CAP: usize = 100;
77
/// One task of a task graph, flattened into plain display values.
///
/// The field notes describe how the `From<&TaskGraph>` conversion in this
/// file fills each field; other producers are free to differ.
#[derive(Debug, Clone)]
pub struct TaskSnapshotRow {
    /// Numeric task id (`t.id.0` in the conversion).
    pub id: u32,
    /// Task title, passed through `strip_ctrl`.
    pub title: String,
    /// `to_string()` rendering of the task status.
    pub status: String,
    /// Assigned agent, if any, passed through `strip_ctrl`.
    pub agent: Option<String>,
    /// Duration taken from the task result, or 0 when the task has no result.
    pub duration_ms: u64,
    /// Only set for failed tasks whose result output is non-empty: the output
    /// passed through `strip_ctrl` and, when longer than 80 bytes, cut short
    /// with a trailing `…`. `None` in every other case.
    pub error: Option<String>,
}
92
93#[derive(Debug, Clone, Default)]
98pub struct TaskGraphSnapshot {
99 pub graph_id: String,
100 pub goal: String,
101 pub status: String,
103 pub tasks: Vec<TaskSnapshotRow>,
104 pub completed_at: Option<std::time::Instant>,
105}
106
107impl TaskGraphSnapshot {
108 #[must_use]
111 pub fn is_stale(&self) -> bool {
112 self.completed_at
113 .is_some_and(|t| t.elapsed().as_secs() > 30)
114 }
115}
116
/// Counters grouped under `MetricsSnapshot::orchestration`.
///
/// All fields start at zero through `Default`. The counters are written by
/// code outside this file (here only the unit tests touch them), so the exact
/// event each one counts is inferred from its name — confirm against the
/// writer before relying on it.
#[derive(Debug, Clone, Default)]
pub struct OrchestrationMetrics {
    /// Number of plans (presumably plans created — confirm with the writer).
    pub plans_total: u64,
    /// Number of tasks across plans (presumably all tasks seen).
    pub tasks_total: u64,
    /// Number of tasks counted as completed.
    pub tasks_completed: u64,
    /// Number of tasks counted as failed.
    pub tasks_failed: u64,
    /// Number of tasks counted as skipped.
    pub tasks_skipped: u64,
}
128
/// Per-skill confidence entry carried in `MetricsSnapshot::skill_confidence`.
///
/// Nothing in this file computes these values; the notes below are inferred
/// from the field names and should be confirmed against the producer.
#[derive(Debug, Clone, Default)]
pub struct SkillConfidence {
    /// Skill name.
    pub name: String,
    /// Posterior estimate for the skill (presumably a probability in 0..=1 —
    /// TODO confirm the range and the model behind it).
    pub posterior: f64,
    /// Usage count for the skill.
    pub total_uses: u32,
}
136
/// Per-sub-agent entry carried in `MetricsSnapshot::sub_agents`.
///
/// Nothing in this file fills these fields; the notes below are inferred from
/// the field names and should be confirmed against the producer.
#[derive(Debug, Clone, Default)]
pub struct SubAgentMetrics {
    /// Sub-agent identifier.
    pub id: String,
    /// Sub-agent name.
    pub name: String,
    /// Current state as text (the set of possible values is not visible here).
    pub state: String,
    /// Turns consumed so far.
    pub turns_used: u32,
    /// Turn limit (presumably the budget `turns_used` counts against).
    pub max_turns: u32,
    /// Whether the sub-agent is flagged as a background one.
    pub background: bool,
    /// Elapsed time in seconds (start reference not visible here).
    pub elapsed_secs: u64,
    /// Permission mode as text (the set of possible values is not visible here).
    pub permission_mode: String,
}
152
/// The value carried by the `watch` channel that `MetricsCollector` owns.
///
/// `Default` yields all-zero counters, empty collections, empty strings and
/// `None` options. Fields are mutated in place through
/// `MetricsCollector::update`. Apart from the few fields annotated below,
/// meanings follow the field names; the writers live outside this file.
#[derive(Debug, Clone, Default)]
pub struct MetricsSnapshot {
    // --- token / API usage ---
    pub prompt_tokens: u64,
    pub completion_tokens: u64,
    pub total_tokens: u64,
    pub context_tokens: u64,
    pub api_calls: u64,
    // --- skills and MCP ---
    pub active_skills: Vec<String>,
    pub total_skills: usize,
    pub mcp_server_count: usize,
    pub mcp_tool_count: usize,
    pub active_mcp_tools: Vec<String>,
    // --- storage backends ---
    pub sqlite_message_count: u64,
    pub sqlite_conversation_id: Option<zeph_memory::ConversationId>,
    pub qdrant_available: bool,
    pub vector_backend: String,
    pub embeddings_generated: u64,
    // --- runtime / provider ---
    pub last_llm_latency_ms: u64,
    pub uptime_seconds: u64,
    pub provider_name: String,
    pub model_name: String,
    // --- context management ---
    pub summaries_count: u64,
    pub context_compactions: u64,
    pub compression_events: u64,
    pub compression_tokens_saved: u64,
    pub tool_output_prunes: u64,
    // --- caching and cost ---
    pub cache_read_tokens: u64,
    pub cache_creation_tokens: u64,
    /// Spend in cents; fractional cents are representable because this is `f64`.
    pub cost_spent_cents: f64,
    // --- output filter (`filter_*`) ---
    pub filter_raw_tokens: u64,
    pub filter_saved_tokens: u64,
    pub filter_applications: u64,
    pub filter_total_commands: u64,
    pub filter_filtered_commands: u64,
    pub filter_confidence_full: u64,
    pub filter_confidence_partial: u64,
    pub filter_confidence_fallback: u64,
    pub cancellations: u64,
    // --- security counters ---
    pub sanitizer_runs: u64,
    pub sanitizer_injection_flags: u64,
    pub sanitizer_truncations: u64,
    pub quarantine_invocations: u64,
    pub quarantine_failures: u64,
    pub exfiltration_images_blocked: u64,
    pub exfiltration_tool_urls_flagged: u64,
    pub exfiltration_memory_guards: u64,
    // --- structured sub-collections ---
    pub sub_agents: Vec<SubAgentMetrics>,
    pub skill_confidence: Vec<SkillConfidence>,
    /// Four strings per scheduled task; the meaning of each column is not
    /// visible in this file — TODO confirm with the producer.
    pub scheduled_tasks: Vec<[String; 4]>,
    /// One `(String, f64, f64)` tuple per entry; presumably a name followed by
    /// two Thompson-sampling parameters — TODO confirm with the producer.
    pub router_thompson_stats: Vec<(String, f64, f64)>,
    /// Recent security events, oldest at the front. Writers are expected to
    /// keep at most `SECURITY_EVENT_CAP` entries by popping the front before
    /// pushing; the queue itself does not enforce that.
    pub security_events: VecDeque<SecurityEvent>,
    pub orchestration: OrchestrationMetrics,
    /// Latest task-graph snapshot, if any; see `TaskGraphSnapshot::is_stale`.
    pub orchestration_graph: Option<TaskGraphSnapshot>,
    // --- knowledge graph (`graph_*`) ---
    pub graph_community_detection_failures: u64,
    pub graph_entities_total: u64,
    pub graph_edges_total: u64,
    pub graph_communities_total: u64,
    pub graph_extraction_count: u64,
    pub graph_extraction_failures: u64,
}
217
/// Returns a copy of `s` with terminal control content removed.
///
/// * `ESC [` … sequences are dropped up to and including their final byte
///   (the first character in `'\x40'..='\x7e'`); an unterminated sequence
///   swallows the rest of the input.
/// * A lone `ESC` is dropped; whatever follows it is kept.
/// * Every other control character is dropped, except tab, line feed and
///   carriage return, which are preserved.
#[cfg_attr(not(feature = "orchestration"), allow(dead_code))]
fn strip_ctrl(s: &str) -> String {
    let mut cleaned = String::with_capacity(s.len());
    let mut rest = s.chars().peekable();

    while let Some(ch) = rest.next() {
        match ch {
            '\x1b' => {
                // Only CSI (`ESC [`) gets its payload skipped.
                if rest.next_if_eq(&'[').is_some() {
                    // `find` consumes through the final byte, or to the end of
                    // the input if the sequence is never terminated.
                    let _ = rest
                        .by_ref()
                        .find(|param| ('\x40'..='\x7e').contains(param));
                }
            }
            '\t' | '\n' | '\r' => cleaned.push(ch),
            other if other.is_control() => {}
            other => cleaned.push(other),
        }
    }

    cleaned
}
247
#[cfg(feature = "orchestration")]
impl From<&crate::orchestration::TaskGraph> for TaskGraphSnapshot {
    /// Flattens a task graph into display-ready strings.
    ///
    /// Goal, task titles, agent names and error text go through `strip_ctrl`.
    /// A row's `error` is only set for tasks in the `Failed` state whose
    /// result output is non-empty; it is cut at a character boundary at or
    /// below byte 79 and suffixed with `…` when the sanitised text exceeds
    /// 80 bytes. `completed_at` is always `None` — the caller sets it.
    fn from(graph: &crate::orchestration::TaskGraph) -> Self {
        let rows = graph
            .tasks
            .iter()
            .map(|node| {
                let outcome = node.result.as_ref();
                let failed = node.status == crate::orchestration::TaskStatus::Failed;

                // The emptiness check is on the raw output, before sanitising.
                let error = match outcome {
                    Some(res) if failed && !res.output.is_empty() => {
                        let mut text = strip_ctrl(&res.output);
                        if text.len() > 80 {
                            let cut = text.floor_char_boundary(79);
                            text.truncate(cut);
                            text.push('…');
                        }
                        Some(text)
                    }
                    _ => None,
                };

                TaskSnapshotRow {
                    id: node.id.0,
                    title: strip_ctrl(&node.title),
                    status: node.status.to_string(),
                    agent: node.assigned_agent.as_deref().map(strip_ctrl),
                    duration_ms: outcome.map_or(0, |res| res.duration_ms),
                    error,
                }
            })
            .collect();

        Self {
            graph_id: graph.id.to_string(),
            goal: strip_ctrl(&graph.goal),
            status: graph.status.to_string(),
            tasks: rows,
            completed_at: None,
        }
    }
}
297
298pub struct MetricsCollector {
299 tx: watch::Sender<MetricsSnapshot>,
300}
301
302impl MetricsCollector {
303 #[must_use]
304 pub fn new() -> (Self, watch::Receiver<MetricsSnapshot>) {
305 let (tx, rx) = watch::channel(MetricsSnapshot::default());
306 (Self { tx }, rx)
307 }
308
309 pub fn update(&self, f: impl FnOnce(&mut MetricsSnapshot)) {
310 self.tx.send_modify(f);
311 }
312}
313
// Unit tests for the metrics types, the collector and the sanitising helpers.
// Tests that need `crate::orchestration` are gated on the `orchestration` feature.
#[cfg(test)]
mod tests {
    use super::*;

    // A default snapshot has zero counters and empty collections/strings.
    #[test]
    fn default_metrics_snapshot() {
        let m = MetricsSnapshot::default();
        assert_eq!(m.total_tokens, 0);
        assert_eq!(m.api_calls, 0);
        assert!(m.active_skills.is_empty());
        assert!(m.active_mcp_tools.is_empty());
        assert_eq!(m.mcp_tool_count, 0);
        assert_eq!(m.mcp_server_count, 0);
        assert!(m.provider_name.is_empty());
        assert_eq!(m.summaries_count, 0);
    }

    // A single `update` call is visible through the receiver.
    #[test]
    fn metrics_collector_update() {
        let (collector, rx) = MetricsCollector::new();
        collector.update(|m| {
            m.api_calls = 5;
            m.total_tokens = 1000;
        });
        let snapshot = rx.borrow().clone();
        assert_eq!(snapshot.api_calls, 5);
        assert_eq!(snapshot.total_tokens, 1000);
    }

    // Updates apply on top of each other rather than starting from default.
    #[test]
    fn metrics_collector_multiple_updates() {
        let (collector, rx) = MetricsCollector::new();
        collector.update(|m| m.api_calls = 1);
        collector.update(|m| m.api_calls += 1);
        assert_eq!(rx.borrow().api_calls, 2);
    }

    // `Clone` carries field values over.
    #[test]
    fn metrics_snapshot_clone() {
        let mut m = MetricsSnapshot::default();
        m.provider_name = "ollama".into();
        let cloned = m.clone();
        assert_eq!(cloned.provider_name, "ollama");
    }

    // Filter token counters accumulate across two updates.
    #[test]
    fn filter_metrics_tracking() {
        let (collector, rx) = MetricsCollector::new();
        collector.update(|m| {
            m.filter_raw_tokens += 250;
            m.filter_saved_tokens += 200;
            m.filter_applications += 1;
        });
        collector.update(|m| {
            m.filter_raw_tokens += 100;
            m.filter_saved_tokens += 80;
            m.filter_applications += 1;
        });
        let s = rx.borrow();
        assert_eq!(s.filter_raw_tokens, 350);
        assert_eq!(s.filter_saved_tokens, 280);
        assert_eq!(s.filter_applications, 2);
    }

    // Command and confidence counters move independently; untouched ones stay 0.
    #[test]
    fn filter_confidence_and_command_metrics() {
        let (collector, rx) = MetricsCollector::new();
        collector.update(|m| {
            m.filter_total_commands += 1;
            m.filter_filtered_commands += 1;
            m.filter_confidence_full += 1;
        });
        collector.update(|m| {
            m.filter_total_commands += 1;
            m.filter_confidence_partial += 1;
        });
        let s = rx.borrow();
        assert_eq!(s.filter_total_commands, 2);
        assert_eq!(s.filter_filtered_commands, 1);
        assert_eq!(s.filter_confidence_full, 1);
        assert_eq!(s.filter_confidence_partial, 1);
        assert_eq!(s.filter_confidence_fallback, 0);
    }

    // Two increments of `summaries_count` are both retained.
    #[test]
    fn summaries_count_tracks_summarizations() {
        let (collector, rx) = MetricsCollector::new();
        collector.update(|m| m.summaries_count += 1);
        collector.update(|m| m.summaries_count += 1);
        assert_eq!(rx.borrow().summaries_count, 2);
    }

    // `cancellations` starts at zero and counts each increment.
    #[test]
    fn cancellations_counter_increments() {
        let (collector, rx) = MetricsCollector::new();
        assert_eq!(rx.borrow().cancellations, 0);
        collector.update(|m| m.cancellations += 1);
        collector.update(|m| m.cancellations += 1);
        assert_eq!(rx.borrow().cancellations, 2);
    }

    // Boundary: exactly 128 bytes is within the limit and kept verbatim.
    #[test]
    fn security_event_detail_exact_128_not_truncated() {
        let s = "a".repeat(128);
        let ev = SecurityEvent::new(SecurityEventCategory::InjectionFlag, "src", s.clone());
        assert_eq!(ev.detail, s, "128-char string must not be truncated");
    }

    // Boundary: 129 bytes is cut at byte 127 and gets a 3-byte `…`, hence <= 130.
    #[test]
    fn security_event_detail_129_is_truncated() {
        let s = "a".repeat(129);
        let ev = SecurityEvent::new(SecurityEventCategory::InjectionFlag, "src", s);
        assert!(
            ev.detail.ends_with('…'),
            "129-char string must end with ellipsis"
        );
        assert!(
            ev.detail.len() <= 130,
            "truncated detail must be at most 130 bytes"
        );
    }

    // '中' is 3 bytes in UTF-8, so 43 repetitions are 129 bytes and byte 127
    // falls inside a character; truncation must back up instead of panicking.
    #[test]
    fn security_event_detail_multibyte_utf8_no_panic() {
        let s = "中".repeat(43);
        let ev = SecurityEvent::new(SecurityEventCategory::InjectionFlag, "src", s);
        assert!(ev.detail.ends_with('…'));
    }

    // `source` keeps at most 64 characters (ASCII here, so 64 bytes).
    #[test]
    fn security_event_source_capped_at_64_chars() {
        let long_source = "x".repeat(200);
        let ev = SecurityEvent::new(SecurityEventCategory::InjectionFlag, long_source, "detail");
        assert_eq!(ev.source.len(), 64);
    }

    // NUL and ESC are removed from `source`; the printable remainder of the
    // escape sequence ("[31m") is not checked here.
    #[test]
    fn security_event_source_strips_control_chars() {
        let source = "tool\x00name\x1b[31m";
        let ev = SecurityEvent::new(SecurityEventCategory::InjectionFlag, source, "detail");
        assert!(!ev.source.contains('\x00'));
        assert!(!ev.source.contains('\x1b'));
    }

    // Pins the label of every category.
    #[test]
    fn security_event_category_as_str() {
        assert_eq!(SecurityEventCategory::InjectionFlag.as_str(), "injection");
        assert_eq!(SecurityEventCategory::ExfiltrationBlock.as_str(), "exfil");
        assert_eq!(SecurityEventCategory::Quarantine.as_str(), "quarantine");
        assert_eq!(SecurityEventCategory::Truncation.as_str(), "truncation");
    }

    // Demonstrates the intended ring-buffer pattern: pop the front once the
    // cap is reached, then push. After 110 pushes the queue holds 100 entries
    // and the newest one ("event 109") is at the back.
    #[test]
    fn ring_buffer_respects_cap_via_update() {
        let (collector, rx) = MetricsCollector::new();
        for i in 0..110u64 {
            let event = SecurityEvent::new(
                SecurityEventCategory::InjectionFlag,
                "src",
                format!("event {i}"),
            );
            collector.update(|m| {
                if m.security_events.len() >= SECURITY_EVENT_CAP {
                    m.security_events.pop_front();
                }
                m.security_events.push_back(event);
            });
        }
        let snap = rx.borrow();
        assert_eq!(snap.security_events.len(), SECURITY_EVENT_CAP);
        assert!(snap.security_events.back().unwrap().detail.contains("109"));
    }

    // A default snapshot carries no security events.
    #[test]
    fn security_events_empty_by_default() {
        let m = MetricsSnapshot::default();
        assert!(m.security_events.is_empty());
    }

    // All orchestration counters default to zero.
    #[test]
    fn orchestration_metrics_default_zero() {
        let m = OrchestrationMetrics::default();
        assert_eq!(m.plans_total, 0);
        assert_eq!(m.tasks_total, 0);
        assert_eq!(m.tasks_completed, 0);
        assert_eq!(m.tasks_failed, 0);
        assert_eq!(m.tasks_skipped, 0);
    }

    // The nested orchestration counters are zero in a default snapshot too.
    #[test]
    fn metrics_snapshot_includes_orchestration_default_zero() {
        let m = MetricsSnapshot::default();
        assert_eq!(m.orchestration.plans_total, 0);
        assert_eq!(m.orchestration.tasks_total, 0);
        assert_eq!(m.orchestration.tasks_completed, 0);
    }

    // Nested orchestration counters can be mutated through the collector.
    #[test]
    fn orchestration_metrics_update_via_collector() {
        let (collector, rx) = MetricsCollector::new();
        collector.update(|m| {
            m.orchestration.plans_total += 1;
            m.orchestration.tasks_total += 5;
            m.orchestration.tasks_completed += 3;
            m.orchestration.tasks_failed += 1;
            m.orchestration.tasks_skipped += 1;
        });
        let s = rx.borrow();
        assert_eq!(s.orchestration.plans_total, 1);
        assert_eq!(s.orchestration.tasks_total, 5);
        assert_eq!(s.orchestration.tasks_completed, 3);
        assert_eq!(s.orchestration.tasks_failed, 1);
        assert_eq!(s.orchestration.tasks_skipped, 1);
    }

    // A full CSI sequence (ESC [ 31 m) and a NUL are removed entirely.
    #[test]
    fn strip_ctrl_removes_escape_sequences() {
        let input = "hello\x1b[31mworld\x00end";
        let result = strip_ctrl(input);
        assert_eq!(result, "helloworldend");
    }

    // Tab, LF and CR are the control characters `strip_ctrl` keeps.
    #[test]
    fn strip_ctrl_allows_tab_lf_cr() {
        let input = "a\tb\nc\rd";
        let result = strip_ctrl(input);
        assert_eq!(result, "a\tb\nc\rd");
    }

    // Covers the three states: no completion time, just completed, and
    // completed 31 s ago. `checked_sub` returns `None` if the platform's
    // monotonic clock cannot represent a point 31 s in the past, in which
    // case the `unwrap` panics.
    #[test]
    fn task_graph_snapshot_is_stale_after_30s() {
        let mut snap = TaskGraphSnapshot::default();
        assert!(!snap.is_stale());
        snap.completed_at = Some(std::time::Instant::now());
        assert!(!snap.is_stale());
        snap.completed_at = Some(
            std::time::Instant::now()
                .checked_sub(std::time::Duration::from_secs(31))
                .unwrap(),
        );
        assert!(snap.is_stale());
    }

    // End-to-end mapping of a failed task with an agent and a result.
    #[cfg(feature = "orchestration")]
    #[test]
    fn task_graph_snapshot_from_task_graph_maps_fields() {
        use crate::orchestration::{GraphStatus, TaskGraph, TaskNode, TaskResult, TaskStatus};

        let mut graph = TaskGraph::new("My goal");
        let mut task = TaskNode::new(0, "Do work", "description");
        task.status = TaskStatus::Failed;
        task.assigned_agent = Some("agent-1".into());
        task.result = Some(TaskResult {
            output: "error occurred here".into(),
            artifacts: vec![],
            duration_ms: 1234,
            agent_id: None,
            agent_def: None,
        });
        graph.tasks.push(task);
        graph.status = GraphStatus::Failed;

        let snap = TaskGraphSnapshot::from(&graph);
        assert_eq!(snap.goal, "My goal");
        assert_eq!(snap.status, "failed");
        assert_eq!(snap.tasks.len(), 1);
        let row = &snap.tasks[0];
        assert_eq!(row.title, "Do work");
        assert_eq!(row.status, "failed");
        assert_eq!(row.agent.as_deref(), Some("agent-1"));
        assert_eq!(row.duration_ms, 1234);
        assert!(row.error.as_deref().unwrap().contains("error occurred"));
    }

    // Smoke test: the conversion exists under the feature flag and an empty
    // graph yields an empty, non-stale snapshot.
    #[cfg(feature = "orchestration")]
    #[test]
    fn task_graph_snapshot_from_compiles_with_feature() {
        use crate::orchestration::TaskGraph;
        let graph = TaskGraph::new("feature flag test");
        let snap = TaskGraphSnapshot::from(&graph);
        assert_eq!(snap.goal, "feature flag test");
        assert!(snap.tasks.is_empty());
        assert!(!snap.is_stale());
    }

    // 100 ASCII bytes exceed the 80-byte limit: the text is cut at byte 79 and
    // a 3-byte `…` is appended, which stays within the 83-byte bound asserted.
    #[cfg(feature = "orchestration")]
    #[test]
    fn task_graph_snapshot_error_truncated_at_80_chars() {
        use crate::orchestration::{TaskGraph, TaskNode, TaskResult, TaskStatus};

        let mut graph = TaskGraph::new("goal");
        let mut task = TaskNode::new(0, "t", "d");
        task.status = TaskStatus::Failed;
        task.result = Some(TaskResult {
            output: "e".repeat(100),
            artifacts: vec![],
            duration_ms: 0,
            agent_id: None,
            agent_def: None,
        });
        graph.tasks.push(task);

        let snap = TaskGraphSnapshot::from(&graph);
        let err = snap.tasks[0].error.as_ref().unwrap();
        assert!(err.ends_with('…'), "truncated error must end with ellipsis");
        assert!(
            err.len() <= 83,
            "truncated error must not exceed 80 chars + ellipsis"
        );
    }

    // Goal and title are sanitised by the conversion.
    #[cfg(feature = "orchestration")]
    #[test]
    fn task_graph_snapshot_strips_control_chars_from_title() {
        use crate::orchestration::{TaskGraph, TaskNode};

        let mut graph = TaskGraph::new("goal\x1b[31m");
        let task = TaskNode::new(0, "title\x00injected", "d");
        graph.tasks.push(task);

        let snap = TaskGraphSnapshot::from(&graph);
        assert!(!snap.goal.contains('\x1b'), "goal must not contain escape");
        assert!(
            !snap.tasks[0].title.contains('\x00'),
            "title must not contain null byte"
        );
    }

    // Graph counters default to zero (`graph_community_detection_failures`
    // is not checked here).
    #[test]
    fn graph_metrics_default_zero() {
        let m = MetricsSnapshot::default();
        assert_eq!(m.graph_entities_total, 0);
        assert_eq!(m.graph_edges_total, 0);
        assert_eq!(m.graph_communities_total, 0);
        assert_eq!(m.graph_extraction_count, 0);
        assert_eq!(m.graph_extraction_failures, 0);
    }

    // Graph counters set through the collector are visible to the receiver.
    #[test]
    fn graph_metrics_update_via_collector() {
        let (collector, rx) = MetricsCollector::new();
        collector.update(|m| {
            m.graph_entities_total = 5;
            m.graph_edges_total = 10;
            m.graph_communities_total = 2;
            m.graph_extraction_count = 7;
            m.graph_extraction_failures = 1;
        });
        let snapshot = rx.borrow().clone();
        assert_eq!(snapshot.graph_entities_total, 5);
        assert_eq!(snapshot.graph_edges_total, 10);
        assert_eq!(snapshot.graph_communities_total, 2);
        assert_eq!(snapshot.graph_extraction_count, 7);
        assert_eq!(snapshot.graph_extraction_failures, 1);
    }
}