Skip to main content

mlua_swarm/service/
task_launch.rs

//! `TaskLaunchService` — the domain service that runs a Blueprint flow
//! to completion through the engine.
//!
//! Responsibilities:
//! 1. Compile the Blueprint and link it into a `SpawnerAdapter` (via
//!    `service::linker::link`, wrapped by `EngineDispatcher::with_spawner`).
//! 2. Acquire an Operator session (via `engine.attach_with_ids`).
//! 3. Run flow.ir's `eval_async_externs` through an `EngineDispatcher`
//!    (threading the service-held `call_extern` registry) and return
//!    the final `ctx`.
//! 4. If any step fails (dispatcher error), the eval errors and the
//!    failure is returned to the caller as `TaskLaunchError::FlowEval`.
//!
//! Callers on the Application layer never touch the engine directly —
//! `bind`, `start_task`, and `eval_async` all stay inside the Service.
//!
//! A single-task-spawn API (calling `start_task` directly) is
//! deliberately absent here: a single spawn can be modeled as a
//! one-Step flow, and we do not want two interfaces for the same
//! shape.
21
22use crate::blueprint::compiler::{CompileError, Compiler};
23use crate::blueprint::{Blueprint, EngineDispatcher};
24use crate::core::ctx::OperatorKind;
25use crate::core::engine::Engine;
26use crate::core::errors::EngineError;
27use crate::middleware::project_name_alias::ProjectNameAliasMiddleware;
28use crate::middleware::SpawnerStack;
29use crate::service::linker;
30use crate::types::{CapToken, Role};
31use mlua_flow_ir::{Externs, NoExterns};
32use serde_json::Value;
33use std::collections::HashMap;
34use std::sync::Arc;
35use std::time::Duration;
36use thiserror::Error;
37
38/// Derive the "BP Agent-level" tier of the `OperatorKind` cascade from a
39/// Blueprint: for every `AgentDef` whose `spec.operator_ref` resolves to an
40/// `OperatorDef` with a `Some` `kind`, map `AgentDef.name -> OperatorKind`.
41///
42/// Deliberately **not** filtered by `AgentDef.kind == AgentKind::Operator`:
43/// the `OperatorKind` cascade is a middleware-level cross-cutting concern
44/// (spawn_hook / senior_bridge / operator-delegate gating via `Ctx.operator`),
45/// orthogonal to the Worker IMPL axis that `AgentKind` expresses (see the
46/// crate root doc, "Operator is delivered as a cross-cutting overlay through
47/// `Ctx` plus middleware"). A `RustFn` / `Lua` / `Subprocess` agent can
48/// equally declare `spec.operator_ref` to opt into a BP-declared
49/// `OperatorKind` without changing its Worker IMPL. Agents without an
50/// `operator_ref`, an unresolved `operator_ref`, or an `OperatorDef.kind =
51/// None` are simply absent from the map (= that tier falls through for
52/// them). This is a separate, independent consumer of `Blueprint.operators`
53/// from the design-time `operator_ref` validation in
54/// `blueprint::compiler::Compiler::compile` (issue: `OperatorDef`
55/// first-class treatment), which only checks the reference resolves for
56/// `AgentKind::Operator` agents and is unaffected by this function.
57fn derive_bp_agent_kinds(blueprint: &Blueprint) -> HashMap<String, OperatorKind> {
58    let mut out = HashMap::new();
59    if blueprint.operators.is_empty() {
60        return out;
61    }
62    for agent in &blueprint.agents {
63        let Some(op_ref) = agent.spec.get("operator_ref").and_then(|v| v.as_str()) else {
64            continue;
65        };
66        let Some(op_def) = blueprint.operators.iter().find(|o| o.name == op_ref) else {
67            continue;
68        };
69        if let Some(kind) = op_def.kind {
70            out.insert(agent.name.clone(), OperatorKind::from(kind));
71        }
72    }
73    out
74}
75
76/// Failure modes of [`TaskLaunchService::launch`].
77#[derive(Debug, Error)]
78pub enum TaskLaunchError {
79    /// `Compiler::compile` rejected the Blueprint.
80    #[error("compile: {0}")]
81    Compile(#[from] CompileError),
82    /// `Engine::attach_with_ids` failed.
83    #[error("engine: {0}")]
84    Engine(#[from] EngineError),
85    /// A `Step` inside `flow.ir`'s `eval_async` produced a dispatcher
86    /// error, or a sub-flow raised.
87    #[error("flow eval: {0}")]
88    FlowEval(String),
89}
90
91/// Input to [`TaskLaunchService::launch`].
92#[derive(Debug, Clone)]
93pub struct TaskLaunchInput {
94    /// The Blueprint to compile, link, and run.
95    pub blueprint: Blueprint,
96    /// Caller-supplied id for the Operator that owns this run.
97    pub operator_id: String,
98    /// The Operator's role for this run.
99    pub role: Role,
100    /// How long the attached session is allowed to live.
101    pub ttl: Duration,
102    /// "Runtime Global" tier of the `OperatorKind` cascade. `Some(_)` is
103    /// always an explicit request — including `Some(OperatorKind::Automate)`
104    /// — that outranks the BP-level tiers (`OperatorDef.kind` /
105    /// `Blueprint.default_operator_kind`); `None` leaves it unspecified so
106    /// those tiers / the final default decide. Under `MainAi` or
107    /// `Composite`, `MainAIMiddleware`'s `spawn_hook` before/after
108    /// callbacks become effective. See
109    /// `crate::core::ctx::collapse_operator_kind`.
110    pub operator_kind: Option<OperatorKind>,
111    /// `SeniorBridge` registry ID. `None` — no bridge; `Some(id)` —
112    /// attach a bridge previously registered via
113    /// `engine.register_senior_bridge`.
114    pub bridge_id: Option<String>,
115    /// `SpawnHook` registry ID. Same shape as above, via
116    /// `engine.register_spawn_hook`.
117    pub hook_id: Option<String>,
118    /// Operator registry ID — used on the path that hands the whole
119    /// spawn off to an external Operator. Name previously registered
120    /// with `engine.register_operator`; resolved by
121    /// `OperatorDelegateMiddleware`, which — for `kind = MainAi` or
122    /// `Composite` — bypasses `inner.spawn` and calls
123    /// `operator.execute`.
124    pub operator_backend_id: Option<String>,
125    /// "Runtime Agent-level" tier (highest priority) of the `OperatorKind`
126    /// cascade — per-agent override, keyed by `AgentDef.name`. Empty by
127    /// default (no override for any agent). See
128    /// `crate::core::ctx::collapse_operator_kind` for the full tier list.
129    pub operator_kind_overrides: HashMap<String, OperatorKind>,
130    /// The initial `ctx` (JSON `Value`) that flow.ir's `eval_async`
131    /// starts from. Every `Step.in` `$.<path>` reference reads from
132    /// here.
133    pub init_ctx: Value,
134}
135
136impl TaskLaunchInput {
137    /// Helper for existing callers on the default path — no hooks and no
138    /// per-agent `OperatorKind` overrides. Leaves the "Runtime Global" tier
139    /// unspecified (`None`), so the BP-level tiers / final default
140    /// (`OperatorKind::Automate`) decide — this preserves today's
141    /// behaviour for every existing caller without silently forcing
142    /// `Automate` as an explicit override that would outrank a BP-declared
143    /// `MainAi`/`Composite` kind.
144    pub fn automate(
145        blueprint: Blueprint,
146        operator_id: impl Into<String>,
147        role: Role,
148        ttl: Duration,
149        init_ctx: Value,
150    ) -> Self {
151        Self {
152            blueprint,
153            operator_id: operator_id.into(),
154            role,
155            ttl,
156            operator_kind: None,
157            bridge_id: None,
158            hook_id: None,
159            operator_backend_id: None,
160            operator_kind_overrides: HashMap::new(),
161            init_ctx,
162        }
163    }
164}
165
166/// Result of a successful [`TaskLaunchService::launch`] call.
167#[derive(Debug, Clone)]
168pub struct TaskLaunchOutput {
169    /// The capability token for the attached session.
170    pub token: CapToken,
171    /// The final `ctx` after the flow ran — every `Step.out` has
172    /// been written. Application-layer callers pull the outcome out
173    /// of this `Value` and fold it into a domain status.
174    pub final_ctx: Value,
175}
176
177/// Domain service that compiles, links, and runs a Blueprint's flow to
178/// completion through the [`Engine`]. See the module doc for the full
179/// responsibility list.
180pub struct TaskLaunchService {
181    engine: Engine,
182    compiler: Compiler,
183    /// `call_extern` registry threaded into flow eval. Defaults to
184    /// [`NoExterns`] (= every `call_extern` in a Blueprint raises
185    /// `ExternError`); hosts opt in via [`Self::with_externs`] with an
186    /// `ExternMap` of pure value-shape functions.
187    externs: Arc<dyn Externs + Send + Sync>,
188}
189
190impl TaskLaunchService {
191    /// Build a service bound to one `Engine` and one `Compiler`.
192    pub fn new(engine: Engine, compiler: Compiler) -> Self {
193        Self {
194            engine,
195            compiler,
196            externs: Arc::new(NoExterns),
197        }
198    }
199
200    /// Replace the `call_extern` registry (builder style). Entries MUST be
201    /// pure functions — no side effects, no flow control; effectful work
202    /// belongs to `Step` / agents, not externs (flow-ir canonical contract).
203    pub fn with_externs(mut self, externs: Arc<dyn Externs + Send + Sync>) -> Self {
204        self.externs = externs;
205        self
206    }
207
208    /// The bound `Engine`.
209    pub fn engine(&self) -> &Engine {
210        &self.engine
211    }
212
213    /// The bound `Compiler`.
214    pub fn compiler(&self) -> &Compiler {
215        &self.compiler
216    }
217
218    /// Run the Blueprint's flow to completion and return the final
219    /// `ctx`.
220    ///
221    /// Failure paths:
222    ///
223    /// - `compiler.compile` failure → `TaskLaunchError::Compile`.
224    /// - `engine.attach` failure → `TaskLaunchError::Engine`.
225    /// - A `Step` inside `flow eval` producing a dispatcher error, or
226    ///   a sub-flow raising, → `TaskLaunchError::FlowEval`. There is
227    ///   no silent partial-success completion; failures always
228    ///   propagate.
229    pub async fn launch(
230        &self,
231        input: TaskLaunchInput,
232    ) -> Result<TaskLaunchOutput, TaskLaunchError> {
233        // After the stateless-executor refactor, the
234        // caller (Service) does compile + link +
235        // `EngineDispatcher::with_spawner` itself; the engine no longer
236        // holds any global spawner state to touch. The link path (base
237        // `SpawnerAdapter` +
238        // `LayerRegistry` resolution + `SpawnerStack` wrapping) is
239        // concentrated inside `service::linker::link` — Service
240        // scatter is intentionally prevented.
241        let compiled = self.compiler.compile(&input.blueprint)?;
242        let spawner = linker::link(
243            compiled.router.clone(),
244            &input.blueprint.spawner_hints.layers,
245            &self.engine,
246        );
247        // When `Blueprint.metadata.project_name_alias` is Some, layer a
248        // `ProjectNameAliasMiddleware` on top of the stack that injects the
249        // alias into `Ctx.meta.runtime.project_name_alias` just before spawn.
250        // Downstream operators (for example, the server crate's
251        // `Operator.execute`) read `ctx.meta.runtime.get("project_name_alias")`
252        // and expand it into the Spawn directive prompt body.
253        let spawner = if let Some(alias) = input.blueprint.metadata.project_name_alias.as_deref() {
254            SpawnerStack::new(spawner)
255                .layer(ProjectNameAliasMiddleware::new(alias))
256                .build()
257        } else {
258            spawner
259        };
260
261        // "BP Agent-level" (`OperatorDef.kind` via `operator_ref`) + "BP
262        // Global" (`Blueprint.default_operator_kind`) tiers of the
263        // `OperatorKind` cascade, baked here (the only point that has both
264        // the resolved Blueprint and the launch-time overrides in scope).
265        let bp_agent_kinds = derive_bp_agent_kinds(&input.blueprint);
266        let bp_global_kind = input
267            .blueprint
268            .default_operator_kind
269            .map(OperatorKind::from);
270
271        let token = self
272            .engine
273            .attach_with_ids(
274                input.operator_id,
275                input.role,
276                input.ttl,
277                input.operator_kind,
278                input.bridge_id,
279                input.hook_id,
280                input.operator_backend_id,
281                input.operator_kind_overrides,
282                bp_agent_kinds,
283                bp_global_kind,
284            )
285            .await?;
286        let dispatcher =
287            EngineDispatcher::with_spawner(self.engine.clone(), token.clone(), spawner);
288        let final_ctx = mlua_flow_ir::eval_async_externs(
289            &input.blueprint.flow,
290            input.init_ctx,
291            &dispatcher,
292            &*self.externs,
293        )
294        .await
295        .map_err(|e| TaskLaunchError::FlowEval(e.to_string()))?;
296        Ok(TaskLaunchOutput { token, final_ctx })
297    }
298}
299
// ──────────────────────────────────────────────────────────────────────────
// Unit tests
// ──────────────────────────────────────────────────────────────────────────
303
#[cfg(test)]
mod tests {
    use super::*;
    use crate::blueprint::compiler::{RustFnInProcessSpawnerFactory, SpawnerRegistry};
    use crate::blueprint::{
        current_schema_version, AgentDef, AgentKind, AgentMeta, BlueprintMetadata, CompilerHints,
        CompilerStrategy,
    };
    use crate::core::config::EngineCfg;
    use crate::worker::adapter::{WorkerError, WorkerResult};
    use mlua_flow_ir::{Expr, JoinMode, Node as FlowNode};
    use serde_json::json;
    use std::sync::Arc;

    /// `Expr::Path` pointing at `at`.
    fn path(at: &str) -> Expr {
        Expr::Path { at: at.to_owned() }
    }

    /// A `Step` node that calls agent `agent_ref`, reading `input` and
    /// writing `output`.
    fn step(agent_ref: &str, input: Expr, output: Expr) -> FlowNode {
        FlowNode::Step {
            ref_: agent_ref.to_owned(),
            in_: input,
            out: output,
        }
    }

    /// A `RustFn` agent named `name` bound to the registered fn `fn_id`.
    fn agent(name: &str, fn_id: &str) -> AgentDef {
        AgentDef {
            name: name.to_owned(),
            kind: AgentKind::RustFn,
            spec: json!({ "fn_id": fn_id }),
            profile: None,
            meta: Some(AgentMeta::default()),
        }
    }

    /// Service over a default-config engine whose compiler knows only the
    /// given in-process RustFn factory.
    fn build_service(factory: RustFnInProcessSpawnerFactory) -> TaskLaunchService {
        let mut registry = SpawnerRegistry::new();
        let engine = Engine::new(EngineCfg::default());
        registry.register::<RustFnInProcessSpawnerFactory>(Arc::new(factory));
        TaskLaunchService::new(engine, Compiler::new(registry))
    }

    /// Minimal Blueprint around `flow` and `agents`; everything else is
    /// left at its default / empty value.
    fn bp(flow: FlowNode, agents: Vec<AgentDef>) -> Blueprint {
        Blueprint {
            schema_version: current_schema_version(),
            id: "ut".into(),
            flow,
            agents,
            operators: Vec::new(),
            hints: CompilerHints::default(),
            strategy: CompilerStrategy::default(),
            metadata: BlueprintMetadata::default(),
            spawner_hints: Default::default(),
            default_agent_kind: AgentKind::Operator,
            default_operator_kind: None,
        }
    }

    /// Default-path launch input with a fixed operator id, role, and TTL.
    fn launch_input(blueprint: Blueprint, init_ctx: Value) -> TaskLaunchInput {
        let ttl = Duration::from_secs(30);
        TaskLaunchInput::automate(blueprint, "ut-op", Role::Operator, ttl, init_ctx)
    }

    /// If `prompt` parses as a JSON string, return the decoded string;
    /// otherwise return `prompt` unchanged.
    fn unquote(prompt: String) -> String {
        serde_json::from_str::<String>(&prompt).unwrap_or(prompt)
    }

    /// Factory with a single `echo` fn that wraps the prompt as
    /// `{ "echoed": <prompt> }`.
    fn echo_factory() -> RustFnInProcessSpawnerFactory {
        RustFnInProcessSpawnerFactory::new().register_fn("echo", |inv| async move {
            Ok(WorkerResult {
                value: json!({ "echoed": inv.prompt }),
                ok: true,
            })
        })
    }

    #[tokio::test]
    async fn launch_single_step_writes_out_path() {
        let svc = build_service(echo_factory());
        let flow = step("echo", path("$.input"), path("$.out"));
        let blueprint = bp(flow, vec![agent("echo", "echo")]);

        let out = svc
            .launch(launch_input(blueprint, json!({ "input": "hi" })))
            .await
            .expect("launch ok");

        assert_eq!(out.final_ctx["out"]["echoed"], "hi");
    }

    #[tokio::test]
    async fn launch_three_step_seq_threads_ctx_forward() {
        let factory = RustFnInProcessSpawnerFactory::new()
            .register_fn("upper", |inv| async move {
                let text = unquote(inv.prompt);
                Ok(WorkerResult {
                    value: json!(text.to_uppercase()),
                    ok: true,
                })
            })
            .register_fn("suffix", |inv| async move {
                let text = unquote(inv.prompt);
                Ok(WorkerResult {
                    value: json!(format!("{text}!")),
                    ok: true,
                })
            })
            .register_fn("wrap", |inv| async move {
                let text = unquote(inv.prompt);
                Ok(WorkerResult {
                    value: json!(format!("[{text}]")),
                    ok: true,
                })
            });
        let svc = build_service(factory);

        // Each step reads the previous step's output path.
        let children = vec![
            step("upper", path("$.in"), path("$.s1")),
            step("suffix", path("$.s1"), path("$.s2")),
            step("wrap", path("$.s2"), path("$.s3")),
        ];
        let agents = vec![
            agent("upper", "upper"),
            agent("suffix", "suffix"),
            agent("wrap", "wrap"),
        ];
        let blueprint = bp(FlowNode::Seq { children }, agents);

        let out = svc
            .launch(launch_input(blueprint, json!({ "in": "hello" })))
            .await
            .expect("launch ok");

        assert_eq!(out.final_ctx["s1"], "HELLO");
        assert_eq!(out.final_ctx["s2"], "HELLO!");
        assert_eq!(out.final_ctx["s3"], "[HELLO!]");
    }

    #[tokio::test]
    async fn launch_fanout_join_all_parallel_completes() {
        use std::sync::atomic::{AtomicU32, Ordering};

        // `inflight` counts workers currently running; `peak` remembers the
        // largest value `inflight` ever reached.
        let inflight = Arc::new(AtomicU32::new(0));
        let peak = Arc::new(AtomicU32::new(0));

        // Every worker raises the inflight counter, sleeps 50ms, then
        // lowers it again. If the fan-out really runs in parallel the peak
        // goes above 1.
        let factory = {
            let inflight = Arc::clone(&inflight);
            let peak = Arc::clone(&peak);
            RustFnInProcessSpawnerFactory::new().register_fn("para", move |inv| {
                let inflight = Arc::clone(&inflight);
                let peak = Arc::clone(&peak);
                async move {
                    let now = inflight.fetch_add(1, Ordering::SeqCst) + 1;
                    peak.fetch_max(now, Ordering::SeqCst);
                    tokio::time::sleep(Duration::from_millis(50)).await;
                    inflight.fetch_sub(1, Ordering::SeqCst);
                    let item = unquote(inv.prompt);
                    Ok(WorkerResult {
                        value: json!(format!("did:{item}")),
                        ok: true,
                    })
                }
            })
        };
        let svc = build_service(factory);

        let flow = FlowNode::Fanout {
            items: path("$.items"),
            bind: path("$.item"),
            body: Box::new(step("para", path("$.item"), path("$.r"))),
            join: JoinMode::All,
            out: path("$.results"),
        };
        let blueprint = bp(flow, vec![agent("para", "para")]);
        let init_ctx = json!({ "items": ["a", "b", "c", "d"] });

        let out = svc
            .launch(launch_input(blueprint, init_ctx))
            .await
            .expect("launch ok");

        let results = out.final_ctx["results"].as_array().expect("array");
        assert_eq!(results.len(), 4);
        // Results must come back in item order.
        for (got, expected) in results.iter().zip(["a", "b", "c", "d"]) {
            assert_eq!(got["r"], json!(format!("did:{expected}")));
        }

        let max = peak.load(Ordering::SeqCst);
        assert!(
            max >= 2,
            "expected parallel execution (max inflight >= 2), got {max}"
        );
    }

    #[tokio::test]
    async fn launch_propagates_worker_error_as_flow_eval_err() {
        let factory = RustFnInProcessSpawnerFactory::new()
            .register_fn("ok", |inv| async move {
                Ok(WorkerResult {
                    value: json!(inv.prompt),
                    ok: true,
                })
            })
            .register_fn("boom", |_inv| async move {
                Err(WorkerError::Failed("intentional boom".into()))
            });
        let svc = build_service(factory);

        // The middle step fails.
        let children = vec![
            step("ok", path("$.input"), path("$.s1")),
            step("boom", path("$.s1"), path("$.s2")),
            step("ok", path("$.s2"), path("$.s3")),
        ];
        let blueprint = bp(
            FlowNode::Seq { children },
            vec![agent("ok", "ok"), agent("boom", "boom")],
        );

        let err = svc
            .launch(launch_input(blueprint, json!({ "input": "x" })))
            .await
            .expect_err("expected fail");

        let msg = match err {
            TaskLaunchError::FlowEval(msg) => msg,
            other => panic!("expected FlowEval error, got {other:?}"),
        };
        assert!(
            msg.contains("boom") || msg.contains("intentional"),
            "expected error to mention worker failure, got: {msg}"
        );
    }

    #[tokio::test]
    async fn launch_resolves_call_extern_via_registered_externs() {
        let mut externs = mlua_flow_ir::ExternMap::new();
        externs.register("fmt.greet", |args: &[Value]| {
            let name = args[0].as_str().unwrap_or("?");
            Ok(json!(format!("hello, {name}")))
        });
        let svc = build_service(echo_factory()).with_externs(Arc::new(externs));

        // The step input is computed by the extern, not read from a path.
        let greeting = Expr::CallExtern {
            ref_: "fmt.greet".into(),
            args: vec![path("$.who")],
        };
        let flow = step("echo", greeting, path("$.out"));
        let blueprint = bp(flow, vec![agent("echo", "echo")]);

        let out = svc
            .launch(launch_input(blueprint, json!({ "who": "swarm" })))
            .await
            .expect("launch ok");

        assert_eq!(out.final_ctx["out"]["echoed"], json!("hello, swarm"));
    }

    #[tokio::test]
    async fn launch_call_extern_without_registry_fails_as_flow_eval() {
        let factory = RustFnInProcessSpawnerFactory::new().register_fn("echo", |inv| async move {
            Ok(WorkerResult {
                value: json!(inv.prompt),
                ok: true,
            })
        });
        // No `with_externs` call: the service keeps its default `NoExterns`.
        let svc = build_service(factory);

        let unresolved = Expr::CallExtern {
            ref_: "fmt.greet".into(),
            args: Vec::new(),
        };
        let flow = step("echo", unresolved, path("$.out"));
        let blueprint = bp(flow, vec![agent("echo", "echo")]);

        let err = svc
            .launch(launch_input(blueprint, json!({})))
            .await
            .expect_err("expected fail");

        let msg = match err {
            TaskLaunchError::FlowEval(msg) => msg,
            other => panic!("expected FlowEval error, got {other:?}"),
        };
        assert!(msg.contains("extern"), "expected extern error, got: {msg}");
    }
}