use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::time::Duration;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use crate::{InputSchema, OutputSchema, RunError, RuntimeKind};
/// Serde default for the `failure_inherit` field of `FlowPattern::Delegate`:
/// the field is `true` when it is omitted from the file.
fn default_failure_inherit() -> bool {
    true
}
/// Largest delegate nesting depth accepted by `SwarmFile::nesting_depth`;
/// a deeper chain is reported as `RunError::InvalidConfig`.
pub const MAX_NESTING_DEPTH: u32 = 10;
/// Schema version of a swarm file (the `apiVersion` key of `SwarmFile`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum SwarmApiVersion {
/// Serialized as the literal string `bzzz.dev/v1`; also the default variant.
#[serde(rename = "bzzz.dev/v1")]
#[default]
V1,
}
/// Value of the `kind` key of a swarm file; variants are serialized in lowercase.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "lowercase")]
pub enum SwarmKind {
/// Serialized as `swarm`; the only (and default) kind.
#[default]
Swarm,
}
/// Identifier of a swarm: a newtype around the raw `String`.
///
/// `SwarmFile::validate` rejects an empty identifier.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct SwarmId(pub String);
impl SwarmId {
pub fn new(name: impl Into<String>) -> Self {
SwarmId(name.into())
}
pub fn as_str(&self) -> &str {
&self.0
}
}
/// Optional input and output schemas declared by a swarm file.
///
/// Both fields default to `None` when they are missing from the file.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct SwarmInterface {
/// Schema describing the swarm's input, if declared.
#[serde(default)]
pub input: Option<InputSchema>,
/// Schema describing the swarm's output, if declared.
#[serde(default)]
pub output: Option<OutputSchema>,
}
impl SwarmInterface {
pub fn new() -> Self {
SwarmInterface::default()
}
pub fn with_input(mut self, input: InputSchema) -> Self {
self.input = Some(input);
self
}
pub fn with_output(mut self, output: OutputSchema) -> Self {
self.output = Some(output);
self
}
}
/// One entry of a swarm file's `expose` list.
///
/// `SwarmFile::validate` requires `name` and `from` to be non-empty and
/// `name` to be unique within the list.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ExposeMapping {
/// Name under which the value is exposed.
pub name: String,
/// Source of the exposed value, e.g. `steps.parser.output.items`.
pub from: String,
/// Optional transform name; `None` when omitted.
// NOTE(review): the set of supported transforms is not visible here.
#[serde(default)]
pub transform: Option<String>,
}
impl ExposeMapping {
pub fn new(name: impl Into<String>, from: impl Into<String>) -> Self {
ExposeMapping {
name: name.into(),
from: from.into(),
transform: None,
}
}
pub fn with_transform(mut self, transform: impl Into<String>) -> Self {
self.transform = Some(transform.into());
self
}
}
/// Worker definition given inline in the swarm file as a command line.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct InlineWorker {
/// Command string; `SwarmFile::validate` rejects an empty command.
pub command: String,
/// Environment variables for the command; empty when omitted.
#[serde(default)]
pub env: HashMap<String, String>,
}
impl InlineWorker {
pub fn new(command: impl Into<String>) -> Self {
InlineWorker {
command: command.into(),
env: HashMap::new(),
}
}
pub fn with_env(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.env.insert(key.into(), value.into());
self
}
}
/// A named worker of a swarm.
///
/// `SwarmFile::validate` requires a non-empty `name` and exactly one of
/// `spec`, `a2a` or `inline` to be set.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Worker {
/// Name by which flow patterns refer to this worker.
pub name: String,
/// Path to a spec file defining the worker.
#[serde(default)]
pub spec: Option<std::path::PathBuf>,
/// URL of a remote worker (set by `Worker::new_a2a`).
#[serde(default)]
pub a2a: Option<String>,
/// Inline command definition of the worker.
#[serde(default)]
pub inline: Option<InlineWorker>,
/// Optional runtime for this worker.
// NOTE(review): presumably overrides `SwarmFile::runtime` — confirm in the executor.
#[serde(default)]
pub runtime: Option<RuntimeKind>,
/// Input values keyed by name; empty when omitted.
#[serde(default)]
pub input: HashMap<String, Value>,
/// Output values keyed by name; empty when omitted.
#[serde(default)]
pub output: HashMap<String, Value>,
/// Optional timeout, (de)serialized as a whole number of seconds.
#[serde(default, with = "serde_duration_opt")]
pub timeout: Option<Duration>,
}
impl Worker {
    /// Common starting point for the public constructors: a worker that has
    /// a name but no definition, runtime, inputs, outputs or timeout yet.
    fn unbound(name: String) -> Self {
        Self {
            name,
            spec: None,
            a2a: None,
            inline: None,
            runtime: None,
            input: HashMap::new(),
            output: HashMap::new(),
            timeout: None,
        }
    }

    /// Creates a worker defined by the spec file at `spec`.
    pub fn new(name: impl Into<String>, spec: impl Into<std::path::PathBuf>) -> Self {
        Self {
            spec: Some(spec.into()),
            ..Self::unbound(name.into())
        }
    }

    /// Creates a worker defined by the remote URL `url`.
    pub fn new_a2a(name: impl Into<String>, url: impl Into<String>) -> Self {
        Self {
            a2a: Some(url.into()),
            ..Self::unbound(name.into())
        }
    }

    /// Creates a worker defined inline by `command`, with an empty environment.
    pub fn new_inline(name: impl Into<String>, command: impl Into<String>) -> Self {
        Self {
            inline: Some(InlineWorker::new(command)),
            ..Self::unbound(name.into())
        }
    }

    /// Returns the worker with its runtime set to `runtime`.
    pub fn with_runtime(self, runtime: RuntimeKind) -> Self {
        Self {
            runtime: Some(runtime),
            ..self
        }
    }

    /// Returns the worker with the string `value` stored under input `key`.
    pub fn with_input(self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.with_input_value(key, Value::String(value.into()))
    }

    /// Returns the worker with the arbitrary JSON `value` stored under input `key`.
    pub fn with_input_value(mut self, key: impl Into<String>, value: Value) -> Self {
        let key = key.into();
        self.input.insert(key, value);
        self
    }

    /// Returns the worker with the string `value` stored under output `key`.
    pub fn with_output(self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.with_output_value(key, Value::String(value.into()))
    }

    /// Returns the worker with the arbitrary JSON `value` stored under output `key`.
    pub fn with_output_value(mut self, key: impl Into<String>, value: Value) -> Self {
        let key = key.into();
        self.output.insert(key, value);
        self
    }

    /// Returns the worker with its timeout set to `timeout`.
    pub fn with_timeout(self, timeout: Duration) -> Self {
        Self {
            timeout: Some(timeout),
            ..self
        }
    }
}
/// One task of a `FlowPattern::Workflow`.
///
/// `SwarmFile::validate` requires task names to be unique, every dependency
/// to name another task of the same workflow, and the dependency graph to be
/// free of cycles.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct WorkflowTask {
/// Task name; validation also requires it to be the name of a declared worker.
pub name: String,
/// Names of the tasks this task depends on; empty when omitted.
#[serde(default)]
pub depends_on: Vec<String>,
}
impl WorkflowTask {
pub fn new(name: impl Into<String>) -> Self {
WorkflowTask {
name: name.into(),
depends_on: vec![],
}
}
pub fn with_depends_on(mut self, deps: Vec<String>) -> Self {
self.depends_on = deps;
self
}
}
/// Flow pattern of a swarm, selected by the `type` key (snake_case variant name).
///
/// Only the variant names are renamed by serde; field names are used as
/// written, so `Conditional::else_` and `Loop::do_` are read from the keys
/// `else_` and `do_`.
// NOTE(review): confirm that `else_` / `do_` (with trailing underscore) are the intended file keys.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum FlowPattern {
/// `type: sequence` — a list of worker names.
Sequence {
/// Worker names; empty when omitted.
#[serde(default)]
steps: Vec<String>,
},
/// `type: parallel` — a list of worker names plus a fail-fast flag.
Parallel {
/// Worker names; empty when omitted.
#[serde(default)]
branches: Vec<String>,
/// Defaults to `false` when omitted.
#[serde(default)]
fail_fast: bool,
},
/// `type: conditional` — a condition with a `then` worker and an optional `else_` worker.
Conditional {
/// Condition expression, kept as an opaque string here.
condition: String,
/// Worker name used for the `then` branch.
then: String,
/// Optional worker name for the else branch.
#[serde(default)]
else_: Option<String>,
},
/// `type: loop` — a worker applied over `over`.
Loop {
/// What to iterate over, kept as an opaque string here.
over: String,
/// Worker name of the loop body.
do_: String,
/// Defaults to `0` when omitted.
// NOTE(review): whether `0` means "unlimited" is decided by the executor — confirm.
#[serde(default)]
max_iterations: u32,
},
/// `type: delegate` — hands off to a nested swarm file.
Delegate {
/// Path of the nested swarm file; the nesting checks resolve it relative
/// to the directory of the file that contains this pattern.
#[serde(default)]
swarm: Option<PathBuf>,
/// Input mapping for the nested swarm; empty when omitted.
#[serde(default)]
input_mapping: HashMap<String, Value>,
/// Output mapping for the nested swarm; empty when omitted.
#[serde(default)]
output_mapping: HashMap<String, Value>,
/// Defaults to `true` when omitted.
#[serde(default = "default_failure_inherit")]
failure_inherit: bool,
/// Optional expression string.
// NOTE(review): presumably selects the worker dynamically — confirm.
#[serde(default)]
worker_expr: Option<String>,
/// Optional fallback worker name; validated against the declared workers.
#[serde(default)]
fallback: Option<String>,
},
/// `type: supervisor` — a set of workers with restart and recovery policies.
Supervisor {
/// Worker names.
workers: Vec<String>,
/// Defaults to `RestartPolicy::OnFailure` when omitted.
#[serde(default)]
restart_policy: RestartPolicy,
/// Optional recovery policy; `None` when omitted.
#[serde(default)]
recovery_policy: Option<RecoveryPolicy>,
},
/// `type: compete` — a set of worker names.
Compete {
/// Worker names.
workers: Vec<String>,
},
/// `type: escalation` — a primary worker followed by a chain of workers.
Escalation {
/// Primary worker name.
primary: String,
/// Further worker names.
chain: Vec<String>,
},
/// `type: alongside` — a main worker plus side workers.
Alongside {
/// Main worker name.
main: String,
/// Side worker names.
side: Vec<String>,
},
/// `type: workflow` — tasks with dependencies and an optional synthesis worker.
Workflow {
/// Tasks of the workflow.
tasks: Vec<WorkflowTask>,
/// Optional worker name used for synthesis.
#[serde(default)]
synthesis: Option<String>,
},
}
impl FlowPattern {
pub fn type_name(&self) -> &'static str {
match self {
FlowPattern::Sequence { .. } => "sequence",
FlowPattern::Parallel { .. } => "parallel",
FlowPattern::Conditional { .. } => "conditional",
FlowPattern::Loop { .. } => "loop",
FlowPattern::Delegate { .. } => "delegate",
FlowPattern::Supervisor { .. } => "supervisor",
FlowPattern::Compete { .. } => "compete",
FlowPattern::Escalation { .. } => "escalation",
FlowPattern::Alongside { .. } => "alongside",
FlowPattern::Workflow { .. } => "workflow",
}
}
pub fn referenced_worker_names(&self) -> Vec<&str> {
match self {
FlowPattern::Sequence { steps } => steps.iter().map(String::as_str).collect(),
FlowPattern::Parallel { branches, .. } => {
branches.iter().map(String::as_str).collect()
}
FlowPattern::Conditional { then, else_, .. } => {
let mut names = vec![then.as_str()];
if let Some(e) = else_ {
names.push(e.as_str());
}
names
}
FlowPattern::Loop { do_, .. } => vec![do_.as_str()],
FlowPattern::Delegate { fallback, .. } => {
if let Some(f) = fallback {
vec![f.as_str()]
} else {
vec![]
}
}
FlowPattern::Supervisor { workers, .. } => {
workers.iter().map(String::as_str).collect()
}
FlowPattern::Compete { workers } => workers.iter().map(String::as_str).collect(),
FlowPattern::Escalation { primary, chain } => {
let mut names = vec![primary.as_str()];
names.extend(chain.iter().map(String::as_str));
names
}
FlowPattern::Alongside { main, side } => {
let mut names = vec![main.as_str()];
names.extend(side.iter().map(String::as_str));
names
}
FlowPattern::Workflow { tasks, synthesis } => {
let mut names: Vec<&str> = tasks.iter().map(|t| t.name.as_str()).collect();
if let Some(s) = synthesis {
names.push(s.as_str());
}
names
}
}
}
}
/// Restart policy of a `FlowPattern::Supervisor`; serialized in snake_case
/// (`never`, `on_failure`, `always`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum RestartPolicy {
/// Serialized as `never`.
Never,
/// Serialized as `on_failure`; the default.
#[default]
OnFailure,
/// Serialized as `always`.
Always,
}
/// Recovery settings attached to a `FlowPattern::Supervisor`.
///
/// `RecoveryPolicy::default()`, `RecoveryPolicy::new()` and a deserialized
/// policy with every key omitted all yield the same value, with
/// `retry_attempts` equal to 3.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct RecoveryPolicy {
    /// Number of retry attempts; 3 when omitted.
    #[serde(default = "default_retry_attempts")]
    pub retry_attempts: u32,
    /// Optional replan expression, kept as an opaque string here.
    #[serde(default)]
    pub replan_expr: Option<String>,
    /// Optional fallback used with replanning.
    // NOTE(review): presumably a worker name — confirm against the executor.
    #[serde(default)]
    pub replan_fallback: Option<String>,
    /// Optional path of a swarm file used for decomposition.
    #[serde(default)]
    pub decompose_swarm: Option<PathBuf>,
    /// Input values for the decomposition swarm; empty when omitted.
    #[serde(default)]
    pub decompose_input: HashMap<String, Value>,
    /// Output values for the decomposition swarm; empty when omitted.
    #[serde(default)]
    pub decompose_output: HashMap<String, Value>,
}

/// Serde default for `RecoveryPolicy::retry_attempts`.
fn default_retry_attempts() -> u32 {
    3
}

impl Default for RecoveryPolicy {
    /// Same value as [`RecoveryPolicy::new`].
    ///
    /// Implemented by hand because a derived `Default` would set
    /// `retry_attempts` to 0 and so disagree with `new()` and with the serde
    /// field default.
    fn default() -> Self {
        Self::new()
    }
}

impl RecoveryPolicy {
    /// Creates a policy with the default number of retry attempts and every
    /// optional setting unset.
    pub fn new() -> Self {
        RecoveryPolicy {
            retry_attempts: default_retry_attempts(),
            replan_expr: None,
            replan_fallback: None,
            decompose_swarm: None,
            decompose_input: HashMap::new(),
            decompose_output: HashMap::new(),
        }
    }

    /// Returns the policy with `retry_attempts` set to `attempts`.
    pub fn with_retry_attempts(mut self, attempts: u32) -> Self {
        self.retry_attempts = attempts;
        self
    }

    /// Returns the policy with its replan expression set to `expr`.
    pub fn with_replan(mut self, expr: impl Into<String>) -> Self {
        self.replan_expr = Some(expr.into());
        self
    }

    /// Returns the policy with its replan fallback set to `fallback`.
    pub fn with_replan_fallback(mut self, fallback: impl Into<String>) -> Self {
        self.replan_fallback = Some(fallback.into());
        self
    }

    /// Returns the policy with its decomposition swarm path set to `swarm`.
    pub fn with_decompose(mut self, swarm: impl Into<PathBuf>) -> Self {
        self.decompose_swarm = Some(swarm.into());
        self
    }

    /// Returns the policy with its decomposition input replaced by `input`.
    pub fn with_decompose_input(mut self, input: HashMap<String, Value>) -> Self {
        self.decompose_input = input;
        self
    }

    /// Returns the policy with its decomposition output replaced by `output`.
    pub fn with_decompose_output(mut self, output: HashMap<String, Value>) -> Self {
        self.decompose_output = output;
        self
    }
}
/// Value of a swarm file's `on_failure` key; serialized in snake_case
/// (`fail_fast`, `continue`, `ignore`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum FailureBehavior {
/// Serialized as `fail_fast`; the default.
#[default]
FailFast,
/// Serialized as `continue`.
Continue,
/// Serialized as `ignore`.
Ignore,
}
/// Value of a swarm file's `output` key; serialized in snake_case
/// (`all`, `last`, `aggregate`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum OutputBehavior {
/// Serialized as `all`.
All,
/// Serialized as `last`; the default.
#[default]
Last,
/// Serialized as `aggregate`.
Aggregate,
}
/// In-memory form of a swarm definition file.
///
/// `apiVersion`, `kind`, `id`, `workers` and `flow` are required keys; every
/// other key falls back to its default when omitted.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SwarmFile {
/// Schema version, stored under the key `apiVersion`.
#[serde(rename = "apiVersion")]
pub api_version: SwarmApiVersion,
/// Document kind.
pub kind: SwarmKind,
/// Swarm identifier; must not be empty to pass `validate`.
pub id: SwarmId,
/// Workers that flow patterns may refer to by name.
pub workers: Vec<Worker>,
/// Flow pattern of the swarm.
pub flow: FlowPattern,
/// Optional swarm-level runtime.
#[serde(default)]
pub runtime: Option<RuntimeKind>,
/// Failure behavior; `FailureBehavior::FailFast` when omitted.
#[serde(default)]
pub on_failure: FailureBehavior,
/// Optional timeout, (de)serialized as a whole number of seconds.
#[serde(default, with = "serde_duration_opt")]
pub timeout: Option<Duration>,
/// Output behavior; `OutputBehavior::Last` when omitted.
#[serde(default)]
pub output: OutputBehavior,
/// Declared input/output schemas; both unset when omitted.
#[serde(default)]
pub interface: SwarmInterface,
/// Exposed values; empty when omitted.
#[serde(default)]
pub expose: Vec<ExposeMapping>,
/// Path the file was loaded from. Never (de)serialized; set by
/// `from_yaml_file` and used to resolve nested swarm paths.
#[serde(skip)]
pub file_path: Option<PathBuf>,
}
/// Serde adapter (for `#[serde(with = "...")]`) that stores an
/// `Option<Duration>` as an optional whole number of seconds.
mod serde_duration_opt {
    use serde::{Deserialize, Deserializer, Serialize, Serializer};
    use std::time::Duration;

    /// Writes `Some(d)` as the integer `d.as_secs()` and `None` as a unit/none
    /// value. Any sub-second part of the duration is dropped.
    pub fn serialize<S>(value: &Option<Duration>, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        if let Some(duration) = value {
            let whole_seconds: u64 = duration.as_secs();
            whole_seconds.serialize(serializer)
        } else {
            serializer.serialize_none()
        }
    }

    /// Reads an optional unsigned integer and interprets it as seconds.
    pub fn deserialize<'de, D>(deserializer: D) -> Result<Option<Duration>, D::Error>
    where
        D: Deserializer<'de>,
    {
        Option::<u64>::deserialize(deserializer)
            .map(|maybe_secs| maybe_secs.map(Duration::from_secs))
    }
}
impl SwarmFile {
pub fn new(id: impl Into<String>, flow: FlowPattern) -> Self {
SwarmFile {
api_version: SwarmApiVersion::V1,
kind: SwarmKind::Swarm,
id: SwarmId::new(id),
workers: Vec::new(),
flow,
runtime: None,
on_failure: FailureBehavior::default(),
timeout: None,
output: OutputBehavior::default(),
interface: SwarmInterface::default(),
expose: Vec::new(),
file_path: None,
}
}
pub fn with_worker(mut self, worker: Worker) -> Self {
self.workers.push(worker);
self
}
pub fn with_workers(mut self, workers: Vec<Worker>) -> Self {
self.workers.extend(workers);
self
}
pub fn with_failure_behavior(mut self, behavior: FailureBehavior) -> Self {
self.on_failure = behavior;
self
}
pub fn with_runtime(mut self, runtime: RuntimeKind) -> Self {
self.runtime = Some(runtime);
self
}
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.timeout = Some(timeout);
self
}
pub fn with_output_behavior(mut self, behavior: OutputBehavior) -> Self {
self.output = behavior;
self
}
pub fn with_interface(mut self, interface: SwarmInterface) -> Self {
self.interface = interface;
self
}
pub fn with_expose(mut self, expose: ExposeMapping) -> Self {
self.expose.push(expose);
self
}
pub fn with_exposes(mut self, exposes: Vec<ExposeMapping>) -> Self {
self.expose = exposes;
self
}
pub fn validate(&self) -> Result<(), RunError> {
if self.id.0.is_empty() {
return Err(RunError::InvalidConfig {
message: "SwarmFile ID cannot be empty".into(),
});
}
for worker in &self.workers {
if worker.name.is_empty() {
return Err(RunError::InvalidConfig {
message: "Worker name cannot be empty".into(),
});
}
let has_spec = worker.spec.is_some();
let has_a2a = worker.a2a.is_some();
let has_inline = worker.inline.is_some();
let count = has_spec as usize + has_a2a as usize + has_inline as usize;
if count == 0 {
return Err(RunError::InvalidConfig {
message: format!(
"Worker '{}' must have one of: spec, a2a, or inline",
worker.name
),
});
}
if count > 1 {
return Err(RunError::InvalidConfig {
message: format!(
"Worker '{}' has conflicting definitions (spec, a2a, inline are mutually exclusive)",
worker.name
),
});
}
if let Some(inline) = &worker.inline {
if inline.command.is_empty() {
return Err(RunError::InvalidConfig {
message: format!(
"Worker '{}' inline definition has empty command",
worker.name
),
});
}
}
}
for mapping in &self.expose {
if mapping.name.is_empty() {
return Err(RunError::InvalidConfig {
message: "Expose mapping name cannot be empty".into(),
});
}
if mapping.from.is_empty() {
return Err(RunError::InvalidConfig {
message: "Expose mapping 'from' cannot be empty".into(),
});
}
}
let mut seen_names = std::collections::HashSet::new();
for mapping in &self.expose {
if seen_names.contains(&mapping.name) {
return Err(RunError::InvalidConfig {
message: format!("Duplicate expose name: {}", mapping.name),
});
}
seen_names.insert(&mapping.name);
}
let worker_names: std::collections::HashSet<&str> =
self.workers.iter().map(|w| w.name.as_str()).collect();
let pattern_type = self.flow.type_name();
for name in self.flow.referenced_worker_names() {
if !worker_names.contains(name) {
return Err(RunError::InvalidConfig {
message: format!(
"pattern '{}' references undefined worker '{}'",
pattern_type, name
),
});
}
}
if let Some(path) = &self.file_path {
self.validate_no_circular_nesting(path, &mut std::collections::HashSet::new())?;
}
if let FlowPattern::Workflow { tasks, synthesis } = &self.flow {
let mut task_names: std::collections::HashSet<&str> = std::collections::HashSet::new();
for task in tasks {
if task_names.contains(task.name.as_str()) {
return Err(RunError::InvalidConfig {
message: format!("duplicate task name in workflow: {}", task.name),
});
}
task_names.insert(task.name.as_str());
}
for task in tasks {
for dep in &task.depends_on {
if !task_names.contains(dep.as_str()) {
return Err(RunError::InvalidConfig {
message: format!(
"Task '{}' depends on undefined task '{}'",
task.name, dep
),
});
}
}
}
if has_workflow_cycle(tasks) {
return Err(RunError::InvalidConfig {
message: "workflow contains a cycle in dependencies".into(),
});
}
if let Some(synth) = synthesis {
if !worker_names.contains(synth.as_str()) {
return Err(RunError::InvalidConfig {
message: format!(
"Workflow synthesis worker '{}' not found in workers list",
synth
),
});
}
}
}
Ok(())
}
}
/// Reports whether the `depends_on` relation between `tasks` contains a cycle
/// (a task depending on itself counts as a cycle).
///
/// Runs an iterative depth-first search over the graph whose edges point from
/// a dependency to the tasks that depend on it. Every node is marked as
/// finished when the search backtracks out of it, so a node shared by many
/// paths is explored only once and the running time stays linear in the
/// number of tasks plus dependencies.
fn has_workflow_cycle(tasks: &[WorkflowTask]) -> bool {
    use std::collections::HashSet;

    // dependency name -> names of the tasks that depend on it
    let mut dependents: HashMap<&str, Vec<&str>> = HashMap::new();
    for task in tasks {
        for dep in &task.depends_on {
            dependents
                .entry(dep.as_str())
                .or_default()
                .push(task.name.as_str());
        }
    }

    // Nodes whose whole subtree has been explored without finding a cycle.
    let mut finished: HashSet<&str> = HashSet::with_capacity(tasks.len());
    // Nodes on the current DFS path.
    let mut on_path: HashSet<&str> = HashSet::new();
    // `(node, true)` is a marker meaning "all of node's successors were handled".
    let mut stack: Vec<(&str, bool)> = Vec::new();

    for task in tasks {
        stack.push((task.name.as_str(), false));
        while let Some((node, is_backtrack)) = stack.pop() {
            if is_backtrack {
                on_path.remove(node);
                finished.insert(node);
                continue;
            }
            if finished.contains(node) {
                continue;
            }
            // Reaching a node that is already on the current path closes a cycle.
            if !on_path.insert(node) {
                return true;
            }
            stack.push((node, true));
            if let Some(successors) = dependents.get(node) {
                stack.extend(
                    successors
                        .iter()
                        .copied()
                        .filter(|next| !finished.contains(next))
                        .map(|next| (next, false)),
                );
            }
        }
    }
    false
}
impl SwarmFile {
    /// Verifies that following `Delegate` patterns from this swarm never
    /// leads back to a swarm file that was already visited.
    ///
    /// `current_path` is the file this swarm was loaded from; `visited`
    /// collects the canonical paths seen so far and is shared by the whole
    /// walk. A delegate path is resolved relative to the directory of
    /// `current_path`; a delegate file that does not exist is skipped.
    ///
    /// # Errors
    ///
    /// Returns `RunError::InvalidConfig` when `current_path` cannot be
    /// canonicalized, when a cycle is found, or when a nested file cannot be
    /// read, parsed or fails structural validation.
    pub fn validate_no_circular_nesting(
        &self,
        current_path: &Path,
        visited: &mut std::collections::HashSet<PathBuf>,
    ) -> Result<(), RunError> {
        let canonical_path = current_path
            .canonicalize()
            .map_err(|e| RunError::InvalidConfig {
                message: format!("Failed to resolve path {}: {}", current_path.display(), e),
            })?;
        // `insert` returns false when the path was already recorded.
        if !visited.insert(canonical_path) {
            return Err(RunError::InvalidConfig {
                message: format!(
                    "Circular nesting detected: {} references already-visited SwarmFile",
                    current_path.display()
                ),
            });
        }
        if let FlowPattern::Delegate {
            swarm: Some(swarm_path),
            ..
        } = &self.flow
        {
            let base_dir = current_path.parent().unwrap_or(Path::new("."));
            let delegate_path = base_dir.join(swarm_path);
            if delegate_path.exists() {
                // Load without the nested-file walk: going through
                // `from_yaml_file` would restart that walk with an empty
                // `visited` set for every nested file and recurse without
                // bound on a genuine cycle.
                let delegate_swarm = Self::load_nested(&delegate_path)?;
                delegate_swarm.validate_no_circular_nesting(&delegate_path, visited)?;
            }
        }
        Ok(())
    }

    /// Returns how many `Delegate` levels are nested below this swarm:
    /// 0 for a non-delegating swarm, 1 for a delegate whose target does not
    /// delegate further (or does not exist), and so on.
    ///
    /// # Errors
    ///
    /// Returns `RunError::InvalidConfig` when the depth exceeds
    /// [`MAX_NESTING_DEPTH`], when the nesting is circular, or when a nested
    /// file cannot be loaded.
    pub fn nesting_depth(&self) -> Result<u32, RunError> {
        self.compute_nesting_depth(&mut std::collections::HashSet::new())
    }

    /// Recursive worker behind [`SwarmFile::nesting_depth`]; `visited` holds
    /// the canonical paths of the swarm files already on the walk.
    fn compute_nesting_depth(
        &self,
        visited: &mut std::collections::HashSet<PathBuf>,
    ) -> Result<u32, RunError> {
        let swarm_path = match &self.flow {
            FlowPattern::Delegate {
                swarm: Some(swarm_path),
                ..
            } => swarm_path,
            _ => return Ok(0),
        };

        // A swarm built in memory has no path; the empty path never exists,
        // so it is simply not recorded.
        let current_path = self.file_path.clone().unwrap_or_default();
        if current_path.exists() {
            let canonical = current_path
                .canonicalize()
                .map_err(|e| RunError::InvalidConfig {
                    message: format!("Failed to resolve path: {}", e),
                })?;
            if !visited.insert(canonical) {
                return Err(RunError::InvalidConfig {
                    message: "Circular nesting detected while computing depth".into(),
                });
            }
        }

        let base_dir = current_path.parent().unwrap_or(Path::new("."));
        let delegate_path = base_dir.join(swarm_path);
        if !delegate_path.exists() {
            return Ok(1);
        }

        // See `validate_no_circular_nesting`: the nested file is loaded
        // without its own nesting walk so that `visited` detects cycles.
        let delegate_swarm = Self::load_nested(&delegate_path)?;
        let total_depth = 1 + delegate_swarm.compute_nesting_depth(visited)?;
        if total_depth > MAX_NESTING_DEPTH {
            return Err(RunError::InvalidConfig {
                message: format!(
                    "Nesting depth {} exceeds maximum allowed {}",
                    total_depth, MAX_NESTING_DEPTH
                ),
            });
        }
        Ok(total_depth)
    }

    /// Reads and parses the YAML file at `path` without validating it.
    /// The returned swarm has `file_path` unset.
    fn parse_yaml_file(path: &Path) -> Result<Self, RunError> {
        let content = std::fs::read_to_string(path).map_err(|e| RunError::InvalidConfig {
            message: format!("Failed to read {}: {}", path.display(), e),
        })?;
        serde_yaml::from_str(&content).map_err(|e| RunError::InvalidConfig {
            message: format!("Failed to parse YAML: {}", e),
        })
    }

    /// Loads a nested swarm file for the nesting walks.
    ///
    /// The file is validated while `file_path` is still unset, which makes
    /// `validate` run every structural check but skip the nested-file walk;
    /// the caller continues that walk with its own shared `visited` set.
    fn load_nested(path: &Path) -> Result<Self, RunError> {
        let mut swarm = Self::parse_yaml_file(path)?;
        swarm.validate()?;
        swarm.file_path = Some(path.to_path_buf());
        Ok(swarm)
    }

    /// Loads a swarm from the YAML file at `path`, records the path in
    /// `file_path` and validates the result (including circular nesting of
    /// delegated swarm files).
    ///
    /// # Errors
    ///
    /// Returns `RunError::InvalidConfig` when the file cannot be read or
    /// parsed, or when validation fails.
    // The `&PathBuf` parameter is kept as-is for existing callers.
    pub fn from_yaml_file(path: &PathBuf) -> Result<Self, RunError> {
        let mut swarm = Self::parse_yaml_file(path)?;
        swarm.file_path = Some(path.clone());
        swarm.validate()?;
        Ok(swarm)
    }

    /// Serializes the swarm as YAML and writes it to `path`, replacing any
    /// existing file. `file_path` is not written.
    ///
    /// # Errors
    ///
    /// Returns `RunError::InvalidConfig` when serialization or the write fails.
    pub fn to_yaml_file(&self, path: &PathBuf) -> Result<(), RunError> {
        let content = serde_yaml::to_string(self).map_err(|e| RunError::InvalidConfig {
            message: format!("Failed to serialize: {}", e),
        })?;
        std::fs::write(path, content).map_err(|e| RunError::InvalidConfig {
            message: format!("Failed to write {}: {}", path.display(), e),
        })
    }

    /// Returns `true` when the swarm declares exactly one worker and its flow
    /// involves a single step: a one-element sequence, parallel, supervisor
    /// or compete pattern, or an escalation / alongside pattern whose only
    /// participant is that worker. All other patterns are never simple.
    pub fn is_simple(&self) -> bool {
        let only_worker = match self.workers.as_slice() {
            [worker] => worker,
            _ => return false,
        };
        match &self.flow {
            FlowPattern::Sequence { steps } => steps.len() == 1,
            FlowPattern::Supervisor { workers, .. } => workers.len() == 1,
            FlowPattern::Escalation { primary, chain } => {
                chain.is_empty() && primary == &only_worker.name
            }
            FlowPattern::Alongside { main, side } => {
                side.is_empty() && main == &only_worker.name
            }
            FlowPattern::Compete { workers } => workers.len() == 1,
            FlowPattern::Parallel { branches, .. } => branches.len() == 1,
            FlowPattern::Conditional { .. } => false,
            FlowPattern::Loop { .. } => false,
            FlowPattern::Delegate { .. } => false,
            FlowPattern::Workflow { .. } => false,
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_swarm_file_creation() {
let swarm = SwarmFile::new("test-swarm", FlowPattern::Sequence { steps: vec![] });
assert_eq!(swarm.id.as_str(), "test-swarm");
assert_eq!(swarm.api_version, SwarmApiVersion::V1);
assert!(swarm.interface.input.is_none());
assert!(swarm.interface.output.is_none());
assert!(swarm.expose.is_empty());
}
#[test]
fn test_worker() {
let worker = Worker::new("agent1", "agent.yaml")
.with_runtime(RuntimeKind::Local)
.with_input("key", "value");
assert_eq!(worker.name, "agent1");
assert_eq!(worker.runtime, Some(RuntimeKind::Local));
assert_eq!(
worker.input.get("key"),
Some(&Value::String("value".to_string()))
);
}
#[test]
fn test_worker_complex_input() {
let worker = Worker::new("agent2", "agent.yaml")
.with_input_value(
"items",
Value::Array(vec![
Value::String("a".to_string()),
Value::String("b".to_string()),
]),
)
.with_input_value(
"config",
Value::Object(serde_json::Map::from_iter(vec![(
"timeout".to_string(),
Value::Number(30.into()),
)])),
);
assert!(worker.input.get("items").unwrap().is_array());
assert!(worker.input.get("config").unwrap().is_object());
}
#[test]
fn test_inline_worker_creation() {
let worker = Worker::new_inline("processor", "python process.py");
assert_eq!(worker.name, "processor");
assert!(worker.inline.is_some());
assert_eq!(worker.inline.unwrap().command, "python process.py");
assert!(worker.spec.is_none());
assert!(worker.a2a.is_none());
}
#[test]
fn test_inline_worker_with_env() {
let inline = InlineWorker::new("./scripts/analyze.sh")
.with_env("DEBUG", "true")
.with_env("PATH", "/usr/local/bin");
assert_eq!(inline.command, "./scripts/analyze.sh");
assert_eq!(inline.env.get("DEBUG"), Some(&"true".to_string()));
assert_eq!(inline.env.get("PATH"), Some(&"/usr/local/bin".to_string()));
}
#[test]
fn test_inline_worker_yaml_parsing() {
let yaml = r#"
apiVersion: bzzz.dev/v1
kind: swarm
id: inline-test
workers:
- name: processor
inline:
command: python process.py
env:
DEBUG: "true"
flow:
type: sequence
steps: [processor]
"#;
let parsed: SwarmFile = serde_yaml::from_str(yaml).unwrap();
assert_eq!(parsed.id.as_str(), "inline-test");
assert_eq!(parsed.workers.len(), 1);
let worker = &parsed.workers[0];
assert_eq!(worker.name, "processor");
assert!(worker.inline.is_some());
assert!(worker.spec.is_none());
assert!(worker.a2a.is_none());
let inline = worker.inline.as_ref().unwrap();
assert_eq!(inline.command, "python process.py");
assert_eq!(inline.env.get("DEBUG"), Some(&"true".to_string()));
}
#[test]
fn test_inline_worker_yaml_roundtrip() {
let swarm = SwarmFile::new(
"inline-demo",
FlowPattern::Sequence {
steps: vec!["worker1".into()],
},
)
.with_worker(Worker::new_inline("worker1", "echo hello"));
let yaml = serde_yaml::to_string(&swarm).unwrap();
let parsed: SwarmFile = serde_yaml::from_str(&yaml).unwrap();
assert_eq!(parsed.id, swarm.id);
assert_eq!(parsed.workers.len(), 1);
assert!(parsed.workers[0].inline.is_some());
}
#[test]
fn test_inline_worker_validation() {
let swarm = SwarmFile::new(
"test",
FlowPattern::Sequence {
steps: vec!["inline_worker".into()],
},
)
.with_worker(Worker::new_inline("inline_worker", "python script.py"));
assert!(swarm.validate().is_ok());
}
#[test]
fn test_inline_worker_minimal_yaml() {
let yaml = r#"
apiVersion: bzzz.dev/v1
kind: swarm
id: minimal-inline
workers:
- name: simple
inline:
command: echo "hello"
flow:
type: sequence
steps: [simple]
"#;
let parsed: SwarmFile = serde_yaml::from_str(yaml).unwrap();
assert!(parsed.validate().is_ok());
let inline = parsed.workers[0].inline.as_ref().unwrap();
assert_eq!(inline.command, "echo \"hello\"");
assert!(inline.env.is_empty());
}
#[test]
fn test_inline_worker_backward_compatibility() {
let yaml = r#"
apiVersion: bzzz.dev/v1
kind: swarm
id: legacy-swarm
workers:
- name: agent1
spec: agent.yaml
flow:
type: sequence
steps: [agent1]
"#;
let parsed: SwarmFile = serde_yaml::from_str(yaml).unwrap();
assert_eq!(parsed.id.as_str(), "legacy-swarm");
assert!(parsed.workers[0].inline.is_none());
assert!(parsed.validate().is_ok());
}
#[test]
fn test_worker_validation_missing_definition() {
let yaml = r#"
apiVersion: bzzz.dev/v1
kind: swarm
id: missing-def
workers:
- name: worker1
flow:
type: sequence
steps: [worker1]
"#;
let parsed: SwarmFile = serde_yaml::from_str(yaml).unwrap();
let err = parsed.validate().unwrap_err();
assert!(err.to_string().contains("must have one of"));
}
#[test]
fn test_worker_validation_conflicting_spec_and_inline() {
let yaml = r#"
apiVersion: bzzz.dev/v1
kind: swarm
id: conflict-spec-inline
workers:
- name: worker1
spec: agent.yaml
inline:
command: echo hello
flow:
type: sequence
steps: [worker1]
"#;
let parsed: SwarmFile = serde_yaml::from_str(yaml).unwrap();
let err = parsed.validate().unwrap_err();
assert!(err.to_string().contains("conflicting definitions"));
}
#[test]
fn test_worker_validation_conflicting_a2a_and_inline() {
let yaml = r#"
apiVersion: bzzz.dev/v1
kind: swarm
id: conflict-a2a-inline
workers:
- name: worker1
a2a: https://agent.example.com
inline:
command: echo hello
flow:
type: sequence
steps: [worker1]
"#;
let parsed: SwarmFile = serde_yaml::from_str(yaml).unwrap();
let err = parsed.validate().unwrap_err();
assert!(err.to_string().contains("conflicting definitions"));
}
#[test]
fn test_worker_validation_conflicting_spec_and_a2a() {
let yaml = r#"
apiVersion: bzzz.dev/v1
kind: swarm
id: conflict-spec-a2a
workers:
- name: worker1
spec: agent.yaml
a2a: https://agent.example.com
flow:
type: sequence
steps: [worker1]
"#;
let parsed: SwarmFile = serde_yaml::from_str(yaml).unwrap();
let err = parsed.validate().unwrap_err();
assert!(err.to_string().contains("conflicting definitions"));
}
#[test]
fn test_inline_worker_empty_command_validation() {
let yaml = r#"
apiVersion: bzzz.dev/v1
kind: swarm
id: empty-command
workers:
- name: worker1
inline:
command: ""
flow:
type: sequence
steps: [worker1]
"#;
let parsed: SwarmFile = serde_yaml::from_str(yaml).unwrap();
let err = parsed.validate().unwrap_err();
assert!(err.to_string().contains("empty command"));
}
#[test]
fn test_flow_patterns() {
let sequence = FlowPattern::Sequence {
steps: vec!["step1".into(), "step2".into()],
};
assert!(matches!(sequence, FlowPattern::Sequence { .. }));
let parallel = FlowPattern::Parallel {
branches: vec!["a".into(), "b".into()],
fail_fast: true,
};
assert!(matches!(parallel, FlowPattern::Parallel { .. }));
let conditional = FlowPattern::Conditional {
condition: "x > 0".into(),
then: "a".into(),
else_: Some("b".into()),
};
assert!(matches!(conditional, FlowPattern::Conditional { .. }));
}
#[test]
fn test_swarm_validation() {
let swarm = SwarmFile::new("test", FlowPattern::Sequence { steps: vec![] })
.with_worker(Worker::new("w1", "agent.yaml"));
assert!(swarm.validate().is_ok());
let invalid = SwarmFile::new("", FlowPattern::Sequence { steps: vec![] });
assert!(invalid.validate().is_err());
}
#[test]
fn test_yaml_roundtrip() {
let swarm = SwarmFile::new(
"test-swarm",
FlowPattern::Sequence {
steps: vec!["a".into()],
},
)
.with_worker(Worker::new("agent1", "agent.yaml"))
.with_timeout(Duration::from_secs(60));
let yaml = serde_yaml::to_string(&swarm).unwrap();
let parsed: SwarmFile = serde_yaml::from_str(&yaml).unwrap();
assert_eq!(parsed.id, swarm.id);
assert_eq!(parsed.workers.len(), 1);
assert_eq!(parsed.timeout, swarm.timeout);
}
#[test]
fn test_all_patterns() {
let patterns = vec![
FlowPattern::Sequence { steps: vec![] },
FlowPattern::Parallel {
branches: vec![],
fail_fast: false,
},
FlowPattern::Conditional {
condition: "true".into(),
then: "a".into(),
else_: None,
},
FlowPattern::Loop {
over: "items".into(),
do_: "process".into(),
max_iterations: 0,
},
FlowPattern::Delegate {
swarm: Some(PathBuf::from("sub.yaml")),
worker_expr: None,
fallback: None,
input_mapping: HashMap::new(),
output_mapping: HashMap::new(),
failure_inherit: true,
},
FlowPattern::Supervisor {
workers: vec![],
restart_policy: RestartPolicy::OnFailure,
recovery_policy: None,
},
FlowPattern::Compete { workers: vec![] },
FlowPattern::Escalation {
primary: "main".into(),
chain: vec![],
},
FlowPattern::Alongside {
main: "main".into(),
side: vec![],
},
];
for pattern in patterns {
let swarm = SwarmFile::new("test", pattern.clone());
let yaml = serde_yaml::to_string(&swarm).unwrap();
let parsed: SwarmFile = serde_yaml::from_str(&yaml).unwrap();
assert_eq!(parsed.flow, pattern);
}
}
#[test]
fn test_swarm_interface_creation() {
let empty = SwarmInterface::new();
assert!(empty.input.is_none());
assert!(empty.output.is_none());
let interface = SwarmInterface::new()
.with_input(InputSchema::new(serde_json::json!({
"type": "object",
"properties": { "query": { "type": "string" } },
"required": ["query"]
})))
.with_output(OutputSchema::new(serde_json::json!({
"type": "object",
"properties": { "result": { "type": "string" } }
})));
assert!(interface.input.is_some());
assert!(interface.output.is_some());
}
#[test]
fn test_expose_mapping_creation() {
let mapping = ExposeMapping::new("results", "steps.parser.output.items");
assert_eq!(mapping.name, "results");
assert_eq!(mapping.from, "steps.parser.output.items");
assert!(mapping.transform.is_none());
let with_transform = mapping.with_transform("uppercase");
assert_eq!(with_transform.transform, Some("uppercase".to_string()));
}
#[test]
fn test_swarm_with_interface() {
let swarm = SwarmFile::new(
"data-processor",
FlowPattern::Sequence {
steps: vec!["fetcher".into()],
},
)
.with_interface(
SwarmInterface::new()
.with_input(InputSchema::new(serde_json::json!({
"type": "object",
"properties": { "query": { "type": "string" } }
})))
.with_output(OutputSchema::new(serde_json::json!({
"type": "object",
"properties": { "results": { "type": "array" } }
}))),
);
assert!(swarm.interface.input.is_some());
assert!(swarm.interface.output.is_some());
}
#[test]
fn test_swarm_with_expose() {
let swarm = SwarmFile::new(
"processor",
FlowPattern::Sequence {
steps: vec!["parser".into()],
},
)
.with_expose(ExposeMapping::new("results", "steps.parser.output.items"))
.with_expose(ExposeMapping::new("count", "steps.parser.output.total"));
assert_eq!(swarm.expose.len(), 2);
assert_eq!(swarm.expose[0].name, "results");
assert_eq!(swarm.expose[1].name, "count");
}
#[test]
fn test_swarm_with_exposes() {
let exposes = vec![
ExposeMapping::new("results", "steps.parser.output.items"),
ExposeMapping::new("count", "steps.parser.output.total"),
];
let swarm = SwarmFile::new("processor", FlowPattern::Sequence { steps: vec![] })
.with_exposes(exposes);
assert_eq!(swarm.expose.len(), 2);
}
#[test]
fn test_expose_validation_duplicate_names() {
let swarm = SwarmFile::new("test", FlowPattern::Sequence { steps: vec![] })
.with_expose(ExposeMapping::new("result", "steps.a.output.x"))
.with_expose(ExposeMapping::new("result", "steps.b.output.y"));
assert!(swarm.validate().is_err());
let err = swarm.validate().unwrap_err();
assert!(err.to_string().contains("Duplicate expose name"));
}
#[test]
fn test_expose_validation_empty_name() {
let swarm = SwarmFile::new("test", FlowPattern::Sequence { steps: vec![] })
.with_expose(ExposeMapping::new("", "steps.a.output.x"));
assert!(swarm.validate().is_err());
let err = swarm.validate().unwrap_err();
assert!(err
.to_string()
.contains("Expose mapping name cannot be empty"));
}
#[test]
fn test_expose_validation_empty_from() {
let mapping = ExposeMapping {
name: "result".to_string(),
from: "".to_string(),
transform: None,
};
let swarm =
SwarmFile::new("test", FlowPattern::Sequence { steps: vec![] }).with_expose(mapping);
assert!(swarm.validate().is_err());
let err = swarm.validate().unwrap_err();
assert!(err.to_string().contains("'from' cannot be empty"));
}
#[test]
fn test_interface_yaml_roundtrip() {
let swarm = SwarmFile::new(
"api",
FlowPattern::Sequence {
steps: vec!["step1".into()],
},
)
.with_interface(
SwarmInterface::new()
.with_input(InputSchema::new(serde_json::json!({
"type": "object",
"properties": { "query": { "type": "string" } }
})))
.with_output(OutputSchema::new(serde_json::json!({
"type": "object",
"properties": { "result": { "type": "string" } }
}))),
)
.with_expose(ExposeMapping::new("result", "steps.step1.output.value"));
let yaml = serde_yaml::to_string(&swarm).unwrap();
let parsed: SwarmFile = serde_yaml::from_str(&yaml).unwrap();
assert_eq!(parsed.id, swarm.id);
assert!(parsed.interface.input.is_some());
assert!(parsed.interface.output.is_some());
assert_eq!(parsed.expose.len(), 1);
assert_eq!(parsed.expose[0].name, "result");
}
/// A swarm file without `interface` or `expose` sections must still parse,
/// leave those parts empty, and pass validation.
#[test]
fn test_backward_compatibility_no_interface() {
    let legacy_yaml = r#"
apiVersion: bzzz.dev/v1
kind: swarm
id: legacy-swarm
workers:
  - name: agent1
    spec: agent.yaml
flow:
  type: sequence
  steps: [agent1]
"#;
    let legacy: SwarmFile = serde_yaml::from_str(legacy_yaml).unwrap();

    assert_eq!(legacy.id.as_str(), "legacy-swarm");
    // Neither side of the interface was declared in the document.
    assert!(legacy.interface.input.is_none());
    assert!(legacy.interface.output.is_none());
    assert!(legacy.expose.is_empty());
    assert!(legacy.validate().is_ok());
}
/// Parses a swarm document that declares both interface schemas and two
/// expose mappings, then checks they were picked up and that it validates.
#[test]
fn test_interface_yaml_parsing() {
    let document = r#"
apiVersion: bzzz.dev/v1
kind: swarm
id: data-processor
interface:
  input:
    schema:
      type: object
      properties:
        query:
          type: string
      required: [query]
    required: true
  output:
    schema:
      type: object
      properties:
        results:
          type: array
        count:
          type: integer
      required: [results, count]
workers:
  - name: parser
    spec: parser.yaml
flow:
  type: sequence
  steps: [parser]
expose:
  - name: results
    from: steps.parser.output.items
  - name: count
    from: steps.parser.output.total
"#;
    let swarm: SwarmFile = serde_yaml::from_str(document).unwrap();

    assert_eq!(swarm.id.as_str(), "data-processor");
    assert!(swarm.interface.input.is_some());
    assert!(swarm.interface.output.is_some());
    assert_eq!(swarm.expose.len(), 2);
    assert!(swarm.validate().is_ok());
}
/// `with_runtime` must store the given runtime kind on the swarm.
#[test]
fn test_swarm_with_runtime() {
    let empty_flow = FlowPattern::Sequence { steps: vec![] };
    let configured = SwarmFile::new("test", empty_flow).with_runtime(RuntimeKind::Docker);
    assert_eq!(configured.runtime, Some(RuntimeKind::Docker));
}
/// The swarm-level runtime must survive a YAML serialize/deserialize cycle.
#[test]
fn test_swarm_runtime_yaml_roundtrip() {
    let flow = FlowPattern::Sequence {
        steps: vec!["step1".into()],
    };
    let original = SwarmFile::new("test", flow)
        .with_worker(Worker::new("w1", "agent.yaml"))
        .with_runtime(RuntimeKind::Docker);

    let serialized = serde_yaml::to_string(&original).unwrap();
    let restored: SwarmFile = serde_yaml::from_str(&serialized).unwrap();

    assert_eq!(restored.runtime, Some(RuntimeKind::Docker));
}
/// A document with no `runtime` key must parse and leave the swarm runtime
/// unset.
#[test]
fn test_swarm_runtime_backward_compatible() {
    let document = r#"
apiVersion: bzzz.dev/v1
kind: swarm
id: test-swarm
workers:
  - name: agent1
    spec: agent.yaml
flow:
  type: sequence
  steps: [agent1]
"#;
    let swarm: SwarmFile = serde_yaml::from_str(document).unwrap();

    assert_eq!(swarm.id.as_str(), "test-swarm");
    assert_eq!(swarm.runtime, None);
}
/// `Worker::with_runtime` must store the per-worker runtime.
#[test]
fn test_worker_runtime_override() {
    let base = Worker::new("agent1", "agent.yaml");
    let overridden = base.with_runtime(RuntimeKind::Http);
    assert_eq!(overridden.runtime, Some(RuntimeKind::Http));
}
/// Parses a document with a swarm-level runtime and one worker-level
/// runtime; the worker without its own `runtime` key must stay `None`.
#[test]
fn test_swarm_with_worker_runtime_override_yaml() {
    let document = r#"
apiVersion: bzzz.dev/v1
kind: swarm
id: test-swarm
runtime: Docker
workers:
  - name: agent1
    spec: agent.yaml
    runtime: Http
  - name: agent2
    spec: agent.yaml
flow:
  type: sequence
  steps: [agent1, agent2]
"#;
    let swarm: SwarmFile = serde_yaml::from_str(document).unwrap();

    assert_eq!(swarm.runtime, Some(RuntimeKind::Docker));
    // agent1 declares its own runtime; agent2 declares none.
    let first = &swarm.workers[0];
    let second = &swarm.workers[1];
    assert_eq!(first.runtime, Some(RuntimeKind::Http));
    assert_eq!(second.runtime, None);
}
/// Round-trips a delegate flow with input and output mappings through YAML
/// and checks that the sub-swarm path, both mappings and the
/// `failure_inherit` flag come back unchanged.
#[test]
fn test_delegate_pattern_with_input_output_mapping() {
    let original = SwarmFile::new(
        "test",
        FlowPattern::Delegate {
            swarm: Some(PathBuf::from("sub.yaml")),
            worker_expr: None,
            fallback: None,
            input_mapping: HashMap::from([(
                "query".to_string(),
                Value::String("{{input.search}}".to_string()),
            )]),
            output_mapping: HashMap::from([(
                "result".to_string(),
                Value::String("{{steps.delegate.output.data}}".to_string()),
            )]),
            failure_inherit: true,
        },
    );
    let yaml = serde_yaml::to_string(&original).unwrap();
    let parsed: SwarmFile = serde_yaml::from_str(&yaml).unwrap();
    // Checking only the variant would let a dropped mapping go unnoticed,
    // so inspect the fields this test is named after.
    match &parsed.flow {
        FlowPattern::Delegate {
            swarm,
            input_mapping,
            output_mapping,
            failure_inherit,
            ..
        } => {
            assert_eq!(swarm, &Some(PathBuf::from("sub.yaml")));
            assert_eq!(
                input_mapping.get("query"),
                Some(&Value::String("{{input.search}}".to_string()))
            );
            assert_eq!(
                output_mapping.get("result"),
                Some(&Value::String("{{steps.delegate.output.data}}".to_string()))
            );
            assert!(*failure_inherit);
        }
        _ => panic!("Expected Delegate pattern"),
    }
}
/// Parses a delegate flow from YAML and checks every declared field: the
/// sub-swarm path, both mappings (string and numeric values) and the
/// `failure_inherit` flag.
#[test]
fn test_delegate_pattern_yaml_parsing() {
    let yaml = r#"
apiVersion: bzzz.dev/v1
kind: swarm
id: parent-swarm
workers:
  - name: prep
    spec: prep.yaml
flow:
  type: delegate
  swarm: child.yaml
  input_mapping:
    query: "{{input.search}}"
    limit: 10
  output_mapping:
    result: "{{steps.delegate.output.data}}"
  failure_inherit: true
"#;
    let parsed: SwarmFile = serde_yaml::from_str(yaml).unwrap();
    assert_eq!(parsed.id.as_str(), "parent-swarm");
    if let FlowPattern::Delegate {
        swarm,
        worker_expr,
        fallback,
        input_mapping,
        output_mapping,
        failure_inherit,
    } = &parsed.flow
    {
        assert_eq!(swarm, &Some(PathBuf::from("child.yaml")));
        assert!(worker_expr.is_none());
        assert!(fallback.is_none());
        assert_eq!(
            input_mapping.get("query"),
            Some(&Value::String("{{input.search}}".to_string()))
        );
        assert_eq!(input_mapping.get("limit"), Some(&Value::Number(10.into())));
        // The document declares an output mapping too; verify it was parsed
        // rather than silently ignored.
        assert_eq!(
            output_mapping.get("result"),
            Some(&Value::String("{{steps.delegate.output.data}}".to_string()))
        );
        assert!(*failure_inherit);
    } else {
        panic!("Expected Delegate pattern");
    }
}
/// A delegate flow that only names a sub-swarm must get default values for
/// everything else: no worker expression, no fallback, empty mappings, and
/// `failure_inherit` set to true.
#[test]
fn test_delegate_pattern_defaults() {
    let document = r#"
apiVersion: bzzz.dev/v1
kind: swarm
id: minimal-delegate
workers: []
flow:
  type: delegate
  swarm: sub.yaml
"#;
    let minimal: SwarmFile = serde_yaml::from_str(document).unwrap();

    match &minimal.flow {
        FlowPattern::Delegate {
            swarm,
            worker_expr,
            fallback,
            input_mapping,
            output_mapping,
            failure_inherit,
        } => {
            assert_eq!(swarm, &Some(PathBuf::from("sub.yaml")));
            assert!(worker_expr.is_none());
            assert!(fallback.is_none());
            assert!(input_mapping.is_empty());
            assert!(output_mapping.is_empty());
            assert!(*failure_inherit);
        }
        _ => panic!("Expected Delegate pattern"),
    }
}
/// Parses a delegate flow that selects a worker via `worker_expr` with a
/// `fallback`, and has no `swarm` path.
#[test]
fn test_delegate_pattern_with_worker_expr() {
    let document = r#"
apiVersion: bzzz.dev/v1
kind: swarm
id: dynamic-selector
workers:
  - name: processor_a
    spec: processor_a.yaml
  - name: processor_b
    spec: processor_b.yaml
  - name: default_worker
    spec: default.yaml
flow:
  type: delegate
  worker_expr: "{{input.processor_type}}"
  fallback: default_worker
"#;
    let selector: SwarmFile = serde_yaml::from_str(document).unwrap();
    assert_eq!(selector.id.as_str(), "dynamic-selector");

    match &selector.flow {
        FlowPattern::Delegate {
            swarm,
            worker_expr,
            fallback,
            input_mapping,
            output_mapping,
            failure_inherit,
        } => {
            assert!(swarm.is_none());
            assert_eq!(worker_expr, &Some("{{input.processor_type}}".to_string()));
            assert_eq!(fallback, &Some("default_worker".to_string()));
            assert!(input_mapping.is_empty());
            assert!(output_mapping.is_empty());
            assert!(*failure_inherit);
        }
        _ => panic!("Expected Delegate pattern"),
    }
}
/// When a document specifies `swarm`, `worker_expr` and `fallback` together,
/// parsing must keep all three values as written.
#[test]
fn test_delegate_pattern_swarm_precedence_in_yaml() {
    let document = r#"
apiVersion: bzzz.dev/v1
kind: swarm
id: both-specified
workers: []
flow:
  type: delegate
  swarm: sub.yaml
  worker_expr: "{{input.worker}}"
  fallback: default
"#;
    let both: SwarmFile = serde_yaml::from_str(document).unwrap();

    match &both.flow {
        FlowPattern::Delegate {
            swarm,
            worker_expr,
            fallback,
            ..
        } => {
            assert_eq!(swarm, &Some(PathBuf::from("sub.yaml")));
            assert_eq!(worker_expr, &Some("{{input.worker}}".to_string()));
            assert_eq!(fallback, &Some("default".to_string()));
        }
        _ => panic!("Expected Delegate pattern"),
    }
}
/// Pins the value of `MAX_NESTING_DEPTH` so an accidental change is noticed.
#[test]
fn test_max_nesting_depth_constant() {
    let expected_limit = 10;
    assert_eq!(MAX_NESTING_DEPTH, expected_limit);
}
/// A swarm with a plain (empty) sequence flow must report nesting depth 0.
#[test]
fn test_nesting_depth_no_delegate() {
    let flat = SwarmFile::new("test", FlowPattern::Sequence { steps: vec![] });
    assert_eq!(flat.nesting_depth().unwrap(), 0);
}
/// A sequence step naming a worker that was never declared must fail
/// validation, and the error must mention both the pattern and the worker.
#[test]
fn test_validate_sequence_undefined_worker() {
    let flow = FlowPattern::Sequence {
        steps: vec!["missing".into()],
    };
    let swarm = SwarmFile::new("test", flow);

    let err = swarm.validate().unwrap_err();
    let rendered = err.to_string();
    assert!(
        rendered.contains("sequence"),
        "error should mention pattern type: {err}"
    );
    assert!(
        rendered.contains("missing"),
        "error should mention worker name: {err}"
    );
}
/// A sequence whose only step refers to a declared worker must validate.
#[test]
fn test_validate_sequence_valid_worker() {
    let flow = FlowPattern::Sequence {
        steps: vec!["w1".into()],
    };
    let swarm = SwarmFile::new("test", flow).with_worker(Worker::new("w1", "w1.yaml"));
    assert!(swarm.validate().is_ok());
}
/// A parallel branch naming an undeclared worker must fail validation with
/// an error mentioning "parallel" and the missing name.
#[test]
fn test_validate_parallel_undefined_worker() {
    let flow = FlowPattern::Parallel {
        branches: vec!["w1".into(), "ghost".into()],
        fail_fast: false,
    };
    let swarm = SwarmFile::new("test", flow).with_worker(Worker::new("w1", "w1.yaml"));

    let message = swarm.validate().unwrap_err().to_string();
    assert!(message.contains("parallel"));
    assert!(message.contains("ghost"));
}
/// A conditional whose `then` branch names an undeclared worker must fail
/// validation with an error mentioning "conditional" and that name.
#[test]
fn test_validate_conditional_undefined_then() {
    let flow = FlowPattern::Conditional {
        condition: "true".into(),
        then: "missing_then".into(),
        else_: None,
    };
    let swarm = SwarmFile::new("test", flow);

    let message = swarm.validate().unwrap_err().to_string();
    assert!(message.contains("conditional"));
    assert!(message.contains("missing_then"));
}
/// A conditional whose `else_` branch names an undeclared worker must fail
/// validation even though the `then` worker exists.
#[test]
fn test_validate_conditional_undefined_else() {
    let flow = FlowPattern::Conditional {
        condition: "true".into(),
        then: "w1".into(),
        else_: Some("missing_else".into()),
    };
    let swarm = SwarmFile::new("test", flow).with_worker(Worker::new("w1", "w1.yaml"));

    let message = swarm.validate().unwrap_err().to_string();
    assert!(message.contains("conditional"));
    assert!(message.contains("missing_else"));
}
/// A loop whose `do_` worker is undeclared must fail validation with an
/// error mentioning "loop" and the missing name.
#[test]
fn test_validate_loop_undefined_worker() {
    let flow = FlowPattern::Loop {
        over: "items".into(),
        do_: "missing_worker".into(),
        max_iterations: 0,
    };
    let swarm = SwarmFile::new("test", flow);

    let message = swarm.validate().unwrap_err().to_string();
    assert!(message.contains("loop"));
    assert!(message.contains("missing_worker"));
}
/// A delegate flow that only has a `worker_expr` (no fallback, no declared
/// workers) must still pass validation.
#[test]
fn test_validate_delegate_worker_expr_skipped() {
    let flow = FlowPattern::Delegate {
        swarm: None,
        worker_expr: Some("{{input.processor_type}}".into()),
        fallback: None,
        input_mapping: HashMap::new(),
        output_mapping: HashMap::new(),
        failure_inherit: true,
    };
    let swarm = SwarmFile::new("test", flow);
    assert!(swarm.validate().is_ok());
}
/// A delegate `fallback` naming an undeclared worker must fail validation
/// with an error mentioning "delegate" and the missing name.
#[test]
fn test_validate_delegate_fallback_undefined() {
    let flow = FlowPattern::Delegate {
        swarm: None,
        worker_expr: Some("{{input.worker}}".into()),
        fallback: Some("missing_fallback".into()),
        input_mapping: HashMap::new(),
        output_mapping: HashMap::new(),
        failure_inherit: true,
    };
    let swarm = SwarmFile::new("test", flow);

    let message = swarm.validate().unwrap_err().to_string();
    assert!(message.contains("delegate"));
    assert!(message.contains("missing_fallback"));
}
/// A delegate `fallback` that refers to a declared worker must validate.
#[test]
fn test_validate_delegate_fallback_valid() {
    let flow = FlowPattern::Delegate {
        swarm: None,
        worker_expr: Some("{{input.worker}}".into()),
        fallback: Some("default_worker".into()),
        input_mapping: HashMap::new(),
        output_mapping: HashMap::new(),
        failure_inherit: true,
    };
    let swarm =
        SwarmFile::new("test", flow).with_worker(Worker::new("default_worker", "default.yaml"));
    assert!(swarm.validate().is_ok());
}
/// A supervisor listing an undeclared worker must fail validation with an
/// error mentioning "supervisor" and the missing name.
#[test]
fn test_validate_supervisor_undefined_worker() {
    let flow = FlowPattern::Supervisor {
        workers: vec!["w1".into(), "ghost".into()],
        restart_policy: RestartPolicy::OnFailure,
        recovery_policy: None,
    };
    let swarm = SwarmFile::new("test", flow).with_worker(Worker::new("w1", "w1.yaml"));

    let message = swarm.validate().unwrap_err().to_string();
    assert!(message.contains("supervisor"));
    assert!(message.contains("ghost"));
}
/// A compete flow listing an undeclared worker must fail validation with an
/// error mentioning "compete" and the missing name.
#[test]
fn test_validate_compete_undefined_worker() {
    let flow = FlowPattern::Compete {
        workers: vec!["ghost".into()],
    };
    let swarm = SwarmFile::new("test", flow);

    let message = swarm.validate().unwrap_err().to_string();
    assert!(message.contains("compete"));
    assert!(message.contains("ghost"));
}
/// An escalation whose `primary` worker is undeclared must fail validation
/// with an error mentioning "escalation" and the missing name.
#[test]
fn test_validate_escalation_undefined_primary() {
    let flow = FlowPattern::Escalation {
        primary: "missing_primary".into(),
        chain: vec![],
    };
    let swarm = SwarmFile::new("test", flow);

    let message = swarm.validate().unwrap_err().to_string();
    assert!(message.contains("escalation"));
    assert!(message.contains("missing_primary"));
}
/// An escalation chain containing an undeclared worker must fail validation
/// even when the primary and the other chain members exist.
#[test]
fn test_validate_escalation_undefined_chain_member() {
    let flow = FlowPattern::Escalation {
        primary: "w1".into(),
        chain: vec!["w2".into(), "ghost".into()],
    };
    let swarm = SwarmFile::new("test", flow)
        .with_worker(Worker::new("w1", "w1.yaml"))
        .with_worker(Worker::new("w2", "w2.yaml"));

    let message = swarm.validate().unwrap_err().to_string();
    assert!(message.contains("escalation"));
    assert!(message.contains("ghost"));
}
/// An alongside flow whose `main` worker is undeclared must fail validation
/// with an error mentioning "alongside" and the missing name.
#[test]
fn test_validate_alongside_undefined_main() {
    let flow = FlowPattern::Alongside {
        main: "missing_main".into(),
        side: vec![],
    };
    let swarm = SwarmFile::new("test", flow);

    let message = swarm.validate().unwrap_err().to_string();
    assert!(message.contains("alongside"));
    assert!(message.contains("missing_main"));
}
/// An alongside flow with an undeclared side worker must fail validation
/// even when the main worker and the other side worker exist.
#[test]
fn test_validate_alongside_undefined_side_worker() {
    let flow = FlowPattern::Alongside {
        main: "main_worker".into(),
        side: vec!["side1".into(), "ghost".into()],
    };
    let swarm = SwarmFile::new("test", flow)
        .with_worker(Worker::new("main_worker", "main.yaml"))
        .with_worker(Worker::new("side1", "side1.yaml"));

    let message = swarm.validate().unwrap_err().to_string();
    assert!(message.contains("alongside"));
    assert!(message.contains("ghost"));
}
/// Walks every flow pattern variant and checks which worker names
/// `referenced_worker_names` reports, and in which order.
#[test]
fn test_referenced_worker_names_all_patterns() {
    // Sequence: the steps, in order.
    let sequence = FlowPattern::Sequence {
        steps: vec!["a".into(), "b".into()],
    };
    assert_eq!(sequence.referenced_worker_names(), vec!["a", "b"]);

    // Parallel: the branches, in order.
    let parallel = FlowPattern::Parallel {
        branches: vec!["x".into(), "y".into()],
        fail_fast: false,
    };
    assert_eq!(parallel.referenced_worker_names(), vec!["x", "y"]);

    // Conditional: `then` first, followed by `else_` when present.
    let with_else = FlowPattern::Conditional {
        condition: "true".into(),
        then: "t".into(),
        else_: Some("f".into()),
    };
    assert_eq!(with_else.referenced_worker_names(), vec!["t", "f"]);

    let without_else = FlowPattern::Conditional {
        condition: "true".into(),
        then: "t".into(),
        else_: None,
    };
    assert_eq!(without_else.referenced_worker_names(), vec!["t"]);

    // Loop: only the `do_` worker.
    let looping = FlowPattern::Loop {
        over: "items".into(),
        do_: "processor".into(),
        max_iterations: 0,
    };
    assert_eq!(looping.referenced_worker_names(), vec!["processor"]);

    // Delegate: a bare worker expression contributes no names...
    let delegate_expr_only = FlowPattern::Delegate {
        swarm: None,
        worker_expr: Some("{{input.w}}".into()),
        fallback: None,
        input_mapping: HashMap::new(),
        output_mapping: HashMap::new(),
        failure_inherit: true,
    };
    assert!(delegate_expr_only.referenced_worker_names().is_empty());

    // ...while a fallback is reported.
    let delegate_with_fallback = FlowPattern::Delegate {
        swarm: None,
        worker_expr: Some("{{input.w}}".into()),
        fallback: Some("fb".into()),
        input_mapping: HashMap::new(),
        output_mapping: HashMap::new(),
        failure_inherit: true,
    };
    assert_eq!(delegate_with_fallback.referenced_worker_names(), vec!["fb"]);

    // Supervisor: the supervised workers.
    let supervisor = FlowPattern::Supervisor {
        workers: vec!["w1".into(), "w2".into()],
        restart_policy: RestartPolicy::OnFailure,
        recovery_policy: None,
    };
    assert_eq!(supervisor.referenced_worker_names(), vec!["w1", "w2"]);

    // Compete: the competing workers.
    let compete = FlowPattern::Compete {
        workers: vec!["c1".into(), "c2".into()],
    };
    assert_eq!(compete.referenced_worker_names(), vec!["c1", "c2"]);

    // Escalation: primary first, then the chain.
    let escalation = FlowPattern::Escalation {
        primary: "p".into(),
        chain: vec!["e1".into(), "e2".into()],
    };
    assert_eq!(escalation.referenced_worker_names(), vec!["p", "e1", "e2"]);

    // Alongside: main first, then the side workers.
    let alongside = FlowPattern::Alongside {
        main: "m".into(),
        side: vec!["s1".into(), "s2".into()],
    };
    assert_eq!(alongside.referenced_worker_names(), vec!["m", "s1", "s2"]);
}
/// Parses a supervisor flow with a recovery policy and checks the explicit
/// `retry_attempts` and `replan_fallback` values.
#[test]
fn test_recovery_policy_yaml_parsing() {
    let document = r#"
apiVersion: bzzz.dev/v1
kind: swarm
id: recovery-test
workers:
  - name: main
    spec: main.yaml
  - name: backup
    spec: backup.yaml
flow:
  type: supervisor
  workers: [main]
  restart_policy: on_failure
  recovery_policy:
    retry_attempts: 5
    replan_fallback: backup
"#;
    let swarm: SwarmFile = serde_yaml::from_str(document).unwrap();
    assert_eq!(swarm.id.as_str(), "recovery-test");

    match &swarm.flow {
        FlowPattern::Supervisor {
            recovery_policy: Some(policy),
            ..
        } => {
            assert_eq!(policy.retry_attempts, 5);
            assert_eq!(policy.replan_fallback, Some("backup".to_string()));
        }
        _ => panic!("Expected Supervisor with recovery_policy"),
    }
}
/// An empty `recovery_policy: {}` must parse, and this test expects
/// `retry_attempts` to come out as 3.
#[test]
fn test_recovery_policy_default_retry_attempts() {
    let document = r#"
apiVersion: bzzz.dev/v1
kind: swarm
id: default-retry
workers:
  - name: main
    spec: main.yaml
flow:
  type: supervisor
  workers: [main]
  recovery_policy: {}
"#;
    let swarm: SwarmFile = serde_yaml::from_str(document).unwrap();

    match &swarm.flow {
        FlowPattern::Supervisor {
            recovery_policy: Some(policy),
            ..
        } => {
            assert_eq!(policy.retry_attempts, 3);
        }
        _ => panic!("Expected Supervisor with recovery_policy"),
    }
}
/// Parses a recovery policy that names a decomposition swarm and its input
/// mapping, and checks both along with the retry count.
#[test]
fn test_recovery_policy_decompose_swarm() {
    let document = r#"
apiVersion: bzzz.dev/v1
kind: swarm
id: decompose-test
workers:
  - name: main
    spec: main.yaml
flow:
  type: supervisor
  workers: [main]
  recovery_policy:
    retry_attempts: 2
    decompose_swarm: sub-tasks.yaml
    decompose_input:
      task: "{{input.query}}"
"#;
    let swarm: SwarmFile = serde_yaml::from_str(document).unwrap();

    match &swarm.flow {
        FlowPattern::Supervisor {
            recovery_policy: Some(policy),
            ..
        } => {
            assert_eq!(policy.retry_attempts, 2);
            assert_eq!(
                policy.decompose_swarm,
                Some(PathBuf::from("sub-tasks.yaml"))
            );
            let expected_task = Value::String("{{input.query}}".to_string());
            assert_eq!(policy.decompose_input.get("task"), Some(&expected_task));
        }
        _ => panic!("Expected Supervisor with recovery_policy"),
    }
}
/// A supervisor flow with no `recovery_policy` key must parse, validate,
/// and leave the policy unset.
#[test]
fn test_supervisor_without_recovery_policy() {
    let document = r#"
apiVersion: bzzz.dev/v1
kind: swarm
id: no-recovery
workers:
  - name: main
    spec: main.yaml
flow:
  type: supervisor
  workers: [main]
  restart_policy: on_failure
"#;
    let swarm: SwarmFile = serde_yaml::from_str(document).unwrap();
    assert!(swarm.validate().is_ok());

    match &swarm.flow {
        FlowPattern::Supervisor {
            recovery_policy, ..
        } => assert!(recovery_policy.is_none()),
        _ => panic!("Expected Supervisor pattern"),
    }
}
/// A one-step sequence with its single worker declared is expected to be
/// reported as simple.
#[test]
fn test_is_simple_single_step_sequence() {
    let flow = FlowPattern::Sequence {
        steps: vec!["worker1".into()],
    };
    let swarm = SwarmFile::new("simple", flow).with_worker(Worker::new("worker1", "agent.yaml"));
    assert!(swarm.is_simple());
}
/// A two-step sequence is expected not to be reported as simple.
#[test]
fn test_is_simple_multi_step_sequence() {
    let flow = FlowPattern::Sequence {
        steps: vec!["w1".into(), "w2".into()],
    };
    let swarm = SwarmFile::new("complex", flow)
        .with_worker(Worker::new("w1", "w1.yaml"))
        .with_worker(Worker::new("w2", "w2.yaml"));
    assert!(!swarm.is_simple());
}
/// A supervisor over a single worker is expected to be reported as simple.
#[test]
fn test_is_simple_single_worker_supervisor() {
    let flow = FlowPattern::Supervisor {
        workers: vec!["main".into()],
        restart_policy: RestartPolicy::OnFailure,
        recovery_policy: None,
    };
    let swarm = SwarmFile::new("simple", flow).with_worker(Worker::new("main", "agent.yaml"));
    assert!(swarm.is_simple());
}
/// A supervisor over two workers is expected not to be reported as simple.
#[test]
fn test_is_simple_multi_worker_supervisor() {
    let flow = FlowPattern::Supervisor {
        workers: vec!["w1".into(), "w2".into()],
        restart_policy: RestartPolicy::OnFailure,
        recovery_policy: None,
    };
    let swarm = SwarmFile::new("complex", flow)
        .with_worker(Worker::new("w1", "w1.yaml"))
        .with_worker(Worker::new("w2", "w2.yaml"));
    assert!(!swarm.is_simple());
}
/// A compete flow with a single contender is expected to be reported as
/// simple.
#[test]
fn test_is_simple_single_compete() {
    let flow = FlowPattern::Compete {
        workers: vec!["only".into()],
    };
    let swarm = SwarmFile::new("simple", flow).with_worker(Worker::new("only", "agent.yaml"));
    assert!(swarm.is_simple());
}
/// A compete flow with two contenders is expected not to be reported as
/// simple.
#[test]
fn test_is_simple_multi_compete() {
    let flow = FlowPattern::Compete {
        workers: vec!["w1".into(), "w2".into()],
    };
    let swarm = SwarmFile::new("complex", flow)
        .with_worker(Worker::new("w1", "w1.yaml"))
        .with_worker(Worker::new("w2", "w2.yaml"));
    assert!(!swarm.is_simple());
}
/// A parallel flow with a single branch is expected to be reported as
/// simple.
#[test]
fn test_is_simple_single_branch_parallel() {
    let flow = FlowPattern::Parallel {
        branches: vec!["only".into()],
        fail_fast: false,
    };
    let swarm = SwarmFile::new("simple", flow).with_worker(Worker::new("only", "agent.yaml"));
    assert!(swarm.is_simple());
}
/// A parallel flow with two branches is expected not to be reported as
/// simple.
#[test]
fn test_is_simple_multi_branch_parallel() {
    let flow = FlowPattern::Parallel {
        branches: vec!["w1".into(), "w2".into()],
        fail_fast: false,
    };
    let swarm = SwarmFile::new("complex", flow)
        .with_worker(Worker::new("w1", "w1.yaml"))
        .with_worker(Worker::new("w2", "w2.yaml"));
    assert!(!swarm.is_simple());
}
/// An escalation with an empty chain is expected to be reported as simple.
#[test]
fn test_is_simple_escalation_no_chain() {
    let flow = FlowPattern::Escalation {
        primary: "main".into(),
        chain: vec![],
    };
    let swarm = SwarmFile::new("simple", flow).with_worker(Worker::new("main", "agent.yaml"));
    assert!(swarm.is_simple());
}
/// An escalation with a non-empty chain is expected not to be reported as
/// simple.
#[test]
fn test_is_simple_escalation_with_chain() {
    let flow = FlowPattern::Escalation {
        primary: "main".into(),
        chain: vec!["backup".into()],
    };
    let swarm = SwarmFile::new("complex", flow)
        .with_worker(Worker::new("main", "agent.yaml"))
        .with_worker(Worker::new("backup", "backup.yaml"));
    assert!(!swarm.is_simple());
}
/// An alongside flow with no side workers is expected to be reported as
/// simple.
#[test]
fn test_is_simple_alongside_no_side() {
    let flow = FlowPattern::Alongside {
        main: "worker1".into(),
        side: vec![],
    };
    let swarm = SwarmFile::new("simple", flow).with_worker(Worker::new("worker1", "agent.yaml"));
    assert!(swarm.is_simple());
}
/// An alongside flow with a side worker is expected not to be reported as
/// simple.
#[test]
fn test_is_simple_alongside_with_side() {
    let flow = FlowPattern::Alongside {
        main: "main".into(),
        side: vec!["monitor".into()],
    };
    let swarm = SwarmFile::new("complex", flow)
        .with_worker(Worker::new("main", "agent.yaml"))
        .with_worker(Worker::new("monitor", "monitor.yaml"));
    assert!(!swarm.is_simple());
}
/// A conditional flow is expected not to be reported as simple, even with a
/// single worker and no else branch.
#[test]
fn test_is_simple_conditional_always_complex() {
    let flow = FlowPattern::Conditional {
        condition: "true".into(),
        then: "worker1".into(),
        else_: None,
    };
    let swarm = SwarmFile::new("complex", flow).with_worker(Worker::new("worker1", "agent.yaml"));
    assert!(!swarm.is_simple());
}
/// A loop flow is expected not to be reported as simple, even with a single
/// worker.
#[test]
fn test_is_simple_loop_always_complex() {
    let flow = FlowPattern::Loop {
        over: "items".into(),
        do_: "worker1".into(),
        max_iterations: 0,
    };
    let swarm = SwarmFile::new("complex", flow).with_worker(Worker::new("worker1", "agent.yaml"));
    assert!(!swarm.is_simple());
}
/// A delegate flow is expected not to be reported as simple, even with a
/// single declared worker.
#[test]
fn test_is_simple_delegate_always_complex() {
    let flow = FlowPattern::Delegate {
        swarm: Some(PathBuf::from("child.yaml")),
        worker_expr: None,
        fallback: None,
        input_mapping: HashMap::new(),
        output_mapping: HashMap::new(),
        failure_inherit: true,
    };
    let swarm = SwarmFile::new("complex", flow).with_worker(Worker::new("caller", "caller.yaml"));
    assert!(!swarm.is_simple());
}
/// A swarm with an empty sequence and no workers is expected not to be
/// reported as simple.
#[test]
fn test_is_simple_no_workers() {
    let empty_flow = FlowPattern::Sequence { steps: vec![] };
    let swarm = SwarmFile::new("empty", empty_flow);
    assert!(!swarm.is_simple());
}
}