use crate::{addr::Endpoint, auth::*, core::*, error::*, Ctx, CtxHandle};
use serde::{Deserialize, Serialize};
use std::{str, sync::Arc};
#[derive(Debug, Clone)]
pub struct Gather {
inner: Arc<RawSocket>,
}
impl Gather {
pub fn new() -> Result<Self, Error> {
let inner = Arc::new(RawSocket::new(RawSocketType::Gather)?);
Ok(Self { inner })
}
pub fn with_ctx(handle: CtxHandle) -> Result<Self, Error> {
let inner =
Arc::new(RawSocket::with_ctx(RawSocketType::Gather, handle)?);
Ok(Self { inner })
}
pub fn ctx(&self) -> CtxHandle {
self.inner.ctx()
}
}
impl PartialEq for Gather {
fn eq(&self, other: &Gather) -> bool {
self.inner == other.inner
}
}
impl Eq for Gather {}
impl GetRawSocket for Gather {
fn raw_socket(&self) -> &RawSocket {
&self.inner
}
}
impl Heartbeating for Gather {}
impl Socket for Gather {}
impl RecvMsg for Gather {}
unsafe impl Send for Gather {}
unsafe impl Sync for Gather {}
#[derive(Debug, Default, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(from = "FlatGatherConfig")]
#[serde(into = "FlatGatherConfig")]
pub struct GatherConfig {
socket_config: SocketConfig,
recv_config: RecvConfig,
heartbeat_config: HeartbeatingConfig,
}
impl GatherConfig {
pub fn new() -> Self {
Self::default()
}
pub fn build(&self) -> Result<Gather, Error> {
self.with_ctx(Ctx::global())
}
pub fn with_ctx(&self, handle: CtxHandle) -> Result<Gather, Error> {
let gather = Gather::with_ctx(handle)?;
self.apply(&gather)?;
Ok(gather)
}
pub fn apply(&self, gather: &Gather) -> Result<(), Error> {
self.recv_config.apply(gather)?;
self.socket_config.apply(gather)?;
Ok(())
}
}
#[derive(Clone, Serialize, Deserialize)]
struct FlatGatherConfig {
connect: Option<Vec<Endpoint>>,
bind: Option<Vec<Endpoint>>,
heartbeat: Option<Heartbeat>,
recv_hwm: HighWaterMark,
recv_timeout: Period,
mechanism: Option<Mechanism>,
}
impl From<GatherConfig> for FlatGatherConfig {
fn from(config: GatherConfig) -> Self {
let socket_config = config.socket_config;
let recv_config = config.recv_config;
let heartbeat_config = config.heartbeat_config;
Self {
connect: socket_config.connect,
bind: socket_config.bind,
heartbeat: heartbeat_config.heartbeat,
mechanism: socket_config.mechanism,
recv_hwm: recv_config.recv_hwm,
recv_timeout: recv_config.recv_timeout,
}
}
}
impl From<FlatGatherConfig> for GatherConfig {
fn from(flat: FlatGatherConfig) -> Self {
let socket_config = SocketConfig {
connect: flat.connect,
bind: flat.bind,
mechanism: flat.mechanism,
};
let recv_config = RecvConfig {
recv_hwm: flat.recv_hwm,
recv_timeout: flat.recv_timeout,
};
let heartbeat_config = HeartbeatingConfig {
heartbeat: flat.heartbeat,
};
Self {
socket_config,
recv_config,
heartbeat_config,
}
}
}
impl GetSocketConfig for GatherConfig {
fn socket_config(&self) -> &SocketConfig {
&self.socket_config
}
fn socket_config_mut(&mut self) -> &mut SocketConfig {
&mut self.socket_config
}
}
impl ConfigureSocket for GatherConfig {}
impl GetRecvConfig for GatherConfig {
fn recv_config(&self) -> &RecvConfig {
&self.recv_config
}
fn recv_config_mut(&mut self) -> &mut RecvConfig {
&mut self.recv_config
}
}
impl ConfigureRecv for GatherConfig {}
impl GetHeartbeatingConfig for GatherConfig {
fn heartbeat_config(&self) -> &HeartbeatingConfig {
&self.heartbeat_config
}
fn heartbeat_config_mut(&mut self) -> &mut HeartbeatingConfig {
&mut self.heartbeat_config
}
}
impl ConfigureHeartbeating for GatherConfig {}
#[derive(Debug, Default, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct GatherBuilder {
inner: GatherConfig,
}
impl GatherBuilder {
pub fn new() -> Self {
Self::default()
}
pub fn build(&self) -> Result<Gather, Error> {
self.inner.build()
}
pub fn with_ctx(&self, handle: CtxHandle) -> Result<Gather, Error> {
self.inner.with_ctx(handle)
}
}
impl GetSocketConfig for GatherBuilder {
fn socket_config(&self) -> &SocketConfig {
self.inner.socket_config()
}
fn socket_config_mut(&mut self) -> &mut SocketConfig {
self.inner.socket_config_mut()
}
}
impl BuildSocket for GatherBuilder {}
impl GetRecvConfig for GatherBuilder {
fn recv_config(&self) -> &RecvConfig {
self.inner.recv_config()
}
fn recv_config_mut(&mut self) -> &mut RecvConfig {
self.inner.recv_config_mut()
}
}
impl BuildRecv for GatherBuilder {}
impl GetHeartbeatingConfig for GatherBuilder {
fn heartbeat_config(&self) -> &HeartbeatingConfig {
self.inner.heartbeat_config()
}
fn heartbeat_config_mut(&mut self) -> &mut HeartbeatingConfig {
self.inner.heartbeat_config_mut()
}
}
impl BuildHeartbeating for GatherBuilder {}
#[cfg(test)]
mod test {
use super::*;
use crate::*;
use std::time::Duration;
#[test]
fn test_ser_de() {
let config = GatherConfig::new();
let ron = serde_yaml::to_string(&config).unwrap();
let de: GatherConfig = serde_yaml::from_str(&ron).unwrap();
assert_eq!(config, de);
}
#[test]
fn test_issue_125() {
let gather = Gather::new().unwrap();
gather
.set_recv_timeout(Some(Duration::from_secs(3)))
.unwrap();
}
#[test]
fn test_gather() {
let addr_a = InprocAddr::new_unique();
let addr_b = InprocAddr::new_unique();
let lb_a = ScatterBuilder::new().bind(&addr_a).build().unwrap();
let lb_b = ScatterBuilder::new().bind(&addr_b).build().unwrap();
let worker = GatherBuilder::new()
.connect(&[addr_a, addr_b])
.recv_hwm(1)
.build()
.unwrap();
for _ in 0..100 {
lb_a.try_send("a").unwrap();
}
for _ in 0..100 {
lb_b.try_send("b").unwrap();
}
let mut msg = Msg::new();
for i in 0..200 {
worker.try_recv(&mut msg).unwrap();
if i % 2 == 0 {
assert_eq!(msg.to_str(), Ok("a"));
} else {
assert_eq!(msg.to_str(), Ok("b"));
}
}
}
}