Skip to main content

mxr_sync/
engine.rs

1use mxr_core::id::*;
2use mxr_core::types::*;
3use mxr_core::{MailSyncProvider, MxrError};
4use mxr_search::SearchIndex;
5use mxr_store::Store;
6use std::collections::HashMap;
7use std::sync::Arc;
8use tokio::sync::Mutex;
9
10use crate::threading::{thread_messages, MessageForThreading};
11
12pub struct SyncOutcome {
13    pub synced_count: u32,
14    pub upserted_message_ids: Vec<MessageId>,
15}
16
17pub struct SyncEngine {
18    store: Arc<Store>,
19    search: Arc<Mutex<SearchIndex>>,
20}
21
22impl SyncEngine {
23    pub fn new(store: Arc<Store>, search: Arc<Mutex<SearchIndex>>) -> Self {
24        Self { store, search }
25    }
26
27    pub async fn sync_account(&self, provider: &dyn MailSyncProvider) -> Result<u32, MxrError> {
28        Ok(self.sync_account_with_outcome(provider).await?.synced_count)
29    }
30
31    pub async fn sync_account_with_outcome(
32        &self,
33        provider: &dyn MailSyncProvider,
34    ) -> Result<SyncOutcome, MxrError> {
35        let account_id = provider.account_id();
36        let mut recovered_gmail_cursor_not_found = false;
37
38        loop {
39            let cursor = self
40                .store
41                .get_sync_cursor(account_id)
42                .await
43                .map_err(|e| MxrError::Store(e.to_string()))?
44                .unwrap_or(SyncCursor::Initial);
45
46            // Sync labels — skip during backfill to avoid slowing down pagination
47            if !matches!(cursor, SyncCursor::GmailBackfill { .. }) {
48                let labels = provider.sync_labels().await?;
49                tracing::debug!(count = labels.len(), "synced labels from provider");
50                for label in &labels {
51                    self.store
52                        .upsert_label(label)
53                        .await
54                        .map_err(|e| MxrError::Store(e.to_string()))?;
55                }
56            }
57
58            // Sync messages
59            tracing::info!(cursor = ?cursor, "sync_account: dispatching with cursor");
60            let batch = match provider.sync_messages(&cursor).await {
61                Ok(batch) => batch,
62                Err(MxrError::NotFound(error))
63                    if !recovered_gmail_cursor_not_found
64                        && matches!(
65                            cursor,
66                            SyncCursor::Gmail { .. } | SyncCursor::GmailBackfill { .. }
67                        ) =>
68                {
69                    tracing::warn!(
70                        account = %account_id,
71                        cursor = ?cursor,
72                        error = %error,
73                        "provider sync hit not-found on gmail cursor; resetting to initial sync"
74                    );
75                    self.store
76                        .set_sync_cursor(account_id, &SyncCursor::Initial)
77                        .await
78                        .map_err(|e| MxrError::Store(e.to_string()))?;
79                    recovered_gmail_cursor_not_found = true;
80                    continue;
81                }
82                Err(error) => return Err(error),
83            };
84            let synced_count = batch.upserted.len() as u32;
85            let upserted_message_ids = batch
86                .upserted
87                .iter()
88                .map(|synced| synced.envelope.id.clone())
89                .collect::<Vec<_>>();
90
91            // Apply upserts — store envelope + body, index with body text
92            for synced in &batch.upserted {
93                // Store envelope
94                self.store
95                    .upsert_envelope(&synced.envelope)
96                    .await
97                    .map_err(|e| MxrError::Store(e.to_string()))?;
98
99                // Store body (eagerly fetched during sync)
100                self.store
101                    .insert_body(&synced.body)
102                    .await
103                    .map_err(|e| MxrError::Store(e.to_string()))?;
104
105                // Populate message_labels junction table
106                if !synced.envelope.label_provider_ids.is_empty() {
107                    let label_ids = self
108                        .store
109                        .find_labels_by_provider_ids(
110                            account_id,
111                            &synced.envelope.label_provider_ids,
112                        )
113                        .await
114                        .map_err(|e| MxrError::Store(e.to_string()))?;
115                    if !label_ids.is_empty() {
116                        self.store
117                            .set_message_labels(&synced.envelope.id, &label_ids)
118                            .await
119                            .map_err(|e| MxrError::Store(e.to_string()))?;
120                    }
121                }
122
123                // Search index — index with body text for immediate full-text search
124                {
125                    let mut search = self.search.lock().await;
126                    search.index_body(&synced.envelope, &synced.body)?;
127                }
128            }
129
130            // Deletions (store-only, no search lock)
131            if !batch.deleted_provider_ids.is_empty() {
132                self.store
133                    .delete_messages_by_provider_ids(account_id, &batch.deleted_provider_ids)
134                    .await
135                    .map_err(|e| MxrError::Store(e.to_string()))?;
136            }
137
138            // Apply label changes from delta sync (previously dead code)
139            for change in &batch.label_changes {
140                if let Ok(Some(message_id)) = self
141                    .store
142                    .get_message_id_by_provider_id(account_id, &change.provider_message_id)
143                    .await
144                {
145                    if !change.added_labels.is_empty() {
146                        if let Ok(add_ids) = self
147                            .store
148                            .find_labels_by_provider_ids(account_id, &change.added_labels)
149                            .await
150                        {
151                            for lid in &add_ids {
152                                let _ = self.store.add_message_label(&message_id, lid).await;
153                            }
154                        }
155                    }
156                    if !change.removed_labels.is_empty() {
157                        if let Ok(rm_ids) = self
158                            .store
159                            .find_labels_by_provider_ids(account_id, &change.removed_labels)
160                            .await
161                        {
162                            for lid in &rm_ids {
163                                let _ = self.store.remove_message_label(&message_id, lid).await;
164                            }
165                        }
166                    }
167
168                    if let (Ok(Some(envelope)), Ok(Some(body))) = (
169                        self.store.get_envelope(&message_id).await,
170                        self.store.get_body(&message_id).await,
171                    ) {
172                        let mut search = self.search.lock().await;
173                        search.index_body(&envelope, &body)?;
174                    }
175                }
176            }
177
178            // Commit search index
179            {
180                let mut search = self.search.lock().await;
181                search.commit()?;
182            }
183
184            // Recalculate label counts every batch (including during backfill)
185            self.store
186                .recalculate_label_counts(account_id)
187                .await
188                .map_err(|e| MxrError::Store(e.to_string()))?;
189
190            if !provider.capabilities().native_thread_ids {
191                self.rethread_account(account_id).await?;
192            }
193
194            // Update cursor
195            tracing::info!(next_cursor = ?batch.next_cursor, "sync_account: saving cursor");
196            self.store
197                .set_sync_cursor(account_id, &batch.next_cursor)
198                .await
199                .map_err(|e| MxrError::Store(e.to_string()))?;
200
201            // Backfill: if junction table is empty but messages exist, reset cursor
202            // and re-sync to rebuild label associations (handles DBs corrupted by
203            // the old INSERT OR REPLACE cascade bug).
204            let junction_count = self
205                .store
206                .count_message_labels()
207                .await
208                .map_err(|e| MxrError::Store(e.to_string()))?;
209            let message_count = self
210                .store
211                .count_messages_by_account(account_id)
212                .await
213                .map_err(|e| MxrError::Store(e.to_string()))?;
214            if provider.capabilities().labels && junction_count == 0 && message_count > 0 {
215                tracing::warn!(
216                    message_count,
217                    "Junction table empty — resetting sync cursor for full re-sync"
218                );
219                self.store
220                    .set_sync_cursor(account_id, &SyncCursor::Initial)
221                    .await
222                    .map_err(|e| MxrError::Store(e.to_string()))?;
223                continue;
224            }
225
226            return Ok(SyncOutcome {
227                synced_count,
228                upserted_message_ids,
229            });
230        }
231    }
232
233    async fn rethread_account(&self, account_id: &AccountId) -> Result<(), MxrError> {
234        let envelopes = self
235            .store
236            .list_envelopes_by_account(account_id, 10_000, 0)
237            .await
238            .map_err(|e| MxrError::Store(e.to_string()))?;
239
240        let by_header: HashMap<String, Envelope> = envelopes
241            .iter()
242            .filter_map(|envelope| {
243                envelope
244                    .message_id_header
245                    .clone()
246                    .map(|header| (header, envelope.clone()))
247            })
248            .collect();
249
250        let threading_input: Vec<MessageForThreading> = envelopes
251            .iter()
252            .filter_map(|envelope| {
253                envelope
254                    .message_id_header
255                    .clone()
256                    .map(|message_id| MessageForThreading {
257                        message_id,
258                        in_reply_to: envelope.in_reply_to.clone(),
259                        references: envelope.references.clone(),
260                        date: envelope.date,
261                        subject: envelope.subject.clone(),
262                    })
263            })
264            .collect();
265
266        for thread in thread_messages(&threading_input) {
267            let thread_members: Vec<Envelope> = thread
268                .messages
269                .iter()
270                .filter_map(|message_id| by_header.get(message_id).cloned())
271                .collect();
272
273            if thread_members.is_empty() {
274                continue;
275            }
276
277            let canonical_thread_id = by_header
278                .get(&thread.root_message_id)
279                .map(|root| root.thread_id.clone())
280                .or_else(|| {
281                    thread_members
282                        .first()
283                        .map(|member| member.thread_id.clone())
284                })
285                .unwrap_or_default();
286
287            for member in thread_members {
288                if member.thread_id != canonical_thread_id {
289                    self.store
290                        .update_message_thread_id(&member.id, &canonical_thread_id)
291                        .await
292                        .map_err(|e| MxrError::Store(e.to_string()))?;
293                }
294            }
295        }
296
297        Ok(())
298    }
299
300    /// Read body from store. Bodies are always available after sync.
301    pub async fn get_body(&self, message_id: &MessageId) -> Result<MessageBody, MxrError> {
302        self.store
303            .get_body(message_id)
304            .await
305            .map_err(|e| MxrError::Store(e.to_string()))?
306            .ok_or_else(|| MxrError::NotFound(format!("Body for message {}", message_id)))
307    }
308
309    pub async fn check_snoozes(&self) -> Result<Vec<MessageId>, MxrError> {
310        let now = chrono::Utc::now();
311        let due = self
312            .store
313            .get_due_snoozes(now)
314            .await
315            .map_err(|e| MxrError::Store(e.to_string()))?;
316
317        let mut woken = Vec::new();
318        for snoozed in &due {
319            self.store
320                .remove_snooze(&snoozed.message_id)
321                .await
322                .map_err(|e| MxrError::Store(e.to_string()))?;
323            woken.push(snoozed.message_id.clone());
324        }
325
326        Ok(woken)
327    }
328}