1use super::{Error, LOG_TARGET};
16use async_trait::async_trait;
17use futures::future::join_all;
18use pingora::server::ShutdownWatch;
19use pingora::services::background::BackgroundService;
20use std::sync::atomic::{AtomicU32, Ordering};
21use std::time::{Duration, Instant};
22use tokio::time::interval;
23use tracing::{error, info};
24
/// Formats a duration as a short human-readable string with one decimal place.
///
/// The unit is picked from the magnitude of the duration: seconds (`s`) below
/// one minute, minutes (`m`) below one hour, hours (`h`) below one day, and
/// days (`d`) otherwise.
fn duration_to_string(duration: Duration) -> String {
    const MINUTE: f64 = 60.0;
    const HOUR: f64 = 3600.0;
    const DAY: f64 = 86400.0;

    match duration.as_secs_f64() {
        total if total < MINUTE => format!("{total:.1}s"),
        total if total < HOUR => format!("{:.1}m", total / MINUTE),
        total if total < DAY => format!("{:.1}h", total / HOUR),
        total => format!("{:.1}d", total / DAY),
    }
}
37
38#[async_trait]
40pub trait BackgroundTask: Sync + Send {
41 async fn execute(&self, count: u32) -> Result<bool, Error>;
51}
52
53pub struct BackgroundTaskService {
55 name: String,
56 count: AtomicU32,
57 tasks: Vec<(String, Box<dyn BackgroundTask>)>, interval: Duration,
59 immediately: bool,
60 initial_delay: Option<Duration>,
61}
62
63impl BackgroundTaskService {
64 pub fn new(
66 name: &str,
67 interval: Duration,
68 tasks: Vec<(String, Box<dyn BackgroundTask>)>,
69 ) -> Self {
70 Self {
71 name: name.to_string(),
72 count: AtomicU32::new(0),
73 tasks,
74 interval,
75 immediately: false,
76 initial_delay: None,
77 }
78 }
79 pub fn new_single(
81 name: &str,
82 interval: Duration,
83 task_name: &str,
84 task: Box<dyn BackgroundTask>,
85 ) -> Self {
86 Self::new(name, interval, vec![(task_name.to_string(), task)])
87 }
88 pub fn set_immediately(&mut self, immediately: bool) {
90 self.immediately = immediately;
91 }
92 pub fn set_initial_delay(&mut self, initial_delay: Option<Duration>) {
93 self.initial_delay = initial_delay;
94 }
95 pub fn add_task(&mut self, task_name: &str, task: Box<dyn BackgroundTask>) {
98 self.tasks.push((task_name.to_string(), task));
99 }
100 pub fn name(&self) -> String {
101 self.name.clone()
102 }
103}
104
105#[async_trait]
106impl BackgroundService for BackgroundTaskService {
107 async fn start(&self, mut shutdown: ShutdownWatch) {
108 let task_names: Vec<_> =
109 self.tasks.iter().map(|(name, _)| name.as_str()).collect();
110 info!(
111 target: LOG_TARGET,
112 name = self.name,
113 tasks = task_names.join(", "),
114 interval = duration_to_string(self.interval),
115 "background service is running",
116 );
117
118 if let Some(initial_delay) = self.initial_delay {
119 tokio::time::sleep(initial_delay).await;
120 }
121 let mut period = interval(self.interval);
122 if !self.immediately {
124 period.tick().await;
125 }
126
127 loop {
128 tokio::select! {
129 _ = shutdown.changed() => {
130 info!(
131 target: LOG_TARGET,
132 name = self.name,
133 "background service is shutting down"
134 );
135 break;
136 }
137 _ = period.tick() => {
138 let cycle_start = Instant::now();
139 let count = self.count.fetch_add(1, Ordering::Relaxed);
140
141 let futures = self.tasks.iter().map(|(task_name, task)| async move {
143 let task_start = Instant::now();
144 let result = task.execute(count).await;
145 (task_name, result, task_start.elapsed())
146 });
147
148 let results = join_all(futures).await;
150
151 let mut success_tasks = Vec::new();
152 let mut failed_tasks = Vec::new();
153
154 for (task_name, result, elapsed) in results {
156 match result {
157 Ok(true) => {
158 success_tasks.push(task_name.as_str());
159 info!(
160 target: LOG_TARGET,
161 name = self.name,
162 task = task_name,
163 elapsed = duration_to_string(elapsed),
164 "background task executed successfully"
165 );
166 }
167 Ok(false) => {
168 }
170 Err(e) => {
171 failed_tasks.push(task_name.as_str());
172 error!(
173 target: LOG_TARGET,
174 name = self.name,
175 task = task_name,
176 error = %e,
177 "background task failed"
178 );
179 }
180 }
181 }
182
183 if !success_tasks.is_empty() || !failed_tasks.is_empty() {
184 info!(
185 target: LOG_TARGET,
186 name = self.name,
187 cycle = count,
188 success_count = success_tasks.len(),
189 failed_count = failed_tasks.len(),
190 total_elapsed = duration_to_string(cycle_start.elapsed()),
191 "background service cycle completed",
192 );
193 }
194 }
195 }
196 }
197 }
198}
199
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use pretty_assertions::assert_eq;

    /// Task stub that always reports a successful execution.
    struct TestTask {}

    #[async_trait]
    impl BackgroundTask for TestTask {
        async fn execute(&self, _count: u32) -> Result<bool, Error> {
            Ok(true)
        }
    }

    #[test]
    fn test_duration_to_string() {
        // Lower bound of each unit.
        assert_eq!(duration_to_string(Duration::from_secs(1)), "1.0s");
        assert_eq!(duration_to_string(Duration::from_secs(60)), "1.0m");
        assert_eq!(duration_to_string(Duration::from_secs(3600)), "1.0h");
        assert_eq!(duration_to_string(Duration::from_secs(86400)), "1.0d");

        // Fractional values within each unit.
        assert_eq!(duration_to_string(Duration::from_millis(500)), "0.5s");
        assert_eq!(duration_to_string(Duration::from_secs(90)), "1.5m");
        assert_eq!(duration_to_string(Duration::from_secs(5400)), "1.5h");
        assert_eq!(duration_to_string(Duration::from_secs(129600)), "1.5d");
    }

    #[test]
    fn new_background_task_service() {
        let mut service = BackgroundTaskService::new(
            "test",
            Duration::from_secs(1),
            vec![
                ("task1".to_string(), Box::new(TestTask {})),
                ("task2".to_string(), Box::new(TestTask {})),
            ],
        );
        service.add_task("task3", Box::new(TestTask {}));

        assert_eq!(service.name(), "test");
        assert_eq!(service.tasks.len(), 3);
        assert_eq!(service.tasks[0].0, "task1");
        assert_eq!(service.tasks[1].0, "task2");
        assert_eq!(service.tasks[2].0, "task3");
        assert!(!service.immediately);
        assert_eq!(service.initial_delay, None);

        let mut service = BackgroundTaskService::new_single(
            "test",
            Duration::from_secs(1),
            "task1",
            Box::new(TestTask {}),
        );
        service.set_immediately(true);
        assert_eq!(service.name(), "test");
        assert_eq!(service.tasks.len(), 1);
        assert_eq!(service.tasks[0].0, "task1");
        assert!(service.immediately);
    }

    #[test]
    fn set_initial_delay_round_trips() {
        let mut service = BackgroundTaskService::new_single(
            "test",
            Duration::from_secs(1),
            "task1",
            Box::new(TestTask {}),
        );
        assert_eq!(service.initial_delay, None);

        service.set_initial_delay(Some(Duration::from_secs(5)));
        assert_eq!(service.initial_delay, Some(Duration::from_secs(5)));

        service.set_initial_delay(None);
        assert_eq!(service.initial_delay, None);
    }
}