Skip to main content

mxr_sync/
lib.rs

1mod engine;
2pub mod threading;
3pub use engine::SyncEngine;
4
5#[cfg(test)]
6mod tests {
7    use super::*;
8    use mxr_core::id::*;
9    use mxr_core::types::*;
10    use mxr_core::{MailSyncProvider, MxrError, SyncCapabilities};
11    use mxr_search::SearchIndex;
12    use mxr_store::Store;
13    use std::sync::Arc;
14    use tokio::sync::Mutex;
15
16    /// A provider that always returns errors from sync_messages, for testing error handling.
17    struct ErrorProvider {
18        account_id: AccountId,
19    }
20
21    #[async_trait::async_trait]
22    impl MailSyncProvider for ErrorProvider {
23        fn name(&self) -> &str {
24            "error"
25        }
26        fn account_id(&self) -> &AccountId {
27            &self.account_id
28        }
29        fn capabilities(&self) -> SyncCapabilities {
30            SyncCapabilities {
31                labels: false,
32                server_search: false,
33                delta_sync: false,
34                push: false,
35                batch_operations: false,
36                native_thread_ids: true,
37            }
38        }
39        async fn authenticate(&mut self) -> Result<(), MxrError> {
40            Ok(())
41        }
42        async fn refresh_auth(&mut self) -> Result<(), MxrError> {
43            Ok(())
44        }
45        async fn sync_labels(&self) -> Result<Vec<Label>, MxrError> {
46            Ok(vec![])
47        }
48        async fn sync_messages(&self, _cursor: &SyncCursor) -> Result<SyncBatch, MxrError> {
49            Err(MxrError::Provider("simulated sync error".into()))
50        }
51        async fn fetch_attachment(&self, _mid: &str, _aid: &str) -> Result<Vec<u8>, MxrError> {
52            Err(MxrError::Provider("simulated attachment error".into()))
53        }
54        async fn modify_labels(
55            &self,
56            _id: &str,
57            _add: &[String],
58            _rm: &[String],
59        ) -> Result<(), MxrError> {
60            Err(MxrError::Provider("simulated error".into()))
61        }
62        async fn trash(&self, _id: &str) -> Result<(), MxrError> {
63            Err(MxrError::Provider("simulated error".into()))
64        }
65        async fn set_read(&self, _id: &str, _read: bool) -> Result<(), MxrError> {
66            Err(MxrError::Provider("simulated error".into()))
67        }
68        async fn set_starred(&self, _id: &str, _starred: bool) -> Result<(), MxrError> {
69            Err(MxrError::Provider("simulated error".into()))
70        }
71    }
72
73    fn test_account(account_id: AccountId) -> mxr_core::Account {
74        mxr_core::Account {
75            id: account_id,
76            name: "Fake Account".to_string(),
77            email: "user@example.com".to_string(),
78            sync_backend: Some(BackendRef {
79                provider_kind: ProviderKind::Fake,
80                config_key: "fake".to_string(),
81            }),
82            send_backend: None,
83            enabled: true,
84        }
85    }
86
87    /// Provider that returns label_changes on delta sync for testing the label change code path.
88    struct DeltaLabelProvider {
89        account_id: AccountId,
90        messages: Vec<SyncedMessage>,
91        labels: Vec<Label>,
92        label_changes: Vec<LabelChange>,
93    }
94
95    impl DeltaLabelProvider {
96        fn new(
97            account_id: AccountId,
98            messages: Vec<Envelope>,
99            labels: Vec<Label>,
100            label_changes: Vec<LabelChange>,
101        ) -> Self {
102            let messages = messages
103                .into_iter()
104                .map(|env| SyncedMessage {
105                    body: make_empty_body(&env.id),
106                    envelope: env,
107                })
108                .collect();
109            Self {
110                account_id,
111                messages,
112                labels,
113                label_changes,
114            }
115        }
116    }
117
118    struct ThreadingProvider {
119        account_id: AccountId,
120        messages: Vec<SyncedMessage>,
121    }
122
123    #[async_trait::async_trait]
124    impl MailSyncProvider for ThreadingProvider {
125        fn name(&self) -> &str {
126            "threading"
127        }
128
129        fn account_id(&self) -> &AccountId {
130            &self.account_id
131        }
132
133        fn capabilities(&self) -> SyncCapabilities {
134            SyncCapabilities {
135                labels: false,
136                server_search: false,
137                delta_sync: false,
138                push: false,
139                batch_operations: false,
140                native_thread_ids: false,
141            }
142        }
143
144        async fn authenticate(&mut self) -> Result<(), MxrError> {
145            Ok(())
146        }
147
148        async fn refresh_auth(&mut self) -> Result<(), MxrError> {
149            Ok(())
150        }
151
152        async fn sync_labels(&self) -> Result<Vec<Label>, MxrError> {
153            Ok(vec![])
154        }
155
156        async fn sync_messages(&self, _cursor: &SyncCursor) -> Result<SyncBatch, MxrError> {
157            Ok(SyncBatch {
158                upserted: self.messages.clone(),
159                deleted_provider_ids: vec![],
160                label_changes: vec![],
161                next_cursor: SyncCursor::Initial,
162            })
163        }
164
165        async fn fetch_attachment(&self, _mid: &str, _aid: &str) -> Result<Vec<u8>, MxrError> {
166            Err(MxrError::NotFound("no attachment".into()))
167        }
168
169        async fn modify_labels(
170            &self,
171            _id: &str,
172            _add: &[String],
173            _rm: &[String],
174        ) -> Result<(), MxrError> {
175            Ok(())
176        }
177
178        async fn trash(&self, _id: &str) -> Result<(), MxrError> {
179            Ok(())
180        }
181
182        async fn set_read(&self, _id: &str, _read: bool) -> Result<(), MxrError> {
183            Ok(())
184        }
185
186        async fn set_starred(&self, _id: &str, _starred: bool) -> Result<(), MxrError> {
187            Ok(())
188        }
189    }
190
191    fn make_empty_body(message_id: &MessageId) -> MessageBody {
192        MessageBody {
193            message_id: message_id.clone(),
194            text_plain: Some("test body".to_string()),
195            text_html: None,
196            attachments: vec![],
197            fetched_at: chrono::Utc::now(),
198            metadata: MessageMetadata::default(),
199        }
200    }
201
202    #[async_trait::async_trait]
203    impl MailSyncProvider for DeltaLabelProvider {
204        fn name(&self) -> &str {
205            "delta-label"
206        }
207        fn account_id(&self) -> &AccountId {
208            &self.account_id
209        }
210        fn capabilities(&self) -> SyncCapabilities {
211            SyncCapabilities {
212                labels: true,
213                server_search: false,
214                delta_sync: true,
215                push: false,
216                batch_operations: false,
217                native_thread_ids: true,
218            }
219        }
220        async fn authenticate(&mut self) -> Result<(), MxrError> {
221            Ok(())
222        }
223        async fn refresh_auth(&mut self) -> Result<(), MxrError> {
224            Ok(())
225        }
226        async fn sync_labels(&self) -> Result<Vec<Label>, MxrError> {
227            Ok(self.labels.clone())
228        }
229        async fn sync_messages(&self, cursor: &SyncCursor) -> Result<SyncBatch, MxrError> {
230            match cursor {
231                SyncCursor::Initial => Ok(SyncBatch {
232                    upserted: self.messages.clone(),
233                    deleted_provider_ids: vec![],
234                    label_changes: vec![],
235                    next_cursor: SyncCursor::Gmail { history_id: 100 },
236                }),
237                _ => Ok(SyncBatch {
238                    upserted: vec![],
239                    deleted_provider_ids: vec![],
240                    label_changes: self.label_changes.clone(),
241                    next_cursor: SyncCursor::Gmail { history_id: 200 },
242                }),
243            }
244        }
245        async fn fetch_attachment(&self, _mid: &str, _aid: &str) -> Result<Vec<u8>, MxrError> {
246            Err(MxrError::NotFound("no attachment".into()))
247        }
248        async fn modify_labels(
249            &self,
250            _id: &str,
251            _add: &[String],
252            _rm: &[String],
253        ) -> Result<(), MxrError> {
254            Ok(())
255        }
256        async fn trash(&self, _id: &str) -> Result<(), MxrError> {
257            Ok(())
258        }
259        async fn set_read(&self, _id: &str, _read: bool) -> Result<(), MxrError> {
260            Ok(())
261        }
262        async fn set_starred(&self, _id: &str, _starred: bool) -> Result<(), MxrError> {
263            Ok(())
264        }
265    }
266
267    fn make_test_label(account_id: &AccountId, name: &str, provider_id: &str) -> Label {
268        Label {
269            id: LabelId::new(),
270            account_id: account_id.clone(),
271            name: name.to_string(),
272            kind: LabelKind::System,
273            color: None,
274            provider_id: provider_id.to_string(),
275            unread_count: 0,
276            total_count: 0,
277        }
278    }
279
280    fn make_test_envelope(
281        account_id: &AccountId,
282        provider_id: &str,
283        label_provider_ids: Vec<String>,
284    ) -> Envelope {
285        Envelope {
286            id: MessageId::new(),
287            account_id: account_id.clone(),
288            provider_id: provider_id.to_string(),
289            thread_id: ThreadId::new(),
290            message_id_header: None,
291            in_reply_to: None,
292            references: vec![],
293            from: mxr_core::Address {
294                name: Some("Test".to_string()),
295                email: "test@example.com".to_string(),
296            },
297            to: vec![],
298            cc: vec![],
299            bcc: vec![],
300            subject: "Test message".to_string(),
301            date: chrono::Utc::now(),
302            flags: MessageFlags::empty(),
303            snippet: "Test snippet".to_string(),
304            has_attachments: false,
305            size_bytes: 1000,
306            unsubscribe: UnsubscribeMethod::None,
307            label_provider_ids,
308        }
309    }
310
311    #[tokio::test]
312    async fn delta_sync_applies_label_additions() {
313        let store = Arc::new(Store::in_memory().await.unwrap());
314        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
315        let engine = SyncEngine::new(store.clone(), search.clone());
316
317        let account_id = AccountId::new();
318        store
319            .insert_account(&test_account(account_id.clone()))
320            .await
321            .unwrap();
322
323        let inbox = make_test_label(&account_id, "Inbox", "INBOX");
324        let starred = make_test_label(&account_id, "Starred", "STARRED");
325        let labels = vec![inbox.clone(), starred.clone()];
326
327        let msg = make_test_envelope(&account_id, "prov-msg-1", vec!["INBOX".to_string()]);
328        let msg_provider_id = msg.provider_id.clone();
329
330        let provider = DeltaLabelProvider::new(
331            account_id.clone(),
332            vec![msg],
333            labels,
334            vec![LabelChange {
335                provider_message_id: msg_provider_id.clone(),
336                added_labels: vec!["STARRED".to_string()],
337                removed_labels: vec![],
338            }],
339        );
340
341        // Initial sync
342        engine.sync_account(&provider).await.unwrap();
343
344        // Verify msg has INBOX label
345        let msg_id = store
346            .get_message_id_by_provider_id(&account_id, &msg_provider_id)
347            .await
348            .unwrap()
349            .unwrap();
350        let labels_before = store.get_message_label_ids(&msg_id).await.unwrap();
351        assert!(labels_before.contains(&inbox.id));
352        assert!(!labels_before.contains(&starred.id));
353
354        // Delta sync — adds STARRED label
355        engine.sync_account(&provider).await.unwrap();
356
357        let labels_after = store.get_message_label_ids(&msg_id).await.unwrap();
358        assert!(labels_after.contains(&inbox.id), "INBOX should still be present");
359        assert!(labels_after.contains(&starred.id), "STARRED should be added by delta");
360    }
361
362    #[tokio::test]
363    async fn sync_rethreads_messages_when_provider_lacks_native_thread_ids() {
364        let store = Arc::new(Store::in_memory().await.unwrap());
365        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
366        let engine = SyncEngine::new(store.clone(), search);
367
368        let account_id = AccountId::new();
369        store
370            .insert_account(&test_account(account_id.clone()))
371            .await
372            .unwrap();
373
374        let first_id = MessageId::new();
375        let second_id = MessageId::new();
376        let first = SyncedMessage {
377            envelope: Envelope {
378                id: first_id.clone(),
379                account_id: account_id.clone(),
380                provider_id: "prov-thread-1".into(),
381                thread_id: ThreadId::new(),
382                message_id_header: Some("<root@example.com>".into()),
383                in_reply_to: None,
384                references: vec![],
385                from: mxr_core::Address {
386                    name: Some("Alice".into()),
387                    email: "alice@example.com".into(),
388                },
389                to: vec![],
390                cc: vec![],
391                bcc: vec![],
392                subject: "Topic".into(),
393                date: chrono::Utc::now() - chrono::Duration::minutes(5),
394                flags: MessageFlags::empty(),
395                snippet: "first".into(),
396                has_attachments: false,
397                size_bytes: 100,
398                unsubscribe: UnsubscribeMethod::None,
399                label_provider_ids: vec![],
400            },
401            body: make_empty_body(&first_id),
402        };
403        let second = SyncedMessage {
404            envelope: Envelope {
405                id: second_id.clone(),
406                account_id: account_id.clone(),
407                provider_id: "prov-thread-2".into(),
408                thread_id: ThreadId::new(),
409                message_id_header: Some("<reply@example.com>".into()),
410                in_reply_to: Some("<root@example.com>".into()),
411                references: vec!["<root@example.com>".into()],
412                from: mxr_core::Address {
413                    name: Some("Bob".into()),
414                    email: "bob@example.com".into(),
415                },
416                to: vec![],
417                cc: vec![],
418                bcc: vec![],
419                subject: "Re: Topic".into(),
420                date: chrono::Utc::now(),
421                flags: MessageFlags::empty(),
422                snippet: "second".into(),
423                has_attachments: false,
424                size_bytes: 100,
425                unsubscribe: UnsubscribeMethod::None,
426                label_provider_ids: vec![],
427            },
428            body: make_empty_body(&second_id),
429        };
430
431        let provider = ThreadingProvider {
432            account_id: account_id.clone(),
433            messages: vec![first, second],
434        };
435
436        engine.sync_account(&provider).await.unwrap();
437
438        let first_env = store.get_envelope(&first_id).await.unwrap().unwrap();
439        let second_env = store.get_envelope(&second_id).await.unwrap().unwrap();
440        assert_eq!(first_env.thread_id, second_env.thread_id);
441
442        let thread = store
443            .get_thread(&first_env.thread_id)
444            .await
445            .unwrap()
446            .unwrap();
447        assert_eq!(thread.message_count, 2);
448    }
449
450    #[tokio::test]
451    async fn delta_sync_applies_label_removals() {
452        let store = Arc::new(Store::in_memory().await.unwrap());
453        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
454        let engine = SyncEngine::new(store.clone(), search.clone());
455
456        let account_id = AccountId::new();
457        store
458            .insert_account(&test_account(account_id.clone()))
459            .await
460            .unwrap();
461
462        let inbox = make_test_label(&account_id, "Inbox", "INBOX");
463        let starred = make_test_label(&account_id, "Starred", "STARRED");
464        let labels = vec![inbox.clone(), starred.clone()];
465
466        let msg = make_test_envelope(
467            &account_id,
468            "prov-msg-2",
469            vec!["INBOX".to_string(), "STARRED".to_string()],
470        );
471        let msg_provider_id = msg.provider_id.clone();
472
473        let provider = DeltaLabelProvider::new(
474            account_id.clone(),
475            vec![msg],
476            labels,
477            vec![LabelChange {
478                provider_message_id: msg_provider_id.clone(),
479                added_labels: vec![],
480                removed_labels: vec!["STARRED".to_string()],
481            }],
482        );
483
484        // Initial sync
485        engine.sync_account(&provider).await.unwrap();
486
487        let msg_id = store
488            .get_message_id_by_provider_id(&account_id, &msg_provider_id)
489            .await
490            .unwrap()
491            .unwrap();
492        let labels_before = store.get_message_label_ids(&msg_id).await.unwrap();
493        assert!(
494            labels_before.contains(&starred.id),
495            "STARRED should be present after initial sync"
496        );
497
498        // Delta sync — removes STARRED
499        engine.sync_account(&provider).await.unwrap();
500
501        let labels_after = store.get_message_label_ids(&msg_id).await.unwrap();
502        assert!(labels_after.contains(&inbox.id), "INBOX should remain");
503        assert!(
504            !labels_after.contains(&starred.id),
505            "STARRED should be removed by delta"
506        );
507    }
508
509    #[tokio::test]
510    async fn delta_sync_handles_unknown_provider_message() {
511        let store = Arc::new(Store::in_memory().await.unwrap());
512        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
513        let engine = SyncEngine::new(store.clone(), search.clone());
514
515        let account_id = AccountId::new();
516        store
517            .insert_account(&test_account(account_id.clone()))
518            .await
519            .unwrap();
520
521        let inbox = make_test_label(&account_id, "Inbox", "INBOX");
522        let labels = vec![inbox.clone()];
523
524        let msg = make_test_envelope(&account_id, "prov-msg-3", vec!["INBOX".to_string()]);
525
526        let provider = DeltaLabelProvider::new(
527            account_id.clone(),
528            vec![msg],
529            labels,
530            vec![LabelChange {
531                // This provider_message_id doesn't exist in our store
532                provider_message_id: "nonexistent-msg".to_string(),
533                added_labels: vec!["INBOX".to_string()],
534                removed_labels: vec![],
535            }],
536        );
537
538        // Initial sync
539        engine.sync_account(&provider).await.unwrap();
540
541        // Delta sync — should not crash on unknown message
542        let result = engine.sync_account(&provider).await;
543        assert!(
544            result.is_ok(),
545            "Delta sync should gracefully skip unknown messages"
546        );
547    }
548
549    #[tokio::test]
550    async fn sync_populates_store_and_search() {
551        let store = Arc::new(Store::in_memory().await.unwrap());
552        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
553        let engine = SyncEngine::new(store.clone(), search.clone());
554
555        let account_id = AccountId::new();
556        store
557            .insert_account(&test_account(account_id.clone()))
558            .await
559            .unwrap();
560
561        let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
562        let count = engine.sync_account(&provider).await.unwrap();
563        assert_eq!(count, 55);
564
565        // Verify store
566        let envelopes = store
567            .list_envelopes_by_account(&account_id, 100, 0)
568            .await
569            .unwrap();
570        assert_eq!(envelopes.len(), 55);
571
572        // Verify search
573        let results = search.lock().await.search("deployment", 10).unwrap();
574        assert!(!results.is_empty());
575    }
576
577    #[tokio::test]
578    async fn bodies_stored_eagerly_during_sync() {
579        let store = Arc::new(Store::in_memory().await.unwrap());
580        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
581        let engine = SyncEngine::new(store.clone(), search.clone());
582
583        let account_id = AccountId::new();
584        store
585            .insert_account(&test_account(account_id.clone()))
586            .await
587            .unwrap();
588
589        let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
590        engine.sync_account(&provider).await.unwrap();
591
592        // Get first message
593        let envelopes = store
594            .list_envelopes_by_account(&account_id, 1, 0)
595            .await
596            .unwrap();
597        let msg_id = &envelopes[0].id;
598
599        // Body should already be in store — fetched eagerly during sync
600        let body = engine.get_body(msg_id).await.unwrap();
601        assert!(body.text_plain.is_some());
602
603        // Second read — same result
604        let body2 = engine.get_body(msg_id).await.unwrap();
605        assert_eq!(body.text_plain, body2.text_plain);
606    }
607
608    #[tokio::test]
609    async fn snooze_wake() {
610        let store = Arc::new(Store::in_memory().await.unwrap());
611        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
612        let engine = SyncEngine::new(store.clone(), search.clone());
613
614        let account_id = AccountId::new();
615        store
616            .insert_account(&test_account(account_id.clone()))
617            .await
618            .unwrap();
619
620        let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
621        engine.sync_account(&provider).await.unwrap();
622
623        // Get a message to snooze
624        let envelopes = store
625            .list_envelopes_by_account(&account_id, 1, 0)
626            .await
627            .unwrap();
628
629        let snoozed = Snoozed {
630            message_id: envelopes[0].id.clone(),
631            account_id: account_id.clone(),
632            snoozed_at: chrono::Utc::now(),
633            wake_at: chrono::Utc::now() - chrono::Duration::hours(1),
634            original_labels: vec![],
635        };
636        store.insert_snooze(&snoozed).await.unwrap();
637
638        let woken = engine.check_snoozes().await.unwrap();
639        assert_eq!(woken.len(), 1);
640
641        // Should be gone now
642        let woken2 = engine.check_snoozes().await.unwrap();
643        assert_eq!(woken2.len(), 0);
644    }
645
646    #[tokio::test]
647    async fn cursor_persistence() {
648        let store = Arc::new(Store::in_memory().await.unwrap());
649        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
650        let engine = SyncEngine::new(store.clone(), search.clone());
651
652        let account_id = AccountId::new();
653        store
654            .insert_account(&test_account(account_id.clone()))
655            .await
656            .unwrap();
657
658        // Before sync, cursor should be None
659        let cursor_before = store.get_sync_cursor(&account_id).await.unwrap();
660        assert!(cursor_before.is_none());
661
662        let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
663        engine.sync_account(&provider).await.unwrap();
664
665        // After sync, cursor should match FakeProvider's next_cursor (Gmail { history_id: 1 })
666        let cursor_after = store.get_sync_cursor(&account_id).await.unwrap();
667        assert!(cursor_after.is_some(), "Cursor should be set after sync");
668        let cursor_json = serde_json::to_string(&cursor_after.unwrap()).unwrap();
669        assert!(
670            cursor_json.contains("Gmail") && cursor_json.contains("1"),
671            "Cursor should be Gmail {{ history_id: 1 }}, got: {}",
672            cursor_json
673        );
674    }
675
676    #[tokio::test]
677    async fn sync_error_does_not_crash() {
678        let store = Arc::new(Store::in_memory().await.unwrap());
679        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
680        let engine = SyncEngine::new(store.clone(), search.clone());
681
682        let account_id = AccountId::new();
683        store
684            .insert_account(&test_account(account_id.clone()))
685            .await
686            .unwrap();
687
688        let error_provider = ErrorProvider {
689            account_id: account_id.clone(),
690        };
691
692        // Should return Err, not panic
693        let result = engine.sync_account(&error_provider).await;
694        assert!(
695            result.is_err(),
696            "sync_account should return Err for failing provider"
697        );
698        let err_msg = result.unwrap_err().to_string();
699        assert!(
700            err_msg.contains("simulated"),
701            "Error should contain provider message, got: {}",
702            err_msg
703        );
704    }
705
706    #[tokio::test]
707    async fn label_counts_after_sync() {
708        let store = Arc::new(Store::in_memory().await.unwrap());
709        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
710        let engine = SyncEngine::new(store.clone(), search.clone());
711
712        let account_id = AccountId::new();
713        store
714            .insert_account(&test_account(account_id.clone()))
715            .await
716            .unwrap();
717
718        let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
719        engine.sync_account(&provider).await.unwrap();
720
721        let labels = store.list_labels_by_account(&account_id).await.unwrap();
722        assert!(!labels.is_empty(), "Should have labels after sync");
723
724        let has_counts = labels
725            .iter()
726            .any(|l| l.total_count > 0 || l.unread_count > 0);
727        assert!(
728            has_counts,
729            "At least one label should have non-zero counts after sync"
730        );
731    }
732
733    #[tokio::test]
734    async fn list_envelopes_by_label_returns_results() {
735        let store = Arc::new(Store::in_memory().await.unwrap());
736        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
737        let engine = SyncEngine::new(store.clone(), search.clone());
738
739        let account_id = AccountId::new();
740        store
741            .insert_account(&test_account(account_id.clone()))
742            .await
743            .unwrap();
744
745        let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
746        engine.sync_account(&provider).await.unwrap();
747
748        let labels = store.list_labels_by_account(&account_id).await.unwrap();
749
750        // Find the INBOX label
751        let inbox_label = labels.iter().find(|l| l.name == "Inbox").unwrap();
752        assert!(
753            inbox_label.total_count > 0,
754            "Inbox should have messages after sync"
755        );
756
757        // Now query envelopes by that label
758        let envelopes = store
759            .list_envelopes_by_label(&inbox_label.id, 100, 0)
760            .await
761            .unwrap();
762
763        // Also check all envelopes (no label filter)
764        let all_envelopes = store
765            .list_envelopes_by_account(&account_id, 200, 0)
766            .await
767            .unwrap();
768
769        assert!(
770            !envelopes.is_empty(),
771            "list_envelopes_by_label should return messages for Inbox label (got 0). \
772             label_id={}, total_count={}, all_count={}",
773            inbox_label.id,
774            inbox_label.total_count,
775            all_envelopes.len()
776        );
777
778        // Inbox-by-label should have same or fewer messages than all
779        assert!(
780            envelopes.len() <= all_envelopes.len(),
781            "Inbox-by-label ({}) should be <= all ({})",
782            envelopes.len(),
783            all_envelopes.len()
784        );
785    }
786
787    #[tokio::test]
788    async fn list_envelopes_by_sent_label_may_be_empty() {
789        let store = Arc::new(Store::in_memory().await.unwrap());
790        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
791        let engine = SyncEngine::new(store.clone(), search.clone());
792
793        let account_id = AccountId::new();
794        store
795            .insert_account(&test_account(account_id.clone()))
796            .await
797            .unwrap();
798
799        let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
800        engine.sync_account(&provider).await.unwrap();
801
802        let labels = store.list_labels_by_account(&account_id).await.unwrap();
803
804        // Find Sent label
805        let sent_label = labels.iter().find(|l| l.name == "Sent").unwrap();
806
807        let envelopes = store
808            .list_envelopes_by_label(&sent_label.id, 100, 0)
809            .await
810            .unwrap();
811
812        // Sent has no messages in fake provider (no SENT flags set)
813        assert_eq!(
814            envelopes.len(),
815            0,
816            "Sent should have 0 messages in fake provider"
817        );
818
819        // But Inbox should still have messages
820        let inbox_label = labels.iter().find(|l| l.name == "Inbox").unwrap();
821        let inbox_envelopes = store
822            .list_envelopes_by_label(&inbox_label.id, 100, 0)
823            .await
824            .unwrap();
825        assert!(
826            !inbox_envelopes.is_empty(),
827            "Inbox should still have messages after querying Sent"
828        );
829
830        // And listing ALL envelopes (no label filter) should still work
831        let all_envelopes = store
832            .list_envelopes_by_account(&account_id, 100, 0)
833            .await
834            .unwrap();
835        assert!(
836            !all_envelopes.is_empty(),
837            "All envelopes should still be retrievable"
838        );
839    }
840
841    #[tokio::test]
842    async fn progressive_loading_chunks() {
843        let store = Arc::new(Store::in_memory().await.unwrap());
844        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
845        let engine = SyncEngine::new(store.clone(), search.clone());
846
847        let account_id = AccountId::new();
848        store
849            .insert_account(&test_account(account_id.clone()))
850            .await
851            .unwrap();
852
853        let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
854        let count = engine.sync_account(&provider).await.unwrap();
855        assert_eq!(count, 55, "Sync should report 55 messages processed");
856
857        // Verify store has exactly 55 envelopes
858        let envelopes = store
859            .list_envelopes_by_account(&account_id, 200, 0)
860            .await
861            .unwrap();
862        assert_eq!(envelopes.len(), 55, "Store should contain 55 envelopes");
863
864        // Verify search index has results for known fixture terms
865        let results = search.lock().await.search("deployment", 10).unwrap();
866        assert!(
867            !results.is_empty(),
868            "Search index should have 'deployment' results"
869        );
870    }
871
872    #[tokio::test]
873    async fn delta_sync_no_duplicate_labels() {
874        let store = Arc::new(Store::in_memory().await.unwrap());
875        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
876        let engine = SyncEngine::new(store.clone(), search.clone());
877
878        let account_id = AccountId::new();
879        store
880            .insert_account(&test_account(account_id.clone()))
881            .await
882            .unwrap();
883
884        let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
885
886        // Initial sync
887        engine.sync_account(&provider).await.unwrap();
888
889        let labels_after_first = store.list_labels_by_account(&account_id).await.unwrap();
890        let label_count_first = labels_after_first.len();
891
892        // Delta sync (should return 0 new messages)
893        let delta_count = engine.sync_account(&provider).await.unwrap();
894        assert_eq!(delta_count, 0, "Delta sync should return 0 new messages");
895
896        let labels_after_second = store.list_labels_by_account(&account_id).await.unwrap();
897
898        // Label rows should not be duplicated
899        assert_eq!(
900            label_count_first,
901            labels_after_second.len(),
902            "Label count should not change after delta sync"
903        );
904
905        // Verify each label still exists with the correct provider_id
906        for label in &labels_after_first {
907            let still_exists = labels_after_second
908                .iter()
909                .any(|l| l.provider_id == label.provider_id && l.name == label.name);
910            assert!(
911                still_exists,
912                "Label '{}' (provider_id='{}') should survive delta sync",
913                label.name, label.provider_id
914            );
915        }
916
917        // Verify messages are still in the store (upsert_envelope uses INSERT OR REPLACE
918        // on messages table, which is not affected by label cascade)
919        let envelopes = store
920            .list_envelopes_by_account(&account_id, 200, 0)
921            .await
922            .unwrap();
923        assert_eq!(
924            envelopes.len(),
925            55,
926            "All 55 messages should survive delta sync"
927        );
928    }
929
930    #[tokio::test]
931    async fn delta_sync_preserves_junction_table() {
932        let store = Arc::new(Store::in_memory().await.unwrap());
933        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
934        let engine = SyncEngine::new(store.clone(), search.clone());
935
936        let account_id = AccountId::new();
937        store
938            .insert_account(&test_account(account_id.clone()))
939            .await
940            .unwrap();
941
942        let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
943
944        // Initial sync
945        engine.sync_account(&provider).await.unwrap();
946
947        let junction_before = store.count_message_labels().await.unwrap();
948        assert!(
949            junction_before > 0,
950            "Junction table should be populated after initial sync"
951        );
952
953        // Delta sync (labels get re-upserted, no new messages)
954        let delta_count = engine.sync_account(&provider).await.unwrap();
955        assert_eq!(delta_count, 0, "Delta sync should return 0 new messages");
956
957        let junction_after = store.count_message_labels().await.unwrap();
958        assert_eq!(
959            junction_before, junction_after,
960            "Junction table should survive delta sync (before={}, after={})",
961            junction_before, junction_after
962        );
963
964        // Verify label filtering still works
965        let labels = store.list_labels_by_account(&account_id).await.unwrap();
966        let inbox = labels.iter().find(|l| l.name == "Inbox").unwrap();
967        let envelopes = store
968            .list_envelopes_by_label(&inbox.id, 100, 0)
969            .await
970            .unwrap();
971        assert!(
972            !envelopes.is_empty(),
973            "Inbox should still return messages after delta sync"
974        );
975    }
976
977    #[tokio::test]
978    async fn backfill_triggers_when_junction_empty() {
979        let store = Arc::new(Store::in_memory().await.unwrap());
980        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
981        let engine = SyncEngine::new(store.clone(), search.clone());
982
983        let account_id = AccountId::new();
984        store
985            .insert_account(&test_account(account_id.clone()))
986            .await
987            .unwrap();
988
989        let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
990
991        // Initial sync
992        engine.sync_account(&provider).await.unwrap();
993
994        let junction_before = store.count_message_labels().await.unwrap();
995        assert!(junction_before > 0);
996
997        // Wipe junction table manually (simulates corrupted DB)
998        sqlx::query("DELETE FROM message_labels")
999            .execute(store.writer())
1000            .await
1001            .unwrap();
1002
1003        let junction_wiped = store.count_message_labels().await.unwrap();
1004        assert_eq!(junction_wiped, 0, "Junction should be empty after wipe");
1005
1006        // Sync again — should detect empty junction and backfill
1007        engine.sync_account(&provider).await.unwrap();
1008
1009        let junction_after = store.count_message_labels().await.unwrap();
1010        assert!(
1011            junction_after > 0,
1012            "Junction table should be repopulated after backfill (got {})",
1013            junction_after
1014        );
1015    }
1016
1017    #[tokio::test]
1018    async fn sync_label_resolution_matches_gmail_ids() {
1019        let store = Arc::new(Store::in_memory().await.unwrap());
1020        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1021        let engine = SyncEngine::new(store.clone(), search.clone());
1022
1023        let account_id = AccountId::new();
1024        store
1025            .insert_account(&test_account(account_id.clone()))
1026            .await
1027            .unwrap();
1028
1029        let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1030        engine.sync_account(&provider).await.unwrap();
1031
1032        let labels = store.list_labels_by_account(&account_id).await.unwrap();
1033
1034        // FakeProvider uses Gmail-style IDs: "INBOX", "SENT", "TRASH", etc.
1035        // Verify each label has a matching provider_id
1036        let expected_mappings = [
1037            ("Inbox", "INBOX"),
1038            ("Sent", "SENT"),
1039            ("Trash", "TRASH"),
1040            ("Spam", "SPAM"),
1041            ("Starred", "STARRED"),
1042            ("Work", "work"),
1043            ("Personal", "personal"),
1044            ("Newsletters", "newsletters"),
1045        ];
1046        for (name, expected_pid) in &expected_mappings {
1047            let label = labels.iter().find(|l| l.name == *name);
1048            assert!(label.is_some(), "Label '{}' should exist after sync", name);
1049            assert_eq!(
1050                label.unwrap().provider_id,
1051                *expected_pid,
1052                "Label '{}' should have provider_id '{}'",
1053                name,
1054                expected_pid
1055            );
1056        }
1057
1058        // For each message, verify junction table entries point to valid labels
1059        let envelopes = store
1060            .list_envelopes_by_account(&account_id, 200, 0)
1061            .await
1062            .unwrap();
1063        let label_ids: std::collections::HashSet<String> =
1064            labels.iter().map(|l| l.id.as_str().to_string()).collect();
1065
1066        for env in &envelopes {
1067            let msg_label_ids = store.get_message_label_ids(&env.id).await.unwrap();
1068            for lid in &msg_label_ids {
1069                assert!(
1070                    label_ids.contains(&lid.as_str().to_string()),
1071                    "Junction entry for message {} points to nonexistent label {}",
1072                    env.id,
1073                    lid
1074                );
1075            }
1076        }
1077    }
1078
1079    #[tokio::test]
1080    async fn list_envelopes_by_each_label_returns_correct_count() {
1081        let store = Arc::new(Store::in_memory().await.unwrap());
1082        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1083        let engine = SyncEngine::new(store.clone(), search.clone());
1084
1085        let account_id = AccountId::new();
1086        store
1087            .insert_account(&test_account(account_id.clone()))
1088            .await
1089            .unwrap();
1090
1091        let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1092        engine.sync_account(&provider).await.unwrap();
1093
1094        let labels = store.list_labels_by_account(&account_id).await.unwrap();
1095        for label in &labels {
1096            if label.total_count > 0 {
1097                let envelopes = store
1098                    .list_envelopes_by_label(&label.id, 200, 0)
1099                    .await
1100                    .unwrap();
1101                assert_eq!(
1102                    envelopes.len(),
1103                    label.total_count as usize,
1104                    "Label '{}' (provider_id='{}') has total_count={} but list_envelopes_by_label returned {}",
1105                    label.name,
1106                    label.provider_id,
1107                    label.total_count,
1108                    envelopes.len()
1109                );
1110            }
1111        }
1112    }
1113
1114    #[tokio::test]
1115    async fn search_index_consistent_with_store() {
1116        let store = Arc::new(Store::in_memory().await.unwrap());
1117        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1118        let engine = SyncEngine::new(store.clone(), search.clone());
1119
1120        let account_id = AccountId::new();
1121        store
1122            .insert_account(&test_account(account_id.clone()))
1123            .await
1124            .unwrap();
1125
1126        let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1127        engine.sync_account(&provider).await.unwrap();
1128
1129        let envelopes = store
1130            .list_envelopes_by_account(&account_id, 200, 0)
1131            .await
1132            .unwrap();
1133
1134        let search_guard = search.lock().await;
1135        for env in &envelopes {
1136            // Extract a distinctive keyword from the subject
1137            let keyword = env
1138                .subject
1139                .split_whitespace()
1140                .find(|w| w.len() > 3 && w.chars().all(|c| c.is_alphanumeric()))
1141                .unwrap_or(&env.subject);
1142            let results = search_guard.search(keyword, 100).unwrap();
1143            assert!(
1144                results.iter().any(|r| r.message_id == env.id.as_str()),
1145                "Envelope '{}' (subject='{}') should be findable by keyword '{}' in search index",
1146                env.id,
1147                env.subject,
1148                keyword
1149            );
1150        }
1151    }
1152
1153    #[tokio::test]
1154    async fn mutation_flags_persist_through_store() {
1155        let store = Arc::new(Store::in_memory().await.unwrap());
1156        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1157        let engine = SyncEngine::new(store.clone(), search.clone());
1158
1159        let account_id = AccountId::new();
1160        store
1161            .insert_account(&test_account(account_id.clone()))
1162            .await
1163            .unwrap();
1164
1165        let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1166        engine.sync_account(&provider).await.unwrap();
1167
1168        let envelopes = store
1169            .list_envelopes_by_account(&account_id, 1, 0)
1170            .await
1171            .unwrap();
1172        let msg_id = &envelopes[0].id;
1173        let initial_flags = envelopes[0].flags;
1174
1175        // Set starred
1176        store.set_starred(msg_id, true).await.unwrap();
1177        store.set_read(msg_id, true).await.unwrap();
1178
1179        let updated = store.get_envelope(msg_id).await.unwrap().unwrap();
1180        assert!(
1181            updated.flags.contains(MessageFlags::STARRED),
1182            "STARRED flag should be set"
1183        );
1184        assert!(
1185            updated.flags.contains(MessageFlags::READ),
1186            "READ flag should be set"
1187        );
1188
1189        // Clear starred, keep read
1190        store.set_starred(msg_id, false).await.unwrap();
1191        let updated2 = store.get_envelope(msg_id).await.unwrap().unwrap();
1192        assert!(
1193            !updated2.flags.contains(MessageFlags::STARRED),
1194            "STARRED flag should be cleared after set_starred(false)"
1195        );
1196        assert!(
1197            updated2.flags.contains(MessageFlags::READ),
1198            "READ flag should still be set after clearing STARRED"
1199        );
1200
1201        // Verify initial flags were different (test is meaningful)
1202        // At least one flag mutation should have changed something
1203        let _ = initial_flags; // used to confirm the test exercises real mutations
1204    }
1205
1206    #[tokio::test]
1207    async fn junction_table_survives_message_update() {
1208        let store = Arc::new(Store::in_memory().await.unwrap());
1209        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1210        let engine = SyncEngine::new(store.clone(), search.clone());
1211
1212        let account_id = AccountId::new();
1213        store
1214            .insert_account(&test_account(account_id.clone()))
1215            .await
1216            .unwrap();
1217
1218        let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1219        engine.sync_account(&provider).await.unwrap();
1220
1221        // Count junction rows for first message
1222        let envelopes = store
1223            .list_envelopes_by_account(&account_id, 1, 0)
1224            .await
1225            .unwrap();
1226        let msg_id = &envelopes[0].id;
1227
1228        let junction_before = store.get_message_label_ids(msg_id).await.unwrap();
1229        assert!(
1230            !junction_before.is_empty(),
1231            "Message should have label associations after sync"
1232        );
1233
1234        // Re-upsert the same envelope (simulates re-sync)
1235        store.upsert_envelope(&envelopes[0]).await.unwrap();
1236        // Re-set labels (same as sync engine does)
1237        store
1238            .set_message_labels(msg_id, &junction_before)
1239            .await
1240            .unwrap();
1241
1242        let junction_after = store.get_message_label_ids(msg_id).await.unwrap();
1243        assert_eq!(
1244            junction_before.len(),
1245            junction_after.len(),
1246            "Junction rows should not double after re-sync (before={}, after={})",
1247            junction_before.len(),
1248            junction_after.len()
1249        );
1250    }
1251
1252    #[tokio::test]
1253    async fn find_labels_by_provider_ids_with_unknown_ids() {
1254        let store = Arc::new(Store::in_memory().await.unwrap());
1255        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1256        let engine = SyncEngine::new(store.clone(), search.clone());
1257
1258        let account_id = AccountId::new();
1259        store
1260            .insert_account(&test_account(account_id.clone()))
1261            .await
1262            .unwrap();
1263
1264        let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1265        engine.sync_account(&provider).await.unwrap();
1266
1267        let result = store
1268            .find_labels_by_provider_ids(
1269                &account_id,
1270                &["INBOX".to_string(), "NONEXISTENT_LABEL".to_string()],
1271            )
1272            .await
1273            .unwrap();
1274
1275        assert_eq!(
1276            result.len(),
1277            1,
1278            "Should only return 1 result for INBOX, not 2 (NONEXISTENT_LABEL should be ignored)"
1279        );
1280    }
1281
1282    #[tokio::test]
1283    async fn body_available_after_sync() {
1284        let store = Arc::new(Store::in_memory().await.unwrap());
1285        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1286        let engine = SyncEngine::new(store.clone(), search.clone());
1287
1288        let account_id = AccountId::new();
1289        store
1290            .insert_account(&test_account(account_id.clone()))
1291            .await
1292            .unwrap();
1293
1294        let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1295        engine.sync_account(&provider).await.unwrap();
1296
1297        let envelopes = store
1298            .list_envelopes_by_account(&account_id, 1, 0)
1299            .await
1300            .unwrap();
1301        let msg_id = &envelopes[0].id;
1302
1303        // Body already available — stored eagerly during sync
1304        let body1 = engine.get_body(msg_id).await.unwrap();
1305        assert!(body1.text_plain.is_some(), "Body should have text_plain");
1306
1307        // Second read — same result from store
1308        let body2 = engine.get_body(msg_id).await.unwrap();
1309
1310        assert_eq!(
1311            body1.text_plain, body2.text_plain,
1312            "Body text_plain should be consistent"
1313        );
1314        assert_eq!(
1315            body1.text_html, body2.text_html,
1316            "Body text_html should be consistent"
1317        );
1318        assert_eq!(
1319            body1.attachments.len(),
1320            body2.attachments.len(),
1321            "Body attachments count should be consistent"
1322        );
1323    }
1324
1325    #[tokio::test]
1326    async fn recalculate_label_counts_matches_junction() {
1327        let store = Arc::new(Store::in_memory().await.unwrap());
1328        let search = Arc::new(Mutex::new(SearchIndex::in_memory().unwrap()));
1329        let engine = SyncEngine::new(store.clone(), search.clone());
1330
1331        let account_id = AccountId::new();
1332        store
1333            .insert_account(&test_account(account_id.clone()))
1334            .await
1335            .unwrap();
1336
1337        let provider = mxr_provider_fake::FakeProvider::new(account_id.clone());
1338        engine.sync_account(&provider).await.unwrap();
1339
1340        let labels = store.list_labels_by_account(&account_id).await.unwrap();
1341
1342        for label in &labels {
1343            let lid = label.id.as_str();
1344            // Manually count junction rows for this label
1345            let junction_count: i64 = sqlx::query_scalar::<_, i64>(
1346                "SELECT COUNT(*) FROM message_labels WHERE label_id = ?",
1347            )
1348            .bind(&lid)
1349            .fetch_one(store.reader())
1350            .await
1351            .unwrap();
1352
1353            assert_eq!(
1354                label.total_count as i64, junction_count,
1355                "Label '{}' total_count ({}) should match junction row count ({})",
1356                label.name, label.total_count, junction_count
1357            );
1358
1359            // Also verify unread count
1360            let unread_count: i64 = sqlx::query_scalar::<_, i64>(
1361                "SELECT COUNT(*) FROM message_labels ml \
1362                 JOIN messages m ON m.id = ml.message_id \
1363                 WHERE ml.label_id = ? AND (m.flags & 1) = 0",
1364            )
1365            .bind(&lid)
1366            .fetch_one(store.reader())
1367            .await
1368            .unwrap();
1369
1370            assert_eq!(
1371                label.unread_count as i64, unread_count,
1372                "Label '{}' unread_count ({}) should match computed unread count ({})",
1373                label.name, label.unread_count, unread_count
1374            );
1375        }
1376    }
1377}