Skip to main content

autter_core/database/
notifications.rs

1use crate::{
2    DataManager,
3    model::{
4        Error, Notification, Result, User, UserPermission,
5        socket::{CrudMessageType, PacketType, SocketMessage, SocketMethod},
6    },
7};
8use oiseau::{
9    PostgresRow,
10    cache::{Cache, redis::Commands},
11    execute, get, params, query_rows,
12};
13use tetratto_core2::{auto_method, model::id::Id};
14
15impl DataManager {
16    /// Get a [`Notification`] from an SQL row.
17    pub(crate) fn get_notification_from_row(x: &PostgresRow) -> Notification {
18        Notification {
19            id: Id::deserialize(&get!(x->0(String))),
20            created: get!(x->1(i64)) as u128,
21            title: get!(x->2(String)),
22            content: get!(x->3(String)),
23            owner: Id::Legacy(get!(x->4(i64)) as usize),
24            read: get!(x->5(i32)) as i8 == 1,
25            tag: get!(x->6(String)),
26        }
27    }
28
29    auto_method!(get_notification_by_id()@get_notification_from_row -> "SELECT * FROM a_notifications WHERE id = $1" --name="notification" --returns=Notification --cache-key-tmpl="atto.notification:{}");
30
31    /// Get all notifications by `owner`.
32    pub async fn get_notifications_by_owner(&self, owner: &Id) -> Result<Vec<Notification>> {
33        let conn = match self.0.connect().await {
34            Ok(c) => c,
35            Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
36        };
37
38        let res = query_rows!(
39            &conn,
40            "SELECT * FROM a_notifications WHERE owner = $1 ORDER BY created DESC",
41            &[&owner.printable()],
42            |x| { Self::get_notification_from_row(x) }
43        );
44
45        if res.is_err() {
46            return Err(Error::GeneralNotFound("notification".to_string()));
47        }
48
49        Ok(res.unwrap())
50    }
51
52    /// Get all notifications by `owner` (paginated).
53    pub async fn get_notifications_by_owner_paginated(
54        &self,
55        owner: &Id,
56        batch: usize,
57        page: usize,
58    ) -> Result<Vec<Notification>> {
59        let conn = match self.0.connect().await {
60            Ok(c) => c,
61            Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
62        };
63
64        let res = query_rows!(
65            &conn,
66            "SELECT * FROM a_notifications WHERE owner = $1 ORDER BY created DESC LIMIT $2 OFFSET $3",
67            &[
68                &owner.printable(),
69                &(batch as i64),
70                &((page * batch) as i64)
71            ],
72            |x| { Self::get_notification_from_row(x) }
73        );
74
75        if res.is_err() {
76            return Err(Error::GeneralNotFound("notification".to_string()));
77        }
78
79        Ok(res.unwrap())
80    }
81
82    /// Get all notifications by `tag`.
83    pub async fn get_notifications_by_tag(&self, tag: &str) -> Result<Vec<Notification>> {
84        let conn = match self.0.connect().await {
85            Ok(c) => c,
86            Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
87        };
88
89        let res = query_rows!(
90            &conn,
91            "SELECT * FROM a_notifications WHERE tag = $1 ORDER BY created DESC",
92            &[&tag],
93            |x| { Self::get_notification_from_row(x) }
94        );
95
96        if res.is_err() {
97            return Err(Error::GeneralNotFound("notification".to_string()));
98        }
99
100        Ok(res.unwrap())
101    }
102
103    /// Create a new notification in the database.
104    ///
105    /// # Arguments
106    /// * `data` - a mock [`Notification`] object to insert
107    pub async fn create_notification(&self, data: Notification) -> Result<()> {
108        let conn = match self.0.connect().await {
109            Ok(c) => c,
110            Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
111        };
112
113        let res = execute!(
114            &conn,
115            "INSERT INTO a_notifications VALUES ($1, $2, $3, $4, $5, $6, $7)",
116            params![
117                &data.id.printable(),
118                &(data.created as i64),
119                &data.title,
120                &data.content,
121                &(data.owner.as_usize() as i64),
122                &{ if data.read { 1 } else { 0 } },
123                &data.tag
124            ]
125        );
126
127        if let Err(e) = res {
128            return Err(Error::DatabaseError(e.to_string()));
129        }
130
131        // incr notification count
132        self.incr_user_notifications(&data.owner).await?;
133
134        // post event
135        let mut con = self.0.1.get_con().await;
136
137        if let Err(e) = con.publish::<String, String, ()>(
138            format!("{}/notifs", data.owner),
139            serde_json::to_string(&SocketMessage {
140                method: SocketMethod::Packet(PacketType::Crud(CrudMessageType::Create)),
141                data: serde_json::to_string(&data).unwrap(),
142            })
143            .unwrap(),
144        ) {
145            return Err(Error::MiscError(e.to_string()));
146        }
147
148        // return
149        Ok(())
150    }
151
152    pub async fn delete_notification(&self, id: &Id, user: &User) -> Result<()> {
153        let notification = self.get_notification_by_id(&id).await?;
154
155        if user.id != notification.owner
156            && !user
157                .permissions
158                .contains(&UserPermission::ManageNotifications)
159        {
160            return Err(Error::NotAllowed);
161        }
162
163        let conn = match self.0.connect().await {
164            Ok(c) => c,
165            Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
166        };
167
168        let res = execute!(
169            &conn,
170            "DELETE FROM a_notifications WHERE id = $1",
171            &[&id.printable()]
172        );
173
174        if let Err(e) = res {
175            return Err(Error::DatabaseError(e.to_string()));
176        }
177
178        self.0.1.remove(format!("atto.notification:{}", id)).await;
179
180        // decr notification count
181        if !notification.read {
182            self.decr_user_notifications(&notification.owner)
183                .await
184                .unwrap();
185        }
186
187        // post event
188        let mut con = self.0.1.get_con().await;
189
190        if let Err(e) = con.publish::<String, String, ()>(
191            format!("{}/notifs", notification.owner),
192            serde_json::to_string(&SocketMessage {
193                method: SocketMethod::Packet(PacketType::Crud(CrudMessageType::Delete)),
194                data: notification.id.to_string(),
195            })
196            .unwrap(),
197        ) {
198            return Err(Error::MiscError(e.to_string()));
199        }
200
201        // return
202        Ok(())
203    }
204
205    pub async fn delete_all_notifications(&self, user: &User) -> Result<()> {
206        let conn = match self.0.connect().await {
207            Ok(c) => c,
208            Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
209        };
210
211        let res = execute!(
212            &conn,
213            "DELETE FROM a_notifications WHERE owner = $1",
214            &[&user.id.printable()]
215        );
216
217        if let Err(e) = res {
218            return Err(Error::DatabaseError(e.to_string()));
219        }
220
221        self.update_user_notification_count(&user.id, 0).await?;
222        Ok(())
223    }
224
225    pub async fn delete_all_notifications_by_tag(&self, user: &User, tag: &str) -> Result<()> {
226        let conn = match self.0.connect().await {
227            Ok(c) => c,
228            Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
229        };
230
231        let res = execute!(
232            &conn,
233            "DELETE FROM a_notifications WHERE owner = $1 AND tag = $2",
234            params![&user.id.printable(), tag]
235        );
236
237        if let Err(e) = res {
238            return Err(Error::DatabaseError(e.to_string()));
239        }
240
241        Ok(())
242    }
243
244    pub async fn update_notification_read(
245        &self,
246        id: &Id,
247        new_read: bool,
248        user: &User,
249    ) -> Result<()> {
250        let y = self.get_notification_by_id(&id).await?;
251
252        if y.owner != user.id
253            && !user
254                .permissions
255                .contains(&UserPermission::ManageNotifications)
256        {
257            return Err(Error::NotAllowed);
258        }
259
260        // ...
261        let conn = match self.0.connect().await {
262            Ok(c) => c,
263            Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
264        };
265
266        let res = execute!(
267            &conn,
268            "UPDATE a_notifications SET read = $1 WHERE id = $2",
269            params![&{ if new_read { 1 } else { 0 } }, &id.printable()]
270        );
271
272        if let Err(e) = res {
273            return Err(Error::DatabaseError(e.to_string()));
274        }
275
276        self.0.1.remove(format!("atto.notification:{}", id)).await;
277
278        if (y.read) && (!new_read) {
279            self.incr_user_notifications(&user.id).await?;
280        } else if (!y.read) && (new_read) {
281            self.decr_user_notifications(&user.id).await?;
282        }
283
284        Ok(())
285    }
286
287    pub async fn update_all_notifications_read(&self, user: &User, read: bool) -> Result<()> {
288        let notifications = self.get_notifications_by_owner(&user.id).await?;
289
290        let mut changed_count: i32 = 0;
291        for notification in notifications {
292            if notification.read == read {
293                // no need to update this
294                continue;
295            }
296
297            changed_count += 1;
298
299            self.0
300                .1
301                .remove(format!("atto.notification:{}", notification.id))
302                .await;
303        }
304
305        // execute
306        let conn = match self.0.connect().await {
307            Ok(c) => c,
308            Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
309        };
310
311        let res = execute!(
312            &conn,
313            "UPDATE a_notifications SET read = $1 WHERE owner = $2",
314            params![&{ if read { 1 } else { 0 } }, &user.id.printable()]
315        );
316
317        if let Err(e) = res {
318            return Err(Error::DatabaseError(e.to_string()));
319        }
320
321        // use changed_count to update user counts
322        if !read {
323            // we don't need to update when marking things as read since that should just be 0
324            self.update_user_notification_count(&user.id, changed_count)
325                .await?;
326        } else {
327            self.update_user_notification_count(&user.id, 0).await?;
328        }
329
330        // ...
331        Ok(())
332    }
333}