mesquitte_core/server/
state.rs1use dashmap::DashMap;
2use mqtt_codec_kit::common::{QualityOfService, TopicFilter};
3use parking_lot::Mutex;
4use tokio::sync::mpsc::{self, channel};
5
6use crate::types::{
7 client::{AddClientReceipt, ClientId},
8 error::Error,
9 outgoing::Outgoing,
10 retain_table::RetainTable,
11 topic_router::RouteTable,
12};
13
14#[derive(Default)]
15pub struct GlobalState {
16 next_client_id: Mutex<u64>,
35
36 client_id_map: DashMap<ClientId, String, ahash::RandomState>,
38 client_identifier_map: DashMap<String, ClientId, ahash::RandomState>,
40 clients: DashMap<ClientId, mpsc::Sender<Outgoing>, ahash::RandomState>,
41
42 route_table: RouteTable,
43 retain_table: RetainTable,
44}
45
46impl GlobalState {
47 pub fn new() -> Self {
48 Self {
49 ..Default::default()
50 }
51 }
52
53 fn renew_client(&self, client_identifier: &str, sender: mpsc::Sender<Outgoing>) -> ClientId {
54 let mut next_client_id = self.next_client_id.lock();
55
56 let client_id = (*next_client_id).into();
57
58 self.client_identifier_map
59 .insert(client_identifier.to_string(), client_id);
60 self.client_id_map
61 .insert(client_id, client_identifier.to_string());
62 self.clients.insert(client_id, sender);
63
64 *next_client_id += 1;
65
66 client_id
67 }
68
69 pub async fn add_client(
70 &self,
71 client_identifier: &str,
72 sender: mpsc::Sender<Outgoing>,
73 ) -> Result<AddClientReceipt, Error> {
74 let client_id_opt: Option<ClientId> = self
75 .client_identifier_map
76 .get(client_identifier)
77 .map(|pair| *pair.value());
78
79 if let Some(client_id) = client_id_opt {
81 if let Some(old_sender) = self.get_outgoing_sender(&client_id) {
82 if !old_sender.is_closed() {
83 let (control_sender, mut control_receiver) = channel(1);
84 if let Err(err) = old_sender.send(Outgoing::Online(control_sender)).await {
85 log::error!("global state add client: {err}");
86 return Err(Error::SendOutgoing(err));
87 }
88 return match control_receiver.recv().await {
89 Some(state) => {
90 let client_id = self.renew_client(client_identifier, sender);
91 Ok(AddClientReceipt::Present(client_id, state))
92 }
93 None => Err(Error::EmptySessionState),
94 };
95 }
96 }
97 }
98 Ok(AddClientReceipt::New(
99 self.renew_client(client_identifier, sender),
100 ))
101 }
102
103 pub fn remove_client<'a>(
104 &self,
105 client_id: ClientId,
106 subscribes: impl IntoIterator<Item = &'a TopicFilter>,
107 ) {
108 let _guard = self.next_client_id.lock();
110
111 if let Some((_, client_identifier)) = self.client_id_map.remove(&client_id) {
112 self.client_identifier_map.remove(&client_identifier);
113 }
114 self.clients.remove(&client_id);
115 for filter in subscribes {
116 self.route_table.unsubscribe(filter, client_id);
117 }
118 }
119
120 pub fn subscribe(&self, filter: &TopicFilter, id: ClientId, qos: QualityOfService) {
121 self.route_table.subscribe(filter, id, qos);
122 }
123
124 pub fn unsubscribe(&self, filter: &TopicFilter, id: ClientId) {
125 self.route_table.unsubscribe(filter, id);
126 }
127
128 pub fn get_outgoing_sender(&self, client_id: &ClientId) -> Option<mpsc::Sender<Outgoing>> {
129 self.clients.get(client_id).map(|s| s.value().clone())
130 }
131
132 pub fn retain_table(&self) -> &RetainTable {
133 &self.retain_table
134 }
135
136 pub fn route_table(&self) -> &RouteTable {
137 &self.route_table
138 }
139}