mqtt_protocol_core/mqtt/connection/
store.rs

1use crate::mqtt::packet::GenericStorePacket;
2use crate::mqtt::packet::IsPacketId;
3use crate::mqtt::packet::ResponsePacket;
4use crate::mqtt::result_code::MqttError;
5use alloc::vec::Vec;
6use indexmap::IndexMap;
7/**
8 * MIT License
9 *
10 * Copyright (c) 2025 Takatoshi Kondo
11 *
12 * Permission is hereby granted, free of charge, to any person obtaining a copy
13 * of this software and associated documentation files (the "Software"), to deal
14 * in the Software without restriction, including without limitation the rights
15 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
16 * copies of the Software, and to permit persons to whom the Software is
17 * furnished to do so, subject to the following conditions:
18 *
19 * The above copyright notice and this permission notice shall be included in all
20 * copies or substantial portions of the Software.
21 *
22 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
23 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
24 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
25 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
26 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
27 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
28 * SOFTWARE.
29 */
30
/// A store that holds packets in insertion order, keyed by packet id.
///
/// Packets are kept in an [`IndexMap`], so they can be looked up by packet id
/// while iteration still follows insertion order. Removing a packet keeps the
/// remaining packets in their original order, which costs O(n) in the number
/// of stored packets rather than O(1).
pub struct GenericStore<PacketIdType: IsPacketId> {
    // Stored packets keyed by their packet id, in insertion order.
    map: IndexMap<PacketIdType, GenericStorePacket<PacketIdType>>,
}
35
/// [`GenericStore`] specialised for `u16` packet identifiers.
pub type Store = GenericStore<u16>;
37
38impl<PacketIdType: IsPacketId> GenericStore<PacketIdType> {
39    /// Create a new empty store.
40    pub fn new() -> Self {
41        Self {
42            map: IndexMap::new(),
43        }
44    }
45
46    /// Add a packet to the store.
47    /// Returns true if inserted, false if a packet with same id already exists.
48    pub fn add(&mut self, packet: GenericStorePacket<PacketIdType>) -> Result<(), MqttError> {
49        let id = packet.packet_id();
50        if self.map.contains_key(&id) {
51            return Err(MqttError::PacketIdentifierConflict);
52        }
53        self.map.insert(id, packet);
54        Ok(())
55    }
56
57    /// Erase a packet by its response type and packet id.
58    /// Returns true if removed, false otherwise.
59    pub fn erase(&mut self, response: ResponsePacket, packet_id: PacketIdType) -> bool {
60        if let Some((index, _, pkt)) = self.map.get_full(&packet_id) {
61            if pkt.response_packet() == response {
62                self.map.shift_remove_index(index);
63                return true;
64            }
65        }
66        false
67    }
68    /// Erase a publish packet by packet id only.
69    /// Returns true if removed, false otherwise.
70    pub fn erase_publish(&mut self, packet_id: PacketIdType) -> bool {
71        if let Some((index, _, pkt)) = self.map.get_full(&packet_id) {
72            if matches!(
73                pkt.response_packet(),
74                ResponsePacket::V3_1_1Puback
75                    | ResponsePacket::V3_1_1Pubrec
76                    | ResponsePacket::V5_0Puback
77                    | ResponsePacket::V5_0Pubrec
78            ) {
79                self.map.shift_remove_index(index);
80                return true;
81            }
82        }
83        false
84    }
85
86    /// Clear all stored packets.
87    pub fn clear(&mut self) {
88        self.map.clear();
89    }
90
91    /// Iterate over packets in insertion order.
92    /// The provided function returns true to keep the packet, or false to remove it.
93    pub fn for_each<F>(&mut self, mut func: F)
94    where
95        F: FnMut(&GenericStorePacket<PacketIdType>) -> bool,
96    {
97        let mut to_remove = Vec::new();
98        for (id, pkt) in &self.map {
99            if !func(pkt) {
100                to_remove.push(*id);
101            }
102        }
103        for id in to_remove {
104            self.map.shift_remove(&id);
105            tracing::trace!("[store] removed pid: {id:?}");
106        }
107    }
108
109    /// Return a vector of all stored packets in insertion order.
110    pub fn get_stored(&self) -> Vec<GenericStorePacket<PacketIdType>> {
111        self.map.values().cloned().collect()
112    }
113}