use crate::codec::{CodecError, Message};
use crate::engine::backend::GenericSocketBackend;
use crate::socket::common::{HasCommon, SocketCommon};
use crate::socket::core::SocketCore;
use crate::{Socket, SocketBackend, SocketOptions, SocketRecv, SocketType, ZmqMessage, ZmqResult};
use flume::Receiver;
/// Receive-only socket created with `SocketType::GATHER`.
///
/// Pairs the shared socket state with the channel on which the backend
/// delivers decoded traffic.
pub struct GatherSocket {
    /// Shared socket state; its `common` field is what `HasCommon` exposes.
    core: SocketCore,
    /// Channel of `(peer key, decode result)` pairs, obtained from the
    /// backend's `inbound()` when the socket is constructed.
    inbound: Receiver<(crate::engine::registry::PeerKey, Result<Message, CodecError>)>,
}
/// Exposes the `SocketCommon` stored inside this socket's `SocketCore`.
impl HasCommon for GatherSocket {
    type Backend = GenericSocketBackend;

    /// Shared view of the common socket state held by `core`.
    fn common(&self) -> &SocketCommon<Self::Backend> {
        let core = &self.core;
        &core.common
    }

    /// Exclusive view of the common socket state held by `core`.
    fn common_mut(&mut self) -> &mut SocketCommon<Self::Backend> {
        let core = &mut self.core;
        &mut core.common
    }
}
/// Construction, connection and shutdown for `GatherSocket`; the real work
/// is delegated to the inner `SocketCore`.
impl Socket for GatherSocket {
    type Backend = GenericSocketBackend;

    /// Builds a `SocketType::GATHER` core from `options` and keeps the
    /// backend's inbound receiver alongside it.
    fn with_options(options: SocketOptions) -> Self {
        let core = SocketCore::new(SocketType::GATHER, options);
        // Field initializers run in written order, so the receiver is
        // fetched from `core` before `core` itself is moved into the struct.
        Self {
            inbound: core.common.backend.inbound(),
            core,
        }
    }

    /// Converts `endpoint` into an `Endpoint` and asks the core to connect
    /// to it.
    ///
    /// # Errors
    ///
    /// Returns the conversion error (mapped through `Into`) when `endpoint`
    /// cannot be converted, or whatever `connect_endpoint` reports.
    async fn connect<E>(&mut self, endpoint: E) -> ZmqResult<()>
    where
        E: TryInto<crate::endpoint::Endpoint> + Send,
        E::Error: Into<crate::ZmqError>,
    {
        let target: crate::endpoint::Endpoint = endpoint.try_into().map_err(Into::into)?;
        self.core.connect_endpoint(target).await
    }

    /// Forwards to the core's `linger_drain` and waits for it to finish.
    async fn linger_drain(&mut self) {
        let drain = self.core.linger_drain();
        drain.await;
    }
}
/// Receiving side of `GatherSocket`.
impl SocketRecv for GatherSocket {
    /// Returns the next inbound message whose `len()` is exactly 1.
    ///
    /// Messages of any other length are dropped and the wait continues.
    /// NOTE(review): `len()` presumably counts frames, so this rejects
    /// multipart messages — confirm against `ZmqMessage`.
    ///
    /// The socket's `receive_timeout` option is read once and passed to
    /// every `recv_auto` call. NOTE(review): if `recv_auto` applies it per
    /// call, a stream of dropped messages can extend the total wait — confirm.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the backend's `recv_auto`.
    async fn recv(&mut self) -> ZmqResult<ZmqMessage> {
        let wait_limit = self.core.common.backend.socket_options().receive_timeout;
        loop {
            let pending = self
                .core
                .common
                .backend
                .recv_auto(&self.inbound, wait_limit);
            let (_origin, candidate) = pending.await?;
            if candidate.len() != 1 {
                // Wrong length: discard and wait for the next message.
                continue;
            }
            break Ok(candidate);
        }
    }
}