1use async_trait::async_trait;
4use halter_protocol::{
5 CompactedContext, CompactionResult, ContextPlan, FileViewSlice, HookSessionStartSource,
6 Message, ObservedState, PromptSegment, ProviderCompactionRequest, ResolvedModel,
7 ResourceSnapshot, SessionBlueprint, SessionState, ToolSpec, TranscriptWindow,
8};
9use halter_providers::Provider;
10use serde_json::Value;
11use sha2::{Digest, Sha256};
12use tracing::info;
13
14use crate::compaction::{
15 ContextSettings, estimate_context_tokens, prepare_compaction, render_compaction_event_summary,
16 should_trigger_compaction,
17};
18use crate::prompt::skill_prompt_segment;
19
20fn skill_prompt_segments(snapshot: &ResourceSnapshot) -> Vec<PromptSegment> {
25 let mut entries: Vec<(&str, &str)> = snapshot
26 .skills
27 .values()
28 .map(|skill| (skill.name.as_str(), skill.body.as_str()))
29 .collect();
30 entries.sort_by(|a, b| a.0.cmp(b.0));
31 entries
32 .into_iter()
33 .map(|(name, body)| skill_prompt_segment(name, body))
34 .collect()
35}
36
/// Built-in compaction prompt, embedded at compile time via `include_str!` from
/// `../prompts/default-compaction.md` (path relative to this source file).
/// `compaction_instructions` trims it and may append caller-supplied text.
const DEFAULT_COMPACTION_PROMPT_MARKDOWN: &str = include_str!("../prompts/default-compaction.md");
38
/// Result of a compaction attempt, expressed as the transcript the session
/// should adopt.
///
/// When no compaction happened, `messages` and `compacted_prefix` are copies of
/// the session's current values and `compaction` is `None`. Use
/// `CompactionOutcome::apply` to write the outcome into a `SessionState`.
#[derive(Debug, Clone)]
pub struct CompactionOutcome {
    /// Messages that remain in the live transcript.
    pub messages: Vec<Message>,
    /// Compacted items (raw JSON values, e.g. the provider's `compact` output)
    /// that precede `messages`.
    pub compacted_prefix: Vec<Value>,
    /// Summary of the compaction, or `None` if nothing was compacted.
    pub compaction: Option<CompactionResult>,
    /// Session-start source to latch onto the session state, if any. Manual
    /// compactions report `HookSessionStartSource::Compact`.
    pub session_start_latch: Option<HookSessionStartSource>,
}
47
/// State changes to write into a [`SessionState`] after a compaction attempt.
///
/// Can be built from a [`CompactionOutcome`]; consumed by `CompactionEffects::apply`.
#[derive(Debug, Clone)]
pub struct CompactionEffects {
    /// Transcript messages to keep. Written to `SessionState::messages` only
    /// when `result` is `Some`.
    pub messages: Vec<Message>,
    /// Compacted prefix. Its items replace `SessionState::compacted_prefix`
    /// only when `result` is `Some`.
    pub compacted_context: CompactedContext,
    /// Compaction summary. `None` means nothing was compacted and the
    /// transcript is left untouched.
    pub result: Option<CompactionResult>,
    /// Session-start source recorded as pending on the state whenever present.
    pub session_start_latch: Option<HookSessionStartSource>,
}
56
57impl CompactionEffects {
58 pub fn apply(self, state: &mut SessionState) -> Option<CompactionResult> {
60 let CompactionEffects {
61 messages,
62 compacted_context,
63 result,
64 session_start_latch,
65 } = self;
66 if result.is_some() {
67 state.compacted_prefix = compacted_context.into_items();
68 state.messages = messages;
69 state.last_response_id = None;
73 state.messages_seen_by_provider = 0;
74 }
75 if let Some(source) = session_start_latch {
76 state.pending_session_start_source = Some(source);
77 }
78 result
79 }
80}
81
82impl CompactionOutcome {
83 pub fn apply(self, state: &mut SessionState) -> Option<CompactionResult> {
92 self.into_effects().apply(state)
93 }
94
95 fn into_effects(self) -> CompactionEffects {
96 let CompactionOutcome {
97 messages,
98 compacted_prefix,
99 compaction,
100 session_start_latch,
101 } = self;
102 CompactionEffects {
103 messages,
104 compacted_context: CompactedContext::from(compacted_prefix),
105 result: compaction,
106 session_start_latch,
107 }
108 }
109}
110
/// How a compaction attempt was initiated.
#[derive(Debug, Clone, Copy)]
enum CompactionMode<'a> {
    /// Compact only when `should_trigger_compaction` reports that the estimated
    /// context size crosses the threshold (used by `plan`).
    AutoThreshold,
    /// Compact on demand, bypassing the threshold check (used by `compact_now`).
    Manual {
        /// Extra text appended to the default compaction prompt when non-blank.
        custom_instructions: Option<&'a str>,
    },
}
118
119impl<'a> CompactionMode<'a> {
120 fn is_forced(self) -> bool {
121 matches!(self, Self::Manual { .. })
122 }
123
124 fn custom_instructions(self) -> Option<&'a str> {
125 match self {
126 Self::AutoThreshold => None,
127 Self::Manual {
128 custom_instructions,
129 } => custom_instructions,
130 }
131 }
132
133 fn session_start_latch(self) -> Option<HookSessionStartSource> {
134 match self {
135 Self::AutoThreshold => None,
136 Self::Manual { .. } => Some(HookSessionStartSource::Compact),
137 }
138 }
139}
140
/// Builds per-turn model context for a session and performs on-demand compaction.
///
/// Both methods take the session state by shared reference. Any compaction they
/// perform is reported back to the caller rather than written into `state`.
#[async_trait]
#[allow(clippy::too_many_arguments)]
pub trait ContextManager: Send + Sync {
    /// Produces the [`ContextPlan`] for the next provider request.
    ///
    /// Implementations may compact the transcript using `compaction_provider`
    /// and `compaction_model`; the returned plan reflects the resulting transcript.
    async fn plan(
        &self,
        blueprint: &SessionBlueprint,
        state: &SessionState,
        observed: &ObservedState,
        snapshot: &ResourceSnapshot,
        tool_specs: &[ToolSpec],
        compaction_model: &ResolvedModel,
        compaction_provider: &(dyn Provider + Send + Sync),
    ) -> anyhow::Result<ContextPlan>;

    /// Compacts the session on demand.
    ///
    /// `custom_instructions`, when given, supplements the compaction prompt.
    /// The returned [`CompactionOutcome`] is not applied to `state`; callers can
    /// persist it with `CompactionOutcome::apply`.
    async fn compact_now(
        &self,
        blueprint: &SessionBlueprint,
        state: &SessionState,
        observed: &ObservedState,
        snapshot: &ResourceSnapshot,
        tool_specs: &[ToolSpec],
        compaction_model: &ResolvedModel,
        compaction_provider: &(dyn Provider + Send + Sync),
        custom_instructions: Option<&str>,
    ) -> anyhow::Result<CompactionOutcome>;
}
170
/// [`ContextManager`] implementation driven by [`ContextSettings`].
///
/// The derived `Default` uses `ContextSettings::default()`.
#[derive(Debug, Default)]
pub struct DefaultContextManager {
    /// Thresholds consulted when deciding whether and how much to compact.
    settings: ContextSettings,
}
176
177impl DefaultContextManager {
178 #[must_use]
180 pub fn new(
181 compaction_threshold: u64,
182 pre_compaction_target: u64,
183 prune_signal_threshold: halter_protocol::PruneSignalThreshold,
184 ) -> Self {
185 Self {
186 settings: ContextSettings {
187 compaction_threshold,
188 pre_compaction_target,
189 prune_signal_threshold,
190 },
191 }
192 }
193
194 #[must_use]
196 pub fn from_settings(settings: ContextSettings) -> Self {
197 Self { settings }
198 }
199
200 #[must_use]
202 pub fn settings(&self) -> ContextSettings {
203 self.settings
204 }
205
206 #[allow(clippy::too_many_arguments)]
207 async fn execute_compaction(
208 &self,
209 blueprint: &SessionBlueprint,
210 state: &SessionState,
211 prompt_segments: &[PromptSegment],
212 tool_specs: &[ToolSpec],
213 compaction_model: &ResolvedModel,
214 compaction_provider: &(dyn Provider + Send + Sync),
215 mode: CompactionMode<'_>,
216 ) -> anyhow::Result<CompactionOutcome> {
217 let estimated_tokens = estimate_context_tokens(
218 prompt_segments,
219 &state.summaries,
220 &state.compacted_prefix,
221 &state.messages,
222 );
223 if !mode.is_forced() && !should_trigger_compaction(estimated_tokens, &self.settings) {
224 return Ok(CompactionOutcome {
225 messages: state.messages.clone(),
226 compacted_prefix: state.compacted_prefix.clone(),
227 compaction: None,
228 session_start_latch: mode.session_start_latch(),
229 });
230 }
231
232 let capabilities = compaction_provider.capabilities();
233 if !capabilities.supports_compaction {
234 anyhow::bail!(
235 "failed to compact session: provider '{}' does not support compaction",
236 compaction_model.provider
237 );
238 }
239
240 let Some(window) = compaction_provider.compaction_window(&state.messages) else {
241 if mode.is_forced() {
242 anyhow::bail!(
243 "failed to compact session: provider '{}' did not provide a compaction window",
244 compaction_model.provider
245 );
246 }
247 return Ok(CompactionOutcome {
248 messages: state.messages.clone(),
249 compacted_prefix: state.compacted_prefix.clone(),
250 compaction: None,
251 session_start_latch: mode.session_start_latch(),
252 });
253 };
254 let compacted_context = CompactedContext::from(state.compacted_prefix.clone());
255 let preparation = prepare_compaction(&self.settings, &compacted_context, window);
256 if compacted_context.is_empty() && preparation.compact_messages.is_empty() {
257 return Ok(CompactionOutcome {
258 messages: state.messages.clone(),
259 compacted_prefix: state.compacted_prefix.clone(),
260 compaction: None,
261 session_start_latch: mode.session_start_latch(),
262 });
263 }
264
265 let response = compaction_provider
266 .compact(
267 ProviderCompactionRequest {
268 session_id: blueprint.session_id.clone(),
269 model: compaction_model.clone(),
270 compacted_prefix: state.compacted_prefix.clone(),
271 messages: preparation.compact_messages.clone(),
272 tools: tool_specs.to_vec(),
273 instructions: compaction_instructions(mode.custom_instructions()),
274 },
275 tokio_util::sync::CancellationToken::new(),
276 )
277 .await?;
278 let summary = render_compaction_event_summary(
279 preparation.compacted_message_count,
280 response.output.len(),
281 preparation.evicted_unit_count,
282 preparation.reserved_response_block,
283 );
284
285 Ok(CompactionOutcome {
286 messages: preparation.preserved_messages,
287 compacted_prefix: response.output,
288 compaction: Some(CompactionResult {
289 compacted_count: preparation.compacted_message_count,
290 summary,
291 }),
292 session_start_latch: mode.session_start_latch(),
293 })
294 }
295}
296
297#[async_trait]
298impl ContextManager for DefaultContextManager {
299 async fn plan(
300 &self,
301 blueprint: &SessionBlueprint,
302 state: &SessionState,
303 observed: &ObservedState,
304 snapshot: &ResourceSnapshot,
305 tool_specs: &[ToolSpec],
306 compaction_model: &ResolvedModel,
307 compaction_provider: &(dyn Provider + Send + Sync),
308 ) -> anyhow::Result<ContextPlan> {
309 let mut prompt_segments = blueprint.system_prompt_seed.clone();
310 prompt_segments.extend(skill_prompt_segments(snapshot));
311 prompt_segments.extend(state.appended_prompt_segments.clone());
312
313 let file_views = state
314 .file_view_cache
315 .values()
316 .cloned()
317 .map(|entry| FileViewSlice {
318 path: entry.path,
319 full_hash: entry.full_hash,
320 viewed_ranges: entry.viewed_ranges,
321 last_shown_turn: entry.last_shown_turn,
322 })
323 .collect::<Vec<_>>();
324
325 let outcome = self
326 .execute_compaction(
327 blueprint,
328 state,
329 &prompt_segments,
330 tool_specs,
331 compaction_model,
332 compaction_provider,
333 CompactionMode::AutoThreshold,
334 )
335 .await?;
336 let estimated_tokens = estimate_context_tokens(
337 &prompt_segments,
338 &state.summaries,
339 &outcome.compacted_prefix,
340 &outcome.messages,
341 );
342
343 if let Some(compaction) = outcome.compaction.as_ref() {
344 info!(
345 compacted_messages = compaction.compacted_count,
346 remaining_messages = outcome.messages.len(),
347 compacted_prefix_items = outcome.compacted_prefix.len(),
348 estimated_tokens,
349 compaction_threshold = self.settings.compaction_threshold,
350 "context manager compacted session state"
351 );
352 }
353
354 let (previous_response_id, new_messages_start) = resolve_response_chain(
355 state.last_response_id.as_deref(),
356 state.messages_seen_by_provider,
357 state.messages.len(),
358 outcome.messages.len(),
359 outcome.compaction.is_some(),
360 !outcome.compacted_prefix.is_empty(),
361 );
362 let previous_response_id = previous_response_id.map(|s| s.to_owned());
363
364 Ok(ContextPlan {
365 prompt_segments,
366 transcript_window: TranscriptWindow {
367 messages: outcome.messages.clone(),
368 elided_message_count: state.messages.len().saturating_sub(outcome.messages.len())
369 as u64,
370 },
371 compacted_prefix: outcome.compacted_prefix.clone(),
372 file_views,
373 carried_summaries: state.summaries.clone(),
374 elided_tool_results: Vec::new(),
375 memory_items: Vec::new(),
376 tool_specs: tool_specs.to_vec(),
377 observed_state: observed.clone(),
378 projected_input_tokens: estimated_tokens,
379 cache_boundary_hash: cache_boundary_hash(),
380 messages: outcome.messages,
381 estimated_tokens,
382 compaction: outcome.compaction,
383 previous_response_id,
384 new_messages_start,
385 })
386 }
387
388 async fn compact_now(
389 &self,
390 blueprint: &SessionBlueprint,
391 state: &SessionState,
392 _observed: &ObservedState,
393 snapshot: &ResourceSnapshot,
394 tool_specs: &[ToolSpec],
395 compaction_model: &ResolvedModel,
396 compaction_provider: &(dyn Provider + Send + Sync),
397 custom_instructions: Option<&str>,
398 ) -> anyhow::Result<CompactionOutcome> {
399 let mut prompt_segments = blueprint.system_prompt_seed.clone();
400 prompt_segments.extend(skill_prompt_segments(snapshot));
401 prompt_segments.extend(state.appended_prompt_segments.clone());
402 self.execute_compaction(
403 blueprint,
404 state,
405 &prompt_segments,
406 tool_specs,
407 compaction_model,
408 compaction_provider,
409 CompactionMode::Manual {
410 custom_instructions,
411 },
412 )
413 .await
414 }
415}
416
417fn cache_boundary_hash() -> String {
418 let mut hasher = Sha256::new();
419 hasher.update(b"transcript_boundary_v2");
420 format!("{:x}", hasher.finalize())
421}
422
/// Decides whether the next request may chain onto the previous provider response.
///
/// Returns `(previous_response_id, new_messages_start)`:
/// * `(None, 0)` when there is no previous response id, the provider has seen
///   no messages, a compaction happened this turn, or a compacted prefix
///   exists. In those cases the whole window must be sent without chaining.
/// * Otherwise `last_response_id` plus the index within the window of the
///   first message the provider has not seen yet. The window is assumed to be
///   the last `window_messages` of `total_messages`, and the index is clamped
///   to `0..=window_messages`.
#[must_use]
pub fn resolve_response_chain(
    last_response_id: Option<&str>,
    messages_seen_by_provider: usize,
    total_messages: usize,
    window_messages: usize,
    compacted_this_turn: bool,
    has_compacted_prefix: bool,
) -> (Option<&str>, usize) {
    let chain_broken =
        compacted_this_turn || has_compacted_prefix || messages_seen_by_provider == 0;

    match last_response_id {
        Some(id) if !chain_broken => {
            // Messages before the window are not part of this request.
            let elided_before_window = total_messages.saturating_sub(window_messages);
            let first_unseen = messages_seen_by_provider.saturating_sub(elided_before_window);
            (Some(id), first_unseen.min(window_messages))
        }
        _ => (None, 0),
    }
}
476
477fn compaction_instructions(custom_instructions: Option<&str>) -> String {
478 let base = DEFAULT_COMPACTION_PROMPT_MARKDOWN.trim();
479 if let Some(custom_instructions) =
480 custom_instructions.filter(|instructions| !instructions.trim().is_empty())
481 {
482 format!("{base}\n\n{custom_instructions}")
483 } else {
484 base.to_owned()
485 }
486}
487
#[cfg(test)]
mod tests {
    use chrono::Utc;
    use halter_protocol::{
        ModelId, ModelRole, ProviderCapabilities, ProviderKind, ProviderName, ResolvedModel,
        SessionId, SubagentEventForwarding, SummarySlice, ToolCallIdPolicy, Usage, UserMessage,
    };

    use super::*;

    /// The session state carries a non-empty compacted prefix, so `plan` must
    /// not chain onto `last_response_id`, even though the provider has already
    /// seen a message. This must hold whether or not the automatic compaction
    /// step runs (that depends on the default `ContextSettings`).
    #[tokio::test]
    async fn plan_disables_previous_response_chaining_when_compacted_prefix_exists() {
        let manager = DefaultContextManager::default();
        let outcome = manager
            .plan(
                &SessionBlueprint {
                    session_id: SessionId::new(),
                    parent_session_id: None,
                    default_model: "default".into(),
                    subagent_model: "subagent".into(),
                    subagent_event_forwarding: SubagentEventForwarding::Off,
                    snapshot_revision: "r1".into(),
                    working_dir: ".".into(),
                    system_prompt_seed: Vec::new(),
                    max_turns: None,
                    subagent_depth: 0,
                },
                &SessionState {
                    compacted_prefix: vec![serde_json::json!({
                        "type": "compaction",
                        "id": "cmp_1",
                        "encrypted_content": "x",
                    })],
                    summaries: vec![SummarySlice {
                        id: "summary-1".to_owned(),
                        text: "summary".to_owned(),
                    }],
                    messages: vec![Message::User(UserMessage::text("hello"))],
                    last_response_id: Some("resp_1".to_owned()),
                    messages_seen_by_provider: 1,
                    ..SessionState::default()
                },
                &ObservedState {
                    cwd: ".".into(),
                    git_branch: None,
                    git_dirty: None,
                    now_utc: Utc::now(),
                    env_facts: Default::default(),
                },
                &ResourceSnapshot::empty(),
                &[],
                &ResolvedModel {
                    role: ModelRole::default(),
                    id: ModelId::from("default"),
                    provider: ProviderName::from("fake"),
                    provider_kind: ProviderKind::Fake,
                    api_kind: halter_protocol::ApiKind::Fake,
                    model: "fake".to_owned(),
                    max_input_tokens: None,
                    max_output_tokens: None,
                    reasoning: None,
                    tokens_per_minute: None,
                },
                &NoopProvider,
            )
            .await
            .expect("plan");

        assert!(outcome.previous_response_id.is_none());
    }

    /// Test double that advertises compaction support and answers `compact`
    /// with a single fixed compaction item. `stream` always errors because these
    /// tests never issue a model request.
    struct NoopProvider;

    #[async_trait]
    impl Provider for NoopProvider {
        fn capabilities(&self) -> ProviderCapabilities {
            ProviderCapabilities {
                supports_compaction: true,
                tool_call_id_policy: ToolCallIdPolicy::ProviderSupplied,
                ..ProviderCapabilities::default()
            }
        }

        async fn stream(
            &self,
            _request: halter_protocol::ProviderRequest,
            _cancel: tokio_util::sync::CancellationToken,
        ) -> anyhow::Result<
            futures::stream::BoxStream<
                'static,
                Result<halter_protocol::StreamEvent, halter_protocol::ProviderError>,
            >,
        > {
            anyhow::bail!("stream should not be called in this test");
        }

        async fn compact(
            &self,
            _request: ProviderCompactionRequest,
            _cancel: tokio_util::sync::CancellationToken,
        ) -> anyhow::Result<halter_protocol::ProviderCompactionResponse> {
            Ok(halter_protocol::ProviderCompactionResponse {
                output: vec![serde_json::json!({
                    "type": "compaction",
                    "id": "cmp_1",
                    "encrypted_content": "summary",
                })],
                usage: Usage::default(),
            })
        }
    }

    /// Custom instructions are appended to the built-in prompt. The first
    /// assertion assumes the bundled prompt file contains this phrase.
    #[test]
    fn compaction_instructions_append_custom_text() {
        let instructions = compaction_instructions(Some("Focus on decisions."));
        assert!(instructions.contains("Compress the conversation"));
        assert!(instructions.contains("Focus on decisions."));
    }

    /// Whitespace-only custom instructions yield just the trimmed built-in prompt.
    #[test]
    fn compaction_instructions_ignore_blank_custom_text() {
        assert_eq!(
            compaction_instructions(Some(" ")),
            DEFAULT_COMPACTION_PROMPT_MARKDOWN.trim()
        );
    }
}