1use std::fmt;
2use std::ops::Deref;
3use std::time::Duration;
4
5use actix_ioframe as ioframe;
6use mqtt_codec as mqtt;
7
8use crate::sink::MqttSink;
9
10pub struct Connect<Io> {
12 connect: mqtt::Connect,
13 sink: MqttSink,
14 keep_alive: Duration,
15 inflight: usize,
16 io: ioframe::ConnectResult<Io, (), mqtt::Codec>,
17}
18
19impl<Io> Connect<Io> {
20 pub(crate) fn new(
21 connect: mqtt::Connect,
22 io: ioframe::ConnectResult<Io, (), mqtt::Codec>,
23 sink: MqttSink,
24 inflight: usize,
25 ) -> Self {
26 Self {
27 keep_alive: Duration::from_secs(connect.keep_alive as u64),
28 connect,
29 io,
30 sink,
31 inflight,
32 }
33 }
34
35 pub fn get_ref(&self) -> &Io {
37 self.io.get_ref()
38 }
39
40 pub fn get_mut(&mut self) -> &mut Io {
42 self.io.get_mut()
43 }
44
45 pub fn sink(&self) -> &MqttSink {
47 &self.sink
48 }
49
50 pub fn ack<St>(self, st: St, session_present: bool) -> ConnectAck<Io, St> {
52 ConnectAck::new(self.io, st, session_present, self.keep_alive, self.inflight)
53 }
54
55 pub fn identifier_rejected<St>(self) -> ConnectAck<Io, St> {
57 ConnectAck {
58 io: self.io,
59 session: None,
60 session_present: false,
61 return_code: mqtt::ConnectCode::IdentifierRejected,
62 keep_alive: Duration::from_secs(5),
63 inflight: 15,
64 }
65 }
66
67 pub fn bad_username_or_pwd<St>(self) -> ConnectAck<Io, St> {
69 ConnectAck {
70 io: self.io,
71 session: None,
72 session_present: false,
73 return_code: mqtt::ConnectCode::BadUserNameOrPassword,
74 keep_alive: Duration::from_secs(5),
75 inflight: 15,
76 }
77 }
78
79 pub fn not_authorized<St>(self) -> ConnectAck<Io, St> {
81 ConnectAck {
82 io: self.io,
83 session: None,
84 session_present: false,
85 return_code: mqtt::ConnectCode::NotAuthorized,
86 keep_alive: Duration::from_secs(5),
87 inflight: 15,
88 }
89 }
90}
91
92impl<Io> Deref for Connect<Io> {
93 type Target = mqtt::Connect;
94
95 fn deref(&self) -> &Self::Target {
96 &self.connect
97 }
98}
99
100impl<T> fmt::Debug for Connect<T> {
101 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
102 self.connect.fmt(f)
103 }
104}
105
106pub struct ConnectAck<Io, St> {
108 pub(crate) io: ioframe::ConnectResult<Io, (), mqtt::Codec>,
109 pub(crate) session: Option<St>,
110 pub(crate) session_present: bool,
111 pub(crate) return_code: mqtt::ConnectCode,
112 pub(crate) keep_alive: Duration,
113 pub(crate) inflight: usize,
114}
115
116impl<Io, St> ConnectAck<Io, St> {
117 pub(crate) fn new(
119 io: ioframe::ConnectResult<Io, (), mqtt::Codec>,
120 session: St,
121 session_present: bool,
122 keep_alive: Duration,
123 inflight: usize,
124 ) -> Self {
125 Self {
126 io,
127 session_present,
128 keep_alive,
129 inflight,
130 session: Some(session),
131 return_code: mqtt::ConnectCode::ConnectionAccepted,
132 }
133 }
134
135 pub fn idle_timeout(mut self, timeout: Duration) -> Self {
139 self.keep_alive = timeout;
140 self
141 }
142
143 pub fn in_flight(mut self, in_flight: usize) -> Self {
147 self.inflight = in_flight;
148 self
149 }
150}