#![cfg(feature = "io-uring")]
use crate::io_uring_backend::ops::{UringOpCompletion, UringOpRequest, UserData};
use crate::ZmqError;
use super::socket_addr_to_sockaddr_storage;
use std::mem;
use std::net::SocketAddr;
use std::os::unix::io::RawFd;
use io_uring::{opcode, squeue, types};
pub(crate) fn build_sqe_for_external_request(
request: &UringOpRequest,
out_connect_fd: &mut Option<RawFd>,
_ring_features_has_ext_arg: bool, ) -> Result<Option<squeue::Entry>, UringOpCompletion> {
let user_data = request.get_user_data_ref();
match request {
UringOpRequest::Nop { .. } => {
Ok(Some(opcode::Nop::new().build().user_data(user_data)))
}
UringOpRequest::Connect { target_addr, .. } => {
let socket_fd = match target_addr {
SocketAddr::V4(_) => unsafe { libc::socket(libc::AF_INET, libc::SOCK_STREAM | libc::SOCK_NONBLOCK | libc::SOCK_CLOEXEC, 0) },
SocketAddr::V6(_) => unsafe { libc::socket(libc::AF_INET6, libc::SOCK_STREAM | libc::SOCK_NONBLOCK | libc::SOCK_CLOEXEC, 0) },
};
if socket_fd < 0 {
let e = std::io::Error::last_os_error();
tracing::error!("Connect: Failed to create socket: {}", e);
return Err(UringOpCompletion::OpError {
user_data,
op_name: "ConnectSocketCreate".to_string(),
error: ZmqError::from(e),
});
}
*out_connect_fd = Some(socket_fd);
let mut storage: libc::sockaddr_storage = unsafe { mem::zeroed() };
let addr_len = socket_addr_to_sockaddr_storage(target_addr, &mut storage);
if addr_len == 0 {
unsafe { libc::close(socket_fd); } *out_connect_fd = None; tracing::error!("Connect: Address conversion failed for target_addr: {:?}", target_addr);
return Err(UringOpCompletion::OpError {
user_data,
op_name: "ConnectAddrConversion".to_string(),
error: ZmqError::InvalidArgument(format!("Address conversion failed for {:?}", target_addr)),
});
}
Ok(Some(
opcode::Connect::new(types::Fd(socket_fd), &storage as *const _ as *const libc::sockaddr, addr_len)
.build()
.user_data(user_data), ))
}
UringOpRequest::RegisterRawBuffers { .. } => {
tracing::trace!("build_sqe_for_external_request: RegisterRawBuffers is handled directly by worker, not as an SQE from here.");
Ok(None)
}
UringOpRequest::InitializeBufferRing { .. } |
UringOpRequest::Listen { .. } | UringOpRequest::RegisterExternalFd { .. } |
UringOpRequest::StartFdReadLoop { .. } |
UringOpRequest::ShutdownConnectionHandler { .. } => {
tracing::trace!(
"build_sqe_for_external_request: Op '{}' does not produce a direct SQE from this function.",
request.op_name_str()
);
Ok(None) }
}
}