use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use taskmill::{
Domain, IoBudget, PauseReasons, Scheduler, SchedulerEvent, SubmitOutcome, TaskStatus,
TaskStore, TaskSubmission,
};
use tokio_util::sync::CancellationToken;
use super::common::*;
#[tokio::test]
async fn submit_to_paused_group_inserts_as_paused() {
let sched = Scheduler::builder()
.store(TaskStore::open_memory().await.unwrap())
.domain(Domain::<TestDomain>::new().task::<TestTask>(NoopExecutor))
.build()
.await
.unwrap();
sched.pause_group("g1").await.unwrap();
let outcome = sched
.submit(
&TaskSubmission::new("test::test")
.key("paused-submit")
.group("g1"),
)
.await
.unwrap();
match outcome {
SubmitOutcome::Inserted { id, group_paused } => {
assert!(group_paused, "group_paused flag should be true");
let task = sched.store().task_by_id(id).await.unwrap().unwrap();
assert_eq!(task.status, TaskStatus::Paused);
assert!(task.pause_reasons.contains(PauseReasons::GROUP));
}
other => panic!("expected Inserted, got {other:?}"),
}
let outcome2 = sched
.submit(
&TaskSubmission::new("test::test")
.key("normal-submit")
.group("g2"),
)
.await
.unwrap();
match outcome2 {
SubmitOutcome::Inserted { group_paused, .. } => {
assert!(
!group_paused,
"group_paused should be false for non-paused group"
);
}
other => panic!("expected Inserted, got {other:?}"),
}
}
#[tokio::test]
async fn submit_batch_to_paused_group() {
let sched = Scheduler::builder()
.store(TaskStore::open_memory().await.unwrap())
.domain(Domain::<TestDomain>::new().task::<TestTask>(NoopExecutor))
.build()
.await
.unwrap();
sched.pause_group("g1").await.unwrap();
let subs: Vec<_> = (0..3)
.map(|i| {
TaskSubmission::new("test::test")
.key(format!("batch-{i}"))
.group("g1")
})
.collect();
let outcomes = sched.submit_batch(&subs).await.unwrap();
for outcome in outcomes {
match outcome {
SubmitOutcome::Inserted { id, group_paused } => {
assert!(group_paused);
let task = sched.store().task_by_id(id).await.unwrap().unwrap();
assert_eq!(task.status, TaskStatus::Paused);
}
other => panic!("expected Inserted, got {other:?}"),
}
}
}
#[tokio::test]
async fn submit_to_paused_group_resumes_on_group_resume() {
let count = Arc::new(AtomicUsize::new(0));
let sched = Scheduler::builder()
.store(TaskStore::open_memory().await.unwrap())
.domain(
Domain::<TestDomain>::new().task::<TestTask>(CountingExecutor {
count: count.clone(),
}),
)
.max_concurrency(4)
.poll_interval(Duration::from_millis(50))
.build()
.await
.unwrap();
let mut rx = sched.subscribe();
sched.pause_group("g1").await.unwrap();
for i in 0..3 {
sched
.submit(
&TaskSubmission::new("test::test")
.key(format!("resume-{i}"))
.group("g1"),
)
.await
.unwrap();
}
let token = CancellationToken::new();
let sched_clone = sched.clone();
let token_clone = token.clone();
let handle = tokio::spawn(async move { sched_clone.run(token_clone).await });
tokio::time::sleep(Duration::from_millis(200)).await;
assert_eq!(
count.load(Ordering::SeqCst),
0,
"no tasks should run while group is paused"
);
sched.resume_group("g1").await.unwrap();
let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
let mut completed = 0;
while tokio::time::Instant::now() < deadline && completed < 3 {
if let Ok(Ok(SchedulerEvent::Completed(..))) =
tokio::time::timeout(Duration::from_millis(100), rx.recv()).await
{
completed += 1;
}
}
token.cancel();
let _ = handle.await;
assert_eq!(
completed, 3,
"all 3 tasks should complete after group resume"
);
assert_eq!(count.load(Ordering::SeqCst), 3);
}
#[tokio::test]
async fn recurring_next_instance_paused_in_paused_group() {
let sched = Scheduler::builder()
.store(TaskStore::open_memory().await.unwrap())
.domain(Domain::<TestDomain>::new().task::<TestTask>(NoopExecutor))
.max_concurrency(4)
.poll_interval(Duration::from_millis(50))
.build()
.await
.unwrap();
let store = sched.store();
let outcome = sched
.submit(
&TaskSubmission::new("test::test")
.key("recurring-g1")
.group("g1")
.recurring(Duration::from_secs(600)),
)
.await
.unwrap();
let id = outcome.id().unwrap();
let task = store.pop_by_id(id).await.unwrap().unwrap();
sched.pause_group("g1").await.unwrap();
store
.complete_with_record(&task, &IoBudget::default())
.await
.unwrap();
let key = task.key.clone();
let next = store.task_by_key(&key).await.unwrap().unwrap();
assert_eq!(next.status, TaskStatus::Paused);
assert!(next.pause_reasons.contains(PauseReasons::GROUP));
}
#[tokio::test]
async fn blocked_task_unblocks_into_paused_group() {
let sched = Scheduler::builder()
.store(TaskStore::open_memory().await.unwrap())
.domain(Domain::<TestDomain>::new().task::<TestTask>(NoopExecutor))
.build()
.await
.unwrap();
let store = sched.store();
let handle = sched.domain::<TestDomain>();
let dep_outcome = handle
.submit_with(TestTask)
.key("dep")
.group("g1")
.await
.unwrap();
let dep_id = dep_outcome.id().unwrap();
let blocked_outcome = handle
.submit_with(TestTask)
.key("blocked")
.group("g1")
.depends_on(dep_id)
.await
.unwrap();
let blocked_id = blocked_outcome.id().unwrap();
let t = store.task_by_id(blocked_id).await.unwrap().unwrap();
assert_eq!(t.status, TaskStatus::Blocked);
sched.pause_group("g1").await.unwrap();
let dep = store.pop_by_id(dep_id).await.unwrap();
if let Some(dep) = dep {
store
.complete_with_record(&dep, &IoBudget::default())
.await
.unwrap();
store.resolve_dependents(dep_id).await.unwrap();
} else {
sched.resume_group("g1").await.unwrap();
let dep = store.pop_by_id(dep_id).await.unwrap().unwrap();
sched.pause_group("g1").await.unwrap();
store
.complete_with_record(&dep, &IoBudget::default())
.await
.unwrap();
store.resolve_dependents(dep_id).await.unwrap();
}
let t = store.task_by_id(blocked_id).await.unwrap().unwrap();
assert_eq!(t.status, TaskStatus::Paused);
assert!(t.pause_reasons.contains(PauseReasons::GROUP));
sched.resume_group("g1").await.unwrap();
let t = store.task_by_id(blocked_id).await.unwrap().unwrap();
assert_eq!(t.status, TaskStatus::Pending);
assert!(t.pause_reasons.is_empty());
}
#[tokio::test]
async fn multi_reason_pause_no_stranding() {
let sched = Scheduler::builder()
.store(TaskStore::open_memory().await.unwrap())
.domain(Domain::<TestDomain>::new().task::<TestTask>(NoopExecutor))
.build()
.await
.unwrap();
let store = sched.store();
let handle = sched.domain::<TestDomain>();
let outcome = handle
.submit_with(TestTask)
.key("multi")
.group("g1")
.await
.unwrap();
let id = outcome.id().unwrap();
handle.pause().await.unwrap();
let t = store.task_by_id(id).await.unwrap().unwrap();
assert_eq!(t.status, TaskStatus::Paused);
assert!(t.pause_reasons.contains(PauseReasons::MODULE));
sched.pause_group("g1").await.unwrap();
let t = store.task_by_id(id).await.unwrap().unwrap();
assert!(t.pause_reasons.contains(PauseReasons::MODULE));
assert!(t.pause_reasons.contains(PauseReasons::GROUP));
handle.resume().await.unwrap();
let t = store.task_by_id(id).await.unwrap().unwrap();
assert_eq!(t.status, TaskStatus::Paused);
assert!(!t.pause_reasons.contains(PauseReasons::MODULE));
assert!(t.pause_reasons.contains(PauseReasons::GROUP));
sched.resume_group("g1").await.unwrap();
let t = store.task_by_id(id).await.unwrap().unwrap();
assert_eq!(t.status, TaskStatus::Pending);
assert!(t.pause_reasons.is_empty());
}
#[tokio::test]
async fn group_resume_with_module_still_paused() {
let sched = Scheduler::builder()
.store(TaskStore::open_memory().await.unwrap())
.domain(Domain::<TestDomain>::new().task::<TestTask>(NoopExecutor))
.build()
.await
.unwrap();
let store = sched.store();
let handle = sched.domain::<TestDomain>();
let outcome = handle
.submit_with(TestTask)
.key("dual")
.group("g1")
.await
.unwrap();
let id = outcome.id().unwrap();
handle.pause().await.unwrap();
sched.pause_group("g1").await.unwrap();
sched.resume_group("g1").await.unwrap();
let t = store.task_by_id(id).await.unwrap().unwrap();
assert_eq!(t.status, TaskStatus::Paused);
assert!(t.pause_reasons.contains(PauseReasons::MODULE));
assert!(!t.pause_reasons.contains(PauseReasons::GROUP));
handle.resume().await.unwrap();
let t = store.task_by_id(id).await.unwrap().unwrap();
assert_eq!(t.status, TaskStatus::Pending);
assert!(t.pause_reasons.is_empty());
}
#[tokio::test]
async fn domain_handle_group_pause_delegation() {
let sched = Scheduler::builder()
.store(TaskStore::open_memory().await.unwrap())
.domain(Domain::<TestDomain>::new().task::<TestTask>(NoopExecutor))
.build()
.await
.unwrap();
let handle = sched.domain::<TestDomain>();
assert!(!handle.is_group_paused("g1"));
assert!(handle.paused_groups().is_empty());
handle.pause_group("g1").await.unwrap();
assert!(handle.is_group_paused("g1"));
assert_eq!(handle.paused_groups(), vec!["g1".to_string()]);
let outcome = handle
.submit_with(TestTask)
.key("via-handle")
.group("g1")
.await
.unwrap();
match outcome {
SubmitOutcome::Inserted { group_paused, .. } => assert!(group_paused),
other => panic!("expected Inserted, got {other:?}"),
}
handle.resume_group("g1").await.unwrap();
assert!(!handle.is_group_paused("g1"));
assert!(handle.paused_groups().is_empty());
}