use crate::engine::backend::GenericSocketBackend;
use crate::socket::common::{HasCommon, SocketCommon};
use crate::socket::core::SocketCore;
use crate::{
Socket, SocketBackend, SocketOptions, SocketSend, SocketType, ZmqError, ZmqMessage, ZmqResult,
};
/// Socket handle created with [`SocketType::SCATTER`].
///
/// All state lives in the wrapped [`SocketCore`]; this type only adds the
/// SCATTER-specific trait implementations found in this file.
pub struct ScatterSocket {
    /// Shared socket machinery (common state, backend access, send hooks).
    core: SocketCore,
}
/// Exposes the `SocketCommon` state stored inside the socket's `SocketCore`.
impl HasCommon for ScatterSocket {
    type Backend = GenericSocketBackend;

    /// Borrows the common socket state held by `core`.
    fn common(&self) -> &SocketCommon<Self::Backend> {
        let Self { core } = self;
        &core.common
    }

    /// Mutably borrows the common socket state held by `core`.
    fn common_mut(&mut self) -> &mut SocketCommon<Self::Backend> {
        let Self { core } = self;
        &mut core.common
    }
}
/// Socket lifecycle operations, each delegated to the inner `SocketCore`.
impl Socket for ScatterSocket {
    type Backend = GenericSocketBackend;

    /// Builds a socket whose core is created as `SocketType::SCATTER` with
    /// the supplied `options`.
    fn with_options(options: SocketOptions) -> Self {
        let core = SocketCore::new(SocketType::SCATTER, options);
        Self { core }
    }

    /// Converts `endpoint` into an `Endpoint` and connects the core to it.
    ///
    /// # Errors
    ///
    /// Returns the conversion error (mapped into `ZmqError`) when `endpoint`
    /// cannot be turned into an `Endpoint`; otherwise returns whatever
    /// `SocketCore::connect_endpoint` yields.
    async fn connect<E>(&mut self, endpoint: E) -> ZmqResult<()>
    where
        E: TryInto<crate::endpoint::Endpoint> + Send,
        E::Error: Into<crate::ZmqError>,
    {
        let target: crate::endpoint::Endpoint = match endpoint.try_into() {
            Ok(parsed) => parsed,
            Err(failure) => return Err(failure.into()),
        };
        self.core.connect_endpoint(target).await
    }

    /// Awaits the core's `linger_drain` routine.
    async fn linger_drain(&mut self) {
        self.core.linger_drain().await;
    }
}
/// Outbound path for SCATTER sockets.
impl SocketSend for ScatterSocket {
    /// Sends one message through the backend's `send_round_robin_timed`,
    /// using the `send_timeout` taken from the backend's socket options.
    ///
    /// After a successful send the core's `after_send` hook is awaited.
    ///
    /// # Errors
    ///
    /// * `ZmqError::Socket` when the converted message reports a `len()`
    ///   greater than one (rejected as multipart, per the error text).
    /// * Any error propagated from `send_round_robin_timed`; in that case
    ///   `after_send` is not run.
    async fn send(&mut self, message: impl Into<ZmqMessage> + Send) -> ZmqResult<()> {
        let outgoing: ZmqMessage = message.into();

        // Reject before touching the backend so nothing is partially sent.
        let is_multipart = outgoing.len() > 1;
        if is_multipart {
            let reason = "SCATTER does not support multipart messages".into();
            return Err(ZmqError::Socket(reason));
        }

        let timeout = self.core.common.backend.socket_options().send_timeout;
        let outcome = self
            .core
            .common
            .backend
            .send_round_robin_timed(outgoing, timeout)
            .await;
        outcome?;

        self.core.after_send().await;
        Ok(())
    }
}