#[cfg(all(
feature = "tokio",
any(feature = "tcp", all(feature = "ipc", target_family = "unix"))
))]
use crate::codec::ZmqCodec;
#[cfg(all(
feature = "tokio",
any(feature = "tcp", all(feature = "ipc", target_family = "unix"))
))]
use crate::codec::{CodecError, Message};
#[cfg(all(
feature = "tokio",
any(feature = "tcp", all(feature = "ipc", target_family = "unix"))
))]
use futures::{Sink, Stream};
#[cfg(all(
feature = "tokio",
any(feature = "tcp", all(feature = "ipc", target_family = "unix"))
))]
use pin_project_lite::pin_project;
#[cfg(all(feature = "tokio", feature = "tcp"))]
use tokio::net::tcp::{OwnedReadHalf as TcpOwnedReadHalf, OwnedWriteHalf as TcpOwnedWriteHalf};
#[cfg(all(feature = "ipc", feature = "tokio", target_family = "unix"))]
use tokio::net::unix::{OwnedReadHalf as UnixOwnedReadHalf, OwnedWriteHalf as UnixOwnedWriteHalf};
#[cfg(all(
feature = "tokio",
any(feature = "tcp", all(feature = "ipc", target_family = "unix"))
))]
use tokio_util::codec::{FramedRead, FramedWrite};
/// Owned read half of a Tokio TCP stream wrapped in a `FramedRead` that uses [`ZmqCodec`].
#[cfg(all(feature = "tokio", feature = "tcp"))]
pub type TcpFramedRead = FramedRead<TcpOwnedReadHalf, ZmqCodec>;
/// Owned write half of a Tokio TCP stream wrapped in a `FramedWrite` that uses [`ZmqCodec`].
#[cfg(all(feature = "tokio", feature = "tcp"))]
pub type TcpFramedWrite = FramedWrite<TcpOwnedWriteHalf, ZmqCodec>;
/// Owned read half of a Tokio Unix-domain stream wrapped in a `FramedRead` that uses
/// [`ZmqCodec`]. Only available on Unix-family targets with the `ipc` feature.
#[cfg(all(feature = "ipc", feature = "tokio", target_family = "unix"))]
pub type IpcFramedRead = FramedRead<UnixOwnedReadHalf, ZmqCodec>;
/// Owned write half of a Tokio Unix-domain stream wrapped in a `FramedWrite` that uses
/// [`ZmqCodec`]. Only available on Unix-family targets with the `ipc` feature.
#[cfg(all(feature = "ipc", feature = "tokio", target_family = "unix"))]
pub type IpcFramedWrite = FramedWrite<UnixOwnedWriteHalf, ZmqCodec>;
// `ZmqFramedRead`, TCP-only flavour: compiled when `tcp` is enabled and the IPC
// transport is not available (feature off or non-Unix target). This file holds
// three mutually exclusive definitions of this enum, one per transport combination.
// `#[pin]` makes the generated `project()` hand out the inner reader as a pinned
// reference, which is what the `Stream` impl polls.
#[cfg(all(
feature = "tokio",
feature = "tcp",
not(all(feature = "ipc", target_family = "unix"))
))]
pin_project! {
#[project = ZmqFramedReadProj]
pub(crate) enum ZmqFramedRead {
Tcp { #[pin] inner: TcpFramedRead },
}
}
// `ZmqFramedRead`, TCP + IPC flavour: compiled when both transports are enabled
// on a Unix-family target, so the enum carries one variant per transport.
// Each variant pins its inner framed reader so it can be polled through `project()`.
#[cfg(all(
feature = "tokio",
feature = "tcp",
feature = "ipc",
target_family = "unix"
))]
pin_project! {
#[project = ZmqFramedReadProj]
pub(crate) enum ZmqFramedRead {
Tcp { #[pin] inner: TcpFramedRead },
Ipc { #[pin] inner: IpcFramedRead },
}
}
// `ZmqFramedRead`, IPC-only flavour: compiled when `ipc` is enabled on a
// Unix-family target and `tcp` is disabled, leaving a single Unix-socket variant.
#[cfg(all(
feature = "tokio",
not(feature = "tcp"),
feature = "ipc",
target_family = "unix"
))]
pin_project! {
#[project = ZmqFramedReadProj]
pub(crate) enum ZmqFramedRead {
Ipc { #[pin] inner: IpcFramedRead },
}
}
/// Delegates [`Stream`] polling to whichever transport-specific framed reader
/// the enum currently wraps.
#[cfg(all(
    feature = "tokio",
    any(feature = "tcp", all(feature = "ipc", target_family = "unix"))
))]
impl Stream for ZmqFramedRead {
    type Item = Result<Message, CodecError>;

    /// Polls the wrapped framed reader and returns its result unchanged.
    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        ctx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        // `project()` turns the pinned enum into pinned references to its fields;
        // every arm exists only when its transport is compiled in.
        let projected = self.project();
        match projected {
            #[cfg(feature = "tcp")]
            ZmqFramedReadProj::Tcp { inner: reader } => Stream::poll_next(reader, ctx),
            #[cfg(all(feature = "ipc", target_family = "unix"))]
            ZmqFramedReadProj::Ipc { inner: reader } => Stream::poll_next(reader, ctx),
        }
    }
}
// `ZmqFramedWrite`, TCP-only flavour: compiled when `tcp` is enabled and the IPC
// transport is not available (feature off or non-Unix target). This file holds
// three mutually exclusive definitions of this enum, one per transport combination.
// `#[pin]` makes the generated `project()` hand out the inner writer as a pinned
// reference, which is what the `Sink` impl drives.
#[cfg(all(
feature = "tokio",
feature = "tcp",
not(all(feature = "ipc", target_family = "unix"))
))]
pin_project! {
#[project = ZmqFramedWriteProj]
pub(crate) enum ZmqFramedWrite {
Tcp { #[pin] inner: TcpFramedWrite },
}
}
// `ZmqFramedWrite`, TCP + IPC flavour: compiled when both transports are enabled
// on a Unix-family target, so the enum carries one variant per transport.
// Each variant pins its inner framed writer so it can be driven through `project()`.
#[cfg(all(
feature = "tokio",
feature = "tcp",
feature = "ipc",
target_family = "unix"
))]
pin_project! {
#[project = ZmqFramedWriteProj]
pub(crate) enum ZmqFramedWrite {
Tcp { #[pin] inner: TcpFramedWrite },
Ipc { #[pin] inner: IpcFramedWrite },
}
}
// `ZmqFramedWrite`, IPC-only flavour: compiled when `ipc` is enabled on a
// Unix-family target and `tcp` is disabled, leaving a single Unix-socket variant.
#[cfg(all(
feature = "tokio",
not(feature = "tcp"),
feature = "ipc",
target_family = "unix"
))]
pin_project! {
#[project = ZmqFramedWriteProj]
pub(crate) enum ZmqFramedWrite {
Ipc { #[pin] inner: IpcFramedWrite },
}
}
#[cfg(all(
    feature = "tokio",
    any(feature = "tcp", all(feature = "ipc", target_family = "unix"))
))]
impl ZmqFramedWrite {
    /// Unwraps the framed writer and rewraps the underlying transport write half
    /// as the matching `ZmqEngineWriteHalf` variant.
    ///
    /// The codec layer is dropped by `into_inner()`.
    // NOTE(review): anything still buffered inside the framed writer is presumably
    // not carried over -- confirm that callers flush before converting.
    pub(crate) fn into_engine_writer(self) -> crate::engine::writer::ZmqEngineWriteHalf {
        use crate::engine::writer::{EngineWriteHalf, ZmqEngineWriteHalf};

        match self {
            #[cfg(feature = "tcp")]
            Self::Tcp { inner } => {
                let raw_half = inner.into_inner();
                ZmqEngineWriteHalf::Tcp(EngineWriteHalf::new(raw_half))
            }
            #[cfg(all(feature = "ipc", target_family = "unix"))]
            Self::Ipc { inner } => {
                let raw_half = inner.into_inner();
                ZmqEngineWriteHalf::Ipc(EngineWriteHalf::new(raw_half))
            }
        }
    }
}
/// Delegates every [`Sink`] operation to whichever transport-specific framed
/// writer the enum currently wraps; results are passed through unchanged.
#[cfg(all(
    feature = "tokio",
    any(feature = "tcp", all(feature = "ipc", target_family = "unix"))
))]
impl Sink<Message> for ZmqFramedWrite {
    type Error = CodecError;

    /// Asks the wrapped writer whether it can accept another message.
    fn poll_ready(
        self: std::pin::Pin<&mut Self>,
        ctx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), Self::Error>> {
        let projected = self.project();
        match projected {
            #[cfg(feature = "tcp")]
            ZmqFramedWriteProj::Tcp { inner: writer } => {
                Sink::<Message>::poll_ready(writer, ctx)
            }
            #[cfg(all(feature = "ipc", target_family = "unix"))]
            ZmqFramedWriteProj::Ipc { inner: writer } => {
                Sink::<Message>::poll_ready(writer, ctx)
            }
        }
    }

    /// Hands `item` to the wrapped writer.
    fn start_send(self: std::pin::Pin<&mut Self>, item: Message) -> Result<(), Self::Error> {
        let projected = self.project();
        match projected {
            #[cfg(feature = "tcp")]
            ZmqFramedWriteProj::Tcp { inner: writer } => {
                Sink::<Message>::start_send(writer, item)
            }
            #[cfg(all(feature = "ipc", target_family = "unix"))]
            ZmqFramedWriteProj::Ipc { inner: writer } => {
                Sink::<Message>::start_send(writer, item)
            }
        }
    }

    /// Forwards the flush request to the wrapped writer.
    fn poll_flush(
        self: std::pin::Pin<&mut Self>,
        ctx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), Self::Error>> {
        let projected = self.project();
        match projected {
            #[cfg(feature = "tcp")]
            ZmqFramedWriteProj::Tcp { inner: writer } => {
                Sink::<Message>::poll_flush(writer, ctx)
            }
            #[cfg(all(feature = "ipc", target_family = "unix"))]
            ZmqFramedWriteProj::Ipc { inner: writer } => {
                Sink::<Message>::poll_flush(writer, ctx)
            }
        }
    }

    /// Forwards the close request to the wrapped writer.
    fn poll_close(
        self: std::pin::Pin<&mut Self>,
        ctx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), Self::Error>> {
        let projected = self.project();
        match projected {
            #[cfg(feature = "tcp")]
            ZmqFramedWriteProj::Tcp { inner: writer } => {
                Sink::<Message>::poll_close(writer, ctx)
            }
            #[cfg(all(feature = "ipc", target_family = "unix"))]
            ZmqFramedWriteProj::Ipc { inner: writer } => {
                Sink::<Message>::poll_close(writer, ctx)
            }
        }
    }
}
/// Conversion of a framed write half into the writer type consumed by the engine.
///
/// The implementations in this file unwrap the framed writer with `into_inner()`
/// and wrap the resulting transport handle in an engine write half.
pub trait IntoEngineWriter {
/// Writer produced by the conversion; it must support vectored async writes
/// and be `Send + 'static`.
type Writer: crate::io_compat::AsyncVectoredWrite + Send + 'static;
/// Consumes `self` and returns the engine-side writer.
fn into_engine_writer(self) -> Self::Writer;
}
/// Exposes the inherent `ZmqFramedWrite::into_engine_writer` conversion through
/// the [`IntoEngineWriter`] trait.
#[cfg(all(
feature = "tokio",
any(feature = "tcp", all(feature = "ipc", target_family = "unix"))
))]
impl IntoEngineWriter for ZmqFramedWrite {
type Writer = crate::engine::writer::ZmqEngineWriteHalf;
fn into_engine_writer(self) -> Self::Writer {
// The type-qualified path selects the inherent method of the same name
// (inherent associated functions take priority over trait ones), so this
// call does not recurse into the trait method.
ZmqFramedWrite::into_engine_writer(self)
}
}
/// A pair of framed transport halves: one for reading (`R`) and one for
/// writing (`W`).
pub struct FramedIo<R, W> {
    /// Inbound half.
    pub read_half: R,
    /// Outbound half.
    pub write_half: W,
    /// Optional CURVE session carried alongside the halves; the constructors in
    /// this file initialise it to `None`.
    #[cfg(feature = "curve")]
    pub(crate) curve: Option<crate::mechanism::CurveSession>,
}

impl<R, W> FramedIo<R, W> {
    /// Splits the value into `(read_half, write_half, curve)`.
    ///
    /// This three-element form exists only when the `curve` feature is enabled.
    #[cfg(feature = "curve")]
    pub(crate) fn into_parts(self) -> (R, W, Option<crate::mechanism::CurveSession>) {
        let Self {
            read_half,
            write_half,
            curve,
        } = self;
        (read_half, write_half, curve)
    }

    /// Splits the value into `(read_half, write_half)`.
    ///
    /// This two-element form exists only when the `curve` feature is disabled.
    #[cfg(not(feature = "curve"))]
    pub(crate) fn into_parts(self) -> (R, W) {
        let Self {
            read_half,
            write_half,
        } = self;
        (read_half, write_half)
    }
}
#[cfg(all(
    feature = "tokio",
    any(feature = "tcp", all(feature = "ipc", target_family = "unix"))
))]
impl FramedIo<ZmqFramedRead, ZmqFramedWrite> {
    /// Splits a Tokio TCP stream into owned halves and frames each half with a
    /// fresh `ZmqCodec`, tagging both as the `Tcp` variant.
    #[cfg(feature = "tcp")]
    pub(crate) fn from_tcp(stream: tokio::net::TcpStream) -> Self {
        let (rx, tx) = stream.into_split();
        Self {
            read_half: ZmqFramedRead::Tcp {
                inner: FramedRead::new(rx, ZmqCodec::new()),
            },
            write_half: ZmqFramedWrite::Tcp {
                inner: FramedWrite::new(tx, ZmqCodec::new()),
            },
            // No CURVE session is attached at construction time.
            #[cfg(feature = "curve")]
            curve: None,
        }
    }

    /// Splits a Tokio Unix-domain stream into owned halves and frames each half
    /// with a fresh `ZmqCodec`, tagging both as the `Ipc` variant.
    #[cfg(all(feature = "ipc", feature = "tokio", target_family = "unix"))]
    pub(crate) fn from_unix(stream: tokio::net::UnixStream) -> Self {
        let (rx, tx) = stream.into_split();
        Self {
            read_half: ZmqFramedRead::Ipc {
                inner: FramedRead::new(rx, ZmqCodec::new()),
            },
            write_half: ZmqFramedWrite::Ipc {
                inner: FramedWrite::new(tx, ZmqCodec::new()),
            },
            // No CURVE session is attached at construction time.
            #[cfg(feature = "curve")]
            curve: None,
        }
    }
}
/// `FramedIo` specialised to the Tokio transport enums.
///
/// Despite the `Tcp` prefix, the cfg gate also enables this alias in IPC-only
/// builds, where the halves hold Unix-domain sockets.
#[cfg(all(
feature = "tokio",
any(feature = "tcp", all(feature = "ipc", target_family = "unix"))
))]
pub(crate) type TcpFramedIo = FramedIo<ZmqFramedRead, ZmqFramedWrite>;
/// smol-based TCP framing, compiled only when `smol` and `tcp` are enabled and
/// `tokio` is not.
#[cfg(all(feature = "smol", not(feature = "tokio"), feature = "tcp"))]
pub(crate) mod smol {
    use std::net::TcpStream as StdTcpStream;
    use std::sync::Arc;

    use async_io::Async;
    use asynchronous_codec::{FramedRead, FramedWrite};

    use super::{FramedIo, IntoEngineWriter};
    use crate::codec::ZmqCodec;

    /// smol TCP stream framed for reading with `ZmqCodec`.
    pub type SmolTcpFramedRead = FramedRead<smol::net::TcpStream, ZmqCodec>;
    /// smol TCP stream framed for writing with `ZmqCodec`.
    pub type SmolTcpFramedWrite = FramedWrite<smol::net::TcpStream, ZmqCodec>;
    /// `FramedIo` specialised to the smol TCP framed halves.
    pub(crate) type SmolFramedIo = FramedIo<SmolTcpFramedRead, SmolTcpFramedWrite>;

    impl FramedIo<SmolTcpFramedRead, SmolTcpFramedWrite> {
        /// Builds both framed halves from a single smol TCP stream.
        ///
        /// The stream is cloned so that the reader and the writer each own a
        /// handle; every half gets its own fresh `ZmqCodec`.
        pub(crate) fn from_tcp(stream: smol::net::TcpStream) -> Self {
            let reader_handle = stream.clone();
            Self {
                read_half: FramedRead::new(reader_handle, ZmqCodec::new()),
                write_half: FramedWrite::new(stream, ZmqCodec::new()),
                // No CURVE session is attached at construction time.
                #[cfg(feature = "curve")]
                curve: None,
            }
        }
    }

    impl IntoEngineWriter for SmolTcpFramedWrite {
        type Writer = crate::engine::writer::SmolEngineWriteHalf;

        /// Drops the codec layer, converts the smol stream into a shared
        /// `Async<std::net::TcpStream>` handle and wraps it for the engine.
        fn into_engine_writer(self) -> Self::Writer {
            use crate::engine::writer::{EngineWriteHalf, SmolEngineWriteHalf};

            let shared: Arc<Async<StdTcpStream>> = self.into_inner().into();
            SmolEngineWriteHalf(EngineWriteHalf::new(shared))
        }
    }
}