persistent_scheduler/core/periodic.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
use crate::core::error::BoxError;
use crate::core::shutdown::shutdown_signal;
use std::{future::Future, sync::Arc, time::Duration};
use tokio::sync::RwLock;
use tokio::time::sleep;
use tracing::{error, info, warn};
#[derive(Default)]
pub struct PeriodicTask {
// Name of the periodic task.
name: String,
// A notification mechanism for shutdown signaling.
shutdown: Arc<RwLock<bool>>,
}
impl PeriodicTask {
/// Creates a new instance of `PeriodicTask`.
///
/// # Arguments
///
/// * `name`: A string slice that holds the name of the task.
///
/// # Returns
///
/// Returns a new instance of `PeriodicTask`.
pub fn new(name: &str) -> Self {
Self {
name: name.to_owned(),
shutdown: Arc::new(RwLock::new(false)),
}
}
/// Sends a shutdown signal to the task.
///
/// This method notifies the task to stop executing.
pub async fn shutdown(self: Arc<Self>) {
let mut triggered = self.shutdown.write().await; // Acquire write lock to set shutdown state
*triggered = true; // Set the shutdown state to true
}
/// Starts the periodic task and sets up a signal handler for shutdown.
///
/// # Arguments
///
/// * `task`: An `Arc` of a function that returns a future to be executed periodically.
/// * `interval`: A `Duration` specifying how often the task should run.
///
/// # Type Parameters
///
/// * `F`: The type of the future returned by the task function.
///
/// # Requirements
///
/// The future must output a `Result` with a unit value or a `BoxError` on failure.
pub fn start_with_signal<F, P>(
self: Arc<Self>,
task: Arc<dyn Fn(Option<Arc<P>>) -> F + Send + Sync>,
param: Option<Arc<P>>,
interval: Duration,
) where
F: Future<Output = Result<(), BoxError>> + Send + 'static,
P: Send + Sync + 'static,
{
// Clone the periodic task instance for the task runner.
let task_clone = Arc::clone(&self);
let param_clone = param.clone();
let task_runner = async move {
// Run the task periodically.
task_clone.run(task.clone(), param_clone, interval).await;
};
// Clone the periodic task instance for the signal handler.
let signal_clone = Arc::clone(&self);
let signal_handler = async move {
// Listen for a shutdown signal (Ctrl+C).
shutdown_signal().await;
info!("Shutting down periodic task '{}'...", &self.name);
// Notify the task to shut down.
signal_clone.shutdown().await;
};
// Spawn the task runner and signal handler as asynchronous tasks.
tokio::spawn(task_runner);
tokio::spawn(signal_handler);
}
/// Runs the periodic task at the specified interval.
///
/// # Arguments
///
/// * `task`: An `Arc` of a function that returns a future to be executed.
/// * `interval`: A `Duration` specifying how often the task should run.
///
/// # Type Parameters
///
/// * `F`: The type of the future returned by the task function.
async fn run<F, P>(
self: Arc<Self>,
task: Arc<dyn Fn(Option<Arc<P>>) -> F + Send + Sync>,
param: Option<Arc<P>>,
interval: Duration,
) where
F: Future<Output = Result<(), BoxError>> + Send + 'static,
P: Send + Sync + 'static,
{
info!("task '{}' started", &self.name);
loop {
// Check if shutdown is triggered
let triggered = self.shutdown.read().await;
if *triggered {
break; // Exit loop if shutdown is triggered
}
let task_clone = Arc::clone(&task);
let param_clone = param.clone();
let task_future = tokio::spawn(async move {
task_clone(param_clone).await // Execute the task.
});
// Handle the result of the task execution.
match task_future.await {
Ok(Ok(_)) => {}
Ok(Err(e)) => {
warn!("task '{}' failed: {:?}", &self.name, e);
}
Err(e) if e.is_panic() => {
error!("Fatal: task '{}' encountered a panic.", &self.name);
}
Err(e) => {
error!("task '{}' failed unexpectedly: {:?}", &self.name, e);
}
}
sleep(interval).await;
}
info!("task '{}' stopped", &self.name);
}
}