#![cfg(feature = "io-uring")]
use crate::io_uring_backend::buffer_manager::BufferRingManager;
use crate::io_uring_backend::connection_handler::{
HandlerIoOps, HandlerSqeBlueprint, UringConnectionHandler, UringWorkerInterface,
};
use crate::io_uring_backend::ops::UserData;
use crate::io_uring_backend::worker::internal_op_tracker::{InternalOpPayload, InternalOpTracker, InternalOpType};
use crate::ZmqError;
use io_uring::cqueue;
use io_uring::{cqueue::Entry as CqeResult, squeue, types};
use std::fmt;
use std::os::unix::io::RawFd;
/// Completion-queue flag bit (bit 1) that `MultishotReader::process_cqe` tests to decide
/// whether a multishot read stays armed after the current completion.
///
/// NOTE(review): presumably mirrors the kernel's `IORING_CQE_F_MORE`; confirm against
/// the io_uring headers.
pub const IOURING_CQE_F_MORE: u32 = 0b10;
/// Bookkeeping for one multishot ring-read operation on a single file descriptor.
///
/// The reader does not submit anything itself: it hands out `HandlerSqeBlueprint`
/// intents, is told which user-data token each submission received, and interprets the
/// completions that carry those tokens.
#[derive(Debug)]
pub(crate) struct MultishotReader {
/// File descriptor placed into the read and cancel blueprints and used as a log prefix.
fd: RawFd,
/// Buffer group id passed as `bgid` when a multishot read is requested.
buffer_group_id: u16,
/// User-data token of the tracked multishot read, if one has been recorded.
active_op_user_data: Option<UserData>,
/// Whether the tracked multishot read is currently considered active.
is_active: bool,
/// User-data token of a submitted cancel request whose completion is still awaited.
cancel_op_user_data: Option<UserData>,
}
impl MultishotReader {
pub fn new(fd: RawFd, buffer_group_id: u16) -> Self {
Self {
fd,
buffer_group_id,
active_op_user_data: None,
is_active: false,
cancel_op_user_data: None,
}
}
pub fn buffer_group_id(&self) -> u16 {
self.buffer_group_id
}
/// Builds the intent to start a new multishot ring read for this reader.
///
/// Returns `None` while a read is already marked active or while a cancel request is
/// still outstanding; otherwise returns a `RequestNewRingReadMultishot` blueprint
/// carrying this reader's FD and buffer group id.
pub fn prepare_recv_multi_intent(&self) -> Option<HandlerSqeBlueprint> {
    let cancel_pending = self.cancel_op_user_data.is_some();
    let idle = !self.is_active && !cancel_pending;
    idle.then(|| HandlerSqeBlueprint::RequestNewRingReadMultishot {
        fd: self.fd,
        bgid: self.buffer_group_id,
    })
}
/// Records that the multishot read was submitted under `op_user_data`.
///
/// The reader becomes active and any previously recorded cancel token is forgotten.
pub fn mark_operation_submitted(&mut self, op_user_data: UserData) {
    self.cancel_op_user_data = None;
    self.is_active = true;
    self.active_op_user_data = Some(op_user_data);
    tracing::debug!(
        "[MultishotReader FD={}] Marked as active in kernel with UserData {}.",
        self.fd,
        op_user_data
    );
}
/// Builds the intent to cancel the tracked multishot read.
///
/// A `RequestNewAsyncCancel` blueprint targeting the tracked read's user data is
/// returned only when the reader is marked active, a read token is recorded, and no
/// cancel request is already outstanding. In every other state this returns `None`.
pub fn prepare_cancel_intent(&self) -> Option<HandlerSqeBlueprint> {
    match (self.is_active, self.active_op_user_data, self.cancel_op_user_data) {
        (true, Some(target_user_data), None) => Some(HandlerSqeBlueprint::RequestNewAsyncCancel {
            fd: self.fd,
            target_user_data,
        }),
        _ => None,
    }
}
/// Records that a cancel request was submitted under `cancel_sqe_user_data`.
///
/// `_target_op_user_data` is accepted but not used by this method.
pub fn mark_cancellation_submitted(&mut self, cancel_sqe_user_data: UserData, _target_op_user_data: UserData) {
    let pending_cancel = Some(cancel_sqe_user_data);
    self.cancel_op_user_data = pending_cancel;
    tracing::debug!(
        "[MultishotReader FD={}] Cancellation submitted with UserData {}.",
        self.fd,
        cancel_sqe_user_data
    );
}
pub fn process_cqe(
&mut self,
cqe: &CqeResult,
buffer_manager: &BufferRingManager,
owner_handler: &mut dyn UringConnectionHandler,
worker_interface: &UringWorkerInterface<'_>,
_internal_op_tracker_ref: &mut InternalOpTracker,
) -> Result<(HandlerIoOps, bool), ZmqError> {
let cqe_ud = cqe.user_data();
let cqe_res = cqe.result();
let cqe_flags = cqe.flags();
let mut ops_to_return = HandlerIoOps::new();
if Some(cqe_ud) == self.active_op_user_data {
if !self.is_active {
tracing::warn!("[MultishotReader FD={}] CQE (ud {}) for active_op_user_data, but reader not marked active_in_kernel. State inconsistency?", self.fd, cqe_ud);
self.is_active = true;
}
if cqe_res < 0 {
let errno = -cqe_res;
tracing::error!(
"[MultishotReader FD={}] Error on active multishot read (ud {}): errno {}. Terminating multishot.",
self.fd,
cqe_ud,
errno
);
self.active_op_user_data = None;
self.is_active = false;
return Ok((ops_to_return.set_error_close(), true));
}
let buffer_id_opt = cqueue::buffer_select(cqe_flags);
if buffer_id_opt.is_none() {
tracing::error!(
"[MultishotReader FD={}] Multishot CQE (ud {}) missing F_BUFFER flag or invalid BID! Flags: {:x}",
self.fd,
cqe_ud,
cqe_flags
);
self.active_op_user_data = None;
self.is_active = false;
return Ok((ops_to_return.set_error_close(), true));
}
let buffer_id = buffer_id_opt.unwrap(); let bytes_read = cqe_res as usize;
if bytes_read > 0 {
match unsafe { buffer_manager.borrow_kernel_filled_buffer(buffer_id, bytes_read) } {
Ok(borrowed_buffer) => {
ops_to_return = owner_handler.process_ring_read_data(&borrowed_buffer, buffer_id, worker_interface);
}
Err(e) => {
tracing::error!(
"[MultishotReader FD={}] Failed to borrow buffer ID {} ({} bytes): {:?}. Terminating multishot.",
self.fd,
buffer_id,
bytes_read,
e
);
self.active_op_user_data = None;
self.is_active = false;
return Ok((ops_to_return.set_error_close(), true));
}
}
} else {
tracing::info!(
"[MultishotReader FD={}] EOF on multishot read (ud {}). Terminating multishot.",
self.fd,
cqe_ud
);
ops_to_return = owner_handler.process_ring_read_data(&[], buffer_id, worker_interface);
}
if (cqe_flags & IOURING_CQE_F_MORE) == 0 || bytes_read == 0 {
tracing::debug!(
"[MultishotReader FD={}] Multishot read (ud {}) finished (no MORE flag or EOF). Bytes read: {}",
self.fd,
cqe_ud,
bytes_read
);
self.active_op_user_data = None;
self.is_active = false;
return Ok((ops_to_return, true));
} else {
tracing::trace!(
"[MultishotReader FD={}] Multishot read (ud {}) has MORE flag. Op remains active.",
self.fd,
cqe_ud
);
return Ok((ops_to_return, false));
}
} else if Some(cqe_ud) == self.cancel_op_user_data {
tracing::debug!(
"[MultishotReader FD={}] AsyncCancel CQE (ud {}) received for multishot op (target_ud: {:?}). Res: {}",
self.fd,
cqe_ud,
self.active_op_user_data,
cqe_res
);
if cqe_res < 0 && cqe_res != -libc::ECANCELED && cqe_res != -libc::ENOENT {
tracing::warn!(
"[MultishotReader FD={}] AsyncCancel for multishot op failed with error: {}",
self.fd,
cqe_res
);
}
if let Some(original_multishot_ud) = self.active_op_user_data.take() {
tracing::trace!(
"[MultishotReader FD={}] Original multishot op (ud {}) is now considered terminated due to cancel CQE.",
self.fd,
original_multishot_ud
);
}
self.active_op_user_data = None;
self.cancel_op_user_data = None;
self.is_active = false;
return Ok((ops_to_return, true));
} else {
tracing::error!("[MultishotReader FD={}] process_cqe called with non-matching UserData (ud {}). This indicates a logic error in cqe_processor's delegation.", self.fd, cqe_ud);
return Err(ZmqError::Internal(
"MultishotReader::process_cqe called with non-matching UserData".into(),
));
}
}
/// Reports whether the multishot read is active and not in the process of being
/// cancelled (a pending cancel token makes this return `false`).
pub fn is_active(&self) -> bool {
    match self.cancel_op_user_data {
        Some(_) => false,
        None => self.is_active,
    }
}
/// Marks the reader active, but only if `user_data` equals the recorded read token.
///
/// On a mismatch the state is left untouched and a warning is logged.
pub(crate) fn set_active(&mut self, user_data: UserData) {
    if self.active_op_user_data != Some(user_data) {
        tracing::warn!("[MultishotReader FD={}] set_active called with UserData {}, but current expected is {:?}. State unchanged unless matching.", self.fd, user_data, self.active_op_user_data);
        return;
    }
    self.is_active = true;
    tracing::debug!(
        "[MultishotReader FD={}] Marked as active with UserData {}.",
        self.fd,
        user_data
    );
}
/// Returns `true` when `cqe_user_data` equals either the recorded read token or the
/// recorded cancel token.
pub(crate) fn matches_cqe_user_data(&self, cqe_user_data: UserData) -> bool {
    let wanted = Some(cqe_user_data);
    [self.active_op_user_data, self.cancel_op_user_data].contains(&wanted)
}
/// Resets the reader to idle because its FD was closed: the reader is marked inactive
/// and both the read token and the cancel token are dropped.
pub(crate) fn set_inactive_due_to_close(&mut self) {
    tracing::debug!("[MultishotReader FD={}] Marked as inactive due to FD closure.", self.fd);
    self.cancel_op_user_data = None;
    self.active_op_user_data = None;
    self.is_active = false;
}
}