use crate::codec::{CodecError, Message};
use crate::engine::backend::GenericSocketBackend;
use crate::socket::common::{HasCommon, SocketCommon};
use crate::socket::core::SocketCore;
use crate::PeerIdentity;
use crate::{
Socket, SocketBackend, SocketOptions, SocketRecv, SocketSend, SocketType, ZmqError, ZmqMessage,
ZmqResult,
};
use flume::Receiver;
use std::convert::TryInto;
use std::sync::Arc;
pub struct RouterSocket {
core: SocketCore,
inbound: Receiver<(
crate::engine::registry::PeerKey,
Result<Message, CodecError>,
)>,
}
impl HasCommon for RouterSocket {
type Backend = GenericSocketBackend;
fn common(&self) -> &SocketCommon<Self::Backend> {
&self.core.common
}
fn common_mut(&mut self) -> &mut SocketCommon<Self::Backend> {
&mut self.core.common
}
}
impl Socket for RouterSocket {
type Backend = GenericSocketBackend;
fn with_options(options: SocketOptions) -> Self {
let core = SocketCore::new(SocketType::ROUTER, options);
let inbound = core.common.backend.inbound();
Self { core, inbound }
}
async fn connect<E>(&mut self, endpoint: E) -> ZmqResult<()>
where
E: TryInto<crate::endpoint::Endpoint> + Send,
E::Error: Into<crate::ZmqError>,
{
let endpoint = endpoint.try_into().map_err(Into::into)?;
self.core.connect_endpoint(endpoint).await
}
async fn linger_drain(&mut self) {
self.core.linger_drain().await;
}
}
impl SocketRecv for RouterSocket {
async fn recv(&mut self) -> ZmqResult<ZmqMessage> {
let receive_timeout = self.core.common.backend.socket_options().receive_timeout;
let (key, mut message) = self
.core
.common
.backend
.recv_auto(&self.inbound, receive_timeout)
.await?;
let peer_id = self
.core
.common
.backend
.registry()
.id_for(key)
.ok_or(ZmqError::Other(
"Peer disappeared before ROUTER envelope".into(),
))?;
message.push_front(peer_id);
Ok(message)
}
}
impl SocketSend for RouterSocket {
async fn send(&mut self, message: impl Into<ZmqMessage> + Send) -> ZmqResult<()> {
let mut message = message.into();
if message.len() <= 1 {
return Err(ZmqError::Socket(
"ROUTER send requires at least 2 frames: identity frame + message".into(),
));
}
let peer_id: PeerIdentity = message.pop_front().unwrap().try_into()?;
let send_timeout = self.core.common.backend.socket_options().send_timeout;
self.core
.common
.backend
.send_to_timed(&peer_id, message, send_timeout)
.await?;
self.core.after_send().await;
Ok(())
}
}
#[derive(Debug, Clone)]
pub struct RouterEnvelope {
pub identity: PeerIdentity,
pub message: ZmqMessage,
}
impl RouterSocket {
pub async fn recv_envelope(&mut self) -> ZmqResult<RouterEnvelope> {
let receive_timeout = self.core.common.backend.socket_options().receive_timeout;
let (key, message) = self
.core
.common
.backend
.recv_auto(&self.inbound, receive_timeout)
.await?;
let identity = self
.core
.common
.backend
.registry()
.id_for(key)
.ok_or(ZmqError::Other(
"Peer disappeared before ROUTER envelope".into(),
))?;
Ok(RouterEnvelope { identity, message })
}
pub async fn send_to(
&mut self,
identity: &PeerIdentity,
message: impl Into<ZmqMessage> + Send,
) -> ZmqResult<()> {
let send_timeout = self.core.common.backend.socket_options().send_timeout;
self.core
.common
.backend
.send_to_timed(identity, message.into(), send_timeout)
.await?;
self.core.after_send().await;
Ok(())
}
pub fn split(mut self) -> (RouterSendHalf, RouterRecvHalf) {
let dummy = SocketCore::new(SocketType::ROUTER, SocketOptions::default());
let inbound = std::mem::replace(&mut self.inbound, dummy.common.backend.inbound());
let core = std::mem::replace(&mut self.core, dummy);
let inner = Arc::new(RouterSocketInner { core });
(
RouterSendHalf {
inner: inner.clone(),
},
RouterRecvHalf { inner, inbound },
)
}
}
struct RouterSocketInner {
core: SocketCore,
}
#[derive(Clone)]
pub struct RouterSendHalf {
inner: Arc<RouterSocketInner>,
}
pub struct RouterRecvHalf {
inner: Arc<RouterSocketInner>,
inbound: Receiver<(
crate::engine::registry::PeerKey,
Result<Message, CodecError>,
)>,
}
impl SocketSend for RouterSendHalf {
async fn send(&mut self, message: impl Into<ZmqMessage> + Send) -> ZmqResult<()> {
let mut message = message.into();
if message.len() <= 1 {
return Err(ZmqError::Socket(
"ROUTER send requires at least 2 frames: identity frame + message".into(),
));
}
let peer_id: PeerIdentity = message.pop_front().unwrap().try_into()?;
let send_timeout = self.inner.core.common.backend.socket_options().send_timeout;
self.inner
.core
.common
.backend
.send_to_timed(&peer_id, message, send_timeout)
.await?;
self.inner.core.after_send().await;
Ok(())
}
}
impl SocketRecv for RouterRecvHalf {
async fn recv(&mut self) -> ZmqResult<ZmqMessage> {
let receive_timeout = self
.inner
.core
.common
.backend
.socket_options()
.receive_timeout;
let (key, mut message) = self
.inner
.core
.common
.backend
.recv_auto(&self.inbound, receive_timeout)
.await?;
let peer_id = self
.inner
.core
.common
.backend
.registry()
.id_for(key)
.ok_or(ZmqError::Other(
"Peer disappeared before ROUTER envelope".into(),
))?;
message.push_front(peer_id);
Ok(message)
}
}
impl RouterRecvHalf {
pub async fn recv_envelope(&mut self) -> ZmqResult<RouterEnvelope> {
let receive_timeout = self
.inner
.core
.common
.backend
.socket_options()
.receive_timeout;
let (key, message) = self
.inner
.core
.common
.backend
.recv_auto(&self.inbound, receive_timeout)
.await?;
let identity = self
.inner
.core
.common
.backend
.registry()
.id_for(key)
.ok_or(ZmqError::Other(
"Peer disappeared before ROUTER envelope".into(),
))?;
Ok(RouterEnvelope { identity, message })
}
}
impl RouterSendHalf {
pub async fn send_to(
&mut self,
identity: &PeerIdentity,
message: impl Into<ZmqMessage> + Send,
) -> ZmqResult<()> {
let send_timeout = self.inner.core.common.backend.socket_options().send_timeout;
self.inner
.core
.common
.backend
.send_to_timed(identity, message.into(), send_timeout)
.await?;
self.inner.core.after_send().await;
Ok(())
}
}