1use zeph_llm::provider::{LlmProvider, Message, MessagePart, Role};
5
6use super::{Agent, CODE_CONTEXT_PREFIX};
7use crate::channel::Channel;
8use crate::metrics::{MetricsSnapshot, SECURITY_EVENT_CAP, SecurityEvent, SecurityEventCategory};
9use zeph_tools::FilterStats;
10
11impl<C: Channel> Agent<C> {
12 pub fn sync_community_detection_failures(&self) {
14 if let Some(memory) = self.memory_state.persistence.memory.as_ref() {
15 let failures = memory.community_detection_failures();
16 self.update_metrics(|m| {
17 m.graph_community_detection_failures = failures;
18 });
19 }
20 }
21
22 pub fn sync_graph_extraction_metrics(&self) {
24 if let Some(memory) = self.memory_state.persistence.memory.as_ref() {
25 let count = memory.graph_extraction_count();
26 let failures = memory.graph_extraction_failures();
27 self.update_metrics(|m| {
28 m.graph_extraction_count = count;
29 m.graph_extraction_failures = failures;
30 });
31 }
32 }
33
34 pub async fn sync_graph_counts(&self) {
36 let Some(memory) = self.memory_state.persistence.memory.as_ref() else {
37 return;
38 };
39 let Some(store) = memory.graph_store.as_ref() else {
40 return;
41 };
42 let (entities, edges, communities) = tokio::join!(
43 store.entity_count(),
44 store.active_edge_count(),
45 store.community_count()
46 );
47 self.update_metrics(|m| {
48 m.graph_entities_total = entities.unwrap_or(0).cast_unsigned();
49 m.graph_edges_total = edges.unwrap_or(0).cast_unsigned();
50 m.graph_communities_total = communities.unwrap_or(0).cast_unsigned();
51 });
52 }
53
54 pub async fn check_vector_store_health(&self, backend_name: &str) {
56 let connected = match self.memory_state.persistence.memory.as_ref() {
57 Some(m) => m.is_vector_store_connected().await,
58 None => false,
59 };
60 let name = backend_name.to_owned();
61 self.update_metrics(|m| {
62 m.qdrant_available = connected;
63 m.vector_backend = name;
64 });
65 }
66
67 pub async fn sync_guidelines_status(&self) {
72 let Some(memory) = self.memory_state.persistence.memory.as_ref() else {
73 return;
74 };
75 let cid = self.memory_state.persistence.conversation_id;
76 match memory.sqlite().load_compression_guidelines_meta(cid).await {
77 Ok((version, created_at)) => {
78 #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
79 let version_u32 = u32::try_from(version).unwrap_or(0);
80 self.update_metrics(|m| {
81 m.guidelines_version = version_u32;
82 m.guidelines_updated_at = created_at;
83 });
84 }
85 Err(e) => {
86 tracing::warn!("failed to sync guidelines status: {e:#}");
87 }
88 }
89 }
90
91 pub(super) fn record_filter_metrics(&mut self, fs: &FilterStats) {
92 let saved = fs.estimated_tokens_saved() as u64;
93 let raw = (fs.raw_chars / 4) as u64;
94 let confidence = fs.confidence;
95 let was_filtered = fs.filtered_chars < fs.raw_chars;
96 self.update_metrics(|m| {
97 m.filter_raw_tokens += raw;
98 m.filter_saved_tokens += saved;
99 m.filter_applications += 1;
100 m.filter_total_commands += 1;
101 if was_filtered {
102 m.filter_filtered_commands += 1;
103 }
104 if let Some(c) = confidence {
105 match c {
106 zeph_tools::FilterConfidence::Full => {
107 m.filter_confidence_full += 1;
108 }
109 zeph_tools::FilterConfidence::Partial => {
110 m.filter_confidence_partial += 1;
111 }
112 zeph_tools::FilterConfidence::Fallback => {
113 m.filter_confidence_fallback += 1;
114 }
115 }
116 }
117 });
118 }
119
120 pub(super) fn update_metrics(&self, f: impl FnOnce(&mut MetricsSnapshot)) {
121 if let Some(ref tx) = self.metrics.metrics_tx {
122 let elapsed = self.lifecycle.start_time.elapsed().as_secs();
123 tx.send_modify(|m| {
124 m.uptime_seconds = elapsed;
125 f(m);
126 });
127 }
128 }
129
130 pub(super) fn flush_turn_timings(&mut self) {
135 let timings = std::mem::take(&mut self.metrics.pending_timings);
136 tracing::debug!(
137 prepare_context_ms = timings.prepare_context_ms,
138 llm_chat_ms = timings.llm_chat_ms,
139 tool_exec_ms = timings.tool_exec_ms,
140 persist_message_ms = timings.persist_message_ms,
141 "turn timings"
142 );
143
144 if self.metrics.timing_window.len() >= 10 {
145 self.metrics.timing_window.pop_front();
146 }
147 self.metrics.timing_window.push_back(timings.clone());
148
149 let count = self.metrics.timing_window.len();
150 let mut avg = crate::metrics::TurnTimings::default();
151 let mut max = crate::metrics::TurnTimings::default();
152 for t in &self.metrics.timing_window {
153 avg.prepare_context_ms = avg.prepare_context_ms.saturating_add(t.prepare_context_ms);
154 avg.llm_chat_ms = avg.llm_chat_ms.saturating_add(t.llm_chat_ms);
155 avg.tool_exec_ms = avg.tool_exec_ms.saturating_add(t.tool_exec_ms);
156 avg.persist_message_ms = avg.persist_message_ms.saturating_add(t.persist_message_ms);
157
158 max.prepare_context_ms = max.prepare_context_ms.max(t.prepare_context_ms);
159 max.llm_chat_ms = max.llm_chat_ms.max(t.llm_chat_ms);
160 max.tool_exec_ms = max.tool_exec_ms.max(t.tool_exec_ms);
161 max.persist_message_ms = max.persist_message_ms.max(t.persist_message_ms);
162 }
163 let n = count as u64;
164 avg.prepare_context_ms /= n;
165 avg.llm_chat_ms /= n;
166 avg.tool_exec_ms /= n;
167 avg.persist_message_ms /= n;
168
169 let total_ms = timings
170 .prepare_context_ms
171 .saturating_add(timings.llm_chat_ms)
172 .saturating_add(timings.tool_exec_ms)
173 .saturating_add(timings.persist_message_ms);
174
175 self.update_metrics(|m| {
176 m.last_turn_timings = timings;
177 m.avg_turn_timings = avg;
178 m.max_turn_timings = max;
179 m.timing_sample_count = n;
180 });
181
182 if let Some(ref recorder) = self.metrics.histogram_recorder {
183 recorder.observe_turn_duration(std::time::Duration::from_millis(total_ms));
184 }
185 }
186
187 pub(super) fn push_classifier_metrics(&self) {
192 if let Some(ref m) = self.metrics.classifier_metrics {
193 let snapshot = m.snapshot();
194 self.update_metrics(|ms| ms.classifier = snapshot);
195 }
196 }
197
198 pub(super) fn push_security_event(
199 &self,
200 category: SecurityEventCategory,
201 source: &str,
202 detail: impl Into<String>,
203 ) {
204 if let Some(ref tx) = self.metrics.metrics_tx {
205 let event = SecurityEvent::new(category, source, detail);
206 let elapsed = self.lifecycle.start_time.elapsed().as_secs();
207 tx.send_modify(|m| {
208 m.uptime_seconds = elapsed;
209 if m.security_events.len() >= SECURITY_EVENT_CAP {
210 m.security_events.pop_front();
211 }
212 m.security_events.push_back(event);
213 });
214 }
215 }
216
217 pub(super) fn recompute_prompt_tokens(&mut self) {
218 self.providers.cached_prompt_tokens = self
219 .msg
220 .messages
221 .iter()
222 .map(|m| self.metrics.token_counter.count_message_tokens(m) as u64)
223 .sum();
224 }
225
226 pub(super) fn push_message(&mut self, msg: Message) {
227 self.providers.cached_prompt_tokens +=
228 self.metrics.token_counter.count_message_tokens(&msg) as u64;
229 if msg.role == zeph_llm::provider::Role::Assistant {
230 self.session.last_assistant_at = Some(std::time::Instant::now());
231 }
232 self.msg.messages.push(msg);
233 self.detect_magic_docs_in_messages();
235 }
236
237 pub(crate) fn record_cost_and_cache(&self, input_tokens: u64, output_tokens: u64) {
238 let (cache_write, cache_read) = self.provider.last_cache_usage().unwrap_or((0, 0));
239
240 if let Some(ref tracker) = self.metrics.cost_tracker {
241 let provider_name = if self.runtime.active_provider_name.is_empty() {
242 self.provider.name()
243 } else {
244 self.runtime.active_provider_name.as_str()
245 };
246 tracker.record_usage(
247 provider_name,
248 self.provider.provider_kind_str(),
249 &self.runtime.model_name,
250 input_tokens,
251 cache_read,
252 cache_write,
253 output_tokens,
254 );
255 let breakdown = tracker.provider_breakdown();
256 self.update_metrics(|m| {
257 m.cost_spent_cents = tracker.current_spend();
258 m.cache_creation_tokens += cache_write;
259 m.cache_read_tokens += cache_read;
260 m.provider_cost_breakdown = breakdown;
261 });
262 } else if cache_write > 0 || cache_read > 0 {
263 self.update_metrics(|m| {
264 m.cache_creation_tokens += cache_write;
265 m.cache_read_tokens += cache_read;
266 });
267 }
268 }
269
270 pub fn inject_code_context(&mut self, text: &str) {
273 self.remove_code_context_messages();
274 if text.is_empty() || self.msg.messages.len() <= 1 {
275 return;
276 }
277 let content = format!("{CODE_CONTEXT_PREFIX}{text}");
278 self.msg.messages.insert(
279 1,
280 Message::from_parts(
281 Role::System,
282 vec![MessagePart::CodeContext { text: content }],
283 ),
284 );
285 }
286
287 #[must_use]
288 pub fn context_messages(&self) -> &[Message] {
289 &self.msg.messages
290 }
291
292 pub(super) fn truncate_old_tool_results(&mut self) {
302 const LIMIT: usize = 2048;
303 const SUFFIX: &str = "…[truncated]";
304
305 let len = self.msg.messages.len();
306 if len <= 2 {
307 return;
308 }
309 for msg in &mut self.msg.messages[..len - 2] {
310 for part in &mut msg.parts {
311 match part {
312 MessagePart::ToolResult { content, .. } if content.len() > LIMIT => {
313 content.truncate(content.floor_char_boundary(LIMIT));
314 content.push_str(SUFFIX);
315 }
316 MessagePart::ToolOutput { body, .. } if body.len() > LIMIT => {
317 body.truncate(body.floor_char_boundary(LIMIT));
318 body.push_str(SUFFIX);
319 }
320 _ => {}
321 }
322 }
323 }
324 }
325}
326
327#[cfg(test)]
328mod tests {
329 use super::super::agent_tests::{
330 MockChannel, MockToolExecutor, create_test_registry, mock_provider,
331 };
332 use super::*;
333 use zeph_llm::provider::{MessageMetadata, MessagePart};
334
/// `push_message` must grow the cached token total by exactly the pushed message's count.
#[test]
fn push_message_increments_cached_tokens() {
    let mut agent = Agent::new(
        mock_provider(vec![]),
        MockChannel::new(vec![]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    );

    let user_msg = Message {
        role: Role::User,
        content: "hello world!!".to_string(),
        parts: vec![],
        metadata: MessageMetadata::default(),
    };
    let tokens_before = agent.providers.cached_prompt_tokens;
    let delta = agent.metrics.token_counter.count_message_tokens(&user_msg) as u64;

    agent.push_message(user_msg);

    let tokens_after = agent.providers.cached_prompt_tokens;
    assert_eq!(tokens_after, tokens_before + delta);
}
357
/// After `recompute_prompt_tokens`, the cache must equal the sum over all stored messages.
#[test]
fn recompute_prompt_tokens_matches_sum() {
    let mut agent = Agent::new(
        mock_provider(vec![]),
        MockChannel::new(vec![]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    );

    // Push directly onto the vector so the cache is NOT updated incrementally.
    for (role, text) in [(Role::User, "1234"), (Role::Assistant, "5678")] {
        agent.msg.messages.push(Message {
            role,
            content: text.to_string(),
            parts: vec![],
            metadata: MessageMetadata::default(),
        });
    }

    agent.recompute_prompt_tokens();

    let mut expected_total: u64 = 0;
    for message in &agent.msg.messages {
        expected_total += agent.metrics.token_counter.count_message_tokens(message) as u64;
    }
    assert_eq!(agent.providers.cached_prompt_tokens, expected_total);
}
389
/// Injecting non-empty code context after a user message must add a `CodeContext` part.
#[test]
fn inject_code_context_into_messages_with_existing_content() {
    let mut agent = Agent::new(
        mock_provider(vec![]),
        MockChannel::new(vec![]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    );

    let question = Message {
        role: Role::User,
        content: "question".to_string(),
        parts: vec![],
        metadata: MessageMetadata::default(),
    };
    agent.push_message(question);

    agent.inject_code_context("some code here");

    let injected = agent
        .msg
        .messages
        .iter()
        .flat_map(|message| message.parts.iter())
        .any(|part| {
            matches!(part, MessagePart::CodeContext { text } if text.contains("some code here"))
        });
    assert!(injected, "code context should be injected into messages");
}
415
/// Injecting an empty string must leave the message count unchanged.
#[test]
fn inject_code_context_empty_text_is_noop() {
    let mut agent = Agent::new(
        mock_provider(vec![]),
        MockChannel::new(vec![]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    );

    let question = Message {
        role: Role::User,
        content: "question".to_string(),
        parts: vec![],
        metadata: MessageMetadata::default(),
    };
    agent.push_message(question);
    let messages_before = agent.msg.messages.len();

    agent.inject_code_context("");

    let messages_after = agent.msg.messages.len();
    assert_eq!(messages_after, messages_before);
}
437
/// On a freshly constructed agent, injecting code context must not add any message.
#[test]
fn inject_code_context_with_single_message_is_noop() {
    let mut agent = Agent::new(
        mock_provider(vec![]),
        MockChannel::new(vec![]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    );
    let messages_before = agent.msg.messages.len();

    agent.inject_code_context("some code");

    let messages_after = agent.msg.messages.len();
    assert_eq!(messages_after, messages_before);
}
452
/// `context_messages` must expose as many messages as the underlying vector holds.
#[test]
fn context_messages_returns_all_messages() {
    let mut agent = Agent::new(
        mock_provider(vec![]),
        MockChannel::new(vec![]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    );

    let test_msg = Message {
        role: Role::User,
        content: "test".to_string(),
        parts: vec![],
        metadata: MessageMetadata::default(),
    };
    agent.push_message(test_msg);

    let exposed = agent.context_messages().len();
    let stored = agent.msg.messages.len();
    assert_eq!(exposed, stored);
}
470
/// Of four appended messages with 4096-byte tool payloads, only the first two (the stale
/// ones) may be truncated; the last two must keep their full content.
#[test]
fn truncate_old_tool_results_truncates_stale_content() {
    let mut agent = Agent::new(
        mock_provider(vec![]),
        MockChannel::new(vec![]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    );

    let big_content = "x".repeat(4096);

    // Builds a message whose single part is an oversized `ToolResult`.
    let tool_result_msg = |role: Role, text: &str, id: &str| Message {
        role,
        content: text.to_string(),
        parts: vec![MessagePart::ToolResult {
            tool_use_id: id.to_string(),
            content: big_content.clone(),
            is_error: false,
        }],
        metadata: MessageMetadata::default(),
    };

    let stale_result = tool_result_msg(Role::User, "", "id1");
    let stale_output = Message {
        role: Role::User,
        content: String::new(),
        parts: vec![MessagePart::ToolOutput {
            tool_name: "shell".into(),
            body: big_content.clone(),
            compacted_at: None,
        }],
        metadata: MessageMetadata::default(),
    };
    let recent_reply = tool_result_msg(Role::Assistant, "reply", "id3");
    let recent_last = tool_result_msg(Role::User, "last", "id4");
    agent
        .msg
        .messages
        .extend([stale_result, stale_output, recent_reply, recent_last]);

    // Index of the first message appended above, regardless of what the agent started with.
    let base = agent.msg.messages.len() - 4;

    agent.truncate_old_tool_results();

    match &agent.msg.messages[base].parts[0] {
        MessagePart::ToolResult { content, .. } => {
            assert!(
                content.ends_with("…[truncated]"),
                "msg[base] should be truncated"
            );
            assert!(content.len() <= 2048 + 16);
        }
        _ => panic!("expected ToolResult at msg[base]"),
    }

    match &agent.msg.messages[base + 1].parts[0] {
        MessagePart::ToolOutput { body, .. } => {
            assert!(
                body.ends_with("…[truncated]"),
                "msg[base+1] should be truncated"
            );
        }
        _ => panic!("expected ToolOutput at msg[base+1]"),
    }

    match &agent.msg.messages[base + 2].parts[0] {
        MessagePart::ToolResult { content, .. } => {
            assert_eq!(content.len(), 4096, "msg[base+2] should NOT be truncated");
        }
        _ => panic!("expected ToolResult at msg[base+2]"),
    }

    match &agent.msg.messages[base + 3].parts[0] {
        MessagePart::ToolResult { content, .. } => {
            assert_eq!(content.len(), 4096, "msg[base+3] should NOT be truncated");
        }
        _ => panic!("expected ToolResult at msg[base+3]"),
    }
}
564
/// The two most recent messages must never be truncated, and the message count must not
/// change.
#[test]
fn truncate_old_tool_results_noop_when_few_messages() {
    let mut agent = Agent::new(
        mock_provider(vec![]),
        MockChannel::new(vec![]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    );

    let big = "y".repeat(4096);
    let user_result = Message {
        role: Role::User,
        content: String::new(),
        parts: vec![MessagePart::ToolResult {
            tool_use_id: "id".to_string(),
            content: big.clone(),
            is_error: false,
        }],
        metadata: MessageMetadata::default(),
    };
    let assistant_result = Message {
        role: Role::Assistant,
        content: "ok".to_string(),
        parts: vec![MessagePart::ToolResult {
            tool_use_id: "id2".to_string(),
            content: big.clone(),
            is_error: false,
        }],
        metadata: MessageMetadata::default(),
    };
    agent.msg.messages.push(user_result);
    agent.msg.messages.push(assistant_result);

    let len_before = agent.msg.messages.len();
    agent.truncate_old_tool_results();
    assert_eq!(agent.msg.messages.len(), len_before);

    let protected = [
        (len_before - 2, "second-to-last should not be truncated"),
        (len_before - 1, "last should not be truncated"),
    ];
    for (index, reason) in protected {
        match &agent.msg.messages[index].parts[0] {
            MessagePart::ToolResult { content, .. } => {
                assert_eq!(content.len(), 4096, "{reason}");
            }
            _ => panic!("expected ToolResult"),
        }
    }
}
620
621 fn make_timings(ctx: u64, llm: u64, tool: u64, persist: u64) -> crate::metrics::TurnTimings {
622 crate::metrics::TurnTimings {
623 prepare_context_ms: ctx,
624 llm_chat_ms: llm,
625 tool_exec_ms: tool,
626 persist_message_ms: persist,
627 }
628 }
629
630 fn agent_with_metrics_watch() -> (
631 Agent<MockChannel>,
632 tokio::sync::watch::Receiver<crate::metrics::MetricsSnapshot>,
633 ) {
634 let provider = mock_provider(vec![]);
635 let channel = MockChannel::new(vec![]);
636 let registry = create_test_registry();
637 let executor = MockToolExecutor::no_tools();
638 let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
639
640 let (tx, rx) = tokio::sync::watch::channel(crate::metrics::MetricsSnapshot::default());
641 agent.metrics.metrics_tx = Some(tx);
642 (agent, rx)
643 }
644
/// A single flush must publish the sample as "last", with a sample count of 1 and an
/// average equal to the sample itself.
#[test]
fn flush_turn_timings_single_flush() {
    let (mut agent, rx) = agent_with_metrics_watch();

    agent.metrics.pending_timings = make_timings(10, 200, 50, 5);
    agent.flush_turn_timings();

    let snapshot = rx.borrow();
    let last = &snapshot.last_turn_timings;
    assert_eq!(
        (
            last.prepare_context_ms,
            last.llm_chat_ms,
            last.tool_exec_ms,
            last.persist_message_ms
        ),
        (10, 200, 50, 5)
    );
    assert_eq!(snapshot.timing_sample_count, 1);
    assert_eq!(snapshot.avg_turn_timings.llm_chat_ms, 200);
}
662
/// Flushing must leave the pending timings zeroed, even without a metrics channel.
#[test]
fn flush_turn_timings_resets_pending() {
    let mut agent = Agent::new(
        mock_provider(vec![]),
        MockChannel::new(vec![]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    );

    agent.metrics.pending_timings = make_timings(10, 200, 50, 5);
    agent.flush_turn_timings();

    let pending = &agent.metrics.pending_timings;
    assert_eq!(
        (
            pending.prepare_context_ms,
            pending.llm_chat_ms,
            pending.tool_exec_ms,
            pending.persist_message_ms
        ),
        (0, 0, 0, 0)
    );
}
681
/// After 12 flushes only the 10 most recent samples may contribute to the statistics.
#[test]
fn flush_turn_timings_window_capped_at_10() {
    let (mut agent, rx) = agent_with_metrics_watch();

    // LLM timings of 10, 20, ..., 120 ms; the first two fall out of the window.
    (1_u64..=12).for_each(|turn| {
        agent.metrics.pending_timings = make_timings(0, turn * 10, 0, 0);
        agent.flush_turn_timings();
    });

    let snapshot = rx.borrow();
    assert_eq!(snapshot.timing_sample_count, 10);
    assert_eq!(snapshot.max_turn_timings.llm_chat_ms, 120);
    // Remaining samples are 30..=120 in steps of 10: sum 750, mean 75.
    assert_eq!(snapshot.avg_turn_timings.llm_chat_ms, 75);
}
701}