use std::{
sync::Arc,
time::Instant,
};
use crate::service::ExecutorServiceLifecycle;
use super::{
scheduled_task_entry::StartedScheduledTask,
single_thread_scheduled_executor_service_inner::SingleThreadScheduledExecutorServiceInner,
single_thread_scheduled_executor_service_state::SingleThreadScheduledExecutorServiceState,
};
/// Entry-point holder for the scheduled executor's worker loop.
///
/// The type carries no data; it only provides the associated [`run`](Self::run)
/// function.
pub(crate) struct ScheduledWorker;

impl ScheduledWorker {
    /// Runs the worker loop for the executor described by `inner`.
    ///
    /// Delegates directly to `run_scheduled_worker` and returns when that
    /// function returns.
    pub(crate) fn run(inner: Arc<SingleThreadScheduledExecutorServiceInner>) {
        run_scheduled_worker(inner)
    }
}
/// Repeatedly fetches the next ready task and executes it.
///
/// Each task obtained from `next_ready_task` is invoked, after which
/// `finish_running_task` is called on `inner`. The function returns as soon as
/// `next_ready_task` yields `None`.
fn run_scheduled_worker(inner: Arc<SingleThreadScheduledExecutorServiceInner>) {
    while let Some(ready) = next_ready_task(&inner) {
        ready();
        // NOTE(review): this line is only reached when the task returns
        // normally; confirm that tasks cannot unwind past the call above.
        inner.finish_running_task();
    }
}
/// Waits for and returns the next task that is due and could be started.
///
/// The executor state is locked for the whole call. Each pass of the loop:
///
/// 1. drops cancelled entries from the front of the queue;
/// 2. calls `terminate` and returns `None` when the lifecycle is `Stopping`,
///    or when the queue is empty and the lifecycle is anything other than
///    `Running`;
/// 3. calls `wait` on the state guard when the queue is empty, or
///    `wait_timeout` when the earliest deadline lies in the future, and then
///    re-evaluates everything from step 1;
/// 4. otherwise pops the front entry and tries to start it; an entry whose
///    `start()` yields `None` is discarded and the loop continues.
///
/// Returns `Some(task)` after calling `inner.start_task()`, or `None` once
/// the executor has been terminated.
fn next_ready_task(
    inner: &SingleThreadScheduledExecutorServiceInner,
) -> Option<StartedScheduledTask> {
    let mut state = inner.state.lock();
    loop {
        prune_cancelled_front(&mut state);

        // Short-circuit order matches the two separate checks: `Stopping`
        // first, then "drained and no longer running".
        let must_terminate = state.lifecycle == ExecutorServiceLifecycle::Stopping
            || (state.tasks.is_empty() && state.lifecycle != ExecutorServiceLifecycle::Running);
        if must_terminate {
            inner.terminate(&mut state);
            return None;
        }

        // Copy the deadline out first so no borrow of `state` is live when the
        // guard is handed to `wait` / `wait_timeout` below.
        let head_deadline = state.tasks.peek().map(|entry| entry.deadline);
        let Some(deadline) = head_deadline else {
            // Nothing queued: presumably blocks until the state is signalled.
            state = state.wait();
            continue;
        };

        let now = Instant::now();
        if deadline > now {
            // The head task is not due yet; wait at most until its deadline.
            let remaining = deadline.saturating_duration_since(now);
            let (reacquired, _) = state.wait_timeout(remaining);
            state = reacquired;
            continue;
        }

        if let Some(due) = state.tasks.pop() {
            // `start()` returning `None` means this entry cannot run; drop it
            // and look at the next one.
            if let Some(started_task) = due.entry.start() {
                inner.start_task();
                return Some(started_task);
            }
        }
    }
}
/// Removes entries from the front of `state.tasks` for as long as the front
/// entry reports `is_cancelled()`.
///
/// Stops at the first front entry that is not cancelled, or when the queue is
/// empty. Entries behind a non-cancelled front entry are not inspected.
fn prune_cancelled_front(state: &mut SingleThreadScheduledExecutorServiceState) {
    loop {
        // Evaluate the head in its own statement so the borrow from `peek`
        // has ended before `pop` mutates the queue.
        let head_is_cancelled =
            matches!(state.tasks.peek(), Some(head) if head.entry.is_cancelled());
        if !head_is_cancelled {
            break;
        }
        state.tasks.pop();
    }
}