1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
use hashbrown::{HashMap, HashSet};
use mqtt_proto::{Pid, QoS, TopicFilter};
use crate::error::MqttError;
use crate::state::{SubscriptionInfo, SubscriptionState};
use super::{InflightMessage, SessionState};
const DEFAULT_SUB_CAPACITY: usize = 32;
#[derive(Debug)]
pub struct InMemorySession {
next_pid: Pid,
pending_publishes: HashMap<Pid, InflightMessage>,
pending_pubrels: HashSet<Pid>,
subscriptions: HashMap<Pid, SubscriptionInfo>,
active_topics: HashSet<TopicFilter>,
sub_capacity: usize,
// TODO: For complete QoS 2 support, pending PUBREL messages need to be tracked with more context.
// This should include a timestamp and retry count to allow for proper retransmission logic
// without overwhelming the network. A new struct might be needed to hold this state.
// See `protocol/mod.rs` `handle_retries` for where this would be used.
}
impl InMemorySession {
pub fn new(sub_capacity: usize) -> Self {
Self {
sub_capacity,
..Default::default()
}
}
}
impl Default for InMemorySession {
fn default() -> Self {
Self {
next_pid: Pid::default(),
pending_publishes: HashMap::new(),
pending_pubrels: HashSet::new(),
subscriptions: HashMap::new(),
active_topics: HashSet::new(),
sub_capacity: DEFAULT_SUB_CAPACITY,
}
}
}
impl SessionState for InMemorySession {
fn next_pid(&mut self) -> Pid {
let pid = self.next_pid;
self.next_pid += 1;
pid
}
fn store_outgoing_publish(
&mut self,
pid: Pid,
message: InflightMessage,
) -> Result<(), MqttError> {
// TODO: check for capacity if we want to limit inflight messages at this level.
self.pending_publishes.insert(pid, message);
Ok(())
}
fn get_outgoing_publish_mut(&mut self, pid: Pid) -> Option<&mut InflightMessage> {
self.pending_publishes.get_mut(&pid)
}
fn complete_outgoing_publish(&mut self, pid: Pid) -> Option<InflightMessage> {
self.pending_publishes.remove(&pid)
}
fn pending_outgoing_publishes(&self) -> impl Iterator<Item = (Pid, &InflightMessage)> {
self.pending_publishes.iter().map(|(p, m)| (*p, m))
}
fn store_outgoing_pubrel(&mut self, pid: Pid) -> Result<(), MqttError> {
self.pending_pubrels.insert(pid);
Ok(())
}
fn complete_outgoing_pubrel(&mut self, pid: Pid) -> Option<Pid> {
if self.pending_pubrels.remove(&pid) {
Some(pid)
} else {
None
}
}
fn pending_outgoing_pubrels(&self) -> impl Iterator<Item = &Pid> {
self.pending_pubrels.iter()
}
fn add_subscription(
&mut self,
pid: Pid,
topic_filter: TopicFilter,
qos: QoS,
) -> Result<(), MqttError> {
if self.subscriptions.len() >= self.sub_capacity {
// TODO: Define a `MqttError::SubscriptionLimitExceeded` variant for more precise error handling.
// This would allow the caller to distinguish between different internal limits.
return Err(MqttError::Internal);
}
let subscription = SubscriptionInfo {
topic_filter: topic_filter.clone(),
qos,
state: SubscriptionState::Pending,
retry_count: 0,
};
self.subscriptions.insert(pid, subscription);
log::debug!(
"Added subscription: {} (PID: {})",
topic_filter,
pid.value()
);
Ok(())
}
fn confirm_subscription(&mut self, pid: Pid, return_codes: &[u8]) -> Result<(), MqttError> {
if let Some(subscription) = self.subscriptions.get_mut(&pid) {
// TODO: Enhance subscription confirmation logic to handle multiple topics in a single SUBACK.
// A single `SUBSCRIBE` packet can contain multiple topic filters. The `SUBACK` contains a list
// of return codes, one for each filter. The current implementation only handles one filter per
// subscribe request and thus only checks the first return code.
// A complete implementation should:
// 1. Associate a single PID with a list of topic filters sent in one packet.
// 2. Iterate through the `return_codes` and update the state of each topic filter individually.
// This is crucial for robustly handling partial subscription successes/failures.
if let Some(&return_code) = return_codes.first() {
if return_code <= 2 {
// QoS 0, 1, 2
subscription.state = SubscriptionState::Active;
if self.active_topics.len() < self.sub_capacity {
self.active_topics.insert(subscription.topic_filter.clone());
} else {
// TODO: Define and use `MqttError::SubscriptionLimitExceeded` here as well.
// This indicates that the session cannot store more active topic filters.
return Err(MqttError::Internal);
}
log::info!(
"Subscription confirmed: {} (QoS: {})",
subscription.topic_filter,
return_code
);
} else {
subscription.state = SubscriptionState::Failed;
log::warn!(
"Subscription failed: {} (return code: {})",
subscription.topic_filter,
return_code
);
}
}
}
Ok(())
}
fn remove_subscription(&mut self, pid: Pid) -> Option<SubscriptionInfo> {
if let Some(subscription) = self.subscriptions.remove(&pid) {
self.active_topics.remove(&subscription.topic_filter);
log::debug!(
"Removed subscription: {} (PID: {})",
subscription.topic_filter,
pid.value()
);
Some(subscription)
} else {
None
}
}
fn clear(&mut self) {
self.next_pid = Pid::default();
self.pending_publishes.clear();
self.subscriptions.clear();
self.active_topics.clear();
self.pending_pubrels.clear();
}
}