use crate::config::KodaConfig;
use crate::db::{Database, Role};
use crate::engine::{EngineCommand, EngineEvent, EngineSink};
use crate::file_tracker::FileTracker;
use crate::inference_helpers::{
AUTO_COMPACT_THRESHOLD, CONTEXT_WARN_THRESHOLD, RATE_LIMIT_MAX_RETRIES, assemble_messages,
estimate_tokens, is_context_overflow_error, is_image_rejection_error, is_rate_limit_error,
is_server_error, rate_limit_backoff,
};
use crate::loop_guard::{LoopAction, LoopDetector};
use crate::persistence::Persistence;
use crate::providers::{
ChatMessage, ImageData, LlmProvider, StreamChunk, TokenUsage, ToolCall, ToolDefinition,
stream_collector::SseCollector,
};
use crate::skill_scope::SkillToolScope;
use crate::tool_dispatch::{
can_parallelize, execute_tools_parallel, execute_tools_sequential, execute_tools_split_batch,
};
use crate::tools::ToolRegistry;
use crate::trust::TrustMode;
use anyhow::{Context, Result};
use std::path::Path;
use std::time::Instant;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
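/// Borrowed, per-iteration state threaded through the context-assembly and
/// compaction helpers below.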
struct TurnState<'a> {
db: &'a Database,
session_id: &'a str,
system_message: &'a ChatMessage,
pending_images: Option<&'a [ImageData]>,
iteration: u32,
config: &'a KodaConfig,
provider: &'a dyn LlmProvider,
tool_defs: &'a [ToolDefinition],
sink: &'a dyn EngineSink,
cancel: &'a CancellationToken,
}
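/// Everything collected from one provider stream: the final text, thinking
/// content, tool calls (with results for any tools dispatched eagerly
/// mid-stream), token usage, and how the stream ended.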
struct StreamResult {
text: String,
thinking_content: String,
tool_calls: Vec<ToolCall>,
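    /// `(tool_call_id, output, success, full_output)` for eagerly-run tools.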
eager_results: Vec<(String, String, bool, Option<String>)>,
usage: TokenUsage,
char_count: usize,
interrupted: bool,
network_error: Option<String>,
}
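/// Loads session history, attaches pending images to the latest user message
/// on the first iteration, publishes context usage, and warns once usage
/// enters the soft-warning band below the auto-compact threshold.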
async fn assemble_context(turn: &TurnState<'_>) -> Result<Vec<ChatMessage>> {
let history = turn.db.load_context(turn.session_id).await?;
let analysis = crate::context_analysis::analyze_context(&history);
if analysis.total > 0 {
tracing::debug!(
"Context analysis: {} total, {}% tool results, {}% duplicate reads",
analysis.total,
analysis.tool_result_percent(),
analysis.duplicate_read_percent(),
);
for (tool, tokens) in analysis.top_tool_results(3) {
tracing::debug!(" {tool}: ~{tokens} tokens");
}
}
let mut messages = assemble_messages(turn.system_message, &history);
if turn.iteration == 0
&& let Some(imgs) = turn.pending_images
&& !imgs.is_empty()
&& let Some(last_user) = messages.iter_mut().rev().find(|m| m.role == "user")
{
last_user.images = Some(imgs.to_vec());
}
let context_used = estimate_tokens(&messages);
crate::context::update(context_used, turn.config.max_context_tokens);
turn.sink.emit(EngineEvent::ContextUsage {
used: context_used,
max: turn.config.max_context_tokens,
});
let ctx_pct = crate::context::percentage();
if (CONTEXT_WARN_THRESHOLD..AUTO_COMPACT_THRESHOLD).contains(&ctx_pct) {
let mut warning = format!("Context at {ctx_pct}% — approaching limit.");
let top = analysis.top_tool_results(2);
if !top.is_empty() {
let hogs: Vec<String> = top
.iter()
.map(|(name, tokens)| format!("{name} (~{tokens} tok)"))
.collect();
warning.push_str(&format!(" Top consumers: {}.", hogs.join(", ")));
}
let waste = analysis.total_duplicate_waste();
if waste > 500 {
warning.push_str(&format!(" ~{waste} tokens wasted on duplicate file reads."));
}
warning.push_str(" Run /compact to free up space.");
turn.sink.emit(EngineEvent::Warn { message: warning });
}
Ok(messages)
}
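/// Compacts the session before sending when context usage has reached the
/// auto-compact threshold, then reassembles the context. Falls back to the
/// original messages if compaction is skipped, fails, or the circuit breaker
/// has tripped.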
async fn preflight_compact_if_needed(
turn: &TurnState<'_>,
messages: Vec<ChatMessage>,
) -> Result<Vec<ChatMessage>> {
let ctx_pct = crate::context::percentage();
if ctx_pct < AUTO_COMPACT_THRESHOLD {
return Ok(messages);
}
if crate::compact::is_compact_circuit_broken() {
tracing::warn!("Pre-flight: context at {ctx_pct}% but circuit breaker tripped — skipping");
return Ok(messages);
}
tracing::warn!("Pre-flight: context at {ctx_pct}%, attempting auto-compact");
turn.sink.emit(EngineEvent::Info {
message: format!("\u{1f4e6} Context at {ctx_pct}% \u{2014} compacting before sending..."),
});
match crate::compact::compact_session_with_provider(
turn.db,
turn.session_id,
turn.config.max_context_tokens,
&turn.config.model_settings,
turn.provider,
)
.await
{
Ok(Ok(result)) => {
turn.sink.emit(EngineEvent::Info {
message: format!(
"\u{2705} Compacted {} messages (~{} token summary)",
result.deleted, result.summary_tokens
),
});
assemble_context(turn).await
}
Ok(Err(skip)) => {
tracing::info!("Pre-flight compact skipped: {skip:?}");
if matches!(skip, crate::compact::CompactSkip::HistoryTooLarge) {
crate::compact::record_compact_failure();
turn.sink.emit(EngineEvent::Warn {
message: "\u{26a0}\u{fe0f} Context is full but history is too large for \
this model to summarize. Start a new session (/session) or \
switch to a model with a larger context window."
.to_string(),
});
}
Ok(messages)
}
Err(e) => {
tracing::warn!("Pre-flight compact failed: {e:#}");
let tripped = crate::compact::record_compact_failure();
let suffix = if tripped {
" Auto-compact disabled after repeated failures."
} else {
" Continuing anyway..."
};
turn.sink.emit(EngineEvent::Warn {
message: format!("Compact failed: {e:#}.{suffix}"),
});
Ok(messages)
}
}
}
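/// Opens the provider stream, retrying with backoff on rate-limit errors for
/// up to `RATE_LIMIT_MAX_RETRIES` attempts. Returns `Ok(None)` on
/// cancellation; any non-rate-limit error is returned immediately.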
async fn try_with_rate_limit(
provider: &dyn LlmProvider,
messages: &[ChatMessage],
tool_defs: &[ToolDefinition],
model_settings: &crate::config::ModelSettings,
cancel: &CancellationToken,
sink: &dyn EngineSink,
) -> Result<Option<SseCollector>> {
let mut last_err = None;
for attempt in 0..RATE_LIMIT_MAX_RETRIES {
let result = tokio::select! {
result = provider.chat_stream(messages, tool_defs, model_settings) => result,
_ = cancel.cancelled() => return Ok(None),
};
match result {
Ok(collector) => return Ok(Some(collector)),
Err(e) if is_rate_limit_error(&e) && attempt + 1 < RATE_LIMIT_MAX_RETRIES => {
let delay = rate_limit_backoff(attempt);
sink.emit(EngineEvent::SpinnerStop);
sink.emit(EngineEvent::Warn {
message: format!("\u{23f3} Rate limited. Retrying in {}s...", delay.as_secs()),
});
tracing::warn!(
"Rate limit (attempt {}/{}): {e:#}",
attempt + 1,
RATE_LIMIT_MAX_RETRIES
);
tokio::time::sleep(delay).await;
sink.emit(EngineEvent::SpinnerStart {
message: format!("Retrying (attempt {})...", attempt + 2),
});
last_err = Some(e);
}
Err(e) => return Err(e),
}
}
Err(last_err.unwrap_or_else(|| anyhow::anyhow!("Rate limit retries exhausted")))
}
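/// Recovers from a provider-side context-overflow rejection: compacts the
/// session, reassembles the context, and retries the stream once. Returns
/// `Ok(None)` if cancelled during the retry.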
async fn try_overflow_recovery(
turn: &TurnState<'_>,
original_err: anyhow::Error,
) -> Result<Option<(SseCollector, Vec<ChatMessage>)>> {
turn.sink.emit(EngineEvent::SpinnerStop);
turn.sink.emit(EngineEvent::Warn {
message: "\u{26a0}\u{fe0f} Provider rejected request (context overflow). \
Compacting and retrying..."
.to_string(),
});
tracing::warn!("Context overflow from provider: {original_err:#}");
match crate::compact::compact_session_with_provider(
turn.db,
turn.session_id,
turn.config.max_context_tokens,
&turn.config.model_settings,
turn.provider,
)
.await
{
Ok(Ok(result)) => {
turn.sink.emit(EngineEvent::Info {
message: format!(
"\u{2705} Compacted {} messages. Retrying...",
result.deleted
),
});
}
_ => {
return Err(original_err)
.context("LLM inference failed (context overflow, compaction unsuccessful)");
}
}
let messages = assemble_context(turn).await?;
turn.sink.emit(EngineEvent::SpinnerStart {
message: "Retrying...".into(),
});
let collector = tokio::select! {
result = turn.provider.chat_stream(&messages, turn.tool_defs, &turn.config.model_settings) => {
result.context("LLM inference failed after compaction retry")?
}
_ = turn.cancel.cancelled() => return Ok(None),
};
Ok(Some((collector, messages)))
}
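/// Drains a provider stream chunk by chunk, forwarding deltas to the sink.
/// Read-only, auto-approved tool calls are executed eagerly as soon as their
/// arguments are complete so their results are ready when the stream ends.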
async fn collect_stream(
rx: &mut mpsc::Receiver<StreamChunk>,
sink: &dyn EngineSink,
cancel: &CancellationToken,
tools: &ToolRegistry,
mode: TrustMode,
project_root: &Path,
) -> StreamResult {
let mut full_text = String::new();
let mut tool_calls: Vec<ToolCall> = Vec::new();
let mut eager_results: Vec<(String, String, bool, Option<String>)> = Vec::new();
let mut usage = TokenUsage::default();
let mut first_token = true;
let mut char_count: usize = 0;
let mut thinking_content = String::new();
let mut in_thinking_block = false;
let mut response_banner_shown = false;
let mut thinking_banner_shown = false;
let mut interrupted = false;
loop {
let chunk = tokio::select! {
c = rx.recv() => c,
_ = cancel.cancelled() => {
interrupted = true;
None
}
};
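        // Cancellation may race the recv: a chunk can arrive in the same
        // poll, so check both before processing it.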
if interrupted || cancel.is_cancelled() {
sink.emit(EngineEvent::SpinnerStop);
if !full_text.is_empty() {
sink.emit(EngineEvent::TextDone);
}
sink.emit(EngineEvent::Warn {
message: "Interrupted".into(),
});
return StreamResult {
text: full_text,
thinking_content,
tool_calls,
eager_results,
usage,
char_count,
interrupted: true,
network_error: None,
};
}
let Some(chunk) = chunk else { break };
match chunk {
StreamChunk::TextDelta(delta) => {
if first_token {
if in_thinking_block {
sink.emit(EngineEvent::SpinnerStop);
sink.emit(EngineEvent::ThinkingDone);
in_thinking_block = false;
thinking_banner_shown = true;
}
sink.emit(EngineEvent::SpinnerStop);
first_token = false;
}
if !response_banner_shown && !delta.trim().is_empty() {
sink.emit(EngineEvent::ResponseStart);
response_banner_shown = true;
}
full_text.push_str(&delta);
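                    // Byte length as a cheap proxy for character count
                    // (exact for ASCII; only feeds rough token/rate estimates).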
char_count += delta.len();
sink.emit(EngineEvent::TextDelta {
text: delta.clone(),
});
}
StreamChunk::ThinkingDelta(delta) => {
if !thinking_banner_shown {
sink.emit(EngineEvent::SpinnerStop);
sink.emit(EngineEvent::ThinkingStart);
thinking_banner_shown = true;
}
in_thinking_block = true;
sink.emit(EngineEvent::ThinkingDelta {
text: delta.clone(),
});
thinking_content.push_str(&delta);
}
StreamChunk::ToolCallReady(tc) => {
if in_thinking_block {
sink.emit(EngineEvent::SpinnerStop);
sink.emit(EngineEvent::ThinkingDone);
in_thinking_block = false;
}
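                    // Eager dispatch: run read-only, auto-approved tools
                    // immediately so their results are ready when the stream
                    // completes. InvokeAgent is always excluded.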
let args: serde_json::Value =
serde_json::from_str(&tc.arguments).unwrap_or_default();
let is_read_only = !crate::tools::is_mutating_tool(&tc.function_name);
let is_auto_approved = !matches!(
                        crate::approval::check_tool(&tc.function_name, &args, mode, Some(project_root)),
crate::approval::ToolApproval::NeedsConfirmation
| crate::approval::ToolApproval::Blocked
);
if is_read_only && is_auto_approved && tc.function_name != "InvokeAgent" {
tracing::debug!("Eager dispatch: {} (id={})", tc.function_name, tc.id);
let r = tools.execute(&tc.function_name, &tc.arguments, None).await;
eager_results.push((tc.id.clone(), r.output, r.success, r.full_output));
}
tool_calls.push(tc);
}
StreamChunk::ToolCalls(tcs) => {
if in_thinking_block {
sink.emit(EngineEvent::SpinnerStop);
sink.emit(EngineEvent::ThinkingDone);
in_thinking_block = false;
}
sink.emit(EngineEvent::SpinnerStop);
tool_calls.extend(tcs);
}
StreamChunk::Done(u) => {
if in_thinking_block {
sink.emit(EngineEvent::SpinnerStop);
sink.emit(EngineEvent::ThinkingDone);
}
usage = u;
break;
}
StreamChunk::NetworkError(err) => {
sink.emit(EngineEvent::SpinnerStop);
if !full_text.is_empty() {
sink.emit(EngineEvent::TextDone);
}
sink.emit(EngineEvent::Warn {
message: format!("Connection lost mid-stream — turn discarded ({err})"),
});
return StreamResult {
text: full_text,
thinking_content,
tool_calls,
eager_results,
usage,
char_count,
interrupted: false,
network_error: Some(err),
};
}
}
}
sink.emit(EngineEvent::TextDone);
if first_token {
sink.emit(EngineEvent::SpinnerStop);
}
StreamResult {
text: full_text,
thinking_content,
tool_calls,
eager_results,
usage,
char_count,
interrupted: false,
network_error: None,
}
}
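/// Everything the engine hands to [`inference_loop`] for a single user turn.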
pub struct InferenceContext<'a> {
pub project_root: &'a Path,
pub config: &'a KodaConfig,
pub db: &'a Database,
pub session_id: &'a str,
pub system_prompt: &'a str,
pub provider: &'a dyn LlmProvider,
pub tools: &'a ToolRegistry,
pub tool_defs: &'a [ToolDefinition],
pub pending_images: Option<Vec<ImageData>>,
pub mode: TrustMode,
pub sink: &'a dyn EngineSink,
pub cancel: CancellationToken,
pub cmd_rx: &'a mut mpsc::Receiver<EngineCommand>,
pub file_tracker: &'a mut FileTracker,
}
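/// Drives the agentic loop for one user turn: assemble context, stream a
/// model response, execute any tool calls, and repeat until the model stops
/// calling tools, the iteration cap is reached, or the loop guard trips.
///
/// A minimal call-site sketch (`ignore`d doctest: the real values come from
/// live engine state and are shown here only to illustrate the shape):
///
/// ```ignore
/// let ctx = InferenceContext {
///     project_root: &root,
///     config: &config,
///     db: &db,
///     session_id: "sess-1",
///     system_prompt: &system_prompt,
///     provider: provider.as_ref(),
///     tools: &tools,
///     tool_defs: &tool_defs,
///     pending_images: None,
///     mode: TrustMode::Auto,
///     sink: &sink,
///     cancel: CancellationToken::new(),
///     cmd_rx: &mut cmd_rx,
///     file_tracker: &mut file_tracker,
/// };
/// inference_loop(ctx).await?;
/// ```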
#[tracing::instrument(skip_all, fields(session_id = %ctx.session_id, agent = %ctx.config.agent_name))]
pub async fn inference_loop(ctx: InferenceContext<'_>) -> Result<()> {
let InferenceContext {
project_root,
config,
db,
session_id,
system_prompt,
provider,
tools,
tool_defs,
pending_images,
mode,
sink,
cancel,
cmd_rx,
file_tracker,
} = ctx;
let mut hard_cap = config.max_iterations;
let mut iteration = 0u32;
let mut made_tool_calls = false;
let mut retried_empty = false;
let mut loop_detector = LoopDetector::new();
let sub_agent_cache = crate::sub_agent_cache::SubAgentCache::new();
let bg_agents = crate::bg_agent::new_shared();
let mut skill_scope = SkillToolScope::new();
let mut total_prompt_tokens: i64 = 0;
let mut total_completion_tokens: i64 = 0;
let mut total_cache_read_tokens: i64 = 0;
let mut total_thinking_tokens: i64 = 0;
let mut total_char_count: usize = 0;
let loop_start = Instant::now();
let base_system_prompt = system_prompt.to_string();
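    // Opportunistically clear stale tool results before the first request.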
if let Ok(Some(mc)) = crate::microcompact::microcompact_session(db, session_id).await {
sink.emit(EngineEvent::Info {
message: format!(
"\u{1f9f9} Microcompact: cleared {} old tool results (~{} tokens)",
mc.cleared, mc.tokens_saved,
),
});
}
loop {
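        // Surface background agents that finished since the last iteration,
        // injecting their results into history so the model sees them.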
for bg_result in bg_agents.drain_completed() {
let status = if bg_result.success {
"completed"
} else {
"failed"
};
let injection = format!(
"[Background agent '{}' {status}]\n\
Original task: {}\n\
Result:\n{}",
bg_result.agent_name, bg_result.prompt, bg_result.output
);
sink.emit(EngineEvent::Info {
message: format!(
" \u{2705} Background agent '{}' {status}",
bg_result.agent_name
),
});
db.insert_message(session_id, &Role::User, Some(&injection), None, None, None)
.await?;
}
if iteration >= hard_cap {
let recent = loop_detector.recent_names();
sink.emit(EngineEvent::LoopCapReached {
cap: hard_cap,
recent_tools: recent,
});
let extra = loop {
tokio::select! {
cmd = cmd_rx.recv() => match cmd {
Some(EngineCommand::LoopDecision { action }) => {
break action.extra_iterations();
}
Some(EngineCommand::Interrupt) => {
cancel.cancel();
break 0;
}
None => break 0,
_ => continue,
},
_ = cancel.cancelled() => break 0,
}
};
if extra == 0 {
break Ok(());
}
hard_cap += extra;
}
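        // Rebuild the system prompt every iteration so progress notes, todos,
        // and git status stay current.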
let progress = crate::progress::get_progress_summary(db, session_id)
.await
.unwrap_or_default();
let todo_section = crate::tools::todo::get_todo_section(db, session_id).await;
let git_line = crate::git::git_context(project_root)
.map(|ctx| format!("\n{ctx}"))
.unwrap_or_default();
let system_prompt_full = format!("{base_system_prompt}{progress}{todo_section}{git_line}");
let system_message = ChatMessage::text("system", &system_prompt_full);
let scoped_tool_defs = skill_scope.filter_tool_defs(tool_defs);
let active_tool_defs: &[ToolDefinition] = &scoped_tool_defs;
let turn = TurnState {
db,
session_id,
system_message: &system_message,
pending_images: pending_images.as_deref(),
iteration,
config,
provider,
tool_defs: active_tool_defs,
sink,
cancel: &cancel,
};
let messages = assemble_context(&turn).await?;
let messages = preflight_compact_if_needed(&turn, messages).await?;
let had_images = turn
.pending_images
.map(|imgs| !imgs.is_empty())
.unwrap_or(false);
sink.emit(EngineEvent::SpinnerStart {
message: "Thinking...".into(),
});
let stream_result = try_with_rate_limit(
provider,
&messages,
active_tool_defs,
&config.model_settings,
&cancel,
sink,
)
.await;
let stream_result: Result<SseCollector> = match stream_result {
Ok(Some(c)) => Ok(c),
Ok(None) => {
sink.emit(EngineEvent::SpinnerStop);
sink.emit(EngineEvent::Warn {
message: "Interrupted".into(),
});
return Ok(());
}
Err(e) => Err(e),
};
let SseCollector {
mut rx,
handle: sse_handle,
} = match stream_result {
Ok(c) => c,
Err(e) if is_context_overflow_error(&e) => {
match try_overflow_recovery(&turn, e).await? {
                    Some((collector, _messages)) => collector,
None => {
sink.emit(EngineEvent::SpinnerStop);
sink.emit(EngineEvent::Warn {
message: "Interrupted".into(),
});
return Ok(());
}
}
}
Err(e) if is_server_error(&e) => {
sink.emit(EngineEvent::SpinnerStop);
sink.emit(EngineEvent::Warn {
message: format!(
"Provider returned a server error: {e:#}. \
This often means the model can't handle the current \
conversation state. Try a different model or start a new session."
),
});
return Ok(());
}
Err(e) if had_images && is_image_rejection_error(&e) => {
sink.emit(EngineEvent::SpinnerStop);
sink.emit(EngineEvent::Warn {
message: format!(
"⚠ This model rejected the image attachment — \
it likely does not support vision input. \
Switch to a vision-capable model such as \
claude-sonnet, gemini-flash, or gpt-4o. ({e})"
),
});
return Ok(());
}
Err(e) => {
return Err(e).context("LLM inference failed");
}
};
let stream_result = collect_stream(&mut rx, sink, &cancel, tools, mode, project_root).await;
if stream_result.interrupted {
sse_handle.abort();
let has_text = !stream_result.text.is_empty();
let has_thinking = !stream_result.thinking_content.is_empty();
if has_text || has_thinking {
let mid = db
.insert_message(
session_id,
&Role::Assistant,
if has_text {
Some(stream_result.text.as_str())
} else {
None
},
None,
None,
None,
)
.await?;
if has_thinking {
db.update_message_thinking_content(mid, &stream_result.thinking_content)
.await?;
}
}
return Ok(());
}
if stream_result.network_error.is_some() {
sse_handle.abort();
return Ok(());
}
let full_text = stream_result.text;
let stream_thinking = stream_result.thinking_content;
let tool_calls = crate::tool_normalize::normalize_tool_calls(stream_result.tool_calls);
let usage = stream_result.usage;
let char_count = stream_result.char_count;
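        // Retry once when the model returns nothing right after tool use and
        // was not truncated by max_tokens.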
if tool_calls.is_empty()
&& made_tool_calls
&& full_text.trim().is_empty()
&& usage.stop_reason != "max_tokens"
&& !retried_empty
{
retried_empty = true;
sink.emit(EngineEvent::SpinnerStart {
message: "Empty response — retrying...".into(),
});
continue;
}
let content = if full_text.is_empty() {
None
} else {
Some(full_text.as_str())
};
let tool_calls_json = if tool_calls.is_empty() {
None
} else {
Some(serde_json::to_string(&tool_calls)?)
};
let msg_id = db
.insert_message(
session_id,
&Role::Assistant,
content,
tool_calls_json.as_deref(),
None,
Some(&usage),
)
.await?;
db.mark_message_complete(msg_id).await?;
if !stream_thinking.is_empty() {
db.update_message_thinking_content(msg_id, &stream_thinking)
.await?;
}
if tool_calls.is_empty() {
if usage.stop_reason == "max_tokens" {
sink.emit(EngineEvent::Warn {
message: format!(
"Model {} hit max_tokens limit — response was truncated. \
The context may be too large. Try /compact or start a new session.",
config.model,
),
});
continue;
} else if made_tool_calls && full_text.trim().is_empty() {
sink.emit(EngineEvent::Warn {
message: format!(
"Model {} produced an empty response after tool use. \
Try rephrasing, run /compact, or switch models with /model.",
config.model,
),
});
}
total_prompt_tokens += usage.prompt_tokens;
total_completion_tokens += usage.completion_tokens;
total_cache_read_tokens += usage.cache_read_tokens;
total_thinking_tokens += usage.thinking_tokens;
total_char_count += char_count;
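            // Fall back to a rough 4-chars-per-token estimate when the
            // provider did not report completion tokens.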
let display_tokens = if total_completion_tokens > 0 {
total_completion_tokens
} else {
(total_char_count / 4) as i64
};
let total_elapsed = loop_start.elapsed();
let total_secs = total_elapsed.as_secs_f64();
let rate = if total_secs > 0.0 && display_tokens > 0 {
display_tokens as f64 / total_secs
} else {
0.0
};
let context = crate::context::format_footer();
sink.emit(EngineEvent::Footer {
prompt_tokens: total_prompt_tokens,
completion_tokens: total_completion_tokens,
cache_read_tokens: total_cache_read_tokens,
thinking_tokens: total_thinking_tokens,
total_chars: total_char_count,
elapsed_ms: total_elapsed.as_millis() as u64,
rate,
context,
});
return Ok(());
}
total_prompt_tokens += usage.prompt_tokens;
total_completion_tokens += usage.completion_tokens;
total_cache_read_tokens += usage.cache_read_tokens;
total_thinking_tokens += usage.thinking_tokens;
total_char_count += char_count;
made_tool_calls = true;
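        // Tools already executed eagerly during streaming only need their
        // results recorded; the rest go through normal dispatch below.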
let eager_ids: std::collections::HashSet<String> = stream_result
.eager_results
.iter()
.map(|(id, _, _, _)| id.clone())
.collect();
if !eager_ids.is_empty() {
tracing::info!(
"{} tool(s) executed eagerly during streaming",
eager_ids.len()
);
for (tc_id, result, success, full_output) in &stream_result.eager_results {
if let Some(tc) = tool_calls.iter().find(|tc| tc.id == *tc_id) {
sink.emit(EngineEvent::ToolCallStart {
id: tc_id.clone(),
name: tc.function_name.clone(),
args: serde_json::from_str(&tc.arguments).unwrap_or_default(),
is_sub_agent: false,
});
crate::tool_dispatch::record_tool_result(
tc,
result,
*success,
full_output.as_deref(),
db,
session_id,
tools.caps.tool_result_chars,
project_root,
file_tracker,
sink,
)
.await?;
}
}
}
let remaining_tools: Vec<ToolCall> = tool_calls
.iter()
.filter(|tc| !eager_ids.contains(&tc.id))
.cloned()
.collect();
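        // While a skill scope is active, out-of-scope tool calls are recorded
        // as errors instead of being executed.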
let remaining_tools = if skill_scope.is_active() {
let mut allowed = Vec::with_capacity(remaining_tools.len());
for tc in remaining_tools {
if let Some(err_msg) = skill_scope.check_tool_call(&tc.function_name) {
let parsed_args: serde_json::Value =
serde_json::from_str(&tc.arguments).unwrap_or_default();
sink.emit(EngineEvent::ToolCallStart {
id: tc.id.clone(),
name: tc.function_name.clone(),
args: parsed_args,
is_sub_agent: false,
});
crate::tool_dispatch::record_tool_result(
&tc,
&err_msg,
false,
None,
db,
session_id,
tools.caps.tool_result_chars,
project_root,
file_tracker,
sink,
)
.await?;
} else {
allowed.push(tc);
}
}
allowed
} else {
remaining_tools
};
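        // Dispatch strategy: fully parallel when the whole batch is safe to
        // run concurrently, a split batch otherwise, and sequential for a
        // single call.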
if remaining_tools.len() > 1 && can_parallelize(&remaining_tools, mode, project_root) {
execute_tools_parallel(
&remaining_tools,
project_root,
config,
db,
session_id,
tools,
mode,
sink,
cancel.clone(),
&sub_agent_cache,
file_tracker,
&bg_agents,
)
.await?;
} else if remaining_tools.len() > 1 {
execute_tools_split_batch(
&remaining_tools,
project_root,
config,
db,
session_id,
tools,
mode,
sink,
cancel.clone(),
cmd_rx,
&sub_agent_cache,
file_tracker,
&bg_agents,
)
.await?;
} else if !remaining_tools.is_empty() {
execute_tools_sequential(
&remaining_tools,
project_root,
config,
db,
session_id,
tools,
mode,
sink,
cancel.clone(),
cmd_rx,
&sub_agent_cache,
file_tracker,
&bg_agents,
)
.await?;
}
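        // Update the skill scope from this turn's calls and tell the user
        // when the available tool set changes.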
{
let scope_calls: Vec<(String, serde_json::Value)> = tool_calls
.iter()
.map(|tc| {
let args: serde_json::Value =
serde_json::from_str(&tc.arguments).unwrap_or_default();
(tc.function_name.clone(), args)
})
.collect();
let was_active = skill_scope.is_active();
skill_scope.update_from_tool_calls(&scope_calls, &tools.skill_registry);
match (was_active, skill_scope.is_active()) {
(false, true) => {
sink.emit(EngineEvent::Info {
message: "\u{1f512} Skill tool scope activated — tool set restricted"
.into(),
});
}
(true, false) => {
sink.emit(EngineEvent::Info {
message: "\u{1f513} Skill tool scope cleared — full tool set restored"
.into(),
});
}
_ => {}
}
}
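        // Loop guard: inject corrective feedback on first detection, and
        // hard-stop the turn if the model ignores it.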
match loop_detector.record(&tool_calls) {
LoopAction::Ok => {}
LoopAction::InjectFeedback(detail) => {
tracing::warn!(%detail, "Loop detected — injecting feedback");
sink.emit(EngineEvent::Warn {
message: format!(
"Loop detected: {detail}. Injecting feedback to nudge the model."
),
});
db.insert_message(
session_id,
&Role::User,
Some(&format!(
"System: Potential loop detected — {detail}. \
Please take a step back and confirm you're making forward progress. \
If not, analyze your previous actions and try a different approach. \
Avoid repeating the same tool calls without new results."
)),
None,
None,
None,
)
.await?;
loop_detector.clear_after_feedback();
}
LoopAction::HardStop(detail) => {
sink.emit(EngineEvent::Warn {
message: format!(
"Loop guard: {detail} — model ignored feedback, stopping. \
Send a follow-up message to continue."
),
});
break Ok(());
}
}
iteration += 1;
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::engine::sink::TestSink;
use crate::providers::{StreamChunk, TokenUsage, ToolCall};
use crate::trust::TrustMode;
use tokio::sync::mpsc;
fn test_tools(root: &Path) -> ToolRegistry {
ToolRegistry::new(root.to_path_buf(), 100_000)
}
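    /// Feeds `chunks` through a channel into `collect_stream` with a test
    /// sink and a temp-dir tool registry.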
async fn run_collect(
chunks: Vec<StreamChunk>,
cancel: Option<CancellationToken>,
) -> StreamResult {
let (tx, mut rx) = mpsc::channel(32);
let sink = TestSink::new();
let cancel = cancel.unwrap_or_default();
let tmp = tempfile::tempdir().unwrap();
let tools = test_tools(tmp.path());
tokio::spawn(async move {
for chunk in chunks {
let _ = tx.send(chunk).await;
}
});
collect_stream(&mut rx, &sink, &cancel, &tools, TrustMode::Auto, tmp.path()).await
}
#[tokio::test]
async fn collect_stream_accumulates_text_deltas() {
let result = run_collect(
vec![
StreamChunk::TextDelta("Hello ".into()),
StreamChunk::TextDelta("world!".into()),
StreamChunk::Done(TokenUsage::default()),
],
None,
)
.await;
assert_eq!(result.text, "Hello world!");
assert!(!result.interrupted);
assert!(result.network_error.is_none());
assert!(result.tool_calls.is_empty());
assert_eq!(result.char_count, 12);
}
#[tokio::test]
async fn collect_stream_empty_stream_returns_empty() {
let result = run_collect(vec![StreamChunk::Done(TokenUsage::default())], None).await;
assert!(result.text.is_empty());
assert!(!result.interrupted);
assert!(result.tool_calls.is_empty());
}
#[tokio::test]
async fn collect_stream_preserves_usage_from_done() {
let usage = TokenUsage {
prompt_tokens: 42,
completion_tokens: 17,
stop_reason: "end_turn".into(),
..Default::default()
};
let result = run_collect(
vec![
StreamChunk::TextDelta("hi".into()),
StreamChunk::Done(usage),
],
None,
)
.await;
assert_eq!(result.usage.prompt_tokens, 42);
assert_eq!(result.usage.completion_tokens, 17);
assert_eq!(result.usage.stop_reason, "end_turn");
}
#[tokio::test]
async fn collect_stream_thinking_then_text() {
let result = run_collect(
vec![
StreamChunk::ThinkingDelta("Let me think...".into()),
StreamChunk::TextDelta("Answer!".into()),
StreamChunk::Done(TokenUsage::default()),
],
None,
)
.await;
assert_eq!(result.thinking_content, "Let me think...");
assert_eq!(result.text, "Answer!");
}
#[tokio::test]
async fn collect_stream_tool_calls_batch() {
let tc = ToolCall {
id: "tc_1".into(),
function_name: "Bash".into(),
arguments: r#"{"command":"echo hi"}"#.into(),
thought_signature: None,
};
let result = run_collect(
vec![
StreamChunk::ToolCalls(vec![tc]),
StreamChunk::Done(TokenUsage::default()),
],
None,
)
.await;
assert_eq!(result.tool_calls.len(), 1);
assert_eq!(result.tool_calls[0].function_name, "Bash");
assert!(result.text.is_empty());
}
#[tokio::test]
async fn collect_stream_eager_executes_read_only_tool() {
let tmp = tempfile::tempdir().unwrap();
let test_file = tmp.path().join("hello.txt");
std::fs::write(&test_file, "file content").unwrap();
let tc = ToolCall {
id: "tc_eager".into(),
function_name: "Read".into(),
arguments: serde_json::json!({"file_path": test_file.to_string_lossy()}).to_string(),
thought_signature: None,
};
let (tx, mut rx) = mpsc::channel(32);
let sink = TestSink::new();
let cancel = CancellationToken::new();
let tools = test_tools(tmp.path());
tokio::spawn(async move {
let _ = tx.send(StreamChunk::ToolCallReady(tc)).await;
let _ = tx.send(StreamChunk::ToolCalls(vec![])).await;
let _ = tx.send(StreamChunk::Done(TokenUsage::default())).await;
});
let result =
collect_stream(&mut rx, &sink, &cancel, &tools, TrustMode::Auto, tmp.path()).await;
assert_eq!(result.tool_calls.len(), 1, "tool call should be recorded");
assert_eq!(result.eager_results.len(), 1, "should have 1 eager result");
let (id, output, success, _) = &result.eager_results[0];
assert_eq!(id, "tc_eager");
assert!(output.contains("file content"), "eager result: {output}");
assert!(success);
}
#[tokio::test]
async fn collect_stream_does_not_eagerly_execute_mutating_tool() {
let tc = ToolCall {
id: "tc_write".into(),
function_name: "Write".into(),
arguments: r#"{"file_path":"/tmp/x","content":"y"}"#.into(),
thought_signature: None,
};
let result = run_collect(
vec![
StreamChunk::ToolCallReady(tc),
StreamChunk::ToolCalls(vec![]),
StreamChunk::Done(TokenUsage::default()),
],
None,
)
.await;
assert_eq!(result.tool_calls.len(), 1);
assert!(
result.eager_results.is_empty(),
"Write should NOT be eagerly executed"
);
}
#[tokio::test]
async fn collect_stream_cancellation_sets_interrupted() {
let cancel = CancellationToken::new();
let cancel_clone = cancel.clone();
let (tx, mut rx) = mpsc::channel(32);
let sink = TestSink::new();
let tmp = tempfile::tempdir().unwrap();
let tools = test_tools(tmp.path());
tokio::spawn(async move {
let _ = tx.send(StreamChunk::TextDelta("partial".into())).await;
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
cancel_clone.cancel();
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let _ = tx.send(StreamChunk::TextDelta(" ignored".into())).await;
});
let result =
collect_stream(&mut rx, &sink, &cancel, &tools, TrustMode::Auto, tmp.path()).await;
assert!(result.interrupted);
assert!(result.network_error.is_none());
assert!(result.text.contains("partial"));
}
#[tokio::test]
async fn collect_stream_network_error_preserves_partial() {
let result = run_collect(
vec![
StreamChunk::TextDelta("partial response".into()),
StreamChunk::NetworkError("connection reset".into()),
],
None,
)
.await;
assert!(!result.interrupted);
assert_eq!(result.network_error.as_deref(), Some("connection reset"));
assert_eq!(result.text, "partial response");
}
#[tokio::test]
async fn collect_stream_network_error_with_no_text() {
let result = run_collect(vec![StreamChunk::NetworkError("timeout".into())], None).await;
assert!(result.text.is_empty());
assert!(result.network_error.is_some());
}
}