1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
use core::fmt;
use alloc::collections::VecDeque;
use alloc::string::String;
use alloc::vec::Vec;
use mqtt_proto::{Pid, QoS, TopicFilter, TopicName};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionState {
/// The client is disconnected.
Disconnected,
/// The client is attempting to connect.
Connecting,
/// The client is connected.
Connected,
/// The client is disconnecting.
Disconnecting,
/// The client has encountered an error state.
Error,
}
impl fmt::Display for ConnectionState {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Disconnected => write!(f, "Disconnected"),
Self::Connecting => write!(f, "Connecting"),
Self::Connected => write!(f, "Connected"),
Self::Disconnecting => write!(f, "Disconnecting"),
Self::Error => write!(f, "Error"),
}
}
}
#[derive(Debug, Clone)]
pub struct SubscriptionInfo {
/// The topic filter for the subscription.
pub topic_filter: TopicFilter,
/// The QoS level of the subscription.
pub qos: QoS,
/// The current state of the subscription.
pub state: SubscriptionState,
/// The number of retries for this subscription.
pub retry_count: u8,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SubscriptionState {
/// The subscription is pending acknowledgment.
Pending,
/// The subscription is active.
Active,
/// The client is unsubscribing from this topic.
Unsubscribing,
/// The subscription has failed.
Failed,
}
#[derive(Debug, Clone)]
pub struct ReceivedMessage {
/// The topic of the received message.
pub topic: TopicName,
/// The QoS level of the received message.
pub qos: QoS,
/// Indicates if the received message is a retained message.
pub retain: bool,
/// The payload of the received message.
pub payload: Vec<u8>,
/// The packet ID of the received message (if QoS > 0).
pub packet_id: Option<Pid>,
/// The timestamp of when the message was received (in milliseconds).
pub timestamp: u64,
}
#[derive(Debug)]
pub struct ClientState {
/// The current state of the connection.
connection_state: ConnectionState,
/// A queue of received messages.
received_messages: VecDeque<ReceivedMessage>,
/// Client statistics.
stats: ClientStats,
/// Indicates if the session was unexpectedly disconnected.
clean_session: bool,
/// The maximum number of messages in the receive queue.
msg_capacity: usize,
}
#[derive(Debug, Default, Clone)]
pub struct ClientStats {
/// The total number of packets sent.
pub messages_sent: u32,
/// The total number of packets received.
pub messages_received: u32,
/// The number of `PUBLISH` packets sent.
pub publishes_sent: u32,
/// The number of `PUBLISH` packets received.
pub publishes_received: u32,
/// The number of successful connections.
pub connect_count: u32,
/// The number of disconnections.
pub disconnect_count: u32,
/// The number of reconnections.
pub reconnect_count: u32,
/// The number of errors encountered.
pub error_count: u32,
}
const DEFAULT_MSG_CAPACITY: usize = 64;
impl ClientState {
/// Creates a new `ClientState`.
pub fn new(_client_id: String, msg_capacity: usize, clean_session: bool) -> Self {
Self {
connection_state: ConnectionState::Disconnected,
received_messages: VecDeque::with_capacity(msg_capacity),
stats: ClientStats::default(),
clean_session,
msg_capacity,
}
}
/// Returns the current connection state.
pub fn connection_state(&self) -> ConnectionState {
self.connection_state
}
/// Sets the connection state.
pub fn set_connection_state(&mut self, state: ConnectionState) {
log::debug!(
"Connection state changed: {} -> {}",
self.connection_state,
state
);
match (self.connection_state, state) {
(ConnectionState::Disconnected, ConnectionState::Connecting) => {
// Start connecting
}
(ConnectionState::Connecting, ConnectionState::Connected) => {
self.stats.connect_count += 1;
}
(ConnectionState::Connected, ConnectionState::Disconnecting) => {
// Start disconnecting
}
(_, ConnectionState::Disconnected) => {
self.stats.disconnect_count += 1;
if self.connection_state == ConnectionState::Connected {
// Unexpected disconnect, increment reconnect count
self.stats.reconnect_count += 1;
}
}
(_, ConnectionState::Error) => {
self.stats.error_count += 1;
}
_ => {}
}
self.connection_state = state;
}
/// Adds a received message to the queue.
pub fn add_received_message(&mut self, message: ReceivedMessage) {
if self.received_messages.len() >= self.msg_capacity {
// Queue is full, remove the oldest message
self.received_messages.pop_front();
log::warn!("Received message queue is full, oldest message dropped.");
}
self.received_messages.push_back(message);
self.stats.messages_received += 1;
self.stats.publishes_received += 1;
}
/// Pops the next received message from the queue.
pub fn pop_received_message(&mut self) -> Option<ReceivedMessage> {
self.received_messages.pop_front()
}
/// Increments counters for a sent message.
pub fn record_message_sent(&mut self) {
self.stats.messages_sent += 1;
self.stats.publishes_sent += 1;
}
/// Returns a reference to the client statistics.
pub fn stats(&self) -> &ClientStats {
&self.stats
}
/// Clears the volatile state.
pub fn clear_volatile_state(&mut self) {
self.received_messages.clear();
log::debug!("Volatile state cleared");
}
/// Resets all statistics to zero.
pub fn reset_stats(&mut self) {
self.stats = ClientStats::default();
}
}
impl Default for ClientState {
fn default() -> Self {
Self::new("default_client".into(), DEFAULT_MSG_CAPACITY, true)
}
}