use crate::{
ZmqResult, sealed,
socket::{Socket, SocketOption, SocketType},
};
/// A peer socket: [`Socket`] specialised with the [`Peer`] marker type.
pub type PeerSocket = Socket<Peer>;
/// Marker type that selects the peer flavour of [`Socket`]; it carries no data.
pub struct Peer {}
// `Peer` is flagged as both a sender and a receiver; the tests in this file
// send and receive messages on the same peer socket.
impl sealed::SenderFlag for Peer {}
impl sealed::ReceiverFlag for Peer {}
impl sealed::SocketType for Peer {
/// Reports [`SocketType::Peer`] as the raw socket type behind this marker.
fn raw_socket_type() -> SocketType {
SocketType::Peer
}
}
// SAFETY: NOTE(review): these impls assert that a peer socket may be shared
// between and moved across threads. libzmq lists ZMQ_PEER among its thread-safe
// socket types; confirm that `Socket` itself holds no thread-unsafe state.
unsafe impl Sync for Socket<Peer> {}
unsafe impl Send for Socket<Peer> {}
/// Operations that are only available on peer sockets.
impl Socket<Peer> {
    /// Connects this socket to `endpoint` through the underlying socket's
    /// `connect_peer` call.
    ///
    /// On success the returned `u32` identifies the connected peer; the tests in
    /// this file use it as the routing id of messages addressed to that peer.
    ///
    /// # Errors
    ///
    /// Propagates whatever error the underlying `connect_peer` call reports.
    pub fn connect_peer<V>(&self, endpoint: V) -> ZmqResult<u32>
    where
        V: AsRef<str>,
    {
        let target: &str = endpoint.as_ref();
        self.socket.connect_peer(target)
    }

    /// Sets the [`SocketOption::HiccupMessage`] option to `value`.
    ///
    /// NOTE(review): presumably maps to libzmq's `ZMQ_HICCUP_MSG` — confirm.
    ///
    /// # Errors
    ///
    /// Propagates the error returned when setting the socket option fails.
    pub fn set_hiccup_message<V>(&self, value: V) -> ZmqResult<()>
    where
        V: AsRef<str>,
    {
        let option = SocketOption::HiccupMessage;
        self.set_sockopt_string(option, value)
    }

    /// Sets the [`SocketOption::HelloMessage`] option to `value`.
    ///
    /// NOTE(review): presumably maps to libzmq's `ZMQ_HELLO_MSG` — confirm.
    ///
    /// # Errors
    ///
    /// Propagates the error returned when setting the socket option fails.
    pub fn set_hello_message<V>(&self, value: V) -> ZmqResult<()>
    where
        V: AsRef<str>,
    {
        let option = SocketOption::HelloMessage;
        self.set_sockopt_string(option, value)
    }

    /// Sets the [`SocketOption::DisconnectMessage`] option to `value`.
    ///
    /// NOTE(review): presumably maps to libzmq's `ZMQ_DISCONNECT_MSG` — confirm.
    ///
    /// # Errors
    ///
    /// Propagates the error returned when setting the socket option fails.
    pub fn set_disconnect_message<V>(&self, value: V) -> ZmqResult<()>
    where
        V: AsRef<str>,
    {
        let option = SocketOption::DisconnectMessage;
        self.set_sockopt_string(option, value)
    }
}
#[cfg(test)]
mod peer_test {
use super::PeerSocket;
use crate::{
ZmqError,
prelude::{Context, Message, Receiver, RecvFlags, SendFlags, Sender, ZmqResult},
};
/// Setting a hello message on a freshly created peer socket succeeds.
#[test]
fn set_hello_message_set_hello_message() -> ZmqResult<()> {
    let ctx = Context::new()?;
    let peer = PeerSocket::from_context(&ctx)?;

    // The setter's result is the test outcome.
    peer.set_hello_message("hello")
}
/// Setting a hiccup message on a freshly created peer socket succeeds.
#[test]
fn set_hiccup_message_set_hiccup_message() -> ZmqResult<()> {
    let ctx = Context::new()?;
    let peer = PeerSocket::from_context(&ctx)?;

    // The setter's result is the test outcome.
    peer.set_hiccup_message("hiccup")
}
/// Setting a disconnect message on a freshly created peer socket succeeds.
#[test]
fn set_disconnect_message_set_disconnect_message() -> ZmqResult<()> {
    let ctx = Context::new()?;
    let peer = PeerSocket::from_context(&ctx)?;

    // The setter's result is the test outcome.
    peer.set_disconnect_message("disconnect")
}
/// Round-trips a "Hello"/"World" exchange between two peer sockets over inproc.
///
/// The bound peer runs on its own thread and replies to the routing id of the
/// message it received; the connecting peer addresses its request with the id
/// returned by `connect_peer` and expects the reply to carry the same id.
#[test]
fn peer_peer() -> ZmqResult<()> {
    let endpoint = "inproc://peer-peer-test";

    let context = Context::new()?;

    let peer_server = PeerSocket::from_context(&context)?;
    peer_server.bind(endpoint)?;

    // Keep the handle: the thread is joined at the end so that a panic on the
    // server side fails this test instead of being silently discarded, and so
    // the thread cannot outlive the test.
    let server = std::thread::spawn(move || {
        let msg = peer_server.recv_msg(RecvFlags::empty()).unwrap();
        assert_eq!(msg.to_string(), "Hello");

        let reply: Message = "World".into();
        // Address the reply to whoever sent the request.
        reply.set_routing_id(msg.routing_id().unwrap()).unwrap();
        peer_server.send_msg(reply, SendFlags::empty()).unwrap();
    });

    let peer_client = PeerSocket::from_context(&context)?;
    let routing_id = peer_client.connect_peer(endpoint)?;

    let msg: Message = "Hello".into();
    msg.set_routing_id(routing_id)?;
    peer_client.send_msg(msg, SendFlags::empty())?;

    let msg = peer_client.recv_msg(RecvFlags::empty())?;
    assert_eq!(msg.routing_id(), Some(routing_id));
    assert_eq!(msg.to_string(), "World");

    // The reply has arrived, so the server thread is done (or about to be).
    server.join().expect("peer server thread panicked");

    Ok(())
}
/// Async variant of the "Hello"/"World" round trip between two peer sockets.
///
/// Both sides poll `recv_msg_async` in a loop until it yields a message; the
/// bound peer runs on its own thread with its own executor.
#[cfg(feature = "futures")]
#[test]
fn peer_peer_async() -> ZmqResult<()> {
    let endpoint = "inproc://peer-peer-test";

    let context = Context::new()?;

    let peer_server = PeerSocket::from_context(&context)?;
    peer_server.bind(endpoint)?;

    // Keep the handle: the thread is joined at the end so that a panic on the
    // server side fails this test instead of being silently discarded, and so
    // the thread cannot outlive the test.
    let server = std::thread::spawn(move || {
        futures::executor::block_on(async {
            loop {
                // Retry until a message is actually available.
                if let Some(msg) = peer_server.recv_msg_async().await {
                    assert_eq!(msg.to_string(), "Hello");

                    let reply: Message = "World".into();
                    // Address the reply to whoever sent the request.
                    reply.set_routing_id(msg.routing_id().unwrap()).unwrap();
                    peer_server.send_msg_async(reply, SendFlags::empty()).await;
                    break;
                }
            }
        })
    });

    let peer_client = PeerSocket::from_context(&context)?;
    let routing_id = peer_client.connect_peer(endpoint)?;

    let msg: Message = "Hello".into();
    msg.set_routing_id(routing_id)?;

    futures::executor::block_on(async {
        peer_client.send_msg_async(msg, SendFlags::empty()).await;
        loop {
            // Retry until the reply is actually available.
            if let Some(msg) = peer_client.recv_msg_async().await {
                assert_eq!(msg.routing_id(), Some(routing_id));
                assert_eq!(msg.to_string(), "World");
                break;
            }
        }
    });

    // The reply has arrived, so the server thread is done (or about to be).
    server.join().expect("peer server thread panicked");

    Ok(())
}
/// Sending a message that carries no routing id from a connected peer socket
/// is rejected with `ZmqError::HostUnreachable`.
#[test]
fn send_msg_with_no_routing_id_fails() -> ZmqResult<()> {
    let endpoint = "inproc://peer-peer-test";
    let ctx = Context::new()?;

    // The bound side only has to exist; it never receives anything here.
    let server = PeerSocket::from_context(&ctx)?;
    server.bind(endpoint)?;

    let client = PeerSocket::from_context(&ctx)?;
    client.connect_peer(endpoint)?;

    let outcome = client.send_msg("asdf", SendFlags::empty());
    assert!(matches!(outcome, Err(err) if err == ZmqError::HostUnreachable));

    Ok(())
}
}
#[cfg(feature = "builder")]
pub(crate) mod builder {
use core::default::Default;
use derive_builder::Builder;
use serde::{Deserialize, Serialize};
use super::PeerSocket;
use crate::{ZmqResult, context::Context, socket::SocketBuilder};
// Configuration template for peer sockets. `derive_builder` generates the public
// `PeerBuilder` from it; because `build_fn` is skipped, no `build()` method is
// generated and the builder is consumed through the hand-written
// `apply` / `build_from_context` methods instead.
#[derive(Default, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, Builder)]
#[builder(
pattern = "owned",
name = "PeerBuilder",
public,
build_fn(skip, error = "ZmqError"),
derive(PartialEq, Eq, Hash, Clone, serde::Serialize, serde::Deserialize)
)]
#[builder_struct_attr(doc = "Builder for [`PeerSocket`].\n\n")]
// NOTE(review): presumably needed because `PeerConfig` itself is never
// constructed (only the generated `PeerBuilder` is used) — confirm.
#[allow(dead_code)]
struct PeerConfig {
/// Generic socket settings, applied to the socket before the peer-specific ones.
socket_builder: SocketBuilder,
/// Value handed to `set_hiccup_message` when the builder is applied.
#[builder(setter(into), default = "Default::default()")]
hiccup_msg: String,
/// Value handed to `set_hello_message` when the builder is applied.
#[builder(setter(into), default = "Default::default()")]
hello_message: String,
/// Value handed to `set_disconnect_message` when the builder is applied.
#[builder(setter(into), default = "Default::default()")]
disconnect_message: String,
}
impl PeerBuilder {
    /// Applies every option that was set on this builder to `socket`.
    ///
    /// The generic `socket_builder` settings (if any) are applied first, followed
    /// by the hiccup, hello and disconnect messages, in that order. Options that
    /// were never set are skipped.
    ///
    /// # Errors
    ///
    /// Returns the first error reported while applying an option; later options
    /// are not applied in that case.
    pub fn apply(self, socket: &PeerSocket) -> ZmqResult<()> {
        if let Some(common) = self.socket_builder {
            common.apply(socket)?;
        }

        if let Some(message) = &self.hiccup_msg {
            socket.set_hiccup_message(message)?;
        }

        if let Some(message) = &self.hello_message {
            socket.set_hello_message(message)?;
        }

        if let Some(message) = &self.disconnect_message {
            socket.set_disconnect_message(message)?;
        }

        Ok(())
    }

    /// Creates a new peer socket in `context` and applies this builder to it.
    ///
    /// # Errors
    ///
    /// Returns an error if the socket cannot be created or if applying any of
    /// the configured options fails.
    pub fn build_from_context(self, context: &Context) -> ZmqResult<PeerSocket> {
        let socket = PeerSocket::from_context(context)?;
        // Hand the socket back only once every option has been applied.
        self.apply(&socket).map(|()| socket)
    }
}
#[cfg(test)]
mod peer_builder_test {
use super::PeerBuilder;
use crate::prelude::{Context, SocketBuilder, ZmqResult};
/// A builder with nothing configured still produces a peer socket.
#[test]
fn default_peer_builder() -> ZmqResult<()> {
    let ctx = Context::new()?;

    // Only success matters; the socket itself is discarded right away.
    PeerBuilder::default().build_from_context(&ctx).map(drop)
}
/// A builder with every option configured produces a peer socket.
#[test]
fn peer_builder_with_custom_values() -> ZmqResult<()> {
    let ctx = Context::new()?;

    let builder = PeerBuilder::default()
        .socket_builder(SocketBuilder::default())
        .hello_message("hello123")
        .hiccup_msg("hiccup123")
        .disconnect_message("disconnect123");

    // Only success matters; the socket itself is discarded right away.
    builder.build_from_context(&ctx).map(drop)
}
}
}