Skip to main content

mxr_store/
message.rs

1use mxr_core::id::*;
2use mxr_core::types::*;
3use sqlx::Row;
4impl super::Store {
5    pub async fn upsert_envelope(&self, envelope: &Envelope) -> Result<(), sqlx::Error> {
6        let id = envelope.id.as_str();
7        let account_id = envelope.account_id.as_str();
8        let thread_id = envelope.thread_id.as_str();
9        let from_name = envelope.from.name.as_deref();
10        let to_addrs = serde_json::to_string(&envelope.to).unwrap();
11        let cc_addrs = serde_json::to_string(&envelope.cc).unwrap();
12        let bcc_addrs = serde_json::to_string(&envelope.bcc).unwrap();
13        let refs = serde_json::to_string(&envelope.references).unwrap();
14        let date = envelope.date.timestamp();
15        let flags = envelope.flags.bits() as i64;
16        let unsub = serde_json::to_string(&envelope.unsubscribe).unwrap();
17        let has_attachments = envelope.has_attachments;
18        let size_bytes = envelope.size_bytes as i64;
19
20        sqlx::query!(
21            "INSERT INTO messages
22             (id, account_id, provider_id, thread_id, message_id_header, in_reply_to,
23              reference_headers, from_name, from_email, to_addrs, cc_addrs, bcc_addrs,
24              subject, date, flags, snippet, has_attachments, size_bytes, unsubscribe_method)
25             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
26             ON CONFLICT(id) DO UPDATE SET
27                provider_id = excluded.provider_id,
28                thread_id = excluded.thread_id,
29                message_id_header = excluded.message_id_header,
30                in_reply_to = excluded.in_reply_to,
31                reference_headers = excluded.reference_headers,
32                from_name = excluded.from_name,
33                from_email = excluded.from_email,
34                to_addrs = excluded.to_addrs,
35                cc_addrs = excluded.cc_addrs,
36                bcc_addrs = excluded.bcc_addrs,
37                subject = excluded.subject,
38                date = excluded.date,
39                flags = excluded.flags,
40                snippet = excluded.snippet,
41                has_attachments = excluded.has_attachments,
42                size_bytes = excluded.size_bytes,
43                unsubscribe_method = excluded.unsubscribe_method",
44            id,
45            account_id,
46            envelope.provider_id,
47            thread_id,
48            envelope.message_id_header,
49            envelope.in_reply_to,
50            refs,
51            from_name,
52            envelope.from.email,
53            to_addrs,
54            cc_addrs,
55            bcc_addrs,
56            envelope.subject,
57            date,
58            flags,
59            envelope.snippet,
60            has_attachments,
61            size_bytes,
62            unsub,
63        )
64        .execute(self.writer())
65        .await?;
66
67        Ok(())
68    }
69
70    pub async fn get_envelope(&self, id: &MessageId) -> Result<Option<Envelope>, sqlx::Error> {
71        let id_str = id.as_str();
72        let row = sqlx::query!(
73            r#"SELECT
74                id as "id!", account_id as "account_id!", provider_id as "provider_id!",
75                thread_id as "thread_id!", message_id_header, in_reply_to,
76                reference_headers, from_name, from_email as "from_email!",
77                to_addrs as "to_addrs!", cc_addrs as "cc_addrs!", bcc_addrs as "bcc_addrs!",
78                subject as "subject!", date as "date!", flags as "flags!",
79                snippet as "snippet!", has_attachments as "has_attachments!: bool",
80                size_bytes as "size_bytes!", unsubscribe_method,
81                COALESCE((
82                    SELECT GROUP_CONCAT(labels.provider_id, char(31))
83                    FROM message_labels
84                    JOIN labels ON labels.id = message_labels.label_id
85                    WHERE message_labels.message_id = messages.id
86                ), '') as "label_provider_ids!: String"
87             FROM messages WHERE id = ?"#,
88            id_str,
89        )
90        .fetch_optional(self.reader())
91        .await?;
92
93        Ok(row.map(|r| {
94            record_to_envelope(
95                &r.id,
96                &r.account_id,
97                &r.provider_id,
98                &r.thread_id,
99                r.message_id_header.as_deref(),
100                r.in_reply_to.as_deref(),
101                r.reference_headers.as_deref(),
102                r.from_name.as_deref(),
103                &r.from_email,
104                &r.to_addrs,
105                &r.cc_addrs,
106                &r.bcc_addrs,
107                &r.subject,
108                r.date,
109                r.flags,
110                &r.snippet,
111                r.has_attachments,
112                r.size_bytes,
113                r.unsubscribe_method.as_deref(),
114                &r.label_provider_ids,
115            )
116        }))
117    }
118
119    pub async fn list_envelopes_by_label(
120        &self,
121        label_id: &LabelId,
122        limit: u32,
123        offset: u32,
124    ) -> Result<Vec<Envelope>, sqlx::Error> {
125        let lid = label_id.as_str();
126        let lim = limit as i64;
127        let off = offset as i64;
128        let rows = sqlx::query!(
129            r#"SELECT
130                m.id as "id!", m.account_id as "account_id!", m.provider_id as "provider_id!",
131                m.thread_id as "thread_id!", m.message_id_header, m.in_reply_to,
132                m.reference_headers, m.from_name, m.from_email as "from_email!",
133                m.to_addrs as "to_addrs!", m.cc_addrs as "cc_addrs!", m.bcc_addrs as "bcc_addrs!",
134                m.subject as "subject!", m.date as "date!", m.flags as "flags!",
135                m.snippet as "snippet!", m.has_attachments as "has_attachments!: bool",
136                m.size_bytes as "size_bytes!", m.unsubscribe_method,
137                COALESCE((
138                    SELECT GROUP_CONCAT(labels.provider_id, char(31))
139                    FROM message_labels
140                    JOIN labels ON labels.id = message_labels.label_id
141                    WHERE message_labels.message_id = m.id
142                ), '') as "label_provider_ids!: String"
143             FROM messages m
144             JOIN message_labels ml ON m.id = ml.message_id
145             WHERE ml.label_id = ?
146             ORDER BY m.date DESC
147             LIMIT ? OFFSET ?"#,
148            lid,
149            lim,
150            off,
151        )
152        .fetch_all(self.reader())
153        .await?;
154
155        Ok(rows
156            .into_iter()
157            .map(|r| {
158                record_to_envelope(
159                    &r.id,
160                    &r.account_id,
161                    &r.provider_id,
162                    &r.thread_id,
163                    r.message_id_header.as_deref(),
164                    r.in_reply_to.as_deref(),
165                    r.reference_headers.as_deref(),
166                    r.from_name.as_deref(),
167                    &r.from_email,
168                    &r.to_addrs,
169                    &r.cc_addrs,
170                    &r.bcc_addrs,
171                    &r.subject,
172                    r.date,
173                    r.flags,
174                    &r.snippet,
175                    r.has_attachments,
176                    r.size_bytes,
177                    r.unsubscribe_method.as_deref(),
178                    &r.label_provider_ids,
179                )
180            })
181            .collect())
182    }
183
184    pub async fn list_envelopes_by_account(
185        &self,
186        account_id: &AccountId,
187        limit: u32,
188        offset: u32,
189    ) -> Result<Vec<Envelope>, sqlx::Error> {
190        let aid = account_id.as_str();
191        let lim = limit as i64;
192        let off = offset as i64;
193        let rows = sqlx::query!(
194            r#"SELECT
195                id as "id!", account_id as "account_id!", provider_id as "provider_id!",
196                thread_id as "thread_id!", message_id_header, in_reply_to,
197                reference_headers, from_name, from_email as "from_email!",
198                to_addrs as "to_addrs!", cc_addrs as "cc_addrs!", bcc_addrs as "bcc_addrs!",
199                subject as "subject!", date as "date!", flags as "flags!",
200                snippet as "snippet!", has_attachments as "has_attachments!: bool",
201                size_bytes as "size_bytes!", unsubscribe_method,
202                COALESCE((
203                    SELECT GROUP_CONCAT(labels.provider_id, char(31))
204                    FROM message_labels
205                    JOIN labels ON labels.id = message_labels.label_id
206                    WHERE message_labels.message_id = messages.id
207                ), '') as "label_provider_ids!: String"
208             FROM messages WHERE account_id = ? ORDER BY date DESC LIMIT ? OFFSET ?"#,
209            aid,
210            lim,
211            off,
212        )
213        .fetch_all(self.reader())
214        .await?;
215
216        Ok(rows
217            .into_iter()
218            .map(|r| {
219                record_to_envelope(
220                    &r.id,
221                    &r.account_id,
222                    &r.provider_id,
223                    &r.thread_id,
224                    r.message_id_header.as_deref(),
225                    r.in_reply_to.as_deref(),
226                    r.reference_headers.as_deref(),
227                    r.from_name.as_deref(),
228                    &r.from_email,
229                    &r.to_addrs,
230                    &r.cc_addrs,
231                    &r.bcc_addrs,
232                    &r.subject,
233                    r.date,
234                    r.flags,
235                    &r.snippet,
236                    r.has_attachments,
237                    r.size_bytes,
238                    r.unsubscribe_method.as_deref(),
239                    &r.label_provider_ids,
240                )
241            })
242            .collect())
243    }
244
245    pub async fn list_envelopes_by_ids(
246        &self,
247        message_ids: &[MessageId],
248    ) -> Result<Vec<Envelope>, sqlx::Error> {
249        if message_ids.is_empty() {
250            return Ok(Vec::new());
251        }
252
253        let placeholders: Vec<String> = message_ids.iter().map(|_| "?".to_string()).collect();
254        let sql = format!(
255            r#"SELECT
256                m.id as "id!", m.account_id as "account_id!", m.provider_id as "provider_id!",
257                m.thread_id as "thread_id!", m.message_id_header, m.in_reply_to,
258                m.reference_headers, m.from_name, m.from_email as "from_email!",
259                m.to_addrs as "to_addrs!", m.cc_addrs as "cc_addrs!", m.bcc_addrs as "bcc_addrs!",
260                m.subject as "subject!", m.date as "date!", m.flags as "flags!",
261                m.snippet as "snippet!", m.has_attachments as "has_attachments!: bool",
262                m.size_bytes as "size_bytes!", m.unsubscribe_method,
263                COALESCE((
264                    SELECT GROUP_CONCAT(labels.provider_id, char(31))
265                    FROM message_labels
266                    JOIN labels ON labels.id = message_labels.label_id
267                    WHERE message_labels.message_id = m.id
268                ), '') as "label_provider_ids!: String"
269             FROM messages m
270             WHERE m.id IN ({})"#,
271            placeholders.join(", ")
272        );
273
274        let mut query = sqlx::query(&sql);
275        for message_id in message_ids {
276            query = query.bind(message_id.as_str());
277        }
278
279        let rows = query.fetch_all(self.reader()).await?;
280        let mut by_id = std::collections::HashMap::with_capacity(rows.len());
281        for row in rows {
282            let envelope = record_to_envelope(
283                row.get::<String, _>("id").as_str(),
284                row.get::<String, _>("account_id").as_str(),
285                row.get::<String, _>("provider_id").as_str(),
286                row.get::<String, _>("thread_id").as_str(),
287                row.get::<Option<String>, _>("message_id_header").as_deref(),
288                row.get::<Option<String>, _>("in_reply_to").as_deref(),
289                row.get::<Option<String>, _>("reference_headers").as_deref(),
290                row.get::<Option<String>, _>("from_name").as_deref(),
291                row.get::<String, _>("from_email").as_str(),
292                row.get::<String, _>("to_addrs").as_str(),
293                row.get::<String, _>("cc_addrs").as_str(),
294                row.get::<String, _>("bcc_addrs").as_str(),
295                row.get::<String, _>("subject").as_str(),
296                row.get::<i64, _>("date"),
297                row.get::<i64, _>("flags"),
298                row.get::<String, _>("snippet").as_str(),
299                row.get::<bool, _>("has_attachments"),
300                row.get::<i64, _>("size_bytes"),
301                row.get::<Option<String>, _>("unsubscribe_method")
302                    .as_deref(),
303                row.get::<String, _>("label_provider_ids").as_str(),
304            );
305            by_id.insert(envelope.id.clone(), envelope);
306        }
307
308        Ok(message_ids
309            .iter()
310            .filter_map(|message_id| by_id.remove(message_id))
311            .collect())
312    }
313
314    // Dynamic SQL -- kept as runtime query due to variable IN clause
315    pub async fn delete_messages_by_provider_ids(
316        &self,
317        account_id: &AccountId,
318        provider_ids: &[String],
319    ) -> Result<u64, sqlx::Error> {
320        if provider_ids.is_empty() {
321            return Ok(0);
322        }
323        let placeholders: Vec<String> = provider_ids.iter().map(|_| "?".to_string()).collect();
324        let sql = format!(
325            "DELETE FROM messages WHERE account_id = ? AND provider_id IN ({})",
326            placeholders.join(", ")
327        );
328        let mut query = sqlx::query(&sql).bind(account_id.as_str());
329        for pid in provider_ids {
330            query = query.bind(pid);
331        }
332        let result = query.execute(self.writer()).await?;
333        Ok(result.rows_affected())
334    }
335
336    pub async fn set_message_labels(
337        &self,
338        message_id: &MessageId,
339        label_ids: &[LabelId],
340    ) -> Result<(), sqlx::Error> {
341        let mid = message_id.as_str();
342        sqlx::query!("DELETE FROM message_labels WHERE message_id = ?", mid)
343            .execute(self.writer())
344            .await?;
345
346        for label_id in label_ids {
347            let mid = message_id.as_str();
348            let lid = label_id.as_str();
349            sqlx::query!(
350                "INSERT INTO message_labels (message_id, label_id) VALUES (?, ?)",
351                mid,
352                lid,
353            )
354            .execute(self.writer())
355            .await?;
356        }
357
358        Ok(())
359    }
360
361    pub async fn update_message_thread_id(
362        &self,
363        message_id: &MessageId,
364        thread_id: &ThreadId,
365    ) -> Result<(), sqlx::Error> {
366        sqlx::query("UPDATE messages SET thread_id = ? WHERE id = ?")
367            .bind(thread_id.as_str())
368            .bind(message_id.as_str())
369            .execute(self.writer())
370            .await?;
371        Ok(())
372    }
373
374    pub async fn get_message_id_by_provider_id(
375        &self,
376        account_id: &AccountId,
377        provider_id: &str,
378    ) -> Result<Option<MessageId>, sqlx::Error> {
379        let aid = account_id.as_str();
380        let row = sqlx::query!(
381            r#"SELECT id as "id!" FROM messages WHERE account_id = ? AND provider_id = ?"#,
382            aid,
383            provider_id,
384        )
385        .fetch_optional(self.reader())
386        .await?;
387
388        Ok(row.map(|r| MessageId::from_uuid(uuid::Uuid::parse_str(&r.id).unwrap())))
389    }
390
391    pub async fn count_messages_by_account(
392        &self,
393        account_id: &AccountId,
394    ) -> Result<u32, sqlx::Error> {
395        let aid = account_id.as_str();
396        let row = sqlx::query!(
397            r#"SELECT COUNT(*) as "cnt!: i64" FROM messages WHERE account_id = ?"#,
398            aid,
399        )
400        .fetch_one(self.reader())
401        .await?;
402
403        Ok(row.cnt as u32)
404    }
405
406    /// List all envelopes across all accounts, paginated. Used for reindexing.
407    pub async fn list_all_envelopes_paginated(
408        &self,
409        limit: u32,
410        offset: u32,
411    ) -> Result<Vec<Envelope>, sqlx::Error> {
412        let lim = limit as i64;
413        let off = offset as i64;
414        let rows = sqlx::query!(
415            r#"SELECT
416                id as "id!", account_id as "account_id!", provider_id as "provider_id!",
417                thread_id as "thread_id!", message_id_header, in_reply_to,
418                reference_headers, from_name, from_email as "from_email!",
419                to_addrs as "to_addrs!", cc_addrs as "cc_addrs!", bcc_addrs as "bcc_addrs!",
420                subject as "subject!", date as "date!", flags as "flags!",
421                snippet as "snippet!", has_attachments as "has_attachments!: bool",
422                size_bytes as "size_bytes!", unsubscribe_method,
423                COALESCE((
424                    SELECT GROUP_CONCAT(labels.provider_id, char(31))
425                    FROM message_labels
426                    JOIN labels ON labels.id = message_labels.label_id
427                    WHERE message_labels.message_id = messages.id
428                ), '') as "label_provider_ids!: String"
429             FROM messages ORDER BY date DESC LIMIT ? OFFSET ?"#,
430            lim,
431            off,
432        )
433        .fetch_all(self.reader())
434        .await?;
435
436        Ok(rows
437            .into_iter()
438            .map(|r| {
439                record_to_envelope(
440                    &r.id,
441                    &r.account_id,
442                    &r.provider_id,
443                    &r.thread_id,
444                    r.message_id_header.as_deref(),
445                    r.in_reply_to.as_deref(),
446                    r.reference_headers.as_deref(),
447                    r.from_name.as_deref(),
448                    &r.from_email,
449                    &r.to_addrs,
450                    &r.cc_addrs,
451                    &r.bcc_addrs,
452                    &r.subject,
453                    r.date,
454                    r.flags,
455                    &r.snippet,
456                    r.has_attachments,
457                    r.size_bytes,
458                    r.unsubscribe_method.as_deref(),
459                    &r.label_provider_ids,
460                )
461            })
462            .collect())
463    }
464
465    /// Count all messages across all accounts. Used for reindexing.
466    pub async fn count_all_messages(&self) -> Result<u32, sqlx::Error> {
467        let row = sqlx::query!(r#"SELECT COUNT(*) as "cnt!: i64" FROM messages"#)
468            .fetch_one(self.reader())
469            .await?;
470        Ok(row.cnt as u32)
471    }
472
473    pub async fn update_flags(
474        &self,
475        message_id: &MessageId,
476        flags: MessageFlags,
477    ) -> Result<(), sqlx::Error> {
478        let mid = message_id.as_str();
479        let flags_val = flags.bits() as i64;
480        sqlx::query!("UPDATE messages SET flags = ? WHERE id = ?", flags_val, mid)
481            .execute(self.writer())
482            .await?;
483        Ok(())
484    }
485
486    /// Set the read flag on a message.
487    pub async fn set_read(&self, message_id: &MessageId, read: bool) -> Result<(), sqlx::Error> {
488        let mid = message_id.as_str();
489        let row = sqlx::query!(
490            r#"SELECT flags as "flags!" FROM messages WHERE id = ?"#,
491            mid,
492        )
493        .fetch_optional(self.reader())
494        .await?;
495
496        if let Some(r) = row {
497            let mut flags = MessageFlags::from_bits_truncate(r.flags as u32);
498            if read {
499                flags.insert(MessageFlags::READ);
500            } else {
501                flags.remove(MessageFlags::READ);
502            }
503            let flags_val = flags.bits() as i64;
504            sqlx::query!("UPDATE messages SET flags = ? WHERE id = ?", flags_val, mid)
505                .execute(self.writer())
506                .await?;
507        }
508        Ok(())
509    }
510
511    /// Set the starred flag on a message.
512    pub async fn set_starred(
513        &self,
514        message_id: &MessageId,
515        starred: bool,
516    ) -> Result<(), sqlx::Error> {
517        let mid = message_id.as_str();
518        let row = sqlx::query!(
519            r#"SELECT flags as "flags!" FROM messages WHERE id = ?"#,
520            mid,
521        )
522        .fetch_optional(self.reader())
523        .await?;
524
525        if let Some(r) = row {
526            let mut flags = MessageFlags::from_bits_truncate(r.flags as u32);
527            if starred {
528                flags.insert(MessageFlags::STARRED);
529            } else {
530                flags.remove(MessageFlags::STARRED);
531            }
532            let flags_val = flags.bits() as i64;
533            sqlx::query!("UPDATE messages SET flags = ? WHERE id = ?", flags_val, mid)
534                .execute(self.writer())
535                .await?;
536        }
537        Ok(())
538    }
539
540    /// Get the provider_id for a message.
541    pub async fn get_provider_id(
542        &self,
543        message_id: &MessageId,
544    ) -> Result<Option<String>, sqlx::Error> {
545        let mid = message_id.as_str();
546        let row = sqlx::query!(
547            r#"SELECT provider_id as "provider_id!" FROM messages WHERE id = ?"#,
548            mid,
549        )
550        .fetch_optional(self.reader())
551        .await?;
552        Ok(row.map(|r| r.provider_id))
553    }
554
555    /// Get the label IDs for a message.
556    pub async fn get_message_label_ids(
557        &self,
558        message_id: &MessageId,
559    ) -> Result<Vec<LabelId>, sqlx::Error> {
560        let mid = message_id.as_str();
561        let rows = sqlx::query!(
562            r#"SELECT label_id as "label_id!" FROM message_labels WHERE message_id = ?"#,
563            mid,
564        )
565        .fetch_all(self.reader())
566        .await?;
567        Ok(rows
568            .into_iter()
569            .map(|r| LabelId::from_uuid(uuid::Uuid::parse_str(&r.label_id).unwrap()))
570            .collect())
571    }
572
573    /// Add a label to a message.
574    pub async fn add_message_label(
575        &self,
576        message_id: &MessageId,
577        label_id: &LabelId,
578    ) -> Result<(), sqlx::Error> {
579        let mid = message_id.as_str();
580        let lid = label_id.as_str();
581        sqlx::query!(
582            "INSERT OR IGNORE INTO message_labels (message_id, label_id) VALUES (?, ?)",
583            mid,
584            lid,
585        )
586        .execute(self.writer())
587        .await?;
588        Ok(())
589    }
590
591    /// Remove a label from a message.
592    pub async fn remove_message_label(
593        &self,
594        message_id: &MessageId,
595        label_id: &LabelId,
596    ) -> Result<(), sqlx::Error> {
597        let mid = message_id.as_str();
598        let lid = label_id.as_str();
599        sqlx::query!(
600            "DELETE FROM message_labels WHERE message_id = ? AND label_id = ?",
601            mid,
602            lid,
603        )
604        .execute(self.writer())
605        .await?;
606        Ok(())
607    }
608
609    /// Count total rows in the message_labels junction table.
610    pub async fn count_message_labels(&self) -> Result<u32, sqlx::Error> {
611        let row = sqlx::query!(r#"SELECT COUNT(*) as "cnt!: i64" FROM message_labels"#,)
612            .fetch_one(self.reader())
613            .await?;
614        Ok(row.cnt as u32)
615    }
616
617    /// Mark a message as trashed (update flags).
618    pub async fn move_to_trash(&self, message_id: &MessageId) -> Result<(), sqlx::Error> {
619        let mid = message_id.as_str();
620        let row = sqlx::query!(
621            r#"SELECT flags as "flags!" FROM messages WHERE id = ?"#,
622            mid,
623        )
624        .fetch_optional(self.reader())
625        .await?;
626
627        if let Some(r) = row {
628            let mut flags = MessageFlags::from_bits_truncate(r.flags as u32);
629            flags.insert(MessageFlags::TRASH);
630            let flags_val = flags.bits() as i64;
631            sqlx::query!("UPDATE messages SET flags = ? WHERE id = ?", flags_val, mid)
632                .execute(self.writer())
633                .await?;
634        }
635        Ok(())
636    }
637
638    /// Get distinct contacts (name + email) from message senders, ordered by frequency.
639    pub async fn list_contacts(&self, limit: u32) -> Result<Vec<(String, String)>, sqlx::Error> {
640        let lim = limit as i64;
641        let rows = sqlx::query_as::<_, (String, String)>(
642            r#"SELECT
643                COALESCE(from_name, '') as name,
644                from_email as email
645             FROM messages
646             WHERE from_email != ''
647             GROUP BY from_email
648             ORDER BY COUNT(*) DESC
649             LIMIT ?"#,
650        )
651        .bind(lim)
652        .fetch_all(self.reader())
653        .await?;
654        Ok(rows)
655    }
656
657    pub async fn list_subscriptions(
658        &self,
659        limit: u32,
660    ) -> Result<Vec<SubscriptionSummary>, sqlx::Error> {
661        let none_unsubscribe = serde_json::to_string(&UnsubscribeMethod::None).unwrap();
662        let trash_flag = MessageFlags::TRASH.bits() as i64;
663        let spam_flag = MessageFlags::SPAM.bits() as i64;
664        let lim = limit as i64;
665
666        let rows = sqlx::query(
667            r#"WITH ranked AS (
668                SELECT
669                    id,
670                    account_id,
671                    provider_id,
672                    thread_id,
673                    from_name,
674                    from_email,
675                    subject,
676                    snippet,
677                    date,
678                    flags,
679                    has_attachments,
680                    size_bytes,
681                    unsubscribe_method,
682                    COUNT(*) OVER (
683                        PARTITION BY account_id, LOWER(from_email)
684                    ) AS message_count,
685                    ROW_NUMBER() OVER (
686                        PARTITION BY account_id, LOWER(from_email)
687                        ORDER BY date DESC, id DESC
688                    ) AS rn
689                FROM messages
690                WHERE from_email != ''
691                  AND unsubscribe_method IS NOT NULL
692                  AND unsubscribe_method != ?
693                  AND (flags & ?) = 0
694                  AND (flags & ?) = 0
695            )
696            SELECT
697                id,
698                account_id,
699                provider_id,
700                thread_id,
701                from_name,
702                from_email,
703                subject,
704                snippet,
705                date,
706                flags,
707                has_attachments,
708                size_bytes,
709                unsubscribe_method,
710                message_count
711            FROM ranked
712            WHERE rn = 1
713            ORDER BY date DESC, id DESC
714            LIMIT ?"#,
715        )
716        .bind(none_unsubscribe)
717        .bind(trash_flag)
718        .bind(spam_flag)
719        .bind(lim)
720        .fetch_all(self.reader())
721        .await?;
722
723        Ok(rows
724            .into_iter()
725            .map(|row| SubscriptionSummary {
726                account_id: AccountId::from_uuid(
727                    uuid::Uuid::parse_str(&row.get::<String, _>("account_id")).unwrap(),
728                ),
729                sender_name: row.get::<Option<String>, _>("from_name"),
730                sender_email: row.get::<String, _>("from_email"),
731                message_count: row.get::<i64, _>("message_count") as u32,
732                latest_message_id: MessageId::from_uuid(
733                    uuid::Uuid::parse_str(&row.get::<String, _>("id")).unwrap(),
734                ),
735                latest_provider_id: row.get::<String, _>("provider_id"),
736                latest_thread_id: ThreadId::from_uuid(
737                    uuid::Uuid::parse_str(&row.get::<String, _>("thread_id")).unwrap(),
738                ),
739                latest_subject: row.get::<String, _>("subject"),
740                latest_snippet: row.get::<String, _>("snippet"),
741                latest_date: chrono::DateTime::from_timestamp(row.get::<i64, _>("date"), 0)
742                    .unwrap_or_default(),
743                latest_flags: MessageFlags::from_bits_truncate(row.get::<i64, _>("flags") as u32),
744                latest_has_attachments: row.get::<bool, _>("has_attachments"),
745                latest_size_bytes: row.get::<i64, _>("size_bytes") as u64,
746                unsubscribe: row
747                    .get::<Option<String>, _>("unsubscribe_method")
748                    .as_deref()
749                    .map(|value| serde_json::from_str(value).unwrap_or(UnsubscribeMethod::None))
750                    .unwrap_or(UnsubscribeMethod::None),
751            })
752            .collect())
753    }
754}
755
756/// Shared helper to convert individual field values into an Envelope.
757/// Used by both message.rs and thread.rs queries since the `query!` macro
758/// returns different anonymous types for each call site.
759#[allow(clippy::too_many_arguments)]
760pub(crate) fn record_to_envelope(
761    id: &str,
762    account_id: &str,
763    provider_id: &str,
764    thread_id: &str,
765    message_id_header: Option<&str>,
766    in_reply_to: Option<&str>,
767    reference_headers: Option<&str>,
768    from_name: Option<&str>,
769    from_email: &str,
770    to_addrs: &str,
771    cc_addrs: &str,
772    bcc_addrs: &str,
773    subject: &str,
774    date: i64,
775    flags: i64,
776    snippet: &str,
777    has_attachments: bool,
778    size_bytes: i64,
779    unsubscribe_method: Option<&str>,
780    label_provider_ids: &str,
781) -> Envelope {
782    Envelope {
783        id: MessageId::from_uuid(uuid::Uuid::parse_str(id).unwrap()),
784        account_id: AccountId::from_uuid(uuid::Uuid::parse_str(account_id).unwrap()),
785        provider_id: provider_id.to_string(),
786        thread_id: ThreadId::from_uuid(uuid::Uuid::parse_str(thread_id).unwrap()),
787        message_id_header: message_id_header.map(|s| s.to_string()),
788        in_reply_to: in_reply_to.map(|s| s.to_string()),
789        references: reference_headers
790            .map(|r| serde_json::from_str(r).unwrap_or_default())
791            .unwrap_or_default(),
792        from: Address {
793            name: from_name.map(|s| s.to_string()),
794            email: from_email.to_string(),
795        },
796        to: serde_json::from_str(to_addrs).unwrap_or_default(),
797        cc: serde_json::from_str(cc_addrs).unwrap_or_default(),
798        bcc: serde_json::from_str(bcc_addrs).unwrap_or_default(),
799        subject: subject.to_string(),
800        date: chrono::DateTime::from_timestamp(date, 0).unwrap_or_default(),
801        flags: MessageFlags::from_bits_truncate(flags as u32),
802        snippet: snippet.to_string(),
803        has_attachments,
804        size_bytes: size_bytes as u64,
805        unsubscribe: unsubscribe_method
806            .map(|u| serde_json::from_str(u).unwrap_or(UnsubscribeMethod::None))
807            .unwrap_or(UnsubscribeMethod::None),
808        label_provider_ids: if label_provider_ids.is_empty() {
809            vec![]
810        } else {
811            label_provider_ids
812                .split('\u{1f}')
813                .filter(|provider_id| !provider_id.is_empty())
814                .map(str::to_string)
815                .collect()
816        },
817    }
818}