1mod engine;
2pub mod threading;
3pub use engine::SyncEngine;
4
5#[cfg(test)]
6mod tests {
7 use super::*;
8 use mxr_core::id::*;
9 use mxr_core::types::*;
10 use mxr_core::{MailSyncProvider, MxrError, SyncCapabilities};
11 use mxr_search::SearchIndex;
12 use mxr_store::Store;
13 use std::sync::Arc;
14 use tokio::sync::Mutex;
15
16 struct ErrorProvider {
18 account_id: AccountId,
19 }
20
21 #[async_trait::async_trait]
22 impl MailSyncProvider for ErrorProvider {
23 fn name(&self) -> &str {
24 "error"
25 }
26 fn account_id(&self) -> &AccountId {
27 &self.account_id
28 }
29 fn capabilities(&self) -> SyncCapabilities {
30 SyncCapabilities {
31 labels: false,
32 server_search: false,
33 delta_sync: false,
34 push: false,
35 batch_operations: false,
36 native_thread_ids: true,
37 }
38 }
39 async fn authenticate(&mut self) -> Result<(), MxrError> {
40 Ok(())
41 }
42 async fn refresh_auth(&mut self) -> Result<(), MxrError> {
43 Ok(())
44 }
45 async fn sync_labels(&self) -> Result<Vec<Label>, MxrError> {
46 Ok(vec![])
47 }
48 async fn sync_messages(&self, _cursor: &SyncCursor) -> Result<SyncBatch, MxrError> {
49 Err(MxrError::Provider("simulated sync error".into()))
50 }
51 async fn fetch_attachment(&self, _mid: &str, _aid: &str) -> Result<Vec<u8>, MxrError> {
52 Err(MxrError::Provider("simulated attachment error".into()))
53 }
54 async fn modify_labels(
55 &self,
56 _id: &str,
57 _add: &[String],
58 _rm: &[String],
59 ) -> Result<(), MxrError> {
60 Err(MxrError::Provider("simulated error".into()))
61 }
62 async fn trash(&self, _id: &str) -> Result<(), MxrError> {
63 Err(MxrError::Provider("simulated error".into()))
64 }
65 async fn set_read(&self, _id: &str, _read: bool) -> Result<(), MxrError> {
66 Err(MxrError::Provider("simulated error".into()))
67 }
68 async fn set_starred(&self, _id: &str, _starred: bool) -> Result<(), MxrError> {
69 Err(MxrError::Provider("simulated error".into()))
70 }
71 }
72
73 fn test_account(account_id: AccountId) -> mxr_core::Account {
74 mxr_core::Account {
75 id: account_id,
76 name: "Fake Account".to_string(),
77 email: "user@example.com".to_string(),
78 sync_backend: Some(BackendRef {
79 provider_kind: ProviderKind::Fake,
80 config_key: "fake".to_string(),
81 }),
82 send_backend: None,
83 enabled: true,
84 }
85 }
86
87 struct DeltaLabelProvider {
89 account_id: AccountId,
90 messages: Vec<SyncedMessage>,
91 labels: Vec<Label>,
92 label_changes: Vec<LabelChange>,
93 }
94
95 impl DeltaLabelProvider {
96 fn new(
97 account_id: AccountId,
98 messages: Vec<Envelope>,
99 labels: Vec<Label>,
100 label_changes: Vec<LabelChange>,
101 ) -> Self {
102 let messages = messages
103 .into_iter()
104 .map(|env| SyncedMessage {
105 body: make_empty_body(&env.id),
106 envelope: env,
107 })
108 .collect();
109 Self {
110 account_id,
111 messages,
112 labels,
113 label_changes,
114 }
115 }
116 }
117
118 struct ThreadingProvider {
119 account_id: AccountId,
120 messages: Vec<SyncedMessage>,
121 }
122
123 #[async_trait::async_trait]
124 impl MailSyncProvider for ThreadingProvider {
125 fn name(&self) -> &str {
126 "threading"
127 }
128
129 fn account_id(&self) -> &AccountId {
130 &self.account_id
131 }
132
133 fn capabilities(&self) -> SyncCapabilities {
134 SyncCapabilities {
135 labels: false,
136 server_search: false,
137 delta_sync: false,
138 push: false,
139 batch_operations: false,
140 native_thread_ids: false,
141 }
142 }
143
144 async fn authenticate(&mut self) -> Result<(), MxrError> {
145 Ok(())
146 }
147
148 async fn refresh_auth(&mut self) -> Result<(), MxrError> {
149 Ok(())
150 }
151
152 async fn sync_labels(&self) -> Result<Vec<Label>, MxrError> {
153 Ok(vec![])
154 }
155
156 async fn sync_messages(&self, _cursor: &SyncCursor) -> Result<SyncBatch, MxrError> {
157 Ok(SyncBatch {
158 upserted: self.messages.clone(),
159 deleted_provider_ids: vec![],
160 label_changes: vec![],
161 next_cursor: SyncCursor::Initial,
162 })
163 }
164
165 async fn fetch_attachment(&self, _mid: &str, _aid: &str) -> Result<Vec<u8>, MxrError> {
166 Err(MxrError::NotFound("no attachment".into()))
167 }
168
169 async fn modify_labels(
170 &self,
171 _id: &str,
172 _add: &[String],
173 _rm: &[String],
174 ) -> Result<(), MxrError> {
175 Ok(())
176 }
177
178 async fn trash(&self, _id: &str) -> Result<(), MxrError> {
179 Ok(())
180 }
181
182 async fn set_read(&self, _id: &str, _read: bool) -> Result<(), MxrError> {
183 Ok(())
184 }
185
186 async fn set_starred(&self, _id: &str, _starred: bool) -> Result<(), MxrError> {
187 Ok(())
188 }
189 }
190
191 fn make_empty_body(message_id: &MessageId) -> MessageBody {
192 MessageBody {
193 message_id: message_id.clone(),
194 text_plain: Some("test body".to_string()),
195 text_html: None,
196 attachments: vec![],
197 fetched_at: chrono::Utc::now(),
198 metadata: MessageMetadata::default(),
199 }
200 }
201
202 #[async_trait::async_trait]
203 impl MailSyncProvider for DeltaLabelProvider {
204 fn name(&self) -> &str {
205 "delta-label"
206 }
207 fn account_id(&self) -> &AccountId {
208 &self.account_id
209 }
210 fn capabilities(&self) -> SyncCapabilities {
211 SyncCapabilities {
212 labels: true,
213 server_search: false,
214 delta_sync: true,
215 push: false,
216 batch_operations: false,
217 native_thread_ids: true,
218 }
219 }
220 async fn authenticate(&mut self) -> Result<(), MxrError> {
221 Ok(())
222 }
223 async fn refresh_auth(&mut self) -> Result<(), MxrError> {
224 Ok(())
225 }
226 async fn sync_labels(&self) -> Result<Vec<Label>, MxrError> {
227 Ok(self.labels.clone())
228 }
229 async fn sync_messages(&self, cursor: &SyncCursor) -> Result<SyncBatch, MxrError> {
230 match cursor {
231 SyncCursor::Initial => Ok(SyncBatch {
232 upserted: self.messages.clone(),
233 deleted_provider_ids: vec![],
234 label_changes: vec![],
235 next_cursor: SyncCursor::Gmail { history_id: 100 },
236 }),
237 _ => Ok(SyncBatch {
238 upserted: vec![],
239 deleted_provider_ids: vec![],
240 label_changes: self.label_changes.clone(),
241 next_cursor: SyncCursor::Gmail { history_id: 200 },
242 }),
243 }
244 }
245 async fn fetch_attachment(&self, _mid: &str, _aid: &str) -> Result<Vec<u8>, MxrError> {
246 Err(MxrError::NotFound("no attachment".into()))
247 }
248 async fn modify_labels(
249 &self,
250 _id: &str,
251 _add: &[String],
252 _rm: &[String],
253 ) -> Result<(), MxrError> {
254 Ok(())
255 }
256 async fn trash(&self, _id: &str) -> Result<(), MxrError> {
257 Ok(())
258 }
259 async fn set_read(&self, _id: &str, _read: bool) -> Result<(), MxrError> {
260 Ok(())
261 }
262 async fn set_starred(&self, _id: &str, _starred: bool) -> Result<(), MxrError> {
263 Ok(())
264 }
265 }
266
267 fn make_test_label(account_id: &AccountId, name: &str, provider_id: &str) -> Label {
268 Label {
269 id: LabelId::new(),
270 account_id: account_id.clone(),
271 name: name.to_string(),
272 kind: LabelKind::System,
273 color: None,
274 provider_id: provider_id.to_string(),
275 unread_count: 0,
276 total_count: 0,
277 }
278 }
279
280 fn make_test_envelope(
281 account_id: &AccountId,
282 provider_id: &str,
283 label_provider_ids: Vec<String>,
284 ) -> Envelope {
285 Envelope {
286 id: MessageId::new(),
287 account_id: account_id.clone(),
288 provider_id: provider_id.to_string(),
289 thread_id: ThreadId::new(),
290 message_id_header: None,
291 in_reply_to: None,
292 references: vec![],
293 from: mxr_core::Address {
294 name: Some("Test".to_string()),
295 email: "test@example.com".to_string(),
296 },
297 to: vec![],
298 cc: vec![],
299 bcc: vec![],
300 subject: "Test message".to_string(),
301 date: chrono::Utc::now(),
302 flags: MessageFlags::empty(),
303 snippet: "Test snippet".to_string(),
304 has_attachments: false,
305 size_bytes: 1000,
306 unsubscribe: UnsubscribeMethod::None,
307 label_provider_ids,
308 }
309 }
310
311 #[tokio::test]
312 async fn delta_sync_applies_label_additions() {
313 let store = Arc::new(Store::in_memory().await.unwrap());
314 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
315 let engine = SyncEngine::new(store.clone(), search.clone());
316
317 let account_id = AccountId::new();
318 store
319 .insert_account(&test_account(account_id.clone()))
320 .await
321 .unwrap();
322
323 let inbox = make_test_label(&account_id, "Inbox", "INBOX");
324 let starred = make_test_label(&account_id, "Starred", "STARRED");
325 let labels = vec![inbox.clone(), starred.clone()];
326
327 let msg = make_test_envelope(&account_id, "prov-msg-1", vec!["INBOX".to_string()]);
328 let msg_provider_id = msg.provider_id.clone();
329
330 let provider = DeltaLabelProvider::new(
331 account_id.clone(),
332 vec![msg],
333 labels,
334 vec![LabelChange {
335 provider_message_id: msg_provider_id.clone(),
336 added_labels: vec!["STARRED".to_string()],
337 removed_labels: vec![],
338 }],
339 );
340
341 engine.sync_account(&provider).await.unwrap();
343
344 let msg_id = store
346 .get_message_id_by_provider_id(&account_id, &msg_provider_id)
347 .await
348 .unwrap()
349 .unwrap();
350 let labels_before = store.get_message_label_ids(&msg_id).await.unwrap();
351 assert!(labels_before.contains(&inbox.id));
352 assert!(!labels_before.contains(&starred.id));
353
354 engine.sync_account(&provider).await.unwrap();
356
357 let labels_after = store.get_message_label_ids(&msg_id).await.unwrap();
358 assert!(
359 labels_after.contains(&inbox.id),
360 "INBOX should still be present"
361 );
362 assert!(
363 labels_after.contains(&starred.id),
364 "STARRED should be added by delta"
365 );
366 }
367
368 #[tokio::test]
369 async fn sync_rethreads_messages_when_provider_lacks_native_thread_ids() {
370 let store = Arc::new(Store::in_memory().await.unwrap());
371 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
372 let engine = SyncEngine::new(store.clone(), search);
373
374 let account_id = AccountId::new();
375 store
376 .insert_account(&test_account(account_id.clone()))
377 .await
378 .unwrap();
379
380 let first_id = MessageId::new();
381 let second_id = MessageId::new();
382 let first = SyncedMessage {
383 envelope: Envelope {
384 id: first_id.clone(),
385 account_id: account_id.clone(),
386 provider_id: "prov-thread-1".into(),
387 thread_id: ThreadId::new(),
388 message_id_header: Some("<root@example.com>".into()),
389 in_reply_to: None,
390 references: vec![],
391 from: mxr_core::Address {
392 name: Some("Alice".into()),
393 email: "alice@example.com".into(),
394 },
395 to: vec![],
396 cc: vec![],
397 bcc: vec![],
398 subject: "Topic".into(),
399 date: chrono::Utc::now() - chrono::Duration::minutes(5),
400 flags: MessageFlags::empty(),
401 snippet: "first".into(),
402 has_attachments: false,
403 size_bytes: 100,
404 unsubscribe: UnsubscribeMethod::None,
405 label_provider_ids: vec![],
406 },
407 body: make_empty_body(&first_id),
408 };
409 let second = SyncedMessage {
410 envelope: Envelope {
411 id: second_id.clone(),
412 account_id: account_id.clone(),
413 provider_id: "prov-thread-2".into(),
414 thread_id: ThreadId::new(),
415 message_id_header: Some("<reply@example.com>".into()),
416 in_reply_to: Some("<root@example.com>".into()),
417 references: vec!["<root@example.com>".into()],
418 from: mxr_core::Address {
419 name: Some("Bob".into()),
420 email: "bob@example.com".into(),
421 },
422 to: vec![],
423 cc: vec![],
424 bcc: vec![],
425 subject: "Re: Topic".into(),
426 date: chrono::Utc::now(),
427 flags: MessageFlags::empty(),
428 snippet: "second".into(),
429 has_attachments: false,
430 size_bytes: 100,
431 unsubscribe: UnsubscribeMethod::None,
432 label_provider_ids: vec![],
433 },
434 body: make_empty_body(&second_id),
435 };
436
437 let provider = ThreadingProvider {
438 account_id: account_id.clone(),
439 messages: vec![first, second],
440 };
441
442 engine.sync_account(&provider).await.unwrap();
443
444 let first_env = store.get_envelope(&first_id).await.unwrap().unwrap();
445 let second_env = store.get_envelope(&second_id).await.unwrap().unwrap();
446 assert_eq!(first_env.thread_id, second_env.thread_id);
447
448 let thread = store
449 .get_thread(&first_env.thread_id)
450 .await
451 .unwrap()
452 .unwrap();
453 assert_eq!(thread.message_count, 2);
454 }
455
456 #[tokio::test]
457 async fn delta_sync_applies_label_removals() {
458 let store = Arc::new(Store::in_memory().await.unwrap());
459 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
460 let engine = SyncEngine::new(store.clone(), search.clone());
461
462 let account_id = AccountId::new();
463 store
464 .insert_account(&test_account(account_id.clone()))
465 .await
466 .unwrap();
467
468 let inbox = make_test_label(&account_id, "Inbox", "INBOX");
469 let starred = make_test_label(&account_id, "Starred", "STARRED");
470 let labels = vec![inbox.clone(), starred.clone()];
471
472 let msg = make_test_envelope(
473 &account_id,
474 "prov-msg-2",
475 vec!["INBOX".to_string(), "STARRED".to_string()],
476 );
477 let msg_provider_id = msg.provider_id.clone();
478
479 let provider = DeltaLabelProvider::new(
480 account_id.clone(),
481 vec![msg],
482 labels,
483 vec![LabelChange {
484 provider_message_id: msg_provider_id.clone(),
485 added_labels: vec![],
486 removed_labels: vec!["STARRED".to_string()],
487 }],
488 );
489
490 engine.sync_account(&provider).await.unwrap();
492
493 let msg_id = store
494 .get_message_id_by_provider_id(&account_id, &msg_provider_id)
495 .await
496 .unwrap()
497 .unwrap();
498 let labels_before = store.get_message_label_ids(&msg_id).await.unwrap();
499 assert!(
500 labels_before.contains(&starred.id),
501 "STARRED should be present after initial sync"
502 );
503
504 engine.sync_account(&provider).await.unwrap();
506
507 let labels_after = store.get_message_label_ids(&msg_id).await.unwrap();
508 assert!(labels_after.contains(&inbox.id), "INBOX should remain");
509 assert!(
510 !labels_after.contains(&starred.id),
511 "STARRED should be removed by delta"
512 );
513 }
514
515 #[tokio::test]
516 async fn delta_sync_handles_unknown_provider_message() {
517 let store = Arc::new(Store::in_memory().await.unwrap());
518 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
519 let engine = SyncEngine::new(store.clone(), search.clone());
520
521 let account_id = AccountId::new();
522 store
523 .insert_account(&test_account(account_id.clone()))
524 .await
525 .unwrap();
526
527 let inbox = make_test_label(&account_id, "Inbox", "INBOX");
528 let labels = vec![inbox.clone()];
529
530 let msg = make_test_envelope(&account_id, "prov-msg-3", vec!["INBOX".to_string()]);
531
532 let provider = DeltaLabelProvider::new(
533 account_id.clone(),
534 vec![msg],
535 labels,
536 vec![LabelChange {
537 provider_message_id: "nonexistent-msg".to_string(),
539 added_labels: vec!["INBOX".to_string()],
540 removed_labels: vec![],
541 }],
542 );
543
544 engine.sync_account(&provider).await.unwrap();
546
547 let result = engine.sync_account(&provider).await;
549 assert!(
550 result.is_ok(),
551 "Delta sync should gracefully skip unknown messages"
552 );
553 }
554
555 #[tokio::test]
556 async fn sync_populates_store_and_search() {
557 let store = Arc::new(Store::in_memory().await.unwrap());
558 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
559 let engine = SyncEngine::new(store.clone(), search.clone());
560
561 let account_id = AccountId::new();
562 store
563 .insert_account(&test_account(account_id.clone()))
564 .await
565 .unwrap();
566
567 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
568 let count = engine.sync_account(&provider).await.unwrap();
569 assert_eq!(count, 55);
570
571 let envelopes = store
573 .list_envelopes_by_account(&account_id, 100, 0)
574 .await
575 .unwrap();
576 assert_eq!(envelopes.len(), 55);
577
578 let results = search.lock().await.search("deployment", 10).unwrap();
580 assert!(!results.is_empty());
581 }
582
583 #[tokio::test]
584 async fn bodies_stored_eagerly_during_sync() {
585 let store = Arc::new(Store::in_memory().await.unwrap());
586 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
587 let engine = SyncEngine::new(store.clone(), search.clone());
588
589 let account_id = AccountId::new();
590 store
591 .insert_account(&test_account(account_id.clone()))
592 .await
593 .unwrap();
594
595 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
596 engine.sync_account(&provider).await.unwrap();
597
598 let envelopes = store
600 .list_envelopes_by_account(&account_id, 1, 0)
601 .await
602 .unwrap();
603 let msg_id = &envelopes[0].id;
604
605 let body = engine.get_body(msg_id).await.unwrap();
607 assert!(body.text_plain.is_some());
608
609 let body2 = engine.get_body(msg_id).await.unwrap();
611 assert_eq!(body.text_plain, body2.text_plain);
612 }
613
614 #[tokio::test]
615 async fn snooze_wake() {
616 let store = Arc::new(Store::in_memory().await.unwrap());
617 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
618 let engine = SyncEngine::new(store.clone(), search.clone());
619
620 let account_id = AccountId::new();
621 store
622 .insert_account(&test_account(account_id.clone()))
623 .await
624 .unwrap();
625
626 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
627 engine.sync_account(&provider).await.unwrap();
628
629 let envelopes = store
631 .list_envelopes_by_account(&account_id, 1, 0)
632 .await
633 .unwrap();
634
635 let snoozed = Snoozed {
636 message_id: envelopes[0].id.clone(),
637 account_id: account_id.clone(),
638 snoozed_at: chrono::Utc::now(),
639 wake_at: chrono::Utc::now() - chrono::Duration::hours(1),
640 original_labels: vec![],
641 };
642 store.insert_snooze(&snoozed).await.unwrap();
643
644 let woken = engine.check_snoozes().await.unwrap();
645 assert_eq!(woken.len(), 1);
646
647 let woken2 = engine.check_snoozes().await.unwrap();
649 assert_eq!(woken2.len(), 0);
650 }
651
652 #[tokio::test]
653 async fn cursor_persistence() {
654 let store = Arc::new(Store::in_memory().await.unwrap());
655 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
656 let engine = SyncEngine::new(store.clone(), search.clone());
657
658 let account_id = AccountId::new();
659 store
660 .insert_account(&test_account(account_id.clone()))
661 .await
662 .unwrap();
663
664 let cursor_before = store.get_sync_cursor(&account_id).await.unwrap();
666 assert!(cursor_before.is_none());
667
668 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
669 engine.sync_account(&provider).await.unwrap();
670
671 let cursor_after = store.get_sync_cursor(&account_id).await.unwrap();
673 assert!(cursor_after.is_some(), "Cursor should be set after sync");
674 let cursor_json = serde_json::to_string(&cursor_after.unwrap()).unwrap();
675 assert!(
676 cursor_json.contains("Gmail") && cursor_json.contains("1"),
677 "Cursor should be Gmail {{ history_id: 1 }}, got: {}",
678 cursor_json
679 );
680 }
681
682 #[tokio::test]
683 async fn sync_error_does_not_crash() {
684 let store = Arc::new(Store::in_memory().await.unwrap());
685 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
686 let engine = SyncEngine::new(store.clone(), search.clone());
687
688 let account_id = AccountId::new();
689 store
690 .insert_account(&test_account(account_id.clone()))
691 .await
692 .unwrap();
693
694 let error_provider = ErrorProvider {
695 account_id: account_id.clone(),
696 };
697
698 let result = engine.sync_account(&error_provider).await;
700 assert!(
701 result.is_err(),
702 "sync_account should return Err for failing provider"
703 );
704 let err_msg = result.unwrap_err().to_string();
705 assert!(
706 err_msg.contains("simulated"),
707 "Error should contain provider message, got: {}",
708 err_msg
709 );
710 }
711
712 #[tokio::test]
713 async fn label_counts_after_sync() {
714 let store = Arc::new(Store::in_memory().await.unwrap());
715 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
716 let engine = SyncEngine::new(store.clone(), search.clone());
717
718 let account_id = AccountId::new();
719 store
720 .insert_account(&test_account(account_id.clone()))
721 .await
722 .unwrap();
723
724 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
725 engine.sync_account(&provider).await.unwrap();
726
727 let labels = store.list_labels_by_account(&account_id).await.unwrap();
728 assert!(!labels.is_empty(), "Should have labels after sync");
729
730 let has_counts = labels
731 .iter()
732 .any(|l| l.total_count > 0 || l.unread_count > 0);
733 assert!(
734 has_counts,
735 "At least one label should have non-zero counts after sync"
736 );
737 }
738
739 #[tokio::test]
740 async fn list_envelopes_by_label_returns_results() {
741 let store = Arc::new(Store::in_memory().await.unwrap());
742 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
743 let engine = SyncEngine::new(store.clone(), search.clone());
744
745 let account_id = AccountId::new();
746 store
747 .insert_account(&test_account(account_id.clone()))
748 .await
749 .unwrap();
750
751 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
752 engine.sync_account(&provider).await.unwrap();
753
754 let labels = store.list_labels_by_account(&account_id).await.unwrap();
755
756 let inbox_label = labels.iter().find(|l| l.name == "Inbox").unwrap();
758 assert!(
759 inbox_label.total_count > 0,
760 "Inbox should have messages after sync"
761 );
762
763 let envelopes = store
765 .list_envelopes_by_label(&inbox_label.id, 100, 0)
766 .await
767 .unwrap();
768
769 let all_envelopes = store
771 .list_envelopes_by_account(&account_id, 200, 0)
772 .await
773 .unwrap();
774
775 assert!(
776 !envelopes.is_empty(),
777 "list_envelopes_by_label should return messages for Inbox label (got 0). \
778 label_id={}, total_count={}, all_count={}",
779 inbox_label.id,
780 inbox_label.total_count,
781 all_envelopes.len()
782 );
783
784 assert!(
786 envelopes.len() <= all_envelopes.len(),
787 "Inbox-by-label ({}) should be <= all ({})",
788 envelopes.len(),
789 all_envelopes.len()
790 );
791 }
792
793 #[tokio::test]
794 async fn list_envelopes_by_sent_label_may_be_empty() {
795 let store = Arc::new(Store::in_memory().await.unwrap());
796 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
797 let engine = SyncEngine::new(store.clone(), search.clone());
798
799 let account_id = AccountId::new();
800 store
801 .insert_account(&test_account(account_id.clone()))
802 .await
803 .unwrap();
804
805 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
806 engine.sync_account(&provider).await.unwrap();
807
808 let labels = store.list_labels_by_account(&account_id).await.unwrap();
809
810 let sent_label = labels.iter().find(|l| l.name == "Sent").unwrap();
812
813 let envelopes = store
814 .list_envelopes_by_label(&sent_label.id, 100, 0)
815 .await
816 .unwrap();
817
818 assert_eq!(
820 envelopes.len(),
821 0,
822 "Sent should have 0 messages in fake provider"
823 );
824
825 let inbox_label = labels.iter().find(|l| l.name == "Inbox").unwrap();
827 let inbox_envelopes = store
828 .list_envelopes_by_label(&inbox_label.id, 100, 0)
829 .await
830 .unwrap();
831 assert!(
832 !inbox_envelopes.is_empty(),
833 "Inbox should still have messages after querying Sent"
834 );
835
836 let all_envelopes = store
838 .list_envelopes_by_account(&account_id, 100, 0)
839 .await
840 .unwrap();
841 assert!(
842 !all_envelopes.is_empty(),
843 "All envelopes should still be retrievable"
844 );
845 }
846
847 #[tokio::test]
848 async fn progressive_loading_chunks() {
849 let store = Arc::new(Store::in_memory().await.unwrap());
850 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
851 let engine = SyncEngine::new(store.clone(), search.clone());
852
853 let account_id = AccountId::new();
854 store
855 .insert_account(&test_account(account_id.clone()))
856 .await
857 .unwrap();
858
859 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
860 let count = engine.sync_account(&provider).await.unwrap();
861 assert_eq!(count, 55, "Sync should report 55 messages processed");
862
863 let envelopes = store
865 .list_envelopes_by_account(&account_id, 200, 0)
866 .await
867 .unwrap();
868 assert_eq!(envelopes.len(), 55, "Store should contain 55 envelopes");
869
870 let results = search.lock().await.search("deployment", 10).unwrap();
872 assert!(
873 !results.is_empty(),
874 "Search index should have 'deployment' results"
875 );
876 }
877
878 #[tokio::test]
879 async fn delta_sync_no_duplicate_labels() {
880 let store = Arc::new(Store::in_memory().await.unwrap());
881 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
882 let engine = SyncEngine::new(store.clone(), search.clone());
883
884 let account_id = AccountId::new();
885 store
886 .insert_account(&test_account(account_id.clone()))
887 .await
888 .unwrap();
889
890 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
891
892 engine.sync_account(&provider).await.unwrap();
894
895 let labels_after_first = store.list_labels_by_account(&account_id).await.unwrap();
896 let label_count_first = labels_after_first.len();
897
898 let delta_count = engine.sync_account(&provider).await.unwrap();
900 assert_eq!(delta_count, 0, "Delta sync should return 0 new messages");
901
902 let labels_after_second = store.list_labels_by_account(&account_id).await.unwrap();
903
904 assert_eq!(
906 label_count_first,
907 labels_after_second.len(),
908 "Label count should not change after delta sync"
909 );
910
911 for label in &labels_after_first {
913 let still_exists = labels_after_second
914 .iter()
915 .any(|l| l.provider_id == label.provider_id && l.name == label.name);
916 assert!(
917 still_exists,
918 "Label '{}' (provider_id='{}') should survive delta sync",
919 label.name, label.provider_id
920 );
921 }
922
923 let envelopes = store
926 .list_envelopes_by_account(&account_id, 200, 0)
927 .await
928 .unwrap();
929 assert_eq!(
930 envelopes.len(),
931 55,
932 "All 55 messages should survive delta sync"
933 );
934 }
935
936 #[tokio::test]
937 async fn delta_sync_preserves_junction_table() {
938 let store = Arc::new(Store::in_memory().await.unwrap());
939 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
940 let engine = SyncEngine::new(store.clone(), search.clone());
941
942 let account_id = AccountId::new();
943 store
944 .insert_account(&test_account(account_id.clone()))
945 .await
946 .unwrap();
947
948 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
949
950 engine.sync_account(&provider).await.unwrap();
952
953 let junction_before = store.count_message_labels().await.unwrap();
954 assert!(
955 junction_before > 0,
956 "Junction table should be populated after initial sync"
957 );
958
959 let delta_count = engine.sync_account(&provider).await.unwrap();
961 assert_eq!(delta_count, 0, "Delta sync should return 0 new messages");
962
963 let junction_after = store.count_message_labels().await.unwrap();
964 assert_eq!(
965 junction_before, junction_after,
966 "Junction table should survive delta sync (before={}, after={})",
967 junction_before, junction_after
968 );
969
970 let labels = store.list_labels_by_account(&account_id).await.unwrap();
972 let inbox = labels.iter().find(|l| l.name == "Inbox").unwrap();
973 let envelopes = store
974 .list_envelopes_by_label(&inbox.id, 100, 0)
975 .await
976 .unwrap();
977 assert!(
978 !envelopes.is_empty(),
979 "Inbox should still return messages after delta sync"
980 );
981 }
982
983 #[tokio::test]
984 async fn backfill_triggers_when_junction_empty() {
985 let store = Arc::new(Store::in_memory().await.unwrap());
986 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
987 let engine = SyncEngine::new(store.clone(), search.clone());
988
989 let account_id = AccountId::new();
990 store
991 .insert_account(&test_account(account_id.clone()))
992 .await
993 .unwrap();
994
995 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
996
997 engine.sync_account(&provider).await.unwrap();
999
1000 let junction_before = store.count_message_labels().await.unwrap();
1001 assert!(junction_before > 0);
1002
1003 sqlx::query("DELETE FROM message_labels")
1005 .execute(store.writer())
1006 .await
1007 .unwrap();
1008
1009 let junction_wiped = store.count_message_labels().await.unwrap();
1010 assert_eq!(junction_wiped, 0, "Junction should be empty after wipe");
1011
1012 engine.sync_account(&provider).await.unwrap();
1014
1015 let junction_after = store.count_message_labels().await.unwrap();
1016 assert!(
1017 junction_after > 0,
1018 "Junction table should be repopulated after backfill (got {})",
1019 junction_after
1020 );
1021 }
1022
1023 #[tokio::test]
1024 async fn sync_label_resolution_matches_gmail_ids() {
1025 let store = Arc::new(Store::in_memory().await.unwrap());
1026 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1027 let engine = SyncEngine::new(store.clone(), search.clone());
1028
1029 let account_id = AccountId::new();
1030 store
1031 .insert_account(&test_account(account_id.clone()))
1032 .await
1033 .unwrap();
1034
1035 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1036 engine.sync_account(&provider).await.unwrap();
1037
1038 let labels = store.list_labels_by_account(&account_id).await.unwrap();
1039
1040 let expected_mappings = [
1043 ("Inbox", "INBOX"),
1044 ("Sent", "SENT"),
1045 ("Trash", "TRASH"),
1046 ("Spam", "SPAM"),
1047 ("Starred", "STARRED"),
1048 ("Work", "work"),
1049 ("Personal", "personal"),
1050 ("Newsletters", "newsletters"),
1051 ];
1052 for (name, expected_pid) in &expected_mappings {
1053 let label = labels.iter().find(|l| l.name == *name);
1054 assert!(label.is_some(), "Label '{}' should exist after sync", name);
1055 assert_eq!(
1056 label.unwrap().provider_id,
1057 *expected_pid,
1058 "Label '{}' should have provider_id '{}'",
1059 name,
1060 expected_pid
1061 );
1062 }
1063
1064 let envelopes = store
1066 .list_envelopes_by_account(&account_id, 200, 0)
1067 .await
1068 .unwrap();
1069 let label_ids: std::collections::HashSet<String> =
1070 labels.iter().map(|l| l.id.as_str().to_string()).collect();
1071
1072 for env in &envelopes {
1073 let msg_label_ids = store.get_message_label_ids(&env.id).await.unwrap();
1074 for lid in &msg_label_ids {
1075 assert!(
1076 label_ids.contains(&lid.as_str().to_string()),
1077 "Junction entry for message {} points to nonexistent label {}",
1078 env.id,
1079 lid
1080 );
1081 }
1082 }
1083 }
1084
1085 #[tokio::test]
1086 async fn list_envelopes_by_each_label_returns_correct_count() {
1087 let store = Arc::new(Store::in_memory().await.unwrap());
1088 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1089 let engine = SyncEngine::new(store.clone(), search.clone());
1090
1091 let account_id = AccountId::new();
1092 store
1093 .insert_account(&test_account(account_id.clone()))
1094 .await
1095 .unwrap();
1096
1097 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1098 engine.sync_account(&provider).await.unwrap();
1099
1100 let labels = store.list_labels_by_account(&account_id).await.unwrap();
1101 for label in &labels {
1102 if label.total_count > 0 {
1103 let envelopes = store
1104 .list_envelopes_by_label(&label.id, 200, 0)
1105 .await
1106 .unwrap();
1107 assert_eq!(
1108 envelopes.len(),
1109 label.total_count as usize,
1110 "Label '{}' (provider_id='{}') has total_count={} but list_envelopes_by_label returned {}",
1111 label.name,
1112 label.provider_id,
1113 label.total_count,
1114 envelopes.len()
1115 );
1116 }
1117 }
1118 }
1119
1120 #[tokio::test]
1121 async fn search_index_consistent_with_store() {
1122 let store = Arc::new(Store::in_memory().await.unwrap());
1123 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1124 let engine = SyncEngine::new(store.clone(), search.clone());
1125
1126 let account_id = AccountId::new();
1127 store
1128 .insert_account(&test_account(account_id.clone()))
1129 .await
1130 .unwrap();
1131
1132 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1133 engine.sync_account(&provider).await.unwrap();
1134
1135 let envelopes = store
1136 .list_envelopes_by_account(&account_id, 200, 0)
1137 .await
1138 .unwrap();
1139
1140 let search_guard = search.lock().await;
1141 for env in &envelopes {
1142 let keyword = env
1144 .subject
1145 .split_whitespace()
1146 .find(|w| w.len() > 3 && w.chars().all(|c| c.is_alphanumeric()))
1147 .unwrap_or(&env.subject);
1148 let results = search_guard.search(keyword, 100).unwrap();
1149 assert!(
1150 results.iter().any(|r| r.message_id == env.id.as_str()),
1151 "Envelope '{}' (subject='{}') should be findable by keyword '{}' in search index",
1152 env.id,
1153 env.subject,
1154 keyword
1155 );
1156 }
1157 }
1158
1159 #[tokio::test]
1160 async fn mutation_flags_persist_through_store() {
1161 let store = Arc::new(Store::in_memory().await.unwrap());
1162 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1163 let engine = SyncEngine::new(store.clone(), search.clone());
1164
1165 let account_id = AccountId::new();
1166 store
1167 .insert_account(&test_account(account_id.clone()))
1168 .await
1169 .unwrap();
1170
1171 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1172 engine.sync_account(&provider).await.unwrap();
1173
1174 let envelopes = store
1175 .list_envelopes_by_account(&account_id, 1, 0)
1176 .await
1177 .unwrap();
1178 let msg_id = &envelopes[0].id;
1179 let initial_flags = envelopes[0].flags;
1180
1181 store.set_starred(msg_id, true).await.unwrap();
1183 store.set_read(msg_id, true).await.unwrap();
1184
1185 let updated = store.get_envelope(msg_id).await.unwrap().unwrap();
1186 assert!(
1187 updated.flags.contains(MessageFlags::STARRED),
1188 "STARRED flag should be set"
1189 );
1190 assert!(
1191 updated.flags.contains(MessageFlags::READ),
1192 "READ flag should be set"
1193 );
1194
1195 store.set_starred(msg_id, false).await.unwrap();
1197 let updated2 = store.get_envelope(msg_id).await.unwrap().unwrap();
1198 assert!(
1199 !updated2.flags.contains(MessageFlags::STARRED),
1200 "STARRED flag should be cleared after set_starred(false)"
1201 );
1202 assert!(
1203 updated2.flags.contains(MessageFlags::READ),
1204 "READ flag should still be set after clearing STARRED"
1205 );
1206
1207 let _ = initial_flags; }
1211
1212 #[tokio::test]
1213 async fn junction_table_survives_message_update() {
1214 let store = Arc::new(Store::in_memory().await.unwrap());
1215 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1216 let engine = SyncEngine::new(store.clone(), search.clone());
1217
1218 let account_id = AccountId::new();
1219 store
1220 .insert_account(&test_account(account_id.clone()))
1221 .await
1222 .unwrap();
1223
1224 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1225 engine.sync_account(&provider).await.unwrap();
1226
1227 let envelopes = store
1229 .list_envelopes_by_account(&account_id, 1, 0)
1230 .await
1231 .unwrap();
1232 let msg_id = &envelopes[0].id;
1233
1234 let junction_before = store.get_message_label_ids(msg_id).await.unwrap();
1235 assert!(
1236 !junction_before.is_empty(),
1237 "Message should have label associations after sync"
1238 );
1239
1240 store.upsert_envelope(&envelopes[0]).await.unwrap();
1242 store
1244 .set_message_labels(msg_id, &junction_before)
1245 .await
1246 .unwrap();
1247
1248 let junction_after = store.get_message_label_ids(msg_id).await.unwrap();
1249 assert_eq!(
1250 junction_before.len(),
1251 junction_after.len(),
1252 "Junction rows should not double after re-sync (before={}, after={})",
1253 junction_before.len(),
1254 junction_after.len()
1255 );
1256 }
1257
1258 #[tokio::test]
1259 async fn find_labels_by_provider_ids_with_unknown_ids() {
1260 let store = Arc::new(Store::in_memory().await.unwrap());
1261 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1262 let engine = SyncEngine::new(store.clone(), search.clone());
1263
1264 let account_id = AccountId::new();
1265 store
1266 .insert_account(&test_account(account_id.clone()))
1267 .await
1268 .unwrap();
1269
1270 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1271 engine.sync_account(&provider).await.unwrap();
1272
1273 let result = store
1274 .find_labels_by_provider_ids(
1275 &account_id,
1276 &["INBOX".to_string(), "NONEXISTENT_LABEL".to_string()],
1277 )
1278 .await
1279 .unwrap();
1280
1281 assert_eq!(
1282 result.len(),
1283 1,
1284 "Should only return 1 result for INBOX, not 2 (NONEXISTENT_LABEL should be ignored)"
1285 );
1286 }
1287
1288 #[tokio::test]
1289 async fn body_available_after_sync() {
1290 let store = Arc::new(Store::in_memory().await.unwrap());
1291 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1292 let engine = SyncEngine::new(store.clone(), search.clone());
1293
1294 let account_id = AccountId::new();
1295 store
1296 .insert_account(&test_account(account_id.clone()))
1297 .await
1298 .unwrap();
1299
1300 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1301 engine.sync_account(&provider).await.unwrap();
1302
1303 let envelopes = store
1304 .list_envelopes_by_account(&account_id, 1, 0)
1305 .await
1306 .unwrap();
1307 let msg_id = &envelopes[0].id;
1308
1309 let body1 = engine.get_body(msg_id).await.unwrap();
1311 assert!(body1.text_plain.is_some(), "Body should have text_plain");
1312
1313 let body2 = engine.get_body(msg_id).await.unwrap();
1315
1316 assert_eq!(
1317 body1.text_plain, body2.text_plain,
1318 "Body text_plain should be consistent"
1319 );
1320 assert_eq!(
1321 body1.text_html, body2.text_html,
1322 "Body text_html should be consistent"
1323 );
1324 assert_eq!(
1325 body1.attachments.len(),
1326 body2.attachments.len(),
1327 "Body attachments count should be consistent"
1328 );
1329 }
1330
1331 #[tokio::test]
1332 async fn recalculate_label_counts_matches_junction() {
1333 let store = Arc::new(Store::in_memory().await.unwrap());
1334 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1335 let engine = SyncEngine::new(store.clone(), search.clone());
1336
1337 let account_id = AccountId::new();
1338 store
1339 .insert_account(&test_account(account_id.clone()))
1340 .await
1341 .unwrap();
1342
1343 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1344 engine.sync_account(&provider).await.unwrap();
1345
1346 let labels = store.list_labels_by_account(&account_id).await.unwrap();
1347
1348 for label in &labels {
1349 let lid = label.id.as_str();
1350 let junction_count: i64 = sqlx::query_scalar::<_, i64>(
1352 "SELECT COUNT(*) FROM message_labels WHERE label_id = ?",
1353 )
1354 .bind(&lid)
1355 .fetch_one(store.reader())
1356 .await
1357 .unwrap();
1358
1359 assert_eq!(
1360 label.total_count as i64, junction_count,
1361 "Label '{}' total_count ({}) should match junction row count ({})",
1362 label.name, label.total_count, junction_count
1363 );
1364
1365 let unread_count: i64 = sqlx::query_scalar::<_, i64>(
1367 "SELECT COUNT(*) FROM message_labels ml \
1368 JOIN messages m ON m.id = ml.message_id \
1369 WHERE ml.label_id = ? AND (m.flags & 1) = 0",
1370 )
1371 .bind(&lid)
1372 .fetch_one(store.reader())
1373 .await
1374 .unwrap();
1375
1376 assert_eq!(
1377 label.unread_count as i64, unread_count,
1378 "Label '{}' unread_count ({}) should match computed unread count ({})",
1379 label.name, label.unread_count, unread_count
1380 );
1381 }
1382 }
1383}