1use oiseau::cache::Cache;
2use crate::model::id::Id;
3use crate::model::socket::{CrudMessageType, PacketType, SocketMessage, SocketMethod};
4use crate::model::{Error, Result, auth::Notification, auth::User, permissions::FinePermission};
5use crate::{auto_method, DataManager};
6
7use oiseau::{PostgresRow, cache::redis::Commands};
8use oiseau::{execute, get, query_rows, params};
9
10impl DataManager {
11 pub(crate) fn get_notification_from_row(x: &PostgresRow) -> Notification {
13 Notification {
14 id: crate::model::id::Id::deserialize(&get!(x->0(String))),
15 created: get!(x->1(i64)) as u128,
16 title: get!(x->2(String)),
17 content: get!(x->3(String)),
18 owner: crate::model::id::Id::deserialize(&get!(x->4(String))),
19 read: get!(x->5(i32)) as i8 == 1,
20 tag: get!(x->6(String)),
21 }
22 }
23
24 auto_method!(get_notification_by_id()@get_notification_from_row -> "SELECT * FROM notifications WHERE id = $1" --name="notification" --returns=Notification --cache-key-tmpl="atto.notification:{}");
25
26 pub async fn get_notifications_by_owner(&self, owner: &Id) -> Result<Vec<Notification>> {
28 let conn = match self.0.connect().await {
29 Ok(c) => c,
30 Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
31 };
32
33 let res = query_rows!(
34 &conn,
35 "SELECT * FROM notifications WHERE owner = $1 ORDER BY created DESC",
36 &[&owner.printable()],
37 |x| { Self::get_notification_from_row(x) }
38 );
39
40 if res.is_err() {
41 return Err(Error::GeneralNotFound("notification".to_string()));
42 }
43
44 Ok(res.unwrap())
45 }
46
47 pub async fn get_notifications_by_owner_paginated(
49 &self,
50 owner: &Id,
51 batch: usize,
52 page: usize,
53 ) -> Result<Vec<Notification>> {
54 let conn = match self.0.connect().await {
55 Ok(c) => c,
56 Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
57 };
58
59 let res = query_rows!(
60 &conn,
61 "SELECT * FROM notifications WHERE owner = $1 ORDER BY created DESC LIMIT $2 OFFSET $3",
62 &[
63 &owner.printable(),
64 &(batch as i64),
65 &((page * batch) as i64)
66 ],
67 |x| { Self::get_notification_from_row(x) }
68 );
69
70 if res.is_err() {
71 return Err(Error::GeneralNotFound("notification".to_string()));
72 }
73
74 Ok(res.unwrap())
75 }
76
77 pub async fn get_notifications_by_tag(&self, tag: &str) -> Result<Vec<Notification>> {
79 let conn = match self.0.connect().await {
80 Ok(c) => c,
81 Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
82 };
83
84 let res = query_rows!(
85 &conn,
86 "SELECT * FROM notifications WHERE tag = $1 ORDER BY created DESC",
87 &[&tag],
88 |x| { Self::get_notification_from_row(x) }
89 );
90
91 if res.is_err() {
92 return Err(Error::GeneralNotFound("notification".to_string()));
93 }
94
95 Ok(res.unwrap())
96 }
97
98 pub async fn create_notification(&self, data: Notification) -> Result<()> {
103 let conn = match self.0.connect().await {
104 Ok(c) => c,
105 Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
106 };
107
108 let res = execute!(
109 &conn,
110 "INSERT INTO notifications VALUES ($1, $2, $3, $4, $5, $6, $7)",
111 params![
112 &data.id.printable(),
113 &(data.created as i64),
114 &data.title,
115 &data.content,
116 &data.owner.printable(),
117 &{ if data.read { 1 } else { 0 } },
118 &data.tag
119 ]
120 );
121
122 if let Err(e) = res {
123 return Err(Error::DatabaseError(e.to_string()));
124 }
125
126 self.incr_user_notifications(&data.owner).await?;
128
129 let mut con = self.0.1.get_con().await;
131
132 if let Err(e) = con.publish::<String, String, ()>(
133 format!("{}/notifs", data.owner),
134 serde_json::to_string(&SocketMessage {
135 method: SocketMethod::Packet(PacketType::Crud(CrudMessageType::Create)),
136 data: serde_json::to_string(&data).unwrap(),
137 })
138 .unwrap(),
139 ) {
140 return Err(Error::MiscError(e.to_string()));
141 }
142
143 Ok(())
145 }
146
147 pub async fn delete_notification(&self, id: &crate::model::id::Id, user: &User) -> Result<()> {
148 let notification = self.get_notification_by_id(id).await?;
149
150 if user.id != notification.owner
151 && !user.permissions.check(FinePermission::ManageNotifications)
152 {
153 return Err(Error::NotAllowed);
154 }
155
156 let conn = match self.0.connect().await {
157 Ok(c) => c,
158 Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
159 };
160
161 let res = execute!(
162 &conn,
163 "DELETE FROM notifications WHERE id = $1",
164 &[&id.printable()]
165 );
166
167 if let Err(e) = res {
168 return Err(Error::DatabaseError(e.to_string()));
169 }
170
171 self.0.1.remove(format!("atto.notification:{}", id)).await;
172
173 if !notification.read {
175 self.decr_user_notifications(¬ification.owner)
176 .await
177 .unwrap();
178 }
179
180 let mut con = self.0.1.get_con().await;
182
183 if let Err(e) = con.publish::<String, String, ()>(
184 format!("{}/notifs", notification.owner),
185 serde_json::to_string(&SocketMessage {
186 method: SocketMethod::Packet(PacketType::Crud(CrudMessageType::Delete)),
187 data: notification.id.to_string(),
188 })
189 .unwrap(),
190 ) {
191 return Err(Error::MiscError(e.to_string()));
192 }
193
194 Ok(())
196 }
197
198 pub async fn delete_all_notifications(&self, user: &User) -> Result<()> {
199 let conn = match self.0.connect().await {
200 Ok(c) => c,
201 Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
202 };
203
204 let res = execute!(
205 &conn,
206 "DELETE FROM notifications WHERE owner = $1",
207 &[&user.id.printable()]
208 );
209
210 if let Err(e) = res {
211 return Err(Error::DatabaseError(e.to_string()));
212 }
213
214 self.update_user_notification_count(&user.id, 0).await?;
215 Ok(())
216 }
217
218 pub async fn delete_all_notifications_by_tag(&self, user: &User, tag: &str) -> Result<()> {
219 let conn = match self.0.connect().await {
220 Ok(c) => c,
221 Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
222 };
223
224 let res = execute!(
225 &conn,
226 "DELETE FROM notifications WHERE owner = $1 AND tag = $2",
227 params![&user.id.printable(), tag]
228 );
229
230 if let Err(e) = res {
231 return Err(Error::DatabaseError(e.to_string()));
232 }
233
234 Ok(())
235 }
236
237 pub async fn update_notification_read(
238 &self,
239 id: &crate::model::id::Id,
240 new_read: bool,
241 user: &User,
242 ) -> Result<()> {
243 let y = self.get_notification_by_id(id).await?;
244
245 if y.owner != user.id && !user.permissions.check(FinePermission::ManageNotifications) {
246 return Err(Error::NotAllowed);
247 }
248
249 let conn = match self.0.connect().await {
251 Ok(c) => c,
252 Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
253 };
254
255 let res = execute!(
256 &conn,
257 "UPDATE notifications SET read = $1 WHERE id = $2",
258 params![&{ if new_read { 1 } else { 0 } }, &id.printable()]
259 );
260
261 if let Err(e) = res {
262 return Err(Error::DatabaseError(e.to_string()));
263 }
264
265 self.0.1.remove(format!("atto.notification:{}", id)).await;
266
267 if (y.read) && (!new_read) {
268 self.incr_user_notifications(&user.id).await?;
269 } else if (!y.read) && (new_read) {
270 self.decr_user_notifications(&user.id).await?;
271 }
272
273 Ok(())
274 }
275
276 pub async fn update_all_notifications_read(&self, user: &User, read: bool) -> Result<()> {
277 let notifications = self.get_notifications_by_owner(&user.id).await?;
278
279 let mut changed_count: i32 = 0;
280 for notification in notifications {
281 if notification.read == read {
282 continue;
284 }
285
286 changed_count += 1;
287
288 self.0
289 .1
290 .remove(format!("atto.notification:{}", notification.id))
291 .await;
292 }
293
294 let conn = match self.0.connect().await {
296 Ok(c) => c,
297 Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
298 };
299
300 let res = execute!(
301 &conn,
302 "UPDATE notifications SET read = $1 WHERE owner = $2",
303 params![&{ if read { 1 } else { 0 } }, &user.id.printable()]
304 );
305
306 if let Err(e) = res {
307 return Err(Error::DatabaseError(e.to_string()));
308 }
309
310 if !read {
312 self.update_user_notification_count(&user.id, changed_count)
314 .await?;
315 } else {
316 self.update_user_notification_count(&user.id, 0).await?;
317 }
318
319 Ok(())
321 }
322}