#![allow(dead_code, unused_variables)]
use super::ZmtpProtocolHandlerX;
use crate::transport::ZmtpStdStream;
use crate::error::ZmqError;
use crate::message::Msg;
use crate::protocol::zmtp::command::ZmtpCommand;
use std::time::{Duration, Instant};
/// State kept while a connection is going through the ZMTP handshake.
#[derive(Debug)]
pub(crate) struct ZmtpHandshakeStateX {
    /// Handshake sub-phase; [`ZmtpHandshakeStateX::new`] starts it at
    /// `GreetingExchange`.
    pub sub_phase: super::HandshakeSubPhaseX,
    /// `None` on construction. Presumably filled in with the socket type the
    /// peer announces during the handshake (TODO confirm against the code
    /// that assigns it).
    pub(crate) peer_socket_type: Option<String>,
}

impl ZmtpHandshakeStateX {
    /// Builds the initial handshake state: sub-phase `GreetingExchange` and
    /// no peer socket type recorded.
    pub(crate) fn new() -> Self {
        let sub_phase = super::HandshakeSubPhaseX::GreetingExchange;
        let peer_socket_type = None;
        Self {
            sub_phase,
            peer_socket_type,
        }
    }
}
/// Bookkeeping for the PING/PONG heartbeat of a single connection.
///
/// The state only records timestamps and flags; it performs no I/O itself.
/// Callers feed it events ([`record_activity`](Self::record_activity),
/// [`ping_sent`](Self::ping_sent), [`pong_received`](Self::pong_received)) and
/// query it with an explicit `now` so the decisions are deterministic.
#[derive(Debug)]
pub(crate) struct ZmtpHeartbeatStateX {
    /// Instant from which idleness is measured. Refreshed by
    /// `record_activity` and by `ping_sent`.
    pub last_activity_time: Instant,
    /// When the outstanding PING was sent; `None` when no PING is outstanding.
    pub last_ping_sent_time: Option<Instant>,
    /// `true` between `ping_sent` and the next `pong_received`.
    pub waiting_for_pong: bool,
    /// Idle interval after which a PING is due. `None` means no PING is ever
    /// requested by `should_send_ping`.
    pub ivl: Option<Duration>,
    /// How long an outstanding PING may go unanswered before
    /// `has_pong_timed_out` reports a timeout.
    pub timeout: Duration,
}

impl ZmtpHeartbeatStateX {
    /// Creates a state with no PING outstanding and the activity clock
    /// started at the moment of construction.
    pub(crate) fn new(ivl: Option<Duration>, timeout: Duration) -> Self {
        Self {
            last_activity_time: Instant::now(),
            last_ping_sent_time: None,
            waiting_for_pong: false,
            ivl,
            timeout,
        }
    }

    /// Restarts the idle clock at the current instant.
    pub(crate) fn record_activity(&mut self) {
        self.last_activity_time = Instant::now();
    }

    /// Records that a PING has just been sent and a PONG is now expected.
    pub(crate) fn ping_sent(&mut self) {
        // Read the clock once so the ping timestamp and the activity
        // timestamp are exactly equal rather than a few nanoseconds apart.
        let now = Instant::now();
        self.last_ping_sent_time = Some(now);
        self.waiting_for_pong = true;
        self.last_activity_time = now;
    }

    /// Clears the outstanding-PING state. Does not touch the idle clock.
    pub(crate) fn pong_received(&mut self) {
        self.waiting_for_pong = false;
        self.last_ping_sent_time = None;
    }

    /// Returns `true` when a PING is due at `now`: heartbeating is enabled
    /// (`ivl` is `Some`), no PING is outstanding, and at least `ivl` has
    /// elapsed since the last recorded activity.
    ///
    /// A `now` that lies before the last activity counts as zero elapsed time.
    pub(crate) fn should_send_ping(&self, now: Instant) -> bool {
        if self.waiting_for_pong {
            return false;
        }
        match self.ivl {
            // Saturating: `duration_since` is documented as allowed to panic
            // again in future Rust versions when `now` is the earlier instant.
            Some(interval) => now.saturating_duration_since(self.last_activity_time) >= interval,
            None => false,
        }
    }

    /// Returns `true` when a PING is outstanding and at least `timeout` has
    /// elapsed between its send time and `now`.
    ///
    /// A `now` that lies before the send time counts as zero elapsed time.
    pub(crate) fn has_pong_timed_out(&self, now: Instant) -> bool {
        if !self.waiting_for_pong {
            return false;
        }
        match self.last_ping_sent_time {
            Some(ping_sent_at) => now.saturating_duration_since(ping_sent_at) >= self.timeout,
            None => false,
        }
    }

    /// Returns the instant at which the outstanding PING times out, or `None`
    /// when no PING is outstanding.
    ///
    /// Also returns `None` when `send time + timeout` cannot be represented
    /// as an `Instant` (for example `timeout == Duration::MAX`); such a
    /// deadline can never be reached, and plain `+` would panic.
    pub(crate) fn get_pong_deadline(&self) -> Option<Instant> {
        if !self.waiting_for_pong {
            return None;
        }
        self.last_ping_sent_time
            .and_then(|sent_at| sent_at.checked_add(self.timeout))
    }
}
pub(crate) fn process_heartbeat_command_impl<S: ZmtpStdStream>(
handler: &mut ZmtpProtocolHandlerX<S>,
cmd_msg: &Msg,
) -> Result<Option<Msg>, ZmqError> {
tracing::trace!(
sca_handle = handler.actor_handle,
"Processing incoming command frame in data phase for heartbeat."
);
match ZmtpCommand::parse(cmd_msg) {
Some(ZmtpCommand::Ping(ping_context_payload)) => {
tracing::debug!(
sca_handle = handler.actor_handle,
ping_payload_len = ping_context_payload.len(),
"Received PING, preparing PONG."
);
let pong_reply_msg = ZmtpCommand::create_pong(&ping_context_payload);
Ok(Some(pong_reply_msg))
}
Some(ZmtpCommand::Pong(_pong_context_payload)) => {
tracing::debug!(sca_handle = handler.actor_handle, "Received PONG.");
handler.heartbeat_state.pong_received();
Ok(None)
}
Some(ZmtpCommand::Error) => Err(ZmqError::ProtocolViolation("Received ZMTP ERROR from peer".into())),
Some(other) => {
Ok(None) }
None => Err(ZmqError::ProtocolViolation(
"Unparseable command received in data phase".into(),
)),
}
}
/// Sends a PING when the heartbeat state reports that one is due, and does
/// nothing otherwise.
///
/// The heartbeat state is marked as "PING sent" only after the write has
/// succeeded.
///
/// # Errors
/// Propagates any error returned by `write_data_msg_impl`; in that case the
/// heartbeat state is left unchanged.
pub(crate) async fn try_send_ping_impl<S: ZmtpStdStream>(
    handler: &mut ZmtpProtocolHandlerX<S>,
) -> Result<(), ZmqError> {
    let now = Instant::now();
    if !handler.heartbeat_state.should_send_ping(now) {
        return Ok(());
    }

    tracing::debug!(
        sca_handle = handler.actor_handle,
        "Heartbeat: Sending PING due to inactivity."
    );

    // NOTE(review): the arguments look like a TTL of 0 and an empty context;
    // confirm against `ZmtpCommand::create_ping`.
    let ping = ZmtpCommand::create_ping(0, b"");
    let _last_part_flag = super::data_io::write_data_msg_impl(handler, ping, true).await?;
    handler.heartbeat_state.ping_sent();
    Ok(())
}
pub(crate) fn check_pong_timeout_impl(heartbeat_state: &ZmtpHeartbeatStateX, now: Instant) -> bool {
heartbeat_state.has_pong_timed_out(now)
}