sos_sync/
traits.rs

1//! Core traits for storage that supports synchronization.
2use crate::{
3    CreateSet, DebugEventLogs, DebugEvents, DebugTree, MaybeDiff,
4    MergeOutcome, SyncCompare, SyncDiff, SyncStatus, TrackedChanges,
5    UpdateSet,
6};
7use async_trait::async_trait;
8use indexmap::{IndexMap, IndexSet};
9use sos_backend::{AccountEventLog, DeviceEventLog, FolderEventLog};
10use sos_core::commit::CommitHash;
11use sos_core::events::WriteEvent;
12use sos_core::AccountId;
13use sos_core::{
14    commit::{CommitState, CommitTree, Comparison},
15    events::{
16        patch::{AccountDiff, CheckedPatch, DeviceDiff, FolderDiff},
17        EventLog,
18    },
19    VaultId,
20};
21use sos_vault::Summary;
22use std::{
23    collections::{HashMap, HashSet},
24    sync::Arc,
25};
26use tokio::sync::RwLock;
27
28#[cfg(feature = "files")]
29use {
30    sos_backend::FileEventLog,
31    sos_core::{events::patch::FileDiff, ExternalFile},
32};
33
/// Builds a [`DebugEvents`] value describing the commit tree of an
/// event log.
///
/// The argument is any expression on which `.tree()` can be called;
/// the returned tree must provide `root()` and `leaves()`.
///
/// The argument expression is evaluated exactly once and is only
/// borrowed, so expressions with side effects are not repeated and
/// place expressions are not moved.
macro_rules! debug_tree_events {
    ($event_log:expr) => {{
        // Borrow once so the argument is not expanded (and therefore
        // re-evaluated) for every use of the tree below.
        let event_log = &$event_log;
        let tree = event_log.tree();
        let root = tree.root();
        // A tree without leaves yields an empty list.
        let leaves: Vec<_> = tree
            .leaves()
            .unwrap_or_default()
            .into_iter()
            .map(CommitHash)
            .collect();
        DebugEvents {
            root,
            length: leaves.len(),
            leaves,
        }
    }};
}
51
52/// References to the storage event logs.
53#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
54#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
55pub trait StorageEventLogs: Send + Sync + 'static {
56    /// Error type for storage event logs.
57    type Error: std::error::Error
58        + std::fmt::Debug
59        + From<sos_core::Error>
60        + From<sos_backend::Error>
61        + From<crate::Error>
62        + Send
63        + Sync
64        + 'static;
65
66    /// Clone of the identity log.
67    async fn identity_log(
68        &self,
69    ) -> Result<Arc<RwLock<FolderEventLog>>, Self::Error>;
70
71    /// Clone of the account log.
72    async fn account_log(
73        &self,
74    ) -> Result<Arc<RwLock<AccountEventLog>>, Self::Error>;
75
76    /// Clone of the device log.
77    async fn device_log(
78        &self,
79    ) -> Result<Arc<RwLock<DeviceEventLog>>, Self::Error>;
80
81    /// Clone of the file log.
82    #[cfg(feature = "files")]
83    async fn file_log(
84        &self,
85    ) -> Result<Arc<RwLock<FileEventLog>>, Self::Error>;
86
87    /// Canonical collection of files reduced from the file event log.
88    #[cfg(feature = "files")]
89    async fn canonical_files(
90        &self,
91    ) -> Result<IndexSet<ExternalFile>, Self::Error> {
92        use sos_reducers::FileReducer;
93        let files = self.file_log().await?;
94        let event_log = files.read().await;
95
96        // Canonical list of external files.
97        let reducer = FileReducer::new(&*event_log);
98        Ok(reducer.reduce(None).await?)
99    }
100
101    /// Folders managed by this storage.
102    ///
103    /// Built from the in-memory list of folders.
104    async fn folder_details(&self) -> Result<IndexSet<Summary>, Self::Error>;
105
106    /// Folder event log.
107    async fn folder_log(
108        &self,
109        id: &VaultId,
110    ) -> Result<Arc<RwLock<FolderEventLog>>, Self::Error>;
111}
112
113/// Types that can merge diffs.
114#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
115#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
116pub trait Merge: StorageEventLogs {
117    /// Merge changes to the identity folder.
118    async fn merge_identity(
119        &mut self,
120        diff: FolderDiff,
121        outcome: &mut MergeOutcome,
122    ) -> std::result::Result<CheckedPatch, Self::Error>;
123
124    /// Compare the identity folder.
125    async fn compare_identity(
126        &self,
127        state: &CommitState,
128    ) -> std::result::Result<Comparison, Self::Error> {
129        let log = self.identity_log().await?;
130        let event_log = log.read().await;
131        Ok(event_log.tree().compare(&state.1)?)
132    }
133
134    /// Merge changes to the account event log.
135    async fn merge_account(
136        &mut self,
137        diff: AccountDiff,
138        outcome: &mut MergeOutcome,
139    ) -> std::result::Result<(CheckedPatch, HashSet<VaultId>), Self::Error>;
140
141    /// Compare the account events.
142    async fn compare_account(
143        &self,
144        state: &CommitState,
145    ) -> std::result::Result<Comparison, Self::Error> {
146        let log = self.account_log().await?;
147        let event_log = log.read().await;
148        Ok(event_log.tree().compare(&state.1)?)
149    }
150
151    /// Merge changes to the devices event log.
152    async fn merge_device(
153        &mut self,
154        diff: DeviceDiff,
155        outcome: &mut MergeOutcome,
156    ) -> std::result::Result<CheckedPatch, Self::Error>;
157
158    /// Compare the device events.
159    async fn compare_device(
160        &self,
161        state: &CommitState,
162    ) -> std::result::Result<Comparison, Self::Error> {
163        let log = self.device_log().await?;
164        let event_log = log.read().await;
165        Ok(event_log.tree().compare(&state.1)?)
166    }
167
168    /// Merge changes to the files event log.
169    #[cfg(feature = "files")]
170    async fn merge_files(
171        &mut self,
172        diff: FileDiff,
173        outcome: &mut MergeOutcome,
174    ) -> std::result::Result<CheckedPatch, Self::Error>;
175
176    /// Compare the file events.
177    #[cfg(feature = "files")]
178    async fn compare_files(
179        &self,
180        state: &CommitState,
181    ) -> std::result::Result<Comparison, Self::Error> {
182        let log = self.file_log().await?;
183        let event_log = log.read().await;
184        Ok(event_log.tree().compare(&state.1)?)
185    }
186
187    /// Merge changes to a folder.
188    async fn merge_folder(
189        &mut self,
190        folder_id: &VaultId,
191        diff: FolderDiff,
192        outcome: &mut MergeOutcome,
193    ) -> std::result::Result<(CheckedPatch, Vec<WriteEvent>), Self::Error>;
194
195    /// Compare folder events.
196    async fn compare_folder(
197        &self,
198        folder_id: &VaultId,
199        state: &CommitState,
200    ) -> std::result::Result<Comparison, Self::Error> {
201        let event_log = self.folder_log(folder_id).await?;
202        let reader = event_log.read().await;
203        Ok(reader.tree().compare(&state.1)?)
204    }
205
206    /// Compare the local state to a remote status.
207    async fn compare(
208        &mut self,
209        remote_status: &SyncStatus,
210    ) -> std::result::Result<SyncCompare, Self::Error> {
211        let mut compare = SyncCompare::default();
212
213        compare.identity =
214            Some(self.compare_identity(&remote_status.identity).await?);
215
216        compare.account =
217            Some(self.compare_account(&remote_status.account).await?);
218
219        compare.device =
220            Some(self.compare_device(&remote_status.device).await?);
221
222        #[cfg(feature = "files")]
223        if let Some(files) = &remote_status.files {
224            compare.files = Some(self.compare_files(files).await?);
225        }
226
227        for (id, folder_status) in &remote_status.folders {
228            compare
229                .folders
230                .insert(*id, self.compare_folder(id, folder_status).await?);
231        }
232
233        Ok(compare)
234    }
235
236    /// Merge a diff into this storage.
237    async fn merge(
238        &mut self,
239        diff: SyncDiff,
240        outcome: &mut MergeOutcome,
241    ) -> std::result::Result<SyncCompare, Self::Error> {
242        let mut compare = SyncCompare::default();
243
244        match diff.identity {
245            Some(MaybeDiff::Diff(diff)) => {
246                self.merge_identity(diff, outcome).await?;
247            }
248            Some(MaybeDiff::Compare(state)) => {
249                if let Some(state) = state {
250                    compare.identity =
251                        Some(self.compare_identity(&state).await?);
252                }
253            }
254            None => {}
255        }
256
257        let mut deleted_folders = HashSet::new();
258
259        match diff.account {
260            Some(MaybeDiff::Diff(diff)) => {
261                let (_, deletions) =
262                    self.merge_account(diff, outcome).await?;
263                deleted_folders = deletions;
264            }
265            Some(MaybeDiff::Compare(state)) => {
266                if let Some(state) = state {
267                    compare.account =
268                        Some(self.compare_account(&state).await?);
269                }
270            }
271            None => {}
272        }
273
274        match diff.device {
275            Some(MaybeDiff::Diff(diff)) => {
276                self.merge_device(diff, outcome).await?;
277            }
278            Some(MaybeDiff::Compare(state)) => {
279                if let Some(state) = state {
280                    compare.device = Some(self.compare_device(&state).await?);
281                }
282            }
283            None => {}
284        }
285
286        #[cfg(feature = "files")]
287        match diff.files {
288            Some(MaybeDiff::Diff(diff)) => {
289                self.merge_files(diff, outcome).await?;
290            }
291            Some(MaybeDiff::Compare(state)) => {
292                if let Some(state) = state {
293                    compare.files = Some(self.compare_files(&state).await?);
294                }
295            }
296            None => {}
297        }
298
299        for (id, maybe_diff) in diff.folders {
300            // Don't bother trying to merge folders that
301            // have been deleted
302            if deleted_folders.contains(&id) {
303                tracing::debug!(
304                    folder_id = %id,
305                    "merge::ignore_deleted_folder");
306                continue;
307            }
308            match maybe_diff {
309                MaybeDiff::Diff(diff) => {
310                    self.merge_folder(&id, diff, outcome).await?;
311                }
312                MaybeDiff::Compare(state) => {
313                    if let Some(state) = state {
314                        compare.folders.insert(
315                            id,
316                            self.compare_folder(&id, &state).await?,
317                        );
318                    }
319                }
320            }
321        }
322
323        tracing::debug!(num_changes = %outcome.changes, "merge complete");
324
325        Ok(compare)
326    }
327}
328
329/// Types that can force merge a diff.
330///
331/// Force merge deletes all events from the log and
332/// applies the diff patch as a new set of events.
333///
334/// Use this when event logs have completely diverged
335/// and need to be rewritten.
336#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
337#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
338pub trait ForceMerge: Merge {
339    /// Force merge from a set of updates.
340    ///
341    /// Update an account from a change set of event diffs.
342    ///
343    /// Overwrites all existing account data with the event logs
344    /// in the change set.
345    ///
346    /// Intended to be used to perform a destructive overwrite
347    /// when changing the encryption cipher or other events
348    /// which rewrite the account data.
349    async fn force_merge_update(
350        &mut self,
351        mut update_set: UpdateSet,
352        outcome: &mut MergeOutcome,
353    ) -> std::result::Result<(), Self::Error> {
354        if let Some(diff) = update_set.identity.take() {
355            self.force_merge_identity(diff, outcome).await?;
356        }
357
358        if let Some(diff) = update_set.account.take() {
359            self.force_merge_account(diff, outcome).await?;
360        }
361
362        if let Some(diff) = update_set.device.take() {
363            self.force_merge_device(diff, outcome).await?;
364        }
365
366        #[cfg(feature = "files")]
367        if let Some(diff) = update_set.files.take() {
368            self.force_merge_files(diff, outcome).await?;
369        }
370
371        for (id, folder) in update_set.folders {
372            self.force_merge_folder(&id, folder, outcome).await?;
373        }
374        Ok(())
375    }
376
377    /// Force merge changes to the identity folder.
378    async fn force_merge_identity(
379        &mut self,
380        source: FolderDiff,
381        outcome: &mut MergeOutcome,
382    ) -> std::result::Result<(), Self::Error>;
383
384    /// Force merge changes to the account event log.
385    async fn force_merge_account(
386        &mut self,
387        diff: AccountDiff,
388        outcome: &mut MergeOutcome,
389    ) -> std::result::Result<(), Self::Error> {
390        let len = diff.patch.len() as u64;
391
392        tracing::debug!(
393            checkpoint = ?diff.checkpoint,
394            num_events = len,
395            "force_merge::account",
396        );
397
398        let event_log = self.account_log().await?;
399        let mut event_log = event_log.write().await;
400        event_log.replace_all_events(&diff).await?;
401
402        outcome.changes += len;
403        outcome.tracked.account =
404            TrackedChanges::new_account_records(&diff.patch).await?;
405
406        Ok(())
407    }
408
409    /// Force merge changes to the devices event log.
410    async fn force_merge_device(
411        &mut self,
412        diff: DeviceDiff,
413        outcome: &mut MergeOutcome,
414    ) -> std::result::Result<(), Self::Error> {
415        let len = diff.patch.len() as u64;
416
417        tracing::debug!(
418            checkpoint = ?diff.checkpoint,
419            num_events = len,
420            "force_merge::device",
421        );
422
423        let event_log = self.device_log().await?;
424        let mut event_log = event_log.write().await;
425        event_log.replace_all_events(&diff).await?;
426
427        outcome.changes += len;
428        outcome.tracked.device =
429            TrackedChanges::new_device_records(&diff.patch).await?;
430
431        Ok(())
432    }
433
434    /// Force merge changes to the files event log.
435    #[cfg(feature = "files")]
436    async fn force_merge_files(
437        &mut self,
438        diff: FileDiff,
439        outcome: &mut MergeOutcome,
440    ) -> std::result::Result<(), Self::Error> {
441        let len = diff.patch.len() as u64;
442
443        tracing::debug!(
444            checkpoint = ?diff.checkpoint,
445            num_events = len,
446            "force_merge::files",
447        );
448
449        let event_log = self.file_log().await?;
450        let mut event_log = event_log.write().await;
451        event_log.replace_all_events(&diff).await?;
452
453        outcome.changes += len;
454        outcome.tracked.files =
455            TrackedChanges::new_file_records(&diff.patch).await?;
456
457        Ok(())
458    }
459
460    /// Force merge changes to a folder.
461    async fn force_merge_folder(
462        &mut self,
463        folder_id: &VaultId,
464        source: FolderDiff,
465        outcome: &mut MergeOutcome,
466    ) -> std::result::Result<(), Self::Error>;
467}
468
469/// Storage implementations that can synchronize.
470#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
471#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
472pub trait SyncStorage: ForceMerge {
473    /// Determine if this is client-side storage.
474    fn is_client_storage(&self) -> bool;
475
476    /// Sync status for the storage.
477    async fn sync_status(
478        &self,
479    ) -> std::result::Result<SyncStatus, Self::Error> {
480        // NOTE: the order for computing the cumulative
481        // NOTE: root hash must be identical to the logic
482        // NOTE: in the server implementation and the folders
483        // NOTE: collection must be sorted so that the folders
484        // NOTE: root hash is deterministic
485
486        let summaries = self.folder_details().await?;
487
488        let identity = {
489            let event_log = self.identity_log().await?;
490            let reader = event_log.read().await;
491            reader.tree().commit_state()?
492        };
493
494        let account = {
495            let event_log = self.account_log().await?;
496            let reader = event_log.read().await;
497            reader.tree().commit_state()?
498        };
499
500        let device = {
501            let event_log = self.device_log().await?;
502            let reader = event_log.read().await;
503            reader.tree().commit_state()?
504        };
505
506        #[cfg(feature = "files")]
507        let files = {
508            let event_log = self.file_log().await?;
509            let reader = event_log.read().await;
510            if reader.tree().is_empty() {
511                None
512            } else {
513                Some(reader.tree().commit_state()?)
514            }
515        };
516
517        let mut folders = IndexMap::new();
518        let mut folder_roots: Vec<(&VaultId, [u8; 32])> = Vec::new();
519        for summary in &summaries {
520            let event_log = self.folder_log(summary.id()).await?;
521            let reader = event_log.read().await;
522            let commit_state = reader.tree().commit_state()?;
523            folder_roots.push((summary.id(), commit_state.1.root().into()));
524            folders.insert(*summary.id(), commit_state);
525        }
526
527        // Compute a root hash of all the trees for an account
528        let mut root_tree = CommitTree::new();
529        let mut root_commits = vec![
530            identity.1.root().into(),
531            account.1.root().into(),
532            device.1.root().into(),
533        ];
534        #[cfg(feature = "files")]
535        if let Some(files) = &files {
536            root_commits.push(files.1.root().into());
537        }
538
539        folder_roots.sort_by(|a, b| a.0.cmp(b.0));
540        let mut folder_roots =
541            folder_roots.into_iter().map(|f| f.1).collect::<Vec<_>>();
542        root_commits.append(&mut folder_roots);
543        root_tree.append(&mut root_commits);
544        root_tree.commit();
545
546        let root = root_tree.root().ok_or(sos_core::Error::NoRootCommit)?;
547
548        Ok(SyncStatus {
549            root,
550            identity,
551            account,
552            device,
553            #[cfg(feature = "files")]
554            files,
555            folders,
556        })
557    }
558
559    /// Set of all event logs.
560    ///
561    /// Used by network aware implementations to transfer
562    /// entire accounts.
563    async fn create_set(
564        &self,
565    ) -> std::result::Result<CreateSet, Self::Error> {
566        let identity = {
567            let log = self.identity_log().await?;
568            let reader = log.read().await;
569            reader.diff_events(None).await?
570        };
571
572        let account = {
573            let log = self.account_log().await?;
574            let reader = log.read().await;
575            reader.diff_events(None).await?
576        };
577
578        let device = {
579            let log = self.device_log().await?;
580            let reader = log.read().await;
581            reader.diff_events(None).await?
582        };
583
584        #[cfg(feature = "files")]
585        let files = {
586            let log = self.file_log().await?;
587            let reader = log.read().await;
588            reader.diff_events(None).await?
589        };
590
591        let mut folders = HashMap::new();
592        let details = self.folder_details().await?;
593
594        for folder in details {
595            if folder.flags().is_sync_disabled() {
596                tracing::debug!(
597                    folder_id = %folder.id(),
598                    "create_set::ignore::no_sync_flag");
599                continue;
600            }
601            let event_log = self.folder_log(folder.id()).await?;
602            let log_file = event_log.read().await;
603            folders.insert(*folder.id(), log_file.diff_events(None).await?);
604        }
605
606        Ok(CreateSet {
607            identity,
608            account,
609            folders,
610            device,
611            #[cfg(feature = "files")]
612            files,
613        })
614    }
615
616    /// Create a debug tree of this account.
617    async fn debug_account_tree(
618        &self,
619        account_id: AccountId,
620    ) -> Result<DebugTree, Self::Error> {
621        let status = self.sync_status().await?;
622
623        // Redact folder names
624        let folder_list = {
625            let mut folder_list = self.folder_details().await?;
626            let mut redacted = IndexSet::new();
627            for mut folder in folder_list.drain(..) {
628                folder.set_name(String::new());
629                redacted.insert(folder);
630            }
631            redacted
632        };
633
634        let identity = {
635            let event_log = self.identity_log().await?;
636            let event_log = event_log.read().await;
637            debug_tree_events!(&*event_log)
638        };
639
640        let account = {
641            let event_log = self.account_log().await?;
642            let event_log = event_log.read().await;
643            debug_tree_events!(&*event_log)
644        };
645
646        let device = {
647            let event_log = self.device_log().await?;
648            let event_log = event_log.read().await;
649            debug_tree_events!(&*event_log)
650        };
651
652        #[cfg(feature = "files")]
653        let file = {
654            let event_log = self.file_log().await?;
655            let event_log = event_log.read().await;
656            debug_tree_events!(&*event_log)
657        };
658
659        let folders = {
660            let mut folders = HashMap::new();
661            for summary in &folder_list {
662                let event_log = self.folder_log(summary.id()).await?;
663                let event_log = event_log.read().await;
664                folders
665                    .insert(*summary.id(), debug_tree_events!(&*event_log));
666            }
667            folders
668        };
669
670        Ok(DebugTree {
671            account_id,
672            status,
673            folders: folder_list,
674            events: DebugEventLogs {
675                identity,
676                account,
677                device,
678                #[cfg(feature = "files")]
679                file,
680                folders,
681            },
682        })
683    }
684}