Skip to main content

mxr_sync/
engine.rs

1use mxr_core::id::*;
2use mxr_core::types::*;
3use mxr_core::{MailSyncProvider, MxrError};
4use mxr_search::SearchIndex;
5use mxr_store::Store;
6use std::collections::HashMap;
7use std::sync::Arc;
8use tokio::sync::Mutex;
9
10use crate::threading::{thread_messages, MessageForThreading};
11
12pub struct SyncOutcome {
13    pub synced_count: u32,
14    pub upserted_message_ids: Vec<MessageId>,
15}
16
17pub struct SyncEngine {
18    store: Arc<Store>,
19    search: Arc<Mutex<SearchIndex>>,
20}
21
22impl SyncEngine {
23    pub fn new(store: Arc<Store>, search: Arc<Mutex<SearchIndex>>) -> Self {
24        Self { store, search }
25    }
26
27    pub async fn sync_account(&self, provider: &dyn MailSyncProvider) -> Result<u32, MxrError> {
28        Ok(self.sync_account_with_outcome(provider).await?.synced_count)
29    }
30
31    pub async fn sync_account_with_outcome(
32        &self,
33        provider: &dyn MailSyncProvider,
34    ) -> Result<SyncOutcome, MxrError> {
35        let account_id = provider.account_id();
36
37        loop {
38            let cursor = self
39                .store
40                .get_sync_cursor(account_id)
41                .await
42                .map_err(|e| MxrError::Store(e.to_string()))?
43                .unwrap_or(SyncCursor::Initial);
44
45            // Sync labels — skip during backfill to avoid slowing down pagination
46            if !matches!(cursor, SyncCursor::GmailBackfill { .. }) {
47                let labels = provider.sync_labels().await?;
48                tracing::debug!(count = labels.len(), "synced labels from provider");
49                for label in &labels {
50                    self.store
51                        .upsert_label(label)
52                        .await
53                        .map_err(|e| MxrError::Store(e.to_string()))?;
54                }
55            }
56
57            // Sync messages
58            tracing::info!(cursor = ?cursor, "sync_account: dispatching with cursor");
59            let batch = provider.sync_messages(&cursor).await?;
60            let synced_count = batch.upserted.len() as u32;
61            let upserted_message_ids = batch
62                .upserted
63                .iter()
64                .map(|synced| synced.envelope.id.clone())
65                .collect::<Vec<_>>();
66
67            // Apply upserts — store envelope + body, index with body text
68            for synced in &batch.upserted {
69                // Store envelope
70                self.store
71                    .upsert_envelope(&synced.envelope)
72                    .await
73                    .map_err(|e| MxrError::Store(e.to_string()))?;
74
75                // Store body (eagerly fetched during sync)
76                self.store
77                    .insert_body(&synced.body)
78                    .await
79                    .map_err(|e| MxrError::Store(e.to_string()))?;
80
81                // Populate message_labels junction table
82                if !synced.envelope.label_provider_ids.is_empty() {
83                    let label_ids = self
84                        .store
85                        .find_labels_by_provider_ids(
86                            account_id,
87                            &synced.envelope.label_provider_ids,
88                        )
89                        .await
90                        .map_err(|e| MxrError::Store(e.to_string()))?;
91                    if !label_ids.is_empty() {
92                        self.store
93                            .set_message_labels(&synced.envelope.id, &label_ids)
94                            .await
95                            .map_err(|e| MxrError::Store(e.to_string()))?;
96                    }
97                }
98
99                // Search index — index with body text for immediate full-text search
100                {
101                    let mut search = self.search.lock().await;
102                    search.index_body(&synced.envelope, &synced.body)?;
103                }
104            }
105
106            // Deletions (store-only, no search lock)
107            if !batch.deleted_provider_ids.is_empty() {
108                self.store
109                    .delete_messages_by_provider_ids(account_id, &batch.deleted_provider_ids)
110                    .await
111                    .map_err(|e| MxrError::Store(e.to_string()))?;
112            }
113
114            // Apply label changes from delta sync (previously dead code)
115            for change in &batch.label_changes {
116                if let Ok(Some(message_id)) = self
117                    .store
118                    .get_message_id_by_provider_id(account_id, &change.provider_message_id)
119                    .await
120                {
121                    if !change.added_labels.is_empty() {
122                        if let Ok(add_ids) = self
123                            .store
124                            .find_labels_by_provider_ids(account_id, &change.added_labels)
125                            .await
126                        {
127                            for lid in &add_ids {
128                                let _ = self.store.add_message_label(&message_id, lid).await;
129                            }
130                        }
131                    }
132                    if !change.removed_labels.is_empty() {
133                        if let Ok(rm_ids) = self
134                            .store
135                            .find_labels_by_provider_ids(account_id, &change.removed_labels)
136                            .await
137                        {
138                            for lid in &rm_ids {
139                                let _ = self.store.remove_message_label(&message_id, lid).await;
140                            }
141                        }
142                    }
143
144                    if let (Ok(Some(envelope)), Ok(Some(body))) = (
145                        self.store.get_envelope(&message_id).await,
146                        self.store.get_body(&message_id).await,
147                    ) {
148                        let mut search = self.search.lock().await;
149                        search.index_body(&envelope, &body)?;
150                    }
151                }
152            }
153
154            // Commit search index
155            {
156                let mut search = self.search.lock().await;
157                search.commit()?;
158            }
159
160            // Recalculate label counts every batch (including during backfill)
161            self.store
162                .recalculate_label_counts(account_id)
163                .await
164                .map_err(|e| MxrError::Store(e.to_string()))?;
165
166            if !provider.capabilities().native_thread_ids {
167                self.rethread_account(account_id).await?;
168            }
169
170            // Update cursor
171            tracing::info!(next_cursor = ?batch.next_cursor, "sync_account: saving cursor");
172            self.store
173                .set_sync_cursor(account_id, &batch.next_cursor)
174                .await
175                .map_err(|e| MxrError::Store(e.to_string()))?;
176
177            // Backfill: if junction table is empty but messages exist, reset cursor
178            // and re-sync to rebuild label associations (handles DBs corrupted by
179            // the old INSERT OR REPLACE cascade bug).
180            let junction_count = self
181                .store
182                .count_message_labels()
183                .await
184                .map_err(|e| MxrError::Store(e.to_string()))?;
185            let message_count = self
186                .store
187                .count_messages_by_account(account_id)
188                .await
189                .map_err(|e| MxrError::Store(e.to_string()))?;
190            if provider.capabilities().labels && junction_count == 0 && message_count > 0 {
191                tracing::warn!(
192                    message_count,
193                    "Junction table empty — resetting sync cursor for full re-sync"
194                );
195                self.store
196                    .set_sync_cursor(account_id, &SyncCursor::Initial)
197                    .await
198                    .map_err(|e| MxrError::Store(e.to_string()))?;
199                continue;
200            }
201
202            return Ok(SyncOutcome {
203                synced_count,
204                upserted_message_ids,
205            });
206        }
207    }
208
209    async fn rethread_account(&self, account_id: &AccountId) -> Result<(), MxrError> {
210        let envelopes = self
211            .store
212            .list_envelopes_by_account(account_id, 10_000, 0)
213            .await
214            .map_err(|e| MxrError::Store(e.to_string()))?;
215
216        let by_header: HashMap<String, Envelope> = envelopes
217            .iter()
218            .filter_map(|envelope| {
219                envelope
220                    .message_id_header
221                    .clone()
222                    .map(|header| (header, envelope.clone()))
223            })
224            .collect();
225
226        let threading_input: Vec<MessageForThreading> = envelopes
227            .iter()
228            .filter_map(|envelope| {
229                envelope
230                    .message_id_header
231                    .clone()
232                    .map(|message_id| MessageForThreading {
233                        message_id,
234                        in_reply_to: envelope.in_reply_to.clone(),
235                        references: envelope.references.clone(),
236                        date: envelope.date,
237                        subject: envelope.subject.clone(),
238                    })
239            })
240            .collect();
241
242        for thread in thread_messages(&threading_input) {
243            let thread_members: Vec<Envelope> = thread
244                .messages
245                .iter()
246                .filter_map(|message_id| by_header.get(message_id).cloned())
247                .collect();
248
249            if thread_members.is_empty() {
250                continue;
251            }
252
253            let canonical_thread_id = by_header
254                .get(&thread.root_message_id)
255                .map(|root| root.thread_id.clone())
256                .or_else(|| {
257                    thread_members
258                        .first()
259                        .map(|member| member.thread_id.clone())
260                })
261                .unwrap_or_default();
262
263            for member in thread_members {
264                if member.thread_id != canonical_thread_id {
265                    self.store
266                        .update_message_thread_id(&member.id, &canonical_thread_id)
267                        .await
268                        .map_err(|e| MxrError::Store(e.to_string()))?;
269                }
270            }
271        }
272
273        Ok(())
274    }
275
276    /// Read body from store. Bodies are always available after sync.
277    pub async fn get_body(&self, message_id: &MessageId) -> Result<MessageBody, MxrError> {
278        self.store
279            .get_body(message_id)
280            .await
281            .map_err(|e| MxrError::Store(e.to_string()))?
282            .ok_or_else(|| MxrError::NotFound(format!("Body for message {}", message_id)))
283    }
284
285    pub async fn check_snoozes(&self) -> Result<Vec<MessageId>, MxrError> {
286        let now = chrono::Utc::now();
287        let due = self
288            .store
289            .get_due_snoozes(now)
290            .await
291            .map_err(|e| MxrError::Store(e.to_string()))?;
292
293        let mut woken = Vec::new();
294        for snoozed in &due {
295            self.store
296                .remove_snooze(&snoozed.message_id)
297                .await
298                .map_err(|e| MxrError::Store(e.to_string()))?;
299            woken.push(snoozed.message_id.clone());
300        }
301
302        Ok(woken)
303    }
304}