mymq/v5/
client.rs

1//! Module implement MQTT Client.
2
3#[cfg(any(feature = "fuzzy", test))]
4use arbitrary::Arbitrary;
5use log::{error, trace, warn};
6
7#[cfg(unix)]
8use std::os::unix::io::{FromRawFd, IntoRawFd};
9#[cfg(windows)]
10use std::os::unix::io::{FromRawSocket, IntoRawSocket};
11
12use std::{collections::VecDeque, fmt, io, mem, net, result, time};
13
14use crate::{v5, ClientID, PacketID, Packetize, QoS};
15use crate::{Error, ErrorKind, ReasonCode, Result};
16
17pub const CLIENT_MAX_PACKET_SIZE: u32 = 1024 * 1024;
18
19/// MQTT CONNECT flags and headers
20#[derive(Clone, Copy)]
21pub struct ConnectOptions {
22    pub will_qos: Option<QoS>,
23    pub will_retain: Option<bool>,
24    pub keep_alive: u16,
25}
26
27impl Default for ConnectOptions {
28    fn default() -> ConnectOptions {
29        ConnectOptions { will_qos: None, will_retain: None, keep_alive: 0 }
30    }
31}
32
33/// ClientBuilder to create a customized [Client].
34pub struct ClientBuilder {
35    pub protocol_version: v5::MqttProtocol,
36    /// Provide unique client identifier, if missing, will be sent empty in CONNECT.
37    pub client_id: Option<ClientID>,
38    /// Socket settings for blocking io, refer [net::TcpStream::connect_timeout].
39    /// Defaults to None.
40    pub connect_timeout: Option<time::Duration>,
41    /// Socket settings for blocking io, refer [net::TcpStream::set_read_timeout].
42    /// Defaults to None.
43    pub read_timeout: Option<time::Duration>,
44    /// Socket settings for blocking io, refer [net::TcpStream::set_write_timeout].
45    /// Defaults to None.
46    pub write_timeout: Option<time::Duration>,
47    /// Socket settings, refer [net::TcpStream::set_nodelay].
48    /// Defaults to None.
49    pub nodelay: Option<bool>,
50    /// Socket settings, refer [net::TcpStream::set_ttl].
51    /// Defaults to None.
52    pub ttl: Option<u32>,
53    /// Maximum packet size,
54    pub max_packet_size: u32,
55    // CONNECT options
56    pub connopts: ConnectOptions,
57    pub connect_properties: Option<v5::ConnectProperties>,
58    pub connect_payload: v5::ConnectPayload,
59}
60
61impl Default for ClientBuilder {
62    fn default() -> ClientBuilder {
63        ClientBuilder {
64            client_id: Some(ClientID::new_uuid_v4()),
65            connect_timeout: None,
66            read_timeout: None,
67            write_timeout: None,
68            nodelay: None,
69            ttl: None,
70            max_packet_size: CLIENT_MAX_PACKET_SIZE,
71            // CONNECT options
72            connopts: ConnectOptions::default(),
73            connect_properties: Some(v5::ConnectProperties::default()),
74            connect_payload: v5::ConnectPayload::default(),
75
76            protocol_version: v5::MqttProtocol::V5,
77        }
78    }
79}
80
81impl ClientBuilder {
82    /// Connection with `remote` and start a synchronous client. All read/write calls
83    /// and other communication methods, on the returned client, shall block.
84    ///
85    /// NOTE: This call shall block until CONNACK is successfully received from remote.
86    pub fn connect(self, raddr: net::SocketAddr) -> io::Result<Client> {
87        let sock = match self.connect_timeout {
88            Some(timeout) => net::TcpStream::connect_timeout(&raddr, timeout)?,
89            None => net::TcpStream::connect(&raddr)?,
90        };
91        sock.set_read_timeout(self.read_timeout)?;
92        sock.set_write_timeout(self.write_timeout)?;
93        if let Some(nodelay) = self.nodelay {
94            sock.set_nodelay(nodelay)?
95        }
96        if let Some(ttl) = self.ttl {
97            sock.set_ttl(ttl)?
98        }
99
100        let mut client = self.into_client(raddr);
101
102        let (cio, connack) = {
103            let connect = client.to_connect(true /*clean_start*/);
104            let blocking = true;
105            ClientIO::handshake(&mut client, connect, sock, blocking)?
106        };
107
108        client.cio = cio;
109        client.next_packet_ids = (1..connack.receive_maximum()).collect();
110        client.connack = connack;
111
112        Ok(client)
113    }
114
115    /// Connection with `remote` and start an asynchronous client. All read/write calls
116    /// and other communication methods, on the returned client, shall not block.
117    /// Application will have to check for [io::ErrorKind::WouldBlock] and
118    /// [io::ErrorKind::Interrupted] returns.
119    ///
120    /// NOTE: This call shall block until CONNACK is successfully received from remote.
121    pub fn connect_noblock(self, raddr: net::SocketAddr) -> io::Result<Client> {
122        let sock = net::TcpStream::connect(raddr)?;
123        if let Some(nodelay) = self.nodelay {
124            sock.set_nodelay(nodelay)?
125        }
126        if let Some(ttl) = self.ttl {
127            sock.set_ttl(ttl)?
128        }
129
130        let mut client = self.into_client(raddr);
131
132        let (cio, connack) = {
133            let connect = client.to_connect(true /*clean_start*/);
134            let blocking = false;
135            ClientIO::handshake(&mut client, connect, sock, blocking)?
136        };
137
138        client.cio = cio;
139        client.next_packet_ids = (1..connack.receive_maximum()).collect();
140        client.connack = connack;
141
142        Ok(client)
143    }
144
145    fn into_client(self, raddr: net::SocketAddr) -> Client {
146        Client {
147            client_id: self.client_id.unwrap_or_else(|| ClientID::new_uuid_v4()),
148            raddr,
149            protocol_version: self.protocol_version,
150            connect_timeout: self.connect_timeout,
151            read_timeout: self.read_timeout,
152            write_timeout: self.write_timeout,
153            nodelay: self.nodelay,
154            ttl: self.ttl,
155            max_packet_size: self.max_packet_size,
156            connopts: self.connopts,
157            connect_properties: self.connect_properties.clone(),
158            connect_payload: self.connect_payload.clone(),
159            // CONNACK options
160            connack: v5::ConnAck::default(),
161
162            last_sent: time::Instant::now(),
163            last_rcvd: time::Instant::now(),
164            rd_deadline: None,
165            wt_deadline: None,
166            next_packet_ids: VecDeque::default(),
167
168            in_packets: VecDeque::default(),
169            cio: ClientIO::None,
170        }
171    }
172}
173
174/// Type to interface with MQTT broker.
175pub struct Client {
176    client_id: ClientID,
177    raddr: net::SocketAddr,
178    protocol_version: v5::MqttProtocol,
179    connect_timeout: Option<time::Duration>,
180    read_timeout: Option<time::Duration>,
181    write_timeout: Option<time::Duration>,
182    nodelay: Option<bool>,
183    ttl: Option<u32>,
184    max_packet_size: u32,
185    // CONNECT options
186    connopts: ConnectOptions,
187    connect_properties: Option<v5::ConnectProperties>,
188    connect_payload: v5::ConnectPayload,
189    // CONNACK options
190    connack: v5::ConnAck,
191
192    last_rcvd: time::Instant,
193    last_sent: time::Instant,
194    rd_deadline: Option<time::Instant>, // defaults to None
195    wt_deadline: Option<time::Instant>, // defaults to None
196    next_packet_ids: VecDeque<PacketID>,
197
198    in_packets: VecDeque<v5::Packet>,
199    cio: ClientIO,
200}
201
202impl Drop for Client {
203    fn drop(&mut self) {
204        match &self.cio {
205            ClientIO::None => (),
206            _ => {
207                let disconnect = {
208                    let code = DisconnReasonCode::NormalDisconnect as u8;
209                    v5::Disconnect {
210                        code: ReasonCode::try_from(code).unwrap(),
211                        properties: None,
212                    }
213                };
214                if let Err(err) = self.write(v5::Packet::Disconnect(disconnect)) {
215                    error!(
216                        "client_id:{:?} raddr:{:?} drop error:{}",
217                        self.client_id, self.raddr, err
218                    )
219                }
220            }
221        }
222    }
223}
224
225/// Client initialization and setup
226impl Client {
227    /// Call this immediately after `connect` or `connect_noblock` on the ClientBuilder,
228    /// else this call might panic. Returns a tuple of
229    /// (read-only-client, write-only-client).
230    pub fn split_rw(mut self) -> io::Result<(Client, Client)> {
231        let cio = mem::replace(&mut self.cio, ClientIO::None);
232
233        let (rd_cio, wt_cio) = cio.split_sock()?;
234        let reader = Client {
235            client_id: self.client_id.clone(),
236            raddr: self.raddr,
237            protocol_version: self.protocol_version,
238            connect_timeout: self.connect_timeout,
239            read_timeout: self.read_timeout,
240            write_timeout: self.write_timeout,
241            nodelay: self.nodelay,
242            ttl: self.ttl,
243            max_packet_size: self.max_packet_size,
244            // CONNECT options
245            connopts: self.connopts,
246            connect_properties: self.connect_properties.clone(),
247            connect_payload: self.connect_payload.clone(),
248            // CONNACK options
249            connack: self.connack.clone(),
250
251            last_rcvd: self.last_sent,
252            last_sent: self.last_rcvd,
253            rd_deadline: None,
254            wt_deadline: None,
255            next_packet_ids: VecDeque::default(),
256
257            in_packets: VecDeque::default(),
258            cio: rd_cio,
259        };
260
261        let _none = mem::replace(&mut self.cio, wt_cio);
262        Ok((reader, self))
263    }
264
265    /// If error is detected on this `Client` instance call this method. Reconnecting
266    /// will connect with the same `remote`, either in block or no-block configuration
267    /// as before.
268    pub fn reconnect(mut self) -> io::Result<Self> {
269        let sock = match self.connect_timeout {
270            Some(timeout) => net::TcpStream::connect_timeout(&self.raddr, timeout)?,
271            None => net::TcpStream::connect(&self.raddr)?,
272        };
273        sock.set_read_timeout(self.read_timeout)?;
274        sock.set_write_timeout(self.write_timeout)?;
275        if let Some(nodelay) = self.nodelay {
276            sock.set_nodelay(nodelay)?
277        }
278        if let Some(ttl) = self.ttl {
279            sock.set_ttl(ttl)?
280        }
281
282        let (cio, connack) = {
283            let connect = self.to_connect(false /*clean_start*/);
284            let blocking = self.cio.is_blocking();
285            ClientIO::handshake(&mut self, connect, sock, blocking)?
286        };
287        self.cio = cio;
288        self.next_packet_ids = (1..connack.receive_maximum()).collect();
289        self.connack = connack;
290
291        Ok(self)
292    }
293
294    fn to_connect(&self, clean_start: bool) -> v5::Connect {
295        let mut flags = vec![];
296
297        if clean_start {
298            flags.push(v5::ConnectFlags::CLEAN_START)
299        }
300        if self.is_will() {
301            let will_qos = self.connopts.will_qos.unwrap_or(QoS::AtMostOnce);
302            flags.push(v5::ConnectFlags::WILL_FLAG);
303            flags.push(v5::ConnectFlags::from(will_qos));
304            if let Some(true) = self.connopts.will_retain {
305                flags.push(v5::ConnectFlags::WILL_RETAIN)
306            }
307        }
308        if let Some(_) = &self.connect_payload.username {
309            flags.push(v5::ConnectFlags::USERNAME)
310        }
311        if let Some(_) = &self.connect_payload.password {
312            flags.push(v5::ConnectFlags::PASSWORD)
313        }
314
315        let mut connect = v5::Connect {
316            protocol_name: "MQTT".to_string(),
317            protocol_version: self.protocol_version,
318            flags: v5::ConnectFlags::new(&flags),
319            keep_alive: self.connopts.keep_alive,
320            properties: self.connect_properties.clone(),
321            payload: self.connect_payload.clone(),
322        };
323        connect.normalize();
324
325        connect
326    }
327
328    fn is_will(&self) -> bool {
329        self.connect_payload.will_topic.is_some()
330            && self.connect_payload.will_properties.is_some()
331            && self.connect_payload.will_payload.is_some()
332    }
333}
334
335/// Maintanence methods
336impl Client {
337    /// Returns the socket address of the local half of this TCP connection.
338    pub fn local_addr(&self) -> io::Result<net::SocketAddr> {
339        self.cio.local_addr()
340    }
341
342    /// Returns the socket address of the remote peer of this TCP connection.
343    pub fn peer_addr(&self) -> io::Result<net::SocketAddr> {
344        self.cio.peer_addr()
345    }
346
347    /// Gets the value of the TCP_NODELAY option on this socket.
348    pub fn nodelay(&self) -> io::Result<bool> {
349        self.cio.nodelay()
350    }
351
352    /// Returns the read timeout of this socket.
353    pub fn read_timeout(&self) -> io::Result<Option<time::Duration>> {
354        self.cio.read_timeout()
355    }
356
357    /// Returns the write timeout of this socket.
358    pub fn write_timeout(&self) -> io::Result<Option<time::Duration>> {
359        self.cio.write_timeout()
360    }
361
362    /// Gets the value of the IP_TTL option for this socket.
363    pub fn ttl(&self) -> io::Result<u32> {
364        self.cio.ttl()
365    }
366}
367
368/// Keep alive and ping-pong.
369impl Client {
370    /// Return the server recommended keep_alive or configured keep_alive, in seconds.
371    /// If returned keep_alive is non-ZERO, application shall make sure that there
372    /// is MQTT activity within the time-period.
373    pub fn keep_alive(&self) -> u16 {
374        match &self.connack.properties {
375            Some(props) => match props.server_keep_alive {
376                Some(keep_alive) => keep_alive,
377                None => self.connopts.keep_alive,
378            },
379            None => self.connopts.keep_alive,
380        }
381    }
382
383    /// Return the duration since last server communication.
384    pub fn elapsed(&self) -> time::Duration {
385        self.last_rcvd.elapsed()
386    }
387
388    /// Return whether, if keep_alive non-ZERO, client's communication has exceeded 1.5
389    /// times the configured `keep_alive`.
390    pub fn expired(&self) -> bool {
391        match self.keep_alive() {
392            0 => false,
393            keep_alive => {
394                let keep_alive = time::Duration::from_secs(keep_alive as u64).as_micros();
395                keep_alive < self.last_sent.elapsed().as_micros()
396            }
397        }
398    }
399
400    /// Return whether this client's connection is a continuation of earlier session. In
401    /// which case brokers shall remember the topic subscriptions for this `ClientID`.
402    pub fn session_present(&self) -> bool {
403        self.connack.flags.unwrap().unwrap()
404    }
405
406    /// Send a PingReq to server.
407    pub fn ping(&mut self) -> io::Result<()> {
408        self.write(v5::Packet::PingReq)?;
409
410        match self.read()? {
411            v5::Packet::PingResp => Ok(()),
412            pkt => {
413                let msg = format!("expected PingResp, got {:?}", pkt.to_packet_type());
414                Err(io::Error::new(io::ErrorKind::InvalidData, msg))
415            }
416        }?;
417
418        Ok(())
419    }
420}
421
422/// IO methods
423impl Client {
424    /// Subscribe one or more filters with broker. Below is an example of how to
425    /// subscribe for a single filter.
426    ///
427    /// ```ignore
428    ///     let mut sub = v5::Subscribe::default();
429    ///
430    ///     let filter: TopicFilter = "#".into();
431    ///     let opt = {
432    ///         let fwdrule = RetainForwardRule::OnNewSubscribe;
433    ///         let retain_as_published = true;
434    ///         let no_local = true;
435    ///         let qos = QoS::AtMostOnce;
436    ///         v5::SubscriptionOpt::new(fwdrule, retain_as_published, no_local, qos)
437    ///     };
438    ///     sub.add_filter(filter, opt);
439    ///
440    ///     sub.set_subscription_id(0x1003);
441    ///
442    ///     let suback = client.subscribe(sub).expect("failed to subscribe");
443    /// ```
444    ///
445    /// Note that this call shall block until the subscription message is sent to the
446    /// remote and subscribe-ack is recieved with the same `packet_id`. [Client] shall
447    /// automatically choose a `packet_id` for this subscription.
448    pub fn subscribe(&mut self, mut sub: v5::Subscribe) -> io::Result<v5::SubAck> {
449        sub.packet_id = self.acquire_packet_id(false /*is_publish*/).ok().unwrap();
450        self.write(v5::Packet::Subscribe(sub.clone()))?;
451        self.cio_read_sub_ack(&sub)
452    }
453
454    /// Disconnect with remote. Client can read packets after this call, but typically
455    /// the connection is gone. But [Self::reconnect] should work.
456    ///
457    /// NOTE: Applications can also compose their own disconnect message via
458    /// [v5::Disconnect] and send it via [Self::write] method.
459    pub fn disconnect(&mut self, code: DisconnReasonCode) -> io::Result<()> {
460        let code = match ReasonCode::try_from(code as u8) {
461            Ok(val) => Ok(val),
462            Err(err) => Err(io::Error::new(io::ErrorKind::InvalidInput, err.to_string())),
463        }?;
464        let disconnect = v5::Disconnect { code, properties: None };
465        self.write(v5::Packet::Disconnect(disconnect))
466    }
467
468    /// Read a single packet from connection, block until a packet is available.
469    pub fn read(&mut self) -> io::Result<v5::Packet> {
470        match self.in_packets.pop_front() {
471            Some(packet) => Ok(packet),
472            None => {
473                self.cio_read()?;
474                Ok(self.in_packets.pop_front().unwrap())
475            }
476        }
477    }
478
479    /// Same as [Self::read], but does not block. Application must be prepared to
480    /// handle [io::ErrorKind::WouldBlock], and [io::ErrorKind::Interrupted]
481    pub fn read_noblock(&mut self) -> io::Result<v5::Packet> {
482        match self.in_packets.pop_front() {
483            Some(packet) => Ok(packet),
484            None => {
485                self.cio_read_noblock()?;
486                Ok(self.in_packets.pop_front().unwrap())
487            }
488        }
489    }
490
491    /// Write a single packet on the connectio, block until a packet is available.
492    pub fn write(&mut self, packet: v5::Packet) -> io::Result<()> {
493        self.try_read()?;
494
495        let mut cio = mem::replace(&mut self.cio, ClientIO::None);
496        let res = cio.write(self, packet);
497        let _none = mem::replace(&mut self.cio, cio);
498
499        self.last_sent = time::Instant::now();
500        res
501    }
502
503    /// Same as [Self::write], but does not block. Application must be prepared to
504    /// handle [io::ErrorKind::WouldBlock], and [io::ErrorKind::Interrupted].
505    ///
506    /// To finish writing previous write pass `packet` as None. Returns [Result::Ok]
507    /// only when write has finished.
508    pub fn write_noblock(&mut self, packet: Option<v5::Packet>) -> io::Result<()> {
509        self.try_read()?;
510
511        let mut cio = mem::replace(&mut self.cio, ClientIO::None);
512        let res = cio.write_noblock(self, packet);
513        let _none = mem::replace(&mut self.cio, cio);
514
515        self.last_sent = time::Instant::now();
516        res
517    }
518
519    fn try_read(&mut self) -> io::Result<()> {
520        match self.read_noblock() {
521            Ok(packet) => {
522                self.in_packets.push_back(packet);
523                Ok(())
524            }
525            Err(err) if err.kind() == io::ErrorKind::WouldBlock => Ok(()),
526            Err(err) if err.kind() == io::ErrorKind::Interrupted => Ok(()),
527            Err(err) => Err(err),
528        }
529    }
530
531    fn cio_read(&mut self) -> io::Result<()> {
532        let mut cio = mem::replace(&mut self.cio, ClientIO::None);
533        let res = cio.read(self);
534        let _none = mem::replace(&mut self.cio, cio);
535
536        self.in_packets.push_back(res?);
537
538        self.last_rcvd = time::Instant::now();
539        Ok(())
540    }
541
542    fn cio_read_noblock(&mut self) -> io::Result<()> {
543        let mut cio = mem::replace(&mut self.cio, ClientIO::None);
544        let res = cio.read_noblock(self);
545        let _none = mem::replace(&mut self.cio, cio);
546
547        self.in_packets.push_back(res?);
548
549        self.last_rcvd = time::Instant::now();
550        Ok(())
551    }
552
553    fn cio_read_sub_ack(&mut self, sub: &v5::Subscribe) -> io::Result<v5::SubAck> {
554        loop {
555            self.cio_read()?;
556            match self.in_packets.pop_front() {
557                Some(v5::Packet::SubAck(sa)) if sa.packet_id == sub.packet_id => {
558                    break Ok(sa);
559                }
560                Some(v5::Packet::SubAck(sa)) => warn!(
561                    "sub-ack mismatch in packet_id {} != {}",
562                    sa.packet_id, sub.packet_id
563                ),
564                Some(pkt) => self.in_packets.push_back(pkt),
565                None => unreachable!(),
566            }
567        }
568    }
569}
570
571impl Client {
572    /// Obtain the underlying [mio] socket to register with [mio::Poll]. This can be
573    /// used to create an async wrapper. Calling this method on blocking connection
574    /// will panic.
575    pub fn as_mut_mio_tcpstream(&mut self) -> &mut mio::net::TcpStream {
576        use ClientIO::*;
577
578        match &mut self.cio {
579            Blocking { .. } | BlockRd { .. } | BlockWt { .. } => {
580                panic!("cannot use mio on standard socket")
581            }
582            NoBlock { sock, .. } | NoBlockRd { sock, .. } | NoBlockWt { sock, .. } => {
583                sock
584            }
585            ClientIO::None => unreachable!(),
586        }
587    }
588
589    fn acquire_packet_id(&mut self, publish: bool) -> Result<PacketID> {
590        if publish {
591            match self.next_packet_ids.pop_front() {
592                Some(packet_id) => Ok(packet_id),
593                None => err!(ProtocolError, code: ExceededReceiveMaximum, ""),
594            }
595        } else {
596            Ok(0)
597        }
598    }
599
600    fn release_packet_id(&mut self, packet_id: PacketID) {
601        self.next_packet_ids.push_back(packet_id);
602    }
603}
604
605#[allow(dead_code)]
606enum ClientIO {
607    Blocking {
608        sock: net::TcpStream,
609        pktr: v5::MQTTRead,
610        pktw: v5::MQTTWrite,
611    },
612    NoBlock {
613        sock: mio::net::TcpStream,
614        pktr: v5::MQTTRead,
615        pktw: v5::MQTTWrite,
616    },
617    BlockRd {
618        sock: net::TcpStream,
619        pktr: v5::MQTTRead,
620    },
621    BlockWt {
622        sock: net::TcpStream,
623        pktw: v5::MQTTWrite,
624    },
625    NoBlockRd {
626        sock: mio::net::TcpStream,
627        pktr: v5::MQTTRead,
628    },
629    NoBlockWt {
630        sock: mio::net::TcpStream,
631        pktw: v5::MQTTWrite,
632    },
633    None,
634}
635
636impl fmt::Debug for ClientIO {
637    fn fmt(&self, f: &mut fmt::Formatter) -> result::Result<(), fmt::Error> {
638        match self {
639            ClientIO::Blocking { .. } => write!(f, "ClientIO::Blocking"),
640            ClientIO::NoBlock { .. } => write!(f, "ClientIO::NoBlock"),
641            ClientIO::BlockRd { .. } => write!(f, "ClientIO::BlockRd"),
642            ClientIO::BlockWt { .. } => write!(f, "ClientIO::BlockWt"),
643            ClientIO::NoBlockRd { .. } => write!(f, "ClientIO::NoBlockRd"),
644            ClientIO::NoBlockWt { .. } => write!(f, "ClientIO::NoBlockWt"),
645            ClientIO::None { .. } => write!(f, "ClientIO::None"),
646        }
647    }
648}
649
650impl ClientIO {
651    fn handshake(
652        client: &mut Client,
653        connect: v5::Connect,
654        mut sock: net::TcpStream,
655        blocking: bool,
656    ) -> io::Result<(ClientIO, v5::ConnAck)> {
657        let max_packet_size = connect.max_packet_size(client.max_packet_size);
658
659        let mut pktr = v5::MQTTRead::new(max_packet_size);
660        let mut pktw = v5::MQTTWrite::new(&[], max_packet_size);
661
662        write_packet(
663            client,
664            &mut sock,
665            &mut pktw,
666            Some(v5::Packet::Connect(connect)),
667            true, // block
668        )?;
669
670        let (val, connack) = match read_packet(client, &mut sock, &mut pktr, true)? {
671            v5::Packet::ConnAck(connack) => (pktr, connack),
672            pkt => {
673                let msg = format!("unexpected in handshake {:?}", pkt.to_packet_type());
674                Err(io::Error::new(io::ErrorKind::InvalidData, msg))?
675            }
676        };
677        connack
678            .validate()
679            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?;
680
681        pktr = val;
682
683        let cio = match blocking {
684            true => ClientIO::Blocking { sock, pktr, pktw },
685            false => {
686                let sock = mio::net::TcpStream::from_std(sock);
687                ClientIO::NoBlock { sock, pktr, pktw }
688            }
689        };
690
691        Ok((cio, connack))
692    }
693
694    fn split_sock(self) -> io::Result<(ClientIO, ClientIO)> {
695        match self {
696            ClientIO::Blocking { sock: rd_sock, pktr, pktw } => {
697                let wt_sock = rd_sock.try_clone()?;
698                rd_sock.shutdown(net::Shutdown::Write)?;
699                wt_sock.shutdown(net::Shutdown::Read)?;
700                let rd = ClientIO::BlockRd { sock: rd_sock, pktr };
701                let wt = ClientIO::BlockWt { sock: wt_sock, pktw };
702                Ok((rd, wt))
703            }
704            ClientIO::NoBlock { sock, pktr, pktw } => {
705                #[cfg(windows)]
706                let rd_sock =
707                    unsafe { net::TcpStream::from_raw_socket(sock.into_raw_socket()) };
708                #[cfg(unix)]
709                let rd_sock = unsafe { net::TcpStream::from_raw_fd(sock.into_raw_fd()) };
710
711                let wt_sock = rd_sock.try_clone()?;
712                rd_sock.shutdown(net::Shutdown::Write)?;
713                wt_sock.shutdown(net::Shutdown::Read)?;
714                let rd = ClientIO::NoBlockRd {
715                    sock: mio::net::TcpStream::from_std(rd_sock),
716                    pktr,
717                };
718                let wt = ClientIO::NoBlockWt {
719                    sock: mio::net::TcpStream::from_std(wt_sock),
720                    pktw,
721                };
722                Ok((rd, wt))
723            }
724            _ => unreachable!(),
725        }
726    }
727
728    fn local_addr(&self) -> io::Result<net::SocketAddr> {
729        match self {
730            ClientIO::Blocking { sock, .. } => sock.local_addr(),
731            ClientIO::NoBlock { sock, .. } => sock.local_addr(),
732            ClientIO::BlockRd { sock, .. } => sock.local_addr(),
733            ClientIO::BlockWt { sock, .. } => sock.local_addr(),
734            ClientIO::NoBlockRd { sock, .. } => sock.local_addr(),
735            ClientIO::NoBlockWt { sock, .. } => sock.local_addr(),
736            ClientIO::None => unreachable!(),
737        }
738    }
739
740    fn peer_addr(&self) -> io::Result<net::SocketAddr> {
741        match self {
742            ClientIO::Blocking { sock, .. } => sock.peer_addr(),
743            ClientIO::NoBlock { sock, .. } => sock.peer_addr(),
744            ClientIO::BlockRd { sock, .. } => sock.peer_addr(),
745            ClientIO::BlockWt { sock, .. } => sock.peer_addr(),
746            ClientIO::NoBlockRd { sock, .. } => sock.peer_addr(),
747            ClientIO::NoBlockWt { sock, .. } => sock.peer_addr(),
748            ClientIO::None => unreachable!(),
749        }
750    }
751
752    fn nodelay(&self) -> io::Result<bool> {
753        match self {
754            ClientIO::Blocking { sock, .. } => sock.nodelay(),
755            ClientIO::NoBlock { sock, .. } => sock.nodelay(),
756            ClientIO::BlockRd { sock, .. } => sock.nodelay(),
757            ClientIO::BlockWt { sock, .. } => sock.nodelay(),
758            ClientIO::NoBlockRd { sock, .. } => sock.nodelay(),
759            ClientIO::NoBlockWt { sock, .. } => sock.nodelay(),
760            ClientIO::None => unreachable!(),
761        }
762    }
763
764    fn read_timeout(&self) -> io::Result<Option<time::Duration>> {
765        match self {
766            ClientIO::Blocking { sock, .. } => sock.read_timeout(),
767            ClientIO::NoBlock { .. } => Ok(None),
768            ClientIO::BlockRd { sock, .. } => sock.read_timeout(),
769            ClientIO::BlockWt { sock, .. } => sock.read_timeout(),
770            ClientIO::NoBlockRd { .. } => Ok(None),
771            ClientIO::NoBlockWt { .. } => Ok(None),
772            ClientIO::None => unreachable!(),
773        }
774    }
775
776    fn write_timeout(&self) -> io::Result<Option<time::Duration>> {
777        match self {
778            ClientIO::Blocking { sock, .. } => sock.write_timeout(),
779            ClientIO::NoBlock { .. } => Ok(None),
780            ClientIO::BlockRd { sock, .. } => sock.write_timeout(),
781            ClientIO::BlockWt { sock, .. } => sock.write_timeout(),
782            ClientIO::NoBlockRd { .. } => Ok(None),
783            ClientIO::NoBlockWt { .. } => Ok(None),
784            ClientIO::None => unreachable!(),
785        }
786    }
787
788    fn ttl(&self) -> io::Result<u32> {
789        match self {
790            ClientIO::Blocking { sock, .. } => sock.ttl(),
791            ClientIO::NoBlock { sock, .. } => sock.ttl(),
792            ClientIO::BlockRd { sock, .. } => sock.ttl(),
793            ClientIO::BlockWt { sock, .. } => sock.ttl(),
794            ClientIO::NoBlockRd { sock, .. } => sock.ttl(),
795            ClientIO::NoBlockWt { sock, .. } => sock.ttl(),
796            ClientIO::None => unreachable!(),
797        }
798    }
799
800    fn is_blocking(&self) -> bool {
801        use ClientIO::*;
802
803        match self {
804            Blocking { .. } | BlockRd { .. } | BlockWt { .. } => true,
805            NoBlock { .. } | NoBlockRd { .. } | NoBlockWt { .. } => false,
806            None => unreachable!(),
807        }
808    }
809}
810
811impl ClientIO {
812    fn read(&mut self, client: &mut Client) -> io::Result<v5::Packet> {
813        match self {
814            ClientIO::Blocking { sock, pktr, .. } => {
815                read_packet(client, sock, pktr, true)
816            }
817            ClientIO::NoBlock { sock, pktr, .. } => {
818                read_packet(client, sock, pktr, false)
819            }
820            ClientIO::BlockRd { sock, pktr, .. } => {
821                //
822                read_packet(client, sock, pktr, true)
823            }
824            ClientIO::NoBlockRd { sock, pktr, .. } => {
825                read_packet(client, sock, pktr, false)
826            }
827            ClientIO::None => {
828                let s = format!("disconnected while Client::read");
829                Err(io::Error::new(io::ErrorKind::ConnectionReset, s))
830            }
831            _ => unreachable!(),
832        }
833    }
834
835    fn read_noblock(&mut self, client: &mut Client) -> io::Result<v5::Packet> {
836        match self {
837            ClientIO::Blocking { sock, pktr, .. } => {
838                sock.set_nonblocking(true)?;
839                let res = read_packet(client, sock, pktr, false);
840                sock.set_nonblocking(false)?;
841                res
842            }
843            ClientIO::NoBlock { sock, pktr, .. } => {
844                read_packet(client, sock, pktr, false)
845            }
846            ClientIO::BlockRd { sock, pktr, .. } => {
847                sock.set_nonblocking(true)?;
848                let res = read_packet(client, sock, pktr, false);
849                sock.set_nonblocking(false)?;
850                res
851            }
852            ClientIO::NoBlockRd { sock, pktr, .. } => {
853                read_packet(client, sock, pktr, false)
854            }
855            ClientIO::None => {
856                let s = format!("disconnected while Client::read_noblock");
857                Err(io::Error::new(io::ErrorKind::ConnectionReset, s))
858            }
859            _ => unreachable!(),
860        }
861    }
862
863    fn write(&mut self, client: &mut Client, pkt: v5::Packet) -> io::Result<()> {
864        match self {
865            ClientIO::Blocking { sock, pktw, .. } => {
866                write_packet(client, sock, pktw, Some(pkt), true)
867            }
868            ClientIO::NoBlock { sock, pktw, .. } => {
869                write_packet(client, sock, pktw, Some(pkt), true)
870            }
871            ClientIO::BlockWt { sock, pktw, .. } => {
872                write_packet(client, sock, pktw, Some(pkt), true)
873            }
874            ClientIO::NoBlockWt { sock, pktw, .. } => {
875                write_packet(client, sock, pktw, Some(pkt), true)
876            }
877            ClientIO::None => {
878                let s = format!("disconnected while Client::write");
879                Err(io::Error::new(io::ErrorKind::ConnectionReset, s))
880            }
881            cio => unreachable!("{:?}", cio),
882        }
883    }
884
885    fn write_noblock(
886        &mut self,
887        client: &mut Client,
888        pkt: Option<v5::Packet>,
889    ) -> io::Result<()> {
890        match self {
891            ClientIO::Blocking { sock, pktw, .. } => {
892                write_packet(client, sock, pktw, pkt, false)
893            }
894            ClientIO::NoBlock { sock, pktw, .. } => {
895                write_packet(client, sock, pktw, pkt, false)
896            }
897            ClientIO::BlockWt { sock, pktw, .. } => {
898                write_packet(client, sock, pktw, pkt, false)
899            }
900            ClientIO::NoBlockWt { sock, pktw, .. } => {
901                write_packet(client, sock, pktw, pkt, false)
902            }
903            ClientIO::None => {
904                let s = format!("disconnected while Client::write_noblock");
905                Err(io::Error::new(io::ErrorKind::ConnectionReset, s))
906            }
907            cio => unreachable!("{:?}", cio),
908        }
909    }
910}
911
912impl Client {
913    fn read_elapsed(&self) -> bool {
914        match &self.rd_deadline {
915            Some(deadline) if &time::Instant::now() > deadline => true,
916            Some(_) | None => false,
917        }
918    }
919
920    fn set_read_timeout(&mut self, timeout: Option<time::Duration>) {
921        match timeout {
922            Some(timeout) => {
923                let now = time::Instant::now();
924                self.rd_deadline = Some(now.checked_add(timeout).unwrap());
925            }
926            None => {
927                self.rd_deadline = None;
928            }
929        }
930    }
931
932    fn write_elapsed(&self) -> bool {
933        match &self.wt_deadline {
934            Some(deadline) if &time::Instant::now() > deadline => true,
935            Some(_) | None => false,
936        }
937    }
938
939    fn set_write_timeout(&mut self, timeout: Option<time::Duration>) {
940        match timeout {
941            Some(timeout) => {
942                let now = time::Instant::now();
943                self.rd_deadline = Some(now.checked_add(timeout).unwrap());
944            }
945            None => {
946                self.rd_deadline = None;
947            }
948        }
949    }
950}
951
952fn read_packet<R>(
953    client: &mut Client,
954    sock: &mut R,
955    pktr: &mut v5::MQTTRead,
956    block: bool,
957) -> io::Result<v5::Packet>
958where
959    R: io::Read,
960{
961    use crate::v5::MQTTRead::{Fin, Header, Init, Remain};
962
963    client.set_read_timeout(client.read_timeout);
964
965    let mut pr = mem::replace(pktr, v5::MQTTRead::default());
966    let max_packet_size = pr.to_max_packet_size();
967
968    let res = loop {
969        pr = match pr.read(sock) {
970            Ok((val, true)) if !block => {
971                pr = val;
972                break Err(io::Error::new(io::ErrorKind::WouldBlock, ""));
973            }
974            Ok((val, _)) => val,
975            Err(err) if err.kind() == ErrorKind::MalformedPacket => {
976                pr = v5::MQTTRead::new(max_packet_size);
977                let s = format!("malformed packet from v5::MQTTRead");
978                break Err(io::Error::new(io::ErrorKind::InvalidData, s));
979            }
980            Err(err) if err.kind() == ErrorKind::ProtocolError => {
981                pr = v5::MQTTRead::new(max_packet_size);
982                let s = format!("protocol error from v5::MQTTRead");
983                break Err(io::Error::new(io::ErrorKind::InvalidData, s));
984            }
985            Err(err) if err.kind() == ErrorKind::Disconnected => {
986                pr = v5::MQTTRead::new(max_packet_size);
987                let s = format!("client disconnected in v5::MQTTRead");
988                break Err(io::Error::new(io::ErrorKind::ConnectionReset, s));
989            }
990            Err(err) => unreachable!("unexpected error {}", err),
991        };
992
993        match &pr {
994            Init { .. } | Header { .. } | Remain { .. } if !client.read_elapsed() => {
995                trace!("read retrying");
996            }
997            Init { .. } | Header { .. } | Remain { .. } => {
998                let s = format!("disconnect, read timesout {:?}", client.read_timeout);
999                break Err(io::Error::new(io::ErrorKind::TimedOut, s));
1000            }
1001            Fin { .. } => {
1002                client.set_read_timeout(None);
1003                match pr.parse() {
1004                    Ok(pkt) => {
1005                        pr = pr.reset();
1006                        break Ok(pkt);
1007                    }
1008                    Err(err) => {
1009                        pr = pr.reset();
1010                        let s = err.to_string();
1011                        break Err(io::Error::new(io::ErrorKind::InvalidData, s));
1012                    }
1013                }
1014            }
1015            v5::MQTTRead::None => unreachable!(),
1016        };
1017    };
1018
1019    let _none = mem::replace(pktr, pr);
1020    res
1021}
1022
1023fn write_packet<W>(
1024    client: &mut Client,
1025    sock: &mut W,
1026    pktw: &mut v5::MQTTWrite,
1027    pkt: Option<v5::Packet>,
1028    block: bool,
1029) -> io::Result<()>
1030where
1031    W: io::Write,
1032{
1033    use crate::v5::MQTTWrite::{Fin, Init, Remain};
1034
1035    client.set_write_timeout(client.write_timeout);
1036
1037    let mut pw = mem::replace(pktw, v5::MQTTWrite::default());
1038    let max_packet_size = pw.to_max_packet_size();
1039
1040    if let Some(pkt) = pkt {
1041        match pkt.encode() {
1042            Ok(blob) => {
1043                pw = pw.reset(blob.as_ref());
1044            }
1045            Err(err) => {
1046                pw = mem::replace(pktw, pw);
1047                Err(io::Error::new(io::ErrorKind::InvalidData, err.to_string()))?;
1048            }
1049        }
1050    }
1051
1052    let res = loop {
1053        pw = match pw.write(sock) {
1054            Ok((val, true)) if !block => {
1055                pw = val;
1056                break Err(io::Error::new(io::ErrorKind::WouldBlock, ""));
1057            }
1058            Ok((val, _)) => val,
1059            Err(err) if err.kind() == ErrorKind::MalformedPacket => {
1060                pw = v5::MQTTWrite::new(&[], max_packet_size);
1061                let s = format!("malformed packet in v5::MQTTWrite");
1062                break Err(io::Error::new(io::ErrorKind::InvalidData, s));
1063            }
1064            Err(err) if err.kind() == ErrorKind::ProtocolError => {
1065                pw = v5::MQTTWrite::new(&[], max_packet_size);
1066                let s = format!("protocol error in v5::MQTTWrite");
1067                break Err(io::Error::new(io::ErrorKind::InvalidData, s));
1068            }
1069            Err(err) if err.kind() == ErrorKind::Disconnected => {
1070                pw = v5::MQTTWrite::new(&[], max_packet_size);
1071                let s = format!("client disconnected in v5::MQTTWrite");
1072                break Err(io::Error::new(io::ErrorKind::ConnectionReset, s));
1073            }
1074            Err(err) => unreachable!("unexpected error {}", err),
1075        };
1076
1077        match &pw {
1078            Init { .. } | Remain { .. } if !client.write_elapsed() => {
1079                trace!("write retrying");
1080            }
1081            Init { .. } | Remain { .. } => {
1082                let s = format!("packet write fail after {:?}", client.write_timeout);
1083                break Err(io::Error::new(io::ErrorKind::TimedOut, s));
1084            }
1085            Fin { .. } => {
1086                client.set_write_timeout(None);
1087                break Ok(());
1088            }
1089            v5::MQTTWrite::None => unreachable!(),
1090        };
1091    };
1092
1093    let _none = mem::replace(pktw, pw);
1094    res
1095}
1096
1097const PP: &'static str = "Client::Disconnect";
1098
1099#[cfg_attr(any(feature = "fuzzy", test), derive(Arbitrary))]
1100#[derive(Clone, Copy, Eq, PartialEq, Debug)]
1101pub enum DisconnReasonCode {
1102    NormalDisconnect = 0x00,
1103    DisconnectWillMessage = 0x04,
1104    UnspecifiedError = 0x80,
1105    MalformedPacket = 0x81,
1106    ProtocolError = 0x82,
1107    ImplementationError = 0x83,
1108    TopicNameInvalid = 0x90,
1109    ExceededReceiveMaximum = 0x93,
1110    TopicAliasInvalid = 0x94,
1111    PacketTooLarge = 0x95,
1112    ExceedMessageRate = 0x96,
1113    QuotaExceeded = 0x97,
1114    AdminAction = 0x98,
1115    PayloadFormatInvalid = 0x99,
1116}
1117
1118impl From<DisconnReasonCode> for u8 {
1119    fn from(val: DisconnReasonCode) -> u8 {
1120        use DisconnReasonCode::*;
1121
1122        match val {
1123            NormalDisconnect => 0x00,
1124            DisconnectWillMessage => 0x04,
1125            UnspecifiedError => 0x80,
1126            MalformedPacket => 0x81,
1127            ProtocolError => 0x82,
1128            ImplementationError => 0x83,
1129            TopicNameInvalid => 0x90,
1130            ExceededReceiveMaximum => 0x93,
1131            TopicAliasInvalid => 0x94,
1132            PacketTooLarge => 0x95,
1133            ExceedMessageRate => 0x96,
1134            QuotaExceeded => 0x97,
1135            AdminAction => 0x98,
1136            PayloadFormatInvalid => 0x99,
1137        }
1138    }
1139}
1140
1141impl TryFrom<u8> for DisconnReasonCode {
1142    type Error = Error;
1143
1144    fn try_from(val: u8) -> Result<DisconnReasonCode> {
1145        match val {
1146            0x00 => Ok(DisconnReasonCode::NormalDisconnect),
1147            0x04 => Ok(DisconnReasonCode::DisconnectWillMessage),
1148            0x80 => Ok(DisconnReasonCode::UnspecifiedError),
1149            0x81 => Ok(DisconnReasonCode::MalformedPacket),
1150            0x82 => Ok(DisconnReasonCode::ProtocolError),
1151            0x83 => Ok(DisconnReasonCode::ImplementationError),
1152            0x90 => Ok(DisconnReasonCode::TopicNameInvalid),
1153            0x93 => Ok(DisconnReasonCode::ExceededReceiveMaximum),
1154            0x94 => Ok(DisconnReasonCode::TopicAliasInvalid),
1155            0x95 => Ok(DisconnReasonCode::PacketTooLarge),
1156            0x96 => Ok(DisconnReasonCode::ExceedMessageRate),
1157            0x97 => Ok(DisconnReasonCode::QuotaExceeded),
1158            0x98 => Ok(DisconnReasonCode::AdminAction),
1159            0x99 => Ok(DisconnReasonCode::PayloadFormatInvalid),
1160            val => {
1161                err!(
1162                    MalformedPacket,
1163                    code: MalformedPacket,
1164                    " {} reason-code {}",
1165                    PP,
1166                    val
1167                )
1168            }
1169        }
1170    }
1171}