Skip to main content

mlua_swarm/service/
task_launch.rs

//! `TaskLaunchService` — the domain service that runs a Blueprint flow
//! to completion through the engine.
//!
//! Responsibilities:
//! 1. Compile the Blueprint and link it into a `SpawnerAdapter` (via
//!    `service::linker::link`, wrapped by `EngineDispatcher::with_spawner`).
//! 2. Acquire an Operator session (via `engine.attach_with_ids`).
//! 3. Run flow.ir's `eval_async` through an `EngineDispatcher` and
//!    return the final `ctx`.
//! 4. If any step fails (dispatcher error), `eval_async` errors and
//!    the failure propagates to the caller as `TaskLaunchError::FlowEval`.
//!
//! Callers on the Application layer never touch the engine directly —
//! `bind`, `start_task`, and `eval_async` all stay inside the Service.
//!
//! A single-task-spawn API (calling `start_task` directly) is
//! deliberately absent here: a single spawn can be modeled as a
//! one-Step flow, and we do not want two interfaces for the same
//! shape.
20
21use crate::blueprint::compiler::{CompileError, Compiler};
22use crate::blueprint::{Blueprint, EngineDispatcher};
23use crate::core::ctx::OperatorKind;
24use crate::core::engine::Engine;
25use crate::core::errors::EngineError;
26use crate::middleware::project_name_alias::ProjectNameAliasMiddleware;
27use crate::middleware::SpawnerStack;
28use crate::service::linker;
29use crate::types::{CapToken, Role};
30use serde_json::Value;
31use std::collections::HashMap;
32use std::time::Duration;
33use thiserror::Error;
34
35/// Derive the "BP Agent-level" tier of the `OperatorKind` cascade from a
36/// Blueprint: for every `AgentDef` whose `spec.operator_ref` resolves to an
37/// `OperatorDef` with a `Some` `kind`, map `AgentDef.name -> OperatorKind`.
38///
39/// Deliberately **not** filtered by `AgentDef.kind == AgentKind::Operator`:
40/// the `OperatorKind` cascade is a middleware-level cross-cutting concern
41/// (spawn_hook / senior_bridge / operator-delegate gating via `Ctx.operator`),
42/// orthogonal to the Worker IMPL axis that `AgentKind` expresses (see the
43/// crate root doc, "Operator is delivered as a cross-cutting overlay through
44/// `Ctx` plus middleware"). A `RustFn` / `Lua` / `Subprocess` agent can
45/// equally declare `spec.operator_ref` to opt into a BP-declared
46/// `OperatorKind` without changing its Worker IMPL. Agents without an
47/// `operator_ref`, an unresolved `operator_ref`, or an `OperatorDef.kind =
48/// None` are simply absent from the map (= that tier falls through for
49/// them). This is a separate, independent consumer of `Blueprint.operators`
50/// from the design-time `operator_ref` validation in
51/// `blueprint::compiler::Compiler::compile` (issue: `OperatorDef`
52/// first-class treatment), which only checks the reference resolves for
53/// `AgentKind::Operator` agents and is unaffected by this function.
54fn derive_bp_agent_kinds(blueprint: &Blueprint) -> HashMap<String, OperatorKind> {
55    let mut out = HashMap::new();
56    if blueprint.operators.is_empty() {
57        return out;
58    }
59    for agent in &blueprint.agents {
60        let Some(op_ref) = agent.spec.get("operator_ref").and_then(|v| v.as_str()) else {
61            continue;
62        };
63        let Some(op_def) = blueprint.operators.iter().find(|o| o.name == op_ref) else {
64            continue;
65        };
66        if let Some(kind) = op_def.kind {
67            out.insert(agent.name.clone(), OperatorKind::from(kind));
68        }
69    }
70    out
71}
72
73/// Failure modes of [`TaskLaunchService::launch`].
74#[derive(Debug, Error)]
75pub enum TaskLaunchError {
76    /// `Compiler::compile` rejected the Blueprint.
77    #[error("compile: {0}")]
78    Compile(#[from] CompileError),
79    /// `Engine::attach_with_ids` failed.
80    #[error("engine: {0}")]
81    Engine(#[from] EngineError),
82    /// A `Step` inside `flow.ir`'s `eval_async` produced a dispatcher
83    /// error, or a sub-flow raised.
84    #[error("flow eval: {0}")]
85    FlowEval(String),
86}
87
88/// Input to [`TaskLaunchService::launch`].
89#[derive(Debug, Clone)]
90pub struct TaskLaunchInput {
91    /// The Blueprint to compile, link, and run.
92    pub blueprint: Blueprint,
93    /// Caller-supplied id for the Operator that owns this run.
94    pub operator_id: String,
95    /// The Operator's role for this run.
96    pub role: Role,
97    /// How long the attached session is allowed to live.
98    pub ttl: Duration,
99    /// "Runtime Global" tier of the `OperatorKind` cascade. `Some(_)` is
100    /// always an explicit request — including `Some(OperatorKind::Automate)`
101    /// — that outranks the BP-level tiers (`OperatorDef.kind` /
102    /// `Blueprint.default_operator_kind`); `None` leaves it unspecified so
103    /// those tiers / the final default decide. Under `MainAi` or
104    /// `Composite`, `MainAIMiddleware`'s `spawn_hook` before/after
105    /// callbacks become effective. See
106    /// `crate::core::ctx::collapse_operator_kind`.
107    pub operator_kind: Option<OperatorKind>,
108    /// `SeniorBridge` registry ID. `None` — no bridge; `Some(id)` —
109    /// attach a bridge previously registered via
110    /// `engine.register_senior_bridge`.
111    pub bridge_id: Option<String>,
112    /// `SpawnHook` registry ID. Same shape as above, via
113    /// `engine.register_spawn_hook`.
114    pub hook_id: Option<String>,
115    /// Operator registry ID — used on the path that hands the whole
116    /// spawn off to an external Operator. Name previously registered
117    /// with `engine.register_operator`; resolved by
118    /// `OperatorDelegateMiddleware`, which — for `kind = MainAi` or
119    /// `Composite` — bypasses `inner.spawn` and calls
120    /// `operator.execute`.
121    pub operator_backend_id: Option<String>,
122    /// "Runtime Agent-level" tier (highest priority) of the `OperatorKind`
123    /// cascade — per-agent override, keyed by `AgentDef.name`. Empty by
124    /// default (no override for any agent). See
125    /// `crate::core::ctx::collapse_operator_kind` for the full tier list.
126    pub operator_kind_overrides: HashMap<String, OperatorKind>,
127    /// The initial `ctx` (JSON `Value`) that flow.ir's `eval_async`
128    /// starts from. Every `Step.in` `$.<path>` reference reads from
129    /// here.
130    pub init_ctx: Value,
131}
132
133impl TaskLaunchInput {
134    /// Helper for existing callers on the default path — no hooks and no
135    /// per-agent `OperatorKind` overrides. Leaves the "Runtime Global" tier
136    /// unspecified (`None`), so the BP-level tiers / final default
137    /// (`OperatorKind::Automate`) decide — this preserves today's
138    /// behaviour for every existing caller without silently forcing
139    /// `Automate` as an explicit override that would outrank a BP-declared
140    /// `MainAi`/`Composite` kind.
141    pub fn automate(
142        blueprint: Blueprint,
143        operator_id: impl Into<String>,
144        role: Role,
145        ttl: Duration,
146        init_ctx: Value,
147    ) -> Self {
148        Self {
149            blueprint,
150            operator_id: operator_id.into(),
151            role,
152            ttl,
153            operator_kind: None,
154            bridge_id: None,
155            hook_id: None,
156            operator_backend_id: None,
157            operator_kind_overrides: HashMap::new(),
158            init_ctx,
159        }
160    }
161}
162
163/// Result of a successful [`TaskLaunchService::launch`] call.
164#[derive(Debug, Clone)]
165pub struct TaskLaunchOutput {
166    /// The capability token for the attached session.
167    pub token: CapToken,
168    /// The final `ctx` after the flow ran — every `Step.out` has
169    /// been written. Application-layer callers pull the outcome out
170    /// of this `Value` and fold it into a domain status.
171    pub final_ctx: Value,
172}
173
174/// Domain service that compiles, links, and runs a Blueprint's flow to
175/// completion through the [`Engine`]. See the module doc for the full
176/// responsibility list.
177pub struct TaskLaunchService {
178    engine: Engine,
179    compiler: Compiler,
180}
181
182impl TaskLaunchService {
183    /// Build a service bound to one `Engine` and one `Compiler`.
184    pub fn new(engine: Engine, compiler: Compiler) -> Self {
185        Self { engine, compiler }
186    }
187
188    /// The bound `Engine`.
189    pub fn engine(&self) -> &Engine {
190        &self.engine
191    }
192
193    /// The bound `Compiler`.
194    pub fn compiler(&self) -> &Compiler {
195        &self.compiler
196    }
197
198    /// Run the Blueprint's flow to completion and return the final
199    /// `ctx`.
200    ///
201    /// Failure paths:
202    ///
203    /// - `compiler.compile` failure → `TaskLaunchError::Compile`.
204    /// - `engine.attach` failure → `TaskLaunchError::Engine`.
205    /// - A `Step` inside `flow eval` producing a dispatcher error, or
206    ///   a sub-flow raising, → `TaskLaunchError::FlowEval`. There is
207    ///   no silent partial-success completion; failures always
208    ///   propagate.
209    pub async fn launch(
210        &self,
211        input: TaskLaunchInput,
212    ) -> Result<TaskLaunchOutput, TaskLaunchError> {
213        // After the stateless-executor refactor, the
214        // caller (Service) does compile + link +
215        // `EngineDispatcher::with_spawner` itself; the engine no longer
216        // holds any global spawner state to touch. The link path (base
217        // `SpawnerAdapter` +
218        // `LayerRegistry` resolution + `SpawnerStack` wrapping) is
219        // concentrated inside `service::linker::link` — Service
220        // scatter is intentionally prevented.
221        let compiled = self.compiler.compile(&input.blueprint)?;
222        let spawner = linker::link(
223            compiled.router.clone(),
224            &input.blueprint.spawner_hints.layers,
225            &self.engine,
226        );
227        // When `Blueprint.metadata.project_name_alias` is Some, layer a
228        // `ProjectNameAliasMiddleware` on top of the stack that injects the
229        // alias into `Ctx.meta.runtime.project_name_alias` just before spawn.
230        // Downstream operators (for example, the server crate's
231        // `Operator.execute`) read `ctx.meta.runtime.get("project_name_alias")`
232        // and expand it into the Spawn directive prompt body.
233        let spawner = if let Some(alias) = input.blueprint.metadata.project_name_alias.as_deref() {
234            SpawnerStack::new(spawner)
235                .layer(ProjectNameAliasMiddleware::new(alias))
236                .build()
237        } else {
238            spawner
239        };
240
241        // "BP Agent-level" (`OperatorDef.kind` via `operator_ref`) + "BP
242        // Global" (`Blueprint.default_operator_kind`) tiers of the
243        // `OperatorKind` cascade, baked here (the only point that has both
244        // the resolved Blueprint and the launch-time overrides in scope).
245        let bp_agent_kinds = derive_bp_agent_kinds(&input.blueprint);
246        let bp_global_kind = input
247            .blueprint
248            .default_operator_kind
249            .map(OperatorKind::from);
250
251        let token = self
252            .engine
253            .attach_with_ids(
254                input.operator_id,
255                input.role,
256                input.ttl,
257                input.operator_kind,
258                input.bridge_id,
259                input.hook_id,
260                input.operator_backend_id,
261                input.operator_kind_overrides,
262                bp_agent_kinds,
263                bp_global_kind,
264            )
265            .await?;
266        let dispatcher =
267            EngineDispatcher::with_spawner(self.engine.clone(), token.clone(), spawner);
268        let final_ctx =
269            mlua_flow_ir::eval_async(&input.blueprint.flow, input.init_ctx, &dispatcher)
270                .await
271                .map_err(|e| TaskLaunchError::FlowEval(e.to_string()))?;
272        Ok(TaskLaunchOutput { token, final_ctx })
273    }
274}
275
276// ──────────────────────────────────────────────────────────────────────────
277// UT
278// ──────────────────────────────────────────────────────────────────────────
279
#[cfg(test)]
mod tests {
    use super::*;
    use crate::blueprint::compiler::{RustFnInProcessSpawnerFactory, SpawnerRegistry};
    use crate::blueprint::{
        current_schema_version, AgentDef, AgentKind, AgentMeta, BlueprintMetadata, CompilerHints,
        CompilerStrategy,
    };
    use crate::core::config::EngineCfg;
    use crate::worker::adapter::{WorkerError, WorkerResult};
    use mlua_flow_ir::{Expr, JoinMode, Node as FlowNode};
    use serde_json::json;
    use std::sync::Arc;

    /// Build an `Expr::Path` pointing at `s`.
    fn path(s: &str) -> Expr {
        Expr::Path { at: s.to_string() }
    }

    /// Build a `Step` node that calls agent `ref_`, reading from `in_` and
    /// writing to `out`.
    fn step(ref_: &str, in_: Expr, out: Expr) -> FlowNode {
        FlowNode::Step {
            ref_: ref_.to_string(),
            in_,
            out,
        }
    }

    /// Build a `RustFn` agent named `name` whose spec points at the
    /// registered function `fn_id`.
    fn agent(name: &str, fn_id: &str) -> AgentDef {
        AgentDef {
            name: name.to_string(),
            kind: AgentKind::RustFn,
            spec: json!({ "fn_id": fn_id }),
            profile: None,
            meta: Some(AgentMeta::default()),
        }
    }

    /// Build a service on a default-configured engine whose compiler knows
    /// only the given in-process `RustFn` factory.
    fn build_service(factory: RustFnInProcessSpawnerFactory) -> TaskLaunchService {
        let engine = Engine::new(EngineCfg::default());
        let mut reg = SpawnerRegistry::new();
        reg.register::<RustFnInProcessSpawnerFactory>(Arc::new(factory));
        let compiler = Compiler::new(reg);
        TaskLaunchService::new(engine, compiler)
    }

    /// Build a minimal Blueprint around `flow` and `agents`: no operators,
    /// default hints / strategy / metadata, no default operator kind.
    fn bp(flow: FlowNode, agents: Vec<AgentDef>) -> Blueprint {
        Blueprint {
            schema_version: current_schema_version(),
            id: "ut".into(),
            flow,
            agents,
            operators: vec![],
            hints: CompilerHints::default(),
            strategy: CompilerStrategy::default(),
            metadata: BlueprintMetadata::default(),
            spawner_hints: Default::default(),
            default_agent_kind: AgentKind::Operator,
            default_operator_kind: None,
        }
    }

    /// Build a default-path launch input (operator "ut-op", 30 s TTL) for
    /// `blueprint`, starting from `init_ctx`.
    fn launch_input(blueprint: Blueprint, init_ctx: Value) -> TaskLaunchInput {
        TaskLaunchInput::automate(
            blueprint,
            "ut-op",
            Role::Operator,
            Duration::from_secs(30),
            init_ctx,
        )
    }

    /// Decode a worker prompt: when the prompt parses as a JSON string,
    /// return the decoded string; otherwise return the prompt unchanged.
    fn decode_prompt(prompt: String) -> String {
        serde_json::from_str::<String>(&prompt).unwrap_or(prompt)
    }

    /// A one-step flow writes the worker's value at the step's `out` path.
    #[tokio::test]
    async fn launch_single_step_writes_out_path() {
        let factory = RustFnInProcessSpawnerFactory::new().register_fn("echo", |inv| async move {
            Ok(WorkerResult {
                value: json!({ "echoed": inv.prompt }),
                ok: true,
            })
        });
        let svc = build_service(factory);
        let blueprint = bp(
            step("echo", path("$.input"), path("$.out")),
            vec![agent("echo", "echo")],
        );
        let out = svc
            .launch(launch_input(blueprint, json!({ "input": "hi" })))
            .await
            .expect("launch ok");
        assert_eq!(out.final_ctx["out"]["echoed"], "hi");
    }

    /// In a three-step `Seq`, each step reads what the previous one wrote.
    #[tokio::test]
    async fn launch_three_step_seq_threads_ctx_forward() {
        let factory = RustFnInProcessSpawnerFactory::new()
            .register_fn("upper", |inv| async move {
                let s = decode_prompt(inv.prompt);
                Ok(WorkerResult {
                    value: json!(s.to_uppercase()),
                    ok: true,
                })
            })
            .register_fn("suffix", |inv| async move {
                let s = decode_prompt(inv.prompt);
                Ok(WorkerResult {
                    value: json!(format!("{s}!")),
                    ok: true,
                })
            })
            .register_fn("wrap", |inv| async move {
                let s = decode_prompt(inv.prompt);
                Ok(WorkerResult {
                    value: json!(format!("[{s}]")),
                    ok: true,
                })
            });
        let svc = build_service(factory);
        let flow = FlowNode::Seq {
            children: vec![
                step("upper", path("$.in"), path("$.s1")),
                step("suffix", path("$.s1"), path("$.s2")),
                step("wrap", path("$.s2"), path("$.s3")),
            ],
        };
        let blueprint = bp(
            flow,
            vec![
                agent("upper", "upper"),
                agent("suffix", "suffix"),
                agent("wrap", "wrap"),
            ],
        );
        let out = svc
            .launch(launch_input(blueprint, json!({ "in": "hello" })))
            .await
            .expect("launch ok");
        assert_eq!(out.final_ctx["s1"], "HELLO");
        assert_eq!(out.final_ctx["s2"], "HELLO!");
        assert_eq!(out.final_ctx["s3"], "[HELLO!]");
    }

    /// A `Fanout` with `JoinMode::All` yields one result per item, in item
    /// order, and runs its bodies concurrently.
    #[tokio::test]
    async fn launch_fanout_join_all_parallel_completes() {
        use std::sync::atomic::{AtomicU32, Ordering};
        let counter = Arc::new(AtomicU32::new(0));
        let max_seen = Arc::new(AtomicU32::new(0));
        let counter_clone = counter.clone();
        let max_clone = max_seen.clone();

        // Each worker bumps the inflight counter up, sleeps 50ms, then bumps it down.
        // When parallel execution is working, max inflight exceeds 1.
        let factory = RustFnInProcessSpawnerFactory::new().register_fn("para", move |inv| {
            let counter = counter_clone.clone();
            let max_seen = max_clone.clone();
            async move {
                let now = counter.fetch_add(1, Ordering::SeqCst) + 1;
                // Record the high-water mark of concurrently running workers.
                max_seen.fetch_max(now, Ordering::SeqCst);
                tokio::time::sleep(Duration::from_millis(50)).await;
                counter.fetch_sub(1, Ordering::SeqCst);
                let s = decode_prompt(inv.prompt);
                Ok(WorkerResult {
                    value: json!(format!("did:{s}")),
                    ok: true,
                })
            }
        });
        let svc = build_service(factory);
        let flow = FlowNode::Fanout {
            items: path("$.items"),
            bind: path("$.item"),
            body: Box::new(step("para", path("$.item"), path("$.r"))),
            join: JoinMode::All,
            out: path("$.results"),
        };
        let blueprint = bp(flow, vec![agent("para", "para")]);
        let out = svc
            .launch(launch_input(
                blueprint,
                json!({ "items": ["a", "b", "c", "d"] }),
            ))
            .await
            .expect("launch ok");
        let results = out.final_ctx["results"].as_array().expect("array");
        assert_eq!(results.len(), 4);
        for (i, expected) in ["a", "b", "c", "d"].iter().enumerate() {
            assert_eq!(results[i]["r"], json!(format!("did:{expected}")));
        }
        let max = max_seen.load(Ordering::SeqCst);
        assert!(
            max >= 2,
            "expected parallel execution (max inflight >= 2), got {max}"
        );
    }

    /// A failing worker in the middle of a `Seq` surfaces as
    /// `TaskLaunchError::FlowEval` carrying the worker's message.
    #[tokio::test]
    async fn launch_propagates_worker_error_as_flow_eval_err() {
        let factory = RustFnInProcessSpawnerFactory::new()
            .register_fn("ok", |inv| async move {
                Ok(WorkerResult {
                    value: json!(inv.prompt),
                    ok: true,
                })
            })
            .register_fn("boom", |_inv| async move {
                Err(WorkerError::Failed("intentional boom".into()))
            });
        let svc = build_service(factory);
        let flow = FlowNode::Seq {
            children: vec![
                step("ok", path("$.input"), path("$.s1")),
                step("boom", path("$.s1"), path("$.s2")),
                step("ok", path("$.s2"), path("$.s3")),
            ],
        };
        let blueprint = bp(flow, vec![agent("ok", "ok"), agent("boom", "boom")]);
        let err = svc
            .launch(launch_input(blueprint, json!({ "input": "x" })))
            .await
            .expect_err("expected fail");
        match err {
            TaskLaunchError::FlowEval(msg) => {
                assert!(
                    msg.contains("boom") || msg.contains("intentional"),
                    "expected error to mention worker failure, got: {msg}"
                );
            }
            other => panic!("expected FlowEval error, got {other:?}"),
        }
    }
}