use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tokio::sync::{mpsc, RwLock};
use crate::connection::Connection;
use crate::error::{Error, ErrorCode};
use crate::protocol::{Action, ProtocolMessage, WirePresenceMessage};
use crate::rest::{Data, PresenceAction};
use crate::Result;
#[derive(Debug)]
pub(crate) struct PresenceMap {
members: HashMap<String, WirePresenceMessage>,
residual: Option<HashMap<String, WirePresenceMessage>>,
sync_in_progress: bool,
#[allow(dead_code)]
sync_channel_serial: Option<String>,
}
impl PresenceMap {
pub fn new() -> Self {
Self {
members: HashMap::new(),
residual: None,
sync_in_progress: false,
sync_channel_serial: None,
}
}
pub fn start_sync(&mut self) {
self.sync_in_progress = true;
self.residual = Some(self.members.clone());
}
pub fn end_sync(&mut self) -> Vec<WirePresenceMessage> {
self.sync_in_progress = false;
let mut left = Vec::new();
if let Some(residual) = self.residual.take() {
for (key, mut msg) in residual {
if self.members.contains_key(&key) {
continue;
}
msg.action = PresenceAction::Leave;
left.push(msg);
}
}
self.members.retain(|_, m| m.action != PresenceAction::Absent);
left
}
pub fn put(&mut self, msg: WirePresenceMessage) -> bool {
let key = member_key(&msg.client_id, &msg.connection_id);
match msg.action {
PresenceAction::Leave | PresenceAction::Absent => {
if self.sync_in_progress {
if let Some(existing) = self.members.get(&key) {
if !is_newer(&msg, existing) {
return false;
}
}
if let Some(ref mut residual) = self.residual {
residual.remove(&key);
}
self.members.remove(&key);
} else {
if let Some(existing) = self.members.get(&key) {
if !is_newer(&msg, existing) {
return false;
}
}
self.members.remove(&key);
}
true
}
PresenceAction::Enter | PresenceAction::Update | PresenceAction::Present => {
if let Some(existing) = self.members.get(&key) {
if !is_newer(&msg, existing) {
return false;
}
}
if let Some(ref mut residual) = self.residual {
residual.remove(&key);
}
let mut stored = msg;
stored.action = PresenceAction::Present;
self.members.insert(key, stored);
true
}
}
}
pub fn get_members(&self) -> Vec<WirePresenceMessage> {
self.members.values().cloned().collect()
}
pub fn clear(&mut self) {
self.members.clear();
self.residual = None;
self.sync_in_progress = false;
}
}
fn member_key(client_id: &str, connection_id: &str) -> String {
format!("{}:{}", client_id, connection_id)
}
fn is_newer(incoming: &WirePresenceMessage, existing: &WirePresenceMessage) -> bool {
if let (Some(ref inc_id), Some(ref ex_id)) = (&incoming.id, &existing.id) {
if let (Some(inc_parsed), Some(ex_parsed)) = (parse_msg_id(inc_id), parse_msg_id(ex_id)) {
if inc_parsed.0 == ex_parsed.0 {
return (inc_parsed.1, inc_parsed.2) > (ex_parsed.1, ex_parsed.2);
}
}
}
match (incoming.timestamp, existing.timestamp) {
(Some(inc_ts), Some(ex_ts)) => inc_ts >= ex_ts,
_ => true, }
}
fn parse_msg_id(id: &str) -> Option<(&str, u64, u64)> {
let parts: Vec<&str> = id.splitn(3, ':').collect();
if parts.len() == 3 {
let serial = parts[1].parse().ok()?;
let index = parts[2].parse().ok()?;
Some((parts[0], serial, index))
} else {
None
}
}
pub struct RealtimePresence {
channel_name: String,
members: Arc<RwLock<PresenceMap>>,
my_members: Arc<RwLock<HashMap<String, WirePresenceMessage>>>,
event_subs: Arc<Mutex<Vec<mpsc::UnboundedSender<Arc<WirePresenceMessage>>>>>,
connection: Connection,
}
impl RealtimePresence {
pub(crate) fn new(
channel_name: String,
connection: Connection,
) -> Self {
Self {
channel_name,
members: Arc::new(RwLock::new(PresenceMap::new())),
my_members: Arc::new(RwLock::new(HashMap::new())),
event_subs: Arc::new(Mutex::new(Vec::new())),
connection,
}
}
pub async fn enter(&self, data: Option<Data>) -> Result<()> {
let client_id = self.require_client_id().await?;
self.send_presence(PresenceAction::Enter, client_id.clone(), data.clone())
.await?;
let msg = WirePresenceMessage::new(
PresenceAction::Present,
client_id.clone(),
data.unwrap_or(Data::None),
);
self.my_members.write().await.insert(client_id, msg);
Ok(())
}
pub async fn leave(&self, data: Option<Data>) -> Result<()> {
let client_id = self.require_client_id().await?;
self.send_presence(PresenceAction::Leave, client_id.clone(), data)
.await?;
self.my_members.write().await.remove(&client_id);
Ok(())
}
pub async fn update(&self, data: Option<Data>) -> Result<()> {
let client_id = self.require_client_id().await?;
self.send_presence(PresenceAction::Update, client_id.clone(), data.clone())
.await?;
let msg = WirePresenceMessage::new(
PresenceAction::Present,
client_id.clone(),
data.unwrap_or(Data::None),
);
self.my_members.write().await.insert(client_id, msg);
Ok(())
}
pub async fn get(&self) -> Vec<WirePresenceMessage> {
self.members.read().await.get_members()
}
pub fn subscribe(&self) -> mpsc::UnboundedReceiver<Arc<WirePresenceMessage>> {
let (tx, rx) = mpsc::unbounded_channel();
self.event_subs.lock().unwrap().push(tx);
rx
}
fn emit_event(&self, msg: Arc<WirePresenceMessage>) {
let mut subs = self.event_subs.lock().unwrap();
subs.retain(|tx| tx.send(msg.clone()).is_ok());
}
pub(crate) async fn on_attached(&self, has_presence: bool) {
if has_presence {
self.members.write().await.start_sync();
} else {
let existing = self.members.read().await.get_members();
for mut msg in existing {
msg.action = PresenceAction::Leave;
self.emit_event(Arc::new(msg));
}
self.members.write().await.clear();
}
let _ = self.reenter_own_members().await;
}
pub(crate) async fn process_presence(
&self,
messages: Vec<WirePresenceMessage>,
_is_sync: bool,
) {
let mut members = self.members.write().await;
for msg in messages {
if members.put(msg.clone()) {
self.emit_event(Arc::new(msg));
}
}
}
#[allow(dead_code)]
pub(crate) async fn start_sync(&self) {
self.members.write().await.start_sync();
}
pub(crate) async fn process_sync(
&self,
messages: Vec<WirePresenceMessage>,
channel_serial: Option<&str>,
) {
self.process_presence(messages, true).await;
let sync_complete = match channel_serial {
Some(serial) => {
match serial.split_once(':') {
Some((_, cursor)) => cursor.is_empty(),
None => true,
}
}
None => true,
};
if sync_complete {
let left_members = self.members.write().await.end_sync();
for msg in left_members {
self.emit_event(Arc::new(msg));
}
}
}
pub(crate) async fn reenter_own_members(&self) -> Result<()> {
let my = self.my_members.read().await.clone();
for (_, member) in my {
let result = self
.send_presence(
PresenceAction::Enter,
member.client_id.clone(),
if member.data.is_none() {
None
} else {
Some(member.data.clone())
},
)
.await;
if let Err(e) = result {
eprintln!(
"Failed to re-enter presence for {}: {}",
member.client_id, e
);
}
}
Ok(())
}
pub(crate) async fn clear(&self) {
self.members.write().await.clear();
}
async fn require_client_id(&self) -> Result<String> {
let shared = self.connection.shared.read().await;
if let Some(ref client_id) = shared.client_id {
Ok(client_id.clone())
} else {
Err(Error::new(
ErrorCode::UnableToEnterPresenceChannelNoClientID,
"No clientId configured; set client_id in ClientOptions for presence",
))
}
}
async fn send_presence(
&self,
action: PresenceAction,
client_id: String,
data: Option<Data>,
) -> Result<()> {
let wpm = WirePresenceMessage::new(
action,
client_id,
data.unwrap_or(Data::None),
);
let mut pm = ProtocolMessage::new(Action::Presence);
pm.channel = Some(self.channel_name.clone());
pm.presence = Some(vec![wpm]);
self.connection.send(pm, true).await
}
}