1use crate::message::Msg;
2use crate::PktType;
3use log::{error, info, trace};
4use serde_json::json;
5use std::collections::{BTreeMap, HashMap};
6use tokio;
7use tokio::sync::broadcast::Sender;
8
9type ClientChannelMap = HashMap<String, Sender<Msg>>;
10
11#[derive(Debug, Clone)]
13pub struct TopicMap {
14 pub map: BTreeMap<String, ClientChannelMap>,
15}
16impl TopicMap {
17 fn query(&self, topic: String) -> String {
19 let v: Vec<String>;
20 if topic == "*" {
21 v = self
22 .map
23 .iter()
24 .map(|(k, v)| format!("{}: {}", k, v.len()))
25 .collect();
26 json!({topic: v}).to_string()
27 } else {
28 let clients = match self.map.get(&topic) {
29 Some(clients) => clients.len(),
30 None => 0,
31 };
32 v = vec![format!("{}", clients)];
33 json!({topic: v}).to_string()
34 }
35 }
36 fn add_channel(&mut self, topic: String, client_id: String, channel: Sender<Msg>) {
38 if self.map.contains_key(&topic.clone()) {
39 if let Some(channels) = self.map.get_mut(&topic.clone()) {
40 channels.entry(client_id).or_insert(channel);
41 }
43 } else {
44 let mut client_map = ClientChannelMap::new();
45 client_map.insert(client_id, channel);
46 self.map.insert(topic, client_map);
47 }
48 }
49 fn remove_channel(&mut self, topic: String, client_id: String) {
51 if self.map.contains_key(&topic) {
52 if let Some(channels) = self.map.get_mut(&topic) {
53 channels.remove(&client_id);
54 }
55 trace!("Channels: {:?}", self.map);
56 }
57 }
58
59 async fn publish(&mut self, msg: Msg) {
61 if !self.map.contains_key(&msg.topic) {
62 return;
63 }
64 let topic = msg.topic.clone();
65
66 if let Some(channels) = self.map.get_mut(&topic.clone()) {
67 let dead_channels = channels
68 .iter()
69 .map(|(client_id, channel)| {
70 info!("Sending msg to the {}", client_id);
71 match channel.send(msg.clone()) {
72 Ok(_n) => "".to_string(),
73 Err(e) => {
74 error!(
75 "Error occurred: {} while sending the message to the channel {}",
76 e.to_string(),
77 client_id
78 );
79 error!("Cleaning up");
80 client_id.clone()
81 }
82 }
83 })
84 .collect::<Vec<_>>();
85 info!("Dead_channels: {:?}", dead_channels);
86 let _ = dead_channels
87 .iter()
88 .map(|client_id| {
89 self.remove_channel(topic.clone(), client_id.clone());
90 })
91 .collect::<Vec<_>>();
92 }
93 }
94}
95
96pub(crate) fn get_global_broadcaster(capacity: usize) -> tokio::sync::broadcast::Sender<Msg> {
98 info!("Creating broadcast channel");
99 let (glob_tx, _) = tokio::sync::broadcast::channel(capacity);
100 glob_tx
101}
102
103pub(crate) async fn topic_manager(chan: Sender<Msg>) {
105 let mut map: TopicMap = TopicMap {
108 map: BTreeMap::new(),
109 };
110 let mut rx = chan.subscribe();
111 loop {
112 match rx.recv().await {
113 Ok(msg) => {
114 if !msg.topic.is_empty() {
115 info!("Topic received: {}", msg.topic);
116 match msg.header.pkt_type {
117 PktType::PUBLISH => {
118 trace!("Publishing to map:{:?}", map);
119 map.publish(msg).await;
120 }
121 PktType::SUBSCRIBE => {
122 map.add_channel(
123 msg.topic,
124 msg.client_id.unwrap(),
125 msg.channel.unwrap(),
126 );
127 trace!("Map: {:?}", map);
128 }
129 PktType::UNSUBSCRIBE => {
130 info!("Unsubscribing:");
131 map.remove_channel(msg.topic, msg.client_id.unwrap());
132 }
133 PktType::QUERY => {
134 info!("Querying");
135 let query_resp = map.query(msg.topic.clone());
136 info!("Query_resp: {}", query_resp.clone());
137 let resp_msg = match msg.response_msg(query_resp.into_bytes()) {
138 Ok(rm) => rm,
139 Err(e) => {
140 error!(
141 "Error while getting the response to the query message: {}",
142 e.to_string()
143 );
144 continue;
145 }
146 };
147 info!("Generated query resp: {:?}", resp_msg);
148 match msg.channel.unwrap().send(resp_msg) {
149 Ok(n) => n,
150 Err(e) => {
151 error!(
152 "Error while sending the query response: {}",
153 e.to_string()
154 );
155 0
156 }
157 };
158 }
159 _ => {}
160 };
161 }
162 }
163 Err(e) => {
164 error!(
165 "Error occurred while receiving the topic: {}",
166 e.to_string()
167 );
168 }
170 };
171 }
172}