1use std::fmt::Write as _;
14use std::sync::Arc;
15use std::time::Duration;
16
17use futures::StreamExt as _;
18use tracing::Instrument as _;
19use zeph_common::OVERFLOW_NOTICE_PREFIX;
20use zeph_common::memory::AnchoredSummary;
21use zeph_llm::LlmProvider as _;
22use zeph_llm::any::AnyProvider;
23use zeph_llm::provider::{Message, MessageMetadata, MessagePart, Role};
24
/// Reports how many tokens a single [`Message`] accounts for.
///
/// An implementation is stored behind `Arc<dyn MessageTokenCounter>` in
/// [`SummarizationDeps`] and is used by [`summarize_with_llm`] as the per-message
/// cost function when the history is split into chunks.
pub trait MessageTokenCounter: Send + Sync {
    /// Returns the token count attributed to `msg`.
    fn count_message_tokens(&self, msg: &Message) -> usize;
}
33
/// Dependencies shared by the summarization functions in this module.
#[derive(Clone)]
pub struct SummarizationDeps {
    /// LLM provider that receives every summarization and consolidation prompt.
    pub provider: AnyProvider,
    /// Upper bound applied to each individual LLM call via `tokio::time::timeout`.
    pub llm_timeout: Duration,
    /// Per-message token counter used by `summarize_with_llm` to split the history into chunks.
    pub token_counter: Arc<dyn MessageTokenCounter>,
    /// When `true`, `summarize_with_llm` first tries to merge chunk summaries into a
    /// structured (`AnchoredSummary`) result before falling back to prose consolidation.
    pub structured_summaries: bool,
    /// Optional observer called by the structured consolidation step with the parsed
    /// summary. The flag is `false` when the summary is used and `true` when it was
    /// rejected and prose consolidation takes over.
    // The callback type is spelled out inline, hence the lint exemption.
    #[allow(clippy::type_complexity)]
    pub on_anchored_summary: Option<Arc<dyn Fn(&AnchoredSummary, bool) + Send + Sync>>,
}
55
56pub async fn summarize_structured(
65 deps: &SummarizationDeps,
66 messages: &[Message],
67 guidelines: &str,
68) -> Result<AnchoredSummary, zeph_llm::LlmError> {
69 async move {
70 let prompt = build_anchored_summary_prompt(messages, guidelines);
71 let msgs = [Message {
72 role: Role::User,
73 content: prompt,
74 parts: vec![],
75 metadata: MessageMetadata::default(),
76 }];
77 let summary: AnchoredSummary = tokio::time::timeout(
78 deps.llm_timeout,
79 deps.provider.chat_typed_erased::<AnchoredSummary>(&msgs),
80 )
81 .await
82 .map_err(|_| zeph_llm::LlmError::Timeout)??;
83
84 if !summary.files_modified.is_empty() && summary.decisions_made.is_empty() {
85 tracing::warn!("structured summary: decisions_made is empty");
86 } else if summary.files_modified.is_empty() {
87 tracing::warn!(
88 "structured summary: files_modified is empty (may be a pure discussion session)"
89 );
90 }
91
92 if !summary.is_complete() {
93 tracing::warn!(
94 session_intent_empty = summary.session_intent.trim().is_empty(),
95 next_steps_empty = summary.next_steps.is_empty(),
96 "structured summary incomplete: mandatory fields missing, falling back to prose"
97 );
98 return Err(zeph_llm::LlmError::StructuredParse(
99 "structured summary missing mandatory fields".into(),
100 ));
101 }
102
103 if let Err(msg) = summary.validate() {
104 tracing::warn!(
105 error = %msg,
106 "structured summary failed field validation, falling back to prose"
107 );
108 return Err(zeph_llm::LlmError::StructuredParse(msg));
109 }
110
111 Ok(summary)
112 }
113 .instrument(tracing::info_span!(
114 "context.summarization.structured",
115 message_count = messages.len(),
116 ))
117 .await
118}
119
120pub async fn single_pass_summary(
125 deps: &SummarizationDeps,
126 messages: &[Message],
127 guidelines: &str,
128) -> Result<String, zeph_llm::LlmError> {
129 async move {
130 let prompt = build_chunk_prompt(messages, guidelines);
131 let msgs = [Message {
132 role: Role::User,
133 content: prompt,
134 parts: vec![],
135 metadata: MessageMetadata::default(),
136 }];
137 tokio::time::timeout(deps.llm_timeout, deps.provider.chat(&msgs))
138 .await
139 .map_err(|_| zeph_llm::LlmError::Timeout)?
140 }
141 .instrument(tracing::info_span!(
142 "context.summarization.single_pass",
143 message_count = messages.len(),
144 ))
145 .await
146}
147
148pub async fn summarize_with_llm(
157 deps: &SummarizationDeps,
158 messages: &[Message],
159 guidelines: &str,
160) -> Result<String, zeph_llm::LlmError> {
161 async move {
162 const CHUNK_TOKEN_BUDGET: usize = 4096;
163 const OVERSIZED_THRESHOLD: usize = CHUNK_TOKEN_BUDGET / 2;
164
165 let tc = Arc::clone(&deps.token_counter);
166 let chunks = crate::slot::chunk_messages(
167 messages,
168 CHUNK_TOKEN_BUDGET,
169 OVERSIZED_THRESHOLD,
170 move |msg| tc.count_message_tokens(msg),
171 );
172
173 if chunks.len() <= 1 {
174 return single_pass_summary(deps, messages, guidelines).await;
175 }
176
177 let partial_summaries = run_chunk_summaries(deps, chunks, guidelines).await;
178
179 if partial_summaries.is_empty() {
180 return single_pass_summary(deps, messages, guidelines).await;
181 }
182
183 let numbered = join_partial_summaries(&partial_summaries);
184
185 if deps.structured_summaries
186 && let Some(result) = try_structured_consolidation(deps, &numbered).await
187 {
188 return Ok(result);
189 }
190
191 prose_consolidation(deps, &numbered).await
192 }
193 .instrument(tracing::info_span!(
194 "context.summarization.with_llm",
195 message_count = messages.len(),
196 ))
197 .await
198}
199
200async fn run_chunk_summaries(
203 deps: &SummarizationDeps,
204 chunks: Vec<Vec<Message>>,
205 guidelines: &str,
206) -> Vec<String> {
207 let chunk_count = chunks.len();
208 async move {
209 let provider = deps.provider.clone();
210 let guidelines_arc: Arc<str> = Arc::from(guidelines);
211 let timeout = deps.llm_timeout;
212
213 let results: Vec<_> = futures::stream::iter(chunks.into_iter().map(|chunk| {
214 let guidelines_ref = Arc::clone(&guidelines_arc);
215 let prompt = build_chunk_prompt(&chunk, &guidelines_ref);
216 let p = provider.clone();
217 async move {
218 tokio::time::timeout(
219 timeout,
220 p.chat(&[Message {
221 role: Role::User,
222 content: prompt,
223 parts: vec![],
224 metadata: MessageMetadata::default(),
225 }]),
226 )
227 .await
228 .map_err(|_| zeph_llm::LlmError::Timeout)?
229 }
230 }))
231 .buffer_unordered(4)
232 .collect()
233 .await;
234
235 results
236 .into_iter()
237 .collect::<Result<Vec<_>, zeph_llm::LlmError>>()
238 .unwrap_or_else(|e| {
239 tracing::warn!(
240 "chunked compaction: one or more chunks failed: {e:#}, falling back to single-pass"
241 );
242 Vec::new()
243 })
244 }
245 .instrument(tracing::info_span!(
246 "context.summarization.chunk_summaries",
247 chunk_count,
248 ))
249 .await
250}
251
/// Renders the partial summaries as a numbered list (`1. …`, `2. …`), with entries
/// separated by a blank line.
fn join_partial_summaries(partials: &[String]) -> String {
    // Room for each entry plus its "N. " prefix and separator.
    let capacity = partials
        .iter()
        .fold(0usize, |total, part| total + part.len() + 8);
    let mut joined = String::with_capacity(capacity);

    for (ordinal, part) in (1usize..).zip(partials) {
        if ordinal > 1 {
            joined.push_str("\n\n");
        }
        // Writing into a `String` cannot fail, so the result is ignored.
        let _ = write!(joined, "{ordinal}. {part}");
    }

    joined
}
263
264async fn try_structured_consolidation(deps: &SummarizationDeps, numbered: &str) -> Option<String> {
265 async move {
266 let timeout = deps.llm_timeout;
267 let anchored_prompt = format!(
268 "<analysis>\n\
269 Merge these partial conversation summaries into a single structured summary.\n\
270 </analysis>\n\
271 \n\
272 Produce a JSON object with exactly these 5 fields:\n\
273 - session_intent: string — what the user is trying to accomplish\n\
274 - files_modified: string[] — file paths, function names, structs touched\n\
275 - decisions_made: string[] — each entry: \"Decision: X — Reason: Y\"\n\
276 - open_questions: string[] — unresolved questions or blockers\n\
277 - next_steps: string[] — concrete next actions\n\
278 \n\
279 Partial summaries:\n{numbered}"
280 );
281 let anchored_msgs = [Message {
282 role: Role::User,
283 content: anchored_prompt,
284 parts: vec![],
285 metadata: MessageMetadata::default(),
286 }];
287 match tokio::time::timeout(
288 timeout,
289 deps.provider
290 .chat_typed_erased::<AnchoredSummary>(&anchored_msgs),
291 )
292 .await
293 {
294 Ok(Ok(anchored)) if anchored.is_complete() => {
295 if let Some(ref cb) = deps.on_anchored_summary {
296 cb(&anchored, false);
297 }
298 Some(crate::slot::cap_summary(anchored.to_markdown(), 16_000))
299 }
300 Ok(Ok(anchored)) => {
301 tracing::warn!(
302 "chunked consolidation: structured summary incomplete, falling back to prose"
303 );
304 if let Some(ref cb) = deps.on_anchored_summary {
305 cb(&anchored, true);
306 }
307 None
308 }
309 Ok(Err(e)) => {
310 tracing::warn!(error = %e, "chunked consolidation: structured output failed, falling back to prose");
311 None
312 }
313 Err(_) => {
314 tracing::warn!(
315 "chunked consolidation: structured output timed out, falling back to prose"
316 );
317 None
318 }
319 }
320 }
321 .instrument(tracing::info_span!("context.summarization.structured_consolidation"))
322 .await
323}
324
325async fn prose_consolidation(
326 deps: &SummarizationDeps,
327 numbered: &str,
328) -> Result<String, zeph_llm::LlmError> {
329 async move {
330 let timeout = deps.llm_timeout;
331 let consolidation_prompt = format!(
332 "<analysis>\n\
333 Merge these partial conversation summaries into a single structured compaction note.\n\
334 Produce exactly these 9 sections covering all partial summaries:\n\
335 1. User Intent\n2. Technical Concepts\n3. Files & Code\n4. Errors & Fixes\n\
336 5. Problem Solving\n6. User Messages\n7. Pending Tasks\n8. Current Work\n9. Next Step\n\
337 </analysis>\n\n\
338 Partial summaries:\n{numbered}"
339 );
340 let consolidation_msgs = [Message {
341 role: Role::User,
342 content: consolidation_prompt,
343 parts: vec![],
344 metadata: MessageMetadata::default(),
345 }];
346 tokio::time::timeout(timeout, deps.provider.chat(&consolidation_msgs))
347 .await
348 .map_err(|_| zeph_llm::LlmError::Timeout)?
349 }
350 .instrument(tracing::info_span!("context.summarization.prose_consolidation"))
351 .await
352}
353
354#[must_use]
367pub fn build_chunk_prompt(messages: &[Message], guidelines: &str) -> String {
368 let history_text = format_history(messages);
369
370 let guidelines_section = guidelines_xml(guidelines);
371
372 format!(
373 "<analysis>\n\
374 Analyze this conversation and produce a structured compaction note for self-consumption.\n\
375 This note replaces the original messages in your context window — be thorough.\n\
376 Longer is better if it preserves actionable detail.\n\
377 </analysis>\n\
378 {guidelines_section}\n\
379 Produce exactly these 9 sections:\n\
380 1. User Intent — what the user is ultimately trying to accomplish\n\
381 2. Technical Concepts — key technologies, patterns, constraints discussed\n\
382 3. Files & Code — file paths, function names, structs, enums touched or relevant\n\
383 4. Errors & Fixes — every error encountered and whether/how it was resolved\n\
384 5. Problem Solving — approaches tried, decisions made, alternatives rejected\n\
385 6. User Messages — verbatim user requests that are still pending or relevant\n\
386 7. Pending Tasks — items explicitly promised or left TODO\n\
387 8. Current Work — the exact task in progress at the moment of compaction\n\
388 9. Next Step — the single most important action to take immediately after compaction\n\
389 \n\
390 Conversation:\n{history_text}"
391 )
392}
393
394#[must_use]
407pub fn build_anchored_summary_prompt(messages: &[Message], guidelines: &str) -> String {
408 let history_text = format_history(messages);
409 let guidelines_section = guidelines_xml(guidelines);
410
411 format!(
412 "<analysis>\n\
413 You are compacting a conversation into a structured summary for self-consumption.\n\
414 This summary replaces the original messages in your context window.\n\
415 Every field MUST be populated — empty fields mean lost information.\n\
416 </analysis>\n\
417 {guidelines_section}\n\
418 Produce a JSON object with exactly these 5 fields:\n\
419 - session_intent: string — what the user is trying to accomplish\n\
420 - files_modified: string[] — file paths, function names, structs touched\n\
421 - decisions_made: string[] — each entry: \"Decision: X — Reason: Y\"\n\
422 - open_questions: string[] — unresolved questions or blockers\n\
423 - next_steps: string[] — concrete next actions\n\
424 \n\
425 Be thorough. Preserve all file paths, line numbers, error messages, \
426 and specific identifiers — they cannot be recovered.\n\
427 \n\
428 Conversation:\n{history_text}"
429 )
430}
431
432#[must_use]
437pub fn build_metadata_summary(messages: &[Message], truncate: fn(&str, usize) -> String) -> String {
438 let mut user_count = 0usize;
439 let mut assistant_count = 0usize;
440 let mut system_count = 0usize;
441 let mut last_user = String::new();
442 let mut last_assistant = String::new();
443
444 for m in messages {
445 match m.role {
446 Role::User => {
447 user_count += 1;
448 if !m.content.is_empty() {
449 last_user.clone_from(&m.content);
450 }
451 }
452 Role::Assistant => {
453 assistant_count += 1;
454 if !m.content.is_empty() {
455 last_assistant.clone_from(&m.content);
456 }
457 }
458 Role::System => system_count += 1,
459 _ => {}
460 }
461 }
462
463 let last_user_preview = truncate(&last_user, 200);
464 let last_assistant_preview = truncate(&last_assistant, 200);
465
466 format!(
467 "[metadata summary — LLM compaction unavailable]\n\
468 Messages compacted: {} ({} user, {} assistant, {} system)\n\
469 Last user message: {last_user_preview}\n\
470 Last assistant message: {last_assistant_preview}",
471 messages.len(),
472 user_count,
473 assistant_count,
474 system_count,
475 )
476}
477
478#[must_use]
482pub fn build_tool_pair_summary_prompt(req: &Message, res: &Message) -> String {
483 format!(
484 "Produce a concise but technically precise summary of this tool invocation.\n\
485 Preserve all facts that would be needed to continue work without re-running the tool:\n\
486 - Tool name and key input parameters (file paths, function names, patterns, line ranges)\n\
487 - Exact findings: line numbers, struct/enum/function names, error messages, numeric values\n\
488 - Outcome: what was found, changed, created, or confirmed\n\
489 Do NOT omit specific identifiers, paths, or numbers — they cannot be recovered later.\n\
490 Use 2-4 sentences maximum.\n\n\
491 <tool_request>\n{}\n</tool_request>\n\n<tool_response>\n{}\n</tool_response>",
492 req.content, res.content
493 )
494}
495
496#[allow(
537 clippy::cast_precision_loss,
538 clippy::cast_possible_truncation,
539 clippy::cast_sign_loss,
540 clippy::cast_possible_wrap
541)]
542#[must_use]
543pub fn remove_tool_responses_middle_out(mut messages: Vec<Message>, fraction: f32) -> Vec<Message> {
544 let tool_indices: Vec<usize> = messages
545 .iter()
546 .enumerate()
547 .filter(|(_, m)| {
548 m.parts.iter().any(|p| {
549 matches!(
550 p,
551 MessagePart::ToolResult { .. } | MessagePart::ToolOutput { .. }
552 )
553 })
554 })
555 .map(|(i, _)| i)
556 .collect();
557
558 if tool_indices.is_empty() {
559 return messages;
560 }
561
562 let n = tool_indices.len();
563 let to_remove = ((n as f32 * fraction).ceil() as usize).min(n);
564
565 let center = n / 2;
566 let mut remove_set: Vec<usize> = Vec::with_capacity(to_remove);
567 let mut left = center as isize - 1;
568 let mut right = center;
569 let mut count = 0;
570
571 while count < to_remove {
572 if right < n {
573 remove_set.push(tool_indices[right]);
574 count += 1;
575 right += 1;
576 }
577 if count < to_remove && left >= 0 {
578 let idx = left as usize;
579 if !remove_set.contains(&tool_indices[idx]) {
580 remove_set.push(tool_indices[idx]);
581 count += 1;
582 }
583 }
584 left -= 1;
585 if left < 0 && right >= n {
586 break;
587 }
588 }
589
590 for &msg_idx in &remove_set {
591 let msg = &mut messages[msg_idx];
592 for part in &mut msg.parts {
593 match part {
594 MessagePart::ToolResult { content, .. } => {
595 let ref_notice = extract_overflow_ref(content).map_or_else(
596 || String::from("[compacted]"),
597 |uuid| {
598 format!("[tool output pruned; use read_overflow {uuid} to retrieve]")
599 },
600 );
601 *content = ref_notice;
602 }
603 MessagePart::ToolOutput {
604 body, compacted_at, ..
605 } if compacted_at.is_none() => {
606 let ref_notice = extract_overflow_ref(body)
607 .map(|uuid| {
608 format!("[tool output pruned; use read_overflow {uuid} to retrieve]")
609 })
610 .unwrap_or_default();
611 *body = ref_notice;
612 *compacted_at = Some(
613 std::time::SystemTime::now()
614 .duration_since(std::time::UNIX_EPOCH)
615 .unwrap_or_default()
616 .as_secs()
617 .cast_signed(),
618 );
619 }
620 _ => {}
621 }
622 }
623 msg.rebuild_content();
624 }
625 messages
626}
627
628#[must_use]
635pub fn extract_overflow_ref(body: &str) -> Option<&str> {
636 let start = body.find(OVERFLOW_NOTICE_PREFIX)?;
637 let rest = &body[start + OVERFLOW_NOTICE_PREFIX.len()..];
638 let end = rest.find(" \u{2014} ")?;
639 Some(&rest[..end])
640}
641
642fn format_history(messages: &[Message]) -> String {
643 let estimated_len: usize = messages
644 .iter()
645 .map(|m| "[assistant]: ".len() + m.content.len() + 2)
646 .sum();
647 let mut history_text = String::with_capacity(estimated_len);
648 for (i, m) in messages.iter().enumerate() {
649 if i > 0 {
650 history_text.push_str("\n\n");
651 }
652 let role = match m.role {
653 Role::Assistant => "assistant",
654 Role::System => "system",
655 Role::User | _ => "user",
656 };
657 let _ = write!(history_text, "[{role}]: {}", m.content);
658 }
659 history_text
660}
661
/// Wraps non-empty guidelines in a `<compression-guidelines>` element surrounded by
/// newlines; empty guidelines yield an empty string.
fn guidelines_xml(guidelines: &str) -> String {
    match guidelines {
        "" => String::new(),
        text => format!("\n<compression-guidelines>\n{text}\n</compression-guidelines>\n"),
    }
}
669
#[cfg(test)]
mod tests {
    use super::*;
    use zeph_llm::provider::{Message, MessageMetadata, MessagePart, Role};

    /// Builds a message without parts for the given role.
    fn plain_msg(role: Role, content: &str) -> Message {
        Message {
            role,
            content: content.to_owned(),
            parts: Vec::new(),
            metadata: MessageMetadata::default(),
        }
    }

    fn user_msg(content: &str) -> Message {
        plain_msg(Role::User, content)
    }

    fn assistant_msg(content: &str) -> Message {
        plain_msg(Role::Assistant, content)
    }

    /// Builds a user message carrying a tool call and its result.
    fn tool_result_msg(content: &str) -> Message {
        let mut msg = user_msg(content);
        msg.parts = vec![
            MessagePart::ToolUse {
                id: "t1".into(),
                name: "bash".into(),
                input: serde_json::Value::Null,
            },
            MessagePart::ToolResult {
                tool_use_id: "t1".into(),
                content: content.to_owned(),
                is_error: false,
            },
        ];
        msg
    }

    #[test]
    fn build_chunk_prompt_includes_guidelines_section() {
        let prompt = build_chunk_prompt(&[user_msg("hello")], "be concise");
        assert!(
            prompt.contains("<compression-guidelines>"),
            "prompt must include guidelines XML"
        );
        assert!(
            prompt.contains("be concise"),
            "prompt must embed the guidelines text"
        );
    }

    #[test]
    fn build_chunk_prompt_no_guidelines_omits_section() {
        let prompt = build_chunk_prompt(&[], "");
        assert!(
            !prompt.contains("<compression-guidelines>"),
            "empty guidelines must not produce the XML section"
        );
    }

    #[test]
    fn build_anchored_summary_prompt_contains_json_fields() {
        let prompt = build_anchored_summary_prompt(&[], "");
        for field in ["session_intent", "files_modified", "next_steps"] {
            assert!(prompt.contains(field));
        }
    }

    #[test]
    fn build_metadata_summary_counts_messages() {
        let history = [user_msg("hi"), assistant_msg("hello"), user_msg("bye")];
        let summary = build_metadata_summary(&history, |s, n| s.chars().take(n).collect());
        assert!(summary.contains("3 (2 user, 1 assistant, 0 system)"));
    }

    #[test]
    fn build_tool_pair_summary_prompt_contains_request_and_response() {
        let prompt =
            build_tool_pair_summary_prompt(&user_msg("req content"), &assistant_msg("res content"));
        assert!(prompt.contains("req content"));
        assert!(prompt.contains("res content"));
    }

    #[test]
    fn extract_overflow_ref_returns_uuid_when_present() {
        let uuid = "550e8400-e29b-41d4-a716-446655440000";
        let body =
            format!("some output\n[full output stored \u{2014} ID: {uuid} \u{2014} 12345 bytes]");
        assert_eq!(extract_overflow_ref(&body), Some(uuid));
    }

    #[test]
    fn extract_overflow_ref_returns_none_when_absent() {
        assert_eq!(extract_overflow_ref("normal output"), None);
    }

    #[test]
    fn remove_tool_responses_middle_out_clears_correct_fraction() {
        let messages: Vec<Message> = ["out0", "out1", "out2", "out3"]
            .into_iter()
            .map(tool_result_msg)
            .collect();
        let messages = remove_tool_responses_middle_out(messages, 0.5);

        let compacted_count = messages
            .iter()
            .flat_map(|m| &m.parts)
            .filter(|p| {
                matches!(
                    p,
                    MessagePart::ToolResult { content, .. } if content == "[compacted]"
                )
            })
            .count();

        assert_eq!(
            compacted_count, 2,
            "ceil(4 * 0.5) = 2 tool results must be replaced with [compacted]"
        );
    }

    #[test]
    fn remove_tool_responses_middle_out_no_tool_messages_returns_unchanged() {
        let messages = vec![user_msg("hello"), assistant_msg("hi")];
        let expected_len = messages.len();
        let result = remove_tool_responses_middle_out(messages, 0.5);
        assert_eq!(result.len(), expected_len);
        assert!(
            result.iter().all(|m| m.parts.is_empty()),
            "non-tool messages must be unchanged"
        );
    }
}