1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
//! Shared infrastructure for socket types backed by `GenericSocketBackend`
//! (PUSH, PULL, DEALER, ROUTER, REQ, REP, PAIR, CHANNEL, SCATTER, GATHER).
//! PUB/XPUB/SUB have custom backends and manage their own state directly.
use crate::endpoint::Endpoint;
use crate::engine::backend::GenericSocketBackend;
use crate::reconnect::{ReconnectConfig, ReconnectHandle};
use crate::socket::common::SocketCommon;
use crate::{SocketBackend, SocketEvent, SocketOptions, SocketType, ZmqResult};
use std::collections::HashMap;
use std::sync::Arc;
/// Per-socket state shared by every `GenericSocketBackend`-based socket
/// type: the common send/receive plumbing, the per-endpoint reconnect
/// tasks, and the amortized-yield counter for the send hot path.
pub(crate) struct SocketCore {
/// Shared socket plumbing; holds the `Arc`'d backend that peers,
/// monitor, and reconnect tasks all reference.
pub(crate) common: SocketCommon<GenericSocketBackend>,
/// One reconnect task handle per connected endpoint. Handles must be
/// explicitly `shutdown()` (see the `Drop` impl) — dropping one
/// silently would leave its reconnect task running.
pub(crate) reconnect_handles: HashMap<Endpoint, ReconnectHandle>,
/// Counter for the amortized cooperative yield on the send hot path.
/// Each `send` implementation bumps this and lets [`Self::after_send`]
/// decide whether to actually yield.
///
/// Stored as `AtomicU32` so split-socket halves (e.g. `DealerSendHalf`,
/// `RouterSendHalf`) can share it behind an `Arc` without locking.
/// Relaxed ordering is sufficient — this is a heuristic, not a
/// synchronization primitive.
send_count: std::sync::atomic::AtomicU32,
}
impl SocketCore {
    /// Build a `SocketCore` for `socket_type`, resolving the per-type
    /// defaults for the batching knobs before the backend is created.
    pub(crate) fn new(socket_type: SocketType, mut options: SocketOptions) -> Self {
        // Resolve the inline-write knob now that we know the socket type.
        // `None` (the SocketOptions default) means "use the per-type
        // default"; `Some(_)` is a user override that we leave intact.
        options
            .inline_write_max
            .get_or_insert_with(|| socket_type.default_inline_write_max());
        // Same shape for `out_batch_msgs`: outer None → per-type default,
        // outer Some(_) → user override. PUB / XPUB stay at 32 (per-peer
        // HWM safety under fanout); everything else gets 256 (point-to-
        // point sockets benefit from larger writev amortization).
        options
            .out_batch_msgs
            .get_or_insert_with(|| socket_type.default_out_batch_msgs());
        let backend = Arc::new(GenericSocketBackend::with_options(socket_type, options));
        Self {
            common: SocketCommon::new(backend),
            reconnect_handles: HashMap::new(),
            send_count: std::sync::atomic::AtomicU32::new(0),
        }
    }

    /// Amortized cooperative yield shorthand for socket `send` impls.
    /// Call at the end of `send` to bump the counter and yield every
    /// [`crate::async_rt::task::SOCKET_SEND_YIELD_EVERY`] sends.
    ///
    /// Takes `&self` (not `&mut`) so split-socket send halves behind
    /// `Arc<_>` can call it.
    #[inline]
    pub(crate) async fn after_send(&self) {
        use std::sync::atomic::Ordering;
        // `fetch_add` returns the *previous* value; add 1 to get the new
        // count. Relaxed is fine — this is only a yield heuristic.
        let n = self
            .send_count
            .fetch_add(1, Ordering::Relaxed)
            .wrapping_add(1);
        if n % crate::async_rt::task::SOCKET_SEND_YIELD_EVERY == 0 {
            crate::async_rt::task::yield_now().await;
        }
    }

    /// Connect to `endpoint`, register the peer, emit a `Connected`
    /// monitor event, and spawn the reconnect task.
    ///
    /// Retries forever (up to `connect_timeout` per attempt) via
    /// `connect_peer_forever`; returns `Err` only on a fatal failure.
    pub(crate) async fn connect_endpoint(&mut self, endpoint: Endpoint) -> ZmqResult<()> {
        let connect_timeout = self.common.backend.socket_options().connect_timeout;
        let (resolved, peer_id) = crate::socket::handshake::connect_peer_forever(
            endpoint.clone(),
            self.common.backend.clone(),
            connect_timeout,
        )
        .await?;
        // Best-effort monitor notification — a full/absent monitor channel
        // must not fail the connect.
        if let Some(monitor) = self.common.backend.monitor().lock().as_mut() {
            let _ = monitor.try_send(SocketEvent::Connected(resolved, peer_id.clone()));
        }
        let b = self.common.backend.clone();
        let register_fn: crate::reconnect::RegisterDisconnectFn =
            Box::new(move |id, notifier| b.register_disconnect_notifier(id, notifier));
        let opts = self.common.backend.socket_options();
        let handle = crate::reconnect::spawn_reconnect_task(
            endpoint.clone(),
            self.common.backend.clone(),
            peer_id,
            register_fn,
            ReconnectConfig {
                initial_interval: opts.reconnect_interval,
                max_interval: opts.reconnect_interval_max,
                backoff_multiplier: 2.0,
            },
        );
        // If this endpoint was already connected, shut down the previous
        // reconnect task rather than silently dropping its handle —
        // otherwise the orphaned task would keep running (the Drop impl
        // shows handles require an explicit shutdown()).
        if let Some(previous) = self.reconnect_handles.insert(endpoint, handle) {
            previous.shutdown();
        }
        Ok(())
    }

    /// Drain all peer outbound queues up to the linger timeout.
    pub(crate) async fn linger_drain(&self) {
        let opts = self.common.backend.socket_options();
        crate::engine::registry::drain_registry(self.common.backend.registry(), opts).await;
    }
}
impl Drop for SocketCore {
    /// Tear-down: stop every per-endpoint reconnect task first, then
    /// shut down the shared backend itself.
    fn drop(&mut self) {
        let handles = std::mem::take(&mut self.reconnect_handles);
        for handle in handles.into_values() {
            handle.shutdown();
        }
        self.common.backend.shutdown();
    }
}