1use mxr_core::id::*;
2use mxr_core::types::*;
3use sqlx::Row;
4
5pub(crate) fn future_date_cutoff_timestamp() -> i64 {
6 (chrono::Utc::now() + chrono::Duration::days(1)).timestamp()
7}
8
9impl super::Store {
10 pub async fn upsert_envelope(&self, envelope: &Envelope) -> Result<(), sqlx::Error> {
11 let id = envelope.id.as_str();
12 let account_id = envelope.account_id.as_str();
13 let thread_id = envelope.thread_id.as_str();
14 let from_name = envelope.from.name.as_deref();
15 let to_addrs = serde_json::to_string(&envelope.to).unwrap();
16 let cc_addrs = serde_json::to_string(&envelope.cc).unwrap();
17 let bcc_addrs = serde_json::to_string(&envelope.bcc).unwrap();
18 let refs = serde_json::to_string(&envelope.references).unwrap();
19 let date = envelope.date.timestamp();
20 let flags = envelope.flags.bits() as i64;
21 let unsub = serde_json::to_string(&envelope.unsubscribe).unwrap();
22 let has_attachments = envelope.has_attachments;
23 let size_bytes = envelope.size_bytes as i64;
24
25 sqlx::query!(
26 "INSERT INTO messages
27 (id, account_id, provider_id, thread_id, message_id_header, in_reply_to,
28 reference_headers, from_name, from_email, to_addrs, cc_addrs, bcc_addrs,
29 subject, date, flags, snippet, has_attachments, size_bytes, unsubscribe_method)
30 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
31 ON CONFLICT(id) DO UPDATE SET
32 provider_id = excluded.provider_id,
33 thread_id = excluded.thread_id,
34 message_id_header = excluded.message_id_header,
35 in_reply_to = excluded.in_reply_to,
36 reference_headers = excluded.reference_headers,
37 from_name = excluded.from_name,
38 from_email = excluded.from_email,
39 to_addrs = excluded.to_addrs,
40 cc_addrs = excluded.cc_addrs,
41 bcc_addrs = excluded.bcc_addrs,
42 subject = excluded.subject,
43 date = excluded.date,
44 flags = excluded.flags,
45 snippet = excluded.snippet,
46 has_attachments = excluded.has_attachments,
47 size_bytes = excluded.size_bytes,
48 unsubscribe_method = excluded.unsubscribe_method",
49 id,
50 account_id,
51 envelope.provider_id,
52 thread_id,
53 envelope.message_id_header,
54 envelope.in_reply_to,
55 refs,
56 from_name,
57 envelope.from.email,
58 to_addrs,
59 cc_addrs,
60 bcc_addrs,
61 envelope.subject,
62 date,
63 flags,
64 envelope.snippet,
65 has_attachments,
66 size_bytes,
67 unsub,
68 )
69 .execute(self.writer())
70 .await?;
71
72 Ok(())
73 }
74
75 pub async fn get_envelope(&self, id: &MessageId) -> Result<Option<Envelope>, sqlx::Error> {
76 let id_str = id.as_str();
77 let row = sqlx::query!(
78 r#"SELECT
79 id as "id!", account_id as "account_id!", provider_id as "provider_id!",
80 thread_id as "thread_id!", message_id_header, in_reply_to,
81 reference_headers, from_name, from_email as "from_email!",
82 to_addrs as "to_addrs!", cc_addrs as "cc_addrs!", bcc_addrs as "bcc_addrs!",
83 subject as "subject!", date as "date!", flags as "flags!",
84 snippet as "snippet!", has_attachments as "has_attachments!: bool",
85 size_bytes as "size_bytes!", unsubscribe_method,
86 COALESCE((
87 SELECT GROUP_CONCAT(labels.provider_id, char(31))
88 FROM message_labels
89 JOIN labels ON labels.id = message_labels.label_id
90 WHERE message_labels.message_id = messages.id
91 ), '') as "label_provider_ids!: String"
92 FROM messages WHERE id = ?"#,
93 id_str,
94 )
95 .fetch_optional(self.reader())
96 .await?;
97
98 Ok(row.map(|r| {
99 record_to_envelope(
100 &r.id,
101 &r.account_id,
102 &r.provider_id,
103 &r.thread_id,
104 r.message_id_header.as_deref(),
105 r.in_reply_to.as_deref(),
106 r.reference_headers.as_deref(),
107 r.from_name.as_deref(),
108 &r.from_email,
109 &r.to_addrs,
110 &r.cc_addrs,
111 &r.bcc_addrs,
112 &r.subject,
113 r.date,
114 r.flags,
115 &r.snippet,
116 r.has_attachments,
117 r.size_bytes,
118 r.unsubscribe_method.as_deref(),
119 &r.label_provider_ids,
120 )
121 }))
122 }
123
124 pub async fn list_envelopes_by_label(
125 &self,
126 label_id: &LabelId,
127 limit: u32,
128 offset: u32,
129 ) -> Result<Vec<Envelope>, sqlx::Error> {
130 let lid = label_id.as_str();
131 let cutoff = future_date_cutoff_timestamp();
132 let lim = limit as i64;
133 let off = offset as i64;
134 let rows = sqlx::query!(
135 r#"SELECT
136 m.id as "id!", m.account_id as "account_id!", m.provider_id as "provider_id!",
137 m.thread_id as "thread_id!", m.message_id_header, m.in_reply_to,
138 m.reference_headers, m.from_name, m.from_email as "from_email!",
139 m.to_addrs as "to_addrs!", m.cc_addrs as "cc_addrs!", m.bcc_addrs as "bcc_addrs!",
140 m.subject as "subject!", m.date as "date!", m.flags as "flags!",
141 m.snippet as "snippet!", m.has_attachments as "has_attachments!: bool",
142 m.size_bytes as "size_bytes!", m.unsubscribe_method,
143 COALESCE((
144 SELECT GROUP_CONCAT(labels.provider_id, char(31))
145 FROM message_labels
146 JOIN labels ON labels.id = message_labels.label_id
147 WHERE message_labels.message_id = m.id
148 ), '') as "label_provider_ids!: String"
149 FROM messages m
150 JOIN message_labels ml ON m.id = ml.message_id
151 WHERE ml.label_id = ?
152 ORDER BY CASE WHEN m.date > ? THEN 0 ELSE m.date END DESC, m.id DESC
153 LIMIT ? OFFSET ?"#,
154 lid,
155 cutoff,
156 lim,
157 off,
158 )
159 .fetch_all(self.reader())
160 .await?;
161
162 Ok(rows
163 .into_iter()
164 .map(|r| {
165 record_to_envelope(
166 &r.id,
167 &r.account_id,
168 &r.provider_id,
169 &r.thread_id,
170 r.message_id_header.as_deref(),
171 r.in_reply_to.as_deref(),
172 r.reference_headers.as_deref(),
173 r.from_name.as_deref(),
174 &r.from_email,
175 &r.to_addrs,
176 &r.cc_addrs,
177 &r.bcc_addrs,
178 &r.subject,
179 r.date,
180 r.flags,
181 &r.snippet,
182 r.has_attachments,
183 r.size_bytes,
184 r.unsubscribe_method.as_deref(),
185 &r.label_provider_ids,
186 )
187 })
188 .collect())
189 }
190
191 pub async fn list_envelopes_by_account(
192 &self,
193 account_id: &AccountId,
194 limit: u32,
195 offset: u32,
196 ) -> Result<Vec<Envelope>, sqlx::Error> {
197 let aid = account_id.as_str();
198 let cutoff = future_date_cutoff_timestamp();
199 let lim = limit as i64;
200 let off = offset as i64;
201 let rows = sqlx::query!(
202 r#"SELECT
203 id as "id!", account_id as "account_id!", provider_id as "provider_id!",
204 thread_id as "thread_id!", message_id_header, in_reply_to,
205 reference_headers, from_name, from_email as "from_email!",
206 to_addrs as "to_addrs!", cc_addrs as "cc_addrs!", bcc_addrs as "bcc_addrs!",
207 subject as "subject!", date as "date!", flags as "flags!",
208 snippet as "snippet!", has_attachments as "has_attachments!: bool",
209 size_bytes as "size_bytes!", unsubscribe_method,
210 COALESCE((
211 SELECT GROUP_CONCAT(labels.provider_id, char(31))
212 FROM message_labels
213 JOIN labels ON labels.id = message_labels.label_id
214 WHERE message_labels.message_id = messages.id
215 ), '') as "label_provider_ids!: String"
216 FROM messages
217 WHERE account_id = ?
218 ORDER BY CASE WHEN date > ? THEN 0 ELSE date END DESC, id DESC
219 LIMIT ? OFFSET ?"#,
220 aid,
221 cutoff,
222 lim,
223 off,
224 )
225 .fetch_all(self.reader())
226 .await?;
227
228 Ok(rows
229 .into_iter()
230 .map(|r| {
231 record_to_envelope(
232 &r.id,
233 &r.account_id,
234 &r.provider_id,
235 &r.thread_id,
236 r.message_id_header.as_deref(),
237 r.in_reply_to.as_deref(),
238 r.reference_headers.as_deref(),
239 r.from_name.as_deref(),
240 &r.from_email,
241 &r.to_addrs,
242 &r.cc_addrs,
243 &r.bcc_addrs,
244 &r.subject,
245 r.date,
246 r.flags,
247 &r.snippet,
248 r.has_attachments,
249 r.size_bytes,
250 r.unsubscribe_method.as_deref(),
251 &r.label_provider_ids,
252 )
253 })
254 .collect())
255 }
256
257 pub async fn list_envelopes_by_ids(
258 &self,
259 message_ids: &[MessageId],
260 ) -> Result<Vec<Envelope>, sqlx::Error> {
261 if message_ids.is_empty() {
262 return Ok(Vec::new());
263 }
264
265 let placeholders: Vec<String> = message_ids.iter().map(|_| "?".to_string()).collect();
266 let sql = format!(
267 r#"SELECT
268 m.id as id, m.account_id as account_id, m.provider_id as provider_id,
269 m.thread_id as thread_id, m.message_id_header, m.in_reply_to,
270 m.reference_headers, m.from_name, m.from_email as from_email,
271 m.to_addrs as to_addrs, m.cc_addrs as cc_addrs, m.bcc_addrs as bcc_addrs,
272 m.subject as subject, m.date as date, m.flags as flags,
273 m.snippet as snippet, m.has_attachments as has_attachments,
274 m.size_bytes as size_bytes, m.unsubscribe_method,
275 COALESCE((
276 SELECT GROUP_CONCAT(labels.provider_id, char(31))
277 FROM message_labels
278 JOIN labels ON labels.id = message_labels.label_id
279 WHERE message_labels.message_id = m.id
280 ), '') as label_provider_ids
281 FROM messages m
282 WHERE m.id IN ({})"#,
283 placeholders.join(", ")
284 );
285
286 let mut query = sqlx::query(&sql);
287 for message_id in message_ids {
288 query = query.bind(message_id.as_str());
289 }
290
291 let rows = query.fetch_all(self.reader()).await?;
292 let mut by_id = std::collections::HashMap::with_capacity(rows.len());
293 for row in rows {
294 let id = row.get::<String, _>(0);
295 let account_id = row.get::<String, _>(1);
296 let provider_id = row.get::<String, _>(2);
297 let thread_id = row.get::<String, _>(3);
298 let message_id_header = row.get::<Option<String>, _>(4);
299 let in_reply_to = row.get::<Option<String>, _>(5);
300 let reference_headers = row.get::<Option<String>, _>(6);
301 let from_name = row.get::<Option<String>, _>(7);
302 let from_email = row.get::<String, _>(8);
303 let to_addrs = row.get::<String, _>(9);
304 let cc_addrs = row.get::<String, _>(10);
305 let bcc_addrs = row.get::<String, _>(11);
306 let subject = row.get::<String, _>(12);
307 let date = row.get::<i64, _>(13);
308 let flags = row.get::<i64, _>(14);
309 let snippet = row.get::<String, _>(15);
310 let has_attachments = row.get::<bool, _>(16);
311 let size_bytes = row.get::<i64, _>(17);
312 let unsubscribe_method = row.get::<Option<String>, _>(18);
313 let label_provider_ids = row.get::<String, _>(19);
314 let envelope = record_to_envelope(
315 &id,
316 &account_id,
317 &provider_id,
318 &thread_id,
319 message_id_header.as_deref(),
320 in_reply_to.as_deref(),
321 reference_headers.as_deref(),
322 from_name.as_deref(),
323 &from_email,
324 &to_addrs,
325 &cc_addrs,
326 &bcc_addrs,
327 &subject,
328 date,
329 flags,
330 &snippet,
331 has_attachments,
332 size_bytes,
333 unsubscribe_method.as_deref(),
334 &label_provider_ids,
335 );
336 by_id.insert(envelope.id.clone(), envelope);
337 }
338
339 Ok(message_ids
340 .iter()
341 .filter_map(|message_id| by_id.remove(message_id))
342 .collect())
343 }
344
345 pub async fn delete_messages_by_provider_ids(
347 &self,
348 account_id: &AccountId,
349 provider_ids: &[String],
350 ) -> Result<u64, sqlx::Error> {
351 if provider_ids.is_empty() {
352 return Ok(0);
353 }
354 let placeholders: Vec<String> = provider_ids.iter().map(|_| "?".to_string()).collect();
355 let sql = format!(
356 "DELETE FROM messages WHERE account_id = ? AND provider_id IN ({})",
357 placeholders.join(", ")
358 );
359 let mut query = sqlx::query(&sql).bind(account_id.as_str());
360 for pid in provider_ids {
361 query = query.bind(pid);
362 }
363 let result = query.execute(self.writer()).await?;
364 Ok(result.rows_affected())
365 }
366
367 pub async fn set_message_labels(
368 &self,
369 message_id: &MessageId,
370 label_ids: &[LabelId],
371 ) -> Result<(), sqlx::Error> {
372 let mid = message_id.as_str();
373 sqlx::query!("DELETE FROM message_labels WHERE message_id = ?", mid)
374 .execute(self.writer())
375 .await?;
376
377 for label_id in label_ids {
378 let mid = message_id.as_str();
379 let lid = label_id.as_str();
380 sqlx::query!(
381 "INSERT INTO message_labels (message_id, label_id) VALUES (?, ?)",
382 mid,
383 lid,
384 )
385 .execute(self.writer())
386 .await?;
387 }
388
389 Ok(())
390 }
391
392 pub async fn update_message_thread_id(
393 &self,
394 message_id: &MessageId,
395 thread_id: &ThreadId,
396 ) -> Result<(), sqlx::Error> {
397 sqlx::query("UPDATE messages SET thread_id = ? WHERE id = ?")
398 .bind(thread_id.as_str())
399 .bind(message_id.as_str())
400 .execute(self.writer())
401 .await?;
402 Ok(())
403 }
404
405 pub async fn get_message_id_by_provider_id(
406 &self,
407 account_id: &AccountId,
408 provider_id: &str,
409 ) -> Result<Option<MessageId>, sqlx::Error> {
410 let aid = account_id.as_str();
411 let row = sqlx::query!(
412 r#"SELECT id as "id!" FROM messages WHERE account_id = ? AND provider_id = ?"#,
413 aid,
414 provider_id,
415 )
416 .fetch_optional(self.reader())
417 .await?;
418
419 Ok(row.map(|r| MessageId::from_uuid(uuid::Uuid::parse_str(&r.id).unwrap())))
420 }
421
422 pub async fn count_messages_by_account(
423 &self,
424 account_id: &AccountId,
425 ) -> Result<u32, sqlx::Error> {
426 let aid = account_id.as_str();
427 let row = sqlx::query!(
428 r#"SELECT COUNT(*) as "cnt!: i64" FROM messages WHERE account_id = ?"#,
429 aid,
430 )
431 .fetch_one(self.reader())
432 .await?;
433
434 Ok(row.cnt as u32)
435 }
436
437 pub async fn list_all_envelopes_paginated(
439 &self,
440 limit: u32,
441 offset: u32,
442 ) -> Result<Vec<Envelope>, sqlx::Error> {
443 let cutoff = future_date_cutoff_timestamp();
444 let lim = limit as i64;
445 let off = offset as i64;
446 let rows = sqlx::query!(
447 r#"SELECT
448 id as "id!", account_id as "account_id!", provider_id as "provider_id!",
449 thread_id as "thread_id!", message_id_header, in_reply_to,
450 reference_headers, from_name, from_email as "from_email!",
451 to_addrs as "to_addrs!", cc_addrs as "cc_addrs!", bcc_addrs as "bcc_addrs!",
452 subject as "subject!", date as "date!", flags as "flags!",
453 snippet as "snippet!", has_attachments as "has_attachments!: bool",
454 size_bytes as "size_bytes!", unsubscribe_method,
455 COALESCE((
456 SELECT GROUP_CONCAT(labels.provider_id, char(31))
457 FROM message_labels
458 JOIN labels ON labels.id = message_labels.label_id
459 WHERE message_labels.message_id = messages.id
460 ), '') as "label_provider_ids!: String"
461 FROM messages
462 ORDER BY CASE WHEN date > ? THEN 0 ELSE date END DESC, id DESC
463 LIMIT ? OFFSET ?"#,
464 cutoff,
465 lim,
466 off,
467 )
468 .fetch_all(self.reader())
469 .await?;
470
471 Ok(rows
472 .into_iter()
473 .map(|r| {
474 record_to_envelope(
475 &r.id,
476 &r.account_id,
477 &r.provider_id,
478 &r.thread_id,
479 r.message_id_header.as_deref(),
480 r.in_reply_to.as_deref(),
481 r.reference_headers.as_deref(),
482 r.from_name.as_deref(),
483 &r.from_email,
484 &r.to_addrs,
485 &r.cc_addrs,
486 &r.bcc_addrs,
487 &r.subject,
488 r.date,
489 r.flags,
490 &r.snippet,
491 r.has_attachments,
492 r.size_bytes,
493 r.unsubscribe_method.as_deref(),
494 &r.label_provider_ids,
495 )
496 })
497 .collect())
498 }
499
500 pub async fn count_all_messages(&self) -> Result<u32, sqlx::Error> {
502 let row = sqlx::query!(r#"SELECT COUNT(*) as "cnt!: i64" FROM messages"#)
503 .fetch_one(self.reader())
504 .await?;
505 Ok(row.cnt as u32)
506 }
507
508 pub async fn update_flags(
509 &self,
510 message_id: &MessageId,
511 flags: MessageFlags,
512 ) -> Result<(), sqlx::Error> {
513 let mid = message_id.as_str();
514 let flags_val = flags.bits() as i64;
515 sqlx::query!("UPDATE messages SET flags = ? WHERE id = ?", flags_val, mid)
516 .execute(self.writer())
517 .await?;
518 Ok(())
519 }
520
521 pub async fn set_read(&self, message_id: &MessageId, read: bool) -> Result<(), sqlx::Error> {
523 let mid = message_id.as_str();
524 let row = sqlx::query!(
525 r#"SELECT flags as "flags!" FROM messages WHERE id = ?"#,
526 mid,
527 )
528 .fetch_optional(self.reader())
529 .await?;
530
531 if let Some(r) = row {
532 let mut flags = MessageFlags::from_bits_truncate(r.flags as u32);
533 if read {
534 flags.insert(MessageFlags::READ);
535 } else {
536 flags.remove(MessageFlags::READ);
537 }
538 let flags_val = flags.bits() as i64;
539 sqlx::query!("UPDATE messages SET flags = ? WHERE id = ?", flags_val, mid)
540 .execute(self.writer())
541 .await?;
542 }
543 Ok(())
544 }
545
546 pub async fn set_starred(
548 &self,
549 message_id: &MessageId,
550 starred: bool,
551 ) -> Result<(), sqlx::Error> {
552 let mid = message_id.as_str();
553 let row = sqlx::query!(
554 r#"SELECT flags as "flags!" FROM messages WHERE id = ?"#,
555 mid,
556 )
557 .fetch_optional(self.reader())
558 .await?;
559
560 if let Some(r) = row {
561 let mut flags = MessageFlags::from_bits_truncate(r.flags as u32);
562 if starred {
563 flags.insert(MessageFlags::STARRED);
564 } else {
565 flags.remove(MessageFlags::STARRED);
566 }
567 let flags_val = flags.bits() as i64;
568 sqlx::query!("UPDATE messages SET flags = ? WHERE id = ?", flags_val, mid)
569 .execute(self.writer())
570 .await?;
571 }
572 Ok(())
573 }
574
575 pub async fn get_provider_id(
577 &self,
578 message_id: &MessageId,
579 ) -> Result<Option<String>, sqlx::Error> {
580 let mid = message_id.as_str();
581 let row = sqlx::query!(
582 r#"SELECT provider_id as "provider_id!" FROM messages WHERE id = ?"#,
583 mid,
584 )
585 .fetch_optional(self.reader())
586 .await?;
587 Ok(row.map(|r| r.provider_id))
588 }
589
590 pub async fn get_message_label_ids(
592 &self,
593 message_id: &MessageId,
594 ) -> Result<Vec<LabelId>, sqlx::Error> {
595 let mid = message_id.as_str();
596 let rows = sqlx::query!(
597 r#"SELECT label_id as "label_id!" FROM message_labels WHERE message_id = ?"#,
598 mid,
599 )
600 .fetch_all(self.reader())
601 .await?;
602 Ok(rows
603 .into_iter()
604 .map(|r| LabelId::from_uuid(uuid::Uuid::parse_str(&r.label_id).unwrap()))
605 .collect())
606 }
607
608 pub async fn add_message_label(
610 &self,
611 message_id: &MessageId,
612 label_id: &LabelId,
613 ) -> Result<(), sqlx::Error> {
614 let mid = message_id.as_str();
615 let lid = label_id.as_str();
616 sqlx::query!(
617 "INSERT OR IGNORE INTO message_labels (message_id, label_id) VALUES (?, ?)",
618 mid,
619 lid,
620 )
621 .execute(self.writer())
622 .await?;
623 Ok(())
624 }
625
626 pub async fn remove_message_label(
628 &self,
629 message_id: &MessageId,
630 label_id: &LabelId,
631 ) -> Result<(), sqlx::Error> {
632 let mid = message_id.as_str();
633 let lid = label_id.as_str();
634 sqlx::query!(
635 "DELETE FROM message_labels WHERE message_id = ? AND label_id = ?",
636 mid,
637 lid,
638 )
639 .execute(self.writer())
640 .await?;
641 Ok(())
642 }
643
644 pub async fn count_message_labels(&self) -> Result<u32, sqlx::Error> {
646 let row = sqlx::query!(r#"SELECT COUNT(*) as "cnt!: i64" FROM message_labels"#,)
647 .fetch_one(self.reader())
648 .await?;
649 Ok(row.cnt as u32)
650 }
651
652 pub async fn move_to_trash(&self, message_id: &MessageId) -> Result<(), sqlx::Error> {
654 let mid = message_id.as_str();
655 let row = sqlx::query!(
656 r#"SELECT flags as "flags!" FROM messages WHERE id = ?"#,
657 mid,
658 )
659 .fetch_optional(self.reader())
660 .await?;
661
662 if let Some(r) = row {
663 let mut flags = MessageFlags::from_bits_truncate(r.flags as u32);
664 flags.insert(MessageFlags::TRASH);
665 let flags_val = flags.bits() as i64;
666 sqlx::query!("UPDATE messages SET flags = ? WHERE id = ?", flags_val, mid)
667 .execute(self.writer())
668 .await?;
669 }
670 Ok(())
671 }
672
673 pub async fn list_contacts(&self, limit: u32) -> Result<Vec<(String, String)>, sqlx::Error> {
675 let lim = limit as i64;
676 let rows = sqlx::query_as::<_, (String, String)>(
677 r#"SELECT
678 COALESCE(from_name, '') as name,
679 from_email as email
680 FROM messages
681 WHERE from_email != ''
682 GROUP BY from_email
683 ORDER BY COUNT(*) DESC
684 LIMIT ?"#,
685 )
686 .bind(lim)
687 .fetch_all(self.reader())
688 .await?;
689 Ok(rows)
690 }
691
692 pub async fn list_subscriptions(
693 &self,
694 limit: u32,
695 ) -> Result<Vec<SubscriptionSummary>, sqlx::Error> {
696 let none_unsubscribe = serde_json::to_string(&UnsubscribeMethod::None).unwrap();
697 let trash_flag = MessageFlags::TRASH.bits() as i64;
698 let spam_flag = MessageFlags::SPAM.bits() as i64;
699 let cutoff = future_date_cutoff_timestamp();
700 let lim = limit as i64;
701
702 let rows = sqlx::query(
703 r#"WITH ranked AS (
704 SELECT
705 id,
706 account_id,
707 provider_id,
708 thread_id,
709 from_name,
710 from_email,
711 subject,
712 snippet,
713 date,
714 flags,
715 has_attachments,
716 size_bytes,
717 unsubscribe_method,
718 COUNT(*) OVER (
719 PARTITION BY account_id, LOWER(from_email)
720 ) AS message_count,
721 ROW_NUMBER() OVER (
722 PARTITION BY account_id, LOWER(from_email)
723 ORDER BY CASE WHEN date > ? THEN 0 ELSE date END DESC, id DESC
724 ) AS rn
725 FROM messages
726 WHERE from_email != ''
727 AND unsubscribe_method IS NOT NULL
728 AND unsubscribe_method != ?
729 AND (flags & ?) = 0
730 AND (flags & ?) = 0
731 )
732 SELECT
733 id,
734 account_id,
735 provider_id,
736 thread_id,
737 from_name,
738 from_email,
739 subject,
740 snippet,
741 date,
742 flags,
743 has_attachments,
744 size_bytes,
745 unsubscribe_method,
746 message_count
747 FROM ranked
748 WHERE rn = 1
749 ORDER BY CASE WHEN date > ? THEN 0 ELSE date END DESC, id DESC
750 LIMIT ?"#,
751 )
752 .bind(none_unsubscribe)
753 .bind(trash_flag)
754 .bind(spam_flag)
755 .bind(cutoff)
756 .bind(cutoff)
757 .bind(lim)
758 .fetch_all(self.reader())
759 .await?;
760
761 Ok(rows
762 .into_iter()
763 .map(|row| SubscriptionSummary {
764 account_id: AccountId::from_uuid(
765 uuid::Uuid::parse_str(&row.get::<String, _>("account_id")).unwrap(),
766 ),
767 sender_name: row.get::<Option<String>, _>("from_name"),
768 sender_email: row.get::<String, _>("from_email"),
769 message_count: row.get::<i64, _>("message_count") as u32,
770 latest_message_id: MessageId::from_uuid(
771 uuid::Uuid::parse_str(&row.get::<String, _>("id")).unwrap(),
772 ),
773 latest_provider_id: row.get::<String, _>("provider_id"),
774 latest_thread_id: ThreadId::from_uuid(
775 uuid::Uuid::parse_str(&row.get::<String, _>("thread_id")).unwrap(),
776 ),
777 latest_subject: row.get::<String, _>("subject"),
778 latest_snippet: row.get::<String, _>("snippet"),
779 latest_date: chrono::DateTime::from_timestamp(row.get::<i64, _>("date"), 0)
780 .unwrap_or_default(),
781 latest_flags: MessageFlags::from_bits_truncate(row.get::<i64, _>("flags") as u32),
782 latest_has_attachments: row.get::<bool, _>("has_attachments"),
783 latest_size_bytes: row.get::<i64, _>("size_bytes") as u64,
784 unsubscribe: row
785 .get::<Option<String>, _>("unsubscribe_method")
786 .as_deref()
787 .map(|value| serde_json::from_str(value).unwrap_or(UnsubscribeMethod::None))
788 .unwrap_or(UnsubscribeMethod::None),
789 })
790 .collect())
791 }
792}
793
794#[allow(clippy::too_many_arguments)]
798pub(crate) fn record_to_envelope(
799 id: &str,
800 account_id: &str,
801 provider_id: &str,
802 thread_id: &str,
803 message_id_header: Option<&str>,
804 in_reply_to: Option<&str>,
805 reference_headers: Option<&str>,
806 from_name: Option<&str>,
807 from_email: &str,
808 to_addrs: &str,
809 cc_addrs: &str,
810 bcc_addrs: &str,
811 subject: &str,
812 date: i64,
813 flags: i64,
814 snippet: &str,
815 has_attachments: bool,
816 size_bytes: i64,
817 unsubscribe_method: Option<&str>,
818 label_provider_ids: &str,
819) -> Envelope {
820 Envelope {
821 id: MessageId::from_uuid(uuid::Uuid::parse_str(id).unwrap()),
822 account_id: AccountId::from_uuid(uuid::Uuid::parse_str(account_id).unwrap()),
823 provider_id: provider_id.to_string(),
824 thread_id: ThreadId::from_uuid(uuid::Uuid::parse_str(thread_id).unwrap()),
825 message_id_header: message_id_header.map(|s| s.to_string()),
826 in_reply_to: in_reply_to.map(|s| s.to_string()),
827 references: reference_headers
828 .map(|r| serde_json::from_str(r).unwrap_or_default())
829 .unwrap_or_default(),
830 from: Address {
831 name: from_name.map(|s| s.to_string()),
832 email: from_email.to_string(),
833 },
834 to: serde_json::from_str(to_addrs).unwrap_or_default(),
835 cc: serde_json::from_str(cc_addrs).unwrap_or_default(),
836 bcc: serde_json::from_str(bcc_addrs).unwrap_or_default(),
837 subject: subject.to_string(),
838 date: chrono::DateTime::from_timestamp(date, 0).unwrap_or_default(),
839 flags: MessageFlags::from_bits_truncate(flags as u32),
840 snippet: snippet.to_string(),
841 has_attachments,
842 size_bytes: size_bytes as u64,
843 unsubscribe: unsubscribe_method
844 .map(|u| serde_json::from_str(u).unwrap_or(UnsubscribeMethod::None))
845 .unwrap_or(UnsubscribeMethod::None),
846 label_provider_ids: if label_provider_ids.is_empty() {
847 vec![]
848 } else {
849 label_provider_ids
850 .split('\u{1f}')
851 .filter(|provider_id| !provider_id.is_empty())
852 .map(str::to_string)
853 .collect()
854 },
855 }
856}