async-timeouts 0.3.2

An instrument to start async tasks after timeouts
Documentation
use std::{
    future::Future,
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
    time::Duration,
};
use tokio::{
    sync::mpsc::{channel, Sender},
    time,
};

#[derive(Debug)]
enum Msg {
    UpdateTerm(Duration),
    Stop,
}

/// See more information about [reset](Timeout::reset()), [restart](Timeout::restart()), [stop](Timeout::stop()) and [finished](Timeout::finished()) methods
#[derive(Debug, Clone, Default)]
pub struct Timeout {
    tx: Option<Sender<Msg>>,
    running: Arc<AtomicBool>,
}

impl Timeout {
    /// Schedule the task inside closure after timeout
    pub async fn set<T>(
        mut timeout: Duration,
        f: impl Future<Output = T> + Send + 'static,
    ) -> Self {
        let (tx, mut rx) = channel(1);
        let running = Arc::new(AtomicBool::new(true));
        let running_inner = Arc::clone(&running);
        tokio::spawn(async move {
            loop {
                tokio::select! {
                    _ = time::sleep(timeout) => {
                        f.await;
                        running_inner.store(false, Ordering::Release);
                        break;
                    },
                    Some(msg) = rx.recv() => {
                        match msg {
                            Msg::UpdateTerm(duration) => {
                                timeout = duration;
                                continue;
                            },
                            Msg::Stop => break,
                        }
                    },
                    else => {
                        running_inner.store(false, Ordering::Release);
                        break
                    },
                }
            }
        });
        Timeout {
            tx: Some(tx),
            running,
        }
    }

    /// Reset timeout with a new value, i.e. to delay execution of your task. It will return
    /// an error, if the timeout timer is not running. For that cases use [restart](Timeout::restart()) insteed.
    /// ## Example
    /// ```rust
    /// use std::time::{Duration, Instant};
    /// use async_timeouts::Timeout;
    /// use tokio::sync::Notify;
    /// use std::sync::Arc;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let event = Arc::new(Notify::new());
    ///     let event_cloned = event.clone();
    ///     let delayed_task = Timeout::set(Duration::from_secs(3), async move {
    ///         println!("This message will be printed after 6 seconds");
    ///         event_cloned.notify_one();
    ///     }).await;
    ///     let _ = delayed_task.reset(Duration::from_secs(6)).await;
    ///     event.notified().await;
    /// }
    /// ```
    pub async fn reset(&self, timeout: Duration) -> std::io::Result<()> {
        if let Some(timer) = self.tx.as_ref() {
            if let false = self.finished() {
                let _ = timer.send(Msg::UpdateTerm(timeout)).await;
                return Ok(());
            }
        }
        Err(std::io::Error::other(
            "Could not reset timeout: it is not running",
        ))
    }

    /// Restart timeout with a new or previous task. `Timeout` instance will be reused if the
    /// previous timer is over, or the previous task will be stoped ahead of time if not.
    /// ## Example
    /// ```rust
    /// use std::time::{Duration, Instant};
    /// use async_timeouts::Timeout;
    /// use async_channel::unbounded;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let timer = Instant::now();
    ///     let (task_tx, task_rx) = unbounded::<i32>();
    ///     
    ///     let mut delayed_task = {
    ///         let task_tx = task_tx.clone();
    ///         Timeout::set(Duration::from_secs(3), async move {
    ///             let _ = task_tx.send(1).await;
    ///         }).await
    ///     };
    ///     
    ///     delayed_task.restart(Duration::from_secs(6), async move {
    ///         let _ = task_tx.send(2).await;
    ///     }).await;
    ///     
    ///     if let Ok(msg) = task_rx.recv().await {
    ///         println!("Task is finished {msg}");
    ///     }
    ///     assert!(timer.elapsed().as_secs() >= 6);
    /// }
    /// ```
    pub async fn restart<T>(
        &mut self,
        timeout: Duration,
        f: impl Future<Output = T> + Send + 'static,
    ) {
        self.stop().await;
        self.tx = Timeout::set(timeout, f).await.tx;
        self.running.store(true, Ordering::Release);
    }

    /// Stop the timer before your task will be executed. The method will do nothig if you do not run
    /// timer inside `Timeout` by using `Timeout::default()`.
    /// ## Example
    /// ```rust
    /// use std::time::Duration;
    /// use async_timeouts::Timeout;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let mut delayed_task = Timeout::set(Duration::from_secs(3), async move {
    ///         println!("This message will never be printed and event will never be notified.");
    ///     }).await;
    ///     assert!(!delayed_task.finished());
    ///     delayed_task.stop().await;
    ///     assert!(delayed_task.finished());
    /// }
    /// ```
    pub async fn stop(&mut self) {
        if let Some(timer) = self.tx.as_ref() {
            let _ = timer.send(Msg::Stop).await;
            self.running.store(false, Ordering::Release);
        }
    }

    /// Check if the timer of the task is over or not.
    /// ## Example
    /// ```rust,no_run
    /// use std::time::Duration;
    /// use async_timeouts::Timeout;
    /// use tokio::sync::Notify;
    /// use std::sync::Arc;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let event = Arc::new(Notify::new());
    ///     
    ///     let event_cloned = event.clone();
    ///     let delayed_task = Timeout::set(Duration::from_secs(3), async move {
    ///         event_cloned.notify_one();
    ///     }).await;
    ///     
    ///     assert!(!delayed_task.finished());
    ///     println!("delayed_task is not finished yet: {}", delayed_task.finished());
    ///     event.notified().await;
    ///     assert!(delayed_task.finished());
    ///     println!("delayed_task is finished: {}", delayed_task.finished());
    /// }
    /// ```
    pub fn finished(&self) -> bool {
        match self.tx.as_ref().is_none() {
            true => true,
            false => match self.running.load(Ordering::Acquire) {
                true => false,
                false => true,
            },
        }
    }
}