Skip to main content

rust_mqtt/session/
mod.rs

1//! Contains utilities for session management.
2
3pub use flight::{CPublishFlightState, InFlightPublish, SPublishFlightState};
4use heapless::Vec;
5
6use crate::types::PacketIdentifier;
7
8mod flight;
9
/// Session-associated information
///
/// Tracks the publications that are currently in flight in both directions.
/// `SEND_MAXIMUM` is the capacity of the outgoing list and `RECEIVE_MAXIMUM` is the capacity of
/// the incoming list.
///
/// Client identifier is not stored here as it would lead to inconsistencies with the underlying allocation system.
#[derive(Debug, Default, Clone)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub struct Session<const RECEIVE_MAXIMUM: usize, const SEND_MAXIMUM: usize> {
    /// The currently in-flight outgoing publications.
    pub pending_client_publishes: Vec<InFlightPublish<CPublishFlightState>, SEND_MAXIMUM>,
    /// The currently in-flight incoming publications.
    pub pending_server_publishes: Vec<InFlightPublish<SPublishFlightState>, RECEIVE_MAXIMUM>,
}
21
22impl<const RECEIVE_MAXIMUM: usize, const SEND_MAXIMUM: usize>
23    Session<RECEIVE_MAXIMUM, SEND_MAXIMUM>
24{
25    /// Returns whether the packet identifier is currently in-flight in a client->server publication process.
26    #[must_use]
27    pub fn is_used_cpublish_packet_identifier(&self, packet_identifier: PacketIdentifier) -> bool {
28        self.cpublish_flight_state(packet_identifier).is_some()
29    }
30    /// Returns whether the packet identifier is currently in-flight in a server->client publication process.
31    #[must_use]
32    pub fn is_used_spublish_packet_identifier(&self, packet_identifier: PacketIdentifier) -> bool {
33        self.spublish_flight_state(packet_identifier).is_some()
34    }
35
36    /// Returns the state of the publication of the packet identifier if the packet identifier is in-flight in an outgoing publication.
37    #[must_use]
38    pub fn cpublish_flight_state(
39        &self,
40        packet_identifier: PacketIdentifier,
41    ) -> Option<CPublishFlightState> {
42        self.pending_client_publishes
43            .iter()
44            .find(|f| f.packet_identifier == packet_identifier)
45            .map(|f| f.state)
46    }
47    /// Returns the state of the publication of the packet identifier if the packet identifier is in-flight in an incoming publication.
48    #[must_use]
49    pub fn spublish_flight_state(
50        &self,
51        packet_identifier: PacketIdentifier,
52    ) -> Option<SPublishFlightState> {
53        self.pending_server_publishes
54            .iter()
55            .find(|f| f.packet_identifier == packet_identifier)
56            .map(|f| f.state)
57    }
58
59    /// Returns the amount of currently in-flight outgoing publications.
60    #[must_use]
61    pub fn in_flight_cpublishes(&self) -> u16 {
62        self.pending_client_publishes.len() as u16
63    }
64    /// Returns the amount of currently in-flight incoming publications.
65    #[must_use]
66    pub fn in_flight_spublishes(&self) -> u16 {
67        self.pending_server_publishes.len() as u16
68    }
69    /// Returns the amount of slots for outgoing publications.
70    #[must_use]
71    pub fn cpublish_remaining_capacity(&self) -> u16 {
72        (self.pending_client_publishes.capacity() - self.pending_client_publishes.len()) as u16
73    }
74    /// Returns the amount of slots for incoming publications.
75    #[must_use]
76    pub fn spublish_remaining_capacity(&self) -> u16 {
77        (self.pending_server_publishes.capacity() - self.pending_server_publishes.len()) as u16
78    }
79
80    /// Adds an entry to await a PUBACK/PUBREC/PUBCOMP packet. Assumes the packet identifier has no entry currently.
81    ///
82    /// # Safety
83    /// `self.pending_client_publishes` has free capacity.
84    pub(crate) unsafe fn r#await(
85        &mut self,
86        packet_identifier: PacketIdentifier,
87        state: CPublishFlightState,
88    ) {
89        // Safety: self.pending_client_publishes has free capacity.
90        unsafe {
91            self.pending_client_publishes
92                .push(InFlightPublish {
93                    packet_identifier,
94                    state,
95                })
96                .unwrap_unchecked();
97        }
98    }
99    /// Adds an entry to await a PUBACK packet. Assumes the packet identifier has no entry currently.
100    ///
101    /// # Safety
102    /// `self.pending_client_publishes` has free capacity.
103    pub(crate) unsafe fn await_puback(&mut self, packet_identifier: PacketIdentifier) {
104        // Safety: self.pending_client_publishes has free capacity.
105        unsafe { self.r#await(packet_identifier, CPublishFlightState::AwaitingPuback) };
106    }
107    /// Adds an entry to await a PUBREC packet. Assumes the packet identifier has no entry currently.
108    ///
109    /// # Safety
110    /// `self.pending_client_publishes` has free capacity.
111    pub(crate) unsafe fn await_pubrec(&mut self, packet_identifier: PacketIdentifier) {
112        // Safety: self.pending_client_publishes has free capacity.
113        unsafe { self.r#await(packet_identifier, CPublishFlightState::AwaitingPubrec) };
114    }
115    /// Adds an entry to await a PUBREL packet. Assumes the packet identifier has no entry currently.
116    ///
117    /// # Safety
118    /// `self.pending_server_publishes` has free capacity.
119    pub(crate) unsafe fn await_pubrel(&mut self, packet_identifier: PacketIdentifier) {
120        // Safety: self.pending_server_publishes has free capacity.
121        unsafe {
122            self.pending_server_publishes
123                .push(InFlightPublish {
124                    packet_identifier,
125                    state: SPublishFlightState::AwaitingPubrel,
126                })
127                .unwrap_unchecked();
128        }
129    }
130    /// Adds an entry to await a PUBCOMP packet. Assumes the packet identifier has no entry currently.
131    ///
132    /// # Safety
133    /// `self.pending_client_publishes` has free capacity.
134    pub(crate) unsafe fn await_pubcomp(&mut self, packet_identifier: PacketIdentifier) {
135        // Safety: self.pending_client_publishes has free capacity.
136        unsafe { self.r#await(packet_identifier, CPublishFlightState::AwaitingPubcomp) };
137    }
138
139    pub(crate) fn remove_cpublish(
140        &mut self,
141        packet_identifier: PacketIdentifier,
142    ) -> Option<CPublishFlightState> {
143        self.pending_client_publishes
144            .iter()
145            .position(|s| s.packet_identifier == packet_identifier)
146            .map(|i| {
147                // Safety: `.iter().position()` confirms the index is within bounds.
148                unsafe { self.pending_client_publishes.swap_remove_unchecked(i) }.state
149            })
150    }
151    pub(crate) fn remove_spublish(
152        &mut self,
153        packet_identifier: PacketIdentifier,
154    ) -> Option<SPublishFlightState> {
155        self.pending_server_publishes
156            .iter()
157            .position(|s| s.packet_identifier == packet_identifier)
158            .map(|i| {
159                // Safety: `.iter().position()` confirms the index is within bounds.
160                unsafe { self.pending_server_publishes.swap_remove_unchecked(i) }.state
161            })
162    }
163
164    pub(crate) fn clear(&mut self) {
165        self.pending_client_publishes.clear();
166        self.pending_server_publishes.clear();
167    }
168}