1use crate::{
2 DataManager,
3 model::{
4 Error, Notification, Result, User, UserPermission,
5 socket::{CrudMessageType, PacketType, SocketMessage, SocketMethod},
6 },
7};
8use oiseau::{
9 PostgresRow,
10 cache::{Cache, redis::Commands},
11 execute, get, params, query_rows,
12};
13use tetratto_core2::{auto_method, model::id::Id};
14
15impl DataManager {
16 pub(crate) fn get_notification_from_row(x: &PostgresRow) -> Notification {
18 Notification {
19 id: Id::deserialize(&get!(x->0(String))),
20 created: get!(x->1(i64)) as u128,
21 title: get!(x->2(String)),
22 content: get!(x->3(String)),
23 owner: Id::Legacy(get!(x->4(i64)) as usize),
24 read: get!(x->5(i32)) as i8 == 1,
25 tag: get!(x->6(String)),
26 }
27 }
28
29 auto_method!(get_notification_by_id()@get_notification_from_row -> "SELECT * FROM a_notifications WHERE id = $1" --name="notification" --returns=Notification --cache-key-tmpl="atto.notification:{}");
30
31 pub async fn get_notifications_by_owner(&self, owner: &Id) -> Result<Vec<Notification>> {
33 let conn = match self.0.connect().await {
34 Ok(c) => c,
35 Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
36 };
37
38 let res = query_rows!(
39 &conn,
40 "SELECT * FROM a_notifications WHERE owner = $1 ORDER BY created DESC",
41 &[&owner.printable()],
42 |x| { Self::get_notification_from_row(x) }
43 );
44
45 if res.is_err() {
46 return Err(Error::GeneralNotFound("notification".to_string()));
47 }
48
49 Ok(res.unwrap())
50 }
51
52 pub async fn get_notifications_by_owner_paginated(
54 &self,
55 owner: &Id,
56 batch: usize,
57 page: usize,
58 ) -> Result<Vec<Notification>> {
59 let conn = match self.0.connect().await {
60 Ok(c) => c,
61 Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
62 };
63
64 let res = query_rows!(
65 &conn,
66 "SELECT * FROM a_notifications WHERE owner = $1 ORDER BY created DESC LIMIT $2 OFFSET $3",
67 &[
68 &owner.printable(),
69 &(batch as i64),
70 &((page * batch) as i64)
71 ],
72 |x| { Self::get_notification_from_row(x) }
73 );
74
75 if res.is_err() {
76 return Err(Error::GeneralNotFound("notification".to_string()));
77 }
78
79 Ok(res.unwrap())
80 }
81
82 pub async fn get_notifications_by_tag(&self, tag: &str) -> Result<Vec<Notification>> {
84 let conn = match self.0.connect().await {
85 Ok(c) => c,
86 Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
87 };
88
89 let res = query_rows!(
90 &conn,
91 "SELECT * FROM a_notifications WHERE tag = $1 ORDER BY created DESC",
92 &[&tag],
93 |x| { Self::get_notification_from_row(x) }
94 );
95
96 if res.is_err() {
97 return Err(Error::GeneralNotFound("notification".to_string()));
98 }
99
100 Ok(res.unwrap())
101 }
102
103 pub async fn create_notification(&self, data: Notification) -> Result<()> {
108 let conn = match self.0.connect().await {
109 Ok(c) => c,
110 Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
111 };
112
113 let res = execute!(
114 &conn,
115 "INSERT INTO a_notifications VALUES ($1, $2, $3, $4, $5, $6, $7)",
116 params![
117 &data.id.printable(),
118 &(data.created as i64),
119 &data.title,
120 &data.content,
121 &(data.owner.as_usize() as i64),
122 &{ if data.read { 1 } else { 0 } },
123 &data.tag
124 ]
125 );
126
127 if let Err(e) = res {
128 return Err(Error::DatabaseError(e.to_string()));
129 }
130
131 self.incr_user_notifications(&data.owner).await?;
133
134 let mut con = self.0.1.get_con().await;
136
137 if let Err(e) = con.publish::<String, String, ()>(
138 format!("{}/notifs", data.owner),
139 serde_json::to_string(&SocketMessage {
140 method: SocketMethod::Packet(PacketType::Crud(CrudMessageType::Create)),
141 data: serde_json::to_string(&data).unwrap(),
142 })
143 .unwrap(),
144 ) {
145 return Err(Error::MiscError(e.to_string()));
146 }
147
148 Ok(())
150 }
151
152 pub async fn delete_notification(&self, id: &Id, user: &User) -> Result<()> {
153 let notification = self.get_notification_by_id(&id).await?;
154
155 if user.id != notification.owner
156 && !user
157 .permissions
158 .contains(&UserPermission::ManageNotifications)
159 {
160 return Err(Error::NotAllowed);
161 }
162
163 let conn = match self.0.connect().await {
164 Ok(c) => c,
165 Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
166 };
167
168 let res = execute!(
169 &conn,
170 "DELETE FROM a_notifications WHERE id = $1",
171 &[&id.printable()]
172 );
173
174 if let Err(e) = res {
175 return Err(Error::DatabaseError(e.to_string()));
176 }
177
178 self.0.1.remove(format!("atto.notification:{}", id)).await;
179
180 if !notification.read {
182 self.decr_user_notifications(¬ification.owner)
183 .await
184 .unwrap();
185 }
186
187 let mut con = self.0.1.get_con().await;
189
190 if let Err(e) = con.publish::<String, String, ()>(
191 format!("{}/notifs", notification.owner),
192 serde_json::to_string(&SocketMessage {
193 method: SocketMethod::Packet(PacketType::Crud(CrudMessageType::Delete)),
194 data: notification.id.to_string(),
195 })
196 .unwrap(),
197 ) {
198 return Err(Error::MiscError(e.to_string()));
199 }
200
201 Ok(())
203 }
204
205 pub async fn delete_all_notifications(&self, user: &User) -> Result<()> {
206 let conn = match self.0.connect().await {
207 Ok(c) => c,
208 Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
209 };
210
211 let res = execute!(
212 &conn,
213 "DELETE FROM a_notifications WHERE owner = $1",
214 &[&user.id.printable()]
215 );
216
217 if let Err(e) = res {
218 return Err(Error::DatabaseError(e.to_string()));
219 }
220
221 self.update_user_notification_count(&user.id, 0).await?;
222 Ok(())
223 }
224
225 pub async fn delete_all_notifications_by_tag(&self, user: &User, tag: &str) -> Result<()> {
226 let conn = match self.0.connect().await {
227 Ok(c) => c,
228 Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
229 };
230
231 let res = execute!(
232 &conn,
233 "DELETE FROM a_notifications WHERE owner = $1 AND tag = $2",
234 params![&user.id.printable(), tag]
235 );
236
237 if let Err(e) = res {
238 return Err(Error::DatabaseError(e.to_string()));
239 }
240
241 Ok(())
242 }
243
244 pub async fn update_notification_read(
245 &self,
246 id: &Id,
247 new_read: bool,
248 user: &User,
249 ) -> Result<()> {
250 let y = self.get_notification_by_id(&id).await?;
251
252 if y.owner != user.id
253 && !user
254 .permissions
255 .contains(&UserPermission::ManageNotifications)
256 {
257 return Err(Error::NotAllowed);
258 }
259
260 let conn = match self.0.connect().await {
262 Ok(c) => c,
263 Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
264 };
265
266 let res = execute!(
267 &conn,
268 "UPDATE a_notifications SET read = $1 WHERE id = $2",
269 params![&{ if new_read { 1 } else { 0 } }, &id.printable()]
270 );
271
272 if let Err(e) = res {
273 return Err(Error::DatabaseError(e.to_string()));
274 }
275
276 self.0.1.remove(format!("atto.notification:{}", id)).await;
277
278 if (y.read) && (!new_read) {
279 self.incr_user_notifications(&user.id).await?;
280 } else if (!y.read) && (new_read) {
281 self.decr_user_notifications(&user.id).await?;
282 }
283
284 Ok(())
285 }
286
287 pub async fn update_all_notifications_read(&self, user: &User, read: bool) -> Result<()> {
288 let notifications = self.get_notifications_by_owner(&user.id).await?;
289
290 let mut changed_count: i32 = 0;
291 for notification in notifications {
292 if notification.read == read {
293 continue;
295 }
296
297 changed_count += 1;
298
299 self.0
300 .1
301 .remove(format!("atto.notification:{}", notification.id))
302 .await;
303 }
304
305 let conn = match self.0.connect().await {
307 Ok(c) => c,
308 Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
309 };
310
311 let res = execute!(
312 &conn,
313 "UPDATE a_notifications SET read = $1 WHERE owner = $2",
314 params![&{ if read { 1 } else { 0 } }, &user.id.printable()]
315 );
316
317 if let Err(e) = res {
318 return Err(Error::DatabaseError(e.to_string()));
319 }
320
321 if !read {
323 self.update_user_notification_count(&user.id, changed_count)
325 .await?;
326 } else {
327 self.update_user_notification_count(&user.id, 0).await?;
328 }
329
330 Ok(())
332 }
333}