autter_core/database/
notifications.rs

1use crate::{
2    DataManager,
3    model::{
4        Error, Notification, Result, User, UserPermission,
5        socket::{CrudMessageType, PacketType, SocketMessage, SocketMethod},
6    },
7};
8use oiseau::{
9    PostgresRow,
10    cache::{Cache, redis::Commands},
11    execute, get, params, query_rows,
12};
13use tetratto_core::auto_method;
14
15impl DataManager {
16    /// Get a [`Notification`] from an SQL row.
17    pub(crate) fn get_notification_from_row(x: &PostgresRow) -> Notification {
18        Notification {
19            id: get!(x->0(i64)) as usize,
20            created: get!(x->1(i64)) as usize,
21            title: get!(x->2(String)),
22            content: get!(x->3(String)),
23            owner: get!(x->4(i64)) as usize,
24            read: get!(x->5(i32)) as i8 == 1,
25            tag: get!(x->6(String)),
26        }
27    }
28
    // Generates `get_notification_by_id` through `auto_method!`: the query selects the row of
    // `a_notifications` whose `id` matches `$1`, and the row is decoded with
    // `get_notification_from_row`. The `atto.notification:{}` cache key template is the same key
    // format that the mutating methods in this impl remove after changing a notification.
    // NOTE(review): the exact caching and not-found behavior live in the macro definition — confirm there.
29    auto_method!(get_notification_by_id()@get_notification_from_row -> "SELECT * FROM a_notifications WHERE id = $1" --name="notification" --returns=Notification --cache-key-tmpl="atto.notification:{}");
30
31    /// Get all notifications by `owner`.
32    pub async fn get_notifications_by_owner(&self, owner: usize) -> Result<Vec<Notification>> {
33        let conn = match self.0.connect().await {
34            Ok(c) => c,
35            Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
36        };
37
38        let res = query_rows!(
39            &conn,
40            "SELECT * FROM a_notifications WHERE owner = $1 ORDER BY created DESC",
41            &[&(owner as i64)],
42            |x| { Self::get_notification_from_row(x) }
43        );
44
45        if res.is_err() {
46            return Err(Error::GeneralNotFound("notification".to_string()));
47        }
48
49        Ok(res.unwrap())
50    }
51
52    /// Get all notifications by `owner` (paginated).
53    pub async fn get_notifications_by_owner_paginated(
54        &self,
55        owner: usize,
56        batch: usize,
57        page: usize,
58    ) -> Result<Vec<Notification>> {
59        let conn = match self.0.connect().await {
60            Ok(c) => c,
61            Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
62        };
63
64        let res = query_rows!(
65            &conn,
66            "SELECT * FROM a_notifications WHERE owner = $1 ORDER BY created DESC LIMIT $2 OFFSET $3",
67            &[&(owner as i64), &(batch as i64), &((page * batch) as i64)],
68            |x| { Self::get_notification_from_row(x) }
69        );
70
71        if res.is_err() {
72            return Err(Error::GeneralNotFound("notification".to_string()));
73        }
74
75        Ok(res.unwrap())
76    }
77
78    /// Get all notifications by `tag`.
79    pub async fn get_notifications_by_tag(&self, tag: &str) -> Result<Vec<Notification>> {
80        let conn = match self.0.connect().await {
81            Ok(c) => c,
82            Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
83        };
84
85        let res = query_rows!(
86            &conn,
87            "SELECT * FROM a_notifications WHERE tag = $1 ORDER BY created DESC",
88            &[&tag],
89            |x| { Self::get_notification_from_row(x) }
90        );
91
92        if res.is_err() {
93            return Err(Error::GeneralNotFound("notification".to_string()));
94        }
95
96        Ok(res.unwrap())
97    }
98
99    /// Create a new notification in the database.
100    ///
101    /// # Arguments
102    /// * `data` - a mock [`Notification`] object to insert
103    pub async fn create_notification(&self, data: Notification) -> Result<()> {
104        let conn = match self.0.connect().await {
105            Ok(c) => c,
106            Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
107        };
108
109        let res = execute!(
110            &conn,
111            "INSERT INTO a_notifications VALUES ($1, $2, $3, $4, $5, $6, $7)",
112            params![
113                &(data.id as i64),
114                &(data.created as i64),
115                &data.title,
116                &data.content,
117                &(data.owner as i64),
118                &{ if data.read { 1 } else { 0 } },
119                &data.tag
120            ]
121        );
122
123        if let Err(e) = res {
124            return Err(Error::DatabaseError(e.to_string()));
125        }
126
127        // incr notification count
128        self.incr_user_notifications(data.owner).await?;
129
130        // post event
131        let mut con = self.0.1.get_con().await;
132
133        if let Err(e) = con.publish::<String, String, ()>(
134            format!("{}/notifs", data.owner),
135            serde_json::to_string(&SocketMessage {
136                method: SocketMethod::Packet(PacketType::Crud(CrudMessageType::Create)),
137                data: serde_json::to_string(&data).unwrap(),
138            })
139            .unwrap(),
140        ) {
141            return Err(Error::MiscError(e.to_string()));
142        }
143
144        // return
145        Ok(())
146    }
147
148    pub async fn delete_notification(&self, id: usize, user: &User) -> Result<()> {
149        let notification = self.get_notification_by_id(id).await?;
150
151        if user.id != notification.owner
152            && !user
153                .permissions
154                .contains(&UserPermission::ManageNotifications)
155        {
156            return Err(Error::NotAllowed);
157        }
158
159        let conn = match self.0.connect().await {
160            Ok(c) => c,
161            Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
162        };
163
164        let res = execute!(
165            &conn,
166            "DELETE FROM a_notifications WHERE id = $1",
167            &[&(id as i64)]
168        );
169
170        if let Err(e) = res {
171            return Err(Error::DatabaseError(e.to_string()));
172        }
173
174        self.0.1.remove(format!("atto.notification:{}", id)).await;
175
176        // decr notification count
177        if !notification.read {
178            self.decr_user_notifications(notification.owner)
179                .await
180                .unwrap();
181        }
182
183        // post event
184        let mut con = self.0.1.get_con().await;
185
186        if let Err(e) = con.publish::<String, String, ()>(
187            format!("{}/notifs", notification.owner),
188            serde_json::to_string(&SocketMessage {
189                method: SocketMethod::Packet(PacketType::Crud(CrudMessageType::Delete)),
190                data: notification.id.to_string(),
191            })
192            .unwrap(),
193        ) {
194            return Err(Error::MiscError(e.to_string()));
195        }
196
197        // return
198        Ok(())
199    }
200
201    pub async fn delete_all_notifications(&self, user: &User) -> Result<()> {
202        let conn = match self.0.connect().await {
203            Ok(c) => c,
204            Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
205        };
206
207        let res = execute!(
208            &conn,
209            "DELETE FROM a_notifications WHERE owner = $1",
210            &[&(user.id as i64)]
211        );
212
213        if let Err(e) = res {
214            return Err(Error::DatabaseError(e.to_string()));
215        }
216
217        self.update_user_notification_count(user.id, 0).await?;
218        Ok(())
219    }
220
221    pub async fn delete_all_notifications_by_tag(&self, user: &User, tag: &str) -> Result<()> {
222        let conn = match self.0.connect().await {
223            Ok(c) => c,
224            Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
225        };
226
227        let res = execute!(
228            &conn,
229            "DELETE FROM a_notifications WHERE owner = $1 AND tag = $2",
230            params![&(user.id as i64), tag]
231        );
232
233        if let Err(e) = res {
234            return Err(Error::DatabaseError(e.to_string()));
235        }
236
237        Ok(())
238    }
239
240    pub async fn update_notification_read(
241        &self,
242        id: usize,
243        new_read: bool,
244        user: &User,
245    ) -> Result<()> {
246        let y = self.get_notification_by_id(id).await?;
247
248        if y.owner != user.id
249            && !user
250                .permissions
251                .contains(&UserPermission::ManageNotifications)
252        {
253            return Err(Error::NotAllowed);
254        }
255
256        // ...
257        let conn = match self.0.connect().await {
258            Ok(c) => c,
259            Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
260        };
261
262        let res = execute!(
263            &conn,
264            "UPDATE a_notifications SET read = $1 WHERE id = $2",
265            params![&{ if new_read { 1 } else { 0 } }, &(id as i64)]
266        );
267
268        if let Err(e) = res {
269            return Err(Error::DatabaseError(e.to_string()));
270        }
271
272        self.0.1.remove(format!("atto.notification:{}", id)).await;
273
274        if (y.read) && (!new_read) {
275            self.incr_user_notifications(user.id).await?;
276        } else if (!y.read) && (new_read) {
277            self.decr_user_notifications(user.id).await?;
278        }
279
280        Ok(())
281    }
282
283    pub async fn update_all_notifications_read(&self, user: &User, read: bool) -> Result<()> {
284        let notifications = self.get_notifications_by_owner(user.id).await?;
285
286        let mut changed_count: i32 = 0;
287        for notification in notifications {
288            if notification.read == read {
289                // no need to update this
290                continue;
291            }
292
293            changed_count += 1;
294
295            self.0
296                .1
297                .remove(format!("atto.notification:{}", notification.id))
298                .await;
299        }
300
301        // execute
302        let conn = match self.0.connect().await {
303            Ok(c) => c,
304            Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
305        };
306
307        let res = execute!(
308            &conn,
309            "UPDATE a_notifications SET read = $1 WHERE owner = $2",
310            params![&{ if read { 1 } else { 0 } }, &(user.id as i64)]
311        );
312
313        if let Err(e) = res {
314            return Err(Error::DatabaseError(e.to_string()));
315        }
316
317        // use changed_count to update user counts
318        if !read {
319            // we don't need to update when marking things as read since that should just be 0
320            self.update_user_notification_count(user.id, changed_count)
321                .await?;
322        } else {
323            self.update_user_notification_count(user.id, 0).await?;
324        }
325
326        // ...
327        Ok(())
328    }
329}