1use std::sync::Arc;
29
30use async_trait::async_trait;
31use entelix_core::ir::{ContentPart, Message, Role};
32use entelix_core::{ExecutionContext, Result};
33use entelix_runnable::Runnable;
34use entelix_session::{
35 CompactedHistory, Compactor, GraphEvent, Turn, messages_char_size, messages_to_events,
36};
37
/// Decorator around an inner [`Runnable`] that compacts the incoming
/// message history before delegating, once its character size reaches a
/// configured threshold.
pub struct RunnableCompacting<R> {
    /// The wrapped runnable that receives the (possibly compacted) messages.
    inner: R,
    /// Strategy used to shrink the history when the threshold is crossed.
    compactor: Arc<dyn Compactor>,
    /// Character count at which compaction triggers (inclusive: `>=`).
    threshold_chars: usize,
}
59
60impl<R> RunnableCompacting<R> {
61 #[must_use]
65 pub const fn threshold_chars(&self) -> usize {
66 self.threshold_chars
67 }
68
69 pub const fn inner(&self) -> &R {
71 &self.inner
72 }
73}
74
75#[async_trait]
76impl<R> Runnable<Vec<Message>, Message> for RunnableCompacting<R>
77where
78 R: Runnable<Vec<Message>, Message> + Send + Sync + 'static,
79{
80 async fn invoke(&self, input: Vec<Message>, ctx: &ExecutionContext) -> Result<Message> {
81 let input = if messages_char_size(&input) >= self.threshold_chars {
82 let dropped_size = messages_char_size(&input);
83 let events = messages_to_events(&input)?;
84 let compacted = self
85 .compactor
86 .compact(&events, self.threshold_chars, ctx)
87 .await?
88 .to_messages();
89 let retained_size = messages_char_size(&compacted);
90 if let Some(handle) = ctx.audit_sink() {
91 handle.as_sink().record_context_compacted(
92 dropped_size.saturating_sub(retained_size),
93 retained_size,
94 );
95 }
96 compacted
97 } else {
98 input
99 };
100 self.inner.invoke(input, ctx).await
101 }
102}
103
/// Extension trait that adds `.with_compaction(..)` to any message
/// runnable, wrapping it in a [`RunnableCompacting`].
pub trait MessageRunnableCompactionExt: Runnable<Vec<Message>, Message> + Sized {
    /// Wraps `self` so its input history is compacted by `compactor`
    /// whenever the history's character size reaches `threshold_chars`.
    fn with_compaction(
        self,
        compactor: Arc<dyn Compactor>,
        threshold_chars: usize,
    ) -> RunnableCompacting<Self> {
        RunnableCompacting {
            inner: self,
            compactor,
            threshold_chars,
        }
    }
}
125
126impl<R> MessageRunnableCompactionExt for R where R: Runnable<Vec<Message>, Message> + Sized {}
127
/// Default system prompt used by [`SummaryCompactor`] to instruct the
/// summarising model.
pub const DEFAULT_SUMMARY_SYSTEM_PROMPT: &str = "You are a conversation summariser. Distil the conversation below into 100-200 words preserving key facts, decisions, entities, and tool outcomes. Output ONLY the summary text — no preamble, no commentary.";

/// Default number of most-recent turns [`SummaryCompactor`] keeps verbatim.
pub const DEFAULT_SUMMARY_KEEP_RECENT_TURNS: usize = 4;
140
/// A [`Compactor`] that replaces older conversation turns with a single
/// model-generated summary, keeping the most recent turns verbatim.
pub struct SummaryCompactor<M> {
    /// Model invoked to produce the summary text.
    model: Arc<M>,
    /// System prompt prepended to the older messages when summarising.
    system_prompt: String,
    /// How many of the most recent turns are preserved unsummarised.
    keep_recent_turns: usize,
}
160
161impl<M> SummaryCompactor<M> {
162 #[must_use]
164 pub fn new(model: Arc<M>) -> Self {
165 Self {
166 model,
167 system_prompt: DEFAULT_SUMMARY_SYSTEM_PROMPT.to_owned(),
168 keep_recent_turns: DEFAULT_SUMMARY_KEEP_RECENT_TURNS,
169 }
170 }
171
172 #[must_use]
176 pub fn with_system_prompt(mut self, prompt: impl Into<String>) -> Self {
177 self.system_prompt = prompt.into();
178 self
179 }
180
181 #[must_use]
185 pub const fn with_keep_recent_turns(mut self, n: usize) -> Self {
186 self.keep_recent_turns = n;
187 self
188 }
189}
190
#[async_trait]
impl<M> Compactor for SummaryCompactor<M>
where
    M: Runnable<Vec<Message>, Message> + Send + Sync + 'static,
{
    /// Compacts `events` by summarising all but the most recent
    /// `keep_recent_turns` turns into one synthetic user turn placed at the
    /// head of the history.
    ///
    /// The character budget is ignored: this strategy is driven by turn
    /// count, not size.
    ///
    /// # Errors
    /// Propagates errors from event grouping and from the summarising model.
    async fn compact(
        &self,
        events: &[GraphEvent],
        _budget_chars: usize, // unused — see doc comment above
        ctx: &ExecutionContext,
    ) -> Result<CompactedHistory> {
        let grouped = CompactedHistory::group(events)?;
        let total = grouped.len();
        // Nothing to summarise: everything fits in the recent window.
        if total <= self.keep_recent_turns {
            return Ok(grouped);
        }
        // `total > keep_recent_turns` here, so `split_at >= 1`.
        let split_at = total - self.keep_recent_turns;
        let mut all = grouped.turns().to_vec();
        // `split_off` leaves the older prefix in `all` and returns the tail.
        let recent = all.split_off(split_at);
        let older = all;
        // Defensive guard; presumably unreachable if `len()` equals the turn
        // count — TODO confirm `CompactedHistory::len` semantics.
        if older.is_empty() {
            return Ok(CompactedHistory::from_turns(recent));
        }
        // Render the older turns back into messages and prepend the system
        // prompt before asking the model for a summary.
        let older_messages = CompactedHistory::from_turns(older).to_messages();
        let mut prompt = Vec::with_capacity(older_messages.len() + 1);
        prompt.push(Message::new(
            Role::System,
            vec![ContentPart::text(self.system_prompt.clone())],
        ));
        prompt.extend(older_messages);
        let summary_msg = self.model.invoke(prompt, ctx).await?;
        let summary_text = extract_text(&summary_msg.content);
        // The summary is injected as a User turn so downstream models treat
        // it as conversational context rather than an instruction.
        let summary_turn = Turn::User {
            content: vec![ContentPart::text(format!(
                "[Summary of earlier conversation]\n{summary_text}"
            ))],
        };
        let mut combined = Vec::with_capacity(1 + recent.len());
        combined.push(summary_turn);
        combined.extend(recent);
        Ok(CompactedHistory::from_turns(combined))
    }
}
234
235fn extract_text(parts: &[ContentPart]) -> String {
236 let mut out = String::new();
237 for part in parts {
238 if let ContentPart::Text { text, .. } = part {
239 if !out.is_empty() {
240 out.push('\n');
241 }
242 out.push_str(text);
243 }
244 }
245 out
246}
247
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use std::sync::atomic::{AtomicUsize, Ordering};

    use entelix_core::ir::{ContentPart, Message, Role};
    use entelix_session::HeadDropCompactor;
    use parking_lot::Mutex;

    use super::*;

    /// Test double: records how many times it was invoked and the length of
    /// the last message list it received, so tests can observe exactly what
    /// reached the wrapped model.
    struct EchoModel {
        invocations: AtomicUsize,
        last_input_len: AtomicUsize,
    }

    impl EchoModel {
        fn new() -> Self {
            Self {
                invocations: AtomicUsize::new(0),
                last_input_len: AtomicUsize::new(0),
            }
        }
    }

    #[async_trait]
    impl Runnable<Vec<Message>, Message> for EchoModel {
        async fn invoke(&self, input: Vec<Message>, _ctx: &ExecutionContext) -> Result<Message> {
            self.invocations.fetch_add(1, Ordering::SeqCst);
            self.last_input_len.store(input.len(), Ordering::SeqCst);
            Ok(Message::new(Role::Assistant, vec![ContentPart::text("ok")]))
        }
    }

    // Small builders for single-part text messages.
    fn user(text: &str) -> Message {
        Message::new(Role::User, vec![ContentPart::text(text)])
    }

    fn assistant(text: &str) -> Message {
        Message::new(Role::Assistant, vec![ContentPart::text(text)])
    }

    /// Below the threshold the input must reach the inner model unchanged.
    #[tokio::test]
    async fn passes_through_below_threshold() {
        let compactor: Arc<dyn Compactor> = Arc::new(HeadDropCompactor);
        let wrapped = EchoModel::new().with_compaction(compactor, 1024);

        let input = vec![user("short"), assistant("ok")];
        let _ = wrapped
            .invoke(input.clone(), &ExecutionContext::new())
            .await
            .unwrap();
        assert_eq!(
            wrapped.inner().last_input_len.load(Ordering::SeqCst),
            input.len()
        );
    }

    /// Crossing the threshold must shrink what the inner model sees.
    #[tokio::test]
    async fn compacts_when_threshold_exceeded() {
        let compactor: Arc<dyn Compactor> = Arc::new(HeadDropCompactor);
        let model = EchoModel::new();
        // Threshold deliberately smaller than the input's total char size.
        let wrapped = model.with_compaction(compactor, 30);

        let input = vec![
            user("one one one one"),
            assistant("first reply long enough"),
            user("two two two two"),
            assistant("second reply long enough"),
            user("three three three three"),
            assistant("third reply"),
        ];
        let _ = wrapped
            .invoke(input.clone(), &ExecutionContext::new())
            .await
            .unwrap();
        let observed_len = wrapped.inner().last_input_len.load(Ordering::SeqCst);
        assert!(
            observed_len < input.len(),
            "compaction must trim — got {observed_len}, input had {}",
            input.len()
        );
    }

    /// Audit sink double that captures `(dropped_chars, retained_chars)`
    /// pairs from `record_context_compacted`; all other hooks are no-ops.
    struct CapturingAuditSink {
        compactions: Mutex<Vec<(usize, usize)>>,
    }

    impl CapturingAuditSink {
        fn new() -> Self {
            Self {
                compactions: Mutex::new(Vec::new()),
            }
        }
    }

    impl entelix_core::AuditSink for CapturingAuditSink {
        fn record_sub_agent_invoked(&self, _agent_id: &str, _sub_thread_id: &str) {}
        fn record_agent_handoff(&self, _from: Option<&str>, _to: &str) {}
        fn record_resumed(&self, _from_checkpoint: &str) {}
        fn record_memory_recall(&self, _tier: &str, _namespace_key: &str, _hits: usize) {}
        fn record_usage_limit_exceeded(&self, _breach: &entelix_core::UsageLimitBreach) {}
        fn record_context_compacted(&self, dropped_chars: usize, retained_chars: usize) {
            self.compactions
                .lock()
                .push((dropped_chars, retained_chars));
        }
    }

    /// A single compaction must emit exactly one audit event with a
    /// positive dropped-character count.
    #[tokio::test]
    async fn compaction_records_audit_event_when_threshold_exceeded() {
        let compactor: Arc<dyn Compactor> = Arc::new(HeadDropCompactor);
        let model = EchoModel::new();
        let wrapped = model.with_compaction(compactor, 30);
        let sink = Arc::new(CapturingAuditSink::new());
        let ctx = ExecutionContext::new()
            .with_audit_sink(entelix_core::AuditSinkHandle::new(sink.clone()));

        let input = vec![
            user("padding to force compaction one one one one"),
            assistant("more padding to force compaction"),
            user("trailing turn"),
            assistant("ok"),
        ];
        let _ = wrapped.invoke(input, &ctx).await.unwrap();

        let captured = sink.compactions.lock().clone();
        assert_eq!(captured.len(), 1, "exactly one compaction event expected");
        let (dropped, _retained) = captured[0];
        assert!(dropped > 0, "must report some dropped characters");
    }

    /// No compaction ⇒ no audit event, even with a sink attached.
    #[tokio::test]
    async fn compaction_records_no_audit_event_below_threshold() {
        let compactor: Arc<dyn Compactor> = Arc::new(HeadDropCompactor);
        let model = EchoModel::new();
        let wrapped = model.with_compaction(compactor, 1024);
        let sink = Arc::new(CapturingAuditSink::new());
        let ctx = ExecutionContext::new()
            .with_audit_sink(entelix_core::AuditSinkHandle::new(sink.clone()));

        let input = vec![user("short"), assistant("ok")];
        let _ = wrapped.invoke(input, &ctx).await.unwrap();

        assert!(
            sink.compactions.lock().is_empty(),
            "no audit event expected when threshold is not crossed"
        );
    }

    /// Empty input is below any positive threshold and passes through.
    #[tokio::test]
    async fn empty_messages_pass_through() {
        let compactor: Arc<dyn Compactor> = Arc::new(HeadDropCompactor);
        let model = EchoModel::new();
        let wrapped = model.with_compaction(compactor, 1024);
        let _ = wrapped
            .invoke(Vec::new(), &ExecutionContext::new())
            .await
            .unwrap();
        assert_eq!(wrapped.inner().last_input_len.load(Ordering::SeqCst), 0);
    }

    /// Summariser double: records the prompt it was given and returns a
    /// fixed reply, letting tests assert on both sides of the exchange.
    struct StubSummariser {
        captured_prompt: Mutex<Vec<Message>>,
        reply: String,
    }

    impl StubSummariser {
        fn new(reply: impl Into<String>) -> Self {
            Self {
                captured_prompt: Mutex::new(Vec::new()),
                reply: reply.into(),
            }
        }
    }

    #[async_trait]
    impl Runnable<Vec<Message>, Message> for StubSummariser {
        async fn invoke(&self, input: Vec<Message>, _ctx: &ExecutionContext) -> Result<Message> {
            *self.captured_prompt.lock() = input;
            Ok(Message::new(
                Role::Assistant,
                vec![ContentPart::text(self.reply.clone())],
            ))
        }
    }

    // Event builders mirroring user()/assistant() above but at the
    // GraphEvent level consumed by Compactor::compact.
    fn user_event(text: &str) -> entelix_session::GraphEvent {
        entelix_session::GraphEvent::UserMessage {
            content: vec![ContentPart::text(text)],
            timestamp: chrono::Utc::now(),
        }
    }

    fn assistant_event(text: &str) -> entelix_session::GraphEvent {
        entelix_session::GraphEvent::AssistantMessage {
            content: vec![ContentPart::text(text)],
            usage: None,
            timestamp: chrono::Utc::now(),
        }
    }

    /// With fewer turns than `keep_recent_turns`, the history is returned
    /// untouched and the model is never called.
    #[tokio::test]
    async fn summary_compactor_skips_when_under_keep_recent_threshold() {
        let summariser = Arc::new(StubSummariser::new("never invoked"));
        let compactor = SummaryCompactor::new(summariser.clone()).with_keep_recent_turns(8);
        let events = vec![
            user_event("u1"),
            assistant_event("a1"),
            user_event("u2"),
            assistant_event("a2"),
        ];
        let history = compactor
            .compact(&events, 0, &ExecutionContext::new())
            .await
            .unwrap();
        assert_eq!(history.len(), 4);
        assert!(
            summariser.captured_prompt.lock().is_empty(),
            "summariser must NOT be invoked when total <= keep_recent_turns"
        );
    }

    /// Six turns with keep=2 ⇒ summary turn + two recent turns, and the
    /// summariser receives system prompt + the four older messages.
    #[tokio::test]
    async fn summary_compactor_replaces_older_turns_with_summary() {
        let summariser = Arc::new(StubSummariser::new("brief recap"));
        let compactor = SummaryCompactor::new(summariser.clone()).with_keep_recent_turns(2);
        let events = vec![
            user_event("oldest user"),
            assistant_event("oldest assistant"),
            user_event("middle user"),
            assistant_event("middle assistant"),
            user_event("newest user"),
            assistant_event("newest assistant"),
        ];
        let history = compactor
            .compact(&events, 0, &ExecutionContext::new())
            .await
            .unwrap();
        assert_eq!(history.len(), 3);
        // Head turn must be the synthetic summary.
        if let Turn::User { content } = &history.turns()[0] {
            if let ContentPart::Text { text, .. } = &content[0] {
                assert!(text.contains("Summary"), "summary marker missing: {text}");
                assert!(
                    text.contains("brief recap"),
                    "summariser reply missing: {text}"
                );
            }
        } else {
            panic!("expected User turn at head");
        }
        // Copy out what we need, then drop the lock guard before asserting.
        let captured_len;
        let captured_role;
        {
            let captured = summariser.captured_prompt.lock();
            captured_len = captured.len();
            captured_role = captured[0].role;
        }
        assert!(
            captured_len >= 5,
            "expected system + ≥4 older messages, got {captured_len}"
        );
        assert!(matches!(captured_role, Role::System));
    }

    /// An operator-supplied system prompt must replace the default one in
    /// the prompt sent to the summarising model.
    #[tokio::test]
    async fn summary_compactor_with_system_prompt_overrides_default() {
        let summariser = Arc::new(StubSummariser::new("ok"));
        let compactor = SummaryCompactor::new(summariser.clone())
            .with_keep_recent_turns(0)
            .with_system_prompt("CUSTOM PROMPT MARKER");
        let events = vec![user_event("hi"), assistant_event("hello")];
        let _ = compactor
            .compact(&events, 0, &ExecutionContext::new())
            .await
            .unwrap();
        let prompt_text = {
            let captured = summariser.captured_prompt.lock();
            if let ContentPart::Text { text, .. } = &captured[0].content[0] {
                text.clone()
            } else {
                panic!("expected Text part at system position");
            }
        };
        assert!(
            prompt_text.contains("CUSTOM PROMPT MARKER"),
            "operator-supplied prompt must reach the summariser, got: {prompt_text}"
        );
    }
}