1use async_trait::async_trait;
3use sos_account::Account;
4use sos_core::{
5 commit::{CommitHash, CommitProof, CommitTree},
6 events::{
7 patch::{
8 AccountDiff, CheckedPatch, DeviceDiff, Diff, FolderDiff, Patch,
9 },
10 AccountEvent, DeviceEvent, EventLog, EventLogType, EventRecord,
11 WriteEvent,
12 },
13 VaultId,
14};
15use sos_protocol::{
16 AsConflict, ConflictError, DiffRequest, HardConflictResolver,
17 PatchRequest, ScanRequest, SyncClient, SyncOptions,
18};
19use sos_sync::{
20 ForceMerge, MaybeConflict, Merge, MergeOutcome, StorageEventLogs,
21 SyncDirection, SyncStatus,
22};
23use std::collections::HashSet;
24use tracing::instrument;
25
26const PROOF_SCAN_LIMIT: u16 = 32;
27
28#[cfg(feature = "files")]
29use sos_core::events::{patch::FileDiff, FileEvent};
30
31use super::RemoteSyncHandler;
32
33#[doc(hidden)]
35pub enum ScanState {
36 Result((CommitHash, CommitProof)),
37 Continue(ScanRequest),
38 Exhausted,
39}
40
41#[doc(hidden)]
43pub enum AutoMergeStatus {
44 RewindLocal(Vec<EventRecord>),
46 PushRemote(Vec<EventRecord>),
48}
49
50#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
52#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
53pub trait AutoMerge: RemoteSyncHandler {
54 #[doc(hidden)]
59 async fn execute_sync(
60 &self,
61 options: &SyncOptions,
62 ) -> Result<Option<MergeOutcome>, Self::Error> {
63 match self.direction() {
64 SyncDirection::Push => {
65 let exists = self.client().account_exists().await?;
66 if exists {
67 self.perform_sync(options).await
68 } else {
69 self.create_account().await?;
70 Ok(None)
71 }
72 }
73 SyncDirection::Pull => {
74 let exists = {
75 let account = self.account();
76 let account = account.lock().await;
77 account.paths().is_usable().await?
78 };
79 if exists {
80 self.perform_sync(options).await
81 } else {
82 self.create_account().await?;
83 Ok(None)
84 }
85 }
86 }
87 }
88
89 #[doc(hidden)]
90 async fn perform_sync(
91 &self,
92 options: &SyncOptions,
93 ) -> Result<Option<MergeOutcome>, Self::Error> {
94 let sync_status = self.client().sync_status().await?;
95 match self.sync_account(sync_status).await {
96 Ok(outcome) => Ok(Some(outcome)),
97 Err(e) => {
98 if e.is_conflict() {
99 let conflict = e.take_conflict().unwrap();
100 match conflict {
101 ConflictError::Soft {
102 conflict,
103 local,
104 remote,
105 } => {
106 let outcome = self
107 .auto_merge(options, conflict, local, remote)
108 .await?;
109 Ok(Some(outcome))
110 }
111 _ => Err(conflict.into()),
112 }
113 } else {
114 Err(e)
115 }
116 }
117 }
118 }
119
120 #[doc(hidden)]
121 async fn auto_merge_scan<T>(
122 &self,
123 log_id: &'static str,
124 log_type: EventLogType,
125 ) -> Result<bool, <Self as RemoteSyncHandler>::Error>
126 where
127 T: Default + Send + Sync,
128 {
129 tracing::debug!(log_id);
130
131 let req = ScanRequest {
132 log_type,
133 offset: 0,
134 limit: PROOF_SCAN_LIMIT,
135 };
136 match self.scan_proofs(req).await {
137 Ok(Some((ancestor_commit, proof))) => {
138 self.try_merge_from_ancestor::<T>(
139 EventLogType::Identity,
140 ancestor_commit,
141 proof,
142 )
143 .await?;
144 Ok(false)
145 }
146 Err(e) => {
147 if e.is_hard_conflict() {
148 Ok(true)
149 } else {
150 Err(e)
151 }
152 }
153 _ => Err(ConflictError::Hard.into()),
154 }
155 }
156
157 async fn auto_merge_identity(
159 &self,
160 options: &SyncOptions,
161 outcome: &mut MergeOutcome,
162 ) -> Result<bool, <Self as RemoteSyncHandler>::Error> {
163 let handle_conflict = self
164 .auto_merge_scan::<WriteEvent>(
165 "auto_merge::identity",
166 EventLogType::Identity,
167 )
168 .await?;
169 if handle_conflict {
170 self.identity_hard_conflict(options, outcome).await?;
171 }
172 Ok(handle_conflict)
173 }
174
175 async fn auto_merge_account(
177 &self,
178 options: &SyncOptions,
179 outcome: &mut MergeOutcome,
180 ) -> Result<bool, <Self as RemoteSyncHandler>::Error> {
181 let handle_conflict = self
182 .auto_merge_scan::<AccountEvent>(
183 "auto_merge::account",
184 EventLogType::Account,
185 )
186 .await?;
187 if handle_conflict {
188 self.account_hard_conflict(options, outcome).await?;
189 }
190 Ok(handle_conflict)
191 }
192
193 async fn auto_merge_device(
195 &self,
196 options: &SyncOptions,
197 outcome: &mut MergeOutcome,
198 ) -> Result<bool, <Self as RemoteSyncHandler>::Error> {
199 let handle_conflict = self
200 .auto_merge_scan::<DeviceEvent>(
201 "auto_merge::device",
202 EventLogType::Device,
203 )
204 .await?;
205 if handle_conflict {
206 self.device_hard_conflict(options, outcome).await?;
207 }
208 Ok(handle_conflict)
209 }
210
211 #[cfg(feature = "files")]
213 async fn auto_merge_files(
214 &self,
215 options: &SyncOptions,
216 outcome: &mut MergeOutcome,
217 ) -> Result<bool, <Self as RemoteSyncHandler>::Error> {
218 let handle_conflict = self
219 .auto_merge_scan::<FileEvent>(
220 "auto_merge::files",
221 EventLogType::Files,
222 )
223 .await?;
224 if handle_conflict {
225 self.files_hard_conflict(options, outcome).await?;
226 }
227 Ok(handle_conflict)
228 }
229
230 #[doc(hidden)]
231 async fn hard_conflict_diff<EventType>(
232 &self,
233 log_id: &'static str,
234 log_type: EventLogType,
235 options: &SyncOptions,
236 ) -> Result<Diff<EventType>, <Self as RemoteSyncHandler>::Error> {
237 match &options.hard_conflict_resolver {
238 HardConflictResolver::AutomaticFetch => {
239 tracing::debug!(log_id);
240
241 let request = DiffRequest {
242 log_type,
243 from_hash: None,
244 };
245 let response = self.client().diff(request).await?;
246 let patch = Patch::<EventType>::new(response.patch);
247 let diff =
248 Diff::<EventType>::new(patch, response.checkpoint, None);
249 Ok(diff)
250 }
251 }
252 }
253
254 #[doc(hidden)]
255 async fn identity_hard_conflict(
256 &self,
257 options: &SyncOptions,
258 outcome: &mut MergeOutcome,
259 ) -> Result<(), <Self as RemoteSyncHandler>::Error> {
260 let diff = self
261 .hard_conflict_diff::<WriteEvent>(
262 "hard_conflict::force_merge::identity",
263 EventLogType::Identity,
264 options,
265 )
266 .await?;
267
268 let account = self.account();
269 let mut account = account.lock().await;
270 Ok(account.force_merge_identity(diff, outcome).await?)
271 }
272
273 #[doc(hidden)]
274 async fn account_hard_conflict(
275 &self,
276 options: &SyncOptions,
277 outcome: &mut MergeOutcome,
278 ) -> Result<(), <Self as RemoteSyncHandler>::Error> {
279 let diff = self
280 .hard_conflict_diff::<AccountEvent>(
281 "hard_conflict::force_merge::account",
282 EventLogType::Account,
283 options,
284 )
285 .await?;
286
287 let account = self.account();
288 let mut account = account.lock().await;
289 Ok(account.force_merge_account(diff, outcome).await?)
290 }
291
292 #[doc(hidden)]
293 async fn device_hard_conflict(
294 &self,
295 options: &SyncOptions,
296 outcome: &mut MergeOutcome,
297 ) -> Result<(), <Self as RemoteSyncHandler>::Error> {
298 let diff = self
299 .hard_conflict_diff::<DeviceEvent>(
300 "hard_conflict::force_merge::device",
301 EventLogType::Device,
302 options,
303 )
304 .await?;
305
306 let account = self.account();
307 let mut account = account.lock().await;
308 Ok(account.force_merge_device(diff, outcome).await?)
309 }
310
311 #[doc(hidden)]
312 #[cfg(feature = "files")]
313 async fn files_hard_conflict(
314 &self,
315 options: &SyncOptions,
316 outcome: &mut MergeOutcome,
317 ) -> Result<(), <Self as RemoteSyncHandler>::Error> {
318 let diff = self
319 .hard_conflict_diff::<FileEvent>(
320 "hard_conflict::force_merge::files",
321 EventLogType::Files,
322 options,
323 )
324 .await?;
325
326 let account = self.account();
327 let mut account = account.lock().await;
328 Ok(account.force_merge_files(diff, outcome).await?)
329 }
330
331 #[instrument(skip_all)]
340 async fn auto_merge(
341 &self,
342 options: &SyncOptions,
343 conflict: MaybeConflict,
344 local: SyncStatus,
345 _remote: SyncStatus,
346 ) -> Result<MergeOutcome, <Self as RemoteSyncHandler>::Error> {
347 let mut force_merge_outcome = MergeOutcome::default();
348 let mut has_hard_conflict = false;
349
350 if conflict.identity {
351 let hard_conflict = self
352 .auto_merge_identity(options, &mut force_merge_outcome)
353 .await?;
354
355 if hard_conflict {
356 tracing::warn!("hard_conflict::identity");
357 }
358
359 has_hard_conflict = has_hard_conflict || hard_conflict;
360 }
361
362 if conflict.account {
363 let hard_conflict = self
364 .auto_merge_account(options, &mut force_merge_outcome)
365 .await?;
366
367 if hard_conflict {
368 tracing::warn!("hard_conflict::account");
369 }
370
371 has_hard_conflict = has_hard_conflict || hard_conflict;
372 }
373
374 if conflict.device {
375 let hard_conflict = self
376 .auto_merge_device(options, &mut force_merge_outcome)
377 .await?;
378
379 if hard_conflict {
380 tracing::warn!("hard_conflict::device");
381 }
382
383 has_hard_conflict = has_hard_conflict || hard_conflict;
384 }
385
386 #[cfg(feature = "files")]
387 if conflict.files {
388 let hard_conflict = self
389 .auto_merge_files(options, &mut force_merge_outcome)
390 .await?;
391
392 if hard_conflict {
393 tracing::warn!("hard_conflict::files");
394 }
395
396 has_hard_conflict = has_hard_conflict || hard_conflict;
397 }
398
399 for (folder_id, _) in &conflict.folders {
400 let hard_conflict = self
401 .auto_merge_folder(
402 options,
403 &local,
404 folder_id,
405 &mut force_merge_outcome,
406 )
407 .await?;
408
409 if hard_conflict {
410 tracing::warn!(
411 folder_id = %folder_id,
412 "hard_conflict::folder",
413 );
414 }
415
416 has_hard_conflict = has_hard_conflict || hard_conflict;
417 }
418
419 Ok(force_merge_outcome)
433 }
434
435 async fn auto_merge_folder(
437 &self,
438 options: &SyncOptions,
439 _local_status: &SyncStatus,
440 folder_id: &VaultId,
441 outcome: &mut MergeOutcome,
442 ) -> Result<bool, <Self as RemoteSyncHandler>::Error> {
443 tracing::debug!(folder_id = %folder_id, "auto_merge::folder");
444
445 let req = ScanRequest {
446 log_type: EventLogType::Folder(*folder_id),
447 offset: 0,
448 limit: PROOF_SCAN_LIMIT,
449 };
450 match self.scan_proofs(req).await {
451 Ok(Some((ancestor_commit, proof))) => {
452 self.try_merge_from_ancestor::<WriteEvent>(
453 EventLogType::Folder(*folder_id),
454 ancestor_commit,
455 proof,
456 )
457 .await?;
458 Ok(false)
459 }
460 Err(e) => {
461 if e.is_hard_conflict() {
462 self.folder_hard_conflict(folder_id, options, outcome)
463 .await?;
464 Ok(true)
465 } else {
466 Err(e)
467 }
468 }
469 _ => Err(ConflictError::Hard.into()),
470 }
471 }
472
473 #[doc(hidden)]
474 async fn folder_hard_conflict(
475 &self,
476 folder_id: &VaultId,
477 options: &SyncOptions,
478 outcome: &mut MergeOutcome,
479 ) -> Result<(), <Self as RemoteSyncHandler>::Error> {
480 match &options.hard_conflict_resolver {
481 HardConflictResolver::AutomaticFetch => {
482 let request = DiffRequest {
483 log_type: EventLogType::Folder(*folder_id),
484 from_hash: None,
485 };
486 let response = self.client().diff(request).await?;
487 let patch = Patch::<WriteEvent>::new(response.patch);
488 let diff = FolderDiff {
489 patch,
490 checkpoint: response.checkpoint,
491 last_commit: None,
492 };
493 let account = self.account();
494 let mut account = account.lock().await;
495 Ok(account
496 .force_merge_folder(folder_id, diff, outcome)
497 .await?)
498 }
499 }
500 }
501
502 #[doc(hidden)]
504 async fn try_merge_from_ancestor<T>(
505 &self,
506 log_type: EventLogType,
507 commit: CommitHash,
508 proof: CommitProof,
509 ) -> Result<(), <Self as RemoteSyncHandler>::Error>
510 where
511 T: Default + Send + Sync,
512 {
513 tracing::debug!(
514 commit = %commit,
515 "auto_merge::try_merge_from_ancestor",
516 );
517
518 let local_patch = {
520 let account = self.account();
521 let account = account.lock().await;
522 match &log_type {
523 EventLogType::Identity => {
524 let log = account.identity_log().await?;
525 let event_log = log.read().await;
526 event_log.diff_records(Some(&commit)).await?
527 }
528 EventLogType::Account => {
529 let log = account.account_log().await?;
530 let event_log = log.read().await;
531 event_log.diff_records(Some(&commit)).await?
532 }
533 EventLogType::Device => {
534 let log = account.device_log().await?;
535 let event_log = log.read().await;
536 event_log.diff_records(Some(&commit)).await?
537 }
538 #[cfg(feature = "files")]
539 EventLogType::Files => {
540 let log = account.file_log().await?;
541 let event_log = log.read().await;
542 event_log.diff_records(Some(&commit)).await?
543 }
544 EventLogType::Folder(id) => {
545 let log = account.folder_log(id).await?;
546 let event_log = log.read().await;
547 event_log.diff_records(Some(&commit)).await?
548 }
549 }
550 };
551
552 let request = DiffRequest {
554 log_type,
555 from_hash: Some(commit),
556 };
557 let remote_patch = self.client().diff(request).await?.patch;
558
559 let result = self.merge_patches(local_patch, remote_patch).await?;
560
561 match result {
562 AutoMergeStatus::RewindLocal(events) => {
563 let local_patch = self
564 .rewind_local(&log_type, commit, proof, events)
565 .await?;
566
567 let success = matches!(local_patch, CheckedPatch::Success(_));
568
569 if success {
570 tracing::info!("auto_merge::rewind_local::success");
571 }
572 }
573 AutoMergeStatus::PushRemote(events) => {
574 let (remote_patch, local_patch) = self
575 .push_remote::<T>(&log_type, commit, proof, events)
576 .await?;
577
578 let success =
579 matches!(remote_patch, CheckedPatch::Success(_))
580 && matches!(
581 local_patch,
582 Some(CheckedPatch::Success(_))
583 );
584
585 if success {
586 tracing::info!("auto_merge::push_remote::success");
587 }
588 }
589 }
590
591 Ok(())
592 }
593
594 #[doc(hidden)]
595 async fn merge_patches(
596 &self,
597 mut local: Vec<EventRecord>,
598 remote: Vec<EventRecord>,
599 ) -> Result<AutoMergeStatus, <Self as RemoteSyncHandler>::Error> {
600 tracing::info!(
601 local_len = local.len(),
602 remote_len = remote.len(),
603 "auto_merge::merge_patches",
604 );
605
606 let local_commits =
607 local.iter().map(|r| r.commit()).collect::<HashSet<_>>();
608 let remote_commits =
609 remote.iter().map(|r| r.commit()).collect::<HashSet<_>>();
610
611 if local_commits.is_subset(&remote_commits) {
618 return Ok(AutoMergeStatus::RewindLocal(remote));
619 }
620
621 local.extend(remote.into_iter());
623
624 local.sort_by(|a, b| a.time().cmp(b.time()));
626
627 Ok(AutoMergeStatus::PushRemote(local))
628 }
629
630 #[doc(hidden)]
632 async fn rewind_local(
633 &self,
634 log_type: &EventLogType,
635 commit: CommitHash,
636 proof: CommitProof,
637 events: Vec<EventRecord>,
638 ) -> Result<CheckedPatch, <Self as RemoteSyncHandler>::Error> {
639 tracing::debug!(
640 log_type = ?log_type,
641 commit = %commit,
642 length = %events.len(),
643 "auto_merge::rewind_local",
644 );
645
646 let records = self.rewind_event_log(log_type, &commit).await?;
648
649 let mut outcome = MergeOutcome::default();
650
651 let checked_patch = {
653 let account = self.account();
654 let mut account = account.lock().await;
655 match &log_type {
656 EventLogType::Identity => {
657 let patch = Patch::<WriteEvent>::new(events);
658 let diff = FolderDiff {
659 last_commit: Some(commit),
660 checkpoint: proof,
661 patch,
662 };
663 account.merge_identity(diff, &mut outcome).await?
664 }
665 EventLogType::Account => {
666 let patch = Patch::<AccountEvent>::new(events);
667 let diff = AccountDiff {
668 last_commit: Some(commit),
669 checkpoint: proof,
670 patch,
671 };
672 account.merge_account(diff, &mut outcome).await?.0
673 }
674 EventLogType::Device => {
675 let patch = Patch::<DeviceEvent>::new(events);
676 let diff = DeviceDiff {
677 last_commit: Some(commit),
678 checkpoint: proof,
679 patch,
680 };
681 account.merge_device(diff, &mut outcome).await?
682 }
683 #[cfg(feature = "files")]
684 EventLogType::Files => {
685 let patch = Patch::<FileEvent>::new(events);
686 let diff = FileDiff {
687 last_commit: Some(commit),
688 checkpoint: proof,
689 patch,
690 };
691 account.merge_files(diff, &mut outcome).await?
692 }
693 EventLogType::Folder(id) => {
694 let patch = Patch::<WriteEvent>::new(events);
695 let diff = FolderDiff {
696 last_commit: Some(commit),
697 checkpoint: proof,
698 patch,
699 };
700
701 account.merge_folder(id, diff, &mut outcome).await?.0
702 }
703 }
704 };
705
706 if let CheckedPatch::Conflict { head, .. } = &checked_patch {
707 tracing::warn!(
708 head = ?head,
709 num_records = ?records.len(),
710 "auto_merge::rollback_rewind");
711
712 self.rollback_rewind(log_type, records).await?;
713 }
714
715 Ok(checked_patch)
716 }
717
718 #[doc(hidden)]
719 async fn rollback_rewind(
720 &self,
721 log_type: &EventLogType,
722 records: Vec<EventRecord>,
723 ) -> Result<(), <Self as RemoteSyncHandler>::Error> {
724 let account = self.account();
725 let account = account.lock().await;
726 match log_type {
727 EventLogType::Identity => {
728 let log = account.identity_log().await?;
729 let mut event_log = log.write().await;
730 event_log.apply_records(records).await?;
731 }
732 EventLogType::Account => {
733 let log = account.account_log().await?;
734 let mut event_log = log.write().await;
735 event_log.apply_records(records).await?;
736 }
737 EventLogType::Device => {
738 let log = account.device_log().await?;
739 let mut event_log = log.write().await;
740 event_log.apply_records(records).await?;
741 }
742 #[cfg(feature = "files")]
743 EventLogType::Files => {
744 let log = account.file_log().await?;
745 let mut event_log = log.write().await;
746 event_log.apply_records(records).await?;
747 }
748 EventLogType::Folder(id) => {
749 let log = account.folder_log(id).await?;
750 let mut event_log = log.write().await;
751 event_log.apply_records(records).await?;
752 }
753 }
754
755 Ok(())
756 }
757
758 #[doc(hidden)]
760 async fn push_remote<T>(
761 &self,
762 log_type: &EventLogType,
763 commit: CommitHash,
764 proof: CommitProof,
765 events: Vec<EventRecord>,
766 ) -> Result<
767 (CheckedPatch, Option<CheckedPatch>),
768 <Self as RemoteSyncHandler>::Error,
769 >
770 where
771 T: Default + Send + Sync,
772 {
773 tracing::debug!(
774 log_type = ?log_type,
775 commit = %commit,
776 length = %events.len(),
777 "auto_merge::push_remote",
778 );
779
780 let req = PatchRequest {
781 log_type: *log_type,
782 commit: Some(commit),
783 proof: proof.clone(),
784 patch: events.clone(),
785 };
786
787 let remote_patch = self.client().patch(req).await?.checked_patch;
788 let local_patch = match &remote_patch {
789 CheckedPatch::Success(_) => {
790 let local_patch = self
791 .rewind_local(log_type, commit, proof, events)
792 .await?;
793 Some(local_patch)
794 }
795 CheckedPatch::Conflict { head, contains } => {
796 tracing::error!(
797 head = ?head,
798 contains = ?contains,
799 "auto_merge::patch::conflict",
800 );
801 None
802 }
803 };
804
805 Ok((remote_patch, local_patch))
806 }
807
808 #[doc(hidden)]
810 async fn rewind_event_log(
811 &self,
812 log_type: &EventLogType,
813 commit: &CommitHash,
814 ) -> Result<Vec<EventRecord>, <Self as RemoteSyncHandler>::Error> {
815 tracing::debug!(
816 log_type = ?log_type,
817 commit = %commit,
818 "automerge::rewind_event_log",
819 );
820 let account = self.account();
822 let account = account.lock().await;
823 Ok(match &log_type {
824 EventLogType::Identity => {
825 let log = account.identity_log().await?;
826 let mut event_log = log.write().await;
827 event_log.rewind(commit).await?
828 }
829 EventLogType::Account => {
830 let log = account.account_log().await?;
831 let mut event_log = log.write().await;
832 event_log.rewind(commit).await?
833 }
834 EventLogType::Device => {
835 let log = account.device_log().await?;
836 let mut event_log = log.write().await;
837 event_log.rewind(commit).await?
838 }
839 #[cfg(feature = "files")]
840 EventLogType::Files => {
841 let log = account.file_log().await?;
842 let mut event_log = log.write().await;
843 event_log.rewind(commit).await?
844 }
845 EventLogType::Folder(id) => {
846 let log = account.folder_log(id).await?;
847 let mut event_log = log.write().await;
848 event_log.rewind(commit).await?
849 }
850 })
851 }
852
853 #[doc(hidden)]
855 async fn scan_proofs(
856 &self,
857 request: ScanRequest,
858 ) -> Result<
859 Option<(CommitHash, CommitProof)>,
860 <Self as RemoteSyncHandler>::Error,
861 > {
862 tracing::debug!(request = ?request, "auto_merge::scan_proofs");
863
864 let leaves = {
865 let account = self.account();
866 let account = account.lock().await;
867 match &request.log_type {
868 EventLogType::Identity => {
869 let log = account.identity_log().await?;
870 let event_log = log.read().await;
871 event_log.tree().leaves().unwrap_or_default()
872 }
873 EventLogType::Account => {
874 let log = account.account_log().await?;
875 let event_log = log.read().await;
876 event_log.tree().leaves().unwrap_or_default()
877 }
878 EventLogType::Device => {
879 let log = account.device_log().await?;
880 let event_log = log.read().await;
881 event_log.tree().leaves().unwrap_or_default()
882 }
883 #[cfg(feature = "files")]
884 EventLogType::Files => {
885 let log = account.file_log().await?;
886 let event_log = log.read().await;
887 event_log.tree().leaves().unwrap_or_default()
888 }
889 EventLogType::Folder(id) => {
890 let log = account.folder_log(id).await?;
891 let event_log = log.read().await;
892 event_log.tree().leaves().unwrap_or_default()
893 }
894 }
895 };
896
897 let mut req = request.clone();
898 loop {
899 match self.iterate_scan_proofs(req.clone(), &leaves).await? {
900 ScanState::Result(value) => return Ok(Some(value)),
901 ScanState::Continue(scan) => req = scan,
902 ScanState::Exhausted => return Ok(None),
903 }
904 }
905 }
906
907 #[doc(hidden)]
909 async fn iterate_scan_proofs(
910 &self,
911 request: ScanRequest,
912 leaves: &[[u8; 32]],
913 ) -> Result<ScanState, <Self as RemoteSyncHandler>::Error> {
914 tracing::debug!(
915 request = ?request,
916 "auto_merge::iterate_scan_proofs");
917
918 let response = self.client().scan(request.clone()).await?;
919
920 if let Some(first_proof) = &response.first_proof {
924 let (verified, _) = first_proof.verify_leaves(leaves);
925 if !verified {
926 return Err(ConflictError::Hard.into());
927 }
928 }
929
930 if !response.proofs.is_empty() {
931 for proof in response.proofs.iter().rev() {
935 if let Some(commit_hash) = self.compare_proof(proof, leaves) {
938 let index = proof.indices.last().copied().unwrap();
941 let new_leaves = &leaves[0..=index];
942 let mut new_leaves = new_leaves.to_vec();
943 let mut new_tree = CommitTree::new();
944 new_tree.append(&mut new_leaves);
945 new_tree.commit();
946
947 let checkpoint_proof = new_tree.head()?;
948 return Ok(ScanState::Result((
949 commit_hash,
950 checkpoint_proof,
951 )));
952 }
953 }
954
955 let mut req = request;
957 req.offset = response.offset;
958
959 Ok(ScanState::Continue(req))
960 } else {
961 Ok(ScanState::Exhausted)
962 }
963 }
964
965 #[doc(hidden)]
968 fn compare_proof(
969 &self,
970 proof: &CommitProof,
971 leaves: &[[u8; 32]],
972 ) -> Option<CommitHash> {
973 let (verified, leaves) = proof.verify_leaves(leaves);
974
975 tracing::trace!(
976 proof = ?proof,
977 verified = ?verified,
978 "auto_merge::compare_proof",
979 );
980
981 if verified {
982 leaves.last().copied().map(CommitHash)
983 } else {
984 None
985 }
986 }
987}