#![cfg(target_os = "linux")]
use std::os::fd::{AsRawFd, FromRawFd, RawFd};
#[derive(Debug)]
pub(crate) struct TcpCorkInfoX {
fd: RawFd,
is_corked: bool,
expecting_first_frame_of_new_zmq_message: bool,
}
impl TcpCorkInfoX {
pub(crate) fn new<S: AsRawFd>(stream: &S) -> Option<Self> {
let fd = stream.as_raw_fd();
if fd < 0 {
tracing::warn!(
"TcpCorkInfoX: Received invalid RawFd (<0) from stream, corking will be disabled."
);
return None;
}
Some(Self {
fd,
is_corked: false,
expecting_first_frame_of_new_zmq_message: true,
})
}
pub(crate) fn fd(&self) -> RawFd {
self.fd
}
pub(crate) fn is_corked(&self) -> bool {
self.is_corked
}
pub(crate) fn is_expecting_first_frame(&self) -> bool {
self.expecting_first_frame_of_new_zmq_message
}
pub(crate) fn set_expecting_first_frame(&mut self, expecting: bool) {
self.expecting_first_frame_of_new_zmq_message = expecting;
}
pub(crate) async fn apply_cork_state(&mut self, enable: bool, actor_handle: usize) {
if self.is_corked == enable {
return;
}
let fd_to_cork = self.fd; tracing::trace!(
sca_handle = actor_handle,
fd = fd_to_cork,
cork_enable = enable,
"Attempting to set TCP_CORK"
);
let optval: libc::c_int = if enable { 1 } else { 0 };
let res = unsafe {
libc::setsockopt(
fd_to_cork,
libc::IPPROTO_TCP,
libc::TCP_CORK,
&optval as *const _ as *const libc::c_void,
std::mem::size_of_val(&optval) as libc::socklen_t,
)
};
if res == 0 {
tracing::debug!(
sca_handle = actor_handle,
fd = fd_to_cork,
cork_enable = enable,
"TCP_CORK successfully {}set",
if enable { "" } else { "un" }
);
self.is_corked = enable;
} else {
let e = std::io::Error::last_os_error();
tracing::warn!(
sca_handle = actor_handle,
fd = fd_to_cork,
cork_enable = enable,
error = %e, "Failed to set TCP_CORK socket option"
);
}
}
}
pub(crate) fn try_create_cork_info<S: AsRawFd>(
stream_option: Option<&S>,
use_cork_config: bool,
) -> Option<TcpCorkInfoX> {
if !use_cork_config {
return None;
}
stream_option.and_then(TcpCorkInfoX::new)
}