rzmq 0.5.15

High performance, fully asynchronous, safe pure-Rust implementation of ZeroMQ (ØMQ) messaging with io_uring and TCP Cork acceleration on Linux.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
use rzmq::socket::options::SNDTIMEO;
use rzmq::socket::SocketEvent;
use rzmq::{Msg, SocketType, ZmqError};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Notify; // For signalling
use tokio::task::{self, JoinHandle};
use tokio::time::timeout;
mod common;

const SHORT_TIMEOUT: Duration = Duration::from_millis(250);
const LONG_TIMEOUT: Duration = Duration::from_secs(2);
const MONITOR_EVENT_TIMEOUT: Duration = Duration::from_secs(3);

// --- Test: Context termination closes sockets and allows exit ---
#[tokio::test]
async fn test_context_term_closes_sockets() -> Result<(), ZmqError> {
  println!("Starting test_context_term_closes_sockets...");
  let ctx = common::test_context();
  let push = ctx.socket(SocketType::Push)?;
  let pull = ctx.socket(SocketType::Pull)?;

  println!("Setting up monitor for PUSH socket...");
  let push_monitor_rx = push.monitor_default().await?;
  println!("PUSH monitor setup.");

  let endpoint = "inproc://term-test";

  println!("Binding PULL...");
  pull.bind(endpoint).await?;
  println!("Connecting PUSH...");
  push.connect(endpoint).await?;

  println!("Main test: Waiting for PUSH monitor Connected event...");
  common::wait_for_monitor_event(
    &push_monitor_rx,
    MONITOR_EVENT_TIMEOUT,
    SHORT_TIMEOUT,
    |e| matches!(e, SocketEvent::Connected { endpoint: ep, .. } if *ep == endpoint),
  )
  .await
  .expect("Monitor did not receive Connected event");
  println!("Main test: PUSH monitor received Connected event.");
  tokio::time::sleep(Duration::from_millis(20)).await;

  println!("PUSH sending message...");
  push.send(Msg::from_static(b"Before Term")).await?;
  println!("PUSH sent.");

  println!("PULL receiving message...");
  let msg1 = common::recv_timeout(&pull, SHORT_TIMEOUT).await?;
  assert_eq!(msg1.data().unwrap(), b"Before Term");
  println!("PULL received.");

  println!("Initiating context termination...");
  // Initiate termination - this returns quickly
  let shutdown_task = ctx.shutdown(); // Use shutdown() to just initiate

  println!("Waiting for Disconnected event for {}...", endpoint);
  // Now wait for the event AFTER initiating shutdown
  let event_wait_task = common::wait_for_monitor_event(
    &push_monitor_rx,
    MONITOR_EVENT_TIMEOUT, // Use a reasonable timeout
    SHORT_TIMEOUT,
    |e| matches!(e, SocketEvent::Disconnected { endpoint: ep } if *ep == endpoint),
  );

  let (_, event_wait_result) = futures::future::join(shutdown_task, event_wait_task).await;

  match event_wait_result {
    Ok(_) => println!("PUSH monitor received Disconnected event as expected."),
    Err(e) => panic!("PUSH monitor wait failed: {}", e),
  }

  println!("Event received or timed out. Now waiting for term() to complete...");
  // Now await the full termination process
  let term_result = ctx.term().await; // This waits on the WaitGroup
  println!("ctx.term() completed with: {:?}", term_result);
  assert!(term_result.is_ok(), "Context termination failed");
  println!("Context terminated.");

  // Try using sockets after termination - should fail
  println!("Attempting PUSH set_option_raw after term (should fail)...");
  let setopt_res = push
    .set_option_raw(rzmq::socket::options::SNDTIMEO, &(0i32).to_ne_bytes())
    .await;
  println!("PUSH set_option_raw result: {:?}", setopt_res);
  assert!(
    setopt_res.is_err(),
    "Expected error setting option after term, got {:?}",
    setopt_res
  );
  println!("PUSH set_option_raw correctly failed: {:?}", setopt_res.err().unwrap());

  println!("Attempting PUSH send after term (should fail)...");
  let send_res = push.send(Msg::from_static(b"After Term")).await;
  assert!(
    send_res.is_err(),
    "Expected error sending after term and disconnect, got {:?}",
    send_res
  );
  println!("PUSH send correctly failed after term: {:?}", send_res.err().unwrap());

  println!("Attempting PULL recv after term (should fail)...");
  let recv_res = pull.recv().await;
  assert!(
    recv_res.is_err(),
    "Expected error receiving after term, got {:?}",
    recv_res
  );
  println!("PULL recv correctly failed: {:?}", recv_res.err().unwrap());

  println!("Test test_context_term_closes_sockets finished.");
  Ok(())
}

// --- Test: Explicit socket close stops activity ---
#[tokio::test]
async fn test_socket_close_stops_connection() -> Result<(), ZmqError> {
  println!("Starting test_socket_close_stops_connection...");
  let ctx = common::test_context();
  let push = ctx.socket(SocketType::Push)?;
  let pull = ctx.socket(SocketType::Pull)?; // Keep pull alive

  let endpoint = "tcp://127.0.0.1:5640"; // Unique port

  println!("Binding PUSH...");
  push.bind(endpoint).await?; // Let PUSH be the listener this time
  tokio::time::sleep(Duration::from_millis(50)).await;

  println!("Connecting PULL...");
  pull.connect(endpoint).await?;
  tokio::time::sleep(Duration::from_millis(150)).await; // Connect + handshake

  // PUSH sends a message, PULL receives it
  println!("PUSH sending Message 1...");
  push.send(Msg::from_static(b"Message 1")).await?;
  let msg1 = common::recv_timeout(&pull, LONG_TIMEOUT).await?;
  assert_eq!(msg1.data().unwrap(), b"Message 1");
  println!("PULL received Message 1.");

  // Close the PULL socket explicitly
  println!("Closing PULL socket...");
  pull.close().await?;
  println!("PULL socket closed.");

  // Allow time for close to propagate and detach pipe
  tokio::time::sleep(Duration::from_millis(200)).await;

  // PUSH attempts to send another message
  println!("PUSH sending Message 2 (after PULL closed)...");
  // Use non-blocking send to check if peer is gone
  push
    .set_option_raw(rzmq::socket::options::SNDTIMEO, &(0i32).to_ne_bytes())
    .await?;
  let send_res = push.send(Msg::from_static(b"Message 2")).await;
  println!("PUSH send result: {:?}", send_res);

  // Since the only peer (PULL) is closed, the PUSH load balancer has no targets.
  // SNDTIMEO=0 should result in ResourceLimitReached.
  assert!(
    matches!(send_res, Err(ZmqError::ResourceLimitReached)),
    "Expected ResourceLimitReached sending after peer close, got {:?}",
    send_res
  );
  println!("PUSH correctly failed sending after PULL closed.");

  println!("Terminating context...");
  ctx.term().await?; // Should clean up the remaining PUSH socket
  println!("Test test_socket_close_stops_connection finished.");
  Ok(())
}

// --- Test: Closing socket handle triggers cleanup ---
#[tokio::test]
async fn test_socket_explicit_close_triggers_disconnect_event() -> anyhow::Result<()> {
  println!("Starting test_socket_explicit_close_triggers_disconnect_event...");
  let ctx = common::test_context();
  let push = ctx.socket(SocketType::Push)?; // Keep as owned initially

  // ... (setup push_monitor, bind push, check Listening) ...
  println!("Setting up PUSH monitor...");
  let push_monitor = push.monitor_default().await?;

  let endpoint = "tcp://127.0.0.1:5641"; // Keep unique port

  println!("Binding PUSH...");
  push.bind(endpoint).await?;

  println!("Expecting Listening event...");
  common::wait_for_monitor_event(
    &push_monitor,
    MONITOR_EVENT_TIMEOUT,
    SHORT_TIMEOUT,
    |e| matches!(e, SocketEvent::Listening { endpoint: ep } if ep == endpoint),
  )
  .await
  .map_err(|e| anyhow::anyhow!("Listening event wait failed: {}", e))?;
  println!("PUSH Monitor: Received Listening event.");

  tokio::time::sleep(Duration::from_millis(50)).await;

  let disconnected_endpoint_uri: String;

  {
    // Scope for PULL socket
    let pull = ctx.socket(SocketType::Pull)?;
    println!("Connecting PULL...");
    pull.connect(endpoint).await?;

    println!("Expecting Accepted/Handshake event...");
    let event2 = common::wait_for_monitor_event(&push_monitor, MONITOR_EVENT_TIMEOUT, SHORT_TIMEOUT, |e| {
      matches!(e, SocketEvent::HandshakeSucceeded { .. })
    })
    .await
    .map_err(|e| anyhow::anyhow!("Accepted/Handshake event wait failed: {}", e))?;
    println!("PUSH Monitor: Received connection event: {:?}", event2);

    disconnected_endpoint_uri = match event2 {
      SocketEvent::Accepted { endpoint: _, peer_addr } => format!("tcp://{}", peer_addr),
      SocketEvent::HandshakeSucceeded { endpoint: ep } => ep,
      _ => panic!("Unexpected event type received: {:?}", event2),
    };
    println!("Determined peer endpoint URI: {}", disconnected_endpoint_uri);

    tokio::time::sleep(Duration::from_millis(150)).await;

    println!("PUSH sending Message 1...");
    push.send(Msg::from_static(b"Message 1")).await?;
    let msg1 = common::recv_timeout(&pull, LONG_TIMEOUT).await?;
    assert_eq!(msg1.data().unwrap(), b"Message 1");
    println!("PULL received Message 1.");

    println!("PULL socket closing...");
    pull.close().await?;
    println!("PULL socket closed (explicitly).");
  } // pull socket scope ends, handle is dropped

  println!("Waiting for Disconnected event for {}...", disconnected_endpoint_uri);
  common::wait_for_monitor_event(
    &push_monitor,
    MONITOR_EVENT_TIMEOUT,
    SHORT_TIMEOUT,
    |e| matches!(e, SocketEvent::Disconnected { endpoint: ep } if *ep == disconnected_endpoint_uri),
  )
  .await
  .map_err(|e| anyhow::anyhow!("Disconnected event wait failed: {}", e))?;
  println!(
    "PUSH Monitor: Received Disconnected event for {}.",
    disconnected_endpoint_uri
  );

  println!("PUSH setting SNDTIMEO=0...");
  push.set_option_raw(SNDTIMEO, &(0i32).to_ne_bytes()).await?;
  println!("PUSH sending Message 2 (after PULL disconnected)...");
  let send_res = push.send(Msg::from_static(b"Message 2")).await;
  println!("PUSH send result: {:?}", send_res);

  assert!(
    matches!(send_res, Err(ZmqError::ResourceLimitReached)),
    "Expected ResourceLimitReached sending after peer disconnect, got {:?}",
    send_res
  );
  println!("PUSH correctly failed sending after PULL disconnected.");

  println!("Closing PUSH socket explicitly before context term...");
  let push_close_res = push.close().await;
  println!("PUSH close result: {:?}", push_close_res);
  // We expect Ok(()) here as the socket should still be operational enough to process close
  assert!(push_close_res.is_ok(), "PUSH close failed unexpectedly");
  // Allow a moment for the close command to be processed and shutdown to initiate
  tokio::time::sleep(Duration::from_millis(50)).await;

  println!("Terminating context...");
  ctx.term().await?; // Terminate context gracefully

  println!("Checking if monitor channel is closed...");
  match timeout(SHORT_TIMEOUT, push_monitor.recv()).await {
    Ok(Ok(event)) => {
      // It's possible to receive a final 'Closed' or 'Disconnected' related to the *listener*
      // being shut down as part of the PUSH socket closing, *before* the monitor channel itself closes.
      // However, receiving ConnectDelayed is definitely wrong.
      match event {
        SocketEvent::ConnectDelayed { .. } => {
          anyhow::bail!(
            "Received unexpected ConnectDelayed event after context term: {:?}",
            event
          );
        }
        // Allow other close-related events like Closed or potentially another Disconnected
        // if the timing is very specific, but log them as warnings maybe.
        _ => {
          println!(
            "PUSH Monitor received final event {:?} (expected closed/empty). Tolerating.",
            event
          );
        }
      }
    }
    Ok(Err(_recv_err)) => {
      println!("PUSH Monitor channel correctly closed after context term.");
    }
    Err(_) => {
      println!("PUSH Monitor channel timed out (likely closed and empty) after context term.");
    }
  }

  println!("Test test_socket_explicit_close_triggers_disconnect_event finished.");
  Ok(())
}

// --- Test: Concurrent Context Termination and Socket Operation ---
// This test is a bit more complex, aiming to catch race conditions during shutdown.
#[tokio::test]
async fn test_concurrent_term_and_op() -> Result<(), ZmqError> {
  println!("Starting test_concurrent_term_and_op...");
  let ctx = common::test_context();
  let push = Arc::new(ctx.socket(SocketType::Push)?);
  let pull = ctx.socket(SocketType::Pull)?;
  let endpoint = "inproc://concurrent-term";

  println!("Binding PULL...");
  pull.bind(endpoint).await?;
  println!("Connecting PUSH...");
  push.connect(endpoint).await?;
  tokio::time::sleep(Duration::from_millis(50)).await;

  let push_clone = push.clone();
  let finished_sending = Arc::new(Notify::new());
  let finished_sending_clone = finished_sending.clone();

  // Task that continuously sends messages
  let send_task: JoinHandle<()> = task::spawn(async move {
    // Type annotation for clarity
    let mut count = 0;
    loop {
      let msg = Msg::from_vec(format!("Msg {}", count).into_bytes());
      match common::send_timeout(&*push_clone, msg, SHORT_TIMEOUT).await {
        Ok(()) => {
          count += 1;
          tokio::task::yield_now().await;
        }
        Err(ZmqError::Timeout) => {
          println!("Send task: Send timed out.");
          break;
        }
        Err(e) => {
          println!("Send task: Send failed: {}", e);
          break;
        }
      }
      if count % 1000 == 0 {
        println!("Send task: Sent {} messages", count);
      }
    }
    println!("Send task finished.");
    finished_sending_clone.notify_one(); // Signal completion
  });

  // Main task receives some messages, then terminates context
  println!("Main task receiving first message...");
  let _ = common::recv_timeout(&pull, LONG_TIMEOUT).await?; // Receive at least one
  println!("Main task received one message.");
  tokio::time::sleep(Duration::from_millis(10)).await; // Allow sender to send a bit more

  println!("Main task initiating context termination...");

  let termination_complete = Arc::new(Notify::new());
  let termination_complete_clone = termination_complete.clone();

  // Spawn termination task
  let term_task: JoinHandle<Result<(), ZmqError>> = task::spawn(async move {
    let result = ctx.term().await; // ctx is MOVED here
    println!("Termination task: ctx.term() finished with result: {:?}", result);
    termination_complete_clone.notify_waiters(); // Signal completion
    result // Return the result
  });

  // Wait for EITHER termination to signal completion OR the send task to signal completion
  tokio::select! {
      // Branch 1: Termination task signals completion first
      _ = termination_complete.notified() => {
          println!("Termination completed signal received first.");
          // Termination is done, no need to wait further here in select.
          // We'll check term_task result outside the select.
      }
      // Branch 2: Send task signals completion first
      _ = finished_sending.notified() => {
          println!("Send task finished notification received first.");
          // Send task finished, now explicitly wait for termination to also signal completion.
          println!("Awaiting final termination signal after send task finished...");
           _ = termination_complete.notified().await; // Wait for the signal
          println!("Final termination signal received after send task finished.");
      }
      // Branch 3: Overall timeout
      _ = tokio::time::sleep(Duration::from_secs(5)) => {
           panic!("Test timed out waiting for termination or send task completion");
      }
  }

  // --- Post-select checks ---

  // Check the result of the termination task *after* select ensures it was signalled
  let term_final_result = term_task.await; // Join the termination task
  match term_final_result {
    Ok(Ok(())) => {
      println!("Termination task joined successfully.");
    }
    Ok(Err(e)) => return Err(e), // Termination itself failed with ZmqError
    Err(join_err) => panic!("Term task panicked: {:?}", join_err), // Term task panicked
  }

  // Ensure send task didn't panic (outside the select)
  if let Err(e) = send_task.await {
    match e.try_into_panic() {
      Ok(payload) => std::panic::resume_unwind(payload),
      Err(join_err) if !join_err.is_cancelled() => {
        // Only panic if it wasn't just cancelled by the runtime/termination
        panic!("Send task failed to join normally: {:?}", join_err);
      }
      _ => {
        println!("Send task was cancelled (expected outcome possible).");
      }
    }
  }

  println!("Test test_concurrent_term_and_op finished.");
  Ok(())
}