use async_trait::async_trait;
use cloacina::executor::WorkflowExecutor;
use cloacina::runner::{DefaultRunner, DefaultRunnerConfig};
use cloacina::*;
use serde_json::Value;
use std::sync::Arc;
use std::time::Duration;
use tokio::time;
use crate::fixtures::get_or_init_fixture;
#[derive(Debug)]
struct WorkflowTask {
id: String,
dependencies: Vec<TaskNamespace>,
}
impl WorkflowTask {
fn new(id: &str, deps: Vec<&str>) -> Self {
Self {
id: id.to_string(),
dependencies: deps
.into_iter()
.map(|s| TaskNamespace::from_string(s).unwrap())
.collect(),
}
}
}
#[async_trait]
impl Task for WorkflowTask {
async fn execute(
&self,
context: Context<serde_json::Value>,
) -> Result<Context<serde_json::Value>, TaskError> {
Ok(context) }
fn id(&self) -> &str {
&self.id
}
fn dependencies(&self) -> &[TaskNamespace] {
&self.dependencies
}
}
#[task(
id = "test_task",
dependencies = []
)]
async fn test_task(context: &mut Context<Value>) -> Result<(), TaskError> {
context.insert("result", Value::String("success".to_string()))?;
Ok(())
}
#[task(
id = "producer_task",
dependencies = []
)]
async fn producer_task(context: &mut Context<Value>) -> Result<(), TaskError> {
context.insert("shared_data", Value::String("important_value".to_string()))?;
Ok(())
}
#[task(
id = "consumer_task",
dependencies = ["producer_task"]
)]
async fn consumer_task(context: &mut Context<Value>) -> Result<(), TaskError> {
if let Some(value) = context.get("shared_data") {
context.insert(
"derived_from_shared_data",
Value::String(format!("Processed: {}", value)),
)?;
} else {
return Err(TaskError::Unknown {
task_id: "consumer_task".to_string(),
message: "Dependency key 'shared_data' not found".to_string(),
});
}
Ok(())
}
#[task(
id = "timeout_task_test",
dependencies = [],
retry_attempts = 1
)]
async fn timeout_task_test(_context: &mut Context<Value>) -> Result<(), TaskError> {
time::sleep(Duration::from_secs(10)).await;
Ok(())
}
#[tokio::test]
async fn test_task_executor_basic_execution() {
let fixture = get_or_init_fixture().await;
let mut fixture = fixture.lock().unwrap_or_else(|e| e.into_inner());
fixture.reset_database().await;
fixture.initialize().await;
let database_url = fixture.get_database_url();
let database = fixture.get_database();
let workflow = Workflow::builder("test_pipeline_basic")
.description("Test workflow for executor")
.add_task(Arc::new(WorkflowTask::new("test_task", vec![])))
.unwrap()
.build()
.unwrap();
let runtime = cloacina::Runtime::empty();
let namespace = TaskNamespace::new(
workflow.tenant(),
workflow.package(),
workflow.name(),
"test_task",
);
runtime.register_task(namespace, || {
Arc::new(test_task_task()) as Arc<dyn cloacina::Task>
});
runtime.register_workflow("test_pipeline_basic".to_string(), {
let workflow = workflow.clone();
move || workflow.clone()
});
let schema = fixture.get_schema();
let runner = DefaultRunner::builder()
.database_url(&database_url)
.schema(&schema)
.runtime(runtime)
.build()
.await
.unwrap();
let mut input_context = Context::new();
input_context
.insert("test_data", Value::String("test_value".to_string()))
.unwrap();
let execution = runner
.execute_async("test_pipeline_basic", input_context)
.await
.unwrap();
let exec_id = execution.execution_id;
let dal = cloacina::dal::DAL::new(database.clone());
crate::fixtures::poll_until(
Duration::from_secs(10),
Duration::from_millis(100),
"task should complete",
|| {
let dal = dal.clone();
async move {
let tasks = dal
.task_execution()
.get_all_tasks_for_workflow(UniversalUuid(exec_id))
.await
.unwrap_or_default();
tasks.len() == 1 && tasks[0].status == "Completed"
}
},
)
.await;
let task_executions = dal
.task_execution()
.get_all_tasks_for_workflow(UniversalUuid(exec_id))
.await
.unwrap();
assert_eq!(task_executions.len(), 1);
let task = &task_executions[0];
assert_eq!(task.status, "Completed");
let expected_task_name = format!(
"{}::{}::{}::test_task",
workflow.tenant(),
workflow.package(),
workflow.name()
);
assert_eq!(task.task_name, expected_task_name);
runner.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_task_executor_dependency_loading() {
let fixture = get_or_init_fixture().await;
let mut fixture = fixture.lock().unwrap_or_else(|e| e.into_inner());
fixture.reset_database().await;
fixture.initialize().await;
let database_url = fixture.get_database_url();
let database = fixture.get_database();
let workflow_name = format!(
"dependency_pipeline_test_{}",
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos()
);
let producer_ns = TaskNamespace::new("public", "embedded", &workflow_name, "producer_task");
let workflow = Workflow::builder(&workflow_name)
.description("Test workflow with dependencies")
.add_task(Arc::new(producer_task_task()))
.unwrap()
.add_task(Arc::new(
consumer_task_task().with_dependencies(vec![producer_ns.clone()]),
))
.unwrap()
.build()
.unwrap();
let runtime = cloacina::Runtime::empty();
let namespace1 = TaskNamespace::new(
workflow.tenant(),
workflow.package(),
workflow.name(),
"producer_task",
);
runtime.register_task(namespace1, || {
Arc::new(producer_task_task()) as Arc<dyn cloacina::Task>
});
let namespace2 = TaskNamespace::new(
workflow.tenant(),
workflow.package(),
workflow.name(),
"consumer_task",
);
let producer_ns_for_closure = producer_ns.clone();
runtime.register_task(namespace2, move || {
Arc::new(consumer_task_task().with_dependencies(vec![producer_ns_for_closure.clone()]))
as Arc<dyn cloacina::Task>
});
runtime.register_workflow(workflow.name().to_string(), {
let workflow = workflow.clone();
move || workflow.clone()
});
let schema = fixture.get_schema();
let runner = DefaultRunner::builder()
.database_url(&database_url)
.schema(&schema)
.runtime(runtime)
.build()
.await
.unwrap();
let mut input_context = Context::new();
input_context
.insert("initial_data", Value::String("test_value".to_string()))
.unwrap();
let execution = runner
.execute_async(&workflow_name, input_context)
.await
.unwrap();
let exec_id = execution.execution_id;
let dal = cloacina::dal::DAL::new(database.clone());
let consumer_namespace_for_poll = cloacina::TaskNamespace::new(
workflow.tenant(),
workflow.package(),
workflow.name(),
"consumer_task",
);
crate::fixtures::poll_until(
Duration::from_secs(10),
Duration::from_millis(100),
"consumer task should complete",
|| {
let dal = dal.clone();
let ns = consumer_namespace_for_poll.clone();
async move {
let meta = dal
.task_execution_metadata()
.get_by_workflow_and_task(UniversalUuid(exec_id), &ns)
.await;
meta.is_ok()
}
},
)
.await;
let consumer_namespace = cloacina::TaskNamespace::new(
workflow.tenant(),
workflow.package(),
workflow.name(),
"consumer_task",
);
let consumer_metadata = dal
.task_execution_metadata()
.get_by_workflow_and_task(UniversalUuid(exec_id), &consumer_namespace)
.await
.unwrap();
let context_data: std::collections::HashMap<String, Value> =
if let Some(context_id) = consumer_metadata.context_id {
let context = dal
.context()
.read::<serde_json::Value>(context_id)
.await
.unwrap();
context.data().clone()
} else {
std::collections::HashMap::new()
};
assert!(
context_data.contains_key("derived_from_shared_data"),
"Consumer task should have processed dependency data"
);
if let Some(derived_value) = context_data.get("derived_from_shared_data") {
assert_eq!(
derived_value,
&Value::String("Processed: \"important_value\"".to_string()),
"Derived value should contain processed dependency data"
);
}
runner.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_task_executor_timeout_handling() {
let fixture = get_or_init_fixture().await;
let mut fixture = fixture.lock().unwrap_or_else(|e| e.into_inner());
fixture.reset_database().await;
fixture.initialize().await;
let database_url = fixture.get_database_url();
let database = fixture.get_database();
let workflow_name = format!(
"timeout_pipeline_test_{}",
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos()
);
let workflow = Workflow::builder(&workflow_name)
.description("Test workflow with timeout")
.add_task(Arc::new(WorkflowTask::new("timeout_task_test", vec![])))
.unwrap()
.build()
.unwrap();
let runtime = cloacina::Runtime::empty();
let namespace = TaskNamespace::new(
workflow.tenant(),
workflow.package(),
workflow.name(),
"timeout_task_test",
);
runtime.register_task(namespace, || {
Arc::new(timeout_task_test_task()) as Arc<dyn cloacina::Task>
});
runtime.register_workflow(workflow.name().to_string(), {
let workflow = workflow.clone();
move || workflow.clone()
});
let config = DefaultRunnerConfig::builder()
.task_timeout(Duration::from_millis(500))
.build()
.unwrap();
let schema = fixture.get_schema();
let runner = DefaultRunner::builder()
.database_url(&database_url)
.schema(&schema)
.with_config(config)
.runtime(runtime)
.build()
.await
.unwrap();
let mut input_context = Context::new();
input_context
.insert("test_data", Value::String("timeout_test".to_string()))
.unwrap();
let execution = runner
.execute_async(&workflow_name, input_context)
.await
.unwrap();
let exec_id = execution.execution_id;
let dal = cloacina::dal::DAL::new(database.clone());
let full_task_name = format!(
"{}::{}::{}::timeout_task_test",
workflow.tenant(),
workflow.package(),
workflow.name()
);
crate::fixtures::poll_until(
Duration::from_secs(10),
Duration::from_millis(100),
"task should fail due to timeout",
|| {
let dal = dal.clone();
let name = full_task_name.clone();
async move {
let status = dal
.task_execution()
.get_task_status(UniversalUuid(exec_id), &name)
.await
.unwrap_or_default();
status == "Failed"
}
},
)
.await;
let task_status = dal
.task_execution()
.get_task_status(UniversalUuid(exec_id), &full_task_name)
.await
.unwrap();
assert_eq!(
task_status, "Failed",
"Task should have failed due to timeout"
);
crate::fixtures::poll_until(
Duration::from_secs(10),
Duration::from_millis(100),
"workflow execution should be marked as terminal (Failed or Completed)",
|| {
let dal = dal.clone();
async move {
if let Ok(exec) = dal
.workflow_execution()
.get_by_id(UniversalUuid(exec_id))
.await
{
exec.status == "Failed" || exec.status == "Completed"
} else {
false
}
}
},
)
.await;
let wf_exec = dal
.workflow_execution()
.get_by_id(UniversalUuid(exec_id))
.await
.unwrap();
assert_eq!(
wf_exec.status, "Failed",
"Workflow execution with failed task(s) must be marked Failed, not Completed"
);
runner.shutdown().await.unwrap();
}
#[task(
id = "unified_task_test",
dependencies = []
)]
async fn unified_task_test(context: &mut Context<Value>) -> Result<(), TaskError> {
context.insert("result", Value::String("unified_success".to_string()))?;
Ok(())
}
#[tokio::test]
async fn test_default_runner_execution() {
let fixture = get_or_init_fixture().await;
let mut fixture = fixture.lock().unwrap_or_else(|e| e.into_inner());
fixture.reset_database().await;
fixture.initialize().await;
let database_url = fixture.get_database_url();
let database = fixture.get_database();
let workflow = Workflow::builder("unified_pipeline_test")
.description("Test workflow for unified mode")
.add_task(Arc::new(WorkflowTask::new("unified_task_test", vec![])))
.unwrap()
.build()
.unwrap();
let runtime = cloacina::Runtime::empty();
let namespace = TaskNamespace::new(
workflow.tenant(),
workflow.package(),
workflow.name(),
"unified_task_test",
);
runtime.register_task(namespace, || {
Arc::new(unified_task_test_task()) as Arc<dyn cloacina::Task>
});
runtime.register_workflow(workflow.name().to_string(), {
let workflow = workflow.clone();
move || workflow.clone()
});
let schema = fixture.get_schema();
let runner = DefaultRunner::builder()
.database_url(&database_url)
.schema(&schema)
.runtime(runtime)
.build()
.await
.unwrap();
let mut input_context = Context::new();
input_context
.insert("engine_test", Value::String("unified_mode".to_string()))
.unwrap();
let execution = runner
.execute_async("unified_pipeline_test", input_context)
.await
.unwrap();
let exec_id = execution.execution_id;
let dal = cloacina::dal::DAL::new(database.clone());
let task_namespace = cloacina::TaskNamespace::new(
workflow.tenant(),
workflow.package(),
workflow.name(),
"unified_task_test",
);
crate::fixtures::poll_until(
Duration::from_secs(10),
Duration::from_millis(100),
"unified task should be processed",
|| {
let dal = dal.clone();
let ns = task_namespace.clone();
async move {
dal.task_execution_metadata()
.get_by_workflow_and_task(UniversalUuid(exec_id), &ns)
.await
.is_ok()
}
},
)
.await;
let task_metadata = dal
.task_execution_metadata()
.get_by_workflow_and_task(UniversalUuid(exec_id), &task_namespace)
.await;
match task_metadata {
Ok(metadata) => {
if let Some(context_id) = metadata.context_id {
let context = dal
.context()
.read::<serde_json::Value>(context_id)
.await
.unwrap();
let context_data = context.data();
assert!(
context_data.contains_key("result"),
"Task output should be present"
);
} else {
println!("Task completed but produced no output context");
}
}
Err(_) => {
let full_task_name = format!(
"{}::{}::{}::unified_task_test",
workflow.tenant(),
workflow.package(),
workflow.name()
);
let task_status = dal
.task_execution()
.get_task_status(UniversalUuid(exec_id), &full_task_name)
.await
.unwrap();
assert_ne!(task_status, "Pending", "Task should have been processed");
}
}
runner.shutdown().await.unwrap();
}
#[task(
id = "initial_context_task_test",
dependencies = []
)]
async fn initial_context_task_test(context: &mut Context<Value>) -> Result<(), TaskError> {
let initial_value = context
.get("initial_data")
.ok_or_else(|| TaskError::ValidationFailed {
message: "No initial_data found in context".to_string(),
})?;
context.insert(
"processed_initial",
Value::String(format!("Processed: {}", initial_value)),
)?;
Ok(())
}
#[tokio::test]
async fn test_task_executor_context_loading_no_dependencies() {
let fixture = get_or_init_fixture().await;
let mut fixture = fixture.lock().unwrap_or_else(|e| e.into_inner());
fixture.reset_database().await;
fixture.initialize().await;
let database_url = fixture.get_database_url();
let database = fixture.get_database();
let workflow_name = format!(
"initial_context_pipeline_test_{}",
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos()
);
let workflow = Workflow::builder(&workflow_name)
.description("Test workflow for initial context loading")
.add_task(Arc::new(WorkflowTask::new(
"initial_context_task_test",
vec![],
)))
.unwrap()
.build()
.unwrap();
let runtime = cloacina::Runtime::empty();
let namespace = TaskNamespace::new(
workflow.tenant(),
workflow.package(),
workflow.name(),
"initial_context_task_test",
);
runtime.register_task(namespace, || {
Arc::new(initial_context_task_test_task()) as Arc<dyn cloacina::Task>
});
runtime.register_workflow(workflow.name().to_string(), {
let workflow = workflow.clone();
move || workflow.clone()
});
let schema = fixture.get_schema();
let runner = DefaultRunner::builder()
.database_url(&database_url)
.schema(&schema)
.runtime(runtime)
.build()
.await
.unwrap();
let mut input_context = Context::new();
input_context
.insert("initial_data", Value::String("hello_world".to_string()))
.unwrap();
input_context
.insert("config_value", Value::Number(serde_json::Number::from(42)))
.unwrap();
let execution = runner
.execute_async(&workflow_name, input_context)
.await
.unwrap();
let exec_id = execution.execution_id;
let dal = cloacina::dal::DAL::new(database.clone());
let full_task_name = format!(
"{}::{}::{}::initial_context_task_test",
workflow.tenant(),
workflow.package(),
workflow.name()
);
crate::fixtures::poll_until(
Duration::from_secs(10),
Duration::from_millis(100),
"initial_context_task should complete",
|| {
let dal = dal.clone();
let name = full_task_name.clone();
async move {
let status = dal
.task_execution()
.get_task_status(UniversalUuid(exec_id), &name)
.await
.unwrap_or_default();
status == "Completed"
}
},
)
.await;
let task_status = dal
.task_execution()
.get_task_status(UniversalUuid(exec_id), &full_task_name)
.await
.unwrap();
assert_eq!(
task_status, "Completed",
"Task should have completed successfully"
);
let task_namespace = cloacina::TaskNamespace::new(
workflow.tenant(),
workflow.package(),
workflow.name(),
"initial_context_task_test",
);
let task_metadata = dal
.task_execution_metadata()
.get_by_workflow_and_task(UniversalUuid(exec_id), &task_namespace)
.await
.unwrap();
if let Some(context_id) = task_metadata.context_id {
let context = dal
.context()
.read::<serde_json::Value>(context_id)
.await
.unwrap();
let context_data = context.data();
assert!(
context_data.contains_key("processed_initial"),
"Task should have processed initial context data"
);
assert!(
context_data.contains_key("config_value"),
"Initial context should be preserved"
);
if let Some(processed) = context_data.get("processed_initial") {
assert_eq!(
processed,
&Value::String("Processed: \"hello_world\"".to_string())
);
}
} else {
panic!("Task should have produced output context");
}
runner.shutdown().await.unwrap();
}
#[task(
id = "producer_context_task",
dependencies = []
)]
async fn producer_context_task(context: &mut Context<Value>) -> Result<(), TaskError> {
let initial_value = context
.get("seed_value")
.ok_or_else(|| TaskError::ValidationFailed {
message: "No seed_value found in context".to_string(),
})?;
context.insert(
"produced_data",
Value::String(format!("Produced from: {}", initial_value)),
)?;
Ok(())
}
#[task(
id = "consumer_context_task",
dependencies = ["producer_context_task"]
)]
async fn consumer_context_task(context: &mut Context<Value>) -> Result<(), TaskError> {
let produced_data =
context
.get("produced_data")
.ok_or_else(|| TaskError::ValidationFailed {
message: "No produced_data found in context from dependency".to_string(),
})?;
let seed_value = context
.get("seed_value")
.ok_or_else(|| TaskError::ValidationFailed {
message: "No seed_value found in context".to_string(),
})?;
context.insert(
"final_result",
Value::String(format!("Final: {} + {}", produced_data, seed_value)),
)?;
Ok(())
}
#[tokio::test]
async fn test_task_executor_context_loading_with_dependencies() {
let fixture = get_or_init_fixture().await;
let mut fixture = fixture.lock().unwrap_or_else(|e| e.into_inner());
fixture.reset_database().await;
fixture.initialize().await;
let database_url = fixture.get_database_url();
let database = fixture.get_database();
let workflow_name = format!(
"dependency_context_pipeline_test_{}",
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos()
);
let producer_ns = TaskNamespace::new(
"public",
"embedded",
&workflow_name,
"producer_context_task",
);
let workflow = Workflow::builder(&workflow_name)
.description("Test workflow for dependency context loading")
.add_task(Arc::new(producer_context_task_task()))
.unwrap()
.add_task(Arc::new(
consumer_context_task_task().with_dependencies(vec![producer_ns.clone()]),
))
.unwrap()
.build()
.unwrap();
let runtime = cloacina::Runtime::empty();
let namespace1 = TaskNamespace::new(
workflow.tenant(),
workflow.package(),
workflow.name(),
"producer_context_task",
);
runtime.register_task(namespace1, || {
Arc::new(producer_context_task_task()) as Arc<dyn cloacina::Task>
});
let namespace2 = TaskNamespace::new(
workflow.tenant(),
workflow.package(),
workflow.name(),
"consumer_context_task",
);
let producer_ns_for_closure = producer_ns.clone();
runtime.register_task(namespace2, move || {
Arc::new(
consumer_context_task_task().with_dependencies(vec![producer_ns_for_closure.clone()]),
) as Arc<dyn cloacina::Task>
});
runtime.register_workflow(workflow.name().to_string(), {
let workflow = workflow.clone();
move || workflow.clone()
});
let schema = fixture.get_schema();
let runner = DefaultRunner::builder()
.database_url(&database_url)
.schema(&schema)
.runtime(runtime)
.build()
.await
.unwrap();
let mut input_context = Context::new();
input_context
.insert("seed_value", Value::String("initial_seed".to_string()))
.unwrap();
let execution = runner
.execute_async(&workflow_name, input_context)
.await
.unwrap();
let exec_id = execution.execution_id;
let dal = cloacina::dal::DAL::new(database.clone());
let producer_full_name = format!(
"{}::{}::{}::producer_context_task",
workflow.tenant(),
workflow.package(),
workflow.name()
);
let consumer_full_name = format!(
"{}::{}::{}::consumer_context_task",
workflow.tenant(),
workflow.package(),
workflow.name()
);
crate::fixtures::poll_until(
Duration::from_secs(10),
Duration::from_millis(100),
"both producer and consumer tasks should complete",
|| {
let dal = dal.clone();
let producer_name = producer_full_name.clone();
let consumer_name = consumer_full_name.clone();
async move {
let p = dal
.task_execution()
.get_task_status(UniversalUuid(exec_id), &producer_name)
.await
.unwrap_or_default();
let c = dal
.task_execution()
.get_task_status(UniversalUuid(exec_id), &consumer_name)
.await
.unwrap_or_default();
p == "Completed" && c == "Completed"
}
},
)
.await;
let producer_status = dal
.task_execution()
.get_task_status(UniversalUuid(exec_id), &producer_full_name)
.await
.unwrap();
let consumer_status = dal
.task_execution()
.get_task_status(UniversalUuid(exec_id), &consumer_full_name)
.await
.unwrap();
assert_eq!(
producer_status, "Completed",
"Producer task should have completed"
);
assert_eq!(
consumer_status, "Completed",
"Consumer task should have completed"
);
let consumer_namespace = cloacina::TaskNamespace::new(
workflow.tenant(),
workflow.package(),
workflow.name(),
"consumer_context_task",
);
let consumer_metadata = dal
.task_execution_metadata()
.get_by_workflow_and_task(UniversalUuid(exec_id), &consumer_namespace)
.await
.unwrap();
if let Some(context_id) = consumer_metadata.context_id {
let context = dal
.context()
.read::<serde_json::Value>(context_id)
.await
.unwrap();
let context_data = context.data();
assert!(
context_data.contains_key("final_result"),
"Consumer should have produced final result"
);
assert!(
context_data.contains_key("produced_data"),
"Consumer should have access to producer data"
);
assert!(
context_data.contains_key("seed_value"),
"Consumer should have access to initial context"
);
if let Some(final_result) = context_data.get("final_result") {
assert_eq!(
final_result,
&Value::String(
"Final: \"Produced from: \\\"initial_seed\\\"\" + \"initial_seed\"".to_string()
)
);
}
} else {
panic!("Consumer task should have produced output context");
}
runner.shutdown().await.unwrap();
}
#[task(id = "always_fails_task", dependencies = [], retry_attempts = 0)]
async fn always_fails_task(_context: &mut Context<Value>) -> Result<(), TaskError> {
Err(TaskError::Unknown {
task_id: "always_fails_task".to_string(),
message: "intentional failure for COR-01 test".to_string(),
})
}
#[task(id = "always_succeeds_task", dependencies = [])]
async fn always_succeeds_task(context: &mut Context<Value>) -> Result<(), TaskError> {
context.insert("success", Value::Bool(true))?;
Ok(())
}
#[task(id = "downstream_of_failure", dependencies = ["always_fails_task"])]
async fn downstream_of_failure(context: &mut Context<Value>) -> Result<(), TaskError> {
context.insert("downstream_ran", Value::Bool(true))?;
Ok(())
}
type TaskCtor = Box<dyn Fn() -> Arc<dyn Task> + Send + Sync>;
async fn run_workflow_and_get_status(
workflow_name: &str,
task_defs: Vec<(&str, TaskCtor)>,
dep_map: Vec<(&str, Vec<&str>)>,
) -> String {
let fixture = get_or_init_fixture().await;
let mut fixture = fixture.lock().unwrap_or_else(|e| e.into_inner());
fixture.reset_database().await;
fixture.initialize().await;
let database_url = fixture.get_database_url();
let database = fixture.get_database();
let unique_name = format!(
"{}_{}",
workflow_name,
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos()
);
let default_tenant = "public";
let default_package = "embedded";
let mut builder = Workflow::builder(&unique_name).description("COR-01 regression test");
for (task_id, deps) in &dep_map {
let qualified_deps: Vec<String> = deps
.iter()
.map(|d| {
format!(
"{}::{}::{}::{}",
default_tenant, default_package, unique_name, d
)
})
.collect();
let qualified_refs: Vec<&str> = qualified_deps.iter().map(|s| s.as_str()).collect();
builder = builder
.add_task(Arc::new(WorkflowTask::new(task_id, qualified_refs)))
.unwrap();
}
let workflow = builder.build().unwrap();
let runtime = cloacina::Runtime::new();
for (task_id, constructor) in &task_defs {
let namespace = TaskNamespace::new(
workflow.tenant(),
workflow.package(),
workflow.name(),
task_id,
);
let ctor = constructor();
runtime.register_task(namespace, move || ctor.clone());
}
runtime.register_workflow(workflow.name().to_string(), {
let workflow = workflow.clone();
move || workflow.clone()
});
let schema = fixture.get_schema();
let runner = DefaultRunner::builder()
.database_url(&database_url)
.schema(&schema)
.runtime(runtime)
.build()
.await
.unwrap();
let execution = runner
.execute_async(&unique_name, Context::new())
.await
.unwrap();
let exec_id = execution.execution_id;
let dal = cloacina::dal::DAL::new(database.clone());
crate::fixtures::poll_until(
Duration::from_secs(15),
Duration::from_millis(100),
"workflow execution should reach terminal state",
|| {
let dal = dal.clone();
async move {
if let Ok(exec) = dal
.workflow_execution()
.get_by_id(UniversalUuid(exec_id))
.await
{
exec.status == "Failed" || exec.status == "Completed"
} else {
false
}
}
},
)
.await;
let exec = dal
.workflow_execution()
.get_by_id(UniversalUuid(exec_id))
.await
.unwrap();
runner.shutdown().await.unwrap();
exec.status
}
#[tokio::test]
#[serial_test::serial]
async fn test_workflow_all_tasks_succeed_marked_completed() {
let status = run_workflow_and_get_status(
"cor01_all_succeed",
vec![(
"always_succeeds_task",
Box::new(|| Arc::new(always_succeeds_task_task())),
)],
vec![("always_succeeds_task", vec![])],
)
.await;
assert_eq!(
status, "Completed",
"Workflow with all successful tasks must be Completed"
);
}
#[tokio::test]
#[serial_test::serial]
async fn test_workflow_task_fails_marked_failed() {
let status = run_workflow_and_get_status(
"cor01_task_fails",
vec![(
"always_fails_task",
Box::new(|| Arc::new(always_fails_task_task())),
)],
vec![("always_fails_task", vec![])],
)
.await;
assert_eq!(
status, "Failed",
"Workflow with failed task must be Failed, not Completed"
);
}
#[tokio::test]
#[serial_test::serial]
async fn test_workflow_mixed_results_marked_failed() {
let status = run_workflow_and_get_status(
"cor01_mixed",
vec![
(
"always_succeeds_task",
Box::new(|| Arc::new(always_succeeds_task_task())),
),
(
"always_fails_task",
Box::new(|| Arc::new(always_fails_task_task())),
),
],
vec![
("always_succeeds_task", vec![]),
("always_fails_task", vec![]),
],
)
.await;
assert_eq!(
status, "Failed",
"Workflow with any failed task must be Failed"
);
}
#[tokio::test]
#[serial_test::serial]
async fn test_workflow_skipped_downstream_marked_failed() {
let status = run_workflow_and_get_status(
"cor01_skipped_downstream",
vec![
(
"always_fails_task",
Box::new(|| Arc::new(always_fails_task_task())),
),
(
"downstream_of_failure",
Box::new(|| Arc::new(downstream_of_failure_task())),
),
],
vec![
("always_fails_task", vec![]),
("downstream_of_failure", vec!["always_fails_task"]),
],
)
.await;
assert_eq!(
status, "Failed",
"Workflow with failed task + skipped dependents must be Failed"
);
}