use super::storage::{MessageRange, PlanTask};
use crate::session::chat::session::ChatSession;
use crate::session::context::SessionId;
use crate::session::estimate_tokens;
use anyhow::{anyhow, Result};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::{Arc, Mutex, RwLock};
static PENDING_COMPRESSIONS: RwLock<Option<HashMap<SessionId, PendingTaskCompression>>> =
RwLock::new(None);
static PENDING_PHASE_COMPRESSIONS: RwLock<Option<HashMap<SessionId, PhaseCompressionRequest>>> =
RwLock::new(None);
static PENDING_PROJECT_COMPRESSIONS: RwLock<Option<HashMap<SessionId, ProjectCompressionRequest>>> =
RwLock::new(None);
lazy_static::lazy_static! {
static ref CLI_PENDING_COMPRESSION: Arc<Mutex<Option<PendingTaskCompression>>> = Arc::new(Mutex::new(None));
static ref CLI_PENDING_PHASE_COMPRESSION: Arc<Mutex<Option<PhaseCompressionRequest>>> = Arc::new(Mutex::new(None));
static ref CLI_PENDING_PROJECT_COMPRESSION: Arc<Mutex<Option<ProjectCompressionRequest>>> = Arc::new(Mutex::new(None));
}
fn effective_session_id() -> Option<SessionId> {
crate::session::context::current_session_id()
}
pub fn cleanup_compression_state(session_id: &SessionId) {
if let Ok(mut guard) = PENDING_COMPRESSIONS.write() {
if let Some(registry) = guard.as_mut() {
registry.remove(session_id);
}
}
if let Ok(mut guard) = PENDING_PHASE_COMPRESSIONS.write() {
if let Some(registry) = guard.as_mut() {
registry.remove(session_id);
}
}
if let Ok(mut guard) = PENDING_PROJECT_COMPRESSIONS.write() {
if let Some(registry) = guard.as_mut() {
registry.remove(session_id);
}
}
}
#[derive(Debug, Clone)]
struct PendingTaskCompression {
task: PlanTask,
force: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PhaseCompression {
pub phase_name: String,
pub task_range: (usize, usize),
pub summary: String,
pub compressed_at: chrono::DateTime<chrono::Utc>,
pub message_range: MessageRange,
pub metrics: CompressionMetrics,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProjectCompression {
pub summary: String,
pub compressed_at: chrono::DateTime<chrono::Utc>,
pub message_range: MessageRange,
pub metrics: CompressionMetrics,
pub total_tasks: usize,
pub total_phases: usize,
}
#[derive(Debug, Clone)]
pub struct PhaseCompressionRequest {
pub phase_name: String,
pub task_range: (usize, usize),
pub summary: String,
pub message_range: Option<MessageRange>,
}
#[derive(Debug, Clone)]
pub struct ProjectCompressionRequest {
pub plan_title: String,
pub summary: String,
pub total_tasks: usize,
pub total_phases: usize,
pub message_range: Option<MessageRange>,
}
pub fn request_compression(task: PlanTask) {
crate::log_debug!("Compression requested for task: {}", task.title);
let ptc = PendingTaskCompression { task, force: false };
if let Some(session_id) = effective_session_id() {
let mut guard = PENDING_COMPRESSIONS.write().unwrap();
let registry = guard.get_or_insert_with(HashMap::new);
registry.insert(session_id, ptc);
} else {
let mut pending = CLI_PENDING_COMPRESSION.lock().unwrap();
*pending = Some(ptc);
}
}
pub fn request_forced_compression(task: PlanTask) {
crate::log_debug!("Forced compression requested for task: {}", task.title);
let ptc = PendingTaskCompression { task, force: true };
if let Some(session_id) = effective_session_id() {
let mut guard = PENDING_COMPRESSIONS.write().unwrap();
let registry = guard.get_or_insert_with(HashMap::new);
registry.insert(session_id, ptc);
} else {
let mut pending = CLI_PENDING_COMPRESSION.lock().unwrap();
*pending = Some(ptc);
}
}
pub fn set_pending_compression_range(start_index: usize, end_index: usize) -> Result<()> {
if start_index >= end_index {
crate::log_debug!(
"Compression range is empty (start={}, end={}) — nothing to compress, skipping",
start_index,
end_index
);
if let Some(session_id) = effective_session_id() {
let mut guard = PENDING_COMPRESSIONS.write().unwrap();
if let Some(registry) = guard.as_mut() {
registry.remove(&session_id);
}
} else {
let mut pending = CLI_PENDING_COMPRESSION.lock().unwrap();
*pending = None;
}
return Ok(());
}
let range = MessageRange {
start_index,
end_index,
};
if let Some(session_id) = effective_session_id() {
let mut guard = PENDING_COMPRESSIONS.write().unwrap();
if let Some(registry) = guard.as_mut() {
if let Some(ptc) = registry.get_mut(&session_id) {
ptc.task.message_range = Some(range);
crate::log_debug!(
"Compression range set: {} to {} for task '{}'",
start_index,
end_index,
ptc.task.title
);
return Ok(());
}
}
Err(anyhow!("No pending compression to set range for"))
} else {
let mut pending = CLI_PENDING_COMPRESSION.lock().unwrap();
if let Some(ref mut ptc) = *pending {
ptc.task.message_range = Some(range);
crate::log_debug!(
"Compression range set: {} to {} for task '{}'",
start_index,
end_index,
ptc.task.title
);
Ok(())
} else {
Err(anyhow!("No pending compression to set range for"))
}
}
}
pub async fn process_pending_compression(
session: &mut ChatSession,
) -> Result<Option<CompressionMetrics>> {
let ptc = if let Some(session_id) = effective_session_id() {
let mut guard = PENDING_COMPRESSIONS.write().unwrap();
guard
.as_mut()
.and_then(|registry| registry.remove(&session_id))
} else {
let mut pending = CLI_PENDING_COMPRESSION.lock().unwrap();
pending.take()
};
if let Some(ptc) = ptc {
crate::log_debug!(
"Processing pending compression for task: {} (force: {})",
ptc.task.title,
ptc.force
);
let phase = format!("Compressing task ({})…", ptc.task.title);
crate::session::chat::animation_manager::get_animation_manager()
.set_phase(&phase)
.await;
let result = compress_completed_task(session, &ptc.task, ptc.force).await;
crate::session::chat::animation_manager::get_animation_manager().clear_phase();
result
} else {
Ok(None)
}
}
pub fn has_pending_compression() -> bool {
if let Some(session_id) = effective_session_id() {
let guard = PENDING_COMPRESSIONS.read().unwrap();
guard
.as_ref()
.map(|r| r.contains_key(&session_id))
.unwrap_or(false)
} else {
CLI_PENDING_COMPRESSION.lock().unwrap().is_some()
}
}
pub fn request_phase_compression(phase_name: String, task_range: (usize, usize), summary: String) {
crate::log_debug!(
"Phase compression requested: {} (tasks {}-{})",
phase_name,
task_range.0 + 1,
task_range.1 + 1
);
let req = PhaseCompressionRequest {
phase_name,
task_range,
summary,
message_range: None,
};
if let Some(session_id) = effective_session_id() {
let mut guard = PENDING_PHASE_COMPRESSIONS.write().unwrap();
let registry = guard.get_or_insert_with(HashMap::new);
registry.insert(session_id, req);
} else {
let mut pending = CLI_PENDING_PHASE_COMPRESSION.lock().unwrap();
*pending = Some(req);
}
}
pub fn request_project_compression(
plan_title: String,
summary: String,
total_tasks: usize,
total_phases: usize,
) {
crate::log_debug!(
"Project compression requested: {} ({} tasks, {} phases)",
plan_title,
total_tasks,
total_phases
);
let req = ProjectCompressionRequest {
plan_title,
summary,
total_tasks,
total_phases,
message_range: None,
};
if let Some(session_id) = effective_session_id() {
let mut guard = PENDING_PROJECT_COMPRESSIONS.write().unwrap();
let registry = guard.get_or_insert_with(HashMap::new);
registry.insert(session_id, req);
} else {
let mut pending = CLI_PENDING_PROJECT_COMPRESSION.lock().unwrap();
*pending = Some(req);
}
}
pub fn has_pending_phase_compression() -> bool {
if let Some(session_id) = effective_session_id() {
let guard = PENDING_PHASE_COMPRESSIONS.read().unwrap();
guard
.as_ref()
.map(|r| r.contains_key(&session_id))
.unwrap_or(false)
} else {
CLI_PENDING_PHASE_COMPRESSION.lock().unwrap().is_some()
}
}
pub fn has_pending_project_compression() -> bool {
if let Some(session_id) = effective_session_id() {
let guard = PENDING_PROJECT_COMPRESSIONS.read().unwrap();
guard
.as_ref()
.map(|r| r.contains_key(&session_id))
.unwrap_or(false)
} else {
CLI_PENDING_PROJECT_COMPRESSION.lock().unwrap().is_some()
}
}
pub fn get_compression_id() -> Option<String> {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default();
Some(format!(
"comp_{}_{}",
now.as_millis(),
now.as_nanos() ))
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompressionMetrics {
pub messages_removed: usize,
pub tokens_saved: u64,
pub compression_ratio: f64, }
impl CompressionMetrics {
pub fn new(messages_removed: usize, tokens_saved: u64, original_tokens: u64) -> Self {
let compression_ratio = if original_tokens > 0 {
tokens_saved as f64 / original_tokens as f64
} else {
0.0
};
Self {
messages_removed,
tokens_saved,
compression_ratio,
}
}
}
pub async fn compress_completed_task(
session: &mut ChatSession,
task: &PlanTask,
force: bool,
) -> Result<Option<CompressionMetrics>> {
let summary = task
.summary
.as_ref()
.ok_or_else(|| anyhow!("Task has no summary - cannot compress"))?;
let message_range = task
.message_range
.as_ref()
.ok_or_else(|| anyhow!("Task has no message range - cannot compress"))?;
crate::log_debug!(
"Compressing task '{}' (messages {}-{})",
task.title,
message_range.start_index,
message_range.end_index
);
let tokens_before = calculate_range_tokens(session, message_range)?;
let compression_id = get_compression_id().unwrap_or_else(|| "unknown".to_string());
let plan_context = super::core::get_plan_context();
let file_refs = extract_file_refs_from_messages(
&session.session.messages[message_range.start_index..=message_range.end_index],
);
let compressed_entry = format_compressed_summary(
task,
summary,
&compression_id,
plan_context.as_ref(),
&file_refs,
);
let tokens_after = estimate_tokens(&compressed_entry) as u64;
if tokens_after >= tokens_before {
crate::log_info!(
"Task compression skipped: {} tokens before, {} tokens after (no savings).",
tokens_before,
tokens_after
);
return Ok(None);
}
if !force {
const MIN_CONTEXT_FRACTION: f64 = 0.20;
let total_session_tokens =
crate::session::estimate_session_tokens(&session.session.messages) as f64;
if total_session_tokens > 0.0 {
let range_fraction = tokens_before as f64 / total_session_tokens;
if range_fraction < MIN_CONTEXT_FRACTION {
crate::log_info!(
"Task compression skipped: range is {:.1}% of context ({} / {} tokens) — below 20% threshold.",
range_fraction * 100.0,
tokens_before,
total_session_tokens as u64
);
return Ok(None);
}
}
}
let mut adjusted_start = message_range.start_index;
if let Some(anchor) = session.session.messages.get(adjusted_start) {
if anchor.role == "assistant" && anchor.tool_calls.is_some() {
let mut next = adjusted_start + 1;
while next < session.session.messages.len()
&& session.session.messages[next].role == "tool"
{
next += 1;
}
if next > adjusted_start + 1 && next < session.session.messages.len() {
crate::log_debug!(
"Task compression: advancing start_index past {} tool results ({} -> {})",
next - adjusted_start - 1,
adjusted_start,
next
);
adjusted_start = next;
}
}
}
let (messages_removed, _) =
session.remove_messages_in_range(adjusted_start, message_range.end_index)?;
session.insert_compressed_knowledge(adjusted_start, compressed_entry)?;
let tokens_saved = tokens_before.saturating_sub(tokens_after);
let metrics = CompressionMetrics::new(messages_removed, tokens_saved, tokens_before);
crate::log_debug!(
"Compression complete: {} messages removed, {} tokens saved ({:.1}% reduction)",
metrics.messages_removed,
metrics.tokens_saved,
metrics.compression_ratio * 100.0
);
let _ = crate::session::logger::log_compression_point(
&session.session.info.name,
"task",
messages_removed,
tokens_saved,
&session.session.messages,
);
session.session.info.current_non_cached_tokens = 0;
session.session.info.current_total_tokens = 0;
session.session.info.last_cache_checkpoint_time = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
super::core::clear_task_start_index();
crate::log_debug!(
"Cleared start_index after successful compression - next task will set new start_index"
);
Ok(Some(metrics))
}
fn format_compressed_summary(
task: &PlanTask,
summary: &str,
compression_id: &str,
plan_context: Option<&(String, usize, usize, String)>,
file_refs: &[String],
) -> String {
let completed_at = task
.completed_at
.map(|dt| dt.format("%Y-%m-%d %H:%M:%S").to_string())
.unwrap_or_else(|| "Unknown".to_string());
let mut output = format!(
"## Task Completed: {} [COMPRESSED: {}]\n\n\
**Description**: {}\n\n\
**Summary**: {}\n\n\
**Completed**: {}\n\n",
task.title, compression_id, task.description, summary, completed_at
);
if let Some((plan_title, completed_count, total_tasks, current_task_title)) = plan_context {
output.push_str(&format!(
"🎯 **ACTIVE PLAN**: {}\n\
- Progress: {}/{} tasks completed\n\
- Current Task: {}\n\
- Status: IN PROGRESS (use plan commands to continue)\n\n",
plan_title, completed_count, total_tasks, current_task_title
));
}
if !file_refs.is_empty() {
output.push_str("\n**File references (can be re-read on demand):**\n");
for ref_str in file_refs.iter().take(10) {
output.push_str(&format!("- {}\n", ref_str));
}
output.push('\n');
}
output.push_str(&format!(
"**Compression Info**:\n\
- ID: `{}`\n\
- Type: Task-level compression\n\
- Retrievable: Use `/retrieve {}` to expand (future feature)\n\n\
---\n\
*Compressed - Detailed tool calls and intermediate work removed to optimize context.*",
compression_id, compression_id
));
output
}
fn calculate_range_tokens(session: &ChatSession, range: &MessageRange) -> Result<u64> {
let mut total_tokens = 0u64;
if range.start_index >= session.session.messages.len() {
return Err(anyhow::anyhow!("Invalid start_index in message range"));
}
if range.end_index >= session.session.messages.len() {
return Err(anyhow::anyhow!("Invalid end_index in message range"));
}
for i in (range.start_index + 1)..=range.end_index {
if let Some(message) = session.session.messages.get(i) {
let tokens = crate::session::estimate_message_tokens(message) as u64;
total_tokens += tokens;
}
}
Ok(total_tokens)
}
pub async fn process_pending_phase_compression(
session: &mut ChatSession,
) -> Result<Option<CompressionMetrics>> {
let request = if let Some(session_id) = effective_session_id() {
let mut guard = PENDING_PHASE_COMPRESSIONS.write().unwrap();
guard
.as_mut()
.and_then(|registry| registry.remove(&session_id))
} else {
let mut pending = CLI_PENDING_PHASE_COMPRESSION.lock().unwrap();
pending.take()
};
if let Some(req) = request {
crate::log_debug!("Processing pending phase compression: {}", req.phase_name);
let phase = format!("Compressing phase ({})…", req.phase_name);
crate::session::chat::animation_manager::get_animation_manager()
.set_phase(&phase)
.await;
let result = compress_phase(session, &req).await;
crate::session::chat::animation_manager::get_animation_manager().clear_phase();
result
} else {
Ok(None)
}
}
async fn compress_phase(
session: &mut ChatSession,
request: &PhaseCompressionRequest,
) -> Result<Option<CompressionMetrics>> {
let mut task_summaries = Vec::new();
let mut start_index = None;
let mut end_index = None;
for (i, msg) in session.session.messages.iter().enumerate() {
if let Some(name) = &msg.name {
if name == "plan_compression" && msg.content.contains("## Task Completed:") {
task_summaries.push((i, msg.content.clone()));
if start_index.is_none() {
start_index = Some(i);
}
end_index = Some(i);
}
}
}
if task_summaries.is_empty() {
return Err(anyhow!("No task compressions found for phase compression"));
}
let start_idx = start_index.unwrap();
let end_idx = end_index.unwrap();
let range_start = if start_idx > 0 { start_idx - 1 } else { 0 };
let tokens_before = calculate_range_tokens(
session,
&MessageRange {
start_index: range_start,
end_index: end_idx,
},
)?;
let phase_summary =
format_phase_summary(&request.phase_name, &request.summary, task_summaries.len());
let tokens_after = estimate_tokens(&phase_summary) as u64;
if tokens_after >= tokens_before {
crate::log_info!(
"Phase compression skipped: {} tokens before, {} tokens after (no savings).",
tokens_before,
tokens_after
);
return Ok(None);
}
let (messages_removed, _) = session.remove_messages_in_range(range_start, end_idx)?;
session.insert_compressed_knowledge(range_start, phase_summary)?;
let tokens_saved = tokens_before.saturating_sub(tokens_after);
let metrics = CompressionMetrics::new(messages_removed, tokens_saved, tokens_before);
crate::log_debug!(
"Phase compression '{}': {} task summaries → 1 phase summary, {} tokens saved",
request.phase_name,
task_summaries.len(),
metrics.tokens_saved
);
let _ = crate::session::logger::log_compression_point(
&session.session.info.name,
"phase",
messages_removed,
tokens_saved,
&session.session.messages,
);
session.session.info.current_non_cached_tokens = 0;
session.session.info.current_total_tokens = 0;
session.session.info.last_cache_checkpoint_time = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
super::core::clear_task_start_index();
crate::log_debug!(
"Cleared start_index after successful phase compression - next task will set new start_index"
);
Ok(Some(metrics))
}
fn format_phase_summary(phase_name: &str, summary: &str, task_count: usize) -> String {
let compression_id = get_compression_id().unwrap_or_else(|| "unknown".to_string());
format!(
"## Phase Completed: {} [COMPRESSED: {}]\n\n\
**Tasks Completed**: {}\n\n\
**Summary**: {}\n\n\
**Compression Info**:\n\
- ID: `{}`\n\
- Type: Phase-level compression\n\
- Retrievable: Use `/retrieve {}` to expand (future feature)\n\n\
---\n\
*Phase Compression - {} task summaries compressed into phase overview*",
phase_name, compression_id, task_count, summary, compression_id, compression_id, task_count
)
}
pub async fn process_pending_project_compression(
session: &mut ChatSession,
) -> Result<Option<CompressionMetrics>> {
let request = if let Some(session_id) = effective_session_id() {
let mut guard = PENDING_PROJECT_COMPRESSIONS.write().unwrap();
guard
.as_mut()
.and_then(|registry| registry.remove(&session_id))
} else {
let mut pending = CLI_PENDING_PROJECT_COMPRESSION.lock().unwrap();
pending.take()
};
if let Some(req) = request {
crate::log_debug!("Processing pending project compression: {}", req.plan_title);
let phase = format!("Compressing project ({})…", req.plan_title);
crate::session::chat::animation_manager::get_animation_manager()
.set_phase(&phase)
.await;
let result = compress_project(session, &req).await;
crate::session::chat::animation_manager::get_animation_manager().clear_phase();
result
} else {
Ok(None)
}
}
async fn compress_project(
session: &mut ChatSession,
request: &ProjectCompressionRequest,
) -> Result<Option<CompressionMetrics>> {
let mut compression_indices = Vec::new();
for (i, msg) in session.session.messages.iter().enumerate() {
if let Some(name) = &msg.name {
if name == "plan_compression" {
compression_indices.push(i);
}
}
}
if compression_indices.len() < 2 {
crate::log_info!(
"Project compression skipped: need at least 2 task/phase compressions to consolidate (found {})",
compression_indices.len()
);
return Ok(None);
}
let start_idx = *compression_indices.first().unwrap();
let end_idx = *compression_indices.last().unwrap();
let range_start = if start_idx > 0 { start_idx - 1 } else { 0 };
let tokens_before = calculate_range_tokens(
session,
&MessageRange {
start_index: range_start,
end_index: end_idx,
},
)?;
let project_summary = format_project_summary(
&request.plan_title,
&request.summary,
request.total_tasks,
request.total_phases,
compression_indices.len(),
);
let tokens_after = estimate_tokens(&project_summary) as u64;
if tokens_after >= tokens_before {
crate::log_info!(
"Project compression skipped: {} tokens before, {} tokens after (no savings).",
tokens_before,
tokens_after
);
return Ok(None);
}
let (messages_removed, _) = session.remove_messages_in_range(range_start, end_idx)?;
session.insert_compressed_knowledge(range_start, project_summary)?;
let tokens_saved = tokens_before.saturating_sub(tokens_after);
let metrics = CompressionMetrics::new(messages_removed, tokens_saved, tokens_before);
crate::log_debug!(
"Project compression complete: {} summaries → 1 project summary, {} tokens saved",
compression_indices.len(),
metrics.tokens_saved
);
let _ = crate::session::logger::log_compression_point(
&session.session.info.name,
"project",
messages_removed,
tokens_saved,
&session.session.messages,
);
session.session.info.current_non_cached_tokens = 0;
session.session.info.current_total_tokens = 0;
session.session.info.last_cache_checkpoint_time = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
super::core::clear_task_start_index();
crate::log_debug!("Cleared start_index after successful project compression - plan complete");
Ok(Some(metrics))
}
fn extract_file_refs_from_messages(messages: &[crate::session::Message]) -> Vec<String> {
let mut refs: Vec<String> = Vec::new();
for msg in messages {
if msg.role != "assistant" {
continue;
}
if let Some(calls) = msg.tool_calls.as_ref().and_then(|v| v.as_array()) {
for call in calls {
let name = call
.get("function")
.and_then(|f| f.get("name"))
.and_then(|n| n.as_str())
.unwrap_or("unknown");
if let Some(args) = call.get("function").and_then(|f| f.get("arguments")) {
crate::session::chat::file_context::extract_file_refs_from_args(
name, args, &mut refs,
);
}
}
}
}
crate::session::chat::file_context::merge_file_refs(&refs)
}
fn format_project_summary(
plan_title: &str,
summary: &str,
total_tasks: usize,
total_phases: usize,
summaries_compressed: usize,
) -> String {
let compression_id = get_compression_id().unwrap_or_else(|| "unknown".to_string());
format!(
"## Project Completed: {} [COMPRESSED: {}]\n\n\
**Scale**: {} tasks across {} phases\n\n\
**Summary**: {}\n\n\
**Compression Info**:\n\
- ID: `{}`\n\
- Type: Project-level compression\n\
- Retrievable: Use `/retrieve {}` to expand (future feature)\n\n\
---\n\
*Project Compression - {} summaries consolidated into final project overview*",
plan_title,
compression_id,
total_tasks,
total_phases,
summary,
compression_id,
compression_id,
summaries_compressed
)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_compression_metrics_calculation() {
let metrics = CompressionMetrics::new(10, 5000, 10000);
assert_eq!(metrics.messages_removed, 10);
assert_eq!(metrics.tokens_saved, 5000);
assert_eq!(metrics.compression_ratio, 0.5);
}
#[test]
fn test_compression_metrics_zero_original() {
let metrics = CompressionMetrics::new(5, 0, 0);
assert_eq!(metrics.compression_ratio, 0.0);
}
#[test]
fn test_format_compressed_summary() {
use chrono::Utc;
let task = PlanTask {
title: "Test Task".to_string(),
description: "Test description".to_string(),
details: "Some details".to_string(),
summary: Some("Task completed successfully".to_string()),
status: super::super::storage::TaskStatus::Completed,
completed_at: Some(Utc::now()),
message_range: None,
phase: None,
};
let formatted =
format_compressed_summary(&task, "Task completed successfully", "test_123", None, &[]);
assert!(formatted.contains("## Task Completed: Test Task"));
assert!(formatted.contains("**Description**: Test description"));
assert!(formatted.contains("**Summary**: Task completed successfully"));
assert!(formatted.contains("Compressed"));
assert!(formatted.contains("test_123"));
}
#[test]
fn task_compression_advances_past_tool_results_to_prevent_orphans() {
use crate::session::Message;
use serde_json::json;
fn msg(role: &str) -> Message {
Message {
role: role.to_string(),
content: format!("{} message", role),
timestamp: 1000,
cached: false,
cache_ttl: None,
tool_call_id: None,
name: None,
tool_calls: None,
images: None,
videos: None,
thinking: None,
id: None,
}
}
let mut messages = Vec::new();
messages.push(msg("system"));
let mut assistant = msg("assistant"); assistant.content = "I'll check those files.".to_string();
assistant.tool_calls = Some(json!([
{"id": "call_A", "type": "function", "function": {"name": "view_signatures", "arguments": "{}"}},
{"id": "call_B", "type": "function", "function": {"name": "view", "arguments": "{}"}}
]));
messages.push(assistant);
let mut tool_a = msg("tool"); tool_a.tool_call_id = Some("call_A".to_string());
tool_a.name = Some("view_signatures".to_string());
messages.push(tool_a);
let mut tool_b = msg("tool"); tool_b.tool_call_id = Some("call_B".to_string());
tool_b.name = Some("view".to_string());
messages.push(tool_b);
messages.push(msg("assistant")); messages.push(msg("user")); messages.push(msg("assistant"));
let start_index = 1usize;
let end_index = 6usize;
let drain_range = start_index + 1..=end_index;
let tool_ids_in_anchor: Vec<&str> = messages[start_index]
.tool_calls
.as_ref()
.unwrap()
.as_array()
.unwrap()
.iter()
.map(|tc| tc["id"].as_str().unwrap())
.collect();
let tool_results_in_drain: Vec<&str> = messages[drain_range.clone()]
.iter()
.filter(|m| m.role == "tool")
.filter_map(|m| m.tool_call_id.as_deref())
.collect();
assert!(
tool_results_in_drain.contains(&"call_A"),
"Bug scenario: call_A tool result IS in drain range (would be orphaned without fix)"
);
assert!(
tool_results_in_drain.contains(&"call_B"),
"Bug scenario: call_B tool result IS in drain range (would be orphaned without fix)"
);
let mut adjusted_start = start_index;
if let Some(anchor) = messages.get(adjusted_start) {
if anchor.role == "assistant" && anchor.tool_calls.is_some() {
let mut next = adjusted_start + 1;
while next < messages.len() && messages[next].role == "tool" {
next += 1;
}
if next > adjusted_start + 1 && next < messages.len() {
adjusted_start = next;
}
}
}
assert_eq!(
adjusted_start, 4,
"start_index must advance past tool results to avoid orphaning tool_calls"
);
let safe_drain_range = adjusted_start + 1..=end_index;
for msg in messages[safe_drain_range.clone()].iter() {
if msg.role == "tool" {
if let Some(ref tc_id) = msg.tool_call_id {
assert!(
!tool_ids_in_anchor.contains(&tc_id.as_str()),
"Drain range must not include tool results for anchor's tool_calls. Found {}",
tc_id
);
}
}
}
messages.drain(safe_drain_range);
let assistant_msg = &messages[1];
assert!(
assistant_msg.tool_calls.is_some(),
"Assistant message should still have tool_calls"
);
let tool_call_ids: Vec<String> = assistant_msg
.tool_calls
.as_ref()
.unwrap()
.as_array()
.unwrap()
.iter()
.map(|tc| tc["id"].as_str().unwrap().to_string())
.collect();
for tc_id in &tool_call_ids {
let has_result = messages
.iter()
.any(|m| m.role == "tool" && m.tool_call_id.as_deref() == Some(tc_id.as_str()));
assert!(
has_result,
"tool_call {} must have a matching tool_result — got orphaned!",
tc_id
);
}
}
#[test]
fn task_compression_no_advancement_when_no_tool_calls() {
use crate::session::Message;
fn msg(role: &str) -> Message {
Message {
role: role.to_string(),
content: format!("{} message", role),
timestamp: 1000,
cached: false,
cache_ttl: None,
tool_call_id: None,
name: None,
tool_calls: None,
images: None,
videos: None,
thinking: None,
id: None,
}
}
let messages = [
msg("system"), msg("user"), msg("assistant"), msg("user"), msg("assistant"), ];
let start_index = 1usize;
let mut adjusted_start = start_index;
if let Some(anchor) = messages.get(adjusted_start) {
if anchor.role == "assistant" && anchor.tool_calls.is_some() {
let mut next = adjusted_start + 1;
while next < messages.len() && messages[next].role == "tool" {
next += 1;
}
if next > adjusted_start + 1 && next < messages.len() {
adjusted_start = next;
}
}
}
assert_eq!(
adjusted_start, start_index,
"start_index should not change when anchor has no tool_calls"
);
}
}