1use zeph_llm::provider::{LlmProvider, Message, MessagePart, Role};
5
6use super::{Agent, CODE_CONTEXT_PREFIX};
7use crate::channel::Channel;
8use crate::metrics::{MetricsSnapshot, SECURITY_EVENT_CAP, SecurityEvent, SecurityEventCategory};
9use zeph_tools::FilterStats;
10
11impl<C: Channel> Agent<C> {
12 pub fn sync_community_detection_failures(&self) {
14 if let Some(memory) = self.memory_state.persistence.memory.as_ref() {
15 let failures = memory.community_detection_failures();
16 self.update_metrics(|m| {
17 m.graph_community_detection_failures = failures;
18 });
19 }
20 }
21
22 pub fn sync_graph_extraction_metrics(&self) {
24 if let Some(memory) = self.memory_state.persistence.memory.as_ref() {
25 let count = memory.graph_extraction_count();
26 let failures = memory.graph_extraction_failures();
27 self.update_metrics(|m| {
28 m.graph_extraction_count = count;
29 m.graph_extraction_failures = failures;
30 });
31 }
32 }
33
34 pub async fn sync_graph_counts(&self) {
36 let Some(memory) = self.memory_state.persistence.memory.as_ref() else {
37 return;
38 };
39 let Some(store) = memory.graph_store.as_ref() else {
40 return;
41 };
42 let (entities, edges, communities) = tokio::join!(
43 store.entity_count(),
44 store.active_edge_count(),
45 store.community_count()
46 );
47 self.update_metrics(|m| {
48 m.graph_entities_total = entities.unwrap_or(0).cast_unsigned();
49 m.graph_edges_total = edges.unwrap_or(0).cast_unsigned();
50 m.graph_communities_total = communities.unwrap_or(0).cast_unsigned();
51 });
52 }
53
54 pub async fn check_vector_store_health(&self, backend_name: &str) {
56 let connected = match self.memory_state.persistence.memory.as_ref() {
57 Some(m) => m.is_vector_store_connected().await,
58 None => false,
59 };
60 let name = backend_name.to_owned();
61 self.update_metrics(|m| {
62 m.qdrant_available = connected;
63 m.vector_backend = name;
64 });
65 }
66
67 pub async fn sync_guidelines_status(&self) {
72 let Some(memory) = self.memory_state.persistence.memory.as_ref() else {
73 return;
74 };
75 let cid = self.memory_state.persistence.conversation_id;
76 match memory.sqlite().load_compression_guidelines_meta(cid).await {
77 Ok((version, created_at)) => {
78 #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
79 let version_u32 = u32::try_from(version).unwrap_or(0);
80 self.update_metrics(|m| {
81 m.guidelines_version = version_u32;
82 m.guidelines_updated_at = created_at;
83 });
84 }
85 Err(e) => {
86 tracing::warn!("failed to sync guidelines status: {e:#}");
87 }
88 }
89 }
90
91 pub(super) fn record_filter_metrics(&mut self, fs: &FilterStats) {
92 let saved = fs.estimated_tokens_saved() as u64;
93 let raw = (fs.raw_chars / 4) as u64;
94 let confidence = fs.confidence;
95 let was_filtered = fs.filtered_chars < fs.raw_chars;
96 self.update_metrics(|m| {
97 m.filter_raw_tokens += raw;
98 m.filter_saved_tokens += saved;
99 m.filter_applications += 1;
100 m.filter_total_commands += 1;
101 if was_filtered {
102 m.filter_filtered_commands += 1;
103 }
104 if let Some(c) = confidence {
105 match c {
106 zeph_tools::FilterConfidence::Full => {
107 m.filter_confidence_full += 1;
108 }
109 zeph_tools::FilterConfidence::Partial => {
110 m.filter_confidence_partial += 1;
111 }
112 zeph_tools::FilterConfidence::Fallback => {
113 m.filter_confidence_fallback += 1;
114 }
115 }
116 }
117 });
118 }
119
120 pub(super) fn update_metrics(&self, f: impl FnOnce(&mut MetricsSnapshot)) {
121 if let Some(ref tx) = self.metrics.metrics_tx {
122 let elapsed = self.lifecycle.start_time.elapsed().as_secs();
123 tx.send_modify(|m| {
124 m.uptime_seconds = elapsed;
125 f(m);
126 });
127 }
128 }
129
130 pub(super) fn flush_turn_timings(&mut self) {
135 let timings = std::mem::take(&mut self.metrics.pending_timings);
136 tracing::debug!(
137 prepare_context_ms = timings.prepare_context_ms,
138 llm_chat_ms = timings.llm_chat_ms,
139 tool_exec_ms = timings.tool_exec_ms,
140 persist_message_ms = timings.persist_message_ms,
141 "turn timings"
142 );
143
144 if self.metrics.timing_window.len() >= 10 {
145 self.metrics.timing_window.pop_front();
146 }
147 self.metrics.timing_window.push_back(timings.clone());
148
149 let count = self.metrics.timing_window.len();
150 let mut avg = crate::metrics::TurnTimings::default();
151 let mut max = crate::metrics::TurnTimings::default();
152 for t in &self.metrics.timing_window {
153 avg.prepare_context_ms = avg.prepare_context_ms.saturating_add(t.prepare_context_ms);
154 avg.llm_chat_ms = avg.llm_chat_ms.saturating_add(t.llm_chat_ms);
155 avg.tool_exec_ms = avg.tool_exec_ms.saturating_add(t.tool_exec_ms);
156 avg.persist_message_ms = avg.persist_message_ms.saturating_add(t.persist_message_ms);
157
158 max.prepare_context_ms = max.prepare_context_ms.max(t.prepare_context_ms);
159 max.llm_chat_ms = max.llm_chat_ms.max(t.llm_chat_ms);
160 max.tool_exec_ms = max.tool_exec_ms.max(t.tool_exec_ms);
161 max.persist_message_ms = max.persist_message_ms.max(t.persist_message_ms);
162 }
163 let n = count as u64;
164 avg.prepare_context_ms /= n;
165 avg.llm_chat_ms /= n;
166 avg.tool_exec_ms /= n;
167 avg.persist_message_ms /= n;
168
169 let total_ms = timings
170 .prepare_context_ms
171 .saturating_add(timings.llm_chat_ms)
172 .saturating_add(timings.tool_exec_ms)
173 .saturating_add(timings.persist_message_ms);
174
175 self.update_metrics(|m| {
176 m.last_turn_timings = timings;
177 m.avg_turn_timings = avg;
178 m.max_turn_timings = max;
179 m.timing_sample_count = n;
180 });
181
182 if let Some(ref recorder) = self.metrics.histogram_recorder {
183 recorder.observe_turn_duration(std::time::Duration::from_millis(total_ms));
184 }
185 }
186
187 pub(super) fn push_classifier_metrics(&self) {
192 if let Some(ref m) = self.metrics.classifier_metrics {
193 let snapshot = m.snapshot();
194 self.update_metrics(|ms| ms.classifier = snapshot);
195 }
196 }
197
198 pub(super) fn push_security_event(
199 &self,
200 category: SecurityEventCategory,
201 source: &str,
202 detail: impl Into<String>,
203 ) {
204 if let Some(ref tx) = self.metrics.metrics_tx {
205 let event = SecurityEvent::new(category, source, detail);
206 let elapsed = self.lifecycle.start_time.elapsed().as_secs();
207 tx.send_modify(|m| {
208 m.uptime_seconds = elapsed;
209 if m.security_events.len() >= SECURITY_EVENT_CAP {
210 m.security_events.pop_front();
211 }
212 m.security_events.push_back(event);
213 });
214 }
215 }
216
217 pub(super) fn recompute_prompt_tokens(&mut self) {
218 self.providers.cached_prompt_tokens = self
219 .msg
220 .messages
221 .iter()
222 .map(|m| self.metrics.token_counter.count_message_tokens(m) as u64)
223 .sum();
224 }
225
226 pub(super) fn push_message(&mut self, msg: Message) {
227 self.providers.cached_prompt_tokens +=
228 self.metrics.token_counter.count_message_tokens(&msg) as u64;
229 if msg.role == zeph_llm::provider::Role::Assistant {
230 self.session.last_assistant_at = Some(std::time::Instant::now());
231 }
232 self.msg.messages.push(msg);
233 self.detect_magic_docs_in_messages();
235 }
236
237 pub(crate) fn record_cost_and_cache(&self, input_tokens: u64, output_tokens: u64) {
238 let (cache_write, cache_read) = self.provider.last_cache_usage().unwrap_or((0, 0));
239
240 if let Some(ref tracker) = self.metrics.cost_tracker {
241 let provider_name = if self.runtime.active_provider_name.is_empty() {
242 self.provider.name()
243 } else {
244 self.runtime.active_provider_name.as_str()
245 };
246 tracker.record_usage(
247 provider_name,
248 &self.runtime.model_name,
249 input_tokens,
250 cache_read,
251 cache_write,
252 output_tokens,
253 );
254 let breakdown = tracker.provider_breakdown();
255 self.update_metrics(|m| {
256 m.cost_spent_cents = tracker.current_spend();
257 m.cache_creation_tokens += cache_write;
258 m.cache_read_tokens += cache_read;
259 m.provider_cost_breakdown = breakdown;
260 });
261 } else if cache_write > 0 || cache_read > 0 {
262 self.update_metrics(|m| {
263 m.cache_creation_tokens += cache_write;
264 m.cache_read_tokens += cache_read;
265 });
266 }
267 }
268
269 pub fn inject_code_context(&mut self, text: &str) {
272 self.remove_code_context_messages();
273 if text.is_empty() || self.msg.messages.len() <= 1 {
274 return;
275 }
276 let content = format!("{CODE_CONTEXT_PREFIX}{text}");
277 self.msg.messages.insert(
278 1,
279 Message::from_parts(
280 Role::System,
281 vec![MessagePart::CodeContext { text: content }],
282 ),
283 );
284 }
285
286 #[must_use]
287 pub fn context_messages(&self) -> &[Message] {
288 &self.msg.messages
289 }
290
291 pub(super) fn truncate_old_tool_results(&mut self) {
301 const LIMIT: usize = 2048;
302 const SUFFIX: &str = "…[truncated]";
303
304 let len = self.msg.messages.len();
305 if len <= 2 {
306 return;
307 }
308 for msg in &mut self.msg.messages[..len - 2] {
309 for part in &mut msg.parts {
310 match part {
311 MessagePart::ToolResult { content, .. } if content.len() > LIMIT => {
312 content.truncate(content.floor_char_boundary(LIMIT));
313 content.push_str(SUFFIX);
314 }
315 MessagePart::ToolOutput { body, .. } if body.len() > LIMIT => {
316 body.truncate(body.floor_char_boundary(LIMIT));
317 body.push_str(SUFFIX);
318 }
319 _ => {}
320 }
321 }
322 }
323 }
324}
325
#[cfg(test)]
mod tests {
    use super::super::agent_tests::{
        MockChannel, MockToolExecutor, create_test_registry, mock_provider,
    };
    use super::*;
    use crate::metrics::TurnTimings;
    use zeph_llm::provider::{MessageMetadata, MessagePart};

    /// Agent wired to mocks: no scripted LLM replies, no tools, and the same constructor
    /// arguments every test in this module uses.
    fn new_agent() -> Agent<MockChannel> {
        Agent::new(
            mock_provider(vec![]),
            MockChannel::new(vec![]),
            create_test_registry(),
            None,
            5,
            MockToolExecutor::no_tools(),
        )
    }

    /// Text-only message with no structured parts.
    fn text_message(role: Role, text: &str) -> Message {
        Message {
            role,
            content: text.to_owned(),
            parts: Vec::new(),
            metadata: MessageMetadata::default(),
        }
    }

    /// Message carrying a single successful `ToolResult` part whose payload is `result`.
    fn tool_result_message(role: Role, text: &str, tool_use_id: &str, result: &str) -> Message {
        Message {
            parts: vec![MessagePart::ToolResult {
                tool_use_id: tool_use_id.to_owned(),
                content: result.to_owned(),
                is_error: false,
            }],
            ..text_message(role, text)
        }
    }

    fn make_timings(ctx: u64, llm: u64, tool: u64, persist: u64) -> TurnTimings {
        TurnTimings {
            prepare_context_ms: ctx,
            llm_chat_ms: llm,
            tool_exec_ms: tool,
            persist_message_ms: persist,
        }
    }

    /// Agent with a metrics watch channel attached, plus the receiving end for assertions.
    fn agent_with_metrics_watch() -> (
        Agent<MockChannel>,
        tokio::sync::watch::Receiver<crate::metrics::MetricsSnapshot>,
    ) {
        let mut agent = new_agent();
        let (tx, rx) = tokio::sync::watch::channel(crate::metrics::MetricsSnapshot::default());
        agent.metrics.metrics_tx = Some(tx);
        (agent, rx)
    }

    #[test]
    fn push_message_increments_cached_tokens() {
        let mut agent = new_agent();
        let baseline = agent.providers.cached_prompt_tokens;

        let msg = text_message(Role::User, "hello world!!");
        let delta = agent.metrics.token_counter.count_message_tokens(&msg) as u64;
        agent.push_message(msg);

        assert_eq!(agent.providers.cached_prompt_tokens, baseline + delta);
    }

    #[test]
    fn recompute_prompt_tokens_matches_sum() {
        let mut agent = new_agent();
        agent.msg.messages.extend([
            text_message(Role::User, "1234"),
            text_message(Role::Assistant, "5678"),
        ]);

        agent.recompute_prompt_tokens();

        let counter = &agent.metrics.token_counter;
        let mut expected: u64 = 0;
        for m in &agent.msg.messages {
            expected += counter.count_message_tokens(m) as u64;
        }
        assert_eq!(agent.providers.cached_prompt_tokens, expected);
    }

    #[test]
    fn inject_code_context_into_messages_with_existing_content() {
        let mut agent = new_agent();
        agent.push_message(text_message(Role::User, "question"));

        agent.inject_code_context("some code here");

        let injected = agent
            .msg
            .messages
            .iter()
            .flat_map(|m| m.parts.iter())
            .any(|p| matches!(p, MessagePart::CodeContext { text } if text.contains("some code here")));
        assert!(injected, "code context should be injected into messages");
    }

    #[test]
    fn inject_code_context_empty_text_is_noop() {
        let mut agent = new_agent();
        agent.push_message(text_message(Role::User, "question"));
        let before = agent.msg.messages.len();

        agent.inject_code_context("");

        assert_eq!(agent.msg.messages.len(), before);
    }

    #[test]
    fn inject_code_context_with_single_message_is_noop() {
        let mut agent = new_agent();
        let before = agent.msg.messages.len();

        agent.inject_code_context("some code");

        assert_eq!(agent.msg.messages.len(), before);
    }

    #[test]
    fn context_messages_returns_all_messages() {
        let mut agent = new_agent();
        agent.push_message(text_message(Role::User, "test"));

        assert_eq!(agent.context_messages().len(), agent.msg.messages.len());
    }

    #[test]
    fn truncate_old_tool_results_truncates_stale_content() {
        let mut agent = new_agent();
        let big = "x".repeat(4096);

        agent
            .msg
            .messages
            .push(tool_result_message(Role::User, "", "id1", &big));
        agent.msg.messages.push(Message {
            parts: vec![MessagePart::ToolOutput {
                tool_name: "shell".into(),
                body: big.clone(),
                compacted_at: None,
            }],
            ..text_message(Role::User, "")
        });
        agent
            .msg
            .messages
            .push(tool_result_message(Role::Assistant, "reply", "id3", &big));
        agent
            .msg
            .messages
            .push(tool_result_message(Role::User, "last", "id4", &big));

        // Index of the first of the four messages pushed above.
        let base = agent.msg.messages.len() - 4;

        agent.truncate_old_tool_results();

        let msgs = &agent.msg.messages;

        // The two older messages are stale and must be truncated.
        let MessagePart::ToolResult { content, .. } = &msgs[base].parts[0] else {
            panic!("expected ToolResult at msg[base]");
        };
        assert!(
            content.ends_with("…[truncated]"),
            "msg[base] should be truncated"
        );
        assert!(content.len() <= 2048 + 16);

        let MessagePart::ToolOutput { body, .. } = &msgs[base + 1].parts[0] else {
            panic!("expected ToolOutput at msg[base+1]");
        };
        assert!(
            body.ends_with("…[truncated]"),
            "msg[base+1] should be truncated"
        );

        // The two most recent messages are kept verbatim.
        let MessagePart::ToolResult { content, .. } = &msgs[base + 2].parts[0] else {
            panic!("expected ToolResult at msg[base+2]");
        };
        assert_eq!(content.len(), 4096, "msg[base+2] should NOT be truncated");

        let MessagePart::ToolResult { content, .. } = &msgs[base + 3].parts[0] else {
            panic!("expected ToolResult at msg[base+3]");
        };
        assert_eq!(content.len(), 4096, "msg[base+3] should NOT be truncated");
    }

    #[test]
    fn truncate_old_tool_results_noop_when_few_messages() {
        let mut agent = new_agent();
        let big = "y".repeat(4096);
        agent
            .msg
            .messages
            .push(tool_result_message(Role::User, "", "id", &big));
        agent
            .msg
            .messages
            .push(tool_result_message(Role::Assistant, "ok", "id2", &big));

        let len_before = agent.msg.messages.len();
        agent.truncate_old_tool_results();
        assert_eq!(agent.msg.messages.len(), len_before);

        let MessagePart::ToolResult { content, .. } =
            &agent.msg.messages[len_before - 2].parts[0]
        else {
            panic!("expected ToolResult");
        };
        assert_eq!(
            content.len(),
            4096,
            "second-to-last should not be truncated"
        );

        let MessagePart::ToolResult { content, .. } =
            &agent.msg.messages[len_before - 1].parts[0]
        else {
            panic!("expected ToolResult");
        };
        assert_eq!(content.len(), 4096, "last should not be truncated");
    }

    #[test]
    fn flush_turn_timings_single_flush() {
        // A single flush publishes the turn as both the last and the average sample.
        let (mut agent, rx) = agent_with_metrics_watch();

        agent.metrics.pending_timings = make_timings(10, 200, 50, 5);
        agent.flush_turn_timings();

        let snap = rx.borrow();
        let last = &snap.last_turn_timings;
        assert_eq!(last.prepare_context_ms, 10);
        assert_eq!(last.llm_chat_ms, 200);
        assert_eq!(last.tool_exec_ms, 50);
        assert_eq!(last.persist_message_ms, 5);
        assert_eq!(snap.timing_sample_count, 1);
        assert_eq!(snap.avg_turn_timings.llm_chat_ms, 200);
    }

    #[test]
    fn flush_turn_timings_resets_pending() {
        let mut agent = new_agent();

        agent.metrics.pending_timings = make_timings(10, 200, 50, 5);
        agent.flush_turn_timings();

        let pending = &agent.metrics.pending_timings;
        assert_eq!(pending.prepare_context_ms, 0);
        assert_eq!(pending.llm_chat_ms, 0);
        assert_eq!(pending.tool_exec_ms, 0);
        assert_eq!(pending.persist_message_ms, 0);
    }

    #[test]
    fn flush_turn_timings_window_capped_at_10() {
        let (mut agent, rx) = agent_with_metrics_watch();

        // Flush 12 turns with llm_chat_ms = 10, 20, ..., 120.
        for turn in 1_u64..=12 {
            agent.metrics.pending_timings = make_timings(0, turn * 10, 0, 0);
            agent.flush_turn_timings();
        }

        let snap = rx.borrow();
        // Only the last 10 turns (30..=120) remain in the window.
        assert_eq!(snap.timing_sample_count, 10);
        assert_eq!(snap.max_turn_timings.llm_chat_ms, 120);
        // Mean of 30, 40, ..., 120 is (30 + 120) / 2 = 75.
        assert_eq!(snap.avg_turn_timings.llm_chat_ms, 75);
    }
}