use std::collections::HashMap;
use diesel::prelude::*;
use serde::{Deserialize, Serialize};
use crate::core::events::presence::{PresenceEvent, PresenceEventContent};
use crate::core::identifiers::*;
use crate::core::presence::PresenceState;
use crate::core::{MatrixError, UnixMillis};
use crate::schema::*;
use crate::{connect, DataError, DataResult};
#[derive(Identifiable, Queryable, Debug, Clone)]
#[diesel(table_name = user_presences)]
pub struct DbPresence {
pub id: i64,
pub user_id: OwnedUserId,
pub stream_id: Option<i64>,
pub state: Option<String>,
pub status_msg: Option<String>,
pub last_active_at: Option<UnixMillis>,
pub last_federation_update_at: Option<UnixMillis>,
pub last_user_sync_at: Option<UnixMillis>,
pub currently_active: Option<bool>,
pub occur_sn: i64,
}
#[derive(Insertable, AsChangeset, Debug, Clone)]
#[diesel(table_name = user_presences)]
pub struct NewDbPresence {
pub user_id: OwnedUserId,
pub stream_id: Option<i64>,
pub state: Option<String>,
pub status_msg: Option<String>,
pub last_active_at: Option<UnixMillis>,
pub last_federation_update_at: Option<UnixMillis>,
pub last_user_sync_at: Option<UnixMillis>,
pub currently_active: Option<bool>,
pub occur_sn: Option<i64>,
}
impl DbPresence {
pub fn to_presence_event(&self, user_id: &UserId) -> DataResult<PresenceEvent> {
let now = UnixMillis::now();
let state = self.state.as_deref().map(PresenceState::from).unwrap_or_default();
let last_active_ago = if state == PresenceState::Online {
None
} else {
self.last_active_at
.map(|last_active_at| now.0.saturating_sub(last_active_at.0))
};
let profile = crate::user::get_profile(user_id, None)?;
Ok(PresenceEvent {
sender: user_id.to_owned(),
content: PresenceEventContent {
presence: state,
status_msg: self.status_msg.clone(),
currently_active: self.currently_active,
last_active_ago,
display_name: profile.as_ref().and_then(|p| p.display_name.clone()),
avatar_url: profile.as_ref().and_then(|p| p.avatar_url.clone()),
},
})
}
}
pub fn ping_presence(user_id: &UserId, new_state: &PresenceState) -> DataResult<()> {
const REFRESH_TIMEOUT: u64 = 60 * 1000;
let last_presence = last_presence(user_id);
let state_changed = match last_presence {
Err(_) => true,
Ok(ref presence) => presence.content.presence != *new_state,
};
let last_last_active_ago = match last_presence {
Err(_) => 0_u64,
Ok(ref presence) => presence.content.last_active_ago.unwrap_or_default().into(),
};
if !state_changed && last_last_active_ago < REFRESH_TIMEOUT {
return Ok(());
}
let status_msg = match last_presence {
Ok(presence) => presence.content.status_msg.clone(),
Err(_) => Some(String::new()),
};
let currently_active = *new_state == PresenceState::Online;
set_presence(
NewDbPresence {
user_id: user_id.to_owned(),
stream_id: None,
state: Some(new_state.to_string()),
status_msg: None,
last_active_at: Some(UnixMillis::now()),
last_federation_update_at: None,
last_user_sync_at: None,
currently_active: Some(currently_active),
occur_sn: None,
},
false,
)
}
pub fn last_presence(user_id: &UserId) -> DataResult<PresenceEvent> {
let presence = user_presences::table
.filter(user_presences::user_id.eq(user_id))
.first::<DbPresence>(&mut *connect()?)
.optional()?;
if let Some(data) = presence {
Ok(data.to_presence_event(user_id)?)
} else {
Err(MatrixError::not_found("No presence data found for user").into())
}
}
pub fn set_presence(mut presence: NewDbPresence, force: bool) -> DataResult<()> {
if force {
diesel::delete(user_presences::table.filter(user_presences::user_id.eq(&presence.user_id)))
.execute(&mut connect()?)?;
diesel::insert_into(user_presences::table)
.values(&presence)
.on_conflict(user_presences::user_id)
.do_update()
.set(&presence)
.execute(&mut connect()?)?;
} else {
let old_state = user_presences::table
.filter(user_presences::user_id.eq(&presence.user_id))
.select(user_presences::state)
.first::<Option<String>>(&mut connect()?)
.optional()?
.flatten();
if old_state != presence.state && presence.state.is_some() {
diesel::delete(user_presences::table.filter(user_presences::user_id.eq(&presence.user_id)))
.execute(&mut connect()?)?;
diesel::insert_into(user_presences::table)
.values(&presence)
.on_conflict(user_presences::user_id)
.do_update()
.set(&presence)
.execute(&mut connect()?)?;
} else {
diesel::update(user_presences::table.filter(user_presences::user_id.eq(&presence.user_id)))
.set(&presence)
.execute(&mut connect()?)?;
}
}
Ok(())
}
pub fn remove_presence(user_id: &UserId) -> DataResult<()> {
diesel::delete(user_presences::table.filter(user_presences::user_id.eq(user_id))).execute(&mut connect()?)?;
Ok(())
}
pub fn presences_since(since_sn: i64) -> DataResult<HashMap<OwnedUserId, PresenceEvent>> {
let presences = user_presences::table
.filter(user_presences::occur_sn.ge(since_sn))
.load::<DbPresence>(&mut *connect()?)?;
presences
.into_iter()
.map(|presence| {
presence
.to_presence_event(&presence.user_id)
.map(|event| (presence.user_id, event))
})
.collect()
}
#[inline]
pub fn from_json_bytes_to_event(bytes: &[u8], user_id: &UserId) -> DataResult<PresenceEvent> {
let presence = Presence::from_json_bytes(bytes)?;
let event = presence.to_presence_event(user_id);
Ok(event)
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub(super) struct Presence {
state: PresenceState,
currently_active: bool,
last_active_ts: u64,
status_msg: Option<String>,
}
impl Presence {
#[must_use]
pub(super) fn new(
state: PresenceState,
currently_active: bool,
last_active_ts: u64,
status_msg: Option<String>,
) -> Self {
Self {
state,
currently_active,
last_active_ts,
status_msg,
}
}
pub(super) fn from_json_bytes(bytes: &[u8]) -> DataResult<Self> {
serde_json::from_slice(bytes).map_err(|_| DataError::public("Invalid presence data in database"))
}
pub(super) fn to_presence_event(&self, user_id: &UserId) -> PresenceEvent {
let now = UnixMillis::now();
let last_active_ago = Some(now.0.saturating_sub(self.last_active_ts));
PresenceEvent {
sender: user_id.to_owned(),
content: PresenceEventContent {
presence: self.state.clone(),
status_msg: self.status_msg.clone(),
currently_active: Some(self.currently_active),
last_active_ago,
display_name: crate::user::display_name(user_id).ok().flatten(),
avatar_url: crate::user::avatar_url(user_id).ok().flatten(),
},
}
}
}