mlua_swarm_server/lib.rs
1//! the server lib: axum Router + handler set. Split out as a library so it can
2//! be used from both `main.rs` (CLI) and integration tests.
3//!
4//! # Endpoints
5//!
6//! - `GET /v1/healthz`
7//! - `POST /v1/sessions` / `DELETE /v1/sessions` (= operator attach / detach, Bearer sid)
8//! - `POST /v1/tasks` (= unified Flow-form entry, Operator inject supported;
9//! `operator_sid` explicitly pins the task to a registered Operator session, S2)
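//! - `POST /v1/operators` / `GET /v1/operators/:sid` / `DELETE /v1/operators/:sid` /
//!   `GET /v1/operators/:sid/ws` (WS upgrade) — REST-like Operator login flow,
//!   Bearer-mandatory; the sole WS Operator session route. See `operator_ws::login`
//!   module doc.
//! - `GET /v1/worker/prompt` / `POST /v1/worker/result` / `POST /v1/worker/submit`
//!   (SubAgent self-fetch path; see the `worker` module)
//! - `POST /v1/data/emit` / `GET /v1/data/:key` / `POST /v1/data/:key`
//!   (Data path, Big Response handling; see the `data` module)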
14//!
15//! The Enhance issue axis (`/issues`) lives in the `issues` module; callers merge
16//! `build_issues_router` to integrate it into the same server.
17//!
18//! # The 3 faces of the Operator role (= registered directly on the engine SoT)
19//!
20//! The engine stateless-executor refactor removed the three
21//! `AppState` registries (former `HookRegistry` / `BridgeRegistry` / `OperatorRegistry`);
22//! all registration now goes directly to the engine SoT via
23//! `engine.register_spawn_hook` / `register_senior_bridge` / `register_operator`.
24//! `WSOperatorSession` (in the `operator_ws` module) registers all three traits
25//! simultaneously under a single sid — one WS connection covers all 3 faces of
26//! the Operator role, the canonical pattern.
27//!
28//! # `build_*` family
29//!
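//! - [`build_router`] — minimal entry (= `default_registry()`)
//! - [`build_router_with`] — caller provides a `SpawnerRegistry` and optional `BlueprintStore`
//! - [`build_router_with_ws_factory`] — additionally takes an optional
//!   `OperatorSpawnerFactory` used for per-WS-connect auto-registration
//! - [`build_router_with_ws_factory_and_output`] — additionally takes an optional
//!   `OutputStore` backend for the Data path (`/v1/data/*`)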
32//!
33//! The engine should be started with [`default_layer_registry`] (= `Engine::new_with_layers`);
34//! otherwise `Blueprint.spawner_hints` is ignored.
35
36#![warn(missing_docs)]
37
38/// HTTP surface for inspecting/registering Blueprint state (`/v1/blueprints/*`).
39pub mod blueprints;
40/// Server config file support (`~/.mse/config.toml`, CLI > file > default merge).
41pub mod config;
42/// `/v1/data/*` endpoints (v9 Big Response handling, Store-owner direct path).
43pub mod data;
44/// `GET /v1/doctor` — read-only startup config / Store snapshot.
45pub mod doctor;
46/// HTTP surface for the `/v1/enhance/log` axis.
47pub mod enhance_log;
48/// `EnhanceSetting` HTTP CRUD (`/v1/enhance-settings*`).
49pub mod enhance_settings;
50/// HTTP surface for the Enhance issue axis (`/v1/issues*`).
51pub mod issues;
52/// WebSocket Operator Callback IF (`/v1/operators*`).
53pub mod operator_ws;
54/// `/v1/worker/*` endpoints (SubAgent self-fetch path).
55pub mod worker;
56pub use blueprints::{build_blueprints_router, build_blueprints_router_with_refs};
57pub use enhance_log::build_enhance_log_router;
58pub use enhance_settings::build_enhance_settings_router;
59pub use issues::{build_issues_router, GetIssueResponse, PostIssueRequest, PostIssueResponse};
60pub use operator_ws::{
61 operators_create, operators_delete, operators_info, operators_ws_connect, ClientMsg,
62 OperatorSessionEntry, ServerMsg, WSOperatorSession,
63};
64pub use worker::{worker_prompt, worker_result, PromptQuery, WorkerResultReq};
65
66use axum::{
67 extract::State,
68 http::{header::AUTHORIZATION, HeaderMap, StatusCode},
69 response::{IntoResponse, Response},
70 routing::{get, post},
71 Json, Router,
72};
73use mlua_swarm::application::{BlueprintRef, TaskApplication};
74use mlua_swarm::blueprint::store::BlueprintStore;
75use mlua_swarm::service::TaskLaunchService;
76use mlua_swarm::{
77 CapToken, Compiler, Engine, LayerRegistry, LuaInProcessSpawnerFactory, MainAIMiddleware,
78 OperatorDelegateMiddleware, OperatorSpawnerFactory, Role, RustFnInProcessSpawnerFactory,
79 SeniorEscalationMiddleware, SpawnerRegistry, SubprocessProcessSpawnerFactory,
80};
81use serde::{Deserialize, Serialize};
82use serde_json::{json, Value};
83use std::collections::HashMap;
84use std::sync::Arc;
85use std::time::Duration;
86use tokio::sync::Mutex;
87
88/// In-memory `sid → CapToken` map backing `/v1/sessions` attach/detach.
89#[derive(Default)]
90pub struct SessionStore {
91 /// Live session tokens keyed by `sid` (the token's `nonce`).
92 pub map: HashMap<String, CapToken>,
93}
94
95/// Shared axum handler state for the whole router. Cloned per-request (all
96/// fields are `Arc`/cheap-clone), constructed once in [`build_router_with_ws_factory`].
97#[derive(Clone)]
98pub struct AppState {
99 /// The engine SoT (attach/detach, dispatch, registries).
100 pub engine: Engine,
101 /// Live `/v1/sessions` attach records (Operator/Worker/etc session tokens).
102 pub sessions: Arc<Mutex<SessionStore>>,
103 /// Application used at the task entry to resolve `BlueprintRef`. Without a Store, runs in Inline-only mode.
104 pub task_app: Arc<TaskApplication>,
105 /// When `Some`, on WS connect a new `WSOperatorSession` is automatically registered
106 /// with this factory under the sid name (= a `kind=operator` + `operator_ref=<sid>` AgentDef
107 /// binds to the `WSOperatorSession` backend).
108 /// When `None`, no auto-registration happens; the session is only registered on
109 /// `engine.OperatorRegistry` (= only the `OperatorDelegateMiddleware` path is effective;
110 /// the `OperatorSpawnerFactory` path is dead).
111 pub ws_operator_factory: Option<Arc<OperatorSpawnerFactory>>,
112 /// Owner of the Store on the Data path (Big Response handling). Added in v9.
113 /// Independent layer — the Engine core and the Domain path (`/v1/worker/result`)
114 /// are not involved.
115 /// Default = `InMemoryOutputStore` (constructed inside `build_router_with_ws_factory`);
116 /// callers can swap in an sqlite/fs backend later (future carry).
117 pub data_store: Arc<dyn mlua_swarm::store::output::OutputStore>,
118 /// Login-flow session store (`POST /v1/operators` mint records). `sid` →
119 /// `OperatorSessionEntry`. This is the sole session store for the WS
120 /// Operator role. See `operator_ws::login` module doc.
121 pub operator_sessions:
122 Arc<Mutex<HashMap<String, Arc<crate::operator_ws::login::OperatorSessionEntry>>>>,
123 /// S1 login-flow roles-exclusivity map. Role name → owning `sid`. Checked
124 /// (and updated) atomically under a single lock in
125 /// `operator_ws::login::operators_create` — a role already present here
126 /// causes `POST /v1/operators` to return `409 CONFLICT`. Entries are
127 /// released on `DELETE /v1/operators/:sid`.
128 pub roles_to_sid: Arc<Mutex<HashMap<String, String>>>,
129}
130
131/// Minimal entry point: builds a router with [`default_registry`] and no
132/// `BlueprintStore` (Inline-only mode) or `ws_operator_factory`.
133pub fn build_router(engine: Engine) -> Router {
134 build_router_with(engine, default_registry(), None)
135}
136
137/// Default `LayerRegistry` for the server. Hint keys:
138/// - `"main_ai"` → `MainAIMiddleware` (= fires SpawnHook before/after)
139/// - `"senior_escalation"` → `SeniorEscalationMiddleware` (= on `ok=false`, escalates via `SeniorBridge.ask`)
140/// - `"operator_delegate"` → `OperatorDelegateMiddleware` (= when an operator backend is registered, delegates the entire spawn)
141///
142/// Including any of these keys in `Blueprint.spawner_hints.layers` causes them to
143/// be wrapped into a `SpawnerStack` at `service::linker::link` time (= per-launch;
144/// the old `engine.bind` global-state path is retired).
145/// Callers (the engine builder side) receive it via
146/// `Engine::new_with_layers(cfg, mse_server::default_layer_registry())`.
147pub fn default_layer_registry() -> LayerRegistry {
148 LayerRegistry::new()
149 .with_hint("main_ai", |_engine| Arc::new(MainAIMiddleware::new()))
150 .with_hint("senior_escalation", |_engine| {
151 Arc::new(SeniorEscalationMiddleware::new())
152 })
153 .with_hint("operator_delegate", |_engine| {
154 Arc::new(OperatorDelegateMiddleware::new())
155 })
156}
157
158/// Build form where the caller supplies a registry and an optional `BlueprintStore`.
159/// The Operator callback path (= external HTTP / WS callers acting as an Operator)
160/// must be pre-registered via `engine.register_*` (= the engine is the SoT).
161/// See the `operator_ws` module doc and `OperatorInfo` (engine-side `ctx.rs`) for details.
162pub fn build_router_with(
163 engine: Engine,
164 registry: SpawnerRegistry,
165 store: Option<Arc<dyn BlueprintStore>>,
166) -> Router {
167 build_router_with_ws_factory(engine, registry, store, None)
168}
169
170/// 4-argument variant of `build_router_with`. Passing `ws_operator_factory = Some(arc)`
171/// causes each WS connect to auto-register a new `WSOperatorSession` under its sid
172/// name with the factory (= a `kind=operator` AgentDef with `operator_ref: <sid>`
173/// can then bind to the WS client backend). Callers are expected to also install
174/// the same `Arc` into the `SpawnerRegistry` via
175/// `reg.register::<OperatorSpawnerFactory>(arc.clone())`.
176pub fn build_router_with_ws_factory(
177 engine: Engine,
178 registry: SpawnerRegistry,
179 store: Option<Arc<dyn BlueprintStore>>,
180 ws_operator_factory: Option<Arc<OperatorSpawnerFactory>>,
181) -> Router {
182 build_router_with_ws_factory_and_output(engine, registry, store, ws_operator_factory, None)
183}
184
185/// 5-argument variant of [`build_router_with_ws_factory`]. Passing
186/// `output_store = Some(arc)` swaps the default `InMemoryOutputStore` for a
187/// caller-supplied backend (a `SqliteOutputStore`, for instance). `None`
188/// preserves the historical behaviour (fresh in-memory store per call).
189pub fn build_router_with_ws_factory_and_output(
190 engine: Engine,
191 registry: SpawnerRegistry,
192 store: Option<Arc<dyn BlueprintStore>>,
193 ws_operator_factory: Option<Arc<OperatorSpawnerFactory>>,
194 output_store: Option<Arc<dyn mlua_swarm::store::output::OutputStore>>,
195) -> Router {
196 let compiler = Compiler::new(registry);
197 let launch = Arc::new(TaskLaunchService::new(engine.clone(), compiler));
198 let task_app = Arc::new(match store {
199 Some(s) => TaskApplication::new(launch, s),
200 None => TaskApplication::new_inline_only(launch),
201 });
202 let data_store: Arc<dyn mlua_swarm::store::output::OutputStore> = match output_store {
203 Some(s) => s,
204 None => Arc::new(mlua_swarm::store::output::InMemoryOutputStore::new()),
205 };
206 let state = AppState {
207 engine,
208 sessions: Arc::new(Mutex::new(SessionStore::default())),
209 task_app,
210 ws_operator_factory,
211 data_store,
212 operator_sessions: Arc::new(Mutex::new(HashMap::new())),
213 roles_to_sid: Arc::new(Mutex::new(HashMap::new())),
214 };
215 Router::new()
216 .route("/v1/healthz", get(healthz))
217 // session = collection (POST = attach, DELETE = detach, sid via Authorization)
218 .route(
219 "/v1/sessions",
220 post(sessions_attach).delete(sessions_detach),
221 )
222 // task = flat, single level; authz resolved via Authorization: Bearer <sid>
223 .route("/v1/tasks", post(tasks_start))
224 // REST-like Operator login flow (Bearer-mandatory, roles exclusivity).
225 // Sole WS Operator session route; see `operator_ws::login` module doc.
226 .route("/v1/operators", post(operators_create))
227 .route("/v1/operators/:sid/ws", get(operators_ws_connect))
228 .route(
229 "/v1/operators/:sid",
230 get(operators_info).delete(operators_delete),
231 )
232 // SubAgent self-fetch path (the SubAgent self-fetch design). The SubAgent puts the
233 // CapToken handed over via WS Spawn into Bearer and hits the prompt / result
234 // endpoints directly over HTTP. See the `worker` module doc for details.
235 .route("/v1/worker/prompt", get(worker::worker_prompt))
236 .route("/v1/worker/result", post(worker::worker_result))
237 // Simplified endpoint (= worker POSTs with just token + raw body; task_id is auto-looked-up)
238 .route("/v1/worker/submit", post(worker::worker_submit))
239 // Data path (v9 Big Response handling, independent from Domain / verdict flow)
240 .route("/v1/data/emit", post(data::data_emit))
241 .route(
242 "/v1/data/:key",
243 get(data::data_get).post(data::data_emit_named),
244 )
245 .with_state(state)
246}
247
248/// Default registry = Subprocess + RustFn (baseline `identity` worker pre-baked) + empty Operator factory.
249///
250/// `RustFnInProcessSpawnerFactory` gets one baseline entry (`fn_id = "identity"`)
251/// baked in via [`mlua_swarm::worker::baseline::extend_with_baseline`]. This
252/// is the shared bootstrap / smoke worker SoT across each binary (the server / MCP adapter /
253/// one-shot runner) — it structurally replaces the old per-binary inline echo injection.
254///
255/// Usage: default Task path at server startup. If production needs additional
256/// backends, callers bring in a different registry via
257/// `build_router_with(engine, custom_registry)`. The enhance flow
258/// (= patch-spawner / patch-applier / verifier-router / committer axes) uses
259/// [`default_registry_with_enhance_flow`].
260///
261/// The Operator factory is an empty shell with zero registrations (= sids are
262/// dynamically registered per WS connect; see the `operator_ws` module).
263pub fn default_registry() -> SpawnerRegistry {
264 let rustfn_factory =
265 mlua_swarm::worker::baseline::extend_with_baseline(RustFnInProcessSpawnerFactory::new());
266
267 let mut reg = SpawnerRegistry::new();
268 reg.register::<SubprocessProcessSpawnerFactory>(Arc::new(SubprocessProcessSpawnerFactory));
269 reg.register::<RustFnInProcessSpawnerFactory>(Arc::new(rustfn_factory));
270 reg.register::<OperatorSpawnerFactory>(Arc::new(OperatorSpawnerFactory::new()));
271 reg
272}
273
274/// Opt-in registry that merges [`default_registry`] with the enhance flow
275/// (Lua factory + AgentBlock factory).
276///
277/// Selected via the `the server` CLI flag `--enable-enhance-flow`. The enhance
278/// flow is a separate-axis wrapper: the Lua factory (= 3 Lua workers + 3 primitive
279/// bridges) and the AgentBlock factory (= patch-spawner path, expects
280/// `assets/operator_scripts/blueprint_patch_spawner.lua` + `ANTHROPIC_API_KEY`)
281/// are baked in as pipeline defaults. The baseline RustFn (`identity`) is pre-baked
282/// the same way as in `default_registry`.
283pub fn default_registry_with_enhance_flow() -> SpawnerRegistry {
284 let lua_factory =
285 mlua_swarm::enhance::blueprint::extend_factory(LuaInProcessSpawnerFactory::new());
286 // The Factory is stateless (= 1 process → 1 factory shared by all AgentDefs).
287 // Per-agent specialization (script_path / project_root, etc.) goes through AgentDef.spec.
288 // The enhance-flow patch-spawner is declared literally in agents[].spec of `default_blueprint.yaml`.
289 let agent_block_factory =
290 mlua_swarm::worker::agent_block::AgentBlockInProcessSpawnerFactory::new();
291 let rustfn_factory =
292 mlua_swarm::worker::baseline::extend_with_baseline(RustFnInProcessSpawnerFactory::new());
293
294 let mut reg = SpawnerRegistry::new();
295 reg.register::<SubprocessProcessSpawnerFactory>(Arc::new(SubprocessProcessSpawnerFactory));
296 reg.register::<RustFnInProcessSpawnerFactory>(Arc::new(rustfn_factory));
297 reg.register::<LuaInProcessSpawnerFactory>(Arc::new(lua_factory));
298 reg.register::<mlua_swarm::worker::agent_block::AgentBlockInProcessSpawnerFactory>(Arc::new(
299 agent_block_factory,
300 ));
301 reg.register::<OperatorSpawnerFactory>(Arc::new(OperatorSpawnerFactory::new()));
302 reg
303}
304
305// ─── handlers ────────────────────────────────────────────────────────────
306
/// `GET /v1/healthz` handler: always answers with the fixed body `ok`.
async fn healthz() -> &'static str {
    const HEALTHY: &str = "ok";
    HEALTHY
}
310
311#[derive(Deserialize)]
312struct AttachReq {
313 agent_id: String,
314 role: String,
315 ttl_secs: u64,
316}
317
318#[derive(Serialize)]
319struct AttachResp {
320 session_id: String,
321 role: String,
322}
323
324async fn sessions_attach(
325 State(state): State<AppState>,
326 Json(req): Json<AttachReq>,
327) -> Result<Json<AttachResp>, ApiError> {
328 let role = parse_role(&req.role)?;
329 let token = state
330 .engine
331 .attach(req.agent_id, role, Duration::from_secs(req.ttl_secs))
332 .await
333 .map_err(ApiError::engine)?;
334 let sid = token.nonce.clone();
335 state.sessions.lock().await.map.insert(sid.clone(), token);
336 Ok(Json(AttachResp {
337 session_id: sid,
338 role: req.role,
339 }))
340}
341
342async fn sessions_detach(
343 State(state): State<AppState>,
344 headers: HeaderMap,
345) -> Result<StatusCode, ApiError> {
346 let sid = extract_bearer(&headers)?;
347 let token = take_session_token(&state, &sid).await?;
348 state
349 .engine
350 .detach(&token)
351 .await
352 .map_err(ApiError::engine)?;
353 Ok(StatusCode::NO_CONTENT)
354}
355
356// ─── Unified /v1/tasks schema (= flow-eval path, Operator inject supported) ───────
357
358/// `/v1/tasks` POST schema. Uses the flow-eval path and supports Operator inject
359/// (kind / spawn_hook / senior_bridge). Expressing a one-shot task as a 1-Step
360/// Blueprint is the only correct model.
361#[derive(Deserialize)]
362struct FlowTasksReq {
363 blueprint: BlueprintRef,
364 init_ctx: Value,
365 /// TTL in seconds. When unspecified (`None`), falls back in this order:
366 /// (1) `metadata.default_run_ttl_secs` from the resolved BP,
367 /// (2) if absent, the server global `default_run_ttl()` (1800s).
368 #[serde(default)]
369 ttl_secs: Option<u64>,
370 #[serde(default)]
371 operator: Option<OperatorReq>,
372 /// Explicit Operator session sid (or role alias) this task's entire Spawn
373 /// stream should be routed to (runtime Operator match stage 1).
374 ///
375 /// When `Some`, it is validated at request time against
376 /// `state.engine.list_operator_ids()` (the live `engine.operators`
377 /// registry key set): an unknown/never-registered id returns `400`
378 /// immediately — this is a deliberate hard-fail, in contrast to
379 /// `OperatorDelegateWrapped::spawn`, which silently falls through to
380 /// `inner.spawn` on a registry miss. A sid that *was* registered but has
381 /// since disconnected (WS `tx` cleared, session entry retained for
382 /// reconnect) passes this check and surfaces as an explicit dispatch-time
383 /// error instead (`WSOperatorSession::send_and_await` returns `Err` when
384 /// `tx` is `None`), which also propagates as a request failure rather
385 /// than a silent fallback.
386 ///
387 /// On success this value **overrides** `operator.operator_backend_id`
388 /// (last-write-wins, `operator_sid` takes priority) before the flow is
389 /// dispatched — see `run_flow_form`. Dispatch still only delegates if the
390 /// Blueprint opts into `spawner_hints.layers = ["operator_delegate"]`
391 /// (unchanged precondition, same as the existing `operator_backend_id`
392 /// field).
393 ///
394 /// When unset, behavior is unchanged: whatever
395 /// `operator.operator_backend_id` / BP-level `operator_ref` alias
396 /// resolution already does still applies.
397 #[serde(default)]
398 operator_sid: Option<String>,
399}
400
401#[derive(Deserialize, Default)]
402struct OperatorReq {
403 /// `main_ai` / `automate` / `composite`. This is the "Runtime Global"
404 /// tier of the 4-tier `OperatorKind` cascade (see `mlua_swarm
405 /// ::ctx::collapse_operator_kind`); when unspecified, falls through to
406 /// the BP-level tiers (`OperatorDef.kind` / `Blueprint
407 /// .default_operator_kind`) instead of eagerly defaulting to `automate`.
408 #[serde(default)]
409 kind: Option<String>,
410 /// Operator id at attach time (= sessions tracking key in the EventLog); unspecified defaults to `"http-run"`.
411 #[serde(default)]
412 id: Option<String>,
413 /// Name of a hook pre-registered via `engine.register_spawn_hook`; `None` if unspecified.
414 #[serde(default)]
415 spawn_hook_id: Option<String>,
416 /// Name of a bridge pre-registered via `engine.register_senior_bridge`; `None` if unspecified.
417 #[serde(default)]
418 senior_bridge_id: Option<String>,
419 /// Name of an Operator backend pre-registered via `engine.register_operator`
420 /// (= the path that delegates the entire spawn to an external Operator);
421 /// `None` if unspecified. When `kind == MainAi/Composite` and this id is `Some`,
422 /// `OperatorDelegateMiddleware` bypasses `inner.spawn` and calls `operator.execute` instead.
423 /// This is a different axis from `operator.id` (= session tracking label);
424 /// `operator_backend_id` is the registry lookup key.
425 #[serde(default)]
426 operator_backend_id: Option<String>,
427 /// "Runtime Agent-level" tier (highest priority) of the `OperatorKind`
428 /// cascade — per-agent override, keyed by `AgentDef.name`, value is
429 /// `main_ai` / `automate` / `composite` (same parsing as `kind`).
430 /// `None` / absent means no per-agent override.
431 #[serde(default)]
432 per_agent_kinds: Option<HashMap<String, String>>,
433}
434
435/// Parse a wire-level kind string (`"main_ai"` / `"automate"` / `"composite"`)
436/// into `OperatorKind`. Shared by `OperatorReq.kind` and
437/// `OperatorReq.per_agent_kinds` values.
438fn parse_operator_kind_str(s: &str) -> Result<mlua_swarm::OperatorKind, ApiError> {
439 use mlua_swarm::OperatorKind;
440 match s {
441 "main_ai" => Ok(OperatorKind::MainAi),
442 "composite" => Ok(OperatorKind::Composite),
443 "automate" => Ok(OperatorKind::Automate),
444 other => Err(ApiError::bad_request(format!(
445 "operator kind: unknown value '{other}' (expected main_ai|automate|composite)"
446 ))),
447 }
448}
449
450#[derive(Serialize)]
451struct FlowTasksResp {
452 final_ctx: Value,
453 bound_version: Option<String>,
454 /// Resolved TTL (seconds) actually applied to the run. Exposes the
455 /// 3-layer cascade (request body → BP metadata → server default) so
456 /// clients can verify which value took effect without re-deriving it.
457 effective_ttl_secs: u64,
458 ttl_source: TtlSource,
459}
460
461#[derive(Serialize, Clone, Copy, Debug, PartialEq, Eq)]
462#[serde(rename_all = "snake_case")]
463enum TtlSource {
464 RequestBody,
465 BpMetadata,
466 ServerDefault,
467}
468
469/// Unified `/v1/tasks` POST entry (= Flow form only).
470/// Runs `Blueprint.flow` to completion via flow eval in a single round-trip.
471/// One-shot tasks are also expressed as a 1-Step Blueprint. Operator
472/// (kind / spawn_hook / senior_bridge) can be injected per request body.
473/// `operator_sid` (S2, runtime Operator match stage 1) additionally
474/// lets the caller pin the task to a specific already-registered Operator
475/// session sid, bypassing BP-level alias lookup — see `FlowTasksReq` doc.
476async fn tasks_start(
477 State(state): State<AppState>,
478 Json(req): Json<FlowTasksReq>,
479) -> Result<Json<FlowTasksResp>, ApiError> {
480 let resp = run_flow_form(&state, req).await?;
481 Ok(Json(resp))
482}
483
484/// Flow-form path (= via `TaskApplication.handle`).
485/// Core handler behind the `/v1/tasks` entry (`tasks_start`).
486///
487/// Engine stateless-executor refactor: the per-request
488/// sub_engine + 3-registry propagate loop is retired; the startup-built
489/// `state.task_app` (= a `TaskLaunchService` wrap around `state.engine`) is
490/// used directly. The Operator callback IF (`spawn_hook_id` /
491/// `senior_bridge_id` / `operator_backend_id`) is registered on
492/// `state.engine.register_*` at WS connect time — the engine is the SoT.
493/// See the `operator_ws` module doc for details.
494async fn run_flow_form(state: &AppState, req: FlowTasksReq) -> Result<FlowTasksResp, ApiError> {
495 use mlua_swarm::application::{BlueprintRef as AppBlueprintRef, TaskApplicationInput};
496 use mlua_swarm::Application;
497 use mlua_swarm::OperatorKind;
498
499 let mut op_req = req.operator.unwrap_or_default();
500
501 // S2: explicit `operator_sid` override (runtime Operator match stage 1).
502 // Resolved *before* building `operator_kind` / dispatching so an
503 // unknown sid fails fast with a 400, never silently falling back to the
504 // BP-level alias lookup. See `FlowTasksReq::operator_sid` doc for the
505 // disconnected-vs-unknown distinction.
506 if let Some(sid) = &req.operator_sid {
507 let known_ids = state.engine.list_operator_ids().await;
508 if !known_ids.iter().any(|id| id == sid) {
509 return Err(ApiError::bad_request(format!(
510 "operator_sid: no such registered operator session '{sid}'"
511 )));
512 }
513 op_req.operator_backend_id = Some(sid.clone());
514 }
515
516 // "Runtime Global" tier: `Some(_)` — including `Some(Automate)` — is
517 // always an explicit request that outranks the BP-level tiers; an
518 // absent/unset `kind` in the request body stays `None`, leaving the
519 // BP-level tiers (`OperatorDef.kind` / `Blueprint.default_operator_kind`)
520 // to decide instead of eagerly defaulting to `Automate`.
521 let operator_kind = op_req
522 .kind
523 .as_deref()
524 .map(parse_operator_kind_str)
525 .transpose()?;
526 let operator_id = op_req.id.unwrap_or_else(|| "http-run".to_string());
527 // "Runtime Agent-level" tier: per-agent overrides. Absent/empty = no
528 // override for any agent, letting the BP-level tiers decide per agent.
529 let mut operator_kind_overrides: HashMap<String, OperatorKind> = HashMap::new();
530 for (agent, kind_str) in op_req.per_agent_kinds.take().unwrap_or_default() {
531 operator_kind_overrides.insert(agent, parse_operator_kind_str(&kind_str)?);
532 }
533
534 let blueprint: AppBlueprintRef = match req.blueprint {
535 AppBlueprintRef::Inline { value } => AppBlueprintRef::Inline { value },
536 AppBlueprintRef::Id { id, version } => AppBlueprintRef::Id { id, version },
537 };
538
539 // TTL resolution cascade: (1) request body value, (2) BP metadata `default_run_ttl_secs`,
540 // (3) server global default (`default_run_ttl()`, 1800s).
541 let (ttl_secs, ttl_source) = match req.ttl_secs {
542 Some(v) => (v, TtlSource::RequestBody),
543 None => {
544 let (resolved_bp, _ver) = state
545 .task_app
546 .resolve(&blueprint)
547 .await
548 .map_err(|e| ApiError::bad_request(format!("bp resolve: {e}")))?;
549 match resolved_bp.metadata.default_run_ttl_secs {
550 Some(v) => (v, TtlSource::BpMetadata),
551 None => (default_run_ttl(), TtlSource::ServerDefault),
552 }
553 }
554 };
555
556 let out = state
557 .task_app
558 .handle(TaskApplicationInput {
559 blueprint,
560 operator_id: operator_id.clone(),
561 role: Role::Operator,
562 ttl: Duration::from_secs(ttl_secs),
563 init_ctx: req.init_ctx,
564 operator_kind,
565 bridge_id: op_req.senior_bridge_id,
566 hook_id: op_req.spawn_hook_id,
567 operator_backend_id: op_req.operator_backend_id,
568 operator_kind_overrides,
569 })
570 .await
571 .map_err(|e| ApiError::bad_request(format!("run: {e}")))?;
572
573 Ok(FlowTasksResp {
574 final_ctx: out.final_ctx,
575 bound_version: out.bound_version.map(|v| format!("{:?}", v)),
576 effective_ttl_secs: ttl_secs,
577 ttl_source,
578 })
579}
580
581// ─── helpers ─────────────────────────────────────────────────────────────
582
583async fn take_session_token(state: &AppState, sid: &str) -> Result<CapToken, ApiError> {
584 state
585 .sessions
586 .lock()
587 .await
588 .map
589 .remove(sid)
590 .ok_or_else(|| ApiError::not_found(format!("session: {sid}")))
591}
592
593/// Extracts sid from `Authorization: Bearer <sid>`. Strict — does not accept any other scheme prefix.
594fn extract_bearer(headers: &HeaderMap) -> Result<String, ApiError> {
595 let v = headers
596 .get(AUTHORIZATION)
597 .ok_or_else(|| ApiError::bad_request("missing Authorization header".into()))?
598 .to_str()
599 .map_err(|_| ApiError::bad_request("invalid Authorization header encoding".into()))?;
600 let sid = v
601 .strip_prefix("Bearer ")
602 .ok_or_else(|| ApiError::bad_request("Authorization must be 'Bearer <sid>'".into()))?
603 .trim();
604 if sid.is_empty() {
605 return Err(ApiError::bad_request("Bearer sid is empty".into()));
606 }
607 Ok(sid.to_string())
608}
609
610fn parse_role(s: &str) -> Result<Role, ApiError> {
611 match s.to_ascii_lowercase().as_str() {
612 "operator" => Ok(Role::Operator),
613 "worker" => Ok(Role::Worker),
614 "observer" => Ok(Role::Observer),
615 "senior" => Ok(Role::Senior),
616 other => Err(ApiError::bad_request(format!("unknown role: {other}"))),
617 }
618}
619
620// ─── error type ──────────────────────────────────────────────────────────
621
622/// Uniform error response type for the handlers in this module. Converts to
623/// a JSON `{"error": message}` body with the given status via [`IntoResponse`].
624#[derive(Debug)]
625pub struct ApiError {
626 status: StatusCode,
627 message: String,
628}
629
630impl ApiError {
631 /// Wraps an engine-side error as `500 Internal Server Error`.
632 pub fn engine(e: impl std::fmt::Display) -> Self {
633 Self {
634 status: StatusCode::INTERNAL_SERVER_ERROR,
635 message: format!("engine: {e}"),
636 }
637 }
638 /// Builds a `404 Not Found` with the given message.
639 pub fn not_found(m: String) -> Self {
640 Self {
641 status: StatusCode::NOT_FOUND,
642 message: m,
643 }
644 }
645 /// Builds a `400 Bad Request` with the given message.
646 pub fn bad_request(m: String) -> Self {
647 Self {
648 status: StatusCode::BAD_REQUEST,
649 message: m,
650 }
651 }
652}
653
654impl IntoResponse for ApiError {
655 fn into_response(self) -> Response {
656 (self.status, Json(json!({"error": self.message}))).into_response()
657 }
658}
659
/// Server-wide fallback run TTL in seconds: 1800 (= 30 min).
///
/// Chosen to prevent op_token expiry across a flow.ir multi-step chain
/// (= 5+ SubAgent dispatches at 30–60s each). Origin: the observed fvloop smoke
/// where a post-gate mock-commit dispatch blew past 300s and expired — sibling
/// of the worker_token TTL.
fn default_run_ttl() -> u64 {
    const DEFAULT_RUN_TTL_SECS: u64 = 30 * 60;
    DEFAULT_RUN_TTL_SECS
}
666
/// Test-only model of the second stage of the TTL cascade (Blueprint metadata
/// → server default).
///
/// This helper is compiled only under `#[cfg(test)]`. The request path in
/// `run_flow_form` does not call it; it spells out the same fallback inline so
/// that it can also record a `TtlSource`. Keep the two in sync by hand.
///
/// - `Some(v)` (BP metadata `default_run_ttl_secs` is set) → `v`
/// - `None` → the server global `default_run_ttl()` (1800s)
///
/// # Full cascade (as combined in `run_flow_form`)
///
/// - request body `ttl_secs=Some(v)` → v (this stage is skipped)
/// - request body `None` + metadata `Some(v)` → v
/// - request body `None` + metadata `None` → `default_run_ttl()` = 1800s
#[cfg(test)]
fn resolve_ttl_from_metadata(metadata_ttl: Option<u64>) -> u64 {
    match metadata_ttl {
        Some(secs) => secs,
        None => default_run_ttl(),
    }
}
681
#[cfg(test)]
mod tests {
    use super::*;

    /// Models the full cascade for the tests below: a request-body value wins
    /// outright, otherwise `resolve_ttl_from_metadata` decides.
    fn effective_ttl(req_ttl: Option<u64>, metadata_ttl: Option<u64>) -> u64 {
        match req_ttl {
            Some(v) => v,
            None => resolve_ttl_from_metadata(metadata_ttl),
        }
    }

    /// Cascade case 1: a TTL in the request body is used as-is, even when the
    /// BP metadata also carries one (the branch that bypasses the helper).
    #[test]
    fn ttl_cascade_request_body_wins_over_metadata() {
        let effective = effective_ttl(Some(100), Some(3600));
        assert_eq!(
            effective, 100,
            "request body ttl_secs=100 must win over metadata=3600 (cascade priority (1) > (2))"
        );
    }

    /// Cascade case 2: no TTL in the request body, BP metadata `Some(N)` → `N`.
    #[test]
    fn ttl_cascade_metadata_used_when_body_missing() {
        let effective = effective_ttl(None, Some(3600));
        assert_eq!(
            effective, 3600,
            "body None + metadata=3600 must resolve to 3600 (cascade (2))"
        );
    }

    /// Cascade case 3: neither the request body nor the BP metadata sets a
    /// TTL → the server default (1800s).
    #[test]
    fn ttl_cascade_server_default_when_both_missing() {
        let effective = effective_ttl(None, None);
        assert_eq!(
            effective,
            default_run_ttl(),
            "body None + metadata None must fall back to default_run_ttl() = 1800s"
        );
        assert_eq!(effective, 1800, "default_run_ttl() literal = 1800s");
    }

    /// Helper unit: metadata `None` expands to the server default, 1800.
    #[test]
    fn resolve_ttl_from_metadata_none_returns_server_default() {
        assert_eq!(resolve_ttl_from_metadata(None), 1800);
    }

    /// Helper unit: metadata `Some(N)` is returned unchanged, whether above or
    /// below the server default.
    #[test]
    fn resolve_ttl_from_metadata_some_returns_value() {
        assert_eq!(resolve_ttl_from_metadata(Some(7200)), 7200);
        assert_eq!(resolve_ttl_from_metadata(Some(60)), 60);
    }
}