Skip to main content

mlua_swarm_server/
worker.rs

1//! HTTP `/v1/worker/*` endpoints (SubAgent self-fetch path).
2//!
3//! # 7-Entry pointer #6 (Output Event design)
4//!
5//! **This endpoint accesses `OutputStore` directly and does NOT go through the engine.**
6//! It is one of the seven entry points enumerated in project `CLAUDE.md` §"Output Event
7//! Design SoT". For the canonical description, see the crate root doc of
8//! `mlua-swarm-output-store` (`cargo doc -p mlua-swarm-output-store`).
9//!
10//! # Path
11//!
12//! A thin-payload path where a SubAgent (= worker process launched by a MainAI) uses
13//! the capability token it received via WS Spawn to self-fetch its prompt and
14//! submit its result — putting the token in `Authorization: Bearer <encoded CapToken>`.
15//!
16//! ## Routes
17//!
18//! - `GET /v1/worker/prompt?task_id=<tid>` — via `engine.fetch_worker_payload`,
19//!   returns `{task_id, attempt, agent, system?, prompt}`.
20//! - `POST /v1/worker/result` with body `{task_id, value, ok}` — appends one `Final`
21//!   to the output tail via `engine.submit_output(Final)` (= the canonical path
22//!   through which the dispatch layer decides Pass/Blocked) and updates
23//!   `task.last_result` via `engine.post_result`.
24//!
25//! ## Bearer authentication
26//!
27//! The Bearer value is the string produced by `CapToken::encode()` (= URL-safe
28//! base64 of serde_json). The server decodes it with `CapToken::decode` and then,
29//! inside the engine, verifies HMAC sig + role × verb gate + TTL via
30//! `verify_token_for_task` (= self-contained capability token; no server-side
31//! store lookup required).
32//!
33//! Tokens are minted during the "2) mint outside the lock" phase of
34//! `engine.dispatch_attempt` (`Role::Worker`, 600s TTL, `scopes=["*"]`).
35//! The verb gate covers `FetchPrompt` / `EmitOutput` / `PostResult` — the worker
36//! leaf capability set (`crate::types::WORKER_LEAF_VERBS`).
37
38use axum::{
39    extract::{Query, State},
40    http::{header::AUTHORIZATION, HeaderMap, StatusCode},
41    Json,
42};
43use mlua_swarm::{CapToken, ContentRef, OutputEvent, TaskId, WorkerPayload};
44use serde::Deserialize;
45use serde_json::Value;
46
47use crate::{ApiError, AppState};
48
49/// Query params for `GET /v1/worker/prompt`.
50#[derive(Debug, Deserialize)]
51pub struct PromptQuery {
52    /// Task the fetched prompt belongs to; cross-checked against the Bearer handle/token.
53    pub task_id: String,
54}
55
56/// `GET /v1/worker/prompt?task_id=<tid>`. Bearer = encoded `CapToken` or short `wh-` handle.
57/// Thin HTTP wrapper over `engine.fetch_worker_payload` / `fetch_worker_payload_trusted`.
58/// Short-handle path (recommended for SubAgents): handle → task_id
59/// cross-check → trusted fetch.
60/// Full-`CapToken` path: token decode → verify → fetch.
61pub async fn worker_prompt(
62    State(state): State<AppState>,
63    headers: HeaderMap,
64    Query(q): Query<PromptQuery>,
65) -> Result<Json<WorkerPayload>, ApiError> {
66    let task_id = TaskId(q.task_id.clone());
67    let bearer = extract_bearer_raw(&headers)?;
68    let payload = if let Some(handle) = parse_worker_handle(&bearer) {
69        // Short-handle path: verify handle → task_id (security: confirm the handle is bound to this task).
70        let resolved = state
71            .engine
72            .task_id_from_handle(handle)
73            .await
74            .map_err(|e| ApiError::engine(format!("task_id_from_handle: {e}")))?;
75        if resolved != task_id {
76            return Err(ApiError::bad_request(format!(
77                "handle {handle} is bound to task {}, not {}",
78                resolved.0, task_id.0
79            )));
80        }
81        state
82            .engine
83            .fetch_worker_payload_trusted(&task_id)
84            .await
85            .map_err(|e| ApiError::engine(format!("fetch_worker_payload_trusted: {e}")))?
86    } else {
87        // Full CapToken path (the alternate Bearer form).
88        let token = CapToken::decode(bearer.trim())
89            .map_err(|e| ApiError::bad_request(format!("invalid token: {e}")))?;
90        state
91            .engine
92            .fetch_worker_payload(&token, &task_id)
93            .await
94            .map_err(|e| ApiError::engine(format!("fetch_worker_payload: {e}")))?
95    };
96    Ok(Json(payload))
97}
98
99/// Body for `POST /v1/worker/result`.
100#[derive(Debug, Deserialize)]
101pub struct WorkerResultReq {
102    /// Task this result belongs to (looked up together with the Bearer token).
103    pub task_id: String,
104    /// `WorkerResult.value` (= the value returned by the Operator: LLM inference result or tool execution result).
105    pub value: Value,
106    /// `WorkerResult.ok`. `false` makes the dispatch path decide Blocked
107    /// (= same semantics as `OutputEvent::Final { ok: false, .. }` from a
108    /// `SpawnerAdapter`). Defaults to `true`.
109    #[serde(default = "default_ok_true")]
110    pub ok: bool,
111    /// Optional explicit attempt. Normally omitted (= the server looks up `task.attempt`).
112    /// A carry for race-condition tests that need to write to a fixed attempt.
113    #[serde(default)]
114    pub attempt: Option<u32>,
115}
116
117fn default_ok_true() -> bool {
118    true
119}
120
121/// `POST /v1/worker/result`. Bearer = encoded `CapToken`.
122/// Fires `engine.submit_output(Final)` + `engine.post_result`.
123pub async fn worker_result(
124    State(state): State<AppState>,
125    headers: HeaderMap,
126    Json(req): Json<WorkerResultReq>,
127) -> Result<StatusCode, ApiError> {
128    let token = decode_worker_bearer(&headers)?;
129    let task_id = TaskId(req.task_id);
130
131    // Use body-explicit attempt if provided; otherwise the current task.attempt.
132    let attempt = match req.attempt {
133        Some(n) => n,
134        None => state
135            .engine
136            .task_attempt(&task_id)
137            .await
138            .map_err(|e| ApiError::engine(format!("task_attempt: {e}")))?,
139    };
140
141    let event = OutputEvent::Final {
142        content: ContentRef::Inline {
143            value: req.value.clone(),
144        },
145        ok: req.ok,
146    };
147    state
148        .engine
149        .submit_output(&token, &task_id, attempt, event)
150        .await
151        .map_err(|e| ApiError::engine(format!("submit_output: {e}")))?;
152    state
153        .engine
154        .post_result(&token, &task_id, req.value)
155        .await
156        .map_err(|e| ApiError::engine(format!("post_result: {e}")))?;
157    Ok(StatusCode::NO_CONTENT)
158}
159
160/// `POST /v1/worker/submit`. Bearer = encoded `CapToken`. Body = raw text/octet.
161///
162/// Simplification-axis endpoint for SubAgents. Removes the JSON construction,
163/// duplicated `task_id`, and JSON-escape burden of `/v1/worker/result` — the
164/// worker completes a POST with just token + raw body. Origin: the recent clean-up
165/// of the SubAgent contract drift (fewer IDs to pass around, multi-line escape
166/// accidents eliminated).
167///
168/// Behavior:
169/// - `task_id` is auto-looked-up server-side from the token (already bound to the `CapToken`).
170/// - Body raw bytes go as-is into `Value::String` for `submit_output` + `post_result`.
171/// - `ok=true` fixed (= the submit endpoint is success-path only). For the error
172///   path, use `/v1/worker/result` with an explicit `ok=false`.
173#[derive(Debug, Deserialize, Default)]
174pub struct SubmitQuery {
175    /// Optional. `ok=false` signals failure (= `DispatchOutcome::Blocked`, caught
176    /// by the flow.ir Try path). Unspecified (`None`) is treated as `ok=true`
177    /// (= normal success).
178    #[serde(default)]
179    pub ok: Option<bool>,
180}
181
182/// `POST /v1/worker/submit`. Simplified counterpart of [`worker_result`]:
183/// the caller sends only the raw result body, `task_id` is resolved
184/// server-side from the Bearer handle/token, and `ok` defaults to `true`
185/// unless overridden via [`SubmitQuery::ok`]. See the module doc for the
186/// short-handle vs full-`CapToken` Bearer forms.
187pub async fn worker_submit(
188    State(state): State<AppState>,
189    headers: HeaderMap,
190    Query(q): Query<SubmitQuery>,
191    body: axum::body::Bytes,
192) -> Result<StatusCode, ApiError> {
193    // Bearer accepts either (a) `wh-<8 hex>` short handle (recommended for
194    // SubAgents) or (b) base64-wrapped CapToken JSON (the full-token form).
195    let bearer = extract_bearer_raw(&headers)?;
196    let task_id = if let Some(handle) = parse_worker_handle(&bearer) {
197        state
198            .engine
199            .task_id_from_handle(handle)
200            .await
201            .map_err(|e| ApiError::engine(format!("task_id_from_handle: {e}")))?
202    } else {
203        let token = CapToken::decode(bearer.trim())
204            .map_err(|e| ApiError::bad_request(format!("invalid token: {e}")))?;
205        state
206            .engine
207            .task_id_from_token(&token)
208            .await
209            .map_err(|e| ApiError::engine(format!("task_id_from_token: {e}")))?
210    };
211    let attempt = state
212        .engine
213        .task_attempt(&task_id)
214        .await
215        .map_err(|e| ApiError::engine(format!("task_attempt: {e}")))?;
216    // Strip trailing whitespace (newlines, etc.) so flow.ir `Eq` string matches
217    // don't drift on `"BLOCKED\n" == "BLOCKED"` false results. Origin: the recent clean-up
218    // verdict_loop smoke — sharp-edge removal. Internal `\n` inside the raw bytes
219    // is preserved (= only trailing).
220    let body_str = String::from_utf8_lossy(&body).trim_end().to_string();
221    let value = Value::String(body_str);
222
223    // The handle path = trusted internal API (= the server-minted handle is validated
224    // by the earlier lookup); the full-token path = existing verify-by-token API.
225    // Both are reflected identically into final + last_result.
226    // `?ok=false` in the query signals failure (= `DispatchOutcome::Blocked`,
227    // the flow.ir Try catch path).
228    let ok = q.ok.unwrap_or(true);
229    state
230        .engine
231        .submit_worker_result_trusted(&task_id, attempt, value, ok)
232        .await
233        .map_err(|e| ApiError::engine(format!("submit_worker_result_trusted: {e}")))?;
234    Ok(StatusCode::NO_CONTENT)
235}
236
237/// Extracts the raw string from the `Authorization` header (= strips the `Bearer `
238/// prefix). To let `worker_submit` accept both short handles and full tokens, we
239/// fetch the raw value before any decode.
240fn extract_bearer_raw(headers: &HeaderMap) -> Result<String, ApiError> {
241    let v = headers
242        .get(AUTHORIZATION)
243        .ok_or_else(|| ApiError::bad_request("missing Authorization header".into()))?
244        .to_str()
245        .map_err(|_| ApiError::bad_request("invalid Authorization header encoding".into()))?;
246    let s = v
247        .strip_prefix("Bearer ")
248        .ok_or_else(|| ApiError::bad_request("Authorization must be 'Bearer <token>'".into()))?
249        .trim();
250    if s.is_empty() {
251        return Err(ApiError::bad_request("Bearer is empty".into()));
252    }
253    Ok(s.to_string())
254}
255
256/// Decides whether the Bearer is a short handle (`wh-XXXXXXXX`). Returns
257/// `Some(handle)` on a match, `None` otherwise (= caller proceeds to try decoding
258/// as full `CapToken` JSON).
259fn parse_worker_handle(s: &str) -> Option<&str> {
260    let s = s.trim();
261    if s.starts_with("wh-")
262        && s.len() >= 5
263        && s.len() <= 64
264        && s[3..].chars().all(|c| c.is_ascii_alphanumeric())
265    {
266        Some(s)
267    } else {
268        None
269    }
270}
271
272/// Decodes an encoded `CapToken` from `Authorization: Bearer <encoded CapToken>`.
273/// Kept separate from `extract_bearer` (sid-only) — kept as a distinct fn so
274/// that sid strings and encoded tokens are not confused, distinguishing them by type.
275fn decode_worker_bearer(headers: &HeaderMap) -> Result<CapToken, ApiError> {
276    let v = headers
277        .get(AUTHORIZATION)
278        .ok_or_else(|| ApiError::bad_request("missing Authorization header".into()))?
279        .to_str()
280        .map_err(|_| ApiError::bad_request("invalid Authorization header encoding".into()))?;
281    let encoded = v
282        .strip_prefix("Bearer ")
283        .ok_or_else(|| ApiError::bad_request("Authorization must be 'Bearer <token>'".into()))?
284        .trim();
285    if encoded.is_empty() {
286        return Err(ApiError::bad_request("Bearer token is empty".into()));
287    }
288    CapToken::decode(encoded).map_err(|e| ApiError::bad_request(format!("invalid token: {e}")))
289}