pub mod timer_consumer;
pub mod timer_tickets;
#[cfg(test)]
mod tests;
#[cfg(target_family = "unix")]
pub use std::os::fd::{AsFd, AsRawFd, BorrowedFd, RawFd};
use std::
{
borrow::Cow,
collections::VecDeque,
fmt,
marker::PhantomData,
};
use crate::
{
TimerDequeConsumer,
TimerDequeTicket,
TimerDequeTicketIssuer,
error::{TimerErrorType, TimerResult},
map_timer_err,
timer_err,
timer_portable::
{
AsTimerId, FdTimerMarker, PollEventType, TimerFd, TimerFlags, TimerId, TimerType, UnixFd, portable_error::TimerPortResult, timer::
{
AbsoluteTime, FdTimerCom, FdTimerRead, RelativeTime, TimerExpMode,
TimerReadRes
}
}
};
/// Scheduling mode of a deque entry: it owns the entry's deadline and defines
/// how that deadline is validated, postponed and advanced.
///
/// In this file `DequeOnce` and `DequePeriodic` implement it.
pub trait OrderedTimerDequeMode: fmt::Debug + fmt::Display + Ord + PartialOrd + Eq + PartialEq {
    /// Compile-time mode marker: `true` in `DequeOnce`, `false` in
    /// `DequePeriodic`.
    const IS_ONCE: bool;

    /// Checks the mode against the reference time `cmp` and returns an error
    /// when the mode must be rejected.
    fn validate_time(&self, cmp: AbsoluteTime) -> TimerResult<()>;

    /// Returns the absolute time at which the entry is due.
    fn get_absolut_timeout(&self) -> AbsoluteTime;

    /// Shifts the deadline using the relative time `posp_time`.
    fn postpone(&mut self, posp_time: RelativeTime) -> TimerResult<()>;

    /// Moves the deadline to its next occurrence.
    ///
    /// The default implementation does nothing.
    fn advance_timeout(&mut self) {}
}
/// Deque mode that carries a single absolute deadline and no period
/// (its `IS_ONCE` constant is `true`).
#[derive(Debug, Clone, Copy)]
pub struct DequeOnce
{
/// Absolute time at which the entry is due.
absolute_timeout: AbsoluteTime,
}
impl fmt::Display for DequeOnce {
    /// Prints the absolute deadline using its own `Display` output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_fmt(format_args!("{}", self.absolute_timeout))
    }
}
/// `DequeOnce` values are ordered by their absolute deadline.
impl Ord for DequeOnce {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.absolute_timeout.cmp(&other.absolute_timeout)
    }
}

/// Partial ordering always agrees with the total ordering above.
impl PartialOrd for DequeOnce {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl Eq for DequeOnce {}

/// Two `DequeOnce` values are equal when their deadlines are equal.
impl PartialEq for DequeOnce {
    fn eq(&self, other: &Self) -> bool {
        self.absolute_timeout == other.absolute_timeout
    }
}
impl OrderedTimerDequeMode for DequeOnce {
    const IS_ONCE: bool = true;

    /// Rejects the mode with `TimerErrorType::Expired` when the reference
    /// time `cmp` is already later than the stored deadline.
    fn validate_time(&self, cmp: AbsoluteTime) -> TimerResult<()> {
        if cmp > self.absolute_timeout {
            timer_err!(TimerErrorType::Expired,
                "deque once time already expired, now: {}, req: {}", cmp, self);
        }

        Ok(())
    }

    /// Returns the stored absolute deadline.
    fn get_absolut_timeout(&self) -> AbsoluteTime {
        self.absolute_timeout
    }

    /// Moves the deadline forward by `posp_time`. Never fails.
    fn postpone(&mut self, posp_time: RelativeTime) -> TimerResult<()> {
        let postponed = self.absolute_timeout + posp_time;
        self.absolute_timeout = postponed;

        Ok(())
    }
}
impl DequeOnce {
    /// Builds a mode whose deadline is `absolute_timeout` converted into an
    /// `AbsoluteTime`.
    #[inline]
    pub fn new(absolute_timeout: impl Into<AbsoluteTime>) -> Self {
        let absolute_timeout: AbsoluteTime = absolute_timeout.into();

        Self { absolute_timeout }
    }
}
/// Deque mode that carries an absolute deadline together with a relative
/// period (its `IS_ONCE` constant is `false`).
#[derive(Debug, Clone, Copy)]
pub struct DequePeriodic
{
/// Period added to `absolute_timeout` each time the deadline is advanced.
relative_period: RelativeTime,
/// Absolute time at which the entry is due next.
absolute_timeout: AbsoluteTime,
}
impl fmt::Display for DequePeriodic {
    /// Prints the absolute deadline followed by the relative period.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_fmt(format_args!(
            "{}, rel: {}",
            self.absolute_timeout, self.relative_period
        ))
    }
}
/// `DequePeriodic` values are ordered by their absolute deadline only; the
/// relative period does not take part in the comparison.
impl Ord for DequePeriodic {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.absolute_timeout.cmp(&other.absolute_timeout)
    }
}

/// Partial ordering always agrees with the total ordering above.
impl PartialOrd for DequePeriodic {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl Eq for DequePeriodic {}

/// Equality looks at the absolute deadline only, consistently with `Ord`.
impl PartialEq for DequePeriodic {
    fn eq(&self, other: &Self) -> bool {
        self.absolute_timeout == other.absolute_timeout
    }
}
impl OrderedTimerDequeMode for DequePeriodic {
    const IS_ONCE: bool = false;

    /// Returns the absolute time of the next expiration.
    fn get_absolut_timeout(&self) -> AbsoluteTime {
        self.absolute_timeout
    }

    /// Moves the deadline forward by one period.
    fn advance_timeout(&mut self) {
        self.absolute_timeout += self.relative_period;
    }

    /// Validates the mode against the reference time `cmp`.
    ///
    /// # Errors
    /// * `TimerErrorType::Expired` - `cmp` is later than the stored deadline.
    /// * `TimerErrorType::ZeroRelativeTime` - the period is zero.
    fn validate_time(&self, cmp: AbsoluteTime) -> TimerResult<()> {
        if cmp > self.absolute_timeout {
            timer_err!(TimerErrorType::Expired,
                "deque periodic absolute time already expired, now: {}, req: {}", cmp, self.absolute_timeout);
        }

        // A zero period is the invalid case: `advance_timeout` would then
        // never move the deadline. The previous check was inverted and
        // rejected every non-zero period instead.
        if self.relative_period.is_zero() {
            timer_err!(TimerErrorType::ZeroRelativeTime,
                "deque periodic relative time is 0, rel_time: {}", self.relative_period);
        }

        Ok(())
    }

    /// Replaces the period with `posp_time`.
    ///
    /// The deadline is first moved back by the old period and then forward
    /// by `posp_time`. Never fails.
    fn postpone(&mut self, posp_time: RelativeTime) -> TimerResult<()> {
        self.absolute_timeout = self.absolute_timeout - self.relative_period + posp_time;
        self.relative_period = posp_time;

        Ok(())
    }
}
impl DequePeriodic {
    /// Builds a periodic mode with period `rel_time` whose first deadline is
    /// the current time (`AbsoluteTime::now()`).
    // NOTE(review): the first deadline is "now", not "now + period", so a
    // later `validate_time(AbsoluteTime::now())` would see it as expired.
    // Confirm this is intended.
    pub fn new_from_now(rel_time: impl Into<RelativeTime>) -> Self {
        let relative_period: RelativeTime = rel_time.into();
        let absolute_timeout = AbsoluteTime::now();

        Self { relative_period, absolute_timeout }
    }

    /// Builds a periodic mode with first deadline `abs_time` and period
    /// `rel_time`.
    pub fn new(abs_time: impl Into<AbsoluteTime>, rel_time: impl Into<RelativeTime>) -> Self {
        let relative_period: RelativeTime = rel_time.into();
        let absolute_timeout: AbsoluteTime = abs_time.into();

        Self { relative_period, absolute_timeout }
    }
}
/// Collector for the items reported while timer events are processed.
///
/// Two implementations are provided: `Vec<ITEM>` keeps every pushed item and
/// `()` discards them all.
pub trait TimerTimeoutCollection<ITEM: PartialEq + Eq + fmt::Display + fmt::Debug> {
    /// Creates an empty collector.
    fn new() -> Self;

    /// Hands `item` to the collector.
    fn push(&mut self, item: ITEM);

    /// Converts the collector into `Some(self)` when it holds something worth
    /// returning, or `None` otherwise.
    fn into_option(self) -> Option<Self>
    where
        Self: Sized;
}

/// Collector that stores every pushed item in insertion order.
impl<ITEM: PartialEq + Eq + fmt::Display + fmt::Debug> TimerTimeoutCollection<ITEM> for Vec<ITEM> {
    fn new() -> Self {
        Vec::new()
    }

    fn push(&mut self, item: ITEM) {
        // Explicitly the inherent `Vec::push`, not this trait method.
        Vec::push(self, item);
    }

    /// `None` for an empty vector, `Some(vector)` otherwise.
    fn into_option(self) -> Option<Self> {
        if self.is_empty() {
            None
        } else {
            Some(self)
        }
    }
}

/// Collector that ignores everything pushed into it.
impl<ITEM: PartialEq + Eq + fmt::Display + fmt::Debug> TimerTimeoutCollection<ITEM> for () {
    fn new() -> Self {}

    fn push(&mut self, _item: ITEM) {}

    /// Always `None`: nothing is ever stored.
    fn into_option(self) -> Option<Self> {
        None
    }
}
/// Entry type stored inside an `OrderTimerDeque`.
///
/// An entry must be orderable, comparable with its own `TimerId`, printable,
/// and must expose its absolute timeout through `OrderedTimerDequeInterf`.
pub trait OrderedTimerDequeHandle<MODE: OrderedTimerDequeMode>
: Ord + PartialOrd + PartialEq + PartialEq<Self::TimerId> + Eq + fmt::Debug +
fmt::Display + OrderedTimerDequeInterf<MODE>
{
/// Identifier used to look an entry up in the queue.
type TimerId: PartialEq + Eq + fmt::Display + fmt::Debug;
/// Collector type that `handle` fills with timer IDs.
type HandleRes: TimerTimeoutCollection<Self::TimerId>;
/// Postpones the entry by `postp_time`. `OrderTimerDeque::postpone` calls
/// this after removing the entry from the queue and before re-queueing it.
fn postpone(&mut self, postp_time: RelativeTime) -> TimerResult<()>;
/// Replaces the entry's schedule with `time`. `OrderTimerDeque::reschedule`
/// calls this after removing the entry and before re-queueing it.
fn resched(&mut self, time: MODE) -> TimerResult<()>;
/// Processes an entry that has been popped from the front of the queue.
/// The entry is consumed and receives both the deque it came from and the
/// collector for the current processing pass.
fn handle(self, timer_self: &mut OrderTimerDeque<MODE, Self>,
timer_ids: &mut Self::HandleRes) -> TimerResult<()>
where Self: Sized;
/// Presumably reports whether this entry is identified by `other`.
/// NOTE(review): not called anywhere in this file; confirm against the
/// implementors.
fn is_same(&self, other: &Self::TimerId) -> bool;
/// Consumes the entry and returns its timer ID, if it has one to give.
fn into_timer_id(self) -> Option<Self::TimerId>;
}
/// Minimal view of a queue entry: the only thing the deque needs in order to
/// sort entries and to arm the timer.
pub trait OrderedTimerDequeInterf<MODE: OrderedTimerDequeMode>
{
/// Returns the absolute time at which the entry is due.
fn get_timeout_absolute(&self) -> AbsoluteTime;
}
/// Queue of timed entries driven by a single timer.
///
/// `MODE` selects the scheduling mode of the entries and `INTF` is the entry
/// type itself.
#[derive(Debug, Eq)]
pub struct OrderTimerDeque<MODE: OrderedTimerDequeMode, INTF: OrderedTimerDequeHandle<MODE>>
{
/// Pending entries. `queue_item` inserts them so that the entry with the
/// earliest absolute timeout sits at the front.
pub(crate) deque_timeout_list: VecDeque<INTF>,
/// Underlying timer; it is armed for the timeout of the front entry.
pub(crate) timer: TimerFd,
/// Binds the otherwise unused `MODE` parameter to the struct.
p: PhantomData<MODE>
}
/// `mio` integration for `OrderTimerDeque`: the deque is registered through
/// its raw file descriptor, and its token is derived from that descriptor.
#[cfg(all(target_family = "unix", feature = "enable_mio_compat"))]
pub mod mio_compat
{
    use std::{io, os::fd::AsRawFd};
    use mio::{Token, unix::SourceFd};
    use crate::{TimerFdMioCompat, deque_timeout::{OrderTimerDeque, OrderedTimerDequeHandle, OrderedTimerDequeMode}};

    /// Every call is forwarded to a `SourceFd` wrapping the deque's raw fd.
    impl<MODE, INTF> mio::event::Source for OrderTimerDeque<MODE, INTF>
    where
        MODE: OrderedTimerDequeMode,
        INTF: OrderedTimerDequeHandle<MODE>
    {
        fn register(
            &mut self,
            registry: &mio::Registry,
            token: mio::Token,
            interests: mio::Interest,
        ) -> io::Result<()>
        {
            SourceFd(&self.as_raw_fd()).register(registry, token, interests)
        }

        fn reregister(
            &mut self,
            registry: &mio::Registry,
            token: mio::Token,
            interests: mio::Interest,
        ) -> io::Result<()>
        {
            SourceFd(&self.as_raw_fd()).reregister(registry, token, interests)
        }

        fn deregister(&mut self, registry: &mio::Registry) -> io::Result<()>
        {
            SourceFd(&self.as_raw_fd()).deregister(registry)
        }
    }

    /// A deque equals a token when the token is the one `get_token` returns.
    impl<MODE, INTF> PartialEq<Token> for OrderTimerDeque<MODE, INTF>
    where
        MODE: OrderedTimerDequeMode,
        INTF: OrderedTimerDequeHandle<MODE>
    {
        fn eq(&self, other: &Token) -> bool
        {
            // Compare in the token's own `usize` domain. Narrowing the token
            // with `as RawFd` wraps large values, so a token outside the fd
            // range could compare equal to an unrelated descriptor.
            self.get_token() == *other
        }
    }

    impl<MODE, INTF> TimerFdMioCompat for OrderTimerDeque<MODE, INTF>
    where
        MODE: OrderedTimerDequeMode,
        INTF: OrderedTimerDequeHandle<MODE>
    {
        /// Returns a token whose value is the deque's raw fd.
        fn get_token(&self) -> Token
        {
            Token(self.as_raw_fd() as usize)
        }
    }
}
/// Marks the deque as a `UnixFd`; the impl body is empty.
impl<MODE: OrderedTimerDequeMode, INTF: OrderedTimerDequeHandle<MODE>> UnixFd
    for OrderTimerDeque<MODE, INTF>
{
}
impl<MODE: OrderedTimerDequeMode, INTF: OrderedTimerDequeHandle<MODE>> AsTimerId
    for OrderTimerDeque<MODE, INTF>
{
    /// Returns the ID of the underlying timer.
    fn as_timer_id(&self) -> TimerId {
        self.timer.as_timer_id()
    }
}
impl<MODE: OrderedTimerDequeMode, INTF: OrderedTimerDequeHandle<MODE>> FdTimerRead
    for OrderTimerDeque<MODE, INTF>
{
    /// Reads the underlying timer and returns its result unchanged.
    fn read(&self) -> TimerPortResult<TimerReadRes<u64>> {
        self.timer.read()
    }
}
impl<MODE: OrderedTimerDequeMode, INTF: OrderedTimerDequeHandle<MODE>> PartialEq<str>
    for OrderTimerDeque<MODE, INTF>
{
    /// Compares the string form of the underlying timer with `other`.
    fn eq(&self, other: &str) -> bool {
        let timer_str: &str = self.timer.as_ref();

        timer_str == other
    }
}
/// Both methods are forwarded to the underlying timer.
impl<MODE: OrderedTimerDequeMode, INTF: OrderedTimerDequeHandle<MODE>> FdTimerMarker
    for OrderTimerDeque<MODE, INTF>
{
    /// Returns whatever `clone_timer` of the underlying timer returns.
    fn clone_timer(&self) -> TimerFd {
        self.timer.clone_timer()
    }

    /// Returns the strong count reported by the underlying timer.
    fn get_strong_count(&self) -> usize {
        self.timer.get_strong_count()
    }
}
impl<MODE: OrderedTimerDequeMode, INTF: OrderedTimerDequeHandle<MODE>> Drop
    for OrderTimerDeque<MODE, INTF>
{
    /// Disarms the timer and empties the queue on a best-effort basis.
    fn drop(&mut self) {
        // A destructor cannot report failures, so the result is deliberately
        // discarded.
        let _ = self.clean_up_timer();
    }
}
impl<MODE: OrderedTimerDequeMode, INTF: OrderedTimerDequeHandle<MODE>> AsRef<str>
    for OrderTimerDeque<MODE, INTF>
{
    /// Returns the string form of the underlying timer.
    fn as_ref(&self) -> &str {
        self.timer.as_ref()
    }
}
/// Unix-only descriptor access for `OrderTimerDeque`; everything is forwarded
/// to the underlying timer.
#[cfg(target_family = "unix")]
pub mod deque_timeout_os {
    use super::*;

    impl<MODE: OrderedTimerDequeMode, INTF: OrderedTimerDequeHandle<MODE>> AsFd
        for OrderTimerDeque<MODE, INTF>
    {
        /// Borrows the file descriptor of the underlying timer.
        fn as_fd(&self) -> BorrowedFd<'_> {
            self.timer.as_fd()
        }
    }

    impl<MODE: OrderedTimerDequeMode, INTF: OrderedTimerDequeHandle<MODE>> AsRawFd
        for OrderTimerDeque<MODE, INTF>
    {
        /// Returns the raw file descriptor of the underlying timer.
        fn as_raw_fd(&self) -> RawFd {
            self.timer.as_raw_fd()
        }
    }

    impl<MODE: OrderedTimerDequeMode, INTF: OrderedTimerDequeHandle<MODE>> PartialEq<RawFd>
        for OrderTimerDeque<MODE, INTF>
    {
        /// Compares the underlying timer with the raw descriptor `other`.
        fn eq(&self, other: &RawFd) -> bool {
            self.timer == *other
        }
    }
}
/// Windows-only handle access for `OrderTimerDeque`; everything is forwarded
/// to the underlying timer.
#[cfg(target_family = "windows")]
pub mod deque_timeout_os {
    use std::os::windows::io::{AsHandle, AsRawHandle, BorrowedHandle, RawHandle};
    use super::*;

    impl<MODE: OrderedTimerDequeMode, INTF: OrderedTimerDequeHandle<MODE>> AsHandle
        for OrderTimerDeque<MODE, INTF>
    {
        /// Borrows the handle of the underlying timer.
        fn as_handle(&self) -> BorrowedHandle<'_> {
            self.timer.as_handle()
        }
    }

    impl<MODE: OrderedTimerDequeMode, INTF: OrderedTimerDequeHandle<MODE>> AsRawHandle
        for OrderTimerDeque<MODE, INTF>
    {
        /// Returns the raw handle of the underlying timer.
        fn as_raw_handle(&self) -> RawHandle {
            self.timer.as_raw_handle()
        }
    }

    impl<MODE: OrderedTimerDequeMode, INTF: OrderedTimerDequeHandle<MODE>> PartialEq<RawHandle>
        for OrderTimerDeque<MODE, INTF>
    {
        /// Compares the underlying timer with the raw handle `other`.
        fn eq(&self, other: &RawHandle) -> bool {
            self.timer == *other
        }
    }
}
impl<MODE: OrderedTimerDequeMode, INTF: OrderedTimerDequeHandle<MODE>> fmt::Display
    for OrderTimerDeque<MODE, INTF>
{
    /// Prints the timer, its timer ID (under the `fd` label) and the number
    /// of queued entries.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let timer_id = self.timer.as_timer_id();
        let queue_len = self.deque_timeout_list.len();

        write!(f, "timer: '{}', fd: '{}', queue_len: '{}'", self.timer, timer_id, queue_len)
    }
}
impl<MODE: OrderedTimerDequeMode, INTF: OrderedTimerDequeHandle<MODE>> PartialEq
    for OrderTimerDeque<MODE, INTF>
{
    /// Two deques are equal when their underlying timers are equal; the
    /// queued entries are not compared.
    fn eq(&self, other: &Self) -> bool {
        self.timer == other.timer
    }
}
impl<MODE, R> OrderTimerDeque<MODE, TimerDequeConsumer<R, MODE>>
where
    MODE: OrderedTimerDequeMode,
    R: PartialEq + Eq + fmt::Debug + fmt::Display + Send + Clone,
{
    /// Wraps `item` and `mode` into a `TimerDequeConsumer` and queues it.
    ///
    /// # Errors
    /// Returns the error of `TimerDequeConsumer::new` or of queueing the
    /// entry, whichever fails first.
    pub fn add(&mut self, item: R, mode: MODE) -> TimerResult<()> {
        let entry = TimerDequeConsumer::<R, MODE>::new(item, mode)?;

        self.queue_item(entry)
    }
}
impl<MODE> OrderTimerDeque<MODE, TimerDequeTicketIssuer<MODE>>
where
    MODE: OrderedTimerDequeMode,
{
    /// Creates a ticket issuer for `mode`, queues it and returns the ticket
    /// that was created together with it.
    ///
    /// # Errors
    /// Returns the error of `TimerDequeTicketIssuer::new` or of queueing the
    /// entry, whichever fails first.
    pub fn add(&mut self, mode: MODE) -> TimerResult<TimerDequeTicket> {
        let (issuer, ticket) = TimerDequeTicketIssuer::<MODE>::new(mode)?;

        self.queue_item(issuer).map(|()| ticket)
    }
}
impl<MODE, INTF> OrderTimerDeque<MODE, INTF>
where
MODE: OrderedTimerDequeMode,
INTF: OrderedTimerDequeHandle<MODE>
{
/// Creates an empty deque backed by a newly created `CLOCK_REALTIME` timer.
///
/// # Arguments
/// * `timer_label` - label passed to `TimerFd::new`.
/// * `deq_len` - initial capacity of the queue; `0` is replaced by `10`.
/// * `cloexec` - sets `TimerFlags::TFD_CLOEXEC` on the timer when `true`.
/// * `non_blocking` - sets `TimerFlags::TFD_NONBLOCK` on the timer when `true`.
///
/// # Errors
/// `TimerErrorType::TimerError` carrying the errno reported by `TimerFd::new`.
pub fn new(
    timer_label: Cow<'static, str>,
    deq_len: usize,
    cloexec: bool,
    non_blocking: bool,
) -> TimerResult<OrderTimerDeque<MODE, INTF>> {
    let capacity = match deq_len {
        0 => 10,
        requested => requested,
    };

    let mut flags = TimerFlags::empty();
    flags.set(TimerFlags::TFD_CLOEXEC, cloexec);
    flags.set(TimerFlags::TFD_NONBLOCK, non_blocking);

    let timer = TimerFd::new(timer_label, TimerType::CLOCK_REALTIME, flags)
        .map_err(|e| map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "{}", e))?;

    Ok(Self {
        deque_timeout_list: VecDeque::with_capacity(capacity),
        timer,
        p: PhantomData,
    })
}
/// Inserts `inst` so that the queue stays ordered by absolute timeout, and
/// re-arms the timer whenever `inst` becomes the new front entry.
///
/// # Errors
/// `TimerErrorType::TimerError` when the timer cannot be armed. For an empty
/// queue the timer is armed before the entry is stored, so on failure the
/// entry is not queued.
pub(super) fn queue_item(&mut self, inst: INTF) -> TimerResult<()> {
    let Some(front_entity) = self.deque_timeout_list.front() else {
        // Empty queue: arm the timer for this entry, then store it.
        let timer_stamp =
            TimerExpMode::<AbsoluteTime>::new_oneshot(inst.get_timeout_absolute());

        self.timer
            .get_timer()
            .set_time(timer_stamp)
            .map_err(|e| map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "{}", e))?;

        self.deque_timeout_list.push_front(inst);
        return Ok(());
    };

    let front_timeout = front_entity.get_timeout_absolute();
    let inst_timeout = inst.get_timeout_absolute();

    // Due no later than the current front: becomes the new front, so the
    // timer has to follow.
    if front_timeout >= inst_timeout {
        self.deque_timeout_list.push_front(inst);
        return self.reschedule_timer();
    }

    // Due no earlier than the current back: append, the front is unchanged.
    let back_timeout = self.deque_timeout_list.back().unwrap().get_timeout_absolute();
    if back_timeout <= inst_timeout {
        self.deque_timeout_list.push_back(inst);
        return Ok(());
    }

    // Somewhere in the middle: an exact match and an insertion point are
    // both valid positions for the new entry.
    let lookup = self
        .deque_timeout_list
        .binary_search_by(|se| se.get_timeout_absolute().cmp(&inst.get_timeout_absolute()));
    let pos = match lookup {
        Ok(found) | Err(found) => found,
    };

    self.deque_timeout_list.insert(pos, inst);

    Ok(())
}
/// Removes the entry identified by `item` and returns its timer ID.
///
/// Returns `Ok(None)` when no entry matches, or when the removed entry has
/// no timer ID to give back.
///
/// # Errors
/// Whatever `remove_from_queue_int` reports.
pub fn remove_from_queue(&mut self, item: &INTF::TimerId) -> TimerResult<Option<INTF::TimerId>> {
    let removed = self.remove_from_queue_int(item)?;

    Ok(removed.and_then(|intf| intf.into_timer_id()))
}
/// Removes the first entry that compares equal to `item` and returns it.
///
/// When the removed entry was at the front, the timer is re-armed for the
/// new front entry, or disarmed if the queue became empty.
///
/// Returns `Ok(None)` when no entry matches.
///
/// # Errors
/// * `TimerErrorType::QueueEmpty` - the queue holds no entries.
/// * `TimerErrorType::TimerError` - the timer could not be updated; the
///   entry has already been removed at that point.
pub(super) fn remove_from_queue_int(&mut self, item: &INTF::TimerId) -> TimerResult<Option<INTF>> {
    if self.deque_timeout_list.is_empty() {
        timer_err!(TimerErrorType::QueueEmpty, "queue list is empty!");
    }

    let Some(pos) = self.deque_timeout_list.iter().position(|q_item| q_item == item) else {
        return Ok(None);
    };

    let ret_ent = self.deque_timeout_list.remove(pos).unwrap();

    // Only the front entry drives the timer. With nothing left in the queue
    // `reschedule_timer` unsets the timer.
    if pos == 0 {
        self.reschedule_timer()?;
    }

    Ok(Some(ret_ent))
}
/// Reads the timer and, unless the read reports `WouldBlock`, processes
/// every entry that is due.
///
/// Returns `Ok(None)` on `WouldBlock`, otherwise the result of the
/// processing pass.
///
/// # Errors
/// `TimerErrorType::TimerError` when the read fails, or any error raised
/// while the due entries are processed.
pub fn wait_for_event_and_process(&mut self) -> TimerResult<Option<INTF::HandleRes>> {
    let read_outcome = self
        .timer
        .read()
        .map_err(|e| map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "{}", e))?;

    match read_outcome {
        TimerReadRes::WouldBlock => Ok(None),
        _ => self.internal_handle_timer_event(),
    }
}
/// Processes a poll event that was received for this deque's timer.
///
/// * `TimerRes` with `WouldBlock` - nothing to do, returns `Ok(None)`.
/// * `TimerRes` with any other read result - processes every due entry.
/// * `SubError` - turned into `TimerErrorType::TimerError` with its errno.
pub fn handle_timer_event(&mut self, pet: PollEventType) -> TimerResult<Option<INTF::HandleRes>> {
    match pet {
        PollEventType::TimerRes(_, TimerReadRes::WouldBlock) => Ok(None),
        PollEventType::TimerRes(_, _) => self.internal_handle_timer_event(),
        PollEventType::SubError(_, err) => {
            timer_err!(TimerErrorType::TimerError(err.get_errno()), "{}", err)
        }
    }
}
/// Pops every entry whose timeout is not later than the current time, lets
/// each one handle itself, and finally re-arms the timer for the new front.
///
/// The current time is sampled once, before the first entry is inspected.
/// Returns the collector filled by the handled entries, converted through
/// `into_option`.
///
/// # Errors
/// The first error returned by an entry's `handle` aborts the pass
/// immediately, before the timer is re-armed. A failure to re-arm the timer
/// is reported as well.
fn internal_handle_timer_event(&mut self) -> TimerResult<Option<INTF::HandleRes>> {
    let cur_timestamp = AbsoluteTime::now();
    let mut timer_ids: INTF::HandleRes = INTF::HandleRes::new();

    while let Some(front_entity) = self.deque_timeout_list.front() {
        let is_due = front_entity.get_timeout_absolute() <= cur_timestamp;
        if !is_due {
            // The queue is ordered, so nothing behind the front is due either.
            break;
        }

        let due_entry = self.deque_timeout_list.pop_front().unwrap();
        due_entry.handle(self, &mut timer_ids)?;
    }

    self.reschedule_timer()?;

    Ok(timer_ids.into_option())
}
/// Awaits the timer and, unless the result is `WouldBlock`, processes every
/// entry that is due.
///
/// Returns `Ok(None)` on `WouldBlock`, otherwise the result of the
/// processing pass.
///
/// # Errors
/// `TimerErrorType::TimerError` when awaiting the timer fails, or any error
/// raised while the due entries are processed.
pub async fn async_poll_for_event_and_process(&mut self) -> TimerResult<Option<INTF::HandleRes>> {
    let read_outcome = self
        .timer
        .get_timer()
        .await
        .map_err(|e| map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "{}", e))?;

    match read_outcome {
        TimerReadRes::WouldBlock => Ok(None),
        _ => self.internal_handle_timer_event(),
    }
}
/// Returns the number of entries currently waiting in the queue.
pub fn timer_queue_len(&self) -> usize {
    let pending = &self.deque_timeout_list;

    pending.len()
}
/// Postpones the entry identified by `target` by `rel_time_off`.
///
/// The entry is taken out of the queue, postponed through its own
/// `postpone`, and queued again at its new position.
///
/// # Errors
/// * `TimerErrorType::NotFound` - no queued entry matches `target`.
/// * Any error from removing, postponing or re-queueing the entry.
// NOTE(review): when the entry's `postpone` fails, the entry has already
// been removed and is dropped instead of being put back. Confirm this is
// the intended outcome.
pub fn postpone(&mut self, target: &INTF::TimerId, rel_time_off: RelativeTime) -> TimerResult<()> {
    let mut item = self
        .remove_from_queue_int(target)?
        // Build (and format) the error only when the entry is really missing.
        .ok_or_else(|| map_timer_err!(TimerErrorType::NotFound, "ticket: {} not found", target))?;

    item.postpone(rel_time_off)?;

    self.queue_item(item)
}
/// Replaces the schedule of the entry identified by `target` with `time`.
///
/// The entry is taken out of the queue, updated through its own `resched`,
/// and queued again at its new position.
///
/// # Errors
/// * `TimerErrorType::NotFound` - no queued entry matches `target`.
/// * Any error from removing, rescheduling or re-queueing the entry.
// NOTE(review): when the entry's `resched` fails, the entry has already
// been removed and is dropped instead of being put back. Confirm this is
// the intended outcome.
pub fn reschedule(&mut self, target: &INTF::TimerId, time: MODE) -> TimerResult<()> {
    let mut item = self
        .remove_from_queue_int(target)?
        // Build (and format) the error only when the entry is really missing.
        .ok_or_else(|| map_timer_err!(TimerErrorType::NotFound, "{} not found", target))?;

    item.resched(time)?;

    self.queue_item(item)
}
/// Synchronises the timer with the queue: arms it as a one-shot for the
/// front entry's absolute timeout, or unsets it when the queue is empty.
///
/// # Errors
/// `TimerErrorType::TimerError` carrying the errno of the failed timer call.
pub(super) fn reschedule_timer(&mut self) -> TimerResult<()> {
    match self.deque_timeout_list.front() {
        Some(front_entity) => {
            let timer_exp =
                TimerExpMode::<AbsoluteTime>::new_oneshot(front_entity.get_timeout_absolute());

            self.timer
                .get_timer()
                .set_time(timer_exp)
                .map_err(|e| map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "{}", e))
        }
        None => self
            .timer
            .get_timer()
            .unset_time()
            .map_err(|e| map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "{}", e)),
    }
}
/// Unsets the timer and then drops every queued entry.
///
/// # Errors
/// `TimerErrorType::TimerError` when the timer cannot be unset; the queue is
/// left untouched in that case.
pub fn clean_up_timer(&mut self) -> TimerResult<()> {
    // `stop_timer` performs exactly the unset-and-map-error step needed here.
    self.stop_timer()?;
    self.deque_timeout_list.clear();

    Ok(())
}
/// Unsets the timer without touching the queued entries.
///
/// # Errors
/// `TimerErrorType::TimerError` carrying the errno of the failed call.
pub fn stop_timer(&mut self) -> TimerResult<()> {
    let disarmed = self.timer.get_timer().unset_time();

    disarmed.map_err(|e| map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "{}", e))
}
}