timer-deque-rs 0.7.0

A OS based timer and timer queue which implements timeout queues of different types.
Documentation
/*-
 * timer-deque-rs - a Rust crate which provides timer and timer queues based on target OS
 *  functionality.
 * 
 * Copyright (C) 2025 Aleksandr Morozov alex@nixd.org
 *  4neko.org alex@4neko.org
 * 
 * The timer-rs crate can be redistributed and/or modified
 * under the terms of either of the following licenses:
 *
 *   1. the Mozilla Public License Version 2.0 (the “MPL”) OR
 *                     
 *   2. The MIT License (MIT)
 *                     
 *   3. EUROPEAN UNION PUBLIC LICENCE v. 1.2 EUPL © the European Union 2007, 2016
 */

use std::
{
    borrow::Cow, 
    fmt, 
    os::{
        fd::{AsFd, AsRawFd, BorrowedFd}, unix::prelude::RawFd
    }, 
    sync::TryLockResult, 
    task::Poll
};

use crate::timer_portable::{timer::{FdTimerRead, ModeTimeType}, TimerExpMode};

use crossbeam_utils::atomic::AtomicCell;
use nix::
{
    errno::Errno, 
    fcntl::{self, FcntlArg, OFlag}, 
    libc::timespec, 
    sys::event::{EvFlags, EventFilter, FilterFlag, KEvent, Kqueue}
};

use crate::
{
    map_portable_err, 
    portable_err, 
    timer_portable::
    {
        portable_error::TimerPortResult,  
        TimerFlags, 
        TimerType
    }, 
    FdTimerCom, 
    TimerReadRes
};

use super::kqueue_itimerspecs_specific::itimerspeckenent;



/// An async timer implementation based on the Kqueue for the BSD based
/// systems.
#[derive(Debug)]
pub struct TimerFdInternal
{
    /// A timer's label.
    label: Cow<'static, str>,

    /// A Kqueue FD.
    timer_fd: Kqueue,

    /// A timer flags to detect the fd mode
    timer_flags: AtomicCell<TimerFlags>,

    /// A interval after initial delay
    it_delay_int: AtomicCell<Option<(i64, FilterFlag)>>,
}

impl fmt::Display for TimerFdInternal
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result 
    {
        write!(f, "{}", self.timer_fd.as_fd().as_raw_fd())
    }
}

impl AsFd for TimerFdInternal
{
    fn as_fd(&self) -> BorrowedFd<'_> 
    {
        return self.timer_fd.as_fd();
    }
}

impl AsRawFd for TimerFdInternal
{
    fn as_raw_fd(&self) -> RawFd 
    {
        return self.timer_fd.as_fd().as_raw_fd();
    }
}

impl Eq for TimerFdInternal {}

impl PartialEq for TimerFdInternal 
{
    fn eq(&self, other: &Self) -> bool 
    {
        return self.timer_fd.as_fd().as_raw_fd() == other.timer_fd.as_fd().as_raw_fd();
    }
}

impl PartialEq<RawFd> for TimerFdInternal
{
    fn eq(&self, other: &RawFd) -> bool 
    {
        return self.timer_fd.as_fd().as_raw_fd() == *other;
    }
}

impl PartialEq<str> for TimerFdInternal
{
    fn eq(&self, other: &str) -> bool 
    {
        return self.label == other;
    }
}

impl AsRef<str> for TimerFdInternal
{
    fn as_ref(&self) -> &str 
    {
        return &self.label;
    }
}

impl Ord for TimerFdInternal
{
    fn cmp(&self, other: &Self) -> std::cmp::Ordering 
    {
        return self.timer_fd.as_fd().as_raw_fd().cmp(&other.timer_fd.as_fd().as_raw_fd());
    }
}

impl PartialOrd for TimerFdInternal
{
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> 
    {
        return Some(self.cmp(other));
    }
}

impl FdTimerRead for TimerFdInternal
{
    fn read(&self) -> TimerPortResult<TimerReadRes<u64>> 
    {
        let timeout = 
            if self.timer_flags.load().intersects(TimerFlags::TFD_NONBLOCK) == true
            {
                Some(timespec{ tv_sec: 0, tv_nsec: 0 })
            }
            else
            {
                None
            };

        loop
        {
            let mut ev_list = 
            [
                KEvent::new(
                    self.timer_fd.as_fd().as_raw_fd() as usize, 
                    EventFilter::EVFILT_TIMER, 
                    EvFlags::empty(), 
                    FilterFlag::empty(), 
                    0, 
                    0
                )
            ];

            let ret = 
                self.timer_fd.kevent(&[], ev_list.as_mut_slice(), timeout);

            if let Ok(1) = ret
            {
                let overl = ev_list[0].data();
            
                // set interval after the delay
                if let Some((data, fflags)) = self.it_delay_int.swap(None)
                {
                    let ev = 
                        KEvent::new(
                            0, 
                            EventFilter::EVFILT_TIMER, 
                            EvFlags::EV_ADD | EvFlags::EV_ENABLE, 
                            fflags, 
                            data as isize, 
                            0
                        );

                    self
                        .timer_fd
                        .kevent(&[ev], &mut [], None)
                        .map_err(|e|
                            map_portable_err!(e, "timer: '{}' kevent() failed to set interval after delay", self)
                        )?;
                }

                return Ok(TimerReadRes::Ok(overl as u64));
            }
            else if let Ok(0) = ret
            {
                if self.timer_flags.load().intersects(TimerFlags::TFD_NONBLOCK) == true
                {
                    // would block
                    return Ok(TimerReadRes::WouldBlock);
                }
                else
                {
                    return Ok(TimerReadRes::Cancelled);
                }
            }
            else if let Err(Errno::EINTR) = ret
            {
                continue;
            }
            else if let Err(Errno::ECANCELED) = ret
            {
                return Ok(TimerReadRes::Cancelled);
            }
            else if let Err(e) = ret
            {
                portable_err!(e, "read timer overflow error for timer: '{}'", self.label)
            }
        }
    }
}

impl FdTimerCom for TimerFdInternal
{
    /// `timer_type` - is ignored, becuase it is not supported.
    fn new(label: Cow<'static, str>, _timer_type: TimerType, timer_flags: TimerFlags) -> TimerPortResult<Self>
    {
        let timer_fd = 
            Kqueue::new()
                .map_err(|e|
                    map_portable_err!(e, 
                        "KQueue timer: '{}' init failed!", label)
                )?;

        return Ok(
            Self
            { 
                label: 
                    label,
                timer_fd: 
                   timer_fd,
                timer_flags: 
                    AtomicCell::new(timer_flags),
                it_delay_int:
                    AtomicCell::new(None),
            }
        );
    }    

    fn set_time<TIMERTYPE: ModeTimeType>(&self, timer_exp: TimerExpMode<TIMERTYPE>) -> TimerPortResult<()>
    {
        // the `timer_exp` is in the [TimerExpMode] so there is no need to check if it is
        // valid

        let it_ent = itimerspeckenent::try_from(&timer_exp)?;

        self
            .timer_fd
            .kevent(&[it_ent.it_value], &mut [], None)
            .map_err(|e|
                map_portable_err!(e, "timer: '{}' set_time() kevent failed", self)
            )?;

        if it_ent.is_interv_with_delay() == true
        {
            // overwrites the delay value
            self.it_delay_int.store(it_ent.it_delay_int);
        }

        return Ok(());
    }

    fn unset_time(&self) -> TimerPortResult<()>
    {
        // reset interval value
        self.it_delay_int.store(None);

        let ev = 
            KEvent::new(
                0, 
                EventFilter::EVFILT_TIMER, 
                EvFlags::EV_DELETE, 
                FilterFlag::empty(), 
                0, 
                0
            );

        
        let Err(errn) = 
            self
                .timer_fd
                .kevent(&[ev], &mut [], None)
            else
            {
                return Ok(());
            };

        // already removed
        if errn != Errno::ENOENT
        {
            portable_err!(errn, "timer: '{}' set_time() kevent failed", self)
        }

        return Ok(());
    }

    fn set_nonblocking(&self, flag: bool) -> TimerPortResult<()>
    {
        let old_timer_flags = self.timer_flags.load();
        let mut timer_flags = old_timer_flags.clone();

        timer_flags.set(TimerFlags::TFD_NONBLOCK, flag);

        if self.timer_flags.swap(timer_flags) != old_timer_flags
        {
            map_portable_err!(Errno::EACCES, "changed from another thread");
        }

        return Ok(());
    }

    fn is_nonblocking(&self) -> TimerPortResult<bool>
    {
        let old_timer_flags = self.timer_flags.load();

        return Ok(old_timer_flags.intersects(TimerFlags::TFD_NONBLOCK));
    }
}


impl Future for &TimerFdInternal
{
    type Output = TimerPortResult<TimerReadRes<u64>>;

    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> 
    {
        if self.timer_flags.load().intersects(TimerFlags::TFD_NONBLOCK) == false
        {
            return Poll::Ready(Err(map_portable_err!(Errno::EINVAL, "timer fd is in blocking mode")));
        }

        let res = self.read();

        if let Ok(TimerReadRes::WouldBlock) = res
        {
            cx.waker().wake_by_ref();

            return Poll::Pending;
        }
        else
        {
            return Poll::Ready(res);
        } 
    }
}

impl Future for TimerFdInternal
{
    type Output = TimerPortResult<TimerReadRes<u64>>;

    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> 
    {
        if self.timer_flags.load().intersects(TimerFlags::TFD_NONBLOCK) == false
        {
            return Poll::Ready(Err(map_portable_err!(Errno::EINVAL, "timer fd is in blocking mode")));
        }

        let res = self.read();

        if let Ok(TimerReadRes::WouldBlock) = res
        {
            cx.waker().wake_by_ref();

            return Poll::Pending;
        }
        else
        {
            return Poll::Ready(res);
        } 
    }
}