// rumqttd/router/mod.rs

1use std::{
2    collections::{HashMap, HashSet, VecDeque},
3    fmt,
4};
5
6use bytes::Bytes;
7use serde::{Deserialize, Serialize};
8
9use crate::{
10    protocol::{
11        ConnAck, ConnAckProperties, Disconnect, DisconnectProperties, Packet, PingResp, PubAck,
12        PubAckProperties, PubComp, PubCompProperties, PubRec, PubRecProperties, PubRel,
13        PubRelProperties, Publish, PublishProperties, SubAck, SubAckProperties, UnsubAck,
14    },
15    ConnectionId, Filter, RouterId, Topic,
16};
17
18mod alertlog;
19mod connection;
20mod graveyard;
21pub mod iobufs;
22mod logs;
23mod routing;
24mod scheduler;
25pub(crate) mod shared_subs;
26mod waiters;
27
28pub use alertlog::Alert;
29pub use connection::Connection;
30pub use routing::Router;
31pub use waiters::Waiters;
32
/// Maximum number of schedule iterations.
///
/// NOTE(review): the code that consumes this limit is outside this file; presumably it
/// bounds how long the router's scheduler runs in one go — confirm at the use site.
pub const MAX_SCHEDULE_ITERATIONS: usize = 100;
/// Maximum channel capacity.
///
/// NOTE(review): presumably the bound used when creating router channels — confirm at
/// the call sites, which are outside this file.
pub const MAX_CHANNEL_CAPACITY: usize = 200;
35
/// Index type used to refer to a subscription filter (see `DataRequest::filter_idx`).
pub(crate) type FilterIdx = usize;
37
38#[derive(Debug)]
39// TODO: Fix this
40#[allow(clippy::large_enum_variant)]
41pub enum Event {
42    /// Client id and connection handle
43    Connect {
44        connection: connection::Connection,
45        incoming: iobufs::Incoming,
46        outgoing: iobufs::Outgoing,
47    },
48    /// New meter link
49    NewMeter(flume::Sender<Vec<Meter>>),
50    /// New alert link
51    NewAlert(flume::Sender<Vec<Alert>>),
52    /// Connection ready to receive more data
53    Ready,
54    /// Data for native commitlog
55    DeviceData,
56    /// Disconnection request
57    Disconnect,
58    /// Shadow
59    Shadow(ShadowRequest),
60    /// Collect and send alerts to all alerts links
61    SendAlerts,
62    /// Collect and send meters to all meters links
63    SendMeters,
64    /// Get metrics of a connection or all connections
65    PrintStatus(Print),
66    /// Publish Will message
67    PublishWill((String, Option<String>)),
68}
69
70/// Notification from router to connection
71#[derive(Debug, Clone)]
72pub enum Notification {
73    /// Data reply
74    Forward(Forward),
75    /// Acks reply for connection data
76    DeviceAck(Ack),
77    /// Data reply
78    ReplicaData {
79        cursor: (u64, u64),
80        size: usize,
81        payload: Bytes,
82    },
83    /// Acks reply for replication data
84    ReplicaAcks {
85        offset: (u64, u64),
86        payload: Bytes,
87    },
88    /// Shadow
89    Shadow(ShadowReply),
90    Unschedule,
91    Disconnect(Disconnect, Option<DisconnectProperties>),
92}
93
94type MaybePacket = Option<Packet>;
95
96// We either get a Packet to write to buffer or we unschedule which is represented as `None`
97impl From<Notification> for MaybePacket {
98    fn from(notification: Notification) -> Self {
99        let packet: Packet = match notification {
100            Notification::Forward(forward) => Packet::Publish(forward.publish, forward.properties),
101            Notification::DeviceAck(ack) => ack.into(),
102            Notification::Unschedule => return None,
103            Notification::Disconnect(disconnect, props) => Packet::Disconnect(disconnect, props),
104            v => {
105                tracing::error!("Unexpected notification here, it cannot be converted into Packet, Notification: {:?}", v);
106                return None;
107            }
108        };
109        Some(packet)
110    }
111}
112
113#[derive(Debug, Clone)]
114pub struct Forward {
115    pub cursor: Option<(u64, u64)>,
116    pub size: usize,
117    pub publish: Publish,
118    pub properties: Option<PublishProperties>,
119}
120
121#[derive(Debug, Clone)]
122#[allow(clippy::enum_variant_names)]
123pub enum Ack {
124    ConnAck(ConnectionId, ConnAck, Option<ConnAckProperties>),
125    // NOTE: using Option may be a better choice than new variant
126    // ConnAckWithProperties(ConnectionId, ConnAck, ConnAckProperties),
127    // TODO: merge the other variants as well using the same pattern
128    PubAck(PubAck),
129    PubAckWithProperties(PubAck, PubAckProperties),
130    SubAck(SubAck),
131    SubAckWithProperties(SubAck, SubAckProperties),
132    PubRec(PubRec),
133    PubRecWithProperties(PubRec, PubRecProperties),
134    PubRel(PubRel),
135    PubRelWithProperties(PubRel, PubRelProperties),
136    PubComp(PubComp),
137    PubCompWithProperties(PubComp, PubCompProperties),
138    UnsubAck(UnsubAck),
139    PingResp(PingResp),
140}
141
142impl From<Ack> for Packet {
143    fn from(value: Ack) -> Self {
144        match value {
145            Ack::ConnAck(_id, connack, props) => Packet::ConnAck(connack, props),
146            Ack::PubAck(puback) => Packet::PubAck(puback, None),
147            Ack::PubAckWithProperties(puback, prop) => Packet::PubAck(puback, Some(prop)),
148            Ack::SubAck(suback) => Packet::SubAck(suback, None),
149            Ack::SubAckWithProperties(suback, prop) => Packet::SubAck(suback, Some(prop)),
150            Ack::PubRec(pubrec) => Packet::PubRec(pubrec, None),
151            Ack::PubRecWithProperties(pubrec, prop) => Packet::PubRec(pubrec, Some(prop)),
152            Ack::PubRel(pubrel) => Packet::PubRel(pubrel, None),
153            Ack::PubRelWithProperties(pubrel, prop) => Packet::PubRel(pubrel, Some(prop)),
154            Ack::PubComp(pubcomp) => Packet::PubComp(pubcomp, None),
155            Ack::PubCompWithProperties(pubcomp, prop) => Packet::PubComp(pubcomp, Some(prop)),
156            Ack::UnsubAck(unsuback) => Packet::UnsubAck(unsuback, None),
157            Ack::PingResp(pingresp) => Packet::PingResp(pingresp),
158        }
159    }
160}
161
162fn packetid(ack: &Ack) -> u16 {
163    match ack {
164        Ack::ConnAck(..) => 0,
165        Ack::PubAck(puback) => puback.pkid,
166        Ack::PubAckWithProperties(puback, _) => puback.pkid,
167        Ack::SubAck(suback) => suback.pkid,
168        Ack::SubAckWithProperties(suback, _) => suback.pkid,
169        Ack::PubRel(pubrel) => pubrel.pkid,
170        Ack::PubRelWithProperties(pubrel, _) => pubrel.pkid,
171        Ack::PubRec(pubrec) => pubrec.pkid,
172        Ack::PubRecWithProperties(pubrec, _) => pubrec.pkid,
173        Ack::PubComp(pubcomp) => pubcomp.pkid,
174        Ack::PubCompWithProperties(pubcomp, _) => pubcomp.pkid,
175        Ack::UnsubAck(unsuback) => unsuback.pkid,
176        Ack::PingResp(_) => 0,
177    }
178}
179
180/// Request that connection/linker makes to extract data from commitlog
181/// NOTE Connection can make one sweep request to get data from multiple topics
182/// but we'll keep it simple for now as multiple requests in one message can
183/// makes constant extraction size harder
184#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
185pub struct DataRequest {
186    /// Commitlog this request is pulling data from
187    pub filter: Filter,
188    pub filter_idx: FilterIdx,
189    /// Qos of the outgoing data
190    pub qos: u8,
191    /// (segment, offset) tuples per replica (1 native and 2 replicas)
192    pub cursor: (u64, u64),
193    /// number of messages read from subscription
194    pub read_count: usize,
195    /// Maximum count of payload buffer per replica
196    max_count: usize,
197    pub(crate) forward_retained: bool,
198    pub(crate) group: Option<String>,
199}
200
201#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
202pub struct AcksRequest;
203
204#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
205pub enum Request {
206    Data(DataRequest),
207    Ack(AcksRequest),
208}
209
210/// A single message from connection to router
211pub struct Message {
212    /// Log to sweep
213    pub topic: String,
214    /// Qos of the topic
215    pub _qos: u8,
216    /// Reply data chain
217    pub payload: Bytes,
218}
219
220impl fmt::Debug for Message {
221    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
222        write!(
223            f,
224            "Topic = {:?}, Payload size = {}",
225            self.topic,
226            self.payload.len()
227        )
228    }
229}
230
231/// A batch of messages from connection to router
232pub struct Data {
233    /// (segment, offset) tuples per replica (1 native and 2 replicas)
234    pub offset: (u64, u64),
235    /// Payload size
236    pub size: usize,
237    /// Reply data chain
238    pub payload: Vec<Publish>,
239}
240
241impl fmt::Debug for Data {
242    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
243        write!(
244            f,
245            "Cursors = {:?}, Payload size = {}, Payload count = {}",
246            self.offset,
247            self.size,
248            self.payload.len()
249        )
250    }
251}
252
// NOTE: the `Disconnection` type below is commented out and is not compiled.
// #[derive(Debug, Clone)]
// pub struct Disconnection {
//     pub id: String,
//     pub execute_will: bool,
//     pub pending: Vec<Notification>,
// }
259
/// Shadow request (payload of `Event::Shadow`).
#[derive(Debug, Clone)]
pub struct ShadowRequest {
    /// Filter the shadow is requested for
    pub filter: String,
}
264
265#[derive(Debug, Clone)]
266pub struct ShadowReply {
267    pub topic: Bytes,
268    pub payload: Bytes,
269}
270
271#[derive(Debug, Default, Clone, Serialize, Deserialize)]
272pub struct RouterMeter {
273    pub timestamp: u128,
274    pub sequence: usize,
275    pub router_id: RouterId,
276    pub total_connections: usize,
277    pub total_subscriptions: usize,
278    pub total_publishes: usize,
279    pub failed_publishes: usize,
280}
281
282impl RouterMeter {
283    pub fn get(&mut self) -> Option<Self> {
284        if self.total_publishes > 0 || self.failed_publishes > 0 {
285            self.timestamp = std::time::SystemTime::now()
286                .duration_since(std::time::UNIX_EPOCH)
287                .unwrap()
288                .as_millis();
289            self.sequence += 1;
290
291            let meter = self.clone();
292            self.reset();
293
294            Some(meter)
295        } else {
296            None
297        }
298    }
299
300    fn reset(&mut self) {
301        self.total_publishes = 0;
302        self.failed_publishes = 0;
303    }
304}
305
306#[derive(Debug, Default, Clone, Serialize, Deserialize)]
307pub struct SubscriptionMeter {
308    pub timestamp: u128,
309    pub sequence: usize,
310    pub count: usize,
311    pub total_size: usize,
312}
313
314impl SubscriptionMeter {
315    pub fn get(&mut self) -> Option<Self> {
316        if self.count > 0 {
317            self.timestamp = std::time::SystemTime::now()
318                .duration_since(std::time::UNIX_EPOCH)
319                .unwrap()
320                .as_millis();
321            self.sequence += 1;
322
323            let meter = self.clone();
324            self.reset();
325
326            Some(meter)
327        } else {
328            None
329        }
330    }
331
332    fn reset(&mut self) {
333        self.count = 0;
334        self.total_size = 0;
335    }
336}
337
/// A count together with an accumulated size.
#[derive(Debug, Default, Clone)]
pub struct MeterData {
    /// Number of items recorded
    pub count: usize,
    /// Accumulated size of the recorded items
    pub size: usize,
}
343
344#[derive(Debug, Default, Clone)]
345pub struct IncomingMeter {
346    publishes: HashMap<Topic, MeterData>,
347    subscribes: HashSet<Filter>,
348    total_publishes: MeterData,
349}
350
351impl IncomingMeter {
352    pub fn register_publish(&mut self, publish: &Publish) -> Result<(), std::str::Utf8Error> {
353        let meter = {
354            let topic = std::str::from_utf8(&publish.topic)?.to_string();
355            self.publishes.entry(topic).or_default()
356        };
357        meter.count += 1;
358        meter.size += publish.len();
359
360        self.total_publishes.count += 1;
361        self.total_publishes.size += publish.len();
362
363        Ok(())
364    }
365
366    pub fn get_topic_meters(&self) -> &HashMap<Topic, MeterData> {
367        &self.publishes
368    }
369
370    pub fn register_subscription(&mut self, filter: Filter) -> bool {
371        self.subscribes.insert(filter)
372    }
373
374    pub fn unregister_subscription(&mut self, filter: &Filter) -> bool {
375        self.subscribes.remove(filter)
376    }
377
378    pub fn get_total_count(&self) -> usize {
379        self.total_publishes.count
380    }
381
382    pub fn get_total_size(&self) -> usize {
383        self.total_publishes.size
384    }
385}
386
/// Meter for outgoing traffic.
#[derive(Debug, Default, Clone)]
pub struct OutgoingMeter {
    /// Number of publishes
    pub publish_count: usize,
    /// Accumulated size
    pub total_size: usize,
}
392
393#[derive(Debug, Default, Clone, Serialize, Deserialize)]
394pub struct ConnectionEvents {
395    events: VecDeque<String>,
396}
397
398#[derive(Serialize, Debug, Clone)]
399pub enum Meter {
400    Router(usize, RouterMeter),
401    Subscription(String, SubscriptionMeter),
402}
403
404#[derive(Debug, Clone)]
405pub enum Print {
406    Config,
407    Router,
408    ReadyQueue,
409    Connection(String),
410    Subscriptions,
411    Subscription(Filter),
412    Waiters(Filter),
413}