1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
//! `#[doc(hidden)]` benchmark surface.
//!
//! Exposed via `zeromq::__bench` so criterion microbenchmarks can reach
//! in and drive the engine layer directly without building a full
//! socket. Stability: **none**. Anything here may change without notice.
pub use crate::codec::zmq_codec::ZmqCodec;
pub use crate::codec::Message;
/// Benchmark helpers for the engine path. Bundled as a single module so
/// we don't have to widen visibility on every internal type involved.
#[cfg(all(feature = "tokio", feature = "tcp"))]
pub mod engine {
use crate::codec::handshake::greet_exchange;
use crate::codec::{CodecError, DefaultFramedIo as FramedIo, Message};
use crate::engine::PeerEngine;
use crate::error::SendError;
use crate::message::ZmqMessage;
use crate::PeerIdentity;
use flume::Receiver;
pub use crate::error::SendError as EngineSendError;
/// Open a TCP pair + greeting exchange + pair of `PeerEngine`s.
/// Returns `(sender, receiver)` where messages sent via `sender`
/// arrive on `receiver`. The `receive_hwm` parameter sizes the
/// receiver's shared inbound channel.
pub async fn tcp_engine_pair(
    send_hwm: usize,
    receive_hwm: usize,
) -> (SenderHandle, ReceiverHandle) {
    use tokio::net::{TcpListener, TcpStream};

    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
    let local = listener.local_addr().unwrap();
    let (accepted, connected) = futures::join!(listener.accept(), TcpStream::connect(local));
    let (server_stream, _) = accepted.unwrap();
    let client_stream = connected.unwrap();

    let mut framed_a = FramedIo::from_tcp(server_stream);
    let mut framed_b = FramedIo::from_tcp(client_stream);
    let (greet_a, greet_b) =
        futures::join!(greet_exchange(&mut framed_a), greet_exchange(&mut framed_b));
    greet_a.unwrap();
    greet_b.unwrap();

    #[cfg(feature = "curve")]
    let (a_read, a_write, _) = framed_a.into_parts();
    #[cfg(not(feature = "curve"))]
    let (a_read, a_write) = framed_a.into_parts();
    #[cfg(feature = "curve")]
    let (b_read, b_write, _) = framed_b.into_parts();
    #[cfg(not(feature = "curve"))]
    let (b_read, b_write) = framed_b.into_parts();

    // Each engine feeds its own shared inbound channel. Only the
    // receiver side is drained in this harness, so the sender's inbound
    // is a small dummy that nothing reads.
    let (a_in_tx, _a_in_rx) = flume::bounded(8);
    let (b_in_tx, b_in_rx) = flume::bounded(receive_hwm);

    // Bench harness runs without a registry, so the PeerKey values
    // (0 and 1) are arbitrary.
    let sender_engine = PeerEngine::spawn(
        0,
        PeerIdentity::new(),
        a_read,
        a_write.into_engine_writer(),
        send_hwm,
        a_in_tx,
        crate::engine::peer_loop::PeerConfig::default(),
    );
    let receiver_engine = PeerEngine::spawn(
        1,
        PeerIdentity::new(),
        b_read,
        b_write.into_engine_writer(),
        send_hwm,
        b_in_tx,
        crate::engine::peer_loop::PeerConfig::default(),
    );

    let receiver = ReceiverHandle {
        _engine: receiver_engine,
        inbound: b_in_rx,
    };
    (SenderHandle(sender_engine), receiver)
}
/// Like `tcp_engine_pair`, but the sender engine has a heartbeat
/// configured (`interval` / `timeout` / `ttl`). The receiver side has
/// no heartbeat of its own — it answers incoming PINGs via the inline
/// PONG path in `peer_loop` but never originates a PING.
///
/// Dropping the `ReceiverHandle` tears down the receiver's
/// `PeerEngine`: its `peer_loop` exits and the write half of the TCP
/// stream closes, so the sender observes EOF and disconnects
/// independently of the heartbeat. That makes this constructor suitable
/// for disconnect tests, but not for proving heartbeat-based eviction.
///
/// For an eviction test — a peer that keeps the TCP connection alive
/// yet never sends back a PONG — use `tcp_engine_silent_peer` instead,
/// which provides exactly that one-sided setup.
pub async fn tcp_engine_pair_with_heartbeat(
    send_hwm: usize,
    receive_hwm: usize,
    heartbeat_interval: std::time::Duration,
    heartbeat_timeout: std::time::Duration,
    heartbeat_ttl: std::time::Duration,
) -> (SenderHandle, ReceiverHandle) {
    use crate::engine::HeartbeatConfig;
    use tokio::net::{TcpListener, TcpStream};

    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
    let local = listener.local_addr().unwrap();
    let (accepted, connected) = futures::join!(listener.accept(), TcpStream::connect(local));
    let (server_stream, _) = accepted.unwrap();
    let client_stream = connected.unwrap();

    let mut framed_a = FramedIo::from_tcp(server_stream);
    let mut framed_b = FramedIo::from_tcp(client_stream);
    let (greet_a, greet_b) =
        futures::join!(greet_exchange(&mut framed_a), greet_exchange(&mut framed_b));
    greet_a.unwrap();
    greet_b.unwrap();

    #[cfg(feature = "curve")]
    let (a_read, a_write, _) = framed_a.into_parts();
    #[cfg(not(feature = "curve"))]
    let (a_read, a_write) = framed_a.into_parts();
    #[cfg(feature = "curve")]
    let (b_read, b_write, _) = framed_b.into_parts();
    #[cfg(not(feature = "curve"))]
    let (b_read, b_write) = framed_b.into_parts();

    let (a_in_tx, _a_in_rx) = flume::bounded(8);
    let (b_in_tx, b_in_rx) = flume::bounded(receive_hwm);

    // Only the sender gets a heartbeat; the receiver runs the default
    // config and merely replies to PINGs.
    let heartbeat = HeartbeatConfig {
        interval: heartbeat_interval,
        timeout: heartbeat_timeout,
        ttl: heartbeat_ttl,
    };
    let sender_engine = PeerEngine::spawn(
        0,
        PeerIdentity::new(),
        a_read,
        a_write.into_engine_writer(),
        send_hwm,
        a_in_tx,
        crate::engine::peer_loop::PeerConfig {
            heartbeat: Some(heartbeat),
            ..Default::default()
        },
    );
    let receiver_engine = PeerEngine::spawn(
        1,
        PeerIdentity::new(),
        b_read,
        b_write.into_engine_writer(),
        send_hwm,
        b_in_tx,
        crate::engine::peer_loop::PeerConfig::default(),
    );

    let receiver = ReceiverHandle {
        _engine: receiver_engine,
        inbound: b_in_rx,
    };
    (SenderHandle(sender_engine), receiver)
}
/// Build a one-sided engine: the sender has heartbeat configured, the
/// peer side reads frames and discards them (never sends back a PONG).
/// The TCP connection stays open so EOF-based detection doesn't fire —
/// only the heartbeat timeout can evict the peer.
///
/// Returns `(SenderHandle, _keep_alive)` where `_keep_alive` must be
/// held for the duration of the test to prevent TCP FIN.
pub async fn tcp_engine_silent_peer(
    send_hwm: usize,
    heartbeat_interval: std::time::Duration,
    heartbeat_timeout: std::time::Duration,
    heartbeat_ttl: std::time::Duration,
) -> (SenderHandle, tokio::task::JoinHandle<()>) {
    use crate::engine::HeartbeatConfig;
    use tokio::net::{TcpListener, TcpStream};

    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
    let local = listener.local_addr().unwrap();
    let (accepted, connected) = futures::join!(listener.accept(), TcpStream::connect(local));
    let (server_stream, _) = accepted.unwrap();
    let client_stream = connected.unwrap();

    // NOTE: unlike the pair constructors, the sender engine runs on the
    // *client* stream here; the accepted server stream becomes the
    // silent peer.
    let mut sender_io = FramedIo::from_tcp(client_stream);
    let mut silent_io = FramedIo::from_tcp(server_stream);
    // Both sides complete the ZMTP greeting so the sender advances past
    // greet_exchange.
    let (greet_a, greet_b) =
        futures::join!(greet_exchange(&mut sender_io), greet_exchange(&mut silent_io));
    greet_a.unwrap();
    greet_b.unwrap();

    #[cfg(feature = "curve")]
    let (read_half, write_half, _) = sender_io.into_parts();
    #[cfg(not(feature = "curve"))]
    let (read_half, write_half) = sender_io.into_parts();

    let (inbound_tx, _inbound_rx) = flume::bounded(8);
    let engine = PeerEngine::spawn(
        0,
        PeerIdentity::new(),
        read_half,
        write_half.into_engine_writer(),
        send_hwm,
        inbound_tx,
        crate::engine::peer_loop::PeerConfig {
            heartbeat: Some(HeartbeatConfig {
                interval: heartbeat_interval,
                timeout: heartbeat_timeout,
                ttl: heartbeat_ttl,
            }),
            ..Default::default()
        },
    );

    // The silent-peer task drains the read side (so the kernel's TCP
    // recv buffer never fills and stalls the sender's write path) but
    // writes nothing back. Holding the returned JoinHandle keeps the
    // task — and therefore the TCP connection — alive.
    let keep_alive = tokio::spawn(async move {
        use futures::StreamExt as _;
        while silent_io.read_half.next().await.is_some() {}
    });
    (SenderHandle(engine), keep_alive)
}
/// Sending side of a bench engine pair; wraps the sender's `PeerEngine`.
/// Dropping it tears the engine (and its `peer_loop`) down.
pub struct SenderHandle(PeerEngine);
/// Receiving side of a bench engine pair. Keeps the receiver's
/// `PeerEngine` alive (dropping this handle shuts the engine down —
/// see `tcp_engine_pair_with_heartbeat` docs) and exposes the engine's
/// shared inbound channel for draining.
pub struct ReceiverHandle {
// Held only to keep the engine task running; never accessed directly.
_engine: PeerEngine,
// Inbound frames tagged with the PeerKey they arrived on; sized by
// `receive_hwm` at construction.
inbound: Receiver<(
crate::engine::registry::PeerKey,
Result<Message, CodecError>,
)>,
}
impl SenderHandle {
pub async fn send(&self, msg: ZmqMessage) -> Result<(), SendError> {
self.0.send(Message::Message(msg)).await
}
/// Returns `true` while the underlying `peer_loop` is still running.
/// Becomes `false` after a heartbeat timeout or TCP disconnect.
pub fn writer_alive(&self) -> bool {
self.0.writer_alive()
}
}
impl ReceiverHandle {
    /// Await the next inbound frame, dropping its `PeerKey` tag.
    /// Returns `None` once the channel is disconnected (engine gone).
    pub async fn recv(&self) -> Option<Result<Message, CodecError>> {
        match self.inbound.recv_async().await {
            Ok((_key, result)) => Some(result),
            Err(_) => None,
        }
    }
}
}