use std::future::Future;
use std::num::NonZeroUsize;
use std::pin::Pin;
use std::sync::Arc;
use deferred_map::{DefaultKey, Key};
use lite_sync::oneshot::lite::{Receiver, Sender, State, channel};
use lite_sync::spsc::{self, TryRecvError};
const ONESHOT_PENDING: u8 = 0;
const ONESHOT_CALLED: u8 = 1;
const ONESHOT_CANCELLED: u8 = 2;
const ONESHOT_CLOSED: u8 = 3;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TaskCompletion {
Called,
Cancelled,
}
impl State for TaskCompletion {
#[inline]
fn to_u8(&self) -> u8 {
match self {
TaskCompletion::Called => ONESHOT_CALLED,
TaskCompletion::Cancelled => ONESHOT_CANCELLED,
}
}
#[inline]
fn from_u8(value: u8) -> Option<Self> {
match value {
ONESHOT_CALLED => Some(TaskCompletion::Called),
ONESHOT_CANCELLED => Some(TaskCompletion::Cancelled),
_ => None,
}
}
#[inline]
fn pending_value() -> u8 {
ONESHOT_PENDING
}
#[inline]
fn closed_value() -> u8 {
ONESHOT_CLOSED
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct TaskId(DefaultKey);
impl TaskId {
#[inline]
pub(crate) fn from_key(key: DefaultKey) -> Self {
TaskId(key)
}
#[inline]
pub(crate) fn key(&self) -> DefaultKey {
self.0
}
#[inline]
pub fn raw(&self) -> u64 {
self.0.raw()
}
}
pub struct TaskHandle {
handle: deferred_map::Handle,
}
impl TaskHandle {
#[inline]
pub(crate) fn new(handle: deferred_map::Handle) -> Self {
Self { handle }
}
#[inline]
pub fn task_id(&self) -> TaskId {
TaskId::from_key(self.handle.key())
}
#[inline]
pub(crate) fn into_handle(self) -> deferred_map::Handle {
self.handle
}
}
pub trait TimerCallback: Send + Sync + 'static {
fn call(&self) -> Pin<Box<dyn Future<Output = ()> + Send>>;
}
impl<F, Fut> TimerCallback for F
where
F: Fn() -> Fut + Send + Sync + 'static,
Fut: Future<Output = ()> + Send + 'static,
{
fn call(&self) -> Pin<Box<dyn Future<Output = ()> + Send>> {
Box::pin(self())
}
}
#[derive(Clone)]
pub struct CallbackWrapper {
callback: Arc<dyn TimerCallback>,
}
impl CallbackWrapper {
#[inline]
pub fn new(callback: impl TimerCallback) -> Self {
Self {
callback: Arc::new(callback),
}
}
#[inline]
pub(crate) fn call(&self) -> Pin<Box<dyn Future<Output = ()> + Send>> {
self.callback.call()
}
}
#[derive(Clone)]
pub enum TaskType {
OneShot,
Periodic {
interval: std::time::Duration,
buffer_size: NonZeroUsize,
},
}
pub enum TaskTypeWithCompletionNotifier {
OneShot {
completion_notifier: Sender<TaskCompletion>,
},
Periodic {
interval: std::time::Duration,
completion_notifier: PeriodicCompletionNotifier,
},
}
impl TaskTypeWithCompletionNotifier {
#[inline]
pub fn get_interval(&self) -> Option<std::time::Duration> {
match self {
TaskTypeWithCompletionNotifier::Periodic { interval, .. } => Some(*interval),
TaskTypeWithCompletionNotifier::OneShot { .. } => None,
}
}
}
pub struct PeriodicCompletionNotifier(pub spsc::Sender<TaskCompletion, 32>);
pub struct PeriodicCompletionReceiver(pub spsc::Receiver<TaskCompletion, 32>);
impl PeriodicCompletionReceiver {
#[inline]
pub fn try_recv(&mut self) -> Result<TaskCompletion, TryRecvError> {
self.0.try_recv()
}
#[inline]
pub async fn recv(&mut self) -> Option<TaskCompletion> {
self.0.recv().await
}
}
pub enum CompletionNotifier {
OneShot(Sender<TaskCompletion>),
Periodic(PeriodicCompletionNotifier),
}
pub enum CompletionReceiver {
OneShot(Receiver<TaskCompletion>),
Periodic(PeriodicCompletionReceiver),
}
pub struct TimerTask {
pub(crate) task_type: TaskType,
pub(crate) delay: std::time::Duration,
pub(crate) callback: Option<CallbackWrapper>,
}
impl TimerTask {
#[inline]
pub fn new_oneshot(delay: std::time::Duration, callback: Option<CallbackWrapper>) -> Self {
Self {
task_type: TaskType::OneShot,
delay,
callback,
}
}
#[inline]
pub fn new_periodic(
initial_delay: std::time::Duration,
interval: std::time::Duration,
callback: Option<CallbackWrapper>,
buffer_size: Option<NonZeroUsize>,
) -> Self {
Self {
task_type: TaskType::Periodic {
interval,
buffer_size: buffer_size.unwrap_or(NonZeroUsize::new(32).unwrap()),
},
delay: initial_delay,
callback,
}
}
#[inline]
pub fn get_task_type(&self) -> &TaskType {
&self.task_type
}
#[inline]
pub fn get_interval(&self) -> Option<std::time::Duration> {
match self.task_type {
TaskType::Periodic { interval, .. } => Some(interval),
TaskType::OneShot => None,
}
}
}
pub struct TimerTaskWithCompletionNotifier {
pub(crate) task_type: TaskTypeWithCompletionNotifier,
pub(crate) delay: std::time::Duration,
pub(crate) callback: Option<CallbackWrapper>,
}
impl TimerTaskWithCompletionNotifier {
pub fn from_timer_task(task: TimerTask) -> (Self, CompletionReceiver) {
match task.task_type {
TaskType::OneShot => {
let (notifier, receiver) = channel();
(
Self {
task_type: TaskTypeWithCompletionNotifier::OneShot {
completion_notifier: notifier,
},
delay: task.delay,
callback: task.callback,
},
CompletionReceiver::OneShot(receiver),
)
}
TaskType::Periodic {
interval,
buffer_size,
} => {
let (tx, rx) = spsc::channel(buffer_size);
let notifier = PeriodicCompletionNotifier(tx);
let receiver = PeriodicCompletionReceiver(rx);
(
Self {
task_type: TaskTypeWithCompletionNotifier::Periodic {
interval,
completion_notifier: notifier,
},
delay: task.delay,
callback: task.callback,
},
CompletionReceiver::Periodic(receiver),
)
}
}
}
#[inline]
pub fn into_task_type(self) -> TaskTypeWithCompletionNotifier {
self.task_type
}
#[inline]
pub fn get_interval(&self) -> Option<std::time::Duration> {
self.task_type.get_interval()
}
}
pub(crate) struct TimerTaskForWheel {
pub(crate) task_id: TaskId,
pub(crate) task: TimerTaskWithCompletionNotifier,
pub(crate) deadline_tick: u64,
pub(crate) rounds: u32,
}
impl TimerTaskForWheel {
#[inline]
pub(crate) fn new_with_id(
task_id: TaskId,
task: TimerTaskWithCompletionNotifier,
deadline_tick: u64,
rounds: u32,
) -> Self {
Self {
task_id,
task,
deadline_tick,
rounds,
}
}
#[inline]
pub fn get_id(&self) -> TaskId {
self.task_id
}
#[inline]
pub fn into_task_type(self) -> TaskTypeWithCompletionNotifier {
self.task.into_task_type()
}
#[inline]
pub fn update_delay(&mut self, delay: std::time::Duration) {
self.task.delay = delay
}
#[inline]
pub fn update_callback(&mut self, callback: CallbackWrapper) {
self.task.callback = Some(callback)
}
}
#[derive(Debug, Clone, Copy)]
pub(crate) struct TaskLocation {
pub slot_index: usize,
pub vec_index: usize,
pub level: u8,
}
impl TaskLocation {
#[inline(always)]
pub fn new(level: u8, slot_index: usize, vec_index: usize) -> Self {
Self {
slot_index,
vec_index,
level,
}
}
}