use async_trait::async_trait;
use cloacina::executor::pipeline_executor::{PipelineExecution, PipelineStatus};
use cloacina::executor::PipelineExecutor;
use cloacina::runner::DefaultRunner;
use cloacina::*;
use serde_json::Value;
use std::sync::Arc;
use std::time::Duration;
use tokio::time;
use crate::fixtures::get_or_init_fixture;
/// Polls `execution` until `target` accepts the reported status or `timeout` elapses.
///
/// The status is fetched first and the deadline is checked afterwards, so at
/// least one status check is always performed, even with a zero `timeout`.
///
/// # Errors
///
/// Returns a descriptive `String` when fetching the status fails, or when the
/// timeout is exceeded before `target` returns `true` (the message includes
/// the last observed status).
async fn wait_for_status(
    execution: &PipelineExecution,
    target: impl Fn(&PipelineStatus) -> bool,
    timeout: Duration,
) -> Result<PipelineStatus, String> {
    // Pause between two consecutive status polls.
    let poll_interval = Duration::from_millis(50);
    let started_at = std::time::Instant::now();

    loop {
        let current = match execution.get_status().await {
            Ok(status) => status,
            Err(e) => return Err(format!("Failed to get status: {}", e)),
        };

        if target(&current) {
            return Ok(current);
        }

        if started_at.elapsed() > timeout {
            let message = format!(
                "Timeout waiting for target status, current status: {:?}",
                current
            );
            return Err(message);
        }

        time::sleep(poll_interval).await;
    }
}
/// Waits until `execution` reports a status for which `is_terminal()` is true.
///
/// Delegates to [`wait_for_status`]; see that function for the polling and
/// error behaviour.
async fn wait_for_terminal(
    execution: &PipelineExecution,
    timeout: Duration,
) -> Result<PipelineStatus, String> {
    let reached_terminal = |status: &PipelineStatus| status.is_terminal();
    wait_for_status(execution, reached_terminal, timeout).await
}
/// Minimal hand-written task carrying only an identifier and a dependency list.
///
/// NOTE(review): nothing in the visible part of this file constructs this
/// type — confirm it is still needed.
#[derive(Debug)]
struct WorkflowTask {
// Identifier returned from `Task::id`.
id: String,
// Namespaces of the tasks this task depends on, returned from `Task::dependencies`.
dependencies: Vec<TaskNamespace>,
}
impl WorkflowTask {
    /// Builds a task named `id` whose dependencies are parsed from `deps`.
    ///
    /// # Panics
    ///
    /// Panics if any entry of `deps` is rejected by `TaskNamespace::from_string`.
    fn new(id: &str, deps: Vec<&str>) -> Self {
        let mut dependencies = Vec::with_capacity(deps.len());
        for raw_namespace in deps {
            dependencies.push(TaskNamespace::from_string(raw_namespace).unwrap());
        }

        Self {
            id: id.to_owned(),
            dependencies,
        }
    }
}
#[async_trait]
impl Task for WorkflowTask {
    /// Returns the identifier supplied at construction time.
    fn id(&self) -> &str {
        self.id.as_str()
    }

    /// Returns the dependency namespaces supplied at construction time.
    fn dependencies(&self) -> &[TaskNamespace] {
        self.dependencies.as_slice()
    }

    /// Pass-through execution: hands the incoming context back unchanged.
    async fn execute(
        &self,
        context: Context<serde_json::Value>,
    ) -> Result<Context<serde_json::Value>, TaskError> {
        Ok(context)
    }
}
/// Task without dependencies that finishes immediately after recording
/// `"quick_result" = "done"` in the context.
#[task(
    id = "quick_task",
    dependencies = []
)]
async fn quick_task(context: &mut Context<Value>) -> Result<(), TaskError> {
    let marker = Value::String(String::from("done"));
    context.insert("quick_result", marker)?;
    Ok(())
}
/// Task without dependencies that sleeps for two seconds and then records
/// `"slow_first_result" = "completed"` in the context.
#[task(
    id = "slow_first_task",
    dependencies = []
)]
async fn slow_first_task(context: &mut Context<Value>) -> Result<(), TaskError> {
    // Simulated work; keeps the pipeline busy for two seconds.
    let simulated_work = Duration::from_secs(2);
    time::sleep(simulated_work).await;

    let marker = Value::String(String::from("completed"));
    context.insert("slow_first_result", marker)?;
    Ok(())
}
/// Task declared to depend on `slow_first_task`; sleeps for two seconds and
/// then records `"slow_second_result" = "completed"` in the context.
#[task(
    id = "slow_second_task",
    dependencies = ["slow_first_task"]
)]
async fn slow_second_task(context: &mut Context<Value>) -> Result<(), TaskError> {
    // Simulated work; keeps the pipeline busy for two more seconds.
    let simulated_work = Duration::from_secs(2);
    time::sleep(simulated_work).await;

    let marker = Value::String(String::from("completed"));
    context.insert("slow_second_result", marker)?;
    Ok(())
}
/// Pauses an in-flight two-task pipeline and checks that both the execution
/// handle and the persisted pipeline row report the paused state, including
/// the pause timestamp and the supplied reason.
#[tokio::test]
async fn test_pause_running_pipeline() {
    // Take the shared fixture; if the lock is poisoned, recover the guard anyway.
    let shared_fixture = get_or_init_fixture().await;
    let mut fixture = shared_fixture
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    fixture.reset_database().await;
    fixture.initialize().await;

    let database_url = fixture.get_database_url();
    let database = fixture.get_database();

    // Nanosecond timestamp suffix so the workflow name differs on each run.
    let nanos_since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap()
        .as_nanos();
    let workflow_name = format!("pause_test_pipeline_{}", nanos_since_epoch);

    // Namespace of the first task, used as the second task's dependency.
    let first_ns = TaskNamespace::new("public", "embedded", &workflow_name, "slow_first_task");

    let workflow = Workflow::builder(&workflow_name)
        .description("Test pipeline for pause/resume")
        .add_task(Arc::new(slow_first_task_task()))
        .unwrap()
        .add_task(Arc::new(
            slow_second_task_task().with_dependencies(vec![first_ns.clone()]),
        ))
        .unwrap()
        .build()
        .unwrap();

    // Register constructors for both tasks and for the workflow itself.
    let first_task_ns = TaskNamespace::new(
        workflow.tenant(),
        workflow.package(),
        workflow.name(),
        "slow_first_task",
    );
    register_task_constructor(first_task_ns, || Arc::new(slow_first_task_task()));

    let second_task_ns = TaskNamespace::new(
        workflow.tenant(),
        workflow.package(),
        workflow.name(),
        "slow_second_task",
    );
    let dependency_ns = first_ns.clone();
    register_task_constructor(second_task_ns, move || {
        Arc::new(slow_second_task_task().with_dependencies(vec![dependency_ns.clone()]))
    });

    let workflow_template = workflow.clone();
    register_workflow_constructor(workflow.name().to_string(), move || {
        workflow_template.clone()
    });

    let schema = fixture.get_schema();
    let runner = DefaultRunner::builder()
        .database_url(&database_url)
        .schema(&schema)
        .build()
        .await
        .unwrap();

    let execution = runner
        .execute_async(&workflow_name, Context::new())
        .await
        .unwrap();
    let pipeline_id = execution.execution_id;

    // Fixed delay before pausing — presumably to let the runner pick the
    // pipeline up. The tasks sleep for 2s each, so it is still in flight.
    // NOTE(review): timing-based; confirm this is not a source of flakiness.
    time::sleep(Duration::from_millis(200)).await;

    let pause_reason = "Test pause";
    execution.pause(Some(pause_reason)).await.unwrap();

    // The execution handle must report the paused state.
    let status = execution.get_status().await.unwrap();
    assert_eq!(status, PipelineStatus::Paused, "Pipeline should be paused");

    // The persisted pipeline row must carry the pause metadata.
    let dal = cloacina::dal::DAL::new(database.clone());
    let pipeline = dal
        .pipeline_execution()
        .get_by_id(UniversalUuid(pipeline_id))
        .await
        .unwrap();
    assert_eq!(pipeline.status, "Paused");
    assert!(pipeline.paused_at.is_some(), "paused_at should be set");
    assert_eq!(
        pipeline.pause_reason,
        Some(pause_reason.to_string()),
        "pause_reason should be set"
    );

    runner.shutdown().await.unwrap();
}
/// Pauses a two-task pipeline, resumes it, and checks that the pause metadata
/// is cleared and that the pipeline afterwards reaches a terminal status.
#[tokio::test]
async fn test_resume_paused_pipeline() {
    // Take the shared fixture; if the lock is poisoned, recover the guard anyway.
    let shared_fixture = get_or_init_fixture().await;
    let mut fixture = shared_fixture
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    fixture.reset_database().await;
    fixture.initialize().await;

    let database_url = fixture.get_database_url();
    let database = fixture.get_database();

    // Nanosecond timestamp suffix so the workflow name differs on each run.
    let nanos_since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap()
        .as_nanos();
    let workflow_name = format!("resume_test_pipeline_{}", nanos_since_epoch);

    // Namespace of the first task, used as the second task's dependency.
    let first_ns = TaskNamespace::new("public", "embedded", &workflow_name, "slow_first_task");

    let workflow = Workflow::builder(&workflow_name)
        .description("Test pipeline for resume")
        .add_task(Arc::new(slow_first_task_task()))
        .unwrap()
        .add_task(Arc::new(
            slow_second_task_task().with_dependencies(vec![first_ns.clone()]),
        ))
        .unwrap()
        .build()
        .unwrap();

    // Register constructors for both tasks and for the workflow itself.
    let first_task_ns = TaskNamespace::new(
        workflow.tenant(),
        workflow.package(),
        workflow.name(),
        "slow_first_task",
    );
    register_task_constructor(first_task_ns, || Arc::new(slow_first_task_task()));

    let second_task_ns = TaskNamespace::new(
        workflow.tenant(),
        workflow.package(),
        workflow.name(),
        "slow_second_task",
    );
    let dependency_ns = first_ns.clone();
    register_task_constructor(second_task_ns, move || {
        Arc::new(slow_second_task_task().with_dependencies(vec![dependency_ns.clone()]))
    });

    let workflow_template = workflow.clone();
    register_workflow_constructor(workflow.name().to_string(), move || {
        workflow_template.clone()
    });

    let schema = fixture.get_schema();
    let runner = DefaultRunner::builder()
        .database_url(&database_url)
        .schema(&schema)
        .build()
        .await
        .unwrap();

    let execution = runner
        .execute_async(&workflow_name, Context::new())
        .await
        .unwrap();
    let pipeline_id = execution.execution_id;

    // Fixed delay before pausing — presumably to let the runner pick the
    // pipeline up. The tasks sleep for 2s each, so it is still in flight.
    // NOTE(review): timing-based; confirm this is not a source of flakiness.
    time::sleep(Duration::from_millis(200)).await;

    // Pause without a reason and confirm the handle reports it.
    execution.pause(None).await.unwrap();
    let paused_status = execution.get_status().await.unwrap();
    assert_eq!(paused_status, PipelineStatus::Paused);

    // Resume; the handle may report either active state afterwards.
    execution.resume().await.unwrap();
    let status = execution.get_status().await.unwrap();
    assert!(
        matches!(status, PipelineStatus::Running | PipelineStatus::Pending),
        "Pipeline should be active after resume, got {:?}",
        status
    );

    // The persisted row must be running again with the pause metadata cleared.
    let dal = cloacina::dal::DAL::new(database.clone());
    let pipeline = dal
        .pipeline_execution()
        .get_by_id(UniversalUuid(pipeline_id))
        .await
        .unwrap();
    assert_eq!(pipeline.status, "Running");
    assert!(
        pipeline.paused_at.is_none(),
        "paused_at should be cleared after resume"
    );
    assert!(
        pipeline.pause_reason.is_none(),
        "pause_reason should be cleared after resume"
    );

    // A resumed pipeline must still run to a terminal status.
    let completion_timeout = Duration::from_secs(30);
    wait_for_terminal(&execution, completion_timeout)
        .await
        .expect("Pipeline should complete after resume");

    runner.shutdown().await.unwrap();
}
/// Runs a single quick task to a terminal status and checks that a subsequent
/// `pause` call on that execution returns an error.
#[tokio::test]
async fn test_pause_non_running_pipeline_fails() {
    // Take the shared fixture; if the lock is poisoned, recover the guard anyway.
    let shared_fixture = get_or_init_fixture().await;
    let mut fixture = shared_fixture
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    fixture.reset_database().await;
    fixture.initialize().await;

    let database_url = fixture.get_database_url();

    // Nanosecond timestamp suffix so the workflow name differs on each run.
    let nanos_since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap()
        .as_nanos();
    let workflow_name = format!("pause_fail_test_pipeline_{}", nanos_since_epoch);

    let workflow = Workflow::builder(&workflow_name)
        .description("Test pipeline for pause failure")
        .add_task(Arc::new(quick_task_task()))
        .unwrap()
        .build()
        .unwrap();

    // Register constructors for the task and for the workflow itself.
    let quick_task_ns = TaskNamespace::new(
        workflow.tenant(),
        workflow.package(),
        workflow.name(),
        "quick_task",
    );
    register_task_constructor(quick_task_ns, || Arc::new(quick_task_task()));

    let workflow_template = workflow.clone();
    register_workflow_constructor(workflow.name().to_string(), move || {
        workflow_template.clone()
    });

    let schema = fixture.get_schema();
    let runner = DefaultRunner::builder()
        .database_url(&database_url)
        .schema(&schema)
        .build()
        .await
        .unwrap();

    let execution = runner
        .execute_async(&workflow_name, Context::new())
        .await
        .unwrap();

    // Let the pipeline reach a terminal status before attempting the pause.
    let completion_timeout = Duration::from_secs(30);
    wait_for_terminal(&execution, completion_timeout)
        .await
        .expect("Pipeline should complete");

    let pause_outcome = execution.pause(None).await;
    assert!(
        pause_outcome.is_err(),
        "Pausing a completed pipeline should fail"
    );

    runner.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_resume_non_paused_pipeline_fails() {
let fixture = get_or_init_fixture().await;
let mut fixture = fixture.lock().unwrap_or_else(|e| e.into_inner());
fixture.reset_database().await;
fixture.initialize().await;
let database_url = fixture.get_database_url();
let workflow_name = format!(
"resume_fail_test_pipeline_{}",
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos()
);
let workflow = Workflow::builder(&workflow_name)
.description("Test pipeline for resume failure")
.add_task(Arc::new(slow_first_task_task()))
.unwrap()
.build()
.unwrap();
let namespace = TaskNamespace::new(
workflow.tenant(),
workflow.package(),
workflow.name(),
"slow_first_task",
);
register_task_constructor(namespace, || Arc::new(slow_first_task_task()));
register_workflow_constructor(workflow.name().to_string(), {
let workflow = workflow.clone();
move || workflow.clone()
});
let schema = fixture.get_schema();
let runner = DefaultRunner::builder()
.database_url(&database_url)
.schema(&schema)
.build()
.await
.unwrap();
let input_context = Context::new();
let execution = runner
.execute_async(&workflow_name, input_context)
.await
.unwrap();
wait_for_status(
&execution,
|s| *s == PipelineStatus::Running || *s == PipelineStatus::Pending,
Duration::from_secs(5),
)
.await
.expect("Pipeline should be scheduled");
let result = execution.resume().await;
assert!(
result.is_err(),
"Resuming a running (not paused) pipeline should fail"
);
runner.shutdown().await.unwrap();
}