pub struct TimerService { /* private fields */ }
TimerService: a timer service based on the Actor pattern.
Manages multiple timer handles, listens for all of their timeout events, and forwards the TaskId of each timed-out task to the user through a single aggregated channel.
§Features
- Automatically listens for timeout events on every timer handle added to the service
- Automatically removes a task from internal management once it has timed out
- Forwards timed-out TaskIds to a unified channel for the user to receive
- Supports adding BatchHandle and TimerHandle dynamically
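The aggregation behaviour listed above can be modelled with the standard library alone. The following is a minimal sketch, not kestrel_timer's implementation: `run_timers`, the `u64` stand-in for `TaskId`, and the use of one thread per timer are all assumptions made for illustration. Each simulated timer reports its ID on one shared channel when it expires, and the collector removes the ID from its pending set as it arrives.

```rust
use std::collections::HashSet;
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for the crate's TaskId type.
type TaskId = u64;

/// Runs one thread per (id, delay) pair. Each thread sends its id on a shared
/// channel after sleeping. Returns the ids in arrival order, plus the number
/// of ids still pending once the channel has closed.
fn run_timers(timers: Vec<(TaskId, Duration)>) -> (Vec<TaskId>, usize) {
    let (tx, rx) = mpsc::channel::<TaskId>();
    let mut pending: HashSet<TaskId> = timers.iter().map(|(id, _)| *id).collect();

    for (id, delay) in timers {
        let tx = tx.clone();
        thread::spawn(move || {
            thread::sleep(delay);
            // Ignore the error case: it only occurs if the receiver is gone.
            let _ = tx.send(id);
        });
    }
    // Drop the original sender so the channel closes when all threads finish.
    drop(tx);

    let mut fired = Vec::new();
    for id in rx {
        pending.remove(&id); // a timed-out task is no longer managed
        fired.push(id);
    }
    (fired, pending.len())
}

fn main() {
    let (fired, remaining) = run_timers(vec![
        (1, Duration::from_millis(30)),
        (2, Duration::from_millis(10)),
        (3, Duration::from_millis(20)),
    ]);
    println!("fired: {:?}, still pending: {}", fired, remaining);
}
```

The single receiving end is what lets a caller wait on many timers with one loop, which is the role the unified channel plays in the description above.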
§Examples
use kestrel_timer::{TimerWheel, TimerService, CallbackWrapper, ServiceConfig};
use std::time::Duration;

#[tokio::main]
async fn main() {
    let timer = TimerWheel::with_defaults();
    let mut service = timer.create_service(ServiceConfig::default());

    // Use the two-step API to batch-schedule timers through the service.
    // Step 1: create a batch of tasks with callbacks.
    let callbacks: Vec<(Duration, Option<CallbackWrapper>)> = (0..5)
        .map(|i| {
            let callback = Some(CallbackWrapper::new(move || async move {
                println!("Timer {} fired!", i);
            }));
            (Duration::from_millis(100), callback)
        })
        .collect();
    let tasks = TimerService::create_batch_with_callbacks(callbacks);

    // Step 2: register the batch of tasks.
    service.register_batch(tasks).unwrap();

    // Take the receiver and wait for timeout notifications.
    let mut rx = service.take_receiver().unwrap();
    while let Some(task_id) = rx.recv().await {
        println!("Task {:?} completed", task_id);
    }
}
Implementations§
impl TimerService
pub fn take_receiver(&mut self) -> Option<Receiver<TaskId>>
Takes the timeout receiver, transferring ownership to the caller.
§Returns
The timeout notification receiver, or None if it has already been taken.
§Notes
Only the first call returns the receiver, because the call transfers ownership of it; every later call returns None.
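The take-once behaviour described here matches the common `Option::take` pattern. The sketch below is an assumption about how such a method can be written, not the crate's source: the `Service` struct, its fields, and the use of `std::sync::mpsc` are hypothetical stand-ins.

```rust
use std::sync::mpsc::{self, Receiver, Sender};

/// Hypothetical service that owns both ends of a notification channel.
struct Service {
    tx: Sender<u64>,
    rx: Option<Receiver<u64>>,
}

impl Service {
    fn new() -> Self {
        let (tx, rx) = mpsc::channel();
        Service { tx, rx: Some(rx) }
    }

    /// Moves the receiver out and leaves None behind, so only the first
    /// call can succeed.
    fn take_receiver(&mut self) -> Option<Receiver<u64>> {
        self.rx.take()
    }
}

fn main() {
    let mut service = Service::new();
    let rx = service.take_receiver().expect("first call yields the receiver");
    assert!(service.take_receiver().is_none()); // second call: already taken

    service.tx.send(42).unwrap();
    println!("received {}", rx.recv().unwrap());
}
```

Handling the `None` case explicitly, rather than calling `unwrap()`, avoids a panic if the receiver is requested from more than one place.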
§Examples
let timer = TimerWheel::with_defaults();
let mut service = timer.create_service(ServiceConfig::default());
let mut rx = service.take_receiver().unwrap();
while let Some(task_id) = rx.recv().await {
    println!("Task {:?} timed out", task_id);
}
pub fn cancel_task(&self, task_id: TaskId) -> bool
Cancels the specified task.
§Parameters
- task_id: ID of the task to cancel
§Returns
- true: the task exists and was cancelled successfully
- false: the task does not exist or cancellation failed
§Performance Notes
This method cancels the task directly, without waiting for the Actor to process a message, which greatly reduces latency.
§Examples
let timer = TimerWheel::with_defaults();
let service = timer.create_service(ServiceConfig::default());
// Use the two-step API to schedule a timer.
let callback = Some(CallbackWrapper::new(|| async move {
    println!("Timer fired!");
}));
let task = TimerService::create_task(Duration::from_secs(10), callback);
let task_id = task.get_id();
service.register(task).unwrap();
// Cancel the task.
let cancelled = service.cancel_task(task_id);
println!("Task cancelled: {}", cancelled);
pub fn cancel_batch(&self, task_ids: &[TaskId]) -> usize
Cancels a batch of tasks.
Uses the underlying batch cancellation operation to cancel multiple tasks at once, which performs better than calling cancel_task in a loop.
§Parameters
- task_ids: list of IDs of the tasks to cancel
§Returns
The number of tasks successfully cancelled.
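To make the return value concrete, here is a small model of a batch cancel that counts only the IDs it actually removed. It is a sketch under assumptions: the free function `cancel_batch`, the `HashSet` of pending IDs, and the rule that unknown IDs are simply not counted are illustrative choices, not the crate's internals.

```rust
use std::collections::HashSet;

/// Removes every id in `ids` from `pending` and returns how many were
/// actually present. Unknown or repeated ids do not add to the count.
fn cancel_batch(pending: &mut HashSet<u64>, ids: &[u64]) -> usize {
    ids.iter().filter(|id| pending.remove(*id)).count()
}

fn main() {
    let mut pending: HashSet<u64> = vec![1, 2, 3].into_iter().collect();

    // 1 and 3 are pending, 99 is unknown, so two tasks are cancelled.
    let cancelled = cancel_batch(&mut pending, &[1, 3, 99]);
    println!("cancelled {} tasks, {} still pending", cancelled, pending.len());
}
```

Comparing the returned count with `task_ids.len()` is one way for a caller to detect that some tasks had already fired or were never registered.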
§Examples
let timer = TimerWheel::with_defaults();
let service = timer.create_service(ServiceConfig::default());
let callbacks: Vec<(Duration, Option<CallbackWrapper>)> = (0..10)
    .map(|i| {
        let callback = Some(CallbackWrapper::new(move || async move {
            println!("Timer {} fired!", i);
        }));
        (Duration::from_secs(10), callback)
    })
    .collect();
let tasks = TimerService::create_batch_with_callbacks(callbacks);
let task_ids: Vec<_> = tasks.iter().map(|t| t.get_id()).collect();
service.register_batch(tasks).unwrap();
// Cancel the whole batch.
let cancelled = service.cancel_batch(&task_ids);
println!("Cancelled {} tasks", cancelled);
pub fn postpone(
    &self,
    task_id: TaskId,
    new_delay: Duration,
    callback: Option<CallbackWrapper>,
) -> bool
Postpones a task and replaces its callback.
§Parameters
- task_id: ID of the task to postpone
- new_delay: the new delay, measured from the current time
- callback: the new callback function
§Returns
- true: the task exists and was postponed successfully
- false: the task does not exist or postponement failed
§Notes
- The task ID remains unchanged after postponement
- The original timeout notification remains valid
- The callback is replaced with the new callback
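The notes above can be illustrated with a toy model that uses logical ticks instead of real time. Everything in it is hypothetical (the `Wheel` and `Entry` types, the `schedule` helper, and callbacks that return a `String`); it only demonstrates the documented semantics: the delay is recomputed from the current time, the ID stays the same, and the callback is replaced.

```rust
use std::collections::HashMap;

type TaskId = u64;
type Callback = Box<dyn Fn() -> String>;

/// Hypothetical per-task state: when it fires and what it runs.
struct Entry {
    fire_at: u64,
    callback: Option<Callback>,
}

/// Hypothetical timer store driven by a logical clock `now`.
struct Wheel {
    now: u64,
    tasks: HashMap<TaskId, Entry>,
}

impl Wheel {
    fn new() -> Self {
        Wheel { now: 0, tasks: HashMap::new() }
    }

    fn schedule(&mut self, id: TaskId, delay: u64, callback: Option<Callback>) {
        let fire_at = self.now + delay;
        self.tasks.insert(id, Entry { fire_at, callback });
    }

    /// Re-arms an existing task relative to `now` and swaps its callback.
    /// Returns false when the id is unknown.
    fn postpone(&mut self, id: TaskId, new_delay: u64, callback: Option<Callback>) -> bool {
        let now = self.now;
        match self.tasks.get_mut(&id) {
            Some(entry) => {
                entry.fire_at = now + new_delay;
                entry.callback = callback;
                true
            }
            None => false,
        }
    }
}

fn main() {
    let mut wheel = Wheel::new();
    wheel.schedule(1, 5, Some(Box::new(|| "original".to_string())));

    wheel.now = 3; // three ticks pass before the task is postponed
    let ok = wheel.postpone(1, 10, Some(Box::new(|| "new".to_string())));

    let entry = &wheel.tasks[&1];
    let says = entry.callback.as_ref().map(|cb| cb());
    println!("postponed: {}, fires at tick {}, callback: {:?}", ok, entry.fire_at, says);
    println!("unknown id postponed: {}", wheel.postpone(2, 10, None));
}
```

In this model the task scheduled for tick 5 and postponed at tick 3 by 10 ticks fires at tick 13, not tick 15, because the new delay counts from the moment of the call.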
§Examples
let timer = TimerWheel::with_defaults();
let service = timer.create_service(ServiceConfig::default());
let callback = Some(CallbackWrapper::new(|| async {
    println!("Original callback");
}));
let task = TimerService::create_task(Duration::from_secs(5), callback);
let task_id = task.get_id();
service.register(task).unwrap();
// Postpone the task and replace its callback.
let new_callback = Some(CallbackWrapper::new(|| async {
    println!("New callback!");
}));
let success = service.postpone(task_id, Duration::from_secs(10), new_callback);
println!("Postponed successfully: {}", success);
pub fn postpone_batch(&self, updates: Vec<(TaskId, Duration)>) -> usize
Postpones a batch of tasks, keeping their original callbacks.
§Parameters
- updates: list of (task ID, new delay) tuples
§Returns
The number of tasks successfully postponed.
§Examples
let timer = TimerWheel::with_defaults();
let service = timer.create_service(ServiceConfig::default());
let callbacks: Vec<(Duration, Option<CallbackWrapper>)> = (0..3)
    .map(|i| {
        let callback = Some(CallbackWrapper::new(move || async move {
            println!("Timer {} fired!", i);
        }));
        (Duration::from_secs(5), callback)
    })
    .collect();
let tasks = TimerService::create_batch_with_callbacks(callbacks);
let task_ids: Vec<_> = tasks.iter().map(|t| t.get_id()).collect();
service.register_batch(tasks).unwrap();
// Postpone the whole batch, keeping the original callbacks.
let updates: Vec<_> = task_ids
    .into_iter()
    .map(|id| (id, Duration::from_secs(10)))
    .collect();
let postponed = service.postpone_batch(updates);
println!("Postponed {} tasks", postponed);
pub fn postpone_batch_with_callbacks(
    &self,
    updates: Vec<(TaskId, Duration, Option<CallbackWrapper>)>,
) -> usize
Postpones a batch of tasks and replaces their callbacks.
§Parameters
- updates: list of (task ID, new delay, new callback) tuples
§Returns
The number of tasks successfully postponed.
§Examples
let timer = TimerWheel::with_defaults();
let service = timer.create_service(ServiceConfig::default());
// Create 3 tasks that initially have no callbacks.
let delays: Vec<Duration> = (0..3)
    .map(|_| Duration::from_secs(5))
    .collect();
let tasks = TimerService::create_batch(delays);
let task_ids: Vec<_> = tasks.iter().map(|t| t.get_id()).collect();
service.register_batch(tasks).unwrap();
// Postpone the whole batch and attach new callbacks.
let updates: Vec<_> = task_ids
    .into_iter()
    .enumerate()
    .map(|(i, id)| {
        let callback = Some(CallbackWrapper::new(move || async move {
            println!("New callback {}", i);
        }));
        (id, Duration::from_secs(10), callback)
    })
    .collect();
let postponed = service.postpone_batch_with_callbacks(updates);
println!("Postponed {} tasks", postponed);
pub fn create_task(
    delay: Duration,
    callback: Option<CallbackWrapper>,
) -> TimerTask
Creates a timer task (static method; creation phase, the first step of the two-step API).
§Parameters
- delay: the delay before the task fires
- callback: a callback object implementing the TimerCallback trait
§Returns
A TimerTask, which must then be registered through register().
§Examples
let timer = TimerWheel::with_defaults();
let service = timer.create_service(ServiceConfig::default());
// Step 1: create the task.
let callback = Some(CallbackWrapper::new(|| async {
    println!("Timer fired!");
}));
let task = TimerService::create_task(Duration::from_millis(100), callback);
let task_id = task.get_id();
println!("Created task: {:?}", task_id);
// Step 2: register the task.
service.register(task).unwrap();
pub fn create_batch(delays: Vec<Duration>) -> Vec<TimerTask>
Creates a batch of timer tasks without callbacks (static method; creation phase).
§Parameters
- delays: list of delays, one per task
§Returns
A list of TimerTask values, which must then be registered through register_batch().
§Examples
let timer = TimerWheel::with_defaults();
let service = timer.create_service(ServiceConfig::default());
// Step 1: create a batch of tasks.
let delays: Vec<Duration> = (0..3)
    .map(|i| Duration::from_millis(100 * (i + 1)))
    .collect();
let tasks = TimerService::create_batch(delays);
println!("Created {} tasks", tasks.len());
// Step 2: register the batch of tasks.
service.register_batch(tasks).unwrap();
pub fn create_batch_with_callbacks(
    callbacks: Vec<(Duration, Option<CallbackWrapper>)>,
) -> Vec<TimerTask>
Creates a batch of timer tasks with callbacks (static method; creation phase).
§Parameters
- callbacks: list of (delay, callback) tuples
§Returns
A list of TimerTask values, which must then be registered through register_batch().
§Examples
let timer = TimerWheel::with_defaults();
let service = timer.create_service(ServiceConfig::default());
// Step 1: create a batch of tasks with callbacks.
let callbacks: Vec<(Duration, Option<CallbackWrapper>)> = (0..3)
    .map(|i| {
        let callback = Some(CallbackWrapper::new(move || async move {
            println!("Timer {} fired!", i);
        }));
        (Duration::from_millis(100 * (i + 1)), callback)
    })
    .collect();
let tasks = TimerService::create_batch_with_callbacks(callbacks);
println!("Created {} tasks", tasks.len());
// Step 2: register the batch of tasks.
service.register_batch(tasks).unwrap();
pub fn register(&self, task: TimerTask) -> Result<(), TimerError>
Registers a timer task with the service (registration phase).
§Parameters
- task: a task created via create_task()
§Returns
- Ok(()): registration succeeded
- Err(TimerError::RegisterFailed): registration failed (the internal channel is full or closed)
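The two failure causes named above, a full channel and a closed one, are easy to reproduce with a bounded standard-library channel. The sketch below is only a model: the free function `register`, the `RegisterError` enum with two variants, and the use of `std::sync::mpsc::sync_channel` are assumptions for illustration, whereas the crate reports both causes through the single `TimerError::RegisterFailed` value.

```rust
use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};

/// Hypothetical error type that keeps the two failure causes apart.
#[derive(Debug, PartialEq)]
enum RegisterError {
    Full,
    Closed,
}

/// Tries to enqueue a task id without blocking, mapping the channel's
/// error cases onto RegisterError.
fn register(tx: &SyncSender<u64>, task_id: u64) -> Result<(), RegisterError> {
    tx.try_send(task_id).map_err(|e| match e {
        TrySendError::Full(_) => RegisterError::Full,
        TrySendError::Disconnected(_) => RegisterError::Closed,
    })
}

fn main() {
    // Capacity 1: the second registration finds the queue full.
    let (tx, rx) = sync_channel::<u64>(1);
    println!("{:?}", register(&tx, 1));
    println!("{:?}", register(&tx, 2));

    // Dropping the receiver closes the channel for all later registrations.
    drop(rx);
    println!("{:?}", register(&tx, 3));
}
```

Because registration can fail for reasons outside the caller's control, matching on the returned Result is safer in production code than the `unwrap()` calls used for brevity in the examples.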
§Examples
let timer = TimerWheel::with_defaults();
let service = timer.create_service(ServiceConfig::default());
let callback = Some(CallbackWrapper::new(|| async move {
    println!("Timer fired!");
}));
let task = TimerService::create_task(Duration::from_millis(100), callback);
let task_id = task.get_id();
service.register(task).unwrap();
pub fn register_batch(&self, tasks: Vec<TimerTask>) -> Result<(), TimerError>
Registers a batch of timer tasks with the service (registration phase).
§Parameters
- tasks: a list of tasks created via create_batch()
§Returns
- Ok(()): registration succeeded
- Err(TimerError::RegisterFailed): registration failed (the internal channel is full or closed)
§Examples
let timer = TimerWheel::with_defaults();
let service = timer.create_service(ServiceConfig::default());
let callbacks: Vec<(Duration, Option<CallbackWrapper>)> = (0..3)
    .map(|i| {
        let callback = Some(CallbackWrapper::new(move || async move {
            println!("Timer {} fired!", i);
        }));
        (Duration::from_secs(1), callback)
    })
    .collect();
let tasks = TimerService::create_batch_with_callbacks(callbacks);
service.register_batch(tasks).unwrap();