mymq/v5/
protocol.rs

1use log::{error, info, trace};
2use mio::net::TcpStream;
3
4use std::{io, mem, net, thread, time};
5
6use crate::v5::{self, Config};
7use crate::{Blob, ClientID, PacketID, Packetize, QPacket, QoS, QueueStatus, SLEEP_10MS};
8use crate::{Error, ErrorKind, ReasonCode, Result};
9
/// Queue status carrying [`QPacket`] values; this is the type returned by
/// `Socket::read_packet` and `Socket::write_packet`.
pub type QueuePkt = QueueStatus<QPacket>;
11
/// Type implementing the protocol bridge between MQTT-v5 and the broker.
///
/// A value built with `From<Config>` carries only the configuration, the other
/// fields hold placeholder defaults. A value built with `Socket::to_protocol`
/// carries the details of that socket's connection.
#[derive(Clone, Eq, PartialEq)]
pub struct Protocol {
    // Client identifier; default value unless created from a `Socket`.
    client_id: ClientID,
    // Shard identifier; default value unless created from a `Socket`.
    shard_id: u32,
    // Remote (peer) address; "0.0.0.0:0" unless created from a `Socket`.
    raddr: net::SocketAddr,
    // MQTT-v5 configuration consulted by the accessor methods and the handshake.
    config: Config,
    // CONNECT packet; default value unless created from a `Socket`.
    connect: v5::Connect,
}
21
22impl From<Config> for Protocol {
23    fn from(config: Config) -> Protocol {
24        Protocol {
25            client_id: ClientID::default(),
26            shard_id: u32::default(),
27            raddr: "0.0.0.0:0".parse().unwrap(),
28            config,
29            connect: v5::Connect::default(),
30        }
31    }
32}
33
34impl Protocol {
35    pub fn is_listen(&self) -> bool {
36        self.config.mqtt_listener
37    }
38
39    pub fn to_listen_address(&self) -> net::SocketAddr {
40        let port = self.config.mqtt_port;
41        net::SocketAddr::new(net::IpAddr::V4(net::Ipv4Addr::new(0, 0, 0, 0)), port)
42    }
43
44    pub fn to_listen_port(&self) -> u16 {
45        self.config.mqtt_port
46    }
47
48    #[inline]
49    pub fn maximum_qos(&self) -> QoS {
50        QoS::try_from(self.config.mqtt_maximum_qos).unwrap()
51    }
52
53    #[inline]
54    pub fn retain_available(&self) -> bool {
55        self.config.mqtt_retain_available
56    }
57
58    #[inline]
59    pub fn max_packet_size(&self) -> u32 {
60        self.config.mqtt_max_packet_size
61    }
62
63    #[inline]
64    pub fn keep_alive(&self) -> Option<u16> {
65        self.config.keep_alive()
66    }
67
68    #[inline]
69    pub fn keep_alive_factor(&self) -> f32 {
70        self.config.keep_alive_factor()
71    }
72
73    #[inline]
74    pub fn topic_alias_max(&self) -> Option<u16> {
75        self.config.topic_alias_max()
76    }
77}
78
79impl Protocol {
80    pub fn handshake(&self, prefix: &str, mut conn: TcpStream) -> Result<Socket> {
81        use crate::v5::MQTTRead;
82
83        let (raddr, laddr) = (conn.peer_addr().unwrap(), conn.local_addr().unwrap());
84        info!("{} raddr:{} laddr:{} new connection", prefix, raddr, laddr);
85
86        let deadline = {
87            let timeout = u64::from(self.config.mqtt_connect_timeout / 2);
88            time::Instant::now() + time::Duration::from_secs(timeout)
89        };
90
91        let mut packetr = MQTTRead::new(self.config.mqtt_max_packet_size);
92        loop {
93            packetr = match packetr.read(&mut conn) {
94                Ok((packetr, _would_block)) => packetr,
95                Err(err) if err.kind() == ErrorKind::MalformedPacket => {
96                    error!("{}, fail read, err:{}", prefix, err);
97                    self.send_connack(prefix, err.code(), conn)?;
98                    break Err(err);
99                }
100                Err(err) if err.kind() == ErrorKind::ProtocolError => {
101                    error!("{}, fail read, err:{}", prefix, err);
102                    self.send_connack(prefix, err.code(), conn)?;
103                    break Err(err);
104                }
105                Err(err) => unreachable!("unexpected error {}", err),
106            };
107
108            match &packetr {
109                MQTTRead::Init { .. } if time::Instant::now() < deadline => {
110                    thread::sleep(SLEEP_10MS);
111                    continue;
112                }
113                MQTTRead::Header { .. } if time::Instant::now() < deadline => {
114                    thread::sleep(SLEEP_10MS);
115                    continue;
116                }
117                MQTTRead::Remain { .. } if time::Instant::now() < deadline => {
118                    thread::sleep(SLEEP_10MS);
119                    continue;
120                }
121                MQTTRead::Init { .. }
122                | MQTTRead::Header { .. }
123                | MQTTRead::Remain { .. } => {
124                    let code = ReasonCode::UnspecifiedError;
125                    self.send_connack(prefix, code, conn)?;
126
127                    break err!(
128                        InvalidInput,
129                        code: UnspecifiedError,
130                        "{} deadline:{:?} fail handshake connect rx",
131                        prefix,
132                        deadline
133                    );
134                }
135                MQTTRead::Fin { .. } => (),
136                MQTTRead::None => unreachable!(),
137            }
138
139            match packetr.parse() {
140                Ok(v5::Packet::Connect(connect)) => {
141                    if let Err(err) = connect.validate() {
142                        error!("{}, invalid connect-packet err:{}", prefix, err);
143                        self.send_connack(prefix, err.code(), conn)?;
144                        break Err(err);
145                    } else {
146                        break self.new_socket(conn, connect);
147                    }
148                }
149                Ok(pkt) => {
150                    let code = ReasonCode::ProtocolError;
151                    self.send_connack(prefix, code, conn)?;
152
153                    break err!(
154                        ProtocolError,
155                        code: ProtocolError,
156                        "{} packet:{} unexpected in connection",
157                        prefix,
158                        pkt
159                    );
160                }
161                Err(err) => {
162                    error!("{}, invalid packet parse err:{}", prefix, err);
163                    self.send_connack(prefix, err.code(), conn)?;
164                    break Err(err);
165                }
166            }
167        }
168    }
169
170    fn send_connack(&self, pr: &str, rc: ReasonCode, mut conn: TcpStream) -> Result<()> {
171        use crate::v5::ConnAckReasonCode;
172
173        let raddr = conn.peer_addr().unwrap();
174        let max_size = self.config.mqtt_max_packet_size;
175
176        let deadline = {
177            let timeout = u64::from(self.config.mqtt_connect_timeout / 2);
178            time::Instant::now() + time::Duration::from_secs(timeout)
179        };
180
181        let mut packetw = {
182            let code = ConnAckReasonCode::try_from(rc).unwrap();
183            let cack = v5::ConnAck::from_reason_code(code);
184            v5::MQTTWrite::new(cack.encode().unwrap().as_ref(), max_size)
185        };
186
187        loop {
188            let (val, would_block) = match packetw.write(&mut conn) {
189                Ok((packetw, would_block)) => (packetw, would_block),
190                Err(err) => {
191                    error!("{} problem writing connack packet err:{}", pr, err);
192                    break Err(err);
193                }
194            };
195            packetw = val;
196
197            if would_block && time::Instant::now() < deadline {
198                thread::sleep(SLEEP_10MS);
199            } else if would_block {
200                break err!(
201                    Disconnected,
202                    desc: "{} deadline:{:?} failed handshake connack tx",
203                    pr, deadline
204                );
205            } else {
206                info!("{} raddr:{} CONNACK", pr, raddr);
207                break Ok(());
208            }
209        }
210    }
211
212    fn new_socket(&self, conn: mio::net::TcpStream, cpkt: v5::Connect) -> Result<Socket> {
213        let socket = Socket {
214            client_id: ClientID::from(&cpkt),
215            shard_id: 0,
216            config: self.config.clone(),
217            conn,
218            connect: cpkt,
219            token: mio::Token(0),
220            rd: Source::default(),
221            wt: Sink::default(),
222        };
223
224        Ok(socket)
225    }
226}
227
228impl Protocol {
229    pub fn new_ping_resp(&self, _ping_req: v5::Packet) -> QPacket {
230        QPacket::V5(v5::Packet::PingResp)
231    }
232
233    pub fn new_pub_ack(&self, packet_id: PacketID) -> QPacket {
234        QPacket::V5(v5::Packet::PubAck(v5::Pub::new_pub_ack(packet_id)))
235    }
236
237    pub fn new_sub_ack(&self, sub: &v5::Packet, rcodes: Vec<ReasonCode>) -> QPacket {
238        match sub {
239            v5::Packet::Subscribe(sub) => {
240                let suback = v5::SubAck::from_sub(sub, rcodes);
241                QPacket::V5(v5::Packet::SubAck(suback))
242            }
243            pkt => unreachable!("{}", pkt),
244        }
245    }
246
247    pub fn new_unsub_ack(&self, unsub: &v5::Packet, rcodes: Vec<ReasonCode>) -> QPacket {
248        match unsub {
249            v5::Packet::UnSubscribe(unsub) => {
250                let unsuback = v5::UnsubAck::from_unsub(&unsub, rcodes);
251                QPacket::V5(v5::Packet::UnsubAck(unsuback))
252            }
253            pkt => unreachable!("{}", pkt),
254        }
255    }
256}
257
/// Type implementing the socket connection between an MQTT-v5 client and the
/// broker. Created by `Protocol::handshake` once a valid CONNECT packet is
/// received.
pub struct Socket {
    // Client identifier derived from the CONNECT packet.
    client_id: ClientID,
    // Shard identifier; starts at zero, updated via `set_shard_id`.
    shard_id: u32,
    // Copy of the protocol configuration.
    config: Config,
    // Underlying non-blocking TCP connection.
    conn: mio::net::TcpStream,
    // CONNECT packet received during the handshake.
    connect: v5::Connect,
    // mio token; starts at `Token(0)`, updated via `set_mio_token`.
    token: mio::Token,
    // Read-side state: packet reader and read deadline.
    rd: Source,
    // Write-side state: packet writer and write deadline.
    wt: Sink,
}
269
/// Read-side state of a [`Socket`].
#[derive(Default)]
struct Source {
    // Packet reader state-machine, carried over between `read_packet` calls.
    pr: v5::MQTTRead,
    // Only reported in the read-timeout log message; never assigned in this
    // file after construction.
    timeout: time::Duration,
    // Point in time after which a pending read is treated as timed out;
    // `None` when no read is being retried.
    deadline: Option<time::SystemTime>,
}
276
/// Write-side state of a [`Socket`].
#[derive(Default)]
struct Sink {
    // Packet writer state-machine, carried over between `write_packet` calls.
    pw: v5::MQTTWrite,
    // Only reported in the write-timeout log message; never assigned in this
    // file after construction.
    timeout: time::Duration,
    // Point in time after which a pending write is treated as timed out;
    // `None` when no write is being retried.
    deadline: Option<time::SystemTime>,
}
283
284impl mio::event::Source for Socket {
285    fn register(
286        &mut self,
287        registry: &mio::Registry,
288        token: mio::Token,
289        interests: mio::Interest,
290    ) -> io::Result<()> {
291        self.conn.register(registry, token, interests)
292    }
293
294    fn reregister(
295        &mut self,
296        registry: &mio::Registry,
297        token: mio::Token,
298        interests: mio::Interest,
299    ) -> io::Result<()> {
300        self.conn.reregister(registry, token, interests)
301    }
302
303    fn deregister(&mut self, registry: &mio::Registry) -> io::Result<()> {
304        self.conn.deregister(registry)
305    }
306}
307
308impl Socket {
309    #[inline]
310    pub fn set_mio_token(&mut self, token: mio::Token) {
311        self.token = token;
312    }
313
314    #[inline]
315    pub fn set_shard_id(&mut self, shard_id: u32) {
316        self.shard_id = shard_id;
317    }
318}
319
320impl Socket {
321    #[inline]
322    pub fn peer_addr(&self) -> net::SocketAddr {
323        self.conn.peer_addr().unwrap()
324    }
325
326    #[inline]
327    pub fn as_client_id(&self) -> &ClientID {
328        &self.client_id
329    }
330
331    #[inline]
332    pub fn to_mio_token(&self) -> mio::Token {
333        self.token
334    }
335
336    #[inline]
337    pub fn to_protocol(&self) -> Protocol {
338        Protocol {
339            client_id: self.client_id.clone(),
340            shard_id: self.shard_id,
341            raddr: self.conn.peer_addr().unwrap(),
342            config: self.config.clone(),
343            connect: self.connect.clone(),
344        }
345    }
346
347    #[inline]
348    pub fn client_keep_alive(&self) -> u16 {
349        self.connect.keep_alive
350    }
351
352    #[inline]
353    pub fn client_receive_maximum(&self) -> u16 {
354        self.connect.receive_maximum()
355    }
356
357    #[inline]
358    pub fn client_session_expiry_interval(&self) -> Option<u32> {
359        self.connect.session_expiry_interval()
360    }
361
362    #[inline]
363    pub fn is_clean_start(&self) -> bool {
364        self.connect.flags.is_clean_start()
365    }
366}
367
368impl Socket {
369    // return a single packet, if fully received.
370    // MalformedPacket, implies a DISCONNECT and socket close
371    // ProtocolError, implies DISCONNECT and socket close
372    pub fn read_packet(&mut self, prefix: &str) -> Result<QueuePkt> {
373        use crate::v5::MQTTRead::{Fin, Header, Init, Remain};
374
375        let disconnected = QueueStatus::<QPacket>::Disconnected(Vec::new());
376
377        let pr = mem::replace(&mut self.rd.pr, v5::MQTTRead::default());
378        let mut pr = match pr.read(&mut self.conn) {
379            Ok((pr, _would_block)) => pr,
380            Err(err) if err.kind() == ErrorKind::Disconnected => return Ok(disconnected),
381            Err(err) => return Err(err),
382        };
383
384        let status = match &pr {
385            Init { .. } | Header { .. } | Remain { .. } if !self.read_elapsed() => {
386                trace!("{} read retrying", prefix);
387                self.set_read_timeout(true, self.config.mqtt_sock_read_timeout);
388                QueueStatus::Block(Vec::new())
389            }
390            Init { .. } | Header { .. } | Remain { .. } => {
391                error!("{} rd_timeout:{:?} disconnecting", prefix, self.rd.timeout);
392                self.set_read_timeout(false, self.config.mqtt_sock_read_timeout);
393                QueueStatus::Disconnected(Vec::new())
394            }
395            Fin { .. } => {
396                self.set_read_timeout(false, self.config.mqtt_sock_read_timeout);
397                let pkt = pr.parse()?;
398                pr = pr.reset();
399                QueueStatus::Ok(vec![pkt.into()])
400            }
401            v5::MQTTRead::None => unreachable!(),
402        };
403
404        let _none = mem::replace(&mut self.rd.pr, pr);
405        Ok(status)
406    }
407}
408
409impl Socket {
410    // QueueStatus shall not carry any packets
411    pub fn write_packet(&mut self, prefix: &str, blob: Option<Blob>) -> QueuePkt {
412        use crate::v5::MQTTWrite::{Fin, Init, Remain};
413        use std::io::Write;
414
415        let mut pw = match (blob, &self.wt.pw) {
416            (Some(blob), Fin { .. }) => {
417                if let Err(err) = self.conn.flush() {
418                    error!("{} fail conn.flush() err:{}", prefix, err);
419                    return QueueStatus::Disconnected(Vec::new());
420                }
421
422                let pw = mem::replace(&mut self.wt.pw, v5::MQTTWrite::default());
423                pw.reset(blob.as_ref())
424            }
425            (Some(_blob), _) => unreachable!(),
426            _ => mem::replace(&mut self.wt.pw, v5::MQTTWrite::default()),
427        };
428
429        let write_timeout = self.config.mqtt_sock_write_timeout;
430        let timeout = self.wt.timeout;
431
432        let (res, pw) = loop {
433            pw = match pw.write(&mut self.conn) {
434                Ok((pw, _would_block)) => match &pw {
435                    Init { .. } | Remain { .. } if !self.write_elapsed() => {
436                        trace!("{} write retrying", prefix);
437                        self.set_write_timeout(true, write_timeout);
438                        pw
439                        // TODO: thread yield ?
440                    }
441                    Init { .. } | Remain { .. } => {
442                        self.set_write_timeout(false, write_timeout);
443                        error!("{} wt_timeout:{:?} disconnecting..", prefix, timeout);
444                        break (QueueStatus::Disconnected(Vec::new()), pw);
445                    }
446                    Fin { .. } => {
447                        self.set_write_timeout(false, write_timeout);
448                        break (QueueStatus::Ok(Vec::new()), pw);
449                    }
450                    v5::MQTTWrite::None => unreachable!(),
451                },
452                Err(err) if err.kind() == ErrorKind::Disconnected => {
453                    let val = v5::MQTTWrite::default();
454                    break (QueueStatus::Disconnected(Vec::new()), val);
455                }
456                Err(err) => unreachable!("unexpected error: {}", err),
457            }
458        };
459
460        let _none = mem::replace(&mut self.wt.pw, pw);
461        res
462    }
463
464    pub fn disconnect(&mut self, prefix: &str, code: ReasonCode) {
465        let blob = {
466            let disconn = v5::Disconnect { code, properties: None };
467            disconn.encode().ok()
468        };
469        self.write_packet(prefix, blob);
470    }
471
472    pub fn new_conn_ack(&self, rcode: ReasonCode) -> QPacket {
473        let val = self.connect.session_expiry_interval();
474        let sei = match (self.config.mqtt_session_expiry_interval, val) {
475            (Some(_one), Some(two)) => Some(two),
476            (Some(one), None) => Some(one),
477            (None, Some(two)) => Some(two),
478            (None, None) => None,
479        };
480
481        let mut props = v5::ConnAckProperties {
482            session_expiry_interval: sei,
483            receive_maximum: Some(self.config.mqtt_receive_maximum),
484            maximum_qos: Some(self.config.mqtt_maximum_qos.try_into().unwrap()),
485            retain_available: Some(self.config.mqtt_retain_available),
486            max_packet_size: Some(self.config.mqtt_max_packet_size),
487            assigned_client_identifier: None,
488            wildcard_subscription_available: Some(true),
489            subscription_identifiers_available: Some(true),
490            shared_subscription_available: None,
491            topic_alias_max: self.config.topic_alias_max(),
492            ..v5::ConnAckProperties::default()
493        };
494
495        if self.connect.payload.client_id.len() == 0 {
496            props.assigned_client_identifier = Some((*self.client_id).clone());
497        }
498
499        if let Some(keep_alive) = self.config.keep_alive() {
500            props.server_keep_alive = Some(keep_alive)
501        }
502
503        let connack = match rcode {
504            ReasonCode::Success => v5::ConnAck::new_success(Some(props)),
505            _ => unreachable!(),
506        };
507
508        QPacket::V5(v5::Packet::ConnAck(connack))
509    }
510}
511
512impl Socket {
513    fn read_elapsed(&self) -> bool {
514        let now = time::SystemTime::now();
515        match &self.rd.deadline {
516            Some(deadline) if &now > deadline => true,
517            Some(_) | None => false,
518        }
519    }
520
521    fn set_read_timeout(&mut self, retry: bool, timeout: Option<u32>) {
522        if let Some(timeout) = timeout {
523            if retry && self.rd.deadline.is_none() {
524                let now = time::SystemTime::now();
525                self.rd.deadline = Some(now + time::Duration::from_secs(timeout as u64));
526            } else if retry == false {
527                self.rd.deadline = None;
528            }
529        }
530    }
531
532    fn write_elapsed(&self) -> bool {
533        let now = time::SystemTime::now();
534        match &self.wt.deadline {
535            Some(deadline) if &now > deadline => true,
536            Some(_) | None => false,
537        }
538    }
539
540    fn set_write_timeout(&mut self, retry: bool, timeout: Option<u32>) {
541        if let Some(timeout) = timeout {
542            if retry && self.wt.deadline.is_none() {
543                let now = time::SystemTime::now();
544                self.wt.deadline = Some(now + time::Duration::from_secs(timeout as u64));
545            } else if retry == false {
546                self.wt.deadline = None;
547            }
548        }
549    }
550}