persistent_scheduler/core/
periodic.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
use crate::core::error::BoxError;
use crate::core::shutdown::shutdown_signal;
use std::{future::Future, sync::Arc, time::Duration};
use tokio::sync::RwLock;
use tokio::time::sleep;
use tracing::{error, info, warn};

#[derive(Default)]
pub struct PeriodicTask {
    // Name of the periodic task.
    name: String,
    // A notification mechanism for shutdown signaling.
    shutdown: Arc<RwLock<bool>>,
}

impl PeriodicTask {
    /// Creates a new instance of `PeriodicTask`.
    ///
    /// # Arguments
    ///
    /// * `name`: A string slice that holds the name of the task.
    ///
    /// # Returns
    ///
    /// Returns a new instance of `PeriodicTask`.
    pub fn new(name: &str) -> Self {
        Self {
            name: name.to_owned(),
            shutdown: Arc::new(RwLock::new(false)),
        }
    }

    /// Sends a shutdown signal to the task.
    ///
    /// This method notifies the task to stop executing.
    pub async fn shutdown(self: Arc<Self>) {
        let mut triggered = self.shutdown.write().await; // Acquire write lock to set shutdown state
        *triggered = true; // Set the shutdown state to true
    }

    /// Starts the periodic task and sets up a signal handler for shutdown.
    ///
    /// # Arguments
    ///
    /// * `task`: An `Arc` of a function that returns a future to be executed periodically.
    /// * `interval`: A `Duration` specifying how often the task should run.
    ///
    /// # Type Parameters
    ///
    /// * `F`: The type of the future returned by the task function.
    ///
    /// # Requirements
    ///
    /// The future must output a `Result` with a unit value or a `BoxError` on failure.
    pub fn start_with_signal<F, P>(
        self: Arc<Self>,
        task: Arc<dyn Fn(Option<Arc<P>>) -> F + Send + Sync>,
        param: Option<Arc<P>>,
        interval: Duration,
    ) where
        F: Future<Output = Result<(), BoxError>> + Send + 'static,
        P: Send + Sync + 'static,
    {
        // Clone the periodic task instance for the task runner.
        let task_clone = Arc::clone(&self);
        let param_clone = param.clone();
        let task_runner = async move {
            // Run the task periodically.
            task_clone.run(task.clone(), param_clone, interval).await;
        };

        // Clone the periodic task instance for the signal handler.
        let signal_clone = Arc::clone(&self);
        let signal_handler = async move {
            // Listen for a shutdown signal (Ctrl+C).
            shutdown_signal().await;
            info!("Shutting down periodic task '{}'...", &self.name);
            // Notify the task to shut down.
            signal_clone.shutdown().await;
        };

        // Spawn the task runner and signal handler as asynchronous tasks.
        tokio::spawn(task_runner);
        tokio::spawn(signal_handler);
    }

    /// Runs the periodic task at the specified interval.
    ///
    /// # Arguments
    ///
    /// * `task`: An `Arc` of a function that returns a future to be executed.
    /// * `interval`: A `Duration` specifying how often the task should run.
    ///
    /// # Type Parameters
    ///
    /// * `F`: The type of the future returned by the task function.
    async fn run<F, P>(
        self: Arc<Self>,
        task: Arc<dyn Fn(Option<Arc<P>>) -> F + Send + Sync>,
        param: Option<Arc<P>>,
        interval: Duration,
    ) where
        F: Future<Output = Result<(), BoxError>> + Send + 'static,
        P: Send + Sync + 'static,
    {
        info!("task '{}' started", &self.name);
        loop {
            // Check if shutdown is triggered
            let triggered = self.shutdown.read().await;
            if *triggered {
                break; // Exit loop if shutdown is triggered
            }

            let task_clone = Arc::clone(&task);
            let param_clone = param.clone();
            let task_future = tokio::spawn(async move {
                task_clone(param_clone).await // Execute the task.
            });

            // Handle the result of the task execution.
            match task_future.await {
                Ok(Ok(_)) => {}
                Ok(Err(e)) => {
                    warn!("task '{}' failed: {:?}", &self.name, e);
                }
                Err(e) if e.is_panic() => {
                    error!("Fatal: task '{}' encountered a panic.", &self.name);
                }
                Err(e) => {
                    error!("task '{}' failed unexpectedly: {:?}", &self.name, e);
                }
            }
            sleep(interval).await;
        }
        info!("task '{}' stopped", &self.name);
    }
}