use super::semver_resolve::SemverResolveError;
use super::Application;
use crate::blueprint::store::{BlueprintId, BlueprintStore, BlueprintStoreError, BlueprintVersion};
use crate::blueprint::Blueprint;
use crate::core::ctx::OperatorKind;
use crate::service::{TaskLaunchError, TaskLaunchInput, TaskLaunchOutput, TaskLaunchService};
use crate::types::{CapToken, Role};
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use thiserror::Error;
/// Reference to the blueprint a task should run.
///
/// Serialized by serde as an internally tagged enum: the `kind` field carries
/// the variant name in snake_case (`inline` or `id`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum BlueprintRef {
/// The blueprint is carried inside the reference itself.
/// `TaskApplication::resolve` returns a clone of it with no bound version.
Inline {
/// The embedded blueprint, boxed.
value: Box<Blueprint>,
},
/// The blueprint is identified by id; `TaskApplication::resolve` loads it
/// from the configured store.
Id {
/// Identifier passed to the store lookups.
id: BlueprintId,
/// Which version to load. When the field is absent during
/// deserialization, `VersionSelector::default()` is used.
#[serde(default)]
version: VersionSelector,
},
}
/// Selects which stored version of a blueprint to load.
///
/// Serialized by serde as an internally tagged enum: the `kind` field carries
/// the variant name in snake_case (`latest`, `fixed` or `semver_req`).
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum VersionSelector {
/// The default variant. `TaskApplication::resolve` answers it with the
/// store's `read_head` for the blueprint id.
#[default]
Latest,
/// One exact version. `TaskApplication::resolve` answers it with the
/// store's `read_version` for that version.
Fixed {
/// The version to load.
value: BlueprintVersion,
},
/// A semver requirement. `TaskApplication::resolve` passes it to
/// `semver_resolve::resolve_semver` and then loads the version that call
/// returns.
SemverReq {
/// The requirement handed to `resolve_semver`.
req: semver::VersionReq,
},
}
#[derive(Debug, Clone)]
pub struct TaskApplicationInput {
pub blueprint: BlueprintRef,
pub operator_id: String,
pub role: Role,
pub ttl: Duration,
pub init_ctx: Value,
pub operator_kind: Option<crate::core::ctx::OperatorKind>,
pub bridge_id: Option<String>,
pub hook_id: Option<String>,
pub operator_backend_id: Option<String>,
pub operator_kind_overrides: HashMap<String, OperatorKind>,
}
impl TaskApplicationInput {
pub fn automate(
blueprint: BlueprintRef,
operator_id: impl Into<String>,
role: Role,
ttl: Duration,
init_ctx: Value,
) -> Self {
Self {
blueprint,
operator_id: operator_id.into(),
role,
ttl,
init_ctx,
operator_kind: None,
bridge_id: None,
hook_id: None,
operator_backend_id: None,
operator_kind_overrides: HashMap::new(),
}
}
}
/// Result returned by `TaskApplication::handle` after a successful launch.
#[derive(Debug, Clone)]
pub struct TaskApplicationOutput {
/// The `token` taken from the launch service's `TaskLaunchOutput`.
pub token: CapToken,
/// The `final_ctx` taken from the launch service's `TaskLaunchOutput`.
pub final_ctx: Value,
/// Version the blueprint was loaded at: `None` for an inline blueprint,
/// `Some` when it was read from the store.
pub bound_version: Option<BlueprintVersion>,
}
/// Errors produced while resolving a blueprint reference or launching a task.
#[derive(Debug, Error)]
pub enum TaskApplicationError {
/// A `BlueprintRef::Id` was given but the application was built without a
/// store.
#[error("store not configured (BlueprintRef::Id requires store)")]
NoStore,
/// A blueprint store operation failed; convertible from
/// `BlueprintStoreError` via `?`.
#[error("store: {0}")]
Store(#[from] BlueprintStoreError),
/// The launch service failed; convertible from `TaskLaunchError` via `?`.
#[error("launch: {0}")]
Launch(#[from] TaskLaunchError),
/// A version label could not be parsed as semver.
#[error("invalid semver version_label {label:?}: {source}")]
InvalidSemver {
/// The label that failed to parse.
label: String,
/// The underlying parse error, exposed as the error source.
#[source]
source: semver::Error,
},
/// No version satisfied the semver requirement.
#[error("no version matches semver req: {req}")]
NoMatchingVersion {
/// The requirement, already rendered as a string.
req: String,
},
}
impl From<SemverResolveError> for TaskApplicationError {
fn from(e: SemverResolveError) -> Self {
match e {
SemverResolveError::Store(e) => TaskApplicationError::Store(e),
SemverResolveError::InvalidSemver { label, source } => {
TaskApplicationError::InvalidSemver { label, source }
}
SemverResolveError::NoMatchingVersion { req } => {
TaskApplicationError::NoMatchingVersion { req }
}
}
}
}
/// Application that resolves a blueprint reference and launches a task from it.
pub struct TaskApplication {
// Shared launch service that performs the actual task launch.
launch: Arc<TaskLaunchService>,
// Blueprint store used for `BlueprintRef::Id`; `None` when the application
// was built with `new_inline_only`.
store: Option<Arc<dyn BlueprintStore>>,
}
impl TaskApplication {
    /// Creates an application backed by `store`, so both inline and by-id
    /// blueprint references can be resolved.
    pub fn new(launch: Arc<TaskLaunchService>, store: Arc<dyn BlueprintStore>) -> Self {
        Self {
            launch,
            store: Some(store),
        }
    }

    /// Creates an application without a store. Only `BlueprintRef::Inline`
    /// can be resolved; `BlueprintRef::Id` yields `TaskApplicationError::NoStore`.
    pub fn new_inline_only(launch: Arc<TaskLaunchService>) -> Self {
        Self {
            launch,
            store: None,
        }
    }

    /// Turns a blueprint reference into a concrete blueprint.
    ///
    /// Returns the blueprint together with the version it was loaded at:
    /// `None` for an inline blueprint, `Some(version)` when it came from the
    /// store.
    ///
    /// # Errors
    ///
    /// * `TaskApplicationError::NoStore` when `bp_ref` is `BlueprintRef::Id`
    ///   and no store is configured.
    /// * `TaskApplicationError::Store` when a store read fails.
    /// * Any error from `semver_resolve::resolve_semver`, converted through
    ///   this type's `From` implementation.
    pub async fn resolve(
        &self,
        bp_ref: &BlueprintRef,
    ) -> Result<(Blueprint, Option<BlueprintVersion>), TaskApplicationError> {
        match bp_ref {
            BlueprintRef::Inline { value } => Ok((value.as_ref().clone(), None)),
            BlueprintRef::Id { id, version } => {
                let store = self.store.as_ref().ok_or(TaskApplicationError::NoStore)?;
                // `id` is already a `&BlueprintId` borrowed from `bp_ref`, so
                // it is passed straight through instead of being cloned.
                let traced = match version {
                    VersionSelector::Latest => store.read_head(id).await?,
                    VersionSelector::Fixed { value } => store.read_version(id, *value).await?,
                    VersionSelector::SemverReq { req } => {
                        // Two steps: pick the version, then load it.
                        let matched =
                            super::semver_resolve::resolve_semver(store.as_ref(), id, req).await?;
                        store.read_version(id, matched).await?
                    }
                };
                // Read the version before `traced.value` is moved out.
                let bound = traced.trace.version;
                Ok((traced.value, Some(bound)))
            }
        }
    }
}
#[async_trait]
impl Application for TaskApplication {
type Input = TaskApplicationInput;
type Output = TaskApplicationOutput;
type Error = TaskApplicationError;
fn name(&self) -> &str {
"task"
}
async fn handle(&self, input: Self::Input) -> Result<Self::Output, Self::Error> {
let (blueprint, bound_version) = self.resolve(&input.blueprint).await?;
let TaskLaunchOutput { token, final_ctx } = self
.launch
.launch(TaskLaunchInput {
blueprint,
operator_id: input.operator_id,
role: input.role,
ttl: input.ttl,
operator_kind: input.operator_kind,
bridge_id: input.bridge_id,
hook_id: input.hook_id,
operator_backend_id: input.operator_backend_id,
operator_kind_overrides: input.operator_kind_overrides,
init_ctx: input.init_ctx,
})
.await?;
Ok(TaskApplicationOutput {
token,
final_ctx,
bound_version,
})
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::blueprint::compiler::{Compiler, SpawnerRegistry};
    use crate::blueprint::store::{
        blueprint_version, BlueprintId, BlueprintStore, BlueprintStoreError, CommitMetadata,
        InMemoryBlueprintStore,
    };
    use crate::blueprint::{
        current_schema_version, AgentKind, Blueprint, BlueprintMetadata, CompilerHints,
        CompilerStrategy,
    };
    use crate::core::config::EngineCfg;
    use crate::core::ctx::OperatorKind;
    use crate::core::engine::Engine;
    use mlua_flow_ir::Node as FlowNode;

    /// Smallest blueprint used by these tests: an empty sequence flow with no
    /// agents or operators and default metadata.
    fn empty_bp() -> Blueprint {
        Blueprint {
            schema_version: current_schema_version(),
            id: "ut-bp".into(),
            flow: FlowNode::Seq { children: vec![] },
            agents: vec![],
            operators: vec![],
            hints: CompilerHints::default(),
            strategy: CompilerStrategy::default(),
            metadata: BlueprintMetadata::default(),
            spawner_hints: Default::default(),
            default_agent_kind: AgentKind::Operator,
            default_operator_kind: None,
        }
    }

    /// Same shape as `empty_bp`, but with the given id and an explicit
    /// metadata block carrying `version_label`.
    fn bp_with_label(id: &str, version_label: Option<&str>) -> Blueprint {
        let metadata = BlueprintMetadata {
            description: None,
            origin: Default::default(),
            tags: vec![],
            version_label: version_label.map(str::to_owned),
            project_name_alias: None,
            default_run_ttl_secs: None,
        };
        Blueprint {
            id: id.into(),
            metadata,
            ..empty_bp()
        }
    }

    /// Launch service wired to a fresh compiler and a default-configured engine.
    fn launch_service() -> Arc<TaskLaunchService> {
        let compiler = Compiler::new(SpawnerRegistry::new());
        let engine = Engine::new(EngineCfg::default());
        Arc::new(TaskLaunchService::new(engine, compiler))
    }

    /// Application backed by an in-memory store; the store handle is returned
    /// as well so tests can seed it.
    fn build_app_with_store() -> (TaskApplication, Arc<dyn BlueprintStore>) {
        let launch = launch_service();
        let store: Arc<dyn BlueprintStore> = Arc::new(InMemoryBlueprintStore::new());
        let app = TaskApplication::new(launch, Arc::clone(&store));
        (app, store)
    }

    /// Application with no store configured.
    fn build_app_inline_only() -> TaskApplication {
        TaskApplication::new_inline_only(launch_service())
    }

    /// Writes `bp` into `store` under its own id and returns the version
    /// computed for it by `blueprint_version`.
    async fn seed(store: &Arc<dyn BlueprintStore>, bp: &Blueprint) -> BlueprintVersion {
        let id = BlueprintId::new(bp.id.clone());
        let version = blueprint_version(bp).expect("hash");
        let meta = CommitMetadata::seed(id.clone(), version, 0);
        store.write_new(&id, bp, &[], meta).await.expect("seed");
        version
    }

    /// Wraps a blueprint in `BlueprintRef::Inline`.
    fn inline_ref(bp: Blueprint) -> BlueprintRef {
        BlueprintRef::Inline {
            value: Box::new(bp),
        }
    }

    /// Builds a `BlueprintRef::Id` from its two parts.
    fn id_ref(id: BlueprintId, version: VersionSelector) -> BlueprintRef {
        BlueprintRef::Id { id, version }
    }

    #[test]
    fn automate_helper_sets_defaults() {
        let input = TaskApplicationInput::automate(
            inline_ref(empty_bp()),
            "op-1",
            Role::Operator,
            Duration::from_secs(10),
            serde_json::json!({}),
        );

        assert!(
            input.operator_kind.is_none(),
            "automate() leaves the Runtime Global tier unspecified (None), \
             not an explicit Some(Automate) override"
        );
        assert!(input.bridge_id.is_none());
        assert!(input.hook_id.is_none());
        assert_eq!(input.operator_id, "op-1");
    }

    #[test]
    fn struct_literal_allows_callback_ids() {
        let input = TaskApplicationInput {
            blueprint: inline_ref(empty_bp()),
            operator_id: "op-2".into(),
            role: Role::Operator,
            ttl: Duration::from_secs(5),
            init_ctx: serde_json::json!({}),
            operator_kind: Some(OperatorKind::MainAi),
            bridge_id: Some("br-x".into()),
            hook_id: Some("hk-y".into()),
            operator_backend_id: None,
            operator_kind_overrides: HashMap::new(),
        };

        assert!(matches!(input.operator_kind, Some(OperatorKind::MainAi)));
        assert_eq!(input.bridge_id.as_deref(), Some("br-x"));
        assert_eq!(input.hook_id.as_deref(), Some("hk-y"));
    }

    #[tokio::test]
    async fn resolve_inline_returns_bp_and_no_version() {
        let app = build_app_inline_only();
        let bp = empty_bp();

        let reference = inline_ref(bp.clone());
        let (got, ver) = app.resolve(&reference).await.expect("resolve inline ok");

        assert_eq!(got.id, bp.id);
        assert!(ver.is_none(), "the Inline path yields bound_version=None");
    }

    #[tokio::test]
    async fn resolve_id_latest_returns_bp_and_version() {
        let (app, store) = build_app_with_store();
        let bp = bp_with_label("rid-latest", Some("0.1.0"));
        let seeded = seed(&store, &bp).await;

        let reference = id_ref(BlueprintId::new(bp.id.clone()), VersionSelector::Latest);
        let (got, ver) = app.resolve(&reference).await.expect("resolve id latest ok");

        assert_eq!(got.id, bp.id);
        assert_eq!(ver, Some(seeded), "Latest = seed version");
    }

    #[tokio::test]
    async fn resolve_id_fixed_picks_exact_version() {
        let (app, store) = build_app_with_store();
        let id = "rid-fixed";
        let first = seed(&store, &bp_with_label(id, Some("1.0.0"))).await;
        let _second = seed(&store, &bp_with_label(id, Some("2.0.0"))).await;

        let reference = id_ref(
            BlueprintId::new(id),
            VersionSelector::Fixed { value: first },
        );
        let (got, ver) = app.resolve(&reference).await.expect("resolve id fixed ok");

        assert_eq!(ver, Some(first));
        assert_eq!(
            got.metadata.version_label.as_deref(),
            Some("1.0.0"),
            "Fixed{{v1}} resolves to v1 = 1.0.0"
        );
    }

    #[tokio::test]
    async fn resolve_id_semver_picks_highest_matching() {
        let (app, store) = build_app_with_store();
        let id = "rid-semver";
        for label in ["1.0.0", "1.2.0", "2.0.0"] {
            let _ = seed(&store, &bp_with_label(id, Some(label))).await;
        }

        let req = semver::VersionReq::parse("^1").expect("req");
        let reference = id_ref(BlueprintId::new(id), VersionSelector::SemverReq { req });
        let (got, ver) = app.resolve(&reference).await.expect("resolve semver ok");

        assert!(ver.is_some());
        assert_eq!(
            got.metadata.version_label.as_deref(),
            Some("1.2.0"),
            "^1 max = 1.2.0 (2.0.0 is out of range; 1.0.0 is lower)"
        );
    }

    #[tokio::test]
    async fn resolve_id_semver_no_match_errs() {
        let (app, store) = build_app_with_store();
        let id = "rid-semver-nomatch";
        let _ = seed(&store, &bp_with_label(id, Some("1.0.0"))).await;

        let req = semver::VersionReq::parse("^3").expect("req");
        let reference = id_ref(BlueprintId::new(id), VersionSelector::SemverReq { req });
        let err = app
            .resolve(&reference)
            .await
            .expect_err("expected NoMatchingVersion");

        match err {
            TaskApplicationError::NoMatchingVersion { req } => {
                assert!(req.contains("^3"), "req string carry: {req}");
            }
            other => panic!("expected NoMatchingVersion, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn resolve_id_semver_invalid_label_errs() {
        let (app, store) = build_app_with_store();
        let id = "rid-semver-bad";
        let _ = seed(&store, &bp_with_label(id, Some("not-semver"))).await;

        let req = semver::VersionReq::parse("^1").expect("req");
        let reference = id_ref(BlueprintId::new(id), VersionSelector::SemverReq { req });
        let err = app
            .resolve(&reference)
            .await
            .expect_err("expected InvalidSemver");

        match err {
            TaskApplicationError::InvalidSemver { label, .. } => {
                assert_eq!(label, "not-semver");
            }
            other => panic!("expected InvalidSemver, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn resolve_id_without_store_errs_no_store() {
        let app = build_app_inline_only();

        let reference = id_ref(BlueprintId::new("anything"), VersionSelector::Latest);
        let err = app.resolve(&reference).await.expect_err("expected NoStore");

        assert!(matches!(err, TaskApplicationError::NoStore), "got {err:?}");
    }

    #[tokio::test]
    async fn resolve_id_not_found_errs_store() {
        let (app, _store) = build_app_with_store();

        let reference = id_ref(BlueprintId::new("never-seeded"), VersionSelector::Latest);
        let err = app
            .resolve(&reference)
            .await
            .expect_err("expected Store(IdNotFound|HeadEmpty)");

        match err {
            TaskApplicationError::Store(
                BlueprintStoreError::IdNotFound(_) | BlueprintStoreError::HeadEmpty(_),
            ) => {}
            other => panic!("expected Store(IdNotFound|HeadEmpty), got {other:?}"),
        }
    }
}