1use std::fmt::Write as _;
14use std::sync::Arc;
15use std::time::Duration;
16
17use futures::StreamExt as _;
18use tracing::Instrument as _;
19use zeph_common::OVERFLOW_NOTICE_PREFIX;
20use zeph_common::memory::AnchoredSummary;
21use zeph_llm::LlmProvider as _;
22use zeph_llm::any::AnyProvider;
23use zeph_llm::provider::{Message, MessageMetadata, MessagePart, Role};
24
25pub trait MessageTokenCounter: Send + Sync {
30 fn count_message_tokens(&self, msg: &Message) -> usize;
32}
33
34#[derive(Clone)]
39pub struct SummarizationDeps {
40 pub provider: AnyProvider,
42 pub llm_timeout: Duration,
44 pub token_counter: Arc<dyn MessageTokenCounter>,
46 pub structured_summaries: bool,
48 #[allow(clippy::type_complexity)]
53 pub on_anchored_summary: Option<Arc<dyn Fn(&AnchoredSummary, bool) + Send + Sync>>,
54}
55
56pub async fn summarize_structured(
65 deps: &SummarizationDeps,
66 messages: &[Message],
67 guidelines: &str,
68) -> Result<AnchoredSummary, zeph_llm::LlmError> {
69 async move {
70 let prompt = build_anchored_summary_prompt(messages, guidelines);
71 let msgs = [Message {
72 role: Role::User,
73 content: prompt,
74 parts: vec![],
75 metadata: MessageMetadata::default(),
76 }];
77 let summary: AnchoredSummary = tokio::time::timeout(
78 deps.llm_timeout,
79 deps.provider.chat_typed_erased::<AnchoredSummary>(&msgs),
80 )
81 .await
82 .map_err(|_| zeph_llm::LlmError::Timeout)??;
83
84 if !summary.files_modified.is_empty() && summary.decisions_made.is_empty() {
85 tracing::warn!("structured summary: decisions_made is empty");
86 } else if summary.files_modified.is_empty() {
87 tracing::warn!(
88 "structured summary: files_modified is empty (may be a pure discussion session)"
89 );
90 }
91
92 if !summary.is_complete() {
93 tracing::warn!(
94 session_intent_empty = summary.session_intent.trim().is_empty(),
95 next_steps_empty = summary.next_steps.is_empty(),
96 "structured summary incomplete: mandatory fields missing, falling back to prose"
97 );
98 return Err(zeph_llm::LlmError::StructuredParse(
99 "structured summary missing mandatory fields".into(),
100 ));
101 }
102
103 if let Err(msg) = summary.validate() {
104 tracing::warn!(
105 error = %msg,
106 "structured summary failed field validation, falling back to prose"
107 );
108 return Err(zeph_llm::LlmError::StructuredParse(msg));
109 }
110
111 Ok(summary)
112 }
113 .instrument(tracing::info_span!(
114 "context.summarization.structured",
115 message_count = messages.len(),
116 ))
117 .await
118}
119
120pub async fn single_pass_summary(
125 deps: &SummarizationDeps,
126 messages: &[Message],
127 guidelines: &str,
128) -> Result<String, zeph_llm::LlmError> {
129 async move {
130 let prompt = build_chunk_prompt(messages, guidelines);
131 let msgs = [Message {
132 role: Role::User,
133 content: prompt,
134 parts: vec![],
135 metadata: MessageMetadata::default(),
136 }];
137 tokio::time::timeout(deps.llm_timeout, deps.provider.chat(&msgs))
138 .await
139 .map_err(|_| zeph_llm::LlmError::Timeout)?
140 }
141 .instrument(tracing::info_span!(
142 "context.summarization.single_pass",
143 message_count = messages.len(),
144 ))
145 .await
146}
147
148pub async fn summarize_with_llm(
157 deps: &SummarizationDeps,
158 messages: &[Message],
159 guidelines: &str,
160) -> Result<String, zeph_llm::LlmError> {
161 async move {
162 const CHUNK_TOKEN_BUDGET: usize = 4096;
163 const OVERSIZED_THRESHOLD: usize = CHUNK_TOKEN_BUDGET / 2;
164
165 let tc = Arc::clone(&deps.token_counter);
166 let chunks = crate::slot::chunk_messages(
167 messages,
168 CHUNK_TOKEN_BUDGET,
169 OVERSIZED_THRESHOLD,
170 move |msg| tc.count_message_tokens(msg),
171 );
172
173 if chunks.len() <= 1 {
174 return single_pass_summary(deps, messages, guidelines).await;
175 }
176
177 let partial_summaries = run_chunk_summaries(deps, chunks, guidelines).await;
178
179 if partial_summaries.is_empty() {
180 return single_pass_summary(deps, messages, guidelines).await;
181 }
182
183 let numbered = join_partial_summaries(&partial_summaries);
184
185 if deps.structured_summaries
186 && let Some(result) = try_structured_consolidation(deps, &numbered).await
187 {
188 return Ok(result);
189 }
190
191 prose_consolidation(deps, &numbered).await
192 }
193 .instrument(tracing::info_span!(
194 "context.summarization.with_llm",
195 message_count = messages.len(),
196 ))
197 .await
198}
199
200async fn run_chunk_summaries(
203 deps: &SummarizationDeps,
204 chunks: Vec<Vec<Message>>,
205 guidelines: &str,
206) -> Vec<String> {
207 let chunk_count = chunks.len();
208 async move {
209 let provider = deps.provider.clone();
210 let guidelines_arc: Arc<str> = Arc::from(guidelines);
211 let timeout = deps.llm_timeout;
212
213 let results: Vec<_> = futures::stream::iter(chunks.into_iter().map(|chunk| {
214 let guidelines_ref = Arc::clone(&guidelines_arc);
215 let prompt = build_chunk_prompt(&chunk, &guidelines_ref);
216 let p = provider.clone();
217 async move {
218 tokio::time::timeout(
219 timeout,
220 p.chat(&[Message {
221 role: Role::User,
222 content: prompt,
223 parts: vec![],
224 metadata: MessageMetadata::default(),
225 }]),
226 )
227 .await
228 .map_err(|_| zeph_llm::LlmError::Timeout)?
229 }
230 }))
231 .buffer_unordered(4)
232 .collect()
233 .await;
234
235 results
236 .into_iter()
237 .collect::<Result<Vec<_>, zeph_llm::LlmError>>()
238 .unwrap_or_else(|e| {
239 tracing::warn!(
240 "chunked compaction: one or more chunks failed: {e:#}, falling back to single-pass"
241 );
242 Vec::new()
243 })
244 }
245 .instrument(tracing::info_span!(
246 "context.summarization.chunk_summaries",
247 chunk_count,
248 ))
249 .await
250}
251
252fn join_partial_summaries(partials: &[String]) -> String {
253 let cap: usize = partials.iter().map(|s| s.len() + 8).sum();
254 let mut buf = String::with_capacity(cap);
255 for (i, s) in partials.iter().enumerate() {
256 if i > 0 {
257 buf.push_str("\n\n");
258 }
259 let _ = write!(buf, "{}. {s}", i + 1);
260 }
261 buf
262}
263
264async fn try_structured_consolidation(deps: &SummarizationDeps, numbered: &str) -> Option<String> {
265 async move {
266 let timeout = deps.llm_timeout;
267 let anchored_prompt = format!(
268 "<analysis>\n\
269 Merge these partial conversation summaries into a single structured summary.\n\
270 </analysis>\n\
271 \n\
272 Produce a JSON object with exactly these 5 fields:\n\
273 - session_intent: string — what the user is trying to accomplish\n\
274 - files_modified: string[] — file paths, function names, structs touched\n\
275 - decisions_made: string[] — each entry: \"Decision: X — Reason: Y\"\n\
276 - open_questions: string[] — unresolved questions or blockers\n\
277 - next_steps: string[] — concrete next actions\n\
278 \n\
279 Partial summaries:\n{numbered}"
280 );
281 let anchored_msgs = [Message {
282 role: Role::User,
283 content: anchored_prompt,
284 parts: vec![],
285 metadata: MessageMetadata::default(),
286 }];
287 match tokio::time::timeout(
288 timeout,
289 deps.provider
290 .chat_typed_erased::<AnchoredSummary>(&anchored_msgs),
291 )
292 .await
293 {
294 Ok(Ok(anchored)) if anchored.is_complete() => {
295 if let Some(ref cb) = deps.on_anchored_summary {
296 cb(&anchored, false);
297 }
298 Some(crate::slot::cap_summary(anchored.to_markdown(), 16_000))
299 }
300 Ok(Ok(anchored)) => {
301 tracing::warn!(
302 "chunked consolidation: structured summary incomplete, falling back to prose"
303 );
304 if let Some(ref cb) = deps.on_anchored_summary {
305 cb(&anchored, true);
306 }
307 None
308 }
309 Ok(Err(e)) => {
310 tracing::warn!(error = %e, "chunked consolidation: structured output failed, falling back to prose");
311 None
312 }
313 Err(_) => {
314 tracing::warn!(
315 "chunked consolidation: structured output timed out, falling back to prose"
316 );
317 None
318 }
319 }
320 }
321 .instrument(tracing::info_span!("context.summarization.structured_consolidation"))
322 .await
323}
324
325async fn prose_consolidation(
326 deps: &SummarizationDeps,
327 numbered: &str,
328) -> Result<String, zeph_llm::LlmError> {
329 async move {
330 let timeout = deps.llm_timeout;
331 let consolidation_prompt = format!(
332 "<analysis>\n\
333 Merge these partial conversation summaries into a single structured compaction note.\n\
334 Produce exactly these 9 sections covering all partial summaries:\n\
335 1. User Intent\n2. Technical Concepts\n3. Files & Code\n4. Errors & Fixes\n\
336 5. Problem Solving\n6. User Messages\n7. Pending Tasks\n8. Current Work\n9. Next Step\n\
337 </analysis>\n\n\
338 Partial summaries:\n{numbered}"
339 );
340 let consolidation_msgs = [Message {
341 role: Role::User,
342 content: consolidation_prompt,
343 parts: vec![],
344 metadata: MessageMetadata::default(),
345 }];
346 tokio::time::timeout(timeout, deps.provider.chat(&consolidation_msgs))
347 .await
348 .map_err(|_| zeph_llm::LlmError::Timeout)?
349 }
350 .instrument(tracing::info_span!("context.summarization.prose_consolidation"))
351 .await
352}
353
354#[must_use]
367pub fn build_chunk_prompt(messages: &[Message], guidelines: &str) -> String {
368 let history_text = format_history(messages);
369
370 let guidelines_section = guidelines_xml(guidelines);
371
372 format!(
373 "<analysis>\n\
374 Analyze this conversation and produce a structured compaction note for self-consumption.\n\
375 This note replaces the original messages in your context window — be thorough.\n\
376 Longer is better if it preserves actionable detail.\n\
377 </analysis>\n\
378 {guidelines_section}\n\
379 Produce exactly these 9 sections:\n\
380 1. User Intent — what the user is ultimately trying to accomplish\n\
381 2. Technical Concepts — key technologies, patterns, constraints discussed\n\
382 3. Files & Code — file paths, function names, structs, enums touched or relevant\n\
383 4. Errors & Fixes — every error encountered and whether/how it was resolved\n\
384 5. Problem Solving — approaches tried, decisions made, alternatives rejected\n\
385 6. User Messages — verbatim user requests that are still pending or relevant\n\
386 7. Pending Tasks — items explicitly promised or left TODO\n\
387 8. Current Work — the exact task in progress at the moment of compaction\n\
388 9. Next Step — the single most important action to take immediately after compaction\n\
389 \n\
390 Conversation:\n{history_text}"
391 )
392}
393
394#[must_use]
407pub fn build_anchored_summary_prompt(messages: &[Message], guidelines: &str) -> String {
408 let history_text = format_history(messages);
409 let guidelines_section = guidelines_xml(guidelines);
410
411 format!(
412 "<analysis>\n\
413 You are compacting a conversation into a structured summary for self-consumption.\n\
414 This summary replaces the original messages in your context window.\n\
415 Every field MUST be populated — empty fields mean lost information.\n\
416 </analysis>\n\
417 {guidelines_section}\n\
418 Produce a JSON object with exactly these 5 fields:\n\
419 - session_intent: string — what the user is trying to accomplish\n\
420 - files_modified: string[] — file paths, function names, structs touched\n\
421 - decisions_made: string[] — each entry: \"Decision: X — Reason: Y\"\n\
422 - open_questions: string[] — unresolved questions or blockers\n\
423 - next_steps: string[] — concrete next actions\n\
424 \n\
425 Be thorough. Preserve all file paths, line numbers, error messages, \
426 and specific identifiers — they cannot be recovered.\n\
427 \n\
428 Conversation:\n{history_text}"
429 )
430}
431
432#[must_use]
437pub fn build_metadata_summary(messages: &[Message], truncate: fn(&str, usize) -> String) -> String {
438 let mut user_count = 0usize;
439 let mut assistant_count = 0usize;
440 let mut system_count = 0usize;
441 let mut last_user = String::new();
442 let mut last_assistant = String::new();
443
444 for m in messages {
445 match m.role {
446 Role::User => {
447 user_count += 1;
448 if !m.content.is_empty() {
449 last_user.clone_from(&m.content);
450 }
451 }
452 Role::Assistant => {
453 assistant_count += 1;
454 if !m.content.is_empty() {
455 last_assistant.clone_from(&m.content);
456 }
457 }
458 Role::System => system_count += 1,
459 }
460 }
461
462 let last_user_preview = truncate(&last_user, 200);
463 let last_assistant_preview = truncate(&last_assistant, 200);
464
465 format!(
466 "[metadata summary — LLM compaction unavailable]\n\
467 Messages compacted: {} ({} user, {} assistant, {} system)\n\
468 Last user message: {last_user_preview}\n\
469 Last assistant message: {last_assistant_preview}",
470 messages.len(),
471 user_count,
472 assistant_count,
473 system_count,
474 )
475}
476
477#[must_use]
481pub fn build_tool_pair_summary_prompt(req: &Message, res: &Message) -> String {
482 format!(
483 "Produce a concise but technically precise summary of this tool invocation.\n\
484 Preserve all facts that would be needed to continue work without re-running the tool:\n\
485 - Tool name and key input parameters (file paths, function names, patterns, line ranges)\n\
486 - Exact findings: line numbers, struct/enum/function names, error messages, numeric values\n\
487 - Outcome: what was found, changed, created, or confirmed\n\
488 Do NOT omit specific identifiers, paths, or numbers — they cannot be recovered later.\n\
489 Use 2-4 sentences maximum.\n\n\
490 <tool_request>\n{}\n</tool_request>\n\n<tool_response>\n{}\n</tool_response>",
491 req.content, res.content
492 )
493}
494
495#[allow(
503 clippy::cast_precision_loss,
504 clippy::cast_possible_truncation,
505 clippy::cast_sign_loss,
506 clippy::cast_possible_wrap
507)]
508#[must_use]
509pub fn remove_tool_responses_middle_out(mut messages: Vec<Message>, fraction: f32) -> Vec<Message> {
510 let tool_indices: Vec<usize> = messages
511 .iter()
512 .enumerate()
513 .filter(|(_, m)| {
514 m.parts.iter().any(|p| {
515 matches!(
516 p,
517 MessagePart::ToolResult { .. } | MessagePart::ToolOutput { .. }
518 )
519 })
520 })
521 .map(|(i, _)| i)
522 .collect();
523
524 if tool_indices.is_empty() {
525 return messages;
526 }
527
528 let n = tool_indices.len();
529 let to_remove = ((n as f32 * fraction).ceil() as usize).min(n);
530
531 let center = n / 2;
532 let mut remove_set: Vec<usize> = Vec::with_capacity(to_remove);
533 let mut left = center as isize - 1;
534 let mut right = center;
535 let mut count = 0;
536
537 while count < to_remove {
538 if right < n {
539 remove_set.push(tool_indices[right]);
540 count += 1;
541 right += 1;
542 }
543 if count < to_remove && left >= 0 {
544 let idx = left as usize;
545 if !remove_set.contains(&tool_indices[idx]) {
546 remove_set.push(tool_indices[idx]);
547 count += 1;
548 }
549 }
550 left -= 1;
551 if left < 0 && right >= n {
552 break;
553 }
554 }
555
556 for &msg_idx in &remove_set {
557 let msg = &mut messages[msg_idx];
558 for part in &mut msg.parts {
559 match part {
560 MessagePart::ToolResult { content, .. } => {
561 let ref_notice = extract_overflow_ref(content).map_or_else(
562 || String::from("[compacted]"),
563 |uuid| {
564 format!("[tool output pruned; use read_overflow {uuid} to retrieve]")
565 },
566 );
567 *content = ref_notice;
568 }
569 MessagePart::ToolOutput {
570 body, compacted_at, ..
571 } if compacted_at.is_none() => {
572 let ref_notice = extract_overflow_ref(body)
573 .map(|uuid| {
574 format!("[tool output pruned; use read_overflow {uuid} to retrieve]")
575 })
576 .unwrap_or_default();
577 *body = ref_notice;
578 *compacted_at = Some(
579 std::time::SystemTime::now()
580 .duration_since(std::time::UNIX_EPOCH)
581 .unwrap_or_default()
582 .as_secs()
583 .cast_signed(),
584 );
585 }
586 _ => {}
587 }
588 }
589 msg.rebuild_content();
590 }
591 messages
592}
593
594#[must_use]
601pub fn extract_overflow_ref(body: &str) -> Option<&str> {
602 let start = body.find(OVERFLOW_NOTICE_PREFIX)?;
603 let rest = &body[start + OVERFLOW_NOTICE_PREFIX.len()..];
604 let end = rest.find(" \u{2014} ")?;
605 Some(&rest[..end])
606}
607
608fn format_history(messages: &[Message]) -> String {
609 let estimated_len: usize = messages
610 .iter()
611 .map(|m| "[assistant]: ".len() + m.content.len() + 2)
612 .sum();
613 let mut history_text = String::with_capacity(estimated_len);
614 for (i, m) in messages.iter().enumerate() {
615 if i > 0 {
616 history_text.push_str("\n\n");
617 }
618 let role = match m.role {
619 Role::User => "user",
620 Role::Assistant => "assistant",
621 Role::System => "system",
622 };
623 let _ = write!(history_text, "[{role}]: {}", m.content);
624 }
625 history_text
626}
627
628fn guidelines_xml(guidelines: &str) -> String {
629 if guidelines.is_empty() {
630 String::new()
631 } else {
632 format!("\n<compression-guidelines>\n{guidelines}\n</compression-guidelines>\n")
633 }
634}
635
636#[cfg(test)]
637mod tests {
638 use super::*;
639 use zeph_llm::provider::{Message, MessageMetadata, Role};
640
641 fn user_msg(content: &str) -> Message {
642 Message {
643 role: Role::User,
644 content: content.to_string(),
645 parts: vec![],
646 metadata: MessageMetadata::default(),
647 }
648 }
649
650 fn assistant_msg(content: &str) -> Message {
651 Message {
652 role: Role::Assistant,
653 content: content.to_string(),
654 parts: vec![],
655 metadata: MessageMetadata::default(),
656 }
657 }
658
659 #[test]
660 fn build_chunk_prompt_includes_guidelines_section() {
661 let msgs = [user_msg("hello")];
662 let prompt = build_chunk_prompt(&msgs, "be concise");
663 assert!(
664 prompt.contains("<compression-guidelines>"),
665 "prompt must include guidelines XML"
666 );
667 assert!(
668 prompt.contains("be concise"),
669 "prompt must embed the guidelines text"
670 );
671 }
672
673 #[test]
674 fn build_chunk_prompt_no_guidelines_omits_section() {
675 let prompt = build_chunk_prompt(&[], "");
676 assert!(
677 !prompt.contains("<compression-guidelines>"),
678 "empty guidelines must not produce the XML section"
679 );
680 }
681
682 #[test]
683 fn build_anchored_summary_prompt_contains_json_fields() {
684 let prompt = build_anchored_summary_prompt(&[], "");
685 assert!(prompt.contains("session_intent"));
686 assert!(prompt.contains("files_modified"));
687 assert!(prompt.contains("next_steps"));
688 }
689
690 #[test]
691 fn build_metadata_summary_counts_messages() {
692 let msgs = [user_msg("hi"), assistant_msg("hello"), user_msg("bye")];
693 let summary = build_metadata_summary(&msgs, |s, n| s.chars().take(n).collect());
694 assert!(summary.contains("3 (2 user, 1 assistant, 0 system)"));
695 }
696
697 #[test]
698 fn build_tool_pair_summary_prompt_contains_request_and_response() {
699 let req = user_msg("req content");
700 let res = assistant_msg("res content");
701 let prompt = build_tool_pair_summary_prompt(&req, &res);
702 assert!(prompt.contains("req content"));
703 assert!(prompt.contains("res content"));
704 }
705
706 #[test]
707 fn extract_overflow_ref_returns_uuid_when_present() {
708 let uuid = "550e8400-e29b-41d4-a716-446655440000";
709 let body =
710 format!("some output\n[full output stored \u{2014} ID: {uuid} \u{2014} 12345 bytes]");
711 assert_eq!(extract_overflow_ref(&body), Some(uuid));
712 }
713
714 #[test]
715 fn extract_overflow_ref_returns_none_when_absent() {
716 assert_eq!(extract_overflow_ref("normal output"), None);
717 }
718
719 fn tool_result_msg(content: &str) -> Message {
720 use zeph_llm::provider::MessagePart;
721 Message {
722 role: Role::User,
723 content: content.to_string(),
724 parts: vec![
725 MessagePart::ToolUse {
726 id: "t1".into(),
727 name: "bash".into(),
728 input: serde_json::Value::Null,
729 },
730 MessagePart::ToolResult {
731 tool_use_id: "t1".into(),
732 content: content.to_string(),
733 is_error: false,
734 },
735 ],
736 metadata: MessageMetadata::default(),
737 }
738 }
739
740 #[test]
741 fn remove_tool_responses_middle_out_clears_correct_fraction() {
742 let mut messages = vec![
744 tool_result_msg("out0"),
745 tool_result_msg("out1"),
746 tool_result_msg("out2"),
747 tool_result_msg("out3"),
748 ];
749 messages = remove_tool_responses_middle_out(messages, 0.5);
750
751 let compacted_count = messages
752 .iter()
753 .flat_map(|m| m.parts.iter())
754 .filter(|p| {
755 if let zeph_llm::provider::MessagePart::ToolResult { content, .. } = p {
756 content == "[compacted]"
757 } else {
758 false
759 }
760 })
761 .count();
762
763 assert_eq!(
764 compacted_count, 2,
765 "ceil(4 * 0.5) = 2 tool results must be replaced with [compacted]"
766 );
767 }
768
769 #[test]
770 fn remove_tool_responses_middle_out_no_tool_messages_returns_unchanged() {
771 let messages = vec![user_msg("hello"), assistant_msg("hi")];
772 let result = remove_tool_responses_middle_out(messages.clone(), 0.5);
773 assert_eq!(result.len(), messages.len());
774 assert!(
775 result.iter().all(|m| m.parts.is_empty()),
776 "non-tool messages must be unchanged"
777 );
778 }
779}