ntex_mqtt/v5/
handshake.rs1use ntex_io::IoBoxed;
2use std::{fmt, num::NonZeroU16, rc::Rc};
3
4use super::{codec, shared::MqttShared, sink::MqttSink};
5
6pub struct Handshake {
8 io: IoBoxed,
9 pkt: Box<codec::Connect>,
10 size: u32,
11 pub(super) shared: Rc<MqttShared>,
12}
13
14impl Handshake {
15 pub(crate) fn new(
16 pkt: Box<codec::Connect>,
17 size: u32,
18 io: IoBoxed,
19 shared: Rc<MqttShared>,
20 ) -> Self {
21 Self { io, pkt, size, shared }
22 }
23
24 #[inline]
25 pub fn packet(&self) -> &codec::Connect {
26 &self.pkt
27 }
28
29 #[inline]
30 pub fn packet_mut(&mut self) -> &mut codec::Connect {
31 &mut self.pkt
32 }
33
34 #[inline]
35 pub fn packet_size(&self) -> u32 {
36 self.size
37 }
38
39 #[inline]
40 pub fn io(&self) -> &IoBoxed {
41 &self.io
42 }
43
44 #[inline]
45 pub fn sink(&self) -> MqttSink {
47 MqttSink::new(self.shared.clone())
48 }
49
50 #[inline]
51 pub fn ack<St>(self, st: St) -> HandshakeAck<St> {
53 let max_pkt_size = self.shared.codec.max_inbound_size();
54 let receive_max = self.shared.receive_max();
55 let packet = codec::ConnectAck {
56 reason_code: codec::ConnectAckReason::Success,
57 max_qos: self.shared.max_qos(),
58 topic_alias_max: self.shared.topic_alias_max(),
59 receive_max: NonZeroU16::new(receive_max).unwrap_or(crate::v5::RECEIVE_MAX_DEFAULT),
60 max_packet_size: if max_pkt_size == 0 {
61 None
62 } else {
63 Some(max_pkt_size)
64 },
65 ..codec::ConnectAck::default()
66 };
67
68 let Handshake { io, shared, pkt, .. } = self;
69 let keepalive = if pkt.keep_alive != 0 {
71 (pkt.keep_alive >> 1).saturating_add(pkt.keep_alive)
72 } else {
73 30
74 };
75 HandshakeAck { io, shared, keepalive, packet, session: Some(st) }
76 }
77
78 #[inline]
79 pub fn failed<St>(self, reason_code: codec::ConnectAckReason) -> HandshakeAck<St> {
81 HandshakeAck {
82 io: self.io,
83 shared: self.shared,
84 session: None,
85 keepalive: 30,
86 packet: codec::ConnectAck { reason_code, ..codec::ConnectAck::default() },
87 }
88 }
89
90 #[inline]
91 pub fn fail_with<St>(self, ack: codec::ConnectAck) -> HandshakeAck<St> {
93 HandshakeAck {
94 io: self.io,
95 shared: self.shared,
96 session: None,
97 packet: ack,
98 keepalive: 30,
99 }
100 }
101}
102
103impl fmt::Debug for Handshake {
104 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
105 self.pkt.fmt(f)
106 }
107}
108
109pub struct HandshakeAck<St> {
111 pub(crate) io: IoBoxed,
112 pub(crate) session: Option<St>,
113 pub(crate) shared: Rc<MqttShared>,
114 pub(crate) packet: codec::ConnectAck,
115 pub(crate) keepalive: u16,
116}
117
118impl<St> HandshakeAck<St> {
119 #[inline]
120 pub fn keep_alive(mut self, timeout: u16) -> Self {
126 if timeout == 0 {
127 panic!("Timeout must be greater than 0")
128 }
129 self.keepalive = timeout;
130 self
131 }
132
133 #[inline]
135 pub fn with(mut self, f: impl FnOnce(&mut codec::ConnectAck)) -> Self {
136 f(&mut self.packet);
137 self
138 }
139}