timer-deque-rs 0.8.0

A OS based timer and timer queue which implements timeout queues of different types.
Documentation
/*-
 * timer-deque-rs - a Rust crate which provides timer and timer queues based on target OS
 *  functionality.
 * 
 * Copyright (C) 2025 Aleksandr Morozov alex@nixd.org
 *  4neko.org alex@4neko.org
 * 
 * The timer-rs crate can be redistributed and/or modified
 * under the terms of either of the following licenses:
 *
 *   1. the Mozilla Public License Version 2.0 (the “MPL”) OR
 *                     
 *   2. The MIT License (MIT)
 *                     
 *   3. EUROPEAN UNION PUBLIC LICENCE v. 1.2 EUPL © the European Union 2007, 2016
 */

use std::
{
    fmt, 
    os::fd::{AsRawFd, RawFd}, 
    sync::
    {
        atomic::{AtomicBool, AtomicU32, Ordering}, 
        Arc
    }
};
use std::collections::BTreeMap;
use std::sync::RwLock;
use nix::{errno::Errno, libc::{self}, sys::eventfd::EventFd};
use nix::sys::eventfd::EfdFlags;
use crate::
{
    error::{TimerErrorType, TimerResult}, 
    map_timer_err, 
    timer_err, 
    timer_portable::
    {
        poll::{PollInterrupt, TimerPollOps}, AsTimerId, FdTimerMarker, PollEventType, TimerId
    }
};
use crate::timer_portable::timer::FdTimerRead;
use crate::timer_portable::TimerFd;

/// An `id` generator for the [TimerEventWatch].
static ID_GEN: AtomicU32 = AtomicU32::new(0);

/// A [Epoll] based event monitor.
/// 
/// * `0` - [Epoll] instance.
/// 
/// * `1` - [AtomicUsize] amount of timers added
#[derive(Debug)]
pub struct TimerEventWatch
{
    /// A FD replacement to identify the instance
    id: u32,

   /* /// A list of the pollfd for each timer
    poll_fd: ICoW<Vec<libc::pollfd>>,*/

    /// Wakeup evend fd
    wakeup_event: Arc<EventFd>,

    /// Is polling now
    polling_flag: AtomicBool,

    /// All regestered timers.
    timers: RwLock<BTreeMap<TimerId, TimerFd>>
}

impl fmt::Display for TimerEventWatch
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result 
    {
        write!(f, "fake_fd:{}, cnt:{}", self.id,
               self.timers.try_read().map_or("locked".to_string(), |f| f.len().to_string())
        )
    }
}

impl Eq for TimerEventWatch{}

impl PartialEq for TimerEventWatch
{
    fn eq(&self, other: &Self) -> bool 
    {
        return self.id == other.id;
    }
}


impl TimerPollOps for TimerEventWatch
{
    fn new() -> TimerResult<Self>
    {
        let wakeup_event =
            EventFd::from_flags(EfdFlags::EFD_CLOEXEC | EfdFlags::EFD_NONBLOCK)
                .map_err(|e|
                             map_timer_err!(TimerErrorType::EPoll(e), "epoll new() error")
                )?;

        let mut pollfds = 
            [
                libc::pollfd 
                {
                    fd: wakeup_event.as_raw_fd(),
                    events: libc::POLLIN,
                    revents: 0,
                },
            ]
            .to_vec();

        pollfds.sort_by(|a, b| a.fd.cmp(&b.fd));

        return Ok( 
            Self
            {
                id: ID_GEN.fetch_add(1, Ordering::SeqCst),
                wakeup_event: Arc::new(wakeup_event),
                polling_flag: AtomicBool::new(false),
                timers: RwLock::new(BTreeMap::new()),
            } 
        );
    }

    fn add(&self, timer: TimerFd) -> TimerResult<()>
    {
        let mut timers_lock =
            self
                .timers
                .write()
                .map_or_else(|e| e.into_inner(), |v| v);

        // check if timer was added previosly
        let false =
            timers_lock.contains_key(&timer.as_timer_id())
        else
        {
            timer_err!(TimerErrorType::Duplicate, "can not add timer {} to epoll, reason duplicate",
                timer)
        };

        timers_lock.insert(timer.as_timer_id(), timer);

        /*let pollfd =
            libc::pollfd 
            {
                fd: timer.as_fd().as_raw_fd(),
                events: libc::POLLIN,
                revents: 0,
            };

        let mut vals = self.poll_fd.clone_exclusivly();

        let Err(pos) = vals.binary_search_by(|f| f.fd.cmp(&timer.as_fd().as_raw_fd()))
            else 
            { 
                timer_err!(TimerErrorType::Duplicate, 
                    "timer: {} already have been regestered withing instance", timer) 
            };

        vals.insert(pos, pollfd);
        vals.commit();

        self.epoll_fds.fetch_add(1, Ordering::SeqCst);*/

        return Ok(());
    }

    fn delete<FD: FdTimerMarker>(&self, timer: &FD) -> TimerResult<()>
    {
        let mut timers_lock =
            self
                .timers
                .write()
                .map_or_else(|e| e.into_inner(), |v| v);

        // check if timer was added previosly
        let true =
            timers_lock.contains_key(&timer.as_timer_id())
        else
        {
            timer_err!(TimerErrorType::Duplicate, "can not add timer {} to epoll, reason does not exist",
                timer.as_timer_id())
        };

        // remove from the list
        let _ = timers_lock.remove(&timer.as_timer_id());

        return Ok(());
    }

    fn poll(&self, timeout: Option<i32>) ->  TimerResult<Option<Vec<PollEventType>>>
    {
        if self.polling_flag.swap(true, Ordering::SeqCst) == true
        {
            timer_err!(TimerErrorType::EPollAlreadyPolling, 
                "epoll fd: '{}' other thread already polling", self.id);
        }

        // create pollfd list
        let timers_lock =
            self
                .timers
                .read()
                .map_or_else(|e| e.into_inner(), |v| v);


        let mut pollfds: Vec<libc::pollfd> =
            timers_lock
                .iter()
                .map(|timer|
                    {
                        libc::pollfd
                        {
                            fd: timer.1.as_raw_fd(),
                            events: libc::POLLIN,
                            revents: 0,
                        }
                    }
                )
                .collect();

        drop(timers_lock);

        let wakeup_fd =
            libc::pollfd
            {
                fd: self.wakeup_event.as_raw_fd(),
                events: libc::POLLIN,
                revents: 0,
            };

        pollfds.insert(0, wakeup_fd);

        pollfds.sort_by(|a, b| a.fd.cmp(&b.fd));

        let res = 
            unsafe 
            {
                libc::poll(
                    pollfds.as_mut_ptr().cast(),
                    pollfds.len() as libc::nfds_t,
                    timeout.map_or(-1, |f| f),
                )
            };

        //let mut poll_res = PollResult::None;
        
        self.polling_flag.store(false, Ordering::SeqCst);

        if res < 0
        {
            let errn = Errno::last();
            if errn != Errno::EINTR
            {
                timer_err!(TimerErrorType::EPoll(errn), "poll {} error", self.id);
            }
            else
            {
                return Ok(None);
            }
        } else if res == 0
        {
            return Ok(None);
        }
        else
        {
            let timers_read =
                self
                    .timers
                    .read()
                    .map_or_else(|e| e.into_inner(), |v| v);

            let mut poll_res: Vec<PollEventType> = Vec::with_capacity(res as usize);

            for event in pollfds.iter()
            {
                if event.revents & libc::POLLIN == libc::POLLIN
                {
                    if event.fd as RawFd == self.wakeup_event.as_raw_fd()
                    {
                        let _ = self.wakeup_event.read();

                        // no event
                    }
                    else
                    {
                        let Some(timer) =
                            timers_read
                                .get(&TimerId::from(event.fd)).map(|c| c)
                        else
                        {
                            // probably removed, just ignore
                            continue;
                        };

                        match timer.read()
                        {
                            Ok(res) =>
                                poll_res.push(PollEventType::TimerRes(timer.as_timer_id(), res)),
                            Err(e) =>
                                poll_res.push(PollEventType::SubError(timer.as_timer_id(), e)),
                        }
                    }
                }
                else if event.revents & (libc::POLLHUP | libc::POLLERR | libc::POLLRDHUP | libc::POLLNVAL) > 0
                {
                   // skip
                }
            }

            if poll_res.is_empty() == false
            {
                return Ok(Some(poll_res));
            }
            else
            {
                return Ok(None);
            }
        }
    }
    
    fn get_count(&self) -> usize 
    {
        return
            self
                .timers
                .read()
                .map_or_else(|e| e.into_inner(), |v| v)
                .len();
    }

    fn get_poll_interruptor(&self) -> PollInterrupt
    {
        return PollInterrupt::new(Arc::downgrade(&self.wakeup_event));
    }
    
    fn interrupt_poll(&self) -> bool 
    {
        return self.wakeup_event.write(1).is_ok();
    }
}