Skip to main content

mxr_sync/
engine.rs

1use mxr_core::id::*;
2use mxr_core::types::*;
3use mxr_core::{MailSyncProvider, MxrError};
4use mxr_search::SearchIndex;
5use mxr_store::Store;
6use std::collections::HashMap;
7use std::sync::Arc;
8use tokio::sync::Mutex;
9
10use crate::threading::{thread_messages, MessageForThreading};
11
/// Result of one `SyncEngine::sync_account_with_outcome` call.
///
/// Both fields describe the message batch of the final sync pass performed by
/// that call.
pub struct SyncOutcome {
    /// Number of messages the provider returned as upserted in that batch.
    pub synced_count: u32,
    /// Envelope ids of those upserted messages, in batch order.
    pub upserted_message_ids: Vec<MessageId>,
}
16
/// Pulls mail from a `MailSyncProvider` into the local store and keeps the
/// search index up to date with what was stored.
pub struct SyncEngine {
    /// Persistent storage for envelopes, bodies, labels, cursors and snoozes.
    store: Arc<Store>,
    /// Shared search index; locked (async mutex) for each indexing/commit step.
    search: Arc<Mutex<SearchIndex>>,
}
21
22impl SyncEngine {
23    pub fn new(store: Arc<Store>, search: Arc<Mutex<SearchIndex>>) -> Self {
24        Self { store, search }
25    }
26
27    pub async fn sync_account(&self, provider: &dyn MailSyncProvider) -> Result<u32, MxrError> {
28        Ok(self.sync_account_with_outcome(provider).await?.synced_count)
29    }
30
31    pub async fn sync_account_with_outcome(
32        &self,
33        provider: &dyn MailSyncProvider,
34    ) -> Result<SyncOutcome, MxrError> {
35        let account_id = provider.account_id();
36
37        loop {
38            let cursor = self
39                .store
40                .get_sync_cursor(account_id)
41                .await
42                .map_err(|e| MxrError::Store(e.to_string()))?
43                .unwrap_or(SyncCursor::Initial);
44
45            // Sync labels — skip during backfill to avoid slowing down pagination
46            if !matches!(cursor, SyncCursor::GmailBackfill { .. }) {
47                let labels = provider.sync_labels().await?;
48                tracing::debug!(count = labels.len(), "synced labels from provider");
49                for label in &labels {
50                    self.store
51                        .upsert_label(label)
52                        .await
53                        .map_err(|e| MxrError::Store(e.to_string()))?;
54                }
55            }
56
57            // Sync messages
58            tracing::info!(cursor = ?cursor, "sync_account: dispatching with cursor");
59            let batch = provider.sync_messages(&cursor).await?;
60            let synced_count = batch.upserted.len() as u32;
61            let upserted_message_ids = batch
62                .upserted
63                .iter()
64                .map(|synced| synced.envelope.id.clone())
65                .collect::<Vec<_>>();
66
67            // Apply upserts — store envelope + body, index with body text
68            for synced in &batch.upserted {
69                // Store envelope
70                self.store
71                    .upsert_envelope(&synced.envelope)
72                    .await
73                    .map_err(|e| MxrError::Store(e.to_string()))?;
74
75                // Store body (eagerly fetched during sync)
76                self.store
77                    .insert_body(&synced.body)
78                    .await
79                    .map_err(|e| MxrError::Store(e.to_string()))?;
80
81                // Populate message_labels junction table
82                if !synced.envelope.label_provider_ids.is_empty() {
83                    let label_ids = self
84                        .store
85                        .find_labels_by_provider_ids(account_id, &synced.envelope.label_provider_ids)
86                        .await
87                        .map_err(|e| MxrError::Store(e.to_string()))?;
88                    if !label_ids.is_empty() {
89                        self.store
90                            .set_message_labels(&synced.envelope.id, &label_ids)
91                            .await
92                            .map_err(|e| MxrError::Store(e.to_string()))?;
93                    }
94                }
95
96                // Search index — index with body text for immediate full-text search
97                {
98                    let mut search = self.search.lock().await;
99                    search.index_body(&synced.envelope, &synced.body)?;
100                }
101            }
102
103            // Deletions (store-only, no search lock)
104            if !batch.deleted_provider_ids.is_empty() {
105                self.store
106                    .delete_messages_by_provider_ids(account_id, &batch.deleted_provider_ids)
107                    .await
108                    .map_err(|e| MxrError::Store(e.to_string()))?;
109            }
110
111            // Apply label changes from delta sync (previously dead code)
112            for change in &batch.label_changes {
113                if let Ok(Some(message_id)) = self
114                    .store
115                    .get_message_id_by_provider_id(account_id, &change.provider_message_id)
116                    .await
117                {
118                    if !change.added_labels.is_empty() {
119                        if let Ok(add_ids) = self
120                            .store
121                            .find_labels_by_provider_ids(account_id, &change.added_labels)
122                            .await
123                        {
124                            for lid in &add_ids {
125                                let _ = self.store.add_message_label(&message_id, lid).await;
126                            }
127                        }
128                    }
129                    if !change.removed_labels.is_empty() {
130                        if let Ok(rm_ids) = self
131                            .store
132                            .find_labels_by_provider_ids(account_id, &change.removed_labels)
133                            .await
134                        {
135                            for lid in &rm_ids {
136                                let _ = self.store.remove_message_label(&message_id, lid).await;
137                            }
138                        }
139                    }
140
141                    if let (Ok(Some(envelope)), Ok(Some(body))) = (
142                        self.store.get_envelope(&message_id).await,
143                        self.store.get_body(&message_id).await,
144                    ) {
145                        let mut search = self.search.lock().await;
146                        search.index_body(&envelope, &body)?;
147                    }
148                }
149            }
150
151            // Commit search index
152            {
153                let mut search = self.search.lock().await;
154                search.commit()?;
155            }
156
157            // Recalculate label counts every batch (including during backfill)
158            self.store
159                .recalculate_label_counts(account_id)
160                .await
161                .map_err(|e| MxrError::Store(e.to_string()))?;
162
163            if !provider.capabilities().native_thread_ids {
164                self.rethread_account(account_id).await?;
165            }
166
167            // Update cursor
168            tracing::info!(next_cursor = ?batch.next_cursor, "sync_account: saving cursor");
169            self.store
170                .set_sync_cursor(account_id, &batch.next_cursor)
171                .await
172                .map_err(|e| MxrError::Store(e.to_string()))?;
173
174            // Backfill: if junction table is empty but messages exist, reset cursor
175            // and re-sync to rebuild label associations (handles DBs corrupted by
176            // the old INSERT OR REPLACE cascade bug).
177            let junction_count = self
178                .store
179                .count_message_labels()
180                .await
181                .map_err(|e| MxrError::Store(e.to_string()))?;
182            let message_count = self
183                .store
184                .count_messages_by_account(account_id)
185                .await
186                .map_err(|e| MxrError::Store(e.to_string()))?;
187            if provider.capabilities().labels && junction_count == 0 && message_count > 0 {
188                tracing::warn!(
189                    message_count,
190                    "Junction table empty — resetting sync cursor for full re-sync"
191                );
192                self.store
193                    .set_sync_cursor(account_id, &SyncCursor::Initial)
194                    .await
195                    .map_err(|e| MxrError::Store(e.to_string()))?;
196                continue;
197            }
198
199            return Ok(SyncOutcome {
200                synced_count,
201                upserted_message_ids,
202            });
203        }
204    }
205
206    async fn rethread_account(&self, account_id: &AccountId) -> Result<(), MxrError> {
207        let envelopes = self
208            .store
209            .list_envelopes_by_account(account_id, 10_000, 0)
210            .await
211            .map_err(|e| MxrError::Store(e.to_string()))?;
212
213        let by_header: HashMap<String, Envelope> = envelopes
214            .iter()
215            .filter_map(|envelope| {
216                envelope
217                    .message_id_header
218                    .clone()
219                    .map(|header| (header, envelope.clone()))
220            })
221            .collect();
222
223        let threading_input: Vec<MessageForThreading> = envelopes
224            .iter()
225            .filter_map(|envelope| {
226                envelope
227                    .message_id_header
228                    .clone()
229                    .map(|message_id| MessageForThreading {
230                        message_id,
231                        in_reply_to: envelope.in_reply_to.clone(),
232                        references: envelope.references.clone(),
233                        date: envelope.date,
234                        subject: envelope.subject.clone(),
235                    })
236            })
237            .collect();
238
239        for thread in thread_messages(&threading_input) {
240            let thread_members: Vec<Envelope> = thread
241                .messages
242                .iter()
243                .filter_map(|message_id| by_header.get(message_id).cloned())
244                .collect();
245
246            if thread_members.is_empty() {
247                continue;
248            }
249
250            let canonical_thread_id = by_header
251                .get(&thread.root_message_id)
252                .map(|root| root.thread_id.clone())
253                .or_else(|| thread_members.first().map(|member| member.thread_id.clone()))
254                .unwrap_or_default();
255
256            for member in thread_members {
257                if member.thread_id != canonical_thread_id {
258                    self.store
259                        .update_message_thread_id(&member.id, &canonical_thread_id)
260                        .await
261                        .map_err(|e| MxrError::Store(e.to_string()))?;
262                }
263            }
264        }
265
266        Ok(())
267    }
268
269    /// Read body from store. Bodies are always available after sync.
270    pub async fn get_body(&self, message_id: &MessageId) -> Result<MessageBody, MxrError> {
271        self.store
272            .get_body(message_id)
273            .await
274            .map_err(|e| MxrError::Store(e.to_string()))?
275            .ok_or_else(|| MxrError::NotFound(format!("Body for message {}", message_id)))
276    }
277
278    pub async fn check_snoozes(&self) -> Result<Vec<MessageId>, MxrError> {
279        let now = chrono::Utc::now();
280        let due = self
281            .store
282            .get_due_snoozes(now)
283            .await
284            .map_err(|e| MxrError::Store(e.to_string()))?;
285
286        let mut woken = Vec::new();
287        for snoozed in &due {
288            self.store
289                .remove_snooze(&snoozed.message_id)
290                .await
291                .map_err(|e| MxrError::Store(e.to_string()))?;
292            woken.push(snoozed.message_id.clone());
293        }
294
295        Ok(woken)
296    }
297}