use std::io::{BufRead, Write};
use std::process::{Child, ChildStdin, ChildStdout, Command, Stdio};
use std::sync::{
Arc, Mutex,
atomic::{AtomicBool, AtomicUsize, Ordering},
};
use std::time::Duration;
use futures_util::stream;
use opi_ai::provider::{EventStream, ModelInfo, Provider, Request};
use opi_ai::stream::{AssistantStreamEvent, StopReason};
use opi_ai::test_support::{MockProvider, base_assistant, text_response};
use opi_coding_agent::config::OpiConfig;
use opi_coding_agent::policy::ToolSelection;
use opi_coding_agent::rpc::{RPC_SCHEMA_VERSION, RpcCommand, RpcRunner};
#[test]
fn rpc_parse_prompt_command() {
let cmd: RpcCommand = serde_json::from_str(r#"{"type":"prompt","message":"hello"}"#).unwrap();
assert!(matches!(cmd, RpcCommand::prompt { .. }));
assert_eq!(cmd.command_name(), "prompt");
assert!(cmd.id().is_none());
}
#[test]
fn rpc_parse_prompt_with_id() {
let cmd: RpcCommand =
serde_json::from_str(r#"{"type":"prompt","id":"req-1","message":"hello"}"#).unwrap();
assert_eq!(cmd.id(), Some("req-1"));
}
#[test]
fn rpc_parse_continue_command() {
let cmd: RpcCommand =
serde_json::from_str(r#"{"type":"continue","message":"more text"}"#).unwrap();
assert!(matches!(cmd, RpcCommand::continue_ { .. }));
assert_eq!(cmd.command_name(), "continue");
}
#[test]
fn rpc_parse_abort_command() {
let cmd: RpcCommand = serde_json::from_str(r#"{"type":"abort"}"#).unwrap();
assert!(matches!(cmd, RpcCommand::abort { .. }));
assert!(cmd.id().is_none());
}
#[test]
fn rpc_parse_abort_with_id() {
let cmd: RpcCommand = serde_json::from_str(r#"{"type":"abort","id":"a1"}"#).unwrap();
assert_eq!(cmd.id(), Some("a1"));
}
#[test]
fn rpc_parse_set_model_command() {
let cmd: RpcCommand =
serde_json::from_str(r#"{"type":"set_model","model":"anthropic:claude-sonnet-4"}"#)
.unwrap();
assert!(matches!(cmd, RpcCommand::set_model { .. }));
}
#[test]
fn rpc_parse_set_thinking_level_command() {
let cmd: RpcCommand =
serde_json::from_str(r#"{"type":"set_thinking_level","level":"high"}"#).unwrap();
assert!(matches!(cmd, RpcCommand::set_thinking_level { .. }));
}
#[test]
fn rpc_parse_compact_command() {
let cmd: RpcCommand = serde_json::from_str(r#"{"type":"compact"}"#).unwrap();
assert!(matches!(cmd, RpcCommand::compact { .. }));
}
#[test]
fn rpc_parse_session_info_command() {
let cmd: RpcCommand = serde_json::from_str(r#"{"type":"session_info"}"#).unwrap();
assert!(matches!(cmd, RpcCommand::session_info { .. }));
}
#[test]
fn rpc_parse_quit_command() {
let cmd: RpcCommand = serde_json::from_str(r#"{"type":"quit"}"#).unwrap();
assert!(cmd.is_quit());
}
#[test]
fn rpc_parse_steer_command() {
let cmd: RpcCommand = serde_json::from_str(r#"{"type":"steer","message":"do this"}"#).unwrap();
assert!(matches!(cmd, RpcCommand::steer { .. }));
}
#[test]
fn rpc_parse_follow_up_command() {
let cmd: RpcCommand =
serde_json::from_str(r#"{"type":"follow_up","message":"then that"}"#).unwrap();
assert!(matches!(cmd, RpcCommand::follow_up { .. }));
}
#[test]
fn rpc_parse_malformed_json_returns_error() {
let result = serde_json::from_str::<RpcCommand>("not json at all");
assert!(result.is_err());
}
#[test]
fn rpc_parse_unknown_type_returns_error() {
let result = serde_json::from_str::<RpcCommand>(r#"{"type":"unknown_command"}"#);
assert!(result.is_err());
}
#[test]
fn rpc_parse_missing_required_field_returns_error() {
let result = serde_json::from_str::<RpcCommand>(r#"{"type":"prompt"}"#);
assert!(result.is_err());
}
fn opi_binary_path() -> std::path::PathBuf {
if let Some(path) = option_env!("CARGO_BIN_EXE_opi") {
return std::path::PathBuf::from(path);
}
let manifest_dir = std::env::var("CARGO_MANIFEST_DIR").unwrap();
let mut path = std::path::PathBuf::from(&manifest_dir);
path.push("../../target/debug/opi");
if cfg!(windows) {
path.set_extension("exe");
}
path
}
struct RpcProcess {
child: Child,
stdin: Option<ChildStdin>,
stdout: std::io::BufReader<ChildStdout>,
}
impl RpcProcess {
fn spawn() -> Self {
let binary = opi_binary_path();
let mut child = Command::new(&binary)
.arg("--rpc")
.arg("--model")
.arg("anthropic:claude-sonnet-4-5-20250514")
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.env("ANTHROPIC_API_KEY", "test-key-for-rpc-tests")
.spawn()
.unwrap_or_else(|e| panic!("failed to spawn {:?}: {e}", binary));
let stdin = child.stdin.take().unwrap();
let stdout = std::io::BufReader::new(child.stdout.take().unwrap());
Self {
child,
stdin: Some(stdin),
stdout,
}
}
fn send(&mut self, cmd: &serde_json::Value) {
let mut line = serde_json::to_string(cmd).unwrap();
line.push('\n');
self.stdin
.as_mut()
.unwrap()
.write_all(line.as_bytes())
.unwrap();
self.stdin.as_mut().unwrap().flush().unwrap();
}
fn read_line(&mut self) -> serde_json::Value {
loop {
let mut line = String::new();
let n = self
.stdout
.read_line(&mut line)
.expect("failed to read line");
if n == 0 {
panic!("EOF from child process (no more stdout)");
}
let trimmed = line.trim_end_matches('\n').trim_end_matches('\r');
if trimmed.is_empty() {
continue; }
return serde_json::from_str(trimmed)
.unwrap_or_else(|_| panic!("invalid JSON from stdout: {trimmed}"));
}
}
fn read_until_response(&mut self, command: &str) -> serde_json::Value {
let deadline = std::time::Instant::now() + std::time::Duration::from_secs(10);
loop {
if std::time::Instant::now() > deadline {
panic!("timed out waiting for response with command={command}");
}
let line = self.read_line();
if line["type"] == "response" && line["command"] == command {
return line;
}
}
}
fn wait(mut self) -> std::process::ExitStatus {
self.stdin.take();
self.child.wait().unwrap()
}
}
impl Drop for RpcProcess {
fn drop(&mut self) {
let _ = self.child.kill();
let _ = self.child.wait();
}
}
#[test]
fn rpc_subprocess_ready_header() {
let binary = opi_binary_path();
if !binary.exists() {
eprintln!(
"skipping rpc subprocess test: binary not found at {:?}",
binary
);
return;
}
let mut proc = RpcProcess::spawn();
let header = proc.read_line();
assert_eq!(header["type"], "rpc_ready");
assert_eq!(header["schema_version"], RPC_SCHEMA_VERSION);
assert_eq!(header["mode"], "rpc");
proc.send(&serde_json::json!({"type": "quit"}));
let resp = proc.read_line();
assert_eq!(resp["type"], "response");
assert_eq!(resp["command"], "quit");
assert_eq!(resp["success"], true);
let status = proc.wait();
assert_eq!(status.code(), Some(0));
}
#[test]
fn rpc_subprocess_malformed_command() {
let binary = opi_binary_path();
if !binary.exists() {
eprintln!(
"skipping rpc subprocess test: binary not found at {:?}",
binary
);
return;
}
let mut proc = RpcProcess::spawn();
let _header = proc.read_line();
proc.stdin
.as_mut()
.unwrap()
.write_all(b"not json\n")
.unwrap();
proc.stdin.as_mut().unwrap().flush().unwrap();
let resp = proc.read_line();
assert_eq!(resp["type"], "response");
assert_eq!(resp["command"], "parse");
assert_eq!(resp["success"], false);
assert!(resp["error"].is_string());
proc.send(&serde_json::json!({"type": "quit"}));
let _resp = proc.read_line();
let status = proc.wait();
assert_eq!(status.code(), Some(0));
}
#[test]
fn rpc_subprocess_id_correlation() {
let binary = opi_binary_path();
if !binary.exists() {
eprintln!(
"skipping rpc subprocess test: binary not found at {:?}",
binary
);
return;
}
let mut proc = RpcProcess::spawn();
let _header = proc.read_line();
proc.send(&serde_json::json!({"type": "quit", "id": "test-42"}));
let resp = proc.read_line();
assert_eq!(resp["type"], "response");
assert_eq!(resp["command"], "quit");
assert_eq!(resp["id"], "test-42");
assert_eq!(resp["success"], true);
let status = proc.wait();
assert_eq!(status.code(), Some(0));
}
#[test]
fn rpc_subprocess_unknown_command_type() {
let binary = opi_binary_path();
if !binary.exists() {
eprintln!(
"skipping rpc subprocess test: binary not found at {:?}",
binary
);
return;
}
let mut proc = RpcProcess::spawn();
let _header = proc.read_line();
proc.send(&serde_json::json!({"type": "fly_to_moon"}));
let resp = proc.read_line();
assert_eq!(resp["type"], "response");
assert_eq!(resp["command"], "parse");
assert_eq!(resp["success"], false);
proc.send(&serde_json::json!({"type": "quit"}));
let _resp = proc.read_line();
let status = proc.wait();
assert_eq!(status.code(), Some(0));
}
#[test]
fn rpc_subprocess_session_info_command() {
let binary = opi_binary_path();
if !binary.exists() {
eprintln!(
"skipping rpc subprocess test: binary not found at {:?}",
binary
);
return;
}
let mut proc = RpcProcess::spawn();
let _header = proc.read_line();
proc.send(&serde_json::json!({"type": "session_info", "id": "si-1"}));
let resp = proc.read_until_response("session_info");
assert_eq!(resp["type"], "response");
assert_eq!(resp["command"], "session_info");
assert_eq!(resp["success"], true);
assert_eq!(resp["id"], "si-1");
assert!(resp["data"]["model"].is_string());
proc.send(&serde_json::json!({"type": "quit"}));
let _resp = proc.read_line();
let status = proc.wait();
assert_eq!(status.code(), Some(0));
}
#[test]
fn rpc_subprocess_empty_lines_ignored() {
let binary = opi_binary_path();
if !binary.exists() {
eprintln!(
"skipping rpc subprocess test: binary not found at {:?}",
binary
);
return;
}
let mut proc = RpcProcess::spawn();
let _header = proc.read_line();
proc.stdin.as_mut().unwrap().write_all(b"\n\n\n").unwrap();
proc.stdin.as_mut().unwrap().flush().unwrap();
proc.send(&serde_json::json!({"type": "quit"}));
let resp = proc.read_line();
assert_eq!(resp["type"], "response");
assert_eq!(resp["command"], "quit");
assert_eq!(resp["success"], true);
let status = proc.wait();
assert_eq!(status.code(), Some(0));
}
#[test]
fn rpc_subprocess_eof_exits_cleanly() {
let binary = opi_binary_path();
if !binary.exists() {
eprintln!(
"skipping rpc subprocess test: binary not found at {:?}",
binary
);
return;
}
let mut proc = RpcProcess::spawn();
let _header = proc.read_line();
proc.stdin.take();
let status = proc.child.wait().unwrap();
assert_eq!(status.code(), Some(0));
}
#[test]
fn rpc_subprocess_multiple_commands_sequential() {
let binary = opi_binary_path();
if !binary.exists() {
eprintln!(
"skipping rpc subprocess test: binary not found at {:?}",
binary
);
return;
}
let mut proc = RpcProcess::spawn();
let _header = proc.read_line();
proc.send(&serde_json::json!({"type": "session_info", "id": "1"}));
let resp = proc.read_until_response("session_info");
assert_eq!(resp["success"], true);
proc.send(&serde_json::json!({
"type": "set_model",
"model": "anthropic:claude-sonnet-4-5-20250514",
"id": "2"
}));
let resp = proc.read_until_response("set_model");
assert_eq!(resp["success"], true);
proc.send(&serde_json::json!({"type": "session_info", "id": "3"}));
let resp = proc.read_until_response("session_info");
assert_eq!(resp["success"], true);
assert_eq!(resp["id"], "3");
proc.send(&serde_json::json!({"type": "quit"}));
let _resp = proc.read_line();
let status = proc.wait();
assert_eq!(status.code(), Some(0));
}
#[test]
fn rpc_set_model_rejects_cross_provider() {
let binary = opi_binary_path();
if !binary.exists() {
eprintln!(
"skipping rpc subprocess test: binary not found at {:?}",
binary
);
return;
}
let mut proc = RpcProcess::spawn();
let _header = proc.read_line();
proc.send(&serde_json::json!({"type": "set_model", "id": "set", "model": "openai:gpt-4o"}));
let resp = proc.read_until_response("set_model");
assert_eq!(resp["type"], "response");
assert_eq!(resp["command"], "set_model");
assert_eq!(resp["id"], "set");
assert_eq!(resp["success"], false);
assert_eq!(
resp["error"],
"cannot switch provider from anthropic to openai at runtime"
);
proc.send(&serde_json::json!({"type": "quit"}));
let _resp = proc.read_line();
let status = proc.wait();
assert_eq!(status.code(), Some(0));
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn rpc_set_model_returns_model_data() {
let provider = MockProvider::new_with_models(
"mock",
vec![
rpc_model_info("mock-model", true),
rpc_model_info("next-model", true),
],
Vec::new(),
);
let (command_tx, mut output_rx, task) = custom_provider_runner(provider);
let header = recv_rpc_line(&mut output_rx).await;
assert_eq!(header["type"], "rpc_ready");
command_tx
.send(RpcCommand::set_model {
id: Some("set-1".into()),
model: "mock:next-model".into(),
})
.unwrap();
let resp = recv_response(&mut output_rx, "set_model").await;
assert_eq!(resp["type"], "response");
assert_eq!(resp["command"], "set_model");
assert_eq!(resp["id"], "set-1");
assert_eq!(resp["success"], true);
assert_eq!(resp["data"]["model"], "mock:next-model");
command_tx.send(RpcCommand::quit { id: None }).unwrap();
let _quit = recv_response(&mut output_rx, "quit").await;
assert_eq!(task.await.unwrap(), 0);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn rpc_set_model_accepts_model_ids_containing_colons() {
let provider = MockProvider::new_with_models(
"bedrock",
vec![
rpc_model_info("anthropic.claude-sonnet-4-20250514-v2:0", true),
rpc_model_info("anthropic.claude-haiku-4-20250514-v1:0", true),
],
Vec::new(),
);
let (command_tx, mut output_rx, task) = custom_provider_runner_with_model(
provider,
"bedrock:anthropic.claude-sonnet-4-20250514-v2:0",
);
let header = recv_rpc_line(&mut output_rx).await;
assert_eq!(header["type"], "rpc_ready");
command_tx
.send(RpcCommand::set_model {
id: Some("set-colon".into()),
model: "bedrock:anthropic.claude-haiku-4-20250514-v1:0".into(),
})
.unwrap();
let resp = recv_response(&mut output_rx, "set_model").await;
assert_eq!(resp["success"], true);
assert_eq!(
resp["data"]["model"],
"bedrock:anthropic.claude-haiku-4-20250514-v1:0"
);
command_tx.send(RpcCommand::quit { id: None }).unwrap();
let _quit = recv_response(&mut output_rx, "quit").await;
assert_eq!(task.await.unwrap(), 0);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn rpc_set_model_rejects_invalid_spec() {
let provider = MockProvider::new("mock", Vec::new());
let (command_tx, mut output_rx, task) = custom_provider_runner(provider);
let header = recv_rpc_line(&mut output_rx).await;
assert_eq!(header["type"], "rpc_ready");
command_tx
.send(RpcCommand::set_model {
id: Some("set-1".into()),
model: "not-a-model-spec".into(),
})
.unwrap();
let resp = recv_response(&mut output_rx, "set_model").await;
assert_eq!(resp["success"], false);
assert_eq!(resp["error"], "invalid model spec: expected provider:model");
command_tx.send(RpcCommand::quit { id: None }).unwrap();
let _quit = recv_response(&mut output_rx, "quit").await;
assert_eq!(task.await.unwrap(), 0);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn rpc_set_model_rejects_empty_provider_and_empty_model_specs() {
let provider = MockProvider::new("mock", Vec::new());
let (command_tx, mut output_rx, task) = custom_provider_runner(provider);
let header = recv_rpc_line(&mut output_rx).await;
assert_eq!(header["type"], "rpc_ready");
command_tx
.send(RpcCommand::set_model {
id: Some("empty-provider".into()),
model: ":mock-model".into(),
})
.unwrap();
let resp = recv_response(&mut output_rx, "set_model").await;
assert_eq!(resp["success"], false);
assert_eq!(resp["error"], "invalid model spec: expected provider:model");
command_tx
.send(RpcCommand::set_model {
id: Some("empty-model".into()),
model: "mock:".into(),
})
.unwrap();
let resp = recv_response(&mut output_rx, "set_model").await;
assert_eq!(resp["success"], false);
assert_eq!(resp["error"], "invalid model spec: expected provider:model");
command_tx.send(RpcCommand::quit { id: None }).unwrap();
let _quit = recv_response(&mut output_rx, "quit").await;
assert_eq!(task.await.unwrap(), 0);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn rpc_set_model_rejects_unknown_same_provider_model() {
let provider = MockProvider::new("mock", Vec::new());
let (command_tx, mut output_rx, task) = custom_provider_runner(provider);
let header = recv_rpc_line(&mut output_rx).await;
assert_eq!(header["type"], "rpc_ready");
command_tx
.send(RpcCommand::set_model {
id: Some("set-1".into()),
model: "mock:bad-model".into(),
})
.unwrap();
let resp = recv_response(&mut output_rx, "set_model").await;
assert_eq!(resp["success"], false);
assert_eq!(
resp["error"],
"unknown model 'bad-model' for provider 'mock'"
);
command_tx.send(RpcCommand::quit { id: None }).unwrap();
let _quit = recv_response(&mut output_rx, "quit").await;
assert_eq!(task.await.unwrap(), 0);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn rpc_set_model_rejects_unadvertised_current_model() {
let provider = MockProvider::new_with_models(
"mock",
vec![rpc_model_info("known-model", true)],
Vec::new(),
);
let (command_tx, mut output_rx, task) =
custom_provider_runner_with_model(provider, "mock:legacy-model");
let header = recv_rpc_line(&mut output_rx).await;
assert_eq!(header["type"], "rpc_ready");
command_tx
.send(RpcCommand::set_model {
id: Some("set-current".into()),
model: "mock:legacy-model".into(),
})
.unwrap();
let resp = recv_response(&mut output_rx, "set_model").await;
assert_eq!(resp["success"], false);
assert_eq!(
resp["error"],
"unknown model 'legacy-model' for provider 'mock'"
);
command_tx.send(RpcCommand::quit { id: None }).unwrap();
let _quit = recv_response(&mut output_rx, "quit").await;
assert_eq!(task.await.unwrap(), 0);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn rpc_set_model_updates_subsequent_provider_request() {
let provider = MockProvider::new_with_models(
"mock",
vec![
rpc_model_info("mock-model", true),
rpc_model_info("next-model", true),
],
vec![text_response("ok")],
);
let call_log = provider.call_log_handle();
let (command_tx, mut output_rx, task) = custom_provider_runner(provider);
let header = recv_rpc_line(&mut output_rx).await;
assert_eq!(header["type"], "rpc_ready");
command_tx
.send(RpcCommand::set_model {
id: Some("set-1".into()),
model: "mock:next-model".into(),
})
.unwrap();
let resp = recv_response(&mut output_rx, "set_model").await;
assert_eq!(resp["success"], true);
command_tx
.send(RpcCommand::prompt {
id: Some("prompt-1".into()),
message: "use the updated model".into(),
})
.unwrap();
let prompt = recv_response(&mut output_rx, "prompt").await;
assert_eq!(prompt["success"], true);
recv_until_agent_end(&mut output_rx).await;
{
let calls = call_log.lock().unwrap();
assert_eq!(calls.len(), 1);
assert_eq!(calls[0].model, "mock:next-model");
}
command_tx.send(RpcCommand::quit { id: None }).unwrap();
let _quit = recv_response(&mut output_rx, "quit").await;
assert_eq!(task.await.unwrap(), 0);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn rpc_set_model_rejects_while_running_without_mutating_model() {
let provider = HeldRequestProvider::new();
let call_log = provider.call_log();
let release = provider.release_handle();
let (command_tx, mut output_rx, task) = custom_provider_runner(provider);
let header = recv_rpc_line(&mut output_rx).await;
assert_eq!(header["type"], "rpc_ready");
command_tx
.send(RpcCommand::prompt {
id: Some("prompt-1".into()),
message: "start".into(),
})
.unwrap();
let prompt = recv_response(&mut output_rx, "prompt").await;
assert_eq!(prompt["success"], true);
loop {
let line = recv_rpc_line(&mut output_rx).await;
if line["type"] == "MessageStart" {
break;
}
}
command_tx
.send(RpcCommand::set_model {
id: Some("set-while-running".into()),
model: "mock:next-model".into(),
})
.unwrap();
let rejected = recv_response(&mut output_rx, "set_model").await;
assert_eq!(rejected["success"], false);
assert_eq!(
rejected["error"],
"cannot change model while agent is running"
);
{
let calls = call_log.lock().unwrap();
assert_eq!(calls.len(), 1);
assert_eq!(calls[0].model, "mock:mock-model");
}
release.notify_one();
recv_until_agent_end(&mut output_rx).await;
command_tx
.send(RpcCommand::prompt {
id: Some("prompt-2".into()),
message: "model should still be unchanged".into(),
})
.unwrap();
let prompt = recv_response(&mut output_rx, "prompt").await;
assert_eq!(prompt["success"], true);
loop {
let line = recv_rpc_line(&mut output_rx).await;
if line["type"] == "MessageStart" {
break;
}
}
{
let calls = call_log.lock().unwrap();
assert_eq!(calls.len(), 2);
assert_eq!(calls[1].model, "mock:mock-model");
}
release.notify_one();
recv_until_agent_end(&mut output_rx).await;
command_tx.send(RpcCommand::quit { id: None }).unwrap();
let _quit = recv_response(&mut output_rx, "quit").await;
assert_eq!(task.await.unwrap(), 0);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn rpc_set_model_revalidates_existing_thinking_before_switching() {
let provider = MockProvider::new_with_models(
"mock",
vec![
rpc_model_info_with_max_output("large-model", true, 100_000),
rpc_model_info_with_max_output("small-model", true, 20_000),
],
vec![text_response("still large")],
);
let call_log = provider.call_log_handle();
let (command_tx, mut output_rx, task) =
custom_provider_runner_with_model(provider, "mock:large-model");
let header = recv_rpc_line(&mut output_rx).await;
assert_eq!(header["type"], "rpc_ready");
command_tx
.send(RpcCommand::set_thinking_level {
id: Some("think-high".into()),
level: "high".into(),
})
.unwrap();
let thinking = recv_response(&mut output_rx, "set_thinking_level").await;
assert_eq!(thinking["success"], true);
assert_eq!(thinking["data"]["budget_tokens"], 20_000);
command_tx
.send(RpcCommand::set_model {
id: Some("set-small".into()),
model: "mock:small-model".into(),
})
.unwrap();
let rejected = recv_response(&mut output_rx, "set_model").await;
assert_eq!(rejected["success"], false);
assert_eq!(
rejected["error"],
"thinking budget 20000 requires max_tokens 20001, exceeding max output tokens 20000 for model 'small-model'"
);
command_tx
.send(RpcCommand::prompt {
id: Some("prompt-1".into()),
message: "model should remain large".into(),
})
.unwrap();
let prompt = recv_response(&mut output_rx, "prompt").await;
assert_eq!(prompt["success"], true);
recv_until_agent_end(&mut output_rx).await;
{
let calls = call_log.lock().unwrap();
assert_eq!(calls.len(), 1);
assert_eq!(calls[0].model, "mock:large-model");
assert_eq!(calls[0].thinking.budget_tokens, Some(20_000));
assert_eq!(calls[0].max_tokens, Some(20_001));
}
command_tx.send(RpcCommand::quit { id: None }).unwrap();
let _quit = recv_response(&mut output_rx, "quit").await;
assert_eq!(task.await.unwrap(), 0);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn rpc_set_model_rejects_switch_to_non_thinking_model_with_active_thinking() {
let provider = MockProvider::new_with_models(
"mock",
vec![
rpc_model_info("thinking-model", true),
rpc_model_info("plain-model", false),
],
vec![text_response("still thinking")],
);
let call_log = provider.call_log_handle();
let (command_tx, mut output_rx, task) =
custom_provider_runner_with_model(provider, "mock:thinking-model");
let header = recv_rpc_line(&mut output_rx).await;
assert_eq!(header["type"], "rpc_ready");
command_tx
.send(RpcCommand::set_thinking_level {
id: Some("think-low".into()),
level: "low".into(),
})
.unwrap();
let thinking = recv_response(&mut output_rx, "set_thinking_level").await;
assert_eq!(thinking["success"], true);
assert_eq!(thinking["data"]["budget_tokens"], 2048);
command_tx
.send(RpcCommand::set_model {
id: Some("set-plain".into()),
model: "mock:plain-model".into(),
})
.unwrap();
let rejected = recv_response(&mut output_rx, "set_model").await;
assert_eq!(rejected["success"], false);
assert_eq!(
rejected["error"],
"model 'plain-model' does not support thinking"
);
command_tx
.send(RpcCommand::prompt {
id: Some("prompt-1".into()),
message: "model should remain thinking capable".into(),
})
.unwrap();
let prompt = recv_response(&mut output_rx, "prompt").await;
assert_eq!(prompt["success"], true);
recv_until_agent_end(&mut output_rx).await;
{
let calls = call_log.lock().unwrap();
assert_eq!(calls.len(), 1);
assert_eq!(calls[0].model, "mock:thinking-model");
assert!(calls[0].thinking.enabled);
assert_eq!(calls[0].thinking.budget_tokens, Some(2048));
assert_eq!(calls[0].max_tokens, Some(2049));
}
command_tx.send(RpcCommand::quit { id: None }).unwrap();
let _quit = recv_response(&mut output_rx, "quit").await;
assert_eq!(task.await.unwrap(), 0);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn rpc_set_thinking_level_changes_runtime_config() {
let provider = MockProvider::new_with_models(
"mock",
vec![rpc_model_info("mock-model", true)],
vec![text_response("ok")],
);
let call_log = provider.call_log_handle();
let (command_tx, mut output_rx, task) = custom_provider_runner(provider);
let header = recv_rpc_line(&mut output_rx).await;
assert_eq!(header["type"], "rpc_ready");
command_tx
.send(RpcCommand::set_thinking_level {
id: Some("think-1".into()),
level: "low".into(),
})
.unwrap();
let resp = recv_response(&mut output_rx, "set_thinking_level").await;
assert_eq!(resp["success"], true);
assert_eq!(resp["data"]["level"], "low");
assert_eq!(resp["data"]["enabled"], true);
assert_eq!(resp["data"]["budget_tokens"], 2048);
command_tx
.send(RpcCommand::prompt {
id: Some("prompt-1".into()),
message: "use low thinking".into(),
})
.unwrap();
let prompt = recv_response(&mut output_rx, "prompt").await;
assert_eq!(prompt["success"], true);
recv_until_agent_end(&mut output_rx).await;
{
let calls = call_log.lock().unwrap();
assert_eq!(calls.len(), 1);
assert!(calls[0].thinking.enabled);
assert_eq!(calls[0].thinking.budget_tokens, Some(2048));
}
command_tx.send(RpcCommand::quit { id: None }).unwrap();
let _quit = recv_response(&mut output_rx, "quit").await;
assert_eq!(task.await.unwrap(), 0);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn rpc_set_thinking_level_off_medium_high_change_runtime_config() {
let mut config = OpiConfig::default();
config.thinking.enabled = true;
config.thinking.budget_tokens = 12_345;
let provider = MockProvider::new_with_models(
"mock",
vec![rpc_model_info("mock-model", true)],
vec![
text_response("off"),
text_response("medium"),
text_response("high"),
],
);
let call_log = provider.call_log_handle();
let (command_tx, mut output_rx, task) = custom_provider_runner_with_config(provider, config);
let header = recv_rpc_line(&mut output_rx).await;
assert_eq!(header["type"], "rpc_ready");
command_tx
.send(RpcCommand::set_thinking_level {
id: Some("think-off".into()),
level: "off".into(),
})
.unwrap();
let resp = recv_response(&mut output_rx, "set_thinking_level").await;
assert_eq!(resp["success"], true);
assert_eq!(resp["data"]["level"], "off");
assert_eq!(resp["data"]["enabled"], false);
assert!(resp["data"]["budget_tokens"].is_null());
command_tx
.send(RpcCommand::prompt {
id: Some("prompt-off".into()),
message: "off".into(),
})
.unwrap();
let prompt = recv_response(&mut output_rx, "prompt").await;
assert_eq!(prompt["success"], true);
recv_until_agent_end(&mut output_rx).await;
command_tx
.send(RpcCommand::set_thinking_level {
id: Some("think-medium".into()),
level: "medium".into(),
})
.unwrap();
let resp = recv_response(&mut output_rx, "set_thinking_level").await;
assert_eq!(resp["success"], true);
assert_eq!(resp["data"]["level"], "medium");
assert_eq!(resp["data"]["enabled"], true);
assert_eq!(resp["data"]["budget_tokens"], 12_345);
command_tx
.send(RpcCommand::prompt {
id: Some("prompt-medium".into()),
message: "medium".into(),
})
.unwrap();
let prompt = recv_response(&mut output_rx, "prompt").await;
assert_eq!(prompt["success"], true);
recv_until_agent_end(&mut output_rx).await;
command_tx
.send(RpcCommand::set_thinking_level {
id: Some("think-high".into()),
level: "high".into(),
})
.unwrap();
let resp = recv_response(&mut output_rx, "set_thinking_level").await;
assert_eq!(resp["success"], true);
assert_eq!(resp["data"]["level"], "high");
assert_eq!(resp["data"]["enabled"], true);
assert_eq!(resp["data"]["budget_tokens"], 20_000);
command_tx
.send(RpcCommand::prompt {
id: Some("prompt-high".into()),
message: "high".into(),
})
.unwrap();
let prompt = recv_response(&mut output_rx, "prompt").await;
assert_eq!(prompt["success"], true);
recv_until_agent_end(&mut output_rx).await;
{
let calls = call_log.lock().unwrap();
assert_eq!(calls.len(), 3);
assert!(!calls[0].thinking.enabled);
assert_eq!(calls[0].thinking.budget_tokens, None);
assert!(calls[1].thinking.enabled);
assert_eq!(calls[1].thinking.budget_tokens, Some(12_345));
assert!(calls[2].thinking.enabled);
assert_eq!(calls[2].thinking.budget_tokens, Some(20_000));
}
command_tx.send(RpcCommand::quit { id: None }).unwrap();
let _quit = recv_response(&mut output_rx, "quit").await;
assert_eq!(task.await.unwrap(), 0);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn rpc_set_thinking_level_medium_and_high_keep_request_token_budget_valid() {
let mut config = OpiConfig::default();
config.thinking.enabled = true;
config.thinking.budget_tokens = 12_345;
let provider = MockProvider::new_with_models(
"mock",
vec![rpc_model_info("mock-model", true)],
vec![text_response("medium"), text_response("high")],
);
let call_log = provider.call_log_handle();
let (command_tx, mut output_rx, task) = custom_provider_runner_with_config(provider, config);
let header = recv_rpc_line(&mut output_rx).await;
assert_eq!(header["type"], "rpc_ready");
command_tx
.send(RpcCommand::set_thinking_level {
id: Some("think-medium".into()),
level: "medium".into(),
})
.unwrap();
let resp = recv_response(&mut output_rx, "set_thinking_level").await;
assert_eq!(resp["success"], true);
assert_eq!(resp["data"]["budget_tokens"], 12_345);
command_tx
.send(RpcCommand::prompt {
id: Some("prompt-medium".into()),
message: "medium".into(),
})
.unwrap();
let prompt = recv_response(&mut output_rx, "prompt").await;
assert_eq!(prompt["success"], true);
recv_until_agent_end(&mut output_rx).await;
command_tx
.send(RpcCommand::set_thinking_level {
id: Some("think-high".into()),
level: "high".into(),
})
.unwrap();
let resp = recv_response(&mut output_rx, "set_thinking_level").await;
assert_eq!(resp["success"], true);
assert_eq!(resp["data"]["budget_tokens"], 20_000);
command_tx
.send(RpcCommand::prompt {
id: Some("prompt-high".into()),
message: "high".into(),
})
.unwrap();
let prompt = recv_response(&mut output_rx, "prompt").await;
assert_eq!(prompt["success"], true);
recv_until_agent_end(&mut output_rx).await;
{
let calls = call_log.lock().unwrap();
assert_eq!(calls.len(), 2);
for request in calls.iter() {
let budget = request
.thinking
.budget_tokens
.expect("thinking request should carry a budget");
let max_tokens = request
.max_tokens
.expect("thinking request should carry a coherent max token limit");
assert!(
budget < max_tokens,
"thinking budget {budget} must be less than request max_tokens {max_tokens}"
);
}
assert_eq!(calls[0].thinking.budget_tokens, Some(12_345));
assert_eq!(calls[0].max_tokens, Some(12_346));
assert_eq!(calls[1].thinking.budget_tokens, Some(20_000));
assert_eq!(calls[1].max_tokens, Some(20_001));
}
command_tx.send(RpcCommand::quit { id: None }).unwrap();
let _quit = recv_response(&mut output_rx, "quit").await;
assert_eq!(task.await.unwrap(), 0);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn rpc_startup_thinking_config_sets_valid_first_request_token_budget() {
let mut config = OpiConfig::default();
config.thinking.enabled = true;
config.thinking.budget_tokens = 12_345;
let provider = MockProvider::new_with_models(
"mock",
vec![rpc_model_info_with_max_output("mock-model", true, 12_346)],
vec![text_response("startup thinking")],
);
let call_log = provider.call_log_handle();
let (command_tx, mut output_rx, task) = custom_provider_runner_with_config(provider, config);
let header = recv_rpc_line(&mut output_rx).await;
assert_eq!(header["type"], "rpc_ready");
command_tx
.send(RpcCommand::prompt {
id: Some("prompt-1".into()),
message: "first request should have valid thinking config".into(),
})
.unwrap();
let prompt = recv_response(&mut output_rx, "prompt").await;
assert_eq!(prompt["success"], true);
recv_until_agent_end(&mut output_rx).await;
{
let calls = call_log.lock().unwrap();
assert_eq!(calls.len(), 1);
assert!(calls[0].thinking.enabled);
assert_eq!(calls[0].thinking.budget_tokens, Some(12_345));
assert_eq!(calls[0].max_tokens, Some(12_346));
}
command_tx.send(RpcCommand::quit { id: None }).unwrap();
let _quit = recv_response(&mut output_rx, "quit").await;
assert_eq!(task.await.unwrap(), 0);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn rpc_startup_thinking_config_disables_known_non_thinking_model() {
let mut config = OpiConfig::default();
config.thinking.enabled = true;
config.thinking.budget_tokens = 2_048;
let provider = MockProvider::new_with_models(
"mock",
vec![rpc_model_info("mock-model", false)],
vec![text_response("startup thinking disabled")],
);
let call_log = provider.call_log_handle();
let (command_tx, mut output_rx, task) = custom_provider_runner_with_config(provider, config);
let header = recv_rpc_line(&mut output_rx).await;
assert_eq!(header["type"], "rpc_ready");
command_tx
.send(RpcCommand::prompt {
id: Some("prompt-1".into()),
message: "first request should not enable unsupported thinking".into(),
})
.unwrap();
let prompt = recv_response(&mut output_rx, "prompt").await;
assert_eq!(prompt["success"], true);
recv_until_agent_end(&mut output_rx).await;
{
let calls = call_log.lock().unwrap();
assert_eq!(calls.len(), 1);
assert!(!calls[0].thinking.enabled);
assert_eq!(calls[0].thinking.budget_tokens, None);
assert_eq!(calls[0].max_tokens, None);
}
command_tx.send(RpcCommand::quit { id: None }).unwrap();
let _quit = recv_response(&mut output_rx, "quit").await;
assert_eq!(task.await.unwrap(), 0);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn rpc_set_thinking_level_rejects_budget_above_known_model_limit() {
let provider = MockProvider::new_with_models(
"mock",
vec![rpc_model_info_with_max_output("mock-model", true, 8_192)],
Vec::new(),
);
let (command_tx, mut output_rx, task) = custom_provider_runner(provider);
let header = recv_rpc_line(&mut output_rx).await;
assert_eq!(header["type"], "rpc_ready");
command_tx
.send(RpcCommand::set_thinking_level {
id: Some("think-high".into()),
level: "high".into(),
})
.unwrap();
let resp = recv_response(&mut output_rx, "set_thinking_level").await;
assert_eq!(resp["success"], false);
assert_eq!(
resp["error"],
"thinking budget 20000 requires max_tokens 20001, exceeding max output tokens 8192 for model 'mock-model'"
);
command_tx.send(RpcCommand::quit { id: None }).unwrap();
let _quit = recv_response(&mut output_rx, "quit").await;
assert_eq!(task.await.unwrap(), 0);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn rpc_set_thinking_level_rejects_known_non_thinking_model() {
let provider = MockProvider::new_with_models(
"mock",
vec![rpc_model_info("mock-model", false)],
vec![text_response("thinking stays off")],
);
let call_log = provider.call_log_handle();
let (command_tx, mut output_rx, task) = custom_provider_runner(provider);
let header = recv_rpc_line(&mut output_rx).await;
assert_eq!(header["type"], "rpc_ready");
command_tx
.send(RpcCommand::set_thinking_level {
id: Some("think-low".into()),
level: "low".into(),
})
.unwrap();
let resp = recv_response(&mut output_rx, "set_thinking_level").await;
assert_eq!(resp["success"], false);
assert_eq!(
resp["error"],
"model 'mock-model' does not support thinking"
);
command_tx
.send(RpcCommand::prompt {
id: Some("prompt-1".into()),
message: "rejected thinking should not mutate runtime config".into(),
})
.unwrap();
let prompt = recv_response(&mut output_rx, "prompt").await;
assert_eq!(prompt["success"], true);
recv_until_agent_end(&mut output_rx).await;
{
let calls = call_log.lock().unwrap();
assert_eq!(calls.len(), 1);
assert!(!calls[0].thinking.enabled);
assert_eq!(calls[0].thinking.budget_tokens, None);
assert_eq!(calls[0].max_tokens, None);
}
command_tx.send(RpcCommand::quit { id: None }).unwrap();
let _quit = recv_response(&mut output_rx, "quit").await;
assert_eq!(task.await.unwrap(), 0);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn rpc_set_thinking_level_rejects_invalid_level() {
let provider = MockProvider::new("mock", Vec::new());
let (command_tx, mut output_rx, task) = custom_provider_runner(provider);
let header = recv_rpc_line(&mut output_rx).await;
assert_eq!(header["type"], "rpc_ready");
command_tx
.send(RpcCommand::set_thinking_level {
id: Some("think-1".into()),
level: "maximum".into(),
})
.unwrap();
let resp = recv_response(&mut output_rx, "set_thinking_level").await;
assert_eq!(resp["success"], false);
assert_eq!(
resp["error"],
"invalid thinking level 'maximum': expected off, low, medium, or high"
);
command_tx.send(RpcCommand::quit { id: None }).unwrap();
let _quit = recv_response(&mut output_rx, "quit").await;
assert_eq!(task.await.unwrap(), 0);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn rpc_set_thinking_level_rejects_while_running() {
let provider = ControlledProvider::new();
let (command_tx, mut output_rx, task) = rpc_test_runner(provider);
let header = recv_rpc_line(&mut output_rx).await;
assert_eq!(header["type"], "rpc_ready");
command_tx
.send(RpcCommand::prompt {
id: Some("prompt-1".into()),
message: "start".into(),
})
.unwrap();
let prompt = recv_response(&mut output_rx, "prompt").await;
assert_eq!(prompt["success"], true);
command_tx
.send(RpcCommand::set_thinking_level {
id: Some("think-1".into()),
level: "high".into(),
})
.unwrap();
let resp = recv_response(&mut output_rx, "set_thinking_level").await;
assert_eq!(resp["success"], false);
assert_eq!(
resp["error"],
"cannot change thinking level while agent is running"
);
command_tx.send(RpcCommand::abort { id: None }).unwrap();
let _abort = recv_response(&mut output_rx, "abort").await;
command_tx.send(RpcCommand::quit { id: None }).unwrap();
let _quit = recv_response(&mut output_rx, "quit").await;
assert_eq!(task.await.unwrap(), 0);
}
#[test]
fn rpc_tool_selection_respects_no_tools() {
let workspace = tempfile::tempdir().expect("workspace tempdir");
let provider = MockProvider::new("mock", vec![text_response("ok")]);
let runner = RpcRunner::new(
Box::new(provider),
"mock:mock-model".into(),
OpiConfig::default(),
workspace.path().to_path_buf(),
false,
ToolSelection::Disabled,
None,
Vec::new(),
)
.expect("rpc runner should construct");
let system = runner.system_prompt().expect("runner should be idle");
assert!(
!system.contains("Available tools:"),
"RPC --no-tools should remove built-in tool definitions from the system prompt"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn rpc_session_info_includes_discovered_resource_metadata() {
let workspace = tempfile::tempdir().expect("workspace tempdir");
let skill_dir = workspace
.path()
.join(".opi")
.join("skills")
.join("rpc-skill");
std::fs::create_dir_all(&skill_dir).unwrap();
std::fs::write(
skill_dir.join("SKILL.md"),
r#"---
name: rpc-skill
description: RPC visible skill.
---
Body should remain undisclosed.
"#,
)
.unwrap();
let provider = MockProvider::new("mock", Vec::new());
let runner = RpcRunner::new(
Box::new(provider),
"mock:mock-model".into(),
OpiConfig::default(),
workspace.path().to_path_buf(),
false,
ToolSelection::Disabled,
None,
Vec::new(),
)
.expect("rpc runner should construct");
let (command_tx, command_rx) = tokio::sync::mpsc::unbounded_channel();
let (output_tx, mut output_rx) = tokio::sync::mpsc::unbounded_channel();
let task = tokio::spawn(async move {
let mut runner = runner;
runner.run_with_channels(command_rx, output_tx).await
});
let header = recv_rpc_line(&mut output_rx).await;
assert_eq!(header["type"], "rpc_ready");
command_tx
.send(RpcCommand::session_info {
id: Some("resources-1".into()),
})
.unwrap();
let resp = recv_response(&mut output_rx, "session_info").await;
assert_eq!(resp["success"], true);
assert_eq!(resp["id"], "resources-1");
let skills = resp["data"]["resources"]["skills"]
.as_array()
.expect("skills metadata should be an array");
assert!(
skills.iter().any(|name| name.as_str() == Some("rpc-skill")),
"session_info resources should include workspace skill names: {skills:?}"
);
command_tx.send(RpcCommand::quit { id: None }).unwrap();
let _quit = recv_response(&mut output_rx, "quit").await;
assert_eq!(task.await.unwrap(), 0);
}
fn rpc_model_info(id: &str, supports_thinking: bool) -> ModelInfo {
rpc_model_info_with_max_output(id, supports_thinking, 100_000)
}
fn rpc_model_info_with_max_output(
id: &str,
supports_thinking: bool,
max_output_tokens: u64,
) -> ModelInfo {
ModelInfo {
id: id.into(),
display_name: id.into(),
context_window: 100_000,
max_output_tokens,
supports_images: true,
supports_streaming: true,
supports_thinking,
}
}
#[derive(Clone)]
struct BlockingCleanupProvider {
cleanup_finished: Arc<AtomicBool>,
}
impl BlockingCleanupProvider {
fn new(cleanup_finished: Arc<AtomicBool>) -> Self {
Self { cleanup_finished }
}
}
impl Provider for BlockingCleanupProvider {
fn id(&self) -> &str {
"mock"
}
fn models(&self) -> &[ModelInfo] {
static MODELS: std::sync::LazyLock<Vec<ModelInfo>> = std::sync::LazyLock::new(|| {
vec![ModelInfo {
id: "mock-model".into(),
display_name: "Mock Model".into(),
context_window: 100_000,
max_output_tokens: 4_096,
supports_images: true,
supports_streaming: true,
supports_thinking: false,
}]
});
&MODELS
}
fn stream(&self, _request: Request) -> EventStream {
let stream = stream::unfold(0, move |step| async move {
match step {
0 => Some((
Ok(AssistantStreamEvent::Start {
partial: base_assistant(),
}),
1,
)),
1 => {
std::future::pending::<()>().await;
None
}
_ => None,
}
});
Box::pin(stream)
}
}
impl Drop for BlockingCleanupProvider {
fn drop(&mut self) {
std::thread::sleep(Duration::from_millis(150));
self.cleanup_finished.store(true, Ordering::SeqCst);
}
}
#[derive(Clone)]
struct ControlledEmitCleanupProvider {
continue_streaming: Arc<tokio::sync::Notify>,
cleanup_finished: Arc<AtomicBool>,
}
impl ControlledEmitCleanupProvider {
fn new(
continue_streaming: Arc<tokio::sync::Notify>,
cleanup_finished: Arc<AtomicBool>,
) -> Self {
Self {
continue_streaming,
cleanup_finished,
}
}
}
impl Provider for ControlledEmitCleanupProvider {
fn id(&self) -> &str {
"mock"
}
fn models(&self) -> &[ModelInfo] {
static MODELS: std::sync::LazyLock<Vec<ModelInfo>> = std::sync::LazyLock::new(|| {
vec![ModelInfo {
id: "mock-model".into(),
display_name: "Mock Model".into(),
context_window: 100_000,
max_output_tokens: 4_096,
supports_images: true,
supports_streaming: true,
supports_thinking: false,
}]
});
&MODELS
}
fn stream(&self, _request: Request) -> EventStream {
let continue_streaming = self.continue_streaming.clone();
let stream = stream::unfold(0, move |step| {
let continue_streaming = continue_streaming.clone();
async move {
match step {
0 => Some((
Ok(AssistantStreamEvent::Start {
partial: base_assistant(),
}),
1,
)),
1 => {
continue_streaming.notified().await;
let mut partial = base_assistant();
partial
.content
.push(opi_ai::message::AssistantContent::Text {
text: "after drop".into(),
});
Some((
Ok(AssistantStreamEvent::TextDelta {
content_index: 0,
delta: "after drop".into(),
partial,
}),
2,
))
}
2 => {
std::future::pending::<()>().await;
None
}
_ => None,
}
}
});
Box::pin(stream)
}
}
impl Drop for ControlledEmitCleanupProvider {
fn drop(&mut self) {
self.cleanup_finished.store(true, Ordering::SeqCst);
}
}
#[derive(Clone)]
struct HeldRequestProvider {
calls: Arc<Mutex<Vec<Request>>>,
release: Arc<tokio::sync::Notify>,
}
impl HeldRequestProvider {
fn new() -> Self {
Self {
calls: Arc::new(Mutex::new(Vec::new())),
release: Arc::new(tokio::sync::Notify::new()),
}
}
fn call_log(&self) -> Arc<Mutex<Vec<Request>>> {
Arc::clone(&self.calls)
}
fn release_handle(&self) -> Arc<tokio::sync::Notify> {
Arc::clone(&self.release)
}
}
impl Provider for HeldRequestProvider {
fn id(&self) -> &str {
"mock"
}
fn models(&self) -> &[ModelInfo] {
static MODELS: std::sync::LazyLock<Vec<ModelInfo>> = std::sync::LazyLock::new(|| {
vec![
ModelInfo {
id: "mock-model".into(),
display_name: "Mock Model".into(),
context_window: 100_000,
max_output_tokens: 4_096,
supports_images: true,
supports_streaming: true,
supports_thinking: false,
},
ModelInfo {
id: "next-model".into(),
display_name: "Next Model".into(),
context_window: 100_000,
max_output_tokens: 4_096,
supports_images: true,
supports_streaming: true,
supports_thinking: false,
},
]
});
&MODELS
}
fn stream(&self, request: Request) -> EventStream {
let cancel = request.cancel.clone();
let release = self.release.clone();
self.calls.lock().unwrap().push(request);
let stream = stream::unfold(0, move |step| {
let cancel = cancel.clone();
let release = release.clone();
async move {
match step {
0 => Some((
Ok(AssistantStreamEvent::Start {
partial: base_assistant(),
}),
1,
)),
1 => {
tokio::select! {
_ = cancel.cancelled() => None,
_ = release.notified() => {
let mut message = base_assistant();
message.content.push(opi_ai::message::AssistantContent::Text {
text: "released".into(),
});
Some((Ok(AssistantStreamEvent::Done {
reason: StopReason::Stop,
message,
}), 2))
}
}
}
_ => None,
}
}
});
Box::pin(stream)
}
}
#[derive(Clone)]
struct PanickingProvider;
impl Provider for PanickingProvider {
fn id(&self) -> &str {
"mock"
}
fn models(&self) -> &[ModelInfo] {
static MODELS: std::sync::LazyLock<Vec<ModelInfo>> = std::sync::LazyLock::new(|| {
vec![ModelInfo {
id: "mock-model".into(),
display_name: "Mock Model".into(),
context_window: 100_000,
max_output_tokens: 4_096,
supports_images: true,
supports_streaming: true,
supports_thinking: false,
}]
});
&MODELS
}
fn stream(&self, _request: Request) -> EventStream {
let stream = stream::unfold(0, move |step| async move {
match step {
0 => Some((
Ok(AssistantStreamEvent::Start {
partial: base_assistant(),
}),
1,
)),
1 => panic!("forced active run panic"),
_ => None,
}
});
Box::pin(stream)
}
}
fn blocking_cleanup_runner(
provider: BlockingCleanupProvider,
) -> (
tokio::sync::mpsc::UnboundedSender<RpcCommand>,
tokio::sync::mpsc::UnboundedReceiver<serde_json::Value>,
tokio::task::JoinHandle<i32>,
) {
let workspace = tempfile::tempdir().expect("workspace tempdir");
let runner = RpcRunner::new(
Box::new(provider),
"mock:mock-model".into(),
OpiConfig::default(),
workspace.path().to_path_buf(),
false,
ToolSelection::Disabled,
None,
Vec::new(),
)
.expect("rpc runner should construct");
let (command_tx, command_rx) = tokio::sync::mpsc::unbounded_channel();
let (output_tx, output_rx) = tokio::sync::mpsc::unbounded_channel();
let task = tokio::spawn(async move {
let mut runner = runner;
runner.run_with_channels(command_rx, output_tx).await
});
(command_tx, output_rx, task)
}
fn custom_provider_runner<P>(
provider: P,
) -> (
tokio::sync::mpsc::UnboundedSender<RpcCommand>,
tokio::sync::mpsc::UnboundedReceiver<serde_json::Value>,
tokio::task::JoinHandle<i32>,
)
where
P: Provider + 'static,
{
custom_provider_runner_with_config(provider, OpiConfig::default())
}
fn custom_provider_runner_with_model<P>(
provider: P,
model: impl Into<String>,
) -> (
tokio::sync::mpsc::UnboundedSender<RpcCommand>,
tokio::sync::mpsc::UnboundedReceiver<serde_json::Value>,
tokio::task::JoinHandle<i32>,
)
where
P: Provider + 'static,
{
custom_provider_runner_with_config_and_model(provider, OpiConfig::default(), model)
}
fn custom_provider_runner_with_config<P>(
provider: P,
config: OpiConfig,
) -> (
tokio::sync::mpsc::UnboundedSender<RpcCommand>,
tokio::sync::mpsc::UnboundedReceiver<serde_json::Value>,
tokio::task::JoinHandle<i32>,
)
where
P: Provider + 'static,
{
custom_provider_runner_with_config_and_model(provider, config, "mock:mock-model")
}
fn custom_provider_runner_with_config_and_model<P>(
provider: P,
config: OpiConfig,
model: impl Into<String>,
) -> (
tokio::sync::mpsc::UnboundedSender<RpcCommand>,
tokio::sync::mpsc::UnboundedReceiver<serde_json::Value>,
tokio::task::JoinHandle<i32>,
)
where
P: Provider + 'static,
{
let workspace = tempfile::tempdir().expect("workspace tempdir");
let runner = RpcRunner::new(
Box::new(provider),
model.into(),
config,
workspace.path().to_path_buf(),
false,
ToolSelection::Disabled,
None,
Vec::new(),
)
.expect("rpc runner should construct");
let (command_tx, command_rx) = tokio::sync::mpsc::unbounded_channel();
let (output_tx, output_rx) = tokio::sync::mpsc::unbounded_channel();
let task = tokio::spawn(async move {
let mut runner = runner;
runner.run_with_channels(command_rx, output_tx).await
});
(command_tx, output_rx, task)
}
#[derive(Clone)]
struct ControlledProvider {
calls: Arc<Mutex<Vec<Request>>>,
}
impl ControlledProvider {
fn new() -> Self {
Self {
calls: Arc::new(Mutex::new(Vec::new())),
}
}
fn call_log(&self) -> Arc<Mutex<Vec<Request>>> {
Arc::clone(&self.calls)
}
}
impl Provider for ControlledProvider {
fn id(&self) -> &str {
"mock"
}
fn models(&self) -> &[ModelInfo] {
static MODELS: std::sync::LazyLock<Vec<ModelInfo>> = std::sync::LazyLock::new(|| {
vec![ModelInfo {
id: "mock-model".into(),
display_name: "Mock Model".into(),
context_window: 100_000,
max_output_tokens: 4_096,
supports_images: true,
supports_streaming: true,
supports_thinking: false,
}]
});
&MODELS
}
fn stream(&self, request: Request) -> EventStream {
let cancel = request.cancel.clone();
self.calls.lock().unwrap().push(request);
let stream = stream::unfold(0, move |step| {
let cancel = cancel.clone();
async move {
match step {
0 => Some((
Ok(AssistantStreamEvent::Start {
partial: base_assistant(),
}),
1,
)),
1 => {
tokio::select! {
_ = cancel.cancelled() => None,
_ = tokio::time::sleep(Duration::from_millis(50)) => {
let mut partial = base_assistant();
partial.content.push(opi_ai::message::AssistantContent::Text {
text: "partial".into(),
});
Some((Ok(AssistantStreamEvent::TextDelta {
content_index: 0,
delta: "partial".into(),
partial,
}), 2))
}
}
}
2 => {
tokio::select! {
_ = cancel.cancelled() => None,
_ = tokio::time::sleep(Duration::from_millis(250)) => {
let mut message = base_assistant();
message.content.push(opi_ai::message::AssistantContent::Text {
text: "partial".into(),
});
Some((Ok(AssistantStreamEvent::Done {
reason: StopReason::Stop,
message,
}), 3))
}
}
}
_ => None,
}
}
});
Box::pin(stream)
}
}
#[derive(Clone)]
struct SecondTurnGatedDeltaProvider {
run_count: Arc<AtomicUsize>,
second_delta_parked: Arc<tokio::sync::Notify>,
release_second_delta: Arc<tokio::sync::Notify>,
second_delta_emitted: Arc<AtomicBool>,
}
impl SecondTurnGatedDeltaProvider {
fn new() -> Self {
Self {
run_count: Arc::new(AtomicUsize::new(0)),
second_delta_parked: Arc::new(tokio::sync::Notify::new()),
release_second_delta: Arc::new(tokio::sync::Notify::new()),
second_delta_emitted: Arc::new(AtomicBool::new(false)),
}
}
async fn wait_for_second_delta_gate(&self) {
tokio::time::timeout(Duration::from_secs(2), self.second_delta_parked.notified())
.await
.expect("timed out waiting for second run to park before delta");
}
fn release_second_delta_gate(&self) {
self.release_second_delta.notify_one();
}
fn second_delta_emitted(&self) -> bool {
self.second_delta_emitted.load(Ordering::SeqCst)
}
}
impl Provider for SecondTurnGatedDeltaProvider {
fn id(&self) -> &str {
"mock"
}
fn models(&self) -> &[ModelInfo] {
static MODELS: std::sync::LazyLock<Vec<ModelInfo>> = std::sync::LazyLock::new(|| {
vec![ModelInfo {
id: "mock-model".into(),
display_name: "Mock Model".into(),
context_window: 100_000,
max_output_tokens: 4_096,
supports_images: true,
supports_streaming: true,
supports_thinking: false,
}]
});
&MODELS
}
fn stream(&self, request: Request) -> EventStream {
let run_number = self.run_count.fetch_add(1, Ordering::SeqCst) + 1;
let cancel = request.cancel.clone();
let second_delta_parked = self.second_delta_parked.clone();
let release_second_delta = self.release_second_delta.clone();
let second_delta_emitted = self.second_delta_emitted.clone();
let stream = stream::unfold(0, move |step| {
let cancel = cancel.clone();
let second_delta_parked = second_delta_parked.clone();
let release_second_delta = release_second_delta.clone();
let second_delta_emitted = second_delta_emitted.clone();
async move {
match step {
0 => Some((
Ok(AssistantStreamEvent::Start {
partial: base_assistant(),
}),
1,
)),
1 if run_number == 2 => {
second_delta_parked.notify_one();
release_second_delta.notified().await;
if cancel.is_cancelled() {
None
} else {
second_delta_emitted.store(true, Ordering::SeqCst);
let mut partial = base_assistant();
partial
.content
.push(opi_ai::message::AssistantContent::Text {
text: "partial".into(),
});
Some((
Ok(AssistantStreamEvent::TextDelta {
content_index: 0,
delta: "partial".into(),
partial,
}),
2,
))
}
}
1 => {
cancel.cancelled().await;
None
}
2 => {
let mut message = base_assistant();
message
.content
.push(opi_ai::message::AssistantContent::Text {
text: "partial".into(),
});
Some((
Ok(AssistantStreamEvent::Done {
reason: StopReason::Stop,
message,
}),
3,
))
}
_ => None,
}
}
});
Box::pin(stream)
}
}
fn rpc_test_runner(
provider: ControlledProvider,
) -> (
tokio::sync::mpsc::UnboundedSender<RpcCommand>,
tokio::sync::mpsc::UnboundedReceiver<serde_json::Value>,
tokio::task::JoinHandle<i32>,
) {
let workspace = tempfile::tempdir().expect("workspace tempdir");
let runner = RpcRunner::new(
Box::new(provider),
"mock:mock-model".into(),
OpiConfig::default(),
workspace.path().to_path_buf(),
false,
ToolSelection::Disabled,
None,
Vec::new(),
)
.expect("rpc runner should construct");
let (command_tx, command_rx) = tokio::sync::mpsc::unbounded_channel();
let (output_tx, output_rx) = tokio::sync::mpsc::unbounded_channel();
let task = tokio::spawn(async move {
let mut runner = runner;
runner.run_with_channels(command_rx, output_tx).await
});
(command_tx, output_rx, task)
}
async fn recv_rpc_line(
output_rx: &mut tokio::sync::mpsc::UnboundedReceiver<serde_json::Value>,
) -> serde_json::Value {
tokio::time::timeout(Duration::from_secs(2), output_rx.recv())
.await
.expect("timed out waiting for RPC output")
.expect("RPC output channel closed")
}
async fn recv_response(
output_rx: &mut tokio::sync::mpsc::UnboundedReceiver<serde_json::Value>,
command: &str,
) -> serde_json::Value {
loop {
let line = recv_rpc_line(output_rx).await;
if line["type"] == "response" && line["command"] == command {
return line;
}
}
}
async fn recv_until_agent_end(
output_rx: &mut tokio::sync::mpsc::UnboundedReceiver<serde_json::Value>,
) {
loop {
let line = recv_rpc_line(output_rx).await;
if line["type"] == "AgentEnd" {
return;
}
}
}
async fn wait_for_idle_session_info(
command_tx: &tokio::sync::mpsc::UnboundedSender<RpcCommand>,
output_rx: &mut tokio::sync::mpsc::UnboundedReceiver<serde_json::Value>,
) {
for attempt in 0..16 {
let id = format!("idle-{attempt}");
command_tx
.send(RpcCommand::session_info {
id: Some(id.clone()),
})
.unwrap();
let response = recv_response(output_rx, "session_info").await;
if response["id"] != id {
continue;
}
if response["success"] == true {
return;
}
assert_eq!(
response["error"],
"cannot query session info while agent is running"
);
}
panic!("RPC runner did not become idle after active run completed");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn rpc_quit_while_running_waits_for_active_task_cleanup() {
let cleanup_finished = Arc::new(AtomicBool::new(false));
let provider = BlockingCleanupProvider::new(cleanup_finished.clone());
let (command_tx, mut output_rx, task) = blocking_cleanup_runner(provider);
let header = recv_rpc_line(&mut output_rx).await;
assert_eq!(header["type"], "rpc_ready");
command_tx
.send(RpcCommand::prompt {
id: Some("prompt-1".into()),
message: "start".into(),
})
.unwrap();
let _prompt = recv_response(&mut output_rx, "prompt").await;
loop {
let line = recv_rpc_line(&mut output_rx).await;
if line["type"] == "MessageStart" {
break;
}
}
command_tx
.send(RpcCommand::quit {
id: Some("quit-1".into()),
})
.unwrap();
let quit = recv_response(&mut output_rx, "quit").await;
assert_eq!(quit["success"], true);
assert_eq!(task.await.unwrap(), 0);
assert!(
cleanup_finished.load(Ordering::SeqCst),
"RPC runner returned before the active run task completed cleanup"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn rpc_eof_while_running_waits_for_active_task_cleanup() {
let cleanup_finished = Arc::new(AtomicBool::new(false));
let provider = BlockingCleanupProvider::new(cleanup_finished.clone());
let (command_tx, mut output_rx, task) = blocking_cleanup_runner(provider);
let header = recv_rpc_line(&mut output_rx).await;
assert_eq!(header["type"], "rpc_ready");
command_tx
.send(RpcCommand::prompt {
id: Some("prompt-1".into()),
message: "start".into(),
})
.unwrap();
let _prompt = recv_response(&mut output_rx, "prompt").await;
loop {
let line = recv_rpc_line(&mut output_rx).await;
if line["type"] == "MessageStart" {
break;
}
}
drop(command_tx);
assert_eq!(task.await.unwrap(), 0);
assert!(
cleanup_finished.load(Ordering::SeqCst),
"RPC runner returned before the active run task completed cleanup"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn rpc_output_drop_while_running_waits_for_active_task_cleanup() {
let continue_streaming = Arc::new(tokio::sync::Notify::new());
let cleanup_finished = Arc::new(AtomicBool::new(false));
let provider =
ControlledEmitCleanupProvider::new(continue_streaming.clone(), cleanup_finished.clone());
let (command_tx, mut output_rx, task) = custom_provider_runner(provider);
let header = recv_rpc_line(&mut output_rx).await;
assert_eq!(header["type"], "rpc_ready");
command_tx
.send(RpcCommand::prompt {
id: Some("prompt-1".into()),
message: "start".into(),
})
.unwrap();
let _prompt = recv_response(&mut output_rx, "prompt").await;
loop {
let line = recv_rpc_line(&mut output_rx).await;
if line["type"] == "MessageStart" {
break;
}
}
drop(output_rx);
continue_streaming.notify_one();
let exit = tokio::time::timeout(Duration::from_secs(2), task)
.await
.expect("RPC runner did not exit after output failure")
.unwrap();
assert_eq!(exit, 1);
assert!(
cleanup_finished.load(Ordering::SeqCst),
"RPC runner returned after output failure before active run cleanup"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn rpc_active_run_join_error_is_fatal() {
let (command_tx, mut output_rx, task) = custom_provider_runner(PanickingProvider);
let header = recv_rpc_line(&mut output_rx).await;
assert_eq!(header["type"], "rpc_ready");
command_tx
.send(RpcCommand::prompt {
id: Some("prompt-1".into()),
message: "start".into(),
})
.unwrap();
let _prompt = recv_response(&mut output_rx, "prompt").await;
let exit = tokio::time::timeout(Duration::from_secs(2), task)
.await
.expect("RPC runner continued after active run JoinError")
.unwrap();
assert_eq!(exit, 1);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn rpc_mid_turn_abort_is_processed() {
let provider = ControlledProvider::new();
let (command_tx, mut output_rx, task) = rpc_test_runner(provider);
let header = recv_rpc_line(&mut output_rx).await;
assert_eq!(header["type"], "rpc_ready");
command_tx
.send(RpcCommand::prompt {
id: Some("prompt-1".into()),
message: "start".into(),
})
.unwrap();
let prompt = recv_response(&mut output_rx, "prompt").await;
assert_eq!(prompt["success"], true);
command_tx
.send(RpcCommand::abort {
id: Some("abort-1".into()),
})
.unwrap();
let abort = recv_response(&mut output_rx, "abort").await;
assert_eq!(abort["success"], true);
assert_eq!(abort["id"], "abort-1");
command_tx.send(RpcCommand::quit { id: None }).unwrap();
let _quit = recv_response(&mut output_rx, "quit").await;
assert_eq!(task.await.unwrap(), 0);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn rpc_second_turn_abort_after_cancel_targets_active_run() {
let provider = SecondTurnGatedDeltaProvider::new();
let (command_tx, mut output_rx, task) = custom_provider_runner(provider.clone());
let header = recv_rpc_line(&mut output_rx).await;
assert_eq!(header["type"], "rpc_ready");
command_tx
.send(RpcCommand::prompt {
id: Some("prompt-1".into()),
message: "first".into(),
})
.unwrap();
let _prompt = recv_response(&mut output_rx, "prompt").await;
loop {
let line = recv_rpc_line(&mut output_rx).await;
if line["type"] == "MessageStart" {
break;
}
}
command_tx.send(RpcCommand::abort { id: None }).unwrap();
let _abort = recv_response(&mut output_rx, "abort").await;
recv_until_agent_end(&mut output_rx).await;
wait_for_idle_session_info(&command_tx, &mut output_rx).await;
command_tx
.send(RpcCommand::prompt {
id: Some("prompt-2".into()),
message: "second".into(),
})
.unwrap();
let second_prompt = recv_response(&mut output_rx, "prompt").await;
assert_eq!(second_prompt["id"], "prompt-2");
assert_eq!(second_prompt["success"], true);
provider.wait_for_second_delta_gate().await;
command_tx
.send(RpcCommand::abort {
id: Some("abort-2".into()),
})
.unwrap();
let second_abort = recv_response(&mut output_rx, "abort").await;
assert_eq!(second_abort["success"], true);
provider.release_second_delta_gate();
loop {
let line = recv_rpc_line(&mut output_rx).await;
assert_ne!(
line["type"], "MessageUpdate",
"stale control handle allowed the second run to continue after abort"
);
if line["type"] == "AgentEnd" {
break;
}
}
assert!(
!provider.second_delta_emitted(),
"second run emitted a delta after abort released the gate"
);
command_tx.send(RpcCommand::quit { id: None }).unwrap();
let _quit = recv_response(&mut output_rx, "quit").await;
assert_eq!(task.await.unwrap(), 0);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn rpc_mid_turn_steer_is_queued() {
let provider = ControlledProvider::new();
let call_log = provider.call_log();
let (command_tx, mut output_rx, task) = rpc_test_runner(provider);
let header = recv_rpc_line(&mut output_rx).await;
assert_eq!(header["type"], "rpc_ready");
command_tx
.send(RpcCommand::prompt {
id: Some("prompt-1".into()),
message: "start".into(),
})
.unwrap();
let _prompt = recv_response(&mut output_rx, "prompt").await;
command_tx
.send(RpcCommand::steer {
id: Some("steer-1".into()),
message: "use the queued steering".into(),
})
.unwrap();
let steer = recv_response(&mut output_rx, "steer").await;
assert_eq!(steer["success"], true);
loop {
let line = recv_rpc_line(&mut output_rx).await;
if line["type"] == "AgentEnd" {
break;
}
}
{
let calls = call_log.lock().unwrap();
assert!(
calls.len() >= 2,
"queued steering should trigger a second provider request"
);
let second_call = &calls[1];
let saw_steering = second_call.messages.iter().any(|message| match message {
opi_ai::message::Message::User(user) => user.content.iter().any(|content| {
matches!(
content,
opi_ai::message::InputContent::Text { text }
if text == "use the queued steering"
)
}),
_ => false,
});
assert!(
saw_steering,
"second provider request should include steering message"
);
}
command_tx.send(RpcCommand::quit { id: None }).unwrap();
let _quit = recv_response(&mut output_rx, "quit").await;
assert_eq!(task.await.unwrap(), 0);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn rpc_events_stream_before_turn_end() {
let provider = ControlledProvider::new();
let (command_tx, mut output_rx, task) = rpc_test_runner(provider);
let header = recv_rpc_line(&mut output_rx).await;
assert_eq!(header["type"], "rpc_ready");
command_tx
.send(RpcCommand::prompt {
id: Some("prompt-1".into()),
message: "start".into(),
})
.unwrap();
let _prompt = recv_response(&mut output_rx, "prompt").await;
loop {
let line = recv_rpc_line(&mut output_rx).await;
assert_ne!(
line["type"], "AgentEnd",
"MessageUpdate should stream before the turn ends"
);
if line["type"] == "MessageUpdate" {
break;
}
}
command_tx.send(RpcCommand::abort { id: None }).unwrap();
let _abort = recv_response(&mut output_rx, "abort").await;
command_tx.send(RpcCommand::quit { id: None }).unwrap();
let _quit = recv_response(&mut output_rx, "quit").await;
assert_eq!(task.await.unwrap(), 0);
}
#[test]
fn rpc_response_format_success() {
use serde_json::json;
let resp = json!({
"type": "response",
"id": "req-1",
"command": "prompt",
"success": true,
});
assert_eq!(resp["type"], "response");
assert_eq!(resp["command"], "prompt");
assert_eq!(resp["success"], true);
assert_eq!(resp["id"], "req-1");
}
#[test]
fn rpc_response_format_error() {
use serde_json::json;
let resp = json!({
"type": "response",
"id": "req-2",
"command": "set_model",
"success": false,
"error": "model not found: invalid/model",
});
assert_eq!(resp["success"], false);
assert!(resp["error"].is_string());
}
#[test]
fn rpc_response_format_with_data() {
use serde_json::json;
let resp = json!({
"type": "response",
"id": "req-3",
"command": "session_info",
"success": true,
"data": {
"model": "mock-model",
"session_id": "abc123",
},
});
assert_eq!(resp["success"], true);
assert!(resp["data"].is_object());
assert_eq!(resp["data"]["model"], "mock-model");
}
#[test]
fn rpc_schema_version_is_2() {
assert_eq!(RPC_SCHEMA_VERSION, 2);
}