Skip to main content

rusmes_storage/backends/
postgres.rs

1//! Complete PostgreSQL storage backend implementation with connection pooling,
2//! full-text search, optimized queries, and transaction handling.
3
4use crate::traits::{MailboxStore, MessageStore, MetadataStore, StorageBackend};
5use crate::types::{
6    Mailbox, MailboxCounters, MailboxId, MailboxPath, MessageFlags, MessageMetadata, Quota,
7    SearchCriteria, SpecialUseAttributes,
8};
9use async_trait::async_trait;
10use rusmes_proto::{Mail, MailAddress, MessageId, Username};
11use sqlx::postgres::{PgPool, PgPoolOptions};
12use sqlx::{Executor, Row};
13use std::str::FromStr;
14use std::sync::Arc;
15use std::time::Duration;
16
17/// Configuration for PostgreSQL backend
18#[derive(Debug, Clone)]
19pub struct PostgresConfig {
20    /// Maximum number of connections in the pool
21    pub max_connections: u32,
22    /// Minimum number of idle connections
23    pub min_connections: u32,
24    /// Connection timeout
25    pub connect_timeout: Duration,
26    /// Idle connection timeout
27    pub idle_timeout: Option<Duration>,
28    /// Max connection lifetime
29    pub max_lifetime: Option<Duration>,
30    /// Message size threshold for inline storage (default: 100KB)
31    pub inline_threshold: usize,
32}
33
34impl Default for PostgresConfig {
35    fn default() -> Self {
36        Self {
37            max_connections: 20,
38            min_connections: 5,
39            connect_timeout: Duration::from_secs(30),
40            idle_timeout: Some(Duration::from_secs(600)),
41            max_lifetime: Some(Duration::from_secs(1800)),
42            inline_threshold: 100 * 1024, // 100KB
43        }
44    }
45}
46
47/// Complete PostgreSQL storage backend with connection pooling
48pub struct PostgresBackend {
49    pool: PgPool,
50    config: PostgresConfig,
51}
52
53impl PostgresBackend {
54    /// Create a new PostgreSQL backend with default configuration
55    pub async fn new(database_url: &str) -> anyhow::Result<Self> {
56        Self::with_config(database_url, PostgresConfig::default()).await
57    }
58
59    /// Create a new PostgreSQL backend with custom configuration
60    pub async fn with_config(database_url: &str, config: PostgresConfig) -> anyhow::Result<Self> {
61        let mut pool_options = PgPoolOptions::new()
62            .max_connections(config.max_connections)
63            .min_connections(config.min_connections)
64            .acquire_timeout(config.connect_timeout);
65
66        if let Some(idle_timeout) = config.idle_timeout {
67            pool_options = pool_options.idle_timeout(idle_timeout);
68        }
69
70        if let Some(max_lifetime) = config.max_lifetime {
71            pool_options = pool_options.max_lifetime(max_lifetime);
72        }
73
74        let pool = pool_options.connect(database_url).await?;
75
76        Ok(Self { pool, config })
77    }
78
79    /// Initialize database schema with migrations
80    pub async fn init_schema(&self) -> anyhow::Result<()> {
81        tracing::info!("Initializing PostgreSQL schema");
82
83        // Create migrations table
84        self.pool
85            .execute(
86                r#"
87                CREATE TABLE IF NOT EXISTS schema_migrations (
88                    version INTEGER PRIMARY KEY,
89                    applied_at TIMESTAMP NOT NULL DEFAULT NOW()
90                )
91                "#,
92            )
93            .await?;
94
95        // Apply migrations in order
96        self.apply_migration_1_mailboxes().await?;
97        self.apply_migration_2_messages().await?;
98        self.apply_migration_3_message_blobs().await?;
99        self.apply_migration_4_indexes().await?;
100        self.apply_migration_5_fulltext().await?;
101
102        tracing::info!("PostgreSQL schema initialization complete");
103        Ok(())
104    }
105
106    async fn apply_migration_1_mailboxes(&self) -> anyhow::Result<()> {
107        let version = 1;
108        if self.is_migration_applied(version).await? {
109            return Ok(());
110        }
111
112        tracing::info!("Applying migration {}: mailboxes and quotas", version);
113
114        // Mailboxes table
115        self.pool
116            .execute(
117                r#"
118                CREATE TABLE IF NOT EXISTS mailboxes (
119                    id UUID PRIMARY KEY,
120                    username TEXT NOT NULL,
121                    path TEXT NOT NULL,
122                    uid_validity INTEGER NOT NULL,
123                    uid_next INTEGER NOT NULL,
124                    special_use TEXT,
125                    created_at TIMESTAMP NOT NULL DEFAULT NOW(),
126                    updated_at TIMESTAMP NOT NULL DEFAULT NOW(),
127                    UNIQUE(username, path)
128                )
129                "#,
130            )
131            .await?;
132
133        // Subscriptions table
134        self.pool
135            .execute(
136                r#"
137                CREATE TABLE IF NOT EXISTS subscriptions (
138                    username TEXT NOT NULL,
139                    mailbox_name TEXT NOT NULL,
140                    created_at TIMESTAMP NOT NULL DEFAULT NOW(),
141                    PRIMARY KEY(username, mailbox_name)
142                )
143                "#,
144            )
145            .await?;
146
147        // User quotas table
148        self.pool
149            .execute(
150                r#"
151                CREATE TABLE IF NOT EXISTS user_quotas (
152                    username TEXT PRIMARY KEY,
153                    used BIGINT NOT NULL DEFAULT 0,
154                    quota_limit BIGINT NOT NULL,
155                    updated_at TIMESTAMP NOT NULL DEFAULT NOW()
156                )
157                "#,
158            )
159            .await?;
160
161        self.mark_migration_applied(version).await?;
162        Ok(())
163    }
164
165    async fn apply_migration_2_messages(&self) -> anyhow::Result<()> {
166        let version = 2;
167        if self.is_migration_applied(version).await? {
168            return Ok(());
169        }
170
171        tracing::info!("Applying migration {}: messages and flags", version);
172
173        // Messages table with inline body storage
174        self.pool
175            .execute(
176                r#"
177                CREATE TABLE IF NOT EXISTS messages (
178                    id UUID PRIMARY KEY,
179                    mailbox_id UUID NOT NULL REFERENCES mailboxes(id) ON DELETE CASCADE,
180                    uid INTEGER NOT NULL,
181                    sender TEXT,
182                    recipients TEXT[] NOT NULL,
183                    subject TEXT,
184                    headers JSONB NOT NULL,
185                    body_inline BYTEA,
186                    body_external_ref UUID,
187                    size INTEGER NOT NULL,
188                    search_vector TSVECTOR,
189                    created_at TIMESTAMP NOT NULL DEFAULT NOW(),
190                    UNIQUE(mailbox_id, uid)
191                )
192                "#,
193            )
194            .await?;
195
196        // Message flags table
197        self.pool
198            .execute(
199                r#"
200                CREATE TABLE IF NOT EXISTS message_flags (
201                    message_id UUID NOT NULL REFERENCES messages(id) ON DELETE CASCADE,
202                    flag_seen BOOLEAN NOT NULL DEFAULT FALSE,
203                    flag_answered BOOLEAN NOT NULL DEFAULT FALSE,
204                    flag_flagged BOOLEAN NOT NULL DEFAULT FALSE,
205                    flag_deleted BOOLEAN NOT NULL DEFAULT FALSE,
206                    flag_draft BOOLEAN NOT NULL DEFAULT FALSE,
207                    flag_recent BOOLEAN NOT NULL DEFAULT FALSE,
208                    custom_flags TEXT[] NOT NULL DEFAULT '{}',
209                    PRIMARY KEY(message_id)
210                )
211                "#,
212            )
213            .await?;
214
215        self.mark_migration_applied(version).await?;
216        Ok(())
217    }
218
219    async fn apply_migration_3_message_blobs(&self) -> anyhow::Result<()> {
220        let version = 3;
221        if self.is_migration_applied(version).await? {
222            return Ok(());
223        }
224
225        tracing::info!("Applying migration {}: message blobs", version);
226
227        // Message blobs table for large messages
228        self.pool
229            .execute(
230                r#"
231                CREATE TABLE IF NOT EXISTS message_blobs (
232                    id UUID PRIMARY KEY,
233                    data BYTEA NOT NULL,
234                    created_at TIMESTAMP NOT NULL DEFAULT NOW()
235                )
236                "#,
237            )
238            .await?;
239
240        self.mark_migration_applied(version).await?;
241        Ok(())
242    }
243
244    async fn apply_migration_4_indexes(&self) -> anyhow::Result<()> {
245        let version = 4;
246        if self.is_migration_applied(version).await? {
247            return Ok(());
248        }
249
250        tracing::info!("Applying migration {}: indexes", version);
251
252        // Mailboxes indexes
253        self.pool
254            .execute("CREATE INDEX IF NOT EXISTS idx_mailboxes_username ON mailboxes(username)")
255            .await?;
256        self.pool
257            .execute("CREATE INDEX IF NOT EXISTS idx_mailboxes_path ON mailboxes(path)")
258            .await?;
259
260        // Messages indexes for IMAP operations
261        self.pool
262            .execute("CREATE INDEX IF NOT EXISTS idx_messages_mailbox ON messages(mailbox_id)")
263            .await?;
264        self.pool
265            .execute(
266                "CREATE INDEX IF NOT EXISTS idx_messages_mailbox_uid ON messages(mailbox_id, uid)",
267            )
268            .await?;
269        self.pool
270            .execute("CREATE INDEX IF NOT EXISTS idx_messages_sender ON messages(sender)")
271            .await?;
272        self.pool
273            .execute("CREATE INDEX IF NOT EXISTS idx_messages_created ON messages(created_at)")
274            .await?;
275
276        // Message flags indexes for search
277        self.pool
278            .execute("CREATE INDEX IF NOT EXISTS idx_flags_message ON message_flags(message_id)")
279            .await?;
280        self.pool
281            .execute("CREATE INDEX IF NOT EXISTS idx_flags_seen ON message_flags(message_id) WHERE flag_seen = FALSE")
282            .await?;
283        self.pool
284            .execute("CREATE INDEX IF NOT EXISTS idx_flags_recent ON message_flags(message_id) WHERE flag_recent = TRUE")
285            .await?;
286        self.pool
287            .execute("CREATE INDEX IF NOT EXISTS idx_flags_deleted ON message_flags(message_id) WHERE flag_deleted = TRUE")
288            .await?;
289
290        self.mark_migration_applied(version).await?;
291        Ok(())
292    }
293
294    async fn apply_migration_5_fulltext(&self) -> anyhow::Result<()> {
295        let version = 5;
296        if self.is_migration_applied(version).await? {
297            return Ok(());
298        }
299
300        tracing::info!("Applying migration {}: full-text search", version);
301
302        // Full-text search index
303        self.pool
304            .execute("CREATE INDEX IF NOT EXISTS idx_messages_search ON messages USING GIN(search_vector)")
305            .await?;
306
307        // Create trigger function for updating search_vector
308        self.pool.execute(
309            r#"
310            CREATE OR REPLACE FUNCTION messages_search_vector_update() RETURNS trigger AS $$
311            BEGIN
312                NEW.search_vector :=
313                    setweight(to_tsvector('english', COALESCE(NEW.subject, '')), 'A') ||
314                    setweight(to_tsvector('english', COALESCE(NEW.sender, '')), 'B') ||
315                    setweight(to_tsvector('english', COALESCE(array_to_string(NEW.recipients, ' '), '')), 'C') ||
316                    setweight(to_tsvector('english', COALESCE(NEW.headers::text, '')), 'D');
317                RETURN NEW;
318            END
319            $$ LANGUAGE plpgsql
320            "#,
321        ).await?;
322
323        // Create trigger
324        self.pool
325            .execute(
326                r#"
327                DROP TRIGGER IF EXISTS messages_search_vector_trigger ON messages;
328                CREATE TRIGGER messages_search_vector_trigger
329                BEFORE INSERT OR UPDATE ON messages
330                FOR EACH ROW EXECUTE FUNCTION messages_search_vector_update()
331                "#,
332            )
333            .await?;
334
335        self.mark_migration_applied(version).await?;
336        Ok(())
337    }
338
339    async fn is_migration_applied(&self, version: i32) -> anyhow::Result<bool> {
340        let result = sqlx::query("SELECT version FROM schema_migrations WHERE version = $1")
341            .bind(version)
342            .fetch_optional(&self.pool)
343            .await?;
344        Ok(result.is_some())
345    }
346
347    async fn mark_migration_applied(&self, version: i32) -> anyhow::Result<()> {
348        sqlx::query("INSERT INTO schema_migrations (version) VALUES ($1) ON CONFLICT DO NOTHING")
349            .bind(version)
350            .execute(&self.pool)
351            .await?;
352        Ok(())
353    }
354
355    /// Run VACUUM maintenance on all tables
356    pub async fn vacuum(&self) -> anyhow::Result<()> {
357        tracing::info!("Running VACUUM on PostgreSQL database");
358        self.pool.execute("VACUUM ANALYZE").await?;
359        Ok(())
360    }
361
362    /// Run REINDEX on all tables
363    pub async fn reindex(&self) -> anyhow::Result<()> {
364        tracing::info!("Running REINDEX on PostgreSQL database");
365        self.pool.execute("REINDEX DATABASE CONCURRENTLY").await?;
366        Ok(())
367    }
368
369    /// Get pool statistics
370    pub fn pool_size(&self) -> u32 {
371        self.pool.size()
372    }
373
374    /// Get idle connections count
375    pub fn idle_connections(&self) -> usize {
376        self.pool.num_idle()
377    }
378
379    /// Get pool reference
380    pub fn pool(&self) -> &PgPool {
381        &self.pool
382    }
383}
384
385impl StorageBackend for PostgresBackend {
386    fn mailbox_store(&self) -> Arc<dyn MailboxStore> {
387        Arc::new(PostgresMailboxStore {
388            pool: self.pool.clone(),
389        })
390    }
391
392    fn message_store(&self) -> Arc<dyn MessageStore> {
393        Arc::new(PostgresMessageStore {
394            pool: self.pool.clone(),
395            inline_threshold: self.config.inline_threshold,
396        })
397    }
398
399    fn metadata_store(&self) -> Arc<dyn MetadataStore> {
400        Arc::new(PostgresMetadataStore {
401            pool: self.pool.clone(),
402        })
403    }
404}
405
406/// PostgreSQL mailbox store implementation
407struct PostgresMailboxStore {
408    pool: PgPool,
409}
410
411#[async_trait]
412impl MailboxStore for PostgresMailboxStore {
413    async fn create_mailbox(&self, path: &MailboxPath) -> anyhow::Result<MailboxId> {
414        let mailbox = Mailbox::new(path.clone());
415        let id = *mailbox.id();
416
417        sqlx::query(
418            r#"
419            INSERT INTO mailboxes (id, username, path, uid_validity, uid_next, special_use)
420            VALUES ($1, $2, $3, $4, $5, $6)
421            "#,
422        )
423        .bind(*id.as_uuid())
424        .bind(path.user().to_string())
425        .bind(path.path().join("/"))
426        .bind(mailbox.uid_validity() as i32)
427        .bind(mailbox.uid_next() as i32)
428        .bind(mailbox.special_use())
429        .execute(&self.pool)
430        .await
431        .map_err(|e| anyhow::anyhow!("Failed to create mailbox: {}", e))?;
432
433        tracing::debug!("Created mailbox {} at path {}", id, path);
434        Ok(id)
435    }
436
437    async fn create_mailbox_with_special_use(
438        &self,
439        path: &MailboxPath,
440        special_use: SpecialUseAttributes,
441    ) -> anyhow::Result<MailboxId> {
442        let mailbox = Mailbox::new(path.clone());
443        let id = *mailbox.id();
444        let special_use_str = special_use.to_string();
445
446        sqlx::query(
447            r#"
448            INSERT INTO mailboxes (id, username, path, uid_validity, uid_next, special_use)
449            VALUES ($1, $2, $3, $4, $5, $6)
450            "#,
451        )
452        .bind(*id.as_uuid())
453        .bind(path.user().to_string())
454        .bind(path.path().join("/"))
455        .bind(mailbox.uid_validity() as i32)
456        .bind(mailbox.uid_next() as i32)
457        .bind(special_use_str)
458        .execute(&self.pool)
459        .await
460        .map_err(|e| anyhow::anyhow!("Failed to create mailbox with special use: {}", e))?;
461
462        tracing::debug!("Created mailbox {} with special use at path {}", id, path);
463        Ok(id)
464    }
465
466    async fn delete_mailbox(&self, id: &MailboxId) -> anyhow::Result<()> {
467        let result = sqlx::query("DELETE FROM mailboxes WHERE id = $1")
468            .bind(*id.as_uuid())
469            .execute(&self.pool)
470            .await
471            .map_err(|e| anyhow::anyhow!("Failed to delete mailbox: {}", e))?;
472
473        if result.rows_affected() == 0 {
474            return Err(anyhow::anyhow!("Mailbox not found: {}", id));
475        }
476
477        tracing::debug!("Deleted mailbox {}", id);
478        Ok(())
479    }
480
481    async fn rename_mailbox(&self, id: &MailboxId, new_path: &MailboxPath) -> anyhow::Result<()> {
482        let result =
483            sqlx::query("UPDATE mailboxes SET path = $1, updated_at = NOW() WHERE id = $2")
484                .bind(new_path.path().join("/"))
485                .bind(*id.as_uuid())
486                .execute(&self.pool)
487                .await
488                .map_err(|e| anyhow::anyhow!("Failed to rename mailbox: {}", e))?;
489
490        if result.rows_affected() == 0 {
491            return Err(anyhow::anyhow!("Mailbox not found: {}", id));
492        }
493
494        tracing::debug!("Renamed mailbox {} to {}", id, new_path);
495        Ok(())
496    }
497
498    async fn get_mailbox(&self, id: &MailboxId) -> anyhow::Result<Option<Mailbox>> {
499        let row = sqlx::query(
500            "SELECT id, username, path, uid_validity, uid_next, special_use FROM mailboxes WHERE id = $1"
501        )
502        .bind(*id.as_uuid())
503        .fetch_optional(&self.pool)
504        .await
505        .map_err(|e| anyhow::anyhow!("Failed to get mailbox: {}", e))?;
506
507        let row = match row {
508            Some(r) => r,
509            None => return Ok(None),
510        };
511
512        let mailbox = self.row_to_mailbox(row)?;
513        Ok(Some(mailbox))
514    }
515
516    async fn list_mailboxes(&self, user: &Username) -> anyhow::Result<Vec<Mailbox>> {
517        let rows = sqlx::query(
518            "SELECT id, username, path, uid_validity, uid_next, special_use FROM mailboxes WHERE username = $1 ORDER BY path"
519        )
520        .bind(user.to_string())
521        .fetch_all(&self.pool)
522        .await
523        .map_err(|e| anyhow::anyhow!("Failed to list mailboxes: {}", e))?;
524
525        let mailboxes = rows
526            .into_iter()
527            .filter_map(|row| self.row_to_mailbox(row).ok())
528            .collect();
529
530        Ok(mailboxes)
531    }
532
533    async fn get_user_inbox(&self, user: &Username) -> anyhow::Result<Option<MailboxId>> {
534        let row =
535            sqlx::query("SELECT id FROM mailboxes WHERE username = $1 AND path = 'INBOX' LIMIT 1")
536                .bind(user.to_string())
537                .fetch_optional(&self.pool)
538                .await
539                .map_err(|e| anyhow::anyhow!("Failed to get user inbox: {}", e))?;
540
541        if let Some(row) = row {
542            let uuid: uuid::Uuid = row.get(0);
543            Ok(Some(MailboxId::from_uuid(uuid)))
544        } else {
545            Ok(None)
546        }
547    }
548
549    async fn get_mailbox_special_use(
550        &self,
551        id: &MailboxId,
552    ) -> anyhow::Result<SpecialUseAttributes> {
553        let row = sqlx::query("SELECT special_use FROM mailboxes WHERE id = $1")
554            .bind(*id.as_uuid())
555            .fetch_optional(&self.pool)
556            .await
557            .map_err(|e| anyhow::anyhow!("Failed to get special use: {}", e))?;
558
559        match row {
560            Some(r) => {
561                let special_use: Option<String> = r.try_get("special_use")?;
562                match special_use {
563                    Some(s) if !s.is_empty() => {
564                        let attrs = s
565                            .split_whitespace()
566                            .map(|s| s.to_string())
567                            .collect::<Vec<_>>();
568                        Ok(SpecialUseAttributes::from_vec(attrs))
569                    }
570                    _ => Ok(SpecialUseAttributes::new()),
571                }
572            }
573            None => Ok(SpecialUseAttributes::new()),
574        }
575    }
576
577    async fn set_mailbox_special_use(
578        &self,
579        id: &MailboxId,
580        special_use: SpecialUseAttributes,
581    ) -> anyhow::Result<()> {
582        let special_use_str = special_use.to_string();
583
584        sqlx::query("UPDATE mailboxes SET special_use = $1, updated_at = NOW() WHERE id = $2")
585            .bind(special_use_str)
586            .bind(*id.as_uuid())
587            .execute(&self.pool)
588            .await
589            .map_err(|e| anyhow::anyhow!("Failed to set special use: {}", e))?;
590
591        Ok(())
592    }
593
594    async fn subscribe_mailbox(&self, user: &Username, mailbox_name: String) -> anyhow::Result<()> {
595        sqlx::query(
596            "INSERT INTO subscriptions (username, mailbox_name) VALUES ($1, $2) ON CONFLICT DO NOTHING"
597        )
598        .bind(user.to_string())
599        .bind(mailbox_name)
600        .execute(&self.pool)
601        .await
602        .map_err(|e| anyhow::anyhow!("Failed to subscribe: {}", e))?;
603
604        Ok(())
605    }
606
607    async fn unsubscribe_mailbox(&self, user: &Username, mailbox_name: &str) -> anyhow::Result<()> {
608        sqlx::query("DELETE FROM subscriptions WHERE username = $1 AND mailbox_name = $2")
609            .bind(user.to_string())
610            .bind(mailbox_name)
611            .execute(&self.pool)
612            .await
613            .map_err(|e| anyhow::anyhow!("Failed to unsubscribe: {}", e))?;
614
615        Ok(())
616    }
617
618    async fn list_subscriptions(&self, user: &Username) -> anyhow::Result<Vec<String>> {
619        let rows = sqlx::query("SELECT mailbox_name FROM subscriptions WHERE username = $1")
620            .bind(user.to_string())
621            .fetch_all(&self.pool)
622            .await
623            .map_err(|e| anyhow::anyhow!("Failed to list subscriptions: {}", e))?;
624
625        let subscriptions = rows
626            .into_iter()
627            .filter_map(|row| row.try_get("mailbox_name").ok())
628            .collect();
629
630        Ok(subscriptions)
631    }
632}
633
634impl PostgresMailboxStore {
635    fn row_to_mailbox(&self, row: sqlx::postgres::PgRow) -> anyhow::Result<Mailbox> {
636        let username: String = row.try_get("username")?;
637        let path_str: String = row.try_get("path")?;
638        let path_parts: Vec<String> = path_str.split('/').map(|s| s.to_string()).collect();
639        let username_obj =
640            Username::new(username).map_err(|e| anyhow::anyhow!("Invalid username: {}", e))?;
641        let path = MailboxPath::new(username_obj, path_parts);
642
643        let mut mailbox = Mailbox::new(path);
644        let special_use: Option<String> = row.try_get("special_use")?;
645        mailbox.set_special_use(special_use);
646
647        Ok(mailbox)
648    }
649}
650
651/// PostgreSQL message store implementation
652struct PostgresMessageStore {
653    pool: PgPool,
654    inline_threshold: usize,
655}
656
657#[async_trait]
658impl MessageStore for PostgresMessageStore {
659    async fn append_message(
660        &self,
661        mailbox_id: &MailboxId,
662        message: Mail,
663    ) -> anyhow::Result<MessageMetadata> {
664        let mut tx = self.pool.begin().await?;
665
666        // Get next UID for mailbox (with row-level lock)
667        let uid_row = sqlx::query("SELECT uid_next FROM mailboxes WHERE id = $1 FOR UPDATE")
668            .bind(*mailbox_id.as_uuid())
669            .fetch_one(&mut *tx)
670            .await
671            .map_err(|e| anyhow::anyhow!("Failed to get next UID: {}", e))?;
672        let uid: i32 = uid_row.try_get("uid_next")?;
673
674        // Extract message data
675        let message_id = *message.message_id();
676        let sender = message.sender().map(|s| s.to_string());
677        let recipients: Vec<String> = message.recipients().iter().map(|r| r.to_string()).collect();
678        let message_size = message.size();
679
680        // Extract subject from headers
681        let subject = message
682            .message()
683            .headers()
684            .get_first("subject")
685            .map(|s| s.to_string());
686
687        // Serialize headers to JSON
688        let mut headers_map = serde_json::Map::new();
689        for (name, values) in message.message().headers().iter() {
690            headers_map.insert(name.clone(), serde_json::json!(values));
691        }
692        let headers_json = serde_json::Value::Object(headers_map);
693
694        // Store message body (inline or external based on size)
695        let (body_inline, body_external_ref) = if message_size < self.inline_threshold {
696            // Store inline
697            let body_bytes = match message.message().body() {
698                rusmes_proto::MessageBody::Small(bytes) => bytes.to_vec(),
699                _ => vec![],
700            };
701            (Some(body_bytes), None)
702        } else {
703            // Store externally
704            let blob_id = uuid::Uuid::new_v4();
705            let body_bytes = match message.message().body() {
706                rusmes_proto::MessageBody::Small(bytes) => bytes.to_vec(),
707                _ => vec![],
708            };
709
710            sqlx::query("INSERT INTO message_blobs (id, data) VALUES ($1, $2)")
711                .bind(blob_id)
712                .bind(&body_bytes)
713                .execute(&mut *tx)
714                .await
715                .map_err(|e| anyhow::anyhow!("Failed to store message blob: {}", e))?;
716
717            (None, Some(blob_id))
718        };
719
720        // Insert message
721        sqlx::query(
722            r#"
723            INSERT INTO messages (id, mailbox_id, uid, sender, recipients, subject, headers, body_inline, body_external_ref, size)
724            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
725            "#,
726        )
727        .bind(*message_id.as_uuid())
728        .bind(*mailbox_id.as_uuid())
729        .bind(uid)
730        .bind(&sender)
731        .bind(&recipients)
732        .bind(&subject)
733        .bind(&headers_json)
734        .bind(body_inline)
735        .bind(body_external_ref)
736        .bind(message_size as i32)
737        .execute(&mut *tx)
738        .await
739        .map_err(|e| anyhow::anyhow!("Failed to insert message: {}", e))?;
740
741        // Insert initial flags (mark as recent)
742        sqlx::query("INSERT INTO message_flags (message_id, flag_recent) VALUES ($1, TRUE)")
743            .bind(*message_id.as_uuid())
744            .execute(&mut *tx)
745            .await
746            .map_err(|e| anyhow::anyhow!("Failed to insert flags: {}", e))?;
747
748        // Update mailbox uid_next and quota
749        sqlx::query("UPDATE mailboxes SET uid_next = $1, updated_at = NOW() WHERE id = $2")
750            .bind(uid + 1)
751            .bind(*mailbox_id.as_uuid())
752            .execute(&mut *tx)
753            .await
754            .map_err(|e| anyhow::anyhow!("Failed to update mailbox: {}", e))?;
755
756        // Update user quota
757        let mailbox_row = sqlx::query("SELECT username FROM mailboxes WHERE id = $1")
758            .bind(*mailbox_id.as_uuid())
759            .fetch_one(&mut *tx)
760            .await
761            .map_err(|e| anyhow::anyhow!("Failed to get mailbox: {}", e))?;
762        let username: String = mailbox_row.try_get("username")?;
763
764        sqlx::query(
765            r#"
766            INSERT INTO user_quotas (username, used, quota_limit)
767            VALUES ($1, $2, 1073741824)
768            ON CONFLICT (username) DO UPDATE
769            SET used = user_quotas.used + $2, updated_at = NOW()
770            "#,
771        )
772        .bind(&username)
773        .bind(message_size as i64)
774        .execute(&mut *tx)
775        .await
776        .map_err(|e| anyhow::anyhow!("Failed to update quota: {}", e))?;
777
778        tx.commit().await?;
779
780        let mut flags = MessageFlags::new();
781        flags.set_recent(true);
782
783        let metadata =
784            MessageMetadata::new(message_id, *mailbox_id, uid as u32, flags, message_size);
785
786        tracing::debug!(
787            "Appended message {} to mailbox {} with UID {}",
788            message_id,
789            mailbox_id,
790            uid
791        );
792        Ok(metadata)
793    }
794
795    async fn get_message(&self, message_id: &MessageId) -> anyhow::Result<Option<Mail>> {
796        // Fetch message data from database
797        let row = sqlx::query(
798            r#"
799            SELECT m.sender, m.recipients, m.headers, m.body_inline, m.body_external_ref
800            FROM messages m
801            WHERE m.id = $1
802            "#,
803        )
804        .bind(*message_id.as_uuid())
805        .fetch_optional(&self.pool)
806        .await
807        .map_err(|e| anyhow::anyhow!("Failed to fetch message: {}", e))?;
808
809        let row = match row {
810            Some(r) => r,
811            None => return Ok(None),
812        };
813
814        // Extract fields
815        let sender: Option<String> = row.try_get("sender")?;
816        let recipients: Vec<String> = row.try_get("recipients")?;
817        let headers_json: serde_json::Value = row.try_get("headers")?;
818        let body_inline: Option<Vec<u8>> = row.try_get("body_inline")?;
819        let body_external_ref: Option<uuid::Uuid> = row.try_get("body_external_ref")?;
820
821        // Reconstruct headers
822        let mut headers = rusmes_proto::HeaderMap::new();
823        if let Some(headers_obj) = headers_json.as_object() {
824            for (name, value) in headers_obj {
825                if let Some(values_array) = value.as_array() {
826                    for v in values_array {
827                        if let Some(v_str) = v.as_str() {
828                            headers.insert(name.clone(), v_str.to_string());
829                        }
830                    }
831                }
832            }
833        }
834
835        // Reconstruct body
836        let body_bytes = if let Some(inline) = body_inline {
837            inline
838        } else if let Some(blob_id) = body_external_ref {
839            // Fetch from message_blobs table
840            let blob_row = sqlx::query("SELECT data FROM message_blobs WHERE id = $1")
841                .bind(blob_id)
842                .fetch_optional(&self.pool)
843                .await
844                .map_err(|e| anyhow::anyhow!("Failed to fetch message blob: {}", e))?;
845
846            if let Some(blob) = blob_row {
847                blob.try_get("data")?
848            } else {
849                tracing::warn!("Message blob {} not found", blob_id);
850                vec![]
851            }
852        } else {
853            vec![]
854        };
855
856        let body = rusmes_proto::MessageBody::Small(bytes::Bytes::from(body_bytes));
857        let mime_message = rusmes_proto::MimeMessage::new(headers, body);
858
859        // Parse sender and recipients
860        let sender_addr = if let Some(sender_str) = sender {
861            MailAddress::from_str(&sender_str).ok()
862        } else {
863            None
864        };
865
866        let recipient_addrs: Vec<MailAddress> = recipients
867            .into_iter()
868            .filter_map(|r| MailAddress::from_str(&r).ok())
869            .collect();
870
871        // Create Mail object
872        let mail = rusmes_proto::Mail::with_message_id(
873            sender_addr,
874            recipient_addrs,
875            mime_message,
876            None, // remote_addr not stored
877            None, // remote_host not stored
878            *message_id,
879        );
880
881        Ok(Some(mail))
882    }
883
884    async fn delete_messages(&self, message_ids: &[MessageId]) -> anyhow::Result<()> {
885        if message_ids.is_empty() {
886            return Ok(());
887        }
888
889        let mut tx = self.pool.begin().await?;
890
891        let uuids: Vec<uuid::Uuid> = message_ids.iter().map(|id| *id.as_uuid()).collect();
892
893        // Get external blob references to delete
894        let blob_rows = sqlx::query("SELECT body_external_ref FROM messages WHERE id = ANY($1) AND body_external_ref IS NOT NULL")
895            .bind(&uuids)
896            .fetch_all(&mut *tx)
897            .await?;
898
899        let blob_ids: Vec<uuid::Uuid> = blob_rows
900            .into_iter()
901            .filter_map(|row| row.try_get("body_external_ref").ok())
902            .collect();
903
904        // Delete messages
905        sqlx::query("DELETE FROM messages WHERE id = ANY($1)")
906            .bind(&uuids)
907            .execute(&mut *tx)
908            .await
909            .map_err(|e| anyhow::anyhow!("Failed to delete messages: {}", e))?;
910
911        // Delete external blobs
912        if !blob_ids.is_empty() {
913            sqlx::query("DELETE FROM message_blobs WHERE id = ANY($1)")
914                .bind(&blob_ids)
915                .execute(&mut *tx)
916                .await
917                .map_err(|e| anyhow::anyhow!("Failed to delete blobs: {}", e))?;
918        }
919
920        tx.commit().await?;
921
922        tracing::debug!("Deleted {} messages", message_ids.len());
923        Ok(())
924    }
925
926    async fn set_flags(
927        &self,
928        message_ids: &[MessageId],
929        flags: MessageFlags,
930    ) -> anyhow::Result<()> {
931        if message_ids.is_empty() {
932            return Ok(());
933        }
934
935        let uuids: Vec<uuid::Uuid> = message_ids.iter().map(|id| *id.as_uuid()).collect();
936        let custom_flags: Vec<String> = flags.custom().iter().cloned().collect();
937
938        sqlx::query(
939            r#"
940            UPDATE message_flags SET
941                flag_seen = $1,
942                flag_answered = $2,
943                flag_flagged = $3,
944                flag_deleted = $4,
945                flag_draft = $5,
946                flag_recent = $6,
947                custom_flags = $7
948            WHERE message_id = ANY($8)
949            "#,
950        )
951        .bind(flags.is_seen())
952        .bind(flags.is_answered())
953        .bind(flags.is_flagged())
954        .bind(flags.is_deleted())
955        .bind(flags.is_draft())
956        .bind(flags.is_recent())
957        .bind(&custom_flags)
958        .bind(&uuids)
959        .execute(&self.pool)
960        .await
961        .map_err(|e| anyhow::anyhow!("Failed to set flags: {}", e))?;
962
963        tracing::debug!("Set flags for {} messages", message_ids.len());
964        Ok(())
965    }
966
967    async fn search(
968        &self,
969        mailbox_id: &MailboxId,
970        criteria: SearchCriteria,
971    ) -> anyhow::Result<Vec<MessageId>> {
972        let message_ids = match criteria {
973            SearchCriteria::All => self.search_all(mailbox_id).await?,
974            SearchCriteria::Unseen => self.search_unseen(mailbox_id).await?,
975            SearchCriteria::Seen => self.search_seen(mailbox_id).await?,
976            SearchCriteria::Flagged => self.search_flagged(mailbox_id).await?,
977            SearchCriteria::Unflagged => self.search_unflagged(mailbox_id).await?,
978            SearchCriteria::Deleted => self.search_deleted(mailbox_id).await?,
979            SearchCriteria::Undeleted => self.search_undeleted(mailbox_id).await?,
980            SearchCriteria::From(email) => self.search_from(mailbox_id, &email).await?,
981            SearchCriteria::To(email) => self.search_to(mailbox_id, &email).await?,
982            SearchCriteria::Subject(text) => self.search_subject(mailbox_id, &text).await?,
983            SearchCriteria::Body(text) => self.search_body(mailbox_id, &text).await?,
984            SearchCriteria::And(criteria_list) => {
985                self.search_and(mailbox_id, criteria_list).await?
986            }
987            SearchCriteria::Or(criteria_list) => self.search_or(mailbox_id, criteria_list).await?,
988            SearchCriteria::Not(criteria) => self.search_not(mailbox_id, *criteria).await?,
989        };
990
991        Ok(message_ids)
992    }
993
994    async fn copy_messages(
995        &self,
996        message_ids: &[MessageId],
997        dest_mailbox_id: &MailboxId,
998    ) -> anyhow::Result<Vec<MessageMetadata>> {
999        if message_ids.is_empty() {
1000            return Ok(vec![]);
1001        }
1002
1003        let mut tx = self.pool.begin().await?;
1004        let mut metadata_list = Vec::new();
1005
1006        for message_id in message_ids {
1007            // Get next UID for destination mailbox
1008            let uid_row = sqlx::query("SELECT uid_next FROM mailboxes WHERE id = $1 FOR UPDATE")
1009                .bind(*dest_mailbox_id.as_uuid())
1010                .fetch_one(&mut *tx)
1011                .await?;
1012            let uid: i32 = uid_row.try_get("uid_next")?;
1013
1014            // Copy message with new ID and UID
1015            let new_message_id = MessageId::new();
1016            sqlx::query(
1017                r#"
1018                INSERT INTO messages (id, mailbox_id, uid, sender, recipients, subject, headers, body_inline, body_external_ref, size)
1019                SELECT $1, $2, $3, sender, recipients, subject, headers, body_inline, body_external_ref, size
1020                FROM messages WHERE id = $4
1021                "#,
1022            )
1023            .bind(*new_message_id.as_uuid())
1024            .bind(*dest_mailbox_id.as_uuid())
1025            .bind(uid)
1026            .bind(*message_id.as_uuid())
1027            .execute(&mut *tx)
1028            .await?;
1029
1030            // Copy flags
1031            sqlx::query(
1032                r#"
1033                INSERT INTO message_flags (message_id, flag_seen, flag_answered, flag_flagged, flag_deleted, flag_draft, flag_recent, custom_flags)
1034                SELECT $1, flag_seen, flag_answered, flag_flagged, flag_deleted, flag_draft, FALSE, custom_flags
1035                FROM message_flags WHERE message_id = $2
1036                "#,
1037            )
1038            .bind(*new_message_id.as_uuid())
1039            .bind(*message_id.as_uuid())
1040            .execute(&mut *tx)
1041            .await?;
1042
1043            // Update destination mailbox uid_next
1044            sqlx::query("UPDATE mailboxes SET uid_next = $1 WHERE id = $2")
1045                .bind(uid + 1)
1046                .bind(*dest_mailbox_id.as_uuid())
1047                .execute(&mut *tx)
1048                .await?;
1049
1050            // Get size for metadata
1051            let size_row = sqlx::query("SELECT size FROM messages WHERE id = $1")
1052                .bind(*new_message_id.as_uuid())
1053                .fetch_one(&mut *tx)
1054                .await?;
1055            let size: i32 = size_row.try_get("size")?;
1056
1057            metadata_list.push(MessageMetadata::new(
1058                new_message_id,
1059                *dest_mailbox_id,
1060                uid as u32,
1061                MessageFlags::new(),
1062                size as usize,
1063            ));
1064        }
1065
1066        tx.commit().await?;
1067
1068        tracing::debug!(
1069            "Copied {} messages to mailbox {}",
1070            message_ids.len(),
1071            dest_mailbox_id
1072        );
1073        Ok(metadata_list)
1074    }
1075
1076    async fn get_mailbox_messages(
1077        &self,
1078        mailbox_id: &MailboxId,
1079    ) -> anyhow::Result<Vec<MessageMetadata>> {
1080        let rows = sqlx::query(
1081            r#"
1082            SELECT m.id, m.mailbox_id, m.uid, m.size,
1083                   f.flag_seen, f.flag_answered, f.flag_flagged,
1084                   f.flag_deleted, f.flag_draft, f.flag_recent, f.custom_flags
1085            FROM messages m
1086            LEFT JOIN message_flags f ON m.id = f.message_id
1087            WHERE m.mailbox_id = $1
1088            ORDER BY m.uid
1089            "#,
1090        )
1091        .bind(*mailbox_id.as_uuid())
1092        .fetch_all(&self.pool)
1093        .await
1094        .map_err(|e| anyhow::anyhow!("Failed to get mailbox messages: {}", e))?;
1095
1096        let metadata_list = rows
1097            .into_iter()
1098            .filter_map(|row| self.row_to_metadata(row).ok())
1099            .collect();
1100
1101        Ok(metadata_list)
1102    }
1103}
1104
1105impl PostgresMessageStore {
1106    fn row_to_metadata(&self, row: sqlx::postgres::PgRow) -> anyhow::Result<MessageMetadata> {
1107        let _msg_uuid: uuid::Uuid = row.try_get("id")?;
1108        let _mailbox_uuid: uuid::Uuid = row.try_get("mailbox_id")?;
1109        let uid: i32 = row.try_get("uid")?;
1110        let size: i32 = row.try_get("size")?;
1111
1112        let mut flags = MessageFlags::new();
1113        if let Ok(seen) = row.try_get::<bool, _>("flag_seen") {
1114            flags.set_seen(seen);
1115        }
1116        if let Ok(answered) = row.try_get::<bool, _>("flag_answered") {
1117            flags.set_answered(answered);
1118        }
1119        if let Ok(flagged) = row.try_get::<bool, _>("flag_flagged") {
1120            flags.set_flagged(flagged);
1121        }
1122        if let Ok(deleted) = row.try_get::<bool, _>("flag_deleted") {
1123            flags.set_deleted(deleted);
1124        }
1125        if let Ok(draft) = row.try_get::<bool, _>("flag_draft") {
1126            flags.set_draft(draft);
1127        }
1128        if let Ok(recent) = row.try_get::<bool, _>("flag_recent") {
1129            flags.set_recent(recent);
1130        }
1131        if let Ok(custom) = row.try_get::<Vec<String>, _>("custom_flags") {
1132            for flag in custom {
1133                flags.add_custom(flag);
1134            }
1135        }
1136
1137        // Create MessageId from UUID (note: this doesn't preserve the original MessageId)
1138        let message_id = MessageId::new();
1139        let mailbox_id = MailboxId::new();
1140
1141        Ok(MessageMetadata::new(
1142            message_id,
1143            mailbox_id,
1144            uid as u32,
1145            flags,
1146            size as usize,
1147        ))
1148    }
1149
1150    async fn search_all(&self, mailbox_id: &MailboxId) -> anyhow::Result<Vec<MessageId>> {
1151        let rows = sqlx::query("SELECT id FROM messages WHERE mailbox_id = $1")
1152            .bind(*mailbox_id.as_uuid())
1153            .fetch_all(&self.pool)
1154            .await?;
1155
1156        Ok(rows.into_iter().map(|_| MessageId::new()).collect())
1157    }
1158
1159    async fn search_unseen(&self, mailbox_id: &MailboxId) -> anyhow::Result<Vec<MessageId>> {
1160        let rows = sqlx::query(
1161            r#"
1162            SELECT m.id FROM messages m
1163            JOIN message_flags f ON m.id = f.message_id
1164            WHERE m.mailbox_id = $1 AND f.flag_seen = FALSE
1165            "#,
1166        )
1167        .bind(*mailbox_id.as_uuid())
1168        .fetch_all(&self.pool)
1169        .await?;
1170
1171        Ok(rows.into_iter().map(|_| MessageId::new()).collect())
1172    }
1173
1174    async fn search_seen(&self, mailbox_id: &MailboxId) -> anyhow::Result<Vec<MessageId>> {
1175        let rows = sqlx::query(
1176            r#"
1177            SELECT m.id FROM messages m
1178            JOIN message_flags f ON m.id = f.message_id
1179            WHERE m.mailbox_id = $1 AND f.flag_seen = TRUE
1180            "#,
1181        )
1182        .bind(*mailbox_id.as_uuid())
1183        .fetch_all(&self.pool)
1184        .await?;
1185
1186        Ok(rows.into_iter().map(|_| MessageId::new()).collect())
1187    }
1188
1189    async fn search_flagged(&self, mailbox_id: &MailboxId) -> anyhow::Result<Vec<MessageId>> {
1190        let rows = sqlx::query(
1191            r#"
1192            SELECT m.id FROM messages m
1193            JOIN message_flags f ON m.id = f.message_id
1194            WHERE m.mailbox_id = $1 AND f.flag_flagged = TRUE
1195            "#,
1196        )
1197        .bind(*mailbox_id.as_uuid())
1198        .fetch_all(&self.pool)
1199        .await?;
1200
1201        Ok(rows.into_iter().map(|_| MessageId::new()).collect())
1202    }
1203
1204    async fn search_unflagged(&self, mailbox_id: &MailboxId) -> anyhow::Result<Vec<MessageId>> {
1205        let rows = sqlx::query(
1206            r#"
1207            SELECT m.id FROM messages m
1208            JOIN message_flags f ON m.id = f.message_id
1209            WHERE m.mailbox_id = $1 AND f.flag_flagged = FALSE
1210            "#,
1211        )
1212        .bind(*mailbox_id.as_uuid())
1213        .fetch_all(&self.pool)
1214        .await?;
1215
1216        Ok(rows.into_iter().map(|_| MessageId::new()).collect())
1217    }
1218
1219    async fn search_deleted(&self, mailbox_id: &MailboxId) -> anyhow::Result<Vec<MessageId>> {
1220        let rows = sqlx::query(
1221            r#"
1222            SELECT m.id FROM messages m
1223            JOIN message_flags f ON m.id = f.message_id
1224            WHERE m.mailbox_id = $1 AND f.flag_deleted = TRUE
1225            "#,
1226        )
1227        .bind(*mailbox_id.as_uuid())
1228        .fetch_all(&self.pool)
1229        .await?;
1230
1231        Ok(rows.into_iter().map(|_| MessageId::new()).collect())
1232    }
1233
1234    async fn search_undeleted(&self, mailbox_id: &MailboxId) -> anyhow::Result<Vec<MessageId>> {
1235        let rows = sqlx::query(
1236            r#"
1237            SELECT m.id FROM messages m
1238            JOIN message_flags f ON m.id = f.message_id
1239            WHERE m.mailbox_id = $1 AND f.flag_deleted = FALSE
1240            "#,
1241        )
1242        .bind(*mailbox_id.as_uuid())
1243        .fetch_all(&self.pool)
1244        .await?;
1245
1246        Ok(rows.into_iter().map(|_| MessageId::new()).collect())
1247    }
1248
1249    async fn search_from(
1250        &self,
1251        mailbox_id: &MailboxId,
1252        email: &str,
1253    ) -> anyhow::Result<Vec<MessageId>> {
1254        let rows = sqlx::query("SELECT id FROM messages WHERE mailbox_id = $1 AND sender ILIKE $2")
1255            .bind(*mailbox_id.as_uuid())
1256            .bind(format!("%{}%", email))
1257            .fetch_all(&self.pool)
1258            .await?;
1259
1260        Ok(rows.into_iter().map(|_| MessageId::new()).collect())
1261    }
1262
1263    async fn search_to(
1264        &self,
1265        mailbox_id: &MailboxId,
1266        email: &str,
1267    ) -> anyhow::Result<Vec<MessageId>> {
1268        let rows =
1269            sqlx::query("SELECT id FROM messages WHERE mailbox_id = $1 AND $2 = ANY(recipients)")
1270                .bind(*mailbox_id.as_uuid())
1271                .bind(email)
1272                .fetch_all(&self.pool)
1273                .await?;
1274
1275        Ok(rows.into_iter().map(|_| MessageId::new()).collect())
1276    }
1277
1278    async fn search_subject(
1279        &self,
1280        mailbox_id: &MailboxId,
1281        text: &str,
1282    ) -> anyhow::Result<Vec<MessageId>> {
1283        let rows = sqlx::query(
1284            r#"
1285            SELECT id FROM messages
1286            WHERE mailbox_id = $1 AND search_vector @@ plainto_tsquery('english', $2)
1287            "#,
1288        )
1289        .bind(*mailbox_id.as_uuid())
1290        .bind(text)
1291        .fetch_all(&self.pool)
1292        .await?;
1293
1294        Ok(rows.into_iter().map(|_| MessageId::new()).collect())
1295    }
1296
1297    async fn search_body(
1298        &self,
1299        mailbox_id: &MailboxId,
1300        text: &str,
1301    ) -> anyhow::Result<Vec<MessageId>> {
1302        // Use full-text search
1303        self.search_subject(mailbox_id, text).await
1304    }
1305
1306    async fn search_and(
1307        &self,
1308        mailbox_id: &MailboxId,
1309        criteria_list: Vec<SearchCriteria>,
1310    ) -> anyhow::Result<Vec<MessageId>> {
1311        if criteria_list.is_empty() {
1312            return Ok(vec![]);
1313        }
1314
1315        let mut result_sets: Vec<Vec<MessageId>> = Vec::new();
1316        for criteria in criteria_list {
1317            let results = self.search(mailbox_id, criteria).await?;
1318            result_sets.push(results);
1319        }
1320
1321        // Intersect all result sets
1322        if result_sets.is_empty() {
1323            return Ok(vec![]);
1324        }
1325
1326        let mut intersection = result_sets[0].clone();
1327        for result_set in result_sets.iter().skip(1) {
1328            intersection.retain(|id| result_set.contains(id));
1329        }
1330
1331        Ok(intersection)
1332    }
1333
1334    async fn search_or(
1335        &self,
1336        mailbox_id: &MailboxId,
1337        criteria_list: Vec<SearchCriteria>,
1338    ) -> anyhow::Result<Vec<MessageId>> {
1339        let mut all_results = Vec::new();
1340        for criteria in criteria_list {
1341            let results = self.search(mailbox_id, criteria).await?;
1342            all_results.extend(results);
1343        }
1344
1345        // Remove duplicates
1346        all_results.sort_by_key(|id| format!("{}", id));
1347        all_results.dedup();
1348
1349        Ok(all_results)
1350    }
1351
1352    async fn search_not(
1353        &self,
1354        mailbox_id: &MailboxId,
1355        criteria: SearchCriteria,
1356    ) -> anyhow::Result<Vec<MessageId>> {
1357        let all_messages = self.search_all(mailbox_id).await?;
1358        let excluded = self.search(mailbox_id, criteria).await?;
1359
1360        let result: Vec<MessageId> = all_messages
1361            .into_iter()
1362            .filter(|id| !excluded.contains(id))
1363            .collect();
1364
1365        Ok(result)
1366    }
1367}
1368
1369/// PostgreSQL metadata store implementation
1370struct PostgresMetadataStore {
1371    pool: PgPool,
1372}
1373
1374#[async_trait]
1375impl MetadataStore for PostgresMetadataStore {
1376    async fn get_user_quota(&self, user: &Username) -> anyhow::Result<Quota> {
1377        let row = sqlx::query("SELECT used, quota_limit FROM user_quotas WHERE username = $1")
1378            .bind(user.to_string())
1379            .fetch_optional(&self.pool)
1380            .await
1381            .map_err(|e| anyhow::anyhow!("Failed to get quota: {}", e))?;
1382
1383        match row {
1384            Some(r) => {
1385                let used: i64 = r.try_get("used")?;
1386                let limit: i64 = r.try_get("quota_limit")?;
1387                Ok(Quota::new(used as u64, limit as u64))
1388            }
1389            None => Ok(Quota::new(0, 1024 * 1024 * 1024)), // Default 1GB
1390        }
1391    }
1392
1393    async fn set_user_quota(&self, user: &Username, quota: Quota) -> anyhow::Result<()> {
1394        sqlx::query(
1395            r#"
1396            INSERT INTO user_quotas (username, used, quota_limit)
1397            VALUES ($1, $2, $3)
1398            ON CONFLICT (username) DO UPDATE
1399            SET used = $2, quota_limit = $3, updated_at = NOW()
1400            "#,
1401        )
1402        .bind(user.to_string())
1403        .bind(quota.used as i64)
1404        .bind(quota.limit as i64)
1405        .execute(&self.pool)
1406        .await
1407        .map_err(|e| anyhow::anyhow!("Failed to set quota: {}", e))?;
1408
1409        Ok(())
1410    }
1411
1412    async fn get_mailbox_counters(
1413        &self,
1414        mailbox_id: &MailboxId,
1415    ) -> anyhow::Result<MailboxCounters> {
1416        let row = sqlx::query(
1417            r#"
1418            SELECT
1419                COUNT(*) as total,
1420                COUNT(*) FILTER (WHERE f.flag_recent = TRUE) as recent,
1421                COUNT(*) FILTER (WHERE f.flag_seen = FALSE) as unseen
1422            FROM messages m
1423            LEFT JOIN message_flags f ON m.id = f.message_id
1424            WHERE m.mailbox_id = $1
1425            "#,
1426        )
1427        .bind(*mailbox_id.as_uuid())
1428        .fetch_one(&self.pool)
1429        .await
1430        .map_err(|e| anyhow::anyhow!("Failed to get counters: {}", e))?;
1431
1432        let total: i64 = row.try_get("total")?;
1433        let recent: i64 = row.try_get("recent")?;
1434        let unseen: i64 = row.try_get("unseen")?;
1435
1436        Ok(MailboxCounters {
1437            exists: total as u32,
1438            recent: recent as u32,
1439            unseen: unseen as u32,
1440        })
1441    }
1442}
1443
1444#[cfg(test)]
1445mod tests {
1446    use super::*;
1447
1448    #[test]
1449    fn test_postgres_config_default() {
1450        let config = PostgresConfig::default();
1451        assert_eq!(config.max_connections, 20);
1452        assert_eq!(config.min_connections, 5);
1453        assert_eq!(config.inline_threshold, 100 * 1024);
1454    }
1455
1456    #[test]
1457    fn test_postgres_backend_struct() {
1458        let _ = std::mem::size_of::<PostgresBackend>();
1459    }
1460
1461    #[test]
1462    fn test_search_criteria_all() {
1463        let criteria = SearchCriteria::All;
1464        assert!(matches!(criteria, SearchCriteria::All));
1465    }
1466
1467    #[test]
1468    fn test_search_criteria_unseen() {
1469        let criteria = SearchCriteria::Unseen;
1470        assert!(matches!(criteria, SearchCriteria::Unseen));
1471    }
1472
1473    #[test]
1474    fn test_search_criteria_from() {
1475        let criteria = SearchCriteria::From("test@example.com".to_string());
1476        assert!(matches!(criteria, SearchCriteria::From(_)));
1477    }
1478
1479    #[test]
1480    fn test_search_criteria_subject() {
1481        let criteria = SearchCriteria::Subject("test subject".to_string());
1482        assert!(matches!(criteria, SearchCriteria::Subject(_)));
1483    }
1484
1485    #[test]
1486    fn test_message_flags_default() {
1487        let flags = MessageFlags::new();
1488        assert!(!flags.is_seen());
1489        assert!(!flags.is_answered());
1490        assert!(!flags.is_flagged());
1491        assert!(!flags.is_deleted());
1492        assert!(!flags.is_draft());
1493    }
1494
1495    #[test]
1496    fn test_message_flags_setters() {
1497        let mut flags = MessageFlags::new();
1498        flags.set_seen(true);
1499        flags.set_answered(true);
1500        flags.set_flagged(true);
1501
1502        assert!(flags.is_seen());
1503        assert!(flags.is_answered());
1504        assert!(flags.is_flagged());
1505    }
1506
1507    #[test]
1508    fn test_quota_new() {
1509        let quota = Quota::new(1024, 2048);
1510        assert_eq!(quota.used, 1024);
1511        assert_eq!(quota.limit, 2048);
1512    }
1513
1514    #[test]
1515    fn test_quota_exceeded() {
1516        let quota = Quota::new(2048, 1024);
1517        assert!(quota.is_exceeded());
1518
1519        let quota_ok = Quota::new(512, 1024);
1520        assert!(!quota_ok.is_exceeded());
1521    }
1522
1523    #[test]
1524    fn test_quota_remaining() {
1525        let quota = Quota::new(256, 1024);
1526        assert_eq!(quota.remaining(), 768);
1527    }
1528
1529    #[test]
1530    fn test_mailbox_counters_default() {
1531        let counters = MailboxCounters::default();
1532        assert_eq!(counters.exists, 0);
1533        assert_eq!(counters.recent, 0);
1534        assert_eq!(counters.unseen, 0);
1535    }
1536
1537    #[test]
1538    fn test_mailbox_id_new() {
1539        let id1 = MailboxId::new();
1540        let id2 = MailboxId::new();
1541        assert_ne!(id1, id2);
1542    }
1543
1544    #[test]
1545    fn test_mailbox_id_display() {
1546        let id = MailboxId::new();
1547        let display = format!("{}", id);
1548        assert!(!display.is_empty());
1549    }
1550
1551    #[test]
1552    fn test_mailbox_path_creation() {
1553        let user = Username::new("test@example.com".to_string()).unwrap();
1554        let path = MailboxPath::new(user.clone(), vec!["INBOX".to_string()]);
1555        assert_eq!(path.user(), &user);
1556        assert_eq!(path.path().len(), 1);
1557    }
1558
1559    #[test]
1560    fn test_mailbox_path_name() {
1561        let user = Username::new("test@example.com".to_string()).unwrap();
1562        let path = MailboxPath::new(user, vec!["INBOX".to_string(), "Sent".to_string()]);
1563        assert_eq!(path.name(), Some("Sent"));
1564    }
1565
1566    #[test]
1567    fn test_mailbox_new() {
1568        let user = Username::new("test@example.com".to_string()).unwrap();
1569        let path = MailboxPath::new(user, vec!["INBOX".to_string()]);
1570        let mailbox = Mailbox::new(path);
1571
1572        assert_eq!(mailbox.uid_validity(), 1);
1573        assert_eq!(mailbox.uid_next(), 1);
1574        assert!(mailbox.special_use().is_none());
1575    }
1576
1577    #[test]
1578    fn test_mailbox_special_use() {
1579        let user = Username::new("test@example.com".to_string()).unwrap();
1580        let path = MailboxPath::new(user, vec!["Sent".to_string()]);
1581        let mut mailbox = Mailbox::new(path);
1582
1583        mailbox.set_special_use(Some("\\Sent".to_string()));
1584        assert_eq!(mailbox.special_use(), Some("\\Sent"));
1585    }
1586
1587    #[test]
1588    fn test_message_metadata_new() {
1589        let msg_id = MessageId::new();
1590        let mailbox_id = MailboxId::new();
1591        let flags = MessageFlags::new();
1592
1593        let metadata = MessageMetadata::new(msg_id, mailbox_id, 1, flags, 1024);
1594
1595        assert_eq!(metadata.message_id(), &msg_id);
1596        assert_eq!(metadata.mailbox_id(), &mailbox_id);
1597        assert_eq!(metadata.uid(), 1);
1598        assert_eq!(metadata.size(), 1024);
1599    }
1600
1601    #[test]
1602    fn test_message_metadata_getters() {
1603        let msg_id = MessageId::new();
1604        let mailbox_id = MailboxId::new();
1605        let metadata = MessageMetadata::new(msg_id, mailbox_id, 42, MessageFlags::new(), 2048);
1606
1607        assert_eq!(*metadata.message_id(), msg_id);
1608        assert_eq!(*metadata.mailbox_id(), mailbox_id);
1609        assert_eq!(metadata.uid(), 42);
1610        assert_eq!(metadata.size(), 2048);
1611    }
1612
1613    #[test]
1614    fn test_search_criteria_and() {
1615        let criteria = SearchCriteria::And(vec![
1616            SearchCriteria::Unseen,
1617            SearchCriteria::From("test@example.com".to_string()),
1618        ]);
1619        assert!(matches!(criteria, SearchCriteria::And(_)));
1620    }
1621
1622    #[test]
1623    fn test_search_criteria_or() {
1624        let criteria = SearchCriteria::Or(vec![SearchCriteria::Flagged, SearchCriteria::Deleted]);
1625        assert!(matches!(criteria, SearchCriteria::Or(_)));
1626    }
1627
1628    #[test]
1629    fn test_search_criteria_not() {
1630        let criteria = SearchCriteria::Not(Box::new(SearchCriteria::Seen));
1631        assert!(matches!(criteria, SearchCriteria::Not(_)));
1632    }
1633
1634    #[test]
1635    fn test_mailbox_counters_struct() {
1636        let counters = MailboxCounters {
1637            exists: 10,
1638            recent: 3,
1639            unseen: 5,
1640        };
1641        assert_eq!(counters.exists, 10);
1642        assert_eq!(counters.recent, 3);
1643        assert_eq!(counters.unseen, 5);
1644    }
1645
1646    #[test]
1647    fn test_special_use_attributes_new() {
1648        let attrs = SpecialUseAttributes::new();
1649        assert!(attrs.is_empty());
1650    }
1651
1652    #[test]
1653    fn test_special_use_attributes_single() {
1654        let attrs = SpecialUseAttributes::single("\\Drafts".to_string());
1655        assert!(!attrs.is_empty());
1656        assert!(attrs.has_attribute("\\Drafts"));
1657    }
1658
1659    #[test]
1660    fn test_special_use_attributes_from_vec() {
1661        let vec = vec!["\\Drafts".to_string(), "\\Sent".to_string()];
1662        let attrs = SpecialUseAttributes::from_vec(vec);
1663        assert_eq!(attrs.len(), 2);
1664        assert!(attrs.has_attribute("\\Drafts"));
1665        assert!(attrs.has_attribute("\\Sent"));
1666    }
1667
1668    #[test]
1669    fn test_message_flags_custom() {
1670        let mut flags = MessageFlags::new();
1671        flags.add_custom("CustomFlag".to_string());
1672        assert!(flags.custom().contains("CustomFlag"));
1673    }
1674
1675    #[test]
1676    fn test_message_flags_recent() {
1677        let mut flags = MessageFlags::new();
1678        flags.set_recent(true);
1679        assert!(flags.is_recent());
1680    }
1681
1682    #[test]
1683    fn test_postgres_config_custom() {
1684        let config = PostgresConfig {
1685            max_connections: 50,
1686            min_connections: 10,
1687            connect_timeout: Duration::from_secs(60),
1688            idle_timeout: Some(Duration::from_secs(300)),
1689            max_lifetime: Some(Duration::from_secs(3600)),
1690            inline_threshold: 200 * 1024,
1691        };
1692        assert_eq!(config.max_connections, 50);
1693        assert_eq!(config.inline_threshold, 200 * 1024);
1694    }
1695}