1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
//! Background task scheduler for deferred and periodic work (#95).
//!
//! Provides [`TaskScheduler`] which manages one-shot and recurring async tasks
//! using `tokio::spawn` and `tokio::time`.
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tracing::debug;
/// Unique identifier for a scheduled task.
pub type TaskId = u64;
/// Internal state shared across clones.
struct SchedulerInner {
next_id: AtomicU64,
tasks: Mutex<HashMap<TaskId, TaskEntry>>,
}
struct TaskEntry {
label: String,
handle: JoinHandle<()>,
}
/// A scheduler for one-shot and periodic background tasks.
///
/// All tasks are cancelled when [`TaskScheduler::shutdown`] is called or when
/// the scheduler is dropped.
#[derive(Clone)]
pub struct TaskScheduler {
inner: Arc<SchedulerInner>,
}
impl TaskScheduler {
/// Create a new task scheduler.
pub fn new() -> Self {
Self {
inner: Arc::new(SchedulerInner {
next_id: AtomicU64::new(1),
tasks: Mutex::new(HashMap::new()),
}),
}
}
/// Schedule a one-shot task that executes after `delay`.
///
/// Returns a [`TaskId`] that can be used to cancel the task.
pub fn schedule_once<F, Fut>(&self, delay: Duration, label: impl Into<String>, f: F) -> TaskId
where
F: FnOnce() -> Fut + Send + 'static,
Fut: Future<Output = ()> + Send + 'static,
{
let id = self.inner.next_id.fetch_add(1, Ordering::Relaxed);
let label_str = label.into();
let inner = Arc::clone(&self.inner);
let task_label = label_str.clone();
let handle = tokio::spawn(async move {
tokio::time::sleep(delay).await;
debug!("Running one-shot task {id} ({task_label})");
f().await;
// Remove self from the map after completion.
inner.tasks.lock().await.remove(&id);
});
{
let inner = Arc::clone(&self.inner);
let label_str = label_str.clone();
tokio::spawn(async move {
inner.tasks.lock().await.insert(
id,
TaskEntry {
label: label_str,
handle,
},
);
});
}
id
}
/// Schedule a periodic task that runs every `interval`.
///
/// The task function receives the current tick count (starting at 1).
/// The first execution happens after `interval` elapses.
///
/// Returns a [`TaskId`] that can be used to cancel the task.
pub fn schedule_periodic<F, Fut>(
&self,
interval: Duration,
label: impl Into<String>,
f: F,
) -> TaskId
where
F: Fn(u64) -> Fut + Send + Sync + 'static,
Fut: Future<Output = ()> + Send + 'static,
{
let id = self.inner.next_id.fetch_add(1, Ordering::Relaxed);
let label_str = label.into();
let task_label = label_str.clone();
let handle = tokio::spawn(async move {
let mut ticker = tokio::time::interval(interval);
// First tick fires immediately — skip it so the first execution
// happens after one full interval.
ticker.tick().await;
let mut tick_count: u64 = 0;
loop {
ticker.tick().await;
tick_count += 1;
debug!("Periodic task {id} ({task_label}) tick {tick_count}");
f(tick_count).await;
}
});
let inner = Arc::clone(&self.inner);
let label_owned = label_str;
tokio::spawn(async move {
inner.tasks.lock().await.insert(
id,
TaskEntry {
label: label_owned,
handle,
},
);
});
id
}
/// Cancel a previously scheduled task.
///
/// Returns `true` if the task was found and cancelled.
pub async fn cancel(&self, id: TaskId) -> bool {
if let Some(entry) = self.inner.tasks.lock().await.remove(&id) {
entry.handle.abort();
debug!("Cancelled task {id} ({})", entry.label);
true
} else {
false
}
}
/// Return the number of active (not yet completed / cancelled) tasks.
pub async fn active_count(&self) -> usize {
self.inner.tasks.lock().await.len()
}
/// Cancel all tasks and shut down the scheduler.
pub async fn shutdown(&self) {
let mut tasks = self.inner.tasks.lock().await;
for (id, entry) in tasks.drain() {
entry.handle.abort();
debug!("Shutdown: cancelled task {id} ({})", entry.label);
}
}
}
impl Default for TaskScheduler {
fn default() -> Self {
Self::new()
}
}
impl std::fmt::Debug for TaskScheduler {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TaskScheduler").finish()
}
}
/// Convenience: schedule a one-shot closure that returns a boxed future.
pub fn boxed_task<F>(f: F) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>
where
F: Future<Output = ()> + Send + 'static,
{
Box::pin(f)
}
#[cfg(test)]
#[path = "task_scheduler_tests.rs"]
mod tests;