1use mxr_core::id::*;
2use mxr_core::types::*;
3use mxr_core::{MailSyncProvider, MxrError};
4use mxr_search::SearchIndex;
5use mxr_store::Store;
6use std::collections::HashMap;
7use std::sync::Arc;
8use tokio::sync::Mutex;
9
10use crate::threading::{thread_messages, MessageForThreading};
11
12pub struct SyncOutcome {
13 pub synced_count: u32,
14 pub upserted_message_ids: Vec<MessageId>,
15}
16
17pub struct SyncEngine {
18 store: Arc<Store>,
19 search: Arc<Mutex<SearchIndex>>,
20}
21
22impl SyncEngine {
23 pub fn new(store: Arc<Store>, search: Arc<Mutex<SearchIndex>>) -> Self {
24 Self { store, search }
25 }
26
27 pub async fn sync_account(&self, provider: &dyn MailSyncProvider) -> Result<u32, MxrError> {
28 Ok(self.sync_account_with_outcome(provider).await?.synced_count)
29 }
30
31 pub async fn sync_account_with_outcome(
32 &self,
33 provider: &dyn MailSyncProvider,
34 ) -> Result<SyncOutcome, MxrError> {
35 let account_id = provider.account_id();
36
37 loop {
38 let cursor = self
39 .store
40 .get_sync_cursor(account_id)
41 .await
42 .map_err(|e| MxrError::Store(e.to_string()))?
43 .unwrap_or(SyncCursor::Initial);
44
45 if !matches!(cursor, SyncCursor::GmailBackfill { .. }) {
47 let labels = provider.sync_labels().await?;
48 tracing::debug!(count = labels.len(), "synced labels from provider");
49 for label in &labels {
50 self.store
51 .upsert_label(label)
52 .await
53 .map_err(|e| MxrError::Store(e.to_string()))?;
54 }
55 }
56
57 tracing::info!(cursor = ?cursor, "sync_account: dispatching with cursor");
59 let batch = provider.sync_messages(&cursor).await?;
60 let synced_count = batch.upserted.len() as u32;
61 let upserted_message_ids = batch
62 .upserted
63 .iter()
64 .map(|synced| synced.envelope.id.clone())
65 .collect::<Vec<_>>();
66
67 for synced in &batch.upserted {
69 self.store
71 .upsert_envelope(&synced.envelope)
72 .await
73 .map_err(|e| MxrError::Store(e.to_string()))?;
74
75 self.store
77 .insert_body(&synced.body)
78 .await
79 .map_err(|e| MxrError::Store(e.to_string()))?;
80
81 if !synced.envelope.label_provider_ids.is_empty() {
83 let label_ids = self
84 .store
85 .find_labels_by_provider_ids(account_id, &synced.envelope.label_provider_ids)
86 .await
87 .map_err(|e| MxrError::Store(e.to_string()))?;
88 if !label_ids.is_empty() {
89 self.store
90 .set_message_labels(&synced.envelope.id, &label_ids)
91 .await
92 .map_err(|e| MxrError::Store(e.to_string()))?;
93 }
94 }
95
96 {
98 let mut search = self.search.lock().await;
99 search.index_body(&synced.envelope, &synced.body)?;
100 }
101 }
102
103 if !batch.deleted_provider_ids.is_empty() {
105 self.store
106 .delete_messages_by_provider_ids(account_id, &batch.deleted_provider_ids)
107 .await
108 .map_err(|e| MxrError::Store(e.to_string()))?;
109 }
110
111 for change in &batch.label_changes {
113 if let Ok(Some(message_id)) = self
114 .store
115 .get_message_id_by_provider_id(account_id, &change.provider_message_id)
116 .await
117 {
118 if !change.added_labels.is_empty() {
119 if let Ok(add_ids) = self
120 .store
121 .find_labels_by_provider_ids(account_id, &change.added_labels)
122 .await
123 {
124 for lid in &add_ids {
125 let _ = self.store.add_message_label(&message_id, lid).await;
126 }
127 }
128 }
129 if !change.removed_labels.is_empty() {
130 if let Ok(rm_ids) = self
131 .store
132 .find_labels_by_provider_ids(account_id, &change.removed_labels)
133 .await
134 {
135 for lid in &rm_ids {
136 let _ = self.store.remove_message_label(&message_id, lid).await;
137 }
138 }
139 }
140
141 if let (Ok(Some(envelope)), Ok(Some(body))) = (
142 self.store.get_envelope(&message_id).await,
143 self.store.get_body(&message_id).await,
144 ) {
145 let mut search = self.search.lock().await;
146 search.index_body(&envelope, &body)?;
147 }
148 }
149 }
150
151 {
153 let mut search = self.search.lock().await;
154 search.commit()?;
155 }
156
157 self.store
159 .recalculate_label_counts(account_id)
160 .await
161 .map_err(|e| MxrError::Store(e.to_string()))?;
162
163 if !provider.capabilities().native_thread_ids {
164 self.rethread_account(account_id).await?;
165 }
166
167 tracing::info!(next_cursor = ?batch.next_cursor, "sync_account: saving cursor");
169 self.store
170 .set_sync_cursor(account_id, &batch.next_cursor)
171 .await
172 .map_err(|e| MxrError::Store(e.to_string()))?;
173
174 let junction_count = self
178 .store
179 .count_message_labels()
180 .await
181 .map_err(|e| MxrError::Store(e.to_string()))?;
182 let message_count = self
183 .store
184 .count_messages_by_account(account_id)
185 .await
186 .map_err(|e| MxrError::Store(e.to_string()))?;
187 if provider.capabilities().labels && junction_count == 0 && message_count > 0 {
188 tracing::warn!(
189 message_count,
190 "Junction table empty — resetting sync cursor for full re-sync"
191 );
192 self.store
193 .set_sync_cursor(account_id, &SyncCursor::Initial)
194 .await
195 .map_err(|e| MxrError::Store(e.to_string()))?;
196 continue;
197 }
198
199 return Ok(SyncOutcome {
200 synced_count,
201 upserted_message_ids,
202 });
203 }
204 }
205
206 async fn rethread_account(&self, account_id: &AccountId) -> Result<(), MxrError> {
207 let envelopes = self
208 .store
209 .list_envelopes_by_account(account_id, 10_000, 0)
210 .await
211 .map_err(|e| MxrError::Store(e.to_string()))?;
212
213 let by_header: HashMap<String, Envelope> = envelopes
214 .iter()
215 .filter_map(|envelope| {
216 envelope
217 .message_id_header
218 .clone()
219 .map(|header| (header, envelope.clone()))
220 })
221 .collect();
222
223 let threading_input: Vec<MessageForThreading> = envelopes
224 .iter()
225 .filter_map(|envelope| {
226 envelope
227 .message_id_header
228 .clone()
229 .map(|message_id| MessageForThreading {
230 message_id,
231 in_reply_to: envelope.in_reply_to.clone(),
232 references: envelope.references.clone(),
233 date: envelope.date,
234 subject: envelope.subject.clone(),
235 })
236 })
237 .collect();
238
239 for thread in thread_messages(&threading_input) {
240 let thread_members: Vec<Envelope> = thread
241 .messages
242 .iter()
243 .filter_map(|message_id| by_header.get(message_id).cloned())
244 .collect();
245
246 if thread_members.is_empty() {
247 continue;
248 }
249
250 let canonical_thread_id = by_header
251 .get(&thread.root_message_id)
252 .map(|root| root.thread_id.clone())
253 .or_else(|| thread_members.first().map(|member| member.thread_id.clone()))
254 .unwrap_or_default();
255
256 for member in thread_members {
257 if member.thread_id != canonical_thread_id {
258 self.store
259 .update_message_thread_id(&member.id, &canonical_thread_id)
260 .await
261 .map_err(|e| MxrError::Store(e.to_string()))?;
262 }
263 }
264 }
265
266 Ok(())
267 }
268
269 pub async fn get_body(&self, message_id: &MessageId) -> Result<MessageBody, MxrError> {
271 self.store
272 .get_body(message_id)
273 .await
274 .map_err(|e| MxrError::Store(e.to_string()))?
275 .ok_or_else(|| MxrError::NotFound(format!("Body for message {}", message_id)))
276 }
277
278 pub async fn check_snoozes(&self) -> Result<Vec<MessageId>, MxrError> {
279 let now = chrono::Utc::now();
280 let due = self
281 .store
282 .get_due_snoozes(now)
283 .await
284 .map_err(|e| MxrError::Store(e.to_string()))?;
285
286 let mut woken = Vec::new();
287 for snoozed in &due {
288 self.store
289 .remove_snooze(&snoozed.message_id)
290 .await
291 .map_err(|e| MxrError::Store(e.to_string()))?;
292 woken.push(snoozed.message_id.clone());
293 }
294
295 Ok(woken)
296 }
297}