1use std::{
2 collections::{HashMap, HashSet, VecDeque},
3 fmt,
4};
5
6use bytes::Bytes;
7use serde::{Deserialize, Serialize};
8
9use crate::{
10 protocol::{
11 ConnAck, ConnAckProperties, Disconnect, DisconnectProperties, Packet, PingResp, PubAck,
12 PubAckProperties, PubComp, PubCompProperties, PubRec, PubRecProperties, PubRel,
13 PubRelProperties, Publish, PublishProperties, SubAck, SubAckProperties, UnsubAck,
14 },
15 ConnectionId, Filter, RouterId, Topic,
16};
17
18mod alertlog;
19mod connection;
20mod graveyard;
21pub mod iobufs;
22mod logs;
23mod routing;
24mod scheduler;
25pub(crate) mod shared_subs;
26mod waiters;
27
28pub use alertlog::Alert;
29pub use connection::Connection;
30pub use routing::Router;
31pub use waiters::Waiters;
32
33pub const MAX_SCHEDULE_ITERATIONS: usize = 100;
34pub const MAX_CHANNEL_CAPACITY: usize = 200;
35
36pub(crate) type FilterIdx = usize;
37
38#[derive(Debug)]
39#[allow(clippy::large_enum_variant)]
41pub enum Event {
42 Connect {
44 connection: connection::Connection,
45 incoming: iobufs::Incoming,
46 outgoing: iobufs::Outgoing,
47 },
48 NewMeter(flume::Sender<Vec<Meter>>),
50 NewAlert(flume::Sender<Vec<Alert>>),
52 Ready,
54 DeviceData,
56 Disconnect,
58 Shadow(ShadowRequest),
60 SendAlerts,
62 SendMeters,
64 PrintStatus(Print),
66 PublishWill((String, Option<String>)),
68}
69
70#[derive(Debug, Clone)]
72pub enum Notification {
73 Forward(Forward),
75 DeviceAck(Ack),
77 ReplicaData {
79 cursor: (u64, u64),
80 size: usize,
81 payload: Bytes,
82 },
83 ReplicaAcks {
85 offset: (u64, u64),
86 payload: Bytes,
87 },
88 Shadow(ShadowReply),
90 Unschedule,
91 Disconnect(Disconnect, Option<DisconnectProperties>),
92}
93
94type MaybePacket = Option<Packet>;
95
96impl From<Notification> for MaybePacket {
98 fn from(notification: Notification) -> Self {
99 let packet: Packet = match notification {
100 Notification::Forward(forward) => Packet::Publish(forward.publish, forward.properties),
101 Notification::DeviceAck(ack) => ack.into(),
102 Notification::Unschedule => return None,
103 Notification::Disconnect(disconnect, props) => Packet::Disconnect(disconnect, props),
104 v => {
105 tracing::error!("Unexpected notification here, it cannot be converted into Packet, Notification: {:?}", v);
106 return None;
107 }
108 };
109 Some(packet)
110 }
111}
112
113#[derive(Debug, Clone)]
114pub struct Forward {
115 pub cursor: Option<(u64, u64)>,
116 pub size: usize,
117 pub publish: Publish,
118 pub properties: Option<PublishProperties>,
119}
120
121#[derive(Debug, Clone)]
122#[allow(clippy::enum_variant_names)]
123pub enum Ack {
124 ConnAck(ConnectionId, ConnAck, Option<ConnAckProperties>),
125 PubAck(PubAck),
129 PubAckWithProperties(PubAck, PubAckProperties),
130 SubAck(SubAck),
131 SubAckWithProperties(SubAck, SubAckProperties),
132 PubRec(PubRec),
133 PubRecWithProperties(PubRec, PubRecProperties),
134 PubRel(PubRel),
135 PubRelWithProperties(PubRel, PubRelProperties),
136 PubComp(PubComp),
137 PubCompWithProperties(PubComp, PubCompProperties),
138 UnsubAck(UnsubAck),
139 PingResp(PingResp),
140}
141
142impl From<Ack> for Packet {
143 fn from(value: Ack) -> Self {
144 match value {
145 Ack::ConnAck(_id, connack, props) => Packet::ConnAck(connack, props),
146 Ack::PubAck(puback) => Packet::PubAck(puback, None),
147 Ack::PubAckWithProperties(puback, prop) => Packet::PubAck(puback, Some(prop)),
148 Ack::SubAck(suback) => Packet::SubAck(suback, None),
149 Ack::SubAckWithProperties(suback, prop) => Packet::SubAck(suback, Some(prop)),
150 Ack::PubRec(pubrec) => Packet::PubRec(pubrec, None),
151 Ack::PubRecWithProperties(pubrec, prop) => Packet::PubRec(pubrec, Some(prop)),
152 Ack::PubRel(pubrel) => Packet::PubRel(pubrel, None),
153 Ack::PubRelWithProperties(pubrel, prop) => Packet::PubRel(pubrel, Some(prop)),
154 Ack::PubComp(pubcomp) => Packet::PubComp(pubcomp, None),
155 Ack::PubCompWithProperties(pubcomp, prop) => Packet::PubComp(pubcomp, Some(prop)),
156 Ack::UnsubAck(unsuback) => Packet::UnsubAck(unsuback, None),
157 Ack::PingResp(pingresp) => Packet::PingResp(pingresp),
158 }
159 }
160}
161
162fn packetid(ack: &Ack) -> u16 {
163 match ack {
164 Ack::ConnAck(..) => 0,
165 Ack::PubAck(puback) => puback.pkid,
166 Ack::PubAckWithProperties(puback, _) => puback.pkid,
167 Ack::SubAck(suback) => suback.pkid,
168 Ack::SubAckWithProperties(suback, _) => suback.pkid,
169 Ack::PubRel(pubrel) => pubrel.pkid,
170 Ack::PubRelWithProperties(pubrel, _) => pubrel.pkid,
171 Ack::PubRec(pubrec) => pubrec.pkid,
172 Ack::PubRecWithProperties(pubrec, _) => pubrec.pkid,
173 Ack::PubComp(pubcomp) => pubcomp.pkid,
174 Ack::PubCompWithProperties(pubcomp, _) => pubcomp.pkid,
175 Ack::UnsubAck(unsuback) => unsuback.pkid,
176 Ack::PingResp(_) => 0,
177 }
178}
179
180#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
185pub struct DataRequest {
186 pub filter: Filter,
188 pub filter_idx: FilterIdx,
189 pub qos: u8,
191 pub cursor: (u64, u64),
193 pub read_count: usize,
195 max_count: usize,
197 pub(crate) forward_retained: bool,
198 pub(crate) group: Option<String>,
199}
200
201#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
202pub struct AcksRequest;
203
204#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
205pub enum Request {
206 Data(DataRequest),
207 Ack(AcksRequest),
208}
209
210pub struct Message {
212 pub topic: String,
214 pub _qos: u8,
216 pub payload: Bytes,
218}
219
220impl fmt::Debug for Message {
221 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
222 write!(
223 f,
224 "Topic = {:?}, Payload size = {}",
225 self.topic,
226 self.payload.len()
227 )
228 }
229}
230
231pub struct Data {
233 pub offset: (u64, u64),
235 pub size: usize,
237 pub payload: Vec<Publish>,
239}
240
241impl fmt::Debug for Data {
242 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
243 write!(
244 f,
245 "Cursors = {:?}, Payload size = {}, Payload count = {}",
246 self.offset,
247 self.size,
248 self.payload.len()
249 )
250 }
251}
252
253#[derive(Debug, Clone)]
261pub struct ShadowRequest {
262 pub filter: String,
263}
264
265#[derive(Debug, Clone)]
266pub struct ShadowReply {
267 pub topic: Bytes,
268 pub payload: Bytes,
269}
270
271#[derive(Debug, Default, Clone, Serialize, Deserialize)]
272pub struct RouterMeter {
273 pub timestamp: u128,
274 pub sequence: usize,
275 pub router_id: RouterId,
276 pub total_connections: usize,
277 pub total_subscriptions: usize,
278 pub total_publishes: usize,
279 pub failed_publishes: usize,
280}
281
282impl RouterMeter {
283 pub fn get(&mut self) -> Option<Self> {
284 if self.total_publishes > 0 || self.failed_publishes > 0 {
285 self.timestamp = std::time::SystemTime::now()
286 .duration_since(std::time::UNIX_EPOCH)
287 .unwrap()
288 .as_millis();
289 self.sequence += 1;
290
291 let meter = self.clone();
292 self.reset();
293
294 Some(meter)
295 } else {
296 None
297 }
298 }
299
300 fn reset(&mut self) {
301 self.total_publishes = 0;
302 self.failed_publishes = 0;
303 }
304}
305
306#[derive(Debug, Default, Clone, Serialize, Deserialize)]
307pub struct SubscriptionMeter {
308 pub timestamp: u128,
309 pub sequence: usize,
310 pub count: usize,
311 pub total_size: usize,
312}
313
314impl SubscriptionMeter {
315 pub fn get(&mut self) -> Option<Self> {
316 if self.count > 0 {
317 self.timestamp = std::time::SystemTime::now()
318 .duration_since(std::time::UNIX_EPOCH)
319 .unwrap()
320 .as_millis();
321 self.sequence += 1;
322
323 let meter = self.clone();
324 self.reset();
325
326 Some(meter)
327 } else {
328 None
329 }
330 }
331
332 fn reset(&mut self) {
333 self.count = 0;
334 self.total_size = 0;
335 }
336}
337
338#[derive(Debug, Default, Clone)]
339pub struct MeterData {
340 pub count: usize,
341 pub size: usize,
342}
343
344#[derive(Debug, Default, Clone)]
345pub struct IncomingMeter {
346 publishes: HashMap<Topic, MeterData>,
347 subscribes: HashSet<Filter>,
348 total_publishes: MeterData,
349}
350
351impl IncomingMeter {
352 pub fn register_publish(&mut self, publish: &Publish) -> Result<(), std::str::Utf8Error> {
353 let meter = {
354 let topic = std::str::from_utf8(&publish.topic)?.to_string();
355 self.publishes.entry(topic).or_default()
356 };
357 meter.count += 1;
358 meter.size += publish.len();
359
360 self.total_publishes.count += 1;
361 self.total_publishes.size += publish.len();
362
363 Ok(())
364 }
365
366 pub fn get_topic_meters(&self) -> &HashMap<Topic, MeterData> {
367 &self.publishes
368 }
369
370 pub fn register_subscription(&mut self, filter: Filter) -> bool {
371 self.subscribes.insert(filter)
372 }
373
374 pub fn unregister_subscription(&mut self, filter: &Filter) -> bool {
375 self.subscribes.remove(filter)
376 }
377
378 pub fn get_total_count(&self) -> usize {
379 self.total_publishes.count
380 }
381
382 pub fn get_total_size(&self) -> usize {
383 self.total_publishes.size
384 }
385}
386
387#[derive(Debug, Default, Clone)]
388pub struct OutgoingMeter {
389 pub publish_count: usize,
390 pub total_size: usize,
391}
392
393#[derive(Debug, Default, Clone, Serialize, Deserialize)]
394pub struct ConnectionEvents {
395 events: VecDeque<String>,
396}
397
398#[derive(Serialize, Debug, Clone)]
399pub enum Meter {
400 Router(usize, RouterMeter),
401 Subscription(String, SubscriptionMeter),
402}
403
404#[derive(Debug, Clone)]
405pub enum Print {
406 Config,
407 Router,
408 ReadyQueue,
409 Connection(String),
410 Subscriptions,
411 Subscription(Filter),
412 Waiters(Filter),
413}