1use std::sync::Arc;
29
30use async_trait::async_trait;
31use entelix_core::ir::{ContentPart, Message, Role};
32use entelix_core::{ExecutionContext, Result};
33use entelix_runnable::Runnable;
34use entelix_session::{
35 CompactedHistory, Compactor, GraphEvent, Turn, messages_char_size, messages_to_events,
36};
37
38pub struct RunnableCompacting<R> {
55 inner: R,
56 compactor: Arc<dyn Compactor>,
57 threshold_chars: usize,
58}
59
60impl<R> RunnableCompacting<R> {
61 #[must_use]
65 pub const fn threshold_chars(&self) -> usize {
66 self.threshold_chars
67 }
68
69 pub const fn inner(&self) -> &R {
71 &self.inner
72 }
73}
74
75#[async_trait]
76impl<R> Runnable<Vec<Message>, Message> for RunnableCompacting<R>
77where
78 R: Runnable<Vec<Message>, Message> + Send + Sync + 'static,
79{
80 async fn invoke(&self, input: Vec<Message>, ctx: &ExecutionContext) -> Result<Message> {
81 let input = if messages_char_size(&input) >= self.threshold_chars {
82 let dropped_size = messages_char_size(&input);
83 let events = messages_to_events(&input)?;
84 let compacted = self
85 .compactor
86 .compact(&events, self.threshold_chars, ctx)
87 .await?
88 .to_messages();
89 let retained_size = messages_char_size(&compacted);
90 if let Some(handle) = ctx.audit_sink() {
91 handle.as_sink().record_context_compacted(
92 dropped_size.saturating_sub(retained_size),
93 retained_size,
94 );
95 }
96 compacted
97 } else {
98 input
99 };
100 self.inner.invoke(input, ctx).await
101 }
102}
103
104pub trait MessageRunnableCompactionExt: Runnable<Vec<Message>, Message> + Sized {
110 fn with_compaction(
114 self,
115 compactor: Arc<dyn Compactor>,
116 threshold_chars: usize,
117 ) -> RunnableCompacting<Self> {
118 RunnableCompacting {
119 inner: self,
120 compactor,
121 threshold_chars,
122 }
123 }
124}
125
126impl<R> MessageRunnableCompactionExt for R where R: Runnable<Vec<Message>, Message> + Sized {}
127
/// System prompt that [`SummaryCompactor::new`] sends ahead of the older
/// turns it asks the model to summarise.
pub const DEFAULT_SUMMARY_SYSTEM_PROMPT: &str = concat!(
    "You are a conversation summariser. ",
    "Distil the conversation below into 100-200 words preserving key facts, decisions, entities, and tool outcomes. ",
    "Output ONLY the summary text — no preamble, no commentary."
);
133
/// Number of most recent turns that [`SummaryCompactor::new`] keeps verbatim;
/// everything older is folded into a single summary turn.
pub const DEFAULT_SUMMARY_KEEP_RECENT_TURNS: usize = 2 * 2;
140
/// A compactor that asks a model to summarise older conversation turns and
/// keeps only the most recent turns verbatim.
///
/// `M` is the summariser model; it is shared through an `Arc` so the same
/// model instance can also serve other callers.
pub struct SummaryCompactor<M> {
    /// Runnable invoked with the system prompt plus the older messages.
    model: Arc<M>,
    /// Instruction sent as the leading system message to the summariser.
    system_prompt: String,
    /// How many of the newest turns survive compaction untouched.
    keep_recent_turns: usize,
}
160
161impl<M> SummaryCompactor<M> {
162 #[must_use]
164 pub fn new(model: Arc<M>) -> Self {
165 Self {
166 model,
167 system_prompt: DEFAULT_SUMMARY_SYSTEM_PROMPT.to_owned(),
168 keep_recent_turns: DEFAULT_SUMMARY_KEEP_RECENT_TURNS,
169 }
170 }
171
172 #[must_use]
176 pub fn with_system_prompt(mut self, prompt: impl Into<String>) -> Self {
177 self.system_prompt = prompt.into();
178 self
179 }
180
181 #[must_use]
185 pub const fn with_keep_recent_turns(mut self, n: usize) -> Self {
186 self.keep_recent_turns = n;
187 self
188 }
189}
190
191#[async_trait]
192impl<M> Compactor for SummaryCompactor<M>
193where
194 M: Runnable<Vec<Message>, Message> + Send + Sync + 'static,
195{
196 async fn compact(
197 &self,
198 events: &[GraphEvent],
199 _budget_chars: usize,
200 ctx: &ExecutionContext,
201 ) -> Result<CompactedHistory> {
202 let grouped = CompactedHistory::group(events)?;
203 let total = grouped.len();
204 if total <= self.keep_recent_turns {
205 return Ok(grouped);
206 }
207 let split_at = total - self.keep_recent_turns;
208 let mut all = grouped.turns().to_vec();
209 let recent = all.split_off(split_at);
210 let older = all;
211 if older.is_empty() {
212 return Ok(CompactedHistory::from_turns(recent));
213 }
214 let older_messages = CompactedHistory::from_turns(older).to_messages();
215 let mut prompt = Vec::with_capacity(older_messages.len() + 1);
216 prompt.push(Message::new(
217 Role::System,
218 vec![ContentPart::text(self.system_prompt.clone())],
219 ));
220 prompt.extend(older_messages);
221 let summary_msg = self.model.invoke(prompt, ctx).await?;
222 let summary_text = extract_text(&summary_msg.content);
223 let summary_turn = Turn::User {
224 content: vec![ContentPart::text(format!(
225 "[Summary of earlier conversation]\n{summary_text}"
226 ))],
227 };
228 let mut combined = Vec::with_capacity(1 + recent.len());
229 combined.push(summary_turn);
230 combined.extend(recent);
231 Ok(CompactedHistory::from_turns(combined))
232 }
233}
234
235fn extract_text(parts: &[ContentPart]) -> String {
236 let mut out = String::new();
237 for part in parts {
238 if let ContentPart::Text { text, .. } = part {
239 if !out.is_empty() {
240 out.push('\n');
241 }
242 out.push_str(text);
243 }
244 }
245 out
246}
247
#[cfg(test)]
// Tests use `unwrap()` and direct indexing on purpose: a panic is exactly the
// failure signal we want when an expectation is broken.
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use std::sync::atomic::{AtomicUsize, Ordering};

    use entelix_core::ir::{ContentPart, Message, Role};
    use entelix_session::HeadDropCompactor;
    use parking_lot::Mutex;

    use super::*;

    /// Inner runnable that records how many messages it last received and
    /// always answers with an assistant "ok".
    struct EchoModel {
        invocations: AtomicUsize,
        last_input_len: AtomicUsize,
    }

    impl EchoModel {
        fn new() -> Self {
            Self {
                invocations: AtomicUsize::new(0),
                last_input_len: AtomicUsize::new(0),
            }
        }
    }

    #[async_trait]
    impl Runnable<Vec<Message>, Message> for EchoModel {
        async fn invoke(&self, input: Vec<Message>, _ctx: &ExecutionContext) -> Result<Message> {
            self.invocations.fetch_add(1, Ordering::SeqCst);
            self.last_input_len.store(input.len(), Ordering::SeqCst);
            Ok(Message::new(Role::Assistant, vec![ContentPart::text("ok")]))
        }
    }

    fn user(text: &str) -> Message {
        Message::new(Role::User, vec![ContentPart::text(text)])
    }

    fn assistant(text: &str) -> Message {
        Message::new(Role::Assistant, vec![ContentPart::text(text)])
    }

    #[tokio::test]
    async fn passes_through_below_threshold() {
        let compactor: Arc<dyn Compactor> = Arc::new(HeadDropCompactor);
        let wrapped = EchoModel::new().with_compaction(compactor, 1024);

        let input = vec![user("short"), assistant("ok")];
        let _ = wrapped
            .invoke(input.clone(), &ExecutionContext::new())
            .await
            .unwrap();
        assert_eq!(
            wrapped.inner().last_input_len.load(Ordering::SeqCst),
            input.len()
        );
    }

    #[tokio::test]
    async fn compacts_when_threshold_exceeded() {
        let compactor: Arc<dyn Compactor> = Arc::new(HeadDropCompactor);
        let model = EchoModel::new();
        let wrapped = model.with_compaction(compactor, 30);

        let input = vec![
            user("one one one one"),
            assistant("first reply long enough"),
            user("two two two two"),
            assistant("second reply long enough"),
            user("three three three three"),
            assistant("third reply"),
        ];
        let _ = wrapped
            .invoke(input.clone(), &ExecutionContext::new())
            .await
            .unwrap();
        let observed_len = wrapped.inner().last_input_len.load(Ordering::SeqCst);
        assert!(
            observed_len < input.len(),
            "compaction must trim — got {observed_len}, input had {}",
            input.len()
        );
    }

    /// Audit sink that records only `record_context_compacted` calls.
    struct CapturingAuditSink {
        compactions: Mutex<Vec<(usize, usize)>>,
    }

    impl CapturingAuditSink {
        fn new() -> Self {
            Self {
                compactions: Mutex::new(Vec::new()),
            }
        }
    }

    impl entelix_core::AuditSink for CapturingAuditSink {
        fn record_sub_agent_invoked(&self, _agent_id: &str, _sub_thread_id: &str) {}
        fn record_agent_handoff(&self, _from: Option<&str>, _to: &str) {}
        fn record_interrupted(
            &self,
            _kind: &entelix_core::InterruptionKind,
            _payload: &serde_json::Value,
        ) {
        }
        fn record_resumed(&self, _from_checkpoint: &str) {}
        fn record_memory_recall(&self, _tier: &str, _namespace_key: &str, _hits: usize) {}
        fn record_usage_limit_exceeded(&self, _breach: &entelix_core::UsageLimitBreach) {}
        fn record_context_compacted(&self, dropped_chars: usize, retained_chars: usize) {
            self.compactions
                .lock()
                .push((dropped_chars, retained_chars));
        }
        fn record_tool_error_terminal(&self, _kind: entelix_core::ToolErrorKind, _tool_name: &str) {
        }
    }

    #[tokio::test]
    async fn compaction_records_audit_event_when_threshold_exceeded() {
        let compactor: Arc<dyn Compactor> = Arc::new(HeadDropCompactor);
        let model = EchoModel::new();
        let wrapped = model.with_compaction(compactor, 30);
        let sink = Arc::new(CapturingAuditSink::new());
        let ctx = ExecutionContext::new()
            .with_audit_sink(entelix_core::AuditSinkHandle::new(sink.clone()));

        let input = vec![
            user("padding to force compaction one one one one"),
            assistant("more padding to force compaction"),
            user("trailing turn"),
            assistant("ok"),
        ];
        let _ = wrapped.invoke(input, &ctx).await.unwrap();

        let captured = sink.compactions.lock().clone();
        assert_eq!(captured.len(), 1, "exactly one compaction event expected");
        let (dropped, _retained) = captured[0];
        assert!(dropped > 0, "must report some dropped characters");
    }

    #[tokio::test]
    async fn compaction_records_no_audit_event_below_threshold() {
        let compactor: Arc<dyn Compactor> = Arc::new(HeadDropCompactor);
        let model = EchoModel::new();
        let wrapped = model.with_compaction(compactor, 1024);
        let sink = Arc::new(CapturingAuditSink::new());
        let ctx = ExecutionContext::new()
            .with_audit_sink(entelix_core::AuditSinkHandle::new(sink.clone()));

        let input = vec![user("short"), assistant("ok")];
        let _ = wrapped.invoke(input, &ctx).await.unwrap();

        assert!(
            sink.compactions.lock().is_empty(),
            "no audit event expected when threshold is not crossed"
        );
    }

    #[tokio::test]
    async fn empty_messages_pass_through() {
        let compactor: Arc<dyn Compactor> = Arc::new(HeadDropCompactor);
        let model = EchoModel::new();
        let wrapped = model.with_compaction(compactor, 1024);
        let _ = wrapped
            .invoke(Vec::new(), &ExecutionContext::new())
            .await
            .unwrap();
        assert_eq!(wrapped.inner().last_input_len.load(Ordering::SeqCst), 0);
    }

    /// Summariser stub: stores the prompt it was given and replies with a
    /// fixed text.
    struct StubSummariser {
        captured_prompt: Mutex<Vec<Message>>,
        reply: String,
    }

    impl StubSummariser {
        fn new(reply: impl Into<String>) -> Self {
            Self {
                captured_prompt: Mutex::new(Vec::new()),
                reply: reply.into(),
            }
        }
    }

    #[async_trait]
    impl Runnable<Vec<Message>, Message> for StubSummariser {
        async fn invoke(&self, input: Vec<Message>, _ctx: &ExecutionContext) -> Result<Message> {
            *self.captured_prompt.lock() = input;
            Ok(Message::new(
                Role::Assistant,
                vec![ContentPart::text(self.reply.clone())],
            ))
        }
    }

    fn user_event(text: &str) -> entelix_session::GraphEvent {
        entelix_session::GraphEvent::UserMessage {
            content: vec![ContentPart::text(text)],
            timestamp: chrono::Utc::now(),
        }
    }

    fn assistant_event(text: &str) -> entelix_session::GraphEvent {
        entelix_session::GraphEvent::AssistantMessage {
            content: vec![ContentPart::text(text)],
            usage: None,
            timestamp: chrono::Utc::now(),
        }
    }

    #[tokio::test]
    async fn summary_compactor_skips_when_under_keep_recent_threshold() {
        let summariser = Arc::new(StubSummariser::new("never invoked"));
        let compactor = SummaryCompactor::new(summariser.clone()).with_keep_recent_turns(8);
        let events = vec![
            user_event("u1"),
            assistant_event("a1"),
            user_event("u2"),
            assistant_event("a2"),
        ];
        let history = compactor
            .compact(&events, 0, &ExecutionContext::new())
            .await
            .unwrap();
        assert_eq!(history.len(), 4);
        assert!(
            summariser.captured_prompt.lock().is_empty(),
            "summariser must NOT be invoked when total <= keep_recent_turns"
        );
    }

    #[tokio::test]
    async fn summary_compactor_replaces_older_turns_with_summary() {
        let summariser = Arc::new(StubSummariser::new("brief recap"));
        let compactor = SummaryCompactor::new(summariser.clone()).with_keep_recent_turns(2);
        let events = vec![
            user_event("oldest user"),
            assistant_event("oldest assistant"),
            user_event("middle user"),
            assistant_event("middle assistant"),
            user_event("newest user"),
            assistant_event("newest assistant"),
        ];
        let history = compactor
            .compact(&events, 0, &ExecutionContext::new())
            .await
            .unwrap();
        assert_eq!(history.len(), 3);
        // The head turn must be the synthetic summary; fail loudly if its
        // first part is not text instead of silently skipping the checks.
        if let Turn::User { content } = &history.turns()[0] {
            if let ContentPart::Text { text, .. } = &content[0] {
                assert!(text.contains("Summary"), "summary marker missing: {text}");
                assert!(
                    text.contains("brief recap"),
                    "summariser reply missing: {text}"
                );
            } else {
                panic!("expected Text part at head of summary turn");
            }
        } else {
            panic!("expected User turn at head");
        }
        let captured_len;
        let captured_role;
        {
            // Scope the lock guard so it is released before asserting.
            let captured = summariser.captured_prompt.lock();
            captured_len = captured.len();
            captured_role = captured[0].role;
        }
        assert!(
            captured_len >= 5,
            "expected system + ≥4 older messages, got {captured_len}"
        );
        assert!(matches!(captured_role, Role::System));
    }

    #[tokio::test]
    async fn summary_compactor_with_system_prompt_overrides_default() {
        let summariser = Arc::new(StubSummariser::new("ok"));
        let compactor = SummaryCompactor::new(summariser.clone())
            .with_keep_recent_turns(0)
            .with_system_prompt("CUSTOM PROMPT MARKER");
        let events = vec![user_event("hi"), assistant_event("hello")];
        let _ = compactor
            .compact(&events, 0, &ExecutionContext::new())
            .await
            .unwrap();
        let prompt_text = {
            let captured = summariser.captured_prompt.lock();
            if let ContentPart::Text { text, .. } = &captured[0].content[0] {
                text.clone()
            } else {
                panic!("expected Text part at system position");
            }
        };
        assert!(
            prompt_text.contains("CUSTOM PROMPT MARKER"),
            "operator-supplied prompt must reach the summariser, got: {prompt_text}"
        );
    }
}