use super::recall_guardrails::text_relates_to_critical_identity;
use super::*;
use crate::execution_policy::PolicyBundle;
use crate::traits::ConversationSummary;
/// Borrowed inputs for one run of the message-build phase.
///
/// Everything here is read-only except `pending_system_messages`, which the
/// phase may append to and then drains completely.
pub(super) struct MessageBuildCtx<'a> {
/// Session whose history is loaded and whose current turn id is looked up.
pub session_id: &'a str,
/// Iteration counter for the current turn; scales the history limit and gates
/// the execution checkpoint, tool-loop system prompt and "thinking" status
/// (all of which only apply when `iteration > 1`).
pub iteration: usize,
/// Text of the user message currently being answered.
pub user_text: &'a str,
/// Tool calls completed so far in this turn; summarized into the execution checkpoint.
pub completed_tool_calls: &'a [String],
/// Model name, passed to the context-window budget computation.
pub model: &'a str,
/// Base system prompt; inserted as the first message of the payload.
pub system_prompt: &'a str,
/// Pinned messages that are placed ahead of recent history before deduplication.
pub pinned_memories: &'a [Message],
/// Tool definitions; serialized only to estimate their token cost.
pub tool_defs: &'a [Value],
/// Policy providing the context budget and the model profile.
pub policy_bundle: &'a PolicyBundle,
/// Optional session summary, injected as a `[Session Summary]` system message.
pub session_summary: &'a Option<ConversationSummary>,
/// Queued directives; rendered as trailing system messages and drained by the phase.
pub pending_system_messages: &'a mut Vec<SystemDirective>,
/// When true (and the session is not a trigger session) history is reduced to
/// the immediate parent exchange and an empty-response retry directive is appended.
pub empty_response_retry_pending: bool,
/// Optional status channel; receives `StatusUpdate::Thinking` on iterations after the first.
pub status_tx: &'a Option<mpsc::Sender<StatusUpdate>>,
}
/// Output of the message-build phase.
pub(super) struct MessageBuildData {
/// Final chat messages as JSON objects, starting with the system prompt.
pub messages: Vec<Value>,
}
/// Maximum number of characters kept from a parent message during empty-response recovery.
const EMPTY_RETRY_MAX_PARENT_CHARS: usize = 800;
/// Limit passed to `truncate_for_resume` for the "Active request" line of the checkpoint.
const EXECUTION_CHECKPOINT_MAX_REQUEST_CHARS: usize = 240;
/// Limit passed to `truncate_for_resume` for the "Completed work so far" section of the checkpoint.
const EXECUTION_CHECKPOINT_MAX_ACTIVITY_CHARS: usize = 900;
/// Limit passed to `truncate_for_resume` for the "Latest concrete evidence" entry of the checkpoint.
const EXECUTION_CHECKPOINT_MAX_EVIDENCE_CHARS: usize = 500;
/// Returns the message's string `content`, trimmed of surrounding whitespace.
///
/// Yields `None` when the field is absent, is not a JSON string, or is blank
/// after trimming.
fn trimmed_message_content(message: &Value) -> Option<String> {
    let raw = message.get("content")?.as_str()?;
    let text = raw.trim();
    if text.is_empty() {
        None
    } else {
        Some(text.to_owned())
    }
}
fn truncate_parent_for_empty_retry(content: &str) -> String {
let mut out: String = content.chars().take(EMPTY_RETRY_MAX_PARENT_CHARS).collect();
if content.chars().count() > EMPTY_RETRY_MAX_PARENT_CHARS {
out.push_str("...");
}
out
}
fn build_empty_response_retry_messages(existing: &[Value], user_text: &str) -> Vec<Value> {
let current_idx = existing.iter().rposition(|m| {
m.get("role").and_then(|r| r.as_str()) == Some("user")
&& m.get("content").and_then(|c| c.as_str()) == Some(user_text)
});
let search_end = current_idx.unwrap_or(existing.len());
let prev_assistant = existing
.iter()
.take(search_end)
.rev()
.find(|m| m.get("role").and_then(|r| r.as_str()) == Some("assistant"))
.and_then(trimmed_message_content);
let prev_user = existing
.iter()
.take(search_end)
.rev()
.find(|m| {
if m.get("role").and_then(|r| r.as_str()) != Some("user") {
return false;
}
m.get("content")
.and_then(|c| c.as_str())
.is_some_and(|content| content != user_text && !content.trim().is_empty())
})
.and_then(trimmed_message_content);
let mut recovered = Vec::new();
if let Some(prev_user) = prev_user {
recovered.push(json!({
"role": "user",
"content": truncate_parent_for_empty_retry(&prev_user),
}));
}
if let Some(prev_assistant) = prev_assistant {
recovered.push(json!({
"role": "assistant",
"content": truncate_parent_for_empty_retry(&prev_assistant),
}));
}
recovered.push(json!({
"role": "user",
"content": user_text,
}));
recovered
}
/// Reports whether `tool_name` is one of the tools whose results are skipped
/// when picking the "latest concrete evidence" for an execution checkpoint.
///
/// The comparison is exact (case-sensitive, no trimming).
fn tool_is_low_info_for_checkpoint(tool_name: &str) -> bool {
    const LOW_INFO_TOOLS: [&str; 6] = [
        "write_file",
        "edit_file",
        "manage_memories",
        "manage_people",
        "remember_fact",
        "check_environment",
    ];
    LOW_INFO_TOOLS.contains(&tool_name)
}
/// Renders the `[SYSTEM] EXECUTION CHECKPOINT` text for an in-progress request.
///
/// Returns `None` when `user_text` is blank or no tool calls have completed.
/// Otherwise the text contains the (truncated) active request, the categorized
/// completed tool calls when that summary is non-blank, the most recent
/// non-blank tool result in `current_interaction` that does not come from a
/// low-info tool, and a closing instruction to continue the same request.
fn build_execution_checkpoint_message(
    user_text: &str,
    completed_tool_calls: &[String],
    current_interaction: &[&Message],
) -> Option<String> {
    let request = user_text.trim();
    if request.is_empty() || completed_tool_calls.is_empty() {
        return None;
    }

    let activity_summary = super::post_task::categorize_tool_calls(completed_tool_calls);

    // Walk backwards so the first qualifying tool result is the most recent one.
    let mut latest_evidence: Option<String> = None;
    for message in current_interaction.iter().rev() {
        if message.role != "tool" {
            continue;
        }
        let tool_name = message.tool_name.as_deref().unwrap_or("").trim();
        if tool_name.is_empty() || tool_is_low_info_for_checkpoint(tool_name) {
            continue;
        }
        let Some(content) = message.primary_content() else {
            continue;
        };
        let content = content.trim();
        if content.is_empty() {
            continue;
        }
        latest_evidence = Some(format!(
            "- {}: {}",
            tool_name,
            truncate_for_resume(content, EXECUTION_CHECKPOINT_MAX_EVIDENCE_CHARS)
        ));
        break;
    }

    let mut sections: Vec<String> = Vec::with_capacity(7);
    sections.push(
        "[SYSTEM] EXECUTION CHECKPOINT: You are still working on the same active request from this turn."
            .to_string(),
    );
    sections.push(format!(
        "Active request: {}",
        truncate_for_resume(request, EXECUTION_CHECKPOINT_MAX_REQUEST_CHARS)
    ));

    let activity = activity_summary.trim();
    if !activity.is_empty() {
        sections.push("Completed work so far:".to_string());
        sections.push(truncate_for_resume(
            activity,
            EXECUTION_CHECKPOINT_MAX_ACTIVITY_CHARS,
        ));
    }

    if let Some(evidence) = latest_evidence {
        sections.push("Latest concrete evidence:".to_string());
        sections.push(evidence);
    }

    sections.push("Continue from this checkpoint. Do NOT reset into a generic availability reply or ask what the user wants help with. Either take the next step for this request, answer with concrete results if it is complete, or state the blocker tied to this request.".to_string());

    Some(sections.join("\n"))
}
/// Assembles the JSON chat-message list for the next model call of this turn.
///
/// The phase runs these steps in order:
/// 1. Load recent history (injecting the current user message when it is missing)
///    and merge it behind the pinned memories, deduplicating by message id.
/// 2. Drop old tool results, keeping the current interaction and the one before
///    it; the latter's tool results are later replaced by summaries.
/// 3. Trim old user/assistant pairs with an adaptive sliding window (reset to
///    zero after an idle gap of more than two hours).
/// 4. Remove older copies of the current prompt, then serialize to JSON, repair
///    ordering, guarantee the current user message is present, mark it as the
///    current task, and cut stray messages after its tool chain.
/// 5. Collapse repeated tool errors, fit to the context budget, and optionally
///    reduce to the empty-response recovery context.
/// 6. Prepend the system prompt and session summary; append the execution
///    checkpoint and any pending directives; record token-estimate metrics.
///
/// Side effects: drains `ctx.pending_system_messages`, may send a
/// `StatusUpdate::Thinking`, updates `POLICY_METRICS` counters, and logs.
///
/// # Errors
/// Returns the error from `load_recent_history` when history cannot be loaded.
pub(super) async fn run_message_build_phase(
    services: &super::services::AgentServices<'_>,
    ctx: &mut MessageBuildCtx<'_>,
) -> anyhow::Result<MessageBuildData> {
    // Unpack the context once so the rest of the phase reads plain locals.
    let agent = services.agent;
    let session_id = ctx.session_id;
    let iteration = ctx.iteration;
    let user_text = ctx.user_text;
    let completed_tool_calls = ctx.completed_tool_calls;
    let model = ctx.model;
    let system_prompt = ctx.system_prompt;
    let pinned_memories = ctx.pinned_memories;
    let tool_defs = ctx.tool_defs;
    let policy_bundle = ctx.policy_bundle;
    let session_summary = ctx.session_summary;
    let pending_system_messages = &mut *ctx.pending_system_messages;
    let empty_response_retry_pending = ctx.empty_response_retry_pending;
    let status_tx = ctx.status_tx;

    // History limit grows with the iteration count: at least 40, at most 120.
    let history_limit = 40_usize.max(iteration.saturating_mul(3).min(120));
    let mut recent_history = agent.load_recent_history(session_id, history_limit).await?;

    // If the latest user message in history is not the current prompt, append a
    // synthetic copy so the boundary searches below have something to find.
    let last_user_msg = recent_history.iter().rev().find(|m| m.role == "user");
    let user_msg_present = last_user_msg.is_some_and(|m| m.content.as_deref() == Some(user_text));
    if !user_msg_present && !user_text.is_empty() {
        let synthetic_turn_id = agent.current_turn_ids.read().await.get(session_id).cloned();
        recent_history.push(Message {
            id: format!("synthetic-user-{}", uuid::Uuid::new_v4()),
            session_id: session_id.to_string(),
            role: "user".to_string(),
            content: Some(user_text.to_string()),
            tool_call_id: None,
            tool_name: None,
            tool_calls_json: None,
            created_at: chrono::Utc::now(),
            importance: 1.0,
            turn_id: synthetic_turn_id,
            ..Message::runtime_defaults()
        });
        info!(
            session_id,
            iteration, "Injected current user message into history (was outside event window)"
        );
    }

    // Pinned memories first, then history; the first occurrence of each id wins.
    let mut seen_ids: std::collections::HashSet<&String> = std::collections::HashSet::new();
    let deduped_msgs: Vec<&Message> = pinned_memories
        .iter()
        .chain(recent_history.iter())
        .filter(|m| seen_ids.insert(&m.id))
        .collect();

    // Messages that relate to critical identity, plus one neighbour before and
    // two after, are exempt from the trimming passes below. They are tracked by
    // message id rather than by position: every trimming pass re-enumerates the
    // list, so positions recorded here would go stale after the first pass.
    let identity_preserve_ids: std::collections::HashSet<&str> = deduped_msgs
        .iter()
        .enumerate()
        .filter_map(|(idx, msg)| {
            let content = msg.content.as_deref()?;
            if text_relates_to_critical_identity(content) {
                Some(idx)
            } else {
                None
            }
        })
        .flat_map(|idx| {
            let start = idx.saturating_sub(1);
            let end = (idx + 2).min(deduped_msgs.len().saturating_sub(1));
            start..=end
        })
        .filter_map(|idx| deduped_msgs.get(idx).copied())
        .map(|m| m.id.as_str())
        .collect();

    // Locate the start of the current interaction: prefer the first user message
    // carrying the current turn id, else the last user message matching the prompt.
    let current_turn_id: Option<String> =
        agent.current_turn_ids.read().await.get(session_id).cloned();
    let last_user_pos: Option<usize> = current_turn_id
        .as_deref()
        .and_then(|tid| {
            deduped_msgs
                .iter()
                .position(|m| m.role == "user" && m.turn_id.as_deref() == Some(tid))
        })
        .or_else(|| {
            deduped_msgs
                .iter()
                .rposition(|m| m.role == "user" && m.content.as_deref() == Some(user_text))
        });
    if last_user_pos.is_none() {
        warn!(
            session_id,
            iteration,
            total = deduped_msgs.len(),
            "Collapse boundary: last_user_pos=None (should be rare after synthetic injection)"
        );
    }
    let pre_collapse_len = deduped_msgs.len();

    // "Prior 1" is the interaction immediately before the current one; its tool
    // results are kept but summarized when the JSON payload is built.
    let prior_1_start: Option<usize> = last_user_pos.and_then(|boundary| {
        deduped_msgs[..boundary]
            .iter()
            .rposition(|m| m.role == "user")
    });
    let prior_1_tool_ids: std::collections::HashSet<String> =
        if let (Some(p1_start), Some(boundary)) = (prior_1_start, last_user_pos) {
            deduped_msgs[p1_start..boundary]
                .iter()
                .filter(|m| m.role == "tool")
                .map(|m| m.id.clone())
                .collect()
        } else {
            std::collections::HashSet::new()
        };

    // Age-based tool-result clearing.
    let deduped_msgs: Vec<&Message> = if let Some(boundary) = last_user_pos {
        // `p1` is never past `boundary`, so `i >= p1` covers both the current
        // interaction and Prior 1; anything older loses its tool results unless
        // the message is identity-preserved.
        let p1 = prior_1_start.unwrap_or(boundary);
        deduped_msgs
            .into_iter()
            .enumerate()
            .filter(|(i, m)| {
                *i >= p1 || m.role != "tool" || identity_preserve_ids.contains(m.id.as_str())
            })
            .map(|(_, m)| m)
            .collect()
    } else {
        // No boundary found: fall back to keeping only the most recent tool results.
        const KEEP_RECENT_TOOL_RESULTS: usize = 8;
        let tool_positions: Vec<usize> = deduped_msgs
            .iter()
            .enumerate()
            .filter(|(_, m)| m.role == "tool")
            .map(|(i, _)| i)
            .collect();
        let protect_from = if tool_positions.len() > KEEP_RECENT_TOOL_RESULTS {
            tool_positions[tool_positions.len() - KEEP_RECENT_TOOL_RESULTS]
        } else {
            0
        };
        warn!(
            session_id,
            iteration,
            total_tool_results = tool_positions.len(),
            protect_from,
            "Current user message not in history — using safe collapse (keeping recent tool results)"
        );
        deduped_msgs
            .into_iter()
            .enumerate()
            .filter(|(i, m)| {
                m.role != "tool"
                    || *i >= protect_from
                    || identity_preserve_ids.contains(m.id.as_str())
            })
            .map(|(_, m)| m)
            .collect()
    };
    let collapsed = pre_collapse_len.saturating_sub(deduped_msgs.len());
    if collapsed > 0 || !prior_1_tool_ids.is_empty() {
        info!(
            session_id,
            dropped = collapsed,
            summarized = prior_1_tool_ids.len(),
            "Age-based tool result clearing: dropped Prior 2+ results, summarizing Prior 1 results"
        );
    }

    // Adaptive sliding window over the older user/assistant pairs.
    let collapse_boundary = deduped_msgs
        .iter()
        .rposition(|m| m.role == "user" && m.content.as_deref() == Some(user_text))
        .or_else(|| deduped_msgs.iter().rposition(|m| m.role == "user"));
    let deduped_msgs: Vec<&Message> = if let Some(boundary) = collapse_boundary {
        use crate::memory::context_window::estimate_tokens;
        let old_user_positions: Vec<usize> = deduped_msgs
            .iter()
            .enumerate()
            .filter(|(i, m)| *i < boundary && m.role == "user")
            .map(|(i, _)| i)
            .collect();
        if old_user_positions.is_empty() {
            deduped_msgs
        } else {
            // One (user_tokens, assistant_tokens) entry per old user message; a
            // pair extends up to the next old user message or the boundary.
            let skeleton_pairs: Vec<(usize, usize)> = old_user_positions
                .iter()
                .enumerate()
                .map(|(pair_idx, &user_pos)| {
                    let pair_end = if pair_idx + 1 < old_user_positions.len() {
                        old_user_positions[pair_idx + 1]
                    } else {
                        boundary
                    };
                    let user_tokens =
                        estimate_tokens(deduped_msgs[user_pos].content.as_deref().unwrap_or(""));
                    let assistant_tokens: usize = deduped_msgs[user_pos + 1..pair_end]
                        .iter()
                        .filter(|m| m.role == "assistant")
                        .map(|m| estimate_tokens(m.content.as_deref().unwrap_or("")))
                        .sum();
                    (user_tokens, assistant_tokens)
                })
                .collect();
            let system_tokens = estimate_tokens(system_prompt);
            let tools_json = serde_json::to_string(tool_defs).unwrap_or_default();
            let tools_tokens = estimate_tokens(&tools_json);
            let pinned_tokens: usize = pinned_memories
                .iter()
                .map(|m| estimate_tokens(m.content.as_deref().unwrap_or("")))
                .sum();
            // Fixed budget used only for sizing the window; the real context
            // budget is enforced later by `fit_messages_with_source_quotas`.
            let model_budget = 128_000usize;
            let available_budget =
                model_budget.saturating_sub(system_tokens + tools_tokens + pinned_tokens);
            let computed_window_size =
                super::sliding_window::calculate_window_size(&skeleton_pairs, available_budget);
            // An idle gap is detected when the message just before the boundary
            // is more than 7200 seconds (two hours) old.
            let idle_gap_detected = boundary > 0
                && deduped_msgs
                    .get(boundary.saturating_sub(1))
                    .is_some_and(|m| {
                        let now = chrono::Utc::now();
                        now.signed_duration_since(m.created_at).num_seconds() > 7200
                    });
            let window_size = if idle_gap_detected {
                info!(
                    session_id,
                    "Idle gap detected (>2h): resetting sliding window to 0"
                );
                0
            } else {
                computed_window_size
            };
            let keep_from = if window_size == 0 {
                boundary
            } else if old_user_positions.len() > window_size {
                old_user_positions[old_user_positions.len() - window_size]
            } else {
                0
            };
            // Measure against the list as it enters this pass so the log below
            // reports only what the window itself removed.
            let pre_window_len = deduped_msgs.len();
            let trimmed: Vec<&Message> = deduped_msgs
                .into_iter()
                .enumerate()
                .filter(|(i, m)| {
                    *i >= keep_from || identity_preserve_ids.contains(m.id.as_str())
                })
                .map(|(_, m)| m)
                .collect();
            if trimmed.len() < pre_window_len {
                info!(
                    session_id,
                    old_pairs_trimmed = pre_window_len - trimmed.len(),
                    window_size,
                    available_budget,
                    "Adaptive sliding window: trimmed old conversation pairs"
                );
            }
            trimmed
        }
    } else {
        deduped_msgs
    };

    // Drop older user messages identical to the current prompt, together with
    // the assistant reply that directly follows each of them.
    let deduped_msgs: Vec<&Message> = {
        let boundary = deduped_msgs
            .iter()
            .rposition(|m| m.role == "user" && m.content.as_deref() == Some(user_text))
            .or_else(|| deduped_msgs.iter().rposition(|m| m.role == "user"));
        if let Some(boundary) = boundary {
            let mut skip_indices = std::collections::HashSet::new();
            for (i, m) in deduped_msgs.iter().enumerate() {
                if i < boundary && m.role == "user" && m.content.as_deref() == Some(user_text) {
                    skip_indices.insert(i);
                    if i + 1 < boundary && deduped_msgs[i + 1].role == "assistant" {
                        skip_indices.insert(i + 1);
                    }
                }
            }
            if !skip_indices.is_empty() {
                info!(
                    session_id,
                    duplicates_removed = skip_indices.len(),
                    "Removed duplicate old user messages matching current prompt"
                );
            }
            deduped_msgs
                .into_iter()
                .enumerate()
                .filter(|(i, _)| !skip_indices.contains(i))
                .map(|(_, m)| m)
                .collect()
        } else {
            deduped_msgs
        }
    };

    // On later iterations, prepare a checkpoint describing progress so far. It
    // is computed here (from typed messages) and appended near the end.
    let execution_checkpoint = if iteration > 1 {
        let current_boundary = deduped_msgs
            .iter()
            .rposition(|m| m.role == "user" && m.content.as_deref() == Some(user_text))
            .or_else(|| deduped_msgs.iter().rposition(|m| m.role == "user"));
        let current_interaction: Vec<&Message> = current_boundary
            .map(|boundary| deduped_msgs.iter().skip(boundary).copied().collect())
            .unwrap_or_default();
        build_execution_checkpoint_message(user_text, completed_tool_calls, &current_interaction)
    } else {
        None
    };

    // Assistant messages before the current interaction get truncated, except
    // the most recent one and any that relate to critical identity.
    let old_interaction_assistant_ids: std::collections::HashSet<&str> = if let Some(boundary) =
        deduped_msgs
            .iter()
            .rposition(|m| m.role == "user" && m.content.as_deref() == Some(user_text))
            .or_else(|| deduped_msgs.iter().rposition(|m| m.role == "user"))
    {
        let prior_assistant_id: Option<&str> = (0..boundary)
            .rev()
            .find(|&i| deduped_msgs[i].role == "assistant")
            .map(|i| deduped_msgs[i].id.as_str());
        deduped_msgs
            .iter()
            .enumerate()
            .filter(|(i, m)| {
                *i < boundary
                    && m.role == "assistant"
                    && Some(m.id.as_str()) != prior_assistant_id
                    && !m
                        .content
                        .as_deref()
                        .is_some_and(text_relates_to_critical_identity)
            })
            .map(|(_, m)| m.id.as_str())
            .collect()
    } else {
        std::collections::HashSet::new()
    };

    // Tool-call ids that still have a named tool result in the list; assistant
    // tool calls without a surviving result are stripped below.
    let tool_result_ids: std::collections::HashSet<&str> = deduped_msgs
        .iter()
        .filter(|m| m.role == "tool" && m.tool_name.as_ref().is_some_and(|n| !n.is_empty()))
        .filter_map(|m| m.tool_call_id.as_deref())
        .collect();

    // Tool-call id -> (name, arguments); only needed when Prior 1 results are summarized.
    let tool_call_info: std::collections::HashMap<String, (String, String)> =
        if !prior_1_tool_ids.is_empty() {
            let mut map = std::collections::HashMap::new();
            for m in deduped_msgs.iter() {
                if m.role == "assistant" {
                    if let Some(tc_json) = &m.tool_calls_json {
                        if let Ok(tcs) = serde_json::from_str::<Vec<ToolCall>>(tc_json) {
                            for tc in &tcs {
                                map.insert(tc.id.clone(), (tc.name.clone(), tc.arguments.clone()));
                            }
                        }
                    }
                }
            }
            map
        } else {
            std::collections::HashMap::new()
        };

    // Serialize to the JSON wire format.
    let mut messages: Vec<Value> = deduped_msgs
        .iter()
        // Tool messages without a tool name are dropped outright.
        .filter(|m| !(m.role == "tool" && m.tool_name.as_ref().is_none_or(|n| n.is_empty())))
        .filter_map(|m| {
            let is_old_assistant = old_interaction_assistant_ids.contains(m.id.as_str());
            let is_identity_critical = m
                .content
                .as_deref()
                .is_some_and(text_relates_to_critical_identity);
            let content =
                if m.role == "tool" && prior_1_tool_ids.contains(&m.id) && !is_identity_critical {
                    // Prior 1 tool result: replace the body with a summary.
                    let tc_id = m.tool_call_id.as_deref().unwrap_or("");
                    let (tool_name, args_json) = tool_call_info
                        .get(tc_id)
                        .map(|(n, a)| (n.as_str(), a.as_str()))
                        .unwrap_or_else(|| (m.tool_name.as_deref().unwrap_or("unknown"), ""));
                    let result_content = m.content.as_deref().unwrap_or("");
                    Some(super::sliding_window::summarize_tool_result(
                        tool_name,
                        args_json,
                        result_content,
                    ))
                } else if is_old_assistant {
                    // Truncate by character count. `char_indices().nth(limit)`
                    // yields the byte offset where the kept prefix ends, and is
                    // `None` when the content has at most `limit` characters, so
                    // the ellipsis is only added when something was really cut.
                    m.content.as_ref().map(|c| {
                        match c.char_indices().nth(MAX_OLD_ASSISTANT_CONTENT_CHARS) {
                            Some((cut, _)) => format!("{}…", &c[..cut]),
                            None => c.clone(),
                        }
                    })
                } else {
                    m.content.clone()
                };
            // Skip plain assistant messages that are canned failure notices.
            if m.role == "assistant"
                && m.tool_calls_json.is_none()
                && content.as_deref().is_some_and(|c| {
                    let t = c.trim_start();
                    t.starts_with("I wasn't able to process that request.")
                        || t.starts_with("I wasn't able to complete this task.")
                        || t.starts_with("I made some progress but wasn't able to fully complete")
                        || t.starts_with("I seem to be stuck on this task.")
                        || t.starts_with("I've reached my processing limit")
                        || t.starts_with("This goal hit its daily processing budget")
                        || t.starts_with("This scheduled goal hit its daily processing budget")
                        || t.starts_with("This scheduled run hit its per-run processing budget")
                        || t.starts_with("I sent the requested file(s), but ran into issues")
                        || t.starts_with(
                            "I completed the main deliverable but wasn't able to finish",
                        )
                })
            {
                return None;
            }
            let mut obj = json!({
                "role": m.role,
                "content": content,
            });
            if let Some(tc_json) = &m.tool_calls_json {
                if let Ok(tcs) = serde_json::from_str::<Vec<ToolCall>>(tc_json) {
                    // Keep only tool calls whose result is still present.
                    let filtered: Vec<Value> = tcs
                        .iter()
                        .filter(|tc| tool_result_ids.contains(tc.id.as_str()))
                        .map(|tc| {
                            let mut val = json!({
                                "id": tc.id,
                                "type": "function",
                                "function": {
                                    "name": tc.name,
                                    "arguments": tc.arguments
                                }
                            });
                            if let Some(ref extra) = tc.extra_content {
                                val["extra_content"] = extra.clone();
                            }
                            val
                        })
                        .collect();
                    if !filtered.is_empty() {
                        obj["tool_calls"] = json!(filtered);
                        if m.content.is_none() {
                            obj["content"] = Value::Null;
                        }
                    } else if m.content.is_none()
                        || m.content.as_deref().is_some_and(|c| c.trim().is_empty())
                    {
                        // All tool calls were stripped and there is no text:
                        // substitute a placeholder instead of an empty message.
                        obj["content"] = json!("[Action completed]");
                    }
                }
            }
            if let Some(name) = &m.tool_name {
                if !name.is_empty() {
                    obj["name"] = json!(name);
                }
            }
            if let Some(tcid) = &m.tool_call_id {
                obj["tool_call_id"] = json!(tcid);
            }
            Some(obj)
        })
        .collect();

    // Second guard on the serialized form: a tool message must carry a name.
    messages.retain(|m| {
        if m.get("role").and_then(|r| r.as_str()) == Some("tool") {
            let has_name = m
                .get("name")
                .and_then(|n| n.as_str())
                .is_some_and(|n| !n.is_empty());
            if !has_name {
                warn!(
                    "Dropping tool message with missing/empty name: tool_call_id={:?}",
                    m.get("tool_call_id")
                );
            }
            has_name
        } else {
            true
        }
    });
    fixup_message_ordering(&mut messages);

    // Guarantee the current user message is part of the payload.
    {
        let has_current_user_msg = messages.iter().any(|m| {
            m.get("role").and_then(|r| r.as_str()) == Some("user")
                && m.get("content").and_then(|c| c.as_str()) == Some(user_text)
        });
        if !has_current_user_msg {
            messages.push(json!({
                "role": "user",
                "content": user_text,
            }));
        }
    }

    // When an earlier user message with different text exists, insert a marker
    // right before the current one so the model can tell history from the task.
    {
        let user_positions: Vec<usize> = messages
            .iter()
            .enumerate()
            .filter(|(_, m)| m.get("role").and_then(|r| r.as_str()) == Some("user"))
            .map(|(i, _)| i)
            .collect();
        if user_positions.len() >= 2 {
            let current_pos = user_positions
                .iter()
                .copied()
                .rev()
                .find(|&pos| {
                    messages[pos].get("content").and_then(|c| c.as_str()) == Some(user_text)
                })
                .or_else(|| user_positions.last().copied());
            if let Some(current_pos) = current_pos {
                let prev_user_content = user_positions
                    .iter()
                    .copied()
                    .filter(|&pos| pos != current_pos)
                    .rev()
                    .find_map(|pos| {
                        messages[pos]
                            .get("content")
                            .and_then(|c| c.as_str())
                            .map(|s| s.to_string())
                    });
                let has_different_task =
                    prev_user_content.as_deref() != Some(user_text) && prev_user_content.is_some();
                if has_different_task {
                    let marker = json!({
                        "role": "system",
                        "content": "[Current Task] The message below is the user's current request. \
                                    Prior messages are conversation history for context."
                    });
                    messages.insert(current_pos, marker);
                    info!(
                        session_id,
                        iteration,
                        user_messages = user_positions.len(),
                        "Current task marker injected before current user message"
                    );
                }
            }
        }
    }

    // If a user message appears after the last assistant/tool message that
    // follows the current task, cut everything after that chain.
    {
        let current_task_pos = messages.iter().rposition(|m| {
            m.get("role").and_then(|r| r.as_str()) == Some("user")
                && m.get("content").and_then(|c| c.as_str()) == Some(user_text)
        });
        if let Some(task_pos) = current_task_pos {
            let chain_end = messages
                .iter()
                .enumerate()
                .rev()
                .find(|(i, m)| {
                    *i > task_pos
                        && matches!(
                            m.get("role").and_then(|r| r.as_str()),
                            Some("assistant") | Some("tool")
                        )
                })
                .map(|(i, _)| i)
                .unwrap_or(task_pos);
            let stray_start = chain_end + 1;
            if stray_start < messages.len() {
                let stray_count = messages[stray_start..]
                    .iter()
                    .filter(|m| m.get("role").and_then(|r| r.as_str()) == Some("user"))
                    .count();
                if stray_count > 0 {
                    messages.truncate(stray_start);
                    info!(
                        session_id,
                        iteration,
                        stray_user_messages = stray_count,
                        "Truncated stray messages after current task's tool chain"
                    );
                }
            }
        }
    }

    let collapsed_tool_errors = super::loop_utils::collapse_repeated_tool_errors(&mut messages);
    if collapsed_tool_errors > 0 {
        info!(
            session_id,
            iteration,
            collapsed_tool_errors,
            "Collapsed repeated tool errors in current interaction"
        );
    }

    // Fit the payload to the context budget. The policy budget only applies
    // (as an upper bound on the model budget) when policy enforcement is on;
    // in shadow mode the two are merely logged side by side.
    if agent.context_window_config.enabled {
        let model_budget = crate::memory::context_window::compute_available_budget(
            model,
            system_prompt,
            tool_defs,
            &agent.context_window_config,
        );
        let policy_budget = policy_bundle.policy.context_budget;
        if agent.policy_config.policy_shadow_mode && !agent.policy_config.policy_enforce {
            info!(
                session_id,
                iteration, model_budget, policy_budget, "Context budget shadow comparison"
            );
        }
        let effective_budget = if agent.policy_config.policy_enforce {
            policy_budget.min(model_budget)
        } else {
            model_budget
        };
        messages = crate::memory::context_window::fit_messages_with_source_quotas(
            messages,
            effective_budget,
            session_summary.as_ref().map(|s| s.summary.as_str()),
        );
    }

    // Empty-response recovery replaces the payload with the immediate parent
    // exchange plus the current user message (skipped for trigger sessions).
    if empty_response_retry_pending && !is_trigger_session(session_id) {
        let before = messages.len();
        messages = build_empty_response_retry_messages(&messages, user_text);
        info!(
            session_id,
            iteration,
            before,
            after = messages.len(),
            "Empty-response recovery: reduced history while preserving immediate parent context"
        );
    }

    // Later iterations use the tool-loop variant of the system prompt, in a
    // style chosen from the policy's model profile.
    let effective_system_prompt = if iteration > 1 {
        let style = match policy_bundle.policy.model_profile {
            ModelProfile::Cheap => ToolLoopPromptStyle::Lite,
            ModelProfile::Balanced | ModelProfile::Strong => ToolLoopPromptStyle::Standard,
        };
        build_tool_loop_system_prompt(system_prompt, style)
    } else {
        system_prompt.to_string()
    };
    messages.insert(
        0,
        json!({
            "role": "system",
            "content": effective_system_prompt,
        }),
    );
    // A non-empty session summary goes directly after the system prompt.
    if let Some(ref summary) = session_summary {
        if !summary.summary.is_empty() {
            messages.insert(
                1,
                json!({
                    "role": "system",
                    "content": format!("[Session Summary]\n{}", summary.summary),
                }),
            );
        }
    }
    if let Some(checkpoint) = execution_checkpoint {
        messages.push(json!({
            "role": "system",
            "content": checkpoint,
        }));
        info!(
            session_id,
            iteration, "Injected execution checkpoint for in-progress task continuity"
        );
    }

    // With no assistant/tool messages at all, keep only the current user
    // message among the user messages and queue the fresh-conversation directive.
    {
        let non_system_non_user_count = messages
            .iter()
            .filter(|m| {
                let role = m.get("role").and_then(|r| r.as_str()).unwrap_or("");
                role != "system" && role != "user"
            })
            .count();
        if non_system_non_user_count == 0 {
            messages.retain(|m| {
                m.get("role").and_then(|r| r.as_str()) != Some("user")
                    || m.get("content").and_then(|c| c.as_str()) == Some(user_text)
            });
            pending_system_messages.push(SystemDirective::FreshConversationContext);
        }
    }

    // Flush queued directives as trailing system messages.
    for directive in pending_system_messages.drain(..) {
        messages.push(json!({
            "role": "system",
            "content": directive.render(),
        }));
    }
    if empty_response_retry_pending && !is_trigger_session(session_id) {
        messages.push(json!({
            "role": "system",
            "content": SystemDirective::EmptyResponseRetry.render()
        }));
    }
    if iteration > 1 {
        send_status(status_tx, StatusUpdate::Thinking(iteration));
    }

    // Diagnostics: a compact role summary plus rough token estimates
    // (serialized length divided by four), recorded in the policy metrics.
    {
        let summary: Vec<String> = messages
            .iter()
            .map(|m| {
                let role = m.get("role").and_then(|r| r.as_str()).unwrap_or("?");
                let name = m.get("name").and_then(|n| n.as_str()).unwrap_or("");
                let tc_id = m
                    .get("tool_call_id")
                    .and_then(|id| id.as_str())
                    .unwrap_or("");
                let tc_count = m
                    .get("tool_calls")
                    .and_then(|v| v.as_array())
                    .map_or(0, |a| a.len());
                if role == "tool" {
                    // Shorten by characters, not bytes: slicing a `str` at a byte
                    // offset inside a multi-byte character would panic.
                    let tc_prefix: String = tc_id.chars().take(12).collect();
                    format!("tool({},tc_id={})", name, tc_prefix)
                } else if tc_count > 0 {
                    format!("{}(tc={})", role, tc_count)
                } else {
                    role.to_string()
                }
            })
            .collect();
        let messages_json = serde_json::to_string(&messages).unwrap_or_default();
        let tools_json = serde_json::to_string(tool_defs).unwrap_or_default();
        let est_msg_tokens = messages_json.len() / 4;
        let est_tool_tokens = tools_json.len() / 4;
        let est_total_tokens = est_msg_tokens + est_tool_tokens;
        let est_msg_tokens_u64 = est_msg_tokens as u64;
        let est_tool_tokens_u64 = est_tool_tokens as u64;
        let est_total_tokens_u64 = est_total_tokens as u64;
        // Tool share of the estimate in basis points (1/100 of a percent);
        // `checked_div` yields 0 when the total is zero.
        let est_tool_share_bps = est_tool_tokens_u64
            .saturating_mul(10_000)
            .checked_div(est_total_tokens_u64)
            .unwrap_or(0);
        POLICY_METRICS
            .est_input_token_samples
            .fetch_add(1, Ordering::Relaxed);
        POLICY_METRICS
            .est_input_tokens_total
            .fetch_add(est_total_tokens_u64, Ordering::Relaxed);
        POLICY_METRICS
            .est_msg_tokens_total
            .fetch_add(est_msg_tokens_u64, Ordering::Relaxed);
        POLICY_METRICS
            .est_tool_tokens_total
            .fetch_add(est_tool_tokens_u64, Ordering::Relaxed);
        // Thresholds for the "high tool overhead" counters: 3500 bps (35%) of
        // the estimated input, or 1,500 estimated tool tokens in absolute terms.
        const HIGH_TOOL_SHARE_BPS: u64 = 3500;
        const HIGH_TOOL_TOKENS_ABS: u64 = 1_500;
        if est_tool_share_bps >= HIGH_TOOL_SHARE_BPS {
            POLICY_METRICS
                .est_tool_tokens_high_share_total
                .fetch_add(1, Ordering::Relaxed);
        }
        if est_tool_tokens_u64 >= HIGH_TOOL_TOKENS_ABS {
            POLICY_METRICS
                .est_tool_tokens_high_abs_total
                .fetch_add(1, Ordering::Relaxed);
        }
        info!(
            session_id,
            iteration,
            est_input_tokens = est_total_tokens,
            est_msg_tokens,
            est_tool_tokens,
            est_tool_share_pct = est_tool_share_bps as f64 / 100.0,
            msg_count = messages.len(),
            msgs = ?summary,
            "Context before LLM call"
        );
    }

    Ok(MessageBuildData { messages })
}
// Tests for the message-build phase: two pure tests for the empty-response
// recovery helper, and four async tests that drive `run_message_build_phase`
// against a test agent harness with a mock provider.
#[cfg(test)]
mod tests {
use super::*;
use chrono::Utc;
// Builds a plain message with the given role and content in "test-session".
fn msg(role: &str, content: &str) -> Message {
Message {
id: uuid::Uuid::new_v4().to_string(),
session_id: "test-session".to_string(),
role: role.to_string(),
content: Some(content.to_string()),
tool_call_id: None,
tool_name: None,
tool_calls_json: None,
created_at: Utc::now(),
importance: 0.5,
..Message::runtime_defaults()
}
}
// Builds a tool-result message with a fresh random tool_call_id.
fn tool_msg(name: &str, content: &str) -> Message {
Message {
id: uuid::Uuid::new_v4().to_string(),
session_id: "test-session".to_string(),
role: "tool".to_string(),
content: Some(content.to_string()),
tool_call_id: Some(format!("tool-call-{}", uuid::Uuid::new_v4())),
tool_name: Some(name.to_string()),
tool_calls_json: None,
created_at: Utc::now(),
importance: 0.5,
..Message::runtime_defaults()
}
}
// Recovery keeps the parent user/assistant pair followed by the current user message.
#[test]
fn empty_retry_preserves_parent_pair_and_current_user() {
let messages = vec![
json!({"role": "user", "content": "can you clear cache using drush?"}),
json!({"role": "assistant", "content": "I can see updates available. Should I proceed with updating these?"}),
json!({"role": "user", "content": "yes, update them"}),
];
let recovered = build_empty_response_retry_messages(&messages, "yes, update them");
assert_eq!(recovered.len(), 3);
assert_eq!(recovered[0]["role"], "user");
assert_eq!(recovered[1]["role"], "assistant");
assert_eq!(recovered[2]["role"], "user");
assert_eq!(recovered[2]["content"].as_str(), Some("yes, update them"));
}
// With no earlier messages, recovery yields only the current user message.
#[test]
fn empty_retry_falls_back_to_current_user_when_no_history() {
let messages = vec![json!({"role": "user", "content": "help"})];
let recovered = build_empty_response_retry_messages(&messages, "help");
assert_eq!(recovered.len(), 1);
assert_eq!(recovered[0]["role"], "user");
assert_eq!(recovered[0]["content"].as_str(), Some("help"));
}
// Two small prior user/assistant pairs plus the current message must all
// survive the sliding window on the first iteration.
#[tokio::test]
async fn sliding_window_retains_pairs_that_fit_budget() {
use crate::execution_policy::PolicyBundle;
use crate::testing::{setup_test_agent, MockProvider};
use crate::traits::{ConversationSummary, MessageStore};
let harness = setup_test_agent(MockProvider::new())
.await
.expect("test harness");
harness
.state
.append_message(&msg("user", "Older task"))
.await
.expect("append oldest user");
harness
.state
.append_message(&msg("assistant", "Older answer"))
.await
.expect("append oldest assistant");
harness
.state
.append_message(&msg(
"user",
"Please work in ~/projects/blog.aidaemon.ai/src/content/posts",
))
.await
.expect("append prior user");
harness
.state
.append_message(&msg("assistant", "Which posts should I update?"))
.await
.expect("append prior assistant");
harness
.state
.append_message(&msg("user", "Why?"))
.await
.expect("append current user");
let policy_bundle = PolicyBundle::from_scores(0.1, 0.1, 0.9);
let pinned_memories: Vec<Message> = Vec::new();
let tool_defs: Vec<Value> = Vec::new();
let session_summary: Option<ConversationSummary> = None;
let mut pending_system_messages = Vec::new();
let status_tx: Option<mpsc::Sender<StatusUpdate>> = None;
let mut ctx = MessageBuildCtx {
session_id: "test-session",
iteration: 1,
user_text: "Why?",
completed_tool_calls: &[],
model: "mock-model",
system_prompt: "You are a helpful test assistant.",
pinned_memories: &pinned_memories,
tool_defs: &tool_defs,
policy_bundle: &policy_bundle,
session_summary: &session_summary,
pending_system_messages: &mut pending_system_messages,
empty_response_retry_pending: false,
status_tx: &status_tx,
};
let built = run_message_build_phase(
&crate::agent::services::AgentServices::new(&harness.agent),
&mut ctx,
)
.await
.expect("message build");
// Assertions are made against the serialized payload as a whole.
let serialized = serde_json::to_string(&built.messages).expect("serialize messages");
assert!(
serialized.contains("blog.aidaemon.ai"),
"immediately prior user turn should be retained: {}",
serialized
);
assert!(
serialized.contains("Which posts should I update?"),
"immediately prior assistant turn should be retained: {}",
serialized
);
assert!(
serialized.contains("Older task"),
"older pair within budget should be retained by sliding window: {}",
serialized
);
assert!(
serialized.contains("Older answer"),
"older assistant within budget should be retained: {}",
serialized
);
assert!(
serialized.contains("Why?"),
"current user message should remain present: {}",
serialized
);
}
// On iteration 2 with one completed tool call, the payload must contain the
// execution checkpoint, the active request, the tool name and the
// "do not reset" instruction.
#[tokio::test]
async fn later_iterations_include_execution_checkpoint_after_tool_progress() {
use crate::execution_policy::PolicyBundle;
use crate::testing::{setup_test_agent, MockProvider};
use crate::traits::{ConversationSummary, MessageStore};
let harness = setup_test_agent(MockProvider::new())
.await
.expect("test harness");
harness
.state
.append_message(&msg("user", "Find the system details and summarize them."))
.await
.expect("append user");
harness
.state
.append_message(&tool_msg(
"system_info",
"OS: macOS 15.0\nMemory: 16 GB\nHostname: dev-machine",
))
.await
.expect("append tool");
let policy_bundle = PolicyBundle::from_scores(0.1, 0.1, 0.9);
let pinned_memories: Vec<Message> = Vec::new();
let tool_defs: Vec<Value> = Vec::new();
let session_summary: Option<ConversationSummary> = None;
let mut pending_system_messages = Vec::new();
let status_tx: Option<mpsc::Sender<StatusUpdate>> = None;
let completed_tool_calls = vec!["system_info({})".to_string()];
let mut ctx = MessageBuildCtx {
session_id: "test-session",
iteration: 2,
user_text: "Find the system details and summarize them.",
completed_tool_calls: &completed_tool_calls,
model: "mock-model",
system_prompt: "You are a helpful test assistant.",
pinned_memories: &pinned_memories,
tool_defs: &tool_defs,
policy_bundle: &policy_bundle,
session_summary: &session_summary,
pending_system_messages: &mut pending_system_messages,
empty_response_retry_pending: false,
status_tx: &status_tx,
};
let built = run_message_build_phase(
&crate::agent::services::AgentServices::new(&harness.agent),
&mut ctx,
)
.await
.expect("message build");
let serialized = serde_json::to_string(&built.messages).expect("serialize messages");
assert!(
serialized.contains("EXECUTION CHECKPOINT"),
"later iterations should carry a live execution checkpoint: {}",
serialized
);
assert!(
serialized.contains("Find the system details and summarize them."),
"checkpoint should restate the active request: {}",
serialized
);
assert!(
serialized.contains("system_info"),
"checkpoint should include completed tool/evidence context: {}",
serialized
);
assert!(
serialized.contains("Do NOT reset into a generic availability reply"),
"checkpoint should explicitly block idle reset replies: {}",
serialized
);
}
// A prior exchange stamped three hours in the past must be dropped entirely,
// leaving the fresh user message in the payload.
#[tokio::test]
async fn idle_gap_resets_sliding_window_to_zero() {
use crate::execution_policy::PolicyBundle;
use crate::testing::{setup_test_agent, MockProvider};
use crate::traits::{ConversationSummary, MessageStore};
use chrono::Duration as ChronoDuration;
let harness = setup_test_agent(MockProvider::new())
.await
.expect("test harness");
// Backdate the old pair beyond the two-hour idle threshold.
let old_time = Utc::now() - ChronoDuration::hours(3);
let old_user = Message {
created_at: old_time,
..msg("user", "Old stale question from 3 hours ago")
};
let old_assistant = Message {
created_at: old_time,
..msg("assistant", "Old stale answer from 3 hours ago")
};
harness
.state
.append_message(&old_user)
.await
.expect("append old user");
harness
.state
.append_message(&old_assistant)
.await
.expect("append old assistant");
harness
.state
.append_message(&msg("user", "Fresh question now"))
.await
.expect("append current user");
let policy_bundle = PolicyBundle::from_scores(0.1, 0.1, 0.9);
let pinned_memories: Vec<Message> = Vec::new();
let tool_defs: Vec<Value> = Vec::new();
let session_summary: Option<ConversationSummary> = None;
let mut pending_system_messages = Vec::new();
let status_tx: Option<mpsc::Sender<StatusUpdate>> = None;
let mut ctx = MessageBuildCtx {
session_id: "test-session",
iteration: 1,
user_text: "Fresh question now",
completed_tool_calls: &[],
model: "mock-model",
system_prompt: "You are a helpful test assistant.",
pinned_memories: &pinned_memories,
tool_defs: &tool_defs,
policy_bundle: &policy_bundle,
session_summary: &session_summary,
pending_system_messages: &mut pending_system_messages,
empty_response_retry_pending: false,
status_tx: &status_tx,
};
let built = run_message_build_phase(
&crate::agent::services::AgentServices::new(&harness.agent),
&mut ctx,
)
.await
.expect("message build");
let serialized = serde_json::to_string(&built.messages).expect("serialize messages");
assert!(
!serialized.contains("Old stale question from 3 hours ago"),
"idle gap should reset window to 0, removing old stale pairs: {}",
serialized
);
assert!(
!serialized.contains("Old stale answer from 3 hours ago"),
"idle gap should reset window to 0, removing old stale assistant: {}",
serialized
);
assert!(
serialized.contains("Fresh question now"),
"current user message should always be present: {}",
serialized
);
}
// A non-empty session summary must appear in the payload as a system-role
// message tagged "[Session Summary]".
#[tokio::test]
async fn session_summary_injected_when_present() {
use crate::execution_policy::PolicyBundle;
use crate::testing::{setup_test_agent, MockProvider};
use crate::traits::{ConversationSummary, MessageStore};
let harness = setup_test_agent(MockProvider::new())
.await
.expect("test harness");
harness
.state
.append_message(&msg("user", "Current question"))
.await
.expect("append user");
let policy_bundle = PolicyBundle::from_scores(0.1, 0.1, 0.9);
let pinned_memories: Vec<Message> = Vec::new();
let tool_defs: Vec<Value> = Vec::new();
let session_summary = Some(ConversationSummary {
session_id: "test-session".to_string(),
summary: "User previously asked about deploying a blog. Config was created."
.to_string(),
message_count: 5,
last_message_id: "old-msg-id".to_string(),
updated_at: Utc::now(),
});
let mut pending_system_messages = Vec::new();
let status_tx: Option<mpsc::Sender<StatusUpdate>> = None;
let mut ctx = MessageBuildCtx {
session_id: "test-session",
iteration: 1,
user_text: "Current question",
completed_tool_calls: &[],
model: "mock-model",
system_prompt: "You are a helpful test assistant.",
pinned_memories: &pinned_memories,
tool_defs: &tool_defs,
policy_bundle: &policy_bundle,
session_summary: &session_summary,
pending_system_messages: &mut pending_system_messages,
empty_response_retry_pending: false,
status_tx: &status_tx,
};
let built = run_message_build_phase(
&crate::agent::services::AgentServices::new(&harness.agent),
&mut ctx,
)
.await
.expect("message build");
let serialized = serde_json::to_string(&built.messages).expect("serialize messages");
assert!(
serialized.contains("[Session Summary]"),
"session summary should be injected as [Session Summary]: {}",
serialized
);
assert!(
serialized.contains("deploying a blog"),
"session summary content should be present: {}",
serialized
);
// Beyond the substring checks, confirm the summary is carried by a system message.
let has_summary_system_msg = built.messages.iter().any(|m| {
m.get("role").and_then(|r| r.as_str()) == Some("system")
&& m.get("content")
.and_then(|c| c.as_str())
.is_some_and(|s| s.contains("[Session Summary]"))
});
assert!(
has_summary_system_msg,
"summary should be a system-role message"
);
}
}