Skip to main content

zeph_agent_context/summarization/
scheduling.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! Compaction scheduling: tiered compaction dispatch, proactive compression, and
5//! background goal/subgoal extraction.
6//!
7//! The three-tier model:
8//! - **Soft** — apply deferred summaries + prune tool outputs (no LLM).
9//! - **Hard** — soft tier + LLM full summarization.
10//! - **Proactive** — proactively compress before the hard threshold is reached.
11
12use std::hash::Hash as _;
13
14use zeph_common::task_supervisor::BlockingHandle;
15use zeph_llm::any::AnyProvider;
16use zeph_llm::provider::{LlmProvider as _, Message, MessageMetadata, Role};
17
18use crate::compaction::SubgoalExtractionResult;
19use crate::state::ContextSummarizationView;
20use zeph_context::budget::ContextBudget;
21use zeph_context::manager::CompactionTier;
22
23use super::deferred::apply_deferred_summaries;
24use super::pruning::prune_tool_outputs;
25
26/// Soft-only compaction for mid-iteration use inside tool execution loops.
27///
28/// Applies deferred tool summaries and prunes tool outputs down to the soft threshold.
29/// Never triggers Hard tier (no LLM call), never increments `turns_since_last_hard_compaction`,
30/// and never decrements the cooldown counter. Returns immediately when `compacted_this_turn`
31/// is set or when context usage is below the soft threshold.
32#[allow(
33    clippy::cast_precision_loss,
34    clippy::cast_possible_truncation,
35    clippy::cast_sign_loss
36)]
37pub(crate) fn maybe_soft_compact_mid_iteration(summ: &mut ContextSummarizationView<'_>) {
38    if summ
39        .context_manager
40        .compaction_state()
41        .is_compacted_this_turn()
42    {
43        return;
44    }
45    if !matches!(
46        summ.context_manager
47            .compaction_tier(*summ.cached_prompt_tokens),
48        CompactionTier::Soft | CompactionTier::Hard
49    ) {
50        return;
51    }
52    let budget = summ
53        .context_manager
54        .budget
55        .as_ref()
56        .map_or(0, ContextBudget::max_tokens);
57    let soft_threshold = (budget as f32 * summ.context_manager.soft_compaction_threshold) as usize;
58    let cached = usize::try_from(*summ.cached_prompt_tokens).unwrap_or(usize::MAX);
59
60    apply_deferred_summaries(summ);
61    let min_to_free = cached.saturating_sub(soft_threshold);
62    if min_to_free > 0 {
63        prune_tool_outputs(summ, min_to_free);
64    }
65    tracing::debug!(
66        cached_tokens = *summ.cached_prompt_tokens,
67        soft_threshold,
68        "mid-iteration soft compaction complete"
69    );
70}
71
72/// Refresh the cached task goal when the last user message has changed.
73///
74/// Two-phase non-blocking design:
75/// - **Phase 1 (apply)**: if the background extraction task from last compaction has finished,
76///   apply its result to `current_task_goal`.
77/// - **Phase 2 (schedule)**: if the user message hash has changed and no task is in-flight,
78///   spawn a new background extraction. Current compaction uses the cached goal.
79///
80/// Only runs when a `TaskAware` or `Mig` pruning strategy is active.
81pub(crate) fn maybe_refresh_task_goal(summ: &mut ContextSummarizationView<'_>) {
82    match &summ.context_manager.compression.pruning_strategy {
83        zeph_config::PruningStrategy::Reactive
84        | zeph_config::PruningStrategy::Subgoal
85        | zeph_config::PruningStrategy::SubgoalMig => return,
86        zeph_config::PruningStrategy::TaskAware | zeph_config::PruningStrategy::Mig => {}
87    }
88
89    // Phase 1: apply completed background result.
90    if summ.pending_task_goal.is_some() {
91        apply_completed_task_goal(summ);
92    }
93
94    // Phase 2: no task in flight — may schedule a new one.
95    if summ.pending_task_goal.is_some() {
96        return;
97    }
98
99    let Some(hash) = last_user_content_hash(summ.messages) else {
100        return;
101    };
102
103    if *summ.task_goal_user_msg_hash == Some(hash) {
104        return;
105    }
106
107    *summ.task_goal_user_msg_hash = Some(hash);
108    let recent = recent_user_assistant_excerpt(summ.messages, 10, false);
109    let provider = summ.summarization_deps.provider.clone();
110    let handle = spawn_task_goal_extraction(provider, recent, &summ.task_supervisor);
111    *summ.pending_task_goal = Some(handle);
112    tracing::debug!("extract_task_goal: background task spawned");
113    if let Some(ref tx) = summ.status_tx {
114        let _ = tx.send("Extracting task goal...".into());
115    }
116}
117
118/// Refresh the subgoal registry when the last user message has changed.
119///
120/// Mirrors the two-phase `maybe_refresh_task_goal` pattern.
121/// Only runs when a `Subgoal` or `SubgoalMig` pruning strategy is active.
122pub(crate) fn maybe_refresh_subgoal(summ: &mut ContextSummarizationView<'_>) {
123    match &summ.context_manager.compression.pruning_strategy {
124        zeph_config::PruningStrategy::Subgoal | zeph_config::PruningStrategy::SubgoalMig => {}
125        _ => return,
126    }
127
128    let msg_len = summ.messages.len();
129
130    // Phase 1: apply completed background result.
131    if summ.pending_subgoal.is_some() {
132        apply_completed_subgoal(summ, msg_len);
133    }
134
135    // Phase 2: no task in flight.
136    if summ.pending_subgoal.is_some() {
137        return;
138    }
139
140    let last_user_content = summ
141        .messages
142        .iter()
143        .rev()
144        .find(|m| m.role == Role::User && m.metadata.visibility.is_agent_visible())
145        .map(|m| m.content.as_str())
146        .unwrap_or_default();
147
148    if last_user_content.is_empty() {
149        return;
150    }
151
152    let hash = {
153        let mut hasher = std::collections::hash_map::DefaultHasher::new();
154        last_user_content.hash(&mut hasher);
155        std::hash::Hasher::finish(&hasher)
156    };
157
158    if *summ.subgoal_user_msg_hash == Some(hash) {
159        return;
160    }
161    *summ.subgoal_user_msg_hash = Some(hash);
162
163    let recent = recent_user_assistant_excerpt(summ.messages, 6, true);
164    let provider = summ.summarization_deps.provider.clone();
165    let handle = spawn_subgoal_extraction(provider, recent, &summ.task_supervisor);
166    *summ.pending_subgoal = Some(handle);
167    tracing::debug!("subgoal_extraction: background task spawned");
168    if let Some(ref tx) = summ.status_tx {
169        let _ = tx.send("Tracking subgoal...".into());
170    }
171}
172
173// ── Private helpers ──────────────────────────────────────────────────────────
174
175/// Apply a completed background task-goal extraction result to `current_task_goal`.
176///
177/// Re-stores the handle if the task is not yet complete so the caller can check again
178/// next turn.
179fn apply_completed_task_goal(summ: &mut ContextSummarizationView<'_>) {
180    if let Some(handle) = summ.pending_task_goal.take() {
181        match handle.try_join() {
182            Ok(Ok(Some(goal))) => {
183                tracing::debug!("extract_task_goal: background result applied");
184                *summ.current_task_goal = Some(goal);
185            }
186            Ok(Ok(None)) => {}
187            Ok(Err(e)) => tracing::debug!("extract_task_goal: task error: {e}"),
188            Err(handle) => {
189                *summ.pending_task_goal = Some(handle);
190                return;
191            }
192        }
193        // Clear spinner on all completion paths.
194        if let Some(ref tx) = summ.status_tx {
195            let _ = tx.send(String::new());
196        }
197    }
198}
199
200/// Apply a completed background subgoal extraction result to the registry.
201fn apply_completed_subgoal(summ: &mut ContextSummarizationView<'_>, msg_len: usize) {
202    if let Some(handle) = summ.pending_subgoal.take() {
203        match handle.try_join() {
204            Ok(Ok(Some(result))) => {
205                let is_transition = result.completed.is_some();
206                if is_transition {
207                    register_subgoal_transition(summ, &result, msg_len);
208                } else {
209                    register_subgoal_continuation(summ, &result, msg_len);
210                }
211            }
212            Ok(Ok(None)) => {}
213            Ok(Err(e)) => tracing::debug!("subgoal_extraction: task error: {e}"),
214            Err(handle) => {
215                *summ.pending_subgoal = Some(handle);
216                return;
217            }
218        }
219        if let Some(ref tx) = summ.status_tx {
220            let _ = tx.send(String::new());
221        }
222    }
223}
224
225fn register_subgoal_transition(
226    summ: &mut ContextSummarizationView<'_>,
227    result: &SubgoalExtractionResult,
228    msg_len: usize,
229) {
230    if let Some(completed_desc) = &result.completed {
231        tracing::debug!(
232            completed = completed_desc.as_str(),
233            "subgoal transition detected"
234        );
235    }
236    summ.subgoal_registry
237        .complete_active(msg_len.saturating_sub(1));
238    let new_id = summ
239        .subgoal_registry
240        .push_active(result.current.clone(), msg_len.saturating_sub(1));
241    summ.subgoal_registry
242        .extend_active(msg_len.saturating_sub(1));
243    tracing::debug!(
244        current = result.current.as_str(),
245        id = new_id.0,
246        "new active subgoal registered"
247    );
248}
249
250fn register_subgoal_continuation(
251    summ: &mut ContextSummarizationView<'_>,
252    result: &SubgoalExtractionResult,
253    msg_len: usize,
254) {
255    let is_first = summ.subgoal_registry.subgoals.is_empty();
256    if is_first {
257        let id = summ
258            .subgoal_registry
259            .push_active(result.current.clone(), msg_len.saturating_sub(1));
260        if msg_len > 2 {
261            summ.subgoal_registry.tag_range(1, msg_len - 2, id);
262        }
263        summ.subgoal_registry
264            .extend_active(msg_len.saturating_sub(1));
265        tracing::debug!(
266            current = result.current.as_str(),
267            id = id.0,
268            retroactive_msgs = msg_len.saturating_sub(2),
269            "first subgoal registered with retroactive tagging"
270        );
271    } else {
272        summ.subgoal_registry
273            .extend_active(msg_len.saturating_sub(1));
274        tracing::debug!(current = result.current.as_str(), "active subgoal extended");
275    }
276}
277
278/// Compute a hash of the last user message content.
279fn last_user_content_hash(messages: &[Message]) -> Option<u64> {
280    let content = messages
281        .iter()
282        .rev()
283        .find(|m| m.role == Role::User)
284        .map(|m| m.content.as_str())
285        .unwrap_or_default();
286
287    if content.is_empty() {
288        return None;
289    }
290
291    let mut hasher = std::collections::hash_map::DefaultHasher::new();
292    content.hash(&mut hasher);
293    Some(std::hash::Hasher::finish(&hasher))
294}
295
296/// Collect recent user/assistant messages for LLM extraction prompts.
297fn recent_user_assistant_excerpt(
298    messages: &[Message],
299    take: usize,
300    agent_visible_only: bool,
301) -> Vec<(Role, String)> {
302    messages
303        .iter()
304        .filter(|m| {
305            let role_ok = matches!(m.role, Role::User | Role::Assistant);
306            let visible_ok = !agent_visible_only || m.metadata.visibility.is_agent_visible();
307            role_ok && visible_ok
308        })
309        .rev()
310        .take(take)
311        .collect::<Vec<_>>()
312        .into_iter()
313        .rev()
314        .map(|m| (m.role, m.content.clone()))
315        .collect()
316}
317
318/// Parse the structured LLM response for subgoal extraction.
319///
320/// Expected format:
321/// ```text
322/// CURRENT: <description>
323/// COMPLETED: <description or NONE>
324/// ```
325///
326/// Falls back to treating the entire response as the current subgoal on malformed input.
327#[must_use]
328pub fn parse_subgoal_extraction_response(response: &str) -> SubgoalExtractionResult {
329    let trimmed = response.trim();
330
331    if let Some(current_pos) = trimmed.find("CURRENT:") {
332        let after_current = &trimmed[current_pos + "CURRENT:".len()..];
333        let (current_line_raw, remainder_raw) = after_current
334            .split_once('\n')
335            .map_or((after_current, ""), |(l, r)| (l, r));
336        let current_line = current_line_raw.trim();
337        let remainder = remainder_raw.trim();
338
339        if current_line.is_empty() {
340            return SubgoalExtractionResult {
341                current: trimmed.to_string(),
342                completed: None,
343            };
344        }
345
346        let current = current_line.to_string();
347
348        let completed = if let Some(comp_pos) = remainder.find("COMPLETED:") {
349            let comp_text = remainder[comp_pos + "COMPLETED:".len()..].trim();
350            let comp_line = comp_text
351                .split('\n')
352                .next()
353                .unwrap_or("")
354                .trim()
355                .to_string();
356            if comp_line.is_empty() || comp_line.eq_ignore_ascii_case("none") {
357                None
358            } else {
359                Some(comp_line)
360            }
361        } else {
362            None
363        };
364
365        return SubgoalExtractionResult { current, completed };
366    }
367
368    SubgoalExtractionResult {
369        current: trimmed.to_string(),
370        completed: None,
371    }
372}
373
374/// Spawn a background task-goal extraction task.
375fn spawn_task_goal_extraction(
376    provider: AnyProvider,
377    recent: Vec<(Role, String)>,
378    supervisor: &std::sync::Arc<zeph_common::TaskSupervisor>,
379) -> BlockingHandle<Option<String>> {
380    let task = async move {
381        if recent.is_empty() {
382            return None;
383        }
384
385        let mut context_text = String::new();
386        for (role, content) in &recent {
387            let role_str = match role {
388                Role::User => "user",
389                Role::Assistant => "assistant",
390                Role::System => "system",
391            };
392            let preview = if content.len() > 300 {
393                let end = content.floor_char_boundary(300);
394                &content[..end]
395            } else {
396                content.as_str()
397            };
398            let _ = std::fmt::write(&mut context_text, format_args!("[{role_str}]: {preview}\n"));
399        }
400
401        let prompt = format!(
402            "Extract the current task goal from this conversation excerpt in one concise \
403             sentence.\nFocus on what the user is trying to accomplish right now.\n\
404             Respond with only the goal sentence, no preamble.\n\n\
405             <conversation>\n{context_text}</conversation>"
406        );
407
408        let msgs = [Message {
409            role: Role::User,
410            content: prompt,
411            parts: vec![],
412            metadata: MessageMetadata::default(),
413        }];
414
415        match tokio::time::timeout(std::time::Duration::from_secs(30), provider.chat(&msgs)).await {
416            Ok(Ok(goal)) => {
417                let trimmed = goal.trim();
418                if trimmed.is_empty() {
419                    None
420                } else {
421                    const MAX_GOAL_CHARS: usize = 500;
422                    if trimmed.len() > MAX_GOAL_CHARS {
423                        tracing::warn!(
424                            len = trimmed.len(),
425                            "extract_task_goal: LLM returned oversized goal; truncating"
426                        );
427                        let end = trimmed.floor_char_boundary(MAX_GOAL_CHARS);
428                        Some(trimmed[..end].to_string())
429                    } else {
430                        Some(trimmed.to_string())
431                    }
432                }
433            }
434            Ok(Err(e)) => {
435                tracing::debug!("extract_task_goal: LLM error: {e:#}");
436                None
437            }
438            Err(_) => {
439                tracing::debug!("extract_task_goal: timed out");
440                None
441            }
442        }
443    };
444    spawn_oneshot(
445        supervisor,
446        std::sync::Arc::from("agent.compaction.task_goal"),
447        move || task,
448    )
449}
450
451/// Spawn a background subgoal extraction task.
452fn spawn_subgoal_extraction(
453    provider: AnyProvider,
454    recent: Vec<(Role, String)>,
455    supervisor: &std::sync::Arc<zeph_common::TaskSupervisor>,
456) -> BlockingHandle<Option<SubgoalExtractionResult>> {
457    let task = async move {
458        if recent.is_empty() {
459            return None;
460        }
461
462        let mut context_text = String::new();
463        for (role, content) in &recent {
464            let role_str = match role {
465                Role::User => "user",
466                Role::Assistant => "assistant",
467                Role::System => "system",
468            };
469            let preview = if content.len() > 300 {
470                let end = content.floor_char_boundary(300);
471                &content[..end]
472            } else {
473                content.as_str()
474            };
475            let _ = std::fmt::write(&mut context_text, format_args!("[{role_str}]: {preview}\n"));
476        }
477
478        let prompt = format!(
479            "Given this conversation excerpt, identify the agent's CURRENT subgoal in one \
480             sentence. A subgoal is the immediate objective the agent is working toward right \
481             now, not the overall task.\n\n\
482             If the agent just completed a subgoal (answered a question, finished a subtask), \
483             also state the COMPLETED subgoal.\n\n\
484             Respond in this exact format:\n\
485             CURRENT: <one sentence describing current subgoal>\n\
486             COMPLETED: <one sentence describing just-completed subgoal, or NONE>\n\n\
487             <conversation>\n{context_text}</conversation>"
488        );
489
490        let msgs = [Message {
491            role: Role::User,
492            content: prompt,
493            parts: vec![],
494            metadata: MessageMetadata::default(),
495        }];
496
497        let response =
498            match tokio::time::timeout(std::time::Duration::from_secs(30), provider.chat(&msgs))
499                .await
500            {
501                Ok(Ok(r)) => r,
502                Ok(Err(e)) => {
503                    tracing::debug!("subgoal_extraction: LLM error: {e:#}");
504                    return None;
505                }
506                Err(_) => {
507                    tracing::debug!("subgoal_extraction: timed out");
508                    return None;
509                }
510            };
511
512        Some(parse_subgoal_extraction_response(&response))
513    };
514    spawn_oneshot(
515        supervisor,
516        std::sync::Arc::from("agent.compaction.subgoal"),
517        move || task,
518    )
519}
520
521fn spawn_oneshot<F, Fut, R>(
522    supervisor: &std::sync::Arc<zeph_common::TaskSupervisor>,
523    name: std::sync::Arc<str>,
524    factory: F,
525) -> BlockingHandle<R>
526where
527    F: FnOnce() -> Fut + Send + 'static,
528    Fut: std::future::Future<Output = R> + Send + 'static,
529    R: Send + 'static,
530{
531    supervisor.spawn_oneshot(name, factory)
532}
533
#[cfg(test)]
mod tests {
    use super::*;

    /// Both tags present: each value is taken from its own line.
    #[test]
    fn parse_well_formed_with_both() {
        let parsed =
            parse_subgoal_extraction_response("CURRENT: Implement login\nCOMPLETED: Setup database");
        assert_eq!(parsed.current, "Implement login");
        assert_eq!(parsed.completed.as_deref(), Some("Setup database"));
    }

    /// `COMPLETED: NONE` means no subgoal was completed.
    #[test]
    fn parse_well_formed_no_completed() {
        let parsed = parse_subgoal_extraction_response("CURRENT: Fetch user data\nCOMPLETED: NONE");
        assert_eq!(parsed.current, "Fetch user data");
        assert!(parsed.completed.is_none());
    }

    /// Without a `CURRENT:` tag the whole response becomes the current subgoal.
    #[test]
    fn parse_malformed_no_current_prefix() {
        let raw = "Just some random text about subgoals";
        let parsed = parse_subgoal_extraction_response(raw);
        assert_eq!(parsed.current, raw);
        assert!(parsed.completed.is_none());
    }

    /// An empty `CURRENT:` line triggers the whole-response fallback.
    #[test]
    fn parse_malformed_empty_current() {
        let raw = "CURRENT: \nCOMPLETED: Setup";
        let parsed = parse_subgoal_extraction_response(raw);
        assert_eq!(parsed.current.trim(), raw);
        assert!(parsed.completed.is_none());
    }
}