TimerService

Struct TimerService 

pub struct TimerService { /* private fields */ }

TimerService is a timer service based on the Actor pattern. It manages multiple timer handles, listens for all timeout events, and aggregates the resulting notifications so they can be forwarded to the user.

§Features

  • Automatically listens to all added timer handles’ timeout events
  • Automatically removes one-shot tasks from internal management after timeout
  • Continuously monitors periodic tasks and forwards each invocation
  • Aggregates notifications (both one-shot and periodic) to be forwarded to the user’s unified channel
  • Supports dynamic addition of BatchHandle and TimerHandle
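
The bookkeeping described by these features can be pictured with a small standard-library model. This is only a sketch under stated assumptions, not the crate's implementation: `Aggregator`, `Notification`, `Kind`, and `on_fired` are hypothetical names, task IDs are plain `u64` values, and a synchronous `std::sync::mpsc` channel stands in for whatever channel the service uses internally.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical stand-in for the notification type delivered to the user.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Notification {
    OneShot(u64),
    Periodic(u64),
}

/// Hypothetical task kind tracked by the toy aggregator.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Kind {
    OneShot,
    Periodic,
}

/// Toy aggregator: tracks tasks and forwards every firing to one channel.
pub struct Aggregator {
    tasks: HashMap<u64, Kind>,
    out: Sender<Notification>,
}

impl Aggregator {
    /// Creates the aggregator together with the single unified receiver.
    pub fn new() -> (Self, Receiver<Notification>) {
        let (out, rx) = channel();
        (Self { tasks: HashMap::new(), out }, rx)
    }

    pub fn add_oneshot(&mut self, id: u64) {
        self.tasks.insert(id, Kind::OneShot);
    }

    pub fn add_periodic(&mut self, id: u64) {
        self.tasks.insert(id, Kind::Periodic);
    }

    /// Handles a firing of task `id`. One-shot tasks are dropped from the
    /// map after their first firing; periodic tasks stay tracked.
    /// Returns false if the task is unknown or the receiver is gone.
    pub fn on_fired(&mut self, id: u64) -> bool {
        let note = match self.tasks.get(&id).copied() {
            Some(Kind::OneShot) => {
                self.tasks.remove(&id);
                Notification::OneShot(id)
            }
            Some(Kind::Periodic) => Notification::Periodic(id),
            None => return false,
        };
        self.out.send(note).is_ok()
    }

    /// Number of tasks still tracked.
    pub fn tracked(&self) -> usize {
        self.tasks.len()
    }
}
```

In this model a one-shot task produces exactly one notification and then disappears from the map, while a periodic task keeps producing notifications on the same channel until it is removed by other means.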


§Examples

use kestrel_timer::{TimerWheel, TimerService, TimerTask, CallbackWrapper, TaskNotification, config::ServiceConfig};
use std::time::Duration;

#[tokio::main]
async fn main() {
    let timer = TimerWheel::with_defaults();
    let mut service = timer.create_service(ServiceConfig::default());

    // Register one-shot tasks
    let oneshot_tasks: Vec<_> = (0..3)
        .map(|i| {
            let callback = Some(CallbackWrapper::new(move || async move {
                println!("One-shot timer {} fired!", i);
            }));
            TimerTask::new_oneshot(Duration::from_millis(100), callback)
        })
        .collect();
    service.register_batch(oneshot_tasks).unwrap();

    // Register a periodic task
    let periodic_task = TimerTask::new_periodic(
        Duration::from_millis(100),
        Duration::from_millis(50),
        Some(CallbackWrapper::new(|| async { println!("Periodic timer fired!"); })),
        None
    );
    service.register(periodic_task).unwrap();

    // Receive notifications
    let rx = service.take_receiver().unwrap();
    while let Some(notification) = rx.recv().await {
        match notification {
            TaskNotification::OneShot(task_id) => {
                println!("One-shot task {:?} expired", task_id);
            }
            TaskNotification::Periodic(task_id) => {
                println!("Periodic task {:?} called", task_id);
            }
        }
    }
}

Implementations§

impl TimerService

pub fn take_receiver(&mut self) -> Option<Receiver<TaskNotification>>

Takes the timeout notification receiver, transferring ownership to the caller.

§Returns

The timeout notification receiver, or None if it has already been taken.

§Notes

Because this method transfers ownership of the receiver, only the first call returns it; later calls return None. The receiver delivers both one-shot task expiry notifications and periodic task invocation notifications.

§Examples
let timer = TimerWheel::with_defaults();
let mut service = timer.create_service(ServiceConfig::default());
 
let rx = service.take_receiver().unwrap();
while let Some(notification) = rx.recv().await {
    match notification {
        TaskNotification::OneShot(task_id) => {
            println!("One-shot task {:?} expired", task_id);
        }
        TaskNotification::Periodic(task_id) => {
            println!("Periodic task {:?} called", task_id);
        }
    }
}
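
The "take once" behaviour of take_receiver can be illustrated with `Option::take`. The sketch below assumes the receiver is stored in an `Option` field, which is an assumption for illustration and not confirmed by this page; `OnceReceiver` and `notify` are hypothetical names, and `std::sync::mpsc` stands in for the crate's channel type.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical service that hands out its receiver at most once.
pub struct OnceReceiver<T> {
    tx: Sender<T>,
    rx: Option<Receiver<T>>,
}

impl<T> OnceReceiver<T> {
    pub fn new() -> Self {
        let (tx, rx) = channel();
        Self { tx, rx: Some(rx) }
    }

    /// Moves the receiver out of the service. `Option::take` leaves `None`
    /// behind, so every later call returns `None`.
    pub fn take_receiver(&mut self) -> Option<Receiver<T>> {
        self.rx.take()
    }

    /// Sends a value to whoever holds the receiver.
    /// Returns false if the receiver has been dropped.
    pub fn notify(&self, value: T) -> bool {
        self.tx.send(value).is_ok()
    }
}
```

Code that may run more than once can match on the returned `Option` instead of calling `unwrap()`, so a second call does not panic.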

pub fn cancel_task(&self, task_id: TaskId) -> bool

Cancels the specified task.

§Parameters
  • task_id: ID of the task to cancel
§Returns
  • true: The task exists and was cancelled
  • false: The task does not exist or cancellation failed

§Examples
let timer = TimerWheel::with_defaults();
let service = timer.create_service(ServiceConfig::default());
 
// Use the two-step API to schedule a timer: create the task, then register it
let callback = Some(CallbackWrapper::new(|| async move {
    println!("Timer fired!");
}));
let task = TimerTask::new_oneshot(Duration::from_secs(10), callback);
let task_id = task.get_id();
service.register(task).unwrap();

// Cancel the task
let cancelled = service.cancel_task(task_id);
println!("Task cancelled: {}", cancelled);

pub fn cancel_batch(&self, task_ids: &[TaskId]) -> usize

Cancels multiple tasks in one call.

This uses the underlying batch cancellation operation, which performs better than calling cancel_task repeatedly.

§Parameters
  • task_ids: List of task IDs to cancel
§Returns

Number of successfully cancelled tasks
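
The meaning of the returned count can be shown with a toy registry. This is a sketch under assumptions, not the crate's code: `Registry` is a hypothetical type, task IDs are plain `u64` values, and the methods take `&mut self` for simplicity, whereas the real methods take `&self`.

```rust
use std::collections::HashSet;

/// Hypothetical registry of active task IDs.
pub struct Registry {
    active: HashSet<u64>,
}

impl Registry {
    pub fn new(ids: &[u64]) -> Self {
        Self { active: ids.iter().copied().collect() }
    }

    /// Returns true only if the task was still active.
    pub fn cancel_task(&mut self, id: u64) -> bool {
        self.active.remove(&id)
    }

    /// Cancels every listed ID and counts only the ones that were active.
    /// Unknown IDs and IDs that were already cancelled are not counted.
    pub fn cancel_batch(&mut self, ids: &[u64]) -> usize {
        ids.iter().filter(|&&id| self.active.remove(&id)).count()
    }
}
```

In this model, comparing the returned count with `task_ids.len()` tells the caller whether some IDs were unknown or already gone.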

§Examples
let timer = TimerWheel::with_defaults();
let service = timer.create_service(ServiceConfig::default());
 
let tasks: Vec<_> = (0..10)
    .map(|i| {
        let callback = Some(CallbackWrapper::new(move || async move {
            println!("Timer {} fired!", i);
        }));
        TimerTask::new_oneshot(Duration::from_secs(10), callback)
    })
    .collect();
// Collect the IDs before registering, because register_batch consumes the tasks
let task_ids: Vec<_> = tasks.iter().map(|t| t.get_id()).collect();
service.register_batch(tasks).unwrap();

// Batch cancel
let cancelled = service.cancel_batch(&task_ids);
println!("Cancelled {} tasks", cancelled);

pub fn postpone( &self, task_id: TaskId, new_delay: Duration, callback: Option<CallbackWrapper>, ) -> bool

Postpones a task and replaces its callback.

§Parameters
  • task_id: Task ID to postpone
  • new_delay: New delay time (recalculated from current time point)
  • callback: New callback function
§Returns
  • true: Task exists and is successfully postponed
  • false: Task does not exist or postponement fails
§Notes
  • Task ID remains unchanged after postponement
  • Original timeout notification remains valid
  • Callback function will be replaced with new callback
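
The notes above can be modelled with a small standard-library sketch. It is an illustration under assumptions, not the crate's implementation: `Timers`, `Callback`, `remaining`, and `fire` are hypothetical names, callbacks are synchronous closures that return a label, and the toy always requires a new callback, so it says nothing about how the real method treats `None`.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical synchronous callback that returns a label when invoked.
pub type Callback = Box<dyn Fn() -> &'static str>;

struct Entry {
    deadline: Instant,
    callback: Callback,
}

/// Toy timer table keyed by task ID.
pub struct Timers {
    entries: HashMap<u64, Entry>,
}

impl Timers {
    pub fn new() -> Self {
        Self { entries: HashMap::new() }
    }

    pub fn register(&mut self, id: u64, delay: Duration, callback: Callback) {
        let deadline = Instant::now() + delay;
        self.entries.insert(id, Entry { deadline, callback });
    }

    /// Keeps the same ID, recomputes the deadline from the current time,
    /// and swaps in the new callback. Returns false for unknown IDs.
    pub fn postpone(&mut self, id: u64, new_delay: Duration, callback: Callback) -> bool {
        match self.entries.get_mut(&id) {
            Some(entry) => {
                entry.deadline = Instant::now() + new_delay;
                entry.callback = callback;
                true
            }
            None => false,
        }
    }

    /// Time left until the task is due, or None for unknown IDs.
    pub fn remaining(&self, id: u64) -> Option<Duration> {
        self.entries
            .get(&id)
            .map(|e| e.deadline.saturating_duration_since(Instant::now()))
    }

    /// Removes the task and runs whichever callback is currently installed.
    pub fn fire(&mut self, id: u64) -> Option<&'static str> {
        self.entries.remove(&id).map(|e| (e.callback)())
    }
}
```

The new delay is measured from the moment of the call, not added to the original deadline, which matches the "recalculated from current time point" wording of the new_delay parameter.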

§Examples
let timer = TimerWheel::with_defaults();
let service = timer.create_service(ServiceConfig::default());
 
let callback = Some(CallbackWrapper::new(|| async {
    println!("Original callback");
}));
let task = TimerTask::new_oneshot(Duration::from_secs(5), callback);
let task_id = task.get_id();
service.register(task).unwrap();

// Postpone and replace the callback
let new_callback = Some(CallbackWrapper::new(|| async { println!("New callback!"); }));
let success = service.postpone(
    task_id,
    Duration::from_secs(10),
    new_callback
);
println!("Postponed successfully: {}", success);

pub fn postpone_batch(&self, updates: Vec<(TaskId, Duration)>) -> usize

Postpones multiple tasks in one call, keeping their original callbacks.

§Parameters
  • updates: List of (task ID, new delay) tuples
§Returns

Number of successfully postponed tasks

§Examples
let timer = TimerWheel::with_defaults();
let service = timer.create_service(ServiceConfig::default());
 
let tasks: Vec<_> = (0..3)
    .map(|i| {
        let callback = Some(CallbackWrapper::new(move || async move {
            println!("Timer {} fired!", i);
        }));
        TimerTask::new_oneshot(Duration::from_secs(5), callback)
    })
    .collect();
let task_ids: Vec<_> = tasks.iter().map(|t| t.get_id()).collect();
service.register_batch(tasks).unwrap();
 
// Batch postpone (keep original callbacks)
let updates: Vec<_> = task_ids
    .into_iter()
    .map(|id| (id, Duration::from_secs(10)))
    .collect();
let postponed = service.postpone_batch(updates);
println!("Postponed {} tasks", postponed);

pub fn postpone_batch_with_callbacks( &self, updates: Vec<(TaskId, Duration, Option<CallbackWrapper>)>, ) -> usize

Postpones multiple tasks in one call, replacing their callbacks.

§Parameters
  • updates: List of (task ID, new delay, new callback) tuples
§Returns

Number of successfully postponed tasks

§Examples
let timer = TimerWheel::with_defaults();
let service = timer.create_service(ServiceConfig::default());
 
// Create 3 tasks, initially without callbacks
let tasks: Vec<_> = (0..3)
    .map(|_| TimerTask::new_oneshot(Duration::from_secs(5), None))
    .collect();
let task_ids: Vec<_> = tasks.iter().map(|t| t.get_id()).collect();
service.register_batch(tasks).unwrap();
 
// Batch postpone and add new callbacks
let updates: Vec<_> = task_ids
    .into_iter()
    .enumerate()
    .map(|(i, id)| {
        let callback = Some(CallbackWrapper::new(move || async move {
            println!("New callback {}", i);
        }));
        (id, Duration::from_secs(10), callback)
    })
    .collect();
let postponed = service.postpone_batch_with_callbacks(updates);
println!("Postponed {} tasks", postponed);

pub fn register(&self, task: TimerTask) -> Result<TimerHandle, TimerError>

Registers a timer task with the service (registration phase).

§Parameters
  • task: Task created via a TimerTask constructor such as TimerTask::new_oneshot()
§Returns
  • Ok(TimerHandle): Registration succeeded
  • Err(TimerError::RegisterFailed): Registration failed (the internal channel is full or closed)

§Examples
let timer = TimerWheel::with_defaults();
let service = timer.create_service(ServiceConfig::default());
 
// Step 1: create the task
let callback = Some(CallbackWrapper::new(|| async move {
    println!("Timer fired!");
}));
let task = TimerTask::new_oneshot(Duration::from_millis(100), callback);
let task_id = task.get_id();
 
// Step 2: register the task
service.register(task).unwrap();
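
The two failure causes named under Returns (internal channel full or closed) can be reproduced with a bounded standard-library channel. This is a sketch under assumptions rather than the crate's code: `Registrar` and `RegisterError` are hypothetical, task IDs are plain `u64` values, and the real API reports both causes through the single `TimerError::RegisterFailed` variant.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Hypothetical error that separates the two causes for illustration.
#[derive(Debug, PartialEq, Eq)]
pub enum RegisterError {
    Full,
    Closed,
}

/// Toy registrar that pushes task IDs into a bounded channel.
pub struct Registrar {
    tx: SyncSender<u64>,
}

impl Registrar {
    /// Creates a registrar whose channel buffers at most `capacity` IDs.
    pub fn new(capacity: usize) -> (Self, Receiver<u64>) {
        let (tx, rx) = sync_channel(capacity);
        (Self { tx }, rx)
    }

    /// Non-blocking registration: fails immediately instead of waiting
    /// when the channel is full or the receiving side has been dropped.
    pub fn register(&self, task_id: u64) -> Result<u64, RegisterError> {
        match self.tx.try_send(task_id) {
            Ok(()) => Ok(task_id),
            Err(TrySendError::Full(_)) => Err(RegisterError::Full),
            Err(TrySendError::Disconnected(_)) => Err(RegisterError::Closed),
        }
    }
}
```

Because registration can fail, callers outside of short examples may prefer to match on the `Result` returned by register instead of calling `unwrap()`.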

pub fn register_batch( &self, tasks: Vec<TimerTask>, ) -> Result<BatchHandle, TimerError>

Registers a batch of timer tasks with the service (registration phase).

§Parameters
  • tasks: List of tasks created via a TimerTask constructor such as TimerTask::new_oneshot()
§Returns
  • Ok(BatchHandle): Registration succeeded
  • Err(TimerError::RegisterFailed): Registration failed (the internal channel is full or closed)

§Examples
let timer = TimerWheel::with_defaults();
let service = timer.create_service(ServiceConfig::default());
 
// Step 1: create a batch of tasks with callbacks
let tasks: Vec<TimerTask> = (0..3)
    .map(|i| {
        let callback = Some(CallbackWrapper::new(move || async move {
            println!("Timer {} fired!", i);
        }));
        TimerTask::new_oneshot(Duration::from_secs(1), callback)
    })
    .collect();
 
// Step 2: register the batch
service.register_batch(tasks).unwrap();

pub async fn shutdown(self)

Gracefully shuts down the TimerService.

§Examples
let timer = TimerWheel::with_defaults();
let service = timer.create_service(ServiceConfig::default());

// Use the service...
 
service.shutdown().await;

Trait Implementations§

impl Drop for TimerService

fn drop(&mut self)

Executes the destructor for this type.

Auto Trait Implementations§

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.