#[cfg(any(feature = "fuzzy", test))]
use arbitrary::Arbitrary;
use log::{error, trace, warn};

#[cfg(unix)]
use std::os::unix::io::{FromRawFd, IntoRawFd};
#[cfg(windows)]
use std::os::windows::io::{FromRawSocket, IntoRawSocket};

use std::{collections::VecDeque, fmt, io, mem, net, result, time};

use crate::{v5, ClientID, PacketID, Packetize, QoS};
use crate::{Error, ErrorKind, ReasonCode, Result};
16
/// Default value of [`ClientBuilder::max_packet_size`] (`1024 * 1024`).
pub const CLIENT_MAX_PACKET_SIZE: u32 = 1024 * 1024;
18
19#[derive(Clone, Copy)]
21pub struct ConnectOptions {
22 pub will_qos: Option<QoS>,
23 pub will_retain: Option<bool>,
24 pub keep_alive: u16,
25}
26
27impl Default for ConnectOptions {
28 fn default() -> ConnectOptions {
29 ConnectOptions { will_qos: None, will_retain: None, keep_alive: 0 }
30 }
31}
32
/// Builder collecting socket and MQTT settings before a [`Client`] is created
/// with [`ClientBuilder::connect`] or [`ClientBuilder::connect_noblock`].
pub struct ClientBuilder {
    /// Protocol version placed in the CONNECT packet.
    pub protocol_version: v5::MqttProtocol,
    /// Client identifier stored on the resulting `Client`; when `None`, a
    /// UUID-v4 identifier is generated while building the client.
    pub client_id: Option<ClientID>,
    /// Timeout for establishing the TCP connection in `connect`; when `None`
    /// a plain `TcpStream::connect` is used. Not consulted by `connect_noblock`.
    pub connect_timeout: Option<time::Duration>,
    /// Read timeout applied to the socket by `connect`, and used as the
    /// per-packet read deadline.
    pub read_timeout: Option<time::Duration>,
    /// Write timeout applied to the socket by `connect`, and used as the
    /// per-packet write deadline.
    pub write_timeout: Option<time::Duration>,
    /// When set, passed to `TcpStream::set_nodelay` on the new socket.
    pub nodelay: Option<bool>,
    /// When set, passed to `TcpStream::set_ttl` on the new socket.
    pub ttl: Option<u32>,
    /// Client-side packet size limit handed to the packet reader and writer;
    /// defaults to [`CLIENT_MAX_PACKET_SIZE`].
    pub max_packet_size: u32,
    /// Will/keep-alive options used when composing the CONNECT packet.
    pub connopts: ConnectOptions,
    /// Properties copied into the CONNECT packet.
    pub connect_properties: Option<v5::ConnectProperties>,
    /// Payload copied into the CONNECT packet; its username, password and will
    /// fields decide which CONNECT flags are set.
    pub connect_payload: v5::ConnectPayload,
}
60
61impl Default for ClientBuilder {
62 fn default() -> ClientBuilder {
63 ClientBuilder {
64 client_id: Some(ClientID::new_uuid_v4()),
65 connect_timeout: None,
66 read_timeout: None,
67 write_timeout: None,
68 nodelay: None,
69 ttl: None,
70 max_packet_size: CLIENT_MAX_PACKET_SIZE,
71 connopts: ConnectOptions::default(),
73 connect_properties: Some(v5::ConnectProperties::default()),
74 connect_payload: v5::ConnectPayload::default(),
75
76 protocol_version: v5::MqttProtocol::V5,
77 }
78 }
79}
80
81impl ClientBuilder {
82 pub fn connect(self, raddr: net::SocketAddr) -> io::Result<Client> {
87 let sock = match self.connect_timeout {
88 Some(timeout) => net::TcpStream::connect_timeout(&raddr, timeout)?,
89 None => net::TcpStream::connect(&raddr)?,
90 };
91 sock.set_read_timeout(self.read_timeout)?;
92 sock.set_write_timeout(self.write_timeout)?;
93 if let Some(nodelay) = self.nodelay {
94 sock.set_nodelay(nodelay)?
95 }
96 if let Some(ttl) = self.ttl {
97 sock.set_ttl(ttl)?
98 }
99
100 let mut client = self.into_client(raddr);
101
102 let (cio, connack) = {
103 let connect = client.to_connect(true );
104 let blocking = true;
105 ClientIO::handshake(&mut client, connect, sock, blocking)?
106 };
107
108 client.cio = cio;
109 client.next_packet_ids = (1..connack.receive_maximum()).collect();
110 client.connack = connack;
111
112 Ok(client)
113 }
114
115 pub fn connect_noblock(self, raddr: net::SocketAddr) -> io::Result<Client> {
122 let sock = net::TcpStream::connect(raddr)?;
123 if let Some(nodelay) = self.nodelay {
124 sock.set_nodelay(nodelay)?
125 }
126 if let Some(ttl) = self.ttl {
127 sock.set_ttl(ttl)?
128 }
129
130 let mut client = self.into_client(raddr);
131
132 let (cio, connack) = {
133 let connect = client.to_connect(true );
134 let blocking = false;
135 ClientIO::handshake(&mut client, connect, sock, blocking)?
136 };
137
138 client.cio = cio;
139 client.next_packet_ids = (1..connack.receive_maximum()).collect();
140 client.connack = connack;
141
142 Ok(client)
143 }
144
145 fn into_client(self, raddr: net::SocketAddr) -> Client {
146 Client {
147 client_id: self.client_id.unwrap_or_else(|| ClientID::new_uuid_v4()),
148 raddr,
149 protocol_version: self.protocol_version,
150 connect_timeout: self.connect_timeout,
151 read_timeout: self.read_timeout,
152 write_timeout: self.write_timeout,
153 nodelay: self.nodelay,
154 ttl: self.ttl,
155 max_packet_size: self.max_packet_size,
156 connopts: self.connopts,
157 connect_properties: self.connect_properties.clone(),
158 connect_payload: self.connect_payload.clone(),
159 connack: v5::ConnAck::default(),
161
162 last_sent: time::Instant::now(),
163 last_rcvd: time::Instant::now(),
164 rd_deadline: None,
165 wt_deadline: None,
166 next_packet_ids: VecDeque::default(),
167
168 in_packets: VecDeque::default(),
169 cio: ClientIO::None,
170 }
171 }
172}
173
/// MQTT v5 client: connection settings, the CONNACK received from the remote,
/// bookkeeping for timeouts and packet ids, and the socket state.
pub struct Client {
    /// Identifier of this client, used in log messages.
    client_id: ClientID,
    /// Remote address; reused by `reconnect`.
    raddr: net::SocketAddr,
    /// Protocol version placed in the CONNECT packet.
    protocol_version: v5::MqttProtocol,
    /// Timeout for establishing the TCP connection on `reconnect`.
    connect_timeout: Option<time::Duration>,
    /// Socket read timeout, also the per-packet read deadline.
    read_timeout: Option<time::Duration>,
    /// Socket write timeout, also the per-packet write deadline.
    write_timeout: Option<time::Duration>,
    /// Optional `TCP_NODELAY` setting applied on (re)connect.
    nodelay: Option<bool>,
    /// Optional TTL setting applied on (re)connect.
    ttl: Option<u32>,
    /// Client-side packet size limit.
    max_packet_size: u32,
    /// Will/keep-alive options for CONNECT.
    connopts: ConnectOptions,
    /// Properties copied into CONNECT.
    connect_properties: Option<v5::ConnectProperties>,
    /// Payload copied into CONNECT.
    connect_payload: v5::ConnectPayload,
    /// CONNACK returned by the most recent successful handshake.
    connack: v5::ConnAck,

    /// Instant at which the last packet was read from the socket.
    last_rcvd: time::Instant,
    /// Instant at which the last write was attempted.
    last_sent: time::Instant,
    /// Deadline for the read in progress, if a read timeout is configured.
    rd_deadline: Option<time::Instant>,
    /// Deadline for the write in progress, if a write timeout is configured.
    wt_deadline: Option<time::Instant>,
    /// Pool of packet identifiers available for publish.
    next_packet_ids: VecDeque<PacketID>,

    /// Packets read from the socket but not yet handed to the caller.
    in_packets: VecDeque<v5::Packet>,
    /// Socket and packet reader/writer state; `ClientIO::None` when detached.
    cio: ClientIO,
}
201
202impl Drop for Client {
203 fn drop(&mut self) {
204 match &self.cio {
205 ClientIO::None => (),
206 _ => {
207 let disconnect = {
208 let code = DisconnReasonCode::NormalDisconnect as u8;
209 v5::Disconnect {
210 code: ReasonCode::try_from(code).unwrap(),
211 properties: None,
212 }
213 };
214 if let Err(err) = self.write(v5::Packet::Disconnect(disconnect)) {
215 error!(
216 "client_id:{:?} raddr:{:?} drop error:{}",
217 self.client_id, self.raddr, err
218 )
219 }
220 }
221 }
222 }
223}
224
225impl Client {
227 pub fn split_rw(mut self) -> io::Result<(Client, Client)> {
231 let cio = mem::replace(&mut self.cio, ClientIO::None);
232
233 let (rd_cio, wt_cio) = cio.split_sock()?;
234 let reader = Client {
235 client_id: self.client_id.clone(),
236 raddr: self.raddr,
237 protocol_version: self.protocol_version,
238 connect_timeout: self.connect_timeout,
239 read_timeout: self.read_timeout,
240 write_timeout: self.write_timeout,
241 nodelay: self.nodelay,
242 ttl: self.ttl,
243 max_packet_size: self.max_packet_size,
244 connopts: self.connopts,
246 connect_properties: self.connect_properties.clone(),
247 connect_payload: self.connect_payload.clone(),
248 connack: self.connack.clone(),
250
251 last_rcvd: self.last_sent,
252 last_sent: self.last_rcvd,
253 rd_deadline: None,
254 wt_deadline: None,
255 next_packet_ids: VecDeque::default(),
256
257 in_packets: VecDeque::default(),
258 cio: rd_cio,
259 };
260
261 let _none = mem::replace(&mut self.cio, wt_cio);
262 Ok((reader, self))
263 }
264
265 pub fn reconnect(mut self) -> io::Result<Self> {
269 let sock = match self.connect_timeout {
270 Some(timeout) => net::TcpStream::connect_timeout(&self.raddr, timeout)?,
271 None => net::TcpStream::connect(&self.raddr)?,
272 };
273 sock.set_read_timeout(self.read_timeout)?;
274 sock.set_write_timeout(self.write_timeout)?;
275 if let Some(nodelay) = self.nodelay {
276 sock.set_nodelay(nodelay)?
277 }
278 if let Some(ttl) = self.ttl {
279 sock.set_ttl(ttl)?
280 }
281
282 let (cio, connack) = {
283 let connect = self.to_connect(false );
284 let blocking = self.cio.is_blocking();
285 ClientIO::handshake(&mut self, connect, sock, blocking)?
286 };
287 self.cio = cio;
288 self.next_packet_ids = (1..connack.receive_maximum()).collect();
289 self.connack = connack;
290
291 Ok(self)
292 }
293
294 fn to_connect(&self, clean_start: bool) -> v5::Connect {
295 let mut flags = vec![];
296
297 if clean_start {
298 flags.push(v5::ConnectFlags::CLEAN_START)
299 }
300 if self.is_will() {
301 let will_qos = self.connopts.will_qos.unwrap_or(QoS::AtMostOnce);
302 flags.push(v5::ConnectFlags::WILL_FLAG);
303 flags.push(v5::ConnectFlags::from(will_qos));
304 if let Some(true) = self.connopts.will_retain {
305 flags.push(v5::ConnectFlags::WILL_RETAIN)
306 }
307 }
308 if let Some(_) = &self.connect_payload.username {
309 flags.push(v5::ConnectFlags::USERNAME)
310 }
311 if let Some(_) = &self.connect_payload.password {
312 flags.push(v5::ConnectFlags::PASSWORD)
313 }
314
315 let mut connect = v5::Connect {
316 protocol_name: "MQTT".to_string(),
317 protocol_version: self.protocol_version,
318 flags: v5::ConnectFlags::new(&flags),
319 keep_alive: self.connopts.keep_alive,
320 properties: self.connect_properties.clone(),
321 payload: self.connect_payload.clone(),
322 };
323 connect.normalize();
324
325 connect
326 }
327
328 fn is_will(&self) -> bool {
329 self.connect_payload.will_topic.is_some()
330 && self.connect_payload.will_properties.is_some()
331 && self.connect_payload.will_payload.is_some()
332 }
333}
334
/// Socket-level accessors; each one delegates to the current I/O state.
impl Client {
    /// Local address of the underlying socket.
    pub fn local_addr(&self) -> io::Result<net::SocketAddr> {
        self.cio.local_addr()
    }

    /// Remote address of the underlying socket.
    pub fn peer_addr(&self) -> io::Result<net::SocketAddr> {
        self.cio.peer_addr()
    }

    /// Current `TCP_NODELAY` setting of the underlying socket.
    pub fn nodelay(&self) -> io::Result<bool> {
        self.cio.nodelay()
    }

    /// Read timeout of the underlying socket; `Ok(None)` for non-blocking
    /// (mio) sockets.
    pub fn read_timeout(&self) -> io::Result<Option<time::Duration>> {
        self.cio.read_timeout()
    }

    /// Write timeout of the underlying socket; `Ok(None)` for non-blocking
    /// (mio) sockets.
    pub fn write_timeout(&self) -> io::Result<Option<time::Duration>> {
        self.cio.write_timeout()
    }

    /// Current TTL setting of the underlying socket.
    pub fn ttl(&self) -> io::Result<u32> {
        self.cio.ttl()
    }
}
367
368impl Client {
370 pub fn keep_alive(&self) -> u16 {
374 match &self.connack.properties {
375 Some(props) => match props.server_keep_alive {
376 Some(keep_alive) => keep_alive,
377 None => self.connopts.keep_alive,
378 },
379 None => self.connopts.keep_alive,
380 }
381 }
382
383 pub fn elapsed(&self) -> time::Duration {
385 self.last_rcvd.elapsed()
386 }
387
388 pub fn expired(&self) -> bool {
391 match self.keep_alive() {
392 0 => false,
393 keep_alive => {
394 let keep_alive = time::Duration::from_secs(keep_alive as u64).as_micros();
395 keep_alive < self.last_sent.elapsed().as_micros()
396 }
397 }
398 }
399
400 pub fn session_present(&self) -> bool {
403 self.connack.flags.unwrap().unwrap()
404 }
405
406 pub fn ping(&mut self) -> io::Result<()> {
408 self.write(v5::Packet::PingReq)?;
409
410 match self.read()? {
411 v5::Packet::PingResp => Ok(()),
412 pkt => {
413 let msg = format!("expected PingResp, got {:?}", pkt.to_packet_type());
414 Err(io::Error::new(io::ErrorKind::InvalidData, msg))
415 }
416 }?;
417
418 Ok(())
419 }
420}
421
422impl Client {
424 pub fn subscribe(&mut self, mut sub: v5::Subscribe) -> io::Result<v5::SubAck> {
449 sub.packet_id = self.acquire_packet_id(false ).ok().unwrap();
450 self.write(v5::Packet::Subscribe(sub.clone()))?;
451 self.cio_read_sub_ack(&sub)
452 }
453
454 pub fn disconnect(&mut self, code: DisconnReasonCode) -> io::Result<()> {
460 let code = match ReasonCode::try_from(code as u8) {
461 Ok(val) => Ok(val),
462 Err(err) => Err(io::Error::new(io::ErrorKind::InvalidInput, err.to_string())),
463 }?;
464 let disconnect = v5::Disconnect { code, properties: None };
465 self.write(v5::Packet::Disconnect(disconnect))
466 }
467
468 pub fn read(&mut self) -> io::Result<v5::Packet> {
470 match self.in_packets.pop_front() {
471 Some(packet) => Ok(packet),
472 None => {
473 self.cio_read()?;
474 Ok(self.in_packets.pop_front().unwrap())
475 }
476 }
477 }
478
479 pub fn read_noblock(&mut self) -> io::Result<v5::Packet> {
482 match self.in_packets.pop_front() {
483 Some(packet) => Ok(packet),
484 None => {
485 self.cio_read_noblock()?;
486 Ok(self.in_packets.pop_front().unwrap())
487 }
488 }
489 }
490
491 pub fn write(&mut self, packet: v5::Packet) -> io::Result<()> {
493 self.try_read()?;
494
495 let mut cio = mem::replace(&mut self.cio, ClientIO::None);
496 let res = cio.write(self, packet);
497 let _none = mem::replace(&mut self.cio, cio);
498
499 self.last_sent = time::Instant::now();
500 res
501 }
502
503 pub fn write_noblock(&mut self, packet: Option<v5::Packet>) -> io::Result<()> {
509 self.try_read()?;
510
511 let mut cio = mem::replace(&mut self.cio, ClientIO::None);
512 let res = cio.write_noblock(self, packet);
513 let _none = mem::replace(&mut self.cio, cio);
514
515 self.last_sent = time::Instant::now();
516 res
517 }
518
519 fn try_read(&mut self) -> io::Result<()> {
520 match self.read_noblock() {
521 Ok(packet) => {
522 self.in_packets.push_back(packet);
523 Ok(())
524 }
525 Err(err) if err.kind() == io::ErrorKind::WouldBlock => Ok(()),
526 Err(err) if err.kind() == io::ErrorKind::Interrupted => Ok(()),
527 Err(err) => Err(err),
528 }
529 }
530
531 fn cio_read(&mut self) -> io::Result<()> {
532 let mut cio = mem::replace(&mut self.cio, ClientIO::None);
533 let res = cio.read(self);
534 let _none = mem::replace(&mut self.cio, cio);
535
536 self.in_packets.push_back(res?);
537
538 self.last_rcvd = time::Instant::now();
539 Ok(())
540 }
541
542 fn cio_read_noblock(&mut self) -> io::Result<()> {
543 let mut cio = mem::replace(&mut self.cio, ClientIO::None);
544 let res = cio.read_noblock(self);
545 let _none = mem::replace(&mut self.cio, cio);
546
547 self.in_packets.push_back(res?);
548
549 self.last_rcvd = time::Instant::now();
550 Ok(())
551 }
552
553 fn cio_read_sub_ack(&mut self, sub: &v5::Subscribe) -> io::Result<v5::SubAck> {
554 loop {
555 self.cio_read()?;
556 match self.in_packets.pop_front() {
557 Some(v5::Packet::SubAck(sa)) if sa.packet_id == sub.packet_id => {
558 break Ok(sa);
559 }
560 Some(v5::Packet::SubAck(sa)) => warn!(
561 "sub-ack mismatch in packet_id {} != {}",
562 sa.packet_id, sub.packet_id
563 ),
564 Some(pkt) => self.in_packets.push_back(pkt),
565 None => unreachable!(),
566 }
567 }
568 }
569}
570
571impl Client {
572 pub fn as_mut_mio_tcpstream(&mut self) -> &mut mio::net::TcpStream {
576 use ClientIO::*;
577
578 match &mut self.cio {
579 Blocking { .. } | BlockRd { .. } | BlockWt { .. } => {
580 panic!("cannot use mio on standard socket")
581 }
582 NoBlock { sock, .. } | NoBlockRd { sock, .. } | NoBlockWt { sock, .. } => {
583 sock
584 }
585 ClientIO::None => unreachable!(),
586 }
587 }
588
589 fn acquire_packet_id(&mut self, publish: bool) -> Result<PacketID> {
590 if publish {
591 match self.next_packet_ids.pop_front() {
592 Some(packet_id) => Ok(packet_id),
593 None => err!(ProtocolError, code: ExceededReceiveMaximum, ""),
594 }
595 } else {
596 Ok(0)
597 }
598 }
599
600 fn release_packet_id(&mut self, packet_id: PacketID) {
601 self.next_packet_ids.push_back(packet_id);
602 }
603}
604
/// Socket plus packet reader/writer state owned by a [`Client`].
///
/// `Blocking`/`NoBlock` hold both directions; the `*Rd`/`*Wt` variants are the
/// halves produced by `split_sock`; `None` marks a detached client.
#[allow(dead_code)]
enum ClientIO {
    /// Standard-library socket with both reader and writer.
    Blocking {
        sock: net::TcpStream,
        pktr: v5::MQTTRead,
        pktw: v5::MQTTWrite,
    },
    /// `mio` socket with both reader and writer.
    NoBlock {
        sock: mio::net::TcpStream,
        pktr: v5::MQTTRead,
        pktw: v5::MQTTWrite,
    },
    /// Read half on a standard-library socket.
    BlockRd {
        sock: net::TcpStream,
        pktr: v5::MQTTRead,
    },
    /// Write half on a standard-library socket.
    BlockWt {
        sock: net::TcpStream,
        pktw: v5::MQTTWrite,
    },
    /// Read half on a `mio` socket.
    NoBlockRd {
        sock: mio::net::TcpStream,
        pktr: v5::MQTTRead,
    },
    /// Write half on a `mio` socket.
    NoBlockWt {
        sock: mio::net::TcpStream,
        pktw: v5::MQTTWrite,
    },
    /// No socket attached.
    None,
}
635
636impl fmt::Debug for ClientIO {
637 fn fmt(&self, f: &mut fmt::Formatter) -> result::Result<(), fmt::Error> {
638 match self {
639 ClientIO::Blocking { .. } => write!(f, "ClientIO::Blocking"),
640 ClientIO::NoBlock { .. } => write!(f, "ClientIO::NoBlock"),
641 ClientIO::BlockRd { .. } => write!(f, "ClientIO::BlockRd"),
642 ClientIO::BlockWt { .. } => write!(f, "ClientIO::BlockWt"),
643 ClientIO::NoBlockRd { .. } => write!(f, "ClientIO::NoBlockRd"),
644 ClientIO::NoBlockWt { .. } => write!(f, "ClientIO::NoBlockWt"),
645 ClientIO::None { .. } => write!(f, "ClientIO::None"),
646 }
647 }
648}
649
650impl ClientIO {
651 fn handshake(
652 client: &mut Client,
653 connect: v5::Connect,
654 mut sock: net::TcpStream,
655 blocking: bool,
656 ) -> io::Result<(ClientIO, v5::ConnAck)> {
657 let max_packet_size = connect.max_packet_size(client.max_packet_size);
658
659 let mut pktr = v5::MQTTRead::new(max_packet_size);
660 let mut pktw = v5::MQTTWrite::new(&[], max_packet_size);
661
662 write_packet(
663 client,
664 &mut sock,
665 &mut pktw,
666 Some(v5::Packet::Connect(connect)),
667 true, )?;
669
670 let (val, connack) = match read_packet(client, &mut sock, &mut pktr, true)? {
671 v5::Packet::ConnAck(connack) => (pktr, connack),
672 pkt => {
673 let msg = format!("unexpected in handshake {:?}", pkt.to_packet_type());
674 Err(io::Error::new(io::ErrorKind::InvalidData, msg))?
675 }
676 };
677 connack
678 .validate()
679 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?;
680
681 pktr = val;
682
683 let cio = match blocking {
684 true => ClientIO::Blocking { sock, pktr, pktw },
685 false => {
686 let sock = mio::net::TcpStream::from_std(sock);
687 ClientIO::NoBlock { sock, pktr, pktw }
688 }
689 };
690
691 Ok((cio, connack))
692 }
693
694 fn split_sock(self) -> io::Result<(ClientIO, ClientIO)> {
695 match self {
696 ClientIO::Blocking { sock: rd_sock, pktr, pktw } => {
697 let wt_sock = rd_sock.try_clone()?;
698 rd_sock.shutdown(net::Shutdown::Write)?;
699 wt_sock.shutdown(net::Shutdown::Read)?;
700 let rd = ClientIO::BlockRd { sock: rd_sock, pktr };
701 let wt = ClientIO::BlockWt { sock: wt_sock, pktw };
702 Ok((rd, wt))
703 }
704 ClientIO::NoBlock { sock, pktr, pktw } => {
705 #[cfg(windows)]
706 let rd_sock =
707 unsafe { net::TcpStream::from_raw_socket(sock.into_raw_socket()) };
708 #[cfg(unix)]
709 let rd_sock = unsafe { net::TcpStream::from_raw_fd(sock.into_raw_fd()) };
710
711 let wt_sock = rd_sock.try_clone()?;
712 rd_sock.shutdown(net::Shutdown::Write)?;
713 wt_sock.shutdown(net::Shutdown::Read)?;
714 let rd = ClientIO::NoBlockRd {
715 sock: mio::net::TcpStream::from_std(rd_sock),
716 pktr,
717 };
718 let wt = ClientIO::NoBlockWt {
719 sock: mio::net::TcpStream::from_std(wt_sock),
720 pktw,
721 };
722 Ok((rd, wt))
723 }
724 _ => unreachable!(),
725 }
726 }
727
728 fn local_addr(&self) -> io::Result<net::SocketAddr> {
729 match self {
730 ClientIO::Blocking { sock, .. } => sock.local_addr(),
731 ClientIO::NoBlock { sock, .. } => sock.local_addr(),
732 ClientIO::BlockRd { sock, .. } => sock.local_addr(),
733 ClientIO::BlockWt { sock, .. } => sock.local_addr(),
734 ClientIO::NoBlockRd { sock, .. } => sock.local_addr(),
735 ClientIO::NoBlockWt { sock, .. } => sock.local_addr(),
736 ClientIO::None => unreachable!(),
737 }
738 }
739
740 fn peer_addr(&self) -> io::Result<net::SocketAddr> {
741 match self {
742 ClientIO::Blocking { sock, .. } => sock.peer_addr(),
743 ClientIO::NoBlock { sock, .. } => sock.peer_addr(),
744 ClientIO::BlockRd { sock, .. } => sock.peer_addr(),
745 ClientIO::BlockWt { sock, .. } => sock.peer_addr(),
746 ClientIO::NoBlockRd { sock, .. } => sock.peer_addr(),
747 ClientIO::NoBlockWt { sock, .. } => sock.peer_addr(),
748 ClientIO::None => unreachable!(),
749 }
750 }
751
752 fn nodelay(&self) -> io::Result<bool> {
753 match self {
754 ClientIO::Blocking { sock, .. } => sock.nodelay(),
755 ClientIO::NoBlock { sock, .. } => sock.nodelay(),
756 ClientIO::BlockRd { sock, .. } => sock.nodelay(),
757 ClientIO::BlockWt { sock, .. } => sock.nodelay(),
758 ClientIO::NoBlockRd { sock, .. } => sock.nodelay(),
759 ClientIO::NoBlockWt { sock, .. } => sock.nodelay(),
760 ClientIO::None => unreachable!(),
761 }
762 }
763
764 fn read_timeout(&self) -> io::Result<Option<time::Duration>> {
765 match self {
766 ClientIO::Blocking { sock, .. } => sock.read_timeout(),
767 ClientIO::NoBlock { .. } => Ok(None),
768 ClientIO::BlockRd { sock, .. } => sock.read_timeout(),
769 ClientIO::BlockWt { sock, .. } => sock.read_timeout(),
770 ClientIO::NoBlockRd { .. } => Ok(None),
771 ClientIO::NoBlockWt { .. } => Ok(None),
772 ClientIO::None => unreachable!(),
773 }
774 }
775
776 fn write_timeout(&self) -> io::Result<Option<time::Duration>> {
777 match self {
778 ClientIO::Blocking { sock, .. } => sock.write_timeout(),
779 ClientIO::NoBlock { .. } => Ok(None),
780 ClientIO::BlockRd { sock, .. } => sock.write_timeout(),
781 ClientIO::BlockWt { sock, .. } => sock.write_timeout(),
782 ClientIO::NoBlockRd { .. } => Ok(None),
783 ClientIO::NoBlockWt { .. } => Ok(None),
784 ClientIO::None => unreachable!(),
785 }
786 }
787
788 fn ttl(&self) -> io::Result<u32> {
789 match self {
790 ClientIO::Blocking { sock, .. } => sock.ttl(),
791 ClientIO::NoBlock { sock, .. } => sock.ttl(),
792 ClientIO::BlockRd { sock, .. } => sock.ttl(),
793 ClientIO::BlockWt { sock, .. } => sock.ttl(),
794 ClientIO::NoBlockRd { sock, .. } => sock.ttl(),
795 ClientIO::NoBlockWt { sock, .. } => sock.ttl(),
796 ClientIO::None => unreachable!(),
797 }
798 }
799
800 fn is_blocking(&self) -> bool {
801 use ClientIO::*;
802
803 match self {
804 Blocking { .. } | BlockRd { .. } | BlockWt { .. } => true,
805 NoBlock { .. } | NoBlockRd { .. } | NoBlockWt { .. } => false,
806 None => unreachable!(),
807 }
808 }
809}
810
811impl ClientIO {
812 fn read(&mut self, client: &mut Client) -> io::Result<v5::Packet> {
813 match self {
814 ClientIO::Blocking { sock, pktr, .. } => {
815 read_packet(client, sock, pktr, true)
816 }
817 ClientIO::NoBlock { sock, pktr, .. } => {
818 read_packet(client, sock, pktr, false)
819 }
820 ClientIO::BlockRd { sock, pktr, .. } => {
821 read_packet(client, sock, pktr, true)
823 }
824 ClientIO::NoBlockRd { sock, pktr, .. } => {
825 read_packet(client, sock, pktr, false)
826 }
827 ClientIO::None => {
828 let s = format!("disconnected while Client::read");
829 Err(io::Error::new(io::ErrorKind::ConnectionReset, s))
830 }
831 _ => unreachable!(),
832 }
833 }
834
835 fn read_noblock(&mut self, client: &mut Client) -> io::Result<v5::Packet> {
836 match self {
837 ClientIO::Blocking { sock, pktr, .. } => {
838 sock.set_nonblocking(true)?;
839 let res = read_packet(client, sock, pktr, false);
840 sock.set_nonblocking(false)?;
841 res
842 }
843 ClientIO::NoBlock { sock, pktr, .. } => {
844 read_packet(client, sock, pktr, false)
845 }
846 ClientIO::BlockRd { sock, pktr, .. } => {
847 sock.set_nonblocking(true)?;
848 let res = read_packet(client, sock, pktr, false);
849 sock.set_nonblocking(false)?;
850 res
851 }
852 ClientIO::NoBlockRd { sock, pktr, .. } => {
853 read_packet(client, sock, pktr, false)
854 }
855 ClientIO::None => {
856 let s = format!("disconnected while Client::read_noblock");
857 Err(io::Error::new(io::ErrorKind::ConnectionReset, s))
858 }
859 _ => unreachable!(),
860 }
861 }
862
863 fn write(&mut self, client: &mut Client, pkt: v5::Packet) -> io::Result<()> {
864 match self {
865 ClientIO::Blocking { sock, pktw, .. } => {
866 write_packet(client, sock, pktw, Some(pkt), true)
867 }
868 ClientIO::NoBlock { sock, pktw, .. } => {
869 write_packet(client, sock, pktw, Some(pkt), true)
870 }
871 ClientIO::BlockWt { sock, pktw, .. } => {
872 write_packet(client, sock, pktw, Some(pkt), true)
873 }
874 ClientIO::NoBlockWt { sock, pktw, .. } => {
875 write_packet(client, sock, pktw, Some(pkt), true)
876 }
877 ClientIO::None => {
878 let s = format!("disconnected while Client::write");
879 Err(io::Error::new(io::ErrorKind::ConnectionReset, s))
880 }
881 cio => unreachable!("{:?}", cio),
882 }
883 }
884
885 fn write_noblock(
886 &mut self,
887 client: &mut Client,
888 pkt: Option<v5::Packet>,
889 ) -> io::Result<()> {
890 match self {
891 ClientIO::Blocking { sock, pktw, .. } => {
892 write_packet(client, sock, pktw, pkt, false)
893 }
894 ClientIO::NoBlock { sock, pktw, .. } => {
895 write_packet(client, sock, pktw, pkt, false)
896 }
897 ClientIO::BlockWt { sock, pktw, .. } => {
898 write_packet(client, sock, pktw, pkt, false)
899 }
900 ClientIO::NoBlockWt { sock, pktw, .. } => {
901 write_packet(client, sock, pktw, pkt, false)
902 }
903 ClientIO::None => {
904 let s = format!("disconnected while Client::write_noblock");
905 Err(io::Error::new(io::ErrorKind::ConnectionReset, s))
906 }
907 cio => unreachable!("{:?}", cio),
908 }
909 }
910}
911
912impl Client {
913 fn read_elapsed(&self) -> bool {
914 match &self.rd_deadline {
915 Some(deadline) if &time::Instant::now() > deadline => true,
916 Some(_) | None => false,
917 }
918 }
919
920 fn set_read_timeout(&mut self, timeout: Option<time::Duration>) {
921 match timeout {
922 Some(timeout) => {
923 let now = time::Instant::now();
924 self.rd_deadline = Some(now.checked_add(timeout).unwrap());
925 }
926 None => {
927 self.rd_deadline = None;
928 }
929 }
930 }
931
932 fn write_elapsed(&self) -> bool {
933 match &self.wt_deadline {
934 Some(deadline) if &time::Instant::now() > deadline => true,
935 Some(_) | None => false,
936 }
937 }
938
939 fn set_write_timeout(&mut self, timeout: Option<time::Duration>) {
940 match timeout {
941 Some(timeout) => {
942 let now = time::Instant::now();
943 self.rd_deadline = Some(now.checked_add(timeout).unwrap());
944 }
945 None => {
946 self.rd_deadline = None;
947 }
948 }
949 }
950}
951
952fn read_packet<R>(
953 client: &mut Client,
954 sock: &mut R,
955 pktr: &mut v5::MQTTRead,
956 block: bool,
957) -> io::Result<v5::Packet>
958where
959 R: io::Read,
960{
961 use crate::v5::MQTTRead::{Fin, Header, Init, Remain};
962
963 client.set_read_timeout(client.read_timeout);
964
965 let mut pr = mem::replace(pktr, v5::MQTTRead::default());
966 let max_packet_size = pr.to_max_packet_size();
967
968 let res = loop {
969 pr = match pr.read(sock) {
970 Ok((val, true)) if !block => {
971 pr = val;
972 break Err(io::Error::new(io::ErrorKind::WouldBlock, ""));
973 }
974 Ok((val, _)) => val,
975 Err(err) if err.kind() == ErrorKind::MalformedPacket => {
976 pr = v5::MQTTRead::new(max_packet_size);
977 let s = format!("malformed packet from v5::MQTTRead");
978 break Err(io::Error::new(io::ErrorKind::InvalidData, s));
979 }
980 Err(err) if err.kind() == ErrorKind::ProtocolError => {
981 pr = v5::MQTTRead::new(max_packet_size);
982 let s = format!("protocol error from v5::MQTTRead");
983 break Err(io::Error::new(io::ErrorKind::InvalidData, s));
984 }
985 Err(err) if err.kind() == ErrorKind::Disconnected => {
986 pr = v5::MQTTRead::new(max_packet_size);
987 let s = format!("client disconnected in v5::MQTTRead");
988 break Err(io::Error::new(io::ErrorKind::ConnectionReset, s));
989 }
990 Err(err) => unreachable!("unexpected error {}", err),
991 };
992
993 match &pr {
994 Init { .. } | Header { .. } | Remain { .. } if !client.read_elapsed() => {
995 trace!("read retrying");
996 }
997 Init { .. } | Header { .. } | Remain { .. } => {
998 let s = format!("disconnect, read timesout {:?}", client.read_timeout);
999 break Err(io::Error::new(io::ErrorKind::TimedOut, s));
1000 }
1001 Fin { .. } => {
1002 client.set_read_timeout(None);
1003 match pr.parse() {
1004 Ok(pkt) => {
1005 pr = pr.reset();
1006 break Ok(pkt);
1007 }
1008 Err(err) => {
1009 pr = pr.reset();
1010 let s = err.to_string();
1011 break Err(io::Error::new(io::ErrorKind::InvalidData, s));
1012 }
1013 }
1014 }
1015 v5::MQTTRead::None => unreachable!(),
1016 };
1017 };
1018
1019 let _none = mem::replace(pktr, pr);
1020 res
1021}
1022
1023fn write_packet<W>(
1024 client: &mut Client,
1025 sock: &mut W,
1026 pktw: &mut v5::MQTTWrite,
1027 pkt: Option<v5::Packet>,
1028 block: bool,
1029) -> io::Result<()>
1030where
1031 W: io::Write,
1032{
1033 use crate::v5::MQTTWrite::{Fin, Init, Remain};
1034
1035 client.set_write_timeout(client.write_timeout);
1036
1037 let mut pw = mem::replace(pktw, v5::MQTTWrite::default());
1038 let max_packet_size = pw.to_max_packet_size();
1039
1040 if let Some(pkt) = pkt {
1041 match pkt.encode() {
1042 Ok(blob) => {
1043 pw = pw.reset(blob.as_ref());
1044 }
1045 Err(err) => {
1046 pw = mem::replace(pktw, pw);
1047 Err(io::Error::new(io::ErrorKind::InvalidData, err.to_string()))?;
1048 }
1049 }
1050 }
1051
1052 let res = loop {
1053 pw = match pw.write(sock) {
1054 Ok((val, true)) if !block => {
1055 pw = val;
1056 break Err(io::Error::new(io::ErrorKind::WouldBlock, ""));
1057 }
1058 Ok((val, _)) => val,
1059 Err(err) if err.kind() == ErrorKind::MalformedPacket => {
1060 pw = v5::MQTTWrite::new(&[], max_packet_size);
1061 let s = format!("malformed packet in v5::MQTTWrite");
1062 break Err(io::Error::new(io::ErrorKind::InvalidData, s));
1063 }
1064 Err(err) if err.kind() == ErrorKind::ProtocolError => {
1065 pw = v5::MQTTWrite::new(&[], max_packet_size);
1066 let s = format!("protocol error in v5::MQTTWrite");
1067 break Err(io::Error::new(io::ErrorKind::InvalidData, s));
1068 }
1069 Err(err) if err.kind() == ErrorKind::Disconnected => {
1070 pw = v5::MQTTWrite::new(&[], max_packet_size);
1071 let s = format!("client disconnected in v5::MQTTWrite");
1072 break Err(io::Error::new(io::ErrorKind::ConnectionReset, s));
1073 }
1074 Err(err) => unreachable!("unexpected error {}", err),
1075 };
1076
1077 match &pw {
1078 Init { .. } | Remain { .. } if !client.write_elapsed() => {
1079 trace!("write retrying");
1080 }
1081 Init { .. } | Remain { .. } => {
1082 let s = format!("packet write fail after {:?}", client.write_timeout);
1083 break Err(io::Error::new(io::ErrorKind::TimedOut, s));
1084 }
1085 Fin { .. } => {
1086 client.set_write_timeout(None);
1087 break Ok(());
1088 }
1089 v5::MQTTWrite::None => unreachable!(),
1090 };
1091 };
1092
1093 let _none = mem::replace(pktw, pw);
1094 res
1095}
1096
/// Prefix used in error messages raised while converting disconnect reason codes.
const PP: &'static str = "Client::Disconnect";
1098
/// Reason codes accepted by `Client::disconnect`.
///
/// Each discriminant is the byte value that is converted to a `ReasonCode`
/// and carried in the DISCONNECT packet.
#[cfg_attr(any(feature = "fuzzy", test), derive(Arbitrary))]
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub enum DisconnReasonCode {
    NormalDisconnect = 0x00,
    DisconnectWillMessage = 0x04,
    UnspecifiedError = 0x80,
    MalformedPacket = 0x81,
    ProtocolError = 0x82,
    ImplementationError = 0x83,
    TopicNameInvalid = 0x90,
    ExceededReceiveMaximum = 0x93,
    TopicAliasInvalid = 0x94,
    PacketTooLarge = 0x95,
    ExceedMessageRate = 0x96,
    QuotaExceeded = 0x97,
    AdminAction = 0x98,
    PayloadFormatInvalid = 0x99,
}
1117
1118impl From<DisconnReasonCode> for u8 {
1119 fn from(val: DisconnReasonCode) -> u8 {
1120 use DisconnReasonCode::*;
1121
1122 match val {
1123 NormalDisconnect => 0x00,
1124 DisconnectWillMessage => 0x04,
1125 UnspecifiedError => 0x80,
1126 MalformedPacket => 0x81,
1127 ProtocolError => 0x82,
1128 ImplementationError => 0x83,
1129 TopicNameInvalid => 0x90,
1130 ExceededReceiveMaximum => 0x93,
1131 TopicAliasInvalid => 0x94,
1132 PacketTooLarge => 0x95,
1133 ExceedMessageRate => 0x96,
1134 QuotaExceeded => 0x97,
1135 AdminAction => 0x98,
1136 PayloadFormatInvalid => 0x99,
1137 }
1138 }
1139}
1140
1141impl TryFrom<u8> for DisconnReasonCode {
1142 type Error = Error;
1143
1144 fn try_from(val: u8) -> Result<DisconnReasonCode> {
1145 match val {
1146 0x00 => Ok(DisconnReasonCode::NormalDisconnect),
1147 0x04 => Ok(DisconnReasonCode::DisconnectWillMessage),
1148 0x80 => Ok(DisconnReasonCode::UnspecifiedError),
1149 0x81 => Ok(DisconnReasonCode::MalformedPacket),
1150 0x82 => Ok(DisconnReasonCode::ProtocolError),
1151 0x83 => Ok(DisconnReasonCode::ImplementationError),
1152 0x90 => Ok(DisconnReasonCode::TopicNameInvalid),
1153 0x93 => Ok(DisconnReasonCode::ExceededReceiveMaximum),
1154 0x94 => Ok(DisconnReasonCode::TopicAliasInvalid),
1155 0x95 => Ok(DisconnReasonCode::PacketTooLarge),
1156 0x96 => Ok(DisconnReasonCode::ExceedMessageRate),
1157 0x97 => Ok(DisconnReasonCode::QuotaExceeded),
1158 0x98 => Ok(DisconnReasonCode::AdminAction),
1159 0x99 => Ok(DisconnReasonCode::PayloadFormatInvalid),
1160 val => {
1161 err!(
1162 MalformedPacket,
1163 code: MalformedPacket,
1164 " {} reason-code {}",
1165 PP,
1166 val
1167 )
1168 }
1169 }
1170 }
1171}