Skip to main content

mlua_swarm_server/
data.rs

1//! HTTP `/v1/data/*` endpoints (v9 Data path, for Big Response handling).
2//!
3//! Thin HTTP wrapper that lets SubAgents push Big Responses (4k-token-scale
4//! bodies / intermediate artifacts / file paths) **directly to the Store owner,
5//! bypassing the MainAgent**. The Store implements
6//! `mlua_swarm::store::output::OutputStore` (default =
7//! `InMemoryOutputStore`). This module never touches the Engine core or the
8//! Domain path (`/v1/worker/result` / `submit_output` / `output_tail` /
9//! dispatch verdict) — it is the boundary point that physically wires the
10//! Data / Domain separation axis. For the canonical narrative, see
11//! `mlua_swarm::store::output` module docs.
12//!
13//! ## Routes
14//!
15//! - `POST /v1/data/emit` — body `{task_id, attempt, producer_agent, event, parent_refs?}`
16//!   → calls `OutputStore.append` and returns `{out_id}`. The MainAgent only
17//!   needs to receive an out_id ref (avoids context bloat).
18//! - `POST /v1/data/:name` — same body **minus `producer_agent`** (the path
19//!   segment is the producer name); the write-side twin of name addressing.
20//! - `GET /v1/data/:key` — `key` is an `out_id` (`out-<10hex>`) or an
21//!   `out_name` (producer agent name → latest emit). Id lookup first, name
22//!   fallback. The next Spawn fetches the outputs listed in its `$IN_REFS` through this route.
23//!
24//! ## Auth (single-mouth contract)
25//!
26//! Every emit requires a worker `CapToken`, carried either as
27//! `Authorization: Bearer <token>` or `?token=<token>` (same token material —
28//! the transport is the caller's choice). The token passes the
29//! `Verb::EmitOutput` gate and is verified against the body's `task_id`.
30//! The former split surface (`/v1/data/emit` unauthenticated +
31//! `/v1/data/emit-auth` Bearer) was collapsed into this single mouth
32//! (the emit-auth API consolidation): expressing auth as endpoint forks multiplies the API
33//! surface without adding capability. How far to tighten GET (and token
34//! scoping in general) is deferred to the security-hardening pass after
35//! dogfooding.
36
37use axum::{
38    extract::{Path, Query, State},
39    http::{header::AUTHORIZATION, HeaderMap},
40    Json,
41};
42use mlua_swarm::store::output::{OutputEvent, OutputRecord, OutputRef};
43use mlua_swarm::{types::Verb, CapToken, TaskId};
44use serde::{Deserialize, Serialize};
45
46use crate::{ApiError, AppState};
47
48/// Input for `POST /v1/data/emit`.
49#[derive(Debug, Deserialize)]
50pub struct DataEmitReq {
51    /// Producing task.
52    pub task_id: String,
53    /// Attempt number.
54    pub attempt: u32,
55    /// Producer agent name.
56    pub producer_agent: String,
57    /// Event body (`Progress` / `Partial` / `Artifact` / `Final`).
58    pub event: OutputEvent,
59    /// Refs to upstream outputs (= chain, list of ids received via handoff). May be empty.
60    #[serde(default)]
61    pub parent_refs: Vec<OutputRef>,
62}
63
64/// Input for `POST /v1/data/:name` (name addressing — `producer_agent` comes
65/// from the path segment, not the body).
66#[derive(Debug, Deserialize)]
67pub struct DataEmitNamedReq {
68    /// Producing task.
69    pub task_id: String,
70    /// Attempt number.
71    pub attempt: u32,
72    /// Event body (`Progress` / `Partial` / `Artifact` / `Final`).
73    pub event: OutputEvent,
74    /// Refs to upstream outputs. May be empty.
75    #[serde(default)]
76    pub parent_refs: Vec<OutputRef>,
77}
78
79/// Response for the emit endpoints.
80#[derive(Debug, Serialize)]
81pub struct DataEmitResp {
82    /// Assigned ref. The caller (MainAgent) forwards this into the next Spawn's `$IN_REFS`.
83    pub out_id: OutputRef,
84}
85
86/// Auth-carrying query params (`?token=` — the header-less twin of Bearer).
87#[derive(Debug, Deserialize)]
88pub struct TokenQuery {
89    /// Encoded worker `CapToken`. Same material as the Bearer form.
90    pub token: Option<String>,
91}
92
93/// Handler for `POST /v1/data/emit` (single mouth, auth required).
94pub async fn data_emit(
95    State(state): State<AppState>,
96    Query(q): Query<TokenQuery>,
97    headers: HeaderMap,
98    Json(req): Json<DataEmitReq>,
99) -> Result<Json<DataEmitResp>, ApiError> {
100    emit_inner(&state, &headers, q.token.as_deref(), req).await
101}
102
103/// Handler for `POST /v1/data/:name` (name addressing, auth required).
104///
105/// The static `/v1/data/emit` route shadows this for the literal segment
106/// `emit`, so `emit` is effectively a reserved producer name.
107pub async fn data_emit_named(
108    State(state): State<AppState>,
109    Path(name): Path<String>,
110    Query(q): Query<TokenQuery>,
111    headers: HeaderMap,
112    Json(req): Json<DataEmitNamedReq>,
113) -> Result<Json<DataEmitResp>, ApiError> {
114    let req = DataEmitReq {
115        task_id: req.task_id,
116        attempt: req.attempt,
117        producer_agent: name,
118        event: req.event,
119        parent_refs: req.parent_refs,
120    };
121    emit_inner(&state, &headers, q.token.as_deref(), req).await
122}
123
124async fn emit_inner(
125    state: &AppState,
126    headers: &HeaderMap,
127    query_token: Option<&str>,
128    req: DataEmitReq,
129) -> Result<Json<DataEmitResp>, ApiError> {
130    let token = extract_captoken(headers, query_token)?;
131    let tid = TaskId(req.task_id.clone());
132    state
133        .engine
134        .verify_token_for_task(&token, Verb::EmitOutput, &tid)
135        .await
136        .map_err(|e| ApiError::engine(format!("data_emit verify: {e}")))?;
137    let out_id = state
138        .data_store
139        .append(
140            &req.task_id,
141            req.attempt,
142            &req.producer_agent,
143            req.event,
144            req.parent_refs,
145        )
146        .await
147        .map_err(|e| ApiError::engine(format!("data_emit: {e}")))?;
148    Ok(Json(DataEmitResp { out_id }))
149}
150
151/// Pull the worker `CapToken` from `Authorization: Bearer <t>` or `?token=<t>`
152/// (header wins when both are present — it is the more deliberate form).
153fn extract_captoken(headers: &HeaderMap, query_token: Option<&str>) -> Result<CapToken, ApiError> {
154    let encoded: &str = if let Some(v) = headers.get(AUTHORIZATION) {
155        v.to_str()
156            .map_err(|_| ApiError::bad_request("invalid Authorization header encoding".into()))?
157            .strip_prefix("Bearer ")
158            .ok_or_else(|| ApiError::bad_request("Authorization must be 'Bearer <token>'".into()))?
159            .trim()
160    } else if let Some(t) = query_token {
161        t.trim()
162    } else {
163        return Err(ApiError::bad_request(
164            "missing token: pass 'Authorization: Bearer <token>' or '?token=<token>'".into(),
165        ));
166    };
167    if encoded.is_empty() {
168        return Err(ApiError::bad_request("token is empty".into()));
169    }
170    CapToken::decode(encoded).map_err(|e| ApiError::bad_request(format!("invalid token: {e}")))
171}
172
173/// Handler for `GET /v1/data/:key` (`key` = out_id, falling back to out_name).
174pub async fn data_get(
175    State(state): State<AppState>,
176    Path(key): Path<String>,
177) -> Result<Json<OutputRecord>, ApiError> {
178    use mlua_swarm::store::output::OutputStoreError;
179    let id = OutputRef(key.clone());
180    match state.data_store.get(&id).await {
181        Ok(record) => Ok(Json(record)),
182        Err(OutputStoreError::NotFound(_)) => {
183            let record = state
184                .data_store
185                .get_latest_by_name(&key)
186                .await
187                .map_err(|e| match e {
188                    OutputStoreError::NotFound(k) => {
189                        ApiError::not_found(format!("output not found (id nor name): {k}"))
190                    }
191                    other => ApiError::engine(format!("data_get by name: {other}")),
192                })?;
193            Ok(Json(record))
194        }
195        Err(other) => Err(ApiError::engine(format!("data_get: {other}"))),
196    }
197}