1use mxr_core::id::*;
2use mxr_core::types::*;
3use sqlx::Row;
4
5pub(crate) fn future_date_cutoff_timestamp() -> i64 {
6 (chrono::Utc::now() + chrono::Duration::days(1)).timestamp()
7}
8
9impl super::Store {
10 pub async fn upsert_envelope(&self, envelope: &Envelope) -> Result<(), sqlx::Error> {
11 let id = envelope.id.as_str();
12 let account_id = envelope.account_id.as_str();
13 let thread_id = envelope.thread_id.as_str();
14 let from_name = envelope.from.name.as_deref();
15 let to_addrs = serde_json::to_string(&envelope.to).unwrap();
16 let cc_addrs = serde_json::to_string(&envelope.cc).unwrap();
17 let bcc_addrs = serde_json::to_string(&envelope.bcc).unwrap();
18 let refs = serde_json::to_string(&envelope.references).unwrap();
19 let date = envelope.date.timestamp();
20 let flags = envelope.flags.bits() as i64;
21 let unsub = serde_json::to_string(&envelope.unsubscribe).unwrap();
22 let has_attachments = envelope.has_attachments;
23 let size_bytes = envelope.size_bytes as i64;
24
25 sqlx::query!(
26 "INSERT INTO messages
27 (id, account_id, provider_id, thread_id, message_id_header, in_reply_to,
28 reference_headers, from_name, from_email, to_addrs, cc_addrs, bcc_addrs,
29 subject, date, flags, snippet, has_attachments, size_bytes, unsubscribe_method)
30 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
31 ON CONFLICT(id) DO UPDATE SET
32 provider_id = excluded.provider_id,
33 thread_id = excluded.thread_id,
34 message_id_header = excluded.message_id_header,
35 in_reply_to = excluded.in_reply_to,
36 reference_headers = excluded.reference_headers,
37 from_name = excluded.from_name,
38 from_email = excluded.from_email,
39 to_addrs = excluded.to_addrs,
40 cc_addrs = excluded.cc_addrs,
41 bcc_addrs = excluded.bcc_addrs,
42 subject = excluded.subject,
43 date = excluded.date,
44 flags = excluded.flags,
45 snippet = excluded.snippet,
46 has_attachments = excluded.has_attachments,
47 size_bytes = excluded.size_bytes,
48 unsubscribe_method = excluded.unsubscribe_method",
49 id,
50 account_id,
51 envelope.provider_id,
52 thread_id,
53 envelope.message_id_header,
54 envelope.in_reply_to,
55 refs,
56 from_name,
57 envelope.from.email,
58 to_addrs,
59 cc_addrs,
60 bcc_addrs,
61 envelope.subject,
62 date,
63 flags,
64 envelope.snippet,
65 has_attachments,
66 size_bytes,
67 unsub,
68 )
69 .execute(self.writer())
70 .await?;
71
72 Ok(())
73 }
74
75 pub async fn get_envelope(&self, id: &MessageId) -> Result<Option<Envelope>, sqlx::Error> {
76 let id_str = id.as_str();
77 let row = sqlx::query!(
78 r#"SELECT
79 id as "id!", account_id as "account_id!", provider_id as "provider_id!",
80 thread_id as "thread_id!", message_id_header, in_reply_to,
81 reference_headers, from_name, from_email as "from_email!",
82 to_addrs as "to_addrs!", cc_addrs as "cc_addrs!", bcc_addrs as "bcc_addrs!",
83 subject as "subject!", date as "date!", flags as "flags!",
84 snippet as "snippet!", has_attachments as "has_attachments!: bool",
85 size_bytes as "size_bytes!", unsubscribe_method,
86 COALESCE((
87 SELECT GROUP_CONCAT(labels.provider_id, char(31))
88 FROM message_labels
89 JOIN labels ON labels.id = message_labels.label_id
90 WHERE message_labels.message_id = messages.id
91 ), '') as "label_provider_ids!: String"
92 FROM messages WHERE id = ?"#,
93 id_str,
94 )
95 .fetch_optional(self.reader())
96 .await?;
97
98 Ok(row.map(|r| {
99 record_to_envelope(
100 &r.id,
101 &r.account_id,
102 &r.provider_id,
103 &r.thread_id,
104 r.message_id_header.as_deref(),
105 r.in_reply_to.as_deref(),
106 r.reference_headers.as_deref(),
107 r.from_name.as_deref(),
108 &r.from_email,
109 &r.to_addrs,
110 &r.cc_addrs,
111 &r.bcc_addrs,
112 &r.subject,
113 r.date,
114 r.flags,
115 &r.snippet,
116 r.has_attachments,
117 r.size_bytes,
118 r.unsubscribe_method.as_deref(),
119 &r.label_provider_ids,
120 )
121 }))
122 }
123
124 pub async fn list_envelopes_by_label(
125 &self,
126 label_id: &LabelId,
127 limit: u32,
128 offset: u32,
129 ) -> Result<Vec<Envelope>, sqlx::Error> {
130 let lid = label_id.as_str();
131 let cutoff = future_date_cutoff_timestamp();
132 let lim = limit as i64;
133 let off = offset as i64;
134 let rows = sqlx::query!(
135 r#"SELECT
136 m.id as "id!", m.account_id as "account_id!", m.provider_id as "provider_id!",
137 m.thread_id as "thread_id!", m.message_id_header, m.in_reply_to,
138 m.reference_headers, m.from_name, m.from_email as "from_email!",
139 m.to_addrs as "to_addrs!", m.cc_addrs as "cc_addrs!", m.bcc_addrs as "bcc_addrs!",
140 m.subject as "subject!", m.date as "date!", m.flags as "flags!",
141 m.snippet as "snippet!", m.has_attachments as "has_attachments!: bool",
142 m.size_bytes as "size_bytes!", m.unsubscribe_method,
143 COALESCE((
144 SELECT GROUP_CONCAT(labels.provider_id, char(31))
145 FROM message_labels
146 JOIN labels ON labels.id = message_labels.label_id
147 WHERE message_labels.message_id = m.id
148 ), '') as "label_provider_ids!: String"
149 FROM messages m
150 JOIN message_labels ml ON m.id = ml.message_id
151 WHERE ml.label_id = ?
152 ORDER BY CASE WHEN m.date > ? THEN 0 ELSE m.date END DESC, m.id DESC
153 LIMIT ? OFFSET ?"#,
154 lid,
155 cutoff,
156 lim,
157 off,
158 )
159 .fetch_all(self.reader())
160 .await?;
161
162 Ok(rows
163 .into_iter()
164 .map(|r| {
165 record_to_envelope(
166 &r.id,
167 &r.account_id,
168 &r.provider_id,
169 &r.thread_id,
170 r.message_id_header.as_deref(),
171 r.in_reply_to.as_deref(),
172 r.reference_headers.as_deref(),
173 r.from_name.as_deref(),
174 &r.from_email,
175 &r.to_addrs,
176 &r.cc_addrs,
177 &r.bcc_addrs,
178 &r.subject,
179 r.date,
180 r.flags,
181 &r.snippet,
182 r.has_attachments,
183 r.size_bytes,
184 r.unsubscribe_method.as_deref(),
185 &r.label_provider_ids,
186 )
187 })
188 .collect())
189 }
190
191 pub async fn list_envelopes_by_account(
192 &self,
193 account_id: &AccountId,
194 limit: u32,
195 offset: u32,
196 ) -> Result<Vec<Envelope>, sqlx::Error> {
197 let aid = account_id.as_str();
198 let cutoff = future_date_cutoff_timestamp();
199 let lim = limit as i64;
200 let off = offset as i64;
201 let rows = sqlx::query!(
202 r#"SELECT
203 id as "id!", account_id as "account_id!", provider_id as "provider_id!",
204 thread_id as "thread_id!", message_id_header, in_reply_to,
205 reference_headers, from_name, from_email as "from_email!",
206 to_addrs as "to_addrs!", cc_addrs as "cc_addrs!", bcc_addrs as "bcc_addrs!",
207 subject as "subject!", date as "date!", flags as "flags!",
208 snippet as "snippet!", has_attachments as "has_attachments!: bool",
209 size_bytes as "size_bytes!", unsubscribe_method,
210 COALESCE((
211 SELECT GROUP_CONCAT(labels.provider_id, char(31))
212 FROM message_labels
213 JOIN labels ON labels.id = message_labels.label_id
214 WHERE message_labels.message_id = messages.id
215 ), '') as "label_provider_ids!: String"
216 FROM messages
217 WHERE account_id = ?
218 ORDER BY CASE WHEN date > ? THEN 0 ELSE date END DESC, id DESC
219 LIMIT ? OFFSET ?"#,
220 aid,
221 cutoff,
222 lim,
223 off,
224 )
225 .fetch_all(self.reader())
226 .await?;
227
228 Ok(rows
229 .into_iter()
230 .map(|r| {
231 record_to_envelope(
232 &r.id,
233 &r.account_id,
234 &r.provider_id,
235 &r.thread_id,
236 r.message_id_header.as_deref(),
237 r.in_reply_to.as_deref(),
238 r.reference_headers.as_deref(),
239 r.from_name.as_deref(),
240 &r.from_email,
241 &r.to_addrs,
242 &r.cc_addrs,
243 &r.bcc_addrs,
244 &r.subject,
245 r.date,
246 r.flags,
247 &r.snippet,
248 r.has_attachments,
249 r.size_bytes,
250 r.unsubscribe_method.as_deref(),
251 &r.label_provider_ids,
252 )
253 })
254 .collect())
255 }
256
257 pub async fn list_envelopes_by_ids(
258 &self,
259 message_ids: &[MessageId],
260 ) -> Result<Vec<Envelope>, sqlx::Error> {
261 if message_ids.is_empty() {
262 return Ok(Vec::new());
263 }
264
265 let placeholders: Vec<String> = message_ids.iter().map(|_| "?".to_string()).collect();
266 let sql = format!(
267 r#"SELECT
268 m.id as id, m.account_id as account_id, m.provider_id as provider_id,
269 m.thread_id as thread_id, m.message_id_header, m.in_reply_to,
270 m.reference_headers, m.from_name, m.from_email as from_email,
271 m.to_addrs as to_addrs, m.cc_addrs as cc_addrs, m.bcc_addrs as bcc_addrs,
272 m.subject as subject, m.date as date, m.flags as flags,
273 m.snippet as snippet, m.has_attachments as has_attachments,
274 m.size_bytes as size_bytes, m.unsubscribe_method,
275 COALESCE((
276 SELECT GROUP_CONCAT(labels.provider_id, char(31))
277 FROM message_labels
278 JOIN labels ON labels.id = message_labels.label_id
279 WHERE message_labels.message_id = m.id
280 ), '') as label_provider_ids
281 FROM messages m
282 WHERE m.id IN ({})"#,
283 placeholders.join(", ")
284 );
285
286 let mut query = sqlx::query(&sql);
287 for message_id in message_ids {
288 query = query.bind(message_id.as_str());
289 }
290
291 let rows = query.fetch_all(self.reader()).await?;
292 let mut by_id = std::collections::HashMap::with_capacity(rows.len());
293 for row in rows {
294 let envelope = record_to_envelope(
295 row.get::<String, _>("id").as_str(),
296 row.get::<String, _>("account_id").as_str(),
297 row.get::<String, _>("provider_id").as_str(),
298 row.get::<String, _>("thread_id").as_str(),
299 row.get::<Option<String>, _>("message_id_header").as_deref(),
300 row.get::<Option<String>, _>("in_reply_to").as_deref(),
301 row.get::<Option<String>, _>("reference_headers").as_deref(),
302 row.get::<Option<String>, _>("from_name").as_deref(),
303 row.get::<String, _>("from_email").as_str(),
304 row.get::<String, _>("to_addrs").as_str(),
305 row.get::<String, _>("cc_addrs").as_str(),
306 row.get::<String, _>("bcc_addrs").as_str(),
307 row.get::<String, _>("subject").as_str(),
308 row.get::<i64, _>("date"),
309 row.get::<i64, _>("flags"),
310 row.get::<String, _>("snippet").as_str(),
311 row.get::<bool, _>("has_attachments"),
312 row.get::<i64, _>("size_bytes"),
313 row.get::<Option<String>, _>("unsubscribe_method")
314 .as_deref(),
315 row.get::<String, _>("label_provider_ids").as_str(),
316 );
317 by_id.insert(envelope.id.clone(), envelope);
318 }
319
320 Ok(message_ids
321 .iter()
322 .filter_map(|message_id| by_id.remove(message_id))
323 .collect())
324 }
325
326 pub async fn delete_messages_by_provider_ids(
328 &self,
329 account_id: &AccountId,
330 provider_ids: &[String],
331 ) -> Result<u64, sqlx::Error> {
332 if provider_ids.is_empty() {
333 return Ok(0);
334 }
335 let placeholders: Vec<String> = provider_ids.iter().map(|_| "?".to_string()).collect();
336 let sql = format!(
337 "DELETE FROM messages WHERE account_id = ? AND provider_id IN ({})",
338 placeholders.join(", ")
339 );
340 let mut query = sqlx::query(&sql).bind(account_id.as_str());
341 for pid in provider_ids {
342 query = query.bind(pid);
343 }
344 let result = query.execute(self.writer()).await?;
345 Ok(result.rows_affected())
346 }
347
348 pub async fn set_message_labels(
349 &self,
350 message_id: &MessageId,
351 label_ids: &[LabelId],
352 ) -> Result<(), sqlx::Error> {
353 let mid = message_id.as_str();
354 sqlx::query!("DELETE FROM message_labels WHERE message_id = ?", mid)
355 .execute(self.writer())
356 .await?;
357
358 for label_id in label_ids {
359 let mid = message_id.as_str();
360 let lid = label_id.as_str();
361 sqlx::query!(
362 "INSERT INTO message_labels (message_id, label_id) VALUES (?, ?)",
363 mid,
364 lid,
365 )
366 .execute(self.writer())
367 .await?;
368 }
369
370 Ok(())
371 }
372
373 pub async fn update_message_thread_id(
374 &self,
375 message_id: &MessageId,
376 thread_id: &ThreadId,
377 ) -> Result<(), sqlx::Error> {
378 sqlx::query("UPDATE messages SET thread_id = ? WHERE id = ?")
379 .bind(thread_id.as_str())
380 .bind(message_id.as_str())
381 .execute(self.writer())
382 .await?;
383 Ok(())
384 }
385
386 pub async fn get_message_id_by_provider_id(
387 &self,
388 account_id: &AccountId,
389 provider_id: &str,
390 ) -> Result<Option<MessageId>, sqlx::Error> {
391 let aid = account_id.as_str();
392 let row = sqlx::query!(
393 r#"SELECT id as "id!" FROM messages WHERE account_id = ? AND provider_id = ?"#,
394 aid,
395 provider_id,
396 )
397 .fetch_optional(self.reader())
398 .await?;
399
400 Ok(row.map(|r| MessageId::from_uuid(uuid::Uuid::parse_str(&r.id).unwrap())))
401 }
402
403 pub async fn count_messages_by_account(
404 &self,
405 account_id: &AccountId,
406 ) -> Result<u32, sqlx::Error> {
407 let aid = account_id.as_str();
408 let row = sqlx::query!(
409 r#"SELECT COUNT(*) as "cnt!: i64" FROM messages WHERE account_id = ?"#,
410 aid,
411 )
412 .fetch_one(self.reader())
413 .await?;
414
415 Ok(row.cnt as u32)
416 }
417
418 pub async fn list_all_envelopes_paginated(
420 &self,
421 limit: u32,
422 offset: u32,
423 ) -> Result<Vec<Envelope>, sqlx::Error> {
424 let cutoff = future_date_cutoff_timestamp();
425 let lim = limit as i64;
426 let off = offset as i64;
427 let rows = sqlx::query!(
428 r#"SELECT
429 id as "id!", account_id as "account_id!", provider_id as "provider_id!",
430 thread_id as "thread_id!", message_id_header, in_reply_to,
431 reference_headers, from_name, from_email as "from_email!",
432 to_addrs as "to_addrs!", cc_addrs as "cc_addrs!", bcc_addrs as "bcc_addrs!",
433 subject as "subject!", date as "date!", flags as "flags!",
434 snippet as "snippet!", has_attachments as "has_attachments!: bool",
435 size_bytes as "size_bytes!", unsubscribe_method,
436 COALESCE((
437 SELECT GROUP_CONCAT(labels.provider_id, char(31))
438 FROM message_labels
439 JOIN labels ON labels.id = message_labels.label_id
440 WHERE message_labels.message_id = messages.id
441 ), '') as "label_provider_ids!: String"
442 FROM messages
443 ORDER BY CASE WHEN date > ? THEN 0 ELSE date END DESC, id DESC
444 LIMIT ? OFFSET ?"#,
445 cutoff,
446 lim,
447 off,
448 )
449 .fetch_all(self.reader())
450 .await?;
451
452 Ok(rows
453 .into_iter()
454 .map(|r| {
455 record_to_envelope(
456 &r.id,
457 &r.account_id,
458 &r.provider_id,
459 &r.thread_id,
460 r.message_id_header.as_deref(),
461 r.in_reply_to.as_deref(),
462 r.reference_headers.as_deref(),
463 r.from_name.as_deref(),
464 &r.from_email,
465 &r.to_addrs,
466 &r.cc_addrs,
467 &r.bcc_addrs,
468 &r.subject,
469 r.date,
470 r.flags,
471 &r.snippet,
472 r.has_attachments,
473 r.size_bytes,
474 r.unsubscribe_method.as_deref(),
475 &r.label_provider_ids,
476 )
477 })
478 .collect())
479 }
480
481 pub async fn count_all_messages(&self) -> Result<u32, sqlx::Error> {
483 let row = sqlx::query!(r#"SELECT COUNT(*) as "cnt!: i64" FROM messages"#)
484 .fetch_one(self.reader())
485 .await?;
486 Ok(row.cnt as u32)
487 }
488
489 pub async fn update_flags(
490 &self,
491 message_id: &MessageId,
492 flags: MessageFlags,
493 ) -> Result<(), sqlx::Error> {
494 let mid = message_id.as_str();
495 let flags_val = flags.bits() as i64;
496 sqlx::query!("UPDATE messages SET flags = ? WHERE id = ?", flags_val, mid)
497 .execute(self.writer())
498 .await?;
499 Ok(())
500 }
501
502 pub async fn set_read(&self, message_id: &MessageId, read: bool) -> Result<(), sqlx::Error> {
504 let mid = message_id.as_str();
505 let row = sqlx::query!(
506 r#"SELECT flags as "flags!" FROM messages WHERE id = ?"#,
507 mid,
508 )
509 .fetch_optional(self.reader())
510 .await?;
511
512 if let Some(r) = row {
513 let mut flags = MessageFlags::from_bits_truncate(r.flags as u32);
514 if read {
515 flags.insert(MessageFlags::READ);
516 } else {
517 flags.remove(MessageFlags::READ);
518 }
519 let flags_val = flags.bits() as i64;
520 sqlx::query!("UPDATE messages SET flags = ? WHERE id = ?", flags_val, mid)
521 .execute(self.writer())
522 .await?;
523 }
524 Ok(())
525 }
526
527 pub async fn set_starred(
529 &self,
530 message_id: &MessageId,
531 starred: bool,
532 ) -> Result<(), sqlx::Error> {
533 let mid = message_id.as_str();
534 let row = sqlx::query!(
535 r#"SELECT flags as "flags!" FROM messages WHERE id = ?"#,
536 mid,
537 )
538 .fetch_optional(self.reader())
539 .await?;
540
541 if let Some(r) = row {
542 let mut flags = MessageFlags::from_bits_truncate(r.flags as u32);
543 if starred {
544 flags.insert(MessageFlags::STARRED);
545 } else {
546 flags.remove(MessageFlags::STARRED);
547 }
548 let flags_val = flags.bits() as i64;
549 sqlx::query!("UPDATE messages SET flags = ? WHERE id = ?", flags_val, mid)
550 .execute(self.writer())
551 .await?;
552 }
553 Ok(())
554 }
555
556 pub async fn get_provider_id(
558 &self,
559 message_id: &MessageId,
560 ) -> Result<Option<String>, sqlx::Error> {
561 let mid = message_id.as_str();
562 let row = sqlx::query!(
563 r#"SELECT provider_id as "provider_id!" FROM messages WHERE id = ?"#,
564 mid,
565 )
566 .fetch_optional(self.reader())
567 .await?;
568 Ok(row.map(|r| r.provider_id))
569 }
570
571 pub async fn get_message_label_ids(
573 &self,
574 message_id: &MessageId,
575 ) -> Result<Vec<LabelId>, sqlx::Error> {
576 let mid = message_id.as_str();
577 let rows = sqlx::query!(
578 r#"SELECT label_id as "label_id!" FROM message_labels WHERE message_id = ?"#,
579 mid,
580 )
581 .fetch_all(self.reader())
582 .await?;
583 Ok(rows
584 .into_iter()
585 .map(|r| LabelId::from_uuid(uuid::Uuid::parse_str(&r.label_id).unwrap()))
586 .collect())
587 }
588
589 pub async fn add_message_label(
591 &self,
592 message_id: &MessageId,
593 label_id: &LabelId,
594 ) -> Result<(), sqlx::Error> {
595 let mid = message_id.as_str();
596 let lid = label_id.as_str();
597 sqlx::query!(
598 "INSERT OR IGNORE INTO message_labels (message_id, label_id) VALUES (?, ?)",
599 mid,
600 lid,
601 )
602 .execute(self.writer())
603 .await?;
604 Ok(())
605 }
606
607 pub async fn remove_message_label(
609 &self,
610 message_id: &MessageId,
611 label_id: &LabelId,
612 ) -> Result<(), sqlx::Error> {
613 let mid = message_id.as_str();
614 let lid = label_id.as_str();
615 sqlx::query!(
616 "DELETE FROM message_labels WHERE message_id = ? AND label_id = ?",
617 mid,
618 lid,
619 )
620 .execute(self.writer())
621 .await?;
622 Ok(())
623 }
624
625 pub async fn count_message_labels(&self) -> Result<u32, sqlx::Error> {
627 let row = sqlx::query!(r#"SELECT COUNT(*) as "cnt!: i64" FROM message_labels"#,)
628 .fetch_one(self.reader())
629 .await?;
630 Ok(row.cnt as u32)
631 }
632
633 pub async fn move_to_trash(&self, message_id: &MessageId) -> Result<(), sqlx::Error> {
635 let mid = message_id.as_str();
636 let row = sqlx::query!(
637 r#"SELECT flags as "flags!" FROM messages WHERE id = ?"#,
638 mid,
639 )
640 .fetch_optional(self.reader())
641 .await?;
642
643 if let Some(r) = row {
644 let mut flags = MessageFlags::from_bits_truncate(r.flags as u32);
645 flags.insert(MessageFlags::TRASH);
646 let flags_val = flags.bits() as i64;
647 sqlx::query!("UPDATE messages SET flags = ? WHERE id = ?", flags_val, mid)
648 .execute(self.writer())
649 .await?;
650 }
651 Ok(())
652 }
653
654 pub async fn list_contacts(&self, limit: u32) -> Result<Vec<(String, String)>, sqlx::Error> {
656 let lim = limit as i64;
657 let rows = sqlx::query_as::<_, (String, String)>(
658 r#"SELECT
659 COALESCE(from_name, '') as name,
660 from_email as email
661 FROM messages
662 WHERE from_email != ''
663 GROUP BY from_email
664 ORDER BY COUNT(*) DESC
665 LIMIT ?"#,
666 )
667 .bind(lim)
668 .fetch_all(self.reader())
669 .await?;
670 Ok(rows)
671 }
672
673 pub async fn list_subscriptions(
674 &self,
675 limit: u32,
676 ) -> Result<Vec<SubscriptionSummary>, sqlx::Error> {
677 let none_unsubscribe = serde_json::to_string(&UnsubscribeMethod::None).unwrap();
678 let trash_flag = MessageFlags::TRASH.bits() as i64;
679 let spam_flag = MessageFlags::SPAM.bits() as i64;
680 let cutoff = future_date_cutoff_timestamp();
681 let lim = limit as i64;
682
683 let rows = sqlx::query(
684 r#"WITH ranked AS (
685 SELECT
686 id,
687 account_id,
688 provider_id,
689 thread_id,
690 from_name,
691 from_email,
692 subject,
693 snippet,
694 date,
695 flags,
696 has_attachments,
697 size_bytes,
698 unsubscribe_method,
699 COUNT(*) OVER (
700 PARTITION BY account_id, LOWER(from_email)
701 ) AS message_count,
702 ROW_NUMBER() OVER (
703 PARTITION BY account_id, LOWER(from_email)
704 ORDER BY CASE WHEN date > ? THEN 0 ELSE date END DESC, id DESC
705 ) AS rn
706 FROM messages
707 WHERE from_email != ''
708 AND unsubscribe_method IS NOT NULL
709 AND unsubscribe_method != ?
710 AND (flags & ?) = 0
711 AND (flags & ?) = 0
712 )
713 SELECT
714 id,
715 account_id,
716 provider_id,
717 thread_id,
718 from_name,
719 from_email,
720 subject,
721 snippet,
722 date,
723 flags,
724 has_attachments,
725 size_bytes,
726 unsubscribe_method,
727 message_count
728 FROM ranked
729 WHERE rn = 1
730 ORDER BY CASE WHEN date > ? THEN 0 ELSE date END DESC, id DESC
731 LIMIT ?"#,
732 )
733 .bind(cutoff)
734 .bind(none_unsubscribe)
735 .bind(trash_flag)
736 .bind(spam_flag)
737 .bind(cutoff)
738 .bind(lim)
739 .fetch_all(self.reader())
740 .await?;
741
742 Ok(rows
743 .into_iter()
744 .map(|row| SubscriptionSummary {
745 account_id: AccountId::from_uuid(
746 uuid::Uuid::parse_str(&row.get::<String, _>("account_id")).unwrap(),
747 ),
748 sender_name: row.get::<Option<String>, _>("from_name"),
749 sender_email: row.get::<String, _>("from_email"),
750 message_count: row.get::<i64, _>("message_count") as u32,
751 latest_message_id: MessageId::from_uuid(
752 uuid::Uuid::parse_str(&row.get::<String, _>("id")).unwrap(),
753 ),
754 latest_provider_id: row.get::<String, _>("provider_id"),
755 latest_thread_id: ThreadId::from_uuid(
756 uuid::Uuid::parse_str(&row.get::<String, _>("thread_id")).unwrap(),
757 ),
758 latest_subject: row.get::<String, _>("subject"),
759 latest_snippet: row.get::<String, _>("snippet"),
760 latest_date: chrono::DateTime::from_timestamp(row.get::<i64, _>("date"), 0)
761 .unwrap_or_default(),
762 latest_flags: MessageFlags::from_bits_truncate(row.get::<i64, _>("flags") as u32),
763 latest_has_attachments: row.get::<bool, _>("has_attachments"),
764 latest_size_bytes: row.get::<i64, _>("size_bytes") as u64,
765 unsubscribe: row
766 .get::<Option<String>, _>("unsubscribe_method")
767 .as_deref()
768 .map(|value| serde_json::from_str(value).unwrap_or(UnsubscribeMethod::None))
769 .unwrap_or(UnsubscribeMethod::None),
770 })
771 .collect())
772 }
773}
774
775#[allow(clippy::too_many_arguments)]
779pub(crate) fn record_to_envelope(
780 id: &str,
781 account_id: &str,
782 provider_id: &str,
783 thread_id: &str,
784 message_id_header: Option<&str>,
785 in_reply_to: Option<&str>,
786 reference_headers: Option<&str>,
787 from_name: Option<&str>,
788 from_email: &str,
789 to_addrs: &str,
790 cc_addrs: &str,
791 bcc_addrs: &str,
792 subject: &str,
793 date: i64,
794 flags: i64,
795 snippet: &str,
796 has_attachments: bool,
797 size_bytes: i64,
798 unsubscribe_method: Option<&str>,
799 label_provider_ids: &str,
800) -> Envelope {
801 Envelope {
802 id: MessageId::from_uuid(uuid::Uuid::parse_str(id).unwrap()),
803 account_id: AccountId::from_uuid(uuid::Uuid::parse_str(account_id).unwrap()),
804 provider_id: provider_id.to_string(),
805 thread_id: ThreadId::from_uuid(uuid::Uuid::parse_str(thread_id).unwrap()),
806 message_id_header: message_id_header.map(|s| s.to_string()),
807 in_reply_to: in_reply_to.map(|s| s.to_string()),
808 references: reference_headers
809 .map(|r| serde_json::from_str(r).unwrap_or_default())
810 .unwrap_or_default(),
811 from: Address {
812 name: from_name.map(|s| s.to_string()),
813 email: from_email.to_string(),
814 },
815 to: serde_json::from_str(to_addrs).unwrap_or_default(),
816 cc: serde_json::from_str(cc_addrs).unwrap_or_default(),
817 bcc: serde_json::from_str(bcc_addrs).unwrap_or_default(),
818 subject: subject.to_string(),
819 date: chrono::DateTime::from_timestamp(date, 0).unwrap_or_default(),
820 flags: MessageFlags::from_bits_truncate(flags as u32),
821 snippet: snippet.to_string(),
822 has_attachments,
823 size_bytes: size_bytes as u64,
824 unsubscribe: unsubscribe_method
825 .map(|u| serde_json::from_str(u).unwrap_or(UnsubscribeMethod::None))
826 .unwrap_or(UnsubscribeMethod::None),
827 label_provider_ids: if label_provider_ids.is_empty() {
828 vec![]
829 } else {
830 label_provider_ids
831 .split('\u{1f}')
832 .filter(|provider_id| !provider_id.is_empty())
833 .map(str::to_string)
834 .collect()
835 },
836 }
837}