mod backend;
pub(crate) mod common;
pub(crate) mod core;
mod event;
pub mod family;
pub(crate) mod handshake;
pub(crate) mod kind;
mod options;
mod socket_type;
pub(crate) mod topic_router;
pub use backend::{CaptureSocket, MultiPeerBackend, SocketBackend, SocketRecv, SocketSend};
pub use event::SocketEvent;
pub use options::{ReconnectStop, SocketBuilder, SocketOptions};
pub use socket_type::SocketType;
use crate::endpoint::Endpoint;
use crate::transport::{self, AcceptStopHandle, TransportIo};
use crate::{ZmqError, ZmqResult};
use futures::channel::mpsc;
use std::collections::HashMap;
use std::sync::Arc;
/// Behavior shared by every socket type: construction, binding, connecting,
/// event monitoring and shutdown.
///
/// Implementors supply `with_options` together with the `HasCommon`
/// accessors; every other method has a default implementation that works
/// through the shared backend.
pub trait Socket: Sized + Send + common::HasCommon<Backend = <Self as Socket>::Backend> {
    /// Backend implementation, shared through an `Arc` between the socket and
    /// the accept/connect machinery it starts.
    type Backend: MultiPeerBackend + 'static;

    /// Creates a socket configured with `SocketOptions::default()`.
    fn new() -> Self {
        let defaults = SocketOptions::default();
        Self::with_options(defaults)
    }

    /// Returns a builder for this socket type.
    fn builder() -> options::SocketBuilder<Self> {
        SocketBuilder::<Self>::new()
    }

    /// Creates a socket configured with the given `options`.
    fn with_options(options: SocketOptions) -> Self;

    /// Returns a new handle to the backend stored in the socket's common state.
    #[doc(hidden)]
    fn backend(&self) -> Arc<<Self as Socket>::Backend> {
        Arc::clone(&self.common().backend)
    }

    /// Starts accepting connections on `endpoint`.
    ///
    /// On success the endpoint returned by `transport::begin_accept` is
    /// recorded in the bind table, stored as the last endpoint, and returned.
    /// NOTE(review): presumably this is the resolved form of the requested
    /// endpoint — confirm against the transport layer.
    ///
    /// Monitor events (all best-effort; send failures are ignored):
    /// * `SocketEvent::Listening` once accepting has started,
    /// * `SocketEvent::Accepted` for every accepted peer that was registered,
    /// * `SocketEvent::AcceptFailed` for every accept or registration error.
    ///
    /// # Errors
    ///
    /// Fails when `endpoint` cannot be converted into an [`Endpoint`] or when
    /// `transport::begin_accept` reports an error.
    fn bind<E>(
        &mut self,
        endpoint: E,
    ) -> impl std::future::Future<Output = ZmqResult<Endpoint>> + Send
    where
        E: TryInto<Endpoint> + Send,
        E::Error: Into<ZmqError>,
    {
        async move {
            let requested = endpoint.try_into().map_err(Into::into)?;
            #[cfg(feature = "inproc")]
            if matches!(requested, Endpoint::Inproc(_)) {
                warn_inproc_ignored_options(self, &requested);
            }

            // Invoked by the transport for every accept result; each call
            // works on its own backend handle.
            let accept_backend = self.backend();
            let on_accept = move |accepted: ZmqResult<TransportIo>| {
                let backend = Arc::clone(&accept_backend);
                async move {
                    // Register the new peer with the backend, keeping the
                    // endpoint and peer identity for the monitor event.
                    let outcome = match accepted {
                        #[cfg(any(feature = "tcp", all(feature = "ipc", target_family = "unix")))]
                        Ok(TransportIo::Framed(io, ep)) => {
                            let handshaken = handshake::peer_connected(
                                *io,
                                backend.clone(),
                                None,
                                Some(ep.to_string()),
                            )
                            .await;
                            handshaken.map(|peer_id| (ep, peer_id))
                        }
                        #[cfg(feature = "inproc")]
                        Ok(TransportIo::Inproc(peer)) => {
                            let ep = peer.endpoint.clone();
                            let peer_id = PeerIdentity::new();
                            let registered = backend
                                .clone()
                                .peer_connected_inproc(&peer_id, peer, Some(ep.clone()))
                                .await;
                            registered.map(|()| (ep, peer_id))
                        }
                        Err(e) => Err(e),
                    };

                    // Report the result to the monitor, if one is installed.
                    let event = match outcome {
                        Ok((ep, peer_id)) => SocketEvent::Accepted(ep, peer_id),
                        Err(e) => SocketEvent::AcceptFailed(e),
                    };
                    if let Some(monitor) = backend.monitor().lock().as_mut() {
                        let _ = monitor.try_send(event);
                    }
                }
            };

            let listener_backend = self.backend();
            let backlog = listener_backend.socket_options().backlog;
            let ipv6 = listener_backend.socket_options().ipv6;
            #[cfg(feature = "tcp")]
            let tcp_cfg = transport::TcpConfig::from_options(listener_backend.socket_options());
            #[cfg(not(feature = "tcp"))]
            let tcp_cfg: () = ();

            let (bound, stop_handle) =
                transport::begin_accept(requested, backlog, ipv6, tcp_cfg, on_accept).await?;

            if let Some(monitor) = self.backend().monitor().lock().as_mut() {
                let _ = monitor.try_send(SocketEvent::Listening(bound.clone()));
            }
            self.binds().insert(bound.clone(), stop_handle);
            self.set_last_endpoint(bound.clone());
            Ok(bound)
        }
    }

    /// Mutable access to the table of active binds and their stop handles.
    #[doc(hidden)]
    fn binds(&mut self) -> &mut HashMap<Endpoint, AcceptStopHandle> {
        let common = self.common_mut();
        &mut common.binds
    }

    /// Records `endpoint` as the value reported by `last_endpoint`.
    #[doc(hidden)]
    fn set_last_endpoint(&mut self, endpoint: Endpoint) {
        let common = self.common_mut();
        common.last_endpoint = Some(endpoint);
    }

    /// Returns the endpoint most recently stored by `set_last_endpoint`
    /// (called by `bind` on success), or `None` if none has been stored.
    fn last_endpoint(&self) -> Option<&Endpoint> {
        let common = self.common();
        common.last_endpoint.as_ref()
    }

    /// Removes `endpoint` from the bind table and shuts down its accept task.
    ///
    /// # Errors
    ///
    /// Returns `ZmqError::NoSuchBind` when `endpoint` is not in the bind
    /// table; otherwise returns whatever the task shutdown reports.
    fn unbind(
        &mut self,
        endpoint: Endpoint,
    ) -> impl std::future::Future<Output = ZmqResult<()>> + Send {
        async move {
            let Some(stop_handle) = self.binds().remove(&endpoint) else {
                return Err(ZmqError::NoSuchBind(endpoint));
            };
            stop_handle.task.shutdown().await
        }
    }

    /// Unbinds every endpoint currently in the bind table.
    ///
    /// Returns the errors produced by the individual `unbind` calls; the
    /// vector is empty when all of them succeeded.
    fn unbind_all(&mut self) -> impl std::future::Future<Output = Vec<ZmqError>> + Send {
        async move {
            // Snapshot the keys first: `unbind` mutates the table.
            let bound: Vec<_> = self.binds().keys().cloned().collect();
            let mut failures = Vec::new();
            for endpoint in bound {
                failures.extend(self.unbind(endpoint).await.err());
            }
            failures
        }
    }

    /// Connects to `endpoint` via `handshake::connect_peer_forever`.
    ///
    /// On success a `SocketEvent::Connected` event is offered to the monitor
    /// (best-effort; a failed send is ignored).
    ///
    /// # Errors
    ///
    /// Fails when `endpoint` cannot be converted into an [`Endpoint`] or when
    /// `handshake::connect_peer_forever` returns an error.
    fn connect<E>(&mut self, endpoint: E) -> impl std::future::Future<Output = ZmqResult<()>> + Send
    where
        E: TryInto<Endpoint> + Send,
        E::Error: Into<ZmqError>,
    {
        async move {
            let target = endpoint.try_into().map_err(Into::into)?;
            #[cfg(feature = "inproc")]
            if matches!(target, Endpoint::Inproc(_)) {
                warn_inproc_ignored_options(self, &target);
            }

            let (resolved, peer_id) =
                handshake::connect_peer_forever(target, self.backend(), None).await?;

            if let Some(monitor) = self.backend().monitor().lock().as_mut() {
                let _ = monitor.try_send(SocketEvent::Connected(resolved, peer_id));
            }
            Ok(())
        }
    }

    /// Installs a new monitor channel (created with `mpsc::channel(1024)`) on
    /// the backend and returns its receiving half.
    ///
    /// Any previously installed sender is replaced and dropped.
    fn monitor(&mut self) -> mpsc::Receiver<SocketEvent> {
        let (tx, rx) = mpsc::channel(1024);
        *self.backend().monitor().lock() = Some(tx);
        rx
    }

    /// Hook awaited by `close` after all endpoints have been unbound.
    ///
    /// The default implementation completes immediately.
    #[doc(hidden)]
    fn linger_drain(&mut self) -> impl std::future::Future<Output = ()> + Send {
        std::future::ready(())
    }

    /// Consumes the socket: unbinds every endpoint, then awaits `linger_drain`.
    ///
    /// # Errors
    ///
    /// Returns the errors collected by `unbind_all` when there are any.
    fn close(mut self) -> impl std::future::Future<Output = Result<(), Vec<ZmqError>>> + Send {
        async move {
            let failures = self.unbind_all().await;
            self.linger_drain().await;
            if !failures.is_empty() {
                return Err(failures);
            }
            Ok(())
        }
    }
}
#[cfg(feature = "inproc")]
use crate::PeerIdentity;
/// Reports socket options that are set but have no effect on an inproc endpoint.
///
/// The option names come from `SocketOptions::inproc_ignored_options`. For
/// each name that `ignored_warned.insert` accepts (presumably names not yet
/// reported on this socket — confirm against the field's type), a warning is
/// logged and a `SocketEvent::OptionIgnoredOnTransport` event is offered to
/// the monitor, if one is installed. Monitor send failures are ignored.
#[cfg(feature = "inproc")]
fn warn_inproc_ignored_options<S: Socket>(sock: &mut S, endpoint: &Endpoint) {
    let ignored: Vec<&'static str> = sock.backend().socket_options().inproc_ignored_options();
    if ignored.is_empty() {
        return;
    }

    let shared = sock.common_mut();
    for name in ignored {
        // Skip names the socket has already recorded.
        if !shared.ignored_warned.insert(name) {
            continue;
        }

        log::warn!(
            "option '{}' is set but has no effect on inproc transport ({}); \
             configure per-endpoint behavior with separate sockets if needed",
            name,
            endpoint
        );

        if let Some(monitor) = shared.backend.monitor().lock().as_mut() {
            let event = SocketEvent::OptionIgnoredOnTransport {
                option: name,
                endpoint: endpoint.clone(),
            };
            let _ = monitor.try_send(event);
        }
    }
}