1use zeph_context::budget::ContextBudget;
7use zeph_llm::LlmProvider;
8use zeph_llm::provider::{MessagePart, Role};
9
10use crate::error::ContextError;
11use crate::helpers::{
12 CODE_CONTEXT_PREFIX, CORRECTIONS_PREFIX, CROSS_SESSION_PREFIX, DOCUMENT_RAG_PREFIX,
13 GRAPH_FACTS_PREFIX, LSP_NOTE_PREFIX, PERSONA_PREFIX, REASONING_PREFIX, RECALL_PREFIX,
14 SESSION_DIGEST_PREFIX, SUMMARY_PREFIX, TRAJECTORY_PREFIX, TREE_MEMORY_PREFIX,
15};
16use crate::state::{
17 ContextAssemblyView, ContextDelta, ContextSummarizationView, MessageWindowView,
18 ProviderHandles, StatusSink,
19};
20
/// Stateless service that assembles, trims, and compacts the agent's context window.
///
/// All mutable state lives in the borrowed views (`MessageWindowView`,
/// `ContextAssemblyView`, `ContextSummarizationView`) passed to each method; the
/// service itself carries no fields.
#[derive(Debug, Default)]
pub struct ContextService;
40
41impl ContextService {
42 #[must_use]
46 pub fn new() -> Self {
47 Self
48 }
49
50 pub fn clear_history(&self, window: &mut MessageWindowView<'_>) {
58 let system_prompt = window.messages.first().cloned();
59 window.messages.clear();
60 if let Some(sp) = system_prompt {
61 window.messages.push(sp);
62 }
63 window.completed_tool_ids.clear();
64 recompute_prompt_tokens(window);
65 }
66
67 pub fn remove_recall_messages(&self, window: &mut MessageWindowView<'_>) {
69 remove_by_part_or_prefix(window.messages, RECALL_PREFIX, |p| {
70 matches!(p, MessagePart::Recall { .. })
71 });
72 }
73
74 pub fn remove_correction_messages(&self, window: &mut MessageWindowView<'_>) {
76 remove_by_prefix(window.messages, Role::System, CORRECTIONS_PREFIX);
77 }
78
79 pub fn remove_graph_facts_messages(&self, window: &mut MessageWindowView<'_>) {
81 remove_by_prefix(window.messages, Role::System, GRAPH_FACTS_PREFIX);
82 }
83
84 pub fn remove_persona_facts_messages(&self, window: &mut MessageWindowView<'_>) {
86 remove_by_prefix(window.messages, Role::System, PERSONA_PREFIX);
87 }
88
89 pub fn remove_trajectory_hints_messages(&self, window: &mut MessageWindowView<'_>) {
91 remove_by_prefix(window.messages, Role::System, TRAJECTORY_PREFIX);
92 }
93
94 pub fn remove_tree_memory_messages(&self, window: &mut MessageWindowView<'_>) {
96 remove_by_prefix(window.messages, Role::System, TREE_MEMORY_PREFIX);
97 }
98
99 pub fn remove_reasoning_strategies_messages(&self, window: &mut MessageWindowView<'_>) {
101 remove_by_prefix(window.messages, Role::System, REASONING_PREFIX);
102 }
103
104 pub fn remove_lsp_messages(&self, window: &mut MessageWindowView<'_>) {
109 remove_by_prefix(window.messages, Role::System, LSP_NOTE_PREFIX);
110 }
111
112 pub fn remove_code_context_messages(&self, window: &mut MessageWindowView<'_>) {
114 remove_by_part_or_prefix(window.messages, CODE_CONTEXT_PREFIX, |p| {
115 matches!(p, MessagePart::CodeContext { .. })
116 });
117 }
118
119 pub fn remove_summary_messages(&self, window: &mut MessageWindowView<'_>) {
121 remove_by_part_or_prefix(window.messages, SUMMARY_PREFIX, |p| {
122 matches!(p, MessagePart::Summary { .. })
123 });
124 }
125
126 pub fn remove_cross_session_messages(&self, window: &mut MessageWindowView<'_>) {
128 remove_by_part_or_prefix(window.messages, CROSS_SESSION_PREFIX, |p| {
129 matches!(p, MessagePart::CrossSession { .. })
130 });
131 }
132
133 pub fn remove_session_digest_message(&self, window: &mut MessageWindowView<'_>) {
135 remove_by_prefix(window.messages, Role::User, SESSION_DIGEST_PREFIX);
136 }
137
138 pub fn remove_document_rag_messages(&self, window: &mut MessageWindowView<'_>) {
140 remove_by_prefix(window.messages, Role::System, DOCUMENT_RAG_PREFIX);
141 }
142
143 pub fn trim_messages_to_budget(&self, window: &mut MessageWindowView<'_>, token_budget: usize) {
151 if token_budget == 0 {
152 return;
153 }
154
155 let history_start = window
157 .messages
158 .iter()
159 .position(|m| m.role != Role::System)
160 .unwrap_or(window.messages.len());
161
162 if history_start >= window.messages.len() {
163 return;
164 }
165
166 let mut total = 0usize;
167 let mut keep_from = window.messages.len();
168
169 for i in (history_start..window.messages.len()).rev() {
170 let msg_tokens = window
171 .token_counter
172 .count_message_tokens(&window.messages[i]);
173 if total + msg_tokens > token_budget {
174 break;
175 }
176 total += msg_tokens;
177 keep_from = i;
178 }
179
180 if keep_from > history_start {
181 let removed = keep_from - history_start;
182 window.messages.drain(history_start..keep_from);
183 recompute_prompt_tokens(window);
184 tracing::info!(
185 removed,
186 token_budget,
187 "trimmed messages to fit context budget"
188 );
189 }
190 }
191
192 pub async fn inject_semantic_recall(
204 &self,
205 query: &str,
206 token_budget: usize,
207 window: &mut MessageWindowView<'_>,
208 view: &ContextAssemblyView<'_>,
209 ) -> Result<(), ContextError> {
210 self.remove_recall_messages(window);
211
212 let (msg, _score) = crate::helpers::fetch_semantic_recall_raw(
213 view.memory.as_deref(),
214 view.recall_limit,
215 view.context_format,
216 query,
217 token_budget,
218 &view.token_counter,
219 None,
220 None,
221 )
222 .await?;
223
224 if let Some(msg) = msg
225 && window.messages.len() > 1
226 {
227 window.messages.insert(1, msg);
228 }
229
230 Ok(())
231 }
232
233 pub async fn inject_cross_session_context(
242 &self,
243 query: &str,
244 token_budget: usize,
245 window: &mut MessageWindowView<'_>,
246 view: &ContextAssemblyView<'_>,
247 ) -> Result<(), ContextError> {
248 self.remove_cross_session_messages(window);
249
250 if let Some(msg) = crate::helpers::fetch_cross_session_raw(
251 view.memory.as_deref(),
252 view.conversation_id,
253 view.cross_session_score_threshold,
254 query,
255 token_budget,
256 &view.token_counter,
257 )
258 .await?
259 && window.messages.len() > 1
260 {
261 window.messages.insert(1, msg);
262 tracing::debug!("injected cross-session context");
263 }
264
265 Ok(())
266 }
267
268 pub async fn inject_summaries(
277 &self,
278 token_budget: usize,
279 window: &mut MessageWindowView<'_>,
280 view: &ContextAssemblyView<'_>,
281 ) -> Result<(), ContextError> {
282 self.remove_summary_messages(window);
283
284 if let Some(msg) = crate::helpers::fetch_summaries_raw(
285 view.memory.as_deref(),
286 view.conversation_id,
287 token_budget,
288 &view.token_counter,
289 )
290 .await?
291 && window.messages.len() > 1
292 {
293 window.messages.insert(1, msg);
294 tracing::debug!("injected summaries into context");
295 }
296
297 Ok(())
298 }
299
300 pub async fn disambiguate_skills(
305 &self,
306 query: &str,
307 all_meta: &[&zeph_skills::loader::SkillMeta],
308 scored: &[zeph_skills::ScoredMatch],
309 providers: &ProviderHandles,
310 ) -> Option<Vec<usize>> {
311 use std::fmt::Write as _;
312
313 let mut candidates = String::new();
314 for sm in scored {
315 if let Some(meta) = all_meta.get(sm.index) {
316 let _ = writeln!(
317 candidates,
318 "- {} (score: {:.3}): {}",
319 meta.name, sm.score, meta.description
320 );
321 }
322 }
323
324 let prompt = format!(
325 "The user said: \"{query}\"\n\n\
326 These skills matched with similar scores:\n{candidates}\n\
327 Which skill best matches the user's intent? \
328 Return the skill_name, your confidence (0-1), and any extracted parameters."
329 );
330
331 let messages = vec![zeph_llm::provider::Message::from_legacy(
332 zeph_llm::provider::Role::User,
333 prompt,
334 )];
335 match providers
336 .primary
337 .chat_typed::<zeph_skills::IntentClassification>(&messages)
338 .await
339 {
340 Ok(classification) => {
341 tracing::info!(
342 skill = %classification.skill_name,
343 confidence = classification.confidence,
344 "disambiguation selected skill"
345 );
346 let mut indices: Vec<usize> = scored.iter().map(|s| s.index).collect();
347 if let Some(pos) = indices.iter().position(|&i| {
348 all_meta
349 .get(i)
350 .is_some_and(|m| m.name == classification.skill_name)
351 }) {
352 indices.swap(0, pos);
353 }
354 Some(indices)
355 }
356 Err(e) => {
357 tracing::warn!("disambiguation failed, using original order: {e:#}");
358 None
359 }
360 }
361 }
362
363 #[allow(clippy::too_many_lines)] pub async fn prepare_context(
376 &self,
377 query: &str,
378 window: &mut MessageWindowView<'_>,
379 view: &mut ContextAssemblyView<'_>,
380 _providers: &ProviderHandles,
381 ) -> Result<ContextDelta, ContextError> {
382 if view.context_manager.budget.is_none() {
383 return Ok(ContextDelta::default());
384 }
385
386 self.remove_session_digest_message(window);
388 self.remove_summary_messages(window);
389 self.remove_cross_session_messages(window);
390 self.remove_recall_messages(window);
391 self.remove_document_rag_messages(window);
392 self.remove_correction_messages(window);
393 self.remove_code_context_messages(window);
394 self.remove_graph_facts_messages(window);
395 self.remove_persona_facts_messages(window);
396 self.remove_trajectory_hints_messages(window);
397 self.remove_tree_memory_messages(window);
398 if view.reasoning_config.enabled {
399 self.remove_reasoning_strategies_messages(window);
400 }
401
402 if let Some(explorer) = view.proactive_explorer.clone()
404 && let Some(domain) = explorer.classify(query)
405 {
406 let already_known = {
407 let registry_guard = view.skill_registry.read();
408 explorer.has_knowledge(®istry_guard, &domain)
409 };
410 let excluded = explorer.is_excluded(&domain);
411
412 if !already_known && !excluded {
413 tracing::debug!(domain = %domain.0, query_len = query.len(), "proactive.explore triggered");
414 let timeout_ms = explorer.timeout_ms();
415 let result = tokio::time::timeout(
416 std::time::Duration::from_millis(timeout_ms),
417 explorer.explore(&domain),
418 )
419 .await;
420 match result {
421 Ok(Ok(())) => {
422 view.skill_registry.write().reload(view.skill_paths);
423 tracing::debug!(domain = %domain.0, "proactive.explore complete, registry reloaded");
424 }
425 Ok(Err(e)) => {
426 tracing::warn!(domain = %domain.0, error = %e, "proactive exploration failed");
427 }
428 Err(_) => {
429 tracing::warn!(domain = %domain.0, timeout_ms, "proactive exploration timed out");
430 }
431 }
432 }
433 }
434
435 let active_levels: &'static [zeph_memory::compression::CompressionLevel] =
437 if let Some(ref budget) = view.context_manager.budget {
438 let used = view.cached_prompt_tokens;
439 let max = budget.max_tokens();
440 #[allow(clippy::cast_precision_loss)]
441 let remaining_ratio = if max == 0 {
442 1.0_f32
443 } else {
444 1.0 - (used as f32 / max as f32).clamp(0.0, 1.0)
445 };
446 let levels =
447 zeph_memory::compression::RetrievalPolicy::default().select(remaining_ratio);
448 tracing::debug!(
449 remaining_ratio,
450 active_levels = ?levels,
451 "compression_spectrum: retrieval policy selected"
452 );
453 levels
454 } else {
455 &[]
456 };
457
458 let memory_backend: Option<std::sync::Arc<dyn zeph_common::memory::ContextMemoryBackend>> =
459 view.memory.clone().map(
460 |m| -> std::sync::Arc<dyn zeph_common::memory::ContextMemoryBackend> {
461 std::sync::Arc::new(crate::memory_backend::SemanticMemoryBackend::new(m))
462 },
463 );
464
465 let memory_view = zeph_context::input::ContextMemoryView {
466 memory: memory_backend,
467 conversation_id: view.conversation_id.map(|c| c.0),
468 recall_limit: view.recall_limit,
469 cross_session_score_threshold: view.cross_session_score_threshold,
470 context_strategy: view.context_strategy,
471 crossover_turn_threshold: view.crossover_turn_threshold,
472 cached_session_digest: view.cached_session_digest.clone(),
473 graph_config: view.graph_config.clone(),
474 document_config: view.document_config.clone(),
475 persona_config: view.persona_config.clone(),
476 trajectory_config: view.trajectory_config.clone(),
477 reasoning_config: view.reasoning_config.clone(),
478 memcot_config: view.memcot_config.clone(),
479 memcot_state: view.memcot_state.clone(),
480 tree_config: view.tree_config.clone(),
481 };
482
483 #[cfg(feature = "index")]
484 let index_access = view.index;
485 #[cfg(not(feature = "index"))]
486 let index_access: Option<&dyn zeph_context::input::IndexAccess> = None;
487
488 let router = crate::memory_backend::build_memory_router(view.context_manager);
489
490 let input = zeph_context::input::ContextAssemblyInput {
491 memory: &memory_view,
492 context_manager: view.context_manager,
493 token_counter: &*view.token_counter,
494 skills_prompt: view.last_skills_prompt,
495 index: index_access,
496 correction_config: view.correction_config,
497 sidequest_turn_counter: view.sidequest_turn_counter,
498 messages: window.messages,
499 query,
500 scrub: view.scrub,
501 active_levels,
502 router,
503 };
504
505 let prepared = zeph_context::assembler::ContextAssembler::gather(&input).await?;
506
507 let delta = self.apply_prepared_context(window, view, prepared).await;
508 Ok(delta)
509 }
510
    /// Applies a gathered `PreparedContext` to the message window and returns the
    /// sanitized code context as a [`ContextDelta`].
    ///
    /// In order:
    /// - stores `prepared.recall_confidence` into `view.last_recall_confidence`;
    /// - when `memory_first` is set, drops history between index 1 and the tail length
    ///   returned by `memory_first_keep_tail`;
    /// - passes each available memory message through `sanitize_memory_message` and
    ///   inserts it at index 1 (only when the window holds more than one message).
    ///   Because every insert targets index 1, later inserts end up *before* earlier ones;
    /// - sanitizes code-RAG text as a `ToolResult` and returns it in the delta instead of
    ///   inserting it into the window;
    /// - trims history to `recent_history_budget` unless `memory_first` is set;
    /// - injects the cached session digest as a `User` message when digests are enabled;
    /// - scrubs credentials from every non-`System` message when `redact_credentials` is
    ///   set;
    /// - recomputes the cached prompt-token count.
    #[allow(clippy::too_many_lines)]
    async fn apply_prepared_context(
        &self,
        window: &mut MessageWindowView<'_>,
        view: &mut ContextAssemblyView<'_>,
        prepared: zeph_context::assembler::PreparedContext,
    ) -> ContextDelta {
        use std::borrow::Cow;
        use zeph_llm::provider::{Message, MessageMetadata, Role};
        use zeph_sanitizer::{ContentSource, ContentSourceKind, MemorySourceHint};

        *view.last_recall_confidence = prepared.recall_confidence;

        if prepared.memory_first {
            // Keep index 0 (the leading message) and only the last `keep_tail` messages.
            let history_start = 1usize;
            let len = window.messages.len();
            let keep_tail =
                zeph_context::assembler::memory_first_keep_tail(window.messages, history_start);
            if len > history_start + keep_tail {
                window.messages.drain(history_start..len - keep_tail);
                recompute_prompt_tokens(window);
                tracing::debug!(
                    strategy = "memory_first",
                    keep_tail,
                    "dropped conversation history, kept last {keep_tail} messages"
                );
            }
        }

        // Each insert below targets index 1, so the final order is the reverse of the
        // order of these blocks (reasoning hints end up first after index 0).
        if let Some(msg) = prepared.graph_facts.filter(|_| window.messages.len() > 1) {
            let sanitized = self
                .sanitize_memory_message(msg, MemorySourceHint::ExternalContent, view)
                .await;
            window.messages.insert(1, sanitized);
            tracing::debug!("injected knowledge graph facts into context");
        }
        if let Some(msg) = prepared.doc_rag.filter(|_| window.messages.len() > 1) {
            let sanitized = self
                .sanitize_memory_message(msg, MemorySourceHint::ExternalContent, view)
                .await;
            window.messages.insert(1, sanitized);
            tracing::debug!("injected document RAG context");
        }
        if let Some(msg) = prepared.corrections.filter(|_| window.messages.len() > 1) {
            let sanitized = self
                .sanitize_memory_message(msg, MemorySourceHint::ConversationHistory, view)
                .await;
            window.messages.insert(1, sanitized);
            tracing::debug!("injected past corrections into context");
        }
        if let Some(msg) = prepared.recall.filter(|_| window.messages.len() > 1) {
            let sanitized = self
                .sanitize_memory_message(msg, MemorySourceHint::ConversationHistory, view)
                .await;
            window.messages.insert(1, sanitized);
        }
        if let Some(msg) = prepared.cross_session.filter(|_| window.messages.len() > 1) {
            let sanitized = self
                .sanitize_memory_message(msg, MemorySourceHint::LlmSummary, view)
                .await;
            window.messages.insert(1, sanitized);
        }
        if let Some(msg) = prepared.summaries.filter(|_| window.messages.len() > 1) {
            let sanitized = self
                .sanitize_memory_message(msg, MemorySourceHint::LlmSummary, view)
                .await;
            window.messages.insert(1, sanitized);
            tracing::debug!("injected summaries into context");
        }
        if let Some(msg) = prepared.persona_facts.filter(|_| window.messages.len() > 1) {
            let sanitized = self
                .sanitize_memory_message(msg, MemorySourceHint::ExternalContent, view)
                .await;
            window.messages.insert(1, sanitized);
            tracing::debug!("injected persona facts into context");
        }
        if let Some(msg) = prepared
            .trajectory_hints
            .filter(|_| window.messages.len() > 1)
        {
            let sanitized = self
                .sanitize_memory_message(msg, MemorySourceHint::ExternalContent, view)
                .await;
            window.messages.insert(1, sanitized);
            tracing::debug!("injected trajectory hints into context");
        }
        if let Some(msg) = prepared.tree_memory.filter(|_| window.messages.len() > 1) {
            let sanitized = self
                .sanitize_memory_message(msg, MemorySourceHint::ExternalContent, view)
                .await;
            window.messages.insert(1, sanitized);
            tracing::debug!("injected tree memory summary into context");
        }
        if let Some(msg) = prepared
            .reasoning_hints
            .filter(|_| window.messages.len() > 1)
        {
            let sanitized = self
                .sanitize_memory_message(msg, MemorySourceHint::ExternalContent, view)
                .await;
            window.messages.insert(1, sanitized);
            tracing::debug!("injected reasoning strategies into context");
        }

        // Code context is sanitized as a tool result and handed back to the caller in the
        // delta rather than being inserted into the window here.
        let code_context = if let Some(text) = prepared.code_context {
            let sanitized = view
                .sanitizer
                .sanitize(&text, ContentSource::new(ContentSourceKind::ToolResult));
            view.metrics.sanitizer_runs += 1;
            if !sanitized.injection_flags.is_empty() {
                tracing::warn!(
                    flags = sanitized.injection_flags.len(),
                    "injection patterns detected in code RAG context"
                );
                view.metrics.sanitizer_injection_flags += sanitized.injection_flags.len() as u64;
                let detail = sanitized
                    .injection_flags
                    .first()
                    .map_or_else(String::new, |f| {
                        format!("Detected pattern: {}", f.pattern_name)
                    });
                view.security_events.push(
                    zeph_common::SecurityEventCategory::InjectionFlag,
                    "code_rag",
                    detail,
                );
            }
            if sanitized.was_truncated {
                view.metrics.sanitizer_truncations += 1;
                view.security_events.push(
                    zeph_common::SecurityEventCategory::Truncation,
                    "code_rag",
                    "Content truncated to max_content_size".to_string(),
                );
            }
            Some(sanitized.body)
        } else {
            None
        };

        // memory_first already cut history above, so budget trimming only runs otherwise.
        if !prepared.memory_first {
            self.trim_messages_to_budget(window, prepared.recent_history_budget);
        }

        // The digest is inserted after trimming, so trimming never removes it.
        if view.digest_enabled
            && let Some((digest_text, _)) = view
                .cached_session_digest
                .clone()
                .filter(|_| window.messages.len() > 1)
        {
            let digest_msg = Message {
                role: Role::User,
                content: format!("{}{digest_text}", crate::helpers::SESSION_DIGEST_PREFIX),
                parts: vec![],
                metadata: MessageMetadata::default(),
            };
            let sanitized = self
                .sanitize_memory_message(digest_msg, MemorySourceHint::LlmSummary, view)
                .await;
            window.messages.insert(1, sanitized);
            tracing::debug!("injected session digest into context");
        }

        if view.redact_credentials {
            // System messages are skipped; the scrubber only allocates (Cow::Owned) when it
            // actually changed something.
            for msg in &mut *window.messages {
                if msg.role == Role::System {
                    continue;
                }
                if let Cow::Owned(s) = (view.scrub)(&msg.content) {
                    msg.content = s;
                }
            }
        }

        recompute_prompt_tokens(window);

        ContextDelta { code_context }
    }
702
    /// Runs a retrieved memory message through the content sanitizer and returns it with
    /// its content replaced.
    ///
    /// The content is sanitized as `MemoryRetrieval` content with `hint` attached via
    /// `with_memory_hint`. Sanitizer runs, injection flags, and truncations are counted
    /// in `view.metrics` and reported as security events under `"memory_retrieval"`.
    ///
    /// When the sanitizer is enabled and a quarantine summarizer is configured that
    /// quarantines `MemoryRetrieval` content, the content is replaced by the extracted
    /// facts (delimiter-escaped, then spotlighted). If extraction fails, the failure is
    /// logged and recorded, and the plain sanitized body is used instead.
    async fn sanitize_memory_message(
        &self,
        mut msg: zeph_llm::provider::Message,
        hint: zeph_sanitizer::MemorySourceHint,
        view: &mut ContextAssemblyView<'_>,
    ) -> zeph_llm::provider::Message {
        use zeph_sanitizer::{ContentSource, ContentSourceKind};

        let source = ContentSource::new(ContentSourceKind::MemoryRetrieval).with_memory_hint(hint);
        let sanitized = view.sanitizer.sanitize(&msg.content, source);
        view.metrics.sanitizer_runs += 1;
        if !sanitized.injection_flags.is_empty() {
            tracing::warn!(
                flags = sanitized.injection_flags.len(),
                "injection patterns detected in memory retrieval"
            );
            view.metrics.sanitizer_injection_flags += sanitized.injection_flags.len() as u64;
            // Only the first detected pattern is named in the security event.
            let detail = sanitized
                .injection_flags
                .first()
                .map_or_else(String::new, |f| {
                    format!("Detected pattern: {}", f.pattern_name)
                });
            view.security_events.push(
                zeph_common::SecurityEventCategory::InjectionFlag,
                "memory_retrieval",
                detail,
            );
        }
        if sanitized.was_truncated {
            view.metrics.sanitizer_truncations += 1;
            view.security_events.push(
                zeph_common::SecurityEventCategory::Truncation,
                "memory_retrieval",
                "Content truncated to max_content_size".to_string(),
            );
        }

        if view.sanitizer.is_enabled()
            && let Some(qs) = view.quarantine_summarizer
            && qs.should_quarantine(ContentSourceKind::MemoryRetrieval)
        {
            match qs.extract_facts(&sanitized, view.sanitizer).await {
                Ok((facts, flags)) => {
                    view.metrics.quarantine_invocations += 1;
                    view.security_events.push(
                        zeph_common::SecurityEventCategory::Quarantine,
                        "memory_retrieval",
                        "Content quarantined, facts extracted".to_string(),
                    );
                    // Escape delimiter tags in the extracted facts before spotlighting them.
                    let escaped = zeph_sanitizer::ContentSanitizer::escape_delimiter_tags(&facts);
                    msg.content = zeph_sanitizer::ContentSanitizer::apply_spotlight(
                        &escaped,
                        &sanitized.source,
                        &flags,
                    );
                    return msg;
                }
                Err(e) => {
                    // Best-effort: fall through to the plain sanitized body below.
                    tracing::warn!(
                        error = %e,
                        "quarantine failed for memory retrieval, using original sanitized content"
                    );
                    view.metrics.quarantine_failures += 1;
                    view.security_events.push(
                        zeph_common::SecurityEventCategory::Quarantine,
                        "memory_retrieval",
                        format!("Quarantine failed: {e}"),
                    );
                }
            }
        }

        msg.content = sanitized.body;
        msg
    }
789
790 pub async fn reset_conversation(
800 &self,
801 window: &mut MessageWindowView<'_>,
802 _view: &mut ContextAssemblyView<'_>,
803 ) -> Result<(), ContextError> {
804 self.clear_history(window);
805 Ok(())
806 }
807
    /// Per-turn compaction driver: decides whether to run soft or hard compaction.
    ///
    /// In order, it:
    /// - increments the turns-since-last-hard-compaction counter when it is tracked;
    /// - warns once and returns when compaction is `Exhausted`;
    /// - with server-side compaction active, returns unless a budget is set and the cached
    ///   prompt tokens are at or above 95% of it;
    /// - returns if compaction already ran this turn;
    /// - ticks down any cooldown, returning the state to `Ready` when it reaches zero;
    /// - dispatches on `compaction_tier`.
    ///
    /// `_providers` is currently unused.
    ///
    /// # Errors
    ///
    /// Propagates errors from hard compaction.
    #[allow(
        clippy::cast_precision_loss,
        clippy::cast_possible_truncation,
        clippy::cast_sign_loss
    )]
    pub async fn maybe_compact(
        &self,
        summ: &mut ContextSummarizationView<'_>,
        _providers: &ProviderHandles,
        status: &(impl StatusSink + ?Sized),
    ) -> Result<(), ContextError> {
        use zeph_context::manager::{CompactionState, CompactionTier};

        if let Some(ref mut count) = summ.context_manager.turns_since_last_hard_compaction {
            *count += 1;
        }

        // Log the exhausted state only once per exhaustion.
        if let CompactionState::Exhausted { ref mut warned } = summ.context_manager.compaction
            && !*warned
        {
            *warned = true;
            tracing::warn!("compaction exhausted: context budget too tight for this session");
        }
        if summ.context_manager.compaction.is_exhausted() {
            return Ok(());
        }

        if summ.server_compaction_active {
            let budget = summ
                .context_manager
                .budget
                .as_ref()
                .map_or(0, ContextBudget::max_tokens);
            if budget > 0 {
                let fallback = (budget * 95 / 100) as u64;
                if *summ.cached_prompt_tokens < fallback {
                    return Ok(());
                }
                tracing::warn!(
                    "server compaction active but context at 95%+ — falling back to client-side"
                );
            } else {
                return Ok(());
            }
        }

        if summ.context_manager.compaction.is_compacted_this_turn() {
            return Ok(());
        }

        // `in_cooldown` is sampled before the decrement, so a cooldown that reaches zero on
        // this turn still blocks hard compaction for this turn.
        let in_cooldown = summ.context_manager.compaction.cooldown_remaining() > 0;
        if in_cooldown
            && let CompactionState::Cooling {
                ref mut turns_remaining,
            } = summ.context_manager.compaction
        {
            *turns_remaining -= 1;
            if *turns_remaining == 0 {
                summ.context_manager.compaction = CompactionState::Ready;
            }
        }

        match summ
            .context_manager
            .compaction_tier(*summ.cached_prompt_tokens)
        {
            CompactionTier::None => Ok(()),
            CompactionTier::Soft => {
                self.do_soft_compaction(summ, status).await;
                Ok(())
            }
            CompactionTier::Hard => self.do_hard_compaction(summ, status, in_cooldown).await,
        }
    }
903
904 #[allow(
909 clippy::cast_precision_loss,
910 clippy::cast_possible_truncation,
911 clippy::cast_sign_loss
912 )]
913 async fn do_soft_compaction(
914 &self,
915 summ: &mut ContextSummarizationView<'_>,
916 status: &(impl StatusSink + ?Sized),
917 ) {
918 status.send_status("soft compacting context...").await;
919
920 match &summ.context_manager.compression.pruning_strategy {
922 zeph_config::PruningStrategy::Subgoal | zeph_config::PruningStrategy::SubgoalMig => {
923 crate::summarization::scheduling::maybe_refresh_subgoal(summ);
924 }
925 _ => crate::summarization::scheduling::maybe_refresh_task_goal(summ),
926 }
927
928 let applied = crate::summarization::deferred::apply_deferred_summaries(summ);
930
931 if applied > 0
933 && summ
934 .context_manager
935 .compression
936 .pruning_strategy
937 .is_subgoal()
938 {
939 summ.subgoal_registry
940 .rebuild_after_compaction(summ.messages, 0);
941 }
942
943 let budget = summ
945 .context_manager
946 .budget
947 .as_ref()
948 .map_or(0, ContextBudget::max_tokens);
949 let soft_threshold =
950 (budget as f32 * summ.context_manager.soft_compaction_threshold) as usize;
951 let cached = usize::try_from(*summ.cached_prompt_tokens).unwrap_or(usize::MAX);
952 let min_to_free = cached.saturating_sub(soft_threshold);
953 if min_to_free > 0 {
954 crate::summarization::pruning::prune_tool_outputs(summ, min_to_free);
955 }
956
957 status.send_status("").await;
958 tracing::info!(
959 cached_tokens = *summ.cached_prompt_tokens,
960 soft_threshold,
961 "soft compaction complete"
962 );
963 }
964
965 #[allow(
967 clippy::cast_precision_loss,
968 clippy::cast_possible_truncation,
969 clippy::cast_sign_loss
970 )]
971 async fn do_hard_compaction(
972 &self,
973 summ: &mut ContextSummarizationView<'_>,
974 status: &(impl StatusSink + ?Sized),
975 in_cooldown: bool,
976 ) -> Result<(), ContextError> {
977 use zeph_context::manager::CompactionState;
978
979 let turns_since_last = summ
981 .context_manager
982 .turns_since_last_hard_compaction
983 .map(|t| u32::try_from(t).unwrap_or(u32::MAX));
984 summ.context_manager.turns_since_last_hard_compaction = Some(0);
985 if let Some(metrics) = summ.metrics {
986 metrics.record_hard_compaction(turns_since_last);
987 }
988
989 if in_cooldown {
990 tracing::debug!(
991 turns_remaining = summ.context_manager.compaction.cooldown_remaining(),
992 "hard compaction skipped: cooldown active"
993 );
994 return Ok(());
995 }
996
997 let budget = summ
998 .context_manager
999 .budget
1000 .as_ref()
1001 .map_or(0, ContextBudget::max_tokens);
1002 let hard_threshold =
1003 (budget as f32 * summ.context_manager.hard_compaction_threshold) as usize;
1004 let cached = usize::try_from(*summ.cached_prompt_tokens).unwrap_or(usize::MAX);
1005 let min_to_free = cached.saturating_sub(hard_threshold);
1006
1007 status.send_status("compacting context...").await;
1008
1009 crate::summarization::deferred::apply_deferred_summaries(summ);
1011
1012 let freed = crate::summarization::pruning::prune_tool_outputs(summ, min_to_free);
1014 if freed >= min_to_free {
1015 tracing::info!(freed, "hard compaction: pruning sufficient");
1016 summ.context_manager.compaction = CompactionState::CompactedThisTurn {
1017 cooldown: summ.context_manager.compaction_cooldown_turns,
1018 };
1019 if let Err(e) = crate::summarization::deferred::flush_deferred_summaries(summ).await {
1020 tracing::warn!(%e, "flush_deferred_summaries failed after hard compaction");
1021 }
1022 status.send_status("").await;
1023 return Ok(());
1024 }
1025
1026 let preserve_tail = summ.context_manager.compaction_preserve_tail;
1028 let compactable = summ.messages.len().saturating_sub(preserve_tail + 1);
1029 if compactable <= 1 {
1030 tracing::warn!(
1031 compactable,
1032 "hard compaction: too few messages, marking exhausted"
1033 );
1034 summ.context_manager.compaction = CompactionState::Exhausted { warned: false };
1035 status.send_status("").await;
1036 return Ok(());
1037 }
1038
1039 tracing::info!(
1041 min_to_free,
1042 "hard compaction: falling back to LLM summarization"
1043 );
1044 let tokens_before = *summ.cached_prompt_tokens;
1045 let outcome = crate::summarization::compaction::compact_context(summ, None).await?;
1046
1047 let freed_tokens = tokens_before.saturating_sub(*summ.cached_prompt_tokens);
1048
1049 if !outcome.is_compacted() || freed_tokens == 0 {
1050 tracing::warn!("hard compaction: no net reduction, marking exhausted");
1051 summ.context_manager.compaction = CompactionState::Exhausted { warned: false };
1052 status.send_status("").await;
1053 return Ok(());
1054 }
1055
1056 if matches!(
1057 summ.context_manager
1058 .compaction_tier(*summ.cached_prompt_tokens),
1059 zeph_context::manager::CompactionTier::Hard
1060 ) {
1061 tracing::warn!(
1062 freed_tokens,
1063 "hard compaction: still above hard threshold after compaction, marking exhausted"
1064 );
1065 summ.context_manager.compaction = CompactionState::Exhausted { warned: false };
1066 status.send_status("").await;
1067 return Ok(());
1068 }
1069
1070 summ.context_manager.compaction = CompactionState::CompactedThisTurn {
1071 cooldown: summ.context_manager.compaction_cooldown_turns,
1072 };
1073
1074 if tokens_before > *summ.cached_prompt_tokens {
1075 tracing::info!(
1076 tokens_before,
1077 tokens_after = *summ.cached_prompt_tokens,
1078 saved = freed_tokens,
1079 "context compaction complete"
1080 );
1081 }
1082
1083 status.send_status("").await;
1084 Ok(())
1085 }
1086
1087 pub async fn maybe_summarize_tool_pair(
1093 &self,
1094 summ: &mut ContextSummarizationView<'_>,
1095 providers: &ProviderHandles,
1096 ) {
1097 crate::summarization::deferred::maybe_summarize_tool_pair(
1098 summ,
1099 providers,
1100 &TxStatusSink(summ.status_tx.clone()),
1101 )
1102 .await;
1103 }
1104
1105 #[must_use]
1110 pub fn apply_deferred_summaries(&self, summ: &mut ContextSummarizationView<'_>) -> usize {
1111 crate::summarization::deferred::apply_deferred_summaries(summ)
1112 }
1113
1114 pub async fn flush_deferred_summaries(&self, summ: &mut ContextSummarizationView<'_>) {
1119 if let Err(e) = crate::summarization::deferred::flush_deferred_summaries(summ).await {
1120 tracing::warn!(%e, "flush_deferred_summaries failed");
1121 }
1122 }
1123
1124 pub fn maybe_apply_deferred_summaries(&self, summ: &mut ContextSummarizationView<'_>) {
1130 crate::summarization::deferred::maybe_apply_deferred_summaries(summ);
1131 }
1132
1133 pub async fn compact_context(
1148 &self,
1149 summ: &mut ContextSummarizationView<'_>,
1150 max_summary_tokens: Option<usize>,
1151 ) -> Result<crate::state::CompactionOutcome, crate::error::ContextError> {
1152 crate::summarization::compaction::compact_context(summ, max_summary_tokens).await
1153 }
1154
1155 pub fn maybe_soft_compact_mid_iteration(&self, summ: &mut ContextSummarizationView<'_>) {
1161 crate::summarization::scheduling::maybe_soft_compact_mid_iteration(summ);
1162 }
1163
1164 pub async fn maybe_proactive_compress(
1170 &self,
1171 summ: &mut ContextSummarizationView<'_>,
1172 _providers: &ProviderHandles,
1173 status: &(impl StatusSink + ?Sized),
1174 ) {
1175 let Some((_threshold, max_summary_tokens)) = summ
1176 .context_manager
1177 .should_proactively_compress(*summ.cached_prompt_tokens)
1178 else {
1179 return;
1180 };
1181
1182 if summ.server_compaction_active {
1183 let budget = summ
1184 .context_manager
1185 .budget
1186 .as_ref()
1187 .map_or(0, ContextBudget::max_tokens);
1188 if budget > 0 {
1189 let fallback = (budget * 95 / 100) as u64;
1190 if *summ.cached_prompt_tokens <= fallback {
1191 return;
1192 }
1193 tracing::warn!(
1194 cached_prompt_tokens = *summ.cached_prompt_tokens,
1195 fallback_threshold = fallback,
1196 "server compaction active but context at 95%+ — falling back to proactive"
1197 );
1198 } else {
1199 return;
1200 }
1201 }
1202
1203 status.send_status("compressing context...").await;
1204 tracing::info!(
1205 max_summary_tokens,
1206 cached_tokens = *summ.cached_prompt_tokens,
1207 "proactive compression triggered"
1208 );
1209
1210 match crate::summarization::compaction::compact_context(summ, Some(max_summary_tokens))
1211 .await
1212 {
1213 Ok(outcome) if outcome.is_compacted() => {
1214 summ.context_manager.compaction =
1215 zeph_context::manager::CompactionState::CompactedThisTurn { cooldown: 0 };
1216 tracing::info!("proactive compression complete");
1217 }
1218 Ok(_) => {}
1219 Err(e) => tracing::warn!(%e, "proactive compression failed"),
1220 }
1221
1222 status.send_status("").await;
1223 }
1224
1225 pub fn maybe_refresh_task_goal(&self, summ: &mut ContextSummarizationView<'_>) {
1231 crate::summarization::scheduling::maybe_refresh_task_goal(summ);
1232 }
1233
1234 pub fn maybe_refresh_subgoal(&self, summ: &mut ContextSummarizationView<'_>) {
1239 crate::summarization::scheduling::maybe_refresh_subgoal(summ);
1240 }
1241}
1242
/// [`StatusSink`] adapter over an optional unbounded channel sender.
///
/// When the sender is `None`, status updates are silently dropped.
struct TxStatusSink(Option<tokio::sync::mpsc::UnboundedSender<String>>);
1249
1250impl StatusSink for TxStatusSink {
1251 fn send_status(&self, msg: &str) -> impl std::future::Future<Output = ()> + Send + '_ {
1252 if let Some(ref tx) = self.0 {
1253 let _ = tx.send(msg.to_owned());
1254 }
1255 std::future::ready(())
1256 }
1257}
1258
1259pub(crate) fn recompute_prompt_tokens(window: &mut MessageWindowView<'_>) {
1266 *window.cached_prompt_tokens = window
1267 .messages
1268 .iter()
1269 .map(|m| window.token_counter.count_message_tokens(m) as u64)
1270 .sum();
1271}
1272
1273pub(crate) fn remove_by_prefix(
1279 messages: &mut Vec<zeph_llm::provider::Message>,
1280 role: Role,
1281 prefix: &str,
1282) {
1283 messages.retain(|m| m.role != role || !m.content.starts_with(prefix));
1284}
1285
1286pub(crate) fn remove_by_part_or_prefix(
1292 messages: &mut Vec<zeph_llm::provider::Message>,
1293 prefix: &str,
1294 part_matches: impl Fn(&MessagePart) -> bool,
1295) {
1296 messages.retain(|m| {
1297 if m.role != Role::System {
1298 return true;
1299 }
1300 if m.parts.first().is_some_and(&part_matches) {
1301 return false;
1302 }
1303 !m.content.starts_with(prefix)
1304 });
1305}
1306
#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::sync::Arc;

    use zeph_llm::provider::{Message, MessagePart, Role};
    use zeph_memory::TokenCounter;

    use super::*;
    use crate::helpers::{GRAPH_FACTS_PREFIX, RECALL_PREFIX, SUMMARY_PREFIX};
    use crate::state::MessageWindowView;

    fn make_counter() -> Arc<TokenCounter> {
        Arc::new(TokenCounter::default())
    }

    /// Builds a `MessageWindowView` over caller-owned messages, token cache and tool ids.
    ///
    /// The persistence-related fields are backed by `Box::leak`ed values so the caller does
    /// not have to own them. This leaks a few small allocations per call, which is
    /// acceptable in unit tests only.
    fn make_window<'a>(
        messages: &'a mut Vec<Message>,
        cached: &'a mut u64,
        completed: &'a mut HashSet<String>,
    ) -> MessageWindowView<'a> {
        let last = Box::leak(Box::new(None::<i64>));
        let deferred_hide = Box::leak(Box::new(Vec::<i64>::new()));
        let deferred_summ = Box::leak(Box::new(Vec::<String>::new()));
        MessageWindowView {
            messages,
            last_persisted_message_id: last,
            deferred_db_hide_ids: deferred_hide,
            deferred_db_summaries: deferred_summ,
            cached_prompt_tokens: cached,
            token_counter: make_counter(),
            completed_tool_ids: completed,
        }
    }

    fn sys(text: &str) -> Message {
        Message::from_legacy(Role::System, text)
    }

    fn user(text: &str) -> Message {
        Message::from_legacy(Role::User, text)
    }

    fn assistant(text: &str) -> Message {
        Message::from_legacy(Role::Assistant, text)
    }

    #[test]
    fn clear_history_keeps_system_prompt() {
        let mut msgs = vec![sys("system"), user("hello"), assistant("hi")];
        // Sentinel value: clear_history must overwrite it via recompute_prompt_tokens.
        let mut cached = u64::MAX;
        let mut completed = HashSet::new();
        completed.insert("tool_1".to_owned());
        let mut window = make_window(&mut msgs, &mut cached, &mut completed);

        ContextService::new().clear_history(&mut window);

        assert_eq!(window.messages.len(), 1);
        assert_eq!(window.messages[0].content, "system");
        assert!(
            window.completed_tool_ids.is_empty(),
            "completed_tool_ids must be cleared"
        );
        let expected = window.token_counter.count_message_tokens(&window.messages[0]) as u64;
        assert_eq!(
            *window.cached_prompt_tokens, expected,
            "cached_prompt_tokens must be recomputed for the remaining system prompt"
        );
    }

    #[test]
    fn clear_history_empty_messages_is_noop() {
        let mut msgs: Vec<Message> = vec![];
        let mut cached = 42u64;
        let mut completed = HashSet::new();
        let mut window = make_window(&mut msgs, &mut cached, &mut completed);

        ContextService::new().clear_history(&mut window);

        assert!(window.messages.is_empty());
        assert_eq!(
            *window.cached_prompt_tokens, 0,
            "token cache must be reset for an empty window"
        );
    }

    #[test]
    fn remove_recall_messages_removes_by_prefix() {
        let mut msgs = vec![
            sys("system"),
            sys(&format!("{RECALL_PREFIX}some recalled text")),
            user("hello"),
        ];
        let mut cached = 0u64;
        let mut completed = HashSet::new();
        let mut window = make_window(&mut msgs, &mut cached, &mut completed);

        ContextService::new().remove_recall_messages(&mut window);

        assert_eq!(window.messages.len(), 2);
        assert!(
            window
                .messages
                .iter()
                .all(|m| !m.content.starts_with(RECALL_PREFIX))
        );
        // Unrelated messages survive in their original order.
        assert_eq!(window.messages[0].content, "system");
        assert_eq!(window.messages[1].content, "hello");
    }

    #[test]
    fn remove_graph_facts_messages_removes_matching() {
        let mut msgs = vec![
            sys("system"),
            sys(&format!("{GRAPH_FACTS_PREFIX}fact1")),
            user("hello"),
        ];
        let mut cached = 0u64;
        let mut completed = HashSet::new();
        let mut window = make_window(&mut msgs, &mut cached, &mut completed);

        ContextService::new().remove_graph_facts_messages(&mut window);

        assert_eq!(window.messages.len(), 2);
        assert!(
            window
                .messages
                .iter()
                .all(|m| !m.content.starts_with(GRAPH_FACTS_PREFIX)),
            "graph facts message must be removed"
        );
        assert_eq!(window.messages[0].content, "system");
        assert_eq!(window.messages[1].content, "hello");
    }

    #[test]
    fn remove_by_prefix_only_touches_matching_role() {
        let mut msgs = vec![
            sys("system"),
            user(&format!("{GRAPH_FACTS_PREFIX}user text")),
            sys(&format!("{GRAPH_FACTS_PREFIX}fact")),
        ];

        remove_by_prefix(&mut msgs, Role::System, GRAPH_FACTS_PREFIX);

        assert_eq!(msgs.len(), 2);
        assert_eq!(msgs[0].content, "system");
        assert_eq!(
            msgs[1].role,
            Role::User,
            "non-system messages must be kept even if they carry the prefix"
        );
    }

    #[test]
    fn remove_by_part_or_prefix_keeps_non_system_messages() {
        let mut msgs = vec![
            sys("system"),
            user(&format!("{RECALL_PREFIX}user text")),
            sys(&format!("{RECALL_PREFIX}recalled")),
        ];

        remove_by_part_or_prefix(&mut msgs, RECALL_PREFIX, |_| false);

        assert_eq!(msgs.len(), 2);
        assert_eq!(msgs[0].content, "system");
        assert_eq!(
            msgs[1].role,
            Role::User,
            "non-system messages must be kept even if they carry the prefix"
        );
    }

    #[test]
    fn remove_summary_messages_removes_by_part() {
        let mut msgs = vec![
            sys("system"),
            Message::from_parts(
                Role::System,
                vec![MessagePart::Summary {
                    text: format!("{SUMMARY_PREFIX}old summary"),
                }],
            ),
            user("hello"),
        ];
        let mut cached = 0u64;
        let mut completed = HashSet::new();
        let mut window = make_window(&mut msgs, &mut cached, &mut completed);

        ContextService::new().remove_summary_messages(&mut window);

        assert_eq!(window.messages.len(), 2);
    }

    #[test]
    fn trim_messages_to_budget_zero_is_noop() {
        let mut msgs = vec![sys("system"), user("a"), assistant("b"), user("c")];
        let original_len = msgs.len();
        let mut cached = 0u64;
        let mut completed = HashSet::new();
        let mut window = make_window(&mut msgs, &mut cached, &mut completed);

        ContextService::new().trim_messages_to_budget(&mut window, 0);

        assert_eq!(window.messages.len(), original_len);
    }

    #[test]
    fn trim_messages_to_budget_keeps_recent() {
        let mut msgs = vec![
            sys("system"),
            user("message 1"),
            assistant("reply 1"),
            user("message 2"),
        ];
        let mut cached = 0u64;
        let mut completed = HashSet::new();
        let mut window = make_window(&mut msgs, &mut cached, &mut completed);

        // A one-token budget cannot fit all messages, so trimming must kick in.
        ContextService::new().trim_messages_to_budget(&mut window, 1);

        assert!(
            window.messages.len() < 4,
            "trim should remove some messages"
        );
        assert_eq!(
            window.messages[0].role,
            Role::System,
            "system prompt must survive trim"
        );
    }
}