1use crate::blueprint::compiler::{CompileError, Compiler};
22use crate::blueprint::{Blueprint, EngineDispatcher};
23use crate::core::ctx::OperatorKind;
24use crate::core::engine::Engine;
25use crate::core::errors::EngineError;
26use crate::middleware::project_name_alias::ProjectNameAliasMiddleware;
27use crate::middleware::SpawnerStack;
28use crate::service::linker;
29use crate::types::{CapToken, Role};
30use serde_json::Value;
31use std::collections::HashMap;
32use std::time::Duration;
33use thiserror::Error;
34
35fn derive_bp_agent_kinds(blueprint: &Blueprint) -> HashMap<String, OperatorKind> {
55 let mut out = HashMap::new();
56 if blueprint.operators.is_empty() {
57 return out;
58 }
59 for agent in &blueprint.agents {
60 let Some(op_ref) = agent.spec.get("operator_ref").and_then(|v| v.as_str()) else {
61 continue;
62 };
63 let Some(op_def) = blueprint.operators.iter().find(|o| o.name == op_ref) else {
64 continue;
65 };
66 if let Some(kind) = op_def.kind {
67 out.insert(agent.name.clone(), OperatorKind::from(kind));
68 }
69 }
70 out
71}
72
73#[derive(Debug, Error)]
75pub enum TaskLaunchError {
76 #[error("compile: {0}")]
78 Compile(#[from] CompileError),
79 #[error("engine: {0}")]
81 Engine(#[from] EngineError),
82 #[error("flow eval: {0}")]
85 FlowEval(String),
86}
87
88#[derive(Debug, Clone)]
90pub struct TaskLaunchInput {
91 pub blueprint: Blueprint,
93 pub operator_id: String,
95 pub role: Role,
97 pub ttl: Duration,
99 pub operator_kind: Option<OperatorKind>,
108 pub bridge_id: Option<String>,
112 pub hook_id: Option<String>,
115 pub operator_backend_id: Option<String>,
122 pub operator_kind_overrides: HashMap<String, OperatorKind>,
127 pub init_ctx: Value,
131}
132
133impl TaskLaunchInput {
134 pub fn automate(
142 blueprint: Blueprint,
143 operator_id: impl Into<String>,
144 role: Role,
145 ttl: Duration,
146 init_ctx: Value,
147 ) -> Self {
148 Self {
149 blueprint,
150 operator_id: operator_id.into(),
151 role,
152 ttl,
153 operator_kind: None,
154 bridge_id: None,
155 hook_id: None,
156 operator_backend_id: None,
157 operator_kind_overrides: HashMap::new(),
158 init_ctx,
159 }
160 }
161}
162
163#[derive(Debug, Clone)]
165pub struct TaskLaunchOutput {
166 pub token: CapToken,
168 pub final_ctx: Value,
172}
173
174pub struct TaskLaunchService {
178 engine: Engine,
179 compiler: Compiler,
180}
181
182impl TaskLaunchService {
183 pub fn new(engine: Engine, compiler: Compiler) -> Self {
185 Self { engine, compiler }
186 }
187
188 pub fn engine(&self) -> &Engine {
190 &self.engine
191 }
192
193 pub fn compiler(&self) -> &Compiler {
195 &self.compiler
196 }
197
198 pub async fn launch(
210 &self,
211 input: TaskLaunchInput,
212 ) -> Result<TaskLaunchOutput, TaskLaunchError> {
213 let compiled = self.compiler.compile(&input.blueprint)?;
222 let spawner = linker::link(
223 compiled.router.clone(),
224 &input.blueprint.spawner_hints.layers,
225 &self.engine,
226 );
227 let spawner = if let Some(alias) = input.blueprint.metadata.project_name_alias.as_deref() {
234 SpawnerStack::new(spawner)
235 .layer(ProjectNameAliasMiddleware::new(alias))
236 .build()
237 } else {
238 spawner
239 };
240
241 let bp_agent_kinds = derive_bp_agent_kinds(&input.blueprint);
246 let bp_global_kind = input
247 .blueprint
248 .default_operator_kind
249 .map(OperatorKind::from);
250
251 let token = self
252 .engine
253 .attach_with_ids(
254 input.operator_id,
255 input.role,
256 input.ttl,
257 input.operator_kind,
258 input.bridge_id,
259 input.hook_id,
260 input.operator_backend_id,
261 input.operator_kind_overrides,
262 bp_agent_kinds,
263 bp_global_kind,
264 )
265 .await?;
266 let dispatcher =
267 EngineDispatcher::with_spawner(self.engine.clone(), token.clone(), spawner);
268 let final_ctx =
269 mlua_flow_ir::eval_async(&input.blueprint.flow, input.init_ctx, &dispatcher)
270 .await
271 .map_err(|e| TaskLaunchError::FlowEval(e.to_string()))?;
272 Ok(TaskLaunchOutput { token, final_ctx })
273 }
274}
275
#[cfg(test)]
mod tests {
    use super::*;
    use crate::blueprint::compiler::{RustFnInProcessSpawnerFactory, SpawnerRegistry};
    use crate::blueprint::{
        current_schema_version, AgentDef, AgentKind, AgentMeta, BlueprintMetadata, CompilerHints,
        CompilerStrategy,
    };
    use crate::core::config::EngineCfg;
    use crate::worker::adapter::{WorkerError, WorkerResult};
    use mlua_flow_ir::{Expr, JoinMode, Node as FlowNode};
    use serde_json::json;
    use std::sync::Arc;

    /// Path expression addressing `s`.
    fn path(s: &str) -> Expr {
        Expr::Path { at: s.to_owned() }
    }

    /// Step node invoking agent `ref_`, reading from `in_` and writing to `out`.
    fn step(ref_: &str, in_: Expr, out: Expr) -> FlowNode {
        FlowNode::Step {
            ref_: ref_.to_owned(),
            in_,
            out,
        }
    }

    /// `RustFn` agent called `name` whose spec points at the registered function `fn_id`.
    fn agent(name: &str, fn_id: &str) -> AgentDef {
        AgentDef {
            name: name.to_owned(),
            kind: AgentKind::RustFn,
            spec: json!({ "fn_id": fn_id }),
            profile: None,
            meta: Some(AgentMeta::default()),
        }
    }

    /// Service with a default-configured engine and `factory` as the only spawner factory.
    fn build_service(factory: RustFnInProcessSpawnerFactory) -> TaskLaunchService {
        let engine = Engine::new(EngineCfg::default());
        let mut registry = SpawnerRegistry::new();
        registry.register::<RustFnInProcessSpawnerFactory>(Arc::new(factory));
        TaskLaunchService::new(engine, Compiler::new(registry))
    }

    /// Minimal blueprint with the given flow and agents and no operators.
    fn bp(flow: FlowNode, agents: Vec<AgentDef>) -> Blueprint {
        Blueprint {
            schema_version: current_schema_version(),
            id: "ut".into(),
            flow,
            agents,
            operators: vec![],
            hints: CompilerHints::default(),
            strategy: CompilerStrategy::default(),
            metadata: BlueprintMetadata::default(),
            spawner_hints: Default::default(),
            default_agent_kind: AgentKind::Operator,
            default_operator_kind: None,
        }
    }

    /// Launch input for operator `"ut-op"` with a 30 second ttl.
    fn launch_input(blueprint: Blueprint, init_ctx: Value) -> TaskLaunchInput {
        let ttl = Duration::from_secs(30);
        TaskLaunchInput::automate(blueprint, "ut-op", Role::Operator, ttl, init_ctx)
    }

    /// Decodes `prompt` as a JSON string literal, falling back to the raw text.
    fn unquote(prompt: String) -> String {
        serde_json::from_str::<String>(&prompt).unwrap_or(prompt)
    }

    /// Successful worker result carrying `value`.
    fn succeed(value: Value) -> Result<WorkerResult, WorkerError> {
        Ok(WorkerResult { value, ok: true })
    }

    #[tokio::test]
    async fn launch_single_step_writes_out_path() {
        let factory = RustFnInProcessSpawnerFactory::new()
            .register_fn("echo", |inv| async move { succeed(json!({ "echoed": inv.prompt })) });
        let svc = build_service(factory);

        let flow = step("echo", path("$.input"), path("$.out"));
        let blueprint = bp(flow, vec![agent("echo", "echo")]);
        let input = launch_input(blueprint, json!({ "input": "hi" }));

        let out = svc.launch(input).await.expect("launch ok");
        assert_eq!(out.final_ctx["out"]["echoed"], "hi");
    }

    #[tokio::test]
    async fn launch_three_step_seq_threads_ctx_forward() {
        let factory = RustFnInProcessSpawnerFactory::new()
            .register_fn("upper", |inv| async move {
                let text = unquote(inv.prompt);
                succeed(json!(text.to_uppercase()))
            })
            .register_fn("suffix", |inv| async move {
                let text = unquote(inv.prompt);
                succeed(json!(format!("{text}!")))
            })
            .register_fn("wrap", |inv| async move {
                let text = unquote(inv.prompt);
                succeed(json!(format!("[{text}]")))
            });
        let svc = build_service(factory);

        // Each step reads the previous step's output path.
        let flow = FlowNode::Seq {
            children: vec![
                step("upper", path("$.in"), path("$.s1")),
                step("suffix", path("$.s1"), path("$.s2")),
                step("wrap", path("$.s2"), path("$.s3")),
            ],
        };
        let agents = vec![
            agent("upper", "upper"),
            agent("suffix", "suffix"),
            agent("wrap", "wrap"),
        ];
        let input = launch_input(bp(flow, agents), json!({ "in": "hello" }));

        let out = svc.launch(input).await.expect("launch ok");
        assert_eq!(out.final_ctx["s1"], "HELLO");
        assert_eq!(out.final_ctx["s2"], "HELLO!");
        assert_eq!(out.final_ctx["s3"], "[HELLO!]");
    }

    #[tokio::test]
    async fn launch_fanout_join_all_parallel_completes() {
        use std::sync::atomic::{AtomicU32, Ordering};

        // `inflight` counts bodies currently running; `peak` records its highest value.
        let inflight = Arc::new(AtomicU32::new(0));
        let peak = Arc::new(AtomicU32::new(0));
        let inflight_for_fn = Arc::clone(&inflight);
        let peak_for_fn = Arc::clone(&peak);

        let factory = RustFnInProcessSpawnerFactory::new().register_fn("para", move |inv| {
            let inflight = Arc::clone(&inflight_for_fn);
            let peak = Arc::clone(&peak_for_fn);
            async move {
                let now = inflight.fetch_add(1, Ordering::SeqCst) + 1;
                peak.fetch_max(now, Ordering::SeqCst);
                // Hold the slot long enough for sibling bodies to overlap.
                tokio::time::sleep(Duration::from_millis(50)).await;
                inflight.fetch_sub(1, Ordering::SeqCst);
                let text = unquote(inv.prompt);
                succeed(json!(format!("did:{text}")))
            }
        });
        let svc = build_service(factory);

        let flow = FlowNode::Fanout {
            items: path("$.items"),
            bind: path("$.item"),
            body: Box::new(step("para", path("$.item"), path("$.r"))),
            join: JoinMode::All,
            out: path("$.results"),
        };
        let blueprint = bp(flow, vec![agent("para", "para")]);
        let input = launch_input(blueprint, json!({ "items": ["a", "b", "c", "d"] }));

        let out = svc.launch(input).await.expect("launch ok");
        let results = out.final_ctx["results"].as_array().expect("array");
        assert_eq!(results.len(), 4);
        for (got, expected) in results.iter().zip(["a", "b", "c", "d"]) {
            assert_eq!(got["r"], json!(format!("did:{expected}")));
        }

        let max = peak.load(Ordering::SeqCst);
        assert!(
            max >= 2,
            "expected parallel execution (max inflight >= 2), got {max}"
        );
    }

    #[tokio::test]
    async fn launch_propagates_worker_error_as_flow_eval_err() {
        let factory = RustFnInProcessSpawnerFactory::new()
            .register_fn("ok", |inv| async move { succeed(json!(inv.prompt)) })
            .register_fn("boom", |_inv| async move {
                Err(WorkerError::Failed("intentional boom".into()))
            });
        let svc = build_service(factory);

        // The middle step fails, so the sequence must abort with an error.
        let flow = FlowNode::Seq {
            children: vec![
                step("ok", path("$.input"), path("$.s1")),
                step("boom", path("$.s1"), path("$.s2")),
                step("ok", path("$.s2"), path("$.s3")),
            ],
        };
        let blueprint = bp(flow, vec![agent("ok", "ok"), agent("boom", "boom")]);
        let input = launch_input(blueprint, json!({ "input": "x" }));

        let err = svc.launch(input).await.expect_err("expected fail");
        let msg = match err {
            TaskLaunchError::FlowEval(msg) => msg,
            other => panic!("expected FlowEval error, got {other:?}"),
        };
        assert!(
            msg.contains("boom") || msg.contains("intentional"),
            "expected error to mention worker failure, got: {msg}"
        );
    }
}