sos_remote_sync/
auto_merge.rs

1//! Implements auto merge logic for a remote.
2use async_trait::async_trait;
3use sos_account::Account;
4use sos_core::{
5    commit::{CommitHash, CommitProof, CommitTree},
6    events::{
7        patch::{
8            AccountDiff, CheckedPatch, DeviceDiff, Diff, FolderDiff, Patch,
9        },
10        AccountEvent, DeviceEvent, EventLog, EventLogType, EventRecord,
11        WriteEvent,
12    },
13    VaultId,
14};
15use sos_protocol::{
16    AsConflict, ConflictError, DiffRequest, HardConflictResolver,
17    PatchRequest, ScanRequest, SyncClient, SyncOptions,
18};
19use sos_sync::{
20    ForceMerge, MaybeConflict, Merge, MergeOutcome, StorageEventLogs,
21    SyncDirection, SyncStatus,
22};
23use std::collections::HashSet;
24use tracing::instrument;
25
/// Value used for the `limit` field of each initial `ScanRequest`
/// built in this module when scanning a remote for commit proofs.
const PROOF_SCAN_LIMIT: u16 = 32;
27
28#[cfg(feature = "files")]
29use sos_core::events::{patch::FileDiff, FileEvent};
30
31use super::RemoteSyncHandler;
32
33/// State used while scanning commit proofs on a remote data source.
34#[doc(hidden)]
35pub enum ScanState {
36    Result((CommitHash, CommitProof)),
37    Continue(ScanRequest),
38    Exhausted,
39}
40
41/// Whether to apply an auto merge to local or remote.
42#[doc(hidden)]
43pub enum AutoMergeStatus {
44    /// Apply the events to the local event log.
45    RewindLocal(Vec<EventRecord>),
46    /// Push events to the remote.
47    PushRemote(Vec<EventRecord>),
48}
49
50/// Support for auto merge on sync.
51#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
52#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
53pub trait AutoMerge: RemoteSyncHandler {
54    /// Execute the sync operation.
55    ///
56    /// If the account does not exist it is created
57    /// on the remote, otherwise the account is synced.
58    #[doc(hidden)]
59    async fn execute_sync(
60        &self,
61        options: &SyncOptions,
62    ) -> Result<Option<MergeOutcome>, Self::Error> {
63        match self.direction() {
64            SyncDirection::Push => {
65                let exists = self.client().account_exists().await?;
66                if exists {
67                    self.perform_sync(options).await
68                } else {
69                    self.create_account().await?;
70                    Ok(None)
71                }
72            }
73            SyncDirection::Pull => {
74                let exists = {
75                    let account = self.account();
76                    let account = account.lock().await;
77                    account.paths().is_usable().await?
78                };
79                if exists {
80                    self.perform_sync(options).await
81                } else {
82                    self.create_account().await?;
83                    Ok(None)
84                }
85            }
86        }
87    }
88
89    #[doc(hidden)]
90    async fn perform_sync(
91        &self,
92        options: &SyncOptions,
93    ) -> Result<Option<MergeOutcome>, Self::Error> {
94        let sync_status = self.client().sync_status().await?;
95        match self.sync_account(sync_status).await {
96            Ok(outcome) => Ok(Some(outcome)),
97            Err(e) => {
98                if e.is_conflict() {
99                    let conflict = e.take_conflict().unwrap();
100                    match conflict {
101                        ConflictError::Soft {
102                            conflict,
103                            local,
104                            remote,
105                        } => {
106                            let outcome = self
107                                .auto_merge(options, conflict, local, remote)
108                                .await?;
109                            Ok(Some(outcome))
110                        }
111                        _ => Err(conflict.into()),
112                    }
113                } else {
114                    Err(e)
115                }
116            }
117        }
118    }
119
120    #[doc(hidden)]
121    async fn auto_merge_scan<T>(
122        &self,
123        log_id: &'static str,
124        log_type: EventLogType,
125    ) -> Result<bool, <Self as RemoteSyncHandler>::Error>
126    where
127        T: Default + Send + Sync,
128    {
129        tracing::debug!(log_id);
130
131        let req = ScanRequest {
132            log_type,
133            offset: 0,
134            limit: PROOF_SCAN_LIMIT,
135        };
136        match self.scan_proofs(req).await {
137            Ok(Some((ancestor_commit, proof))) => {
138                self.try_merge_from_ancestor::<T>(
139                    EventLogType::Identity,
140                    ancestor_commit,
141                    proof,
142                )
143                .await?;
144                Ok(false)
145            }
146            Err(e) => {
147                if e.is_hard_conflict() {
148                    Ok(true)
149                } else {
150                    Err(e)
151                }
152            }
153            _ => Err(ConflictError::Hard.into()),
154        }
155    }
156
157    /// Auto merge identity folders.
158    async fn auto_merge_identity(
159        &self,
160        options: &SyncOptions,
161        outcome: &mut MergeOutcome,
162    ) -> Result<bool, <Self as RemoteSyncHandler>::Error> {
163        let handle_conflict = self
164            .auto_merge_scan::<WriteEvent>(
165                "auto_merge::identity",
166                EventLogType::Identity,
167            )
168            .await?;
169        if handle_conflict {
170            self.identity_hard_conflict(options, outcome).await?;
171        }
172        Ok(handle_conflict)
173    }
174
175    /// Auto merge account events.
176    async fn auto_merge_account(
177        &self,
178        options: &SyncOptions,
179        outcome: &mut MergeOutcome,
180    ) -> Result<bool, <Self as RemoteSyncHandler>::Error> {
181        let handle_conflict = self
182            .auto_merge_scan::<AccountEvent>(
183                "auto_merge::account",
184                EventLogType::Account,
185            )
186            .await?;
187        if handle_conflict {
188            self.account_hard_conflict(options, outcome).await?;
189        }
190        Ok(handle_conflict)
191    }
192
193    /// Auto merge device events.
194    async fn auto_merge_device(
195        &self,
196        options: &SyncOptions,
197        outcome: &mut MergeOutcome,
198    ) -> Result<bool, <Self as RemoteSyncHandler>::Error> {
199        let handle_conflict = self
200            .auto_merge_scan::<DeviceEvent>(
201                "auto_merge::device",
202                EventLogType::Device,
203            )
204            .await?;
205        if handle_conflict {
206            self.device_hard_conflict(options, outcome).await?;
207        }
208        Ok(handle_conflict)
209    }
210
211    /// Auto merge file events.
212    #[cfg(feature = "files")]
213    async fn auto_merge_files(
214        &self,
215        options: &SyncOptions,
216        outcome: &mut MergeOutcome,
217    ) -> Result<bool, <Self as RemoteSyncHandler>::Error> {
218        let handle_conflict = self
219            .auto_merge_scan::<FileEvent>(
220                "auto_merge::files",
221                EventLogType::Files,
222            )
223            .await?;
224        if handle_conflict {
225            self.files_hard_conflict(options, outcome).await?;
226        }
227        Ok(handle_conflict)
228    }
229
230    #[doc(hidden)]
231    async fn hard_conflict_diff<EventType>(
232        &self,
233        log_id: &'static str,
234        log_type: EventLogType,
235        options: &SyncOptions,
236    ) -> Result<Diff<EventType>, <Self as RemoteSyncHandler>::Error> {
237        match &options.hard_conflict_resolver {
238            HardConflictResolver::AutomaticFetch => {
239                tracing::debug!(log_id);
240
241                let request = DiffRequest {
242                    log_type,
243                    from_hash: None,
244                };
245                let response = self.client().diff(request).await?;
246                let patch = Patch::<EventType>::new(response.patch);
247                let diff =
248                    Diff::<EventType>::new(patch, response.checkpoint, None);
249                Ok(diff)
250            }
251        }
252    }
253
254    #[doc(hidden)]
255    async fn identity_hard_conflict(
256        &self,
257        options: &SyncOptions,
258        outcome: &mut MergeOutcome,
259    ) -> Result<(), <Self as RemoteSyncHandler>::Error> {
260        let diff = self
261            .hard_conflict_diff::<WriteEvent>(
262                "hard_conflict::force_merge::identity",
263                EventLogType::Identity,
264                options,
265            )
266            .await?;
267
268        let account = self.account();
269        let mut account = account.lock().await;
270        Ok(account.force_merge_identity(diff, outcome).await?)
271    }
272
273    #[doc(hidden)]
274    async fn account_hard_conflict(
275        &self,
276        options: &SyncOptions,
277        outcome: &mut MergeOutcome,
278    ) -> Result<(), <Self as RemoteSyncHandler>::Error> {
279        let diff = self
280            .hard_conflict_diff::<AccountEvent>(
281                "hard_conflict::force_merge::account",
282                EventLogType::Account,
283                options,
284            )
285            .await?;
286
287        let account = self.account();
288        let mut account = account.lock().await;
289        Ok(account.force_merge_account(diff, outcome).await?)
290    }
291
292    #[doc(hidden)]
293    async fn device_hard_conflict(
294        &self,
295        options: &SyncOptions,
296        outcome: &mut MergeOutcome,
297    ) -> Result<(), <Self as RemoteSyncHandler>::Error> {
298        let diff = self
299            .hard_conflict_diff::<DeviceEvent>(
300                "hard_conflict::force_merge::device",
301                EventLogType::Device,
302                options,
303            )
304            .await?;
305
306        let account = self.account();
307        let mut account = account.lock().await;
308        Ok(account.force_merge_device(diff, outcome).await?)
309    }
310
311    #[doc(hidden)]
312    #[cfg(feature = "files")]
313    async fn files_hard_conflict(
314        &self,
315        options: &SyncOptions,
316        outcome: &mut MergeOutcome,
317    ) -> Result<(), <Self as RemoteSyncHandler>::Error> {
318        let diff = self
319            .hard_conflict_diff::<FileEvent>(
320                "hard_conflict::force_merge::files",
321                EventLogType::Files,
322                options,
323            )
324            .await?;
325
326        let account = self.account();
327        let mut account = account.lock().await;
328        Ok(account.force_merge_files(diff, outcome).await?)
329    }
330
331    /// Try to auto merge on conflict.
332    ///
333    /// Searches the remote event log for a common ancestor and
334    /// attempts to merge commits from the common ancestor with the
335    /// diff that would have been applied.
336    ///
337    /// Once the changes have been merged a force update of the
338    /// server is necessary.
339    #[instrument(skip_all)]
340    async fn auto_merge(
341        &self,
342        options: &SyncOptions,
343        conflict: MaybeConflict,
344        local: SyncStatus,
345        _remote: SyncStatus,
346    ) -> Result<MergeOutcome, <Self as RemoteSyncHandler>::Error> {
347        let mut force_merge_outcome = MergeOutcome::default();
348        let mut has_hard_conflict = false;
349
350        if conflict.identity {
351            let hard_conflict = self
352                .auto_merge_identity(options, &mut force_merge_outcome)
353                .await?;
354
355            if hard_conflict {
356                tracing::warn!("hard_conflict::identity");
357            }
358
359            has_hard_conflict = has_hard_conflict || hard_conflict;
360        }
361
362        if conflict.account {
363            let hard_conflict = self
364                .auto_merge_account(options, &mut force_merge_outcome)
365                .await?;
366
367            if hard_conflict {
368                tracing::warn!("hard_conflict::account");
369            }
370
371            has_hard_conflict = has_hard_conflict || hard_conflict;
372        }
373
374        if conflict.device {
375            let hard_conflict = self
376                .auto_merge_device(options, &mut force_merge_outcome)
377                .await?;
378
379            if hard_conflict {
380                tracing::warn!("hard_conflict::device");
381            }
382
383            has_hard_conflict = has_hard_conflict || hard_conflict;
384        }
385
386        #[cfg(feature = "files")]
387        if conflict.files {
388            let hard_conflict = self
389                .auto_merge_files(options, &mut force_merge_outcome)
390                .await?;
391
392            if hard_conflict {
393                tracing::warn!("hard_conflict::files");
394            }
395
396            has_hard_conflict = has_hard_conflict || hard_conflict;
397        }
398
399        for (folder_id, _) in &conflict.folders {
400            let hard_conflict = self
401                .auto_merge_folder(
402                    options,
403                    &local,
404                    folder_id,
405                    &mut force_merge_outcome,
406                )
407                .await?;
408
409            if hard_conflict {
410                tracing::warn!(
411                    folder_id = %folder_id,
412                    "hard_conflict::folder",
413                );
414            }
415
416            has_hard_conflict = has_hard_conflict || hard_conflict;
417        }
418
419        /*
420        if has_hard_conflict {
421            tracing::warn!(
422                outcome = ?force_merge_outcome,
423                "hard_conflict::sign_out");
424            let account = self.account();
425            let mut account = account.lock().await;
426            account.sign_out().await?;
427        }
428        */
429
430        // FIXME: put conflict info in merge outcome
431
432        Ok(force_merge_outcome)
433    }
434
435    /// Auto merge a folder.
436    async fn auto_merge_folder(
437        &self,
438        options: &SyncOptions,
439        _local_status: &SyncStatus,
440        folder_id: &VaultId,
441        outcome: &mut MergeOutcome,
442    ) -> Result<bool, <Self as RemoteSyncHandler>::Error> {
443        tracing::debug!(folder_id = %folder_id, "auto_merge::folder");
444
445        let req = ScanRequest {
446            log_type: EventLogType::Folder(*folder_id),
447            offset: 0,
448            limit: PROOF_SCAN_LIMIT,
449        };
450        match self.scan_proofs(req).await {
451            Ok(Some((ancestor_commit, proof))) => {
452                self.try_merge_from_ancestor::<WriteEvent>(
453                    EventLogType::Folder(*folder_id),
454                    ancestor_commit,
455                    proof,
456                )
457                .await?;
458                Ok(false)
459            }
460            Err(e) => {
461                if e.is_hard_conflict() {
462                    self.folder_hard_conflict(folder_id, options, outcome)
463                        .await?;
464                    Ok(true)
465                } else {
466                    Err(e)
467                }
468            }
469            _ => Err(ConflictError::Hard.into()),
470        }
471    }
472
473    #[doc(hidden)]
474    async fn folder_hard_conflict(
475        &self,
476        folder_id: &VaultId,
477        options: &SyncOptions,
478        outcome: &mut MergeOutcome,
479    ) -> Result<(), <Self as RemoteSyncHandler>::Error> {
480        match &options.hard_conflict_resolver {
481            HardConflictResolver::AutomaticFetch => {
482                let request = DiffRequest {
483                    log_type: EventLogType::Folder(*folder_id),
484                    from_hash: None,
485                };
486                let response = self.client().diff(request).await?;
487                let patch = Patch::<WriteEvent>::new(response.patch);
488                let diff = FolderDiff {
489                    patch,
490                    checkpoint: response.checkpoint,
491                    last_commit: None,
492                };
493                let account = self.account();
494                let mut account = account.lock().await;
495                Ok(account
496                    .force_merge_folder(folder_id, diff, outcome)
497                    .await?)
498            }
499        }
500    }
501
502    /// Try to merge from a shared ancestor commit.
503    #[doc(hidden)]
504    async fn try_merge_from_ancestor<T>(
505        &self,
506        log_type: EventLogType,
507        commit: CommitHash,
508        proof: CommitProof,
509    ) -> Result<(), <Self as RemoteSyncHandler>::Error>
510    where
511        T: Default + Send + Sync,
512    {
513        tracing::debug!(commit = %commit, "auto_merge::try_merge_from_ancestor");
514
515        // Get the patch from local
516        let local_patch = {
517            let account = self.account();
518            let account = account.lock().await;
519            match &log_type {
520                EventLogType::Identity => {
521                    let log = account.identity_log().await?;
522                    let event_log = log.read().await;
523                    event_log.diff_records(Some(&commit)).await?
524                }
525                EventLogType::Account => {
526                    let log = account.account_log().await?;
527                    let event_log = log.read().await;
528                    event_log.diff_records(Some(&commit)).await?
529                }
530                EventLogType::Device => {
531                    let log = account.device_log().await?;
532                    let event_log = log.read().await;
533                    event_log.diff_records(Some(&commit)).await?
534                }
535                #[cfg(feature = "files")]
536                EventLogType::Files => {
537                    let log = account.file_log().await?;
538                    let event_log = log.read().await;
539                    event_log.diff_records(Some(&commit)).await?
540                }
541                EventLogType::Folder(id) => {
542                    let log = account.folder_log(id).await?;
543                    let event_log = log.read().await;
544                    event_log.diff_records(Some(&commit)).await?
545                }
546            }
547        };
548
549        // Fetch the patch of remote events
550        let request = DiffRequest {
551            log_type,
552            from_hash: Some(commit),
553        };
554        let remote_patch = self.client().diff(request).await?.patch;
555
556        let result = self.merge_patches(local_patch, remote_patch).await?;
557
558        match result {
559            AutoMergeStatus::RewindLocal(events) => {
560                let local_patch = self
561                    .rewind_local(&log_type, commit, proof, events)
562                    .await?;
563
564                let success = matches!(local_patch, CheckedPatch::Success(_));
565
566                if success {
567                    tracing::info!("auto_merge::rewind_local::success");
568                }
569            }
570            AutoMergeStatus::PushRemote(events) => {
571                let (remote_patch, local_patch) = self
572                    .push_remote::<T>(&log_type, commit, proof, events)
573                    .await?;
574
575                let success =
576                    matches!(remote_patch, CheckedPatch::Success(_))
577                        && matches!(
578                            local_patch,
579                            Some(CheckedPatch::Success(_))
580                        );
581
582                if success {
583                    tracing::info!("auto_merge::push_remote::success");
584                }
585            }
586        }
587
588        Ok(())
589    }
590
591    #[doc(hidden)]
592    async fn merge_patches(
593        &self,
594        mut local: Vec<EventRecord>,
595        remote: Vec<EventRecord>,
596    ) -> Result<AutoMergeStatus, <Self as RemoteSyncHandler>::Error> {
597        tracing::info!(
598            local_len = local.len(),
599            remote_len = remote.len(),
600            "auto_merge::merge_patches",
601        );
602
603        let local_commits =
604            local.iter().map(|r| r.commit()).collect::<HashSet<_>>();
605        let remote_commits =
606            remote.iter().map(|r| r.commit()).collect::<HashSet<_>>();
607
608        // If all the local commits exist in the remote
609        // then apply the remote events to the local event
610        // log.
611        //
612        // If we didn't do this then automerge could go on
613        // ad infinitum.
614        if local_commits.is_subset(&remote_commits) {
615            return Ok(AutoMergeStatus::RewindLocal(remote));
616        }
617
618        // Combine the event records
619        local.extend(remote.into_iter());
620
621        // Sort by time so the more recent changes will win (LWW)
622        local.sort_by(|a, b| a.time().cmp(b.time()));
623
624        Ok(AutoMergeStatus::PushRemote(local))
625    }
626
627    /// Rewind a local event log and apply the events.
628    #[doc(hidden)]
629    async fn rewind_local(
630        &self,
631        log_type: &EventLogType,
632        commit: CommitHash,
633        proof: CommitProof,
634        events: Vec<EventRecord>,
635    ) -> Result<CheckedPatch, <Self as RemoteSyncHandler>::Error> {
636        tracing::debug!(
637          log_type = ?log_type,
638          commit = %commit,
639          length = %events.len(),
640          "auto_merge::rewind_local",
641        );
642
643        // Rewind the event log to the target commit
644        let records = self.rewind_event_log(log_type, &commit).await?;
645
646        let mut outcome = MergeOutcome::default();
647
648        // Merge the events after rewinding
649        let checked_patch = {
650            let account = self.account();
651            let mut account = account.lock().await;
652            match &log_type {
653                EventLogType::Identity => {
654                    let patch = Patch::<WriteEvent>::new(events);
655                    let diff = FolderDiff {
656                        last_commit: Some(commit),
657                        checkpoint: proof,
658                        patch,
659                    };
660                    account.merge_identity(diff, &mut outcome).await?
661                }
662                EventLogType::Account => {
663                    let patch = Patch::<AccountEvent>::new(events);
664                    let diff = AccountDiff {
665                        last_commit: Some(commit),
666                        checkpoint: proof,
667                        patch,
668                    };
669                    account.merge_account(diff, &mut outcome).await?.0
670                }
671                EventLogType::Device => {
672                    let patch = Patch::<DeviceEvent>::new(events);
673                    let diff = DeviceDiff {
674                        last_commit: Some(commit),
675                        checkpoint: proof,
676                        patch,
677                    };
678                    account.merge_device(diff, &mut outcome).await?
679                }
680                #[cfg(feature = "files")]
681                EventLogType::Files => {
682                    let patch = Patch::<FileEvent>::new(events);
683                    let diff = FileDiff {
684                        last_commit: Some(commit),
685                        checkpoint: proof,
686                        patch,
687                    };
688                    account.merge_files(diff, &mut outcome).await?
689                }
690                EventLogType::Folder(id) => {
691                    let patch = Patch::<WriteEvent>::new(events);
692                    let diff = FolderDiff {
693                        last_commit: Some(commit),
694                        checkpoint: proof,
695                        patch,
696                    };
697                    account.merge_folder(id, diff, &mut outcome).await?.0
698                }
699            }
700        };
701
702        if let CheckedPatch::Conflict { head, .. } = &checked_patch {
703            tracing::warn!(
704                head = ?head,
705                num_records = ?records.len(),
706                "auto_merge::rollback_rewind");
707
708            self.rollback_rewind(log_type, records).await?;
709        }
710
711        Ok(checked_patch)
712    }
713
714    #[doc(hidden)]
715    async fn rollback_rewind(
716        &self,
717        log_type: &EventLogType,
718        records: Vec<EventRecord>,
719    ) -> Result<(), <Self as RemoteSyncHandler>::Error> {
720        let account = self.account();
721        let account = account.lock().await;
722        match log_type {
723            EventLogType::Identity => {
724                let log = account.identity_log().await?;
725                let mut event_log = log.write().await;
726                event_log.apply_records(records).await?;
727            }
728            EventLogType::Account => {
729                let log = account.account_log().await?;
730                let mut event_log = log.write().await;
731                event_log.apply_records(records).await?;
732            }
733            EventLogType::Device => {
734                let log = account.device_log().await?;
735                let mut event_log = log.write().await;
736                event_log.apply_records(records).await?;
737            }
738            #[cfg(feature = "files")]
739            EventLogType::Files => {
740                let log = account.file_log().await?;
741                let mut event_log = log.write().await;
742                event_log.apply_records(records).await?;
743            }
744            EventLogType::Folder(id) => {
745                let log = account.folder_log(id).await?;
746                let mut event_log = log.write().await;
747                event_log.apply_records(records).await?;
748            }
749        }
750
751        Ok(())
752    }
753
754    /// Push the events to a remote and rewind local.
755    #[doc(hidden)]
756    async fn push_remote<T>(
757        &self,
758        log_type: &EventLogType,
759        commit: CommitHash,
760        proof: CommitProof,
761        events: Vec<EventRecord>,
762    ) -> Result<
763        (CheckedPatch, Option<CheckedPatch>),
764        <Self as RemoteSyncHandler>::Error,
765    >
766    where
767        T: Default + Send + Sync,
768    {
769        tracing::debug!(
770          log_type = ?log_type,
771          commit = %commit,
772          length = %events.len(),
773          "auto_merge::push_remote",
774        );
775
776        let req = PatchRequest {
777            log_type: *log_type,
778            commit: Some(commit),
779            proof: proof.clone(),
780            patch: events.clone(),
781        };
782
783        let remote_patch = self.client().patch(req).await?.checked_patch;
784        let local_patch = match &remote_patch {
785            CheckedPatch::Success(_) => {
786                let local_patch = self
787                    .rewind_local(log_type, commit, proof, events)
788                    .await?;
789                Some(local_patch)
790            }
791            CheckedPatch::Conflict { head, contains } => {
792                tracing::error!(
793                  head = ?head,
794                  contains = ?contains,
795                  "auto_merge::patch::conflict",
796                );
797                None
798            }
799        };
800
801        Ok((remote_patch, local_patch))
802    }
803
804    /// Rewind an event log to a specific commit.
805    #[doc(hidden)]
806    async fn rewind_event_log(
807        &self,
808        log_type: &EventLogType,
809        commit: &CommitHash,
810    ) -> Result<Vec<EventRecord>, <Self as RemoteSyncHandler>::Error> {
811        tracing::debug!(
812          log_type = ?log_type,
813          commit = %commit,
814          "automerge::rewind_event_log",
815        );
816        // Rewind the event log
817        let account = self.account();
818        let account = account.lock().await;
819        Ok(match &log_type {
820            EventLogType::Identity => {
821                let log = account.identity_log().await?;
822                let mut event_log = log.write().await;
823                event_log.rewind(commit).await?
824            }
825            EventLogType::Account => {
826                let log = account.account_log().await?;
827                let mut event_log = log.write().await;
828                event_log.rewind(commit).await?
829            }
830            EventLogType::Device => {
831                let log = account.device_log().await?;
832                let mut event_log = log.write().await;
833                event_log.rewind(commit).await?
834            }
835            #[cfg(feature = "files")]
836            EventLogType::Files => {
837                let log = account.file_log().await?;
838                let mut event_log = log.write().await;
839                event_log.rewind(commit).await?
840            }
841            EventLogType::Folder(id) => {
842                let log = account.folder_log(id).await?;
843                let mut event_log = log.write().await;
844                event_log.rewind(commit).await?
845            }
846        })
847    }
848
849    /// Scan the remote for proofs that match this client.
850    #[doc(hidden)]
851    async fn scan_proofs(
852        &self,
853        request: ScanRequest,
854    ) -> Result<
855        Option<(CommitHash, CommitProof)>,
856        <Self as RemoteSyncHandler>::Error,
857    > {
858        tracing::debug!(request = ?request, "auto_merge::scan_proofs");
859
860        let leaves = {
861            let account = self.account();
862            let account = account.lock().await;
863            match &request.log_type {
864                EventLogType::Identity => {
865                    let log = account.identity_log().await?;
866                    let event_log = log.read().await;
867                    event_log.tree().leaves().unwrap_or_default()
868                }
869                EventLogType::Account => {
870                    let log = account.account_log().await?;
871                    let event_log = log.read().await;
872                    event_log.tree().leaves().unwrap_or_default()
873                }
874                EventLogType::Device => {
875                    let log = account.device_log().await?;
876                    let event_log = log.read().await;
877                    event_log.tree().leaves().unwrap_or_default()
878                }
879                #[cfg(feature = "files")]
880                EventLogType::Files => {
881                    let log = account.file_log().await?;
882                    let event_log = log.read().await;
883                    event_log.tree().leaves().unwrap_or_default()
884                }
885                EventLogType::Folder(id) => {
886                    let log = account.folder_log(id).await?;
887                    let event_log = log.read().await;
888                    event_log.tree().leaves().unwrap_or_default()
889                }
890            }
891        };
892
893        let mut req = request.clone();
894        loop {
895            match self.iterate_scan_proofs(req.clone(), &leaves).await? {
896                ScanState::Result(value) => return Ok(Some(value)),
897                ScanState::Continue(scan) => req = scan,
898                ScanState::Exhausted => return Ok(None),
899            }
900        }
901    }
902
903    /// Scan the remote for proofs that match this client.
904    #[doc(hidden)]
905    async fn iterate_scan_proofs(
906        &self,
907        request: ScanRequest,
908        leaves: &[[u8; 32]],
909    ) -> Result<ScanState, <Self as RemoteSyncHandler>::Error> {
910        tracing::debug!(
911          request = ?request,
912          "auto_merge::iterate_scan_proofs");
913
914        let response = self.client().scan(request.clone()).await?;
915
916        // If the server gave us a first proof and we don't
917        // have it in our event log then there is no point scanning
918        // as we know the trees have diverged
919        if let Some(first_proof) = &response.first_proof {
920            let (verified, _) = first_proof.verify_leaves(leaves);
921            if !verified {
922                return Err(ConflictError::Hard.into());
923            }
924        }
925
926        if !response.proofs.is_empty() {
927            // Proofs are returned in the event log order
928            // but we always want to scan from the end of
929            // the event log so reverse the iteration
930            for proof in response.proofs.iter().rev() {
931                // Find the last matching commit from the indices
932                // to prove
933                if let Some(commit_hash) = self.compare_proof(proof, leaves) {
934                    // Compute the root hash and proof for the
935                    // matched index
936                    let index = proof.indices.last().copied().unwrap();
937                    let new_leaves = &leaves[0..=index];
938                    let mut new_leaves = new_leaves.to_vec();
939                    let mut new_tree = CommitTree::new();
940                    new_tree.append(&mut new_leaves);
941                    new_tree.commit();
942
943                    let checkpoint_proof = new_tree.head()?;
944                    return Ok(ScanState::Result((
945                        commit_hash,
946                        checkpoint_proof,
947                    )));
948                }
949            }
950
951            // Try to scan more proofs
952            let mut req = request;
953            req.offset = response.offset;
954
955            Ok(ScanState::Continue(req))
956        } else {
957            Ok(ScanState::Exhausted)
958        }
959    }
960
961    /// Determine if a local event log contains a proof
962    /// received from the server.
963    #[doc(hidden)]
964    fn compare_proof(
965        &self,
966        proof: &CommitProof,
967        leaves: &[[u8; 32]],
968    ) -> Option<CommitHash> {
969        let (verified, leaves) = proof.verify_leaves(leaves);
970
971        tracing::trace!(
972            proof = ?proof,
973            verified = ?verified,
974            "auto_merge::compare_proof",
975        );
976
977        if verified {
978            leaves.last().copied().map(CommitHash)
979        } else {
980            None
981        }
982    }
983}