1use diesel::prelude::*;
2use serde::de::DeserializeOwned;
3
4use crate::core::events::{AnyRawAccountDataEvent, RoomAccountDataEventType};
5use crate::core::identifiers::*;
6use crate::core::UnixMillis;
7use crate::core::serde::{JsonValue, json, RawJson};
8use crate::schema::*;
9use crate::{connect, DataResult};
10
11#[derive(Identifiable, Queryable, Debug, Clone)]
12#[diesel(table_name = user_datas)]
13pub struct DbUserData {
14 pub id: i64,
15 pub user_id: OwnedUserId,
16 pub room_id: Option<OwnedRoomId>,
17 pub data_type: String,
18 pub json_data: JsonValue,
19 pub occur_sn: i64,
20 pub created_at: UnixMillis,
21}
22#[derive(Insertable, AsChangeset, Debug, Clone)]
23#[diesel(table_name = user_datas)]
24pub struct NewDbUserData {
25 pub user_id: OwnedUserId,
26 pub room_id: Option<OwnedRoomId>,
27 pub data_type: String,
28 pub json_data: JsonValue,
29 pub occur_sn: Option<i64>,
30 pub created_at: UnixMillis,
31}
32
33#[tracing::instrument(skip(room_id, user_id, event_type, json_data))]
35pub fn set_data(
36 user_id: &UserId,
37 room_id: Option<OwnedRoomId>,
38 event_type: &str,
39 json_data: JsonValue,
40) -> DataResult<DbUserData> {
41 if let Some(room_id) = &room_id {
42 let user_data = user_datas::table
43 .filter(user_datas::user_id.eq(user_id))
44 .filter(user_datas::room_id.eq(room_id))
45 .filter(user_datas::data_type.eq(event_type))
46 .first::<DbUserData>(&mut *connect()?)
47 .optional()?;
48 if let Some(user_data) = user_data {
49 if user_data.json_data == json_data {
50 return Ok(user_data);
51 }
52 }
53 } else {
54 let user_data = user_datas::table
55 .filter(user_datas::user_id.eq(user_id))
56 .filter(user_datas::room_id.is_null())
57 .filter(user_datas::data_type.eq(event_type))
58 .first::<DbUserData>(&mut *connect()?)
59 .optional()?;
60 if let Some(user_data) = user_data {
61 if user_data.json_data == json_data {
62 return Ok(user_data);
63 }
64 }
65 }
66
67 let new_data = NewDbUserData {
68 user_id: user_id.to_owned(),
69 room_id: room_id.clone(),
70 data_type: event_type.to_owned(),
71 json_data,
72 occur_sn: None,
73 created_at: UnixMillis::now(),
74 };
75 diesel::insert_into(user_datas::table)
76 .values(&new_data)
77 .on_conflict((user_datas::user_id, user_datas::room_id, user_datas::data_type))
78 .do_update()
79 .set(&new_data)
80 .get_result::<DbUserData>(&mut *connect()?)
81 .map_err(Into::into)
82}
83
84#[tracing::instrument]
85pub fn get_data<E: DeserializeOwned>(user_id: &UserId, room_id: Option<&RoomId>, kind: &str) -> DataResult<Option<E>> {
86 let row = user_datas::table
87 .filter(user_datas::user_id.eq(user_id))
88 .filter(user_datas::room_id.eq(room_id).or(user_datas::room_id.is_null()))
89 .filter(user_datas::data_type.eq(kind))
90 .order_by(user_datas::id.desc())
91 .first::<DbUserData>(&mut *connect()?)
92 .optional()?;
93 if let Some(row) = row {
94 Ok(Some(serde_json::from_value(row.json_data)?))
95 } else {
96 Ok(None)
97 }
98}
99
100#[tracing::instrument]
102pub fn get_room_data<E: DeserializeOwned>(user_id: &UserId, room_id: &RoomId, kind: &str) -> DataResult<Option<E>> {
103 let row = user_datas::table
104 .filter(user_datas::user_id.eq(user_id))
105 .filter(user_datas::room_id.eq(room_id))
106 .filter(user_datas::data_type.eq(kind))
107 .order_by(user_datas::id.desc())
108 .first::<DbUserData>(&mut *connect()?)
109 .optional()?;
110 if let Some(row) = row {
111 Ok(Some(serde_json::from_value(row.json_data)?))
112 } else {
113 Ok(None)
114 }
115}
116
117#[tracing::instrument]
118pub fn get_global_data<E: DeserializeOwned>(user_id: &UserId, kind: &str) -> DataResult<Option<E>> {
119 let row = user_datas::table
120 .filter(user_datas::user_id.eq(user_id))
121 .filter(user_datas::room_id.is_null())
122 .filter(user_datas::data_type.eq(kind))
123 .order_by(user_datas::id.desc())
124 .first::<DbUserData>(&mut *connect()?)
125 .optional()?;
126 if let Some(row) = row {
127 Ok(Some(serde_json::from_value(row.json_data)?))
128 } else {
129 Ok(None)
130 }
131}
132
133#[tracing::instrument(skip(room_id, user_id, since_sn))]
135pub fn data_changes(
136 room_id: Option<&RoomId>,
137 user_id: &UserId,
138 since_sn: i64,
139 until_sn: Option<i64>,
140) -> DataResult<Vec<AnyRawAccountDataEvent>> {
141 let mut user_datas = Vec::new();
142
143 let query = user_datas::table
144 .filter(user_datas::user_id.eq(user_id))
145 .filter(user_datas::room_id.eq(room_id).or(user_datas::room_id.is_null()))
146 .filter(user_datas::occur_sn.ge(since_sn))
147 .into_boxed();
148 let db_datas = if let Some(until_sn) = until_sn {
149 query
150 .filter(user_datas::occur_sn.le(until_sn))
151 .order_by(user_datas::occur_sn.asc())
152 .load::<DbUserData>(&mut *connect()?)?
153 } else {
154 query
155 .order_by(user_datas::occur_sn.asc())
156 .load::<DbUserData>(&mut *connect()?)?
157 };
158
159 for db_data in db_datas {
160 let kind = RoomAccountDataEventType::from(&*db_data.data_type);
161 let account_data = json!({
162 "type": kind,
163 "content": db_data.json_data
164 });
165 if db_data.room_id.is_none() {
166 user_datas.push(AnyRawAccountDataEvent::Global(RawJson::from_value(&account_data)?));
167 } else {
168 user_datas.push(AnyRawAccountDataEvent::Room(RawJson::from_value(&account_data)?));
169 }
170 }
171
172 Ok(user_datas)
173}