timer-deque-rs 0.8.0

An OS-based timer and timer queue which implements timeout queues of different types.
Documentation
/*-
 * timer-deque-rs - a Rust crate which provides timer and timer queues based on target OS
 *  functionality.
 * 
 * Copyright (C) 2025 Aleksandr Morozov alex@nixd.org
 *  4neko.org alex@4neko.org
 * 
 * The timer-rs crate can be redistributed and/or modified
 * under the terms of either of the following licenses:
 *
 *   1. the Mozilla Public License Version 2.0 (the “MPL”) OR
 *                     
 *   2. The MIT License (MIT)
 *                     
 *   3. EUROPEAN UNION PUBLIC LICENCE v. 1.2 EUPL © the European Union 2007, 2016
 */

use std::
{
    collections::BTreeMap, fmt, os::fd::{AsFd, AsRawFd, RawFd}, sync::{Arc, RwLock, atomic::{AtomicBool, Ordering}}
};

use nix::
{
    errno::Errno, poll::PollTimeout, sys::
    {
        epoll::{Epoll, EpollCreateFlags, EpollEvent, EpollFlags}, 
        eventfd::{EfdFlags, EventFd}
    }
};

use crate::
{
    error::{TimerError, TimerErrorType, TimerResult}, 
    map_timer_err, 
    timer_err, 
    timer_portable::
    {
        AsTimerId, FdTimerMarker, TimerFd, TimerId, poll::{ PollEventType, PollInterrupt, TimerPollOps}, timer::FdTimerRead
    }
};



/// An [Epoll] based event monitor.
///
/// Holds the epoll instance, an [EventFd] which is used to wake up a blocked 
/// `poll`, a flag which marks that a `poll` is in progress and the registered 
/// timers keyed by their [TimerId].
#[derive(Debug)]
pub struct TimerEventWatch
{
    /// An event notification facility.
    epoll: Epoll, 

    /// A waker which interrupts the `poll`.
    wakeup_event: Arc<EventFd>,

    /// A poll guard which prevents concurrent calls to `poll`.
    polling_flag: AtomicBool,

    /// All registered timers.
    timers: RwLock<BTreeMap<TimerId, TimerFd>>,
}

/// Formats the instance as `fd:<epoll fd>, cnt:<amount of timers>`. 
/// When `try_read` on the timers list fails, `locked` is printed instead 
/// of the amount.
impl fmt::Display for TimerEventWatch
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result 
    {
        let epoll_fd = self.epoll.0.as_fd().as_raw_fd();

        // never block in the formatter: a failed non-blocking lock attempt 
        // is reported as "locked"
        let timers_cnt = 
            match self.timers.try_read()
            {
                Ok(timers) => timers.len().to_string(),
                Err(_) => "locked".to_string(),
            };

        write!(f, "fd:{}, cnt:{}", epoll_fd, timers_cnt)
    }
}

/// Marker impl: the [PartialEq] implementation of this type compares the raw 
/// file descriptors of the epoll instances.
impl Eq for TimerEventWatch {}

/// Two instances are equal when the raw file descriptors of their epoll 
/// instances are equal.
impl PartialEq for TimerEventWatch
{
    fn eq(&self, other: &Self) -> bool 
    {
        let own_fd = self.epoll.0.as_raw_fd();
        let other_fd = other.epoll.0.as_raw_fd();

        own_fd == other_fd
    }
}

impl TimerPollOps for TimerEventWatch
{
    /// Creates a new instance: an epoll instance, a non-blocking close-on-exec 
    /// eventfd which is registered on the epoll as a waker, and an empty list 
    /// of timers.
    ///
    /// # Errors
    ///
    /// [TimerErrorType::EPoll] is returned if the epoll or the eventfd can not 
    /// be created or the eventfd can not be registered on the epoll.
    fn new() -> TimerResult<Self>
    {
        let ep = 
            Epoll::new(EpollCreateFlags::empty())
                .map_err(|e|
                    map_timer_err!(TimerErrorType::EPoll(e), "epoll new() error")
                )?;

        let waker_flags = EfdFlags::EFD_CLOEXEC | EfdFlags::EFD_NONBLOCK;

        let waker = 
            EventFd::from_flags(waker_flags)
                .map_err(|e|
                    map_timer_err!(TimerErrorType::EPoll(e), "epoll new() error")
                )?;

        // The waker is registered with its own fd number as the event data, 
        // `poll` compares the event data against this fd to recognize it.
        let waker_event = EpollEvent::new(EpollFlags::EPOLLIN, waker.as_raw_fd() as u64);

        ep
            .add(&waker, waker_event)
            .map_err(|e|
                map_timer_err!(TimerErrorType::EPoll(e), "epoll new() error")
            )?;

        Ok( 
            Self
            {
                epoll: ep, 
                wakeup_event: Arc::new(waker),
                polling_flag: AtomicBool::new(false),
                timers: RwLock::new(BTreeMap::new()),
            } 
        )
    }

    /// Registers the `timer` on the epoll instance and stores it on the list 
    /// of the registered timers.
    ///
    /// # Errors
    ///
    /// * [TimerErrorType::Duplicate] - a timer with the same ID is already 
    ///   on the list.
    ///
    /// * [TimerErrorType::EPoll] - epoll refused to add the timer.
    fn add(&self, timer: TimerFd) -> TimerResult<()>
    {
        let timer_id = timer.as_timer_id();

        // exclusive access to the list; a poisoned lock is recovered
        let mut registered = 
            match self.timers.write()
            {
                Ok(guard) => guard,
                Err(poisoned) => poisoned.into_inner(),
            };

        // refuse a timer which was added previously
        if registered.contains_key(&timer_id)
        {
            timer_err!(TimerErrorType::Duplicate, "can not add timer {} to epoll, reason duplicate", 
                timer);
        }

        // the timer ID is used as the epoll event data
        let epoll_event = EpollEvent::new(EpollFlags::EPOLLIN, timer_id.0 as u64);

        self
            .epoll
            .add(&timer, epoll_event)
            .map_err(|e|
                map_timer_err!(TimerErrorType::EPoll(e), "can not add timer {} to epoll", timer)
            )?;

        // the timer is stored only after epoll accepted it
        registered.insert(timer_id, timer);

        Ok(())
    }
    
    /// Removes the `timer` from the epoll instance and from the list of the 
    /// registered timers.
    ///
    /// # Errors
    ///
    /// * [TimerErrorType::NotFound] - the timer is not on the list of the 
    ///   registered timers.
    ///
    /// * [TimerErrorType::EPoll] - epoll failed to remove the timer with an 
    ///   error other than `ENOENT`.
    fn delete<FD: FdTimerMarker>(&self, timer: &FD) -> TimerResult<()>
    { 
        let timer_id = timer.as_timer_id();

        // exclusive access to the list; a poisoned lock is recovered
        let mut timers_lock = 
            self
                .timers
                .write()
                .map_or_else(|e| e.into_inner(), |v| v);

        // A timer which is not on the list is reported as not found (it was 
        // reported as a duplicate before, which was misleading).
        if !timers_lock.contains_key(&timer_id)
        {
            timer_err!(TimerErrorType::NotFound, "timer {} not found in the list", 
                timer.as_fd().as_raw_fd());
        }

        // ENOENT is tolerated: epoll does not know the fd, but the entry 
        // must still be dropped from the list below.
        if let Err(ern) = 
            self
                .epoll
                .delete(&timer)
        {
            if ern != Errno::ENOENT
            {
                timer_err!(TimerErrorType::EPoll(ern), "can not delete timer {} from epoll", 
                    timer_id);
            }
        }

        // the presence was verified above under the same write lock
        timers_lock.remove(&timer_id);

        Ok(())
    }

    fn poll(&self, timeout: Option<i32>) ->  TimerResult<Option<Vec<PollEventType>>>
    {
        if self.polling_flag.swap(true, Ordering::SeqCst) == true
        {
            timer_err!(TimerErrorType::EPollAlreadyPolling, 
                "epoll fd: '{}' other thread already polling", self.epoll.0.as_raw_fd());
        }

        let mut events = vec![EpollEvent::empty(); self.get_count() + 1];

        let mut poll_funct = || 
            {               
                let poll_timeout =
                    timeout
                        .map_or(Ok(PollTimeout::NONE), |f| PollTimeout::try_from(f))
                        .map_err(|e|
                            map_timer_err!(TimerErrorType::Conversion, "timeout value: '{:?}' is incorrect: '{}'", timeout, e)   
                        )?;
                
                let evs_res = 
                    self
                        .epoll
                        .wait(events.as_mut_slice(), poll_timeout)
                        .map_err(|e|
                            map_timer_err!(TimerErrorType::EPoll(e), "poll error")   
                        )?;

                return Ok(evs_res);
            };

        let poll_res: Result<usize, TimerError> = poll_funct();

        self.polling_flag.store(false, Ordering::SeqCst);
        
        // ignore EINTR
        if let Err(err) = poll_res.as_ref() && err.get_error_type() == TimerErrorType::EPoll(Errno::EINTR)
        {
            return Ok(None);
        }

        let evs = poll_res?;
        if evs == 0
        {
            return Ok(None);
        }
        else
        {
            // lock timers in read mode
            let timers_read = 
                self
                    .timers
                    .read()
                    .map_or_else(|e| e.into_inner(), |v| v);

            let mut poll_res: Vec<PollEventType> = Vec::with_capacity(evs);

            for event 
            in events[..evs].iter().filter(|ev| ev.events().intersects(EpollFlags::EPOLLIN))
            {
                if event.data() as RawFd == self.wakeup_event.as_raw_fd()
                {
                    let _ = self.wakeup_event.read();
                }
                else
                {
                    let Some(timer) = 
                        timers_read
                            .get(&(TimerId::from(event.data()))).map(|c| c)
                        else
                        {
                            // probably removed, just ignore
                            continue;
                        };
                    
                    match timer.read()
                    {
                        Ok(res) => 
                            poll_res.push(PollEventType::TimerRes(timer.as_timer_id(), res)),
                        Err(e) => 
                            poll_res.push(PollEventType::SubError(timer.as_timer_id(), e)),
                    }
                }
            }
           
            if poll_res.is_empty() == false
            {
                return Ok(Some(poll_res));
            }
            else
            {
                return Ok(None);
            }
        }
    }
    
    /// Returns the amount of the timers which are currently on the list. 
    /// A poisoned lock is recovered and its content is counted.
    fn get_count(&self) -> usize 
    {
        match self.timers.read()
        {
            Ok(guard) => guard.len(),
            Err(poisoned) => poisoned.into_inner().len(),
        }
    }

    /// Returns a [PollInterrupt] which is built from a weak reference to 
    /// the wakeup event of this instance.
    fn get_poll_interruptor(&self) -> PollInterrupt
    {
        let weak_waker = Arc::downgrade(&self.wakeup_event);

        PollInterrupt::new(weak_waker)
    }
    
    /// Interrupts the `poll` by writing to the wakeup event.
    ///
    /// # Returns
    ///
    /// `true` if the write to the wakeup event has succeeded, `false` 
    /// otherwise.
    fn interrupt_poll(&self) -> bool 
    {
        // A write to eventfd adds the value to its counter and only a 
        // non-zero counter makes it readable. The fd number was written 
        // before, which is zero if the eventfd happens to be fd 0, so a 
        // constant non-zero value is written instead. The value itself is 
        // discarded when `poll` consumes the event.
        self.wakeup_event.write(1).is_ok()
    }
}