1use async_trait::async_trait;
3use sos_account::Account;
4use sos_core::{
5 commit::{CommitHash, CommitProof, CommitTree},
6 events::{
7 patch::{
8 AccountDiff, CheckedPatch, DeviceDiff, Diff, FolderDiff, Patch,
9 },
10 AccountEvent, DeviceEvent, EventLog, EventLogType, EventRecord,
11 WriteEvent,
12 },
13 VaultId,
14};
15use sos_protocol::{
16 AsConflict, ConflictError, DiffRequest, HardConflictResolver,
17 PatchRequest, ScanRequest, SyncClient, SyncOptions,
18};
19use sos_sync::{
20 ForceMerge, MaybeConflict, Merge, MergeOutcome, StorageEventLogs,
21 SyncDirection, SyncStatus,
22};
23use std::collections::HashSet;
24use tracing::instrument;
25
26const PROOF_SCAN_LIMIT: u16 = 32;
27
28#[cfg(feature = "files")]
29use sos_core::events::{patch::FileDiff, FileEvent};
30
31use super::RemoteSyncHandler;
32
33#[doc(hidden)]
35pub enum ScanState {
36 Result((CommitHash, CommitProof)),
37 Continue(ScanRequest),
38 Exhausted,
39}
40
41#[doc(hidden)]
43pub enum AutoMergeStatus {
44 RewindLocal(Vec<EventRecord>),
46 PushRemote(Vec<EventRecord>),
48}
49
50#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
52#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
53pub trait AutoMerge: RemoteSyncHandler {
54 #[doc(hidden)]
59 async fn execute_sync(
60 &self,
61 options: &SyncOptions,
62 ) -> Result<Option<MergeOutcome>, Self::Error> {
63 match self.direction() {
64 SyncDirection::Push => {
65 let exists = self.client().account_exists().await?;
66 if exists {
67 self.perform_sync(options).await
68 } else {
69 self.create_account().await?;
70 Ok(None)
71 }
72 }
73 SyncDirection::Pull => {
74 let exists = {
75 let account = self.account();
76 let account = account.lock().await;
77 account.paths().is_usable().await?
78 };
79 if exists {
80 self.perform_sync(options).await
81 } else {
82 self.create_account().await?;
83 Ok(None)
84 }
85 }
86 }
87 }
88
89 #[doc(hidden)]
90 async fn perform_sync(
91 &self,
92 options: &SyncOptions,
93 ) -> Result<Option<MergeOutcome>, Self::Error> {
94 let sync_status = self.client().sync_status().await?;
95 match self.sync_account(sync_status).await {
96 Ok(outcome) => Ok(Some(outcome)),
97 Err(e) => {
98 if e.is_conflict() {
99 let conflict = e.take_conflict().unwrap();
100 match conflict {
101 ConflictError::Soft {
102 conflict,
103 local,
104 remote,
105 } => {
106 let outcome = self
107 .auto_merge(options, conflict, local, remote)
108 .await?;
109 Ok(Some(outcome))
110 }
111 _ => Err(conflict.into()),
112 }
113 } else {
114 Err(e)
115 }
116 }
117 }
118 }
119
120 #[doc(hidden)]
121 async fn auto_merge_scan<T>(
122 &self,
123 log_id: &'static str,
124 log_type: EventLogType,
125 ) -> Result<bool, <Self as RemoteSyncHandler>::Error>
126 where
127 T: Default + Send + Sync,
128 {
129 tracing::debug!(log_id);
130
131 let req = ScanRequest {
132 log_type,
133 offset: 0,
134 limit: PROOF_SCAN_LIMIT,
135 };
136 match self.scan_proofs(req).await {
137 Ok(Some((ancestor_commit, proof))) => {
138 self.try_merge_from_ancestor::<T>(
139 EventLogType::Identity,
140 ancestor_commit,
141 proof,
142 )
143 .await?;
144 Ok(false)
145 }
146 Err(e) => {
147 if e.is_hard_conflict() {
148 Ok(true)
149 } else {
150 Err(e)
151 }
152 }
153 _ => Err(ConflictError::Hard.into()),
154 }
155 }
156
157 async fn auto_merge_identity(
159 &self,
160 options: &SyncOptions,
161 outcome: &mut MergeOutcome,
162 ) -> Result<bool, <Self as RemoteSyncHandler>::Error> {
163 let handle_conflict = self
164 .auto_merge_scan::<WriteEvent>(
165 "auto_merge::identity",
166 EventLogType::Identity,
167 )
168 .await?;
169 if handle_conflict {
170 self.identity_hard_conflict(options, outcome).await?;
171 }
172 Ok(handle_conflict)
173 }
174
175 async fn auto_merge_account(
177 &self,
178 options: &SyncOptions,
179 outcome: &mut MergeOutcome,
180 ) -> Result<bool, <Self as RemoteSyncHandler>::Error> {
181 let handle_conflict = self
182 .auto_merge_scan::<AccountEvent>(
183 "auto_merge::account",
184 EventLogType::Account,
185 )
186 .await?;
187 if handle_conflict {
188 self.account_hard_conflict(options, outcome).await?;
189 }
190 Ok(handle_conflict)
191 }
192
193 async fn auto_merge_device(
195 &self,
196 options: &SyncOptions,
197 outcome: &mut MergeOutcome,
198 ) -> Result<bool, <Self as RemoteSyncHandler>::Error> {
199 let handle_conflict = self
200 .auto_merge_scan::<DeviceEvent>(
201 "auto_merge::device",
202 EventLogType::Device,
203 )
204 .await?;
205 if handle_conflict {
206 self.device_hard_conflict(options, outcome).await?;
207 }
208 Ok(handle_conflict)
209 }
210
211 #[cfg(feature = "files")]
213 async fn auto_merge_files(
214 &self,
215 options: &SyncOptions,
216 outcome: &mut MergeOutcome,
217 ) -> Result<bool, <Self as RemoteSyncHandler>::Error> {
218 let handle_conflict = self
219 .auto_merge_scan::<FileEvent>(
220 "auto_merge::files",
221 EventLogType::Files,
222 )
223 .await?;
224 if handle_conflict {
225 self.files_hard_conflict(options, outcome).await?;
226 }
227 Ok(handle_conflict)
228 }
229
230 #[doc(hidden)]
231 async fn hard_conflict_diff<EventType>(
232 &self,
233 log_id: &'static str,
234 log_type: EventLogType,
235 options: &SyncOptions,
236 ) -> Result<Diff<EventType>, <Self as RemoteSyncHandler>::Error> {
237 match &options.hard_conflict_resolver {
238 HardConflictResolver::AutomaticFetch => {
239 tracing::debug!(log_id);
240
241 let request = DiffRequest {
242 log_type,
243 from_hash: None,
244 };
245 let response = self.client().diff(request).await?;
246 let patch = Patch::<EventType>::new(response.patch);
247 let diff =
248 Diff::<EventType>::new(patch, response.checkpoint, None);
249 Ok(diff)
250 }
251 }
252 }
253
254 #[doc(hidden)]
255 async fn identity_hard_conflict(
256 &self,
257 options: &SyncOptions,
258 outcome: &mut MergeOutcome,
259 ) -> Result<(), <Self as RemoteSyncHandler>::Error> {
260 let diff = self
261 .hard_conflict_diff::<WriteEvent>(
262 "hard_conflict::force_merge::identity",
263 EventLogType::Identity,
264 options,
265 )
266 .await?;
267
268 let account = self.account();
269 let mut account = account.lock().await;
270 Ok(account.force_merge_identity(diff, outcome).await?)
271 }
272
273 #[doc(hidden)]
274 async fn account_hard_conflict(
275 &self,
276 options: &SyncOptions,
277 outcome: &mut MergeOutcome,
278 ) -> Result<(), <Self as RemoteSyncHandler>::Error> {
279 let diff = self
280 .hard_conflict_diff::<AccountEvent>(
281 "hard_conflict::force_merge::account",
282 EventLogType::Account,
283 options,
284 )
285 .await?;
286
287 let account = self.account();
288 let mut account = account.lock().await;
289 Ok(account.force_merge_account(diff, outcome).await?)
290 }
291
292 #[doc(hidden)]
293 async fn device_hard_conflict(
294 &self,
295 options: &SyncOptions,
296 outcome: &mut MergeOutcome,
297 ) -> Result<(), <Self as RemoteSyncHandler>::Error> {
298 let diff = self
299 .hard_conflict_diff::<DeviceEvent>(
300 "hard_conflict::force_merge::device",
301 EventLogType::Device,
302 options,
303 )
304 .await?;
305
306 let account = self.account();
307 let mut account = account.lock().await;
308 Ok(account.force_merge_device(diff, outcome).await?)
309 }
310
311 #[doc(hidden)]
312 #[cfg(feature = "files")]
313 async fn files_hard_conflict(
314 &self,
315 options: &SyncOptions,
316 outcome: &mut MergeOutcome,
317 ) -> Result<(), <Self as RemoteSyncHandler>::Error> {
318 let diff = self
319 .hard_conflict_diff::<FileEvent>(
320 "hard_conflict::force_merge::files",
321 EventLogType::Files,
322 options,
323 )
324 .await?;
325
326 let account = self.account();
327 let mut account = account.lock().await;
328 Ok(account.force_merge_files(diff, outcome).await?)
329 }
330
331 #[instrument(skip_all)]
340 async fn auto_merge(
341 &self,
342 options: &SyncOptions,
343 conflict: MaybeConflict,
344 local: SyncStatus,
345 _remote: SyncStatus,
346 ) -> Result<MergeOutcome, <Self as RemoteSyncHandler>::Error> {
347 let mut force_merge_outcome = MergeOutcome::default();
348 let mut has_hard_conflict = false;
349
350 if conflict.identity {
351 let hard_conflict = self
352 .auto_merge_identity(options, &mut force_merge_outcome)
353 .await?;
354
355 if hard_conflict {
356 tracing::warn!("hard_conflict::identity");
357 }
358
359 has_hard_conflict = has_hard_conflict || hard_conflict;
360 }
361
362 if conflict.account {
363 let hard_conflict = self
364 .auto_merge_account(options, &mut force_merge_outcome)
365 .await?;
366
367 if hard_conflict {
368 tracing::warn!("hard_conflict::account");
369 }
370
371 has_hard_conflict = has_hard_conflict || hard_conflict;
372 }
373
374 if conflict.device {
375 let hard_conflict = self
376 .auto_merge_device(options, &mut force_merge_outcome)
377 .await?;
378
379 if hard_conflict {
380 tracing::warn!("hard_conflict::device");
381 }
382
383 has_hard_conflict = has_hard_conflict || hard_conflict;
384 }
385
386 #[cfg(feature = "files")]
387 if conflict.files {
388 let hard_conflict = self
389 .auto_merge_files(options, &mut force_merge_outcome)
390 .await?;
391
392 if hard_conflict {
393 tracing::warn!("hard_conflict::files");
394 }
395
396 has_hard_conflict = has_hard_conflict || hard_conflict;
397 }
398
399 for (folder_id, _) in &conflict.folders {
400 let hard_conflict = self
401 .auto_merge_folder(
402 options,
403 &local,
404 folder_id,
405 &mut force_merge_outcome,
406 )
407 .await?;
408
409 if hard_conflict {
410 tracing::warn!(
411 folder_id = %folder_id,
412 "hard_conflict::folder",
413 );
414 }
415
416 has_hard_conflict = has_hard_conflict || hard_conflict;
417 }
418
419 Ok(force_merge_outcome)
433 }
434
435 async fn auto_merge_folder(
437 &self,
438 options: &SyncOptions,
439 _local_status: &SyncStatus,
440 folder_id: &VaultId,
441 outcome: &mut MergeOutcome,
442 ) -> Result<bool, <Self as RemoteSyncHandler>::Error> {
443 tracing::debug!(folder_id = %folder_id, "auto_merge::folder");
444
445 let req = ScanRequest {
446 log_type: EventLogType::Folder(*folder_id),
447 offset: 0,
448 limit: PROOF_SCAN_LIMIT,
449 };
450 match self.scan_proofs(req).await {
451 Ok(Some((ancestor_commit, proof))) => {
452 self.try_merge_from_ancestor::<WriteEvent>(
453 EventLogType::Folder(*folder_id),
454 ancestor_commit,
455 proof,
456 )
457 .await?;
458 Ok(false)
459 }
460 Err(e) => {
461 if e.is_hard_conflict() {
462 self.folder_hard_conflict(folder_id, options, outcome)
463 .await?;
464 Ok(true)
465 } else {
466 Err(e)
467 }
468 }
469 _ => Err(ConflictError::Hard.into()),
470 }
471 }
472
473 #[doc(hidden)]
474 async fn folder_hard_conflict(
475 &self,
476 folder_id: &VaultId,
477 options: &SyncOptions,
478 outcome: &mut MergeOutcome,
479 ) -> Result<(), <Self as RemoteSyncHandler>::Error> {
480 match &options.hard_conflict_resolver {
481 HardConflictResolver::AutomaticFetch => {
482 let request = DiffRequest {
483 log_type: EventLogType::Folder(*folder_id),
484 from_hash: None,
485 };
486 let response = self.client().diff(request).await?;
487 let patch = Patch::<WriteEvent>::new(response.patch);
488 let diff = FolderDiff {
489 patch,
490 checkpoint: response.checkpoint,
491 last_commit: None,
492 };
493 let account = self.account();
494 let mut account = account.lock().await;
495 Ok(account
496 .force_merge_folder(folder_id, diff, outcome)
497 .await?)
498 }
499 }
500 }
501
502 #[doc(hidden)]
504 async fn try_merge_from_ancestor<T>(
505 &self,
506 log_type: EventLogType,
507 commit: CommitHash,
508 proof: CommitProof,
509 ) -> Result<(), <Self as RemoteSyncHandler>::Error>
510 where
511 T: Default + Send + Sync,
512 {
513 tracing::debug!(commit = %commit, "auto_merge::try_merge_from_ancestor");
514
515 let local_patch = {
517 let account = self.account();
518 let account = account.lock().await;
519 match &log_type {
520 EventLogType::Identity => {
521 let log = account.identity_log().await?;
522 let event_log = log.read().await;
523 event_log.diff_records(Some(&commit)).await?
524 }
525 EventLogType::Account => {
526 let log = account.account_log().await?;
527 let event_log = log.read().await;
528 event_log.diff_records(Some(&commit)).await?
529 }
530 EventLogType::Device => {
531 let log = account.device_log().await?;
532 let event_log = log.read().await;
533 event_log.diff_records(Some(&commit)).await?
534 }
535 #[cfg(feature = "files")]
536 EventLogType::Files => {
537 let log = account.file_log().await?;
538 let event_log = log.read().await;
539 event_log.diff_records(Some(&commit)).await?
540 }
541 EventLogType::Folder(id) => {
542 let log = account.folder_log(id).await?;
543 let event_log = log.read().await;
544 event_log.diff_records(Some(&commit)).await?
545 }
546 }
547 };
548
549 let request = DiffRequest {
551 log_type,
552 from_hash: Some(commit),
553 };
554 let remote_patch = self.client().diff(request).await?.patch;
555
556 let result = self.merge_patches(local_patch, remote_patch).await?;
557
558 match result {
559 AutoMergeStatus::RewindLocal(events) => {
560 let local_patch = self
561 .rewind_local(&log_type, commit, proof, events)
562 .await?;
563
564 let success = matches!(local_patch, CheckedPatch::Success(_));
565
566 if success {
567 tracing::info!("auto_merge::rewind_local::success");
568 }
569 }
570 AutoMergeStatus::PushRemote(events) => {
571 let (remote_patch, local_patch) = self
572 .push_remote::<T>(&log_type, commit, proof, events)
573 .await?;
574
575 let success =
576 matches!(remote_patch, CheckedPatch::Success(_))
577 && matches!(
578 local_patch,
579 Some(CheckedPatch::Success(_))
580 );
581
582 if success {
583 tracing::info!("auto_merge::push_remote::success");
584 }
585 }
586 }
587
588 Ok(())
589 }
590
591 #[doc(hidden)]
592 async fn merge_patches(
593 &self,
594 mut local: Vec<EventRecord>,
595 remote: Vec<EventRecord>,
596 ) -> Result<AutoMergeStatus, <Self as RemoteSyncHandler>::Error> {
597 tracing::info!(
598 local_len = local.len(),
599 remote_len = remote.len(),
600 "auto_merge::merge_patches",
601 );
602
603 let local_commits =
604 local.iter().map(|r| r.commit()).collect::<HashSet<_>>();
605 let remote_commits =
606 remote.iter().map(|r| r.commit()).collect::<HashSet<_>>();
607
608 if local_commits.is_subset(&remote_commits) {
615 return Ok(AutoMergeStatus::RewindLocal(remote));
616 }
617
618 local.extend(remote.into_iter());
620
621 local.sort_by(|a, b| a.time().cmp(b.time()));
623
624 Ok(AutoMergeStatus::PushRemote(local))
625 }
626
627 #[doc(hidden)]
629 async fn rewind_local(
630 &self,
631 log_type: &EventLogType,
632 commit: CommitHash,
633 proof: CommitProof,
634 events: Vec<EventRecord>,
635 ) -> Result<CheckedPatch, <Self as RemoteSyncHandler>::Error> {
636 tracing::debug!(
637 log_type = ?log_type,
638 commit = %commit,
639 length = %events.len(),
640 "auto_merge::rewind_local",
641 );
642
643 let records = self.rewind_event_log(log_type, &commit).await?;
645
646 let mut outcome = MergeOutcome::default();
647
648 let checked_patch = {
650 let account = self.account();
651 let mut account = account.lock().await;
652 match &log_type {
653 EventLogType::Identity => {
654 let patch = Patch::<WriteEvent>::new(events);
655 let diff = FolderDiff {
656 last_commit: Some(commit),
657 checkpoint: proof,
658 patch,
659 };
660 account.merge_identity(diff, &mut outcome).await?
661 }
662 EventLogType::Account => {
663 let patch = Patch::<AccountEvent>::new(events);
664 let diff = AccountDiff {
665 last_commit: Some(commit),
666 checkpoint: proof,
667 patch,
668 };
669 account.merge_account(diff, &mut outcome).await?.0
670 }
671 EventLogType::Device => {
672 let patch = Patch::<DeviceEvent>::new(events);
673 let diff = DeviceDiff {
674 last_commit: Some(commit),
675 checkpoint: proof,
676 patch,
677 };
678 account.merge_device(diff, &mut outcome).await?
679 }
680 #[cfg(feature = "files")]
681 EventLogType::Files => {
682 let patch = Patch::<FileEvent>::new(events);
683 let diff = FileDiff {
684 last_commit: Some(commit),
685 checkpoint: proof,
686 patch,
687 };
688 account.merge_files(diff, &mut outcome).await?
689 }
690 EventLogType::Folder(id) => {
691 let patch = Patch::<WriteEvent>::new(events);
692 let diff = FolderDiff {
693 last_commit: Some(commit),
694 checkpoint: proof,
695 patch,
696 };
697 account.merge_folder(id, diff, &mut outcome).await?.0
698 }
699 }
700 };
701
702 if let CheckedPatch::Conflict { head, .. } = &checked_patch {
703 tracing::warn!(
704 head = ?head,
705 num_records = ?records.len(),
706 "auto_merge::rollback_rewind");
707
708 self.rollback_rewind(log_type, records).await?;
709 }
710
711 Ok(checked_patch)
712 }
713
714 #[doc(hidden)]
715 async fn rollback_rewind(
716 &self,
717 log_type: &EventLogType,
718 records: Vec<EventRecord>,
719 ) -> Result<(), <Self as RemoteSyncHandler>::Error> {
720 let account = self.account();
721 let account = account.lock().await;
722 match log_type {
723 EventLogType::Identity => {
724 let log = account.identity_log().await?;
725 let mut event_log = log.write().await;
726 event_log.apply_records(records).await?;
727 }
728 EventLogType::Account => {
729 let log = account.account_log().await?;
730 let mut event_log = log.write().await;
731 event_log.apply_records(records).await?;
732 }
733 EventLogType::Device => {
734 let log = account.device_log().await?;
735 let mut event_log = log.write().await;
736 event_log.apply_records(records).await?;
737 }
738 #[cfg(feature = "files")]
739 EventLogType::Files => {
740 let log = account.file_log().await?;
741 let mut event_log = log.write().await;
742 event_log.apply_records(records).await?;
743 }
744 EventLogType::Folder(id) => {
745 let log = account.folder_log(id).await?;
746 let mut event_log = log.write().await;
747 event_log.apply_records(records).await?;
748 }
749 }
750
751 Ok(())
752 }
753
754 #[doc(hidden)]
756 async fn push_remote<T>(
757 &self,
758 log_type: &EventLogType,
759 commit: CommitHash,
760 proof: CommitProof,
761 events: Vec<EventRecord>,
762 ) -> Result<
763 (CheckedPatch, Option<CheckedPatch>),
764 <Self as RemoteSyncHandler>::Error,
765 >
766 where
767 T: Default + Send + Sync,
768 {
769 tracing::debug!(
770 log_type = ?log_type,
771 commit = %commit,
772 length = %events.len(),
773 "auto_merge::push_remote",
774 );
775
776 let req = PatchRequest {
777 log_type: *log_type,
778 commit: Some(commit),
779 proof: proof.clone(),
780 patch: events.clone(),
781 };
782
783 let remote_patch = self.client().patch(req).await?.checked_patch;
784 let local_patch = match &remote_patch {
785 CheckedPatch::Success(_) => {
786 let local_patch = self
787 .rewind_local(log_type, commit, proof, events)
788 .await?;
789 Some(local_patch)
790 }
791 CheckedPatch::Conflict { head, contains } => {
792 tracing::error!(
793 head = ?head,
794 contains = ?contains,
795 "auto_merge::patch::conflict",
796 );
797 None
798 }
799 };
800
801 Ok((remote_patch, local_patch))
802 }
803
804 #[doc(hidden)]
806 async fn rewind_event_log(
807 &self,
808 log_type: &EventLogType,
809 commit: &CommitHash,
810 ) -> Result<Vec<EventRecord>, <Self as RemoteSyncHandler>::Error> {
811 tracing::debug!(
812 log_type = ?log_type,
813 commit = %commit,
814 "automerge::rewind_event_log",
815 );
816 let account = self.account();
818 let account = account.lock().await;
819 Ok(match &log_type {
820 EventLogType::Identity => {
821 let log = account.identity_log().await?;
822 let mut event_log = log.write().await;
823 event_log.rewind(commit).await?
824 }
825 EventLogType::Account => {
826 let log = account.account_log().await?;
827 let mut event_log = log.write().await;
828 event_log.rewind(commit).await?
829 }
830 EventLogType::Device => {
831 let log = account.device_log().await?;
832 let mut event_log = log.write().await;
833 event_log.rewind(commit).await?
834 }
835 #[cfg(feature = "files")]
836 EventLogType::Files => {
837 let log = account.file_log().await?;
838 let mut event_log = log.write().await;
839 event_log.rewind(commit).await?
840 }
841 EventLogType::Folder(id) => {
842 let log = account.folder_log(id).await?;
843 let mut event_log = log.write().await;
844 event_log.rewind(commit).await?
845 }
846 })
847 }
848
849 #[doc(hidden)]
851 async fn scan_proofs(
852 &self,
853 request: ScanRequest,
854 ) -> Result<
855 Option<(CommitHash, CommitProof)>,
856 <Self as RemoteSyncHandler>::Error,
857 > {
858 tracing::debug!(request = ?request, "auto_merge::scan_proofs");
859
860 let leaves = {
861 let account = self.account();
862 let account = account.lock().await;
863 match &request.log_type {
864 EventLogType::Identity => {
865 let log = account.identity_log().await?;
866 let event_log = log.read().await;
867 event_log.tree().leaves().unwrap_or_default()
868 }
869 EventLogType::Account => {
870 let log = account.account_log().await?;
871 let event_log = log.read().await;
872 event_log.tree().leaves().unwrap_or_default()
873 }
874 EventLogType::Device => {
875 let log = account.device_log().await?;
876 let event_log = log.read().await;
877 event_log.tree().leaves().unwrap_or_default()
878 }
879 #[cfg(feature = "files")]
880 EventLogType::Files => {
881 let log = account.file_log().await?;
882 let event_log = log.read().await;
883 event_log.tree().leaves().unwrap_or_default()
884 }
885 EventLogType::Folder(id) => {
886 let log = account.folder_log(id).await?;
887 let event_log = log.read().await;
888 event_log.tree().leaves().unwrap_or_default()
889 }
890 }
891 };
892
893 let mut req = request.clone();
894 loop {
895 match self.iterate_scan_proofs(req.clone(), &leaves).await? {
896 ScanState::Result(value) => return Ok(Some(value)),
897 ScanState::Continue(scan) => req = scan,
898 ScanState::Exhausted => return Ok(None),
899 }
900 }
901 }
902
903 #[doc(hidden)]
905 async fn iterate_scan_proofs(
906 &self,
907 request: ScanRequest,
908 leaves: &[[u8; 32]],
909 ) -> Result<ScanState, <Self as RemoteSyncHandler>::Error> {
910 tracing::debug!(
911 request = ?request,
912 "auto_merge::iterate_scan_proofs");
913
914 let response = self.client().scan(request.clone()).await?;
915
916 if let Some(first_proof) = &response.first_proof {
920 let (verified, _) = first_proof.verify_leaves(leaves);
921 if !verified {
922 return Err(ConflictError::Hard.into());
923 }
924 }
925
926 if !response.proofs.is_empty() {
927 for proof in response.proofs.iter().rev() {
931 if let Some(commit_hash) = self.compare_proof(proof, leaves) {
934 let index = proof.indices.last().copied().unwrap();
937 let new_leaves = &leaves[0..=index];
938 let mut new_leaves = new_leaves.to_vec();
939 let mut new_tree = CommitTree::new();
940 new_tree.append(&mut new_leaves);
941 new_tree.commit();
942
943 let checkpoint_proof = new_tree.head()?;
944 return Ok(ScanState::Result((
945 commit_hash,
946 checkpoint_proof,
947 )));
948 }
949 }
950
951 let mut req = request;
953 req.offset = response.offset;
954
955 Ok(ScanState::Continue(req))
956 } else {
957 Ok(ScanState::Exhausted)
958 }
959 }
960
961 #[doc(hidden)]
964 fn compare_proof(
965 &self,
966 proof: &CommitProof,
967 leaves: &[[u8; 32]],
968 ) -> Option<CommitHash> {
969 let (verified, leaves) = proof.verify_leaves(leaves);
970
971 tracing::trace!(
972 proof = ?proof,
973 verified = ?verified,
974 "auto_merge::compare_proof",
975 );
976
977 if verified {
978 leaves.last().copied().map(CommitHash)
979 } else {
980 None
981 }
982 }
983}