ntex-mqtt 8.0.0

Client and Server framework for MQTT v5 and v3.1.1 protocols
Documentation
use ntex_io::IoBoxed;
use std::{fmt, num::NonZeroU16, rc::Rc};

use super::{codec, shared::MqttShared, sink::MqttSink};

/// Handshake message
///
/// Bundles the `Connect` packet with the connection io object and the shared
/// state. It is consumed by `ack()`, `failed()` or `fail_with()`, each of
/// which produces a `HandshakeAck`.
pub struct Handshake {
    // Connection io object; borrowed out through `io()` and handed over to
    // the `HandshakeAck` when the handshake is answered.
    io: IoBoxed,
    // `Connect` packet; exposed through `packet()` / `packet_mut()` and used
    // by `ack()` to derive the keep-alive value.
    pkt: Box<codec::Connect>,
    // Value returned by `packet_size()`.
    // NOTE(review): presumably the encoded size of the `Connect` packet in
    // bytes — confirm against the caller of `new()`.
    size: u32,
    // Shared state; supplies the limits advertised by `ack()` and backs the
    // sink returned by `sink()`.
    pub(super) shared: Rc<MqttShared>,
}

impl Handshake {
    /// Assembles a handshake from its parts: the `Connect` packet, the
    /// packet size reported by the caller, the connection io object and the
    /// shared state.
    pub(crate) fn new(
        pkt: Box<codec::Connect>,
        size: u32,
        io: IoBoxed,
        shared: Rc<MqttShared>,
    ) -> Self {
        Handshake { io, pkt, size, shared }
    }

    /// Returns a shared reference to the `Connect` packet
    #[inline]
    pub fn packet(&self) -> &codec::Connect {
        &*self.pkt
    }

    /// Returns a mutable reference to the `Connect` packet
    #[inline]
    pub fn packet_mut(&mut self) -> &mut codec::Connect {
        &mut *self.pkt
    }

    /// Returns the packet size that was passed to the constructor
    #[inline]
    pub fn packet_size(&self) -> u32 {
        self.size
    }

    /// Returns the io object of the connection
    #[inline]
    pub fn io(&self) -> &IoBoxed {
        &self.io
    }

    /// Returns mqtt server sink
    ///
    /// The sink is created over a new handle to the shared state.
    #[inline]
    pub fn sink(&self) -> MqttSink {
        MqttSink::new(Rc::clone(&self.shared))
    }

    /// Ack handshake message and set state
    ///
    /// The resulting `ConnectAck` reports `Success` and advertises the
    /// limits read from the shared state (max QoS, topic alias maximum,
    /// receive maximum and maximum packet size). All remaining properties
    /// keep their default values. `st` becomes the session state of the
    /// returned ack.
    #[inline]
    pub fn ack<St>(self, st: St) -> HandshakeAck<St> {
        let Handshake { io, shared, pkt, .. } = self;

        let inbound_limit = shared.codec.max_inbound_size();
        let receive_max = shared.receive_max();
        let packet = codec::ConnectAck {
            reason_code: codec::ConnectAckReason::Success,
            max_qos: shared.max_qos(),
            topic_alias_max: shared.topic_alias_max(),
            // A zero receive maximum falls back to the crate default.
            receive_max: NonZeroU16::new(receive_max).unwrap_or(crate::v5::RECEIVE_MAX_DEFAULT),
            // A zero inbound limit is not advertised at all.
            max_packet_size: (inbound_limit != 0).then_some(inbound_limit),
            ..codec::ConnectAck::default()
        };

        // [MQTT-3.1.2-22]
        // One and a half times the requested keep-alive, saturating at
        // `u16::MAX`; a zero keep-alive in the packet yields 30.
        let keepalive = match pkt.keep_alive {
            0 => 30,
            secs => secs.saturating_add(secs >> 1),
        };

        HandshakeAck { io, shared, keepalive, packet, session: Some(st), max_send: None }
    }

    /// Create handshake ack object with error
    ///
    /// Only `reason_code` is set on the `ConnectAck`; every other property
    /// keeps its default value. The returned ack carries no session state.
    #[inline]
    pub fn failed<St>(self, reason_code: codec::ConnectAckReason) -> HandshakeAck<St> {
        self.fail_with(codec::ConnectAck { reason_code, ..codec::ConnectAck::default() })
    }

    /// Create handshake ack object with provided `ConnectAck` packet
    ///
    /// The returned ack carries no session state, uses a keep-alive of 30
    /// and has no outgoing message limit set.
    #[inline]
    pub fn fail_with<St>(self, ack: codec::ConnectAck) -> HandshakeAck<St> {
        let Handshake { io, shared, .. } = self;
        HandshakeAck { io, shared, packet: ack, session: None, keepalive: 30, max_send: None }
    }
}

impl fmt::Debug for Handshake {
    /// Formats the handshake by delegating to the `Debug` output of the
    /// stored `Connect` packet; the other fields are not printed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Debug::fmt(&self.pkt, f)
    }
}

/// Handshake ack message
///
/// Built by `Handshake::ack` (carries session state) or by
/// `Handshake::failed` / `Handshake::fail_with` (no session state).
pub struct HandshakeAck<St> {
    // Connection io object taken over from the `Handshake`.
    pub(crate) io: IoBoxed,
    // Session state: `Some` when created by `Handshake::ack`, `None` when
    // created by `Handshake::failed` or `Handshake::fail_with`.
    pub(crate) session: Option<St>,
    // Shared state taken over from the `Handshake`.
    pub(crate) shared: Rc<MqttShared>,
    // `ConnectAck` packet; can be adjusted through `with()`.
    pub(crate) packet: codec::ConnectAck,
    // Idle keep-alive in seconds; replaced by `keep_alive()`.
    pub(crate) keepalive: u16,
    // Number of outgoing concurrent messages; `None` until set through
    // `max_send()`, which also maps `Some(0)` to `None`.
    pub(crate) max_send: Option<u16>,
}

impl<St> fmt::Debug for HandshakeAck<St> {
    /// Prints the `packet`, `keepalive` and `max_send` fields.
    ///
    /// The io object, the shared state and the session are left out, so
    /// `St` does not have to implement `Debug`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("HandshakeAck");
        out.field("packet", &self.packet);
        out.field("keepalive", &self.keepalive);
        out.field("max_send", &self.max_send);
        out.finish()
    }
}

impl<St> HandshakeAck<St> {
    /// Set idle keep-alive for the connection in seconds.
    ///
    /// The value is stored on the ack and is meant to be reported to the
    /// client as the `server_keepalive_sec` property of the `ConnectAck`
    /// response packet.
    ///
    /// By default idle keep-alive is set to 30 seconds.
    ///
    /// # Panics
    ///
    /// Panics if timeout is `0`.
    #[inline]
    #[must_use]
    pub fn keep_alive(mut self, timeout: u16) -> Self {
        if timeout == 0 {
            panic!("Timeout must be greater than 0");
        }
        self.keepalive = timeout;
        self
    }

    /// Number of outgoing concurrent messages.
    ///
    /// `Some(0)` is treated the same as `None`, i.e. no explicit limit is
    /// stored on the ack.
    ///
    /// By default outgoing is set to 16 messages
    #[must_use]
    pub fn max_send(mut self, val: Option<u16>) -> Self {
        self.max_send = val.filter(|&limit| limit != 0);
        self
    }

    /// Access to `ConnectAck` packet
    ///
    /// Runs `f` once with a mutable reference to the packet and returns the
    /// ack, so calls can be chained.
    #[inline]
    #[must_use]
    pub fn with(mut self, f: impl FnOnce(&mut codec::ConnectAck)) -> Self {
        let packet = &mut self.packet;
        f(packet);
        self
    }
}

#[cfg(test)]
mod tests {
    use std::rc::Rc;

    use ntex_io::{Io, IoBoxed, testing::IoTest};
    use ntex_service::cfg::SharedCfg;

    use super::*;
    use crate::v5::shared::MqttShared;

    /// Smoke test for the `Debug` output of `Handshake` and `HandshakeAck`.
    #[ntex::test]
    async fn test_debug() {
        let io = Io::new(IoTest::create().0, SharedCfg::new("test"));
        let v5_codec = codec::Codec::new();
        let state = Rc::new(MqttShared::new(io.get_ref(), v5_codec, Rc::default()));
        let connect_pkt = Box::new(codec::Connect::default());
        let handshake = Handshake::new(connect_pkt, 0, IoBoxed::from(io), state);

        // `Handshake` prints whatever the `Connect` packet prints, so only
        // check that something was produced.
        let rendered = format!("{:?}", handshake);
        assert!(!rendered.is_empty());

        // `HandshakeAck` prints the struct name and its public settings.
        let acked = handshake.ack(42_u32);
        let rendered = format!("{:?}", acked);
        for needle in ["HandshakeAck", "keepalive"] {
            assert!(rendered.contains(needle));
        }
    }
}