mio 0.7.11

Lightweight non-blocking IO
Documentation
use std::ops::{Deref, DerefMut};
#[cfg(unix)]
use std::os::unix::io::AsRawFd;
#[cfg(windows)]
use std::os::windows::io::AsRawSocket;
#[cfg(debug_assertions)]
use std::sync::atomic::{AtomicUsize, Ordering};
use std::{fmt, io};

#[cfg(any(unix, debug_assertions))]
use crate::poll;
use crate::sys::IoSourceState;
use crate::{event, Interest, Registry, Token};

/// Adapter for a [`RawFd`] or [`RawSocket`] providing an [`event::Source`]
/// implementation.
///
/// `IoSource` enables registering any FD or socket wrapper with [`Poll`].
///
/// While only implementations for TCP, UDP, and UDS (Unix only) are provided,
/// Mio supports registering any FD or socket that can be registered with the
/// underlying OS selector. `IoSource` provides the necessary bridge.
///
/// [`RawFd`]: std::os::unix::io::RawFd
/// [`RawSocket`]: std::os::windows::io::RawSocket
///
/// # Notes
///
/// To handle the registrations and events properly **all** I/O operations (such
/// as `read`, `write`, etc.) must go through the [`do_io`] method to ensure the
/// internal state is updated accordingly.
///
/// [`Poll`]: crate::Poll
/// [`do_io`]: IoSource::do_io
/*
///
/// # Examples
///
/// Basic usage.
///
/// ```
/// # use std::error::Error;
/// # fn main() -> Result<(), Box<dyn Error>> {
/// use mio::{Interest, Poll, Token};
/// use mio::IoSource;
///
/// use std::net;
///
/// let poll = Poll::new()?;
///
/// // Bind a std TCP listener.
/// let listener = net::TcpListener::bind("127.0.0.1:0")?;
/// // Wrap it in the `IoSource` type.
/// let mut listener = IoSource::new(listener);
///
/// // Register the listener.
/// poll.registry().register(&mut listener, Token(0), Interest::READABLE)?;
/// #     Ok(())
/// # }
/// ```
*/
pub struct IoSource<T> {
    state: IoSourceState,
    inner: T,
    #[cfg(debug_assertions)]
    selector_id: SelectorId,
}

impl<T> IoSource<T> {
    /// Create a new `IoSource`.
    pub fn new(io: T) -> IoSource<T> {
        IoSource {
            state: IoSourceState::new(),
            inner: io,
            #[cfg(debug_assertions)]
            selector_id: SelectorId::new(),
        }
    }

    /// Execute an I/O operations ensuring that the socket receives more events
    /// if it hits a [`WouldBlock`] error.
    ///
    /// # Notes
    ///
    /// This method is required to be called for **all** I/O operations to
    /// ensure the user will receive events once the socket is ready again after
    /// returning a [`WouldBlock`] error.
    ///
    /// [`WouldBlock`]: io::ErrorKind::WouldBlock
    pub fn do_io<F, R>(&self, f: F) -> io::Result<R>
    where
        F: FnOnce(&T) -> io::Result<R>,
    {
        self.state.do_io(f, &self.inner)
    }

    /// Returns the I/O source, dropping the state.
    ///
    /// # Notes
    ///
    /// To ensure no more events are to be received for this I/O source first
    /// [`deregister`] it.
    ///
    /// [`deregister`]: Registry::deregister
    pub fn into_inner(self) -> T {
        self.inner
    }
}

/// Be careful when using this method. All I/O operations that may block must go
/// through the [`do_io`] method.
///
/// [`do_io`]: IoSource::do_io
impl<T> Deref for IoSource<T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}

/// Be careful when using this method. All I/O operations that may block must go
/// through the [`do_io`] method.
///
/// [`do_io`]: IoSource::do_io
impl<T> DerefMut for IoSource<T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.inner
    }
}

#[cfg(unix)]
impl<T> event::Source for IoSource<T>
where
    T: AsRawFd,
{
    fn register(
        &mut self,
        registry: &Registry,
        token: Token,
        interests: Interest,
    ) -> io::Result<()> {
        #[cfg(debug_assertions)]
        self.selector_id.associate(registry)?;
        poll::selector(registry).register(self.inner.as_raw_fd(), token, interests)
    }

    fn reregister(
        &mut self,
        registry: &Registry,
        token: Token,
        interests: Interest,
    ) -> io::Result<()> {
        #[cfg(debug_assertions)]
        self.selector_id.check_association(registry)?;
        poll::selector(registry).reregister(self.inner.as_raw_fd(), token, interests)
    }

    fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
        #[cfg(debug_assertions)]
        self.selector_id.remove_association(registry)?;
        poll::selector(registry).deregister(self.inner.as_raw_fd())
    }
}

#[cfg(windows)]
impl<T> event::Source for IoSource<T>
where
    T: AsRawSocket,
{
    fn register(
        &mut self,
        registry: &Registry,
        token: Token,
        interests: Interest,
    ) -> io::Result<()> {
        #[cfg(debug_assertions)]
        self.selector_id.associate(registry)?;
        self.state
            .register(registry, token, interests, self.inner.as_raw_socket())
    }

    fn reregister(
        &mut self,
        registry: &Registry,
        token: Token,
        interests: Interest,
    ) -> io::Result<()> {
        #[cfg(debug_assertions)]
        self.selector_id.check_association(registry)?;
        self.state.reregister(registry, token, interests)
    }

    fn deregister(&mut self, _registry: &Registry) -> io::Result<()> {
        #[cfg(debug_assertions)]
        self.selector_id.remove_association(_registry)?;
        self.state.deregister()
    }
}

impl<T> fmt::Debug for IoSource<T>
where
    T: fmt::Debug,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.inner.fmt(f)
    }
}

/// Used to associate an `IoSource` with a `sys::Selector`.
#[cfg(debug_assertions)]
#[derive(Debug)]
struct SelectorId {
    id: AtomicUsize,
}

#[cfg(debug_assertions)]
impl SelectorId {
    /// Value of `id` if `SelectorId` is not associated with any
    /// `sys::Selector`. Valid selector ids start at 1.
    const UNASSOCIATED: usize = 0;

    /// Create a new `SelectorId`.
    const fn new() -> SelectorId {
        SelectorId {
            id: AtomicUsize::new(Self::UNASSOCIATED),
        }
    }

    /// Associate an I/O source with `registry`, returning an error if its
    /// already registered.
    fn associate(&self, registry: &Registry) -> io::Result<()> {
        let registry_id = poll::selector(&registry).id();
        let previous_id = self.id.swap(registry_id, Ordering::AcqRel);

        if previous_id == Self::UNASSOCIATED {
            Ok(())
        } else {
            Err(io::Error::new(
                io::ErrorKind::AlreadyExists,
                "I/O source already registered with a `Registry`",
            ))
        }
    }

    /// Check the association of an I/O source with `registry`, returning an
    /// error if its registered with a different `Registry` or not registered at
    /// all.
    fn check_association(&self, registry: &Registry) -> io::Result<()> {
        let registry_id = poll::selector(&registry).id();
        let id = self.id.load(Ordering::Acquire);

        if id == registry_id {
            Ok(())
        } else if id == Self::UNASSOCIATED {
            Err(io::Error::new(
                io::ErrorKind::NotFound,
                "I/O source not registered with `Registry`",
            ))
        } else {
            Err(io::Error::new(
                io::ErrorKind::AlreadyExists,
                "I/O source already registered with a different `Registry`",
            ))
        }
    }

    /// Remove a previously made association from `registry`, returns an error
    /// if it was not previously associated with `registry`.
    fn remove_association(&self, registry: &Registry) -> io::Result<()> {
        let registry_id = poll::selector(&registry).id();
        let previous_id = self.id.swap(Self::UNASSOCIATED, Ordering::AcqRel);

        if previous_id == registry_id {
            Ok(())
        } else {
            Err(io::Error::new(
                io::ErrorKind::NotFound,
                "I/O source not registered with `Registry`",
            ))
        }
    }
}

#[cfg(debug_assertions)]
impl Clone for SelectorId {
    fn clone(&self) -> SelectorId {
        SelectorId {
            id: AtomicUsize::new(self.id.load(Ordering::Acquire)),
        }
    }
}