pingap_core/service.rs
1// Copyright 2024-2025 Tree xie.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::{Error, LOG_CATEGORY};
16use async_trait::async_trait;
17use futures::future::BoxFuture;
18use pingora::server::ShutdownWatch;
19use pingora::services::background::BackgroundService;
20use std::sync::atomic::{AtomicU32, Ordering};
21use std::time::{Duration, SystemTime};
22use tokio::time::interval;
23use tracing::{error, info};
24
25// Type alias for a boxed future that represents a background task
26// Takes a u32 counter and returns Result<bool, Error>
27pub type SimpleServiceTaskFuture =
28 Box<dyn Fn(u32) -> BoxFuture<'static, Result<bool, Error>> + Sync + Send>;
29
30// Represents a collection of background tasks that run periodically
31pub struct SimpleServiceTask {
32 name: String, // Name identifier for the service
33 count: AtomicU32, // Counter for tracking task executions
34 tasks: Vec<(String, SimpleServiceTaskFuture)>, // List of named tasks to execute
35 interval: Duration, // Time between task executions
36}
37
38/// Creates a new SimpleServiceTask with the specified name, interval, and collection of tasks.
39/// This service manages multiple background tasks that run concurrently at fixed intervals.
40///
41/// # Arguments
42/// * `name` - Identifier for this service instance, used in logging
43/// * `interval` - Duration between task executions (e.g., Duration::from_secs(60) for minute intervals)
44/// * `tasks` - Vector of named tasks to execute periodically, where each task is a tuple of (name, task_function)
45pub fn new_simple_service_task(
46 name: &str,
47 interval: Duration,
48 tasks: Vec<(String, SimpleServiceTaskFuture)>,
49) -> SimpleServiceTask {
50 SimpleServiceTask {
51 name: name.to_string(),
52 count: AtomicU32::new(0),
53 tasks,
54 interval,
55 }
56}
57
58fn duration_to_string(duration: Duration) -> String {
59 let secs = duration.as_secs_f64();
60
61 if secs < 60.0 {
62 format!("{secs:.1}s")
63 } else if secs < 3600.0 {
64 format!("{:.1}m", secs / 60.0)
65 } else if secs < 86400.0 {
66 format!("{:.1}h", secs / 3600.0)
67 } else {
68 format!("{:.1}d", secs / 86400.0)
69 }
70}
71
72#[async_trait]
73impl BackgroundService for SimpleServiceTask {
74 /// Starts the background service, executing all tasks at the specified interval
75 /// until shutdown is signaled or tasks complete. Each task execution is logged
76 /// with timing information and success/failure status.
77 ///
78 /// # Arguments
79 /// * `shutdown` - Watch channel for shutdown coordination
80 ///
81 /// # Task Execution
82 /// - Tasks are executed sequentially in the order they were added
83 /// - Each task receives a counter value that increments with each interval
84 /// - Failed tasks are logged with error details but don't stop the service
85 /// - Task execution times are logged for monitoring purposes
86 ///
87 /// # Shutdown Behavior
88 /// - Service stops gracefully when shutdown signal is received
89 /// - Current task iteration completes before shutdown
90 async fn start(&self, mut shutdown: ShutdownWatch) {
91 let period_human = duration_to_string(self.interval);
92 let task_names: Vec<String> =
93 self.tasks.iter().map(|item| item.0.clone()).collect();
94 info!(
95 category = LOG_CATEGORY,
96 name = self.name,
97 tasks = task_names.join(","),
98 interval = period_human.to_string(),
99 "background service is running",
100 );
101
102 let mut period = interval(self.interval);
103 loop {
104 tokio::select! {
105 // Handle shutdown signal
106 _ = shutdown.changed() => {
107 break;
108 }
109 // Execute tasks on each interval tick
110 _ = period.tick() => {
111 let now = SystemTime::now();
112 let count = self.count.fetch_add(1, Ordering::Relaxed);
113 let mut success_tasks = vec![];
114 let mut fail_tasks = vec![];
115 // Execute each task and track results
116 for (task_name, task) in self.tasks.iter() {
117 let task_start = SystemTime::now();
118 match task(count).await {
119 Err(e) => {
120 fail_tasks.push(task_name.to_string());
121 error!(
122 category = LOG_CATEGORY,
123 name = self.name,
124 task = task_name,
125 error = %e,
126 );
127 }
128 Ok(executed) => {
129 if executed {
130 success_tasks.push(task_name.to_string());
131 info!(
132 category = LOG_CATEGORY,
133 name = self.name,
134 task = task_name,
135 elapsed = format!(
136 "{}ms",
137 task_start.elapsed().unwrap_or_default().as_millis()
138 ),
139 );
140 }
141 }
142 };
143 }
144 if !success_tasks.is_empty() || !fail_tasks.is_empty() {
145 info!(
146 category = LOG_CATEGORY,
147 name = self.name,
148 success_tasks = success_tasks.join(","),
149 fails = fail_tasks.len(),
150 fail_tasks = fail_tasks.join(","),
151 elapsed = format!(
152 "{}ms",
153 now.elapsed().unwrap_or_default().as_millis()
154 ),
155 );
156 }
157 }
158 }
159 }
160 }
161}
162
163// Trait defining interface for individual service tasks
164#[async_trait]
165pub trait ServiceTask: Sync + Send {
166 /// Executes a single iteration of the task. This method is called repeatedly
167 /// at the specified interval until shutdown or task completion.
168 ///
169 /// # Returns
170 /// * `None` or `Some(false)` - Task completed normally, continue running the service
171 /// * `Some(true)` - Task completed and requests service shutdown
172 async fn run(&self) -> Option<bool>;
173
174 /// Returns a human-readable description of the task for logging and monitoring.
175 /// Implementations should provide meaningful descriptions of their purpose.
176 ///
177 /// # Returns
178 /// * String describing the task's purpose, default is "unknown"
179 fn description(&self) -> String {
180 "unknown".to_string()
181 }
182}
183
184// Wrapper for individual ServiceTask implementations
185pub struct CommonServiceTask {
186 task: Box<dyn ServiceTask>, // The actual task to execute
187 interval: Duration, // Time between executions
188}
189
190impl CommonServiceTask {
191 /// Creates a new CommonServiceTask that wraps a single task implementation.
192 /// This is useful for simpler cases where only one recurring task is needed.
193 ///
194 /// # Arguments
195 /// * `interval` - Duration between task executions
196 /// * `task` - Implementation of ServiceTask to execute
197 ///
198 /// # Special Cases
199 /// - If interval is less than 1 second, task runs only once
200 /// - Task can signal completion via return value to stop service
201 pub fn new(interval: Duration, task: impl ServiceTask + 'static) -> Self {
202 Self {
203 task: Box::new(task),
204 interval,
205 }
206 }
207}
208
209#[async_trait]
210impl BackgroundService for CommonServiceTask {
211 /// Starts the background service, executing the wrapped task at the specified interval.
212 /// The service runs until one of the following conditions is met:
213 /// - Shutdown signal is received
214 /// - Task returns Some(true) indicating completion
215 /// - Interval is less than 1 second (runs once and stops)
216 ///
217 /// # Arguments
218 /// * `shutdown` - Watch channel for shutdown coordination
219 ///
220 /// # Logging
221 /// - Service start is logged with task description and interval
222 /// - Each task execution is logged with elapsed time
223 /// - Task completion status is logged
224 ///
225 /// # Performance Considerations
226 /// - Task execution time is measured and logged
227 /// - Long-running tasks may delay the next interval
228 async fn start(&self, mut shutdown: ShutdownWatch) {
229 let period_human = duration_to_string(self.interval);
230 // if interval is less than 1s
231 // the task should only run once
232 let once = self.interval.as_millis() < 1000;
233
234 info!(
235 category = LOG_CATEGORY,
236 description = self.task.description(),
237 interval = period_human.to_string(),
238 "background service is running",
239 );
240
241 let mut period = interval(self.interval);
242 loop {
243 tokio::select! {
244 _ = shutdown.changed() => {
245 break;
246 }
247 _ = period.tick() => {
248 let now = SystemTime::now();
249 let done = self.task.run().await.unwrap_or_default();
250 info!(
251 category = LOG_CATEGORY,
252 done,
253 elapsed = format!(
254 "{}ms",
255 now.elapsed().unwrap_or_default().as_millis()
256 ),
257 description = self.task.description(),
258 );
259 if once || done {
260 break;
261 }
262 }
263 }
264 }
265 }
266}
267
268#[cfg(test)]
269mod tests {
270 use super::*;
271 use pretty_assertions::assert_eq;
272
273 #[test]
274 fn test_simple_service_task() {
275 let task =
276 new_simple_service_task("test", Duration::from_secs(1), vec![]);
277
278 assert_eq!(task.name, "test");
279 assert_eq!(task.interval, Duration::from_secs(1));
280 assert_eq!(task.tasks.len(), 0);
281 }
282
283 #[test]
284 fn test_duration_to_string() {
285 assert_eq!(duration_to_string(Duration::from_secs(1)), "1.0s");
286 assert_eq!(duration_to_string(Duration::from_secs(60)), "1.0m");
287 assert_eq!(duration_to_string(Duration::from_secs(3600)), "1.0h");
288 assert_eq!(duration_to_string(Duration::from_secs(86400)), "1.0d");
289 }
290}