1use crate::traits::{MailboxStore, MessageStore, MetadataStore, StorageBackend};
5use crate::types::{
6 Mailbox, MailboxCounters, MailboxId, MailboxPath, MessageFlags, MessageMetadata, Quota,
7 SearchCriteria, SpecialUseAttributes,
8};
9use async_trait::async_trait;
10use rusmes_proto::{Mail, MailAddress, MessageId, Username};
11use sqlx::postgres::{PgPool, PgPoolOptions};
12use sqlx::{Executor, Row};
13use std::str::FromStr;
14use std::sync::Arc;
15use std::time::Duration;
16
/// Tunables for the PostgreSQL connection pool and message body storage.
#[derive(Debug, Clone)]
pub struct PostgresConfig {
    /// Upper bound handed to the pool builder's `max_connections`.
    pub max_connections: u32,
    /// Lower bound handed to the pool builder's `min_connections`.
    pub min_connections: u32,
    /// Used as the pool's acquire timeout.
    pub connect_timeout: Duration,
    /// Idle timeout applied to the pool when `Some`.
    pub idle_timeout: Option<Duration>,
    /// Maximum connection lifetime applied to the pool when `Some`.
    pub max_lifetime: Option<Duration>,
    /// Messages whose size is below this value keep their body inline in the
    /// `messages` row; larger ones go to `message_blobs`.
    pub inline_threshold: usize,
}
33
34impl Default for PostgresConfig {
35 fn default() -> Self {
36 Self {
37 max_connections: 20,
38 min_connections: 5,
39 connect_timeout: Duration::from_secs(30),
40 idle_timeout: Some(Duration::from_secs(600)),
41 max_lifetime: Some(Duration::from_secs(1800)),
42 inline_threshold: 100 * 1024, }
44 }
45}
46
47pub struct PostgresBackend {
49 pool: PgPool,
50 config: PostgresConfig,
51}
52
53impl PostgresBackend {
54 pub async fn new(database_url: &str) -> anyhow::Result<Self> {
56 Self::with_config(database_url, PostgresConfig::default()).await
57 }
58
59 pub async fn with_config(database_url: &str, config: PostgresConfig) -> anyhow::Result<Self> {
61 let mut pool_options = PgPoolOptions::new()
62 .max_connections(config.max_connections)
63 .min_connections(config.min_connections)
64 .acquire_timeout(config.connect_timeout);
65
66 if let Some(idle_timeout) = config.idle_timeout {
67 pool_options = pool_options.idle_timeout(idle_timeout);
68 }
69
70 if let Some(max_lifetime) = config.max_lifetime {
71 pool_options = pool_options.max_lifetime(max_lifetime);
72 }
73
74 let pool = pool_options.connect(database_url).await?;
75
76 Ok(Self { pool, config })
77 }
78
79 pub async fn init_schema(&self) -> anyhow::Result<()> {
81 tracing::info!("Initializing PostgreSQL schema");
82
83 self.pool
85 .execute(
86 r#"
87 CREATE TABLE IF NOT EXISTS schema_migrations (
88 version INTEGER PRIMARY KEY,
89 applied_at TIMESTAMP NOT NULL DEFAULT NOW()
90 )
91 "#,
92 )
93 .await?;
94
95 self.apply_migration_1_mailboxes().await?;
97 self.apply_migration_2_messages().await?;
98 self.apply_migration_3_message_blobs().await?;
99 self.apply_migration_4_indexes().await?;
100 self.apply_migration_5_fulltext().await?;
101
102 tracing::info!("PostgreSQL schema initialization complete");
103 Ok(())
104 }
105
106 async fn apply_migration_1_mailboxes(&self) -> anyhow::Result<()> {
107 let version = 1;
108 if self.is_migration_applied(version).await? {
109 return Ok(());
110 }
111
112 tracing::info!("Applying migration {}: mailboxes and quotas", version);
113
114 self.pool
116 .execute(
117 r#"
118 CREATE TABLE IF NOT EXISTS mailboxes (
119 id UUID PRIMARY KEY,
120 username TEXT NOT NULL,
121 path TEXT NOT NULL,
122 uid_validity INTEGER NOT NULL,
123 uid_next INTEGER NOT NULL,
124 special_use TEXT,
125 created_at TIMESTAMP NOT NULL DEFAULT NOW(),
126 updated_at TIMESTAMP NOT NULL DEFAULT NOW(),
127 UNIQUE(username, path)
128 )
129 "#,
130 )
131 .await?;
132
133 self.pool
135 .execute(
136 r#"
137 CREATE TABLE IF NOT EXISTS subscriptions (
138 username TEXT NOT NULL,
139 mailbox_name TEXT NOT NULL,
140 created_at TIMESTAMP NOT NULL DEFAULT NOW(),
141 PRIMARY KEY(username, mailbox_name)
142 )
143 "#,
144 )
145 .await?;
146
147 self.pool
149 .execute(
150 r#"
151 CREATE TABLE IF NOT EXISTS user_quotas (
152 username TEXT PRIMARY KEY,
153 used BIGINT NOT NULL DEFAULT 0,
154 quota_limit BIGINT NOT NULL,
155 updated_at TIMESTAMP NOT NULL DEFAULT NOW()
156 )
157 "#,
158 )
159 .await?;
160
161 self.mark_migration_applied(version).await?;
162 Ok(())
163 }
164
165 async fn apply_migration_2_messages(&self) -> anyhow::Result<()> {
166 let version = 2;
167 if self.is_migration_applied(version).await? {
168 return Ok(());
169 }
170
171 tracing::info!("Applying migration {}: messages and flags", version);
172
173 self.pool
175 .execute(
176 r#"
177 CREATE TABLE IF NOT EXISTS messages (
178 id UUID PRIMARY KEY,
179 mailbox_id UUID NOT NULL REFERENCES mailboxes(id) ON DELETE CASCADE,
180 uid INTEGER NOT NULL,
181 sender TEXT,
182 recipients TEXT[] NOT NULL,
183 subject TEXT,
184 headers JSONB NOT NULL,
185 body_inline BYTEA,
186 body_external_ref UUID,
187 size INTEGER NOT NULL,
188 search_vector TSVECTOR,
189 created_at TIMESTAMP NOT NULL DEFAULT NOW(),
190 UNIQUE(mailbox_id, uid)
191 )
192 "#,
193 )
194 .await?;
195
196 self.pool
198 .execute(
199 r#"
200 CREATE TABLE IF NOT EXISTS message_flags (
201 message_id UUID NOT NULL REFERENCES messages(id) ON DELETE CASCADE,
202 flag_seen BOOLEAN NOT NULL DEFAULT FALSE,
203 flag_answered BOOLEAN NOT NULL DEFAULT FALSE,
204 flag_flagged BOOLEAN NOT NULL DEFAULT FALSE,
205 flag_deleted BOOLEAN NOT NULL DEFAULT FALSE,
206 flag_draft BOOLEAN NOT NULL DEFAULT FALSE,
207 flag_recent BOOLEAN NOT NULL DEFAULT FALSE,
208 custom_flags TEXT[] NOT NULL DEFAULT '{}',
209 PRIMARY KEY(message_id)
210 )
211 "#,
212 )
213 .await?;
214
215 self.mark_migration_applied(version).await?;
216 Ok(())
217 }
218
219 async fn apply_migration_3_message_blobs(&self) -> anyhow::Result<()> {
220 let version = 3;
221 if self.is_migration_applied(version).await? {
222 return Ok(());
223 }
224
225 tracing::info!("Applying migration {}: message blobs", version);
226
227 self.pool
229 .execute(
230 r#"
231 CREATE TABLE IF NOT EXISTS message_blobs (
232 id UUID PRIMARY KEY,
233 data BYTEA NOT NULL,
234 created_at TIMESTAMP NOT NULL DEFAULT NOW()
235 )
236 "#,
237 )
238 .await?;
239
240 self.mark_migration_applied(version).await?;
241 Ok(())
242 }
243
244 async fn apply_migration_4_indexes(&self) -> anyhow::Result<()> {
245 let version = 4;
246 if self.is_migration_applied(version).await? {
247 return Ok(());
248 }
249
250 tracing::info!("Applying migration {}: indexes", version);
251
252 self.pool
254 .execute("CREATE INDEX IF NOT EXISTS idx_mailboxes_username ON mailboxes(username)")
255 .await?;
256 self.pool
257 .execute("CREATE INDEX IF NOT EXISTS idx_mailboxes_path ON mailboxes(path)")
258 .await?;
259
260 self.pool
262 .execute("CREATE INDEX IF NOT EXISTS idx_messages_mailbox ON messages(mailbox_id)")
263 .await?;
264 self.pool
265 .execute(
266 "CREATE INDEX IF NOT EXISTS idx_messages_mailbox_uid ON messages(mailbox_id, uid)",
267 )
268 .await?;
269 self.pool
270 .execute("CREATE INDEX IF NOT EXISTS idx_messages_sender ON messages(sender)")
271 .await?;
272 self.pool
273 .execute("CREATE INDEX IF NOT EXISTS idx_messages_created ON messages(created_at)")
274 .await?;
275
276 self.pool
278 .execute("CREATE INDEX IF NOT EXISTS idx_flags_message ON message_flags(message_id)")
279 .await?;
280 self.pool
281 .execute("CREATE INDEX IF NOT EXISTS idx_flags_seen ON message_flags(message_id) WHERE flag_seen = FALSE")
282 .await?;
283 self.pool
284 .execute("CREATE INDEX IF NOT EXISTS idx_flags_recent ON message_flags(message_id) WHERE flag_recent = TRUE")
285 .await?;
286 self.pool
287 .execute("CREATE INDEX IF NOT EXISTS idx_flags_deleted ON message_flags(message_id) WHERE flag_deleted = TRUE")
288 .await?;
289
290 self.mark_migration_applied(version).await?;
291 Ok(())
292 }
293
294 async fn apply_migration_5_fulltext(&self) -> anyhow::Result<()> {
295 let version = 5;
296 if self.is_migration_applied(version).await? {
297 return Ok(());
298 }
299
300 tracing::info!("Applying migration {}: full-text search", version);
301
302 self.pool
304 .execute("CREATE INDEX IF NOT EXISTS idx_messages_search ON messages USING GIN(search_vector)")
305 .await?;
306
307 self.pool.execute(
309 r#"
310 CREATE OR REPLACE FUNCTION messages_search_vector_update() RETURNS trigger AS $$
311 BEGIN
312 NEW.search_vector :=
313 setweight(to_tsvector('english', COALESCE(NEW.subject, '')), 'A') ||
314 setweight(to_tsvector('english', COALESCE(NEW.sender, '')), 'B') ||
315 setweight(to_tsvector('english', COALESCE(array_to_string(NEW.recipients, ' '), '')), 'C') ||
316 setweight(to_tsvector('english', COALESCE(NEW.headers::text, '')), 'D');
317 RETURN NEW;
318 END
319 $$ LANGUAGE plpgsql
320 "#,
321 ).await?;
322
323 self.pool
325 .execute(
326 r#"
327 DROP TRIGGER IF EXISTS messages_search_vector_trigger ON messages;
328 CREATE TRIGGER messages_search_vector_trigger
329 BEFORE INSERT OR UPDATE ON messages
330 FOR EACH ROW EXECUTE FUNCTION messages_search_vector_update()
331 "#,
332 )
333 .await?;
334
335 self.mark_migration_applied(version).await?;
336 Ok(())
337 }
338
339 async fn is_migration_applied(&self, version: i32) -> anyhow::Result<bool> {
340 let result = sqlx::query("SELECT version FROM schema_migrations WHERE version = $1")
341 .bind(version)
342 .fetch_optional(&self.pool)
343 .await?;
344 Ok(result.is_some())
345 }
346
347 async fn mark_migration_applied(&self, version: i32) -> anyhow::Result<()> {
348 sqlx::query("INSERT INTO schema_migrations (version) VALUES ($1) ON CONFLICT DO NOTHING")
349 .bind(version)
350 .execute(&self.pool)
351 .await?;
352 Ok(())
353 }
354
355 pub async fn vacuum(&self) -> anyhow::Result<()> {
357 tracing::info!("Running VACUUM on PostgreSQL database");
358 self.pool.execute("VACUUM ANALYZE").await?;
359 Ok(())
360 }
361
362 pub async fn reindex(&self) -> anyhow::Result<()> {
364 tracing::info!("Running REINDEX on PostgreSQL database");
365 self.pool.execute("REINDEX DATABASE CONCURRENTLY").await?;
366 Ok(())
367 }
368
369 pub fn pool_size(&self) -> u32 {
371 self.pool.size()
372 }
373
374 pub fn idle_connections(&self) -> usize {
376 self.pool.num_idle()
377 }
378
379 pub fn pool(&self) -> &PgPool {
381 &self.pool
382 }
383}
384
385impl StorageBackend for PostgresBackend {
386 fn mailbox_store(&self) -> Arc<dyn MailboxStore> {
387 Arc::new(PostgresMailboxStore {
388 pool: self.pool.clone(),
389 })
390 }
391
392 fn message_store(&self) -> Arc<dyn MessageStore> {
393 Arc::new(PostgresMessageStore {
394 pool: self.pool.clone(),
395 inline_threshold: self.config.inline_threshold,
396 })
397 }
398
399 fn metadata_store(&self) -> Arc<dyn MetadataStore> {
400 Arc::new(PostgresMetadataStore {
401 pool: self.pool.clone(),
402 })
403 }
404}
405
406struct PostgresMailboxStore {
408 pool: PgPool,
409}
410
411#[async_trait]
412impl MailboxStore for PostgresMailboxStore {
413 async fn create_mailbox(&self, path: &MailboxPath) -> anyhow::Result<MailboxId> {
414 let mailbox = Mailbox::new(path.clone());
415 let id = *mailbox.id();
416
417 sqlx::query(
418 r#"
419 INSERT INTO mailboxes (id, username, path, uid_validity, uid_next, special_use)
420 VALUES ($1, $2, $3, $4, $5, $6)
421 "#,
422 )
423 .bind(*id.as_uuid())
424 .bind(path.user().to_string())
425 .bind(path.path().join("/"))
426 .bind(mailbox.uid_validity() as i32)
427 .bind(mailbox.uid_next() as i32)
428 .bind(mailbox.special_use())
429 .execute(&self.pool)
430 .await
431 .map_err(|e| anyhow::anyhow!("Failed to create mailbox: {}", e))?;
432
433 tracing::debug!("Created mailbox {} at path {}", id, path);
434 Ok(id)
435 }
436
437 async fn create_mailbox_with_special_use(
438 &self,
439 path: &MailboxPath,
440 special_use: SpecialUseAttributes,
441 ) -> anyhow::Result<MailboxId> {
442 let mailbox = Mailbox::new(path.clone());
443 let id = *mailbox.id();
444 let special_use_str = special_use.to_string();
445
446 sqlx::query(
447 r#"
448 INSERT INTO mailboxes (id, username, path, uid_validity, uid_next, special_use)
449 VALUES ($1, $2, $3, $4, $5, $6)
450 "#,
451 )
452 .bind(*id.as_uuid())
453 .bind(path.user().to_string())
454 .bind(path.path().join("/"))
455 .bind(mailbox.uid_validity() as i32)
456 .bind(mailbox.uid_next() as i32)
457 .bind(special_use_str)
458 .execute(&self.pool)
459 .await
460 .map_err(|e| anyhow::anyhow!("Failed to create mailbox with special use: {}", e))?;
461
462 tracing::debug!("Created mailbox {} with special use at path {}", id, path);
463 Ok(id)
464 }
465
466 async fn delete_mailbox(&self, id: &MailboxId) -> anyhow::Result<()> {
467 let result = sqlx::query("DELETE FROM mailboxes WHERE id = $1")
468 .bind(*id.as_uuid())
469 .execute(&self.pool)
470 .await
471 .map_err(|e| anyhow::anyhow!("Failed to delete mailbox: {}", e))?;
472
473 if result.rows_affected() == 0 {
474 return Err(anyhow::anyhow!("Mailbox not found: {}", id));
475 }
476
477 tracing::debug!("Deleted mailbox {}", id);
478 Ok(())
479 }
480
481 async fn rename_mailbox(&self, id: &MailboxId, new_path: &MailboxPath) -> anyhow::Result<()> {
482 let result =
483 sqlx::query("UPDATE mailboxes SET path = $1, updated_at = NOW() WHERE id = $2")
484 .bind(new_path.path().join("/"))
485 .bind(*id.as_uuid())
486 .execute(&self.pool)
487 .await
488 .map_err(|e| anyhow::anyhow!("Failed to rename mailbox: {}", e))?;
489
490 if result.rows_affected() == 0 {
491 return Err(anyhow::anyhow!("Mailbox not found: {}", id));
492 }
493
494 tracing::debug!("Renamed mailbox {} to {}", id, new_path);
495 Ok(())
496 }
497
498 async fn get_mailbox(&self, id: &MailboxId) -> anyhow::Result<Option<Mailbox>> {
499 let row = sqlx::query(
500 "SELECT id, username, path, uid_validity, uid_next, special_use FROM mailboxes WHERE id = $1"
501 )
502 .bind(*id.as_uuid())
503 .fetch_optional(&self.pool)
504 .await
505 .map_err(|e| anyhow::anyhow!("Failed to get mailbox: {}", e))?;
506
507 let row = match row {
508 Some(r) => r,
509 None => return Ok(None),
510 };
511
512 let mailbox = self.row_to_mailbox(row)?;
513 Ok(Some(mailbox))
514 }
515
516 async fn list_mailboxes(&self, user: &Username) -> anyhow::Result<Vec<Mailbox>> {
517 let rows = sqlx::query(
518 "SELECT id, username, path, uid_validity, uid_next, special_use FROM mailboxes WHERE username = $1 ORDER BY path"
519 )
520 .bind(user.to_string())
521 .fetch_all(&self.pool)
522 .await
523 .map_err(|e| anyhow::anyhow!("Failed to list mailboxes: {}", e))?;
524
525 let mailboxes = rows
526 .into_iter()
527 .filter_map(|row| self.row_to_mailbox(row).ok())
528 .collect();
529
530 Ok(mailboxes)
531 }
532
533 async fn get_user_inbox(&self, user: &Username) -> anyhow::Result<Option<MailboxId>> {
534 let row =
535 sqlx::query("SELECT id FROM mailboxes WHERE username = $1 AND path = 'INBOX' LIMIT 1")
536 .bind(user.to_string())
537 .fetch_optional(&self.pool)
538 .await
539 .map_err(|e| anyhow::anyhow!("Failed to get user inbox: {}", e))?;
540
541 if let Some(row) = row {
542 let uuid: uuid::Uuid = row.get(0);
543 Ok(Some(MailboxId::from_uuid(uuid)))
544 } else {
545 Ok(None)
546 }
547 }
548
549 async fn get_mailbox_special_use(
550 &self,
551 id: &MailboxId,
552 ) -> anyhow::Result<SpecialUseAttributes> {
553 let row = sqlx::query("SELECT special_use FROM mailboxes WHERE id = $1")
554 .bind(*id.as_uuid())
555 .fetch_optional(&self.pool)
556 .await
557 .map_err(|e| anyhow::anyhow!("Failed to get special use: {}", e))?;
558
559 match row {
560 Some(r) => {
561 let special_use: Option<String> = r.try_get("special_use")?;
562 match special_use {
563 Some(s) if !s.is_empty() => {
564 let attrs = s
565 .split_whitespace()
566 .map(|s| s.to_string())
567 .collect::<Vec<_>>();
568 Ok(SpecialUseAttributes::from_vec(attrs))
569 }
570 _ => Ok(SpecialUseAttributes::new()),
571 }
572 }
573 None => Ok(SpecialUseAttributes::new()),
574 }
575 }
576
577 async fn set_mailbox_special_use(
578 &self,
579 id: &MailboxId,
580 special_use: SpecialUseAttributes,
581 ) -> anyhow::Result<()> {
582 let special_use_str = special_use.to_string();
583
584 sqlx::query("UPDATE mailboxes SET special_use = $1, updated_at = NOW() WHERE id = $2")
585 .bind(special_use_str)
586 .bind(*id.as_uuid())
587 .execute(&self.pool)
588 .await
589 .map_err(|e| anyhow::anyhow!("Failed to set special use: {}", e))?;
590
591 Ok(())
592 }
593
594 async fn subscribe_mailbox(&self, user: &Username, mailbox_name: String) -> anyhow::Result<()> {
595 sqlx::query(
596 "INSERT INTO subscriptions (username, mailbox_name) VALUES ($1, $2) ON CONFLICT DO NOTHING"
597 )
598 .bind(user.to_string())
599 .bind(mailbox_name)
600 .execute(&self.pool)
601 .await
602 .map_err(|e| anyhow::anyhow!("Failed to subscribe: {}", e))?;
603
604 Ok(())
605 }
606
607 async fn unsubscribe_mailbox(&self, user: &Username, mailbox_name: &str) -> anyhow::Result<()> {
608 sqlx::query("DELETE FROM subscriptions WHERE username = $1 AND mailbox_name = $2")
609 .bind(user.to_string())
610 .bind(mailbox_name)
611 .execute(&self.pool)
612 .await
613 .map_err(|e| anyhow::anyhow!("Failed to unsubscribe: {}", e))?;
614
615 Ok(())
616 }
617
618 async fn list_subscriptions(&self, user: &Username) -> anyhow::Result<Vec<String>> {
619 let rows = sqlx::query("SELECT mailbox_name FROM subscriptions WHERE username = $1")
620 .bind(user.to_string())
621 .fetch_all(&self.pool)
622 .await
623 .map_err(|e| anyhow::anyhow!("Failed to list subscriptions: {}", e))?;
624
625 let subscriptions = rows
626 .into_iter()
627 .filter_map(|row| row.try_get("mailbox_name").ok())
628 .collect();
629
630 Ok(subscriptions)
631 }
632}
633
634impl PostgresMailboxStore {
635 fn row_to_mailbox(&self, row: sqlx::postgres::PgRow) -> anyhow::Result<Mailbox> {
636 let username: String = row.try_get("username")?;
637 let path_str: String = row.try_get("path")?;
638 let path_parts: Vec<String> = path_str.split('/').map(|s| s.to_string()).collect();
639 let username_obj =
640 Username::new(username).map_err(|e| anyhow::anyhow!("Invalid username: {}", e))?;
641 let path = MailboxPath::new(username_obj, path_parts);
642
643 let mut mailbox = Mailbox::new(path);
644 let special_use: Option<String> = row.try_get("special_use")?;
645 mailbox.set_special_use(special_use);
646
647 Ok(mailbox)
648 }
649}
650
651struct PostgresMessageStore {
653 pool: PgPool,
654 inline_threshold: usize,
655}
656
657#[async_trait]
658impl MessageStore for PostgresMessageStore {
659 async fn append_message(
660 &self,
661 mailbox_id: &MailboxId,
662 message: Mail,
663 ) -> anyhow::Result<MessageMetadata> {
664 let mut tx = self.pool.begin().await?;
665
666 let uid_row = sqlx::query("SELECT uid_next FROM mailboxes WHERE id = $1 FOR UPDATE")
668 .bind(*mailbox_id.as_uuid())
669 .fetch_one(&mut *tx)
670 .await
671 .map_err(|e| anyhow::anyhow!("Failed to get next UID: {}", e))?;
672 let uid: i32 = uid_row.try_get("uid_next")?;
673
674 let message_id = *message.message_id();
676 let sender = message.sender().map(|s| s.to_string());
677 let recipients: Vec<String> = message.recipients().iter().map(|r| r.to_string()).collect();
678 let message_size = message.size();
679
680 let subject = message
682 .message()
683 .headers()
684 .get_first("subject")
685 .map(|s| s.to_string());
686
687 let mut headers_map = serde_json::Map::new();
689 for (name, values) in message.message().headers().iter() {
690 headers_map.insert(name.clone(), serde_json::json!(values));
691 }
692 let headers_json = serde_json::Value::Object(headers_map);
693
694 let (body_inline, body_external_ref) = if message_size < self.inline_threshold {
696 let body_bytes = match message.message().body() {
698 rusmes_proto::MessageBody::Small(bytes) => bytes.to_vec(),
699 _ => vec![],
700 };
701 (Some(body_bytes), None)
702 } else {
703 let blob_id = uuid::Uuid::new_v4();
705 let body_bytes = match message.message().body() {
706 rusmes_proto::MessageBody::Small(bytes) => bytes.to_vec(),
707 _ => vec![],
708 };
709
710 sqlx::query("INSERT INTO message_blobs (id, data) VALUES ($1, $2)")
711 .bind(blob_id)
712 .bind(&body_bytes)
713 .execute(&mut *tx)
714 .await
715 .map_err(|e| anyhow::anyhow!("Failed to store message blob: {}", e))?;
716
717 (None, Some(blob_id))
718 };
719
720 sqlx::query(
722 r#"
723 INSERT INTO messages (id, mailbox_id, uid, sender, recipients, subject, headers, body_inline, body_external_ref, size)
724 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
725 "#,
726 )
727 .bind(*message_id.as_uuid())
728 .bind(*mailbox_id.as_uuid())
729 .bind(uid)
730 .bind(&sender)
731 .bind(&recipients)
732 .bind(&subject)
733 .bind(&headers_json)
734 .bind(body_inline)
735 .bind(body_external_ref)
736 .bind(message_size as i32)
737 .execute(&mut *tx)
738 .await
739 .map_err(|e| anyhow::anyhow!("Failed to insert message: {}", e))?;
740
741 sqlx::query("INSERT INTO message_flags (message_id, flag_recent) VALUES ($1, TRUE)")
743 .bind(*message_id.as_uuid())
744 .execute(&mut *tx)
745 .await
746 .map_err(|e| anyhow::anyhow!("Failed to insert flags: {}", e))?;
747
748 sqlx::query("UPDATE mailboxes SET uid_next = $1, updated_at = NOW() WHERE id = $2")
750 .bind(uid + 1)
751 .bind(*mailbox_id.as_uuid())
752 .execute(&mut *tx)
753 .await
754 .map_err(|e| anyhow::anyhow!("Failed to update mailbox: {}", e))?;
755
756 let mailbox_row = sqlx::query("SELECT username FROM mailboxes WHERE id = $1")
758 .bind(*mailbox_id.as_uuid())
759 .fetch_one(&mut *tx)
760 .await
761 .map_err(|e| anyhow::anyhow!("Failed to get mailbox: {}", e))?;
762 let username: String = mailbox_row.try_get("username")?;
763
764 sqlx::query(
765 r#"
766 INSERT INTO user_quotas (username, used, quota_limit)
767 VALUES ($1, $2, 1073741824)
768 ON CONFLICT (username) DO UPDATE
769 SET used = user_quotas.used + $2, updated_at = NOW()
770 "#,
771 )
772 .bind(&username)
773 .bind(message_size as i64)
774 .execute(&mut *tx)
775 .await
776 .map_err(|e| anyhow::anyhow!("Failed to update quota: {}", e))?;
777
778 tx.commit().await?;
779
780 let mut flags = MessageFlags::new();
781 flags.set_recent(true);
782
783 let metadata =
784 MessageMetadata::new(message_id, *mailbox_id, uid as u32, flags, message_size);
785
786 tracing::debug!(
787 "Appended message {} to mailbox {} with UID {}",
788 message_id,
789 mailbox_id,
790 uid
791 );
792 Ok(metadata)
793 }
794
795 async fn get_message(&self, message_id: &MessageId) -> anyhow::Result<Option<Mail>> {
796 let row = sqlx::query(
798 r#"
799 SELECT m.sender, m.recipients, m.headers, m.body_inline, m.body_external_ref
800 FROM messages m
801 WHERE m.id = $1
802 "#,
803 )
804 .bind(*message_id.as_uuid())
805 .fetch_optional(&self.pool)
806 .await
807 .map_err(|e| anyhow::anyhow!("Failed to fetch message: {}", e))?;
808
809 let row = match row {
810 Some(r) => r,
811 None => return Ok(None),
812 };
813
814 let sender: Option<String> = row.try_get("sender")?;
816 let recipients: Vec<String> = row.try_get("recipients")?;
817 let headers_json: serde_json::Value = row.try_get("headers")?;
818 let body_inline: Option<Vec<u8>> = row.try_get("body_inline")?;
819 let body_external_ref: Option<uuid::Uuid> = row.try_get("body_external_ref")?;
820
821 let mut headers = rusmes_proto::HeaderMap::new();
823 if let Some(headers_obj) = headers_json.as_object() {
824 for (name, value) in headers_obj {
825 if let Some(values_array) = value.as_array() {
826 for v in values_array {
827 if let Some(v_str) = v.as_str() {
828 headers.insert(name.clone(), v_str.to_string());
829 }
830 }
831 }
832 }
833 }
834
835 let body_bytes = if let Some(inline) = body_inline {
837 inline
838 } else if let Some(blob_id) = body_external_ref {
839 let blob_row = sqlx::query("SELECT data FROM message_blobs WHERE id = $1")
841 .bind(blob_id)
842 .fetch_optional(&self.pool)
843 .await
844 .map_err(|e| anyhow::anyhow!("Failed to fetch message blob: {}", e))?;
845
846 if let Some(blob) = blob_row {
847 blob.try_get("data")?
848 } else {
849 tracing::warn!("Message blob {} not found", blob_id);
850 vec![]
851 }
852 } else {
853 vec![]
854 };
855
856 let body = rusmes_proto::MessageBody::Small(bytes::Bytes::from(body_bytes));
857 let mime_message = rusmes_proto::MimeMessage::new(headers, body);
858
859 let sender_addr = if let Some(sender_str) = sender {
861 MailAddress::from_str(&sender_str).ok()
862 } else {
863 None
864 };
865
866 let recipient_addrs: Vec<MailAddress> = recipients
867 .into_iter()
868 .filter_map(|r| MailAddress::from_str(&r).ok())
869 .collect();
870
871 let mail = rusmes_proto::Mail::with_message_id(
873 sender_addr,
874 recipient_addrs,
875 mime_message,
876 None, None, *message_id,
879 );
880
881 Ok(Some(mail))
882 }
883
884 async fn delete_messages(&self, message_ids: &[MessageId]) -> anyhow::Result<()> {
885 if message_ids.is_empty() {
886 return Ok(());
887 }
888
889 let mut tx = self.pool.begin().await?;
890
891 let uuids: Vec<uuid::Uuid> = message_ids.iter().map(|id| *id.as_uuid()).collect();
892
893 let blob_rows = sqlx::query("SELECT body_external_ref FROM messages WHERE id = ANY($1) AND body_external_ref IS NOT NULL")
895 .bind(&uuids)
896 .fetch_all(&mut *tx)
897 .await?;
898
899 let blob_ids: Vec<uuid::Uuid> = blob_rows
900 .into_iter()
901 .filter_map(|row| row.try_get("body_external_ref").ok())
902 .collect();
903
904 sqlx::query("DELETE FROM messages WHERE id = ANY($1)")
906 .bind(&uuids)
907 .execute(&mut *tx)
908 .await
909 .map_err(|e| anyhow::anyhow!("Failed to delete messages: {}", e))?;
910
911 if !blob_ids.is_empty() {
913 sqlx::query("DELETE FROM message_blobs WHERE id = ANY($1)")
914 .bind(&blob_ids)
915 .execute(&mut *tx)
916 .await
917 .map_err(|e| anyhow::anyhow!("Failed to delete blobs: {}", e))?;
918 }
919
920 tx.commit().await?;
921
922 tracing::debug!("Deleted {} messages", message_ids.len());
923 Ok(())
924 }
925
926 async fn set_flags(
927 &self,
928 message_ids: &[MessageId],
929 flags: MessageFlags,
930 ) -> anyhow::Result<()> {
931 if message_ids.is_empty() {
932 return Ok(());
933 }
934
935 let uuids: Vec<uuid::Uuid> = message_ids.iter().map(|id| *id.as_uuid()).collect();
936 let custom_flags: Vec<String> = flags.custom().iter().cloned().collect();
937
938 sqlx::query(
939 r#"
940 UPDATE message_flags SET
941 flag_seen = $1,
942 flag_answered = $2,
943 flag_flagged = $3,
944 flag_deleted = $4,
945 flag_draft = $5,
946 flag_recent = $6,
947 custom_flags = $7
948 WHERE message_id = ANY($8)
949 "#,
950 )
951 .bind(flags.is_seen())
952 .bind(flags.is_answered())
953 .bind(flags.is_flagged())
954 .bind(flags.is_deleted())
955 .bind(flags.is_draft())
956 .bind(flags.is_recent())
957 .bind(&custom_flags)
958 .bind(&uuids)
959 .execute(&self.pool)
960 .await
961 .map_err(|e| anyhow::anyhow!("Failed to set flags: {}", e))?;
962
963 tracing::debug!("Set flags for {} messages", message_ids.len());
964 Ok(())
965 }
966
967 async fn search(
968 &self,
969 mailbox_id: &MailboxId,
970 criteria: SearchCriteria,
971 ) -> anyhow::Result<Vec<MessageId>> {
972 let message_ids = match criteria {
973 SearchCriteria::All => self.search_all(mailbox_id).await?,
974 SearchCriteria::Unseen => self.search_unseen(mailbox_id).await?,
975 SearchCriteria::Seen => self.search_seen(mailbox_id).await?,
976 SearchCriteria::Flagged => self.search_flagged(mailbox_id).await?,
977 SearchCriteria::Unflagged => self.search_unflagged(mailbox_id).await?,
978 SearchCriteria::Deleted => self.search_deleted(mailbox_id).await?,
979 SearchCriteria::Undeleted => self.search_undeleted(mailbox_id).await?,
980 SearchCriteria::From(email) => self.search_from(mailbox_id, &email).await?,
981 SearchCriteria::To(email) => self.search_to(mailbox_id, &email).await?,
982 SearchCriteria::Subject(text) => self.search_subject(mailbox_id, &text).await?,
983 SearchCriteria::Body(text) => self.search_body(mailbox_id, &text).await?,
984 SearchCriteria::And(criteria_list) => {
985 self.search_and(mailbox_id, criteria_list).await?
986 }
987 SearchCriteria::Or(criteria_list) => self.search_or(mailbox_id, criteria_list).await?,
988 SearchCriteria::Not(criteria) => self.search_not(mailbox_id, *criteria).await?,
989 };
990
991 Ok(message_ids)
992 }
993
994 async fn copy_messages(
995 &self,
996 message_ids: &[MessageId],
997 dest_mailbox_id: &MailboxId,
998 ) -> anyhow::Result<Vec<MessageMetadata>> {
999 if message_ids.is_empty() {
1000 return Ok(vec![]);
1001 }
1002
1003 let mut tx = self.pool.begin().await?;
1004 let mut metadata_list = Vec::new();
1005
1006 for message_id in message_ids {
1007 let uid_row = sqlx::query("SELECT uid_next FROM mailboxes WHERE id = $1 FOR UPDATE")
1009 .bind(*dest_mailbox_id.as_uuid())
1010 .fetch_one(&mut *tx)
1011 .await?;
1012 let uid: i32 = uid_row.try_get("uid_next")?;
1013
1014 let new_message_id = MessageId::new();
1016 sqlx::query(
1017 r#"
1018 INSERT INTO messages (id, mailbox_id, uid, sender, recipients, subject, headers, body_inline, body_external_ref, size)
1019 SELECT $1, $2, $3, sender, recipients, subject, headers, body_inline, body_external_ref, size
1020 FROM messages WHERE id = $4
1021 "#,
1022 )
1023 .bind(*new_message_id.as_uuid())
1024 .bind(*dest_mailbox_id.as_uuid())
1025 .bind(uid)
1026 .bind(*message_id.as_uuid())
1027 .execute(&mut *tx)
1028 .await?;
1029
1030 sqlx::query(
1032 r#"
1033 INSERT INTO message_flags (message_id, flag_seen, flag_answered, flag_flagged, flag_deleted, flag_draft, flag_recent, custom_flags)
1034 SELECT $1, flag_seen, flag_answered, flag_flagged, flag_deleted, flag_draft, FALSE, custom_flags
1035 FROM message_flags WHERE message_id = $2
1036 "#,
1037 )
1038 .bind(*new_message_id.as_uuid())
1039 .bind(*message_id.as_uuid())
1040 .execute(&mut *tx)
1041 .await?;
1042
1043 sqlx::query("UPDATE mailboxes SET uid_next = $1 WHERE id = $2")
1045 .bind(uid + 1)
1046 .bind(*dest_mailbox_id.as_uuid())
1047 .execute(&mut *tx)
1048 .await?;
1049
1050 let size_row = sqlx::query("SELECT size FROM messages WHERE id = $1")
1052 .bind(*new_message_id.as_uuid())
1053 .fetch_one(&mut *tx)
1054 .await?;
1055 let size: i32 = size_row.try_get("size")?;
1056
1057 metadata_list.push(MessageMetadata::new(
1058 new_message_id,
1059 *dest_mailbox_id,
1060 uid as u32,
1061 MessageFlags::new(),
1062 size as usize,
1063 ));
1064 }
1065
1066 tx.commit().await?;
1067
1068 tracing::debug!(
1069 "Copied {} messages to mailbox {}",
1070 message_ids.len(),
1071 dest_mailbox_id
1072 );
1073 Ok(metadata_list)
1074 }
1075
1076 async fn get_mailbox_messages(
1077 &self,
1078 mailbox_id: &MailboxId,
1079 ) -> anyhow::Result<Vec<MessageMetadata>> {
1080 let rows = sqlx::query(
1081 r#"
1082 SELECT m.id, m.mailbox_id, m.uid, m.size,
1083 f.flag_seen, f.flag_answered, f.flag_flagged,
1084 f.flag_deleted, f.flag_draft, f.flag_recent, f.custom_flags
1085 FROM messages m
1086 LEFT JOIN message_flags f ON m.id = f.message_id
1087 WHERE m.mailbox_id = $1
1088 ORDER BY m.uid
1089 "#,
1090 )
1091 .bind(*mailbox_id.as_uuid())
1092 .fetch_all(&self.pool)
1093 .await
1094 .map_err(|e| anyhow::anyhow!("Failed to get mailbox messages: {}", e))?;
1095
1096 let metadata_list = rows
1097 .into_iter()
1098 .filter_map(|row| self.row_to_metadata(row).ok())
1099 .collect();
1100
1101 Ok(metadata_list)
1102 }
1103}
1104
1105impl PostgresMessageStore {
1106 fn row_to_metadata(&self, row: sqlx::postgres::PgRow) -> anyhow::Result<MessageMetadata> {
1107 let _msg_uuid: uuid::Uuid = row.try_get("id")?;
1108 let _mailbox_uuid: uuid::Uuid = row.try_get("mailbox_id")?;
1109 let uid: i32 = row.try_get("uid")?;
1110 let size: i32 = row.try_get("size")?;
1111
1112 let mut flags = MessageFlags::new();
1113 if let Ok(seen) = row.try_get::<bool, _>("flag_seen") {
1114 flags.set_seen(seen);
1115 }
1116 if let Ok(answered) = row.try_get::<bool, _>("flag_answered") {
1117 flags.set_answered(answered);
1118 }
1119 if let Ok(flagged) = row.try_get::<bool, _>("flag_flagged") {
1120 flags.set_flagged(flagged);
1121 }
1122 if let Ok(deleted) = row.try_get::<bool, _>("flag_deleted") {
1123 flags.set_deleted(deleted);
1124 }
1125 if let Ok(draft) = row.try_get::<bool, _>("flag_draft") {
1126 flags.set_draft(draft);
1127 }
1128 if let Ok(recent) = row.try_get::<bool, _>("flag_recent") {
1129 flags.set_recent(recent);
1130 }
1131 if let Ok(custom) = row.try_get::<Vec<String>, _>("custom_flags") {
1132 for flag in custom {
1133 flags.add_custom(flag);
1134 }
1135 }
1136
1137 let message_id = MessageId::new();
1139 let mailbox_id = MailboxId::new();
1140
1141 Ok(MessageMetadata::new(
1142 message_id,
1143 mailbox_id,
1144 uid as u32,
1145 flags,
1146 size as usize,
1147 ))
1148 }
1149
1150 async fn search_all(&self, mailbox_id: &MailboxId) -> anyhow::Result<Vec<MessageId>> {
1151 let rows = sqlx::query("SELECT id FROM messages WHERE mailbox_id = $1")
1152 .bind(*mailbox_id.as_uuid())
1153 .fetch_all(&self.pool)
1154 .await?;
1155
1156 Ok(rows.into_iter().map(|_| MessageId::new()).collect())
1157 }
1158
1159 async fn search_unseen(&self, mailbox_id: &MailboxId) -> anyhow::Result<Vec<MessageId>> {
1160 let rows = sqlx::query(
1161 r#"
1162 SELECT m.id FROM messages m
1163 JOIN message_flags f ON m.id = f.message_id
1164 WHERE m.mailbox_id = $1 AND f.flag_seen = FALSE
1165 "#,
1166 )
1167 .bind(*mailbox_id.as_uuid())
1168 .fetch_all(&self.pool)
1169 .await?;
1170
1171 Ok(rows.into_iter().map(|_| MessageId::new()).collect())
1172 }
1173
1174 async fn search_seen(&self, mailbox_id: &MailboxId) -> anyhow::Result<Vec<MessageId>> {
1175 let rows = sqlx::query(
1176 r#"
1177 SELECT m.id FROM messages m
1178 JOIN message_flags f ON m.id = f.message_id
1179 WHERE m.mailbox_id = $1 AND f.flag_seen = TRUE
1180 "#,
1181 )
1182 .bind(*mailbox_id.as_uuid())
1183 .fetch_all(&self.pool)
1184 .await?;
1185
1186 Ok(rows.into_iter().map(|_| MessageId::new()).collect())
1187 }
1188
1189 async fn search_flagged(&self, mailbox_id: &MailboxId) -> anyhow::Result<Vec<MessageId>> {
1190 let rows = sqlx::query(
1191 r#"
1192 SELECT m.id FROM messages m
1193 JOIN message_flags f ON m.id = f.message_id
1194 WHERE m.mailbox_id = $1 AND f.flag_flagged = TRUE
1195 "#,
1196 )
1197 .bind(*mailbox_id.as_uuid())
1198 .fetch_all(&self.pool)
1199 .await?;
1200
1201 Ok(rows.into_iter().map(|_| MessageId::new()).collect())
1202 }
1203
1204 async fn search_unflagged(&self, mailbox_id: &MailboxId) -> anyhow::Result<Vec<MessageId>> {
1205 let rows = sqlx::query(
1206 r#"
1207 SELECT m.id FROM messages m
1208 JOIN message_flags f ON m.id = f.message_id
1209 WHERE m.mailbox_id = $1 AND f.flag_flagged = FALSE
1210 "#,
1211 )
1212 .bind(*mailbox_id.as_uuid())
1213 .fetch_all(&self.pool)
1214 .await?;
1215
1216 Ok(rows.into_iter().map(|_| MessageId::new()).collect())
1217 }
1218
1219 async fn search_deleted(&self, mailbox_id: &MailboxId) -> anyhow::Result<Vec<MessageId>> {
1220 let rows = sqlx::query(
1221 r#"
1222 SELECT m.id FROM messages m
1223 JOIN message_flags f ON m.id = f.message_id
1224 WHERE m.mailbox_id = $1 AND f.flag_deleted = TRUE
1225 "#,
1226 )
1227 .bind(*mailbox_id.as_uuid())
1228 .fetch_all(&self.pool)
1229 .await?;
1230
1231 Ok(rows.into_iter().map(|_| MessageId::new()).collect())
1232 }
1233
1234 async fn search_undeleted(&self, mailbox_id: &MailboxId) -> anyhow::Result<Vec<MessageId>> {
1235 let rows = sqlx::query(
1236 r#"
1237 SELECT m.id FROM messages m
1238 JOIN message_flags f ON m.id = f.message_id
1239 WHERE m.mailbox_id = $1 AND f.flag_deleted = FALSE
1240 "#,
1241 )
1242 .bind(*mailbox_id.as_uuid())
1243 .fetch_all(&self.pool)
1244 .await?;
1245
1246 Ok(rows.into_iter().map(|_| MessageId::new()).collect())
1247 }
1248
1249 async fn search_from(
1250 &self,
1251 mailbox_id: &MailboxId,
1252 email: &str,
1253 ) -> anyhow::Result<Vec<MessageId>> {
1254 let rows = sqlx::query("SELECT id FROM messages WHERE mailbox_id = $1 AND sender ILIKE $2")
1255 .bind(*mailbox_id.as_uuid())
1256 .bind(format!("%{}%", email))
1257 .fetch_all(&self.pool)
1258 .await?;
1259
1260 Ok(rows.into_iter().map(|_| MessageId::new()).collect())
1261 }
1262
1263 async fn search_to(
1264 &self,
1265 mailbox_id: &MailboxId,
1266 email: &str,
1267 ) -> anyhow::Result<Vec<MessageId>> {
1268 let rows =
1269 sqlx::query("SELECT id FROM messages WHERE mailbox_id = $1 AND $2 = ANY(recipients)")
1270 .bind(*mailbox_id.as_uuid())
1271 .bind(email)
1272 .fetch_all(&self.pool)
1273 .await?;
1274
1275 Ok(rows.into_iter().map(|_| MessageId::new()).collect())
1276 }
1277
1278 async fn search_subject(
1279 &self,
1280 mailbox_id: &MailboxId,
1281 text: &str,
1282 ) -> anyhow::Result<Vec<MessageId>> {
1283 let rows = sqlx::query(
1284 r#"
1285 SELECT id FROM messages
1286 WHERE mailbox_id = $1 AND search_vector @@ plainto_tsquery('english', $2)
1287 "#,
1288 )
1289 .bind(*mailbox_id.as_uuid())
1290 .bind(text)
1291 .fetch_all(&self.pool)
1292 .await?;
1293
1294 Ok(rows.into_iter().map(|_| MessageId::new()).collect())
1295 }
1296
1297 async fn search_body(
1298 &self,
1299 mailbox_id: &MailboxId,
1300 text: &str,
1301 ) -> anyhow::Result<Vec<MessageId>> {
1302 self.search_subject(mailbox_id, text).await
1304 }
1305
1306 async fn search_and(
1307 &self,
1308 mailbox_id: &MailboxId,
1309 criteria_list: Vec<SearchCriteria>,
1310 ) -> anyhow::Result<Vec<MessageId>> {
1311 if criteria_list.is_empty() {
1312 return Ok(vec![]);
1313 }
1314
1315 let mut result_sets: Vec<Vec<MessageId>> = Vec::new();
1316 for criteria in criteria_list {
1317 let results = self.search(mailbox_id, criteria).await?;
1318 result_sets.push(results);
1319 }
1320
1321 if result_sets.is_empty() {
1323 return Ok(vec![]);
1324 }
1325
1326 let mut intersection = result_sets[0].clone();
1327 for result_set in result_sets.iter().skip(1) {
1328 intersection.retain(|id| result_set.contains(id));
1329 }
1330
1331 Ok(intersection)
1332 }
1333
1334 async fn search_or(
1335 &self,
1336 mailbox_id: &MailboxId,
1337 criteria_list: Vec<SearchCriteria>,
1338 ) -> anyhow::Result<Vec<MessageId>> {
1339 let mut all_results = Vec::new();
1340 for criteria in criteria_list {
1341 let results = self.search(mailbox_id, criteria).await?;
1342 all_results.extend(results);
1343 }
1344
1345 all_results.sort_by_key(|id| format!("{}", id));
1347 all_results.dedup();
1348
1349 Ok(all_results)
1350 }
1351
1352 async fn search_not(
1353 &self,
1354 mailbox_id: &MailboxId,
1355 criteria: SearchCriteria,
1356 ) -> anyhow::Result<Vec<MessageId>> {
1357 let all_messages = self.search_all(mailbox_id).await?;
1358 let excluded = self.search(mailbox_id, criteria).await?;
1359
1360 let result: Vec<MessageId> = all_messages
1361 .into_iter()
1362 .filter(|id| !excluded.contains(id))
1363 .collect();
1364
1365 Ok(result)
1366 }
1367}
1368
1369struct PostgresMetadataStore {
1371 pool: PgPool,
1372}
1373
1374#[async_trait]
1375impl MetadataStore for PostgresMetadataStore {
1376 async fn get_user_quota(&self, user: &Username) -> anyhow::Result<Quota> {
1377 let row = sqlx::query("SELECT used, quota_limit FROM user_quotas WHERE username = $1")
1378 .bind(user.to_string())
1379 .fetch_optional(&self.pool)
1380 .await
1381 .map_err(|e| anyhow::anyhow!("Failed to get quota: {}", e))?;
1382
1383 match row {
1384 Some(r) => {
1385 let used: i64 = r.try_get("used")?;
1386 let limit: i64 = r.try_get("quota_limit")?;
1387 Ok(Quota::new(used as u64, limit as u64))
1388 }
1389 None => Ok(Quota::new(0, 1024 * 1024 * 1024)), }
1391 }
1392
1393 async fn set_user_quota(&self, user: &Username, quota: Quota) -> anyhow::Result<()> {
1394 sqlx::query(
1395 r#"
1396 INSERT INTO user_quotas (username, used, quota_limit)
1397 VALUES ($1, $2, $3)
1398 ON CONFLICT (username) DO UPDATE
1399 SET used = $2, quota_limit = $3, updated_at = NOW()
1400 "#,
1401 )
1402 .bind(user.to_string())
1403 .bind(quota.used as i64)
1404 .bind(quota.limit as i64)
1405 .execute(&self.pool)
1406 .await
1407 .map_err(|e| anyhow::anyhow!("Failed to set quota: {}", e))?;
1408
1409 Ok(())
1410 }
1411
1412 async fn get_mailbox_counters(
1413 &self,
1414 mailbox_id: &MailboxId,
1415 ) -> anyhow::Result<MailboxCounters> {
1416 let row = sqlx::query(
1417 r#"
1418 SELECT
1419 COUNT(*) as total,
1420 COUNT(*) FILTER (WHERE f.flag_recent = TRUE) as recent,
1421 COUNT(*) FILTER (WHERE f.flag_seen = FALSE) as unseen
1422 FROM messages m
1423 LEFT JOIN message_flags f ON m.id = f.message_id
1424 WHERE m.mailbox_id = $1
1425 "#,
1426 )
1427 .bind(*mailbox_id.as_uuid())
1428 .fetch_one(&self.pool)
1429 .await
1430 .map_err(|e| anyhow::anyhow!("Failed to get counters: {}", e))?;
1431
1432 let total: i64 = row.try_get("total")?;
1433 let recent: i64 = row.try_get("recent")?;
1434 let unseen: i64 = row.try_get("unseen")?;
1435
1436 Ok(MailboxCounters {
1437 exists: total as u32,
1438 recent: recent as u32,
1439 unseen: unseen as u32,
1440 })
1441 }
1442}
1443
#[cfg(test)]
mod tests {
    //! Unit tests for the configuration defaults and the plain value types
    //! used by this backend. None of them opens a database connection.

    use super::*;

    /// Address used wherever a test needs a user or a sender.
    const OWNER: &str = "test@example.com";

    /// Builds the user that owns every mailbox path in these tests.
    fn owner() -> Username {
        Username::new(OWNER.to_string()).unwrap()
    }

    /// Builds a mailbox path owned by [`owner`] from the given segments.
    fn owned_path(segments: &[&str]) -> MailboxPath {
        let segments: Vec<String> = segments
            .iter()
            .map(|segment| (*segment).to_owned())
            .collect();
        MailboxPath::new(owner(), segments)
    }

    /// The default configuration exposes the documented pool sizes and
    /// inline threshold.
    #[test]
    fn test_postgres_config_default() {
        let PostgresConfig {
            max_connections,
            min_connections,
            inline_threshold,
            ..
        } = PostgresConfig::default();

        assert_eq!(max_connections, 20);
        assert_eq!(min_connections, 5);
        assert_eq!(inline_threshold, 100 * 1024);
    }

    /// The backend type is sized, i.e. it can be named and measured.
    #[test]
    fn test_postgres_backend_struct() {
        let _size: usize = std::mem::size_of::<PostgresBackend>();
    }

    /// `All` is constructible and matches its own pattern.
    #[test]
    fn test_search_criteria_all() {
        assert!(matches!(SearchCriteria::All, SearchCriteria::All));
    }

    /// `Unseen` is constructible and matches its own pattern.
    #[test]
    fn test_search_criteria_unseen() {
        assert!(matches!(SearchCriteria::Unseen, SearchCriteria::Unseen));
    }

    /// `From` carries an address string.
    #[test]
    fn test_search_criteria_from() {
        let by_sender = SearchCriteria::From(OWNER.to_string());
        assert!(matches!(by_sender, SearchCriteria::From(_)));
    }

    /// `Subject` carries a text string.
    #[test]
    fn test_search_criteria_subject() {
        let by_subject = SearchCriteria::Subject("test subject".to_string());
        assert!(matches!(by_subject, SearchCriteria::Subject(_)));
    }

    /// Newly created flags have every system flag cleared.
    #[test]
    fn test_message_flags_default() {
        let fresh = MessageFlags::new();
        let system_flags = [
            ("seen", fresh.is_seen()),
            ("answered", fresh.is_answered()),
            ("flagged", fresh.is_flagged()),
            ("deleted", fresh.is_deleted()),
            ("draft", fresh.is_draft()),
        ];

        for (name, is_set) in system_flags {
            assert!(!is_set, "{} flag should start cleared", name);
        }
    }

    /// Setters are reflected by the matching getters.
    #[test]
    fn test_message_flags_setters() {
        let mut updated = MessageFlags::new();
        updated.set_seen(true);
        updated.set_answered(true);
        updated.set_flagged(true);

        let all_set = updated.is_seen() && updated.is_answered() && updated.is_flagged();
        assert!(all_set);
    }

    /// `Quota::new` stores usage and limit as given.
    #[test]
    fn test_quota_new() {
        let stored = Quota::new(1024, 2048);
        assert_eq!(stored.used, 1024);
        assert_eq!(stored.limit, 2048);
    }

    /// A quota is exceeded only when usage is above the limit.
    #[test]
    fn test_quota_exceeded() {
        let over_limit = Quota::new(2048, 1024);
        let within_limit = Quota::new(512, 1024);

        assert!(over_limit.is_exceeded());
        assert!(!within_limit.is_exceeded());
    }

    /// Remaining space is the limit minus the usage.
    #[test]
    fn test_quota_remaining() {
        assert_eq!(Quota::new(256, 1024).remaining(), 768);
    }

    /// Default counters are all zero.
    #[test]
    fn test_mailbox_counters_default() {
        let MailboxCounters {
            exists,
            recent,
            unseen,
        } = MailboxCounters::default();

        assert_eq!(exists, 0);
        assert_eq!(recent, 0);
        assert_eq!(unseen, 0);
    }

    /// Two newly created mailbox ids differ.
    #[test]
    fn test_mailbox_id_new() {
        let first = MailboxId::new();
        let second = MailboxId::new();
        assert_ne!(first, second);
    }

    /// A mailbox id renders to non-empty text.
    #[test]
    fn test_mailbox_id_display() {
        let rendered = MailboxId::new().to_string();
        assert!(!rendered.is_empty());
    }

    /// A path remembers its owner and its segments.
    #[test]
    fn test_mailbox_path_creation() {
        let expected_owner = owner();
        let inbox = MailboxPath::new(expected_owner.clone(), vec!["INBOX".to_string()]);

        assert_eq!(inbox.user(), &expected_owner);
        assert_eq!(inbox.path().len(), 1);
    }

    /// The name of a path is its last segment.
    #[test]
    fn test_mailbox_path_name() {
        let nested = owned_path(&["INBOX", "Sent"]);
        assert_eq!(nested.name(), Some("Sent"));
    }

    /// A new mailbox starts with validity 1, next uid 1 and no special use.
    #[test]
    fn test_mailbox_new() {
        let inbox = Mailbox::new(owned_path(&["INBOX"]));

        assert_eq!(inbox.uid_validity(), 1);
        assert_eq!(inbox.uid_next(), 1);
        assert!(inbox.special_use().is_none());
    }

    /// A special-use attribute set on a mailbox can be read back.
    #[test]
    fn test_mailbox_special_use() {
        let mut sent = Mailbox::new(owned_path(&["Sent"]));

        sent.set_special_use(Some("\\Sent".to_string()));
        assert_eq!(sent.special_use(), Some("\\Sent"));
    }

    /// Metadata returns the values it was constructed with (by reference).
    #[test]
    fn test_message_metadata_new() {
        let message = MessageId::new();
        let mailbox = MailboxId::new();

        let described = MessageMetadata::new(message, mailbox, 1, MessageFlags::new(), 1024);

        assert_eq!(described.message_id(), &message);
        assert_eq!(described.mailbox_id(), &mailbox);
        assert_eq!(described.uid(), 1);
        assert_eq!(described.size(), 1024);
    }

    /// Metadata returns the values it was constructed with (by value).
    #[test]
    fn test_message_metadata_getters() {
        let message = MessageId::new();
        let mailbox = MailboxId::new();
        let described = MessageMetadata::new(message, mailbox, 42, MessageFlags::new(), 2048);

        assert_eq!(*described.message_id(), message);
        assert_eq!(*described.mailbox_id(), mailbox);
        assert_eq!(described.uid(), 42);
        assert_eq!(described.size(), 2048);
    }

    /// `And` wraps a list of criteria.
    #[test]
    fn test_search_criteria_and() {
        let parts = vec![
            SearchCriteria::Unseen,
            SearchCriteria::From(OWNER.to_string()),
        ];
        assert!(matches!(SearchCriteria::And(parts), SearchCriteria::And(_)));
    }

    /// `Or` wraps a list of criteria.
    #[test]
    fn test_search_criteria_or() {
        let alternatives = vec![SearchCriteria::Flagged, SearchCriteria::Deleted];
        assert!(matches!(
            SearchCriteria::Or(alternatives),
            SearchCriteria::Or(_)
        ));
    }

    /// `Not` wraps a single boxed criterion.
    #[test]
    fn test_search_criteria_not() {
        let negated = SearchCriteria::Not(Box::new(SearchCriteria::Seen));
        assert!(matches!(negated, SearchCriteria::Not(_)));
    }

    /// Counters built with a struct literal keep their field values.
    #[test]
    fn test_mailbox_counters_struct() {
        let MailboxCounters {
            exists,
            recent,
            unseen,
        } = MailboxCounters {
            exists: 10,
            recent: 3,
            unseen: 5,
        };

        assert_eq!(exists, 10);
        assert_eq!(recent, 3);
        assert_eq!(unseen, 5);
    }

    /// A new attribute set is empty.
    #[test]
    fn test_special_use_attributes_new() {
        assert!(SpecialUseAttributes::new().is_empty());
    }

    /// A single-attribute set contains exactly that attribute.
    #[test]
    fn test_special_use_attributes_single() {
        let drafts_only = SpecialUseAttributes::single("\\Drafts".to_string());

        assert!(!drafts_only.is_empty());
        assert!(drafts_only.has_attribute("\\Drafts"));
    }

    /// A set built from a vector contains every listed attribute.
    #[test]
    fn test_special_use_attributes_from_vec() {
        let names = vec!["\\Drafts".to_string(), "\\Sent".to_string()];
        let combined = SpecialUseAttributes::from_vec(names);

        assert_eq!(combined.len(), 2);
        for expected in ["\\Drafts", "\\Sent"] {
            assert!(combined.has_attribute(expected));
        }
    }

    /// A custom flag added to the set can be found again.
    #[test]
    fn test_message_flags_custom() {
        let mut tagged = MessageFlags::new();
        tagged.add_custom("CustomFlag".to_string());

        assert!(tagged.custom().contains("CustomFlag"));
    }

    /// The recent flag can be set and read back.
    #[test]
    fn test_message_flags_recent() {
        let mut marked = MessageFlags::new();
        marked.set_recent(true);

        assert!(marked.is_recent());
    }

    /// A hand-built configuration keeps the values it was given.
    #[test]
    fn test_postgres_config_custom() {
        let tuned = PostgresConfig {
            max_connections: 50,
            min_connections: 10,
            connect_timeout: Duration::from_secs(60),
            idle_timeout: Some(Duration::from_secs(300)),
            max_lifetime: Some(Duration::from_secs(3600)),
            inline_threshold: 200 * 1024,
        };

        assert_eq!(tuned.max_connections, 50);
        assert_eq!(tuned.inline_threshold, 200 * 1024);
    }
}