1use crate::blueprint::compiler::{CompileError, Compiler};
23use crate::blueprint::{Blueprint, EngineDispatcher};
24use crate::core::ctx::OperatorKind;
25use crate::core::engine::Engine;
26use crate::core::errors::EngineError;
27use crate::middleware::project_name_alias::ProjectNameAliasMiddleware;
28use crate::middleware::SpawnerStack;
29use crate::service::linker;
30use crate::types::{CapToken, Role};
31use mlua_flow_ir::{Externs, NoExterns};
32use serde_json::Value;
33use std::collections::HashMap;
34use std::sync::Arc;
35use std::time::Duration;
36use thiserror::Error;
37
38fn derive_bp_agent_kinds(blueprint: &Blueprint) -> HashMap<String, OperatorKind> {
58 let mut out = HashMap::new();
59 if blueprint.operators.is_empty() {
60 return out;
61 }
62 for agent in &blueprint.agents {
63 let Some(op_ref) = agent.spec.get("operator_ref").and_then(|v| v.as_str()) else {
64 continue;
65 };
66 let Some(op_def) = blueprint.operators.iter().find(|o| o.name == op_ref) else {
67 continue;
68 };
69 if let Some(kind) = op_def.kind {
70 out.insert(agent.name.clone(), OperatorKind::from(kind));
71 }
72 }
73 out
74}
75
76#[derive(Debug, Error)]
78pub enum TaskLaunchError {
79 #[error("compile: {0}")]
81 Compile(#[from] CompileError),
82 #[error("engine: {0}")]
84 Engine(#[from] EngineError),
85 #[error("flow eval: {0}")]
88 FlowEval(String),
89}
90
91#[derive(Debug, Clone)]
93pub struct TaskLaunchInput {
94 pub blueprint: Blueprint,
96 pub operator_id: String,
98 pub role: Role,
100 pub ttl: Duration,
102 pub operator_kind: Option<OperatorKind>,
111 pub bridge_id: Option<String>,
115 pub hook_id: Option<String>,
118 pub operator_backend_id: Option<String>,
125 pub operator_kind_overrides: HashMap<String, OperatorKind>,
130 pub init_ctx: Value,
134}
135
136impl TaskLaunchInput {
137 pub fn automate(
145 blueprint: Blueprint,
146 operator_id: impl Into<String>,
147 role: Role,
148 ttl: Duration,
149 init_ctx: Value,
150 ) -> Self {
151 Self {
152 blueprint,
153 operator_id: operator_id.into(),
154 role,
155 ttl,
156 operator_kind: None,
157 bridge_id: None,
158 hook_id: None,
159 operator_backend_id: None,
160 operator_kind_overrides: HashMap::new(),
161 init_ctx,
162 }
163 }
164}
165
166#[derive(Debug, Clone)]
168pub struct TaskLaunchOutput {
169 pub token: CapToken,
171 pub final_ctx: Value,
175}
176
177pub struct TaskLaunchService {
181 engine: Engine,
182 compiler: Compiler,
183 externs: Arc<dyn Externs + Send + Sync>,
188}
189
190impl TaskLaunchService {
191 pub fn new(engine: Engine, compiler: Compiler) -> Self {
193 Self {
194 engine,
195 compiler,
196 externs: Arc::new(NoExterns),
197 }
198 }
199
200 pub fn with_externs(mut self, externs: Arc<dyn Externs + Send + Sync>) -> Self {
204 self.externs = externs;
205 self
206 }
207
208 pub fn engine(&self) -> &Engine {
210 &self.engine
211 }
212
213 pub fn compiler(&self) -> &Compiler {
215 &self.compiler
216 }
217
218 pub async fn launch(
230 &self,
231 input: TaskLaunchInput,
232 ) -> Result<TaskLaunchOutput, TaskLaunchError> {
233 let compiled = self.compiler.compile(&input.blueprint)?;
242 let spawner = linker::link(
243 compiled.router.clone(),
244 &input.blueprint.spawner_hints.layers,
245 &self.engine,
246 );
247 let spawner = if let Some(alias) = input.blueprint.metadata.project_name_alias.as_deref() {
254 SpawnerStack::new(spawner)
255 .layer(ProjectNameAliasMiddleware::new(alias))
256 .build()
257 } else {
258 spawner
259 };
260
261 let bp_agent_kinds = derive_bp_agent_kinds(&input.blueprint);
266 let bp_global_kind = input
267 .blueprint
268 .default_operator_kind
269 .map(OperatorKind::from);
270
271 let token = self
272 .engine
273 .attach_with_ids(
274 input.operator_id,
275 input.role,
276 input.ttl,
277 input.operator_kind,
278 input.bridge_id,
279 input.hook_id,
280 input.operator_backend_id,
281 input.operator_kind_overrides,
282 bp_agent_kinds,
283 bp_global_kind,
284 )
285 .await?;
286 let dispatcher =
287 EngineDispatcher::with_spawner(self.engine.clone(), token.clone(), spawner);
288 let final_ctx = mlua_flow_ir::eval_async_externs(
289 &input.blueprint.flow,
290 input.init_ctx,
291 &dispatcher,
292 &*self.externs,
293 )
294 .await
295 .map_err(|e| TaskLaunchError::FlowEval(e.to_string()))?;
296 Ok(TaskLaunchOutput { token, final_ctx })
297 }
298}
299
300#[cfg(test)]
305mod tests {
306 use super::*;
307 use crate::blueprint::compiler::{RustFnInProcessSpawnerFactory, SpawnerRegistry};
308 use crate::blueprint::{
309 current_schema_version, AgentDef, AgentKind, AgentMeta, BlueprintMetadata, CompilerHints,
310 CompilerStrategy,
311 };
312 use crate::core::config::EngineCfg;
313 use crate::worker::adapter::{WorkerError, WorkerResult};
314 use mlua_flow_ir::{Expr, JoinMode, Node as FlowNode};
315 use serde_json::json;
316 use std::sync::Arc;
317
318 fn path(s: &str) -> Expr {
319 Expr::Path { at: s.to_string() }
320 }
321 fn step(ref_: &str, in_: Expr, out: Expr) -> FlowNode {
322 FlowNode::Step {
323 ref_: ref_.to_string(),
324 in_,
325 out,
326 }
327 }
328
329 fn agent(name: &str, fn_id: &str) -> AgentDef {
330 AgentDef {
331 name: name.to_string(),
332 kind: AgentKind::RustFn,
333 spec: json!({ "fn_id": fn_id }),
334 profile: None,
335 meta: Some(AgentMeta::default()),
336 }
337 }
338
339 fn build_service(factory: RustFnInProcessSpawnerFactory) -> TaskLaunchService {
340 let engine = Engine::new(EngineCfg::default());
341 let mut reg = SpawnerRegistry::new();
342 reg.register::<RustFnInProcessSpawnerFactory>(Arc::new(factory));
343 let compiler = Compiler::new(reg);
344 TaskLaunchService::new(engine, compiler)
345 }
346
347 fn bp(flow: FlowNode, agents: Vec<AgentDef>) -> Blueprint {
348 Blueprint {
349 schema_version: current_schema_version(),
350 id: "ut".into(),
351 flow,
352 agents,
353 operators: vec![],
354 hints: CompilerHints::default(),
355 strategy: CompilerStrategy::default(),
356 metadata: BlueprintMetadata::default(),
357 spawner_hints: Default::default(),
358 default_agent_kind: AgentKind::Operator,
359 default_operator_kind: None,
360 }
361 }
362
363 fn launch_input(blueprint: Blueprint, init_ctx: Value) -> TaskLaunchInput {
364 TaskLaunchInput::automate(
365 blueprint,
366 "ut-op",
367 Role::Operator,
368 Duration::from_secs(30),
369 init_ctx,
370 )
371 }
372
373 #[tokio::test]
374 async fn launch_single_step_writes_out_path() {
375 let factory = RustFnInProcessSpawnerFactory::new().register_fn("echo", |inv| async move {
376 Ok(WorkerResult {
377 value: json!({ "echoed": inv.prompt }),
378 ok: true,
379 })
380 });
381 let svc = build_service(factory);
382 let blueprint = bp(
383 step("echo", path("$.input"), path("$.out")),
384 vec![agent("echo", "echo")],
385 );
386 let out = svc
387 .launch(launch_input(blueprint, json!({ "input": "hi" })))
388 .await
389 .expect("launch ok");
390 assert_eq!(out.final_ctx["out"]["echoed"], "hi");
391 }
392
393 #[tokio::test]
394 async fn launch_three_step_seq_threads_ctx_forward() {
395 let factory = RustFnInProcessSpawnerFactory::new()
396 .register_fn("upper", |inv| async move {
397 let s = serde_json::from_str::<String>(&inv.prompt).unwrap_or(inv.prompt);
398 Ok(WorkerResult {
399 value: json!(s.to_uppercase()),
400 ok: true,
401 })
402 })
403 .register_fn("suffix", |inv| async move {
404 let s = serde_json::from_str::<String>(&inv.prompt).unwrap_or(inv.prompt);
405 Ok(WorkerResult {
406 value: json!(format!("{s}!")),
407 ok: true,
408 })
409 })
410 .register_fn("wrap", |inv| async move {
411 let s = serde_json::from_str::<String>(&inv.prompt).unwrap_or(inv.prompt);
412 Ok(WorkerResult {
413 value: json!(format!("[{s}]")),
414 ok: true,
415 })
416 });
417 let svc = build_service(factory);
418 let flow = FlowNode::Seq {
419 children: vec![
420 step("upper", path("$.in"), path("$.s1")),
421 step("suffix", path("$.s1"), path("$.s2")),
422 step("wrap", path("$.s2"), path("$.s3")),
423 ],
424 };
425 let blueprint = bp(
426 flow,
427 vec![
428 agent("upper", "upper"),
429 agent("suffix", "suffix"),
430 agent("wrap", "wrap"),
431 ],
432 );
433 let out = svc
434 .launch(launch_input(blueprint, json!({ "in": "hello" })))
435 .await
436 .expect("launch ok");
437 assert_eq!(out.final_ctx["s1"], "HELLO");
438 assert_eq!(out.final_ctx["s2"], "HELLO!");
439 assert_eq!(out.final_ctx["s3"], "[HELLO!]");
440 }
441
442 #[tokio::test]
443 async fn launch_fanout_join_all_parallel_completes() {
444 use std::sync::atomic::{AtomicU32, Ordering};
445 let counter = Arc::new(AtomicU32::new(0));
446 let max_seen = Arc::new(AtomicU32::new(0));
447 let counter_clone = counter.clone();
448 let max_clone = max_seen.clone();
449
450 let factory = RustFnInProcessSpawnerFactory::new().register_fn("para", move |inv| {
453 let counter = counter_clone.clone();
454 let max_seen = max_clone.clone();
455 async move {
456 let now = counter.fetch_add(1, Ordering::SeqCst) + 1;
457 let mut prev = max_seen.load(Ordering::SeqCst);
458 while now > prev {
459 match max_seen.compare_exchange(prev, now, Ordering::SeqCst, Ordering::SeqCst) {
460 Ok(_) => break,
461 Err(p) => prev = p,
462 }
463 }
464 tokio::time::sleep(Duration::from_millis(50)).await;
465 counter.fetch_sub(1, Ordering::SeqCst);
466 let s = serde_json::from_str::<String>(&inv.prompt).unwrap_or(inv.prompt);
467 Ok(WorkerResult {
468 value: json!(format!("did:{s}")),
469 ok: true,
470 })
471 }
472 });
473 let svc = build_service(factory);
474 let flow = FlowNode::Fanout {
475 items: path("$.items"),
476 bind: path("$.item"),
477 body: Box::new(step("para", path("$.item"), path("$.r"))),
478 join: JoinMode::All,
479 out: path("$.results"),
480 };
481 let blueprint = bp(flow, vec![agent("para", "para")]);
482 let out = svc
483 .launch(launch_input(
484 blueprint,
485 json!({ "items": ["a", "b", "c", "d"] }),
486 ))
487 .await
488 .expect("launch ok");
489 let results = out.final_ctx["results"].as_array().expect("array");
490 assert_eq!(results.len(), 4);
491 for (i, expected) in ["a", "b", "c", "d"].iter().enumerate() {
492 assert_eq!(results[i]["r"], json!(format!("did:{expected}")));
493 }
494 let max = max_seen.load(Ordering::SeqCst);
495 assert!(
496 max >= 2,
497 "expected parallel execution (max inflight >= 2), got {max}"
498 );
499 }
500
501 #[tokio::test]
502 async fn launch_propagates_worker_error_as_flow_eval_err() {
503 let factory = RustFnInProcessSpawnerFactory::new()
504 .register_fn("ok", |inv| async move {
505 Ok(WorkerResult {
506 value: json!(inv.prompt),
507 ok: true,
508 })
509 })
510 .register_fn("boom", |_inv| async move {
511 Err(WorkerError::Failed("intentional boom".into()))
512 });
513 let svc = build_service(factory);
514 let flow = FlowNode::Seq {
515 children: vec![
516 step("ok", path("$.input"), path("$.s1")),
517 step("boom", path("$.s1"), path("$.s2")),
518 step("ok", path("$.s2"), path("$.s3")),
519 ],
520 };
521 let blueprint = bp(flow, vec![agent("ok", "ok"), agent("boom", "boom")]);
522 let err = svc
523 .launch(launch_input(blueprint, json!({ "input": "x" })))
524 .await
525 .expect_err("expected fail");
526 match err {
527 TaskLaunchError::FlowEval(msg) => {
528 assert!(
529 msg.contains("boom") || msg.contains("intentional"),
530 "expected error to mention worker failure, got: {msg}"
531 );
532 }
533 other => panic!("expected FlowEval error, got {other:?}"),
534 }
535 }
536
537 #[tokio::test]
538 async fn launch_resolves_call_extern_via_registered_externs() {
539 let factory = RustFnInProcessSpawnerFactory::new().register_fn("echo", |inv| async move {
540 Ok(WorkerResult {
541 value: json!({ "echoed": inv.prompt }),
542 ok: true,
543 })
544 });
545 let mut externs = mlua_flow_ir::ExternMap::new();
546 externs.register("fmt.greet", |args: &[Value]| {
547 let name = args[0].as_str().unwrap_or("?");
548 Ok(json!(format!("hello, {name}")))
549 });
550 let svc = build_service(factory).with_externs(Arc::new(externs));
551 let flow = step(
552 "echo",
553 Expr::CallExtern {
554 ref_: "fmt.greet".into(),
555 args: vec![path("$.who")],
556 },
557 path("$.out"),
558 );
559 let blueprint = bp(flow, vec![agent("echo", "echo")]);
560 let out = svc
561 .launch(launch_input(blueprint, json!({ "who": "swarm" })))
562 .await
563 .expect("launch ok");
564 assert_eq!(out.final_ctx["out"]["echoed"], json!("hello, swarm"));
565 }
566
567 #[tokio::test]
568 async fn launch_call_extern_without_registry_fails_as_flow_eval() {
569 let factory = RustFnInProcessSpawnerFactory::new().register_fn("echo", |inv| async move {
570 Ok(WorkerResult {
571 value: json!(inv.prompt),
572 ok: true,
573 })
574 });
575 let svc = build_service(factory); let flow = step(
577 "echo",
578 Expr::CallExtern {
579 ref_: "fmt.greet".into(),
580 args: vec![],
581 },
582 path("$.out"),
583 );
584 let blueprint = bp(flow, vec![agent("echo", "echo")]);
585 let err = svc
586 .launch(launch_input(blueprint, json!({})))
587 .await
588 .expect_err("expected fail");
589 match err {
590 TaskLaunchError::FlowEval(msg) => {
591 assert!(msg.contains("extern"), "expected extern error, got: {msg}");
592 }
593 other => panic!("expected FlowEval error, got {other:?}"),
594 }
595 }
596}