Skip to main content

mlua_swarm/application/
task.rs

1//! `TaskApplication` — the `POST /v1/tasks` entry point.
2//!
3//! Input: `BlueprintRef` (Inline / Id) plus a `TaskSpec`. Output:
4//! `(CapToken, TaskId, version)`. Once the Blueprint is resolved, the
5//! engine-side operations (`bind` + `attach` + `start_task`) are
6//! delegated to [`TaskLaunchService`].
7
8use super::semver_resolve::SemverResolveError;
9use super::Application;
10use crate::blueprint::store::{BlueprintId, BlueprintStore, BlueprintStoreError, BlueprintVersion};
11use crate::blueprint::Blueprint;
12use crate::core::ctx::OperatorKind;
13use crate::service::{TaskLaunchError, TaskLaunchInput, TaskLaunchOutput, TaskLaunchService};
14use crate::types::{CapToken, Role};
15use async_trait::async_trait;
16use serde::{Deserialize, Serialize};
17use serde_json::Value;
18use std::collections::HashMap;
19use std::sync::Arc;
20use std::time::Duration;
21use thiserror::Error;
22
23/// How a task entry says the Blueprint should be resolved.
24#[derive(Debug, Clone, Serialize, Deserialize)]
25#[serde(tag = "kind", rename_all = "snake_case")]
26pub enum BlueprintRef {
27    /// The Blueprint value is embedded directly in the request; no
28    /// store lookup happens.
29    Inline {
30        /// The Blueprint to run as-is.
31        value: Box<Blueprint>,
32    },
33    /// Resolve the Blueprint from the `BlueprintStore` by id.
34    Id {
35        /// The `BlueprintId` to look up in the store.
36        id: BlueprintId,
37        /// Which generation to pick; defaults to `Latest`.
38        #[serde(default)]
39        version: VersionSelector,
40    },
41}
42
43/// How to pick a generation — a `version` inside the store.
44#[derive(Debug, Clone, Default, Serialize, Deserialize)]
45#[serde(tag = "kind", rename_all = "snake_case")]
46pub enum VersionSelector {
47    /// Use the store's current head version.
48    #[default]
49    Latest,
50    /// Use one exact, previously-committed version.
51    Fixed {
52        /// The exact version to read.
53        value: BlueprintVersion,
54    },
55    /// Scan the store's history and pick the highest version whose
56    /// `BlueprintMetadata.version_label` satisfies `req`.
57    SemverReq {
58        /// The semver requirement every candidate label is matched
59        /// against.
60        req: semver::VersionReq,
61    },
62}
63
64/// Input to [`TaskApplication::handle`] — the `POST /v1/tasks` request
65/// body once decoded.
66#[derive(Debug, Clone)]
67pub struct TaskApplicationInput {
68    /// Accepts both Inline (a Blueprint value directly) and Id
69    /// (store fetch + a `VersionSelector`).
70    pub blueprint: BlueprintRef,
71    /// Caller-supplied id for the Operator that owns this run.
72    pub operator_id: String,
73    /// The Operator's role for this run.
74    pub role: Role,
75    /// How long the attached session is allowed to live.
76    pub ttl: Duration,
77    /// Initial `ctx` for flow.ir `eval`. Read by every `Step.in`.
78    pub init_ctx: Value,
79    /// "Runtime Global" tier of the `OperatorKind` cascade. `Some(_)` is
80    /// always an explicit request — including `Some(OperatorKind::Automate)`
81    /// — that outranks the BP-level tiers (`OperatorDef.kind` /
82    /// `Blueprint.default_operator_kind`); `None` leaves it unspecified so
83    /// those tiers / the final default decide. Under `MainAi` /
84    /// `Composite`, `MainAIMiddleware`'s `spawn_hook` before/after
85    /// callbacks become effective. See
86    /// `crate::core::ctx::collapse_operator_kind`.
87    pub operator_kind: Option<crate::core::ctx::OperatorKind>,
88    /// `SeniorBridge` registry ID. `None` — none in use;
89    /// `Some(id)` — attach a bridge previously registered on the
90    /// engine.
91    pub bridge_id: Option<String>,
92    /// `SpawnHook` registry ID. Same shape as above — attach a hook
93    /// previously registered on the engine.
94    pub hook_id: Option<String>,
95    /// Operator registry ID — used on the path that hands the whole
96    /// spawn off to an external Operator.
97    pub operator_backend_id: Option<String>,
98    /// "Runtime Agent-level" tier (highest priority) of the `OperatorKind`
99    /// cascade — per-agent override, keyed by `AgentDef.name`. Empty by
100    /// default. See `crate::core::ctx::collapse_operator_kind` for the full tier
101    /// list.
102    pub operator_kind_overrides: HashMap<String, OperatorKind>,
103}
104
105impl TaskApplicationInput {
106    /// Helper for existing callers on the default path — no hooks and no
107    /// per-agent `OperatorKind` overrides. Leaves the "Runtime Global" tier
108    /// unspecified (`None`), so the BP-level tiers / final default
109    /// (`OperatorKind::Automate`) decide — this preserves today's
110    /// behaviour for every existing caller without silently forcing
111    /// `Automate` as an explicit override that would outrank a BP-declared
112    /// `MainAi`/`Composite` kind.
113    pub fn automate(
114        blueprint: BlueprintRef,
115        operator_id: impl Into<String>,
116        role: Role,
117        ttl: Duration,
118        init_ctx: Value,
119    ) -> Self {
120        Self {
121            blueprint,
122            operator_id: operator_id.into(),
123            role,
124            ttl,
125            init_ctx,
126            operator_kind: None,
127            bridge_id: None,
128            hook_id: None,
129            operator_backend_id: None,
130            operator_kind_overrides: HashMap::new(),
131        }
132    }
133}
134
135/// Result of a successful [`TaskApplication::handle`] call.
136#[derive(Debug, Clone)]
137pub struct TaskApplicationOutput {
138    /// The capability token for the attached session.
139    pub token: CapToken,
140    /// The final `ctx` after the flow ran to completion.
141    pub final_ctx: Value,
142    /// Only `Some` when resolution went through the store
143    /// (`BlueprintRef::Id`); `None` on the Inline path.
144    pub bound_version: Option<BlueprintVersion>,
145}
146
147/// Failure modes of [`TaskApplication::handle`] and
148/// [`TaskApplication::resolve`].
149#[derive(Debug, Error)]
150pub enum TaskApplicationError {
151    /// `BlueprintRef::Id` was used but this `TaskApplication` was
152    /// built via [`TaskApplication::new_inline_only`] (no store).
153    #[error("store not configured (BlueprintRef::Id requires store)")]
154    NoStore,
155    /// The `BlueprintStore` returned an error while resolving the ref.
156    #[error("store: {0}")]
157    Store(#[from] BlueprintStoreError),
158    /// `TaskLaunchService::launch` failed after resolution succeeded.
159    #[error("launch: {0}")]
160    Launch(#[from] TaskLaunchError),
161    /// A stored version's `version_label` is not valid semver.
162    #[error("invalid semver version_label {label:?}: {source}")]
163    InvalidSemver {
164        /// The offending label string.
165        label: String,
166        /// The underlying semver parse error.
167        #[source]
168        source: semver::Error,
169    },
170    /// No stored version's label satisfies the `SemverReq`.
171    #[error("no version matches semver req: {req}")]
172    NoMatchingVersion {
173        /// The requirement string that matched nothing.
174        req: String,
175    },
176}
177
178impl From<SemverResolveError> for TaskApplicationError {
179    fn from(e: SemverResolveError) -> Self {
180        match e {
181            SemverResolveError::Store(e) => TaskApplicationError::Store(e),
182            SemverResolveError::InvalidSemver { label, source } => {
183                TaskApplicationError::InvalidSemver { label, source }
184            }
185            SemverResolveError::NoMatchingVersion { req } => {
186                TaskApplicationError::NoMatchingVersion { req }
187            }
188        }
189    }
190}
191
192/// The `POST /v1/tasks` [`Application`] — resolves a `BlueprintRef` and
193/// runs it to completion through [`TaskLaunchService`].
194pub struct TaskApplication {
195    launch: Arc<TaskLaunchService>,
196    /// Only needed when resolving `BlueprintRef::Id`; `None` in
197    /// Inline-only mode.
198    store: Option<Arc<dyn BlueprintStore>>,
199}
200
201impl TaskApplication {
202    /// Build a `TaskApplication` that can resolve both `Inline` and
203    /// `Id` `BlueprintRef`s (the `Id` path reads through `store`).
204    pub fn new(launch: Arc<TaskLaunchService>, store: Arc<dyn BlueprintStore>) -> Self {
205        Self {
206            launch,
207            store: Some(store),
208        }
209    }
210
211    /// Build a `TaskApplication` restricted to `Inline` `BlueprintRef`s
212    /// — no store is configured, so `Id` resolution always fails with
213    /// `TaskApplicationError::NoStore`.
214    pub fn new_inline_only(launch: Arc<TaskLaunchService>) -> Self {
215        Self {
216            launch,
217            store: None,
218        }
219    }
220
221    /// Resolve a `BlueprintRef` and return the real Blueprint plus,
222    /// when it went through the store, the resolved version.
223    pub async fn resolve(
224        &self,
225        bp_ref: &BlueprintRef,
226    ) -> Result<(Blueprint, Option<BlueprintVersion>), TaskApplicationError> {
227        match bp_ref {
228            BlueprintRef::Inline { value } => Ok((value.as_ref().clone(), None)),
229            BlueprintRef::Id { id, version } => {
230                let store = self.store.as_ref().ok_or(TaskApplicationError::NoStore)?;
231                let bp_id = id.clone();
232                let traced = match version {
233                    VersionSelector::Latest => store.read_head(&bp_id).await?,
234                    VersionSelector::Fixed { value } => store.read_version(&bp_id, *value).await?,
235                    VersionSelector::SemverReq { req } => {
236                        let v = super::semver_resolve::resolve_semver(store.as_ref(), &bp_id, req)
237                            .await?;
238                        store.read_version(&bp_id, v).await?
239                    }
240                };
241                let ver = traced.trace.version;
242                Ok((traced.value, Some(ver)))
243            }
244        }
245    }
246}
247
248#[async_trait]
249impl Application for TaskApplication {
250    type Input = TaskApplicationInput;
251    type Output = TaskApplicationOutput;
252    type Error = TaskApplicationError;
253
254    fn name(&self) -> &str {
255        "task"
256    }
257
258    /// Resolve the `BlueprintRef` (Inline / Id) and run the flow to
259    /// completion through `TaskLaunchService::launch`.
260    async fn handle(&self, input: Self::Input) -> Result<Self::Output, Self::Error> {
261        let (blueprint, bound_version) = self.resolve(&input.blueprint).await?;
262        let TaskLaunchOutput { token, final_ctx } = self
263            .launch
264            .launch(TaskLaunchInput {
265                blueprint,
266                operator_id: input.operator_id,
267                role: input.role,
268                ttl: input.ttl,
269                operator_kind: input.operator_kind,
270                bridge_id: input.bridge_id,
271                hook_id: input.hook_id,
272                operator_backend_id: input.operator_backend_id,
273                operator_kind_overrides: input.operator_kind_overrides,
274                init_ctx: input.init_ctx,
275            })
276            .await?;
277        Ok(TaskApplicationOutput {
278            token,
279            final_ctx,
280            bound_version,
281        })
282    }
283}
284
285// ──────────────────────────────────────────────────────────────────────────
286// UT
287// ──────────────────────────────────────────────────────────────────────────
288
289#[cfg(test)]
290mod tests {
291    use super::*;
292    use crate::blueprint::compiler::{Compiler, SpawnerRegistry};
293    use crate::blueprint::store::{
294        blueprint_version, BlueprintId, BlueprintStore, BlueprintStoreError, CommitMetadata,
295        InMemoryBlueprintStore,
296    };
297    use crate::blueprint::{
298        current_schema_version, AgentKind, Blueprint, BlueprintMetadata, CompilerHints,
299        CompilerStrategy,
300    };
301    use crate::core::config::EngineCfg;
302    use crate::core::ctx::OperatorKind;
303    use crate::core::engine::Engine;
304    use mlua_flow_ir::Node as FlowNode;
305
306    fn empty_bp() -> Blueprint {
307        Blueprint {
308            schema_version: current_schema_version(),
309            id: "ut-bp".into(),
310            flow: FlowNode::Seq { children: vec![] },
311            agents: vec![],
312            operators: vec![],
313            hints: CompilerHints::default(),
314            strategy: CompilerStrategy::default(),
315            metadata: BlueprintMetadata::default(),
316            spawner_hints: Default::default(),
317            default_agent_kind: AgentKind::Operator,
318            default_operator_kind: None,
319        }
320    }
321
322    fn bp_with_label(id: &str, version_label: Option<&str>) -> Blueprint {
323        Blueprint {
324            schema_version: current_schema_version(),
325            id: id.into(),
326            flow: FlowNode::Seq { children: vec![] },
327            agents: vec![],
328            operators: vec![],
329            hints: CompilerHints::default(),
330            strategy: CompilerStrategy::default(),
331            metadata: BlueprintMetadata {
332                description: None,
333                origin: Default::default(),
334                tags: vec![],
335                version_label: version_label.map(|s| s.to_string()),
336                project_name_alias: None,
337                default_run_ttl_secs: None,
338            },
339            spawner_hints: Default::default(),
340            default_agent_kind: AgentKind::Operator,
341            default_operator_kind: None,
342        }
343    }
344
345    fn build_app_with_store() -> (TaskApplication, Arc<dyn BlueprintStore>) {
346        let reg = SpawnerRegistry::new();
347        let compiler = Compiler::new(reg);
348        let engine = Engine::new(EngineCfg::default());
349        let launch = Arc::new(TaskLaunchService::new(engine, compiler));
350        let store: Arc<dyn BlueprintStore> = Arc::new(InMemoryBlueprintStore::new());
351        (TaskApplication::new(launch, store.clone()), store)
352    }
353
354    fn build_app_inline_only() -> TaskApplication {
355        let reg = SpawnerRegistry::new();
356        let compiler = Compiler::new(reg);
357        let engine = Engine::new(EngineCfg::default());
358        let launch = Arc::new(TaskLaunchService::new(engine, compiler));
359        TaskApplication::new_inline_only(launch)
360    }
361
362    async fn seed(store: &Arc<dyn BlueprintStore>, bp: &Blueprint) -> BlueprintVersion {
363        let id = BlueprintId::new(bp.id.clone());
364        let v = blueprint_version(bp).expect("hash");
365        store
366            .write_new(&id, bp, &[], CommitMetadata::seed(id.clone(), v, 0))
367            .await
368            .expect("seed");
369        v
370    }
371
372    #[test]
373    fn automate_helper_sets_defaults() {
374        let input = TaskApplicationInput::automate(
375            BlueprintRef::Inline {
376                value: Box::new(empty_bp()),
377            },
378            "op-1",
379            Role::Operator,
380            Duration::from_secs(10),
381            serde_json::json!({}),
382        );
383        assert!(
384            input.operator_kind.is_none(),
385            "automate() leaves the Runtime Global tier unspecified (None), \
386             not an explicit Some(Automate) override"
387        );
388        assert!(input.bridge_id.is_none());
389        assert!(input.hook_id.is_none());
390        assert_eq!(input.operator_id, "op-1");
391    }
392
393    #[test]
394    fn struct_literal_allows_callback_ids() {
395        let input = TaskApplicationInput {
396            blueprint: BlueprintRef::Inline {
397                value: Box::new(empty_bp()),
398            },
399            operator_id: "op-2".into(),
400            role: Role::Operator,
401            ttl: Duration::from_secs(5),
402            init_ctx: serde_json::json!({}),
403            operator_kind: Some(OperatorKind::MainAi),
404            bridge_id: Some("br-x".into()),
405            hook_id: Some("hk-y".into()),
406            operator_backend_id: None,
407            operator_kind_overrides: HashMap::new(),
408        };
409        assert!(matches!(input.operator_kind, Some(OperatorKind::MainAi)));
410        assert_eq!(input.bridge_id.as_deref(), Some("br-x"));
411        assert_eq!(input.hook_id.as_deref(), Some("hk-y"));
412    }
413
414    // ──────────────────────────────────────────────────────────────────
415    // resolve / resolve_semver carve
416    // ──────────────────────────────────────────────────────────────────
417
418    #[tokio::test]
419    async fn resolve_inline_returns_bp_and_no_version() {
420        let app = build_app_inline_only();
421        let bp = empty_bp();
422        let (got, ver) = app
423            .resolve(&BlueprintRef::Inline {
424                value: Box::new(bp.clone()),
425            })
426            .await
427            .expect("resolve inline ok");
428        assert_eq!(got.id, bp.id);
429        assert!(ver.is_none(), "the Inline path yields bound_version=None");
430    }
431
432    #[tokio::test]
433    async fn resolve_id_latest_returns_bp_and_version() {
434        let (app, store) = build_app_with_store();
435        let bp = bp_with_label("rid-latest", Some("0.1.0"));
436        let v = seed(&store, &bp).await;
437        let (got, ver) = app
438            .resolve(&BlueprintRef::Id {
439                id: BlueprintId::new(bp.id.clone()),
440                version: VersionSelector::Latest,
441            })
442            .await
443            .expect("resolve id latest ok");
444        assert_eq!(got.id, bp.id);
445        assert_eq!(ver, Some(v), "Latest = seed version");
446    }
447
448    #[tokio::test]
449    async fn resolve_id_fixed_picks_exact_version() {
450        let (app, store) = build_app_with_store();
451        let id = "rid-fixed";
452        let bp1 = bp_with_label(id, Some("1.0.0"));
453        let bp2 = bp_with_label(id, Some("2.0.0"));
454        let v1 = seed(&store, &bp1).await;
455        let _v2 = seed(&store, &bp2).await;
456        let (got, ver) = app
457            .resolve(&BlueprintRef::Id {
458                id: BlueprintId::new(id),
459                version: VersionSelector::Fixed { value: v1 },
460            })
461            .await
462            .expect("resolve id fixed ok");
463        assert_eq!(ver, Some(v1));
464        assert_eq!(
465            got.metadata.version_label.as_deref(),
466            Some("1.0.0"),
467            "Fixed{{v1}} resolves to v1 = 1.0.0"
468        );
469    }
470
471    #[tokio::test]
472    async fn resolve_id_semver_picks_highest_matching() {
473        let (app, store) = build_app_with_store();
474        let id = "rid-semver";
475        let _ = seed(&store, &bp_with_label(id, Some("1.0.0"))).await;
476        let _ = seed(&store, &bp_with_label(id, Some("1.2.0"))).await;
477        let _ = seed(&store, &bp_with_label(id, Some("2.0.0"))).await;
478        let req = semver::VersionReq::parse("^1").expect("req");
479        let (got, ver) = app
480            .resolve(&BlueprintRef::Id {
481                id: BlueprintId::new(id),
482                version: VersionSelector::SemverReq { req },
483            })
484            .await
485            .expect("resolve semver ok");
486        assert!(ver.is_some());
487        assert_eq!(
488            got.metadata.version_label.as_deref(),
489            Some("1.2.0"),
490            "^1 max = 1.2.0 (2.0.0 is out of range; 1.0.0 is lower)"
491        );
492    }
493
494    #[tokio::test]
495    async fn resolve_id_semver_no_match_errs() {
496        let (app, store) = build_app_with_store();
497        let id = "rid-semver-nomatch";
498        let _ = seed(&store, &bp_with_label(id, Some("1.0.0"))).await;
499        let req = semver::VersionReq::parse("^3").expect("req");
500        let err = app
501            .resolve(&BlueprintRef::Id {
502                id: BlueprintId::new(id),
503                version: VersionSelector::SemverReq { req },
504            })
505            .await
506            .expect_err("expected NoMatchingVersion");
507        match err {
508            TaskApplicationError::NoMatchingVersion { req } => {
509                assert!(req.contains("^3"), "req string carry: {req}");
510            }
511            other => panic!("expected NoMatchingVersion, got {other:?}"),
512        }
513    }
514
515    #[tokio::test]
516    async fn resolve_id_semver_invalid_label_errs() {
517        let (app, store) = build_app_with_store();
518        let id = "rid-semver-bad";
519        let _ = seed(&store, &bp_with_label(id, Some("not-semver"))).await;
520        let req = semver::VersionReq::parse("^1").expect("req");
521        let err = app
522            .resolve(&BlueprintRef::Id {
523                id: BlueprintId::new(id),
524                version: VersionSelector::SemverReq { req },
525            })
526            .await
527            .expect_err("expected InvalidSemver");
528        match err {
529            TaskApplicationError::InvalidSemver { label, .. } => {
530                assert_eq!(label, "not-semver");
531            }
532            other => panic!("expected InvalidSemver, got {other:?}"),
533        }
534    }
535
536    #[tokio::test]
537    async fn resolve_id_without_store_errs_no_store() {
538        let app = build_app_inline_only();
539        let err = app
540            .resolve(&BlueprintRef::Id {
541                id: BlueprintId::new("anything"),
542                version: VersionSelector::Latest,
543            })
544            .await
545            .expect_err("expected NoStore");
546        assert!(matches!(err, TaskApplicationError::NoStore), "got {err:?}");
547    }
548
549    #[tokio::test]
550    async fn resolve_id_not_found_errs_store() {
551        let (app, _store) = build_app_with_store();
552        let err = app
553            .resolve(&BlueprintRef::Id {
554                id: BlueprintId::new("never-seeded"),
555                version: VersionSelector::Latest,
556            })
557            .await
558            .expect_err("expected Store(IdNotFound|HeadEmpty)");
559        match err {
560            TaskApplicationError::Store(
561                BlueprintStoreError::IdNotFound(_) | BlueprintStoreError::HeadEmpty(_),
562            ) => {}
563            other => panic!("expected Store(IdNotFound|HeadEmpty), got {other:?}"),
564        }
565    }
566}