simple_pub_sub/
topics.rs

1use crate::message::Msg;
2use crate::PktType;
3use log::{error, info, trace};
4use serde_json::json;
5use std::collections::{BTreeMap, HashMap};
6use tokio;
7use tokio::sync::broadcast::Sender;
8
9type ClientChannelMap = HashMap<String, Sender<Msg>>;
10
11/// The `TopicMap` struct is used to store the channels for a given topic.
12#[derive(Debug, Clone)]
13pub struct TopicMap {
14    pub map: BTreeMap<String, ClientChannelMap>,
15}
16impl TopicMap {
17    /// Returns the number of connected clients for a given topic.
18    fn query(&self, topic: String) -> String {
19        let v: Vec<String>;
20        if topic == "*" {
21            v = self
22                .map
23                .iter()
24                .map(|(k, v)| format!("{}: {}", k, v.len()))
25                .collect();
26            json!({topic: v}).to_string()
27        } else {
28            let clients = match self.map.get(&topic) {
29                Some(clients) => clients.len(),
30                None => 0,
31            };
32            v = vec![format!("{}", clients)];
33            json!({topic: v}).to_string()
34        }
35    }
36    /// Adds a channel to the map.
37    fn add_channel(&mut self, topic: String, client_id: String, channel: Sender<Msg>) {
38        if self.map.contains_key(&topic.clone()) {
39            if let Some(channels) = self.map.get_mut(&topic.clone()) {
40                channels.entry(client_id).or_insert(channel);
41                // Not sure if the channel should be replaced if the key is already present.
42            }
43        } else {
44            let mut client_map = ClientChannelMap::new();
45            client_map.insert(client_id, channel);
46            self.map.insert(topic, client_map);
47        }
48    }
49    /// Removes a channel from the map.
50    fn remove_channel(&mut self, topic: String, client_id: String) {
51        if self.map.contains_key(&topic) {
52            if let Some(channels) = self.map.get_mut(&topic) {
53                channels.remove(&client_id);
54            }
55            trace!("Channels: {:?}", self.map);
56        }
57    }
58
59    /// Publishes the message to the channels.
60    async fn publish(&mut self, msg: Msg) {
61        if !self.map.contains_key(&msg.topic) {
62            return;
63        }
64        let topic = msg.topic.clone();
65
66        if let Some(channels) = self.map.get_mut(&topic.clone()) {
67            let dead_channels = channels
68                .iter()
69                .map(|(client_id, channel)| {
70                    info!("Sending msg to the {}", client_id);
71                    match channel.send(msg.clone()) {
72                        Ok(_n) => "".to_string(),
73                        Err(e) => {
74                            error!(
75                                "Error occurred: {} while sending the message to the channel {}",
76                                e.to_string(),
77                                client_id
78                            );
79                            error!("Cleaning up");
80                            client_id.clone()
81                        }
82                    }
83                })
84                .collect::<Vec<_>>();
85            info!("Dead_channels: {:?}", dead_channels);
86            let _ = dead_channels
87                .iter()
88                .map(|client_id| {
89                    self.remove_channel(topic.clone(), client_id.clone());
90                })
91                .collect::<Vec<_>>();
92        }
93    }
94}
95
96/// returns a global broadcaster.
97pub(crate) fn get_global_broadcaster(capacity: usize) -> tokio::sync::broadcast::Sender<Msg> {
98    info!("Creating broadcast channel");
99    let (glob_tx, _) = tokio::sync::broadcast::channel(capacity);
100    glob_tx
101}
102
103/// Handles the incoming and out-going messages for each topic.
104pub(crate) async fn topic_manager(chan: Sender<Msg>) {
105    // NOTE: this MSG must always have the client_id and channel
106    // it should not be None
107    let mut map: TopicMap = TopicMap {
108        map: BTreeMap::new(),
109    };
110    let mut rx = chan.subscribe();
111    loop {
112        match rx.recv().await {
113            Ok(msg) => {
114                if !msg.topic.is_empty() {
115                    info!("Topic received: {}", msg.topic);
116                    match msg.header.pkt_type {
117                        PktType::PUBLISH => {
118                            trace!("Publishing to map:{:?}", map);
119                            map.publish(msg).await;
120                        }
121                        PktType::SUBSCRIBE => {
122                            map.add_channel(
123                                msg.topic,
124                                msg.client_id.unwrap(),
125                                msg.channel.unwrap(),
126                            );
127                            trace!("Map: {:?}", map);
128                        }
129                        PktType::UNSUBSCRIBE => {
130                            info!("Unsubscribing:");
131                            map.remove_channel(msg.topic, msg.client_id.unwrap());
132                        }
133                        PktType::QUERY => {
134                            info!("Querying");
135                            let query_resp = map.query(msg.topic.clone());
136                            info!("Query_resp: {}", query_resp.clone());
137                            let resp_msg = match msg.response_msg(query_resp.into_bytes()) {
138                                Ok(rm) => rm,
139                                Err(e) => {
140                                    error!(
141                                        "Error while getting the response to the query message: {}",
142                                        e.to_string()
143                                    );
144                                    continue;
145                                }
146                            };
147                            info!("Generated query resp: {:?}", resp_msg);
148                            match msg.channel.unwrap().send(resp_msg) {
149                                Ok(n) => n,
150                                Err(e) => {
151                                    error!(
152                                        "Error while sending the query response: {}",
153                                        e.to_string()
154                                    );
155                                    0
156                                }
157                            };
158                        }
159                        _ => {}
160                    };
161                }
162            }
163            Err(e) => {
164                error!(
165                    "Error occurred while receiving the topic: {}",
166                    e.to_string()
167                );
168                // "".to_string()
169            }
170        };
171    }
172}