// actix_mqtt/connect.rs

use std::fmt;
use std::ops::Deref;
use std::time::Duration;

use actix_ioframe as ioframe;
use mqtt_codec as mqtt;

use crate::sink::MqttSink;
9
10/// Connect message
11pub struct Connect<Io> {
12    connect: mqtt::Connect,
13    sink: MqttSink,
14    keep_alive: Duration,
15    inflight: usize,
16    io: ioframe::ConnectResult<Io, (), mqtt::Codec>,
17}
18
19impl<Io> Connect<Io> {
20    pub(crate) fn new(
21        connect: mqtt::Connect,
22        io: ioframe::ConnectResult<Io, (), mqtt::Codec>,
23        sink: MqttSink,
24        inflight: usize,
25    ) -> Self {
26        Self {
27            keep_alive: Duration::from_secs(connect.keep_alive as u64),
28            connect,
29            io,
30            sink,
31            inflight,
32        }
33    }
34
35    /// Returns reference to io object
36    pub fn get_ref(&self) -> &Io {
37        self.io.get_ref()
38    }
39
40    /// Returns mutable reference to io object
41    pub fn get_mut(&mut self) -> &mut Io {
42        self.io.get_mut()
43    }
44
45    /// Returns mqtt server sink
46    pub fn sink(&self) -> &MqttSink {
47        &self.sink
48    }
49
50    /// Ack connect message and set state
51    pub fn ack<St>(self, st: St, session_present: bool) -> ConnectAck<Io, St> {
52        ConnectAck::new(self.io, st, session_present, self.keep_alive, self.inflight)
53    }
54
55    /// Create connect ack object with `identifier rejected` return code
56    pub fn identifier_rejected<St>(self) -> ConnectAck<Io, St> {
57        ConnectAck {
58            io: self.io,
59            session: None,
60            session_present: false,
61            return_code: mqtt::ConnectCode::IdentifierRejected,
62            keep_alive: Duration::from_secs(5),
63            inflight: 15,
64        }
65    }
66
67    /// Create connect ack object with `bad user name or password` return code
68    pub fn bad_username_or_pwd<St>(self) -> ConnectAck<Io, St> {
69        ConnectAck {
70            io: self.io,
71            session: None,
72            session_present: false,
73            return_code: mqtt::ConnectCode::BadUserNameOrPassword,
74            keep_alive: Duration::from_secs(5),
75            inflight: 15,
76        }
77    }
78
79    /// Create connect ack object with `not authorized` return code
80    pub fn not_authorized<St>(self) -> ConnectAck<Io, St> {
81        ConnectAck {
82            io: self.io,
83            session: None,
84            session_present: false,
85            return_code: mqtt::ConnectCode::NotAuthorized,
86            keep_alive: Duration::from_secs(5),
87            inflight: 15,
88        }
89    }
90}
91
92impl<Io> Deref for Connect<Io> {
93    type Target = mqtt::Connect;
94
95    fn deref(&self) -> &Self::Target {
96        &self.connect
97    }
98}
99
100impl<T> fmt::Debug for Connect<T> {
101    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
102        self.connect.fmt(f)
103    }
104}
105
106/// Ack connect message
107pub struct ConnectAck<Io, St> {
108    pub(crate) io: ioframe::ConnectResult<Io, (), mqtt::Codec>,
109    pub(crate) session: Option<St>,
110    pub(crate) session_present: bool,
111    pub(crate) return_code: mqtt::ConnectCode,
112    pub(crate) keep_alive: Duration,
113    pub(crate) inflight: usize,
114}
115
116impl<Io, St> ConnectAck<Io, St> {
117    /// Create connect ack, `session_present` indicates that previous session is presents
118    pub(crate) fn new(
119        io: ioframe::ConnectResult<Io, (), mqtt::Codec>,
120        session: St,
121        session_present: bool,
122        keep_alive: Duration,
123        inflight: usize,
124    ) -> Self {
125        Self {
126            io,
127            session_present,
128            keep_alive,
129            inflight,
130            session: Some(session),
131            return_code: mqtt::ConnectCode::ConnectionAccepted,
132        }
133    }
134
135    /// Set idle time-out for the connection in milliseconds
136    ///
137    /// By default idle time-out is set to 300000 milliseconds
138    pub fn idle_timeout(mut self, timeout: Duration) -> Self {
139        self.keep_alive = timeout;
140        self
141    }
142
143    /// Set in-flight count. Total number of `in-flight` packets
144    ///
145    /// By default in-flight count is set to 15
146    pub fn in_flight(mut self, in_flight: usize) -> Self {
147        self.inflight = in_flight;
148        self
149    }
150}