Skip to main content

mlua_swarm/
blueprint.rs

1//! Blueprint runner — glue that executes a flow.ir AST
2//! (`mlua_flow_ir::Node`) through the engine. Each `Step.ref` is run as a
3//! single task via `start_task` + `dispatch_attempt_with`, and the
4//! resulting `Pass` `Value` is written back to `Step.out`.
5//!
6//! **Fully-async chain.** Uses `mlua_flow_ir::eval_async` and
7//! `AsyncDispatcher`; `block_on` and `spawn_blocking` are never mixed in,
8//! so the whole stack stays consistent with the engine's tokio async
9//! world.
10//!
11//! # Usage
12//!
13//! ```ignore
14//! let dispatcher = EngineDispatcher::with_spawner(engine.clone(), op_token, spawner);
15//! let bp: mlua_flow_ir::Node = serde_json::from_str(BP_JSON)?;
16//! let final_ctx = mlua_flow_ir::eval_async(&bp, init_ctx, &dispatcher).await?;
17//! ```
18//!
19//! # Schema types (the IF crate)
20//!
21//! `Blueprint` / `AgentDef` / `AgentKind` and friends live in the
22//! `mlua_swarm_schema` crate and are re-exported from here.
23//! The `struct`/`enum` set that used to live directly in `src/blueprint.rs`
24//! has been moved into the IF crate to support extension discipline,
25//! versioning, and external consumers.
26
27use crate::core::engine::Engine;
28use crate::core::state::{DispatchOutcome, TaskSpec};
29use crate::types::CapToken;
30use crate::worker::adapter::SpawnerAdapter;
31use async_trait::async_trait;
32pub mod compiler;
33pub mod loader;
34pub mod store;
35
36use mlua_flow_ir::{AsyncDispatcher, EvalError};
37use serde_json::Value;
38use std::sync::Arc;
39
40// The schema types are owned by the IF crate (mlua-swarm-schema); we re-export them here.
41/// The schema-side `OperatorKind` (see `crate::core::ctx::OperatorKind` for the
42/// runtime duplicate consumed by `Engine`). Re-exported under an explicit
43/// alias so callers reading `Blueprint.operators[].kind` /
44/// `Blueprint.default_operator_kind` do not have to reach into
45/// `mlua_swarm_schema` directly.
46pub use mlua_swarm_schema::OperatorKind as SchemaOperatorKind;
47pub use mlua_swarm_schema::{
48    current_schema_version, default_global_agent_kind, AgentDef, AgentKind, AgentMeta,
49    AgentProfile, Blueprint, BlueprintMetadata, BlueprintOrigin, CompilerHints, CompilerStrategy,
50    OperatorDef, SpawnerHints, CURRENT_SCHEMA_VERSION,
51};
52
53/// Bridges `mlua_flow_ir::AsyncDispatcher` to the engine's
54/// `start_task` + `dispatch_attempt_with` pair. Holds one Operator session
55/// token and one `spawner`, and spins up a fresh task per `Step.ref`, using
56/// it as the agent name.
57///
58/// Constructed exclusively via `with_spawner`: each dispatch goes through
59/// `engine.dispatch_attempt_with(token, tid, spawner)`, carrying the
60/// spawner per request. Nothing is stashed on engine-global state, so
61/// multiple dispatchers can drive different Blueprints against the same
62/// `Engine` in parallel without racing.
63pub struct EngineDispatcher {
64    engine: Engine,
65    op_token: CapToken,
66    spawner: Arc<dyn SpawnerAdapter>,
67}
68
69impl EngineDispatcher {
70    /// The sole constructor: the spawner is carried per-dispatcher.
71    pub fn with_spawner(
72        engine: Engine,
73        op_token: CapToken,
74        spawner: Arc<dyn SpawnerAdapter>,
75    ) -> Self {
76        Self {
77            engine,
78            op_token,
79            spawner,
80        }
81    }
82}
83
84#[async_trait]
85impl AsyncDispatcher for EngineDispatcher {
86    async fn dispatch(&self, ref_: &str, input: Value) -> Result<Value, EvalError> {
87        // Turn the evaluated Step.in value into a directive. Strings pass
88        // through verbatim; anything else is serde-stringified (the worker
89        // is expected to re-parse it).
90        let directive = match &input {
91            Value::String(s) => s.clone(),
92            other => other.to_string(),
93        };
94
95        let tid = self
96            .engine
97            .start_task(
98                &self.op_token,
99                TaskSpec {
100                    agent: ref_.to_string(),
101                    initial_directive: directive,
102                },
103            )
104            .await
105            .map_err(|e| EvalError::DispatcherError {
106                ref_: ref_.to_string(),
107                msg: format!("start_task: {e}"),
108            })?;
109
110        let outcome = self
111            .engine
112            .dispatch_attempt_with(&self.op_token, &tid, &self.spawner)
113            .await;
114        match outcome {
115            Ok(DispatchOutcome::Pass(v)) => Ok(v),
116            Ok(DispatchOutcome::Blocked(v)) => Err(EvalError::DispatcherError {
117                ref_: ref_.to_string(),
118                msg: format!("blocked: {v}"),
119            }),
120            Ok(other) => Err(EvalError::DispatcherError {
121                ref_: ref_.to_string(),
122                msg: format!("non-terminal outcome: {:?}", other),
123            }),
124            Err(e) => Err(EvalError::DispatcherError {
125                ref_: ref_.to_string(),
126                msg: format!("dispatch_attempt: {e}"),
127            }),
128        }
129    }
130}