1use log::{error, info, trace};
2use mio::net::TcpStream;
3
4use std::{io, mem, net, thread, time};
5
6use crate::v5::{self, Config};
7use crate::{Blob, ClientID, PacketID, Packetize, QPacket, QoS, QueueStatus, SLEEP_10MS};
8use crate::{Error, ErrorKind, ReasonCode, Result};
9
10pub type QueuePkt = QueueStatus<QPacket>;
11
12#[derive(Clone, Eq, PartialEq)]
14pub struct Protocol {
15 client_id: ClientID,
16 shard_id: u32,
17 raddr: net::SocketAddr,
18 config: Config,
19 connect: v5::Connect,
20}
21
22impl From<Config> for Protocol {
23 fn from(config: Config) -> Protocol {
24 Protocol {
25 client_id: ClientID::default(),
26 shard_id: u32::default(),
27 raddr: "0.0.0.0:0".parse().unwrap(),
28 config,
29 connect: v5::Connect::default(),
30 }
31 }
32}
33
34impl Protocol {
35 pub fn is_listen(&self) -> bool {
36 self.config.mqtt_listener
37 }
38
39 pub fn to_listen_address(&self) -> net::SocketAddr {
40 let port = self.config.mqtt_port;
41 net::SocketAddr::new(net::IpAddr::V4(net::Ipv4Addr::new(0, 0, 0, 0)), port)
42 }
43
44 pub fn to_listen_port(&self) -> u16 {
45 self.config.mqtt_port
46 }
47
48 #[inline]
49 pub fn maximum_qos(&self) -> QoS {
50 QoS::try_from(self.config.mqtt_maximum_qos).unwrap()
51 }
52
53 #[inline]
54 pub fn retain_available(&self) -> bool {
55 self.config.mqtt_retain_available
56 }
57
58 #[inline]
59 pub fn max_packet_size(&self) -> u32 {
60 self.config.mqtt_max_packet_size
61 }
62
63 #[inline]
64 pub fn keep_alive(&self) -> Option<u16> {
65 self.config.keep_alive()
66 }
67
68 #[inline]
69 pub fn keep_alive_factor(&self) -> f32 {
70 self.config.keep_alive_factor()
71 }
72
73 #[inline]
74 pub fn topic_alias_max(&self) -> Option<u16> {
75 self.config.topic_alias_max()
76 }
77}
78
79impl Protocol {
80 pub fn handshake(&self, prefix: &str, mut conn: TcpStream) -> Result<Socket> {
81 use crate::v5::MQTTRead;
82
83 let (raddr, laddr) = (conn.peer_addr().unwrap(), conn.local_addr().unwrap());
84 info!("{} raddr:{} laddr:{} new connection", prefix, raddr, laddr);
85
86 let deadline = {
87 let timeout = u64::from(self.config.mqtt_connect_timeout / 2);
88 time::Instant::now() + time::Duration::from_secs(timeout)
89 };
90
91 let mut packetr = MQTTRead::new(self.config.mqtt_max_packet_size);
92 loop {
93 packetr = match packetr.read(&mut conn) {
94 Ok((packetr, _would_block)) => packetr,
95 Err(err) if err.kind() == ErrorKind::MalformedPacket => {
96 error!("{}, fail read, err:{}", prefix, err);
97 self.send_connack(prefix, err.code(), conn)?;
98 break Err(err);
99 }
100 Err(err) if err.kind() == ErrorKind::ProtocolError => {
101 error!("{}, fail read, err:{}", prefix, err);
102 self.send_connack(prefix, err.code(), conn)?;
103 break Err(err);
104 }
105 Err(err) => unreachable!("unexpected error {}", err),
106 };
107
108 match &packetr {
109 MQTTRead::Init { .. } if time::Instant::now() < deadline => {
110 thread::sleep(SLEEP_10MS);
111 continue;
112 }
113 MQTTRead::Header { .. } if time::Instant::now() < deadline => {
114 thread::sleep(SLEEP_10MS);
115 continue;
116 }
117 MQTTRead::Remain { .. } if time::Instant::now() < deadline => {
118 thread::sleep(SLEEP_10MS);
119 continue;
120 }
121 MQTTRead::Init { .. }
122 | MQTTRead::Header { .. }
123 | MQTTRead::Remain { .. } => {
124 let code = ReasonCode::UnspecifiedError;
125 self.send_connack(prefix, code, conn)?;
126
127 break err!(
128 InvalidInput,
129 code: UnspecifiedError,
130 "{} deadline:{:?} fail handshake connect rx",
131 prefix,
132 deadline
133 );
134 }
135 MQTTRead::Fin { .. } => (),
136 MQTTRead::None => unreachable!(),
137 }
138
139 match packetr.parse() {
140 Ok(v5::Packet::Connect(connect)) => {
141 if let Err(err) = connect.validate() {
142 error!("{}, invalid connect-packet err:{}", prefix, err);
143 self.send_connack(prefix, err.code(), conn)?;
144 break Err(err);
145 } else {
146 break self.new_socket(conn, connect);
147 }
148 }
149 Ok(pkt) => {
150 let code = ReasonCode::ProtocolError;
151 self.send_connack(prefix, code, conn)?;
152
153 break err!(
154 ProtocolError,
155 code: ProtocolError,
156 "{} packet:{} unexpected in connection",
157 prefix,
158 pkt
159 );
160 }
161 Err(err) => {
162 error!("{}, invalid packet parse err:{}", prefix, err);
163 self.send_connack(prefix, err.code(), conn)?;
164 break Err(err);
165 }
166 }
167 }
168 }
169
170 fn send_connack(&self, pr: &str, rc: ReasonCode, mut conn: TcpStream) -> Result<()> {
171 use crate::v5::ConnAckReasonCode;
172
173 let raddr = conn.peer_addr().unwrap();
174 let max_size = self.config.mqtt_max_packet_size;
175
176 let deadline = {
177 let timeout = u64::from(self.config.mqtt_connect_timeout / 2);
178 time::Instant::now() + time::Duration::from_secs(timeout)
179 };
180
181 let mut packetw = {
182 let code = ConnAckReasonCode::try_from(rc).unwrap();
183 let cack = v5::ConnAck::from_reason_code(code);
184 v5::MQTTWrite::new(cack.encode().unwrap().as_ref(), max_size)
185 };
186
187 loop {
188 let (val, would_block) = match packetw.write(&mut conn) {
189 Ok((packetw, would_block)) => (packetw, would_block),
190 Err(err) => {
191 error!("{} problem writing connack packet err:{}", pr, err);
192 break Err(err);
193 }
194 };
195 packetw = val;
196
197 if would_block && time::Instant::now() < deadline {
198 thread::sleep(SLEEP_10MS);
199 } else if would_block {
200 break err!(
201 Disconnected,
202 desc: "{} deadline:{:?} failed handshake connack tx",
203 pr, deadline
204 );
205 } else {
206 info!("{} raddr:{} CONNACK", pr, raddr);
207 break Ok(());
208 }
209 }
210 }
211
212 fn new_socket(&self, conn: mio::net::TcpStream, cpkt: v5::Connect) -> Result<Socket> {
213 let socket = Socket {
214 client_id: ClientID::from(&cpkt),
215 shard_id: 0,
216 config: self.config.clone(),
217 conn,
218 connect: cpkt,
219 token: mio::Token(0),
220 rd: Source::default(),
221 wt: Sink::default(),
222 };
223
224 Ok(socket)
225 }
226}
227
228impl Protocol {
229 pub fn new_ping_resp(&self, _ping_req: v5::Packet) -> QPacket {
230 QPacket::V5(v5::Packet::PingResp)
231 }
232
233 pub fn new_pub_ack(&self, packet_id: PacketID) -> QPacket {
234 QPacket::V5(v5::Packet::PubAck(v5::Pub::new_pub_ack(packet_id)))
235 }
236
237 pub fn new_sub_ack(&self, sub: &v5::Packet, rcodes: Vec<ReasonCode>) -> QPacket {
238 match sub {
239 v5::Packet::Subscribe(sub) => {
240 let suback = v5::SubAck::from_sub(sub, rcodes);
241 QPacket::V5(v5::Packet::SubAck(suback))
242 }
243 pkt => unreachable!("{}", pkt),
244 }
245 }
246
247 pub fn new_unsub_ack(&self, unsub: &v5::Packet, rcodes: Vec<ReasonCode>) -> QPacket {
248 match unsub {
249 v5::Packet::UnSubscribe(unsub) => {
250 let unsuback = v5::UnsubAck::from_unsub(&unsub, rcodes);
251 QPacket::V5(v5::Packet::UnsubAck(unsuback))
252 }
253 pkt => unreachable!("{}", pkt),
254 }
255 }
256}
257
258pub struct Socket {
260 client_id: ClientID,
261 shard_id: u32,
262 config: Config,
263 conn: mio::net::TcpStream,
264 connect: v5::Connect,
265 token: mio::Token,
266 rd: Source,
267 wt: Sink,
268}
269
270#[derive(Default)]
271struct Source {
272 pr: v5::MQTTRead,
273 timeout: time::Duration,
274 deadline: Option<time::SystemTime>,
275}
276
277#[derive(Default)]
278struct Sink {
279 pw: v5::MQTTWrite,
280 timeout: time::Duration,
281 deadline: Option<time::SystemTime>,
282}
283
284impl mio::event::Source for Socket {
285 fn register(
286 &mut self,
287 registry: &mio::Registry,
288 token: mio::Token,
289 interests: mio::Interest,
290 ) -> io::Result<()> {
291 self.conn.register(registry, token, interests)
292 }
293
294 fn reregister(
295 &mut self,
296 registry: &mio::Registry,
297 token: mio::Token,
298 interests: mio::Interest,
299 ) -> io::Result<()> {
300 self.conn.reregister(registry, token, interests)
301 }
302
303 fn deregister(&mut self, registry: &mio::Registry) -> io::Result<()> {
304 self.conn.deregister(registry)
305 }
306}
307
308impl Socket {
309 #[inline]
310 pub fn set_mio_token(&mut self, token: mio::Token) {
311 self.token = token;
312 }
313
314 #[inline]
315 pub fn set_shard_id(&mut self, shard_id: u32) {
316 self.shard_id = shard_id;
317 }
318}
319
320impl Socket {
321 #[inline]
322 pub fn peer_addr(&self) -> net::SocketAddr {
323 self.conn.peer_addr().unwrap()
324 }
325
326 #[inline]
327 pub fn as_client_id(&self) -> &ClientID {
328 &self.client_id
329 }
330
331 #[inline]
332 pub fn to_mio_token(&self) -> mio::Token {
333 self.token
334 }
335
336 #[inline]
337 pub fn to_protocol(&self) -> Protocol {
338 Protocol {
339 client_id: self.client_id.clone(),
340 shard_id: self.shard_id,
341 raddr: self.conn.peer_addr().unwrap(),
342 config: self.config.clone(),
343 connect: self.connect.clone(),
344 }
345 }
346
347 #[inline]
348 pub fn client_keep_alive(&self) -> u16 {
349 self.connect.keep_alive
350 }
351
352 #[inline]
353 pub fn client_receive_maximum(&self) -> u16 {
354 self.connect.receive_maximum()
355 }
356
357 #[inline]
358 pub fn client_session_expiry_interval(&self) -> Option<u32> {
359 self.connect.session_expiry_interval()
360 }
361
362 #[inline]
363 pub fn is_clean_start(&self) -> bool {
364 self.connect.flags.is_clean_start()
365 }
366}
367
368impl Socket {
369 pub fn read_packet(&mut self, prefix: &str) -> Result<QueuePkt> {
373 use crate::v5::MQTTRead::{Fin, Header, Init, Remain};
374
375 let disconnected = QueueStatus::<QPacket>::Disconnected(Vec::new());
376
377 let pr = mem::replace(&mut self.rd.pr, v5::MQTTRead::default());
378 let mut pr = match pr.read(&mut self.conn) {
379 Ok((pr, _would_block)) => pr,
380 Err(err) if err.kind() == ErrorKind::Disconnected => return Ok(disconnected),
381 Err(err) => return Err(err),
382 };
383
384 let status = match &pr {
385 Init { .. } | Header { .. } | Remain { .. } if !self.read_elapsed() => {
386 trace!("{} read retrying", prefix);
387 self.set_read_timeout(true, self.config.mqtt_sock_read_timeout);
388 QueueStatus::Block(Vec::new())
389 }
390 Init { .. } | Header { .. } | Remain { .. } => {
391 error!("{} rd_timeout:{:?} disconnecting", prefix, self.rd.timeout);
392 self.set_read_timeout(false, self.config.mqtt_sock_read_timeout);
393 QueueStatus::Disconnected(Vec::new())
394 }
395 Fin { .. } => {
396 self.set_read_timeout(false, self.config.mqtt_sock_read_timeout);
397 let pkt = pr.parse()?;
398 pr = pr.reset();
399 QueueStatus::Ok(vec![pkt.into()])
400 }
401 v5::MQTTRead::None => unreachable!(),
402 };
403
404 let _none = mem::replace(&mut self.rd.pr, pr);
405 Ok(status)
406 }
407}
408
409impl Socket {
410 pub fn write_packet(&mut self, prefix: &str, blob: Option<Blob>) -> QueuePkt {
412 use crate::v5::MQTTWrite::{Fin, Init, Remain};
413 use std::io::Write;
414
415 let mut pw = match (blob, &self.wt.pw) {
416 (Some(blob), Fin { .. }) => {
417 if let Err(err) = self.conn.flush() {
418 error!("{} fail conn.flush() err:{}", prefix, err);
419 return QueueStatus::Disconnected(Vec::new());
420 }
421
422 let pw = mem::replace(&mut self.wt.pw, v5::MQTTWrite::default());
423 pw.reset(blob.as_ref())
424 }
425 (Some(_blob), _) => unreachable!(),
426 _ => mem::replace(&mut self.wt.pw, v5::MQTTWrite::default()),
427 };
428
429 let write_timeout = self.config.mqtt_sock_write_timeout;
430 let timeout = self.wt.timeout;
431
432 let (res, pw) = loop {
433 pw = match pw.write(&mut self.conn) {
434 Ok((pw, _would_block)) => match &pw {
435 Init { .. } | Remain { .. } if !self.write_elapsed() => {
436 trace!("{} write retrying", prefix);
437 self.set_write_timeout(true, write_timeout);
438 pw
439 }
441 Init { .. } | Remain { .. } => {
442 self.set_write_timeout(false, write_timeout);
443 error!("{} wt_timeout:{:?} disconnecting..", prefix, timeout);
444 break (QueueStatus::Disconnected(Vec::new()), pw);
445 }
446 Fin { .. } => {
447 self.set_write_timeout(false, write_timeout);
448 break (QueueStatus::Ok(Vec::new()), pw);
449 }
450 v5::MQTTWrite::None => unreachable!(),
451 },
452 Err(err) if err.kind() == ErrorKind::Disconnected => {
453 let val = v5::MQTTWrite::default();
454 break (QueueStatus::Disconnected(Vec::new()), val);
455 }
456 Err(err) => unreachable!("unexpected error: {}", err),
457 }
458 };
459
460 let _none = mem::replace(&mut self.wt.pw, pw);
461 res
462 }
463
464 pub fn disconnect(&mut self, prefix: &str, code: ReasonCode) {
465 let blob = {
466 let disconn = v5::Disconnect { code, properties: None };
467 disconn.encode().ok()
468 };
469 self.write_packet(prefix, blob);
470 }
471
472 pub fn new_conn_ack(&self, rcode: ReasonCode) -> QPacket {
473 let val = self.connect.session_expiry_interval();
474 let sei = match (self.config.mqtt_session_expiry_interval, val) {
475 (Some(_one), Some(two)) => Some(two),
476 (Some(one), None) => Some(one),
477 (None, Some(two)) => Some(two),
478 (None, None) => None,
479 };
480
481 let mut props = v5::ConnAckProperties {
482 session_expiry_interval: sei,
483 receive_maximum: Some(self.config.mqtt_receive_maximum),
484 maximum_qos: Some(self.config.mqtt_maximum_qos.try_into().unwrap()),
485 retain_available: Some(self.config.mqtt_retain_available),
486 max_packet_size: Some(self.config.mqtt_max_packet_size),
487 assigned_client_identifier: None,
488 wildcard_subscription_available: Some(true),
489 subscription_identifiers_available: Some(true),
490 shared_subscription_available: None,
491 topic_alias_max: self.config.topic_alias_max(),
492 ..v5::ConnAckProperties::default()
493 };
494
495 if self.connect.payload.client_id.len() == 0 {
496 props.assigned_client_identifier = Some((*self.client_id).clone());
497 }
498
499 if let Some(keep_alive) = self.config.keep_alive() {
500 props.server_keep_alive = Some(keep_alive)
501 }
502
503 let connack = match rcode {
504 ReasonCode::Success => v5::ConnAck::new_success(Some(props)),
505 _ => unreachable!(),
506 };
507
508 QPacket::V5(v5::Packet::ConnAck(connack))
509 }
510}
511
512impl Socket {
513 fn read_elapsed(&self) -> bool {
514 let now = time::SystemTime::now();
515 match &self.rd.deadline {
516 Some(deadline) if &now > deadline => true,
517 Some(_) | None => false,
518 }
519 }
520
521 fn set_read_timeout(&mut self, retry: bool, timeout: Option<u32>) {
522 if let Some(timeout) = timeout {
523 if retry && self.rd.deadline.is_none() {
524 let now = time::SystemTime::now();
525 self.rd.deadline = Some(now + time::Duration::from_secs(timeout as u64));
526 } else if retry == false {
527 self.rd.deadline = None;
528 }
529 }
530 }
531
532 fn write_elapsed(&self) -> bool {
533 let now = time::SystemTime::now();
534 match &self.wt.deadline {
535 Some(deadline) if &now > deadline => true,
536 Some(_) | None => false,
537 }
538 }
539
540 fn set_write_timeout(&mut self, retry: bool, timeout: Option<u32>) {
541 if let Some(timeout) = timeout {
542 if retry && self.wt.deadline.is_none() {
543 let now = time::SystemTime::now();
544 self.wt.deadline = Some(now + time::Duration::from_secs(timeout as u64));
545 } else if retry == false {
546 self.wt.deadline = None;
547 }
548 }
549 }
550}