pingap_core/
service.rs

1// Copyright 2024-2025 Tree xie.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::{Error, LOG_CATEGORY};
16use async_trait::async_trait;
17use futures::future::BoxFuture;
18use pingora::server::ShutdownWatch;
19use pingora::services::background::BackgroundService;
20use std::sync::atomic::{AtomicU32, Ordering};
21use std::time::{Duration, SystemTime};
22use tokio::time::interval;
23use tracing::{error, info};
24
/// Factory for one periodic background task.
///
/// Note that this is not a future itself: it is a boxed `Sync + Send` closure
/// that is called with a `u32` counter and returns a fresh boxed future
/// resolving to `Result<bool, Error>`. [`SimpleServiceTask`] calls it once per
/// interval tick and treats `Ok(true)` as "the task did some work".
pub type SimpleServiceTaskFuture =
    Box<dyn Fn(u32) -> BoxFuture<'static, Result<bool, Error>> + Sync + Send>;
29
/// A named group of background tasks that are all driven by one shared
/// interval timer.
pub struct SimpleServiceTask {
    /// Name of the service; attached to every log record it emits.
    name: String,
    /// Tick counter: bumped once per interval tick, the pre-increment value is
    /// handed to every task of that tick.
    count: AtomicU32,
    /// `(task name, task factory)` pairs, in the order they are executed.
    tasks: Vec<(String, SimpleServiceTaskFuture)>,
    /// Period of the interval timer that triggers the tasks.
    interval: Duration,
}
37
38/// Creates a new SimpleServiceTask with the specified name, interval, and collection of tasks.
39/// This service manages multiple background tasks that run concurrently at fixed intervals.
40///
41/// # Arguments
42/// * `name` - Identifier for this service instance, used in logging
43/// * `interval` - Duration between task executions (e.g., Duration::from_secs(60) for minute intervals)
44/// * `tasks` - Vector of named tasks to execute periodically, where each task is a tuple of (name, task_function)
45pub fn new_simple_service_task(
46    name: &str,
47    interval: Duration,
48    tasks: Vec<(String, SimpleServiceTaskFuture)>,
49) -> SimpleServiceTask {
50    SimpleServiceTask {
51        name: name.to_string(),
52        count: AtomicU32::new(0),
53        tasks,
54        interval,
55    }
56}
57
/// Renders a duration as a short human-readable string with one decimal place.
///
/// The largest unit whose threshold the duration reaches is used:
/// seconds (`s`) below one minute, minutes (`m`) below one hour,
/// hours (`h`) below one day, and days (`d`) otherwise.
fn duration_to_string(duration: Duration) -> String {
    const MINUTE: f64 = 60.0;
    const HOUR: f64 = 3600.0;
    const DAY: f64 = 86400.0;

    let total = duration.as_secs_f64();
    // The unit is picked from the unrounded value, so e.g. 59.99s is still
    // reported in seconds (and prints as "60.0s").
    let (value, unit) = match total {
        t if t < MINUTE => (t, "s"),
        t if t < HOUR => (t / MINUTE, "m"),
        t if t < DAY => (t / HOUR, "h"),
        t => (t / DAY, "d"),
    };
    format!("{:.1}{}", value, unit)
}
70
71#[async_trait]
72impl BackgroundService for SimpleServiceTask {
73    /// Starts the background service, executing all tasks at the specified interval
74    /// until shutdown is signaled or tasks complete. Each task execution is logged
75    /// with timing information and success/failure status.
76    ///
77    /// # Arguments
78    /// * `shutdown` - Watch channel for shutdown coordination
79    ///
80    /// # Task Execution
81    /// - Tasks are executed sequentially in the order they were added
82    /// - Each task receives a counter value that increments with each interval
83    /// - Failed tasks are logged with error details but don't stop the service
84    /// - Task execution times are logged for monitoring purposes
85    ///
86    /// # Shutdown Behavior
87    /// - Service stops gracefully when shutdown signal is received
88    /// - Current task iteration completes before shutdown
89    async fn start(&self, mut shutdown: ShutdownWatch) {
90        let period_human = duration_to_string(self.interval);
91        let task_names: Vec<String> =
92            self.tasks.iter().map(|item| item.0.clone()).collect();
93        info!(
94            category = LOG_CATEGORY,
95            name = self.name,
96            tasks = task_names.join(","),
97            interval = period_human.to_string(),
98            "background service is running",
99        );
100
101        let mut period = interval(self.interval);
102        loop {
103            tokio::select! {
104                // Handle shutdown signal
105                _ = shutdown.changed() => {
106                    break;
107                }
108                // Execute tasks on each interval tick
109                _ = period.tick() => {
110                    let now = SystemTime::now();
111                    let count = self.count.fetch_add(1, Ordering::Relaxed);
112                    let mut success_tasks = vec![];
113                    let mut fail_tasks = vec![];
114                    // Execute each task and track results
115                    for (task_name, task) in self.tasks.iter() {
116                        let task_start = SystemTime::now();
117                        match task(count).await {
118                           Err(e)  => {
119                               fail_tasks.push(task_name.to_string());
120                               error!(
121                                   category = LOG_CATEGORY,
122                                   name = self.name,
123                                   task = task_name,
124                                   error = %e,
125                               );
126                           }
127                           Ok(executed) => {
128                               if executed {
129                                   success_tasks.push(task_name.to_string());
130                                   info!(
131                                       category = LOG_CATEGORY,
132                                       name = self.name,
133                                       task = task_name,
134                                       elapsed = format!(
135                                           "{}ms",
136                                           task_start.elapsed().unwrap_or_default().as_millis()
137                                       ),
138                                   );
139                               }
140                           }
141                        };
142                    }
143                    if !success_tasks.is_empty() || !fail_tasks.is_empty() {
144                        info!(
145                            category = LOG_CATEGORY,
146                            name = self.name,
147                            success_tasks = success_tasks.join(","),
148                            fails = fail_tasks.len(),
149                            fail_tasks = fail_tasks.join(","),
150                            elapsed = format!(
151                                "{}ms",
152                                now.elapsed().unwrap_or_default().as_millis()
153                            ),
154                        );
155                    }
156                }
157            }
158        }
159    }
160}
161
162// Trait defining interface for individual service tasks
163#[async_trait]
164pub trait ServiceTask: Sync + Send {
165    /// Executes a single iteration of the task. This method is called repeatedly
166    /// at the specified interval until shutdown or task completion.
167    ///
168    /// # Returns
169    /// * `None` or `Some(false)` - Task completed normally, continue running the service
170    /// * `Some(true)` - Task completed and requests service shutdown
171    async fn run(&self) -> Option<bool>;
172
173    /// Returns a human-readable description of the task for logging and monitoring.
174    /// Implementations should provide meaningful descriptions of their purpose.
175    ///
176    /// # Returns
177    /// * String describing the task's purpose, default is "unknown"
178    fn description(&self) -> String {
179        "unknown".to_string()
180    }
181}
182
/// Background service that drives a single boxed [`ServiceTask`] on a fixed
/// interval.
pub struct CommonServiceTask {
    /// The task that is run on every tick.
    task: Box<dyn ServiceTask>,
    /// Time between executions; a value below one second makes the service
    /// run the task only once.
    interval: Duration,
}
188
189impl CommonServiceTask {
190    /// Creates a new CommonServiceTask that wraps a single task implementation.
191    /// This is useful for simpler cases where only one recurring task is needed.
192    ///
193    /// # Arguments
194    /// * `interval` - Duration between task executions
195    /// * `task` - Implementation of ServiceTask to execute
196    ///
197    /// # Special Cases
198    /// - If interval is less than 1 second, task runs only once
199    /// - Task can signal completion via return value to stop service
200    pub fn new(interval: Duration, task: impl ServiceTask + 'static) -> Self {
201        Self {
202            task: Box::new(task),
203            interval,
204        }
205    }
206}
207
208#[async_trait]
209impl BackgroundService for CommonServiceTask {
210    /// Starts the background service, executing the wrapped task at the specified interval.
211    /// The service runs until one of the following conditions is met:
212    /// - Shutdown signal is received
213    /// - Task returns Some(true) indicating completion
214    /// - Interval is less than 1 second (runs once and stops)
215    ///
216    /// # Arguments
217    /// * `shutdown` - Watch channel for shutdown coordination
218    ///
219    /// # Logging
220    /// - Service start is logged with task description and interval
221    /// - Each task execution is logged with elapsed time
222    /// - Task completion status is logged
223    ///
224    /// # Performance Considerations
225    /// - Task execution time is measured and logged
226    /// - Long-running tasks may delay the next interval
227    async fn start(&self, mut shutdown: ShutdownWatch) {
228        let period_human = duration_to_string(self.interval);
229        // if interval is less than 1s
230        // the task should only run once
231        let once = self.interval.as_millis() < 1000;
232
233        info!(
234            category = LOG_CATEGORY,
235            description = self.task.description(),
236            interval = period_human.to_string(),
237            "background service is running",
238        );
239
240        let mut period = interval(self.interval);
241        loop {
242            tokio::select! {
243                _ = shutdown.changed() => {
244                    break;
245                }
246                _ = period.tick() => {
247                    let now = SystemTime::now();
248                    let done = self.task.run().await.unwrap_or_default();
249                    info!(
250                        category = LOG_CATEGORY,
251                        done,
252                        elapsed = format!(
253                            "{}ms",
254                            now.elapsed().unwrap_or_default().as_millis()
255                        ),
256                        description = self.task.description(),
257                    );
258                    if once || done {
259                        break;
260                    }
261                }
262            }
263        }
264    }
265}
266
#[cfg(test)]
mod tests {
    use super::*;
    use pretty_assertions::assert_eq;

    /// The constructor stores name, interval and task list as given.
    #[test]
    fn test_simple_service_task() {
        let period = Duration::from_secs(1);
        let service = new_simple_service_task("test", period, Vec::new());

        assert_eq!(service.name, "test");
        assert_eq!(service.interval, period);
        assert_eq!(service.tasks.len(), 0);
    }

    /// Each unit threshold switches to the next larger unit.
    #[test]
    fn test_duration_to_string() {
        let cases: [(u64, &str); 4] =
            [(1, "1.0s"), (60, "1.0m"), (3600, "1.0h"), (86400, "1.0d")];
        for &(secs, expected) in cases.iter() {
            assert_eq!(duration_to_string(Duration::from_secs(secs)), expected);
        }
    }
}