1use zeph_context::budget::ContextBudget;
7use zeph_llm::LlmProvider;
8use zeph_llm::provider::{MessagePart, Role};
9
10use crate::error::ContextError;
11use crate::helpers::{
12 CODE_CONTEXT_PREFIX, CORRECTIONS_PREFIX, CROSS_SESSION_PREFIX, DOCUMENT_RAG_PREFIX,
13 GRAPH_FACTS_PREFIX, LSP_NOTE_PREFIX, PERSONA_PREFIX, REASONING_PREFIX, RECALL_PREFIX,
14 SESSION_DIGEST_PREFIX, SUMMARY_PREFIX, TRAJECTORY_PREFIX, TREE_MEMORY_PREFIX,
15};
16use crate::state::{
17 ContextAssemblyView, ContextDelta, ContextSummarizationView, MessageWindowView,
18 ProviderHandles, StatusSink,
19};
20
/// Stateless service that assembles, trims, and compacts the agent's context window.
///
/// The type carries no fields: every operation receives the state it works on through
/// the view structs passed to its methods.
#[derive(Default, Debug)]
pub struct ContextService;
40
41impl ContextService {
42 #[must_use]
46 pub fn new() -> Self {
47 Self
48 }
49
50 pub fn clear_history(&self, window: &mut MessageWindowView<'_>) {
58 let system_prompt = window.messages.first().cloned();
59 window.messages.clear();
60 if let Some(sp) = system_prompt {
61 window.messages.push(sp);
62 }
63 window.completed_tool_ids.clear();
64 recompute_prompt_tokens(window);
65 }
66
67 pub fn remove_recall_messages(&self, window: &mut MessageWindowView<'_>) {
69 remove_by_part_or_prefix(window.messages, RECALL_PREFIX, |p| {
70 matches!(p, MessagePart::Recall { .. })
71 });
72 }
73
74 pub fn remove_correction_messages(&self, window: &mut MessageWindowView<'_>) {
76 remove_by_prefix(window.messages, Role::System, CORRECTIONS_PREFIX);
77 }
78
79 pub fn remove_graph_facts_messages(&self, window: &mut MessageWindowView<'_>) {
81 remove_by_prefix(window.messages, Role::System, GRAPH_FACTS_PREFIX);
82 }
83
84 pub fn remove_persona_facts_messages(&self, window: &mut MessageWindowView<'_>) {
86 remove_by_prefix(window.messages, Role::System, PERSONA_PREFIX);
87 }
88
89 pub fn remove_trajectory_hints_messages(&self, window: &mut MessageWindowView<'_>) {
91 remove_by_prefix(window.messages, Role::System, TRAJECTORY_PREFIX);
92 }
93
94 pub fn remove_tree_memory_messages(&self, window: &mut MessageWindowView<'_>) {
96 remove_by_prefix(window.messages, Role::System, TREE_MEMORY_PREFIX);
97 }
98
99 pub fn remove_reasoning_strategies_messages(&self, window: &mut MessageWindowView<'_>) {
101 remove_by_prefix(window.messages, Role::System, REASONING_PREFIX);
102 }
103
104 pub fn remove_lsp_messages(&self, window: &mut MessageWindowView<'_>) {
109 remove_by_prefix(window.messages, Role::System, LSP_NOTE_PREFIX);
110 }
111
112 pub fn remove_code_context_messages(&self, window: &mut MessageWindowView<'_>) {
114 remove_by_part_or_prefix(window.messages, CODE_CONTEXT_PREFIX, |p| {
115 matches!(p, MessagePart::CodeContext { .. })
116 });
117 }
118
119 pub fn remove_summary_messages(&self, window: &mut MessageWindowView<'_>) {
121 remove_by_part_or_prefix(window.messages, SUMMARY_PREFIX, |p| {
122 matches!(p, MessagePart::Summary { .. })
123 });
124 }
125
126 pub fn remove_cross_session_messages(&self, window: &mut MessageWindowView<'_>) {
128 remove_by_part_or_prefix(window.messages, CROSS_SESSION_PREFIX, |p| {
129 matches!(p, MessagePart::CrossSession { .. })
130 });
131 }
132
133 pub fn remove_session_digest_message(&self, window: &mut MessageWindowView<'_>) {
135 remove_by_prefix(window.messages, Role::User, SESSION_DIGEST_PREFIX);
136 }
137
138 pub fn remove_document_rag_messages(&self, window: &mut MessageWindowView<'_>) {
140 remove_by_prefix(window.messages, Role::System, DOCUMENT_RAG_PREFIX);
141 }
142
143 pub fn trim_messages_to_budget(&self, window: &mut MessageWindowView<'_>, token_budget: usize) {
151 if token_budget == 0 {
152 return;
153 }
154
155 let history_start = window
157 .messages
158 .iter()
159 .position(|m| m.role != Role::System)
160 .unwrap_or(window.messages.len());
161
162 if history_start >= window.messages.len() {
163 return;
164 }
165
166 let mut total = 0usize;
167 let mut keep_from = window.messages.len();
168
169 for i in (history_start..window.messages.len()).rev() {
170 let msg_tokens = window
171 .token_counter
172 .count_message_tokens(&window.messages[i]);
173 if total + msg_tokens > token_budget {
174 break;
175 }
176 total += msg_tokens;
177 keep_from = i;
178 }
179
180 if keep_from > history_start {
181 let removed = keep_from - history_start;
182 window.messages.drain(history_start..keep_from);
183 recompute_prompt_tokens(window);
184 tracing::info!(
185 removed,
186 token_budget,
187 "trimmed messages to fit context budget"
188 );
189 }
190 }
191
192 pub async fn inject_semantic_recall(
204 &self,
205 query: &str,
206 token_budget: usize,
207 window: &mut MessageWindowView<'_>,
208 view: &ContextAssemblyView<'_>,
209 ) -> Result<(), ContextError> {
210 self.remove_recall_messages(window);
211
212 let (msg, _score) = crate::helpers::fetch_semantic_recall_raw(
213 view.memory.as_deref(),
214 view.recall_limit,
215 view.context_format,
216 query,
217 token_budget,
218 &view.token_counter,
219 None,
220 )
221 .await?;
222
223 if let Some(msg) = msg
224 && window.messages.len() > 1
225 {
226 window.messages.insert(1, msg);
227 }
228
229 Ok(())
230 }
231
232 pub async fn inject_cross_session_context(
241 &self,
242 query: &str,
243 token_budget: usize,
244 window: &mut MessageWindowView<'_>,
245 view: &ContextAssemblyView<'_>,
246 ) -> Result<(), ContextError> {
247 self.remove_cross_session_messages(window);
248
249 if let Some(msg) = crate::helpers::fetch_cross_session_raw(
250 view.memory.as_deref(),
251 view.conversation_id,
252 view.cross_session_score_threshold,
253 query,
254 token_budget,
255 &view.token_counter,
256 )
257 .await?
258 && window.messages.len() > 1
259 {
260 window.messages.insert(1, msg);
261 tracing::debug!("injected cross-session context");
262 }
263
264 Ok(())
265 }
266
267 pub async fn inject_summaries(
276 &self,
277 token_budget: usize,
278 window: &mut MessageWindowView<'_>,
279 view: &ContextAssemblyView<'_>,
280 ) -> Result<(), ContextError> {
281 self.remove_summary_messages(window);
282
283 if let Some(msg) = crate::helpers::fetch_summaries_raw(
284 view.memory.as_deref(),
285 view.conversation_id,
286 token_budget,
287 &view.token_counter,
288 )
289 .await?
290 && window.messages.len() > 1
291 {
292 window.messages.insert(1, msg);
293 tracing::debug!("injected summaries into context");
294 }
295
296 Ok(())
297 }
298
299 pub async fn disambiguate_skills(
304 &self,
305 query: &str,
306 all_meta: &[&zeph_skills::loader::SkillMeta],
307 scored: &[zeph_skills::ScoredMatch],
308 providers: &ProviderHandles,
309 ) -> Option<Vec<usize>> {
310 use std::fmt::Write as _;
311
312 let mut candidates = String::new();
313 for sm in scored {
314 if let Some(meta) = all_meta.get(sm.index) {
315 let _ = writeln!(
316 candidates,
317 "- {} (score: {:.3}): {}",
318 meta.name, sm.score, meta.description
319 );
320 }
321 }
322
323 let prompt = format!(
324 "The user said: \"{query}\"\n\n\
325 These skills matched with similar scores:\n{candidates}\n\
326 Which skill best matches the user's intent? \
327 Return the skill_name, your confidence (0-1), and any extracted parameters."
328 );
329
330 let messages = vec![zeph_llm::provider::Message::from_legacy(
331 zeph_llm::provider::Role::User,
332 prompt,
333 )];
334 match providers
335 .primary
336 .chat_typed::<zeph_skills::IntentClassification>(&messages)
337 .await
338 {
339 Ok(classification) => {
340 tracing::info!(
341 skill = %classification.skill_name,
342 confidence = classification.confidence,
343 "disambiguation selected skill"
344 );
345 let mut indices: Vec<usize> = scored.iter().map(|s| s.index).collect();
346 if let Some(pos) = indices.iter().position(|&i| {
347 all_meta
348 .get(i)
349 .is_some_and(|m| m.name == classification.skill_name)
350 }) {
351 indices.swap(0, pos);
352 }
353 Some(indices)
354 }
355 Err(e) => {
356 tracing::warn!("disambiguation failed, using original order: {e:#}");
357 None
358 }
359 }
360 }
361
362 #[allow(clippy::too_many_lines)] pub async fn prepare_context(
375 &self,
376 query: &str,
377 window: &mut MessageWindowView<'_>,
378 view: &mut ContextAssemblyView<'_>,
379 _providers: &ProviderHandles,
380 ) -> Result<ContextDelta, ContextError> {
381 if view.context_manager.budget.is_none() {
382 return Ok(ContextDelta::default());
383 }
384
385 self.remove_session_digest_message(window);
387 self.remove_summary_messages(window);
388 self.remove_cross_session_messages(window);
389 self.remove_recall_messages(window);
390 self.remove_document_rag_messages(window);
391 self.remove_correction_messages(window);
392 self.remove_code_context_messages(window);
393 self.remove_graph_facts_messages(window);
394 self.remove_persona_facts_messages(window);
395 self.remove_trajectory_hints_messages(window);
396 self.remove_tree_memory_messages(window);
397 if view.reasoning_config.enabled {
398 self.remove_reasoning_strategies_messages(window);
399 }
400
401 if let Some(explorer) = view.proactive_explorer.clone()
403 && let Some(domain) = explorer.classify(query)
404 {
405 let already_known = {
406 let registry_guard = view.skill_registry.read();
407 explorer.has_knowledge(®istry_guard, &domain)
408 };
409 let excluded = explorer.is_excluded(&domain);
410
411 if !already_known && !excluded {
412 tracing::debug!(domain = %domain.0, query_len = query.len(), "proactive.explore triggered");
413 let timeout_ms = explorer.timeout_ms();
414 let result = tokio::time::timeout(
415 std::time::Duration::from_millis(timeout_ms),
416 explorer.explore(&domain),
417 )
418 .await;
419 match result {
420 Ok(Ok(())) => {
421 view.skill_registry.write().reload(view.skill_paths);
422 tracing::debug!(domain = %domain.0, "proactive.explore complete, registry reloaded");
423 }
424 Ok(Err(e)) => {
425 tracing::warn!(domain = %domain.0, error = %e, "proactive exploration failed");
426 }
427 Err(_) => {
428 tracing::warn!(domain = %domain.0, timeout_ms, "proactive exploration timed out");
429 }
430 }
431 }
432 }
433
434 let active_levels: &'static [zeph_memory::compression::CompressionLevel] =
436 if let Some(ref budget) = view.context_manager.budget {
437 let used = view.cached_prompt_tokens;
438 let max = budget.max_tokens();
439 #[allow(clippy::cast_precision_loss)]
440 let remaining_ratio = if max == 0 {
441 1.0_f32
442 } else {
443 1.0 - (used as f32 / max as f32).clamp(0.0, 1.0)
444 };
445 let levels =
446 zeph_memory::compression::RetrievalPolicy::default().select(remaining_ratio);
447 tracing::debug!(
448 remaining_ratio,
449 active_levels = ?levels,
450 "compression_spectrum: retrieval policy selected"
451 );
452 levels
453 } else {
454 &[]
455 };
456
457 let memory_view = zeph_context::input::ContextMemoryView {
458 memory: view.memory.clone(),
459 conversation_id: view.conversation_id,
460 recall_limit: view.recall_limit,
461 cross_session_score_threshold: view.cross_session_score_threshold,
462 context_strategy: view.context_strategy,
463 crossover_turn_threshold: view.crossover_turn_threshold,
464 cached_session_digest: view.cached_session_digest.clone(),
465 graph_config: view.graph_config.clone(),
466 document_config: view.document_config.clone(),
467 persona_config: view.persona_config.clone(),
468 trajectory_config: view.trajectory_config.clone(),
469 reasoning_config: view.reasoning_config.clone(),
470 tree_config: view.tree_config.clone(),
471 };
472
473 #[cfg(feature = "index")]
474 let index_access = view.index;
475 #[cfg(not(feature = "index"))]
476 let index_access: Option<&dyn zeph_context::input::IndexAccess> = None;
477
478 let input = zeph_context::input::ContextAssemblyInput {
479 memory: &memory_view,
480 context_manager: view.context_manager,
481 token_counter: &view.token_counter,
482 skills_prompt: view.last_skills_prompt,
483 index: index_access,
484 correction_config: view.correction_config,
485 sidequest_turn_counter: view.sidequest_turn_counter,
486 messages: window.messages,
487 query,
488 scrub: view.scrub,
489 active_levels,
490 };
491
492 let prepared = zeph_context::assembler::ContextAssembler::gather(&input).await?;
493
494 let delta = self.apply_prepared_context(window, view, prepared).await;
495 Ok(delta)
496 }
497
498 #[allow(clippy::too_many_lines)] async fn apply_prepared_context(
507 &self,
508 window: &mut MessageWindowView<'_>,
509 view: &mut ContextAssemblyView<'_>,
510 prepared: zeph_context::assembler::PreparedContext,
511 ) -> ContextDelta {
512 use std::borrow::Cow;
513 use zeph_llm::provider::{Message, MessageMetadata, Role};
514 use zeph_sanitizer::{ContentSource, ContentSourceKind, MemorySourceHint};
515
516 *view.last_recall_confidence = prepared.recall_confidence;
518
519 if prepared.memory_first {
521 let history_start = 1usize;
522 let len = window.messages.len();
523 let keep_tail =
524 zeph_context::assembler::memory_first_keep_tail(window.messages, history_start);
525 if len > history_start + keep_tail {
526 window.messages.drain(history_start..len - keep_tail);
527 recompute_prompt_tokens(window);
528 tracing::debug!(
529 strategy = "memory_first",
530 keep_tail,
531 "dropped conversation history, kept last {keep_tail} messages"
532 );
533 }
534 }
535
536 if let Some(msg) = prepared.graph_facts.filter(|_| window.messages.len() > 1) {
538 let sanitized = self
539 .sanitize_memory_message(msg, MemorySourceHint::ExternalContent, view)
540 .await;
541 window.messages.insert(1, sanitized);
542 tracing::debug!("injected knowledge graph facts into context");
543 }
544 if let Some(msg) = prepared.doc_rag.filter(|_| window.messages.len() > 1) {
545 let sanitized = self
546 .sanitize_memory_message(msg, MemorySourceHint::ExternalContent, view)
547 .await;
548 window.messages.insert(1, sanitized);
549 tracing::debug!("injected document RAG context");
550 }
551 if let Some(msg) = prepared.corrections.filter(|_| window.messages.len() > 1) {
552 let sanitized = self
553 .sanitize_memory_message(msg, MemorySourceHint::ConversationHistory, view)
554 .await;
555 window.messages.insert(1, sanitized);
556 tracing::debug!("injected past corrections into context");
557 }
558 if let Some(msg) = prepared.recall.filter(|_| window.messages.len() > 1) {
559 let sanitized = self
560 .sanitize_memory_message(msg, MemorySourceHint::ConversationHistory, view)
561 .await;
562 window.messages.insert(1, sanitized);
563 }
564 if let Some(msg) = prepared.cross_session.filter(|_| window.messages.len() > 1) {
565 let sanitized = self
566 .sanitize_memory_message(msg, MemorySourceHint::LlmSummary, view)
567 .await;
568 window.messages.insert(1, sanitized);
569 }
570 if let Some(msg) = prepared.summaries.filter(|_| window.messages.len() > 1) {
571 let sanitized = self
572 .sanitize_memory_message(msg, MemorySourceHint::LlmSummary, view)
573 .await;
574 window.messages.insert(1, sanitized);
575 tracing::debug!("injected summaries into context");
576 }
577 if let Some(msg) = prepared.persona_facts.filter(|_| window.messages.len() > 1) {
578 let sanitized = self
579 .sanitize_memory_message(msg, MemorySourceHint::ExternalContent, view)
580 .await;
581 window.messages.insert(1, sanitized);
582 tracing::debug!("injected persona facts into context");
583 }
584 if let Some(msg) = prepared
585 .trajectory_hints
586 .filter(|_| window.messages.len() > 1)
587 {
588 let sanitized = self
589 .sanitize_memory_message(msg, MemorySourceHint::ExternalContent, view)
590 .await;
591 window.messages.insert(1, sanitized);
592 tracing::debug!("injected trajectory hints into context");
593 }
594 if let Some(msg) = prepared.tree_memory.filter(|_| window.messages.len() > 1) {
595 let sanitized = self
596 .sanitize_memory_message(msg, MemorySourceHint::ExternalContent, view)
597 .await;
598 window.messages.insert(1, sanitized);
599 tracing::debug!("injected tree memory summary into context");
600 }
601 if let Some(msg) = prepared
602 .reasoning_hints
603 .filter(|_| window.messages.len() > 1)
604 {
605 let sanitized = self
606 .sanitize_memory_message(msg, MemorySourceHint::ExternalContent, view)
607 .await;
608 window.messages.insert(1, sanitized);
609 tracing::debug!("injected reasoning strategies into context");
610 }
611
612 let code_context = if let Some(text) = prepared.code_context {
614 let sanitized = view
615 .sanitizer
616 .sanitize(&text, ContentSource::new(ContentSourceKind::ToolResult));
617 view.metrics.sanitizer_runs += 1;
618 if !sanitized.injection_flags.is_empty() {
619 tracing::warn!(
620 flags = sanitized.injection_flags.len(),
621 "injection patterns detected in code RAG context"
622 );
623 view.metrics.sanitizer_injection_flags += sanitized.injection_flags.len() as u64;
624 let detail = sanitized
625 .injection_flags
626 .first()
627 .map_or_else(String::new, |f| {
628 format!("Detected pattern: {}", f.pattern_name)
629 });
630 view.security_events.push(
631 zeph_common::SecurityEventCategory::InjectionFlag,
632 "code_rag",
633 detail,
634 );
635 }
636 if sanitized.was_truncated {
637 view.metrics.sanitizer_truncations += 1;
638 view.security_events.push(
639 zeph_common::SecurityEventCategory::Truncation,
640 "code_rag",
641 "Content truncated to max_content_size".to_string(),
642 );
643 }
644 Some(sanitized.body)
645 } else {
646 None
647 };
648
649 if !prepared.memory_first {
650 self.trim_messages_to_budget(window, prepared.recent_history_budget);
651 }
652
653 if view.digest_enabled
655 && let Some((digest_text, _)) = view
656 .cached_session_digest
657 .clone()
658 .filter(|_| window.messages.len() > 1)
659 {
660 let digest_msg = Message {
661 role: Role::User,
662 content: format!("{}{digest_text}", crate::helpers::SESSION_DIGEST_PREFIX),
663 parts: vec![],
664 metadata: MessageMetadata::default(),
665 };
666 let sanitized = self
667 .sanitize_memory_message(digest_msg, MemorySourceHint::LlmSummary, view)
668 .await;
669 window.messages.insert(1, sanitized);
670 tracing::debug!("injected session digest into context");
671 }
672
673 if view.redact_credentials {
675 for msg in &mut *window.messages {
676 if msg.role == Role::System {
677 continue;
678 }
679 if let Cow::Owned(s) = (view.scrub)(&msg.content) {
680 msg.content = s;
681 }
682 }
683 }
684
685 recompute_prompt_tokens(window);
686
687 ContextDelta { code_context }
688 }
689
690 async fn sanitize_memory_message(
700 &self,
701 mut msg: zeph_llm::provider::Message,
702 hint: zeph_sanitizer::MemorySourceHint,
703 view: &mut ContextAssemblyView<'_>,
704 ) -> zeph_llm::provider::Message {
705 use zeph_sanitizer::{ContentSource, ContentSourceKind};
706
707 let source = ContentSource::new(ContentSourceKind::MemoryRetrieval).with_memory_hint(hint);
708 let sanitized = view.sanitizer.sanitize(&msg.content, source);
709 view.metrics.sanitizer_runs += 1;
710 if !sanitized.injection_flags.is_empty() {
711 tracing::warn!(
712 flags = sanitized.injection_flags.len(),
713 "injection patterns detected in memory retrieval"
714 );
715 view.metrics.sanitizer_injection_flags += sanitized.injection_flags.len() as u64;
716 let detail = sanitized
717 .injection_flags
718 .first()
719 .map_or_else(String::new, |f| {
720 format!("Detected pattern: {}", f.pattern_name)
721 });
722 view.security_events.push(
723 zeph_common::SecurityEventCategory::InjectionFlag,
724 "memory_retrieval",
725 detail,
726 );
727 }
728 if sanitized.was_truncated {
729 view.metrics.sanitizer_truncations += 1;
730 view.security_events.push(
731 zeph_common::SecurityEventCategory::Truncation,
732 "memory_retrieval",
733 "Content truncated to max_content_size".to_string(),
734 );
735 }
736
737 if view.sanitizer.is_enabled()
739 && let Some(qs) = view.quarantine_summarizer
740 && qs.should_quarantine(ContentSourceKind::MemoryRetrieval)
741 {
742 match qs.extract_facts(&sanitized, view.sanitizer).await {
743 Ok((facts, flags)) => {
744 view.metrics.quarantine_invocations += 1;
745 view.security_events.push(
746 zeph_common::SecurityEventCategory::Quarantine,
747 "memory_retrieval",
748 "Content quarantined, facts extracted".to_string(),
749 );
750 let escaped = zeph_sanitizer::ContentSanitizer::escape_delimiter_tags(&facts);
751 msg.content = zeph_sanitizer::ContentSanitizer::apply_spotlight(
752 &escaped,
753 &sanitized.source,
754 &flags,
755 );
756 return msg;
757 }
758 Err(e) => {
759 tracing::warn!(
760 error = %e,
761 "quarantine failed for memory retrieval, using original sanitized content"
762 );
763 view.metrics.quarantine_failures += 1;
764 view.security_events.push(
765 zeph_common::SecurityEventCategory::Quarantine,
766 "memory_retrieval",
767 format!("Quarantine failed: {e}"),
768 );
769 }
770 }
771 }
772
773 msg.content = sanitized.body;
774 msg
775 }
776
777 pub async fn reset_conversation(
787 &self,
788 window: &mut MessageWindowView<'_>,
789 _view: &mut ContextAssemblyView<'_>,
790 ) -> Result<(), ContextError> {
791 self.clear_history(window);
792 Ok(())
793 }
794
795 #[allow(
812 clippy::cast_precision_loss,
813 clippy::cast_possible_truncation,
814 clippy::cast_sign_loss
815 )]
816 pub async fn maybe_compact(
817 &self,
818 summ: &mut ContextSummarizationView<'_>,
819 _providers: &ProviderHandles,
820 status: &(impl StatusSink + ?Sized),
821 ) -> Result<(), ContextError> {
822 use zeph_context::manager::{CompactionState, CompactionTier};
823
824 if let Some(ref mut count) = summ.context_manager.turns_since_last_hard_compaction {
826 *count += 1;
827 }
828
829 if let CompactionState::Exhausted { ref mut warned } = summ.context_manager.compaction
831 && !*warned
832 {
833 *warned = true;
834 tracing::warn!("compaction exhausted: context budget too tight for this session");
835 }
836 if summ.context_manager.compaction.is_exhausted() {
837 return Ok(());
838 }
839
840 if summ.server_compaction_active {
842 let budget = summ
843 .context_manager
844 .budget
845 .as_ref()
846 .map_or(0, ContextBudget::max_tokens);
847 if budget > 0 {
848 let fallback = (budget * 95 / 100) as u64;
849 if *summ.cached_prompt_tokens < fallback {
850 return Ok(());
851 }
852 tracing::warn!(
853 "server compaction active but context at 95%+ — falling back to client-side"
854 );
855 } else {
856 return Ok(());
857 }
858 }
859
860 if summ.context_manager.compaction.is_compacted_this_turn() {
862 return Ok(());
863 }
864
865 let in_cooldown = summ.context_manager.compaction.cooldown_remaining() > 0;
867 if in_cooldown
868 && let CompactionState::Cooling {
869 ref mut turns_remaining,
870 } = summ.context_manager.compaction
871 {
872 *turns_remaining -= 1;
873 if *turns_remaining == 0 {
874 summ.context_manager.compaction = CompactionState::Ready;
875 }
876 }
877
878 match summ
879 .context_manager
880 .compaction_tier(*summ.cached_prompt_tokens)
881 {
882 CompactionTier::None => Ok(()),
883 CompactionTier::Soft => {
884 self.do_soft_compaction(summ, status).await;
885 Ok(())
886 }
887 CompactionTier::Hard => self.do_hard_compaction(summ, status, in_cooldown).await,
888 }
889 }
890
891 #[allow(
896 clippy::cast_precision_loss,
897 clippy::cast_possible_truncation,
898 clippy::cast_sign_loss
899 )]
900 async fn do_soft_compaction(
901 &self,
902 summ: &mut ContextSummarizationView<'_>,
903 status: &(impl StatusSink + ?Sized),
904 ) {
905 status.send_status("soft compacting context...").await;
906
907 match &summ.context_manager.compression.pruning_strategy {
909 zeph_config::PruningStrategy::Subgoal | zeph_config::PruningStrategy::SubgoalMig => {
910 crate::summarization::scheduling::maybe_refresh_subgoal(summ);
911 }
912 _ => crate::summarization::scheduling::maybe_refresh_task_goal(summ),
913 }
914
915 let applied = crate::summarization::deferred::apply_deferred_summaries(summ);
917
918 if applied > 0
920 && summ
921 .context_manager
922 .compression
923 .pruning_strategy
924 .is_subgoal()
925 {
926 summ.subgoal_registry
927 .rebuild_after_compaction(summ.messages, 0);
928 }
929
930 let budget = summ
932 .context_manager
933 .budget
934 .as_ref()
935 .map_or(0, ContextBudget::max_tokens);
936 let soft_threshold =
937 (budget as f32 * summ.context_manager.soft_compaction_threshold) as usize;
938 let cached = usize::try_from(*summ.cached_prompt_tokens).unwrap_or(usize::MAX);
939 let min_to_free = cached.saturating_sub(soft_threshold);
940 if min_to_free > 0 {
941 crate::summarization::pruning::prune_tool_outputs(summ, min_to_free);
942 }
943
944 status.send_status("").await;
945 tracing::info!(
946 cached_tokens = *summ.cached_prompt_tokens,
947 soft_threshold,
948 "soft compaction complete"
949 );
950 }
951
952 #[allow(
954 clippy::cast_precision_loss,
955 clippy::cast_possible_truncation,
956 clippy::cast_sign_loss
957 )]
958 async fn do_hard_compaction(
959 &self,
960 summ: &mut ContextSummarizationView<'_>,
961 status: &(impl StatusSink + ?Sized),
962 in_cooldown: bool,
963 ) -> Result<(), ContextError> {
964 use zeph_context::manager::CompactionState;
965
966 let turns_since_last = summ
968 .context_manager
969 .turns_since_last_hard_compaction
970 .map(|t| u32::try_from(t).unwrap_or(u32::MAX));
971 summ.context_manager.turns_since_last_hard_compaction = Some(0);
972 if let Some(metrics) = summ.metrics {
973 metrics.record_hard_compaction(turns_since_last);
974 }
975
976 if in_cooldown {
977 tracing::debug!(
978 turns_remaining = summ.context_manager.compaction.cooldown_remaining(),
979 "hard compaction skipped: cooldown active"
980 );
981 return Ok(());
982 }
983
984 let budget = summ
985 .context_manager
986 .budget
987 .as_ref()
988 .map_or(0, ContextBudget::max_tokens);
989 let hard_threshold =
990 (budget as f32 * summ.context_manager.hard_compaction_threshold) as usize;
991 let cached = usize::try_from(*summ.cached_prompt_tokens).unwrap_or(usize::MAX);
992 let min_to_free = cached.saturating_sub(hard_threshold);
993
994 status.send_status("compacting context...").await;
995
996 crate::summarization::deferred::apply_deferred_summaries(summ);
998
999 let freed = crate::summarization::pruning::prune_tool_outputs(summ, min_to_free);
1001 if freed >= min_to_free {
1002 tracing::info!(freed, "hard compaction: pruning sufficient");
1003 summ.context_manager.compaction = CompactionState::CompactedThisTurn {
1004 cooldown: summ.context_manager.compaction_cooldown_turns,
1005 };
1006 if let Err(e) = crate::summarization::deferred::flush_deferred_summaries(summ).await {
1007 tracing::warn!(%e, "flush_deferred_summaries failed after hard compaction");
1008 }
1009 status.send_status("").await;
1010 return Ok(());
1011 }
1012
1013 let preserve_tail = summ.context_manager.compaction_preserve_tail;
1015 let compactable = summ.messages.len().saturating_sub(preserve_tail + 1);
1016 if compactable <= 1 {
1017 tracing::warn!(
1018 compactable,
1019 "hard compaction: too few messages, marking exhausted"
1020 );
1021 summ.context_manager.compaction = CompactionState::Exhausted { warned: false };
1022 status.send_status("").await;
1023 return Ok(());
1024 }
1025
1026 tracing::info!(
1028 min_to_free,
1029 "hard compaction: falling back to LLM summarization"
1030 );
1031 let tokens_before = *summ.cached_prompt_tokens;
1032 let outcome = crate::summarization::compaction::compact_context(summ, None).await?;
1033
1034 let freed_tokens = tokens_before.saturating_sub(*summ.cached_prompt_tokens);
1035
1036 if !outcome.is_compacted() || freed_tokens == 0 {
1037 tracing::warn!("hard compaction: no net reduction, marking exhausted");
1038 summ.context_manager.compaction = CompactionState::Exhausted { warned: false };
1039 status.send_status("").await;
1040 return Ok(());
1041 }
1042
1043 if matches!(
1044 summ.context_manager
1045 .compaction_tier(*summ.cached_prompt_tokens),
1046 zeph_context::manager::CompactionTier::Hard
1047 ) {
1048 tracing::warn!(
1049 freed_tokens,
1050 "hard compaction: still above hard threshold after compaction, marking exhausted"
1051 );
1052 summ.context_manager.compaction = CompactionState::Exhausted { warned: false };
1053 status.send_status("").await;
1054 return Ok(());
1055 }
1056
1057 summ.context_manager.compaction = CompactionState::CompactedThisTurn {
1058 cooldown: summ.context_manager.compaction_cooldown_turns,
1059 };
1060
1061 if tokens_before > *summ.cached_prompt_tokens {
1062 tracing::info!(
1063 tokens_before,
1064 tokens_after = *summ.cached_prompt_tokens,
1065 saved = freed_tokens,
1066 "context compaction complete"
1067 );
1068 }
1069
1070 status.send_status("").await;
1071 Ok(())
1072 }
1073
1074 pub async fn maybe_summarize_tool_pair(
1080 &self,
1081 summ: &mut ContextSummarizationView<'_>,
1082 providers: &ProviderHandles,
1083 ) {
1084 crate::summarization::deferred::maybe_summarize_tool_pair(
1085 summ,
1086 providers,
1087 &TxStatusSink(summ.status_tx.clone()),
1088 )
1089 .await;
1090 }
1091
1092 #[must_use]
1097 pub fn apply_deferred_summaries(&self, summ: &mut ContextSummarizationView<'_>) -> usize {
1098 crate::summarization::deferred::apply_deferred_summaries(summ)
1099 }
1100
1101 pub async fn flush_deferred_summaries(&self, summ: &mut ContextSummarizationView<'_>) {
1106 if let Err(e) = crate::summarization::deferred::flush_deferred_summaries(summ).await {
1107 tracing::warn!(%e, "flush_deferred_summaries failed");
1108 }
1109 }
1110
1111 pub fn maybe_apply_deferred_summaries(&self, summ: &mut ContextSummarizationView<'_>) {
1117 crate::summarization::deferred::maybe_apply_deferred_summaries(summ);
1118 }
1119
1120 pub async fn compact_context(
1135 &self,
1136 summ: &mut ContextSummarizationView<'_>,
1137 max_summary_tokens: Option<usize>,
1138 ) -> Result<crate::state::CompactionOutcome, crate::error::ContextError> {
1139 crate::summarization::compaction::compact_context(summ, max_summary_tokens).await
1140 }
1141
1142 pub fn maybe_soft_compact_mid_iteration(&self, summ: &mut ContextSummarizationView<'_>) {
1148 crate::summarization::scheduling::maybe_soft_compact_mid_iteration(summ);
1149 }
1150
1151 pub async fn maybe_proactive_compress(
1157 &self,
1158 summ: &mut ContextSummarizationView<'_>,
1159 _providers: &ProviderHandles,
1160 status: &(impl StatusSink + ?Sized),
1161 ) {
1162 let Some((_threshold, max_summary_tokens)) = summ
1163 .context_manager
1164 .should_proactively_compress(*summ.cached_prompt_tokens)
1165 else {
1166 return;
1167 };
1168
1169 if summ.server_compaction_active {
1170 let budget = summ
1171 .context_manager
1172 .budget
1173 .as_ref()
1174 .map_or(0, ContextBudget::max_tokens);
1175 if budget > 0 {
1176 let fallback = (budget * 95 / 100) as u64;
1177 if *summ.cached_prompt_tokens <= fallback {
1178 return;
1179 }
1180 tracing::warn!(
1181 cached_prompt_tokens = *summ.cached_prompt_tokens,
1182 fallback_threshold = fallback,
1183 "server compaction active but context at 95%+ — falling back to proactive"
1184 );
1185 } else {
1186 return;
1187 }
1188 }
1189
1190 status.send_status("compressing context...").await;
1191 tracing::info!(
1192 max_summary_tokens,
1193 cached_tokens = *summ.cached_prompt_tokens,
1194 "proactive compression triggered"
1195 );
1196
1197 match crate::summarization::compaction::compact_context(summ, Some(max_summary_tokens))
1198 .await
1199 {
1200 Ok(outcome) if outcome.is_compacted() => {
1201 summ.context_manager.compaction =
1202 zeph_context::manager::CompactionState::CompactedThisTurn { cooldown: 0 };
1203 tracing::info!("proactive compression complete");
1204 }
1205 Ok(_) => {}
1206 Err(e) => tracing::warn!(%e, "proactive compression failed"),
1207 }
1208
1209 status.send_status("").await;
1210 }
1211
1212 pub fn maybe_refresh_task_goal(&self, summ: &mut ContextSummarizationView<'_>) {
1218 crate::summarization::scheduling::maybe_refresh_task_goal(summ);
1219 }
1220
1221 pub fn maybe_refresh_subgoal(&self, summ: &mut ContextSummarizationView<'_>) {
1226 crate::summarization::scheduling::maybe_refresh_subgoal(summ);
1227 }
1228}
1229
1230struct TxStatusSink(Option<tokio::sync::mpsc::UnboundedSender<String>>);
1236
1237impl StatusSink for TxStatusSink {
1238 fn send_status(&self, msg: &str) -> impl std::future::Future<Output = ()> + Send + '_ {
1239 if let Some(ref tx) = self.0 {
1240 let _ = tx.send(msg.to_owned());
1241 }
1242 std::future::ready(())
1243 }
1244}
1245
1246pub(crate) fn recompute_prompt_tokens(window: &mut MessageWindowView<'_>) {
1253 *window.cached_prompt_tokens = window
1254 .messages
1255 .iter()
1256 .map(|m| window.token_counter.count_message_tokens(m) as u64)
1257 .sum();
1258}
1259
1260pub(crate) fn remove_by_prefix(
1266 messages: &mut Vec<zeph_llm::provider::Message>,
1267 role: Role,
1268 prefix: &str,
1269) {
1270 messages.retain(|m| m.role != role || !m.content.starts_with(prefix));
1271}
1272
1273pub(crate) fn remove_by_part_or_prefix(
1279 messages: &mut Vec<zeph_llm::provider::Message>,
1280 prefix: &str,
1281 part_matches: impl Fn(&MessagePart) -> bool,
1282) {
1283 messages.retain(|m| {
1284 if m.role != Role::System {
1285 return true;
1286 }
1287 if m.parts.first().is_some_and(&part_matches) {
1288 return false;
1289 }
1290 !m.content.starts_with(prefix)
1291 });
1292}
1293
#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::sync::Arc;

    use zeph_llm::provider::{Message, MessagePart, Role};
    use zeph_memory::TokenCounter;

    use super::*;
    use crate::helpers::{GRAPH_FACTS_PREFIX, RECALL_PREFIX, SUMMARY_PREFIX};
    use crate::state::MessageWindowView;

    fn make_counter() -> Arc<TokenCounter> {
        Arc::new(TokenCounter::default())
    }

    /// Owns every piece of state a [`MessageWindowView`] borrows.
    ///
    /// Tests can therefore build a view without `Box::leak`-ing allocations just to satisfy
    /// the view's lifetime.
    #[derive(Default)]
    struct WindowFixture {
        messages: Vec<Message>,
        cached_prompt_tokens: u64,
        completed_tool_ids: HashSet<String>,
        last_persisted_message_id: Option<i64>,
        deferred_db_hide_ids: Vec<i64>,
        deferred_db_summaries: Vec<String>,
    }

    impl WindowFixture {
        /// Fixture holding `messages`, with every other field defaulted (zero / empty / `None`).
        fn with_messages(messages: Vec<Message>) -> Self {
            Self {
                messages,
                ..Self::default()
            }
        }

        /// Mutably borrows all fields into a fresh view backed by a default token counter.
        fn view(&mut self) -> MessageWindowView<'_> {
            MessageWindowView {
                messages: &mut self.messages,
                last_persisted_message_id: &mut self.last_persisted_message_id,
                deferred_db_hide_ids: &mut self.deferred_db_hide_ids,
                deferred_db_summaries: &mut self.deferred_db_summaries,
                cached_prompt_tokens: &mut self.cached_prompt_tokens,
                token_counter: make_counter(),
                completed_tool_ids: &mut self.completed_tool_ids,
            }
        }
    }

    fn sys(text: &str) -> Message {
        Message::from_legacy(Role::System, text)
    }

    fn user(text: &str) -> Message {
        Message::from_legacy(Role::User, text)
    }

    fn assistant(text: &str) -> Message {
        Message::from_legacy(Role::Assistant, text)
    }

    #[test]
    fn clear_history_keeps_system_prompt() {
        let mut fx =
            WindowFixture::with_messages(vec![sys("system"), user("hello"), assistant("hi")]);
        fx.completed_tool_ids.insert("tool_1".to_owned());
        let mut window = fx.view();

        ContextService::new().clear_history(&mut window);

        assert_eq!(window.messages.len(), 1);
        assert_eq!(window.messages[0].content, "system");
        assert!(
            window.completed_tool_ids.is_empty(),
            "completed_tool_ids must be cleared"
        );
        // clear_history recomputes the cache from the surviving system prompt only.
        let expected = window.token_counter.count_message_tokens(&window.messages[0]) as u64;
        assert_eq!(
            *window.cached_prompt_tokens, expected,
            "cached_prompt_tokens must be recomputed after clearing"
        );
    }

    #[test]
    fn clear_history_empty_messages_is_noop() {
        let mut fx = WindowFixture::default();
        let mut window = fx.view();

        ContextService::new().clear_history(&mut window);

        assert!(window.messages.is_empty());
        assert_eq!(*window.cached_prompt_tokens, 0);
    }

    #[test]
    fn remove_recall_messages_removes_by_prefix() {
        let mut fx = WindowFixture::with_messages(vec![
            sys("system"),
            sys(&format!("{RECALL_PREFIX}some recalled text")),
            user("hello"),
        ]);
        let mut window = fx.view();

        ContextService::new().remove_recall_messages(&mut window);

        assert_eq!(window.messages.len(), 2);
        assert!(
            window
                .messages
                .iter()
                .all(|m| !m.content.starts_with(RECALL_PREFIX))
        );
        assert_eq!(window.messages[0].content, "system");
        assert_eq!(window.messages[1].content, "hello");
    }

    #[test]
    fn remove_recall_messages_keeps_non_system_with_prefix() {
        // Only system messages are eligible for removal; user text that happens to start
        // with the recall prefix must survive.
        let mut fx = WindowFixture::with_messages(vec![
            sys("system"),
            user(&format!("{RECALL_PREFIX}user typed this")),
        ]);
        let mut window = fx.view();

        ContextService::new().remove_recall_messages(&mut window);

        assert_eq!(window.messages.len(), 2);
        assert_eq!(window.messages[1].role, Role::User);
    }

    #[test]
    fn remove_graph_facts_messages_removes_matching() {
        let mut fx = WindowFixture::with_messages(vec![
            sys("system"),
            sys(&format!("{GRAPH_FACTS_PREFIX}fact1")),
            user("hello"),
        ]);
        let mut window = fx.view();

        ContextService::new().remove_graph_facts_messages(&mut window);

        assert_eq!(window.messages.len(), 2);
        assert_eq!(window.messages[0].content, "system");
        assert_eq!(window.messages[1].content, "hello");
    }

    #[test]
    fn remove_by_prefix_only_touches_given_role() {
        let mut msgs = vec![
            sys(&format!("{GRAPH_FACTS_PREFIX}system fact")),
            user(&format!("{GRAPH_FACTS_PREFIX}user text")),
        ];

        remove_by_prefix(&mut msgs, Role::System, GRAPH_FACTS_PREFIX);

        assert_eq!(msgs.len(), 1);
        assert_eq!(msgs[0].role, Role::User);
    }

    #[test]
    fn remove_summary_messages_removes_by_part() {
        let mut fx = WindowFixture::with_messages(vec![
            sys("system"),
            Message::from_parts(
                Role::System,
                vec![MessagePart::Summary {
                    text: format!("{SUMMARY_PREFIX}old summary"),
                }],
            ),
            user("hello"),
        ]);
        let mut window = fx.view();

        ContextService::new().remove_summary_messages(&mut window);

        assert_eq!(window.messages.len(), 2);
    }

    #[test]
    fn trim_messages_to_budget_zero_is_noop() {
        let mut fx = WindowFixture::with_messages(vec![
            sys("system"),
            user("a"),
            assistant("b"),
            user("c"),
        ]);
        let original_len = fx.messages.len();
        let mut window = fx.view();

        ContextService::new().trim_messages_to_budget(&mut window, 0);

        assert_eq!(window.messages.len(), original_len);
    }

    #[test]
    fn trim_messages_to_budget_keeps_recent() {
        let mut fx = WindowFixture::with_messages(vec![
            sys("system"),
            user("message 1"),
            assistant("reply 1"),
            user("message 2"),
        ]);
        let mut window = fx.view();

        // A 1-token budget cannot hold the whole history, so trimming must kick in.
        ContextService::new().trim_messages_to_budget(&mut window, 1);

        assert!(
            window.messages.len() < 4,
            "trim should remove some messages"
        );
        assert_eq!(
            window.messages[0].role,
            Role::System,
            "system prompt must survive trim"
        );
    }
}