#![cfg(feature = "io-uring")]
use crate::ZmqError;
use bytes::Bytes;
use io_uring::IoUring;
use libc;
use parking_lot::Mutex;
use std::collections::VecDeque;
use tracing::{error, info, trace, warn};
/// Identifier of one slot in the `SendBufferPool`.
///
/// Wraps the `u16` that `SendBufferPool::new` derives from the slot's position in the pool;
/// the pool uses that value to index back into its slot vector when a buffer is acquired.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(crate) struct RegisteredSendBufferId(u16);
/// One pre-allocated send buffer together with its bookkeeping state.
#[derive(Debug)]
struct SendBufferSlot {
    /// Identifier under which the pool tracks this slot.
    id: RegisteredSendBufferId,
    /// Backing storage; its initialised length is what `iovec` reports for registration.
    buffer: Vec<u8>,
    /// Set when the pool hands the slot out and cleared when the slot is released.
    in_kernel_use: bool,
}

impl SendBufferSlot {
    /// Address of the first byte of the backing storage.
    fn ptr(&self) -> *const u8 {
        self.buffer.as_ptr()
    }

    /// Number of payload bytes the slot can hold.
    ///
    /// This is the initialised length of the backing storage, not `Vec::capacity`.
    /// `Vec` is allowed to allocate more than was requested, and both the region
    /// described by `iovec` and the slice returned by `as_mut_slice` end at `len()`.
    /// Reporting the allocation size instead would let a payload pass a size check
    /// and then overrun the writable slice.
    fn capacity(&self) -> usize {
        self.buffer.len()
    }

    /// Mutable view over the whole initialised backing storage.
    fn as_mut_slice(&mut self) -> &mut [u8] {
        self.buffer.as_mut_slice()
    }

    /// Describes the backing storage as a `libc::iovec` (base pointer plus initialised length).
    fn iovec(&self) -> libc::iovec {
        libc::iovec {
            iov_base: self.ptr() as *mut libc::c_void,
            iov_len: self.buffer.len(),
        }
    }
}
/// Mutable state of the pool, kept behind the mutex in `SendBufferPool`.
#[derive(Debug)]
struct SendBufferPoolInner {
/// Every slot created by `SendBufferPool::new`; empty when the pool was built with a zero
/// count or zero capacity.
pool: Vec<SendBufferSlot>,
/// Identifiers of the slots currently available; acquisition takes from the front and
/// release appends to the back.
free_ids: VecDeque<RegisteredSendBufferId>,
}
/// Pool of equally sized send buffers that `new` registers with an io_uring instance.
///
/// All state lives behind a `parking_lot::Mutex`, which is why the methods take `&self`.
#[derive(Debug)]
pub(crate) struct SendBufferPool {
/// Slot storage and free list, guarded by the mutex.
inner: Mutex<SendBufferPoolInner>,
}
impl SendBufferPool {
pub fn new(ring: &IoUring, count: usize, capacity_per_buffer: usize) -> Result<Self, ZmqError> {
if count == 0 || capacity_per_buffer == 0 {
warn!("SendBufferPool initialized with zero count or capacity. Zero-copy send will be effectively disabled.");
return Ok(Self {
inner: Mutex::new(SendBufferPoolInner {
pool: Vec::new(),
free_ids: VecDeque::new(),
}),
});
}
let mut slots = Vec::with_capacity(count);
let mut free_ids = VecDeque::with_capacity(count);
for i in 0..count {
slots.push(SendBufferSlot {
id: RegisteredSendBufferId(i as u16),
buffer: vec![0u8; capacity_per_buffer],
in_kernel_use: false,
});
free_ids.push_back(RegisteredSendBufferId(i as u16));
}
let iovecs_to_register: Vec<libc::iovec> = slots.iter().map(|slot| slot.iovec()).collect();
unsafe {
ring.submitter().register_buffers(&iovecs_to_register).map_err(|e| {
error!("SendBufferPool: Failed to register_buffers: {}", e);
ZmqError::Internal(format!("SendBufferPool: Failed to register_buffers: {}", e))
})?;
}
info!(
"SendBufferPool: Registered {} send buffers ({} bytes each).",
count, capacity_per_buffer
);
Ok(Self {
inner: Mutex::new(SendBufferPoolInner { pool: slots, free_ids }),
})
}
pub fn acquire_and_prep_buffer(&self, data_to_copy: &Bytes) -> Option<(RegisteredSendBufferId, *const u8, u32)> {
if data_to_copy.is_empty() {
trace!("SendBufferPool: acquire_and_prep_buffer called with empty data, skipping.");
return None; }
let mut inner_guard = self.inner.lock();
if inner_guard.pool.is_empty() {
trace!("SendBufferPool: No buffers configured in the pool.");
return None;
}
if let Some(buffer_id) = inner_guard.free_ids.pop_front() {
let slot = &mut inner_guard.pool[buffer_id.0 as usize];
if data_to_copy.len() > slot.capacity() {
warn!(
"Data ({} bytes) too large for send buffer slot {:?} (capacity {} bytes). Cannot use zero-copy for this send.",
data_to_copy.len(),
buffer_id,
slot.capacity()
);
inner_guard.free_ids.push_front(buffer_id); return None;
}
slot.as_mut_slice()[..data_to_copy.len()].copy_from_slice(data_to_copy);
slot.in_kernel_use = true;
trace!(
"SendBufferPool: Acquired buffer {:?} for {} bytes.",
buffer_id,
data_to_copy.len()
);
Some((buffer_id, slot.ptr(), data_to_copy.len() as u32))
} else {
trace!("SendBufferPool: No free send buffers available.");
None }
}
pub fn release_buffer(&self, id: RegisteredSendBufferId) {
let mut inner_guard = self.inner.lock();
if let Some(slot_index) = inner_guard.pool.iter().position(|s| s.id == id) {
let slot = &mut inner_guard.pool[slot_index];
if slot.in_kernel_use {
slot.in_kernel_use = false;
if !inner_guard.free_ids.contains(&id) {
inner_guard.free_ids.push_back(id);
} else {
warn!("SendBufferPool: Buffer ID {:?} was already in free_ids when trying to release (in_kernel_use was true). State may be inconsistent.", id);
}
trace!("SendBufferPool: Released buffer {:?}.", id);
} else {
warn!(
"SendBufferPool: Attempted to release buffer {:?} which was not marked as in_kernel_use.",
id
);
if !inner_guard.free_ids.contains(&id) {
inner_guard.free_ids.push_back(id);
}
}
} else {
error!("SendBufferPool: Attempted to release an unknown buffer ID: {:?}", id);
}
}
pub unsafe fn unregister_all(&self, ring: &IoUring) -> Result<(), ZmqError> {
let inner_guard = self.inner.lock(); if inner_guard.pool.is_empty() {
return Ok(());
}
drop(inner_guard);
info!("SendBufferPool: Unregistering all send buffers.");
ring.submitter().unregister_buffers().map_err(|e| {
error!("SendBufferPool: Failed to unregister_buffers: {}", e);
ZmqError::Internal(format!("SendBufferPool: Failed to unregister_buffers: {}", e))
})
}
}