use crate::{
sealed,
socket::{Socket, SocketType},
};
/// Convenience alias for a [`Socket`] whose socket-type parameter is [`Channel`].
pub type ChannelSocket = Socket<Channel>;
/// Marker type for the channel socket kind.
///
/// It carries no data; in this file it is only used as the type parameter of
/// [`Socket`] and reports [`SocketType::Channel`] as its raw socket type.
pub struct Channel {}
// Capability markers for `Channel`. Presumably these are what make the `Sender`
// and `Receiver` APIs (exercised by the tests in this file) available on
// `Socket<Channel>` - confirm against the sealed trait definitions.
impl sealed::SenderFlag for Channel {}
impl sealed::ReceiverFlag for Channel {}
impl sealed::SocketType for Channel {
/// Returns [`SocketType::Channel`], the raw socket type tied to this marker.
fn raw_socket_type() -> SocketType {
SocketType::Channel
}
}
// SAFETY: not provable from this file alone. Presumably these impls rely on the
// underlying channel socket type being documented as thread-safe by the native
// library - TODO confirm against the libzmq documentation and the internals of
// `Socket` before relying on it.
unsafe impl Sync for Socket<Channel> {}
unsafe impl Send for Socket<Channel> {}
// Currently empty: no channel-specific inherent methods are defined here.
impl Socket<Channel> {}
#[cfg(test)]
mod channel_tests {
    use super::ChannelSocket;
    use crate::prelude::{Context, Receiver, RecvFlags, SendFlags, Sender, ZmqResult};

    /// Exchanges "Hello"/"World" between two channel sockets over `inproc`,
    /// using the blocking send/receive calls on both ends.
    #[test]
    fn channel_channel() -> ZmqResult<()> {
        let endpoint = "inproc://channel_channel";

        let context = Context::new()?;

        let channel_server = ChannelSocket::from_context(&context)?;
        channel_server.bind(endpoint)?;

        // Keep the handle: a detached thread would hide a server-side panic from
        // the test harness and could outlive the test body.
        let server = std::thread::spawn(move || {
            let msg = channel_server.recv_msg(RecvFlags::empty()).unwrap();
            assert_eq!(msg.to_string(), "Hello");
            channel_server
                .send_msg("World", SendFlags::empty())
                .unwrap();
        });

        let channel_client = ChannelSocket::from_context(&context)?;
        channel_client.connect(endpoint)?;

        channel_client
            .send_msg("Hello", SendFlags::empty())
            .unwrap();
        let msg = channel_client.recv_msg(RecvFlags::empty()).unwrap();
        assert_eq!(msg.to_string(), "World");

        // The reply has been received, so the server closure has nothing left to
        // wait for; joining surfaces any panic it raised.
        server.join().expect("channel server thread panicked");

        Ok(())
    }

    /// Same exchange as `channel_channel`, but the client side goes through the
    /// async send/receive calls, driven by `futures::executor::block_on`.
    #[cfg(feature = "futures")]
    #[test]
    fn channel_channel_async() -> ZmqResult<()> {
        let endpoint = "inproc://channel_channel";

        let context = Context::new()?;

        let channel_server = ChannelSocket::from_context(&context)?;
        channel_server.bind(endpoint)?;

        // Keep the handle so the server thread can be joined once the client is done.
        let server = std::thread::spawn(move || {
            let msg = channel_server.recv_msg(RecvFlags::empty()).unwrap();
            assert_eq!(msg.to_string(), "Hello");
            channel_server
                .send_msg("World", SendFlags::empty())
                .unwrap();
        });

        let channel_client = ChannelSocket::from_context(&context)?;
        channel_client.connect(endpoint)?;

        futures::executor::block_on(async {
            channel_client
                .send_msg_async("Hello", SendFlags::empty())
                .await;
            // `recv_msg_async` yields an `Option`; keep polling until a message
            // actually arrives.
            loop {
                if let Some(msg) = channel_client.recv_msg_async().await {
                    assert_eq!(msg.to_string(), "World");
                    break;
                }
            }
        });

        // Propagate a server-side panic instead of silently dropping it.
        server.join().expect("channel server thread panicked");

        Ok(())
    }
}
#[cfg(feature = "builder")]
pub(crate) mod builder {
    use crate::socket::SocketBuilder;

    /// Builder alias for channel sockets; currently the plain [`SocketBuilder`].
    pub type ChannelBuilder = SocketBuilder;

    #[cfg(test)]
    mod channel_builder_tests {
        use super::ChannelBuilder;
        use crate::{
            auth::ZapDomain,
            prelude::{ChannelSocket, Context, ZmqResult},
            security::SecurityMechanism,
        };

        /// Builds a channel socket from a default builder and checks the value
        /// reported by each option getter.
        #[test]
        fn builder_from_default() -> ZmqResult<()> {
            let ctx = Context::new()?;
            let default_builder = ChannelBuilder::default();
            let channel: ChannelSocket = default_builder.build_from_context(&ctx)?;

            // Nothing was configured on the builder, so every getter below is
            // expected to report the value shown.
            assert_eq!(channel.connect_timeout()?, 0);
            assert_eq!(channel.handshake_interval()?, 30_000);
            assert_eq!(channel.heartbeat_interval()?, 0);
            assert_eq!(channel.heartbeat_timeout()?, -1);
            assert_eq!(channel.heartbeat_timetolive()?, 0);
            assert!(!channel.immediate()?);
            assert!(!channel.ipv6()?);
            assert_eq!(channel.linger()?, -1);
            assert_eq!(channel.max_message_size()?, -1);
            assert_eq!(channel.receive_buffer()?, -1);
            assert_eq!(channel.receive_highwater_mark()?, 1_000);
            assert_eq!(channel.receive_timeout()?, -1);
            assert_eq!(channel.reconnect_interval()?, 100);
            assert_eq!(channel.reconnect_interval_max()?, 0);
            assert_eq!(channel.send_buffer()?, -1);
            assert_eq!(channel.send_highwater_mark()?, 1_000);
            assert_eq!(channel.send_timeout()?, -1);
            assert_eq!(channel.zap_domain()?, ZapDomain::new("".into()));
            assert_eq!(channel.security_mechanism()?, SecurityMechanism::Null);

            Ok(())
        }
    }
}