Skip to main content

pingap_core/
service.rs

1// Copyright 2024-2025 Tree xie.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::{Error, LOG_TARGET};
16use async_trait::async_trait;
17use futures::future::join_all;
18use pingora::server::ShutdownWatch;
19use pingora::services::background::BackgroundService;
20use std::sync::atomic::{AtomicU32, Ordering};
21use std::time::{Duration, Instant};
22use tokio::time::interval;
23use tracing::{error, info};
24
/// Formats a duration as a compact string with one decimal place, using
/// seconds (`s`), minutes (`m`), hours (`h`) or days (`d`).
///
/// The unit is chosen on the value *as it will be printed*: a value that
/// would round up to a whole next unit (for example 59.97 seconds, which
/// prints as `60.0` with one decimal) is reported in that next unit (`1.0m`)
/// instead of as `60.0s`.
///
/// # Arguments
/// * `duration` - The duration to format.
///
/// # Returns
/// A string such as `1.5s`, `2.0m`, `3.2h` or `1.0d`.
fn duration_to_string(duration: Duration) -> String {
    // Values at or above these limits print as "60.0" / "24.0" when rounded
    // to one decimal place, so they are promoted to the next larger unit.
    const SIXTY_ROLLOVER: f64 = 59.95;
    const DAY_ROLLOVER_HOURS: f64 = 23.95;

    let secs = duration.as_secs_f64();
    let minutes = secs / 60.0;
    let hours = secs / 3600.0;

    if secs < SIXTY_ROLLOVER {
        format!("{secs:.1}s")
    } else if minutes < SIXTY_ROLLOVER {
        format!("{minutes:.1}m")
    } else if hours < DAY_ROLLOVER_HOURS {
        format!("{hours:.1}h")
    } else {
        format!("{:.1}d", secs / 86400.0)
    }
}
37
38/// A unified trait for any task that can be run in the background.
39#[async_trait]
40pub trait BackgroundTask: Sync + Send {
41    /// Executes a single iteration of the task.
42    ///
43    /// # Arguments
44    /// * `count` - The current execution cycle number.
45    ///
46    /// # Returns
47    /// * `Ok(true)` if the task performed meaningful work and should be logged as "success".
48    /// * `Ok(false)` if the task was skipped or did no work.
49    /// * `Err(Error)` if the task failed.
50    async fn execute(&self, count: u32) -> Result<bool, Error>;
51}
52
53/// A unified background service runner that can handle one or more named tasks.
54pub struct BackgroundTaskService {
55    name: String,
56    count: AtomicU32,
57    tasks: Vec<(String, Box<dyn BackgroundTask>)>, // Holds named tasks
58    interval: Duration,
59    immediately: bool,
60    initial_delay: Option<Duration>,
61}
62
63impl BackgroundTaskService {
64    /// Creates a new service to run multiple background tasks.
65    pub fn new(
66        name: &str,
67        interval: Duration,
68        tasks: Vec<(String, Box<dyn BackgroundTask>)>,
69    ) -> Self {
70        Self {
71            name: name.to_string(),
72            count: AtomicU32::new(0),
73            tasks,
74            interval,
75            immediately: false,
76            initial_delay: None,
77        }
78    }
79    /// A convenience constructor for creating a service with a single task.
80    pub fn new_single(
81        name: &str,
82        interval: Duration,
83        task_name: &str,
84        task: Box<dyn BackgroundTask>,
85    ) -> Self {
86        Self::new(name, interval, vec![(task_name.to_string(), task)])
87    }
88    /// Set whether the service should run immediately or wait for the interval
89    pub fn set_immediately(&mut self, immediately: bool) {
90        self.immediately = immediately;
91    }
92    pub fn set_initial_delay(&mut self, initial_delay: Option<Duration>) {
93        self.initial_delay = initial_delay;
94    }
95    /// Add a task to the service
96    /// This is useful for adding tasks to the service after it has been created
97    pub fn add_task(&mut self, task_name: &str, task: Box<dyn BackgroundTask>) {
98        self.tasks.push((task_name.to_string(), task));
99    }
100    pub fn name(&self) -> String {
101        self.name.clone()
102    }
103}
104
105#[async_trait]
106impl BackgroundService for BackgroundTaskService {
107    async fn start(&self, mut shutdown: ShutdownWatch) {
108        let task_names: Vec<_> =
109            self.tasks.iter().map(|(name, _)| name.as_str()).collect();
110        info!(
111            target: LOG_TARGET,
112            name = self.name,
113            tasks = task_names.join(", "),
114            interval = duration_to_string(self.interval),
115            "background service is running",
116        );
117
118        if let Some(initial_delay) = self.initial_delay {
119            tokio::time::sleep(initial_delay).await;
120        }
121        let mut period = interval(self.interval);
122        // The first tick fires immediately, which is often not desired. We skip it.
123        if !self.immediately {
124            period.tick().await;
125        }
126
127        loop {
128            tokio::select! {
129                _ = shutdown.changed() => {
130                    info!(
131                        target: LOG_TARGET,
132                        name = self.name,
133                        "background service is shutting down"
134                    );
135                    break;
136                }
137                _ = period.tick() => {
138                    let cycle_start = Instant::now();
139                    let count = self.count.fetch_add(1, Ordering::Relaxed);
140
141                    // Create a collection of futures to run all tasks concurrently.
142                    let futures = self.tasks.iter().map(|(task_name, task)| async move {
143                        let task_start = Instant::now();
144                        let result = task.execute(count).await;
145                        (task_name, result, task_start.elapsed())
146                    });
147
148                    // Await all tasks to complete in parallel.
149                    let results = join_all(futures).await;
150
151                    let mut success_tasks = Vec::new();
152                    let mut failed_tasks = Vec::new();
153
154                    // Process results for logging.
155                    for (task_name, result, elapsed) in results {
156                        match result {
157                            Ok(true) => {
158                                success_tasks.push(task_name.as_str());
159                                info!(
160                                    target: LOG_TARGET,
161                                    name = self.name,
162                                    task = task_name,
163                                    elapsed = duration_to_string(elapsed),
164                                    "background task executed successfully"
165                                );
166                            }
167                            Ok(false) => {
168                                // Task was skipped, do nothing.
169                            }
170                            Err(e) => {
171                                failed_tasks.push(task_name.as_str());
172                                error!(
173                                    target: LOG_TARGET,
174                                    name = self.name,
175                                    task = task_name,
176                                    error = %e,
177                                    "background task failed"
178                                );
179                            }
180                        }
181                    }
182
183                    if !success_tasks.is_empty() || !failed_tasks.is_empty() {
184                         info!(
185                            target: LOG_TARGET,
186                            name = self.name,
187                            cycle = count,
188                            success_count = success_tasks.len(),
189                            failed_count = failed_tasks.len(),
190                            total_elapsed = duration_to_string(cycle_start.elapsed()),
191                            "background service cycle completed",
192                        );
193                    }
194                }
195            }
196        }
197    }
198}
199
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use pretty_assertions::assert_eq;

    /// Minimal task that always reports that it did meaningful work.
    struct TestTask {}

    #[async_trait]
    impl BackgroundTask for TestTask {
        async fn execute(&self, _count: u32) -> Result<bool, Error> {
            Ok(true)
        }
    }

    /// Each unit boundary is reported in the larger unit.
    #[test]
    fn test_duration_to_string() {
        assert_eq!(duration_to_string(Duration::from_secs(1)), "1.0s");
        assert_eq!(duration_to_string(Duration::from_secs(60)), "1.0m");
        assert_eq!(duration_to_string(Duration::from_secs(3600)), "1.0h");
        assert_eq!(duration_to_string(Duration::from_secs(86400)), "1.0d");
    }

    /// Constructors, `add_task` and `set_immediately` populate the service
    /// as expected.
    #[test]
    fn new_background_task_service() {
        let mut service = BackgroundTaskService::new(
            "test",
            Duration::from_secs(1),
            vec![
                ("task1".to_string(), Box::new(TestTask {})),
                ("task2".to_string(), Box::new(TestTask {})),
            ],
        );
        service.add_task("task3", Box::new(TestTask {}));

        assert_eq!(service.name(), "test");
        assert_eq!(service.tasks.len(), 3);
        assert_eq!(service.tasks[0].0, "task1");
        assert_eq!(service.tasks[1].0, "task2");
        assert_eq!(service.tasks[2].0, "task3");
        assert!(!service.immediately);
        assert!(service.initial_delay.is_none());
        assert_eq!(service.count.load(Ordering::Relaxed), 0);

        let mut service = BackgroundTaskService::new_single(
            "test",
            Duration::from_secs(1),
            "task1",
            Box::new(TestTask {}),
        );
        service.set_immediately(true);
        assert_eq!(service.name(), "test");
        assert_eq!(service.tasks.len(), 1);
        assert_eq!(service.tasks[0].0, "task1");
        assert!(service.immediately);
    }

    /// `set_initial_delay` stores and clears the configured delay.
    #[test]
    fn set_initial_delay_updates_field() {
        let mut service = BackgroundTaskService::new_single(
            "test",
            Duration::from_secs(1),
            "task1",
            Box::new(TestTask {}),
        );
        assert!(service.initial_delay.is_none());

        service.set_initial_delay(Some(Duration::from_secs(5)));
        assert_eq!(service.initial_delay, Some(Duration::from_secs(5)));

        service.set_initial_delay(None);
        assert!(service.initial_delay.is_none());
    }
}