1use async_trait::async_trait;
2use mxr_core::{
3 AccountId, Address, Draft, Label, LabelChange, LabelId, LabelKind, MailSendProvider,
4 MailSyncProvider, MxrError, SendReceipt, SyncBatch, SyncCapabilities, SyncCursor,
5};
6use tracing::{debug, warn};
7
8use crate::client::{GmailApi, GmailClient, MessageFormat};
9use crate::parse::{extract_message_body, gmail_message_to_envelope};
10use crate::send;
11use mxr_core::types::SyncedMessage;
12
13pub struct GmailProvider {
14 account_id: AccountId,
15 client: Box<dyn GmailApi>,
16}
17
18impl GmailProvider {
19 pub fn new(account_id: AccountId, client: GmailClient) -> Self {
20 Self {
21 account_id,
22 client: Box::new(client),
23 }
24 }
25
26 #[cfg(test)]
27 fn with_api(account_id: AccountId, client: Box<dyn GmailApi>) -> Self {
28 Self { account_id, client }
29 }
30
31 fn map_label(&self, gl: crate::types::GmailLabel) -> Label {
32 let kind = match gl.label_type.as_deref() {
33 Some("system") => LabelKind::System,
34 _ => LabelKind::User,
35 };
36
37 let color = gl.color.as_ref().and_then(|c| c.background_color.clone());
38
39 Label {
40 id: LabelId::from_provider_id("gmail", &gl.id),
41 account_id: self.account_id.clone(),
42 name: gl.name,
43 kind,
44 color,
45 provider_id: gl.id,
46 unread_count: gl.messages_unread.unwrap_or(0),
47 total_count: gl.messages_total.unwrap_or(0),
48 }
49 }
50
51 async fn initial_sync(&self) -> Result<SyncBatch, MxrError> {
52 debug!("Starting initial sync for account {}", self.account_id);
53
54 let mut all_messages = Vec::new();
55 let mut page_token: Option<String> = None;
56 let mut latest_history_id: Option<u64> = None;
57 const MAX_INITIAL_MESSAGES: usize = 200;
62
63 loop {
64 let batch_size = (MAX_INITIAL_MESSAGES - all_messages.len()).min(100) as u32;
65 if batch_size == 0 {
66 tracing::info!(
67 "Initial sync: fetched {MAX_INITIAL_MESSAGES} messages, \
68 remaining pages will be backfilled in background"
69 );
70 break;
71 }
72
73 let resp = self
74 .client
75 .list_messages(None, page_token.as_deref(), batch_size)
76 .await
77 .map_err(MxrError::from)?;
78
79 let refs = resp.messages.unwrap_or_default();
80 if refs.is_empty() {
81 break;
82 }
83
84 let ids: Vec<String> = refs.iter().map(|r| r.id.clone()).collect();
85 let messages = self
86 .client
87 .batch_get_messages(&ids, MessageFormat::Full)
88 .await
89 .map_err(MxrError::from)?;
90
91 for msg in &messages {
92 if let Some(ref hid) = msg.history_id {
93 if let Ok(h) = hid.parse::<u64>() {
94 latest_history_id =
95 Some(latest_history_id.map_or(h, |cur: u64| cur.max(h)));
96 }
97 }
98 match gmail_message_to_envelope(msg, &self.account_id) {
99 Ok(env) => {
100 let body = extract_message_body(msg);
101 all_messages.push(SyncedMessage { envelope: env, body });
102 }
103 Err(e) => warn!(msg_id = %msg.id, error = %e, "Failed to parse message"),
104 }
105 }
106
107 match resp.next_page_token {
108 Some(token) => page_token = Some(token),
109 None => break,
110 }
111 }
112
113 let next_cursor = match (latest_history_id, &page_token) {
114 (Some(hid), Some(token)) => {
115 tracing::info!(
116 history_id = hid,
117 "Initial sync producing GmailBackfill cursor for background sync"
118 );
119 SyncCursor::GmailBackfill {
120 history_id: hid,
121 page_token: token.clone(),
122 }
123 }
124 (Some(hid), None) => {
125 tracing::info!(
126 history_id = hid,
127 total = all_messages.len(),
128 "Initial sync complete — all messages fetched, delta-ready"
129 );
130 SyncCursor::Gmail { history_id: hid }
131 }
132 _ => SyncCursor::Initial,
133 };
134
135 Ok(SyncBatch {
136 upserted: all_messages,
137 deleted_provider_ids: vec![],
138 label_changes: vec![],
139 next_cursor,
140 })
141 }
142
143 async fn backfill_sync(
144 &self,
145 history_id: u64,
146 page_token: &str,
147 ) -> Result<SyncBatch, MxrError> {
148 tracing::info!(
149 "Backfill sync: fetching next page for account {}",
150 self.account_id,
151 );
152
153 const BACKFILL_BATCH: u32 = 100;
154 let resp = self
155 .client
156 .list_messages(None, Some(page_token), BACKFILL_BATCH)
157 .await
158 .map_err(MxrError::from)?;
159
160 let refs = resp.messages.unwrap_or_default();
161 if refs.is_empty() {
162 return Ok(SyncBatch {
163 upserted: vec![],
164 deleted_provider_ids: vec![],
165 label_changes: vec![],
166 next_cursor: SyncCursor::Gmail { history_id },
167 });
168 }
169
170 let ids: Vec<String> = refs.iter().map(|r| r.id.clone()).collect();
171 debug!("Backfill: fetching {} messages (full)", ids.len());
172 let messages = self
173 .client
174 .batch_get_messages(&ids, MessageFormat::Full)
175 .await
176 .map_err(MxrError::from)?;
177
178 let mut synced = Vec::new();
179 for msg in &messages {
180 match gmail_message_to_envelope(msg, &self.account_id) {
181 Ok(env) => {
182 let body = extract_message_body(msg);
183 synced.push(SyncedMessage { envelope: env, body });
184 }
185 Err(e) => {
186 warn!(msg_id = %msg.id, error = %e, "Failed to parse message in backfill")
187 }
188 }
189 }
190
191 let has_more = resp.next_page_token.is_some();
192 let next_cursor = match resp.next_page_token {
193 Some(token) => SyncCursor::GmailBackfill {
194 history_id,
195 page_token: token,
196 },
197 None => SyncCursor::Gmail { history_id },
198 };
199
200 tracing::info!(
201 fetched = synced.len(),
202 has_more,
203 "Backfill batch complete"
204 );
205
206 Ok(SyncBatch {
207 upserted: synced,
208 deleted_provider_ids: vec![],
209 label_changes: vec![],
210 next_cursor,
211 })
212 }
213
214 async fn delta_sync(&self, history_id: u64) -> Result<SyncBatch, MxrError> {
215 debug!(
216 history_id,
217 "Starting delta sync for account {}", self.account_id
218 );
219
220 let mut upserted_ids = std::collections::HashSet::new();
221 let mut deleted_ids = Vec::new();
222 let mut label_changes = Vec::new();
223 let mut latest_history_id = history_id;
224 let mut page_token: Option<String> = None;
225
226 loop {
227 let resp = self
228 .client
229 .list_history(history_id, page_token.as_deref())
230 .await
231 .map_err(MxrError::from)?;
232
233 if let Some(ref hid) = resp.history_id {
234 if let Ok(h) = hid.parse::<u64>() {
235 latest_history_id = latest_history_id.max(h);
236 }
237 }
238
239 let records = resp.history.unwrap_or_default();
240 for record in records {
241 if let Some(added) = record.messages_added {
243 for a in added {
244 upserted_ids.insert(a.message.id);
245 }
246 }
247
248 if let Some(deleted) = record.messages_deleted {
250 for d in deleted {
251 deleted_ids.push(d.message.id);
252 }
253 }
254
255 if let Some(label_added) = record.labels_added {
257 for la in label_added {
258 label_changes.push(LabelChange {
259 provider_message_id: la.message.id,
260 added_labels: la.label_ids.unwrap_or_default(),
261 removed_labels: vec![],
262 });
263 }
264 }
265
266 if let Some(label_removed) = record.labels_removed {
268 for lr in label_removed {
269 label_changes.push(LabelChange {
270 provider_message_id: lr.message.id,
271 added_labels: vec![],
272 removed_labels: lr.label_ids.unwrap_or_default(),
273 });
274 }
275 }
276 }
277
278 match resp.next_page_token {
279 Some(token) => page_token = Some(token),
280 None => break,
281 }
282 }
283
284 let ids_to_fetch: Vec<String> = upserted_ids.into_iter().collect();
286 let mut synced = Vec::new();
287
288 if !ids_to_fetch.is_empty() {
289 let messages = self
290 .client
291 .batch_get_messages(&ids_to_fetch, MessageFormat::Full)
292 .await
293 .map_err(MxrError::from)?;
294
295 for msg in &messages {
296 match gmail_message_to_envelope(msg, &self.account_id) {
297 Ok(env) => {
298 let body = extract_message_body(msg);
299 synced.push(SyncedMessage { envelope: env, body });
300 }
301 Err(e) => warn!(msg_id = %msg.id, error = %e, "Failed to parse message"),
302 }
303 }
304 }
305
306 Ok(SyncBatch {
307 upserted: synced,
308 deleted_provider_ids: deleted_ids,
309 label_changes,
310 next_cursor: SyncCursor::Gmail {
311 history_id: latest_history_id,
312 },
313 })
314 }
315}
316
317#[async_trait]
318impl MailSyncProvider for GmailProvider {
319 fn name(&self) -> &str {
320 "gmail"
321 }
322
323 fn account_id(&self) -> &AccountId {
324 &self.account_id
325 }
326
327 fn capabilities(&self) -> SyncCapabilities {
328 SyncCapabilities {
329 labels: true,
330 server_search: true,
331 delta_sync: true,
332 push: false, batch_operations: true,
334 native_thread_ids: true,
335 }
336 }
337
338 async fn authenticate(&mut self) -> mxr_core::provider::Result<()> {
339 Ok(())
341 }
342
343 async fn refresh_auth(&mut self) -> mxr_core::provider::Result<()> {
344 Ok(())
346 }
347
348 async fn sync_labels(&self) -> mxr_core::provider::Result<Vec<Label>> {
349 let resp = self.client.list_labels().await.map_err(MxrError::from)?;
350
351 let gmail_labels = resp.labels.unwrap_or_default();
352 let mut labels = Vec::with_capacity(gmail_labels.len());
353
354 for gl in gmail_labels {
355 labels.push(self.map_label(gl));
356 }
357
358 Ok(labels)
359 }
360
361 async fn sync_messages(&self, cursor: &SyncCursor) -> mxr_core::provider::Result<SyncBatch> {
362 match cursor {
363 SyncCursor::Initial => self.initial_sync().await,
364 SyncCursor::Gmail { history_id } => self.delta_sync(*history_id).await,
365 SyncCursor::GmailBackfill {
366 history_id,
367 page_token,
368 } => self.backfill_sync(*history_id, page_token).await,
369 other => Err(MxrError::Provider(format!(
370 "Gmail provider received incompatible cursor: {other:?}"
371 ))),
372 }
373 }
374
375 async fn fetch_attachment(
376 &self,
377 provider_message_id: &str,
378 provider_attachment_id: &str,
379 ) -> mxr_core::provider::Result<Vec<u8>> {
380 self.client
381 .get_attachment(provider_message_id, provider_attachment_id)
382 .await
383 .map_err(MxrError::from)
384 }
385
386 async fn modify_labels(
387 &self,
388 provider_message_id: &str,
389 add: &[String],
390 remove: &[String],
391 ) -> mxr_core::provider::Result<()> {
392 let add_refs: Vec<&str> = add.iter().map(|s| s.as_str()).collect();
393 let remove_refs: Vec<&str> = remove.iter().map(|s| s.as_str()).collect();
394 self.client
395 .modify_message(provider_message_id, &add_refs, &remove_refs)
396 .await
397 .map_err(MxrError::from)
398 }
399
400 async fn create_label(&self, name: &str, color: Option<&str>) -> mxr_core::provider::Result<Label> {
401 let label = self
402 .client
403 .create_label(name, color)
404 .await
405 .map_err(MxrError::from)?;
406 Ok(self.map_label(label))
407 }
408
409 async fn rename_label(
410 &self,
411 provider_label_id: &str,
412 new_name: &str,
413 ) -> mxr_core::provider::Result<Label> {
414 let label = self
415 .client
416 .rename_label(provider_label_id, new_name)
417 .await
418 .map_err(MxrError::from)?;
419 Ok(self.map_label(label))
420 }
421
422 async fn delete_label(&self, provider_label_id: &str) -> mxr_core::provider::Result<()> {
423 self.client
424 .delete_label(provider_label_id)
425 .await
426 .map_err(MxrError::from)
427 }
428
429 async fn trash(&self, provider_message_id: &str) -> mxr_core::provider::Result<()> {
430 self.client
431 .trash_message(provider_message_id)
432 .await
433 .map_err(MxrError::from)
434 }
435
436 async fn set_read(
437 &self,
438 provider_message_id: &str,
439 read: bool,
440 ) -> mxr_core::provider::Result<()> {
441 if read {
442 self.client
443 .modify_message(provider_message_id, &[], &["UNREAD"])
444 .await
445 .map_err(MxrError::from)
446 } else {
447 self.client
448 .modify_message(provider_message_id, &["UNREAD"], &[])
449 .await
450 .map_err(MxrError::from)
451 }
452 }
453
454 async fn set_starred(
455 &self,
456 provider_message_id: &str,
457 starred: bool,
458 ) -> mxr_core::provider::Result<()> {
459 if starred {
460 self.client
461 .modify_message(provider_message_id, &["STARRED"], &[])
462 .await
463 .map_err(MxrError::from)
464 } else {
465 self.client
466 .modify_message(provider_message_id, &[], &["STARRED"])
467 .await
468 .map_err(MxrError::from)
469 }
470 }
471
472 async fn search_remote(&self, query: &str) -> mxr_core::provider::Result<Vec<String>> {
473 let resp = self
474 .client
475 .list_messages(Some(query), None, 100)
476 .await
477 .map_err(MxrError::from)?;
478
479 let ids = resp
480 .messages
481 .unwrap_or_default()
482 .into_iter()
483 .map(|m| m.id)
484 .collect();
485
486 Ok(ids)
487 }
488}
489
490#[async_trait]
491impl MailSendProvider for GmailProvider {
492 fn name(&self) -> &str {
493 "gmail"
494 }
495
496 async fn send(&self, draft: &Draft, from: &Address) -> mxr_core::provider::Result<SendReceipt> {
497 let rfc2822 = send::build_rfc2822(draft, from)
498 .map_err(|e| MxrError::Provider(e.to_string()))?;
499 let encoded = send::encode_for_gmail(&rfc2822);
500
501 let result = self
502 .client
503 .send_message(&encoded)
504 .await
505 .map_err(MxrError::from)?;
506
507 let message_id = result["id"].as_str().map(|s| s.to_string());
508
509 Ok(SendReceipt {
510 provider_message_id: message_id,
511 sent_at: chrono::Utc::now(),
512 })
513 }
514
515 async fn save_draft(
516 &self,
517 draft: &Draft,
518 from: &Address,
519 ) -> mxr_core::provider::Result<Option<String>> {
520 let rfc2822 = send::build_rfc2822(draft, from)
521 .map_err(|e| MxrError::Provider(e.to_string()))?;
522 let encoded = send::encode_for_gmail(&rfc2822);
523
524 let draft_id = self
525 .client
526 .create_draft(&encoded)
527 .await
528 .map_err(MxrError::from)?;
529
530 Ok(Some(draft_id))
531 }
532}
533
534#[cfg(test)]
535mod tests {
536 use super::*;
537 use crate::error::GmailError;
538 use crate::types::*;
539 use std::collections::HashMap;
540 use std::sync::Mutex;
541 use serde_json::json;
542 struct MockGmailApi {
543 messages: HashMap<String, GmailMessage>,
544 labels: Vec<GmailLabel>,
545 modified: Mutex<Vec<String>>,
546 }
547
548 #[async_trait]
549 impl GmailApi for MockGmailApi {
550 async fn list_messages(
551 &self,
552 _query: Option<&str>,
553 page_token: Option<&str>,
554 _max_results: u32,
555 ) -> Result<GmailListResponse, GmailError> {
556 Ok(match page_token {
557 Some("page-2") => GmailListResponse {
558 messages: Some(vec![GmailMessageRef {
559 id: "msg-backfill".into(),
560 thread_id: "thread-backfill".into(),
561 }]),
562 next_page_token: None,
563 result_size_estimate: Some(3),
564 },
565 _ => GmailListResponse {
566 messages: Some(vec![
567 GmailMessageRef {
568 id: "msg-1".into(),
569 thread_id: "thread-1".into(),
570 },
571 GmailMessageRef {
572 id: "msg-attach".into(),
573 thread_id: "thread-attach".into(),
574 },
575 ]),
576 next_page_token: Some("page-2".into()),
577 result_size_estimate: Some(3),
578 },
579 })
580 }
581
582 async fn batch_get_messages(
583 &self,
584 message_ids: &[String],
585 _format: MessageFormat,
586 ) -> Result<Vec<GmailMessage>, GmailError> {
587 Ok(message_ids
588 .iter()
589 .filter_map(|id| self.messages.get(id).cloned())
590 .collect())
591 }
592
593 async fn list_history(
594 &self,
595 _start_history_id: u64,
596 _page_token: Option<&str>,
597 ) -> Result<GmailHistoryResponse, GmailError> {
598 Ok(GmailHistoryResponse {
599 history: Some(vec![GmailHistoryRecord {
600 id: "23".into(),
601 messages: None,
602 messages_added: Some(vec![GmailHistoryMessageAdded {
603 message: GmailMessageRef {
604 id: "msg-3".into(),
605 thread_id: "thread-3".into(),
606 },
607 }]),
608 messages_deleted: Some(vec![GmailHistoryMessageDeleted {
609 message: GmailMessageRef {
610 id: "msg-1".into(),
611 thread_id: "thread-1".into(),
612 },
613 }]),
614 labels_added: Some(vec![GmailHistoryLabelAdded {
615 message: GmailMessageRef {
616 id: "msg-attach".into(),
617 thread_id: "thread-attach".into(),
618 },
619 label_ids: Some(vec!["STARRED".into()]),
620 }]),
621 labels_removed: None,
622 }]),
623 next_page_token: None,
624 history_id: Some("23".into()),
625 })
626 }
627
628 async fn modify_message(
629 &self,
630 message_id: &str,
631 _add_labels: &[&str],
632 _remove_labels: &[&str],
633 ) -> Result<(), GmailError> {
634 self.modified.lock().unwrap().push(message_id.to_string());
635 Ok(())
636 }
637
638 async fn trash_message(&self, message_id: &str) -> Result<(), GmailError> {
639 self.modified
640 .lock()
641 .unwrap()
642 .push(format!("trash:{message_id}"));
643 Ok(())
644 }
645
646 async fn send_message(&self, _raw_base64url: &str) -> Result<serde_json::Value, GmailError> {
647 Ok(json!({"id": "sent-1"}))
648 }
649
650 async fn get_attachment(
651 &self,
652 _message_id: &str,
653 _attachment_id: &str,
654 ) -> Result<Vec<u8>, GmailError> {
655 Ok(b"Hello".to_vec())
656 }
657
658 async fn create_draft(&self, _raw_base64url: &str) -> Result<String, GmailError> {
659 Ok("draft-1".into())
660 }
661
662 async fn list_labels(&self) -> Result<GmailLabelsResponse, GmailError> {
663 Ok(GmailLabelsResponse {
664 labels: Some(self.labels.clone()),
665 })
666 }
667
668 async fn create_label(
669 &self,
670 name: &str,
671 color: Option<&str>,
672 ) -> Result<GmailLabel, GmailError> {
673 Ok(GmailLabel {
674 id: "Label_2".into(),
675 name: name.into(),
676 label_type: Some("user".into()),
677 messages_total: Some(0),
678 messages_unread: Some(0),
679 color: color.map(|color| GmailLabelColor {
680 text_color: Some("#000000".into()),
681 background_color: Some(color.into()),
682 }),
683 })
684 }
685
686 async fn rename_label(&self, label_id: &str, new_name: &str) -> Result<GmailLabel, GmailError> {
687 Ok(GmailLabel {
688 id: label_id.into(),
689 name: new_name.into(),
690 label_type: Some("user".into()),
691 messages_total: Some(0),
692 messages_unread: Some(0),
693 color: None,
694 })
695 }
696
697 async fn delete_label(&self, _label_id: &str) -> Result<(), GmailError> {
698 Ok(())
699 }
700 }
701
702 fn gmail_provider() -> GmailProvider {
703 let mut messages = HashMap::new();
704 for message in [
705 serde_json::from_value::<GmailMessage>(gmail_message("msg-1", "thread-1", "Welcome")).unwrap(),
706 serde_json::from_value::<GmailMessage>(gmail_attachment_message()).unwrap(),
707 serde_json::from_value::<GmailMessage>(gmail_message("msg-3", "thread-3", "Delta message")).unwrap(),
708 serde_json::from_value::<GmailMessage>(gmail_message("msg-backfill", "thread-backfill", "Backfill message")).unwrap(),
709 ] {
710 messages.insert(message.id.clone(), message);
711 }
712
713 GmailProvider::with_api(
714 AccountId::new(),
715 Box::new(MockGmailApi {
716 messages,
717 labels: vec![
718 GmailLabel {
719 id: "INBOX".into(),
720 name: "INBOX".into(),
721 label_type: Some("system".into()),
722 messages_total: Some(2),
723 messages_unread: Some(1),
724 color: None,
725 },
726 GmailLabel {
727 id: "Label_1".into(),
728 name: "Projects".into(),
729 label_type: Some("user".into()),
730 messages_total: Some(1),
731 messages_unread: Some(0),
732 color: None,
733 },
734 ],
735 modified: Mutex::new(Vec::new()),
736 }),
737 )
738 }
739
740 fn gmail_message(id: &str, thread_id: &str, subject: &str) -> serde_json::Value {
741 json!({
742 "id": id,
743 "threadId": thread_id,
744 "labelIds": ["INBOX"],
745 "snippet": format!("Snippet for {subject}"),
746 "historyId": "22",
747 "internalDate": "1710495000000",
748 "sizeEstimate": 1024,
749 "payload": {
750 "mimeType": "multipart/mixed",
751 "headers": [
752 {"name": "From", "value": "Alice Example <alice@example.com>"},
753 {"name": "To", "value": "Bob Example <bob@example.com>"},
754 {"name": "Subject", "value": subject},
755 {"name": "Date", "value": "Fri, 15 Mar 2024 09:30:00 +0000"},
756 {"name": "Message-ID", "value": format!("<{id}@example.com>")}
757 ],
758 "parts": [
759 {
760 "mimeType": "text/plain",
761 "body": {"size": 12, "data": "SGVsbG8gd29ybGQ"}
762 },
763 {
764 "mimeType": "text/html",
765 "body": {"size": 33, "data": "PHA-SGVsbG8gd29ybGQ8L3A-"}
766 }
767 ]
768 }
769 })
770 }
771
772 fn gmail_attachment_message() -> serde_json::Value {
773 json!({
774 "id": "msg-attach",
775 "threadId": "thread-attach",
776 "labelIds": ["INBOX", "UNREAD"],
777 "snippet": "Attachment snippet",
778 "historyId": "21",
779 "internalDate": "1710495000000",
780 "sizeEstimate": 2048,
781 "payload": {
782 "mimeType": "multipart/mixed",
783 "headers": [
784 {"name": "From", "value": "Calendar Bot <calendar@example.com>"},
785 {"name": "To", "value": "Bob Example <bob@example.com>"},
786 {"name": "Subject", "value": "Calendar invite"},
787 {"name": "Date", "value": "Fri, 15 Mar 2024 09:30:00 +0000"},
788 {"name": "Message-ID", "value": "<msg-attach@example.com>"},
789 {"name": "List-Unsubscribe", "value": "<https://example.com/unsubscribe>"},
790 {"name": "Authentication-Results", "value": "mx.example.net; dkim=pass"},
791 {"name": "Content-Language", "value": "en"}
792 ],
793 "parts": [
794 {
795 "mimeType": "text/plain",
796 "body": {"size": 16, "data": "QXR0YWNobWVudCBib2R5"}
797 },
798 {
799 "mimeType": "application/pdf",
800 "filename": "report.pdf",
801 "body": {"attachmentId": "att-1", "size": 5}
802 }
803 ]
804 }
805 })
806 }
807
808 #[tokio::test]
809 async fn gmail_provider_passes_sync_and_send_conformance() {
810 let provider = gmail_provider();
811 mxr_provider_fake::conformance::run_sync_conformance(&provider).await;
812 mxr_provider_fake::conformance::run_send_conformance(&provider).await;
813 }
814
815 #[tokio::test]
816 async fn gmail_delta_sync_tracks_history_changes() {
817 let provider = gmail_provider();
818 let batch = provider
819 .sync_messages(&SyncCursor::Gmail { history_id: 22 })
820 .await
821 .unwrap();
822
823 assert_eq!(batch.deleted_provider_ids, vec!["msg-1"]);
824 assert_eq!(batch.label_changes.len(), 1);
825 assert_eq!(batch.upserted.len(), 1);
826 assert_eq!(batch.upserted[0].envelope.provider_id, "msg-3");
827 assert!(matches!(batch.next_cursor, SyncCursor::Gmail { history_id: 23 }));
828 }
829}