use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use taskmill::{Domain, Scheduler, TaskStore};
use tokio_util::sync::CancellationToken;
use super::common::*;
/// Builds the scheduler shared by the store-level dependency tests.
///
/// The store comes from `TaskStore::open_memory()` and the only registered
/// domain is `TestDomain`, with `TestTask` handled by `NoopExecutor`.
///
/// # Panics
///
/// Panics if the store cannot be opened or the scheduler fails to build.
async fn dep_scheduler() -> Scheduler {
    let builder = Scheduler::builder();
    let store = TaskStore::open_memory().await.unwrap();
    let domain = Domain::<TestDomain>::new().task::<TestTask>(NoopExecutor);
    builder.store(store).domain(domain).build().await.unwrap()
}
/// B depends on A: B starts out `Blocked`, A is the task that gets popped,
/// and once A is completed and its dependents are resolved B is `Pending`.
#[tokio::test]
async fn dep_basic_blocked_then_unblocked() {
    let scheduler = dep_scheduler().await;
    let domain = scheduler.domain::<TestDomain>();
    let store = scheduler.store();

    // Submit A, then B with a dependency edge on A.
    let outcome_a = domain.submit_with(TestTask).key("dep-a").await.unwrap();
    let id_a = outcome_a.id().unwrap();
    let outcome_b = domain
        .submit_with(TestTask)
        .key("dep-b")
        .depends_on(id_a)
        .await
        .unwrap();
    let id_b = outcome_b.id().unwrap();

    let b_before = store.task_by_id(id_b).await.unwrap().unwrap();
    assert_eq!(b_before.status, taskmill::TaskStatus::Blocked);

    // The queue still offers a task, and the one handed out is A.
    let has_next = store.peek_next(None).await.unwrap().is_some();
    assert!(has_next);
    let popped = store.pop_next(None).await.unwrap().unwrap();
    assert_eq!(popped.id, id_a);

    // Finish A and resolve whatever was waiting on it.
    let budget = taskmill::IoBudget::default();
    store.complete(popped.id, &budget).await.unwrap();
    let released = store.resolve_dependents(id_a).await.unwrap();
    assert_eq!(released, vec![id_b]);

    let b_after = store.task_by_id(id_b).await.unwrap().unwrap();
    assert_eq!(b_after.status, taskmill::TaskStatus::Pending);
}
/// When A fails and `fail_dependents` is run for it, the dependent B is
/// reported as failed, is no longer found by `task_by_id`, and has a history
/// entry with status `DependencyFailed`.
#[tokio::test]
async fn dep_fail_cancels_dependent() {
    let scheduler = dep_scheduler().await;
    let domain = scheduler.domain::<TestDomain>();
    let store = scheduler.store();

    let outcome_a = domain.submit_with(TestTask).key("fail-a").await.unwrap();
    let id_a = outcome_a.id().unwrap();
    let outcome_b = domain
        .submit_with(TestTask)
        .key("fail-b")
        .depends_on(id_a)
        .await
        .unwrap();
    let id_b = outcome_b.id().unwrap();

    // Pop the next task and record a failure for it.
    let popped = store.pop_next(None).await.unwrap().unwrap();
    let budget = taskmill::IoBudget::default();
    store
        .fail(popped.id, "boom", false, 0, &budget, &Default::default())
        .await
        .unwrap();

    // Only the first element of the returned pair is inspected here.
    let (failed, _unblocked) = store.fail_dependents(id_a).await.unwrap();
    assert_eq!(failed, vec![id_b]);

    let b_lookup = store.task_by_id(id_b).await.unwrap();
    assert!(b_lookup.is_none());

    let history = store.history(10, 0).await.unwrap();
    let b_entry = history.iter().find(|entry| entry.id == id_b).unwrap();
    assert_eq!(b_entry.status, taskmill::HistoryStatus::DependencyFailed);
}
/// Fan-in: C depends on both A and B. Completing the first popped task and
/// resolving A releases nothing and leaves C `Blocked`; completing the second
/// and resolving B releases C, which is then `Pending`.
#[tokio::test]
async fn dep_fan_in() {
    let scheduler = dep_scheduler().await;
    let domain = scheduler.domain::<TestDomain>();
    let store = scheduler.store();
    let budget = taskmill::IoBudget::default();

    let outcome_a = domain.submit_with(TestTask).key("fi-a").await.unwrap();
    let id_a = outcome_a.id().unwrap();
    let outcome_b = domain.submit_with(TestTask).key("fi-b").await.unwrap();
    let id_b = outcome_b.id().unwrap();
    let outcome_c = domain
        .submit_with(TestTask)
        .key("fi-c")
        .depends_on_all([id_a, id_b])
        .await
        .unwrap();
    let id_c = outcome_c.id().unwrap();

    let c_initial = store.task_by_id(id_c).await.unwrap().unwrap();
    assert_eq!(c_initial.status, taskmill::TaskStatus::Blocked);

    // First prerequisite done: C must stay blocked.
    let first = store.pop_next(None).await.unwrap().unwrap();
    store.complete(first.id, &budget).await.unwrap();
    let released = store.resolve_dependents(id_a).await.unwrap();
    assert!(released.is_empty());
    let c_midway = store.task_by_id(id_c).await.unwrap().unwrap();
    assert_eq!(c_midway.status, taskmill::TaskStatus::Blocked);

    // Second prerequisite done: C is released.
    let second = store.pop_next(None).await.unwrap().unwrap();
    store.complete(second.id, &budget).await.unwrap();
    let released = store.resolve_dependents(id_b).await.unwrap();
    assert_eq!(released, vec![id_c]);
    let c_final = store.task_by_id(id_c).await.unwrap().unwrap();
    assert_eq!(c_final.status, taskmill::TaskStatus::Pending);
}
/// Fan-out: B and C both depend on A. After A completes, resolving its
/// dependents returns exactly B and C (compared order-insensitively).
#[tokio::test]
async fn dep_fan_out() {
    let scheduler = dep_scheduler().await;
    let domain = scheduler.domain::<TestDomain>();
    let store = scheduler.store();

    let outcome_a = domain.submit_with(TestTask).key("fo-a").await.unwrap();
    let id_a = outcome_a.id().unwrap();
    let outcome_b = domain
        .submit_with(TestTask)
        .key("fo-b")
        .depends_on(id_a)
        .await
        .unwrap();
    let id_b = outcome_b.id().unwrap();
    let outcome_c = domain
        .submit_with(TestTask)
        .key("fo-c")
        .depends_on(id_a)
        .await
        .unwrap();
    let id_c = outcome_c.id().unwrap();

    let popped = store.pop_next(None).await.unwrap().unwrap();
    let budget = taskmill::IoBudget::default();
    store.complete(popped.id, &budget).await.unwrap();

    // Sort both sides so the comparison does not depend on return order.
    let mut released = store.resolve_dependents(id_a).await.unwrap();
    released.sort();
    let mut wanted = vec![id_b, id_c];
    wanted.sort();
    assert_eq!(released, wanted);
}
/// Submits a linear chain A <- B <- C plus one more task depending on A, and
/// checks that every submission succeeds.
///
/// NOTE(review): despite the name, no cyclic submission is attempted here.
/// The closing assertion compares `StoreError::CyclicDependency` with itself,
/// so it cannot fail at runtime; it only proves the variant exists. Confirm
/// whether a real cycle can be expressed through the public API and, if so,
/// assert on the returned error instead.
#[tokio::test]
async fn dep_cycle_detection_direct() {
    let scheduler = dep_scheduler().await;
    let domain = scheduler.domain::<TestDomain>();

    let outcome_a = domain.submit_with(TestTask).key("cyc-a").await.unwrap();
    let id_a = outcome_a.id().unwrap();

    let outcome_b = domain
        .submit_with(TestTask)
        .key("cyc-b")
        .depends_on(id_a)
        .await
        .unwrap();
    let id_b = outcome_b.id().unwrap();

    let outcome_c = domain
        .submit_with(TestTask)
        .key("cyc-c")
        .depends_on(id_b)
        .await
        .unwrap();
    let _id_c = outcome_c.id().unwrap();

    // A further dependent of A; only the success of the submission matters.
    let _ = domain
        .submit_with(TestTask)
        .key("cyc-self")
        .depends_on(id_a)
        .await
        .unwrap();

    assert!(matches!(
        taskmill::StoreError::CyclicDependency,
        taskmill::StoreError::CyclicDependency
    ));
}
/// A task submitted with a dependency on an already-completed task is
/// `Pending` straight away.
#[tokio::test]
async fn dep_already_completed() {
    let scheduler = dep_scheduler().await;
    let domain = scheduler.domain::<TestDomain>();
    let store = scheduler.store();

    // Run A to completion before B is ever submitted.
    let outcome_a = domain.submit_with(TestTask).key("done-a").await.unwrap();
    let id_a = outcome_a.id().unwrap();
    let popped = store.pop_next(None).await.unwrap().unwrap();
    let budget = taskmill::IoBudget::default();
    store.complete(popped.id, &budget).await.unwrap();

    let outcome_b = domain
        .submit_with(TestTask)
        .key("done-b")
        .depends_on(id_a)
        .await
        .unwrap();
    let id_b = outcome_b.id().unwrap();

    let b_record = store.task_by_id(id_b).await.unwrap().unwrap();
    assert_eq!(b_record.status, taskmill::TaskStatus::Pending);
}
/// Submitting a task that depends on an already-failed task is rejected with
/// `StoreError::DependencyFailed`.
#[tokio::test]
async fn dep_already_failed() {
    let scheduler = dep_scheduler().await;
    let domain = scheduler.domain::<TestDomain>();
    let store = scheduler.store();

    let outcome_a = domain.submit_with(TestTask).key("af-a").await.unwrap();
    let id_a = outcome_a.id().unwrap();

    // Fail A before the dependent is submitted.
    let popped = store.pop_next(None).await.unwrap().unwrap();
    let budget = taskmill::IoBudget::default();
    store
        .fail(popped.id, "boom", false, 0, &budget, &Default::default())
        .await
        .unwrap();

    let submission = domain
        .submit_with(TestTask)
        .key("af-b")
        .depends_on(id_a)
        .await;
    let err = submission.unwrap_err();
    assert!(matches!(err, taskmill::StoreError::DependencyFailed(_)));
}
/// Depending on the id `99999`, which was never submitted in this test, is
/// rejected with `StoreError::InvalidDependency(99999)`.
#[tokio::test]
async fn dep_nonexistent() {
    let scheduler = dep_scheduler().await;
    let domain = scheduler.domain::<TestDomain>();

    let submission = domain
        .submit_with(TestTask)
        .key("ne")
        .depends_on(99999)
        .await;
    let err = submission.unwrap_err();

    assert!(matches!(
        err,
        taskmill::StoreError::InvalidDependency(99999)
    ));
}
/// Cancelling A via `cancel_to_history` cascades to its dependent B: B is no
/// longer found by `task_by_id` and has a history entry with status
/// `DependencyFailed`.
#[tokio::test]
async fn dep_cancel_cascades() {
    let sched = dep_scheduler().await;
    let handle = sched.domain::<TestDomain>();
    let store = sched.store();

    let id_a = handle
        .submit_with(TestTask)
        .key("cc-a")
        .await
        .unwrap()
        .id()
        .unwrap();
    let id_b = handle
        .submit_with(TestTask)
        .key("cc-b")
        .depends_on(id_a)
        .await
        .unwrap()
        .id()
        .unwrap();

    store.cancel_to_history(id_a).await.unwrap();

    assert!(store.task_by_id(id_b).await.unwrap().is_none());

    // Look the entry up once and fail with a descriptive message if it is
    // missing, rather than asserting `is_some()` and then unwrapping.
    let hist = store.history(10, 0).await.unwrap();
    let b_hist = hist
        .iter()
        .find(|h| h.id == id_b)
        .expect("dependent of a cancelled task should have a history entry");
    assert_eq!(b_hist.status, taskmill::HistoryStatus::DependencyFailed);
}
/// With `DependencyFailurePolicy::Ignore`, a failed dependency does not fail
/// the dependent: `fail_dependents` reports no failed tasks, returns B as
/// unblocked, and B ends up `Pending`.
#[tokio::test]
async fn dep_ignore_policy_unblocks() {
    let scheduler = dep_scheduler().await;
    let domain = scheduler.domain::<TestDomain>();
    let store = scheduler.store();

    let outcome_a = domain.submit_with(TestTask).key("ig-a").await.unwrap();
    let id_a = outcome_a.id().unwrap();
    let outcome_b = domain
        .submit_with(TestTask)
        .key("ig-b")
        .depends_on(id_a)
        .on_dependency_failure(taskmill::DependencyFailurePolicy::Ignore)
        .await
        .unwrap();
    let id_b = outcome_b.id().unwrap();

    // B still waits on A while A has not finished.
    let b_before = store.task_by_id(id_b).await.unwrap().unwrap();
    assert_eq!(b_before.status, taskmill::TaskStatus::Blocked);

    let popped = store.pop_next(None).await.unwrap().unwrap();
    let budget = taskmill::IoBudget::default();
    store
        .fail(popped.id, "boom", false, 0, &budget, &Default::default())
        .await
        .unwrap();

    let (failed, released) = store.fail_dependents(id_a).await.unwrap();
    assert!(failed.is_empty());
    assert_eq!(released, vec![id_b]);

    let b_after = store.task_by_id(id_b).await.unwrap().unwrap();
    assert_eq!(b_after.status, taskmill::TaskStatus::Pending);
}
/// Exercises the dependency query API on a fan-in graph (C depends on A and
/// B): `task_dependencies`, `task_dependents`, `blocked_tasks` and
/// `blocked_count`.
#[tokio::test]
async fn dep_query_methods() {
    let scheduler = dep_scheduler().await;
    let domain = scheduler.domain::<TestDomain>();
    let store = scheduler.store();

    let outcome_a = domain.submit_with(TestTask).key("qm-a").await.unwrap();
    let id_a = outcome_a.id().unwrap();
    let outcome_b = domain.submit_with(TestTask).key("qm-b").await.unwrap();
    let id_b = outcome_b.id().unwrap();
    let outcome_c = domain
        .submit_with(TestTask)
        .key("qm-c")
        .depends_on_all([id_a, id_b])
        .await
        .unwrap();
    let id_c = outcome_c.id().unwrap();

    // C lists both prerequisites; order is not asserted.
    let prerequisites = store.task_dependencies(id_c).await.unwrap();
    assert_eq!(prerequisites.len(), 2);
    assert!(prerequisites.contains(&id_a));
    assert!(prerequisites.contains(&id_b));

    // A's only dependent is C.
    let a_dependents = store.task_dependents(id_a).await.unwrap();
    assert_eq!(a_dependents, vec![id_c]);

    // C is the single blocked task, by listing and by count.
    let blocked_list = store.blocked_tasks().await.unwrap();
    assert_eq!(blocked_list.len(), 1);
    assert_eq!(blocked_list[0].id, id_c);

    let blocked_total = store.blocked_count().await.unwrap();
    assert_eq!(blocked_total, 1);
}
/// Diamond graph: B and C depend on A, and D depends on both B and C.
///
/// Checks that B, C and D start `Blocked`; that completing A releases two
/// tasks while D stays `Blocked`; that resolving B alone releases nothing;
/// and that resolving C finally releases D, which becomes `Pending`.
#[tokio::test]
async fn dep_diamond_chain() {
    let scheduler = dep_scheduler().await;
    let domain = scheduler.domain::<TestDomain>();
    let store = scheduler.store();
    let budget = taskmill::IoBudget::default();

    let outcome_a = domain.submit_with(TestTask).key("d-a").await.unwrap();
    let id_a = outcome_a.id().unwrap();
    let outcome_b = domain
        .submit_with(TestTask)
        .key("d-b")
        .depends_on(id_a)
        .await
        .unwrap();
    let id_b = outcome_b.id().unwrap();
    let outcome_c = domain
        .submit_with(TestTask)
        .key("d-c")
        .depends_on(id_a)
        .await
        .unwrap();
    let id_c = outcome_c.id().unwrap();
    let outcome_d = domain
        .submit_with(TestTask)
        .key("d-d")
        .depends_on_all([id_b, id_c])
        .await
        .unwrap();
    let id_d = outcome_d.id().unwrap();

    // Everything downstream of A starts out blocked.
    for waiting_id in [id_b, id_c, id_d] {
        let record = store.task_by_id(waiting_id).await.unwrap().unwrap();
        assert_eq!(record.status, taskmill::TaskStatus::Blocked);
    }

    // Finishing A releases two tasks, but D is still waiting.
    let first = store.pop_next(None).await.unwrap().unwrap();
    store.complete(first.id, &budget).await.unwrap();
    let released = store.resolve_dependents(id_a).await.unwrap();
    assert_eq!(released.len(), 2);
    let d_midway = store.task_by_id(id_d).await.unwrap().unwrap();
    assert_eq!(d_midway.status, taskmill::TaskStatus::Blocked);

    // One of D's two prerequisites done: nothing is released yet.
    let second = store.pop_next(None).await.unwrap().unwrap();
    store.complete(second.id, &budget).await.unwrap();
    let released = store.resolve_dependents(id_b).await.unwrap();
    assert!(released.is_empty());

    // The other prerequisite done: D is released.
    let third = store.pop_next(None).await.unwrap().unwrap();
    store.complete(third.id, &budget).await.unwrap();
    let released = store.resolve_dependents(id_c).await.unwrap();
    assert_eq!(released, vec![id_d]);

    let d_final = store.task_by_id(id_d).await.unwrap().unwrap();
    assert_eq!(d_final.status, taskmill::TaskStatus::Pending);
}
/// The scheduler snapshot reports one blocked task when a single dependent
/// has been submitted behind an unfinished task.
#[tokio::test]
async fn dep_blocked_count_in_snapshot() {
    let store = TaskStore::open_memory().await.unwrap();
    let executor = DelayExecutor(Duration::from_secs(60));
    let domain = Domain::<TestDomain>::new().task::<TestTask>(executor);
    let scheduler = Scheduler::builder()
        .store(store)
        .domain(domain)
        .build()
        .await
        .unwrap();
    let tasks = scheduler.domain::<TestDomain>();

    let first = tasks.submit_with(TestTask).key("snap-a").await.unwrap();
    let id_a = first.id().unwrap();
    tasks
        .submit_with(TestTask)
        .key("snap-b")
        .depends_on(id_a)
        .await
        .unwrap();

    // Wait 200 ms before reading the snapshot. The scheduler's `run` loop is
    // never started in this test, so presumably this only lets background
    // work settle -- TODO confirm whether the sleep is still needed.
    let settle = Duration::from_millis(200);
    tokio::time::sleep(settle).await;

    let snapshot = scheduler.snapshot().await.unwrap();
    assert_eq!(snapshot.blocked_count, 1);
}
/// End-to-end chain A <- B <- C driven by a running scheduler: waits up to
/// five seconds for three `Completed` events, then checks that the counting
/// executor's counter is 3.
#[tokio::test]
async fn dep_full_chain_with_scheduler() {
    let store = TaskStore::open_memory().await.unwrap();
    let counter = Arc::new(AtomicUsize::new(0));
    let executor = CountingExecutor {
        count: Arc::clone(&counter),
    };
    let scheduler = Scheduler::builder()
        .store(store)
        .domain(Domain::<TestDomain>::new().task::<StepTask>(executor))
        .build()
        .await
        .unwrap();
    let tasks = scheduler.domain::<TestDomain>();
    let mut events = tasks.events();

    // Drive the scheduler on a background task until the token is cancelled.
    let token = CancellationToken::new();
    let runner = scheduler.clone();
    let runner_token = token.clone();
    let join_handle = tokio::spawn(async move {
        runner.run(runner_token).await;
    });

    let first = tasks.submit_with(StepTask).key("chain-a").await.unwrap();
    let id_a = first.id().unwrap();
    let second = tasks
        .submit_with(StepTask)
        .key("chain-b")
        .depends_on(id_a)
        .await
        .unwrap();
    let id_b = second.id().unwrap();
    let third = tasks
        .submit_with(StepTask)
        .key("chain-c")
        .depends_on(id_b)
        .await
        .unwrap();
    let _id_c = third.id().unwrap();

    // Poll in 100 ms slices; timeouts, receive errors and other event kinds
    // are skipped and only `Completed` events are counted.
    let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
    let mut completed = 0;
    while completed < 3 && tokio::time::Instant::now() < deadline {
        let polled = tokio::time::timeout(Duration::from_millis(100), events.recv()).await;
        if let Ok(Ok(taskmill::SchedulerEvent::Completed(_))) = polled {
            completed += 1;
        }
    }

    // Stop the scheduler; the join result is deliberately not inspected.
    token.cancel();
    let _ = join_handle.await;

    assert_eq!(completed, 3);
    assert_eq!(counter.load(Ordering::SeqCst), 3);
}
/// Checks that a blocked task and its dependency edge are readable from the
/// store, and that the task becomes `Pending` once its dependency completes.
///
/// NOTE(review): the name mentions a store reopen, but this body never closes
/// or reopens the store; it works on the single scheduler returned by
/// `dep_scheduler()` throughout. Confirm whether a real reopen step is
/// missing.
#[tokio::test]
async fn dep_blocked_tasks_survive_across_store_reopen() {
    let scheduler = dep_scheduler().await;
    let domain = scheduler.domain::<TestDomain>();
    let store = scheduler.store();

    let outcome_a = domain.submit_with(TestTask).key("rec-a").await.unwrap();
    let id_a = outcome_a.id().unwrap();
    let outcome_b = domain
        .submit_with(TestTask)
        .key("rec-b")
        .depends_on(id_a)
        .await
        .unwrap();
    let id_b = outcome_b.id().unwrap();

    // B is blocked and its edge to A is recorded.
    let b_before = store.task_by_id(id_b).await.unwrap().unwrap();
    assert_eq!(b_before.status, taskmill::TaskStatus::Blocked);
    let prerequisites = store.task_dependencies(id_b).await.unwrap();
    assert_eq!(prerequisites, vec![id_a]);

    // Completing A and resolving its dependents releases B.
    let popped = store.pop_next(None).await.unwrap().unwrap();
    let budget = taskmill::IoBudget::default();
    store.complete(popped.id, &budget).await.unwrap();
    let released = store.resolve_dependents(id_a).await.unwrap();
    assert_eq!(released, vec![id_b]);

    let b_after = store.task_by_id(id_b).await.unwrap().unwrap();
    assert_eq!(b_after.status, taskmill::TaskStatus::Pending);
}