use std::{
collections::HashMap,
sync::{Arc, RwLock},
};
use maf_schemas::packet::RxPacket;
use serde::de::DeserializeOwned;
use uuid::Uuid;
use crate::{
app::AppState,
callable::{CallableFetch, CallableParam, SupportsAsync},
channel::BoundChannel,
platform::{self, Message, PlatformUser, SendError},
App, Channel,
};
/// Handle to a single user of the application.
///
/// Clones share the same `state`, `inner` connection and auth cache because
/// those fields are `Arc`s; `meta` is cloned by value.
#[derive(Debug, Clone)]
pub struct User {
/// Identity and raw auth payload, read from the raw user at construction.
meta: UserMeta,
/// Shared application state; handed to the channels built for this user.
state: Arc<AppState>,
/// Underlying platform user used to receive and send messages.
inner: Arc<platform::RawUser>,
/// Type-erased, single-slot cache of the last value produced by `User::auth`,
/// so repeated calls with the same type skip re-deserializing the payload.
auth_deserialized_cached: Arc<RwLock<Option<Arc<dyn std::any::Any + Send + Sync>>>>,
}
/// Identifying data attached to a user.
#[derive(Debug, Clone)]
pub struct UserMeta {
/// Identifier of the user.
pub id: Uuid,
/// Raw JSON auth payload; `None` when the user carries no auth data
/// (`User::auth` panics in that case).
pub auth: Option<serde_json::Value>,
}
/// Errors returned by `User::next_message`.
#[derive(Debug, thiserror::Error)]
pub enum UserNextMessageError {
/// The received message could not be deserialized into a packet.
#[error("failed to deserialize message")]
Deserialize(#[from] serde_json::Error),
/// The underlying platform user failed while waiting for the next message.
#[error("failed to listen for message")]
Listen(#[from] platform::ListenError),
}
impl User {
pub(crate) fn new(state: Arc<AppState>, raw: platform::RawUser) -> Self {
Self {
meta: raw.meta(),
state,
inner: Arc::new(raw),
auth_deserialized_cached: Arc::new(RwLock::new(None)),
}
}
pub fn meta(&self) -> &UserMeta {
&self.meta
}
pub fn auth<T: DeserializeOwned + Send + Sync + 'static>(&self) -> Arc<T> {
if let Some(Ok(cached)) = self
.auth_deserialized_cached
.read()
.expect("poisoned lock")
.as_ref()
.map(|arc_any| arc_any.clone().downcast::<T>())
{
return cached;
}
let auth = self.meta.auth.as_ref().expect("user is not authenticated");
let auth_arc: Arc<T> = Arc::new(
serde_json::from_value::<T>(auth.clone())
.expect("failed to deserialize user auth data"),
);
self.auth_deserialized_cached
.write()
.expect("poisoned lock")
.replace(auth_arc.clone());
auth_arc
}
pub(crate) async fn next_message(&self) -> Result<UserMessage<'_>, UserNextMessageError> {
let message = self.inner.next_message().await?;
Ok(UserMessage {
user: self,
packet: match message {
platform::Message::Text(text) => {
serde_json::from_str(&text).map_err(UserNextMessageError::Deserialize)?
}
platform::Message::Binary(data) => todo!("handle binary messages: {data:?}"),
},
})
}
pub(crate) fn send(&self, data: impl serde::Serialize) -> Result<(), SendError> {
let text = serde_json::to_string(&data)?;
self.inner.send(Message::Text(text))?;
Ok(())
}
pub fn channel<T>(&self, name: impl ToString) -> BoundChannel<T> {
let name = name.to_string();
BoundChannel::new(Channel::new(self.state.clone(), name), &self)
}
}
impl UserMeta {
    /// Returns a copy of this user's identifier.
    pub fn id(&self) -> Uuid {
        let Self { id, .. } = self;
        *id
    }
}
/// A packet received from a user, paired with a borrow of that user.
#[derive(Debug)]
pub struct UserMessage<'a> {
/// The user the packet was received from.
pub(crate) user: &'a User,
/// The decoded incoming packet.
pub(crate) packet: RxPacket,
}
/// Read-only accessor for the users stored in the application state.
pub struct Users {
/// App handle whose `inner.state.users` map backs every query.
app: App,
}
impl Users {
pub(crate) fn new(app: App) -> Self {
Self { app }
}
pub async fn all(&self) -> HashMap<Uuid, User> {
let users = self.app.inner.state.users.read().await;
users.clone()
}
pub async fn get(&self, id: &Uuid) -> Option<User> {
let users = self.app.inner.state.users.read().await;
users.get(id).cloned()
}
pub async fn count(&self) -> usize {
let users = self.app.inner.state.users.read().await;
users.len()
}
}
impl<Ctx, Init> CallableParam<Ctx, Init> for Users
where
    Ctx: CallableFetch<App>,
    Init: Send + Sync,
{
    /// Extraction cannot fail: it only fetches the app handle from the context.
    type Error = std::convert::Infallible;

    /// Builds a [`Users`] accessor from the `App` fetched out of `ctx`.
    ///
    /// The `_init` value is not used.
    async fn extract(ctx: &mut Ctx, _init: &Init) -> Result<Self, Self::Error> {
        Ok(Self::new(ctx.fetch()))
    }
}
// Marker impl opting `Users` into `SupportsAsync` (from `crate::callable`).
// NOTE(review): presumably required because `Users::extract` is async — confirm
// against the trait definition.
impl SupportsAsync for Users {}