Skip to main content

halter_runtime/
context.rs

1// pattern: Imperative Shell
2
3use async_trait::async_trait;
4use halter_protocol::{
5    CompactedContext, CompactionResult, ContextPlan, FileViewSlice, HookSessionStartSource,
6    Message, ObservedState, PromptSegment, ProviderCompactionRequest, ResolvedModel,
7    ResourceSnapshot, SessionBlueprint, SessionState, ToolSpec, TranscriptWindow,
8};
9use halter_providers::Provider;
10use serde_json::Value;
11use sha2::{Digest, Sha256};
12use tracing::info;
13
14use crate::compaction::{
15    ContextSettings, estimate_context_tokens, prepare_compaction, render_compaction_event_summary,
16    should_trigger_compaction,
17};
18use crate::prompt::skill_prompt_segment;
19
20/// Build one prompt segment per skill loaded into the resource snapshot,
21/// in skill-name order so the resulting prefix is stable across rebuilds.
22/// Snapshot order is already deterministic (`IndexMap`), but we still sort
23/// by name to be defensive against future loader changes.
24fn skill_prompt_segments(snapshot: &ResourceSnapshot) -> Vec<PromptSegment> {
25    let mut entries: Vec<(&str, &str)> = snapshot
26        .skills
27        .values()
28        .map(|skill| (skill.name.as_str(), skill.body.as_str()))
29        .collect();
30    entries.sort_by(|a, b| a.0.cmp(b.0));
31    entries
32        .into_iter()
33        .map(|(name, body)| skill_prompt_segment(name, body))
34        .collect()
35}
36
37const DEFAULT_COMPACTION_PROMPT_MARKDOWN: &str = include_str!("../prompts/default-compaction.md");
38
39#[derive(Debug, Clone)]
40/// Result of a compaction pass before it is applied to session state.
41pub struct CompactionOutcome {
42    pub messages: Vec<Message>,
43    pub compacted_prefix: Vec<Value>,
44    pub compaction: Option<CompactionResult>,
45    pub session_start_latch: Option<HookSessionStartSource>,
46}
47
48#[derive(Debug, Clone)]
49/// State mutation produced by compaction.
50pub struct CompactionEffects {
51    pub messages: Vec<Message>,
52    pub compacted_context: CompactedContext,
53    pub result: Option<CompactionResult>,
54    pub session_start_latch: Option<HookSessionStartSource>,
55}
56
57impl CompactionEffects {
58    /// Apply compaction side effects to session state.
59    pub fn apply(self, state: &mut SessionState) -> Option<CompactionResult> {
60        let CompactionEffects {
61            messages,
62            compacted_context,
63            result,
64            session_start_latch,
65        } = self;
66        if result.is_some() {
67            state.compacted_prefix = compacted_context.into_items();
68            state.messages = messages;
69            // Compaction breaks the previous_response_id chain: the provider
70            // has no record of the synthetic `compacted_prefix` we just
71            // injected, so the next request must replay everything.
72            state.last_response_id = None;
73            state.messages_seen_by_provider = 0;
74        }
75        if let Some(source) = session_start_latch {
76            state.pending_session_start_source = Some(source);
77        }
78        result
79    }
80}
81
82impl CompactionOutcome {
83    /// Apply the outcome to a `SessionState` in place. Used by both the
84    /// turn loop and the manual `compact()` entry point so the rules for
85    /// "what changes when compaction lands" live in one place rather than
86    /// being copy-pasted into every caller.
87    ///
88    /// Returns the inner `CompactionResult` when compaction actually fired
89    /// (so callers can publish the event), or `None` when there was
90    /// nothing to compact and the state was left untouched.
91    pub fn apply(self, state: &mut SessionState) -> Option<CompactionResult> {
92        self.into_effects().apply(state)
93    }
94
95    fn into_effects(self) -> CompactionEffects {
96        let CompactionOutcome {
97            messages,
98            compacted_prefix,
99            compaction,
100            session_start_latch,
101        } = self;
102        CompactionEffects {
103            messages,
104            compacted_context: CompactedContext::from(compacted_prefix),
105            result: compaction,
106            session_start_latch,
107        }
108    }
109}
110
/// How a compaction pass was requested.
#[derive(Debug, Clone, Copy)]
enum CompactionMode<'a> {
    /// Automatic pass, gated on the estimated token count.
    AutoThreshold,
    /// Explicitly requested pass; not gated on the token estimate.
    Manual {
        /// Optional extra instructions for the compaction prompt.
        custom_instructions: Option<&'a str>,
    },
}
118
119impl<'a> CompactionMode<'a> {
120    fn is_forced(self) -> bool {
121        matches!(self, Self::Manual { .. })
122    }
123
124    fn custom_instructions(self) -> Option<&'a str> {
125        match self {
126            Self::AutoThreshold => None,
127            Self::Manual {
128                custom_instructions,
129            } => custom_instructions,
130        }
131    }
132
133    fn session_start_latch(self) -> Option<HookSessionStartSource> {
134        match self {
135            Self::AutoThreshold => None,
136            Self::Manual { .. } => Some(HookSessionStartSource::Compact),
137        }
138    }
139}
140
141#[async_trait]
142#[allow(clippy::too_many_arguments)]
143/// Builds context plans and performs compaction.
144pub trait ContextManager: Send + Sync {
145    /// Plan the next provider request.
146    async fn plan(
147        &self,
148        blueprint: &SessionBlueprint,
149        state: &SessionState,
150        observed: &ObservedState,
151        snapshot: &ResourceSnapshot,
152        tool_specs: &[ToolSpec],
153        compaction_model: &ResolvedModel,
154        compaction_provider: &(dyn Provider + Send + Sync),
155    ) -> anyhow::Result<ContextPlan>;
156
157    /// Force a compaction pass, optionally with additional instructions.
158    async fn compact_now(
159        &self,
160        blueprint: &SessionBlueprint,
161        state: &SessionState,
162        observed: &ObservedState,
163        snapshot: &ResourceSnapshot,
164        tool_specs: &[ToolSpec],
165        compaction_model: &ResolvedModel,
166        compaction_provider: &(dyn Provider + Send + Sync),
167        custom_instructions: Option<&str>,
168    ) -> anyhow::Result<CompactionOutcome>;
169}
170
171#[derive(Debug, Default)]
172/// Default context manager using heuristic token estimates and signal pruning.
173pub struct DefaultContextManager {
174    settings: ContextSettings,
175}
176
177impl DefaultContextManager {
178    /// Construct from explicit compaction settings.
179    #[must_use]
180    pub fn new(
181        compaction_threshold: u64,
182        pre_compaction_target: u64,
183        prune_signal_threshold: halter_protocol::PruneSignalThreshold,
184    ) -> Self {
185        Self {
186            settings: ContextSettings {
187                compaction_threshold,
188                pre_compaction_target,
189                prune_signal_threshold,
190            },
191        }
192    }
193
194    /// Construct from a settings struct.
195    #[must_use]
196    pub fn from_settings(settings: ContextSettings) -> Self {
197        Self { settings }
198    }
199
200    /// Current context settings.
201    #[must_use]
202    pub fn settings(&self) -> ContextSettings {
203        self.settings
204    }
205
206    #[allow(clippy::too_many_arguments)]
207    async fn execute_compaction(
208        &self,
209        blueprint: &SessionBlueprint,
210        state: &SessionState,
211        prompt_segments: &[PromptSegment],
212        tool_specs: &[ToolSpec],
213        compaction_model: &ResolvedModel,
214        compaction_provider: &(dyn Provider + Send + Sync),
215        mode: CompactionMode<'_>,
216    ) -> anyhow::Result<CompactionOutcome> {
217        let estimated_tokens = estimate_context_tokens(
218            prompt_segments,
219            &state.summaries,
220            &state.compacted_prefix,
221            &state.messages,
222        );
223        if !mode.is_forced() && !should_trigger_compaction(estimated_tokens, &self.settings) {
224            return Ok(CompactionOutcome {
225                messages: state.messages.clone(),
226                compacted_prefix: state.compacted_prefix.clone(),
227                compaction: None,
228                session_start_latch: mode.session_start_latch(),
229            });
230        }
231
232        let capabilities = compaction_provider.capabilities();
233        if !capabilities.supports_compaction {
234            anyhow::bail!(
235                "failed to compact session: provider '{}' does not support compaction",
236                compaction_model.provider
237            );
238        }
239
240        let Some(window) = compaction_provider.compaction_window(&state.messages) else {
241            if mode.is_forced() {
242                anyhow::bail!(
243                    "failed to compact session: provider '{}' did not provide a compaction window",
244                    compaction_model.provider
245                );
246            }
247            return Ok(CompactionOutcome {
248                messages: state.messages.clone(),
249                compacted_prefix: state.compacted_prefix.clone(),
250                compaction: None,
251                session_start_latch: mode.session_start_latch(),
252            });
253        };
254        let compacted_context = CompactedContext::from(state.compacted_prefix.clone());
255        let preparation = prepare_compaction(&self.settings, &compacted_context, window);
256        if compacted_context.is_empty() && preparation.compact_messages.is_empty() {
257            return Ok(CompactionOutcome {
258                messages: state.messages.clone(),
259                compacted_prefix: state.compacted_prefix.clone(),
260                compaction: None,
261                session_start_latch: mode.session_start_latch(),
262            });
263        }
264
265        let response = compaction_provider
266            .compact(
267                ProviderCompactionRequest {
268                    session_id: blueprint.session_id.clone(),
269                    model: compaction_model.clone(),
270                    compacted_prefix: state.compacted_prefix.clone(),
271                    messages: preparation.compact_messages.clone(),
272                    tools: tool_specs.to_vec(),
273                    instructions: compaction_instructions(mode.custom_instructions()),
274                },
275                tokio_util::sync::CancellationToken::new(),
276            )
277            .await?;
278        let summary = render_compaction_event_summary(
279            preparation.compacted_message_count,
280            response.output.len(),
281            preparation.evicted_unit_count,
282            preparation.reserved_response_block,
283        );
284
285        Ok(CompactionOutcome {
286            messages: preparation.preserved_messages,
287            compacted_prefix: response.output,
288            compaction: Some(CompactionResult {
289                compacted_count: preparation.compacted_message_count,
290                summary,
291            }),
292            session_start_latch: mode.session_start_latch(),
293        })
294    }
295}
296
297#[async_trait]
298impl ContextManager for DefaultContextManager {
299    async fn plan(
300        &self,
301        blueprint: &SessionBlueprint,
302        state: &SessionState,
303        observed: &ObservedState,
304        snapshot: &ResourceSnapshot,
305        tool_specs: &[ToolSpec],
306        compaction_model: &ResolvedModel,
307        compaction_provider: &(dyn Provider + Send + Sync),
308    ) -> anyhow::Result<ContextPlan> {
309        let mut prompt_segments = blueprint.system_prompt_seed.clone();
310        prompt_segments.extend(skill_prompt_segments(snapshot));
311        prompt_segments.extend(state.appended_prompt_segments.clone());
312
313        let file_views = state
314            .file_view_cache
315            .values()
316            .cloned()
317            .map(|entry| FileViewSlice {
318                path: entry.path,
319                full_hash: entry.full_hash,
320                viewed_ranges: entry.viewed_ranges,
321                last_shown_turn: entry.last_shown_turn,
322            })
323            .collect::<Vec<_>>();
324
325        let outcome = self
326            .execute_compaction(
327                blueprint,
328                state,
329                &prompt_segments,
330                tool_specs,
331                compaction_model,
332                compaction_provider,
333                CompactionMode::AutoThreshold,
334            )
335            .await?;
336        let estimated_tokens = estimate_context_tokens(
337            &prompt_segments,
338            &state.summaries,
339            &outcome.compacted_prefix,
340            &outcome.messages,
341        );
342
343        if let Some(compaction) = outcome.compaction.as_ref() {
344            info!(
345                compacted_messages = compaction.compacted_count,
346                remaining_messages = outcome.messages.len(),
347                compacted_prefix_items = outcome.compacted_prefix.len(),
348                estimated_tokens,
349                compaction_threshold = self.settings.compaction_threshold,
350                "context manager compacted session state"
351            );
352        }
353
354        let (previous_response_id, new_messages_start) = resolve_response_chain(
355            state.last_response_id.as_deref(),
356            state.messages_seen_by_provider,
357            state.messages.len(),
358            outcome.messages.len(),
359            outcome.compaction.is_some(),
360            !outcome.compacted_prefix.is_empty(),
361        );
362        let previous_response_id = previous_response_id.map(|s| s.to_owned());
363
364        Ok(ContextPlan {
365            prompt_segments,
366            transcript_window: TranscriptWindow {
367                messages: outcome.messages.clone(),
368                elided_message_count: state.messages.len().saturating_sub(outcome.messages.len())
369                    as u64,
370            },
371            compacted_prefix: outcome.compacted_prefix.clone(),
372            file_views,
373            carried_summaries: state.summaries.clone(),
374            elided_tool_results: Vec::new(),
375            memory_items: Vec::new(),
376            tool_specs: tool_specs.to_vec(),
377            observed_state: observed.clone(),
378            projected_input_tokens: estimated_tokens,
379            cache_boundary_hash: cache_boundary_hash(),
380            messages: outcome.messages,
381            estimated_tokens,
382            compaction: outcome.compaction,
383            previous_response_id,
384            new_messages_start,
385        })
386    }
387
388    async fn compact_now(
389        &self,
390        blueprint: &SessionBlueprint,
391        state: &SessionState,
392        _observed: &ObservedState,
393        snapshot: &ResourceSnapshot,
394        tool_specs: &[ToolSpec],
395        compaction_model: &ResolvedModel,
396        compaction_provider: &(dyn Provider + Send + Sync),
397        custom_instructions: Option<&str>,
398    ) -> anyhow::Result<CompactionOutcome> {
399        let mut prompt_segments = blueprint.system_prompt_seed.clone();
400        prompt_segments.extend(skill_prompt_segments(snapshot));
401        prompt_segments.extend(state.appended_prompt_segments.clone());
402        self.execute_compaction(
403            blueprint,
404            state,
405            &prompt_segments,
406            tool_specs,
407            compaction_model,
408            compaction_provider,
409            CompactionMode::Manual {
410                custom_instructions,
411            },
412        )
413        .await
414    }
415}
416
417fn cache_boundary_hash() -> String {
418    let mut hasher = Sha256::new();
419    hasher.update(b"transcript_boundary_v2");
420    format!("{:x}", hasher.finalize())
421}
422
/// Decides whether the next request may continue the provider's
/// `previous_response_id` chain and, if it may, where in the pruned
/// transcript window the messages the provider has not yet seen begin.
///
/// Chaining is refused — `(None, 0)` is returned — when any of these hold:
/// compaction ran this turn, a compacted prefix is being carried, the
/// provider has seen no messages, or there is no previous response id.
///
/// Otherwise the id is returned together with `new_messages_start`: the count
/// of messages at the head of the window that the provider already observed.
/// The caller sends only the suffix after that index. The index is never
/// larger than `window_messages`.
///
/// ```
/// # use halter_runtime::resolve_response_chain;
/// // No prior response: no chaining.
/// assert_eq!(resolve_response_chain(None, 0, 0, 0, false, false), (None, 0));
///
/// // Clean turn, 6 total messages, provider saw 4, window has 6 → resume at 4.
/// let (id, start) = resolve_response_chain(Some("resp_1"), 4, 6, 6, false, false);
/// assert_eq!(id, Some("resp_1"));
/// assert_eq!(start, 4);
///
/// // A 2-message head was pruned: window has 4, provider saw 4 of the original
/// // 6 → the first 2 seen messages fell outside the window, resume at 2.
/// let (_, start) = resolve_response_chain(Some("resp_1"), 4, 6, 4, false, false);
/// assert_eq!(start, 2);
///
/// // Compaction fired this turn — must not chain.
/// assert_eq!(resolve_response_chain(Some("resp_1"), 4, 6, 6, true, false), (None, 0));
///
/// // A compacted prefix is already carried — must not chain.
/// assert_eq!(resolve_response_chain(Some("resp_1"), 4, 6, 6, false, true), (None, 0));
/// ```
#[must_use]
pub fn resolve_response_chain(
    last_response_id: Option<&str>,
    messages_seen_by_provider: usize,
    total_messages: usize,
    window_messages: usize,
    compacted_this_turn: bool,
    has_compacted_prefix: bool,
) -> (Option<&str>, usize) {
    let chain_broken =
        compacted_this_turn || has_compacted_prefix || messages_seen_by_provider == 0;

    match last_response_id {
        Some(response_id) if !chain_broken => {
            // Messages dropped from the head of the transcript to form the window.
            let pruned_head = total_messages.saturating_sub(window_messages);
            // Seen messages that are still inside the window, capped at its length.
            let seen_in_window = messages_seen_by_provider.saturating_sub(pruned_head);
            (Some(response_id), seen_in_window.min(window_messages))
        }
        _ => (None, 0),
    }
}
476
477fn compaction_instructions(custom_instructions: Option<&str>) -> String {
478    let base = DEFAULT_COMPACTION_PROMPT_MARKDOWN.trim();
479    if let Some(custom_instructions) =
480        custom_instructions.filter(|instructions| !instructions.trim().is_empty())
481    {
482        format!("{base}\n\n{custom_instructions}")
483    } else {
484        base.to_owned()
485    }
486}
487
#[cfg(test)]
mod tests {
    use chrono::Utc;
    use halter_protocol::{
        ModelId, ModelRole, ProviderCapabilities, ProviderKind, ProviderName, ResolvedModel,
        SessionId, SubagentEventForwarding, SummarySlice, ToolCallIdPolicy, Usage, UserMessage,
    };

    use super::*;

    /// Provider stub: advertises compaction support, returns a canned
    /// compaction response, and fails if asked to stream.
    struct NoopProvider;

    #[async_trait]
    impl Provider for NoopProvider {
        fn capabilities(&self) -> ProviderCapabilities {
            ProviderCapabilities {
                supports_compaction: true,
                tool_call_id_policy: ToolCallIdPolicy::ProviderSupplied,
                ..ProviderCapabilities::default()
            }
        }

        async fn stream(
            &self,
            _request: halter_protocol::ProviderRequest,
            _cancel: tokio_util::sync::CancellationToken,
        ) -> anyhow::Result<
            futures::stream::BoxStream<
                'static,
                Result<halter_protocol::StreamEvent, halter_protocol::ProviderError>,
            >,
        > {
            anyhow::bail!("stream should not be called in this test");
        }

        async fn compact(
            &self,
            _request: ProviderCompactionRequest,
            _cancel: tokio_util::sync::CancellationToken,
        ) -> anyhow::Result<halter_protocol::ProviderCompactionResponse> {
            let item = serde_json::json!({
                "type": "compaction",
                "id": "cmp_1",
                "encrypted_content": "summary",
            });
            Ok(halter_protocol::ProviderCompactionResponse {
                output: vec![item],
                usage: Usage::default(),
            })
        }
    }

    /// Minimal blueprint with an empty system prompt seed.
    fn blueprint_fixture() -> SessionBlueprint {
        SessionBlueprint {
            session_id: SessionId::new(),
            parent_session_id: None,
            default_model: "default".into(),
            subagent_model: "subagent".into(),
            subagent_event_forwarding: SubagentEventForwarding::Off,
            snapshot_revision: "r1".into(),
            working_dir: ".".into(),
            system_prompt_seed: Vec::new(),
            max_turns: None,
            subagent_depth: 0,
        }
    }

    /// Session state that already carries a compacted prefix and a prior
    /// response id covering its single message.
    fn state_with_compacted_prefix() -> SessionState {
        SessionState {
            compacted_prefix: vec![serde_json::json!({
                "type": "compaction",
                "id": "cmp_1",
                "encrypted_content": "x",
            })],
            summaries: vec![SummarySlice {
                id: "summary-1".to_owned(),
                text: "summary".to_owned(),
            }],
            messages: vec![Message::User(UserMessage::text("hello"))],
            last_response_id: Some("resp_1".to_owned()),
            messages_seen_by_provider: 1,
            ..SessionState::default()
        }
    }

    /// Observed state with no git information.
    fn observed_fixture() -> ObservedState {
        ObservedState {
            cwd: ".".into(),
            git_branch: None,
            git_dirty: None,
            now_utc: Utc::now(),
            env_facts: Default::default(),
        }
    }

    /// Resolved model pointing at the fake provider.
    fn fake_model() -> ResolvedModel {
        ResolvedModel {
            role: ModelRole::default(),
            id: ModelId::from("default"),
            provider: ProviderName::from("fake"),
            provider_kind: ProviderKind::Fake,
            api_kind: halter_protocol::ApiKind::Fake,
            model: "fake".to_owned(),
            max_input_tokens: None,
            max_output_tokens: None,
            reasoning: None,
            tokens_per_minute: None,
        }
    }

    #[tokio::test]
    async fn plan_disables_previous_response_chaining_when_compacted_prefix_exists() {
        let manager = DefaultContextManager::default();
        let blueprint = blueprint_fixture();
        let state = state_with_compacted_prefix();
        let observed = observed_fixture();
        let model = fake_model();

        let plan = manager
            .plan(
                &blueprint,
                &state,
                &observed,
                &ResourceSnapshot::empty(),
                &[],
                &model,
                &NoopProvider,
            )
            .await
            .expect("plan");

        assert!(plan.previous_response_id.is_none());
    }

    #[test]
    fn compaction_instructions_append_custom_text() {
        let rendered = compaction_instructions(Some("Focus on decisions."));
        assert!(rendered.contains("Compress the conversation"));
        assert!(rendered.contains("Focus on decisions."));
    }

    #[test]
    fn compaction_instructions_ignore_blank_custom_text() {
        let rendered = compaction_instructions(Some("   "));
        let expected = DEFAULT_COMPACTION_PROMPT_MARKDOWN.trim();
        assert_eq!(rendered, expected);
    }
}