p2panda_sync/manager/
session_map.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2
3//! Maintain mapping of generic topics to session ids and of session ids to a channel sender.
4use std::collections::{HashMap, HashSet};
5use std::hash::Hash;
6
/// Bookkeeping that ties session ids, topics and channel senders together.
///
/// Three maps are kept side by side so lookups work in both directions: from a session id to
/// its sender, from a session id to its topic, and from a topic to every session id registered
/// for it.
///
/// `T` is the topic type and `TX` is the sender half stored per session.
#[derive(Debug, Clone)]
pub struct SessionTopicMap<T, TX> {
    /// Sender stored for each session id.
    pub(crate) session_tx_map: HashMap<u64, TX>,
    /// Topic each session id was registered with.
    pub(crate) session_topic_map: HashMap<u64, T>,
    /// Ids of all sessions registered for each topic.
    pub(crate) topic_session_map: HashMap<T, HashSet<u64>>,
}
14
15impl<T, TX> Default for SessionTopicMap<T, TX> {
16    fn default() -> Self {
17        Self {
18            session_tx_map: Default::default(),
19            session_topic_map: Default::default(),
20            topic_session_map: Default::default(),
21        }
22    }
23}
24
25impl<T, TX> SessionTopicMap<T, TX>
26where
27    T: Clone + Hash + Eq,
28{
29    /// Insert a session id with their topic and tx channel.
30    pub fn insert_with_topic(&mut self, session_id: u64, topic: T, tx: TX) {
31        self.session_topic_map.insert(session_id, topic.clone());
32        self.topic_session_map
33            .entry(topic.clone())
34            .and_modify(|sessions| {
35                sessions.insert(session_id);
36            })
37            .or_insert(HashSet::from_iter([session_id]));
38        self.session_tx_map.insert(session_id, tx);
39    }
40
41    /// Drop a session from all mappings.
42    ///
43    /// Returns true if the session existed and was dropped, otherwise returns false when the
44    /// session was known
45    pub fn drop(&mut self, session_id: u64) -> bool {
46        let Some(topic) = self.session_topic_map.remove(&session_id) else {
47            return false;
48        };
49        self.topic_session_map
50            .entry(topic.clone())
51            .and_modify(|sessions| {
52                sessions.remove(&session_id);
53            });
54        self.session_tx_map.remove(&session_id);
55        true
56    }
57
58    /// Get the topic for a session id.
59    ///
60    /// Returns None of the session id was not known.
61    pub fn topic(&self, session_id: u64) -> Option<&T> {
62        self.session_topic_map.get(&session_id)
63    }
64
65    /// Get ids for all sessions associated with the given topic.
66    pub fn sessions(&self, topic: &T) -> HashSet<u64> {
67        self.topic_session_map
68            .get(topic)
69            .cloned()
70            .unwrap_or_default()
71    }
72
73    /// Get a reference to a session sender.
74    pub fn sender(&self, session_id: u64) -> Option<&TX> {
75        self.session_tx_map.get(&session_id)
76    }
77
78    /// Get a mutable reference to a session sender.
79    pub fn sender_mut(&mut self, session_id: u64) -> Option<&mut TX> {
80        self.session_tx_map.get_mut(&session_id)
81    }
82}
83
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use futures::channel::mpsc;

    use crate::manager::SessionTopicMap;
    use crate::test_utils::TestTopic;

    const SESSION1: u64 = 1;
    const SESSION2: u64 = 2;
    const SESSION3: u64 = 3;

    const TOPIC_A: &str = "cats";
    const TOPIC_B: &str = "dogs";

    /// A freshly created map holds nothing in any of its three internal maps.
    #[test]
    fn default_is_empty() {
        let empty = SessionTopicMap::<TestTopic, ()>::default();

        assert!(empty.session_tx_map.is_empty());
        assert!(empty.session_topic_map.is_empty());
        assert!(empty.topic_session_map.is_empty());
    }

    /// A single insert is visible through the topic, sessions and sender lookups.
    #[test]
    fn insert_with_topic() {
        let (sender, _receiver) = mpsc::channel::<()>(128);
        let topic_a = TestTopic::new(TOPIC_A);
        let mut map = SessionTopicMap::default();

        map.insert_with_topic(SESSION1, topic_a.clone(), sender);

        // Session id resolves to its topic.
        assert_eq!(map.topic(SESSION1), Some(&topic_a));

        // Topic resolves to the set holding exactly this session id.
        let expected: HashSet<u64> = HashSet::from([SESSION1]);
        assert_eq!(map.sessions(&topic_a), expected);

        // The sender was stored alongside.
        assert!(map.sender(SESSION1).is_some());
    }

    /// Dropping a session removes it everywhere while leaving its topic siblings in place.
    #[test]
    fn drop_session() {
        let (sender, _receiver) = mpsc::channel::<()>(128);
        let topic_a = TestTopic::new(TOPIC_A);
        let topic_b = TestTopic::new(TOPIC_B);
        let mut map = SessionTopicMap::default();

        map.insert_with_topic(SESSION1, topic_a.clone(), sender.clone());
        map.insert_with_topic(SESSION2, topic_a.clone(), sender.clone());
        map.insert_with_topic(SESSION3, topic_b, sender);

        // Remove one of the two sessions registered for topic A.
        let dropped = map.drop(SESSION1);
        assert!(dropped);

        // Neither the topic lookup nor the sender map knows the session any more.
        assert!(map.topic(SESSION1).is_none());
        assert!(!map.session_tx_map.contains_key(&SESSION1));

        // The sibling session is still registered for topic A.
        let remaining = map.sessions(&topic_a);
        assert_eq!(remaining, HashSet::from([SESSION2]));

        // An unknown session id is reported as not dropped.
        let dropped_unknown = map.drop(10);
        assert!(!dropped_unknown);
    }

    /// Several sessions may share one topic.
    #[test]
    fn insert_multiple_sessions_same_topic() {
        let (first_tx, _first_rx) = mpsc::channel::<()>(128);
        let (second_tx, _second_rx) = mpsc::channel::<()>(128);
        let topic_a = TestTopic::new(TOPIC_A);
        let mut map = SessionTopicMap::default();

        map.insert_with_topic(SESSION1, topic_a.clone(), first_tx);
        map.insert_with_topic(SESSION2, topic_a.clone(), second_tx);

        let expected: HashSet<u64> = HashSet::from([SESSION1, SESSION2]);
        assert_eq!(map.sessions(&topic_a), expected);
    }
}