palpo_data/user/
data.rs

1use diesel::prelude::*;
2use serde::de::DeserializeOwned;
3
4use crate::core::events::{AnyRawAccountDataEvent, RoomAccountDataEventType};
5use crate::core::identifiers::*;
6use crate::core::UnixMillis;
7use crate::core::serde::{JsonValue, json, RawJson};
8use crate::schema::*;
9use crate::{connect, DataResult};
10
11#[derive(Identifiable, Queryable, Debug, Clone)]
12#[diesel(table_name = user_datas)]
13pub struct DbUserData {
14    pub id: i64,
15    pub user_id: OwnedUserId,
16    pub room_id: Option<OwnedRoomId>,
17    pub data_type: String,
18    pub json_data: JsonValue,
19    pub occur_sn: i64,
20    pub created_at: UnixMillis,
21}
22#[derive(Insertable, AsChangeset, Debug, Clone)]
23#[diesel(table_name = user_datas)]
24pub struct NewDbUserData {
25    pub user_id: OwnedUserId,
26    pub room_id: Option<OwnedRoomId>,
27    pub data_type: String,
28    pub json_data: JsonValue,
29    pub occur_sn: Option<i64>,
30    pub created_at: UnixMillis,
31}
32
33/// Places one event in the account data of the user and removes the previous entry.
34#[tracing::instrument(skip(room_id, user_id, event_type, json_data))]
35pub fn set_data(
36    user_id: &UserId,
37    room_id: Option<OwnedRoomId>,
38    event_type: &str,
39    json_data: JsonValue,
40) -> DataResult<DbUserData> {
41    if let Some(room_id) = &room_id {
42        let user_data = user_datas::table
43            .filter(user_datas::user_id.eq(user_id))
44            .filter(user_datas::room_id.eq(room_id))
45            .filter(user_datas::data_type.eq(event_type))
46            .first::<DbUserData>(&mut *connect()?)
47            .optional()?;
48        if let Some(user_data) = user_data {
49            if user_data.json_data == json_data {
50                return Ok(user_data);
51            }
52        }
53    } else {
54        let user_data = user_datas::table
55            .filter(user_datas::user_id.eq(user_id))
56            .filter(user_datas::room_id.is_null())
57            .filter(user_datas::data_type.eq(event_type))
58            .first::<DbUserData>(&mut *connect()?)
59            .optional()?;
60        if let Some(user_data) = user_data {
61            if user_data.json_data == json_data {
62                return Ok(user_data);
63            }
64        }
65    }
66
67    let new_data = NewDbUserData {
68        user_id: user_id.to_owned(),
69        room_id: room_id.clone(),
70        data_type: event_type.to_owned(),
71        json_data,
72        occur_sn: None,
73        created_at: UnixMillis::now(),
74    };
75    diesel::insert_into(user_datas::table)
76        .values(&new_data)
77        .on_conflict((user_datas::user_id, user_datas::room_id, user_datas::data_type))
78        .do_update()
79        .set(&new_data)
80        .get_result::<DbUserData>(&mut *connect()?)
81        .map_err(Into::into)
82}
83
84#[tracing::instrument]
85pub fn get_data<E: DeserializeOwned>(user_id: &UserId, room_id: Option<&RoomId>, kind: &str) -> DataResult<Option<E>> {
86    let row = user_datas::table
87        .filter(user_datas::user_id.eq(user_id))
88        .filter(user_datas::room_id.eq(room_id).or(user_datas::room_id.is_null()))
89        .filter(user_datas::data_type.eq(kind))
90        .order_by(user_datas::id.desc())
91        .first::<DbUserData>(&mut *connect()?)
92        .optional()?;
93    if let Some(row) = row {
94        Ok(Some(serde_json::from_value(row.json_data)?))
95    } else {
96        Ok(None)
97    }
98}
99
100/// Searches the account data for a specific kind.
101#[tracing::instrument]
102pub fn get_room_data<E: DeserializeOwned>(user_id: &UserId, room_id: &RoomId, kind: &str) -> DataResult<Option<E>> {
103    let row = user_datas::table
104        .filter(user_datas::user_id.eq(user_id))
105        .filter(user_datas::room_id.eq(room_id))
106        .filter(user_datas::data_type.eq(kind))
107        .order_by(user_datas::id.desc())
108        .first::<DbUserData>(&mut *connect()?)
109        .optional()?;
110    if let Some(row) = row {
111        Ok(Some(serde_json::from_value(row.json_data)?))
112    } else {
113        Ok(None)
114    }
115}
116
117#[tracing::instrument]
118pub fn get_global_data<E: DeserializeOwned>(user_id: &UserId, kind: &str) -> DataResult<Option<E>> {
119    let row = user_datas::table
120        .filter(user_datas::user_id.eq(user_id))
121        .filter(user_datas::room_id.is_null())
122        .filter(user_datas::data_type.eq(kind))
123        .order_by(user_datas::id.desc())
124        .first::<DbUserData>(&mut *connect()?)
125        .optional()?;
126    if let Some(row) = row {
127        Ok(Some(serde_json::from_value(row.json_data)?))
128    } else {
129        Ok(None)
130    }
131}
132
133/// Returns all changes to the account data that happened after `since`.
134#[tracing::instrument(skip(room_id, user_id, since_sn))]
135pub fn data_changes(
136    room_id: Option<&RoomId>,
137    user_id: &UserId,
138    since_sn: i64,
139    until_sn: Option<i64>,
140) -> DataResult<Vec<AnyRawAccountDataEvent>> {
141    let mut user_datas = Vec::new();
142
143    let query = user_datas::table
144        .filter(user_datas::user_id.eq(user_id))
145        .filter(user_datas::room_id.eq(room_id).or(user_datas::room_id.is_null()))
146        .filter(user_datas::occur_sn.ge(since_sn))
147        .into_boxed();
148    let db_datas = if let Some(until_sn) = until_sn {
149        query
150            .filter(user_datas::occur_sn.le(until_sn))
151            .order_by(user_datas::occur_sn.asc())
152            .load::<DbUserData>(&mut *connect()?)?
153    } else {
154        query
155            .order_by(user_datas::occur_sn.asc())
156            .load::<DbUserData>(&mut *connect()?)?
157    };
158
159    for db_data in db_datas {
160        let kind = RoomAccountDataEventType::from(&*db_data.data_type);
161        let account_data = json!({
162            "type": kind,
163            "content": db_data.json_data
164        });
165        if db_data.room_id.is_none() {
166            user_datas.push(AnyRawAccountDataEvent::Global(RawJson::from_value(&account_data)?));
167        } else {
168            user_datas.push(AnyRawAccountDataEvent::Room(RawJson::from_value(&account_data)?));
169        }
170    }
171
172    Ok(user_datas)
173}