pub struct TimerService { /* private fields */ }
TimerService is a timer service built on the Actor pattern. It manages multiple timer handles, listens for all timeout events, and aggregates the resulting notifications before forwarding them to the user.
§Features
- Automatically listens for timeout events from every added timer handle
- Automatically removes one-shot tasks from internal management once they time out
- Continuously monitors periodic tasks and forwards a notification for each invocation
- Aggregates notifications (both one-shot and periodic) and forwards them to the user’s unified channel
- Supports dynamic addition of BatchHandle and TimerHandle
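The feature list above can be pictured with a small model that uses only the standard library. This is a sketch of the aggregation idea, not the crate's implementation: `Aggregator`, `Notification`, `on_fire`, and the `u64` task IDs are hypothetical names chosen for illustration, and a plain `std::sync::mpsc` channel stands in for whatever channel the service uses internally.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical stand-in for the crate's notification type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Notification {
    OneShot(u64),
    Periodic(u64),
}

#[derive(Clone, Copy)]
enum Kind {
    OneShot,
    Periodic,
}

/// Toy aggregator: tracks tasks and forwards every firing to one channel.
pub struct Aggregator {
    tasks: HashMap<u64, Kind>,
    out: Sender<Notification>,
}

impl Aggregator {
    /// Creates the aggregator together with the single unified receiver.
    pub fn new() -> (Self, Receiver<Notification>) {
        let (out, rx) = channel();
        (Self { tasks: HashMap::new(), out }, rx)
    }

    pub fn add_oneshot(&mut self, id: u64) {
        self.tasks.insert(id, Kind::OneShot);
    }

    pub fn add_periodic(&mut self, id: u64) {
        self.tasks.insert(id, Kind::Periodic);
    }

    /// Handles one firing of task `id`; returns false for unknown tasks.
    pub fn on_fire(&mut self, id: u64) -> bool {
        let kind = match self.tasks.get(&id).copied() {
            Some(kind) => kind,
            None => return false,
        };
        let note = match kind {
            // One-shot tasks are dropped from tracking after they fire.
            Kind::OneShot => {
                self.tasks.remove(&id);
                Notification::OneShot(id)
            }
            // Periodic tasks stay tracked and are forwarded on every firing.
            Kind::Periodic => Notification::Periodic(id),
        };
        // A send error only means the receiver was dropped; ignore it here.
        let _ = self.out.send(note);
        true
    }

    /// Number of tasks still tracked.
    pub fn tracked(&self) -> usize {
        self.tasks.len()
    }
}
```

In this model a one-shot task can fire only once, while a periodic task keeps producing `Notification::Periodic` values on the same channel until it is removed by other means.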
§Examples
use kestrel_timer::{TimerWheel, TimerService, TimerTask, CallbackWrapper, TaskNotification, config::ServiceConfig};
use std::time::Duration;

#[tokio::main]
async fn main() {
    let timer = TimerWheel::with_defaults();
    let mut service = timer.create_service(ServiceConfig::default());

    // Register one-shot tasks
    let oneshot_tasks: Vec<_> = (0..3)
        .map(|i| {
            let callback = Some(CallbackWrapper::new(move || async move {
                println!("One-shot timer {} fired!", i);
            }));
            TimerTask::new_oneshot(Duration::from_millis(100), callback)
        })
        .collect();
    service.register_batch(oneshot_tasks).unwrap();

    // Register a periodic task
    let periodic_task = TimerTask::new_periodic(
        Duration::from_millis(100),
        Duration::from_millis(50),
        Some(CallbackWrapper::new(|| async { println!("Periodic timer fired!"); })),
        None
    );
    service.register(periodic_task).unwrap();

    // Receive notifications
    let rx = service.take_receiver().unwrap();
    while let Some(notification) = rx.recv().await {
        match notification {
            TaskNotification::OneShot(task_id) => {
                println!("One-shot task {:?} expired", task_id);
            }
            TaskNotification::Periodic(task_id) => {
                println!("Periodic task {:?} called", task_id);
            }
        }
    }
}
§Implementations
impl TimerService
pub fn take_receiver(&mut self) -> Option<Receiver<TaskNotification, 32>>
Takes the timeout notification receiver, transferring ownership to the caller.
§Returns
The timeout notification receiver, or None if it has already been taken.
§Notes
Only the first call returns the receiver, because the call transfers ownership of it; every later call returns None. The receiver delivers both one-shot task expiry notifications and periodic task invocation notifications.
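The take-once behaviour described in the notes matches the usual `Option::take` pattern. The following is a minimal sketch under that assumption: `ToyService` and `notify` are hypothetical names, and a standard-library channel replaces the crate's bounded `Receiver<TaskNotification, 32>`.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical service that hands out its receiver exactly once.
pub struct ToyService {
    tx: Sender<u64>,
    rx: Option<Receiver<u64>>,
}

impl ToyService {
    pub fn new() -> Self {
        let (tx, rx) = channel();
        Self { tx, rx: Some(rx) }
    }

    /// Moves the receiver out, leaving None behind for later calls.
    pub fn take_receiver(&mut self) -> Option<Receiver<u64>> {
        self.rx.take()
    }

    /// Sends a task ID to whoever holds the receiver.
    pub fn notify(&self, id: u64) -> bool {
        self.tx.send(id).is_ok()
    }
}
```

Because the receiver is moved out of the struct, the caller can hand it to a dedicated task while the service keeps sending on its own end.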
§Examples
use kestrel_timer::{TimerWheel, TaskNotification, config::ServiceConfig};

#[tokio::main]
async fn main() {
    let timer = TimerWheel::with_defaults();
    let mut service = timer.create_service(ServiceConfig::default());
    let rx = service.take_receiver().unwrap();
    while let Some(notification) = rx.recv().await {
        match notification {
            TaskNotification::OneShot(task_id) => {
                println!("One-shot task {:?} expired", task_id);
            }
            TaskNotification::Periodic(task_id) => {
                println!("Periodic task {:?} called", task_id);
            }
        }
    }
}
pub fn cancel_task(&self, task_id: TaskId) -> bool
Cancels the specified task.
§Parameters
- task_id: ID of the task to cancel
§Returns
- true: the task exists and was cancelled
- false: the task does not exist or cancellation failed
§Examples
use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper, config::ServiceConfig};
use std::time::Duration;

#[tokio::main]
async fn main() {
    let timer = TimerWheel::with_defaults();
    let service = timer.create_service(ServiceConfig::default());

    // Use the two-step API: create the task, then register it
    let callback = Some(CallbackWrapper::new(|| async move {
        println!("Timer fired!");
    }));
    let task = TimerTask::new_oneshot(Duration::from_secs(10), callback);
    let task_id = task.get_id();
    service.register(task).unwrap();

    // Cancel the task
    let cancelled = service.cancel_task(task_id);
    println!("Task cancelled: {}", cancelled);
}
pub fn cancel_batch(&self, task_ids: &[TaskId]) -> usize
Cancels multiple tasks in one call.
Uses the underlying batch cancellation operation, which performs better than calling cancel_task repeatedly.
§Parameters
- task_ids: IDs of the tasks to cancel
§Returns
Number of tasks successfully cancelled
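To make the return values concrete, here is a toy registry in which cancelling means removing an ID from a set. It is only a model of the documented contract, assuming unknown or already-cancelled IDs do not count toward the batch total; `ToyRegistry` and the `u64` IDs are hypothetical, and the sketch says nothing about how the crate batches the work internally.

```rust
use std::collections::HashSet;

/// Hypothetical registry of live task IDs.
pub struct ToyRegistry {
    live: HashSet<u64>,
}

impl ToyRegistry {
    pub fn with_tasks(ids: &[u64]) -> Self {
        Self { live: ids.iter().copied().collect() }
    }

    /// Returns true only if the task was live and has now been removed.
    pub fn cancel_task(&mut self, id: u64) -> bool {
        self.live.remove(&id)
    }

    /// Cancels every listed ID and counts the ones that were actually live.
    pub fn cancel_batch(&mut self, ids: &[u64]) -> usize {
        ids.iter().filter(|&&id| self.live.remove(&id)).count()
    }
}
```

With this model, the count returned by `cancel_batch` can be smaller than the length of the slice passed in, which is why callers that care about individual tasks may want to compare the two.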
§Examples
use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper, config::ServiceConfig};
use std::time::Duration;

#[tokio::main]
async fn main() {
    let timer = TimerWheel::with_defaults();
    let service = timer.create_service(ServiceConfig::default());

    let tasks: Vec<_> = (0..10)
        .map(|i| {
            let callback = Some(CallbackWrapper::new(move || async move {
                println!("Timer {} fired!", i);
            }));
            TimerTask::new_oneshot(Duration::from_secs(10), callback)
        })
        .collect();
    let task_ids: Vec<_> = tasks.iter().map(|t| t.get_id()).collect();
    service.register_batch(tasks).unwrap();

    // Batch cancel
    let cancelled = service.cancel_batch(&task_ids);
    println!("Cancelled {} tasks", cancelled);
}
pub fn postpone(&self, task_id: TaskId, new_delay: Duration, callback: Option<CallbackWrapper>) -> bool
Postpones a task and replaces its callback.
§Parameters
- task_id: ID of the task to postpone
- new_delay: new delay, measured from the current time
- callback: new callback function
§Returns
- true: the task exists and was postponed
- false: the task does not exist or postponement failed
§Notes
- The task ID is unchanged after postponement
- The original timeout notification remains valid
- The callback is replaced with the new callback
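The three notes above can be illustrated with a small standard-library model: the entry keeps its ID, the deadline is recomputed from the current time, and the stored callback is swapped. Everything here is hypothetical (`ToyTimers`, `Callback`, the explicit `now` argument, callbacks that return a string so the result can be inspected); it is a sketch of the semantics, not of the timer wheel itself.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical callback type; returns a label so the caller can see which one ran.
pub type Callback = Box<dyn FnMut() -> &'static str>;

struct Entry {
    deadline: Instant,
    callback: Callback,
}

/// Toy timer table keyed by task ID.
pub struct ToyTimers {
    entries: HashMap<u64, Entry>,
}

impl ToyTimers {
    pub fn new() -> Self {
        Self { entries: HashMap::new() }
    }

    pub fn insert(&mut self, id: u64, now: Instant, delay: Duration, callback: Callback) {
        self.entries.insert(id, Entry { deadline: now + delay, callback });
    }

    /// Keeps the ID, restarts the delay from `now`, and replaces the callback.
    pub fn postpone(&mut self, id: u64, now: Instant, new_delay: Duration, callback: Callback) -> bool {
        match self.entries.get_mut(&id) {
            Some(entry) => {
                entry.deadline = now + new_delay;
                entry.callback = callback;
                true
            }
            None => false,
        }
    }

    pub fn deadline(&self, id: u64) -> Option<Instant> {
        self.entries.get(&id).map(|entry| entry.deadline)
    }

    /// Removes the task and runs whichever callback is currently stored.
    pub fn fire(&mut self, id: u64) -> Option<&'static str> {
        self.entries.remove(&id).map(|mut entry| (entry.callback)())
    }
}
```

Passing `now` explicitly keeps the model deterministic; a task inserted with a 5 s delay and postponed by 10 s after 3 s have elapsed ends up due 13 s after insertion, not 15 s.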
§Examples
use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper, config::ServiceConfig};
use std::time::Duration;

#[tokio::main]
async fn main() {
    let timer = TimerWheel::with_defaults();
    let service = timer.create_service(ServiceConfig::default());

    let callback = Some(CallbackWrapper::new(|| async {
        println!("Original callback");
    }));
    let task = TimerTask::new_oneshot(Duration::from_secs(5), callback);
    let task_id = task.get_id();
    service.register(task).unwrap();

    // Postpone and replace the callback
    let new_callback = Some(CallbackWrapper::new(|| async { println!("New callback!"); }));
    let success = service.postpone(
        task_id,
        Duration::from_secs(10),
        new_callback
    );
    println!("Postponed successfully: {}", success);
}
pub fn postpone_batch(&self, updates: Vec<(TaskId, Duration)>) -> usize
Postpones multiple tasks in one call, keeping their original callbacks.
§Parameters
- updates: list of (task ID, new delay) tuples
§Returns
Number of tasks successfully postponed
§Examples
use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper, config::ServiceConfig};
use std::time::Duration;

#[tokio::main]
async fn main() {
    let timer = TimerWheel::with_defaults();
    let service = timer.create_service(ServiceConfig::default());

    let tasks: Vec<_> = (0..3)
        .map(|i| {
            let callback = Some(CallbackWrapper::new(move || async move {
                println!("Timer {} fired!", i);
            }));
            TimerTask::new_oneshot(Duration::from_secs(5), callback)
        })
        .collect();
    let task_ids: Vec<_> = tasks.iter().map(|t| t.get_id()).collect();
    service.register_batch(tasks).unwrap();

    // Batch postpone (keep original callbacks)
    let updates: Vec<_> = task_ids
        .into_iter()
        .map(|id| (id, Duration::from_secs(10)))
        .collect();
    let postponed = service.postpone_batch(updates);
    println!("Postponed {} tasks", postponed);
}
pub fn postpone_batch_with_callbacks(&self, updates: Vec<(TaskId, Duration, Option<CallbackWrapper>)>) -> usize
Postpones multiple tasks in one call, replacing their callbacks.
§Parameters
- updates: list of (task ID, new delay, new callback) tuples
§Returns
Number of tasks successfully postponed
§Examples
use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper, config::ServiceConfig};
use std::time::Duration;

#[tokio::main]
async fn main() {
    let timer = TimerWheel::with_defaults();
    let service = timer.create_service(ServiceConfig::default());

    // Create 3 tasks, initially without callbacks
    let tasks: Vec<_> = (0..3)
        .map(|_| TimerTask::new_oneshot(Duration::from_secs(5), None))
        .collect();
    let task_ids: Vec<_> = tasks.iter().map(|t| t.get_id()).collect();
    service.register_batch(tasks).unwrap();

    // Batch postpone and attach new callbacks
    let updates: Vec<_> = task_ids
        .into_iter()
        .enumerate()
        .map(|(i, id)| {
            let callback = Some(CallbackWrapper::new(move || async move {
                println!("New callback {}", i);
            }));
            (id, Duration::from_secs(10), callback)
        })
        .collect();
    let postponed = service.postpone_batch_with_callbacks(updates);
    println!("Postponed {} tasks", postponed);
}
pub fn register(&self, task: TimerTask) -> Result<TimerHandle, TimerError>
Registers a timer task with the service (registration phase).
§Parameters
- task: task created via TimerTask::new_oneshot() or TimerTask::new_periodic()
§Returns
- Ok(TimerHandle): registration succeeded
- Err(TimerError::RegisterFailed): registration failed (the internal channel is full or closed)
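The failure case above (internal channel full or closed) can be reproduced with a bounded standard-library channel. This sketch assumes the service hands registrations to its actor over such a bounded channel without blocking; `ToyRegistrar` and `ToyError` are hypothetical, and `std::sync::mpsc::sync_channel` stands in for the crate's own channel.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Hypothetical error mirroring the documented RegisterFailed case.
#[derive(Debug, PartialEq, Eq)]
pub enum ToyError {
    RegisterFailed,
}

/// Toy front end that forwards task IDs to an actor over a bounded channel.
pub struct ToyRegistrar {
    tx: SyncSender<u64>,
}

impl ToyRegistrar {
    pub fn new(capacity: usize) -> (Self, Receiver<u64>) {
        let (tx, rx) = sync_channel(capacity);
        (Self { tx }, rx)
    }

    /// Non-blocking registration: fails if the channel is full or closed.
    pub fn register(&self, id: u64) -> Result<u64, ToyError> {
        match self.tx.try_send(id) {
            Ok(()) => Ok(id),
            Err(TrySendError::Full(_)) | Err(TrySendError::Disconnected(_)) => {
                Err(ToyError::RegisterFailed)
            }
        }
    }
}
```

A caller that cannot tolerate a lost registration would match on the `Err` variant and retry or report it, rather than calling `unwrap()` as the short examples on this page do.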
§Examples
use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper, config::ServiceConfig};
use std::time::Duration;

#[tokio::main]
async fn main() {
    let timer = TimerWheel::with_defaults();
    let service = timer.create_service(ServiceConfig::default());

    // Step 1: create the task
    let callback = Some(CallbackWrapper::new(|| async move {
        println!("Timer fired!");
    }));
    let task = TimerTask::new_oneshot(Duration::from_millis(100), callback);
    let task_id = task.get_id();

    // Step 2: register the task
    service.register(task).unwrap();
    println!("Registered task {:?}", task_id);
}
pub fn register_batch(&self, tasks: Vec<TimerTask>) -> Result<BatchHandle, TimerError>
Registers a batch of timer tasks with the service (registration phase).
§Parameters
- tasks: list of tasks, created for example via TimerTask::new_oneshot()
§Returns
- Ok(BatchHandle): registration succeeded
- Err(TimerError::RegisterFailed): registration failed (the internal channel is full or closed)
§Examples
use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper, config::ServiceConfig};
use std::time::Duration;

#[tokio::main]
async fn main() {
    let timer = TimerWheel::with_defaults();
    let service = timer.create_service(ServiceConfig::default());

    // Step 1: create a batch of tasks with callbacks
    let tasks: Vec<TimerTask> = (0..3)
        .map(|i| {
            let callback = Some(CallbackWrapper::new(move || async move {
                println!("Timer {} fired!", i);
            }));
            TimerTask::new_oneshot(Duration::from_secs(1), callback)
        })
        .collect();

    // Step 2: register the batch
    service.register_batch(tasks).unwrap();
}