pingap_core/
service.rs

1// Copyright 2024-2025 Tree xie.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::{Error, LOG_CATEGORY};
16use async_trait::async_trait;
17use futures::future::BoxFuture;
18use pingora::server::ShutdownWatch;
19use pingora::services::background::BackgroundService;
20use std::sync::atomic::{AtomicU32, Ordering};
21use std::time::{Duration, SystemTime};
22use tokio::time::interval;
23use tracing::{error, info};
24
/// Boxed, thread-safe callback that produces one run of a background task.
///
/// The callback receives the service's `u32` run counter and returns a boxed
/// future resolving to `Result<bool, Error>`:
/// * `Ok(true)` - the task did work; the run is logged as a success
/// * `Ok(false)` - the task did nothing this time; no log entry is written
/// * `Err(_)` - the task failed; the error is logged
pub type SimpleServiceTaskFuture =
    Box<dyn Fn(u32) -> BoxFuture<'static, Result<bool, Error>> + Sync + Send>;
29
/// A named group of background tasks that is run on a fixed interval.
///
/// Instances are built with `new_simple_service_task`.
pub struct SimpleServiceTask {
    name: String,     // Name identifier for the service, used in log output
    count: AtomicU32, // Number of interval ticks handled so far, passed to every task
    tasks: Vec<(String, SimpleServiceTaskFuture)>, // Named tasks, run in this order on each tick
    interval: Duration, // Time between task executions
}
37
38/// Creates a new SimpleServiceTask with the specified name, interval, and collection of tasks.
39/// This service manages multiple background tasks that run concurrently at fixed intervals.
40///
41/// # Arguments
42/// * `name` - Identifier for this service instance, used in logging
43/// * `interval` - Duration between task executions (e.g., Duration::from_secs(60) for minute intervals)
44/// * `tasks` - Vector of named tasks to execute periodically, where each task is a tuple of (name, task_function)
45pub fn new_simple_service_task(
46    name: &str,
47    interval: Duration,
48    tasks: Vec<(String, SimpleServiceTaskFuture)>,
49) -> SimpleServiceTask {
50    SimpleServiceTask {
51        name: name.to_string(),
52        count: AtomicU32::new(0),
53        tasks,
54        interval,
55    }
56}
57
/// Formats a duration as a short human-readable string with one decimal
/// place, using the largest fitting unit: seconds (`s`), minutes (`m`),
/// hours (`h`) or days (`d`).
///
/// The unit is chosen from the *rounded* value, so a duration just below a
/// unit boundary is promoted to the next unit instead of being rendered as
/// `60.0s`, `60.0m` or `24.0h` (e.g. 59.96 seconds becomes `1.0m`).
///
/// # Arguments
/// * `duration` - The duration to format
///
/// # Returns
/// * The formatted string, e.g. `1.5m`
fn duration_to_string(duration: Duration) -> String {
    // (seconds per unit, first displayed value that belongs to the next unit, suffix)
    const UNITS: [(f64, f64, &str); 3] =
        [(1.0, 60.0, "s"), (60.0, 60.0, "m"), (3600.0, 24.0, "h")];

    let secs = duration.as_secs_f64();
    for &(unit_secs, limit, suffix) in UNITS.iter() {
        let rendered = format!("{:.1}", secs / unit_secs);
        // Compare the text that would actually be shown: rounding to one
        // decimal can lift a value such as 59.96 up to the unit boundary.
        if matches!(rendered.parse::<f64>(), Ok(shown) if shown < limit) {
            return format!("{rendered}{suffix}");
        }
    }
    format!("{:.1}d", secs / 86400.0)
}
71
72#[async_trait]
73impl BackgroundService for SimpleServiceTask {
74    /// Starts the background service, executing all tasks at the specified interval
75    /// until shutdown is signaled or tasks complete. Each task execution is logged
76    /// with timing information and success/failure status.
77    ///
78    /// # Arguments
79    /// * `shutdown` - Watch channel for shutdown coordination
80    ///
81    /// # Task Execution
82    /// - Tasks are executed sequentially in the order they were added
83    /// - Each task receives a counter value that increments with each interval
84    /// - Failed tasks are logged with error details but don't stop the service
85    /// - Task execution times are logged for monitoring purposes
86    ///
87    /// # Shutdown Behavior
88    /// - Service stops gracefully when shutdown signal is received
89    /// - Current task iteration completes before shutdown
90    async fn start(&self, mut shutdown: ShutdownWatch) {
91        let period_human = duration_to_string(self.interval);
92        let task_names: Vec<String> =
93            self.tasks.iter().map(|item| item.0.clone()).collect();
94        info!(
95            category = LOG_CATEGORY,
96            name = self.name,
97            tasks = task_names.join(","),
98            interval = period_human.to_string(),
99            "background service is running",
100        );
101
102        let mut period = interval(self.interval);
103        loop {
104            tokio::select! {
105                // Handle shutdown signal
106                _ = shutdown.changed() => {
107                    break;
108                }
109                // Execute tasks on each interval tick
110                _ = period.tick() => {
111                    let now = SystemTime::now();
112                    let count = self.count.fetch_add(1, Ordering::Relaxed);
113                    let mut success_tasks = vec![];
114                    let mut fail_tasks = vec![];
115                    // Execute each task and track results
116                    for (task_name, task) in self.tasks.iter() {
117                        let task_start = SystemTime::now();
118                        match task(count).await {
119                           Err(e)  => {
120                               fail_tasks.push(task_name.to_string());
121                               error!(
122                                   category = LOG_CATEGORY,
123                                   name = self.name,
124                                   task = task_name,
125                                   error = %e,
126                               );
127                           }
128                           Ok(executed) => {
129                               if executed {
130                                   success_tasks.push(task_name.to_string());
131                                   info!(
132                                       category = LOG_CATEGORY,
133                                       name = self.name,
134                                       task = task_name,
135                                       elapsed = format!(
136                                           "{}ms",
137                                           task_start.elapsed().unwrap_or_default().as_millis()
138                                       ),
139                                   );
140                               }
141                           }
142                        };
143                    }
144                    if !success_tasks.is_empty() || !fail_tasks.is_empty() {
145                        info!(
146                            category = LOG_CATEGORY,
147                            name = self.name,
148                            success_tasks = success_tasks.join(","),
149                            fails = fail_tasks.len(),
150                            fail_tasks = fail_tasks.join(","),
151                            elapsed = format!(
152                                "{}ms",
153                                now.elapsed().unwrap_or_default().as_millis()
154                            ),
155                        );
156                    }
157                }
158            }
159        }
160    }
161}
162
163// Trait defining interface for individual service tasks
164#[async_trait]
165pub trait ServiceTask: Sync + Send {
166    /// Executes a single iteration of the task. This method is called repeatedly
167    /// at the specified interval until shutdown or task completion.
168    ///
169    /// # Returns
170    /// * `None` or `Some(false)` - Task completed normally, continue running the service
171    /// * `Some(true)` - Task completed and requests service shutdown
172    async fn run(&self) -> Option<bool>;
173
174    /// Returns a human-readable description of the task for logging and monitoring.
175    /// Implementations should provide meaningful descriptions of their purpose.
176    ///
177    /// # Returns
178    /// * String describing the task's purpose, default is "unknown"
179    fn description(&self) -> String {
180        "unknown".to_string()
181    }
182}
183
/// Background service wrapper around a single [`ServiceTask`] implementation.
///
/// The wrapped task is run on every interval tick; see
/// [`CommonServiceTask::new`] for how the interval is interpreted.
pub struct CommonServiceTask {
    task: Box<dyn ServiceTask>, // The actual task to execute
    interval: Duration,         // Time between executions; below 1 second means "run once"
}
189
190impl CommonServiceTask {
191    /// Creates a new CommonServiceTask that wraps a single task implementation.
192    /// This is useful for simpler cases where only one recurring task is needed.
193    ///
194    /// # Arguments
195    /// * `interval` - Duration between task executions
196    /// * `task` - Implementation of ServiceTask to execute
197    ///
198    /// # Special Cases
199    /// - If interval is less than 1 second, task runs only once
200    /// - Task can signal completion via return value to stop service
201    pub fn new(interval: Duration, task: impl ServiceTask + 'static) -> Self {
202        Self {
203            task: Box::new(task),
204            interval,
205        }
206    }
207}
208
209#[async_trait]
210impl BackgroundService for CommonServiceTask {
211    /// Starts the background service, executing the wrapped task at the specified interval.
212    /// The service runs until one of the following conditions is met:
213    /// - Shutdown signal is received
214    /// - Task returns Some(true) indicating completion
215    /// - Interval is less than 1 second (runs once and stops)
216    ///
217    /// # Arguments
218    /// * `shutdown` - Watch channel for shutdown coordination
219    ///
220    /// # Logging
221    /// - Service start is logged with task description and interval
222    /// - Each task execution is logged with elapsed time
223    /// - Task completion status is logged
224    ///
225    /// # Performance Considerations
226    /// - Task execution time is measured and logged
227    /// - Long-running tasks may delay the next interval
228    async fn start(&self, mut shutdown: ShutdownWatch) {
229        let period_human = duration_to_string(self.interval);
230        // if interval is less than 1s
231        // the task should only run once
232        let once = self.interval.as_millis() < 1000;
233
234        info!(
235            category = LOG_CATEGORY,
236            description = self.task.description(),
237            interval = period_human.to_string(),
238            "background service is running",
239        );
240
241        let mut period = interval(self.interval);
242        loop {
243            tokio::select! {
244                _ = shutdown.changed() => {
245                    break;
246                }
247                _ = period.tick() => {
248                    let now = SystemTime::now();
249                    let done = self.task.run().await.unwrap_or_default();
250                    info!(
251                        category = LOG_CATEGORY,
252                        done,
253                        elapsed = format!(
254                            "{}ms",
255                            now.elapsed().unwrap_or_default().as_millis()
256                        ),
257                        description = self.task.description(),
258                    );
259                    if once || done {
260                        break;
261                    }
262                }
263            }
264        }
265    }
266}
267
#[cfg(test)]
mod tests {
    use super::*;
    use pretty_assertions::assert_eq;

    /// The constructor stores the given name, interval and task list.
    #[test]
    fn test_simple_service_task() {
        let period = Duration::from_secs(1);
        let service = new_simple_service_task("test", period, vec![]);

        assert_eq!(service.name, "test");
        assert_eq!(service.interval, period);
        assert_eq!(service.tasks.len(), 0);
    }

    /// Each unit boundary switches to the next larger unit.
    #[test]
    fn test_duration_to_string() {
        let cases = [
            (1, "1.0s"),
            (60, "1.0m"),
            (3600, "1.0h"),
            (86400, "1.0d"),
        ];
        for (secs, expected) in cases {
            assert_eq!(duration_to_string(Duration::from_secs(secs)), expected);
        }
    }
}