rust-mqtt 0.5.1

MQTT client for embedded and non-embedded environments
Documentation
//! Contains utilities for session management.

pub use flight::{CPublishFlightState, InFlightPublish, SPublishFlightState};
use heapless::Vec;

use crate::types::PacketIdentifier;

mod flight;

/// Session-associated information
///
/// Tracks the publications that are currently in-flight in both directions.
/// `SEND_MAXIMUM` is the capacity of the outgoing (client->server) queue and
/// `RECEIVE_MAXIMUM` is the capacity of the incoming (server->client) queue.
///
/// Client identifier is not stored here as it would lead to inconsistencies with the underlying allocation system.
#[derive(Debug, Default, Clone)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub struct Session<const RECEIVE_MAXIMUM: usize, const SEND_MAXIMUM: usize> {
    /// The currently in-flight outgoing publications (at most `SEND_MAXIMUM` entries).
    pub pending_client_publishes: Vec<InFlightPublish<CPublishFlightState>, SEND_MAXIMUM>,
    /// The currently in-flight incoming publications (at most `RECEIVE_MAXIMUM` entries).
    pub pending_server_publishes: Vec<InFlightPublish<SPublishFlightState>, RECEIVE_MAXIMUM>,
}

impl<const RECEIVE_MAXIMUM: usize, const SEND_MAXIMUM: usize>
    Session<RECEIVE_MAXIMUM, SEND_MAXIMUM>
{
    /// Returns whether the packet identifier is currently in-flight in a client->server publication process.
    #[must_use]
    pub fn is_used_cpublish_packet_identifier(&self, packet_identifier: PacketIdentifier) -> bool {
        self.cpublish_flight_state(packet_identifier).is_some()
    }
    /// Returns whether the packet identifier is currently in-flight in a server->client publication process.
    #[must_use]
    pub fn is_used_spublish_packet_identifier(&self, packet_identifier: PacketIdentifier) -> bool {
        self.spublish_flight_state(packet_identifier).is_some()
    }

    /// Returns the state of the publication of the packet identifier if the packet identifier is in-flight in an outgoing publication.
    ///
    /// Returns `None` if no outgoing entry carries this packet identifier. If several entries
    /// share the identifier, the state of the first one found is returned.
    #[must_use]
    pub fn cpublish_flight_state(
        &self,
        packet_identifier: PacketIdentifier,
    ) -> Option<CPublishFlightState> {
        self.pending_client_publishes
            .iter()
            .find(|f| f.packet_identifier == packet_identifier)
            .map(|f| f.state)
    }
    /// Returns the state of the publication of the packet identifier if the packet identifier is in-flight in an incoming publication.
    ///
    /// Returns `None` if no incoming entry carries this packet identifier. If several entries
    /// share the identifier, the state of the first one found is returned.
    #[must_use]
    pub fn spublish_flight_state(
        &self,
        packet_identifier: PacketIdentifier,
    ) -> Option<SPublishFlightState> {
        self.pending_server_publishes
            .iter()
            .find(|f| f.packet_identifier == packet_identifier)
            .map(|f| f.state)
    }

    /// Returns the amount of currently in-flight outgoing publications.
    ///
    /// The value saturates at `u16::MAX` instead of wrapping around, which can only
    /// matter when `SEND_MAXIMUM` exceeds `u16::MAX`.
    #[must_use]
    pub fn in_flight_cpublishes(&self) -> u16 {
        u16::try_from(self.pending_client_publishes.len()).unwrap_or(u16::MAX)
    }
    /// Returns the amount of currently in-flight incoming publications.
    ///
    /// The value saturates at `u16::MAX` instead of wrapping around, which can only
    /// matter when `RECEIVE_MAXIMUM` exceeds `u16::MAX`.
    #[must_use]
    pub fn in_flight_spublishes(&self) -> u16 {
        u16::try_from(self.pending_server_publishes.len()).unwrap_or(u16::MAX)
    }
    /// Returns the amount of free slots remaining for outgoing publications.
    ///
    /// This is the capacity of the outgoing queue minus its current length. The value
    /// saturates at `u16::MAX` instead of wrapping around, which can only matter when
    /// `SEND_MAXIMUM` exceeds `u16::MAX`.
    #[must_use]
    pub fn cpublish_remaining_capacity(&self) -> u16 {
        let free = self.pending_client_publishes.capacity() - self.pending_client_publishes.len();
        u16::try_from(free).unwrap_or(u16::MAX)
    }
    /// Returns the amount of free slots remaining for incoming publications.
    ///
    /// This is the capacity of the incoming queue minus its current length. The value
    /// saturates at `u16::MAX` instead of wrapping around, which can only matter when
    /// `RECEIVE_MAXIMUM` exceeds `u16::MAX`.
    #[must_use]
    pub fn spublish_remaining_capacity(&self) -> u16 {
        let free = self.pending_server_publishes.capacity() - self.pending_server_publishes.len();
        u16::try_from(free).unwrap_or(u16::MAX)
    }

    /// Adds an entry to await a PUBACK/PUBREC/PUBCOMP packet. Assumes the packet identifier has no entry currently.
    ///
    /// The entry is appended to `self.pending_client_publishes` with the given `state`.
    /// No check for an already existing entry with the same packet identifier is performed.
    ///
    /// # Safety
    /// `self.pending_client_publishes` has free capacity.
    pub(crate) unsafe fn r#await(
        &mut self,
        packet_identifier: PacketIdentifier,
        state: CPublishFlightState,
    ) {
        let entry = InFlightPublish {
            packet_identifier,
            state,
        };
        // Safety: the caller guarantees that self.pending_client_publishes has free capacity,
        // so `push` returns `Ok`.
        unsafe {
            self.pending_client_publishes
                .push(entry)
                .unwrap_unchecked();
        }
    }
    /// Adds an entry to await a PUBACK packet. Assumes the packet identifier has no entry currently.
    ///
    /// # Safety
    /// `self.pending_client_publishes` has free capacity.
    pub(crate) unsafe fn await_puback(&mut self, packet_identifier: PacketIdentifier) {
        // Safety: the caller guarantees that self.pending_client_publishes has free capacity.
        unsafe { self.r#await(packet_identifier, CPublishFlightState::AwaitingPuback) };
    }
    /// Adds an entry to await a PUBREC packet. Assumes the packet identifier has no entry currently.
    ///
    /// # Safety
    /// `self.pending_client_publishes` has free capacity.
    pub(crate) unsafe fn await_pubrec(&mut self, packet_identifier: PacketIdentifier) {
        // Safety: the caller guarantees that self.pending_client_publishes has free capacity.
        unsafe { self.r#await(packet_identifier, CPublishFlightState::AwaitingPubrec) };
    }
    /// Adds an entry to await a PUBREL packet. Assumes the packet identifier has no entry currently.
    ///
    /// The entry is appended to `self.pending_server_publishes`. No check for an already
    /// existing entry with the same packet identifier is performed.
    ///
    /// # Safety
    /// `self.pending_server_publishes` has free capacity.
    pub(crate) unsafe fn await_pubrel(&mut self, packet_identifier: PacketIdentifier) {
        let entry = InFlightPublish {
            packet_identifier,
            state: SPublishFlightState::AwaitingPubrel,
        };
        // Safety: the caller guarantees that self.pending_server_publishes has free capacity,
        // so `push` returns `Ok`.
        unsafe {
            self.pending_server_publishes
                .push(entry)
                .unwrap_unchecked();
        }
    }
    /// Adds an entry to await a PUBCOMP packet. Assumes the packet identifier has no entry currently.
    ///
    /// # Safety
    /// `self.pending_client_publishes` has free capacity.
    pub(crate) unsafe fn await_pubcomp(&mut self, packet_identifier: PacketIdentifier) {
        // Safety: the caller guarantees that self.pending_client_publishes has free capacity.
        unsafe { self.r#await(packet_identifier, CPublishFlightState::AwaitingPubcomp) };
    }

    /// Removes the in-flight outgoing publication with the given packet identifier.
    ///
    /// Returns the state the entry was in, or `None` if no outgoing entry carries this
    /// packet identifier. Only the first matching entry is removed. The removal swaps the
    /// last entry into the freed slot, so the order of the remaining entries is not preserved.
    pub(crate) fn remove_cpublish(
        &mut self,
        packet_identifier: PacketIdentifier,
    ) -> Option<CPublishFlightState> {
        let index = self
            .pending_client_publishes
            .iter()
            .position(|s| s.packet_identifier == packet_identifier)?;

        // Safety: `.iter().position()` confirms the index is within bounds.
        let removed = unsafe { self.pending_client_publishes.swap_remove_unchecked(index) };
        Some(removed.state)
    }
    /// Removes the in-flight incoming publication with the given packet identifier.
    ///
    /// Returns the state the entry was in, or `None` if no incoming entry carries this
    /// packet identifier. Only the first matching entry is removed. The removal swaps the
    /// last entry into the freed slot, so the order of the remaining entries is not preserved.
    pub(crate) fn remove_spublish(
        &mut self,
        packet_identifier: PacketIdentifier,
    ) -> Option<SPublishFlightState> {
        let index = self
            .pending_server_publishes
            .iter()
            .position(|s| s.packet_identifier == packet_identifier)?;

        // Safety: `.iter().position()` confirms the index is within bounds.
        let removed = unsafe { self.pending_server_publishes.swap_remove_unchecked(index) };
        Some(removed.state)
    }

    /// Drops all in-flight entries, both outgoing and incoming.
    pub(crate) fn clear(&mut self) {
        self.pending_client_publishes.clear();
        self.pending_server_publishes.clear();
    }
}