use super::*;
use crate::core::events::{Event as EngineEvent, TurnOutcomeStatus};
use crate::core::ops::Op;
use crate::models::Usage;
use crate::runtime_threads::RuntimeEventRecord;
use crate::test_support::{EnvVarGuard, lock_test_env};
use anyhow::{Context, bail};
use futures_util::StreamExt;
use std::fs;
use std::sync::Arc;
use tokio::sync::{Mutex, mpsc, oneshot};
use tokio::time::sleep;
use uuid::Uuid;
/// Task executor stub used by the runtime API tests.
///
/// It emits one `"started"` status event, waits 100 ms, and then reports
/// `Canceled` when the cancellation token has fired or `Completed` with the
/// result text `"ok"` otherwise.
struct MockExecutor;

#[async_trait::async_trait]
impl crate::task_manager::TaskExecutor for MockExecutor {
    async fn execute(
        &self,
        _task: crate::task_manager::ExecutionTask,
        events: mpsc::UnboundedSender<crate::task_manager::TaskExecutionEvent>,
        cancel: tokio_util::sync::CancellationToken,
    ) -> crate::task_manager::TaskExecutionResult {
        let started = crate::task_manager::TaskExecutionEvent::Status {
            message: String::from("started"),
        };
        // The send result is deliberately discarded.
        let _ = events.send(started);

        sleep(Duration::from_millis(100)).await;

        // Cancellation is only sampled once, after the simulated work.
        let (status, result_text) = if cancel.is_cancelled() {
            (crate::task_manager::TaskStatus::Canceled, None)
        } else {
            (
                crate::task_manager::TaskStatus::Completed,
                Some(String::from("ok")),
            )
        };

        crate::task_manager::TaskExecutionResult {
            status,
            result_text,
            error: None,
        }
    }
}
/// Builds a `SavedSession` fixture containing a single assistant message
/// whose content is exactly `blocks`.
fn saved_session_with_blocks(blocks: Vec<crate::models::ContentBlock>) -> SavedSession {
    let metadata = SessionMetadata {
        id: String::from("session-1"),
        title: String::from("test session"),
        created_at: Utc::now(),
        updated_at: Utc::now(),
        message_count: 1,
        total_tokens: 0,
        model: String::from("test-model"),
        workspace: PathBuf::from("."),
        mode: None,
        cost: Default::default(),
        parent_session_id: None,
        forked_from_message_count: None,
        cumulative_turn_secs: 0,
    };

    let assistant_message = crate::models::Message {
        role: String::from("assistant"),
        content: blocks,
    };

    SavedSession {
        schema_version: 1,
        metadata,
        messages: vec![assistant_message],
        system_prompt: None,
        context_references: Vec::new(),
        artifacts: Vec::new(),
    }
}
/// Runs `git` with `args` inside `workspace`.
///
/// # Errors
///
/// Fails when the command cannot be spawned or exits unsuccessfully; in the
/// latter case the error message carries the command's stderr.
fn run_test_git(workspace: &std::path::Path, args: &[&str]) -> Result<()> {
    let output = crate::dependencies::Git::output(args, workspace)
        .with_context(|| format!("git {args:?} failed to spawn"))?;

    if output.status.success() {
        return Ok(());
    }

    let stderr = String::from_utf8_lossy(&output.stderr);
    bail!("git {args:?} failed: {stderr}")
}
/// A freshly committed repository is reported clean with branch and head set;
/// after modifying a tracked file and adding an untracked one, the status
/// reports one unstaged and one untracked entry.
#[test]
fn workspace_status_reports_head_and_dirty_counts() -> Result<()> {
    let tmp = tempfile::tempdir()?;
    let repo = tmp.path().join("repo");
    fs::create_dir_all(&repo)?;

    // Repository setup: init on `main`, then disable line-ending conversion.
    let setup_commands: [&[&str]; 2] = [
        &["init", "-b", "main"],
        &["config", "core.autocrlf", "false"],
    ];
    for command in setup_commands {
        run_test_git(&repo, command)?;
    }

    let tracked_file = repo.join("tracked.txt");
    fs::write(&tracked_file, "clean\n")?;
    run_test_git(&repo, &["add", "tracked.txt"])?;

    // Identity is passed per-command so the commit does not rely on global git config.
    let commit_command = [
        "-c",
        "user.name=CodeWhale Test",
        "-c",
        "user.email=codewhale@example.invalid",
        "commit",
        "-m",
        "init",
    ];
    run_test_git(&repo, &commit_command)?;

    let clean = collect_workspace_status(&repo);
    assert!(clean.git_repo);
    assert_eq!(clean.branch.as_deref(), Some("main"));
    assert!(clean.head.as_deref().is_some_and(|head| !head.is_empty()));
    assert!(!clean.dirty);

    // One tracked modification plus one brand-new file.
    fs::write(&tracked_file, "dirty\n")?;
    fs::write(repo.join("untracked.txt"), "new\n")?;

    let dirty = collect_workspace_status(&repo);
    assert!(dirty.dirty);
    assert_eq!(dirty.unstaged, 1);
    assert_eq!(dirty.untracked, 1);
    Ok(())
}
/// A `ToolUse` block's caller information must appear in the serialized session detail.
#[test]
fn session_detail_tool_use_preserves_caller_metadata() {
    let caller = crate::models::ToolCaller {
        caller_type: String::from("subagent"),
        tool_id: Some(String::from("parent-tool")),
    };
    let tool_use = crate::models::ContentBlock::ToolUse {
        id: String::from("tool-1"),
        name: String::from("task_shell_start"),
        input: json!({ "cmd": "cargo test" }),
        caller: Some(caller),
    };

    let session = saved_session_with_blocks(vec![tool_use]);
    let detail = session_to_detail(session);

    let block = &detail.messages[0]["content"][0];
    let caller_json = &block["caller"];
    assert_eq!(block["type"].as_str(), Some("tool_use"));
    assert_eq!(caller_json["type"].as_str(), Some("subagent"));
    assert_eq!(caller_json["tool_id"].as_str(), Some("parent-tool"));
}
/// A `ToolResult` carrying both plain content and structured content blocks
/// must expose both in the serialized session detail, along with `is_error`.
#[test]
fn session_detail_tool_result_keeps_fallback_content_with_blocks() {
    let structured = json!({
        "type": "text",
        "text": "structured text"
    });
    let tool_result = crate::models::ContentBlock::ToolResult {
        tool_use_id: String::from("tool-1"),
        content: String::from("fallback text"),
        is_error: Some(false),
        content_blocks: Some(vec![structured]),
    };

    let session = saved_session_with_blocks(vec![tool_result]);
    let detail = session_to_detail(session);

    let block = &detail.messages[0]["content"][0];
    assert_eq!(block["type"].as_str(), Some("tool_result"));
    assert_eq!(block["content"].as_str(), Some("fallback text"));

    let first_structured_text = block["content_blocks"][0]["text"].as_str();
    assert_eq!(first_structured_text, Some("structured text"));
    assert_eq!(block["is_error"].as_bool(), Some(false));
}
/// Converts a hand-built `ThreadDetail` (one completed turn with six items)
/// into messages and checks that the two consecutive tool-result items end up
/// as two `ToolResult` blocks inside a single "user" message.
#[test]
fn messages_from_thread_detail_batches_tool_results() {
let now = Utc::now();
let turn_id = "turn_detail".to_string();
// Thread fixture; only `latest_turn_id` links it to the turn below.
let thread = ThreadRecord {
schema_version: 2,
id: "thr_detail".to_string(),
created_at: now,
updated_at: now,
model: DEFAULT_TEXT_MODEL.to_string(),
workspace: PathBuf::from("."),
mode: "agent".to_string(),
allow_shell: false,
trust_mode: false,
auto_approve: false,
latest_turn_id: Some(turn_id.clone()),
latest_response_bookmark: None,
archived: false,
system_prompt: None,
task_id: None,
title: None,
session_id: None,
};
// Single completed turn listing its items in conversation order.
let turn = TurnRecord {
schema_version: 2,
id: turn_id.clone(),
thread_id: thread.id.clone(),
status: RuntimeTurnStatus::Completed,
input_summary: "check".to_string(),
created_at: now,
started_at: Some(now),
ended_at: Some(now),
duration_ms: Some(0),
usage: None,
error: None,
item_ids: vec![
"item_user".to_string(),
"item_reasoning".to_string(),
"item_tool_use".to_string(),
"item_result_one".to_string(),
"item_result_two".to_string(),
"item_answer".to_string(),
],
steer_count: 0,
};
// Helper closure: builds a completed item for `turn_id` with the given
// id, kind, summary, optional detail text and optional metadata.
let item = |id: &str,
kind: TurnItemKind,
summary: &str,
detail: Option<&str>,
metadata: Option<Value>| {
crate::runtime_threads::TurnItemRecord {
schema_version: 2,
id: id.to_string(),
turn_id: turn_id.clone(),
kind,
status: TurnItemLifecycleStatus::Completed,
summary: summary.to_string(),
detail: detail.map(str::to_string),
metadata,
artifact_refs: Vec::new(),
started_at: Some(now),
ended_at: Some(now),
}
};
let detail = ThreadDetail {
thread,
turns: vec![turn],
items: vec![
item(
"item_user",
TurnItemKind::UserMessage,
"check",
Some("check"),
None,
),
item(
"item_reasoning",
TurnItemKind::AgentReasoning,
"thinking",
Some("thinking"),
None,
),
// Tool invocation: metadata carries `tool_use_id` / `tool_name`.
item(
"item_tool_use",
TurnItemKind::ToolCall,
"shell",
Some(r#"{"cmd":"pwd"}"#),
Some(json!({
"tool_use_id": "tool-1",
"tool_name": "shell"
})),
),
// First tool result: `tool_result_for` metadata plus structured blocks.
item(
"item_result_one",
TurnItemKind::ToolCall,
"one",
Some("one"),
Some(json!({
"tool_result_for": "tool-1",
"is_error": false,
"content_blocks": [{
"type": "text",
"text": "structured one"
}]
})),
),
// Second tool result: flagged as an error, no structured blocks.
item(
"item_result_two",
TurnItemKind::ToolCall,
"two",
Some("two"),
Some(json!({
"tool_result_for": "tool-2",
"is_error": true
})),
),
item(
"item_answer",
TurnItemKind::AgentMessage,
"done",
Some("done"),
None,
),
],
latest_seq: 0,
};
let messages = messages_from_thread_detail(&detail);
let roles = messages
.iter()
.map(|message| message.role.as_str())
.collect::<Vec<_>>();
// Six items collapse into four messages; index 2 is the batched tool results.
assert_eq!(roles, vec!["user", "assistant", "user", "assistant"]);
assert_eq!(messages[2].content.len(), 2);
match &messages[2].content[0] {
ContentBlock::ToolResult {
tool_use_id,
content,
is_error,
content_blocks,
} => {
assert_eq!(tool_use_id, "tool-1");
assert_eq!(content, "one");
// Metadata said `"is_error": false`; the expected block value is `None`.
assert_eq!(*is_error, None);
assert_eq!(
content_blocks
.as_ref()
.and_then(|blocks| blocks[0].get("text")),
Some(&json!("structured one"))
);
}
other => panic!("expected first tool result, got {other:?}"),
}
match &messages[2].content[1] {
ContentBlock::ToolResult {
tool_use_id,
content,
is_error,
content_blocks,
} => {
assert_eq!(tool_use_id, "tool-2");
assert_eq!(content, "two");
assert_eq!(*is_error, Some(true));
assert!(content_blocks.is_none());
}
other => panic!("expected second tool result, got {other:?}"),
}
}
/// With no configured token and insecure mode off, a `cwrt_`-prefixed token is generated.
#[test]
fn runtime_auth_generates_token_by_default() {
    let ResolvedRuntimeAuth { token, generated } = resolve_runtime_auth(None, None, false);
    assert!(generated);

    let token = token.expect("generated token");
    assert!(token.starts_with("cwrt_"));
    assert!(token.len() > 32);
}
/// The status lines for a generated token must not contain the token itself.
#[test]
fn runtime_auth_status_does_not_render_generated_token() {
    let secret = "cwrt_super_secret_test_token";
    let auth = ResolvedRuntimeAuth {
        token: Some(secret.to_string()),
        generated: true,
    };

    let rendered = runtime_auth_status_lines(&auth).join("\n");

    assert!(!rendered.contains(secret));
    assert!(rendered.contains("not printed"));
}
/// Only the explicit insecure flag yields an auth configuration without any token.
#[test]
fn runtime_auth_requires_explicit_insecure_for_no_token() {
    let expected = ResolvedRuntimeAuth {
        token: None,
        generated: false,
    };
    let auth = resolve_runtime_auth(None, None, true);
    assert_eq!(auth, expected);
}
/// When both tokens are supplied, the first (CLI) one wins and is trimmed.
#[test]
fn runtime_auth_prefers_cli_token_over_env_token() {
    let cli_token = Some(String::from(" cli-token "));
    let env_token = Some(String::from("env-token"));

    let auth = resolve_runtime_auth(cli_token, env_token, false);

    let expected = ResolvedRuntimeAuth {
        token: Some(String::from("cli-token")),
        generated: false,
    };
    assert_eq!(auth, expected);
}
/// Whitespace-only tokens count as absent, so a token is generated instead.
#[test]
fn runtime_auth_ignores_blank_configured_tokens() {
    let blank_cli = Some(String::from(" "));
    let blank_env = Some(String::from("\t"));

    let auth = resolve_runtime_auth(blank_cli, blank_env, false);

    assert!(auth.generated);
    assert!(auth.token.is_some());
}
/// Spaces and reserved query characters are percent-encoded; alphanumerics pass through.
#[test]
fn url_query_component_percent_encodes_token() {
    let raw = "abc ABC+/?:=&%";
    let encoded = url_query_component(raw);
    assert_eq!(encoded, "abc%20ABC%2B%2F%3F%3A%3D%26%25");
}
/// The runtime token cookie is percent-decoded; an invalid escape yields `None`.
#[test]
fn token_from_cookie_header_decodes_percent_encoded_token() {
    // Valid escapes, with an unrelated cookie in front.
    let valid_header = "theme=dark; codewhale_runtime_token=abc%20ABC%2B%2F%3F%3A%3D%26%25";
    let decoded = token_from_cookie_header(Some(valid_header));
    assert_eq!(decoded, Some("abc ABC+/?:=&%".to_string()));

    // `%ZZ` is not a valid percent escape.
    let malformed_header = "codewhale_runtime_token=bad%ZZ";
    let rejected = token_from_cookie_header(Some(malformed_header));
    assert_eq!(rejected, None);
}
/// Starts a test server rooted at `root` with no runtime token configured.
///
/// Delegates to [`spawn_test_server_with_root_and_token`]; see
/// [`spawn_test_server_with_root_token_mobile_workspace`] for the meaning of
/// the return value.
async fn spawn_test_server_with_root(
    root: PathBuf,
    sessions_dir: PathBuf,
) -> Result<
    Option<(
        SocketAddr,
        SharedRuntimeThreadManager,
        tokio::task::JoinHandle<()>,
    )>,
> {
    let no_token: Option<String> = None;
    spawn_test_server_with_root_and_token(root, sessions_dir, no_token).await
}
/// Starts a test server rooted at `root` with the given optional runtime
/// token and the mobile flag turned off.
async fn spawn_test_server_with_root_and_token(
    root: PathBuf,
    sessions_dir: PathBuf,
    runtime_token: Option<String>,
) -> Result<
    Option<(
        SocketAddr,
        SharedRuntimeThreadManager,
        tokio::task::JoinHandle<()>,
    )>,
> {
    let mobile_enabled = false;
    spawn_test_server_with_root_token_and_mobile(root, sessions_dir, runtime_token, mobile_enabled)
        .await
}
/// Starts a test server with an explicit token and mobile flag, using the
/// relative path `.` as the workspace.
async fn spawn_test_server_with_root_token_and_mobile(
    root: PathBuf,
    sessions_dir: PathBuf,
    runtime_token: Option<String>,
    mobile_enabled: bool,
) -> Result<
    Option<(
        SocketAddr,
        SharedRuntimeThreadManager,
        tokio::task::JoinHandle<()>,
    )>,
> {
    let workspace = PathBuf::from(".");
    let server = spawn_test_server_with_root_token_mobile_workspace(
        root,
        sessions_dir,
        runtime_token,
        mobile_enabled,
        workspace,
    );
    server.await
}
/// Builds the full runtime API stack for a test and serves it on an
/// ephemeral loopback port.
///
/// All persistent state (tasks, runtime threads, automations, MCP config,
/// skill state) is placed under `root`; `sessions_dir` and `workspace` are
/// created if missing. Tasks are executed by [`MockExecutor`].
///
/// Returns `Ok(Some((addr, runtime_threads, handle)))` on success, where
/// `handle` is the spawned server task. Returns `Ok(None)` when binding the
/// listener fails with `PermissionDenied`, which callers use to return early
/// from the test instead of failing.
async fn spawn_test_server_with_root_token_mobile_workspace(
root: PathBuf,
sessions_dir: PathBuf,
runtime_token: Option<String>,
mobile_enabled: bool,
workspace: PathBuf,
) -> Result<
Option<(
SocketAddr,
SharedRuntimeThreadManager,
tokio::task::JoinHandle<()>,
)>,
> {
// Result ignored on purpose — presumably because installing fails once a
// provider is already set by another test; confirm against rustls docs.
let _ = rustls::crypto::ring::default_provider().install_default();
fs::create_dir_all(&sessions_dir)?;
fs::create_dir_all(&workspace)?;
// Single-worker task manager backed by the mock executor.
let manager = TaskManager::start_with_executor(
TaskManagerConfig {
data_dir: root.join("tasks"),
worker_count: 1,
default_workspace: workspace.clone(),
default_model: DEFAULT_TEXT_MODEL.to_string(),
default_mode: "agent".to_string(),
allow_shell: false,
trust_mode: false,
max_subagents: 2,
},
Arc::new(MockExecutor),
)
.await?;
let runtime_threads: SharedRuntimeThreadManager = Arc::new(RuntimeThreadManager::open(
Config::default(),
workspace.clone(),
RuntimeThreadManagerConfig::from_task_data_dir(root.join("runtime")),
)?);
// The thread manager gets handles to both the task and automation managers.
runtime_threads.attach_task_manager(manager.clone());
let automations = Arc::new(Mutex::new(AutomationManager::open(
root.join("automations"),
)?));
runtime_threads.attach_automation_manager(automations.clone());
// Auth is required exactly when a token was supplied.
let auth_required = runtime_token.is_some();
let state = RuntimeApiState {
config: Config::default(),
workspace,
task_manager: manager,
runtime_threads: runtime_threads.clone(),
cors_origins: Vec::new(),
sessions_dir,
mcp_config_path: root.join("mcp.json"),
automations,
runtime_token,
skill_state: Arc::new(Mutex::new(
SkillStateStore::load_from(root.join("skills_state.toml")).unwrap_or_default(),
)),
auth_required,
bind_host: "127.0.0.1".to_string(),
bind_port: 0,
mobile_enabled,
};
let app = build_router(state);
// Port 0 lets the OS pick a free port; the real address is read back below.
let listener = match TcpListener::bind("127.0.0.1:0").await {
Ok(listener) => listener,
Err(err) if err.kind() == std::io::ErrorKind::PermissionDenied => return Ok(None),
Err(err) => return Err(err.into()),
};
let addr = listener.local_addr()?;
// Serve in the background; any serve error is discarded.
let handle = tokio::spawn(async move {
let _ = axum::serve(listener, app).await;
});
Ok(Some((addr, runtime_threads, handle)))
}
/// Starts a test server in a uniquely named directory under the system temp
/// directory, with sessions stored in its `sessions` subdirectory.
///
/// NOTE(review): the directory is not removed by this helper.
async fn spawn_test_server() -> Result<
    Option<(
        SocketAddr,
        SharedRuntimeThreadManager,
        tokio::task::JoinHandle<()>,
    )>,
> {
    let unique_name = format!("deepseek-runtime-api-{}", Uuid::new_v4());
    let root = std::env::temp_dir().join(unique_name);
    let sessions_dir = root.join("sessions");
    spawn_test_server_with_root(root, sessions_dir).await
}
/// Reads from an SSE response until the first complete frame has arrived and
/// returns that frame's text without the terminating blank line.
///
/// A frame ends at the earliest blank line, whether it is written as `\n\n`
/// or `\r\n\r\n`. Previously `\n\n` was preferred even when a `\r\n\r\n`
/// delimiter appeared earlier in the buffer, which could return more than
/// the first frame.
///
/// # Errors
///
/// Fails when no chunk arrives within 2 seconds, when the stream ends or
/// errors before a delimiter is seen, or when more than 64 KiB accumulate
/// without a delimiter.
async fn read_first_sse_frame(resp: reqwest::Response) -> Result<String> {
    let mut stream = resp.bytes_stream();
    let mut buf = Vec::new();
    loop {
        // The timeout applies to each chunk, not to the frame as a whole.
        let next = tokio::time::timeout(Duration::from_secs(2), stream.next())
            .await
            .context("timed out waiting for SSE frame")?
            .context("SSE stream ended unexpectedly")??;
        buf.extend_from_slice(&next);

        // Re-decode the whole buffer so a multi-byte character split across
        // chunks is decoded correctly once its remaining bytes arrive.
        let text = String::from_utf8_lossy(&buf);
        let frame_end = match (text.find("\n\n"), text.find("\r\n\r\n")) {
            (Some(lf), Some(crlf)) => Some(lf.min(crlf)),
            (lf, crlf) => lf.or(crlf),
        };
        if let Some(idx) = frame_end {
            return Ok(text[..idx].to_string());
        }

        if buf.len() > 64 * 1024 {
            bail!("SSE frame exceeded 64KB without delimiter");
        }
    }
}
/// Splits one SSE frame into its event name and JSON payload.
///
/// The last `event:` line supplies the name; all `data:` lines are joined
/// with `\n` and parsed as JSON. A frame without `data:` lines yields an
/// empty JSON object. Other lines are ignored.
///
/// # Errors
///
/// Fails when the frame has no `event:` line or the joined data is not valid JSON.
fn parse_sse_frame(frame: &str) -> Result<(String, serde_json::Value)> {
    let mut event_name = None;
    let mut data_lines: Vec<String> = Vec::new();

    for line in frame.lines() {
        match (line.strip_prefix("event:"), line.strip_prefix("data:")) {
            (Some(name), _) => event_name = Some(name.trim().to_string()),
            (None, Some(data)) => data_lines.push(data.trim_start().to_string()),
            (None, None) => {}
        }
    }

    let event_name = event_name.context("missing SSE event field")?;
    if data_lines.is_empty() {
        return Ok((event_name, json!({})));
    }

    let joined = data_lines.join("\n");
    let payload = serde_json::from_str(&joined)
        .with_context(|| format!("invalid SSE data payload: {joined}"))?;
    Ok((event_name, payload))
}
/// Polls `GET /v1/threads/{thread_id}` every 25 ms until the turn `turn_id`
/// reports `completed`, `failed`, `interrupted` or `canceled`, and returns
/// that status.
///
/// # Errors
///
/// Fails on any HTTP or decoding error, or when `timeout` elapses before a
/// terminal status is observed.
async fn wait_for_terminal_turn_status(
    client: &reqwest::Client,
    addr: SocketAddr,
    thread_id: &str,
    turn_id: &str,
    timeout: Duration,
) -> Result<String> {
    let deadline = tokio::time::Instant::now() + timeout;
    let url = format!("http://{addr}/v1/threads/{thread_id}");

    loop {
        let response = client.get(&url).send().await?.error_for_status()?;
        let detail: serde_json::Value = response.json().await?;

        // A missing turn or missing status is treated as the empty string.
        let status = detail["turns"]
            .as_array()
            .and_then(|turns| turns.iter().find(|turn| turn["id"] == turn_id))
            .and_then(|turn| turn.get("status"))
            .and_then(Value::as_str)
            .unwrap_or_default();

        let is_terminal = matches!(
            status,
            "completed" | "failed" | "interrupted" | "canceled"
        );
        if is_terminal {
            return Ok(status.to_string());
        }

        // The deadline is checked after each poll, so at least one request is always made.
        if tokio::time::Instant::now() >= deadline {
            bail!("timed out waiting for terminal turn status for {turn_id}");
        }
        sleep(Duration::from_millis(25)).await;
    }
}
/// Polls `GET /v1/threads/{thread_id}` every 25 ms until any item in the
/// thread has status `in_progress`.
///
/// # Errors
///
/// Fails on any HTTP or decoding error, or when `timeout` elapses first.
async fn wait_for_in_progress_item(
    client: &reqwest::Client,
    addr: SocketAddr,
    thread_id: &str,
    timeout: Duration,
) -> Result<()> {
    let deadline = tokio::time::Instant::now() + timeout;
    let url = format!("http://{addr}/v1/threads/{thread_id}");

    loop {
        let response = client.get(&url).send().await?.error_for_status()?;
        let detail: serde_json::Value = response.json().await?;

        let has_in_progress = match detail["items"].as_array() {
            Some(items) => items.iter().any(|item| item["status"] == "in_progress"),
            None => false,
        };
        if has_in_progress {
            return Ok(());
        }

        // The deadline is checked after each poll, so at least one request is always made.
        if tokio::time::Instant::now() >= deadline {
            bail!("timed out waiting for in-progress item in thread {thread_id}");
        }
        sleep(Duration::from_millis(25)).await;
    }
}
/// Smoke test for `/health` and the task endpoints: create, list, fetch and
/// cancel a task.
///
/// Returns early without failing when the test server cannot be started
/// (the spawn helper yields `None`).
#[tokio::test]
async fn health_and_tasks_endpoints_work() -> Result<()> {
    let Some((addr, _runtime_threads, handle)) = spawn_test_server().await? else {
        return Ok(());
    };
    let client = crate::tls::reqwest_client();

    let health: serde_json::Value = client
        .get(format!("http://{addr}/health"))
        .send()
        .await?
        .error_for_status()?
        .json()
        .await?;
    assert_eq!(health["status"], "ok");
    assert_eq!(health["service"], "codewhale-runtime-api");

    let created: serde_json::Value = client
        .post(format!("http://{addr}/v1/tasks"))
        .json(&json!({ "prompt": "hello task" }))
        .send()
        .await?
        .error_for_status()?
        .json()
        .await?;
    // Report a missing id as a test error, matching the other tests in this
    // file, instead of panicking inside a `Result`-returning test.
    let id = created["id"]
        .as_str()
        .context("missing task id")?
        .to_string();

    let listed: serde_json::Value = client
        .get(format!("http://{addr}/v1/tasks"))
        .send()
        .await?
        .error_for_status()?
        .json()
        .await?;
    assert!(
        listed["tasks"]
            .as_array()
            .is_some_and(|tasks| !tasks.is_empty())
    );

    let detail: serde_json::Value = client
        .get(format!("http://{addr}/v1/tasks/{id}"))
        .send()
        .await?
        .error_for_status()?
        .json()
        .await?;
    assert_eq!(detail["id"], id);

    // Only success and a JSON body are checked for the cancel call.
    let _cancelled: serde_json::Value = client
        .post(format!("http://{addr}/v1/tasks/{id}/cancel"))
        .send()
        .await?
        .error_for_status()?
        .json()
        .await?;

    handle.abort();
    Ok(())
}
#[tokio::test]
async fn runtime_token_guard_protects_v1_routes() -> Result<()> {
let root = std::env::temp_dir().join(format!("deepseek-runtime-api-{}", Uuid::new_v4()));
let sessions_dir = root.join("sessions");
let token = "local-test-token".to_string();
let Some((addr, _runtime_threads, handle)) =
spawn_test_server_with_root_and_token(root, sessions_dir, Some(token.clone())).await?
else {
return Ok(());
};
let client = crate::tls::reqwest_client();
let health = client
.get(format!("http://{addr}/health"))
.send()
.await?
.error_for_status()?;
assert_eq!(health.status(), StatusCode::OK);
let unauthorized = client
.get(format!("http://{addr}/v1/threads/summary"))
.send()
.await?;
assert_eq!(unauthorized.status(), StatusCode::UNAUTHORIZED);
let bearer = client
.get(format!("http://{addr}/v1/threads/summary"))
.bearer_auth(&token)
.send()
.await?
.error_for_status()?;
assert_eq!(bearer.status(), StatusCode::OK);
let query_token = client
.get(format!("http://{addr}/v1/threads/summary?token={token}"))
.send()
.await?;
assert_eq!(query_token.status(), StatusCode::UNAUTHORIZED);
let cookie_token = client
.get(format!("http://{addr}/v1/threads/summary"))
.header(
header::COOKIE,
format!("codewhale_runtime_token={}", url_query_component(&token)),
)
.send()
.await?
.error_for_status()?;
assert_eq!(cookie_token.status(), StatusCode::OK);
let codewhale_header = client
.get(format!("http://{addr}/v1/threads/summary"))
.header("x-codewhale-runtime-token", &token)
.send()
.await?
.error_for_status()?;
assert_eq!(codewhale_header.status(), StatusCode::OK);
let deepseek_header = client
.get(format!("http://{addr}/v1/threads/summary"))
.header("x-deepseek-runtime-token", &token)
.send()
.await?
.error_for_status()?;
assert_eq!(deepseek_header.status(), StatusCode::OK);
handle.abort();
Ok(())
}
/// Creates one thread whose workspace is a git repository and one whose
/// workspace is a plain directory, then checks that `/v1/threads/summary`
/// reports branch, head, dirty flag and workspace path for each.
///
/// Returns early without failing when the test server cannot be started.
#[tokio::test]
async fn thread_summary_includes_workspace_branch_metadata() -> Result<()> {
let tmp = tempfile::tempdir()?;
let root = tmp.path().join("runtime");
let sessions_dir = root.join("sessions");
// Git workspace: one commit on branch `feature/agent`.
let repo = tmp.path().join("repo");
fs::create_dir_all(&repo)?;
run_test_git(&repo, &["init", "-b", "feature/agent"])?;
run_test_git(&repo, &["config", "core.autocrlf", "false"])?;
fs::write(repo.join("README.md"), "branch visibility\n")?;
run_test_git(&repo, &["add", "README.md"])?;
// Identity is passed per-command so the commit does not rely on global git config.
run_test_git(
&repo,
&[
"-c",
"user.name=CodeWhale Test",
"-c",
"user.email=codewhale@example.invalid",
"commit",
"-m",
"init",
],
)?;
// Plain workspace: a directory that is not a git repository.
let non_git = tmp.path().join("non-git");
fs::create_dir_all(&non_git)?;
let Some((addr, _runtime_threads, handle)) =
spawn_test_server_with_root(root, sessions_dir).await?
else {
return Ok(());
};
let client = crate::tls::reqwest_client();
let git_thread: serde_json::Value = client
.post(format!("http://{addr}/v1/threads"))
.json(&json!({
"title": "Git workspace",
"workspace": repo,
}))
.send()
.await?
.error_for_status()?
.json()
.await?;
let git_thread_id = git_thread["id"]
.as_str()
.context("missing git thread id")?
.to_string();
// Dirty the worktree only after the thread exists; the summary below is
// still expected to report `dirty: true`.
fs::write(
repo.join("dirty.txt"),
"worktree changed after thread spawn\n",
)?;
let plain_thread: serde_json::Value = client
.post(format!("http://{addr}/v1/threads"))
.json(&json!({
"title": "Plain workspace",
"workspace": non_git,
}))
.send()
.await?
.error_for_status()?
.json()
.await?;
let plain_thread_id = plain_thread["id"]
.as_str()
.context("missing plain thread id")?
.to_string();
let summary: serde_json::Value = client
.get(format!("http://{addr}/v1/threads/summary?limit=100"))
.send()
.await?
.error_for_status()?
.json()
.await?;
let summaries = summary.as_array().context("summary should be an array")?;
let git_summary = summaries
.iter()
.find(|item| item["id"] == git_thread_id)
.context("missing git workspace summary")?;
assert_eq!(git_summary["branch"], "feature/agent");
assert!(
git_summary["head"]
.as_str()
.is_some_and(|head| !head.is_empty())
);
assert_eq!(git_summary["dirty"], true);
assert_eq!(git_summary["workspace"], repo.to_string_lossy().as_ref());
// The non-git workspace has null branch/head and is never dirty.
let plain_summary = summaries
.iter()
.find(|item| item["id"] == plain_thread_id)
.context("missing plain workspace summary")?;
assert_eq!(plain_summary["branch"], serde_json::Value::Null);
assert_eq!(plain_summary["head"], serde_json::Value::Null);
assert_eq!(plain_summary["dirty"], false);
assert_eq!(
plain_summary["workspace"],
non_git.to_string_lossy().as_ref()
);
handle.abort();
Ok(())
}
/// Exercises `/v1/workspace/status` and the automation lifecycle in order:
/// create, list, run, pause, resume, patch, list runs, delete, and finally a
/// 404 on the deleted automation.
///
/// Returns early without failing when the test server cannot be started.
#[tokio::test]
async fn workspace_and_automation_endpoints_work() -> Result<()> {
let Some((addr, _runtime_threads, handle)) = spawn_test_server().await? else {
return Ok(());
};
let client = crate::tls::reqwest_client();
let workspace: serde_json::Value = client
.get(format!("http://{addr}/v1/workspace/status"))
.send()
.await?
.error_for_status()?
.json()
.await?;
// Only the presence of the `workspace` key is checked here.
assert!(workspace.get("workspace").is_some());
let created: serde_json::Value = client
.post(format!("http://{addr}/v1/automations"))
.json(&json!({
"name": "Smoke automation",
"prompt": "automation smoke test",
"rrule": "FREQ=HOURLY;INTERVAL=2",
"status": "active"
}))
.send()
.await?
.error_for_status()?
.json()
.await?;
let automation_id = created["id"]
.as_str()
.context("missing automation id")?
.to_string();
let listed: serde_json::Value = client
.get(format!("http://{addr}/v1/automations"))
.send()
.await?
.error_for_status()?
.json()
.await?;
assert!(
listed
.as_array()
.is_some_and(|items| items.iter().any(|item| item["id"] == automation_id))
);
// Trigger a manual run; the later `/runs` query expects at least one entry.
let run_now: serde_json::Value = client
.post(format!("http://{addr}/v1/automations/{automation_id}/run"))
.send()
.await?
.error_for_status()?
.json()
.await?;
assert_eq!(run_now["automation_id"], automation_id);
let paused: serde_json::Value = client
.post(format!(
"http://{addr}/v1/automations/{automation_id}/pause"
))
.send()
.await?
.error_for_status()?
.json()
.await?;
assert_eq!(paused["status"], "paused");
let resumed: serde_json::Value = client
.post(format!(
"http://{addr}/v1/automations/{automation_id}/resume"
))
.send()
.await?
.error_for_status()?
.json()
.await?;
assert_eq!(resumed["status"], "active");
// Partial update: only name and rrule are sent.
let updated: serde_json::Value = client
.patch(format!("http://{addr}/v1/automations/{automation_id}"))
.json(&json!({
"name": "Smoke automation edited",
"rrule": "FREQ=WEEKLY;BYDAY=MO,WE;BYHOUR=10;BYMINUTE=15"
}))
.send()
.await?
.error_for_status()?
.json()
.await?;
assert_eq!(updated["name"], "Smoke automation edited");
let runs: serde_json::Value = client
.get(format!(
"http://{addr}/v1/automations/{automation_id}/runs?limit=5"
))
.send()
.await?
.error_for_status()?
.json()
.await?;
assert!(
runs.as_array().is_some_and(|items| !items.is_empty()),
"expected at least one run entry"
);
let _deleted: serde_json::Value = client
.delete(format!("http://{addr}/v1/automations/{automation_id}"))
.send()
.await?
.error_for_status()?
.json()
.await?;
// After deletion the automation must no longer be found.
let missing_status = client
.get(format!("http://{addr}/v1/automations/{automation_id}"))
.send()
.await?
.status();
assert_eq!(missing_status, StatusCode::NOT_FOUND);
handle.abort();
Ok(())
}
#[tokio::test]
async fn fleet_status_runtime_api_exposes_state_and_actions() -> Result<()> {
let root = std::env::temp_dir().join(format!("codewhale-fleet-api-{}", Uuid::new_v4()));
let workspace = root.join("workspace");
fs::create_dir_all(&workspace)?;
let manager = FleetManager::open(&workspace)?;
let task = codewhale_protocol::fleet::FleetTaskSpec {
id: "task-a".to_string(),
name: "Task A".to_string(),
description: None,
objective: Some("Inspect fleet status through Runtime API".to_string()),
instructions: "Stay running for inspection.".to_string(),
worker: Some(codewhale_protocol::fleet::FleetTaskWorkerProfile {
role: Some("status-reviewer".to_string()),
tool_profile: Some("read-only".to_string()),
tools: vec!["rg".to_string()],
capabilities: vec!["fleet".to_string()],
}),
workspace: None,
input_files: Vec::new(),
context: Vec::new(),
budget: None,
tags: Vec::new(),
expected_artifacts: vec![FleetArtifactKind::Log],
scorer: None,
retry_policy: None,
alert_policy: None,
timeout_seconds: None,
metadata: std::collections::BTreeMap::new(),
};
let report = manager.create_run(
crate::fleet::task_spec::FleetTaskSpecDocument {
name: Some("api smoke".to_string()),
labels: std::collections::BTreeMap::new(),
security_policy: None,
workers: Vec::new(),
tasks: vec![task],
},
1,
)?;
let worker_id = report.worker_ids[0].clone();
let sessions_dir = root.join("sessions");
let Some((addr, _runtime_threads, handle)) =
spawn_test_server_with_root_token_mobile_workspace(
root.clone(),
sessions_dir,
None,
false,
workspace,
)
.await?
else {
return Ok(());
};
let client = crate::tls::reqwest_client();
let runs: serde_json::Value = client
.get(format!("http://{addr}/v1/fleet/runs"))
.send()
.await?
.error_for_status()?
.json()
.await?;
assert_eq!(runs["status"]["running"], 1);
assert_eq!(runs["runs"][0]["id"], report.run_id.0);
let worker: serde_json::Value = client
.get(format!("http://{addr}/v1/fleet/workers/{worker_id}"))
.send()
.await?
.error_for_status()?
.json()
.await?;
assert_eq!(
worker["objective"],
"Inspect fleet status through Runtime API"
);
assert_eq!(worker["role"], "status-reviewer");
assert_eq!(worker["host"], "local");
assert_eq!(worker["artifacts"][0]["kind"], "log");
let interrupted: serde_json::Value = client
.post(format!(
"http://{addr}/v1/fleet/workers/{worker_id}/interrupt"
))
.send()
.await?
.error_for_status()?
.json()
.await?;
assert_eq!(interrupted["action"], "interrupt");
assert_eq!(interrupted["worker"]["last_error"], "cancelled by operator");
let restarted: serde_json::Value = client
.post(format!(
"http://{addr}/v1/fleet/workers/{worker_id}/restart"
))
.send()
.await?
.error_for_status()?
.json()
.await?;
assert_eq!(restarted["action"], "restart");
assert_eq!(restarted["worker"]["status"], "busy");
let stopped: serde_json::Value = client
.post(format!(
"http://{addr}/v1/fleet/runs/{}/stop",
report.run_id.0
))
.send()
.await?
.error_for_status()?
.json()
.await?;
assert_eq!(stopped["action"], "stop");
assert_eq!(stopped["stopped"], 1);
assert_eq!(stopped["status"]["cancelled"], 1);
handle.abort();
Ok(())
}
#[tokio::test]
async fn agent_runs_runtime_api_exposes_persisted_worker_receipts() -> Result<()> {
use crate::tools::subagent::{
AgentRunArtifactRef, AgentRunFollowUpTarget, AgentRunRecommendedAction,
AgentRunTakeoverTarget, AgentRunUsage, AgentRunVerificationSummary, AgentWorkerEvent,
AgentWorkerRecord, AgentWorkerSpec, AgentWorkerStatus, AgentWorkerToolProfile,
SubAgentType,
};
use crate::worker_profile::{ModelRoute, ToolScope, WorkerRuntimeProfile};
use std::collections::VecDeque;
let root = std::env::temp_dir().join(format!("codewhale-agent-runs-api-{}", Uuid::new_v4()));
let workspace = root.join("workspace");
fs::create_dir_all(workspace.join(".codewhale/state"))?;
let record = AgentWorkerRecord {
spec: AgentWorkerSpec {
worker_id: "agent_receipt".to_string(),
run_id: "run_receipt".to_string(),
parent_run_id: Some("parent_run".to_string()),
session_name: Some("receipt_lane".to_string()),
objective: "Verify run receipt projection".to_string(),
role: Some("verifier".to_string()),
agent_type: SubAgentType::Verifier,
model: "deepseek-v4-flash".to_string(),
workspace: workspace.clone(),
git_branch: Some("codex/v0.8.60".to_string()),
context_mode: "fresh".to_string(),
fork_context: false,
tool_profile: AgentWorkerToolProfile::Explicit(vec!["read_file".to_string()]),
runtime_profile: {
let mut profile = WorkerRuntimeProfile::for_role(SubAgentType::Verifier);
profile.tools = ToolScope::Explicit(vec!["read_file".to_string()]);
profile.model = ModelRoute::Fixed("deepseek-v4-flash".to_string());
profile.max_spawn_depth =
crate::tools::subagent::DEFAULT_MAX_SPAWN_DEPTH.saturating_sub(1);
profile
},
max_steps: 4,
spawn_depth: 1,
max_spawn_depth: crate::tools::subagent::DEFAULT_MAX_SPAWN_DEPTH,
},
actor_kind: "subagent".to_string(),
parent_run_id: Some("parent_run".to_string()),
follow_up: AgentRunFollowUpTarget {
tool: "handle_read".to_string(),
agent_id: "agent_receipt".to_string(),
session_name: Some("receipt_lane".to_string()),
accepted_statuses: vec!["running".to_string(), "interrupted_continuable".to_string()],
latest_delivery: None,
},
takeover: AgentRunTakeoverTarget {
kind: "local_subagent_session".to_string(),
supported: true,
agent_id: "agent_receipt".to_string(),
session_name: Some("receipt_lane".to_string()),
instructions: "Use handle_read on the transcript_handle for agent_receipt.".to_string(),
unsupported_reason: None,
},
artifacts: vec![AgentRunArtifactRef {
kind: "transcript".to_string(),
name: "transcript_handle".to_string(),
target: "agent:agent_receipt".to_string(),
description: "Read with handle_read from a live projection.".to_string(),
}],
usage: AgentRunUsage {
status: "unknown".to_string(),
input_tokens: None,
output_tokens: None,
total_tokens: None,
token_budget: None,
budget_spent_tokens: None,
budget_remaining_tokens: None,
budget_scope: None,
note: "not reported".to_string(),
},
verification: AgentRunVerificationSummary {
status: "self_report_only".to_string(),
summary: "no verified receipt attached".to_string(),
},
recommended_action: AgentRunRecommendedAction {
action: "verify_self_report".to_string(),
tool: Some("handle_read".to_string()),
reason: "Worker agent_receipt completed; verify its self-report.".to_string(),
},
status: AgentWorkerStatus::Completed,
created_at_ms: 1,
updated_at_ms: 2,
started_at_ms: Some(1),
completed_at_ms: Some(2),
latest_message: Some("completed".to_string()),
result_summary: Some("receipt complete".to_string()),
error: None,
steps_taken: 2,
events: VecDeque::from([AgentWorkerEvent {
seq: 1,
worker_id: "agent_receipt".to_string(),
status: AgentWorkerStatus::Completed,
timestamp_ms: 2,
message: Some("completed".to_string()),
step: Some(2),
tool_name: None,
}]),
};
let state_payload = json!({
"schema_version": 1,
"agents": [],
"workers": [record],
});
fs::write(
workspace.join(".codewhale/state/subagents.v1.json"),
serde_json::to_vec_pretty(&state_payload)?,
)?;
let sessions_dir = root.join("sessions");
let Some((addr, _runtime_threads, handle)) =
spawn_test_server_with_root_token_mobile_workspace(
root.clone(),
sessions_dir,
None,
false,
workspace,
)
.await?
else {
return Ok(());
};
let client = crate::tls::reqwest_client();
let runs: serde_json::Value = client
.get(format!("http://{addr}/v1/agent-runs"))
.send()
.await?
.error_for_status()?
.json()
.await?;
assert_eq!(runs["runs"][0]["spec"]["run_id"], "run_receipt");
assert_eq!(runs["runs"][0]["follow_up"]["tool"], "handle_read");
assert_eq!(
runs["runs"][0]["verification"]["status"],
"self_report_only"
);
let run: serde_json::Value = client
.get(format!("http://{addr}/v1/agent-runs/run_receipt"))
.send()
.await?
.error_for_status()?
.json()
.await?;
assert_eq!(run["spec"]["worker_id"], "agent_receipt");
assert_eq!(run["takeover"]["supported"], true);
assert_eq!(run["artifacts"][0]["kind"], "transcript");
let missing = client
.get(format!("http://{addr}/v1/agent-runs/missing"))
.send()
.await?
.status();
assert_eq!(missing, StatusCode::NOT_FOUND);
handle.abort();
Ok(())
}
/// Posting an empty prompt to `/v1/stream` is rejected with 400 Bad Request.
///
/// Returns early without failing when the test server cannot be started.
#[tokio::test]
async fn stream_requires_prompt() -> Result<()> {
    let Some((addr, _runtime_threads, handle)) = spawn_test_server().await? else {
        return Ok(());
    };
    let client = crate::tls::reqwest_client();

    let empty_prompt = json!({ "prompt": "" });
    let url = format!("http://{addr}/v1/stream");
    let resp = client.post(url).json(&empty_prompt).send().await?;

    assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
    handle.abort();
    Ok(())
}
/// Exercises the thread HTTP surface end to end against a mock engine.
///
/// Covers, in order: create, archive / unarchive via `PATCH`, the default
/// listing and the `include_archived` summary listing, rejection of an empty
/// patch and of an unknown thread, detail, resume, fork, a complete mock turn,
/// `409 Conflict` for steer / interrupt once that turn is terminal, manual
/// compaction, and the envelope of the first replayed SSE frame.
///
/// Passes trivially when `spawn_test_server` yields `None`.
#[tokio::test]
async fn thread_endpoints_expose_lifecycle_contract() -> Result<()> {
    let Some((addr, runtime_threads, handle)) = spawn_test_server().await? else {
        return Ok(());
    };
    let client = crate::tls::reqwest_client();

    // Create the thread every later request refers to.
    let created: serde_json::Value = client
        .post(format!("http://{addr}/v1/threads"))
        .json(&json!({}))
        .send()
        .await?
        .error_for_status()?
        .json()
        .await?;
    let thread_id = created["id"]
        .as_str()
        .context("missing thread id")?
        .to_string();

    // Archiving echoes the thread back with `archived: true`.
    let archived: serde_json::Value = client
        .patch(format!("http://{addr}/v1/threads/{thread_id}"))
        .json(&json!({ "archived": true }))
        .send()
        .await?
        .error_for_status()?
        .json()
        .await?;
    assert_eq!(archived["id"], thread_id);
    assert_eq!(archived["archived"], true);

    // The default listing must not contain the archived thread ...
    let listed: serde_json::Value = client
        .get(format!("http://{addr}/v1/threads"))
        .send()
        .await?
        .error_for_status()?
        .json()
        .await?;
    assert!(
        listed
            .as_array()
            .is_some_and(|threads| threads.iter().all(|t| t["id"] != thread_id))
    );

    // ... while the summary listing with `include_archived=true` must.
    let listed_all: serde_json::Value = client
        .get(format!(
            "http://{addr}/v1/threads/summary?include_archived=true&limit=100"
        ))
        .send()
        .await?
        .error_for_status()?
        .json()
        .await?;
    assert!(
        listed_all
            .as_array()
            .is_some_and(|threads| threads.iter().any(|t| t["id"] == thread_id))
    );

    let unarchived: serde_json::Value = client
        .patch(format!("http://{addr}/v1/threads/{thread_id}"))
        .json(&json!({ "archived": false }))
        .send()
        .await?
        .error_for_status()?
        .json()
        .await?;
    assert_eq!(unarchived["archived"], false);

    // An empty patch body is a client error; an unknown thread is a 404.
    let invalid_patch = client
        .patch(format!("http://{addr}/v1/threads/{thread_id}"))
        .json(&json!({}))
        .send()
        .await?;
    assert_eq!(invalid_patch.status(), StatusCode::BAD_REQUEST);
    let missing_patch = client
        .patch(format!("http://{addr}/v1/threads/thr_missing"))
        .json(&json!({ "archived": true }))
        .send()
        .await?;
    assert_eq!(missing_patch.status(), StatusCode::NOT_FOUND);

    let detail: serde_json::Value = client
        .get(format!("http://{addr}/v1/threads/{thread_id}"))
        .send()
        .await?
        .error_for_status()?
        .json()
        .await?;
    assert_eq!(detail["thread"]["id"], thread_id);

    // Resume keeps the id; fork must hand out a new one.
    let resumed: serde_json::Value = client
        .post(format!("http://{addr}/v1/threads/{thread_id}/resume"))
        .send()
        .await?
        .error_for_status()?
        .json()
        .await?;
    assert_eq!(resumed["id"], thread_id);
    let forked: serde_json::Value = client
        .post(format!("http://{addr}/v1/threads/{thread_id}/fork"))
        .send()
        .await?
        .error_for_status()?
        .json()
        .await?;
    let forked_id = forked["id"].as_str().context("missing forked id")?;
    assert_ne!(forked_id, thread_id);

    // Mock engine: a `SendMessage` op gets a full turn (start, one message,
    // completion); a `CompactContext` op gets an immediate `TurnComplete`.
    // Every other op is ignored.
    let harness = crate::core::engine::mock_engine_handle();
    runtime_threads
        .install_test_engine(&thread_id, harness.handle.clone())
        .await?;
    let mut rx_op = harness.rx_op;
    let tx_event = harness.tx_event;
    tokio::spawn(async move {
        while let Some(op) = rx_op.recv().await {
            match op {
                Op::SendMessage { .. } => {
                    let _ = tx_event
                        .send(EngineEvent::TurnStarted {
                            turn_id: "mock_lifecycle".to_string(),
                        })
                        .await;
                    let _ = tx_event
                        .send(EngineEvent::MessageStarted { index: 0 })
                        .await;
                    let _ = tx_event
                        .send(EngineEvent::MessageDelta {
                            index: 0,
                            content: "mock reply".to_string(),
                        })
                        .await;
                    let _ = tx_event
                        .send(EngineEvent::MessageComplete { index: 0 })
                        .await;
                    let _ = tx_event
                        .send(EngineEvent::TurnComplete {
                            usage: Usage {
                                input_tokens: 10,
                                output_tokens: 5,
                                ..Usage::default()
                            },
                            status: TurnOutcomeStatus::Completed,
                            error: None,
                            tool_catalog: None,
                            base_url: None,
                        })
                        .await;
                }
                Op::CompactContext => {
                    let _ = tx_event
                        .send(EngineEvent::TurnComplete {
                            usage: Usage {
                                input_tokens: 0,
                                output_tokens: 0,
                                ..Usage::default()
                            },
                            status: TurnOutcomeStatus::Completed,
                            error: None,
                            tool_catalog: None,
                            base_url: None,
                        })
                        .await;
                }
                _ => {}
            }
        }
    });

    // Run one turn to completion.
    let turn_start: serde_json::Value = client
        .post(format!("http://{addr}/v1/threads/{thread_id}/turns"))
        .json(&json!({ "prompt": "thread endpoint test" }))
        .send()
        .await?
        .error_for_status()?
        .json()
        .await?;
    let turn_id = turn_start["turn"]["id"]
        .as_str()
        .context("missing turn id")?
        .to_string();
    let _ =
        wait_for_terminal_turn_status(&client, addr, &thread_id, &turn_id, Duration::from_secs(2))
            .await?;

    // Steering or interrupting a turn that already finished is a conflict.
    let steer_resp = client
        .post(format!(
            "http://{addr}/v1/threads/{thread_id}/turns/{turn_id}/steer"
        ))
        .json(&json!({ "prompt": "late steer" }))
        .send()
        .await?;
    assert_eq!(steer_resp.status(), StatusCode::CONFLICT);
    let interrupt_resp = client
        .post(format!(
            "http://{addr}/v1/threads/{thread_id}/turns/{turn_id}/interrupt"
        ))
        .send()
        .await?;
    assert_eq!(interrupt_resp.status(), StatusCode::CONFLICT);

    let compact_start: serde_json::Value = client
        .post(format!("http://{addr}/v1/threads/{thread_id}/compact"))
        .json(&json!({ "reason": "test manual compact" }))
        .send()
        .await?
        .error_for_status()?
        .json()
        .await?;
    assert_eq!(compact_start["thread"]["id"], thread_id);

    // Replay the event log from the beginning as SSE.
    let events_resp = client
        .get(format!(
            "http://{addr}/v1/threads/{thread_id}/events?since_seq=0"
        ))
        .send()
        .await?
        .error_for_status()?;
    let content_type = events_resp
        .headers()
        .get(reqwest::header::CONTENT_TYPE)
        .and_then(|v| v.to_str().ok())
        .unwrap_or_default()
        .to_string();
    assert!(content_type.starts_with("text/event-stream"));
    let chunk_text = read_first_sse_frame(events_resp).await?;
    assert!(
        chunk_text.contains("event:"),
        "expected SSE event chunk, got: {chunk_text}"
    );
    let (event_name, payload) = parse_sse_frame(&chunk_text)?;
    // The exact-name check is the only name assertion needed: a looser
    // prefix check after it could never fail.
    assert_eq!(event_name, "thread.started");
    assert_eq!(payload["event"], payload["kind"]);
    // `turn_id` / `item_id` must be present as keys *and* be JSON null:
    // `get` proves the key exists, `is_null` proves its value.
    assert!(payload.get("turn_id").is_some());
    assert!(payload.get("item_id").is_some());
    assert!(payload["turn_id"].is_null());
    assert!(payload["item_id"].is_null());
    assert_eq!(payload["thread_id"], thread_id);
    assert!(
        payload["schema_version"]
            .as_u64()
            .is_some_and(|version| version >= 1)
    );
    assert!(payload.get("seq").and_then(Value::as_u64).is_some());
    assert!(payload["payload"].is_object() || payload["payload"].is_array());

    handle.abort();
    Ok(())
}
/// Checks that the thread events SSE endpoint honours the `since_seq` cursor.
///
/// After a mock turn completes, the first frame replayed from `since_seq=0`
/// must be `thread.started`; replaying again from that frame's `seq` must
/// produce a frame with a strictly greater `seq`.
///
/// Passes trivially when `spawn_test_server` yields `None`.
#[tokio::test]
async fn events_endpoint_respects_since_seq_cursor() -> Result<()> {
    let Some((addr, runtime_threads, handle)) = spawn_test_server().await? else {
        return Ok(());
    };
    let http = crate::tls::reqwest_client();

    let new_thread: serde_json::Value = http
        .post(format!("http://{addr}/v1/threads"))
        .json(&json!({}))
        .send()
        .await?
        .error_for_status()?
        .json()
        .await?;
    let thread_id = new_thread["id"]
        .as_str()
        .context("missing thread id")?
        .to_string();

    // Mock engine: reply to the first `SendMessage` with a scripted turn.
    let mock = crate::core::engine::mock_engine_handle();
    runtime_threads
        .install_test_engine(&thread_id, mock.handle.clone())
        .await?;
    let mut ops = mock.rx_op;
    let engine_events = mock.tx_event;
    tokio::spawn(async move {
        if !matches!(ops.recv().await, Some(Op::SendMessage { .. })) {
            return;
        }
        let script = [
            EngineEvent::TurnStarted {
                turn_id: "mock_cursor".to_string(),
            },
            EngineEvent::MessageStarted { index: 0 },
            EngineEvent::MessageComplete { index: 0 },
            EngineEvent::TurnComplete {
                usage: Usage {
                    input_tokens: 5,
                    output_tokens: 3,
                    ..Usage::default()
                },
                status: TurnOutcomeStatus::Completed,
                error: None,
                tool_catalog: None,
                base_url: None,
            },
        ];
        for scripted in script {
            // Send failures are ignored, one event at a time.
            let _ = engine_events.send(scripted).await;
        }
    });

    let turn_response: serde_json::Value = http
        .post(format!("http://{addr}/v1/threads/{thread_id}/turns"))
        .json(&json!({ "prompt": "cursor replay test" }))
        .send()
        .await?
        .error_for_status()?
        .json()
        .await?;
    let turn_id = turn_response["turn"]["id"]
        .as_str()
        .context("missing turn id")?
        .to_string();
    wait_for_terminal_turn_status(&http, addr, &thread_id, &turn_id, Duration::from_secs(2))
        .await?;

    // First replay: from the very beginning of the log.
    let first_replay = http
        .get(format!(
            "http://{addr}/v1/threads/{thread_id}/events?since_seq=0"
        ))
        .send()
        .await?
        .error_for_status()?;
    let first_frame = read_first_sse_frame(first_replay).await?;
    let (first_name, payload_a) = parse_sse_frame(&first_frame)?;
    assert_eq!(first_name, "thread.started");
    for key in ["turn_id", "item_id"] {
        // The key has to exist ...
        assert!(payload_a.get(key).is_some());
    }
    for key in ["turn_id", "item_id"] {
        // ... and carry a JSON null.
        assert!(payload_a[key].is_null());
    }
    assert!(payload_a.get("schema_version").is_some());
    assert_eq!(payload_a["event"], payload_a["kind"]);
    assert_eq!(payload_a["thread_id"], thread_id);
    let seq_a = payload_a
        .get("seq")
        .and_then(Value::as_u64)
        .context("missing seq in first replay frame")?;

    // Second replay: resume after the frame we just saw.
    let second_replay = http
        .get(format!(
            "http://{addr}/v1/threads/{thread_id}/events?since_seq={seq_a}"
        ))
        .send()
        .await?
        .error_for_status()?;
    let second_frame = read_first_sse_frame(second_replay).await?;
    let (_second_name, payload_b) = parse_sse_frame(&second_frame)?;
    assert!(payload_b.get("schema_version").is_some());
    assert_eq!(payload_b["event"], payload_b["kind"]);
    assert_eq!(payload_b["thread_id"], thread_id);
    let seq_b = payload_b
        .get("seq")
        .and_then(Value::as_u64)
        .context("missing seq in second replay frame")?;
    assert!(
        seq_b > seq_a,
        "expected seq after cursor: {seq_b} <= {seq_a}"
    );

    handle.abort();
    Ok(())
}
/// Steers and then interrupts a turn while it is still running.
///
/// The mock engine starts a turn, waits for a steer message, waits for the
/// cancel token, and only then reports completion. The test expects the steer
/// to be counted, the turn to end as `interrupted`, and the thread's event
/// log to contain `turn.steered`, `turn.interrupt_requested`, and a
/// `turn.completed` whose turn status is `interrupted`.
///
/// Passes trivially when `spawn_test_server` yields `None`.
#[tokio::test]
async fn steer_and_interrupt_endpoints_work_on_active_turn() -> Result<()> {
    let Some((addr, runtime_threads, handle)) = spawn_test_server().await? else {
        return Ok(());
    };
    let http = crate::tls::reqwest_client();

    let new_thread: serde_json::Value = http
        .post(format!("http://{addr}/v1/threads"))
        .json(&json!({}))
        .send()
        .await?
        .error_for_status()?
        .json()
        .await?;
    let thread_id = new_thread["id"]
        .as_str()
        .context("missing thread id")?
        .to_string();

    let mock = crate::core::engine::mock_engine_handle();
    runtime_threads
        .install_test_engine(&thread_id, mock.handle.clone())
        .await?;
    let mut ops = mock.rx_op;
    let mut steers = mock.rx_steer;
    let engine_events = mock.tx_event;
    let cancelled = mock.cancel_token;
    tokio::spawn(async move {
        if !matches!(ops.recv().await, Some(Op::SendMessage { .. })) {
            return;
        }
        // Open the turn and its first message.
        let opening = [
            EngineEvent::TurnStarted {
                turn_id: "engine_turn_api".to_string(),
            },
            EngineEvent::MessageStarted { index: 0 },
        ];
        for scripted in opening {
            let _ = engine_events.send(scripted).await;
        }
        // Echo the steer text back as a message delta, if one arrives.
        if let Some(steer_text) = steers.recv().await {
            let echoed = EngineEvent::MessageDelta {
                index: 0,
                content: format!("steer:{steer_text}"),
            };
            let _ = engine_events.send(echoed).await;
        }
        // Hold the turn open until the interrupt cancels it, then finish
        // a little later.
        cancelled.cancelled().await;
        sleep(Duration::from_millis(60)).await;
        let finished = EngineEvent::TurnComplete {
            usage: Usage {
                input_tokens: 2,
                output_tokens: 1,
                ..Usage::default()
            },
            status: TurnOutcomeStatus::Completed,
            error: None,
            tool_catalog: None,
            base_url: None,
        };
        let _ = engine_events.send(finished).await;
    });

    let turn_response: serde_json::Value = http
        .post(format!("http://{addr}/v1/threads/{thread_id}/turns"))
        .json(&json!({ "prompt": "active controls" }))
        .send()
        .await?
        .error_for_status()?
        .json()
        .await?;
    let turn_id = turn_response["turn"]["id"]
        .as_str()
        .context("missing turn id")?
        .to_string();

    let steered: serde_json::Value = http
        .post(format!(
            "http://{addr}/v1/threads/{thread_id}/turns/{turn_id}/steer"
        ))
        .json(&json!({ "prompt": "please steer" }))
        .send()
        .await?
        .error_for_status()?
        .json()
        .await?;
    assert_eq!(steered["id"], turn_id);
    assert_eq!(steered["steer_count"], 1);

    let interrupted: serde_json::Value = http
        .post(format!(
            "http://{addr}/v1/threads/{thread_id}/turns/{turn_id}/interrupt"
        ))
        .send()
        .await?
        .error_for_status()?
        .json()
        .await?;
    assert_eq!(interrupted["id"], turn_id);

    let final_status =
        wait_for_terminal_turn_status(&http, addr, &thread_id, &turn_id, Duration::from_secs(3))
            .await?;
    assert_eq!(final_status, "interrupted");

    // The whole control sequence must be visible in the event log.
    let log = runtime_threads.events_since(&thread_id, None)?;
    let logged = |name: &str| log.iter().any(|entry| entry.event == name);
    assert!(logged("turn.steered"));
    assert!(logged("turn.interrupt_requested"));
    let completed_as_interrupted = log.iter().any(|entry| {
        entry.event == "turn.completed"
            && entry.payload["turn"]["status"].as_str() == Some("interrupted")
    });
    assert!(completed_as_interrupted);

    handle.abort();
    Ok(())
}
#[tokio::test]
async fn stream_compat_mapping_handles_expected_runtime_events() -> Result<()> {
let agent_delta = RuntimeEventRecord {
schema_version: 1,
seq: 1,
timestamp: chrono::Utc::now(),
thread_id: "thr_test".to_string(),
turn_id: Some("turn_test".to_string()),
item_id: Some("item_test".to_string()),
event: "item.delta".to_string(),
payload: json!({
"kind": "agent_message",
"delta": "hello",
}),
};
let mapped = map_compat_stream_event(&agent_delta).context("missing mapped SSE event")?;
let stream = async_stream::stream! {
yield Ok::<_, Infallible>(mapped);
};
let body =
axum::body::to_bytes(Sse::new(stream).into_response().into_body(), usize::MAX).await?;
let text = String::from_utf8_lossy(&body);
assert!(text.contains("event: message.delta"));
assert!(text.contains("\"content\":\"hello\""));
let tool_start = RuntimeEventRecord {
schema_version: 1,
seq: 2,
timestamp: chrono::Utc::now(),
thread_id: "thr_test".to_string(),
turn_id: Some("turn_test".to_string()),
item_id: Some("item_tool".to_string()),
event: "item.started".to_string(),
payload: json!({
"tool": { "id": "tool_1", "name": "exec_shell", "input": { "cmd": "pwd" } }
}),
};
let mapped = map_compat_stream_event(&tool_start).context("missing tool.started event")?;
let stream = async_stream::stream! {
yield Ok::<_, Infallible>(mapped);
};
let body =
axum::body::to_bytes(Sse::new(stream).into_response().into_body(), usize::MAX).await?;
let text = String::from_utf8_lossy(&body);
assert!(text.contains("event: tool.started"));
let tool_done = RuntimeEventRecord {
schema_version: 1,
seq: 3,
timestamp: chrono::Utc::now(),
thread_id: "thr_test".to_string(),
turn_id: Some("turn_test".to_string()),
item_id: Some("item_tool".to_string()),
event: "item.completed".to_string(),
payload: json!({
"item": {
"id": "item_tool",
"kind": "tool_call",
"summary": "ok",
"detail": "done"
}
}),
};
let mapped = map_compat_stream_event(&tool_done).context("missing tool.completed event")?;
let stream = async_stream::stream! {
yield Ok::<_, Infallible>(mapped);
};
let body =
axum::body::to_bytes(Sse::new(stream).into_response().into_body(), usize::MAX).await?;
let text = String::from_utf8_lossy(&body);
assert!(text.contains("event: tool.completed"));
assert!(text.contains("\"success\":true"));
let unknown = RuntimeEventRecord {
schema_version: 1,
seq: 4,
timestamp: chrono::Utc::now(),
thread_id: "thr_test".to_string(),
turn_id: Some("turn_test".to_string()),
item_id: None,
event: "item.delta".to_string(),
payload: json!({
"kind": "context_compaction",
"delta": "ignored",
}),
};
assert!(map_compat_stream_event(&unknown).is_none());
Ok(())
}
/// Runs one mock turn and checks the resulting event log and SSE endpoint.
///
/// After the turn reaches a terminal status, the thread's recorded events
/// must include `turn.started` and `turn.completed`, and the thread events
/// endpoint must respond with a `text/event-stream` content type.
///
/// NOTE(review): despite the name, this test never calls `/v1/stream`; it only
/// goes through the thread turn and events endpoints.
///
/// Passes trivially when `spawn_test_server` yields `None`.
#[tokio::test]
async fn stream_endpoint_remains_backward_compatible() -> Result<()> {
    let Some((addr, runtime_threads, handle)) = spawn_test_server().await? else {
        return Ok(());
    };
    let http = crate::tls::reqwest_client();

    let new_thread: serde_json::Value = http
        .post(format!("http://{addr}/v1/threads"))
        .json(&json!({}))
        .send()
        .await?
        .error_for_status()?
        .json()
        .await?;
    let thread_id = new_thread["id"]
        .as_str()
        .context("missing thread id")?
        .to_string();

    // Mock engine: answer the first `SendMessage` with a scripted turn.
    let mock = crate::core::engine::mock_engine_handle();
    runtime_threads
        .install_test_engine(&thread_id, mock.handle.clone())
        .await?;
    let mut ops = mock.rx_op;
    let engine_events = mock.tx_event;
    tokio::spawn(async move {
        if !matches!(ops.recv().await, Some(Op::SendMessage { .. })) {
            return;
        }
        let script = [
            EngineEvent::TurnStarted {
                turn_id: "mock_stream".to_string(),
            },
            EngineEvent::MessageStarted { index: 0 },
            EngineEvent::MessageDelta {
                index: 0,
                content: "streamed".to_string(),
            },
            EngineEvent::MessageComplete { index: 0 },
            EngineEvent::TurnComplete {
                usage: Usage {
                    input_tokens: 4,
                    output_tokens: 2,
                    ..Usage::default()
                },
                status: TurnOutcomeStatus::Completed,
                error: None,
                tool_catalog: None,
                base_url: None,
            },
        ];
        for scripted in script {
            let _ = engine_events.send(scripted).await;
        }
    });

    let turn_response: serde_json::Value = http
        .post(format!("http://{addr}/v1/threads/{thread_id}/turns"))
        .json(&json!({ "prompt": "compatibility stream" }))
        .send()
        .await?
        .error_for_status()?
        .json()
        .await?;
    let turn_id = turn_response["turn"]["id"]
        .as_str()
        .context("missing turn id")?
        .to_string();
    wait_for_terminal_turn_status(&http, addr, &thread_id, &turn_id, Duration::from_secs(2))
        .await?;

    let log = runtime_threads.events_since(&thread_id, None)?;
    let saw_start = log.iter().any(|entry| entry.event == "turn.started");
    assert!(saw_start, "expected turn.started event");
    let saw_completion = log.iter().any(|entry| entry.event == "turn.completed");
    assert!(saw_completion, "expected turn.completed event");

    let replay = http
        .get(format!(
            "http://{addr}/v1/threads/{thread_id}/events?since_seq=0"
        ))
        .send()
        .await?
        .error_for_status()?;
    // A missing or non-UTF-8 header degrades to "" and fails the assertion.
    let media_type = replay
        .headers()
        .get(reqwest::header::CONTENT_TYPE)
        .and_then(|raw| raw.to_str().ok())
        .unwrap_or_default()
        .to_string();
    assert!(media_type.starts_with("text/event-stream"));

    handle.abort();
    Ok(())
}
/// `GET /v1/sessions/{id}` answers `404 Not Found` for an id with no session.
///
/// Passes trivially when `spawn_test_server` yields `None`.
#[tokio::test]
async fn session_get_returns_404_for_missing_id() -> Result<()> {
    let Some((addr, _runtime_threads, handle)) = spawn_test_server().await? else {
        return Ok(());
    };

    let status = crate::tls::reqwest_client()
        .get(format!("http://{addr}/v1/sessions/nonexistent_id"))
        .send()
        .await?
        .status();
    assert_eq!(status, StatusCode::NOT_FOUND);

    handle.abort();
    Ok(())
}
/// Session endpoints answer `400 Bad Request` for the id `invalid%20id`.
///
/// The fetch, resume-thread and delete endpoints are probed in that order.
///
/// Passes trivially when `spawn_test_server` yields `None`.
#[tokio::test]
async fn session_endpoints_reject_invalid_id() -> Result<()> {
    let Some((addr, _runtime_threads, handle)) = spawn_test_server().await? else {
        return Ok(());
    };
    let http = crate::tls::reqwest_client();

    // Fetch.
    let fetch_status = http
        .get(format!("http://{addr}/v1/sessions/invalid%20id"))
        .send()
        .await?
        .status();
    assert_eq!(fetch_status, StatusCode::BAD_REQUEST);

    // Resume into a thread.
    let resume_status = http
        .post(format!(
            "http://{addr}/v1/sessions/invalid%20id/resume-thread"
        ))
        .json(&json!({}))
        .send()
        .await?
        .status();
    assert_eq!(resume_status, StatusCode::BAD_REQUEST);

    // Delete.
    let delete_status = http
        .delete(format!("http://{addr}/v1/sessions/invalid%20id"))
        .send()
        .await?
        .status();
    assert_eq!(delete_status, StatusCode::BAD_REQUEST);

    handle.abort();
    Ok(())
}
/// Resuming a session id that was never saved answers `404 Not Found`.
///
/// Passes trivially when `spawn_test_server` yields `None`.
#[tokio::test]
async fn session_resume_thread_returns_404_for_missing_session() -> Result<()> {
    let Some((addr, _runtime_threads, handle)) = spawn_test_server().await? else {
        return Ok(());
    };

    let status = crate::tls::reqwest_client()
        .post(format!(
            "http://{addr}/v1/sessions/nonexistent_session/resume-thread"
        ))
        .json(&json!({}))
        .send()
        .await?
        .status();
    assert_eq!(status, StatusCode::NOT_FOUND);

    handle.abort();
    Ok(())
}
/// Resuming a saved session over HTTP must create a thread seeded from it.
///
/// A two-message session file is written into a fresh sessions directory,
/// the server is started on that directory, and
/// `POST /v1/sessions/{id}/resume-thread` is expected to answer `201 Created`
/// with the session id, message count and a new thread id. The new thread's
/// detail must then show one turn and two items.
///
/// The scratch directory is removed again on the normal exit paths. It is
/// left in place when an assertion or `?` fails.
///
/// Passes trivially when `spawn_test_server_with_root` yields `None`.
#[tokio::test]
async fn session_resume_thread_creates_thread_from_saved_session() -> Result<()> {
    let root = std::env::temp_dir().join(format!("deepseek-session-resume-{}", Uuid::new_v4()));
    let sessions_dir = root.join("sessions");
    fs::create_dir_all(&sessions_dir)?;

    // Hand-written session file: one user message and one assistant reply.
    let session = json!({
        "schema_version": 1,
        "metadata": {
            "id": "sess_test_resume",
            "title": "Test resume session",
            "created_at": "2025-01-01T00:00:00Z",
            "updated_at": "2025-01-01T00:10:00Z",
            "message_count": 2,
            "total_tokens": 100,
            "model": "deepseek-v4-pro",
            "workspace": "/tmp/test",
            "mode": "agent"
        },
        "messages": [
            {
                "role": "user",
                "content": [{ "type": "text", "text": "Hello, world!" }]
            },
            {
                "role": "assistant",
                "content": [{ "type": "text", "text": "Hello! How can I help you?" }]
            }
        ],
        "system_prompt": null
    });
    fs::write(
        sessions_dir.join("sess_test_resume.json"),
        serde_json::to_string_pretty(&session)?,
    )?;

    let Some((addr, _runtime_threads, handle)) =
        spawn_test_server_with_root(root.clone(), sessions_dir.clone()).await?
    else {
        // Best-effort: do not leave the scratch directory behind when the
        // test is skipped.
        let _ = fs::remove_dir_all(&root);
        return Ok(());
    };
    let client = crate::tls::reqwest_client();

    let resp = client
        .post(format!(
            "http://{addr}/v1/sessions/sess_test_resume/resume-thread"
        ))
        .json(&json!({ "model": "deepseek-v4-pro" }))
        .send()
        .await?;
    assert_eq!(resp.status(), StatusCode::CREATED);
    let resumed: serde_json::Value = resp.json().await?;
    assert_eq!(resumed["session_id"], "sess_test_resume");
    assert_eq!(resumed["message_count"], 2);
    let thread_id = resumed["thread_id"]
        .as_str()
        .context("missing resumed thread id")?;

    let detail: serde_json::Value = client
        .get(format!("http://{addr}/v1/threads/{thread_id}"))
        .send()
        .await?
        .error_for_status()?
        .json()
        .await?;
    assert_eq!(detail["thread"]["id"], thread_id);
    // A missing array counts as length 0, which fails both comparisons.
    assert_eq!(detail["turns"].as_array().map_or(0, Vec::len), 1);
    assert_eq!(detail["items"].as_array().map_or(0, Vec::len), 2);

    handle.abort();
    // Best-effort cleanup of the per-test scratch directory; a failure here
    // must not fail a test whose assertions all passed.
    let _ = fs::remove_dir_all(&root);
    Ok(())
}
#[tokio::test]
async fn session_create_from_completed_thread_saves_messages() -> Result<()> {
let root = std::env::temp_dir().join(format!("deepseek-thread-session-{}", Uuid::new_v4()));
let sessions_dir = root.join("sessions");
let Some((addr, runtime_threads, handle)) =
spawn_test_server_with_root(root.clone(), sessions_dir).await?
else {
return Ok(());
};
let client = crate::tls::reqwest_client();
let created: serde_json::Value = client
.post(format!("http://{addr}/v1/threads"))
.json(&json!({
"model": "deepseek-v4-pro",
"mode": "plan",
"workspace": root.join("workspace")
}))
.send()
.await?
.error_for_status()?
.json()
.await?;
let thread_id = created["id"]
.as_str()
.context("missing thread id")?
.to_string();
let patched: serde_json::Value = client
.patch(format!("http://{addr}/v1/threads/{thread_id}"))
.json(&json!({ "title": "Thread title fallback" }))
.send()
.await?
.error_for_status()?
.json()
.await?;
assert_eq!(patched["title"], "Thread title fallback");
runtime_threads
.seed_thread_from_messages(
&thread_id,
&[
Message {
role: "user".to_string(),
content: vec![ContentBlock::Text {
text: "Please save this runtime thread".to_string(),
cache_control: None,
}],
},
Message {
role: "assistant".to_string(),
content: vec![ContentBlock::Text {
text: "Saved replies should round-trip.".to_string(),
cache_control: None,
}],
},
],
)
.await?;
let resp = client
.post(format!("http://{addr}/v1/sessions"))
.json(&json!({ "thread_id": thread_id }))
.send()
.await?;
assert_eq!(resp.status(), StatusCode::CREATED);
let saved: serde_json::Value = resp.json().await?;
assert_eq!(saved["thread_id"], thread_id);
assert_eq!(saved["message_count"], 2);
assert_eq!(saved["title"], "Thread title fallback");
let saved_session_handle = saved["session_id"]
.as_str()
.context("missing session id")?
.to_string();
let session_manager = crate::session_manager::SessionManager::new(root.join("sessions"))?;
let created_session = session_manager.load_session_by_prefix(&saved_session_handle)?;
assert_eq!(created_session.metadata.title, "Thread title fallback");
assert_eq!(created_session.metadata.model, "deepseek-v4-pro");
assert_eq!(created_session.metadata.mode.as_deref(), Some("plan"));
assert_eq!(created_session.metadata.message_count, 2);
assert_eq!(created_session.messages[0].role, "user");
assert_eq!(created_session.messages[1].role, "assistant");
let mut endpoint_session = crate::session_manager::create_saved_session_with_id_and_mode(
"sess_endpoint_fetch".to_string(),
&created_session.messages,
"deepseek-v4-pro",
&root,
0,
None,
Some("plan"),
);
endpoint_session.metadata.title = "Thread title fallback".to_string();
session_manager.save_session(&endpoint_session)?;
let detail: serde_json::Value = client
.get(format!("http://{addr}/v1/sessions/sess_endpoint_fetch"))
.send()
.await?
.error_for_status()?
.json()
.await?;
assert_eq!(detail["metadata"]["title"], "Thread title fallback");
assert_eq!(detail["metadata"]["model"], "deepseek-v4-pro");
assert_eq!(detail["metadata"]["mode"], "plan");
assert_eq!(detail["metadata"]["message_count"], 2);
assert_eq!(detail["messages"][0]["role"], "user");
assert_eq!(
detail["messages"][0]["content"][0]["text"],
"Please save this runtime thread"
);
assert_eq!(detail["messages"][1]["role"], "assistant");
let manual_title: serde_json::Value = client
.post(format!("http://{addr}/v1/sessions"))
.json(&json!({
"thread_id": thread_id,
"title": "Manual saved title"
}))
.send()
.await?
.error_for_status()?
.json()
.await?;
assert_eq!(manual_title["title"], "Manual saved title");
assert_ne!(manual_title["session_id"], saved_session_handle);
handle.abort();
Ok(())
}
/// Saving a session for an unknown thread id answers `404 Not Found`.
///
/// Passes trivially when `spawn_test_server` yields `None`.
#[tokio::test]
async fn session_create_from_thread_returns_404_for_missing_thread() -> Result<()> {
    let Some((addr, _runtime_threads, handle)) = spawn_test_server().await? else {
        return Ok(());
    };

    let request_body = json!({ "thread_id": "thr_missing" });
    let status = crate::tls::reqwest_client()
        .post(format!("http://{addr}/v1/sessions"))
        .json(&request_body)
        .send()
        .await?
        .status();
    assert_eq!(status, StatusCode::NOT_FOUND);

    handle.abort();
    Ok(())
}
/// Creates a thread over HTTP and seeds it with one user / assistant exchange.
///
/// The thread is created with `POST /v1/threads` using model
/// `deepseek-v4-pro`, mode `agent`, and a `workspace` path under `root`. It
/// is then seeded through `runtime_threads` with `user_text` as the user
/// message, followed by a fixed assistant reply.
///
/// Returns the id of the new thread.
///
/// # Errors
///
/// Fails when the HTTP request fails or returns an error status, when the
/// response has no string `id`, or when seeding the thread fails.
async fn create_seeded_thread(
    addr: &SocketAddr,
    runtime_threads: &SharedRuntimeThreadManager,
    root: &FsPath,
    user_text: &str,
) -> Result<String> {
    let request_body = json!({
        "model": "deepseek-v4-pro",
        "mode": "agent",
        "workspace": root.join("workspace")
    });
    let response: serde_json::Value = crate::tls::reqwest_client()
        .post(format!("http://{addr}/v1/threads"))
        .json(&request_body)
        .send()
        .await?
        .error_for_status()?
        .json()
        .await?;
    let thread_id = response["id"]
        .as_str()
        .context("missing thread id")?
        .to_string();

    // Single-text-block message builder shared by both seeded messages.
    let text_message = |role: &str, text: &str| Message {
        role: role.to_string(),
        content: vec![ContentBlock::Text {
            text: text.to_string(),
            cache_control: None,
        }],
    };
    let seed = [
        text_message("user", user_text),
        text_message("assistant", "Done — anything else?"),
    ];
    runtime_threads
        .seed_thread_from_messages(&thread_id, &seed)
        .await?;

    Ok(thread_id)
}
#[tokio::test]
async fn undo_endpoint_forks_thread_and_returns_original_user_text() -> Result<()> {
let root = std::env::temp_dir().join(format!("deepseek-undo-endpoint-{}", Uuid::new_v4()));
let sessions_dir = root.join("sessions");
let Some((addr, runtime_threads, handle)) =
spawn_test_server_with_root(root.clone(), sessions_dir).await?
else {
return Ok(());
};
let thread_id =
create_seeded_thread(&addr, &runtime_threads, &root, "Please undo this turn").await?;
let client = crate::tls::reqwest_client();
let resp = client
.post(format!("http://{addr}/v1/threads/{thread_id}/undo"))
.json(&json!({}))
.send()
.await?;
assert_eq!(resp.status(), StatusCode::CREATED);
let undone: serde_json::Value = resp.json().await?;
assert_eq!(undone["original_user_text"], "Please undo this turn");
let forked_id = undone["thread"]["id"]
.as_str()
.context("missing forked thread id")?;
assert_ne!(forked_id, thread_id, "undo must fork, not mutate in place");
let detail: serde_json::Value = client
.get(format!("http://{addr}/v1/threads/{forked_id}"))
.send()
.await?
.error_for_status()?
.json()
.await?;
assert_eq!(detail["turns"].as_array().map_or(usize::MAX, Vec::len), 0);
handle.abort();
Ok(())
}
/// Undoing on an unknown thread id answers `404 Not Found`.
///
/// Passes trivially when `spawn_test_server` yields `None`.
#[tokio::test]
async fn undo_endpoint_404s_for_missing_thread() -> Result<()> {
    let Some((addr, _runtime_threads, handle)) = spawn_test_server().await? else {
        return Ok(());
    };

    let status = crate::tls::reqwest_client()
        .post(format!("http://{addr}/v1/threads/thr_missing/undo"))
        .json(&json!({}))
        .send()
        .await?
        .status();
    assert_eq!(status, StatusCode::NOT_FOUND);

    handle.abort();
    Ok(())
}
#[tokio::test]
async fn patch_undo_endpoint_forks_and_reports_file_rollback_state() -> Result<()> {
let root =
std::env::temp_dir().join(format!("deepseek-patch-undo-endpoint-{}", Uuid::new_v4()));
let sessions_dir = root.join("sessions");
let Some((addr, runtime_threads, handle)) =
spawn_test_server_with_root(root.clone(), sessions_dir).await?
else {
return Ok(());
};
let thread_id =
create_seeded_thread(&addr, &runtime_threads, &root, "Roll back the patch").await?;
let client = crate::tls::reqwest_client();
let resp = client
.post(format!("http://{addr}/v1/threads/{thread_id}/patch-undo"))
.json(&json!({}))
.send()
.await?;
assert_eq!(resp.status(), StatusCode::CREATED);
let undone: serde_json::Value = resp.json().await?;
assert_eq!(undone["patch_result"]["files_restored"], false);
assert!(undone["patch_result"]["summary"].is_string());
assert_eq!(undone["original_user_text"], "Roll back the patch");
assert_ne!(undone["thread"]["id"].as_str(), Some(thread_id.as_str()));
handle.abort();
Ok(())
}
#[tokio::test]
async fn retry_endpoint_reuses_dropped_user_text_to_start_a_turn() -> Result<()> {
let root = std::env::temp_dir().join(format!("deepseek-retry-endpoint-{}", Uuid::new_v4()));
let sessions_dir = root.join("sessions");
let Some((addr, runtime_threads, handle)) =
spawn_test_server_with_root(root.clone(), sessions_dir).await?
else {
return Ok(());
};
let thread_id =
create_seeded_thread(&addr, &runtime_threads, &root, "Retry this request").await?;
let client = crate::tls::reqwest_client();
let resp = client
.post(format!("http://{addr}/v1/threads/{thread_id}/retry"))
.json(&json!({}))
.send()
.await?;
assert_eq!(resp.status(), StatusCode::CREATED);
let retried: serde_json::Value = resp.json().await?;
let forked_id = retried["thread"]["id"]
.as_str()
.context("missing forked thread id")?;
assert_ne!(forked_id, thread_id);
assert_eq!(retried["turn"]["thread_id"], forked_id);
handle.abort();
Ok(())
}
/// `restore_snapshot_for_workspace` must bring a file back to its
/// snapshotted content.
///
/// Runs with `HOME` pointed at a temporary directory while holding the
/// shared test-environment lock.
#[test]
fn restore_snapshot_endpoint_helper_restores_workspace_files() -> Result<()> {
    let _env_lock = lock_test_env();
    let sandbox = tempfile::tempdir()?;

    let home_dir = sandbox.path().join("home");
    fs::create_dir_all(&home_dir)?;
    let _home_guard = EnvVarGuard::set("HOME", &home_dir);

    let workspace = sandbox.path().join("workspace");
    fs::create_dir_all(&workspace)?;
    let snapshots = crate::snapshot::SnapshotRepo::open_or_init(&workspace)?;
    let tracked_file = workspace.join("a.txt");

    // Snapshot "v1", then overwrite the file with "v2".
    fs::write(&tracked_file, "v1")?;
    let snapshot_id = snapshots.snapshot("pre-turn:1")?;
    fs::write(&tracked_file, "v2")?;

    restore_snapshot_for_workspace(&workspace, snapshot_id.as_str())
        .expect("snapshot restore should succeed");

    let restored = fs::read_to_string(&tracked_file)?;
    assert_eq!(restored, "v1");
    Ok(())
}
/// Saving a session while the thread has a running turn must be refused.
///
/// The mock engine opens a turn, signals the test, and then waits for a
/// release signal before completing. While it waits, `POST /v1/sessions`
/// must answer `409 Conflict` with an error message mentioning
/// "queued or active turn". Once released, the turn must end as `completed`.
///
/// Passes trivially when `spawn_test_server` yields `None`.
#[tokio::test]
async fn session_create_from_thread_rejects_active_turn() -> Result<()> {
    let Some((addr, runtime_threads, handle)) = spawn_test_server().await? else {
        return Ok(());
    };
    let http = crate::tls::reqwest_client();

    let new_thread: serde_json::Value = http
        .post(format!("http://{addr}/v1/threads"))
        .json(&json!({}))
        .send()
        .await?
        .error_for_status()?
        .json()
        .await?;
    let thread_id = new_thread["id"]
        .as_str()
        .context("missing thread id")?
        .to_string();

    let mock = crate::core::engine::mock_engine_handle();
    runtime_threads
        .install_test_engine(&thread_id, mock.handle.clone())
        .await?;
    let mut ops = mock.rx_op;
    let engine_events = mock.tx_event;
    // `active_*`: engine -> test, "the turn is now running".
    // `finish_*`: test -> engine, "you may complete the turn".
    let (active_tx, active_rx) = oneshot::channel();
    let (finish_tx, finish_rx) = oneshot::channel();
    tokio::spawn(async move {
        if !matches!(ops.recv().await, Some(Op::SendMessage { .. })) {
            return;
        }
        let opening = [
            EngineEvent::TurnStarted {
                turn_id: "mock_active_session_save".to_string(),
            },
            EngineEvent::MessageStarted { index: 0 },
        ];
        for scripted in opening {
            let _ = engine_events.send(scripted).await;
        }

        // Park here, with the turn open, until the test releases us.
        let _ = active_tx.send(());
        let _ = finish_rx.await;

        let closing = [
            EngineEvent::MessageDelta {
                index: 0,
                content: "now complete".to_string(),
            },
            EngineEvent::MessageComplete { index: 0 },
            EngineEvent::TurnComplete {
                usage: Usage {
                    input_tokens: 2,
                    output_tokens: 1,
                    ..Usage::default()
                },
                status: TurnOutcomeStatus::Completed,
                error: None,
                tool_catalog: None,
                base_url: None,
            },
        ];
        for scripted in closing {
            let _ = engine_events.send(scripted).await;
        }
    });

    let turn_response: serde_json::Value = http
        .post(format!("http://{addr}/v1/threads/{thread_id}/turns"))
        .json(&json!({ "prompt": "save me while active" }))
        .send()
        .await?
        .error_for_status()?
        .json()
        .await?;
    let turn_id = turn_response["turn"]["id"]
        .as_str()
        .context("missing turn id")?
        .to_string();

    // Wait until the engine reports the turn as running and the server
    // shows an in-progress item.
    tokio::time::timeout(Duration::from_secs(2), active_rx)
        .await
        .context("timed out waiting for mock active turn")?
        .context("mock active turn sender dropped")?;
    wait_for_in_progress_item(&http, addr, &thread_id, Duration::from_secs(2)).await?;

    let save_attempt = http
        .post(format!("http://{addr}/v1/sessions"))
        .json(&json!({ "thread_id": thread_id }))
        .send()
        .await?;
    assert_eq!(save_attempt.status(), StatusCode::CONFLICT);
    let rejection: serde_json::Value = save_attempt.json().await?;
    let explains_conflict = rejection["error"]["message"]
        .as_str()
        .is_some_and(|message| message.contains("queued or active turn"));
    assert!(explains_conflict);

    // Release the engine and let the turn finish normally.
    let _ = finish_tx.send(());
    let final_status =
        wait_for_terminal_turn_status(&http, addr, &thread_id, &turn_id, Duration::from_secs(2))
            .await?;
    assert_eq!(final_status, "completed");

    handle.abort();
    Ok(())
}
/// Checks that listing workspace snapshots honors `limit` and rejects a limit of 101.
///
/// Two snapshots are recorded; with `limit: Some(1)` only the `post-turn:1`
/// entry is expected back, and `limit: Some(101)` must yield a 400 error.
#[test]
fn snapshots_endpoint_lists_workspace_snapshots() -> Result<()> {
    // Hold the shared test-env lock for as long as HOME is overridden.
    let _env_lock = lock_test_env();
    let sandbox = tempfile::tempdir()?;

    let home_dir = sandbox.path().join("home");
    fs::create_dir_all(&home_dir)?;
    let _home_guard = EnvVarGuard::set("HOME", &home_dir);

    let workspace = sandbox.path().join("workspace");
    fs::create_dir_all(&workspace)?;
    let tracked_file = workspace.join("a.txt");

    // Record two snapshots with different file contents.
    let repo = crate::snapshot::SnapshotRepo::open_or_init(&workspace)?;
    fs::write(&tracked_file, "v1")?;
    repo.snapshot("pre-turn:1")?;
    fs::write(&tracked_file, "v2")?;
    repo.snapshot("post-turn:1")?;

    let listed = snapshot_entries_for_workspace(&workspace, SnapshotsQuery { limit: Some(1) })
        .expect("snapshot listing should succeed");
    assert_eq!(listed.len(), 1);
    let newest = &listed[0];
    assert_eq!(newest.label, "post-turn:1");
    assert!(newest.id.len() >= 8);
    assert!(newest.timestamp > 0);

    let rejected =
        snapshot_entries_for_workspace(&workspace, SnapshotsQuery { limit: Some(101) })
            .expect_err("limit above cap should fail");
    assert_eq!(rejected.status, StatusCode::BAD_REQUEST);
    Ok(())
}
/// Deleting a session id that does not exist must answer `404 Not Found`.
#[tokio::test]
async fn session_delete_returns_404_for_missing_id() -> Result<()> {
    // `spawn_test_server` produced no server: nothing to exercise.
    let (addr, _runtime_threads, handle) = match spawn_test_server().await? {
        Some(server) => server,
        None => return Ok(()),
    };

    let client = crate::tls::reqwest_client();
    let url = format!("http://{addr}/v1/sessions/nonexistent-id");
    let response = client.delete(url).send().await?;
    assert_eq!(response.status(), StatusCode::NOT_FOUND);

    handle.abort();
    Ok(())
}
/// Sends CORS preflight requests through a router wrapped in `cors_layer`.
///
/// Only `http://localhost:5173` is passed as an extra origin, yet both it and
/// `http://localhost:1420` must be echoed in `access-control-allow-origin`;
/// an unlisted origin must not be echoed at all.
#[tokio::test]
async fn cors_layer_appends_extra_origins_and_keeps_defaults() -> Result<()> {
    let extra_origins = vec!["http://localhost:5173".to_string()];
    let layer = cors_layer(&extra_origins);
    let app: Router = Router::new()
        .route("/probe", get(|| async { "ok" }))
        .layer(layer);

    // A host that forbids binding a local socket skips the test instead of failing it.
    let listener = match TcpListener::bind("127.0.0.1:0").await {
        Ok(bound) => bound,
        Err(err) => {
            if err.kind() == std::io::ErrorKind::PermissionDenied {
                return Ok(());
            }
            return Err(err.into());
        }
    };
    let addr = listener.local_addr()?;
    let handle = tokio::spawn(async move {
        let _ = axum::serve(listener, app).await;
    });

    let client = crate::tls::reqwest_client();
    let probe_url = format!("http://{addr}/probe");

    // Allowed origins: the extra one first, then the one expected without being listed.
    for allowed in ["http://localhost:5173", "http://localhost:1420"] {
        let resp = client
            .request(reqwest::Method::OPTIONS, probe_url.as_str())
            .header("Origin", allowed)
            .header("Access-Control-Request-Method", "GET")
            .send()
            .await?;
        let echoed = resp
            .headers()
            .get("access-control-allow-origin")
            .and_then(|value| value.to_str().ok());
        assert_eq!(echoed, Some(allowed));
    }

    let resp = client
        .request(reqwest::Method::OPTIONS, probe_url.as_str())
        .header("Origin", "http://malicious.example")
        .header("Access-Control-Request-Method", "GET")
        .send()
        .await?;
    let echoed = resp.headers().get("access-control-allow-origin");
    assert!(
        echoed.is_none(),
        "non-allowed origin must not be echoed back"
    );

    handle.abort();
    Ok(())
}
/// Building the CORS layer from a list containing a NUL-terminated origin and
/// a whitespace-only origin must not panic.
#[test]
fn cors_layer_skips_invalid_origins() {
    let extras: Vec<String> = ["http://valid.example", "http://invalid.example\0", " "]
        .iter()
        .copied()
        .map(String::from)
        .collect();
    // Only construction is exercised; the resulting layer is discarded.
    let _ = cors_layer(&extras);
}
/// Exercises `PATCH /v1/threads/{id}` with the extended set of mutable fields.
///
/// Against one freshly created thread the test checks that:
/// 1. every patched field is echoed back with its new value,
/// 2. patching `title` to `""` yields a null or absent `title`,
/// 3. an empty patch body is rejected with `400 Bad Request`,
/// 4. a `model` consisting of a single space is rejected with `400 Bad Request`.
#[tokio::test]
async fn patch_thread_accepts_extended_field_set() -> Result<()> {
    let Some((addr, _runtime_threads, handle)) = spawn_test_server().await? else {
        return Ok(());
    };
    let client = crate::tls::reqwest_client();

    let created: serde_json::Value = client
        .post(format!("http://{addr}/v1/threads"))
        .json(&json!({
            "model": "deepseek-v4-flash",
            "mode": "agent"
        }))
        .send()
        .await?
        .error_for_status()?
        .json()
        .await?;
    let thread_id = created["id"]
        .as_str()
        .context("missing thread id")?
        .to_string();
    let thread_url = format!("http://{addr}/v1/threads/{thread_id}");

    // 1. Full field set round-trips.
    let patched: serde_json::Value = client
        .patch(thread_url.as_str())
        .json(&json!({
            "allow_shell": true,
            "trust_mode": true,
            "auto_approve": true,
            "model": "deepseek-v4-pro",
            "mode": "yolo",
            "title": "Whalescale UI test thread",
            "system_prompt": "You are a useful assistant."
        }))
        .send()
        .await?
        .error_for_status()?
        .json()
        .await?;
    assert_eq!(patched["allow_shell"], true);
    assert_eq!(patched["trust_mode"], true);
    assert_eq!(patched["auto_approve"], true);
    assert_eq!(patched["model"], "deepseek-v4-pro");
    assert_eq!(patched["mode"], "yolo");
    assert_eq!(patched["title"], "Whalescale UI test thread");
    assert_eq!(patched["system_prompt"], "You are a useful assistant.");

    // 2. Empty title clears the field.
    let cleared: serde_json::Value = client
        .patch(thread_url.as_str())
        .json(&json!({ "title": "" }))
        .send()
        .await?
        .error_for_status()?
        .json()
        .await?;
    // Indexing a `Value` yields `Null` both for an explicit null and for a
    // missing key, so this single check covers "null" and "absent" alike.
    assert!(
        cleared["title"].is_null(),
        "empty title must serialize as None: {cleared:?}"
    );

    // 3. A patch with no fields is a client error.
    let empty = client
        .patch(thread_url.as_str())
        .json(&json!({}))
        .send()
        .await?;
    assert_eq!(empty.status(), StatusCode::BAD_REQUEST);

    // 4. A blank model name is a client error.
    let bad_model = client
        .patch(thread_url.as_str())
        .json(&json!({ "model": " " }))
        .send()
        .await?;
    assert_eq!(bad_model.status(), StatusCode::BAD_REQUEST);

    handle.abort();
    Ok(())
}
/// Verifies the `archived_only` filter on the thread listing endpoints.
///
/// Two threads are created and one is archived. The default listing must
/// contain the active thread and not the archived one; `archived_only=true`
/// (alone, combined with `include_archived=true`, and on
/// `/v1/threads/summary`) must return exactly the archived thread.
///
/// Malformed responses surface as contextual errors rather than panics.
#[tokio::test]
async fn list_threads_archived_only_filter_matches_only_archived() -> Result<()> {
    /// Collects the `id` string of every element in a JSON array of threads.
    fn thread_ids(list: &serde_json::Value) -> Result<Vec<&str>> {
        let entries = list
            .as_array()
            .context("thread listing is not a JSON array")?;
        Ok(entries
            .iter()
            .filter_map(|thread| thread["id"].as_str())
            .collect())
    }

    let Some((addr, _runtime_threads, handle)) = spawn_test_server().await? else {
        return Ok(());
    };
    let client = crate::tls::reqwest_client();

    let active: serde_json::Value = client
        .post(format!("http://{addr}/v1/threads"))
        .json(&json!({}))
        .send()
        .await?
        .error_for_status()?
        .json()
        .await?;
    let active_id = active["id"]
        .as_str()
        .context("missing thread id")?
        .to_string();

    let archived: serde_json::Value = client
        .post(format!("http://{addr}/v1/threads"))
        .json(&json!({}))
        .send()
        .await?
        .error_for_status()?
        .json()
        .await?;
    let archived_id = archived["id"]
        .as_str()
        .context("missing thread id")?
        .to_string();

    // Archive the second thread.
    client
        .patch(format!("http://{addr}/v1/threads/{archived_id}"))
        .json(&json!({ "archived": true }))
        .send()
        .await?
        .error_for_status()?;

    // Default listing: active thread only.
    let active_list: serde_json::Value = client
        .get(format!("http://{addr}/v1/threads"))
        .send()
        .await?
        .error_for_status()?
        .json()
        .await?;
    let ids = thread_ids(&active_list)?;
    assert!(ids.contains(&active_id.as_str()));
    assert!(!ids.contains(&archived_id.as_str()));

    // `archived_only=true`: archived thread only.
    let archived_list: serde_json::Value = client
        .get(format!("http://{addr}/v1/threads?archived_only=true"))
        .send()
        .await?
        .error_for_status()?
        .json()
        .await?;
    assert_eq!(thread_ids(&archived_list)?, vec![archived_id.as_str()]);

    // Combining with `include_archived=true` must give the same result.
    let archived_list: serde_json::Value = client
        .get(format!(
            "http://{addr}/v1/threads?include_archived=true&archived_only=true"
        ))
        .send()
        .await?
        .error_for_status()?
        .json()
        .await?;
    assert_eq!(thread_ids(&archived_list)?, vec![archived_id.as_str()]);

    // The summary endpoint accepts the same filter.
    let summary: serde_json::Value = client
        .get(format!(
            "http://{addr}/v1/threads/summary?archived_only=true&limit=10"
        ))
        .send()
        .await?
        .error_for_status()?
        .json()
        .await?;
    assert_eq!(thread_ids(&summary)?, vec![archived_id.as_str()]);

    handle.abort();
    Ok(())
}
/// Checks `GET /v1/usage` on a server with no recorded turns.
///
/// The bare request must report `group_by` as `day`, zero totals and no
/// buckets. Each of `day`, `model`, `provider` and `thread` must be accepted as
/// `group_by`; an unknown `group_by`, an unparseable `since`, and a `since`
/// later than `until` must each be rejected with `400 Bad Request`.
#[tokio::test]
async fn usage_endpoint_returns_empty_aggregation_for_fresh_store() -> Result<()> {
    let Some((addr, _runtime_threads, handle)) = spawn_test_server().await? else {
        return Ok(());
    };
    let client = crate::tls::reqwest_client();

    let body: serde_json::Value = client
        .get(format!("http://{addr}/v1/usage"))
        .send()
        .await?
        .error_for_status()?
        .json()
        .await?;
    assert_eq!(body["group_by"], "day");
    assert_eq!(body["totals"]["input_tokens"], 0);
    assert_eq!(body["totals"]["output_tokens"], 0);
    assert_eq!(body["totals"]["turns"], 0);
    // A missing or non-array `buckets` fails this assertion with the body
    // dump below instead of panicking on a bare `unwrap`.
    assert!(
        body["buckets"]
            .as_array()
            .is_some_and(|buckets| buckets.is_empty()),
        "buckets must be empty when no turns exist: {body}"
    );

    let bad_group = client
        .get(format!("http://{addr}/v1/usage?group_by=galaxy"))
        .send()
        .await?;
    assert_eq!(bad_group.status(), StatusCode::BAD_REQUEST);

    for gb in ["day", "model", "provider", "thread"] {
        let resp = client
            .get(format!("http://{addr}/v1/usage?group_by={gb}"))
            .send()
            .await?;
        assert!(resp.status().is_success(), "group_by={gb} failed: {resp:?}");
    }

    let bad_since = client
        .get(format!("http://{addr}/v1/usage?since=not-a-date"))
        .send()
        .await?;
    assert_eq!(bad_since.status(), StatusCode::BAD_REQUEST);

    // `since` is one day after `until`.
    let inverted = client
        .get(format!(
            "http://{addr}/v1/usage?since=2030-01-02T00:00:00Z&until=2030-01-01T00:00:00Z"
        ))
        .send()
        .await?;
    assert_eq!(inverted.status(), StatusCode::BAD_REQUEST);

    handle.abort();
    Ok(())
}
/// Checks the fields reported by `GET /v1/runtime/info` on a test server.
#[tokio::test]
async fn runtime_info_reports_bind_state() -> Result<()> {
    let (addr, _runtime_threads, handle) = match spawn_test_server().await? {
        Some(server) => server,
        None => return Ok(()),
    };

    let client = crate::tls::reqwest_client();
    let response = client
        .get(format!("http://{addr}/v1/runtime/info"))
        .send()
        .await?
        .error_for_status()?;
    let info: serde_json::Value = response.json().await?;

    // Identity and versioning.
    assert_eq!(info["service"], "codewhale-runtime-api");
    assert_eq!(info["runtime_api_version"], "1.0");
    assert_eq!(info["codewhale_version"], info["version"]);

    // Bind/auth state.
    assert_eq!(info["bind_host"], "127.0.0.1");
    assert_eq!(info["auth_required"], false);
    assert!(info["version"].is_string());

    // Advertised transports and capabilities.
    assert_eq!(info["transports"], json!(["http", "sse"]));
    let capabilities = &info["capabilities"];
    assert_eq!(capabilities["threads"], true);
    assert_eq!(capabilities["external_tools"], true);
    assert!(info["experimental"].is_object());

    handle.abort();
    Ok(())
}
/// `POST /v1/threads` must accept `dynamic_tools` and `environments` in the
/// request body and still return a thread with a string `id`.
#[tokio::test]
async fn create_thread_accepts_dynamic_tools_and_environments() -> Result<()> {
    let (addr, _runtime_threads, handle) = match spawn_test_server().await? {
        Some(server) => server,
        None => return Ok(()),
    };

    let request_body = json!({
        "model": "test-model",
        "dynamic_tools": [
            {
                "namespace": "tau_bench",
                "name": "get_reservation",
                "description": "Look up a reservation.",
                "input_schema": { "type": "object" }
            }
        ],
        "environments": [
            { "environment_id": "local", "cwd": "/workspace" }
        ]
    });

    let client = crate::tls::reqwest_client();
    let response = client
        .post(format!("http://{addr}/v1/threads"))
        .json(&request_body)
        .send()
        .await?
        .error_for_status()?;
    let created: serde_json::Value = response.json().await?;
    assert!(created["id"].is_string());

    handle.abort();
    Ok(())
}
/// `POST /v1/threads/{id}/turns` must accept `dynamic_tools` and
/// `environment_id`, and the returned turn must reference the thread.
#[tokio::test]
async fn start_turn_accepts_dynamic_tools_and_environment_id() -> Result<()> {
    let (addr, _runtime_threads, handle) = match spawn_test_server().await? {
        Some(server) => server,
        None => return Ok(()),
    };
    let client = crate::tls::reqwest_client();

    // Create the thread the turn will run on.
    let create_response = client
        .post(format!("http://{addr}/v1/threads"))
        .json(&json!({ "model": "test-model" }))
        .send()
        .await?
        .error_for_status()?;
    let created: serde_json::Value = create_response.json().await?;
    let thread_id = created["id"].as_str().context("missing thread id")?;

    let turn_request = json!({
        "prompt": "hello",
        "dynamic_tools": [
            {
                "name": "simple_tool",
                "description": "A simple tool.",
                "input_schema": { "type": "object" }
            }
        ],
        "environment_id": "local"
    });
    let turn_response = client
        .post(format!("http://{addr}/v1/threads/{thread_id}/turns"))
        .json(&turn_request)
        .send()
        .await?
        .error_for_status()?;
    let started: serde_json::Value = turn_response.json().await?;
    assert_eq!(started["turn"]["thread_id"], thread_id);

    handle.abort();
    Ok(())
}
/// `/mobile` must return 404 when the server is started with the mobile flag
/// off, and serve the mobile HTML shell when a second server on the same root
/// is started with the flag on.
#[tokio::test]
async fn mobile_page_is_available_only_when_enabled() -> Result<()> {
    let tmp = tempfile::tempdir()?;
    let root = tmp.path().to_path_buf();
    let sessions_dir = root.join("sessions");

    // First server: mobile disabled.
    let without_mobile = spawn_test_server_with_root_token_and_mobile(
        root.clone(),
        sessions_dir.clone(),
        None,
        false,
    )
    .await?;
    let Some((addr, _runtime_threads, handle)) = without_mobile else {
        return Ok(());
    };
    let client = crate::tls::reqwest_client();
    let mobile_url = format!("http://{addr}/mobile");
    let disabled = client.get(mobile_url).send().await?;
    assert_eq!(disabled.status(), StatusCode::NOT_FOUND);
    handle.abort();

    // Second server: same directories, mobile enabled.
    let with_mobile =
        spawn_test_server_with_root_token_and_mobile(root, sessions_dir, None, true).await?;
    let Some((addr, _runtime_threads, handle)) = with_mobile else {
        return Ok(());
    };
    let mobile_url = format!("http://{addr}/mobile");
    let enabled = client.get(mobile_url).send().await?.error_for_status()?;
    let html = enabled.text().await?;

    // Markers the served page is expected to contain.
    assert!(html.contains("CodeWhale Mobile"));
    assert!(html.contains("/v1/approvals/"));
    assert!(html.contains("MAX_VISIBLE_EVENTS = 100"));
    assert!(html.contains("replay_limit="));

    handle.abort();
    Ok(())
}
/// With a token configured, `/mobile` must still serve the HTML shell both to
/// an unauthenticated request and to one carrying the bearer token.
#[tokio::test]
async fn mobile_page_serves_shell_when_auth_enabled() -> Result<()> {
    let tmp = tempfile::tempdir()?;
    let root = tmp.path().to_path_buf();
    let sessions_dir = root.join("sessions");
    // Token includes a space and URL-significant punctuation.
    let token = "abc ABC+/?:=&%".to_string();

    let spawned =
        spawn_test_server_with_root_token_and_mobile(root, sessions_dir, Some(token.clone()), true)
            .await?;
    let Some((addr, _runtime_threads, handle)) = spawned else {
        return Ok(());
    };
    let client = crate::tls::reqwest_client();
    let mobile_url = format!("http://{addr}/mobile");

    // No credentials at all.
    let shell = client
        .get(mobile_url.as_str())
        .send()
        .await?
        .error_for_status()?;
    let html = shell.text().await?;
    assert!(html.contains("CodeWhale Mobile"));
    assert!(html.contains("TOKEN_COOKIE"));

    // Same page with the bearer token attached.
    let bearer = client
        .get(mobile_url.as_str())
        .bearer_auth(&token)
        .send()
        .await?
        .error_for_status()?;
    let bearer_html = bearer.text().await?;
    assert!(bearer_html.contains("CodeWhale Mobile"));

    handle.abort();
    Ok(())
}
/// With no token configured and mobile enabled, both `/mobile` and
/// `/v1/threads/summary` must be reachable without credentials.
#[tokio::test]
async fn mobile_insecure_mode_allows_page_and_v1_routes_without_token() -> Result<()> {
    let tmp = tempfile::tempdir()?;
    let root = tmp.path().to_path_buf();
    let sessions_dir = root.join("sessions");

    let spawned =
        spawn_test_server_with_root_token_and_mobile(root, sessions_dir, None, true).await?;
    let Some((addr, _runtime_threads, handle)) = spawned else {
        return Ok(());
    };
    let client = crate::tls::reqwest_client();

    let page = client
        .get(format!("http://{addr}/mobile"))
        .send()
        .await?
        .error_for_status()?;
    let page_html = page.text().await?;
    assert!(page_html.contains("CodeWhale Mobile"));

    let summary = client
        .get(format!("http://{addr}/v1/threads/summary"))
        .send()
        .await?
        .error_for_status()?;
    assert_eq!(summary.status(), StatusCode::OK);

    handle.abort();
    Ok(())
}
/// Posting a decision for an approval id that was never registered must
/// answer `404 Not Found`.
#[tokio::test]
async fn decide_approval_404s_when_nothing_pending() -> Result<()> {
    let (addr, _runtime_threads, handle) = match spawn_test_server().await? {
        Some(server) => server,
        None => return Ok(()),
    };

    let client = crate::tls::reqwest_client();
    let url = format!("http://{addr}/v1/approvals/no_such_id");
    let decision = json!({ "decision": "allow" });
    let resp = client.post(url).json(&decision).send().await?;
    assert_eq!(resp.status(), StatusCode::NOT_FOUND);

    handle.abort();
    Ok(())
}
/// Posting the decision value `"yolo"` to the approvals endpoint must answer
/// `400 Bad Request`.
#[tokio::test]
async fn decide_approval_400s_on_bad_decision() -> Result<()> {
    let (addr, _runtime_threads, handle) = match spawn_test_server().await? {
        Some(server) => server,
        None => return Ok(()),
    };

    let client = crate::tls::reqwest_client();
    let url = format!("http://{addr}/v1/approvals/whatever");
    let decision = json!({ "decision": "yolo" });
    let resp = client.post(url).json(&decision).send().await?;
    assert_eq!(resp.status(), StatusCode::BAD_REQUEST);

    handle.abort();
    Ok(())
}
/// A decision posted for a registered pending approval must be acknowledged
/// with 200 and arrive on the receiver returned at registration.
#[tokio::test]
async fn decide_approval_delivers_to_runtime() -> Result<()> {
    let (addr, runtime_threads, handle) = match spawn_test_server().await? {
        Some(server) => server,
        None => return Ok(()),
    };
    let client = crate::tls::reqwest_client();

    // Register the approval first so the endpoint has something to resolve.
    let decision_rx = runtime_threads.register_pending_approval_for_test("ext_id");

    let payload = json!({ "decision": "allow", "remember": false });
    let resp = client
        .post(format!("http://{addr}/v1/approvals/ext_id"))
        .json(&payload)
        .send()
        .await?;
    assert_eq!(resp.status(), StatusCode::OK);

    let body: serde_json::Value = resp.json().await?;
    assert_eq!(body["ok"], true);
    assert_eq!(body["decision"], "allow");
    assert_eq!(body["delivered"], true);

    // Wait at most one second for the decision to reach the receiver.
    let received = tokio::time::timeout(Duration::from_secs(1), decision_rx).await??;
    let expected = ExternalApprovalDecision::Allow { remember: false };
    assert_eq!(received, expected);

    handle.abort();
    Ok(())
}
/// A tool-call result posted for a registered pending dynamic tool call must
/// be answered with 202 and arrive on the receiver returned at registration.
#[tokio::test]
async fn dynamic_tool_result_endpoint_delivers_to_runtime() -> Result<()> {
    let (addr, runtime_threads, handle) = match spawn_test_server().await? {
        Some(server) => server,
        None => return Ok(()),
    };
    let client = crate::tls::reqwest_client();

    let create_response = client
        .post(format!("http://{addr}/v1/threads"))
        .json(&json!({}))
        .send()
        .await?
        .error_for_status()?;
    let thread: serde_json::Value = create_response.json().await?;
    let thread_id = thread["id"].as_str().context("thread id")?;

    // Register the pending call before posting its result.
    let result_rx = runtime_threads.register_pending_dynamic_tool_for_test("call_1");

    let result_url =
        format!("http://{addr}/v1/threads/{thread_id}/turns/turn_1/tool-calls/call_1/result");
    let result_body = json!({
        "success": true,
        "content": [{ "type": "input_text", "text": "ok" }]
    });
    let resp = client.post(result_url).json(&result_body).send().await?;
    assert_eq!(resp.status(), StatusCode::ACCEPTED);

    // Wait at most one second for the result to reach the receiver.
    let received = tokio::time::timeout(Duration::from_secs(1), result_rx).await??;
    assert!(received.success);
    assert_eq!(received.content.len(), 1);

    handle.abort();
    Ok(())
}
/// Every entry of the `skills` array returned by `GET /v1/skills` must carry
/// an `enabled` key.
///
/// When `skills` is missing or not an array there is nothing to iterate and
/// the test passes without checking any entry.
#[tokio::test]
async fn skills_endpoint_includes_enabled_field() -> Result<()> {
    let (addr, _runtime_threads, handle) = match spawn_test_server().await? {
        Some(server) => server,
        None => return Ok(()),
    };

    let client = crate::tls::reqwest_client();
    let response = client
        .get(format!("http://{addr}/v1/skills"))
        .send()
        .await?
        .error_for_status()?;
    let body: serde_json::Value = response.json().await?;

    for skill in body["skills"].as_array().into_iter().flatten() {
        assert!(skill.get("enabled").is_some());
    }

    handle.abort();
    Ok(())
}
/// Toggling a skill name that does not exist must answer `404 Not Found`.
#[tokio::test]
async fn skill_toggle_endpoint_404s_for_unknown_skill() -> Result<()> {
    let (addr, _runtime_threads, handle) = match spawn_test_server().await? {
        Some(server) => server,
        None => return Ok(()),
    };

    let client = crate::tls::reqwest_client();
    let url = format!("http://{addr}/v1/skills/no-such-skill");
    let toggle = json!({ "enabled": false });
    let resp = client.post(url).json(&toggle).send().await?;
    assert_eq!(resp.status(), StatusCode::NOT_FOUND);

    handle.abort();
    Ok(())
}
/// With a default config, a workspace containing `.agents/skills` must
/// resolve to the canonical path of that directory.
#[test]
fn resolve_skills_dir_finds_workspace_local_agents_skills() {
    let sandbox = tempfile::tempdir().expect("tempdir");
    let workspace = sandbox.path();
    let agents_skills = workspace.join(".agents").join("skills");
    fs::create_dir_all(&agents_skills).expect("create skills dir");

    let resolved = resolve_skills_dir(&Config::default(), workspace);

    let expected = fs::canonicalize(&agents_skills).expect("canonical local skills");
    assert_eq!(resolved, expected);
}
/// With a default config, a workspace that only contains a top-level `skills`
/// directory must resolve to the canonical path of that directory.
#[test]
fn resolve_skills_dir_finds_workspace_local_skills_fallback() {
    let sandbox = tempfile::tempdir().expect("tempdir");
    let workspace = sandbox.path();
    let plain_skills = workspace.join("skills");
    fs::create_dir_all(&plain_skills).expect("create skills dir");

    let resolved = resolve_skills_dir(&Config::default(), workspace);

    let expected = fs::canonicalize(&plain_skills).expect("canonical local skills");
    assert_eq!(resolved, expected);
}
/// When both `.agents/skills` and `.codewhale/skills` exist and
/// `scan_codewhale_only` is set, resolution must pick `.codewhale/skills`.
#[test]
fn resolve_skills_dir_respects_codewhale_only_scan() {
    let sandbox = tempfile::tempdir().expect("tempdir");
    let workspace = sandbox.path();

    let agents_dir = workspace.join(".agents").join("skills");
    let codewhale_dir = workspace.join(".codewhale").join("skills");
    fs::create_dir_all(&agents_dir).expect("create agents skills dir");
    fs::create_dir_all(&codewhale_dir).expect("create codewhale skills dir");

    let codewhale_only = crate::config::SkillsConfig {
        scan_codewhale_only: Some(true),
        ..Default::default()
    };
    let config = Config {
        skills: Some(codewhale_only),
        ..Default::default()
    };

    let resolved = resolve_skills_dir(&config, workspace);
    let expected = fs::canonicalize(&codewhale_dir).expect("canonical codewhale skills");
    assert_eq!(resolved, expected);
}
/// An explicitly configured `skills_dir` must be returned as-is even when
/// `scan_codewhale_only` is set and the workspace has `.codewhale/skills`.
#[test]
fn resolve_skills_dir_preserves_explicit_dir_in_codewhale_only_scan() {
    let sandbox = tempfile::tempdir().expect("tempdir");
    let workspace = sandbox.path().join("workspace");

    let codewhale_dir = workspace.join(".codewhale").join("skills");
    // Lives outside the workspace on purpose.
    let explicit_dir = sandbox.path().join("configured-skills");
    fs::create_dir_all(&codewhale_dir).expect("create codewhale skills dir");
    fs::create_dir_all(&explicit_dir).expect("create configured skills dir");

    let codewhale_only = crate::config::SkillsConfig {
        scan_codewhale_only: Some(true),
        ..Default::default()
    };
    let config = Config {
        skills_dir: Some(explicit_dir.to_string_lossy().into_owned()),
        skills: Some(codewhale_only),
        ..Default::default()
    };

    let resolved = resolve_skills_dir(&config, &workspace);
    assert_eq!(resolved, explicit_dir);
}
/// In `Compatible` discovery mode the custom skills directory must appear in
/// the search list and in the formatted search-path message.
#[test]
fn skills_search_directories_includes_custom_skills_dir() {
    let sandbox = tempfile::tempdir().expect("tempdir");
    let workspace = sandbox.path().join("workspace");
    let custom_dir = sandbox.path().join("custom-skills");
    fs::create_dir_all(&workspace).expect("create workspace");
    fs::create_dir_all(&custom_dir).expect("create custom skills");

    let mode = crate::skills::SkillDiscoveryMode::Compatible;
    let directories = skills_search_directories(&workspace, &custom_dir, mode);

    let lists_custom_dir = directories.iter().any(|dir| dir == &custom_dir);
    assert!(
        lists_custom_dir,
        "custom skills_dir must be reported when discovery searches it"
    );

    let message = format_skill_search_paths(&directories);
    assert!(message.contains("custom-skills"));
}
/// A skill counts as bundled only when its file lives under the bundled
/// skills directory; a same-named skill under the workspace does not.
#[test]
fn skill_entry_is_bundled_requires_configured_bundle_path() {
    let tmp = tempfile::tempdir().expect("tempdir");

    // Two `delegate/SKILL.md` files: one bundled, one workspace override.
    let bundled_skills_dir = tmp.path().join("bundled-skills");
    let bundled_skill_path = bundled_skills_dir.join("delegate").join("SKILL.md");
    let workspace_skills_dir = tmp.path().join("workspace").join(".agents").join("skills");
    let override_skill_path = workspace_skills_dir.join("delegate").join("SKILL.md");

    fs::create_dir_all(bundled_skill_path.parent().expect("bundled parent"))
        .expect("create bundled skill dir");
    fs::create_dir_all(override_skill_path.parent().expect("override parent"))
        .expect("create override skill dir");

    let bundled_contents = "---\nname: delegate\ndescription: bundled\n---\n";
    fs::write(&bundled_skill_path, bundled_contents).expect("write bundled skill");
    let override_contents = "---\nname: delegate\ndescription: override\n---\n";
    fs::write(&override_skill_path, override_contents).expect("write override skill");

    // Both skills are identical except for where their file lives.
    let delegate_at = |path: std::path::PathBuf| crate::skills::Skill {
        name: "delegate".to_string(),
        description: String::new(),
        body: String::new(),
        path,
    };
    let bundled_skill = delegate_at(bundled_skill_path);
    let override_skill = delegate_at(override_skill_path);

    assert!(skill_entry_is_bundled(&bundled_skill, &bundled_skills_dir));
    assert!(!skill_entry_is_bundled(
        &override_skill,
        &bundled_skills_dir
    ));
}
/// A `.agents/skills` symlink pointing outside the workspace must not be
/// resolved; resolution must equal `config.skills_dir()` instead.
#[cfg(unix)]
#[test]
fn resolve_skills_dir_rejects_symlink_escaping_workspace() {
    let sandbox = tempfile::tempdir().expect("tempdir");
    let workspace_root = sandbox.path().join("workspace");
    let outside_dir = sandbox.path().join("escape_target");
    fs::create_dir_all(&workspace_root).expect("create workspace");
    fs::create_dir_all(&outside_dir).expect("create escape target");

    // `<workspace>/.agents/skills` -> `<sandbox>/escape_target`
    let agents_dir = workspace_root.join(".agents");
    fs::create_dir_all(&agents_dir).expect("create .agents");
    let escaping_link = agents_dir.join("skills");
    std::os::unix::fs::symlink(&outside_dir, &escaping_link).expect("symlink");

    let config = Config::default();
    let resolved = resolve_skills_dir(&config, &workspace_root);

    let canonical_outside = fs::canonicalize(&outside_dir).expect("canon escape");
    assert_ne!(
        resolved, canonical_outside,
        "symlink escaping workspace must not be resolved as skills dir"
    );
    let fallback = config.skills_dir();
    assert_eq!(
        resolved, fallback,
        "with no valid in-workspace skills dir, resolution should fall back to config"
    );
}
/// With `scan_codewhale_only` set, a `.codewhale/skills` symlink pointing
/// outside the workspace must not be resolved; resolution must equal
/// `config.skills_dir()` instead.
#[cfg(unix)]
#[test]
fn resolve_skills_dir_rejects_codewhale_only_symlink_escaping_workspace() {
    let sandbox = tempfile::tempdir().expect("tempdir");
    let workspace_root = sandbox.path().join("workspace");
    let outside_dir = sandbox.path().join("escape_target");
    fs::create_dir_all(&workspace_root).expect("create workspace");
    fs::create_dir_all(&outside_dir).expect("create escape target");

    // `<workspace>/.codewhale/skills` -> `<sandbox>/escape_target`
    let codewhale_dir = workspace_root.join(".codewhale");
    fs::create_dir_all(&codewhale_dir).expect("create .codewhale");
    let escaping_link = codewhale_dir.join("skills");
    std::os::unix::fs::symlink(&outside_dir, &escaping_link).expect("symlink");

    let codewhale_only = crate::config::SkillsConfig {
        scan_codewhale_only: Some(true),
        ..Default::default()
    };
    let config = Config {
        skills: Some(codewhale_only),
        ..Default::default()
    };
    let resolved = resolve_skills_dir(&config, &workspace_root);

    let canonical_outside = fs::canonicalize(&outside_dir).expect("canon escape");
    assert_ne!(
        resolved, canonical_outside,
        "CodeWhale-only symlink escaping workspace must not be resolved as skills dir"
    );
    let fallback = config.skills_dir();
    assert_eq!(
        resolved, fallback,
        "with no valid in-workspace CodeWhale skills dir, resolution should fall back to config"
    );
}