Skip to main content

tetratto_core2/database/
notifications.rs

1use oiseau::cache::Cache;
2use crate::model::id::Id;
3use crate::model::socket::{CrudMessageType, PacketType, SocketMessage, SocketMethod};
4use crate::model::{Error, Result, auth::Notification, auth::User, permissions::FinePermission};
5use crate::{auto_method, DataManager};
6
7use oiseau::{PostgresRow, cache::redis::Commands};
8use oiseau::{execute, get, query_rows, params};
9
10impl DataManager {
11    /// Get a [`Notification`] from an SQL row.
12    pub(crate) fn get_notification_from_row(x: &PostgresRow) -> Notification {
13        Notification {
14            id: crate::model::id::Id::deserialize(&get!(x->0(String))),
15            created: get!(x->1(i64)) as u128,
16            title: get!(x->2(String)),
17            content: get!(x->3(String)),
18            owner: crate::model::id::Id::deserialize(&get!(x->4(String))),
19            read: get!(x->5(i32)) as i8 == 1,
20            tag: get!(x->6(String)),
21        }
22    }
23
24    auto_method!(get_notification_by_id()@get_notification_from_row -> "SELECT * FROM notifications WHERE id = $1" --name="notification" --returns=Notification --cache-key-tmpl="atto.notification:{}");
25
26    /// Get all notifications by `owner`.
27    pub async fn get_notifications_by_owner(&self, owner: &Id) -> Result<Vec<Notification>> {
28        let conn = match self.0.connect().await {
29            Ok(c) => c,
30            Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
31        };
32
33        let res = query_rows!(
34            &conn,
35            "SELECT * FROM notifications WHERE owner = $1 ORDER BY created DESC",
36            &[&owner.printable()],
37            |x| { Self::get_notification_from_row(x) }
38        );
39
40        if res.is_err() {
41            return Err(Error::GeneralNotFound("notification".to_string()));
42        }
43
44        Ok(res.unwrap())
45    }
46
47    /// Get all notifications by `owner` (paginated).
48    pub async fn get_notifications_by_owner_paginated(
49        &self,
50        owner: &Id,
51        batch: usize,
52        page: usize,
53    ) -> Result<Vec<Notification>> {
54        let conn = match self.0.connect().await {
55            Ok(c) => c,
56            Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
57        };
58
59        let res = query_rows!(
60            &conn,
61            "SELECT * FROM notifications WHERE owner = $1 ORDER BY created DESC LIMIT $2 OFFSET $3",
62            &[
63                &owner.printable(),
64                &(batch as i64),
65                &((page * batch) as i64)
66            ],
67            |x| { Self::get_notification_from_row(x) }
68        );
69
70        if res.is_err() {
71            return Err(Error::GeneralNotFound("notification".to_string()));
72        }
73
74        Ok(res.unwrap())
75    }
76
77    /// Get all notifications by `tag`.
78    pub async fn get_notifications_by_tag(&self, tag: &str) -> Result<Vec<Notification>> {
79        let conn = match self.0.connect().await {
80            Ok(c) => c,
81            Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
82        };
83
84        let res = query_rows!(
85            &conn,
86            "SELECT * FROM notifications WHERE tag = $1 ORDER BY created DESC",
87            &[&tag],
88            |x| { Self::get_notification_from_row(x) }
89        );
90
91        if res.is_err() {
92            return Err(Error::GeneralNotFound("notification".to_string()));
93        }
94
95        Ok(res.unwrap())
96    }
97
98    /// Create a new notification in the database.
99    ///
100    /// # Arguments
101    /// * `data` - a mock [`Notification`] object to insert
102    pub async fn create_notification(&self, data: Notification) -> Result<()> {
103        let conn = match self.0.connect().await {
104            Ok(c) => c,
105            Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
106        };
107
108        let res = execute!(
109            &conn,
110            "INSERT INTO notifications VALUES ($1, $2, $3, $4, $5, $6, $7)",
111            params![
112                &data.id.printable(),
113                &(data.created as i64),
114                &data.title,
115                &data.content,
116                &data.owner.printable(),
117                &{ if data.read { 1 } else { 0 } },
118                &data.tag
119            ]
120        );
121
122        if let Err(e) = res {
123            return Err(Error::DatabaseError(e.to_string()));
124        }
125
126        // incr notification count
127        self.incr_user_notifications(&data.owner).await?;
128
129        // post event
130        let mut con = self.0.1.get_con().await;
131
132        if let Err(e) = con.publish::<String, String, ()>(
133            format!("{}/notifs", data.owner),
134            serde_json::to_string(&SocketMessage {
135                method: SocketMethod::Packet(PacketType::Crud(CrudMessageType::Create)),
136                data: serde_json::to_string(&data).unwrap(),
137            })
138            .unwrap(),
139        ) {
140            return Err(Error::MiscError(e.to_string()));
141        }
142
143        // return
144        Ok(())
145    }
146
147    pub async fn delete_notification(&self, id: &crate::model::id::Id, user: &User) -> Result<()> {
148        let notification = self.get_notification_by_id(id).await?;
149
150        if user.id != notification.owner
151            && !user.permissions.check(FinePermission::ManageNotifications)
152        {
153            return Err(Error::NotAllowed);
154        }
155
156        let conn = match self.0.connect().await {
157            Ok(c) => c,
158            Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
159        };
160
161        let res = execute!(
162            &conn,
163            "DELETE FROM notifications WHERE id = $1",
164            &[&id.printable()]
165        );
166
167        if let Err(e) = res {
168            return Err(Error::DatabaseError(e.to_string()));
169        }
170
171        self.0.1.remove(format!("atto.notification:{}", id)).await;
172
173        // decr notification count
174        if !notification.read {
175            self.decr_user_notifications(&notification.owner)
176                .await
177                .unwrap();
178        }
179
180        // post event
181        let mut con = self.0.1.get_con().await;
182
183        if let Err(e) = con.publish::<String, String, ()>(
184            format!("{}/notifs", notification.owner),
185            serde_json::to_string(&SocketMessage {
186                method: SocketMethod::Packet(PacketType::Crud(CrudMessageType::Delete)),
187                data: notification.id.to_string(),
188            })
189            .unwrap(),
190        ) {
191            return Err(Error::MiscError(e.to_string()));
192        }
193
194        // return
195        Ok(())
196    }
197
198    pub async fn delete_all_notifications(&self, user: &User) -> Result<()> {
199        let conn = match self.0.connect().await {
200            Ok(c) => c,
201            Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
202        };
203
204        let res = execute!(
205            &conn,
206            "DELETE FROM notifications WHERE owner = $1",
207            &[&user.id.printable()]
208        );
209
210        if let Err(e) = res {
211            return Err(Error::DatabaseError(e.to_string()));
212        }
213
214        self.update_user_notification_count(&user.id, 0).await?;
215        Ok(())
216    }
217
218    pub async fn delete_all_notifications_by_tag(&self, user: &User, tag: &str) -> Result<()> {
219        let conn = match self.0.connect().await {
220            Ok(c) => c,
221            Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
222        };
223
224        let res = execute!(
225            &conn,
226            "DELETE FROM notifications WHERE owner = $1 AND tag = $2",
227            params![&user.id.printable(), tag]
228        );
229
230        if let Err(e) = res {
231            return Err(Error::DatabaseError(e.to_string()));
232        }
233
234        Ok(())
235    }
236
237    pub async fn update_notification_read(
238        &self,
239        id: &crate::model::id::Id,
240        new_read: bool,
241        user: &User,
242    ) -> Result<()> {
243        let y = self.get_notification_by_id(id).await?;
244
245        if y.owner != user.id && !user.permissions.check(FinePermission::ManageNotifications) {
246            return Err(Error::NotAllowed);
247        }
248
249        // ...
250        let conn = match self.0.connect().await {
251            Ok(c) => c,
252            Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
253        };
254
255        let res = execute!(
256            &conn,
257            "UPDATE notifications SET read = $1 WHERE id = $2",
258            params![&{ if new_read { 1 } else { 0 } }, &id.printable()]
259        );
260
261        if let Err(e) = res {
262            return Err(Error::DatabaseError(e.to_string()));
263        }
264
265        self.0.1.remove(format!("atto.notification:{}", id)).await;
266
267        if (y.read) && (!new_read) {
268            self.incr_user_notifications(&user.id).await?;
269        } else if (!y.read) && (new_read) {
270            self.decr_user_notifications(&user.id).await?;
271        }
272
273        Ok(())
274    }
275
276    pub async fn update_all_notifications_read(&self, user: &User, read: bool) -> Result<()> {
277        let notifications = self.get_notifications_by_owner(&user.id).await?;
278
279        let mut changed_count: i32 = 0;
280        for notification in notifications {
281            if notification.read == read {
282                // no need to update this
283                continue;
284            }
285
286            changed_count += 1;
287
288            self.0
289                .1
290                .remove(format!("atto.notification:{}", notification.id))
291                .await;
292        }
293
294        // execute
295        let conn = match self.0.connect().await {
296            Ok(c) => c,
297            Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
298        };
299
300        let res = execute!(
301            &conn,
302            "UPDATE notifications SET read = $1 WHERE owner = $2",
303            params![&{ if read { 1 } else { 0 } }, &user.id.printable()]
304        );
305
306        if let Err(e) = res {
307            return Err(Error::DatabaseError(e.to_string()));
308        }
309
310        // use changed_count to update user counts
311        if !read {
312            // we don't need to update when marking things as read since that should just be 0
313            self.update_user_notification_count(&user.id, changed_count)
314                .await?;
315        } else {
316            self.update_user_notification_count(&user.id, 0).await?;
317        }
318
319        // ...
320        Ok(())
321    }
322}