TimerService

Struct TimerService 

Source
pub struct TimerService { /* private fields */ }
Expand description

TimerService - 基于 Actor 模式的定时器服务

管理多个定时器句柄,监听所有超时事件,并将 TaskId 聚合转发给用户。

§特性

  • 自动监听所有添加的定时器句柄的超时事件
  • 超时后自动从内部管理中移除该任务
  • 将超时的 TaskId 转发到统一的通道供用户接收
  • 支持动态添加 BatchHandle 和 TimerHandle

§示例

use kestrel_protocol_timer::{TimerWheel, TimerService, CallbackWrapper, ServiceConfig};
use std::time::Duration;

#[tokio::main]
async fn main() {
    let timer = TimerWheel::with_defaults();
    let mut service = timer.create_service(ServiceConfig::default());
     
    // 使用两步式 API 通过 service 批量调度定时器
    let callbacks: Vec<(Duration, Option<CallbackWrapper>)> = (0..5)
        .map(|i| {
            let callback = Some(CallbackWrapper::new(move || async move {
                println!("Timer {} fired!", i);
            }));
            (Duration::from_millis(100), callback)
        })
        .collect();
    let tasks = TimerService::create_batch_with_callbacks(callbacks);
    service.register_batch(tasks).unwrap();
     
    // 接收超时通知
    let mut rx = service.take_receiver().unwrap();
    while let Some(task_id) = rx.recv().await {
        println!("Task {:?} completed", task_id);
    }
}

Implementations§

Source§

impl TimerService

Source

pub fn take_receiver(&mut self) -> Option<Receiver<TaskId>>

获取超时接收器(转移所有权)

§返回

超时通知接收器,如果已经被取走则返回 None

§注意

此方法只能调用一次,因为它会转移接收器的所有权

§示例
let timer = TimerWheel::with_defaults();
let mut service = timer.create_service(ServiceConfig::default());
 
let mut rx = service.take_receiver().unwrap();
while let Some(task_id) = rx.recv().await {
    println!("Task {:?} timed out", task_id);
}
Source

pub fn cancel_task(&self, task_id: TaskId) -> bool

取消指定的任务

§参数
  • task_id: 要取消的任务 ID
§返回
  • true: 任务存在且成功取消
  • false: 任务不存在或取消失败
§性能说明

此方法使用直接取消优化,不需要等待 Actor 处理,大幅降低延迟

§示例
let timer = TimerWheel::with_defaults();
let service = timer.create_service(ServiceConfig::default());
 
// 使用两步式 API 调度定时器
let callback = Some(CallbackWrapper::new(|| async move {
    println!("Timer fired!");
}));
let task = TimerService::create_task(Duration::from_secs(10), callback);
let task_id = task.get_id();
service.register(task).unwrap();
 
// 取消任务
let cancelled = service.cancel_task(task_id);
println!("Task cancelled: {}", cancelled);
Source

pub fn cancel_batch(&self, task_ids: &[TaskId]) -> usize

批量取消任务

使用底层的批量取消操作一次性取消多个任务,性能优于循环调用 cancel_task。

§参数
  • task_ids: 要取消的任务 ID 列表
§返回

成功取消的任务数量

§示例
let timer = TimerWheel::with_defaults();
let service = timer.create_service(ServiceConfig::default());
 
let callbacks: Vec<(Duration, Option<CallbackWrapper>)> = (0..10)
    .map(|i| {
        let callback = Some(CallbackWrapper::new(move || async move {
            println!("Timer {} fired!", i);
        }));
        (Duration::from_secs(10), callback)
    })
    .collect();
let tasks = TimerService::create_batch_with_callbacks(callbacks);
let task_ids: Vec<_> = tasks.iter().map(|t| t.get_id()).collect();
service.register_batch(tasks).unwrap();
 
// 批量取消
let cancelled = service.cancel_batch(&task_ids);
println!("成功取消 {} 个任务", cancelled);
Source

pub fn postpone( &self, task_id: TaskId, new_delay: Duration, callback: Option<CallbackWrapper>, ) -> bool

推迟任务(替换回调)

§参数
  • task_id: 要推迟的任务 ID
  • new_delay: 新的延迟时间(从当前时间点重新计算)
  • callback: 新的回调函数
§返回
  • true: 任务存在且成功推迟
  • false: 任务不存在或推迟失败
§注意
  • 推迟后任务 ID 保持不变
  • 原有的超时通知仍然有效
  • 回调函数会被替换为新的回调
§示例
let timer = TimerWheel::with_defaults();
let service = timer.create_service(ServiceConfig::default());
 
let callback = Some(CallbackWrapper::new(|| async {
    println!("Original callback");
}));
let task = TimerService::create_task(Duration::from_secs(5), callback);
let task_id = task.get_id();
service.register(task).unwrap();
 
// 推迟并替换回调
let new_callback = Some(CallbackWrapper::new(|| async { println!("New callback!"); }));
let success = service.postpone(
    task_id,
    Duration::from_secs(10),
    new_callback
);
println!("推迟成功: {}", success);
Source

pub fn postpone_batch(&self, updates: Vec<(TaskId, Duration)>) -> usize

批量推迟任务(保持原回调)

§参数
  • updates: (任务ID, 新延迟) 的元组列表
§返回

成功推迟的任务数量

§示例
let timer = TimerWheel::with_defaults();
let service = timer.create_service(ServiceConfig::default());
 
let callbacks: Vec<(Duration, Option<CallbackWrapper>)> = (0..3)
    .map(|i| {
        let callback = Some(CallbackWrapper::new(move || async move {
            println!("Timer {} fired!", i);
        }));
        (Duration::from_secs(5), callback)
    })
    .collect();
let tasks = TimerService::create_batch_with_callbacks(callbacks);
let task_ids: Vec<_> = tasks.iter().map(|t| t.get_id()).collect();
service.register_batch(tasks).unwrap();
 
// 批量推迟(保持原回调)
let updates: Vec<_> = task_ids
    .into_iter()
    .map(|id| (id, Duration::from_secs(10)))
    .collect();
let postponed = service.postpone_batch(updates);
println!("成功推迟 {} 个任务", postponed);
Source

pub fn postpone_batch_with_callbacks( &self, updates: Vec<(TaskId, Duration, Option<CallbackWrapper>)>, ) -> usize

批量推迟任务(替换回调)

§参数
  • updates: (任务ID, 新延迟, 新回调) 的元组列表
§返回

成功推迟的任务数量

§示例
let timer = TimerWheel::with_defaults();
let service = timer.create_service(ServiceConfig::default());
 
// 创建 3 个任务,初始没有回调
let delays: Vec<Duration> = (0..3)
    .map(|_| Duration::from_secs(5))
    .collect();
let tasks = TimerService::create_batch(delays);
let task_ids: Vec<_> = tasks.iter().map(|t| t.get_id()).collect();
service.register_batch(tasks).unwrap();
 
// 批量推迟并添加新回调
let updates: Vec<_> = task_ids
    .into_iter()
    .enumerate()
    .map(|(i, id)| {
        let callback = Some(CallbackWrapper::new(move || async move {
            println!("New callback {}", i);
        }));
        (id, Duration::from_secs(10), callback)
    })
    .collect();
let postponed = service.postpone_batch_with_callbacks(updates);
println!("成功推迟 {} 个任务", postponed);
Source

pub fn create_task( delay: Duration, callback: Option<CallbackWrapper>, ) -> TimerTask

创建定时器任务(静态方法,申请阶段)

§参数
  • delay: 延迟时间
  • callback: 实现了 TimerCallback trait 的回调对象
§返回

返回 TimerTask,需要通过 register() 注册

§示例
let timer = TimerWheel::with_defaults();
let service = timer.create_service(ServiceConfig::default());
 
// 步骤 1: 创建任务
let callback = Some(CallbackWrapper::new(|| async {
    println!("Timer fired!");
}));
let task = TimerService::create_task(Duration::from_millis(100), callback);
 
let task_id = task.get_id();
println!("Created task: {:?}", task_id);
 
// 步骤 2: 注册任务
service.register(task).unwrap();
Source

pub fn create_batch(delays: Vec<Duration>) -> Vec<TimerTask>

批量创建定时器任务(静态方法,申请阶段,不带回调)

§参数
  • delays: 延迟时间列表
§返回

返回 TimerTask 列表,需要通过 register_batch() 注册

§示例
let timer = TimerWheel::with_defaults();
let service = timer.create_service(ServiceConfig::default());
 
// 步骤 1: 批量创建任务
let delays: Vec<Duration> = (0..3)
    .map(|i| Duration::from_millis(100 * (i + 1)))
    .collect();
 
let tasks = TimerService::create_batch(delays);
println!("Created {} tasks", tasks.len());
 
// 步骤 2: 批量注册任务
service.register_batch(tasks).unwrap();
Source

pub fn create_batch_with_callbacks( callbacks: Vec<(Duration, Option<CallbackWrapper>)>, ) -> Vec<TimerTask>

批量创建定时器任务(静态方法,申请阶段,带回调)

§参数
  • callbacks: (延迟时间, 回调) 的元组列表
§返回

返回 TimerTask 列表,需要通过 register_batch() 注册

§示例
let timer = TimerWheel::with_defaults();
let service = timer.create_service(ServiceConfig::default());
 
// 步骤 1: 批量创建任务
let callbacks: Vec<(Duration, Option<CallbackWrapper>)> = (0..3)
    .map(|i| {
        let callback = Some(CallbackWrapper::new(move || async move {
            println!("Timer {} fired!", i);
        }));
        (Duration::from_millis(100 * (i + 1)), callback)
    })
    .collect();
 
let tasks = TimerService::create_batch_with_callbacks(callbacks);
println!("Created {} tasks", tasks.len());
 
// 步骤 2: 批量注册任务
service.register_batch(tasks).unwrap();
Source

pub fn register(&self, task: TimerTask) -> Result<(), TimerError>

注册定时器任务到服务(注册阶段)

§参数
  • task: 通过 create_task() 创建的任务
§返回
  • Ok(()): 注册成功
  • Err(TimerError::RegisterFailed): 注册失败(内部通道已满或已关闭)
§示例
let timer = TimerWheel::with_defaults();
let service = timer.create_service(ServiceConfig::default());
 
let callback = Some(CallbackWrapper::new(|| async move {
    println!("Timer fired!");
}));
let task = TimerService::create_task(Duration::from_millis(100), callback);
let task_id = task.get_id();
 
service.register(task).unwrap();
Source

pub fn register_batch(&self, tasks: Vec<TimerTask>) -> Result<(), TimerError>

批量注册定时器任务到服务(注册阶段)

§参数
  • tasks: 通过 create_batch() 创建的任务列表
§返回
  • Ok(()): 注册成功
  • Err(TimerError::RegisterFailed): 注册失败(内部通道已满或已关闭)
§示例
let timer = TimerWheel::with_defaults();
let service = timer.create_service(ServiceConfig::default());
 
let callbacks: Vec<(Duration, Option<CallbackWrapper>)> = (0..3)
    .map(|i| {
        let callback = Some(CallbackWrapper::new(move || async move {
            println!("Timer {} fired!", i);
        }));
        (Duration::from_secs(1), callback)
    })
    .collect();
let tasks = TimerService::create_batch_with_callbacks(callbacks);
 
service.register_batch(tasks).unwrap();
Source

pub async fn shutdown(self)

优雅关闭 TimerService

§示例
let timer = TimerWheel::with_defaults();
let mut service = timer.create_service(ServiceConfig::default());
 
// 使用 service...
 
service.shutdown().await;

Trait Implementations§

Source§

impl Drop for TimerService

Source§

fn drop(&mut self)

Executes the destructor for this type. Read more

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.