// revolt_presence/lib.rs

1#[macro_use]
2extern crate log;
3
4use once_cell::sync::Lazy;
5use rand::Rng;
6use redis_kiss::{get_connection, AsyncCommands};
7use std::collections::HashSet;
8
9mod operations;
10use operations::{
11    __add_to_set_string, __add_to_set_u32, __delete_key, __get_set_members_as_string,
12    __get_set_size, __remove_from_set_string, __remove_from_set_u32,
13};
14
15pub static REGION_ID: Lazy<u16> = Lazy::new(|| {
16    std::env::var("REGION_ID")
17        .unwrap_or_else(|_| "0".to_string())
18        .parse()
19        .unwrap()
20});
21
22pub static REGION_KEY: Lazy<String> = Lazy::new(|| format!("region{}", &*REGION_ID));
23pub static ONLINE_SET: &str = "online";
24
25pub static FLAG_BITS: u32 = 0b1;
26
27/// Create a new presence session, returns the ID of this session
28pub async fn create_session(user_id: &str, flags: u8) -> (bool, u32) {
29    info!("Creating a presence session for {user_id} with flags {flags}");
30
31    if let Ok(mut conn) = get_connection().await {
32        // Check whether this is the first session
33        let was_empty = __get_set_size(&mut conn, user_id).await == 0;
34
35        // A session ID is comprised of random data and any flags ORed to the end
36        let session_id = {
37            let mut rng = rand::thread_rng();
38            (rng.gen::<u32>() & !FLAG_BITS) | (flags as u32 & FLAG_BITS)
39        };
40
41        // Add session to user's sessions and to the region
42        __add_to_set_u32(&mut conn, user_id, session_id).await;
43        __add_to_set_string(&mut conn, ONLINE_SET, user_id).await;
44        __add_to_set_string(&mut conn, &REGION_KEY, &format!("{user_id}:{session_id}")).await;
45        info!("Created session for {user_id}, assigned them a session ID of {session_id}.");
46
47        (was_empty, session_id)
48    } else {
49        // Fail through
50        (false, 0)
51    }
52}
53
54/// Delete existing presence session
55pub async fn delete_session(user_id: &str, session_id: u32) -> bool {
56    delete_session_internal(user_id, session_id, false).await
57}
58
59/// Delete existing presence session (but also choose whether to skip region)
60async fn delete_session_internal(user_id: &str, session_id: u32, skip_region: bool) -> bool {
61    info!("Deleting presence session for {user_id} with id {session_id}");
62
63    if let Ok(mut conn) = get_connection().await {
64        // Remove the session
65        __remove_from_set_u32(&mut conn, user_id, session_id).await;
66
67        // Remove from the region
68        if !skip_region {
69            __remove_from_set_string(&mut conn, &REGION_KEY, &format!("{user_id}:{session_id}"))
70                .await;
71        }
72
73        // Return whether this was the last session
74        let is_empty = __get_set_size(&mut conn, user_id).await == 0;
75        if is_empty {
76            __remove_from_set_string(&mut conn, ONLINE_SET, user_id).await;
77            info!("User ID {} just went offline.", &user_id);
78        }
79
80        is_empty
81    } else {
82        // Fail through
83        false
84    }
85}
86
87/// Check whether a given user ID is online
88pub async fn is_online(user_id: &str) -> bool {
89    if let Ok(mut conn) = get_connection().await {
90        conn.exists(user_id).await.unwrap_or(false)
91    } else {
92        false
93    }
94}
95
/// Check whether a set of users is online, returns a set of the online user IDs.
///
/// This variant relies on `SMISMEMBER` being available through the Redis client.
/// An empty input yields an empty set, and a failed connection yields an empty set.
///
/// # Panics
///
/// Panics if the `SMISMEMBER` query itself fails.
#[cfg(feature = "redis-is-patched")]
pub async fn filter_online(user_ids: &'_ [String]) -> HashSet<String> {
    // Ignore empty list immediately, to save time.
    let mut set = HashSet::new();
    if user_ids.is_empty() {
        return set;
    }

    // NOTE: at the point that we need mobile indicators
    // you can interpret the data here and return a new data
    // structure like HashMap<String /* id */, u8 /* flags */>

    // We need to handle a special case where only one is present
    // as for some reason or another, Redis does not like us sending
    // a list of just one ID to the server.
    if let [only] = user_ids {
        if is_online(only).await {
            set.insert(only.clone());
        }

        return set;
    }

    // Otherwise, go ahead as normal.
    if let Ok(mut conn) = get_connection().await {
        // If this breaks, we have lost the Redis patch which adds SMISMEMBER.
        // It is currently patched in through a forked repository; investigate
        // what happened to it.
        let data: Vec<bool> = conn
            .smismember(ONLINE_SET, user_ids)
            .await
            .expect("this shouldn't happen, please read this code! presence/mod.rs");

        // Pair every requested ID with its membership flag. Zipping stops at the
        // shorter of the two sequences, so an empty or truncated reply can never
        // cause an out-of-bounds panic the way indexing `data[i]` could.
        set.extend(
            user_ids
                .iter()
                .zip(data)
                .filter(|(_, online)| *online)
                .map(|(id, _)| id.clone()),
        );
    }

    set
}
143
144/// Check whether a set of users is online, returns a set of the online user IDs
145#[cfg(not(feature = "redis-is-patched"))]
146pub async fn filter_online(user_ids: &'_ [String]) -> HashSet<String> {
147    if user_ids.is_empty() {
148        HashSet::new()
149    } else if let Ok(mut conn) = get_connection().await {
150        let members: Vec<String> = conn.smembers(ONLINE_SET).await.unwrap_or_default();
151        let members: HashSet<&String> = members.iter().collect();
152        let user_ids: HashSet<&String> = user_ids.iter().collect();
153
154        members
155            .intersection(&user_ids)
156            .map(|x| x.to_string())
157            .collect()
158    } else {
159        HashSet::new()
160    }
161}
162
163/// Reset any stale presence data
164pub async fn clear_region(region_id: Option<&str>) {
165    let region_id = region_id.unwrap_or(&*REGION_KEY);
166    let mut conn = get_connection().await.expect("Redis connection");
167
168    let sessions = __get_set_members_as_string(&mut conn, region_id).await;
169    if !sessions.is_empty() {
170        info!(
171            "Cleaning up {} sessions, this may take a while...",
172            sessions.len()
173        );
174
175        // Iterate and delete each session, this will
176        // also send out any relevant events.
177        for session in sessions {
178            let parts = session.split(':').collect::<Vec<&str>>();
179            if let (Some(user_id), Some(session_id)) = (parts.first(), parts.get(1)) {
180                if let Ok(session_id) = session_id.parse() {
181                    delete_session_internal(user_id, session_id, true).await;
182                }
183            }
184        }
185
186        // Then clear the set in Redis.
187        __delete_key(&mut conn, region_id).await;
188
189        info!("Clean up complete.");
190    }
191}
192
#[cfg(test)]
mod tests {
    use crate::{clear_region, create_session, delete_session, filter_online, is_online};
    use rand::Rng;

    /// Walks through the session lifecycle: create, query, delete and
    /// region clean-up.
    #[async_std::test]
    async fn it_works() {
        revolt_config::config().await;

        // Start from a clean region so earlier runs cannot interfere.
        clear_region(None).await;

        // Two random user IDs and a flag value to round-trip through the session ID.
        let first_user = rand::thread_rng().gen::<u32>().to_string();
        let second_user = rand::thread_rng().gen::<u32>().to_string();
        let flags = 1;

        // The first session of a user reports `true` and carries the flag bit.
        let (is_first, first_session_id) = create_session(&first_user, flags).await;
        assert!(is_first);
        assert_ne!(first_session_id, 0);
        assert_eq!(first_session_id as u8 & flags, flags);

        // The user must now be visible through both query paths.
        assert!(is_online(&first_user).await);

        let online = filter_online(&[first_user.clone()]).await;
        assert_eq!(online.len(), 1);
        assert!(online.contains(&first_user));

        // A second session for the same user is not a "first" one and has no flag set.
        let (is_first, second_session_id) = create_session(&first_user, 0).await;
        assert!(!is_first);
        assert_eq!(second_session_id as u8 & 1, 0);

        // A different user gets their own first session.
        let (is_first, other_session_id) = create_session(&second_user, 0).await;
        assert!(is_first);

        let online = filter_online(&[first_user.clone(), second_user.clone()]).await;
        assert_eq!(online.len(), 2);
        assert!(online.contains(&first_user));
        assert!(online.contains(&second_user));

        // Deleting the second user's only session takes them offline.
        delete_session(&first_user, first_session_id).await;
        delete_session(&second_user, other_session_id).await;
        assert!(!is_online(&second_user).await);

        // Clearing the region removes whatever is still left.
        clear_region(None).await;
        assert!(!is_online(&first_user).await);

        let online = filter_online(&[first_user.clone(), second_user.clone()]).await;
        assert!(online.is_empty())
    }
}