Skip to main content

mesquitte_core/server/
state.rs

1use dashmap::DashMap;
2use mqtt_codec_kit::common::{QualityOfService, TopicFilter};
3use parking_lot::Mutex;
4use tokio::sync::mpsc::{self, channel};
5
6use crate::types::{
7    client::{AddClientReceipt, ClientId},
8    error::Error,
9    outgoing::Outgoing,
10    retain_table::RetainTable,
11    topic_router::RouteTable,
12};
13
/// State shared by the whole server: the registry of known clients together
/// with the `RouteTable` and the `RetainTable`.
///
/// The three client maps are updated together by `renew_client` and
/// `remove_client`, both of which hold the `next_client_id` mutex while doing so.
#[derive(Default)]
pub struct GlobalState {
    // TODO: metrics
    // config: Arc<Config>,

    // TODO: config: max qos
    // TODO: config: max packet size
    // TODO: config: max client packet size->V5 properties
    // TODO: config: max topic alias
    // TODO: config: read channel size
    // TODO: config: outgoing channel size
    // TODO: config: max inflight size
    // TODO: config: max inflight message size
    // TODO: config: max qos2 limit
    // TODO: config max keep alive
    // TODO: config min keep alive
    // TODO: config retain table enable
    // TODO: config max retain table size?

    /// Counter holding the next internal client id to hand out. The mutex also
    /// serializes `renew_client` and `remove_client` against each other.
    // TODO: The next client internal id, use this mutex to keep `add_client` atomic
    next_client_id: Mutex<u64>,

    /// client internal id => MQTT client identifier
    client_id_map: DashMap<ClientId, String, ahash::RandomState>,
    /// MQTT client identifier => client internal id
    client_identifier_map: DashMap<String, ClientId, ahash::RandomState>,
    /// client internal id => sender half of that client's `Outgoing` channel
    clients: DashMap<ClientId, mpsc::Sender<Outgoing>, ahash::RandomState>,

    /// Subscription table; `subscribe` / `unsubscribe` delegate to it.
    route_table: RouteTable,
    /// Exposed read-only through `retain_table()`.
    retain_table: RetainTable,
}
45
46impl GlobalState {
47    pub fn new() -> Self {
48        Self {
49            ..Default::default()
50        }
51    }
52
53    fn renew_client(&self, client_identifier: &str, sender: mpsc::Sender<Outgoing>) -> ClientId {
54        let mut next_client_id = self.next_client_id.lock();
55
56        let client_id = (*next_client_id).into();
57
58        self.client_identifier_map
59            .insert(client_identifier.to_string(), client_id);
60        self.client_id_map
61            .insert(client_id, client_identifier.to_string());
62        self.clients.insert(client_id, sender);
63
64        *next_client_id += 1;
65
66        client_id
67    }
68
69    pub async fn add_client(
70        &self,
71        client_identifier: &str,
72        sender: mpsc::Sender<Outgoing>,
73    ) -> Result<AddClientReceipt, Error> {
74        let client_id_opt: Option<ClientId> = self
75            .client_identifier_map
76            .get(client_identifier)
77            .map(|pair| *pair.value());
78
79        // TODO: build session state timeout
80        if let Some(client_id) = client_id_opt {
81            if let Some(old_sender) = self.get_outgoing_sender(&client_id) {
82                if !old_sender.is_closed() {
83                    let (control_sender, mut control_receiver) = channel(1);
84                    if let Err(err) = old_sender.send(Outgoing::Online(control_sender)).await {
85                        log::error!("global state add client: {err}");
86                        return Err(Error::SendOutgoing(err));
87                    }
88                    return match control_receiver.recv().await {
89                        Some(state) => {
90                            let client_id = self.renew_client(client_identifier, sender);
91                            Ok(AddClientReceipt::Present(client_id, state))
92                        }
93                        None => Err(Error::EmptySessionState),
94                    };
95                }
96            }
97        }
98        Ok(AddClientReceipt::New(
99            self.renew_client(client_identifier, sender),
100        ))
101    }
102
103    pub fn remove_client<'a>(
104        &self,
105        client_id: ClientId,
106        subscribes: impl IntoIterator<Item = &'a TopicFilter>,
107    ) {
108        // keep client operation atomic
109        let _guard = self.next_client_id.lock();
110
111        if let Some((_, client_identifier)) = self.client_id_map.remove(&client_id) {
112            self.client_identifier_map.remove(&client_identifier);
113        }
114        self.clients.remove(&client_id);
115        for filter in subscribes {
116            self.route_table.unsubscribe(filter, client_id);
117        }
118    }
119
120    pub fn subscribe(&self, filter: &TopicFilter, id: ClientId, qos: QualityOfService) {
121        self.route_table.subscribe(filter, id, qos);
122    }
123
124    pub fn unsubscribe(&self, filter: &TopicFilter, id: ClientId) {
125        self.route_table.unsubscribe(filter, id);
126    }
127
128    pub fn get_outgoing_sender(&self, client_id: &ClientId) -> Option<mpsc::Sender<Outgoing>> {
129        self.clients.get(client_id).map(|s| s.value().clone())
130    }
131
132    pub fn retain_table(&self) -> &RetainTable {
133        &self.retain_table
134    }
135
136    pub fn route_table(&self) -> &RouteTable {
137        &self.route_table
138    }
139}