#![allow(dead_code, unused_variables, unused_mut)]
mod data_io;
mod handshake;
mod heartbeat;
use crate::error::ZmqError;
use crate::message::Msg;
use crate::protocol::zmtp::greeting::ZmtpGreeting;
use crate::protocol::zmtp::manual_parser::ZmtpManualParser;
use crate::security::framer::{ISecureFramer, NullFramer};
use crate::security::{Mechanism, NullMechanism};
use crate::socket::options::ZmtpEngineConfig;
use crate::transport::ZmtpStdStream;
use bytes::BytesMut;
use std::fmt;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::io::AsyncWriteExt;
use self::heartbeat::ZmtpHeartbeatStateX;
#[cfg(target_os = "linux")]
use super::cork::{TcpCorkInfoX, try_create_cork_info};
use super::types::{HandshakeSubPhaseX, ZmtpHandshakeProgressX};
use heartbeat::ZmtpHandshakeStateX;
pub(crate) struct ZmtpProtocolHandlerX<S: ZmtpStdStream> {
pub(crate) config: Arc<ZmtpEngineConfig>,
pub(crate) is_server: bool,
pub(crate) stream: Option<S>,
pub(crate) network_read_buffer: BytesMut,
pub(crate) handshake_state: ZmtpHandshakeStateX,
pub(crate) security_mechanism: Box<dyn Mechanism>,
pub(crate) pending_peer_greeting: Option<ZmtpGreeting>,
pub(crate) zmtp_manual_parser: ZmtpManualParser, pub(crate) framer: Box<dyn ISecureFramer>,
pub(crate) heartbeat_state: ZmtpHeartbeatStateX,
#[cfg(target_os = "linux")]
pub(crate) cork_info: Option<TcpCorkInfoX>,
#[cfg(not(target_os = "linux"))]
pub(crate) cork_info: Option<()>,
pub(crate) actor_handle: usize,
}
impl<S: ZmtpStdStream + fmt::Debug> fmt::Debug for ZmtpProtocolHandlerX<S> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut debug_struct = f.debug_struct("ZmtpProtocolHandlerX");
debug_struct
.field("actor_handle", &self.actor_handle)
.field("config", &self.config)
.field("is_server", &self.is_server)
.field("stream_is_some", &self.stream.is_some());
debug_struct.field("network_read_buffer_len", &self.network_read_buffer.len());
debug_struct.field("framer_active", &"true");
debug_struct
.field("handshake_state", &self.handshake_state)
.field("security_mechanism_name", &self.security_mechanism.name())
.field("pending_peer_greeting", &self.pending_peer_greeting);
debug_struct.field("heartbeat_state", &self.heartbeat_state);
#[cfg(target_os = "linux")]
debug_struct.field("cork_info", &self.cork_info);
#[cfg(not(target_os = "linux"))]
debug_struct.field("cork_info", &"None (Non-Linux)");
debug_struct.field("zmtp_manual_parser", &self.zmtp_manual_parser);
debug_struct.finish_non_exhaustive()
}
}
impl<S: ZmtpStdStream> ZmtpProtocolHandlerX<S> {
pub(crate) fn new(
stream: S, config: Arc<ZmtpEngineConfig>,
is_server: bool,
actor_handle: usize,
) -> Self {
let heartbeat_ivl_from_config = config.heartbeat_ivl;
let effective_timeout_corrected = config.heartbeat_timeout.unwrap_or_else(|| {
heartbeat_ivl_from_config.map_or(Duration::from_secs(30), |ivl| {
ivl.max(Duration::from_millis(100))
})
});
#[cfg(target_os = "linux")]
let cork_info_val = {
let stream_ref_for_cork: Option<&S> = if config.use_cork { Some(&stream) } else { None };
try_create_cork_info(stream_ref_for_cork, config.use_cork)
};
#[cfg(not(target_os = "linux"))]
let cork_info_val: Option<()> = None;
Self {
actor_handle,
config,
is_server,
stream: Some(stream),
network_read_buffer: BytesMut::with_capacity(8192 * 2),
handshake_state: ZmtpHandshakeStateX::new(),
security_mechanism: Box::new(NullMechanism),
pending_peer_greeting: None,
zmtp_manual_parser: ZmtpManualParser::new(),
framer: Box::new(NullFramer::new()),
heartbeat_state: ZmtpHeartbeatStateX::new(
heartbeat_ivl_from_config,
effective_timeout_corrected,
),
cork_info: cork_info_val,
}
}
pub(crate) fn is_handshake_complete(&self) -> bool {
self.handshake_state.sub_phase == HandshakeSubPhaseX::Done
}
pub(crate) async fn advance_handshake(&mut self) -> Result<ZmtpHandshakeProgressX, ZmqError> {
handshake::advance_handshake_step_impl(self).await
}
pub(crate) async fn read_and_parse_data_frame(&mut self) -> Result<Option<Msg>, ZmqError> {
data_io::read_data_frame_impl(self).await
}
pub(crate) async fn write_data_msg(
&mut self,
msg: Msg,
is_first_part_of_logical_zmq_msg: bool,
) -> Result<bool , ZmqError> {
data_io::write_data_msg_impl(self, msg, is_first_part_of_logical_zmq_msg).await
}
pub(crate) async fn write_data_msgs(&mut self, msgs: Vec<Msg>) -> Result<(), ZmqError> {
data_io::write_data_msgs_impl(self, msgs).await
}
pub(crate) fn process_incoming_data_command_frame(
&mut self,
cmd_msg: &Msg,
) -> Result<Option<Msg>, ZmqError> {
heartbeat::process_heartbeat_command_impl(self, cmd_msg)
}
pub(crate) async fn maybe_send_ping(&mut self) -> Result<(), ZmqError> {
heartbeat::try_send_ping_impl(self).await
}
pub(crate) fn has_pong_timed_out(&self) -> bool {
heartbeat::check_pong_timeout_impl(&self.heartbeat_state, Instant::now())
}
pub(crate) async fn initiate_stream_shutdown(&mut self) -> Result<(), ZmqError> {
tracing::debug!(
sca_handle = self.actor_handle,
"ZmtpProtocolHandlerX: Initiating stream shutdown."
);
self.clear_handshake_state();
if let Some(stream_ref) = self.stream.as_mut() {
#[cfg(target_os = "linux")]
{
if let Some(cork_info_ref) = self.cork_info.as_mut() {
if cork_info_ref.is_corked() {
cork_info_ref
.apply_cork_state(false, self.actor_handle)
.await;
}
}
}
match stream_ref.shutdown().await {
Ok(()) => tracing::debug!(
sca_handle = self.actor_handle,
"Stream shutdown() successful."
),
Err(e) => {
tracing::warn!(sca_handle = self.actor_handle, error = %e, "Error during stream shutdown().")
}
}
}
self.stream = None; Ok(())
}
fn clear_handshake_state(&mut self) {
tracing::trace!(
sca_handle = self.actor_handle,
"Clearing handshake state and buffers."
);
self.pending_peer_greeting = None;
self.network_read_buffer.clear();
if self.security_mechanism.name() != NullMechanism::NAME {
self.security_mechanism = Box::new(NullMechanism);
}
self.framer = Box::new(NullFramer::new());
}
}