1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
//! Engine coverage tests:
//! - Heartbeat end-to-end
//! - Heartbeat timeout evicts stalled peer
//! - Monitor events
//! - DEALER reconnect
//!
//! Uses `tcp_engine_pair` from the internal bench harness which spawns tokio
//! tasks directly. Gated behind `cfg(feature = "tokio")`.
#![cfg(feature = "tokio")]
mod heartbeat_e2e {
    use std::time::Duration;

    // Re-export internal helpers through the public bench module so we can
    // build a raw engine pair without going through socket-level APIs.
    use bytes::Bytes;
    use rustzmq2::{
        ZmqMessage,
        __bench::{engine::tcp_engine_pair, Message},
    };

    /// Two engines connected over loopback; both have heartbeat enabled.
    /// A few normal messages should transit fine, and after sleeping longer
    /// than the heartbeat interval the connection must still be alive.
    #[tokio::test]
    async fn heartbeat_end_to_end_stays_alive() {
        // Raw engine pair over loopback TCP (send_hwm=64, receive_hwm=64).
        let (tx_engine, rx_engine) = tcp_engine_pair(64, 64).await;

        // Push a handful of ordinary messages through.
        for n in 0u32..5 {
            let payload = Bytes::from(n.to_be_bytes().to_vec());
            tx_engine
                .send(ZmqMessage::from(payload))
                .await
                .expect("send failed");
        }

        // Pull them back out, checking both ordering and content.
        for n in 0u32..5 {
            let item = rx_engine.recv().await.expect("channel closed");
            let decoded = item.expect("codec error");
            if let Message::Message(m) = decoded {
                let frame = m.get(0).expect("no frame").clone();
                assert_eq!(&frame[..], &n.to_be_bytes()[..]);
            } else {
                panic!("unexpected message variant: {:?}", decoded);
            }
        }

        // Idle for longer than a typical heartbeat interval but well below
        // any timeout. Even without heartbeat configured on this path (the
        // basic spawn path passes None for heartbeat), the link must stay
        // up — this pins the baseline keepalive invariant.
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Prove liveness with one more round trip.
        tx_engine
            .send(ZmqMessage::from(Bytes::from_static(b"probe")))
            .await
            .expect("connection dropped after sleep");
        let item = rx_engine.recv().await.expect("channel closed after sleep");
        match item.expect("codec error after sleep") {
            Message::Message(m) => {
                let frame = m.get(0).expect("no frame").clone();
                assert_eq!(&frame[..], b"probe");
            }
            other => panic!("unexpected message variant after sleep: {:?}", other),
        }
    }
}
// ─── Monitor events & DEALER reconnect ───────────────────────────────────────
mod monitor_and_reconnect {
    use std::time::Duration;

    use futures::StreamExt;
    use rustzmq2::__async_rt as async_rt;
    use rustzmq2::prelude::*;
    use rustzmq2::{SocketEvent, ZmqMessage};

    /// DEALER connects to a REP socket.
    /// Assert a connection-establishment monitor event arrives within 1 s.
    /// Drop REP socket.
    /// Pump DEALER `recv()` to process `PeerDisconnected` and trigger monitor.
    /// Assert `Disconnected` monitor event arrives within 2 s.
    ///
    /// Note: `Disconnected` is emitted by `peer_disconnected`, called from
    /// `recv_next` when it drains the synthetic `PeerDisconnected` marker
    /// the reader task enqueues on graceful TCP EOF. The test must call
    /// `dealer.recv()` after dropping REP to allow the disconnect to propagate.
    #[async_rt::test]
    async fn monitor_connected_and_disconnected() {
        // Bind a REP socket on an ephemeral port.
        let mut rep = rustzmq2::RepSocket::new();
        let endpoint = rep.bind("tcp://127.0.0.1:0").await.expect("bind failed");

        // DEALER with monitor — set up monitor BEFORE connect so events aren't missed.
        let mut dealer = rustzmq2::DealerSocket::new();
        let mut monitor = dealer.monitor();
        dealer
            .connect(endpoint.to_string().as_str())
            .await
            .expect("connect failed");

        // Expect a connection-establishment event (Accepted or Connected) within 1 s.
        // peer_connected emits Accepted when an endpoint is present; connect()
        // additionally emits Connected. Both signal that the DEALER is
        // connected, so we accept either.
        let event = async_rt::task::timeout(Duration::from_secs(1), monitor.next())
            .await
            .expect("timeout waiting for connection event")
            .expect("monitor stream ended unexpectedly");
        assert!(
            matches!(
                event,
                SocketEvent::Connected(_, _) | SocketEvent::Accepted(_, _)
            ),
            "expected Connected or Accepted, got {:?}",
            event
        );

        // Drop REP — this triggers TCP close → reader EOF → PeerDisconnected
        // enqueued on the DEALER's inbound channel.
        drop(rep);

        // Split the dealer so we can pump recv() from a background task while
        // the test's main future polls the monitor channel.
        let (dealer_send, mut dealer_recv) = dealer.split();

        // Drain task: calls recv() in a loop to process the PeerDisconnected
        // marker, which triggers peer_disconnected() → monitor Disconnected event.
        let _drain = async_rt::task::spawn(async move {
            let deadline = std::time::Instant::now() + Duration::from_secs(3);
            while std::time::Instant::now() < deadline {
                match async_rt::task::timeout(Duration::from_millis(100), dealer_recv.recv()).await
                {
                    Ok(Ok(_)) => {}
                    // Timeout or recv error: nothing more to drain.
                    _ => break,
                }
            }
        });

        // Expect Disconnected event within 2 s, ignoring unrelated events.
        let mut got_disconnected = false;
        let deadline = std::time::Instant::now() + Duration::from_secs(2);
        while std::time::Instant::now() < deadline {
            match async_rt::task::timeout(Duration::from_millis(300), monitor.next()).await {
                Ok(Some(SocketEvent::Disconnected(_))) => {
                    got_disconnected = true;
                    break;
                }
                Ok(Some(_other)) => {
                    // Ignore other events (ConnectDelayed, ConnectRetried, etc.).
                }
                Ok(None) | Err(_) => {
                    async_rt::task::sleep(Duration::from_millis(100)).await;
                }
            }
        }
        assert!(
            got_disconnected,
            "did not receive Disconnected event within 2 s"
        );
        drop(dealer_send);
    }

    /// DEALER reconnects after the REP endpoint restarts on the same address.
    ///
    /// DEALER and REP are both bidirectional, which makes it easy to test
    /// reconnection with a send/recv pair. The disconnect detection path
    /// requires the DEALER to call `recv()` to drain the synthetic
    /// `PeerDisconnected` marker that the reader task emits on graceful TCP
    /// EOF; we spin a background drain task for that purpose.
    ///
    /// Phases:
    /// 1. DEALER connects to REP.
    /// 2. Send/recv one message to confirm initial connection.
    /// 3. Drop REP (force disconnect).
    /// 4. Restart REP on the same address.
    /// 5. Assert DEALER can send/recv again within 5 s.
    ///
    /// Message framing note: REP's `recv()` expects at least two frames
    /// (envelope-delimiter + payload). DEALER must send
    /// `[empty_frame, payload]` so the REP can split the envelope.
    #[async_rt::test]
    async fn dealer_reconnects_after_rep_restart() {
        use bytes::Bytes;
        use rustzmq2::prelude::SocketRecv;

        /// Build a two-frame DEALER→REP message: `[empty, payload]`.
        fn dealer_msg(payload: &'static str) -> ZmqMessage {
            let mut m = ZmqMessage::from(Bytes::from_static(b""));
            m.push_back(Bytes::from_static(payload.as_bytes()));
            m
        }

        // Phase 1: REP binds; DEALER connects.
        let mut rep = rustzmq2::RepSocket::new();
        let endpoint = rep.bind("tcp://127.0.0.1:0").await.expect("bind failed");
        let addr = endpoint.to_string();
        let mut dealer = rustzmq2::DealerSocket::new();
        dealer.connect(&addr).await.expect("connect failed");
        // Allow the handshake to complete before the first send.
        async_rt::task::sleep(Duration::from_millis(150)).await;

        // Phase 2: Verify initial communication (DEALER → REP).
        dealer
            .send(dealer_msg("hello"))
            .await
            .expect("initial dealer send failed");
        // REP sees the message (envelope stripped, returns payload frames).
        let _ = async_rt::task::timeout(Duration::from_secs(1), rep.recv())
            .await
            .expect("initial rep recv timeout");

        // Phase 3: Drop REP to trigger disconnect. The DEALER's inbound
        // channel will receive a PeerDisconnected marker once the reader task
        // detects TCP EOF. We must drain it to trigger peer_disconnected()
        // → reconnect notifier.
        drop(rep);
        let (mut dealer_send_half, mut dealer_recv_half) = dealer.split();
        // Drain task: keep calling recv() until the disconnect surfaces or
        // the deadline passes.
        let _drain_handle = async_rt::task::spawn(async move {
            let deadline = std::time::Instant::now() + Duration::from_secs(4);
            while std::time::Instant::now() < deadline {
                // Error (disconnect) — peer_disconnected was called inside
                // recv_next; reconnect task should now be waking.
                if let Ok(Err(_)) = async_rt::task::timeout(
                    Duration::from_millis(100),
                    dealer_recv_half.recv(),
                )
                .await
                {
                    break;
                }
            }
        });
        // Allow disconnect propagation + port release.
        async_rt::task::sleep(Duration::from_millis(500)).await;

        // Phase 4: Restart REP on the same address.
        let mut rep2 = rustzmq2::RepSocket::new();
        rep2.bind(&addr).await.expect("rebind failed");

        // Phase 5: Keep sending from the DEALER half until reconnect completes.
        let mut success = false;
        let deadline = std::time::Instant::now() + Duration::from_secs(5);
        while std::time::Instant::now() < deadline {
            // Try sending from DEALER. Fails with ReturnToSender while
            // reconnect is in progress (no peer in registry).
            if !matches!(
                async_rt::task::timeout(
                    Duration::from_millis(400),
                    dealer_send_half.send(dealer_msg("after-reconnect")),
                )
                .await,
                Ok(Ok(()))
            ) {
                async_rt::task::sleep(Duration::from_millis(200)).await;
                continue;
            }
            // Try receiving on the new REP. REP recv() strips the envelope
            // and returns only the payload frame(s).
            if let Ok(Ok(msg)) =
                async_rt::task::timeout(Duration::from_millis(600), rep2.recv()).await
            {
                // Payload frame index 0 should carry "after-reconnect".
                let data = msg.get(0).map(|b| b.to_vec()).unwrap_or_default();
                if data == b"after-reconnect" {
                    success = true;
                    break;
                }
            }
            async_rt::task::sleep(Duration::from_millis(200)).await;
        }
        assert!(success, "DEALER did not successfully reconnect within 5 s");
        drop(dealer_send_half);
        drop(rep2);
    }
}
// ─── Heartbeat timeout evicts stalled peer ───────────────────────────────────
mod heartbeat_timeout {
    use std::time::Duration;

    use rustzmq2::__bench::engine::tcp_engine_silent_peer;

    /// Verify that when a peer sends no PONG the sender's `peer_loop` exits
    /// within `timeout + margin`.
    ///
    /// Setup: `tcp_engine_silent_peer` builds a TCP pair where the sender has
    /// heartbeat configured and the "peer" side reads frames but never writes
    /// back. The TCP connection stays open, so EOF-based eviction cannot
    /// fire — only the heartbeat timeout can terminate the `peer_loop`.
    ///
    /// Assertions:
    /// - `sender.writer_alive()` starts `true`.
    /// - After `timeout + 200ms` margin it becomes `false`.
    #[tokio::test]
    async fn stalled_peer_evicted_after_timeout() {
        const INTERVAL_MS: u64 = 80;
        const TIMEOUT_MS: u64 = 120;
        const TTL_MS: u64 = 500;

        let interval = Duration::from_millis(INTERVAL_MS);
        let timeout = Duration::from_millis(TIMEOUT_MS);
        let (sender, _keep_alive) =
            tcp_engine_silent_peer(64, interval, timeout, Duration::from_millis(TTL_MS)).await;

        // The writer task must be running right after setup.
        assert!(sender.writer_alive(), "writer should start alive");

        // Budget: one interval (first PING goes out) + the timeout window
        // (no PONG arrives) + a generous scheduling margin.
        let eviction_deadline = interval + timeout + Duration::from_millis(200);
        tokio::time::sleep(eviction_deadline).await;

        // By now the peer_loop must have torn the writer down.
        assert!(
            !sender.writer_alive(),
            "writer should be dead after heartbeat timeout ({eviction_deadline:?} elapsed)"
        );
    }
}