Skip to main content

postcrate_core/db/
emails.rs

1//! Email row storage + FTS5 sync.
2
3use serde::Serialize;
4use sqlx::{Row, SqlitePool};
5use uuid::Uuid;
6
7use crate::db::attachments::AttachmentInsert;
8use crate::error::{Error, Result};
9
10#[derive(Debug, Clone, Serialize)]
11#[cfg_attr(feature = "specta", derive(specta::Type))]
12#[serde(rename_all = "camelCase")]
13pub struct EmailSummary {
14    pub id: String,
15    pub mailbox_id: String,
16    pub received_at: i64,
17    pub from: String,
18    pub to: Vec<String>,
19    pub subject: Option<String>,
20    pub has_html: bool,
21    pub has_text: bool,
22    pub size_bytes: i64,
23    pub read: bool,
24    pub pinned: bool,
25    pub starred: bool,
26    pub tag: Option<String>,
27}
28
29#[derive(Debug, Clone, Serialize)]
30#[cfg_attr(feature = "specta", derive(specta::Type))]
31#[serde(rename_all = "camelCase")]
32pub struct AttachmentMeta {
33    pub id: String,
34    pub filename: Option<String>,
35    pub content_type: Option<String>,
36    pub content_id: Option<String>,
37    pub size_bytes: i64,
38}
39
40#[derive(Debug, Clone, Serialize)]
41#[cfg_attr(feature = "specta", derive(specta::Type))]
42#[serde(rename_all = "camelCase")]
43pub struct EmailDetail {
44    pub id: String,
45    pub mailbox_id: String,
46    pub received_at: i64,
47    pub from: String,
48    pub to: Vec<String>,
49    pub subject: Option<String>,
50    pub has_html: bool,
51    pub has_text: bool,
52    pub size_bytes: i64,
53    pub read: bool,
54    pub pinned: bool,
55    pub starred: bool,
56    pub note: Option<String>,
57    pub tag: Option<String>,
58    pub headers: serde_json::Value,
59    pub text_body: Option<String>,
60    pub html_body: Option<String>,
61    pub attachments: Vec<AttachmentMeta>,
62    pub message_id: Option<String>,
63    pub in_reply_to: Option<String>,
64    pub ext_smtputf8: bool,
65    pub ext_8bitmime: bool,
66}
67
68#[derive(Debug, Clone)]
69pub(crate) struct EmailInsert {
70    pub mailbox_id: String,
71    pub received_at: i64,
72    pub smtp_from: String,
73    pub smtp_to: Vec<String>,
74    pub header_from: Option<String>,
75    pub header_to: Option<String>,
76    pub header_cc: Option<String>,
77    pub header_subject: Option<String>,
78    pub message_id: Option<String>,
79    pub in_reply_to: Option<String>,
80    pub size_bytes: i64,
81    pub has_html: bool,
82    pub has_text: bool,
83    pub raw_path: String,
84    pub parsed_json: serde_json::Value,
85    pub ext_smtputf8: bool,
86    pub ext_8bitmime: bool,
87    pub attachments: Vec<AttachmentInsert>,
88    /// For FTS: searchable body — text part if present, else html stripped.
89    pub fts_body: String,
90    /// Auto-detected category. `None` skips classification.
91    pub tag: Option<String>,
92}
93
94#[derive(Debug, Clone)]
95pub(crate) struct InsertOutcome {
96    pub id: String,
97    pub summary: EmailSummary,
98}
99
100/// Insert an email + its attachments + FTS row in one transaction.
101pub(crate) async fn insert(pool: &SqlitePool, email: EmailInsert) -> Result<InsertOutcome> {
102    let id = Uuid::new_v4().to_string();
103    let smtp_to_json = serde_json::to_string(&email.smtp_to)?;
104    let parsed_json_str = serde_json::to_string(&email.parsed_json)?;
105    let fts_recipients = email.smtp_to.join(" ");
106    let fts_subject = email.header_subject.clone().unwrap_or_default();
107    let fts_sender = email.smtp_from.clone();
108
109    let mut tx = pool.begin().await?;
110
111    sqlx::query(
112        r"INSERT INTO emails (
113            id, mailbox_id, received_at, smtp_from, smtp_to_json,
114            header_from, header_to, header_cc, header_subject,
115            message_id, in_reply_to,
116            size_bytes, has_html, has_text, raw_path, parsed_json,
117            read_flag, ext_smtputf8, ext_8bitmime, tag
118        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 0, ?, ?, ?)",
119    )
120    .bind(&id)
121    .bind(&email.mailbox_id)
122    .bind(email.received_at)
123    .bind(&email.smtp_from)
124    .bind(&smtp_to_json)
125    .bind(&email.header_from)
126    .bind(&email.header_to)
127    .bind(&email.header_cc)
128    .bind(&email.header_subject)
129    .bind(&email.message_id)
130    .bind(&email.in_reply_to)
131    .bind(email.size_bytes)
132    .bind(i64::from(email.has_html))
133    .bind(i64::from(email.has_text))
134    .bind(&email.raw_path)
135    .bind(&parsed_json_str)
136    .bind(i64::from(email.ext_smtputf8))
137    .bind(i64::from(email.ext_8bitmime))
138    .bind(&email.tag)
139    .execute(&mut *tx)
140    .await?;
141
142    for att in &email.attachments {
143        sqlx::query(
144            r"INSERT INTO attachments
145                (id, email_id, filename, content_type, content_id, size_bytes, blob_path)
146              VALUES (?, ?, ?, ?, ?, ?, ?)",
147        )
148        .bind(&att.id)
149        .bind(&id)
150        .bind(&att.filename)
151        .bind(&att.content_type)
152        .bind(&att.content_id)
153        .bind(att.size_bytes)
154        .bind(&att.blob_path)
155        .execute(&mut *tx)
156        .await?;
157    }
158
159    // FTS sync — emails_fts stores `email_id` as an UNINDEXED column
160    // (see migration 0004) so DELETE + SEARCH can both find rows by
161    // the email's UUID without a Rust-side rowid hash.
162    sqlx::query(
163        r"INSERT INTO emails_fts(subject, sender, recipients, body, email_id)
164          VALUES (?, ?, ?, ?, ?)",
165    )
166    .bind(&fts_subject)
167    .bind(&fts_sender)
168    .bind(&fts_recipients)
169    .bind(&email.fts_body)
170    .bind(&id)
171    .execute(&mut *tx)
172    .await?;
173
174    tx.commit().await?;
175
176    let summary = EmailSummary {
177        id: id.clone(),
178        mailbox_id: email.mailbox_id.clone(),
179        received_at: email.received_at,
180        from: email.smtp_from.clone(),
181        to: email.smtp_to.clone(),
182        subject: email.header_subject.clone(),
183        has_html: email.has_html,
184        has_text: email.has_text,
185        size_bytes: email.size_bytes,
186        read: false,
187        pinned: false,
188        starred: false,
189        tag: email.tag.clone(),
190    };
191
192    Ok(InsertOutcome { id, summary })
193}
194
195/// Most-recent `limit` summaries across *all* mailboxes, newest first.
196/// Used by `Service::wait_for_email` when the caller didn't specify a
197/// mailbox filter.
198pub(crate) async fn list_recent_across(
199    pool: &SqlitePool,
200    limit: u32,
201) -> Result<Vec<EmailSummary>> {
202    let rows = sqlx::query(
203        r"SELECT id, mailbox_id, received_at, smtp_from, smtp_to_json,
204                 header_subject, has_html, has_text, size_bytes, read_flag,
205                 pinned, starred, tag
206          FROM emails
207          ORDER BY received_at DESC
208          LIMIT ?",
209    )
210    .bind(i64::from(limit))
211    .fetch_all(pool)
212    .await?;
213    let mut out = Vec::with_capacity(rows.len());
214    for row in rows {
215        out.push(row_to_summary(&row)?);
216    }
217    Ok(out)
218}
219
220pub(crate) async fn list(
221    pool: &SqlitePool,
222    mailbox_id: &str,
223    limit: u32,
224    offset: u32,
225) -> Result<Vec<EmailSummary>> {
226    // Pinned emails sort first regardless of received_at.
227    let rows = sqlx::query(
228        r"SELECT id, mailbox_id, received_at, smtp_from, smtp_to_json,
229                 header_subject, has_html, has_text, size_bytes, read_flag,
230                 pinned, starred, tag
231          FROM emails
232          WHERE mailbox_id = ?
233          ORDER BY pinned DESC, received_at DESC
234          LIMIT ? OFFSET ?",
235    )
236    .bind(mailbox_id)
237    .bind(i64::from(limit))
238    .bind(i64::from(offset))
239    .fetch_all(pool)
240    .await?;
241
242    let mut out = Vec::with_capacity(rows.len());
243    for row in rows {
244        out.push(row_to_summary(&row)?);
245    }
246    Ok(out)
247}
248
249pub(crate) async fn get_detail(pool: &SqlitePool, id: &str) -> Result<EmailDetail> {
250    let row = sqlx::query(
251        r"SELECT id, mailbox_id, received_at, smtp_from, smtp_to_json,
252                 header_subject, message_id, in_reply_to,
253                 has_html, has_text, size_bytes, parsed_json, read_flag,
254                 ext_smtputf8, ext_8bitmime, pinned, starred, note, tag
255          FROM emails WHERE id = ?",
256    )
257    .bind(id)
258    .fetch_optional(pool)
259    .await?
260    .ok_or_else(|| Error::EmailNotFound(id.to_string()))?;
261
262    let parsed_json_str: String = row.try_get("parsed_json").unwrap_or_default();
263    let parsed: serde_json::Value =
264        serde_json::from_str(&parsed_json_str).unwrap_or(serde_json::Value::Null);
265
266    let attachments = crate::db::attachments::list_for_email(pool, id).await?;
267
268    let smtp_to_json: String = row.try_get("smtp_to_json").unwrap_or_default();
269    let to: Vec<String> = serde_json::from_str(&smtp_to_json).unwrap_or_default();
270
271    let headers = parsed.get("headers").cloned().unwrap_or(serde_json::Value::Null);
272    let text_body = parsed
273        .get("text_body")
274        .and_then(|v| v.as_str())
275        .map(|s| s.to_string());
276    let html_body = parsed
277        .get("html_body")
278        .and_then(|v| v.as_str())
279        .map(|s| s.to_string());
280
281    Ok(EmailDetail {
282        id: row.try_get("id").unwrap_or_default(),
283        mailbox_id: row.try_get("mailbox_id").unwrap_or_default(),
284        received_at: row.try_get("received_at").unwrap_or(0),
285        from: row.try_get("smtp_from").unwrap_or_default(),
286        to,
287        subject: row.try_get("header_subject").ok(),
288        has_html: row.try_get::<i64, _>("has_html").unwrap_or(0) != 0,
289        has_text: row.try_get::<i64, _>("has_text").unwrap_or(0) != 0,
290        size_bytes: row.try_get("size_bytes").unwrap_or(0),
291        read: row.try_get::<i64, _>("read_flag").unwrap_or(0) != 0,
292        headers,
293        text_body,
294        html_body,
295        attachments,
296        message_id: row.try_get("message_id").ok(),
297        in_reply_to: row.try_get("in_reply_to").ok(),
298        ext_smtputf8: row.try_get::<i64, _>("ext_smtputf8").unwrap_or(0) != 0,
299        ext_8bitmime: row.try_get::<i64, _>("ext_8bitmime").unwrap_or(0) != 0,
300        pinned: row.try_get::<i64, _>("pinned").unwrap_or(0) != 0,
301        starred: row.try_get::<i64, _>("starred").unwrap_or(0) != 0,
302        note: row
303            .try_get::<Option<String>, _>("note")
304            .ok()
305            .flatten(),
306        tag: row
307            .try_get::<Option<String>, _>("tag")
308            .ok()
309            .flatten(),
310    })
311}
312
313pub(crate) async fn get_raw_path(pool: &SqlitePool, id: &str) -> Result<String> {
314    let row = sqlx::query("SELECT raw_path FROM emails WHERE id = ?")
315        .bind(id)
316        .fetch_optional(pool)
317        .await?
318        .ok_or_else(|| Error::EmailNotFound(id.to_string()))?;
319    Ok(row.try_get::<String, _>("raw_path").unwrap_or_default())
320}
321
322pub(crate) async fn delete(pool: &SqlitePool, id: &str) -> Result<String> {
323    let mut tx = pool.begin().await?;
324    let raw_path: Option<String> = sqlx::query("SELECT raw_path FROM emails WHERE id = ?")
325        .bind(id)
326        .fetch_optional(&mut *tx)
327        .await?
328        .and_then(|r| r.try_get("raw_path").ok());
329    let raw_path = raw_path.ok_or_else(|| Error::EmailNotFound(id.to_string()))?;
330
331    sqlx::query("DELETE FROM emails WHERE id = ?")
332        .bind(id)
333        .execute(&mut *tx)
334        .await?;
335    sqlx::query("DELETE FROM emails_fts WHERE email_id = ?")
336        .bind(id)
337        .execute(&mut *tx)
338        .await?;
339    tx.commit().await?;
340    Ok(raw_path)
341}
342
343/// Clear a mailbox. Returns (deleted_count, raw_paths_to_delete).
344///
345/// When `preserve_pinned` is true (the default in the Service layer),
346/// rows with `pinned = 1` survive — mandates that pin acts
347/// as a "keep this across inbox clears" marker.
348pub(crate) async fn clear_mailbox(
349    pool: &SqlitePool,
350    mailbox_id: &str,
351    preserve_pinned: bool,
352) -> Result<(u64, Vec<String>)> {
353    let mut tx = pool.begin().await?;
354    let select_sql = if preserve_pinned {
355        "SELECT id, raw_path FROM emails WHERE mailbox_id = ? AND pinned = 0"
356    } else {
357        "SELECT id, raw_path FROM emails WHERE mailbox_id = ?"
358    };
359    let rows = sqlx::query(select_sql)
360        .bind(mailbox_id)
361        .fetch_all(&mut *tx)
362        .await?;
363    let mut paths = Vec::with_capacity(rows.len());
364    for r in &rows {
365        let id: String = r.try_get("id").unwrap_or_default();
366        let path: String = r.try_get("raw_path").unwrap_or_default();
367        sqlx::query("DELETE FROM emails_fts WHERE email_id = ?")
368            .bind(&id)
369            .execute(&mut *tx)
370            .await?;
371        paths.push(path);
372    }
373    let delete_sql = if preserve_pinned {
374        "DELETE FROM emails WHERE mailbox_id = ? AND pinned = 0"
375    } else {
376        "DELETE FROM emails WHERE mailbox_id = ?"
377    };
378    let res = sqlx::query(delete_sql)
379        .bind(mailbox_id)
380        .execute(&mut *tx)
381        .await?;
382    tx.commit().await?;
383    Ok((res.rows_affected(), paths))
384}
385
386pub(crate) async fn mark_read(pool: &SqlitePool, id: &str, read: bool) -> Result<()> {
387    let res = sqlx::query("UPDATE emails SET read_flag = ? WHERE id = ?")
388        .bind(i64::from(read))
389        .bind(id)
390        .execute(pool)
391        .await?;
392    if res.rows_affected() == 0 {
393        return Err(Error::EmailNotFound(id.to_string()));
394    }
395    Ok(())
396}
397
398pub(crate) async fn search(
399    pool: &SqlitePool,
400    q: &str,
401    mailbox_id: Option<&str>,
402    limit: u32,
403) -> Result<Vec<EmailSummary>> {
404    let cleaned = sanitize_fts(q);
405    if cleaned.is_empty() {
406        return Ok(Vec::new());
407    }
408    let fts_query = build_fts_query(&cleaned);
409    if fts_query.is_empty() {
410        return Ok(Vec::new());
411    }
412
413    let rows = if let Some(mb) = mailbox_id {
414        sqlx::query(
415            r"SELECT e.id, e.mailbox_id, e.received_at, e.smtp_from, e.smtp_to_json,
416                     e.header_subject, e.has_html, e.has_text, e.size_bytes, e.read_flag
417              FROM emails_fts f
418              JOIN emails e ON e.id = f.email_id
419              WHERE emails_fts MATCH ?1 AND e.mailbox_id = ?2
420              ORDER BY e.received_at DESC
421              LIMIT ?3",
422        )
423        .bind(&fts_query)
424        .bind(mb)
425        .bind(i64::from(limit))
426        .fetch_all(pool)
427        .await?
428    } else {
429        sqlx::query(
430            r"SELECT e.id, e.mailbox_id, e.received_at, e.smtp_from, e.smtp_to_json,
431                     e.header_subject, e.has_html, e.has_text, e.size_bytes, e.read_flag
432              FROM emails_fts f
433              JOIN emails e ON e.id = f.email_id
434              WHERE emails_fts MATCH ?1
435              ORDER BY e.received_at DESC
436              LIMIT ?2",
437        )
438        .bind(&fts_query)
439        .bind(i64::from(limit))
440        .fetch_all(pool)
441        .await?
442    };
443
444    let mut out = Vec::with_capacity(rows.len());
445    for row in rows {
446        out.push(row_to_summary(&row)?);
447    }
448    Ok(out)
449}
450
451/// IDs of emails older than `cutoff_ms` (used by retention).
452pub(crate) async fn list_older_than(
453    pool: &SqlitePool,
454    cutoff_ms: i64,
455) -> Result<Vec<(String, String, String)>> {
456    let rows = sqlx::query(
457        r"SELECT id, mailbox_id, raw_path FROM emails WHERE received_at < ?",
458    )
459    .bind(cutoff_ms)
460    .fetch_all(pool)
461    .await?;
462    Ok(rows
463        .into_iter()
464        .map(|r| {
465            (
466                r.try_get("id").unwrap_or_default(),
467                r.try_get("mailbox_id").unwrap_or_default(),
468                r.try_get("raw_path").unwrap_or_default(),
469            )
470        })
471        .collect())
472}
473
474/// Trim a mailbox down to `keep_max` newest rows; return ids/paths to remove.
475///
476/// Pinned rows are never trimmed — they don't count toward `keep_max`
477/// and they're excluded from the candidate-for-deletion set. This
478/// (pin survives retention).
479pub(crate) async fn trim_mailbox(
480    pool: &SqlitePool,
481    mailbox_id: &str,
482    keep_max: i64,
483) -> Result<Vec<(String, String)>> {
484    let rows = sqlx::query(
485        r"SELECT id, raw_path FROM emails
486          WHERE mailbox_id = ?
487            AND pinned = 0
488            AND id NOT IN (
489                SELECT id FROM emails
490                WHERE mailbox_id = ?
491                  AND pinned = 0
492                ORDER BY received_at DESC
493                LIMIT ?
494            )",
495    )
496    .bind(mailbox_id)
497    .bind(mailbox_id)
498    .bind(keep_max)
499    .fetch_all(pool)
500    .await?;
501
502    Ok(rows
503        .into_iter()
504        .map(|r| {
505            (
506                r.try_get("id").unwrap_or_default(),
507                r.try_get("raw_path").unwrap_or_default(),
508            )
509        })
510        .collect())
511}
512
513pub(crate) async fn delete_by_ids(pool: &SqlitePool, ids: &[String]) -> Result<()> {
514    if ids.is_empty() {
515        return Ok(());
516    }
517    let mut tx = pool.begin().await?;
518    for id in ids {
519        sqlx::query("DELETE FROM emails WHERE id = ?")
520            .bind(id)
521            .execute(&mut *tx)
522            .await?;
523        sqlx::query("DELETE FROM emails_fts WHERE email_id = ?")
524            .bind(id)
525            .execute(&mut *tx)
526            .await?;
527    }
528    tx.commit().await?;
529    Ok(())
530}
531
532pub(crate) async fn list_all_raw_paths(pool: &SqlitePool) -> Result<Vec<String>> {
533    let rows = sqlx::query("SELECT raw_path FROM emails").fetch_all(pool).await?;
534    Ok(rows
535        .into_iter()
536        .filter_map(|r| r.try_get("raw_path").ok())
537        .collect())
538}
539
540fn row_to_summary(row: &sqlx::sqlite::SqliteRow) -> Result<EmailSummary> {
541    let smtp_to_json: String = row.try_get("smtp_to_json").unwrap_or_default();
542    let to: Vec<String> = serde_json::from_str(&smtp_to_json).unwrap_or_default();
543    Ok(EmailSummary {
544        id: row.try_get("id").unwrap_or_default(),
545        mailbox_id: row.try_get("mailbox_id").unwrap_or_default(),
546        received_at: row.try_get("received_at").unwrap_or(0),
547        from: row.try_get("smtp_from").unwrap_or_default(),
548        to,
549        subject: row.try_get("header_subject").ok(),
550        has_html: row.try_get::<i64, _>("has_html").unwrap_or(0) != 0,
551        has_text: row.try_get::<i64, _>("has_text").unwrap_or(0) != 0,
552        size_bytes: row.try_get("size_bytes").unwrap_or(0),
553        read: row.try_get::<i64, _>("read_flag").unwrap_or(0) != 0,
554        pinned: row.try_get::<i64, _>("pinned").unwrap_or(0) != 0,
555        starred: row.try_get::<i64, _>("starred").unwrap_or(0) != 0,
556        tag: row
557            .try_get::<Option<String>, _>("tag")
558            .ok()
559            .flatten(),
560    })
561}
562
563pub(crate) async fn set_pinned(pool: &SqlitePool, id: &str, pinned: bool) -> Result<()> {
564    let res = sqlx::query("UPDATE emails SET pinned = ? WHERE id = ?")
565        .bind(i64::from(pinned))
566        .bind(id)
567        .execute(pool)
568        .await?;
569    if res.rows_affected() == 0 {
570        return Err(Error::EmailNotFound(id.to_string()));
571    }
572    Ok(())
573}
574
575pub(crate) async fn set_starred(pool: &SqlitePool, id: &str, starred: bool) -> Result<()> {
576    let res = sqlx::query("UPDATE emails SET starred = ? WHERE id = ?")
577        .bind(i64::from(starred))
578        .bind(id)
579        .execute(pool)
580        .await?;
581    if res.rows_affected() == 0 {
582        return Err(Error::EmailNotFound(id.to_string()));
583    }
584    Ok(())
585}
586
587pub(crate) async fn set_note(pool: &SqlitePool, id: &str, note: Option<&str>) -> Result<()> {
588    let res = sqlx::query("UPDATE emails SET note = ? WHERE id = ?")
589        .bind(note)
590        .bind(id)
591        .execute(pool)
592        .await?;
593    if res.rows_affected() == 0 {
594        return Err(Error::EmailNotFound(id.to_string()));
595    }
596    Ok(())
597}
598
599pub(crate) async fn set_tag(pool: &SqlitePool, id: &str, tag: Option<&str>) -> Result<()> {
600    let res = sqlx::query("UPDATE emails SET tag = ? WHERE id = ?")
601        .bind(tag)
602        .bind(id)
603        .execute(pool)
604        .await?;
605    if res.rows_affected() == 0 {
606        return Err(Error::EmailNotFound(id.to_string()));
607    }
608    Ok(())
609}
610
611/// Drop anything that could be interpreted as FTS5 query syntax. We
612/// keep `.` `@` `_` because they appear in addresses and identifiers;
613/// the tokenizer treats them as separators anyway. We deliberately
614/// drop `-` because in an FTS5 expression an unquoted `-` is the NOT
615/// operator (`foo -bar` ≡ "foo AND NOT bar"), which surprises users
616/// who type hyphenated terms like `password-reset`.
617fn sanitize_fts(q: &str) -> String {
618    q.chars()
619        .filter(|c| c.is_alphanumeric() || c.is_whitespace() || matches!(*c, '.' | '@' | '_'))
620        .collect::<String>()
621        .trim()
622        .to_string()
623}
624
625/// Turn the sanitized query into an FTS5 MATCH expression. Each token
626/// becomes a prefix term (`foo*`) so partial words match — `"alic"`
627/// finds emails containing "alice". Tokens combine with AND (the FTS5
628/// default), so multi-word queries narrow the result set.
629fn build_fts_query(cleaned: &str) -> String {
630    cleaned
631        .split_whitespace()
632        .filter(|t| !t.is_empty())
633        .map(|t| format!("{t}*"))
634        .collect::<Vec<_>>()
635        .join(" ")
636}