pub struct TimerService { /* private fields */ }
TimerService is a timer service based on the Actor pattern. It manages multiple timer handles, listens for all of their timeout events, and aggregates the resulting notifications into a single channel for the user.
§Features
- Automatically listens to all added timer handles’ timeout events
- Automatically removes one-shot tasks from internal management after timeout
- Continuously monitors periodic tasks and forwards each invocation
- Aggregates notifications (both one-shot and periodic) to be forwarded to the user’s unified channel
- Supports dynamic addition of BatchHandle and TimerHandle
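The difference between one-shot and periodic handling described above can be sketched with the standard library alone. This is only an illustration under stated assumptions, not the crate's implementation: `Registry`, `Kind`, `Notification`, and `on_timeout` are hypothetical names, and task IDs are modelled as plain `u64` values.

```rust
use std::collections::HashMap;

/// Hypothetical notification type with one variant per task kind.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Notification {
    OneShot(u64),
    Periodic(u64),
}

/// Hypothetical task kinds tracked by the registry.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Kind {
    OneShot,
    Periodic,
}

/// Hypothetical registry of the tasks that are currently managed.
struct Registry {
    tasks: HashMap<u64, Kind>,
}

impl Registry {
    fn new() -> Self {
        Registry { tasks: HashMap::new() }
    }

    fn track(&mut self, id: u64, kind: Kind) {
        self.tasks.insert(id, kind);
    }

    fn is_tracked(&self, id: u64) -> bool {
        self.tasks.contains_key(&id)
    }

    /// Handles one timeout event and returns the notification to forward,
    /// or `None` if the task is not tracked.
    fn on_timeout(&mut self, id: u64) -> Option<Notification> {
        match self.tasks.get(&id).copied()? {
            Kind::OneShot => {
                // A one-shot task is dropped from management after it fires.
                self.tasks.remove(&id);
                Some(Notification::OneShot(id))
            }
            // A periodic task stays tracked so later invocations are forwarded too.
            Kind::Periodic => Some(Notification::Periodic(id)),
        }
    }
}

fn main() {
    let mut registry = Registry::new();
    registry.track(1, Kind::OneShot);
    registry.track(2, Kind::Periodic);
    for id in [1, 1, 2, 2] {
        println!("timeout for task {} -> {:?}", id, registry.on_timeout(id));
    }
}
```

In this sketch the second timeout for task 1 yields nothing because the one-shot entry has already been removed, while task 2 keeps producing notifications.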
§Examples
use kestrel_timer::{TimerWheel, CallbackWrapper, TaskNotification, TimerTask, config::ServiceConfig};
use std::time::Duration;

#[tokio::main]
async fn main() {
    let timer = TimerWheel::with_defaults();
    let mut service = timer.create_service(ServiceConfig::default());

    // Register one-shot tasks: allocate the handles, build the tasks, register them as a batch
    let handles = service.allocate_handles(3);
    let tasks: Vec<_> = (0..3)
        .map(|i| {
            let callback = Some(CallbackWrapper::new(move || async move {
                println!("One-shot timer {} fired!", i);
            }));
            TimerTask::new_oneshot(Duration::from_millis(100), callback)
        })
        .collect();
    service.register_batch(handles, tasks).unwrap();

    // Register a periodic task
    let handle = service.allocate_handle();
    let periodic_task = TimerTask::new_periodic(
        Duration::from_millis(100),
        Duration::from_millis(50),
        Some(CallbackWrapper::new(|| async { println!("Periodic timer fired!"); })),
        None
    );
    service.register(handle, periodic_task).unwrap();

    // Receive the aggregated notifications
    let rx = service.take_receiver().unwrap();
    while let Some(notification) = rx.recv().await {
        match notification {
            TaskNotification::OneShot(task_id) => {
                println!("One-shot task {:?} expired", task_id);
            }
            TaskNotification::Periodic(task_id) => {
                println!("Periodic task {:?} called", task_id);
            }
        }
    }
}
§Implementations
impl TimerService
pub fn allocate_handle(&self) -> TaskHandle
Allocates a single handle from the DeferredMap.
§Returns
A unique handle, to be passed to register() later.
§Examples
use kestrel_timer::{TimerWheel, config::ServiceConfig};

let timer = TimerWheel::with_defaults();
let service = timer.create_service(ServiceConfig::default());
// Allocate the handle first; it is consumed later by register()
let handle = service.allocate_handle();
pub fn allocate_handles(&self, count: usize) -> Vec<TaskHandle>
Allocates a batch of handles from the DeferredMap.
§Parameters
- count: Number of handles to allocate
§Returns
A vector of unique handles, to be passed to register_batch() later.
§Examples
use kestrel_timer::{TimerWheel, config::ServiceConfig};

let timer = TimerWheel::with_defaults();
let service = timer.create_service(ServiceConfig::default());
// Allocate ten handles in one call
let handles = service.allocate_handles(10);
assert_eq!(handles.len(), 10);
pub fn take_receiver(&mut self) -> Option<Receiver<TaskNotification, 32>>
Takes the timeout notification receiver, transferring ownership to the caller.
§Returns
The timeout notification receiver, or None if it has already been taken.
§Notes
- This method yields the receiver only once, because it transfers ownership; subsequent calls return None.
- The receiver delivers both one-shot task expiry notifications and periodic task invocation notifications.
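The take-once behaviour described in the notes is commonly built on `Option::take`. The following is a minimal sketch using `std::sync::mpsc` rather than the crate's own channel; `Service`, `notify`, and the `u64` task IDs are hypothetical stand-ins chosen for illustration.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical service that initially owns both ends of a notification channel.
struct Service {
    tx: Sender<u64>,
    rx: Option<Receiver<u64>>,
}

impl Service {
    fn new() -> Self {
        let (tx, rx) = channel();
        Service { tx, rx: Some(rx) }
    }

    /// Moves the receiver out of the service; every later call returns `None`.
    fn take_receiver(&mut self) -> Option<Receiver<u64>> {
        self.rx.take()
    }

    /// Forwards a notification; the send error is ignored if the receiver was dropped.
    fn notify(&self, task_id: u64) {
        let _ = self.tx.send(task_id);
    }
}

fn main() {
    let mut service = Service::new();
    let rx = service.take_receiver().expect("the first call yields the receiver");
    // Ownership has moved to `rx`, so the service has nothing left to hand out.
    assert!(service.take_receiver().is_none());
    service.notify(7);
    println!("received notification for task {}", rx.recv().unwrap());
}
```

Storing the receiver as an `Option` lets the method signal "already taken" through its return value instead of panicking.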
§Examples
use kestrel_timer::{TimerWheel, TaskNotification, config::ServiceConfig};

#[tokio::main]
async fn main() {
    let timer = TimerWheel::with_defaults();
    let mut service = timer.create_service(ServiceConfig::default());
    // The receiver can be taken only once
    let rx = service.take_receiver().unwrap();
    while let Some(notification) = rx.recv().await {
        match notification {
            TaskNotification::OneShot(task_id) => {
                println!("One-shot task {:?} expired", task_id);
            }
            TaskNotification::Periodic(task_id) => {
                println!("Periodic task {:?} called", task_id);
            }
        }
    }
}
pub fn cancel_task(&self, task_id: TaskId) -> bool
Cancels the specified task.
§Parameters
- task_id: ID of the task to cancel
§Returns
- true: The task exists and was cancelled
- false: The task does not exist or the cancellation failed
§Examples
use kestrel_timer::{TimerWheel, CallbackWrapper, TimerTask, config::ServiceConfig};
use std::time::Duration;

let timer = TimerWheel::with_defaults();
let service = timer.create_service(ServiceConfig::default());
// Use the two-step API (allocate, then register) to schedule a timer
let handle = service.allocate_handle();
let task_id = handle.task_id();
let callback = Some(CallbackWrapper::new(|| async move {
    println!("Timer fired!");
}));
let task = TimerTask::new_oneshot(Duration::from_secs(10), callback);
service.register(handle, task).unwrap();
// Cancel the task before it fires
let cancelled = service.cancel_task(task_id);
println!("Task cancelled: {}", cancelled);
pub fn cancel_batch(&self, task_ids: &[TaskId]) -> usize
Cancels a batch of tasks.
Uses the underlying batch cancellation operation to cancel multiple tasks at once, which performs better than calling cancel_task repeatedly.
§Parameters
- task_ids: List of task IDs to cancel
§Returns
The number of tasks that were successfully cancelled.
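To make the return value concrete, here is a small standard-library sketch of the counting semantics only; it says nothing about how the crate achieves its batch performance. `Registry` and its `u64` task IDs are hypothetical.

```rust
use std::collections::HashSet;

/// Hypothetical registry holding the IDs of the tasks that are still pending.
struct Registry {
    active: HashSet<u64>,
}

impl Registry {
    fn new(ids: &[u64]) -> Self {
        Registry { active: ids.iter().copied().collect() }
    }

    /// Returns true only if the task was pending and has now been removed.
    fn cancel_task(&mut self, id: u64) -> bool {
        self.active.remove(&id)
    }

    /// Cancels every listed task and counts how many were actually pending.
    fn cancel_batch(&mut self, ids: &[u64]) -> usize {
        ids.iter().filter(|id| self.active.remove(*id)).count()
    }
}

fn main() {
    let mut registry = Registry::new(&[1, 2, 3]);
    // Task 99 was never registered, so it does not contribute to the count.
    let cancelled = registry.cancel_batch(&[1, 2, 99]);
    println!("cancelled {} tasks", cancelled);
    println!("cancel task 3: {}", registry.cancel_task(3));
}
```

Unknown or already-cancelled IDs are skipped rather than treated as errors, which matches a return type of `usize` instead of `Result`.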
§Examples
use kestrel_timer::{TimerWheel, CallbackWrapper, TimerTask, config::ServiceConfig};
use std::time::Duration;

let timer = TimerWheel::with_defaults();
let service = timer.create_service(ServiceConfig::default());
let handles = service.allocate_handles(10);
// Collect the task IDs before the handles are consumed by register_batch()
let task_ids: Vec<_> = handles.iter().map(|h| h.task_id()).collect();
let tasks: Vec<_> = (0..10)
    .map(|i| {
        let callback = Some(CallbackWrapper::new(move || async move {
            println!("Timer {} fired!", i);
        }));
        TimerTask::new_oneshot(Duration::from_secs(10), callback)
    })
    .collect();
service.register_batch(handles, tasks).unwrap();
// Cancel all ten tasks in one call
let cancelled = service.cancel_batch(&task_ids);
println!("Cancelled {} tasks", cancelled);
pub fn postpone(&self, task_id: TaskId, new_delay: Duration, callback: Option<CallbackWrapper>) -> bool
Postpones a task, optionally replacing its callback.
§Parameters
- task_id: ID of the task to postpone
- new_delay: New delay, measured from the current point in time
- callback: New callback function (if None, the original callback is kept)
§Returns
- true: The task exists and was postponed
- false: The task does not exist or the postponement failed
§Notes
- The task ID remains unchanged after postponement
- The original timeout notification remains valid
- If callback is Some, it replaces the original callback
- If callback is None, the original callback is preserved
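The rules in the notes can be sketched as follows. This is an assumption-laden model rather than the crate's code: `Registry`, `Entry`, and the synchronous `Callback` alias are hypothetical, whereas the real API takes async callbacks wrapped in `CallbackWrapper`.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical synchronous callback, standing in for an async one.
type Callback = Box<dyn Fn() -> &'static str>;

/// Hypothetical per-task state.
struct Entry {
    deadline: Instant,
    callback: Option<Callback>,
}

struct Registry {
    tasks: HashMap<u64, Entry>,
}

impl Registry {
    fn new() -> Self {
        Registry { tasks: HashMap::new() }
    }

    fn insert(&mut self, id: u64, delay: Duration, callback: Option<Callback>) {
        self.tasks.insert(id, Entry { deadline: Instant::now() + delay, callback });
    }

    fn deadline(&self, id: u64) -> Option<Instant> {
        self.tasks.get(&id).map(|entry| entry.deadline)
    }

    fn run_callback(&self, id: u64) -> Option<&'static str> {
        self.tasks.get(&id)?.callback.as_ref().map(|cb| cb())
    }

    /// Recomputes the deadline from now; the entry keeps its key (task ID).
    fn postpone(&mut self, id: u64, new_delay: Duration, callback: Option<Callback>) -> bool {
        match self.tasks.get_mut(&id) {
            Some(entry) => {
                entry.deadline = Instant::now() + new_delay;
                // Only a `Some` callback overwrites the stored one.
                if callback.is_some() {
                    entry.callback = callback;
                }
                true
            }
            None => false,
        }
    }
}

fn main() {
    let mut registry = Registry::new();
    registry.insert(1, Duration::from_secs(5), Some(Box::new(|| "original")));
    registry.postpone(1, Duration::from_secs(10), None);
    println!("after postpone with None: {:?}", registry.run_callback(1));
    registry.postpone(1, Duration::from_secs(10), Some(Box::new(|| "replacement")));
    println!("after postpone with Some: {:?}", registry.run_callback(1));
}
```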
§Examples
use kestrel_timer::{TimerWheel, CallbackWrapper, TimerTask, config::ServiceConfig};
use std::time::Duration;

let timer = TimerWheel::with_defaults();
let service = timer.create_service(ServiceConfig::default());
let handle = service.allocate_handle();
let task_id = handle.task_id();
let callback = Some(CallbackWrapper::new(|| async {
    println!("Original callback");
}));
let task = TimerTask::new_oneshot(Duration::from_secs(5), callback);
service.register(handle, task).unwrap();
// Postpone the task and replace its callback
let new_callback = Some(CallbackWrapper::new(|| async { println!("New callback!"); }));
let success = service.postpone(
    task_id,
    Duration::from_secs(10),
    new_callback
);
println!("Postponed successfully: {}", success);
pub fn postpone_batch(&self, updates: Vec<(TaskId, Duration)>) -> usize
Postpones a batch of tasks, keeping their original callbacks.
§Parameters
- updates: List of (task ID, new delay) tuples
§Returns
The number of tasks that were successfully postponed.
§Examples
use kestrel_timer::{TimerWheel, CallbackWrapper, TimerTask, config::ServiceConfig};
use std::time::Duration;

let timer = TimerWheel::with_defaults();
let service = timer.create_service(ServiceConfig::default());
let handles = service.allocate_handles(3);
let task_ids: Vec<_> = handles.iter().map(|h| h.task_id()).collect();
let tasks: Vec<_> = (0..3)
    .map(|i| {
        let callback = Some(CallbackWrapper::new(move || async move {
            println!("Timer {} fired!", i);
        }));
        TimerTask::new_oneshot(Duration::from_secs(5), callback)
    })
    .collect();
service.register_batch(handles, tasks).unwrap();
// Postpone all three tasks, keeping their original callbacks
let updates: Vec<_> = task_ids
    .into_iter()
    .map(|id| (id, Duration::from_secs(10)))
    .collect();
let postponed = service.postpone_batch(updates);
println!("Postponed {} tasks", postponed);
pub fn postpone_batch_with_callbacks(&self, updates: Vec<(TaskId, Duration, Option<CallbackWrapper>)>) -> usize
Postpones a batch of tasks, replacing their callbacks.
§Parameters
- updates: List of (task ID, new delay, new callback) tuples
§Returns
The number of tasks that were successfully postponed.
§Examples
use kestrel_timer::{TimerWheel, CallbackWrapper, TimerTask, config::ServiceConfig};
use std::time::Duration;

let timer = TimerWheel::with_defaults();
let service = timer.create_service(ServiceConfig::default());
// Create three tasks that initially have no callbacks
let handles = service.allocate_handles(3);
let task_ids: Vec<_> = handles.iter().map(|h| h.task_id()).collect();
let tasks: Vec<_> = (0..3)
    .map(|_| {
        TimerTask::new_oneshot(Duration::from_secs(5), None)
    })
    .collect();
service.register_batch(handles, tasks).unwrap();
// Postpone the tasks and attach a new callback to each one
let updates: Vec<_> = task_ids
    .into_iter()
    .enumerate()
    .map(|(i, id)| {
        let callback = Some(CallbackWrapper::new(move || async move {
            println!("New callback {}", i);
        }));
        (id, Duration::from_secs(10), callback)
    })
    .collect();
let postponed = service.postpone_batch_with_callbacks(updates);
println!("Postponed {} tasks", postponed);
pub fn register(&self, handle: TaskHandle, task: TimerTask) -> Result<TimerHandle, TimerError>
Registers a timer task with the service (the registration phase of the two-step API).
§Parameters
- handle: Handle allocated via allocate_handle()
- task: Task created via TimerTask::new_oneshot() or TimerTask::new_periodic()
§Returns
- Ok(TimerHandle): Registration succeeded
- Err(TimerError::RegisterFailed): Registration failed (the internal channel is full or closed)
§Examples
use kestrel_timer::{TimerWheel, CallbackWrapper, TimerTask, config::ServiceConfig};
use std::time::Duration;

let timer = TimerWheel::with_defaults();
let service = timer.create_service(ServiceConfig::default());
// Step 1: allocate a handle (keep the task ID for later cancel or postpone calls)
let handle = service.allocate_handle();
let task_id = handle.task_id();
// Step 2: create the task
let callback = Some(CallbackWrapper::new(|| async move {
    println!("Timer fired!");
}));
let task = TimerTask::new_oneshot(Duration::from_millis(100), callback);
// Step 3: register the task
service.register(handle, task).unwrap();
pub fn register_batch(&self, handles: Vec<TaskHandle>, tasks: Vec<TimerTask>) -> Result<BatchHandle, TimerError>
Registers a batch of timer tasks with the service (the registration phase of the two-step API).
§Parameters
- handles: Pre-allocated handles for the tasks
- tasks: List of timer tasks
§Returns
- Ok(BatchHandle): Registration succeeded
- Err(TimerError::RegisterFailed): Registration failed (the internal channel is full or closed)
- Err(TimerError::BatchLengthMismatch): The handles and tasks vectors differ in length
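The length-mismatch case can be illustrated with a small standalone sketch. `RegisterError` and `pair_batch` are hypothetical names, and the fields on the error variant are an assumption made for the example; the actual shape of `TimerError::BatchLengthMismatch` is not shown on this page.

```rust
/// Hypothetical error type; the fields are illustrative only.
#[derive(Debug, PartialEq)]
enum RegisterError {
    BatchLengthMismatch { handles: usize, tasks: usize },
}

/// Pairs each pre-allocated handle with its task, rejecting uneven batches
/// before any pairing takes place.
fn pair_batch<H, T>(handles: Vec<H>, tasks: Vec<T>) -> Result<Vec<(H, T)>, RegisterError> {
    if handles.len() != tasks.len() {
        return Err(RegisterError::BatchLengthMismatch {
            handles: handles.len(),
            tasks: tasks.len(),
        });
    }
    // The lengths match, so zip pairs every element without dropping any.
    Ok(handles.into_iter().zip(tasks).collect())
}

fn main() {
    println!("{:?}", pair_batch(vec![1, 2], vec!["a", "b"]));
    println!("{:?}", pair_batch(vec![1, 2, 3], vec!["a"]));
}
```

Checking the lengths up front matters because `zip` alone would silently truncate to the shorter input.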
§Examples
use kestrel_timer::{TimerWheel, CallbackWrapper, TimerTask, config::ServiceConfig};
use std::time::Duration;

let timer = TimerWheel::with_defaults();
let service = timer.create_service(ServiceConfig::default());
// Step 1: allocate the handles as a batch
let handles = service.allocate_handles(3);
// Step 2: create one task per handle
let tasks: Vec<_> = (0..3)
    .map(|i| {
        let callback = Some(CallbackWrapper::new(move || async move {
            println!("Timer {} fired!", i);
        }));
        TimerTask::new_oneshot(Duration::from_secs(1), callback)
    })
    .collect();
// Step 3: register the batch
service.register_batch(handles, tasks).unwrap();