Skip to main content

postcrate_core/db/
mailboxes.rs

1//! Mailbox row storage.
2//!
3//! Note: this module is `pub` because [`crate::Service`] re-exports the
4//! `Mailbox`, `CreateMailboxInput`, etc. types — they're part of the
5//! library's wire surface.
6
7use chrono::Utc;
8use serde::{Deserialize, Serialize};
9use sqlx::{Row, SqlitePool};
10use uuid::Uuid;
11
12use crate::error::{Error, Result};
13use crate::mailbox::kinds::MailboxKind;
14
15/// A mailbox row joined with its current message count.
16#[derive(Debug, Clone, Serialize, Deserialize)]
17#[cfg_attr(feature = "specta", derive(specta::Type))]
18#[serde(rename_all = "camelCase")]
19pub struct Mailbox {
20    pub id: String,
21    pub project_id: String,
22    pub name: String,
23    pub port: u16,
24    pub kind: MailboxKind,
25    pub ttl_seconds: Option<u64>,
26    pub expires_at: Option<i64>,
27    pub failed: bool,
28    pub fail_reason: Option<String>,
29    pub created_at: i64,
30    pub count: i64,
31    /// When true, the listener for this mailbox wraps every accepted
32    /// socket in rustls *before* the SMTP banner (RFC 8314 §3.3,
33    /// port-465 style). Requires `--features tls`.
34    pub implicit_tls: bool,
35    /// User-stopped (not failed): the mailbox exists but its listener
36    /// is intentionally not bound. Survives app restarts so Stop sticks.
37    pub paused: bool,
38}
39
40#[derive(Debug, Clone, Deserialize)]
41#[cfg_attr(feature = "specta", derive(specta::Type))]
42#[serde(rename_all = "camelCase")]
43pub struct CreateMailboxInput {
44    pub project_id: String,
45    pub name: String,
46    pub kind: MailboxKind,
47    pub port: Option<u16>,
48    pub ttl_seconds: Option<u64>,
49    #[serde(default)]
50    pub implicit_tls: bool,
51}
52
53#[derive(Debug, Clone, Default, Deserialize)]
54#[cfg_attr(feature = "specta", derive(specta::Type))]
55#[serde(rename_all = "camelCase")]
56pub struct UpdateMailboxInput {
57    pub name: Option<String>,
58    pub port: Option<u16>,
59    pub ttl_seconds: Option<Option<u64>>, // None means leave alone; Some(None) means clear
60}
61
62#[derive(Debug, Clone, Deserialize)]
63#[cfg_attr(feature = "specta", derive(specta::Type))]
64#[serde(rename_all = "camelCase")]
65pub struct CreateEphemeralInput {
66    pub project_id: String,
67    pub name: Option<String>,
68    pub ttl_seconds: u64,
69}
70
71#[derive(Debug, Clone, Serialize)]
72#[cfg_attr(feature = "specta", derive(specta::Type))]
73#[serde(rename_all = "camelCase")]
74pub struct EphemeralHandle {
75    pub id: String,
76    pub host: String,
77    pub port: u16,
78    pub expires_at: i64,
79}
80
81#[derive(Debug, Clone)]
82pub(crate) struct MailboxRow {
83    pub id: String,
84    pub project_id: String,
85    pub name: String,
86    pub port: u16,
87    pub kind: MailboxKind,
88    pub ttl_seconds: Option<u64>,
89    pub expires_at: Option<i64>,
90    pub failed: bool,
91    pub fail_reason: Option<String>,
92    pub created_at: i64,
93    pub implicit_tls: bool,
94    pub paused: bool,
95}
96
97impl MailboxRow {
98    pub(crate) fn with_count(self, count: i64) -> Mailbox {
99        Mailbox {
100            id: self.id,
101            project_id: self.project_id,
102            name: self.name,
103            port: self.port,
104            kind: self.kind,
105            ttl_seconds: self.ttl_seconds,
106            expires_at: self.expires_at,
107            failed: self.failed,
108            fail_reason: self.fail_reason,
109            created_at: self.created_at,
110            implicit_tls: self.implicit_tls,
111            paused: self.paused,
112            count,
113        }
114    }
115}
116
117pub(crate) async fn insert(
118    pool: &SqlitePool,
119    project_id: &str,
120    name: &str,
121    port: u16,
122    kind: MailboxKind,
123    ttl_seconds: Option<u64>,
124    implicit_tls: bool,
125) -> Result<MailboxRow> {
126    let now = Utc::now().timestamp_millis();
127    let expires_at = ttl_seconds.map(|t| now + (t as i64) * 1000);
128    let id = Uuid::new_v4().to_string();
129
130    let res = sqlx::query(
131        r"INSERT INTO mailboxes
132            (id, project_id, name, port, kind, ttl_seconds, expires_at, created_at, implicit_tls)
133          VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
134    )
135    .bind(&id)
136    .bind(project_id)
137    .bind(name)
138    .bind(i64::from(port))
139    .bind(kind.as_str())
140    .bind(ttl_seconds.map(|t| t as i64))
141    .bind(expires_at)
142    .bind(now)
143    .bind(i64::from(implicit_tls))
144    .execute(pool)
145    .await;
146
147    match res {
148        Ok(_) => Ok(MailboxRow {
149            id,
150            project_id: project_id.to_string(),
151            name: name.to_string(),
152            port,
153            kind,
154            ttl_seconds,
155            expires_at,
156            failed: false,
157            fail_reason: None,
158            created_at: now,
159            implicit_tls,
160            paused: false,
161        }),
162        Err(sqlx::Error::Database(e)) if e.is_unique_violation() => {
163            // Either name collision in project, or port collision globally.
164            let msg = e.message().to_lowercase();
165            if msg.contains("mailboxes.port") {
166                Err(Error::PortInUse(port))
167            } else {
168                Err(Error::DuplicateMailbox(name.to_string()))
169            }
170        }
171        Err(e) => Err(e.into()),
172    }
173}
174
175pub(crate) async fn get(pool: &SqlitePool, id: &str) -> Result<MailboxRow> {
176    let row = sqlx::query(
177        r"SELECT id, project_id, name, port, kind, ttl_seconds, expires_at,
178                 failed, fail_reason, created_at, implicit_tls, paused
179          FROM mailboxes WHERE id = ?",
180    )
181    .bind(id)
182    .fetch_optional(pool)
183    .await?
184    .ok_or_else(|| Error::MailboxNotFound(id.to_string()))?;
185
186    Ok(row_to_mailbox_row(&row))
187}
188
189pub(crate) async fn list(pool: &SqlitePool, project_id: Option<&str>) -> Result<Vec<Mailbox>> {
190    let sql = if project_id.is_some() {
191        r"SELECT m.id, m.project_id, m.name, m.port, m.kind, m.ttl_seconds, m.expires_at,
192                 m.failed, m.fail_reason, m.created_at, m.implicit_tls, m.paused,
193                 COALESCE(c.cnt, 0) AS cnt
194          FROM mailboxes m
195          LEFT JOIN (SELECT mailbox_id, COUNT(*) AS cnt FROM emails GROUP BY mailbox_id) c
196                 ON c.mailbox_id = m.id
197          WHERE m.project_id = ?
198          ORDER BY m.created_at ASC"
199    } else {
200        r"SELECT m.id, m.project_id, m.name, m.port, m.kind, m.ttl_seconds, m.expires_at,
201                 m.failed, m.fail_reason, m.created_at, m.implicit_tls, m.paused,
202                 COALESCE(c.cnt, 0) AS cnt
203          FROM mailboxes m
204          LEFT JOIN (SELECT mailbox_id, COUNT(*) AS cnt FROM emails GROUP BY mailbox_id) c
205                 ON c.mailbox_id = m.id
206          ORDER BY m.created_at ASC"
207    };
208
209    let mut q = sqlx::query(sql);
210    if let Some(p) = project_id {
211        q = q.bind(p);
212    }
213    let rows = q.fetch_all(pool).await?;
214
215    Ok(rows
216        .into_iter()
217        .map(|row| {
218            let mb = row_to_mailbox_row(&row);
219            let count: i64 = row.try_get("cnt").unwrap_or(0);
220            mb.with_count(count)
221        })
222        .collect())
223}
224
225pub(crate) async fn count_emails(pool: &SqlitePool, mailbox_id: &str) -> Result<i64> {
226    let row = sqlx::query("SELECT COUNT(*) AS c FROM emails WHERE mailbox_id = ?")
227        .bind(mailbox_id)
228        .fetch_one(pool)
229        .await?;
230    Ok(row.try_get::<i64, _>("c").unwrap_or(0))
231}
232
233pub(crate) async fn update(
234    pool: &SqlitePool,
235    id: &str,
236    patch: &UpdateMailboxInput,
237) -> Result<MailboxRow> {
238    let current = get(pool, id).await?;
239
240    let new_name = patch.name.clone().unwrap_or(current.name.clone());
241    let new_port = patch.port.unwrap_or(current.port);
242    let new_ttl = match patch.ttl_seconds {
243        None => current.ttl_seconds,
244        Some(v) => v,
245    };
246
247    let res = sqlx::query(
248        r"UPDATE mailboxes
249            SET name = ?, port = ?, ttl_seconds = ?
250          WHERE id = ?",
251    )
252    .bind(&new_name)
253    .bind(i64::from(new_port))
254    .bind(new_ttl.map(|t| t as i64))
255    .bind(id)
256    .execute(pool)
257    .await;
258
259    match res {
260        Ok(_) => get(pool, id).await,
261        Err(sqlx::Error::Database(e)) if e.is_unique_violation() => {
262            let msg = e.message().to_lowercase();
263            if msg.contains("mailboxes.port") {
264                Err(Error::PortInUse(new_port))
265            } else {
266                Err(Error::DuplicateMailbox(new_name))
267            }
268        }
269        Err(e) => Err(e.into()),
270    }
271}
272
273pub(crate) async fn delete(pool: &SqlitePool, id: &str) -> Result<()> {
274    let res = sqlx::query("DELETE FROM mailboxes WHERE id = ?")
275        .bind(id)
276        .execute(pool)
277        .await?;
278    if res.rows_affected() == 0 {
279        return Err(Error::MailboxNotFound(id.to_string()));
280    }
281    Ok(())
282}
283
284pub(crate) async fn mark_failed(
285    pool: &SqlitePool,
286    id: &str,
287    reason: Option<&str>,
288) -> Result<()> {
289    sqlx::query("UPDATE mailboxes SET failed = 1, fail_reason = ? WHERE id = ?")
290        .bind(reason)
291        .bind(id)
292        .execute(pool)
293        .await?;
294    Ok(())
295}
296
297pub(crate) async fn clear_failed(pool: &SqlitePool, id: &str) -> Result<()> {
298    sqlx::query("UPDATE mailboxes SET failed = 0, fail_reason = NULL WHERE id = ?")
299        .bind(id)
300        .execute(pool)
301        .await?;
302    Ok(())
303}
304
305/// Set the paused flag. Returns Ok even if no row matched — the
306/// service-level layer turns "no row" into `MailboxNotFound` via its
307/// own get() call before the listener-state side effect runs.
308pub(crate) async fn set_paused(pool: &SqlitePool, id: &str, paused: bool) -> Result<()> {
309    sqlx::query("UPDATE mailboxes SET paused = ? WHERE id = ?")
310        .bind(i64::from(paused))
311        .bind(id)
312        .execute(pool)
313        .await?;
314    Ok(())
315}
316
317/// Drop expired ephemerals at boot. Returns IDs that were swept so the
318/// caller can also clean up any orphan raw blobs.
319pub(crate) async fn sweep_expired_ephemerals(pool: &SqlitePool) -> Result<Vec<String>> {
320    let now = Utc::now().timestamp_millis();
321    let rows = sqlx::query(
322        r"SELECT id FROM mailboxes
323           WHERE kind = 'ephemeral'
324             AND (expires_at IS NULL OR expires_at < ?)",
325    )
326    .bind(now)
327    .fetch_all(pool)
328    .await?;
329
330    let ids: Vec<String> = rows
331        .iter()
332        .filter_map(|r| r.try_get::<String, _>("id").ok())
333        .collect();
334
335    if !ids.is_empty() {
336        sqlx::query(
337            r"DELETE FROM mailboxes
338               WHERE kind = 'ephemeral'
339                 AND (expires_at IS NULL OR expires_at < ?)",
340        )
341        .bind(now)
342        .execute(pool)
343        .await?;
344    }
345    Ok(ids)
346}
347
348pub(crate) async fn list_all_ports(pool: &SqlitePool) -> Result<Vec<u16>> {
349    let rows = sqlx::query("SELECT port FROM mailboxes").fetch_all(pool).await?;
350    Ok(rows
351        .into_iter()
352        .filter_map(|r| r.try_get::<i64, _>("port").ok())
353        .map(|n| n as u16)
354        .collect())
355}
356
357pub(crate) async fn list_active_for_boot(pool: &SqlitePool) -> Result<Vec<MailboxRow>> {
358    let rows = sqlx::query(
359        r"SELECT id, project_id, name, port, kind, ttl_seconds, expires_at,
360                 failed, fail_reason, created_at, implicit_tls, paused
361          FROM mailboxes",
362    )
363    .fetch_all(pool)
364    .await?;
365    Ok(rows.iter().map(row_to_mailbox_row).collect())
366}
367
368pub(crate) async fn list_expiring(pool: &SqlitePool) -> Result<Vec<(String, i64)>> {
369    let rows = sqlx::query(
370        r"SELECT id, expires_at FROM mailboxes
371          WHERE kind = 'ephemeral' AND expires_at IS NOT NULL",
372    )
373    .fetch_all(pool)
374    .await?;
375    Ok(rows
376        .into_iter()
377        .filter_map(|r| {
378            let id: String = r.try_get("id").ok()?;
379            let exp: i64 = r.try_get("expires_at").ok()?;
380            Some((id, exp))
381        })
382        .collect())
383}
384
385fn row_to_mailbox_row(row: &sqlx::sqlite::SqliteRow) -> MailboxRow {
386    let kind_str: String = row.try_get("kind").unwrap_or_else(|_| "primary".into());
387    let kind = MailboxKind::from_str(&kind_str).unwrap_or(MailboxKind::Primary);
388    let ttl_seconds: Option<i64> = row.try_get("ttl_seconds").ok();
389    let port_i64: i64 = row.try_get("port").unwrap_or(0);
390    let failed_i: i64 = row.try_get("failed").unwrap_or(0);
391
392    MailboxRow {
393        id: row.try_get("id").unwrap_or_default(),
394        project_id: row.try_get("project_id").unwrap_or_default(),
395        name: row.try_get("name").unwrap_or_default(),
396        port: port_i64 as u16,
397        kind,
398        ttl_seconds: ttl_seconds.map(|t| t as u64),
399        expires_at: row.try_get("expires_at").ok(),
400        failed: failed_i != 0,
401        fail_reason: row.try_get("fail_reason").ok(),
402        created_at: row.try_get("created_at").unwrap_or(0),
403        implicit_tls: row.try_get::<i64, _>("implicit_tls").unwrap_or(0) != 0,
404        paused: row.try_get::<i64, _>("paused").unwrap_or(0) != 0,
405    }
406}