use super::{Trigger, TriggerError};
use duration_string::DurationString;
use log::info;
use std::{
sync::mpsc::Sender,
thread::sleep,
time::{Duration, Instant},
};
use thiserror::Error;
pub struct ScheduleTrigger {
duration: Duration,
timeout: Option<Duration>,
}
#[derive(Debug, Error)]
pub enum ScheduleError {
#[error("cannot trigger changes, receiver hang up")]
ReceiverHangup(#[from] std::sync::mpsc::SendError<Option<()>>),
}
impl From<ScheduleError> for TriggerError {
fn from(val: ScheduleError) -> Self {
match val {
ScheduleError::ReceiverHangup(s) => TriggerError::ReceiverHangup(s),
}
}
}
impl ScheduleTrigger {
pub fn new(duration: Duration) -> Self {
Self {
duration,
timeout: None,
}
}
pub fn new_with_timeout(duration: Duration, timeout: Duration) -> Self {
Self {
duration,
timeout: Some(timeout),
}
}
pub fn step(
&self,
tx: Sender<Option<()>>,
final_timeout: Option<Instant>,
) -> Result<bool, ScheduleError> {
let next_check = Instant::now() + self.duration;
tx.send(Some(()))?;
if let Some(final_timeout) = final_timeout {
if next_check > final_timeout {
let until_final_timeout = final_timeout - Instant::now();
sleep(until_final_timeout);
return Ok(false);
}
}
let until_next_check = next_check - Instant::now();
sleep(until_next_check);
Ok(true)
}
}
impl Trigger for ScheduleTrigger {
fn listen(&self, tx: Sender<Option<()>>) -> Result<(), TriggerError> {
let final_timeout = self.timeout.map(|t| Instant::now() + t);
info!(
"Starting schedule in every {}.",
DurationString::new(self.duration)
);
loop {
let should_continue = self.step(tx.clone(), final_timeout)?;
if !should_continue {
break;
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::triggers::TriggerError;
use std::{sync::mpsc, time::Instant};
#[test]
fn it_should_be_created_from_duration() {
let trigger = ScheduleTrigger::new(Duration::from_millis(100));
assert_eq!(Duration::from_millis(100), trigger.duration);
assert_eq!(None, trigger.timeout);
}
#[test]
fn it_should_be_created_from_duration_and_timeout() {
let trigger = ScheduleTrigger::new_with_timeout(
Duration::from_millis(100),
Duration::from_millis(200),
);
assert_eq!(Duration::from_millis(100), trigger.duration);
assert_eq!(Some(Duration::from_millis(200)), trigger.timeout);
}
#[test]
fn it_should_trigger_every_100_ms() -> Result<(), TriggerError> {
let trigger = ScheduleTrigger::new(Duration::from_millis(100));
let (tx, rx) = mpsc::channel::<Option<()>>();
for _ in 0..5 {
let start = Instant::now();
let should_continue = trigger.step(tx.clone(), None)?;
assert!(should_continue);
let _ = rx.recv().unwrap();
let diff = start.elapsed();
assert!(diff >= Duration::from_millis(95));
assert!(diff <= Duration::from_millis(105));
}
Ok(())
}
#[test]
fn it_should_not_continue_after_the_timeout() -> Result<(), TriggerError> {
let trigger = ScheduleTrigger::new(Duration::from_millis(100));
let (tx, _rx) = mpsc::channel::<Option<()>>();
let final_timeout = Instant::now() + Duration::from_millis(350);
for i in 0..5 {
let should_continue = trigger.step(tx.clone(), Some(final_timeout))?;
if i < 3 {
assert!(should_continue)
} else {
assert!(!should_continue)
};
if i == 3 {
assert!(final_timeout.elapsed() < Duration::from_millis(10));
}
}
Ok(())
}
#[test]
fn it_should_not_trigger_on_a_send_error() {
let trigger = ScheduleTrigger::new(Duration::from_millis(100));
let (tx, rx) = mpsc::channel::<Option<()>>();
drop(rx);
let final_timeout = Instant::now() + Duration::from_millis(350);
let result = trigger.step(tx.clone(), Some(final_timeout));
assert!(
matches!(result, Err(ScheduleError::ReceiverHangup(_)),),
"{result:?} should be ReceiverHangup"
);
}
}