ntex_mqtt/v5/
handshake.rs

1use ntex_io::IoBoxed;
2use std::{fmt, num::NonZeroU16, rc::Rc};
3
4use super::{codec, shared::MqttShared, sink::MqttSink};
5
6/// Handshake message
7pub struct Handshake {
8    io: IoBoxed,
9    pkt: Box<codec::Connect>,
10    size: u32,
11    pub(super) shared: Rc<MqttShared>,
12}
13
14impl Handshake {
15    pub(crate) fn new(
16        pkt: Box<codec::Connect>,
17        size: u32,
18        io: IoBoxed,
19        shared: Rc<MqttShared>,
20    ) -> Self {
21        Self { io, pkt, size, shared }
22    }
23
24    #[inline]
25    pub fn packet(&self) -> &codec::Connect {
26        &self.pkt
27    }
28
29    #[inline]
30    pub fn packet_mut(&mut self) -> &mut codec::Connect {
31        &mut self.pkt
32    }
33
34    #[inline]
35    pub fn packet_size(&self) -> u32 {
36        self.size
37    }
38
39    #[inline]
40    pub fn io(&self) -> &IoBoxed {
41        &self.io
42    }
43
44    #[inline]
45    /// Returns mqtt server sink
46    pub fn sink(&self) -> MqttSink {
47        MqttSink::new(self.shared.clone())
48    }
49
50    #[inline]
51    /// Ack handshake message and set state
52    pub fn ack<St>(self, st: St) -> HandshakeAck<St> {
53        let max_pkt_size = self.shared.codec.max_inbound_size();
54        let receive_max = self.shared.receive_max();
55        let packet = codec::ConnectAck {
56            reason_code: codec::ConnectAckReason::Success,
57            max_qos: self.shared.max_qos(),
58            topic_alias_max: self.shared.topic_alias_max(),
59            receive_max: NonZeroU16::new(receive_max).unwrap_or(crate::v5::RECEIVE_MAX_DEFAULT),
60            max_packet_size: if max_pkt_size == 0 { None } else { Some(max_pkt_size) },
61            ..codec::ConnectAck::default()
62        };
63
64        let Handshake { io, shared, pkt, .. } = self;
65        // [MQTT-3.1.2-22]
66        let keepalive = if pkt.keep_alive != 0 {
67            (pkt.keep_alive >> 1).checked_add(pkt.keep_alive).unwrap_or(u16::MAX)
68        } else {
69            30
70        };
71        HandshakeAck { io, shared, keepalive, packet, session: Some(st) }
72    }
73
74    #[inline]
75    /// Create handshake ack object with error
76    pub fn failed<St>(self, reason_code: codec::ConnectAckReason) -> HandshakeAck<St> {
77        HandshakeAck {
78            io: self.io,
79            shared: self.shared,
80            session: None,
81            keepalive: 30,
82            packet: codec::ConnectAck { reason_code, ..codec::ConnectAck::default() },
83        }
84    }
85
86    #[inline]
87    /// Create handshake ack object with provided ConnectAck packet
88    pub fn fail_with<St>(self, ack: codec::ConnectAck) -> HandshakeAck<St> {
89        HandshakeAck {
90            io: self.io,
91            shared: self.shared,
92            session: None,
93            packet: ack,
94            keepalive: 30,
95        }
96    }
97}
98
99impl fmt::Debug for Handshake {
100    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
101        self.pkt.fmt(f)
102    }
103}
104
105/// Handshake ack message
106pub struct HandshakeAck<St> {
107    pub(crate) io: IoBoxed,
108    pub(crate) session: Option<St>,
109    pub(crate) shared: Rc<MqttShared>,
110    pub(crate) packet: codec::ConnectAck,
111    pub(crate) keepalive: u16,
112}
113
114impl<St> HandshakeAck<St> {
115    #[inline]
116    /// Set idle keep-alive for the connection in seconds.
117    /// This method sets `server_keepalive_sec` property for `ConnectAck`
118    /// response packet.
119    ///
120    /// By default idle keep-alive is set to 30 seconds. Panics if timeout is `0`.
121    pub fn keep_alive(mut self, timeout: u16) -> Self {
122        if timeout == 0 {
123            panic!("Timeout must be greater than 0")
124        }
125        self.keepalive = timeout;
126        self
127    }
128
129    /// Access to ConnectAck packet
130    #[inline]
131    pub fn with(mut self, f: impl FnOnce(&mut codec::ConnectAck)) -> Self {
132        f(&mut self.packet);
133        self
134    }
135}