#![cfg(target_os = "linux")]
use std::os::fd::{AsRawFd, FromRawFd, RawFd};
/// Bookkeeping for toggling `TCP_CORK` on a single socket.
///
/// The struct stores only the numeric descriptor; it does not own the socket and does
/// not close it.
#[derive(Debug)]
pub(crate) struct TcpCorkInfoX {
    /// Raw descriptor of the socket whose cork option is managed.
    fd: RawFd,
    /// Cork state last recorded for `fd` (`true` = corked).
    is_corked: bool,
    /// Whether the next frame is expected to be the first frame of a new message.
    // NOTE(review): the field name suggests ZMQ multipart framing; the code that
    // consumes this flag is outside this file section — confirm with the caller.
    expecting_first_frame_of_new_zmq_message: bool,
}
impl TcpCorkInfoX {
    /// Creates cork-tracking state for `stream`.
    ///
    /// Only the raw descriptor number is captured; the stream itself is neither owned
    /// nor kept alive by the returned value. The new state starts un-corked and
    /// expecting the first frame of a message.
    ///
    /// Returns `None` (after logging a warning) if the stream reports a negative
    /// descriptor.
    pub(crate) fn new<S: AsRawFd>(stream: &S) -> Option<Self> {
        let fd = stream.as_raw_fd();
        if fd < 0 {
            tracing::warn!(
                "TcpCorkInfoX: Received invalid RawFd (<0) from stream, corking will be disabled."
            );
            return None;
        }
        Some(Self {
            fd,
            is_corked: false,
            expecting_first_frame_of_new_zmq_message: true,
        })
    }

    /// Raw descriptor this state was created for.
    pub(crate) fn fd(&self) -> RawFd {
        self.fd
    }

    /// Cork state recorded by the last successful [`Self::apply_cork_state`] call
    /// (`false` until then).
    pub(crate) fn is_corked(&self) -> bool {
        self.is_corked
    }

    /// Current value of the "expecting first frame of a new message" flag.
    pub(crate) fn is_expecting_first_frame(&self) -> bool {
        self.expecting_first_frame_of_new_zmq_message
    }

    /// Overwrites the "expecting first frame of a new message" flag.
    pub(crate) fn set_expecting_first_frame(&mut self, expecting: bool) {
        self.expecting_first_frame_of_new_zmq_message = expecting;
    }

    /// Sets (`enable == true`) or clears (`enable == false`) `TCP_CORK` on the socket.
    ///
    /// Does nothing when the recorded state already equals `enable`. Otherwise the
    /// descriptor is duplicated, the option is applied through the duplicate on
    /// tokio's blocking pool, and the recorded state is updated only if that succeeds.
    /// Failures are logged and never returned; `actor_handle` is used solely as the
    /// `sca_handle` field of those log records.
    ///
    /// NOTE(review): per tokio's `spawn_blocking` documentation a blocking task that
    /// has started cannot be aborted, so if this future is dropped while awaiting,
    /// the socket option may still change while `is_corked` keeps its old value.
    pub(crate) async fn apply_cork_state(&mut self, enable: bool, actor_handle: usize) {
        if self.is_corked == enable {
            return;
        }

        let fd_to_cork = self.fd;
        tracing::trace!(
            sca_handle = actor_handle,
            fd = fd_to_cork,
            cork_enable = enable,
            "Attempting to set TCP_CORK"
        );

        // Duplicate with close-on-exec set atomically so the temporary descriptor
        // cannot be inherited by a child process spawned concurrently. The third
        // argument (0) asks for the lowest free descriptor, like plain `dup`.
        //
        // SAFETY: `fcntl(F_DUPFD_CLOEXEC)` does not access memory through its
        // arguments; an invalid descriptor is reported via a negative return value.
        let dup_fd = unsafe { libc::fcntl(fd_to_cork, libc::F_DUPFD_CLOEXEC, 0) };
        if dup_fd < 0 {
            let e = std::io::Error::last_os_error();
            tracing::error!(
                sca_handle = actor_handle,
                fd = fd_to_cork,
                error = %e,
                "Failed to duplicate file descriptor for TCP_CORK operation. Aborting cork change."
            );
            return;
        }

        // Take ownership *before* handing the work to the blocking pool: if the
        // closure is dropped without ever running (e.g. during runtime shutdown),
        // dropping the `Socket` still closes the duplicate instead of leaking it.
        //
        // SAFETY: `dup_fd` was just returned by a successful duplication and nothing
        // else owns it, so `Socket` may assume sole ownership and close it on drop.
        let socket = unsafe { socket2::Socket::from_raw_fd(dup_fd) };

        let res = tokio::task::spawn_blocking(move || socket.set_tcp_cork(enable)).await;
        match res {
            Ok(Ok(())) => {
                tracing::debug!(
                    sca_handle = actor_handle,
                    fd = fd_to_cork,
                    cork_enable = enable,
                    "TCP_CORK successfully {}set",
                    if enable { "" } else { "un" }
                );
                self.is_corked = enable;
            }
            Ok(Err(e)) => {
                tracing::warn!(
                    sca_handle = actor_handle,
                    fd = fd_to_cork,
                    cork_enable = enable,
                    error = %e, "Failed to set TCP_CORK socket option"
                );
            }
            Err(join_err) => {
                tracing::error!(
                    sca_handle = actor_handle,
                    fd = fd_to_cork,
                    cork_enable = enable,
                    error = %join_err, "Task for setting TCP_CORK panicked"
                );
            }
        }
    }
}
/// Builds a [`TcpCorkInfoX`] for the given stream when corking is enabled.
///
/// Yields `None` when `use_cork_config` is `false`, when no stream is supplied, or
/// when [`TcpCorkInfoX::new`] itself returns `None`.
pub(crate) fn try_create_cork_info<S: AsRawFd>(
    stream_option: Option<&S>,
    use_cork_config: bool,
) -> Option<TcpCorkInfoX> {
    match stream_option {
        // Construction is attempted only with corking enabled and a stream present.
        Some(stream) if use_cork_config => TcpCorkInfoX::new(stream),
        _ => None,
    }
}