1mod engine;
2pub mod threading;
3pub use engine::SyncEngine;
4
5#[cfg(test)]
6mod tests {
7 use super::*;
8 use mxr_core::id::*;
9 use mxr_core::types::*;
10 use mxr_core::{MailSyncProvider, MxrError, SyncCapabilities};
11 use mxr_search::SearchIndex;
12 use mxr_store::Store;
13 use std::sync::Arc;
14 use tokio::sync::Mutex;
15
16 struct ErrorProvider {
18 account_id: AccountId,
19 }
20
21 #[async_trait::async_trait]
22 impl MailSyncProvider for ErrorProvider {
23 fn name(&self) -> &str {
24 "error"
25 }
26 fn account_id(&self) -> &AccountId {
27 &self.account_id
28 }
29 fn capabilities(&self) -> SyncCapabilities {
30 SyncCapabilities {
31 labels: false,
32 server_search: false,
33 delta_sync: false,
34 push: false,
35 batch_operations: false,
36 native_thread_ids: true,
37 }
38 }
39 async fn authenticate(&mut self) -> Result<(), MxrError> {
40 Ok(())
41 }
42 async fn refresh_auth(&mut self) -> Result<(), MxrError> {
43 Ok(())
44 }
45 async fn sync_labels(&self) -> Result<Vec<Label>, MxrError> {
46 Ok(vec![])
47 }
48 async fn sync_messages(&self, _cursor: &SyncCursor) -> Result<SyncBatch, MxrError> {
49 Err(MxrError::Provider("simulated sync error".into()))
50 }
51 async fn fetch_attachment(&self, _mid: &str, _aid: &str) -> Result<Vec<u8>, MxrError> {
52 Err(MxrError::Provider("simulated attachment error".into()))
53 }
54 async fn modify_labels(
55 &self,
56 _id: &str,
57 _add: &[String],
58 _rm: &[String],
59 ) -> Result<(), MxrError> {
60 Err(MxrError::Provider("simulated error".into()))
61 }
62 async fn trash(&self, _id: &str) -> Result<(), MxrError> {
63 Err(MxrError::Provider("simulated error".into()))
64 }
65 async fn set_read(&self, _id: &str, _read: bool) -> Result<(), MxrError> {
66 Err(MxrError::Provider("simulated error".into()))
67 }
68 async fn set_starred(&self, _id: &str, _starred: bool) -> Result<(), MxrError> {
69 Err(MxrError::Provider("simulated error".into()))
70 }
71 }
72
73 fn test_account(account_id: AccountId) -> mxr_core::Account {
74 mxr_core::Account {
75 id: account_id,
76 name: "Fake Account".to_string(),
77 email: "user@example.com".to_string(),
78 sync_backend: Some(BackendRef {
79 provider_kind: ProviderKind::Fake,
80 config_key: "fake".to_string(),
81 }),
82 send_backend: None,
83 enabled: true,
84 }
85 }
86
87 struct DeltaLabelProvider {
89 account_id: AccountId,
90 messages: Vec<SyncedMessage>,
91 labels: Vec<Label>,
92 label_changes: Vec<LabelChange>,
93 }
94
95 impl DeltaLabelProvider {
96 fn new(
97 account_id: AccountId,
98 messages: Vec<Envelope>,
99 labels: Vec<Label>,
100 label_changes: Vec<LabelChange>,
101 ) -> Self {
102 let messages = messages
103 .into_iter()
104 .map(|env| SyncedMessage {
105 body: make_empty_body(&env.id),
106 envelope: env,
107 })
108 .collect();
109 Self {
110 account_id,
111 messages,
112 labels,
113 label_changes,
114 }
115 }
116 }
117
118 struct ThreadingProvider {
119 account_id: AccountId,
120 messages: Vec<SyncedMessage>,
121 }
122
123 #[async_trait::async_trait]
124 impl MailSyncProvider for ThreadingProvider {
125 fn name(&self) -> &str {
126 "threading"
127 }
128
129 fn account_id(&self) -> &AccountId {
130 &self.account_id
131 }
132
133 fn capabilities(&self) -> SyncCapabilities {
134 SyncCapabilities {
135 labels: false,
136 server_search: false,
137 delta_sync: false,
138 push: false,
139 batch_operations: false,
140 native_thread_ids: false,
141 }
142 }
143
144 async fn authenticate(&mut self) -> Result<(), MxrError> {
145 Ok(())
146 }
147
148 async fn refresh_auth(&mut self) -> Result<(), MxrError> {
149 Ok(())
150 }
151
152 async fn sync_labels(&self) -> Result<Vec<Label>, MxrError> {
153 Ok(vec![])
154 }
155
156 async fn sync_messages(&self, _cursor: &SyncCursor) -> Result<SyncBatch, MxrError> {
157 Ok(SyncBatch {
158 upserted: self.messages.clone(),
159 deleted_provider_ids: vec![],
160 label_changes: vec![],
161 next_cursor: SyncCursor::Initial,
162 })
163 }
164
165 async fn fetch_attachment(&self, _mid: &str, _aid: &str) -> Result<Vec<u8>, MxrError> {
166 Err(MxrError::NotFound("no attachment".into()))
167 }
168
169 async fn modify_labels(
170 &self,
171 _id: &str,
172 _add: &[String],
173 _rm: &[String],
174 ) -> Result<(), MxrError> {
175 Ok(())
176 }
177
178 async fn trash(&self, _id: &str) -> Result<(), MxrError> {
179 Ok(())
180 }
181
182 async fn set_read(&self, _id: &str, _read: bool) -> Result<(), MxrError> {
183 Ok(())
184 }
185
186 async fn set_starred(&self, _id: &str, _starred: bool) -> Result<(), MxrError> {
187 Ok(())
188 }
189 }
190
191 fn make_empty_body(message_id: &MessageId) -> MessageBody {
192 MessageBody {
193 message_id: message_id.clone(),
194 text_plain: Some("test body".to_string()),
195 text_html: None,
196 attachments: vec![],
197 fetched_at: chrono::Utc::now(),
198 metadata: MessageMetadata::default(),
199 }
200 }
201
202 #[async_trait::async_trait]
203 impl MailSyncProvider for DeltaLabelProvider {
204 fn name(&self) -> &str {
205 "delta-label"
206 }
207 fn account_id(&self) -> &AccountId {
208 &self.account_id
209 }
210 fn capabilities(&self) -> SyncCapabilities {
211 SyncCapabilities {
212 labels: true,
213 server_search: false,
214 delta_sync: true,
215 push: false,
216 batch_operations: false,
217 native_thread_ids: true,
218 }
219 }
220 async fn authenticate(&mut self) -> Result<(), MxrError> {
221 Ok(())
222 }
223 async fn refresh_auth(&mut self) -> Result<(), MxrError> {
224 Ok(())
225 }
226 async fn sync_labels(&self) -> Result<Vec<Label>, MxrError> {
227 Ok(self.labels.clone())
228 }
229 async fn sync_messages(&self, cursor: &SyncCursor) -> Result<SyncBatch, MxrError> {
230 match cursor {
231 SyncCursor::Initial => Ok(SyncBatch {
232 upserted: self.messages.clone(),
233 deleted_provider_ids: vec![],
234 label_changes: vec![],
235 next_cursor: SyncCursor::Gmail { history_id: 100 },
236 }),
237 _ => Ok(SyncBatch {
238 upserted: vec![],
239 deleted_provider_ids: vec![],
240 label_changes: self.label_changes.clone(),
241 next_cursor: SyncCursor::Gmail { history_id: 200 },
242 }),
243 }
244 }
245 async fn fetch_attachment(&self, _mid: &str, _aid: &str) -> Result<Vec<u8>, MxrError> {
246 Err(MxrError::NotFound("no attachment".into()))
247 }
248 async fn modify_labels(
249 &self,
250 _id: &str,
251 _add: &[String],
252 _rm: &[String],
253 ) -> Result<(), MxrError> {
254 Ok(())
255 }
256 async fn trash(&self, _id: &str) -> Result<(), MxrError> {
257 Ok(())
258 }
259 async fn set_read(&self, _id: &str, _read: bool) -> Result<(), MxrError> {
260 Ok(())
261 }
262 async fn set_starred(&self, _id: &str, _starred: bool) -> Result<(), MxrError> {
263 Ok(())
264 }
265 }
266
267 fn make_test_label(account_id: &AccountId, name: &str, provider_id: &str) -> Label {
268 Label {
269 id: LabelId::new(),
270 account_id: account_id.clone(),
271 name: name.to_string(),
272 kind: LabelKind::System,
273 color: None,
274 provider_id: provider_id.to_string(),
275 unread_count: 0,
276 total_count: 0,
277 }
278 }
279
280 fn make_test_envelope(
281 account_id: &AccountId,
282 provider_id: &str,
283 label_provider_ids: Vec<String>,
284 ) -> Envelope {
285 Envelope {
286 id: MessageId::new(),
287 account_id: account_id.clone(),
288 provider_id: provider_id.to_string(),
289 thread_id: ThreadId::new(),
290 message_id_header: None,
291 in_reply_to: None,
292 references: vec![],
293 from: mxr_core::Address {
294 name: Some("Test".to_string()),
295 email: "test@example.com".to_string(),
296 },
297 to: vec![],
298 cc: vec![],
299 bcc: vec![],
300 subject: "Test message".to_string(),
301 date: chrono::Utc::now(),
302 flags: MessageFlags::empty(),
303 snippet: "Test snippet".to_string(),
304 has_attachments: false,
305 size_bytes: 1000,
306 unsubscribe: UnsubscribeMethod::None,
307 label_provider_ids,
308 }
309 }
310
311 #[tokio::test]
312 async fn delta_sync_applies_label_additions() {
313 let store = Arc::new(Store::in_memory().await.unwrap());
314 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
315 let engine = SyncEngine::new(store.clone(), search.clone());
316
317 let account_id = AccountId::new();
318 store
319 .insert_account(&test_account(account_id.clone()))
320 .await
321 .unwrap();
322
323 let inbox = make_test_label(&account_id, "Inbox", "INBOX");
324 let starred = make_test_label(&account_id, "Starred", "STARRED");
325 let labels = vec![inbox.clone(), starred.clone()];
326
327 let msg = make_test_envelope(&account_id, "prov-msg-1", vec!["INBOX".to_string()]);
328 let msg_provider_id = msg.provider_id.clone();
329
330 let provider = DeltaLabelProvider::new(
331 account_id.clone(),
332 vec![msg],
333 labels,
334 vec![LabelChange {
335 provider_message_id: msg_provider_id.clone(),
336 added_labels: vec!["STARRED".to_string()],
337 removed_labels: vec![],
338 }],
339 );
340
341 engine.sync_account(&provider).await.unwrap();
343
344 let msg_id = store
346 .get_message_id_by_provider_id(&account_id, &msg_provider_id)
347 .await
348 .unwrap()
349 .unwrap();
350 let labels_before = store.get_message_label_ids(&msg_id).await.unwrap();
351 assert!(labels_before.contains(&inbox.id));
352 assert!(!labels_before.contains(&starred.id));
353
354 engine.sync_account(&provider).await.unwrap();
356
357 let labels_after = store.get_message_label_ids(&msg_id).await.unwrap();
358 assert!(labels_after.contains(&inbox.id), "INBOX should still be present");
359 assert!(labels_after.contains(&starred.id), "STARRED should be added by delta");
360 }
361
362 #[tokio::test]
363 async fn sync_rethreads_messages_when_provider_lacks_native_thread_ids() {
364 let store = Arc::new(Store::in_memory().await.unwrap());
365 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
366 let engine = SyncEngine::new(store.clone(), search);
367
368 let account_id = AccountId::new();
369 store
370 .insert_account(&test_account(account_id.clone()))
371 .await
372 .unwrap();
373
374 let first_id = MessageId::new();
375 let second_id = MessageId::new();
376 let first = SyncedMessage {
377 envelope: Envelope {
378 id: first_id.clone(),
379 account_id: account_id.clone(),
380 provider_id: "prov-thread-1".into(),
381 thread_id: ThreadId::new(),
382 message_id_header: Some("<root@example.com>".into()),
383 in_reply_to: None,
384 references: vec![],
385 from: mxr_core::Address {
386 name: Some("Alice".into()),
387 email: "alice@example.com".into(),
388 },
389 to: vec![],
390 cc: vec![],
391 bcc: vec![],
392 subject: "Topic".into(),
393 date: chrono::Utc::now() - chrono::Duration::minutes(5),
394 flags: MessageFlags::empty(),
395 snippet: "first".into(),
396 has_attachments: false,
397 size_bytes: 100,
398 unsubscribe: UnsubscribeMethod::None,
399 label_provider_ids: vec![],
400 },
401 body: make_empty_body(&first_id),
402 };
403 let second = SyncedMessage {
404 envelope: Envelope {
405 id: second_id.clone(),
406 account_id: account_id.clone(),
407 provider_id: "prov-thread-2".into(),
408 thread_id: ThreadId::new(),
409 message_id_header: Some("<reply@example.com>".into()),
410 in_reply_to: Some("<root@example.com>".into()),
411 references: vec!["<root@example.com>".into()],
412 from: mxr_core::Address {
413 name: Some("Bob".into()),
414 email: "bob@example.com".into(),
415 },
416 to: vec![],
417 cc: vec![],
418 bcc: vec![],
419 subject: "Re: Topic".into(),
420 date: chrono::Utc::now(),
421 flags: MessageFlags::empty(),
422 snippet: "second".into(),
423 has_attachments: false,
424 size_bytes: 100,
425 unsubscribe: UnsubscribeMethod::None,
426 label_provider_ids: vec![],
427 },
428 body: make_empty_body(&second_id),
429 };
430
431 let provider = ThreadingProvider {
432 account_id: account_id.clone(),
433 messages: vec![first, second],
434 };
435
436 engine.sync_account(&provider).await.unwrap();
437
438 let first_env = store.get_envelope(&first_id).await.unwrap().unwrap();
439 let second_env = store.get_envelope(&second_id).await.unwrap().unwrap();
440 assert_eq!(first_env.thread_id, second_env.thread_id);
441
442 let thread = store
443 .get_thread(&first_env.thread_id)
444 .await
445 .unwrap()
446 .unwrap();
447 assert_eq!(thread.message_count, 2);
448 }
449
450 #[tokio::test]
451 async fn delta_sync_applies_label_removals() {
452 let store = Arc::new(Store::in_memory().await.unwrap());
453 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
454 let engine = SyncEngine::new(store.clone(), search.clone());
455
456 let account_id = AccountId::new();
457 store
458 .insert_account(&test_account(account_id.clone()))
459 .await
460 .unwrap();
461
462 let inbox = make_test_label(&account_id, "Inbox", "INBOX");
463 let starred = make_test_label(&account_id, "Starred", "STARRED");
464 let labels = vec![inbox.clone(), starred.clone()];
465
466 let msg = make_test_envelope(
467 &account_id,
468 "prov-msg-2",
469 vec!["INBOX".to_string(), "STARRED".to_string()],
470 );
471 let msg_provider_id = msg.provider_id.clone();
472
473 let provider = DeltaLabelProvider::new(
474 account_id.clone(),
475 vec![msg],
476 labels,
477 vec![LabelChange {
478 provider_message_id: msg_provider_id.clone(),
479 added_labels: vec![],
480 removed_labels: vec!["STARRED".to_string()],
481 }],
482 );
483
484 engine.sync_account(&provider).await.unwrap();
486
487 let msg_id = store
488 .get_message_id_by_provider_id(&account_id, &msg_provider_id)
489 .await
490 .unwrap()
491 .unwrap();
492 let labels_before = store.get_message_label_ids(&msg_id).await.unwrap();
493 assert!(
494 labels_before.contains(&starred.id),
495 "STARRED should be present after initial sync"
496 );
497
498 engine.sync_account(&provider).await.unwrap();
500
501 let labels_after = store.get_message_label_ids(&msg_id).await.unwrap();
502 assert!(labels_after.contains(&inbox.id), "INBOX should remain");
503 assert!(
504 !labels_after.contains(&starred.id),
505 "STARRED should be removed by delta"
506 );
507 }
508
509 #[tokio::test]
510 async fn delta_sync_handles_unknown_provider_message() {
511 let store = Arc::new(Store::in_memory().await.unwrap());
512 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
513 let engine = SyncEngine::new(store.clone(), search.clone());
514
515 let account_id = AccountId::new();
516 store
517 .insert_account(&test_account(account_id.clone()))
518 .await
519 .unwrap();
520
521 let inbox = make_test_label(&account_id, "Inbox", "INBOX");
522 let labels = vec![inbox.clone()];
523
524 let msg = make_test_envelope(&account_id, "prov-msg-3", vec!["INBOX".to_string()]);
525
526 let provider = DeltaLabelProvider::new(
527 account_id.clone(),
528 vec![msg],
529 labels,
530 vec![LabelChange {
531 provider_message_id: "nonexistent-msg".to_string(),
533 added_labels: vec!["INBOX".to_string()],
534 removed_labels: vec![],
535 }],
536 );
537
538 engine.sync_account(&provider).await.unwrap();
540
541 let result = engine.sync_account(&provider).await;
543 assert!(
544 result.is_ok(),
545 "Delta sync should gracefully skip unknown messages"
546 );
547 }
548
549 #[tokio::test]
550 async fn sync_populates_store_and_search() {
551 let store = Arc::new(Store::in_memory().await.unwrap());
552 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
553 let engine = SyncEngine::new(store.clone(), search.clone());
554
555 let account_id = AccountId::new();
556 store
557 .insert_account(&test_account(account_id.clone()))
558 .await
559 .unwrap();
560
561 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
562 let count = engine.sync_account(&provider).await.unwrap();
563 assert_eq!(count, 55);
564
565 let envelopes = store
567 .list_envelopes_by_account(&account_id, 100, 0)
568 .await
569 .unwrap();
570 assert_eq!(envelopes.len(), 55);
571
572 let results = search.lock().await.search("deployment", 10).unwrap();
574 assert!(!results.is_empty());
575 }
576
577 #[tokio::test]
578 async fn bodies_stored_eagerly_during_sync() {
579 let store = Arc::new(Store::in_memory().await.unwrap());
580 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
581 let engine = SyncEngine::new(store.clone(), search.clone());
582
583 let account_id = AccountId::new();
584 store
585 .insert_account(&test_account(account_id.clone()))
586 .await
587 .unwrap();
588
589 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
590 engine.sync_account(&provider).await.unwrap();
591
592 let envelopes = store
594 .list_envelopes_by_account(&account_id, 1, 0)
595 .await
596 .unwrap();
597 let msg_id = &envelopes[0].id;
598
599 let body = engine.get_body(msg_id).await.unwrap();
601 assert!(body.text_plain.is_some());
602
603 let body2 = engine.get_body(msg_id).await.unwrap();
605 assert_eq!(body.text_plain, body2.text_plain);
606 }
607
608 #[tokio::test]
609 async fn snooze_wake() {
610 let store = Arc::new(Store::in_memory().await.unwrap());
611 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
612 let engine = SyncEngine::new(store.clone(), search.clone());
613
614 let account_id = AccountId::new();
615 store
616 .insert_account(&test_account(account_id.clone()))
617 .await
618 .unwrap();
619
620 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
621 engine.sync_account(&provider).await.unwrap();
622
623 let envelopes = store
625 .list_envelopes_by_account(&account_id, 1, 0)
626 .await
627 .unwrap();
628
629 let snoozed = Snoozed {
630 message_id: envelopes[0].id.clone(),
631 account_id: account_id.clone(),
632 snoozed_at: chrono::Utc::now(),
633 wake_at: chrono::Utc::now() - chrono::Duration::hours(1),
634 original_labels: vec![],
635 };
636 store.insert_snooze(&snoozed).await.unwrap();
637
638 let woken = engine.check_snoozes().await.unwrap();
639 assert_eq!(woken.len(), 1);
640
641 let woken2 = engine.check_snoozes().await.unwrap();
643 assert_eq!(woken2.len(), 0);
644 }
645
646 #[tokio::test]
647 async fn cursor_persistence() {
648 let store = Arc::new(Store::in_memory().await.unwrap());
649 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
650 let engine = SyncEngine::new(store.clone(), search.clone());
651
652 let account_id = AccountId::new();
653 store
654 .insert_account(&test_account(account_id.clone()))
655 .await
656 .unwrap();
657
658 let cursor_before = store.get_sync_cursor(&account_id).await.unwrap();
660 assert!(cursor_before.is_none());
661
662 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
663 engine.sync_account(&provider).await.unwrap();
664
665 let cursor_after = store.get_sync_cursor(&account_id).await.unwrap();
667 assert!(cursor_after.is_some(), "Cursor should be set after sync");
668 let cursor_json = serde_json::to_string(&cursor_after.unwrap()).unwrap();
669 assert!(
670 cursor_json.contains("Gmail") && cursor_json.contains("1"),
671 "Cursor should be Gmail {{ history_id: 1 }}, got: {}",
672 cursor_json
673 );
674 }
675
676 #[tokio::test]
677 async fn sync_error_does_not_crash() {
678 let store = Arc::new(Store::in_memory().await.unwrap());
679 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
680 let engine = SyncEngine::new(store.clone(), search.clone());
681
682 let account_id = AccountId::new();
683 store
684 .insert_account(&test_account(account_id.clone()))
685 .await
686 .unwrap();
687
688 let error_provider = ErrorProvider {
689 account_id: account_id.clone(),
690 };
691
692 let result = engine.sync_account(&error_provider).await;
694 assert!(
695 result.is_err(),
696 "sync_account should return Err for failing provider"
697 );
698 let err_msg = result.unwrap_err().to_string();
699 assert!(
700 err_msg.contains("simulated"),
701 "Error should contain provider message, got: {}",
702 err_msg
703 );
704 }
705
706 #[tokio::test]
707 async fn label_counts_after_sync() {
708 let store = Arc::new(Store::in_memory().await.unwrap());
709 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
710 let engine = SyncEngine::new(store.clone(), search.clone());
711
712 let account_id = AccountId::new();
713 store
714 .insert_account(&test_account(account_id.clone()))
715 .await
716 .unwrap();
717
718 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
719 engine.sync_account(&provider).await.unwrap();
720
721 let labels = store.list_labels_by_account(&account_id).await.unwrap();
722 assert!(!labels.is_empty(), "Should have labels after sync");
723
724 let has_counts = labels
725 .iter()
726 .any(|l| l.total_count > 0 || l.unread_count > 0);
727 assert!(
728 has_counts,
729 "At least one label should have non-zero counts after sync"
730 );
731 }
732
733 #[tokio::test]
734 async fn list_envelopes_by_label_returns_results() {
735 let store = Arc::new(Store::in_memory().await.unwrap());
736 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
737 let engine = SyncEngine::new(store.clone(), search.clone());
738
739 let account_id = AccountId::new();
740 store
741 .insert_account(&test_account(account_id.clone()))
742 .await
743 .unwrap();
744
745 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
746 engine.sync_account(&provider).await.unwrap();
747
748 let labels = store.list_labels_by_account(&account_id).await.unwrap();
749
750 let inbox_label = labels.iter().find(|l| l.name == "Inbox").unwrap();
752 assert!(
753 inbox_label.total_count > 0,
754 "Inbox should have messages after sync"
755 );
756
757 let envelopes = store
759 .list_envelopes_by_label(&inbox_label.id, 100, 0)
760 .await
761 .unwrap();
762
763 let all_envelopes = store
765 .list_envelopes_by_account(&account_id, 200, 0)
766 .await
767 .unwrap();
768
769 assert!(
770 !envelopes.is_empty(),
771 "list_envelopes_by_label should return messages for Inbox label (got 0). \
772 label_id={}, total_count={}, all_count={}",
773 inbox_label.id,
774 inbox_label.total_count,
775 all_envelopes.len()
776 );
777
778 assert!(
780 envelopes.len() <= all_envelopes.len(),
781 "Inbox-by-label ({}) should be <= all ({})",
782 envelopes.len(),
783 all_envelopes.len()
784 );
785 }
786
787 #[tokio::test]
788 async fn list_envelopes_by_sent_label_may_be_empty() {
789 let store = Arc::new(Store::in_memory().await.unwrap());
790 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
791 let engine = SyncEngine::new(store.clone(), search.clone());
792
793 let account_id = AccountId::new();
794 store
795 .insert_account(&test_account(account_id.clone()))
796 .await
797 .unwrap();
798
799 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
800 engine.sync_account(&provider).await.unwrap();
801
802 let labels = store.list_labels_by_account(&account_id).await.unwrap();
803
804 let sent_label = labels.iter().find(|l| l.name == "Sent").unwrap();
806
807 let envelopes = store
808 .list_envelopes_by_label(&sent_label.id, 100, 0)
809 .await
810 .unwrap();
811
812 assert_eq!(
814 envelopes.len(),
815 0,
816 "Sent should have 0 messages in fake provider"
817 );
818
819 let inbox_label = labels.iter().find(|l| l.name == "Inbox").unwrap();
821 let inbox_envelopes = store
822 .list_envelopes_by_label(&inbox_label.id, 100, 0)
823 .await
824 .unwrap();
825 assert!(
826 !inbox_envelopes.is_empty(),
827 "Inbox should still have messages after querying Sent"
828 );
829
830 let all_envelopes = store
832 .list_envelopes_by_account(&account_id, 100, 0)
833 .await
834 .unwrap();
835 assert!(
836 !all_envelopes.is_empty(),
837 "All envelopes should still be retrievable"
838 );
839 }
840
841 #[tokio::test]
842 async fn progressive_loading_chunks() {
843 let store = Arc::new(Store::in_memory().await.unwrap());
844 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
845 let engine = SyncEngine::new(store.clone(), search.clone());
846
847 let account_id = AccountId::new();
848 store
849 .insert_account(&test_account(account_id.clone()))
850 .await
851 .unwrap();
852
853 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
854 let count = engine.sync_account(&provider).await.unwrap();
855 assert_eq!(count, 55, "Sync should report 55 messages processed");
856
857 let envelopes = store
859 .list_envelopes_by_account(&account_id, 200, 0)
860 .await
861 .unwrap();
862 assert_eq!(envelopes.len(), 55, "Store should contain 55 envelopes");
863
864 let results = search.lock().await.search("deployment", 10).unwrap();
866 assert!(
867 !results.is_empty(),
868 "Search index should have 'deployment' results"
869 );
870 }
871
872 #[tokio::test]
873 async fn delta_sync_no_duplicate_labels() {
874 let store = Arc::new(Store::in_memory().await.unwrap());
875 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
876 let engine = SyncEngine::new(store.clone(), search.clone());
877
878 let account_id = AccountId::new();
879 store
880 .insert_account(&test_account(account_id.clone()))
881 .await
882 .unwrap();
883
884 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
885
886 engine.sync_account(&provider).await.unwrap();
888
889 let labels_after_first = store.list_labels_by_account(&account_id).await.unwrap();
890 let label_count_first = labels_after_first.len();
891
892 let delta_count = engine.sync_account(&provider).await.unwrap();
894 assert_eq!(delta_count, 0, "Delta sync should return 0 new messages");
895
896 let labels_after_second = store.list_labels_by_account(&account_id).await.unwrap();
897
898 assert_eq!(
900 label_count_first,
901 labels_after_second.len(),
902 "Label count should not change after delta sync"
903 );
904
905 for label in &labels_after_first {
907 let still_exists = labels_after_second
908 .iter()
909 .any(|l| l.provider_id == label.provider_id && l.name == label.name);
910 assert!(
911 still_exists,
912 "Label '{}' (provider_id='{}') should survive delta sync",
913 label.name, label.provider_id
914 );
915 }
916
917 let envelopes = store
920 .list_envelopes_by_account(&account_id, 200, 0)
921 .await
922 .unwrap();
923 assert_eq!(
924 envelopes.len(),
925 55,
926 "All 55 messages should survive delta sync"
927 );
928 }
929
930 #[tokio::test]
931 async fn delta_sync_preserves_junction_table() {
932 let store = Arc::new(Store::in_memory().await.unwrap());
933 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
934 let engine = SyncEngine::new(store.clone(), search.clone());
935
936 let account_id = AccountId::new();
937 store
938 .insert_account(&test_account(account_id.clone()))
939 .await
940 .unwrap();
941
942 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
943
944 engine.sync_account(&provider).await.unwrap();
946
947 let junction_before = store.count_message_labels().await.unwrap();
948 assert!(
949 junction_before > 0,
950 "Junction table should be populated after initial sync"
951 );
952
953 let delta_count = engine.sync_account(&provider).await.unwrap();
955 assert_eq!(delta_count, 0, "Delta sync should return 0 new messages");
956
957 let junction_after = store.count_message_labels().await.unwrap();
958 assert_eq!(
959 junction_before, junction_after,
960 "Junction table should survive delta sync (before={}, after={})",
961 junction_before, junction_after
962 );
963
964 let labels = store.list_labels_by_account(&account_id).await.unwrap();
966 let inbox = labels.iter().find(|l| l.name == "Inbox").unwrap();
967 let envelopes = store
968 .list_envelopes_by_label(&inbox.id, 100, 0)
969 .await
970 .unwrap();
971 assert!(
972 !envelopes.is_empty(),
973 "Inbox should still return messages after delta sync"
974 );
975 }
976
977 #[tokio::test]
978 async fn backfill_triggers_when_junction_empty() {
979 let store = Arc::new(Store::in_memory().await.unwrap());
980 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
981 let engine = SyncEngine::new(store.clone(), search.clone());
982
983 let account_id = AccountId::new();
984 store
985 .insert_account(&test_account(account_id.clone()))
986 .await
987 .unwrap();
988
989 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
990
991 engine.sync_account(&provider).await.unwrap();
993
994 let junction_before = store.count_message_labels().await.unwrap();
995 assert!(junction_before > 0);
996
997 sqlx::query("DELETE FROM message_labels")
999 .execute(store.writer())
1000 .await
1001 .unwrap();
1002
1003 let junction_wiped = store.count_message_labels().await.unwrap();
1004 assert_eq!(junction_wiped, 0, "Junction should be empty after wipe");
1005
1006 engine.sync_account(&provider).await.unwrap();
1008
1009 let junction_after = store.count_message_labels().await.unwrap();
1010 assert!(
1011 junction_after > 0,
1012 "Junction table should be repopulated after backfill (got {})",
1013 junction_after
1014 );
1015 }
1016
1017 #[tokio::test]
1018 async fn sync_label_resolution_matches_gmail_ids() {
1019 let store = Arc::new(Store::in_memory().await.unwrap());
1020 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1021 let engine = SyncEngine::new(store.clone(), search.clone());
1022
1023 let account_id = AccountId::new();
1024 store
1025 .insert_account(&test_account(account_id.clone()))
1026 .await
1027 .unwrap();
1028
1029 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1030 engine.sync_account(&provider).await.unwrap();
1031
1032 let labels = store.list_labels_by_account(&account_id).await.unwrap();
1033
1034 let expected_mappings = [
1037 ("Inbox", "INBOX"),
1038 ("Sent", "SENT"),
1039 ("Trash", "TRASH"),
1040 ("Spam", "SPAM"),
1041 ("Starred", "STARRED"),
1042 ("Work", "work"),
1043 ("Personal", "personal"),
1044 ("Newsletters", "newsletters"),
1045 ];
1046 for (name, expected_pid) in &expected_mappings {
1047 let label = labels.iter().find(|l| l.name == *name);
1048 assert!(label.is_some(), "Label '{}' should exist after sync", name);
1049 assert_eq!(
1050 label.unwrap().provider_id,
1051 *expected_pid,
1052 "Label '{}' should have provider_id '{}'",
1053 name,
1054 expected_pid
1055 );
1056 }
1057
1058 let envelopes = store
1060 .list_envelopes_by_account(&account_id, 200, 0)
1061 .await
1062 .unwrap();
1063 let label_ids: std::collections::HashSet<String> =
1064 labels.iter().map(|l| l.id.as_str().to_string()).collect();
1065
1066 for env in &envelopes {
1067 let msg_label_ids = store.get_message_label_ids(&env.id).await.unwrap();
1068 for lid in &msg_label_ids {
1069 assert!(
1070 label_ids.contains(&lid.as_str().to_string()),
1071 "Junction entry for message {} points to nonexistent label {}",
1072 env.id,
1073 lid
1074 );
1075 }
1076 }
1077 }
1078
1079 #[tokio::test]
1080 async fn list_envelopes_by_each_label_returns_correct_count() {
1081 let store = Arc::new(Store::in_memory().await.unwrap());
1082 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1083 let engine = SyncEngine::new(store.clone(), search.clone());
1084
1085 let account_id = AccountId::new();
1086 store
1087 .insert_account(&test_account(account_id.clone()))
1088 .await
1089 .unwrap();
1090
1091 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1092 engine.sync_account(&provider).await.unwrap();
1093
1094 let labels = store.list_labels_by_account(&account_id).await.unwrap();
1095 for label in &labels {
1096 if label.total_count > 0 {
1097 let envelopes = store
1098 .list_envelopes_by_label(&label.id, 200, 0)
1099 .await
1100 .unwrap();
1101 assert_eq!(
1102 envelopes.len(),
1103 label.total_count as usize,
1104 "Label '{}' (provider_id='{}') has total_count={} but list_envelopes_by_label returned {}",
1105 label.name,
1106 label.provider_id,
1107 label.total_count,
1108 envelopes.len()
1109 );
1110 }
1111 }
1112 }
1113
1114 #[tokio::test]
1115 async fn search_index_consistent_with_store() {
1116 let store = Arc::new(Store::in_memory().await.unwrap());
1117 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1118 let engine = SyncEngine::new(store.clone(), search.clone());
1119
1120 let account_id = AccountId::new();
1121 store
1122 .insert_account(&test_account(account_id.clone()))
1123 .await
1124 .unwrap();
1125
1126 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1127 engine.sync_account(&provider).await.unwrap();
1128
1129 let envelopes = store
1130 .list_envelopes_by_account(&account_id, 200, 0)
1131 .await
1132 .unwrap();
1133
1134 let search_guard = search.lock().await;
1135 for env in &envelopes {
1136 let keyword = env
1138 .subject
1139 .split_whitespace()
1140 .find(|w| w.len() > 3 && w.chars().all(|c| c.is_alphanumeric()))
1141 .unwrap_or(&env.subject);
1142 let results = search_guard.search(keyword, 100).unwrap();
1143 assert!(
1144 results.iter().any(|r| r.message_id == env.id.as_str()),
1145 "Envelope '{}' (subject='{}') should be findable by keyword '{}' in search index",
1146 env.id,
1147 env.subject,
1148 keyword
1149 );
1150 }
1151 }
1152
1153 #[tokio::test]
1154 async fn mutation_flags_persist_through_store() {
1155 let store = Arc::new(Store::in_memory().await.unwrap());
1156 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1157 let engine = SyncEngine::new(store.clone(), search.clone());
1158
1159 let account_id = AccountId::new();
1160 store
1161 .insert_account(&test_account(account_id.clone()))
1162 .await
1163 .unwrap();
1164
1165 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1166 engine.sync_account(&provider).await.unwrap();
1167
1168 let envelopes = store
1169 .list_envelopes_by_account(&account_id, 1, 0)
1170 .await
1171 .unwrap();
1172 let msg_id = &envelopes[0].id;
1173 let initial_flags = envelopes[0].flags;
1174
1175 store.set_starred(msg_id, true).await.unwrap();
1177 store.set_read(msg_id, true).await.unwrap();
1178
1179 let updated = store.get_envelope(msg_id).await.unwrap().unwrap();
1180 assert!(
1181 updated.flags.contains(MessageFlags::STARRED),
1182 "STARRED flag should be set"
1183 );
1184 assert!(
1185 updated.flags.contains(MessageFlags::READ),
1186 "READ flag should be set"
1187 );
1188
1189 store.set_starred(msg_id, false).await.unwrap();
1191 let updated2 = store.get_envelope(msg_id).await.unwrap().unwrap();
1192 assert!(
1193 !updated2.flags.contains(MessageFlags::STARRED),
1194 "STARRED flag should be cleared after set_starred(false)"
1195 );
1196 assert!(
1197 updated2.flags.contains(MessageFlags::READ),
1198 "READ flag should still be set after clearing STARRED"
1199 );
1200
1201 let _ = initial_flags; }
1205
1206 #[tokio::test]
1207 async fn junction_table_survives_message_update() {
1208 let store = Arc::new(Store::in_memory().await.unwrap());
1209 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1210 let engine = SyncEngine::new(store.clone(), search.clone());
1211
1212 let account_id = AccountId::new();
1213 store
1214 .insert_account(&test_account(account_id.clone()))
1215 .await
1216 .unwrap();
1217
1218 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1219 engine.sync_account(&provider).await.unwrap();
1220
1221 let envelopes = store
1223 .list_envelopes_by_account(&account_id, 1, 0)
1224 .await
1225 .unwrap();
1226 let msg_id = &envelopes[0].id;
1227
1228 let junction_before = store.get_message_label_ids(msg_id).await.unwrap();
1229 assert!(
1230 !junction_before.is_empty(),
1231 "Message should have label associations after sync"
1232 );
1233
1234 store.upsert_envelope(&envelopes[0]).await.unwrap();
1236 store
1238 .set_message_labels(msg_id, &junction_before)
1239 .await
1240 .unwrap();
1241
1242 let junction_after = store.get_message_label_ids(msg_id).await.unwrap();
1243 assert_eq!(
1244 junction_before.len(),
1245 junction_after.len(),
1246 "Junction rows should not double after re-sync (before={}, after={})",
1247 junction_before.len(),
1248 junction_after.len()
1249 );
1250 }
1251
1252 #[tokio::test]
1253 async fn find_labels_by_provider_ids_with_unknown_ids() {
1254 let store = Arc::new(Store::in_memory().await.unwrap());
1255 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1256 let engine = SyncEngine::new(store.clone(), search.clone());
1257
1258 let account_id = AccountId::new();
1259 store
1260 .insert_account(&test_account(account_id.clone()))
1261 .await
1262 .unwrap();
1263
1264 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1265 engine.sync_account(&provider).await.unwrap();
1266
1267 let result = store
1268 .find_labels_by_provider_ids(
1269 &account_id,
1270 &["INBOX".to_string(), "NONEXISTENT_LABEL".to_string()],
1271 )
1272 .await
1273 .unwrap();
1274
1275 assert_eq!(
1276 result.len(),
1277 1,
1278 "Should only return 1 result for INBOX, not 2 (NONEXISTENT_LABEL should be ignored)"
1279 );
1280 }
1281
1282 #[tokio::test]
1283 async fn body_available_after_sync() {
1284 let store = Arc::new(Store::in_memory().await.unwrap());
1285 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1286 let engine = SyncEngine::new(store.clone(), search.clone());
1287
1288 let account_id = AccountId::new();
1289 store
1290 .insert_account(&test_account(account_id.clone()))
1291 .await
1292 .unwrap();
1293
1294 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1295 engine.sync_account(&provider).await.unwrap();
1296
1297 let envelopes = store
1298 .list_envelopes_by_account(&account_id, 1, 0)
1299 .await
1300 .unwrap();
1301 let msg_id = &envelopes[0].id;
1302
1303 let body1 = engine.get_body(msg_id).await.unwrap();
1305 assert!(body1.text_plain.is_some(), "Body should have text_plain");
1306
1307 let body2 = engine.get_body(msg_id).await.unwrap();
1309
1310 assert_eq!(
1311 body1.text_plain, body2.text_plain,
1312 "Body text_plain should be consistent"
1313 );
1314 assert_eq!(
1315 body1.text_html, body2.text_html,
1316 "Body text_html should be consistent"
1317 );
1318 assert_eq!(
1319 body1.attachments.len(),
1320 body2.attachments.len(),
1321 "Body attachments count should be consistent"
1322 );
1323 }
1324
1325 #[tokio::test]
1326 async fn recalculate_label_counts_matches_junction() {
1327 let store = Arc::new(Store::in_memory().await.unwrap());
1328 let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1329 let engine = SyncEngine::new(store.clone(), search.clone());
1330
1331 let account_id = AccountId::new();
1332 store
1333 .insert_account(&test_account(account_id.clone()))
1334 .await
1335 .unwrap();
1336
1337 let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1338 engine.sync_account(&provider).await.unwrap();
1339
1340 let labels = store.list_labels_by_account(&account_id).await.unwrap();
1341
1342 for label in &labels {
1343 let lid = label.id.as_str();
1344 let junction_count: i64 = sqlx::query_scalar::<_, i64>(
1346 "SELECT COUNT(*) FROM message_labels WHERE label_id = ?",
1347 )
1348 .bind(&lid)
1349 .fetch_one(store.reader())
1350 .await
1351 .unwrap();
1352
1353 assert_eq!(
1354 label.total_count as i64, junction_count,
1355 "Label '{}' total_count ({}) should match junction row count ({})",
1356 label.name, label.total_count, junction_count
1357 );
1358
1359 let unread_count: i64 = sqlx::query_scalar::<_, i64>(
1361 "SELECT COUNT(*) FROM message_labels ml \
1362 JOIN messages m ON m.id = ml.message_id \
1363 WHERE ml.label_id = ? AND (m.flags & 1) = 0",
1364 )
1365 .bind(&lid)
1366 .fetch_one(store.reader())
1367 .await
1368 .unwrap();
1369
1370 assert_eq!(
1371 label.unread_count as i64, unread_count,
1372 "Label '{}' unread_count ({}) should match computed unread count ({})",
1373 label.name, label.unread_count, unread_count
1374 );
1375 }
1376 }
1377}