#[macro_use]
extern crate log;
use once_cell::sync::Lazy;
use rand::Rng;
use redis_kiss::{get_connection, AsyncCommands};
use std::collections::HashSet;
mod operations;
use operations::{
__add_to_set_string, __add_to_set_u32, __delete_key, __get_set_members_as_string,
__get_set_size, __remove_from_set_string, __remove_from_set_u32,
};
/// Numeric identifier of the region this process belongs to.
///
/// Read lazily, on first access, from the `REGION_ID` environment variable.
/// Falls back to `0` when the variable cannot be read.
///
/// # Panics
///
/// Panics on first access if `REGION_ID` is set but is not a valid `u16`.
pub static REGION_ID: Lazy<u16> = Lazy::new(|| {
    let raw = std::env::var("REGION_ID").unwrap_or_else(|_| "0".to_string());
    // A misconfigured region is a deployment error; fail loudly and say which
    // variable and value were at fault instead of a bare `ParseIntError`.
    raw.parse::<u16>()
        .unwrap_or_else(|err| panic!("REGION_ID must be a valid u16, got {raw:?}: {err}"))
});

/// Redis key under which this region records its sessions (`"region"` followed by [`REGION_ID`]).
pub static REGION_KEY: Lazy<String> = Lazy::new(|| format!("region{}", &*REGION_ID));

/// Redis key of the set holding the IDs of users that currently have a session.
pub static ONLINE_SET: &str = "online";

/// Mask of the low bits of a session ID that are taken from the caller-supplied flags.
pub static FLAG_BITS: u32 = 0b1;
pub async fn create_session(user_id: &str, flags: u8) -> (bool, u32) {
info!("Creating a presence session for {user_id} with flags {flags}");
if let Ok(mut conn) = get_connection().await {
let was_empty = __get_set_size(&mut conn, user_id).await == 0;
let session_id = {
let mut rng = rand::thread_rng();
(rng.gen::<u32>() & !FLAG_BITS) | (flags as u32 & FLAG_BITS)
};
__add_to_set_u32(&mut conn, user_id, session_id).await;
__add_to_set_string(&mut conn, ONLINE_SET, user_id).await;
__add_to_set_string(&mut conn, ®ION_KEY, &format!("{user_id}:{session_id}")).await;
info!("Created session for {user_id}, assigned them a session ID of {session_id}.");
(was_empty, session_id)
} else {
(false, 0)
}
}
/// Removes the presence session `session_id` belonging to `user_id`.
///
/// Delegates to `delete_session_internal` with region bookkeeping enabled, so
/// the session's entry in the region set is removed as well.
///
/// Returns `true` when the user has no sessions left after the removal, and
/// `false` otherwise or when no Redis connection could be obtained.
pub async fn delete_session(user_id: &str, session_id: u32) -> bool {
    // Ordinary deletions never skip the region set clean-up.
    let skip_region = false;
    delete_session_internal(user_id, session_id, skip_region).await
}
async fn delete_session_internal(user_id: &str, session_id: u32, skip_region: bool) -> bool {
info!("Deleting presence session for {user_id} with id {session_id}");
if let Ok(mut conn) = get_connection().await {
__remove_from_set_u32(&mut conn, user_id, session_id).await;
if !skip_region {
__remove_from_set_string(&mut conn, ®ION_KEY, &format!("{user_id}:{session_id}"))
.await;
}
let is_empty = __get_set_size(&mut conn, user_id).await == 0;
if is_empty {
__remove_from_set_string(&mut conn, ONLINE_SET, user_id).await;
info!("User ID {} just went offline.", &user_id);
}
is_empty
} else {
false
}
}
/// Reports whether a Redis key named `user_id` exists.
///
/// Note that this checks for the key itself and does not consult
/// [`ONLINE_SET`]. Any failure — no connection, or an error from the
/// `EXISTS` command — is reported as `false`.
pub async fn is_online(user_id: &str) -> bool {
    match get_connection().await {
        Ok(mut conn) => conn.exists(user_id).await.unwrap_or(false),
        Err(_) => false,
    }
}
/// Returns the subset of `user_ids` that are currently online.
///
/// Variant compiled when the `redis-is-patched` feature is enabled; it asks
/// Redis about all IDs at once with `SMISMEMBER` on [`ONLINE_SET`].
///
/// * An empty slice yields an empty set without contacting Redis.
/// * A single ID is answered through [`is_online`] instead.
/// * If no Redis connection can be obtained, an empty set is returned.
///
/// # Panics
///
/// Panics if the `SMISMEMBER` command itself fails.
#[cfg(feature = "redis-is-patched")]
pub async fn filter_online(user_ids: &'_ [String]) -> HashSet<String> {
    let mut set = HashSet::new();

    if user_ids.is_empty() {
        return set;
    }

    if let [only] = user_ids {
        if is_online(only).await {
            set.insert(only.clone());
        }
        return set;
    }

    if let Ok(mut conn) = get_connection().await {
        let data: Vec<bool> = conn
            .smismember(ONLINE_SET, user_ids)
            .await
            .expect("this shouldn't happen, please read this code! presence/mod.rs");

        // Pair each requested ID with its reply flag. `zip` stops at the shorter
        // side, so a reply with fewer entries than requested (including an empty
        // one) can no longer cause an out-of-bounds index panic.
        set.extend(
            user_ids
                .iter()
                .zip(data)
                .filter_map(|(id, online)| online.then(|| id.clone())),
        );
    }

    set
}
/// Returns the subset of `user_ids` that are currently online.
///
/// Fallback variant compiled when the `redis-is-patched` feature is disabled:
/// it fetches every member of [`ONLINE_SET`] with `SMEMBERS` and keeps the
/// requested IDs that appear in it.
///
/// An empty slice yields an empty set without contacting Redis. A failed
/// connection or a failed `SMEMBERS` call also yields an empty set.
#[cfg(not(feature = "redis-is-patched"))]
pub async fn filter_online(user_ids: &'_ [String]) -> HashSet<String> {
    if user_ids.is_empty() {
        return HashSet::new();
    }

    let mut conn = match get_connection().await {
        Ok(conn) => conn,
        Err(_) => return HashSet::new(),
    };

    let online: Vec<String> = conn.smembers(ONLINE_SET).await.unwrap_or_default();
    // Index the online users once so each requested ID is a constant-time lookup.
    let online: HashSet<&str> = online.iter().map(String::as_str).collect();

    user_ids
        .iter()
        .filter(|id| online.contains(id.as_str()))
        .cloned()
        .collect()
}
/// Deletes every session recorded for a region, then deletes the region key itself.
///
/// `region_id` is the Redis key of the region's session set; when `None`,
/// this process's own [`REGION_KEY`] is used. Each entry is expected to have
/// the form `"{user_id}:{session_id}"`, as written by [`create_session`].
/// Entries that do not match that form are skipped.
///
/// Nothing is deleted when the region has no recorded sessions.
///
/// # Panics
///
/// Panics if no Redis connection can be obtained.
pub async fn clear_region(region_id: Option<&str>) {
    let region_id = region_id.unwrap_or(&*REGION_KEY);
    let mut conn = get_connection().await.expect("Redis connection");

    let sessions = __get_set_members_as_string(&mut conn, region_id).await;
    if sessions.is_empty() {
        return;
    }

    info!(
        "Cleaning up {} sessions, this may take a while...",
        sessions.len()
    );

    for session in sessions {
        // The session ID is always the final component, so split on the LAST
        // colon; this keeps user IDs that themselves contain ':' intact.
        if let Some((user_id, raw_session_id)) = session.rsplit_once(':') {
            if let Ok(session_id) = raw_session_id.parse::<u32>() {
                // Skip per-entry region removal: the whole key is deleted below.
                delete_session_internal(user_id, session_id, true).await;
            }
        }
    }

    __delete_key(&mut conn, region_id).await;
    info!("Clean up complete.");
}
// Integration test for the presence API. It talks to whatever Redis instance
// `get_connection` resolves to, so it needs a reachable server to pass.
#[cfg(test)]
mod tests {
use crate::{clear_region, create_session, delete_session, filter_online, is_online};
use rand::Rng;
// Walks through the full session lifecycle: create, query, delete, region clean-up.
#[async_std::test]
async fn it_works() {
// Start from a clean slate for this process's region.
clear_region(None).await;
// Random IDs keep this run from clashing with keys left by other runs.
let user_id = rand::thread_rng().gen::<u32>().to_string();
let other_id = rand::thread_rng().gen::<u32>().to_string();
let flags = 1;
// First session for `user_id`: must be reported as first, with the flag bit set.
let (first_session, session_id) = create_session(&user_id, flags).await;
assert!(first_session);
assert_ne!(session_id, 0);
assert_eq!(session_id as u8 & flags, flags);
assert!(is_online(&user_id).await);
// Single-element lookup.
let user_ids = filter_online(&[user_id.to_string()]).await;
assert_eq!(user_ids.len(), 1);
assert!(user_ids.contains(&user_id));
// Second session for the same user: not first, and the flag bit is cleared.
let (first_session, second_session_id) = create_session(&user_id, 0).await;
assert!(!first_session);
assert_eq!(second_session_id as u8 & 1, 0);
// A different user gets their own first session.
let (first_session, other_session_id) = create_session(&other_id, 0).await;
assert!(first_session);
// Multi-element lookup must report both users.
let user_ids = filter_online(&[user_id.to_string(), other_id.to_string()]).await;
assert_eq!(user_ids.len(), 2);
assert!(user_ids.contains(&user_id));
assert!(user_ids.contains(&other_id));
// `user_id` still has its second session after this; `other_id` has none left.
delete_session(&user_id, session_id).await;
delete_session(&other_id, other_session_id).await;
assert!(!is_online(&other_id).await);
// Clearing the region removes the remaining session of `user_id`.
clear_region(None).await;
assert!(!is_online(&user_id).await);
let user_ids = filter_online(&[user_id.to_string(), other_id.to_string()]).await;
assert!(user_ids.is_empty())
}
}