p2panda_sync/manager/
session_map.rs1use std::collections::{HashMap, HashSet};
5use std::hash::Hash;
6
7#[derive(Clone, Debug)]
9pub struct SessionTopicMap<T, TX> {
10 pub(crate) session_tx_map: HashMap<u64, TX>,
11 pub(crate) session_topic_map: HashMap<u64, T>,
12 pub(crate) topic_session_map: HashMap<T, HashSet<u64>>,
13}
14
15impl<T, TX> Default for SessionTopicMap<T, TX> {
16 fn default() -> Self {
17 Self {
18 session_tx_map: Default::default(),
19 session_topic_map: Default::default(),
20 topic_session_map: Default::default(),
21 }
22 }
23}
24
25impl<T, TX> SessionTopicMap<T, TX>
26where
27 T: Clone + Hash + Eq,
28{
29 pub fn insert_with_topic(&mut self, session_id: u64, topic: T, tx: TX) {
31 self.session_topic_map.insert(session_id, topic.clone());
32 self.topic_session_map
33 .entry(topic.clone())
34 .and_modify(|sessions| {
35 sessions.insert(session_id);
36 })
37 .or_insert(HashSet::from_iter([session_id]));
38 self.session_tx_map.insert(session_id, tx);
39 }
40
41 pub fn drop(&mut self, session_id: u64) -> bool {
46 let Some(topic) = self.session_topic_map.remove(&session_id) else {
47 return false;
48 };
49 self.topic_session_map
50 .entry(topic.clone())
51 .and_modify(|sessions| {
52 sessions.remove(&session_id);
53 });
54 self.session_tx_map.remove(&session_id);
55 true
56 }
57
58 pub fn topic(&self, session_id: u64) -> Option<&T> {
62 self.session_topic_map.get(&session_id)
63 }
64
65 pub fn sessions(&self, topic: &T) -> HashSet<u64> {
67 self.topic_session_map
68 .get(topic)
69 .cloned()
70 .unwrap_or_default()
71 }
72
73 pub fn sender(&self, session_id: u64) -> Option<&TX> {
75 self.session_tx_map.get(&session_id)
76 }
77
78 pub fn sender_mut(&mut self, session_id: u64) -> Option<&mut TX> {
80 self.session_tx_map.get_mut(&session_id)
81 }
82}
83
84#[cfg(test)]
85mod tests {
86 use std::collections::HashSet;
87
88 use futures::channel::mpsc;
89
90 use crate::manager::SessionTopicMap;
91 use crate::test_utils::TestTopic;
92
93 const SESSION1: u64 = 1;
94 const SESSION2: u64 = 2;
95 const SESSION3: u64 = 3;
96
97 const TOPIC_A: &str = "cats";
98 const TOPIC_B: &str = "dogs";
99
100 #[test]
101 fn default_is_empty() {
102 let map: SessionTopicMap<TestTopic, ()> = SessionTopicMap::default();
103 assert!(map.session_tx_map.is_empty());
104 assert!(map.session_topic_map.is_empty());
105 assert!(map.topic_session_map.is_empty());
106 }
107
108 #[test]
109 fn insert_with_topic() {
110 let (tx, _rx) = mpsc::channel::<()>(128);
111 let mut map = SessionTopicMap::default();
112
113 map.insert_with_topic(SESSION1, TestTopic::new(TOPIC_A), tx.clone());
114
115 assert_eq!(map.topic(SESSION1), Some(&TestTopic::new(TOPIC_A)));
117
118 assert_eq!(
120 map.sessions(&TestTopic::new(TOPIC_A)),
121 HashSet::from_iter([SESSION1])
122 );
123
124 assert!(map.sender(SESSION1).is_some());
126 }
127
128 #[test]
129 fn drop_session() {
130 let (tx, _rx) = mpsc::channel::<()>(128);
131 let mut map = SessionTopicMap::default();
132
133 map.insert_with_topic(SESSION1, TestTopic::new(TOPIC_A), tx.clone());
134 map.insert_with_topic(SESSION2, TestTopic::new(TOPIC_A), tx.clone());
135 map.insert_with_topic(SESSION3, TestTopic::new(TOPIC_B), tx);
136
137 assert!(map.drop(SESSION1));
139
140 assert!(map.topic(SESSION1).is_none());
142 assert!(!map.session_tx_map.contains_key(&SESSION1));
143
144 let sessions = map.sessions(&TestTopic::new(TOPIC_A));
146 assert_eq!(sessions, HashSet::from([SESSION2]));
147
148 assert!(!map.drop(10));
150 }
151
152 #[test]
153 fn insert_multiple_sessions_same_topic() {
154 let (tx1, _rx1) = mpsc::channel::<()>(128);
155 let (tx2, _rx2) = mpsc::channel::<()>(128);
156 let mut map = SessionTopicMap::default();
157
158 map.insert_with_topic(SESSION1, TestTopic::new(TOPIC_A), tx1);
159 map.insert_with_topic(SESSION2, TestTopic::new(TOPIC_A), tx2);
160
161 let sessions = map.sessions(&TestTopic::new(TOPIC_A));
162 assert_eq!(sessions, HashSet::from([SESSION1, SESSION2]));
163 }
164}