1use crate::{
2 DataManager,
3 model::{
4 Error, Notification, Result, User, UserPermission,
5 socket::{CrudMessageType, PacketType, SocketMessage, SocketMethod},
6 },
7};
8use oiseau::{
9 PostgresRow,
10 cache::{Cache, redis::Commands},
11 execute, get, params, query_rows,
12};
13use tetratto_core::auto_method;
14
15impl DataManager {
16 pub(crate) fn get_notification_from_row(x: &PostgresRow) -> Notification {
18 Notification {
19 id: get!(x->0(i64)) as usize,
20 created: get!(x->1(i64)) as usize,
21 title: get!(x->2(String)),
22 content: get!(x->3(String)),
23 owner: get!(x->4(i64)) as usize,
24 read: get!(x->5(i32)) as i8 == 1,
25 tag: get!(x->6(String)),
26 }
27 }
28
29 auto_method!(get_notification_by_id()@get_notification_from_row -> "SELECT * FROM a_notifications WHERE id = $1" --name="notification" --returns=Notification --cache-key-tmpl="atto.notification:{}");
30
31 pub async fn get_notifications_by_owner(&self, owner: usize) -> Result<Vec<Notification>> {
33 let conn = match self.0.connect().await {
34 Ok(c) => c,
35 Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
36 };
37
38 let res = query_rows!(
39 &conn,
40 "SELECT * FROM a_notifications WHERE owner = $1 ORDER BY created DESC",
41 &[&(owner as i64)],
42 |x| { Self::get_notification_from_row(x) }
43 );
44
45 if res.is_err() {
46 return Err(Error::GeneralNotFound("notification".to_string()));
47 }
48
49 Ok(res.unwrap())
50 }
51
52 pub async fn get_notifications_by_owner_paginated(
54 &self,
55 owner: usize,
56 batch: usize,
57 page: usize,
58 ) -> Result<Vec<Notification>> {
59 let conn = match self.0.connect().await {
60 Ok(c) => c,
61 Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
62 };
63
64 let res = query_rows!(
65 &conn,
66 "SELECT * FROM a_notifications WHERE owner = $1 ORDER BY created DESC LIMIT $2 OFFSET $3",
67 &[&(owner as i64), &(batch as i64), &((page * batch) as i64)],
68 |x| { Self::get_notification_from_row(x) }
69 );
70
71 if res.is_err() {
72 return Err(Error::GeneralNotFound("notification".to_string()));
73 }
74
75 Ok(res.unwrap())
76 }
77
78 pub async fn get_notifications_by_tag(&self, tag: &str) -> Result<Vec<Notification>> {
80 let conn = match self.0.connect().await {
81 Ok(c) => c,
82 Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
83 };
84
85 let res = query_rows!(
86 &conn,
87 "SELECT * FROM a_notifications WHERE tag = $1 ORDER BY created DESC",
88 &[&tag],
89 |x| { Self::get_notification_from_row(x) }
90 );
91
92 if res.is_err() {
93 return Err(Error::GeneralNotFound("notification".to_string()));
94 }
95
96 Ok(res.unwrap())
97 }
98
99 pub async fn create_notification(&self, data: Notification) -> Result<()> {
104 let conn = match self.0.connect().await {
105 Ok(c) => c,
106 Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
107 };
108
109 let res = execute!(
110 &conn,
111 "INSERT INTO a_notifications VALUES ($1, $2, $3, $4, $5, $6, $7)",
112 params![
113 &(data.id as i64),
114 &(data.created as i64),
115 &data.title,
116 &data.content,
117 &(data.owner as i64),
118 &{ if data.read { 1 } else { 0 } },
119 &data.tag
120 ]
121 );
122
123 if let Err(e) = res {
124 return Err(Error::DatabaseError(e.to_string()));
125 }
126
127 self.incr_user_notifications(data.owner).await?;
129
130 let mut con = self.0.1.get_con().await;
132
133 if let Err(e) = con.publish::<String, String, ()>(
134 format!("{}/notifs", data.owner),
135 serde_json::to_string(&SocketMessage {
136 method: SocketMethod::Packet(PacketType::Crud(CrudMessageType::Create)),
137 data: serde_json::to_string(&data).unwrap(),
138 })
139 .unwrap(),
140 ) {
141 return Err(Error::MiscError(e.to_string()));
142 }
143
144 Ok(())
146 }
147
148 pub async fn delete_notification(&self, id: usize, user: &User) -> Result<()> {
149 let notification = self.get_notification_by_id(id).await?;
150
151 if user.id != notification.owner
152 && !user
153 .permissions
154 .contains(&UserPermission::ManageNotifications)
155 {
156 return Err(Error::NotAllowed);
157 }
158
159 let conn = match self.0.connect().await {
160 Ok(c) => c,
161 Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
162 };
163
164 let res = execute!(
165 &conn,
166 "DELETE FROM a_notifications WHERE id = $1",
167 &[&(id as i64)]
168 );
169
170 if let Err(e) = res {
171 return Err(Error::DatabaseError(e.to_string()));
172 }
173
174 self.0.1.remove(format!("atto.notification:{}", id)).await;
175
176 if !notification.read {
178 self.decr_user_notifications(notification.owner)
179 .await
180 .unwrap();
181 }
182
183 let mut con = self.0.1.get_con().await;
185
186 if let Err(e) = con.publish::<String, String, ()>(
187 format!("{}/notifs", notification.owner),
188 serde_json::to_string(&SocketMessage {
189 method: SocketMethod::Packet(PacketType::Crud(CrudMessageType::Delete)),
190 data: notification.id.to_string(),
191 })
192 .unwrap(),
193 ) {
194 return Err(Error::MiscError(e.to_string()));
195 }
196
197 Ok(())
199 }
200
201 pub async fn delete_all_notifications(&self, user: &User) -> Result<()> {
202 let conn = match self.0.connect().await {
203 Ok(c) => c,
204 Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
205 };
206
207 let res = execute!(
208 &conn,
209 "DELETE FROM a_notifications WHERE owner = $1",
210 &[&(user.id as i64)]
211 );
212
213 if let Err(e) = res {
214 return Err(Error::DatabaseError(e.to_string()));
215 }
216
217 self.update_user_notification_count(user.id, 0).await?;
218 Ok(())
219 }
220
221 pub async fn delete_all_notifications_by_tag(&self, user: &User, tag: &str) -> Result<()> {
222 let conn = match self.0.connect().await {
223 Ok(c) => c,
224 Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
225 };
226
227 let res = execute!(
228 &conn,
229 "DELETE FROM a_notifications WHERE owner = $1 AND tag = $2",
230 params![&(user.id as i64), tag]
231 );
232
233 if let Err(e) = res {
234 return Err(Error::DatabaseError(e.to_string()));
235 }
236
237 Ok(())
238 }
239
240 pub async fn update_notification_read(
241 &self,
242 id: usize,
243 new_read: bool,
244 user: &User,
245 ) -> Result<()> {
246 let y = self.get_notification_by_id(id).await?;
247
248 if y.owner != user.id
249 && !user
250 .permissions
251 .contains(&UserPermission::ManageNotifications)
252 {
253 return Err(Error::NotAllowed);
254 }
255
256 let conn = match self.0.connect().await {
258 Ok(c) => c,
259 Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
260 };
261
262 let res = execute!(
263 &conn,
264 "UPDATE a_notifications SET read = $1 WHERE id = $2",
265 params![&{ if new_read { 1 } else { 0 } }, &(id as i64)]
266 );
267
268 if let Err(e) = res {
269 return Err(Error::DatabaseError(e.to_string()));
270 }
271
272 self.0.1.remove(format!("atto.notification:{}", id)).await;
273
274 if (y.read) && (!new_read) {
275 self.incr_user_notifications(user.id).await?;
276 } else if (!y.read) && (new_read) {
277 self.decr_user_notifications(user.id).await?;
278 }
279
280 Ok(())
281 }
282
283 pub async fn update_all_notifications_read(&self, user: &User, read: bool) -> Result<()> {
284 let notifications = self.get_notifications_by_owner(user.id).await?;
285
286 let mut changed_count: i32 = 0;
287 for notification in notifications {
288 if notification.read == read {
289 continue;
291 }
292
293 changed_count += 1;
294
295 self.0
296 .1
297 .remove(format!("atto.notification:{}", notification.id))
298 .await;
299 }
300
301 let conn = match self.0.connect().await {
303 Ok(c) => c,
304 Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
305 };
306
307 let res = execute!(
308 &conn,
309 "UPDATE a_notifications SET read = $1 WHERE owner = $2",
310 params![&{ if read { 1 } else { 0 } }, &(user.id as i64)]
311 );
312
313 if let Err(e) = res {
314 return Err(Error::DatabaseError(e.to_string()));
315 }
316
317 if !read {
319 self.update_user_notification_count(user.id, changed_count)
321 .await?;
322 } else {
323 self.update_user_notification_count(user.id, 0).await?;
324 }
325
326 Ok(())
328 }
329}