pingap_core/service.rs
1// Copyright 2024-2025 Tree xie.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::{Error, LOG_CATEGORY};
16use async_trait::async_trait;
17use futures::future::BoxFuture;
18use pingora::server::ShutdownWatch;
19use pingora::services::background::BackgroundService;
20use std::sync::atomic::{AtomicU32, Ordering};
21use std::time::{Duration, SystemTime};
22use tokio::time::interval;
23use tracing::{error, info};
24
/// Boxed, thread-safe closure that starts one run of a background task.
///
/// The closure is called with the current tick counter (`u32`) and returns a
/// boxed future. The future resolves to `Ok(true)` when the task did work that
/// should be logged as a success, `Ok(false)` when there is nothing to report,
/// or `Err` when the run failed.
pub type SimpleServiceTaskFuture =
    Box<dyn Fn(u32) -> BoxFuture<'static, Result<bool, Error>> + Sync + Send>;
29
/// A named group of background tasks that are executed together on a fixed
/// interval.
pub struct SimpleServiceTask {
    /// Service name, included in every log entry emitted by the service.
    name: String,
    /// Tick counter; the value it held before each increment is passed to the tasks.
    count: AtomicU32,
    /// `(task name, task)` pairs, executed in this order on every tick.
    tasks: Vec<(String, SimpleServiceTaskFuture)>,
    /// Period of the interval timer that triggers each round of tasks.
    interval: Duration,
}
37
38/// Creates a new SimpleServiceTask with the specified name, interval, and collection of tasks.
39/// This service manages multiple background tasks that run concurrently at fixed intervals.
40///
41/// # Arguments
42/// * `name` - Identifier for this service instance, used in logging
43/// * `interval` - Duration between task executions (e.g., Duration::from_secs(60) for minute intervals)
44/// * `tasks` - Vector of named tasks to execute periodically, where each task is a tuple of (name, task_function)
45pub fn new_simple_service_task(
46 name: &str,
47 interval: Duration,
48 tasks: Vec<(String, SimpleServiceTaskFuture)>,
49) -> SimpleServiceTask {
50 SimpleServiceTask {
51 name: name.to_string(),
52 count: AtomicU32::new(0),
53 tasks,
54 interval,
55 }
56}
57
/// Formats a duration as a short human-readable string with one decimal place.
///
/// The unit is the largest of seconds (`s`), minutes (`m`), hours (`h`) and
/// days (`d`) whose threshold the duration reaches, e.g. `1.0s`, `1.5m`,
/// `2.0h`, `3.0d`. The unit is chosen before rounding, so a value just below
/// a threshold can print as the threshold itself (59.96 seconds gives `60.0s`).
fn duration_to_string(duration: Duration) -> String {
    const SECS_PER_MINUTE: f64 = 60.0;
    const SECS_PER_HOUR: f64 = 3600.0;
    const SECS_PER_DAY: f64 = 86400.0;

    match duration.as_secs_f64() {
        total if total < SECS_PER_MINUTE => format!("{:.1}s", total),
        total if total < SECS_PER_HOUR => {
            format!("{:.1}m", total / SECS_PER_MINUTE)
        }
        total if total < SECS_PER_DAY => {
            format!("{:.1}h", total / SECS_PER_HOUR)
        }
        total => format!("{:.1}d", total / SECS_PER_DAY),
    }
}
70
71#[async_trait]
72impl BackgroundService for SimpleServiceTask {
73 /// Starts the background service, executing all tasks at the specified interval
74 /// until shutdown is signaled or tasks complete. Each task execution is logged
75 /// with timing information and success/failure status.
76 ///
77 /// # Arguments
78 /// * `shutdown` - Watch channel for shutdown coordination
79 ///
80 /// # Task Execution
81 /// - Tasks are executed sequentially in the order they were added
82 /// - Each task receives a counter value that increments with each interval
83 /// - Failed tasks are logged with error details but don't stop the service
84 /// - Task execution times are logged for monitoring purposes
85 ///
86 /// # Shutdown Behavior
87 /// - Service stops gracefully when shutdown signal is received
88 /// - Current task iteration completes before shutdown
89 async fn start(&self, mut shutdown: ShutdownWatch) {
90 let period_human = duration_to_string(self.interval);
91 let task_names: Vec<String> =
92 self.tasks.iter().map(|item| item.0.clone()).collect();
93 info!(
94 category = LOG_CATEGORY,
95 name = self.name,
96 tasks = task_names.join(","),
97 interval = period_human.to_string(),
98 "background service is running",
99 );
100
101 let mut period = interval(self.interval);
102 loop {
103 tokio::select! {
104 // Handle shutdown signal
105 _ = shutdown.changed() => {
106 break;
107 }
108 // Execute tasks on each interval tick
109 _ = period.tick() => {
110 let now = SystemTime::now();
111 let count = self.count.fetch_add(1, Ordering::Relaxed);
112 let mut success_tasks = vec![];
113 let mut fail_tasks = vec![];
114 // Execute each task and track results
115 for (task_name, task) in self.tasks.iter() {
116 let task_start = SystemTime::now();
117 match task(count).await {
118 Err(e) => {
119 fail_tasks.push(task_name.to_string());
120 error!(
121 category = LOG_CATEGORY,
122 name = self.name,
123 task = task_name,
124 error = %e,
125 );
126 }
127 Ok(executed) => {
128 if executed {
129 success_tasks.push(task_name.to_string());
130 info!(
131 category = LOG_CATEGORY,
132 name = self.name,
133 task = task_name,
134 elapsed = format!(
135 "{}ms",
136 task_start.elapsed().unwrap_or_default().as_millis()
137 ),
138 );
139 }
140 }
141 };
142 }
143 if !success_tasks.is_empty() || !fail_tasks.is_empty() {
144 info!(
145 category = LOG_CATEGORY,
146 name = self.name,
147 success_tasks = success_tasks.join(","),
148 fails = fail_tasks.len(),
149 fail_tasks = fail_tasks.join(","),
150 elapsed = format!(
151 "{}ms",
152 now.elapsed().unwrap_or_default().as_millis()
153 ),
154 );
155 }
156 }
157 }
158 }
159 }
160}
161
162// Trait defining interface for individual service tasks
163#[async_trait]
164pub trait ServiceTask: Sync + Send {
165 /// Executes a single iteration of the task. This method is called repeatedly
166 /// at the specified interval until shutdown or task completion.
167 ///
168 /// # Returns
169 /// * `None` or `Some(false)` - Task completed normally, continue running the service
170 /// * `Some(true)` - Task completed and requests service shutdown
171 async fn run(&self) -> Option<bool>;
172
173 /// Returns a human-readable description of the task for logging and monitoring.
174 /// Implementations should provide meaningful descriptions of their purpose.
175 ///
176 /// # Returns
177 /// * String describing the task's purpose, default is "unknown"
178 fn description(&self) -> String {
179 "unknown".to_string()
180 }
181}
182
/// Background service that repeatedly runs a single [`ServiceTask`].
pub struct CommonServiceTask {
    /// The wrapped task, run once per interval tick.
    task: Box<dyn ServiceTask>,
    /// Time between runs; a value below one second makes the service run the
    /// task only once.
    interval: Duration,
}
188
189impl CommonServiceTask {
190 /// Creates a new CommonServiceTask that wraps a single task implementation.
191 /// This is useful for simpler cases where only one recurring task is needed.
192 ///
193 /// # Arguments
194 /// * `interval` - Duration between task executions
195 /// * `task` - Implementation of ServiceTask to execute
196 ///
197 /// # Special Cases
198 /// - If interval is less than 1 second, task runs only once
199 /// - Task can signal completion via return value to stop service
200 pub fn new(interval: Duration, task: impl ServiceTask + 'static) -> Self {
201 Self {
202 task: Box::new(task),
203 interval,
204 }
205 }
206}
207
208#[async_trait]
209impl BackgroundService for CommonServiceTask {
210 /// Starts the background service, executing the wrapped task at the specified interval.
211 /// The service runs until one of the following conditions is met:
212 /// - Shutdown signal is received
213 /// - Task returns Some(true) indicating completion
214 /// - Interval is less than 1 second (runs once and stops)
215 ///
216 /// # Arguments
217 /// * `shutdown` - Watch channel for shutdown coordination
218 ///
219 /// # Logging
220 /// - Service start is logged with task description and interval
221 /// - Each task execution is logged with elapsed time
222 /// - Task completion status is logged
223 ///
224 /// # Performance Considerations
225 /// - Task execution time is measured and logged
226 /// - Long-running tasks may delay the next interval
227 async fn start(&self, mut shutdown: ShutdownWatch) {
228 let period_human = duration_to_string(self.interval);
229 // if interval is less than 1s
230 // the task should only run once
231 let once = self.interval.as_millis() < 1000;
232
233 info!(
234 category = LOG_CATEGORY,
235 description = self.task.description(),
236 interval = period_human.to_string(),
237 "background service is running",
238 );
239
240 let mut period = interval(self.interval);
241 loop {
242 tokio::select! {
243 _ = shutdown.changed() => {
244 break;
245 }
246 _ = period.tick() => {
247 let now = SystemTime::now();
248 let done = self.task.run().await.unwrap_or_default();
249 info!(
250 category = LOG_CATEGORY,
251 done,
252 elapsed = format!(
253 "{}ms",
254 now.elapsed().unwrap_or_default().as_millis()
255 ),
256 description = self.task.description(),
257 );
258 if once || done {
259 break;
260 }
261 }
262 }
263 }
264 }
265}
266
#[cfg(test)]
mod tests {
    use super::*;
    use pretty_assertions::assert_eq;

    /// The constructor stores the name, interval and task list it was given.
    #[test]
    fn test_simple_service_task() {
        let every_second = Duration::from_secs(1);
        let service =
            new_simple_service_task("test", every_second, Vec::new());

        assert_eq!(service.name, "test");
        assert_eq!(service.interval, every_second);
        assert_eq!(service.tasks.len(), 0);
    }

    /// Each threshold value is rendered in the next larger unit.
    #[test]
    fn test_duration_to_string() {
        let cases = [
            (1, "1.0s"),
            (60, "1.0m"),
            (3600, "1.0h"),
            (86400, "1.0d"),
        ];
        for &(secs, expected) in &cases {
            assert_eq!(duration_to_string(Duration::from_secs(secs)), expected);
        }
    }
}