message-io 0.12.2

Event-driven message library to build network applications easy and fast
Documentation
mod resource_id;
mod endpoint;
mod poll;
mod registry;
mod driver;
mod remote_addr;
mod transport;
mod loader;

/// Module that specify the pattern to follow to create adapters.
/// This module is not part of the public API itself,
/// it must be used from the internals to build new adapters.
pub mod adapter;

// Reexports
pub use adapter::{SendStatus};
pub use resource_id::{ResourceId, ResourceType};
pub use endpoint::{Endpoint};
pub use remote_addr::{RemoteAddr, ToRemoteAddr};
pub use transport::{Transport};
pub use driver::{NetEvent};

use loader::{DriverLoader, ActionControllerList, EventProcessorList};
use poll::{Poll, PollEvent};

use strum::{IntoEnumIterator};

use std::net::{SocketAddr, ToSocketAddrs};
use std::time::{Duration};
use std::io::{self};

/// Create a network instance giving its controller and processor.
pub fn split() -> (NetworkController, NetworkProcessor) {
    let mut drivers = DriverLoader::default();
    Transport::iter().for_each(|transport| transport.mount_adapter(&mut drivers));

    let (poll, controllers, processors) = drivers.take();

    let network_controller = NetworkController::new(controllers);
    let network_processor = NetworkProcessor::new(poll, processors);

    (network_controller, network_processor)
}

/// Shareable instance in charge of control all the connections.
pub struct NetworkController {
    controllers: ActionControllerList,
}

impl NetworkController {
    fn new(controllers: ActionControllerList) -> NetworkController {
        Self { controllers }
    }

    /// Creates a connection to the specific address.
    /// The endpoint, an identifier of the new connection, will be returned.
    /// If the connection can not be performed (e.g. the address is not reached)
    /// the corresponding IO error is returned.
    /// This function blocks until the resource has been connected and is ready to use.
    pub fn connect(
        &self,
        transport: Transport,
        addr: impl ToRemoteAddr,
    ) -> io::Result<(Endpoint, SocketAddr)> {
        let addr = addr.to_remote_addr().unwrap();
        log::trace!("Connect to {} by adapter: {}", addr, transport.id());
        self.controllers[transport.id() as usize].connect(addr).map(|(endpoint, addr)| {
            log::trace!("Connected to {}", endpoint);
            (endpoint, addr)
        })
    }

    /// Listen messages from specified transport.
    /// The giver address will be used as interface and listening port.
    /// If the port can be opened, a [ResourceId] identifying the listener is returned
    /// along with the local address, or an error if not.
    /// The address is returned despite you passed as parameter because
    /// when a `0` port is specified, the OS will give choose the value.
    pub fn listen(
        &self,
        transport: Transport,
        addr: impl ToSocketAddrs,
    ) -> io::Result<(ResourceId, SocketAddr)> {
        let addr = addr.to_socket_addrs().unwrap().next().unwrap();
        log::trace!("Listen by {} by adapter: {}", addr, transport.id());
        self.controllers[transport.id() as usize].listen(addr).map(|(resource_id, addr)| {
            log::trace!("Listening by {}", resource_id);
            (resource_id, addr)
        })
    }

    /// Remove a network resource.
    /// Returns `false` if the resource id doesn't exists.
    /// This is used to remove resources as connection or listeners.
    /// Resources of endpoints generated by listening in connection oriented transports
    /// can also be removed to close the connection.
    /// Removing an already connected connection implies a disconnection.
    /// Note that non-oriented connections as UDP use its listener resource to manage all
    /// remote endpoints internally, the remotes have not resource for themselfs.
    /// It means that all generated `Endpoint`s share the `ResourceId` of the listener and
    /// if you remove this resource you are removing the listener of all of them.
    /// For that cases there is no need to remove the resource because non-oriented connections
    /// have not connection itself to close, 'there is no spoon'.
    pub fn remove(&self, resource_id: ResourceId) -> bool {
        log::trace!("Remove {}", resource_id);
        let value = self.controllers[resource_id.adapter_id() as usize].remove(resource_id);
        log::trace!("Removed: {}", value);
        value
    }

    /// Send the data message thought the connection represented by the given endpoint.
    /// This function returns a [`SendStatus`] indicating the status of this send.
    /// There is no guarantee that send over a correct connection generates a [`SendStatus::Sent`]
    /// because any time a connection can be disconnected (even while you are sending).
    /// Except cases where you need to be sure that the message has been sent,
    /// you will want to process a [`NetEvent::Disconnected`] to determine if the connection +
    /// is *alive* instead of check if `send()` returned [`SendStatus::ResourceNotFound`].
    pub fn send(&self, endpoint: Endpoint, data: &[u8]) -> SendStatus {
        log::trace!("Send {} bytes to {}", data.len(), endpoint);
        let status =
            self.controllers[endpoint.resource_id().adapter_id() as usize].send(endpoint, data);
        log::trace!("Send status: {:?}", status);
        status
    }
}

/// Instance in charge of process input network events.
/// These events are offered to the user as a [`NetEvent`] its processing data.
pub struct NetworkProcessor {
    poll: Poll,
    processors: EventProcessorList,
}

impl NetworkProcessor {
    fn new(poll: Poll, processors: EventProcessorList) -> Self {
        Self { poll, processors }
    }

    /// Process the next poll event.
    /// This functions waits the timeout specified until the poll event is generated.
    /// If `None` is passed as timeout, it will wait indefinitely.
    /// Note that there is no 1-1 relation between an internal poll event and a [`NetEvent`].
    /// You need to assume that process an internal poll event could call 0 or N times to
    /// the callback with diferents `NetEvent`s.
    pub fn process_poll_event(
        &mut self,
        timeout: Option<Duration>,
        mut event_callback: impl FnMut(NetEvent<'_>),
    ) {
        let processors = &mut self.processors;
        self.poll.process_event(timeout, |poll_event| {
            match poll_event {
                PollEvent::Network(resource_id) => {
                    let adapter_id = resource_id.adapter_id() as usize;
                    processors[adapter_id].process(resource_id, &mut |net_event| {
                        log::trace!("Processed {:?}", net_event);
                        event_callback(net_event);
                    });
                }
                #[allow(dead_code)] //TODO: remove it with native event support
                PollEvent::Waker => todo!(),
            }
        });
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::time::{Duration};

    lazy_static::lazy_static! {
        static ref TIMEOUT: Duration = Duration::from_millis(1000);
    }

    #[test]
    fn create_remove_listener() {
        let (controller, mut processor) = self::split();
        let (listener_id, _) = controller.listen(Transport::Tcp, "127.0.0.1:0").unwrap();
        assert!(controller.remove(listener_id)); // Do not generate an event
        assert!(!controller.remove(listener_id));

        let mut was_event = false;
        processor.process_poll_event(Some(*TIMEOUT), |_| was_event = true);
        assert!(!was_event);
    }

    #[test]
    fn create_remove_listener_with_connection() {
        let (controller, mut processor) = self::split();
        let (listener_id, addr) = controller.listen(Transport::Tcp, "127.0.0.1:0").unwrap();
        controller.connect(Transport::Tcp, addr).unwrap();

        let mut was_event = false;
        processor.process_poll_event(Some(*TIMEOUT), |net_event| match net_event {
            NetEvent::Connected(_, _) => {
                assert!(controller.remove(listener_id));
                assert!(!controller.remove(listener_id));
                was_event = true;
            }
            _ => unreachable!(),
        });
        assert!(was_event);

        let mut was_event = false;
        processor.process_poll_event(Some(*TIMEOUT), |_| was_event = true);
        assert!(!was_event);
    }
}