1use zeph_context::budget::ContextBudget;
7use zeph_llm::LlmProvider;
8use zeph_llm::provider::{MessagePart, Role};
9
10use crate::error::ContextError;
11use crate::helpers::{
12 CODE_CONTEXT_PREFIX, CORRECTIONS_PREFIX, CROSS_SESSION_PREFIX, DOCUMENT_RAG_PREFIX,
13 GRAPH_FACTS_PREFIX, LSP_NOTE_PREFIX, PERSONA_PREFIX, REASONING_PREFIX, RECALL_PREFIX,
14 SESSION_DIGEST_PREFIX, SUMMARY_PREFIX, TRAJECTORY_PREFIX, TREE_MEMORY_PREFIX,
15};
16use crate::state::{
17 ContextAssemblyView, ContextDelta, ContextSummarizationView, MessageWindowView,
18 ProviderHandles, StatusSink,
19};
20
/// Stateless entry point for context-window operations.
///
/// The type carries no fields: every method receives the state it works on through
/// explicit view arguments (`MessageWindowView`, `ContextAssemblyView`,
/// `ContextSummarizationView`).
#[derive(Debug, Default)]
pub struct ContextService;
40
41impl ContextService {
    /// Creates a new `ContextService`.
    ///
    /// The service is a unit struct, so this yields the same value as
    /// `ContextService::default()`.
    #[must_use]
    pub fn new() -> Self {
        Self
    }
49
50 pub fn clear_history(&self, window: &mut MessageWindowView<'_>) {
58 let system_prompt = window.messages.first().cloned();
59 window.messages.clear();
60 if let Some(sp) = system_prompt {
61 window.messages.push(sp);
62 }
63 window.completed_tool_ids.clear();
64 recompute_prompt_tokens(window);
65 }
66
    /// Removes semantic-recall messages from the window.
    ///
    /// A system message is dropped when its first part is a `MessagePart::Recall` or
    /// its content starts with `RECALL_PREFIX`; non-system messages are left alone.
    pub fn remove_recall_messages(&self, window: &mut MessageWindowView<'_>) {
        remove_by_part_or_prefix(window.messages, RECALL_PREFIX, |p| {
            matches!(p, MessagePart::Recall { .. })
        });
    }
73
    /// Removes every system message whose content starts with `CORRECTIONS_PREFIX`.
    pub fn remove_correction_messages(&self, window: &mut MessageWindowView<'_>) {
        remove_by_prefix(window.messages, Role::System, CORRECTIONS_PREFIX);
    }
78
    /// Removes every system message whose content starts with `GRAPH_FACTS_PREFIX`.
    pub fn remove_graph_facts_messages(&self, window: &mut MessageWindowView<'_>) {
        remove_by_prefix(window.messages, Role::System, GRAPH_FACTS_PREFIX);
    }
83
    /// Removes every system message whose content starts with `PERSONA_PREFIX`.
    pub fn remove_persona_facts_messages(&self, window: &mut MessageWindowView<'_>) {
        remove_by_prefix(window.messages, Role::System, PERSONA_PREFIX);
    }
88
    /// Removes every system message whose content starts with `TRAJECTORY_PREFIX`.
    pub fn remove_trajectory_hints_messages(&self, window: &mut MessageWindowView<'_>) {
        remove_by_prefix(window.messages, Role::System, TRAJECTORY_PREFIX);
    }
93
    /// Removes every system message whose content starts with `TREE_MEMORY_PREFIX`.
    pub fn remove_tree_memory_messages(&self, window: &mut MessageWindowView<'_>) {
        remove_by_prefix(window.messages, Role::System, TREE_MEMORY_PREFIX);
    }
98
    /// Removes every system message whose content starts with `REASONING_PREFIX`.
    pub fn remove_reasoning_strategies_messages(&self, window: &mut MessageWindowView<'_>) {
        remove_by_prefix(window.messages, Role::System, REASONING_PREFIX);
    }
103
    /// Removes every system message whose content starts with `LSP_NOTE_PREFIX`.
    pub fn remove_lsp_messages(&self, window: &mut MessageWindowView<'_>) {
        remove_by_prefix(window.messages, Role::System, LSP_NOTE_PREFIX);
    }
111
    /// Removes code-context messages from the window.
    ///
    /// A system message is dropped when its first part is a `MessagePart::CodeContext`
    /// or its content starts with `CODE_CONTEXT_PREFIX`.
    pub fn remove_code_context_messages(&self, window: &mut MessageWindowView<'_>) {
        remove_by_part_or_prefix(window.messages, CODE_CONTEXT_PREFIX, |p| {
            matches!(p, MessagePart::CodeContext { .. })
        });
    }
118
    /// Removes summary messages from the window.
    ///
    /// A system message is dropped when its first part is a `MessagePart::Summary`
    /// or its content starts with `SUMMARY_PREFIX`.
    pub fn remove_summary_messages(&self, window: &mut MessageWindowView<'_>) {
        remove_by_part_or_prefix(window.messages, SUMMARY_PREFIX, |p| {
            matches!(p, MessagePart::Summary { .. })
        });
    }
125
    /// Removes cross-session context messages from the window.
    ///
    /// A system message is dropped when its first part is a `MessagePart::CrossSession`
    /// or its content starts with `CROSS_SESSION_PREFIX`.
    pub fn remove_cross_session_messages(&self, window: &mut MessageWindowView<'_>) {
        remove_by_part_or_prefix(window.messages, CROSS_SESSION_PREFIX, |p| {
            matches!(p, MessagePart::CrossSession { .. })
        });
    }
132
    /// Removes every **user**-role message whose content starts with
    /// `SESSION_DIGEST_PREFIX`.
    ///
    /// Unlike the other removal helpers this one matches `Role::User`, which is the
    /// role the session digest is injected with in `apply_prepared_context`.
    pub fn remove_session_digest_message(&self, window: &mut MessageWindowView<'_>) {
        remove_by_prefix(window.messages, Role::User, SESSION_DIGEST_PREFIX);
    }
137
    /// Removes every system message whose content starts with `DOCUMENT_RAG_PREFIX`.
    pub fn remove_document_rag_messages(&self, window: &mut MessageWindowView<'_>) {
        remove_by_prefix(window.messages, Role::System, DOCUMENT_RAG_PREFIX);
    }
142
143 pub fn trim_messages_to_budget(&self, window: &mut MessageWindowView<'_>, token_budget: usize) {
151 if token_budget == 0 {
152 return;
153 }
154
155 let history_start = window
157 .messages
158 .iter()
159 .position(|m| m.role != Role::System)
160 .unwrap_or(window.messages.len());
161
162 if history_start >= window.messages.len() {
163 return;
164 }
165
166 let mut total = 0usize;
167 let mut keep_from = window.messages.len();
168
169 for i in (history_start..window.messages.len()).rev() {
170 let msg_tokens = window
171 .token_counter
172 .count_message_tokens(&window.messages[i]);
173 if total + msg_tokens > token_budget {
174 break;
175 }
176 total += msg_tokens;
177 keep_from = i;
178 }
179
180 if keep_from > history_start {
181 let removed = keep_from - history_start;
182 window.messages.drain(history_start..keep_from);
183 recompute_prompt_tokens(window);
184 tracing::info!(
185 removed,
186 token_budget,
187 "trimmed messages to fit context budget"
188 );
189 }
190 }
191
192 pub async fn inject_semantic_recall(
204 &self,
205 query: &str,
206 token_budget: usize,
207 window: &mut MessageWindowView<'_>,
208 view: &ContextAssemblyView<'_>,
209 ) -> Result<(), ContextError> {
210 self.remove_recall_messages(window);
211
212 let (msg, _score) = crate::helpers::fetch_semantic_recall_raw(
213 view.memory.as_deref(),
214 view.recall_limit,
215 view.context_format,
216 query,
217 token_budget,
218 &view.token_counter,
219 None,
220 None,
221 )
222 .await?;
223
224 if let Some(msg) = msg
225 && window.messages.len() > 1
226 {
227 window.messages.insert(1, msg);
228 }
229
230 Ok(())
231 }
232
233 pub async fn inject_cross_session_context(
242 &self,
243 query: &str,
244 token_budget: usize,
245 window: &mut MessageWindowView<'_>,
246 view: &ContextAssemblyView<'_>,
247 ) -> Result<(), ContextError> {
248 self.remove_cross_session_messages(window);
249
250 if let Some(msg) = crate::helpers::fetch_cross_session_raw(
251 view.memory.as_deref(),
252 view.conversation_id,
253 view.cross_session_score_threshold,
254 query,
255 token_budget,
256 &view.token_counter,
257 )
258 .await?
259 && window.messages.len() > 1
260 {
261 window.messages.insert(1, msg);
262 tracing::debug!("injected cross-session context");
263 }
264
265 Ok(())
266 }
267
268 pub async fn inject_summaries(
277 &self,
278 token_budget: usize,
279 window: &mut MessageWindowView<'_>,
280 view: &ContextAssemblyView<'_>,
281 ) -> Result<(), ContextError> {
282 self.remove_summary_messages(window);
283
284 if let Some(msg) = crate::helpers::fetch_summaries_raw(
285 view.memory.as_deref(),
286 view.conversation_id,
287 token_budget,
288 &view.token_counter,
289 )
290 .await?
291 && window.messages.len() > 1
292 {
293 window.messages.insert(1, msg);
294 tracing::debug!("injected summaries into context");
295 }
296
297 Ok(())
298 }
299
300 pub async fn disambiguate_skills(
305 &self,
306 query: &str,
307 all_meta: &[&zeph_skills::loader::SkillMeta],
308 scored: &[zeph_skills::ScoredMatch],
309 providers: &ProviderHandles,
310 ) -> Option<Vec<usize>> {
311 use std::fmt::Write as _;
312
313 let mut candidates = String::new();
314 for sm in scored {
315 if let Some(meta) = all_meta.get(sm.index) {
316 let _ = writeln!(
317 candidates,
318 "- {} (score: {:.3}): {}",
319 meta.name, sm.score, meta.description
320 );
321 }
322 }
323
324 let prompt = format!(
325 "The user said: \"{query}\"\n\n\
326 These skills matched with similar scores:\n{candidates}\n\
327 Which skill best matches the user's intent? \
328 Return the skill_name, your confidence (0-1), and any extracted parameters."
329 );
330
331 let messages = vec![zeph_llm::provider::Message::from_legacy(
332 zeph_llm::provider::Role::User,
333 prompt,
334 )];
335 match providers
336 .primary
337 .chat_typed::<zeph_skills::IntentClassification>(&messages)
338 .await
339 {
340 Ok(classification) => {
341 tracing::info!(
342 skill = %classification.skill_name,
343 confidence = classification.confidence,
344 "disambiguation selected skill"
345 );
346 let mut indices: Vec<usize> = scored.iter().map(|s| s.index).collect();
347 if let Some(pos) = indices.iter().position(|&i| {
348 all_meta
349 .get(i)
350 .is_some_and(|m| m.name == classification.skill_name)
351 }) {
352 indices.swap(0, pos);
353 }
354 Some(indices)
355 }
356 Err(e) => {
357 tracing::warn!("disambiguation failed, using original order: {e:#}");
358 None
359 }
360 }
361 }
362
363 #[allow(clippy::too_many_lines)] pub async fn prepare_context(
376 &self,
377 query: &str,
378 window: &mut MessageWindowView<'_>,
379 view: &mut ContextAssemblyView<'_>,
380 _providers: &ProviderHandles,
381 ) -> Result<ContextDelta, ContextError> {
382 if view.context_manager.budget.is_none() {
383 return Ok(ContextDelta::default());
384 }
385
386 self.remove_session_digest_message(window);
388 self.remove_summary_messages(window);
389 self.remove_cross_session_messages(window);
390 self.remove_recall_messages(window);
391 self.remove_document_rag_messages(window);
392 self.remove_correction_messages(window);
393 self.remove_code_context_messages(window);
394 self.remove_graph_facts_messages(window);
395 self.remove_persona_facts_messages(window);
396 self.remove_trajectory_hints_messages(window);
397 self.remove_tree_memory_messages(window);
398 if view.reasoning_config.enabled {
399 self.remove_reasoning_strategies_messages(window);
400 }
401
402 if let Some(explorer) = view.proactive_explorer.clone()
404 && let Some(domain) = explorer.classify(query)
405 {
406 let already_known = {
407 let registry_guard = view.skill_registry.read();
408 explorer.has_knowledge(®istry_guard, &domain)
409 };
410 let excluded = explorer.is_excluded(&domain);
411
412 if !already_known && !excluded {
413 tracing::debug!(domain = %domain.0, query_len = query.len(), "proactive.explore triggered");
414 let timeout_ms = explorer.timeout_ms();
415 let result = tokio::time::timeout(
416 std::time::Duration::from_millis(timeout_ms),
417 explorer.explore(&domain),
418 )
419 .await;
420 match result {
421 Ok(Ok(())) => {
422 view.skill_registry.write().reload(view.skill_paths);
423 tracing::debug!(domain = %domain.0, "proactive.explore complete, registry reloaded");
424 }
425 Ok(Err(e)) => {
426 tracing::warn!(domain = %domain.0, error = %e, "proactive exploration failed");
427 }
428 Err(_) => {
429 tracing::warn!(domain = %domain.0, timeout_ms, "proactive exploration timed out");
430 }
431 }
432 }
433 }
434
435 let active_levels: &'static [zeph_memory::compression::CompressionLevel] =
437 if let Some(ref budget) = view.context_manager.budget {
438 let used = view.cached_prompt_tokens;
439 let max = budget.max_tokens();
440 #[allow(clippy::cast_precision_loss)]
441 let remaining_ratio = if max == 0 {
442 1.0_f32
443 } else {
444 1.0 - (used as f32 / max as f32).clamp(0.0, 1.0)
445 };
446 let levels =
447 zeph_memory::compression::RetrievalPolicy::default().select(remaining_ratio);
448 tracing::debug!(
449 remaining_ratio,
450 active_levels = ?levels,
451 "compression_spectrum: retrieval policy selected"
452 );
453 levels
454 } else {
455 &[]
456 };
457
458 let memory_view = zeph_context::input::ContextMemoryView {
459 memory: view.memory.clone(),
460 conversation_id: view.conversation_id,
461 recall_limit: view.recall_limit,
462 cross_session_score_threshold: view.cross_session_score_threshold,
463 context_strategy: view.context_strategy,
464 crossover_turn_threshold: view.crossover_turn_threshold,
465 cached_session_digest: view.cached_session_digest.clone(),
466 graph_config: view.graph_config.clone(),
467 document_config: view.document_config.clone(),
468 persona_config: view.persona_config.clone(),
469 trajectory_config: view.trajectory_config.clone(),
470 reasoning_config: view.reasoning_config.clone(),
471 memcot_config: view.memcot_config.clone(),
472 memcot_state: view.memcot_state.clone(),
473 tree_config: view.tree_config.clone(),
474 };
475
476 #[cfg(feature = "index")]
477 let index_access = view.index;
478 #[cfg(not(feature = "index"))]
479 let index_access: Option<&dyn zeph_context::input::IndexAccess> = None;
480
481 let input = zeph_context::input::ContextAssemblyInput {
482 memory: &memory_view,
483 context_manager: view.context_manager,
484 token_counter: &view.token_counter,
485 skills_prompt: view.last_skills_prompt,
486 index: index_access,
487 correction_config: view.correction_config,
488 sidequest_turn_counter: view.sidequest_turn_counter,
489 messages: window.messages,
490 query,
491 scrub: view.scrub,
492 active_levels,
493 };
494
495 let prepared = zeph_context::assembler::ContextAssembler::gather(&input).await?;
496
497 let delta = self.apply_prepared_context(window, view, prepared).await;
498 Ok(delta)
499 }
500
    /// Applies a gathered `PreparedContext` to the message window.
    ///
    /// Steps, in order:
    ///
    /// 1. Store the recall confidence on the view.
    /// 2. For the memory-first strategy, drop the history after index 0 except for the
    ///    tail length reported by `memory_first_keep_tail`.
    /// 3. Sanitize and insert each prepared memory message at index 1. Because every
    ///    insert targets index 1, a message inserted later ends up ahead of the ones
    ///    inserted before it. Nothing is inserted when the window has at most one
    ///    message.
    /// 4. Sanitize the code context text (it is returned in the delta, not inserted).
    /// 5. Unless memory-first is active, trim history to `recent_history_budget`.
    /// 6. Insert the cached session digest as a user message at index 1, if enabled.
    /// 7. When credential redaction is on, run `view.scrub` over non-system messages.
    /// 8. Recompute the cached prompt token count.
    ///
    /// Returns a [`ContextDelta`] carrying the sanitized code context, if any.
    #[allow(clippy::too_many_lines)]
    async fn apply_prepared_context(
        &self,
        window: &mut MessageWindowView<'_>,
        view: &mut ContextAssemblyView<'_>,
        prepared: zeph_context::assembler::PreparedContext,
    ) -> ContextDelta {
        use std::borrow::Cow;
        use zeph_llm::provider::{Message, MessageMetadata, Role};
        use zeph_sanitizer::{ContentSource, ContentSourceKind, MemorySourceHint};

        *view.last_recall_confidence = prepared.recall_confidence;

        if prepared.memory_first {
            // Index 0 is kept unconditionally; history is taken to start at index 1.
            let history_start = 1usize;
            let len = window.messages.len();
            let keep_tail =
                zeph_context::assembler::memory_first_keep_tail(window.messages, history_start);
            if len > history_start + keep_tail {
                window.messages.drain(history_start..len - keep_tail);
                recompute_prompt_tokens(window);
                tracing::debug!(
                    strategy = "memory_first",
                    keep_tail,
                    "dropped conversation history, kept last {keep_tail} messages"
                );
            }
        }

        // Each block below sanitizes one prepared message and inserts it at index 1.
        if let Some(msg) = prepared.graph_facts.filter(|_| window.messages.len() > 1) {
            let sanitized = self
                .sanitize_memory_message(msg, MemorySourceHint::ExternalContent, view)
                .await;
            window.messages.insert(1, sanitized);
            tracing::debug!("injected knowledge graph facts into context");
        }
        if let Some(msg) = prepared.doc_rag.filter(|_| window.messages.len() > 1) {
            let sanitized = self
                .sanitize_memory_message(msg, MemorySourceHint::ExternalContent, view)
                .await;
            window.messages.insert(1, sanitized);
            tracing::debug!("injected document RAG context");
        }
        if let Some(msg) = prepared.corrections.filter(|_| window.messages.len() > 1) {
            let sanitized = self
                .sanitize_memory_message(msg, MemorySourceHint::ConversationHistory, view)
                .await;
            window.messages.insert(1, sanitized);
            tracing::debug!("injected past corrections into context");
        }
        if let Some(msg) = prepared.recall.filter(|_| window.messages.len() > 1) {
            let sanitized = self
                .sanitize_memory_message(msg, MemorySourceHint::ConversationHistory, view)
                .await;
            window.messages.insert(1, sanitized);
        }
        if let Some(msg) = prepared.cross_session.filter(|_| window.messages.len() > 1) {
            let sanitized = self
                .sanitize_memory_message(msg, MemorySourceHint::LlmSummary, view)
                .await;
            window.messages.insert(1, sanitized);
        }
        if let Some(msg) = prepared.summaries.filter(|_| window.messages.len() > 1) {
            let sanitized = self
                .sanitize_memory_message(msg, MemorySourceHint::LlmSummary, view)
                .await;
            window.messages.insert(1, sanitized);
            tracing::debug!("injected summaries into context");
        }
        if let Some(msg) = prepared.persona_facts.filter(|_| window.messages.len() > 1) {
            let sanitized = self
                .sanitize_memory_message(msg, MemorySourceHint::ExternalContent, view)
                .await;
            window.messages.insert(1, sanitized);
            tracing::debug!("injected persona facts into context");
        }
        if let Some(msg) = prepared
            .trajectory_hints
            .filter(|_| window.messages.len() > 1)
        {
            let sanitized = self
                .sanitize_memory_message(msg, MemorySourceHint::ExternalContent, view)
                .await;
            window.messages.insert(1, sanitized);
            tracing::debug!("injected trajectory hints into context");
        }
        if let Some(msg) = prepared.tree_memory.filter(|_| window.messages.len() > 1) {
            let sanitized = self
                .sanitize_memory_message(msg, MemorySourceHint::ExternalContent, view)
                .await;
            window.messages.insert(1, sanitized);
            tracing::debug!("injected tree memory summary into context");
        }
        if let Some(msg) = prepared
            .reasoning_hints
            .filter(|_| window.messages.len() > 1)
        {
            let sanitized = self
                .sanitize_memory_message(msg, MemorySourceHint::ExternalContent, view)
                .await;
            window.messages.insert(1, sanitized);
            tracing::debug!("injected reasoning strategies into context");
        }

        // Code context is sanitized as tool-result content and handed back to the
        // caller through the delta instead of being inserted into the window.
        let code_context = if let Some(text) = prepared.code_context {
            let sanitized = view
                .sanitizer
                .sanitize(&text, ContentSource::new(ContentSourceKind::ToolResult));
            view.metrics.sanitizer_runs += 1;
            if !sanitized.injection_flags.is_empty() {
                tracing::warn!(
                    flags = sanitized.injection_flags.len(),
                    "injection patterns detected in code RAG context"
                );
                view.metrics.sanitizer_injection_flags += sanitized.injection_flags.len() as u64;
                // Only the first detected pattern is named in the security event.
                let detail = sanitized
                    .injection_flags
                    .first()
                    .map_or_else(String::new, |f| {
                        format!("Detected pattern: {}", f.pattern_name)
                    });
                view.security_events.push(
                    zeph_common::SecurityEventCategory::InjectionFlag,
                    "code_rag",
                    detail,
                );
            }
            if sanitized.was_truncated {
                view.metrics.sanitizer_truncations += 1;
                view.security_events.push(
                    zeph_common::SecurityEventCategory::Truncation,
                    "code_rag",
                    "Content truncated to max_content_size".to_string(),
                );
            }
            Some(sanitized.body)
        } else {
            None
        };

        // Memory-first already cut the history above, so trimming is skipped for it.
        if !prepared.memory_first {
            self.trim_messages_to_budget(window, prepared.recent_history_budget);
        }

        // The digest goes in after trimming and is injected with the user role.
        if view.digest_enabled
            && let Some((digest_text, _)) = view
                .cached_session_digest
                .clone()
                .filter(|_| window.messages.len() > 1)
        {
            let digest_msg = Message {
                role: Role::User,
                content: format!("{}{digest_text}", crate::helpers::SESSION_DIGEST_PREFIX),
                parts: vec![],
                metadata: MessageMetadata::default(),
            };
            let sanitized = self
                .sanitize_memory_message(digest_msg, MemorySourceHint::LlmSummary, view)
                .await;
            window.messages.insert(1, sanitized);
            tracing::debug!("injected session digest into context");
        }

        if view.redact_credentials {
            for msg in &mut *window.messages {
                if msg.role == Role::System {
                    continue;
                }
                // `Cow::Owned` means the scrubber changed something; a borrowed
                // result leaves the content as it is.
                if let Cow::Owned(s) = (view.scrub)(&msg.content) {
                    msg.content = s;
                }
            }
        }

        recompute_prompt_tokens(window);

        ContextDelta { code_context }
    }
692
    /// Sanitizes the content of a memory-retrieved message before it enters the window.
    ///
    /// The content is run through `view.sanitizer` as `ContentSourceKind::MemoryRetrieval`
    /// tagged with `hint`. Sanitizer metrics are updated, and injection flags and
    /// truncation are recorded as security events under `"memory_retrieval"`.
    ///
    /// When the sanitizer is enabled and a quarantine summarizer is configured that
    /// opts in for memory retrieval, the content is replaced by the extracted facts
    /// (delimiter tags escaped, then spotlighted). If fact extraction fails, the
    /// failure is recorded and the plain sanitized body is used instead.
    ///
    /// Returns `msg` with only its `content` replaced.
    async fn sanitize_memory_message(
        &self,
        mut msg: zeph_llm::provider::Message,
        hint: zeph_sanitizer::MemorySourceHint,
        view: &mut ContextAssemblyView<'_>,
    ) -> zeph_llm::provider::Message {
        use zeph_sanitizer::{ContentSource, ContentSourceKind};

        let source = ContentSource::new(ContentSourceKind::MemoryRetrieval).with_memory_hint(hint);
        let sanitized = view.sanitizer.sanitize(&msg.content, source);
        view.metrics.sanitizer_runs += 1;
        if !sanitized.injection_flags.is_empty() {
            tracing::warn!(
                flags = sanitized.injection_flags.len(),
                "injection patterns detected in memory retrieval"
            );
            view.metrics.sanitizer_injection_flags += sanitized.injection_flags.len() as u64;
            // Only the first detected pattern is named in the security event.
            let detail = sanitized
                .injection_flags
                .first()
                .map_or_else(String::new, |f| {
                    format!("Detected pattern: {}", f.pattern_name)
                });
            view.security_events.push(
                zeph_common::SecurityEventCategory::InjectionFlag,
                "memory_retrieval",
                detail,
            );
        }
        if sanitized.was_truncated {
            view.metrics.sanitizer_truncations += 1;
            view.security_events.push(
                zeph_common::SecurityEventCategory::Truncation,
                "memory_retrieval",
                "Content truncated to max_content_size".to_string(),
            );
        }

        // Optional quarantine pass: replace the content with extracted facts.
        if view.sanitizer.is_enabled()
            && let Some(qs) = view.quarantine_summarizer
            && qs.should_quarantine(ContentSourceKind::MemoryRetrieval)
        {
            match qs.extract_facts(&sanitized, view.sanitizer).await {
                Ok((facts, flags)) => {
                    view.metrics.quarantine_invocations += 1;
                    view.security_events.push(
                        zeph_common::SecurityEventCategory::Quarantine,
                        "memory_retrieval",
                        "Content quarantined, facts extracted".to_string(),
                    );
                    let escaped = zeph_sanitizer::ContentSanitizer::escape_delimiter_tags(&facts);
                    msg.content = zeph_sanitizer::ContentSanitizer::apply_spotlight(
                        &escaped,
                        &sanitized.source,
                        &flags,
                    );
                    return msg;
                }
                Err(e) => {
                    // Fall through to the plain sanitized body below.
                    tracing::warn!(
                        error = %e,
                        "quarantine failed for memory retrieval, using original sanitized content"
                    );
                    view.metrics.quarantine_failures += 1;
                    view.security_events.push(
                        zeph_common::SecurityEventCategory::Quarantine,
                        "memory_retrieval",
                        format!("Quarantine failed: {e}"),
                    );
                }
            }
        }

        msg.content = sanitized.body;
        msg
    }
779
    /// Resets the conversation by clearing the message window via `clear_history`.
    ///
    /// `_view` is currently unused.
    ///
    /// # Errors
    ///
    /// The current implementation always returns `Ok(())`.
    pub async fn reset_conversation(
        &self,
        window: &mut MessageWindowView<'_>,
        _view: &mut ContextAssemblyView<'_>,
    ) -> Result<(), ContextError> {
        self.clear_history(window);
        Ok(())
    }
797
    /// Runs the per-turn compaction decision and dispatches to soft or hard compaction.
    ///
    /// Order of checks:
    ///
    /// 1. Increment the turns-since-last-hard-compaction counter, if one is set.
    /// 2. If compaction is `Exhausted`, warn once and return.
    /// 3. If server-side compaction is active, return unless the cached prompt tokens
    ///    have reached 95% of a non-zero budget.
    /// 4. Return if compaction already ran this turn.
    /// 5. Tick the cooldown: decrement `turns_remaining` and switch to `Ready` at zero.
    /// 6. Dispatch on `compaction_tier(cached_prompt_tokens)`.
    ///
    /// `_providers` is currently unused.
    ///
    /// # Errors
    ///
    /// Propagates errors from `do_hard_compaction`.
    #[allow(
        clippy::cast_precision_loss,
        clippy::cast_possible_truncation,
        clippy::cast_sign_loss
    )]
    pub async fn maybe_compact(
        &self,
        summ: &mut ContextSummarizationView<'_>,
        _providers: &ProviderHandles,
        status: &(impl StatusSink + ?Sized),
    ) -> Result<(), ContextError> {
        use zeph_context::manager::{CompactionState, CompactionTier};

        if let Some(ref mut count) = summ.context_manager.turns_since_last_hard_compaction {
            *count += 1;
        }

        // Emit the exhaustion warning only once per exhausted state.
        if let CompactionState::Exhausted { ref mut warned } = summ.context_manager.compaction
            && !*warned
        {
            *warned = true;
            tracing::warn!("compaction exhausted: context budget too tight for this session");
        }
        if summ.context_manager.compaction.is_exhausted() {
            return Ok(());
        }

        if summ.server_compaction_active {
            let budget = summ
                .context_manager
                .budget
                .as_ref()
                .map_or(0, ContextBudget::max_tokens);
            if budget > 0 {
                // Client-side compaction only takes over at 95% of the budget.
                let fallback = (budget * 95 / 100) as u64;
                if *summ.cached_prompt_tokens < fallback {
                    return Ok(());
                }
                tracing::warn!(
                    "server compaction active but context at 95%+ — falling back to client-side"
                );
            } else {
                return Ok(());
            }
        }

        if summ.context_manager.compaction.is_compacted_this_turn() {
            return Ok(());
        }

        // `in_cooldown` is captured before the tick, so the turn on which the
        // cooldown reaches zero is still treated as cooling by the hard tier below.
        let in_cooldown = summ.context_manager.compaction.cooldown_remaining() > 0;
        if in_cooldown
            && let CompactionState::Cooling {
                ref mut turns_remaining,
            } = summ.context_manager.compaction
        {
            *turns_remaining -= 1;
            if *turns_remaining == 0 {
                summ.context_manager.compaction = CompactionState::Ready;
            }
        }

        match summ
            .context_manager
            .compaction_tier(*summ.cached_prompt_tokens)
        {
            CompactionTier::None => Ok(()),
            CompactionTier::Soft => {
                self.do_soft_compaction(summ, status).await;
                Ok(())
            }
            CompactionTier::Hard => self.do_hard_compaction(summ, status, in_cooldown).await,
        }
    }
893
    /// Performs soft compaction: refresh goals, apply deferred summaries, prune tool
    /// outputs.
    ///
    /// The status sink receives `"soft compacting context..."` at the start and an
    /// empty string at the end. This function itself does not assign
    /// `context_manager.compaction`.
    #[allow(
        clippy::cast_precision_loss,
        clippy::cast_possible_truncation,
        clippy::cast_sign_loss
    )]
    async fn do_soft_compaction(
        &self,
        summ: &mut ContextSummarizationView<'_>,
        status: &(impl StatusSink + ?Sized),
    ) {
        status.send_status("soft compacting context...").await;

        // Subgoal-based strategies refresh the subgoal; all others the task goal.
        match &summ.context_manager.compression.pruning_strategy {
            zeph_config::PruningStrategy::Subgoal | zeph_config::PruningStrategy::SubgoalMig => {
                crate::summarization::scheduling::maybe_refresh_subgoal(summ);
            }
            _ => crate::summarization::scheduling::maybe_refresh_task_goal(summ),
        }

        let applied = crate::summarization::deferred::apply_deferred_summaries(summ);

        // Rebuild the subgoal registry once summaries were applied
        // (presumably because message positions changed — confirm).
        if applied > 0
            && summ
                .context_manager
                .compression
                .pruning_strategy
                .is_subgoal()
        {
            summ.subgoal_registry
                .rebuild_after_compaction(summ.messages, 0);
        }

        // Prune only as much as is needed to get back under the soft threshold.
        let budget = summ
            .context_manager
            .budget
            .as_ref()
            .map_or(0, ContextBudget::max_tokens);
        let soft_threshold =
            (budget as f32 * summ.context_manager.soft_compaction_threshold) as usize;
        let cached = usize::try_from(*summ.cached_prompt_tokens).unwrap_or(usize::MAX);
        let min_to_free = cached.saturating_sub(soft_threshold);
        if min_to_free > 0 {
            crate::summarization::pruning::prune_tool_outputs(summ, min_to_free);
        }

        status.send_status("").await;
        tracing::info!(
            cached_tokens = *summ.cached_prompt_tokens,
            soft_threshold,
            "soft compaction complete"
        );
    }
954
    /// Performs hard compaction: prune tool outputs first, then fall back to LLM
    /// summarization.
    ///
    /// Outcomes:
    ///
    /// * Cooldown active — nothing is compacted.
    /// * Pruning frees enough — state becomes `CompactedThisTurn` and deferred
    ///   summaries are flushed (a flush failure is only logged).
    /// * Too few compactable messages, no net token reduction, or still in the hard
    ///   tier afterwards — state becomes `Exhausted { warned: false }`.
    /// * Otherwise — state becomes `CompactedThisTurn`.
    ///
    /// # Errors
    ///
    /// Propagates errors from `compact_context`.
    #[allow(
        clippy::cast_precision_loss,
        clippy::cast_possible_truncation,
        clippy::cast_sign_loss
    )]
    async fn do_hard_compaction(
        &self,
        summ: &mut ContextSummarizationView<'_>,
        status: &(impl StatusSink + ?Sized),
        in_cooldown: bool,
    ) -> Result<(), ContextError> {
        use zeph_context::manager::CompactionState;

        // NOTE(review): the counter reset and metric recording happen before the
        // cooldown check, so a hard-tier hit skipped by cooldown is still recorded
        // as a hard compaction — confirm this is intended.
        let turns_since_last = summ
            .context_manager
            .turns_since_last_hard_compaction
            .map(|t| u32::try_from(t).unwrap_or(u32::MAX));
        summ.context_manager.turns_since_last_hard_compaction = Some(0);
        if let Some(metrics) = summ.metrics {
            metrics.record_hard_compaction(turns_since_last);
        }

        if in_cooldown {
            tracing::debug!(
                turns_remaining = summ.context_manager.compaction.cooldown_remaining(),
                "hard compaction skipped: cooldown active"
            );
            return Ok(());
        }

        // Tokens that must be freed to get back under the hard threshold.
        let budget = summ
            .context_manager
            .budget
            .as_ref()
            .map_or(0, ContextBudget::max_tokens);
        let hard_threshold =
            (budget as f32 * summ.context_manager.hard_compaction_threshold) as usize;
        let cached = usize::try_from(*summ.cached_prompt_tokens).unwrap_or(usize::MAX);
        let min_to_free = cached.saturating_sub(hard_threshold);

        status.send_status("compacting context...").await;

        crate::summarization::deferred::apply_deferred_summaries(summ);

        // First attempt: pruning tool outputs, which needs no summarization call.
        let freed = crate::summarization::pruning::prune_tool_outputs(summ, min_to_free);
        if freed >= min_to_free {
            tracing::info!(freed, "hard compaction: pruning sufficient");
            summ.context_manager.compaction = CompactionState::CompactedThisTurn {
                cooldown: summ.context_manager.compaction_cooldown_turns,
            };
            if let Err(e) = crate::summarization::deferred::flush_deferred_summaries(summ).await {
                tracing::warn!(%e, "flush_deferred_summaries failed after hard compaction");
            }
            status.send_status("").await;
            return Ok(());
        }

        // Messages available for summarization, excluding the preserved tail and
        // one more message (presumably the system prompt — confirm).
        let preserve_tail = summ.context_manager.compaction_preserve_tail;
        let compactable = summ.messages.len().saturating_sub(preserve_tail + 1);
        if compactable <= 1 {
            tracing::warn!(
                compactable,
                "hard compaction: too few messages, marking exhausted"
            );
            summ.context_manager.compaction = CompactionState::Exhausted { warned: false };
            status.send_status("").await;
            return Ok(());
        }

        tracing::info!(
            min_to_free,
            "hard compaction: falling back to LLM summarization"
        );
        let tokens_before = *summ.cached_prompt_tokens;
        let outcome = crate::summarization::compaction::compact_context(summ, None).await?;

        let freed_tokens = tokens_before.saturating_sub(*summ.cached_prompt_tokens);

        if !outcome.is_compacted() || freed_tokens == 0 {
            tracing::warn!("hard compaction: no net reduction, marking exhausted");
            summ.context_manager.compaction = CompactionState::Exhausted { warned: false };
            status.send_status("").await;
            return Ok(());
        }

        // Summarization helped but not enough: further attempts are given up.
        if matches!(
            summ.context_manager
                .compaction_tier(*summ.cached_prompt_tokens),
            zeph_context::manager::CompactionTier::Hard
        ) {
            tracing::warn!(
                freed_tokens,
                "hard compaction: still above hard threshold after compaction, marking exhausted"
            );
            summ.context_manager.compaction = CompactionState::Exhausted { warned: false };
            status.send_status("").await;
            return Ok(());
        }

        summ.context_manager.compaction = CompactionState::CompactedThisTurn {
            cooldown: summ.context_manager.compaction_cooldown_turns,
        };

        if tokens_before > *summ.cached_prompt_tokens {
            tracing::info!(
                tokens_before,
                tokens_after = *summ.cached_prompt_tokens,
                saved = freed_tokens,
                "context compaction complete"
            );
        }

        status.send_status("").await;
        Ok(())
    }
1076
    /// Delegates to `crate::summarization::deferred::maybe_summarize_tool_pair`.
    ///
    /// Status updates are routed through a [`TxStatusSink`] built from a clone of
    /// `summ.status_tx`.
    pub async fn maybe_summarize_tool_pair(
        &self,
        summ: &mut ContextSummarizationView<'_>,
        providers: &ProviderHandles,
    ) {
        crate::summarization::deferred::maybe_summarize_tool_pair(
            summ,
            providers,
            &TxStatusSink(summ.status_tx.clone()),
        )
        .await;
    }
1094
    /// Delegates to `crate::summarization::deferred::apply_deferred_summaries` and
    /// returns its result (presumably the number of summaries applied — confirm
    /// against the helper).
    #[must_use]
    pub fn apply_deferred_summaries(&self, summ: &mut ContextSummarizationView<'_>) -> usize {
        crate::summarization::deferred::apply_deferred_summaries(summ)
    }
1103
    /// Flushes deferred summaries via
    /// `crate::summarization::deferred::flush_deferred_summaries`.
    ///
    /// Best-effort: a failure is logged as a warning and not returned to the caller.
    pub async fn flush_deferred_summaries(&self, summ: &mut ContextSummarizationView<'_>) {
        if let Err(e) = crate::summarization::deferred::flush_deferred_summaries(summ).await {
            tracing::warn!(%e, "flush_deferred_summaries failed");
        }
    }
1113
    /// Delegates to `crate::summarization::deferred::maybe_apply_deferred_summaries`.
    pub fn maybe_apply_deferred_summaries(&self, summ: &mut ContextSummarizationView<'_>) {
        crate::summarization::deferred::maybe_apply_deferred_summaries(summ);
    }
1122
    /// Delegates to `crate::summarization::compaction::compact_context`.
    ///
    /// `max_summary_tokens` is forwarded unchanged; `None` is what hard compaction
    /// passes, `Some(..)` is what proactive compression passes.
    ///
    /// # Errors
    ///
    /// Returns whatever error the underlying `compact_context` returns.
    pub async fn compact_context(
        &self,
        summ: &mut ContextSummarizationView<'_>,
        max_summary_tokens: Option<usize>,
    ) -> Result<crate::state::CompactionOutcome, crate::error::ContextError> {
        crate::summarization::compaction::compact_context(summ, max_summary_tokens).await
    }
1144
    /// Delegates to `crate::summarization::scheduling::maybe_soft_compact_mid_iteration`.
    pub fn maybe_soft_compact_mid_iteration(&self, summ: &mut ContextSummarizationView<'_>) {
        crate::summarization::scheduling::maybe_soft_compact_mid_iteration(summ);
    }
1153
    /// Runs proactive compression when the context manager asks for it.
    ///
    /// Nothing happens unless `should_proactively_compress` returns a value. While
    /// server-side compaction is active, compression is additionally skipped unless
    /// the cached prompt tokens exceed 95% of a non-zero budget. On a successful
    /// compaction the state becomes `CompactedThisTurn { cooldown: 0 }`; a failure
    /// is only logged.
    ///
    /// `_providers` is currently unused.
    pub async fn maybe_proactive_compress(
        &self,
        summ: &mut ContextSummarizationView<'_>,
        _providers: &ProviderHandles,
        status: &(impl StatusSink + ?Sized),
    ) {
        let Some((_threshold, max_summary_tokens)) = summ
            .context_manager
            .should_proactively_compress(*summ.cached_prompt_tokens)
        else {
            return;
        };

        if summ.server_compaction_active {
            let budget = summ
                .context_manager
                .budget
                .as_ref()
                .map_or(0, ContextBudget::max_tokens);
            if budget > 0 {
                let fallback = (budget * 95 / 100) as u64;
                // NOTE(review): this uses `<=` while `maybe_compact` uses `<` for
                // the same 95% fallback — confirm the difference is intended.
                if *summ.cached_prompt_tokens <= fallback {
                    return;
                }
                tracing::warn!(
                    cached_prompt_tokens = *summ.cached_prompt_tokens,
                    fallback_threshold = fallback,
                    "server compaction active but context at 95%+ — falling back to proactive"
                );
            } else {
                return;
            }
        }

        status.send_status("compressing context...").await;
        tracing::info!(
            max_summary_tokens,
            cached_tokens = *summ.cached_prompt_tokens,
            "proactive compression triggered"
        );

        match crate::summarization::compaction::compact_context(summ, Some(max_summary_tokens))
            .await
        {
            Ok(outcome) if outcome.is_compacted() => {
                summ.context_manager.compaction =
                    zeph_context::manager::CompactionState::CompactedThisTurn { cooldown: 0 };
                tracing::info!("proactive compression complete");
            }
            // Nothing was compacted: leave the compaction state untouched.
            Ok(_) => {}
            Err(e) => tracing::warn!(%e, "proactive compression failed"),
        }

        status.send_status("").await;
    }
1214
    /// Delegates to `crate::summarization::scheduling::maybe_refresh_task_goal`.
    pub fn maybe_refresh_task_goal(&self, summ: &mut ContextSummarizationView<'_>) {
        crate::summarization::scheduling::maybe_refresh_task_goal(summ);
    }
1223
    /// Delegates to `crate::summarization::scheduling::maybe_refresh_subgoal`.
    pub fn maybe_refresh_subgoal(&self, summ: &mut ContextSummarizationView<'_>) {
        crate::summarization::scheduling::maybe_refresh_subgoal(summ);
    }
1231}
1232
/// [`StatusSink`] adapter over an optional unbounded channel sender.
///
/// With `None`, status messages are silently discarded.
struct TxStatusSink(Option<tokio::sync::mpsc::UnboundedSender<String>>);

impl StatusSink for TxStatusSink {
    /// Sends `msg` on the channel, if there is one, and returns an already-completed
    /// future.
    ///
    /// The send happens when this method is called, not when the returned future is
    /// awaited. A send error is ignored.
    fn send_status(&self, msg: &str) -> impl std::future::Future<Output = ()> + Send + '_ {
        if let Some(ref tx) = self.0 {
            let _ = tx.send(msg.to_owned());
        }
        std::future::ready(())
    }
}
1248
1249pub(crate) fn recompute_prompt_tokens(window: &mut MessageWindowView<'_>) {
1256 *window.cached_prompt_tokens = window
1257 .messages
1258 .iter()
1259 .map(|m| window.token_counter.count_message_tokens(m) as u64)
1260 .sum();
1261}
1262
1263pub(crate) fn remove_by_prefix(
1269 messages: &mut Vec<zeph_llm::provider::Message>,
1270 role: Role,
1271 prefix: &str,
1272) {
1273 messages.retain(|m| m.role != role || !m.content.starts_with(prefix));
1274}
1275
1276pub(crate) fn remove_by_part_or_prefix(
1282 messages: &mut Vec<zeph_llm::provider::Message>,
1283 prefix: &str,
1284 part_matches: impl Fn(&MessagePart) -> bool,
1285) {
1286 messages.retain(|m| {
1287 if m.role != Role::System {
1288 return true;
1289 }
1290 if m.parts.first().is_some_and(&part_matches) {
1291 return false;
1292 }
1293 !m.content.starts_with(prefix)
1294 });
1295}
1296
#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::sync::Arc;

    use zeph_llm::provider::{Message, MessagePart, Role};
    use zeph_memory::TokenCounter;

    use super::*;
    use crate::helpers::{GRAPH_FACTS_PREFIX, RECALL_PREFIX, SUMMARY_PREFIX};
    use crate::state::MessageWindowView;

    fn make_counter() -> Arc<TokenCounter> {
        Arc::new(TokenCounter::default())
    }

    /// Owns every piece of state a [`MessageWindowView`] borrows.
    ///
    /// Keeping the backing values in one fixture lets each test build a view without
    /// `Box::leak`, so nothing is leaked and the tests stay clean under leak checkers.
    #[derive(Default)]
    struct WindowState {
        messages: Vec<Message>,
        cached: u64,
        completed: HashSet<String>,
        last_persisted: Option<i64>,
        deferred_hide: Vec<i64>,
        deferred_summaries: Vec<String>,
    }

    impl WindowState {
        /// Creates a fixture holding `messages`; every other field starts at its default.
        fn new(messages: Vec<Message>) -> Self {
            Self {
                messages,
                ..Self::default()
            }
        }

        /// Borrows the fixture as a view with a fresh default token counter.
        fn view(&mut self) -> MessageWindowView<'_> {
            MessageWindowView {
                messages: &mut self.messages,
                last_persisted_message_id: &mut self.last_persisted,
                deferred_db_hide_ids: &mut self.deferred_hide,
                deferred_db_summaries: &mut self.deferred_summaries,
                cached_prompt_tokens: &mut self.cached,
                token_counter: make_counter(),
                completed_tool_ids: &mut self.completed,
            }
        }
    }

    fn sys(text: &str) -> Message {
        Message::from_legacy(Role::System, text)
    }

    fn user(text: &str) -> Message {
        Message::from_legacy(Role::User, text)
    }

    fn assistant(text: &str) -> Message {
        Message::from_legacy(Role::Assistant, text)
    }

    #[test]
    fn clear_history_keeps_system_prompt() {
        let mut state = WindowState::new(vec![sys("system"), user("hello"), assistant("hi")]);
        state.completed.insert("tool_1".to_owned());
        // Start from a stale value so the recompute below is actually observable.
        state.cached = u64::MAX;
        let mut window = state.view();

        ContextService::new().clear_history(&mut window);

        assert_eq!(window.messages.len(), 1);
        assert_eq!(window.messages[0].content, "system");
        assert!(
            window.completed_tool_ids.is_empty(),
            "completed_tool_ids must be cleared"
        );
        let expected = window
            .token_counter
            .count_message_tokens(&window.messages[0]) as u64;
        assert_eq!(
            *window.cached_prompt_tokens, expected,
            "cached_prompt_tokens must be recomputed from the surviving messages"
        );
    }

    #[test]
    fn clear_history_empty_messages_is_noop() {
        let mut state = WindowState::default();
        let mut window = state.view();

        ContextService::new().clear_history(&mut window);

        assert!(window.messages.is_empty());
    }

    #[test]
    fn remove_recall_messages_removes_by_prefix() {
        let mut state = WindowState::new(vec![
            sys("system"),
            sys(&format!("{RECALL_PREFIX}some recalled text")),
            user("hello"),
        ]);
        let mut window = state.view();

        ContextService::new().remove_recall_messages(&mut window);

        assert_eq!(window.messages.len(), 2);
        assert!(
            window
                .messages
                .iter()
                .all(|m| !m.content.starts_with(RECALL_PREFIX))
        );
    }

    #[test]
    fn remove_recall_messages_keeps_non_system_messages() {
        // Only system messages are candidates for removal; a user message that happens
        // to start with the recall prefix must survive.
        let mut state = WindowState::new(vec![
            sys("system"),
            user(&format!("{RECALL_PREFIX}quoted by the user")),
        ]);
        let mut window = state.view();

        ContextService::new().remove_recall_messages(&mut window);

        assert_eq!(window.messages.len(), 2);
        assert_eq!(window.messages[1].role, Role::User);
    }

    #[test]
    fn remove_graph_facts_messages_removes_matching() {
        let mut state = WindowState::new(vec![
            sys("system"),
            sys(&format!("{GRAPH_FACTS_PREFIX}fact1")),
            user("hello"),
        ]);
        let mut window = state.view();

        ContextService::new().remove_graph_facts_messages(&mut window);

        assert_eq!(window.messages.len(), 2);
        assert!(
            window
                .messages
                .iter()
                .all(|m| !m.content.starts_with(GRAPH_FACTS_PREFIX)),
            "no graph-facts message may remain"
        );
    }

    #[test]
    fn remove_summary_messages_removes_by_part() {
        let mut state = WindowState::new(vec![
            sys("system"),
            Message::from_parts(
                Role::System,
                vec![MessagePart::Summary {
                    text: format!("{SUMMARY_PREFIX}old summary"),
                }],
            ),
            user("hello"),
        ]);
        let mut window = state.view();

        ContextService::new().remove_summary_messages(&mut window);

        assert_eq!(window.messages.len(), 2);
    }

    #[test]
    fn trim_messages_to_budget_zero_is_noop() {
        let mut state =
            WindowState::new(vec![sys("system"), user("a"), assistant("b"), user("c")]);
        let original_len = state.messages.len();
        let mut window = state.view();

        ContextService::new().trim_messages_to_budget(&mut window, 0);

        assert_eq!(window.messages.len(), original_len);
    }

    #[test]
    fn trim_messages_to_budget_keeps_recent() {
        let mut state = WindowState::new(vec![
            sys("system"),
            user("message 1"),
            assistant("reply 1"),
            user("message 2"),
        ]);
        let mut window = state.view();

        // A budget of 1 is expected to be too small for the whole history.
        ContextService::new().trim_messages_to_budget(&mut window, 1);

        assert!(
            window.messages.len() < 4,
            "trim should remove some messages"
        );
        assert_eq!(
            window.messages[0].role,
            Role::System,
            "system prompt must survive trim"
        );
    }
}