use std::collections::{HashMap, HashSet};
/// The literal key `"$p"`.
///
/// NOTE(review): judging by the name this marks presence-stream state, but it
/// is not referenced in the visible part of this file — confirm at call sites.
pub const PRESENCE_STREAM_STATE: &str = "$p";
/// The literal key `"$w"`.
///
/// NOTE(review): judging by the name this marks whisper-stream state, but it
/// is not referenced in the visible part of this file — confirm at call sites.
pub const WHISPER_STREAM_STATE: &str = "$w";
/// Per-connection state: the values given at construction time plus the
/// subscription, stream and state bookkeeping mutated through the methods on
/// `Session`.
#[derive(Debug)]
#[allow(dead_code)]
pub struct Session {
/// Connection id passed to `Session::new`.
pub conn_id: u32,
/// Path passed to `Session::new` (presumably the request path — confirm with the caller).
pub path: String,
/// Remote address string passed to `Session::new`.
pub remote_addr: String,
/// Header name → value map passed to `Session::new`; `get_header` looks
/// names up case-insensitively.
pub headers: HashMap<String, String>,
/// `None` until `set_identifier` stores a value.
pub identifier: Option<String>,
/// `None` until `set_connection_identifiers` stores a value;
/// `is_authenticated` reports whether it is set.
pub connection_identifiers: Option<String>,
/// Identifiers added by `subscribe` and removed by `unsubscribe`.
pub subscriptions: HashSet<String>,
/// Stream names the session currently holds, updated by `subscribe_stream`,
/// `unsubscribe_stream` and the `*_subscription_streams` methods.
pub streams: HashSet<String>,
/// Stream names grouped by the identifier that registered them.
pub subscription_streams: HashMap<String, HashSet<String>>,
/// String key/value state replaced wholesale by `set_cstate`
/// (presumably connection-level state — confirm).
pub cstate: HashMap<String, String>,
/// String key/value state stored per identifier by `set_istate`.
pub istate: HashMap<String, HashMap<String, String>>,
/// Identifier → stream name; `set_presence_stream` overwrites any previous
/// entry for the same identifier.
pub presence_streams: HashMap<String, String>,
/// Identifier → stream name; `set_whisper_stream` overwrites any previous
/// entry for the same identifier.
pub whisper_streams: HashMap<String, String>,
}
// NOTE(review): blanket allowance — presumably not every method has a caller
// yet; confirm and narrow it to the items that really are unused.
#[allow(dead_code)]
impl Session {
    /// Creates a session from the connection details.
    ///
    /// Both identifiers start as `None` and every collection starts empty.
    pub fn new(
        conn_id: u32,
        path: String,
        remote_addr: String,
        headers: HashMap<String, String>,
    ) -> Self {
        Self {
            conn_id,
            path,
            remote_addr,
            headers,
            identifier: None,
            connection_identifiers: None,
            subscriptions: HashSet::new(),
            streams: HashSet::new(),
            subscription_streams: HashMap::new(),
            cstate: HashMap::new(),
            istate: HashMap::new(),
            presence_streams: HashMap::new(),
            whisper_streams: HashMap::new(),
        }
    }

    /// Returns `true` once connection identifiers have been stored.
    pub fn is_authenticated(&self) -> bool {
        self.connection_identifiers.is_some()
    }

    /// Stores the connection identifiers, replacing any previous value.
    pub fn set_connection_identifiers(&mut self, identifiers: String) {
        self.connection_identifiers = Some(identifiers);
    }

    /// Stores the session identifier, replacing any previous value.
    pub fn set_identifier(&mut self, identifier: String) {
        self.identifier = Some(identifier);
    }

    /// Records a subscription to `identifier`; repeating it is a no-op.
    pub fn subscribe(&mut self, identifier: String) {
        self.subscriptions.insert(identifier);
    }

    /// Drops the subscription to `identifier`.
    ///
    /// Returns `true` if the subscription existed. Only the `subscriptions`
    /// set is touched; per-identifier streams and state are left as they are.
    pub fn unsubscribe(&mut self, identifier: &str) -> bool {
        self.subscriptions.remove(identifier)
    }

    /// Adds `stream` to the session-wide stream set.
    pub fn subscribe_stream(&mut self, stream: String) {
        self.streams.insert(stream);
    }

    /// Removes `stream` from the session-wide stream set.
    ///
    /// Returns `true` if the stream was present.
    pub fn unsubscribe_stream(&mut self, stream: &str) -> bool {
        self.streams.remove(stream)
    }

    /// Returns `true` if `stream` is in the session-wide stream set.
    pub fn is_subscribed_to_stream(&self, stream: &str) -> bool {
        self.streams.contains(stream)
    }

    /// Replaces the whole `cstate` map.
    pub fn set_cstate(&mut self, cstate: HashMap<String, String>) {
        self.cstate = cstate;
    }

    /// Replaces the state map stored for `identifier`.
    pub fn set_istate(&mut self, identifier: &str, istate: HashMap<String, String>) {
        self.istate.insert(identifier.to_string(), istate);
    }

    /// Returns a copy of the state stored for `identifier`, or an empty map
    /// when nothing has been stored for it.
    pub fn istate_for(&self, identifier: &str) -> HashMap<String, String> {
        self.istate.get(identifier).cloned().unwrap_or_default()
    }

    /// Returns every per-identifier state map serialized to a JSON string,
    /// keyed by identifier.
    ///
    /// An entry whose serialization fails is left out of the result.
    pub fn istate_encoded(&self) -> HashMap<String, String> {
        let mut encoded = HashMap::new();
        for (identifier, state) in &self.istate {
            if let Ok(json) = serde_json::to_string(state) {
                encoded.insert(identifier.clone(), json);
            }
        }
        encoded
    }

    /// Registers `streams` under `identifier`.
    ///
    /// Returns, in input order, the streams that were new both for this
    /// identifier and for the session-wide set. An (initially empty) entry for
    /// `identifier` is created even when `streams` is empty.
    pub fn add_subscription_streams(
        &mut self,
        identifier: &str,
        streams: &[String],
    ) -> Vec<String> {
        let tracked = self
            .subscription_streams
            .entry(identifier.to_string())
            .or_default();
        let mut added = Vec::new();
        for stream in streams {
            // `&&` short-circuits on purpose: a stream this identifier already
            // tracks does not touch the session-wide set at all.
            if tracked.insert(stream.clone()) && self.streams.insert(stream.clone()) {
                added.push(stream.clone());
            }
        }
        added
    }

    /// Unregisters `streams` from `identifier`.
    ///
    /// The identifier's entry is dropped once its last stream is gone.
    /// Returns the streams that were also removed from the session-wide set,
    /// i.e. those no other identifier still tracks.
    pub fn remove_subscription_streams(
        &mut self,
        identifier: &str,
        streams: &[String],
    ) -> Vec<String> {
        let mut candidates = Vec::new();
        if let Some(tracked) = self.subscription_streams.get_mut(identifier) {
            for stream in streams {
                if tracked.remove(stream) {
                    candidates.push(stream.clone());
                }
            }
            if tracked.is_empty() {
                self.subscription_streams.remove(identifier);
            }
        }
        self.prune_streams(candidates)
    }

    /// Unregisters every stream tracked for `identifier`.
    ///
    /// Returns the streams that were also removed from the session-wide set;
    /// their order follows the set's iteration order and is unspecified.
    pub fn remove_all_subscription_streams(&mut self, identifier: &str) -> Vec<String> {
        let candidates: Vec<String> = self
            .subscription_streams
            .remove(identifier)
            .into_iter()
            .flatten()
            .collect();
        self.prune_streams(candidates)
    }

    /// Removes from the session-wide set every candidate that no identifier
    /// tracks any more, and returns the ones actually removed.
    fn prune_streams(&mut self, streams: Vec<String>) -> Vec<String> {
        let mut removed = Vec::new();
        for stream in streams {
            let still_used = self
                .subscription_streams
                .values()
                .any(|tracked| tracked.contains(&stream));
            if !still_used && self.streams.remove(&stream) {
                removed.push(stream);
            }
        }
        removed
    }

    /// Looks up a header value by name.
    ///
    /// A key that equals `name` exactly wins; otherwise the first key that
    /// matches ignoring ASCII case is used. HTTP field names are ASCII, so no
    /// Unicode case folding is applied, and the lookup does not allocate.
    pub fn get_header(&self, name: &str) -> Option<&str> {
        // Fast path: O(1), and deterministic when keys differ only by case.
        if let Some(value) = self.headers.get(name) {
            return Some(value.as_str());
        }
        self.headers
            .iter()
            .find(|(key, _)| key.eq_ignore_ascii_case(name))
            .map(|(_, value)| value.as_str())
    }

    /// Stores the presence stream for `identifier`, replacing any previous one.
    pub fn set_presence_stream(&mut self, identifier: &str, stream: String) {
        self.presence_streams.insert(identifier.to_string(), stream);
    }

    /// Returns the presence stream stored for `identifier`, if any.
    pub fn get_presence_stream(&self, identifier: &str) -> Option<&str> {
        self.presence_streams.get(identifier).map(String::as_str)
    }

    /// Removes and returns the presence stream stored for `identifier`.
    pub fn remove_presence_stream(&mut self, identifier: &str) -> Option<String> {
        self.presence_streams.remove(identifier)
    }

    /// Returns a copy of every stored presence stream, in unspecified order.
    pub fn all_presence_streams(&self) -> Vec<String> {
        self.presence_streams.values().cloned().collect()
    }

    /// Stores the whisper stream for `identifier`, replacing any previous one.
    pub fn set_whisper_stream(&mut self, identifier: &str, stream: String) {
        self.whisper_streams.insert(identifier.to_string(), stream);
    }

    /// Returns the whisper stream stored for `identifier`, if any.
    pub fn get_whisper_stream(&self, identifier: &str) -> Option<&str> {
        self.whisper_streams.get(identifier).map(String::as_str)
    }

    /// Removes and returns the whisper stream stored for `identifier`.
    pub fn remove_whisper_stream(&mut self, identifier: &str) -> Option<String> {
        self.whisper_streams.remove(identifier)
    }
}