use crate::{
capabilities::CapabilitySet, checkpoint::Checkpoint, CostBudget, Runtime, ToolExecutor,
};
use car_ir::*;
use serde_json::Value;
use std::collections::HashMap;
use std::fs;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
/// Deterministic in-process executor backing most tests in this module.
///
/// Recognised tool names:
/// - `add`: sums the integer params `a` and `b` (absent or non-integer values count as 0).
/// - `echo`: returns the string param `message` (empty string when absent).
/// - `fail`: always errors with `"boom"`.
/// - `slow`: sleeps five seconds before returning `"done"`, used to trip timeouts.
///
/// Any other name yields `Err("unknown tool: <name>")`.
struct TestExecutor;

#[async_trait::async_trait]
impl ToolExecutor for TestExecutor {
    async fn execute(&self, tool: &str, params: &Value) -> Result<Value, String> {
        // Integer lookup used by `add`; missing / non-integer entries default to 0.
        let int_param = |key: &str| params.get(key).and_then(Value::as_i64).unwrap_or(0);

        if tool == "add" {
            return Ok(Value::from(int_param("a") + int_param("b")));
        }
        if tool == "echo" {
            let message = params.get("message").and_then(Value::as_str).unwrap_or("");
            return Ok(Value::from(message));
        }
        if tool == "fail" {
            return Err("boom".to_string());
        }
        if tool == "slow" {
            tokio::time::sleep(std::time::Duration::from_secs(5)).await;
            return Ok(Value::from("done"));
        }
        Err(format!("unknown tool: {}", tool))
    }
}
/// Build a [`Runtime`] wired to [`TestExecutor`], with no tools registered yet.
fn make_runtime() -> Runtime {
    let executor = Arc::new(TestExecutor);
    Runtime::new().with_executor(executor)
}
/// Build a [`TestExecutor`]-backed [`Runtime`] with every tool that executor
/// understands (`add`, `echo`, `fail`, `slow`) registered, in that order.
async fn setup_runtime() -> Runtime {
    let runtime = make_runtime();
    for name in ["add", "echo", "fail", "slow"] {
        runtime.register_tool(name).await;
    }
    runtime
}
/// Build a `ToolCall` action invoking `tool` with `params`.
///
/// The id is the first 12 characters of a fresh v4 UUID in simple (hex) form.
/// Defaults: `FailureBehavior::Abort`, `max_retries` 3, not idempotent, and no
/// preconditions, expected effects, state dependencies, timeout or metadata.
/// Individual tests override fields afterwards as needed.
fn tool_call(tool: &str, params: HashMap<String, Value>) -> Action {
    let mut id = uuid::Uuid::new_v4().simple().to_string();
    id.truncate(12);

    Action {
        id,
        action_type: ActionType::ToolCall,
        tool: Some(tool.to_owned()),
        parameters: params,
        preconditions: Vec::new(),
        expected_effects: HashMap::new(),
        state_dependencies: Vec::new(),
        idempotent: false,
        max_retries: 3,
        failure_behavior: FailureBehavior::Abort,
        timeout_ms: None,
        metadata: HashMap::new(),
    }
}
/// Build a `StateWrite` action storing `value` under `key`.
///
/// Parameters are `{"key": key, "value": value}`. The id is the first 12
/// characters of a fresh simple-form v4 UUID. All other fields use the same
/// defaults as the tool-call builder: `Abort` on failure, `max_retries` 3,
/// not idempotent, and no tool.
fn state_write(key: &str, value: Value) -> Action {
    let mut id = uuid::Uuid::new_v4().simple().to_string();
    id.truncate(12);

    let parameters = HashMap::from([
        (String::from("key"), Value::from(key)),
        (String::from("value"), value),
    ]);

    Action {
        id,
        action_type: ActionType::StateWrite,
        tool: None,
        parameters,
        preconditions: Vec::new(),
        expected_effects: HashMap::new(),
        state_dependencies: Vec::new(),
        idempotent: false,
        max_retries: 3,
        failure_behavior: FailureBehavior::Abort,
        timeout_ms: None,
        metadata: HashMap::new(),
    }
}
/// Wrap `actions` in an [`ActionProposal`] with id `"test-proposal"`, source
/// `"test"`, the current UTC time as its timestamp, and an empty context.
fn proposal(actions: Vec<Action>) -> ActionProposal {
    let timestamp = chrono::Utc::now();
    ActionProposal {
        id: String::from("test-proposal"),
        source: String::from("test"),
        actions,
        timestamp,
        context: HashMap::new(),
    }
}
#[tokio::test]
async fn test_tool_call_succeeds() {
let rt = setup_runtime().await;
let p = proposal(vec![tool_call(
"add",
[
("a".to_string(), Value::from(1)),
("b".to_string(), Value::from(2)),
]
.into(),
)]);
let result = rt.execute(&p).await;
assert!(result.all_succeeded());
assert_eq!(result.results[0].output, Some(Value::from(3)));
}
#[tokio::test]
async fn test_agent_basics_read_file() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("note.txt");
fs::write(&path, "alpha\nbeta\ngamma\n").unwrap();
let rt = Runtime::new();
rt.register_agent_basics().await;
let result = rt
.execute(&proposal(vec![tool_call(
"read_file",
[("path".to_string(), Value::from(path.display().to_string()))].into(),
)]))
.await;
assert!(result.all_succeeded());
let output = result.results[0].output.as_ref().unwrap();
assert_eq!(
output.get("content"),
Some(&Value::from("alpha\nbeta\ngamma\n"))
);
assert_eq!(output.get("total_lines"), Some(&Value::from(3)));
}
#[tokio::test]
async fn test_agent_basics_write_and_edit_file() {
let dir = tempfile::tempdir().unwrap();
let file_path = dir.path().join("story.txt");
let rt = Runtime::new();
rt.register_agent_basics().await;
let write = tool_call(
"write_file",
[
(
"path".to_string(),
Value::from(file_path.display().to_string()),
),
("content".to_string(), Value::from("hello world")),
]
.into(),
);
let edit = tool_call(
"edit_file",
[
(
"path".to_string(),
Value::from(file_path.display().to_string()),
),
("old_text".to_string(), Value::from("world")),
("new_text".to_string(), Value::from("car")),
]
.into(),
);
let result = rt.execute(&proposal(vec![write, edit])).await;
assert!(result.all_succeeded());
assert_eq!(fs::read_to_string(file_path).unwrap(), "hello car");
}
#[tokio::test]
async fn test_agent_basics_find_and_grep_files() {
let dir = tempfile::tempdir().unwrap();
let src_dir = dir.path().join("src");
fs::create_dir_all(&src_dir).unwrap();
fs::write(src_dir.join("lib.rs"), "fn main() {}\n").unwrap();
fs::write(src_dir.join("data.txt"), "hello car runtime\n").unwrap();
let rt = Runtime::new();
rt.register_agent_basics().await;
let find = tool_call(
"find_files",
[
("pattern".to_string(), Value::from("*.rs")),
(
"path".to_string(),
Value::from(dir.path().display().to_string()),
),
]
.into(),
);
let grep = tool_call(
"grep_files",
[
("pattern".to_string(), Value::from("car")),
(
"path".to_string(),
Value::from(dir.path().display().to_string()),
),
]
.into(),
);
let result = rt.execute(&proposal(vec![find, grep])).await;
assert!(result.all_succeeded());
assert_eq!(
result.results[0].output.as_ref().unwrap()["count"],
Value::from(1)
);
assert_eq!(
result.results[1].output.as_ref().unwrap()["count"],
Value::from(1)
);
}
#[tokio::test]
async fn test_state_write_and_read() {
let rt = setup_runtime().await;
let p = proposal(vec![state_write("x", Value::from(42)), {
let mut a = state_write("unused", Value::Null);
a.action_type = ActionType::StateRead;
a.parameters = [("key".to_string(), Value::from("x"))].into();
a.state_dependencies = vec!["x".to_string()];
a
}]);
let result = rt.execute(&p).await;
assert!(result.all_succeeded());
assert_eq!(result.results[1].output, Some(Value::from(42)));
}
/// A call to a tool that was never registered with the runtime is `Rejected`.
#[tokio::test]
async fn test_unknown_tool_rejected() {
    let executor = Arc::new(TestExecutor);
    let rt = Runtime::new().with_executor(executor);

    let outcome = rt
        .execute(&proposal(vec![tool_call("nonexistent", HashMap::new())]))
        .await;
    assert_eq!(outcome.results[0].status, ActionStatus::Rejected);
}
#[tokio::test]
async fn test_precondition_blocks() {
let rt = setup_runtime().await;
let mut action = tool_call("echo", [("message".to_string(), Value::from("hi"))].into());
action.preconditions = vec![Precondition {
key: "auth".to_string(),
operator: "eq".to_string(),
value: Value::Bool(true),
description: String::new(),
}];
let result = rt.execute(&proposal(vec![action])).await;
assert_eq!(result.results[0].status, ActionStatus::Rejected);
}
#[tokio::test]
async fn test_precondition_passes() {
let rt = setup_runtime().await;
rt.state.set("auth", Value::Bool(true), "setup");
let mut action = tool_call("echo", [("message".to_string(), Value::from("hi"))].into());
action.preconditions = vec![Precondition {
key: "auth".to_string(),
operator: "eq".to_string(),
value: Value::Bool(true),
description: String::new(),
}];
let result = rt.execute(&proposal(vec![action])).await;
assert!(result.all_succeeded());
}
/// With `FailureBehavior::Abort`, a failing first action is reported `Failed`
/// and the action after it is reported `Skipped`.
#[tokio::test]
async fn test_abort_stops_remaining() {
    let rt = setup_runtime().await;

    let mut failing = tool_call("fail", HashMap::new());
    failing.failure_behavior = FailureBehavior::Abort;
    let follow_up = tool_call(
        "echo",
        HashMap::from([("message".to_string(), Value::from("hi"))]),
    );

    let outcome = rt.execute(&proposal(vec![failing, follow_up])).await;
    let statuses = (&outcome.results[0].status, &outcome.results[1].status);
    assert_eq!(*statuses.0, ActionStatus::Failed);
    assert_eq!(*statuses.1, ActionStatus::Skipped);
}
/// With `FailureBehavior::Skip`, a failing action is reported `Skipped` and
/// the next action still runs to `Succeeded`.
#[tokio::test]
async fn test_skip_continues() {
    let rt = setup_runtime().await;

    let mut failing = tool_call("fail", HashMap::new());
    failing.failure_behavior = FailureBehavior::Skip;
    let follow_up = tool_call(
        "echo",
        HashMap::from([("message".to_string(), Value::from("hi"))]),
    );

    let outcome = rt.execute(&proposal(vec![failing, follow_up])).await;
    assert_eq!(outcome.results[0].status, ActionStatus::Skipped);
    assert_eq!(outcome.results[1].status, ActionStatus::Succeeded);
}
/// When an aborting failure follows a successful state write in the same
/// proposal, the write is rolled back and `x` is absent afterwards.
#[tokio::test]
async fn test_abort_rolls_back_state() {
    let rt = setup_runtime().await;

    let mut boom = tool_call("fail", HashMap::new());
    boom.failure_behavior = FailureBehavior::Abort;

    rt.execute(&proposal(vec![state_write("x", Value::from(1)), boom]))
        .await;
    assert_eq!(rt.state.get("x"), None);
}
/// A 50 ms `timeout_ms` on the five-second `slow` tool makes the action fail
/// with an error mentioning `"timed out"`.
#[tokio::test]
async fn test_timeout() {
    let rt = setup_runtime().await;

    let mut sluggish = tool_call("slow", HashMap::new());
    sluggish.timeout_ms = Some(50);

    let outcome = rt.execute(&proposal(vec![sluggish])).await;
    let first = &outcome.results[0];
    assert_eq!(first.status, ActionStatus::Failed);
    let message = first.error.as_ref().unwrap();
    assert!(message.contains("timed out"));
}
#[tokio::test]
async fn test_idempotent_cached() {
let rt = setup_runtime().await;
let mut action1 = tool_call(
"add",
[
("a".to_string(), Value::from(1)),
("b".to_string(), Value::from(2)),
]
.into(),
);
action1.idempotent = true;
rt.execute(&proposal(vec![action1.clone()])).await;
let mut action2 = action1.clone();
action2.id = "second".to_string();
let result = rt.execute(&proposal(vec![action2])).await;
assert_eq!(result.results[0].output, Some(Value::from(3)));
assert_eq!(result.results[0].duration_ms, Some(0.0));
}
/// With `FailureBehavior::Retry` and `max_retries = 3`, a tool that fails
/// twice and then succeeds ends `Succeeded` after exactly three invocations.
#[tokio::test]
async fn test_retry_succeeds() {
    // Executor whose first two invocations fail; the third and later succeed.
    struct FlakyExecutor(Arc<AtomicU32>);

    #[async_trait::async_trait]
    impl ToolExecutor for FlakyExecutor {
        async fn execute(&self, _tool: &str, _params: &Value) -> Result<Value, String> {
            let attempt = self.0.fetch_add(1, Ordering::SeqCst) + 1;
            if attempt >= 3 {
                Ok(Value::from("ok"))
            } else {
                Err("not yet".to_string())
            }
        }
    }

    let attempts = Arc::new(AtomicU32::new(0));
    let rt = Runtime::new().with_executor(Arc::new(FlakyExecutor(Arc::clone(&attempts))));
    rt.register_tool("flaky").await;

    let mut action = tool_call("flaky", HashMap::new());
    action.failure_behavior = FailureBehavior::Retry;
    action.max_retries = 3;

    let outcome = rt.execute(&proposal(vec![action])).await;
    assert_eq!(outcome.results[0].status, ActionStatus::Succeeded);
    assert_eq!(attempts.load(Ordering::SeqCst), 3);
}
#[tokio::test]
async fn test_policy_blocks() {
let rt = setup_runtime().await;
{
let mut policies = rt.policies.write().await;
policies.register(
"no_echo",
Box::new(|action, _state| {
if action.tool.as_deref() == Some("echo") {
Some("echo forbidden".to_string())
} else {
None
}
}),
"",
);
}
let action = tool_call("echo", [("message".to_string(), Value::from("hi"))].into());
let result = rt.execute(&proposal(vec![action])).await;
assert_eq!(result.results[0].status, ActionStatus::Rejected);
assert!(result.results[0]
.error
.as_ref()
.unwrap()
.contains("forbidden"));
}
#[tokio::test]
async fn test_session_policy_denies_what_global_allows() {
let rt = setup_runtime().await;
let session_id = rt.open_session().await;
rt.register_policy_in_session(
&session_id,
"no_echo_in_session",
Box::new(|action, _state| {
if action.tool.as_deref() == Some("echo") {
Some("echo forbidden in this session".to_string())
} else {
None
}
}),
"Session-scoped echo deny",
)
.await
.unwrap();
let global = rt
.execute(&proposal(vec![tool_call(
"echo",
[("message".to_string(), Value::from("hi"))].into(),
)]))
.await;
assert_eq!(global.results[0].status, ActionStatus::Succeeded);
let scoped = rt
.execute_with_session(
&proposal(vec![tool_call(
"echo",
[("message".to_string(), Value::from("hi"))].into(),
)]),
&session_id,
)
.await;
assert_eq!(scoped.results[0].status, ActionStatus::Rejected);
assert!(scoped.results[0]
.error
.as_ref()
.unwrap()
.contains("forbidden in this session"));
}
/// A global policy still applies to actions executed inside a session that
/// registered no policies of its own.
#[tokio::test]
async fn test_global_policy_still_enforced_under_session() {
    let rt = setup_runtime().await;

    {
        let mut registry = rt.policies.write().await;
        registry.register(
            "global_no_fail",
            Box::new(|action, _state| match action.tool.as_deref() {
                Some("fail") => Some("fail tool blocked globally".to_string()),
                _ => None,
            }),
            "Global block on fail",
        );
    } // write guard released here, before anything executes

    let session_id = rt.open_session().await;
    let outcome = rt
        .execute_with_session(
            &proposal(vec![tool_call("fail", HashMap::new())]),
            &session_id,
        )
        .await;

    let first = &outcome.results[0];
    assert_eq!(first.status, ActionStatus::Rejected);
    assert!(first.error.as_ref().unwrap().contains("blocked globally"));
}
#[tokio::test]
async fn test_close_session_drops_its_policies() {
let rt = setup_runtime().await;
let session_id = rt.open_session().await;
rt.register_policy_in_session(
&session_id,
"no_add",
Box::new(|action, _state| {
if action.tool.as_deref() == Some("add") {
Some("add denied in session".to_string())
} else {
None
}
}),
"",
)
.await
.unwrap();
let denied = rt
.execute_with_session(
&proposal(vec![tool_call(
"add",
[
("a".to_string(), Value::from(1)),
("b".to_string(), Value::from(2)),
]
.into(),
)]),
&session_id,
)
.await;
assert_eq!(denied.results[0].status, ActionStatus::Rejected);
let removed = rt.close_session(&session_id).await;
assert!(removed);
assert!(!rt.session_exists(&session_id).await);
assert!(!rt.close_session(&session_id).await);
let allowed = rt
.execute(&proposal(vec![tool_call(
"add",
[
("a".to_string(), Value::from(1)),
("b".to_string(), Value::from(2)),
]
.into(),
)]))
.await;
assert_eq!(allowed.results[0].status, ActionStatus::Succeeded);
}
#[tokio::test]
async fn test_unknown_session_id_rejects_actions() {
let rt = setup_runtime().await;
let result = rt
.execute_with_session(
&proposal(vec![tool_call(
"echo",
[("message".to_string(), Value::from("x"))].into(),
)]),
"no-such-session",
)
.await;
assert_eq!(result.results[0].status, ActionStatus::Rejected);
assert!(result.results[0]
.error
.as_ref()
.unwrap()
.contains("unknown session id"));
}
/// Registering a policy against a session id that does not exist returns an
/// error mentioning `"unknown session id"`.
#[tokio::test]
async fn test_register_in_unknown_session_errors() {
    let rt = setup_runtime().await;

    let outcome = rt
        .register_policy_in_session("ghost", "p", Box::new(|_, _| None), "")
        .await;
    let message = outcome.expect_err("should refuse to register against unknown session");

    assert!(message.contains("unknown session id"));
}
#[tokio::test]
async fn test_global_only_path_unchanged_when_no_session_used() {
let rt = setup_runtime().await;
let action = tool_call("echo", [("message".to_string(), Value::from("hi"))].into());
let result = rt.execute(&proposal(vec![action])).await;
assert_eq!(result.results[0].status, ActionStatus::Succeeded);
}
#[tokio::test]
async fn test_expected_effects() {
let rt = setup_runtime().await;
let mut action = tool_call(
"add",
[
("a".to_string(), Value::from(1)),
("b".to_string(), Value::from(2)),
]
.into(),
);
action.expected_effects = [("result".to_string(), Value::from(3))].into();
rt.execute(&proposal(vec![action])).await;
assert_eq!(rt.state.get("result"), Some(Value::from(3)));
}
#[tokio::test]
async fn test_cost_summary_computed() {
let rt = setup_runtime().await;
let p = proposal(vec![
tool_call(
"add",
[
("a".to_string(), Value::from(1)),
("b".to_string(), Value::from(2)),
]
.into(),
),
state_write("x", Value::from(42)),
tool_call("echo", [("message".to_string(), Value::from("hi"))].into()),
]);
let result = rt.execute(&p).await;
assert!(result.all_succeeded());
assert_eq!(result.cost.tool_calls, 2);
assert_eq!(result.cost.actions_executed, 3);
assert_eq!(result.cost.actions_skipped, 0);
assert!(result.cost.total_duration_ms > 0.0);
assert_eq!(result.cost.retries, 0);
}
/// When an aborting failure skips the rest of the proposal, the cost summary
/// reports 1 executed, 1 skipped, and 0 tool calls; the failed call is not
/// counted.
#[tokio::test]
async fn test_cost_summary_with_skipped() {
    let rt = setup_runtime().await;

    let mut failing = tool_call("fail", HashMap::new());
    failing.failure_behavior = FailureBehavior::Abort;
    let follow_up = tool_call(
        "echo",
        HashMap::from([("message".to_string(), Value::from("hi"))]),
    );

    let outcome = rt.execute(&proposal(vec![failing, follow_up])).await;
    let cost = &outcome.cost;
    assert_eq!(cost.actions_executed, 1);
    assert_eq!(cost.actions_skipped, 1);
    assert_eq!(cost.tool_calls, 0);
}
#[tokio::test]
async fn test_cost_budget_max_actions() {
let rt = setup_runtime().await;
rt.set_cost_budget(CostBudget {
max_tool_calls: None,
max_duration_ms: None,
max_actions: Some(1),
})
.await;
let p = proposal(vec![
tool_call(
"add",
[
("a".to_string(), Value::from(1)),
("b".to_string(), Value::from(2)),
]
.into(),
),
tool_call("echo", [("message".to_string(), Value::from("hi"))].into()),
]);
let result = rt.execute(&p).await;
assert_eq!(result.results[0].status, ActionStatus::Succeeded);
assert_eq!(result.results[1].status, ActionStatus::Skipped);
assert!(result.results[1]
.error
.as_ref()
.unwrap()
.contains("cost budget exceeded"));
assert_eq!(result.cost.actions_executed, 1);
assert_eq!(result.cost.actions_skipped, 1);
}
#[tokio::test]
async fn test_cost_budget_max_tool_calls() {
let rt = setup_runtime().await;
rt.set_cost_budget(CostBudget {
max_tool_calls: Some(1),
max_duration_ms: None,
max_actions: None,
})
.await;
let p = proposal(vec![
tool_call(
"add",
[
("a".to_string(), Value::from(1)),
("b".to_string(), Value::from(2)),
]
.into(),
),
tool_call("echo", [("message".to_string(), Value::from("hi"))].into()),
]);
let result = rt.execute(&p).await;
assert_eq!(result.results[0].status, ActionStatus::Succeeded);
assert_eq!(result.results[1].status, ActionStatus::Skipped);
assert_eq!(result.cost.tool_calls, 1);
}
#[tokio::test]
async fn test_result_cache_hit_returns_stored_result() {
let call_count = Arc::new(AtomicU32::new(0));
let cc = call_count.clone();
struct CountingExecutor(Arc<AtomicU32>);
#[async_trait::async_trait]
impl ToolExecutor for CountingExecutor {
async fn execute(&self, _tool: &str, params: &Value) -> Result<Value, String> {
self.0.fetch_add(1, Ordering::SeqCst);
let a = params.get("a").and_then(|v| v.as_i64()).unwrap_or(0);
let b = params.get("b").and_then(|v| v.as_i64()).unwrap_or(0);
Ok(Value::from(a + b))
}
}
let rt = Runtime::new().with_executor(Arc::new(CountingExecutor(cc)));
rt.register_tool("add").await;
rt.enable_tool_cache("add", 60).await;
let action = tool_call(
"add",
[
("a".to_string(), Value::from(1)),
("b".to_string(), Value::from(2)),
]
.into(),
);
let r1 = rt.execute(&proposal(vec![action.clone()])).await;
assert!(r1.all_succeeded());
assert_eq!(r1.results[0].output, Some(Value::from(3)));
assert_eq!(call_count.load(Ordering::SeqCst), 1);
let r2 = rt.execute(&proposal(vec![action])).await;
assert!(r2.all_succeeded());
assert_eq!(r2.results[0].output, Some(Value::from(3)));
assert_eq!(call_count.load(Ordering::SeqCst), 1); }
#[tokio::test]
async fn test_result_cache_expired_entries_return_none() {
let call_count = Arc::new(AtomicU32::new(0));
let cc = call_count.clone();
struct CountingExecutor2(Arc<AtomicU32>);
#[async_trait::async_trait]
impl ToolExecutor for CountingExecutor2 {
async fn execute(&self, _tool: &str, _params: &Value) -> Result<Value, String> {
self.0.fetch_add(1, Ordering::SeqCst);
Ok(Value::from("result"))
}
}
let rt = Runtime::new().with_executor(Arc::new(CountingExecutor2(cc)));
rt.register_tool("echo").await;
rt.enable_tool_cache("echo", 0).await;
let action = tool_call("echo", [("message".to_string(), Value::from("hi"))].into());
rt.execute(&proposal(vec![action.clone()])).await;
assert_eq!(call_count.load(Ordering::SeqCst), 1);
rt.execute(&proposal(vec![action])).await;
assert_eq!(call_count.load(Ordering::SeqCst), 2);
}
#[tokio::test]
async fn test_result_cache_different_params_different_keys() {
let call_count = Arc::new(AtomicU32::new(0));
let cc = call_count.clone();
struct CountingExecutor3(Arc<AtomicU32>);
#[async_trait::async_trait]
impl ToolExecutor for CountingExecutor3 {
async fn execute(&self, _tool: &str, params: &Value) -> Result<Value, String> {
self.0.fetch_add(1, Ordering::SeqCst);
let a = params.get("a").and_then(|v| v.as_i64()).unwrap_or(0);
let b = params.get("b").and_then(|v| v.as_i64()).unwrap_or(0);
Ok(Value::from(a + b))
}
}
let rt = Runtime::new().with_executor(Arc::new(CountingExecutor3(cc)));
rt.register_tool("add").await;
rt.enable_tool_cache("add", 60).await;
let action_a = tool_call(
"add",
[
("a".to_string(), Value::from(1)),
("b".to_string(), Value::from(2)),
]
.into(),
);
let action_b = tool_call(
"add",
[
("a".to_string(), Value::from(3)),
("b".to_string(), Value::from(4)),
]
.into(),
);
let r1 = rt.execute(&proposal(vec![action_a.clone()])).await;
assert_eq!(r1.results[0].output, Some(Value::from(3)));
let r2 = rt.execute(&proposal(vec![action_b.clone()])).await;
assert_eq!(r2.results[0].output, Some(Value::from(7)));
assert_eq!(call_count.load(Ordering::SeqCst), 2);
rt.execute(&proposal(vec![action_a])).await;
assert_eq!(call_count.load(Ordering::SeqCst), 2);
}
#[tokio::test]
async fn test_capability_default_allows_everything() {
let rt = setup_runtime().await;
rt.set_capabilities(CapabilitySet::new()).await;
let p = proposal(vec![
tool_call(
"add",
[
("a".to_string(), Value::from(1)),
("b".to_string(), Value::from(2)),
]
.into(),
),
state_write("x", Value::from(42)),
]);
let result = rt.execute(&p).await;
assert!(result.all_succeeded());
}
#[tokio::test]
async fn test_capability_allowed_tool_passes() {
let rt = setup_runtime().await;
let caps = CapabilitySet::new().allow_tool("add");
rt.set_capabilities(caps).await;
let p = proposal(vec![tool_call(
"add",
[
("a".to_string(), Value::from(1)),
("b".to_string(), Value::from(2)),
]
.into(),
)]);
let result = rt.execute(&p).await;
assert!(result.all_succeeded());
}
#[tokio::test]
async fn test_capability_denied_tool_rejected() {
let rt = setup_runtime().await;
let caps = CapabilitySet::new().deny_tool("echo");
rt.set_capabilities(caps).await;
let p = proposal(vec![tool_call(
"echo",
[("message".to_string(), Value::from("hi"))].into(),
)]);
let result = rt.execute(&p).await;
assert_eq!(result.results[0].status, ActionStatus::Rejected);
assert!(result.results[0]
.error
.as_ref()
.unwrap()
.contains("capability denied: tool 'echo' not allowed"));
}
#[tokio::test]
async fn test_capability_unlisted_tool_rejected() {
let rt = setup_runtime().await;
let caps = CapabilitySet::new().allow_tool("add");
rt.set_capabilities(caps).await;
let p = proposal(vec![tool_call(
"echo",
[("message".to_string(), Value::from("hi"))].into(),
)]);
let result = rt.execute(&p).await;
assert_eq!(result.results[0].status, ActionStatus::Rejected);
assert!(result.results[0]
.error
.as_ref()
.unwrap()
.contains("capability denied: tool 'echo' not allowed"));
}
#[tokio::test]
async fn test_capability_deny_overrides_allow() {
let rt = setup_runtime().await;
let caps = CapabilitySet::new().allow_tool("echo").deny_tool("echo");
rt.set_capabilities(caps).await;
let p = proposal(vec![tool_call(
"echo",
[("message".to_string(), Value::from("hi"))].into(),
)]);
let result = rt.execute(&p).await;
assert_eq!(result.results[0].status, ActionStatus::Rejected);
}
/// A state write to a key on the state allow-list succeeds.
#[tokio::test]
async fn test_capability_state_key_allowed() {
    let rt = setup_runtime().await;
    rt.set_capabilities(CapabilitySet::new().allow_state_key("x"))
        .await;

    let outcome = rt
        .execute(&proposal(vec![state_write("x", Value::from(42))]))
        .await;
    assert!(outcome.all_succeeded());
}
/// A state write to a key outside the state allow-list is rejected with a
/// capability-denied error naming the key.
#[tokio::test]
async fn test_capability_state_key_denied() {
    let rt = setup_runtime().await;
    rt.set_capabilities(CapabilitySet::new().allow_state_key("x"))
        .await;

    let outcome = rt
        .execute(&proposal(vec![state_write("secret", Value::from("nope"))]))
        .await;

    let first = &outcome.results[0];
    assert_eq!(first.status, ActionStatus::Rejected);
    assert!(first
        .error
        .as_ref()
        .unwrap()
        .contains("capability denied: state key 'secret' not allowed"));
}
#[tokio::test]
async fn test_capability_state_read_key_denied() {
let rt = setup_runtime().await;
rt.state.set("secret", Value::from(42), "setup");
let caps = CapabilitySet::new().allow_state_key("x");
rt.set_capabilities(caps).await;
let mut read_action = state_write("unused", Value::Null);
read_action.action_type = ActionType::StateRead;
read_action.parameters = [("key".to_string(), Value::from("secret"))].into();
let p = proposal(vec![read_action]);
let result = rt.execute(&p).await;
assert_eq!(result.results[0].status, ActionStatus::Rejected);
assert!(result.results[0]
.error
.as_ref()
.unwrap()
.contains("capability denied: state key 'secret' not allowed"));
}
#[tokio::test]
async fn test_capability_max_actions_enforced() {
let rt = setup_runtime().await;
let caps = CapabilitySet::new().with_max_actions(1);
rt.set_capabilities(caps).await;
let p = proposal(vec![
tool_call(
"add",
[
("a".to_string(), Value::from(1)),
("b".to_string(), Value::from(2)),
]
.into(),
),
tool_call("echo", [("message".to_string(), Value::from("hi"))].into()),
]);
let result = rt.execute(&p).await;
assert_eq!(result.results[0].status, ActionStatus::Rejected);
assert_eq!(result.results[1].status, ActionStatus::Rejected);
assert!(result.results[0]
.error
.as_ref()
.unwrap()
.contains("capability denied"));
}
#[tokio::test]
async fn test_capability_max_actions_within_budget() {
let rt = setup_runtime().await;
let caps = CapabilitySet::new().with_max_actions(2);
rt.set_capabilities(caps).await;
let p = proposal(vec![tool_call(
"add",
[
("a".to_string(), Value::from(1)),
("b".to_string(), Value::from(2)),
]
.into(),
)]);
let result = rt.execute(&p).await;
assert!(result.all_succeeded());
}
#[tokio::test]
async fn test_no_capabilities_allows_everything() {
let rt = setup_runtime().await;
let p = proposal(vec![
tool_call(
"add",
[
("a".to_string(), Value::from(1)),
("b".to_string(), Value::from(2)),
]
.into(),
),
state_write("anything", Value::from(99)),
]);
let result = rt.execute(&p).await;
assert!(result.all_succeeded());
}
#[tokio::test]
async fn test_result_cache_invalidate_clears_correctly() {
let call_count = Arc::new(AtomicU32::new(0));
let cc = call_count.clone();
struct CountingExecutor4(Arc<AtomicU32>);
#[async_trait::async_trait]
impl ToolExecutor for CountingExecutor4 {
async fn execute(&self, _tool: &str, params: &Value) -> Result<Value, String> {
self.0.fetch_add(1, Ordering::SeqCst);
let a = params.get("a").and_then(|v| v.as_i64()).unwrap_or(0);
let b = params.get("b").and_then(|v| v.as_i64()).unwrap_or(0);
Ok(Value::from(a + b))
}
}
let rt = Runtime::new().with_executor(Arc::new(CountingExecutor4(cc)));
rt.register_tool("add").await;
rt.enable_tool_cache("add", 60).await;
let action = tool_call(
"add",
[
("a".to_string(), Value::from(1)),
("b".to_string(), Value::from(2)),
]
.into(),
);
rt.execute(&proposal(vec![action.clone()])).await;
assert_eq!(call_count.load(Ordering::SeqCst), 1);
rt.result_cache.invalidate("add").await;
rt.execute(&proposal(vec![action])).await;
assert_eq!(call_count.load(Ordering::SeqCst), 2);
}
/// A saved checkpoint has a non-empty id, contains the current state entries,
/// and lists exactly the four registered tools.
#[tokio::test]
async fn test_save_checkpoint_captures_state_and_tools() {
    let rt = setup_runtime().await;
    rt.state.set("key1", Value::from("value1"), "test");
    rt.state.set("key2", Value::from(42), "test");

    let snapshot = rt.save_checkpoint().await;
    assert!(!snapshot.checkpoint_id.is_empty());
    assert_eq!(snapshot.state.get("key1"), Some(&Value::from("value1")));
    assert_eq!(snapshot.state.get("key2"), Some(&Value::from(42)));

    for name in ["add", "echo", "fail", "slow"] {
        assert!(snapshot.tools.contains(&name.to_string()));
    }
    assert_eq!(snapshot.tools.len(), 4);
}
/// Restoring a checkpoint into a fresh runtime installs its state and
/// registers exactly the tools it lists.
#[tokio::test]
async fn test_restore_checkpoint_restores_state_and_tools() {
    let rt = Runtime::new();
    let snapshot = Checkpoint {
        checkpoint_id: "test-cp".to_string(),
        created_at: chrono::Utc::now(),
        state: HashMap::from([(
            "restored_key".to_string(),
            Value::from("restored_value"),
        )]),
        events: Vec::new(),
        tools: vec!["tool_a".to_string(), "tool_b".to_string()],
        metadata: HashMap::new(),
    };

    rt.restore_checkpoint(&snapshot).await;

    assert_eq!(
        rt.state.get("restored_key"),
        Some(Value::from("restored_value"))
    );
    let registered = rt.tools.read().await;
    assert!(registered.contains_key("tool_a"));
    assert!(registered.contains_key("tool_b"));
    assert_eq!(registered.len(), 2);
}
/// Restoring replaces the tool registry: a tool registered beforehand
/// disappears, and only the checkpoint's tool remains.
#[tokio::test]
async fn test_restore_checkpoint_clears_previous_tools() {
    let rt = Runtime::new();
    rt.register_tool("old_tool").await;

    let snapshot = Checkpoint {
        checkpoint_id: "test-cp".to_string(),
        created_at: chrono::Utc::now(),
        state: HashMap::new(),
        events: Vec::new(),
        tools: vec!["new_tool".to_string()],
        metadata: HashMap::new(),
    };
    rt.restore_checkpoint(&snapshot).await;

    let registered = rt.tools.read().await;
    assert!(!registered.contains_key("old_tool"));
    assert!(registered.contains_key("new_tool"));
    assert_eq!(registered.len(), 1);
}
/// Saving a checkpoint to a file and loading it into a second runtime brings
/// over state and tools. The returned checkpoint also holds the saved state.
#[tokio::test]
async fn test_checkpoint_file_roundtrip() {
    let source = setup_runtime().await;
    source
        .state
        .set("persist_key", Value::from("persist_value"), "test");

    let tmp = tempfile::tempdir().unwrap();
    let file = tmp.path().join("checkpoint.json");
    let file_str = file.to_str().unwrap();

    source.save_checkpoint_to_file(file_str).await.unwrap();
    assert!(file.exists());

    let target = Runtime::new();
    let loaded = target.load_checkpoint_from_file(file_str).await.unwrap();

    assert_eq!(
        target.state.get("persist_key"),
        Some(Value::from("persist_value"))
    );
    let registered = target.tools.read().await;
    assert!(registered.contains_key("add"));
    assert!(registered.contains_key("echo"));

    assert!(!loaded.checkpoint_id.is_empty());
    assert_eq!(
        loaded.state.get("persist_key"),
        Some(&Value::from("persist_value"))
    );
}
#[tokio::test]
async fn test_checkpoint_serializes_as_json() {
let cp = Checkpoint {
checkpoint_id: "json-test".to_string(),
created_at: chrono::Utc::now(),
state: [("k".to_string(), Value::from(1))].into(),
events: vec![serde_json::json!({"kind": "test"})],
tools: vec!["mytool".to_string()],
metadata: [("meta_key".to_string(), Value::from("meta_val"))].into(),
};
let json = serde_json::to_string(&cp).unwrap();
let deserialized: Checkpoint = serde_json::from_str(&json).unwrap();
assert_eq!(deserialized.checkpoint_id, "json-test");
assert_eq!(deserialized.state.get("k"), Some(&Value::from(1)));
assert_eq!(deserialized.tools, vec!["mytool".to_string()]);
assert_eq!(
deserialized.metadata.get("meta_key"),
Some(&Value::from("meta_val"))
);
assert_eq!(deserialized.events.len(), 1);
}
#[tokio::test]
async fn test_checkpoint_captures_events() {
let rt = setup_runtime().await;
let p = proposal(vec![tool_call(
"add",
[
("a".to_string(), Value::from(1)),
("b".to_string(), Value::from(2)),
]
.into(),
)]);
rt.execute(&p).await;
let cp = rt.save_checkpoint().await;
assert!(!cp.events.is_empty());
}
#[tokio::test]
async fn test_load_checkpoint_from_nonexistent_file() {
let rt = Runtime::new();
let result = rt
.load_checkpoint_from_file("/tmp/nonexistent_checkpoint_file.json")
.await;
assert!(result.is_err());
assert!(result.unwrap_err().contains("read error"));
}
/// Restoring replaces state wholesale:
/// - keys only in the checkpoint appear;
/// - keys present in both take the checkpoint's value;
/// - keys absent from the checkpoint are removed.
#[tokio::test]
async fn restore_checkpoint_replaces_state_completely() {
    let rt = Runtime::new();
    rt.state
        .set("pre_existing", serde_json::json!("should_be_gone"), "setup");
    rt.state
        .set("shared_key", serde_json::json!("old_value"), "setup");

    let checkpoint = Checkpoint {
        checkpoint_id: "cp-replace".to_string(),
        created_at: chrono::Utc::now(),
        state: HashMap::from([
            (
                "shared_key".to_string(),
                serde_json::json!("checkpoint_value"),
            ),
            ("checkpoint_only".to_string(), serde_json::json!(true)),
        ]),
        events: Vec::new(),
        tools: Vec::new(),
        metadata: HashMap::new(),
    };
    rt.restore_checkpoint(&checkpoint).await;

    assert_eq!(
        rt.state.get("shared_key"),
        Some(serde_json::json!("checkpoint_value"))
    );
    assert_eq!(
        rt.state.get("checkpoint_only"),
        Some(serde_json::json!(true))
    );
    assert_eq!(rt.state.get("pre_existing"), None);
}
/// Restoring a checkpoint does not replay its entries as state transitions:
/// after the restore, `transition_count()` is zero.
#[tokio::test]
async fn restore_checkpoint_does_not_emit_synthetic_state_transitions() {
    let rt = Runtime::new();
    // Seed some state before the restore. The dead `_before` snapshot of the
    // transition count that used to be taken here was never read, so it has
    // been dropped.
    rt.state
        .set("pre_existing", serde_json::json!("old"), "setup");

    let checkpoint = Checkpoint {
        checkpoint_id: "cp-transitions".to_string(),
        created_at: chrono::Utc::now(),
        state: [("replacement".to_string(), serde_json::json!(1))].into(),
        events: vec![],
        tools: vec![],
        metadata: HashMap::new(),
    };
    rt.restore_checkpoint(&checkpoint).await;

    // NOTE(review): if `set` above records a transition, this also asserts
    // that restore resets the pre-existing transition history — confirm
    // against the state store's contract.
    assert_eq!(rt.state.transition_count(), 0);
}
/// Restoring a checkpoint must drop the idempotency cache.
///
/// An idempotent action is executed once; a second copy under a new id is served
/// without reaching the executor (call count stays 1, `duration_ms == Some(0.0)`).
/// After `restore_checkpoint`, a third copy must reach the executor again.
#[tokio::test]
async fn restore_checkpoint_clears_idempotency_cache() {
    /// Executor that counts every invocation and always returns 42.
    struct CountingExecutor(Arc<AtomicU32>);
    #[async_trait::async_trait]
    impl ToolExecutor for CountingExecutor {
        async fn execute(&self, _tool: &str, _params: &Value) -> Result<Value, String> {
            self.0.fetch_add(1, Ordering::SeqCst);
            Ok(Value::from(42))
        }
    }

    let call_count = Arc::new(AtomicU32::new(0));
    let rt = Runtime::new().with_executor(Arc::new(CountingExecutor(call_count.clone())));
    rt.register_tool("counter").await;

    let mut action = tool_call("counter", HashMap::new());
    action.idempotent = true;

    // First execution reaches the executor.
    rt.execute(&proposal(vec![action.clone()])).await;
    assert_eq!(call_count.load(Ordering::SeqCst), 1);

    // Same tool and parameters under a different id: the executor is not called again.
    let mut action2 = action.clone();
    action2.id = "second".to_string();
    let cached_result = rt.execute(&proposal(vec![action2])).await;
    assert_eq!(
        call_count.load(Ordering::SeqCst),
        1,
        "second idempotent call should not reach the executor"
    );
    assert_eq!(cached_result.results[0].duration_ms, Some(0.0));

    let checkpoint = Checkpoint {
        checkpoint_id: "cp-idem".to_string(),
        created_at: chrono::Utc::now(),
        state: HashMap::new(),
        events: vec![],
        tools: vec!["counter".to_string()],
        metadata: HashMap::new(),
    };
    rt.restore_checkpoint(&checkpoint).await;

    // After the restore the cache is empty, so the executor runs again.
    let mut action3 = action.clone();
    action3.id = "third".to_string();
    let fresh_result = rt.execute(&proposal(vec![action3])).await;
    assert_eq!(
        call_count.load(Ordering::SeqCst),
        2,
        "call after restore should reach the executor"
    );
    assert_eq!(fresh_result.results[0].status, ActionStatus::Succeeded);
}
/// The name-only `register_tool` API still registers a schema, filled with empty or
/// disabled metadata (no description, not idempotent, no cache TTL, no rate limit).
#[tokio::test]
async fn test_register_tool_backward_compat() {
    let runtime = Runtime::new();
    runtime.register_tool("echo").await;

    let tool_table = runtime.tools.read().await;
    assert!(tool_table.contains_key("echo"));

    let echo_schema = tool_table.get("echo").unwrap();
    assert_eq!(echo_schema.name, "echo");
    assert_eq!(echo_schema.description, "");
    assert!(!echo_schema.idempotent);
    assert!(echo_schema.cache_ttl_secs.is_none());
    assert!(echo_schema.rate_limit.is_none());
}
#[tokio::test]
async fn test_register_tool_schema_full() {
let rt = make_runtime();
let schema = ToolSchema {
name: "add".to_string(),
description: "Add two numbers".to_string(),
parameters: serde_json::json!({
"type": "object",
"properties": {
"a": {"type": "number"},
"b": {"type": "number"}
},
"required": ["a", "b"]
}),
returns: Some(serde_json::json!({"type": "number"})),
idempotent: true,
cache_ttl_secs: None,
rate_limit: None,
};
rt.register_tool_schema(schema).await;
let tools = rt.tools.read().await;
let s = tools.get("add").unwrap();
assert_eq!(s.description, "Add two numbers");
assert!(s.idempotent);
assert_eq!(
s.parameters
.get("required")
.unwrap()
.as_array()
.unwrap()
.len(),
2
);
}
/// `tool_schemas` returns exactly one schema per registered tool name.
#[tokio::test]
async fn test_tool_schemas_returns_all() {
    let rt = Runtime::new();
    let expected_names = ["a", "b", "c"];
    for tool_name in expected_names {
        rt.register_tool(tool_name).await;
    }

    let schemas = rt.tool_schemas().await;
    assert_eq!(schemas.len(), 3);
    for wanted in expected_names {
        assert!(schemas.iter().any(|schema| schema.name == wanted));
    }
}
/// Registers a schema with `cache_ttl_secs: Some(120)`, then round-trips a value
/// through `result_cache` for that tool.
///
/// NOTE(review): the put/get round-trip alone may pass even if registration did not
/// configure anything; consider also asserting on the cache's TTL once exposed.
#[tokio::test]
async fn test_schema_auto_configures_cache() {
    let rt = make_runtime();
    let schema = ToolSchema {
        name: "cached_tool".to_string(),
        description: String::new(),
        parameters: Value::Object(Default::default()),
        returns: None,
        idempotent: true,
        cache_ttl_secs: Some(120),
        rate_limit: None,
    };
    rt.register_tool_schema(schema).await;

    // The borrows below were corrupted into "¶ms" (an HTML-entity decode of
    // the original borrow), which did not compile. Named `args` to keep it unambiguous.
    let args = serde_json::json!({"x": 1});
    rt.result_cache
        .put("cached_tool", &args, Value::from(42))
        .await;
    let cached = rt.result_cache.get("cached_tool", &args).await;
    assert_eq!(cached, Some(Value::from(42)));
}
/// After registering a schema with a `rate_limit`, a single `acquire` on the
/// rate limiter for that tool completes.
#[tokio::test]
async fn test_schema_auto_configures_rate_limit() {
    let rt = make_runtime();
    let limit = ToolRateLimit {
        max_calls: 5,
        interval_secs: 1.0,
    };
    rt.register_tool_schema(ToolSchema {
        name: "limited_tool".to_string(),
        description: String::new(),
        parameters: Value::Object(Default::default()),
        returns: None,
        idempotent: false,
        cache_ttl_secs: None,
        rate_limit: Some(limit),
    })
    .await;

    rt.rate_limiter.acquire("limited_tool").await;
}
/// Calling a tool whose schema requires `x` without passing `x` yields a
/// `Rejected` result whose error names the missing parameter.
#[tokio::test]
async fn test_schema_parameter_validation_rejects_missing_required() {
    let rt = make_runtime();
    rt.register_tool_schema(ToolSchema {
        name: "strict_tool".to_string(),
        description: String::new(),
        parameters: serde_json::json!({
            "type": "object",
            "properties": {
                "x": {"type": "number"}
            },
            "required": ["x"]
        }),
        returns: None,
        idempotent: false,
        cache_ttl_secs: None,
        rate_limit: None,
    })
    .await;

    // No parameters at all, so the required "x" is absent.
    let call_without_x = tool_call("strict_tool", HashMap::new());
    let outcome = rt.execute(&proposal(vec![call_without_x])).await;

    let first = &outcome.results[0];
    assert_eq!(first.status, ActionStatus::Rejected);
    let message = first.error.as_ref().unwrap();
    assert!(message.contains("missing required parameter 'x'"));
}
use crate::executor::{ReplanCallback, ReplanConfig, ReplanContext};
/// Replanner that counts its invocations and always proposes a single `echo`
/// call with `message = "recovered"`.
struct MockReplanner {
    /// Number of times `replan` has been invoked.
    call_count: AtomicU32,
}
#[async_trait::async_trait]
impl ReplanCallback for MockReplanner {
    async fn replan(&self, _ctx: &ReplanContext) -> Result<ActionProposal, String> {
        self.call_count.fetch_add(1, Ordering::SeqCst);
        let mut echo_args = HashMap::new();
        echo_args.insert("message".to_string(), Value::from("recovered"));
        let recovery = ActionProposal {
            id: "replan-proposal".to_string(),
            source: "replanner".to_string(),
            actions: vec![tool_call("echo", echo_args)],
            timestamp: chrono::Utc::now(),
            context: HashMap::new(),
        };
        Ok(recovery)
    }
}
/// Replanner whose callback always returns `Err("replanner failed")`.
struct FailingReplanner;
#[async_trait::async_trait]
impl ReplanCallback for FailingReplanner {
    async fn replan(&self, _ctx: &ReplanContext) -> Result<ActionProposal, String> {
        let reason = String::from("replanner failed");
        Err(reason)
    }
}
/// Replanner that counts its invocations and always proposes a single call to the
/// `fail` tool, so every replan attempt fails again.
struct PersistentlyFailingReplanner {
    /// Number of times `replan` has been invoked.
    call_count: AtomicU32,
}
#[async_trait::async_trait]
impl ReplanCallback for PersistentlyFailingReplanner {
    async fn replan(&self, _ctx: &ReplanContext) -> Result<ActionProposal, String> {
        // `fetch_add` returns the previous value, so this attempt's 1-based number comes
        // from the same atomic operation. A separate `load` could observe a concurrent
        // caller's increment and produce duplicate ids. `wrapping_add` mirrors the
        // wrapping behavior of the atomic counter itself.
        let attempt = self
            .call_count
            .fetch_add(1, Ordering::SeqCst)
            .wrapping_add(1);
        Ok(ActionProposal {
            id: format!("replan-{}", attempt),
            source: "replanner".to_string(),
            actions: vec![tool_call("fail", HashMap::new())],
            timestamp: chrono::Utc::now(),
            context: HashMap::new(),
        })
    }
}
/// Replanner that proposes a call to a tool that is never registered
/// (`nonexistent_tool`).
struct BadToolReplanner;
#[async_trait::async_trait]
impl ReplanCallback for BadToolReplanner {
    async fn replan(&self, _ctx: &ReplanContext) -> Result<ActionProposal, String> {
        let unknown_call = tool_call("nonexistent_tool", HashMap::new());
        let proposal_out = ActionProposal {
            id: "bad-replan".to_string(),
            source: "test".to_string(),
            actions: vec![unknown_call],
            timestamp: chrono::Utc::now(),
            context: HashMap::new(),
        };
        Ok(proposal_out)
    }
}
/// With replanning enabled, a failing action is recovered by `MockReplanner`'s
/// `echo` proposal, and the replanner is consulted exactly once.
#[tokio::test]
async fn test_replan_recovers_from_failure() {
    let replanner = Arc::new(MockReplanner {
        call_count: AtomicU32::new(0),
    });
    let config = ReplanConfig {
        max_replans: 3,
        delay_ms: 0,
        verify_before_execute: false,
    };
    let rt = make_runtime().with_replan(replanner.clone(), config);
    for tool_name in ["echo", "fail"] {
        rt.register_tool(tool_name).await;
    }

    let failing = proposal(vec![tool_call("fail", HashMap::new())]);
    let outcome = rt.execute(&failing).await;

    assert!(
        outcome.all_succeeded(),
        "expected success after replan, got: {:?}",
        outcome.results
    );
    assert_eq!(replanner.call_count.load(Ordering::SeqCst), 1);
}
/// Without `with_replan`, a failing action leaves the run unsuccessful.
#[tokio::test]
async fn test_replan_disabled_by_default() {
    let runtime = setup_runtime().await;
    let outcome = runtime
        .execute(&proposal(vec![tool_call("fail", HashMap::new())]))
        .await;
    let everything_ok = outcome.all_succeeded();
    assert!(!everything_ok);
}
/// A replanner whose proposals keep failing is consulted exactly `max_replans`
/// (here 2) times, and the run ends unsuccessful.
#[tokio::test]
async fn test_replan_exhausted_after_max_attempts() {
    let replanner = Arc::new(PersistentlyFailingReplanner {
        call_count: AtomicU32::new(0),
    });
    let settings = ReplanConfig {
        max_replans: 2,
        delay_ms: 0,
        verify_before_execute: false,
    };
    let rt = make_runtime().with_replan(replanner.clone(), settings);
    rt.register_tool("fail").await;

    let outcome = rt
        .execute(&proposal(vec![tool_call("fail", HashMap::new())]))
        .await;

    assert!(!outcome.all_succeeded());
    let times_consulted = replanner.call_count.load(Ordering::SeqCst);
    assert_eq!(times_consulted, 2);
}
/// When the replan callback itself returns `Err`, the run is still reported as
/// not fully successful.
#[tokio::test]
async fn test_replan_callback_failure_returns_original() {
    let settings = ReplanConfig {
        max_replans: 3,
        delay_ms: 0,
        verify_before_execute: false,
    };
    let rt = make_runtime().with_replan(Arc::new(FailingReplanner), settings);
    rt.register_tool("fail").await;

    let failing = proposal(vec![tool_call("fail", HashMap::new())]);
    let outcome = rt.execute(&failing).await;
    assert!(!outcome.all_succeeded());
}
/// The `ReplanContext` handed to the replanner describes the failure: attempt number,
/// originating proposal id, original action count, the failed action's error, and
/// remaining replans.
#[tokio::test]
async fn test_replan_context_contains_failure_info() {
    use std::sync::Mutex;

    /// Stores the most recent context it receives and answers with an empty proposal.
    struct CapturingReplanner {
        captured: Mutex<Option<ReplanContext>>,
    }
    #[async_trait::async_trait]
    impl ReplanCallback for CapturingReplanner {
        async fn replan(&self, ctx: &ReplanContext) -> Result<ActionProposal, String> {
            self.captured.lock().unwrap().replace(ctx.clone());
            let empty_recovery = ActionProposal {
                id: "recovery".to_string(),
                source: "test".to_string(),
                actions: vec![],
                timestamp: chrono::Utc::now(),
                context: HashMap::new(),
            };
            Ok(empty_recovery)
        }
    }

    let replanner = Arc::new(CapturingReplanner {
        captured: Mutex::new(None),
    });
    let settings = ReplanConfig {
        max_replans: 1,
        delay_ms: 0,
        verify_before_execute: false,
    };
    let rt = make_runtime().with_replan(replanner.clone(), settings);
    rt.register_tool("fail").await;

    let _outcome = rt
        .execute(&proposal(vec![tool_call("fail", HashMap::new())]))
        .await;

    let seen = replanner.captured.lock().unwrap().take().unwrap();
    assert_eq!(seen.attempt, 1);
    assert_eq!(seen.proposal_id, "test-proposal");
    assert_eq!(seen.original_action_count, 1);
    assert!(!seen.failed_actions.is_empty());
    assert_eq!(seen.failed_actions[0].error, "boom");
    assert_eq!(seen.replans_remaining, 0);
}
/// A state write made before a failing action must not be visible in the
/// `state_snapshot` given to the replanner. The replanned `echo` then succeeds.
#[tokio::test]
async fn test_replan_state_is_clean_between_attempts() {
    use std::sync::Mutex;

    /// Stores the state snapshot it is given and proposes a single `echo` call.
    struct StateCheckingReplanner {
        captured_state: Mutex<Option<HashMap<String, Value>>>,
    }
    #[async_trait::async_trait]
    impl ReplanCallback for StateCheckingReplanner {
        async fn replan(&self, ctx: &ReplanContext) -> Result<ActionProposal, String> {
            self.captured_state
                .lock()
                .unwrap()
                .replace(ctx.state_snapshot.clone());
            let echo_clean = tool_call(
                "echo",
                HashMap::from([("message".to_string(), Value::from("clean"))]),
            );
            Ok(ActionProposal {
                id: "replan-clean".to_string(),
                source: "test".to_string(),
                actions: vec![echo_clean],
                timestamp: chrono::Utc::now(),
                context: HashMap::new(),
            })
        }
    }

    let replanner = Arc::new(StateCheckingReplanner {
        captured_state: Mutex::new(None),
    });
    let settings = ReplanConfig {
        max_replans: 1,
        delay_ms: 0,
        verify_before_execute: false,
    };
    let rt = make_runtime().with_replan(replanner.clone(), settings);
    rt.register_tool("echo").await;
    rt.register_tool("fail").await;

    // The failing call depends on the dirty write, so the write runs first.
    let dirty_write = state_write("dirty_key", Value::from("dirty_value"));
    let mut failing_call = tool_call("fail", HashMap::new());
    failing_call
        .state_dependencies
        .push("dirty_key".to_string());

    let outcome = rt.execute(&proposal(vec![dirty_write, failing_call])).await;
    assert!(outcome.all_succeeded());

    let snapshot = replanner.captured_state.lock().unwrap().take().unwrap();
    assert!(
        !snapshot.contains_key("dirty_key"),
        "state should be clean after rollback, but found dirty_key: {:?}",
        snapshot
    );
}
/// A replan proposing an unregistered tool does not rescue the run.
#[tokio::test]
async fn test_replan_with_unregistered_tool_is_rejected() {
    let settings = ReplanConfig {
        max_replans: 2,
        delay_ms: 0,
        verify_before_execute: false,
    };
    let rt = make_runtime().with_replan(Arc::new(BadToolReplanner), settings);
    rt.register_tool("fail").await;

    let failing = proposal(vec![tool_call("fail", HashMap::new())]);
    let outcome = rt.execute(&failing).await;
    assert!(!outcome.all_succeeded());
}
/// With `verify_before_execute` enabled, a replan that proposes a registered tool
/// (`MockReplanner`'s `echo`) passes the gate and the run succeeds.
///
/// NOTE(review): despite the name, this exercises the accepting path of the gate;
/// the rejecting path is covered by `test_replan_quality_gate_blocks_invalid_replan`.
#[tokio::test]
async fn test_replan_quality_gate_rejects_bad_proposal() {
    let replanner = Arc::new(MockReplanner {
        call_count: AtomicU32::new(0),
    });
    let gated = ReplanConfig {
        max_replans: 1,
        delay_ms: 0,
        verify_before_execute: true,
    };
    let rt = make_runtime().with_replan(replanner.clone(), gated);
    for tool_name in ["echo", "fail"] {
        rt.register_tool(tool_name).await;
    }

    let outcome = rt
        .execute(&proposal(vec![tool_call("fail", HashMap::new())]))
        .await;

    assert!(outcome.all_succeeded());
    assert_eq!(replanner.call_count.load(Ordering::SeqCst), 1);
}
/// With `verify_before_execute` enabled, a replan naming an unregistered tool does
/// not make the run succeed.
#[tokio::test]
async fn test_replan_quality_gate_blocks_invalid_replan() {
    let gated = ReplanConfig {
        max_replans: 2,
        delay_ms: 0,
        verify_before_execute: true,
    };
    let rt = make_runtime().with_replan(Arc::new(BadToolReplanner), gated);
    rt.register_tool("fail").await;

    let failing = proposal(vec![tool_call("fail", HashMap::new())]);
    let outcome = rt.execute(&failing).await;
    assert!(!outcome.all_succeeded());
}
/// Given a failing candidate first and a succeeding one second, `plan_and_execute`
/// ends with a fully successful result.
#[tokio::test]
async fn test_plan_and_execute_picks_best() {
    let rt = setup_runtime().await;
    let echo_hello = tool_call(
        "echo",
        HashMap::from([("message".to_string(), Value::from("hello"))]),
    );
    let good = proposal(vec![echo_hello]);
    let bad = proposal(vec![tool_call("fail", HashMap::new())]);

    let candidates = [bad, good];
    let outcome = rt.plan_and_execute(&candidates, None, None).await;
    assert!(outcome.all_succeeded());
}
/// When the first candidate fails, `plan_and_execute` falls back to a later
/// candidate that succeeds.
#[tokio::test]
async fn test_plan_and_execute_falls_back() {
    let rt = setup_runtime().await;
    let fails = proposal(vec![tool_call("fail", HashMap::new())]);
    let echo_fallback = tool_call(
        "echo",
        HashMap::from([("message".to_string(), Value::from("fallback"))]),
    );
    let succeeds = proposal(vec![echo_fallback]);

    let candidates = [fails, succeeds];
    let outcome = rt.plan_and_execute(&candidates, None, None).await;
    assert!(outcome.all_succeeded());
}
#[tokio::test]
async fn test_plan_and_execute_empty_candidates() {
let rt = setup_runtime().await;
let result = rt.plan_and_execute(&[], None, None).await;
assert_eq!(result.proposal_id, "empty");
assert!(result.results.is_empty());
}
#[tokio::test]
async fn execute_with_cancel_emits_canceled_results_when_token_tripped() {
use tokio_util::sync::CancellationToken;
let rt = setup_runtime().await;
let mut a1 = state_write("x", Value::from(1));
a1.id = "a1".into();
let mut a2 = state_write("y", Value::from(2));
a2.id = "a2".into();
a2.state_dependencies = vec!["x".into()];
let mut a3 = state_write("z", Value::from(3));
a3.id = "a3".into();
a3.state_dependencies = vec!["y".into()];
let proposal = ActionProposal {
id: "p-cancel".into(),
source: "test".into(),
actions: vec![a1, a2, a3],
timestamp: chrono::Utc::now(),
context: HashMap::new(),
};
let token = CancellationToken::new();
token.cancel();
let result = rt.execute_with_cancel(&proposal, &token).await;
assert_eq!(result.results.len(), 3, "all 3 actions accounted for");
for r in &result.results {
let err = r.error.as_deref().unwrap_or("");
assert!(
err.contains("canceled"),
"action {} should be canceled, got status={:?} error={:?}",
r.action_id,
r.status,
r.error
);
}
}
#[tokio::test]
async fn execute_with_cancel_skips_later_levels_when_tripped_mid_execute() {
use tokio_util::sync::CancellationToken;
let rt = setup_runtime().await;
let mut a0 = state_write("x", Value::from(1));
a0.id = "a0".into();
let mut a1 = tool_call("slow", HashMap::new());
a1.id = "a1".into();
a1.state_dependencies = vec!["x".into()];
let mut a2 = state_write("y", Value::from(2));
a2.id = "a2".into();
a2.state_dependencies = vec!["x".into()];
let mut a3 = state_write("z", Value::from(3));
a3.id = "a3".into();
a3.state_dependencies = vec!["y".into()];
let proposal = ActionProposal {
id: "p-mid".into(),
source: "test".into(),
actions: vec![a0, a1, a2, a3],
timestamp: chrono::Utc::now(),
context: HashMap::new(),
};
let token = CancellationToken::new();
let token_for_trip = token.clone();
tokio::spawn(async move {
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
token_for_trip.cancel();
});
let result = rt.execute_with_cancel(&proposal, &token).await;
let a0_result = result.results.iter().find(|r| r.action_id == "a0").unwrap();
assert_eq!(a0_result.status, ActionStatus::Succeeded);
let later_canceled = result
.results
.iter()
.filter(|r| matches!(r.action_id.as_str(), "a2" | "a3"))
.filter(|r| r.error.as_deref().is_some_and(|e| e.contains("canceled")))
.count();
assert!(
later_canceled >= 1,
"expected ≥1 of a2/a3 to be canceled mid-execute, got results: {:?}",
result
.results
.iter()
.map(|r| (&r.action_id, &r.status, &r.error))
.collect::<Vec<_>>()
);
}
#[tokio::test]
async fn execute_with_cancel_no_op_when_token_never_tripped() {
use tokio_util::sync::CancellationToken;
let rt = setup_runtime().await;
let mut params = HashMap::new();
params.insert("a".into(), Value::from(1));
params.insert("b".into(), Value::from(2));
let action = tool_call("add", params);
let proposal = ActionProposal {
id: "p-no-cancel".into(),
source: "test".into(),
actions: vec![action],
timestamp: chrono::Utc::now(),
context: HashMap::new(),
};
let token = CancellationToken::new();
let result = rt.execute_with_cancel(&proposal, &token).await;
assert_eq!(result.results.len(), 1);
assert_eq!(result.results[0].status, ActionStatus::Succeeded);
assert_eq!(result.results[0].output, Some(Value::from(3)));
}
#[tokio::test]
async fn execute_falls_through_to_execute_with_cancel_with_inert_token() {
let rt = setup_runtime().await;
let mut params = HashMap::new();
params.insert("a".into(), Value::from(40));
params.insert("b".into(), Value::from(2));
let proposal = ActionProposal {
id: "p-default".into(),
source: "test".into(),
actions: vec![tool_call("add", params)],
timestamp: chrono::Utc::now(),
context: HashMap::new(),
};
let result = rt.execute(&proposal).await;
assert_eq!(result.results[0].output, Some(Value::from(42)));
}