1use serde::Serialize;
4use sqlx::{Row, SqlitePool};
5use uuid::Uuid;
6
7use crate::db::attachments::AttachmentInsert;
8use crate::error::{Error, Result};
9
/// Compact, list-oriented view of one stored email.
///
/// Produced by `row_to_summary` from a row of the `emails` table and by
/// `insert` for a freshly stored message. Serialized with camelCase keys.
#[derive(Debug, Clone, Serialize)]
#[cfg_attr(feature = "specta", derive(specta::Type))]
#[serde(rename_all = "camelCase")]
pub struct EmailSummary {
    /// Primary key of the `emails` row (a UUID v4 string when created by `insert`).
    pub id: String,
    /// Identifier of the mailbox the email is stored under.
    pub mailbox_id: String,
    /// Receive timestamp. Presumably epoch milliseconds, since `list_older_than`
    /// compares the column against a `cutoff_ms` value — TODO confirm.
    pub received_at: i64,
    /// Value of the `smtp_from` column.
    pub from: String,
    /// Recipient list decoded from the JSON array stored in `smtp_to_json`.
    pub to: Vec<String>,
    /// Value of the `header_subject` column, `None` when it cannot be read as text.
    pub subject: Option<String>,
    /// `has_html` column interpreted as a boolean (non-zero means true).
    pub has_html: bool,
    /// `has_text` column interpreted as a boolean (non-zero means true).
    pub has_text: bool,
    /// Value of the `size_bytes` column.
    pub size_bytes: i64,
    /// `read_flag` column interpreted as a boolean (non-zero means true).
    pub read: bool,
    /// `pinned` column interpreted as a boolean (non-zero means true).
    pub pinned: bool,
    /// `starred` column interpreted as a boolean (non-zero means true).
    pub starred: bool,
    /// Optional value of the `tag` column.
    pub tag: Option<String>,
}
28
/// Metadata for a single attachment, exposed through `EmailDetail::attachments`.
///
/// Serialized with camelCase keys. The field names match columns written to the
/// `attachments` table by `insert`; how the values are loaded is defined in
/// `crate::db::attachments`, which is not visible here.
#[derive(Debug, Clone, Serialize)]
#[cfg_attr(feature = "specta", derive(specta::Type))]
#[serde(rename_all = "camelCase")]
pub struct AttachmentMeta {
    /// Identifier of the attachment.
    pub id: String,
    /// File name, when one is known.
    pub filename: Option<String>,
    /// Content type, when one is known.
    pub content_type: Option<String>,
    /// Content id, when one is known. Presumably the MIME `Content-ID` used for
    /// inline references — TODO confirm against the parser.
    pub content_id: Option<String>,
    /// Size of the attachment in bytes.
    pub size_bytes: i64,
}
39
/// Full view of one stored email, as assembled by `get_detail`.
///
/// Combines the columns of the `emails` row, selected keys of the stored
/// `parsed_json` document and the email's attachment metadata. Serialized with
/// camelCase keys.
#[derive(Debug, Clone, Serialize)]
#[cfg_attr(feature = "specta", derive(specta::Type))]
#[serde(rename_all = "camelCase")]
pub struct EmailDetail {
    /// Primary key of the `emails` row.
    pub id: String,
    /// Identifier of the mailbox the email is stored under.
    pub mailbox_id: String,
    /// Receive timestamp. Presumably epoch milliseconds, since `list_older_than`
    /// compares the column against a `cutoff_ms` value — TODO confirm.
    pub received_at: i64,
    /// Value of the `smtp_from` column.
    pub from: String,
    /// Recipient list decoded from the JSON array stored in `smtp_to_json`.
    pub to: Vec<String>,
    /// Value of the `header_subject` column, `None` when it cannot be read as text.
    pub subject: Option<String>,
    /// `has_html` column interpreted as a boolean (non-zero means true).
    pub has_html: bool,
    /// `has_text` column interpreted as a boolean (non-zero means true).
    pub has_text: bool,
    /// Value of the `size_bytes` column.
    pub size_bytes: i64,
    /// `read_flag` column interpreted as a boolean (non-zero means true).
    pub read: bool,
    /// `pinned` column interpreted as a boolean (non-zero means true).
    pub pinned: bool,
    /// `starred` column interpreted as a boolean (non-zero means true).
    pub starred: bool,
    /// Optional value of the `note` column.
    pub note: Option<String>,
    /// Optional value of the `tag` column.
    pub tag: Option<String>,
    /// The `headers` entry of the stored `parsed_json` document, or JSON `null`
    /// when the entry is missing or the document cannot be parsed.
    pub headers: serde_json::Value,
    /// The `text_body` entry of `parsed_json`, when present and a string.
    pub text_body: Option<String>,
    /// The `html_body` entry of `parsed_json`, when present and a string.
    pub html_body: Option<String>,
    /// Attachment metadata loaded via `crate::db::attachments::list_for_email`.
    pub attachments: Vec<AttachmentMeta>,
    /// Value of the `message_id` column, `None` when it cannot be read as text.
    pub message_id: Option<String>,
    /// Value of the `in_reply_to` column, `None` when it cannot be read as text.
    pub in_reply_to: Option<String>,
    /// `ext_smtputf8` column interpreted as a boolean (non-zero means true).
    pub ext_smtputf8: bool,
    /// `ext_8bitmime` column interpreted as a boolean (non-zero means true).
    pub ext_8bitmime: bool,
}
67
/// Everything `insert` needs to persist one email.
///
/// Each field is written to the column of the same name in `emails` unless
/// noted otherwise.
#[derive(Debug, Clone)]
pub(crate) struct EmailInsert {
    /// Identifier of the mailbox the email is stored under.
    pub mailbox_id: String,
    /// Receive timestamp. Presumably epoch milliseconds, since `list_older_than`
    /// compares the column against a `cutoff_ms` value — TODO confirm.
    pub received_at: i64,
    /// Sender address; also indexed as the `sender` column of `emails_fts`.
    pub smtp_from: String,
    /// Recipients; stored as a JSON array in `smtp_to_json` and, joined with
    /// spaces, as the `recipients` column of `emails_fts`.
    pub smtp_to: Vec<String>,
    /// Optional `header_from` value.
    pub header_from: Option<String>,
    /// Optional `header_to` value.
    pub header_to: Option<String>,
    /// Optional `header_cc` value.
    pub header_cc: Option<String>,
    /// Optional subject; also indexed as the `subject` column of `emails_fts`
    /// (empty string when absent).
    pub header_subject: Option<String>,
    /// Optional `message_id` value.
    pub message_id: Option<String>,
    /// Optional `in_reply_to` value.
    pub in_reply_to: Option<String>,
    /// Size of the message in bytes.
    pub size_bytes: i64,
    /// Stored as 0/1 in `has_html`.
    pub has_html: bool,
    /// Stored as 0/1 in `has_text`.
    pub has_text: bool,
    /// Stored in `raw_path`; the same value is handed back by `get_raw_path`,
    /// `delete` and the bulk-removal helpers.
    pub raw_path: String,
    /// Serialized to a JSON string and stored in `parsed_json`. `get_detail`
    /// reads the `headers`, `text_body` and `html_body` keys back from it.
    pub parsed_json: serde_json::Value,
    /// Stored as 0/1 in `ext_smtputf8`.
    pub ext_smtputf8: bool,
    /// Stored as 0/1 in `ext_8bitmime`.
    pub ext_8bitmime: bool,
    /// Attachment rows written to the `attachments` table alongside the email.
    pub attachments: Vec<AttachmentInsert>,
    /// Text stored in the `body` column of `emails_fts`.
    pub fts_body: String,
    /// Optional `tag` value.
    pub tag: Option<String>,
}
93
/// Result of a successful `insert`.
#[derive(Debug, Clone)]
pub(crate) struct InsertOutcome {
    /// Identifier generated for the new `emails` row.
    pub id: String,
    /// Summary describing the row that was just written.
    pub summary: EmailSummary,
}
99
100pub(crate) async fn insert(pool: &SqlitePool, email: EmailInsert) -> Result<InsertOutcome> {
102 let id = Uuid::new_v4().to_string();
103 let smtp_to_json = serde_json::to_string(&email.smtp_to)?;
104 let parsed_json_str = serde_json::to_string(&email.parsed_json)?;
105 let fts_recipients = email.smtp_to.join(" ");
106 let fts_subject = email.header_subject.clone().unwrap_or_default();
107 let fts_sender = email.smtp_from.clone();
108
109 let mut tx = pool.begin().await?;
110
111 sqlx::query(
112 r"INSERT INTO emails (
113 id, mailbox_id, received_at, smtp_from, smtp_to_json,
114 header_from, header_to, header_cc, header_subject,
115 message_id, in_reply_to,
116 size_bytes, has_html, has_text, raw_path, parsed_json,
117 read_flag, ext_smtputf8, ext_8bitmime, tag
118 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 0, ?, ?, ?)",
119 )
120 .bind(&id)
121 .bind(&email.mailbox_id)
122 .bind(email.received_at)
123 .bind(&email.smtp_from)
124 .bind(&smtp_to_json)
125 .bind(&email.header_from)
126 .bind(&email.header_to)
127 .bind(&email.header_cc)
128 .bind(&email.header_subject)
129 .bind(&email.message_id)
130 .bind(&email.in_reply_to)
131 .bind(email.size_bytes)
132 .bind(i64::from(email.has_html))
133 .bind(i64::from(email.has_text))
134 .bind(&email.raw_path)
135 .bind(&parsed_json_str)
136 .bind(i64::from(email.ext_smtputf8))
137 .bind(i64::from(email.ext_8bitmime))
138 .bind(&email.tag)
139 .execute(&mut *tx)
140 .await?;
141
142 for att in &email.attachments {
143 sqlx::query(
144 r"INSERT INTO attachments
145 (id, email_id, filename, content_type, content_id, size_bytes, blob_path)
146 VALUES (?, ?, ?, ?, ?, ?, ?)",
147 )
148 .bind(&att.id)
149 .bind(&id)
150 .bind(&att.filename)
151 .bind(&att.content_type)
152 .bind(&att.content_id)
153 .bind(att.size_bytes)
154 .bind(&att.blob_path)
155 .execute(&mut *tx)
156 .await?;
157 }
158
159 sqlx::query(
163 r"INSERT INTO emails_fts(subject, sender, recipients, body, email_id)
164 VALUES (?, ?, ?, ?, ?)",
165 )
166 .bind(&fts_subject)
167 .bind(&fts_sender)
168 .bind(&fts_recipients)
169 .bind(&email.fts_body)
170 .bind(&id)
171 .execute(&mut *tx)
172 .await?;
173
174 tx.commit().await?;
175
176 let summary = EmailSummary {
177 id: id.clone(),
178 mailbox_id: email.mailbox_id.clone(),
179 received_at: email.received_at,
180 from: email.smtp_from.clone(),
181 to: email.smtp_to.clone(),
182 subject: email.header_subject.clone(),
183 has_html: email.has_html,
184 has_text: email.has_text,
185 size_bytes: email.size_bytes,
186 read: false,
187 pinned: false,
188 starred: false,
189 tag: email.tag.clone(),
190 };
191
192 Ok(InsertOutcome { id, summary })
193}
194
195pub(crate) async fn list_recent_across(
199 pool: &SqlitePool,
200 limit: u32,
201) -> Result<Vec<EmailSummary>> {
202 let rows = sqlx::query(
203 r"SELECT id, mailbox_id, received_at, smtp_from, smtp_to_json,
204 header_subject, has_html, has_text, size_bytes, read_flag,
205 pinned, starred, tag
206 FROM emails
207 ORDER BY received_at DESC
208 LIMIT ?",
209 )
210 .bind(i64::from(limit))
211 .fetch_all(pool)
212 .await?;
213 let mut out = Vec::with_capacity(rows.len());
214 for row in rows {
215 out.push(row_to_summary(&row)?);
216 }
217 Ok(out)
218}
219
220pub(crate) async fn list(
221 pool: &SqlitePool,
222 mailbox_id: &str,
223 limit: u32,
224 offset: u32,
225) -> Result<Vec<EmailSummary>> {
226 let rows = sqlx::query(
228 r"SELECT id, mailbox_id, received_at, smtp_from, smtp_to_json,
229 header_subject, has_html, has_text, size_bytes, read_flag,
230 pinned, starred, tag
231 FROM emails
232 WHERE mailbox_id = ?
233 ORDER BY pinned DESC, received_at DESC
234 LIMIT ? OFFSET ?",
235 )
236 .bind(mailbox_id)
237 .bind(i64::from(limit))
238 .bind(i64::from(offset))
239 .fetch_all(pool)
240 .await?;
241
242 let mut out = Vec::with_capacity(rows.len());
243 for row in rows {
244 out.push(row_to_summary(&row)?);
245 }
246 Ok(out)
247}
248
249pub(crate) async fn get_detail(pool: &SqlitePool, id: &str) -> Result<EmailDetail> {
250 let row = sqlx::query(
251 r"SELECT id, mailbox_id, received_at, smtp_from, smtp_to_json,
252 header_subject, message_id, in_reply_to,
253 has_html, has_text, size_bytes, parsed_json, read_flag,
254 ext_smtputf8, ext_8bitmime, pinned, starred, note, tag
255 FROM emails WHERE id = ?",
256 )
257 .bind(id)
258 .fetch_optional(pool)
259 .await?
260 .ok_or_else(|| Error::EmailNotFound(id.to_string()))?;
261
262 let parsed_json_str: String = row.try_get("parsed_json").unwrap_or_default();
263 let parsed: serde_json::Value =
264 serde_json::from_str(&parsed_json_str).unwrap_or(serde_json::Value::Null);
265
266 let attachments = crate::db::attachments::list_for_email(pool, id).await?;
267
268 let smtp_to_json: String = row.try_get("smtp_to_json").unwrap_or_default();
269 let to: Vec<String> = serde_json::from_str(&smtp_to_json).unwrap_or_default();
270
271 let headers = parsed.get("headers").cloned().unwrap_or(serde_json::Value::Null);
272 let text_body = parsed
273 .get("text_body")
274 .and_then(|v| v.as_str())
275 .map(|s| s.to_string());
276 let html_body = parsed
277 .get("html_body")
278 .and_then(|v| v.as_str())
279 .map(|s| s.to_string());
280
281 Ok(EmailDetail {
282 id: row.try_get("id").unwrap_or_default(),
283 mailbox_id: row.try_get("mailbox_id").unwrap_or_default(),
284 received_at: row.try_get("received_at").unwrap_or(0),
285 from: row.try_get("smtp_from").unwrap_or_default(),
286 to,
287 subject: row.try_get("header_subject").ok(),
288 has_html: row.try_get::<i64, _>("has_html").unwrap_or(0) != 0,
289 has_text: row.try_get::<i64, _>("has_text").unwrap_or(0) != 0,
290 size_bytes: row.try_get("size_bytes").unwrap_or(0),
291 read: row.try_get::<i64, _>("read_flag").unwrap_or(0) != 0,
292 headers,
293 text_body,
294 html_body,
295 attachments,
296 message_id: row.try_get("message_id").ok(),
297 in_reply_to: row.try_get("in_reply_to").ok(),
298 ext_smtputf8: row.try_get::<i64, _>("ext_smtputf8").unwrap_or(0) != 0,
299 ext_8bitmime: row.try_get::<i64, _>("ext_8bitmime").unwrap_or(0) != 0,
300 pinned: row.try_get::<i64, _>("pinned").unwrap_or(0) != 0,
301 starred: row.try_get::<i64, _>("starred").unwrap_or(0) != 0,
302 note: row
303 .try_get::<Option<String>, _>("note")
304 .ok()
305 .flatten(),
306 tag: row
307 .try_get::<Option<String>, _>("tag")
308 .ok()
309 .flatten(),
310 })
311}
312
313pub(crate) async fn get_raw_path(pool: &SqlitePool, id: &str) -> Result<String> {
314 let row = sqlx::query("SELECT raw_path FROM emails WHERE id = ?")
315 .bind(id)
316 .fetch_optional(pool)
317 .await?
318 .ok_or_else(|| Error::EmailNotFound(id.to_string()))?;
319 Ok(row.try_get::<String, _>("raw_path").unwrap_or_default())
320}
321
322pub(crate) async fn delete(pool: &SqlitePool, id: &str) -> Result<String> {
323 let mut tx = pool.begin().await?;
324 let raw_path: Option<String> = sqlx::query("SELECT raw_path FROM emails WHERE id = ?")
325 .bind(id)
326 .fetch_optional(&mut *tx)
327 .await?
328 .and_then(|r| r.try_get("raw_path").ok());
329 let raw_path = raw_path.ok_or_else(|| Error::EmailNotFound(id.to_string()))?;
330
331 sqlx::query("DELETE FROM emails WHERE id = ?")
332 .bind(id)
333 .execute(&mut *tx)
334 .await?;
335 sqlx::query("DELETE FROM emails_fts WHERE email_id = ?")
336 .bind(id)
337 .execute(&mut *tx)
338 .await?;
339 tx.commit().await?;
340 Ok(raw_path)
341}
342
343pub(crate) async fn clear_mailbox(
349 pool: &SqlitePool,
350 mailbox_id: &str,
351 preserve_pinned: bool,
352) -> Result<(u64, Vec<String>)> {
353 let mut tx = pool.begin().await?;
354 let select_sql = if preserve_pinned {
355 "SELECT id, raw_path FROM emails WHERE mailbox_id = ? AND pinned = 0"
356 } else {
357 "SELECT id, raw_path FROM emails WHERE mailbox_id = ?"
358 };
359 let rows = sqlx::query(select_sql)
360 .bind(mailbox_id)
361 .fetch_all(&mut *tx)
362 .await?;
363 let mut paths = Vec::with_capacity(rows.len());
364 for r in &rows {
365 let id: String = r.try_get("id").unwrap_or_default();
366 let path: String = r.try_get("raw_path").unwrap_or_default();
367 sqlx::query("DELETE FROM emails_fts WHERE email_id = ?")
368 .bind(&id)
369 .execute(&mut *tx)
370 .await?;
371 paths.push(path);
372 }
373 let delete_sql = if preserve_pinned {
374 "DELETE FROM emails WHERE mailbox_id = ? AND pinned = 0"
375 } else {
376 "DELETE FROM emails WHERE mailbox_id = ?"
377 };
378 let res = sqlx::query(delete_sql)
379 .bind(mailbox_id)
380 .execute(&mut *tx)
381 .await?;
382 tx.commit().await?;
383 Ok((res.rows_affected(), paths))
384}
385
386pub(crate) async fn mark_read(pool: &SqlitePool, id: &str, read: bool) -> Result<()> {
387 let res = sqlx::query("UPDATE emails SET read_flag = ? WHERE id = ?")
388 .bind(i64::from(read))
389 .bind(id)
390 .execute(pool)
391 .await?;
392 if res.rows_affected() == 0 {
393 return Err(Error::EmailNotFound(id.to_string()));
394 }
395 Ok(())
396}
397
398pub(crate) async fn search(
399 pool: &SqlitePool,
400 q: &str,
401 mailbox_id: Option<&str>,
402 limit: u32,
403) -> Result<Vec<EmailSummary>> {
404 let cleaned = sanitize_fts(q);
405 if cleaned.is_empty() {
406 return Ok(Vec::new());
407 }
408 let fts_query = build_fts_query(&cleaned);
409 if fts_query.is_empty() {
410 return Ok(Vec::new());
411 }
412
413 let rows = if let Some(mb) = mailbox_id {
414 sqlx::query(
415 r"SELECT e.id, e.mailbox_id, e.received_at, e.smtp_from, e.smtp_to_json,
416 e.header_subject, e.has_html, e.has_text, e.size_bytes, e.read_flag
417 FROM emails_fts f
418 JOIN emails e ON e.id = f.email_id
419 WHERE emails_fts MATCH ?1 AND e.mailbox_id = ?2
420 ORDER BY e.received_at DESC
421 LIMIT ?3",
422 )
423 .bind(&fts_query)
424 .bind(mb)
425 .bind(i64::from(limit))
426 .fetch_all(pool)
427 .await?
428 } else {
429 sqlx::query(
430 r"SELECT e.id, e.mailbox_id, e.received_at, e.smtp_from, e.smtp_to_json,
431 e.header_subject, e.has_html, e.has_text, e.size_bytes, e.read_flag
432 FROM emails_fts f
433 JOIN emails e ON e.id = f.email_id
434 WHERE emails_fts MATCH ?1
435 ORDER BY e.received_at DESC
436 LIMIT ?2",
437 )
438 .bind(&fts_query)
439 .bind(i64::from(limit))
440 .fetch_all(pool)
441 .await?
442 };
443
444 let mut out = Vec::with_capacity(rows.len());
445 for row in rows {
446 out.push(row_to_summary(&row)?);
447 }
448 Ok(out)
449}
450
451pub(crate) async fn list_older_than(
453 pool: &SqlitePool,
454 cutoff_ms: i64,
455) -> Result<Vec<(String, String, String)>> {
456 let rows = sqlx::query(
457 r"SELECT id, mailbox_id, raw_path FROM emails WHERE received_at < ?",
458 )
459 .bind(cutoff_ms)
460 .fetch_all(pool)
461 .await?;
462 Ok(rows
463 .into_iter()
464 .map(|r| {
465 (
466 r.try_get("id").unwrap_or_default(),
467 r.try_get("mailbox_id").unwrap_or_default(),
468 r.try_get("raw_path").unwrap_or_default(),
469 )
470 })
471 .collect())
472}
473
474pub(crate) async fn trim_mailbox(
480 pool: &SqlitePool,
481 mailbox_id: &str,
482 keep_max: i64,
483) -> Result<Vec<(String, String)>> {
484 let rows = sqlx::query(
485 r"SELECT id, raw_path FROM emails
486 WHERE mailbox_id = ?
487 AND pinned = 0
488 AND id NOT IN (
489 SELECT id FROM emails
490 WHERE mailbox_id = ?
491 AND pinned = 0
492 ORDER BY received_at DESC
493 LIMIT ?
494 )",
495 )
496 .bind(mailbox_id)
497 .bind(mailbox_id)
498 .bind(keep_max)
499 .fetch_all(pool)
500 .await?;
501
502 Ok(rows
503 .into_iter()
504 .map(|r| {
505 (
506 r.try_get("id").unwrap_or_default(),
507 r.try_get("raw_path").unwrap_or_default(),
508 )
509 })
510 .collect())
511}
512
513pub(crate) async fn delete_by_ids(pool: &SqlitePool, ids: &[String]) -> Result<()> {
514 if ids.is_empty() {
515 return Ok(());
516 }
517 let mut tx = pool.begin().await?;
518 for id in ids {
519 sqlx::query("DELETE FROM emails WHERE id = ?")
520 .bind(id)
521 .execute(&mut *tx)
522 .await?;
523 sqlx::query("DELETE FROM emails_fts WHERE email_id = ?")
524 .bind(id)
525 .execute(&mut *tx)
526 .await?;
527 }
528 tx.commit().await?;
529 Ok(())
530}
531
532pub(crate) async fn list_all_raw_paths(pool: &SqlitePool) -> Result<Vec<String>> {
533 let rows = sqlx::query("SELECT raw_path FROM emails").fetch_all(pool).await?;
534 Ok(rows
535 .into_iter()
536 .filter_map(|r| r.try_get("raw_path").ok())
537 .collect())
538}
539
540fn row_to_summary(row: &sqlx::sqlite::SqliteRow) -> Result<EmailSummary> {
541 let smtp_to_json: String = row.try_get("smtp_to_json").unwrap_or_default();
542 let to: Vec<String> = serde_json::from_str(&smtp_to_json).unwrap_or_default();
543 Ok(EmailSummary {
544 id: row.try_get("id").unwrap_or_default(),
545 mailbox_id: row.try_get("mailbox_id").unwrap_or_default(),
546 received_at: row.try_get("received_at").unwrap_or(0),
547 from: row.try_get("smtp_from").unwrap_or_default(),
548 to,
549 subject: row.try_get("header_subject").ok(),
550 has_html: row.try_get::<i64, _>("has_html").unwrap_or(0) != 0,
551 has_text: row.try_get::<i64, _>("has_text").unwrap_or(0) != 0,
552 size_bytes: row.try_get("size_bytes").unwrap_or(0),
553 read: row.try_get::<i64, _>("read_flag").unwrap_or(0) != 0,
554 pinned: row.try_get::<i64, _>("pinned").unwrap_or(0) != 0,
555 starred: row.try_get::<i64, _>("starred").unwrap_or(0) != 0,
556 tag: row
557 .try_get::<Option<String>, _>("tag")
558 .ok()
559 .flatten(),
560 })
561}
562
563pub(crate) async fn set_pinned(pool: &SqlitePool, id: &str, pinned: bool) -> Result<()> {
564 let res = sqlx::query("UPDATE emails SET pinned = ? WHERE id = ?")
565 .bind(i64::from(pinned))
566 .bind(id)
567 .execute(pool)
568 .await?;
569 if res.rows_affected() == 0 {
570 return Err(Error::EmailNotFound(id.to_string()));
571 }
572 Ok(())
573}
574
575pub(crate) async fn set_starred(pool: &SqlitePool, id: &str, starred: bool) -> Result<()> {
576 let res = sqlx::query("UPDATE emails SET starred = ? WHERE id = ?")
577 .bind(i64::from(starred))
578 .bind(id)
579 .execute(pool)
580 .await?;
581 if res.rows_affected() == 0 {
582 return Err(Error::EmailNotFound(id.to_string()));
583 }
584 Ok(())
585}
586
587pub(crate) async fn set_note(pool: &SqlitePool, id: &str, note: Option<&str>) -> Result<()> {
588 let res = sqlx::query("UPDATE emails SET note = ? WHERE id = ?")
589 .bind(note)
590 .bind(id)
591 .execute(pool)
592 .await?;
593 if res.rows_affected() == 0 {
594 return Err(Error::EmailNotFound(id.to_string()));
595 }
596 Ok(())
597}
598
599pub(crate) async fn set_tag(pool: &SqlitePool, id: &str, tag: Option<&str>) -> Result<()> {
600 let res = sqlx::query("UPDATE emails SET tag = ? WHERE id = ?")
601 .bind(tag)
602 .bind(id)
603 .execute(pool)
604 .await?;
605 if res.rows_affected() == 0 {
606 return Err(Error::EmailNotFound(id.to_string()));
607 }
608 Ok(())
609}
610
/// Reduces a user-supplied search string to characters considered safe for the
/// full-text query.
///
/// Keeps alphanumeric characters, whitespace, `.`, `@` and `_`; drops
/// everything else, then trims leading and trailing whitespace.
fn sanitize_fts(q: &str) -> String {
    let mut kept = String::with_capacity(q.len());
    for ch in q.chars() {
        let allowed = ch.is_alphanumeric()
            || ch.is_whitespace()
            || ch == '.'
            || ch == '@'
            || ch == '_';
        if allowed {
            kept.push(ch);
        }
    }
    kept.trim().to_string()
}
624
/// Turns a sanitized search string into a full-text MATCH expression in which
/// every whitespace-separated term is a prefix query.
///
/// Each term is emitted as a double-quoted string followed by `*`, for example
/// `alice@example.com` becomes `"alice@example.com"*`. Quoting matters because
/// `sanitize_fts` lets `.` and `@` through, which are not valid in unquoted
/// FTS5 terms, and because a bare `AND`, `OR` or `NOT` would be read as an
/// operator instead of a word. (Assumes `emails_fts` is an FTS5 table —
/// confirm against the schema.)
///
/// Terms without any alphanumeric character are dropped, since they carry
/// nothing to search for. Returns an empty string when no term remains.
fn build_fts_query(cleaned: &str) -> String {
    cleaned
        .split_whitespace()
        .filter(|term| term.chars().any(char::is_alphanumeric))
        // A literal double quote inside a quoted string is written as two
        // quotes. `sanitize_fts` already strips them; this keeps the function
        // safe when called with other input.
        .map(|term| format!("\"{}\"*", term.replace('"', "\"\"")))
        .collect::<Vec<_>>()
        .join(" ")
}