Skip to main content

mxr_sync/
lib.rs

1mod engine;
2pub mod threading;
3pub use engine::SyncEngine;
4
5#[cfg(test)]
6mod tests {
7    use super::*;
8    use mxr_core::id::*;
9    use mxr_core::types::*;
10    use mxr_core::{MailSyncProvider, MxrError, SyncCapabilities};
11    use mxr_search::SearchIndex;
12    use mxr_store::Store;
13    use std::sync::Arc;
14    use tokio::sync::Mutex;
15
16    /// A provider that always returns errors from sync_messages, for testing error handling.
17    struct ErrorProvider {
18        account_id: AccountId,
19    }
20
21    #[async_trait::async_trait]
22    impl MailSyncProvider for ErrorProvider {
23        fn name(&self) -> &str {
24            "error"
25        }
26        fn account_id(&self) -> &AccountId {
27            &self.account_id
28        }
29        fn capabilities(&self) -> SyncCapabilities {
30            SyncCapabilities {
31                labels: false,
32                server_search: false,
33                delta_sync: false,
34                push: false,
35                batch_operations: false,
36                native_thread_ids: true,
37            }
38        }
39        async fn authenticate(&mut self) -> Result<(), MxrError> {
40            Ok(())
41        }
42        async fn refresh_auth(&mut self) -> Result<(), MxrError> {
43            Ok(())
44        }
45        async fn sync_labels(&self) -> Result<Vec<Label>, MxrError> {
46            Ok(vec![])
47        }
48        async fn sync_messages(&self, _cursor: &SyncCursor) -> Result<SyncBatch, MxrError> {
49            Err(MxrError::Provider("simulated sync error".into()))
50        }
51        async fn fetch_attachment(&self, _mid: &str, _aid: &str) -> Result<Vec<u8>, MxrError> {
52            Err(MxrError::Provider("simulated attachment error".into()))
53        }
54        async fn modify_labels(
55            &self,
56            _id: &str,
57            _add: &[String],
58            _rm: &[String],
59        ) -> Result<(), MxrError> {
60            Err(MxrError::Provider("simulated error".into()))
61        }
62        async fn trash(&self, _id: &str) -> Result<(), MxrError> {
63            Err(MxrError::Provider("simulated error".into()))
64        }
65        async fn set_read(&self, _id: &str, _read: bool) -> Result<(), MxrError> {
66            Err(MxrError::Provider("simulated error".into()))
67        }
68        async fn set_starred(&self, _id: &str, _starred: bool) -> Result<(), MxrError> {
69            Err(MxrError::Provider("simulated error".into()))
70        }
71    }
72
73    fn test_account(account_id: AccountId) -> mxr_core::Account {
74        mxr_core::Account {
75            id: account_id,
76            name: "Fake Account".to_string(),
77            email: "user@example.com".to_string(),
78            sync_backend: Some(BackendRef {
79                provider_kind: ProviderKind::Fake,
80                config_key: "fake".to_string(),
81            }),
82            send_backend: None,
83            enabled: true,
84        }
85    }
86
87    /// Provider that returns label_changes on delta sync for testing the label change code path.
88    struct DeltaLabelProvider {
89        account_id: AccountId,
90        messages: Vec<SyncedMessage>,
91        labels: Vec<Label>,
92        label_changes: Vec<LabelChange>,
93    }
94
95    impl DeltaLabelProvider {
96        fn new(
97            account_id: AccountId,
98            messages: Vec<Envelope>,
99            labels: Vec<Label>,
100            label_changes: Vec<LabelChange>,
101        ) -> Self {
102            let messages = messages
103                .into_iter()
104                .map(|env| SyncedMessage {
105                    body: make_empty_body(&env.id),
106                    envelope: env,
107                })
108                .collect();
109            Self {
110                account_id,
111                messages,
112                labels,
113                label_changes,
114            }
115        }
116    }
117
118    struct ThreadingProvider {
119        account_id: AccountId,
120        messages: Vec<SyncedMessage>,
121    }
122
123    #[async_trait::async_trait]
124    impl MailSyncProvider for ThreadingProvider {
125        fn name(&self) -> &str {
126            "threading"
127        }
128
129        fn account_id(&self) -> &AccountId {
130            &self.account_id
131        }
132
133        fn capabilities(&self) -> SyncCapabilities {
134            SyncCapabilities {
135                labels: false,
136                server_search: false,
137                delta_sync: false,
138                push: false,
139                batch_operations: false,
140                native_thread_ids: false,
141            }
142        }
143
144        async fn authenticate(&mut self) -> Result<(), MxrError> {
145            Ok(())
146        }
147
148        async fn refresh_auth(&mut self) -> Result<(), MxrError> {
149            Ok(())
150        }
151
152        async fn sync_labels(&self) -> Result<Vec<Label>, MxrError> {
153            Ok(vec![])
154        }
155
156        async fn sync_messages(&self, _cursor: &SyncCursor) -> Result<SyncBatch, MxrError> {
157            Ok(SyncBatch {
158                upserted: self.messages.clone(),
159                deleted_provider_ids: vec![],
160                label_changes: vec![],
161                next_cursor: SyncCursor::Initial,
162            })
163        }
164
165        async fn fetch_attachment(&self, _mid: &str, _aid: &str) -> Result<Vec<u8>, MxrError> {
166            Err(MxrError::NotFound("no attachment".into()))
167        }
168
169        async fn modify_labels(
170            &self,
171            _id: &str,
172            _add: &[String],
173            _rm: &[String],
174        ) -> Result<(), MxrError> {
175            Ok(())
176        }
177
178        async fn trash(&self, _id: &str) -> Result<(), MxrError> {
179            Ok(())
180        }
181
182        async fn set_read(&self, _id: &str, _read: bool) -> Result<(), MxrError> {
183            Ok(())
184        }
185
186        async fn set_starred(&self, _id: &str, _starred: bool) -> Result<(), MxrError> {
187            Ok(())
188        }
189    }
190
191    fn make_empty_body(message_id: &MessageId) -> MessageBody {
192        MessageBody {
193            message_id: message_id.clone(),
194            text_plain: Some("test body".to_string()),
195            text_html: None,
196            attachments: vec![],
197            fetched_at: chrono::Utc::now(),
198            metadata: MessageMetadata::default(),
199        }
200    }
201
202    #[async_trait::async_trait]
203    impl MailSyncProvider for DeltaLabelProvider {
204        fn name(&self) -> &str {
205            "delta-label"
206        }
207        fn account_id(&self) -> &AccountId {
208            &self.account_id
209        }
210        fn capabilities(&self) -> SyncCapabilities {
211            SyncCapabilities {
212                labels: true,
213                server_search: false,
214                delta_sync: true,
215                push: false,
216                batch_operations: false,
217                native_thread_ids: true,
218            }
219        }
220        async fn authenticate(&mut self) -> Result<(), MxrError> {
221            Ok(())
222        }
223        async fn refresh_auth(&mut self) -> Result<(), MxrError> {
224            Ok(())
225        }
226        async fn sync_labels(&self) -> Result<Vec<Label>, MxrError> {
227            Ok(self.labels.clone())
228        }
229        async fn sync_messages(&self, cursor: &SyncCursor) -> Result<SyncBatch, MxrError> {
230            match cursor {
231                SyncCursor::Initial => Ok(SyncBatch {
232                    upserted: self.messages.clone(),
233                    deleted_provider_ids: vec![],
234                    label_changes: vec![],
235                    next_cursor: SyncCursor::Gmail { history_id: 100 },
236                }),
237                _ => Ok(SyncBatch {
238                    upserted: vec![],
239                    deleted_provider_ids: vec![],
240                    label_changes: self.label_changes.clone(),
241                    next_cursor: SyncCursor::Gmail { history_id: 200 },
242                }),
243            }
244        }
245        async fn fetch_attachment(&self, _mid: &str, _aid: &str) -> Result<Vec<u8>, MxrError> {
246            Err(MxrError::NotFound("no attachment".into()))
247        }
248        async fn modify_labels(
249            &self,
250            _id: &str,
251            _add: &[String],
252            _rm: &[String],
253        ) -> Result<(), MxrError> {
254            Ok(())
255        }
256        async fn trash(&self, _id: &str) -> Result<(), MxrError> {
257            Ok(())
258        }
259        async fn set_read(&self, _id: &str, _read: bool) -> Result<(), MxrError> {
260            Ok(())
261        }
262        async fn set_starred(&self, _id: &str, _starred: bool) -> Result<(), MxrError> {
263            Ok(())
264        }
265    }
266
267    fn make_test_label(account_id: &AccountId, name: &str, provider_id: &str) -> Label {
268        Label {
269            id: LabelId::new(),
270            account_id: account_id.clone(),
271            name: name.to_string(),
272            kind: LabelKind::System,
273            color: None,
274            provider_id: provider_id.to_string(),
275            unread_count: 0,
276            total_count: 0,
277        }
278    }
279
280    fn make_test_envelope(
281        account_id: &AccountId,
282        provider_id: &str,
283        label_provider_ids: Vec<String>,
284    ) -> Envelope {
285        Envelope {
286            id: MessageId::new(),
287            account_id: account_id.clone(),
288            provider_id: provider_id.to_string(),
289            thread_id: ThreadId::new(),
290            message_id_header: None,
291            in_reply_to: None,
292            references: vec![],
293            from: mxr_core::Address {
294                name: Some("Test".to_string()),
295                email: "test@example.com".to_string(),
296            },
297            to: vec![],
298            cc: vec![],
299            bcc: vec![],
300            subject: "Test message".to_string(),
301            date: chrono::Utc::now(),
302            flags: MessageFlags::empty(),
303            snippet: "Test snippet".to_string(),
304            has_attachments: false,
305            size_bytes: 1000,
306            unsubscribe: UnsubscribeMethod::None,
307            label_provider_ids,
308        }
309    }
310
311    #[tokio::test]
312    async fn delta_sync_applies_label_additions() {
313        let store = Arc::new(Store::in_memory().await.unwrap());
314        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
315        let engine = SyncEngine::new(store.clone(), search.clone());
316
317        let account_id = AccountId::new();
318        store
319            .insert_account(&test_account(account_id.clone()))
320            .await
321            .unwrap();
322
323        let inbox = make_test_label(&account_id, "Inbox", "INBOX");
324        let starred = make_test_label(&account_id, "Starred", "STARRED");
325        let labels = vec![inbox.clone(), starred.clone()];
326
327        let msg = make_test_envelope(&account_id, "prov-msg-1", vec!["INBOX".to_string()]);
328        let msg_provider_id = msg.provider_id.clone();
329
330        let provider = DeltaLabelProvider::new(
331            account_id.clone(),
332            vec![msg],
333            labels,
334            vec![LabelChange {
335                provider_message_id: msg_provider_id.clone(),
336                added_labels: vec!["STARRED".to_string()],
337                removed_labels: vec![],
338            }],
339        );
340
341        // Initial sync
342        engine.sync_account(&provider).await.unwrap();
343
344        // Verify msg has INBOX label
345        let msg_id = store
346            .get_message_id_by_provider_id(&account_id, &msg_provider_id)
347            .await
348            .unwrap()
349            .unwrap();
350        let labels_before = store.get_message_label_ids(&msg_id).await.unwrap();
351        assert!(labels_before.contains(&inbox.id));
352        assert!(!labels_before.contains(&starred.id));
353
354        // Delta sync — adds STARRED label
355        engine.sync_account(&provider).await.unwrap();
356
357        let labels_after = store.get_message_label_ids(&msg_id).await.unwrap();
358        assert!(
359            labels_after.contains(&inbox.id),
360            "INBOX should still be present"
361        );
362        assert!(
363            labels_after.contains(&starred.id),
364            "STARRED should be added by delta"
365        );
366    }
367
368    #[tokio::test]
369    async fn sync_rethreads_messages_when_provider_lacks_native_thread_ids() {
370        let store = Arc::new(Store::in_memory().await.unwrap());
371        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
372        let engine = SyncEngine::new(store.clone(), search);
373
374        let account_id = AccountId::new();
375        store
376            .insert_account(&test_account(account_id.clone()))
377            .await
378            .unwrap();
379
380        let first_id = MessageId::new();
381        let second_id = MessageId::new();
382        let first = SyncedMessage {
383            envelope: Envelope {
384                id: first_id.clone(),
385                account_id: account_id.clone(),
386                provider_id: "prov-thread-1".into(),
387                thread_id: ThreadId::new(),
388                message_id_header: Some("<root@example.com>".into()),
389                in_reply_to: None,
390                references: vec![],
391                from: mxr_core::Address {
392                    name: Some("Alice".into()),
393                    email: "alice@example.com".into(),
394                },
395                to: vec![],
396                cc: vec![],
397                bcc: vec![],
398                subject: "Topic".into(),
399                date: chrono::Utc::now() - chrono::Duration::minutes(5),
400                flags: MessageFlags::empty(),
401                snippet: "first".into(),
402                has_attachments: false,
403                size_bytes: 100,
404                unsubscribe: UnsubscribeMethod::None,
405                label_provider_ids: vec![],
406            },
407            body: make_empty_body(&first_id),
408        };
409        let second = SyncedMessage {
410            envelope: Envelope {
411                id: second_id.clone(),
412                account_id: account_id.clone(),
413                provider_id: "prov-thread-2".into(),
414                thread_id: ThreadId::new(),
415                message_id_header: Some("<reply@example.com>".into()),
416                in_reply_to: Some("<root@example.com>".into()),
417                references: vec!["<root@example.com>".into()],
418                from: mxr_core::Address {
419                    name: Some("Bob".into()),
420                    email: "bob@example.com".into(),
421                },
422                to: vec![],
423                cc: vec![],
424                bcc: vec![],
425                subject: "Re: Topic".into(),
426                date: chrono::Utc::now(),
427                flags: MessageFlags::empty(),
428                snippet: "second".into(),
429                has_attachments: false,
430                size_bytes: 100,
431                unsubscribe: UnsubscribeMethod::None,
432                label_provider_ids: vec![],
433            },
434            body: make_empty_body(&second_id),
435        };
436
437        let provider = ThreadingProvider {
438            account_id: account_id.clone(),
439            messages: vec![first, second],
440        };
441
442        engine.sync_account(&provider).await.unwrap();
443
444        let first_env = store.get_envelope(&first_id).await.unwrap().unwrap();
445        let second_env = store.get_envelope(&second_id).await.unwrap().unwrap();
446        assert_eq!(first_env.thread_id, second_env.thread_id);
447
448        let thread = store
449            .get_thread(&first_env.thread_id)
450            .await
451            .unwrap()
452            .unwrap();
453        assert_eq!(thread.message_count, 2);
454    }
455
456    #[tokio::test]
457    async fn delta_sync_applies_label_removals() {
458        let store = Arc::new(Store::in_memory().await.unwrap());
459        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
460        let engine = SyncEngine::new(store.clone(), search.clone());
461
462        let account_id = AccountId::new();
463        store
464            .insert_account(&test_account(account_id.clone()))
465            .await
466            .unwrap();
467
468        let inbox = make_test_label(&account_id, "Inbox", "INBOX");
469        let starred = make_test_label(&account_id, "Starred", "STARRED");
470        let labels = vec![inbox.clone(), starred.clone()];
471
472        let msg = make_test_envelope(
473            &account_id,
474            "prov-msg-2",
475            vec!["INBOX".to_string(), "STARRED".to_string()],
476        );
477        let msg_provider_id = msg.provider_id.clone();
478
479        let provider = DeltaLabelProvider::new(
480            account_id.clone(),
481            vec![msg],
482            labels,
483            vec![LabelChange {
484                provider_message_id: msg_provider_id.clone(),
485                added_labels: vec![],
486                removed_labels: vec!["STARRED".to_string()],
487            }],
488        );
489
490        // Initial sync
491        engine.sync_account(&provider).await.unwrap();
492
493        let msg_id = store
494            .get_message_id_by_provider_id(&account_id, &msg_provider_id)
495            .await
496            .unwrap()
497            .unwrap();
498        let labels_before = store.get_message_label_ids(&msg_id).await.unwrap();
499        assert!(
500            labels_before.contains(&starred.id),
501            "STARRED should be present after initial sync"
502        );
503
504        // Delta sync — removes STARRED
505        engine.sync_account(&provider).await.unwrap();
506
507        let labels_after = store.get_message_label_ids(&msg_id).await.unwrap();
508        assert!(labels_after.contains(&inbox.id), "INBOX should remain");
509        assert!(
510            !labels_after.contains(&starred.id),
511            "STARRED should be removed by delta"
512        );
513    }
514
515    #[tokio::test]
516    async fn delta_sync_handles_unknown_provider_message() {
517        let store = Arc::new(Store::in_memory().await.unwrap());
518        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
519        let engine = SyncEngine::new(store.clone(), search.clone());
520
521        let account_id = AccountId::new();
522        store
523            .insert_account(&test_account(account_id.clone()))
524            .await
525            .unwrap();
526
527        let inbox = make_test_label(&account_id, "Inbox", "INBOX");
528        let labels = vec![inbox.clone()];
529
530        let msg = make_test_envelope(&account_id, "prov-msg-3", vec!["INBOX".to_string()]);
531
532        let provider = DeltaLabelProvider::new(
533            account_id.clone(),
534            vec![msg],
535            labels,
536            vec![LabelChange {
537                // This provider_message_id doesn't exist in our store
538                provider_message_id: "nonexistent-msg".to_string(),
539                added_labels: vec!["INBOX".to_string()],
540                removed_labels: vec![],
541            }],
542        );
543
544        // Initial sync
545        engine.sync_account(&provider).await.unwrap();
546
547        // Delta sync — should not crash on unknown message
548        let result = engine.sync_account(&provider).await;
549        assert!(
550            result.is_ok(),
551            "Delta sync should gracefully skip unknown messages"
552        );
553    }
554
555    #[tokio::test]
556    async fn sync_populates_store_and_search() {
557        let store = Arc::new(Store::in_memory().await.unwrap());
558        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
559        let engine = SyncEngine::new(store.clone(), search.clone());
560
561        let account_id = AccountId::new();
562        store
563            .insert_account(&test_account(account_id.clone()))
564            .await
565            .unwrap();
566
567        let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
568        let count = engine.sync_account(&provider).await.unwrap();
569        assert_eq!(count, 55);
570
571        // Verify store
572        let envelopes = store
573            .list_envelopes_by_account(&account_id, 100, 0)
574            .await
575            .unwrap();
576        assert_eq!(envelopes.len(), 55);
577
578        // Verify search
579        let results = search.lock().await.search("deployment", 10).unwrap();
580        assert!(!results.is_empty());
581    }
582
583    #[tokio::test]
584    async fn bodies_stored_eagerly_during_sync() {
585        let store = Arc::new(Store::in_memory().await.unwrap());
586        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
587        let engine = SyncEngine::new(store.clone(), search.clone());
588
589        let account_id = AccountId::new();
590        store
591            .insert_account(&test_account(account_id.clone()))
592            .await
593            .unwrap();
594
595        let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
596        engine.sync_account(&provider).await.unwrap();
597
598        // Get first message
599        let envelopes = store
600            .list_envelopes_by_account(&account_id, 1, 0)
601            .await
602            .unwrap();
603        let msg_id = &envelopes[0].id;
604
605        // Body should already be in store — fetched eagerly during sync
606        let body = engine.get_body(msg_id).await.unwrap();
607        assert!(body.text_plain.is_some());
608
609        // Second read — same result
610        let body2 = engine.get_body(msg_id).await.unwrap();
611        assert_eq!(body.text_plain, body2.text_plain);
612    }
613
614    #[tokio::test]
615    async fn snooze_wake() {
616        let store = Arc::new(Store::in_memory().await.unwrap());
617        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
618        let engine = SyncEngine::new(store.clone(), search.clone());
619
620        let account_id = AccountId::new();
621        store
622            .insert_account(&test_account(account_id.clone()))
623            .await
624            .unwrap();
625
626        let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
627        engine.sync_account(&provider).await.unwrap();
628
629        // Get a message to snooze
630        let envelopes = store
631            .list_envelopes_by_account(&account_id, 1, 0)
632            .await
633            .unwrap();
634
635        let snoozed = Snoozed {
636            message_id: envelopes[0].id.clone(),
637            account_id: account_id.clone(),
638            snoozed_at: chrono::Utc::now(),
639            wake_at: chrono::Utc::now() - chrono::Duration::hours(1),
640            original_labels: vec![],
641        };
642        store.insert_snooze(&snoozed).await.unwrap();
643
644        let woken = engine.check_snoozes().await.unwrap();
645        assert_eq!(woken.len(), 1);
646
647        // Should be gone now
648        let woken2 = engine.check_snoozes().await.unwrap();
649        assert_eq!(woken2.len(), 0);
650    }
651
652    #[tokio::test]
653    async fn cursor_persistence() {
654        let store = Arc::new(Store::in_memory().await.unwrap());
655        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
656        let engine = SyncEngine::new(store.clone(), search.clone());
657
658        let account_id = AccountId::new();
659        store
660            .insert_account(&test_account(account_id.clone()))
661            .await
662            .unwrap();
663
664        // Before sync, cursor should be None
665        let cursor_before = store.get_sync_cursor(&account_id).await.unwrap();
666        assert!(cursor_before.is_none());
667
668        let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
669        engine.sync_account(&provider).await.unwrap();
670
671        // After sync, cursor should match FakeProvider's next_cursor (Gmail { history_id: 1 })
672        let cursor_after = store.get_sync_cursor(&account_id).await.unwrap();
673        assert!(cursor_after.is_some(), "Cursor should be set after sync");
674        let cursor_json = serde_json::to_string(&cursor_after.unwrap()).unwrap();
675        assert!(
676            cursor_json.contains("Gmail") && cursor_json.contains("1"),
677            "Cursor should be Gmail {{ history_id: 1 }}, got: {}",
678            cursor_json
679        );
680    }
681
682    #[tokio::test]
683    async fn sync_error_does_not_crash() {
684        let store = Arc::new(Store::in_memory().await.unwrap());
685        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
686        let engine = SyncEngine::new(store.clone(), search.clone());
687
688        let account_id = AccountId::new();
689        store
690            .insert_account(&test_account(account_id.clone()))
691            .await
692            .unwrap();
693
694        let error_provider = ErrorProvider {
695            account_id: account_id.clone(),
696        };
697
698        // Should return Err, not panic
699        let result = engine.sync_account(&error_provider).await;
700        assert!(
701            result.is_err(),
702            "sync_account should return Err for failing provider"
703        );
704        let err_msg = result.unwrap_err().to_string();
705        assert!(
706            err_msg.contains("simulated"),
707            "Error should contain provider message, got: {}",
708            err_msg
709        );
710    }
711
712    #[tokio::test]
713    async fn label_counts_after_sync() {
714        let store = Arc::new(Store::in_memory().await.unwrap());
715        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
716        let engine = SyncEngine::new(store.clone(), search.clone());
717
718        let account_id = AccountId::new();
719        store
720            .insert_account(&test_account(account_id.clone()))
721            .await
722            .unwrap();
723
724        let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
725        engine.sync_account(&provider).await.unwrap();
726
727        let labels = store.list_labels_by_account(&account_id).await.unwrap();
728        assert!(!labels.is_empty(), "Should have labels after sync");
729
730        let has_counts = labels
731            .iter()
732            .any(|l| l.total_count > 0 || l.unread_count > 0);
733        assert!(
734            has_counts,
735            "At least one label should have non-zero counts after sync"
736        );
737    }
738
739    #[tokio::test]
740    async fn list_envelopes_by_label_returns_results() {
741        let store = Arc::new(Store::in_memory().await.unwrap());
742        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
743        let engine = SyncEngine::new(store.clone(), search.clone());
744
745        let account_id = AccountId::new();
746        store
747            .insert_account(&test_account(account_id.clone()))
748            .await
749            .unwrap();
750
751        let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
752        engine.sync_account(&provider).await.unwrap();
753
754        let labels = store.list_labels_by_account(&account_id).await.unwrap();
755
756        // Find the INBOX label
757        let inbox_label = labels.iter().find(|l| l.name == "Inbox").unwrap();
758        assert!(
759            inbox_label.total_count > 0,
760            "Inbox should have messages after sync"
761        );
762
763        // Now query envelopes by that label
764        let envelopes = store
765            .list_envelopes_by_label(&inbox_label.id, 100, 0)
766            .await
767            .unwrap();
768
769        // Also check all envelopes (no label filter)
770        let all_envelopes = store
771            .list_envelopes_by_account(&account_id, 200, 0)
772            .await
773            .unwrap();
774
775        assert!(
776            !envelopes.is_empty(),
777            "list_envelopes_by_label should return messages for Inbox label (got 0). \
778             label_id={}, total_count={}, all_count={}",
779            inbox_label.id,
780            inbox_label.total_count,
781            all_envelopes.len()
782        );
783
784        // Inbox-by-label should have same or fewer messages than all
785        assert!(
786            envelopes.len() <= all_envelopes.len(),
787            "Inbox-by-label ({}) should be <= all ({})",
788            envelopes.len(),
789            all_envelopes.len()
790        );
791    }
792
793    #[tokio::test]
794    async fn list_envelopes_by_sent_label_may_be_empty() {
795        let store = Arc::new(Store::in_memory().await.unwrap());
796        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
797        let engine = SyncEngine::new(store.clone(), search.clone());
798
799        let account_id = AccountId::new();
800        store
801            .insert_account(&test_account(account_id.clone()))
802            .await
803            .unwrap();
804
805        let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
806        engine.sync_account(&provider).await.unwrap();
807
808        let labels = store.list_labels_by_account(&account_id).await.unwrap();
809
810        // Find Sent label
811        let sent_label = labels.iter().find(|l| l.name == "Sent").unwrap();
812
813        let envelopes = store
814            .list_envelopes_by_label(&sent_label.id, 100, 0)
815            .await
816            .unwrap();
817
818        // Sent has no messages in fake provider (no SENT flags set)
819        assert_eq!(
820            envelopes.len(),
821            0,
822            "Sent should have 0 messages in fake provider"
823        );
824
825        // But Inbox should still have messages
826        let inbox_label = labels.iter().find(|l| l.name == "Inbox").unwrap();
827        let inbox_envelopes = store
828            .list_envelopes_by_label(&inbox_label.id, 100, 0)
829            .await
830            .unwrap();
831        assert!(
832            !inbox_envelopes.is_empty(),
833            "Inbox should still have messages after querying Sent"
834        );
835
836        // And listing ALL envelopes (no label filter) should still work
837        let all_envelopes = store
838            .list_envelopes_by_account(&account_id, 100, 0)
839            .await
840            .unwrap();
841        assert!(
842            !all_envelopes.is_empty(),
843            "All envelopes should still be retrievable"
844        );
845    }
846
847    #[tokio::test]
848    async fn progressive_loading_chunks() {
849        let store = Arc::new(Store::in_memory().await.unwrap());
850        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
851        let engine = SyncEngine::new(store.clone(), search.clone());
852
853        let account_id = AccountId::new();
854        store
855            .insert_account(&test_account(account_id.clone()))
856            .await
857            .unwrap();
858
859        let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
860        let count = engine.sync_account(&provider).await.unwrap();
861        assert_eq!(count, 55, "Sync should report 55 messages processed");
862
863        // Verify store has exactly 55 envelopes
864        let envelopes = store
865            .list_envelopes_by_account(&account_id, 200, 0)
866            .await
867            .unwrap();
868        assert_eq!(envelopes.len(), 55, "Store should contain 55 envelopes");
869
870        // Verify search index has results for known fixture terms
871        let results = search.lock().await.search("deployment", 10).unwrap();
872        assert!(
873            !results.is_empty(),
874            "Search index should have 'deployment' results"
875        );
876    }
877
878    #[tokio::test]
879    async fn delta_sync_no_duplicate_labels() {
880        let store = Arc::new(Store::in_memory().await.unwrap());
881        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
882        let engine = SyncEngine::new(store.clone(), search.clone());
883
884        let account_id = AccountId::new();
885        store
886            .insert_account(&test_account(account_id.clone()))
887            .await
888            .unwrap();
889
890        let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
891
892        // Initial sync
893        engine.sync_account(&provider).await.unwrap();
894
895        let labels_after_first = store.list_labels_by_account(&account_id).await.unwrap();
896        let label_count_first = labels_after_first.len();
897
898        // Delta sync (should return 0 new messages)
899        let delta_count = engine.sync_account(&provider).await.unwrap();
900        assert_eq!(delta_count, 0, "Delta sync should return 0 new messages");
901
902        let labels_after_second = store.list_labels_by_account(&account_id).await.unwrap();
903
904        // Label rows should not be duplicated
905        assert_eq!(
906            label_count_first,
907            labels_after_second.len(),
908            "Label count should not change after delta sync"
909        );
910
911        // Verify each label still exists with the correct provider_id
912        for label in &labels_after_first {
913            let still_exists = labels_after_second
914                .iter()
915                .any(|l| l.provider_id == label.provider_id && l.name == label.name);
916            assert!(
917                still_exists,
918                "Label '{}' (provider_id='{}') should survive delta sync",
919                label.name, label.provider_id
920            );
921        }
922
923        // Verify messages are still in the store (upsert_envelope uses INSERT OR REPLACE
924        // on messages table, which is not affected by label cascade)
925        let envelopes = store
926            .list_envelopes_by_account(&account_id, 200, 0)
927            .await
928            .unwrap();
929        assert_eq!(
930            envelopes.len(),
931            55,
932            "All 55 messages should survive delta sync"
933        );
934    }
935
936    #[tokio::test]
937    async fn delta_sync_preserves_junction_table() {
938        let store = Arc::new(Store::in_memory().await.unwrap());
939        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
940        let engine = SyncEngine::new(store.clone(), search.clone());
941
942        let account_id = AccountId::new();
943        store
944            .insert_account(&test_account(account_id.clone()))
945            .await
946            .unwrap();
947
948        let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
949
950        // Initial sync
951        engine.sync_account(&provider).await.unwrap();
952
953        let junction_before = store.count_message_labels().await.unwrap();
954        assert!(
955            junction_before > 0,
956            "Junction table should be populated after initial sync"
957        );
958
959        // Delta sync (labels get re-upserted, no new messages)
960        let delta_count = engine.sync_account(&provider).await.unwrap();
961        assert_eq!(delta_count, 0, "Delta sync should return 0 new messages");
962
963        let junction_after = store.count_message_labels().await.unwrap();
964        assert_eq!(
965            junction_before, junction_after,
966            "Junction table should survive delta sync (before={}, after={})",
967            junction_before, junction_after
968        );
969
970        // Verify label filtering still works
971        let labels = store.list_labels_by_account(&account_id).await.unwrap();
972        let inbox = labels.iter().find(|l| l.name == "Inbox").unwrap();
973        let envelopes = store
974            .list_envelopes_by_label(&inbox.id, 100, 0)
975            .await
976            .unwrap();
977        assert!(
978            !envelopes.is_empty(),
979            "Inbox should still return messages after delta sync"
980        );
981    }
982
983    #[tokio::test]
984    async fn backfill_triggers_when_junction_empty() {
985        let store = Arc::new(Store::in_memory().await.unwrap());
986        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
987        let engine = SyncEngine::new(store.clone(), search.clone());
988
989        let account_id = AccountId::new();
990        store
991            .insert_account(&test_account(account_id.clone()))
992            .await
993            .unwrap();
994
995        let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
996
997        // Initial sync
998        engine.sync_account(&provider).await.unwrap();
999
1000        let junction_before = store.count_message_labels().await.unwrap();
1001        assert!(junction_before > 0);
1002
1003        // Wipe junction table manually (simulates corrupted DB)
1004        sqlx::query("DELETE FROM message_labels")
1005            .execute(store.writer())
1006            .await
1007            .unwrap();
1008
1009        let junction_wiped = store.count_message_labels().await.unwrap();
1010        assert_eq!(junction_wiped, 0, "Junction should be empty after wipe");
1011
1012        // Sync again — should detect empty junction and backfill
1013        engine.sync_account(&provider).await.unwrap();
1014
1015        let junction_after = store.count_message_labels().await.unwrap();
1016        assert!(
1017            junction_after > 0,
1018            "Junction table should be repopulated after backfill (got {})",
1019            junction_after
1020        );
1021    }
1022
1023    #[tokio::test]
1024    async fn sync_label_resolution_matches_gmail_ids() {
1025        let store = Arc::new(Store::in_memory().await.unwrap());
1026        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1027        let engine = SyncEngine::new(store.clone(), search.clone());
1028
1029        let account_id = AccountId::new();
1030        store
1031            .insert_account(&test_account(account_id.clone()))
1032            .await
1033            .unwrap();
1034
1035        let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1036        engine.sync_account(&provider).await.unwrap();
1037
1038        let labels = store.list_labels_by_account(&account_id).await.unwrap();
1039
1040        // FakeProvider uses Gmail-style IDs: "INBOX", "SENT", "TRASH", etc.
1041        // Verify each label has a matching provider_id
1042        let expected_mappings = [
1043            ("Inbox", "INBOX"),
1044            ("Sent", "SENT"),
1045            ("Trash", "TRASH"),
1046            ("Spam", "SPAM"),
1047            ("Starred", "STARRED"),
1048            ("Work", "work"),
1049            ("Personal", "personal"),
1050            ("Newsletters", "newsletters"),
1051        ];
1052        for (name, expected_pid) in &expected_mappings {
1053            let label = labels.iter().find(|l| l.name == *name);
1054            assert!(label.is_some(), "Label '{}' should exist after sync", name);
1055            assert_eq!(
1056                label.unwrap().provider_id,
1057                *expected_pid,
1058                "Label '{}' should have provider_id '{}'",
1059                name,
1060                expected_pid
1061            );
1062        }
1063
1064        // For each message, verify junction table entries point to valid labels
1065        let envelopes = store
1066            .list_envelopes_by_account(&account_id, 200, 0)
1067            .await
1068            .unwrap();
1069        let label_ids: std::collections::HashSet<String> =
1070            labels.iter().map(|l| l.id.as_str().to_string()).collect();
1071
1072        for env in &envelopes {
1073            let msg_label_ids = store.get_message_label_ids(&env.id).await.unwrap();
1074            for lid in &msg_label_ids {
1075                assert!(
1076                    label_ids.contains(&lid.as_str().to_string()),
1077                    "Junction entry for message {} points to nonexistent label {}",
1078                    env.id,
1079                    lid
1080                );
1081            }
1082        }
1083    }
1084
1085    #[tokio::test]
1086    async fn list_envelopes_by_each_label_returns_correct_count() {
1087        let store = Arc::new(Store::in_memory().await.unwrap());
1088        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1089        let engine = SyncEngine::new(store.clone(), search.clone());
1090
1091        let account_id = AccountId::new();
1092        store
1093            .insert_account(&test_account(account_id.clone()))
1094            .await
1095            .unwrap();
1096
1097        let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1098        engine.sync_account(&provider).await.unwrap();
1099
1100        let labels = store.list_labels_by_account(&account_id).await.unwrap();
1101        for label in &labels {
1102            if label.total_count > 0 {
1103                let envelopes = store
1104                    .list_envelopes_by_label(&label.id, 200, 0)
1105                    .await
1106                    .unwrap();
1107                assert_eq!(
1108                    envelopes.len(),
1109                    label.total_count as usize,
1110                    "Label '{}' (provider_id='{}') has total_count={} but list_envelopes_by_label returned {}",
1111                    label.name,
1112                    label.provider_id,
1113                    label.total_count,
1114                    envelopes.len()
1115                );
1116            }
1117        }
1118    }
1119
1120    #[tokio::test]
1121    async fn search_index_consistent_with_store() {
1122        let store = Arc::new(Store::in_memory().await.unwrap());
1123        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1124        let engine = SyncEngine::new(store.clone(), search.clone());
1125
1126        let account_id = AccountId::new();
1127        store
1128            .insert_account(&test_account(account_id.clone()))
1129            .await
1130            .unwrap();
1131
1132        let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1133        engine.sync_account(&provider).await.unwrap();
1134
1135        let envelopes = store
1136            .list_envelopes_by_account(&account_id, 200, 0)
1137            .await
1138            .unwrap();
1139
1140        let search_guard = search.lock().await;
1141        for env in &envelopes {
1142            // Extract a distinctive keyword from the subject
1143            let keyword = env
1144                .subject
1145                .split_whitespace()
1146                .find(|w| w.len() > 3 && w.chars().all(|c| c.is_alphanumeric()))
1147                .unwrap_or(&env.subject);
1148            let results = search_guard.search(keyword, 100).unwrap();
1149            assert!(
1150                results.iter().any(|r| r.message_id == env.id.as_str()),
1151                "Envelope '{}' (subject='{}') should be findable by keyword '{}' in search index",
1152                env.id,
1153                env.subject,
1154                keyword
1155            );
1156        }
1157    }
1158
1159    #[tokio::test]
1160    async fn mutation_flags_persist_through_store() {
1161        let store = Arc::new(Store::in_memory().await.unwrap());
1162        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1163        let engine = SyncEngine::new(store.clone(), search.clone());
1164
1165        let account_id = AccountId::new();
1166        store
1167            .insert_account(&test_account(account_id.clone()))
1168            .await
1169            .unwrap();
1170
1171        let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1172        engine.sync_account(&provider).await.unwrap();
1173
1174        let envelopes = store
1175            .list_envelopes_by_account(&account_id, 1, 0)
1176            .await
1177            .unwrap();
1178        let msg_id = &envelopes[0].id;
1179        let initial_flags = envelopes[0].flags;
1180
1181        // Set starred
1182        store.set_starred(msg_id, true).await.unwrap();
1183        store.set_read(msg_id, true).await.unwrap();
1184
1185        let updated = store.get_envelope(msg_id).await.unwrap().unwrap();
1186        assert!(
1187            updated.flags.contains(MessageFlags::STARRED),
1188            "STARRED flag should be set"
1189        );
1190        assert!(
1191            updated.flags.contains(MessageFlags::READ),
1192            "READ flag should be set"
1193        );
1194
1195        // Clear starred, keep read
1196        store.set_starred(msg_id, false).await.unwrap();
1197        let updated2 = store.get_envelope(msg_id).await.unwrap().unwrap();
1198        assert!(
1199            !updated2.flags.contains(MessageFlags::STARRED),
1200            "STARRED flag should be cleared after set_starred(false)"
1201        );
1202        assert!(
1203            updated2.flags.contains(MessageFlags::READ),
1204            "READ flag should still be set after clearing STARRED"
1205        );
1206
1207        // Verify initial flags were different (test is meaningful)
1208        // At least one flag mutation should have changed something
1209        let _ = initial_flags; // used to confirm the test exercises real mutations
1210    }
1211
1212    #[tokio::test]
1213    async fn junction_table_survives_message_update() {
1214        let store = Arc::new(Store::in_memory().await.unwrap());
1215        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1216        let engine = SyncEngine::new(store.clone(), search.clone());
1217
1218        let account_id = AccountId::new();
1219        store
1220            .insert_account(&test_account(account_id.clone()))
1221            .await
1222            .unwrap();
1223
1224        let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1225        engine.sync_account(&provider).await.unwrap();
1226
1227        // Count junction rows for first message
1228        let envelopes = store
1229            .list_envelopes_by_account(&account_id, 1, 0)
1230            .await
1231            .unwrap();
1232        let msg_id = &envelopes[0].id;
1233
1234        let junction_before = store.get_message_label_ids(msg_id).await.unwrap();
1235        assert!(
1236            !junction_before.is_empty(),
1237            "Message should have label associations after sync"
1238        );
1239
1240        // Re-upsert the same envelope (simulates re-sync)
1241        store.upsert_envelope(&envelopes[0]).await.unwrap();
1242        // Re-set labels (same as sync engine does)
1243        store
1244            .set_message_labels(msg_id, &junction_before)
1245            .await
1246            .unwrap();
1247
1248        let junction_after = store.get_message_label_ids(msg_id).await.unwrap();
1249        assert_eq!(
1250            junction_before.len(),
1251            junction_after.len(),
1252            "Junction rows should not double after re-sync (before={}, after={})",
1253            junction_before.len(),
1254            junction_after.len()
1255        );
1256    }
1257
1258    #[tokio::test]
1259    async fn find_labels_by_provider_ids_with_unknown_ids() {
1260        let store = Arc::new(Store::in_memory().await.unwrap());
1261        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1262        let engine = SyncEngine::new(store.clone(), search.clone());
1263
1264        let account_id = AccountId::new();
1265        store
1266            .insert_account(&test_account(account_id.clone()))
1267            .await
1268            .unwrap();
1269
1270        let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1271        engine.sync_account(&provider).await.unwrap();
1272
1273        let result = store
1274            .find_labels_by_provider_ids(
1275                &account_id,
1276                &["INBOX".to_string(), "NONEXISTENT_LABEL".to_string()],
1277            )
1278            .await
1279            .unwrap();
1280
1281        assert_eq!(
1282            result.len(),
1283            1,
1284            "Should only return 1 result for INBOX, not 2 (NONEXISTENT_LABEL should be ignored)"
1285        );
1286    }
1287
1288    #[tokio::test]
1289    async fn body_available_after_sync() {
1290        let store = Arc::new(Store::in_memory().await.unwrap());
1291        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1292        let engine = SyncEngine::new(store.clone(), search.clone());
1293
1294        let account_id = AccountId::new();
1295        store
1296            .insert_account(&test_account(account_id.clone()))
1297            .await
1298            .unwrap();
1299
1300        let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1301        engine.sync_account(&provider).await.unwrap();
1302
1303        let envelopes = store
1304            .list_envelopes_by_account(&account_id, 1, 0)
1305            .await
1306            .unwrap();
1307        let msg_id = &envelopes[0].id;
1308
1309        // Body already available — stored eagerly during sync
1310        let body1 = engine.get_body(msg_id).await.unwrap();
1311        assert!(body1.text_plain.is_some(), "Body should have text_plain");
1312
1313        // Second read — same result from store
1314        let body2 = engine.get_body(msg_id).await.unwrap();
1315
1316        assert_eq!(
1317            body1.text_plain, body2.text_plain,
1318            "Body text_plain should be consistent"
1319        );
1320        assert_eq!(
1321            body1.text_html, body2.text_html,
1322            "Body text_html should be consistent"
1323        );
1324        assert_eq!(
1325            body1.attachments.len(),
1326            body2.attachments.len(),
1327            "Body attachments count should be consistent"
1328        );
1329    }
1330
1331    #[tokio::test]
1332    async fn recalculate_label_counts_matches_junction() {
1333        let store = Arc::new(Store::in_memory().await.unwrap());
1334        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1335        let engine = SyncEngine::new(store.clone(), search.clone());
1336
1337        let account_id = AccountId::new();
1338        store
1339            .insert_account(&test_account(account_id.clone()))
1340            .await
1341            .unwrap();
1342
1343        let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1344        engine.sync_account(&provider).await.unwrap();
1345
1346        let labels = store.list_labels_by_account(&account_id).await.unwrap();
1347
1348        for label in &labels {
1349            let lid = label.id.as_str();
1350            // Manually count junction rows for this label
1351            let junction_count: i64 = sqlx::query_scalar::<_, i64>(
1352                "SELECT COUNT(*) FROM message_labels WHERE label_id = ?",
1353            )
1354            .bind(&lid)
1355            .fetch_one(store.reader())
1356            .await
1357            .unwrap();
1358
1359            assert_eq!(
1360                label.total_count as i64, junction_count,
1361                "Label '{}' total_count ({}) should match junction row count ({})",
1362                label.name, label.total_count, junction_count
1363            );
1364
1365            // Also verify unread count
1366            let unread_count: i64 = sqlx::query_scalar::<_, i64>(
1367                "SELECT COUNT(*) FROM message_labels ml \
1368                 JOIN messages m ON m.id = ml.message_id \
1369                 WHERE ml.label_id = ? AND (m.flags & 1) = 0",
1370            )
1371            .bind(&lid)
1372            .fetch_one(store.reader())
1373            .await
1374            .unwrap();
1375
1376            assert_eq!(
1377                label.unread_count as i64, unread_count,
1378                "Label '{}' unread_count ({}) should match computed unread count ({})",
1379                label.name, label.unread_count, unread_count
1380            );
1381        }
1382    }
1383}