#![cfg(feature = "io-uring")]
use crate::message::Msg;
use crate::{Blob, ZmqError};
use std::any::Any;
use std::os::unix::io::RawFd;
use std::sync::Arc;
use bytes::Bytes;
use fibre::mpmc::Sender as UpstreamEventSender;
pub use crate::io_uring_backend::ops::UserData;
use super::buffer_manager::BufferRingManager;
use super::ops::ProtocolConfig;
use super::worker::InternalOpTracker;
/// A request, produced by a connection handler, describing an operation it
/// wants performed on its behalf.
///
/// Each variant only carries the data listed on it; how a variant is turned
/// into an actual submission-queue entry is decided outside this definition.
#[derive(Debug, Clone)]
pub enum HandlerSqeBlueprint {
    /// Ask for `data` to be sent.
    RequestSend {
        /// Payload to send.
        data: Bytes,
        /// Flags for the send operation.
        /// NOTE(review): presumably `send(2)`-style `MSG_*` flags — confirm against the consumer.
        send_op_flags: i32,
        /// User data of the application-level operation this send belongs to
        /// (inferred from the field name — confirm).
        originating_app_op_ud: UserData,
    },
    /// Ask for `data_to_send` to be sent via a zero-copy send.
    ///
    /// Carries the same information as [`HandlerSqeBlueprint::RequestSend`];
    /// note that the payload field is named `data_to_send` here.
    RequestSendZeroCopy {
        /// Payload to send.
        data_to_send: Bytes,
        /// Flags for the send operation (see `RequestSend::send_op_flags`).
        send_op_flags: i32,
        /// User data of the application-level operation this send belongs to
        /// (inferred from the field name — confirm).
        originating_app_op_ud: UserData,
    },
    /// Ask for the connection to be closed.
    RequestClose,
    /// Ask for a new multishot ring read on `fd`, drawing buffers from the
    /// buffer group `bgid`.
    RequestNewRingReadMultishot {
        /// File descriptor to read from.
        fd: RawFd,
        /// Buffer group id to use for the read.
        bgid: u16,
    },
    /// Ask for an async cancel of the operation identified by
    /// `target_user_data` on `fd`.
    RequestNewAsyncCancel {
        /// File descriptor the targeted operation was issued on.
        fd: RawFd,
        /// User data identifying the operation to cancel.
        target_user_data: UserData,
    },
    /// Ask for the cork setting to be switched to the given value.
    /// NOTE(review): presumably toggles `TCP_CORK` on the socket — confirm against the consumer.
    RequestSetCork(bool),
}
/// The outcome a handler callback hands back: a batch of requested operations
/// plus a flag asking for an error-driven close.
///
/// The `Default` value has no blueprints and the flag cleared.
#[derive(Debug, Default)]
pub struct HandlerIoOps {
    /// Requested operations, in the order they were added.
    pub sqe_blueprints: Vec<HandlerSqeBlueprint>,
    /// Set when the handler wants the connection closed because of an error.
    pub initiate_close_due_to_error: bool,
}
impl HandlerIoOps {
    /// Creates an empty set of I/O operations: no blueprints and the
    /// error-close flag cleared (identical to `HandlerIoOps::default()`).
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Appends `bp` to the list of requested blueprints and returns the
    /// updated value, so calls can be chained.
    ///
    /// The method takes `self` by value: the returned value is the only copy
    /// that contains `bp`, hence the `#[must_use]`.
    #[must_use = "add_blueprint consumes self; dropping the result discards every queued blueprint"]
    pub fn add_blueprint(mut self, bp: HandlerSqeBlueprint) -> Self {
        self.sqe_blueprints.push(bp);
        self
    }

    /// Sets `initiate_close_due_to_error` and returns the updated value, so
    /// calls can be chained.
    ///
    /// Blueprints that were already added are left untouched.
    #[must_use = "set_error_close consumes self; dropping the result discards the close request"]
    pub fn set_error_close(mut self) -> Self {
        self.initiate_close_due_to_error = true;
        self
    }
}
/// Thin wrapper around a mutable borrow of an io_uring submission queue.
///
/// NOTE(review): the same lifetime `'sq_borrow` is used for both the `&mut`
/// borrow and the queue's own lifetime parameter, which ties the borrow to the
/// queue's full lifetime — confirm this is intended.
pub struct SubmissionQueueWriter<'sq_borrow> {
    /// The borrowed submission queue that entries are pushed onto.
    sq: &'sq_borrow mut io_uring::squeue::SubmissionQueue<'sq_borrow>,
}
impl<'sq_borrow> SubmissionQueueWriter<'sq_borrow> {
    /// Wraps the given mutable borrow of a submission queue.
    pub(crate) fn new(sq: &'sq_borrow mut io_uring::squeue::SubmissionQueue<'sq_borrow>) -> Self {
        Self { sq }
    }

    /// Pushes `entry` onto the submission queue.
    ///
    /// # Errors
    /// Returns `Err("SQ push error: …")`, with the underlying push error
    /// rendered via `Debug`, when the queue rejects the entry.
    ///
    /// NOTE(review): this is a safe function around an `unsafe` push. The
    /// io_uring crate requires that everything the entry refers to (buffers,
    /// file descriptors) stays valid until the operation completes; that
    /// obligation is silently passed on to every caller of this method.
    pub fn push(&mut self, entry: &io_uring::squeue::Entry) -> Result<(), String> {
        // SAFETY: not established locally. `SubmissionQueue::push` requires the
        // entry's parameters to remain valid for the whole duration of the
        // operation; this wrapper relies on its callers to uphold that.
        unsafe { self.sq.push(entry) }.map_err(|e| format!("SQ push error: {:?}", e))
    }

    /// Returns `true` when the submission queue reports that it is full.
    pub fn is_full(&self) -> bool {
        // `SubmissionQueue::is_full` is a safe method, so no `unsafe` block is needed.
        self.sq.is_full()
    }
}
/// Borrowed, read-only context passed to handler callbacks.
///
/// The references it holds live for `'cfg_life`.
pub struct UringWorkerInterface<'cfg_life> {
    /// File descriptor this interface was created for.
    pub fd: RawFd,
    /// Borrowed worker I/O configuration.
    pub worker_io_config: &'cfg_life WorkerIoConfig,
    /// Borrowed buffer-ring manager, if one is available.
    pub buffer_manager: Option<&'cfg_life super::buffer_manager::BufferRingManager>,
    /// Buffer group id the handler may use by default, if any.
    pub default_bgid_for_handler_use: Option<u16>,
    /// User data of the external operation currently being handled
    /// (inferred from the field name — confirm against the worker).
    pub current_external_op_ud: UserData,
}
impl<'cfg_life> UringWorkerInterface<'cfg_life> {
pub(crate) fn new(
fd: RawFd,
worker_io_config: &'cfg_life WorkerIoConfig,
buffer_manager: Option<&'cfg_life super::buffer_manager::BufferRingManager>,
default_bgid_for_handler_use: Option<u16>,
current_external_op_ud: UserData,
) -> Self {
Self {
fd,
worker_io_config,
buffer_manager,
default_bgid_for_handler_use,
current_external_op_ud,
}
}
pub fn fd(&self) -> RawFd {
self.fd
}
pub fn default_buffer_group_id(&self) -> Option<u16> {
self.default_bgid_for_handler_use
}
}
/// Callback interface implemented by per-connection protocol handlers.
///
/// Most callbacks return a [`HandlerIoOps`]: a list of requested
/// [`HandlerSqeBlueprint`]s plus an error-close flag. The trait only declares
/// signatures; the notes below are derived from those signatures and names,
/// and the exact calling sequence is defined by the caller, not here.
pub trait UringConnectionHandler: Send {
    /// Returns the file descriptor this handler is associated with.
    fn fd(&self) -> RawFd;

    /// Reports whether the handler considers itself closing or closed.
    fn is_closing_or_closed(&self) -> bool;

    /// Notification that the connection is ready; returns the operations the
    /// handler wants performed in response.
    fn connection_ready(&mut self, interface: &UringWorkerInterface<'_>) -> HandlerIoOps;

    /// Hands the handler a slice of received bytes together with the id of
    /// the buffer it came from; returns the operations requested in response.
    ///
    /// NOTE(review): `buffer_slice` is only borrowed for the call, so an
    /// implementor that needs the bytes later must copy them — confirm how and
    /// when the buffer identified by `buffer_id` is recycled.
    fn process_ring_read_data(
        &mut self,
        buffer_slice: &[u8],
        buffer_id: u16,
        interface: &UringWorkerInterface<'_>,
    ) -> HandlerIoOps;

    /// Reports the completion of an SQE identified by `sqe_user_data`, passing
    /// along the completion's result code and flags.
    fn handle_internal_sqe_completion(
        &mut self,
        sqe_user_data: UserData,
        cqe_result: i32,
        cqe_flags: u32,
        interface: &UringWorkerInterface<'_>,
    ) -> HandlerIoOps;

    /// Gives the handler a chance to request further operations.
    fn prepare_sqes(&mut self, interface: &UringWorkerInterface<'_>) -> HandlerIoOps;

    /// Passes outgoing application data to the handler as a type-erased,
    /// shared value; the concrete type behind `data` is not visible from this
    /// declaration.
    fn handle_outgoing_app_data(
        &mut self,
        data: Arc<dyn Any + Send + Sync>,
        interface: &UringWorkerInterface<'_>,
    ) -> HandlerIoOps;

    /// Notification that a close has been initiated; returns any operations
    /// the handler wants performed as part of it.
    fn close_initiated(&mut self, interface: &UringWorkerInterface<'_>) -> HandlerIoOps;

    /// Notification that the file descriptor has been closed.
    fn fd_has_been_closed(&mut self);

    /// Offers a completion-queue entry to the handler's multishot reader.
    ///
    /// NOTE(review): the meaning of the return value is not visible here.
    /// Presumably `None` means the entry was not for the multishot reader,
    /// and the `bool` in the `Ok` tuple carries an extra status flag —
    /// confirm against an implementation.
    fn delegate_cqe_to_multishot_reader(
        &mut self,
        cqe: &io_uring::cqueue::Entry,
        buffer_manager: &BufferRingManager,
        worker_interface: &UringWorkerInterface<'_>,
        internal_op_tracker: &mut InternalOpTracker,
    ) -> Option<Result<(HandlerIoOps, bool), ZmqError>>;

    /// Tells the handler's multishot reader that an operation with
    /// `op_user_data` has been submitted.
    ///
    /// `is_cancel_op` marks the operation as a cancel;
    /// `target_op_data_if_cancel` then carries the user data of the operation
    /// being cancelled (inferred from the parameter names — confirm).
    fn inform_multishot_reader_op_submitted(
        &mut self,
        op_user_data: UserData,
        is_cancel_op: bool,
        target_op_data_if_cancel: Option<UserData>,
    );
}
/// Factory that builds a [`UringConnectionHandler`] for a file descriptor.
///
/// Implementors must be `Send + Sync + 'static`.
pub trait ProtocolHandlerFactory: Send + Sync + 'static {
    /// Returns a static string identifying this factory.
    fn id(&self) -> &'static str;

    /// Creates a boxed handler for `fd`.
    ///
    /// Receives the shared worker I/O configuration, the protocol
    /// configuration, and whether the connection is on the server side.
    ///
    /// # Errors
    /// Returns a `String` describing the failure when no handler can be
    /// created; the possible causes are implementation-defined.
    fn create_handler(
        &self,
        fd: RawFd,
        worker_io_config: Arc<WorkerIoConfig>,
        protocol_config: &ProtocolConfig,
        is_server: bool,
    ) -> Result<Box<dyn UringConnectionHandler + Send>, String>;
}
/// Events a handler reports upstream.
///
/// Values of this type travel, paired with a `RawFd`, over the channel held
/// in `WorkerIoConfig::upstream_event_tx`.
#[derive(Debug, Clone)]
pub enum HandlerUpstreamEvent {
    /// A message to deliver upstream.
    Data(Msg),
    /// The handshake has completed.
    HandshakeComplete {
        /// Identity of the peer, if one is known.
        peer_identity: Option<Blob>,
    },
    /// An error to report upstream.
    Error(ZmqError),
}
/// Configuration shared with handlers; cloning it clones the contained sender.
#[derive(Clone)]
pub struct WorkerIoConfig {
    /// Sending half of the channel used to report `(RawFd, HandlerUpstreamEvent)`
    /// pairs upstream.
    /// NOTE(review): presumably the `RawFd` identifies the connection that
    /// produced the event — confirm against the receiver.
    pub upstream_event_tx: UpstreamEventSender<(RawFd, HandlerUpstreamEvent)>,
}