1use zeph_context::budget::ContextBudget;
7use zeph_llm::LlmProvider;
8use zeph_llm::provider::{Message, MessagePart, Role};
9
10use crate::error::ContextError;
11use crate::helpers::{
12 CODE_CONTEXT_PREFIX, CORRECTIONS_PREFIX, CROSS_SESSION_PREFIX, DOCUMENT_RAG_PREFIX,
13 GRAPH_FACTS_PREFIX, LSP_NOTE_PREFIX, PERSONA_PREFIX, REASONING_PREFIX, RECALL_PREFIX,
14 SESSION_DIGEST_PREFIX, SUMMARY_PREFIX, TRAJECTORY_PREFIX, TREE_MEMORY_PREFIX,
15};
16use crate::state::{
17 ContextAssemblyView, ContextDelta, ContextSummarizationView, MessageWindowView,
18 ProviderHandles, StatusSink,
19};
20
21pub struct SemanticRecallParams<'a> {
29 pub query: &'a str,
31 pub token_budget: usize,
33 pub recall_limit: usize,
35 pub context_format: zeph_config::ContextFormat,
37 pub conversation_id: Option<zeph_memory::ConversationId>,
39 pub tiered_classifier: Option<&'a std::sync::Arc<zeph_llm::any::AnyProvider>>,
41 pub tiered_validator: Option<&'a std::sync::Arc<zeph_llm::any::AnyProvider>>,
43 pub tiered_config: &'a zeph_config::memory::TieredRetrievalConfig,
45}
46
/// Stateless service that injects, trims, and compacts the agent's context window.
///
/// It carries no fields: every method operates on the window/view types passed in by the caller.
#[derive(Debug, Default)]
pub struct ContextService;
66
67impl ContextService {
68 #[must_use]
72 pub fn new() -> Self {
73 Self
74 }
75
76 pub fn clear_history(&self, window: &mut MessageWindowView<'_>) {
84 let system_prompt = window.messages.first().cloned();
85 window.messages.clear();
86 if let Some(sp) = system_prompt {
87 window.messages.push(sp);
88 }
89 window.completed_tool_ids.clear();
90 recompute_prompt_tokens(window);
91 }
92
93 pub fn remove_recall_messages(&self, window: &mut MessageWindowView<'_>) {
95 remove_by_part_or_prefix(window.messages, RECALL_PREFIX, |p| {
96 matches!(p, MessagePart::Recall { .. })
97 });
98 }
99
100 pub fn remove_correction_messages(&self, window: &mut MessageWindowView<'_>) {
102 remove_by_prefix(window.messages, Role::System, CORRECTIONS_PREFIX);
103 }
104
105 pub fn remove_graph_facts_messages(&self, window: &mut MessageWindowView<'_>) {
107 remove_by_prefix(window.messages, Role::System, GRAPH_FACTS_PREFIX);
108 }
109
110 pub fn remove_persona_facts_messages(&self, window: &mut MessageWindowView<'_>) {
112 remove_by_prefix(window.messages, Role::System, PERSONA_PREFIX);
113 }
114
115 pub fn remove_trajectory_hints_messages(&self, window: &mut MessageWindowView<'_>) {
117 remove_by_prefix(window.messages, Role::System, TRAJECTORY_PREFIX);
118 }
119
120 pub fn remove_tree_memory_messages(&self, window: &mut MessageWindowView<'_>) {
122 remove_by_prefix(window.messages, Role::System, TREE_MEMORY_PREFIX);
123 }
124
125 pub fn remove_reasoning_strategies_messages(&self, window: &mut MessageWindowView<'_>) {
127 remove_by_prefix(window.messages, Role::System, REASONING_PREFIX);
128 }
129
130 pub fn remove_lsp_messages(&self, window: &mut MessageWindowView<'_>) {
135 remove_by_prefix(window.messages, Role::System, LSP_NOTE_PREFIX);
136 }
137
138 pub fn remove_code_context_messages(&self, window: &mut MessageWindowView<'_>) {
140 remove_by_part_or_prefix(window.messages, CODE_CONTEXT_PREFIX, |p| {
141 matches!(p, MessagePart::CodeContext { .. })
142 });
143 }
144
145 pub fn remove_summary_messages(&self, window: &mut MessageWindowView<'_>) {
147 remove_by_part_or_prefix(window.messages, SUMMARY_PREFIX, |p| {
148 matches!(p, MessagePart::Summary { .. })
149 });
150 }
151
152 pub fn remove_cross_session_messages(&self, window: &mut MessageWindowView<'_>) {
154 remove_by_part_or_prefix(window.messages, CROSS_SESSION_PREFIX, |p| {
155 matches!(p, MessagePart::CrossSession { .. })
156 });
157 }
158
159 pub fn remove_session_digest_message(&self, window: &mut MessageWindowView<'_>) {
161 remove_by_prefix(window.messages, Role::User, SESSION_DIGEST_PREFIX);
162 }
163
164 pub fn remove_document_rag_messages(&self, window: &mut MessageWindowView<'_>) {
166 remove_by_prefix(window.messages, Role::System, DOCUMENT_RAG_PREFIX);
167 }
168
169 pub fn trim_messages_to_budget(&self, window: &mut MessageWindowView<'_>, token_budget: usize) {
177 if token_budget == 0 {
178 return;
179 }
180
181 let history_start = window
183 .messages
184 .iter()
185 .position(|m| m.role != Role::System)
186 .unwrap_or(window.messages.len());
187
188 if history_start >= window.messages.len() {
189 return;
190 }
191
192 let mut total = 0usize;
193 let mut keep_from = window.messages.len();
194
195 for i in (history_start..window.messages.len()).rev() {
196 let msg_tokens = window
197 .token_counter
198 .count_message_tokens(&window.messages[i]);
199 if total + msg_tokens > token_budget {
200 break;
201 }
202 total += msg_tokens;
203 keep_from = i;
204 }
205
206 if keep_from > history_start {
207 let removed = keep_from - history_start;
208 window.messages.drain(history_start..keep_from);
209 recompute_prompt_tokens(window);
210 tracing::info!(
211 removed,
212 token_budget,
213 "trimmed messages to fit context budget"
214 );
215 }
216 }
217
218 pub async fn inject_semantic_recall(
230 &self,
231 query: &str,
232 token_budget: usize,
233 window: &mut MessageWindowView<'_>,
234 view: &ContextAssemblyView<'_>,
235 ) -> Result<(), ContextError> {
236 self.remove_recall_messages(window);
237
238 let params = SemanticRecallParams {
239 query,
240 token_budget,
241 recall_limit: view.recall_limit,
242 context_format: view.context_format,
243 conversation_id: view.conversation_id,
244 tiered_classifier: view.tiered_retrieval_classifier.as_ref(),
245 tiered_validator: view.tiered_retrieval_validator.as_ref(),
246 tiered_config: &view.tiered_retrieval_config,
247 };
248 let msg = self
249 .run_tiered_recall(¶ms, window, view.memory.as_deref())
250 .await?;
251
252 if let Some(msg) = msg
253 && window.messages.len() > 1
254 {
255 window.messages.insert(1, msg);
256 }
257
258 Ok(())
259 }
260
261 pub async fn inject_semantic_recall_bare(
273 &self,
274 params: SemanticRecallParams<'_>,
275 window: &mut MessageWindowView<'_>,
276 memory: Option<&zeph_memory::semantic::SemanticMemory>,
277 ) -> Result<(), ContextError> {
278 self.remove_recall_messages(window);
279
280 let msg = self.run_tiered_recall(¶ms, window, memory).await?;
281
282 if let Some(msg) = msg
283 && window.messages.len() > 1
284 {
285 window.messages.insert(1, msg);
286 }
287
288 Ok(())
289 }
290
291 async fn run_tiered_recall(
296 &self,
297 params: &SemanticRecallParams<'_>,
298 window: &MessageWindowView<'_>,
299 memory: Option<&zeph_memory::semantic::SemanticMemory>,
300 ) -> Result<Option<Message>, ContextError> {
301 if params.tiered_config.enabled {
302 use tracing::Instrument as _;
303 let Some(mem) = memory else {
304 return Ok(None);
305 };
306 let result = tokio::time::timeout(
307 std::time::Duration::from_secs(30),
308 zeph_memory::recall_tiered(
309 mem,
310 params.query,
311 params.conversation_id,
312 params.tiered_classifier,
313 params.tiered_validator,
314 params.tiered_config,
315 Some(params.token_budget),
316 )
317 .instrument(tracing::info_span!("agent_context.tiered_retrieval.recall")),
318 )
319 .await
320 .map_err(|_| {
321 tracing::warn!("tiered_retrieval: recall_tiered timed out after 30s");
322 ContextError::Memory(zeph_memory::MemoryError::Timeout(
323 "recall_tiered timed out".to_owned(),
324 ))
325 })?
326 .map_err(ContextError::Memory)?;
327
328 tracing::debug!(
329 intent = %result.intent,
330 tokens_used = result.tokens_used,
331 tier_escalated = result.tier_escalated,
332 count = result.messages.len(),
333 "tiered_retrieval: recall complete"
334 );
335
336 if result.messages.is_empty() {
337 return Ok(None);
338 }
339
340 let recalled_text = result
341 .messages
342 .iter()
343 .map(|m| m.message.content.as_str())
344 .collect::<Vec<_>>()
345 .join("\n---\n");
346 Ok(Some(Message::from_legacy(
347 Role::User,
348 format!("{RECALL_PREFIX}{recalled_text}"),
349 )))
350 } else {
351 let (msg, _score) = crate::helpers::fetch_semantic_recall_raw(
352 memory,
353 params.recall_limit,
354 params.context_format,
355 params.query,
356 params.token_budget,
357 &window.token_counter,
358 None,
359 None,
360 )
361 .await?;
362 Ok(msg)
363 }
364 }
365
366 pub async fn inject_cross_session_context(
375 &self,
376 query: &str,
377 token_budget: usize,
378 window: &mut MessageWindowView<'_>,
379 view: &ContextAssemblyView<'_>,
380 ) -> Result<(), ContextError> {
381 self.remove_cross_session_messages(window);
382
383 if let Some(msg) = crate::helpers::fetch_cross_session_raw(
384 view.memory.as_deref(),
385 view.conversation_id,
386 view.cross_session_score_threshold,
387 query,
388 token_budget,
389 &view.token_counter,
390 )
391 .await?
392 && window.messages.len() > 1
393 {
394 window.messages.insert(1, msg);
395 tracing::debug!("injected cross-session context");
396 }
397
398 Ok(())
399 }
400
401 pub async fn inject_summaries(
410 &self,
411 token_budget: usize,
412 window: &mut MessageWindowView<'_>,
413 view: &ContextAssemblyView<'_>,
414 ) -> Result<(), ContextError> {
415 self.remove_summary_messages(window);
416
417 if let Some(msg) = crate::helpers::fetch_summaries_raw(
418 view.memory.as_deref(),
419 view.conversation_id,
420 token_budget,
421 &view.token_counter,
422 )
423 .await?
424 && window.messages.len() > 1
425 {
426 window.messages.insert(1, msg);
427 tracing::debug!("injected summaries into context");
428 }
429
430 Ok(())
431 }
432
433 pub async fn disambiguate_skills(
438 &self,
439 query: &str,
440 all_meta: &[&zeph_skills::loader::SkillMeta],
441 scored: &[zeph_skills::ScoredMatch],
442 providers: &ProviderHandles,
443 ) -> Option<Vec<usize>> {
444 use std::fmt::Write as _;
445
446 let mut candidates = String::new();
447 for sm in scored {
448 if let Some(meta) = all_meta.get(sm.index) {
449 let _ = writeln!(
450 candidates,
451 "- {} (score: {:.3}): {}",
452 meta.name, sm.score, meta.description
453 );
454 }
455 }
456
457 let prompt = format!(
458 "The user said: \"{query}\"\n\n\
459 These skills matched with similar scores:\n{candidates}\n\
460 Which skill best matches the user's intent? \
461 Return the skill_name, your confidence (0-1), and any extracted parameters."
462 );
463
464 let messages = vec![zeph_llm::provider::Message::from_legacy(
465 zeph_llm::provider::Role::User,
466 prompt,
467 )];
468 match providers
469 .disambiguate
470 .chat_typed::<zeph_skills::IntentClassification>(&messages)
471 .await
472 {
473 Ok(classification) => {
474 tracing::info!(
475 skill = %classification.skill_name,
476 confidence = classification.confidence,
477 "disambiguation selected skill"
478 );
479 let mut indices: Vec<usize> = scored.iter().map(|s| s.index).collect();
480 if let Some(pos) = indices.iter().position(|&i| {
481 all_meta
482 .get(i)
483 .is_some_and(|m| m.name == classification.skill_name)
484 }) {
485 indices.swap(0, pos);
486 }
487 Some(indices)
488 }
489 Err(e) => {
490 tracing::warn!("disambiguation failed, using original order: {e:#}");
491 None
492 }
493 }
494 }
495
496 #[allow(clippy::too_many_lines)] pub async fn prepare_context(
509 &self,
510 query: &str,
511 window: &mut MessageWindowView<'_>,
512 view: &mut ContextAssemblyView<'_>,
513 ) -> Result<ContextDelta, ContextError> {
514 if view.context_manager.budget.is_none() {
515 return Ok(ContextDelta::default());
516 }
517
518 self.remove_session_digest_message(window);
520 self.remove_summary_messages(window);
521 self.remove_cross_session_messages(window);
522 self.remove_recall_messages(window);
523 self.remove_document_rag_messages(window);
524 self.remove_correction_messages(window);
525 self.remove_code_context_messages(window);
526 self.remove_graph_facts_messages(window);
527 self.remove_persona_facts_messages(window);
528 self.remove_trajectory_hints_messages(window);
529 self.remove_tree_memory_messages(window);
530 if view.reasoning_config.enabled {
531 self.remove_reasoning_strategies_messages(window);
532 }
533
534 if let Some(explorer) = view.proactive_explorer.clone()
536 && let Some(domain) = explorer.classify(query)
537 {
538 let already_known = {
539 let registry_guard = view.skill_registry.read();
540 explorer.has_knowledge(®istry_guard, &domain)
541 };
542 let excluded = explorer.is_excluded(&domain);
543
544 if !already_known && !excluded {
545 tracing::debug!(domain = %domain.0, query_len = query.len(), "proactive.explore triggered");
546 let timeout_ms = explorer.timeout_ms();
547 let result = tokio::time::timeout(
548 std::time::Duration::from_millis(timeout_ms),
549 explorer.explore(&domain),
550 )
551 .await;
552 match result {
553 Ok(Ok(())) => {
554 view.skill_registry.write().reload(view.skill_paths);
555 tracing::debug!(domain = %domain.0, "proactive.explore complete, registry reloaded");
556 }
557 Ok(Err(e)) => {
558 tracing::warn!(domain = %domain.0, error = %e, "proactive exploration failed");
559 }
560 Err(_) => {
561 tracing::warn!(domain = %domain.0, timeout_ms, "proactive exploration timed out");
562 }
563 }
564 }
565 }
566
567 let active_levels: &'static [zeph_memory::compression::CompressionLevel] =
569 if let Some(ref budget) = view.context_manager.budget {
570 let used = view.cached_prompt_tokens;
571 let max = budget.max_tokens();
572 #[allow(clippy::cast_precision_loss)]
573 let remaining_ratio = if max == 0 {
574 1.0_f32
575 } else {
576 1.0 - (used as f32 / max as f32).clamp(0.0, 1.0)
577 };
578 let levels =
579 zeph_memory::compression::RetrievalPolicy::default().select(remaining_ratio);
580 tracing::debug!(
581 remaining_ratio,
582 active_levels = ?levels,
583 "compression_spectrum: retrieval policy selected"
584 );
585 levels
586 } else {
587 &[]
588 };
589
590 let memory_backend: Option<std::sync::Arc<dyn zeph_common::memory::ContextMemoryBackend>> =
591 view.memory.clone().map(
592 |m| -> std::sync::Arc<dyn zeph_common::memory::ContextMemoryBackend> {
593 std::sync::Arc::new(crate::memory_backend::SemanticMemoryBackend::new(m))
594 },
595 );
596
597 let memory_view = zeph_context::input::ContextMemoryView {
598 memory: memory_backend,
599 conversation_id: view.conversation_id.map(|c| c.0),
600 recall_limit: view.recall_limit,
601 cross_session_score_threshold: view.cross_session_score_threshold,
602 context_strategy: view.context_strategy,
603 crossover_turn_threshold: view.crossover_turn_threshold,
604 cached_session_digest: view.cached_session_digest.clone(),
605 graph_config: view.graph_config.clone(),
606 document_config: view.document_config.clone(),
607 persona_config: view.persona_config.clone(),
608 trajectory_config: view.trajectory_config.clone(),
609 reasoning_config: view.reasoning_config.clone(),
610 memcot_config: view.memcot_config.clone(),
611 memcot_state: view.memcot_state.clone(),
612 tree_config: view.tree_config.clone(),
613 };
614
615 #[cfg(feature = "index")]
616 let index_access = view.index;
617 #[cfg(not(feature = "index"))]
618 let index_access: Option<&dyn zeph_context::input::IndexAccess> = None;
619
620 let router = crate::memory_backend::build_memory_router(view.context_manager);
621
622 let input = zeph_context::input::ContextAssemblyInput {
623 memory: &memory_view,
624 context_manager: view.context_manager,
625 token_counter: &*view.token_counter,
626 skills_prompt: view.last_skills_prompt,
627 index: index_access,
628 correction_config: view.correction_config,
629 sidequest_turn_counter: view.sidequest_turn_counter,
630 messages: window.messages,
631 query,
632 scrub: view.scrub,
633 active_levels,
634 router,
635 };
636
637 let mut prepared = zeph_context::assembler::ContextAssembler::gather(&input).await?;
638
639 if view.tiered_retrieval_config.enabled {
644 prepared.recall = None;
645 }
646
647 let delta = self.apply_prepared_context(window, view, prepared).await;
648
649 if view.tiered_retrieval_config.enabled {
650 self.inject_semantic_recall(query, usize::MAX, window, view)
651 .await?;
652 }
653
654 Ok(delta)
655 }
656
657 #[allow(clippy::too_many_lines)] async fn apply_prepared_context(
666 &self,
667 window: &mut MessageWindowView<'_>,
668 view: &mut ContextAssemblyView<'_>,
669 prepared: zeph_context::assembler::PreparedContext,
670 ) -> ContextDelta {
671 use std::borrow::Cow;
672 use zeph_llm::provider::{Message, MessageMetadata, Role};
673 use zeph_sanitizer::{ContentSource, ContentSourceKind, MemorySourceHint};
674
675 *view.last_recall_confidence = prepared.recall_confidence;
677
678 if prepared.memory_first {
680 let history_start = 1usize;
681 let len = window.messages.len();
682 let keep_tail =
683 zeph_context::assembler::memory_first_keep_tail(window.messages, history_start);
684 if len > history_start + keep_tail {
685 window.messages.drain(history_start..len - keep_tail);
686 recompute_prompt_tokens(window);
687 tracing::debug!(
688 strategy = "memory_first",
689 keep_tail,
690 "dropped conversation history, kept last {keep_tail} messages"
691 );
692 }
693 }
694
695 if let Some(msg) = prepared.graph_facts.filter(|_| window.messages.len() > 1) {
697 let sanitized = self
698 .sanitize_memory_message(msg, MemorySourceHint::ExternalContent, view)
699 .await;
700 window.messages.insert(1, sanitized);
701 tracing::debug!("injected knowledge graph facts into context");
702 }
703 if let Some(msg) = prepared.doc_rag.filter(|_| window.messages.len() > 1) {
704 let sanitized = self
705 .sanitize_memory_message(msg, MemorySourceHint::ExternalContent, view)
706 .await;
707 window.messages.insert(1, sanitized);
708 tracing::debug!("injected document RAG context");
709 }
710 if let Some(msg) = prepared.corrections.filter(|_| window.messages.len() > 1) {
711 let sanitized = self
712 .sanitize_memory_message(msg, MemorySourceHint::ConversationHistory, view)
713 .await;
714 window.messages.insert(1, sanitized);
715 tracing::debug!("injected past corrections into context");
716 }
717 if let Some(msg) = prepared.recall.filter(|_| window.messages.len() > 1) {
718 let sanitized = self
719 .sanitize_memory_message(msg, MemorySourceHint::ConversationHistory, view)
720 .await;
721 window.messages.insert(1, sanitized);
722 }
723 if let Some(msg) = prepared.cross_session.filter(|_| window.messages.len() > 1) {
724 let sanitized = self
725 .sanitize_memory_message(msg, MemorySourceHint::LlmSummary, view)
726 .await;
727 window.messages.insert(1, sanitized);
728 }
729 if let Some(msg) = prepared.summaries.filter(|_| window.messages.len() > 1) {
730 let sanitized = self
731 .sanitize_memory_message(msg, MemorySourceHint::LlmSummary, view)
732 .await;
733 window.messages.insert(1, sanitized);
734 tracing::debug!("injected summaries into context");
735 }
736 if let Some(msg) = prepared.persona_facts.filter(|_| window.messages.len() > 1) {
737 let sanitized = self
738 .sanitize_memory_message(msg, MemorySourceHint::ExternalContent, view)
739 .await;
740 window.messages.insert(1, sanitized);
741 tracing::debug!("injected persona facts into context");
742 }
743 if let Some(msg) = prepared
744 .trajectory_hints
745 .filter(|_| window.messages.len() > 1)
746 {
747 let sanitized = self
748 .sanitize_memory_message(msg, MemorySourceHint::ExternalContent, view)
749 .await;
750 window.messages.insert(1, sanitized);
751 tracing::debug!("injected trajectory hints into context");
752 }
753 if let Some(msg) = prepared.tree_memory.filter(|_| window.messages.len() > 1) {
754 let sanitized = self
755 .sanitize_memory_message(msg, MemorySourceHint::ExternalContent, view)
756 .await;
757 window.messages.insert(1, sanitized);
758 tracing::debug!("injected tree memory summary into context");
759 }
760 if let Some(msg) = prepared
761 .reasoning_hints
762 .filter(|_| window.messages.len() > 1)
763 {
764 let sanitized = self
765 .sanitize_memory_message(msg, MemorySourceHint::ExternalContent, view)
766 .await;
767 window.messages.insert(1, sanitized);
768 tracing::debug!("injected reasoning strategies into context");
769 }
770
771 let code_context = if let Some(text) = prepared.code_context {
773 let sanitized = view
774 .sanitizer
775 .sanitize(&text, ContentSource::new(ContentSourceKind::ToolResult));
776 view.metrics.sanitizer_runs += 1;
777 if !sanitized.injection_flags.is_empty() {
778 tracing::warn!(
779 flags = sanitized.injection_flags.len(),
780 "injection patterns detected in code RAG context"
781 );
782 view.metrics.sanitizer_injection_flags += sanitized.injection_flags.len() as u64;
783 let detail = sanitized
784 .injection_flags
785 .first()
786 .map_or_else(String::new, |f| {
787 format!("Detected pattern: {}", f.pattern_name)
788 });
789 view.security_events.push(
790 zeph_common::SecurityEventCategory::InjectionFlag,
791 "code_rag",
792 detail,
793 );
794 }
795 if sanitized.was_truncated {
796 view.metrics.sanitizer_truncations += 1;
797 view.security_events.push(
798 zeph_common::SecurityEventCategory::Truncation,
799 "code_rag",
800 "Content truncated to max_content_size".to_string(),
801 );
802 }
803 Some(sanitized.body)
804 } else {
805 None
806 };
807
808 if !prepared.memory_first {
809 self.trim_messages_to_budget(window, prepared.recent_history_budget);
810 }
811
812 if view.digest_enabled
814 && let Some((digest_text, _)) = view
815 .cached_session_digest
816 .clone()
817 .filter(|_| window.messages.len() > 1)
818 {
819 let digest_msg = Message {
820 role: Role::User,
821 content: format!("{}{digest_text}", crate::helpers::SESSION_DIGEST_PREFIX),
822 parts: vec![],
823 metadata: MessageMetadata::default(),
824 };
825 let sanitized = self
826 .sanitize_memory_message(digest_msg, MemorySourceHint::LlmSummary, view)
827 .await;
828 window.messages.insert(1, sanitized);
829 tracing::debug!("injected session digest into context");
830 }
831
832 if view.redact_credentials {
834 for msg in &mut *window.messages {
835 if msg.role == Role::System {
836 continue;
837 }
838 if let Cow::Owned(s) = (view.scrub)(&msg.content) {
839 msg.content = s;
840 }
841 }
842 }
843
844 recompute_prompt_tokens(window);
845
846 ContextDelta { code_context }
847 }
848
849 async fn sanitize_memory_message(
859 &self,
860 mut msg: zeph_llm::provider::Message,
861 hint: zeph_sanitizer::MemorySourceHint,
862 view: &mut ContextAssemblyView<'_>,
863 ) -> zeph_llm::provider::Message {
864 use zeph_sanitizer::{ContentSource, ContentSourceKind};
865
866 let source = ContentSource::new(ContentSourceKind::MemoryRetrieval).with_memory_hint(hint);
867 let sanitized = view.sanitizer.sanitize(&msg.content, source);
868 view.metrics.sanitizer_runs += 1;
869 if !sanitized.injection_flags.is_empty() {
870 tracing::warn!(
871 flags = sanitized.injection_flags.len(),
872 "injection patterns detected in memory retrieval"
873 );
874 view.metrics.sanitizer_injection_flags += sanitized.injection_flags.len() as u64;
875 let detail = sanitized
876 .injection_flags
877 .first()
878 .map_or_else(String::new, |f| {
879 format!("Detected pattern: {}", f.pattern_name)
880 });
881 view.security_events.push(
882 zeph_common::SecurityEventCategory::InjectionFlag,
883 "memory_retrieval",
884 detail,
885 );
886 }
887 if sanitized.was_truncated {
888 view.metrics.sanitizer_truncations += 1;
889 view.security_events.push(
890 zeph_common::SecurityEventCategory::Truncation,
891 "memory_retrieval",
892 "Content truncated to max_content_size".to_string(),
893 );
894 }
895
896 if view.sanitizer.is_enabled()
898 && let Some(qs) = view.quarantine_summarizer
899 && qs.should_quarantine(ContentSourceKind::MemoryRetrieval)
900 {
901 match qs.extract_facts(&sanitized, view.sanitizer).await {
902 Ok((facts, flags)) => {
903 view.metrics.quarantine_invocations += 1;
904 view.security_events.push(
905 zeph_common::SecurityEventCategory::Quarantine,
906 "memory_retrieval",
907 "Content quarantined, facts extracted".to_string(),
908 );
909 let escaped = zeph_sanitizer::ContentSanitizer::escape_delimiter_tags(&facts);
910 msg.content = zeph_sanitizer::ContentSanitizer::apply_spotlight(
911 &escaped,
912 &sanitized.source,
913 &flags,
914 );
915 return msg;
916 }
917 Err(e) => {
918 tracing::warn!(
919 error = %e,
920 "quarantine failed for memory retrieval, using original sanitized content"
921 );
922 view.metrics.quarantine_failures += 1;
923 view.security_events.push(
924 zeph_common::SecurityEventCategory::Quarantine,
925 "memory_retrieval",
926 format!("Quarantine failed: {e}"),
927 );
928 }
929 }
930 }
931
932 msg.content = sanitized.body;
933 msg
934 }
935
936 pub async fn reset_conversation(
946 &self,
947 window: &mut MessageWindowView<'_>,
948 _view: &mut ContextAssemblyView<'_>,
949 ) -> Result<(), ContextError> {
950 self.clear_history(window);
951 Ok(())
952 }
953
954 #[allow(
971 clippy::cast_precision_loss,
972 clippy::cast_possible_truncation,
973 clippy::cast_sign_loss
974 )]
975 pub async fn maybe_compact(
976 &self,
977 summ: &mut ContextSummarizationView<'_>,
978 status: &(impl StatusSink + ?Sized),
979 ) -> Result<(), ContextError> {
980 use zeph_context::manager::{CompactionState, CompactionTier};
981
982 if let Some(count) = summ.context_manager.turns_since_last_hard_compaction_mut() {
984 *count += 1;
985 }
986
987 if let CompactionState::Exhausted { warned } = summ.context_manager.compaction_state()
989 && !warned
990 {
991 summ.context_manager
992 .set_compaction_state(CompactionState::Exhausted { warned: true });
993 tracing::warn!("compaction exhausted: context budget too tight for this session");
994 }
995 if summ.context_manager.compaction_state().is_exhausted() {
996 return Ok(());
997 }
998
999 if summ.server_compaction_active {
1001 let budget = summ
1002 .context_manager
1003 .budget
1004 .as_ref()
1005 .map_or(0, ContextBudget::max_tokens);
1006 if budget > 0 {
1007 let fallback = (budget * 95 / 100) as u64;
1008 if *summ.cached_prompt_tokens < fallback {
1009 return Ok(());
1010 }
1011 tracing::warn!(
1012 "server compaction active but context at 95%+ — falling back to client-side"
1013 );
1014 } else {
1015 return Ok(());
1016 }
1017 }
1018
1019 if summ
1021 .context_manager
1022 .compaction_state()
1023 .is_compacted_this_turn()
1024 {
1025 return Ok(());
1026 }
1027
1028 let in_cooldown = summ.context_manager.compaction_state().cooldown_remaining() > 0;
1030 if in_cooldown
1031 && let CompactionState::Cooling { turns_remaining } =
1032 summ.context_manager.compaction_state()
1033 {
1034 let next = turns_remaining - 1;
1035 summ.context_manager.set_compaction_state(if next == 0 {
1036 CompactionState::Ready
1037 } else {
1038 CompactionState::Cooling {
1039 turns_remaining: next,
1040 }
1041 });
1042 }
1043
1044 match summ
1045 .context_manager
1046 .compaction_tier(*summ.cached_prompt_tokens)
1047 {
1048 CompactionTier::None => Ok(()),
1049 CompactionTier::Soft => {
1050 self.do_soft_compaction(summ, status).await;
1051 Ok(())
1052 }
1053 CompactionTier::Hard => self.do_hard_compaction(summ, status, in_cooldown).await,
1054 }
1055 }
1056
1057 #[allow(
1062 clippy::cast_precision_loss,
1063 clippy::cast_possible_truncation,
1064 clippy::cast_sign_loss
1065 )]
1066 async fn do_soft_compaction(
1067 &self,
1068 summ: &mut ContextSummarizationView<'_>,
1069 status: &(impl StatusSink + ?Sized),
1070 ) {
1071 status.send_status("soft compacting context...").await;
1072
1073 match &summ.context_manager.compression.pruning_strategy {
1075 zeph_config::PruningStrategy::Subgoal | zeph_config::PruningStrategy::SubgoalMig => {
1076 crate::summarization::scheduling::maybe_refresh_subgoal(summ);
1077 }
1078 _ => crate::summarization::scheduling::maybe_refresh_task_goal(summ),
1079 }
1080
1081 let applied = crate::summarization::deferred::apply_deferred_summaries(summ);
1083
1084 if applied > 0
1086 && summ
1087 .context_manager
1088 .compression
1089 .pruning_strategy
1090 .is_subgoal()
1091 {
1092 summ.subgoal_registry
1093 .rebuild_after_compaction(summ.messages, 0);
1094 }
1095
1096 let budget = summ
1098 .context_manager
1099 .budget
1100 .as_ref()
1101 .map_or(0, ContextBudget::max_tokens);
1102 let soft_threshold =
1103 (budget as f32 * summ.context_manager.soft_compaction_threshold) as usize;
1104 let cached = usize::try_from(*summ.cached_prompt_tokens).unwrap_or(usize::MAX);
1105 let min_to_free = cached.saturating_sub(soft_threshold);
1106 if min_to_free > 0 {
1107 crate::summarization::pruning::prune_tool_outputs(summ, min_to_free);
1108 }
1109
1110 status.send_status("").await;
1111 tracing::info!(
1112 cached_tokens = *summ.cached_prompt_tokens,
1113 soft_threshold,
1114 "soft compaction complete"
1115 );
1116 }
1117
1118 #[allow(
1120 clippy::cast_precision_loss,
1121 clippy::cast_possible_truncation,
1122 clippy::cast_sign_loss
1123 )]
1124 async fn do_hard_compaction(
1125 &self,
1126 summ: &mut ContextSummarizationView<'_>,
1127 status: &(impl StatusSink + ?Sized),
1128 in_cooldown: bool,
1129 ) -> Result<(), ContextError> {
1130 use zeph_context::manager::CompactionState;
1131
1132 let turns_since_last = summ
1134 .context_manager
1135 .turns_since_last_hard_compaction()
1136 .map(|t| u32::try_from(t).unwrap_or(u32::MAX));
1137 summ.context_manager
1138 .set_turns_since_last_hard_compaction(Some(0));
1139 if let Some(metrics) = summ.metrics {
1140 metrics.record_hard_compaction(turns_since_last);
1141 }
1142
1143 if in_cooldown {
1144 tracing::debug!(
1145 turns_remaining = summ.context_manager.compaction_state().cooldown_remaining(),
1146 "hard compaction skipped: cooldown active"
1147 );
1148 return Ok(());
1149 }
1150
1151 let budget = summ
1152 .context_manager
1153 .budget
1154 .as_ref()
1155 .map_or(0, ContextBudget::max_tokens);
1156 let hard_threshold =
1157 (budget as f32 * summ.context_manager.hard_compaction_threshold) as usize;
1158 let cached = usize::try_from(*summ.cached_prompt_tokens).unwrap_or(usize::MAX);
1159 let min_to_free = cached.saturating_sub(hard_threshold);
1160
1161 status.send_status("compacting context...").await;
1162
1163 crate::summarization::deferred::apply_deferred_summaries(summ);
1165
1166 let freed = crate::summarization::pruning::prune_tool_outputs(summ, min_to_free);
1168 if freed >= min_to_free {
1169 tracing::info!(freed, "hard compaction: pruning sufficient");
1170 summ.context_manager
1171 .set_compaction_state(CompactionState::CompactedThisTurn {
1172 cooldown: summ.context_manager.compaction_cooldown_turns(),
1173 });
1174 if let Err(e) = crate::summarization::deferred::flush_deferred_summaries(summ).await {
1175 tracing::warn!(%e, "flush_deferred_summaries failed after hard compaction");
1176 }
1177 status.send_status("").await;
1178 return Ok(());
1179 }
1180
1181 let preserve_tail = summ.context_manager.compaction_preserve_tail;
1183 let compactable = summ.messages.len().saturating_sub(preserve_tail + 1);
1184 if compactable <= 1 {
1185 tracing::warn!(
1186 compactable,
1187 "hard compaction: too few messages, marking exhausted"
1188 );
1189 summ.context_manager
1190 .set_compaction_state(CompactionState::Exhausted { warned: false });
1191 status.send_status("").await;
1192 return Ok(());
1193 }
1194
1195 tracing::info!(
1197 min_to_free,
1198 "hard compaction: falling back to LLM summarization"
1199 );
1200 let tokens_before = *summ.cached_prompt_tokens;
1201 let outcome = crate::summarization::compaction::compact_context(summ, None).await?;
1202
1203 let freed_tokens = tokens_before.saturating_sub(*summ.cached_prompt_tokens);
1204
1205 if !outcome.is_compacted() || freed_tokens == 0 {
1206 tracing::warn!("hard compaction: no net reduction, marking exhausted");
1207 summ.context_manager
1208 .set_compaction_state(CompactionState::Exhausted { warned: false });
1209 status.send_status("").await;
1210 return Ok(());
1211 }
1212
1213 if matches!(
1214 summ.context_manager
1215 .compaction_tier(*summ.cached_prompt_tokens),
1216 zeph_context::manager::CompactionTier::Hard
1217 ) {
1218 tracing::warn!(
1219 freed_tokens,
1220 "hard compaction: still above hard threshold after compaction, marking exhausted"
1221 );
1222 summ.context_manager
1223 .set_compaction_state(CompactionState::Exhausted { warned: false });
1224 status.send_status("").await;
1225 return Ok(());
1226 }
1227
1228 summ.context_manager
1229 .set_compaction_state(CompactionState::CompactedThisTurn {
1230 cooldown: summ.context_manager.compaction_cooldown_turns(),
1231 });
1232
1233 if tokens_before > *summ.cached_prompt_tokens {
1234 tracing::info!(
1235 tokens_before,
1236 tokens_after = *summ.cached_prompt_tokens,
1237 saved = freed_tokens,
1238 "context compaction complete"
1239 );
1240 }
1241
1242 status.send_status("").await;
1243 Ok(())
1244 }
1245
1246 pub async fn maybe_summarize_tool_pair(
1252 &self,
1253 summ: &mut ContextSummarizationView<'_>,
1254 providers: &ProviderHandles,
1255 ) {
1256 crate::summarization::deferred::maybe_summarize_tool_pair(
1257 summ,
1258 providers,
1259 &TxStatusSink(summ.status_tx.clone()),
1260 )
1261 .await;
1262 }
1263
1264 #[must_use]
1269 pub fn apply_deferred_summaries(&self, summ: &mut ContextSummarizationView<'_>) -> usize {
1270 crate::summarization::deferred::apply_deferred_summaries(summ)
1271 }
1272
1273 pub async fn flush_deferred_summaries(&self, summ: &mut ContextSummarizationView<'_>) {
1278 if let Err(e) = crate::summarization::deferred::flush_deferred_summaries(summ).await {
1279 tracing::warn!(%e, "flush_deferred_summaries failed");
1280 }
1281 }
1282
1283 pub fn maybe_apply_deferred_summaries(&self, summ: &mut ContextSummarizationView<'_>) {
1289 crate::summarization::deferred::maybe_apply_deferred_summaries(summ);
1290 }
1291
1292 pub async fn compact_context(
1307 &self,
1308 summ: &mut ContextSummarizationView<'_>,
1309 max_summary_tokens: Option<usize>,
1310 ) -> Result<crate::state::CompactionOutcome, crate::error::ContextError> {
1311 crate::summarization::compaction::compact_context(summ, max_summary_tokens).await
1312 }
1313
1314 pub fn maybe_soft_compact_mid_iteration(&self, summ: &mut ContextSummarizationView<'_>) {
1320 crate::summarization::scheduling::maybe_soft_compact_mid_iteration(summ);
1321 }
1322
1323 pub async fn maybe_proactive_compress(
1329 &self,
1330 summ: &mut ContextSummarizationView<'_>,
1331 status: &(impl StatusSink + ?Sized),
1332 ) {
1333 let Some((_threshold, max_summary_tokens)) = summ
1334 .context_manager
1335 .should_proactively_compress(*summ.cached_prompt_tokens)
1336 else {
1337 return;
1338 };
1339
1340 if summ.server_compaction_active {
1341 let budget = summ
1342 .context_manager
1343 .budget
1344 .as_ref()
1345 .map_or(0, ContextBudget::max_tokens);
1346 if budget > 0 {
1347 let fallback = (budget * 95 / 100) as u64;
1348 if *summ.cached_prompt_tokens <= fallback {
1349 return;
1350 }
1351 tracing::warn!(
1352 cached_prompt_tokens = *summ.cached_prompt_tokens,
1353 fallback_threshold = fallback,
1354 "server compaction active but context at 95%+ — falling back to proactive"
1355 );
1356 } else {
1357 return;
1358 }
1359 }
1360
1361 status.send_status("compressing context...").await;
1362 tracing::info!(
1363 max_summary_tokens,
1364 cached_tokens = *summ.cached_prompt_tokens,
1365 "proactive compression triggered"
1366 );
1367
1368 match crate::summarization::compaction::compact_context(summ, Some(max_summary_tokens))
1369 .await
1370 {
1371 Ok(outcome) if outcome.is_compacted() => {
1372 summ.context_manager.set_compaction_state(
1373 zeph_context::manager::CompactionState::CompactedThisTurn { cooldown: 0 },
1374 );
1375 tracing::info!("proactive compression complete");
1376 }
1377 Ok(_) => {}
1378 Err(e) => tracing::warn!(%e, "proactive compression failed"),
1379 }
1380
1381 status.send_status("").await;
1382 }
1383
1384 pub fn maybe_refresh_task_goal(&self, summ: &mut ContextSummarizationView<'_>) {
1390 crate::summarization::scheduling::maybe_refresh_task_goal(summ);
1391 }
1392
1393 pub fn maybe_refresh_subgoal(&self, summ: &mut ContextSummarizationView<'_>) {
1398 crate::summarization::scheduling::maybe_refresh_subgoal(summ);
1399 }
1400}
1401
1402struct TxStatusSink(Option<tokio::sync::mpsc::UnboundedSender<String>>);
1408
1409impl StatusSink for TxStatusSink {
1410 fn send_status(&self, msg: &str) -> impl std::future::Future<Output = ()> + Send + '_ {
1411 if let Some(ref tx) = self.0 {
1412 let _ = tx.send(msg.to_owned());
1413 }
1414 std::future::ready(())
1415 }
1416}
1417
1418pub(crate) fn recompute_prompt_tokens(window: &mut MessageWindowView<'_>) {
1425 *window.cached_prompt_tokens = window
1426 .messages
1427 .iter()
1428 .map(|m| window.token_counter.count_message_tokens(m) as u64)
1429 .sum();
1430}
1431
1432pub(crate) fn remove_by_prefix(
1438 messages: &mut Vec<zeph_llm::provider::Message>,
1439 role: Role,
1440 prefix: &str,
1441) {
1442 messages.retain(|m| m.role != role || !m.content.starts_with(prefix));
1443}
1444
1445pub(crate) fn remove_by_part_or_prefix(
1454 messages: &mut Vec<zeph_llm::provider::Message>,
1455 prefix: &str,
1456 part_matches: impl Fn(&MessagePart) -> bool,
1457) {
1458 messages.retain(|m| {
1459 if m.role == Role::User {
1462 return !m.content.starts_with(prefix);
1463 }
1464 if m.role != Role::System {
1465 return true;
1466 }
1467 if m.parts.first().is_some_and(&part_matches) {
1468 return false;
1469 }
1470 !m.content.starts_with(prefix)
1471 });
1472}
1473
// Unit tests for the window helpers (history clearing, prefix/part-based removal, trimming)
// and for the semantic-recall / context-preparation entry points of `ContextService`.
#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::sync::Arc;

    use zeph_llm::provider::{Message, MessagePart, Role};
    use zeph_memory::TokenCounter;

    use super::*;
    use crate::helpers::{GRAPH_FACTS_PREFIX, RECALL_PREFIX, SUMMARY_PREFIX};
    use crate::state::MessageWindowView;

    /// Fresh default token counter for a test window.
    fn make_counter() -> Arc<TokenCounter> {
        Arc::new(TokenCounter::default())
    }

    /// Build a `MessageWindowView` over caller-owned storage.
    ///
    /// The caller owns the messages, the cached prompt-token count and the completed tool ids.
    /// The remaining borrowed fields (last persisted id, deferred DB hide ids and summaries)
    /// are backed by `Box::leak`ed empty values so they outlive `'a`. The small per-call leak
    /// is accepted in tests.
    fn make_window<'a>(
        messages: &'a mut Vec<Message>,
        cached: &'a mut u64,
        completed: &'a mut HashSet<String>,
    ) -> MessageWindowView<'a> {
        let last = Box::leak(Box::new(None::<i64>));
        let deferred_hide = Box::leak(Box::new(Vec::<i64>::new()));
        let deferred_summ = Box::leak(Box::new(Vec::<String>::new()));
        MessageWindowView {
            messages,
            last_persisted_message_id: last,
            deferred_db_hide_ids: deferred_hide,
            deferred_db_summaries: deferred_summ,
            cached_prompt_tokens: cached,
            token_counter: make_counter(),
            completed_tool_ids: completed,
        }
    }

    /// Legacy-format `Role::System` message with the given text.
    fn sys(text: &str) -> Message {
        Message::from_legacy(Role::System, text)
    }

    /// Legacy-format `Role::User` message with the given text.
    fn user(text: &str) -> Message {
        Message::from_legacy(Role::User, text)
    }

    /// Legacy-format `Role::Assistant` message with the given text.
    fn assistant(text: &str) -> Message {
        Message::from_legacy(Role::Assistant, text)
    }

    /// `clear_history` keeps only the first (system) message and empties the completed tool ids.
    #[test]
    fn clear_history_keeps_system_prompt() {
        let mut msgs = vec![sys("system"), user("hello"), assistant("hi")];
        let mut cached = 0u64;
        let mut completed = HashSet::new();
        completed.insert("tool_1".to_owned());
        let mut window = make_window(&mut msgs, &mut cached, &mut completed);

        ContextService::new().clear_history(&mut window);

        assert_eq!(window.messages.len(), 1);
        assert_eq!(window.messages[0].content, "system");
        assert!(
            window.completed_tool_ids.is_empty(),
            "completed_tool_ids must be cleared"
        );
    }

    /// `clear_history` on an empty window must not invent a system message.
    #[test]
    fn clear_history_empty_messages_is_noop() {
        let mut msgs: Vec<Message> = vec![];
        let mut cached = 0u64;
        let mut completed = HashSet::new();
        let mut window = make_window(&mut msgs, &mut cached, &mut completed);

        ContextService::new().clear_history(&mut window);

        assert!(window.messages.is_empty());
    }

    /// A system message starting with `RECALL_PREFIX` is removed; other messages survive.
    #[test]
    fn remove_recall_messages_removes_by_prefix() {
        let mut msgs = vec![
            sys("system"),
            sys(&format!("{RECALL_PREFIX}some recalled text")),
            user("hello"),
        ];
        let mut cached = 0u64;
        let mut completed = HashSet::new();
        let mut window = make_window(&mut msgs, &mut cached, &mut completed);

        ContextService::new().remove_recall_messages(&mut window);

        assert_eq!(window.messages.len(), 2);
        assert!(
            window
                .messages
                .iter()
                .all(|m| !m.content.starts_with(RECALL_PREFIX))
        );
    }

    /// Recall text injected with `Role::User` is also removed, while ordinary user messages
    /// are kept.
    #[test]
    fn remove_recall_messages_removes_user_role_recall() {
        let mut msgs = vec![
            sys("system"),
            user(&format!("{RECALL_PREFIX}recalled via tiered path")),
            user("real user message"),
        ];
        let mut cached = 0u64;
        let mut completed = HashSet::new();
        let mut window = make_window(&mut msgs, &mut cached, &mut completed);

        ContextService::new().remove_recall_messages(&mut window);

        assert_eq!(
            window.messages.len(),
            2,
            "Role::User recall message must be removed"
        );
        assert!(
            window
                .messages
                .iter()
                .all(|m| !m.content.starts_with(RECALL_PREFIX)),
            "no message with RECALL_PREFIX must remain"
        );
        assert!(
            window
                .messages
                .iter()
                .any(|m| m.content == "real user message"),
            "non-recall user message must survive"
        );
    }

    /// A system message starting with `GRAPH_FACTS_PREFIX` is removed.
    #[test]
    fn remove_graph_facts_messages_removes_matching() {
        let mut msgs = vec![
            sys("system"),
            sys(&format!("{GRAPH_FACTS_PREFIX}fact1")),
            user("hello"),
        ];
        let mut cached = 0u64;
        let mut completed = HashSet::new();
        let mut window = make_window(&mut msgs, &mut cached, &mut completed);

        ContextService::new().remove_graph_facts_messages(&mut window);

        assert_eq!(window.messages.len(), 2);
    }

    /// A system message built from a `MessagePart::Summary` part is removed by
    /// `remove_summary_messages`.
    #[test]
    fn remove_summary_messages_removes_by_part() {
        let mut msgs = vec![
            sys("system"),
            Message::from_parts(
                Role::System,
                vec![MessagePart::Summary {
                    text: format!("{SUMMARY_PREFIX}old summary"),
                }],
            ),
            user("hello"),
        ];
        let mut cached = 0u64;
        let mut completed = HashSet::new();
        let mut window = make_window(&mut msgs, &mut cached, &mut completed);

        ContextService::new().remove_summary_messages(&mut window);

        assert_eq!(window.messages.len(), 2);
    }

    /// A budget of 0 leaves the window untouched.
    #[test]
    fn trim_messages_to_budget_zero_is_noop() {
        let mut msgs = vec![sys("system"), user("a"), assistant("b"), user("c")];
        let original_len = msgs.len();
        let mut cached = 0u64;
        let mut completed = HashSet::new();
        let mut window = make_window(&mut msgs, &mut cached, &mut completed);

        ContextService::new().trim_messages_to_budget(&mut window, 0);

        assert_eq!(window.messages.len(), original_len);
    }

    /// A tiny (1-token) budget drops some history but never the leading system prompt.
    #[test]
    fn trim_messages_to_budget_keeps_recent() {
        let mut msgs = vec![
            // System prompt followed by three conversational messages.
            sys("system"),
            user("message 1"),
            assistant("reply 1"),
            user("message 2"),
        ];
        let mut cached = 0u64;
        let mut completed = HashSet::new();
        let mut window = make_window(&mut msgs, &mut cached, &mut completed);

        ContextService::new().trim_messages_to_budget(&mut window, 1);

        assert!(
            // Fewer than the original four messages must remain.
            window.messages.len() < 4,
            "trim should remove some messages"
        );
        assert_eq!(
            window.messages[0].role,
            Role::System,
            "system prompt must survive trim"
        );
    }

    // Tests for `inject_semantic_recall`, `inject_semantic_recall_bare` and `prepare_context`
    // with no memory backend configured (`memory: None`).
    mod inject_semantic_recall_tests {
        use parking_lot::RwLock;
        use std::borrow::Cow;
        use std::collections::HashSet;
        use std::sync::Arc;

        use zeph_config::memory::TieredRetrievalConfig;
        use zeph_config::{
            ContextFormat, ContextStrategy, DocumentConfig, GraphConfig, PersonaConfig,
            ReasoningConfig, TrajectoryConfig, TreeConfig,
        };
        use zeph_context::manager::ContextManager;
        use zeph_llm::provider::Message;
        use zeph_memory::TokenCounter;
        use zeph_sanitizer::ContentIsolationConfig;
        use zeph_sanitizer::ContentSanitizer;
        use zeph_skills::registry::SkillRegistry;

        use zeph_common::SecurityEventCategory;

        use super::super::*;
        use crate::helpers::RECALL_PREFIX;
        use crate::state::{
            ContextAssemblyView, MessageWindowView, MetricsCounters, SecurityEventSink,
        };

        /// Security-event sink that discards every event.
        struct NoopSink;
        impl SecurityEventSink for NoopSink {
            fn push(&mut self, _: SecurityEventCategory, _: &'static str, _: String) {}
        }

        /// Fresh default token counter.
        fn make_counter() -> Arc<TokenCounter> {
            Arc::new(TokenCounter::default())
        }

        /// Same construction as the parent module's `make_window`: caller-owned messages,
        /// cached count and tool ids, with the other borrowed fields backed by
        /// `Box::leak`ed empty values.
        fn make_window<'a>(
            messages: &'a mut Vec<Message>,
            cached: &'a mut u64,
            completed: &'a mut HashSet<String>,
        ) -> MessageWindowView<'a> {
            let last = Box::leak(Box::new(None::<i64>));
            let deferred_hide = Box::leak(Box::new(Vec::<i64>::new()));
            let deferred_summ = Box::leak(Box::new(Vec::<String>::new()));
            MessageWindowView {
                messages,
                last_persisted_message_id: last,
                deferred_db_hide_ids: deferred_hide,
                deferred_db_summaries: deferred_summ,
                cached_prompt_tokens: cached,
                token_counter: make_counter(),
                completed_tool_ids: completed,
            }
        }

        /// Identity scrubber: returns the input text unchanged.
        fn scrub_noop(s: &str) -> Cow<'_, str> {
            Cow::Borrowed(s)
        }

        /// With tiered retrieval disabled and no memory, `inject_semantic_recall` returns
        /// `Ok(())` and injects no recall message.
        ///
        /// NOTE(review): the name says "uses flat path", but the assertions only check the
        /// observable outcome. They do not confirm which internal path ran.
        #[tokio::test]
        async fn tiered_recall_disabled_uses_flat_path() {
            let mut msgs: Vec<Message> = vec![];
            let mut cached = 0u64;
            let mut completed = HashSet::new();
            let mut window = make_window(&mut msgs, &mut cached, &mut completed);

            let sanitizer = ContentSanitizer::new(&ContentIsolationConfig::default());
            let mut ctx_mgr = ContextManager::new();
            let mut sink = NoopSink;
            let mut last_confidence = None::<f32>;
            let mut last_skills_prompt = String::new();
            let mut active_skill_names = Vec::new();
            let registry = Arc::new(RwLock::new(SkillRegistry::default()));

            let view = ContextAssemblyView {
                memory: None,
                conversation_id: None,
                recall_limit: 10,
                cross_session_score_threshold: 0.5,
                context_format: ContextFormat::default(),
                last_recall_confidence: &mut last_confidence,
                context_strategy: ContextStrategy::default(),
                crossover_turn_threshold: 0,
                cached_session_digest: None,
                digest_enabled: false,
                graph_config: GraphConfig::default(),
                document_config: DocumentConfig::default(),
                persona_config: PersonaConfig::default(),
                trajectory_config: TrajectoryConfig::default(),
                reasoning_config: ReasoningConfig::default(),
                memcot_config: zeph_config::MemCotConfig::default(),
                memcot_state: None,
                tree_config: TreeConfig::default(),
                last_skills_prompt: &mut last_skills_prompt,
                active_skill_names: &mut active_skill_names,
                skill_registry: registry,
                skill_paths: &[],
                correction_config: None,
                sidequest_turn_counter: 0,
                proactive_explorer: None,
                sanitizer: &sanitizer,
                quarantine_summarizer: None,
                context_manager: &mut ctx_mgr,
                token_counter: make_counter(),
                metrics: MetricsCounters::default(),
                security_events: &mut sink,
                cached_prompt_tokens: 0,
                redact_credentials: false,
                channel_skills: &[],
                scrub: scrub_noop,
                tiered_retrieval_config: TieredRetrievalConfig {
                    enabled: false,
                    ..TieredRetrievalConfig::default()
                },
                tiered_retrieval_classifier: None,
                tiered_retrieval_validator: None,
            };

            let result = ContextService::new()
                .inject_semantic_recall("test query", 1000, &mut window, &view)
                .await;

            assert!(result.is_ok(), "disabled tiered recall must return Ok(())");
            assert!(
                window
                    .messages
                    .iter()
                    .all(|m| !m.content.starts_with(RECALL_PREFIX)),
                "no recall message must be injected when memory is None"
            );
        }

        /// With tiered retrieval enabled but no memory and no classifier/validator,
        /// `inject_semantic_recall` returns `Ok(())` and leaves the window empty.
        #[tokio::test]
        async fn tiered_recall_enabled_no_memory_returns_ok() {
            let mut msgs: Vec<Message> = vec![];
            let mut cached = 0u64;
            let mut completed = HashSet::new();
            let mut window = make_window(&mut msgs, &mut cached, &mut completed);

            let sanitizer = ContentSanitizer::new(&ContentIsolationConfig::default());
            let mut ctx_mgr = ContextManager::new();
            let mut sink = NoopSink;
            let mut last_confidence = None::<f32>;
            let mut last_skills_prompt = String::new();
            let mut active_skill_names = Vec::new();
            let registry = Arc::new(RwLock::new(SkillRegistry::default()));

            let view = ContextAssemblyView {
                memory: None,
                conversation_id: None,
                recall_limit: 10,
                cross_session_score_threshold: 0.5,
                context_format: ContextFormat::default(),
                last_recall_confidence: &mut last_confidence,
                context_strategy: ContextStrategy::default(),
                crossover_turn_threshold: 0,
                cached_session_digest: None,
                digest_enabled: false,
                graph_config: GraphConfig::default(),
                document_config: DocumentConfig::default(),
                persona_config: PersonaConfig::default(),
                trajectory_config: TrajectoryConfig::default(),
                reasoning_config: ReasoningConfig::default(),
                memcot_config: zeph_config::MemCotConfig::default(),
                memcot_state: None,
                tree_config: TreeConfig::default(),
                last_skills_prompt: &mut last_skills_prompt,
                active_skill_names: &mut active_skill_names,
                skill_registry: registry,
                skill_paths: &[],
                correction_config: None,
                sidequest_turn_counter: 0,
                proactive_explorer: None,
                sanitizer: &sanitizer,
                quarantine_summarizer: None,
                context_manager: &mut ctx_mgr,
                token_counter: make_counter(),
                metrics: MetricsCounters::default(),
                security_events: &mut sink,
                cached_prompt_tokens: 0,
                redact_credentials: false,
                channel_skills: &[],
                scrub: scrub_noop,
                tiered_retrieval_config: TieredRetrievalConfig {
                    enabled: true,
                    ..TieredRetrievalConfig::default()
                },
                tiered_retrieval_classifier: None,
                tiered_retrieval_validator: None,
            };

            let result = ContextService::new()
                .inject_semantic_recall("test query", 1000, &mut window, &view)
                .await;

            assert!(
                result.is_ok(),
                "enabled tiered recall with no memory must return Ok(())"
            );
            assert!(
                window.messages.is_empty(),
                "no recall message must be injected when memory is None"
            );
        }

        /// `prepare_context` with tiered retrieval enabled and no token budget on the context
        /// manager (asserted `None` up front) must succeed.
        ///
        /// NOTE(review): only `is_ok()` is checked. The "returns default" part of the name is
        /// not asserted here.
        #[tokio::test]
        async fn prepare_context_tiered_enabled_no_budget_returns_default() {
            let mut msgs: Vec<zeph_llm::provider::Message> = vec![];
            let mut cached = 0u64;
            let mut completed = HashSet::new();
            let mut window = make_window(&mut msgs, &mut cached, &mut completed);

            let sanitizer = zeph_sanitizer::ContentSanitizer::new(
                &zeph_sanitizer::ContentIsolationConfig::default(),
            );
            let mut ctx_mgr = zeph_context::manager::ContextManager::new();
            // Precondition: a freshly constructed manager has no budget configured.
            assert!(ctx_mgr.budget.is_none());

            let mut sink = NoopSink;
            let mut last_confidence = None::<f32>;
            let mut last_skills_prompt = String::new();
            let mut active_skill_names = Vec::new();
            let registry = Arc::new(RwLock::new(zeph_skills::registry::SkillRegistry::default()));

            let mut view = ContextAssemblyView {
                memory: None,
                conversation_id: None,
                recall_limit: 10,
                cross_session_score_threshold: 0.5,
                context_format: ContextFormat::default(),
                last_recall_confidence: &mut last_confidence,
                context_strategy: ContextStrategy::default(),
                crossover_turn_threshold: 0,
                cached_session_digest: None,
                digest_enabled: false,
                graph_config: GraphConfig::default(),
                document_config: DocumentConfig::default(),
                persona_config: PersonaConfig::default(),
                trajectory_config: TrajectoryConfig::default(),
                reasoning_config: ReasoningConfig::default(),
                memcot_config: zeph_config::MemCotConfig::default(),
                memcot_state: None,
                tree_config: TreeConfig::default(),
                last_skills_prompt: &mut last_skills_prompt,
                active_skill_names: &mut active_skill_names,
                skill_registry: registry,
                skill_paths: &[],
                correction_config: None,
                sidequest_turn_counter: 0,
                proactive_explorer: None,
                sanitizer: &sanitizer,
                quarantine_summarizer: None,
                context_manager: &mut ctx_mgr,
                token_counter: make_counter(),
                metrics: MetricsCounters::default(),
                security_events: &mut sink,
                cached_prompt_tokens: 0,
                redact_credentials: false,
                channel_skills: &[],
                scrub: scrub_noop,
                tiered_retrieval_config: TieredRetrievalConfig {
                    enabled: true,
                    ..TieredRetrievalConfig::default()
                },
                tiered_retrieval_classifier: None,
                tiered_retrieval_validator: None,
            };

            let result = ContextService::new()
                .prepare_context("test query", &mut window, &mut view)
                .await;

            assert!(
                result.is_ok(),
                "prepare_context with tiered enabled and no budget must return Ok"
            );
        }

        /// `inject_semantic_recall_bare` with `memory = None` returns `Ok(())` and injects
        /// nothing, even with tiered retrieval enabled.
        #[tokio::test]
        async fn inject_semantic_recall_bare_no_memory_returns_ok() {
            use zeph_config::memory::TieredRetrievalConfig;

            let mut msgs: Vec<Message> = vec![];
            let mut cached = 0u64;
            let mut completed = HashSet::new();
            let mut window = make_window(&mut msgs, &mut cached, &mut completed);

            let tiered_config = TieredRetrievalConfig {
                enabled: true,
                ..TieredRetrievalConfig::default()
            };
            let params = SemanticRecallParams {
                query: "test query",
                token_budget: 1000,
                recall_limit: 10,
                context_format: zeph_config::ContextFormat::default(),
                conversation_id: None,
                tiered_classifier: None,
                tiered_validator: None,
                tiered_config: &tiered_config,
            };
            let result = ContextService::new()
                .inject_semantic_recall_bare(params, &mut window, None)
                .await;

            assert!(
                result.is_ok(),
                "inject_semantic_recall_bare with memory=None must return Ok(())"
            );
            assert!(
                window.messages.is_empty(),
                "no recall message must be injected when memory is None"
            );
        }
    }
}