use crate::approval_flow::{handle_ask_user, request_approval};
use crate::config::KodaConfig;
use crate::db::{Database, Role};
use crate::engine::{ApprovalDecision, EngineCommand, EngineEvent};
use crate::file_tracker::FileTracker;
use crate::persistence::Persistence;
use crate::preview;
use crate::providers::ToolCall;
use crate::sub_agent_cache::SubAgentCache;
use crate::sub_agent_dispatch;
use crate::tools;
use crate::trust::{self, ToolApproval, TrustMode};
use anyhow::Result;
use std::path::{Path, PathBuf};
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
/// Record the outcome of a finished tool call: emit the result event,
/// persist it to session history, and sync the file tracker.
///
/// When `full_output` is present both the short and the full form are stored
/// via the dedicated DB helper; otherwise the single result string is
/// truncated to `max_result_chars` before insertion.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn record_tool_result(
    tc: &ToolCall,
    result: &str,
    success: bool,
    full_output: Option<&str>,
    db: &Database,
    session_id: &str,
    max_result_chars: usize,
    project_root: &Path,
    file_tracker: &mut FileTracker,
    sink: &dyn crate::engine::EngineSink,
) -> Result<()> {
    // Surface the result to the UI/event stream before persisting.
    sink.emit(EngineEvent::ToolCallResult {
        id: tc.id.clone(),
        name: tc.function_name.clone(),
        output: result.to_string(),
    });
    match full_output {
        Some(full) => {
            db.insert_tool_message_with_full(session_id, result, &tc.id, full)
                .await?;
        }
        None => {
            let stored = truncate_for_history(result, max_result_chars);
            db.insert_message(
                session_id,
                &Role::Tool,
                Some(&stored),
                None,
                Some(&tc.id),
                None,
            )
            .await?;
        }
    }
    // Keep the file tracker consistent with any Write/Delete side effects.
    let args: serde_json::Value = serde_json::from_str(&tc.arguments).unwrap_or_default();
    track_file_lifecycle(&tc.function_name, &args, project_root, file_tracker, success).await;
    Ok(())
}
/// Truncate `output` for storage in conversation history.
///
/// The budget is compared against `output.len()`, i.e. it is a *byte* budget
/// (the parameter name is historical). The cut point is walked back to the
/// nearest UTF-8 character boundary so the slice is always valid, and a
/// notice is appended telling the model how much was removed.
///
/// Fix: the notice previously reported `output.len() - end` — a byte count —
/// while claiming "chars", over-counting for multi-byte text. It now reports
/// the actual number of characters removed.
fn truncate_for_history(output: &str, max_chars: usize) -> String {
    if output.len() <= max_chars {
        return output.to_string();
    }
    // Back up until the cut lands on a char boundary (at most 3 steps for
    // UTF-8, but loop defensively).
    let mut end = max_chars;
    while end > 0 && !output.is_char_boundary(end) {
        end -= 1;
    }
    let removed = output[end..].chars().count();
    format!(
        "{}\n\n[...truncated {removed} chars. Re-read the file if you need the full content.]",
        &output[..end]
    )
}
/// Map a tool invocation to the filesystem path it touches, if any.
///
/// Only `Write` and `Delete` participate in file-lifecycle tracking; every
/// other tool resolves to `None`.
fn resolve_tool_path(
    tool_name: &str,
    args: &serde_json::Value,
    project_root: &Path,
) -> Option<PathBuf> {
    match tool_name {
        "Write" | "Delete" => crate::file_tracker::resolve_file_path_from_args(args, project_root),
        _ => None,
    }
}
/// Update the file tracker after a tool ran: a successful `Write` marks the
/// file as created by the agent, a successful `Delete` drops the record.
/// Failed calls are ignored so the tracker never drifts from disk reality.
async fn track_file_lifecycle(
    tool_name: &str,
    args: &serde_json::Value,
    project_root: &Path,
    file_tracker: &mut FileTracker,
    success: bool,
) {
    if !success {
        return;
    }
    let Some(path) = resolve_tool_path(tool_name, args, project_root) else {
        return;
    };
    match tool_name {
        "Write" => file_tracker.track_created(path).await,
        "Delete" => file_tracker.untrack(&path).await,
        _ => {}
    }
}
/// Decide whether a batch of tool calls may run concurrently.
///
/// Two conditions must hold:
/// 1. every call is auto-approvable under the current trust mode (nothing
///    would prompt the user or be blocked), and
/// 2. no two mutating calls target the same file path.
pub(crate) fn can_parallelize(
    tool_calls: &[ToolCall],
    mode: TrustMode,
    project_root: &Path,
    file_tracker: Option<&crate::file_tracker::FileTracker>,
) -> bool {
    // Any call that would need user interaction (or is outright blocked)
    // forces the whole batch onto the sequential path.
    let needs_gate = |tc: &ToolCall| {
        let args: serde_json::Value = serde_json::from_str(&tc.arguments).unwrap_or_default();
        let approval = trust::check_tool_with_tracker(
            &tc.function_name,
            &args,
            mode,
            Some(project_root),
            file_tracker,
        );
        matches!(
            approval,
            ToolApproval::NeedsConfirmation | ToolApproval::Blocked
        )
    };
    if tool_calls.iter().any(|tc| needs_gate(tc)) {
        return false;
    }
    // Reject batches where two mutating calls would race on the same file.
    let mut touched = std::collections::HashSet::new();
    for tc in tool_calls {
        if !crate::tools::is_mutating_tool(&tc.function_name) {
            continue;
        }
        let args: serde_json::Value = serde_json::from_str(&tc.arguments).unwrap_or_default();
        if let Some(path) = crate::undo::extract_file_path(&tc.function_name, &args) {
            if !touched.insert(path) {
                return false;
            }
        }
    }
    true
}
/// Execute a single tool call and return `(tool_call_id, output, success, full_output)`.
///
/// Dispatches on the tool name:
/// - background-task management tools (`ListBackgroundTasks` / `CancelTask` /
///   `WaitTask`) go to the dedicated bg-task executor;
/// - `InvokeAgent` runs a sub-agent (the future is boxed because the
///   sub-agent path can recurse back into tool execution);
/// - everything else goes through the shared tool registry, with output
///   streaming wired up for `Bash` only.
///
/// Never returns `Err`: failures are reported via the `success` flag with an
/// error message in the output slot.
#[tracing::instrument(skip_all, fields(tool = %tc.function_name))]
#[allow(clippy::too_many_arguments)]
pub(crate) async fn execute_one_tool(
    tc: &ToolCall,
    project_root: &Path,
    config: &KodaConfig,
    db: &Database,
    _session_id: &str,
    tools: &crate::tools::ToolRegistry,
    mode: TrustMode,
    sink: &dyn crate::engine::EngineSink,
    cancel: CancellationToken,
    sub_agent_cache: &SubAgentCache,
    bg_agents: &std::sync::Arc<crate::bg_agent::BgAgentRegistry>,
    caller_spawner: Option<u32>,
) -> (String, String, bool, Option<String>) {
    let (result, success, full_output) = if matches!(
        tc.function_name.as_str(),
        "ListBackgroundTasks" | "CancelTask" | "WaitTask"
    ) {
        // Background-task management bypasses the normal registry.
        let r = crate::tools::bg_task_tools::execute(
            &tc.function_name,
            &tc.arguments,
            bg_agents,
            &tools.bg_registry,
            caller_spawner,
        )
        .await;
        (r.output, r.success, r.full_output)
    } else if tc.function_name == "InvokeAgent" {
        // The sub-agent API takes a command receiver; sub-agents launched
        // from here get an inert channel (no interactive commands flow in).
        let (_, mut dummy_rx) = mpsc::channel(1);
        let policy = tools.sandbox_policy().clone();
        let read_cache = tools.file_read_cache();
        let fut = sub_agent_dispatch::execute_sub_agent(
            project_root,
            config,
            db,
            &tc.arguments,
            mode,
            sink,
            cancel.clone(),
            &mut dummy_rx,
            Some(read_cache),
            sub_agent_cache,
            _session_id,
            bg_agents,
            &policy,
            caller_spawner,
            None,
        );
        // Box the (potentially recursive) future to keep this frame small.
        match Box::pin(fut).await {
            Ok(output) => (output, true, None),
            Err(e) => (format!("Error invoking sub-agent: {e}"), false, None),
        }
    } else {
        // Invalidate the sub-agent cache before running any mutating tool —
        // conservatively, even if the tool later fails.
        if crate::tools::is_mutating_tool(&tc.function_name) {
            sub_agent_cache.invalidate();
        }
        // Only Bash streams incremental output to the sink.
        let streaming = if tc.function_name == "Bash" {
            Some((sink, tc.id.as_str()))
        } else {
            None
        };
        let r = tools
            .execute(&tc.function_name, &tc.arguments, streaming, caller_spawner)
            .await;
        (r.output, r.success, r.full_output)
    };
    (tc.id.clone(), result, success, full_output)
}
/// Validate a tool call's arguments, then execute it.
///
/// A validation failure short-circuits into a failed result without touching
/// the tool; the error text is returned in the output slot of the
/// `(id, output, success, full_output)` tuple.
#[allow(clippy::too_many_arguments)]
async fn validate_then_execute_one_tool(
    tc: &ToolCall,
    project_root: &Path,
    config: &KodaConfig,
    db: &Database,
    session_id: &str,
    tools: &crate::tools::ToolRegistry,
    mode: TrustMode,
    sink: &dyn crate::engine::EngineSink,
    cancel: CancellationToken,
    sub_agent_cache: &SubAgentCache,
    bg_agents: &std::sync::Arc<crate::bg_agent::BgAgentRegistry>,
    caller_spawner: Option<u32>,
) -> (String, String, bool, Option<String>) {
    let args: serde_json::Value = serde_json::from_str(&tc.arguments).unwrap_or_default();
    let check =
        tools::validate::validate_with_registry(tools, &tc.function_name, &args, project_root)
            .await;
    match check {
        Some(error) => (
            tc.id.clone(),
            format!("Validation error: {error}"),
            false,
            None,
        ),
        None => {
            execute_one_tool(
                tc,
                project_root,
                config,
                db,
                session_id,
                tools,
                mode,
                sink,
                cancel,
                sub_agent_cache,
                bg_agents,
                caller_spawner,
            )
            .await
        }
    }
}
/// Execute a batch of pre-approved tool calls concurrently.
///
/// All calls are validated and executed in parallel via `join_all`; events
/// and history records are then emitted in the original batch order so the
/// transcript stays deterministic.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn execute_tools_parallel(
    tool_calls: &[ToolCall],
    project_root: &Path,
    config: &KodaConfig,
    db: &Database,
    session_id: &str,
    tools: &crate::tools::ToolRegistry,
    mode: TrustMode,
    sink: &dyn crate::engine::EngineSink,
    cancel: CancellationToken,
    sub_agent_cache: &SubAgentCache,
    file_tracker: &mut FileTracker,
    bg_agents: &std::sync::Arc<crate::bg_agent::BgAgentRegistry>,
    caller_spawner: Option<u32>,
) -> Result<()> {
    sink.emit(EngineEvent::Info {
        message: format!("Running {} tools in parallel...", tool_calls.len()),
    });
    let futures: Vec<_> = tool_calls
        .iter()
        .map(|tc| {
            validate_then_execute_one_tool(
                tc,
                project_root,
                config,
                db,
                session_id,
                tools,
                mode,
                sink,
                cancel.clone(),
                sub_agent_cache,
                bg_agents,
                caller_spawner,
            )
        })
        .collect();
    let results = futures_util::future::join_all(futures).await;
    // Pair each result with its originating call; join_all preserves order.
    for (tc, (tc_id, output, success, full_output)) in tool_calls.iter().zip(results) {
        sink.emit(EngineEvent::ToolCallStart {
            id: tc_id.clone(),
            name: tc.function_name.clone(),
            args: serde_json::from_str(&tc.arguments).unwrap_or_default(),
            is_sub_agent: false,
        });
        record_tool_result(
            tc,
            &output,
            success,
            full_output.as_deref(),
            db,
            session_id,
            tools.caps.tool_result_chars,
            project_root,
            file_tracker,
            sink,
        )
        .await?;
    }
    Ok(())
}
/// Split a batch into auto-approvable and gated calls, running the former in
/// parallel and the latter sequentially (with interactive approval).
///
/// Parallel-eligible calls always run first; approval-gated calls follow in
/// their original relative order on the sequential path.
///
/// NOTE(review): the partition here uses `trust::check_tool` (no file
/// tracker), while `can_parallelize` and the sequential path use
/// `check_tool_with_tracker` — so a Koda-owned-file Delete is classified as
/// sequential here. Confirm whether that asymmetry is intentional.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn execute_tools_split_batch(
    tool_calls: &[ToolCall],
    project_root: &Path,
    config: &KodaConfig,
    db: &Database,
    session_id: &str,
    tools: &crate::tools::ToolRegistry,
    mode: TrustMode,
    sink: &dyn crate::engine::EngineSink,
    cancel: CancellationToken,
    cmd_rx: &mut mpsc::Receiver<EngineCommand>,
    sub_agent_cache: &SubAgentCache,
    file_tracker: &mut FileTracker,
    bg_agents: &std::sync::Arc<crate::bg_agent::BgAgentRegistry>,
    caller_spawner: Option<u32>,
) -> Result<()> {
    // Partition: auto-approved calls may run concurrently, the rest must go
    // through the sequential approval flow.
    let (parallel, sequential): (Vec<_>, Vec<_>) = tool_calls.iter().partition(|tc| {
        let args: serde_json::Value = serde_json::from_str(&tc.arguments).unwrap_or_default();
        matches!(
            trust::check_tool(&tc.function_name, &args, mode, Some(project_root),),
            ToolApproval::AutoApprove
        )
    });
    if parallel.len() > 1 {
        sink.emit(EngineEvent::Info {
            message: format!("Running {} tools in parallel...", parallel.len()),
        });
        let futures: Vec<_> = parallel
            .iter()
            .map(|tc| {
                validate_then_execute_one_tool(
                    tc,
                    project_root,
                    config,
                    db,
                    session_id,
                    tools,
                    mode,
                    sink,
                    cancel.clone(),
                    sub_agent_cache,
                    bg_agents,
                    caller_spawner,
                )
            })
            .collect();
        let results = futures_util::future::join_all(futures).await;
        // Record results in batch order (join_all preserves order).
        for (j, (tc_id, result, success, full_output)) in results.into_iter().enumerate() {
            sink.emit(EngineEvent::ToolCallStart {
                id: tc_id.clone(),
                name: parallel[j].function_name.clone(),
                args: serde_json::from_str(&parallel[j].arguments).unwrap_or_default(),
                is_sub_agent: false,
            });
            record_tool_result(
                parallel[j],
                &result,
                success,
                full_output.as_deref(),
                db,
                session_id,
                tools.caps.tool_result_chars,
                project_root,
                file_tracker,
                sink,
            )
            .await?;
        }
    } else {
        // Zero or one auto-approved call: no concurrency benefit, reuse the
        // sequential path (which also handles AskUser etc.).
        for tc in &parallel {
            let calls = std::slice::from_ref(*tc);
            execute_tools_sequential(
                calls,
                project_root,
                config,
                db,
                session_id,
                tools,
                mode,
                sink,
                cancel.clone(),
                cmd_rx,
                sub_agent_cache,
                file_tracker,
                bg_agents,
                caller_spawner,
            )
            .await?;
        }
    }
    if !sequential.is_empty() {
        // Gated calls run after the parallel batch, in original order.
        let seq_calls: Vec<ToolCall> = sequential.into_iter().cloned().collect();
        execute_tools_sequential(
            &seq_calls,
            project_root,
            config,
            db,
            session_id,
            tools,
            mode,
            sink,
            cancel.clone(),
            cmd_rx,
            sub_agent_cache,
            file_tracker,
            bg_agents,
            caller_spawner,
        )
        .await?;
    }
    Ok(())
}
/// Execute tool calls one at a time, enforcing the full trust/approval flow.
///
/// Per call: stop early on cancellation, emit `ToolCallStart`, special-case
/// `AskUser` (an interactive prompt rather than a real tool), validate
/// arguments, gate on the trust decision (auto-approve / blocked /
/// interactive confirmation), then execute and record the result.
///
/// Returns `Ok(())` even when the batch is abandoned early (interrupt, or
/// the approval channel closing) — only DB failures propagate as `Err`.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn execute_tools_sequential(
    tool_calls: &[ToolCall],
    project_root: &Path,
    config: &KodaConfig,
    db: &Database,
    session_id: &str,
    tools: &crate::tools::ToolRegistry,
    mode: TrustMode,
    sink: &dyn crate::engine::EngineSink,
    cancel: CancellationToken,
    cmd_rx: &mut mpsc::Receiver<EngineCommand>,
    sub_agent_cache: &SubAgentCache,
    file_tracker: &mut FileTracker,
    bg_agents: &std::sync::Arc<crate::bg_agent::BgAgentRegistry>,
    caller_spawner: Option<u32>,
) -> Result<()> {
    for tc in tool_calls {
        // User interrupt: warn and drop the remainder of the batch.
        if cancel.is_cancelled() {
            sink.emit(EngineEvent::Warn {
                message: "Interrupted".into(),
            });
            return Ok(());
        }
        let parsed_args: serde_json::Value =
            serde_json::from_str(&tc.arguments).unwrap_or_default();
        sink.emit(EngineEvent::ToolCallStart {
            id: tc.id.clone(),
            name: tc.function_name.clone(),
            args: parsed_args.clone(),
            is_sub_agent: false,
        });
        // AskUser is handled inline: it blocks on the command channel for a
        // user answer instead of going through the tool registry.
        if tc.function_name == "AskUser" {
            let answer = handle_ask_user(sink, cmd_rx, &cancel, &parsed_args).await;
            let result = match answer {
                Some(text) if !text.trim().is_empty() => text,
                Some(_) => "User did not provide an answer.".into(),
                // Channel closed / cancelled: abandon the whole batch.
                None => return Ok(()),
            };
            record_tool_result(
                tc,
                &result,
                true,
                None,
                db,
                session_id,
                tools.caps.tool_result_chars,
                project_root,
                file_tracker,
                sink,
            )
            .await?;
            continue;
        }
        // Argument validation: failures are recorded as tool results so the
        // model can self-correct, and the tool itself is never run.
        if let Some(error) = tools::validate::validate_with_registry(
            tools,
            &tc.function_name,
            &parsed_args,
            project_root,
        )
        .await
        {
            record_tool_result(
                tc,
                &format!("Validation error: {error}"),
                false,
                None,
                db,
                session_id,
                tools.caps.tool_result_chars,
                project_root,
                file_tracker,
                sink,
            )
            .await?;
            continue;
        }
        // Trust gate — tracker-aware so agent-owned files can be exempt.
        let approval = trust::check_tool_with_tracker(
            &tc.function_name,
            &parsed_args,
            mode,
            Some(project_root),
            Some(file_tracker),
        );
        match approval {
            ToolApproval::AutoApprove => {
                // Fall through to execution below.
            }
            ToolApproval::Blocked => {
                // Safe mode: surface the blocked action (with a diff preview
                // when one can be computed) and tell the model not to retry.
                let detail = tools::describe_action(&tc.function_name, &parsed_args);
                let diff_preview =
                    preview::compute(&tc.function_name, &parsed_args, project_root).await;
                sink.emit(EngineEvent::ActionBlocked {
                    tool_name: tc.function_name.clone(),
                    detail: detail.clone(),
                    preview: diff_preview,
                });
                db.insert_message(
                    session_id,
                    &Role::Tool,
                    Some("[safe mode] Action blocked. You are in read-only mode. DO NOT retry this command. Describe what you would do instead. The user must press Shift+Tab to switch to auto or strict mode."),
                    None,
                    Some(&tc.id),
                    None,
                )
                .await?;
                continue;
            }
            ToolApproval::NeedsConfirmation => {
                // Interactive approval: describe the action, show a preview,
                // and wait for the user's decision on the command channel.
                let detail = tools::describe_action(&tc.function_name, &parsed_args);
                let diff_preview =
                    preview::compute(&tc.function_name, &parsed_args, project_root).await;
                let effect = crate::trust::resolve_tool_effect_with_registry(
                    &tc.function_name,
                    &parsed_args,
                    tools,
                );
                match request_approval(
                    sink,
                    cmd_rx,
                    &cancel,
                    &tc.function_name,
                    &detail,
                    diff_preview,
                    effect,
                )
                .await
                {
                    Some(ApprovalDecision::Approve) => {}
                    Some(ApprovalDecision::Reject) => {
                        db.insert_message(
                            session_id,
                            &Role::Tool,
                            Some("User rejected this action."),
                            None,
                            Some(&tc.id),
                            None,
                        )
                        .await?;
                        continue;
                    }
                    Some(ApprovalDecision::RejectWithFeedback { feedback }) => {
                        // Feedback is stored so the model can adjust course.
                        let result = format!("User rejected this action with feedback: {feedback}");
                        db.insert_message(
                            session_id,
                            &Role::Tool,
                            Some(&result),
                            None,
                            Some(&tc.id),
                            None,
                        )
                        .await?;
                        continue;
                    }
                    Some(ApprovalDecision::RejectAuto { reason }) => {
                        // Rejected by policy rather than by the user directly.
                        let result = format!("[auto-rejected: {reason}]");
                        db.insert_message(
                            session_id,
                            &Role::Tool,
                            Some(&result),
                            None,
                            Some(&tc.id),
                            None,
                        )
                        .await?;
                        continue;
                    }
                    None => {
                        // Approval channel gone (shutdown/interrupt): stop.
                        return Ok(());
                    }
                }
            }
        }
        // Approved (auto or interactively): execute and record.
        let (_, result, success, full_output) = execute_one_tool(
            tc,
            project_root,
            config,
            db,
            session_id,
            tools,
            mode,
            sink,
            cancel.clone(),
            sub_agent_cache,
            bg_agents,
            caller_spawner,
        )
        .await;
        record_tool_result(
            tc,
            &result,
            success,
            full_output.as_deref(),
            db,
            session_id,
            tools.caps.tool_result_chars,
            project_root,
            file_tracker,
            sink,
        )
        .await?;
    }
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::providers::ToolCall;
    // Minimal tool call with empty JSON arguments; name is the only variable.
    fn make_tool_call(name: &str) -> ToolCall {
        ToolCall {
            id: "t1".to_string(),
            function_name: name.to_string(),
            arguments: "{}".to_string(),
            thought_signature: None,
        }
    }
    // Pure read-only tools are parallel-eligible even in safe mode.
    #[test]
    fn test_can_parallelize_read_only() {
        let calls = vec![make_tool_call("Read"), make_tool_call("Grep")];
        assert!(can_parallelize(
            &calls,
            TrustMode::Safe,
            Path::new("/test/project"),
            None,
        ));
    }
    // In safe mode a Write needs confirmation, so the batch can't parallelize.
    #[test]
    fn test_cannot_parallelize_writes() {
        let calls = vec![make_tool_call("Read"), make_tool_call("Write")];
        assert!(!can_parallelize(
            &calls,
            TrustMode::Safe,
            Path::new("/test/project"),
            None,
        ));
    }
    // A destructive Bash command is gated in safe mode → no parallelism.
    #[test]
    fn test_cannot_parallelize_bash() {
        let calls = vec![
            make_tool_call("Read"),
            ToolCall {
                id: "t2".to_string(),
                function_name: "Bash".to_string(),
                arguments: r#"{"command": "rm -rf /tmp/test"}"#.to_string(),
                thought_signature: None,
            },
        ];
        assert!(!can_parallelize(
            &calls,
            TrustMode::Safe,
            Path::new("/test/project"),
            None,
        ));
    }
    // Sub-agent invocations are non-mutating and may run side by side.
    #[test]
    fn test_can_parallelize_agents() {
        let calls = vec![make_tool_call("InvokeAgent"), make_tool_call("InvokeAgent")];
        assert!(can_parallelize(
            &calls,
            TrustMode::Safe,
            Path::new("/test/project"),
            None,
        ));
    }
    // Two mutating edits of the same path conflict even when auto-approved.
    #[test]
    fn test_cannot_parallelize_same_file_edits() {
        let calls = vec![
            ToolCall {
                id: "t1".to_string(),
                function_name: "Edit".to_string(),
                arguments: r#"{"file_path": "src/main.rs"}"#.to_string(),
                thought_signature: None,
            },
            ToolCall {
                id: "t2".to_string(),
                function_name: "Edit".to_string(),
                arguments: r#"{"file_path": "src/main.rs"}"#.to_string(),
                thought_signature: None,
            },
        ];
        assert!(!can_parallelize(
            &calls,
            TrustMode::Auto, Path::new("/test/project"),
            None,
        ));
    }
    // Distinct target paths → no conflict → parallel-eligible in auto mode.
    #[test]
    fn test_can_parallelize_different_file_edits() {
        let calls = vec![
            ToolCall {
                id: "t1".to_string(),
                function_name: "Edit".to_string(),
                arguments: r#"{"file_path": "src/main.rs"}"#.to_string(),
                thought_signature: None,
            },
            ToolCall {
                id: "t2".to_string(),
                function_name: "Edit".to_string(),
                arguments: r#"{"file_path": "src/lib.rs"}"#.to_string(),
                thought_signature: None,
            },
        ];
        assert!(can_parallelize(
            &calls,
            TrustMode::Auto,
            Path::new("/test/project"),
            None,
        ));
    }
    // Pin the mutating/non-mutating classification used by the conflict check.
    #[test]
    fn test_is_mutating_tool() {
        assert!(crate::tools::is_mutating_tool("Write"));
        assert!(crate::tools::is_mutating_tool("Edit"));
        assert!(crate::tools::is_mutating_tool("Delete"));
        assert!(crate::tools::is_mutating_tool("Bash"));
        assert!(crate::tools::is_mutating_tool("MemoryWrite"));
        assert!(!crate::tools::is_mutating_tool("Read"));
        assert!(!crate::tools::is_mutating_tool("List"));
        assert!(!crate::tools::is_mutating_tool("InvokeAgent"));
    }
    // One gated call (Write in safe mode) poisons the whole batch.
    #[test]
    fn test_mixed_batch_not_fully_parallelizable() {
        let calls = vec![make_tool_call("InvokeAgent"), make_tool_call("Write")];
        assert!(!can_parallelize(
            &calls,
            TrustMode::Safe,
            Path::new("/test/project"),
            None,
        ));
    }
    // In auto mode the same batch is fully auto-approved → parallel-eligible.
    #[test]
    fn test_mixed_batch_fully_parallelizable_in_auto() {
        let calls = vec![make_tool_call("InvokeAgent"), make_tool_call("Write")];
        assert!(can_parallelize(
            &calls,
            TrustMode::Auto,
            Path::new("/test/project"),
            None,
        ));
    }
    // With a file tracker, deleting a file the agent itself created is
    // auto-approvable even in safe mode — matching the sequential path's
    // tracker-aware classification. Without the tracker it must still gate.
    #[tokio::test]
    async fn test_can_parallelize_delete_owned_file_uses_tracker() {
        let dir = tempfile::TempDir::new().unwrap();
        let db = crate::db::Database::open(&dir.path().join("test.db"))
            .await
            .unwrap();
        let mut tracker = crate::file_tracker::FileTracker::new("test-sess", db).await;
        let root = dir.path().join("project");
        std::fs::create_dir_all(&root).unwrap();
        // Canonicalize so tracker paths and tool paths compare equal.
        let root = root.canonicalize().unwrap();
        let owned_abs = root.join("temp_output.md");
        std::fs::write(&owned_abs, "").unwrap();
        tracker
            .track_created(owned_abs.canonicalize().unwrap())
            .await;
        let calls = vec![
            ToolCall {
                id: "t1".to_string(),
                function_name: "Read".to_string(),
                arguments: r#"{"path": "other.txt"}"#.to_string(),
                thought_signature: None,
            },
            ToolCall {
                id: "t2".to_string(),
                function_name: "Delete".to_string(),
                arguments: r#"{"path": "temp_output.md"}"#.to_string(),
                thought_signature: None,
            },
        ];
        assert!(
            !can_parallelize(&calls, TrustMode::Safe, &root, None),
            "sanity: without tracker, Delete must look like NeedsConfirmation"
        );
        assert!(
            can_parallelize(&calls, TrustMode::Safe, &root, Some(&tracker)),
            "with tracker, Delete of Koda-owned file must be \
             parallel-eligible (matches sequential path classification)"
        );
    }
}