use std::future::Future;
use std::pin::pin;
use std::time::Duration;
use std::time::Instant;
use super::execute_or_shutdown;
use crate::MakeDelay;
use crate::Spawn;
use crate::debug;
use crate::far_future;
use crate::info;
use crate::make_instant_from;
use crate::make_instant_from_now;
pub trait SimpleAction: Send + 'static {
fn name(&self) -> &str;
fn run(&mut self) -> impl Future<Output = ()> + Send;
}
pub trait SimpleActionExt: SimpleAction {
fn schedule_with_fixed_delay<Fut, S, D>(
mut self,
is_shutdown: Fut,
spawn: &S,
make_delay: D,
initial_delay: Option<Duration>,
delay: Duration,
) where
Self: Sized,
Fut: Future<Output = ()> + Send + 'static,
S: Spawn,
D: MakeDelay + Send + 'static,
{
spawn.spawn(async move {
info!(
"start scheduled task {} with fixed delay {:?} and initial delay {:?}",
self.name(),
delay,
initial_delay
);
let mut is_shutdown = pin!(is_shutdown);
'schedule: {
if let Some(initial_delay) = initial_delay {
if initial_delay > Duration::ZERO
&& execute_or_shutdown(make_delay.delay(initial_delay), &mut is_shutdown)
.await
.is_break()
{
break 'schedule;
}
}
loop {
debug!("executing scheduled task {}", self.name());
if execute_or_shutdown(self.run(), &mut is_shutdown)
.await
.is_break()
{
break;
};
if execute_or_shutdown(make_delay.delay(delay), &mut is_shutdown)
.await
.is_break()
{
break;
}
}
}
info!("scheduled task {} is shutdown", self.name());
});
}
fn schedule_at_fixed_rate<Fut, S, D>(
mut self,
is_shutdown: Fut,
spawn: &S,
make_delay: D,
initial_delay: Option<Duration>,
period: Duration,
) where
Self: Sized,
Fut: Future<Output = ()> + Send + 'static,
S: Spawn,
D: MakeDelay + Send + 'static,
{
assert!(period > Duration::new(0, 0), "`period` must be non-zero.");
fn calculate_next_on_miss(next: Instant, period: Duration) -> Instant {
let now = Instant::now();
if now.saturating_duration_since(next) <= Duration::from_millis(5) {
make_instant_from(next, period)
} else {
match now.checked_add(period) {
None => far_future(),
Some(instant) => {
let delta = (now - next).as_nanos() % period.as_nanos();
let delta: u64 = delta
.try_into()
.unwrap_or_else(|_| panic!("too much time has elapsed: {delta}"));
let delta = Duration::from_nanos(delta);
instant - delta
}
}
}
}
spawn.spawn(async move {
info!(
"start scheduled task {} at fixed rate {:?} with initial delay {:?}",
self.name(),
period,
initial_delay
);
let mut is_shutdown = pin!(is_shutdown);
'schedule: {
let mut next = Instant::now();
if let Some(initial_delay) = initial_delay {
if initial_delay > Duration::ZERO {
next = make_instant_from_now(initial_delay);
if execute_or_shutdown(make_delay.delay_until(next), &mut is_shutdown)
.await
.is_break()
{
break 'schedule;
}
}
}
loop {
debug!("executing scheduled task {}", self.name());
if execute_or_shutdown(self.run(), &mut is_shutdown)
.await
.is_break()
{
break;
};
next = calculate_next_on_miss(next, period);
if execute_or_shutdown(make_delay.delay_until(next), &mut is_shutdown)
.await
.is_break()
{
break;
}
}
}
info!("scheduled task {} is shutdown", self.name());
});
}
}
impl<T: SimpleAction> SimpleActionExt for T {}