1use diesel::prelude::*;
2use palpo_core::MatrixError;
3
4use crate::core::events::AnyToDeviceEvent;
5use crate::core::identifiers::*;
6use crate::core::{client::device::Device, JsonValue, RawJson, Seqnum, UnixMillis};
7use crate::schema::*;
8use crate::user::NewDbAccessToken;
9use crate::{connect, diesel_exists, DataError, DataResult};
10
11#[derive(Identifiable, Queryable, Debug, Clone)]
12#[diesel(table_name = user_devices)]
13pub struct DbUserDevice {
14 pub id: i64,
15
16 pub user_id: OwnedUserId,
17
18 pub device_id: OwnedDeviceId,
19
20 pub display_name: Option<String>,
22
23 pub user_agent: Option<String>,
24
25 pub is_hidden: bool,
26 pub last_seen_ip: Option<String>,
28
29 pub last_seen_at: Option<UnixMillis>,
31 pub created_at: UnixMillis,
32}
33#[derive(Insertable, Debug, Clone)]
34#[diesel(table_name = user_devices)]
35pub struct NewDbUserDevice {
36 pub user_id: OwnedUserId,
37
38 pub device_id: OwnedDeviceId,
39
40 pub display_name: Option<String>,
42
43 pub user_agent: Option<String>,
44
45 pub is_hidden: bool,
46 pub last_seen_ip: Option<String>,
48
49 pub last_seen_at: Option<UnixMillis>,
51 pub created_at: UnixMillis,
52}
53
54impl DbUserDevice {
55 pub fn into_matrix_device(self) -> Device {
56 let Self {
57 device_id,
58 display_name,
59 last_seen_at,
60 last_seen_ip,
61 ..
62 } = self;
63 Device {
64 device_id,
65 display_name,
66 last_seen_ip,
67 last_seen_ts: last_seen_at,
68 }
69 }
70}
71
72#[derive(Identifiable, Queryable, Debug, Clone)]
73#[diesel(table_name = device_inboxes)]
74pub struct DbDeviceInbox {
75 pub id: i64,
76
77 pub user_id: OwnedUserId,
78 pub device_id: OwnedDeviceId,
79 pub json_data: JsonValue,
80 pub occur_sn: i64,
81 pub created_at: i64,
82}
83#[derive(Insertable, Debug, Clone)]
84#[diesel(table_name = device_inboxes)]
85pub struct NewDbDeviceInbox {
86 pub user_id: OwnedUserId,
87 pub device_id: OwnedDeviceId,
88 pub json_data: JsonValue,
89 pub created_at: i64,
90}
91
92pub fn create_device(
93 user_id: &UserId,
94 device_id: &DeviceId,
95 token: &str,
96 initial_device_display_name: Option<String>,
97) -> DataResult<DbUserDevice> {
98 let device = diesel::insert_into(user_devices::table)
99 .values(NewDbUserDevice {
100 user_id: user_id.to_owned(),
101 device_id: device_id.to_owned(),
102 display_name: initial_device_display_name,
103 user_agent: None,
104 is_hidden: false,
105 last_seen_ip: None, last_seen_at: Some(UnixMillis::now()),
107 created_at: UnixMillis::now(),
108 })
109 .get_result(&mut connect()?)?;
110
111 diesel::insert_into(user_access_tokens::table)
112 .values(NewDbAccessToken::new(
113 user_id.to_owned(),
114 device_id.to_owned(),
115 token.to_owned(),
116 ))
117 .execute(&mut connect()?)?;
118 Ok(device)
119}
120
121pub fn get_device(user_id: &UserId, device_id: &DeviceId) -> DataResult<DbUserDevice> {
122 user_devices::table
123 .filter(user_devices::user_id.eq(user_id))
124 .filter(user_devices::device_id.eq(device_id))
125 .first::<DbUserDevice>(&mut connect()?)
126 .map_err(Into::into)
127}
128
129pub fn all_device_ids(user_id: &UserId) -> DataResult<Vec<OwnedDeviceId>> {
130 user_devices::table
131 .filter(user_devices::user_id.eq(user_id))
132 .select(user_devices::device_id)
133 .load::<OwnedDeviceId>(&mut connect()?)
134 .map_err(Into::into)
135}
136
137pub fn is_device_exists(user_id: &UserId, device_id: &DeviceId) -> DataResult<bool> {
138 let query = user_devices::table
139 .filter(user_devices::user_id.eq(user_id))
140 .filter(user_devices::device_id.eq(device_id));
141 diesel_exists!(query, &mut connect()?).map_err(Into::into)
142}
143
144pub fn remove_device(user_id: &UserId, device_id: &OwnedDeviceId) -> DataResult<()> {
145 let count = diesel::delete(
146 user_devices::table
147 .filter(user_devices::user_id.eq(user_id))
148 .filter(user_devices::device_id.eq(device_id)),
149 )
150 .execute(&mut connect()?)?;
151 if count == 0 {
152 if diesel_exists!(
153 user_devices::table.filter(user_devices::device_id.eq(device_id)),
154 &mut connect()?
155 )? {
156 return Err(MatrixError::forbidden("Device not owned by user.").into());
157 } else {
158 return Err(MatrixError::not_found("Device not found.").into());
159 }
160 }
161 diesel::delete(
162 user_access_tokens::table
163 .filter(user_access_tokens::user_id.eq(user_id))
164 .filter(user_access_tokens::device_id.eq(device_id)),
165 )
166 .execute(&mut connect()?)?;
167 diesel::delete(
168 user_refresh_tokens::table
169 .filter(user_refresh_tokens::user_id.eq(user_id))
170 .filter(user_refresh_tokens::device_id.eq(device_id)),
171 )
172 .execute(&mut connect()?)?;
173 diesel::delete(
174 pushers::table
175 .filter(pushers::user_id.eq(user_id))
176 .filter(pushers::device_id.eq(device_id)),
177 )
178 .execute(&mut connect()?)?;
179 Ok(())
180}
181pub fn remove_all_devices(user_id: &UserId) -> DataResult<()> {
182 diesel::delete(user_devices::table.filter(user_devices::user_id.eq(user_id))).execute(&mut connect()?)?;
183 diesel::delete(user_access_tokens::table.filter(user_access_tokens::user_id.eq(user_id)))
184 .execute(&mut connect()?)?;
185 diesel::delete(user_refresh_tokens::table.filter(user_refresh_tokens::user_id.eq(user_id)))
186 .execute(&mut connect()?)?;
187 Ok(())
188}
189
190pub fn delete_dehydrated_devices(user_id: &UserId) -> DataResult<()> {
191 diesel::delete(user_dehydrated_devices::table.filter(user_dehydrated_devices::user_id.eq(user_id)))
192 .execute(&mut connect()?)?;
193 Ok(())
194}
195
196pub fn set_token(user_id: &UserId, device_id: &DeviceId, token: &str) -> DataResult<()> {
197 diesel::insert_into(user_access_tokens::table)
198 .values(NewDbAccessToken::new(
199 user_id.to_owned(),
200 device_id.to_owned(),
201 token.to_owned(),
202 ))
203 .on_conflict((user_access_tokens::user_id, user_access_tokens::device_id))
204 .do_update()
205 .set(user_access_tokens::token.eq(token))
206 .execute(&mut connect()?)?;
207 Ok(())
208}
209
210pub fn get_to_device_events(
211 user_id: &UserId,
212 device_id: &DeviceId,
213 since_sn: Option<Seqnum>,
214 until_sn: Option<Seqnum>,
215) -> DataResult<Vec<RawJson<AnyToDeviceEvent>>> {
216 println!("==================get_to_device_events");
217 device_inboxes::table
218 .filter(device_inboxes::user_id.eq(user_id))
219 .filter(device_inboxes::device_id.eq(device_id))
220 .load::<DbDeviceInbox>(&mut connect()?)?
221 .into_iter()
222 .map(|event| {
223 serde_json::from_value(event.json_data.clone())
224 .map_err(|_| DataError::public("Invalid JSON in device inbox"))
225 })
226 .collect::<DataResult<Vec<_>>>()
227}
228
229pub fn add_to_device_event(
230 sender: &UserId,
231 target_user_id: &UserId,
232 target_device_id: &DeviceId,
233 event_type: &str,
234 content: serde_json::Value,
235) -> DataResult<()> {
236 println!("===============add_to_device_event");
237 let mut json = serde_json::Map::new();
238 json.insert("type".to_owned(), event_type.to_owned().into());
239 json.insert("sender".to_owned(), sender.to_string().into());
240 json.insert("content".to_owned(), content);
241
242 let json_data = serde_json::to_value(&json)?;
243
244 diesel::insert_into(device_inboxes::table)
245 .values(NewDbDeviceInbox {
246 user_id: target_user_id.to_owned(),
247 device_id: target_device_id.to_owned(),
248 json_data,
249 created_at: UnixMillis::now().get() as i64,
250 })
251 .execute(&mut connect()?)?;
252
253 Ok(())
254}
255
256pub fn remove_to_device_events(user_id: &UserId, device_id: &DeviceId, until_sn: Seqnum) -> DataResult<()> {
257 diesel::delete(
258 device_inboxes::table
259 .filter(device_inboxes::user_id.eq(user_id))
260 .filter(device_inboxes::device_id.eq(device_id))
261 .filter(device_inboxes::occur_sn.le(until_sn)),
262 )
263 .execute(&mut connect()?)?;
264 Ok(())
265}