1use super::semver_resolve::SemverResolveError;
9use super::Application;
10use crate::blueprint::store::{BlueprintId, BlueprintStore, BlueprintStoreError, BlueprintVersion};
11use crate::blueprint::Blueprint;
12use crate::core::ctx::OperatorKind;
13use crate::service::{TaskLaunchError, TaskLaunchInput, TaskLaunchOutput, TaskLaunchService};
14use crate::types::{CapToken, Role};
15use async_trait::async_trait;
16use serde::{Deserialize, Serialize};
17use serde_json::Value;
18use std::collections::HashMap;
19use std::sync::Arc;
20use std::time::Duration;
21use thiserror::Error;
22
23#[derive(Debug, Clone, Serialize, Deserialize)]
25#[serde(tag = "kind", rename_all = "snake_case")]
26pub enum BlueprintRef {
27 Inline {
30 value: Box<Blueprint>,
32 },
33 Id {
35 id: BlueprintId,
37 #[serde(default)]
39 version: VersionSelector,
40 },
41}
42
43#[derive(Debug, Clone, Default, Serialize, Deserialize)]
45#[serde(tag = "kind", rename_all = "snake_case")]
46pub enum VersionSelector {
47 #[default]
49 Latest,
50 Fixed {
52 value: BlueprintVersion,
54 },
55 SemverReq {
58 req: semver::VersionReq,
61 },
62}
63
64#[derive(Debug, Clone)]
67pub struct TaskApplicationInput {
68 pub blueprint: BlueprintRef,
71 pub operator_id: String,
73 pub role: Role,
75 pub ttl: Duration,
77 pub init_ctx: Value,
79 pub operator_kind: Option<crate::core::ctx::OperatorKind>,
88 pub bridge_id: Option<String>,
92 pub hook_id: Option<String>,
95 pub operator_backend_id: Option<String>,
98 pub operator_kind_overrides: HashMap<String, OperatorKind>,
103}
104
105impl TaskApplicationInput {
106 pub fn automate(
114 blueprint: BlueprintRef,
115 operator_id: impl Into<String>,
116 role: Role,
117 ttl: Duration,
118 init_ctx: Value,
119 ) -> Self {
120 Self {
121 blueprint,
122 operator_id: operator_id.into(),
123 role,
124 ttl,
125 init_ctx,
126 operator_kind: None,
127 bridge_id: None,
128 hook_id: None,
129 operator_backend_id: None,
130 operator_kind_overrides: HashMap::new(),
131 }
132 }
133}
134
135#[derive(Debug, Clone)]
137pub struct TaskApplicationOutput {
138 pub token: CapToken,
140 pub final_ctx: Value,
142 pub bound_version: Option<BlueprintVersion>,
145}
146
147#[derive(Debug, Error)]
150pub enum TaskApplicationError {
151 #[error("store not configured (BlueprintRef::Id requires store)")]
154 NoStore,
155 #[error("store: {0}")]
157 Store(#[from] BlueprintStoreError),
158 #[error("launch: {0}")]
160 Launch(#[from] TaskLaunchError),
161 #[error("invalid semver version_label {label:?}: {source}")]
163 InvalidSemver {
164 label: String,
166 #[source]
168 source: semver::Error,
169 },
170 #[error("no version matches semver req: {req}")]
172 NoMatchingVersion {
173 req: String,
175 },
176}
177
178impl From<SemverResolveError> for TaskApplicationError {
179 fn from(e: SemverResolveError) -> Self {
180 match e {
181 SemverResolveError::Store(e) => TaskApplicationError::Store(e),
182 SemverResolveError::InvalidSemver { label, source } => {
183 TaskApplicationError::InvalidSemver { label, source }
184 }
185 SemverResolveError::NoMatchingVersion { req } => {
186 TaskApplicationError::NoMatchingVersion { req }
187 }
188 }
189 }
190}
191
192pub struct TaskApplication {
195 launch: Arc<TaskLaunchService>,
196 store: Option<Arc<dyn BlueprintStore>>,
199}
200
201impl TaskApplication {
202 pub fn new(launch: Arc<TaskLaunchService>, store: Arc<dyn BlueprintStore>) -> Self {
205 Self {
206 launch,
207 store: Some(store),
208 }
209 }
210
211 pub fn new_inline_only(launch: Arc<TaskLaunchService>) -> Self {
215 Self {
216 launch,
217 store: None,
218 }
219 }
220
221 pub async fn resolve(
224 &self,
225 bp_ref: &BlueprintRef,
226 ) -> Result<(Blueprint, Option<BlueprintVersion>), TaskApplicationError> {
227 match bp_ref {
228 BlueprintRef::Inline { value } => Ok((value.as_ref().clone(), None)),
229 BlueprintRef::Id { id, version } => {
230 let store = self.store.as_ref().ok_or(TaskApplicationError::NoStore)?;
231 let bp_id = id.clone();
232 let traced = match version {
233 VersionSelector::Latest => store.read_head(&bp_id).await?,
234 VersionSelector::Fixed { value } => store.read_version(&bp_id, *value).await?,
235 VersionSelector::SemverReq { req } => {
236 let v = super::semver_resolve::resolve_semver(store.as_ref(), &bp_id, req)
237 .await?;
238 store.read_version(&bp_id, v).await?
239 }
240 };
241 let ver = traced.trace.version;
242 Ok((traced.value, Some(ver)))
243 }
244 }
245 }
246}
247
248#[async_trait]
249impl Application for TaskApplication {
250 type Input = TaskApplicationInput;
251 type Output = TaskApplicationOutput;
252 type Error = TaskApplicationError;
253
254 fn name(&self) -> &str {
255 "task"
256 }
257
258 async fn handle(&self, input: Self::Input) -> Result<Self::Output, Self::Error> {
261 let (blueprint, bound_version) = self.resolve(&input.blueprint).await?;
262 let TaskLaunchOutput { token, final_ctx } = self
263 .launch
264 .launch(TaskLaunchInput {
265 blueprint,
266 operator_id: input.operator_id,
267 role: input.role,
268 ttl: input.ttl,
269 operator_kind: input.operator_kind,
270 bridge_id: input.bridge_id,
271 hook_id: input.hook_id,
272 operator_backend_id: input.operator_backend_id,
273 operator_kind_overrides: input.operator_kind_overrides,
274 init_ctx: input.init_ctx,
275 })
276 .await?;
277 Ok(TaskApplicationOutput {
278 token,
279 final_ctx,
280 bound_version,
281 })
282 }
283}
284
285#[cfg(test)]
290mod tests {
291 use super::*;
292 use crate::blueprint::compiler::{Compiler, SpawnerRegistry};
293 use crate::blueprint::store::{
294 blueprint_version, BlueprintId, BlueprintStore, BlueprintStoreError, CommitMetadata,
295 InMemoryBlueprintStore,
296 };
297 use crate::blueprint::{
298 current_schema_version, AgentKind, Blueprint, BlueprintMetadata, CompilerHints,
299 CompilerStrategy,
300 };
301 use crate::core::config::EngineCfg;
302 use crate::core::ctx::OperatorKind;
303 use crate::core::engine::Engine;
304 use mlua_flow_ir::Node as FlowNode;
305
306 fn empty_bp() -> Blueprint {
307 Blueprint {
308 schema_version: current_schema_version(),
309 id: "ut-bp".into(),
310 flow: FlowNode::Seq { children: vec![] },
311 agents: vec![],
312 operators: vec![],
313 hints: CompilerHints::default(),
314 strategy: CompilerStrategy::default(),
315 metadata: BlueprintMetadata::default(),
316 spawner_hints: Default::default(),
317 default_agent_kind: AgentKind::Operator,
318 default_operator_kind: None,
319 }
320 }
321
322 fn bp_with_label(id: &str, version_label: Option<&str>) -> Blueprint {
323 Blueprint {
324 schema_version: current_schema_version(),
325 id: id.into(),
326 flow: FlowNode::Seq { children: vec![] },
327 agents: vec![],
328 operators: vec![],
329 hints: CompilerHints::default(),
330 strategy: CompilerStrategy::default(),
331 metadata: BlueprintMetadata {
332 description: None,
333 origin: Default::default(),
334 tags: vec![],
335 version_label: version_label.map(|s| s.to_string()),
336 project_name_alias: None,
337 default_run_ttl_secs: None,
338 },
339 spawner_hints: Default::default(),
340 default_agent_kind: AgentKind::Operator,
341 default_operator_kind: None,
342 }
343 }
344
345 fn build_app_with_store() -> (TaskApplication, Arc<dyn BlueprintStore>) {
346 let reg = SpawnerRegistry::new();
347 let compiler = Compiler::new(reg);
348 let engine = Engine::new(EngineCfg::default());
349 let launch = Arc::new(TaskLaunchService::new(engine, compiler));
350 let store: Arc<dyn BlueprintStore> = Arc::new(InMemoryBlueprintStore::new());
351 (TaskApplication::new(launch, store.clone()), store)
352 }
353
354 fn build_app_inline_only() -> TaskApplication {
355 let reg = SpawnerRegistry::new();
356 let compiler = Compiler::new(reg);
357 let engine = Engine::new(EngineCfg::default());
358 let launch = Arc::new(TaskLaunchService::new(engine, compiler));
359 TaskApplication::new_inline_only(launch)
360 }
361
362 async fn seed(store: &Arc<dyn BlueprintStore>, bp: &Blueprint) -> BlueprintVersion {
363 let id = BlueprintId::new(bp.id.clone());
364 let v = blueprint_version(bp).expect("hash");
365 store
366 .write_new(&id, bp, &[], CommitMetadata::seed(id.clone(), v, 0))
367 .await
368 .expect("seed");
369 v
370 }
371
372 #[test]
373 fn automate_helper_sets_defaults() {
374 let input = TaskApplicationInput::automate(
375 BlueprintRef::Inline {
376 value: Box::new(empty_bp()),
377 },
378 "op-1",
379 Role::Operator,
380 Duration::from_secs(10),
381 serde_json::json!({}),
382 );
383 assert!(
384 input.operator_kind.is_none(),
385 "automate() leaves the Runtime Global tier unspecified (None), \
386 not an explicit Some(Automate) override"
387 );
388 assert!(input.bridge_id.is_none());
389 assert!(input.hook_id.is_none());
390 assert_eq!(input.operator_id, "op-1");
391 }
392
393 #[test]
394 fn struct_literal_allows_callback_ids() {
395 let input = TaskApplicationInput {
396 blueprint: BlueprintRef::Inline {
397 value: Box::new(empty_bp()),
398 },
399 operator_id: "op-2".into(),
400 role: Role::Operator,
401 ttl: Duration::from_secs(5),
402 init_ctx: serde_json::json!({}),
403 operator_kind: Some(OperatorKind::MainAi),
404 bridge_id: Some("br-x".into()),
405 hook_id: Some("hk-y".into()),
406 operator_backend_id: None,
407 operator_kind_overrides: HashMap::new(),
408 };
409 assert!(matches!(input.operator_kind, Some(OperatorKind::MainAi)));
410 assert_eq!(input.bridge_id.as_deref(), Some("br-x"));
411 assert_eq!(input.hook_id.as_deref(), Some("hk-y"));
412 }
413
414 #[tokio::test]
419 async fn resolve_inline_returns_bp_and_no_version() {
420 let app = build_app_inline_only();
421 let bp = empty_bp();
422 let (got, ver) = app
423 .resolve(&BlueprintRef::Inline {
424 value: Box::new(bp.clone()),
425 })
426 .await
427 .expect("resolve inline ok");
428 assert_eq!(got.id, bp.id);
429 assert!(ver.is_none(), "the Inline path yields bound_version=None");
430 }
431
432 #[tokio::test]
433 async fn resolve_id_latest_returns_bp_and_version() {
434 let (app, store) = build_app_with_store();
435 let bp = bp_with_label("rid-latest", Some("0.1.0"));
436 let v = seed(&store, &bp).await;
437 let (got, ver) = app
438 .resolve(&BlueprintRef::Id {
439 id: BlueprintId::new(bp.id.clone()),
440 version: VersionSelector::Latest,
441 })
442 .await
443 .expect("resolve id latest ok");
444 assert_eq!(got.id, bp.id);
445 assert_eq!(ver, Some(v), "Latest = seed version");
446 }
447
448 #[tokio::test]
449 async fn resolve_id_fixed_picks_exact_version() {
450 let (app, store) = build_app_with_store();
451 let id = "rid-fixed";
452 let bp1 = bp_with_label(id, Some("1.0.0"));
453 let bp2 = bp_with_label(id, Some("2.0.0"));
454 let v1 = seed(&store, &bp1).await;
455 let _v2 = seed(&store, &bp2).await;
456 let (got, ver) = app
457 .resolve(&BlueprintRef::Id {
458 id: BlueprintId::new(id),
459 version: VersionSelector::Fixed { value: v1 },
460 })
461 .await
462 .expect("resolve id fixed ok");
463 assert_eq!(ver, Some(v1));
464 assert_eq!(
465 got.metadata.version_label.as_deref(),
466 Some("1.0.0"),
467 "Fixed{{v1}} resolves to v1 = 1.0.0"
468 );
469 }
470
471 #[tokio::test]
472 async fn resolve_id_semver_picks_highest_matching() {
473 let (app, store) = build_app_with_store();
474 let id = "rid-semver";
475 let _ = seed(&store, &bp_with_label(id, Some("1.0.0"))).await;
476 let _ = seed(&store, &bp_with_label(id, Some("1.2.0"))).await;
477 let _ = seed(&store, &bp_with_label(id, Some("2.0.0"))).await;
478 let req = semver::VersionReq::parse("^1").expect("req");
479 let (got, ver) = app
480 .resolve(&BlueprintRef::Id {
481 id: BlueprintId::new(id),
482 version: VersionSelector::SemverReq { req },
483 })
484 .await
485 .expect("resolve semver ok");
486 assert!(ver.is_some());
487 assert_eq!(
488 got.metadata.version_label.as_deref(),
489 Some("1.2.0"),
490 "^1 max = 1.2.0 (2.0.0 is out of range; 1.0.0 is lower)"
491 );
492 }
493
494 #[tokio::test]
495 async fn resolve_id_semver_no_match_errs() {
496 let (app, store) = build_app_with_store();
497 let id = "rid-semver-nomatch";
498 let _ = seed(&store, &bp_with_label(id, Some("1.0.0"))).await;
499 let req = semver::VersionReq::parse("^3").expect("req");
500 let err = app
501 .resolve(&BlueprintRef::Id {
502 id: BlueprintId::new(id),
503 version: VersionSelector::SemverReq { req },
504 })
505 .await
506 .expect_err("expected NoMatchingVersion");
507 match err {
508 TaskApplicationError::NoMatchingVersion { req } => {
509 assert!(req.contains("^3"), "req string carry: {req}");
510 }
511 other => panic!("expected NoMatchingVersion, got {other:?}"),
512 }
513 }
514
515 #[tokio::test]
516 async fn resolve_id_semver_invalid_label_errs() {
517 let (app, store) = build_app_with_store();
518 let id = "rid-semver-bad";
519 let _ = seed(&store, &bp_with_label(id, Some("not-semver"))).await;
520 let req = semver::VersionReq::parse("^1").expect("req");
521 let err = app
522 .resolve(&BlueprintRef::Id {
523 id: BlueprintId::new(id),
524 version: VersionSelector::SemverReq { req },
525 })
526 .await
527 .expect_err("expected InvalidSemver");
528 match err {
529 TaskApplicationError::InvalidSemver { label, .. } => {
530 assert_eq!(label, "not-semver");
531 }
532 other => panic!("expected InvalidSemver, got {other:?}"),
533 }
534 }
535
536 #[tokio::test]
537 async fn resolve_id_without_store_errs_no_store() {
538 let app = build_app_inline_only();
539 let err = app
540 .resolve(&BlueprintRef::Id {
541 id: BlueprintId::new("anything"),
542 version: VersionSelector::Latest,
543 })
544 .await
545 .expect_err("expected NoStore");
546 assert!(matches!(err, TaskApplicationError::NoStore), "got {err:?}");
547 }
548
549 #[tokio::test]
550 async fn resolve_id_not_found_errs_store() {
551 let (app, _store) = build_app_with_store();
552 let err = app
553 .resolve(&BlueprintRef::Id {
554 id: BlueprintId::new("never-seeded"),
555 version: VersionSelector::Latest,
556 })
557 .await
558 .expect_err("expected Store(IdNotFound|HeadEmpty)");
559 match err {
560 TaskApplicationError::Store(
561 BlueprintStoreError::IdNotFound(_) | BlueprintStoreError::HeadEmpty(_),
562 ) => {}
563 other => panic!("expected Store(IdNotFound|HeadEmpty), got {other:?}"),
564 }
565 }
566}