ntex_mqtt/v5/
handshake.rs1use ntex_io::IoBoxed;
2use std::{fmt, num::NonZeroU16, rc::Rc};
3
4use super::{codec, shared::MqttShared, sink::MqttSink};
5
/// State of an in-progress connection handshake.
///
/// Bundles the received `Connect` packet with the connection io object and
/// the shared state, and is consumed by [`Handshake::ack`],
/// [`Handshake::failed`] or [`Handshake::fail_with`] to produce a
/// [`HandshakeAck`].
pub struct Handshake {
    /// Io object of the connection; exposed through `io()`.
    io: IoBoxed,
    /// The `Connect` packet this handshake was created with.
    pkt: Box<codec::Connect>,
    /// Size value reported by `packet_size()`.
    // NOTE(review): presumably the encoded size of `pkt` in bytes — confirm against the caller of `new`.
    size: u32,
    /// Shared connection state; used to build sinks and to read the limits
    /// advertised in the `ConnectAck`.
    pub(super) shared: Rc<MqttShared>,
}
13
14impl Handshake {
15 pub(crate) fn new(
16 pkt: Box<codec::Connect>,
17 size: u32,
18 io: IoBoxed,
19 shared: Rc<MqttShared>,
20 ) -> Self {
21 Self { io, pkt, size, shared }
22 }
23
24 #[inline]
25 pub fn packet(&self) -> &codec::Connect {
26 &self.pkt
27 }
28
29 #[inline]
30 pub fn packet_mut(&mut self) -> &mut codec::Connect {
31 &mut self.pkt
32 }
33
34 #[inline]
35 pub fn packet_size(&self) -> u32 {
36 self.size
37 }
38
39 #[inline]
40 pub fn io(&self) -> &IoBoxed {
41 &self.io
42 }
43
44 #[inline]
45 pub fn sink(&self) -> MqttSink {
47 MqttSink::new(self.shared.clone())
48 }
49
50 #[inline]
51 pub fn ack<St>(self, st: St) -> HandshakeAck<St> {
53 let max_pkt_size = self.shared.codec.max_inbound_size();
54 let receive_max = self.shared.receive_max();
55 let packet = codec::ConnectAck {
56 reason_code: codec::ConnectAckReason::Success,
57 max_qos: self.shared.max_qos(),
58 topic_alias_max: self.shared.topic_alias_max(),
59 receive_max: NonZeroU16::new(receive_max).unwrap_or(crate::v5::RECEIVE_MAX_DEFAULT),
60 max_packet_size: if max_pkt_size == 0 { None } else { Some(max_pkt_size) },
61 ..codec::ConnectAck::default()
62 };
63
64 let Handshake { io, shared, pkt, .. } = self;
65 let keepalive = if pkt.keep_alive != 0 {
67 (pkt.keep_alive >> 1).checked_add(pkt.keep_alive).unwrap_or(u16::MAX)
68 } else {
69 30
70 };
71 HandshakeAck { io, shared, keepalive, packet, session: Some(st) }
72 }
73
74 #[inline]
75 pub fn failed<St>(self, reason_code: codec::ConnectAckReason) -> HandshakeAck<St> {
77 HandshakeAck {
78 io: self.io,
79 shared: self.shared,
80 session: None,
81 keepalive: 30,
82 packet: codec::ConnectAck { reason_code, ..codec::ConnectAck::default() },
83 }
84 }
85
86 #[inline]
87 pub fn fail_with<St>(self, ack: codec::ConnectAck) -> HandshakeAck<St> {
89 HandshakeAck {
90 io: self.io,
91 shared: self.shared,
92 session: None,
93 packet: ack,
94 keepalive: 30,
95 }
96 }
97}
98
99impl fmt::Debug for Handshake {
100 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
101 self.pkt.fmt(f)
102 }
103}
104
/// Outcome of a handshake: the `ConnectAck` packet to reply with, plus the
/// connection resources carried over from the `Handshake`.
///
/// `St` is the type of the session state attached on a successful handshake.
pub struct HandshakeAck<St> {
    /// Io object of the connection.
    pub(crate) io: IoBoxed,
    /// Session state; `Some` when built by `Handshake::ack`, `None` when
    /// built by `Handshake::failed` / `Handshake::fail_with`.
    pub(crate) session: Option<St>,
    /// Shared connection state.
    pub(crate) shared: Rc<MqttShared>,
    /// The `ConnectAck` packet for this handshake result.
    pub(crate) packet: codec::ConnectAck,
    /// Keep-alive value for the connection; never set to `0` by the methods
    /// in this file.
    // NOTE(review): presumably expressed in seconds — confirm against the consumer of this field.
    pub(crate) keepalive: u16,
}
113
114impl<St> HandshakeAck<St> {
115 #[inline]
116 pub fn keep_alive(mut self, timeout: u16) -> Self {
122 if timeout == 0 {
123 panic!("Timeout must be greater than 0")
124 }
125 self.keepalive = timeout;
126 self
127 }
128
129 #[inline]
131 pub fn with(mut self, f: impl FnOnce(&mut codec::ConnectAck)) -> Self {
132 f(&mut self.packet);
133 self
134 }
135}