#![allow(dead_code, unused_variables)]
use super::ZmtpProtocolHandlerX; use crate::error::ZmqError;
use crate::message::Msg;
use crate::protocol::zmtp::ZmtpCodec;
use crate::transport::ZmtpStdStream;
use bytes::{BufMut, BytesMut};
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio_util::codec::Encoder;
/// Reads from the connection until one complete ZMTP message can be parsed,
/// then returns it.
///
/// Each loop iteration performs three steps:
///
/// 1. If a data cipher is installed, every complete encrypted record currently
///    held in `network_read_buffer` is decrypted and appended to
///    `plaintext_feed_buffer`.
/// 2. The parser is run on the buffer that feeds it (`plaintext_feed_buffer`
///    when a cipher is installed, `network_read_buffer` otherwise), provided
///    that buffer is not empty.
/// 3. If no message came out, more bytes are read from the stream into
///    `network_read_buffer`, bounded by `config.rcvtimeo` (300 s when unset).
///
/// Decryption runs *before* the blocking read so that ciphertext which is
/// already buffered when this function is entered is consumed immediately
/// instead of waiting for unrelated network traffic. The cleartext path always
/// behaved this way; the secured path now matches it.
///
/// # Returns
///
/// `Ok(Some(msg))` once a message has been parsed. As written, this function
/// never returns `Ok(None)`: it keeps reading until a message, an error, or
/// EOF occurs.
///
/// # Errors
///
/// * `ZmqError::Internal` if `handler.stream` is `None`.
/// * `ZmqError::Timeout` if a single read does not complete within the
///   receive timeout.
/// * `ZmqError::ConnectionClosed` if the read returns zero bytes.
/// * The error built by `ZmqError::from_io_endpoint` for a failed read.
/// * Any error returned by the parser or by the cipher, unchanged.
pub(crate) async fn read_data_frame_impl<S: ZmtpStdStream>(
    handler: &mut ZmtpProtocolHandlerX<S>,
) -> Result<Option<Msg>, ZmqError> {
    let stream = handler.stream.as_mut().ok_or_else(|| {
        tracing::error!(
            sca_handle = handler.actor_handle,
            "Stream is None during read_data_frame_impl."
        );
        ZmqError::Internal("Stream unavailable for data frame reading".into())
    })?;
    let operation_timeout = handler.config.rcvtimeo.unwrap_or(Duration::from_secs(300));

    loop {
        // Step 1: move every complete encrypted record into the plaintext
        // buffer. `Ok(None)` means the cipher needs more wire bytes.
        // NOTE(review): this relies on the cipher tolerating a repeated call
        // on an unchanged partial record; the post-read call pattern already
        // re-presents partial records, but confirm against the cipher impl.
        if let Some(cipher) = &mut handler.data_cipher {
            while !handler.network_read_buffer.is_empty() {
                match cipher.decrypt_wire_data_to_zmtp_frame(&mut handler.network_read_buffer) {
                    Ok(Some(plaintext_zmtp_bytes)) => {
                        handler.plaintext_feed_buffer.put(plaintext_zmtp_bytes);
                    }
                    Ok(None) => break,
                    Err(e) => {
                        tracing::error!(sca_handle = handler.actor_handle, error = %e, "Decryption failed.");
                        return Err(e);
                    }
                }
            }
        }

        // Step 2: try to parse a message out of whatever is buffered. The
        // parser is skipped entirely when its input buffer is empty.
        let parsed = if handler.data_cipher.is_some() {
            if handler.plaintext_feed_buffer.is_empty() {
                None
            } else {
                handler
                    .zmtp_manual_parser
                    .decode_from_buffer(&mut handler.plaintext_feed_buffer)?
            }
        } else if handler.network_read_buffer.is_empty() {
            None
        } else {
            handler
                .zmtp_manual_parser
                .decode_from_buffer(&mut handler.network_read_buffer)?
        };
        if let Some(msg) = parsed {
            handler.heartbeat_state.record_activity();
            return Ok(Some(msg));
        }

        // Step 3: nothing complete yet, so pull more bytes off the wire. The
        // timeout applies to this single read, not to the whole call.
        let read_outcome = tokio::time::timeout(
            operation_timeout,
            stream.read_buf(&mut handler.network_read_buffer),
        )
        .await;
        let bytes_read = match read_outcome {
            Ok(io_result) => {
                io_result.map_err(|e| ZmqError::from_io_endpoint(e, "data read"))?
            }
            Err(_elapsed) => {
                tracing::warn!(
                    sca_handle = handler.actor_handle,
                    "Timeout reading data frame (single op)."
                );
                return Err(ZmqError::Timeout);
            }
        };

        if bytes_read == 0 {
            tracing::debug!(
                sca_handle = handler.actor_handle,
                "Connection closed by peer (EOF in data phase)."
            );
            return Err(ZmqError::ConnectionClosed);
        }

        // Any received bytes count as peer activity, even before they form a
        // complete message.
        handler.heartbeat_state.record_activity();
        tracing::trace!(
            sca_handle = handler.actor_handle,
            bytes_read,
            "Read data from network."
        );
    }
}
/// Sends a batch of messages.
///
/// Thin owning wrapper: takes the batch by value and forwards it as a mutable
/// slice to `write_message_slice_impl`, returning that call's result unchanged.
pub(crate) async fn write_data_msgs_impl<S: ZmtpStdStream>(
    handler: &mut ZmtpProtocolHandlerX<S>,
    mut msgs: Vec<Msg>,
) -> Result<(), ZmqError> {
    let batch: &mut [Msg] = msgs.as_mut_slice();
    write_message_slice_impl(handler, batch).await
}
/// Sends a single message part through `write_message_slice_impl`.
///
/// `_is_first_part_of_logical_zmq_msg` is accepted but not used here.
///
/// # Returns
///
/// `Ok(true)` when `msg.is_more()` was false for the part that was written,
/// `Ok(false)` when it was true.
///
/// # Errors
///
/// Propagates any error from `write_message_slice_impl`.
pub(crate) async fn write_data_msg_impl<S: ZmtpStdStream>(
    handler: &mut ZmtpProtocolHandlerX<S>,
    mut msg: Msg,
    _is_first_part_of_logical_zmq_msg: bool,
) -> Result<bool, ZmqError> {
    // The flag has to be captured now: the message is moved into the
    // one-element batch below.
    let is_final_part = !msg.is_more();
    let mut single_part = [msg];
    write_message_slice_impl(handler, &mut single_part).await?;
    Ok(is_final_part)
}
/// Encodes `msgs` into ZMTP frames, optionally encrypts them, and writes the
/// result to the stream with a single `write_all`.
///
/// An empty slice is a no-op. Otherwise every message is encoded into one
/// contiguous buffer; if a data cipher is installed, that whole buffer is
/// passed through `encrypt_zmtp_frame` before being written. The write is
/// bounded by `config.sndtimeo` (300 s when unset).
///
/// On Linux, when `cork_info` is present, the cork state is applied before the
/// write. After the write it is released if still corked, and
/// `set_expecting_first_frame(true)` is called. This cleanup runs on every
/// outcome of the write (success, I/O error, or timeout), so a timed-out send
/// cannot leave the socket corked.
///
/// # Errors
///
/// * `ZmqError::Internal` if `handler.stream` is `None`.
/// * Any error from the encoder or the cipher (returned before anything is
///   written).
/// * `ZmqError::Timeout` if the write does not complete within the send
///   timeout.
/// * The error built by `ZmqError::from_io_endpoint` for a failed write.
async fn write_message_slice_impl<S: ZmtpStdStream>(
    handler: &mut ZmtpProtocolHandlerX<S>,
    msgs: &mut [Msg],
) -> Result<(), ZmqError> {
    if msgs.is_empty() {
        return Ok(());
    }
    let stream = handler.stream.as_mut().ok_or_else(|| {
        tracing::error!(
            sca_handle = handler.actor_handle,
            "Stream is None during write_message_slice_impl."
        );
        ZmqError::Internal("Stream unavailable for writing data message".into())
    })?;
    let operation_timeout = handler.config.sndtimeo.unwrap_or(Duration::from_secs(300));

    // Encode the whole batch into one buffer so it goes out in one write.
    let mut plaintext_zmtp_frame_buffer = BytesMut::new();
    let mut temp_zmtp_encoder = ZmtpCodec::new();
    for msg in msgs.iter() {
        temp_zmtp_encoder.encode(msg.clone(), &mut plaintext_zmtp_frame_buffer)?;
    }
    let wire_bytes_to_send = if let Some(cipher) = &mut handler.data_cipher {
        cipher.encrypt_zmtp_frame(plaintext_zmtp_frame_buffer.freeze())?
    } else {
        plaintext_zmtp_frame_buffer.freeze()
    };

    #[cfg(target_os = "linux")]
    {
        if let Some(ci) = handler.cork_info.as_mut() {
            ci.apply_cork_state(true, handler.actor_handle).await;
        }
    }

    // Do NOT use `?` on the timeout here: an early return would skip the
    // uncork/reset step below and leave the socket corked.
    let timed_write =
        tokio::time::timeout(operation_timeout, stream.write_all(&wire_bytes_to_send)).await;
    let write_result = match timed_write {
        Ok(io_result) => {
            // Activity is recorded whenever the write finished in time,
            // whether it succeeded or failed; a timeout records nothing.
            handler.heartbeat_state.record_activity();
            io_result.map_err(|e| ZmqError::from_io_endpoint(e, "data write"))
        }
        Err(_elapsed) => Err(ZmqError::Timeout),
    };

    #[cfg(target_os = "linux")]
    {
        if let Some(ci) = handler.cork_info.as_mut() {
            if ci.is_corked() {
                ci.apply_cork_state(false, handler.actor_handle).await;
            }
            ci.set_expecting_first_frame(true);
        }
    }

    write_result
}