mod builder;
mod graph;
mod metadata;
mod registry;
pub use builder::WorkflowBuilder;
pub use graph::DependencyGraph;
pub use metadata::WorkflowMetadata;
pub use registry::{
get_all_workflows, global_workflow_registry, register_workflow_constructor,
GlobalWorkflowRegistry, WorkflowConstructor, GLOBAL_WORKFLOW_REGISTRY,
};
use std::collections::hash_map::DefaultHasher;
use std::collections::{HashMap, HashSet};
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use crate::error::{SubgraphError, ValidationError, WorkflowError};
use crate::task::{Task, TaskNamespace};
#[derive(Clone)]
pub struct Workflow {
name: String,
pub(crate) tenant: String,
package: String,
tasks: HashMap<TaskNamespace, Arc<dyn Task>>,
dependency_graph: DependencyGraph,
metadata: WorkflowMetadata,
}
impl std::fmt::Debug for Workflow {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Workflow")
.field("name", &self.name)
.field("tenant", &self.tenant)
.field("package", &self.package)
.field("task_count", &self.tasks.len())
.field("dependency_graph", &self.dependency_graph)
.field("metadata", &self.metadata)
.finish()
}
}
impl Workflow {
pub fn new(name: &str) -> Self {
Self {
name: name.to_string(),
tenant: "public".to_string(),
package: "embedded".to_string(),
tasks: HashMap::new(),
dependency_graph: DependencyGraph::new(),
metadata: WorkflowMetadata::default(),
}
}
pub fn builder(name: &str) -> WorkflowBuilder {
WorkflowBuilder::new(name)
}
pub fn name(&self) -> &str {
&self.name
}
pub fn tenant(&self) -> &str {
&self.tenant
}
pub fn set_tenant(&mut self, tenant: &str) {
self.tenant = tenant.to_string();
}
pub fn package(&self) -> &str {
&self.package
}
pub fn set_package(&mut self, package: &str) {
self.package = package.to_string();
}
pub fn metadata(&self) -> &WorkflowMetadata {
&self.metadata
}
pub fn set_version(&mut self, version: &str) {
self.metadata.version = version.to_string();
}
pub fn set_description(&mut self, description: &str) {
self.metadata.description = Some(description.to_string());
}
pub fn add_tag(&mut self, key: &str, value: &str) {
self.metadata
.tags
.insert(key.to_string(), value.to_string());
}
pub fn remove_tag(&mut self, key: &str) -> Option<String> {
self.metadata.tags.remove(key)
}
pub fn add_task(&mut self, task: Arc<dyn Task>) -> Result<(), WorkflowError> {
let task_namespace = TaskNamespace::new(&self.tenant, &self.package, &self.name, task.id());
if self.tasks.contains_key(&task_namespace) {
return Err(WorkflowError::DuplicateTask(task_namespace.to_string()));
}
self.dependency_graph.add_node(task_namespace.clone());
for dep in task.dependencies() {
self.dependency_graph
.add_edge(task_namespace.clone(), dep.clone());
}
self.tasks.insert(task_namespace, task);
Ok(())
}
pub fn remove_task(&mut self, namespace: &TaskNamespace) -> Option<Arc<dyn Task>> {
self.dependency_graph.remove_node(namespace);
self.tasks.remove(namespace)
}
pub fn remove_dependency(&mut self, from_task: &TaskNamespace, to_task: &TaskNamespace) {
self.dependency_graph.remove_edge(from_task, to_task);
}
pub fn validate(&self) -> Result<(), ValidationError> {
if self.tasks.is_empty() {
return Err(ValidationError::EmptyWorkflow);
}
for (task_namespace, task) in &self.tasks {
for dependency in task.dependencies() {
if !self.tasks.contains_key(dependency) {
return Err(ValidationError::MissingDependency {
task: task_namespace.to_string(),
dependency: dependency.to_string(),
});
}
}
}
if self.dependency_graph.has_cycles() {
let cycle = self
.dependency_graph
.find_cycle()
.unwrap_or_default()
.into_iter()
.map(|ns| ns.to_string())
.collect();
return Err(ValidationError::CyclicDependency { cycle });
}
Ok(())
}
pub fn topological_sort(&self) -> Result<Vec<TaskNamespace>, ValidationError> {
self.validate()?;
self.dependency_graph.topological_sort()
}
pub fn get_task(&self, namespace: &TaskNamespace) -> Result<Arc<dyn Task>, WorkflowError> {
self.tasks
.get(namespace)
.cloned()
.ok_or_else(|| WorkflowError::TaskNotFound(namespace.to_string()))
}
pub fn get_dependencies(
&self,
namespace: &TaskNamespace,
) -> Result<&[TaskNamespace], WorkflowError> {
self.tasks
.get(namespace)
.map(|task| task.dependencies())
.ok_or_else(|| WorkflowError::TaskNotFound(namespace.to_string()))
}
pub fn get_dependents(
&self,
namespace: &TaskNamespace,
) -> Result<Vec<TaskNamespace>, WorkflowError> {
if !self.tasks.contains_key(namespace) {
return Err(WorkflowError::TaskNotFound(namespace.to_string()));
}
Ok(self.dependency_graph.get_dependents(namespace))
}
pub fn subgraph(&self, task_namespaces: &[&TaskNamespace]) -> Result<Workflow, SubgraphError> {
let mut subgraph_tasks = HashSet::new();
for &task_namespace in task_namespaces {
if !self.tasks.contains_key(task_namespace) {
return Err(SubgraphError::TaskNotFound(task_namespace.to_string()));
}
self.collect_dependencies(task_namespace, &mut subgraph_tasks);
}
let mut workflow = Workflow::new(&format!("{}-subgraph", self.name));
workflow.metadata = self.metadata.clone();
for task_namespace in &subgraph_tasks {
if let Some(task) = self.tasks.get(task_namespace) {
workflow.tasks.insert(task_namespace.clone(), task.clone());
workflow.dependency_graph.add_node(task_namespace.clone());
for dep in task.dependencies() {
if subgraph_tasks.contains(dep) {
workflow
.dependency_graph
.add_edge(task_namespace.clone(), dep.clone());
}
}
} else {
return Err(SubgraphError::TaskNotFound(task_namespace.to_string()));
}
}
Ok(workflow)
}
fn collect_dependencies(
&self,
task_namespace: &TaskNamespace,
collected: &mut HashSet<TaskNamespace>,
) {
if collected.contains(task_namespace) {
return;
}
collected.insert(task_namespace.clone());
if let Some(task) = self.tasks.get(task_namespace) {
for dep in task.dependencies() {
self.collect_dependencies(dep, collected);
}
}
}
pub fn get_execution_levels(&self) -> Result<Vec<Vec<TaskNamespace>>, ValidationError> {
let sorted = self.topological_sort()?;
let mut levels = Vec::new();
let mut remaining: HashSet<TaskNamespace> = sorted.into_iter().collect();
let mut completed = HashSet::new();
while !remaining.is_empty() {
let mut current_level = Vec::new();
for task_namespace in &remaining {
if let Some(task) = self.tasks.get(task_namespace) {
let all_deps_done = task
.dependencies()
.iter()
.all(|dep| completed.contains(dep));
if all_deps_done {
current_level.push(task_namespace.clone());
}
}
}
for task_namespace in ¤t_level {
remaining.remove(task_namespace);
completed.insert(task_namespace.clone());
}
levels.push(current_level);
}
Ok(levels)
}
pub fn get_roots(&self) -> Vec<TaskNamespace> {
self.tasks
.iter()
.filter_map(|(namespace, task)| {
if task.dependencies().is_empty() {
Some(namespace.clone())
} else {
None
}
})
.collect()
}
pub fn get_leaves(&self) -> Vec<TaskNamespace> {
let all_dependencies: HashSet<TaskNamespace> = self
.tasks
.values()
.flat_map(|task| task.dependencies().iter().cloned())
.collect();
self.tasks
.keys()
.filter(|&namespace| !all_dependencies.contains(namespace))
.cloned()
.collect()
}
pub fn can_run_parallel(&self, task_a: &TaskNamespace, task_b: &TaskNamespace) -> bool {
!self.has_path(task_a, task_b) && !self.has_path(task_b, task_a)
}
fn has_path(&self, from: &TaskNamespace, to: &TaskNamespace) -> bool {
if from == to {
return true;
}
let mut visited = HashSet::new();
let mut stack = vec![from];
while let Some(current) = stack.pop() {
if visited.contains(current) {
continue;
}
visited.insert(current);
if let Some(task) = self.tasks.get(current) {
for dep in task.dependencies() {
if dep == to {
return true;
}
stack.push(dep);
}
}
}
false
}
pub fn calculate_version(&self) -> String {
let mut hasher = DefaultHasher::new();
self.hash_topology(&mut hasher);
self.hash_task_definitions(&mut hasher);
self.hash_configuration(&mut hasher);
format!("{:016x}", hasher.finish())
}
fn hash_topology(&self, hasher: &mut DefaultHasher) {
let mut task_ids: Vec<_> = self.tasks.keys().collect();
task_ids.sort();
for task_id in task_ids {
task_id.hash(hasher);
let mut deps: Vec<_> = self.tasks[task_id].dependencies().to_vec();
deps.sort();
deps.hash(hasher);
}
}
fn hash_task_definitions(&self, hasher: &mut DefaultHasher) {
let mut task_ids: Vec<_> = self.tasks.keys().collect();
task_ids.sort();
for task_id in task_ids {
let task = &self.tasks[task_id];
task.id().hash(hasher);
task.dependencies().hash(hasher);
if let Some(code_hash) = self.get_task_code_hash(task_id) {
code_hash.hash(hasher);
}
}
}
fn hash_configuration(&self, hasher: &mut DefaultHasher) {
self.name.hash(hasher);
self.tenant.hash(hasher);
self.metadata.description.hash(hasher);
let mut tags: Vec<_> = self.metadata.tags.iter().collect();
tags.sort_by_key(|(k, _)| *k);
tags.hash(hasher);
}
fn get_task_code_hash(&self, task_namespace: &TaskNamespace) -> Option<String> {
self.tasks
.get(task_namespace)
.and_then(|task| task.code_fingerprint())
}
pub fn get_task_ids(&self) -> Vec<TaskNamespace> {
self.tasks.keys().cloned().collect()
}
pub fn recreate_from_registry(&self) -> Result<Workflow, WorkflowError> {
let mut new_workflow = Workflow::new(&self.name);
new_workflow.metadata.description = self.metadata.description.clone();
new_workflow.metadata.tags = self.metadata.tags.clone();
new_workflow.metadata.created_at = self.metadata.created_at;
let registry = crate::task::global_task_registry();
let guard = registry.write();
for task_namespace in self.get_task_ids() {
let constructor = guard.get(&task_namespace).ok_or_else(|| {
WorkflowError::TaskNotFound(format!(
"Task '{}' not found in global registry during workflow recreation",
task_namespace
))
})?;
let task = constructor();
new_workflow.add_task(task).map_err(|e| {
WorkflowError::TaskError(format!(
"Failed to add task '{}' during recreation: {}",
task_namespace, e
))
})?;
}
new_workflow.validate().map_err(|e| {
WorkflowError::ValidationError(format!("Recreated workflow validation failed: {}", e))
})?;
Ok(new_workflow.finalize())
}
pub fn finalize(mut self) -> Self {
let version = self.calculate_version();
self.metadata.version = version;
self
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::context::Context;
use crate::error::TaskError;
use crate::init_test_logging;
use async_trait::async_trait;
struct TestTask {
id: String,
dependencies: Vec<TaskNamespace>,
fingerprint: Option<String>,
}
impl TestTask {
fn new(id: &str, dependencies: Vec<TaskNamespace>) -> Self {
Self {
id: id.to_string(),
dependencies,
fingerprint: None,
}
}
fn with_fingerprint(mut self, fingerprint: &str) -> Self {
self.fingerprint = Some(fingerprint.to_string());
self
}
}
#[async_trait]
impl Task for TestTask {
async fn execute(
&self,
context: Context<serde_json::Value>,
) -> Result<Context<serde_json::Value>, TaskError> {
Ok(context)
}
fn id(&self) -> &str {
&self.id
}
fn dependencies(&self) -> &[TaskNamespace] {
&self.dependencies
}
fn code_fingerprint(&self) -> Option<String> {
self.fingerprint.clone()
}
}
#[test]
fn test_workflow_creation() {
init_test_logging();
let workflow = Workflow::new("test-workflow");
assert_eq!(workflow.name(), "test-workflow");
assert_eq!(workflow.metadata().version, "");
}
#[test]
fn test_workflow_add_task() {
init_test_logging();
let mut workflow = Workflow::new("test-workflow");
let task = TestTask::new("task1", vec![]);
let task_namespace = TaskNamespace::new("public", "embedded", "test-workflow", "task1");
assert!(workflow.add_task(Arc::new(task)).is_ok());
assert!(workflow.get_task(&task_namespace).is_ok());
}
#[test]
fn test_workflow_validation() {
init_test_logging();
let mut workflow = Workflow::new("test-workflow");
let ns1 = TaskNamespace::new("public", "embedded", "test-workflow", "task1");
let task1 = TestTask::new("task1", vec![]);
let task2 = TestTask::new("task2", vec![ns1]);
workflow.add_task(Arc::new(task1)).unwrap();
workflow.add_task(Arc::new(task2)).unwrap();
assert!(workflow.validate().is_ok());
}
#[test]
fn test_workflow_cycle_detection() {
init_test_logging();
let mut workflow = Workflow::new("test-workflow");
let ns1 = TaskNamespace::new("public", "embedded", "test-workflow", "task1");
let ns2 = TaskNamespace::new("public", "embedded", "test-workflow", "task2");
let task1 = TestTask::new("task1", vec![ns2]);
let task2 = TestTask::new("task2", vec![ns1]);
workflow.add_task(Arc::new(task1)).unwrap();
workflow.add_task(Arc::new(task2)).unwrap();
assert!(matches!(
workflow.validate(),
Err(ValidationError::CyclicDependency { .. })
));
}
#[test]
fn test_workflow_topological_sort() {
init_test_logging();
let mut workflow = Workflow::new("test-workflow");
let ns1 = TaskNamespace::new("public", "embedded", "test-workflow", "task1");
let ns2 = TaskNamespace::new("public", "embedded", "test-workflow", "task2");
let _ns3 = TaskNamespace::new("public", "embedded", "test-workflow", "task3");
let task1 = TestTask::new("task1", vec![]);
let task2 = TestTask::new("task2", vec![ns1.clone()]);
let task3 = TestTask::new("task3", vec![ns1.clone(), ns2.clone()]);
workflow.add_task(Arc::new(task1)).unwrap();
workflow.add_task(Arc::new(task2)).unwrap();
workflow.add_task(Arc::new(task3)).unwrap();
let sorted = workflow.topological_sort().unwrap();
let pos1 = sorted.iter().position(|x| x.task_id == "task1").unwrap();
let pos2 = sorted.iter().position(|x| x.task_id == "task2").unwrap();
let pos3 = sorted.iter().position(|x| x.task_id == "task3").unwrap();
assert!(pos1 < pos2);
assert!(pos1 < pos3);
assert!(pos2 < pos3);
}
#[test]
fn test_workflow_builder_auto_versioning() {
init_test_logging();
let ns1 = TaskNamespace::new("public", "embedded", "test-workflow", "task1");
let task1 = TestTask::new("task1", vec![]);
let task2 = TestTask::new("task2", vec![ns1]);
let workflow = Workflow::builder("test-workflow")
.description("Test Workflow with auto-versioning")
.tag("env", "test")
.add_task(Arc::new(task1))
.unwrap()
.add_task(Arc::new(task2))
.unwrap()
.validate()
.unwrap()
.build()
.unwrap();
assert_eq!(workflow.name(), "test-workflow");
assert!(!workflow.metadata().version.is_empty());
assert_ne!(workflow.metadata().version, "1.0"); assert_eq!(
workflow.metadata().description,
Some("Test Workflow with auto-versioning".to_string())
);
assert_eq!(
workflow.metadata().tags.get("env"),
Some(&"test".to_string())
);
}
#[test]
fn test_execution_levels() {
init_test_logging();
let mut workflow = Workflow::new("test-workflow");
let ns1 = TaskNamespace::new("public", "embedded", "test-workflow", "task1");
let ns2 = TaskNamespace::new("public", "embedded", "test-workflow", "task2");
let ns3 = TaskNamespace::new("public", "embedded", "test-workflow", "task3");
let task1 = TestTask::new("task1", vec![]);
let task2 = TestTask::new("task2", vec![]);
let task3 = TestTask::new("task3", vec![ns1.clone(), ns2.clone()]);
let task4 = TestTask::new("task4", vec![ns3]);
workflow.add_task(Arc::new(task1)).unwrap();
workflow.add_task(Arc::new(task2)).unwrap();
workflow.add_task(Arc::new(task3)).unwrap();
workflow.add_task(Arc::new(task4)).unwrap();
let levels = workflow.get_execution_levels().unwrap();
assert_eq!(levels[0].len(), 2);
assert!(levels[0].contains(&ns1));
assert!(levels[0].contains(&ns2));
assert_eq!(levels[1].len(), 1);
let expected_ns3 = TaskNamespace::new("public", "embedded", "test-workflow", "task3");
assert!(levels[1].contains(&expected_ns3));
assert_eq!(levels[2].len(), 1);
let expected_ns4 = TaskNamespace::new("public", "embedded", "test-workflow", "task4");
assert!(levels[2].contains(&expected_ns4));
}
#[test]
fn test_workflow_version_consistency() {
init_test_logging();
let ns1 = TaskNamespace::new("public", "embedded", "test-workflow", "task1");
let task1 = TestTask::new("task1", vec![]);
let task2 = TestTask::new("task2", vec![ns1]);
let workflow1 = Workflow::builder("test-workflow")
.description("Test Workflow")
.add_task(Arc::new(task1))
.unwrap()
.add_task(Arc::new(task2))
.unwrap()
.build()
.unwrap();
let ns1_copy = TaskNamespace::new("public", "embedded", "test-workflow", "task1");
let task1_copy = TestTask::new("task1", vec![]);
let task2_copy = TestTask::new("task2", vec![ns1_copy]);
let workflow2 = Workflow::builder("test-workflow")
.description("Test Workflow")
.add_task(Arc::new(task1_copy))
.unwrap()
.add_task(Arc::new(task2_copy))
.unwrap()
.build()
.unwrap();
assert_eq!(workflow1.metadata().version, workflow2.metadata().version);
}
#[test]
fn test_workflow_version_changes() {
init_test_logging();
let ns1 = TaskNamespace::new("public", "embedded", "test-workflow", "task1");
let task1 = TestTask::new("task1", vec![]);
let task2 = TestTask::new("task2", vec![ns1]);
let workflow1 = Workflow::builder("test-workflow")
.description("Original description")
.add_task(Arc::new(task1))
.unwrap()
.add_task(Arc::new(task2))
.unwrap()
.build()
.unwrap();
let ns1_copy = TaskNamespace::new("public", "embedded", "test-workflow", "task1");
let task1_copy = TestTask::new("task1", vec![]);
let task2_copy = TestTask::new("task2", vec![ns1_copy]);
let workflow2 = Workflow::builder("test-workflow")
.description("Changed description") .add_task(Arc::new(task1_copy))
.unwrap()
.add_task(Arc::new(task2_copy))
.unwrap()
.build()
.unwrap();
assert_ne!(workflow1.metadata().version, workflow2.metadata().version);
}
#[test]
fn test_workflow_finalize() {
init_test_logging();
let mut workflow = Workflow::new("my-workflow");
let task1 = TestTask::new("task1", vec![]);
workflow.add_task(Arc::new(task1)).unwrap();
assert!(workflow.metadata().version.is_empty());
let finalized_workflow = workflow.finalize();
assert!(!finalized_workflow.metadata().version.is_empty());
assert_eq!(finalized_workflow.metadata().version.len(), 16); }
#[test]
fn test_workflow_version_with_code_fingerprints() {
init_test_logging();
let ns1 = TaskNamespace::new("public", "embedded", "test-workflow", "task1");
let task1 = TestTask::new("task1", vec![]).with_fingerprint("fingerprint1");
let task2 = TestTask::new("task2", vec![ns1]).with_fingerprint("fingerprint2");
let workflow1 = Workflow::builder("test-workflow")
.description("Test workflow")
.add_task(Arc::new(task1))
.unwrap()
.add_task(Arc::new(task2))
.unwrap()
.build()
.unwrap();
let ns1_diff = TaskNamespace::new("public", "embedded", "test-workflow", "task1");
let task1_diff = TestTask::new("task1", vec![]).with_fingerprint("different_fingerprint");
let task2_same = TestTask::new("task2", vec![ns1_diff]).with_fingerprint("fingerprint2");
let workflow2 = Workflow::builder("test-workflow")
.description("Test workflow")
.add_task(Arc::new(task1_diff))
.unwrap()
.add_task(Arc::new(task2_same))
.unwrap()
.build()
.unwrap();
assert_ne!(workflow1.metadata().version, workflow2.metadata().version);
}
#[test]
fn test_workflow_removal_methods() {
init_test_logging();
let mut workflow = Workflow::new("test-workflow");
let ns1 = TaskNamespace::new("public", "embedded", "test-workflow", "task1");
let task1 = Arc::new(TestTask::new("task1", vec![]));
let task2 = Arc::new(TestTask::new("task2", vec![ns1.clone()]));
workflow.add_task(task1).unwrap();
workflow.add_task(task2).unwrap();
workflow.add_tag("env", "test");
workflow.add_tag("team", "eng");
assert!(workflow.get_task(&ns1).is_ok());
let removed_task = workflow.remove_task(&ns1);
assert!(removed_task.is_some());
assert!(workflow.get_task(&ns1).is_err());
assert_eq!(
workflow.metadata().tags.get("env"),
Some(&"test".to_string())
);
let removed_tag = workflow.remove_tag("env");
assert_eq!(removed_tag, Some("test".to_string()));
assert!(!workflow.metadata().tags.contains_key("env"));
let ns2 = TaskNamespace::new("public", "embedded", "test-workflow", "task2");
workflow.remove_dependency(&ns2, &ns1);
}
#[test]
fn test_get_task_found() {
init_test_logging();
let mut workflow = Workflow::new("test-workflow");
let ns1 = TaskNamespace::new("public", "embedded", "test-workflow", "task1");
let task = Arc::new(TestTask::new("task1", vec![]));
workflow.add_task(task).unwrap();
let result = workflow.get_task(&ns1);
assert!(result.is_ok());
assert_eq!(result.unwrap().id(), "task1");
}
#[test]
fn test_get_task_not_found() {
init_test_logging();
let workflow = Workflow::new("test-workflow");
let ns_missing = TaskNamespace::new("public", "embedded", "test-workflow", "nonexistent");
let result = workflow.get_task(&ns_missing);
assert!(matches!(result, Err(WorkflowError::TaskNotFound(_))));
}
#[test]
fn test_get_dependencies_with_deps() {
init_test_logging();
let mut workflow = Workflow::new("test-workflow");
let ns1 = TaskNamespace::new("public", "embedded", "test-workflow", "task1");
let ns2 = TaskNamespace::new("public", "embedded", "test-workflow", "task2");
let task1 = Arc::new(TestTask::new("task1", vec![]));
let task2 = Arc::new(TestTask::new("task2", vec![ns1.clone()]));
workflow.add_task(task1).unwrap();
workflow.add_task(task2).unwrap();
let deps = workflow.get_dependencies(&ns2).unwrap();
assert_eq!(deps.len(), 1);
assert_eq!(deps[0], ns1);
}
#[test]
fn test_get_dependencies_no_deps() {
init_test_logging();
let mut workflow = Workflow::new("test-workflow");
let ns1 = TaskNamespace::new("public", "embedded", "test-workflow", "task1");
let task1 = Arc::new(TestTask::new("task1", vec![]));
workflow.add_task(task1).unwrap();
let deps = workflow.get_dependencies(&ns1).unwrap();
assert!(deps.is_empty());
}
#[test]
fn test_get_dependencies_task_not_found() {
init_test_logging();
let workflow = Workflow::new("test-workflow");
let ns_missing = TaskNamespace::new("public", "embedded", "test-workflow", "nonexistent");
let result = workflow.get_dependencies(&ns_missing);
assert!(result.is_err());
}
#[test]
fn test_remove_task_returns_task() {
init_test_logging();
let mut workflow = Workflow::new("test-workflow");
let ns1 = TaskNamespace::new("public", "embedded", "test-workflow", "task1");
let task1 = Arc::new(TestTask::new("task1", vec![]));
workflow.add_task(task1).unwrap();
let removed = workflow.remove_task(&ns1);
assert!(removed.is_some());
assert_eq!(removed.unwrap().id(), "task1");
assert!(workflow.get_task(&ns1).is_err());
}
#[test]
fn test_remove_task_nonexistent_returns_none() {
init_test_logging();
let mut workflow = Workflow::new("test-workflow");
let ns_missing = TaskNamespace::new("public", "embedded", "test-workflow", "nonexistent");
let removed = workflow.remove_task(&ns_missing);
assert!(removed.is_none());
}
#[test]
fn test_remove_task_cleans_up_edges() {
init_test_logging();
let mut workflow = Workflow::new("test-workflow");
let ns1 = TaskNamespace::new("public", "embedded", "test-workflow", "task1");
let ns2 = TaskNamespace::new("public", "embedded", "test-workflow", "task2");
let task1 = Arc::new(TestTask::new("task1", vec![]));
let task2 = Arc::new(TestTask::new("task2", vec![ns1.clone()]));
workflow.add_task(task1).unwrap();
workflow.add_task(task2).unwrap();
workflow.remove_task(&ns1);
assert!(workflow.get_task(&ns2).is_ok());
assert!(workflow.get_task(&ns1).is_err());
}
#[test]
fn test_remove_dependency() {
init_test_logging();
let mut workflow = Workflow::new("test-workflow");
let ns1 = TaskNamespace::new("public", "embedded", "test-workflow", "task1");
let ns2 = TaskNamespace::new("public", "embedded", "test-workflow", "task2");
let task1 = Arc::new(TestTask::new("task1", vec![]));
let task2 = Arc::new(TestTask::new("task2", vec![ns1.clone()]));
workflow.add_task(task1).unwrap();
workflow.add_task(task2).unwrap();
workflow.remove_dependency(&ns2, &ns1);
assert!(workflow.get_task(&ns1).is_ok());
assert!(workflow.get_task(&ns2).is_ok());
}
#[test]
fn test_get_roots() {
init_test_logging();
let mut workflow = Workflow::new("test-workflow");
let ns1 = TaskNamespace::new("public", "embedded", "test-workflow", "task1");
let ns2 = TaskNamespace::new("public", "embedded", "test-workflow", "task2");
let task1 = Arc::new(TestTask::new("task1", vec![]));
let task2 = Arc::new(TestTask::new("task2", vec![]));
let task3 = Arc::new(TestTask::new("task3", vec![ns1.clone(), ns2.clone()]));
workflow.add_task(task1).unwrap();
workflow.add_task(task2).unwrap();
workflow.add_task(task3).unwrap();
let roots = workflow.get_roots();
assert_eq!(roots.len(), 2);
assert!(roots.contains(&ns1));
assert!(roots.contains(&ns2));
}
#[test]
fn test_get_leaves() {
init_test_logging();
let mut workflow = Workflow::new("test-workflow");
let ns1 = TaskNamespace::new("public", "embedded", "test-workflow", "task1");
let ns2 = TaskNamespace::new("public", "embedded", "test-workflow", "task2");
let ns3 = TaskNamespace::new("public", "embedded", "test-workflow", "task3");
let task1 = Arc::new(TestTask::new("task1", vec![]));
let task2 = Arc::new(TestTask::new("task2", vec![ns1.clone()]));
let task3 = Arc::new(TestTask::new("task3", vec![ns1.clone()]));
workflow.add_task(task1).unwrap();
workflow.add_task(task2).unwrap();
workflow.add_task(task3).unwrap();
let leaves = workflow.get_leaves();
assert_eq!(leaves.len(), 2);
assert!(leaves.contains(&ns2));
assert!(leaves.contains(&ns3));
}
#[test]
fn test_get_roots_single_task() {
init_test_logging();
let mut workflow = Workflow::new("test-workflow");
let ns1 = TaskNamespace::new("public", "embedded", "test-workflow", "only");
let task = Arc::new(TestTask::new("only", vec![]));
workflow.add_task(task).unwrap();
let roots = workflow.get_roots();
assert_eq!(roots.len(), 1);
assert!(roots.contains(&ns1));
}
#[test]
fn test_get_leaves_single_task() {
init_test_logging();
let mut workflow = Workflow::new("test-workflow");
let ns1 = TaskNamespace::new("public", "embedded", "test-workflow", "only");
let task = Arc::new(TestTask::new("only", vec![]));
workflow.add_task(task).unwrap();
let leaves = workflow.get_leaves();
assert_eq!(leaves.len(), 1);
assert!(leaves.contains(&ns1));
}
#[test]
fn test_validate_success() {
init_test_logging();
let mut workflow = Workflow::new("test-workflow");
let ns1 = TaskNamespace::new("public", "embedded", "test-workflow", "task1");
let task1 = Arc::new(TestTask::new("task1", vec![]));
let task2 = Arc::new(TestTask::new("task2", vec![ns1]));
workflow.add_task(task1).unwrap();
workflow.add_task(task2).unwrap();
assert!(workflow.validate().is_ok());
}
#[test]
fn test_validate_empty_workflow() {
init_test_logging();
let workflow = Workflow::new("test-workflow");
let result = workflow.validate();
assert!(matches!(result, Err(ValidationError::EmptyWorkflow)));
}
#[test]
fn test_validate_missing_dependency() {
init_test_logging();
let mut workflow = Workflow::new("test-workflow");
let ns_missing = TaskNamespace::new("public", "embedded", "test-workflow", "missing");
let task1 = Arc::new(TestTask::new("task1", vec![ns_missing]));
workflow.add_task(task1).unwrap();
let result = workflow.validate();
assert!(matches!(
result,
Err(ValidationError::MissingDependency { .. })
));
}
#[test]
fn test_get_dependents() {
init_test_logging();
let mut workflow = Workflow::new("test-workflow");
let ns1 = TaskNamespace::new("public", "embedded", "test-workflow", "task1");
let ns2 = TaskNamespace::new("public", "embedded", "test-workflow", "task2");
let ns3 = TaskNamespace::new("public", "embedded", "test-workflow", "task3");
let task1 = Arc::new(TestTask::new("task1", vec![]));
let task2 = Arc::new(TestTask::new("task2", vec![ns1.clone()]));
let task3 = Arc::new(TestTask::new("task3", vec![ns1.clone()]));
workflow.add_task(task1).unwrap();
workflow.add_task(task2).unwrap();
workflow.add_task(task3).unwrap();
let dependents = workflow.get_dependents(&ns1).unwrap();
assert_eq!(dependents.len(), 2);
assert!(dependents.contains(&ns2));
assert!(dependents.contains(&ns3));
}
#[test]
fn test_get_dependents_task_not_found() {
init_test_logging();
let workflow = Workflow::new("test-workflow");
let ns_missing = TaskNamespace::new("public", "embedded", "test-workflow", "nonexistent");
let result = workflow.get_dependents(&ns_missing);
assert!(result.is_err());
}
#[test]
fn test_can_run_parallel() {
init_test_logging();
let mut workflow = Workflow::new("test-workflow");
let ns1 = TaskNamespace::new("public", "embedded", "test-workflow", "task1");
let ns2 = TaskNamespace::new("public", "embedded", "test-workflow", "task2");
let ns3 = TaskNamespace::new("public", "embedded", "test-workflow", "task3");
let task1 = Arc::new(TestTask::new("task1", vec![]));
let task2 = Arc::new(TestTask::new("task2", vec![]));
let task3 = Arc::new(TestTask::new("task3", vec![ns1.clone()]));
workflow.add_task(task1).unwrap();
workflow.add_task(task2).unwrap();
workflow.add_task(task3).unwrap();
assert!(workflow.can_run_parallel(&ns1, &ns2));
assert!(!workflow.can_run_parallel(&ns1, &ns3));
assert!(workflow.can_run_parallel(&ns2, &ns3));
}
#[test]
fn test_duplicate_task_rejected() {
init_test_logging();
let mut workflow = Workflow::new("test-workflow");
let task1a = Arc::new(TestTask::new("task1", vec![]));
let task1b = Arc::new(TestTask::new("task1", vec![]));
workflow.add_task(task1a).unwrap();
let result = workflow.add_task(task1b);
assert!(matches!(result, Err(WorkflowError::DuplicateTask(_))));
}
#[test]
fn test_subgraph() {
init_test_logging();
let mut workflow = Workflow::new("test-workflow");
let ns1 = TaskNamespace::new("public", "embedded", "test-workflow", "task1");
let ns2 = TaskNamespace::new("public", "embedded", "test-workflow", "task2");
let ns3 = TaskNamespace::new("public", "embedded", "test-workflow", "task3");
let task1 = Arc::new(TestTask::new("task1", vec![]));
let task2 = Arc::new(TestTask::new("task2", vec![ns1.clone()]));
let task3 = Arc::new(TestTask::new("task3", vec![]));
workflow.add_task(task1).unwrap();
workflow.add_task(task2).unwrap();
workflow.add_task(task3).unwrap();
let sub = workflow.subgraph(&[&ns2]).unwrap();
let ids = sub.get_task_ids();
assert_eq!(ids.len(), 2);
assert!(!ids.contains(&ns3));
}
#[test]
fn test_subgraph_task_not_found() {
init_test_logging();
let workflow = Workflow::new("test-workflow");
let ns_missing = TaskNamespace::new("public", "embedded", "test-workflow", "nonexistent");
let result = workflow.subgraph(&[&ns_missing]);
assert!(matches!(result, Err(SubgraphError::TaskNotFound(_))));
}
}