Skip to main content

mxr_store/
message.rs

1use mxr_core::id::*;
2use mxr_core::types::*;
3use sqlx::Row;
4
5pub(crate) fn future_date_cutoff_timestamp() -> i64 {
6    (chrono::Utc::now() + chrono::Duration::days(1)).timestamp()
7}
8
9impl super::Store {
10    pub async fn upsert_envelope(&self, envelope: &Envelope) -> Result<(), sqlx::Error> {
11        let id = envelope.id.as_str();
12        let account_id = envelope.account_id.as_str();
13        let thread_id = envelope.thread_id.as_str();
14        let from_name = envelope.from.name.as_deref();
15        let to_addrs = serde_json::to_string(&envelope.to).unwrap();
16        let cc_addrs = serde_json::to_string(&envelope.cc).unwrap();
17        let bcc_addrs = serde_json::to_string(&envelope.bcc).unwrap();
18        let refs = serde_json::to_string(&envelope.references).unwrap();
19        let date = envelope.date.timestamp();
20        let flags = envelope.flags.bits() as i64;
21        let unsub = serde_json::to_string(&envelope.unsubscribe).unwrap();
22        let has_attachments = envelope.has_attachments;
23        let size_bytes = envelope.size_bytes as i64;
24
25        sqlx::query!(
26            "INSERT INTO messages
27             (id, account_id, provider_id, thread_id, message_id_header, in_reply_to,
28              reference_headers, from_name, from_email, to_addrs, cc_addrs, bcc_addrs,
29              subject, date, flags, snippet, has_attachments, size_bytes, unsubscribe_method)
30             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
31             ON CONFLICT(id) DO UPDATE SET
32                provider_id = excluded.provider_id,
33                thread_id = excluded.thread_id,
34                message_id_header = excluded.message_id_header,
35                in_reply_to = excluded.in_reply_to,
36                reference_headers = excluded.reference_headers,
37                from_name = excluded.from_name,
38                from_email = excluded.from_email,
39                to_addrs = excluded.to_addrs,
40                cc_addrs = excluded.cc_addrs,
41                bcc_addrs = excluded.bcc_addrs,
42                subject = excluded.subject,
43                date = excluded.date,
44                flags = excluded.flags,
45                snippet = excluded.snippet,
46                has_attachments = excluded.has_attachments,
47                size_bytes = excluded.size_bytes,
48                unsubscribe_method = excluded.unsubscribe_method",
49            id,
50            account_id,
51            envelope.provider_id,
52            thread_id,
53            envelope.message_id_header,
54            envelope.in_reply_to,
55            refs,
56            from_name,
57            envelope.from.email,
58            to_addrs,
59            cc_addrs,
60            bcc_addrs,
61            envelope.subject,
62            date,
63            flags,
64            envelope.snippet,
65            has_attachments,
66            size_bytes,
67            unsub,
68        )
69        .execute(self.writer())
70        .await?;
71
72        Ok(())
73    }
74
75    pub async fn get_envelope(&self, id: &MessageId) -> Result<Option<Envelope>, sqlx::Error> {
76        let id_str = id.as_str();
77        let row = sqlx::query!(
78            r#"SELECT
79                id as "id!", account_id as "account_id!", provider_id as "provider_id!",
80                thread_id as "thread_id!", message_id_header, in_reply_to,
81                reference_headers, from_name, from_email as "from_email!",
82                to_addrs as "to_addrs!", cc_addrs as "cc_addrs!", bcc_addrs as "bcc_addrs!",
83                subject as "subject!", date as "date!", flags as "flags!",
84                snippet as "snippet!", has_attachments as "has_attachments!: bool",
85                size_bytes as "size_bytes!", unsubscribe_method,
86                COALESCE((
87                    SELECT GROUP_CONCAT(labels.provider_id, char(31))
88                    FROM message_labels
89                    JOIN labels ON labels.id = message_labels.label_id
90                    WHERE message_labels.message_id = messages.id
91                ), '') as "label_provider_ids!: String"
92             FROM messages WHERE id = ?"#,
93            id_str,
94        )
95        .fetch_optional(self.reader())
96        .await?;
97
98        Ok(row.map(|r| {
99            record_to_envelope(
100                &r.id,
101                &r.account_id,
102                &r.provider_id,
103                &r.thread_id,
104                r.message_id_header.as_deref(),
105                r.in_reply_to.as_deref(),
106                r.reference_headers.as_deref(),
107                r.from_name.as_deref(),
108                &r.from_email,
109                &r.to_addrs,
110                &r.cc_addrs,
111                &r.bcc_addrs,
112                &r.subject,
113                r.date,
114                r.flags,
115                &r.snippet,
116                r.has_attachments,
117                r.size_bytes,
118                r.unsubscribe_method.as_deref(),
119                &r.label_provider_ids,
120            )
121        }))
122    }
123
124    pub async fn list_envelopes_by_label(
125        &self,
126        label_id: &LabelId,
127        limit: u32,
128        offset: u32,
129    ) -> Result<Vec<Envelope>, sqlx::Error> {
130        let lid = label_id.as_str();
131        let cutoff = future_date_cutoff_timestamp();
132        let lim = limit as i64;
133        let off = offset as i64;
134        let rows = sqlx::query!(
135            r#"SELECT
136                m.id as "id!", m.account_id as "account_id!", m.provider_id as "provider_id!",
137                m.thread_id as "thread_id!", m.message_id_header, m.in_reply_to,
138                m.reference_headers, m.from_name, m.from_email as "from_email!",
139                m.to_addrs as "to_addrs!", m.cc_addrs as "cc_addrs!", m.bcc_addrs as "bcc_addrs!",
140                m.subject as "subject!", m.date as "date!", m.flags as "flags!",
141                m.snippet as "snippet!", m.has_attachments as "has_attachments!: bool",
142                m.size_bytes as "size_bytes!", m.unsubscribe_method,
143                COALESCE((
144                    SELECT GROUP_CONCAT(labels.provider_id, char(31))
145                    FROM message_labels
146                    JOIN labels ON labels.id = message_labels.label_id
147                    WHERE message_labels.message_id = m.id
148                ), '') as "label_provider_ids!: String"
149             FROM messages m
150             JOIN message_labels ml ON m.id = ml.message_id
151             WHERE ml.label_id = ?
152             ORDER BY CASE WHEN m.date > ? THEN 0 ELSE m.date END DESC, m.id DESC
153             LIMIT ? OFFSET ?"#,
154            lid,
155            cutoff,
156            lim,
157            off,
158        )
159        .fetch_all(self.reader())
160        .await?;
161
162        Ok(rows
163            .into_iter()
164            .map(|r| {
165                record_to_envelope(
166                    &r.id,
167                    &r.account_id,
168                    &r.provider_id,
169                    &r.thread_id,
170                    r.message_id_header.as_deref(),
171                    r.in_reply_to.as_deref(),
172                    r.reference_headers.as_deref(),
173                    r.from_name.as_deref(),
174                    &r.from_email,
175                    &r.to_addrs,
176                    &r.cc_addrs,
177                    &r.bcc_addrs,
178                    &r.subject,
179                    r.date,
180                    r.flags,
181                    &r.snippet,
182                    r.has_attachments,
183                    r.size_bytes,
184                    r.unsubscribe_method.as_deref(),
185                    &r.label_provider_ids,
186                )
187            })
188            .collect())
189    }
190
191    pub async fn list_envelopes_by_account(
192        &self,
193        account_id: &AccountId,
194        limit: u32,
195        offset: u32,
196    ) -> Result<Vec<Envelope>, sqlx::Error> {
197        let aid = account_id.as_str();
198        let cutoff = future_date_cutoff_timestamp();
199        let lim = limit as i64;
200        let off = offset as i64;
201        let rows = sqlx::query!(
202            r#"SELECT
203                id as "id!", account_id as "account_id!", provider_id as "provider_id!",
204                thread_id as "thread_id!", message_id_header, in_reply_to,
205                reference_headers, from_name, from_email as "from_email!",
206                to_addrs as "to_addrs!", cc_addrs as "cc_addrs!", bcc_addrs as "bcc_addrs!",
207                subject as "subject!", date as "date!", flags as "flags!",
208                snippet as "snippet!", has_attachments as "has_attachments!: bool",
209                size_bytes as "size_bytes!", unsubscribe_method,
210                COALESCE((
211                    SELECT GROUP_CONCAT(labels.provider_id, char(31))
212                    FROM message_labels
213                    JOIN labels ON labels.id = message_labels.label_id
214                    WHERE message_labels.message_id = messages.id
215                ), '') as "label_provider_ids!: String"
216             FROM messages
217             WHERE account_id = ?
218             ORDER BY CASE WHEN date > ? THEN 0 ELSE date END DESC, id DESC
219             LIMIT ? OFFSET ?"#,
220            aid,
221            cutoff,
222            lim,
223            off,
224        )
225        .fetch_all(self.reader())
226        .await?;
227
228        Ok(rows
229            .into_iter()
230            .map(|r| {
231                record_to_envelope(
232                    &r.id,
233                    &r.account_id,
234                    &r.provider_id,
235                    &r.thread_id,
236                    r.message_id_header.as_deref(),
237                    r.in_reply_to.as_deref(),
238                    r.reference_headers.as_deref(),
239                    r.from_name.as_deref(),
240                    &r.from_email,
241                    &r.to_addrs,
242                    &r.cc_addrs,
243                    &r.bcc_addrs,
244                    &r.subject,
245                    r.date,
246                    r.flags,
247                    &r.snippet,
248                    r.has_attachments,
249                    r.size_bytes,
250                    r.unsubscribe_method.as_deref(),
251                    &r.label_provider_ids,
252                )
253            })
254            .collect())
255    }
256
257    pub async fn list_envelopes_by_ids(
258        &self,
259        message_ids: &[MessageId],
260    ) -> Result<Vec<Envelope>, sqlx::Error> {
261        if message_ids.is_empty() {
262            return Ok(Vec::new());
263        }
264
265        let placeholders: Vec<String> = message_ids.iter().map(|_| "?".to_string()).collect();
266        let sql = format!(
267            r#"SELECT
268                m.id as id, m.account_id as account_id, m.provider_id as provider_id,
269                m.thread_id as thread_id, m.message_id_header, m.in_reply_to,
270                m.reference_headers, m.from_name, m.from_email as from_email,
271                m.to_addrs as to_addrs, m.cc_addrs as cc_addrs, m.bcc_addrs as bcc_addrs,
272                m.subject as subject, m.date as date, m.flags as flags,
273                m.snippet as snippet, m.has_attachments as has_attachments,
274                m.size_bytes as size_bytes, m.unsubscribe_method,
275                COALESCE((
276                    SELECT GROUP_CONCAT(labels.provider_id, char(31))
277                    FROM message_labels
278                    JOIN labels ON labels.id = message_labels.label_id
279                    WHERE message_labels.message_id = m.id
280                ), '') as label_provider_ids
281             FROM messages m
282             WHERE m.id IN ({})"#,
283            placeholders.join(", ")
284        );
285
286        let mut query = sqlx::query(&sql);
287        for message_id in message_ids {
288            query = query.bind(message_id.as_str());
289        }
290
291        let rows = query.fetch_all(self.reader()).await?;
292        let mut by_id = std::collections::HashMap::with_capacity(rows.len());
293        for row in rows {
294            let id = row.get::<String, _>(0);
295            let account_id = row.get::<String, _>(1);
296            let provider_id = row.get::<String, _>(2);
297            let thread_id = row.get::<String, _>(3);
298            let message_id_header = row.get::<Option<String>, _>(4);
299            let in_reply_to = row.get::<Option<String>, _>(5);
300            let reference_headers = row.get::<Option<String>, _>(6);
301            let from_name = row.get::<Option<String>, _>(7);
302            let from_email = row.get::<String, _>(8);
303            let to_addrs = row.get::<String, _>(9);
304            let cc_addrs = row.get::<String, _>(10);
305            let bcc_addrs = row.get::<String, _>(11);
306            let subject = row.get::<String, _>(12);
307            let date = row.get::<i64, _>(13);
308            let flags = row.get::<i64, _>(14);
309            let snippet = row.get::<String, _>(15);
310            let has_attachments = row.get::<bool, _>(16);
311            let size_bytes = row.get::<i64, _>(17);
312            let unsubscribe_method = row.get::<Option<String>, _>(18);
313            let label_provider_ids = row.get::<String, _>(19);
314            let envelope = record_to_envelope(
315                &id,
316                &account_id,
317                &provider_id,
318                &thread_id,
319                message_id_header.as_deref(),
320                in_reply_to.as_deref(),
321                reference_headers.as_deref(),
322                from_name.as_deref(),
323                &from_email,
324                &to_addrs,
325                &cc_addrs,
326                &bcc_addrs,
327                &subject,
328                date,
329                flags,
330                &snippet,
331                has_attachments,
332                size_bytes,
333                unsubscribe_method.as_deref(),
334                &label_provider_ids,
335            );
336            by_id.insert(envelope.id.clone(), envelope);
337        }
338
339        Ok(message_ids
340            .iter()
341            .filter_map(|message_id| by_id.remove(message_id))
342            .collect())
343    }
344
345    // Dynamic SQL -- kept as runtime query due to variable IN clause
346    pub async fn delete_messages_by_provider_ids(
347        &self,
348        account_id: &AccountId,
349        provider_ids: &[String],
350    ) -> Result<u64, sqlx::Error> {
351        if provider_ids.is_empty() {
352            return Ok(0);
353        }
354        let placeholders: Vec<String> = provider_ids.iter().map(|_| "?".to_string()).collect();
355        let sql = format!(
356            "DELETE FROM messages WHERE account_id = ? AND provider_id IN ({})",
357            placeholders.join(", ")
358        );
359        let mut query = sqlx::query(&sql).bind(account_id.as_str());
360        for pid in provider_ids {
361            query = query.bind(pid);
362        }
363        let result = query.execute(self.writer()).await?;
364        Ok(result.rows_affected())
365    }
366
367    pub async fn set_message_labels(
368        &self,
369        message_id: &MessageId,
370        label_ids: &[LabelId],
371    ) -> Result<(), sqlx::Error> {
372        let mid = message_id.as_str();
373        sqlx::query!("DELETE FROM message_labels WHERE message_id = ?", mid)
374            .execute(self.writer())
375            .await?;
376
377        for label_id in label_ids {
378            let mid = message_id.as_str();
379            let lid = label_id.as_str();
380            sqlx::query!(
381                "INSERT INTO message_labels (message_id, label_id) VALUES (?, ?)",
382                mid,
383                lid,
384            )
385            .execute(self.writer())
386            .await?;
387        }
388
389        Ok(())
390    }
391
392    pub async fn update_message_thread_id(
393        &self,
394        message_id: &MessageId,
395        thread_id: &ThreadId,
396    ) -> Result<(), sqlx::Error> {
397        sqlx::query("UPDATE messages SET thread_id = ? WHERE id = ?")
398            .bind(thread_id.as_str())
399            .bind(message_id.as_str())
400            .execute(self.writer())
401            .await?;
402        Ok(())
403    }
404
405    pub async fn get_message_id_by_provider_id(
406        &self,
407        account_id: &AccountId,
408        provider_id: &str,
409    ) -> Result<Option<MessageId>, sqlx::Error> {
410        let aid = account_id.as_str();
411        let row = sqlx::query!(
412            r#"SELECT id as "id!" FROM messages WHERE account_id = ? AND provider_id = ?"#,
413            aid,
414            provider_id,
415        )
416        .fetch_optional(self.reader())
417        .await?;
418
419        Ok(row.map(|r| MessageId::from_uuid(uuid::Uuid::parse_str(&r.id).unwrap())))
420    }
421
422    pub async fn count_messages_by_account(
423        &self,
424        account_id: &AccountId,
425    ) -> Result<u32, sqlx::Error> {
426        let aid = account_id.as_str();
427        let row = sqlx::query!(
428            r#"SELECT COUNT(*) as "cnt!: i64" FROM messages WHERE account_id = ?"#,
429            aid,
430        )
431        .fetch_one(self.reader())
432        .await?;
433
434        Ok(row.cnt as u32)
435    }
436
437    /// List all envelopes across all accounts, paginated. Used for reindexing.
438    pub async fn list_all_envelopes_paginated(
439        &self,
440        limit: u32,
441        offset: u32,
442    ) -> Result<Vec<Envelope>, sqlx::Error> {
443        let cutoff = future_date_cutoff_timestamp();
444        let lim = limit as i64;
445        let off = offset as i64;
446        let rows = sqlx::query!(
447            r#"SELECT
448                id as "id!", account_id as "account_id!", provider_id as "provider_id!",
449                thread_id as "thread_id!", message_id_header, in_reply_to,
450                reference_headers, from_name, from_email as "from_email!",
451                to_addrs as "to_addrs!", cc_addrs as "cc_addrs!", bcc_addrs as "bcc_addrs!",
452                subject as "subject!", date as "date!", flags as "flags!",
453                snippet as "snippet!", has_attachments as "has_attachments!: bool",
454                size_bytes as "size_bytes!", unsubscribe_method,
455                COALESCE((
456                    SELECT GROUP_CONCAT(labels.provider_id, char(31))
457                    FROM message_labels
458                    JOIN labels ON labels.id = message_labels.label_id
459                    WHERE message_labels.message_id = messages.id
460                ), '') as "label_provider_ids!: String"
461             FROM messages
462             ORDER BY CASE WHEN date > ? THEN 0 ELSE date END DESC, id DESC
463             LIMIT ? OFFSET ?"#,
464            cutoff,
465            lim,
466            off,
467        )
468        .fetch_all(self.reader())
469        .await?;
470
471        Ok(rows
472            .into_iter()
473            .map(|r| {
474                record_to_envelope(
475                    &r.id,
476                    &r.account_id,
477                    &r.provider_id,
478                    &r.thread_id,
479                    r.message_id_header.as_deref(),
480                    r.in_reply_to.as_deref(),
481                    r.reference_headers.as_deref(),
482                    r.from_name.as_deref(),
483                    &r.from_email,
484                    &r.to_addrs,
485                    &r.cc_addrs,
486                    &r.bcc_addrs,
487                    &r.subject,
488                    r.date,
489                    r.flags,
490                    &r.snippet,
491                    r.has_attachments,
492                    r.size_bytes,
493                    r.unsubscribe_method.as_deref(),
494                    &r.label_provider_ids,
495                )
496            })
497            .collect())
498    }
499
500    /// Count all messages across all accounts. Used for reindexing.
501    pub async fn count_all_messages(&self) -> Result<u32, sqlx::Error> {
502        let row = sqlx::query!(r#"SELECT COUNT(*) as "cnt!: i64" FROM messages"#)
503            .fetch_one(self.reader())
504            .await?;
505        Ok(row.cnt as u32)
506    }
507
508    pub async fn update_flags(
509        &self,
510        message_id: &MessageId,
511        flags: MessageFlags,
512    ) -> Result<(), sqlx::Error> {
513        let mid = message_id.as_str();
514        let flags_val = flags.bits() as i64;
515        sqlx::query!("UPDATE messages SET flags = ? WHERE id = ?", flags_val, mid)
516            .execute(self.writer())
517            .await?;
518        Ok(())
519    }
520
521    /// Set the read flag on a message.
522    pub async fn set_read(&self, message_id: &MessageId, read: bool) -> Result<(), sqlx::Error> {
523        let mid = message_id.as_str();
524        let row = sqlx::query!(
525            r#"SELECT flags as "flags!" FROM messages WHERE id = ?"#,
526            mid,
527        )
528        .fetch_optional(self.reader())
529        .await?;
530
531        if let Some(r) = row {
532            let mut flags = MessageFlags::from_bits_truncate(r.flags as u32);
533            if read {
534                flags.insert(MessageFlags::READ);
535            } else {
536                flags.remove(MessageFlags::READ);
537            }
538            let flags_val = flags.bits() as i64;
539            sqlx::query!("UPDATE messages SET flags = ? WHERE id = ?", flags_val, mid)
540                .execute(self.writer())
541                .await?;
542        }
543        Ok(())
544    }
545
546    /// Set the starred flag on a message.
547    pub async fn set_starred(
548        &self,
549        message_id: &MessageId,
550        starred: bool,
551    ) -> Result<(), sqlx::Error> {
552        let mid = message_id.as_str();
553        let row = sqlx::query!(
554            r#"SELECT flags as "flags!" FROM messages WHERE id = ?"#,
555            mid,
556        )
557        .fetch_optional(self.reader())
558        .await?;
559
560        if let Some(r) = row {
561            let mut flags = MessageFlags::from_bits_truncate(r.flags as u32);
562            if starred {
563                flags.insert(MessageFlags::STARRED);
564            } else {
565                flags.remove(MessageFlags::STARRED);
566            }
567            let flags_val = flags.bits() as i64;
568            sqlx::query!("UPDATE messages SET flags = ? WHERE id = ?", flags_val, mid)
569                .execute(self.writer())
570                .await?;
571        }
572        Ok(())
573    }
574
575    /// Get the provider_id for a message.
576    pub async fn get_provider_id(
577        &self,
578        message_id: &MessageId,
579    ) -> Result<Option<String>, sqlx::Error> {
580        let mid = message_id.as_str();
581        let row = sqlx::query!(
582            r#"SELECT provider_id as "provider_id!" FROM messages WHERE id = ?"#,
583            mid,
584        )
585        .fetch_optional(self.reader())
586        .await?;
587        Ok(row.map(|r| r.provider_id))
588    }
589
590    /// Get the label IDs for a message.
591    pub async fn get_message_label_ids(
592        &self,
593        message_id: &MessageId,
594    ) -> Result<Vec<LabelId>, sqlx::Error> {
595        let mid = message_id.as_str();
596        let rows = sqlx::query!(
597            r#"SELECT label_id as "label_id!" FROM message_labels WHERE message_id = ?"#,
598            mid,
599        )
600        .fetch_all(self.reader())
601        .await?;
602        Ok(rows
603            .into_iter()
604            .map(|r| LabelId::from_uuid(uuid::Uuid::parse_str(&r.label_id).unwrap()))
605            .collect())
606    }
607
608    /// Add a label to a message.
609    pub async fn add_message_label(
610        &self,
611        message_id: &MessageId,
612        label_id: &LabelId,
613    ) -> Result<(), sqlx::Error> {
614        let mid = message_id.as_str();
615        let lid = label_id.as_str();
616        sqlx::query!(
617            "INSERT OR IGNORE INTO message_labels (message_id, label_id) VALUES (?, ?)",
618            mid,
619            lid,
620        )
621        .execute(self.writer())
622        .await?;
623        Ok(())
624    }
625
626    /// Remove a label from a message.
627    pub async fn remove_message_label(
628        &self,
629        message_id: &MessageId,
630        label_id: &LabelId,
631    ) -> Result<(), sqlx::Error> {
632        let mid = message_id.as_str();
633        let lid = label_id.as_str();
634        sqlx::query!(
635            "DELETE FROM message_labels WHERE message_id = ? AND label_id = ?",
636            mid,
637            lid,
638        )
639        .execute(self.writer())
640        .await?;
641        Ok(())
642    }
643
644    /// Count total rows in the message_labels junction table.
645    pub async fn count_message_labels(&self) -> Result<u32, sqlx::Error> {
646        let row = sqlx::query!(r#"SELECT COUNT(*) as "cnt!: i64" FROM message_labels"#,)
647            .fetch_one(self.reader())
648            .await?;
649        Ok(row.cnt as u32)
650    }
651
652    /// Mark a message as trashed (update flags).
653    pub async fn move_to_trash(&self, message_id: &MessageId) -> Result<(), sqlx::Error> {
654        let mid = message_id.as_str();
655        let row = sqlx::query!(
656            r#"SELECT flags as "flags!" FROM messages WHERE id = ?"#,
657            mid,
658        )
659        .fetch_optional(self.reader())
660        .await?;
661
662        if let Some(r) = row {
663            let mut flags = MessageFlags::from_bits_truncate(r.flags as u32);
664            flags.insert(MessageFlags::TRASH);
665            let flags_val = flags.bits() as i64;
666            sqlx::query!("UPDATE messages SET flags = ? WHERE id = ?", flags_val, mid)
667                .execute(self.writer())
668                .await?;
669        }
670        Ok(())
671    }
672
673    /// Get distinct contacts (name + email) from message senders, ordered by frequency.
674    pub async fn list_contacts(&self, limit: u32) -> Result<Vec<(String, String)>, sqlx::Error> {
675        let lim = limit as i64;
676        let rows = sqlx::query_as::<_, (String, String)>(
677            r#"SELECT
678                COALESCE(from_name, '') as name,
679                from_email as email
680             FROM messages
681             WHERE from_email != ''
682             GROUP BY from_email
683             ORDER BY COUNT(*) DESC
684             LIMIT ?"#,
685        )
686        .bind(lim)
687        .fetch_all(self.reader())
688        .await?;
689        Ok(rows)
690    }
691
692    pub async fn list_subscriptions(
693        &self,
694        limit: u32,
695    ) -> Result<Vec<SubscriptionSummary>, sqlx::Error> {
696        let none_unsubscribe = serde_json::to_string(&UnsubscribeMethod::None).unwrap();
697        let trash_flag = MessageFlags::TRASH.bits() as i64;
698        let spam_flag = MessageFlags::SPAM.bits() as i64;
699        let cutoff = future_date_cutoff_timestamp();
700        let lim = limit as i64;
701
702        let rows = sqlx::query(
703            r#"WITH ranked AS (
704                SELECT
705                    id,
706                    account_id,
707                    provider_id,
708                    thread_id,
709                    from_name,
710                    from_email,
711                    subject,
712                    snippet,
713                    date,
714                    flags,
715                    has_attachments,
716                    size_bytes,
717                    unsubscribe_method,
718                    COUNT(*) OVER (
719                        PARTITION BY account_id, LOWER(from_email)
720                    ) AS message_count,
721                    ROW_NUMBER() OVER (
722                        PARTITION BY account_id, LOWER(from_email)
723                        ORDER BY CASE WHEN date > ? THEN 0 ELSE date END DESC, id DESC
724                    ) AS rn
725                FROM messages
726                WHERE from_email != ''
727                  AND unsubscribe_method IS NOT NULL
728                  AND unsubscribe_method != ?
729                  AND (flags & ?) = 0
730                  AND (flags & ?) = 0
731            )
732            SELECT
733                id,
734                account_id,
735                provider_id,
736                thread_id,
737                from_name,
738                from_email,
739                subject,
740                snippet,
741                date,
742                flags,
743                has_attachments,
744                size_bytes,
745                unsubscribe_method,
746                message_count
747            FROM ranked
748            WHERE rn = 1
749            ORDER BY CASE WHEN date > ? THEN 0 ELSE date END DESC, id DESC
750            LIMIT ?"#,
751        )
752        .bind(none_unsubscribe)
753        .bind(trash_flag)
754        .bind(spam_flag)
755        .bind(cutoff)
756        .bind(cutoff)
757        .bind(lim)
758        .fetch_all(self.reader())
759        .await?;
760
761        Ok(rows
762            .into_iter()
763            .map(|row| SubscriptionSummary {
764                account_id: AccountId::from_uuid(
765                    uuid::Uuid::parse_str(&row.get::<String, _>("account_id")).unwrap(),
766                ),
767                sender_name: row.get::<Option<String>, _>("from_name"),
768                sender_email: row.get::<String, _>("from_email"),
769                message_count: row.get::<i64, _>("message_count") as u32,
770                latest_message_id: MessageId::from_uuid(
771                    uuid::Uuid::parse_str(&row.get::<String, _>("id")).unwrap(),
772                ),
773                latest_provider_id: row.get::<String, _>("provider_id"),
774                latest_thread_id: ThreadId::from_uuid(
775                    uuid::Uuid::parse_str(&row.get::<String, _>("thread_id")).unwrap(),
776                ),
777                latest_subject: row.get::<String, _>("subject"),
778                latest_snippet: row.get::<String, _>("snippet"),
779                latest_date: chrono::DateTime::from_timestamp(row.get::<i64, _>("date"), 0)
780                    .unwrap_or_default(),
781                latest_flags: MessageFlags::from_bits_truncate(row.get::<i64, _>("flags") as u32),
782                latest_has_attachments: row.get::<bool, _>("has_attachments"),
783                latest_size_bytes: row.get::<i64, _>("size_bytes") as u64,
784                unsubscribe: row
785                    .get::<Option<String>, _>("unsubscribe_method")
786                    .as_deref()
787                    .map(|value| serde_json::from_str(value).unwrap_or(UnsubscribeMethod::None))
788                    .unwrap_or(UnsubscribeMethod::None),
789            })
790            .collect())
791    }
792}
793
794/// Shared helper to convert individual field values into an Envelope.
795/// Used by both message.rs and thread.rs queries since the `query!` macro
796/// returns different anonymous types for each call site.
797#[allow(clippy::too_many_arguments)]
798pub(crate) fn record_to_envelope(
799    id: &str,
800    account_id: &str,
801    provider_id: &str,
802    thread_id: &str,
803    message_id_header: Option<&str>,
804    in_reply_to: Option<&str>,
805    reference_headers: Option<&str>,
806    from_name: Option<&str>,
807    from_email: &str,
808    to_addrs: &str,
809    cc_addrs: &str,
810    bcc_addrs: &str,
811    subject: &str,
812    date: i64,
813    flags: i64,
814    snippet: &str,
815    has_attachments: bool,
816    size_bytes: i64,
817    unsubscribe_method: Option<&str>,
818    label_provider_ids: &str,
819) -> Envelope {
820    Envelope {
821        id: MessageId::from_uuid(uuid::Uuid::parse_str(id).unwrap()),
822        account_id: AccountId::from_uuid(uuid::Uuid::parse_str(account_id).unwrap()),
823        provider_id: provider_id.to_string(),
824        thread_id: ThreadId::from_uuid(uuid::Uuid::parse_str(thread_id).unwrap()),
825        message_id_header: message_id_header.map(|s| s.to_string()),
826        in_reply_to: in_reply_to.map(|s| s.to_string()),
827        references: reference_headers
828            .map(|r| serde_json::from_str(r).unwrap_or_default())
829            .unwrap_or_default(),
830        from: Address {
831            name: from_name.map(|s| s.to_string()),
832            email: from_email.to_string(),
833        },
834        to: serde_json::from_str(to_addrs).unwrap_or_default(),
835        cc: serde_json::from_str(cc_addrs).unwrap_or_default(),
836        bcc: serde_json::from_str(bcc_addrs).unwrap_or_default(),
837        subject: subject.to_string(),
838        date: chrono::DateTime::from_timestamp(date, 0).unwrap_or_default(),
839        flags: MessageFlags::from_bits_truncate(flags as u32),
840        snippet: snippet.to_string(),
841        has_attachments,
842        size_bytes: size_bytes as u64,
843        unsubscribe: unsubscribe_method
844            .map(|u| serde_json::from_str(u).unwrap_or(UnsubscribeMethod::None))
845            .unwrap_or(UnsubscribeMethod::None),
846        label_provider_ids: if label_provider_ids.is_empty() {
847            vec![]
848        } else {
849            label_provider_ids
850                .split('\u{1f}')
851                .filter(|provider_id| !provider_id.is_empty())
852                .map(str::to_string)
853                .collect()
854        },
855    }
856}