// rustzmq2 0.1.0
//
// A native async Rust implementation of ZeroMQ
// Documentation
//! ZMTP framing on top of a concrete transport.
//!
//! `FramedIo<R, W>` is a generic paired read/write handle whose concrete types
//! are determined at the transport layer:
//!
//! - Tokio (TCP and, on Unix, IPC): `TcpFramedIo` = `FramedIo<ZmqFramedRead, ZmqFramedWrite>`
//! - smol TCP: `SmolFramedIo` = `FramedIo<SmolTcpFramedRead, SmolTcpFramedWrite>`
//!
//! The read half must implement `Stream<Item = Result<Message, CodecError>>`.
//! The write half must implement `Sink<Message, Error = CodecError>`.
//! To be handed off to the engine, the write half must also implement
//! `IntoEngineWriter`, which converts it into the engine's vectored writer
//! (a type implementing `AsyncVectoredWrite`).

#[cfg(all(
    feature = "tokio",
    any(feature = "tcp", all(feature = "ipc", target_family = "unix"))
))]
use crate::codec::ZmqCodec;
#[cfg(all(
    feature = "tokio",
    any(feature = "tcp", all(feature = "ipc", target_family = "unix"))
))]
use crate::codec::{CodecError, Message};
#[cfg(all(
    feature = "tokio",
    any(feature = "tcp", all(feature = "ipc", target_family = "unix"))
))]
use futures::{Sink, Stream};
#[cfg(all(
    feature = "tokio",
    any(feature = "tcp", all(feature = "ipc", target_family = "unix"))
))]
use pin_project_lite::pin_project;
#[cfg(all(feature = "tokio", feature = "tcp"))]
use tokio::net::tcp::{OwnedReadHalf as TcpOwnedReadHalf, OwnedWriteHalf as TcpOwnedWriteHalf};
#[cfg(all(feature = "ipc", feature = "tokio", target_family = "unix"))]
use tokio::net::unix::{OwnedReadHalf as UnixOwnedReadHalf, OwnedWriteHalf as UnixOwnedWriteHalf};
#[cfg(all(
    feature = "tokio",
    any(feature = "tcp", all(feature = "ipc", target_family = "unix"))
))]
use tokio_util::codec::{FramedRead, FramedWrite};

// ── Type aliases for concrete tokio halves ────────────────────────────────────

/// Framed read half of a tokio TCP connection, using `ZmqCodec` as the codec.
#[cfg(all(feature = "tokio", feature = "tcp"))]
pub type TcpFramedRead = FramedRead<TcpOwnedReadHalf, ZmqCodec>;
/// Framed write half of a tokio TCP connection, using `ZmqCodec` as the codec.
#[cfg(all(feature = "tokio", feature = "tcp"))]
pub type TcpFramedWrite = FramedWrite<TcpOwnedWriteHalf, ZmqCodec>;

/// Framed read half of a tokio Unix-domain-socket (IPC) connection, using
/// `ZmqCodec` as the codec. Only available on Unix targets.
#[cfg(all(feature = "ipc", feature = "tokio", target_family = "unix"))]
pub type IpcFramedRead = FramedRead<UnixOwnedReadHalf, ZmqCodec>;
/// Framed write half of a tokio Unix-domain-socket (IPC) connection, using
/// `ZmqCodec` as the codec. Only available on Unix targets.
#[cfg(all(feature = "ipc", feature = "tokio", target_family = "unix"))]
pub type IpcFramedWrite = FramedWrite<UnixOwnedWriteHalf, ZmqCodec>;

// ── ZmqFramedRead / ZmqFramedWrite enum (tokio transports) ───────────────────
//
// Three definitions, one per enabled transport combination: TCP only,
// TCP + IPC (ipc + unix), and IPC only.
// pin_project! does not correctly handle #[cfg] on individual variants —
// it generates the Proj variant unconditionally but the field type then
// fails to resolve. Separate cfg blocks are the correct fix.

// Read-half enum for builds where TCP is the only tokio transport compiled in
// (the `ipc` feature is off, or the target is not Unix).
#[cfg(all(
    feature = "tokio",
    feature = "tcp",
    not(all(feature = "ipc", target_family = "unix"))
))]
pin_project! {
    // `ZmqFramedReadProj` is the name of the generated pinned-projection enum.
    #[project = ZmqFramedReadProj]
    pub(crate) enum ZmqFramedRead {
        // `#[pin]` makes the projection hand out `Pin<&mut TcpFramedRead>`.
        Tcp { #[pin] inner: TcpFramedRead },
    }
}

// Read-half enum for builds where both the TCP and IPC tokio transports are
// compiled in (IPC requires a Unix target).
#[cfg(all(
    feature = "tokio",
    feature = "tcp",
    feature = "ipc",
    target_family = "unix"
))]
pin_project! {
    // `ZmqFramedReadProj` is the name of the generated pinned-projection enum.
    #[project = ZmqFramedReadProj]
    pub(crate) enum ZmqFramedRead {
        // `#[pin]` makes the projection hand out a pinned reference to `inner`.
        Tcp { #[pin] inner: TcpFramedRead },
        Ipc { #[pin] inner: IpcFramedRead },
    }
}

// Read-half enum for builds where IPC is the only tokio transport compiled in
// (the `tcp` feature is off; IPC requires a Unix target).
#[cfg(all(
    feature = "tokio",
    not(feature = "tcp"),
    feature = "ipc",
    target_family = "unix"
))]
pin_project! {
    // `ZmqFramedReadProj` is the name of the generated pinned-projection enum.
    #[project = ZmqFramedReadProj]
    pub(crate) enum ZmqFramedRead {
        // `#[pin]` makes the projection hand out `Pin<&mut IpcFramedRead>`.
        Ipc { #[pin] inner: IpcFramedRead },
    }
}

/// `Stream` implementation that forwards to whichever transport-specific
/// framed reader the enum currently holds.
#[cfg(all(
    feature = "tokio",
    any(feature = "tcp", all(feature = "ipc", target_family = "unix"))
))]
impl Stream for ZmqFramedRead {
    type Item = Result<Message, CodecError>;

    /// Projects the pinned enum onto its active variant and polls that
    /// variant's inner framed reader, returning its result unchanged.
    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        let active = self.project();
        match active {
            #[cfg(feature = "tcp")]
            ZmqFramedReadProj::Tcp { inner: tcp } => Stream::poll_next(tcp, cx),
            #[cfg(all(feature = "ipc", target_family = "unix"))]
            ZmqFramedReadProj::Ipc { inner: ipc } => Stream::poll_next(ipc, cx),
        }
    }
}

// Write-half enum for builds where TCP is the only tokio transport compiled in
// (the `ipc` feature is off, or the target is not Unix).
#[cfg(all(
    feature = "tokio",
    feature = "tcp",
    not(all(feature = "ipc", target_family = "unix"))
))]
pin_project! {
    // `ZmqFramedWriteProj` is the name of the generated pinned-projection enum.
    #[project = ZmqFramedWriteProj]
    pub(crate) enum ZmqFramedWrite {
        // `#[pin]` makes the projection hand out `Pin<&mut TcpFramedWrite>`.
        Tcp { #[pin] inner: TcpFramedWrite },
    }
}

// Write-half enum for builds where both the TCP and IPC tokio transports are
// compiled in (IPC requires a Unix target).
#[cfg(all(
    feature = "tokio",
    feature = "tcp",
    feature = "ipc",
    target_family = "unix"
))]
pin_project! {
    // `ZmqFramedWriteProj` is the name of the generated pinned-projection enum.
    #[project = ZmqFramedWriteProj]
    pub(crate) enum ZmqFramedWrite {
        // `#[pin]` makes the projection hand out a pinned reference to `inner`.
        Tcp { #[pin] inner: TcpFramedWrite },
        Ipc { #[pin] inner: IpcFramedWrite },
    }
}

// Write-half enum for builds where IPC is the only tokio transport compiled in
// (the `tcp` feature is off; IPC requires a Unix target).
#[cfg(all(
    feature = "tokio",
    not(feature = "tcp"),
    feature = "ipc",
    target_family = "unix"
))]
pin_project! {
    // `ZmqFramedWriteProj` is the name of the generated pinned-projection enum.
    #[project = ZmqFramedWriteProj]
    pub(crate) enum ZmqFramedWrite {
        // `#[pin]` makes the projection hand out `Pin<&mut IpcFramedWrite>`.
        Ipc { #[pin] inner: IpcFramedWrite },
    }
}

#[cfg(all(
    feature = "tokio",
    any(feature = "tcp", all(feature = "ipc", target_family = "unix"))
))]
impl ZmqFramedWrite {
    /// Consumes the framed write half and converts it into the engine's write
    /// half for the same transport.
    ///
    /// The framed writer is unwrapped with `into_inner()` and the resulting raw
    /// write half is wrapped in an `EngineWriteHalf`, tagged with the matching
    /// `ZmqEngineWriteHalf` variant.
    ///
    /// NOTE(review): `into_inner()` presumably discards anything still buffered
    /// in the framed writer — confirm callers flush before the handoff.
    pub(crate) fn into_engine_writer(self) -> crate::engine::writer::ZmqEngineWriteHalf {
        use crate::engine::writer::{EngineWriteHalf, ZmqEngineWriteHalf};

        match self {
            #[cfg(feature = "tcp")]
            Self::Tcp { inner: framed } => {
                let raw = framed.into_inner();
                ZmqEngineWriteHalf::Tcp(EngineWriteHalf::new(raw))
            }
            #[cfg(all(feature = "ipc", target_family = "unix"))]
            Self::Ipc { inner: framed } => {
                let raw = framed.into_inner();
                ZmqEngineWriteHalf::Ipc(EngineWriteHalf::new(raw))
            }
        }
    }
}

/// `Sink` implementation that forwards every operation to whichever
/// transport-specific framed writer the enum currently holds.
#[cfg(all(
    feature = "tokio",
    any(feature = "tcp", all(feature = "ipc", target_family = "unix"))
))]
impl Sink<Message> for ZmqFramedWrite {
    type Error = CodecError;

    /// Forwards the readiness poll to the active variant's framed writer.
    fn poll_ready(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), Self::Error>> {
        let active = self.project();
        match active {
            #[cfg(feature = "tcp")]
            ZmqFramedWriteProj::Tcp { inner: tcp } => Sink::<Message>::poll_ready(tcp, cx),
            #[cfg(all(feature = "ipc", target_family = "unix"))]
            ZmqFramedWriteProj::Ipc { inner: ipc } => Sink::<Message>::poll_ready(ipc, cx),
        }
    }

    /// Hands `item` to the active variant's framed writer.
    fn start_send(self: std::pin::Pin<&mut Self>, item: Message) -> Result<(), Self::Error> {
        let active = self.project();
        match active {
            #[cfg(feature = "tcp")]
            ZmqFramedWriteProj::Tcp { inner: tcp } => Sink::<Message>::start_send(tcp, item),
            #[cfg(all(feature = "ipc", target_family = "unix"))]
            ZmqFramedWriteProj::Ipc { inner: ipc } => Sink::<Message>::start_send(ipc, item),
        }
    }

    /// Forwards the flush poll to the active variant's framed writer.
    fn poll_flush(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), Self::Error>> {
        let active = self.project();
        match active {
            #[cfg(feature = "tcp")]
            ZmqFramedWriteProj::Tcp { inner: tcp } => Sink::<Message>::poll_flush(tcp, cx),
            #[cfg(all(feature = "ipc", target_family = "unix"))]
            ZmqFramedWriteProj::Ipc { inner: ipc } => Sink::<Message>::poll_flush(ipc, cx),
        }
    }

    /// Forwards the close poll to the active variant's framed writer.
    fn poll_close(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), Self::Error>> {
        let active = self.project();
        match active {
            #[cfg(feature = "tcp")]
            ZmqFramedWriteProj::Tcp { inner: tcp } => Sink::<Message>::poll_close(tcp, cx),
            #[cfg(all(feature = "ipc", target_family = "unix"))]
            ZmqFramedWriteProj::Ipc { inner: ipc } => Sink::<Message>::poll_close(ipc, cx),
        }
    }
}

// ── `IntoEngineWriter` — write-half → vectored-writer conversion ──────────────

/// Conversion trait: a framing write half that can hand itself to the engine's
/// vectored writer. Implemented by `ZmqFramedWrite` (tokio) and
/// `SmolTcpFramedWrite` (smol).
pub trait IntoEngineWriter {
    /// Writer type produced by the conversion; it must implement
    /// `AsyncVectoredWrite` and be `Send + 'static`.
    type Writer: crate::io_compat::AsyncVectoredWrite + Send + 'static;
    /// Consumes the framing write half and returns the engine-side writer.
    fn into_engine_writer(self) -> Self::Writer;
}

/// Trait adapter for the tokio write half: exposes the inherent
/// `ZmqFramedWrite::into_engine_writer` conversion through `IntoEngineWriter`.
#[cfg(all(
    feature = "tokio",
    any(feature = "tcp", all(feature = "ipc", target_family = "unix"))
))]
impl IntoEngineWriter for ZmqFramedWrite {
    type Writer = crate::engine::writer::ZmqEngineWriteHalf;
    /// Consumes the write half and returns the matching `ZmqEngineWriteHalf`.
    fn into_engine_writer(self) -> Self::Writer {
        // The type-qualified path resolves to the inherent method of the same
        // name (inherent items take precedence over trait items), so this
        // delegates rather than recursing.
        ZmqFramedWrite::into_engine_writer(self)
    }
}

// ── Generic FramedIo<R, W> ────────────────────────────────────────────────────

/// A framing read half and a framing write half bundled as one handle.
///
/// `R` is meant to be the read side (a `Stream`) and `W` the write side (a
/// `Sink`); the struct itself places no bounds on them. The transport layer
/// picks the concrete types, while the socket/engine layer works through the
/// `Stream`/`Sink` interfaces plus the `IntoEngineWriter` handoff.
pub struct FramedIo<R, W> {
    /// Framing read half.
    pub read_half: R,
    /// Framing write half.
    pub write_half: W,
    /// CURVE session from a completed handshake; `None` for NULL/PLAIN.
    #[cfg(feature = "curve")]
    pub(crate) curve: Option<crate::mechanism::CurveSession>,
}

impl<R, W> FramedIo<R, W> {
    /// Splits the handle into its read half, write half and optional CURVE
    /// session (builds with the `curve` feature).
    #[cfg(feature = "curve")]
    pub(crate) fn into_parts(self) -> (R, W, Option<crate::mechanism::CurveSession>) {
        let Self {
            read_half,
            write_half,
            curve,
        } = self;
        (read_half, write_half, curve)
    }

    /// Splits the handle into its read half and write half (builds without
    /// the `curve` feature).
    #[cfg(not(feature = "curve"))]
    pub(crate) fn into_parts(self) -> (R, W) {
        let Self {
            read_half,
            write_half,
        } = self;
        (read_half, write_half)
    }
}

// ── Tokio concrete constructor ────────────────────────────────────────────────

#[cfg(all(
    feature = "tokio",
    any(feature = "tcp", all(feature = "ipc", target_family = "unix"))
))]
impl FramedIo<ZmqFramedRead, ZmqFramedWrite> {
    /// Builds a `FramedIo` from a connected tokio TCP stream.
    ///
    /// The stream is split into owned halves and each half is wrapped in its
    /// own framed adapter with a fresh `ZmqCodec`. No CURVE session is attached.
    #[cfg(feature = "tcp")]
    pub(crate) fn from_tcp(stream: tokio::net::TcpStream) -> Self {
        let (rx, tx) = stream.into_split();
        Self {
            read_half: ZmqFramedRead::Tcp {
                inner: FramedRead::new(rx, ZmqCodec::new()),
            },
            write_half: ZmqFramedWrite::Tcp {
                inner: FramedWrite::new(tx, ZmqCodec::new()),
            },
            #[cfg(feature = "curve")]
            curve: None,
        }
    }

    /// Builds a `FramedIo` from a connected tokio Unix-domain stream.
    ///
    /// The stream is split into owned halves and each half is wrapped in its
    /// own framed adapter with a fresh `ZmqCodec`. No CURVE session is attached.
    #[cfg(all(feature = "ipc", feature = "tokio", target_family = "unix"))]
    pub(crate) fn from_unix(stream: tokio::net::UnixStream) -> Self {
        let (rx, tx) = stream.into_split();
        Self {
            read_half: ZmqFramedRead::Ipc {
                inner: FramedRead::new(rx, ZmqCodec::new()),
            },
            write_half: ZmqFramedWrite::Ipc {
                inner: FramedWrite::new(tx, ZmqCodec::new()),
            },
            #[cfg(feature = "curve")]
            curve: None,
        }
    }
}

// ── Convenience type aliases ──────────────────────────────────────────────────

/// Default tokio `FramedIo` type (used throughout the socket/transport layer).
///
/// Despite the `Tcp` prefix, the underlying `ZmqFramedRead`/`ZmqFramedWrite`
/// enums also carry the IPC variant when that transport is compiled in.
#[cfg(all(
    feature = "tokio",
    any(feature = "tcp", all(feature = "ipc", target_family = "unix"))
))]
pub(crate) type TcpFramedIo = FramedIo<ZmqFramedRead, ZmqFramedWrite>;

// ── smol FramedIo ─────────────────────────────────────────────────────────────

#[cfg(all(feature = "smol", not(feature = "tokio"), feature = "tcp"))]
pub(crate) mod smol {
    //! smol-backed TCP framing: type aliases, the `FramedIo` constructor and
    //! the `IntoEngineWriter` handoff for the smol write half.

    use std::net::TcpStream as StdTcpStream;
    use std::sync::Arc;

    use async_io::Async;
    use asynchronous_codec::{FramedRead, FramedWrite};

    use super::{FramedIo, IntoEngineWriter};
    use crate::codec::ZmqCodec;

    /// Framed read half over a smol TCP stream, using `ZmqCodec` as the codec.
    pub type SmolTcpFramedRead = FramedRead<smol::net::TcpStream, ZmqCodec>;
    /// Framed write half over a smol TCP stream, using `ZmqCodec` as the codec.
    pub type SmolTcpFramedWrite = FramedWrite<smol::net::TcpStream, ZmqCodec>;

    /// smol TCP `FramedIo`. `smol::net::TcpStream` is internally `Arc`-backed
    /// and `Clone`, so both halves share the same underlying socket.
    pub(crate) type SmolFramedIo = FramedIo<SmolTcpFramedRead, SmolTcpFramedWrite>;

    impl FramedIo<SmolTcpFramedRead, SmolTcpFramedWrite> {
        /// Builds a `FramedIo` from a connected smol TCP stream.
        ///
        /// The read half gets a clone of the stream and the write half takes
        /// the original; each is framed with a fresh `ZmqCodec`. No CURVE
        /// session is attached.
        pub(crate) fn from_tcp(stream: smol::net::TcpStream) -> Self {
            let reader_stream = stream.clone();
            Self {
                read_half: FramedRead::new(reader_stream, ZmqCodec::new()),
                write_half: FramedWrite::new(stream, ZmqCodec::new()),
                #[cfg(feature = "curve")]
                curve: None,
            }
        }
    }

    impl IntoEngineWriter for SmolTcpFramedWrite {
        type Writer = crate::engine::writer::SmolEngineWriteHalf;

        /// Unwraps the framed writer, converts the smol stream into its shared
        /// `Arc<Async<std::net::TcpStream>>` handle, and wraps that handle in
        /// the engine's smol write half.
        fn into_engine_writer(self) -> Self::Writer {
            use crate::engine::writer::{EngineWriteHalf, SmolEngineWriteHalf};

            let shared: Arc<Async<StdTcpStream>> = self.into_inner().into();
            SmolEngineWriteHalf(EngineWriteHalf::new(shared))
        }
    }
}