use sqlx::PgPool;
use tasker_shared::models::{
core::IdentityStrategy,
named_step::{NamedStep, NewNamedStep},
named_task::{NamedTask, NewNamedTask},
task::{NewTask, Task},
task_namespace::{NewTaskNamespace, TaskNamespace},
workflow_step::{NewWorkflowStep, WorkflowStep},
workflow_step_edge::{NewWorkflowStepEdge, WorkflowStepEdge},
};
use uuid::Uuid;
async fn create_test_task(pool: &PgPool, identity_hash: &str) -> sqlx::Result<Task> {
let namespace = TaskNamespace::create(
pool,
NewTaskNamespace {
name: format!("test_cycle_ns_{}", identity_hash),
description: Some("Test namespace for cycle detection".to_string()),
},
)
.await?;
let named_task = NamedTask::create(
pool,
NewNamedTask {
name: format!("test_cycle_task_{}", identity_hash),
version: Some("1.0.0".to_string()),
description: Some("Test task for cycle detection".to_string()),
task_namespace_uuid: namespace.task_namespace_uuid,
configuration: None,
identity_strategy: IdentityStrategy::Strict,
},
)
.await?;
Task::create(
pool,
NewTask {
named_task_uuid: named_task.named_task_uuid,
requested_at: None,
initiator: None,
source_system: None,
reason: None,
tags: None,
context: Some(serde_json::json!({"test": "cycle_detection"})),
identity_hash: identity_hash.to_string(),
priority: Some(5),
correlation_id: Uuid::now_v7(),
parent_correlation_id: None,
},
)
.await
}
async fn create_test_steps(
pool: &PgPool,
task_uuid: Uuid,
step_names: &[&str],
) -> sqlx::Result<Vec<WorkflowStep>> {
let mut workflow_steps = Vec::new();
for name in step_names {
let named_step = NamedStep::create(
pool,
NewNamedStep {
name: format!("{}_{}", name, task_uuid),
description: Some(format!("Test step: {}", name)),
},
)
.await?;
let workflow_step = WorkflowStep::create(
pool,
NewWorkflowStep {
task_uuid,
named_step_uuid: named_step.named_step_uuid,
retryable: Some(true),
max_attempts: Some(3),
inputs: None,
},
)
.await?;
workflow_steps.push(workflow_step);
}
Ok(workflow_steps)
}
#[sqlx::test(migrator = "tasker_shared::database::migrator::MIGRATOR")]
async fn test_direct_cycle_detection(pool: PgPool) -> sqlx::Result<()> {
let task = create_test_task(&pool, "direct_cycle").await?;
let steps = create_test_steps(&pool, task.task_uuid, &["step_a", "step_b"]).await?;
let step_a = &steps[0];
let step_b = &steps[1];
WorkflowStepEdge::create(
&pool,
NewWorkflowStepEdge {
from_step_uuid: step_a.workflow_step_uuid,
to_step_uuid: step_b.workflow_step_uuid,
name: "provides".to_string(),
},
)
.await?;
let would_cycle = WorkflowStepEdge::would_create_cycle(
&pool,
step_b.workflow_step_uuid,
step_a.workflow_step_uuid,
)
.await?;
assert!(would_cycle, "Expected cycle to be detected for B -> A");
Ok(())
}
#[sqlx::test(migrator = "tasker_shared::database::migrator::MIGRATOR")]
async fn test_indirect_cycle_detection(pool: PgPool) -> sqlx::Result<()> {
let task = create_test_task(&pool, "indirect_cycle").await?;
let steps = create_test_steps(&pool, task.task_uuid, &["step_a", "step_b", "step_c"]).await?;
let step_a = &steps[0];
let step_b = &steps[1];
let step_c = &steps[2];
WorkflowStepEdge::create(
&pool,
NewWorkflowStepEdge {
from_step_uuid: step_a.workflow_step_uuid,
to_step_uuid: step_b.workflow_step_uuid,
name: "provides".to_string(),
},
)
.await?;
WorkflowStepEdge::create(
&pool,
NewWorkflowStepEdge {
from_step_uuid: step_b.workflow_step_uuid,
to_step_uuid: step_c.workflow_step_uuid,
name: "provides".to_string(),
},
)
.await?;
let would_cycle = WorkflowStepEdge::would_create_cycle(
&pool,
step_c.workflow_step_uuid,
step_a.workflow_step_uuid,
)
.await?;
assert!(would_cycle, "Expected cycle to be detected for C -> A");
Ok(())
}
#[sqlx::test(migrator = "tasker_shared::database::migrator::MIGRATOR")]
async fn test_self_referencing_cycle(pool: PgPool) -> sqlx::Result<()> {
let task = create_test_task(&pool, "self_reference").await?;
let steps = create_test_steps(&pool, task.task_uuid, &["step_a"]).await?;
let step_a = &steps[0];
let would_cycle = WorkflowStepEdge::would_create_cycle(
&pool,
step_a.workflow_step_uuid,
step_a.workflow_step_uuid,
)
.await?;
assert!(
!would_cycle,
"SQL function does not detect self-loops (handled at WorkflowStepBuilder level)"
);
Ok(())
}
#[sqlx::test(migrator = "tasker_shared::database::migrator::MIGRATOR")]
async fn test_valid_dag_no_cycle(pool: PgPool) -> sqlx::Result<()> {
let task = create_test_task(&pool, "valid_dag").await?;
let steps = create_test_steps(&pool, task.task_uuid, &["step_a", "step_b", "step_c"]).await?;
let step_a = &steps[0];
let step_b = &steps[1];
let step_c = &steps[2];
WorkflowStepEdge::create(
&pool,
NewWorkflowStepEdge {
from_step_uuid: step_a.workflow_step_uuid,
to_step_uuid: step_b.workflow_step_uuid,
name: "provides".to_string(),
},
)
.await?;
let would_cycle = WorkflowStepEdge::would_create_cycle(
&pool,
step_b.workflow_step_uuid,
step_c.workflow_step_uuid,
)
.await?;
assert!(!would_cycle, "Expected no cycle for valid DAG: A -> B -> C");
Ok(())
}
#[sqlx::test(migrator = "tasker_shared::database::migrator::MIGRATOR")]
async fn test_diamond_pattern_no_cycle(pool: PgPool) -> sqlx::Result<()> {
let task = create_test_task(&pool, "diamond_pattern").await?;
let steps = create_test_steps(
&pool,
task.task_uuid,
&["step_a", "step_b", "step_c", "step_d"],
)
.await?;
let step_a = &steps[0];
let step_b = &steps[1];
let step_c = &steps[2];
let step_d = &steps[3];
WorkflowStepEdge::create(
&pool,
NewWorkflowStepEdge {
from_step_uuid: step_a.workflow_step_uuid,
to_step_uuid: step_b.workflow_step_uuid,
name: "provides".to_string(),
},
)
.await?;
WorkflowStepEdge::create(
&pool,
NewWorkflowStepEdge {
from_step_uuid: step_a.workflow_step_uuid,
to_step_uuid: step_c.workflow_step_uuid,
name: "provides".to_string(),
},
)
.await?;
let would_cycle_b_d = WorkflowStepEdge::would_create_cycle(
&pool,
step_b.workflow_step_uuid,
step_d.workflow_step_uuid,
)
.await?;
assert!(
!would_cycle_b_d,
"Expected no cycle for diamond pattern: B -> D"
);
let would_cycle_c_d = WorkflowStepEdge::would_create_cycle(
&pool,
step_c.workflow_step_uuid,
step_d.workflow_step_uuid,
)
.await?;
assert!(
!would_cycle_c_d,
"Expected no cycle for diamond pattern: C -> D"
);
WorkflowStepEdge::create(
&pool,
NewWorkflowStepEdge {
from_step_uuid: step_b.workflow_step_uuid,
to_step_uuid: step_d.workflow_step_uuid,
name: "provides".to_string(),
},
)
.await?;
WorkflowStepEdge::create(
&pool,
NewWorkflowStepEdge {
from_step_uuid: step_c.workflow_step_uuid,
to_step_uuid: step_d.workflow_step_uuid,
name: "provides".to_string(),
},
)
.await?;
let would_cycle_d_a = WorkflowStepEdge::would_create_cycle(
&pool,
step_d.workflow_step_uuid,
step_a.workflow_step_uuid,
)
.await?;
assert!(
would_cycle_d_a,
"Expected cycle to be detected for D -> A after diamond completion"
);
Ok(())
}