1use std::fmt::Write as _;
14use std::time::Instant;
15
16use zeph_config::ContextFormat;
17use zeph_llm::provider::{Message, MessagePart, Role};
18use zeph_memory::{RetrievalFailureRecord, RetrievalFailureType, TokenCounter};
19
20use crate::error::ContextError;
21use crate::state::ContextAssemblyView;
22
/// Header text for a persona-context section.
/// NOTE(review): not referenced in the visible part of this file — presumably
/// prepended by the persona assembler; confirm against the caller.
pub const PERSONA_PREFIX: &str = "[Persona context]\n";
/// Header text for a past-experience (trajectory) section.
/// NOTE(review): usage is outside the visible part of this file.
pub const TRAJECTORY_PREFIX: &str = "[Past experience]\n";
/// Header text for a memory-summary section.
/// NOTE(review): usage is outside the visible part of this file.
pub const TREE_MEMORY_PREFIX: &str = "[Memory summary]\n";
/// Header text for a reasoning-strategy section.
/// NOTE(review): usage is outside the visible part of this file.
pub const REASONING_PREFIX: &str = "[Reasoning Strategy]\n";

/// Header line that `fetch_graph_facts_raw` writes before the recalled graph facts.
pub const GRAPH_FACTS_PREFIX: &str = "[known facts]\n";
/// Header line that `fetch_semantic_recall_raw` writes before the recalled messages.
pub const RECALL_PREFIX: &str = "[semantic recall]\n";
/// Header line that `fetch_summaries_raw` writes before the conversation summaries.
pub const SUMMARY_PREFIX: &str = "[conversation summaries]\n";
/// Header line that `fetch_cross_session_raw` writes before cross-session summaries.
pub const CROSS_SESSION_PREFIX: &str = "[cross-session context]\n";

/// Header text for a past-corrections section.
/// NOTE(review): usage is outside the visible part of this file.
pub const CORRECTIONS_PREFIX: &str = "[past corrections]\n";
/// Header text for a code-context section.
/// NOTE(review): usage is outside the visible part of this file.
pub const CODE_CONTEXT_PREFIX: &str = "[code context]\n";
/// Header text for a digest of the previous session.
/// NOTE(review): usage is outside the visible part of this file.
pub const SESSION_DIGEST_PREFIX: &str = "[Session digest from previous interaction]\n";
/// Opening fragment of an LSP note marker. Unlike the other prefixes it has no
/// closing bracket or trailing newline — presumably the caller appends the rest
/// (TODO confirm).
pub const LSP_NOTE_PREFIX: &str = "[lsp ";
/// Markdown heading for a relevant-documents section.
/// NOTE(review): usage is outside the visible part of this file.
pub const DOCUMENT_RAG_PREFIX: &str = "## Relevant documents\n";
51
52#[must_use]
56pub fn truncate_chars(s: &str, max_chars: usize) -> String {
57 zeph_common::text::truncate_to_chars(s, max_chars)
58}
59
60#[must_use]
65pub fn format_correction_note(correction_text: &str) -> String {
66 format!(
67 "- Past user correction: \"{}\"",
68 truncate_chars(correction_text, 200)
69 )
70}
71
72pub fn effective_recall_timeout_ms(configured: u64) -> u64 {
77 if configured == 0 {
78 tracing::warn!(
79 "recall_timeout_ms is 0, which would disable spreading activation recall; \
80 clamping to 100ms"
81 );
82 100
83 } else {
84 configured
85 }
86}
87
88#[tracing::instrument(name = "agent_context.helpers.fetch_graph_facts", skip_all, err)]
99pub async fn fetch_graph_facts(
100 view: &ContextAssemblyView<'_>,
101 query: &str,
102 budget_tokens: usize,
103 tc: &TokenCounter,
104) -> Result<Option<Message>, ContextError> {
105 fetch_graph_facts_raw(
106 view.memory.as_deref(),
107 &view.graph_config,
108 query,
109 budget_tokens,
110 tc,
111 )
112 .await
113 .map_err(ContextError::Memory)
114}
115
116#[allow(clippy::too_many_lines, clippy::items_after_statements)]
127#[tracing::instrument(
128 name = "agent_context.helpers.fetch_graph_facts_raw",
129 skip_all,
130 err,
131 fields(effective_strategy)
132)]
133pub async fn fetch_graph_facts_raw(
134 memory: Option<&zeph_memory::semantic::SemanticMemory>,
135 graph_config: &zeph_config::GraphConfig,
136 query: &str,
137 budget_tokens: usize,
138 tc: &TokenCounter,
139) -> Result<Option<Message>, zeph_memory::MemoryError> {
140 if budget_tokens == 0 || !graph_config.enabled {
141 return Ok(None);
142 }
143 let Some(memory) = memory else {
144 return Ok(None);
145 };
146 let recall_limit = graph_config.recall_limit;
147 let temporal_decay_rate = graph_config.temporal_decay_rate;
148 let edge_types = zeph_memory::classify_graph_subgraph(query);
149 let sa_config = &graph_config.spreading_activation;
150
151 let mut body = String::from(GRAPH_FACTS_PREFIX);
152 let mut tokens_so_far = tc.count_tokens(&body);
153 let max_hops = graph_config.max_hops;
154
155 use zeph_config::memory::GraphRetrievalStrategy;
156 let effective_strategy = if sa_config.enabled {
157 GraphRetrievalStrategy::Synapse
158 } else {
159 graph_config.retrieval_strategy
160 };
161
162 tracing::Span::current().record(
163 "effective_strategy",
164 tracing::field::debug(&effective_strategy),
165 );
166 let strategy_str = format!("{effective_strategy:?}").to_lowercase();
167 let edge_types_json = serde_json::to_string(&edge_types).ok();
168
169 fn append_graph_facts(
171 facts: &[zeph_memory::graph::types::GraphFact],
172 body: &mut String,
173 tokens_so_far: &mut usize,
174 budget_tokens: usize,
175 tc: &TokenCounter,
176 ) -> usize {
177 let mut count = 0;
178 for f in facts {
179 let fact_text = f.fact.replace(['\n', '\r', '<', '>'], " ");
180 let line = format!("- {} (confidence: {:.2})\n", fact_text, f.confidence);
181 let line_tokens = tc.count_tokens(&line);
182 if *tokens_so_far + line_tokens > budget_tokens {
183 break;
184 }
185 body.push_str(&line);
186 *tokens_so_far += line_tokens;
187 count += 1;
188 }
189 count
190 }
191
192 match effective_strategy {
193 GraphRetrievalStrategy::Synapse => {
194 let sa_params = zeph_memory::graph::SpreadingActivationParams {
195 decay_lambda: sa_config.decay_lambda,
196 max_hops: sa_config.max_hops,
197 activation_threshold: sa_config.activation_threshold,
198 inhibition_threshold: sa_config.inhibition_threshold,
199 max_activated_nodes: sa_config.max_activated_nodes,
200 temporal_decay_rate,
201 seed_structural_weight: sa_config.seed_structural_weight,
202 seed_community_cap: sa_config.seed_community_cap,
203 alpha: sa_config.alpha,
204 };
205 let timeout_ms = effective_recall_timeout_ms(sa_config.recall_timeout_ms);
206 let t0 = Instant::now();
207 let activated_facts = match tokio::time::timeout(
208 std::time::Duration::from_millis(timeout_ms),
209 memory.recall_graph_activated(query, recall_limit, sa_params, &edge_types),
210 )
211 .await
212 {
213 Ok(Ok(facts)) => facts,
214 Ok(Err(e)) => {
215 let latency_ms = t0.elapsed().as_millis().try_into().unwrap_or(u64::MAX);
216 tracing::warn!("spreading activation recall failed: {e:#}");
217 memory.log_retrieval_failure(RetrievalFailureRecord {
221 conversation_id: None,
222 turn_index: 0,
223 failure_type: RetrievalFailureType::Error,
224 retrieval_strategy: strategy_str.clone(),
225 query_text: query.to_owned(),
226 query_len: query.len(),
227 top_score: None,
228 confidence_threshold: None,
229 result_count: 0,
230 latency_ms,
231 edge_types: edge_types_json.clone(),
232 error_context: Some(format!("{e:#}")),
233 });
234 Vec::new()
235 }
236 Err(_) => {
237 let latency_ms = t0.elapsed().as_millis().try_into().unwrap_or(u64::MAX);
238 tracing::warn!("spreading activation recall timed out ({timeout_ms}ms)");
239 memory.log_retrieval_failure(RetrievalFailureRecord {
240 conversation_id: None,
241 turn_index: 0,
242 failure_type: RetrievalFailureType::Timeout,
243 retrieval_strategy: strategy_str.clone(),
244 query_text: query.to_owned(),
245 query_len: query.len(),
246 top_score: None,
247 confidence_threshold: None,
248 result_count: 0,
249 latency_ms,
250 edge_types: edge_types_json.clone(),
251 error_context: Some(format!("timeout after {timeout_ms}ms")),
252 });
253 Vec::new()
254 }
255 };
256 let latency_ms = t0.elapsed().as_millis().try_into().unwrap_or(u64::MAX);
257 if activated_facts.is_empty() {
258 memory.log_retrieval_failure(RetrievalFailureRecord {
259 conversation_id: None,
260 turn_index: 0,
261 failure_type: RetrievalFailureType::NoHit,
262 retrieval_strategy: strategy_str,
263 query_text: query.to_owned(),
264 query_len: query.len(),
265 top_score: None,
266 confidence_threshold: None,
267 result_count: 0,
268 latency_ms,
269 edge_types: edge_types_json,
270 error_context: None,
271 });
272 return Ok(None);
273 }
274 for f in &activated_facts {
275 let fact_text = f.edge.fact.replace(['\n', '\r', '<', '>'], " ");
276 let line = format!(
277 "- {} (confidence: {:.2}, activation: {:.2})\n",
278 fact_text, f.edge.confidence, f.activation_score
279 );
280 let line_tokens = tc.count_tokens(&line);
281 if tokens_so_far + line_tokens > budget_tokens {
282 break;
283 }
284 body.push_str(&line);
285 tokens_so_far += line_tokens;
286 }
287 }
288 GraphRetrievalStrategy::Bfs => {
289 let t0 = Instant::now();
290 let facts = match memory
291 .recall_graph(
292 query,
293 recall_limit,
294 max_hops,
295 None,
296 temporal_decay_rate,
297 &edge_types,
298 )
299 .await
300 {
301 Ok(f) => f,
302 Err(e) => {
303 let latency_ms = t0.elapsed().as_millis().try_into().unwrap_or(u64::MAX);
304 memory.log_retrieval_failure(RetrievalFailureRecord {
305 conversation_id: None,
306 turn_index: 0,
307 failure_type: RetrievalFailureType::Error,
308 retrieval_strategy: strategy_str,
309 query_text: query.to_owned(),
310 query_len: query.len(),
311 top_score: None,
312 confidence_threshold: None,
313 result_count: 0,
314 latency_ms,
315 edge_types: edge_types_json,
316 error_context: Some(format!("{e:#}")),
317 });
318 return Err(e);
319 }
320 };
321 let latency_ms = t0.elapsed().as_millis().try_into().unwrap_or(u64::MAX);
322 if facts.is_empty() {
323 memory.log_retrieval_failure(RetrievalFailureRecord {
324 conversation_id: None,
325 turn_index: 0,
326 failure_type: RetrievalFailureType::NoHit,
327 retrieval_strategy: strategy_str,
328 query_text: query.to_owned(),
329 query_len: query.len(),
330 top_score: None,
331 confidence_threshold: None,
332 result_count: 0,
333 latency_ms,
334 edge_types: edge_types_json,
335 error_context: None,
336 });
337 return Ok(None);
338 }
339 append_graph_facts(&facts, &mut body, &mut tokens_so_far, budget_tokens, tc);
340 }
341 GraphRetrievalStrategy::AStar => {
342 let t0 = Instant::now();
343 let facts = match memory
344 .recall_graph_astar(
345 query,
346 recall_limit,
347 max_hops,
348 temporal_decay_rate,
349 &edge_types,
350 )
351 .await
352 {
353 Ok(f) => f,
354 Err(e) => {
355 let latency_ms = t0.elapsed().as_millis().try_into().unwrap_or(u64::MAX);
356 memory.log_retrieval_failure(RetrievalFailureRecord {
357 conversation_id: None,
358 turn_index: 0,
359 failure_type: RetrievalFailureType::Error,
360 retrieval_strategy: strategy_str,
361 query_text: query.to_owned(),
362 query_len: query.len(),
363 top_score: None,
364 confidence_threshold: None,
365 result_count: 0,
366 latency_ms,
367 edge_types: edge_types_json,
368 error_context: Some(format!("{e:#}")),
369 });
370 return Err(e);
371 }
372 };
373 let latency_ms = t0.elapsed().as_millis().try_into().unwrap_or(u64::MAX);
374 if facts.is_empty() {
375 memory.log_retrieval_failure(RetrievalFailureRecord {
376 conversation_id: None,
377 turn_index: 0,
378 failure_type: RetrievalFailureType::NoHit,
379 retrieval_strategy: strategy_str,
380 query_text: query.to_owned(),
381 query_len: query.len(),
382 top_score: None,
383 confidence_threshold: None,
384 result_count: 0,
385 latency_ms,
386 edge_types: edge_types_json,
387 error_context: None,
388 });
389 return Ok(None);
390 }
391 append_graph_facts(&facts, &mut body, &mut tokens_so_far, budget_tokens, tc);
392 }
393 GraphRetrievalStrategy::WaterCircles => {
394 let ring_limit = graph_config.watercircles.ring_limit;
395 let t0 = Instant::now();
396 let facts = match memory
397 .recall_graph_watercircles(
398 query,
399 recall_limit,
400 max_hops,
401 ring_limit,
402 temporal_decay_rate,
403 &edge_types,
404 )
405 .await
406 {
407 Ok(f) => f,
408 Err(e) => {
409 let latency_ms = t0.elapsed().as_millis().try_into().unwrap_or(u64::MAX);
410 memory.log_retrieval_failure(RetrievalFailureRecord {
411 conversation_id: None,
412 turn_index: 0,
413 failure_type: RetrievalFailureType::Error,
414 retrieval_strategy: strategy_str,
415 query_text: query.to_owned(),
416 query_len: query.len(),
417 top_score: None,
418 confidence_threshold: None,
419 result_count: 0,
420 latency_ms,
421 edge_types: edge_types_json,
422 error_context: Some(format!("{e:#}")),
423 });
424 return Err(e);
425 }
426 };
427 let latency_ms = t0.elapsed().as_millis().try_into().unwrap_or(u64::MAX);
428 if facts.is_empty() {
429 memory.log_retrieval_failure(RetrievalFailureRecord {
430 conversation_id: None,
431 turn_index: 0,
432 failure_type: RetrievalFailureType::NoHit,
433 retrieval_strategy: strategy_str,
434 query_text: query.to_owned(),
435 query_len: query.len(),
436 top_score: None,
437 confidence_threshold: None,
438 result_count: 0,
439 latency_ms,
440 edge_types: edge_types_json,
441 error_context: None,
442 });
443 return Ok(None);
444 }
445 append_graph_facts(&facts, &mut body, &mut tokens_so_far, budget_tokens, tc);
446 }
447 GraphRetrievalStrategy::BeamSearch => {
448 let beam_width = graph_config.beam_search.beam_width;
449 let t0 = Instant::now();
450 let facts = match memory
451 .recall_graph_beam(
452 query,
453 recall_limit,
454 beam_width,
455 max_hops,
456 temporal_decay_rate,
457 &edge_types,
458 )
459 .await
460 {
461 Ok(f) => f,
462 Err(e) => {
463 let latency_ms = t0.elapsed().as_millis().try_into().unwrap_or(u64::MAX);
464 memory.log_retrieval_failure(RetrievalFailureRecord {
465 conversation_id: None,
466 turn_index: 0,
467 failure_type: RetrievalFailureType::Error,
468 retrieval_strategy: strategy_str,
469 query_text: query.to_owned(),
470 query_len: query.len(),
471 top_score: None,
472 confidence_threshold: None,
473 result_count: 0,
474 latency_ms,
475 edge_types: edge_types_json,
476 error_context: Some(format!("{e:#}")),
477 });
478 return Err(e);
479 }
480 };
481 let latency_ms = t0.elapsed().as_millis().try_into().unwrap_or(u64::MAX);
482 if facts.is_empty() {
483 memory.log_retrieval_failure(RetrievalFailureRecord {
484 conversation_id: None,
485 turn_index: 0,
486 failure_type: RetrievalFailureType::NoHit,
487 retrieval_strategy: strategy_str,
488 query_text: query.to_owned(),
489 query_len: query.len(),
490 top_score: None,
491 confidence_threshold: None,
492 result_count: 0,
493 latency_ms,
494 edge_types: edge_types_json,
495 error_context: None,
496 });
497 return Ok(None);
498 }
499 append_graph_facts(&facts, &mut body, &mut tokens_so_far, budget_tokens, tc);
500 }
501 GraphRetrievalStrategy::Hybrid => {
502 const CLASSIFIER_TIMEOUT_MS: u64 = 2_000;
503 let classifier_t0 = Instant::now();
504 let classified = if let Ok(s) = tokio::time::timeout(
505 std::time::Duration::from_millis(CLASSIFIER_TIMEOUT_MS),
506 memory.classify_graph_strategy(query),
507 )
508 .await
509 {
510 s
511 } else {
512 let latency_ms = classifier_t0
513 .elapsed()
514 .as_millis()
515 .try_into()
516 .unwrap_or(u64::MAX);
517 tracing::warn!(
518 "hybrid strategy classifier timed out after {CLASSIFIER_TIMEOUT_MS}ms, \
519 falling back to synapse"
520 );
521 memory.log_retrieval_failure(RetrievalFailureRecord {
522 conversation_id: None,
523 turn_index: 0,
524 failure_type: RetrievalFailureType::Timeout,
525 retrieval_strategy: "hybrid_classifier".to_owned(),
526 query_text: query.to_owned(),
527 query_len: query.len(),
528 top_score: None,
529 confidence_threshold: None,
530 result_count: 0,
531 latency_ms,
532 edge_types: edge_types_json.clone(),
533 error_context: Some(format!(
534 "classifier timeout after {CLASSIFIER_TIMEOUT_MS}ms"
535 )),
536 });
537 "synapse".to_owned()
538 };
539 tracing::debug!(classified_strategy = %classified, "hybrid dispatch: classified");
540 let t0 = Instant::now();
541 let facts = match classified.as_str() {
542 "astar" => {
543 match memory
544 .recall_graph_astar(
545 query,
546 recall_limit,
547 max_hops,
548 temporal_decay_rate,
549 &edge_types,
550 )
551 .await
552 {
553 Ok(f) => f,
554 Err(e) => {
555 let latency_ms =
556 t0.elapsed().as_millis().try_into().unwrap_or(u64::MAX);
557 memory.log_retrieval_failure(RetrievalFailureRecord {
558 conversation_id: None,
559 turn_index: 0,
560 failure_type: RetrievalFailureType::Error,
561 retrieval_strategy: strategy_str,
562 query_text: query.to_owned(),
563 query_len: query.len(),
564 top_score: None,
565 confidence_threshold: None,
566 result_count: 0,
567 latency_ms,
568 edge_types: edge_types_json,
569 error_context: Some(format!("{e:#}")),
570 });
571 return Err(e);
572 }
573 }
574 }
575 "watercircles" => {
576 let ring_limit = graph_config.watercircles.ring_limit;
577 match memory
578 .recall_graph_watercircles(
579 query,
580 recall_limit,
581 max_hops,
582 ring_limit,
583 temporal_decay_rate,
584 &edge_types,
585 )
586 .await
587 {
588 Ok(f) => f,
589 Err(e) => {
590 let latency_ms =
591 t0.elapsed().as_millis().try_into().unwrap_or(u64::MAX);
592 memory.log_retrieval_failure(RetrievalFailureRecord {
593 conversation_id: None,
594 turn_index: 0,
595 failure_type: RetrievalFailureType::Error,
596 retrieval_strategy: strategy_str,
597 query_text: query.to_owned(),
598 query_len: query.len(),
599 top_score: None,
600 confidence_threshold: None,
601 result_count: 0,
602 latency_ms,
603 edge_types: edge_types_json,
604 error_context: Some(format!("{e:#}")),
605 });
606 return Err(e);
607 }
608 }
609 }
610 "beam_search" => {
611 let beam_width = graph_config.beam_search.beam_width;
612 match memory
613 .recall_graph_beam(
614 query,
615 recall_limit,
616 beam_width,
617 max_hops,
618 temporal_decay_rate,
619 &edge_types,
620 )
621 .await
622 {
623 Ok(f) => f,
624 Err(e) => {
625 let latency_ms =
626 t0.elapsed().as_millis().try_into().unwrap_or(u64::MAX);
627 memory.log_retrieval_failure(RetrievalFailureRecord {
628 conversation_id: None,
629 turn_index: 0,
630 failure_type: RetrievalFailureType::Error,
631 retrieval_strategy: strategy_str,
632 query_text: query.to_owned(),
633 query_len: query.len(),
634 top_score: None,
635 confidence_threshold: None,
636 result_count: 0,
637 latency_ms,
638 edge_types: edge_types_json,
639 error_context: Some(format!("{e:#}")),
640 });
641 return Err(e);
642 }
643 }
644 }
645 _ => {
646 let sa_params = zeph_memory::graph::SpreadingActivationParams {
647 decay_lambda: sa_config.decay_lambda,
648 max_hops: sa_config.max_hops,
649 activation_threshold: sa_config.activation_threshold,
650 inhibition_threshold: sa_config.inhibition_threshold,
651 max_activated_nodes: sa_config.max_activated_nodes,
652 temporal_decay_rate,
653 seed_structural_weight: sa_config.seed_structural_weight,
654 seed_community_cap: sa_config.seed_community_cap,
655 alpha: sa_config.alpha,
656 };
657 match memory
658 .recall_graph_activated(query, recall_limit, sa_params, &edge_types)
659 .await
660 {
661 Ok(activated) => activated
662 .into_iter()
663 .map(|f| zeph_memory::graph::types::GraphFact {
664 entity_name: f.edge.source_entity_id.to_string(),
665 relation: f.edge.relation.clone(),
666 target_name: f.edge.target_entity_id.to_string(),
667 fact: f.edge.fact.clone(),
668 entity_match_score: f.activation_score,
669 hop_distance: 0,
670 confidence: f.edge.confidence,
671 valid_from: Some(f.edge.valid_from.clone()),
672 edge_type: f.edge.edge_type,
673 retrieval_count: f.edge.retrieval_count,
674 edge_id: Some(f.edge.id),
675 })
676 .collect(),
677 Err(e) => {
678 let latency_ms =
679 t0.elapsed().as_millis().try_into().unwrap_or(u64::MAX);
680 memory.log_retrieval_failure(RetrievalFailureRecord {
681 conversation_id: None,
682 turn_index: 0,
683 failure_type: RetrievalFailureType::Error,
684 retrieval_strategy: strategy_str,
685 query_text: query.to_owned(),
686 query_len: query.len(),
687 top_score: None,
688 confidence_threshold: None,
689 result_count: 0,
690 latency_ms,
691 edge_types: edge_types_json,
692 error_context: Some(format!("{e:#}")),
693 });
694 return Err(e);
695 }
696 }
697 }
698 };
699 let latency_ms = t0.elapsed().as_millis().try_into().unwrap_or(u64::MAX);
700 if facts.is_empty() {
701 memory.log_retrieval_failure(RetrievalFailureRecord {
702 conversation_id: None,
703 turn_index: 0,
704 failure_type: RetrievalFailureType::NoHit,
705 retrieval_strategy: strategy_str,
706 query_text: query.to_owned(),
707 query_len: query.len(),
708 top_score: None,
709 confidence_threshold: None,
710 result_count: 0,
711 latency_ms,
712 edge_types: edge_types_json,
713 error_context: None,
714 });
715 return Ok(None);
716 }
717 append_graph_facts(&facts, &mut body, &mut tokens_so_far, budget_tokens, tc);
718 }
719 _ => {}
720 }
721
722 if body == GRAPH_FACTS_PREFIX {
723 return Ok(None);
724 }
725
726 Ok(Some(Message::from_legacy(Role::System, body)))
727}
728
729#[allow(clippy::too_many_arguments)]
740#[tracing::instrument(
741 name = "agent_context.helpers.fetch_semantic_recall_raw",
742 skip_all,
743 err
744)]
745pub async fn fetch_semantic_recall_raw(
746 memory: Option<&zeph_memory::semantic::SemanticMemory>,
747 recall_limit: usize,
748 context_format: ContextFormat,
749 query: &str,
750 token_budget: usize,
751 tc: &TokenCounter,
752 router: Option<&dyn zeph_memory::AsyncMemoryRouter>,
753 low_confidence_threshold: Option<f32>,
754) -> Result<(Option<Message>, Option<f32>), zeph_memory::MemoryError> {
755 let Some(memory) = memory else {
756 return Ok((None, None));
757 };
758 if recall_limit == 0 || token_budget == 0 {
759 return Ok((None, None));
760 }
761
762 let t0 = Instant::now();
763 let recalled = if let Some(r) = router {
764 memory
765 .recall_routed_async(query, recall_limit, None, r, None)
766 .await?
767 } else {
768 memory.recall(query, recall_limit, None).await?
769 };
770 let latency_ms = t0.elapsed().as_millis().try_into().unwrap_or(u64::MAX);
771
772 if recalled.is_empty() {
773 memory.log_retrieval_failure(RetrievalFailureRecord {
774 conversation_id: None,
775 turn_index: 0,
776 failure_type: RetrievalFailureType::NoHit,
777 retrieval_strategy: "semantic".to_owned(),
778 query_text: query.to_owned(),
779 query_len: query.len(),
780 top_score: None,
781 confidence_threshold: low_confidence_threshold,
782 result_count: 0,
783 latency_ms,
784 edge_types: None,
785 error_context: None,
786 });
787 return Ok((None, None));
788 }
789
790 let top_score = recalled.first().map(|r| r.score);
791
792 if let (Some(score), Some(threshold)) = (top_score, low_confidence_threshold)
793 && score < threshold
794 {
795 memory.log_retrieval_failure(RetrievalFailureRecord {
796 conversation_id: None,
797 turn_index: 0,
798 failure_type: RetrievalFailureType::LowConfidence,
799 retrieval_strategy: "semantic".to_owned(),
800 query_text: query.to_owned(),
801 query_len: query.len(),
802 top_score: Some(score),
803 confidence_threshold: Some(threshold),
804 result_count: recalled.len(),
805 latency_ms,
806 edge_types: None,
807 error_context: None,
808 });
809 }
810 let initial_cap = (recall_limit * 512).min(token_budget * 3);
811 let mut recall_text = String::with_capacity(initial_cap);
812 recall_text.push_str(RECALL_PREFIX);
813 let mut tokens_used = tc.count_tokens(&recall_text);
814
815 for item in &recalled {
816 if item.message.content.starts_with("[skipped]")
817 || item.message.content.starts_with("[stopped]")
818 {
819 continue;
820 }
821 let entry = match context_format {
822 ContextFormat::Structured => format_structured_recall_entry(item),
823 _ => format_plain_recall_entry(item),
824 };
825 let entry_tokens = tc.count_tokens(&entry);
826 if tokens_used + entry_tokens > token_budget {
827 break;
828 }
829 recall_text.push_str(&entry);
830 tokens_used += entry_tokens;
831 }
832
833 if tokens_used > tc.count_tokens(RECALL_PREFIX) {
834 Ok((
835 Some(Message::from_parts(
836 Role::System,
837 vec![MessagePart::Recall { text: recall_text }],
838 )),
839 top_score,
840 ))
841 } else {
842 Ok((None, None))
843 }
844}
845
846#[tracing::instrument(name = "agent_context.helpers.fetch_summaries_raw", skip_all, err)]
854pub async fn fetch_summaries_raw(
855 memory: Option<&zeph_memory::semantic::SemanticMemory>,
856 conversation_id: Option<zeph_memory::ConversationId>,
857 token_budget: usize,
858 tc: &TokenCounter,
859) -> Result<Option<Message>, zeph_memory::MemoryError> {
860 let (Some(memory), Some(cid)) = (memory, conversation_id) else {
861 return Ok(None);
862 };
863 if token_budget == 0 {
864 return Ok(None);
865 }
866
867 let summaries = memory.load_summaries(cid).await?;
868 if summaries.is_empty() {
869 return Ok(None);
870 }
871
872 let mut summary_text = String::from(SUMMARY_PREFIX);
873 let mut tokens_used = tc.count_tokens(&summary_text);
874
875 for summary in summaries.iter().rev() {
876 let first = summary.first_message_id.map_or(0, |m| m.0);
877 let last = summary.last_message_id.map_or(0, |m| m.0);
878 let entry = format!("- Messages {first}-{last}: {}\n", summary.content);
879 let cost = tc.count_tokens(&entry);
880 if tokens_used + cost > token_budget {
881 break;
882 }
883 summary_text.push_str(&entry);
884 tokens_used += cost;
885 }
886
887 if tokens_used > tc.count_tokens(SUMMARY_PREFIX) {
888 Ok(Some(Message::from_parts(
889 Role::System,
890 vec![MessagePart::Summary { text: summary_text }],
891 )))
892 } else {
893 Ok(None)
894 }
895}
896
897#[tracing::instrument(name = "agent_context.helpers.fetch_cross_session_raw", skip_all, err)]
905pub async fn fetch_cross_session_raw(
906 memory: Option<&zeph_memory::semantic::SemanticMemory>,
907 conversation_id: Option<zeph_memory::ConversationId>,
908 cross_session_score_threshold: f32,
909 query: &str,
910 token_budget: usize,
911 tc: &TokenCounter,
912) -> Result<Option<Message>, zeph_memory::MemoryError> {
913 let (Some(memory), Some(cid)) = (memory, conversation_id) else {
914 return Ok(None);
915 };
916 if token_budget == 0 {
917 return Ok(None);
918 }
919
920 let results: Vec<_> = memory
921 .search_session_summaries(query, 5, Some(cid))
922 .await?
923 .into_iter()
924 .filter(|r| r.score >= cross_session_score_threshold)
925 .collect();
926 if results.is_empty() {
927 return Ok(None);
928 }
929
930 let mut text = String::from(CROSS_SESSION_PREFIX);
931 let mut tokens_used = tc.count_tokens(&text);
932
933 for item in &results {
934 let entry = format!("- {}\n", item.summary_text);
935 let cost = tc.count_tokens(&entry);
936 if tokens_used + cost > token_budget {
937 break;
938 }
939 text.push_str(&entry);
940 tokens_used += cost;
941 }
942
943 if tokens_used > tc.count_tokens(CROSS_SESSION_PREFIX) {
944 Ok(Some(Message::from_parts(
945 Role::System,
946 vec![MessagePart::CrossSession { text }],
947 )))
948 } else {
949 Ok(None)
950 }
951}
952
953#[tracing::instrument(name = "agent_context.helpers.fetch_semantic_recall", skip_all, err)]
967pub async fn fetch_semantic_recall(
968 view: &ContextAssemblyView<'_>,
969 query: &str,
970 token_budget: usize,
971 tc: &TokenCounter,
972 router: Option<&dyn zeph_memory::AsyncMemoryRouter>,
973) -> Result<(Option<Message>, Option<f32>), ContextError> {
974 fetch_semantic_recall_raw(
975 view.memory.as_deref(),
976 view.recall_limit,
977 view.context_format,
978 query,
979 token_budget,
980 tc,
981 router,
982 None,
983 )
984 .await
985 .map_err(ContextError::Memory)
986}
987
988fn format_plain_recall_entry(item: &zeph_memory::RecalledMessage) -> String {
989 let role_label = match item.message.role {
990 Role::Assistant => "assistant",
991 Role::System => "system",
992 Role::User | _ => "user",
993 };
994 format!("- [{}] {}\n", role_label, item.message.content)
995}
996
997#[allow(clippy::map_unwrap_or)]
998fn format_structured_recall_entry(item: &zeph_memory::RecalledMessage) -> String {
999 let source = match item.message.role {
1000 Role::Assistant => "assistant",
1001 Role::System => "system",
1002 Role::User | _ => "user",
1003 };
1004 let date = item
1009 .message
1010 .metadata
1011 .compacted_at
1012 .and_then(|ts| chrono::DateTime::from_timestamp(ts, 0))
1013 .map(|dt| dt.format("%Y-%m-%d").to_string())
1014 .unwrap_or_else(|| "unknown".to_owned());
1015 format!(
1016 "[Memory | {} | {} | relevance: {:.2}]\n{}\n",
1017 source, date, item.score, item.message.content
1018 )
1019}
1020
1021#[tracing::instrument(name = "agent_context.helpers.fetch_summaries", skip_all, err)]
1032pub async fn fetch_summaries(
1033 view: &ContextAssemblyView<'_>,
1034 token_budget: usize,
1035 tc: &TokenCounter,
1036) -> Result<Option<Message>, ContextError> {
1037 fetch_summaries_raw(
1038 view.memory.as_deref(),
1039 view.conversation_id,
1040 token_budget,
1041 tc,
1042 )
1043 .await
1044 .map_err(ContextError::Memory)
1045}
1046
1047#[tracing::instrument(name = "agent_context.helpers.fetch_cross_session", skip_all, err)]
1061pub async fn fetch_cross_session(
1062 view: &ContextAssemblyView<'_>,
1063 query: &str,
1064 token_budget: usize,
1065 tc: &TokenCounter,
1066) -> Result<Option<Message>, ContextError> {
1067 fetch_cross_session_raw(
1068 view.memory.as_deref(),
1069 view.conversation_id,
1070 view.cross_session_score_threshold,
1071 query,
1072 token_budget,
1073 tc,
1074 )
1075 .await
1076 .map_err(ContextError::Memory)
1077}
1078
/// Remaining cost and tool-call budget that can be rendered as a `<budget>`
/// XML snippet via [`BudgetHint::format_xml`].
pub struct BudgetHint {
    /// Remaining cost budget in cents; `None` omits the element.
    pub remaining_cost_cents: Option<f64>,
    /// Total cost budget in cents; `None` omits the element.
    pub total_budget_cents: Option<f64>,
    /// Tool calls still available; rendered only when `max_tool_calls > 0`.
    pub remaining_tool_calls: usize,
    /// Maximum number of tool calls; `0` means no tool-call section is rendered.
    pub max_tool_calls: usize,
}

impl BudgetHint {
    /// Renders the hint as a `<budget>` block with one child element per line.
    ///
    /// Cost values are printed with two decimals. The tool-call elements are
    /// emitted only when `max_tool_calls` is greater than zero.
    ///
    /// Returns `None` when there is neither a remaining cost nor a tool-call
    /// limit — a total budget on its own is not enough to produce output.
    #[must_use]
    pub fn format_xml(&self) -> Option<String> {
        let tool_limit_set = self.max_tool_calls > 0;
        if self.remaining_cost_cents.is_none() && !tool_limit_set {
            return None;
        }

        let mut xml = String::from("<budget>");
        // Writing into a `String` cannot fail, so the results are discarded.
        if let Some(remaining) = self.remaining_cost_cents {
            let _ = write!(
                xml,
                "\n<remaining_cost_cents>{remaining:.2}</remaining_cost_cents>"
            );
        }
        if let Some(total) = self.total_budget_cents {
            let _ = write!(xml, "\n<total_budget_cents>{total:.2}</total_budget_cents>");
        }
        if tool_limit_set {
            let _ = write!(
                xml,
                "\n<remaining_tool_calls>{}</remaining_tool_calls>\n<max_tool_calls>{}</max_tool_calls>",
                self.remaining_tool_calls, self.max_tool_calls
            );
        }
        xml.push_str("\n</budget>");
        Some(xml)
    }
}
1151
#[cfg(test)]
mod budget_hint_tests {
    use super::*;

    /// Builds a hint from (remaining cost, total budget, remaining calls, max calls).
    fn hint(
        remaining_cost_cents: Option<f64>,
        total_budget_cents: Option<f64>,
        remaining_tool_calls: usize,
        max_tool_calls: usize,
    ) -> BudgetHint {
        BudgetHint {
            remaining_cost_cents,
            total_budget_cents,
            remaining_tool_calls,
            max_tool_calls,
        }
    }

    /// No cost and no tool-call limit yields no XML at all.
    #[test]
    fn format_xml_none_when_no_data() {
        assert!(hint(None, None, 0, 0).format_xml().is_none());
    }

    /// Cost values are rendered with two decimals.
    #[test]
    fn format_xml_with_cost_only() {
        let xml = hint(Some(25.5), Some(100.0), 0, 0).format_xml().unwrap();
        for expected in [
            "<remaining_cost_cents>25.50</remaining_cost_cents>",
            "<total_budget_cents>100.00</total_budget_cents>",
        ] {
            assert!(xml.contains(expected));
        }
    }

    /// A positive tool-call limit is enough to produce output.
    #[test]
    fn format_xml_with_tool_calls_only() {
        let xml = hint(None, None, 3, 10).format_xml().unwrap();
        for expected in [
            "<remaining_tool_calls>3</remaining_tool_calls>",
            "<max_tool_calls>10</max_tool_calls>",
        ] {
            assert!(xml.contains(expected));
        }
    }

    /// The output is wrapped in a single `<budget>` element.
    #[test]
    fn format_xml_with_all_fields() {
        let xml = hint(Some(50.0), Some(100.0), 8, 10).format_xml().unwrap();
        assert!(xml.starts_with("<budget>"));
        assert!(xml.ends_with("</budget>"));
    }
}