use crate::codec::{CodecError, Message};
use crate::engine::backend::GenericSocketBackend;
use crate::socket::common::{HasCommon, SocketCommon};
use crate::socket::core::SocketCore;
use crate::{
CaptureSocket, Socket, SocketBackend, SocketOptions, SocketRecv, SocketSend, SocketType,
ZmqMessage, ZmqResult,
};
use flume::Receiver;
use std::sync::Arc;
/// DEALER-type socket: a [`SocketCore`] paired with the receiving end of the
/// backend's inbound channel.
pub struct DealerSocket {
/// Socket state; `with_options` builds it with `SocketType::DEALER`.
core: SocketCore,
/// Receiver obtained from `core.common.backend.inbound()`. Each item pairs
/// a peer key with a `Result<Message, CodecError>`.
inbound: Receiver<(
crate::engine::registry::PeerKey,
Result<Message, CodecError>,
)>,
}
/// Exposes the [`SocketCommon`] state held inside the socket's core.
impl HasCommon for DealerSocket {
    type Backend = GenericSocketBackend;

    /// Borrows the common state stored in `self.core`.
    fn common(&self) -> &SocketCommon<Self::Backend> {
        let Self { core, .. } = self;
        &core.common
    }

    /// Mutably borrows the common state stored in `self.core`.
    fn common_mut(&mut self) -> &mut SocketCommon<Self::Backend> {
        let Self { core, .. } = self;
        &mut core.common
    }
}
impl Socket for DealerSocket {
    type Backend = GenericSocketBackend;

    /// Builds a DEALER socket from `options`.
    ///
    /// The inbound receiver is taken from the freshly created core's backend
    /// before the core itself is moved into the socket.
    fn with_options(options: SocketOptions) -> Self {
        let core = SocketCore::new(SocketType::DEALER, options);
        Self {
            // Evaluated first: borrows `core` only for the duration of the call.
            inbound: core.common.backend.inbound(),
            core,
        }
    }

    /// Converts `endpoint` into an [`crate::endpoint::Endpoint`] and asks the
    /// core to connect to it.
    ///
    /// # Errors
    ///
    /// Returns the conversion error (mapped into `ZmqError`) when `endpoint`
    /// cannot be converted, or whatever `connect_endpoint` reports.
    async fn connect<E>(&mut self, endpoint: E) -> ZmqResult<()>
    where
        E: TryInto<crate::endpoint::Endpoint> + Send,
        E::Error: Into<crate::ZmqError>,
    {
        let target: crate::endpoint::Endpoint = match endpoint.try_into() {
            Ok(parsed) => parsed,
            Err(failure) => return Err(failure.into()),
        };
        self.core.connect_endpoint(target).await
    }

    /// Delegates to the core's `linger_drain`.
    async fn linger_drain(&mut self) {
        self.core.linger_drain().await;
    }
}
impl SocketRecv for DealerSocket {
    /// Receives the next message from the inbound channel.
    ///
    /// The backend's `recv_auto` is called with the `receive_timeout` read
    /// from the socket options; the peer key that accompanies the message is
    /// not returned to the caller.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `recv_auto`.
    async fn recv(&mut self) -> ZmqResult<ZmqMessage> {
        let backend = &self.core.common.backend;
        let timeout = backend.socket_options().receive_timeout;
        let (_sender, payload) = backend.recv_auto(&self.inbound, timeout).await?;
        Ok(payload)
    }
}
impl SocketSend for DealerSocket {
    /// Converts `message` into a [`ZmqMessage`] and hands it to the backend's
    /// `send_round_robin_timed`, passing the `send_timeout` from the socket
    /// options. After a successful send the core's `after_send` hook runs.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `send_round_robin_timed`; in that
    /// case `after_send` is not called.
    async fn send(&mut self, message: impl Into<ZmqMessage> + Send) -> ZmqResult<()> {
        let outgoing: ZmqMessage = message.into();
        let deadline = self.core.common.backend.socket_options().send_timeout;
        let delivery = self
            .core
            .common
            .backend
            .send_round_robin_timed(outgoing, deadline);
        delivery.await?;
        self.core.after_send().await;
        Ok(())
    }
}
// Marks `DealerSocket` as a `CaptureSocket`. The impl body is empty, so every
// trait item used here comes from the trait's own default definitions.
impl CaptureSocket for DealerSocket {}
impl DealerSocket {
    /// Consumes the socket and returns independent send and receive halves.
    ///
    /// Both halves share the original `SocketCore` through an `Arc`; the
    /// receive half additionally takes ownership of the inbound receiver.
    pub fn split(mut self) -> (DealerSendHalf, DealerRecvHalf) {
        // The real fields are swapped out for a default-configured placeholder
        // core and its receiver instead of being moved out of `self` directly.
        // NOTE(review): presumably this is needed because the fields cannot be
        // moved out of `self` by plain destructuring — confirm.
        let placeholder = SocketCore::new(SocketType::DEALER, SocketOptions::default());
        let placeholder_inbound = placeholder.common.backend.inbound();

        let inbound = std::mem::replace(&mut self.inbound, placeholder_inbound);
        let core = std::mem::replace(&mut self.core, placeholder);

        let shared = Arc::new(DealerSocketInner { core });
        let send_half = DealerSendHalf {
            inner: Arc::clone(&shared),
        };
        let recv_half = DealerRecvHalf {
            inner: shared,
            inbound,
        };
        (send_half, recv_half)
    }
}
/// State shared (via `Arc`) by the two halves produced by
/// `DealerSocket::split`.
struct DealerSocketInner {
/// The `SocketCore` taken from the socket that was split.
core: SocketCore,
}
/// Sending half of a split `DealerSocket`.
///
/// Deriving `Clone` clones the `Arc`, so every clone refers to the same
/// `DealerSocketInner`.
#[derive(Clone)]
pub struct DealerSendHalf {
/// Shared handle to the socket core.
inner: Arc<DealerSocketInner>,
}
/// Receiving half of a split `DealerSocket`.
pub struct DealerRecvHalf {
/// Shared handle to the socket core (same `Arc` the send half holds).
inner: Arc<DealerSocketInner>,
/// Inbound receiver moved out of the original socket by `split`. Each item
/// pairs a peer key with a `Result<Message, CodecError>`.
inbound: Receiver<(
crate::engine::registry::PeerKey,
Result<Message, CodecError>,
)>,
}
impl SocketSend for DealerSendHalf {
    /// Converts `message` into a [`ZmqMessage`] and hands it to the backend's
    /// `send_round_robin_timed`, passing the `send_timeout` from the socket
    /// options, then runs the core's `after_send` hook.
    ///
    /// This mirrors `DealerSocket::send`, so a configured `send_timeout` is
    /// honoured after the socket has been split, just as the receive half
    /// honours `receive_timeout`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `send_round_robin_timed`; in that
    /// case `after_send` is not called.
    async fn send(&mut self, message: impl Into<ZmqMessage> + Send) -> ZmqResult<()> {
        let message = message.into();
        let core = &self.inner.core;
        // Read the timeout per call, the same way the unsplit socket does.
        let send_timeout = core.common.backend.socket_options().send_timeout;
        core.common
            .backend
            .send_round_robin_timed(message, send_timeout)
            .await?;
        core.after_send().await;
        Ok(())
    }
}
// Marks `DealerSendHalf` as a `CaptureSocket`. The impl body is empty, so
// every trait item used here comes from the trait's own default definitions.
impl CaptureSocket for DealerSendHalf {}
impl SocketRecv for DealerRecvHalf {
    /// Receives the next message from this half's inbound receiver.
    ///
    /// The shared backend's `recv_auto` is called with the `receive_timeout`
    /// read from the socket options; the accompanying peer key is not
    /// returned to the caller.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `recv_auto`.
    async fn recv(&mut self) -> ZmqResult<ZmqMessage> {
        let backend = &self.inner.core.common.backend;
        let timeout = backend.socket_options().receive_timeout;
        let (_sender, payload) = backend.recv_auto(&self.inbound, timeout).await?;
        Ok(payload)
    }
}