persistent_scheduler/core/periodic.rs
1use crate::core::error::BoxError;
2use crate::core::shutdown::shutdown_signal;
3use std::{future::Future, sync::Arc, time::Duration};
4use tokio::sync::RwLock;
5use tokio::time::sleep;
6use tracing::{error, info, warn};
7
8#[derive(Default)]
9pub struct PeriodicTask {
10 // Name of the periodic task.
11 name: String,
12 // A notification mechanism for shutdown signaling.
13 shutdown: Arc<RwLock<bool>>,
14}
15
16impl PeriodicTask {
17 /// Creates a new instance of `PeriodicTask`.
18 ///
19 /// # Arguments
20 ///
21 /// * `name`: A string slice that holds the name of the task.
22 ///
23 /// # Returns
24 ///
25 /// Returns a new instance of `PeriodicTask`.
26 pub fn new(name: &str) -> Self {
27 Self {
28 name: name.to_owned(),
29 shutdown: Arc::new(RwLock::new(false)),
30 }
31 }
32
33 /// Sends a shutdown signal to the task.
34 ///
35 /// This method notifies the task to stop executing.
36 pub async fn shutdown(self: Arc<Self>) {
37 let mut triggered = self.shutdown.write().await; // Acquire write lock to set shutdown state
38 *triggered = true; // Set the shutdown state to true
39 }
40
41 /// Starts the periodic task and sets up a signal handler for shutdown.
42 ///
43 /// # Arguments
44 ///
45 /// * `task`: An `Arc` of a function that returns a future to be executed periodically.
46 /// * `interval`: A `Duration` specifying how often the task should run.
47 ///
48 /// # Type Parameters
49 ///
50 /// * `F`: The type of the future returned by the task function.
51 ///
52 /// # Requirements
53 ///
54 /// The future must output a `Result` with a unit value or a `BoxError` on failure.
55 pub fn start_with_signal<F, P>(
56 self: Arc<Self>,
57 task: Arc<dyn Fn(Option<Arc<P>>) -> F + Send + Sync>,
58 param: Option<Arc<P>>,
59 interval: Duration,
60 ) where
61 F: Future<Output = Result<(), BoxError>> + Send + 'static,
62 P: Send + Sync + 'static,
63 {
64 // Clone the periodic task instance for the task runner.
65 let task_clone = Arc::clone(&self);
66 let param_clone = param.clone();
67 let task_runner = async move {
68 // Run the task periodically.
69 task_clone.run(task.clone(), param_clone, interval).await;
70 };
71
72 // Clone the periodic task instance for the signal handler.
73 let signal_clone = Arc::clone(&self);
74 let signal_handler = async move {
75 // Listen for a shutdown signal (Ctrl+C).
76 shutdown_signal().await;
77 info!("Shutting down periodic task '{}'...", &self.name);
78 // Notify the task to shut down.
79 signal_clone.shutdown().await;
80 };
81
82 // Spawn the task runner and signal handler as asynchronous tasks.
83 tokio::spawn(task_runner);
84 tokio::spawn(signal_handler);
85 }
86
87 /// Runs the periodic task at the specified interval.
88 ///
89 /// # Arguments
90 ///
91 /// * `task`: An `Arc` of a function that returns a future to be executed.
92 /// * `interval`: A `Duration` specifying how often the task should run.
93 ///
94 /// # Type Parameters
95 ///
96 /// * `F`: The type of the future returned by the task function.
97 async fn run<F, P>(
98 self: Arc<Self>,
99 task: Arc<dyn Fn(Option<Arc<P>>) -> F + Send + Sync>,
100 param: Option<Arc<P>>,
101 interval: Duration,
102 ) where
103 F: Future<Output = Result<(), BoxError>> + Send + 'static,
104 P: Send + Sync + 'static,
105 {
106 info!("task '{}' started", &self.name);
107 loop {
108 // Check if shutdown is triggered
109 let triggered = self.shutdown.read().await;
110 if *triggered {
111 break; // Exit loop if shutdown is triggered
112 }
113
114 let task_clone = Arc::clone(&task);
115 let param_clone = param.clone();
116 let task_future = tokio::spawn(async move {
117 task_clone(param_clone).await // Execute the task.
118 });
119
120 // Handle the result of the task execution.
121 match task_future.await {
122 Ok(Ok(_)) => {}
123 Ok(Err(e)) => {
124 warn!("task '{}' failed: {:?}", &self.name, e);
125 }
126 Err(e) if e.is_panic() => {
127 error!("Fatal: task '{}' encountered a panic.", &self.name);
128 }
129 Err(e) => {
130 error!("task '{}' failed unexpectedly: {:?}", &self.name, e);
131 }
132 }
133 sleep(interval).await;
134 }
135 info!("task '{}' stopped", &self.name);
136 }
137}