mqtt5 0.31.3

Complete MQTT v5.0 platform with high-performance async client and full-featured broker supporting TCP, TLS, WebSocket, authentication, bridging, and resource monitoring
Documentation
//! Packet type handlers for incoming packets

use crate::callback::CallbackManager;
use crate::codec::CodecRegistry;
use crate::error::{MqttError, Result};
use crate::packet::Packet;
use crate::protocol::v5::properties::Properties;
use crate::session::SessionState;
use crate::transport::PacketWriter;
use parking_lot::Mutex;
use std::sync::Arc;

use super::keepalive::KeepaliveState;
use super::unified::UnifiedWriter;
#[cfg(feature = "transport-quic")]
use crate::transport::flow::FlowId;

pub(super) async fn handle_incoming_packet_with_writer(
    packet: Packet,
    writer: &Arc<tokio::sync::Mutex<UnifiedWriter>>,
    session: &Arc<tokio::sync::RwLock<SessionState>>,
    callback_manager: &Arc<CallbackManager>,
    flow_id: Option<crate::transport::flow::FlowId>,
    keepalive_state: &Arc<Mutex<KeepaliveState>>,
    codec_registry: Option<&Arc<CodecRegistry>>,
) -> Result<()> {
    match packet {
        Packet::Publish(publish) => {
            handle_publish_with_ack(
                publish,
                writer,
                session,
                callback_manager,
                flow_id,
                codec_registry,
            )
            .await
        }
        Packet::PingResp => {
            keepalive_state.lock().record_pong_received();
            Ok(())
        }
        Packet::PubRec(pubrec) => handle_pubrec_outgoing(pubrec, writer, session).await,
        Packet::PubRel(pubrel) => handle_pubrel(pubrel, writer, session).await,
        Packet::PubComp(pubcomp) => handle_pubcomp_outgoing(pubcomp, session).await,
        Packet::Disconnect(disconnect) => {
            tracing::info!("Server sent DISCONNECT: {:?}", disconnect.reason_code);
            Err(MqttError::ConnectionError(
                "Server disconnected".to_string(),
            ))
        }
        _ => Ok(()),
    }
}

pub(super) async fn handle_publish_with_ack(
    mut publish: crate::packet::publish::PublishPacket,
    writer: &Arc<tokio::sync::Mutex<UnifiedWriter>>,
    session: &Arc<tokio::sync::RwLock<SessionState>>,
    callback_manager: &Arc<CallbackManager>,
    flow_id: Option<crate::transport::flow::FlowId>,
    codec_registry: Option<&Arc<CodecRegistry>>,
) -> Result<()> {
    if let Some(registry) = codec_registry {
        let content_type = publish.properties.get_content_type();
        let decoded = registry.decode_if_needed(&publish.payload, content_type.as_deref())?;
        publish.payload = decoded;
    }

    match publish.qos {
        crate::QoS::AtMostOnce => {}
        crate::QoS::AtLeastOnce => {
            if let Some(packet_id) = publish.packet_id {
                session
                    .read()
                    .await
                    .flow_control()
                    .read()
                    .await
                    .register_inbound_publish(packet_id)
                    .await?;

                if let Some(fid) = flow_id {
                    session
                        .read()
                        .await
                        .store_publish_flow(packet_id, fid)
                        .await;
                }
                let puback = crate::packet::puback::PubAckPacket {
                    packet_id,
                    reason_code: crate::protocol::v5::reason_codes::ReasonCode::Success,
                    properties: Properties::default(),
                };
                writer
                    .lock()
                    .await
                    .write_packet(Packet::PubAck(puback))
                    .await?;

                session
                    .read()
                    .await
                    .flow_control()
                    .read()
                    .await
                    .acknowledge_inbound(packet_id)
                    .await;
            }
        }
        crate::QoS::ExactlyOnce => {
            if let Some(packet_id) = publish.packet_id {
                session
                    .read()
                    .await
                    .flow_control()
                    .read()
                    .await
                    .register_inbound_publish(packet_id)
                    .await?;

                if let Some(fid) = flow_id {
                    session
                        .read()
                        .await
                        .store_publish_flow(packet_id, fid)
                        .await;
                }
                session
                    .write()
                    .await
                    .store_unacked_publish(publish.clone())
                    .await?;
                let pubrec = crate::packet::pubrec::PubRecPacket {
                    packet_id,
                    reason_code: crate::protocol::v5::reason_codes::ReasonCode::Success,
                    properties: Properties::default(),
                };
                writer
                    .lock()
                    .await
                    .write_packet(Packet::PubRec(pubrec))
                    .await?;
                session.write().await.store_pubrec(packet_id).await;
            }
        }
    }

    publish.stream_id = flow_id.map(|f| f.raw());
    let _ = callback_manager.dispatch(&publish);

    Ok(())
}

#[cfg(feature = "transport-quic")]
pub(super) async fn handle_incoming_packet_no_writer(
    packet: Packet,
    callback_manager: &Arc<CallbackManager>,
    flow_id: Option<FlowId>,
    keepalive_state: &Arc<Mutex<KeepaliveState>>,
    codec_registry: Option<&Arc<CodecRegistry>>,
) -> Result<()> {
    match packet {
        Packet::Publish(mut publish) => {
            if publish.qos != crate::QoS::AtMostOnce {
                return Err(MqttError::ProtocolError(
                    "QoS > 0 publish received on unidirectional stream".to_string(),
                ));
            }
            if let Some(registry) = codec_registry {
                let content_type = publish.properties.get_content_type();
                let decoded =
                    registry.decode_if_needed(&publish.payload, content_type.as_deref())?;
                publish.payload = decoded;
            }
            publish.stream_id = flow_id.map(|f| f.raw());
            let _ = callback_manager.dispatch(&publish);
            Ok(())
        }
        Packet::PingResp => {
            keepalive_state.lock().record_pong_received();
            Ok(())
        }
        Packet::Disconnect(disconnect) => {
            tracing::info!("Server sent DISCONNECT: {:?}", disconnect.reason_code);
            Err(MqttError::ConnectionError(
                "Server disconnected".to_string(),
            ))
        }
        _ => Ok(()),
    }
}

pub(super) async fn handle_pubrec_outgoing(
    pubrec: crate::packet::pubrec::PubRecPacket,
    writer: &Arc<tokio::sync::Mutex<UnifiedWriter>>,
    session: &Arc<tokio::sync::RwLock<SessionState>>,
) -> Result<()> {
    session
        .write()
        .await
        .complete_pubrec(pubrec.packet_id)
        .await;

    let pub_rel = crate::packet::pubrel::PubRelPacket {
        packet_id: pubrec.packet_id,
        reason_code: crate::protocol::v5::reason_codes::ReasonCode::Success,
        properties: Properties::default(),
    };

    writer
        .lock()
        .await
        .write_packet(crate::packet::Packet::PubRel(pub_rel))
        .await?;

    session.write().await.store_pubrel(pubrec.packet_id).await;

    Ok(())
}

pub(super) async fn handle_pubcomp_outgoing(
    pubcomp: crate::packet::pubcomp::PubCompPacket,
    session: &Arc<tokio::sync::RwLock<SessionState>>,
) -> Result<()> {
    session
        .write()
        .await
        .complete_pubrel(pubcomp.packet_id)
        .await;

    Ok(())
}

pub(super) async fn handle_pubrel(
    pubrel: crate::packet::pubrel::PubRelPacket,
    writer: &Arc<tokio::sync::Mutex<UnifiedWriter>>,
    session: &Arc<tokio::sync::RwLock<SessionState>>,
) -> Result<()> {
    let has_pubrec = session.read().await.has_pubrec(pubrel.packet_id).await;

    if has_pubrec {
        session.write().await.remove_pubrec(pubrel.packet_id).await;

        let pubcomp = crate::packet::pubcomp::PubCompPacket {
            packet_id: pubrel.packet_id,
            reason_code: crate::protocol::v5::reason_codes::ReasonCode::Success,
            properties: Properties::default(),
        };

        writer
            .lock()
            .await
            .write_packet(Packet::PubComp(pubcomp))
            .await?;
    } else {
        let pubcomp = crate::packet::pubcomp::PubCompPacket {
            packet_id: pubrel.packet_id,
            reason_code: crate::protocol::v5::reason_codes::ReasonCode::Success,
            properties: Properties::default(),
        };

        writer
            .lock()
            .await
            .write_packet(Packet::PubComp(pubcomp))
            .await?;
    }

    session
        .read()
        .await
        .flow_control()
        .read()
        .await
        .acknowledge_inbound(pubrel.packet_id)
        .await;

    Ok(())
}