zeph_agent_context/summarization/
scheduling.rs1use std::hash::Hash as _;
13
14use zeph_common::task_supervisor::BlockingHandle;
15use zeph_llm::any::AnyProvider;
16use zeph_llm::provider::{LlmProvider as _, Message, MessageMetadata, Role};
17
18use crate::compaction::SubgoalExtractionResult;
19use crate::state::ContextSummarizationView;
20use zeph_context::budget::ContextBudget;
21use zeph_context::manager::CompactionTier;
22
23use super::deferred::apply_deferred_summaries;
24use super::pruning::prune_tool_outputs;
25
26#[allow(
33 clippy::cast_precision_loss,
34 clippy::cast_possible_truncation,
35 clippy::cast_sign_loss
36)]
37pub(crate) fn maybe_soft_compact_mid_iteration(summ: &mut ContextSummarizationView<'_>) {
38 if summ
39 .context_manager
40 .compaction_state()
41 .is_compacted_this_turn()
42 {
43 return;
44 }
45 if !matches!(
46 summ.context_manager
47 .compaction_tier(*summ.cached_prompt_tokens),
48 CompactionTier::Soft | CompactionTier::Hard
49 ) {
50 return;
51 }
52 let budget = summ
53 .context_manager
54 .budget
55 .as_ref()
56 .map_or(0, ContextBudget::max_tokens);
57 let soft_threshold = (budget as f32 * summ.context_manager.soft_compaction_threshold) as usize;
58 let cached = usize::try_from(*summ.cached_prompt_tokens).unwrap_or(usize::MAX);
59
60 apply_deferred_summaries(summ);
61 let min_to_free = cached.saturating_sub(soft_threshold);
62 if min_to_free > 0 {
63 prune_tool_outputs(summ, min_to_free);
64 }
65 tracing::debug!(
66 cached_tokens = *summ.cached_prompt_tokens,
67 soft_threshold,
68 "mid-iteration soft compaction complete"
69 );
70}
71
72pub(crate) fn maybe_refresh_task_goal(summ: &mut ContextSummarizationView<'_>) {
82 match &summ.context_manager.compression.pruning_strategy {
83 zeph_config::PruningStrategy::Reactive
84 | zeph_config::PruningStrategy::Subgoal
85 | zeph_config::PruningStrategy::SubgoalMig => return,
86 zeph_config::PruningStrategy::TaskAware | zeph_config::PruningStrategy::Mig => {}
87 }
88
89 if summ.pending_task_goal.is_some() {
91 apply_completed_task_goal(summ);
92 }
93
94 if summ.pending_task_goal.is_some() {
96 return;
97 }
98
99 let Some(hash) = last_user_content_hash(summ.messages) else {
100 return;
101 };
102
103 if *summ.task_goal_user_msg_hash == Some(hash) {
104 return;
105 }
106
107 *summ.task_goal_user_msg_hash = Some(hash);
108 let recent = recent_user_assistant_excerpt(summ.messages, 10, false);
109 let provider = summ.summarization_deps.provider.clone();
110 let handle = spawn_task_goal_extraction(provider, recent, &summ.task_supervisor);
111 *summ.pending_task_goal = Some(handle);
112 tracing::debug!("extract_task_goal: background task spawned");
113 if let Some(ref tx) = summ.status_tx {
114 let _ = tx.send("Extracting task goal...".into());
115 }
116}
117
118pub(crate) fn maybe_refresh_subgoal(summ: &mut ContextSummarizationView<'_>) {
123 match &summ.context_manager.compression.pruning_strategy {
124 zeph_config::PruningStrategy::Subgoal | zeph_config::PruningStrategy::SubgoalMig => {}
125 _ => return,
126 }
127
128 let msg_len = summ.messages.len();
129
130 if summ.pending_subgoal.is_some() {
132 apply_completed_subgoal(summ, msg_len);
133 }
134
135 if summ.pending_subgoal.is_some() {
137 return;
138 }
139
140 let last_user_content = summ
141 .messages
142 .iter()
143 .rev()
144 .find(|m| m.role == Role::User && m.metadata.visibility.is_agent_visible())
145 .map(|m| m.content.as_str())
146 .unwrap_or_default();
147
148 if last_user_content.is_empty() {
149 return;
150 }
151
152 let hash = {
153 let mut hasher = std::collections::hash_map::DefaultHasher::new();
154 last_user_content.hash(&mut hasher);
155 std::hash::Hasher::finish(&hasher)
156 };
157
158 if *summ.subgoal_user_msg_hash == Some(hash) {
159 return;
160 }
161 *summ.subgoal_user_msg_hash = Some(hash);
162
163 let recent = recent_user_assistant_excerpt(summ.messages, 6, true);
164 let provider = summ.summarization_deps.provider.clone();
165 let handle = spawn_subgoal_extraction(provider, recent, &summ.task_supervisor);
166 *summ.pending_subgoal = Some(handle);
167 tracing::debug!("subgoal_extraction: background task spawned");
168 if let Some(ref tx) = summ.status_tx {
169 let _ = tx.send("Tracking subgoal...".into());
170 }
171}
172
173fn apply_completed_task_goal(summ: &mut ContextSummarizationView<'_>) {
180 if let Some(handle) = summ.pending_task_goal.take() {
181 match handle.try_join() {
182 Ok(Ok(Some(goal))) => {
183 tracing::debug!("extract_task_goal: background result applied");
184 *summ.current_task_goal = Some(goal);
185 }
186 Ok(Ok(None)) => {}
187 Ok(Err(e)) => tracing::debug!("extract_task_goal: task error: {e}"),
188 Err(handle) => {
189 *summ.pending_task_goal = Some(handle);
190 return;
191 }
192 }
193 if let Some(ref tx) = summ.status_tx {
195 let _ = tx.send(String::new());
196 }
197 }
198}
199
200fn apply_completed_subgoal(summ: &mut ContextSummarizationView<'_>, msg_len: usize) {
202 if let Some(handle) = summ.pending_subgoal.take() {
203 match handle.try_join() {
204 Ok(Ok(Some(result))) => {
205 let is_transition = result.completed.is_some();
206 if is_transition {
207 register_subgoal_transition(summ, &result, msg_len);
208 } else {
209 register_subgoal_continuation(summ, &result, msg_len);
210 }
211 }
212 Ok(Ok(None)) => {}
213 Ok(Err(e)) => tracing::debug!("subgoal_extraction: task error: {e}"),
214 Err(handle) => {
215 *summ.pending_subgoal = Some(handle);
216 return;
217 }
218 }
219 if let Some(ref tx) = summ.status_tx {
220 let _ = tx.send(String::new());
221 }
222 }
223}
224
225fn register_subgoal_transition(
226 summ: &mut ContextSummarizationView<'_>,
227 result: &SubgoalExtractionResult,
228 msg_len: usize,
229) {
230 if let Some(completed_desc) = &result.completed {
231 tracing::debug!(
232 completed = completed_desc.as_str(),
233 "subgoal transition detected"
234 );
235 }
236 summ.subgoal_registry
237 .complete_active(msg_len.saturating_sub(1));
238 let new_id = summ
239 .subgoal_registry
240 .push_active(result.current.clone(), msg_len.saturating_sub(1));
241 summ.subgoal_registry
242 .extend_active(msg_len.saturating_sub(1));
243 tracing::debug!(
244 current = result.current.as_str(),
245 id = new_id.0,
246 "new active subgoal registered"
247 );
248}
249
250fn register_subgoal_continuation(
251 summ: &mut ContextSummarizationView<'_>,
252 result: &SubgoalExtractionResult,
253 msg_len: usize,
254) {
255 let is_first = summ.subgoal_registry.subgoals.is_empty();
256 if is_first {
257 let id = summ
258 .subgoal_registry
259 .push_active(result.current.clone(), msg_len.saturating_sub(1));
260 if msg_len > 2 {
261 summ.subgoal_registry.tag_range(1, msg_len - 2, id);
262 }
263 summ.subgoal_registry
264 .extend_active(msg_len.saturating_sub(1));
265 tracing::debug!(
266 current = result.current.as_str(),
267 id = id.0,
268 retroactive_msgs = msg_len.saturating_sub(2),
269 "first subgoal registered with retroactive tagging"
270 );
271 } else {
272 summ.subgoal_registry
273 .extend_active(msg_len.saturating_sub(1));
274 tracing::debug!(current = result.current.as_str(), "active subgoal extended");
275 }
276}
277
278fn last_user_content_hash(messages: &[Message]) -> Option<u64> {
280 let content = messages
281 .iter()
282 .rev()
283 .find(|m| m.role == Role::User)
284 .map(|m| m.content.as_str())
285 .unwrap_or_default();
286
287 if content.is_empty() {
288 return None;
289 }
290
291 let mut hasher = std::collections::hash_map::DefaultHasher::new();
292 content.hash(&mut hasher);
293 Some(std::hash::Hasher::finish(&hasher))
294}
295
296fn recent_user_assistant_excerpt(
298 messages: &[Message],
299 take: usize,
300 agent_visible_only: bool,
301) -> Vec<(Role, String)> {
302 messages
303 .iter()
304 .filter(|m| {
305 let role_ok = matches!(m.role, Role::User | Role::Assistant);
306 let visible_ok = !agent_visible_only || m.metadata.visibility.is_agent_visible();
307 role_ok && visible_ok
308 })
309 .rev()
310 .take(take)
311 .collect::<Vec<_>>()
312 .into_iter()
313 .rev()
314 .map(|m| (m.role, m.content.clone()))
315 .collect()
316}
317
318#[must_use]
328pub fn parse_subgoal_extraction_response(response: &str) -> SubgoalExtractionResult {
329 let trimmed = response.trim();
330
331 if let Some(current_pos) = trimmed.find("CURRENT:") {
332 let after_current = &trimmed[current_pos + "CURRENT:".len()..];
333 let (current_line_raw, remainder_raw) = after_current
334 .split_once('\n')
335 .map_or((after_current, ""), |(l, r)| (l, r));
336 let current_line = current_line_raw.trim();
337 let remainder = remainder_raw.trim();
338
339 if current_line.is_empty() {
340 return SubgoalExtractionResult {
341 current: trimmed.to_string(),
342 completed: None,
343 };
344 }
345
346 let current = current_line.to_string();
347
348 let completed = if let Some(comp_pos) = remainder.find("COMPLETED:") {
349 let comp_text = remainder[comp_pos + "COMPLETED:".len()..].trim();
350 let comp_line = comp_text
351 .split('\n')
352 .next()
353 .unwrap_or("")
354 .trim()
355 .to_string();
356 if comp_line.is_empty() || comp_line.eq_ignore_ascii_case("none") {
357 None
358 } else {
359 Some(comp_line)
360 }
361 } else {
362 None
363 };
364
365 return SubgoalExtractionResult { current, completed };
366 }
367
368 SubgoalExtractionResult {
369 current: trimmed.to_string(),
370 completed: None,
371 }
372}
373
374fn spawn_task_goal_extraction(
376 provider: AnyProvider,
377 recent: Vec<(Role, String)>,
378 supervisor: &std::sync::Arc<zeph_common::TaskSupervisor>,
379) -> BlockingHandle<Option<String>> {
380 let task = async move {
381 if recent.is_empty() {
382 return None;
383 }
384
385 let mut context_text = String::new();
386 for (role, content) in &recent {
387 let role_str = match role {
388 Role::User => "user",
389 Role::Assistant => "assistant",
390 Role::System => "system",
391 };
392 let preview = if content.len() > 300 {
393 let end = content.floor_char_boundary(300);
394 &content[..end]
395 } else {
396 content.as_str()
397 };
398 let _ = std::fmt::write(&mut context_text, format_args!("[{role_str}]: {preview}\n"));
399 }
400
401 let prompt = format!(
402 "Extract the current task goal from this conversation excerpt in one concise \
403 sentence.\nFocus on what the user is trying to accomplish right now.\n\
404 Respond with only the goal sentence, no preamble.\n\n\
405 <conversation>\n{context_text}</conversation>"
406 );
407
408 let msgs = [Message {
409 role: Role::User,
410 content: prompt,
411 parts: vec![],
412 metadata: MessageMetadata::default(),
413 }];
414
415 match tokio::time::timeout(std::time::Duration::from_secs(30), provider.chat(&msgs)).await {
416 Ok(Ok(goal)) => {
417 let trimmed = goal.trim();
418 if trimmed.is_empty() {
419 None
420 } else {
421 const MAX_GOAL_CHARS: usize = 500;
422 if trimmed.len() > MAX_GOAL_CHARS {
423 tracing::warn!(
424 len = trimmed.len(),
425 "extract_task_goal: LLM returned oversized goal; truncating"
426 );
427 let end = trimmed.floor_char_boundary(MAX_GOAL_CHARS);
428 Some(trimmed[..end].to_string())
429 } else {
430 Some(trimmed.to_string())
431 }
432 }
433 }
434 Ok(Err(e)) => {
435 tracing::debug!("extract_task_goal: LLM error: {e:#}");
436 None
437 }
438 Err(_) => {
439 tracing::debug!("extract_task_goal: timed out");
440 None
441 }
442 }
443 };
444 spawn_oneshot(
445 supervisor,
446 std::sync::Arc::from("agent.compaction.task_goal"),
447 move || task,
448 )
449}
450
451fn spawn_subgoal_extraction(
453 provider: AnyProvider,
454 recent: Vec<(Role, String)>,
455 supervisor: &std::sync::Arc<zeph_common::TaskSupervisor>,
456) -> BlockingHandle<Option<SubgoalExtractionResult>> {
457 let task = async move {
458 if recent.is_empty() {
459 return None;
460 }
461
462 let mut context_text = String::new();
463 for (role, content) in &recent {
464 let role_str = match role {
465 Role::User => "user",
466 Role::Assistant => "assistant",
467 Role::System => "system",
468 };
469 let preview = if content.len() > 300 {
470 let end = content.floor_char_boundary(300);
471 &content[..end]
472 } else {
473 content.as_str()
474 };
475 let _ = std::fmt::write(&mut context_text, format_args!("[{role_str}]: {preview}\n"));
476 }
477
478 let prompt = format!(
479 "Given this conversation excerpt, identify the agent's CURRENT subgoal in one \
480 sentence. A subgoal is the immediate objective the agent is working toward right \
481 now, not the overall task.\n\n\
482 If the agent just completed a subgoal (answered a question, finished a subtask), \
483 also state the COMPLETED subgoal.\n\n\
484 Respond in this exact format:\n\
485 CURRENT: <one sentence describing current subgoal>\n\
486 COMPLETED: <one sentence describing just-completed subgoal, or NONE>\n\n\
487 <conversation>\n{context_text}</conversation>"
488 );
489
490 let msgs = [Message {
491 role: Role::User,
492 content: prompt,
493 parts: vec![],
494 metadata: MessageMetadata::default(),
495 }];
496
497 let response =
498 match tokio::time::timeout(std::time::Duration::from_secs(30), provider.chat(&msgs))
499 .await
500 {
501 Ok(Ok(r)) => r,
502 Ok(Err(e)) => {
503 tracing::debug!("subgoal_extraction: LLM error: {e:#}");
504 return None;
505 }
506 Err(_) => {
507 tracing::debug!("subgoal_extraction: timed out");
508 return None;
509 }
510 };
511
512 Some(parse_subgoal_extraction_response(&response))
513 };
514 spawn_oneshot(
515 supervisor,
516 std::sync::Arc::from("agent.compaction.subgoal"),
517 move || task,
518 )
519}
520
521fn spawn_oneshot<F, Fut, R>(
522 supervisor: &std::sync::Arc<zeph_common::TaskSupervisor>,
523 name: std::sync::Arc<str>,
524 factory: F,
525) -> BlockingHandle<R>
526where
527 F: FnOnce() -> Fut + Send + 'static,
528 Fut: std::future::Future<Output = R> + Send + 'static,
529 R: Send + 'static,
530{
531 supervisor.spawn_oneshot(name, factory)
532}
533
#[cfg(test)]
mod tests {
    // Unit tests for `parse_subgoal_extraction_response`, covering both the documented reply
    // format and the fallbacks used when the model deviates from it.
    use super::*;

    #[test]
    fn parse_well_formed_with_both() {
        let response = "CURRENT: Implement login\nCOMPLETED: Setup database";
        let result = parse_subgoal_extraction_response(response);
        assert_eq!(result.current, "Implement login");
        assert_eq!(result.completed, Some("Setup database".to_string()));
    }

    #[test]
    fn parse_well_formed_no_completed() {
        let response = "CURRENT: Fetch user data\nCOMPLETED: NONE";
        let result = parse_subgoal_extraction_response(response);
        assert_eq!(result.current, "Fetch user data");
        assert_eq!(result.completed, None);
    }

    #[test]
    fn parse_malformed_no_current_prefix() {
        let response = "Just some random text about subgoals";
        let result = parse_subgoal_extraction_response(response);
        assert_eq!(result.current, "Just some random text about subgoals");
        assert_eq!(result.completed, None);
    }

    #[test]
    fn parse_malformed_empty_current() {
        let response = "CURRENT: \nCOMPLETED: Setup";
        let result = parse_subgoal_extraction_response(response);
        assert_eq!(result.current.trim(), "CURRENT: \nCOMPLETED: Setup");
        assert_eq!(result.completed, None);
    }

    #[test]
    fn parse_current_only_without_completed_line() {
        let result = parse_subgoal_extraction_response("CURRENT: Refactor the parser");
        assert_eq!(result.current, "Refactor the parser");
        assert_eq!(result.completed, None);
    }

    #[test]
    fn parse_completed_none_is_case_insensitive() {
        let result = parse_subgoal_extraction_response("CURRENT: Do X\nCOMPLETED: none");
        assert_eq!(result.current, "Do X");
        assert_eq!(result.completed, None);
    }

    #[test]
    fn parse_ignores_preamble_before_current() {
        let response = "Here is my analysis.\nCURRENT: Write tests\nCOMPLETED: Fix bug";
        let result = parse_subgoal_extraction_response(response);
        assert_eq!(result.current, "Write tests");
        assert_eq!(result.completed, Some("Fix bug".to_string()));
    }

    #[test]
    fn parse_handles_crlf_line_endings() {
        let response = "CURRENT: Write tests\r\nCOMPLETED: Fix bug\r\n";
        let result = parse_subgoal_extraction_response(response);
        assert_eq!(result.current, "Write tests");
        assert_eq!(result.completed, Some("Fix bug".to_string()));
    }

    #[test]
    fn parse_completed_keeps_only_first_line() {
        let response = "CURRENT: A\nCOMPLETED: B\nsome trailing explanation";
        let result = parse_subgoal_extraction_response(response);
        assert_eq!(result.current, "A");
        assert_eq!(result.completed, Some("B".to_string()));
    }

    #[test]
    fn parse_blank_response() {
        let result = parse_subgoal_extraction_response("   \n  ");
        assert_eq!(result.current, "");
        assert_eq!(result.completed, None);
    }
}