use diesel::prelude::*;
use palpo_core::MatrixError;
use crate::core::events::AnyToDeviceEvent;
use crate::core::identifiers::*;
use crate::core::{client::device::Device, JsonValue, RawJson, Seqnum, UnixMillis};
use crate::schema::*;
use crate::user::NewDbAccessToken;
use crate::{connect, diesel_exists, DataError, DataResult};
#[derive(Identifiable, Queryable, Debug, Clone)]
#[diesel(table_name = user_devices)]
pub struct DbUserDevice {
pub id: i64,
pub user_id: OwnedUserId,
pub device_id: OwnedDeviceId,
pub display_name: Option<String>,
pub user_agent: Option<String>,
pub is_hidden: bool,
pub last_seen_ip: Option<String>,
pub last_seen_at: Option<UnixMillis>,
pub created_at: UnixMillis,
}
#[derive(Insertable, Debug, Clone)]
#[diesel(table_name = user_devices)]
pub struct NewDbUserDevice {
pub user_id: OwnedUserId,
pub device_id: OwnedDeviceId,
pub display_name: Option<String>,
pub user_agent: Option<String>,
pub is_hidden: bool,
pub last_seen_ip: Option<String>,
pub last_seen_at: Option<UnixMillis>,
pub created_at: UnixMillis,
}
impl DbUserDevice {
pub fn into_matrix_device(self) -> Device {
let Self {
device_id,
display_name,
last_seen_at,
last_seen_ip,
..
} = self;
Device {
device_id,
display_name,
last_seen_ip,
last_seen_ts: last_seen_at,
}
}
}
#[derive(Identifiable, Queryable, Debug, Clone)]
#[diesel(table_name = device_inboxes)]
pub struct DbDeviceInbox {
pub id: i64,
pub user_id: OwnedUserId,
pub device_id: OwnedDeviceId,
pub json_data: JsonValue,
pub occur_sn: i64,
pub created_at: i64,
}
#[derive(Insertable, Debug, Clone)]
#[diesel(table_name = device_inboxes)]
pub struct NewDbDeviceInbox {
pub user_id: OwnedUserId,
pub device_id: OwnedDeviceId,
pub json_data: JsonValue,
pub created_at: i64,
}
pub fn create_device(
user_id: &UserId,
device_id: &DeviceId,
token: &str,
initial_device_display_name: Option<String>,
) -> DataResult<DbUserDevice> {
let device = diesel::insert_into(user_devices::table)
.values(NewDbUserDevice {
user_id: user_id.to_owned(),
device_id: device_id.to_owned(),
display_name: initial_device_display_name,
user_agent: None,
is_hidden: false,
last_seen_ip: None, last_seen_at: Some(UnixMillis::now()),
created_at: UnixMillis::now(),
})
.get_result(&mut connect()?)?;
diesel::insert_into(user_access_tokens::table)
.values(NewDbAccessToken::new(
user_id.to_owned(),
device_id.to_owned(),
token.to_owned(),
))
.execute(&mut connect()?)?;
Ok(device)
}
pub fn get_device(user_id: &UserId, device_id: &DeviceId) -> DataResult<DbUserDevice> {
user_devices::table
.filter(user_devices::user_id.eq(user_id))
.filter(user_devices::device_id.eq(device_id))
.first::<DbUserDevice>(&mut connect()?)
.map_err(Into::into)
}
pub fn all_device_ids(user_id: &UserId) -> DataResult<Vec<OwnedDeviceId>> {
user_devices::table
.filter(user_devices::user_id.eq(user_id))
.select(user_devices::device_id)
.load::<OwnedDeviceId>(&mut connect()?)
.map_err(Into::into)
}
pub fn is_device_exists(user_id: &UserId, device_id: &DeviceId) -> DataResult<bool> {
let query = user_devices::table
.filter(user_devices::user_id.eq(user_id))
.filter(user_devices::device_id.eq(device_id));
diesel_exists!(query, &mut connect()?).map_err(Into::into)
}
pub fn remove_device(user_id: &UserId, device_id: &OwnedDeviceId) -> DataResult<()> {
let count = diesel::delete(
user_devices::table
.filter(user_devices::user_id.eq(user_id))
.filter(user_devices::device_id.eq(device_id)),
)
.execute(&mut connect()?)?;
if count == 0 {
if diesel_exists!(
user_devices::table.filter(user_devices::device_id.eq(device_id)),
&mut connect()?
)? {
return Err(MatrixError::forbidden("Device not owned by user.").into());
} else {
return Err(MatrixError::not_found("Device not found.").into());
}
}
diesel::delete(
user_access_tokens::table
.filter(user_access_tokens::user_id.eq(user_id))
.filter(user_access_tokens::device_id.eq(device_id)),
)
.execute(&mut connect()?)?;
diesel::delete(
user_refresh_tokens::table
.filter(user_refresh_tokens::user_id.eq(user_id))
.filter(user_refresh_tokens::device_id.eq(device_id)),
)
.execute(&mut connect()?)?;
diesel::delete(
pushers::table
.filter(pushers::user_id.eq(user_id))
.filter(pushers::device_id.eq(device_id)),
)
.execute(&mut connect()?)?;
Ok(())
}
pub fn remove_all_devices(user_id: &UserId) -> DataResult<()> {
diesel::delete(user_devices::table.filter(user_devices::user_id.eq(user_id))).execute(&mut connect()?)?;
diesel::delete(user_access_tokens::table.filter(user_access_tokens::user_id.eq(user_id)))
.execute(&mut connect()?)?;
diesel::delete(user_refresh_tokens::table.filter(user_refresh_tokens::user_id.eq(user_id)))
.execute(&mut connect()?)?;
Ok(())
}
pub fn delete_dehydrated_devices(user_id: &UserId) -> DataResult<()> {
diesel::delete(user_dehydrated_devices::table.filter(user_dehydrated_devices::user_id.eq(user_id)))
.execute(&mut connect()?)?;
Ok(())
}
pub fn set_token(user_id: &UserId, device_id: &DeviceId, token: &str) -> DataResult<()> {
diesel::insert_into(user_access_tokens::table)
.values(NewDbAccessToken::new(
user_id.to_owned(),
device_id.to_owned(),
token.to_owned(),
))
.on_conflict((user_access_tokens::user_id, user_access_tokens::device_id))
.do_update()
.set(user_access_tokens::token.eq(token))
.execute(&mut connect()?)?;
Ok(())
}
pub fn get_to_device_events(
user_id: &UserId,
device_id: &DeviceId,
since_sn: Option<Seqnum>,
until_sn: Option<Seqnum>,
) -> DataResult<Vec<RawJson<AnyToDeviceEvent>>> {
println!("==================get_to_device_events");
device_inboxes::table
.filter(device_inboxes::user_id.eq(user_id))
.filter(device_inboxes::device_id.eq(device_id))
.load::<DbDeviceInbox>(&mut connect()?)?
.into_iter()
.map(|event| {
serde_json::from_value(event.json_data.clone())
.map_err(|_| DataError::public("Invalid JSON in device inbox"))
})
.collect::<DataResult<Vec<_>>>()
}
pub fn add_to_device_event(
sender: &UserId,
target_user_id: &UserId,
target_device_id: &DeviceId,
event_type: &str,
content: serde_json::Value,
) -> DataResult<()> {
println!("===============add_to_device_event");
let mut json = serde_json::Map::new();
json.insert("type".to_owned(), event_type.to_owned().into());
json.insert("sender".to_owned(), sender.to_string().into());
json.insert("content".to_owned(), content);
let json_data = serde_json::to_value(&json)?;
diesel::insert_into(device_inboxes::table)
.values(NewDbDeviceInbox {
user_id: target_user_id.to_owned(),
device_id: target_device_id.to_owned(),
json_data,
created_at: UnixMillis::now().get() as i64,
})
.execute(&mut connect()?)?;
Ok(())
}
pub fn remove_to_device_events(user_id: &UserId, device_id: &DeviceId, until_sn: Seqnum) -> DataResult<()> {
diesel::delete(
device_inboxes::table
.filter(device_inboxes::user_id.eq(user_id))
.filter(device_inboxes::device_id.eq(device_id))
.filter(device_inboxes::occur_sn.le(until_sn)),
)
.execute(&mut connect()?)?;
Ok(())
}