use crate::codec::{CodecError, Message};
use crate::engine::backend::GenericSocketBackend;
use crate::socket::common::{HasCommon, SocketCommon};
use crate::socket::core::SocketCore;
use crate::{
Socket, SocketBackend, SocketOptions, SocketRecv, SocketSend, SocketType, ZmqMessage, ZmqResult,
};
use flume::Receiver;
pub struct PairSocket {
core: SocketCore,
inbound: Receiver<(
crate::engine::registry::PeerKey,
Result<Message, CodecError>,
)>,
}
impl HasCommon for PairSocket {
type Backend = GenericSocketBackend;
fn common(&self) -> &SocketCommon<Self::Backend> {
&self.core.common
}
fn common_mut(&mut self) -> &mut SocketCommon<Self::Backend> {
&mut self.core.common
}
}
impl Socket for PairSocket {
type Backend = GenericSocketBackend;
fn with_options(options: SocketOptions) -> Self {
let core = SocketCore::new(SocketType::PAIR, options);
let inbound = core.common.backend.inbound();
Self { core, inbound }
}
async fn connect<E>(&mut self, endpoint: E) -> ZmqResult<()>
where
E: TryInto<crate::endpoint::Endpoint> + Send,
E::Error: Into<crate::ZmqError>,
{
let endpoint = endpoint.try_into().map_err(Into::into)?;
self.core.connect_endpoint(endpoint).await
}
async fn linger_drain(&mut self) {
self.core.linger_drain().await;
}
}
impl SocketSend for PairSocket {
async fn send(&mut self, message: impl Into<ZmqMessage> + Send) -> ZmqResult<()> {
let message = message.into();
let send_timeout = self.core.common.backend.socket_options().send_timeout;
self.core
.common
.backend
.send_round_robin_timed(message, send_timeout)
.await?;
self.core.after_send().await;
Ok(())
}
}
impl SocketRecv for PairSocket {
async fn recv(&mut self) -> ZmqResult<ZmqMessage> {
let receive_timeout = self.core.common.backend.socket_options().receive_timeout;
let (_peer_id, message) = self
.core
.common
.backend
.recv_auto(&self.inbound, receive_timeout)
.await?;
Ok(message)
}
}