ntex_mqtt/v5/
handshake.rs

1use ntex_io::IoBoxed;
2use std::{fmt, num::NonZeroU16, rc::Rc};
3
4use super::{codec, shared::MqttShared, sink::MqttSink};
5
6/// Handshake message
7pub struct Handshake {
8    io: IoBoxed,
9    pkt: Box<codec::Connect>,
10    size: u32,
11    pub(super) shared: Rc<MqttShared>,
12}
13
14impl Handshake {
15    pub(crate) fn new(
16        pkt: Box<codec::Connect>,
17        size: u32,
18        io: IoBoxed,
19        shared: Rc<MqttShared>,
20    ) -> Self {
21        Self { io, pkt, size, shared }
22    }
23
24    #[inline]
25    pub fn packet(&self) -> &codec::Connect {
26        &self.pkt
27    }
28
29    #[inline]
30    pub fn packet_mut(&mut self) -> &mut codec::Connect {
31        &mut self.pkt
32    }
33
34    #[inline]
35    pub fn packet_size(&self) -> u32 {
36        self.size
37    }
38
39    #[inline]
40    pub fn io(&self) -> &IoBoxed {
41        &self.io
42    }
43
44    #[inline]
45    /// Returns mqtt server sink
46    pub fn sink(&self) -> MqttSink {
47        MqttSink::new(self.shared.clone())
48    }
49
50    #[inline]
51    /// Ack handshake message and set state
52    pub fn ack<St>(self, st: St) -> HandshakeAck<St> {
53        let max_pkt_size = self.shared.codec.max_inbound_size();
54        let receive_max = self.shared.receive_max();
55        let packet = codec::ConnectAck {
56            reason_code: codec::ConnectAckReason::Success,
57            max_qos: self.shared.max_qos(),
58            topic_alias_max: self.shared.topic_alias_max(),
59            receive_max: NonZeroU16::new(receive_max).unwrap_or(crate::v5::RECEIVE_MAX_DEFAULT),
60            max_packet_size: if max_pkt_size == 0 {
61                None
62            } else {
63                Some(max_pkt_size)
64            },
65            ..codec::ConnectAck::default()
66        };
67
68        let Handshake { io, shared, pkt, .. } = self;
69        // [MQTT-3.1.2-22]
70        let keepalive = if pkt.keep_alive != 0 {
71            (pkt.keep_alive >> 1).saturating_add(pkt.keep_alive)
72        } else {
73            30
74        };
75        HandshakeAck { io, shared, keepalive, packet, session: Some(st) }
76    }
77
78    #[inline]
79    /// Create handshake ack object with error
80    pub fn failed<St>(self, reason_code: codec::ConnectAckReason) -> HandshakeAck<St> {
81        HandshakeAck {
82            io: self.io,
83            shared: self.shared,
84            session: None,
85            keepalive: 30,
86            packet: codec::ConnectAck { reason_code, ..codec::ConnectAck::default() },
87        }
88    }
89
90    #[inline]
91    /// Create handshake ack object with provided ConnectAck packet
92    pub fn fail_with<St>(self, ack: codec::ConnectAck) -> HandshakeAck<St> {
93        HandshakeAck {
94            io: self.io,
95            shared: self.shared,
96            session: None,
97            packet: ack,
98            keepalive: 30,
99        }
100    }
101}
102
103impl fmt::Debug for Handshake {
104    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
105        self.pkt.fmt(f)
106    }
107}
108
109/// Handshake ack message
110pub struct HandshakeAck<St> {
111    pub(crate) io: IoBoxed,
112    pub(crate) session: Option<St>,
113    pub(crate) shared: Rc<MqttShared>,
114    pub(crate) packet: codec::ConnectAck,
115    pub(crate) keepalive: u16,
116}
117
118impl<St> HandshakeAck<St> {
119    #[inline]
120    /// Set idle keep-alive for the connection in seconds.
121    /// This method sets `server_keepalive_sec` property for `ConnectAck`
122    /// response packet.
123    ///
124    /// By default idle keep-alive is set to 30 seconds. Panics if timeout is `0`.
125    pub fn keep_alive(mut self, timeout: u16) -> Self {
126        if timeout == 0 {
127            panic!("Timeout must be greater than 0")
128        }
129        self.keepalive = timeout;
130        self
131    }
132
133    /// Access to ConnectAck packet
134    #[inline]
135    pub fn with(mut self, f: impl FnOnce(&mut codec::ConnectAck)) -> Self {
136        f(&mut self.packet);
137        self
138    }
139}