use super::execution::{execute_tool_with_timeout, process_llm_tool_output};
use super::timeout::create_timeout_error;
use super::*;
use crate::agent::runloop::unified::state::CtrlCState;
use serde_json::json;
use std::sync::Arc;
use tokio::sync::Notify;
use vtcode_core::config::PermissionsConfig;
use vtcode_core::config::constants::tools;
use vtcode_core::tools::registry::ToolRegistry;
use vtcode_core::tools::registry::ToolTimeoutCategory;
use vtcode_core::ui::inline_theme_from_core_styles;
use vtcode_core::ui::theme;
use vtcode_core::utils::ansi::AnsiRenderer;
use vtcode_tui::app::{InlineHandle, InlineSession, SessionOptions, spawn_session_with_options};
/// Builds a [`ToolRegistry`] whose root is an owned copy of `workspace`.
async fn create_test_registry(workspace: &std::path::Path) -> ToolRegistry {
    let root = workspace.to_owned();
    ToolRegistry::new(root).await
}
/// Creates an [`AnsiRenderer`] bound to a clone of the given inline UI handle,
/// passing the default value for the renderer's second constructor argument.
fn create_test_renderer(
    handle: &vtcode_tui::app::InlineHandle,
) -> vtcode_core::utils::ansi::AnsiRenderer {
    let ui_handle = handle.clone();
    AnsiRenderer::with_inline_ui(ui_handle, Default::default())
}
/// Assembles an [`InlineSession`] from two in-memory unbounded channels
/// instead of spawning a real session.
///
/// The command receiver and the event sender are only bound to local
/// variables, so both are dropped when this function returns.
fn create_headless_session() -> InlineSession {
    use tokio::sync::mpsc::unbounded_channel;

    let (command_tx, _command_rx) = unbounded_channel();
    let (_event_tx, event_rx) = unbounded_channel();

    let handle = InlineHandle::new_for_tests(command_tx);
    InlineSession {
        handle,
        events: event_rx,
    }
}
/// Builds a harness turn state via [`build_harness_state_with`] using a
/// tool-call limit of 4.
fn build_harness_state() -> crate::agent::runloop::unified::run_loop_context::HarnessTurnState {
    const DEFAULT_MAX_TOOL_CALLS: usize = 4;
    build_harness_state_with(DEFAULT_MAX_TOOL_CALLS)
}
/// Builds a `HarnessTurnState` with fixed test identifiers (`"test-run"` and
/// `"test-turn"`) and the supplied `max_tool_calls`.
///
/// The trailing `60` and `0` arguments are passed through unchanged.
/// NOTE(review): their meaning is defined by `HarnessTurnState::new`, which is
/// not visible from this file — confirm there before changing them.
fn build_harness_state_with(
    max_tool_calls: usize,
) -> crate::agent::runloop::unified::run_loop_context::HarnessTurnState {
    use crate::agent::runloop::unified::run_loop_context::{HarnessTurnState, TurnId, TurnRunId};

    let run_id = TurnRunId("test-run".to_string());
    let turn_id = TurnId("test-turn".to_string());
    HarnessTurnState::new(run_id, turn_id, max_tool_calls, 60, 0)
}
/// Shared fixture for tool-execution tests: a tool registry, a renderer, an
/// inline session with its handle, an approval recorder and a permissions
/// state, all rooted at one temporary workspace directory.
struct TestContext {
    registry: ToolRegistry,
    renderer: vtcode_core::utils::ansi::AnsiRenderer,
    session: vtcode_tui::app::InlineSession,
    handle: vtcode_tui::app::InlineHandle,
    approval_recorder: vtcode_core::tools::ApprovalRecorder,
    permissions_state: Arc<tokio::sync::RwLock<PermissionsConfig>>,
    workspace: std::path::PathBuf,
    /// Owns the temporary directory that `workspace` points at. `TempDir`
    /// removes its directory when dropped, so the guard has to live as long
    /// as the fixture. It is declared last so that every other field is
    /// dropped before the directory is removed.
    _workspace_guard: tempfile::TempDir,
}

impl TestContext {
    /// Creates a fixture backed by a fresh temporary workspace.
    ///
    /// A session is spawned with 10 inline rows and the workspace as its
    /// root. If spawning fails with an error whose text contains
    /// `"stdin is not a terminal"`, the fixture falls back to
    /// `create_headless_session()` so the tests can run without a TTY.
    ///
    /// # Panics
    ///
    /// Panics if the temporary directory cannot be created, or if spawning
    /// the session fails for any reason other than the missing terminal.
    async fn new() -> Self {
        let tmp = tempfile::TempDir::new().unwrap();
        let workspace = tmp.path().to_path_buf();
        let registry = create_test_registry(&workspace).await;

        let active_styles = theme::active_styles();
        let theme_spec = inline_theme_from_core_styles(&active_styles);
        let mut session = match spawn_session_with_options(
            theme_spec,
            SessionOptions {
                inline_rows: 10,
                workspace_root: Some(workspace.clone()),
                ..SessionOptions::default()
            },
        ) {
            Ok(session) => session,
            // No terminal attached (e.g. CI): use the channel-backed session.
            Err(err) if err.to_string().contains("stdin is not a terminal") => {
                create_headless_session()
            }
            Err(err) => panic!("failed to spawn test session: {err:#}"),
        };
        session.set_skip_confirmations(true);

        let handle = session.clone_inline_handle();
        let renderer = create_test_renderer(&handle);
        let approval_recorder = vtcode_core::tools::ApprovalRecorder::new(workspace.clone());
        let permissions_state = Arc::new(tokio::sync::RwLock::new(PermissionsConfig::default()));

        Self {
            registry,
            renderer,
            session,
            handle,
            approval_recorder,
            permissions_state,
            workspace,
            // Keep the directory alive; previously the guard was dropped at
            // the end of `new`, deleting the workspace before any test used it.
            _workspace_guard: tmp,
        }
    }
}
mod run_tool_call;
/// Calls `execute_tool_with_timeout` for the tool name `"test_tool"` with an
/// empty JSON argument object and checks that the call reports an error.
///
/// Two outcomes are accepted: a `Failure` status, or a `Success` status whose
/// output object carries an `"error"` key. Any other status fails the test.
#[tokio::test]
async fn test_execute_tool_with_timeout() {
    let registry = ToolRegistry::new(std::env::current_dir().unwrap()).await;
    let ctrl_c_state = Arc::new(CtrlCState::new());
    let ctrl_c_notify = Arc::new(Notify::new());

    // NOTE(review): the trailing `None` and `0` are forwarded as-is; their
    // meaning is defined by `execute_tool_with_timeout`, which lives outside
    // this file.
    let result = execute_tool_with_timeout(
        &registry,
        "test_tool",
        json!({}),
        &ctrl_c_state,
        &ctrl_c_notify,
        None,
        0,
    )
    .await;

    match result {
        // An outright failure is an acceptable outcome.
        ToolExecutionStatus::Failure { .. } => {}
        // A "successful" call must still surface the error in its payload.
        ToolExecutionStatus::Success { ref output, .. } => {
            assert!(
                output.get("error").is_some(),
                "Expected tool to return error object for unknown tool"
            );
        }
        other => panic!("Unexpected result type: {:?}", other),
    }
}
/// A registry created over an empty temporary workspace must not resolve any
/// tool under the `tools::ASK_QUESTIONS` name.
#[tokio::test]
async fn test_ask_questions_alias_is_rejected() {
    let workspace = tempfile::TempDir::new().unwrap();
    let root = workspace.path().to_path_buf();
    let registry = ToolRegistry::new(root).await;

    assert!(registry.get_tool(tools::ASK_QUESTIONS).is_none());
}
/// A registry created over an empty temporary workspace must not resolve any
/// tool under the `tools::ASK_USER_QUESTION` name.
#[tokio::test]
async fn test_ask_user_question_alias_is_rejected() {
    let workspace = tempfile::TempDir::new().unwrap();
    let root = workspace.path().to_path_buf();
    let registry = ToolRegistry::new(root).await;

    assert!(registry.get_tool(tools::ASK_USER_QUESTION).is_none());
}
/// Feeds a zero-exit-code payload through `process_llm_tool_output` and checks
/// that the resulting `Success` status exposes the stdout text, the list of
/// modified files and a true `command_success` flag.
#[test]
fn test_process_tool_output() {
    let payload = json!({
        "exit_code": 0,
        "stdout": "test output",
        "modified_files": ["file1.txt", "file2.txt"],
        "has_more": false
    });

    match process_llm_tool_output(payload) {
        ToolExecutionStatus::Success {
            stdout,
            modified_files,
            command_success,
            ..
        } => {
            assert_eq!(stdout, Some("test output".to_string()));
            assert_eq!(modified_files, vec!["file1.txt", "file2.txt"]);
            assert!(command_success);
        }
        _ => panic!("Expected Success variant"),
    }
}
/// Feeds a payload flagged with `"loop_detected": true` through
/// `process_llm_tool_output` and checks that it becomes a `Failure` whose
/// rendered error mentions loop detection, the tool name, the repeat count
/// and the do-not-retry / action-required guidance.
#[test]
fn test_process_tool_output_loop_detection() {
    let payload = json!({
        "error": {
            "tool_name": "read_file",
            "error_type": "PolicyViolation",
            "message": "Tool 'read_file' blocked after 5 identical invocations in recent history (limit: 5)",
            "is_recoverable": false,
            "recovery_suggestions": [],
            "original_error": null
        },
        "loop_detected": true,
        "repeat_count": 5,
        "tool": "read_file"
    });

    match process_llm_tool_output(payload) {
        ToolExecutionStatus::Failure { error } => {
            let rendered = error.to_string();
            assert!(rendered.contains("LOOP DETECTION"));
            assert!(rendered.contains("read_file"));
            assert!(rendered.contains("5"));
            assert!(rendered.contains("DO NOT retry"));
            assert!(rendered.contains("ACTION REQUIRED"));
        }
        status => panic!(
            "Expected Failure variant for loop detection, got: {:?}",
            status
        ),
    }
}
/// Builds a timeout status for `"test_tool"` in the default timeout category
/// with a 42-second duration, and checks that the error message names the
/// tool, contains the phrase "timeout ceiling" and includes the number 42.
#[test]
fn test_create_timeout_error() {
    let ceiling = Some(Duration::from_secs(42));

    match create_timeout_error("test_tool", ToolTimeoutCategory::Default, ceiling) {
        ToolExecutionStatus::Timeout { error } => {
            let message = &error.message;
            assert!(message.contains("test_tool"));
            assert!(message.contains("timeout ceiling"));
            assert!(message.contains("42"));
        }
        _ => panic!("Expected Timeout variant"),
    }
}