1mod engine;
2pub mod threading;
3pub use engine::SyncEngine;
4
5#[cfg(test)]
6mod tests {
7 use super::*;
8 use mxr_core::id::*;
9 use mxr_core::types::*;
10 use mxr_core::{MailSyncProvider, MxrError, SyncCapabilities};
11 use mxr_search::SearchIndex;
12 use mxr_store::Store;
13 use std::sync::Arc;
14 use tokio::sync::Mutex;
15
16 struct ErrorProvider {
18 account_id: AccountId,
19 }
20
21 #[async_trait::async_trait]
22 impl MailSyncProvider for ErrorProvider {
23 fn name(&self) -> &str {
24 "error"
25 }
26 fn account_id(&self) -> &AccountId {
27 &self.account_id
28 }
29 fn capabilities(&self) -> SyncCapabilities {
30 SyncCapabilities {
31 labels: false,
32 server_search: false,
33 delta_sync: false,
34 push: false,
35 batch_operations: false,
36 native_thread_ids: true,
37 }
38 }
39 async fn authenticate(&mut self) -> Result<(), MxrError> {
40 Ok(())
41 }
42 async fn refresh_auth(&mut self) -> Result<(), MxrError> {
43 Ok(())
44 }
45 async fn sync_labels(&self) -> Result<Vec<Label>, MxrError> {
46 Ok(vec![])
47 }
48 async fn sync_messages(&self, _cursor: &SyncCursor) -> Result<SyncBatch, MxrError> {
49 Err(MxrError::Provider("simulated sync error".into()))
50 }
51 async fn fetch_attachment(&self, _mid: &str, _aid: &str) -> Result<Vec<u8>, MxrError> {
52 Err(MxrError::Provider("simulated attachment error".into()))
53 }
54 async fn modify_labels(
55 &self,
56 _id: &str,
57 _add: &[String],
58 _rm: &[String],
59 ) -> Result<(), MxrError> {
60 Err(MxrError::Provider("simulated error".into()))
61 }
62 async fn trash(&self, _id: &str) -> Result<(), MxrError> {
63 Err(MxrError::Provider("simulated error".into()))
64 }
65 async fn set_read(&self, _id: &str, _read: bool) -> Result<(), MxrError> {
66 Err(MxrError::Provider("simulated error".into()))
67 }
68 async fn set_starred(&self, _id: &str, _starred: bool) -> Result<(), MxrError> {
69 Err(MxrError::Provider("simulated error".into()))
70 }
71 }
72
73 fn test_account(account_id: AccountId) -> mxr_core::Account {
74 mxr_core::Account {
75 id: account_id,
76 name: "Fake Account".to_string(),
77 email: "user@example.com".to_string(),
78 sync_backend: Some(BackendRef {
79 provider_kind: ProviderKind::Fake,
80 config_key: "fake".to_string(),
81 }),
82 send_backend: None,
83 enabled: true,
84 }
85 }
86
87 struct DeltaLabelProvider {
89 account_id: AccountId,
90 messages: Vec<SyncedMessage>,
91 labels: Vec<Label>,
92 label_changes: Vec<LabelChange>,
93 }
94
95 impl DeltaLabelProvider {
96 fn new(
97 account_id: AccountId,
98 messages: Vec<Envelope>,
99 labels: Vec<Label>,
100 label_changes: Vec<LabelChange>,
101 ) -> Self {
102 let messages = messages
103 .into_iter()
104 .map(|env| SyncedMessage {
105 body: make_empty_body(&env.id),
106 envelope: env,
107 })
108 .collect();
109 Self {
110 account_id,
111 messages,
112 labels,
113 label_changes,
114 }
115 }
116 }
117
118 struct ThreadingProvider {
119 account_id: AccountId,
120 messages: Vec<SyncedMessage>,
121 }
122
123 struct RecoveringNotFoundProvider {
124 account_id: AccountId,
125 message: SyncedMessage,
126 calls: std::sync::Mutex<Vec<SyncCursor>>,
127 }
128
129 #[async_trait::async_trait]
130 impl MailSyncProvider for ThreadingProvider {
131 fn name(&self) -> &str {
132 "threading"
133 }
134
135 fn account_id(&self) -> &AccountId {
136 &self.account_id
137 }
138
139 fn capabilities(&self) -> SyncCapabilities {
140 SyncCapabilities {
141 labels: false,
142 server_search: false,
143 delta_sync: false,
144 push: false,
145 batch_operations: false,
146 native_thread_ids: false,
147 }
148 }
149
150 async fn authenticate(&mut self) -> Result<(), MxrError> {
151 Ok(())
152 }
153
154 async fn refresh_auth(&mut self) -> Result<(), MxrError> {
155 Ok(())
156 }
157
158 async fn sync_labels(&self) -> Result<Vec<Label>, MxrError> {
159 Ok(vec![])
160 }
161
162 async fn sync_messages(&self, _cursor: &SyncCursor) -> Result<SyncBatch, MxrError> {
163 Ok(SyncBatch {
164 upserted: self.messages.clone(),
165 deleted_provider_ids: vec![],
166 label_changes: vec![],
167 next_cursor: SyncCursor::Initial,
168 })
169 }
170
171 async fn fetch_attachment(&self, _mid: &str, _aid: &str) -> Result<Vec<u8>, MxrError> {
172 Err(MxrError::NotFound("no attachment".into()))
173 }
174
175 async fn modify_labels(
176 &self,
177 _id: &str,
178 _add: &[String],
179 _rm: &[String],
180 ) -> Result<(), MxrError> {
181 Ok(())
182 }
183
184 async fn trash(&self, _id: &str) -> Result<(), MxrError> {
185 Ok(())
186 }
187
188 async fn set_read(&self, _id: &str, _read: bool) -> Result<(), MxrError> {
189 Ok(())
190 }
191
192 async fn set_starred(&self, _id: &str, _starred: bool) -> Result<(), MxrError> {
193 Ok(())
194 }
195 }
196
197 #[async_trait::async_trait]
198 impl MailSyncProvider for RecoveringNotFoundProvider {
199 fn name(&self) -> &str {
200 "recovering-not-found"
201 }
202
203 fn account_id(&self) -> &AccountId {
204 &self.account_id
205 }
206
207 fn capabilities(&self) -> SyncCapabilities {
208 SyncCapabilities {
209 labels: false,
210 server_search: false,
211 delta_sync: true,
212 push: false,
213 batch_operations: false,
214 native_thread_ids: true,
215 }
216 }
217
218 async fn authenticate(&mut self) -> Result<(), MxrError> {
219 Ok(())
220 }
221
222 async fn refresh_auth(&mut self) -> Result<(), MxrError> {
223 Ok(())
224 }
225
226 async fn sync_labels(&self) -> Result<Vec<Label>, MxrError> {
227 Ok(vec![])
228 }
229
230 async fn sync_messages(&self, cursor: &SyncCursor) -> Result<SyncBatch, MxrError> {
231 self.calls.lock().unwrap().push(cursor.clone());
232 match cursor {
233 SyncCursor::Gmail { .. } => {
234 Err(MxrError::NotFound("Requested entity was not found.".into()))
235 }
236 SyncCursor::Initial => Ok(SyncBatch {
237 upserted: vec![self.message.clone()],
238 deleted_provider_ids: vec![],
239 label_changes: vec![],
240 next_cursor: SyncCursor::Gmail { history_id: 22 },
241 }),
242 other => panic!("unexpected cursor in test: {other:?}"),
243 }
244 }
245
246 async fn fetch_attachment(&self, _mid: &str, _aid: &str) -> Result<Vec<u8>, MxrError> {
247 Err(MxrError::NotFound("no attachment".into()))
248 }
249
250 async fn modify_labels(
251 &self,
252 _id: &str,
253 _add: &[String],
254 _rm: &[String],
255 ) -> Result<(), MxrError> {
256 Ok(())
257 }
258
259 async fn trash(&self, _id: &str) -> Result<(), MxrError> {
260 Ok(())
261 }
262
263 async fn set_read(&self, _id: &str, _read: bool) -> Result<(), MxrError> {
264 Ok(())
265 }
266
267 async fn set_starred(&self, _id: &str, _starred: bool) -> Result<(), MxrError> {
268 Ok(())
269 }
270 }
271
272 fn make_empty_body(message_id: &MessageId) -> MessageBody {
273 MessageBody {
274 message_id: message_id.clone(),
275 text_plain: Some("test body".to_string()),
276 text_html: None,
277 attachments: vec![],
278 fetched_at: chrono::Utc::now(),
279 metadata: MessageMetadata::default(),
280 }
281 }
282
283 #[async_trait::async_trait]
284 impl MailSyncProvider for DeltaLabelProvider {
285 fn name(&self) -> &str {
286 "delta-label"
287 }
288 fn account_id(&self) -> &AccountId {
289 &self.account_id
290 }
291 fn capabilities(&self) -> SyncCapabilities {
292 SyncCapabilities {
293 labels: true,
294 server_search: false,
295 delta_sync: true,
296 push: false,
297 batch_operations: false,
298 native_thread_ids: true,
299 }
300 }
301 async fn authenticate(&mut self) -> Result<(), MxrError> {
302 Ok(())
303 }
304 async fn refresh_auth(&mut self) -> Result<(), MxrError> {
305 Ok(())
306 }
307 async fn sync_labels(&self) -> Result<Vec<Label>, MxrError> {
308 Ok(self.labels.clone())
309 }
310 async fn sync_messages(&self, cursor: &SyncCursor) -> Result<SyncBatch, MxrError> {
311 match cursor {
312 SyncCursor::Initial => Ok(SyncBatch {
313 upserted: self.messages.clone(),
314 deleted_provider_ids: vec![],
315 label_changes: vec![],
316 next_cursor: SyncCursor::Gmail { history_id: 100 },
317 }),
318 _ => Ok(SyncBatch {
319 upserted: vec![],
320 deleted_provider_ids: vec![],
321 label_changes: self.label_changes.clone(),
322 next_cursor: SyncCursor::Gmail { history_id: 200 },
323 }),
324 }
325 }
326 async fn fetch_attachment(&self, _mid: &str, _aid: &str) -> Result<Vec<u8>, MxrError> {
327 Err(MxrError::NotFound("no attachment".into()))
328 }
329 async fn modify_labels(
330 &self,
331 _id: &str,
332 _add: &[String],
333 _rm: &[String],
334 ) -> Result<(), MxrError> {
335 Ok(())
336 }
337 async fn trash(&self, _id: &str) -> Result<(), MxrError> {
338 Ok(())
339 }
340 async fn set_read(&self, _id: &str, _read: bool) -> Result<(), MxrError> {
341 Ok(())
342 }
343 async fn set_starred(&self, _id: &str, _starred: bool) -> Result<(), MxrError> {
344 Ok(())
345 }
346 }
347
348 fn make_test_label(account_id: &AccountId, name: &str, provider_id: &str) -> Label {
349 Label {
350 id: LabelId::new(),
351 account_id: account_id.clone(),
352 name: name.to_string(),
353 kind: LabelKind::System,
354 color: None,
355 provider_id: provider_id.to_string(),
356 unread_count: 0,
357 total_count: 0,
358 }
359 }
360
361 fn make_test_envelope(
362 account_id: &AccountId,
363 provider_id: &str,
364 label_provider_ids: Vec<String>,
365 ) -> Envelope {
366 Envelope {
367 id: MessageId::new(),
368 account_id: account_id.clone(),
369 provider_id: provider_id.to_string(),
370 thread_id: ThreadId::new(),
371 message_id_header: None,
372 in_reply_to: None,
373 references: vec![],
374 from: mxr_core::Address {
375 name: Some("Test".to_string()),
376 email: "test@example.com".to_string(),
377 },
378 to: vec![],
379 cc: vec![],
380 bcc: vec![],
381 subject: "Test message".to_string(),
382 date: chrono::Utc::now(),
383 flags: MessageFlags::empty(),
384 snippet: "Test snippet".to_string(),
385 has_attachments: false,
386 size_bytes: 1000,
387 unsubscribe: UnsubscribeMethod::None,
388 label_provider_ids,
389 }
390 }
391
392 #[tokio::test]
393 async fn delta_sync_applies_label_additions() {
394 let store = Arc::new(Store::in_memory().await.unwrap());
395 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
396 let engine = SyncEngine::new(store.clone(), search.clone());
397
398 let account_id = AccountId::new();
399 store
400 .insert_account(&test_account(account_id.clone()))
401 .await
402 .unwrap();
403
404 let inbox = make_test_label(&account_id, "Inbox", "INBOX");
405 let starred = make_test_label(&account_id, "Starred", "STARRED");
406 let labels = vec![inbox.clone(), starred.clone()];
407
408 let msg = make_test_envelope(&account_id, "prov-msg-1", vec!["INBOX".to_string()]);
409 let msg_provider_id = msg.provider_id.clone();
410
411 let provider = DeltaLabelProvider::new(
412 account_id.clone(),
413 vec![msg],
414 labels,
415 vec![LabelChange {
416 provider_message_id: msg_provider_id.clone(),
417 added_labels: vec!["STARRED".to_string()],
418 removed_labels: vec![],
419 }],
420 );
421
422 engine.sync_account(&provider).await.unwrap();
424
425 let msg_id = store
427 .get_message_id_by_provider_id(&account_id, &msg_provider_id)
428 .await
429 .unwrap()
430 .unwrap();
431 let labels_before = store.get_message_label_ids(&msg_id).await.unwrap();
432 assert!(labels_before.contains(&inbox.id));
433 assert!(!labels_before.contains(&starred.id));
434
435 engine.sync_account(&provider).await.unwrap();
437
438 let labels_after = store.get_message_label_ids(&msg_id).await.unwrap();
439 assert!(
440 labels_after.contains(&inbox.id),
441 "INBOX should still be present"
442 );
443 assert!(
444 labels_after.contains(&starred.id),
445 "STARRED should be added by delta"
446 );
447 }
448
449 #[tokio::test]
450 async fn sync_rethreads_messages_when_provider_lacks_native_thread_ids() {
451 let store = Arc::new(Store::in_memory().await.unwrap());
452 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
453 let engine = SyncEngine::new(store.clone(), search);
454
455 let account_id = AccountId::new();
456 store
457 .insert_account(&test_account(account_id.clone()))
458 .await
459 .unwrap();
460
461 let first_id = MessageId::new();
462 let second_id = MessageId::new();
463 let first = SyncedMessage {
464 envelope: Envelope {
465 id: first_id.clone(),
466 account_id: account_id.clone(),
467 provider_id: "prov-thread-1".into(),
468 thread_id: ThreadId::new(),
469 message_id_header: Some("<root@example.com>".into()),
470 in_reply_to: None,
471 references: vec![],
472 from: mxr_core::Address {
473 name: Some("Alice".into()),
474 email: "alice@example.com".into(),
475 },
476 to: vec![],
477 cc: vec![],
478 bcc: vec![],
479 subject: "Topic".into(),
480 date: chrono::Utc::now() - chrono::Duration::minutes(5),
481 flags: MessageFlags::empty(),
482 snippet: "first".into(),
483 has_attachments: false,
484 size_bytes: 100,
485 unsubscribe: UnsubscribeMethod::None,
486 label_provider_ids: vec![],
487 },
488 body: make_empty_body(&first_id),
489 };
490 let second = SyncedMessage {
491 envelope: Envelope {
492 id: second_id.clone(),
493 account_id: account_id.clone(),
494 provider_id: "prov-thread-2".into(),
495 thread_id: ThreadId::new(),
496 message_id_header: Some("<reply@example.com>".into()),
497 in_reply_to: Some("<root@example.com>".into()),
498 references: vec!["<root@example.com>".into()],
499 from: mxr_core::Address {
500 name: Some("Bob".into()),
501 email: "bob@example.com".into(),
502 },
503 to: vec![],
504 cc: vec![],
505 bcc: vec![],
506 subject: "Re: Topic".into(),
507 date: chrono::Utc::now(),
508 flags: MessageFlags::empty(),
509 snippet: "second".into(),
510 has_attachments: false,
511 size_bytes: 100,
512 unsubscribe: UnsubscribeMethod::None,
513 label_provider_ids: vec![],
514 },
515 body: make_empty_body(&second_id),
516 };
517
518 let provider = ThreadingProvider {
519 account_id: account_id.clone(),
520 messages: vec![first, second],
521 };
522
523 engine.sync_account(&provider).await.unwrap();
524
525 let first_env = store.get_envelope(&first_id).await.unwrap().unwrap();
526 let second_env = store.get_envelope(&second_id).await.unwrap().unwrap();
527 assert_eq!(first_env.thread_id, second_env.thread_id);
528
529 let thread = store
530 .get_thread(&first_env.thread_id)
531 .await
532 .unwrap()
533 .unwrap();
534 assert_eq!(thread.message_count, 2);
535 }
536
537 #[tokio::test]
538 async fn delta_sync_applies_label_removals() {
539 let store = Arc::new(Store::in_memory().await.unwrap());
540 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
541 let engine = SyncEngine::new(store.clone(), search.clone());
542
543 let account_id = AccountId::new();
544 store
545 .insert_account(&test_account(account_id.clone()))
546 .await
547 .unwrap();
548
549 let inbox = make_test_label(&account_id, "Inbox", "INBOX");
550 let starred = make_test_label(&account_id, "Starred", "STARRED");
551 let labels = vec![inbox.clone(), starred.clone()];
552
553 let msg = make_test_envelope(
554 &account_id,
555 "prov-msg-2",
556 vec!["INBOX".to_string(), "STARRED".to_string()],
557 );
558 let msg_provider_id = msg.provider_id.clone();
559
560 let provider = DeltaLabelProvider::new(
561 account_id.clone(),
562 vec![msg],
563 labels,
564 vec![LabelChange {
565 provider_message_id: msg_provider_id.clone(),
566 added_labels: vec![],
567 removed_labels: vec!["STARRED".to_string()],
568 }],
569 );
570
571 engine.sync_account(&provider).await.unwrap();
573
574 let msg_id = store
575 .get_message_id_by_provider_id(&account_id, &msg_provider_id)
576 .await
577 .unwrap()
578 .unwrap();
579 let labels_before = store.get_message_label_ids(&msg_id).await.unwrap();
580 assert!(
581 labels_before.contains(&starred.id),
582 "STARRED should be present after initial sync"
583 );
584
585 engine.sync_account(&provider).await.unwrap();
587
588 let labels_after = store.get_message_label_ids(&msg_id).await.unwrap();
589 assert!(labels_after.contains(&inbox.id), "INBOX should remain");
590 assert!(
591 !labels_after.contains(&starred.id),
592 "STARRED should be removed by delta"
593 );
594 }
595
596 #[tokio::test]
597 async fn delta_sync_handles_unknown_provider_message() {
598 let store = Arc::new(Store::in_memory().await.unwrap());
599 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
600 let engine = SyncEngine::new(store.clone(), search.clone());
601
602 let account_id = AccountId::new();
603 store
604 .insert_account(&test_account(account_id.clone()))
605 .await
606 .unwrap();
607
608 let inbox = make_test_label(&account_id, "Inbox", "INBOX");
609 let labels = vec![inbox.clone()];
610
611 let msg = make_test_envelope(&account_id, "prov-msg-3", vec!["INBOX".to_string()]);
612
613 let provider = DeltaLabelProvider::new(
614 account_id.clone(),
615 vec![msg],
616 labels,
617 vec![LabelChange {
618 provider_message_id: "nonexistent-msg".to_string(),
620 added_labels: vec!["INBOX".to_string()],
621 removed_labels: vec![],
622 }],
623 );
624
625 engine.sync_account(&provider).await.unwrap();
627
628 let result = engine.sync_account(&provider).await;
630 assert!(
631 result.is_ok(),
632 "Delta sync should gracefully skip unknown messages"
633 );
634 }
635
636 #[tokio::test]
637 async fn sync_populates_store_and_search() {
638 let store = Arc::new(Store::in_memory().await.unwrap());
639 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
640 let engine = SyncEngine::new(store.clone(), search.clone());
641
642 let account_id = AccountId::new();
643 store
644 .insert_account(&test_account(account_id.clone()))
645 .await
646 .unwrap();
647
648 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
649 let count = engine.sync_account(&provider).await.unwrap();
650 assert_eq!(count, 55);
651
652 let envelopes = store
654 .list_envelopes_by_account(&account_id, 100, 0)
655 .await
656 .unwrap();
657 assert_eq!(envelopes.len(), 55);
658
659 let results = search
661 .lock()
662 .await
663 .search("deployment", 10, 0, SortOrder::DateDesc)
664 .unwrap();
665 assert!(!results.results.is_empty());
666 }
667
668 #[tokio::test]
669 async fn bodies_stored_eagerly_during_sync() {
670 let store = Arc::new(Store::in_memory().await.unwrap());
671 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
672 let engine = SyncEngine::new(store.clone(), search.clone());
673
674 let account_id = AccountId::new();
675 store
676 .insert_account(&test_account(account_id.clone()))
677 .await
678 .unwrap();
679
680 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
681 engine.sync_account(&provider).await.unwrap();
682
683 let envelopes = store
685 .list_envelopes_by_account(&account_id, 1, 0)
686 .await
687 .unwrap();
688 let msg_id = &envelopes[0].id;
689
690 let body = engine.get_body(msg_id).await.unwrap();
692 assert!(body.text_plain.is_some());
693
694 let body2 = engine.get_body(msg_id).await.unwrap();
696 assert_eq!(body.text_plain, body2.text_plain);
697 }
698
699 #[tokio::test]
700 async fn snooze_wake() {
701 let store = Arc::new(Store::in_memory().await.unwrap());
702 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
703 let engine = SyncEngine::new(store.clone(), search.clone());
704
705 let account_id = AccountId::new();
706 store
707 .insert_account(&test_account(account_id.clone()))
708 .await
709 .unwrap();
710
711 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
712 engine.sync_account(&provider).await.unwrap();
713
714 let envelopes = store
716 .list_envelopes_by_account(&account_id, 1, 0)
717 .await
718 .unwrap();
719
720 let snoozed = Snoozed {
721 message_id: envelopes[0].id.clone(),
722 account_id: account_id.clone(),
723 snoozed_at: chrono::Utc::now(),
724 wake_at: chrono::Utc::now() - chrono::Duration::hours(1),
725 original_labels: vec![],
726 };
727 store.insert_snooze(&snoozed).await.unwrap();
728
729 let woken = engine.check_snoozes().await.unwrap();
730 assert_eq!(woken.len(), 1);
731
732 let woken2 = engine.check_snoozes().await.unwrap();
734 assert_eq!(woken2.len(), 0);
735 }
736
737 #[tokio::test]
738 async fn cursor_persistence() {
739 let store = Arc::new(Store::in_memory().await.unwrap());
740 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
741 let engine = SyncEngine::new(store.clone(), search.clone());
742
743 let account_id = AccountId::new();
744 store
745 .insert_account(&test_account(account_id.clone()))
746 .await
747 .unwrap();
748
749 let cursor_before = store.get_sync_cursor(&account_id).await.unwrap();
751 assert!(cursor_before.is_none());
752
753 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
754 engine.sync_account(&provider).await.unwrap();
755
756 let cursor_after = store.get_sync_cursor(&account_id).await.unwrap();
758 assert!(cursor_after.is_some(), "Cursor should be set after sync");
759 let cursor_json = serde_json::to_string(&cursor_after.unwrap()).unwrap();
760 assert!(
761 cursor_json.contains("Gmail") && cursor_json.contains("1"),
762 "Cursor should be Gmail {{ history_id: 1 }}, got: {}",
763 cursor_json
764 );
765 }
766
767 #[tokio::test]
768 async fn sync_error_does_not_crash() {
769 let store = Arc::new(Store::in_memory().await.unwrap());
770 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
771 let engine = SyncEngine::new(store.clone(), search.clone());
772
773 let account_id = AccountId::new();
774 store
775 .insert_account(&test_account(account_id.clone()))
776 .await
777 .unwrap();
778
779 let error_provider = ErrorProvider {
780 account_id: account_id.clone(),
781 };
782
783 let result = engine.sync_account(&error_provider).await;
785 assert!(
786 result.is_err(),
787 "sync_account should return Err for failing provider"
788 );
789 let err_msg = result.unwrap_err().to_string();
790 assert!(
791 err_msg.contains("simulated"),
792 "Error should contain provider message, got: {}",
793 err_msg
794 );
795 }
796
797 #[tokio::test]
798 async fn label_counts_after_sync() {
799 let store = Arc::new(Store::in_memory().await.unwrap());
800 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
801 let engine = SyncEngine::new(store.clone(), search.clone());
802
803 let account_id = AccountId::new();
804 store
805 .insert_account(&test_account(account_id.clone()))
806 .await
807 .unwrap();
808
809 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
810 engine.sync_account(&provider).await.unwrap();
811
812 let labels = store.list_labels_by_account(&account_id).await.unwrap();
813 assert!(!labels.is_empty(), "Should have labels after sync");
814
815 let has_counts = labels
816 .iter()
817 .any(|l| l.total_count > 0 || l.unread_count > 0);
818 assert!(
819 has_counts,
820 "At least one label should have non-zero counts after sync"
821 );
822 }
823
824 #[tokio::test]
825 async fn list_envelopes_by_label_returns_results() {
826 let store = Arc::new(Store::in_memory().await.unwrap());
827 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
828 let engine = SyncEngine::new(store.clone(), search.clone());
829
830 let account_id = AccountId::new();
831 store
832 .insert_account(&test_account(account_id.clone()))
833 .await
834 .unwrap();
835
836 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
837 engine.sync_account(&provider).await.unwrap();
838
839 let labels = store.list_labels_by_account(&account_id).await.unwrap();
840
841 let inbox_label = labels.iter().find(|l| l.name == "Inbox").unwrap();
843 assert!(
844 inbox_label.total_count > 0,
845 "Inbox should have messages after sync"
846 );
847
848 let envelopes = store
850 .list_envelopes_by_label(&inbox_label.id, 100, 0)
851 .await
852 .unwrap();
853
854 let all_envelopes = store
856 .list_envelopes_by_account(&account_id, 200, 0)
857 .await
858 .unwrap();
859
860 assert!(
861 !envelopes.is_empty(),
862 "list_envelopes_by_label should return messages for Inbox label (got 0). \
863 label_id={}, total_count={}, all_count={}",
864 inbox_label.id,
865 inbox_label.total_count,
866 all_envelopes.len()
867 );
868
869 assert!(
871 envelopes.len() <= all_envelopes.len(),
872 "Inbox-by-label ({}) should be <= all ({})",
873 envelopes.len(),
874 all_envelopes.len()
875 );
876 }
877
878 #[tokio::test]
879 async fn list_envelopes_by_sent_label_may_be_empty() {
880 let store = Arc::new(Store::in_memory().await.unwrap());
881 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
882 let engine = SyncEngine::new(store.clone(), search.clone());
883
884 let account_id = AccountId::new();
885 store
886 .insert_account(&test_account(account_id.clone()))
887 .await
888 .unwrap();
889
890 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
891 engine.sync_account(&provider).await.unwrap();
892
893 let labels = store.list_labels_by_account(&account_id).await.unwrap();
894
895 let sent_label = labels.iter().find(|l| l.name == "Sent").unwrap();
897
898 let envelopes = store
899 .list_envelopes_by_label(&sent_label.id, 100, 0)
900 .await
901 .unwrap();
902
903 assert_eq!(
905 envelopes.len(),
906 0,
907 "Sent should have 0 messages in fake provider"
908 );
909
910 let inbox_label = labels.iter().find(|l| l.name == "Inbox").unwrap();
912 let inbox_envelopes = store
913 .list_envelopes_by_label(&inbox_label.id, 100, 0)
914 .await
915 .unwrap();
916 assert!(
917 !inbox_envelopes.is_empty(),
918 "Inbox should still have messages after querying Sent"
919 );
920
921 let all_envelopes = store
923 .list_envelopes_by_account(&account_id, 100, 0)
924 .await
925 .unwrap();
926 assert!(
927 !all_envelopes.is_empty(),
928 "All envelopes should still be retrievable"
929 );
930 }
931
932 #[tokio::test]
933 async fn progressive_loading_chunks() {
934 let store = Arc::new(Store::in_memory().await.unwrap());
935 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
936 let engine = SyncEngine::new(store.clone(), search.clone());
937
938 let account_id = AccountId::new();
939 store
940 .insert_account(&test_account(account_id.clone()))
941 .await
942 .unwrap();
943
944 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
945 let count = engine.sync_account(&provider).await.unwrap();
946 assert_eq!(count, 55, "Sync should report 55 messages processed");
947
948 let envelopes = store
950 .list_envelopes_by_account(&account_id, 200, 0)
951 .await
952 .unwrap();
953 assert_eq!(envelopes.len(), 55, "Store should contain 55 envelopes");
954
955 let results = search
957 .lock()
958 .await
959 .search("deployment", 10, 0, SortOrder::DateDesc)
960 .unwrap();
961 assert!(
962 !results.results.is_empty(),
963 "Search index should have 'deployment' results"
964 );
965 }
966
967 #[tokio::test]
968 async fn delta_sync_no_duplicate_labels() {
969 let store = Arc::new(Store::in_memory().await.unwrap());
970 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
971 let engine = SyncEngine::new(store.clone(), search.clone());
972
973 let account_id = AccountId::new();
974 store
975 .insert_account(&test_account(account_id.clone()))
976 .await
977 .unwrap();
978
979 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
980
981 engine.sync_account(&provider).await.unwrap();
983
984 let labels_after_first = store.list_labels_by_account(&account_id).await.unwrap();
985 let label_count_first = labels_after_first.len();
986
987 let delta_count = engine.sync_account(&provider).await.unwrap();
989 assert_eq!(delta_count, 0, "Delta sync should return 0 new messages");
990
991 let labels_after_second = store.list_labels_by_account(&account_id).await.unwrap();
992
993 assert_eq!(
995 label_count_first,
996 labels_after_second.len(),
997 "Label count should not change after delta sync"
998 );
999
1000 for label in &labels_after_first {
1002 let still_exists = labels_after_second
1003 .iter()
1004 .any(|l| l.provider_id == label.provider_id && l.name == label.name);
1005 assert!(
1006 still_exists,
1007 "Label '{}' (provider_id='{}') should survive delta sync",
1008 label.name, label.provider_id
1009 );
1010 }
1011
1012 let envelopes = store
1015 .list_envelopes_by_account(&account_id, 200, 0)
1016 .await
1017 .unwrap();
1018 assert_eq!(
1019 envelopes.len(),
1020 55,
1021 "All 55 messages should survive delta sync"
1022 );
1023 }
1024
1025 #[tokio::test]
1026 async fn delta_sync_preserves_junction_table() {
1027 let store = Arc::new(Store::in_memory().await.unwrap());
1028 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1029 let engine = SyncEngine::new(store.clone(), search.clone());
1030
1031 let account_id = AccountId::new();
1032 store
1033 .insert_account(&test_account(account_id.clone()))
1034 .await
1035 .unwrap();
1036
1037 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1038
1039 engine.sync_account(&provider).await.unwrap();
1041
1042 let junction_before = store.count_message_labels().await.unwrap();
1043 assert!(
1044 junction_before > 0,
1045 "Junction table should be populated after initial sync"
1046 );
1047
1048 let delta_count = engine.sync_account(&provider).await.unwrap();
1050 assert_eq!(delta_count, 0, "Delta sync should return 0 new messages");
1051
1052 let junction_after = store.count_message_labels().await.unwrap();
1053 assert_eq!(
1054 junction_before, junction_after,
1055 "Junction table should survive delta sync (before={}, after={})",
1056 junction_before, junction_after
1057 );
1058
1059 let labels = store.list_labels_by_account(&account_id).await.unwrap();
1061 let inbox = labels.iter().find(|l| l.name == "Inbox").unwrap();
1062 let envelopes = store
1063 .list_envelopes_by_label(&inbox.id, 100, 0)
1064 .await
1065 .unwrap();
1066 assert!(
1067 !envelopes.is_empty(),
1068 "Inbox should still return messages after delta sync"
1069 );
1070 }
1071
1072 #[tokio::test]
1073 async fn backfill_triggers_when_junction_empty() {
1074 let store = Arc::new(Store::in_memory().await.unwrap());
1075 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1076 let engine = SyncEngine::new(store.clone(), search.clone());
1077
1078 let account_id = AccountId::new();
1079 store
1080 .insert_account(&test_account(account_id.clone()))
1081 .await
1082 .unwrap();
1083
1084 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1085
1086 engine.sync_account(&provider).await.unwrap();
1088
1089 let junction_before = store.count_message_labels().await.unwrap();
1090 assert!(junction_before > 0);
1091
1092 sqlx::query("DELETE FROM message_labels")
1094 .execute(store.writer())
1095 .await
1096 .unwrap();
1097
1098 let junction_wiped = store.count_message_labels().await.unwrap();
1099 assert_eq!(junction_wiped, 0, "Junction should be empty after wipe");
1100
1101 engine.sync_account(&provider).await.unwrap();
1103
1104 let junction_after = store.count_message_labels().await.unwrap();
1105 assert!(
1106 junction_after > 0,
1107 "Junction table should be repopulated after backfill (got {})",
1108 junction_after
1109 );
1110 }
1111
1112 #[tokio::test]
1113 async fn sync_label_resolution_matches_gmail_ids() {
1114 let store = Arc::new(Store::in_memory().await.unwrap());
1115 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1116 let engine = SyncEngine::new(store.clone(), search.clone());
1117
1118 let account_id = AccountId::new();
1119 store
1120 .insert_account(&test_account(account_id.clone()))
1121 .await
1122 .unwrap();
1123
1124 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1125 engine.sync_account(&provider).await.unwrap();
1126
1127 let labels = store.list_labels_by_account(&account_id).await.unwrap();
1128
1129 let expected_mappings = [
1132 ("Inbox", "INBOX"),
1133 ("Sent", "SENT"),
1134 ("Trash", "TRASH"),
1135 ("Spam", "SPAM"),
1136 ("Starred", "STARRED"),
1137 ("Work", "work"),
1138 ("Personal", "personal"),
1139 ("Newsletters", "newsletters"),
1140 ];
1141 for (name, expected_pid) in &expected_mappings {
1142 let label = labels.iter().find(|l| l.name == *name);
1143 assert!(label.is_some(), "Label '{}' should exist after sync", name);
1144 assert_eq!(
1145 label.unwrap().provider_id,
1146 *expected_pid,
1147 "Label '{}' should have provider_id '{}'",
1148 name,
1149 expected_pid
1150 );
1151 }
1152
1153 let envelopes = store
1155 .list_envelopes_by_account(&account_id, 200, 0)
1156 .await
1157 .unwrap();
1158 let label_ids: std::collections::HashSet<String> =
1159 labels.iter().map(|l| l.id.as_str().to_string()).collect();
1160
1161 for env in &envelopes {
1162 let msg_label_ids = store.get_message_label_ids(&env.id).await.unwrap();
1163 for lid in &msg_label_ids {
1164 assert!(
1165 label_ids.contains(&lid.as_str().to_string()),
1166 "Junction entry for message {} points to nonexistent label {}",
1167 env.id,
1168 lid
1169 );
1170 }
1171 }
1172 }
1173
1174 #[tokio::test]
1175 async fn list_envelopes_by_each_label_returns_correct_count() {
1176 let store = Arc::new(Store::in_memory().await.unwrap());
1177 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1178 let engine = SyncEngine::new(store.clone(), search.clone());
1179
1180 let account_id = AccountId::new();
1181 store
1182 .insert_account(&test_account(account_id.clone()))
1183 .await
1184 .unwrap();
1185
1186 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1187 engine.sync_account(&provider).await.unwrap();
1188
1189 let labels = store.list_labels_by_account(&account_id).await.unwrap();
1190 for label in &labels {
1191 if label.total_count > 0 {
1192 let envelopes = store
1193 .list_envelopes_by_label(&label.id, 200, 0)
1194 .await
1195 .unwrap();
1196 assert_eq!(
1197 envelopes.len(),
1198 label.total_count as usize,
1199 "Label '{}' (provider_id='{}') has total_count={} but list_envelopes_by_label returned {}",
1200 label.name,
1201 label.provider_id,
1202 label.total_count,
1203 envelopes.len()
1204 );
1205 }
1206 }
1207 }
1208
1209 #[tokio::test]
1210 async fn search_index_consistent_with_store() {
1211 let store = Arc::new(Store::in_memory().await.unwrap());
1212 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1213 let engine = SyncEngine::new(store.clone(), search.clone());
1214
1215 let account_id = AccountId::new();
1216 store
1217 .insert_account(&test_account(account_id.clone()))
1218 .await
1219 .unwrap();
1220
1221 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1222 engine.sync_account(&provider).await.unwrap();
1223
1224 let envelopes = store
1225 .list_envelopes_by_account(&account_id, 200, 0)
1226 .await
1227 .unwrap();
1228
1229 let search_guard = search.lock().await;
1230 for env in &envelopes {
1231 let keyword = env
1233 .subject
1234 .split_whitespace()
1235 .find(|w| w.len() > 3 && w.chars().all(|c| c.is_alphanumeric()))
1236 .unwrap_or(&env.subject);
1237 let results = search_guard
1238 .search(keyword, 100, 0, SortOrder::DateDesc)
1239 .unwrap();
1240 assert!(
1241 results
1242 .results
1243 .iter()
1244 .any(|r| r.message_id == env.id.as_str()),
1245 "Envelope '{}' (subject='{}') should be findable by keyword '{}' in search index",
1246 env.id,
1247 env.subject,
1248 keyword
1249 );
1250 }
1251 }
1252
1253 #[tokio::test]
1254 async fn mutation_flags_persist_through_store() {
1255 let store = Arc::new(Store::in_memory().await.unwrap());
1256 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1257 let engine = SyncEngine::new(store.clone(), search.clone());
1258
1259 let account_id = AccountId::new();
1260 store
1261 .insert_account(&test_account(account_id.clone()))
1262 .await
1263 .unwrap();
1264
1265 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1266 engine.sync_account(&provider).await.unwrap();
1267
1268 let envelopes = store
1269 .list_envelopes_by_account(&account_id, 1, 0)
1270 .await
1271 .unwrap();
1272 let msg_id = &envelopes[0].id;
1273 let initial_flags = envelopes[0].flags;
1274
1275 store.set_starred(msg_id, true).await.unwrap();
1277 store.set_read(msg_id, true).await.unwrap();
1278
1279 let updated = store.get_envelope(msg_id).await.unwrap().unwrap();
1280 assert!(
1281 updated.flags.contains(MessageFlags::STARRED),
1282 "STARRED flag should be set"
1283 );
1284 assert!(
1285 updated.flags.contains(MessageFlags::READ),
1286 "READ flag should be set"
1287 );
1288
1289 store.set_starred(msg_id, false).await.unwrap();
1291 let updated2 = store.get_envelope(msg_id).await.unwrap().unwrap();
1292 assert!(
1293 !updated2.flags.contains(MessageFlags::STARRED),
1294 "STARRED flag should be cleared after set_starred(false)"
1295 );
1296 assert!(
1297 updated2.flags.contains(MessageFlags::READ),
1298 "READ flag should still be set after clearing STARRED"
1299 );
1300
1301 let _ = initial_flags; }
1305
1306 #[tokio::test]
1307 async fn junction_table_survives_message_update() {
1308 let store = Arc::new(Store::in_memory().await.unwrap());
1309 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1310 let engine = SyncEngine::new(store.clone(), search.clone());
1311
1312 let account_id = AccountId::new();
1313 store
1314 .insert_account(&test_account(account_id.clone()))
1315 .await
1316 .unwrap();
1317
1318 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1319 engine.sync_account(&provider).await.unwrap();
1320
1321 let envelopes = store
1323 .list_envelopes_by_account(&account_id, 1, 0)
1324 .await
1325 .unwrap();
1326 let msg_id = &envelopes[0].id;
1327
1328 let junction_before = store.get_message_label_ids(msg_id).await.unwrap();
1329 assert!(
1330 !junction_before.is_empty(),
1331 "Message should have label associations after sync"
1332 );
1333
1334 store.upsert_envelope(&envelopes[0]).await.unwrap();
1336 store
1338 .set_message_labels(msg_id, &junction_before)
1339 .await
1340 .unwrap();
1341
1342 let junction_after = store.get_message_label_ids(msg_id).await.unwrap();
1343 assert_eq!(
1344 junction_before.len(),
1345 junction_after.len(),
1346 "Junction rows should not double after re-sync (before={}, after={})",
1347 junction_before.len(),
1348 junction_after.len()
1349 );
1350 }
1351
1352 #[tokio::test]
1353 async fn find_labels_by_provider_ids_with_unknown_ids() {
1354 let store = Arc::new(Store::in_memory().await.unwrap());
1355 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1356 let engine = SyncEngine::new(store.clone(), search.clone());
1357
1358 let account_id = AccountId::new();
1359 store
1360 .insert_account(&test_account(account_id.clone()))
1361 .await
1362 .unwrap();
1363
1364 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1365 engine.sync_account(&provider).await.unwrap();
1366
1367 let result = store
1368 .find_labels_by_provider_ids(
1369 &account_id,
1370 &["INBOX".to_string(), "NONEXISTENT_LABEL".to_string()],
1371 )
1372 .await
1373 .unwrap();
1374
1375 assert_eq!(
1376 result.len(),
1377 1,
1378 "Should only return 1 result for INBOX, not 2 (NONEXISTENT_LABEL should be ignored)"
1379 );
1380 }
1381
1382 #[tokio::test]
1383 async fn body_available_after_sync() {
1384 let store = Arc::new(Store::in_memory().await.unwrap());
1385 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1386 let engine = SyncEngine::new(store.clone(), search.clone());
1387
1388 let account_id = AccountId::new();
1389 store
1390 .insert_account(&test_account(account_id.clone()))
1391 .await
1392 .unwrap();
1393
1394 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1395 engine.sync_account(&provider).await.unwrap();
1396
1397 let envelopes = store
1398 .list_envelopes_by_account(&account_id, 1, 0)
1399 .await
1400 .unwrap();
1401 let msg_id = &envelopes[0].id;
1402
1403 let body1 = engine.get_body(msg_id).await.unwrap();
1405 assert!(body1.text_plain.is_some(), "Body should have text_plain");
1406
1407 let body2 = engine.get_body(msg_id).await.unwrap();
1409
1410 assert_eq!(
1411 body1.text_plain, body2.text_plain,
1412 "Body text_plain should be consistent"
1413 );
1414 assert_eq!(
1415 body1.text_html, body2.text_html,
1416 "Body text_html should be consistent"
1417 );
1418 assert_eq!(
1419 body1.attachments.len(),
1420 body2.attachments.len(),
1421 "Body attachments count should be consistent"
1422 );
1423 }
1424
1425 #[tokio::test]
1426 async fn gmail_not_found_cursor_resets_to_initial_and_recovers() {
1427 let store = Arc::new(Store::in_memory().await.unwrap());
1428 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1429 let engine = SyncEngine::new(store.clone(), search.clone());
1430
1431 let account_id = AccountId::new();
1432 store
1433 .insert_account(&test_account(account_id.clone()))
1434 .await
1435 .unwrap();
1436 store
1437 .set_sync_cursor(
1438 &account_id,
1439 &SyncCursor::Gmail {
1440 history_id: 27_697_494,
1441 },
1442 )
1443 .await
1444 .unwrap();
1445
1446 let message_id = MessageId::new();
1447 let provider = RecoveringNotFoundProvider {
1448 account_id: account_id.clone(),
1449 message: SyncedMessage {
1450 envelope: Envelope {
1451 id: message_id.clone(),
1452 account_id: account_id.clone(),
1453 provider_id: "recovered-1".into(),
1454 thread_id: ThreadId::new(),
1455 message_id_header: None,
1456 in_reply_to: None,
1457 references: vec![],
1458 from: Address {
1459 name: Some("Recovered".into()),
1460 email: "recovered@example.com".into(),
1461 },
1462 to: vec![],
1463 cc: vec![],
1464 bcc: vec![],
1465 subject: "Recovered after cursor reset".into(),
1466 date: chrono::Utc::now(),
1467 flags: MessageFlags::empty(),
1468 snippet: "Recovered".into(),
1469 has_attachments: false,
1470 size_bytes: 42,
1471 unsubscribe: UnsubscribeMethod::None,
1472 label_provider_ids: vec![],
1473 },
1474 body: make_empty_body(&message_id),
1475 },
1476 calls: std::sync::Mutex::new(Vec::new()),
1477 };
1478
1479 let outcome = engine.sync_account_with_outcome(&provider).await.unwrap();
1480
1481 assert_eq!(outcome.synced_count, 1);
1482 let calls = provider.calls.lock().unwrap();
1483 assert_eq!(calls.len(), 2);
1484 assert!(matches!(
1485 calls[0],
1486 SyncCursor::Gmail {
1487 history_id: 27_697_494
1488 }
1489 ));
1490 assert!(matches!(calls[1], SyncCursor::Initial));
1491 let stored_cursor = store.get_sync_cursor(&account_id).await.unwrap();
1492 assert!(matches!(
1493 stored_cursor,
1494 Some(SyncCursor::Gmail { history_id: 22 })
1495 ));
1496 }
1497
1498 #[tokio::test]
1499 async fn recalculate_label_counts_matches_junction() {
1500 let store = Arc::new(Store::in_memory().await.unwrap());
1501 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1502 let engine = SyncEngine::new(store.clone(), search.clone());
1503
1504 let account_id = AccountId::new();
1505 store
1506 .insert_account(&test_account(account_id.clone()))
1507 .await
1508 .unwrap();
1509
1510 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1511 engine.sync_account(&provider).await.unwrap();
1512
1513 let labels = store.list_labels_by_account(&account_id).await.unwrap();
1514
1515 for label in &labels {
1516 let lid = label.id.as_str();
1517 let junction_count: i64 = sqlx::query_scalar::<_, i64>(
1519 "SELECT COUNT(*) FROM message_labels WHERE label_id = ?",
1520 )
1521 .bind(&lid)
1522 .fetch_one(store.reader())
1523 .await
1524 .unwrap();
1525
1526 assert_eq!(
1527 label.total_count as i64, junction_count,
1528 "Label '{}' total_count ({}) should match junction row count ({})",
1529 label.name, label.total_count, junction_count
1530 );
1531
1532 let unread_count: i64 = sqlx::query_scalar::<_, i64>(
1534 "SELECT COUNT(*) FROM message_labels ml \
1535 JOIN messages m ON m.id = ml.message_id \
1536 WHERE ml.label_id = ? AND (m.flags & 1) = 0",
1537 )
1538 .bind(&lid)
1539 .fetch_one(store.reader())
1540 .await
1541 .unwrap();
1542
1543 assert_eq!(
1544 label.unread_count as i64, unread_count,
1545 "Label '{}' unread_count ({}) should match computed unread count ({})",
1546 label.name, label.unread_count, unread_count
1547 );
1548 }
1549 }
1550}