use super::*;
use crate::agent::attachment_content::build_attachment_content;
use crate::agent::turn_render::RenderMode;
use crate::agent::vision::user_message_content_matches;
use crate::execution_policy::PolicyBundle;
use crate::traits::MessageAttachment;
/// Per-iteration inputs consumed by `run_message_build_phase`.
///
/// Everything is borrowed from the caller; only `pending_system_messages`
/// is mutated (it is drained into the outgoing payload).
pub(super) struct MessageBuildCtx<'a> {
/// Session key used for the turn anchor, render cache and event-store lookups.
pub session_id: &'a str,
/// Loop iteration number; values above 1 enable the execution checkpoint
/// and the `Thinking` status update.
pub iteration: usize,
/// Text of the current user request; used to locate the current user
/// message inside the assembled history.
pub user_text: &'a str,
/// Attachments of the current request, used when the current user message
/// has to be synthesized or re-added by the build phase.
pub current_attachments: &'a [MessageAttachment],
/// Tool calls completed so far in this task; summarized in the checkpoint.
pub completed_tool_calls: &'a [String],
/// Model identifier used for render options and context-budget lookups.
pub model: &'a str,
/// System prompt inserted as message zero of the payload.
pub core_prompt: &'a str,
/// Extra system context inserted immediately before the current user
/// message; skipped when empty.
pub task_context_tail: &'a str,
/// Unfitted tool definitions; the build phase may compact them to fit.
pub tool_defs: &'a [Value],
/// Execution policy; its `policy.context_budget` can cap the message budget.
pub policy_bundle: &'a PolicyBundle,
/// Queued directives; drained and appended as trailing system messages.
pub pending_system_messages: &'a mut Vec<SystemDirective>,
/// When set (and the session is not a trigger session), history is reduced
/// to the immediate parent exchange plus a retry directive.
pub empty_response_retry_pending: bool,
/// Optional status channel; receives `StatusUpdate::Thinking` on later iterations.
pub status_tx: &'a Option<mpsc::Sender<StatusUpdate>>,
}
/// Result of `run_message_build_phase`: the payload for the next LLM call.
pub(super) struct MessageBuildData {
/// Fully assembled chat messages, starting with the core system prompt.
pub messages: Vec<Value>,
/// Tool definitions after budget fitting, sorted by name.
pub tool_defs: Vec<Value>,
/// Rough input-size estimate: serialized message bytes / 4 plus the
/// estimated tool-definition tokens.
pub est_input_tokens: u32,
}
/// Maximum number of characters kept from a parent message when history is
/// rebuilt for an empty-response retry (see `truncate_parent_for_empty_retry`).
const EMPTY_RETRY_MAX_PARENT_CHARS: usize = 800;
/// Limit passed to `truncate_for_resume` for the "Active request" checkpoint line.
const EXECUTION_CHECKPOINT_MAX_REQUEST_CHARS: usize = 240;
/// Limit passed to `truncate_for_resume` for the completed-work summary.
const EXECUTION_CHECKPOINT_MAX_ACTIVITY_CHARS: usize = 900;
/// Limit passed to `truncate_for_resume` for the latest tool-evidence line.
const EXECUTION_CHECKPOINT_MAX_EVIDENCE_CHARS: usize = 500;
/// Tokens subtracted from the context budget for the model's reply when
/// sizing tool definitions.
const RESPONSE_RESERVE_TOKENS: usize = 1_536;
/// Tokens held back for messages when computing the initial tool budget.
const MIN_MESSAGE_BUDGET_TOKENS: usize = 1_024;
/// Extra tokens subtracted when re-fitting tool definitions after final assembly.
const TOKEN_ESTIMATE_SAFETY_MARGIN: usize = 256;
/// Reserve for the current turn, passed to `turn_eviction::archived_budget`.
const CURRENT_TURN_RESERVE_TOKENS: usize = 4_000;
/// Safety margin passed to `turn_eviction::archived_budget`.
/// NOTE(review): presumably a fraction (10%) of the budget — confirm against `archived_budget`.
const ARCHIVED_BUDGET_SAFETY_MARGIN: f64 = 0.10;
/// Returns the message's `content` as an owned, whitespace-trimmed string.
///
/// Yields `None` when the field is absent, is not a JSON string, or is
/// empty once trimmed.
fn trimmed_message_content(message: &Value) -> Option<String> {
    let raw = message.get("content")?.as_str()?;
    let trimmed = raw.trim();
    if trimmed.is_empty() {
        None
    } else {
        Some(trimmed.to_owned())
    }
}
fn truncate_parent_for_empty_retry(content: &str) -> String {
let mut out: String = content.chars().take(EMPTY_RETRY_MAX_PARENT_CHARS).collect();
if content.chars().count() > EMPTY_RETRY_MAX_PARENT_CHARS {
out.push_str("...");
}
out
}
/// Reports whether `message` is a `user`-role message whose content matches
/// `user_text` according to `user_message_content_matches`.
fn is_current_user_message(message: &Value, user_text: &str) -> bool {
    let role = message.get("role").and_then(|r| r.as_str());
    if role != Some("user") {
        return false;
    }
    match message.get("content") {
        Some(content) => user_message_content_matches(content, user_text),
        None => false,
    }
}
/// Finds the index of the last message that is the current user message,
/// scanning from the end of `messages`.
fn find_current_user_position(messages: &[Value], user_text: &str) -> Option<usize> {
    (0..messages.len())
        .rev()
        .find(|&idx| is_current_user_message(&messages[idx], user_text))
}
/// Returns a copy of the attachments on the first `user` message in `turn`
/// whose content equals `user_text`, or an empty vector when none matches.
fn current_turn_user_attachments(
    turn: &crate::events::FetchedTurn,
    user_text: &str,
) -> Vec<MessageAttachment> {
    for message in &turn.messages {
        let is_user = message.role == "user";
        if is_user && message.content.as_deref() == Some(user_text) {
            return message.attachments.clone();
        }
    }
    Vec::new()
}
/// Builds the reduced history used when retrying after an empty response.
///
/// Only messages before the current user message are searched (or all of
/// `existing` when it cannot be located). The result contains, in order:
/// the latest earlier user message with different, non-blank string content
/// (if any), the latest earlier assistant message when it has non-blank
/// string content, and finally a fresh user message carrying `user_text`.
/// Both recovered parents are truncated via `truncate_parent_for_empty_retry`.
fn build_empty_response_retry_messages(existing: &[Value], user_text: &str) -> Vec<Value> {
    fn has_role(message: &Value, role: &str) -> bool {
        message.get("role").and_then(|r| r.as_str()) == Some(role)
    }

    let boundary = find_current_user_position(existing, user_text).unwrap_or(existing.len());
    let history = &existing[..boundary];

    // The newest assistant message is selected first; if its content is not
    // usable text, no assistant parent is recovered at all.
    let parent_assistant = history
        .iter()
        .rfind(|m| has_role(m, "assistant"))
        .and_then(trimmed_message_content);

    let parent_user = history
        .iter()
        .rfind(|m| {
            has_role(m, "user")
                && m.get("content")
                    .and_then(|c| c.as_str())
                    .is_some_and(|text| text != user_text && !text.trim().is_empty())
        })
        .and_then(trimmed_message_content);

    let mut recovered: Vec<Value> = Vec::with_capacity(3);
    recovered.extend(parent_user.map(|text| {
        json!({
            "role": "user",
            "content": truncate_parent_for_empty_retry(&text),
        })
    }));
    recovered.extend(parent_assistant.map(|text| {
        json!({
            "role": "assistant",
            "content": truncate_parent_for_empty_retry(&text),
        })
    }));
    recovered.push(json!({
        "role": "user",
        "content": user_text,
    }));
    recovered
}
/// Reports whether output from `tool_name` is excluded from the checkpoint's
/// "latest concrete evidence" line. Matching is exact and case-sensitive.
fn tool_is_low_info_for_checkpoint(tool_name: &str) -> bool {
    const LOW_INFO_TOOLS: [&str; 6] = [
        "write_file",
        "edit_file",
        "manage_memories",
        "manage_people",
        "remember_fact",
        "check_environment",
    ];
    LOW_INFO_TOOLS.contains(&tool_name)
}
/// Builds the `[SYSTEM] EXECUTION CHECKPOINT` reminder for later iterations.
///
/// Returns `None` when the trimmed request is empty or no tool calls have
/// completed. Otherwise the message restates the active request, adds the
/// categorized summary of completed tool calls (when non-blank), adds the
/// most recent non-empty tool result from `current_interaction` that is not
/// from a low-information tool, and ends with the continuation instruction.
fn build_execution_checkpoint_message(
    user_text: &str,
    completed_tool_calls: &[String],
    current_interaction: &[&Message],
) -> Option<String> {
    let request = user_text.trim();
    if request.is_empty() || completed_tool_calls.is_empty() {
        return None;
    }

    let activity_summary = super::post_task::categorize_tool_calls(completed_tool_calls);
    let activity = activity_summary.trim();

    // Walk newest-first and stop at the first tool result worth quoting.
    let mut latest_evidence: Option<String> = None;
    for message in current_interaction.iter().rev() {
        if message.role != "tool" {
            continue;
        }
        let tool_name = message.tool_name.as_deref().unwrap_or("").trim();
        if tool_name.is_empty() || tool_is_low_info_for_checkpoint(tool_name) {
            continue;
        }
        let Some(content) = message.primary_content() else {
            continue;
        };
        let content = content.trim();
        if content.is_empty() {
            continue;
        }
        latest_evidence = Some(format!(
            "- {}: {}",
            tool_name,
            truncate_for_resume(content, EXECUTION_CHECKPOINT_MAX_EVIDENCE_CHARS)
        ));
        break;
    }

    let mut sections: Vec<String> = Vec::with_capacity(7);
    sections.push(
        "[SYSTEM] EXECUTION CHECKPOINT: You are still working on the same active request from this turn.".to_string(),
    );
    sections.push(format!(
        "Active request: {}",
        truncate_for_resume(request, EXECUTION_CHECKPOINT_MAX_REQUEST_CHARS)
    ));
    if !activity.is_empty() {
        sections.push("Completed work so far:".to_string());
        sections.push(truncate_for_resume(
            activity,
            EXECUTION_CHECKPOINT_MAX_ACTIVITY_CHARS,
        ));
    }
    if let Some(evidence) = latest_evidence {
        sections.push("Latest concrete evidence:".to_string());
        sections.push(evidence);
    }
    sections.push("Continue from this checkpoint. Do NOT reset into a generic availability reply or ask what the user wants help with. Either take the next step for this request, answer with concrete results if it is complete, or state the blocker tied to this request.".to_string());

    Some(sections.join("\n"))
}
pub(super) async fn run_message_build_phase(
services: &super::services::AgentServices<'_>,
ctx: &mut MessageBuildCtx<'_>,
) -> anyhow::Result<MessageBuildData> {
let agent = services.agent;
let session_id = ctx.session_id;
let iteration = ctx.iteration;
let user_text = ctx.user_text;
let current_attachments = ctx.current_attachments;
let completed_tool_calls = ctx.completed_tool_calls;
let model = ctx.model;
let core_prompt = ctx.core_prompt;
let task_context_tail = ctx.task_context_tail;
let original_tool_defs = ctx.tool_defs;
let policy_bundle = ctx.policy_bundle;
let pending_system_messages = &mut *ctx.pending_system_messages;
let empty_response_retry_pending = ctx.empty_response_retry_pending;
let status_tx = ctx.status_tx;
let render_options = agent.render_options(model);
let total_context_budget =
crate::memory::context_window::model_context_budget(model, &agent.context_window_config);
let system_tokens = crate::memory::context_window::estimate_tokens(core_prompt)
+ crate::memory::context_window::estimate_tokens(task_context_tail);
let original_tool_tokens =
crate::memory::context_window::estimate_tool_definition_tokens(original_tool_defs);
let tool_budget = total_context_budget
.saturating_sub(system_tokens + RESPONSE_RESERVE_TOKENS + MIN_MESSAGE_BUDGET_TOKENS);
let mut effective_tool_defs = crate::memory::context_window::fit_tool_definitions_to_budget(
original_tool_defs,
tool_budget,
);
if effective_tool_defs != original_tool_defs {
info!(
session_id,
iteration,
model,
total_context_budget,
original_tool_tokens,
effective_tool_tokens = crate::memory::context_window::estimate_tool_definition_tokens(
&effective_tool_defs
),
tool_count = effective_tool_defs.len(),
"Compacted tool schema descriptions for model context compatibility"
);
}
let mut tool_defs = effective_tool_defs.as_slice();
use crate::events::TerminalState;
let core_tokens = crate::memory::context_window::estimate_tokens(core_prompt);
let tail_tokens = crate::memory::context_window::estimate_tokens(task_context_tail);
let tool_tokens_for_budget =
crate::memory::context_window::estimate_tool_definition_tokens(tool_defs);
let archived_budget = super::turn_eviction::archived_budget(
total_context_budget,
core_tokens,
tool_tokens_for_budget,
tail_tokens,
CURRENT_TURN_RESERVE_TOKENS,
crate::memory::context_window::CONTEXT_RESPONSE_RESERVE_TOKENS,
ARCHIVED_BUDGET_SAFETY_MARGIN,
);
let current_turn_id: Option<String> =
agent.current_turn_ids.read().await.get(session_id).cloned();
let anchor: i64 = {
let existing = agent.turn_anchors.read().await.get(session_id).copied();
match existing {
Some(a) => a,
None => {
let low_water = super::turn_eviction::low_water(archived_budget);
let mut before: Option<i64> = None;
let mut accumulated: usize = 0;
let mut init_anchor: Option<i64> = None;
loop {
let page = agent
.event_store
.get_recent_turns_page(session_id, before, 1)
.await?;
let Some(turn) = page.into_iter().next() else {
break;
};
let terminal_state = TerminalState::from_task_status(turn.terminal_status);
let rendered = super::turn_render::render_turn(
&turn.messages,
super::turn_render::RenderMode::Archived { terminal_state },
super::turn_render::RENDERER_VERSION,
&render_options,
);
let est = crate::memory::context_window::estimate_tokens(
&serde_json::to_string(&rendered).unwrap_or_default(),
);
if init_anchor.is_none() {
init_anchor = Some(turn.turn_seq);
before = Some(turn.turn_seq);
continue;
}
if accumulated + est > low_water {
break;
}
accumulated = accumulated.saturating_add(est);
init_anchor = Some(turn.turn_seq);
before = Some(turn.turn_seq);
}
let resolved = init_anchor.unwrap_or(0);
agent
.turn_anchors
.write()
.await
.insert(session_id.to_string(), resolved);
info!(
session_id,
anchor = resolved,
archived_budget,
"anchor initialized (cold start)"
);
resolved
}
}
};
let mut turns = agent
.event_store
.get_turns_from_anchor(session_id, anchor)
.await?;
let last_is_current = turns
.last()
.map(|t| t.turn_id.as_deref() == current_turn_id.as_deref() && current_turn_id.is_some())
.unwrap_or(false);
if !last_is_current {
let synthetic_turn_id = current_turn_id.clone();
let synthetic_seq = turns.last().map(|t| t.turn_seq + 1).unwrap_or(0);
let current_user = Message {
id: format!("synthetic-current-user-{}", uuid::Uuid::new_v4()),
session_id: session_id.to_string(),
role: "user".to_string(),
content: Some(user_text.to_string()),
tool_call_id: None,
tool_name: None,
tool_calls_json: None,
created_at: chrono::Utc::now(),
importance: 1.0,
turn_id: synthetic_turn_id.clone(),
attachments: current_attachments.to_vec(),
..Message::runtime_defaults()
};
turns.push(crate::events::FetchedTurn {
turn_id: synthetic_turn_id,
turn_seq: synthetic_seq,
messages: vec![current_user],
terminal_status: None,
});
warn!(
session_id,
"current turn absent from fetch; injected in-process"
);
} else {
debug_assert!(
turns
.last()
.map(|t| t.turn_id.as_deref() == current_turn_id.as_deref())
.unwrap_or(false),
"turn-anchored fetch must end in the current turn on the happy path"
);
}
let current_turn = turns.pop().expect("at least the current turn is present");
struct ArchivedRender {
turn_id: Option<String>,
turn_seq: i64,
content_fp: String,
bytes: Vec<Value>,
cache_hit: bool,
cache_reason: &'static str,
}
let mut archived_renders: Vec<ArchivedRender> = Vec::with_capacity(turns.len());
{
let prev_cache = agent.turn_renders.read().await;
let session_cache = prev_cache.get(session_id);
for turn in &turns {
let terminal_state = TerminalState::from_task_status(turn.terminal_status);
let fp = super::turn_render_cache::content_fp(&turn.messages, terminal_state);
let prev = turn
.turn_id
.as_deref()
.and_then(|tid| session_cache.and_then(|c| c.get(tid)));
let render_fn = || {
super::turn_render::render_turn(
&turn.messages,
super::turn_render::RenderMode::Archived { terminal_state },
super::turn_render::RENDERER_VERSION,
&render_options,
)
};
let (bytes, hit, reason) = super::turn_render_cache::render_cache_decision(
prev,
&fp,
super::turn_render::RENDERER_VERSION,
"archived",
render_fn,
);
#[cfg(debug_assertions)]
if hit {
let fresh = super::turn_render::render_turn(
&turn.messages,
super::turn_render::RenderMode::Archived { terminal_state },
super::turn_render::RENDERER_VERSION,
&render_options,
);
assert_eq!(
fresh, bytes,
"archived render must be deterministic for turn {:?}",
turn.turn_id
);
}
archived_renders.push(ArchivedRender {
turn_id: turn.turn_id.clone(),
turn_seq: turn.turn_seq,
content_fp: fp,
bytes,
cache_hit: hit,
cache_reason: reason,
});
}
}
let rendered_for_plan: Vec<super::turn_eviction::RenderedTurn> = archived_renders
.iter()
.map(|r| super::turn_eviction::RenderedTurn {
turn_seq: r.turn_seq,
est_tokens: crate::memory::context_window::estimate_tokens(
&serde_json::to_string(&r.bytes).unwrap_or_default(),
),
})
.collect();
let plan = super::turn_eviction::plan_eviction(&rendered_for_plan, archived_budget);
if plan.degenerate {
warn!(
session_id,
iteration,
archived_budget,
core_tokens,
tool_tokens = tool_tokens_for_budget,
tail_tokens,
current_turn_reserve = CURRENT_TURN_RESERVE_TOKENS,
"Archived budget degenerate: non-evictable components exceed context; zero archived turns"
);
}
if plan.evicted_count > 0 {
let evicted: Vec<ArchivedRender> = archived_renders.drain(..plan.evicted_count).collect();
{
let mut anchors = agent.turn_anchors.write().await;
anchors.insert(session_id.to_string(), plan.new_anchor_turn_seq);
}
{
let mut cache = agent.turn_renders.write().await;
if let Some(session_cache) = cache.get_mut(session_id) {
for r in &evicted {
if let Some(tid) = r.turn_id.as_deref() {
session_cache.remove(tid);
}
}
}
}
info!(
session_id,
iteration,
new_anchor = plan.new_anchor_turn_seq,
turns_evicted = plan.evicted_count,
kept_est_tokens = plan.kept_est_tokens,
archived_kept = archived_renders.len(),
archived_budget,
"Window decision"
);
}
if plan.degenerate {
archived_renders.clear();
}
{
let mut cache = agent.turn_renders.write().await;
let session_cache = cache.entry(session_id.to_string()).or_default();
for r in &archived_renders {
if r.cache_hit {
tracing::debug!(
session_id,
turn_id = ?r.turn_id,
"archived render cache hit"
);
} else {
info!(
session_id,
turn_id = ?r.turn_id,
reason = r.cache_reason,
"archived render cache miss"
);
}
if let Some(tid) = r.turn_id.as_deref() {
session_cache.insert(
tid.to_string(),
super::turn_render_cache::CachedRender {
content_fp: r.content_fp.clone(),
renderer_version: super::turn_render::RENDERER_VERSION,
mode_tag: "archived".to_string(),
bytes: r.bytes.clone(),
},
);
}
}
}
let current_rendered = super::turn_render::render_turn(
¤t_turn.messages,
super::turn_render::RenderMode::Current,
super::turn_render::RENDERER_VERSION,
&render_options,
);
let mut messages: Vec<Value> = Vec::new();
for r in &archived_renders {
messages.extend(r.bytes.iter().cloned());
}
messages.extend(current_rendered);
fixup_message_ordering(&mut messages);
tracing::debug!(
session_id,
iteration,
stage = "turn_anchored_assembly",
archived_turns = archived_renders.len(),
pre_boundary_hash = %super::prefix_fingerprint::stage_pre_boundary_hash(&messages, user_text),
"Build stage pre-boundary fingerprint"
);
let execution_checkpoint = if iteration > 1 {
let current_interaction: Vec<&Message> = current_turn.messages.iter().collect();
build_execution_checkpoint_message(user_text, completed_tool_calls, ¤t_interaction)
} else {
None
};
{
let has_current_user_msg = messages.iter().any(|m| {
m.get("role").and_then(|r| r.as_str()) == Some("user")
&& m.get("content").is_some_and(|content| {
crate::agent::vision::user_message_content_matches(content, user_text)
})
});
if !has_current_user_msg {
let turn_attachments = current_turn_user_attachments(¤t_turn, user_text);
let attachments: &[MessageAttachment] = if !turn_attachments.is_empty() {
&turn_attachments
} else {
current_attachments
};
let build = build_attachment_content(
user_text,
attachments,
RenderMode::Current,
&render_options.vision,
&render_options.audio,
model,
);
messages.push(json!({
"role": "user",
"content": build.content,
}));
}
}
{
let user_positions: Vec<usize> = messages
.iter()
.enumerate()
.filter(|(_, m)| m.get("role").and_then(|r| r.as_str()) == Some("user"))
.map(|(i, _)| i)
.collect();
if user_positions.len() >= 2 {
let current_pos = user_positions
.iter()
.copied()
.rev()
.find(|&pos| {
messages[pos].get("content").is_some_and(|content| {
crate::agent::vision::user_message_content_matches(content, user_text)
})
})
.or_else(|| user_positions.last().copied());
if let Some(current_pos) = current_pos {
let prev_user_content = user_positions
.iter()
.copied()
.filter(|&pos| pos != current_pos)
.rev()
.find_map(|pos| {
messages[pos]
.get("content")
.and_then(|c| c.as_str())
.map(|s| s.to_string())
});
let has_different_task =
prev_user_content.as_deref() != Some(user_text) && prev_user_content.is_some();
if has_different_task {
let marker = json!({
"role": "system",
"content": "[Current Task] The message below is the user's current request. \
Prior messages are conversation history for context."
});
messages.insert(current_pos, marker);
info!(
session_id,
iteration,
user_messages = user_positions.len(),
"Current task marker injected before current user message"
);
}
}
}
}
tracing::debug!(
session_id,
iteration,
stage = "current_task_marker",
pre_boundary_hash = %super::prefix_fingerprint::stage_pre_boundary_hash(&messages, user_text),
"Build stage pre-boundary fingerprint"
);
{
let current_task_pos = find_current_user_position(&messages, user_text);
if let Some(task_pos) = current_task_pos {
let chain_end = messages
.iter()
.enumerate()
.rev()
.find(|(i, m)| {
*i > task_pos
&& matches!(
m.get("role").and_then(|r| r.as_str()),
Some("assistant") | Some("tool")
)
})
.map(|(i, _)| i)
.unwrap_or(task_pos);
let stray_start = chain_end + 1;
if stray_start < messages.len() {
let stray_count = messages[stray_start..]
.iter()
.filter(|m| m.get("role").and_then(|r| r.as_str()) == Some("user"))
.count();
if stray_count > 0 {
messages.truncate(stray_start);
info!(
session_id,
iteration,
stray_user_messages = stray_count,
"Truncated stray messages after current task's tool chain"
);
}
}
}
}
let collapsed_tool_errors = super::loop_utils::collapse_repeated_tool_errors(&mut messages);
if collapsed_tool_errors > 0 {
info!(
session_id,
iteration,
collapsed_tool_errors,
reason = "repeated_tool_error_collapse",
"Prefix mutation"
);
}
tracing::debug!(
session_id,
iteration,
stage = "tool_error_collapse",
pre_boundary_hash = %super::prefix_fingerprint::stage_pre_boundary_hash(&messages, user_text),
"Build stage pre-boundary fingerprint"
);
if agent.context_window_config.enabled {
let system_tokens = crate::memory::context_window::estimate_tokens(core_prompt)
+ crate::memory::context_window::estimate_tokens(task_context_tail);
let model_budget = crate::memory::context_window::compute_available_budget_precomputed(
model,
system_tokens,
tool_defs,
&agent.context_window_config,
);
let policy_budget = policy_bundle.policy.context_budget;
if agent.policy_config.policy_shadow_mode && !agent.policy_config.policy_enforce {
info!(
session_id,
iteration, model_budget, policy_budget, "Context budget shadow comparison"
);
}
let effective_budget = if agent.policy_config.policy_enforce {
policy_budget.min(model_budget)
} else {
model_budget
};
let current_region_idx = find_current_user_position(&messages, user_text).unwrap_or(0);
let archived_prefix: Vec<Value> = messages[..current_region_idx].to_vec();
let current_region: Vec<Value> = messages[current_region_idx..].to_vec();
let archived_prefix_tokens = crate::memory::context_window::estimate_tokens(
&serde_json::to_string(&archived_prefix).unwrap_or_default(),
);
let current_budget = effective_budget.saturating_sub(archived_prefix_tokens);
let (fitted_current, dropped) =
crate::memory::context_window::fit_messages_with_source_quotas(
current_region,
current_budget,
);
messages = archived_prefix;
messages.extend(fitted_current);
if dropped > 0 {
info!(
session_id,
iteration,
dropped,
reason = "history_fitting",
"Prefix mutation"
);
}
}
tracing::debug!(
session_id,
iteration,
stage = "history_fitting",
pre_boundary_hash = %super::prefix_fingerprint::stage_pre_boundary_hash(&messages, user_text),
"Build stage pre-boundary fingerprint"
);
if empty_response_retry_pending && !is_trigger_session(session_id) {
let before = messages.len();
let rebuilt = build_empty_response_retry_messages(&messages, user_text);
let mutated = rebuilt != messages;
messages = rebuilt;
info!(
session_id,
iteration,
before,
after = messages.len(),
"Empty-response recovery: reduced history while preserving immediate parent context"
);
if mutated {
info!(
session_id,
iteration,
reason = "empty_response_retry",
"Prefix mutation"
);
}
}
if !task_context_tail.is_empty() {
let tail_insert_pos =
find_current_user_position(&messages, user_text).unwrap_or(messages.len());
messages.insert(
tail_insert_pos,
json!({
"role": "system",
"content": task_context_tail,
}),
);
}
messages.insert(
0,
json!({
"role": "system",
"content": core_prompt,
}),
);
tracing::debug!(
session_id,
iteration,
stage = "context_tail",
pre_boundary_hash = %super::prefix_fingerprint::stage_pre_boundary_hash(&messages, user_text),
"Build stage pre-boundary fingerprint"
);
if let Some(checkpoint) = execution_checkpoint {
messages.push(json!({
"role": "system",
"content": checkpoint,
}));
info!(
session_id,
iteration, "Injected execution checkpoint for in-progress task continuity"
);
}
tracing::debug!(
session_id,
iteration,
stage = "execution_checkpoint",
full_payload_hash = %super::prefix_fingerprint::hash_canonical(&serde_json::Value::Array(messages.clone())),
"Build stage tail fingerprint"
);
{
let non_system_non_user_count = messages
.iter()
.filter(|m| {
let role = m.get("role").and_then(|r| r.as_str()).unwrap_or("");
role != "system" && role != "user"
})
.count();
if non_system_non_user_count == 0 {
messages.retain(|m| {
m.get("role").and_then(|r| r.as_str()) != Some("user")
|| m.get("content")
.is_some_and(|content| user_message_content_matches(content, user_text))
});
pending_system_messages.push(SystemDirective::FreshConversationContext);
}
}
for directive in pending_system_messages.drain(..) {
messages.push(json!({
"role": "system",
"content": directive.render(),
}));
}
if empty_response_retry_pending && !is_trigger_session(session_id) {
messages.push(json!({
"role": "system",
"content": SystemDirective::EmptyResponseRetry.render()
}));
}
let messages_json = serde_json::to_string(&messages).unwrap_or_default();
if agent.context_window_config.enabled {
let message_tokens =
crate::memory::context_window::estimate_multimodal_message_tokens(&messages);
let final_tool_budget = total_context_budget.saturating_sub(
message_tokens + RESPONSE_RESERVE_TOKENS + TOKEN_ESTIMATE_SAFETY_MARGIN,
);
let final_tool_defs = crate::memory::context_window::fit_tool_definitions_to_budget(
original_tool_defs,
final_tool_budget,
);
if final_tool_defs != effective_tool_defs {
info!(
session_id,
iteration,
model,
message_tokens,
final_tool_budget,
before_tool_tokens = crate::memory::context_window::estimate_tool_definition_tokens(
&effective_tool_defs
),
after_tool_tokens = crate::memory::context_window::estimate_tool_definition_tokens(
&final_tool_defs
),
tool_count = final_tool_defs.len(),
"Recompacted tool schemas after final prompt assembly"
);
effective_tool_defs = final_tool_defs;
tool_defs = effective_tool_defs.as_slice();
}
}
if iteration > 1 {
send_status(status_tx, StatusUpdate::Thinking(iteration));
}
{
let summary: Vec<String> = messages
.iter()
.map(|m| {
let role = m.get("role").and_then(|r| r.as_str()).unwrap_or("?");
let name = m.get("name").and_then(|n| n.as_str()).unwrap_or("");
let tc_id = m
.get("tool_call_id")
.and_then(|id| id.as_str())
.unwrap_or("");
let tc_count = m
.get("tool_calls")
.and_then(|v| v.as_array())
.map_or(0, |a| a.len());
if role == "tool" {
format!("tool({},tc_id={})", name, &tc_id[..tc_id.len().min(12)])
} else if tc_count > 0 {
format!("{}(tc={})", role, tc_count)
} else {
role.to_string()
}
})
.collect();
let est_msg_tokens = messages_json.len() / 4;
let est_tool_tokens =
crate::memory::context_window::estimate_tool_definition_tokens(tool_defs);
let est_total_tokens = est_msg_tokens + est_tool_tokens;
let est_msg_tokens_u64 = est_msg_tokens as u64;
let est_tool_tokens_u64 = est_tool_tokens as u64;
let est_total_tokens_u64 = est_total_tokens as u64;
let est_tool_share_bps = est_tool_tokens_u64
.saturating_mul(10_000)
.checked_div(est_total_tokens_u64)
.unwrap_or(0);
POLICY_METRICS
.est_input_token_samples
.fetch_add(1, Ordering::Relaxed);
POLICY_METRICS
.est_input_tokens_total
.fetch_add(est_total_tokens_u64, Ordering::Relaxed);
POLICY_METRICS
.est_msg_tokens_total
.fetch_add(est_msg_tokens_u64, Ordering::Relaxed);
POLICY_METRICS
.est_tool_tokens_total
.fetch_add(est_tool_tokens_u64, Ordering::Relaxed);
const HIGH_TOOL_SHARE_BPS: u64 = 3500; const HIGH_TOOL_TOKENS_ABS: u64 = 1_500; if est_tool_share_bps >= HIGH_TOOL_SHARE_BPS {
POLICY_METRICS
.est_tool_tokens_high_share_total
.fetch_add(1, Ordering::Relaxed);
}
if est_tool_tokens_u64 >= HIGH_TOOL_TOKENS_ABS {
POLICY_METRICS
.est_tool_tokens_high_abs_total
.fetch_add(1, Ordering::Relaxed);
}
info!(
session_id,
iteration,
est_input_tokens = est_total_tokens,
est_msg_tokens,
est_tool_tokens,
total_context_budget,
response_reserve_tokens = RESPONSE_RESERVE_TOKENS,
est_tool_share_pct = est_tool_share_bps as f64 / 100.0,
msg_count = messages.len(),
msgs = ?summary,
"Context before LLM call"
);
}
Agent::sort_tool_definitions_by_name(&mut effective_tool_defs);
let est_input_tokens = {
let est_msg_tokens = messages_json.len() / 4;
let est_tool_tokens =
crate::memory::context_window::estimate_tool_definition_tokens(&effective_tool_defs);
(est_msg_tokens + est_tool_tokens) as u32
};
Ok(MessageBuildData {
messages,
tool_defs: effective_tool_defs,
est_input_tokens,
})
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::Utc;
/// Test helper: builds a `Message` for `"test-session"` with the given role
/// and content, a fresh random id, and importance `0.5`.
fn msg(role: &str, content: &str) -> Message {
    let id = uuid::Uuid::new_v4().to_string();
    let session_id = String::from("test-session");
    let role = role.to_owned();
    let content = Some(content.to_owned());
    let created_at = Utc::now();
    Message {
        id,
        session_id,
        role,
        content,
        tool_call_id: None,
        tool_name: None,
        tool_calls_json: None,
        created_at,
        importance: 0.5,
        ..Message::runtime_defaults()
    }
}
/// Test helper: builds a `tool`-role `Message` for `"test-session"` carrying
/// `content` as the result of tool `name`, with a random `tool-call-…` id.
fn tool_msg(name: &str, content: &str) -> Message {
    let id = uuid::Uuid::new_v4().to_string();
    let session_id = String::from("test-session");
    let role = String::from("tool");
    let content = Some(content.to_owned());
    let tool_call_id = Some(format!("tool-call-{}", uuid::Uuid::new_v4()));
    let tool_name = Some(name.to_owned());
    let created_at = Utc::now();
    Message {
        id,
        session_id,
        role,
        content,
        tool_call_id,
        tool_name,
        tool_calls_json: None,
        created_at,
        importance: 0.5,
        ..Message::runtime_defaults()
    }
}
use crate::events::{Event, EventStore, EventType};
/// Test helper: appends one complete turn to `store` for `session`.
///
/// Events are written in this order, each tagged with `turn_id`:
/// a `UserMessage`, then (only when `tool` is `Some((name, result))`) an
/// `AssistantResponse` carrying a single tool call followed by its
/// `ToolResult`, then the final `AssistantResponse` text, and finally a
/// `TaskEnd` with the given `status`. Panics if any append fails.
async fn seed_turn(
store: &EventStore,
session: &str,
turn_id: &str,
user: &str,
tool: Option<(&str, &str)>,
final_assistant: &str,
status: &str,
) {
// 1. The user's request that opens the turn.
store
.append(Event::new(
session,
EventType::UserMessage,
json!({ "content": user, "turn_id": turn_id }),
))
.await
.expect("seed user_message");
// 2. Optional tool round-trip: assistant tool call plus its result, linked
//    by a call id derived from the turn id and tool name.
if let Some((name, result)) = tool {
let call_id = format!("call-{turn_id}-{name}");
store
.append(Event::new(
session,
EventType::AssistantResponse,
json!({
"content": serde_json::Value::Null,
"tool_calls": [{ "id": call_id, "name": name, "arguments": "{}" }],
"turn_id": turn_id,
}),
))
.await
.expect("seed assistant tool-call");
store
.append(Event::new(
session,
EventType::ToolResult,
json!({
"tool_call_id": call_id,
"name": name,
"result": result,
"success": true,
"duration_ms": 1,
"turn_id": turn_id,
}),
))
.await
.expect("seed tool_result");
}
// 3. The assistant's final textual answer.
store
.append(Event::new(
session,
EventType::AssistantResponse,
json!({ "content": final_assistant, "turn_id": turn_id }),
))
.await
.expect("seed final assistant");
// 4. Terminal marker recording how the turn ended.
store
.append(Event::new(
session,
EventType::TaskEnd,
json!({ "status": status, "turn_id": turn_id }),
))
.await
.expect("seed task_end");
}
/// Test helper: opens an `EventStore` on the harness's database pool so tests
/// can seed events directly. Panics if the store cannot be created.
async fn seed_store(harness: &crate::testing::TestHarness) -> EventStore {
    let opened = EventStore::new(harness.state.pool()).await;
    opened.expect("sibling event store")
}
/// Test helper: records `turn_id` as the current turn of `session` on the
/// harness's agent.
async fn set_current_turn(harness: &crate::testing::TestHarness, session: &str, turn_id: &str) {
    let mut current_turn_ids = harness.agent.current_turn_ids.write().await;
    current_turn_ids.insert(session.to_string(), turn_id.to_string());
}
/// The retry history keeps the previous user/assistant exchange and ends
/// with the current user message.
#[test]
fn empty_retry_preserves_parent_pair_and_current_user() {
    let current = "yes, update them";
    let history = vec![
        json!({"role": "user", "content": "can you clear cache using drush?"}),
        json!({"role": "assistant", "content": "I can see updates available. Should I proceed with updating these?"}),
        json!({"role": "user", "content": "yes, update them"}),
    ];

    let recovered = build_empty_response_retry_messages(&history, current);

    assert_eq!(recovered.len(), 3);
    for (index, expected_role) in ["user", "assistant", "user"].iter().enumerate() {
        assert_eq!(recovered[index]["role"], *expected_role);
    }
    assert_eq!(recovered[2]["content"].as_str(), Some("yes, update them"));
}
/// With no earlier exchange, the retry history is just the current user
/// message.
#[test]
fn empty_retry_falls_back_to_current_user_when_no_history() {
    let history = vec![json!({"role": "user", "content": "help"})];

    let recovered = build_empty_response_retry_messages(&history, "help");

    assert_eq!(recovered.len(), 1);
    let only = &recovered[0];
    assert_eq!(only["role"], "user");
    assert_eq!(only["content"].as_str(), Some("help"));
}
/// On iteration 2, after a tool call has completed in the current turn, the
/// built payload must contain an execution checkpoint that restates the
/// request, mentions the tool, and carries the "do not reset" instruction.
#[tokio::test]
async fn later_iterations_include_execution_checkpoint_after_tool_progress() {
use crate::execution_policy::PolicyBundle;
use crate::testing::{setup_test_agent, MockProvider};
let harness = setup_test_agent(MockProvider::new())
.await
.expect("test harness");
let store = seed_store(&harness).await;
let turn = "turn-checkpoint";
// Seed an in-progress turn: user request, assistant tool call, tool result.
// No final assistant message or TaskEnd is written, so the turn stays open.
store
.append(Event::new(
"test-session",
EventType::UserMessage,
json!({ "content": "Find the system details and summarize them.", "turn_id": turn }),
))
.await
.expect("seed current user");
store
.append(Event::new(
"test-session",
EventType::AssistantResponse,
json!({
"content": serde_json::Value::Null,
"tool_calls": [{ "id": "call-sysinfo", "name": "system_info", "arguments": "{}" }],
"turn_id": turn,
}),
))
.await
.expect("seed assistant tool-call");
store
.append(Event::new(
"test-session",
EventType::ToolResult,
json!({
"tool_call_id": "call-sysinfo",
"name": "system_info",
"result": "OS: macOS 15.0\nMemory: 16 GB\nHostname: dev-machine",
"success": true,
"duration_ms": 1,
"turn_id": turn,
}),
))
.await
.expect("seed tool_result");
// Mark the seeded turn as the session's current turn.
set_current_turn(&harness, "test-session", turn).await;
let policy_bundle = PolicyBundle::from_scores(0.1, 0.1, 0.9);
let tool_defs: Vec<Value> = Vec::new();
let mut pending_system_messages = Vec::new();
let status_tx: Option<mpsc::Sender<StatusUpdate>> = None;
// A non-empty completed-call list is required for a checkpoint to be built.
let completed_tool_calls = vec!["system_info({})".to_string()];
let mut ctx = MessageBuildCtx {
session_id: "test-session",
// Checkpoints are only built when iteration > 1.
iteration: 2,
user_text: "Find the system details and summarize them.",
current_attachments: &[],
completed_tool_calls: &completed_tool_calls,
model: "mock-model",
core_prompt: "You are a helpful test assistant.",
task_context_tail: "",
tool_defs: &tool_defs,
policy_bundle: &policy_bundle,
pending_system_messages: &mut pending_system_messages,
empty_response_retry_pending: false,
status_tx: &status_tx,
};
let built = run_message_build_phase(
&crate::agent::services::AgentServices::new(&harness.agent),
&mut ctx,
)
.await
.expect("message build");
// Assertions run against the serialized payload as a whole.
let serialized = serde_json::to_string(&built.messages).expect("serialize messages");
assert!(
serialized.contains("EXECUTION CHECKPOINT"),
"later iterations should carry a live execution checkpoint: {}",
serialized
);
assert!(
serialized.contains("Find the system details and summarize them."),
"checkpoint should restate the active request: {}",
serialized
);
assert!(
serialized.contains("system_info"),
"checkpoint should include completed tool/evidence context: {}",
serialized
);
assert!(
serialized.contains("Do NOT reset into a generic availability reply"),
"checkpoint should explicitly block idle reset replies: {}",
serialized
);
}
/// Builds the payload twice (iterations 1 and 2) with identical inputs and
/// checks that message zero is exactly the system prompt both times and that
/// no later message repeats the prompt's identity text.
#[tokio::test]
async fn later_iterations_preserve_system_prompt_prefix_without_duplicate_guidance() {
use crate::execution_policy::PolicyBundle;
use crate::testing::{setup_test_agent, MockProvider};
use crate::traits::MessageStore;
let harness = setup_test_agent(MockProvider::new())
.await
.expect("test harness");
// The user message is written through the message store, not the event store.
harness
.state
.append_message(&msg("user", "Inspect the repository."))
.await
.expect("append user");
let system_prompt =
"## Identity\nStable identity.\n\n## Tools\nVerbose tool guidance.\n\n## Behavior\nBe precise.";
let policy_bundle = PolicyBundle::from_scores(0.1, 0.1, 0.9);
let tool_defs: Vec<Value> = Vec::new();
let status_tx: Option<mpsc::Sender<StatusUpdate>> = None;
// First build: iteration 1.
let mut first_pending_system_messages = Vec::new();
let mut first_ctx = MessageBuildCtx {
session_id: "test-session",
iteration: 1,
user_text: "Inspect the repository.",
current_attachments: &[],
completed_tool_calls: &[],
model: "mock-model",
core_prompt: system_prompt,
task_context_tail: "",
tool_defs: &tool_defs,
policy_bundle: &policy_bundle,
pending_system_messages: &mut first_pending_system_messages,
empty_response_retry_pending: false,
status_tx: &status_tx,
};
let first = run_message_build_phase(
&crate::agent::services::AgentServices::new(&harness.agent),
&mut first_ctx,
)
.await
.expect("first message build");
// Second build: same inputs, iteration 2.
let mut second_pending_system_messages = Vec::new();
let mut second_ctx = MessageBuildCtx {
session_id: "test-session",
iteration: 2,
user_text: "Inspect the repository.",
current_attachments: &[],
completed_tool_calls: &[],
model: "mock-model",
core_prompt: system_prompt,
task_context_tail: "",
tool_defs: &tool_defs,
policy_bundle: &policy_bundle,
pending_system_messages: &mut second_pending_system_messages,
empty_response_retry_pending: false,
status_tx: &status_tx,
};
let second = run_message_build_phase(
&crate::agent::services::AgentServices::new(&harness.agent),
&mut second_ctx,
)
.await
.expect("second message build");
// Message zero must be the unmodified system prompt in both builds.
let first_system = first.messages[0]["content"]
.as_str()
.expect("first system content");
let second_system = second.messages[0]["content"]
.as_str()
.expect("second system content");
assert_eq!(first_system, system_prompt);
assert_eq!(
second_system, first_system,
"message zero must remain byte-identical for prompt-cache reuse"
);
// No message after the first may contain the prompt's identity section.
for (iteration, built) in [(1, &first), (2, &second)] {
assert!(
built.messages.iter().skip(1).all(|message| {
!message
.get("content")
.and_then(Value::as_str)
.is_some_and(|content| content.contains("Stable identity."))
}),
"iteration {iteration} must not duplicate the system prompt later in the request"
);
}
}
/// Stale messages appended through `harness.state` must not show up in the
/// built payload.
///
/// Appends a user/assistant pair whose `created_at` is three hours in the
/// past, then a fresh user message, runs the build phase at iteration 1, and
/// asserts that only the fresh question appears in the serialized messages.
#[tokio::test]
async fn non_event_history_is_not_reconstructed_into_payload() {
use crate::execution_policy::PolicyBundle;
use crate::testing::{setup_test_agent, MockProvider};
use crate::traits::MessageStore;
use chrono::Duration as ChronoDuration;
let harness = setup_test_agent(MockProvider::new())
.await
.expect("test harness");
// Backdate the old pair by three hours; every other field comes from `msg`.
let old_time = Utc::now() - ChronoDuration::hours(3);
let old_user = Message {
created_at: old_time,
..msg("user", "Old stale question from 3 hours ago")
};
let old_assistant = Message {
created_at: old_time,
..msg("assistant", "Old stale answer from 3 hours ago")
};
harness
.state
.append_message(&old_user)
.await
.expect("append old user");
harness
.state
.append_message(&old_assistant)
.await
.expect("append old assistant");
// The current turn's user message, appended with whatever timestamp `msg` assigns.
harness
.state
.append_message(&msg("user", "Fresh question now"))
.await
.expect("append current user");
let policy_bundle = PolicyBundle::from_scores(0.1, 0.1, 0.9);
let tool_defs: Vec<Value> = Vec::new();
let mut pending_system_messages = Vec::new();
let status_tx: Option<mpsc::Sender<StatusUpdate>> = None;
let mut ctx = MessageBuildCtx {
session_id: "test-session",
iteration: 1,
user_text: "Fresh question now",
current_attachments: &[],
completed_tool_calls: &[],
model: "mock-model",
core_prompt: "You are a helpful test assistant.",
task_context_tail: "",
tool_defs: &tool_defs,
policy_bundle: &policy_bundle,
pending_system_messages: &mut pending_system_messages,
empty_response_retry_pending: false,
status_tx: &status_tx,
};
let built = run_message_build_phase(
&crate::agent::services::AgentServices::new(&harness.agent),
&mut ctx,
)
.await
.expect("message build");
// Assert on the serialized payload so the check covers every message field.
// NOTE(review): the failure messages below attribute the omission to an
// "idle gap" window reset, while the test name attributes it to non-event
// history not being reconstructed; confirm which mechanism applies.
let serialized = serde_json::to_string(&built.messages).expect("serialize messages");
assert!(
!serialized.contains("Old stale question from 3 hours ago"),
"idle gap should reset window to 0, removing old stale pairs: {}",
serialized
);
assert!(
!serialized.contains("Old stale answer from 3 hours ago"),
"idle gap should reset window to 0, removing old stale assistant: {}",
serialized
);
assert!(
serialized.contains("Fresh question now"),
"current user message should always be present: {}",
serialized
);
}
#[tokio::test]
async fn session_summary_travels_inside_task_context_tail() {
use crate::agent::prefix_fingerprint::TASK_CONTEXT_TAIL_MARKER;
use crate::execution_policy::PolicyBundle;
use crate::testing::{setup_test_agent, MockProvider};
use crate::traits::MessageStore;
let harness = setup_test_agent(MockProvider::new())
.await
.expect("test harness");
harness
.state
.append_message(&msg("user", "Current question"))
.await
.expect("append user");
let policy_bundle = PolicyBundle::from_scores(0.1, 0.1, 0.9);
let tool_defs: Vec<Value> = Vec::new();
let tail = format!(
"{TASK_CONTEXT_TAIL_MARKER}\n\n[Session Summary]\nUser previously asked about deploying a blog. Config was created."
);
let mut pending_system_messages = Vec::new();
let status_tx: Option<mpsc::Sender<StatusUpdate>> = None;
let mut ctx = MessageBuildCtx {
session_id: "test-session",
iteration: 1,
user_text: "Current question",
current_attachments: &[],
completed_tool_calls: &[],
model: "mock-model",
core_prompt: "You are a helpful test assistant.",
task_context_tail: &tail,
tool_defs: &tool_defs,
policy_bundle: &policy_bundle,
pending_system_messages: &mut pending_system_messages,
empty_response_retry_pending: false,
status_tx: &status_tx,
};
let built = run_message_build_phase(
&crate::agent::services::AgentServices::new(&harness.agent),
&mut ctx,
)
.await
.expect("message build");
let tail_msg = built.messages.iter().find(|m| {
m.get("role").and_then(|r| r.as_str()) == Some("system")
&& m.get("content")
.and_then(|c| c.as_str())
.is_some_and(|s| s.starts_with(TASK_CONTEXT_TAIL_MARKER))
});
let tail_content = tail_msg
.and_then(|m| m["content"].as_str())
.expect("tail message must be present");
assert!(
tail_content.contains("[Session Summary]"),
"summary must live inside the task context tail: {tail_content}"
);
assert!(
tail_content.contains("deploying a blog"),
"summary content must be present in the tail: {tail_content}"
);
let summary_only_messages = built
.messages
.iter()
.filter(|m| {
m.get("content")
.and_then(|c| c.as_str())
.is_some_and(|s| s.contains("[Session Summary]"))
})
.count();
assert_eq!(
summary_only_messages, 1,
"summary must appear exactly once (inside the tail), not as a separate message"
);
}
#[tokio::test]
async fn small_context_model_compacts_tool_schemas_without_dropping_tools() {
use crate::execution_policy::PolicyBundle;
use crate::memory::context_window::{estimate_tokens, estimate_tool_definition_tokens};
use crate::testing::{setup_test_agent, MockProvider};
use crate::traits::MessageStore;
let mut harness = setup_test_agent(MockProvider::new())
.await
.expect("test harness");
harness
.agent
.context_window_config
.model_budgets
.insert("gemma-4-26b".to_string(), 16_384);
harness
.state
.append_message(&msg("user", "List all available tools."))
.await
.expect("append user");
let verbose =
"Detailed operational guidance for selecting and safely using this tool. ".repeat(300);
let tool_defs: Vec<Value> = (0..20)
.map(|idx| {
json!({
"type": "function",
"function": {
"name": format!("tool_{idx}"),
"description": verbose,
"parameters": {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": verbose
},
"mode": {
"type": "string",
"description": verbose,
"enum": ["read", "write"]
}
},
"required": ["path"],
"additionalProperties": false
}
}
})
})
.collect();
assert!(estimate_tool_definition_tokens(&tool_defs) > 16_384);
let policy_bundle = PolicyBundle::from_scores(0.1, 0.1, 0.9);
let mut pending_system_messages = Vec::new();
let status_tx: Option<mpsc::Sender<StatusUpdate>> = None;
let mut ctx = MessageBuildCtx {
session_id: "test-session",
iteration: 1,
user_text: "List all available tools.",
current_attachments: &[],
completed_tool_calls: &[],
model: "gemma-4-26b",
core_prompt: "You are a helpful test assistant.",
task_context_tail: "",
tool_defs: &tool_defs,
policy_bundle: &policy_bundle,
pending_system_messages: &mut pending_system_messages,
empty_response_retry_pending: false,
status_tx: &status_tx,
};
let built = run_message_build_phase(
&crate::agent::services::AgentServices::new(&harness.agent),
&mut ctx,
)
.await
.expect("message build");
assert_eq!(built.tool_defs.len(), tool_defs.len());
let got_names: std::collections::HashSet<String> = built
.tool_defs
.iter()
.filter_map(|t| t["function"]["name"].as_str().map(str::to_string))
.collect();
let expected_names: std::collections::HashSet<String> = (0..tool_defs.len())
.map(|idx| format!("tool_{idx}"))
.collect();
assert_eq!(got_names, expected_names, "all tools must be preserved");
for tool in &built.tool_defs {
assert_eq!(
tool["function"]["parameters"]["properties"]["mode"]["enum"],
json!(["read", "write"])
);
}
let ordered_names: Vec<&str> = built
.tool_defs
.iter()
.filter_map(|t| t["function"]["name"].as_str())
.collect();
let mut sorted = ordered_names.clone();
sorted.sort();
assert_eq!(ordered_names, sorted, "tool order must be name-sorted");
let message_tokens =
estimate_tokens(&serde_json::to_string(&built.messages).expect("serialize messages"));
let tool_tokens = estimate_tool_definition_tokens(&built.tool_defs);
assert!(
message_tokens + tool_tokens + 1_536 <= 16_384,
"request estimate should fit Gemma context: messages={message_tokens}, tools={tool_tokens}"
);
}
/// After the full prompt is assembled for a model with a 16,384-token budget,
/// the request (messages + tools + response reserve) must still fit, and no
/// tool may be dropped.
///
/// The scenario is an iteration-2 build with a long core prompt, 38 tools of
/// eight verbose parameters each, one completed tool call, a stored tool
/// result, and a pending `FreshConversationContext` directive.
#[tokio::test]
async fn small_context_model_rechecks_budget_after_final_prompt_assembly() {
    use crate::execution_policy::PolicyBundle;
    use crate::memory::context_window::{estimate_tokens, estimate_tool_definition_tokens};
    use crate::testing::{setup_test_agent, MockProvider};
    use crate::traits::MessageStore;

    let mut harness = setup_test_agent(MockProvider::new())
        .await
        .expect("test harness");
    harness
        .agent
        .context_window_config
        .model_budgets
        .insert("gemma-4-26b".to_string(), 16_384);

    let user_turn = msg("user", "Can you test your tools?");
    harness
        .state
        .append_message(&user_turn)
        .await
        .expect("append user");
    let tool_turn = tool_msg(
        "system_info",
        "OS: macOS\nMemory: 64 GB\nHostname: workstation",
    );
    harness
        .state
        .append_message(&tool_turn)
        .await
        .expect("append tool");

    // Shared filler text for every description field.
    let verbose = "Detailed parameter guidance for local agent tool execution. ".repeat(40);
    let make_tool = |idx: usize| -> Value {
        let mut properties: serde_json::Map<String, Value> = serde_json::Map::new();
        for prop_idx in 0..8 {
            properties.insert(
                format!("parameter_{prop_idx}"),
                json!({
                    "type": "string",
                    "description": verbose,
                    "enum": ["one", "two", "three"]
                }),
            );
        }
        json!({
            "type": "function",
            "function": {
                "name": format!("tool_{idx}"),
                "description": verbose,
                "parameters": {
                    "type": "object",
                    "properties": properties,
                    "required": ["parameter_0"],
                    "additionalProperties": false
                }
            }
        })
    };
    let tool_defs: Vec<Value> = (0..38).map(make_tool).collect();

    let policy_bundle = PolicyBundle::from_scores(0.1, 0.1, 0.9);
    let mut pending_system_messages = vec![SystemDirective::FreshConversationContext];
    let status_tx: Option<mpsc::Sender<StatusUpdate>> = None;
    let completed_tool_calls = vec!["system_info({})".to_string()];
    let system_prompt = "Root agent operating guidance and tool policy. ".repeat(650);
    let mut ctx = MessageBuildCtx {
        session_id: "test-session",
        iteration: 2,
        user_text: "Can you test your tools?",
        current_attachments: &[],
        completed_tool_calls: &completed_tool_calls,
        model: "gemma-4-26b",
        core_prompt: &system_prompt,
        task_context_tail: "",
        tool_defs: &tool_defs,
        policy_bundle: &policy_bundle,
        pending_system_messages: &mut pending_system_messages,
        empty_response_retry_pending: false,
        status_tx: &status_tx,
    };
    let built = run_message_build_phase(
        &crate::agent::services::AgentServices::new(&harness.agent),
        &mut ctx,
    )
    .await
    .expect("message build");

    assert_eq!(built.tool_defs.len(), tool_defs.len());

    let serialized_messages = serde_json::to_string(&built.messages).expect("serialize messages");
    let message_tokens = estimate_tokens(&serialized_messages);
    let tool_tokens = estimate_tool_definition_tokens(&built.tool_defs);
    assert!(
        message_tokens + tool_tokens + RESPONSE_RESERVE_TOKENS <= 16_384,
        "final assembled request should fit: messages={message_tokens}, tools={tool_tokens}"
    );
}
/// Checks the layout of the built payload when a task-context tail is given:
/// message zero is exactly the core prompt, exactly one tail system message
/// exists, it sits immediately before the current user message, and the
/// session summary appears only inside that tail.
#[tokio::test]
async fn tail_precedes_current_turn_and_summary_lives_only_in_tail() {
use crate::agent::prefix_fingerprint::TASK_CONTEXT_TAIL_MARKER;
use crate::execution_policy::PolicyBundle;
use crate::testing::{setup_test_agent, MockProvider};
use crate::traits::MessageStore;
let harness = setup_test_agent(MockProvider::new())
.await
.expect("test harness");
// Stored history: one earlier exchange followed by the current user message.
harness
.state
.append_message(&msg("user", "Old question"))
.await
.expect("append old user");
harness
.state
.append_message(&msg("assistant", "Old answer"))
.await
.expect("append old assistant");
harness
.state
.append_message(&msg("user", "Current question"))
.await
.expect("append current user");
let core = "You are aidaemon. CORE PROMPT BODY.";
// The tail starts with the marker and carries both a summary and a date section.
let tail = format!(
"{TASK_CONTEXT_TAIL_MARKER}\n\n[Session Summary]\nUser deploying a blog.\n\n[Current Date & Time]\nMonday"
);
let policy_bundle = PolicyBundle::from_scores(0.1, 0.1, 0.9);
let tool_defs: Vec<Value> = Vec::new();
let mut pending_system_messages = Vec::new();
let status_tx: Option<mpsc::Sender<StatusUpdate>> = None;
let mut ctx = MessageBuildCtx {
session_id: "test-session",
iteration: 1,
user_text: "Current question",
current_attachments: &[],
completed_tool_calls: &[],
model: "mock-model",
core_prompt: core,
task_context_tail: &tail,
tool_defs: &tool_defs,
policy_bundle: &policy_bundle,
pending_system_messages: &mut pending_system_messages,
empty_response_retry_pending: false,
status_tx: &status_tx,
};
let built = run_message_build_phase(
&crate::agent::services::AgentServices::new(&harness.agent),
&mut ctx,
)
.await
.expect("message build");
assert_eq!(
built.messages[0]["content"].as_str(),
Some(core),
"message zero must be the core prompt bytes with no volatile suffix"
);
// Indices of every system message whose string content starts with the marker.
let tail_positions: Vec<usize> = built
.messages
.iter()
.enumerate()
.filter(|(_, m)| {
m.get("role").and_then(|r| r.as_str()) == Some("system")
&& m.get("content")
.and_then(|c| c.as_str())
.is_some_and(|s| s.starts_with(TASK_CONTEXT_TAIL_MARKER))
})
.map(|(i, _)| i)
.collect();
assert_eq!(tail_positions.len(), 1, "exactly one tail message expected");
let tail_pos = tail_positions[0];
// `rposition` picks the LAST user message whose content equals the current text.
let current_user_pos = built
.messages
.iter()
.rposition(|m| {
m.get("role").and_then(|r| r.as_str()) == Some("user")
&& m.get("content").and_then(|c| c.as_str()) == Some("Current question")
})
.expect("current user message present");
assert_eq!(
tail_pos + 1,
current_user_pos,
"tail must sit immediately before the current user message (boundary − 1)"
);
// Non-string content is treated as "" here, so it cannot trip this check.
assert!(
!built.messages[1]["content"]
.as_str()
.unwrap_or("")
.starts_with("[Session Summary]"),
"index 1 must not be a standalone session-summary message"
);
// Count messages whose string content mentions the summary header anywhere.
let summary_msgs = built
.messages
.iter()
.filter(|m| {
m.get("content")
.and_then(|c| c.as_str())
.is_some_and(|s| s.contains("[Session Summary]"))
})
.count();
assert_eq!(
summary_msgs, 1,
"summary appears exactly once, inside the tail"
);
}
#[tokio::test]
async fn within_task_tail_reuse_is_byte_identical() {
use crate::agent::prefix_fingerprint::TASK_CONTEXT_TAIL_MARKER;
use crate::execution_policy::PolicyBundle;
use crate::testing::{setup_test_agent, MockProvider};
use crate::traits::MessageStore;
let harness = setup_test_agent(MockProvider::new())
.await
.expect("test harness");
harness
.state
.append_message(&msg("user", "Same task"))
.await
.expect("append user");
let core = "CORE";
let tail = format!("{TASK_CONTEXT_TAIL_MARKER}\n\n[Current Date & Time]\nFixed timestamp");
let policy_bundle = PolicyBundle::from_scores(0.1, 0.1, 0.9);
let tool_defs: Vec<Value> = Vec::new();
let status_tx: Option<mpsc::Sender<StatusUpdate>> = None;
let extract_tail = |built: &MessageBuildData| -> String {
built
.messages
.iter()
.find(|m| {
m.get("content")
.and_then(|c| c.as_str())
.is_some_and(|s| s.starts_with(TASK_CONTEXT_TAIL_MARKER))
})
.and_then(|m| m["content"].as_str())
.expect("tail present")
.to_string()
};
let mut p1 = Vec::new();
let mut ctx1 = MessageBuildCtx {
session_id: "reuse-session",
iteration: 1,
user_text: "Same task",
current_attachments: &[],
completed_tool_calls: &[],
model: "mock-model",
core_prompt: core,
task_context_tail: &tail,
tool_defs: &tool_defs,
policy_bundle: &policy_bundle,
pending_system_messages: &mut p1,
empty_response_retry_pending: false,
status_tx: &status_tx,
};
let built1 = run_message_build_phase(
&crate::agent::services::AgentServices::new(&harness.agent),
&mut ctx1,
)
.await
.expect("build 1");
let mut p2 = Vec::new();
let mut ctx2 = MessageBuildCtx {
session_id: "reuse-session",
iteration: 2,
user_text: "Same task",
current_attachments: &[],
completed_tool_calls: &[],
model: "mock-model",
core_prompt: core,
task_context_tail: &tail,
tool_defs: &tool_defs,
policy_bundle: &policy_bundle,
pending_system_messages: &mut p2,
empty_response_retry_pending: false,
status_tx: &status_tx,
};
let built2 = run_message_build_phase(
&crate::agent::services::AgentServices::new(&harness.agent),
&mut ctx2,
)
.await
.expect("build 2");
assert_eq!(
extract_tail(&built1),
extract_tail(&built2),
"tail must be byte-identical across within-task iterations"
);
}
#[tokio::test]
async fn final_tool_order_is_name_sorted_after_mutations() {
use crate::execution_policy::PolicyBundle;
use crate::testing::{setup_test_agent, MockProvider};
use crate::traits::MessageStore;
let harness = setup_test_agent(MockProvider::new())
.await
.expect("test harness");
harness
.state
.append_message(&msg("user", "Do work"))
.await
.expect("append user");
let tool_defs: Vec<Value> = ["zebra_tool", "alpha_tool", "mango_tool"]
.iter()
.map(|name| {
json!({
"type": "function",
"function": {
"name": name,
"description": "x",
"parameters": {"type": "object", "properties": {}, "additionalProperties": false}
}
})
})
.collect();
let policy_bundle = PolicyBundle::from_scores(0.1, 0.1, 0.9);
let mut pending = Vec::new();
let status_tx: Option<mpsc::Sender<StatusUpdate>> = None;
let mut ctx = MessageBuildCtx {
session_id: "sort-session",
iteration: 1,
user_text: "Do work",
current_attachments: &[],
completed_tool_calls: &[],
model: "mock-model",
core_prompt: "CORE",
task_context_tail: "",
tool_defs: &tool_defs,
policy_bundle: &policy_bundle,
pending_system_messages: &mut pending,
empty_response_retry_pending: false,
status_tx: &status_tx,
};
let built = run_message_build_phase(
&crate::agent::services::AgentServices::new(&harness.agent),
&mut ctx,
)
.await
.expect("build");
let names: Vec<&str> = built
.tool_defs
.iter()
.filter_map(|d| d["function"]["name"].as_str())
.collect();
assert_eq!(
names,
vec!["alpha_tool", "mango_tool", "zebra_tool"],
"final tool_defs must be name-sorted"
);
}
use crate::execution_policy::PolicyBundle;
use crate::testing::{setup_test_agent, MockProvider, TestHarness};
/// Runs the message build phase with fixed test defaults and returns only the
/// resulting message list.
///
/// The session id, user text, and iteration come from the caller. Everything
/// else is fixed: no tools, no attachments, no completed tool calls, model
/// `"mock-model"`, core prompt `"CORE-PROMPT-BYTES"`, tail
/// `"[Task Context] tail"`. Panics if the build phase returns an error.
async fn build_payload(
    harness: &TestHarness,
    session: &str,
    user_text: &str,
    iteration: usize,
) -> Vec<Value> {
    let no_tools: Vec<Value> = Vec::new();
    let no_status: Option<mpsc::Sender<StatusUpdate>> = None;
    let policy = PolicyBundle::from_scores(0.1, 0.1, 0.9);
    let mut directives = Vec::new();

    let mut ctx = MessageBuildCtx {
        session_id: session,
        iteration,
        user_text,
        current_attachments: &[],
        completed_tool_calls: &[],
        model: "mock-model",
        core_prompt: "CORE-PROMPT-BYTES",
        task_context_tail: "[Task Context] tail",
        tool_defs: &no_tools,
        policy_bundle: &policy,
        pending_system_messages: &mut directives,
        empty_response_retry_pending: false,
        status_tx: &no_status,
    };

    let built = run_message_build_phase(
        &crate::agent::services::AgentServices::new(&harness.agent),
        &mut ctx,
    )
    .await
    .expect("message build");
    built.messages
}
/// Counts non-overlapping occurrences of `needle` in the JSON serialization of
/// `messages`. A serialization failure is treated as an empty string, so the
/// count is then zero for any non-empty needle.
fn count_occurrences(messages: &[Value], needle: &str) -> usize {
    let haystack = serde_json::to_string(messages).unwrap_or_default();
    haystack.matches(needle).count()
}
/// Earlier completed turns must appear in the payload, with their tool
/// results in a `"<tool>:"`-prefixed form, and must precede the tail message.
///
/// Seeds two completed turns (each with one tool result) and one in-progress
/// turn in session `s1`, marks `t3` as current, and inspects the payload
/// built for the third question.
#[tokio::test]
async fn archived_turns_are_whole_and_in_archived_form() {
use crate::agent::prefix_fingerprint::TASK_CONTEXT_TAIL_MARKER;
let harness = setup_test_agent(MockProvider::new()).await.unwrap();
let store = seed_store(&harness).await;
// `seed_turn` arguments, as used here: store, session, turn id, user text,
// optional (tool name, tool result), assistant text, status
// (presumably; helper is defined outside this view).
seed_turn(
&store,
"s1",
"t1",
"first question",
Some(("terminal", "exit_code: 0")),
"first answer",
"completed",
)
.await;
seed_turn(
&store,
"s1",
"t2",
"second question",
Some(("read_file", "10 lines")),
"second answer",
"completed",
)
.await;
// The current turn: no tool result and an empty assistant text.
seed_turn(
&store,
"s1",
"t3",
"third question",
None,
"",
"in_progress",
)
.await;
set_current_turn(&harness, "s1", "t3").await;
let messages = build_payload(&harness, "s1", "third question", 1).await;
let serialized = serde_json::to_string(&messages).unwrap();
// `build_payload` always passes this literal as the core prompt.
assert_eq!(messages[0]["content"].as_str(), Some("CORE-PROMPT-BYTES"));
assert!(serialized.contains("first question"), "{serialized}");
assert!(serialized.contains("second question"), "{serialized}");
// Some tool-role message must have content beginning with "terminal:".
assert!(
messages.iter().any(|m| {
m.get("role").and_then(|r| r.as_str()) == Some("tool")
&& m.get("content")
.and_then(|c| c.as_str())
.is_some_and(|c| c.starts_with("terminal:"))
}),
"archived terminal result should be summarized: {serialized}"
);
// Index of the first message whose string content starts with the tail marker.
let tail_pos = messages.iter().position(|m| {
m.get("content")
.and_then(|c| c.as_str())
.is_some_and(|c| c.starts_with(TASK_CONTEXT_TAIL_MARKER))
});
let tail_pos = tail_pos.expect("tail present");
// The archived user message is matched by exact content equality.
let first_q_pos = messages
.iter()
.position(|m| m.get("content").and_then(|c| c.as_str()) == Some("first question"))
.expect("first archived user present");
assert!(
first_q_pos < tail_pos,
"archived turns must precede the tail"
);
}
/// The payload prefix up to and including turn 1's answer must be identical
/// in two builds taken from different current turns.
///
/// Build A is taken while `t2` is current. `t2` is then ended with a
/// `TaskEnd` event, `t3` is seeded and made current, and build B is taken.
#[tokio::test]
async fn cross_turn_archived_render_is_byte_stable() {
let harness = setup_test_agent(MockProvider::new()).await.unwrap();
let store = seed_store(&harness).await;
seed_turn(
&store,
"s2",
"t1",
"marker-ONE question",
Some(("terminal", "exit_code: 0")),
"answer one",
"completed",
)
.await;
seed_turn(
&store,
"s2",
"t2",
"marker-TWO question",
None,
"",
"in_progress",
)
.await;
set_current_turn(&harness, "s2", "t2").await;
let build_a = build_payload(&harness, "s2", "marker-TWO question", 1).await;
// Close out t2 before starting t3 so that t2 is no longer the live turn.
store
.append(Event::new(
"s2",
EventType::TaskEnd,
json!({"status":"completed","turn_id":"t2"}),
))
.await
.unwrap();
seed_turn(
&store,
"s2",
"t3",
"marker-THREE question",
None,
"",
"in_progress",
)
.await;
set_current_turn(&harness, "s2", "t3").await;
let build_b = build_payload(&harness, "s2", "marker-THREE question", 1).await;
// Everything from message zero through the message whose content is exactly
// "answer one" (inclusive); this slice also contains the core prompt.
let extract_t1 = |msgs: &[Value]| -> Vec<Value> {
let end = msgs
.iter()
.position(|m| m.get("content").and_then(|c| c.as_str()) == Some("answer one"))
.expect("answer one present");
msgs[..=end].to_vec()
};
assert_eq!(
extract_t1(&build_a),
extract_t1(&build_b),
"archived turn 1 must be byte-identical across builds (render-cache hit)"
);
}
#[tokio::test]
async fn current_turn_is_full_not_summarized() {
let harness = setup_test_agent(MockProvider::new()).await.unwrap();
let store = seed_store(&harness).await;
let long_user = "DETAILED current request ".repeat(8);
store
.append(Event::new(
"s3",
EventType::UserMessage,
json!({"content": long_user, "turn_id":"tc"}),
))
.await
.unwrap();
set_current_turn(&harness, "s3", "tc").await;
let messages = build_payload(&harness, "s3", &long_user, 1).await;
assert!(
messages.iter().any(|m| {
m.get("role").and_then(|r| r.as_str()) == Some("user")
&& m.get("content").and_then(|c| c.as_str()) == Some(long_user.as_str())
}),
"current user message must be full/verbatim"
);
}
#[tokio::test]
async fn tail_and_core_positions_preserved() {
use crate::agent::prefix_fingerprint::TASK_CONTEXT_TAIL_MARKER;
let harness = setup_test_agent(MockProvider::new()).await.unwrap();
let store = seed_store(&harness).await;
seed_turn(
&store,
"s4",
"t1",
"old question",
None,
"old answer",
"completed",
)
.await;
seed_turn(
&store,
"s4",
"t2",
"current question",
None,
"",
"in_progress",
)
.await;
set_current_turn(&harness, "s4", "t2").await;
let messages = build_payload(&harness, "s4", "current question", 1).await;
assert_eq!(messages[0]["content"].as_str(), Some("CORE-PROMPT-BYTES"));
let tail_positions: Vec<usize> = messages
.iter()
.enumerate()
.filter(|(_, m)| {
m.get("content")
.and_then(|c| c.as_str())
.is_some_and(|c| c.starts_with(TASK_CONTEXT_TAIL_MARKER))
})
.map(|(i, _)| i)
.collect();
assert_eq!(tail_positions.len(), 1, "exactly one tail marker");
let tail_idx = tail_positions[0];
assert_eq!(
messages[tail_idx + 1]["role"].as_str(),
Some("user"),
"tail must sit at boundary − 1 (immediately before current user)"
);
assert_eq!(
messages[tail_idx + 1]["content"].as_str(),
Some("current question")
);
}
/// With a reduced default budget, building must record a turn anchor for the
/// session that differs from the pre-build value, and the oldest archived
/// turn must be absent from the payload.
///
/// Seeds four completed turns with large user messages plus one in-progress
/// turn in session `s5`, with `default_budget` lowered to 5900.
#[tokio::test]
async fn eviction_advances_anchor_and_evicts_whole_turns() {
    let mut harness = setup_test_agent(MockProvider::new()).await.unwrap();
    harness.agent.context_window_config.default_budget = 5900;
    let store = seed_store(&harness).await;

    // Padding that makes each archived user message 300 characters longer.
    let big = "lots of detail ".repeat(20);
    for i in 1..=4 {
        seed_turn(
            &store,
            "s5",
            &format!("t{i}"),
            &format!("OLDMARK-{i} {big}"),
            None,
            &format!("answer {i}"),
            "completed",
        )
        .await;
    }
    seed_turn(
        &store,
        "s5",
        "tcur",
        "current request",
        None,
        "",
        "in_progress",
    )
    .await;
    set_current_turn(&harness, "s5", "tcur").await;

    // Snapshot the anchor on either side of the build.
    let anchor_before = harness.agent.turn_anchors.read().await.get("s5").copied();
    let messages = build_payload(&harness, "s5", "current request", 1).await;
    let anchor_after = harness.agent.turn_anchors.read().await.get("s5").copied();

    assert!(anchor_after.is_some(), "anchor recorded");
    // `anchor_after` is known to be `Some` here, so a `None` before-value
    // already compares unequal; no separate `is_none()` escape hatch is needed.
    assert!(
        anchor_after != anchor_before,
        "eviction should advance the anchor"
    );

    let serialized = serde_json::to_string(&messages).unwrap();
    assert!(
        !serialized.contains("OLDMARK-1"),
        "oldest whole turn should be evicted: {serialized}"
    );
    assert!(serialized.contains("current request"));
}
/// Events appended to an already-archived turn must change that turn's
/// rendered messages and leave the other archived turn's messages unchanged.
///
/// Builds once, appends an `AssistantResponse` tool call plus its
/// `ToolResult` tagged with turn `t1`, builds again, and compares the `t1`
/// and `t2` message regions of both payloads.
#[tokio::test]
async fn late_write_re_renders_only_the_affected_turn() {
let harness = setup_test_agent(MockProvider::new()).await.unwrap();
let store = seed_store(&harness).await;
seed_turn(
&store,
"s6",
"t1",
"alpha question",
Some(("terminal", "exit_code: 0")),
"alpha answer",
"completed",
)
.await;
seed_turn(
&store,
"s6",
"t2",
"beta question",
Some(("read_file", "5 lines")),
"beta answer",
"completed",
)
.await;
seed_turn(
&store,
"s6",
"t3",
"current question",
None,
"",
"in_progress",
)
.await;
set_current_turn(&harness, "s6", "t3").await;
let first = build_payload(&harness, "s6", "current question", 1).await;
// Late write: a tool call with null content, attributed to the archived turn t1.
store
.append(Event::new(
"s6",
EventType::AssistantResponse,
json!({
"content": serde_json::Value::Null,
"tool_calls": [{ "id": "late-call", "name": "web_search", "arguments": "{}" }],
"turn_id": "t1",
}),
))
.await
.unwrap();
// The matching result for "late-call", also attributed to t1.
store
.append(Event::new(
"s6",
EventType::ToolResult,
json!({
"tool_call_id": "late-call",
"name": "web_search",
"result": "LATEWRITE-marker",
"success": true,
"duration_ms": 1,
"turn_id": "t1",
}),
))
.await
.unwrap();
let second = build_payload(&harness, "s6", "current question", 1).await;
// Inclusive slice from the first message equal to `start_marker` through
// the first message equal to `end_marker`.
let extract_region = |msgs: &[Value], start_marker: &str, end_marker: &str| -> Vec<Value> {
let s = msgs
.iter()
.position(|m| m.get("content").and_then(|c| c.as_str()) == Some(start_marker))
.expect("start marker");
let e = msgs
.iter()
.position(|m| m.get("content").and_then(|c| c.as_str()) == Some(end_marker))
.expect("end marker");
msgs[s..=e].to_vec()
};
assert_eq!(
extract_region(&first, "beta question", "beta answer"),
extract_region(&second, "beta question", "beta answer"),
"unaffected archived turn t2 must stay byte-identical"
);
// Half-open slice: from "alpha question" up to, but not including, "beta
// question". Ending at the next turn's start (not at "alpha answer") keeps
// any messages rendered after the answer inside the compared region.
let extract_t1 = |msgs: &[Value]| -> Vec<Value> {
let s = msgs
.iter()
.position(|m| m.get("content").and_then(|c| c.as_str()) == Some("alpha question"))
.expect("alpha question");
let e = msgs
.iter()
.position(|m| m.get("content").and_then(|c| c.as_str()) == Some("beta question"))
.expect("beta question");
msgs[s..e].to_vec()
};
let first_t1 = extract_t1(&first);
let second_t1 = extract_t1(&second);
assert_ne!(
first_t1, second_t1,
"affected archived turn t1 must be re-rendered after the late write"
);
// The re-rendered t1 region must contain a tool message starting with "web_search:".
assert!(
second_t1.iter().any(|m| {
m.get("role").and_then(|r| r.as_str()) == Some("tool")
&& m.get("content")
.and_then(|c| c.as_str())
.is_some_and(|c| c.starts_with("web_search:"))
}),
"late web_search step must appear (summarized) in the re-rendered turn"
);
}
/// When the current turn's user message is already stored, the serialized
/// payload must contain neither a `synthetic-current-user-` id nor the text
/// `age_collapse`.
#[tokio::test]
async fn no_synthetic_user_and_no_age_collapse_on_happy_path() {
    let harness = setup_test_agent(MockProvider::new()).await.unwrap();
    let store = seed_store(&harness).await;
    seed_turn(
        &store,
        "s7",
        "t1",
        "current question",
        None,
        "",
        "in_progress",
    )
    .await;
    set_current_turn(&harness, "s7", "t1").await;

    let messages = build_payload(&harness, "s7", "current question", 1).await;
    let serialized = serde_json::to_string(&messages).unwrap();

    let has_synthetic_id = serialized.contains("synthetic-current-user-");
    assert!(
        !has_synthetic_id,
        "no synthetic user id on the happy path: {serialized}"
    );
    let has_age_collapse = serialized.contains("age_collapse");
    assert!(!has_age_collapse);
}
/// When the current turn id has no stored events, the build must still end
/// its user messages with the supplied `user_text`.
///
/// Only a completed turn `t1` is seeded; the current turn is set to
/// `t-not-committed`, for which nothing was written to the store.
#[tokio::test]
async fn current_turn_fallback_injects_when_row_absent() {
    let harness = setup_test_agent(MockProvider::new()).await.unwrap();
    let store = seed_store(&harness).await;
    seed_turn(
        &store,
        "s8",
        "t1",
        "old question",
        None,
        "old answer",
        "completed",
    )
    .await;
    set_current_turn(&harness, "s8", "t-not-committed").await;

    let messages = build_payload(&harness, "s8", "RACE current text", 1).await;
    // Serialize up front, as the sibling tests do, so a failure prints the
    // payload itself and not the Debug form of a `Result` (`Ok("...")` with
    // escaped quotes).
    let serialized = serde_json::to_string(&messages).unwrap();

    // The LAST user-role message is the one that must carry the current text.
    let last_user = messages
        .iter()
        .rev()
        .find(|m| m.get("role").and_then(|r| r.as_str()) == Some("user"));
    assert!(
        last_user.is_some_and(
            |m| m.get("content").and_then(|c| c.as_str()) == Some("RACE current text")
        ),
        "fallback must inject the current user message: {serialized}"
    );
}
/// After a build under a reduced budget, the session's render cache (if one
/// exists) must not hold an entry for turn `t1`, and a turn anchor must be
/// recorded for the session.
#[tokio::test]
async fn render_cache_prunes_evicted_turns() {
let mut harness = setup_test_agent(MockProvider::new()).await.unwrap();
harness.agent.context_window_config.default_budget = 5900;
let store = seed_store(&harness).await;
let big = "detail ".repeat(20);
// Four completed turns whose user text is padded with `big`.
for i in 1..=4 {
seed_turn(
&store,
"s9",
&format!("t{i}"),
&format!("turn {i} {big}"),
None,
&format!("answer {i}"),
"completed",
)
.await;
}
seed_turn(
&store,
"s9",
"tcur",
"current request",
None,
"",
"in_progress",
)
.await;
set_current_turn(&harness, "s9", "tcur").await;
// The payload itself is not inspected; only the side effects on the agent are.
let _ = build_payload(&harness, "s9", "current request", 1).await;
let cache = harness.agent.turn_renders.read().await;
let session_cache = cache.get("s9");
// NOTE(review): when the session has no cache entry at all this check is
// skipped, so the test passes vacuously in that case; confirm that is intended.
if let Some(sc) = session_cache {
assert!(
!sc.contains_key("t1"),
"evicted turn t1 must be pruned from the render cache"
);
}
let anchor = harness.agent.turn_anchors.read().await.get("s9").copied();
assert!(anchor.is_some());
}
/// Each archived turn's unique marker may appear at most once in the payload,
/// and any marker that does appear must belong to a turn at or after the
/// recorded anchor.
///
/// Seeds twelve completed turns with distinct `UNIQUEMARK-NN-X` user texts
/// plus one in-progress current turn in session `s10`.
#[tokio::test]
async fn no_duplicate_pinned_history_path() {
let harness = setup_test_agent(MockProvider::new()).await.unwrap();
let store = seed_store(&harness).await;
for i in 1..=12 {
seed_turn(
&store,
"s10",
&format!("t{i}"),
&format!("UNIQUEMARK-{i:02}-X"),
None,
&format!("reply-{i}"),
"completed",
)
.await;
}
seed_turn(
&store,
"s10",
"tcur",
"current request UNIQUEMARK-CUR",
None,
"",
"in_progress",
)
.await;
set_current_turn(&harness, "s10", "tcur").await;
let messages = build_payload(&harness, "s10", "current request UNIQUEMARK-CUR", 1).await;
// Part 1: no archived marker may be emitted more than once.
for i in 1..=12 {
let needle = format!("UNIQUEMARK-{i:02}-X");
let occ = count_occurrences(&messages, &needle);
assert!(
occ <= 1,
"marker {needle} must occur at most once, got {occ}"
);
}
let anchor = harness
.agent
.turn_anchors
.read()
.await
.get("s10")
.copied()
.expect("anchor recorded");
// presumably a second handle to the same underlying store; verify against `seed_store`
let store2 = seed_store(&harness).await;
let turns = store2.get_turns_from_anchor("s10", anchor).await.unwrap();
let serialized = serde_json::to_string(&messages).unwrap();
// Part 2: every marker that was emitted must come from a turn whose
// `turn_seq` is at or after the anchor.
for i in 1..=12 {
let needle = format!("UNIQUEMARK-{i:02}-X");
if serialized.contains(&needle) {
let belongs = turns.iter().any(|t| {
t.turn_seq >= anchor
&& t.messages
.iter()
.any(|m| m.content.as_deref().is_some_and(|c| c.contains(&needle)))
});
assert!(
belongs,
"emitted marker {needle} must belong to a turn >= anchor"
);
}
}
// Compile-time guard: the pattern lists every field with no `..` rest
// pattern, so adding a field to `MessageBuildCtx` stops this closure from
// compiling until the new field is listed here. The closure is never called.
let _assert_no_pinned = |c: &MessageBuildCtx| {
let MessageBuildCtx {
session_id: _,
iteration: _,
user_text: _,
current_attachments: _,
completed_tool_calls: _,
model: _,
core_prompt: _,
task_context_tail: _,
tool_defs: _,
policy_bundle: _,
pending_system_messages: _,
empty_response_retry_pending: _,
status_tx: _,
} = c;
};
}
/// A user message whose content is an array of parts (text + image) must
/// survive a `retain` that keeps non-user messages and only those user
/// messages matching the current user text.
#[test]
fn fresh_context_isolation_preserves_multimodal_user_message() {
    let user_text = "What do you see here?";
    let system_message = json!({"role": "system", "content": "core"});
    let multimodal_user = json!({
        "role": "user",
        "content": [
            {"type": "text", "text": user_text},
            {"type": "image_url", "image_url": {"url": "data:image/png;base64,abc"}}
        ]
    });
    let mut messages = vec![system_message, multimodal_user];

    // Precondition: the fixture holds only system and user messages.
    let non_system_non_user_count = messages
        .iter()
        .map(|m| m.get("role").and_then(Value::as_str).unwrap_or(""))
        .filter(|role| !matches!(*role, "system" | "user"))
        .count();
    assert_eq!(non_system_non_user_count, 0);

    // Keep every non-user message; keep a user message only if it matches.
    messages.retain(|m| {
        let is_user = m.get("role").and_then(Value::as_str) == Some("user");
        if !is_user {
            return true;
        }
        m.get("content")
            .is_some_and(|content| user_message_content_matches(content, user_text))
    });

    assert_eq!(messages.len(), 2);
    let user_survived = messages
        .iter()
        .any(|m| m.get("role").and_then(Value::as_str) == Some("user"));
    assert!(
        user_survived,
        "multimodal user message must survive fresh-context retain"
    );
}
}