sos_remote_sync/
auto_merge.rs

1//! Implements auto merge logic for a remote.
2use async_trait::async_trait;
3use sos_account::Account;
4use sos_core::{
5    commit::{CommitHash, CommitProof, CommitTree},
6    events::{
7        patch::{
8            AccountDiff, CheckedPatch, DeviceDiff, Diff, FolderDiff, Patch,
9        },
10        AccountEvent, DeviceEvent, EventLog, EventLogType, EventRecord,
11        WriteEvent,
12    },
13    VaultId,
14};
15use sos_protocol::{
16    AsConflict, ConflictError, DiffRequest, HardConflictResolver,
17    PatchRequest, ScanRequest, SyncClient, SyncOptions,
18};
19use sos_sync::{
20    ForceMerge, MaybeConflict, Merge, MergeOutcome, StorageEventLogs,
21    SyncDirection, SyncStatus,
22};
23use std::collections::HashSet;
24use tracing::instrument;
25
26const PROOF_SCAN_LIMIT: u16 = 32;
27
28#[cfg(feature = "files")]
29use sos_core::events::{patch::FileDiff, FileEvent};
30
31use super::RemoteSyncHandler;
32
33/// State used while scanning commit proofs on a remote data source.
34#[doc(hidden)]
35pub enum ScanState {
36    Result((CommitHash, CommitProof)),
37    Continue(ScanRequest),
38    Exhausted,
39}
40
41/// Whether to apply an auto merge to local or remote.
42#[doc(hidden)]
43pub enum AutoMergeStatus {
44    /// Apply the events to the local event log.
45    RewindLocal(Vec<EventRecord>),
46    /// Push events to the remote.
47    PushRemote(Vec<EventRecord>),
48}
49
50/// Support for auto merge on sync.
51#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
52#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
53pub trait AutoMerge: RemoteSyncHandler {
54    /// Execute the sync operation.
55    ///
56    /// If the account does not exist it is created
57    /// on the remote, otherwise the account is synced.
58    #[doc(hidden)]
59    async fn execute_sync(
60        &self,
61        options: &SyncOptions,
62    ) -> Result<Option<MergeOutcome>, Self::Error> {
63        match self.direction() {
64            SyncDirection::Push => {
65                let exists = self.client().account_exists().await?;
66                if exists {
67                    self.perform_sync(options).await
68                } else {
69                    self.create_account().await?;
70                    Ok(None)
71                }
72            }
73            SyncDirection::Pull => {
74                let exists = {
75                    let account = self.account();
76                    let account = account.lock().await;
77                    account.paths().is_usable().await?
78                };
79                if exists {
80                    self.perform_sync(options).await
81                } else {
82                    self.create_account().await?;
83                    Ok(None)
84                }
85            }
86        }
87    }
88
89    #[doc(hidden)]
90    async fn perform_sync(
91        &self,
92        options: &SyncOptions,
93    ) -> Result<Option<MergeOutcome>, Self::Error> {
94        let sync_status = self.client().sync_status().await?;
95        match self.sync_account(sync_status).await {
96            Ok(outcome) => Ok(Some(outcome)),
97            Err(e) => {
98                if e.is_conflict() {
99                    let conflict = e.take_conflict().unwrap();
100                    match conflict {
101                        ConflictError::Soft {
102                            conflict,
103                            local,
104                            remote,
105                        } => {
106                            let outcome = self
107                                .auto_merge(options, conflict, local, remote)
108                                .await?;
109                            Ok(Some(outcome))
110                        }
111                        _ => Err(conflict.into()),
112                    }
113                } else {
114                    Err(e)
115                }
116            }
117        }
118    }
119
120    #[doc(hidden)]
121    async fn auto_merge_scan<T>(
122        &self,
123        log_id: &'static str,
124        log_type: EventLogType,
125    ) -> Result<bool, <Self as RemoteSyncHandler>::Error>
126    where
127        T: Default + Send + Sync,
128    {
129        tracing::debug!(log_id);
130
131        let req = ScanRequest {
132            log_type,
133            offset: 0,
134            limit: PROOF_SCAN_LIMIT,
135        };
136        match self.scan_proofs(req).await {
137            Ok(Some((ancestor_commit, proof))) => {
138                self.try_merge_from_ancestor::<T>(
139                    EventLogType::Identity,
140                    ancestor_commit,
141                    proof,
142                )
143                .await?;
144                Ok(false)
145            }
146            Err(e) => {
147                if e.is_hard_conflict() {
148                    Ok(true)
149                } else {
150                    Err(e)
151                }
152            }
153            _ => Err(ConflictError::Hard.into()),
154        }
155    }
156
157    /// Auto merge identity folders.
158    async fn auto_merge_identity(
159        &self,
160        options: &SyncOptions,
161        outcome: &mut MergeOutcome,
162    ) -> Result<bool, <Self as RemoteSyncHandler>::Error> {
163        let handle_conflict = self
164            .auto_merge_scan::<WriteEvent>(
165                "auto_merge::identity",
166                EventLogType::Identity,
167            )
168            .await?;
169        if handle_conflict {
170            self.identity_hard_conflict(options, outcome).await?;
171        }
172        Ok(handle_conflict)
173    }
174
175    /// Auto merge account events.
176    async fn auto_merge_account(
177        &self,
178        options: &SyncOptions,
179        outcome: &mut MergeOutcome,
180    ) -> Result<bool, <Self as RemoteSyncHandler>::Error> {
181        let handle_conflict = self
182            .auto_merge_scan::<AccountEvent>(
183                "auto_merge::account",
184                EventLogType::Account,
185            )
186            .await?;
187        if handle_conflict {
188            self.account_hard_conflict(options, outcome).await?;
189        }
190        Ok(handle_conflict)
191    }
192
193    /// Auto merge device events.
194    async fn auto_merge_device(
195        &self,
196        options: &SyncOptions,
197        outcome: &mut MergeOutcome,
198    ) -> Result<bool, <Self as RemoteSyncHandler>::Error> {
199        let handle_conflict = self
200            .auto_merge_scan::<DeviceEvent>(
201                "auto_merge::device",
202                EventLogType::Device,
203            )
204            .await?;
205        if handle_conflict {
206            self.device_hard_conflict(options, outcome).await?;
207        }
208        Ok(handle_conflict)
209    }
210
211    /// Auto merge file events.
212    #[cfg(feature = "files")]
213    async fn auto_merge_files(
214        &self,
215        options: &SyncOptions,
216        outcome: &mut MergeOutcome,
217    ) -> Result<bool, <Self as RemoteSyncHandler>::Error> {
218        let handle_conflict = self
219            .auto_merge_scan::<FileEvent>(
220                "auto_merge::files",
221                EventLogType::Files,
222            )
223            .await?;
224        if handle_conflict {
225            self.files_hard_conflict(options, outcome).await?;
226        }
227        Ok(handle_conflict)
228    }
229
230    #[doc(hidden)]
231    async fn hard_conflict_diff<EventType>(
232        &self,
233        log_id: &'static str,
234        log_type: EventLogType,
235        options: &SyncOptions,
236    ) -> Result<Diff<EventType>, <Self as RemoteSyncHandler>::Error> {
237        match &options.hard_conflict_resolver {
238            HardConflictResolver::AutomaticFetch => {
239                tracing::debug!(log_id);
240
241                let request = DiffRequest {
242                    log_type,
243                    from_hash: None,
244                };
245                let response = self.client().diff(request).await?;
246                let patch = Patch::<EventType>::new(response.patch);
247                let diff =
248                    Diff::<EventType>::new(patch, response.checkpoint, None);
249                Ok(diff)
250            }
251        }
252    }
253
254    #[doc(hidden)]
255    async fn identity_hard_conflict(
256        &self,
257        options: &SyncOptions,
258        outcome: &mut MergeOutcome,
259    ) -> Result<(), <Self as RemoteSyncHandler>::Error> {
260        let diff = self
261            .hard_conflict_diff::<WriteEvent>(
262                "hard_conflict::force_merge::identity",
263                EventLogType::Identity,
264                options,
265            )
266            .await?;
267
268        let account = self.account();
269        let mut account = account.lock().await;
270        Ok(account.force_merge_identity(diff, outcome).await?)
271    }
272
273    #[doc(hidden)]
274    async fn account_hard_conflict(
275        &self,
276        options: &SyncOptions,
277        outcome: &mut MergeOutcome,
278    ) -> Result<(), <Self as RemoteSyncHandler>::Error> {
279        let diff = self
280            .hard_conflict_diff::<AccountEvent>(
281                "hard_conflict::force_merge::account",
282                EventLogType::Account,
283                options,
284            )
285            .await?;
286
287        let account = self.account();
288        let mut account = account.lock().await;
289        Ok(account.force_merge_account(diff, outcome).await?)
290    }
291
292    #[doc(hidden)]
293    async fn device_hard_conflict(
294        &self,
295        options: &SyncOptions,
296        outcome: &mut MergeOutcome,
297    ) -> Result<(), <Self as RemoteSyncHandler>::Error> {
298        let diff = self
299            .hard_conflict_diff::<DeviceEvent>(
300                "hard_conflict::force_merge::device",
301                EventLogType::Device,
302                options,
303            )
304            .await?;
305
306        let account = self.account();
307        let mut account = account.lock().await;
308        Ok(account.force_merge_device(diff, outcome).await?)
309    }
310
311    #[doc(hidden)]
312    #[cfg(feature = "files")]
313    async fn files_hard_conflict(
314        &self,
315        options: &SyncOptions,
316        outcome: &mut MergeOutcome,
317    ) -> Result<(), <Self as RemoteSyncHandler>::Error> {
318        let diff = self
319            .hard_conflict_diff::<FileEvent>(
320                "hard_conflict::force_merge::files",
321                EventLogType::Files,
322                options,
323            )
324            .await?;
325
326        let account = self.account();
327        let mut account = account.lock().await;
328        Ok(account.force_merge_files(diff, outcome).await?)
329    }
330
331    /// Try to auto merge on conflict.
332    ///
333    /// Searches the remote event log for a common ancestor and
334    /// attempts to merge commits from the common ancestor with the
335    /// diff that would have been applied.
336    ///
337    /// Once the changes have been merged a force update of the
338    /// server is necessary.
339    #[instrument(skip_all)]
340    async fn auto_merge(
341        &self,
342        options: &SyncOptions,
343        conflict: MaybeConflict,
344        local: SyncStatus,
345        _remote: SyncStatus,
346    ) -> Result<MergeOutcome, <Self as RemoteSyncHandler>::Error> {
347        let mut force_merge_outcome = MergeOutcome::default();
348        let mut has_hard_conflict = false;
349
350        if conflict.identity {
351            let hard_conflict = self
352                .auto_merge_identity(options, &mut force_merge_outcome)
353                .await?;
354
355            if hard_conflict {
356                tracing::warn!("hard_conflict::identity");
357            }
358
359            has_hard_conflict = has_hard_conflict || hard_conflict;
360        }
361
362        if conflict.account {
363            let hard_conflict = self
364                .auto_merge_account(options, &mut force_merge_outcome)
365                .await?;
366
367            if hard_conflict {
368                tracing::warn!("hard_conflict::account");
369            }
370
371            has_hard_conflict = has_hard_conflict || hard_conflict;
372        }
373
374        if conflict.device {
375            let hard_conflict = self
376                .auto_merge_device(options, &mut force_merge_outcome)
377                .await?;
378
379            if hard_conflict {
380                tracing::warn!("hard_conflict::device");
381            }
382
383            has_hard_conflict = has_hard_conflict || hard_conflict;
384        }
385
386        #[cfg(feature = "files")]
387        if conflict.files {
388            let hard_conflict = self
389                .auto_merge_files(options, &mut force_merge_outcome)
390                .await?;
391
392            if hard_conflict {
393                tracing::warn!("hard_conflict::files");
394            }
395
396            has_hard_conflict = has_hard_conflict || hard_conflict;
397        }
398
399        for (folder_id, _) in &conflict.folders {
400            let hard_conflict = self
401                .auto_merge_folder(
402                    options,
403                    &local,
404                    folder_id,
405                    &mut force_merge_outcome,
406                )
407                .await?;
408
409            if hard_conflict {
410                tracing::warn!(
411                    folder_id = %folder_id,
412                    "hard_conflict::folder",
413                );
414            }
415
416            has_hard_conflict = has_hard_conflict || hard_conflict;
417        }
418
419        /*
420        if has_hard_conflict {
421            tracing::warn!(
422                outcome = ?force_merge_outcome,
423                "hard_conflict::sign_out");
424            let account = self.account();
425            let mut account = account.lock().await;
426            account.sign_out().await?;
427        }
428        */
429
430        // FIXME: put conflict info in merge outcome
431
432        Ok(force_merge_outcome)
433    }
434
435    /// Auto merge a folder.
436    async fn auto_merge_folder(
437        &self,
438        options: &SyncOptions,
439        _local_status: &SyncStatus,
440        folder_id: &VaultId,
441        outcome: &mut MergeOutcome,
442    ) -> Result<bool, <Self as RemoteSyncHandler>::Error> {
443        tracing::debug!(folder_id = %folder_id, "auto_merge::folder");
444
445        let req = ScanRequest {
446            log_type: EventLogType::Folder(*folder_id),
447            offset: 0,
448            limit: PROOF_SCAN_LIMIT,
449        };
450        match self.scan_proofs(req).await {
451            Ok(Some((ancestor_commit, proof))) => {
452                self.try_merge_from_ancestor::<WriteEvent>(
453                    EventLogType::Folder(*folder_id),
454                    ancestor_commit,
455                    proof,
456                )
457                .await?;
458                Ok(false)
459            }
460            Err(e) => {
461                if e.is_hard_conflict() {
462                    self.folder_hard_conflict(folder_id, options, outcome)
463                        .await?;
464                    Ok(true)
465                } else {
466                    Err(e)
467                }
468            }
469            _ => Err(ConflictError::Hard.into()),
470        }
471    }
472
473    #[doc(hidden)]
474    async fn folder_hard_conflict(
475        &self,
476        folder_id: &VaultId,
477        options: &SyncOptions,
478        outcome: &mut MergeOutcome,
479    ) -> Result<(), <Self as RemoteSyncHandler>::Error> {
480        match &options.hard_conflict_resolver {
481            HardConflictResolver::AutomaticFetch => {
482                let request = DiffRequest {
483                    log_type: EventLogType::Folder(*folder_id),
484                    from_hash: None,
485                };
486                let response = self.client().diff(request).await?;
487                let patch = Patch::<WriteEvent>::new(response.patch);
488                let diff = FolderDiff {
489                    patch,
490                    checkpoint: response.checkpoint,
491                    last_commit: None,
492                };
493                let account = self.account();
494                let mut account = account.lock().await;
495                Ok(account
496                    .force_merge_folder(folder_id, diff, outcome)
497                    .await?)
498            }
499        }
500    }
501
502    /// Try to merge from a shared ancestor commit.
503    #[doc(hidden)]
504    async fn try_merge_from_ancestor<T>(
505        &self,
506        log_type: EventLogType,
507        commit: CommitHash,
508        proof: CommitProof,
509    ) -> Result<(), <Self as RemoteSyncHandler>::Error>
510    where
511        T: Default + Send + Sync,
512    {
513        tracing::debug!(
514            commit = %commit,
515            "auto_merge::try_merge_from_ancestor",
516        );
517
518        // Get the patch from local
519        let local_patch = {
520            let account = self.account();
521            let account = account.lock().await;
522            match &log_type {
523                EventLogType::Identity => {
524                    let log = account.identity_log().await?;
525                    let event_log = log.read().await;
526                    event_log.diff_records(Some(&commit)).await?
527                }
528                EventLogType::Account => {
529                    let log = account.account_log().await?;
530                    let event_log = log.read().await;
531                    event_log.diff_records(Some(&commit)).await?
532                }
533                EventLogType::Device => {
534                    let log = account.device_log().await?;
535                    let event_log = log.read().await;
536                    event_log.diff_records(Some(&commit)).await?
537                }
538                #[cfg(feature = "files")]
539                EventLogType::Files => {
540                    let log = account.file_log().await?;
541                    let event_log = log.read().await;
542                    event_log.diff_records(Some(&commit)).await?
543                }
544                EventLogType::Folder(id) => {
545                    let log = account.folder_log(id).await?;
546                    let event_log = log.read().await;
547                    event_log.diff_records(Some(&commit)).await?
548                }
549            }
550        };
551
552        // Fetch the patch of remote events
553        let request = DiffRequest {
554            log_type,
555            from_hash: Some(commit),
556        };
557        let remote_patch = self.client().diff(request).await?.patch;
558
559        let result = self.merge_patches(local_patch, remote_patch).await?;
560
561        match result {
562            AutoMergeStatus::RewindLocal(events) => {
563                let local_patch = self
564                    .rewind_local(&log_type, commit, proof, events)
565                    .await?;
566
567                let success = matches!(local_patch, CheckedPatch::Success(_));
568
569                if success {
570                    tracing::info!("auto_merge::rewind_local::success");
571                }
572            }
573            AutoMergeStatus::PushRemote(events) => {
574                let (remote_patch, local_patch) = self
575                    .push_remote::<T>(&log_type, commit, proof, events)
576                    .await?;
577
578                let success =
579                    matches!(remote_patch, CheckedPatch::Success(_))
580                        && matches!(
581                            local_patch,
582                            Some(CheckedPatch::Success(_))
583                        );
584
585                if success {
586                    tracing::info!("auto_merge::push_remote::success");
587                }
588            }
589        }
590
591        Ok(())
592    }
593
594    #[doc(hidden)]
595    async fn merge_patches(
596        &self,
597        mut local: Vec<EventRecord>,
598        remote: Vec<EventRecord>,
599    ) -> Result<AutoMergeStatus, <Self as RemoteSyncHandler>::Error> {
600        tracing::info!(
601            local_len = local.len(),
602            remote_len = remote.len(),
603            "auto_merge::merge_patches",
604        );
605
606        let local_commits =
607            local.iter().map(|r| r.commit()).collect::<HashSet<_>>();
608        let remote_commits =
609            remote.iter().map(|r| r.commit()).collect::<HashSet<_>>();
610
611        // If all the local commits exist in the remote
612        // then apply the remote events to the local event
613        // log.
614        //
615        // If we didn't do this then automerge could go on
616        // ad infinitum.
617        if local_commits.is_subset(&remote_commits) {
618            return Ok(AutoMergeStatus::RewindLocal(remote));
619        }
620
621        // Combine the event records
622        local.extend(remote.into_iter());
623
624        // Sort by time so the more recent changes will win (LWW)
625        local.sort_by(|a, b| a.time().cmp(b.time()));
626
627        Ok(AutoMergeStatus::PushRemote(local))
628    }
629
630    /// Rewind a local event log and apply the events.
631    #[doc(hidden)]
632    async fn rewind_local(
633        &self,
634        log_type: &EventLogType,
635        commit: CommitHash,
636        proof: CommitProof,
637        events: Vec<EventRecord>,
638    ) -> Result<CheckedPatch, <Self as RemoteSyncHandler>::Error> {
639        tracing::debug!(
640          log_type = ?log_type,
641          commit = %commit,
642          length = %events.len(),
643          "auto_merge::rewind_local",
644        );
645
646        // Rewind the event log to the target commit
647        let records = self.rewind_event_log(log_type, &commit).await?;
648
649        let mut outcome = MergeOutcome::default();
650
651        // Merge the events after rewinding
652        let checked_patch = {
653            let account = self.account();
654            let mut account = account.lock().await;
655            match &log_type {
656                EventLogType::Identity => {
657                    let patch = Patch::<WriteEvent>::new(events);
658                    let diff = FolderDiff {
659                        last_commit: Some(commit),
660                        checkpoint: proof,
661                        patch,
662                    };
663                    account.merge_identity(diff, &mut outcome).await?
664                }
665                EventLogType::Account => {
666                    let patch = Patch::<AccountEvent>::new(events);
667                    let diff = AccountDiff {
668                        last_commit: Some(commit),
669                        checkpoint: proof,
670                        patch,
671                    };
672                    account.merge_account(diff, &mut outcome).await?.0
673                }
674                EventLogType::Device => {
675                    let patch = Patch::<DeviceEvent>::new(events);
676                    let diff = DeviceDiff {
677                        last_commit: Some(commit),
678                        checkpoint: proof,
679                        patch,
680                    };
681                    account.merge_device(diff, &mut outcome).await?
682                }
683                #[cfg(feature = "files")]
684                EventLogType::Files => {
685                    let patch = Patch::<FileEvent>::new(events);
686                    let diff = FileDiff {
687                        last_commit: Some(commit),
688                        checkpoint: proof,
689                        patch,
690                    };
691                    account.merge_files(diff, &mut outcome).await?
692                }
693                EventLogType::Folder(id) => {
694                    let patch = Patch::<WriteEvent>::new(events);
695                    let diff = FolderDiff {
696                        last_commit: Some(commit),
697                        checkpoint: proof,
698                        patch,
699                    };
700
701                    account.merge_folder(id, diff, &mut outcome).await?.0
702                }
703            }
704        };
705
706        if let CheckedPatch::Conflict { head, .. } = &checked_patch {
707            tracing::warn!(
708                head = ?head,
709                num_records = ?records.len(),
710                "auto_merge::rollback_rewind");
711
712            self.rollback_rewind(log_type, records).await?;
713        }
714
715        Ok(checked_patch)
716    }
717
718    #[doc(hidden)]
719    async fn rollback_rewind(
720        &self,
721        log_type: &EventLogType,
722        records: Vec<EventRecord>,
723    ) -> Result<(), <Self as RemoteSyncHandler>::Error> {
724        let account = self.account();
725        let account = account.lock().await;
726        match log_type {
727            EventLogType::Identity => {
728                let log = account.identity_log().await?;
729                let mut event_log = log.write().await;
730                event_log.apply_records(records).await?;
731            }
732            EventLogType::Account => {
733                let log = account.account_log().await?;
734                let mut event_log = log.write().await;
735                event_log.apply_records(records).await?;
736            }
737            EventLogType::Device => {
738                let log = account.device_log().await?;
739                let mut event_log = log.write().await;
740                event_log.apply_records(records).await?;
741            }
742            #[cfg(feature = "files")]
743            EventLogType::Files => {
744                let log = account.file_log().await?;
745                let mut event_log = log.write().await;
746                event_log.apply_records(records).await?;
747            }
748            EventLogType::Folder(id) => {
749                let log = account.folder_log(id).await?;
750                let mut event_log = log.write().await;
751                event_log.apply_records(records).await?;
752            }
753        }
754
755        Ok(())
756    }
757
758    /// Push the events to a remote and rewind local.
759    #[doc(hidden)]
760    async fn push_remote<T>(
761        &self,
762        log_type: &EventLogType,
763        commit: CommitHash,
764        proof: CommitProof,
765        events: Vec<EventRecord>,
766    ) -> Result<
767        (CheckedPatch, Option<CheckedPatch>),
768        <Self as RemoteSyncHandler>::Error,
769    >
770    where
771        T: Default + Send + Sync,
772    {
773        tracing::debug!(
774          log_type = ?log_type,
775          commit = %commit,
776          length = %events.len(),
777          "auto_merge::push_remote",
778        );
779
780        let req = PatchRequest {
781            log_type: *log_type,
782            commit: Some(commit),
783            proof: proof.clone(),
784            patch: events.clone(),
785        };
786
787        let remote_patch = self.client().patch(req).await?.checked_patch;
788        let local_patch = match &remote_patch {
789            CheckedPatch::Success(_) => {
790                let local_patch = self
791                    .rewind_local(log_type, commit, proof, events)
792                    .await?;
793                Some(local_patch)
794            }
795            CheckedPatch::Conflict { head, contains } => {
796                tracing::error!(
797                  head = ?head,
798                  contains = ?contains,
799                  "auto_merge::patch::conflict",
800                );
801                None
802            }
803        };
804
805        Ok((remote_patch, local_patch))
806    }
807
808    /// Rewind an event log to a specific commit.
809    #[doc(hidden)]
810    async fn rewind_event_log(
811        &self,
812        log_type: &EventLogType,
813        commit: &CommitHash,
814    ) -> Result<Vec<EventRecord>, <Self as RemoteSyncHandler>::Error> {
815        tracing::debug!(
816          log_type = ?log_type,
817          commit = %commit,
818          "automerge::rewind_event_log",
819        );
820        // Rewind the event log
821        let account = self.account();
822        let account = account.lock().await;
823        Ok(match &log_type {
824            EventLogType::Identity => {
825                let log = account.identity_log().await?;
826                let mut event_log = log.write().await;
827                event_log.rewind(commit).await?
828            }
829            EventLogType::Account => {
830                let log = account.account_log().await?;
831                let mut event_log = log.write().await;
832                event_log.rewind(commit).await?
833            }
834            EventLogType::Device => {
835                let log = account.device_log().await?;
836                let mut event_log = log.write().await;
837                event_log.rewind(commit).await?
838            }
839            #[cfg(feature = "files")]
840            EventLogType::Files => {
841                let log = account.file_log().await?;
842                let mut event_log = log.write().await;
843                event_log.rewind(commit).await?
844            }
845            EventLogType::Folder(id) => {
846                let log = account.folder_log(id).await?;
847                let mut event_log = log.write().await;
848                event_log.rewind(commit).await?
849            }
850        })
851    }
852
853    /// Scan the remote for proofs that match this client.
854    #[doc(hidden)]
855    async fn scan_proofs(
856        &self,
857        request: ScanRequest,
858    ) -> Result<
859        Option<(CommitHash, CommitProof)>,
860        <Self as RemoteSyncHandler>::Error,
861    > {
862        tracing::debug!(request = ?request, "auto_merge::scan_proofs");
863
864        let leaves = {
865            let account = self.account();
866            let account = account.lock().await;
867            match &request.log_type {
868                EventLogType::Identity => {
869                    let log = account.identity_log().await?;
870                    let event_log = log.read().await;
871                    event_log.tree().leaves().unwrap_or_default()
872                }
873                EventLogType::Account => {
874                    let log = account.account_log().await?;
875                    let event_log = log.read().await;
876                    event_log.tree().leaves().unwrap_or_default()
877                }
878                EventLogType::Device => {
879                    let log = account.device_log().await?;
880                    let event_log = log.read().await;
881                    event_log.tree().leaves().unwrap_or_default()
882                }
883                #[cfg(feature = "files")]
884                EventLogType::Files => {
885                    let log = account.file_log().await?;
886                    let event_log = log.read().await;
887                    event_log.tree().leaves().unwrap_or_default()
888                }
889                EventLogType::Folder(id) => {
890                    let log = account.folder_log(id).await?;
891                    let event_log = log.read().await;
892                    event_log.tree().leaves().unwrap_or_default()
893                }
894            }
895        };
896
897        let mut req = request.clone();
898        loop {
899            match self.iterate_scan_proofs(req.clone(), &leaves).await? {
900                ScanState::Result(value) => return Ok(Some(value)),
901                ScanState::Continue(scan) => req = scan,
902                ScanState::Exhausted => return Ok(None),
903            }
904        }
905    }
906
907    /// Scan the remote for proofs that match this client.
908    #[doc(hidden)]
909    async fn iterate_scan_proofs(
910        &self,
911        request: ScanRequest,
912        leaves: &[[u8; 32]],
913    ) -> Result<ScanState, <Self as RemoteSyncHandler>::Error> {
914        tracing::debug!(
915          request = ?request,
916          "auto_merge::iterate_scan_proofs");
917
918        let response = self.client().scan(request.clone()).await?;
919
920        // If the server gave us a first proof and we don't
921        // have it in our event log then there is no point scanning
922        // as we know the trees have diverged
923        if let Some(first_proof) = &response.first_proof {
924            let (verified, _) = first_proof.verify_leaves(leaves);
925            if !verified {
926                return Err(ConflictError::Hard.into());
927            }
928        }
929
930        if !response.proofs.is_empty() {
931            // Proofs are returned in the event log order
932            // but we always want to scan from the end of
933            // the event log so reverse the iteration
934            for proof in response.proofs.iter().rev() {
935                // Find the last matching commit from the indices
936                // to prove
937                if let Some(commit_hash) = self.compare_proof(proof, leaves) {
938                    // Compute the root hash and proof for the
939                    // matched index
940                    let index = proof.indices.last().copied().unwrap();
941                    let new_leaves = &leaves[0..=index];
942                    let mut new_leaves = new_leaves.to_vec();
943                    let mut new_tree = CommitTree::new();
944                    new_tree.append(&mut new_leaves);
945                    new_tree.commit();
946
947                    let checkpoint_proof = new_tree.head()?;
948                    return Ok(ScanState::Result((
949                        commit_hash,
950                        checkpoint_proof,
951                    )));
952                }
953            }
954
955            // Try to scan more proofs
956            let mut req = request;
957            req.offset = response.offset;
958
959            Ok(ScanState::Continue(req))
960        } else {
961            Ok(ScanState::Exhausted)
962        }
963    }
964
965    /// Determine if a local event log contains a proof
966    /// received from the server.
967    #[doc(hidden)]
968    fn compare_proof(
969        &self,
970        proof: &CommitProof,
971        leaves: &[[u8; 32]],
972    ) -> Option<CommitHash> {
973        let (verified, leaves) = proof.verify_leaves(leaves);
974
975        tracing::trace!(
976            proof = ?proof,
977            verified = ?verified,
978            "auto_merge::compare_proof",
979        );
980
981        if verified {
982            leaves.last().copied().map(CommitHash)
983        } else {
984            None
985        }
986    }
987}