1use mxr_core::id::*;
2use mxr_core::types::*;
3use mxr_core::{MailSyncProvider, MxrError};
4use mxr_search::SearchIndex;
5use mxr_store::Store;
6use std::collections::HashMap;
7use std::sync::Arc;
8use tokio::sync::Mutex;
9
10use crate::threading::{thread_messages, MessageForThreading};
11
12pub struct SyncOutcome {
13 pub synced_count: u32,
14 pub upserted_message_ids: Vec<MessageId>,
15}
16
17pub struct SyncEngine {
18 store: Arc<Store>,
19 search: Arc<Mutex<SearchIndex>>,
20}
21
22impl SyncEngine {
23 pub fn new(store: Arc<Store>, search: Arc<Mutex<SearchIndex>>) -> Self {
24 Self { store, search }
25 }
26
27 pub async fn sync_account(&self, provider: &dyn MailSyncProvider) -> Result<u32, MxrError> {
28 Ok(self.sync_account_with_outcome(provider).await?.synced_count)
29 }
30
31 pub async fn sync_account_with_outcome(
32 &self,
33 provider: &dyn MailSyncProvider,
34 ) -> Result<SyncOutcome, MxrError> {
35 let account_id = provider.account_id();
36 let mut recovered_gmail_cursor_not_found = false;
37
38 loop {
39 let cursor = self
40 .store
41 .get_sync_cursor(account_id)
42 .await
43 .map_err(|e| MxrError::Store(e.to_string()))?
44 .unwrap_or(SyncCursor::Initial);
45
46 if !matches!(cursor, SyncCursor::GmailBackfill { .. }) {
48 let labels = provider.sync_labels().await?;
49 tracing::debug!(count = labels.len(), "synced labels from provider");
50 for label in &labels {
51 self.store
52 .upsert_label(label)
53 .await
54 .map_err(|e| MxrError::Store(e.to_string()))?;
55 }
56 }
57
58 tracing::info!(cursor = ?cursor, "sync_account: dispatching with cursor");
60 let batch = match provider.sync_messages(&cursor).await {
61 Ok(batch) => batch,
62 Err(MxrError::NotFound(error))
63 if !recovered_gmail_cursor_not_found
64 && matches!(
65 cursor,
66 SyncCursor::Gmail { .. } | SyncCursor::GmailBackfill { .. }
67 ) =>
68 {
69 tracing::warn!(
70 account = %account_id,
71 cursor = ?cursor,
72 error = %error,
73 "provider sync hit not-found on gmail cursor; resetting to initial sync"
74 );
75 self.store
76 .set_sync_cursor(account_id, &SyncCursor::Initial)
77 .await
78 .map_err(|e| MxrError::Store(e.to_string()))?;
79 recovered_gmail_cursor_not_found = true;
80 continue;
81 }
82 Err(error) => return Err(error),
83 };
84 let synced_count = batch.upserted.len() as u32;
85 let upserted_message_ids = batch
86 .upserted
87 .iter()
88 .map(|synced| synced.envelope.id.clone())
89 .collect::<Vec<_>>();
90
91 for synced in &batch.upserted {
93 self.store
95 .upsert_envelope(&synced.envelope)
96 .await
97 .map_err(|e| MxrError::Store(e.to_string()))?;
98
99 self.store
101 .insert_body(&synced.body)
102 .await
103 .map_err(|e| MxrError::Store(e.to_string()))?;
104
105 if !synced.envelope.label_provider_ids.is_empty() {
107 let label_ids = self
108 .store
109 .find_labels_by_provider_ids(
110 account_id,
111 &synced.envelope.label_provider_ids,
112 )
113 .await
114 .map_err(|e| MxrError::Store(e.to_string()))?;
115 if !label_ids.is_empty() {
116 self.store
117 .set_message_labels(&synced.envelope.id, &label_ids)
118 .await
119 .map_err(|e| MxrError::Store(e.to_string()))?;
120 }
121 }
122
123 {
125 let mut search = self.search.lock().await;
126 search.index_body(&synced.envelope, &synced.body)?;
127 }
128 }
129
130 if !batch.deleted_provider_ids.is_empty() {
132 self.store
133 .delete_messages_by_provider_ids(account_id, &batch.deleted_provider_ids)
134 .await
135 .map_err(|e| MxrError::Store(e.to_string()))?;
136 }
137
138 for change in &batch.label_changes {
140 if let Ok(Some(message_id)) = self
141 .store
142 .get_message_id_by_provider_id(account_id, &change.provider_message_id)
143 .await
144 {
145 if !change.added_labels.is_empty() {
146 if let Ok(add_ids) = self
147 .store
148 .find_labels_by_provider_ids(account_id, &change.added_labels)
149 .await
150 {
151 for lid in &add_ids {
152 let _ = self.store.add_message_label(&message_id, lid).await;
153 }
154 }
155 }
156 if !change.removed_labels.is_empty() {
157 if let Ok(rm_ids) = self
158 .store
159 .find_labels_by_provider_ids(account_id, &change.removed_labels)
160 .await
161 {
162 for lid in &rm_ids {
163 let _ = self.store.remove_message_label(&message_id, lid).await;
164 }
165 }
166 }
167
168 if let (Ok(Some(envelope)), Ok(Some(body))) = (
169 self.store.get_envelope(&message_id).await,
170 self.store.get_body(&message_id).await,
171 ) {
172 let mut search = self.search.lock().await;
173 search.index_body(&envelope, &body)?;
174 }
175 }
176 }
177
178 {
180 let mut search = self.search.lock().await;
181 search.commit()?;
182 }
183
184 self.store
186 .recalculate_label_counts(account_id)
187 .await
188 .map_err(|e| MxrError::Store(e.to_string()))?;
189
190 if !provider.capabilities().native_thread_ids {
191 self.rethread_account(account_id).await?;
192 }
193
194 tracing::info!(next_cursor = ?batch.next_cursor, "sync_account: saving cursor");
196 self.store
197 .set_sync_cursor(account_id, &batch.next_cursor)
198 .await
199 .map_err(|e| MxrError::Store(e.to_string()))?;
200
201 let junction_count = self
205 .store
206 .count_message_labels()
207 .await
208 .map_err(|e| MxrError::Store(e.to_string()))?;
209 let message_count = self
210 .store
211 .count_messages_by_account(account_id)
212 .await
213 .map_err(|e| MxrError::Store(e.to_string()))?;
214 if provider.capabilities().labels && junction_count == 0 && message_count > 0 {
215 tracing::warn!(
216 message_count,
217 "Junction table empty — resetting sync cursor for full re-sync"
218 );
219 self.store
220 .set_sync_cursor(account_id, &SyncCursor::Initial)
221 .await
222 .map_err(|e| MxrError::Store(e.to_string()))?;
223 continue;
224 }
225
226 return Ok(SyncOutcome {
227 synced_count,
228 upserted_message_ids,
229 });
230 }
231 }
232
233 async fn rethread_account(&self, account_id: &AccountId) -> Result<(), MxrError> {
234 let envelopes = self
235 .store
236 .list_envelopes_by_account(account_id, 10_000, 0)
237 .await
238 .map_err(|e| MxrError::Store(e.to_string()))?;
239
240 let by_header: HashMap<String, Envelope> = envelopes
241 .iter()
242 .filter_map(|envelope| {
243 envelope
244 .message_id_header
245 .clone()
246 .map(|header| (header, envelope.clone()))
247 })
248 .collect();
249
250 let threading_input: Vec<MessageForThreading> = envelopes
251 .iter()
252 .filter_map(|envelope| {
253 envelope
254 .message_id_header
255 .clone()
256 .map(|message_id| MessageForThreading {
257 message_id,
258 in_reply_to: envelope.in_reply_to.clone(),
259 references: envelope.references.clone(),
260 date: envelope.date,
261 subject: envelope.subject.clone(),
262 })
263 })
264 .collect();
265
266 for thread in thread_messages(&threading_input) {
267 let thread_members: Vec<Envelope> = thread
268 .messages
269 .iter()
270 .filter_map(|message_id| by_header.get(message_id).cloned())
271 .collect();
272
273 if thread_members.is_empty() {
274 continue;
275 }
276
277 let canonical_thread_id = by_header
278 .get(&thread.root_message_id)
279 .map(|root| root.thread_id.clone())
280 .or_else(|| {
281 thread_members
282 .first()
283 .map(|member| member.thread_id.clone())
284 })
285 .unwrap_or_default();
286
287 for member in thread_members {
288 if member.thread_id != canonical_thread_id {
289 self.store
290 .update_message_thread_id(&member.id, &canonical_thread_id)
291 .await
292 .map_err(|e| MxrError::Store(e.to_string()))?;
293 }
294 }
295 }
296
297 Ok(())
298 }
299
300 pub async fn get_body(&self, message_id: &MessageId) -> Result<MessageBody, MxrError> {
302 self.store
303 .get_body(message_id)
304 .await
305 .map_err(|e| MxrError::Store(e.to_string()))?
306 .ok_or_else(|| MxrError::NotFound(format!("Body for message {}", message_id)))
307 }
308
309 pub async fn check_snoozes(&self) -> Result<Vec<MessageId>, MxrError> {
310 let now = chrono::Utc::now();
311 let due = self
312 .store
313 .get_due_snoozes(now)
314 .await
315 .map_err(|e| MxrError::Store(e.to_string()))?;
316
317 let mut woken = Vec::new();
318 for snoozed in &due {
319 self.store
320 .remove_snooze(&snoozed.message_id)
321 .await
322 .map_err(|e| MxrError::Store(e.to_string()))?;
323 woken.push(snoozed.message_id.clone());
324 }
325
326 Ok(woken)
327 }
328}