use std::collections::HashSet;
use std::convert::Infallible;
use std::fs;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::process::Command;
use std::sync::Arc;
use std::time::Duration;
use anyhow::{Context, Result, anyhow, bail};
use async_stream::stream;
use axum::extract::{Path, Query, State};
use axum::http::{HeaderValue, Method, StatusCode};
use axum::response::sse::{Event as SseEvent, KeepAlive, Sse};
use axum::response::{IntoResponse, Response};
use axum::routing::{get, post};
use axum::{Json, Router};
use chrono::Utc;
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
use tokio::net::TcpListener;
use tokio::sync::Mutex;
use tokio_util::sync::CancellationToken;
use tower_http::cors::{Any, CorsLayer};
use crate::automation_manager::{
AutomationManager, AutomationRecord, AutomationRunRecord, AutomationSchedulerConfig,
CreateAutomationRequest, SharedAutomationManager, UpdateAutomationRequest, spawn_scheduler,
};
use crate::config::{Config, DEFAULT_TEXT_MODEL};
use crate::mcp::{McpConfig, McpPool};
use crate::runtime_threads::{
CompactThreadRequest, CreateThreadRequest, RuntimeThreadManager, RuntimeThreadManagerConfig,
SharedRuntimeThreadManager, StartTurnRequest, SteerTurnRequest, ThreadDetail, ThreadRecord,
TurnItemKind, TurnRecord, UpdateThreadRequest,
};
use crate::session_manager::{SavedSession, SessionManager, SessionMetadata, default_sessions_dir};
use crate::skills::SkillRegistry;
use crate::task_manager::{
NewTaskRequest, SharedTaskManager, TaskManager, TaskManagerConfig, TaskRecord, TaskSummary,
};
#[derive(Clone)]
pub struct RuntimeApiState {
config: Config,
workspace: PathBuf,
task_manager: SharedTaskManager,
runtime_threads: SharedRuntimeThreadManager,
sessions_dir: PathBuf,
mcp_config_path: PathBuf,
automations: SharedAutomationManager,
}
#[derive(Debug, Clone)]
pub struct RuntimeApiOptions {
pub host: String,
pub port: u16,
pub workers: usize,
}
#[derive(Debug, Deserialize)]
struct StreamTurnRequest {
prompt: String,
model: Option<String>,
mode: Option<String>,
workspace: Option<PathBuf>,
allow_shell: Option<bool>,
trust_mode: Option<bool>,
auto_approve: Option<bool>,
}
#[derive(Debug, Serialize)]
struct HealthResponse {
status: &'static str,
service: &'static str,
mode: &'static str,
}
#[derive(Debug, Serialize)]
struct SessionsResponse {
sessions: Vec<SessionMetadata>,
}
#[derive(Debug, Serialize)]
struct SessionDetailResponse {
metadata: SessionMetadata,
messages: Vec<serde_json::Value>,
system_prompt: Option<String>,
}
#[derive(Debug, Deserialize)]
struct ResumeSessionRequest {
model: Option<String>,
mode: Option<String>,
}
#[derive(Debug, Serialize)]
struct ResumeSessionResponse {
thread_id: String,
session_id: String,
message_count: usize,
summary: String,
}
#[derive(Debug, Serialize)]
struct TasksResponse {
tasks: Vec<TaskSummary>,
counts: crate::task_manager::TaskCounts,
}
#[derive(Debug, Deserialize)]
struct SessionsQuery {
limit: Option<usize>,
search: Option<String>,
}
#[derive(Debug, Deserialize)]
struct TasksQuery {
limit: Option<usize>,
}
#[derive(Debug, Deserialize)]
struct ThreadsQuery {
limit: Option<usize>,
include_archived: Option<bool>,
}
#[derive(Debug, Deserialize)]
struct ThreadSummaryQuery {
limit: Option<usize>,
search: Option<String>,
include_archived: Option<bool>,
}
#[derive(Debug, Serialize)]
struct ThreadSummary {
id: String,
title: String,
preview: String,
model: String,
mode: String,
archived: bool,
updated_at: chrono::DateTime<Utc>,
latest_turn_id: Option<String>,
latest_turn_status: Option<String>,
}
#[derive(Debug, Serialize)]
struct WorkspaceStatusResponse {
workspace: PathBuf,
git_repo: bool,
branch: Option<String>,
staged: usize,
unstaged: usize,
untracked: usize,
ahead: Option<u32>,
behind: Option<u32>,
}
#[derive(Debug, Serialize)]
struct SkillEntry {
name: String,
description: String,
path: PathBuf,
}
#[derive(Debug, Serialize)]
struct SkillsResponse {
directory: PathBuf,
warnings: Vec<String>,
skills: Vec<SkillEntry>,
}
#[derive(Debug, Serialize)]
struct McpServerEntry {
name: String,
enabled: bool,
required: bool,
command: Option<String>,
url: Option<String>,
connected: bool,
enabled_tools: Vec<String>,
disabled_tools: Vec<String>,
}
#[derive(Debug, Serialize)]
struct McpServersResponse {
servers: Vec<McpServerEntry>,
}
#[derive(Debug, Deserialize)]
struct McpToolsQuery {
server: Option<String>,
}
#[derive(Debug, Serialize)]
struct McpToolEntry {
server: String,
name: String,
prefixed_name: String,
description: Option<String>,
input_schema: Value,
}
#[derive(Debug, Serialize)]
struct McpToolsResponse {
tools: Vec<McpToolEntry>,
}
#[derive(Debug, Deserialize)]
struct AutomationRunsQuery {
limit: Option<usize>,
}
#[derive(Debug, Deserialize)]
struct ThreadEventsQuery {
since_seq: Option<u64>,
}
#[derive(Debug, Serialize)]
struct StartTurnResponse {
thread: ThreadRecord,
turn: TurnRecord,
}
pub async fn run_http_server(
config: Config,
workspace: PathBuf,
options: RuntimeApiOptions,
) -> Result<()> {
if options.port == 0 {
bail!("Port must be > 0");
}
let task_cfg = TaskManagerConfig::from_runtime(
&config,
workspace.clone(),
config.default_text_model.clone(),
Some(options.workers),
);
let runtime_threads = Arc::new(RuntimeThreadManager::open(
config.clone(),
workspace.clone(),
RuntimeThreadManagerConfig::from_task_data_dir(task_cfg.data_dir.clone()),
)?);
let task_manager =
TaskManager::start_with_runtime_manager(task_cfg, config.clone(), runtime_threads.clone())
.await?;
let automations = Arc::new(Mutex::new(AutomationManager::default_location()?));
let scheduler_cancel = CancellationToken::new();
let scheduler_handle = spawn_scheduler(
automations.clone(),
task_manager.clone(),
scheduler_cancel.clone(),
AutomationSchedulerConfig::default(),
);
let sessions_dir = default_sessions_dir().unwrap_or_else(|_| {
dirs::home_dir()
.map(|h| h.join(".deepseek").join("sessions"))
.unwrap_or_else(|| PathBuf::from(".deepseek").join("sessions"))
});
let state = RuntimeApiState {
config: config.clone(),
workspace,
task_manager,
runtime_threads,
sessions_dir,
mcp_config_path: config.mcp_config_path(),
automations,
};
let app = build_router(state);
let addr: SocketAddr = format!("{}:{}", options.host, options.port)
.parse()
.with_context(|| format!("Invalid bind address '{}:{}'", options.host, options.port))?;
let listener = TcpListener::bind(addr)
.await
.with_context(|| format!("Failed to bind {addr}"))?;
println!("Runtime API listening on http://{addr}");
println!("Security: this server is local-first. Do not expose it to untrusted networks.");
let serve_result = axum::serve(listener, app)
.await
.map_err(|e| anyhow!("Runtime API server error: {e}"));
scheduler_cancel.cancel();
scheduler_handle.abort();
serve_result
}
pub fn build_router(state: RuntimeApiState) -> Router {
Router::new()
.route("/health", get(health))
.route("/v1/sessions", get(list_sessions))
.route("/v1/sessions/{id}", get(get_session).delete(delete_session))
.route(
"/v1/sessions/{id}/resume-thread",
post(resume_session_thread),
)
.route("/v1/workspace/status", get(workspace_status))
.route("/v1/stream", post(stream_turn))
.route("/v1/threads", get(list_threads).post(create_thread))
.route("/v1/threads/summary", get(list_threads_summary))
.route("/v1/threads/{id}", get(get_thread).patch(update_thread))
.route("/v1/threads/{id}/resume", post(resume_thread))
.route("/v1/threads/{id}/fork", post(fork_thread))
.route("/v1/threads/{id}/turns", post(start_thread_turn))
.route(
"/v1/threads/{id}/turns/{turn_id}/steer",
post(steer_thread_turn),
)
.route(
"/v1/threads/{id}/turns/{turn_id}/interrupt",
post(interrupt_thread_turn),
)
.route("/v1/threads/{id}/compact", post(compact_thread))
.route("/v1/threads/{id}/events", get(stream_thread_events))
.route("/v1/tasks", get(list_tasks).post(create_task))
.route("/v1/tasks/{id}", get(get_task))
.route("/v1/tasks/{id}/cancel", post(cancel_task))
.route("/v1/skills", get(list_skills))
.route("/v1/apps/mcp/servers", get(list_mcp_servers))
.route("/v1/apps/mcp/tools", get(list_mcp_tools))
.route(
"/v1/automations",
get(list_automations).post(create_automation),
)
.route(
"/v1/automations/{id}",
get(get_automation)
.patch(update_automation)
.delete(delete_automation),
)
.route("/v1/automations/{id}/run", post(run_automation))
.route("/v1/automations/{id}/pause", post(pause_automation))
.route("/v1/automations/{id}/resume", post(resume_automation))
.route("/v1/automations/{id}/runs", get(list_automation_runs))
.layer(cors_layer())
.with_state(state)
}
async fn health() -> Json<HealthResponse> {
Json(HealthResponse {
status: "ok",
service: "deepseek-runtime-api",
mode: "local",
})
}
async fn list_sessions(
State(state): State<RuntimeApiState>,
Query(query): Query<SessionsQuery>,
) -> Result<Json<SessionsResponse>, ApiError> {
let manager = SessionManager::new(state.sessions_dir.clone())
.map_err(|e| ApiError::internal(format!("Failed to open sessions dir: {e}")))?;
let mut sessions = if let Some(search) = query.search {
manager
.search_sessions(&search)
.map_err(|e| ApiError::internal(format!("Failed to search sessions: {e}")))?
} else {
manager
.list_sessions()
.map_err(|e| ApiError::internal(format!("Failed to list sessions: {e}")))?
};
let limit = query.limit.unwrap_or(50).clamp(1, 500);
sessions.truncate(limit);
Ok(Json(SessionsResponse { sessions }))
}
async fn get_session(
State(state): State<RuntimeApiState>,
Path(id): Path<String>,
) -> Result<Json<SessionDetailResponse>, ApiError> {
let manager = SessionManager::new(state.sessions_dir.clone())
.map_err(|e| ApiError::internal(format!("Failed to open sessions dir: {e}")))?;
let session = manager
.load_session(&id)
.map_err(|e| map_session_err(&id, e, "read"))?;
Ok(Json(session_to_detail(session)))
}
async fn resume_session_thread(
State(state): State<RuntimeApiState>,
Path(id): Path<String>,
Json(req): Json<ResumeSessionRequest>,
) -> Result<(StatusCode, Json<ResumeSessionResponse>), ApiError> {
let manager = SessionManager::new(state.sessions_dir.clone())
.map_err(|e| ApiError::internal(format!("Failed to open sessions dir: {e}")))?;
let session = manager
.load_session(&id)
.map_err(|e| map_session_err(&id, e, "read"))?;
let model = req.model.unwrap_or_else(|| session.metadata.model.clone());
let mode = req.mode.unwrap_or_else(|| {
session
.metadata
.mode
.clone()
.unwrap_or_else(|| "agent".to_string())
});
let thread = state
.runtime_threads
.create_thread(CreateThreadRequest {
model: Some(model),
workspace: Some(state.workspace.clone()),
mode: Some(mode),
allow_shell: None,
trust_mode: None,
auto_approve: None,
archived: false,
system_prompt: session.system_prompt.clone(),
})
.await
.map_err(|e| ApiError::internal(format!("Failed to create thread: {e}")))?;
let msg_count = session.messages.len();
state
.runtime_threads
.seed_thread_from_messages(&thread.id, &session.messages)
.await
.map_err(|e| ApiError::internal(format!("Failed to seed thread history: {e}")))?;
let summary = format!(
"Resumed session '{}' ({} messages) into thread {}",
session.metadata.title, msg_count, thread.id
);
Ok((
StatusCode::CREATED,
Json(ResumeSessionResponse {
thread_id: thread.id,
session_id: id,
message_count: msg_count,
summary,
}),
))
}
async fn delete_session(
State(state): State<RuntimeApiState>,
Path(id): Path<String>,
) -> Result<StatusCode, ApiError> {
let manager = SessionManager::new(state.sessions_dir.clone())
.map_err(|e| ApiError::internal(format!("Failed to open sessions dir: {e}")))?;
manager
.delete_session(&id)
.map_err(|e| map_session_err(&id, e, "delete"))?;
Ok(StatusCode::NO_CONTENT)
}
fn session_to_detail(session: SavedSession) -> SessionDetailResponse {
let messages: Vec<serde_json::Value> = session
.messages
.iter()
.map(|msg| {
let content_blocks: Vec<serde_json::Value> = msg
.content
.iter()
.map(|block| match block {
crate::models::ContentBlock::Text { text, .. } => {
json!({ "type": "text", "text": text })
}
crate::models::ContentBlock::Thinking { thinking, .. } => {
json!({ "type": "thinking", "text": thinking })
}
_ => json!({ "type": "other" }),
})
.collect();
json!({
"role": msg.role,
"content": content_blocks,
})
})
.collect();
SessionDetailResponse {
metadata: session.metadata,
messages,
system_prompt: session.system_prompt,
}
}
fn map_session_err(id: &str, err: std::io::Error, action: &str) -> ApiError {
match err.kind() {
std::io::ErrorKind::NotFound => ApiError::not_found(format!("Session '{id}' not found")),
std::io::ErrorKind::InvalidData => {
ApiError::bad_request(format!("Failed to parse session '{id}': {err}"))
}
std::io::ErrorKind::InvalidInput => {
ApiError::bad_request(format!("Invalid session id '{id}'"))
}
_ => ApiError::internal(format!("Failed to {action} session '{id}': {err}")),
}
}
async fn create_task(
State(state): State<RuntimeApiState>,
Json(mut req): Json<NewTaskRequest>,
) -> Result<(StatusCode, Json<TaskRecord>), ApiError> {
if req.prompt.trim().is_empty() {
return Err(ApiError::bad_request("prompt is required"));
}
if req.workspace.is_none() {
req.workspace = Some(state.workspace.clone());
}
if req.model.is_none() {
req.model = Some(
state
.config
.default_text_model
.clone()
.unwrap_or_else(|| DEFAULT_TEXT_MODEL.to_string()),
);
}
let task = state
.task_manager
.add_task(req)
.await
.map_err(|e| ApiError::bad_request(e.to_string()))?;
Ok((StatusCode::CREATED, Json(task)))
}
async fn create_thread(
State(state): State<RuntimeApiState>,
Json(mut req): Json<CreateThreadRequest>,
) -> Result<(StatusCode, Json<ThreadRecord>), ApiError> {
if req.model.as_ref().is_none_or(|m| m.trim().is_empty()) {
req.model = Some(
state
.config
.default_text_model
.clone()
.unwrap_or_else(|| DEFAULT_TEXT_MODEL.to_string()),
);
}
if req.workspace.is_none() {
req.workspace = Some(state.workspace.clone());
}
if req.mode.as_ref().is_none_or(|m| m.trim().is_empty()) {
req.mode = Some("agent".to_string());
}
let thread = state
.runtime_threads
.create_thread(req)
.await
.map_err(|e| ApiError::bad_request(e.to_string()))?;
Ok((StatusCode::CREATED, Json(thread)))
}
async fn list_threads(
State(state): State<RuntimeApiState>,
Query(query): Query<ThreadsQuery>,
) -> Result<Json<Vec<ThreadRecord>>, ApiError> {
let threads = state
.runtime_threads
.list_threads(query.include_archived.unwrap_or(false), query.limit)
.await
.map_err(|e| ApiError::internal(e.to_string()))?;
Ok(Json(threads))
}
async fn list_threads_summary(
State(state): State<RuntimeApiState>,
Query(query): Query<ThreadSummaryQuery>,
) -> Result<Json<Vec<ThreadSummary>>, ApiError> {
let limit = query.limit.unwrap_or(50).clamp(1, 500);
let search = query.search.as_deref().map(str::to_ascii_lowercase);
let threads = state
.runtime_threads
.list_threads(query.include_archived.unwrap_or(false), Some(limit))
.await
.map_err(|e| ApiError::internal(e.to_string()))?;
let mut summaries = Vec::new();
for thread in threads {
let detail = state
.runtime_threads
.get_thread_detail(&thread.id)
.await
.map_err(map_thread_err)?;
let latest_turn = detail.turns.last();
let latest_status =
latest_turn.map(|turn| format!("{:?}", turn.status).to_ascii_lowercase());
let title = latest_turn
.map(|turn| {
if turn.input_summary.trim().is_empty() {
"New Thread".to_string()
} else {
truncate_text(&turn.input_summary, 72)
}
})
.unwrap_or_else(|| "New Thread".to_string());
let preview = detail
.items
.iter()
.rev()
.find_map(|item| match item.kind {
TurnItemKind::AgentMessage | TurnItemKind::UserMessage => {
let text = item.detail.clone().unwrap_or_else(|| item.summary.clone());
if text.trim().is_empty() {
None
} else {
Some(truncate_text(&text, 140))
}
}
_ => None,
})
.unwrap_or_else(|| title.clone());
if let Some(search) = &search {
let haystack = format!(
"{} {} {} {}",
thread.id.to_ascii_lowercase(),
title.to_ascii_lowercase(),
preview.to_ascii_lowercase(),
thread.model.to_ascii_lowercase()
);
if !haystack.contains(search) {
continue;
}
}
summaries.push(ThreadSummary {
id: thread.id,
title,
preview,
model: thread.model,
mode: thread.mode,
archived: thread.archived,
updated_at: thread.updated_at,
latest_turn_id: thread.latest_turn_id,
latest_turn_status: latest_status,
});
}
if summaries.len() > limit {
summaries.truncate(limit);
}
Ok(Json(summaries))
}
async fn workspace_status(
State(state): State<RuntimeApiState>,
) -> Result<Json<WorkspaceStatusResponse>, ApiError> {
Ok(Json(collect_workspace_status(&state.workspace)))
}
async fn list_skills(
State(state): State<RuntimeApiState>,
) -> Result<Json<SkillsResponse>, ApiError> {
let skills_dir = resolve_skills_dir(&state.config, &state.workspace);
let registry = SkillRegistry::discover(&skills_dir);
let skills = registry
.list()
.iter()
.map(|skill| SkillEntry {
name: skill.name.clone(),
description: skill.description.clone(),
path: skills_dir.join(&skill.name).join("SKILL.md"),
})
.collect();
Ok(Json(SkillsResponse {
directory: skills_dir,
warnings: registry.warnings().to_vec(),
skills,
}))
}
async fn list_mcp_servers(
State(state): State<RuntimeApiState>,
) -> Result<Json<McpServersResponse>, ApiError> {
let config = load_mcp_config_or_default(&state.mcp_config_path)?;
let mut pool = McpPool::new(config.clone());
let _errors = pool.connect_all().await;
let connected: HashSet<String> = pool
.connected_servers()
.into_iter()
.map(str::to_string)
.collect();
let mut servers = Vec::new();
for (name, server_cfg) in config.servers {
servers.push(McpServerEntry {
name: name.clone(),
enabled: server_cfg.is_enabled(),
required: server_cfg.required,
command: server_cfg.command.clone(),
url: server_cfg.url.clone(),
connected: connected.contains(&name),
enabled_tools: server_cfg.enabled_tools.clone(),
disabled_tools: server_cfg.disabled_tools.clone(),
});
}
servers.sort_by(|a, b| a.name.cmp(&b.name));
Ok(Json(McpServersResponse { servers }))
}
async fn list_mcp_tools(
State(state): State<RuntimeApiState>,
Query(query): Query<McpToolsQuery>,
) -> Result<Json<McpToolsResponse>, ApiError> {
let mut pool = McpPool::from_config_path(&state.mcp_config_path)
.map_err(|e| ApiError::internal(format!("Failed to load MCP config: {e}")))?;
let _errors = pool.connect_all().await;
let mut tools = Vec::new();
for (prefixed_name, tool) in pool.all_tools() {
let Some(rest) = prefixed_name.strip_prefix("mcp_") else {
continue;
};
let Some((server, name)) = rest.split_once('_') else {
continue;
};
if let Some(filter) = query.server.as_deref()
&& server != filter
{
continue;
}
tools.push(McpToolEntry {
server: server.to_string(),
name: name.to_string(),
prefixed_name,
description: tool.description.clone(),
input_schema: tool.input_schema.clone(),
});
}
tools.sort_by(|a, b| a.server.cmp(&b.server).then_with(|| a.name.cmp(&b.name)));
Ok(Json(McpToolsResponse { tools }))
}
async fn list_automations(
State(state): State<RuntimeApiState>,
) -> Result<Json<Vec<AutomationRecord>>, ApiError> {
let manager = state.automations.lock().await;
let automations = manager
.list_automations()
.map_err(|e| ApiError::internal(format!("Failed to list automations: {e}")))?;
Ok(Json(automations))
}
async fn create_automation(
State(state): State<RuntimeApiState>,
Json(req): Json<CreateAutomationRequest>,
) -> Result<(StatusCode, Json<AutomationRecord>), ApiError> {
let manager = state.automations.lock().await;
let automation = manager
.create_automation(req)
.map_err(|e| ApiError::bad_request(e.to_string()))?;
Ok((StatusCode::CREATED, Json(automation)))
}
async fn get_automation(
State(state): State<RuntimeApiState>,
Path(id): Path<String>,
) -> Result<Json<AutomationRecord>, ApiError> {
let manager = state.automations.lock().await;
let automation = manager.get_automation(&id).map_err(map_automation_err)?;
Ok(Json(automation))
}
async fn update_automation(
State(state): State<RuntimeApiState>,
Path(id): Path<String>,
Json(req): Json<UpdateAutomationRequest>,
) -> Result<Json<AutomationRecord>, ApiError> {
let manager = state.automations.lock().await;
let automation = manager
.update_automation(&id, req)
.map_err(map_automation_err)?;
Ok(Json(automation))
}
async fn delete_automation(
State(state): State<RuntimeApiState>,
Path(id): Path<String>,
) -> Result<Json<AutomationRecord>, ApiError> {
let manager = state.automations.lock().await;
let automation = manager.delete_automation(&id).map_err(map_automation_err)?;
Ok(Json(automation))
}
async fn run_automation(
State(state): State<RuntimeApiState>,
Path(id): Path<String>,
) -> Result<Json<AutomationRunRecord>, ApiError> {
let manager = state.automations.lock().await;
let run = manager
.run_now(&id, &state.task_manager)
.await
.map_err(map_automation_err)?;
Ok(Json(run))
}
async fn pause_automation(
State(state): State<RuntimeApiState>,
Path(id): Path<String>,
) -> Result<Json<AutomationRecord>, ApiError> {
let manager = state.automations.lock().await;
let automation = manager.pause_automation(&id).map_err(map_automation_err)?;
Ok(Json(automation))
}
async fn resume_automation(
State(state): State<RuntimeApiState>,
Path(id): Path<String>,
) -> Result<Json<AutomationRecord>, ApiError> {
let manager = state.automations.lock().await;
let automation = manager.resume_automation(&id).map_err(map_automation_err)?;
Ok(Json(automation))
}
async fn list_automation_runs(
State(state): State<RuntimeApiState>,
Path(id): Path<String>,
Query(query): Query<AutomationRunsQuery>,
) -> Result<Json<Vec<AutomationRunRecord>>, ApiError> {
let manager = state.automations.lock().await;
let runs = manager
.list_runs(&id, query.limit)
.map_err(map_automation_err)?;
Ok(Json(runs))
}
async fn get_thread(
State(state): State<RuntimeApiState>,
Path(id): Path<String>,
) -> Result<Json<ThreadDetail>, ApiError> {
let detail = state
.runtime_threads
.get_thread_detail(&id)
.await
.map_err(map_thread_err)?;
Ok(Json(detail))
}
async fn update_thread(
State(state): State<RuntimeApiState>,
Path(id): Path<String>,
Json(req): Json<UpdateThreadRequest>,
) -> Result<Json<ThreadRecord>, ApiError> {
let thread = state
.runtime_threads
.update_thread(&id, req)
.await
.map_err(map_thread_err)?;
Ok(Json(thread))
}
async fn resume_thread(
State(state): State<RuntimeApiState>,
Path(id): Path<String>,
) -> Result<Json<ThreadRecord>, ApiError> {
let thread = state
.runtime_threads
.resume_thread(&id)
.await
.map_err(map_thread_err)?;
Ok(Json(thread))
}
async fn fork_thread(
State(state): State<RuntimeApiState>,
Path(id): Path<String>,
) -> Result<(StatusCode, Json<ThreadRecord>), ApiError> {
let thread = state
.runtime_threads
.fork_thread(&id)
.await
.map_err(map_thread_err)?;
Ok((StatusCode::CREATED, Json(thread)))
}
async fn start_thread_turn(
State(state): State<RuntimeApiState>,
Path(id): Path<String>,
Json(req): Json<StartTurnRequest>,
) -> Result<(StatusCode, Json<StartTurnResponse>), ApiError> {
let turn = state
.runtime_threads
.start_turn(&id, req)
.await
.map_err(map_thread_err)?;
let thread = state
.runtime_threads
.get_thread(&id)
.await
.map_err(map_thread_err)?;
Ok((
StatusCode::CREATED,
Json(StartTurnResponse { thread, turn }),
))
}
async fn steer_thread_turn(
State(state): State<RuntimeApiState>,
Path((id, turn_id)): Path<(String, String)>,
Json(req): Json<SteerTurnRequest>,
) -> Result<Json<TurnRecord>, ApiError> {
let turn = state
.runtime_threads
.steer_turn(&id, &turn_id, req)
.await
.map_err(map_thread_err)?;
Ok(Json(turn))
}
async fn interrupt_thread_turn(
State(state): State<RuntimeApiState>,
Path((id, turn_id)): Path<(String, String)>,
) -> Result<Json<TurnRecord>, ApiError> {
let turn = state
.runtime_threads
.interrupt_turn(&id, &turn_id)
.await
.map_err(map_thread_err)?;
Ok(Json(turn))
}
async fn compact_thread(
State(state): State<RuntimeApiState>,
Path(id): Path<String>,
Json(req): Json<CompactThreadRequest>,
) -> Result<(StatusCode, Json<StartTurnResponse>), ApiError> {
let turn = state
.runtime_threads
.compact_thread(&id, req)
.await
.map_err(map_thread_err)?;
let thread = state
.runtime_threads
.get_thread(&id)
.await
.map_err(map_thread_err)?;
Ok((
StatusCode::ACCEPTED,
Json(StartTurnResponse { thread, turn }),
))
}
async fn list_tasks(
State(state): State<RuntimeApiState>,
Query(query): Query<TasksQuery>,
) -> Result<Json<TasksResponse>, ApiError> {
let tasks = state.task_manager.list_tasks(query.limit).await;
let counts = state.task_manager.counts().await;
Ok(Json(TasksResponse { tasks, counts }))
}
async fn get_task(
State(state): State<RuntimeApiState>,
Path(id): Path<String>,
) -> Result<Json<TaskRecord>, ApiError> {
let task = state
.task_manager
.get_task(&id)
.await
.map_err(map_task_err)?;
Ok(Json(task))
}
async fn cancel_task(
State(state): State<RuntimeApiState>,
Path(id): Path<String>,
) -> Result<Json<TaskRecord>, ApiError> {
let task = state
.task_manager
.cancel_task(&id)
.await
.map_err(map_task_err)?;
Ok(Json(task))
}
async fn stream_thread_events(
State(state): State<RuntimeApiState>,
Path(id): Path<String>,
Query(query): Query<ThreadEventsQuery>,
) -> Result<Sse<impl futures_util::Stream<Item = Result<SseEvent, Infallible>>>, ApiError> {
let _ = state
.runtime_threads
.get_thread(&id)
.await
.map_err(map_thread_err)?;
let backlog = state
.runtime_threads
.events_since(&id, query.since_seq)
.map_err(|e| ApiError::internal(e.to_string()))?;
let mut last_seq = query.since_seq.unwrap_or(0);
if let Some(last) = backlog.last() {
last_seq = last.seq;
}
let mut live = state.runtime_threads.subscribe_events();
let thread_id = id.clone();
let stream = stream! {
for event in backlog {
let event_name = event.event.clone();
yield Ok(sse_json(&event_name, runtime_event_payload(event)));
}
loop {
let incoming = live.recv().await;
let Ok(event) = incoming else {
break;
};
if event.thread_id != thread_id {
continue;
}
if event.seq <= last_seq {
continue;
}
last_seq = event.seq;
let event_name = event.event.clone();
yield Ok(sse_json(&event_name, runtime_event_payload(event)));
}
};
Ok(Sse::new(stream).keep_alive(
KeepAlive::new()
.interval(Duration::from_secs(15))
.text("keepalive"),
))
}
async fn stream_turn(
State(state): State<RuntimeApiState>,
Json(req): Json<StreamTurnRequest>,
) -> Result<Sse<impl futures_util::Stream<Item = Result<SseEvent, Infallible>>>, ApiError> {
if req.prompt.trim().is_empty() {
return Err(ApiError::bad_request("prompt is required"));
}
let model = req.model.clone().unwrap_or_else(|| {
state
.config
.default_text_model
.clone()
.unwrap_or_else(|| DEFAULT_TEXT_MODEL.to_string())
});
let workspace = req
.workspace
.clone()
.unwrap_or_else(|| state.workspace.clone());
let mode = req.mode.clone().unwrap_or_else(|| "agent".to_string());
let allow_shell = req.allow_shell.unwrap_or(state.config.allow_shell());
let trust_mode = req.trust_mode.unwrap_or(false);
let auto_approve = req.auto_approve.unwrap_or(false);
let prompt = req.prompt;
let thread = state
.runtime_threads
.create_thread(CreateThreadRequest {
model: Some(model.clone()),
workspace: Some(workspace.clone()),
mode: Some(mode.clone()),
allow_shell: Some(allow_shell),
trust_mode: Some(trust_mode),
auto_approve: Some(auto_approve),
archived: true,
system_prompt: None,
})
.await
.map_err(|e| ApiError::internal(format!("Failed to create stream thread: {e}")))?;
let turn = state
.runtime_threads
.start_turn(
&thread.id,
StartTurnRequest {
prompt,
input_summary: None,
model: Some(model.clone()),
mode: Some(mode.clone()),
allow_shell: Some(allow_shell),
trust_mode: Some(trust_mode),
auto_approve: Some(auto_approve),
},
)
.await
.map_err(|e| ApiError::internal(format!("Failed to start stream turn: {e}")))?;
let backlog = state
.runtime_threads
.events_since(&thread.id, None)
.map_err(|e| ApiError::internal(format!("Failed to load stream backlog: {e}")))?;
let mut live = state.runtime_threads.subscribe_events();
let thread_id = thread.id.clone();
let turn_id = turn.id.clone();
let stream = stream! {
yield Ok(sse_json("turn.started", json!({
"thread_id": thread.id,
"turn_id": turn.id,
"model": model,
"mode": mode,
"workspace": workspace,
})));
for event in backlog {
if event.thread_id != thread_id || event.turn_id.as_deref() != Some(&turn_id) {
continue;
}
if let Some(mapped) = map_compat_stream_event(&event) {
yield Ok(mapped);
}
if event.event == "turn.completed" {
yield Ok(sse_json("done", json!({})));
return;
}
}
loop {
let incoming = live.recv().await;
let Ok(event) = incoming else {
yield Ok(sse_json("error", json!({ "message": "event channel closed" })));
break;
};
if event.thread_id != thread_id || event.turn_id.as_deref() != Some(&turn_id) {
continue;
}
if let Some(mapped) = map_compat_stream_event(&event) {
yield Ok(mapped);
}
if event.event == "turn.completed" {
break;
}
}
yield Ok(sse_json("done", json!({})));
};
Ok(Sse::new(stream).keep_alive(
KeepAlive::new()
.interval(Duration::from_secs(15))
.text("keepalive"),
))
}
fn runtime_event_payload(event: crate::runtime_threads::RuntimeEventRecord) -> serde_json::Value {
json!({
"seq": event.seq,
"timestamp": event.timestamp,
"thread_id": event.thread_id,
"turn_id": event.turn_id,
"item_id": event.item_id,
"event": event.event,
"payload": event.payload,
})
}
fn map_compat_stream_event(event: &crate::runtime_threads::RuntimeEventRecord) -> Option<SseEvent> {
let payload = &event.payload;
match event.event.as_str() {
"item.delta" => {
let kind = payload
.get("kind")
.and_then(|v| v.as_str())
.unwrap_or_default();
if kind == "agent_message" {
let content = payload
.get("delta")
.and_then(|v| v.as_str())
.unwrap_or_default();
Some(sse_json("message.delta", json!({ "content": content })))
} else if kind == "tool_call" {
let output = payload
.get("delta")
.and_then(|v| v.as_str())
.unwrap_or_default();
Some(sse_json("tool.progress", json!({ "output": output })))
} else {
None
}
}
"item.started" => {
let tool = payload.get("tool")?;
let id = tool.get("id").cloned().unwrap_or(Value::Null);
let name = tool.get("name").cloned().unwrap_or(Value::Null);
let input = tool.get("input").cloned().unwrap_or(Value::Null);
Some(sse_json(
"tool.started",
json!({
"id": id,
"name": name,
"input": input,
}),
))
}
"item.completed" | "item.failed" => {
let item = payload.get("item")?;
let kind = item
.get("kind")
.and_then(|v| v.as_str())
.unwrap_or_default();
if kind == "tool_call" || kind == "file_change" || kind == "command_execution" {
let id = item.get("id").cloned().unwrap_or(Value::Null);
let success = event.event == "item.completed";
let output = item.get("detail").cloned().unwrap_or_else(|| {
Value::String(
item.get("summary")
.and_then(|v| v.as_str())
.unwrap_or_default()
.to_string(),
)
});
Some(sse_json(
"tool.completed",
json!({
"id": id,
"success": success,
"output": output,
}),
))
} else if kind == "status" {
let message = item
.get("detail")
.and_then(|v| v.as_str())
.or_else(|| item.get("summary").and_then(|v| v.as_str()))
.unwrap_or_default();
Some(sse_json("status", json!({ "message": message })))
} else if kind == "error" {
let message = item
.get("detail")
.and_then(|v| v.as_str())
.or_else(|| item.get("summary").and_then(|v| v.as_str()))
.unwrap_or_default();
Some(sse_json("error", json!({ "message": message })))
} else {
None
}
}
"approval.required" => Some(sse_json("approval.required", payload.clone())),
"sandbox.denied" => Some(sse_json("sandbox.denied", payload.clone())),
"turn.completed" => {
let usage = payload
.get("turn")
.and_then(|turn| turn.get("usage"))
.cloned()
.unwrap_or(json!(null));
Some(sse_json("turn.completed", json!({ "usage": usage })))
}
_ => None,
}
}
fn sse_json(event: &str, payload: serde_json::Value) -> SseEvent {
let data = serde_json::to_string(&payload).unwrap_or_else(|_| "{}".to_string());
SseEvent::default().event(event).data(data)
}
fn truncate_text(text: &str, max_chars: usize) -> String {
let char_count = text.chars().count();
if char_count <= max_chars {
return text.to_string();
}
let truncated: String = text.chars().take(max_chars.saturating_sub(3)).collect();
format!("{truncated}...")
}
fn collect_workspace_status(workspace: &std::path::Path) -> WorkspaceStatusResponse {
let mut status = WorkspaceStatusResponse {
workspace: workspace.to_path_buf(),
git_repo: false,
branch: None,
staged: 0,
unstaged: 0,
untracked: 0,
ahead: None,
behind: None,
};
let Some(repo_check) = run_git(workspace, &["rev-parse", "--is-inside-work-tree"]) else {
return status;
};
if repo_check.trim() != "true" {
return status;
}
status.git_repo = true;
status.branch = run_git(workspace, &["rev-parse", "--abbrev-ref", "HEAD"])
.map(|s| s.trim().to_string())
.filter(|s| !s.is_empty());
if let Some(porcelain) = run_git(workspace, &["status", "--porcelain=v1"]) {
for line in porcelain.lines() {
if line.starts_with("??") {
status.untracked += 1;
continue;
}
let chars: Vec<char> = line.chars().collect();
if chars.len() >= 2 {
if chars[0] != ' ' {
status.staged += 1;
}
if chars[1] != ' ' {
status.unstaged += 1;
}
}
}
}
if let Some(counts) = run_git(
workspace,
&["rev-list", "--left-right", "--count", "@{upstream}...HEAD"],
) {
let mut parts = counts.split_whitespace();
if let (Some(behind), Some(ahead)) = (parts.next(), parts.next()) {
status.behind = behind.parse::<u32>().ok();
status.ahead = ahead.parse::<u32>().ok();
}
}
status
}
fn run_git(workspace: &std::path::Path, args: &[&str]) -> Option<String> {
let output = Command::new("git")
.args(args)
.current_dir(workspace)
.output()
.ok()?;
if !output.status.success() {
return None;
}
String::from_utf8(output.stdout).ok()
}
fn resolve_skills_dir(config: &Config, workspace: &std::path::Path) -> PathBuf {
let agents_skills = workspace.join(".agents").join("skills");
if agents_skills.exists() {
return agents_skills;
}
let local_skills = workspace.join("skills");
if local_skills.exists() {
return local_skills;
}
config.skills_dir()
}
fn load_mcp_config_or_default(path: &std::path::Path) -> Result<McpConfig, ApiError> {
if !path.exists() {
return Ok(McpConfig::default());
}
let raw = fs::read_to_string(path).map_err(|e| {
ApiError::internal(format!("Failed to read MCP config {}: {e}", path.display()))
})?;
serde_json::from_str::<McpConfig>(&raw).map_err(|e| {
ApiError::internal(format!(
"Failed to parse MCP config {}: {e}",
path.display()
))
})
}
fn cors_layer() -> CorsLayer {
CorsLayer::new()
.allow_origin([
HeaderValue::from_static("http://localhost:3000"),
HeaderValue::from_static("http://127.0.0.1:3000"),
HeaderValue::from_static("http://localhost:1420"),
HeaderValue::from_static("http://127.0.0.1:1420"),
HeaderValue::from_static("tauri://localhost"),
])
.allow_methods([
Method::GET,
Method::POST,
Method::PATCH,
Method::DELETE,
Method::OPTIONS,
])
.allow_headers(Any)
}
fn map_task_err(err: anyhow::Error) -> ApiError {
let message = err.to_string();
if message.contains("not found") {
ApiError::not_found(message)
} else {
ApiError::bad_request(message)
}
}
fn map_automation_err(err: anyhow::Error) -> ApiError {
let message = err.to_string();
if message.contains("Failed to read automation")
|| message.contains("No such file or directory")
{
ApiError::not_found(message)
} else {
ApiError::bad_request(message)
}
}
fn map_thread_err(err: anyhow::Error) -> ApiError {
let message = err.to_string();
if message.contains("not found") {
ApiError::not_found(message)
} else if message.contains("already has an active turn")
|| message.contains("No active turn")
|| message.contains("is not active")
{
ApiError {
status: StatusCode::CONFLICT,
message,
}
} else {
ApiError::bad_request(message)
}
}
#[derive(Debug, Clone)]
struct ApiError {
status: StatusCode,
message: String,
}
impl ApiError {
fn bad_request(message: impl Into<String>) -> Self {
Self {
status: StatusCode::BAD_REQUEST,
message: message.into(),
}
}
fn not_found(message: impl Into<String>) -> Self {
Self {
status: StatusCode::NOT_FOUND,
message: message.into(),
}
}
fn internal(message: impl Into<String>) -> Self {
Self {
status: StatusCode::INTERNAL_SERVER_ERROR,
message: message.into(),
}
}
}
impl IntoResponse for ApiError {
fn into_response(self) -> Response {
(
self.status,
Json(json!({
"error": {
"message": self.message,
"status": self.status.as_u16(),
}
})),
)
.into_response()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::core::events::{Event as EngineEvent, TurnOutcomeStatus};
use crate::core::ops::Op;
use crate::models::Usage;
use crate::runtime_threads::RuntimeEventRecord;
use anyhow::{Context, bail};
use futures_util::StreamExt;
use std::fs;
use std::sync::Arc;
use tokio::sync::{Mutex, mpsc};
use tokio::time::sleep;
use uuid::Uuid;
struct MockExecutor;
#[async_trait::async_trait]
impl crate::task_manager::TaskExecutor for MockExecutor {
async fn execute(
&self,
_task: crate::task_manager::ExecutionTask,
events: mpsc::UnboundedSender<crate::task_manager::TaskExecutionEvent>,
cancel: tokio_util::sync::CancellationToken,
) -> crate::task_manager::TaskExecutionResult {
let _ = events.send(crate::task_manager::TaskExecutionEvent::Status {
message: "started".to_string(),
});
sleep(Duration::from_millis(100)).await;
if cancel.is_cancelled() {
return crate::task_manager::TaskExecutionResult {
status: crate::task_manager::TaskStatus::Canceled,
result_text: None,
error: None,
};
}
crate::task_manager::TaskExecutionResult {
status: crate::task_manager::TaskStatus::Completed,
result_text: Some("ok".to_string()),
error: None,
}
}
}
async fn spawn_test_server_with_root(
root: PathBuf,
sessions_dir: PathBuf,
) -> Result<
Option<(
SocketAddr,
SharedRuntimeThreadManager,
tokio::task::JoinHandle<()>,
)>,
> {
fs::create_dir_all(&sessions_dir)?;
let manager = TaskManager::start_with_executor(
TaskManagerConfig {
data_dir: root.join("tasks"),
worker_count: 1,
default_workspace: PathBuf::from("."),
default_model: DEFAULT_TEXT_MODEL.to_string(),
default_mode: "agent".to_string(),
allow_shell: false,
trust_mode: false,
max_subagents: 2,
},
Arc::new(MockExecutor),
)
.await?;
let mut config = Config::default();
config.capacity = Some(crate::config::CapacityConfig {
enabled: Some(false),
low_risk_max: None,
medium_risk_max: None,
severe_min_slack: None,
severe_violation_ratio: None,
refresh_cooldown_turns: None,
replan_cooldown_turns: None,
max_replay_per_turn: None,
min_turns_before_guardrail: None,
profile_window: None,
deepseek_v3_2_chat_prior: None,
deepseek_v3_2_reasoner_prior: None,
deepseek_v4_pro_prior: None,
deepseek_v4_flash_prior: None,
fallback_default_prior: None,
});
let runtime_threads: SharedRuntimeThreadManager = Arc::new(RuntimeThreadManager::open(
config,
PathBuf::from("."),
RuntimeThreadManagerConfig::from_task_data_dir(root.join("runtime")),
)?);
let state = RuntimeApiState {
config: Config::default(),
workspace: PathBuf::from("."),
task_manager: manager,
runtime_threads: runtime_threads.clone(),
sessions_dir,
mcp_config_path: root.join("mcp.json"),
automations: Arc::new(Mutex::new(AutomationManager::open(
root.join("automations"),
)?)),
};
let app = build_router(state);
let listener = match TcpListener::bind("127.0.0.1:0").await {
Ok(listener) => listener,
Err(err) if err.kind() == std::io::ErrorKind::PermissionDenied => return Ok(None),
Err(err) => return Err(err.into()),
};
let addr = listener.local_addr()?;
let handle = tokio::spawn(async move {
let _ = axum::serve(listener, app).await;
});
Ok(Some((addr, runtime_threads, handle)))
}
async fn spawn_test_server() -> Result<
Option<(
SocketAddr,
SharedRuntimeThreadManager,
tokio::task::JoinHandle<()>,
)>,
> {
let root = std::env::temp_dir().join(format!("deepseek-runtime-api-{}", Uuid::new_v4()));
let sessions_dir = root.join("sessions");
spawn_test_server_with_root(root, sessions_dir).await
}
async fn read_first_sse_frame(resp: reqwest::Response) -> Result<String> {
let mut stream = resp.bytes_stream();
let mut buf = Vec::new();
loop {
let next = tokio::time::timeout(Duration::from_secs(2), stream.next())
.await
.context("timed out waiting for SSE frame")?
.context("SSE stream ended unexpectedly")??;
buf.extend_from_slice(&next);
let text = String::from_utf8_lossy(&buf);
if let Some(idx) = text.find("\n\n").or_else(|| text.find("\r\n\r\n")) {
return Ok(text[..idx].to_string());
}
if buf.len() > 64 * 1024 {
bail!("SSE frame exceeded 64KB without delimiter");
}
}
}
fn parse_sse_frame(frame: &str) -> Result<(String, serde_json::Value)> {
let mut event_name: Option<String> = None;
let mut data_lines = Vec::new();
for line in frame.lines() {
if let Some(rest) = line.strip_prefix("event:") {
event_name = Some(rest.trim().to_string());
} else if let Some(rest) = line.strip_prefix("data:") {
data_lines.push(rest.trim_start().to_string());
}
}
let event_name = event_name.context("missing SSE event field")?;
let payload = if data_lines.is_empty() {
json!({})
} else {
serde_json::from_str(&data_lines.join("\n"))
.with_context(|| format!("invalid SSE data payload: {}", data_lines.join("\n")))?
};
Ok((event_name, payload))
}
async fn wait_for_terminal_turn_status(
client: &reqwest::Client,
addr: SocketAddr,
thread_id: &str,
turn_id: &str,
timeout: Duration,
) -> Result<String> {
let deadline = tokio::time::Instant::now() + timeout;
loop {
let detail: serde_json::Value = client
.get(format!("http://{addr}/v1/threads/{thread_id}"))
.send()
.await?
.error_for_status()?
.json()
.await?;
let status = detail["turns"]
.as_array()
.and_then(|turns| turns.iter().find(|turn| turn["id"] == turn_id))
.and_then(|turn| turn.get("status"))
.and_then(Value::as_str)
.unwrap_or_default()
.to_string();
if matches!(
status.as_str(),
"completed" | "failed" | "interrupted" | "canceled"
) {
return Ok(status);
}
if tokio::time::Instant::now() >= deadline {
bail!("timed out waiting for terminal turn status for {turn_id}");
}
sleep(Duration::from_millis(25)).await;
}
}
#[tokio::test]
async fn health_and_tasks_endpoints_work() -> Result<()> {
let Some((addr, _runtime_threads, handle)) = spawn_test_server().await? else {
return Ok(());
};
let client = reqwest::Client::new();
let health: serde_json::Value = client
.get(format!("http://{addr}/health"))
.send()
.await?
.error_for_status()?
.json()
.await?;
assert_eq!(health["status"], "ok");
let created: serde_json::Value = client
.post(format!("http://{addr}/v1/tasks"))
.json(&json!({ "prompt": "hello task" }))
.send()
.await?
.error_for_status()?
.json()
.await?;
let id = created["id"].as_str().expect("task id").to_string();
let listed: serde_json::Value = client
.get(format!("http://{addr}/v1/tasks"))
.send()
.await?
.error_for_status()?
.json()
.await?;
assert!(
listed["tasks"]
.as_array()
.is_some_and(|tasks| !tasks.is_empty())
);
let detail: serde_json::Value = client
.get(format!("http://{addr}/v1/tasks/{id}"))
.send()
.await?
.error_for_status()?
.json()
.await?;
assert_eq!(detail["id"], id);
let _cancelled: serde_json::Value = client
.post(format!("http://{addr}/v1/tasks/{id}/cancel"))
.send()
.await?
.error_for_status()?
.json()
.await?;
handle.abort();
Ok(())
}
#[tokio::test]
async fn workspace_and_automation_endpoints_work() -> Result<()> {
let Some((addr, _runtime_threads, handle)) = spawn_test_server().await? else {
return Ok(());
};
let client = reqwest::Client::new();
let workspace: serde_json::Value = client
.get(format!("http://{addr}/v1/workspace/status"))
.send()
.await?
.error_for_status()?
.json()
.await?;
assert!(workspace.get("workspace").is_some());
let created: serde_json::Value = client
.post(format!("http://{addr}/v1/automations"))
.json(&json!({
"name": "Smoke automation",
"prompt": "automation smoke test",
"rrule": "FREQ=HOURLY;INTERVAL=2",
"status": "active"
}))
.send()
.await?
.error_for_status()?
.json()
.await?;
let automation_id = created["id"]
.as_str()
.context("missing automation id")?
.to_string();
let listed: serde_json::Value = client
.get(format!("http://{addr}/v1/automations"))
.send()
.await?
.error_for_status()?
.json()
.await?;
assert!(
listed
.as_array()
.is_some_and(|items| items.iter().any(|item| item["id"] == automation_id))
);
let run_now: serde_json::Value = client
.post(format!("http://{addr}/v1/automations/{automation_id}/run"))
.send()
.await?
.error_for_status()?
.json()
.await?;
assert_eq!(run_now["automation_id"], automation_id);
let paused: serde_json::Value = client
.post(format!(
"http://{addr}/v1/automations/{automation_id}/pause"
))
.send()
.await?
.error_for_status()?
.json()
.await?;
assert_eq!(paused["status"], "paused");
let resumed: serde_json::Value = client
.post(format!(
"http://{addr}/v1/automations/{automation_id}/resume"
))
.send()
.await?
.error_for_status()?
.json()
.await?;
assert_eq!(resumed["status"], "active");
let updated: serde_json::Value = client
.patch(format!("http://{addr}/v1/automations/{automation_id}"))
.json(&json!({
"name": "Smoke automation edited",
"rrule": "FREQ=WEEKLY;BYDAY=MO,WE;BYHOUR=10;BYMINUTE=15"
}))
.send()
.await?
.error_for_status()?
.json()
.await?;
assert_eq!(updated["name"], "Smoke automation edited");
let runs: serde_json::Value = client
.get(format!(
"http://{addr}/v1/automations/{automation_id}/runs?limit=5"
))
.send()
.await?
.error_for_status()?
.json()
.await?;
assert!(
runs.as_array().is_some_and(|items| !items.is_empty()),
"expected at least one run entry"
);
let _deleted: serde_json::Value = client
.delete(format!("http://{addr}/v1/automations/{automation_id}"))
.send()
.await?
.error_for_status()?
.json()
.await?;
let missing_status = client
.get(format!("http://{addr}/v1/automations/{automation_id}"))
.send()
.await?
.status();
assert_eq!(missing_status, StatusCode::NOT_FOUND);
handle.abort();
Ok(())
}
#[tokio::test]
async fn stream_requires_prompt() -> Result<()> {
let Some((addr, _runtime_threads, handle)) = spawn_test_server().await? else {
return Ok(());
};
let client = reqwest::Client::new();
let resp = client
.post(format!("http://{addr}/v1/stream"))
.json(&json!({ "prompt": "" }))
.send()
.await?;
assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
handle.abort();
Ok(())
}
#[tokio::test]
async fn thread_endpoints_expose_lifecycle_contract() -> Result<()> {
let Some((addr, runtime_threads, handle)) = spawn_test_server().await? else {
return Ok(());
};
let client = reqwest::Client::new();
let created: serde_json::Value = client
.post(format!("http://{addr}/v1/threads"))
.json(&json!({}))
.send()
.await?
.error_for_status()?
.json()
.await?;
let thread_id = created["id"]
.as_str()
.context("missing thread id")?
.to_string();
let archived: serde_json::Value = client
.patch(format!("http://{addr}/v1/threads/{thread_id}"))
.json(&json!({ "archived": true }))
.send()
.await?
.error_for_status()?
.json()
.await?;
assert_eq!(archived["id"], thread_id);
assert_eq!(archived["archived"], true);
let listed: serde_json::Value = client
.get(format!("http://{addr}/v1/threads"))
.send()
.await?
.error_for_status()?
.json()
.await?;
assert!(
listed
.as_array()
.is_some_and(|threads| threads.iter().all(|t| t["id"] != thread_id))
);
let listed_all: serde_json::Value = client
.get(format!(
"http://{addr}/v1/threads/summary?include_archived=true&limit=100"
))
.send()
.await?
.error_for_status()?
.json()
.await?;
assert!(
listed_all
.as_array()
.is_some_and(|threads| threads.iter().any(|t| t["id"] == thread_id))
);
let unarchived: serde_json::Value = client
.patch(format!("http://{addr}/v1/threads/{thread_id}"))
.json(&json!({ "archived": false }))
.send()
.await?
.error_for_status()?
.json()
.await?;
assert_eq!(unarchived["archived"], false);
let invalid_patch = client
.patch(format!("http://{addr}/v1/threads/{thread_id}"))
.json(&json!({}))
.send()
.await?;
assert_eq!(invalid_patch.status(), StatusCode::BAD_REQUEST);
let missing_patch = client
.patch(format!("http://{addr}/v1/threads/thr_missing"))
.json(&json!({ "archived": true }))
.send()
.await?;
assert_eq!(missing_patch.status(), StatusCode::NOT_FOUND);
let detail: serde_json::Value = client
.get(format!("http://{addr}/v1/threads/{thread_id}"))
.send()
.await?
.error_for_status()?
.json()
.await?;
assert_eq!(detail["thread"]["id"], thread_id);
let resumed: serde_json::Value = client
.post(format!("http://{addr}/v1/threads/{thread_id}/resume"))
.send()
.await?
.error_for_status()?
.json()
.await?;
assert_eq!(resumed["id"], thread_id);
let forked: serde_json::Value = client
.post(format!("http://{addr}/v1/threads/{thread_id}/fork"))
.send()
.await?
.error_for_status()?
.json()
.await?;
let forked_id = forked["id"].as_str().context("missing forked id")?;
assert_ne!(forked_id, thread_id);
let harness = crate::core::engine::mock_engine_handle();
runtime_threads
.install_test_engine(&thread_id, harness.handle.clone())
.await?;
let mut rx_op = harness.rx_op;
let tx_event = harness.tx_event;
tokio::spawn(async move {
while let Some(op) = rx_op.recv().await {
match op {
Op::SendMessage { .. } => {
let _ = tx_event
.send(EngineEvent::TurnStarted {
turn_id: "mock_lifecycle".to_string(),
})
.await;
let _ = tx_event
.send(EngineEvent::MessageStarted { index: 0 })
.await;
let _ = tx_event
.send(EngineEvent::MessageDelta {
index: 0,
content: "mock reply".to_string(),
})
.await;
let _ = tx_event
.send(EngineEvent::MessageComplete { index: 0 })
.await;
let _ = tx_event
.send(EngineEvent::TurnComplete {
usage: Usage {
input_tokens: 10,
output_tokens: 5,
..Usage::default()
},
status: TurnOutcomeStatus::Completed,
error: None,
})
.await;
}
Op::CompactContext => {
let _ = tx_event
.send(EngineEvent::TurnComplete {
usage: Usage {
input_tokens: 0,
output_tokens: 0,
..Usage::default()
},
status: TurnOutcomeStatus::Completed,
error: None,
})
.await;
}
_ => {}
}
}
});
let turn_start: serde_json::Value = client
.post(format!("http://{addr}/v1/threads/{thread_id}/turns"))
.json(&json!({ "prompt": "thread endpoint test" }))
.send()
.await?
.error_for_status()?
.json()
.await?;
let turn_id = turn_start["turn"]["id"]
.as_str()
.context("missing turn id")?
.to_string();
let _ = wait_for_terminal_turn_status(
&client,
addr,
&thread_id,
&turn_id,
Duration::from_secs(2),
)
.await?;
let steer_resp = client
.post(format!(
"http://{addr}/v1/threads/{thread_id}/turns/{turn_id}/steer"
))
.json(&json!({ "prompt": "late steer" }))
.send()
.await?;
assert_eq!(steer_resp.status(), StatusCode::CONFLICT);
let interrupt_resp = client
.post(format!(
"http://{addr}/v1/threads/{thread_id}/turns/{turn_id}/interrupt"
))
.send()
.await?;
assert_eq!(interrupt_resp.status(), StatusCode::CONFLICT);
let compact_start: serde_json::Value = client
.post(format!("http://{addr}/v1/threads/{thread_id}/compact"))
.json(&json!({ "reason": "test manual compact" }))
.send()
.await?
.error_for_status()?
.json()
.await?;
assert_eq!(compact_start["thread"]["id"], thread_id);
let events_resp = client
.get(format!(
"http://{addr}/v1/threads/{thread_id}/events?since_seq=0"
))
.send()
.await?
.error_for_status()?;
let content_type = events_resp
.headers()
.get(reqwest::header::CONTENT_TYPE)
.and_then(|v| v.to_str().ok())
.unwrap_or_default()
.to_string();
assert!(content_type.starts_with("text/event-stream"));
let chunk_text = read_first_sse_frame(events_resp).await?;
assert!(
chunk_text.contains("event:"),
"expected SSE event chunk, got: {chunk_text}"
);
handle.abort();
Ok(())
}
#[tokio::test]
async fn events_endpoint_respects_since_seq_cursor() -> Result<()> {
let Some((addr, runtime_threads, handle)) = spawn_test_server().await? else {
return Ok(());
};
let client = reqwest::Client::new();
let created: serde_json::Value = client
.post(format!("http://{addr}/v1/threads"))
.json(&json!({}))
.send()
.await?
.error_for_status()?
.json()
.await?;
let thread_id = created["id"]
.as_str()
.context("missing thread id")?
.to_string();
let harness = crate::core::engine::mock_engine_handle();
runtime_threads
.install_test_engine(&thread_id, harness.handle.clone())
.await?;
let mut rx_op = harness.rx_op;
let tx_event = harness.tx_event;
tokio::spawn(async move {
if !matches!(rx_op.recv().await, Some(Op::SendMessage { .. })) {
return;
}
let _ = tx_event
.send(EngineEvent::TurnStarted {
turn_id: "mock_cursor".to_string(),
})
.await;
let _ = tx_event
.send(EngineEvent::MessageStarted { index: 0 })
.await;
let _ = tx_event
.send(EngineEvent::MessageComplete { index: 0 })
.await;
let _ = tx_event
.send(EngineEvent::TurnComplete {
usage: Usage {
input_tokens: 5,
output_tokens: 3,
..Usage::default()
},
status: TurnOutcomeStatus::Completed,
error: None,
})
.await;
});
let started: serde_json::Value = client
.post(format!("http://{addr}/v1/threads/{thread_id}/turns"))
.json(&json!({ "prompt": "cursor replay test" }))
.send()
.await?
.error_for_status()?
.json()
.await?;
let turn_id = started["turn"]["id"]
.as_str()
.context("missing turn id")?
.to_string();
let _ = wait_for_terminal_turn_status(
&client,
addr,
&thread_id,
&turn_id,
Duration::from_secs(2),
)
.await?;
let resp_a = client
.get(format!(
"http://{addr}/v1/threads/{thread_id}/events?since_seq=0"
))
.send()
.await?
.error_for_status()?;
let frame_a = read_first_sse_frame(resp_a).await?;
let (_event_a, payload_a) = parse_sse_frame(&frame_a)?;
let seq_a = payload_a
.get("seq")
.and_then(Value::as_u64)
.context("missing seq in first replay frame")?;
let resp_b = client
.get(format!(
"http://{addr}/v1/threads/{thread_id}/events?since_seq={seq_a}"
))
.send()
.await?
.error_for_status()?;
let frame_b = read_first_sse_frame(resp_b).await?;
let (_event_b, payload_b) = parse_sse_frame(&frame_b)?;
let seq_b = payload_b
.get("seq")
.and_then(Value::as_u64)
.context("missing seq in second replay frame")?;
assert!(
seq_b > seq_a,
"expected seq after cursor: {seq_b} <= {seq_a}"
);
assert_eq!(payload_b["thread_id"], thread_id);
handle.abort();
Ok(())
}
#[tokio::test]
async fn steer_and_interrupt_endpoints_work_on_active_turn() -> Result<()> {
let Some((addr, runtime_threads, handle)) = spawn_test_server().await? else {
return Ok(());
};
let client = reqwest::Client::new();
let created: serde_json::Value = client
.post(format!("http://{addr}/v1/threads"))
.json(&json!({}))
.send()
.await?
.error_for_status()?
.json()
.await?;
let thread_id = created["id"]
.as_str()
.context("missing thread id")?
.to_string();
let harness = crate::core::engine::mock_engine_handle();
runtime_threads
.install_test_engine(&thread_id, harness.handle.clone())
.await?;
let mut rx_op = harness.rx_op;
let mut rx_steer = harness.rx_steer;
let tx_event = harness.tx_event;
let cancel_token = harness.cancel_token;
tokio::spawn(async move {
if !matches!(rx_op.recv().await, Some(Op::SendMessage { .. })) {
return;
}
let _ = tx_event
.send(EngineEvent::TurnStarted {
turn_id: "engine_turn_api".to_string(),
})
.await;
let _ = tx_event
.send(EngineEvent::MessageStarted { index: 0 })
.await;
if let Some(steer_text) = rx_steer.recv().await {
let _ = tx_event
.send(EngineEvent::MessageDelta {
index: 0,
content: format!("steer:{steer_text}"),
})
.await;
}
cancel_token.cancelled().await;
sleep(Duration::from_millis(60)).await;
let _ = tx_event
.send(EngineEvent::TurnComplete {
usage: Usage {
input_tokens: 2,
output_tokens: 1,
..Usage::default()
},
status: TurnOutcomeStatus::Completed,
error: None,
})
.await;
});
let turn_start: serde_json::Value = client
.post(format!("http://{addr}/v1/threads/{thread_id}/turns"))
.json(&json!({ "prompt": "active controls" }))
.send()
.await?
.error_for_status()?
.json()
.await?;
let turn_id = turn_start["turn"]["id"]
.as_str()
.context("missing turn id")?
.to_string();
let steer_resp: serde_json::Value = client
.post(format!(
"http://{addr}/v1/threads/{thread_id}/turns/{turn_id}/steer"
))
.json(&json!({ "prompt": "please steer" }))
.send()
.await?
.error_for_status()?
.json()
.await?;
assert_eq!(steer_resp["id"], turn_id);
assert_eq!(steer_resp["steer_count"], 1);
let interrupt_resp: serde_json::Value = client
.post(format!(
"http://{addr}/v1/threads/{thread_id}/turns/{turn_id}/interrupt"
))
.send()
.await?
.error_for_status()?
.json()
.await?;
assert_eq!(interrupt_resp["id"], turn_id);
let terminal = wait_for_terminal_turn_status(
&client,
addr,
&thread_id,
&turn_id,
Duration::from_secs(3),
)
.await?;
assert_eq!(terminal, "interrupted");
let events = runtime_threads.events_since(&thread_id, None)?;
assert!(events.iter().any(|ev| ev.event == "turn.steered"));
assert!(
events
.iter()
.any(|ev| ev.event == "turn.interrupt_requested")
);
assert!(events.iter().any(|ev| {
ev.event == "turn.completed"
&& ev
.payload
.get("turn")
.and_then(|turn| turn.get("status"))
.and_then(Value::as_str)
== Some("interrupted")
}));
handle.abort();
Ok(())
}
#[tokio::test]
async fn stream_compat_mapping_handles_expected_runtime_events() -> Result<()> {
let agent_delta = RuntimeEventRecord {
schema_version: 1,
seq: 1,
timestamp: chrono::Utc::now(),
thread_id: "thr_test".to_string(),
turn_id: Some("turn_test".to_string()),
item_id: Some("item_test".to_string()),
event: "item.delta".to_string(),
payload: json!({
"kind": "agent_message",
"delta": "hello",
}),
};
let mapped = map_compat_stream_event(&agent_delta).context("missing mapped SSE event")?;
let stream = async_stream::stream! {
yield Ok::<_, Infallible>(mapped);
};
let body =
axum::body::to_bytes(Sse::new(stream).into_response().into_body(), usize::MAX).await?;
let text = String::from_utf8_lossy(&body);
assert!(text.contains("event: message.delta"));
assert!(text.contains("\"content\":\"hello\""));
let tool_start = RuntimeEventRecord {
schema_version: 1,
seq: 2,
timestamp: chrono::Utc::now(),
thread_id: "thr_test".to_string(),
turn_id: Some("turn_test".to_string()),
item_id: Some("item_tool".to_string()),
event: "item.started".to_string(),
payload: json!({
"tool": { "id": "tool_1", "name": "exec_shell", "input": { "cmd": "pwd" } }
}),
};
let mapped = map_compat_stream_event(&tool_start).context("missing tool.started event")?;
let stream = async_stream::stream! {
yield Ok::<_, Infallible>(mapped);
};
let body =
axum::body::to_bytes(Sse::new(stream).into_response().into_body(), usize::MAX).await?;
let text = String::from_utf8_lossy(&body);
assert!(text.contains("event: tool.started"));
let tool_done = RuntimeEventRecord {
schema_version: 1,
seq: 3,
timestamp: chrono::Utc::now(),
thread_id: "thr_test".to_string(),
turn_id: Some("turn_test".to_string()),
item_id: Some("item_tool".to_string()),
event: "item.completed".to_string(),
payload: json!({
"item": {
"id": "item_tool",
"kind": "tool_call",
"summary": "ok",
"detail": "done"
}
}),
};
let mapped = map_compat_stream_event(&tool_done).context("missing tool.completed event")?;
let stream = async_stream::stream! {
yield Ok::<_, Infallible>(mapped);
};
let body =
axum::body::to_bytes(Sse::new(stream).into_response().into_body(), usize::MAX).await?;
let text = String::from_utf8_lossy(&body);
assert!(text.contains("event: tool.completed"));
assert!(text.contains("\"success\":true"));
let unknown = RuntimeEventRecord {
schema_version: 1,
seq: 4,
timestamp: chrono::Utc::now(),
thread_id: "thr_test".to_string(),
turn_id: Some("turn_test".to_string()),
item_id: None,
event: "item.delta".to_string(),
payload: json!({
"kind": "context_compaction",
"delta": "ignored",
}),
};
assert!(map_compat_stream_event(&unknown).is_none());
Ok(())
}
#[tokio::test]
async fn stream_endpoint_remains_backward_compatible() -> Result<()> {
let Some((addr, runtime_threads, handle)) = spawn_test_server().await? else {
return Ok(());
};
let client = reqwest::Client::new();
let created: serde_json::Value = client
.post(format!("http://{addr}/v1/threads"))
.json(&json!({}))
.send()
.await?
.error_for_status()?
.json()
.await?;
let thread_id = created["id"]
.as_str()
.context("missing thread id")?
.to_string();
let harness = crate::core::engine::mock_engine_handle();
runtime_threads
.install_test_engine(&thread_id, harness.handle.clone())
.await?;
let mut rx_op = harness.rx_op;
let tx_event = harness.tx_event;
tokio::spawn(async move {
if !matches!(rx_op.recv().await, Some(Op::SendMessage { .. })) {
return;
}
let _ = tx_event
.send(EngineEvent::TurnStarted {
turn_id: "mock_stream".to_string(),
})
.await;
let _ = tx_event
.send(EngineEvent::MessageStarted { index: 0 })
.await;
let _ = tx_event
.send(EngineEvent::MessageDelta {
index: 0,
content: "streamed".to_string(),
})
.await;
let _ = tx_event
.send(EngineEvent::MessageComplete { index: 0 })
.await;
let _ = tx_event
.send(EngineEvent::TurnComplete {
usage: Usage {
input_tokens: 4,
output_tokens: 2,
..Usage::default()
},
status: TurnOutcomeStatus::Completed,
error: None,
})
.await;
});
let turn_start: serde_json::Value = client
.post(format!("http://{addr}/v1/threads/{thread_id}/turns"))
.json(&json!({ "prompt": "compatibility stream" }))
.send()
.await?
.error_for_status()?
.json()
.await?;
let turn_id = turn_start["turn"]["id"]
.as_str()
.context("missing turn id")?
.to_string();
let _ = wait_for_terminal_turn_status(
&client,
addr,
&thread_id,
&turn_id,
Duration::from_secs(2),
)
.await?;
let events = runtime_threads.events_since(&thread_id, None)?;
assert!(
events.iter().any(|ev| ev.event == "turn.started"),
"expected turn.started event"
);
assert!(
events.iter().any(|ev| ev.event == "turn.completed"),
"expected turn.completed event"
);
let events_resp = client
.get(format!(
"http://{addr}/v1/threads/{thread_id}/events?since_seq=0"
))
.send()
.await?
.error_for_status()?;
let content_type = events_resp
.headers()
.get(reqwest::header::CONTENT_TYPE)
.and_then(|v| v.to_str().ok())
.unwrap_or_default()
.to_string();
assert!(content_type.starts_with("text/event-stream"));
handle.abort();
Ok(())
}
#[tokio::test]
async fn session_get_returns_404_for_missing_id() -> Result<()> {
let Some((addr, _runtime_threads, handle)) = spawn_test_server().await? else {
return Ok(());
};
let client = reqwest::Client::new();
let resp = client
.get(format!("http://{addr}/v1/sessions/nonexistent_id"))
.send()
.await?;
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
handle.abort();
Ok(())
}
#[tokio::test]
async fn session_endpoints_reject_invalid_id() -> Result<()> {
let Some((addr, _runtime_threads, handle)) = spawn_test_server().await? else {
return Ok(());
};
let client = reqwest::Client::new();
let get_resp = client
.get(format!("http://{addr}/v1/sessions/invalid%20id"))
.send()
.await?;
assert_eq!(get_resp.status(), StatusCode::BAD_REQUEST);
let resume_resp = client
.post(format!(
"http://{addr}/v1/sessions/invalid%20id/resume-thread"
))
.json(&json!({}))
.send()
.await?;
assert_eq!(resume_resp.status(), StatusCode::BAD_REQUEST);
let delete_resp = client
.delete(format!("http://{addr}/v1/sessions/invalid%20id"))
.send()
.await?;
assert_eq!(delete_resp.status(), StatusCode::BAD_REQUEST);
handle.abort();
Ok(())
}
#[tokio::test]
async fn session_resume_thread_returns_404_for_missing_session() -> Result<()> {
let Some((addr, _runtime_threads, handle)) = spawn_test_server().await? else {
return Ok(());
};
let client = reqwest::Client::new();
let resp = client
.post(format!(
"http://{addr}/v1/sessions/nonexistent_session/resume-thread"
))
.json(&json!({}))
.send()
.await?;
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
handle.abort();
Ok(())
}
#[tokio::test]
async fn session_resume_thread_creates_thread_from_saved_session() -> Result<()> {
let root = std::env::temp_dir().join(format!("deepseek-session-resume-{}", Uuid::new_v4()));
let sessions_dir = root.join("sessions");
fs::create_dir_all(&sessions_dir)?;
let session_id = "sess_test_resume";
let session = json!({
"schema_version": 1,
"metadata": {
"id": session_id,
"title": "Test resume session",
"created_at": "2025-01-01T00:00:00Z",
"updated_at": "2025-01-01T00:10:00Z",
"message_count": 2,
"total_tokens": 100,
"model": "deepseek-v4-pro",
"workspace": "/tmp/test",
"mode": "agent"
},
"messages": [
{
"role": "user",
"content": [{ "type": "text", "text": "Hello, world!" }]
},
{
"role": "assistant",
"content": [{ "type": "text", "text": "Hello! How can I help you?" }]
}
],
"system_prompt": null
});
fs::write(
sessions_dir.join(format!("{session_id}.json")),
serde_json::to_string_pretty(&session)?,
)?;
let Some((addr, _runtime_threads, handle)) =
spawn_test_server_with_root(root.clone(), sessions_dir.clone()).await?
else {
return Ok(());
};
let client = reqwest::Client::new();
let resp = client
.post(format!(
"http://{addr}/v1/sessions/{session_id}/resume-thread"
))
.json(&json!({ "model": "deepseek-v4-pro" }))
.send()
.await?;
assert_eq!(resp.status(), StatusCode::CREATED);
let resumed: serde_json::Value = resp.json().await?;
assert_eq!(resumed["session_id"], session_id);
assert_eq!(resumed["message_count"], 2);
let thread_id = resumed["thread_id"]
.as_str()
.context("missing resumed thread id")?;
let detail: serde_json::Value = client
.get(format!("http://{addr}/v1/threads/{thread_id}"))
.send()
.await?
.error_for_status()?
.json()
.await?;
assert_eq!(detail["thread"]["id"], thread_id);
assert_eq!(detail["turns"].as_array().map_or(0, Vec::len), 1);
assert_eq!(detail["items"].as_array().map_or(0, Vec::len), 2);
handle.abort();
Ok(())
}
#[tokio::test]
async fn session_delete_returns_404_for_missing_id() -> Result<()> {
let Some((addr, _runtime_threads, handle)) = spawn_test_server().await? else {
return Ok(());
};
let client = reqwest::Client::new();
let resp = client
.delete(format!("http://{addr}/v1/sessions/nonexistent-id"))
.send()
.await?;
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
handle.abort();
Ok(())
}
}