pub struct TimerService { /* private fields */ }Expand description
TimerService - 基于 Actor 模式的定时器服务
管理多个定时器句柄,监听所有超时事件,并将 TaskId 聚合转发给用户。
§特性
- 自动监听所有添加的定时器句柄的超时事件
- 超时后自动从内部管理中移除该任务
- 将超时的 TaskId 转发到统一的通道供用户接收
- 支持动态添加 BatchHandle 和 TimerHandle
§示例
use kestrel_protocol_timer::{TimerWheel, TimerService, CallbackWrapper, ServiceConfig};
use std::time::Duration;
#[tokio::main]
async fn main() {
let timer = TimerWheel::with_defaults();
let mut service = timer.create_service(ServiceConfig::default());
// 使用两步式 API 通过 service 批量调度定时器
let callbacks: Vec<(Duration, Option<CallbackWrapper>)> = (0..5)
.map(|i| {
let callback = Some(CallbackWrapper::new(move || async move {
println!("Timer {} fired!", i);
}));
(Duration::from_millis(100), callback)
})
.collect();
let tasks = TimerService::create_batch_with_callbacks(callbacks);
service.register_batch(tasks).unwrap();
// 接收超时通知
let mut rx = service.take_receiver().unwrap();
while let Some(task_id) = rx.recv().await {
println!("Task {:?} completed", task_id);
}
}Implementations§
Source§impl TimerService
impl TimerService
Sourcepub fn take_receiver(&mut self) -> Option<Receiver<TaskId>>
pub fn take_receiver(&mut self) -> Option<Receiver<TaskId>>
获取超时接收器(转移所有权)
§返回
超时通知接收器,如果已经被取走则返回 None
§注意
此方法只能调用一次,因为它会转移接收器的所有权
§示例
let timer = TimerWheel::with_defaults();
let mut service = timer.create_service(ServiceConfig::default());
let mut rx = service.take_receiver().unwrap();
while let Some(task_id) = rx.recv().await {
println!("Task {:?} timed out", task_id);
}Sourcepub fn cancel_task(&self, task_id: TaskId) -> bool
pub fn cancel_task(&self, task_id: TaskId) -> bool
取消指定的任务
§参数
task_id: 要取消的任务 ID
§返回
true: 任务存在且成功取消false: 任务不存在或取消失败
§性能说明
此方法使用直接取消优化,不需要等待 Actor 处理,大幅降低延迟
§示例
let timer = TimerWheel::with_defaults();
let service = timer.create_service(ServiceConfig::default());
// 使用两步式 API 调度定时器
let callback = Some(CallbackWrapper::new(|| async move {
println!("Timer fired!");
}));
let task = TimerService::create_task(Duration::from_secs(10), callback);
let task_id = task.get_id();
service.register(task).unwrap();
// 取消任务
let cancelled = service.cancel_task(task_id);
println!("Task cancelled: {}", cancelled);Sourcepub fn cancel_batch(&self, task_ids: &[TaskId]) -> usize
pub fn cancel_batch(&self, task_ids: &[TaskId]) -> usize
批量取消任务
使用底层的批量取消操作一次性取消多个任务,性能优于循环调用 cancel_task。
§参数
task_ids: 要取消的任务 ID 列表
§返回
成功取消的任务数量
§示例
let timer = TimerWheel::with_defaults();
let service = timer.create_service(ServiceConfig::default());
let callbacks: Vec<(Duration, Option<CallbackWrapper>)> = (0..10)
.map(|i| {
let callback = Some(CallbackWrapper::new(move || async move {
println!("Timer {} fired!", i);
}));
(Duration::from_secs(10), callback)
})
.collect();
let tasks = TimerService::create_batch_with_callbacks(callbacks);
let task_ids: Vec<_> = tasks.iter().map(|t| t.get_id()).collect();
service.register_batch(tasks).unwrap();
// 批量取消
let cancelled = service.cancel_batch(&task_ids);
println!("成功取消 {} 个任务", cancelled);Sourcepub fn postpone(
&self,
task_id: TaskId,
new_delay: Duration,
callback: Option<CallbackWrapper>,
) -> bool
pub fn postpone( &self, task_id: TaskId, new_delay: Duration, callback: Option<CallbackWrapper>, ) -> bool
推迟任务(替换回调)
§参数
task_id: 要推迟的任务 IDnew_delay: 新的延迟时间(从当前时间点重新计算)callback: 新的回调函数
§返回
true: 任务存在且成功推迟false: 任务不存在或推迟失败
§注意
- 推迟后任务 ID 保持不变
- 原有的超时通知仍然有效
- 回调函数会被替换为新的回调
§示例
let timer = TimerWheel::with_defaults();
let service = timer.create_service(ServiceConfig::default());
let callback = Some(CallbackWrapper::new(|| async {
println!("Original callback");
}));
let task = TimerService::create_task(Duration::from_secs(5), callback);
let task_id = task.get_id();
service.register(task).unwrap();
// 推迟并替换回调
let new_callback = Some(CallbackWrapper::new(|| async { println!("New callback!"); }));
let success = service.postpone(
task_id,
Duration::from_secs(10),
new_callback
);
println!("推迟成功: {}", success);Sourcepub fn postpone_batch(&self, updates: Vec<(TaskId, Duration)>) -> usize
pub fn postpone_batch(&self, updates: Vec<(TaskId, Duration)>) -> usize
批量推迟任务(保持原回调)
§参数
updates: (任务ID, 新延迟) 的元组列表
§返回
成功推迟的任务数量
§示例
let timer = TimerWheel::with_defaults();
let service = timer.create_service(ServiceConfig::default());
let callbacks: Vec<(Duration, Option<CallbackWrapper>)> = (0..3)
.map(|i| {
let callback = Some(CallbackWrapper::new(move || async move {
println!("Timer {} fired!", i);
}));
(Duration::from_secs(5), callback)
})
.collect();
let tasks = TimerService::create_batch_with_callbacks(callbacks);
let task_ids: Vec<_> = tasks.iter().map(|t| t.get_id()).collect();
service.register_batch(tasks).unwrap();
// 批量推迟(保持原回调)
let updates: Vec<_> = task_ids
.into_iter()
.map(|id| (id, Duration::from_secs(10)))
.collect();
let postponed = service.postpone_batch(updates);
println!("成功推迟 {} 个任务", postponed);Sourcepub fn postpone_batch_with_callbacks(
&self,
updates: Vec<(TaskId, Duration, Option<CallbackWrapper>)>,
) -> usize
pub fn postpone_batch_with_callbacks( &self, updates: Vec<(TaskId, Duration, Option<CallbackWrapper>)>, ) -> usize
批量推迟任务(替换回调)
§参数
updates: (任务ID, 新延迟, 新回调) 的元组列表
§返回
成功推迟的任务数量
§示例
let timer = TimerWheel::with_defaults();
let service = timer.create_service(ServiceConfig::default());
// 创建 3 个任务,初始没有回调
let delays: Vec<Duration> = (0..3)
.map(|_| Duration::from_secs(5))
.collect();
let tasks = TimerService::create_batch(delays);
let task_ids: Vec<_> = tasks.iter().map(|t| t.get_id()).collect();
service.register_batch(tasks).unwrap();
// 批量推迟并添加新回调
let updates: Vec<_> = task_ids
.into_iter()
.enumerate()
.map(|(i, id)| {
let callback = Some(CallbackWrapper::new(move || async move {
println!("New callback {}", i);
}));
(id, Duration::from_secs(10), callback)
})
.collect();
let postponed = service.postpone_batch_with_callbacks(updates);
println!("成功推迟 {} 个任务", postponed);Sourcepub fn create_task(
delay: Duration,
callback: Option<CallbackWrapper>,
) -> TimerTask
pub fn create_task( delay: Duration, callback: Option<CallbackWrapper>, ) -> TimerTask
创建定时器任务(静态方法,申请阶段)
§参数
delay: 延迟时间callback: 实现了 TimerCallback trait 的回调对象
§返回
返回 TimerTask,需要通过 register() 注册
§示例
let timer = TimerWheel::with_defaults();
let service = timer.create_service(ServiceConfig::default());
// 步骤 1: 创建任务
let callback = Some(CallbackWrapper::new(|| async {
println!("Timer fired!");
}));
let task = TimerService::create_task(Duration::from_millis(100), callback);
let task_id = task.get_id();
println!("Created task: {:?}", task_id);
// 步骤 2: 注册任务
service.register(task).unwrap();Sourcepub fn create_batch(delays: Vec<Duration>) -> Vec<TimerTask>
pub fn create_batch(delays: Vec<Duration>) -> Vec<TimerTask>
批量创建定时器任务(静态方法,申请阶段,不带回调)
§参数
delays: 延迟时间列表
§返回
返回 TimerTask 列表,需要通过 register_batch() 注册
§示例
let timer = TimerWheel::with_defaults();
let service = timer.create_service(ServiceConfig::default());
// 步骤 1: 批量创建任务
let delays: Vec<Duration> = (0..3)
.map(|i| Duration::from_millis(100 * (i + 1)))
.collect();
let tasks = TimerService::create_batch(delays);
println!("Created {} tasks", tasks.len());
// 步骤 2: 批量注册任务
service.register_batch(tasks).unwrap();Sourcepub fn create_batch_with_callbacks(
callbacks: Vec<(Duration, Option<CallbackWrapper>)>,
) -> Vec<TimerTask>
pub fn create_batch_with_callbacks( callbacks: Vec<(Duration, Option<CallbackWrapper>)>, ) -> Vec<TimerTask>
批量创建定时器任务(静态方法,申请阶段,带回调)
§参数
callbacks: (延迟时间, 回调) 的元组列表
§返回
返回 TimerTask 列表,需要通过 register_batch() 注册
§示例
let timer = TimerWheel::with_defaults();
let service = timer.create_service(ServiceConfig::default());
// 步骤 1: 批量创建任务
let callbacks: Vec<(Duration, Option<CallbackWrapper>)> = (0..3)
.map(|i| {
let callback = Some(CallbackWrapper::new(move || async move {
println!("Timer {} fired!", i);
}));
(Duration::from_millis(100 * (i + 1)), callback)
})
.collect();
let tasks = TimerService::create_batch_with_callbacks(callbacks);
println!("Created {} tasks", tasks.len());
// 步骤 2: 批量注册任务
service.register_batch(tasks).unwrap();Sourcepub fn register(&self, task: TimerTask) -> Result<(), TimerError>
pub fn register(&self, task: TimerTask) -> Result<(), TimerError>
注册定时器任务到服务(注册阶段)
§参数
task: 通过create_task()创建的任务
§返回
Ok(()): 注册成功Err(TimerError::RegisterFailed): 注册失败(内部通道已满或已关闭)
§示例
let timer = TimerWheel::with_defaults();
let service = timer.create_service(ServiceConfig::default());
let callback = Some(CallbackWrapper::new(|| async move {
println!("Timer fired!");
}));
let task = TimerService::create_task(Duration::from_millis(100), callback);
let task_id = task.get_id();
service.register(task).unwrap();Sourcepub fn register_batch(&self, tasks: Vec<TimerTask>) -> Result<(), TimerError>
pub fn register_batch(&self, tasks: Vec<TimerTask>) -> Result<(), TimerError>
批量注册定时器任务到服务(注册阶段)
§参数
tasks: 通过create_batch()创建的任务列表
§返回
Ok(()): 注册成功Err(TimerError::RegisterFailed): 注册失败(内部通道已满或已关闭)
§示例
let timer = TimerWheel::with_defaults();
let service = timer.create_service(ServiceConfig::default());
let callbacks: Vec<(Duration, Option<CallbackWrapper>)> = (0..3)
.map(|i| {
let callback = Some(CallbackWrapper::new(move || async move {
println!("Timer {} fired!", i);
}));
(Duration::from_secs(1), callback)
})
.collect();
let tasks = TimerService::create_batch_with_callbacks(callbacks);
service.register_batch(tasks).unwrap();Trait Implementations§
Auto Trait Implementations§
impl Freeze for TimerService
impl !RefUnwindSafe for TimerService
impl Send for TimerService
impl Sync for TimerService
impl Unpin for TimerService
impl !UnwindSafe for TimerService
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more