Skip to main content

mxr_sync/
lib.rs

1mod engine;
2pub mod threading;
3pub use engine::SyncEngine;
4
5#[cfg(test)]
6mod tests {
7    use super::*;
8    use mxr_core::id::*;
9    use mxr_core::types::*;
10    use mxr_core::{MailSyncProvider, MxrError, SyncCapabilities};
11    use mxr_search::SearchIndex;
12    use mxr_store::Store;
13    use std::sync::Arc;
14    use tokio::sync::Mutex;
15
16    /// A provider that always returns errors from sync_messages, for testing error handling.
17    struct ErrorProvider {
18        account_id: AccountId,
19    }
20
21    #[async_trait::async_trait]
22    impl MailSyncProvider for ErrorProvider {
23        fn name(&self) -> &str {
24            "error"
25        }
26        fn account_id(&self) -> &AccountId {
27            &self.account_id
28        }
29        fn capabilities(&self) -> SyncCapabilities {
30            SyncCapabilities {
31                labels: false,
32                server_search: false,
33                delta_sync: false,
34                push: false,
35                batch_operations: false,
36                native_thread_ids: true,
37            }
38        }
39        async fn authenticate(&mut self) -> Result<(), MxrError> {
40            Ok(())
41        }
42        async fn refresh_auth(&mut self) -> Result<(), MxrError> {
43            Ok(())
44        }
45        async fn sync_labels(&self) -> Result<Vec<Label>, MxrError> {
46            Ok(vec![])
47        }
48        async fn sync_messages(&self, _cursor: &SyncCursor) -> Result<SyncBatch, MxrError> {
49            Err(MxrError::Provider("simulated sync error".into()))
50        }
51        async fn fetch_attachment(&self, _mid: &str, _aid: &str) -> Result<Vec<u8>, MxrError> {
52            Err(MxrError::Provider("simulated attachment error".into()))
53        }
54        async fn modify_labels(
55            &self,
56            _id: &str,
57            _add: &[String],
58            _rm: &[String],
59        ) -> Result<(), MxrError> {
60            Err(MxrError::Provider("simulated error".into()))
61        }
62        async fn trash(&self, _id: &str) -> Result<(), MxrError> {
63            Err(MxrError::Provider("simulated error".into()))
64        }
65        async fn set_read(&self, _id: &str, _read: bool) -> Result<(), MxrError> {
66            Err(MxrError::Provider("simulated error".into()))
67        }
68        async fn set_starred(&self, _id: &str, _starred: bool) -> Result<(), MxrError> {
69            Err(MxrError::Provider("simulated error".into()))
70        }
71    }
72
73    fn test_account(account_id: AccountId) -> mxr_core::Account {
74        mxr_core::Account {
75            id: account_id,
76            name: "Fake Account".to_string(),
77            email: "user@example.com".to_string(),
78            sync_backend: Some(BackendRef {
79                provider_kind: ProviderKind::Fake,
80                config_key: "fake".to_string(),
81            }),
82            send_backend: None,
83            enabled: true,
84        }
85    }
86
87    /// Provider that returns label_changes on delta sync for testing the label change code path.
88    struct DeltaLabelProvider {
89        account_id: AccountId,
90        messages: Vec<SyncedMessage>,
91        labels: Vec<Label>,
92        label_changes: Vec<LabelChange>,
93    }
94
95    impl DeltaLabelProvider {
96        fn new(
97            account_id: AccountId,
98            messages: Vec<Envelope>,
99            labels: Vec<Label>,
100            label_changes: Vec<LabelChange>,
101        ) -> Self {
102            let messages = messages
103                .into_iter()
104                .map(|env| SyncedMessage {
105                    body: make_empty_body(&env.id),
106                    envelope: env,
107                })
108                .collect();
109            Self {
110                account_id,
111                messages,
112                labels,
113                label_changes,
114            }
115        }
116    }
117
118    struct ThreadingProvider {
119        account_id: AccountId,
120        messages: Vec<SyncedMessage>,
121    }
122
123    struct RecoveringNotFoundProvider {
124        account_id: AccountId,
125        message: SyncedMessage,
126        calls: std::sync::Mutex<Vec<SyncCursor>>,
127    }
128
129    #[async_trait::async_trait]
130    impl MailSyncProvider for ThreadingProvider {
131        fn name(&self) -> &str {
132            "threading"
133        }
134
135        fn account_id(&self) -> &AccountId {
136            &self.account_id
137        }
138
139        fn capabilities(&self) -> SyncCapabilities {
140            SyncCapabilities {
141                labels: false,
142                server_search: false,
143                delta_sync: false,
144                push: false,
145                batch_operations: false,
146                native_thread_ids: false,
147            }
148        }
149
150        async fn authenticate(&mut self) -> Result<(), MxrError> {
151            Ok(())
152        }
153
154        async fn refresh_auth(&mut self) -> Result<(), MxrError> {
155            Ok(())
156        }
157
158        async fn sync_labels(&self) -> Result<Vec<Label>, MxrError> {
159            Ok(vec![])
160        }
161
162        async fn sync_messages(&self, _cursor: &SyncCursor) -> Result<SyncBatch, MxrError> {
163            Ok(SyncBatch {
164                upserted: self.messages.clone(),
165                deleted_provider_ids: vec![],
166                label_changes: vec![],
167                next_cursor: SyncCursor::Initial,
168            })
169        }
170
171        async fn fetch_attachment(&self, _mid: &str, _aid: &str) -> Result<Vec<u8>, MxrError> {
172            Err(MxrError::NotFound("no attachment".into()))
173        }
174
175        async fn modify_labels(
176            &self,
177            _id: &str,
178            _add: &[String],
179            _rm: &[String],
180        ) -> Result<(), MxrError> {
181            Ok(())
182        }
183
184        async fn trash(&self, _id: &str) -> Result<(), MxrError> {
185            Ok(())
186        }
187
188        async fn set_read(&self, _id: &str, _read: bool) -> Result<(), MxrError> {
189            Ok(())
190        }
191
192        async fn set_starred(&self, _id: &str, _starred: bool) -> Result<(), MxrError> {
193            Ok(())
194        }
195    }
196
197    #[async_trait::async_trait]
198    impl MailSyncProvider for RecoveringNotFoundProvider {
199        fn name(&self) -> &str {
200            "recovering-not-found"
201        }
202
203        fn account_id(&self) -> &AccountId {
204            &self.account_id
205        }
206
207        fn capabilities(&self) -> SyncCapabilities {
208            SyncCapabilities {
209                labels: false,
210                server_search: false,
211                delta_sync: true,
212                push: false,
213                batch_operations: false,
214                native_thread_ids: true,
215            }
216        }
217
218        async fn authenticate(&mut self) -> Result<(), MxrError> {
219            Ok(())
220        }
221
222        async fn refresh_auth(&mut self) -> Result<(), MxrError> {
223            Ok(())
224        }
225
226        async fn sync_labels(&self) -> Result<Vec<Label>, MxrError> {
227            Ok(vec![])
228        }
229
230        async fn sync_messages(&self, cursor: &SyncCursor) -> Result<SyncBatch, MxrError> {
231            self.calls.lock().unwrap().push(cursor.clone());
232            match cursor {
233                SyncCursor::Gmail { .. } => {
234                    Err(MxrError::NotFound("Requested entity was not found.".into()))
235                }
236                SyncCursor::Initial => Ok(SyncBatch {
237                    upserted: vec![self.message.clone()],
238                    deleted_provider_ids: vec![],
239                    label_changes: vec![],
240                    next_cursor: SyncCursor::Gmail { history_id: 22 },
241                }),
242                other => panic!("unexpected cursor in test: {other:?}"),
243            }
244        }
245
246        async fn fetch_attachment(&self, _mid: &str, _aid: &str) -> Result<Vec<u8>, MxrError> {
247            Err(MxrError::NotFound("no attachment".into()))
248        }
249
250        async fn modify_labels(
251            &self,
252            _id: &str,
253            _add: &[String],
254            _rm: &[String],
255        ) -> Result<(), MxrError> {
256            Ok(())
257        }
258
259        async fn trash(&self, _id: &str) -> Result<(), MxrError> {
260            Ok(())
261        }
262
263        async fn set_read(&self, _id: &str, _read: bool) -> Result<(), MxrError> {
264            Ok(())
265        }
266
267        async fn set_starred(&self, _id: &str, _starred: bool) -> Result<(), MxrError> {
268            Ok(())
269        }
270    }
271
272    fn make_empty_body(message_id: &MessageId) -> MessageBody {
273        MessageBody {
274            message_id: message_id.clone(),
275            text_plain: Some("test body".to_string()),
276            text_html: None,
277            attachments: vec![],
278            fetched_at: chrono::Utc::now(),
279            metadata: MessageMetadata::default(),
280        }
281    }
282
283    #[async_trait::async_trait]
284    impl MailSyncProvider for DeltaLabelProvider {
285        fn name(&self) -> &str {
286            "delta-label"
287        }
288        fn account_id(&self) -> &AccountId {
289            &self.account_id
290        }
291        fn capabilities(&self) -> SyncCapabilities {
292            SyncCapabilities {
293                labels: true,
294                server_search: false,
295                delta_sync: true,
296                push: false,
297                batch_operations: false,
298                native_thread_ids: true,
299            }
300        }
301        async fn authenticate(&mut self) -> Result<(), MxrError> {
302            Ok(())
303        }
304        async fn refresh_auth(&mut self) -> Result<(), MxrError> {
305            Ok(())
306        }
307        async fn sync_labels(&self) -> Result<Vec<Label>, MxrError> {
308            Ok(self.labels.clone())
309        }
310        async fn sync_messages(&self, cursor: &SyncCursor) -> Result<SyncBatch, MxrError> {
311            match cursor {
312                SyncCursor::Initial => Ok(SyncBatch {
313                    upserted: self.messages.clone(),
314                    deleted_provider_ids: vec![],
315                    label_changes: vec![],
316                    next_cursor: SyncCursor::Gmail { history_id: 100 },
317                }),
318                _ => Ok(SyncBatch {
319                    upserted: vec![],
320                    deleted_provider_ids: vec![],
321                    label_changes: self.label_changes.clone(),
322                    next_cursor: SyncCursor::Gmail { history_id: 200 },
323                }),
324            }
325        }
326        async fn fetch_attachment(&self, _mid: &str, _aid: &str) -> Result<Vec<u8>, MxrError> {
327            Err(MxrError::NotFound("no attachment".into()))
328        }
329        async fn modify_labels(
330            &self,
331            _id: &str,
332            _add: &[String],
333            _rm: &[String],
334        ) -> Result<(), MxrError> {
335            Ok(())
336        }
337        async fn trash(&self, _id: &str) -> Result<(), MxrError> {
338            Ok(())
339        }
340        async fn set_read(&self, _id: &str, _read: bool) -> Result<(), MxrError> {
341            Ok(())
342        }
343        async fn set_starred(&self, _id: &str, _starred: bool) -> Result<(), MxrError> {
344            Ok(())
345        }
346    }
347
348    fn make_test_label(account_id: &AccountId, name: &str, provider_id: &str) -> Label {
349        Label {
350            id: LabelId::new(),
351            account_id: account_id.clone(),
352            name: name.to_string(),
353            kind: LabelKind::System,
354            color: None,
355            provider_id: provider_id.to_string(),
356            unread_count: 0,
357            total_count: 0,
358        }
359    }
360
361    fn make_test_envelope(
362        account_id: &AccountId,
363        provider_id: &str,
364        label_provider_ids: Vec<String>,
365    ) -> Envelope {
366        Envelope {
367            id: MessageId::new(),
368            account_id: account_id.clone(),
369            provider_id: provider_id.to_string(),
370            thread_id: ThreadId::new(),
371            message_id_header: None,
372            in_reply_to: None,
373            references: vec![],
374            from: mxr_core::Address {
375                name: Some("Test".to_string()),
376                email: "test@example.com".to_string(),
377            },
378            to: vec![],
379            cc: vec![],
380            bcc: vec![],
381            subject: "Test message".to_string(),
382            date: chrono::Utc::now(),
383            flags: MessageFlags::empty(),
384            snippet: "Test snippet".to_string(),
385            has_attachments: false,
386            size_bytes: 1000,
387            unsubscribe: UnsubscribeMethod::None,
388            label_provider_ids,
389        }
390    }
391
392    #[tokio::test]
393    async fn delta_sync_applies_label_additions() {
394        let store = Arc::new(Store::in_memory().await.unwrap());
395        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
396        let engine = SyncEngine::new(store.clone(), search.clone());
397
398        let account_id = AccountId::new();
399        store
400            .insert_account(&test_account(account_id.clone()))
401            .await
402            .unwrap();
403
404        let inbox = make_test_label(&account_id, "Inbox", "INBOX");
405        let starred = make_test_label(&account_id, "Starred", "STARRED");
406        let labels = vec![inbox.clone(), starred.clone()];
407
408        let msg = make_test_envelope(&account_id, "prov-msg-1", vec!["INBOX".to_string()]);
409        let msg_provider_id = msg.provider_id.clone();
410
411        let provider = DeltaLabelProvider::new(
412            account_id.clone(),
413            vec![msg],
414            labels,
415            vec![LabelChange {
416                provider_message_id: msg_provider_id.clone(),
417                added_labels: vec!["STARRED".to_string()],
418                removed_labels: vec![],
419            }],
420        );
421
422        // Initial sync
423        engine.sync_account(&provider).await.unwrap();
424
425        // Verify msg has INBOX label
426        let msg_id = store
427            .get_message_id_by_provider_id(&account_id, &msg_provider_id)
428            .await
429            .unwrap()
430            .unwrap();
431        let labels_before = store.get_message_label_ids(&msg_id).await.unwrap();
432        assert!(labels_before.contains(&inbox.id));
433        assert!(!labels_before.contains(&starred.id));
434
435        // Delta sync — adds STARRED label
436        engine.sync_account(&provider).await.unwrap();
437
438        let labels_after = store.get_message_label_ids(&msg_id).await.unwrap();
439        assert!(
440            labels_after.contains(&inbox.id),
441            "INBOX should still be present"
442        );
443        assert!(
444            labels_after.contains(&starred.id),
445            "STARRED should be added by delta"
446        );
447    }
448
449    #[tokio::test]
450    async fn sync_rethreads_messages_when_provider_lacks_native_thread_ids() {
451        let store = Arc::new(Store::in_memory().await.unwrap());
452        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
453        let engine = SyncEngine::new(store.clone(), search);
454
455        let account_id = AccountId::new();
456        store
457            .insert_account(&test_account(account_id.clone()))
458            .await
459            .unwrap();
460
461        let first_id = MessageId::new();
462        let second_id = MessageId::new();
463        let first = SyncedMessage {
464            envelope: Envelope {
465                id: first_id.clone(),
466                account_id: account_id.clone(),
467                provider_id: "prov-thread-1".into(),
468                thread_id: ThreadId::new(),
469                message_id_header: Some("<root@example.com>".into()),
470                in_reply_to: None,
471                references: vec![],
472                from: mxr_core::Address {
473                    name: Some("Alice".into()),
474                    email: "alice@example.com".into(),
475                },
476                to: vec![],
477                cc: vec![],
478                bcc: vec![],
479                subject: "Topic".into(),
480                date: chrono::Utc::now() - chrono::Duration::minutes(5),
481                flags: MessageFlags::empty(),
482                snippet: "first".into(),
483                has_attachments: false,
484                size_bytes: 100,
485                unsubscribe: UnsubscribeMethod::None,
486                label_provider_ids: vec![],
487            },
488            body: make_empty_body(&first_id),
489        };
490        let second = SyncedMessage {
491            envelope: Envelope {
492                id: second_id.clone(),
493                account_id: account_id.clone(),
494                provider_id: "prov-thread-2".into(),
495                thread_id: ThreadId::new(),
496                message_id_header: Some("<reply@example.com>".into()),
497                in_reply_to: Some("<root@example.com>".into()),
498                references: vec!["<root@example.com>".into()],
499                from: mxr_core::Address {
500                    name: Some("Bob".into()),
501                    email: "bob@example.com".into(),
502                },
503                to: vec![],
504                cc: vec![],
505                bcc: vec![],
506                subject: "Re: Topic".into(),
507                date: chrono::Utc::now(),
508                flags: MessageFlags::empty(),
509                snippet: "second".into(),
510                has_attachments: false,
511                size_bytes: 100,
512                unsubscribe: UnsubscribeMethod::None,
513                label_provider_ids: vec![],
514            },
515            body: make_empty_body(&second_id),
516        };
517
518        let provider = ThreadingProvider {
519            account_id: account_id.clone(),
520            messages: vec![first, second],
521        };
522
523        engine.sync_account(&provider).await.unwrap();
524
525        let first_env = store.get_envelope(&first_id).await.unwrap().unwrap();
526        let second_env = store.get_envelope(&second_id).await.unwrap().unwrap();
527        assert_eq!(first_env.thread_id, second_env.thread_id);
528
529        let thread = store
530            .get_thread(&first_env.thread_id)
531            .await
532            .unwrap()
533            .unwrap();
534        assert_eq!(thread.message_count, 2);
535    }
536
537    #[tokio::test]
538    async fn delta_sync_applies_label_removals() {
539        let store = Arc::new(Store::in_memory().await.unwrap());
540        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
541        let engine = SyncEngine::new(store.clone(), search.clone());
542
543        let account_id = AccountId::new();
544        store
545            .insert_account(&test_account(account_id.clone()))
546            .await
547            .unwrap();
548
549        let inbox = make_test_label(&account_id, "Inbox", "INBOX");
550        let starred = make_test_label(&account_id, "Starred", "STARRED");
551        let labels = vec![inbox.clone(), starred.clone()];
552
553        let msg = make_test_envelope(
554            &account_id,
555            "prov-msg-2",
556            vec!["INBOX".to_string(), "STARRED".to_string()],
557        );
558        let msg_provider_id = msg.provider_id.clone();
559
560        let provider = DeltaLabelProvider::new(
561            account_id.clone(),
562            vec![msg],
563            labels,
564            vec![LabelChange {
565                provider_message_id: msg_provider_id.clone(),
566                added_labels: vec![],
567                removed_labels: vec!["STARRED".to_string()],
568            }],
569        );
570
571        // Initial sync
572        engine.sync_account(&provider).await.unwrap();
573
574        let msg_id = store
575            .get_message_id_by_provider_id(&account_id, &msg_provider_id)
576            .await
577            .unwrap()
578            .unwrap();
579        let labels_before = store.get_message_label_ids(&msg_id).await.unwrap();
580        assert!(
581            labels_before.contains(&starred.id),
582            "STARRED should be present after initial sync"
583        );
584
585        // Delta sync — removes STARRED
586        engine.sync_account(&provider).await.unwrap();
587
588        let labels_after = store.get_message_label_ids(&msg_id).await.unwrap();
589        assert!(labels_after.contains(&inbox.id), "INBOX should remain");
590        assert!(
591            !labels_after.contains(&starred.id),
592            "STARRED should be removed by delta"
593        );
594    }
595
596    #[tokio::test]
597    async fn delta_sync_handles_unknown_provider_message() {
598        let store = Arc::new(Store::in_memory().await.unwrap());
599        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
600        let engine = SyncEngine::new(store.clone(), search.clone());
601
602        let account_id = AccountId::new();
603        store
604            .insert_account(&test_account(account_id.clone()))
605            .await
606            .unwrap();
607
608        let inbox = make_test_label(&account_id, "Inbox", "INBOX");
609        let labels = vec![inbox.clone()];
610
611        let msg = make_test_envelope(&account_id, "prov-msg-3", vec!["INBOX".to_string()]);
612
613        let provider = DeltaLabelProvider::new(
614            account_id.clone(),
615            vec![msg],
616            labels,
617            vec![LabelChange {
618                // This provider_message_id doesn't exist in our store
619                provider_message_id: "nonexistent-msg".to_string(),
620                added_labels: vec!["INBOX".to_string()],
621                removed_labels: vec![],
622            }],
623        );
624
625        // Initial sync
626        engine.sync_account(&provider).await.unwrap();
627
628        // Delta sync — should not crash on unknown message
629        let result = engine.sync_account(&provider).await;
630        assert!(
631            result.is_ok(),
632            "Delta sync should gracefully skip unknown messages"
633        );
634    }
635
636    #[tokio::test]
637    async fn sync_populates_store_and_search() {
638        let store = Arc::new(Store::in_memory().await.unwrap());
639        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
640        let engine = SyncEngine::new(store.clone(), search.clone());
641
642        let account_id = AccountId::new();
643        store
644            .insert_account(&test_account(account_id.clone()))
645            .await
646            .unwrap();
647
648        let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
649        let count = engine.sync_account(&provider).await.unwrap();
650        assert_eq!(count, 55);
651
652        // Verify store
653        let envelopes = store
654            .list_envelopes_by_account(&account_id, 100, 0)
655            .await
656            .unwrap();
657        assert_eq!(envelopes.len(), 55);
658
659        // Verify search
660        let results = search
661            .lock()
662            .await
663            .search("deployment", 10, 0, SortOrder::DateDesc)
664            .unwrap();
665        assert!(!results.results.is_empty());
666    }
667
668    #[tokio::test]
669    async fn bodies_stored_eagerly_during_sync() {
670        let store = Arc::new(Store::in_memory().await.unwrap());
671        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
672        let engine = SyncEngine::new(store.clone(), search.clone());
673
674        let account_id = AccountId::new();
675        store
676            .insert_account(&test_account(account_id.clone()))
677            .await
678            .unwrap();
679
680        let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
681        engine.sync_account(&provider).await.unwrap();
682
683        // Get first message
684        let envelopes = store
685            .list_envelopes_by_account(&account_id, 1, 0)
686            .await
687            .unwrap();
688        let msg_id = &envelopes[0].id;
689
690        // Body should already be in store — fetched eagerly during sync
691        let body = engine.get_body(msg_id).await.unwrap();
692        assert!(body.text_plain.is_some());
693
694        // Second read — same result
695        let body2 = engine.get_body(msg_id).await.unwrap();
696        assert_eq!(body.text_plain, body2.text_plain);
697    }
698
699    #[tokio::test]
700    async fn snooze_wake() {
701        let store = Arc::new(Store::in_memory().await.unwrap());
702        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
703        let engine = SyncEngine::new(store.clone(), search.clone());
704
705        let account_id = AccountId::new();
706        store
707            .insert_account(&test_account(account_id.clone()))
708            .await
709            .unwrap();
710
711        let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
712        engine.sync_account(&provider).await.unwrap();
713
714        // Get a message to snooze
715        let envelopes = store
716            .list_envelopes_by_account(&account_id, 1, 0)
717            .await
718            .unwrap();
719
720        let snoozed = Snoozed {
721            message_id: envelopes[0].id.clone(),
722            account_id: account_id.clone(),
723            snoozed_at: chrono::Utc::now(),
724            wake_at: chrono::Utc::now() - chrono::Duration::hours(1),
725            original_labels: vec![],
726        };
727        store.insert_snooze(&snoozed).await.unwrap();
728
729        let woken = engine.check_snoozes().await.unwrap();
730        assert_eq!(woken.len(), 1);
731
732        // Should be gone now
733        let woken2 = engine.check_snoozes().await.unwrap();
734        assert_eq!(woken2.len(), 0);
735    }
736
737    #[tokio::test]
738    async fn cursor_persistence() {
739        let store = Arc::new(Store::in_memory().await.unwrap());
740        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
741        let engine = SyncEngine::new(store.clone(), search.clone());
742
743        let account_id = AccountId::new();
744        store
745            .insert_account(&test_account(account_id.clone()))
746            .await
747            .unwrap();
748
749        // Before sync, cursor should be None
750        let cursor_before = store.get_sync_cursor(&account_id).await.unwrap();
751        assert!(cursor_before.is_none());
752
753        let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
754        engine.sync_account(&provider).await.unwrap();
755
756        // After sync, cursor should match FakeProvider's next_cursor (Gmail { history_id: 1 })
757        let cursor_after = store.get_sync_cursor(&account_id).await.unwrap();
758        assert!(cursor_after.is_some(), "Cursor should be set after sync");
759        let cursor_json = serde_json::to_string(&cursor_after.unwrap()).unwrap();
760        assert!(
761            cursor_json.contains("Gmail") && cursor_json.contains("1"),
762            "Cursor should be Gmail {{ history_id: 1 }}, got: {}",
763            cursor_json
764        );
765    }
766
767    #[tokio::test]
768    async fn sync_error_does_not_crash() {
769        let store = Arc::new(Store::in_memory().await.unwrap());
770        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
771        let engine = SyncEngine::new(store.clone(), search.clone());
772
773        let account_id = AccountId::new();
774        store
775            .insert_account(&test_account(account_id.clone()))
776            .await
777            .unwrap();
778
779        let error_provider = ErrorProvider {
780            account_id: account_id.clone(),
781        };
782
783        // Should return Err, not panic
784        let result = engine.sync_account(&error_provider).await;
785        assert!(
786            result.is_err(),
787            "sync_account should return Err for failing provider"
788        );
789        let err_msg = result.unwrap_err().to_string();
790        assert!(
791            err_msg.contains("simulated"),
792            "Error should contain provider message, got: {}",
793            err_msg
794        );
795    }
796
797    #[tokio::test]
798    async fn label_counts_after_sync() {
799        let store = Arc::new(Store::in_memory().await.unwrap());
800        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
801        let engine = SyncEngine::new(store.clone(), search.clone());
802
803        let account_id = AccountId::new();
804        store
805            .insert_account(&test_account(account_id.clone()))
806            .await
807            .unwrap();
808
809        let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
810        engine.sync_account(&provider).await.unwrap();
811
812        let labels = store.list_labels_by_account(&account_id).await.unwrap();
813        assert!(!labels.is_empty(), "Should have labels after sync");
814
815        let has_counts = labels
816            .iter()
817            .any(|l| l.total_count > 0 || l.unread_count > 0);
818        assert!(
819            has_counts,
820            "At least one label should have non-zero counts after sync"
821        );
822    }
823
824    #[tokio::test]
825    async fn list_envelopes_by_label_returns_results() {
826        let store = Arc::new(Store::in_memory().await.unwrap());
827        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
828        let engine = SyncEngine::new(store.clone(), search.clone());
829
830        let account_id = AccountId::new();
831        store
832            .insert_account(&test_account(account_id.clone()))
833            .await
834            .unwrap();
835
836        let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
837        engine.sync_account(&provider).await.unwrap();
838
839        let labels = store.list_labels_by_account(&account_id).await.unwrap();
840
841        // Find the INBOX label
842        let inbox_label = labels.iter().find(|l| l.name == "Inbox").unwrap();
843        assert!(
844            inbox_label.total_count > 0,
845            "Inbox should have messages after sync"
846        );
847
848        // Now query envelopes by that label
849        let envelopes = store
850            .list_envelopes_by_label(&inbox_label.id, 100, 0)
851            .await
852            .unwrap();
853
854        // Also check all envelopes (no label filter)
855        let all_envelopes = store
856            .list_envelopes_by_account(&account_id, 200, 0)
857            .await
858            .unwrap();
859
860        assert!(
861            !envelopes.is_empty(),
862            "list_envelopes_by_label should return messages for Inbox label (got 0). \
863             label_id={}, total_count={}, all_count={}",
864            inbox_label.id,
865            inbox_label.total_count,
866            all_envelopes.len()
867        );
868
869        // Inbox-by-label should have same or fewer messages than all
870        assert!(
871            envelopes.len() <= all_envelopes.len(),
872            "Inbox-by-label ({}) should be <= all ({})",
873            envelopes.len(),
874            all_envelopes.len()
875        );
876    }
877
878    #[tokio::test]
879    async fn list_envelopes_by_sent_label_may_be_empty() {
880        let store = Arc::new(Store::in_memory().await.unwrap());
881        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
882        let engine = SyncEngine::new(store.clone(), search.clone());
883
884        let account_id = AccountId::new();
885        store
886            .insert_account(&test_account(account_id.clone()))
887            .await
888            .unwrap();
889
890        let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
891        engine.sync_account(&provider).await.unwrap();
892
893        let labels = store.list_labels_by_account(&account_id).await.unwrap();
894
895        // Find Sent label
896        let sent_label = labels.iter().find(|l| l.name == "Sent").unwrap();
897
898        let envelopes = store
899            .list_envelopes_by_label(&sent_label.id, 100, 0)
900            .await
901            .unwrap();
902
903        // Sent has no messages in fake provider (no SENT flags set)
904        assert_eq!(
905            envelopes.len(),
906            0,
907            "Sent should have 0 messages in fake provider"
908        );
909
910        // But Inbox should still have messages
911        let inbox_label = labels.iter().find(|l| l.name == "Inbox").unwrap();
912        let inbox_envelopes = store
913            .list_envelopes_by_label(&inbox_label.id, 100, 0)
914            .await
915            .unwrap();
916        assert!(
917            !inbox_envelopes.is_empty(),
918            "Inbox should still have messages after querying Sent"
919        );
920
921        // And listing ALL envelopes (no label filter) should still work
922        let all_envelopes = store
923            .list_envelopes_by_account(&account_id, 100, 0)
924            .await
925            .unwrap();
926        assert!(
927            !all_envelopes.is_empty(),
928            "All envelopes should still be retrievable"
929        );
930    }
931
932    #[tokio::test]
933    async fn progressive_loading_chunks() {
934        let store = Arc::new(Store::in_memory().await.unwrap());
935        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
936        let engine = SyncEngine::new(store.clone(), search.clone());
937
938        let account_id = AccountId::new();
939        store
940            .insert_account(&test_account(account_id.clone()))
941            .await
942            .unwrap();
943
944        let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
945        let count = engine.sync_account(&provider).await.unwrap();
946        assert_eq!(count, 55, "Sync should report 55 messages processed");
947
948        // Verify store has exactly 55 envelopes
949        let envelopes = store
950            .list_envelopes_by_account(&account_id, 200, 0)
951            .await
952            .unwrap();
953        assert_eq!(envelopes.len(), 55, "Store should contain 55 envelopes");
954
955        // Verify search index has results for known fixture terms
956        let results = search
957            .lock()
958            .await
959            .search("deployment", 10, 0, SortOrder::DateDesc)
960            .unwrap();
961        assert!(
962            !results.results.is_empty(),
963            "Search index should have 'deployment' results"
964        );
965    }
966
967    #[tokio::test]
968    async fn delta_sync_no_duplicate_labels() {
969        let store = Arc::new(Store::in_memory().await.unwrap());
970        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
971        let engine = SyncEngine::new(store.clone(), search.clone());
972
973        let account_id = AccountId::new();
974        store
975            .insert_account(&test_account(account_id.clone()))
976            .await
977            .unwrap();
978
979        let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
980
981        // Initial sync
982        engine.sync_account(&provider).await.unwrap();
983
984        let labels_after_first = store.list_labels_by_account(&account_id).await.unwrap();
985        let label_count_first = labels_after_first.len();
986
987        // Delta sync (should return 0 new messages)
988        let delta_count = engine.sync_account(&provider).await.unwrap();
989        assert_eq!(delta_count, 0, "Delta sync should return 0 new messages");
990
991        let labels_after_second = store.list_labels_by_account(&account_id).await.unwrap();
992
993        // Label rows should not be duplicated
994        assert_eq!(
995            label_count_first,
996            labels_after_second.len(),
997            "Label count should not change after delta sync"
998        );
999
1000        // Verify each label still exists with the correct provider_id
1001        for label in &labels_after_first {
1002            let still_exists = labels_after_second
1003                .iter()
1004                .any(|l| l.provider_id == label.provider_id && l.name == label.name);
1005            assert!(
1006                still_exists,
1007                "Label '{}' (provider_id='{}') should survive delta sync",
1008                label.name, label.provider_id
1009            );
1010        }
1011
1012        // Verify messages are still in the store (upsert_envelope uses INSERT OR REPLACE
1013        // on messages table, which is not affected by label cascade)
1014        let envelopes = store
1015            .list_envelopes_by_account(&account_id, 200, 0)
1016            .await
1017            .unwrap();
1018        assert_eq!(
1019            envelopes.len(),
1020            55,
1021            "All 55 messages should survive delta sync"
1022        );
1023    }
1024
1025    #[tokio::test]
1026    async fn delta_sync_preserves_junction_table() {
1027        let store = Arc::new(Store::in_memory().await.unwrap());
1028        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1029        let engine = SyncEngine::new(store.clone(), search.clone());
1030
1031        let account_id = AccountId::new();
1032        store
1033            .insert_account(&test_account(account_id.clone()))
1034            .await
1035            .unwrap();
1036
1037        let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1038
1039        // Initial sync
1040        engine.sync_account(&provider).await.unwrap();
1041
1042        let junction_before = store.count_message_labels().await.unwrap();
1043        assert!(
1044            junction_before > 0,
1045            "Junction table should be populated after initial sync"
1046        );
1047
1048        // Delta sync (labels get re-upserted, no new messages)
1049        let delta_count = engine.sync_account(&provider).await.unwrap();
1050        assert_eq!(delta_count, 0, "Delta sync should return 0 new messages");
1051
1052        let junction_after = store.count_message_labels().await.unwrap();
1053        assert_eq!(
1054            junction_before, junction_after,
1055            "Junction table should survive delta sync (before={}, after={})",
1056            junction_before, junction_after
1057        );
1058
1059        // Verify label filtering still works
1060        let labels = store.list_labels_by_account(&account_id).await.unwrap();
1061        let inbox = labels.iter().find(|l| l.name == "Inbox").unwrap();
1062        let envelopes = store
1063            .list_envelopes_by_label(&inbox.id, 100, 0)
1064            .await
1065            .unwrap();
1066        assert!(
1067            !envelopes.is_empty(),
1068            "Inbox should still return messages after delta sync"
1069        );
1070    }
1071
1072    #[tokio::test]
1073    async fn backfill_triggers_when_junction_empty() {
1074        let store = Arc::new(Store::in_memory().await.unwrap());
1075        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1076        let engine = SyncEngine::new(store.clone(), search.clone());
1077
1078        let account_id = AccountId::new();
1079        store
1080            .insert_account(&test_account(account_id.clone()))
1081            .await
1082            .unwrap();
1083
1084        let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1085
1086        // Initial sync
1087        engine.sync_account(&provider).await.unwrap();
1088
1089        let junction_before = store.count_message_labels().await.unwrap();
1090        assert!(junction_before > 0);
1091
1092        // Wipe junction table manually (simulates corrupted DB)
1093        sqlx::query("DELETE FROM message_labels")
1094            .execute(store.writer())
1095            .await
1096            .unwrap();
1097
1098        let junction_wiped = store.count_message_labels().await.unwrap();
1099        assert_eq!(junction_wiped, 0, "Junction should be empty after wipe");
1100
1101        // Sync again — should detect empty junction and backfill
1102        engine.sync_account(&provider).await.unwrap();
1103
1104        let junction_after = store.count_message_labels().await.unwrap();
1105        assert!(
1106            junction_after > 0,
1107            "Junction table should be repopulated after backfill (got {})",
1108            junction_after
1109        );
1110    }
1111
1112    #[tokio::test]
1113    async fn sync_label_resolution_matches_gmail_ids() {
1114        let store = Arc::new(Store::in_memory().await.unwrap());
1115        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1116        let engine = SyncEngine::new(store.clone(), search.clone());
1117
1118        let account_id = AccountId::new();
1119        store
1120            .insert_account(&test_account(account_id.clone()))
1121            .await
1122            .unwrap();
1123
1124        let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1125        engine.sync_account(&provider).await.unwrap();
1126
1127        let labels = store.list_labels_by_account(&account_id).await.unwrap();
1128
1129        // FakeProvider uses Gmail-style IDs: "INBOX", "SENT", "TRASH", etc.
1130        // Verify each label has a matching provider_id
1131        let expected_mappings = [
1132            ("Inbox", "INBOX"),
1133            ("Sent", "SENT"),
1134            ("Trash", "TRASH"),
1135            ("Spam", "SPAM"),
1136            ("Starred", "STARRED"),
1137            ("Work", "work"),
1138            ("Personal", "personal"),
1139            ("Newsletters", "newsletters"),
1140        ];
1141        for (name, expected_pid) in &expected_mappings {
1142            let label = labels.iter().find(|l| l.name == *name);
1143            assert!(label.is_some(), "Label '{}' should exist after sync", name);
1144            assert_eq!(
1145                label.unwrap().provider_id,
1146                *expected_pid,
1147                "Label '{}' should have provider_id '{}'",
1148                name,
1149                expected_pid
1150            );
1151        }
1152
1153        // For each message, verify junction table entries point to valid labels
1154        let envelopes = store
1155            .list_envelopes_by_account(&account_id, 200, 0)
1156            .await
1157            .unwrap();
1158        let label_ids: std::collections::HashSet<String> =
1159            labels.iter().map(|l| l.id.as_str().to_string()).collect();
1160
1161        for env in &envelopes {
1162            let msg_label_ids = store.get_message_label_ids(&env.id).await.unwrap();
1163            for lid in &msg_label_ids {
1164                assert!(
1165                    label_ids.contains(&lid.as_str().to_string()),
1166                    "Junction entry for message {} points to nonexistent label {}",
1167                    env.id,
1168                    lid
1169                );
1170            }
1171        }
1172    }
1173
1174    #[tokio::test]
1175    async fn list_envelopes_by_each_label_returns_correct_count() {
1176        let store = Arc::new(Store::in_memory().await.unwrap());
1177        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1178        let engine = SyncEngine::new(store.clone(), search.clone());
1179
1180        let account_id = AccountId::new();
1181        store
1182            .insert_account(&test_account(account_id.clone()))
1183            .await
1184            .unwrap();
1185
1186        let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1187        engine.sync_account(&provider).await.unwrap();
1188
1189        let labels = store.list_labels_by_account(&account_id).await.unwrap();
1190        for label in &labels {
1191            if label.total_count > 0 {
1192                let envelopes = store
1193                    .list_envelopes_by_label(&label.id, 200, 0)
1194                    .await
1195                    .unwrap();
1196                assert_eq!(
1197                    envelopes.len(),
1198                    label.total_count as usize,
1199                    "Label '{}' (provider_id='{}') has total_count={} but list_envelopes_by_label returned {}",
1200                    label.name,
1201                    label.provider_id,
1202                    label.total_count,
1203                    envelopes.len()
1204                );
1205            }
1206        }
1207    }
1208
1209    #[tokio::test]
1210    async fn search_index_consistent_with_store() {
1211        let store = Arc::new(Store::in_memory().await.unwrap());
1212        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1213        let engine = SyncEngine::new(store.clone(), search.clone());
1214
1215        let account_id = AccountId::new();
1216        store
1217            .insert_account(&test_account(account_id.clone()))
1218            .await
1219            .unwrap();
1220
1221        let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1222        engine.sync_account(&provider).await.unwrap();
1223
1224        let envelopes = store
1225            .list_envelopes_by_account(&account_id, 200, 0)
1226            .await
1227            .unwrap();
1228
1229        let search_guard = search.lock().await;
1230        for env in &envelopes {
1231            // Extract a distinctive keyword from the subject
1232            let keyword = env
1233                .subject
1234                .split_whitespace()
1235                .find(|w| w.len() > 3 && w.chars().all(|c| c.is_alphanumeric()))
1236                .unwrap_or(&env.subject);
1237            let results = search_guard
1238                .search(keyword, 100, 0, SortOrder::DateDesc)
1239                .unwrap();
1240            assert!(
1241                results
1242                    .results
1243                    .iter()
1244                    .any(|r| r.message_id == env.id.as_str()),
1245                "Envelope '{}' (subject='{}') should be findable by keyword '{}' in search index",
1246                env.id,
1247                env.subject,
1248                keyword
1249            );
1250        }
1251    }
1252
1253    #[tokio::test]
1254    async fn mutation_flags_persist_through_store() {
1255        let store = Arc::new(Store::in_memory().await.unwrap());
1256        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1257        let engine = SyncEngine::new(store.clone(), search.clone());
1258
1259        let account_id = AccountId::new();
1260        store
1261            .insert_account(&test_account(account_id.clone()))
1262            .await
1263            .unwrap();
1264
1265        let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1266        engine.sync_account(&provider).await.unwrap();
1267
1268        let envelopes = store
1269            .list_envelopes_by_account(&account_id, 1, 0)
1270            .await
1271            .unwrap();
1272        let msg_id = &envelopes[0].id;
1273        let initial_flags = envelopes[0].flags;
1274
1275        // Set starred
1276        store.set_starred(msg_id, true).await.unwrap();
1277        store.set_read(msg_id, true).await.unwrap();
1278
1279        let updated = store.get_envelope(msg_id).await.unwrap().unwrap();
1280        assert!(
1281            updated.flags.contains(MessageFlags::STARRED),
1282            "STARRED flag should be set"
1283        );
1284        assert!(
1285            updated.flags.contains(MessageFlags::READ),
1286            "READ flag should be set"
1287        );
1288
1289        // Clear starred, keep read
1290        store.set_starred(msg_id, false).await.unwrap();
1291        let updated2 = store.get_envelope(msg_id).await.unwrap().unwrap();
1292        assert!(
1293            !updated2.flags.contains(MessageFlags::STARRED),
1294            "STARRED flag should be cleared after set_starred(false)"
1295        );
1296        assert!(
1297            updated2.flags.contains(MessageFlags::READ),
1298            "READ flag should still be set after clearing STARRED"
1299        );
1300
1301        // Verify initial flags were different (test is meaningful)
1302        // At least one flag mutation should have changed something
1303        let _ = initial_flags; // used to confirm the test exercises real mutations
1304    }
1305
1306    #[tokio::test]
1307    async fn junction_table_survives_message_update() {
1308        let store = Arc::new(Store::in_memory().await.unwrap());
1309        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1310        let engine = SyncEngine::new(store.clone(), search.clone());
1311
1312        let account_id = AccountId::new();
1313        store
1314            .insert_account(&test_account(account_id.clone()))
1315            .await
1316            .unwrap();
1317
1318        let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1319        engine.sync_account(&provider).await.unwrap();
1320
1321        // Count junction rows for first message
1322        let envelopes = store
1323            .list_envelopes_by_account(&account_id, 1, 0)
1324            .await
1325            .unwrap();
1326        let msg_id = &envelopes[0].id;
1327
1328        let junction_before = store.get_message_label_ids(msg_id).await.unwrap();
1329        assert!(
1330            !junction_before.is_empty(),
1331            "Message should have label associations after sync"
1332        );
1333
1334        // Re-upsert the same envelope (simulates re-sync)
1335        store.upsert_envelope(&envelopes[0]).await.unwrap();
1336        // Re-set labels (same as sync engine does)
1337        store
1338            .set_message_labels(msg_id, &junction_before)
1339            .await
1340            .unwrap();
1341
1342        let junction_after = store.get_message_label_ids(msg_id).await.unwrap();
1343        assert_eq!(
1344            junction_before.len(),
1345            junction_after.len(),
1346            "Junction rows should not double after re-sync (before={}, after={})",
1347            junction_before.len(),
1348            junction_after.len()
1349        );
1350    }
1351
1352    #[tokio::test]
1353    async fn find_labels_by_provider_ids_with_unknown_ids() {
1354        let store = Arc::new(Store::in_memory().await.unwrap());
1355        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1356        let engine = SyncEngine::new(store.clone(), search.clone());
1357
1358        let account_id = AccountId::new();
1359        store
1360            .insert_account(&test_account(account_id.clone()))
1361            .await
1362            .unwrap();
1363
1364        let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1365        engine.sync_account(&provider).await.unwrap();
1366
1367        let result = store
1368            .find_labels_by_provider_ids(
1369                &account_id,
1370                &["INBOX".to_string(), "NONEXISTENT_LABEL".to_string()],
1371            )
1372            .await
1373            .unwrap();
1374
1375        assert_eq!(
1376            result.len(),
1377            1,
1378            "Should only return 1 result for INBOX, not 2 (NONEXISTENT_LABEL should be ignored)"
1379        );
1380    }
1381
1382    #[tokio::test]
1383    async fn body_available_after_sync() {
1384        let store = Arc::new(Store::in_memory().await.unwrap());
1385        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1386        let engine = SyncEngine::new(store.clone(), search.clone());
1387
1388        let account_id = AccountId::new();
1389        store
1390            .insert_account(&test_account(account_id.clone()))
1391            .await
1392            .unwrap();
1393
1394        let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1395        engine.sync_account(&provider).await.unwrap();
1396
1397        let envelopes = store
1398            .list_envelopes_by_account(&account_id, 1, 0)
1399            .await
1400            .unwrap();
1401        let msg_id = &envelopes[0].id;
1402
1403        // Body already available — stored eagerly during sync
1404        let body1 = engine.get_body(msg_id).await.unwrap();
1405        assert!(body1.text_plain.is_some(), "Body should have text_plain");
1406
1407        // Second read — same result from store
1408        let body2 = engine.get_body(msg_id).await.unwrap();
1409
1410        assert_eq!(
1411            body1.text_plain, body2.text_plain,
1412            "Body text_plain should be consistent"
1413        );
1414        assert_eq!(
1415            body1.text_html, body2.text_html,
1416            "Body text_html should be consistent"
1417        );
1418        assert_eq!(
1419            body1.attachments.len(),
1420            body2.attachments.len(),
1421            "Body attachments count should be consistent"
1422        );
1423    }
1424
1425    #[tokio::test]
1426    async fn gmail_not_found_cursor_resets_to_initial_and_recovers() {
1427        let store = Arc::new(Store::in_memory().await.unwrap());
1428        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1429        let engine = SyncEngine::new(store.clone(), search.clone());
1430
1431        let account_id = AccountId::new();
1432        store
1433            .insert_account(&test_account(account_id.clone()))
1434            .await
1435            .unwrap();
1436        store
1437            .set_sync_cursor(
1438                &account_id,
1439                &SyncCursor::Gmail {
1440                    history_id: 27_697_494,
1441                },
1442            )
1443            .await
1444            .unwrap();
1445
1446        let message_id = MessageId::new();
1447        let provider = RecoveringNotFoundProvider {
1448            account_id: account_id.clone(),
1449            message: SyncedMessage {
1450                envelope: Envelope {
1451                    id: message_id.clone(),
1452                    account_id: account_id.clone(),
1453                    provider_id: "recovered-1".into(),
1454                    thread_id: ThreadId::new(),
1455                    message_id_header: None,
1456                    in_reply_to: None,
1457                    references: vec![],
1458                    from: Address {
1459                        name: Some("Recovered".into()),
1460                        email: "recovered@example.com".into(),
1461                    },
1462                    to: vec![],
1463                    cc: vec![],
1464                    bcc: vec![],
1465                    subject: "Recovered after cursor reset".into(),
1466                    date: chrono::Utc::now(),
1467                    flags: MessageFlags::empty(),
1468                    snippet: "Recovered".into(),
1469                    has_attachments: false,
1470                    size_bytes: 42,
1471                    unsubscribe: UnsubscribeMethod::None,
1472                    label_provider_ids: vec![],
1473                },
1474                body: make_empty_body(&message_id),
1475            },
1476            calls: std::sync::Mutex::new(Vec::new()),
1477        };
1478
1479        let outcome = engine.sync_account_with_outcome(&provider).await.unwrap();
1480
1481        assert_eq!(outcome.synced_count, 1);
1482        let calls = provider.calls.lock().unwrap();
1483        assert_eq!(calls.len(), 2);
1484        assert!(matches!(
1485            calls[0],
1486            SyncCursor::Gmail {
1487                history_id: 27_697_494
1488            }
1489        ));
1490        assert!(matches!(calls[1], SyncCursor::Initial));
1491        let stored_cursor = store.get_sync_cursor(&account_id).await.unwrap();
1492        assert!(matches!(
1493            stored_cursor,
1494            Some(SyncCursor::Gmail { history_id: 22 })
1495        ));
1496    }
1497
1498    #[tokio::test]
1499    async fn recalculate_label_counts_matches_junction() {
1500        let store = Arc::new(Store::in_memory().await.unwrap());
1501        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1502        let engine = SyncEngine::new(store.clone(), search.clone());
1503
1504        let account_id = AccountId::new();
1505        store
1506            .insert_account(&test_account(account_id.clone()))
1507            .await
1508            .unwrap();
1509
1510        let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1511        engine.sync_account(&provider).await.unwrap();
1512
1513        let labels = store.list_labels_by_account(&account_id).await.unwrap();
1514
1515        for label in &labels {
1516            let lid = label.id.as_str();
1517            // Manually count junction rows for this label
1518            let junction_count: i64 = sqlx::query_scalar::<_, i64>(
1519                "SELECT COUNT(*) FROM message_labels WHERE label_id = ?",
1520            )
1521            .bind(&lid)
1522            .fetch_one(store.reader())
1523            .await
1524            .unwrap();
1525
1526            assert_eq!(
1527                label.total_count as i64, junction_count,
1528                "Label '{}' total_count ({}) should match junction row count ({})",
1529                label.name, label.total_count, junction_count
1530            );
1531
1532            // Also verify unread count
1533            let unread_count: i64 = sqlx::query_scalar::<_, i64>(
1534                "SELECT COUNT(*) FROM message_labels ml \
1535                 JOIN messages m ON m.id = ml.message_id \
1536                 WHERE ml.label_id = ? AND (m.flags & 1) = 0",
1537            )
1538            .bind(&lid)
1539            .fetch_one(store.reader())
1540            .await
1541            .unwrap();
1542
1543            assert_eq!(
1544                label.unread_count as i64, unread_count,
1545                "Label '{}' unread_count ({}) should match computed unread count ({})",
1546                label.name, label.unread_count, unread_count
1547            );
1548        }
1549    }
1550}