rustzmq2 0.1.0

A native async Rust implementation of ZeroMQ
Documentation
use tokio::net::{UnixListener, UnixStream};

use super::AcceptStopHandle;
use crate::async_rt;
use crate::codec::DefaultFramedIo as FramedIo;
use crate::endpoint::Endpoint;
use crate::task_handle::TaskHandle;
use crate::ZmqResult;

use futures::channel::oneshot;
use futures::{select, FutureExt};

use std::path::Path;

pub(crate) async fn connect(path: &Path) -> ZmqResult<(FramedIo, Endpoint)> {
    let raw_socket = UnixStream::connect(path).await?;
    let peer_addr = raw_socket.peer_addr()?;
    let peer_addr = peer_addr.as_pathname().map(|a| a.to_owned());

    Ok((FramedIo::from_unix(raw_socket), Endpoint::Ipc(peer_addr)))
}

pub(crate) async fn begin_accept<T>(
    path: &Path,
    cback: impl Fn(ZmqResult<(FramedIo, Endpoint)>) -> T + Send + 'static,
) -> ZmqResult<(Endpoint, AcceptStopHandle)>
where
    T: std::future::Future<Output = ()> + Send + 'static,
{
    let effective_path;
    let path = if path == Path::new("*") {
        effective_path = std::env::temp_dir().join(format!("zmq-{}.sock", uuid::Uuid::new_v4()));
        effective_path.as_path()
    } else {
        path
    };

    let listener = UnixListener::bind(path)?;

    let resolved_addr = listener.local_addr()?;
    let resolved_addr = resolved_addr.as_pathname().map(|a| a.to_owned());
    let listener_addr = resolved_addr.clone();
    let (stop_channel, stop_callback) = oneshot::channel::<()>();
    let task_handle = async_rt::task::spawn(async move {
        let mut stop_callback = stop_callback.fuse();
        loop {
            select! {
                incoming = listener.accept().fuse() => {
                    let maybe_accepted: Result<_, _> = incoming.map(|(raw_socket, peer_addr)| {
                        let peer_addr = peer_addr.as_pathname().map(|a| a.to_owned());
                        (FramedIo::from_unix(raw_socket), Endpoint::Ipc(peer_addr))
                    }).map_err(|err| err.into());
                    async_rt::task::spawn(cback(maybe_accepted));
                },
                _ = stop_callback => {
                    log::debug!("Accept task received stop signal. {:?}", listener_addr);
                    break
                }
            }
        }
        drop(listener);
        if let Some(listener_addr) = listener_addr {
            if let Err(err) = tokio::fs::remove_file(&listener_addr).await {
                log::warn!(
                    "Could not delete unix socket at {}: {}",
                    listener_addr.display(),
                    err
                );
            }
        }
        Ok(())
    });
    Ok((
        Endpoint::Ipc(resolved_addr),
        AcceptStopHandle::new(TaskHandle::new(stop_channel, task_handle)),
    ))
}