persistent_scheduler/core/
periodic.rs

1use crate::core::error::BoxError;
2use crate::core::shutdown::shutdown_signal;
3use std::{future::Future, sync::Arc, time::Duration};
4use tokio::sync::RwLock;
5use tokio::time::sleep;
6use tracing::{error, info, warn};
7
/// A named task that is executed repeatedly at a fixed interval until a
/// shutdown is requested.
///
/// The struct itself only carries the task's name and a shared shutdown flag;
/// the work to execute and the interval are supplied when the task is started.
#[derive(Default)]
pub struct PeriodicTask {
    /// Name of the periodic task; included in every log message it emits.
    name: String,
    /// Shared shutdown flag: starts as `false` and is set to `true` by
    /// `shutdown`. The run loop reads it before each execution and stops
    /// once it is `true`.
    shutdown: Arc<RwLock<bool>>,
}
15
16impl PeriodicTask {
17    /// Creates a new instance of `PeriodicTask`.
18    ///
19    /// # Arguments
20    ///
21    /// * `name`: A string slice that holds the name of the task.
22    ///
23    /// # Returns
24    ///
25    /// Returns a new instance of `PeriodicTask`.
26    pub fn new(name: &str) -> Self {
27        Self {
28            name: name.to_owned(),
29            shutdown: Arc::new(RwLock::new(false)),
30        }
31    }
32
33    /// Sends a shutdown signal to the task.
34    ///
35    /// This method notifies the task to stop executing.
36    pub async fn shutdown(self: Arc<Self>) {
37        let mut triggered = self.shutdown.write().await; // Acquire write lock to set shutdown state
38        *triggered = true; // Set the shutdown state to true
39    }
40
41    /// Starts the periodic task and sets up a signal handler for shutdown.
42    ///
43    /// # Arguments
44    ///
45    /// * `task`: An `Arc` of a function that returns a future to be executed periodically.
46    /// * `interval`: A `Duration` specifying how often the task should run.
47    ///
48    /// # Type Parameters
49    ///
50    /// * `F`: The type of the future returned by the task function.
51    ///
52    /// # Requirements
53    ///
54    /// The future must output a `Result` with a unit value or a `BoxError` on failure.
55    pub fn start_with_signal<F, P>(
56        self: Arc<Self>,
57        task: Arc<dyn Fn(Option<Arc<P>>) -> F + Send + Sync>,
58        param: Option<Arc<P>>,
59        interval: Duration,
60    ) where
61        F: Future<Output = Result<(), BoxError>> + Send + 'static,
62        P: Send + Sync + 'static,
63    {
64        // Clone the periodic task instance for the task runner.
65        let task_clone = Arc::clone(&self);
66        let param_clone = param.clone();
67        let task_runner = async move {
68            // Run the task periodically.
69            task_clone.run(task.clone(), param_clone, interval).await;
70        };
71
72        // Clone the periodic task instance for the signal handler.
73        let signal_clone = Arc::clone(&self);
74        let signal_handler = async move {
75            // Listen for a shutdown signal (Ctrl+C).
76            shutdown_signal().await;
77            info!("Shutting down periodic task '{}'...", &self.name);
78            // Notify the task to shut down.
79            signal_clone.shutdown().await;
80        };
81
82        // Spawn the task runner and signal handler as asynchronous tasks.
83        tokio::spawn(task_runner);
84        tokio::spawn(signal_handler);
85    }
86
87    /// Runs the periodic task at the specified interval.
88    ///
89    /// # Arguments
90    ///
91    /// * `task`: An `Arc` of a function that returns a future to be executed.
92    /// * `interval`: A `Duration` specifying how often the task should run.
93    ///
94    /// # Type Parameters
95    ///
96    /// * `F`: The type of the future returned by the task function.
97    async fn run<F, P>(
98        self: Arc<Self>,
99        task: Arc<dyn Fn(Option<Arc<P>>) -> F + Send + Sync>,
100        param: Option<Arc<P>>,
101        interval: Duration,
102    ) where
103        F: Future<Output = Result<(), BoxError>> + Send + 'static,
104        P: Send + Sync + 'static,
105    {
106        info!("task '{}' started", &self.name);
107        loop {
108            // Check if shutdown is triggered
109            let triggered = self.shutdown.read().await;
110            if *triggered {
111                break; // Exit loop if shutdown is triggered
112            }
113
114            let task_clone = Arc::clone(&task);
115            let param_clone = param.clone();
116            let task_future = tokio::spawn(async move {
117                task_clone(param_clone).await // Execute the task.
118            });
119
120            // Handle the result of the task execution.
121            match task_future.await {
122                Ok(Ok(_)) => {}
123                Ok(Err(e)) => {
124                    warn!("task '{}' failed: {:?}", &self.name, e);
125                }
126                Err(e) if e.is_panic() => {
127                    error!("Fatal: task '{}' encountered a panic.", &self.name);
128                }
129                Err(e) => {
130                    error!("task '{}' failed unexpectedly: {:?}", &self.name, e);
131                }
132            }
133            sleep(interval).await;
134        }
135        info!("task '{}' stopped", &self.name);
136    }
137}