timer-deque-rs 0.8.0

A OS based timer and timer queue which implements timeout queues of different types.
Documentation
/*-
 * timer-deque-rs - a Rust crate which provides timer and timer queues based on target OS
 *  functionality.
 * 
 * Copyright (C) 2025 Aleksandr Morozov alex@nixd.org
 *  4neko.org alex@4neko.org
 * 
 * The timer-rs crate can be redistributed and/or modified
 * under the terms of either of the following licenses:
 *
 *   1. the Mozilla Public License Version 2.0 (the “MPL”) OR
 *                     
 *   2. The MIT License (MIT)
 *                     
 *   3. EUROPEAN UNION PUBLIC LICENCE v. 1.2 EUPL © the European Union 2007, 2016
 */

use std::
{
    collections::BTreeMap, fmt, os::fd::{AsFd, AsRawFd, RawFd}, sync::{atomic::{AtomicBool, Ordering}, Arc, RwLock}
};

use nix::
{
    errno::Errno, libc::timespec, sys::{event::{EvFlags, EventFilter, FilterFlag, KEvent, Kqueue}, eventfd::EventFd}
};

use nix::sys::eventfd::EfdFlags;
use crate::
{
    error::{TimerErrorType, TimerResult}, 
    map_timer_err, 
    timer_err, 
    timer_portable::{poll::{PollInterrupt, TimerPollOps}, timer::FdTimerRead, AsTimerId, FdTimerMarker, PollEventType, TimerFd, TimerId}
};


/// A [KQueue] based event monitor.
#[derive(Debug)]
pub struct TimerEventWatch
{
    /// A event notification facility.
    epoll: Kqueue, 

    /// A waker which interrupts the `poll`.
    wakeup_event: Arc<EventFd>,

    /// A poll guard which prevents the multiple call to poll.
    polling_flag: AtomicBool,

    /// All regestered timers.
    timers: RwLock<BTreeMap<TimerId, TimerFd>>,
}

impl fmt::Display for TimerEventWatch
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result 
    {
         write!(f, "fd:{}, cnt:{}", self.epoll.as_fd().as_raw_fd(), 
            self.timers.try_read().map_or("locked".to_string(), |f| f.len().to_string())
        ) //self.epoll_fds.load(Ordering::Acquire))
    }
}

impl Eq for TimerEventWatch{}

impl PartialEq for TimerEventWatch
{
    fn eq(&self, other: &Self) -> bool 
    {
        return self.epoll.as_fd().as_raw_fd() == other.epoll.as_fd().as_raw_fd();
    }
}

impl TimerPollOps for TimerEventWatch
{
    fn new() -> TimerResult<Self>
    {
        let kqueue = 
            Kqueue::new()
                .map_err(|e|
                    map_timer_err!(TimerErrorType::EPoll(e), "kqueue new() error")
                )?;

        let wakeup_event = 
            EventFd::from_flags(EfdFlags::EFD_CLOEXEC | EfdFlags::EFD_NONBLOCK)
                .map_err(|e|
                    map_timer_err!(TimerErrorType::EPoll(e), "epoll new() error")
                )?;

        let wakeup_kevent = 
            KEvent::new(wakeup_event.as_raw_fd() as usize, EventFilter::EVFILT_READ, 
                EvFlags::EV_ADD | EvFlags::EV_ENABLE, FilterFlag::empty(), 0, 0);

        kqueue
            .kevent(&[wakeup_kevent], &mut [], None)
            .map_err(|e|
                map_timer_err!(TimerErrorType::EPoll(e), "epoll kevent() error")
            )?;

        return Ok( 
            Self
            {
                epoll: kqueue, 
                wakeup_event: Arc::new(wakeup_event),
                polling_flag: AtomicBool::new(false),
                timers: RwLock::new(BTreeMap::new())
            } 
        );
    }

    fn add(&self, timer: TimerFd) -> TimerResult<()>
    {
        let mut timers_lock = 
            self
                .timers
                .write()
                .map_or_else(|e| e.into_inner(), |v| v);

        // check if timer was added previosly
        let false = 
            timers_lock.contains_key(&timer.as_timer_id())
        else
        {
            timer_err!(TimerErrorType::Duplicate, "can not add timer {} to epoll, reason duplicate", 
                timer)
        };

        // add to kqueue
        let timer_kevent = 
            KEvent::new(timer.as_fd().as_raw_fd() as usize, EventFilter::EVFILT_READ, 
                EvFlags::EV_ADD | EvFlags::EV_ENABLE, FilterFlag::empty(), 
                timer.as_fd().as_raw_fd() as isize, timer.as_timer_id().0 as isize);

        self
            .epoll
            .kevent(&[timer_kevent], &mut [], None)
            .map_err(|e|
                map_timer_err!(TimerErrorType::EPoll(e), "can not add timer {} to kqueue", timer)
            )?;

        // store on the list
        timers_lock.insert(timer.as_timer_id(), timer);

        return Ok(());
    }
    
    fn delete<FD: FdTimerMarker>(&self, timer: &FD) -> TimerResult<()>
    { 
        let mut timers_lock = 
            self
                .timers
                .write()
                .map_or_else(|e| e.into_inner(), |v| v);

        // check if timer was added previosly
        let true = 
            timers_lock.contains_key(&timer.as_timer_id())
        else
        {
            timer_err!(TimerErrorType::Duplicate, "can not add timer {} to epoll, reason duplicate", 
                timer.as_timer_id())
        };

        // remove from the list
        let _ = timers_lock.remove(&timer.as_timer_id());

        let timer_kevent = 
            KEvent::new(timer.as_fd().as_raw_fd() as usize, EventFilter::EVFILT_READ, 
                EvFlags::EV_DELETE , FilterFlag::empty(), 0, 0);

        let res =
            self
                .epoll
                .kevent(&[timer_kevent], &mut [], None);
        
        if let Err(ern) = res && ern != Errno::ENOENT
        {
            timer_err!(TimerErrorType::EPoll(ern), "can not delete timer {} to kqueue", timer.as_fd().as_raw_fd());
        }

        return Ok(());
    }

    fn poll(&self, timeout: Option<i32>) ->  TimerResult<Option<Vec<PollEventType>>>
    {
        if self.polling_flag.swap(true, Ordering::SeqCst) == true
        {
            timer_err!(TimerErrorType::EPollAlreadyPolling, 
                "epoll fd: '{}' other thread already polling", self.epoll.as_fd().as_raw_fd());
        }

        let revent = 
            KEvent::new(0, EventFilter::EVFILT_READ, 
                EvFlags::empty(), FilterFlag::empty(), 0, 0);

        let mut events = vec![revent; self.get_count() + 1];
        
        let poll_timeout =
            timeout
                .map_or(
                    None, 
                    |f| 
                        {
                            let sec = f / 1000;
                            let nsec = f % 1000 * 100_000;

                            Some(timespec{ tv_sec: sec as i64, tv_nsec: nsec as i64})
                        }
                    );

        // reset previous events which happened before poll
        //let _ = self.cancel_event.read();
        
        let evs_res = 
            self
                .epoll
                .kevent(&[], events.as_mut_slice(), poll_timeout)
                .map_err(|e|
                    map_timer_err!(TimerErrorType::EPoll(e), "kevent error")   
                );

        //let mut poll_res = PollResult::None;

        self.polling_flag.store(false, Ordering::SeqCst);

        // ignore EINTR
        if let Err(err) = evs_res.as_ref() && err.get_error_type() == TimerErrorType::EPoll(Errno::EINTR)
        {
            return Ok(None);
        }
        
        let evs = evs_res?;
        if evs == 0
        {
            return Ok(None);
        }
        else
        {
            // lock timers in read mode
            let timers_read = 
                self
                    .timers
                    .read()
                    .map_or_else(|e| e.into_inner(), |v| v);

            let mut poll_res: Vec<PollEventType> = Vec::with_capacity(evs);
            //let amount = events[0].data() as usize;
            
            for event in events[..evs].iter()
            {
                if event.ident() as RawFd == self.wakeup_event.as_raw_fd()
                {
                    let _ = self.wakeup_event.read();
                }
                else
                {
                    let Some(timer) = 
                        timers_read
                            .get(&(TimerId::from(event.ident()))).map(|c| c)
                        else
                        {
                            // probably removed, just ignore
                            continue;
                        };
                    
                    match timer.read()
                    {
                        Ok(res) => 
                            poll_res.push(PollEventType::TimerRes(timer.as_timer_id(), res)),
                        Err(e) => 
                            poll_res.push(PollEventType::SubError(timer.as_timer_id(), e)),
                    }
                }
            }
           
            if poll_res.is_empty() == false
            {
                return Ok(Some(poll_res));
            }
            else
            {
                return Ok(None);
            }
        }
    }
    
    fn get_count(&self) -> usize 
    {
        return 
            self
                .timers
                .read()
                .map_or_else(|e| e.into_inner(), |v| v)
                .len();
    }

    fn get_poll_interruptor(&self) -> PollInterrupt
    {
        return PollInterrupt::new(Arc::downgrade(&self.wakeup_event));
    }
    
    fn interrupt_poll(&self) -> bool 
    {
        return self.wakeup_event.write(self.wakeup_event.as_raw_fd() as u64).is_ok();
    }
}