Skip to main content

rustio_admin/admin/
notifications.rs

1//! Per-operator notifications — a small `rustio_notifications`
2//! table + a public [`send`] helper for project code.
3//!
4//! Schema:
5//!
6//! ```sql
7//! CREATE TABLE rustio_notifications (
8//!     id         BIGSERIAL    PRIMARY KEY,
9//!     user_id    BIGINT       NOT NULL REFERENCES rustio_users(id) ON DELETE CASCADE,
10//!     message    TEXT         NOT NULL,
11//!     url        TEXT         NOT NULL DEFAULT '',
12//!     read_at    TIMESTAMPTZ,
13//!     created_at TIMESTAMPTZ  NOT NULL DEFAULT NOW()
14//! );
15//! ```
16//!
17//! Projects emit notifications via
18//! [`rustio_admin::send_notification`] — anywhere a `Db` is in
19//! scope. Every authenticated admin page renders a bell in the
20//! topbar with an unread-count badge; the operator clicks
21//! through to `/admin/notifications`, scans the list, and hits
22//! "Mark all read" to clear the badge.
23//!
24//! v1 scope: per-row dismissal is not yet shipped — the page
25//! offers a single "mark every notification for this operator
26//! as read" action. Operators with high notification volume can
27//! filter by date in a future iteration.
28
29use chrono::{DateTime, Utc};
30use sqlx::Row as _;
31
32use crate::error::Result;
33use crate::orm::Db;
34
35pub(crate) const CREATE_TABLE_SQL: &str = "CREATE TABLE IF NOT EXISTS rustio_notifications (
36    id         BIGSERIAL    PRIMARY KEY,
37    user_id    BIGINT       NOT NULL REFERENCES rustio_users(id) ON DELETE CASCADE,
38    message    TEXT         NOT NULL,
39    url        TEXT         NOT NULL DEFAULT '',
40    read_at    TIMESTAMPTZ,
41    created_at TIMESTAMPTZ  NOT NULL DEFAULT NOW()
42)";
43
44pub(crate) const CREATE_INDEX_SQL: &str =
45    "CREATE INDEX IF NOT EXISTS rustio_notifications_user_unread_idx \
46     ON rustio_notifications (user_id, read_at) WHERE read_at IS NULL";
47
48// public:
49/// Ensure the `rustio_notifications` table + its unread-lookup
50/// index exist. Idempotent.
51pub async fn ensure_table(db: &Db) -> Result<()> {
52    sqlx::query(CREATE_TABLE_SQL).execute(db.pool()).await?;
53    sqlx::query(CREATE_INDEX_SQL).execute(db.pool()).await?;
54    Ok(())
55}
56
57// public:
58/// One stored notification. Surfaced by [`list_for_user`] for
59/// the admin UI.
60#[derive(Debug, Clone)]
61pub struct Notification {
62    pub id: i64,
63    pub user_id: i64,
64    pub message: String,
65    /// Optional click-through URL — empty string when the
66    /// notification is informational only.
67    pub url: String,
68    /// `None` while the notification is unread; set to the
69    /// dismissal timestamp once the operator hits "mark all read".
70    pub read_at: Option<DateTime<Utc>>,
71    pub created_at: DateTime<Utc>,
72}
73
74// public:
75/// Persist one notification targeted at `user_id`. `url` may be
76/// empty when the message stands on its own (no click-through).
77/// Returns the new row's id on success.
78///
79/// Project code calls this anywhere a `Db` is in scope — request
80/// handlers, background jobs, periodic tasks. The framework's
81/// own audit pipeline does not emit notifications today; this
82/// is a project-facing surface.
83pub async fn send(db: &Db, user_id: i64, message: &str, url: &str) -> Result<i64> {
84    ensure_table(db).await?;
85    let id: i64 = sqlx::query_scalar(
86        "INSERT INTO rustio_notifications (user_id, message, url) \
87         VALUES ($1, $2, $3) RETURNING id",
88    )
89    .bind(user_id)
90    .bind(message)
91    .bind(url)
92    .fetch_one(db.pool())
93    .await?;
94    Ok(id)
95}
96
97/// Unread count for `user_id`. Page handlers fetch this once
98/// per render and pin it on `BaseContext` via
99/// `with_unread_count`; the topbar badge in `_topbar.html`
100/// branches on the result. Failure-soft — returns `0` on any
101/// DB hiccup so the topbar stays mute rather than 500ing.
102pub(crate) async fn unread_count(db: &Db, user_id: i64) -> i64 {
103    let _ = ensure_table(db).await;
104    sqlx::query_scalar(
105        "SELECT COUNT(*) FROM rustio_notifications \
106         WHERE user_id = $1 AND read_at IS NULL",
107    )
108    .bind(user_id)
109    .fetch_one(db.pool())
110    .await
111    .unwrap_or(0)
112}
113
114/// List every notification for `user_id`, newest first. Empties
115/// to an empty vec on error so the page renders.
116pub(crate) async fn list_for_user(db: &Db, user_id: i64) -> Vec<Notification> {
117    let _ = ensure_table(db).await;
118    let rows = sqlx::query(
119        "SELECT id, user_id, message, url, read_at, created_at \
120         FROM rustio_notifications \
121         WHERE user_id = $1 \
122         ORDER BY created_at DESC LIMIT 200",
123    )
124    .bind(user_id)
125    .fetch_all(db.pool())
126    .await
127    .unwrap_or_default();
128    rows.iter()
129        .map(|r| Notification {
130            id: r.try_get("id").unwrap_or(0),
131            user_id: r.try_get("user_id").unwrap_or(0),
132            message: r.try_get("message").unwrap_or_default(),
133            url: r.try_get("url").unwrap_or_default(),
134            read_at: r.try_get("read_at").ok().flatten(),
135            created_at: r.try_get("created_at").unwrap_or_else(|_| Utc::now()),
136        })
137        .collect()
138}
139
140/// Mark every unread notification for `user_id` as read. Stamps
141/// `read_at = NOW()` on every affected row. Returns the count
142/// of rows updated.
143pub(crate) async fn mark_all_read(db: &Db, user_id: i64) -> i64 {
144    let _ = ensure_table(db).await;
145    let result = sqlx::query(
146        "UPDATE rustio_notifications \
147         SET read_at = NOW() \
148         WHERE user_id = $1 AND read_at IS NULL",
149    )
150    .bind(user_id)
151    .execute(db.pool())
152    .await;
153    match result {
154        Ok(r) => r.rows_affected() as i64,
155        Err(e) => {
156            log::warn!("notifications::mark_all_read({user_id}): {e}");
157            0
158        }
159    }
160}
161
162#[cfg(test)]
163mod tests {
164    use super::*;
165
166    #[test]
167    fn create_table_sql_is_idempotent_shape() {
168        // The SQL is a stable contract for the framework's boot
169        // path — anything that breaks `IF NOT EXISTS` re-runs is
170        // a regression.
171        assert!(CREATE_TABLE_SQL.contains("IF NOT EXISTS"));
172        assert!(CREATE_INDEX_SQL.contains("IF NOT EXISTS"));
173    }
174}