pub struct TimerService { /* private fields */ }
TimerService is a timer service based on the Actor pattern. It manages multiple timer handles, listens for all of their timeout events, and aggregates the resulting TaskIds so they can be forwarded to the user.
§Features
- Automatically listens for timeout events from every timer handle added to the service
- Automatically removes a task from internal management once it times out
- Aggregates timed-out TaskIds and forwards them to the user over a single channel
- Supports adding BatchHandle and TimerHandle dynamically
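The aggregation idea in the feature list can be illustrated without the crate itself. The following is a minimal sketch using only the standard library: `TaskId` here is a hypothetical stand-in for the crate's type, `spawn_timers` is an invented helper, and plain threads play the role of timers. It is not how TimerService is implemented; it only shows several independent timeout sources feeding one receiver.

```rust
use std::sync::mpsc::{self, Receiver};
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for the crate's TaskId.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct TaskId(u64);

/// Starts one thread per delay. Each thread plays the role of a timer and
/// reports its TaskId on the shared channel once the delay has elapsed.
fn spawn_timers(delays: Vec<Duration>) -> Receiver<TaskId> {
    let (tx, rx) = mpsc::channel();
    for (i, delay) in delays.into_iter().enumerate() {
        let tx = tx.clone();
        thread::spawn(move || {
            thread::sleep(delay);
            // Ignore the error if the receiver has already been dropped.
            let _ = tx.send(TaskId(i as u64));
        });
    }
    // The original sender is dropped when this function returns, so the
    // channel closes once every timer thread has finished.
    rx
}

fn main() {
    let rx = spawn_timers(vec![
        Duration::from_millis(30),
        Duration::from_millis(10),
        Duration::from_millis(20),
    ]);
    // A single loop observes every timeout, regardless of which timer fired.
    for task_id in rx {
        println!("Task {:?} completed", task_id);
    }
}
```

The consumer only ever deals with one receiver, which is the property the feature list describes.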
§Examples
use kestrel_timer::{TimerWheel, TimerService, CallbackWrapper, config::ServiceConfig};
use std::time::Duration;

#[tokio::main]
async fn main() {
    let timer = TimerWheel::with_defaults();
    let mut service = timer.create_service(ServiceConfig::default());

    // Use the two-step API to batch-schedule timers through the service.
    let callbacks: Vec<(Duration, Option<CallbackWrapper>)> = (0..5)
        .map(|i| {
            let callback = Some(CallbackWrapper::new(move || async move {
                println!("Timer {} fired!", i);
            }));
            (Duration::from_millis(100), callback)
        })
        .collect();
    let tasks = TimerService::create_batch_with_callbacks(callbacks);
    service.register_batch(tasks).unwrap();

    // Receive timeout notifications.
    let mut rx = service.take_receiver().unwrap();
    while let Some(task_id) = rx.recv().await {
        println!("Task {:?} completed", task_id);
    }
}
Implementations§
impl TimerService
pub fn take_receiver(&mut self) -> Option<Receiver<TaskId>>
Takes the timeout notification receiver, transferring ownership to the caller.
§Returns
The timeout notification receiver, or None if it has already been taken.
§Notes
Only the first call returns the receiver, because that call transfers ownership of it. Every later call returns None.
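The take-once behaviour described above is commonly built on `Option::take`. The sketch below assumes that approach; the `Service` struct and its `u64` payload are hypothetical, and the crate's real receiver type is not the standard-library one used here.

```rust
use std::sync::mpsc::{self, Receiver, Sender};

/// Hypothetical service that owns a receiver until the caller takes it.
struct Service {
    tx: Sender<u64>,
    rx: Option<Receiver<u64>>,
}

impl Service {
    fn new() -> Self {
        let (tx, rx) = mpsc::channel();
        Service { tx, rx: Some(rx) }
    }

    /// Moves the receiver out, leaving None behind for later calls.
    fn take_receiver(&mut self) -> Option<Receiver<u64>> {
        self.rx.take()
    }
}

fn main() {
    let mut service = Service::new();
    let rx = service.take_receiver().expect("first call yields the receiver");
    // The receiver has moved to the caller, so a second call finds nothing.
    assert!(service.take_receiver().is_none());

    service.tx.send(7).unwrap();
    assert_eq!(rx.recv().unwrap(), 7);
}
```

Callers that may run more than once should match on the returned Option rather than unwrap it.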
§Examples
use kestrel_timer::{TimerWheel, config::ServiceConfig};

#[tokio::main]
async fn main() {
    let timer = TimerWheel::with_defaults();
    let mut service = timer.create_service(ServiceConfig::default());
    let mut rx = service.take_receiver().unwrap();
    while let Some(task_id) = rx.recv().await {
        println!("Task {:?} timed out", task_id);
    }
}
pub fn cancel_task(&self, task_id: TaskId) -> bool
Cancels the specified task.
§Parameters
- task_id: ID of the task to cancel
§Returns
- true: the task exists and was cancelled
- false: the task does not exist or cancellation failed
§Examples
use kestrel_timer::{TimerWheel, TimerService, CallbackWrapper, config::ServiceConfig};
use std::time::Duration;

#[tokio::main]
async fn main() {
    let timer = TimerWheel::with_defaults();
    let service = timer.create_service(ServiceConfig::default());

    // Use the two-step API to schedule a timer.
    let callback = Some(CallbackWrapper::new(|| async move {
        println!("Timer fired!");
    }));
    let task = TimerService::create_task(Duration::from_secs(10), callback);
    let task_id = task.get_id();
    service.register(task).unwrap();

    // Cancel the task.
    let cancelled = service.cancel_task(task_id);
    println!("Task cancelled: {}", cancelled);
}
pub fn cancel_batch(&self, task_ids: &[TaskId]) -> usize
Cancels multiple tasks in one call.
Uses the underlying batch cancellation operation, which performs better than calling cancel_task repeatedly.
§Parameters
- task_ids: IDs of the tasks to cancel
§Returns
The number of tasks successfully cancelled.
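The two return conventions (a bool for one task, a count for a batch) can be sketched with a plain map. `Registry` below is a hypothetical stand-in, not the crate's internal structure; it only shows why the returned count can be smaller than the length of the slice passed in.

```rust
use std::collections::HashMap;

/// Hypothetical registry mapping task IDs to a label.
struct Registry {
    tasks: HashMap<u64, &'static str>,
}

impl Registry {
    /// Returns true only if the task was present and has been removed.
    fn cancel_task(&mut self, id: u64) -> bool {
        self.tasks.remove(&id).is_some()
    }

    /// Returns how many of the given IDs were actually cancelled.
    fn cancel_batch(&mut self, ids: &[u64]) -> usize {
        ids.iter()
            .filter(|id| self.tasks.remove(*id).is_some())
            .count()
    }
}

fn main() {
    let mut registry = Registry {
        tasks: HashMap::from([(1, "a"), (2, "b"), (3, "c")]),
    };
    // ID 1 exists, while ID 99 was never registered.
    println!("cancel 1: {}", registry.cancel_task(1));
    println!("cancel 99: {}", registry.cancel_task(99));
    // ID 1 is already gone, so only IDs 2 and 3 contribute to the count.
    println!("batch: {}", registry.cancel_batch(&[1, 2, 3]));
}
```

Comparing the returned count with `task_ids.len()` is a simple way to detect IDs that had already fired or been cancelled.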
§Examples
use kestrel_timer::{TimerWheel, TimerService, CallbackWrapper, config::ServiceConfig};
use std::time::Duration;

#[tokio::main]
async fn main() {
    let timer = TimerWheel::with_defaults();
    let service = timer.create_service(ServiceConfig::default());

    let callbacks: Vec<(Duration, Option<CallbackWrapper>)> = (0..10)
        .map(|i| {
            let callback = Some(CallbackWrapper::new(move || async move {
                println!("Timer {} fired!", i);
            }));
            (Duration::from_secs(10), callback)
        })
        .collect();
    let tasks = TimerService::create_batch_with_callbacks(callbacks);
    let task_ids: Vec<_> = tasks.iter().map(|t| t.get_id()).collect();
    service.register_batch(tasks).unwrap();

    // Cancel the whole batch.
    let cancelled = service.cancel_batch(&task_ids);
    println!("Cancelled {} tasks", cancelled);
}
pub fn postpone(
    &self,
    task_id: TaskId,
    new_delay: Duration,
    callback: Option<CallbackWrapper>,
) -> bool
Postpones a task and replaces its callback.
§Parameters
- task_id: ID of the task to postpone
- new_delay: New delay, measured from the current time
- callback: New callback function
§Returns
- true: the task exists and was postponed
- false: the task does not exist or postponement failed
§Notes
- The task ID remains unchanged after postponement
- The original timeout notification remains valid
- The callback is replaced with the new callback
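The notes above can be made concrete with a small model. In this sketch, `Registry`, `Entry`, and the `Callback` alias are all hypothetical. The callback is overwritten unconditionally, which is an assumption: the documentation does not say how the crate treats a None callback.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical callback type for this sketch.
type Callback = Box<dyn Fn() -> &'static str>;

struct Entry {
    deadline: Instant,
    callback: Option<Callback>,
}

struct Registry {
    tasks: HashMap<u64, Entry>,
}

impl Registry {
    /// Updates the entry in place, so the task keeps its ID.
    fn postpone(&mut self, id: u64, new_delay: Duration, callback: Option<Callback>) -> bool {
        match self.tasks.get_mut(&id) {
            Some(entry) => {
                // The delay counts from now, not from the original deadline.
                entry.deadline = Instant::now() + new_delay;
                // Assumption: the old callback is always overwritten.
                entry.callback = callback;
                true
            }
            None => false,
        }
    }
}

fn main() {
    let mut registry = Registry { tasks: HashMap::new() };
    registry.tasks.insert(
        1,
        Entry {
            deadline: Instant::now() + Duration::from_secs(5),
            callback: Some(Box::new(|| "original")),
        },
    );

    let ok = registry.postpone(1, Duration::from_secs(10), Some(Box::new(|| "new")));
    println!("postponed: {}", ok);

    let entry = &registry.tasks[&1];
    println!("still pending: {}", entry.deadline > Instant::now());
    println!("callback: {}", entry.callback.as_ref().map(|cb| cb()).unwrap_or("none"));

    // An unknown ID is reported as a failure rather than creating a task.
    println!("unknown id: {}", registry.postpone(2, Duration::from_secs(1), None));
}
```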
§Examples
use kestrel_timer::{TimerWheel, TimerService, CallbackWrapper, config::ServiceConfig};
use std::time::Duration;

#[tokio::main]
async fn main() {
    let timer = TimerWheel::with_defaults();
    let service = timer.create_service(ServiceConfig::default());

    let callback = Some(CallbackWrapper::new(|| async {
        println!("Original callback");
    }));
    let task = TimerService::create_task(Duration::from_secs(5), callback);
    let task_id = task.get_id();
    service.register(task).unwrap();

    // Postpone the task and replace its callback.
    let new_callback = Some(CallbackWrapper::new(|| async {
        println!("New callback!");
    }));
    let success = service.postpone(task_id, Duration::from_secs(10), new_callback);
    println!("Postponed successfully: {}", success);
}
pub fn postpone_batch(&self, updates: Vec<(TaskId, Duration)>) -> usize
Postpones multiple tasks in one call, keeping their original callbacks.
§Parameters
- updates: List of (task ID, new delay) tuples
§Returns
The number of tasks successfully postponed.
§Examples
use kestrel_timer::{TimerWheel, TimerService, CallbackWrapper, config::ServiceConfig};
use std::time::Duration;

#[tokio::main]
async fn main() {
    let timer = TimerWheel::with_defaults();
    let service = timer.create_service(ServiceConfig::default());

    let callbacks: Vec<(Duration, Option<CallbackWrapper>)> = (0..3)
        .map(|i| {
            let callback = Some(CallbackWrapper::new(move || async move {
                println!("Timer {} fired!", i);
            }));
            (Duration::from_secs(5), callback)
        })
        .collect();
    let tasks = TimerService::create_batch_with_callbacks(callbacks);
    let task_ids: Vec<_> = tasks.iter().map(|t| t.get_id()).collect();
    service.register_batch(tasks).unwrap();

    // Postpone the whole batch, keeping the original callbacks.
    let updates: Vec<_> = task_ids
        .into_iter()
        .map(|id| (id, Duration::from_secs(10)))
        .collect();
    let postponed = service.postpone_batch(updates);
    println!("Postponed {} tasks", postponed);
}
pub fn postpone_batch_with_callbacks(
    &self,
    updates: Vec<(TaskId, Duration, Option<CallbackWrapper>)>,
) -> usize
Postpones multiple tasks in one call, replacing their callbacks.
§Parameters
- updates: List of (task ID, new delay, new callback) tuples
§Returns
The number of tasks successfully postponed.
§Examples
use kestrel_timer::{TimerWheel, TimerService, CallbackWrapper, config::ServiceConfig};
use std::time::Duration;

#[tokio::main]
async fn main() {
    let timer = TimerWheel::with_defaults();
    let service = timer.create_service(ServiceConfig::default());

    // Create 3 tasks that initially have no callbacks.
    let delays: Vec<Duration> = (0..3)
        .map(|_| Duration::from_secs(5))
        .collect();
    let tasks = TimerService::create_batch(delays);
    let task_ids: Vec<_> = tasks.iter().map(|t| t.get_id()).collect();
    service.register_batch(tasks).unwrap();

    // Postpone the whole batch and attach new callbacks.
    let updates: Vec<_> = task_ids
        .into_iter()
        .enumerate()
        .map(|(i, id)| {
            let callback = Some(CallbackWrapper::new(move || async move {
                println!("New callback {}", i);
            }));
            (id, Duration::from_secs(10), callback)
        })
        .collect();
    let postponed = service.postpone_batch_with_callbacks(updates);
    println!("Postponed {} tasks", postponed);
}
pub fn create_task(
    delay: Duration,
    callback: Option<CallbackWrapper>,
) -> TimerTask
Creates a timer task (associated function, creation phase).
§Parameters
- delay: Delay before the task times out
- callback: Optional callback (a CallbackWrapper around an object implementing the TimerCallback trait)
§Returns
A TimerTask, which must then be registered through register().
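The split between creating and registering means a task's ID is known before the service ever sees the task, as the examples on this page show by calling get_id() before register(). One way such a split could work is sketched below. The global atomic counter, `Task`, and `Service` are all assumptions made for illustration, not the crate's actual mechanism.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;

/// Hypothetical ID source shared by every task constructor.
static NEXT_ID: AtomicU64 = AtomicU64::new(1);

#[derive(Debug)]
struct Task {
    id: u64,
    delay: Duration,
}

/// Step 1: build the task. No service is involved, so no `self` is needed.
fn create_task(delay: Duration) -> Task {
    Task {
        id: NEXT_ID.fetch_add(1, Ordering::Relaxed),
        delay,
    }
}

struct Service {
    registered: Vec<Task>,
}

impl Service {
    /// Step 2: hand the task over. Ownership moves into the service.
    fn register(&mut self, task: Task) {
        self.registered.push(task);
    }
}

fn main() {
    let mut service = Service { registered: Vec::new() };

    let task = create_task(Duration::from_millis(100));
    // The ID can be recorded before registration, for example to cancel later.
    let task_id = task.id;
    service.register(task);

    println!("registered task {} with delay {:?}", task_id, service.registered[0].delay);
}
```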
§Examples
use kestrel_timer::{TimerWheel, TimerService, CallbackWrapper, config::ServiceConfig};
use std::time::Duration;

#[tokio::main]
async fn main() {
    let timer = TimerWheel::with_defaults();
    let service = timer.create_service(ServiceConfig::default());

    // Step 1: create the task.
    let callback = Some(CallbackWrapper::new(|| async {
        println!("Timer fired!");
    }));
    let task = TimerService::create_task(Duration::from_millis(100), callback);
    let task_id = task.get_id();
    println!("Created task: {:?}", task_id);

    // Step 2: register the task.
    service.register(task).unwrap();
}
pub fn create_batch(delays: Vec<Duration>) -> Vec<TimerTask>
Creates a batch of timer tasks without callbacks (associated function, creation phase).
§Parameters
- delays: List of delays, one per task
§Returns
A list of TimerTask values, which must then be registered through register_batch().
§Examples
use kestrel_timer::{TimerWheel, TimerService, config::ServiceConfig};
use std::time::Duration;

#[tokio::main]
async fn main() {
    let timer = TimerWheel::with_defaults();
    let service = timer.create_service(ServiceConfig::default());

    // Step 1: create a batch of tasks.
    let delays: Vec<Duration> = (0..3)
        .map(|i| Duration::from_millis(100 * (i + 1)))
        .collect();
    let tasks = TimerService::create_batch(delays);
    println!("Created {} tasks", tasks.len());

    // Step 2: register the batch.
    service.register_batch(tasks).unwrap();
}
pub fn create_batch_with_callbacks(
    callbacks: Vec<(Duration, Option<CallbackWrapper>)>,
) -> Vec<TimerTask>
Creates a batch of timer tasks with callbacks (associated function, creation phase).
§Parameters
- callbacks: List of (delay, callback) tuples
§Returns
A list of TimerTask values, which must then be registered through register_batch().
§Examples
use kestrel_timer::{TimerWheel, TimerService, CallbackWrapper, config::ServiceConfig};
use std::time::Duration;

#[tokio::main]
async fn main() {
    let timer = TimerWheel::with_defaults();
    let service = timer.create_service(ServiceConfig::default());

    // Step 1: create a batch of tasks with callbacks.
    let callbacks: Vec<(Duration, Option<CallbackWrapper>)> = (0..3)
        .map(|i| {
            let callback = Some(CallbackWrapper::new(move || async move {
                println!("Timer {} fired!", i);
            }));
            (Duration::from_millis(100 * (i + 1)), callback)
        })
        .collect();
    let tasks = TimerService::create_batch_with_callbacks(callbacks);
    println!("Created {} tasks", tasks.len());

    // Step 2: register the batch.
    service.register_batch(tasks).unwrap();
}
pub fn register(&self, task: TimerTask) -> Result<TimerHandle, TimerError>
Registers a timer task with the service (registration phase).
§Parameters
- task: Task created via create_task()
§Returns
- Ok(TimerHandle): registration succeeded
- Err(TimerError::RegisterFailed): registration failed because the internal channel is full or closed
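The two failure causes named above (channel full, channel closed) match the two ways a non-blocking send on a bounded channel can fail. The sketch below uses the standard library's `sync_channel` and `try_send` to show both collapsing into one error. The local `TimerError` enum and the free `register` function are stand-ins, and whether the crate uses a non-blocking send internally is an assumption.

```rust
use std::sync::mpsc::{sync_channel, SyncSender};

/// Stand-in for the crate's error type, reduced to the one variant used here.
#[derive(Debug, PartialEq)]
enum TimerError {
    RegisterFailed,
}

/// Tries to enqueue a task ID without blocking.
fn register(tx: &SyncSender<u64>, task_id: u64) -> Result<(), TimerError> {
    // try_send fails with Full or Disconnected; both map to the same error.
    tx.try_send(task_id).map_err(|_| TimerError::RegisterFailed)
}

fn main() {
    // A bounded channel with room for exactly one pending message.
    let (tx, rx) = sync_channel::<u64>(1);

    println!("first:  {:?}", register(&tx, 1)); // buffer has space
    println!("second: {:?}", register(&tx, 2)); // buffer is full

    drop(rx);
    println!("third:  {:?}", register(&tx, 3)); // receiver is gone
}
```

Because both causes surface as the same variant, a caller that calls unwrap() as in the examples on this page will panic in either case; matching on the Result allows a retry or a clean shutdown instead.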
§Examples
use kestrel_timer::{TimerWheel, TimerService, CallbackWrapper, config::ServiceConfig};
use std::time::Duration;

#[tokio::main]
async fn main() {
    let timer = TimerWheel::with_defaults();
    let service = timer.create_service(ServiceConfig::default());

    // Step 1: create the task.
    let callback = Some(CallbackWrapper::new(|| async move {
        println!("Timer fired!");
    }));
    let task = TimerService::create_task(Duration::from_millis(100), callback);
    let task_id = task.get_id();

    // Step 2: register the task.
    service.register(task).unwrap();
    println!("Registered task: {:?}", task_id);
}
pub fn register_batch(
    &self,
    tasks: Vec<TimerTask>,
) -> Result<BatchHandle, TimerError>
Registers a batch of timer tasks with the service (registration phase).
§Parameters
- tasks: List of tasks created via create_batch() or create_batch_with_callbacks()
§Returns
- Ok(BatchHandle): registration succeeded
- Err(TimerError::RegisterFailed): registration failed because the internal channel is full or closed
§Examples
use kestrel_timer::{TimerWheel, TimerService, CallbackWrapper, config::ServiceConfig};
use std::time::Duration;

#[tokio::main]
async fn main() {
    let timer = TimerWheel::with_defaults();
    let service = timer.create_service(ServiceConfig::default());

    // Step 1: create a batch of tasks with callbacks.
    let callbacks: Vec<(Duration, Option<CallbackWrapper>)> = (0..3)
        .map(|i| {
            let callback = Some(CallbackWrapper::new(move || async move {
                println!("Timer {} fired!", i);
            }));
            (Duration::from_secs(1), callback)
        })
        .collect();
    let tasks = TimerService::create_batch_with_callbacks(callbacks);

    // Step 2: register the batch.
    service.register_batch(tasks).unwrap();
}