Skip to main content

mxr_store/
message.rs

1use mxr_core::id::*;
2use mxr_core::types::*;
3use sqlx::Row;
4
5pub(crate) fn future_date_cutoff_timestamp() -> i64 {
6    (chrono::Utc::now() + chrono::Duration::days(1)).timestamp()
7}
8
9impl super::Store {
10    pub async fn upsert_envelope(&self, envelope: &Envelope) -> Result<(), sqlx::Error> {
11        let id = envelope.id.as_str();
12        let account_id = envelope.account_id.as_str();
13        let thread_id = envelope.thread_id.as_str();
14        let from_name = envelope.from.name.as_deref();
15        let to_addrs = serde_json::to_string(&envelope.to).unwrap();
16        let cc_addrs = serde_json::to_string(&envelope.cc).unwrap();
17        let bcc_addrs = serde_json::to_string(&envelope.bcc).unwrap();
18        let refs = serde_json::to_string(&envelope.references).unwrap();
19        let date = envelope.date.timestamp();
20        let flags = envelope.flags.bits() as i64;
21        let unsub = serde_json::to_string(&envelope.unsubscribe).unwrap();
22        let has_attachments = envelope.has_attachments;
23        let size_bytes = envelope.size_bytes as i64;
24
25        sqlx::query!(
26            "INSERT INTO messages
27             (id, account_id, provider_id, thread_id, message_id_header, in_reply_to,
28              reference_headers, from_name, from_email, to_addrs, cc_addrs, bcc_addrs,
29              subject, date, flags, snippet, has_attachments, size_bytes, unsubscribe_method)
30             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
31             ON CONFLICT(id) DO UPDATE SET
32                provider_id = excluded.provider_id,
33                thread_id = excluded.thread_id,
34                message_id_header = excluded.message_id_header,
35                in_reply_to = excluded.in_reply_to,
36                reference_headers = excluded.reference_headers,
37                from_name = excluded.from_name,
38                from_email = excluded.from_email,
39                to_addrs = excluded.to_addrs,
40                cc_addrs = excluded.cc_addrs,
41                bcc_addrs = excluded.bcc_addrs,
42                subject = excluded.subject,
43                date = excluded.date,
44                flags = excluded.flags,
45                snippet = excluded.snippet,
46                has_attachments = excluded.has_attachments,
47                size_bytes = excluded.size_bytes,
48                unsubscribe_method = excluded.unsubscribe_method",
49            id,
50            account_id,
51            envelope.provider_id,
52            thread_id,
53            envelope.message_id_header,
54            envelope.in_reply_to,
55            refs,
56            from_name,
57            envelope.from.email,
58            to_addrs,
59            cc_addrs,
60            bcc_addrs,
61            envelope.subject,
62            date,
63            flags,
64            envelope.snippet,
65            has_attachments,
66            size_bytes,
67            unsub,
68        )
69        .execute(self.writer())
70        .await?;
71
72        Ok(())
73    }
74
75    pub async fn get_envelope(&self, id: &MessageId) -> Result<Option<Envelope>, sqlx::Error> {
76        let id_str = id.as_str();
77        let row = sqlx::query!(
78            r#"SELECT
79                id as "id!", account_id as "account_id!", provider_id as "provider_id!",
80                thread_id as "thread_id!", message_id_header, in_reply_to,
81                reference_headers, from_name, from_email as "from_email!",
82                to_addrs as "to_addrs!", cc_addrs as "cc_addrs!", bcc_addrs as "bcc_addrs!",
83                subject as "subject!", date as "date!", flags as "flags!",
84                snippet as "snippet!", has_attachments as "has_attachments!: bool",
85                size_bytes as "size_bytes!", unsubscribe_method,
86                COALESCE((
87                    SELECT GROUP_CONCAT(labels.provider_id, char(31))
88                    FROM message_labels
89                    JOIN labels ON labels.id = message_labels.label_id
90                    WHERE message_labels.message_id = messages.id
91                ), '') as "label_provider_ids!: String"
92             FROM messages WHERE id = ?"#,
93            id_str,
94        )
95        .fetch_optional(self.reader())
96        .await?;
97
98        Ok(row.map(|r| {
99            record_to_envelope(
100                &r.id,
101                &r.account_id,
102                &r.provider_id,
103                &r.thread_id,
104                r.message_id_header.as_deref(),
105                r.in_reply_to.as_deref(),
106                r.reference_headers.as_deref(),
107                r.from_name.as_deref(),
108                &r.from_email,
109                &r.to_addrs,
110                &r.cc_addrs,
111                &r.bcc_addrs,
112                &r.subject,
113                r.date,
114                r.flags,
115                &r.snippet,
116                r.has_attachments,
117                r.size_bytes,
118                r.unsubscribe_method.as_deref(),
119                &r.label_provider_ids,
120            )
121        }))
122    }
123
124    pub async fn list_envelopes_by_label(
125        &self,
126        label_id: &LabelId,
127        limit: u32,
128        offset: u32,
129    ) -> Result<Vec<Envelope>, sqlx::Error> {
130        let lid = label_id.as_str();
131        let cutoff = future_date_cutoff_timestamp();
132        let lim = limit as i64;
133        let off = offset as i64;
134        let rows = sqlx::query!(
135            r#"SELECT
136                m.id as "id!", m.account_id as "account_id!", m.provider_id as "provider_id!",
137                m.thread_id as "thread_id!", m.message_id_header, m.in_reply_to,
138                m.reference_headers, m.from_name, m.from_email as "from_email!",
139                m.to_addrs as "to_addrs!", m.cc_addrs as "cc_addrs!", m.bcc_addrs as "bcc_addrs!",
140                m.subject as "subject!", m.date as "date!", m.flags as "flags!",
141                m.snippet as "snippet!", m.has_attachments as "has_attachments!: bool",
142                m.size_bytes as "size_bytes!", m.unsubscribe_method,
143                COALESCE((
144                    SELECT GROUP_CONCAT(labels.provider_id, char(31))
145                    FROM message_labels
146                    JOIN labels ON labels.id = message_labels.label_id
147                    WHERE message_labels.message_id = m.id
148                ), '') as "label_provider_ids!: String"
149             FROM messages m
150             JOIN message_labels ml ON m.id = ml.message_id
151             WHERE ml.label_id = ?
152             ORDER BY CASE WHEN m.date > ? THEN 0 ELSE m.date END DESC, m.id DESC
153             LIMIT ? OFFSET ?"#,
154            lid,
155            cutoff,
156            lim,
157            off,
158        )
159        .fetch_all(self.reader())
160        .await?;
161
162        Ok(rows
163            .into_iter()
164            .map(|r| {
165                record_to_envelope(
166                    &r.id,
167                    &r.account_id,
168                    &r.provider_id,
169                    &r.thread_id,
170                    r.message_id_header.as_deref(),
171                    r.in_reply_to.as_deref(),
172                    r.reference_headers.as_deref(),
173                    r.from_name.as_deref(),
174                    &r.from_email,
175                    &r.to_addrs,
176                    &r.cc_addrs,
177                    &r.bcc_addrs,
178                    &r.subject,
179                    r.date,
180                    r.flags,
181                    &r.snippet,
182                    r.has_attachments,
183                    r.size_bytes,
184                    r.unsubscribe_method.as_deref(),
185                    &r.label_provider_ids,
186                )
187            })
188            .collect())
189    }
190
191    pub async fn list_envelopes_by_account(
192        &self,
193        account_id: &AccountId,
194        limit: u32,
195        offset: u32,
196    ) -> Result<Vec<Envelope>, sqlx::Error> {
197        let aid = account_id.as_str();
198        let cutoff = future_date_cutoff_timestamp();
199        let lim = limit as i64;
200        let off = offset as i64;
201        let rows = sqlx::query!(
202            r#"SELECT
203                id as "id!", account_id as "account_id!", provider_id as "provider_id!",
204                thread_id as "thread_id!", message_id_header, in_reply_to,
205                reference_headers, from_name, from_email as "from_email!",
206                to_addrs as "to_addrs!", cc_addrs as "cc_addrs!", bcc_addrs as "bcc_addrs!",
207                subject as "subject!", date as "date!", flags as "flags!",
208                snippet as "snippet!", has_attachments as "has_attachments!: bool",
209                size_bytes as "size_bytes!", unsubscribe_method,
210                COALESCE((
211                    SELECT GROUP_CONCAT(labels.provider_id, char(31))
212                    FROM message_labels
213                    JOIN labels ON labels.id = message_labels.label_id
214                    WHERE message_labels.message_id = messages.id
215                ), '') as "label_provider_ids!: String"
216             FROM messages
217             WHERE account_id = ?
218             ORDER BY CASE WHEN date > ? THEN 0 ELSE date END DESC, id DESC
219             LIMIT ? OFFSET ?"#,
220            aid,
221            cutoff,
222            lim,
223            off,
224        )
225        .fetch_all(self.reader())
226        .await?;
227
228        Ok(rows
229            .into_iter()
230            .map(|r| {
231                record_to_envelope(
232                    &r.id,
233                    &r.account_id,
234                    &r.provider_id,
235                    &r.thread_id,
236                    r.message_id_header.as_deref(),
237                    r.in_reply_to.as_deref(),
238                    r.reference_headers.as_deref(),
239                    r.from_name.as_deref(),
240                    &r.from_email,
241                    &r.to_addrs,
242                    &r.cc_addrs,
243                    &r.bcc_addrs,
244                    &r.subject,
245                    r.date,
246                    r.flags,
247                    &r.snippet,
248                    r.has_attachments,
249                    r.size_bytes,
250                    r.unsubscribe_method.as_deref(),
251                    &r.label_provider_ids,
252                )
253            })
254            .collect())
255    }
256
257    pub async fn list_envelopes_by_ids(
258        &self,
259        message_ids: &[MessageId],
260    ) -> Result<Vec<Envelope>, sqlx::Error> {
261        if message_ids.is_empty() {
262            return Ok(Vec::new());
263        }
264
265        let placeholders: Vec<String> = message_ids.iter().map(|_| "?".to_string()).collect();
266        let sql = format!(
267            r#"SELECT
268                m.id as id, m.account_id as account_id, m.provider_id as provider_id,
269                m.thread_id as thread_id, m.message_id_header, m.in_reply_to,
270                m.reference_headers, m.from_name, m.from_email as from_email,
271                m.to_addrs as to_addrs, m.cc_addrs as cc_addrs, m.bcc_addrs as bcc_addrs,
272                m.subject as subject, m.date as date, m.flags as flags,
273                m.snippet as snippet, m.has_attachments as has_attachments,
274                m.size_bytes as size_bytes, m.unsubscribe_method,
275                COALESCE((
276                    SELECT GROUP_CONCAT(labels.provider_id, char(31))
277                    FROM message_labels
278                    JOIN labels ON labels.id = message_labels.label_id
279                    WHERE message_labels.message_id = m.id
280                ), '') as label_provider_ids
281             FROM messages m
282             WHERE m.id IN ({})"#,
283            placeholders.join(", ")
284        );
285
286        let mut query = sqlx::query(&sql);
287        for message_id in message_ids {
288            query = query.bind(message_id.as_str());
289        }
290
291        let rows = query.fetch_all(self.reader()).await?;
292        let mut by_id = std::collections::HashMap::with_capacity(rows.len());
293        for row in rows {
294            let envelope = record_to_envelope(
295                row.get::<String, _>("id").as_str(),
296                row.get::<String, _>("account_id").as_str(),
297                row.get::<String, _>("provider_id").as_str(),
298                row.get::<String, _>("thread_id").as_str(),
299                row.get::<Option<String>, _>("message_id_header").as_deref(),
300                row.get::<Option<String>, _>("in_reply_to").as_deref(),
301                row.get::<Option<String>, _>("reference_headers").as_deref(),
302                row.get::<Option<String>, _>("from_name").as_deref(),
303                row.get::<String, _>("from_email").as_str(),
304                row.get::<String, _>("to_addrs").as_str(),
305                row.get::<String, _>("cc_addrs").as_str(),
306                row.get::<String, _>("bcc_addrs").as_str(),
307                row.get::<String, _>("subject").as_str(),
308                row.get::<i64, _>("date"),
309                row.get::<i64, _>("flags"),
310                row.get::<String, _>("snippet").as_str(),
311                row.get::<bool, _>("has_attachments"),
312                row.get::<i64, _>("size_bytes"),
313                row.get::<Option<String>, _>("unsubscribe_method")
314                    .as_deref(),
315                row.get::<String, _>("label_provider_ids").as_str(),
316            );
317            by_id.insert(envelope.id.clone(), envelope);
318        }
319
320        Ok(message_ids
321            .iter()
322            .filter_map(|message_id| by_id.remove(message_id))
323            .collect())
324    }
325
326    // Dynamic SQL -- kept as runtime query due to variable IN clause
327    pub async fn delete_messages_by_provider_ids(
328        &self,
329        account_id: &AccountId,
330        provider_ids: &[String],
331    ) -> Result<u64, sqlx::Error> {
332        if provider_ids.is_empty() {
333            return Ok(0);
334        }
335        let placeholders: Vec<String> = provider_ids.iter().map(|_| "?".to_string()).collect();
336        let sql = format!(
337            "DELETE FROM messages WHERE account_id = ? AND provider_id IN ({})",
338            placeholders.join(", ")
339        );
340        let mut query = sqlx::query(&sql).bind(account_id.as_str());
341        for pid in provider_ids {
342            query = query.bind(pid);
343        }
344        let result = query.execute(self.writer()).await?;
345        Ok(result.rows_affected())
346    }
347
348    pub async fn set_message_labels(
349        &self,
350        message_id: &MessageId,
351        label_ids: &[LabelId],
352    ) -> Result<(), sqlx::Error> {
353        let mid = message_id.as_str();
354        sqlx::query!("DELETE FROM message_labels WHERE message_id = ?", mid)
355            .execute(self.writer())
356            .await?;
357
358        for label_id in label_ids {
359            let mid = message_id.as_str();
360            let lid = label_id.as_str();
361            sqlx::query!(
362                "INSERT INTO message_labels (message_id, label_id) VALUES (?, ?)",
363                mid,
364                lid,
365            )
366            .execute(self.writer())
367            .await?;
368        }
369
370        Ok(())
371    }
372
373    pub async fn update_message_thread_id(
374        &self,
375        message_id: &MessageId,
376        thread_id: &ThreadId,
377    ) -> Result<(), sqlx::Error> {
378        sqlx::query("UPDATE messages SET thread_id = ? WHERE id = ?")
379            .bind(thread_id.as_str())
380            .bind(message_id.as_str())
381            .execute(self.writer())
382            .await?;
383        Ok(())
384    }
385
386    pub async fn get_message_id_by_provider_id(
387        &self,
388        account_id: &AccountId,
389        provider_id: &str,
390    ) -> Result<Option<MessageId>, sqlx::Error> {
391        let aid = account_id.as_str();
392        let row = sqlx::query!(
393            r#"SELECT id as "id!" FROM messages WHERE account_id = ? AND provider_id = ?"#,
394            aid,
395            provider_id,
396        )
397        .fetch_optional(self.reader())
398        .await?;
399
400        Ok(row.map(|r| MessageId::from_uuid(uuid::Uuid::parse_str(&r.id).unwrap())))
401    }
402
403    pub async fn count_messages_by_account(
404        &self,
405        account_id: &AccountId,
406    ) -> Result<u32, sqlx::Error> {
407        let aid = account_id.as_str();
408        let row = sqlx::query!(
409            r#"SELECT COUNT(*) as "cnt!: i64" FROM messages WHERE account_id = ?"#,
410            aid,
411        )
412        .fetch_one(self.reader())
413        .await?;
414
415        Ok(row.cnt as u32)
416    }
417
418    /// List all envelopes across all accounts, paginated. Used for reindexing.
419    pub async fn list_all_envelopes_paginated(
420        &self,
421        limit: u32,
422        offset: u32,
423    ) -> Result<Vec<Envelope>, sqlx::Error> {
424        let cutoff = future_date_cutoff_timestamp();
425        let lim = limit as i64;
426        let off = offset as i64;
427        let rows = sqlx::query!(
428            r#"SELECT
429                id as "id!", account_id as "account_id!", provider_id as "provider_id!",
430                thread_id as "thread_id!", message_id_header, in_reply_to,
431                reference_headers, from_name, from_email as "from_email!",
432                to_addrs as "to_addrs!", cc_addrs as "cc_addrs!", bcc_addrs as "bcc_addrs!",
433                subject as "subject!", date as "date!", flags as "flags!",
434                snippet as "snippet!", has_attachments as "has_attachments!: bool",
435                size_bytes as "size_bytes!", unsubscribe_method,
436                COALESCE((
437                    SELECT GROUP_CONCAT(labels.provider_id, char(31))
438                    FROM message_labels
439                    JOIN labels ON labels.id = message_labels.label_id
440                    WHERE message_labels.message_id = messages.id
441                ), '') as "label_provider_ids!: String"
442             FROM messages
443             ORDER BY CASE WHEN date > ? THEN 0 ELSE date END DESC, id DESC
444             LIMIT ? OFFSET ?"#,
445            cutoff,
446            lim,
447            off,
448        )
449        .fetch_all(self.reader())
450        .await?;
451
452        Ok(rows
453            .into_iter()
454            .map(|r| {
455                record_to_envelope(
456                    &r.id,
457                    &r.account_id,
458                    &r.provider_id,
459                    &r.thread_id,
460                    r.message_id_header.as_deref(),
461                    r.in_reply_to.as_deref(),
462                    r.reference_headers.as_deref(),
463                    r.from_name.as_deref(),
464                    &r.from_email,
465                    &r.to_addrs,
466                    &r.cc_addrs,
467                    &r.bcc_addrs,
468                    &r.subject,
469                    r.date,
470                    r.flags,
471                    &r.snippet,
472                    r.has_attachments,
473                    r.size_bytes,
474                    r.unsubscribe_method.as_deref(),
475                    &r.label_provider_ids,
476                )
477            })
478            .collect())
479    }
480
481    /// Count all messages across all accounts. Used for reindexing.
482    pub async fn count_all_messages(&self) -> Result<u32, sqlx::Error> {
483        let row = sqlx::query!(r#"SELECT COUNT(*) as "cnt!: i64" FROM messages"#)
484            .fetch_one(self.reader())
485            .await?;
486        Ok(row.cnt as u32)
487    }
488
489    pub async fn update_flags(
490        &self,
491        message_id: &MessageId,
492        flags: MessageFlags,
493    ) -> Result<(), sqlx::Error> {
494        let mid = message_id.as_str();
495        let flags_val = flags.bits() as i64;
496        sqlx::query!("UPDATE messages SET flags = ? WHERE id = ?", flags_val, mid)
497            .execute(self.writer())
498            .await?;
499        Ok(())
500    }
501
502    /// Set the read flag on a message.
503    pub async fn set_read(&self, message_id: &MessageId, read: bool) -> Result<(), sqlx::Error> {
504        let mid = message_id.as_str();
505        let row = sqlx::query!(
506            r#"SELECT flags as "flags!" FROM messages WHERE id = ?"#,
507            mid,
508        )
509        .fetch_optional(self.reader())
510        .await?;
511
512        if let Some(r) = row {
513            let mut flags = MessageFlags::from_bits_truncate(r.flags as u32);
514            if read {
515                flags.insert(MessageFlags::READ);
516            } else {
517                flags.remove(MessageFlags::READ);
518            }
519            let flags_val = flags.bits() as i64;
520            sqlx::query!("UPDATE messages SET flags = ? WHERE id = ?", flags_val, mid)
521                .execute(self.writer())
522                .await?;
523        }
524        Ok(())
525    }
526
527    /// Set the starred flag on a message.
528    pub async fn set_starred(
529        &self,
530        message_id: &MessageId,
531        starred: bool,
532    ) -> Result<(), sqlx::Error> {
533        let mid = message_id.as_str();
534        let row = sqlx::query!(
535            r#"SELECT flags as "flags!" FROM messages WHERE id = ?"#,
536            mid,
537        )
538        .fetch_optional(self.reader())
539        .await?;
540
541        if let Some(r) = row {
542            let mut flags = MessageFlags::from_bits_truncate(r.flags as u32);
543            if starred {
544                flags.insert(MessageFlags::STARRED);
545            } else {
546                flags.remove(MessageFlags::STARRED);
547            }
548            let flags_val = flags.bits() as i64;
549            sqlx::query!("UPDATE messages SET flags = ? WHERE id = ?", flags_val, mid)
550                .execute(self.writer())
551                .await?;
552        }
553        Ok(())
554    }
555
556    /// Get the provider_id for a message.
557    pub async fn get_provider_id(
558        &self,
559        message_id: &MessageId,
560    ) -> Result<Option<String>, sqlx::Error> {
561        let mid = message_id.as_str();
562        let row = sqlx::query!(
563            r#"SELECT provider_id as "provider_id!" FROM messages WHERE id = ?"#,
564            mid,
565        )
566        .fetch_optional(self.reader())
567        .await?;
568        Ok(row.map(|r| r.provider_id))
569    }
570
571    /// Get the label IDs for a message.
572    pub async fn get_message_label_ids(
573        &self,
574        message_id: &MessageId,
575    ) -> Result<Vec<LabelId>, sqlx::Error> {
576        let mid = message_id.as_str();
577        let rows = sqlx::query!(
578            r#"SELECT label_id as "label_id!" FROM message_labels WHERE message_id = ?"#,
579            mid,
580        )
581        .fetch_all(self.reader())
582        .await?;
583        Ok(rows
584            .into_iter()
585            .map(|r| LabelId::from_uuid(uuid::Uuid::parse_str(&r.label_id).unwrap()))
586            .collect())
587    }
588
589    /// Add a label to a message.
590    pub async fn add_message_label(
591        &self,
592        message_id: &MessageId,
593        label_id: &LabelId,
594    ) -> Result<(), sqlx::Error> {
595        let mid = message_id.as_str();
596        let lid = label_id.as_str();
597        sqlx::query!(
598            "INSERT OR IGNORE INTO message_labels (message_id, label_id) VALUES (?, ?)",
599            mid,
600            lid,
601        )
602        .execute(self.writer())
603        .await?;
604        Ok(())
605    }
606
607    /// Remove a label from a message.
608    pub async fn remove_message_label(
609        &self,
610        message_id: &MessageId,
611        label_id: &LabelId,
612    ) -> Result<(), sqlx::Error> {
613        let mid = message_id.as_str();
614        let lid = label_id.as_str();
615        sqlx::query!(
616            "DELETE FROM message_labels WHERE message_id = ? AND label_id = ?",
617            mid,
618            lid,
619        )
620        .execute(self.writer())
621        .await?;
622        Ok(())
623    }
624
625    /// Count total rows in the message_labels junction table.
626    pub async fn count_message_labels(&self) -> Result<u32, sqlx::Error> {
627        let row = sqlx::query!(r#"SELECT COUNT(*) as "cnt!: i64" FROM message_labels"#,)
628            .fetch_one(self.reader())
629            .await?;
630        Ok(row.cnt as u32)
631    }
632
633    /// Mark a message as trashed (update flags).
634    pub async fn move_to_trash(&self, message_id: &MessageId) -> Result<(), sqlx::Error> {
635        let mid = message_id.as_str();
636        let row = sqlx::query!(
637            r#"SELECT flags as "flags!" FROM messages WHERE id = ?"#,
638            mid,
639        )
640        .fetch_optional(self.reader())
641        .await?;
642
643        if let Some(r) = row {
644            let mut flags = MessageFlags::from_bits_truncate(r.flags as u32);
645            flags.insert(MessageFlags::TRASH);
646            let flags_val = flags.bits() as i64;
647            sqlx::query!("UPDATE messages SET flags = ? WHERE id = ?", flags_val, mid)
648                .execute(self.writer())
649                .await?;
650        }
651        Ok(())
652    }
653
654    /// Get distinct contacts (name + email) from message senders, ordered by frequency.
655    pub async fn list_contacts(&self, limit: u32) -> Result<Vec<(String, String)>, sqlx::Error> {
656        let lim = limit as i64;
657        let rows = sqlx::query_as::<_, (String, String)>(
658            r#"SELECT
659                COALESCE(from_name, '') as name,
660                from_email as email
661             FROM messages
662             WHERE from_email != ''
663             GROUP BY from_email
664             ORDER BY COUNT(*) DESC
665             LIMIT ?"#,
666        )
667        .bind(lim)
668        .fetch_all(self.reader())
669        .await?;
670        Ok(rows)
671    }
672
673    pub async fn list_subscriptions(
674        &self,
675        limit: u32,
676    ) -> Result<Vec<SubscriptionSummary>, sqlx::Error> {
677        let none_unsubscribe = serde_json::to_string(&UnsubscribeMethod::None).unwrap();
678        let trash_flag = MessageFlags::TRASH.bits() as i64;
679        let spam_flag = MessageFlags::SPAM.bits() as i64;
680        let cutoff = future_date_cutoff_timestamp();
681        let lim = limit as i64;
682
683        let rows = sqlx::query(
684            r#"WITH ranked AS (
685                SELECT
686                    id,
687                    account_id,
688                    provider_id,
689                    thread_id,
690                    from_name,
691                    from_email,
692                    subject,
693                    snippet,
694                    date,
695                    flags,
696                    has_attachments,
697                    size_bytes,
698                    unsubscribe_method,
699                    COUNT(*) OVER (
700                        PARTITION BY account_id, LOWER(from_email)
701                    ) AS message_count,
702                    ROW_NUMBER() OVER (
703                        PARTITION BY account_id, LOWER(from_email)
704                        ORDER BY CASE WHEN date > ? THEN 0 ELSE date END DESC, id DESC
705                    ) AS rn
706                FROM messages
707                WHERE from_email != ''
708                  AND unsubscribe_method IS NOT NULL
709                  AND unsubscribe_method != ?
710                  AND (flags & ?) = 0
711                  AND (flags & ?) = 0
712            )
713            SELECT
714                id,
715                account_id,
716                provider_id,
717                thread_id,
718                from_name,
719                from_email,
720                subject,
721                snippet,
722                date,
723                flags,
724                has_attachments,
725                size_bytes,
726                unsubscribe_method,
727                message_count
728            FROM ranked
729            WHERE rn = 1
730            ORDER BY CASE WHEN date > ? THEN 0 ELSE date END DESC, id DESC
731            LIMIT ?"#,
732        )
733        .bind(cutoff)
734        .bind(none_unsubscribe)
735        .bind(trash_flag)
736        .bind(spam_flag)
737        .bind(cutoff)
738        .bind(lim)
739        .fetch_all(self.reader())
740        .await?;
741
742        Ok(rows
743            .into_iter()
744            .map(|row| SubscriptionSummary {
745                account_id: AccountId::from_uuid(
746                    uuid::Uuid::parse_str(&row.get::<String, _>("account_id")).unwrap(),
747                ),
748                sender_name: row.get::<Option<String>, _>("from_name"),
749                sender_email: row.get::<String, _>("from_email"),
750                message_count: row.get::<i64, _>("message_count") as u32,
751                latest_message_id: MessageId::from_uuid(
752                    uuid::Uuid::parse_str(&row.get::<String, _>("id")).unwrap(),
753                ),
754                latest_provider_id: row.get::<String, _>("provider_id"),
755                latest_thread_id: ThreadId::from_uuid(
756                    uuid::Uuid::parse_str(&row.get::<String, _>("thread_id")).unwrap(),
757                ),
758                latest_subject: row.get::<String, _>("subject"),
759                latest_snippet: row.get::<String, _>("snippet"),
760                latest_date: chrono::DateTime::from_timestamp(row.get::<i64, _>("date"), 0)
761                    .unwrap_or_default(),
762                latest_flags: MessageFlags::from_bits_truncate(row.get::<i64, _>("flags") as u32),
763                latest_has_attachments: row.get::<bool, _>("has_attachments"),
764                latest_size_bytes: row.get::<i64, _>("size_bytes") as u64,
765                unsubscribe: row
766                    .get::<Option<String>, _>("unsubscribe_method")
767                    .as_deref()
768                    .map(|value| serde_json::from_str(value).unwrap_or(UnsubscribeMethod::None))
769                    .unwrap_or(UnsubscribeMethod::None),
770            })
771            .collect())
772    }
773}
774
775/// Shared helper to convert individual field values into an Envelope.
776/// Used by both message.rs and thread.rs queries since the `query!` macro
777/// returns different anonymous types for each call site.
778#[allow(clippy::too_many_arguments)]
779pub(crate) fn record_to_envelope(
780    id: &str,
781    account_id: &str,
782    provider_id: &str,
783    thread_id: &str,
784    message_id_header: Option<&str>,
785    in_reply_to: Option<&str>,
786    reference_headers: Option<&str>,
787    from_name: Option<&str>,
788    from_email: &str,
789    to_addrs: &str,
790    cc_addrs: &str,
791    bcc_addrs: &str,
792    subject: &str,
793    date: i64,
794    flags: i64,
795    snippet: &str,
796    has_attachments: bool,
797    size_bytes: i64,
798    unsubscribe_method: Option<&str>,
799    label_provider_ids: &str,
800) -> Envelope {
801    Envelope {
802        id: MessageId::from_uuid(uuid::Uuid::parse_str(id).unwrap()),
803        account_id: AccountId::from_uuid(uuid::Uuid::parse_str(account_id).unwrap()),
804        provider_id: provider_id.to_string(),
805        thread_id: ThreadId::from_uuid(uuid::Uuid::parse_str(thread_id).unwrap()),
806        message_id_header: message_id_header.map(|s| s.to_string()),
807        in_reply_to: in_reply_to.map(|s| s.to_string()),
808        references: reference_headers
809            .map(|r| serde_json::from_str(r).unwrap_or_default())
810            .unwrap_or_default(),
811        from: Address {
812            name: from_name.map(|s| s.to_string()),
813            email: from_email.to_string(),
814        },
815        to: serde_json::from_str(to_addrs).unwrap_or_default(),
816        cc: serde_json::from_str(cc_addrs).unwrap_or_default(),
817        bcc: serde_json::from_str(bcc_addrs).unwrap_or_default(),
818        subject: subject.to_string(),
819        date: chrono::DateTime::from_timestamp(date, 0).unwrap_or_default(),
820        flags: MessageFlags::from_bits_truncate(flags as u32),
821        snippet: snippet.to_string(),
822        has_attachments,
823        size_bytes: size_bytes as u64,
824        unsubscribe: unsubscribe_method
825            .map(|u| serde_json::from_str(u).unwrap_or(UnsubscribeMethod::None))
826            .unwrap_or(UnsubscribeMethod::None),
827        label_provider_ids: if label_provider_ids.is_empty() {
828            vec![]
829        } else {
830            label_provider_ids
831                .split('\u{1f}')
832                .filter(|provider_id| !provider_id.is_empty())
833                .map(str::to_string)
834                .collect()
835        },
836    }
837}