1mod engine;
2pub mod threading;
3pub use engine::SyncEngine;
4
5#[cfg(test)]
6mod tests {
7 use super::*;
8 use mxr_core::id::*;
9 use mxr_core::types::*;
10 use mxr_core::{MailSyncProvider, MxrError, SyncCapabilities};
11 use mxr_search::SearchIndex;
12 use mxr_store::Store;
13 use std::sync::Arc;
14 use tokio::sync::Mutex;
15
16 struct ErrorProvider {
18 account_id: AccountId,
19 }
20
21 #[async_trait::async_trait]
22 impl MailSyncProvider for ErrorProvider {
23 fn name(&self) -> &str {
24 "error"
25 }
26 fn account_id(&self) -> &AccountId {
27 &self.account_id
28 }
29 fn capabilities(&self) -> SyncCapabilities {
30 SyncCapabilities {
31 labels: false,
32 server_search: false,
33 delta_sync: false,
34 push: false,
35 batch_operations: false,
36 native_thread_ids: true,
37 }
38 }
39 async fn authenticate(&mut self) -> Result<(), MxrError> {
40 Ok(())
41 }
42 async fn refresh_auth(&mut self) -> Result<(), MxrError> {
43 Ok(())
44 }
45 async fn sync_labels(&self) -> Result<Vec<Label>, MxrError> {
46 Ok(vec![])
47 }
48 async fn sync_messages(&self, _cursor: &SyncCursor) -> Result<SyncBatch, MxrError> {
49 Err(MxrError::Provider("simulated sync error".into()))
50 }
51 async fn fetch_attachment(&self, _mid: &str, _aid: &str) -> Result<Vec<u8>, MxrError> {
52 Err(MxrError::Provider("simulated attachment error".into()))
53 }
54 async fn modify_labels(
55 &self,
56 _id: &str,
57 _add: &[String],
58 _rm: &[String],
59 ) -> Result<(), MxrError> {
60 Err(MxrError::Provider("simulated error".into()))
61 }
62 async fn trash(&self, _id: &str) -> Result<(), MxrError> {
63 Err(MxrError::Provider("simulated error".into()))
64 }
65 async fn set_read(&self, _id: &str, _read: bool) -> Result<(), MxrError> {
66 Err(MxrError::Provider("simulated error".into()))
67 }
68 async fn set_starred(&self, _id: &str, _starred: bool) -> Result<(), MxrError> {
69 Err(MxrError::Provider("simulated error".into()))
70 }
71 }
72
73 fn test_account(account_id: AccountId) -> mxr_core::Account {
74 mxr_core::Account {
75 id: account_id,
76 name: "Fake Account".to_string(),
77 email: "user@example.com".to_string(),
78 sync_backend: Some(BackendRef {
79 provider_kind: ProviderKind::Fake,
80 config_key: "fake".to_string(),
81 }),
82 send_backend: None,
83 enabled: true,
84 }
85 }
86
87 struct DeltaLabelProvider {
89 account_id: AccountId,
90 messages: Vec<SyncedMessage>,
91 labels: Vec<Label>,
92 label_changes: Vec<LabelChange>,
93 }
94
95 impl DeltaLabelProvider {
96 fn new(
97 account_id: AccountId,
98 messages: Vec<Envelope>,
99 labels: Vec<Label>,
100 label_changes: Vec<LabelChange>,
101 ) -> Self {
102 let messages = messages
103 .into_iter()
104 .map(|env| SyncedMessage {
105 body: make_empty_body(&env.id),
106 envelope: env,
107 })
108 .collect();
109 Self {
110 account_id,
111 messages,
112 labels,
113 label_changes,
114 }
115 }
116 }
117
118 struct ThreadingProvider {
119 account_id: AccountId,
120 messages: Vec<SyncedMessage>,
121 }
122
123 struct RecoveringNotFoundProvider {
124 account_id: AccountId,
125 message: SyncedMessage,
126 calls: std::sync::Mutex<Vec<SyncCursor>>,
127 }
128
129 #[async_trait::async_trait]
130 impl MailSyncProvider for ThreadingProvider {
131 fn name(&self) -> &str {
132 "threading"
133 }
134
135 fn account_id(&self) -> &AccountId {
136 &self.account_id
137 }
138
139 fn capabilities(&self) -> SyncCapabilities {
140 SyncCapabilities {
141 labels: false,
142 server_search: false,
143 delta_sync: false,
144 push: false,
145 batch_operations: false,
146 native_thread_ids: false,
147 }
148 }
149
150 async fn authenticate(&mut self) -> Result<(), MxrError> {
151 Ok(())
152 }
153
154 async fn refresh_auth(&mut self) -> Result<(), MxrError> {
155 Ok(())
156 }
157
158 async fn sync_labels(&self) -> Result<Vec<Label>, MxrError> {
159 Ok(vec![])
160 }
161
162 async fn sync_messages(&self, _cursor: &SyncCursor) -> Result<SyncBatch, MxrError> {
163 Ok(SyncBatch {
164 upserted: self.messages.clone(),
165 deleted_provider_ids: vec![],
166 label_changes: vec![],
167 next_cursor: SyncCursor::Initial,
168 })
169 }
170
171 async fn fetch_attachment(&self, _mid: &str, _aid: &str) -> Result<Vec<u8>, MxrError> {
172 Err(MxrError::NotFound("no attachment".into()))
173 }
174
175 async fn modify_labels(
176 &self,
177 _id: &str,
178 _add: &[String],
179 _rm: &[String],
180 ) -> Result<(), MxrError> {
181 Ok(())
182 }
183
184 async fn trash(&self, _id: &str) -> Result<(), MxrError> {
185 Ok(())
186 }
187
188 async fn set_read(&self, _id: &str, _read: bool) -> Result<(), MxrError> {
189 Ok(())
190 }
191
192 async fn set_starred(&self, _id: &str, _starred: bool) -> Result<(), MxrError> {
193 Ok(())
194 }
195 }
196
197 #[async_trait::async_trait]
198 impl MailSyncProvider for RecoveringNotFoundProvider {
199 fn name(&self) -> &str {
200 "recovering-not-found"
201 }
202
203 fn account_id(&self) -> &AccountId {
204 &self.account_id
205 }
206
207 fn capabilities(&self) -> SyncCapabilities {
208 SyncCapabilities {
209 labels: false,
210 server_search: false,
211 delta_sync: true,
212 push: false,
213 batch_operations: false,
214 native_thread_ids: true,
215 }
216 }
217
218 async fn authenticate(&mut self) -> Result<(), MxrError> {
219 Ok(())
220 }
221
222 async fn refresh_auth(&mut self) -> Result<(), MxrError> {
223 Ok(())
224 }
225
226 async fn sync_labels(&self) -> Result<Vec<Label>, MxrError> {
227 Ok(vec![])
228 }
229
230 async fn sync_messages(&self, cursor: &SyncCursor) -> Result<SyncBatch, MxrError> {
231 self.calls.lock().unwrap().push(cursor.clone());
232 match cursor {
233 SyncCursor::Gmail { .. } => {
234 Err(MxrError::NotFound("Requested entity was not found.".into()))
235 }
236 SyncCursor::Initial => Ok(SyncBatch {
237 upserted: vec![self.message.clone()],
238 deleted_provider_ids: vec![],
239 label_changes: vec![],
240 next_cursor: SyncCursor::Gmail { history_id: 22 },
241 }),
242 other => panic!("unexpected cursor in test: {other:?}"),
243 }
244 }
245
246 async fn fetch_attachment(&self, _mid: &str, _aid: &str) -> Result<Vec<u8>, MxrError> {
247 Err(MxrError::NotFound("no attachment".into()))
248 }
249
250 async fn modify_labels(
251 &self,
252 _id: &str,
253 _add: &[String],
254 _rm: &[String],
255 ) -> Result<(), MxrError> {
256 Ok(())
257 }
258
259 async fn trash(&self, _id: &str) -> Result<(), MxrError> {
260 Ok(())
261 }
262
263 async fn set_read(&self, _id: &str, _read: bool) -> Result<(), MxrError> {
264 Ok(())
265 }
266
267 async fn set_starred(&self, _id: &str, _starred: bool) -> Result<(), MxrError> {
268 Ok(())
269 }
270 }
271
272 fn make_empty_body(message_id: &MessageId) -> MessageBody {
273 MessageBody {
274 message_id: message_id.clone(),
275 text_plain: Some("test body".to_string()),
276 text_html: None,
277 attachments: vec![],
278 fetched_at: chrono::Utc::now(),
279 metadata: MessageMetadata::default(),
280 }
281 }
282
283 #[async_trait::async_trait]
284 impl MailSyncProvider for DeltaLabelProvider {
285 fn name(&self) -> &str {
286 "delta-label"
287 }
288 fn account_id(&self) -> &AccountId {
289 &self.account_id
290 }
291 fn capabilities(&self) -> SyncCapabilities {
292 SyncCapabilities {
293 labels: true,
294 server_search: false,
295 delta_sync: true,
296 push: false,
297 batch_operations: false,
298 native_thread_ids: true,
299 }
300 }
301 async fn authenticate(&mut self) -> Result<(), MxrError> {
302 Ok(())
303 }
304 async fn refresh_auth(&mut self) -> Result<(), MxrError> {
305 Ok(())
306 }
307 async fn sync_labels(&self) -> Result<Vec<Label>, MxrError> {
308 Ok(self.labels.clone())
309 }
310 async fn sync_messages(&self, cursor: &SyncCursor) -> Result<SyncBatch, MxrError> {
311 match cursor {
312 SyncCursor::Initial => Ok(SyncBatch {
313 upserted: self.messages.clone(),
314 deleted_provider_ids: vec![],
315 label_changes: vec![],
316 next_cursor: SyncCursor::Gmail { history_id: 100 },
317 }),
318 _ => Ok(SyncBatch {
319 upserted: vec![],
320 deleted_provider_ids: vec![],
321 label_changes: self.label_changes.clone(),
322 next_cursor: SyncCursor::Gmail { history_id: 200 },
323 }),
324 }
325 }
326 async fn fetch_attachment(&self, _mid: &str, _aid: &str) -> Result<Vec<u8>, MxrError> {
327 Err(MxrError::NotFound("no attachment".into()))
328 }
329 async fn modify_labels(
330 &self,
331 _id: &str,
332 _add: &[String],
333 _rm: &[String],
334 ) -> Result<(), MxrError> {
335 Ok(())
336 }
337 async fn trash(&self, _id: &str) -> Result<(), MxrError> {
338 Ok(())
339 }
340 async fn set_read(&self, _id: &str, _read: bool) -> Result<(), MxrError> {
341 Ok(())
342 }
343 async fn set_starred(&self, _id: &str, _starred: bool) -> Result<(), MxrError> {
344 Ok(())
345 }
346 }
347
348 fn make_test_label(account_id: &AccountId, name: &str, provider_id: &str) -> Label {
349 Label {
350 id: LabelId::new(),
351 account_id: account_id.clone(),
352 name: name.to_string(),
353 kind: LabelKind::System,
354 color: None,
355 provider_id: provider_id.to_string(),
356 unread_count: 0,
357 total_count: 0,
358 }
359 }
360
361 fn make_test_envelope(
362 account_id: &AccountId,
363 provider_id: &str,
364 label_provider_ids: Vec<String>,
365 ) -> Envelope {
366 Envelope {
367 id: MessageId::new(),
368 account_id: account_id.clone(),
369 provider_id: provider_id.to_string(),
370 thread_id: ThreadId::new(),
371 message_id_header: None,
372 in_reply_to: None,
373 references: vec![],
374 from: mxr_core::Address {
375 name: Some("Test".to_string()),
376 email: "test@example.com".to_string(),
377 },
378 to: vec![],
379 cc: vec![],
380 bcc: vec![],
381 subject: "Test message".to_string(),
382 date: chrono::Utc::now(),
383 flags: MessageFlags::empty(),
384 snippet: "Test snippet".to_string(),
385 has_attachments: false,
386 size_bytes: 1000,
387 unsubscribe: UnsubscribeMethod::None,
388 label_provider_ids,
389 }
390 }
391
392 #[tokio::test]
393 async fn delta_sync_applies_label_additions() {
394 let store = Arc::new(Store::in_memory().await.unwrap());
395 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
396 let engine = SyncEngine::new(store.clone(), search.clone());
397
398 let account_id = AccountId::new();
399 store
400 .insert_account(&test_account(account_id.clone()))
401 .await
402 .unwrap();
403
404 let inbox = make_test_label(&account_id, "Inbox", "INBOX");
405 let starred = make_test_label(&account_id, "Starred", "STARRED");
406 let labels = vec![inbox.clone(), starred.clone()];
407
408 let msg = make_test_envelope(&account_id, "prov-msg-1", vec!["INBOX".to_string()]);
409 let msg_provider_id = msg.provider_id.clone();
410
411 let provider = DeltaLabelProvider::new(
412 account_id.clone(),
413 vec![msg],
414 labels,
415 vec![LabelChange {
416 provider_message_id: msg_provider_id.clone(),
417 added_labels: vec!["STARRED".to_string()],
418 removed_labels: vec![],
419 }],
420 );
421
422 engine.sync_account(&provider).await.unwrap();
424
425 let msg_id = store
427 .get_message_id_by_provider_id(&account_id, &msg_provider_id)
428 .await
429 .unwrap()
430 .unwrap();
431 let labels_before = store.get_message_label_ids(&msg_id).await.unwrap();
432 assert!(labels_before.contains(&inbox.id));
433 assert!(!labels_before.contains(&starred.id));
434
435 engine.sync_account(&provider).await.unwrap();
437
438 let labels_after = store.get_message_label_ids(&msg_id).await.unwrap();
439 assert!(
440 labels_after.contains(&inbox.id),
441 "INBOX should still be present"
442 );
443 assert!(
444 labels_after.contains(&starred.id),
445 "STARRED should be added by delta"
446 );
447 }
448
449 #[tokio::test]
450 async fn sync_rethreads_messages_when_provider_lacks_native_thread_ids() {
451 let store = Arc::new(Store::in_memory().await.unwrap());
452 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
453 let engine = SyncEngine::new(store.clone(), search);
454
455 let account_id = AccountId::new();
456 store
457 .insert_account(&test_account(account_id.clone()))
458 .await
459 .unwrap();
460
461 let first_id = MessageId::new();
462 let second_id = MessageId::new();
463 let first = SyncedMessage {
464 envelope: Envelope {
465 id: first_id.clone(),
466 account_id: account_id.clone(),
467 provider_id: "prov-thread-1".into(),
468 thread_id: ThreadId::new(),
469 message_id_header: Some("<root@example.com>".into()),
470 in_reply_to: None,
471 references: vec![],
472 from: mxr_core::Address {
473 name: Some("Alice".into()),
474 email: "alice@example.com".into(),
475 },
476 to: vec![],
477 cc: vec![],
478 bcc: vec![],
479 subject: "Topic".into(),
480 date: chrono::Utc::now() - chrono::Duration::minutes(5),
481 flags: MessageFlags::empty(),
482 snippet: "first".into(),
483 has_attachments: false,
484 size_bytes: 100,
485 unsubscribe: UnsubscribeMethod::None,
486 label_provider_ids: vec![],
487 },
488 body: make_empty_body(&first_id),
489 };
490 let second = SyncedMessage {
491 envelope: Envelope {
492 id: second_id.clone(),
493 account_id: account_id.clone(),
494 provider_id: "prov-thread-2".into(),
495 thread_id: ThreadId::new(),
496 message_id_header: Some("<reply@example.com>".into()),
497 in_reply_to: Some("<root@example.com>".into()),
498 references: vec!["<root@example.com>".into()],
499 from: mxr_core::Address {
500 name: Some("Bob".into()),
501 email: "bob@example.com".into(),
502 },
503 to: vec![],
504 cc: vec![],
505 bcc: vec![],
506 subject: "Re: Topic".into(),
507 date: chrono::Utc::now(),
508 flags: MessageFlags::empty(),
509 snippet: "second".into(),
510 has_attachments: false,
511 size_bytes: 100,
512 unsubscribe: UnsubscribeMethod::None,
513 label_provider_ids: vec![],
514 },
515 body: make_empty_body(&second_id),
516 };
517
518 let provider = ThreadingProvider {
519 account_id: account_id.clone(),
520 messages: vec![first, second],
521 };
522
523 engine.sync_account(&provider).await.unwrap();
524
525 let first_env = store.get_envelope(&first_id).await.unwrap().unwrap();
526 let second_env = store.get_envelope(&second_id).await.unwrap().unwrap();
527 assert_eq!(first_env.thread_id, second_env.thread_id);
528
529 let thread = store
530 .get_thread(&first_env.thread_id)
531 .await
532 .unwrap()
533 .unwrap();
534 assert_eq!(thread.message_count, 2);
535 }
536
537 #[tokio::test]
538 async fn delta_sync_applies_label_removals() {
539 let store = Arc::new(Store::in_memory().await.unwrap());
540 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
541 let engine = SyncEngine::new(store.clone(), search.clone());
542
543 let account_id = AccountId::new();
544 store
545 .insert_account(&test_account(account_id.clone()))
546 .await
547 .unwrap();
548
549 let inbox = make_test_label(&account_id, "Inbox", "INBOX");
550 let starred = make_test_label(&account_id, "Starred", "STARRED");
551 let labels = vec![inbox.clone(), starred.clone()];
552
553 let msg = make_test_envelope(
554 &account_id,
555 "prov-msg-2",
556 vec!["INBOX".to_string(), "STARRED".to_string()],
557 );
558 let msg_provider_id = msg.provider_id.clone();
559
560 let provider = DeltaLabelProvider::new(
561 account_id.clone(),
562 vec![msg],
563 labels,
564 vec![LabelChange {
565 provider_message_id: msg_provider_id.clone(),
566 added_labels: vec![],
567 removed_labels: vec!["STARRED".to_string()],
568 }],
569 );
570
571 engine.sync_account(&provider).await.unwrap();
573
574 let msg_id = store
575 .get_message_id_by_provider_id(&account_id, &msg_provider_id)
576 .await
577 .unwrap()
578 .unwrap();
579 let labels_before = store.get_message_label_ids(&msg_id).await.unwrap();
580 assert!(
581 labels_before.contains(&starred.id),
582 "STARRED should be present after initial sync"
583 );
584
585 engine.sync_account(&provider).await.unwrap();
587
588 let labels_after = store.get_message_label_ids(&msg_id).await.unwrap();
589 assert!(labels_after.contains(&inbox.id), "INBOX should remain");
590 assert!(
591 !labels_after.contains(&starred.id),
592 "STARRED should be removed by delta"
593 );
594 }
595
596 #[tokio::test]
597 async fn delta_sync_handles_unknown_provider_message() {
598 let store = Arc::new(Store::in_memory().await.unwrap());
599 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
600 let engine = SyncEngine::new(store.clone(), search.clone());
601
602 let account_id = AccountId::new();
603 store
604 .insert_account(&test_account(account_id.clone()))
605 .await
606 .unwrap();
607
608 let inbox = make_test_label(&account_id, "Inbox", "INBOX");
609 let labels = vec![inbox.clone()];
610
611 let msg = make_test_envelope(&account_id, "prov-msg-3", vec!["INBOX".to_string()]);
612
613 let provider = DeltaLabelProvider::new(
614 account_id.clone(),
615 vec![msg],
616 labels,
617 vec![LabelChange {
618 provider_message_id: "nonexistent-msg".to_string(),
620 added_labels: vec!["INBOX".to_string()],
621 removed_labels: vec![],
622 }],
623 );
624
625 engine.sync_account(&provider).await.unwrap();
627
628 let result = engine.sync_account(&provider).await;
630 assert!(
631 result.is_ok(),
632 "Delta sync should gracefully skip unknown messages"
633 );
634 }
635
636 #[tokio::test]
637 async fn sync_populates_store_and_search() {
638 let store = Arc::new(Store::in_memory().await.unwrap());
639 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
640 let engine = SyncEngine::new(store.clone(), search.clone());
641
642 let account_id = AccountId::new();
643 store
644 .insert_account(&test_account(account_id.clone()))
645 .await
646 .unwrap();
647
648 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
649 let count = engine.sync_account(&provider).await.unwrap();
650 assert_eq!(count, 55);
651
652 let envelopes = store
654 .list_envelopes_by_account(&account_id, 100, 0)
655 .await
656 .unwrap();
657 assert_eq!(envelopes.len(), 55);
658
659 let results = search.lock().await.search("deployment", 10).unwrap();
661 assert!(!results.is_empty());
662 }
663
664 #[tokio::test]
665 async fn bodies_stored_eagerly_during_sync() {
666 let store = Arc::new(Store::in_memory().await.unwrap());
667 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
668 let engine = SyncEngine::new(store.clone(), search.clone());
669
670 let account_id = AccountId::new();
671 store
672 .insert_account(&test_account(account_id.clone()))
673 .await
674 .unwrap();
675
676 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
677 engine.sync_account(&provider).await.unwrap();
678
679 let envelopes = store
681 .list_envelopes_by_account(&account_id, 1, 0)
682 .await
683 .unwrap();
684 let msg_id = &envelopes[0].id;
685
686 let body = engine.get_body(msg_id).await.unwrap();
688 assert!(body.text_plain.is_some());
689
690 let body2 = engine.get_body(msg_id).await.unwrap();
692 assert_eq!(body.text_plain, body2.text_plain);
693 }
694
695 #[tokio::test]
696 async fn snooze_wake() {
697 let store = Arc::new(Store::in_memory().await.unwrap());
698 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
699 let engine = SyncEngine::new(store.clone(), search.clone());
700
701 let account_id = AccountId::new();
702 store
703 .insert_account(&test_account(account_id.clone()))
704 .await
705 .unwrap();
706
707 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
708 engine.sync_account(&provider).await.unwrap();
709
710 let envelopes = store
712 .list_envelopes_by_account(&account_id, 1, 0)
713 .await
714 .unwrap();
715
716 let snoozed = Snoozed {
717 message_id: envelopes[0].id.clone(),
718 account_id: account_id.clone(),
719 snoozed_at: chrono::Utc::now(),
720 wake_at: chrono::Utc::now() - chrono::Duration::hours(1),
721 original_labels: vec![],
722 };
723 store.insert_snooze(&snoozed).await.unwrap();
724
725 let woken = engine.check_snoozes().await.unwrap();
726 assert_eq!(woken.len(), 1);
727
728 let woken2 = engine.check_snoozes().await.unwrap();
730 assert_eq!(woken2.len(), 0);
731 }
732
733 #[tokio::test]
734 async fn cursor_persistence() {
735 let store = Arc::new(Store::in_memory().await.unwrap());
736 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
737 let engine = SyncEngine::new(store.clone(), search.clone());
738
739 let account_id = AccountId::new();
740 store
741 .insert_account(&test_account(account_id.clone()))
742 .await
743 .unwrap();
744
745 let cursor_before = store.get_sync_cursor(&account_id).await.unwrap();
747 assert!(cursor_before.is_none());
748
749 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
750 engine.sync_account(&provider).await.unwrap();
751
752 let cursor_after = store.get_sync_cursor(&account_id).await.unwrap();
754 assert!(cursor_after.is_some(), "Cursor should be set after sync");
755 let cursor_json = serde_json::to_string(&cursor_after.unwrap()).unwrap();
756 assert!(
757 cursor_json.contains("Gmail") && cursor_json.contains("1"),
758 "Cursor should be Gmail {{ history_id: 1 }}, got: {}",
759 cursor_json
760 );
761 }
762
763 #[tokio::test]
764 async fn sync_error_does_not_crash() {
765 let store = Arc::new(Store::in_memory().await.unwrap());
766 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
767 let engine = SyncEngine::new(store.clone(), search.clone());
768
769 let account_id = AccountId::new();
770 store
771 .insert_account(&test_account(account_id.clone()))
772 .await
773 .unwrap();
774
775 let error_provider = ErrorProvider {
776 account_id: account_id.clone(),
777 };
778
779 let result = engine.sync_account(&error_provider).await;
781 assert!(
782 result.is_err(),
783 "sync_account should return Err for failing provider"
784 );
785 let err_msg = result.unwrap_err().to_string();
786 assert!(
787 err_msg.contains("simulated"),
788 "Error should contain provider message, got: {}",
789 err_msg
790 );
791 }
792
793 #[tokio::test]
794 async fn label_counts_after_sync() {
795 let store = Arc::new(Store::in_memory().await.unwrap());
796 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
797 let engine = SyncEngine::new(store.clone(), search.clone());
798
799 let account_id = AccountId::new();
800 store
801 .insert_account(&test_account(account_id.clone()))
802 .await
803 .unwrap();
804
805 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
806 engine.sync_account(&provider).await.unwrap();
807
808 let labels = store.list_labels_by_account(&account_id).await.unwrap();
809 assert!(!labels.is_empty(), "Should have labels after sync");
810
811 let has_counts = labels
812 .iter()
813 .any(|l| l.total_count > 0 || l.unread_count > 0);
814 assert!(
815 has_counts,
816 "At least one label should have non-zero counts after sync"
817 );
818 }
819
820 #[tokio::test]
821 async fn list_envelopes_by_label_returns_results() {
822 let store = Arc::new(Store::in_memory().await.unwrap());
823 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
824 let engine = SyncEngine::new(store.clone(), search.clone());
825
826 let account_id = AccountId::new();
827 store
828 .insert_account(&test_account(account_id.clone()))
829 .await
830 .unwrap();
831
832 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
833 engine.sync_account(&provider).await.unwrap();
834
835 let labels = store.list_labels_by_account(&account_id).await.unwrap();
836
837 let inbox_label = labels.iter().find(|l| l.name == "Inbox").unwrap();
839 assert!(
840 inbox_label.total_count > 0,
841 "Inbox should have messages after sync"
842 );
843
844 let envelopes = store
846 .list_envelopes_by_label(&inbox_label.id, 100, 0)
847 .await
848 .unwrap();
849
850 let all_envelopes = store
852 .list_envelopes_by_account(&account_id, 200, 0)
853 .await
854 .unwrap();
855
856 assert!(
857 !envelopes.is_empty(),
858 "list_envelopes_by_label should return messages for Inbox label (got 0). \
859 label_id={}, total_count={}, all_count={}",
860 inbox_label.id,
861 inbox_label.total_count,
862 all_envelopes.len()
863 );
864
865 assert!(
867 envelopes.len() <= all_envelopes.len(),
868 "Inbox-by-label ({}) should be <= all ({})",
869 envelopes.len(),
870 all_envelopes.len()
871 );
872 }
873
874 #[tokio::test]
875 async fn list_envelopes_by_sent_label_may_be_empty() {
876 let store = Arc::new(Store::in_memory().await.unwrap());
877 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
878 let engine = SyncEngine::new(store.clone(), search.clone());
879
880 let account_id = AccountId::new();
881 store
882 .insert_account(&test_account(account_id.clone()))
883 .await
884 .unwrap();
885
886 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
887 engine.sync_account(&provider).await.unwrap();
888
889 let labels = store.list_labels_by_account(&account_id).await.unwrap();
890
891 let sent_label = labels.iter().find(|l| l.name == "Sent").unwrap();
893
894 let envelopes = store
895 .list_envelopes_by_label(&sent_label.id, 100, 0)
896 .await
897 .unwrap();
898
899 assert_eq!(
901 envelopes.len(),
902 0,
903 "Sent should have 0 messages in fake provider"
904 );
905
906 let inbox_label = labels.iter().find(|l| l.name == "Inbox").unwrap();
908 let inbox_envelopes = store
909 .list_envelopes_by_label(&inbox_label.id, 100, 0)
910 .await
911 .unwrap();
912 assert!(
913 !inbox_envelopes.is_empty(),
914 "Inbox should still have messages after querying Sent"
915 );
916
917 let all_envelopes = store
919 .list_envelopes_by_account(&account_id, 100, 0)
920 .await
921 .unwrap();
922 assert!(
923 !all_envelopes.is_empty(),
924 "All envelopes should still be retrievable"
925 );
926 }
927
928 #[tokio::test]
929 async fn progressive_loading_chunks() {
930 let store = Arc::new(Store::in_memory().await.unwrap());
931 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
932 let engine = SyncEngine::new(store.clone(), search.clone());
933
934 let account_id = AccountId::new();
935 store
936 .insert_account(&test_account(account_id.clone()))
937 .await
938 .unwrap();
939
940 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
941 let count = engine.sync_account(&provider).await.unwrap();
942 assert_eq!(count, 55, "Sync should report 55 messages processed");
943
944 let envelopes = store
946 .list_envelopes_by_account(&account_id, 200, 0)
947 .await
948 .unwrap();
949 assert_eq!(envelopes.len(), 55, "Store should contain 55 envelopes");
950
951 let results = search.lock().await.search("deployment", 10).unwrap();
953 assert!(
954 !results.is_empty(),
955 "Search index should have 'deployment' results"
956 );
957 }
958
959 #[tokio::test]
960 async fn delta_sync_no_duplicate_labels() {
961 let store = Arc::new(Store::in_memory().await.unwrap());
962 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
963 let engine = SyncEngine::new(store.clone(), search.clone());
964
965 let account_id = AccountId::new();
966 store
967 .insert_account(&test_account(account_id.clone()))
968 .await
969 .unwrap();
970
971 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
972
973 engine.sync_account(&provider).await.unwrap();
975
976 let labels_after_first = store.list_labels_by_account(&account_id).await.unwrap();
977 let label_count_first = labels_after_first.len();
978
979 let delta_count = engine.sync_account(&provider).await.unwrap();
981 assert_eq!(delta_count, 0, "Delta sync should return 0 new messages");
982
983 let labels_after_second = store.list_labels_by_account(&account_id).await.unwrap();
984
985 assert_eq!(
987 label_count_first,
988 labels_after_second.len(),
989 "Label count should not change after delta sync"
990 );
991
992 for label in &labels_after_first {
994 let still_exists = labels_after_second
995 .iter()
996 .any(|l| l.provider_id == label.provider_id && l.name == label.name);
997 assert!(
998 still_exists,
999 "Label '{}' (provider_id='{}') should survive delta sync",
1000 label.name, label.provider_id
1001 );
1002 }
1003
1004 let envelopes = store
1007 .list_envelopes_by_account(&account_id, 200, 0)
1008 .await
1009 .unwrap();
1010 assert_eq!(
1011 envelopes.len(),
1012 55,
1013 "All 55 messages should survive delta sync"
1014 );
1015 }
1016
1017 #[tokio::test]
1018 async fn delta_sync_preserves_junction_table() {
1019 let store = Arc::new(Store::in_memory().await.unwrap());
1020 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1021 let engine = SyncEngine::new(store.clone(), search.clone());
1022
1023 let account_id = AccountId::new();
1024 store
1025 .insert_account(&test_account(account_id.clone()))
1026 .await
1027 .unwrap();
1028
1029 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1030
1031 engine.sync_account(&provider).await.unwrap();
1033
1034 let junction_before = store.count_message_labels().await.unwrap();
1035 assert!(
1036 junction_before > 0,
1037 "Junction table should be populated after initial sync"
1038 );
1039
1040 let delta_count = engine.sync_account(&provider).await.unwrap();
1042 assert_eq!(delta_count, 0, "Delta sync should return 0 new messages");
1043
1044 let junction_after = store.count_message_labels().await.unwrap();
1045 assert_eq!(
1046 junction_before, junction_after,
1047 "Junction table should survive delta sync (before={}, after={})",
1048 junction_before, junction_after
1049 );
1050
1051 let labels = store.list_labels_by_account(&account_id).await.unwrap();
1053 let inbox = labels.iter().find(|l| l.name == "Inbox").unwrap();
1054 let envelopes = store
1055 .list_envelopes_by_label(&inbox.id, 100, 0)
1056 .await
1057 .unwrap();
1058 assert!(
1059 !envelopes.is_empty(),
1060 "Inbox should still return messages after delta sync"
1061 );
1062 }
1063
1064 #[tokio::test]
1065 async fn backfill_triggers_when_junction_empty() {
1066 let store = Arc::new(Store::in_memory().await.unwrap());
1067 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1068 let engine = SyncEngine::new(store.clone(), search.clone());
1069
1070 let account_id = AccountId::new();
1071 store
1072 .insert_account(&test_account(account_id.clone()))
1073 .await
1074 .unwrap();
1075
1076 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1077
1078 engine.sync_account(&provider).await.unwrap();
1080
1081 let junction_before = store.count_message_labels().await.unwrap();
1082 assert!(junction_before > 0);
1083
1084 sqlx::query("DELETE FROM message_labels")
1086 .execute(store.writer())
1087 .await
1088 .unwrap();
1089
1090 let junction_wiped = store.count_message_labels().await.unwrap();
1091 assert_eq!(junction_wiped, 0, "Junction should be empty after wipe");
1092
1093 engine.sync_account(&provider).await.unwrap();
1095
1096 let junction_after = store.count_message_labels().await.unwrap();
1097 assert!(
1098 junction_after > 0,
1099 "Junction table should be repopulated after backfill (got {})",
1100 junction_after
1101 );
1102 }
1103
1104 #[tokio::test]
1105 async fn sync_label_resolution_matches_gmail_ids() {
1106 let store = Arc::new(Store::in_memory().await.unwrap());
1107 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1108 let engine = SyncEngine::new(store.clone(), search.clone());
1109
1110 let account_id = AccountId::new();
1111 store
1112 .insert_account(&test_account(account_id.clone()))
1113 .await
1114 .unwrap();
1115
1116 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1117 engine.sync_account(&provider).await.unwrap();
1118
1119 let labels = store.list_labels_by_account(&account_id).await.unwrap();
1120
1121 let expected_mappings = [
1124 ("Inbox", "INBOX"),
1125 ("Sent", "SENT"),
1126 ("Trash", "TRASH"),
1127 ("Spam", "SPAM"),
1128 ("Starred", "STARRED"),
1129 ("Work", "work"),
1130 ("Personal", "personal"),
1131 ("Newsletters", "newsletters"),
1132 ];
1133 for (name, expected_pid) in &expected_mappings {
1134 let label = labels.iter().find(|l| l.name == *name);
1135 assert!(label.is_some(), "Label '{}' should exist after sync", name);
1136 assert_eq!(
1137 label.unwrap().provider_id,
1138 *expected_pid,
1139 "Label '{}' should have provider_id '{}'",
1140 name,
1141 expected_pid
1142 );
1143 }
1144
1145 let envelopes = store
1147 .list_envelopes_by_account(&account_id, 200, 0)
1148 .await
1149 .unwrap();
1150 let label_ids: std::collections::HashSet<String> =
1151 labels.iter().map(|l| l.id.as_str().to_string()).collect();
1152
1153 for env in &envelopes {
1154 let msg_label_ids = store.get_message_label_ids(&env.id).await.unwrap();
1155 for lid in &msg_label_ids {
1156 assert!(
1157 label_ids.contains(&lid.as_str().to_string()),
1158 "Junction entry for message {} points to nonexistent label {}",
1159 env.id,
1160 lid
1161 );
1162 }
1163 }
1164 }
1165
1166 #[tokio::test]
1167 async fn list_envelopes_by_each_label_returns_correct_count() {
1168 let store = Arc::new(Store::in_memory().await.unwrap());
1169 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1170 let engine = SyncEngine::new(store.clone(), search.clone());
1171
1172 let account_id = AccountId::new();
1173 store
1174 .insert_account(&test_account(account_id.clone()))
1175 .await
1176 .unwrap();
1177
1178 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1179 engine.sync_account(&provider).await.unwrap();
1180
1181 let labels = store.list_labels_by_account(&account_id).await.unwrap();
1182 for label in &labels {
1183 if label.total_count > 0 {
1184 let envelopes = store
1185 .list_envelopes_by_label(&label.id, 200, 0)
1186 .await
1187 .unwrap();
1188 assert_eq!(
1189 envelopes.len(),
1190 label.total_count as usize,
1191 "Label '{}' (provider_id='{}') has total_count={} but list_envelopes_by_label returned {}",
1192 label.name,
1193 label.provider_id,
1194 label.total_count,
1195 envelopes.len()
1196 );
1197 }
1198 }
1199 }
1200
1201 #[tokio::test]
1202 async fn search_index_consistent_with_store() {
1203 let store = Arc::new(Store::in_memory().await.unwrap());
1204 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1205 let engine = SyncEngine::new(store.clone(), search.clone());
1206
1207 let account_id = AccountId::new();
1208 store
1209 .insert_account(&test_account(account_id.clone()))
1210 .await
1211 .unwrap();
1212
1213 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1214 engine.sync_account(&provider).await.unwrap();
1215
1216 let envelopes = store
1217 .list_envelopes_by_account(&account_id, 200, 0)
1218 .await
1219 .unwrap();
1220
1221 let search_guard = search.lock().await;
1222 for env in &envelopes {
1223 let keyword = env
1225 .subject
1226 .split_whitespace()
1227 .find(|w| w.len() > 3 && w.chars().all(|c| c.is_alphanumeric()))
1228 .unwrap_or(&env.subject);
1229 let results = search_guard.search(keyword, 100).unwrap();
1230 assert!(
1231 results.iter().any(|r| r.message_id == env.id.as_str()),
1232 "Envelope '{}' (subject='{}') should be findable by keyword '{}' in search index",
1233 env.id,
1234 env.subject,
1235 keyword
1236 );
1237 }
1238 }
1239
1240 #[tokio::test]
1241 async fn mutation_flags_persist_through_store() {
1242 let store = Arc::new(Store::in_memory().await.unwrap());
1243 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1244 let engine = SyncEngine::new(store.clone(), search.clone());
1245
1246 let account_id = AccountId::new();
1247 store
1248 .insert_account(&test_account(account_id.clone()))
1249 .await
1250 .unwrap();
1251
1252 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1253 engine.sync_account(&provider).await.unwrap();
1254
1255 let envelopes = store
1256 .list_envelopes_by_account(&account_id, 1, 0)
1257 .await
1258 .unwrap();
1259 let msg_id = &envelopes[0].id;
1260 let initial_flags = envelopes[0].flags;
1261
1262 store.set_starred(msg_id, true).await.unwrap();
1264 store.set_read(msg_id, true).await.unwrap();
1265
1266 let updated = store.get_envelope(msg_id).await.unwrap().unwrap();
1267 assert!(
1268 updated.flags.contains(MessageFlags::STARRED),
1269 "STARRED flag should be set"
1270 );
1271 assert!(
1272 updated.flags.contains(MessageFlags::READ),
1273 "READ flag should be set"
1274 );
1275
1276 store.set_starred(msg_id, false).await.unwrap();
1278 let updated2 = store.get_envelope(msg_id).await.unwrap().unwrap();
1279 assert!(
1280 !updated2.flags.contains(MessageFlags::STARRED),
1281 "STARRED flag should be cleared after set_starred(false)"
1282 );
1283 assert!(
1284 updated2.flags.contains(MessageFlags::READ),
1285 "READ flag should still be set after clearing STARRED"
1286 );
1287
1288 let _ = initial_flags; }
1292
1293 #[tokio::test]
1294 async fn junction_table_survives_message_update() {
1295 let store = Arc::new(Store::in_memory().await.unwrap());
1296 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1297 let engine = SyncEngine::new(store.clone(), search.clone());
1298
1299 let account_id = AccountId::new();
1300 store
1301 .insert_account(&test_account(account_id.clone()))
1302 .await
1303 .unwrap();
1304
1305 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1306 engine.sync_account(&provider).await.unwrap();
1307
1308 let envelopes = store
1310 .list_envelopes_by_account(&account_id, 1, 0)
1311 .await
1312 .unwrap();
1313 let msg_id = &envelopes[0].id;
1314
1315 let junction_before = store.get_message_label_ids(msg_id).await.unwrap();
1316 assert!(
1317 !junction_before.is_empty(),
1318 "Message should have label associations after sync"
1319 );
1320
1321 store.upsert_envelope(&envelopes[0]).await.unwrap();
1323 store
1325 .set_message_labels(msg_id, &junction_before)
1326 .await
1327 .unwrap();
1328
1329 let junction_after = store.get_message_label_ids(msg_id).await.unwrap();
1330 assert_eq!(
1331 junction_before.len(),
1332 junction_after.len(),
1333 "Junction rows should not double after re-sync (before={}, after={})",
1334 junction_before.len(),
1335 junction_after.len()
1336 );
1337 }
1338
1339 #[tokio::test]
1340 async fn find_labels_by_provider_ids_with_unknown_ids() {
1341 let store = Arc::new(Store::in_memory().await.unwrap());
1342 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1343 let engine = SyncEngine::new(store.clone(), search.clone());
1344
1345 let account_id = AccountId::new();
1346 store
1347 .insert_account(&test_account(account_id.clone()))
1348 .await
1349 .unwrap();
1350
1351 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1352 engine.sync_account(&provider).await.unwrap();
1353
1354 let result = store
1355 .find_labels_by_provider_ids(
1356 &account_id,
1357 &["INBOX".to_string(), "NONEXISTENT_LABEL".to_string()],
1358 )
1359 .await
1360 .unwrap();
1361
1362 assert_eq!(
1363 result.len(),
1364 1,
1365 "Should only return 1 result for INBOX, not 2 (NONEXISTENT_LABEL should be ignored)"
1366 );
1367 }
1368
1369 #[tokio::test]
1370 async fn body_available_after_sync() {
1371 let store = Arc::new(Store::in_memory().await.unwrap());
1372 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1373 let engine = SyncEngine::new(store.clone(), search.clone());
1374
1375 let account_id = AccountId::new();
1376 store
1377 .insert_account(&test_account(account_id.clone()))
1378 .await
1379 .unwrap();
1380
1381 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1382 engine.sync_account(&provider).await.unwrap();
1383
1384 let envelopes = store
1385 .list_envelopes_by_account(&account_id, 1, 0)
1386 .await
1387 .unwrap();
1388 let msg_id = &envelopes[0].id;
1389
1390 let body1 = engine.get_body(msg_id).await.unwrap();
1392 assert!(body1.text_plain.is_some(), "Body should have text_plain");
1393
1394 let body2 = engine.get_body(msg_id).await.unwrap();
1396
1397 assert_eq!(
1398 body1.text_plain, body2.text_plain,
1399 "Body text_plain should be consistent"
1400 );
1401 assert_eq!(
1402 body1.text_html, body2.text_html,
1403 "Body text_html should be consistent"
1404 );
1405 assert_eq!(
1406 body1.attachments.len(),
1407 body2.attachments.len(),
1408 "Body attachments count should be consistent"
1409 );
1410 }
1411
1412 #[tokio::test]
1413 async fn gmail_not_found_cursor_resets_to_initial_and_recovers() {
1414 let store = Arc::new(Store::in_memory().await.unwrap());
1415 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1416 let engine = SyncEngine::new(store.clone(), search.clone());
1417
1418 let account_id = AccountId::new();
1419 store
1420 .insert_account(&test_account(account_id.clone()))
1421 .await
1422 .unwrap();
1423 store
1424 .set_sync_cursor(
1425 &account_id,
1426 &SyncCursor::Gmail {
1427 history_id: 27_697_494,
1428 },
1429 )
1430 .await
1431 .unwrap();
1432
1433 let message_id = MessageId::new();
1434 let provider = RecoveringNotFoundProvider {
1435 account_id: account_id.clone(),
1436 message: SyncedMessage {
1437 envelope: Envelope {
1438 id: message_id.clone(),
1439 account_id: account_id.clone(),
1440 provider_id: "recovered-1".into(),
1441 thread_id: ThreadId::new(),
1442 message_id_header: None,
1443 in_reply_to: None,
1444 references: vec![],
1445 from: Address {
1446 name: Some("Recovered".into()),
1447 email: "recovered@example.com".into(),
1448 },
1449 to: vec![],
1450 cc: vec![],
1451 bcc: vec![],
1452 subject: "Recovered after cursor reset".into(),
1453 date: chrono::Utc::now(),
1454 flags: MessageFlags::empty(),
1455 snippet: "Recovered".into(),
1456 has_attachments: false,
1457 size_bytes: 42,
1458 unsubscribe: UnsubscribeMethod::None,
1459 label_provider_ids: vec![],
1460 },
1461 body: make_empty_body(&message_id),
1462 },
1463 calls: std::sync::Mutex::new(Vec::new()),
1464 };
1465
1466 let outcome = engine.sync_account_with_outcome(&provider).await.unwrap();
1467
1468 assert_eq!(outcome.synced_count, 1);
1469 let calls = provider.calls.lock().unwrap();
1470 assert_eq!(calls.len(), 2);
1471 assert!(matches!(
1472 calls[0],
1473 SyncCursor::Gmail {
1474 history_id: 27_697_494
1475 }
1476 ));
1477 assert!(matches!(calls[1], SyncCursor::Initial));
1478 let stored_cursor = store.get_sync_cursor(&account_id).await.unwrap();
1479 assert!(matches!(
1480 stored_cursor,
1481 Some(SyncCursor::Gmail { history_id: 22 })
1482 ));
1483 }
1484
1485 #[tokio::test]
1486 async fn recalculate_label_counts_matches_junction() {
1487 let store = Arc::new(Store::in_memory().await.unwrap());
1488 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1489 let engine = SyncEngine::new(store.clone(), search.clone());
1490
1491 let account_id = AccountId::new();
1492 store
1493 .insert_account(&test_account(account_id.clone()))
1494 .await
1495 .unwrap();
1496
1497 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1498 engine.sync_account(&provider).await.unwrap();
1499
1500 let labels = store.list_labels_by_account(&account_id).await.unwrap();
1501
1502 for label in &labels {
1503 let lid = label.id.as_str();
1504 let junction_count: i64 = sqlx::query_scalar::<_, i64>(
1506 "SELECT COUNT(*) FROM message_labels WHERE label_id = ?",
1507 )
1508 .bind(&lid)
1509 .fetch_one(store.reader())
1510 .await
1511 .unwrap();
1512
1513 assert_eq!(
1514 label.total_count as i64, junction_count,
1515 "Label '{}' total_count ({}) should match junction row count ({})",
1516 label.name, label.total_count, junction_count
1517 );
1518
1519 let unread_count: i64 = sqlx::query_scalar::<_, i64>(
1521 "SELECT COUNT(*) FROM message_labels ml \
1522 JOIN messages m ON m.id = ml.message_id \
1523 WHERE ml.label_id = ? AND (m.flags & 1) = 0",
1524 )
1525 .bind(&lid)
1526 .fetch_one(store.reader())
1527 .await
1528 .unwrap();
1529
1530 assert_eq!(
1531 label.unread_count as i64, unread_count,
1532 "Label '{}' unread_count ({}) should match computed unread count ({})",
1533 label.name, label.unread_count, unread_count
1534 );
1535 }
1536 }
1537}