1use zeph_llm::provider::{LlmProvider, Message, MessagePart, Role};
5
6use super::{Agent, CODE_CONTEXT_PREFIX};
7use crate::channel::Channel;
8use crate::metrics::{MetricsSnapshot, SECURITY_EVENT_CAP, SecurityEvent, SecurityEventCategory};
9use zeph_tools::FilterStats;
10
11impl<C: Channel> Agent<C> {
12 pub fn sync_community_detection_failures(&self) {
14 if let Some(memory) = self.memory_state.persistence.memory.as_ref() {
15 let failures = memory.community_detection_failures();
16 self.update_metrics(|m| {
17 m.graph_community_detection_failures = failures;
18 });
19 }
20 }
21
22 pub fn sync_graph_extraction_metrics(&self) {
24 if let Some(memory) = self.memory_state.persistence.memory.as_ref() {
25 let count = memory.graph_extraction_count();
26 let failures = memory.graph_extraction_failures();
27 self.update_metrics(|m| {
28 m.graph_extraction_count = count;
29 m.graph_extraction_failures = failures;
30 });
31 }
32 }
33
34 pub async fn sync_graph_counts(&self) {
36 let Some(memory) = self.memory_state.persistence.memory.as_ref() else {
37 return;
38 };
39 let Some(store) = memory.graph_store.as_ref() else {
40 return;
41 };
42 let (entities, edges, communities) = tokio::join!(
43 store.entity_count(),
44 store.active_edge_count(),
45 store.community_count()
46 );
47 self.update_metrics(|m| {
48 m.graph_entities_total = entities.unwrap_or(0).cast_unsigned();
49 m.graph_edges_total = edges.unwrap_or(0).cast_unsigned();
50 m.graph_communities_total = communities.unwrap_or(0).cast_unsigned();
51 });
52 }
53
54 pub async fn check_vector_store_health(&self, backend_name: &str) {
56 let connected = match self.memory_state.persistence.memory.as_ref() {
57 Some(m) => m.is_vector_store_connected().await,
58 None => false,
59 };
60 let name = backend_name.to_owned();
61 self.update_metrics(|m| {
62 m.qdrant_available = connected;
63 m.vector_backend = name;
64 });
65 }
66
67 pub async fn sync_guidelines_status(&self) {
72 let Some(memory) = self.memory_state.persistence.memory.as_ref() else {
73 return;
74 };
75 let cid = self.memory_state.persistence.conversation_id;
76 match memory.sqlite().load_compression_guidelines_meta(cid).await {
77 Ok((version, created_at)) => {
78 #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
79 let version_u32 = u32::try_from(version).unwrap_or(0);
80 self.update_metrics(|m| {
81 m.guidelines_version = version_u32;
82 m.guidelines_updated_at = created_at;
83 });
84 }
85 Err(e) => {
86 tracing::warn!("failed to sync guidelines status: {e:#}");
87 }
88 }
89 }
90
91 pub(super) fn record_filter_metrics(&mut self, fs: &FilterStats) {
92 let saved = fs.estimated_tokens_saved() as u64;
93 let raw = (fs.raw_chars / 4) as u64;
94 let confidence = fs.confidence;
95 let was_filtered = fs.filtered_chars < fs.raw_chars;
96 self.update_metrics(|m| {
97 m.filter_raw_tokens += raw;
98 m.filter_saved_tokens += saved;
99 m.filter_applications += 1;
100 m.filter_total_commands += 1;
101 if was_filtered {
102 m.filter_filtered_commands += 1;
103 }
104 if let Some(c) = confidence {
105 match c {
106 zeph_tools::FilterConfidence::Full => {
107 m.filter_confidence_full += 1;
108 }
109 zeph_tools::FilterConfidence::Partial => {
110 m.filter_confidence_partial += 1;
111 }
112 zeph_tools::FilterConfidence::Fallback => {
113 m.filter_confidence_fallback += 1;
114 }
115 }
116 }
117 });
118 }
119
120 pub(super) fn update_metrics(&self, f: impl FnOnce(&mut MetricsSnapshot)) {
121 if let Some(ref tx) = self.metrics.metrics_tx {
122 let elapsed = self.lifecycle.start_time.elapsed().as_secs();
123 tx.send_modify(|m| {
124 m.uptime_seconds = elapsed;
125 f(m);
126 });
127 }
128 }
129
130 pub(crate) fn publish_context_budget(&self) {
137 let max_tokens = self
138 .context_manager
139 .budget
140 .as_ref()
141 .map_or(0, |b| b.max_tokens() as u64);
142 self.update_metrics(|m| m.context_max_tokens = max_tokens);
143 }
144
145 pub(super) fn flush_turn_timings(&mut self) {
150 let timings = std::mem::take(&mut self.metrics.pending_timings);
151 tracing::debug!(
152 prepare_context_ms = timings.prepare_context_ms,
153 llm_chat_ms = timings.llm_chat_ms,
154 tool_exec_ms = timings.tool_exec_ms,
155 persist_message_ms = timings.persist_message_ms,
156 "turn timings"
157 );
158
159 if self.metrics.timing_window.len() >= 10 {
160 self.metrics.timing_window.pop_front();
161 }
162 self.metrics.timing_window.push_back(timings.clone());
163
164 let count = self.metrics.timing_window.len();
165 let mut avg = crate::metrics::TurnTimings::default();
166 let mut max = crate::metrics::TurnTimings::default();
167 for t in &self.metrics.timing_window {
168 avg.prepare_context_ms = avg.prepare_context_ms.saturating_add(t.prepare_context_ms);
169 avg.llm_chat_ms = avg.llm_chat_ms.saturating_add(t.llm_chat_ms);
170 avg.tool_exec_ms = avg.tool_exec_ms.saturating_add(t.tool_exec_ms);
171 avg.persist_message_ms = avg.persist_message_ms.saturating_add(t.persist_message_ms);
172
173 max.prepare_context_ms = max.prepare_context_ms.max(t.prepare_context_ms);
174 max.llm_chat_ms = max.llm_chat_ms.max(t.llm_chat_ms);
175 max.tool_exec_ms = max.tool_exec_ms.max(t.tool_exec_ms);
176 max.persist_message_ms = max.persist_message_ms.max(t.persist_message_ms);
177 }
178 let n = count as u64;
179 avg.prepare_context_ms /= n;
180 avg.llm_chat_ms /= n;
181 avg.tool_exec_ms /= n;
182 avg.persist_message_ms /= n;
183
184 let total_ms = timings
185 .prepare_context_ms
186 .saturating_add(timings.llm_chat_ms)
187 .saturating_add(timings.tool_exec_ms)
188 .saturating_add(timings.persist_message_ms);
189
190 self.update_metrics(|m| {
191 m.last_turn_timings = timings;
192 m.avg_turn_timings = avg;
193 m.max_turn_timings = max;
194 m.timing_sample_count = n;
195 });
196
197 if let Some(ref recorder) = self.metrics.histogram_recorder {
198 recorder.observe_turn_duration(std::time::Duration::from_millis(total_ms));
199 }
200 }
201
202 pub(super) fn push_classifier_metrics(&self) {
207 if let Some(ref m) = self.metrics.classifier_metrics {
208 let snapshot = m.snapshot();
209 self.update_metrics(|ms| ms.classifier = snapshot);
210 }
211 }
212
213 pub(super) fn push_security_event(
214 &self,
215 category: SecurityEventCategory,
216 source: &str,
217 detail: impl Into<String>,
218 ) {
219 if let Some(ref tx) = self.metrics.metrics_tx {
220 let event = SecurityEvent::new(category, source, detail);
221 let elapsed = self.lifecycle.start_time.elapsed().as_secs();
222 tx.send_modify(|m| {
223 m.uptime_seconds = elapsed;
224 if m.security_events.len() >= SECURITY_EVENT_CAP {
225 m.security_events.pop_front();
226 }
227 m.security_events.push_back(event);
228 });
229 }
230 }
231
232 pub(super) fn recompute_prompt_tokens(&mut self) {
233 self.providers.cached_prompt_tokens = self
234 .msg
235 .messages
236 .iter()
237 .map(|m| self.metrics.token_counter.count_message_tokens(m) as u64)
238 .sum();
239 }
240
241 pub(super) fn push_message(&mut self, msg: Message) {
242 self.providers.cached_prompt_tokens +=
243 self.metrics.token_counter.count_message_tokens(&msg) as u64;
244 if msg.role == zeph_llm::provider::Role::Assistant {
245 self.session.last_assistant_at = Some(std::time::Instant::now());
246 }
247 self.msg.messages.push(msg);
248 self.detect_magic_docs_in_messages();
250 }
251
252 pub(crate) fn record_cost_and_cache(&self, input_tokens: u64, output_tokens: u64) {
253 let (cache_write, cache_read) = self.provider.last_cache_usage().unwrap_or((0, 0));
254
255 if let Some(ref tracker) = self.metrics.cost_tracker {
256 let provider_name = if self.runtime.active_provider_name.is_empty() {
257 self.provider.name()
258 } else {
259 self.runtime.active_provider_name.as_str()
260 };
261 tracker.record_usage(
262 provider_name,
263 self.provider.provider_kind_str(),
264 &self.runtime.model_name,
265 input_tokens,
266 cache_read,
267 cache_write,
268 output_tokens,
269 );
270 let breakdown = tracker.provider_breakdown();
271 self.update_metrics(|m| {
272 m.cost_spent_cents = tracker.current_spend();
273 m.cache_creation_tokens += cache_write;
274 m.cache_read_tokens += cache_read;
275 m.provider_cost_breakdown = breakdown;
276 });
277 } else if cache_write > 0 || cache_read > 0 {
278 self.update_metrics(|m| {
279 m.cache_creation_tokens += cache_write;
280 m.cache_read_tokens += cache_read;
281 });
282 }
283 }
284
285 pub(crate) fn record_successful_task(&self) {
286 if let Some(ref tracker) = self.metrics.cost_tracker {
287 tracker.record_successful_task();
288 self.update_metrics(|m| {
289 m.cost_cps_cents = tracker.cps();
290 m.cost_successful_tasks = tracker.successful_tasks();
291 });
292 }
293 }
294
295 pub(super) fn last_assistant_preview(&self, max_chars: usize) -> String {
303 let raw = self
304 .msg
305 .messages
306 .iter()
307 .rev()
308 .find(|m| m.role == Role::Assistant)
309 .map_or("", |m| m.content.as_str());
310
311 if raw.is_empty() {
312 return String::new();
313 }
314
315 let truncated: &str = if raw.chars().count() > max_chars {
317 let end = raw
318 .char_indices()
319 .nth(max_chars)
320 .map_or(raw.len(), |(i, _)| i);
321 &raw[..end]
322 } else {
323 raw
324 };
325
326 crate::redact::scrub_content(truncated).into_owned()
327 }
328
329 pub fn inject_code_context(&mut self, text: &str) {
332 self.remove_code_context_messages();
333 if text.is_empty() || self.msg.messages.len() <= 1 {
334 return;
335 }
336 let content = format!("{CODE_CONTEXT_PREFIX}{text}");
337 self.msg.messages.insert(
338 1,
339 Message::from_parts(
340 Role::System,
341 vec![MessagePart::CodeContext { text: content }],
342 ),
343 );
344 }
345
346 #[must_use]
347 pub fn context_messages(&self) -> &[Message] {
348 &self.msg.messages
349 }
350
351 pub(super) fn truncate_old_tool_results(&mut self) {
361 const LIMIT: usize = 2048;
362 const SUFFIX: &str = "…[truncated]";
363
364 let len = self.msg.messages.len();
365 if len <= 2 {
366 return;
367 }
368 for msg in &mut self.msg.messages[..len - 2] {
369 for part in &mut msg.parts {
370 match part {
371 MessagePart::ToolResult { content, .. } if content.len() > LIMIT => {
372 content.truncate(content.floor_char_boundary(LIMIT));
373 content.push_str(SUFFIX);
374 }
375 MessagePart::ToolOutput { body, .. } if body.len() > LIMIT => {
376 body.truncate(body.floor_char_boundary(LIMIT));
377 body.push_str(SUFFIX);
378 }
379 _ => {}
380 }
381 }
382 }
383 }
384}
385
#[cfg(test)]
mod tests {
    use super::super::agent_tests::{
        MockChannel, MockToolExecutor, create_test_registry, mock_provider,
    };
    use super::*;
    use zeph_llm::provider::{MessageMetadata, MessagePart};

    /// Builds an agent wired to the mock provider, channel and tool executor,
    /// each constructed with empty inputs.
    fn new_agent() -> Agent<MockChannel> {
        let provider = mock_provider(vec![]);
        let channel = MockChannel::new(vec![]);
        let registry = create_test_registry();
        let executor = MockToolExecutor::no_tools();
        Agent::new(provider, channel, registry, None, 5, executor)
    }

    /// Builds a message with default metadata.
    fn message(role: Role, content: &str, parts: Vec<MessagePart>) -> Message {
        Message {
            role,
            content: content.to_string(),
            parts,
            metadata: MessageMetadata::default(),
        }
    }

    /// Builds a non-error `ToolResult` part.
    fn tool_result(id: &str, content: &str) -> MessagePart {
        MessagePart::ToolResult {
            tool_use_id: id.to_string(),
            content: content.to_string(),
            is_error: false,
        }
    }

    #[test]
    fn push_message_increments_cached_tokens() {
        let mut agent = new_agent();

        let before = agent.providers.cached_prompt_tokens;
        let msg = message(Role::User, "hello world!!", vec![]);
        let expected_delta = agent.metrics.token_counter.count_message_tokens(&msg) as u64;
        agent.push_message(msg);
        assert_eq!(
            agent.providers.cached_prompt_tokens,
            before + expected_delta
        );
    }

    #[test]
    fn recompute_prompt_tokens_matches_sum() {
        let mut agent = new_agent();

        agent.msg.messages.push(message(Role::User, "1234", vec![]));
        agent
            .msg
            .messages
            .push(message(Role::Assistant, "5678", vec![]));

        agent.recompute_prompt_tokens();

        let expected: u64 = agent
            .msg
            .messages
            .iter()
            .map(|m| agent.metrics.token_counter.count_message_tokens(m) as u64)
            .sum();
        assert_eq!(agent.providers.cached_prompt_tokens, expected);
    }

    #[test]
    fn inject_code_context_into_messages_with_existing_content() {
        let mut agent = new_agent();

        agent.push_message(message(Role::User, "question", vec![]));

        agent.inject_code_context("some code here");

        let found = agent.msg.messages.iter().any(|m| {
            m.parts.iter().any(|p| {
                matches!(p, MessagePart::CodeContext { text } if text.contains("some code here"))
            })
        });
        assert!(found, "code context should be injected into messages");
    }

    #[test]
    fn inject_code_context_empty_text_is_noop() {
        let mut agent = new_agent();

        agent.push_message(message(Role::User, "question", vec![]));
        let count_before = agent.msg.messages.len();

        agent.inject_code_context("");

        assert_eq!(agent.msg.messages.len(), count_before);
    }

    #[test]
    fn inject_code_context_with_single_message_is_noop() {
        let mut agent = new_agent();
        let count_before = agent.msg.messages.len();

        agent.inject_code_context("some code");

        assert_eq!(agent.msg.messages.len(), count_before);
    }

    #[test]
    fn context_messages_returns_all_messages() {
        let mut agent = new_agent();

        agent.push_message(message(Role::User, "test", vec![]));

        assert_eq!(agent.context_messages().len(), agent.msg.messages.len());
    }

    #[test]
    fn last_assistant_preview_truncates_by_chars_not_bytes() {
        let mut agent = new_agent();
        agent
            .msg
            .messages
            .push(message(Role::Assistant, "héllo wörld", vec![]));

        // 'é' is two bytes; a byte-based cut at 3 would split it.
        assert_eq!(agent.last_assistant_preview(3), "hél");
        assert_eq!(agent.last_assistant_preview(100), "héllo wörld");
    }

    #[test]
    fn last_assistant_preview_empty_without_assistant_message() {
        let mut agent = new_agent();
        agent.msg.messages.push(message(Role::User, "hi", vec![]));

        assert_eq!(agent.last_assistant_preview(10), "");
    }

    #[test]
    fn truncate_old_tool_results_truncates_stale_content() {
        let mut agent = new_agent();

        let big_content = "x".repeat(4096);

        agent.msg.messages.push(message(
            Role::User,
            "",
            vec![tool_result("id1", &big_content)],
        ));
        agent.msg.messages.push(message(
            Role::User,
            "",
            vec![MessagePart::ToolOutput {
                tool_name: "shell".into(),
                body: big_content.clone(),
                compacted_at: None,
            }],
        ));
        agent.msg.messages.push(message(
            Role::Assistant,
            "reply",
            vec![tool_result("id3", &big_content)],
        ));
        agent.msg.messages.push(message(
            Role::User,
            "last",
            vec![tool_result("id4", &big_content)],
        ));

        // Index of the first of the four messages pushed above (the agent may
        // already hold a system prompt at index 0).
        let base = agent.msg.messages.len() - 4;

        agent.truncate_old_tool_results();

        if let MessagePart::ToolResult { content, .. } = &agent.msg.messages[base].parts[0] {
            assert!(
                content.ends_with("…[truncated]"),
                "msg[base] should be truncated"
            );
            assert!(content.len() <= 2048 + 16);
        } else {
            panic!("expected ToolResult at msg[base]");
        }

        if let MessagePart::ToolOutput { body, .. } = &agent.msg.messages[base + 1].parts[0] {
            assert!(
                body.ends_with("…[truncated]"),
                "msg[base+1] should be truncated"
            );
        } else {
            panic!("expected ToolOutput at msg[base+1]");
        }

        // The two most recent messages are left intact.
        if let MessagePart::ToolResult { content, .. } = &agent.msg.messages[base + 2].parts[0] {
            assert_eq!(content.len(), 4096, "msg[base+2] should NOT be truncated");
        } else {
            panic!("expected ToolResult at msg[base+2]");
        }
        if let MessagePart::ToolResult { content, .. } = &agent.msg.messages[base + 3].parts[0] {
            assert_eq!(content.len(), 4096, "msg[base+3] should NOT be truncated");
        } else {
            panic!("expected ToolResult at msg[base+3]");
        }
    }

    #[test]
    fn truncate_old_tool_results_respects_char_boundaries() {
        let mut agent = new_agent();

        // 'a' followed by two-byte chars: byte 2048 falls inside a char, so the
        // cut must back off to byte 2047.
        let content = format!("a{}", "é".repeat(2047));
        agent
            .msg
            .messages
            .push(message(Role::User, "", vec![tool_result("id", &content)]));
        agent.msg.messages.push(message(Role::Assistant, "a", vec![]));
        agent.msg.messages.push(message(Role::User, "b", vec![]));
        let target = agent.msg.messages.len() - 3;

        agent.truncate_old_tool_results();

        let MessagePart::ToolResult { content, .. } = &agent.msg.messages[target].parts[0] else {
            panic!("expected ToolResult");
        };
        let expected = format!("a{}…[truncated]", "é".repeat(1023));
        assert_eq!(content, &expected);
    }

    #[test]
    fn truncate_old_tool_results_noop_when_few_messages() {
        let mut agent = new_agent();

        let big = "y".repeat(4096);
        agent
            .msg
            .messages
            .push(message(Role::User, "", vec![tool_result("id", &big)]));
        agent
            .msg
            .messages
            .push(message(Role::Assistant, "ok", vec![tool_result("id2", &big)]));

        let len_before = agent.msg.messages.len();
        agent.truncate_old_tool_results();

        assert_eq!(agent.msg.messages.len(), len_before);
        if let MessagePart::ToolResult { content, .. } =
            &agent.msg.messages[len_before - 2].parts[0]
        {
            assert_eq!(
                content.len(),
                4096,
                "second-to-last should not be truncated"
            );
        } else {
            panic!("expected ToolResult");
        }
        if let MessagePart::ToolResult { content, .. } =
            &agent.msg.messages[len_before - 1].parts[0]
        {
            assert_eq!(content.len(), 4096, "last should not be truncated");
        } else {
            panic!("expected ToolResult");
        }
    }

    fn make_timings(ctx: u64, llm: u64, tool: u64, persist: u64) -> crate::metrics::TurnTimings {
        crate::metrics::TurnTimings {
            prepare_context_ms: ctx,
            llm_chat_ms: llm,
            tool_exec_ms: tool,
            persist_message_ms: persist,
        }
    }

    /// Builds a mock agent with a metrics watch channel attached and returns the
    /// receiving side for inspecting published snapshots.
    fn agent_with_metrics_watch() -> (
        Agent<MockChannel>,
        tokio::sync::watch::Receiver<crate::metrics::MetricsSnapshot>,
    ) {
        let mut agent = new_agent();

        let (tx, rx) = tokio::sync::watch::channel(crate::metrics::MetricsSnapshot::default());
        agent.metrics.metrics_tx = Some(tx);
        (agent, rx)
    }

    #[test]
    fn flush_turn_timings_single_flush() {
        let (mut agent, rx) = agent_with_metrics_watch();

        agent.metrics.pending_timings = make_timings(10, 200, 50, 5);
        agent.flush_turn_timings();

        let snap = rx.borrow();
        assert_eq!(snap.last_turn_timings.prepare_context_ms, 10);
        assert_eq!(snap.last_turn_timings.llm_chat_ms, 200);
        assert_eq!(snap.last_turn_timings.tool_exec_ms, 50);
        assert_eq!(snap.last_turn_timings.persist_message_ms, 5);
        assert_eq!(snap.timing_sample_count, 1);
        // With a single sample the average equals that sample.
        assert_eq!(snap.avg_turn_timings.llm_chat_ms, 200);
    }

    #[test]
    fn flush_turn_timings_resets_pending() {
        let mut agent = new_agent();

        agent.metrics.pending_timings = make_timings(10, 200, 50, 5);
        agent.flush_turn_timings();

        let p = &agent.metrics.pending_timings;
        assert_eq!(p.prepare_context_ms, 0);
        assert_eq!(p.llm_chat_ms, 0);
        assert_eq!(p.tool_exec_ms, 0);
        assert_eq!(p.persist_message_ms, 0);
    }

    #[test]
    fn flush_turn_timings_window_capped_at_10() {
        let (mut agent, rx) = agent_with_metrics_watch();

        // Turns 1..=12 with llm_chat_ms = 10, 20, ..., 120.
        for i in 1_u64..=12 {
            agent.metrics.pending_timings = make_timings(0, i * 10, 0, 0);
            agent.flush_turn_timings();
        }

        let snap = rx.borrow();
        assert_eq!(snap.timing_sample_count, 10);
        // The window retains turns 3..=12 (30..=120 ms).
        assert_eq!(snap.max_turn_timings.llm_chat_ms, 120);
        // (30 + 40 + ... + 120) / 10 = 750 / 10 = 75.
        assert_eq!(snap.avg_turn_timings.llm_chat_ms, 75);
    }
}