Skip to main content

mlua_swarm_server/
lib.rs

//! Server library: the axum `Router` plus its handler set. It is split out as a
//! library so it can be used from both `main.rs` (CLI) and integration tests.
3//!
4//! # Endpoints
5//!
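//! - `GET /v1/healthz`
//! - `POST /v1/sessions` / `DELETE /v1/sessions` (= operator attach / detach, Bearer sid)
//! - `POST /v1/tasks` (= unified Flow-form entry, Operator inject supported;
//!   `operator_sid` explicitly pins the task to a registered Operator session, S2)
//! - `POST /v1/operators` / `GET /v1/operators/:sid` / `DELETE /v1/operators/:sid` /
//!   `GET /v1/operators/:sid/ws` (WS upgrade) — REST-like Operator login flow,
//!   Bearer-mandatory; the sole WS Operator session route. See the `operator_ws::login`
//!   module doc.
//! - `GET /v1/worker/prompt` / `POST /v1/worker/result` / `POST /v1/worker/submit` —
//!   SubAgent self-fetch path (the SubAgent sends its CapToken as Bearer). See the
//!   `worker` module doc.
//! - `POST /v1/data/emit` / `GET /v1/data/:key` / `POST /v1/data/:key` — Data path
//!   (v9 Big Response handling, independent from the Domain / verdict flow).
//!
//! The Enhance issue axis (`/v1/issues*`) lives in the `issues` module; callers merge
//! `build_issues_router` to integrate it into the same server. The Blueprint
//! (`/v1/blueprints/*`), Enhance log and Enhance settings surfaces are likewise exposed
//! through their own `build_*_router` functions rather than by [`build_router`].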
17//!
18//! # The 3 faces of the Operator role (= registered directly on the engine SoT)
19//!
20//! The engine stateless-executor refactor removed the three
21//! `AppState` registries (former `HookRegistry` / `BridgeRegistry` / `OperatorRegistry`);
22//! all registration now goes directly to the engine SoT via
23//! `engine.register_spawn_hook` / `register_senior_bridge` / `register_operator`.
24//! `WSOperatorSession` (in the `operator_ws` module) registers all three traits
25//! simultaneously under a single sid — one WS connection covers all 3 faces of
26//! the Operator role, the canonical pattern.
27//!
28//! # `build_*` family
29//!
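//! - [`build_router`] — minimal entry (= `default_registry()`)
//! - [`build_router_with`] — caller provides a `SpawnerRegistry` and optional `BlueprintStore`
//! - [`build_router_with_ws_factory`] — additionally takes an optional
//!   `OperatorSpawnerFactory` for per-WS-connect auto-registration
//! - [`build_router_with_ws_factory_and_output`] — additionally takes an optional
//!   `OutputStore` for the Data path (default: a fresh in-memory store)
//!
//! The engine should be started with [`default_layer_registry`] (= `Engine::new_with_layers`);
//! otherwise `Blueprint.spawner_hints` is ignored.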
35
36#![warn(missing_docs)]
37
38/// HTTP surface for inspecting/registering Blueprint state (`/v1/blueprints/*`).
39pub mod blueprints;
40/// Server config file support (`~/.mse/config.toml`, CLI > file > default merge).
41pub mod config;
42/// `/v1/data/*` endpoints (v9 Big Response handling, Store-owner direct path).
43pub mod data;
44/// `GET /v1/doctor` — read-only startup config / Store snapshot.
45pub mod doctor;
46/// HTTP surface for the `/v1/enhance/log` axis.
47pub mod enhance_log;
48/// `EnhanceSetting` HTTP CRUD (`/v1/enhance-settings*`).
49pub mod enhance_settings;
50/// HTTP surface for the Enhance issue axis (`/v1/issues*`).
51pub mod issues;
52/// WebSocket Operator Callback IF (`/v1/operators*`).
53pub mod operator_ws;
54/// `/v1/worker/*` endpoints (SubAgent self-fetch path).
55pub mod worker;
56pub use blueprints::{build_blueprints_router, build_blueprints_router_with_refs};
57pub use enhance_log::build_enhance_log_router;
58pub use enhance_settings::build_enhance_settings_router;
59pub use issues::{build_issues_router, GetIssueResponse, PostIssueRequest, PostIssueResponse};
60pub use operator_ws::{
61    operators_create, operators_delete, operators_info, operators_ws_connect, ClientMsg,
62    OperatorSessionEntry, ServerMsg, WSOperatorSession,
63};
64pub use worker::{worker_prompt, worker_result, PromptQuery, WorkerResultReq};
65
66use axum::{
67    extract::State,
68    http::{header::AUTHORIZATION, HeaderMap, StatusCode},
69    response::{IntoResponse, Response},
70    routing::{get, post},
71    Json, Router,
72};
73use mlua_swarm::application::{BlueprintRef, TaskApplication};
74use mlua_swarm::blueprint::store::BlueprintStore;
75use mlua_swarm::service::TaskLaunchService;
76use mlua_swarm::{
77    CapToken, Compiler, Engine, LayerRegistry, LuaInProcessSpawnerFactory, MainAIMiddleware,
78    OperatorDelegateMiddleware, OperatorSpawnerFactory, Role, RustFnInProcessSpawnerFactory,
79    SeniorEscalationMiddleware, SpawnerRegistry, SubprocessProcessSpawnerFactory,
80};
81use serde::{Deserialize, Serialize};
82use serde_json::{json, Value};
83use std::collections::HashMap;
84use std::sync::Arc;
85use std::time::Duration;
86use tokio::sync::Mutex;
87
88/// In-memory `sid → CapToken` map backing `/v1/sessions` attach/detach.
89#[derive(Default)]
90pub struct SessionStore {
91    /// Live session tokens keyed by `sid` (the token's `nonce`).
92    pub map: HashMap<String, CapToken>,
93}
94
95/// Shared axum handler state for the whole router. Cloned per-request (all
96/// fields are `Arc`/cheap-clone), constructed once in [`build_router_with_ws_factory`].
97#[derive(Clone)]
98pub struct AppState {
99    /// The engine SoT (attach/detach, dispatch, registries).
100    pub engine: Engine,
101    /// Live `/v1/sessions` attach records (Operator/Worker/etc session tokens).
102    pub sessions: Arc<Mutex<SessionStore>>,
103    /// Application used at the task entry to resolve `BlueprintRef`. Without a Store, runs in Inline-only mode.
104    pub task_app: Arc<TaskApplication>,
105    /// When `Some`, on WS connect a new `WSOperatorSession` is automatically registered
106    /// with this factory under the sid name (= a `kind=operator` + `operator_ref=<sid>` AgentDef
107    /// binds to the `WSOperatorSession` backend).
108    /// When `None`, no auto-registration happens; the session is only registered on
109    /// `engine.OperatorRegistry` (= only the `OperatorDelegateMiddleware` path is effective;
110    /// the `OperatorSpawnerFactory` path is dead).
111    pub ws_operator_factory: Option<Arc<OperatorSpawnerFactory>>,
112    /// Owner of the Store on the Data path (Big Response handling). Added in v9.
113    /// Independent layer — the Engine core and the Domain path (`/v1/worker/result`)
114    /// are not involved.
115    /// Default = `InMemoryOutputStore` (constructed inside `build_router_with_ws_factory`);
116    /// callers can swap in an sqlite/fs backend later (future carry).
117    pub data_store: Arc<dyn mlua_swarm::store::output::OutputStore>,
118    /// Login-flow session store (`POST /v1/operators` mint records). `sid` →
119    /// `OperatorSessionEntry`. This is the sole session store for the WS
120    /// Operator role. See `operator_ws::login` module doc.
121    pub operator_sessions:
122        Arc<Mutex<HashMap<String, Arc<crate::operator_ws::login::OperatorSessionEntry>>>>,
123    /// S1 login-flow roles-exclusivity map. Role name → owning `sid`. Checked
124    /// (and updated) atomically under a single lock in
125    /// `operator_ws::login::operators_create` — a role already present here
126    /// causes `POST /v1/operators` to return `409 CONFLICT`. Entries are
127    /// released on `DELETE /v1/operators/:sid`.
128    pub roles_to_sid: Arc<Mutex<HashMap<String, String>>>,
129}
130
131/// Minimal entry point: builds a router with [`default_registry`] and no
132/// `BlueprintStore` (Inline-only mode) or `ws_operator_factory`.
133pub fn build_router(engine: Engine) -> Router {
134    build_router_with(engine, default_registry(), None)
135}
136
137/// Default `LayerRegistry` for the server. Hint keys:
138/// - `"main_ai"` → `MainAIMiddleware` (= fires SpawnHook before/after)
139/// - `"senior_escalation"` → `SeniorEscalationMiddleware` (= on `ok=false`, escalates via `SeniorBridge.ask`)
140/// - `"operator_delegate"` → `OperatorDelegateMiddleware` (= when an operator backend is registered, delegates the entire spawn)
141///
142/// Including any of these keys in `Blueprint.spawner_hints.layers` causes them to
143/// be wrapped into a `SpawnerStack` at `service::linker::link` time (= per-launch;
144/// the old `engine.bind` global-state path is retired).
145/// Callers (the engine builder side) receive it via
146/// `Engine::new_with_layers(cfg, mse_server::default_layer_registry())`.
147pub fn default_layer_registry() -> LayerRegistry {
148    LayerRegistry::new()
149        .with_hint("main_ai", |_engine| Arc::new(MainAIMiddleware::new()))
150        .with_hint("senior_escalation", |_engine| {
151            Arc::new(SeniorEscalationMiddleware::new())
152        })
153        .with_hint("operator_delegate", |_engine| {
154            Arc::new(OperatorDelegateMiddleware::new())
155        })
156}
157
158/// Build form where the caller supplies a registry and an optional `BlueprintStore`.
159/// The Operator callback path (= external HTTP / WS callers acting as an Operator)
160/// must be pre-registered via `engine.register_*` (= the engine is the SoT).
161/// See the `operator_ws` module doc and `OperatorInfo` (engine-side `ctx.rs`) for details.
162pub fn build_router_with(
163    engine: Engine,
164    registry: SpawnerRegistry,
165    store: Option<Arc<dyn BlueprintStore>>,
166) -> Router {
167    build_router_with_ws_factory(engine, registry, store, None)
168}
169
170/// 4-argument variant of `build_router_with`. Passing `ws_operator_factory = Some(arc)`
171/// causes each WS connect to auto-register a new `WSOperatorSession` under its sid
172/// name with the factory (= a `kind=operator` AgentDef with `operator_ref: <sid>`
173/// can then bind to the WS client backend). Callers are expected to also install
174/// the same `Arc` into the `SpawnerRegistry` via
175/// `reg.register::<OperatorSpawnerFactory>(arc.clone())`.
176pub fn build_router_with_ws_factory(
177    engine: Engine,
178    registry: SpawnerRegistry,
179    store: Option<Arc<dyn BlueprintStore>>,
180    ws_operator_factory: Option<Arc<OperatorSpawnerFactory>>,
181) -> Router {
182    build_router_with_ws_factory_and_output(engine, registry, store, ws_operator_factory, None)
183}
184
185/// 5-argument variant of [`build_router_with_ws_factory`]. Passing
186/// `output_store = Some(arc)` swaps the default `InMemoryOutputStore` for a
187/// caller-supplied backend (a `SqliteOutputStore`, for instance). `None`
188/// preserves the historical behaviour (fresh in-memory store per call).
189pub fn build_router_with_ws_factory_and_output(
190    engine: Engine,
191    registry: SpawnerRegistry,
192    store: Option<Arc<dyn BlueprintStore>>,
193    ws_operator_factory: Option<Arc<OperatorSpawnerFactory>>,
194    output_store: Option<Arc<dyn mlua_swarm::store::output::OutputStore>>,
195) -> Router {
196    let compiler = Compiler::new(registry);
197    let launch = Arc::new(TaskLaunchService::new(engine.clone(), compiler));
198    let task_app = Arc::new(match store {
199        Some(s) => TaskApplication::new(launch, s),
200        None => TaskApplication::new_inline_only(launch),
201    });
202    let data_store: Arc<dyn mlua_swarm::store::output::OutputStore> = match output_store {
203        Some(s) => s,
204        None => Arc::new(mlua_swarm::store::output::InMemoryOutputStore::new()),
205    };
206    let state = AppState {
207        engine,
208        sessions: Arc::new(Mutex::new(SessionStore::default())),
209        task_app,
210        ws_operator_factory,
211        data_store,
212        operator_sessions: Arc::new(Mutex::new(HashMap::new())),
213        roles_to_sid: Arc::new(Mutex::new(HashMap::new())),
214    };
215    Router::new()
216        .route("/v1/healthz", get(healthz))
217        // session = collection (POST = attach, DELETE = detach, sid via Authorization)
218        .route(
219            "/v1/sessions",
220            post(sessions_attach).delete(sessions_detach),
221        )
222        // task = flat, single level; authz resolved via Authorization: Bearer <sid>
223        .route("/v1/tasks", post(tasks_start))
224        // REST-like Operator login flow (Bearer-mandatory, roles exclusivity).
225        // Sole WS Operator session route; see `operator_ws::login` module doc.
226        .route("/v1/operators", post(operators_create))
227        .route("/v1/operators/:sid/ws", get(operators_ws_connect))
228        .route(
229            "/v1/operators/:sid",
230            get(operators_info).delete(operators_delete),
231        )
232        // SubAgent self-fetch path (the SubAgent self-fetch design). The SubAgent puts the
233        // CapToken handed over via WS Spawn into Bearer and hits the prompt / result
234        // endpoints directly over HTTP. See the `worker` module doc for details.
235        .route("/v1/worker/prompt", get(worker::worker_prompt))
236        .route("/v1/worker/result", post(worker::worker_result))
237        // Simplified endpoint (= worker POSTs with just token + raw body; task_id is auto-looked-up)
238        .route("/v1/worker/submit", post(worker::worker_submit))
239        // Data path (v9 Big Response handling, independent from Domain / verdict flow)
240        .route("/v1/data/emit", post(data::data_emit))
241        .route(
242            "/v1/data/:key",
243            get(data::data_get).post(data::data_emit_named),
244        )
245        .with_state(state)
246}
247
248/// Default registry = Subprocess + RustFn (baseline `identity` worker pre-baked) + empty Operator factory.
249///
250/// `RustFnInProcessSpawnerFactory` gets one baseline entry (`fn_id = "identity"`)
251/// baked in via [`mlua_swarm::worker::baseline::extend_with_baseline`]. This
252/// is the shared bootstrap / smoke worker SoT across each binary (the server / MCP adapter /
253/// one-shot runner) — it structurally replaces the old per-binary inline echo injection.
254///
255/// Usage: default Task path at server startup. If production needs additional
256/// backends, callers bring in a different registry via
257/// `build_router_with(engine, custom_registry)`. The enhance flow
258/// (= patch-spawner / patch-applier / verifier-router / committer axes) uses
259/// [`default_registry_with_enhance_flow`].
260///
261/// The Operator factory is an empty shell with zero registrations (= sids are
262/// dynamically registered per WS connect; see the `operator_ws` module).
263pub fn default_registry() -> SpawnerRegistry {
264    let rustfn_factory =
265        mlua_swarm::worker::baseline::extend_with_baseline(RustFnInProcessSpawnerFactory::new());
266
267    let mut reg = SpawnerRegistry::new();
268    reg.register::<SubprocessProcessSpawnerFactory>(Arc::new(SubprocessProcessSpawnerFactory));
269    reg.register::<RustFnInProcessSpawnerFactory>(Arc::new(rustfn_factory));
270    reg.register::<OperatorSpawnerFactory>(Arc::new(OperatorSpawnerFactory::new()));
271    reg
272}
273
274/// Opt-in registry that merges [`default_registry`] with the enhance flow
275/// (Lua factory + AgentBlock factory).
276///
277/// Selected via the `the server` CLI flag `--enable-enhance-flow`. The enhance
278/// flow is a separate-axis wrapper: the Lua factory (= 3 Lua workers + 3 primitive
279/// bridges) and the AgentBlock factory (= patch-spawner path, expects
280/// `assets/operator_scripts/blueprint_patch_spawner.lua` + `ANTHROPIC_API_KEY`)
281/// are baked in as pipeline defaults. The baseline RustFn (`identity`) is pre-baked
282/// the same way as in `default_registry`.
283pub fn default_registry_with_enhance_flow() -> SpawnerRegistry {
284    let lua_factory =
285        mlua_swarm::enhance::blueprint::extend_factory(LuaInProcessSpawnerFactory::new());
286    // The Factory is stateless (= 1 process → 1 factory shared by all AgentDefs).
287    // Per-agent specialization (script_path / project_root, etc.) goes through AgentDef.spec.
288    // The enhance-flow patch-spawner is declared literally in agents[].spec of `default_blueprint.yaml`.
289    let agent_block_factory =
290        mlua_swarm::worker::agent_block::AgentBlockInProcessSpawnerFactory::new();
291    let rustfn_factory =
292        mlua_swarm::worker::baseline::extend_with_baseline(RustFnInProcessSpawnerFactory::new());
293
294    let mut reg = SpawnerRegistry::new();
295    reg.register::<SubprocessProcessSpawnerFactory>(Arc::new(SubprocessProcessSpawnerFactory));
296    reg.register::<RustFnInProcessSpawnerFactory>(Arc::new(rustfn_factory));
297    reg.register::<LuaInProcessSpawnerFactory>(Arc::new(lua_factory));
298    reg.register::<mlua_swarm::worker::agent_block::AgentBlockInProcessSpawnerFactory>(Arc::new(
299        agent_block_factory,
300    ));
301    reg.register::<OperatorSpawnerFactory>(Arc::new(OperatorSpawnerFactory::new()));
302    reg
303}
304
305// ─── handlers ────────────────────────────────────────────────────────────
306
/// `GET /v1/healthz` liveness probe; always answers with the plain-text body `ok`.
async fn healthz() -> &'static str {
    const HEALTHY: &str = "ok";
    HEALTHY
}
310
/// `POST /v1/sessions` request body (operator attach).
#[derive(Deserialize)]
struct AttachReq {
    /// Agent id handed to `engine.attach`.
    agent_id: String,
    /// Role name, parsed case-insensitively by `parse_role`
    /// (`operator` / `worker` / `observer` / `senior`). Echoed back unmodified
    /// in `AttachResp.role`.
    role: String,
    /// Session lifetime in seconds (converted with `Duration::from_secs`).
    ttl_secs: u64,
}
317
/// `POST /v1/sessions` response body.
#[derive(Serialize)]
struct AttachResp {
    /// The attached token's `nonce`; send it back as
    /// `Authorization: Bearer <session_id>` to detach.
    session_id: String,
    /// The role string exactly as sent in the request (not normalized).
    role: String,
}
323
324async fn sessions_attach(
325    State(state): State<AppState>,
326    Json(req): Json<AttachReq>,
327) -> Result<Json<AttachResp>, ApiError> {
328    let role = parse_role(&req.role)?;
329    let token = state
330        .engine
331        .attach(req.agent_id, role, Duration::from_secs(req.ttl_secs))
332        .await
333        .map_err(ApiError::engine)?;
334    let sid = token.nonce.clone();
335    state.sessions.lock().await.map.insert(sid.clone(), token);
336    Ok(Json(AttachResp {
337        session_id: sid,
338        role: req.role,
339    }))
340}
341
342async fn sessions_detach(
343    State(state): State<AppState>,
344    headers: HeaderMap,
345) -> Result<StatusCode, ApiError> {
346    let sid = extract_bearer(&headers)?;
347    let token = take_session_token(&state, &sid).await?;
348    state
349        .engine
350        .detach(&token)
351        .await
352        .map_err(ApiError::engine)?;
353    Ok(StatusCode::NO_CONTENT)
354}
355
356// ─── Unified /v1/tasks schema (= flow-eval path, Operator inject supported) ───────
357
/// `/v1/tasks` POST schema. Uses the flow-eval path and supports Operator inject
/// (kind / spawn_hook / senior_bridge). Expressing a one-shot task as a 1-Step
/// Blueprint is the only correct model.
#[derive(Deserialize)]
struct FlowTasksReq {
    /// Blueprint to run: either inline (`Inline { value }`) or a stored
    /// reference (`Id { id, version }`), resolved through `state.task_app`.
    /// NOTE(review): `Id` refs presumably fail when the server was built
    /// without a `BlueprintStore` (Inline-only mode) — confirm in `TaskApplication`.
    blueprint: BlueprintRef,
    /// Initial flow context, forwarded unchanged as `TaskApplicationInput.init_ctx`.
    init_ctx: Value,
    /// TTL in seconds. When unspecified (`None`), falls back in this order:
    /// (1) `metadata.default_run_ttl_secs` from the resolved BP,
    /// (2) if absent, the server global `default_run_ttl()` (1800s).
    #[serde(default)]
    ttl_secs: Option<u64>,
    /// Optional per-request Operator injection; when absent,
    /// `OperatorReq::default()` (every field unset) is used.
    #[serde(default)]
    operator: Option<OperatorReq>,
    /// Explicit Operator session sid (or role alias) this task's entire Spawn
    /// stream should be routed to (runtime Operator match stage 1).
    ///
    /// When `Some`, it is validated at request time against
    /// `state.engine.list_operator_ids()` (the live `engine.operators`
    /// registry key set): an unknown/never-registered id returns `400`
    /// immediately — this is a deliberate hard-fail, in contrast to
    /// `OperatorDelegateWrapped::spawn`, which silently falls through to
    /// `inner.spawn` on a registry miss. A sid that *was* registered but has
    /// since disconnected (WS `tx` cleared, session entry retained for
    /// reconnect) passes this check and surfaces as an explicit dispatch-time
    /// error instead (`WSOperatorSession::send_and_await` returns `Err` when
    /// `tx` is `None`), which also propagates as a request failure rather
    /// than a silent fallback.
    ///
    /// On success this value **overrides** `operator.operator_backend_id`
    /// (last-write-wins, `operator_sid` takes priority) before the flow is
    /// dispatched — see `run_flow_form`. Dispatch still only delegates if the
    /// Blueprint opts into `spawner_hints.layers = ["operator_delegate"]`
    /// (unchanged precondition, same as the existing `operator_backend_id`
    /// field).
    ///
    /// When unset, behavior is unchanged: whatever
    /// `operator.operator_backend_id` / BP-level `operator_ref` alias
    /// resolution already does still applies.
    #[serde(default)]
    operator_sid: Option<String>,
}
400
401#[derive(Deserialize, Default)]
402struct OperatorReq {
403    /// `main_ai` / `automate` / `composite`. This is the "Runtime Global"
404    /// tier of the 4-tier `OperatorKind` cascade (see `mlua_swarm
405    /// ::ctx::collapse_operator_kind`); when unspecified, falls through to
406    /// the BP-level tiers (`OperatorDef.kind` / `Blueprint
407    /// .default_operator_kind`) instead of eagerly defaulting to `automate`.
408    #[serde(default)]
409    kind: Option<String>,
410    /// Operator id at attach time (= sessions tracking key in the EventLog); unspecified defaults to `"http-run"`.
411    #[serde(default)]
412    id: Option<String>,
413    /// Name of a hook pre-registered via `engine.register_spawn_hook`; `None` if unspecified.
414    #[serde(default)]
415    spawn_hook_id: Option<String>,
416    /// Name of a bridge pre-registered via `engine.register_senior_bridge`; `None` if unspecified.
417    #[serde(default)]
418    senior_bridge_id: Option<String>,
419    /// Name of an Operator backend pre-registered via `engine.register_operator`
420    /// (= the path that delegates the entire spawn to an external Operator);
421    /// `None` if unspecified. When `kind == MainAi/Composite` and this id is `Some`,
422    /// `OperatorDelegateMiddleware` bypasses `inner.spawn` and calls `operator.execute` instead.
423    /// This is a different axis from `operator.id` (= session tracking label);
424    /// `operator_backend_id` is the registry lookup key.
425    #[serde(default)]
426    operator_backend_id: Option<String>,
427    /// "Runtime Agent-level" tier (highest priority) of the `OperatorKind`
428    /// cascade — per-agent override, keyed by `AgentDef.name`, value is
429    /// `main_ai` / `automate` / `composite` (same parsing as `kind`).
430    /// `None` / absent means no per-agent override.
431    #[serde(default)]
432    per_agent_kinds: Option<HashMap<String, String>>,
433}
434
435/// Parse a wire-level kind string (`"main_ai"` / `"automate"` / `"composite"`)
436/// into `OperatorKind`. Shared by `OperatorReq.kind` and
437/// `OperatorReq.per_agent_kinds` values.
438fn parse_operator_kind_str(s: &str) -> Result<mlua_swarm::OperatorKind, ApiError> {
439    use mlua_swarm::OperatorKind;
440    match s {
441        "main_ai" => Ok(OperatorKind::MainAi),
442        "composite" => Ok(OperatorKind::Composite),
443        "automate" => Ok(OperatorKind::Automate),
444        other => Err(ApiError::bad_request(format!(
445            "operator kind: unknown value '{other}' (expected main_ai|automate|composite)"
446        ))),
447    }
448}
449
/// `/v1/tasks` POST response body.
#[derive(Serialize)]
struct FlowTasksResp {
    /// Final flow context reported by the run (`out.final_ctx`).
    final_ctx: Value,
    /// Version the Blueprint was bound to, rendered with `Debug` formatting
    /// (`{:?}`); `None` when the run output carries no bound version.
    bound_version: Option<String>,
    /// Resolved TTL (seconds) actually applied to the run. Exposes the
    /// 3-layer cascade (request body → BP metadata → server default) so
    /// clients can verify which value took effect without re-deriving it.
    effective_ttl_secs: u64,
    /// Which cascade layer produced `effective_ttl_secs`.
    ttl_source: TtlSource,
}
460
/// Which layer of the `/v1/tasks` TTL cascade supplied the effective TTL.
/// Serialized in snake_case: `request_body` / `bp_metadata` / `server_default`.
#[derive(Serialize, Clone, Copy, Debug, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
enum TtlSource {
    /// `ttl_secs` was set in the request body.
    RequestBody,
    /// Taken from the resolved Blueprint's `metadata.default_run_ttl_secs`.
    BpMetadata,
    /// Neither was set; fell back to `default_run_ttl()` (1800s).
    ServerDefault,
}
468
469/// Unified `/v1/tasks` POST entry (= Flow form only).
470/// Runs `Blueprint.flow` to completion via flow eval in a single round-trip.
471/// One-shot tasks are also expressed as a 1-Step Blueprint. Operator
472/// (kind / spawn_hook / senior_bridge) can be injected per request body.
473/// `operator_sid` (S2, runtime Operator match stage 1) additionally
474/// lets the caller pin the task to a specific already-registered Operator
475/// session sid, bypassing BP-level alias lookup — see `FlowTasksReq` doc.
476async fn tasks_start(
477    State(state): State<AppState>,
478    Json(req): Json<FlowTasksReq>,
479) -> Result<Json<FlowTasksResp>, ApiError> {
480    let resp = run_flow_form(&state, req).await?;
481    Ok(Json(resp))
482}
483
484/// Flow-form path (= via `TaskApplication.handle`).
485/// Core handler behind the `/v1/tasks` entry (`tasks_start`).
486///
487/// Engine stateless-executor refactor: the per-request
488/// sub_engine + 3-registry propagate loop is retired; the startup-built
489/// `state.task_app` (= a `TaskLaunchService` wrap around `state.engine`) is
490/// used directly. The Operator callback IF (`spawn_hook_id` /
491/// `senior_bridge_id` / `operator_backend_id`) is registered on
492/// `state.engine.register_*` at WS connect time — the engine is the SoT.
493/// See the `operator_ws` module doc for details.
494async fn run_flow_form(state: &AppState, req: FlowTasksReq) -> Result<FlowTasksResp, ApiError> {
495    use mlua_swarm::application::{BlueprintRef as AppBlueprintRef, TaskApplicationInput};
496    use mlua_swarm::Application;
497    use mlua_swarm::OperatorKind;
498
499    let mut op_req = req.operator.unwrap_or_default();
500
501    // S2: explicit `operator_sid` override (runtime Operator match stage 1).
502    // Resolved *before* building `operator_kind` / dispatching so an
503    // unknown sid fails fast with a 400, never silently falling back to the
504    // BP-level alias lookup. See `FlowTasksReq::operator_sid` doc for the
505    // disconnected-vs-unknown distinction.
506    if let Some(sid) = &req.operator_sid {
507        let known_ids = state.engine.list_operator_ids().await;
508        if !known_ids.iter().any(|id| id == sid) {
509            return Err(ApiError::bad_request(format!(
510                "operator_sid: no such registered operator session '{sid}'"
511            )));
512        }
513        op_req.operator_backend_id = Some(sid.clone());
514    }
515
516    // "Runtime Global" tier: `Some(_)` — including `Some(Automate)` — is
517    // always an explicit request that outranks the BP-level tiers; an
518    // absent/unset `kind` in the request body stays `None`, leaving the
519    // BP-level tiers (`OperatorDef.kind` / `Blueprint.default_operator_kind`)
520    // to decide instead of eagerly defaulting to `Automate`.
521    let operator_kind = op_req
522        .kind
523        .as_deref()
524        .map(parse_operator_kind_str)
525        .transpose()?;
526    let operator_id = op_req.id.unwrap_or_else(|| "http-run".to_string());
527    // "Runtime Agent-level" tier: per-agent overrides. Absent/empty = no
528    // override for any agent, letting the BP-level tiers decide per agent.
529    let mut operator_kind_overrides: HashMap<String, OperatorKind> = HashMap::new();
530    for (agent, kind_str) in op_req.per_agent_kinds.take().unwrap_or_default() {
531        operator_kind_overrides.insert(agent, parse_operator_kind_str(&kind_str)?);
532    }
533
534    let blueprint: AppBlueprintRef = match req.blueprint {
535        AppBlueprintRef::Inline { value } => AppBlueprintRef::Inline { value },
536        AppBlueprintRef::Id { id, version } => AppBlueprintRef::Id { id, version },
537    };
538
539    // TTL resolution cascade: (1) request body value, (2) BP metadata `default_run_ttl_secs`,
540    // (3) server global default (`default_run_ttl()`, 1800s).
541    let (ttl_secs, ttl_source) = match req.ttl_secs {
542        Some(v) => (v, TtlSource::RequestBody),
543        None => {
544            let (resolved_bp, _ver) = state
545                .task_app
546                .resolve(&blueprint)
547                .await
548                .map_err(|e| ApiError::bad_request(format!("bp resolve: {e}")))?;
549            match resolved_bp.metadata.default_run_ttl_secs {
550                Some(v) => (v, TtlSource::BpMetadata),
551                None => (default_run_ttl(), TtlSource::ServerDefault),
552            }
553        }
554    };
555
556    let out = state
557        .task_app
558        .handle(TaskApplicationInput {
559            blueprint,
560            operator_id: operator_id.clone(),
561            role: Role::Operator,
562            ttl: Duration::from_secs(ttl_secs),
563            init_ctx: req.init_ctx,
564            operator_kind,
565            bridge_id: op_req.senior_bridge_id,
566            hook_id: op_req.spawn_hook_id,
567            operator_backend_id: op_req.operator_backend_id,
568            operator_kind_overrides,
569        })
570        .await
571        .map_err(|e| ApiError::bad_request(format!("run: {e}")))?;
572
573    Ok(FlowTasksResp {
574        final_ctx: out.final_ctx,
575        bound_version: out.bound_version.map(|v| format!("{:?}", v)),
576        effective_ttl_secs: ttl_secs,
577        ttl_source,
578    })
579}
580
581// ─── helpers ─────────────────────────────────────────────────────────────
582
583async fn take_session_token(state: &AppState, sid: &str) -> Result<CapToken, ApiError> {
584    state
585        .sessions
586        .lock()
587        .await
588        .map
589        .remove(sid)
590        .ok_or_else(|| ApiError::not_found(format!("session: {sid}")))
591}
592
593/// Extracts sid from `Authorization: Bearer <sid>`. Strict — does not accept any other scheme prefix.
594fn extract_bearer(headers: &HeaderMap) -> Result<String, ApiError> {
595    let v = headers
596        .get(AUTHORIZATION)
597        .ok_or_else(|| ApiError::bad_request("missing Authorization header".into()))?
598        .to_str()
599        .map_err(|_| ApiError::bad_request("invalid Authorization header encoding".into()))?;
600    let sid = v
601        .strip_prefix("Bearer ")
602        .ok_or_else(|| ApiError::bad_request("Authorization must be 'Bearer <sid>'".into()))?
603        .trim();
604    if sid.is_empty() {
605        return Err(ApiError::bad_request("Bearer sid is empty".into()));
606    }
607    Ok(sid.to_string())
608}
609
610fn parse_role(s: &str) -> Result<Role, ApiError> {
611    match s.to_ascii_lowercase().as_str() {
612        "operator" => Ok(Role::Operator),
613        "worker" => Ok(Role::Worker),
614        "observer" => Ok(Role::Observer),
615        "senior" => Ok(Role::Senior),
616        other => Err(ApiError::bad_request(format!("unknown role: {other}"))),
617    }
618}
619
620// ─── error type ──────────────────────────────────────────────────────────
621
622/// Uniform error response type for the handlers in this module. Converts to
623/// a JSON `{"error": message}` body with the given status via [`IntoResponse`].
624#[derive(Debug)]
625pub struct ApiError {
626    status: StatusCode,
627    message: String,
628}
629
630impl ApiError {
631    /// Wraps an engine-side error as `500 Internal Server Error`.
632    pub fn engine(e: impl std::fmt::Display) -> Self {
633        Self {
634            status: StatusCode::INTERNAL_SERVER_ERROR,
635            message: format!("engine: {e}"),
636        }
637    }
638    /// Builds a `404 Not Found` with the given message.
639    pub fn not_found(m: String) -> Self {
640        Self {
641            status: StatusCode::NOT_FOUND,
642            message: m,
643        }
644    }
645    /// Builds a `400 Bad Request` with the given message.
646    pub fn bad_request(m: String) -> Self {
647        Self {
648            status: StatusCode::BAD_REQUEST,
649            message: m,
650        }
651    }
652}
653
654impl IntoResponse for ApiError {
655    fn into_response(self) -> Response {
656        (self.status, Json(json!({"error": self.message}))).into_response()
657    }
658}
659
/// Server-wide fallback run TTL in seconds (30 minutes).
///
/// Sized so the op_token survives a flow.ir multi-step chain (= 5+ SubAgent
/// dispatches at 30–60s each). Origin: the observed fvloop smoke where a
/// post-gate mock-commit dispatch blew past 300s and expired — sibling of the
/// worker_token TTL.
fn default_run_ttl() -> u64 {
    const THIRTY_MINUTES_SECS: u64 = 30 * 60;
    THIRTY_MINUTES_SECS
}
666
/// Test-only mirror of stages (2) and (3) of the `/v1/tasks` TTL cascade
/// (Blueprint metadata → server default).
///
/// This helper exists only under `#[cfg(test)]`; `run_flow_form` does not call
/// it and instead inlines the same `match` on `metadata.default_run_ttl_secs`.
/// Keep the two in sync.
///
/// - `Some(v)` → `v`
/// - `None` → `default_run_ttl()` (1800s)
///
/// # Full cascade (as implemented inline in `run_flow_form`)
///
/// - request body `ttl_secs = Some(v)` → `v` (metadata is not consulted)
/// - request body `None` + metadata `Some(v)` → `v`
/// - request body `None` + metadata `None` → `default_run_ttl()` = 1800s
#[cfg(test)]
fn resolve_ttl_from_metadata(metadata_ttl: Option<u64>) -> u64 {
    metadata_ttl.unwrap_or_else(default_run_ttl)
}
681
#[cfg(test)]
mod tests {
    use super::*;
    use axum::http::HeaderValue;

    /// True when `r` is an `ApiError` carrying `400 Bad Request`.
    fn is_bad_request<T>(r: Result<T, ApiError>) -> bool {
        matches!(r, Err(ref e) if e.status == StatusCode::BAD_REQUEST)
    }

    /// Header map holding a single `Authorization: <value>` header.
    fn auth_headers(value: &'static str) -> HeaderMap {
        let mut headers = HeaderMap::new();
        headers.insert(AUTHORIZATION, HeaderValue::from_static(value));
        headers
    }

    /// TTL cascade case 1: when the request body sets it, that value is used as-is
    /// (upper branch that does not go through the helper; semantic verify of the
    /// `Some(v) => v` direct-return path in `run_flow_form`).
    #[test]
    fn ttl_cascade_request_body_wins_over_metadata() {
        let req_ttl: Option<u64> = Some(100);
        let metadata_ttl: Option<u64> = Some(3600);
        let effective = match req_ttl {
            Some(v) => v,
            None => resolve_ttl_from_metadata(metadata_ttl),
        };
        assert_eq!(
            effective, 100,
            "request body ttl_secs=100 must win over metadata=3600 (cascade priority (1) > (2))"
        );
    }

    /// TTL cascade case 2: request body omitted + BP metadata `Some(N)` → `N` is effective.
    #[test]
    fn ttl_cascade_metadata_used_when_body_missing() {
        let req_ttl: Option<u64> = None;
        let metadata_ttl: Option<u64> = Some(3600);
        let effective = match req_ttl {
            Some(v) => v,
            None => resolve_ttl_from_metadata(metadata_ttl),
        };
        assert_eq!(
            effective, 3600,
            "body None + metadata=3600 must resolve to 3600 (cascade (2))"
        );
    }

    /// TTL cascade case 3: request body omitted + BP metadata `None` → server default (1800s).
    #[test]
    fn ttl_cascade_server_default_when_both_missing() {
        let req_ttl: Option<u64> = None;
        let metadata_ttl: Option<u64> = None;
        let effective = match req_ttl {
            Some(v) => v,
            None => resolve_ttl_from_metadata(metadata_ttl),
        };
        assert_eq!(
            effective,
            default_run_ttl(),
            "body None + metadata None must fall back to default_run_ttl() = 1800s"
        );
        assert_eq!(effective, 1800, "default_run_ttl() literal = 1800s");
    }

    /// Helper unit: metadata `None` → 1800 (server default expansion).
    #[test]
    fn resolve_ttl_from_metadata_none_returns_server_default() {
        assert_eq!(resolve_ttl_from_metadata(None), 1800);
    }

    /// Helper unit: metadata `Some(N)` → `N` (server default ignored).
    #[test]
    fn resolve_ttl_from_metadata_some_returns_value() {
        assert_eq!(resolve_ttl_from_metadata(Some(7200)), 7200);
        assert_eq!(resolve_ttl_from_metadata(Some(60)), 60);
    }

    /// `parse_role` accepts the four role names regardless of ASCII case.
    #[test]
    fn parse_role_is_case_insensitive() {
        assert!(matches!(parse_role("operator"), Ok(Role::Operator)));
        assert!(matches!(parse_role("Worker"), Ok(Role::Worker)));
        assert!(matches!(parse_role("OBSERVER"), Ok(Role::Observer)));
        assert!(matches!(parse_role("senior"), Ok(Role::Senior)));
    }

    /// `parse_role` rejects unknown names with `400`.
    #[test]
    fn parse_role_rejects_unknown_role() {
        assert!(is_bad_request(parse_role("admin")));
        assert!(is_bad_request(parse_role("")));
    }

    /// Operator kind strings are matched exactly (case-sensitive).
    #[test]
    fn parse_operator_kind_str_maps_wire_names() {
        use mlua_swarm::OperatorKind;
        assert!(matches!(
            parse_operator_kind_str("main_ai"),
            Ok(OperatorKind::MainAi)
        ));
        assert!(matches!(
            parse_operator_kind_str("automate"),
            Ok(OperatorKind::Automate)
        ));
        assert!(matches!(
            parse_operator_kind_str("composite"),
            Ok(OperatorKind::Composite)
        ));
        assert!(is_bad_request(parse_operator_kind_str("MainAi")));
        assert!(is_bad_request(parse_operator_kind_str("")));
    }

    /// A well-formed Bearer header yields the trimmed sid.
    #[test]
    fn extract_bearer_returns_trimmed_sid() {
        assert_eq!(
            extract_bearer(&auth_headers("Bearer abc")).ok().as_deref(),
            Some("abc")
        );
        assert_eq!(
            extract_bearer(&auth_headers("Bearer   abc  ")).ok().as_deref(),
            Some("abc")
        );
    }

    /// Missing header, other schemes and empty sids are all `400`.
    #[test]
    fn extract_bearer_rejects_malformed_headers() {
        assert!(is_bad_request(extract_bearer(&HeaderMap::new())));
        assert!(is_bad_request(extract_bearer(&auth_headers("Basic abc"))));
        assert!(is_bad_request(extract_bearer(&auth_headers("Bearer"))));
        assert!(is_bad_request(extract_bearer(&auth_headers("Bearer    "))));
    }
}