1use mxr_core::id::*;
2use mxr_core::types::*;
3use sqlx::Row;
4impl super::Store {
5 pub async fn upsert_envelope(&self, envelope: &Envelope) -> Result<(), sqlx::Error> {
6 let id = envelope.id.as_str();
7 let account_id = envelope.account_id.as_str();
8 let thread_id = envelope.thread_id.as_str();
9 let from_name = envelope.from.name.as_deref();
10 let to_addrs = serde_json::to_string(&envelope.to).unwrap();
11 let cc_addrs = serde_json::to_string(&envelope.cc).unwrap();
12 let bcc_addrs = serde_json::to_string(&envelope.bcc).unwrap();
13 let refs = serde_json::to_string(&envelope.references).unwrap();
14 let date = envelope.date.timestamp();
15 let flags = envelope.flags.bits() as i64;
16 let unsub = serde_json::to_string(&envelope.unsubscribe).unwrap();
17 let has_attachments = envelope.has_attachments;
18 let size_bytes = envelope.size_bytes as i64;
19
20 sqlx::query!(
21 "INSERT INTO messages
22 (id, account_id, provider_id, thread_id, message_id_header, in_reply_to,
23 reference_headers, from_name, from_email, to_addrs, cc_addrs, bcc_addrs,
24 subject, date, flags, snippet, has_attachments, size_bytes, unsubscribe_method)
25 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
26 ON CONFLICT(id) DO UPDATE SET
27 provider_id = excluded.provider_id,
28 thread_id = excluded.thread_id,
29 message_id_header = excluded.message_id_header,
30 in_reply_to = excluded.in_reply_to,
31 reference_headers = excluded.reference_headers,
32 from_name = excluded.from_name,
33 from_email = excluded.from_email,
34 to_addrs = excluded.to_addrs,
35 cc_addrs = excluded.cc_addrs,
36 bcc_addrs = excluded.bcc_addrs,
37 subject = excluded.subject,
38 date = excluded.date,
39 flags = excluded.flags,
40 snippet = excluded.snippet,
41 has_attachments = excluded.has_attachments,
42 size_bytes = excluded.size_bytes,
43 unsubscribe_method = excluded.unsubscribe_method",
44 id,
45 account_id,
46 envelope.provider_id,
47 thread_id,
48 envelope.message_id_header,
49 envelope.in_reply_to,
50 refs,
51 from_name,
52 envelope.from.email,
53 to_addrs,
54 cc_addrs,
55 bcc_addrs,
56 envelope.subject,
57 date,
58 flags,
59 envelope.snippet,
60 has_attachments,
61 size_bytes,
62 unsub,
63 )
64 .execute(self.writer())
65 .await?;
66
67 Ok(())
68 }
69
70 pub async fn get_envelope(&self, id: &MessageId) -> Result<Option<Envelope>, sqlx::Error> {
71 let id_str = id.as_str();
72 let row = sqlx::query!(
73 r#"SELECT
74 id as "id!", account_id as "account_id!", provider_id as "provider_id!",
75 thread_id as "thread_id!", message_id_header, in_reply_to,
76 reference_headers, from_name, from_email as "from_email!",
77 to_addrs as "to_addrs!", cc_addrs as "cc_addrs!", bcc_addrs as "bcc_addrs!",
78 subject as "subject!", date as "date!", flags as "flags!",
79 snippet as "snippet!", has_attachments as "has_attachments!: bool",
80 size_bytes as "size_bytes!", unsubscribe_method,
81 COALESCE((
82 SELECT GROUP_CONCAT(labels.provider_id, char(31))
83 FROM message_labels
84 JOIN labels ON labels.id = message_labels.label_id
85 WHERE message_labels.message_id = messages.id
86 ), '') as "label_provider_ids!: String"
87 FROM messages WHERE id = ?"#,
88 id_str,
89 )
90 .fetch_optional(self.reader())
91 .await?;
92
93 Ok(row.map(|r| {
94 record_to_envelope(
95 &r.id,
96 &r.account_id,
97 &r.provider_id,
98 &r.thread_id,
99 r.message_id_header.as_deref(),
100 r.in_reply_to.as_deref(),
101 r.reference_headers.as_deref(),
102 r.from_name.as_deref(),
103 &r.from_email,
104 &r.to_addrs,
105 &r.cc_addrs,
106 &r.bcc_addrs,
107 &r.subject,
108 r.date,
109 r.flags,
110 &r.snippet,
111 r.has_attachments,
112 r.size_bytes,
113 r.unsubscribe_method.as_deref(),
114 &r.label_provider_ids,
115 )
116 }))
117 }
118
119 pub async fn list_envelopes_by_label(
120 &self,
121 label_id: &LabelId,
122 limit: u32,
123 offset: u32,
124 ) -> Result<Vec<Envelope>, sqlx::Error> {
125 let lid = label_id.as_str();
126 let lim = limit as i64;
127 let off = offset as i64;
128 let rows = sqlx::query!(
129 r#"SELECT
130 m.id as "id!", m.account_id as "account_id!", m.provider_id as "provider_id!",
131 m.thread_id as "thread_id!", m.message_id_header, m.in_reply_to,
132 m.reference_headers, m.from_name, m.from_email as "from_email!",
133 m.to_addrs as "to_addrs!", m.cc_addrs as "cc_addrs!", m.bcc_addrs as "bcc_addrs!",
134 m.subject as "subject!", m.date as "date!", m.flags as "flags!",
135 m.snippet as "snippet!", m.has_attachments as "has_attachments!: bool",
136 m.size_bytes as "size_bytes!", m.unsubscribe_method,
137 COALESCE((
138 SELECT GROUP_CONCAT(labels.provider_id, char(31))
139 FROM message_labels
140 JOIN labels ON labels.id = message_labels.label_id
141 WHERE message_labels.message_id = m.id
142 ), '') as "label_provider_ids!: String"
143 FROM messages m
144 JOIN message_labels ml ON m.id = ml.message_id
145 WHERE ml.label_id = ?
146 ORDER BY m.date DESC
147 LIMIT ? OFFSET ?"#,
148 lid,
149 lim,
150 off,
151 )
152 .fetch_all(self.reader())
153 .await?;
154
155 Ok(rows
156 .into_iter()
157 .map(|r| {
158 record_to_envelope(
159 &r.id,
160 &r.account_id,
161 &r.provider_id,
162 &r.thread_id,
163 r.message_id_header.as_deref(),
164 r.in_reply_to.as_deref(),
165 r.reference_headers.as_deref(),
166 r.from_name.as_deref(),
167 &r.from_email,
168 &r.to_addrs,
169 &r.cc_addrs,
170 &r.bcc_addrs,
171 &r.subject,
172 r.date,
173 r.flags,
174 &r.snippet,
175 r.has_attachments,
176 r.size_bytes,
177 r.unsubscribe_method.as_deref(),
178 &r.label_provider_ids,
179 )
180 })
181 .collect())
182 }
183
184 pub async fn list_envelopes_by_account(
185 &self,
186 account_id: &AccountId,
187 limit: u32,
188 offset: u32,
189 ) -> Result<Vec<Envelope>, sqlx::Error> {
190 let aid = account_id.as_str();
191 let lim = limit as i64;
192 let off = offset as i64;
193 let rows = sqlx::query!(
194 r#"SELECT
195 id as "id!", account_id as "account_id!", provider_id as "provider_id!",
196 thread_id as "thread_id!", message_id_header, in_reply_to,
197 reference_headers, from_name, from_email as "from_email!",
198 to_addrs as "to_addrs!", cc_addrs as "cc_addrs!", bcc_addrs as "bcc_addrs!",
199 subject as "subject!", date as "date!", flags as "flags!",
200 snippet as "snippet!", has_attachments as "has_attachments!: bool",
201 size_bytes as "size_bytes!", unsubscribe_method,
202 COALESCE((
203 SELECT GROUP_CONCAT(labels.provider_id, char(31))
204 FROM message_labels
205 JOIN labels ON labels.id = message_labels.label_id
206 WHERE message_labels.message_id = messages.id
207 ), '') as "label_provider_ids!: String"
208 FROM messages WHERE account_id = ? ORDER BY date DESC LIMIT ? OFFSET ?"#,
209 aid,
210 lim,
211 off,
212 )
213 .fetch_all(self.reader())
214 .await?;
215
216 Ok(rows
217 .into_iter()
218 .map(|r| {
219 record_to_envelope(
220 &r.id,
221 &r.account_id,
222 &r.provider_id,
223 &r.thread_id,
224 r.message_id_header.as_deref(),
225 r.in_reply_to.as_deref(),
226 r.reference_headers.as_deref(),
227 r.from_name.as_deref(),
228 &r.from_email,
229 &r.to_addrs,
230 &r.cc_addrs,
231 &r.bcc_addrs,
232 &r.subject,
233 r.date,
234 r.flags,
235 &r.snippet,
236 r.has_attachments,
237 r.size_bytes,
238 r.unsubscribe_method.as_deref(),
239 &r.label_provider_ids,
240 )
241 })
242 .collect())
243 }
244
245 pub async fn list_envelopes_by_ids(
246 &self,
247 message_ids: &[MessageId],
248 ) -> Result<Vec<Envelope>, sqlx::Error> {
249 if message_ids.is_empty() {
250 return Ok(Vec::new());
251 }
252
253 let placeholders: Vec<String> = message_ids.iter().map(|_| "?".to_string()).collect();
254 let sql = format!(
255 r#"SELECT
256 m.id as "id!", m.account_id as "account_id!", m.provider_id as "provider_id!",
257 m.thread_id as "thread_id!", m.message_id_header, m.in_reply_to,
258 m.reference_headers, m.from_name, m.from_email as "from_email!",
259 m.to_addrs as "to_addrs!", m.cc_addrs as "cc_addrs!", m.bcc_addrs as "bcc_addrs!",
260 m.subject as "subject!", m.date as "date!", m.flags as "flags!",
261 m.snippet as "snippet!", m.has_attachments as "has_attachments!: bool",
262 m.size_bytes as "size_bytes!", m.unsubscribe_method,
263 COALESCE((
264 SELECT GROUP_CONCAT(labels.provider_id, char(31))
265 FROM message_labels
266 JOIN labels ON labels.id = message_labels.label_id
267 WHERE message_labels.message_id = m.id
268 ), '') as "label_provider_ids!: String"
269 FROM messages m
270 WHERE m.id IN ({})"#,
271 placeholders.join(", ")
272 );
273
274 let mut query = sqlx::query(&sql);
275 for message_id in message_ids {
276 query = query.bind(message_id.as_str());
277 }
278
279 let rows = query.fetch_all(self.reader()).await?;
280 let mut by_id = std::collections::HashMap::with_capacity(rows.len());
281 for row in rows {
282 let envelope = record_to_envelope(
283 row.get::<String, _>("id").as_str(),
284 row.get::<String, _>("account_id").as_str(),
285 row.get::<String, _>("provider_id").as_str(),
286 row.get::<String, _>("thread_id").as_str(),
287 row.get::<Option<String>, _>("message_id_header").as_deref(),
288 row.get::<Option<String>, _>("in_reply_to").as_deref(),
289 row.get::<Option<String>, _>("reference_headers").as_deref(),
290 row.get::<Option<String>, _>("from_name").as_deref(),
291 row.get::<String, _>("from_email").as_str(),
292 row.get::<String, _>("to_addrs").as_str(),
293 row.get::<String, _>("cc_addrs").as_str(),
294 row.get::<String, _>("bcc_addrs").as_str(),
295 row.get::<String, _>("subject").as_str(),
296 row.get::<i64, _>("date"),
297 row.get::<i64, _>("flags"),
298 row.get::<String, _>("snippet").as_str(),
299 row.get::<bool, _>("has_attachments"),
300 row.get::<i64, _>("size_bytes"),
301 row.get::<Option<String>, _>("unsubscribe_method")
302 .as_deref(),
303 row.get::<String, _>("label_provider_ids").as_str(),
304 );
305 by_id.insert(envelope.id.clone(), envelope);
306 }
307
308 Ok(message_ids
309 .iter()
310 .filter_map(|message_id| by_id.remove(message_id))
311 .collect())
312 }
313
314 pub async fn delete_messages_by_provider_ids(
316 &self,
317 account_id: &AccountId,
318 provider_ids: &[String],
319 ) -> Result<u64, sqlx::Error> {
320 if provider_ids.is_empty() {
321 return Ok(0);
322 }
323 let placeholders: Vec<String> = provider_ids.iter().map(|_| "?".to_string()).collect();
324 let sql = format!(
325 "DELETE FROM messages WHERE account_id = ? AND provider_id IN ({})",
326 placeholders.join(", ")
327 );
328 let mut query = sqlx::query(&sql).bind(account_id.as_str());
329 for pid in provider_ids {
330 query = query.bind(pid);
331 }
332 let result = query.execute(self.writer()).await?;
333 Ok(result.rows_affected())
334 }
335
336 pub async fn set_message_labels(
337 &self,
338 message_id: &MessageId,
339 label_ids: &[LabelId],
340 ) -> Result<(), sqlx::Error> {
341 let mid = message_id.as_str();
342 sqlx::query!("DELETE FROM message_labels WHERE message_id = ?", mid)
343 .execute(self.writer())
344 .await?;
345
346 for label_id in label_ids {
347 let mid = message_id.as_str();
348 let lid = label_id.as_str();
349 sqlx::query!(
350 "INSERT INTO message_labels (message_id, label_id) VALUES (?, ?)",
351 mid,
352 lid,
353 )
354 .execute(self.writer())
355 .await?;
356 }
357
358 Ok(())
359 }
360
361 pub async fn update_message_thread_id(
362 &self,
363 message_id: &MessageId,
364 thread_id: &ThreadId,
365 ) -> Result<(), sqlx::Error> {
366 sqlx::query("UPDATE messages SET thread_id = ? WHERE id = ?")
367 .bind(thread_id.as_str())
368 .bind(message_id.as_str())
369 .execute(self.writer())
370 .await?;
371 Ok(())
372 }
373
374 pub async fn get_message_id_by_provider_id(
375 &self,
376 account_id: &AccountId,
377 provider_id: &str,
378 ) -> Result<Option<MessageId>, sqlx::Error> {
379 let aid = account_id.as_str();
380 let row = sqlx::query!(
381 r#"SELECT id as "id!" FROM messages WHERE account_id = ? AND provider_id = ?"#,
382 aid,
383 provider_id,
384 )
385 .fetch_optional(self.reader())
386 .await?;
387
388 Ok(row.map(|r| MessageId::from_uuid(uuid::Uuid::parse_str(&r.id).unwrap())))
389 }
390
391 pub async fn count_messages_by_account(
392 &self,
393 account_id: &AccountId,
394 ) -> Result<u32, sqlx::Error> {
395 let aid = account_id.as_str();
396 let row = sqlx::query!(
397 r#"SELECT COUNT(*) as "cnt!: i64" FROM messages WHERE account_id = ?"#,
398 aid,
399 )
400 .fetch_one(self.reader())
401 .await?;
402
403 Ok(row.cnt as u32)
404 }
405
406 pub async fn list_all_envelopes_paginated(
408 &self,
409 limit: u32,
410 offset: u32,
411 ) -> Result<Vec<Envelope>, sqlx::Error> {
412 let lim = limit as i64;
413 let off = offset as i64;
414 let rows = sqlx::query!(
415 r#"SELECT
416 id as "id!", account_id as "account_id!", provider_id as "provider_id!",
417 thread_id as "thread_id!", message_id_header, in_reply_to,
418 reference_headers, from_name, from_email as "from_email!",
419 to_addrs as "to_addrs!", cc_addrs as "cc_addrs!", bcc_addrs as "bcc_addrs!",
420 subject as "subject!", date as "date!", flags as "flags!",
421 snippet as "snippet!", has_attachments as "has_attachments!: bool",
422 size_bytes as "size_bytes!", unsubscribe_method,
423 COALESCE((
424 SELECT GROUP_CONCAT(labels.provider_id, char(31))
425 FROM message_labels
426 JOIN labels ON labels.id = message_labels.label_id
427 WHERE message_labels.message_id = messages.id
428 ), '') as "label_provider_ids!: String"
429 FROM messages ORDER BY date DESC LIMIT ? OFFSET ?"#,
430 lim,
431 off,
432 )
433 .fetch_all(self.reader())
434 .await?;
435
436 Ok(rows
437 .into_iter()
438 .map(|r| {
439 record_to_envelope(
440 &r.id,
441 &r.account_id,
442 &r.provider_id,
443 &r.thread_id,
444 r.message_id_header.as_deref(),
445 r.in_reply_to.as_deref(),
446 r.reference_headers.as_deref(),
447 r.from_name.as_deref(),
448 &r.from_email,
449 &r.to_addrs,
450 &r.cc_addrs,
451 &r.bcc_addrs,
452 &r.subject,
453 r.date,
454 r.flags,
455 &r.snippet,
456 r.has_attachments,
457 r.size_bytes,
458 r.unsubscribe_method.as_deref(),
459 &r.label_provider_ids,
460 )
461 })
462 .collect())
463 }
464
465 pub async fn count_all_messages(&self) -> Result<u32, sqlx::Error> {
467 let row = sqlx::query!(r#"SELECT COUNT(*) as "cnt!: i64" FROM messages"#)
468 .fetch_one(self.reader())
469 .await?;
470 Ok(row.cnt as u32)
471 }
472
473 pub async fn update_flags(
474 &self,
475 message_id: &MessageId,
476 flags: MessageFlags,
477 ) -> Result<(), sqlx::Error> {
478 let mid = message_id.as_str();
479 let flags_val = flags.bits() as i64;
480 sqlx::query!("UPDATE messages SET flags = ? WHERE id = ?", flags_val, mid)
481 .execute(self.writer())
482 .await?;
483 Ok(())
484 }
485
486 pub async fn set_read(&self, message_id: &MessageId, read: bool) -> Result<(), sqlx::Error> {
488 let mid = message_id.as_str();
489 let row = sqlx::query!(
490 r#"SELECT flags as "flags!" FROM messages WHERE id = ?"#,
491 mid,
492 )
493 .fetch_optional(self.reader())
494 .await?;
495
496 if let Some(r) = row {
497 let mut flags = MessageFlags::from_bits_truncate(r.flags as u32);
498 if read {
499 flags.insert(MessageFlags::READ);
500 } else {
501 flags.remove(MessageFlags::READ);
502 }
503 let flags_val = flags.bits() as i64;
504 sqlx::query!("UPDATE messages SET flags = ? WHERE id = ?", flags_val, mid)
505 .execute(self.writer())
506 .await?;
507 }
508 Ok(())
509 }
510
511 pub async fn set_starred(
513 &self,
514 message_id: &MessageId,
515 starred: bool,
516 ) -> Result<(), sqlx::Error> {
517 let mid = message_id.as_str();
518 let row = sqlx::query!(
519 r#"SELECT flags as "flags!" FROM messages WHERE id = ?"#,
520 mid,
521 )
522 .fetch_optional(self.reader())
523 .await?;
524
525 if let Some(r) = row {
526 let mut flags = MessageFlags::from_bits_truncate(r.flags as u32);
527 if starred {
528 flags.insert(MessageFlags::STARRED);
529 } else {
530 flags.remove(MessageFlags::STARRED);
531 }
532 let flags_val = flags.bits() as i64;
533 sqlx::query!("UPDATE messages SET flags = ? WHERE id = ?", flags_val, mid)
534 .execute(self.writer())
535 .await?;
536 }
537 Ok(())
538 }
539
540 pub async fn get_provider_id(
542 &self,
543 message_id: &MessageId,
544 ) -> Result<Option<String>, sqlx::Error> {
545 let mid = message_id.as_str();
546 let row = sqlx::query!(
547 r#"SELECT provider_id as "provider_id!" FROM messages WHERE id = ?"#,
548 mid,
549 )
550 .fetch_optional(self.reader())
551 .await?;
552 Ok(row.map(|r| r.provider_id))
553 }
554
555 pub async fn get_message_label_ids(
557 &self,
558 message_id: &MessageId,
559 ) -> Result<Vec<LabelId>, sqlx::Error> {
560 let mid = message_id.as_str();
561 let rows = sqlx::query!(
562 r#"SELECT label_id as "label_id!" FROM message_labels WHERE message_id = ?"#,
563 mid,
564 )
565 .fetch_all(self.reader())
566 .await?;
567 Ok(rows
568 .into_iter()
569 .map(|r| LabelId::from_uuid(uuid::Uuid::parse_str(&r.label_id).unwrap()))
570 .collect())
571 }
572
573 pub async fn add_message_label(
575 &self,
576 message_id: &MessageId,
577 label_id: &LabelId,
578 ) -> Result<(), sqlx::Error> {
579 let mid = message_id.as_str();
580 let lid = label_id.as_str();
581 sqlx::query!(
582 "INSERT OR IGNORE INTO message_labels (message_id, label_id) VALUES (?, ?)",
583 mid,
584 lid,
585 )
586 .execute(self.writer())
587 .await?;
588 Ok(())
589 }
590
591 pub async fn remove_message_label(
593 &self,
594 message_id: &MessageId,
595 label_id: &LabelId,
596 ) -> Result<(), sqlx::Error> {
597 let mid = message_id.as_str();
598 let lid = label_id.as_str();
599 sqlx::query!(
600 "DELETE FROM message_labels WHERE message_id = ? AND label_id = ?",
601 mid,
602 lid,
603 )
604 .execute(self.writer())
605 .await?;
606 Ok(())
607 }
608
609 pub async fn count_message_labels(&self) -> Result<u32, sqlx::Error> {
611 let row = sqlx::query!(r#"SELECT COUNT(*) as "cnt!: i64" FROM message_labels"#,)
612 .fetch_one(self.reader())
613 .await?;
614 Ok(row.cnt as u32)
615 }
616
617 pub async fn move_to_trash(&self, message_id: &MessageId) -> Result<(), sqlx::Error> {
619 let mid = message_id.as_str();
620 let row = sqlx::query!(
621 r#"SELECT flags as "flags!" FROM messages WHERE id = ?"#,
622 mid,
623 )
624 .fetch_optional(self.reader())
625 .await?;
626
627 if let Some(r) = row {
628 let mut flags = MessageFlags::from_bits_truncate(r.flags as u32);
629 flags.insert(MessageFlags::TRASH);
630 let flags_val = flags.bits() as i64;
631 sqlx::query!("UPDATE messages SET flags = ? WHERE id = ?", flags_val, mid)
632 .execute(self.writer())
633 .await?;
634 }
635 Ok(())
636 }
637
638 pub async fn list_contacts(&self, limit: u32) -> Result<Vec<(String, String)>, sqlx::Error> {
640 let lim = limit as i64;
641 let rows = sqlx::query_as::<_, (String, String)>(
642 r#"SELECT
643 COALESCE(from_name, '') as name,
644 from_email as email
645 FROM messages
646 WHERE from_email != ''
647 GROUP BY from_email
648 ORDER BY COUNT(*) DESC
649 LIMIT ?"#,
650 )
651 .bind(lim)
652 .fetch_all(self.reader())
653 .await?;
654 Ok(rows)
655 }
656
657 pub async fn list_subscriptions(
658 &self,
659 limit: u32,
660 ) -> Result<Vec<SubscriptionSummary>, sqlx::Error> {
661 let none_unsubscribe = serde_json::to_string(&UnsubscribeMethod::None).unwrap();
662 let trash_flag = MessageFlags::TRASH.bits() as i64;
663 let spam_flag = MessageFlags::SPAM.bits() as i64;
664 let lim = limit as i64;
665
666 let rows = sqlx::query(
667 r#"WITH ranked AS (
668 SELECT
669 id,
670 account_id,
671 provider_id,
672 thread_id,
673 from_name,
674 from_email,
675 subject,
676 snippet,
677 date,
678 flags,
679 has_attachments,
680 size_bytes,
681 unsubscribe_method,
682 COUNT(*) OVER (
683 PARTITION BY account_id, LOWER(from_email)
684 ) AS message_count,
685 ROW_NUMBER() OVER (
686 PARTITION BY account_id, LOWER(from_email)
687 ORDER BY date DESC, id DESC
688 ) AS rn
689 FROM messages
690 WHERE from_email != ''
691 AND unsubscribe_method IS NOT NULL
692 AND unsubscribe_method != ?
693 AND (flags & ?) = 0
694 AND (flags & ?) = 0
695 )
696 SELECT
697 id,
698 account_id,
699 provider_id,
700 thread_id,
701 from_name,
702 from_email,
703 subject,
704 snippet,
705 date,
706 flags,
707 has_attachments,
708 size_bytes,
709 unsubscribe_method,
710 message_count
711 FROM ranked
712 WHERE rn = 1
713 ORDER BY date DESC, id DESC
714 LIMIT ?"#,
715 )
716 .bind(none_unsubscribe)
717 .bind(trash_flag)
718 .bind(spam_flag)
719 .bind(lim)
720 .fetch_all(self.reader())
721 .await?;
722
723 Ok(rows
724 .into_iter()
725 .map(|row| SubscriptionSummary {
726 account_id: AccountId::from_uuid(
727 uuid::Uuid::parse_str(&row.get::<String, _>("account_id")).unwrap(),
728 ),
729 sender_name: row.get::<Option<String>, _>("from_name"),
730 sender_email: row.get::<String, _>("from_email"),
731 message_count: row.get::<i64, _>("message_count") as u32,
732 latest_message_id: MessageId::from_uuid(
733 uuid::Uuid::parse_str(&row.get::<String, _>("id")).unwrap(),
734 ),
735 latest_provider_id: row.get::<String, _>("provider_id"),
736 latest_thread_id: ThreadId::from_uuid(
737 uuid::Uuid::parse_str(&row.get::<String, _>("thread_id")).unwrap(),
738 ),
739 latest_subject: row.get::<String, _>("subject"),
740 latest_snippet: row.get::<String, _>("snippet"),
741 latest_date: chrono::DateTime::from_timestamp(row.get::<i64, _>("date"), 0)
742 .unwrap_or_default(),
743 latest_flags: MessageFlags::from_bits_truncate(row.get::<i64, _>("flags") as u32),
744 latest_has_attachments: row.get::<bool, _>("has_attachments"),
745 latest_size_bytes: row.get::<i64, _>("size_bytes") as u64,
746 unsubscribe: row
747 .get::<Option<String>, _>("unsubscribe_method")
748 .as_deref()
749 .map(|value| serde_json::from_str(value).unwrap_or(UnsubscribeMethod::None))
750 .unwrap_or(UnsubscribeMethod::None),
751 })
752 .collect())
753 }
754}
755
756#[allow(clippy::too_many_arguments)]
760pub(crate) fn record_to_envelope(
761 id: &str,
762 account_id: &str,
763 provider_id: &str,
764 thread_id: &str,
765 message_id_header: Option<&str>,
766 in_reply_to: Option<&str>,
767 reference_headers: Option<&str>,
768 from_name: Option<&str>,
769 from_email: &str,
770 to_addrs: &str,
771 cc_addrs: &str,
772 bcc_addrs: &str,
773 subject: &str,
774 date: i64,
775 flags: i64,
776 snippet: &str,
777 has_attachments: bool,
778 size_bytes: i64,
779 unsubscribe_method: Option<&str>,
780 label_provider_ids: &str,
781) -> Envelope {
782 Envelope {
783 id: MessageId::from_uuid(uuid::Uuid::parse_str(id).unwrap()),
784 account_id: AccountId::from_uuid(uuid::Uuid::parse_str(account_id).unwrap()),
785 provider_id: provider_id.to_string(),
786 thread_id: ThreadId::from_uuid(uuid::Uuid::parse_str(thread_id).unwrap()),
787 message_id_header: message_id_header.map(|s| s.to_string()),
788 in_reply_to: in_reply_to.map(|s| s.to_string()),
789 references: reference_headers
790 .map(|r| serde_json::from_str(r).unwrap_or_default())
791 .unwrap_or_default(),
792 from: Address {
793 name: from_name.map(|s| s.to_string()),
794 email: from_email.to_string(),
795 },
796 to: serde_json::from_str(to_addrs).unwrap_or_default(),
797 cc: serde_json::from_str(cc_addrs).unwrap_or_default(),
798 bcc: serde_json::from_str(bcc_addrs).unwrap_or_default(),
799 subject: subject.to_string(),
800 date: chrono::DateTime::from_timestamp(date, 0).unwrap_or_default(),
801 flags: MessageFlags::from_bits_truncate(flags as u32),
802 snippet: snippet.to_string(),
803 has_attachments,
804 size_bytes: size_bytes as u64,
805 unsubscribe: unsubscribe_method
806 .map(|u| serde_json::from_str(u).unwrap_or(UnsubscribeMethod::None))
807 .unwrap_or(UnsubscribeMethod::None),
808 label_provider_ids: if label_provider_ids.is_empty() {
809 vec![]
810 } else {
811 label_provider_ids
812 .split('\u{1f}')
813 .filter(|provider_id| !provider_id.is_empty())
814 .map(str::to_string)
815 .collect()
816 },
817 }
818}