mqtt_protocol_core/mqtt/connection/
store.rs

1use crate::mqtt::packet::GenericStorePacket;
2use crate::mqtt::packet::ResponsePacket;
3use crate::mqtt::result_code::MqttError;
4use crate::mqtt::packet::IsPacketId;
5use indexmap::IndexMap;
6/**
7 * MIT License
8 *
9 * Copyright (c) 2025 Takatoshi Kondo
10 *
11 * Permission is hereby granted, free of charge, to any person obtaining a copy
12 * of this software and associated documentation files (the "Software"), to deal
13 * in the Software without restriction, including without limitation the rights
14 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
15 * copies of the Software, and to permit persons to whom the Software is
16 * furnished to do so, subject to the following conditions:
17 *
18 * The above copyright notice and this permission notice shall be included in all
19 * copies or substantial portions of the Software.
20 *
21 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
24 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
26 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
27 * SOFTWARE.
28 */
29
30/// A store that holds packets in insertion order and allows O(1) insert/remove by id.
31pub struct GenericStore<PacketIdType: IsPacketId> {
32    map: IndexMap<PacketIdType, GenericStorePacket<PacketIdType>>,
33}
34
35pub type Store = GenericStore<u16>;
36
37impl<PacketIdType: IsPacketId> GenericStore<PacketIdType> {
38    /// Create a new empty store.
39    pub fn new() -> Self {
40        Self {
41            map: IndexMap::new(),
42        }
43    }
44
45    /// Add a packet to the store.
46    /// Returns true if inserted, false if a packet with same id already exists.
47    pub fn add(&mut self, packet: GenericStorePacket<PacketIdType>) -> Result<(), MqttError> {
48        let id = packet.packet_id();
49        if self.map.contains_key(&id) {
50            return Err(MqttError::PacketIdentifierConflict);
51        }
52        self.map.insert(id, packet);
53        Ok(())
54    }
55
56    /// Erase a packet by its response type and packet id.
57    /// Returns true if removed, false otherwise.
58    pub fn erase(&mut self, response: ResponsePacket, packet_id: PacketIdType) -> bool {
59        if let Some((index, _, pkt)) = self.map.get_full(&packet_id) {
60            if pkt.response_packet() == response {
61                self.map.shift_remove_index(index);
62                return true;
63            }
64        }
65        false
66    }
67    /// Erase a publish packet by packet id only.
68    /// Returns true if removed, false otherwise.
69    pub fn erase_publish(&mut self, packet_id: PacketIdType) -> bool {
70        if let Some((index, _, pkt)) = self.map.get_full(&packet_id) {
71            if matches!(
72                pkt.response_packet(),
73                ResponsePacket::V3_1_1Puback
74                    | ResponsePacket::V3_1_1Pubrec
75                    | ResponsePacket::V5_0Puback
76                    | ResponsePacket::V5_0Pubrec
77            ) {
78                self.map.shift_remove_index(index);
79                return true;
80            }
81        }
82        false
83    }
84
85    /// Clear all stored packets.
86    pub fn clear(&mut self) {
87        self.map.clear();
88    }
89
90    /// Iterate over packets in insertion order.
91    /// The provided function returns true to keep the packet, or false to remove it.
92    pub fn for_each<F>(&mut self, mut func: F)
93    where
94        F: FnMut(&GenericStorePacket<PacketIdType>) -> bool,
95    {
96        let mut to_remove = Vec::new();
97        for (id, pkt) in &self.map {
98            if !func(pkt) {
99                to_remove.push(*id);
100            }
101        }
102        for id in to_remove {
103            self.map.shift_remove(&id);
104            println!("[store] removed pid: {:?}", id);
105        }
106    }
107
108    /// Return a vector of all stored packets in insertion order.
109    pub fn get_stored(&self) -> Vec<GenericStorePacket<PacketIdType>> {
110        self.map.values().cloned().collect()
111    }
112}