1use std::collections::VecDeque;
5
6use tokio::sync::watch;
7
/// Kind of security-relevant event recorded in a [`SecurityEvent`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SecurityEventCategory {
    InjectionFlag,
    ExfiltrationBlock,
    Quarantine,
    Truncation,
    RateLimit,
    MemoryValidation,
}

impl SecurityEventCategory {
    /// Returns the short label for this category (e.g. `"exfil"` for
    /// [`ExfiltrationBlock`](Self::ExfiltrationBlock)).
    #[must_use]
    pub fn as_str(self) -> &'static str {
        match self {
            Self::InjectionFlag => "injection",
            Self::ExfiltrationBlock => "exfil",
            Self::Quarantine => "quarantine",
            Self::Truncation => "truncation",
            Self::RateLimit => "rate_limit",
            Self::MemoryValidation => "memory_validation",
        }
    }
}

/// A single security event.
///
/// When built with [`SecurityEvent::new`], the text fields are stripped of control
/// characters and length-capped (see that constructor for the exact rules).
#[derive(Debug, Clone)]
pub struct SecurityEvent {
    /// Creation time in whole seconds since the Unix epoch (0 if the clock reads earlier).
    pub timestamp: u64,
    pub category: SecurityEventCategory,
    /// Origin of the event: control characters removed, at most 64 `char`s.
    pub source: String,
    /// Description: control characters removed (tab/LF/CR become spaces). At most 128 bytes,
    /// or a prefix of at most 127 bytes followed by `…` when truncated.
    pub detail: String,
}

impl SecurityEvent {
    /// Maximum number of `char`s kept in [`SecurityEvent::source`].
    const SOURCE_MAX_CHARS: usize = 64;
    /// Details longer than this many bytes are truncated.
    const DETAIL_MAX_BYTES: usize = 128;
    /// Byte budget for the kept prefix of a truncated detail (the `…` adds 3 more bytes).
    const DETAIL_KEEP_BYTES: usize = 127;

    /// Creates an event stamped with the current wall-clock time.
    ///
    /// `source` and `detail` may carry untrusted text, so both are stripped of control
    /// characters before being length-capped. This includes ESC, which would otherwise let
    /// the text drive a terminal.
    ///
    /// - `source`: all control characters (ASCII and C1) are dropped, then it is cut to the
    ///   first 64 `char`s.
    /// - `detail`: tab/LF/CR are replaced by a space so words stay apart on one line, and
    ///   other control characters are dropped. If the result exceeds 128 bytes, it is cut at
    ///   the last char boundary at or before byte 127 and `…` is appended.
    ///
    /// Printable remnants of an escape sequence (e.g. `[31m`) are kept as plain text.
    #[must_use]
    pub fn new(
        category: SecurityEventCategory,
        source: impl Into<String>,
        detail: impl Into<String>,
    ) -> Self {
        let source: String = source
            .into()
            .chars()
            .filter(|c| !c.is_control())
            .take(Self::SOURCE_MAX_CHARS)
            .collect();
        let detail: String = detail
            .into()
            .chars()
            .filter_map(|c| match c {
                '\t' | '\n' | '\r' => Some(' '),
                c if c.is_control() => None,
                c => Some(c),
            })
            .collect();
        Self {
            timestamp: std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_secs(),
            category,
            source,
            detail: Self::truncate_detail(detail),
        }
    }

    /// Caps `detail` at [`Self::DETAIL_MAX_BYTES`], reusing its allocation.
    fn truncate_detail(mut detail: String) -> String {
        if detail.len() > Self::DETAIL_MAX_BYTES {
            // Largest char boundary <= DETAIL_KEEP_BYTES (same result as
            // `str::floor_char_boundary`). Index 0 is always a boundary, so `find` cannot
            // fail; `unwrap_or` just makes that explicit.
            let end = (0..=Self::DETAIL_KEEP_BYTES)
                .rev()
                .find(|&i| detail.is_char_boundary(i))
                .unwrap_or(0);
            detail.truncate(end);
            detail.push('…');
        }
        detail
    }
}
78
/// Maximum number of entries intended to be kept in [`MetricsSnapshot::security_events`].
///
/// The snapshot does not enforce this itself. The test `ring_buffer_respects_cap_via_update`
/// shows the intended pattern: `pop_front` the oldest event once this many are stored,
/// then `push_back` the new one.
pub const SECURITY_EVENT_CAP: usize = 100;
81
/// One task row of a [`TaskGraphSnapshot`].
///
/// Field notes describe values as produced by the `From<&TaskGraph>` conversion.
#[derive(Debug, Clone)]
pub struct TaskSnapshotRow {
    /// Numeric value of the orchestration task id.
    pub id: u32,
    /// Task title with control characters stripped.
    pub title: String,
    /// Task status rendered via its `Display` impl.
    pub status: String,
    /// Assigned agent, if any, with control characters stripped.
    pub agent: Option<String>,
    /// Duration from the task result, or 0 when there is no result.
    pub duration_ms: u64,
    /// Sanitized, truncated failure output; only set for failed tasks.
    pub error: Option<String>,
}

/// Display-ready copy of an orchestration task graph. `Default` gives an empty snapshot.
#[derive(Debug, Clone, Default)]
pub struct TaskGraphSnapshot {
    pub graph_id: String,
    pub goal: String,
    pub status: String,
    pub tasks: Vec<TaskSnapshotRow>,
    /// When the graph completed. `None` until a caller records it; the
    /// `From<&TaskGraph>` conversion always leaves it `None`.
    pub completed_at: Option<std::time::Instant>,
}

impl TaskGraphSnapshot {
    /// How long after `completed_at` the snapshot is still considered fresh.
    pub const STALE_AFTER: std::time::Duration = std::time::Duration::from_secs(30);

    /// Returns `true` once more than [`Self::STALE_AFTER`] has elapsed since `completed_at`.
    ///
    /// A snapshot without a completion time is never stale. The comparison uses the full
    /// `Duration` rather than whole seconds, so 30.5 s already counts as stale.
    #[must_use]
    pub fn is_stale(&self) -> bool {
        self.completed_at
            .is_some_and(|t| t.elapsed() > Self::STALE_AFTER)
    }
}
117
/// Orchestration counters; all start at zero via `Default`.
///
/// NOTE(review): the per-field meanings below are read from the field names; confirm
/// them against the code that increments these counters.
#[derive(Debug, Clone, Default)]
pub struct OrchestrationMetrics {
    /// Number of plans created.
    pub plans_total: u64,
    /// Number of tasks across all plans.
    pub tasks_total: u64,
    /// Tasks that completed successfully.
    pub tasks_completed: u64,
    /// Tasks that failed.
    pub tasks_failed: u64,
    /// Tasks that were skipped.
    pub tasks_skipped: u64,
}
129
/// Confidence entry for one skill, as exposed in [`MetricsSnapshot::skill_confidence`].
#[derive(Debug, Clone, Default)]
pub struct SkillConfidence {
    /// Skill name.
    pub name: String,
    /// Posterior confidence estimate.
    /// NOTE(review): presumably a probability in [0, 1]; confirm with the producer.
    pub posterior: f64,
    /// Number of times the skill has been used (per field name).
    pub total_uses: u32,
}
137
/// Display-oriented status of one sub-agent, as exposed in [`MetricsSnapshot::sub_agents`].
///
/// NOTE(review): field meanings are inferred from their names; confirm against the producer.
#[derive(Debug, Clone, Default)]
pub struct SubAgentMetrics {
    /// Sub-agent identifier.
    pub id: String,
    /// Human-readable name.
    pub name: String,
    /// Current state, already rendered as text.
    pub state: String,
    /// Turns consumed so far.
    pub turns_used: u32,
    /// Turn budget.
    pub max_turns: u32,
    /// Whether the sub-agent runs in the background.
    pub background: bool,
    /// Seconds elapsed since the sub-agent started.
    pub elapsed_secs: u64,
    /// Permission mode, rendered as text.
    pub permission_mode: String,
}
153
/// Point-in-time view of agent metrics.
///
/// [`MetricsCollector`] seeds a `tokio::sync::watch` channel with `MetricsSnapshot::default()`
/// and mutates it in place. Receivers read the latest value. Every field defaults to
/// zero, empty or `None`.
///
/// NOTE(review): most per-field meanings below are read from the field names; confirm
/// against the code that updates them.
#[derive(Debug, Clone, Default)]
pub struct MetricsSnapshot {
    // --- LLM token usage and API calls ---
    pub prompt_tokens: u64,
    pub completion_tokens: u64,
    pub total_tokens: u64,
    pub context_tokens: u64,
    pub api_calls: u64,
    // --- Skills and MCP tools ---
    pub active_skills: Vec<String>,
    pub total_skills: usize,
    pub mcp_server_count: usize,
    pub mcp_tool_count: usize,
    pub active_mcp_tools: Vec<String>,
    // --- Memory / storage backends ---
    pub sqlite_message_count: u64,
    pub sqlite_conversation_id: Option<zeph_memory::ConversationId>,
    pub qdrant_available: bool,
    pub vector_backend: String,
    pub embeddings_generated: u64,
    // --- Provider, latency and uptime ---
    /// Latency of the most recent LLM call, in milliseconds (per the `_ms` suffix).
    pub last_llm_latency_ms: u64,
    pub uptime_seconds: u64,
    pub provider_name: String,
    pub model_name: String,
    // --- Summarization, compaction and compression ---
    pub summaries_count: u64,
    pub context_compactions: u64,
    pub compaction_hard_count: u64,
    /// NOTE(review): appears to hold one entry per hard compaction (turn counts following
    /// it); confirm against the producer.
    pub compaction_turns_after_hard: Vec<u64>,
    pub compression_events: u64,
    pub compression_tokens_saved: u64,
    pub tool_output_prunes: u64,
    // --- Prompt cache and cost ---
    pub cache_read_tokens: u64,
    pub cache_creation_tokens: u64,
    /// Accumulated cost in cents. Stored as `f64`, presumably to keep fractional cents.
    pub cost_spent_cents: f64,
    // --- Output-filter statistics ---
    pub filter_raw_tokens: u64,
    pub filter_saved_tokens: u64,
    pub filter_applications: u64,
    pub filter_total_commands: u64,
    pub filter_filtered_commands: u64,
    /// Filter applications bucketed by confidence level (full / partial / fallback).
    pub filter_confidence_full: u64,
    pub filter_confidence_partial: u64,
    pub filter_confidence_fallback: u64,
    // --- Turn control ---
    pub cancellations: u64,
    pub server_compaction_events: u64,
    // --- Security counters ---
    pub sanitizer_runs: u64,
    pub sanitizer_injection_flags: u64,
    pub sanitizer_truncations: u64,
    pub quarantine_invocations: u64,
    pub quarantine_failures: u64,
    pub exfiltration_images_blocked: u64,
    pub exfiltration_tool_urls_flagged: u64,
    pub exfiltration_memory_guards: u64,
    pub pii_scrub_count: u64,
    pub memory_validation_failures: u64,
    pub rate_limit_trips: u64,
    // --- Per-entity listings ---
    pub sub_agents: Vec<SubAgentMetrics>,
    pub skill_confidence: Vec<SkillConfidence>,
    /// Four text columns per scheduled task.
    /// NOTE(review): column meanings are not visible here; confirm with the producer.
    pub scheduled_tasks: Vec<[String; 4]>,
    /// Per-entry router statistics: a name and two `f64` parameters.
    /// NOTE(review): presumably the Beta-distribution alpha/beta of Thompson sampling; confirm.
    pub router_thompson_stats: Vec<(String, f64, f64)>,
    /// Recent security events, oldest at the front. Not capped here; producers are expected
    /// to evict with `pop_front` at `SECURITY_EVENT_CAP`.
    pub security_events: VecDeque<SecurityEvent>,
    // --- Orchestration ---
    pub orchestration: OrchestrationMetrics,
    /// Snapshot of the current or most recent task graph, if any.
    pub orchestration_graph: Option<TaskGraphSnapshot>,
    // --- Knowledge-graph statistics ---
    pub graph_community_detection_failures: u64,
    pub graph_entities_total: u64,
    pub graph_edges_total: u64,
    pub graph_communities_total: u64,
    pub graph_extraction_count: u64,
    pub graph_extraction_failures: u64,
    /// NOTE(review): presumably whether an extended context window is active; confirm.
    pub extended_context: bool,
}
232
/// Removes terminal escape sequences and control characters from `s`.
///
/// Sequences introduced by ESC (`\x1b`) are dropped together with their payload:
/// - CSI (`ESC [`, parameter/intermediate bytes, one final byte), e.g. colours `\x1b[31m`;
/// - OSC (`ESC ]` … terminated by BEL or `ESC \`), e.g. window titles and hyperlinks;
/// - nF escapes such as `ESC ( B`, and two-byte escapes such as `ESC M`.
///
/// All other control characters (Unicode `Cc`, C1 range included) are removed, except tab,
/// line feed and carriage return, which are kept verbatim. All other characters are copied
/// unchanged.
fn strip_ctrl(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    let mut chars = s.chars().peekable();
    while let Some(c) = chars.next() {
        match c {
            '\x1b' => skip_escape_sequence(&mut chars),
            '\t' | '\n' | '\r' => out.push(c),
            c if c.is_control() => {}
            c => out.push(c),
        }
    }
    out
}

/// Consumes the rest of an escape sequence whose leading ESC has already been read.
///
/// Never consumes an ESC that begins the *next* sequence. The caller's loop handles each
/// sequence in turn, so there is no recursion and hostile input cannot exhaust the stack.
fn skip_escape_sequence(chars: &mut std::iter::Peekable<std::str::Chars<'_>>) {
    match chars.peek().copied() {
        // CSI: parameter (0x30–0x3F) and intermediate (0x20–0x2F) bytes, then a final byte
        // (0x40–0x7E). Any other character ends the sequence and is left for the caller.
        Some('[') => {
            chars.next();
            while chars.next_if(|ch| ('\x20'..='\x3f').contains(ch)).is_some() {}
            chars.next_if(|ch| ('\x40'..='\x7e').contains(ch));
        }
        // OSC: payload runs until BEL or ST (`ESC \`). Stop *before* an ESC; the caller then
        // swallows the `\` of ST as a two-byte escape (or handles a new sequence).
        Some(']') => {
            chars.next();
            while let Some(ch) = chars.next_if(|&ch| ch != '\x1b') {
                if ch == '\x07' {
                    break;
                }
            }
        }
        // nF escapes (e.g. `ESC ( B`): intermediate bytes 0x20–0x2F, then a final 0x30–0x7E.
        Some(c) if ('\x20'..='\x2f').contains(&c) => {
            while chars.next_if(|ch| ('\x20'..='\x2f').contains(ch)).is_some() {}
            chars.next_if(|ch| ('\x30'..='\x7e').contains(ch));
        }
        // Two-byte escapes (e.g. `ESC M`, `ESC 7`, and the `\` of ST).
        Some(c) if ('\x30'..='\x7e').contains(&c) => {
            chars.next();
        }
        // Lone ESC (end of input, or followed by a non-escape character): drop only the ESC.
        _ => {}
    }
}
261
262impl From<&crate::orchestration::TaskGraph> for TaskGraphSnapshot {
264 fn from(graph: &crate::orchestration::TaskGraph) -> Self {
265 let tasks = graph
266 .tasks
267 .iter()
268 .map(|t| {
269 let error = t
270 .result
271 .as_ref()
272 .filter(|_| t.status == crate::orchestration::TaskStatus::Failed)
273 .and_then(|r| {
274 if r.output.is_empty() {
275 None
276 } else {
277 let s = strip_ctrl(&r.output);
279 if s.len() > 80 {
280 let end = s.floor_char_boundary(79);
281 Some(format!("{}…", &s[..end]))
282 } else {
283 Some(s)
284 }
285 }
286 });
287 let duration_ms = t.result.as_ref().map_or(0, |r| r.duration_ms);
288 TaskSnapshotRow {
289 id: t.id.0,
290 title: strip_ctrl(&t.title),
291 status: t.status.to_string(),
292 agent: t.assigned_agent.as_deref().map(strip_ctrl),
293 duration_ms,
294 error,
295 }
296 })
297 .collect();
298 Self {
299 graph_id: graph.id.to_string(),
300 goal: strip_ctrl(&graph.goal),
301 status: graph.status.to_string(),
302 tasks,
303 completed_at: None,
304 }
305 }
306}
307
308pub struct MetricsCollector {
309 tx: watch::Sender<MetricsSnapshot>,
310}
311
312impl MetricsCollector {
313 #[must_use]
314 pub fn new() -> (Self, watch::Receiver<MetricsSnapshot>) {
315 let (tx, rx) = watch::channel(MetricsSnapshot::default());
316 (Self { tx }, rx)
317 }
318
319 pub fn update(&self, f: impl FnOnce(&mut MetricsSnapshot)) {
320 self.tx.send_modify(f);
321 }
322}
323
#[cfg(test)]
mod tests {
    // `metrics_snapshot_clone` assigns a field right after `Default::default()`.
    #![allow(clippy::field_reassign_with_default)]

    use super::*;

    #[test]
    fn default_metrics_snapshot() {
        let m = MetricsSnapshot::default();
        assert_eq!(m.total_tokens, 0);
        assert_eq!(m.api_calls, 0);
        assert!(m.active_skills.is_empty());
        assert!(m.active_mcp_tools.is_empty());
        assert_eq!(m.mcp_tool_count, 0);
        assert_eq!(m.mcp_server_count, 0);
        assert!(m.provider_name.is_empty());
        assert_eq!(m.summaries_count, 0);
    }

    #[test]
    fn metrics_collector_update() {
        let (collector, rx) = MetricsCollector::new();
        collector.update(|m| {
            m.api_calls = 5;
            m.total_tokens = 1000;
        });
        let snapshot = rx.borrow().clone();
        assert_eq!(snapshot.api_calls, 5);
        assert_eq!(snapshot.total_tokens, 1000);
    }

    #[test]
    fn metrics_collector_multiple_updates() {
        let (collector, rx) = MetricsCollector::new();
        collector.update(|m| m.api_calls = 1);
        collector.update(|m| m.api_calls += 1);
        assert_eq!(rx.borrow().api_calls, 2);
    }

    #[test]
    fn metrics_snapshot_clone() {
        let mut m = MetricsSnapshot::default();
        m.provider_name = "ollama".into();
        let cloned = m.clone();
        assert_eq!(cloned.provider_name, "ollama");
    }

    #[test]
    fn filter_metrics_tracking() {
        let (collector, rx) = MetricsCollector::new();
        collector.update(|m| {
            m.filter_raw_tokens += 250;
            m.filter_saved_tokens += 200;
            m.filter_applications += 1;
        });
        collector.update(|m| {
            m.filter_raw_tokens += 100;
            m.filter_saved_tokens += 80;
            m.filter_applications += 1;
        });
        let s = rx.borrow();
        assert_eq!(s.filter_raw_tokens, 350);
        assert_eq!(s.filter_saved_tokens, 280);
        assert_eq!(s.filter_applications, 2);
    }

    #[test]
    fn filter_confidence_and_command_metrics() {
        let (collector, rx) = MetricsCollector::new();
        collector.update(|m| {
            m.filter_total_commands += 1;
            m.filter_filtered_commands += 1;
            m.filter_confidence_full += 1;
        });
        collector.update(|m| {
            m.filter_total_commands += 1;
            m.filter_confidence_partial += 1;
        });
        let s = rx.borrow();
        assert_eq!(s.filter_total_commands, 2);
        assert_eq!(s.filter_filtered_commands, 1);
        assert_eq!(s.filter_confidence_full, 1);
        assert_eq!(s.filter_confidence_partial, 1);
        assert_eq!(s.filter_confidence_fallback, 0);
    }

    #[test]
    fn summaries_count_tracks_summarizations() {
        let (collector, rx) = MetricsCollector::new();
        collector.update(|m| m.summaries_count += 1);
        collector.update(|m| m.summaries_count += 1);
        assert_eq!(rx.borrow().summaries_count, 2);
    }

    #[test]
    fn cancellations_counter_increments() {
        let (collector, rx) = MetricsCollector::new();
        assert_eq!(rx.borrow().cancellations, 0);
        collector.update(|m| m.cancellations += 1);
        collector.update(|m| m.cancellations += 1);
        assert_eq!(rx.borrow().cancellations, 2);
    }

    #[test]
    fn security_event_detail_exact_128_not_truncated() {
        let s = "a".repeat(128);
        let ev = SecurityEvent::new(SecurityEventCategory::InjectionFlag, "src", s.clone());
        assert_eq!(ev.detail, s, "128-char string must not be truncated");
    }

    #[test]
    fn security_event_detail_129_is_truncated() {
        let s = "a".repeat(129);
        let ev = SecurityEvent::new(SecurityEventCategory::InjectionFlag, "src", s);
        assert!(
            ev.detail.ends_with('…'),
            "129-char string must end with ellipsis"
        );
        assert!(
            ev.detail.len() <= 130,
            "truncated detail must be at most 130 bytes"
        );
    }

    #[test]
    fn security_event_detail_multibyte_utf8_no_panic() {
        // 43 three-byte chars = 129 bytes, just over the 128-byte limit, so the cut must
        // land on a char boundary instead of splitting a code point.
        let s = "中".repeat(43);
        let ev = SecurityEvent::new(SecurityEventCategory::InjectionFlag, "src", s);
        assert!(ev.detail.ends_with('…'));
    }

    #[test]
    fn security_event_source_capped_at_64_chars() {
        let long_source = "x".repeat(200);
        let ev = SecurityEvent::new(SecurityEventCategory::InjectionFlag, long_source, "detail");
        assert_eq!(ev.source.len(), 64);
    }

    #[test]
    fn security_event_source_strips_control_chars() {
        let source = "tool\x00name\x1b[31m";
        let ev = SecurityEvent::new(SecurityEventCategory::InjectionFlag, source, "detail");
        assert!(!ev.source.contains('\x00'));
        assert!(!ev.source.contains('\x1b'));
    }

    #[test]
    fn security_event_category_as_str() {
        assert_eq!(SecurityEventCategory::InjectionFlag.as_str(), "injection");
        assert_eq!(SecurityEventCategory::ExfiltrationBlock.as_str(), "exfil");
        assert_eq!(SecurityEventCategory::Quarantine.as_str(), "quarantine");
        assert_eq!(SecurityEventCategory::Truncation.as_str(), "truncation");
        assert_eq!(SecurityEventCategory::RateLimit.as_str(), "rate_limit");
        assert_eq!(
            SecurityEventCategory::MemoryValidation.as_str(),
            "memory_validation"
        );
    }

    #[test]
    fn ring_buffer_respects_cap_via_update() {
        let (collector, rx) = MetricsCollector::new();
        for i in 0..110u64 {
            let event = SecurityEvent::new(
                SecurityEventCategory::InjectionFlag,
                "src",
                format!("event {i}"),
            );
            collector.update(|m| {
                if m.security_events.len() >= SECURITY_EVENT_CAP {
                    m.security_events.pop_front();
                }
                m.security_events.push_back(event);
            });
        }
        let snap = rx.borrow();
        assert_eq!(snap.security_events.len(), SECURITY_EVENT_CAP);
        // The ten oldest events (0..=9) were evicted; order stays oldest-first.
        assert_eq!(snap.security_events.front().unwrap().detail, "event 10");
        assert_eq!(snap.security_events.back().unwrap().detail, "event 109");
    }

    #[test]
    fn security_events_empty_by_default() {
        let m = MetricsSnapshot::default();
        assert!(m.security_events.is_empty());
    }

    #[test]
    fn orchestration_metrics_default_zero() {
        let m = OrchestrationMetrics::default();
        assert_eq!(m.plans_total, 0);
        assert_eq!(m.tasks_total, 0);
        assert_eq!(m.tasks_completed, 0);
        assert_eq!(m.tasks_failed, 0);
        assert_eq!(m.tasks_skipped, 0);
    }

    #[test]
    fn metrics_snapshot_includes_orchestration_default_zero() {
        let m = MetricsSnapshot::default();
        assert_eq!(m.orchestration.plans_total, 0);
        assert_eq!(m.orchestration.tasks_total, 0);
        assert_eq!(m.orchestration.tasks_completed, 0);
    }

    #[test]
    fn orchestration_metrics_update_via_collector() {
        let (collector, rx) = MetricsCollector::new();
        collector.update(|m| {
            m.orchestration.plans_total += 1;
            m.orchestration.tasks_total += 5;
            m.orchestration.tasks_completed += 3;
            m.orchestration.tasks_failed += 1;
            m.orchestration.tasks_skipped += 1;
        });
        let s = rx.borrow();
        assert_eq!(s.orchestration.plans_total, 1);
        assert_eq!(s.orchestration.tasks_total, 5);
        assert_eq!(s.orchestration.tasks_completed, 3);
        assert_eq!(s.orchestration.tasks_failed, 1);
        assert_eq!(s.orchestration.tasks_skipped, 1);
    }

    #[test]
    fn strip_ctrl_removes_escape_sequences() {
        let input = "hello\x1b[31mworld\x00end";
        let result = strip_ctrl(input);
        assert_eq!(result, "helloworldend");
    }

    #[test]
    fn strip_ctrl_allows_tab_lf_cr() {
        let input = "a\tb\nc\rd";
        let result = strip_ctrl(input);
        assert_eq!(result, "a\tb\nc\rd");
    }

    #[test]
    fn task_graph_snapshot_is_stale_after_30s() {
        let mut snap = TaskGraphSnapshot::default();
        // No completion time: never stale.
        assert!(!snap.is_stale());
        // Just completed: still fresh.
        snap.completed_at = Some(std::time::Instant::now());
        assert!(!snap.is_stale());
        // Completed 31 s ago: past the 30 s threshold.
        snap.completed_at = Some(
            std::time::Instant::now()
                .checked_sub(std::time::Duration::from_secs(31))
                .unwrap(),
        );
        assert!(snap.is_stale());
    }

    #[test]
    fn task_graph_snapshot_from_task_graph_maps_fields() {
        use crate::orchestration::{GraphStatus, TaskGraph, TaskNode, TaskResult, TaskStatus};

        let mut graph = TaskGraph::new("My goal");
        let mut task = TaskNode::new(0, "Do work", "description");
        task.status = TaskStatus::Failed;
        task.assigned_agent = Some("agent-1".into());
        task.result = Some(TaskResult {
            output: "error occurred here".into(),
            artifacts: vec![],
            duration_ms: 1234,
            agent_id: None,
            agent_def: None,
        });
        graph.tasks.push(task);
        graph.status = GraphStatus::Failed;

        let snap = TaskGraphSnapshot::from(&graph);
        assert_eq!(snap.goal, "My goal");
        assert_eq!(snap.status, "failed");
        assert_eq!(snap.tasks.len(), 1);
        let row = &snap.tasks[0];
        assert_eq!(row.title, "Do work");
        assert_eq!(row.status, "failed");
        assert_eq!(row.agent.as_deref(), Some("agent-1"));
        assert_eq!(row.duration_ms, 1234);
        assert!(row.error.as_deref().unwrap().contains("error occurred"));
    }

    #[test]
    fn task_graph_snapshot_from_compiles_with_feature() {
        use crate::orchestration::TaskGraph;
        let graph = TaskGraph::new("feature flag test");
        let snap = TaskGraphSnapshot::from(&graph);
        assert_eq!(snap.goal, "feature flag test");
        assert!(snap.tasks.is_empty());
        assert!(!snap.is_stale());
    }

    #[test]
    fn task_graph_snapshot_error_truncated_at_80_chars() {
        use crate::orchestration::{TaskGraph, TaskNode, TaskResult, TaskStatus};

        let mut graph = TaskGraph::new("goal");
        let mut task = TaskNode::new(0, "t", "d");
        task.status = TaskStatus::Failed;
        task.result = Some(TaskResult {
            output: "e".repeat(100),
            artifacts: vec![],
            duration_ms: 0,
            agent_id: None,
            agent_def: None,
        });
        graph.tasks.push(task);

        let snap = TaskGraphSnapshot::from(&graph);
        let err = snap.tasks[0].error.as_ref().unwrap();
        assert!(err.ends_with('…'), "truncated error must end with ellipsis");
        assert!(
            err.len() <= 83,
            "truncated error must not exceed 80 chars + ellipsis"
        );
    }

    #[test]
    fn task_graph_snapshot_strips_control_chars_from_title() {
        use crate::orchestration::{TaskGraph, TaskNode};

        let mut graph = TaskGraph::new("goal\x1b[31m");
        let task = TaskNode::new(0, "title\x00injected", "d");
        graph.tasks.push(task);

        let snap = TaskGraphSnapshot::from(&graph);
        assert!(!snap.goal.contains('\x1b'), "goal must not contain escape");
        assert!(
            !snap.tasks[0].title.contains('\x00'),
            "title must not contain null byte"
        );
    }

    #[test]
    fn graph_metrics_default_zero() {
        let m = MetricsSnapshot::default();
        assert_eq!(m.graph_entities_total, 0);
        assert_eq!(m.graph_edges_total, 0);
        assert_eq!(m.graph_communities_total, 0);
        assert_eq!(m.graph_extraction_count, 0);
        assert_eq!(m.graph_extraction_failures, 0);
    }

    #[test]
    fn graph_metrics_update_via_collector() {
        let (collector, rx) = MetricsCollector::new();
        collector.update(|m| {
            m.graph_entities_total = 5;
            m.graph_edges_total = 10;
            m.graph_communities_total = 2;
            m.graph_extraction_count = 7;
            m.graph_extraction_failures = 1;
        });
        let snapshot = rx.borrow().clone();
        assert_eq!(snapshot.graph_entities_total, 5);
        assert_eq!(snapshot.graph_edges_total, 10);
        assert_eq!(snapshot.graph_communities_total, 2);
        assert_eq!(snapshot.graph_extraction_count, 7);
        assert_eq!(snapshot.graph_extraction_failures, 1);
    }
}