use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use axum::extract::{Path, State};
use axum::http::StatusCode;
use axum::routing::{get, post};
use axum::{Json, Router};
use serde::{Deserialize, Serialize};
use tokio::sync::{Mutex, broadcast};
use uuid::Uuid;
use crate::channels::web::types::ToolDecisionDto;
use crate::db::Database;
use crate::llm::{CompletionRequest, LlmProvider, ToolCompletionRequest};
use crate::orchestrator::auth::{TokenStore, worker_auth_middleware};
use crate::orchestrator::job_manager::ContainerJobManager;
use crate::secrets::SecretsStore;
use crate::worker::api::JobEventPayload;
use crate::worker::api::{
CompletionReport, CredentialResponse, JobDescription, ProxyCompletionRequest,
ProxyCompletionResponse, ProxyToolCompletionRequest, ProxyToolCompletionResponse, StatusUpdate,
};
use ironclaw_common::AppEvent;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PendingPrompt {
pub content: String,
pub done: bool,
}
#[derive(Clone)]
pub struct OrchestratorState {
pub llm: Arc<dyn LlmProvider>,
pub job_manager: Arc<ContainerJobManager>,
pub token_store: TokenStore,
pub job_event_tx: Option<broadcast::Sender<(Uuid, String, AppEvent)>>,
pub prompt_queue: Arc<Mutex<HashMap<Uuid, VecDeque<PendingPrompt>>>>,
pub store: Option<Arc<dyn Database>>,
pub secrets_store: Option<Arc<dyn SecretsStore + Send + Sync>>,
pub user_id: String,
pub job_owner_cache: Arc<std::sync::RwLock<HashMap<Uuid, String>>>,
}
pub struct OrchestratorApi;
impl OrchestratorApi {
pub fn router(state: OrchestratorState) -> Router {
Router::new()
.route("/worker/{job_id}/job", get(get_job))
.route("/worker/{job_id}/llm/complete", post(llm_complete))
.route(
"/worker/{job_id}/llm/complete_with_tools",
post(llm_complete_with_tools),
)
.route("/worker/{job_id}/status", post(report_status))
.route("/worker/{job_id}/complete", post(report_complete))
.route("/worker/{job_id}/event", post(job_event_handler))
.route("/worker/{job_id}/prompt", get(get_prompt_handler))
.route("/worker/{job_id}/credentials", get(get_credentials_handler))
.route_layer(axum::middleware::from_fn_with_state(
state.token_store.clone(),
worker_auth_middleware,
))
.route("/health", get(health_check))
.with_state(state)
}
pub async fn start(
state: OrchestratorState,
port: u16,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let router = Self::router(state);
let addr = if cfg!(target_os = "linux") {
std::net::SocketAddr::from(([0, 0, 0, 0], port))
} else {
std::net::SocketAddr::from(([127, 0, 0, 1], port))
};
tracing::info!("Orchestrator internal API listening on {}", addr);
let listener = tokio::net::TcpListener::bind(addr).await?;
axum::serve(listener, router).await?;
Ok(())
}
}
async fn health_check() -> &'static str {
"ok"
}
async fn get_job(
State(state): State<OrchestratorState>,
Path(job_id): Path<Uuid>,
) -> Result<Json<JobDescription>, StatusCode> {
let handle = state
.job_manager
.get_handle(job_id)
.await
.ok_or(StatusCode::NOT_FOUND)?;
Ok(Json(JobDescription {
title: format!("Job {}", job_id),
description: handle.task_description,
project_dir: handle.project_dir.map(|p| p.display().to_string()),
}))
}
async fn llm_complete(
State(state): State<OrchestratorState>,
Path(job_id): Path<Uuid>,
Json(req): Json<ProxyCompletionRequest>,
) -> Result<Json<ProxyCompletionResponse>, StatusCode> {
let completion_req = CompletionRequest {
messages: req.messages,
model: req.model,
max_tokens: req.max_tokens,
temperature: req.temperature,
stop_sequences: req.stop_sequences,
metadata: std::collections::HashMap::new(),
};
let resp = state.llm.complete(completion_req).await.map_err(|e| {
tracing::error!("LLM completion failed for job {}: {}", job_id, e);
StatusCode::BAD_GATEWAY
})?;
Ok(Json(ProxyCompletionResponse {
content: resp.content,
input_tokens: resp.input_tokens,
output_tokens: resp.output_tokens,
finish_reason: format_finish_reason(resp.finish_reason),
cache_read_input_tokens: resp.cache_read_input_tokens,
cache_creation_input_tokens: resp.cache_creation_input_tokens,
}))
}
async fn llm_complete_with_tools(
State(state): State<OrchestratorState>,
Path(job_id): Path<Uuid>,
Json(req): Json<ProxyToolCompletionRequest>,
) -> Result<Json<ProxyToolCompletionResponse>, StatusCode> {
let tool_req = ToolCompletionRequest {
messages: req.messages,
tools: req.tools,
model: req.model,
max_tokens: req.max_tokens,
temperature: req.temperature,
stop_sequences: req.stop_sequences,
tool_choice: req.tool_choice,
metadata: std::collections::HashMap::new(),
};
let resp = state.llm.complete_with_tools(tool_req).await.map_err(|e| {
tracing::error!("LLM tool completion failed for job {}: {}", job_id, e);
StatusCode::BAD_GATEWAY
})?;
Ok(Json(ProxyToolCompletionResponse {
content: resp.content,
tool_calls: resp.tool_calls,
input_tokens: resp.input_tokens,
output_tokens: resp.output_tokens,
finish_reason: format_finish_reason(resp.finish_reason),
cache_read_input_tokens: resp.cache_read_input_tokens,
cache_creation_input_tokens: resp.cache_creation_input_tokens,
}))
}
async fn report_status(
State(state): State<OrchestratorState>,
Path(job_id): Path<Uuid>,
Json(update): Json<StatusUpdate>,
) -> Result<StatusCode, StatusCode> {
tracing::debug!(
job_id = %job_id,
state = %update.state,
iteration = update.iteration,
"Worker status update"
);
state
.job_manager
.update_worker_status(job_id, update.message, update.iteration)
.await;
Ok(StatusCode::OK)
}
async fn report_complete(
State(state): State<OrchestratorState>,
Path(job_id): Path<Uuid>,
Json(report): Json<CompletionReport>,
) -> Result<Json<serde_json::Value>, StatusCode> {
if report.success {
tracing::info!(
job_id = %job_id,
"Worker reported job complete"
);
} else {
tracing::warn!(
job_id = %job_id,
message = ?report.message,
"Worker reported job failure"
);
}
let result = crate::orchestrator::job_manager::CompletionResult {
success: report.success,
message: report.message.clone(),
};
if let Err(e) = state.job_manager.complete_job(job_id, result).await {
tracing::error!(job_id = %job_id, "Failed to complete job cleanup: {}", e);
}
Ok(Json(serde_json::json!({"status": "ok"})))
}
async fn job_event_handler(
State(state): State<OrchestratorState>,
Path(job_id): Path<Uuid>,
Json(payload): Json<JobEventPayload>,
) -> Result<StatusCode, StatusCode> {
tracing::debug!(
job_id = %job_id,
event_type = %payload.event_type,
"Job event received"
);
if let Some(ref store) = state.store {
let store = Arc::clone(store);
let event_type = payload.event_type.clone();
let data = payload.data.clone();
tokio::spawn(async move {
if let Err(e) = store.save_job_event(job_id, &event_type, &data).await {
tracing::warn!(job_id = %job_id, "Failed to persist job event: {}", e);
}
});
}
let job_id_str = job_id.to_string();
let app_event = match payload.event_type.as_str() {
"message" => AppEvent::JobMessage {
job_id: job_id_str,
role: payload
.data
.get("role")
.and_then(|v| v.as_str())
.unwrap_or("assistant")
.to_string(),
content: payload
.data
.get("content")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string(),
},
"tool_use" => AppEvent::JobToolUse {
job_id: job_id_str,
tool_name: payload
.data
.get("tool_name")
.and_then(|v| v.as_str())
.unwrap_or("unknown")
.to_string(),
input: payload
.data
.get("input")
.cloned()
.unwrap_or(serde_json::Value::Null),
},
"tool_result" => AppEvent::JobToolResult {
job_id: job_id_str,
tool_name: payload
.data
.get("tool_name")
.and_then(|v| v.as_str())
.unwrap_or("unknown")
.to_string(),
output: payload
.data
.get("output")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string(),
},
"result" => AppEvent::JobResult {
job_id: job_id_str,
status: payload
.data
.get("status")
.and_then(|v| v.as_str())
.unwrap_or("unknown")
.to_string(),
session_id: payload
.data
.get("session_id")
.and_then(|v| v.as_str())
.map(|s| s.to_string()),
fallback_deliverable: payload.data.get("fallback_deliverable").cloned(),
},
"reasoning" => {
let narrative = payload
.data
.get("narrative")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let decisions = ToolDecisionDto::from_json_array(&payload.data["decisions"]);
AppEvent::JobReasoning {
job_id: job_id_str,
narrative,
decisions,
}
}
_ => AppEvent::JobStatus {
job_id: job_id_str,
message: payload
.data
.get("message")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string(),
},
};
if let Some(ref tx) = state.job_event_tx {
let cached_uid = state
.job_owner_cache
.read()
.unwrap_or_else(|e| e.into_inner())
.get(&job_id)
.cloned();
let user_id = match cached_uid {
Some(uid) => uid,
None => {
let uid = match state.store.as_ref() {
Some(store) => store
.get_sandbox_job(job_id)
.await
.ok()
.flatten()
.map(|j| j.user_id),
None => None,
};
if let Some(ref uid) = uid {
state
.job_owner_cache
.write()
.unwrap_or_else(|e| e.into_inner())
.insert(job_id, uid.clone());
}
uid.unwrap_or_default()
}
};
if user_id.is_empty() {
let _ = tx.send((job_id, String::new(), app_event));
} else {
let _ = tx.send((job_id, user_id, app_event));
}
}
Ok(StatusCode::OK)
}
async fn get_prompt_handler(
State(state): State<OrchestratorState>,
Path(job_id): Path<Uuid>,
) -> Result<(StatusCode, Json<serde_json::Value>), StatusCode> {
let mut queue = state.prompt_queue.lock().await;
if let Some(prompts) = queue.get_mut(&job_id)
&& let Some(prompt) = prompts.pop_front()
{
return Ok((
StatusCode::OK,
Json(serde_json::json!({
"content": prompt.content,
"done": prompt.done,
})),
));
}
Ok((StatusCode::NO_CONTENT, Json(serde_json::Value::Null)))
}
async fn get_credentials_handler(
State(state): State<OrchestratorState>,
Path(job_id): Path<Uuid>,
) -> Result<(StatusCode, Json<serde_json::Value>), StatusCode> {
let grants = match state.token_store.get_grants(job_id).await {
Some(g) if !g.is_empty() => g,
_ => return Ok((StatusCode::NO_CONTENT, Json(serde_json::Value::Null))),
};
let secrets = state.secrets_store.as_ref().ok_or_else(|| {
tracing::error!("Credentials requested but no secrets store configured");
StatusCode::SERVICE_UNAVAILABLE
})?;
let mut credentials: Vec<CredentialResponse> = Vec::with_capacity(grants.len());
for grant in &grants {
let decrypted = secrets
.get_decrypted(&state.user_id, &grant.secret_name)
.await
.map_err(|e| {
tracing::error!(
job_id = %job_id,
"Failed to decrypt secret for credential grant: {}", e
);
StatusCode::INTERNAL_SERVER_ERROR
})?;
if let Ok(secret) = secrets.get(&state.user_id, &grant.secret_name).await
&& let Err(e) = secrets.record_usage(secret.id).await
{
tracing::warn!(
job_id = %job_id,
"Failed to record credential usage: {}", e
);
}
tracing::debug!(
job_id = %job_id,
env_var = %grant.env_var,
"Serving credential to container"
);
credentials.push(CredentialResponse {
env_var: grant.env_var.clone(),
value: decrypted.expose().to_string(),
});
}
Ok((
StatusCode::OK,
Json(serde_json::to_value(&credentials).unwrap_or(serde_json::Value::Null)),
))
}
fn format_finish_reason(reason: crate::llm::FinishReason) -> String {
match reason {
crate::llm::FinishReason::Stop => "stop".to_string(),
crate::llm::FinishReason::Length => "length".to_string(),
crate::llm::FinishReason::ToolUse => "tool_use".to_string(),
crate::llm::FinishReason::ContentFilter => "content_filter".to_string(),
crate::llm::FinishReason::Unknown => "unknown".to_string(),
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use axum::body::Body;
use axum::http::Request;
use tower::ServiceExt;
use uuid::Uuid;
use crate::orchestrator::auth::TokenStore;
use crate::orchestrator::job_manager::{ContainerJobConfig, ContainerJobManager};
use crate::testing::StubLlm;
use super::*;
fn test_state() -> OrchestratorState {
let token_store = TokenStore::new();
let jm = ContainerJobManager::new(ContainerJobConfig::default(), token_store.clone());
OrchestratorState {
llm: Arc::new(StubLlm::default()),
job_manager: Arc::new(jm),
token_store,
job_event_tx: None,
prompt_queue: Arc::new(Mutex::new(HashMap::new())),
store: None,
secrets_store: None,
user_id: "default".to_string(),
job_owner_cache: Arc::new(std::sync::RwLock::new(HashMap::new())),
}
}
#[tokio::test]
async fn health_requires_no_auth() {
let state = test_state();
let router = OrchestratorApi::router(state);
let req = Request::builder()
.uri("/health")
.body(Body::empty())
.unwrap();
let resp = router.oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
}
#[tokio::test]
async fn worker_route_rejects_missing_token() {
let state = test_state();
let router = OrchestratorApi::router(state);
let job_id = Uuid::new_v4();
let req = Request::builder()
.uri(format!("/worker/{}/job", job_id))
.body(Body::empty())
.unwrap();
let resp = router.oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
}
#[tokio::test]
async fn worker_route_rejects_wrong_token() {
let state = test_state();
let router = OrchestratorApi::router(state);
let job_id = Uuid::new_v4();
let req = Request::builder()
.uri(format!("/worker/{}/job", job_id))
.header("Authorization", "Bearer totally-bogus")
.body(Body::empty())
.unwrap();
let resp = router.oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
}
#[tokio::test]
async fn worker_route_accepts_valid_token() {
let state = test_state();
let job_id = Uuid::new_v4();
let token = state.token_store.create_token(job_id).await;
let router = OrchestratorApi::router(state);
let req = Request::builder()
.uri(format!("/worker/{}/job", job_id))
.header("Authorization", format!("Bearer {}", token))
.body(Body::empty())
.unwrap();
let resp = router.oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
}
#[tokio::test]
async fn token_for_job_a_rejected_on_job_b() {
let state = test_state();
let job_a = Uuid::new_v4();
let job_b = Uuid::new_v4();
let token_a = state.token_store.create_token(job_a).await;
let router = OrchestratorApi::router(state);
let req = Request::builder()
.uri(format!("/worker/{}/job", job_b))
.header("Authorization", format!("Bearer {}", token_a))
.body(Body::empty())
.unwrap();
let resp = router.oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
}
#[tokio::test]
async fn prompt_returns_204_when_queue_empty() {
let state = test_state();
let job_id = Uuid::new_v4();
let token = state.token_store.create_token(job_id).await;
let router = OrchestratorApi::router(state);
let req = Request::builder()
.uri(format!("/worker/{}/prompt", job_id))
.header("Authorization", format!("Bearer {}", token))
.body(Body::empty())
.unwrap();
let resp = router.oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::NO_CONTENT);
}
#[tokio::test]
async fn prompt_returns_queued_prompt() {
let state = test_state();
let job_id = Uuid::new_v4();
let token = state.token_store.create_token(job_id).await;
{
let mut q = state.prompt_queue.lock().await;
q.entry(job_id).or_default().push_back(PendingPrompt {
content: "What is the status?".to_string(),
done: false,
});
}
let router = OrchestratorApi::router(state);
let req = Request::builder()
.uri(format!("/worker/{}/prompt", job_id))
.header("Authorization", format!("Bearer {}", token))
.body(Body::empty())
.unwrap();
let resp = router.oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let body = axum::body::to_bytes(resp.into_body(), 4096).await.unwrap();
let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
assert_eq!(json["content"], "What is the status?");
assert_eq!(json["done"], false);
}
#[tokio::test]
async fn credentials_returns_204_when_no_grants() {
let state = test_state();
let job_id = Uuid::new_v4();
let token = state.token_store.create_token(job_id).await;
let router = OrchestratorApi::router(state);
let req = Request::builder()
.uri(format!("/worker/{}/credentials", job_id))
.header("Authorization", format!("Bearer {}", token))
.body(Body::empty())
.unwrap();
let resp = router.oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::NO_CONTENT);
}
#[tokio::test]
async fn credentials_returns_503_when_no_secrets_store() {
let state = test_state();
let job_id = Uuid::new_v4();
let token = state.token_store.create_token(job_id).await;
state
.token_store
.store_grants(
job_id,
vec![crate::orchestrator::auth::CredentialGrant {
secret_name: "test_secret".to_string(),
env_var: "TEST_SECRET".to_string(),
}],
)
.await;
let router = OrchestratorApi::router(state);
let req = Request::builder()
.uri(format!("/worker/{}/credentials", job_id))
.header("Authorization", format!("Bearer {}", token))
.body(Body::empty())
.unwrap();
let resp = router.oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::SERVICE_UNAVAILABLE);
}
#[tokio::test]
async fn credentials_returns_secrets_when_store_configured() {
use crate::testing::credentials::test_secrets_store;
use secrecy::SecretString;
let secrets_store = Arc::new(test_secrets_store());
secrets_store
.create(
"default",
crate::secrets::CreateSecretParams {
name: "test_secret".to_string(),
value: SecretString::from("supersecretvalue".to_string()),
provider: None,
expires_at: None,
},
)
.await
.unwrap();
let token_store = TokenStore::new();
let jm = ContainerJobManager::new(ContainerJobConfig::default(), token_store.clone());
let job_id = Uuid::new_v4();
let token = token_store.create_token(job_id).await;
token_store
.store_grants(
job_id,
vec![crate::orchestrator::auth::CredentialGrant {
secret_name: "test_secret".to_string(),
env_var: "MY_SECRET".to_string(),
}],
)
.await;
let state = OrchestratorState {
llm: Arc::new(StubLlm::default()),
job_manager: Arc::new(jm),
token_store,
job_event_tx: None,
prompt_queue: Arc::new(Mutex::new(HashMap::new())),
store: None,
secrets_store: Some(secrets_store),
user_id: "default".to_string(),
job_owner_cache: Arc::new(std::sync::RwLock::new(HashMap::new())),
};
let router = OrchestratorApi::router(state);
let req = Request::builder()
.uri(format!("/worker/{}/credentials", job_id))
.header("Authorization", format!("Bearer {}", token))
.body(Body::empty())
.unwrap();
let resp = router.oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let body = axum::body::to_bytes(resp.into_body(), 4096).await.unwrap();
let json: Vec<serde_json::Value> = serde_json::from_slice(&body).unwrap();
assert_eq!(json.len(), 1);
assert_eq!(json[0]["env_var"], "MY_SECRET");
assert_eq!(json[0]["value"], "supersecretvalue");
}
#[tokio::test]
async fn job_event_broadcasts_message() {
let (tx, mut rx) = broadcast::channel(16);
let token_store = TokenStore::new();
let jm = ContainerJobManager::new(ContainerJobConfig::default(), token_store.clone());
let state = OrchestratorState {
llm: Arc::new(StubLlm::default()),
job_manager: Arc::new(jm),
token_store: token_store.clone(),
job_event_tx: Some(tx),
prompt_queue: Arc::new(Mutex::new(HashMap::new())),
store: None,
secrets_store: None,
user_id: "default".to_string(),
job_owner_cache: Arc::new(std::sync::RwLock::new(HashMap::new())),
};
let job_id = Uuid::new_v4();
let token = token_store.create_token(job_id).await;
let router = OrchestratorApi::router(state);
let payload = serde_json::json!({
"event_type": "message",
"data": {
"role": "assistant",
"content": "Hello from worker"
}
});
let req = Request::builder()
.method("POST")
.uri(format!("/worker/{}/event", job_id))
.header("Authorization", format!("Bearer {}", token))
.header("Content-Type", "application/json")
.body(Body::from(serde_json::to_vec(&payload).unwrap()))
.unwrap();
let resp = router.oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let (recv_id, recv_uid, event) = rx.recv().await.unwrap();
assert_eq!(recv_id, job_id);
assert_eq!(recv_uid, "");
match event {
AppEvent::JobMessage {
job_id: jid,
role,
content,
} => {
assert_eq!(jid, job_id.to_string());
assert_eq!(role, "assistant");
assert_eq!(content, "Hello from worker");
}
other => panic!("Expected JobMessage, got {:?}", other),
}
}
#[tokio::test]
async fn job_event_handles_tool_use() {
let (tx, mut rx) = broadcast::channel(16);
let token_store = TokenStore::new();
let jm = ContainerJobManager::new(ContainerJobConfig::default(), token_store.clone());
let state = OrchestratorState {
llm: Arc::new(StubLlm::default()),
job_manager: Arc::new(jm),
token_store: token_store.clone(),
job_event_tx: Some(tx),
prompt_queue: Arc::new(Mutex::new(HashMap::new())),
store: None,
secrets_store: None,
user_id: "default".to_string(),
job_owner_cache: Arc::new(std::sync::RwLock::new(HashMap::new())),
};
let job_id = Uuid::new_v4();
let token = token_store.create_token(job_id).await;
let router = OrchestratorApi::router(state);
let payload = serde_json::json!({
"event_type": "tool_use",
"data": {
"tool_name": "shell",
"input": {"command": "ls"}
}
});
let req = Request::builder()
.method("POST")
.uri(format!("/worker/{}/event", job_id))
.header("Authorization", format!("Bearer {}", token))
.header("Content-Type", "application/json")
.body(Body::from(serde_json::to_vec(&payload).unwrap()))
.unwrap();
let resp = router.oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let (_recv_id, _recv_uid, event) = rx.recv().await.unwrap();
match event {
AppEvent::JobToolUse { tool_name, .. } => {
assert_eq!(tool_name, "shell");
}
other => panic!("Expected JobToolUse, got {:?}", other),
}
}
#[tokio::test]
async fn job_event_handles_unknown_type() {
let (tx, mut rx) = broadcast::channel(16);
let token_store = TokenStore::new();
let jm = ContainerJobManager::new(ContainerJobConfig::default(), token_store.clone());
let state = OrchestratorState {
llm: Arc::new(StubLlm::default()),
job_manager: Arc::new(jm),
token_store: token_store.clone(),
job_event_tx: Some(tx),
prompt_queue: Arc::new(Mutex::new(HashMap::new())),
store: None,
secrets_store: None,
user_id: "default".to_string(),
job_owner_cache: Arc::new(std::sync::RwLock::new(HashMap::new())),
};
let job_id = Uuid::new_v4();
let token = token_store.create_token(job_id).await;
let router = OrchestratorApi::router(state);
let payload = serde_json::json!({
"event_type": "custom_thing",
"data": { "message": "something custom" }
});
let req = Request::builder()
.method("POST")
.uri(format!("/worker/{}/event", job_id))
.header("Authorization", format!("Bearer {}", token))
.header("Content-Type", "application/json")
.body(Body::from(serde_json::to_vec(&payload).unwrap()))
.unwrap();
let resp = router.oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let (_recv_id, _recv_uid, event) = rx.recv().await.unwrap();
assert!(matches!(event, AppEvent::JobStatus { .. }));
}
#[tokio::test]
async fn report_status_updates_handle() {
let state = test_state();
let job_id = Uuid::new_v4();
let token = state.token_store.create_token(job_id).await;
{
let mut containers = state.job_manager.containers.write().await;
containers.insert(
job_id,
crate::orchestrator::job_manager::ContainerHandle {
job_id,
container_id: "test-container".to_string(),
state: crate::orchestrator::job_manager::ContainerState::Running,
mode: crate::orchestrator::job_manager::JobMode::Worker,
created_at: chrono::Utc::now(),
project_dir: None,
task_description: "test".to_string(),
last_worker_status: None,
worker_iteration: 0,
completion_result: None,
},
);
}
let jm = Arc::clone(&state.job_manager);
let router = OrchestratorApi::router(state);
let update = serde_json::json!({
"state": "in_progress",
"message": "Iteration 5",
"iteration": 5
});
let req = Request::builder()
.method("POST")
.uri(format!("/worker/{}/status", job_id))
.header("Authorization", format!("Bearer {}", token))
.header("Content-Type", "application/json")
.body(Body::from(serde_json::to_vec(&update).unwrap()))
.unwrap();
let resp = router.oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let handle = jm.get_handle(job_id).await.unwrap();
assert_eq!(handle.worker_iteration, 5);
assert_eq!(handle.last_worker_status.as_deref(), Some("Iteration 5"));
}
}