1use mxr_core::id::*;
2use mxr_core::types::*;
3use mxr_core::{MailSyncProvider, MxrError};
4use mxr_search::SearchIndex;
5use mxr_store::Store;
6use std::collections::HashMap;
7use std::sync::Arc;
8use tokio::sync::Mutex;
9
10use crate::threading::{thread_messages, MessageForThreading};
11
/// Summary of one completed call to `SyncEngine::sync_account_with_outcome`.
pub struct SyncOutcome {
    /// Number of entries in the provider batch's `upserted` list for the
    /// pass that produced this outcome.
    pub synced_count: u32,
    /// IDs of the envelopes in that `upserted` list, in batch order.
    pub upserted_message_ids: Vec<MessageId>,
}
16
/// Pulls mail from a `MailSyncProvider` into the local store and keeps the
/// search index updated with the synced messages.
pub struct SyncEngine {
    /// Shared handle to the store that holds envelopes, bodies, labels,
    /// sync cursors and snoozes.
    store: Arc<Store>,
    /// Shared search index; the async mutex is locked for each indexing
    /// call and for the commit.
    search: Arc<Mutex<SearchIndex>>,
}
21
22impl SyncEngine {
23 pub fn new(store: Arc<Store>, search: Arc<Mutex<SearchIndex>>) -> Self {
24 Self { store, search }
25 }
26
27 pub async fn sync_account(&self, provider: &dyn MailSyncProvider) -> Result<u32, MxrError> {
28 Ok(self.sync_account_with_outcome(provider).await?.synced_count)
29 }
30
31 pub async fn sync_account_with_outcome(
32 &self,
33 provider: &dyn MailSyncProvider,
34 ) -> Result<SyncOutcome, MxrError> {
35 let account_id = provider.account_id();
36
37 loop {
38 let cursor = self
39 .store
40 .get_sync_cursor(account_id)
41 .await
42 .map_err(|e| MxrError::Store(e.to_string()))?
43 .unwrap_or(SyncCursor::Initial);
44
45 if !matches!(cursor, SyncCursor::GmailBackfill { .. }) {
47 let labels = provider.sync_labels().await?;
48 tracing::debug!(count = labels.len(), "synced labels from provider");
49 for label in &labels {
50 self.store
51 .upsert_label(label)
52 .await
53 .map_err(|e| MxrError::Store(e.to_string()))?;
54 }
55 }
56
57 tracing::info!(cursor = ?cursor, "sync_account: dispatching with cursor");
59 let batch = provider.sync_messages(&cursor).await?;
60 let synced_count = batch.upserted.len() as u32;
61 let upserted_message_ids = batch
62 .upserted
63 .iter()
64 .map(|synced| synced.envelope.id.clone())
65 .collect::<Vec<_>>();
66
67 for synced in &batch.upserted {
69 self.store
71 .upsert_envelope(&synced.envelope)
72 .await
73 .map_err(|e| MxrError::Store(e.to_string()))?;
74
75 self.store
77 .insert_body(&synced.body)
78 .await
79 .map_err(|e| MxrError::Store(e.to_string()))?;
80
81 if !synced.envelope.label_provider_ids.is_empty() {
83 let label_ids = self
84 .store
85 .find_labels_by_provider_ids(
86 account_id,
87 &synced.envelope.label_provider_ids,
88 )
89 .await
90 .map_err(|e| MxrError::Store(e.to_string()))?;
91 if !label_ids.is_empty() {
92 self.store
93 .set_message_labels(&synced.envelope.id, &label_ids)
94 .await
95 .map_err(|e| MxrError::Store(e.to_string()))?;
96 }
97 }
98
99 {
101 let mut search = self.search.lock().await;
102 search.index_body(&synced.envelope, &synced.body)?;
103 }
104 }
105
106 if !batch.deleted_provider_ids.is_empty() {
108 self.store
109 .delete_messages_by_provider_ids(account_id, &batch.deleted_provider_ids)
110 .await
111 .map_err(|e| MxrError::Store(e.to_string()))?;
112 }
113
114 for change in &batch.label_changes {
116 if let Ok(Some(message_id)) = self
117 .store
118 .get_message_id_by_provider_id(account_id, &change.provider_message_id)
119 .await
120 {
121 if !change.added_labels.is_empty() {
122 if let Ok(add_ids) = self
123 .store
124 .find_labels_by_provider_ids(account_id, &change.added_labels)
125 .await
126 {
127 for lid in &add_ids {
128 let _ = self.store.add_message_label(&message_id, lid).await;
129 }
130 }
131 }
132 if !change.removed_labels.is_empty() {
133 if let Ok(rm_ids) = self
134 .store
135 .find_labels_by_provider_ids(account_id, &change.removed_labels)
136 .await
137 {
138 for lid in &rm_ids {
139 let _ = self.store.remove_message_label(&message_id, lid).await;
140 }
141 }
142 }
143
144 if let (Ok(Some(envelope)), Ok(Some(body))) = (
145 self.store.get_envelope(&message_id).await,
146 self.store.get_body(&message_id).await,
147 ) {
148 let mut search = self.search.lock().await;
149 search.index_body(&envelope, &body)?;
150 }
151 }
152 }
153
154 {
156 let mut search = self.search.lock().await;
157 search.commit()?;
158 }
159
160 self.store
162 .recalculate_label_counts(account_id)
163 .await
164 .map_err(|e| MxrError::Store(e.to_string()))?;
165
166 if !provider.capabilities().native_thread_ids {
167 self.rethread_account(account_id).await?;
168 }
169
170 tracing::info!(next_cursor = ?batch.next_cursor, "sync_account: saving cursor");
172 self.store
173 .set_sync_cursor(account_id, &batch.next_cursor)
174 .await
175 .map_err(|e| MxrError::Store(e.to_string()))?;
176
177 let junction_count = self
181 .store
182 .count_message_labels()
183 .await
184 .map_err(|e| MxrError::Store(e.to_string()))?;
185 let message_count = self
186 .store
187 .count_messages_by_account(account_id)
188 .await
189 .map_err(|e| MxrError::Store(e.to_string()))?;
190 if provider.capabilities().labels && junction_count == 0 && message_count > 0 {
191 tracing::warn!(
192 message_count,
193 "Junction table empty — resetting sync cursor for full re-sync"
194 );
195 self.store
196 .set_sync_cursor(account_id, &SyncCursor::Initial)
197 .await
198 .map_err(|e| MxrError::Store(e.to_string()))?;
199 continue;
200 }
201
202 return Ok(SyncOutcome {
203 synced_count,
204 upserted_message_ids,
205 });
206 }
207 }
208
209 async fn rethread_account(&self, account_id: &AccountId) -> Result<(), MxrError> {
210 let envelopes = self
211 .store
212 .list_envelopes_by_account(account_id, 10_000, 0)
213 .await
214 .map_err(|e| MxrError::Store(e.to_string()))?;
215
216 let by_header: HashMap<String, Envelope> = envelopes
217 .iter()
218 .filter_map(|envelope| {
219 envelope
220 .message_id_header
221 .clone()
222 .map(|header| (header, envelope.clone()))
223 })
224 .collect();
225
226 let threading_input: Vec<MessageForThreading> = envelopes
227 .iter()
228 .filter_map(|envelope| {
229 envelope
230 .message_id_header
231 .clone()
232 .map(|message_id| MessageForThreading {
233 message_id,
234 in_reply_to: envelope.in_reply_to.clone(),
235 references: envelope.references.clone(),
236 date: envelope.date,
237 subject: envelope.subject.clone(),
238 })
239 })
240 .collect();
241
242 for thread in thread_messages(&threading_input) {
243 let thread_members: Vec<Envelope> = thread
244 .messages
245 .iter()
246 .filter_map(|message_id| by_header.get(message_id).cloned())
247 .collect();
248
249 if thread_members.is_empty() {
250 continue;
251 }
252
253 let canonical_thread_id = by_header
254 .get(&thread.root_message_id)
255 .map(|root| root.thread_id.clone())
256 .or_else(|| {
257 thread_members
258 .first()
259 .map(|member| member.thread_id.clone())
260 })
261 .unwrap_or_default();
262
263 for member in thread_members {
264 if member.thread_id != canonical_thread_id {
265 self.store
266 .update_message_thread_id(&member.id, &canonical_thread_id)
267 .await
268 .map_err(|e| MxrError::Store(e.to_string()))?;
269 }
270 }
271 }
272
273 Ok(())
274 }
275
276 pub async fn get_body(&self, message_id: &MessageId) -> Result<MessageBody, MxrError> {
278 self.store
279 .get_body(message_id)
280 .await
281 .map_err(|e| MxrError::Store(e.to_string()))?
282 .ok_or_else(|| MxrError::NotFound(format!("Body for message {}", message_id)))
283 }
284
285 pub async fn check_snoozes(&self) -> Result<Vec<MessageId>, MxrError> {
286 let now = chrono::Utc::now();
287 let due = self
288 .store
289 .get_due_snoozes(now)
290 .await
291 .map_err(|e| MxrError::Store(e.to_string()))?;
292
293 let mut woken = Vec::new();
294 for snoozed in &due {
295 self.store
296 .remove_snooze(&snoozed.message_id)
297 .await
298 .map_err(|e| MxrError::Store(e.to_string()))?;
299 woken.push(snoozed.message_id.clone());
300 }
301
302 Ok(woken)
303 }
304}