minimq 0.11.0

An MQTT5 client
Documentation
use core::convert::Infallible;

use crate::de::received_packet::ReceivedPacket;
use crate::{
    Error, InboundPublish, PeerError, ProtocolError, QoS, ReasonCode, ResourceError, debug, info,
    trace, warn,
};

use super::super::outbound::{ControlAction, check_control_packet_size, check_pubrel_size};
use super::state::{RuntimeState, SessionData};
use super::{Io, Session};

impl<'a> SessionData<'a> {
    pub(super) fn handle_packet(
        &mut self,
        runtime: &mut RuntimeState,
        packet: ReceivedPacket<'_>,
    ) -> Result<bool, Error<Infallible>> {
        match packet {
            ReceivedPacket::ConnAck(_) => return Err(ProtocolError::UnexpectedPacket.into()),
            ReceivedPacket::SubAck(ack) => {
                if !self.outbound.ack_packet(ack.packet_identifier) {
                    debug!(
                        "Ignoring stale SUBACK for packet id {=u16}",
                        ack.packet_identifier
                    );
                    return Ok(false);
                }
                debug!("Processed SUBACK packet_id={=u16}", ack.packet_identifier);
                for &code in ack.codes {
                    ReasonCode::from(code).as_result()?;
                }
            }
            ReceivedPacket::UnsubAck(ack) => {
                if !self.outbound.ack_packet(ack.packet_identifier) {
                    debug!(
                        "Ignoring stale UNSUBACK for packet id {=u16}",
                        ack.packet_identifier
                    );
                    return Ok(false);
                }
                debug!("Processed UNSUBACK packet_id={=u16}", ack.packet_identifier);
                for &code in ack.codes {
                    ReasonCode::from(code).as_result()?;
                }
            }
            ReceivedPacket::PingResp => {
                trace!("Received PINGRESP");
                runtime.ping_timeout = None;
            }
            ReceivedPacket::PubAck(ack) => {
                if !self.outbound.ack_packet(ack.packet_identifier) {
                    debug!(
                        "Ignoring stale PUBACK for packet id {=u16}",
                        ack.packet_identifier
                    );
                    return Ok(false);
                }
                runtime.send_quota = runtime
                    .send_quota
                    .saturating_add(1)
                    .min(runtime.max_send_quota);
                debug!(
                    "Processed PUBACK packet_id={=u16} send_quota={=u16}",
                    ack.packet_identifier, runtime.send_quota
                );
                ack.reason.code().as_result()?;
            }
            ReceivedPacket::PubRec(rec) => {
                let queue_release = match self.outbound.ack_packet(rec.packet_id) {
                    true => {
                        runtime.send_quota = runtime
                            .send_quota
                            .saturating_add(1)
                            .min(runtime.max_send_quota);
                        debug!(
                            "Processed PUBREC packet_id={=u16} send_quota={=u16}",
                            rec.packet_id, runtime.send_quota
                        );
                        true
                    }
                    false if self.outbound.has_pending_release(rec.packet_id) => {
                        debug!(
                            "Replaying PUBREL after stale PUBREC for packet id {=u16}",
                            rec.packet_id
                        );
                        false
                    }
                    false => {
                        debug!("Ignoring stale PUBREC for packet id {=u16}", rec.packet_id);
                        return Ok(false);
                    }
                };
                rec.reason.code().as_result()?;
                if queue_release {
                    check_pubrel_size(
                        runtime.maximum_packet_size,
                        rec.packet_id,
                        ReasonCode::Success,
                    )?;
                    self.outbound
                        .queue_release(rec.packet_id, ReasonCode::Success)?;
                    debug!("Queued PUBREL for packet_id={=u16}", rec.packet_id);
                }
            }
            ReceivedPacket::PubComp(comp) => {
                if !self.outbound.ack_release(comp.packet_id) {
                    debug!(
                        "Ignoring stale PUBCOMP for packet id {=u16}",
                        comp.packet_id
                    );
                    return Ok(false);
                }
                debug!("Processed PUBCOMP packet_id={=u16}", comp.packet_id);
                comp.reason.code().as_result()?;
            }
            ReceivedPacket::PubRel(rel) => {
                let reason = if let Some(index) = self
                    .pending_server_packet_ids
                    .iter()
                    .position(|id| *id == rel.packet_id)
                {
                    self.pending_server_packet_ids.swap_remove(index);
                    ReasonCode::Success
                } else {
                    ReasonCode::PacketIdNotFound
                };
                debug!(
                    "Queueing PUBCOMP for inbound PUBREL packet_id={=u16} reason={} pending_inbound_qos2={=usize}",
                    rel.packet_id,
                    reason,
                    self.pending_server_packet_ids.len()
                );
                let action = ControlAction::PubComp {
                    packet_id: rel.packet_id,
                    reason,
                };
                check_control_packet_size(runtime.maximum_packet_size, action)?;
                self.outbound.queue_control(action)?;
            }
            ReceivedPacket::Publish(info) => {
                debug!(
                    "Handling inbound PUBLISH packet_id={=?} topic={=str} qos={} retain={} payload_len={=usize}",
                    info.packet_id,
                    info.topic.0,
                    info.qos,
                    info.retain,
                    info.payload.len()
                );
                match info.qos {
                    QoS::AtMostOnce => {}
                    QoS::AtLeastOnce => {
                        let packet_id = info.packet_id.ok_or(ProtocolError::MalformedPacket)?;
                        let reason = if self.pending_server_packet_ids.contains(&packet_id) {
                            ReasonCode::PacketIdInUse
                        } else {
                            ReasonCode::Success
                        };
                        trace!(
                            "Queueing PUBACK for inbound QoS1 PUBLISH packet_id={=u16} {}",
                            packet_id, reason
                        );
                        let action = ControlAction::PubAck { packet_id, reason };
                        check_control_packet_size(runtime.maximum_packet_size, action)?;
                        self.outbound.queue_control(action)?;
                    }
                    QoS::ExactlyOnce => {
                        let packet_id = info.packet_id.ok_or(ProtocolError::MalformedPacket)?;
                        let duplicate = self.pending_server_packet_ids.contains(&packet_id);
                        let reason = if !duplicate {
                            self.pending_server_packet_ids
                                .push(packet_id)
                                .map(|_| ReasonCode::Success)
                                .unwrap_or(ReasonCode::ReceiveMaxExceeded)
                        } else {
                            ReasonCode::Success
                        };
                        trace!(
                            "Queueing PUBREC for inbound QoS2 PUBLISH packet_id={=u16} duplicate={=bool} {}",
                            packet_id, duplicate, reason
                        );
                        let action = ControlAction::PubRec { packet_id, reason };
                        check_control_packet_size(runtime.maximum_packet_size, action)?;
                        self.outbound.queue_control(action)?;
                        if duplicate || !reason.success() {
                            debug!(
                                "Ignoring inbound QoS2 PUBLISH after PUBREC packet_id={=u16} duplicate={=bool} reason={}",
                                packet_id, duplicate, reason
                            );
                            return Ok(false);
                        }
                    }
                }
                return Ok(true);
            }
            ReceivedPacket::Disconnect(_) => {
                info!("Received broker DISCONNECT");
                return Err(Error::Disconnected);
            }
        }

        Ok(false)
    }
}

impl<'buf, IO> Session<'buf, IO>
where
    IO: Io,
{
    pub(super) fn process_received_packet(&mut self) -> Result<Option<usize>, Error<IO::Error>> {
        if !self.packet_reader.packet_available() {
            return Ok(None);
        }

        let (packet_length, packet) = match self.packet_reader.take_packet() {
            Ok(packet) => packet,
            Err(err) => {
                warn!("Failed to decode inbound packet: {}", err);
                self.handle_disconnect();
                return Err(err.into());
            }
        };
        match self.data.handle_packet(&mut self.runtime, packet) {
            Ok(true) => Ok(Some(packet_length)),
            Ok(false) => Ok(None),
            Err(Error::Disconnected) => {
                warn!("Disconnecting session after broker DISCONNECT");
                self.handle_disconnect();
                Err(Error::Disconnected)
            }
            Err(Error::Peer(PeerError::InvalidPacket)) => {
                warn!("Disconnecting session after packet handling error");
                self.handle_disconnect();
                Err(Error::Peer(PeerError::InvalidPacket))
            }
            Err(Error::Resource(ResourceError::PacketTooLarge)) => {
                warn!("Disconnecting session after packet handling error");
                self.handle_disconnect();
                Err(Error::Resource(ResourceError::PacketTooLarge))
            }
            Err(Error::Peer(err)) => Err(Error::Peer(err)),
            Err(Error::Resource(err)) => Err(Error::Resource(err)),
            Err(Error::InvalidRequest | Error::NotReady | Error::WriteZero) => {
                unreachable!("packet handler returned local I/O state")
            }
            Err(Error::Transport(never)) => match never {},
        }
    }

    pub(super) fn decode_inbound_publish(&self, packet_length: usize) -> InboundPublish<'_> {
        let ReceivedPacket::Publish(info) =
            ReceivedPacket::from_buffer(&self.packet_reader.buffer[..packet_length])
                .expect("inbound packet must remain decodable")
        else {
            unreachable!("inbound event must be a PUBLISH");
        };
        InboundPublish::new(
            info.topic.0,
            info.payload,
            info.properties,
            info.retain,
            info.qos,
        )
    }
}