use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
use lash_core::PreparedContext;
use lash_core::plugin::{
CompactionContext, ContextCompaction, ContextCompactor, ContextError, PluginError,
PluginFactory, PluginOptions, PluginRegistrar, PluginSessionContext, SessionContextOverlay,
SessionCreateRequest, SessionPlugin, SessionStartPoint, TurnContextTransform,
TurnTransformContext,
};
use lash_core::{
InputItem, Message, MessageOrigin, MessageRole, Part, PartKind, PromptUsage, SessionSnapshot,
TurnInput,
};
const PRUNE_RECENT_USER_TURNS: usize = 2;
const COMPACTION_BUFFER_TOKENS: usize = 20_000;
const COMPACTION_KEEP_RECENT_TOKENS: usize = 20_000;
const PRUNE_CONTEXT_THRESHOLD: f64 = 0.6;
pub(crate) const ROLLING_HISTORY_PLUGIN_ID: &str = "rolling_history";
const COMPACTION_SUMMARY_TITLE: &str = "Compaction summary:";
const COMPACTION_PROMPT: &str = "Provide a detailed summary of the conversation above so a later session can continue the work without the full history.\n\nUse this template:\n---\n## Goal\n[What is the user trying to accomplish?]\n\n## Instructions\n- [Relevant instructions or constraints]\n\n## Discoveries\n[Important findings, failures, or decisions]\n\n## Accomplished\n[What is done, what is in progress, what remains]\n\n## Relevant files / directories\n[List important files or directories]\n---";
const PRUNED_IMAGE_PLACEHOLDER: &str = "[Image omitted from older context]";
const COMPACTED_IMAGE_PLACEHOLDER: &str = "[Image omitted during compaction]";
#[derive(Clone, Debug, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct RollingHistoryConfig;
fn compaction_update_prompt(previous_summary: &str) -> String {
format!(
"A previous compaction summary exists (shown below). Update it with information from the conversation above.\n\n\
Rules:\n\
- PRESERVE all existing information from the previous summary\n\
- ADD new progress, decisions, and context from the new messages\n\
- Move items from in-progress to done where applicable\n\
- PRESERVE exact file paths, function names, and error messages\n\n\
Previous summary:\n{previous_summary}\n\n\
Use this template:\n---\n\
## Goal\n[What is the user trying to accomplish?]\n\n\
## Instructions\n- [Relevant instructions or constraints]\n\n\
## Discoveries\n[Important findings, failures, or decisions]\n\n\
## Accomplished\n[What is done, what is in progress, what remains]\n\n\
## Relevant files / directories\n[List important files or directories]\n---"
)
}
fn with_instructions(base: &str, instructions: Option<&str>) -> String {
match instructions {
Some(text) if !text.trim().is_empty() => {
format!("{base}\n\nAdditional focus:\n{}\n", text.trim())
}
_ => base.to_string(),
}
}
fn leading_system_prefix_len(msgs: &[Message]) -> usize {
msgs.iter()
.take_while(|msg| msg.role == MessageRole::System)
.count()
}
fn approx_token_count(text: &str) -> usize {
text.len().div_ceil(4)
}
fn strip_image_attachment(part: &mut Part, placeholder: &str) -> bool {
if !matches!(part.kind, PartKind::Image) || part.attachment.is_none() {
return false;
}
part.attachment = None;
part.content = placeholder.to_string();
true
}
fn prune_old_images(messages: &mut [Message]) -> bool {
let mut changed = false;
let mut recent_user_turns = 0usize;
'scan: for msg_idx in (0..messages.len()).rev() {
if is_compaction_summary_message(&messages[msg_idx]) {
break 'scan;
}
if messages[msg_idx].role == MessageRole::User {
recent_user_turns += 1;
}
if recent_user_turns < PRUNE_RECENT_USER_TURNS {
continue;
}
for part in std::sync::Arc::make_mut(&mut messages[msg_idx].parts).iter_mut() {
changed |= strip_image_attachment(part, PRUNED_IMAGE_PLACEHOLDER);
}
}
changed
}
fn strip_all_image_attachments(messages: &mut [Message], placeholder: &str) -> bool {
let mut changed = false;
for message in messages {
for part in std::sync::Arc::make_mut(&mut message.parts).iter_mut() {
changed |= strip_image_attachment(part, placeholder);
}
}
changed
}
fn is_compaction_summary_message(message: &Message) -> bool {
matches!(
message.origin,
Some(MessageOrigin::Plugin { ref plugin_id, .. }) if plugin_id == ROLLING_HISTORY_PLUGIN_ID
)
}
fn latest_user_index(messages: &[Message]) -> Option<usize> {
messages
.iter()
.rposition(|message| matches!(message.role, MessageRole::User))
}
fn find_compaction_cut_point(messages: &[Message], prefix_len: usize) -> usize {
let start = messages[prefix_len..]
.iter()
.rposition(is_compaction_summary_message)
.map(|i| prefix_len + i + 1)
.unwrap_or(prefix_len);
let mut accumulated = 0usize;
for idx in (start..messages.len()).rev() {
for part in messages[idx].parts.iter() {
accumulated += approx_token_count(&part.content);
if part.attachment.is_some() {
accumulated += 1200; }
}
if accumulated >= COMPACTION_KEEP_RECENT_TOKENS && messages[idx].role == MessageRole::User {
return idx;
}
}
latest_user_index(messages).unwrap_or(messages.len())
}
fn pruning_needed(prompt_usage: Option<&PromptUsage>, max_context_tokens: Option<usize>) -> bool {
let Some(usage) = prompt_usage else {
return false;
};
let Some(max_context) = max_context_tokens else {
return false;
};
if max_context == 0 {
return false;
}
(usage.context_budget_tokens as f64 / max_context as f64) >= PRUNE_CONTEXT_THRESHOLD
}
fn extract_previous_summary(messages: &[Message]) -> Option<String> {
messages.iter().rev().find_map(|m| {
if !is_compaction_summary_message(m) {
return None;
}
m.parts.first().map(|p| {
p.content
.strip_prefix(COMPACTION_SUMMARY_TITLE)
.unwrap_or(&p.content)
.trim()
.to_string()
})
})
}
fn compaction_needed(
prompt_usage: Option<&PromptUsage>,
max_context_tokens: Option<usize>,
) -> bool {
let Some(usage) = prompt_usage else {
return false;
};
let Some(max_context) = max_context_tokens else {
return false;
};
let usable = max_context.saturating_sub(COMPACTION_BUFFER_TOKENS.min(max_context));
usage.context_budget_tokens >= usable
}
fn compaction_turn_id(parent_turn_id: &str) -> String {
format!("{parent_turn_id}:rolling-history-compaction")
}
fn prompt_tail_window(messages: &[Message], cut_point: usize) -> Vec<Message> {
let prefix_len = leading_system_prefix_len(messages);
let latest_summary_index = messages[prefix_len..]
.iter()
.rposition(is_compaction_summary_message)
.map(|index| prefix_len + index);
let mut out = Vec::new();
out.extend_from_slice(&messages[..prefix_len]);
if let Some(summary_index) = latest_summary_index
&& summary_index < cut_point
{
out.push(messages[summary_index].clone());
}
out.extend_from_slice(&messages[cut_point..]);
out
}
async fn summarize_compaction_prefix(
session_id: &str,
state: &SessionSnapshot,
prefix_messages: Vec<Message>,
instructions: Option<&str>,
session_lifecycle: Arc<dyn lash_core::plugin::runtime_host::SessionLifecycleService>,
scoped_effect_controller: lash_core::ScopedEffectController<'_>,
) -> Result<Option<String>, ContextError> {
if prefix_messages.is_empty() {
return Ok(None);
}
let mut snapshot = lash_core::runtime::RuntimeSessionState::from_snapshot(state.clone());
snapshot.policy.max_turns = Some(1);
let mut messages = prefix_messages;
strip_all_image_attachments(&mut messages, COMPACTED_IMAGE_PLACEHOLDER);
snapshot.execution_state_snapshot = None;
snapshot.last_prompt_usage = None;
let previous_summary = extract_previous_summary(&messages);
snapshot.replace_active_read_state(&messages);
let compaction_session_id = format!("{session_id}-compaction");
let mut policy = snapshot.policy.clone();
policy.max_turns = Some(1);
let request = SessionCreateRequest::child(
session_id,
SessionStartPoint::Snapshot {
snapshot: Box::new(snapshot.to_snapshot()),
},
policy,
PluginOptions::default(),
"compaction",
)
.with_context_overlay(SessionContextOverlay {
include_base_tools: false,
tool_providers: Vec::new(),
prompt_contributions: Vec::new(),
})
.with_session_id(compaction_session_id);
let handle = session_lifecycle
.create_session(request)
.await
.map_err(ContextError::from)?;
let base_prompt = match previous_summary {
Some(prev) => compaction_update_prompt(&prev),
None => COMPACTION_PROMPT.to_string(),
};
let prompt_text = with_instructions(&base_prompt, instructions);
let turn_id = compaction_turn_id(scoped_effect_controller.scope_id());
let compaction_effect_controller = lash_core::ScopedEffectController::borrowed(
scoped_effect_controller.controller(),
lash_core::ExecutionScope::turn(&handle.session_id, &turn_id),
)
.map_err(|err| ContextError::Session(err.to_string()))?;
let request = lash_core::SessionTurnRequest::new(
&handle.session_id,
&turn_id,
TurnInput {
items: vec![InputItem::Text { text: prompt_text }],
image_blobs: HashMap::new(),
protocol_turn_options: None,
trace_turn_id: None,
protocol_extension: None,
turn_context: lash_core::TurnContext::default(),
},
compaction_effect_controller,
)
.map_err(|err| ContextError::Session(err.to_string()))?;
let turn = session_lifecycle.start_turn(request).await;
let _ = session_lifecycle.close_session(&handle.session_id).await;
let turn = turn.map_err(ContextError::from)?;
let summary = turn.assistant_output.safe_text.trim().to_string();
if summary.is_empty() {
return Ok(None);
}
Ok(Some(summary))
}
fn compaction_summary_seed(summary: &str) -> lash_core::SessionAppendNode {
lash_core::SessionAppendNode::message(
lash_core::PluginMessage::text(
MessageRole::Assistant,
format!("{COMPACTION_SUMMARY_TITLE}\n{summary}"),
)
.with_origin(MessageOrigin::Plugin {
plugin_id: ROLLING_HISTORY_PLUGIN_ID.to_string(),
transient: false,
}),
)
}
async fn compact_messages_core(
session_id: &str,
state: &SessionSnapshot,
messages: &[Message],
instructions: Option<&str>,
session_lifecycle: Arc<dyn lash_core::plugin::runtime_host::SessionLifecycleService>,
scoped_effect_controller: lash_core::ScopedEffectController<'_>,
) -> Result<Option<ContextCompaction>, ContextError> {
let prefix_len = leading_system_prefix_len(messages);
let cut_point = find_compaction_cut_point(messages, prefix_len);
if cut_point <= prefix_len {
return Ok(None);
}
let prefix_messages = messages[prefix_len..].to_vec();
let Some(summary) = summarize_compaction_prefix(
session_id,
state,
prefix_messages,
instructions,
session_lifecycle,
scoped_effect_controller,
)
.await?
else {
return Ok(None);
};
Ok(Some(ContextCompaction::new(vec![compaction_summary_seed(
&summary,
)])))
}
pub struct RollingHistoryPluginFactory {
config: RollingHistoryConfig,
}
impl RollingHistoryPluginFactory {
pub fn new(config: RollingHistoryConfig) -> Self {
Self { config }
}
}
impl Default for RollingHistoryPluginFactory {
fn default() -> Self {
Self::new(RollingHistoryConfig)
}
}
impl PluginFactory for RollingHistoryPluginFactory {
fn id(&self) -> &'static str {
ROLLING_HISTORY_PLUGIN_ID
}
fn build(&self, _ctx: &PluginSessionContext) -> Result<Arc<dyn SessionPlugin>, PluginError> {
Ok(Arc::new(RollingHistoryPlugin {
config: self.config.clone(),
}))
}
}
struct RollingHistoryPlugin {
config: RollingHistoryConfig,
}
impl SessionPlugin for RollingHistoryPlugin {
fn id(&self) -> &'static str {
ROLLING_HISTORY_PLUGIN_ID
}
fn register(&self, reg: &mut PluginRegistrar) -> Result<(), PluginError> {
let config = self.config.clone();
reg.context()
.prepare_turn(100, Arc::new(RollingTurnTransform::new(config.clone())));
reg.context()
.compact(100, Arc::new(RollingContextCompactor::new(config)));
Ok(())
}
}
struct RollingTurnTransform;
impl RollingTurnTransform {
fn new(_config: RollingHistoryConfig) -> Self {
Self
}
}
#[async_trait]
impl TurnContextTransform for RollingTurnTransform {
fn id(&self) -> &'static str {
"rolling_history.prepare_turn"
}
async fn transform(
&self,
ctx: &TurnTransformContext<'_>,
mut input: PreparedContext,
) -> Result<PreparedContext, ContextError> {
let prompt_usage = ctx.prompt_usage.as_ref();
let max_context_tokens = ctx.max_context_tokens;
let needs_pruning = pruning_needed(prompt_usage, max_context_tokens);
let needs_compaction = compaction_needed(prompt_usage, max_context_tokens);
if !needs_pruning && !needs_compaction {
return Ok(input);
}
let messages = input.messages.make_mut();
if needs_pruning {
prune_old_images(messages);
}
if !needs_compaction {
return Ok(input);
}
let messages = input.messages.make_mut();
let prefix_len = leading_system_prefix_len(messages);
let cut_point = find_compaction_cut_point(messages, prefix_len);
if cut_point <= prefix_len {
return Ok(input);
}
let projected = prompt_tail_window(messages, cut_point);
input.messages.replace(projected);
Ok(input)
}
}
struct RollingContextCompactor;
impl RollingContextCompactor {
fn new(_config: RollingHistoryConfig) -> Self {
Self
}
}
#[async_trait]
impl ContextCompactor for RollingContextCompactor {
fn id(&self) -> &'static str {
"rolling_history.compact"
}
async fn compact(
&self,
ctx: &CompactionContext<'_>,
) -> Result<Option<ContextCompaction>, ContextError> {
let session_id = ctx.session_id.clone();
let session_lifecycle = Arc::clone(&ctx.session_lifecycle);
let scoped_effect_controller = ctx.scoped_effect_controller.clone();
compact_messages_core(
&session_id,
&ctx.state.to_snapshot(),
ctx.state.messages(),
ctx.instructions.as_deref(),
session_lifecycle,
scoped_effect_controller,
)
.await
}
}
#[cfg(test)]
mod tests {
use super::*;
use lash_core::{SessionGraph, SessionPolicy};
use serde_json::json;
fn text_message(id: &str, role: MessageRole, content: &str) -> Message {
Message {
id: id.to_string(),
role,
parts: vec![Part {
id: format!("{id}.p0"),
kind: PartKind::Text,
content: content.to_string(),
attachment: None,
tool_call_id: None,
tool_name: None,
tool_replay: None,
prune_state: lash_core::PruneState::Intact,
reasoning_meta: None,
response_meta: None,
}]
.into(),
origin: None,
}
}
fn image_message(id: &str, role: MessageRole, bytes: &[u8]) -> Message {
Message {
id: id.to_string(),
role,
parts: vec![Part {
id: format!("{id}.p0"),
kind: PartKind::Image,
content: String::new(),
attachment: Some(lash_core::session_model::message::PartAttachment {
reference: lash_core::AttachmentRef {
id: lash_core::AttachmentId::new(format!("{id}-att")),
media_type: lash_core::MediaType::Image(lash_core::ImageMediaType::Png),
byte_len: bytes.len() as u64,
width: None,
height: None,
label: None,
},
}),
tool_call_id: None,
tool_name: None,
tool_replay: None,
prune_state: lash_core::PruneState::Intact,
reasoning_meta: None,
response_meta: None,
}]
.into(),
origin: None,
}
}
use lash_core::testing::{MockSessionManager, mock_assembled_turn as empty_turn};
fn mock_manager() -> MockSessionManager {
MockSessionManager::default()
.with_tool_catalog(vec![
json!({"name":"exec_command"}),
json!({"name":"read_file"}),
])
.with_turn(empty_turn("root", "Compacted work summary"))
}
fn build_turn_ctx(
session_id: &str,
state: SessionSnapshot,
prompt_usage: Option<PromptUsage>,
max_context_tokens: Option<usize>,
manager: Arc<MockSessionManager>,
) -> TurnTransformContext<'static> {
TurnTransformContext {
session_id: session_id.to_string(),
state: state.read_view(),
prompt_usage,
max_context_tokens,
sessions: manager.clone(),
session_lifecycle: manager.clone(),
session_graph: manager,
scoped_effect_controller: lash_core::ScopedEffectController::shared(
Arc::new(lash_core::InlineRuntimeEffectController),
lash_core::ExecutionScope::turn(session_id, "rolling-history-test-turn"),
)
.expect("test scoped effect controller"),
direct_completions: lash_core::DirectCompletionClient::from_fn(|_, _| {
Err(lash_core::PluginError::Session(
"direct completions are unavailable in rolling history tests".to_string(),
))
}),
}
}
fn build_compaction_ctx(
session_id: &str,
state: SessionSnapshot,
instructions: Option<String>,
manager: Arc<MockSessionManager>,
) -> CompactionContext<'static> {
CompactionContext {
session_id: session_id.to_string(),
instructions,
state: state.read_view(),
sessions: manager.clone(),
session_lifecycle: manager.clone(),
session_graph: manager,
scoped_effect_controller: lash_core::ScopedEffectController::shared(
Arc::new(lash_core::InlineRuntimeEffectController),
lash_core::ExecutionScope::runtime_operation("rolling-history-compact-test"),
)
.expect("test scoped effect controller"),
}
}
#[tokio::test]
async fn rolling_turn_transform_strips_old_image_attachments() {
let messages = vec![
image_message("u0", MessageRole::User, &[1, 2, 3]),
text_message("u1", MessageRole::User, "recent"),
text_message("u2", MessageRole::User, "latest"),
];
let state = SessionSnapshot::default();
let manager = Arc::new(mock_manager());
let transform = RollingTurnTransform::new(RollingHistoryConfig);
let ctx = build_turn_ctx(
"root",
state,
Some(PromptUsage {
prompt_context_tokens: 130_000,
input_tokens: 130_000,
cached_input_tokens: 0,
context_budget_tokens: 130_000,
}),
Some(200_000),
manager,
);
let prepared = PreparedContext {
messages: messages.into(),
..Default::default()
};
let built = transform
.transform(&ctx, prepared)
.await
.expect("transform")
.messages;
let image_part = built[0].parts.first().expect("image part");
assert!(matches!(image_part.kind, PartKind::Image));
assert!(image_part.attachment.is_none());
assert_eq!(image_part.content, PRUNED_IMAGE_PLACEHOLDER);
}
#[tokio::test]
async fn rolling_turn_transform_projects_tail_without_summary() {
let manager = Arc::new(mock_manager());
let transform = RollingTurnTransform::new(RollingHistoryConfig);
let state = SessionSnapshot {
session_id: "root".to_string(),
policy: SessionPolicy::default(),
..Default::default()
};
let ctx = build_turn_ctx(
"root",
state,
Some(PromptUsage {
prompt_context_tokens: 90_000,
input_tokens: 90_000,
cached_input_tokens: 0,
context_budget_tokens: 90_000,
}),
Some(100_000),
manager.clone(),
);
let prepared = PreparedContext {
messages: vec![
text_message("u1", MessageRole::User, "old work"),
text_message("a1", MessageRole::Assistant, "assistant old"),
text_message("u2", MessageRole::User, "latest request"),
]
.into(),
..Default::default()
};
let built = transform
.transform(&ctx, prepared)
.await
.expect("transform")
.messages;
assert!(built.iter().any(|message| {
message
.parts
.iter()
.any(|part| part.content.contains("latest request"))
}));
assert!(!built.iter().any(|message| {
message
.parts
.iter()
.any(|part| part.content.contains("old work"))
}));
let created = manager.created_snapshot();
assert!(created.is_empty());
let turns = manager.turns.lock().expect("turns lock").clone();
assert!(turns.is_empty());
}
#[tokio::test]
async fn rolling_compactor_returns_summary_seed_for_new_frame() {
let manager = Arc::new(mock_manager());
let messages = vec![
text_message("u1", MessageRole::User, "old work"),
text_message("a1", MessageRole::Assistant, "assistant old"),
text_message("u2", MessageRole::User, "latest request"),
];
let state = SessionSnapshot {
session_id: "root".to_string(),
policy: SessionPolicy::default(),
session_graph: SessionGraph::from_active_read_state(&messages),
..Default::default()
};
let ctx = build_compaction_ctx(
"root",
state,
Some("focus on latest request".to_string()),
manager.clone(),
);
let compactor = RollingContextCompactor::new(RollingHistoryConfig);
let compaction = compactor
.compact(&ctx)
.await
.expect("compact")
.expect("compaction");
assert_eq!(compaction.initial_nodes.len(), 1);
let lash_core::SessionAppendNode::Message { message, .. } = &compaction.initial_nodes[0]
else {
panic!("expected summary message seed");
};
assert_eq!(message.role, MessageRole::Assistant);
assert!(
message
.first_text()
.expect("summary text")
.contains("Compacted work summary")
);
assert!(matches!(
message.origin.as_ref(),
Some(MessageOrigin::Plugin { plugin_id, .. }) if plugin_id == ROLLING_HISTORY_PLUGIN_ID
));
let created = manager.created_snapshot();
assert_eq!(created.len(), 1);
let turns = manager.turns.lock().expect("turns lock").clone();
assert_eq!(turns.len(), 1);
assert_eq!(
turns[0].1,
"rolling-history-compact-test:rolling-history-compaction"
);
assert_eq!(
turns[0].2.as_deref(),
Some("rolling-history-compact-test:rolling-history-compaction")
);
assert_eq!(
turns[0].3,
lash_core::ExecutionScope::turn(
"root-compaction",
"rolling-history-compact-test:rolling-history-compaction"
)
);
}
}