1use crate::approval_flow::{handle_ask_user, request_approval};
31use crate::config::KodaConfig;
32use crate::db::{Database, Role};
33use crate::engine::{ApprovalDecision, EngineCommand, EngineEvent};
34use crate::file_tracker::FileTracker;
35use crate::persistence::Persistence;
36use crate::preview;
37use crate::providers::ToolCall;
38use crate::sub_agent_cache::SubAgentCache;
39use crate::sub_agent_dispatch;
40use crate::tools;
41use crate::trust::{self, ToolApproval, TrustMode};
42
43use anyhow::Result;
44use std::path::{Path, PathBuf};
45use tokio::sync::mpsc;
46use tokio_util::sync::CancellationToken;
47
48#[allow(clippy::too_many_arguments)]
52pub(crate) async fn record_tool_result(
53 tc: &ToolCall,
54 result: &str,
55 success: bool,
56 full_output: Option<&str>,
57 db: &Database,
58 session_id: &str,
59 max_result_chars: usize,
60 project_root: &Path,
61 file_tracker: &mut FileTracker,
62 sink: &dyn crate::engine::EngineSink,
63) -> Result<()> {
64 sink.emit(EngineEvent::ToolCallResult {
65 id: tc.id.clone(),
66 name: tc.function_name.clone(),
67 output: result.to_string(),
68 });
69
70 if let Some(full) = full_output {
74 db.insert_tool_message_with_full(session_id, result, &tc.id, full)
75 .await?;
76 } else {
77 let stored = truncate_for_history(result, max_result_chars);
78 db.insert_message(
79 session_id,
80 &Role::Tool,
81 Some(&stored),
82 None,
83 Some(&tc.id),
84 None,
85 )
86 .await?;
87 }
88 crate::progress::track_progress(db, session_id, &tc.function_name, &tc.arguments, result).await;
89 let parsed_args: serde_json::Value = serde_json::from_str(&tc.arguments).unwrap_or_default();
90 track_file_lifecycle(
91 &tc.function_name,
92 &parsed_args,
93 project_root,
94 file_tracker,
95 success,
96 )
97 .await;
98 Ok(())
99}
100
/// Bounds a tool result before it is written to conversation history.
///
/// `output` is returned unchanged when its byte length is at most
/// `max_chars`. Otherwise it is cut at the last UTF-8 character boundary at
/// or below byte offset `max_chars`, and a notice is appended. The number in
/// the notice is the byte length of the dropped tail, even though the text
/// labels it "chars".
fn truncate_for_history(output: &str, max_chars: usize) -> String {
    let total = output.len();
    if total <= max_chars {
        return output.to_owned();
    }

    // Offset 0 is always a character boundary, so the search always succeeds.
    let cut = (0..=max_chars)
        .rev()
        .find(|&idx| output.is_char_boundary(idx))
        .unwrap_or(0);
    let (kept, dropped) = output.split_at(cut);

    format!(
        "{}\n\n[...truncated {} chars. Re-read the file if you need the full content.]",
        kept,
        dropped.len()
    )
}
118
119fn resolve_tool_path(
124 tool_name: &str,
125 args: &serde_json::Value,
126 project_root: &Path,
127) -> Option<PathBuf> {
128 if !matches!(tool_name, "Write" | "Delete") {
129 return None;
130 }
131 crate::file_tracker::resolve_file_path_from_args(args, project_root)
132}
133
134async fn track_file_lifecycle(
142 tool_name: &str,
143 args: &serde_json::Value,
144 project_root: &Path,
145 file_tracker: &mut FileTracker,
146 success: bool,
147) {
148 if !success {
149 return;
150 }
151 if let Some(path) = resolve_tool_path(tool_name, args, project_root) {
152 match tool_name {
153 "Write" => file_tracker.track_created(path).await,
154 "Delete" => file_tracker.untrack(&path).await,
155 _ => {}
156 }
157 }
158}
159
160pub(crate) fn can_parallelize(
161 tool_calls: &[ToolCall],
162 mode: TrustMode,
163 project_root: &Path,
164) -> bool {
165 let all_approved = !tool_calls.iter().any(|tc| {
166 let args: serde_json::Value = serde_json::from_str(&tc.arguments).unwrap_or_default();
167 matches!(
168 trust::check_tool(&tc.function_name, &args, mode, Some(project_root)),
169 ToolApproval::NeedsConfirmation | ToolApproval::Blocked
170 )
171 });
172
173 if !all_approved {
174 return false;
175 }
176
177 let mut seen = std::collections::HashSet::new();
178 let has_conflict = tool_calls.iter().any(|tc| {
179 if !crate::tools::is_mutating_tool(&tc.function_name) {
180 return false;
181 }
182 let args: serde_json::Value = serde_json::from_str(&tc.arguments).unwrap_or_default();
183 if let Some(path) = crate::undo::extract_file_path(&tc.function_name, &args) {
184 !seen.insert(path)
186 } else {
187 false
188 }
189 });
190
191 !has_conflict
192}
193
194#[tracing::instrument(skip_all, fields(tool = %tc.function_name))]
196#[allow(clippy::too_many_arguments)]
197pub(crate) async fn execute_one_tool(
198 tc: &ToolCall,
199 project_root: &Path,
200 config: &KodaConfig,
201 db: &Database,
202 _session_id: &str,
203 tools: &crate::tools::ToolRegistry,
204 mode: TrustMode,
205 sink: &dyn crate::engine::EngineSink,
206 cancel: CancellationToken,
207 sub_agent_cache: &SubAgentCache,
208 bg_agents: &std::sync::Arc<crate::bg_agent::BgAgentRegistry>,
209) -> (String, String, bool, Option<String>) {
210 let (result, success, full_output) = if tc.function_name == "InvokeAgent" {
211 match sub_agent_dispatch::execute_sub_agent(
213 project_root,
214 config,
215 db,
216 &tc.arguments,
217 mode,
218 sink,
219 cancel.clone(),
220 &mut mpsc::channel(1).1,
222 Some(tools.file_read_cache()),
223 sub_agent_cache,
224 _session_id,
225 bg_agents,
226 )
227 .await
228 {
229 Ok(output) => (output, true, None),
230 Err(e) => (format!("Error invoking sub-agent: {e}"), false, None),
231 }
232 } else {
233 if crate::tools::is_mutating_tool(&tc.function_name) {
235 sub_agent_cache.invalidate();
236 }
237 let streaming = if tc.function_name == "Bash" {
238 Some((sink, tc.id.as_str()))
239 } else {
240 None
241 };
242 let r = tools
243 .execute(&tc.function_name, &tc.arguments, streaming)
244 .await;
245 (r.output, r.success, r.full_output)
246 };
247
248 (tc.id.clone(), result, success, full_output)
249}
250
251#[allow(clippy::too_many_arguments)]
253pub(crate) async fn execute_tools_parallel(
254 tool_calls: &[ToolCall],
255 project_root: &Path,
256 config: &KodaConfig,
257 db: &Database,
258 session_id: &str,
259 tools: &crate::tools::ToolRegistry,
260 mode: TrustMode,
261 sink: &dyn crate::engine::EngineSink,
262 cancel: CancellationToken,
263 sub_agent_cache: &SubAgentCache,
264 file_tracker: &mut FileTracker,
265 bg_agents: &std::sync::Arc<crate::bg_agent::BgAgentRegistry>,
266) -> Result<()> {
267 let count = tool_calls.len();
268 sink.emit(EngineEvent::Info {
269 message: format!("Running {count} tools in parallel..."),
270 });
271
272 let futures: Vec<_> = tool_calls
274 .iter()
275 .map(|tc| {
276 execute_one_tool(
277 tc,
278 project_root,
279 config,
280 db,
281 session_id,
282 tools,
283 mode,
284 sink,
285 cancel.clone(),
286 sub_agent_cache,
287 bg_agents,
288 )
289 })
290 .collect();
291 let results = futures_util::future::join_all(futures).await;
292
293 for (i, (tc_id, result, success, full_output)) in results.into_iter().enumerate() {
295 sink.emit(EngineEvent::ToolCallStart {
296 id: tc_id.clone(),
297 name: tool_calls[i].function_name.clone(),
298 args: serde_json::from_str(&tool_calls[i].arguments).unwrap_or_default(),
299 is_sub_agent: false,
300 });
301 record_tool_result(
302 &tool_calls[i],
303 &result,
304 success,
305 full_output.as_deref(),
306 db,
307 session_id,
308 tools.caps.tool_result_chars,
309 project_root,
310 file_tracker,
311 sink,
312 )
313 .await?;
314 }
315 Ok(())
316}
317
318#[allow(clippy::too_many_arguments)]
325pub(crate) async fn execute_tools_split_batch(
326 tool_calls: &[ToolCall],
327 project_root: &Path,
328 config: &KodaConfig,
329 db: &Database,
330 session_id: &str,
331 tools: &crate::tools::ToolRegistry,
332 mode: TrustMode,
333 sink: &dyn crate::engine::EngineSink,
334 cancel: CancellationToken,
335 cmd_rx: &mut mpsc::Receiver<EngineCommand>,
336 sub_agent_cache: &SubAgentCache,
337 file_tracker: &mut FileTracker,
338 bg_agents: &std::sync::Arc<crate::bg_agent::BgAgentRegistry>,
339) -> Result<()> {
340 let (parallel, sequential): (Vec<_>, Vec<_>) = tool_calls.iter().partition(|tc| {
342 let args: serde_json::Value = serde_json::from_str(&tc.arguments).unwrap_or_default();
343 matches!(
344 trust::check_tool(&tc.function_name, &args, mode, Some(project_root),),
345 ToolApproval::AutoApprove
346 )
347 });
348
349 if parallel.len() > 1 {
351 sink.emit(EngineEvent::Info {
352 message: format!("Running {} tools in parallel...", parallel.len()),
353 });
354
355 let futures: Vec<_> = parallel
356 .iter()
357 .map(|tc| {
358 execute_one_tool(
359 tc,
360 project_root,
361 config,
362 db,
363 session_id,
364 tools,
365 mode,
366 sink,
367 cancel.clone(),
368 sub_agent_cache,
369 bg_agents,
370 )
371 })
372 .collect();
373 let results = futures_util::future::join_all(futures).await;
374
375 for (j, (tc_id, result, success, full_output)) in results.into_iter().enumerate() {
376 sink.emit(EngineEvent::ToolCallStart {
377 id: tc_id.clone(),
378 name: parallel[j].function_name.clone(),
379 args: serde_json::from_str(¶llel[j].arguments).unwrap_or_default(),
380 is_sub_agent: false,
381 });
382 record_tool_result(
383 parallel[j],
384 &result,
385 success,
386 full_output.as_deref(),
387 db,
388 session_id,
389 tools.caps.tool_result_chars,
390 project_root,
391 file_tracker,
392 sink,
393 )
394 .await?;
395 }
396 } else {
397 for tc in ¶llel {
399 let calls = std::slice::from_ref(*tc);
400 execute_tools_sequential(
401 calls,
402 project_root,
403 config,
404 db,
405 session_id,
406 tools,
407 mode,
408 sink,
409 cancel.clone(),
410 cmd_rx,
411 sub_agent_cache,
412 file_tracker,
413 bg_agents,
414 )
415 .await?;
416 }
417 }
418
419 if !sequential.is_empty() {
421 let seq_calls: Vec<ToolCall> = sequential.into_iter().cloned().collect();
422 execute_tools_sequential(
423 &seq_calls,
424 project_root,
425 config,
426 db,
427 session_id,
428 tools,
429 mode,
430 sink,
431 cancel.clone(),
432 cmd_rx,
433 sub_agent_cache,
434 file_tracker,
435 bg_agents,
436 )
437 .await?;
438 }
439
440 Ok(())
441}
442
443#[allow(clippy::too_many_arguments)]
445pub(crate) async fn execute_tools_sequential(
446 tool_calls: &[ToolCall],
447 project_root: &Path,
448 config: &KodaConfig,
449 db: &Database,
450 session_id: &str,
451 tools: &crate::tools::ToolRegistry,
452 mode: TrustMode,
453 sink: &dyn crate::engine::EngineSink,
454 cancel: CancellationToken,
455 cmd_rx: &mut mpsc::Receiver<EngineCommand>,
456 sub_agent_cache: &SubAgentCache,
457 file_tracker: &mut FileTracker,
458 bg_agents: &std::sync::Arc<crate::bg_agent::BgAgentRegistry>,
459) -> Result<()> {
460 for tc in tool_calls {
461 if cancel.is_cancelled() {
463 sink.emit(EngineEvent::Warn {
464 message: "Interrupted".into(),
465 });
466 return Ok(());
467 }
468
469 let parsed_args: serde_json::Value =
470 serde_json::from_str(&tc.arguments).unwrap_or_default();
471
472 sink.emit(EngineEvent::ToolCallStart {
473 id: tc.id.clone(),
474 name: tc.function_name.clone(),
475 args: parsed_args.clone(),
476 is_sub_agent: false,
477 });
478
479 if tc.function_name == "AskUser" {
482 let answer = handle_ask_user(sink, cmd_rx, &cancel, &parsed_args).await;
483 let result = match answer {
484 Some(text) if !text.trim().is_empty() => text,
485 Some(_) => "User did not provide an answer.".into(),
486 None => return Ok(()), };
488 record_tool_result(
489 tc,
490 &result,
491 true,
492 None, db,
494 session_id,
495 tools.caps.tool_result_chars,
496 project_root,
497 file_tracker,
498 sink,
499 )
500 .await?;
501 continue;
502 }
503
504 if let Some(error) = {
507 let cache = tools.file_read_cache();
508 let last_writer = tools.last_writer_cache();
509 let last_bash = tools.last_bash_cache();
510 tools::validate::validate_tool_call(
511 &tc.function_name,
512 &parsed_args,
513 project_root,
514 Some(&cache),
515 Some(&last_writer),
516 Some(&last_bash),
517 )
518 .await
519 } {
520 record_tool_result(
521 tc,
522 &format!("Validation error: {error}"),
523 false,
524 None,
525 db,
526 session_id,
527 tools.caps.tool_result_chars,
528 project_root,
529 file_tracker,
530 sink,
531 )
532 .await?;
533 continue;
534 }
535
536 let approval = trust::check_tool_with_tracker(
538 &tc.function_name,
539 &parsed_args,
540 mode,
541 Some(project_root),
542 Some(file_tracker),
543 );
544
545 match approval {
546 ToolApproval::AutoApprove => {
547 }
549 ToolApproval::Blocked => {
550 let detail = tools::describe_action(&tc.function_name, &parsed_args);
552 let diff_preview =
553 preview::compute(&tc.function_name, &parsed_args, project_root).await;
554 sink.emit(EngineEvent::ActionBlocked {
555 tool_name: tc.function_name.clone(),
556 detail: detail.clone(),
557 preview: diff_preview,
558 });
559 db.insert_message(
560 session_id,
561 &Role::Tool,
562 Some("[safe mode] Action blocked. You are in read-only mode. DO NOT retry this command. Describe what you would do instead. The user must press Shift+Tab to switch to auto or strict mode."),
563 None,
564 Some(&tc.id),
565 None,
566 )
567 .await?;
568 continue;
569 }
570 ToolApproval::NeedsConfirmation => {
571 let detail = tools::describe_action(&tc.function_name, &parsed_args);
572 let diff_preview =
573 preview::compute(&tc.function_name, &parsed_args, project_root).await;
574 let effect = crate::trust::resolve_tool_effect_with_registry(
575 &tc.function_name,
576 &parsed_args,
577 tools,
578 );
579
580 match request_approval(
581 sink,
582 cmd_rx,
583 &cancel,
584 &tc.function_name,
585 &detail,
586 diff_preview,
587 effect,
588 )
589 .await
590 {
591 Some(ApprovalDecision::Approve) => {}
592 Some(ApprovalDecision::Reject) => {
593 db.insert_message(
594 session_id,
595 &Role::Tool,
596 Some("User rejected this action."),
597 None,
598 Some(&tc.id),
599 None,
600 )
601 .await?;
602 continue;
603 }
604 Some(ApprovalDecision::RejectWithFeedback { feedback }) => {
605 let result = format!("User rejected this action with feedback: {feedback}");
606 db.insert_message(
607 session_id,
608 &Role::Tool,
609 Some(&result),
610 None,
611 Some(&tc.id),
612 None,
613 )
614 .await?;
615 continue;
616 }
617 None => {
618 return Ok(());
620 }
621 }
622 }
623 }
624
625 let (_, result, success, full_output) = execute_one_tool(
626 tc,
627 project_root,
628 config,
629 db,
630 session_id,
631 tools,
632 mode,
633 sink,
634 cancel.clone(),
635 sub_agent_cache,
636 bg_agents,
637 )
638 .await;
639 record_tool_result(
640 tc,
641 &result,
642 success,
643 full_output.as_deref(),
644 db,
645 session_id,
646 tools.caps.tool_result_chars,
647 project_root,
648 file_tracker,
649 sink,
650 )
651 .await?;
652 }
653 Ok(())
654}
655
#[cfg(test)]
mod tests {
    use super::*;
    use crate::providers::ToolCall;

    /// Project root handed to every `can_parallelize` call below.
    const ROOT: &str = "/test/project";

    /// Builds a tool call with an explicit id and raw JSON arguments.
    fn call_with(id: &str, name: &str, arguments: &str) -> ToolCall {
        ToolCall {
            id: id.to_string(),
            function_name: name.to_string(),
            arguments: arguments.to_string(),
            thought_signature: None,
        }
    }

    /// Builds a tool call with id `t1` and an empty JSON object as arguments.
    fn make_tool_call(name: &str) -> ToolCall {
        call_with("t1", name, "{}")
    }

    /// Evaluates `can_parallelize` against the shared test root.
    fn parallel_ok(calls: &[ToolCall], mode: TrustMode) -> bool {
        can_parallelize(calls, mode, Path::new(ROOT))
    }

    #[test]
    fn test_can_parallelize_read_only() {
        let calls = vec![make_tool_call("Read"), make_tool_call("Grep")];
        assert!(parallel_ok(&calls, TrustMode::Safe));
    }

    #[test]
    fn test_cannot_parallelize_writes() {
        let calls = vec![make_tool_call("Read"), make_tool_call("Write")];
        assert!(!parallel_ok(&calls, TrustMode::Safe));
    }

    #[test]
    fn test_cannot_parallelize_bash() {
        let calls = vec![
            make_tool_call("Read"),
            call_with("t2", "Bash", r#"{"command": "rm -rf /tmp/test"}"#),
        ];
        assert!(!parallel_ok(&calls, TrustMode::Safe));
    }

    #[test]
    fn test_can_parallelize_agents() {
        let calls = vec![make_tool_call("InvokeAgent"), make_tool_call("InvokeAgent")];
        assert!(parallel_ok(&calls, TrustMode::Safe));
    }

    #[test]
    fn test_cannot_parallelize_same_file_edits() {
        let calls = vec![
            call_with("t1", "Edit", r#"{"file_path": "src/main.rs"}"#),
            call_with("t2", "Edit", r#"{"file_path": "src/main.rs"}"#),
        ];
        assert!(!parallel_ok(&calls, TrustMode::Auto));
    }

    #[test]
    fn test_can_parallelize_different_file_edits() {
        let calls = vec![
            call_with("t1", "Edit", r#"{"file_path": "src/main.rs"}"#),
            call_with("t2", "Edit", r#"{"file_path": "src/lib.rs"}"#),
        ];
        assert!(parallel_ok(&calls, TrustMode::Auto));
    }

    #[test]
    fn test_is_mutating_tool() {
        for name in ["Write", "Edit", "Delete", "Bash", "MemoryWrite"] {
            assert!(crate::tools::is_mutating_tool(name));
        }
        for name in ["Read", "List", "InvokeAgent"] {
            assert!(!crate::tools::is_mutating_tool(name));
        }
    }

    #[test]
    fn test_mixed_batch_not_fully_parallelizable() {
        let calls = vec![make_tool_call("InvokeAgent"), make_tool_call("Write")];
        assert!(!parallel_ok(&calls, TrustMode::Safe));
    }

    #[test]
    fn test_mixed_batch_fully_parallelizable_in_auto() {
        let calls = vec![make_tool_call("InvokeAgent"), make_tool_call("Write")];
        assert!(parallel_ok(&calls, TrustMode::Auto));
    }
}