mlua_swarm_server/data.rs
1//! HTTP `/v1/data/*` endpoints (v9 Data path, for Big Response handling).
2//!
3//! Thin HTTP wrapper that lets SubAgents push Big Responses (4k-token-scale
4//! bodies / intermediate artifacts / file paths) **directly to the Store owner,
5//! bypassing the MainAgent**. The Store implements
6//! `mlua_swarm::store::output::OutputStore` (default =
7//! `InMemoryOutputStore`). This module never touches the Engine core or the
8//! Domain path (`/v1/worker/result` / `submit_output` / `output_tail` /
9//! dispatch verdict) — it is the boundary point that physically wires the
10//! Data / Domain separation axis. For the canonical narrative, see
11//! `mlua_swarm::store::output` module docs.
12//!
13//! ## Routes
14//!
15//! - `POST /v1/data/emit` — body `{task_id, attempt, producer_agent, event, parent_refs?}`
16//! → calls `OutputStore.append` and returns `{out_id}`. The MainAgent only
17//! needs to receive an out_id ref (avoids context bloat).
18//! - `POST /v1/data/:name` — same body **minus `producer_agent`** (the path
19//! segment is the producer name); the write-side twin of name addressing.
20//! - `GET /v1/data/:key` — `key` is an `out_id` (`out-<10hex>`) or an
21//! `out_name` (producer agent name → latest emit). Id lookup first, name
22//! fallback. Used by the next Spawn's `$IN_REFS` to fetch.
23//!
24//! ## Auth (single-mouth contract)
25//!
26//! Every emit requires a worker `CapToken`, carried either as
27//! `Authorization: Bearer <token>` or `?token=<token>` (same token material —
28//! the transport is the caller's choice). The token passes the
29//! `Verb::EmitOutput` gate and is verified against the body's `task_id`.
30//! The former split surface (`/v1/data/emit` unauthenticated +
31//! `/v1/data/emit-auth` Bearer) was collapsed into this single mouth
32//! (the emit-auth API consolidation): expressing auth as endpoint forks multiplies the API
33//! surface without adding capability. How far to tighten GET (and token
34//! scoping in general) is deferred to the security-hardening pass after
35//! dogfooding.
36
37use axum::{
38 extract::{Path, Query, State},
39 http::{header::AUTHORIZATION, HeaderMap},
40 Json,
41};
42use mlua_swarm::store::output::{OutputEvent, OutputRecord, OutputRef};
43use mlua_swarm::{types::Verb, CapToken, TaskId};
44use serde::{Deserialize, Serialize};
45
46use crate::{ApiError, AppState};
47
48/// Input for `POST /v1/data/emit`.
49#[derive(Debug, Deserialize)]
50pub struct DataEmitReq {
51 /// Producing task.
52 pub task_id: String,
53 /// Attempt number.
54 pub attempt: u32,
55 /// Producer agent name.
56 pub producer_agent: String,
57 /// Event body (`Progress` / `Partial` / `Artifact` / `Final`).
58 pub event: OutputEvent,
59 /// Refs to upstream outputs (= chain, list of ids received via handoff). May be empty.
60 #[serde(default)]
61 pub parent_refs: Vec<OutputRef>,
62}
63
64/// Input for `POST /v1/data/:name` (name addressing — `producer_agent` comes
65/// from the path segment, not the body).
66#[derive(Debug, Deserialize)]
67pub struct DataEmitNamedReq {
68 /// Producing task.
69 pub task_id: String,
70 /// Attempt number.
71 pub attempt: u32,
72 /// Event body (`Progress` / `Partial` / `Artifact` / `Final`).
73 pub event: OutputEvent,
74 /// Refs to upstream outputs. May be empty.
75 #[serde(default)]
76 pub parent_refs: Vec<OutputRef>,
77}
78
79/// Response for the emit endpoints.
80#[derive(Debug, Serialize)]
81pub struct DataEmitResp {
82 /// Assigned ref. The caller (MainAgent) forwards this into the next Spawn's `$IN_REFS`.
83 pub out_id: OutputRef,
84}
85
86/// Auth-carrying query params (`?token=` — the header-less twin of Bearer).
87#[derive(Debug, Deserialize)]
88pub struct TokenQuery {
89 /// Encoded worker `CapToken`. Same material as the Bearer form.
90 pub token: Option<String>,
91}
92
93/// Handler for `POST /v1/data/emit` (single mouth, auth required).
94pub async fn data_emit(
95 State(state): State<AppState>,
96 Query(q): Query<TokenQuery>,
97 headers: HeaderMap,
98 Json(req): Json<DataEmitReq>,
99) -> Result<Json<DataEmitResp>, ApiError> {
100 emit_inner(&state, &headers, q.token.as_deref(), req).await
101}
102
103/// Handler for `POST /v1/data/:name` (name addressing, auth required).
104///
105/// The static `/v1/data/emit` route shadows this for the literal segment
106/// `emit`, so `emit` is effectively a reserved producer name.
107pub async fn data_emit_named(
108 State(state): State<AppState>,
109 Path(name): Path<String>,
110 Query(q): Query<TokenQuery>,
111 headers: HeaderMap,
112 Json(req): Json<DataEmitNamedReq>,
113) -> Result<Json<DataEmitResp>, ApiError> {
114 let req = DataEmitReq {
115 task_id: req.task_id,
116 attempt: req.attempt,
117 producer_agent: name,
118 event: req.event,
119 parent_refs: req.parent_refs,
120 };
121 emit_inner(&state, &headers, q.token.as_deref(), req).await
122}
123
124async fn emit_inner(
125 state: &AppState,
126 headers: &HeaderMap,
127 query_token: Option<&str>,
128 req: DataEmitReq,
129) -> Result<Json<DataEmitResp>, ApiError> {
130 let token = extract_captoken(headers, query_token)?;
131 let tid = TaskId(req.task_id.clone());
132 state
133 .engine
134 .verify_token_for_task(&token, Verb::EmitOutput, &tid)
135 .await
136 .map_err(|e| ApiError::engine(format!("data_emit verify: {e}")))?;
137 let out_id = state
138 .data_store
139 .append(
140 &req.task_id,
141 req.attempt,
142 &req.producer_agent,
143 req.event,
144 req.parent_refs,
145 )
146 .await
147 .map_err(|e| ApiError::engine(format!("data_emit: {e}")))?;
148 Ok(Json(DataEmitResp { out_id }))
149}
150
151/// Pull the worker `CapToken` from `Authorization: Bearer <t>` or `?token=<t>`
152/// (header wins when both are present — it is the more deliberate form).
153fn extract_captoken(headers: &HeaderMap, query_token: Option<&str>) -> Result<CapToken, ApiError> {
154 let encoded: &str = if let Some(v) = headers.get(AUTHORIZATION) {
155 v.to_str()
156 .map_err(|_| ApiError::bad_request("invalid Authorization header encoding".into()))?
157 .strip_prefix("Bearer ")
158 .ok_or_else(|| ApiError::bad_request("Authorization must be 'Bearer <token>'".into()))?
159 .trim()
160 } else if let Some(t) = query_token {
161 t.trim()
162 } else {
163 return Err(ApiError::bad_request(
164 "missing token: pass 'Authorization: Bearer <token>' or '?token=<token>'".into(),
165 ));
166 };
167 if encoded.is_empty() {
168 return Err(ApiError::bad_request("token is empty".into()));
169 }
170 CapToken::decode(encoded).map_err(|e| ApiError::bad_request(format!("invalid token: {e}")))
171}
172
173/// Handler for `GET /v1/data/:key` (`key` = out_id, falling back to out_name).
174pub async fn data_get(
175 State(state): State<AppState>,
176 Path(key): Path<String>,
177) -> Result<Json<OutputRecord>, ApiError> {
178 use mlua_swarm::store::output::OutputStoreError;
179 let id = OutputRef(key.clone());
180 match state.data_store.get(&id).await {
181 Ok(record) => Ok(Json(record)),
182 Err(OutputStoreError::NotFound(_)) => {
183 let record = state
184 .data_store
185 .get_latest_by_name(&key)
186 .await
187 .map_err(|e| match e {
188 OutputStoreError::NotFound(k) => {
189 ApiError::not_found(format!("output not found (id nor name): {k}"))
190 }
191 other => ApiError::engine(format!("data_get by name: {other}")),
192 })?;
193 Ok(Json(record))
194 }
195 Err(other) => Err(ApiError::engine(format!("data_get: {other}"))),
196 }
197}