use std::convert::Infallible;
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use axum::{
Json, Router,
extract::{DefaultBodyLimit, Path, Query, State, WebSocketUpgrade},
http::{StatusCode, header},
middleware,
response::{
Html, IntoResponse,
sse::{Event, KeepAlive, Sse},
},
routing::{get, post},
};
use serde::Deserialize;
use tokio::sync::{mpsc, oneshot};
use tokio_stream::StreamExt;
use tower_http::cors::{AllowHeaders, CorsLayer};
use uuid::Uuid;
use crate::agent::SessionManager;
use crate::channels::IncomingMessage;
use crate::channels::web::auth::{AuthState, auth_middleware};
use crate::channels::web::log_layer::LogBroadcaster;
use crate::channels::web::sse::SseManager;
use crate::channels::web::types::*;
use crate::db::Database;
use crate::extensions::ExtensionManager;
use crate::orchestrator::job_manager::ContainerJobManager;
use crate::tools::ToolRegistry;
use crate::workspace::Workspace;
pub type PromptQueue = Arc<
tokio::sync::Mutex<
std::collections::HashMap<
uuid::Uuid,
std::collections::VecDeque<crate::orchestrator::api::PendingPrompt>,
>,
>,
>;
pub struct RateLimiter {
remaining: AtomicU64,
window_start: AtomicU64,
max_requests: u64,
window_secs: u64,
}
impl RateLimiter {
pub fn new(max_requests: u64, window_secs: u64) -> Self {
Self {
remaining: AtomicU64::new(max_requests),
window_start: AtomicU64::new(
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
),
max_requests,
window_secs,
}
}
pub fn check(&self) -> bool {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let window = self.window_start.load(Ordering::Relaxed);
if now.saturating_sub(window) >= self.window_secs {
self.window_start.store(now, Ordering::Relaxed);
self.remaining
.store(self.max_requests - 1, Ordering::Relaxed);
return true;
}
loop {
let current = self.remaining.load(Ordering::Relaxed);
if current == 0 {
return false;
}
if self
.remaining
.compare_exchange_weak(current, current - 1, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
return true;
}
}
}
}
pub struct GatewayState {
pub msg_tx: tokio::sync::RwLock<Option<mpsc::Sender<IncomingMessage>>>,
pub sse: SseManager,
pub workspace: Option<Arc<Workspace>>,
pub session_manager: Option<Arc<SessionManager>>,
pub log_broadcaster: Option<Arc<LogBroadcaster>>,
pub extension_manager: Option<Arc<ExtensionManager>>,
pub tool_registry: Option<Arc<ToolRegistry>>,
pub store: Option<Arc<dyn Database>>,
pub job_manager: Option<Arc<ContainerJobManager>>,
pub prompt_queue: Option<PromptQueue>,
pub user_id: String,
pub shutdown_tx: tokio::sync::RwLock<Option<oneshot::Sender<()>>>,
pub ws_tracker: Option<Arc<crate::channels::web::ws::WsConnectionTracker>>,
pub llm_provider: Option<Arc<dyn crate::llm::LlmProvider>>,
pub chat_rate_limiter: RateLimiter,
}
pub async fn start_server(
addr: SocketAddr,
state: Arc<GatewayState>,
auth_token: String,
) -> Result<SocketAddr, crate::error::ChannelError> {
let listener = tokio::net::TcpListener::bind(addr).await.map_err(|e| {
crate::error::ChannelError::StartupFailed {
name: "gateway".to_string(),
reason: format!("Failed to bind to {}: {}", addr, e),
}
})?;
let bound_addr =
listener
.local_addr()
.map_err(|e| crate::error::ChannelError::StartupFailed {
name: "gateway".to_string(),
reason: format!("Failed to get local addr: {}", e),
})?;
let public = Router::new().route("/api/health", get(health_handler));
let auth_state = AuthState { token: auth_token };
let protected = Router::new()
.route("/api/chat/send", post(chat_send_handler))
.route("/api/chat/approval", post(chat_approval_handler))
.route("/api/chat/auth-token", post(chat_auth_token_handler))
.route("/api/chat/auth-cancel", post(chat_auth_cancel_handler))
.route("/api/chat/events", get(chat_events_handler))
.route("/api/chat/ws", get(chat_ws_handler))
.route("/api/chat/history", get(chat_history_handler))
.route("/api/chat/threads", get(chat_threads_handler))
.route("/api/chat/thread/new", post(chat_new_thread_handler))
.route("/api/memory/tree", get(memory_tree_handler))
.route("/api/memory/list", get(memory_list_handler))
.route("/api/memory/read", get(memory_read_handler))
.route("/api/memory/write", post(memory_write_handler))
.route("/api/memory/search", post(memory_search_handler))
.route("/api/jobs", get(jobs_list_handler))
.route("/api/jobs/summary", get(jobs_summary_handler))
.route("/api/jobs/{id}", get(jobs_detail_handler))
.route("/api/jobs/{id}/cancel", post(jobs_cancel_handler))
.route("/api/jobs/{id}/restart", post(jobs_restart_handler))
.route("/api/jobs/{id}/prompt", post(jobs_prompt_handler))
.route("/api/jobs/{id}/events", get(jobs_events_handler))
.route("/api/jobs/{id}/files/list", get(job_files_list_handler))
.route("/api/jobs/{id}/files/read", get(job_files_read_handler))
.route("/api/logs/events", get(logs_events_handler))
.route("/api/extensions", get(extensions_list_handler))
.route("/api/extensions/tools", get(extensions_tools_handler))
.route("/api/extensions/install", post(extensions_install_handler))
.route(
"/api/extensions/{name}/activate",
post(extensions_activate_handler),
)
.route(
"/api/extensions/{name}/remove",
post(extensions_remove_handler),
)
.route("/api/routines", get(routines_list_handler))
.route("/api/routines/summary", get(routines_summary_handler))
.route("/api/routines/{id}", get(routines_detail_handler))
.route("/api/routines/{id}/trigger", post(routines_trigger_handler))
.route("/api/routines/{id}/toggle", post(routines_toggle_handler))
.route(
"/api/routines/{id}",
axum::routing::delete(routines_delete_handler),
)
.route("/api/routines/{id}/runs", get(routines_runs_handler))
.route("/api/settings", get(settings_list_handler))
.route("/api/settings/export", get(settings_export_handler))
.route("/api/settings/import", post(settings_import_handler))
.route("/api/settings/{key}", get(settings_get_handler))
.route(
"/api/settings/{key}",
axum::routing::put(settings_set_handler),
)
.route(
"/api/settings/{key}",
axum::routing::delete(settings_delete_handler),
)
.route("/api/gateway/status", get(gateway_status_handler))
.route(
"/v1/chat/completions",
post(super::openai_compat::chat_completions_handler),
)
.route("/v1/models", get(super::openai_compat::models_handler))
.route_layer(middleware::from_fn_with_state(
auth_state.clone(),
auth_middleware,
));
let statics = Router::new()
.route("/", get(index_handler))
.route("/style.css", get(css_handler))
.route("/app.js", get(js_handler));
let projects = Router::new()
.route("/projects/{project_id}", get(project_redirect_handler))
.route("/projects/{project_id}/", get(project_index_handler))
.route("/projects/{project_id}/{*path}", get(project_file_handler))
.route_layer(middleware::from_fn_with_state(
auth_state.clone(),
auth_middleware,
));
let cors = CorsLayer::new()
.allow_origin([
format!("http://{}:{}", addr.ip(), addr.port())
.parse()
.expect("valid origin"),
format!("http://localhost:{}", addr.port())
.parse()
.expect("valid origin"),
])
.allow_methods([
axum::http::Method::GET,
axum::http::Method::POST,
axum::http::Method::PUT,
axum::http::Method::DELETE,
])
.allow_headers(AllowHeaders::list([
header::CONTENT_TYPE,
header::AUTHORIZATION,
]))
.allow_credentials(true);
let app = Router::new()
.merge(public)
.merge(statics)
.merge(projects)
.merge(protected)
.layer(cors)
.layer(DefaultBodyLimit::max(1024 * 1024)) .with_state(state.clone());
let (shutdown_tx, shutdown_rx) = oneshot::channel();
*state.shutdown_tx.write().await = Some(shutdown_tx);
tokio::spawn(async move {
if let Err(e) = axum::serve(listener, app)
.with_graceful_shutdown(async {
let _ = shutdown_rx.await;
tracing::info!("Web gateway shutting down");
})
.await
{
tracing::error!("Web gateway server error: {}", e);
}
});
Ok(bound_addr)
}
async fn index_handler() -> Html<&'static str> {
Html(include_str!("static/index.html"))
}
async fn css_handler() -> impl IntoResponse {
(
[(header::CONTENT_TYPE, "text/css")],
include_str!("static/style.css"),
)
}
async fn js_handler() -> impl IntoResponse {
(
[(header::CONTENT_TYPE, "application/javascript")],
include_str!("static/app.js"),
)
}
async fn health_handler() -> Json<HealthResponse> {
Json(HealthResponse {
status: "healthy",
channel: "gateway",
})
}
async fn chat_send_handler(
State(state): State<Arc<GatewayState>>,
Json(req): Json<SendMessageRequest>,
) -> Result<(StatusCode, Json<SendMessageResponse>), (StatusCode, String)> {
if !state.chat_rate_limiter.check() {
return Err((
StatusCode::TOO_MANY_REQUESTS,
"Rate limit exceeded. Try again shortly.".to_string(),
));
}
let mut msg = IncomingMessage::new("gateway", &state.user_id, &req.content);
if let Some(ref thread_id) = req.thread_id {
msg = msg.with_thread(thread_id);
msg = msg.with_metadata(serde_json::json!({"thread_id": thread_id}));
}
let msg_id = msg.id;
let tx_guard = state.msg_tx.read().await;
let tx = tx_guard.as_ref().ok_or((
StatusCode::SERVICE_UNAVAILABLE,
"Channel not started".to_string(),
))?;
tx.send(msg).await.map_err(|_| {
(
StatusCode::INTERNAL_SERVER_ERROR,
"Channel closed".to_string(),
)
})?;
Ok((
StatusCode::ACCEPTED,
Json(SendMessageResponse {
message_id: msg_id,
status: "accepted",
}),
))
}
async fn chat_approval_handler(
State(state): State<Arc<GatewayState>>,
Json(req): Json<ApprovalRequest>,
) -> Result<(StatusCode, Json<SendMessageResponse>), (StatusCode, String)> {
let (approved, always) = match req.action.as_str() {
"approve" => (true, false),
"always" => (true, true),
"deny" => (false, false),
other => {
return Err((
StatusCode::BAD_REQUEST,
format!("Unknown action: {}", other),
));
}
};
let request_id = Uuid::parse_str(&req.request_id).map_err(|_| {
(
StatusCode::BAD_REQUEST,
"Invalid request_id (expected UUID)".to_string(),
)
})?;
let approval = crate::agent::submission::Submission::ExecApproval {
request_id,
approved,
always,
};
let content = serde_json::to_string(&approval).map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("Failed to serialize approval: {}", e),
)
})?;
let mut msg = IncomingMessage::new("gateway", &state.user_id, content);
if let Some(ref thread_id) = req.thread_id {
msg = msg.with_thread(thread_id);
}
let msg_id = msg.id;
let tx_guard = state.msg_tx.read().await;
let tx = tx_guard.as_ref().ok_or((
StatusCode::SERVICE_UNAVAILABLE,
"Channel not started".to_string(),
))?;
tx.send(msg).await.map_err(|_| {
(
StatusCode::INTERNAL_SERVER_ERROR,
"Channel closed".to_string(),
)
})?;
Ok((
StatusCode::ACCEPTED,
Json(SendMessageResponse {
message_id: msg_id,
status: "accepted",
}),
))
}
async fn chat_auth_token_handler(
State(state): State<Arc<GatewayState>>,
Json(req): Json<AuthTokenRequest>,
) -> Result<Json<ActionResponse>, (StatusCode, String)> {
let ext_mgr = state.extension_manager.as_ref().ok_or((
StatusCode::SERVICE_UNAVAILABLE,
"Extension manager not available".to_string(),
))?;
let result = ext_mgr
.auth(&req.extension_name, Some(&req.token))
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
if result.status == "authenticated" {
let msg = match ext_mgr.activate(&req.extension_name).await {
Ok(r) => format!(
"{} authenticated ({} tools loaded)",
req.extension_name,
r.tools_loaded.len()
),
Err(e) => format!(
"{} authenticated but activation failed: {}",
req.extension_name, e
),
};
clear_auth_mode(&state).await;
state.sse.broadcast(SseEvent::AuthCompleted {
extension_name: req.extension_name,
success: true,
message: msg.clone(),
});
Ok(Json(ActionResponse::ok(msg)))
} else {
state.sse.broadcast(SseEvent::AuthRequired {
extension_name: req.extension_name.clone(),
instructions: result.instructions.clone(),
auth_url: result.auth_url.clone(),
setup_url: result.setup_url.clone(),
});
Ok(Json(ActionResponse::fail(
result
.instructions
.unwrap_or_else(|| "Invalid token".to_string()),
)))
}
}
async fn chat_auth_cancel_handler(
State(state): State<Arc<GatewayState>>,
Json(_req): Json<AuthCancelRequest>,
) -> Result<Json<ActionResponse>, (StatusCode, String)> {
clear_auth_mode(&state).await;
Ok(Json(ActionResponse::ok("Auth cancelled")))
}
pub async fn clear_auth_mode(state: &GatewayState) {
if let Some(ref sm) = state.session_manager {
let session = sm.get_or_create_session(&state.user_id).await;
let mut sess = session.lock().await;
if let Some(thread_id) = sess.active_thread
&& let Some(thread) = sess.threads.get_mut(&thread_id)
{
thread.pending_auth = None;
}
}
}
async fn chat_events_handler(
State(state): State<Arc<GatewayState>>,
) -> Result<impl IntoResponse, (StatusCode, String)> {
state.sse.subscribe().ok_or((
StatusCode::SERVICE_UNAVAILABLE,
"Too many connections".to_string(),
))
}
async fn chat_ws_handler(
headers: axum::http::HeaderMap,
ws: WebSocketUpgrade,
State(state): State<Arc<GatewayState>>,
) -> Result<impl IntoResponse, (StatusCode, String)> {
let origin = headers
.get("origin")
.and_then(|v| v.to_str().ok())
.ok_or_else(|| {
(
StatusCode::FORBIDDEN,
"WebSocket Origin header required".to_string(),
)
})?;
let host = origin
.strip_prefix("http://")
.or_else(|| origin.strip_prefix("https://"))
.and_then(|rest| rest.split(':').next()?.split('/').next())
.unwrap_or("");
let is_local = matches!(host, "localhost" | "127.0.0.1" | "[::1]");
if !is_local {
return Err((
StatusCode::FORBIDDEN,
"WebSocket origin not allowed".to_string(),
));
}
Ok(ws.on_upgrade(move |socket| crate::channels::web::ws::handle_ws_connection(socket, state)))
}
#[derive(Deserialize)]
struct HistoryQuery {
thread_id: Option<String>,
limit: Option<usize>,
before: Option<String>,
}
async fn chat_history_handler(
State(state): State<Arc<GatewayState>>,
Query(query): Query<HistoryQuery>,
) -> Result<Json<HistoryResponse>, (StatusCode, String)> {
let session_manager = state.session_manager.as_ref().ok_or((
StatusCode::SERVICE_UNAVAILABLE,
"Session manager not available".to_string(),
))?;
let session = session_manager.get_or_create_session(&state.user_id).await;
let sess = session.lock().await;
let limit = query.limit.unwrap_or(50);
let before_cursor = query
.before
.as_deref()
.map(|s| {
chrono::DateTime::parse_from_rfc3339(s)
.map(|dt| dt.with_timezone(&chrono::Utc))
.map_err(|_| {
(
StatusCode::BAD_REQUEST,
"Invalid 'before' timestamp".to_string(),
)
})
})
.transpose()?;
let thread_id = if let Some(ref tid) = query.thread_id {
Uuid::parse_str(tid)
.map_err(|_| (StatusCode::BAD_REQUEST, "Invalid thread_id".to_string()))?
} else {
sess.active_thread
.ok_or((StatusCode::NOT_FOUND, "No active thread".to_string()))?
};
if query.thread_id.is_some()
&& let Some(ref store) = state.store
{
let owned = store
.conversation_belongs_to_user(thread_id, &state.user_id)
.await
.unwrap_or(false);
if !owned && !sess.threads.contains_key(&thread_id) {
return Err((StatusCode::NOT_FOUND, "Thread not found".to_string()));
}
}
if before_cursor.is_some()
&& let Some(ref store) = state.store
{
let (messages, has_more) = store
.list_conversation_messages_paginated(thread_id, before_cursor, limit as i64)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
let oldest_timestamp = messages.first().map(|m| m.created_at.to_rfc3339());
let turns = build_turns_from_db_messages(&messages);
return Ok(Json(HistoryResponse {
thread_id,
turns,
has_more,
oldest_timestamp,
}));
}
if let Some(thread) = sess.threads.get(&thread_id)
&& !thread.turns.is_empty()
{
let turns: Vec<TurnInfo> = thread
.turns
.iter()
.map(|t| TurnInfo {
turn_number: t.turn_number,
user_input: t.user_input.clone(),
response: t.response.clone(),
state: format!("{:?}", t.state),
started_at: t.started_at.to_rfc3339(),
completed_at: t.completed_at.map(|dt| dt.to_rfc3339()),
tool_calls: t
.tool_calls
.iter()
.map(|tc| ToolCallInfo {
name: tc.name.clone(),
has_result: tc.result.is_some(),
has_error: tc.error.is_some(),
})
.collect(),
})
.collect();
return Ok(Json(HistoryResponse {
thread_id,
turns,
has_more: false,
oldest_timestamp: None,
}));
}
if let Some(ref store) = state.store {
let (messages, has_more) = store
.list_conversation_messages_paginated(thread_id, None, limit as i64)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
if !messages.is_empty() {
let oldest_timestamp = messages.first().map(|m| m.created_at.to_rfc3339());
let turns = build_turns_from_db_messages(&messages);
return Ok(Json(HistoryResponse {
thread_id,
turns,
has_more,
oldest_timestamp,
}));
}
}
Ok(Json(HistoryResponse {
thread_id,
turns: Vec::new(),
has_more: false,
oldest_timestamp: None,
}))
}
fn build_turns_from_db_messages(messages: &[crate::history::ConversationMessage]) -> Vec<TurnInfo> {
let mut turns = Vec::new();
let mut turn_number = 0;
let mut iter = messages.iter().peekable();
while let Some(msg) = iter.next() {
if msg.role == "user" {
let mut turn = TurnInfo {
turn_number,
user_input: msg.content.clone(),
response: None,
state: "Completed".to_string(),
started_at: msg.created_at.to_rfc3339(),
completed_at: None,
tool_calls: Vec::new(),
};
if let Some(next) = iter.peek()
&& next.role == "assistant"
{
let assistant_msg = iter.next().expect("peeked");
turn.response = Some(assistant_msg.content.clone());
turn.completed_at = Some(assistant_msg.created_at.to_rfc3339());
}
if turn.response.is_none() {
turn.state = "Failed".to_string();
}
turns.push(turn);
turn_number += 1;
}
}
turns
}
async fn chat_threads_handler(
State(state): State<Arc<GatewayState>>,
) -> Result<Json<ThreadListResponse>, (StatusCode, String)> {
let session_manager = state.session_manager.as_ref().ok_or((
StatusCode::SERVICE_UNAVAILABLE,
"Session manager not available".to_string(),
))?;
let session = session_manager.get_or_create_session(&state.user_id).await;
let sess = session.lock().await;
if let Some(ref store) = state.store {
let assistant_id = store
.get_or_create_assistant_conversation(&state.user_id, "gateway")
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
if let Ok(summaries) = store
.list_conversations_with_preview(&state.user_id, "gateway", 50)
.await
{
let mut assistant_thread = None;
let mut threads = Vec::new();
for s in &summaries {
let info = ThreadInfo {
id: s.id,
state: "Idle".to_string(),
turn_count: (s.message_count / 2).max(0) as usize,
created_at: s.started_at.to_rfc3339(),
updated_at: s.last_activity.to_rfc3339(),
title: s.title.clone(),
thread_type: s.thread_type.clone(),
};
if s.id == assistant_id {
assistant_thread = Some(info);
} else {
threads.push(info);
}
}
if assistant_thread.is_none() {
assistant_thread = Some(ThreadInfo {
id: assistant_id,
state: "Idle".to_string(),
turn_count: 0,
created_at: chrono::Utc::now().to_rfc3339(),
updated_at: chrono::Utc::now().to_rfc3339(),
title: None,
thread_type: Some("assistant".to_string()),
});
}
return Ok(Json(ThreadListResponse {
assistant_thread,
threads,
active_thread: sess.active_thread,
}));
}
}
let threads: Vec<ThreadInfo> = sess
.threads
.values()
.map(|t| ThreadInfo {
id: t.id,
state: format!("{:?}", t.state),
turn_count: t.turns.len(),
created_at: t.created_at.to_rfc3339(),
updated_at: t.updated_at.to_rfc3339(),
title: None,
thread_type: None,
})
.collect();
Ok(Json(ThreadListResponse {
assistant_thread: None,
threads,
active_thread: sess.active_thread,
}))
}
async fn chat_new_thread_handler(
State(state): State<Arc<GatewayState>>,
) -> Result<Json<ThreadInfo>, (StatusCode, String)> {
let session_manager = state.session_manager.as_ref().ok_or((
StatusCode::SERVICE_UNAVAILABLE,
"Session manager not available".to_string(),
))?;
let session = session_manager.get_or_create_session(&state.user_id).await;
let mut sess = session.lock().await;
let thread = sess.create_thread();
let thread_id = thread.id;
let info = ThreadInfo {
id: thread.id,
state: format!("{:?}", thread.state),
turn_count: thread.turns.len(),
created_at: thread.created_at.to_rfc3339(),
updated_at: thread.updated_at.to_rfc3339(),
title: None,
thread_type: Some("thread".to_string()),
};
if let Some(ref store) = state.store {
let store = Arc::clone(store);
let user_id = state.user_id.clone();
tokio::spawn(async move {
if let Err(e) = store
.ensure_conversation(thread_id, "gateway", &user_id, None)
.await
{
tracing::warn!("Failed to persist new thread: {}", e);
}
let metadata_val = serde_json::json!("thread");
if let Err(e) = store
.update_conversation_metadata_field(thread_id, "thread_type", &metadata_val)
.await
{
tracing::warn!("Failed to set thread_type metadata: {}", e);
}
});
}
Ok(Json(info))
}
#[derive(Deserialize)]
struct TreeQuery {
#[allow(dead_code)]
depth: Option<usize>,
}
async fn memory_tree_handler(
State(state): State<Arc<GatewayState>>,
Query(_query): Query<TreeQuery>,
) -> Result<Json<MemoryTreeResponse>, (StatusCode, String)> {
let workspace = state.workspace.as_ref().ok_or((
StatusCode::SERVICE_UNAVAILABLE,
"Workspace not available".to_string(),
))?;
let all_paths = workspace
.list_all()
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
let mut entries: Vec<TreeEntry> = Vec::new();
let mut seen_dirs: std::collections::HashSet<String> = std::collections::HashSet::new();
for path in &all_paths {
let parts: Vec<&str> = path.split('/').collect();
for i in 0..parts.len().saturating_sub(1) {
let dir_path = parts[..=i].join("/");
if seen_dirs.insert(dir_path.clone()) {
entries.push(TreeEntry {
path: dir_path,
is_dir: true,
});
}
}
entries.push(TreeEntry {
path: path.clone(),
is_dir: false,
});
}
entries.sort_by(|a, b| a.path.cmp(&b.path));
Ok(Json(MemoryTreeResponse { entries }))
}
#[derive(Deserialize)]
struct ListQuery {
path: Option<String>,
}
async fn memory_list_handler(
State(state): State<Arc<GatewayState>>,
Query(query): Query<ListQuery>,
) -> Result<Json<MemoryListResponse>, (StatusCode, String)> {
let workspace = state.workspace.as_ref().ok_or((
StatusCode::SERVICE_UNAVAILABLE,
"Workspace not available".to_string(),
))?;
let path = query.path.as_deref().unwrap_or("");
let entries = workspace
.list(path)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
let list_entries: Vec<ListEntry> = entries
.iter()
.map(|e| ListEntry {
name: e.path.rsplit('/').next().unwrap_or(&e.path).to_string(),
path: e.path.clone(),
is_dir: e.is_directory,
updated_at: e.updated_at.map(|dt| dt.to_rfc3339()),
})
.collect();
Ok(Json(MemoryListResponse {
path: path.to_string(),
entries: list_entries,
}))
}
#[derive(Deserialize)]
struct ReadQuery {
path: String,
}
async fn memory_read_handler(
State(state): State<Arc<GatewayState>>,
Query(query): Query<ReadQuery>,
) -> Result<Json<MemoryReadResponse>, (StatusCode, String)> {
let workspace = state.workspace.as_ref().ok_or((
StatusCode::SERVICE_UNAVAILABLE,
"Workspace not available".to_string(),
))?;
let doc = workspace
.read(&query.path)
.await
.map_err(|e| (StatusCode::NOT_FOUND, e.to_string()))?;
Ok(Json(MemoryReadResponse {
path: query.path,
content: doc.content,
updated_at: Some(doc.updated_at.to_rfc3339()),
}))
}
async fn memory_write_handler(
State(state): State<Arc<GatewayState>>,
Json(req): Json<MemoryWriteRequest>,
) -> Result<Json<MemoryWriteResponse>, (StatusCode, String)> {
let workspace = state.workspace.as_ref().ok_or((
StatusCode::SERVICE_UNAVAILABLE,
"Workspace not available".to_string(),
))?;
workspace
.write(&req.path, &req.content)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
Ok(Json(MemoryWriteResponse {
path: req.path,
status: "written",
}))
}
async fn memory_search_handler(
State(state): State<Arc<GatewayState>>,
Json(req): Json<MemorySearchRequest>,
) -> Result<Json<MemorySearchResponse>, (StatusCode, String)> {
let workspace = state.workspace.as_ref().ok_or((
StatusCode::SERVICE_UNAVAILABLE,
"Workspace not available".to_string(),
))?;
let limit = req.limit.unwrap_or(10);
let results = workspace
.search(&req.query, limit)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
let hits: Vec<SearchHit> = results
.iter()
.map(|r| SearchHit {
path: r.document_id.to_string(),
content: r.content.clone(),
score: r.score as f64,
})
.collect();
Ok(Json(MemorySearchResponse { results: hits }))
}
async fn jobs_list_handler(
State(state): State<Arc<GatewayState>>,
) -> Result<Json<JobListResponse>, (StatusCode, String)> {
let store = state.store.as_ref().ok_or((
StatusCode::SERVICE_UNAVAILABLE,
"Database not available".to_string(),
))?;
let sandbox_jobs = store
.list_sandbox_jobs_for_user(&state.user_id)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
let mut jobs: Vec<JobInfo> = sandbox_jobs
.iter()
.filter(|j| j.user_id == state.user_id)
.map(|j| {
let ui_state = match j.status.as_str() {
"creating" => "pending",
"running" => "in_progress",
s => s,
};
JobInfo {
id: j.id,
title: j.task.clone(),
state: ui_state.to_string(),
user_id: j.user_id.clone(),
created_at: j.created_at.to_rfc3339(),
started_at: j.started_at.map(|dt| dt.to_rfc3339()),
}
})
.collect();
jobs.sort_by(|a, b| b.created_at.cmp(&a.created_at));
Ok(Json(JobListResponse { jobs }))
}
async fn jobs_summary_handler(
State(state): State<Arc<GatewayState>>,
) -> Result<Json<JobSummaryResponse>, (StatusCode, String)> {
let store = state.store.as_ref().ok_or((
StatusCode::SERVICE_UNAVAILABLE,
"Database not available".to_string(),
))?;
let s = store
.sandbox_job_summary_for_user(&state.user_id)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
Ok(Json(JobSummaryResponse {
total: s.total,
pending: s.creating,
in_progress: s.running,
completed: s.completed,
failed: s.failed + s.interrupted,
stuck: 0,
}))
}
async fn jobs_detail_handler(
State(state): State<Arc<GatewayState>>,
Path(id): Path<String>,
) -> Result<Json<JobDetailResponse>, (StatusCode, String)> {
let job_id = Uuid::parse_str(&id)
.map_err(|_| (StatusCode::BAD_REQUEST, "Invalid job ID".to_string()))?;
if let Some(ref store) = state.store
&& let Ok(Some(job)) = store.get_sandbox_job(job_id).await
{
if job.user_id != state.user_id {
return Err((StatusCode::NOT_FOUND, "Job not found".to_string()));
}
let browse_id = std::path::Path::new(&job.project_dir)
.file_name()
.map(|n| n.to_string_lossy().to_string())
.unwrap_or_else(|| job.id.to_string());
let ui_state = match job.status.as_str() {
"creating" => "pending",
"running" => "in_progress",
s => s,
};
let elapsed_secs = job.started_at.map(|start| {
let end = job.completed_at.unwrap_or_else(chrono::Utc::now);
(end - start).num_seconds().max(0) as u64
});
let mut transitions = Vec::new();
if let Some(started) = job.started_at {
transitions.push(TransitionInfo {
from: "creating".to_string(),
to: "running".to_string(),
timestamp: started.to_rfc3339(),
reason: None,
});
}
if let Some(completed) = job.completed_at {
transitions.push(TransitionInfo {
from: "running".to_string(),
to: job.status.clone(),
timestamp: completed.to_rfc3339(),
reason: job.failure_reason.clone(),
});
}
return Ok(Json(JobDetailResponse {
id: job.id,
title: job.task.clone(),
description: String::new(),
state: ui_state.to_string(),
user_id: job.user_id.clone(),
created_at: job.created_at.to_rfc3339(),
started_at: job.started_at.map(|dt| dt.to_rfc3339()),
completed_at: job.completed_at.map(|dt| dt.to_rfc3339()),
elapsed_secs,
project_dir: Some(job.project_dir.clone()),
browse_url: Some(format!("/projects/{}/", browse_id)),
job_mode: {
let mode = store.get_sandbox_job_mode(job.id).await.ok().flatten();
mode.filter(|m| m != "worker")
},
transitions,
}));
}
Err((StatusCode::NOT_FOUND, "Job not found".to_string()))
}
async fn jobs_cancel_handler(
State(state): State<Arc<GatewayState>>,
Path(id): Path<String>,
) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
let job_id = Uuid::parse_str(&id)
.map_err(|_| (StatusCode::BAD_REQUEST, "Invalid job ID".to_string()))?;
if let Some(ref store) = state.store
&& let Ok(Some(job)) = store.get_sandbox_job(job_id).await
{
if job.user_id != state.user_id {
return Err((StatusCode::NOT_FOUND, "Job not found".to_string()));
}
if job.status == "running" || job.status == "creating" {
if let Some(ref jm) = state.job_manager
&& let Err(e) = jm.stop_job(job_id).await
{
tracing::warn!(job_id = %job_id, error = %e, "Failed to stop container during cancellation");
}
store
.update_sandbox_job_status(
job_id,
"failed",
Some(false),
Some("Cancelled by user"),
None,
Some(chrono::Utc::now()),
)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
}
return Ok(Json(serde_json::json!({
"status": "cancelled",
"job_id": job_id,
})));
}
Err((StatusCode::NOT_FOUND, "Job not found".to_string()))
}
async fn jobs_restart_handler(
State(state): State<Arc<GatewayState>>,
Path(id): Path<String>,
) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
let store = state.store.as_ref().ok_or((
StatusCode::SERVICE_UNAVAILABLE,
"Database not available".to_string(),
))?;
let jm = state.job_manager.as_ref().ok_or((
StatusCode::SERVICE_UNAVAILABLE,
"Sandbox not enabled".to_string(),
))?;
let old_job_id = Uuid::parse_str(&id)
.map_err(|_| (StatusCode::BAD_REQUEST, "Invalid job ID".to_string()))?;
let old_job = store
.get_sandbox_job(old_job_id)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?
.ok_or((StatusCode::NOT_FOUND, "Job not found".to_string()))?;
if old_job.user_id != state.user_id {
return Err((StatusCode::NOT_FOUND, "Job not found".to_string()));
}
if old_job.status != "interrupted" && old_job.status != "failed" {
return Err((
StatusCode::CONFLICT,
format!("Cannot restart job in state '{}'", old_job.status),
));
}
let new_job_id = Uuid::new_v4();
let now = chrono::Utc::now();
let record = crate::history::SandboxJobRecord {
id: new_job_id,
task: old_job.task.clone(),
status: "creating".to_string(),
user_id: old_job.user_id.clone(),
project_dir: old_job.project_dir.clone(),
success: None,
failure_reason: None,
created_at: now,
started_at: None,
completed_at: None,
};
store
.save_sandbox_job(&record)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
let mode = match store.get_sandbox_job_mode(old_job_id).await {
Ok(Some(m)) if m == "claude_code" => crate::orchestrator::job_manager::JobMode::ClaudeCode,
_ => crate::orchestrator::job_manager::JobMode::Worker,
};
let project_dir = std::path::PathBuf::from(&old_job.project_dir);
let _token = jm
.create_job(new_job_id, &old_job.task, Some(project_dir), mode)
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("Failed to create container: {}", e),
)
})?;
store
.update_sandbox_job_status(new_job_id, "running", None, None, Some(now), None)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
Ok(Json(serde_json::json!({
"status": "restarted",
"old_job_id": old_job_id,
"new_job_id": new_job_id,
})))
}
async fn jobs_prompt_handler(
State(state): State<Arc<GatewayState>>,
Path(id): Path<String>,
Json(body): Json<serde_json::Value>,
) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
let prompt_queue = state.prompt_queue.as_ref().ok_or((
StatusCode::NOT_IMPLEMENTED,
"Claude Code not configured".to_string(),
))?;
let job_id: uuid::Uuid = id
.parse()
.map_err(|_| (StatusCode::BAD_REQUEST, "Invalid job ID".to_string()))?;
if let Some(ref store) = state.store
&& !store
.sandbox_job_belongs_to_user(job_id, &state.user_id)
.await
.unwrap_or(false)
{
return Err((StatusCode::NOT_FOUND, "Job not found".to_string()));
}
let content = body
.get("content")
.and_then(|v| v.as_str())
.ok_or((
StatusCode::BAD_REQUEST,
"Missing 'content' field".to_string(),
))?
.to_string();
let done = body.get("done").and_then(|v| v.as_bool()).unwrap_or(false);
let prompt = crate::orchestrator::api::PendingPrompt { content, done };
{
let mut queue = prompt_queue.lock().await;
queue.entry(job_id).or_default().push_back(prompt);
}
Ok(Json(serde_json::json!({
"status": "queued",
"job_id": job_id.to_string(),
})))
}
async fn jobs_events_handler(
State(state): State<Arc<GatewayState>>,
Path(id): Path<String>,
) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
let store = state.store.as_ref().ok_or((
StatusCode::NOT_IMPLEMENTED,
"Database not available".to_string(),
))?;
let job_id: uuid::Uuid = id
.parse()
.map_err(|_| (StatusCode::BAD_REQUEST, "Invalid job ID".to_string()))?;
if !store
.sandbox_job_belongs_to_user(job_id, &state.user_id)
.await
.unwrap_or(false)
{
return Err((StatusCode::NOT_FOUND, "Job not found".to_string()));
}
let events = store
.list_job_events(job_id)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
let events_json: Vec<serde_json::Value> = events
.into_iter()
.map(|e| {
serde_json::json!({
"id": e.id,
"event_type": e.event_type,
"data": e.data,
"created_at": e.created_at.to_rfc3339(),
})
})
.collect();
Ok(Json(serde_json::json!({
"job_id": job_id.to_string(),
"events": events_json,
})))
}
#[derive(Deserialize)]
struct FilePathQuery {
path: Option<String>,
}
async fn job_files_list_handler(
State(state): State<Arc<GatewayState>>,
Path(id): Path<String>,
Query(query): Query<FilePathQuery>,
) -> Result<Json<ProjectFilesResponse>, (StatusCode, String)> {
let store = state.store.as_ref().ok_or((
StatusCode::SERVICE_UNAVAILABLE,
"Database not available".to_string(),
))?;
let job_id = Uuid::parse_str(&id)
.map_err(|_| (StatusCode::BAD_REQUEST, "Invalid job ID".to_string()))?;
let job = store
.get_sandbox_job(job_id)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?
.ok_or((StatusCode::NOT_FOUND, "Job not found".to_string()))?;
if job.user_id != state.user_id {
return Err((StatusCode::NOT_FOUND, "Job not found".to_string()));
}
let base = std::path::PathBuf::from(&job.project_dir);
let rel_path = query.path.as_deref().unwrap_or("");
let target = base.join(rel_path);
let canonical = target
.canonicalize()
.map_err(|_| (StatusCode::NOT_FOUND, "Path not found".to_string()))?;
let base_canonical = base
.canonicalize()
.map_err(|_| (StatusCode::NOT_FOUND, "Project dir not found".to_string()))?;
if !canonical.starts_with(&base_canonical) {
return Err((StatusCode::FORBIDDEN, "Forbidden".to_string()));
}
let mut entries = Vec::new();
let mut read_dir = tokio::fs::read_dir(&canonical)
.await
.map_err(|_| (StatusCode::NOT_FOUND, "Cannot read directory".to_string()))?;
while let Ok(Some(entry)) = read_dir.next_entry().await {
let name = entry.file_name().to_string_lossy().to_string();
let is_dir = entry
.file_type()
.await
.map(|ft| ft.is_dir())
.unwrap_or(false);
let rel = if rel_path.is_empty() {
name.clone()
} else {
format!("{}/{}", rel_path, name)
};
entries.push(ProjectFileEntry {
name,
path: rel,
is_dir,
});
}
entries.sort_by(|a, b| b.is_dir.cmp(&a.is_dir).then_with(|| a.name.cmp(&b.name)));
Ok(Json(ProjectFilesResponse { entries }))
}
async fn job_files_read_handler(
State(state): State<Arc<GatewayState>>,
Path(id): Path<String>,
Query(query): Query<FilePathQuery>,
) -> Result<Json<ProjectFileReadResponse>, (StatusCode, String)> {
let store = state.store.as_ref().ok_or((
StatusCode::SERVICE_UNAVAILABLE,
"Database not available".to_string(),
))?;
let job_id = Uuid::parse_str(&id)
.map_err(|_| (StatusCode::BAD_REQUEST, "Invalid job ID".to_string()))?;
let job = store
.get_sandbox_job(job_id)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?
.ok_or((StatusCode::NOT_FOUND, "Job not found".to_string()))?;
if job.user_id != state.user_id {
return Err((StatusCode::NOT_FOUND, "Job not found".to_string()));
}
let path = query.path.as_deref().ok_or((
StatusCode::BAD_REQUEST,
"path parameter required".to_string(),
))?;
let base = std::path::PathBuf::from(&job.project_dir);
let file_path = base.join(path);
let canonical = file_path
.canonicalize()
.map_err(|_| (StatusCode::NOT_FOUND, "File not found".to_string()))?;
let base_canonical = base
.canonicalize()
.map_err(|_| (StatusCode::NOT_FOUND, "Project dir not found".to_string()))?;
if !canonical.starts_with(&base_canonical) {
return Err((StatusCode::FORBIDDEN, "Forbidden".to_string()));
}
let content = tokio::fs::read_to_string(&canonical)
.await
.map_err(|_| (StatusCode::NOT_FOUND, "Cannot read file".to_string()))?;
Ok(Json(ProjectFileReadResponse {
path: path.to_string(),
content,
}))
}
async fn logs_events_handler(
State(state): State<Arc<GatewayState>>,
) -> Result<
Sse<impl futures::Stream<Item = Result<Event, Infallible>> + Send + 'static>,
(StatusCode, String),
> {
let broadcaster = state.log_broadcaster.as_ref().ok_or((
StatusCode::SERVICE_UNAVAILABLE,
"Log broadcaster not available".to_string(),
))?;
let rx = broadcaster.subscribe();
let history = broadcaster.recent_entries();
let history_stream = futures::stream::iter(history).map(|entry| {
let data = serde_json::to_string(&entry).unwrap_or_default();
Ok(Event::default().event("log").data(data))
});
let live_stream = tokio_stream::wrappers::BroadcastStream::new(rx)
.filter_map(|result| result.ok())
.map(|entry| {
let data = serde_json::to_string(&entry).unwrap_or_default();
Ok(Event::default().event("log").data(data))
});
let stream = history_stream.chain(live_stream);
Ok(Sse::new(stream).keep_alive(
KeepAlive::new()
.interval(std::time::Duration::from_secs(30))
.text(""),
))
}
async fn extensions_list_handler(
State(state): State<Arc<GatewayState>>,
) -> Result<Json<ExtensionListResponse>, (StatusCode, String)> {
let ext_mgr = state.extension_manager.as_ref().ok_or((
StatusCode::NOT_IMPLEMENTED,
"Extension manager not available (secrets store required)".to_string(),
))?;
let installed = ext_mgr
.list(None)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
let extensions = installed
.into_iter()
.map(|ext| ExtensionInfo {
name: ext.name,
kind: ext.kind.to_string(),
description: ext.description,
url: ext.url,
authenticated: ext.authenticated,
active: ext.active,
tools: ext.tools,
})
.collect();
Ok(Json(ExtensionListResponse { extensions }))
}
async fn extensions_tools_handler(
State(state): State<Arc<GatewayState>>,
) -> Result<Json<ToolListResponse>, (StatusCode, String)> {
let registry = state.tool_registry.as_ref().ok_or((
StatusCode::SERVICE_UNAVAILABLE,
"Tool registry not available".to_string(),
))?;
let definitions = registry.tool_definitions().await;
let tools = definitions
.into_iter()
.map(|td| ToolInfo {
name: td.name,
description: td.description,
})
.collect();
Ok(Json(ToolListResponse { tools }))
}
async fn extensions_install_handler(
State(state): State<Arc<GatewayState>>,
Json(req): Json<InstallExtensionRequest>,
) -> Result<Json<ActionResponse>, (StatusCode, String)> {
let ext_mgr = state.extension_manager.as_ref().ok_or((
StatusCode::NOT_IMPLEMENTED,
"Extension manager not available (secrets store required)".to_string(),
))?;
let kind_hint = req.kind.as_deref().and_then(|k| match k {
"mcp_server" => Some(crate::extensions::ExtensionKind::McpServer),
"wasm_tool" => Some(crate::extensions::ExtensionKind::WasmTool),
"wasm_channel" => Some(crate::extensions::ExtensionKind::WasmChannel),
_ => None,
});
match ext_mgr
.install(&req.name, req.url.as_deref(), kind_hint)
.await
{
Ok(result) => Ok(Json(ActionResponse::ok(result.message))),
Err(e) => Ok(Json(ActionResponse::fail(e.to_string()))),
}
}
async fn extensions_activate_handler(
State(state): State<Arc<GatewayState>>,
Path(name): Path<String>,
) -> Result<Json<ActionResponse>, (StatusCode, String)> {
let ext_mgr = state.extension_manager.as_ref().ok_or((
StatusCode::NOT_IMPLEMENTED,
"Extension manager not available (secrets store required)".to_string(),
))?;
match ext_mgr.activate(&name).await {
Ok(result) => Ok(Json(ActionResponse::ok(result.message))),
Err(activate_err) => {
let err_str = activate_err.to_string();
let needs_auth = err_str.contains("authentication")
|| err_str.contains("401")
|| err_str.contains("Unauthorized");
if !needs_auth {
return Ok(Json(ActionResponse::fail(err_str)));
}
match ext_mgr.auth(&name, None).await {
Ok(auth_result) if auth_result.status == "authenticated" => {
match ext_mgr.activate(&name).await {
Ok(result) => Ok(Json(ActionResponse::ok(result.message))),
Err(e) => Ok(Json(ActionResponse::fail(e.to_string()))),
}
}
Ok(auth_result) => {
let mut resp = ActionResponse::fail(
auth_result
.instructions
.clone()
.unwrap_or_else(|| format!("'{}' requires authentication.", name)),
);
resp.auth_url = auth_result.auth_url;
resp.awaiting_token = Some(auth_result.awaiting_token);
resp.instructions = auth_result.instructions;
Ok(Json(resp))
}
Err(auth_err) => Ok(Json(ActionResponse::fail(format!(
"Authentication failed: {}",
auth_err
)))),
}
}
}
}
async fn project_redirect_handler(Path(project_id): Path<String>) -> impl IntoResponse {
axum::response::Redirect::permanent(&format!("/projects/{project_id}/"))
}
async fn project_index_handler(Path(project_id): Path<String>) -> impl IntoResponse {
serve_project_file(&project_id, "index.html").await
}
async fn project_file_handler(
Path((project_id, path)): Path<(String, String)>,
) -> impl IntoResponse {
serve_project_file(&project_id, &path).await
}
async fn serve_project_file(project_id: &str, path: &str) -> axum::response::Response {
if project_id.contains('/')
|| project_id.contains('\\')
|| project_id.contains("..")
|| project_id.is_empty()
{
return (StatusCode::BAD_REQUEST, "Invalid project ID").into_response();
}
let base = dirs::home_dir()
.unwrap_or_else(|| std::path::PathBuf::from("."))
.join(".ironclaw")
.join("projects")
.join(project_id);
let file_path = base.join(path);
let canonical = match file_path.canonicalize() {
Ok(p) => p,
Err(_) => return (StatusCode::NOT_FOUND, "Not found").into_response(),
};
let base_canonical = match base.canonicalize() {
Ok(p) => p,
Err(_) => return (StatusCode::NOT_FOUND, "Not found").into_response(),
};
if !canonical.starts_with(&base_canonical) {
return (StatusCode::FORBIDDEN, "Forbidden").into_response();
}
match tokio::fs::read(&canonical).await {
Ok(contents) => {
let mime = mime_guess::from_path(&canonical)
.first_or_octet_stream()
.to_string();
([(header::CONTENT_TYPE, mime)], contents).into_response()
}
Err(_) => (StatusCode::NOT_FOUND, "Not found").into_response(),
}
}
async fn extensions_remove_handler(
State(state): State<Arc<GatewayState>>,
Path(name): Path<String>,
) -> Result<Json<ActionResponse>, (StatusCode, String)> {
let ext_mgr = state.extension_manager.as_ref().ok_or((
StatusCode::NOT_IMPLEMENTED,
"Extension manager not available (secrets store required)".to_string(),
))?;
match ext_mgr.remove(&name).await {
Ok(message) => Ok(Json(ActionResponse::ok(message))),
Err(e) => Ok(Json(ActionResponse::fail(e.to_string()))),
}
}
async fn routines_list_handler(
State(state): State<Arc<GatewayState>>,
) -> Result<Json<RoutineListResponse>, (StatusCode, String)> {
let store = state.store.as_ref().ok_or((
StatusCode::SERVICE_UNAVAILABLE,
"Database not available".to_string(),
))?;
let routines = store
.list_routines(&state.user_id)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
let items: Vec<RoutineInfo> = routines.iter().map(routine_to_info).collect();
Ok(Json(RoutineListResponse { routines: items }))
}
async fn routines_summary_handler(
State(state): State<Arc<GatewayState>>,
) -> Result<Json<RoutineSummaryResponse>, (StatusCode, String)> {
let store = state.store.as_ref().ok_or((
StatusCode::SERVICE_UNAVAILABLE,
"Database not available".to_string(),
))?;
let routines = store
.list_routines(&state.user_id)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
let total = routines.len() as u64;
let enabled = routines.iter().filter(|r| r.enabled).count() as u64;
let disabled = total - enabled;
let failing = routines
.iter()
.filter(|r| r.consecutive_failures > 0)
.count() as u64;
let today_start = chrono::Utc::now()
.date_naive()
.and_hms_opt(0, 0, 0)
.map(|dt| dt.and_utc());
let runs_today = if let Some(start) = today_start {
routines
.iter()
.filter(|r| r.last_run_at.is_some_and(|ts| ts >= start))
.count() as u64
} else {
0
};
Ok(Json(RoutineSummaryResponse {
total,
enabled,
disabled,
failing,
runs_today,
}))
}
async fn routines_detail_handler(
State(state): State<Arc<GatewayState>>,
Path(id): Path<String>,
) -> Result<Json<RoutineDetailResponse>, (StatusCode, String)> {
let store = state.store.as_ref().ok_or((
StatusCode::SERVICE_UNAVAILABLE,
"Database not available".to_string(),
))?;
let routine_id = Uuid::parse_str(&id)
.map_err(|_| (StatusCode::BAD_REQUEST, "Invalid routine ID".to_string()))?;
let routine = store
.get_routine(routine_id)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?
.ok_or((StatusCode::NOT_FOUND, "Routine not found".to_string()))?;
let runs = store
.list_routine_runs(routine_id, 20)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
let recent_runs: Vec<RoutineRunInfo> = runs
.iter()
.map(|run| RoutineRunInfo {
id: run.id,
trigger_type: run.trigger_type.clone(),
started_at: run.started_at.to_rfc3339(),
completed_at: run.completed_at.map(|dt| dt.to_rfc3339()),
status: format!("{:?}", run.status),
result_summary: run.result_summary.clone(),
tokens_used: run.tokens_used,
})
.collect();
Ok(Json(RoutineDetailResponse {
id: routine.id,
name: routine.name.clone(),
description: routine.description.clone(),
enabled: routine.enabled,
trigger: serde_json::to_value(&routine.trigger).unwrap_or_default(),
action: serde_json::to_value(&routine.action).unwrap_or_default(),
guardrails: serde_json::to_value(&routine.guardrails).unwrap_or_default(),
notify: serde_json::to_value(&routine.notify).unwrap_or_default(),
last_run_at: routine.last_run_at.map(|dt| dt.to_rfc3339()),
next_fire_at: routine.next_fire_at.map(|dt| dt.to_rfc3339()),
run_count: routine.run_count,
consecutive_failures: routine.consecutive_failures,
created_at: routine.created_at.to_rfc3339(),
recent_runs,
}))
}
async fn routines_trigger_handler(
State(state): State<Arc<GatewayState>>,
Path(id): Path<String>,
) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
let store = state.store.as_ref().ok_or((
StatusCode::SERVICE_UNAVAILABLE,
"Database not available".to_string(),
))?;
let routine_id = Uuid::parse_str(&id)
.map_err(|_| (StatusCode::BAD_REQUEST, "Invalid routine ID".to_string()))?;
let routine = store
.get_routine(routine_id)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?
.ok_or((StatusCode::NOT_FOUND, "Routine not found".to_string()))?;
let prompt = match &routine.action {
crate::agent::routine::RoutineAction::Lightweight { prompt, .. } => prompt.clone(),
crate::agent::routine::RoutineAction::FullJob {
title, description, ..
} => format!("{}: {}", title, description),
};
let content = format!("[routine:{}] {}", routine.name, prompt);
let msg = IncomingMessage::new("gateway", &state.user_id, content);
let tx_guard = state.msg_tx.read().await;
let tx = tx_guard.as_ref().ok_or((
StatusCode::SERVICE_UNAVAILABLE,
"Channel not started".to_string(),
))?;
tx.send(msg).await.map_err(|_| {
(
StatusCode::INTERNAL_SERVER_ERROR,
"Channel closed".to_string(),
)
})?;
Ok(Json(serde_json::json!({
"status": "triggered",
"routine_id": routine_id,
})))
}
#[derive(Deserialize)]
struct ToggleRequest {
enabled: Option<bool>,
}
async fn routines_toggle_handler(
State(state): State<Arc<GatewayState>>,
Path(id): Path<String>,
body: Option<Json<ToggleRequest>>,
) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
let store = state.store.as_ref().ok_or((
StatusCode::SERVICE_UNAVAILABLE,
"Database not available".to_string(),
))?;
let routine_id = Uuid::parse_str(&id)
.map_err(|_| (StatusCode::BAD_REQUEST, "Invalid routine ID".to_string()))?;
let mut routine = store
.get_routine(routine_id)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?
.ok_or((StatusCode::NOT_FOUND, "Routine not found".to_string()))?;
routine.enabled = match body {
Some(Json(req)) => req.enabled.unwrap_or(!routine.enabled),
None => !routine.enabled,
};
store
.update_routine(&routine)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
Ok(Json(serde_json::json!({
"status": if routine.enabled { "enabled" } else { "disabled" },
"routine_id": routine_id,
})))
}
async fn routines_delete_handler(
State(state): State<Arc<GatewayState>>,
Path(id): Path<String>,
) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
let store = state.store.as_ref().ok_or((
StatusCode::SERVICE_UNAVAILABLE,
"Database not available".to_string(),
))?;
let routine_id = Uuid::parse_str(&id)
.map_err(|_| (StatusCode::BAD_REQUEST, "Invalid routine ID".to_string()))?;
let deleted = store
.delete_routine(routine_id)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
if deleted {
Ok(Json(serde_json::json!({
"status": "deleted",
"routine_id": routine_id,
})))
} else {
Err((StatusCode::NOT_FOUND, "Routine not found".to_string()))
}
}
async fn routines_runs_handler(
State(state): State<Arc<GatewayState>>,
Path(id): Path<String>,
) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
let store = state.store.as_ref().ok_or((
StatusCode::SERVICE_UNAVAILABLE,
"Database not available".to_string(),
))?;
let routine_id = Uuid::parse_str(&id)
.map_err(|_| (StatusCode::BAD_REQUEST, "Invalid routine ID".to_string()))?;
let runs = store
.list_routine_runs(routine_id, 50)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
let run_infos: Vec<RoutineRunInfo> = runs
.iter()
.map(|run| RoutineRunInfo {
id: run.id,
trigger_type: run.trigger_type.clone(),
started_at: run.started_at.to_rfc3339(),
completed_at: run.completed_at.map(|dt| dt.to_rfc3339()),
status: format!("{:?}", run.status),
result_summary: run.result_summary.clone(),
tokens_used: run.tokens_used,
})
.collect();
Ok(Json(serde_json::json!({
"routine_id": routine_id,
"runs": run_infos,
})))
}
fn routine_to_info(r: &crate::agent::routine::Routine) -> RoutineInfo {
let (trigger_type, trigger_summary) = match &r.trigger {
crate::agent::routine::Trigger::Cron { schedule } => {
("cron".to_string(), format!("cron: {}", schedule))
}
crate::agent::routine::Trigger::Event {
pattern, channel, ..
} => {
let ch = channel.as_deref().unwrap_or("any");
("event".to_string(), format!("on {} /{}/", ch, pattern))
}
crate::agent::routine::Trigger::Webhook { path, .. } => {
let p = path.as_deref().unwrap_or("/");
("webhook".to_string(), format!("webhook: {}", p))
}
crate::agent::routine::Trigger::Manual => ("manual".to_string(), "manual only".to_string()),
};
let action_type = match &r.action {
crate::agent::routine::RoutineAction::Lightweight { .. } => "lightweight",
crate::agent::routine::RoutineAction::FullJob { .. } => "full_job",
};
let status = if !r.enabled {
"disabled"
} else if r.consecutive_failures > 0 {
"failing"
} else {
"active"
};
RoutineInfo {
id: r.id,
name: r.name.clone(),
description: r.description.clone(),
enabled: r.enabled,
trigger_type,
trigger_summary,
action_type: action_type.to_string(),
last_run_at: r.last_run_at.map(|dt| dt.to_rfc3339()),
next_fire_at: r.next_fire_at.map(|dt| dt.to_rfc3339()),
run_count: r.run_count,
consecutive_failures: r.consecutive_failures,
status: status.to_string(),
}
}
async fn settings_list_handler(
State(state): State<Arc<GatewayState>>,
) -> Result<Json<SettingsListResponse>, StatusCode> {
let store = state
.store
.as_ref()
.ok_or(StatusCode::SERVICE_UNAVAILABLE)?;
let rows = store.list_settings(&state.user_id).await.map_err(|e| {
tracing::error!("Failed to list settings: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
let settings = rows
.into_iter()
.map(|r| SettingResponse {
key: r.key,
value: r.value,
updated_at: r.updated_at.to_rfc3339(),
})
.collect();
Ok(Json(SettingsListResponse { settings }))
}
async fn settings_get_handler(
State(state): State<Arc<GatewayState>>,
Path(key): Path<String>,
) -> Result<Json<SettingResponse>, StatusCode> {
let store = state
.store
.as_ref()
.ok_or(StatusCode::SERVICE_UNAVAILABLE)?;
let row = store
.get_setting_full(&state.user_id, &key)
.await
.map_err(|e| {
tracing::error!("Failed to get setting '{}': {}", key, e);
StatusCode::INTERNAL_SERVER_ERROR
})?
.ok_or(StatusCode::NOT_FOUND)?;
Ok(Json(SettingResponse {
key: row.key,
value: row.value,
updated_at: row.updated_at.to_rfc3339(),
}))
}
async fn settings_set_handler(
State(state): State<Arc<GatewayState>>,
Path(key): Path<String>,
Json(body): Json<SettingWriteRequest>,
) -> Result<StatusCode, StatusCode> {
let store = state
.store
.as_ref()
.ok_or(StatusCode::SERVICE_UNAVAILABLE)?;
store
.set_setting(&state.user_id, &key, &body.value)
.await
.map_err(|e| {
tracing::error!("Failed to set setting '{}': {}", key, e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
Ok(StatusCode::NO_CONTENT)
}
async fn settings_delete_handler(
State(state): State<Arc<GatewayState>>,
Path(key): Path<String>,
) -> Result<StatusCode, StatusCode> {
let store = state
.store
.as_ref()
.ok_or(StatusCode::SERVICE_UNAVAILABLE)?;
store
.delete_setting(&state.user_id, &key)
.await
.map_err(|e| {
tracing::error!("Failed to delete setting '{}': {}", key, e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
Ok(StatusCode::NO_CONTENT)
}
async fn settings_export_handler(
State(state): State<Arc<GatewayState>>,
) -> Result<Json<SettingsExportResponse>, StatusCode> {
let store = state
.store
.as_ref()
.ok_or(StatusCode::SERVICE_UNAVAILABLE)?;
let settings = store.get_all_settings(&state.user_id).await.map_err(|e| {
tracing::error!("Failed to export settings: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
Ok(Json(SettingsExportResponse { settings }))
}
async fn settings_import_handler(
State(state): State<Arc<GatewayState>>,
Json(body): Json<SettingsImportRequest>,
) -> Result<StatusCode, StatusCode> {
let store = state
.store
.as_ref()
.ok_or(StatusCode::SERVICE_UNAVAILABLE)?;
store
.set_all_settings(&state.user_id, &body.settings)
.await
.map_err(|e| {
tracing::error!("Failed to import settings: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
Ok(StatusCode::NO_CONTENT)
}
async fn gateway_status_handler(
State(state): State<Arc<GatewayState>>,
) -> Json<GatewayStatusResponse> {
let sse_connections = state.sse.connection_count();
let ws_connections = state
.ws_tracker
.as_ref()
.map(|t| t.connection_count())
.unwrap_or(0);
Json(GatewayStatusResponse {
sse_connections,
ws_connections,
total_connections: sse_connections + ws_connections,
})
}
#[derive(serde::Serialize)]
struct GatewayStatusResponse {
sse_connections: u64,
ws_connections: u64,
total_connections: u64,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_build_turns_from_db_messages_complete() {
let now = chrono::Utc::now();
let messages = vec![
crate::history::ConversationMessage {
id: Uuid::new_v4(),
role: "user".to_string(),
content: "Hello".to_string(),
created_at: now,
},
crate::history::ConversationMessage {
id: Uuid::new_v4(),
role: "assistant".to_string(),
content: "Hi there!".to_string(),
created_at: now + chrono::TimeDelta::seconds(1),
},
crate::history::ConversationMessage {
id: Uuid::new_v4(),
role: "user".to_string(),
content: "How are you?".to_string(),
created_at: now + chrono::TimeDelta::seconds(2),
},
crate::history::ConversationMessage {
id: Uuid::new_v4(),
role: "assistant".to_string(),
content: "Doing well!".to_string(),
created_at: now + chrono::TimeDelta::seconds(3),
},
];
let turns = build_turns_from_db_messages(&messages);
assert_eq!(turns.len(), 2);
assert_eq!(turns[0].user_input, "Hello");
assert_eq!(turns[0].response.as_deref(), Some("Hi there!"));
assert_eq!(turns[0].state, "Completed");
assert_eq!(turns[1].user_input, "How are you?");
assert_eq!(turns[1].response.as_deref(), Some("Doing well!"));
}
#[test]
fn test_build_turns_from_db_messages_incomplete_last() {
let now = chrono::Utc::now();
let messages = vec![
crate::history::ConversationMessage {
id: Uuid::new_v4(),
role: "user".to_string(),
content: "Hello".to_string(),
created_at: now,
},
crate::history::ConversationMessage {
id: Uuid::new_v4(),
role: "assistant".to_string(),
content: "Hi!".to_string(),
created_at: now + chrono::TimeDelta::seconds(1),
},
crate::history::ConversationMessage {
id: Uuid::new_v4(),
role: "user".to_string(),
content: "Lost message".to_string(),
created_at: now + chrono::TimeDelta::seconds(2),
},
];
let turns = build_turns_from_db_messages(&messages);
assert_eq!(turns.len(), 2);
assert_eq!(turns[1].user_input, "Lost message");
assert!(turns[1].response.is_none());
assert_eq!(turns[1].state, "Failed");
}
#[test]
fn test_build_turns_from_db_messages_empty() {
let turns = build_turns_from_db_messages(&[]);
assert!(turns.is_empty());
}
}