rzmq 0.5.16

High-performance, fully asynchronous, safe, pure-Rust implementation of ZeroMQ (ØMQ) messaging with io_uring and TCP Cork acceleration on Linux.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
#![cfg(feature = "io-uring")]

// Declare internal worker sub-modules
mod cqe_processor;
mod eventfd_poller;
mod external_op_tracker;
mod handler_manager;
mod internal_op_tracker;
mod main_loop;
mod multishot_reader;
mod sqe_builder;

use crate::io_uring_backend::buffer_manager::BufferRingManager;
use crate::io_uring_backend::connection_handler::{
  HandlerSqeBlueprint, HandlerUpstreamEvent, OutgoingMessage, ProtocolHandlerFactory, WorkerIoConfig
};
use crate::io_uring_backend::ops::UringOpRequest;
use crate::io_uring_backend::send_buffer_pool::SendBufferPool;
use crate::io_uring_backend::signaling_op_sender::SignalingOpSender;
use crate::io_uring_backend::UserData;
use crate::uring::{global_state, UringConfig};
use crate::ZmqError;

use std::collections::{HashMap, VecDeque};
use std::fmt;
use std::mem;
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
use std::os::unix::io::{AsRawFd, RawFd};
use std::sync::atomic::AtomicBool;
use std::sync::Arc;

use fibre::mpmc::{unbounded, AsyncSender, Sender as SyncSender, Receiver as SyncReceiver};
use fibre::mpsc;
use io_uring::opcode;
use io_uring::IoUring;
use tracing::{debug, error, info, trace, warn};

// Publicly re-export for use within io_uring_backend module
pub(crate) use eventfd_poller::EventFdPoller;
pub(crate) use external_op_tracker::{ExternalOpContext, ExternalOpTracker};
pub(crate) use handler_manager::HandlerManager;
pub(crate) use internal_op_tracker::{InternalOpPayload, InternalOpTracker, InternalOpType, PinnedIovecs};
pub(crate) use multishot_reader::MultishotReader;


/// Per-file-descriptor bookkeeping, stored in `UringWorker::work_map` keyed by `RawFd`.
#[derive(Debug, Default)]
struct FdWork {
  // SQE blueprints queued for this fd that have not been turned into SQEs yet.
  // NOTE(review): presumably emitted by the fd's protocol handler — confirm in main_loop/cqe_processor.
  pub(crate) pending_blueprints: VecDeque<HandlerSqeBlueprint>,
  // Outgoing application messages waiting to be written to this fd.
  pub(crate) app_data: VecDeque<OutgoingMessage>,
  // NOTE(review): appears to mark that a write SQE for this fd is currently outstanding
  // (so writes are not submitted concurrently) — confirm against the submit/completion paths.
  pub(crate) write_in_flight: bool,
}

/// Lifecycle state of the io_uring worker.
///
/// `UringWorker::transition_to_draining` moves the worker into `Draining`; the other
/// transitions are presumably driven by `main_loop` (not visible here — confirm there).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum WorkerState {
  // Normal operation: new requests are accepted and processed.
  Running,
  Draining, // Shutdown initiated, processing in-flight completions only
  // NOTE(review): presumably releasing fds/resources after draining has finished — confirm in main_loop.
  CleaningUp, 
  // NOTE(review): presumably the terminal state once the loop has exited — confirm in main_loop.
  Stopped,
}

/// Single-threaded io_uring event loop that owns the ring, the per-fd work queues and the
/// buffer pools. Created and run on its own OS thread by `UringWorker::spawn_with_config`.
pub struct UringWorker {
  // Current lifecycle state (see `WorkerState`).
  pub(crate) state: WorkerState,
  // The io_uring instance owned exclusively by this worker.
  ring: IoUring,
  // Receives operation requests; the matching sender is handed to `SignalingOpSender` at spawn time.
  op_rx: SyncReceiver<UringOpRequest>,

  // Pending blueprints / outgoing data per file descriptor.
  pub(crate) work_map: HashMap<RawFd, FdWork>,
  // Default provided-buffer ring (bgid 0). `spawn_with_config` sets it when the recv buffer
  // count/size are non-zero and initialisation succeeds; otherwise it is `None`.
  // NOTE(review): the original note says it may also be set via a `UringOpRequest` — confirm.
  buffer_manager: Option<BufferRingManager>, // Option because it's initialized via UringOpRequest
  // Creates and owns protocol handlers built from the factories passed at spawn time.
  handler_manager: HandlerManager,

  // In-flight operations requested from outside the worker (`in_flight` map).
  external_op_tracker: ExternalOpTracker,
  // In-flight operations the worker issued for itself (`op_to_details` map); these are the
  // ones `transition_to_draining` cancels.
  internal_op_tracker: InternalOpTracker,

  // WorkerIoConfig is Arc'd because it's shared with handlers created by HandlerManager
  worker_io_config: Arc<WorkerIoConfig>,
  // `Some(0)` when the default buffer ring was created, `None` otherwise.
  default_buffer_ring_group_id_val: Option<u16>,
  // NOTE(review): fds queued for a close pass — semantics live in main_loop; confirm there.
  fds_needing_close_initiated_pass: VecDeque<RawFd>,
  // Polls the eventfd that is shared with `SignalingOpSender` (used to wake this worker).
  pub(crate) event_fd_poller: EventFdPoller,
  send_buffer_pool: Option<Arc<SendBufferPool>>, // For zero-copy sends
  // NOTE(review): per-fd receivers of outgoing messages, presumably drained into `work_map` — confirm.
  pub(crate) fd_to_mpsc_rx: HashMap<RawFd, Arc<mpsc::BoundedReceiver<OutgoingMessage>>>,
  // Configuration values passed at spawn time or from global settings
  // `cfg_send_zerocopy_enabled` is the *effective* value: false if the send pool could not be created.
  cfg_send_zerocopy_enabled: bool,
  cfg_send_buffer_count: usize, //TODO revisit
  cfg_send_buffer_size: usize,
  // Shared flag: true while the worker is blocked in submit_with_args waiting for kernel events.
  // Connections check this before writing to eventfd to avoid redundant syscalls.
  pub(crate) worker_asleep: Arc<AtomicBool>,
  // Pool of recycled iovec pairs for zero-allocation Writev submissions.
  pub(crate) iovec_pool: VecDeque<PinnedIovecs>,
  // Pool of pre-allocated buffers for coalescing multiple outgoing writes into one SQE.
  pub(crate) coalesce_buffer_pool: VecDeque<bytes::BytesMut>,
}

impl fmt::Debug for UringWorker {
  /// Prints a compact diagnostic summary of the worker: the raw ring fd, the sizes of the
  /// request channel, trackers and per-fd receiver map, and the default buffer group id.
  /// Fields not listed are elided with `..`.
  fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
    let mut summary = f.debug_struct("UringWorker");
    summary.field("ring_fd", &self.ring.as_raw_fd());
    summary.field("op_rx_len", &self.op_rx.len());
    summary.field("op_rx_is_closed", &self.op_rx.is_closed());
    summary.field("buffer_manager_is_some", &self.buffer_manager.is_some());
    summary.field("external_op_tracker_len", &self.external_op_tracker.in_flight.len());
    summary.field("internal_op_tracker_len", &self.internal_op_tracker.op_to_details.len());
    summary.field("default_buffer_ring_group_id_val", &self.default_buffer_ring_group_id_val);
    summary.field("event_fd_poller", &self.event_fd_poller);
    summary.field("fd_to_mpsc_rx_len", &self.fd_to_mpsc_rx.len());
    summary.finish_non_exhaustive()
  }
}

impl UringWorker {
  pub fn spawn_with_config(
    config: UringConfig,
    factories: Vec<Arc<dyn ProtocolHandlerFactory>>,
    upstream_event_tx: SyncSender<(RawFd, HandlerUpstreamEvent)>,
  ) -> Result<(SignalingOpSender, std::thread::JoinHandle<Result<(), ZmqError>>), ZmqError> {
    let (op_tx_sync, op_rx) = unbounded::<UringOpRequest>();
    let op_tx_async_for_signaler: AsyncSender<UringOpRequest> = op_tx_sync.to_async();

    let worker_io_config = Arc::new(WorkerIoConfig { upstream_event_tx });

    let event_fd_master_instance =
      eventfd::EventFD::new(0, eventfd::EfdFlags::EFD_CLOEXEC | eventfd::EfdFlags::EFD_NONBLOCK).map_err(|e| {
        error!("Failed to create master EventFD for UringWorker: {}", e);
        ZmqError::Internal(format!("Master EventFD creation failed: {}", e))
      })?;

    let worker_asleep = Arc::new(AtomicBool::new(false));
    let signaling_op_sender = SignalingOpSender::new(
      op_tx_async_for_signaler,
      event_fd_master_instance.clone(),
      Arc::clone(&worker_asleep),
    );

    let worker_thread_join_handle = std::thread::Builder::new()
      .name("rzmq-io-uring-worker".into())
      .spawn(move || { // config is moved here
        match IoUring::new(config.ring_entries) {
          Ok(ring) => {
            let mut internal_tracker = InternalOpTracker::new();
            let event_fd_poller_instance = EventFdPoller::new_with_fd(
                event_fd_master_instance,
                &mut internal_tracker,
            );

            // --- SendBufferPool Initialization ---
            let mut worker_send_buffer_pool: Option<Arc<SendBufferPool>> = None;
            // This variable will hold the *actual* state of ZC enablement for this worker instance.
            let mut effective_send_zerocopy_enabled_for_worker = config.default_send_zerocopy;

            if config.default_send_zerocopy {
                if config.default_send_buffer_count > 0 && config.default_send_buffer_size > 0 {
                    // TODO: Consider RLIMIT_MEMLOCK check here or ensure it's documented.
                    match SendBufferPool::new(&ring, config.default_send_buffer_count, config.default_send_buffer_size) {
                        Ok(pool) => {
                            info!("UringWorker: SendBufferPool initialized (count: {}, size: {}). Zero-copy send enabled.", config.default_send_buffer_count, config.default_send_buffer_size);
                            worker_send_buffer_pool = Some(Arc::new(pool));
                        }
                        Err(e) => {
                            error!("UringWorker: Failed to initialize SendBufferPool from config: {}. Disabling ZC send for this worker.", e);
                            effective_send_zerocopy_enabled_for_worker = false;
                        }
                    }
                } else {
                    info!("UringWorker: Zero-copy send requested by config, but pool count/size is zero. Disabling ZC send for this worker.");
                    effective_send_zerocopy_enabled_for_worker = false;
                }
            } else {
                info!("UringWorker: Zero-copy send not enabled by config.");
            }

            // --- Default BufferRingManager Initialization (for bgid 0) ---
            let mut default_worker_buffer_manager: Option<BufferRingManager> = None;
            let mut default_worker_bgid_val: Option<u16> = None;

            // Create the default buffer ring if buffers are configured.
            // This ring is used for both single-shot and multi-shot provided-buffer reads.
            // The `config.default_recv_multishot` flag will control which *type* of read is *attempted*,
            // but the infrastructure (the buffer ring) should exist for either.
            if config.default_recv_buffer_count > 0 && config.default_recv_buffer_size > 0 {
                match BufferRingManager::new(&ring, config.default_recv_buffer_count as u16, 0, config.default_recv_buffer_size) {
                    Ok(bm) => {
                        info!("UringWorker: Default BufferRingManager (bgid 0) initialized (count: {}, size: {}). Provided-buffer reads are enabled.", config.default_recv_buffer_count, config.default_recv_buffer_size);
                        default_worker_buffer_manager = Some(bm);
                        default_worker_bgid_val = Some(0); // Default ring uses bgid 0
                    }
                    Err(e) => {
                        error!("UringWorker: Failed to initialize default BufferRingManager from config: {}. Provided-buffer reads may fail.", e);
                    }
                }
            } else {
                  info!("UringWorker: Default provided-buffer recv ring not configured (count/size is zero).");
            }

            // Pre-populate the iovec pool to avoid per-message Box allocations on the vectored send path.
            let mut iovec_pool = VecDeque::with_capacity(256);
            for _ in 0..256 {
              iovec_pool.push_back(PinnedIovecs(Box::new([
                libc::iovec { iov_base: std::ptr::null_mut(), iov_len: 0 },
                libc::iovec { iov_base: std::ptr::null_mut(), iov_len: 0 },
              ])));
            }

            // Pre-allocate coalescing buffers (32 × 64KB = 2MB) to avoid per-send allocations.
            let mut coalesce_buffer_pool = VecDeque::with_capacity(32);
            for _ in 0..32 {
              coalesce_buffer_pool.push_back(bytes::BytesMut::with_capacity(65536));
            }

            let mut worker = UringWorker {
              state: WorkerState::Running,
              ring,
              op_rx,
              work_map: HashMap::new(),
              buffer_manager: default_worker_buffer_manager,
              handler_manager: HandlerManager::new(factories, worker_io_config.clone()),
              external_op_tracker: ExternalOpTracker::new(),
              internal_op_tracker: internal_tracker,
              event_fd_poller: event_fd_poller_instance,
              worker_io_config,
              default_buffer_ring_group_id_val: default_worker_bgid_val,
              fds_needing_close_initiated_pass: VecDeque::new(),
              send_buffer_pool: worker_send_buffer_pool,
              fd_to_mpsc_rx: HashMap::new(),
              cfg_send_zerocopy_enabled: effective_send_zerocopy_enabled_for_worker,
              cfg_send_buffer_count: config.default_send_buffer_count,
              cfg_send_buffer_size: config.default_send_buffer_size,
              worker_asleep,
              iovec_pool,
              coalesce_buffer_pool,
            };

            {
              let loop_result = main_loop::run_worker_loop(&mut worker);
              if loop_result.is_err() {
                warn!("UringWorker: Loop exited with error, cleanup might be partial.");
              }
            }
            
            info!("rzmq-io-uring-worker OS thread (PID: {}) finished.", std::process::id());
            Ok(()) // The loop_result is no longer returned, just Ok(())
          }
          Err(e) => {
            error!("Failed to initialize IoUring (entries: {}): {}. UringWorker thread cannot start.", config.ring_entries, e);
            Err(ZmqError::Internal(format!("IoUring init failed: {}", e)))
          }
        }
      })
      .map_err(|e| ZmqError::Internal(format!("Failed to spawn UringWorker thread: {:?}", e)))?;

    Ok((signaling_op_sender, worker_thread_join_handle))
  }
}

impl UringWorker {
  /// Switches the worker into `WorkerState::Draining` and begins shutdown.
  ///
  /// Steps, in order:
  /// 1. Sets `self.state` to `Draining`.
  /// 2. Takes the global parsed-message sender out of its mutex and drops it, which closes
  ///    that channel for its receiver.
  /// 3. Pushes one `AsyncCancel` SQE (user data `0`) for every key in
  ///    `internal_op_tracker.op_to_details`, stopping early if the submission queue fills.
  /// 4. Submits the ring so the kernel sees the cancellations; a submit error is only logged.
  ///
  /// Only internally tracked operations are targeted here; `external_op_tracker` is not
  /// touched by this function.
  fn transition_to_draining(&mut self) {
    info!("UringWorker: Transitioning to DRAINING state.");
    self.state = WorkerState::Draining;
    
    // Instead of aborting, we take and drop the sender side of the
    // upstream channel. The processor's `recv().await` will then return an
    // error, causing its loop to terminate gracefully.
    // NOTE(review): the processor's behaviour is described from the original comment; it is
    // not visible in this file.
    if let Some(tx) = global_state::get_global_parsed_msg_tx_mutex().lock().take() {
        drop(tx);
        debug!("UringWorker: Dropped upstream TX channel to signal processor shutdown.");
    }

    // SAFETY: `&mut self` guarantees no other submission-queue handle borrowed from
    // `self.ring` is alive while this one exists; it is dropped before `submitter()` is used.
    let mut sq_for_shutdown = unsafe { self.ring.submission_shared() };

    // Cancel all in-flight internal kernel operations.
    // The keys are copied out first so the tracker is not borrowed while SQEs are pushed.
    let internal_ops_to_cancel: Vec<UserData> =
      self.internal_op_tracker.op_to_details.keys().copied().collect();

    info!(
      "UringWorker: Draining state - Cancelling {} in-flight internal operations.",
      internal_ops_to_cancel.len()
    );

    for op_ud in internal_ops_to_cancel {
      // NOTE(review): when the SQ is full the remaining ops are *not* cancelled here; the
      // queue is not flushed and retried. Confirm the CQE path really covers those ops.
      if sq_for_shutdown.is_full() {
        warn!("UringWorker draining transition: SQ full, cannot submit all cancel ops. CQE processing will need to handle the rest.");
        break;
      }
      trace!(
        "UringWorker draining transition: Submitting AsyncCancel for internal op_ud {}",
        op_ud
      );
      // The cancel op itself doesn't need its own tracker entry,
      // as we will be draining all CQEs anyway. We can give it a sentinel UD.
      // NOTE(review): completions carrying user data 0 must be recognised (and ignored) by the
      // CQE processor — confirm 0 is never handed out as a real op id.
      let cancel_sqe = opcode::AsyncCancel::new(op_ud).build().user_data(0); // Sentinel UD for cancel op
      // SAFETY: the SQE references no external buffers, so it stays valid until completion.
      // The push result is ignored because `is_full()` was checked just above.
      unsafe {
        let _ = sq_for_shutdown.push(&cancel_sqe);
      }
    }
    // Dropping the shared SQ handle publishes the pushed entries to the ring.
    drop(sq_for_shutdown);

    // After submitting cancellations, we must submit the ring to make sure the kernel sees them.
    if let Err(e) = self.ring.submitter().submit() {
      warn!(
        "UringWorker draining transition: Error submitting cancellation SQEs: {}",
        e
      );
    }
  }
}

// --- Helper functions for address conversion (moved from sqe_builder or other places) ---
/// Encodes `addr` into `storage` as a C `sockaddr_in` / `sockaddr_in6`.
///
/// The whole of `storage` is zeroed first, so padding bytes (such as `sin_zero`) never carry
/// garbage. Ports and the IPv4 address are stored in network byte order. For IPv6,
/// `flowinfo` and `scope_id` are copied unchanged, which matches how the Rust standard library
/// converts a `SocketAddrV6` into a `sockaddr_in6`.
///
/// Returns the length of the populated address structure, i.e. the value to pass as the
/// `socklen_t` argument alongside `storage`.
pub(crate) fn socket_addr_to_sockaddr_storage(
  addr: &SocketAddr,
  storage: &mut libc::sockaddr_storage,
) -> libc::socklen_t {
  // SAFETY: `sockaddr_storage` is plain old data; the all-zero bit pattern is a valid value.
  *storage = unsafe { mem::zeroed() };

  match addr {
    SocketAddr::V4(v4_addr) => {
      // SAFETY: `sockaddr_storage` is sized and aligned to hold any socket address type,
      // including `sockaddr_in`, and we hold the only (mutable) borrow of it.
      let sin = unsafe { &mut *(storage as *mut libc::sockaddr_storage).cast::<libc::sockaddr_in>() };
      sin.sin_family = libc::AF_INET as libc::sa_family_t;
      sin.sin_port = v4_addr.port().to_be();
      // `octets()` is already in network order; reinterpreting the bytes in native order stores
      // exactly those bytes. Adding `.to_be()` here would byte-swap the address on
      // little-endian hosts (127.0.0.1 would become 1.0.0.127).
      sin.sin_addr = libc::in_addr {
        s_addr: u32::from_ne_bytes(v4_addr.ip().octets()),
      };
      mem::size_of::<libc::sockaddr_in>() as libc::socklen_t
    }
    SocketAddr::V6(v6_addr) => {
      // SAFETY: as above, `sockaddr_storage` can hold a `sockaddr_in6`.
      let sin6 = unsafe { &mut *(storage as *mut libc::sockaddr_storage).cast::<libc::sockaddr_in6>() };
      sin6.sin6_family = libc::AF_INET6 as libc::sa_family_t;
      sin6.sin6_port = v6_addr.port().to_be();
      sin6.sin6_addr = libc::in6_addr {
        s6_addr: v6_addr.ip().octets(),
      };
      // Copied verbatim, mirroring std's `SocketAddrV6` <-> `sockaddr_in6` conversion
      // (scope_id is a host-order interface index).
      sin6.sin6_flowinfo = v6_addr.flowinfo();
      sin6.sin6_scope_id = v6_addr.scope_id();
      mem::size_of::<libc::sockaddr_in6>() as libc::socklen_t
    }
  }
}

// Referenced from cqe_processor as an `unwrap_or_else` fallback; the allow covers builds where
// that use is compiled out.
#[allow(dead_code)]
/// Placeholder address `[::]:0` (unspecified IPv6, port 0, no flow info or scope id), used
/// when a real peer/local address cannot be determined.
pub(crate) fn dummy_socket_addr() -> SocketAddr {
  let unspecified = SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, 0, 0, 0);
  SocketAddr::V6(unspecified)
}

/// Returns `(peer, local)` socket addresses for the socket `fd`.
///
/// Calls `getpeername` and then `getsockname`, decoding each kernel-filled
/// `sockaddr_storage` with `sockaddr_storage_to_socket_addr`.
///
/// # Errors
/// The OS error from either call, or the decoding error for an unsupported/truncated address.
/// The peer address is resolved first; a failure there returns before `getsockname` runs.
pub(crate) fn get_peer_local_addr(fd: RawFd) -> Result<(SocketAddr, SocketAddr), std::io::Error> {
  // Shared signature of `libc::getpeername` and `libc::getsockname`.
  type AddrQuery =
    unsafe extern "C" fn(libc::c_int, *mut libc::sockaddr, *mut libc::socklen_t) -> libc::c_int;

  // Runs a single address query against `fd` and decodes the result.
  fn query(fd: RawFd, syscall: AddrQuery) -> std::io::Result<SocketAddr> {
    // SAFETY: all-zero is a valid `sockaddr_storage`.
    let mut storage: libc::sockaddr_storage = unsafe { mem::zeroed() };
    let mut addr_len = mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t;
    // SAFETY: both pointers refer to live, writable locals, and `addr_len` bounds how many
    // bytes of `storage` the kernel may write.
    let rc = unsafe {
      syscall(
        fd,
        (&mut storage as *mut libc::sockaddr_storage).cast(),
        &mut addr_len,
      )
    };
    if rc != 0 {
      return Err(std::io::Error::last_os_error());
    }
    sockaddr_storage_to_socket_addr(&storage, addr_len)
  }

  let peer = query(fd, libc::getpeername)?;
  let local = query(fd, libc::getsockname)?;
  Ok((peer, local))
}

/// Decodes a kernel-filled `sockaddr_storage` (e.g. from `getpeername`/`getsockname`) into a
/// [`SocketAddr`].
///
/// `len` is the address length reported alongside `storage`; it must cover the full
/// `sockaddr_in` / `sockaddr_in6` for the stored family. Ports and the IPv4 address are
/// converted from network byte order. For IPv6, `flowinfo` and `scope_id` are taken verbatim.
/// This matches the Rust standard library and the inverse function
/// `socket_addr_to_sockaddr_storage`; `scope_id` in particular is a host-order interface
/// index, so byte-swapping it corrupted link-local addresses on little-endian hosts.
///
/// # Errors
/// `ErrorKind::InvalidInput` if `len` is too small for the family, or if the family is
/// neither `AF_INET` nor `AF_INET6`.
pub(crate) fn sockaddr_storage_to_socket_addr(
  storage: &libc::sockaddr_storage,
  len: libc::socklen_t,
) -> std::io::Result<SocketAddr> {
  match storage.ss_family as libc::c_int {
    libc::AF_INET => {
      if len as usize >= mem::size_of::<libc::sockaddr_in>() {
        // SAFETY: the family is AF_INET and `sockaddr_storage` is large/aligned enough for `sockaddr_in`.
        let sa = unsafe { &*(storage as *const libc::sockaddr_storage).cast::<libc::sockaddr_in>() };
        let ip = Ipv4Addr::from(u32::from_be(sa.sin_addr.s_addr)); // s_addr is in network byte order
        let port = u16::from_be(sa.sin_port); // sin_port is in network byte order
        Ok(SocketAddr::V4(SocketAddrV4::new(ip, port)))
      } else {
        Err(std::io::Error::new(
          std::io::ErrorKind::InvalidInput,
          "sockaddr_in length too small",
        ))
      }
    }
    libc::AF_INET6 => {
      if len as usize >= mem::size_of::<libc::sockaddr_in6>() {
        // SAFETY: the family is AF_INET6 and `sockaddr_storage` is large/aligned enough for `sockaddr_in6`.
        let sa = unsafe { &*(storage as *const libc::sockaddr_storage).cast::<libc::sockaddr_in6>() };
        let ip = Ipv6Addr::from(sa.sin6_addr.s6_addr); // s6_addr is raw address bytes in network order
        let port = u16::from_be(sa.sin6_port); // sin6_port is network order
        // Taken verbatim, as std does; see the doc comment above.
        let flowinfo = sa.sin6_flowinfo;
        let scope_id = sa.sin6_scope_id;
        Ok(SocketAddr::V6(SocketAddrV6::new(ip, port, flowinfo, scope_id)))
      } else {
        Err(std::io::Error::new(
          std::io::ErrorKind::InvalidInput,
          "sockaddr_in6 length too small",
        ))
      }
    }
    _ => Err(std::io::Error::new(
      std::io::ErrorKind::InvalidInput,
      "invalid socket address family",
    )),
  }
}