sos_sync/
traits.rs

1//! Core traits for storage that supports synchronization.
2use crate::UpdateSet;
3use crate::{
4    CreateSet, MaybeDiff, MergeOutcome, SyncCompare, SyncDiff, SyncStatus,
5    TrackedChanges,
6};
7use async_trait::async_trait;
8use indexmap::IndexMap;
9use indexmap::IndexSet;
10use sos_backend::{AccountEventLog, DeviceEventLog, FolderEventLog};
11use sos_core::events::WriteEvent;
12use sos_core::{
13    commit::{CommitState, CommitTree, Comparison},
14    events::{
15        patch::{AccountDiff, CheckedPatch, DeviceDiff, FolderDiff},
16        EventLog,
17    },
18    VaultId,
19};
20use sos_vault::Summary;
21use std::{
22    collections::{HashMap, HashSet},
23    sync::Arc,
24};
25use tokio::sync::RwLock;
26
27#[cfg(feature = "files")]
28use {
29    sos_backend::FileEventLog,
30    sos_core::{events::patch::FileDiff, ExternalFile},
31};
32
33/// References to the storage event logs.
34#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
35#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
36pub trait StorageEventLogs: Send + Sync + 'static {
37    /// Error type for storage event logs.
38    type Error: std::error::Error
39        + std::fmt::Debug
40        + From<sos_core::Error>
41        + From<sos_backend::Error>
42        + From<crate::Error>
43        + Send
44        + Sync
45        + 'static;
46
47    /// Clone of the identity log.
48    async fn identity_log(
49        &self,
50    ) -> Result<Arc<RwLock<FolderEventLog>>, Self::Error>;
51
52    /// Clone of the account log.
53    async fn account_log(
54        &self,
55    ) -> Result<Arc<RwLock<AccountEventLog>>, Self::Error>;
56
57    /// Clone of the device log.
58    async fn device_log(
59        &self,
60    ) -> Result<Arc<RwLock<DeviceEventLog>>, Self::Error>;
61
62    /// Clone of the file log.
63    #[cfg(feature = "files")]
64    async fn file_log(
65        &self,
66    ) -> Result<Arc<RwLock<FileEventLog>>, Self::Error>;
67
68    /// Canonical collection of files reduced from the file event log.
69    #[cfg(feature = "files")]
70    async fn canonical_files(
71        &self,
72    ) -> Result<IndexSet<ExternalFile>, Self::Error> {
73        use sos_reducers::FileReducer;
74        let files = self.file_log().await?;
75        let event_log = files.read().await;
76
77        // Canonical list of external files.
78        let reducer = FileReducer::new(&*event_log);
79        Ok(reducer.reduce(None).await?)
80    }
81
82    /// Folders managed by this storage.
83    ///
84    /// Built from the in-memory list of folders.
85    async fn folder_details(&self) -> Result<IndexSet<Summary>, Self::Error>;
86
87    /// Folder event log.
88    async fn folder_log(
89        &self,
90        id: &VaultId,
91    ) -> Result<Arc<RwLock<FolderEventLog>>, Self::Error>;
92
93    /*
94    /// Load all commit trees into memory.
95    ///
96    /// Typically, commit trees are loaded into memory on-demand
97    /// however sometimes it's useful to compute the sync status
98    /// for storage; for example, it is used by the database
99    /// upgrader to ensure imported accounts exactly match the
100    /// legacy file system accounts.
101    async fn load_all_events(&self) -> Result<(), Self::Error> {
102        let identity = self.identity_log().await?;
103        let mut identity = identity.write().await;
104        identity.load_tree().await?;
105
106        let account = self.account_log().await?;
107        let mut account = account.write().await;
108        account.load_tree().await?;
109
110        let device = self.device_log().await?;
111        let mut device = device.write().await;
112        device.load_tree().await?;
113
114        #[cfg(feature = "files")]
115        {
116            let files = self.file_log().await?;
117            let mut files = files.write().await;
118            files.load_tree().await?;
119        }
120
121        let folders = self.folder_details().await?;
122        for folder in &folders {
123            let event_log = self.folder_log(folder.id()).await?;
124            let mut event_log = event_log.write().await;
125            event_log.load_tree().await?;
126        }
127
128        Ok(())
129    }
130    */
131}
132
133/// Types that can merge diffs.
134#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
135#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
136pub trait Merge: StorageEventLogs {
137    /// Merge changes to the identity folder.
138    async fn merge_identity(
139        &mut self,
140        diff: FolderDiff,
141        outcome: &mut MergeOutcome,
142    ) -> std::result::Result<CheckedPatch, Self::Error>;
143
144    /// Compare the identity folder.
145    async fn compare_identity(
146        &self,
147        state: &CommitState,
148    ) -> std::result::Result<Comparison, Self::Error> {
149        let log = self.identity_log().await?;
150        let event_log = log.read().await;
151        Ok(event_log.tree().compare(&state.1)?)
152    }
153
154    /// Merge changes to the account event log.
155    async fn merge_account(
156        &mut self,
157        diff: AccountDiff,
158        outcome: &mut MergeOutcome,
159    ) -> std::result::Result<(CheckedPatch, HashSet<VaultId>), Self::Error>;
160
161    /// Compare the account events.
162    async fn compare_account(
163        &self,
164        state: &CommitState,
165    ) -> std::result::Result<Comparison, Self::Error> {
166        let log = self.account_log().await?;
167        let event_log = log.read().await;
168        Ok(event_log.tree().compare(&state.1)?)
169    }
170
171    /// Merge changes to the devices event log.
172    async fn merge_device(
173        &mut self,
174        diff: DeviceDiff,
175        outcome: &mut MergeOutcome,
176    ) -> std::result::Result<CheckedPatch, Self::Error>;
177
178    /// Compare the device events.
179    async fn compare_device(
180        &self,
181        state: &CommitState,
182    ) -> std::result::Result<Comparison, Self::Error> {
183        let log = self.device_log().await?;
184        let event_log = log.read().await;
185        Ok(event_log.tree().compare(&state.1)?)
186    }
187
188    /// Merge changes to the files event log.
189    #[cfg(feature = "files")]
190    async fn merge_files(
191        &mut self,
192        diff: FileDiff,
193        outcome: &mut MergeOutcome,
194    ) -> std::result::Result<CheckedPatch, Self::Error>;
195
196    /// Compare the file events.
197    #[cfg(feature = "files")]
198    async fn compare_files(
199        &self,
200        state: &CommitState,
201    ) -> std::result::Result<Comparison, Self::Error> {
202        let log = self.file_log().await?;
203        let event_log = log.read().await;
204        Ok(event_log.tree().compare(&state.1)?)
205    }
206
207    /// Merge changes to a folder.
208    async fn merge_folder(
209        &mut self,
210        folder_id: &VaultId,
211        diff: FolderDiff,
212        outcome: &mut MergeOutcome,
213    ) -> std::result::Result<(CheckedPatch, Vec<WriteEvent>), Self::Error>;
214
215    /// Compare folder events.
216    async fn compare_folder(
217        &self,
218        folder_id: &VaultId,
219        state: &CommitState,
220    ) -> std::result::Result<Comparison, Self::Error> {
221        let event_log = self.folder_log(folder_id).await?;
222        let reader = event_log.read().await;
223        Ok(reader.tree().compare(&state.1)?)
224    }
225
226    /// Compare the local state to a remote status.
227    async fn compare(
228        &mut self,
229        remote_status: &SyncStatus,
230    ) -> std::result::Result<SyncCompare, Self::Error> {
231        let mut compare = SyncCompare::default();
232
233        compare.identity =
234            Some(self.compare_identity(&remote_status.identity).await?);
235
236        compare.account =
237            Some(self.compare_account(&remote_status.account).await?);
238
239        compare.device =
240            Some(self.compare_device(&remote_status.device).await?);
241
242        #[cfg(feature = "files")]
243        if let Some(files) = &remote_status.files {
244            compare.files = Some(self.compare_files(files).await?);
245        }
246
247        for (id, folder_status) in &remote_status.folders {
248            compare
249                .folders
250                .insert(*id, self.compare_folder(id, folder_status).await?);
251        }
252
253        Ok(compare)
254    }
255
256    /// Merge a diff into this storage.
257    async fn merge(
258        &mut self,
259        diff: SyncDiff,
260        outcome: &mut MergeOutcome,
261    ) -> std::result::Result<SyncCompare, Self::Error> {
262        let mut compare = SyncCompare::default();
263
264        match diff.identity {
265            Some(MaybeDiff::Diff(diff)) => {
266                self.merge_identity(diff, outcome).await?;
267            }
268            Some(MaybeDiff::Compare(state)) => {
269                if let Some(state) = state {
270                    compare.identity =
271                        Some(self.compare_identity(&state).await?);
272                }
273            }
274            None => {}
275        }
276
277        let mut deleted_folders = HashSet::new();
278
279        match diff.account {
280            Some(MaybeDiff::Diff(diff)) => {
281                let (_, deletions) =
282                    self.merge_account(diff, outcome).await?;
283                deleted_folders = deletions;
284            }
285            Some(MaybeDiff::Compare(state)) => {
286                if let Some(state) = state {
287                    compare.account =
288                        Some(self.compare_account(&state).await?);
289                }
290            }
291            None => {}
292        }
293
294        match diff.device {
295            Some(MaybeDiff::Diff(diff)) => {
296                self.merge_device(diff, outcome).await?;
297            }
298            Some(MaybeDiff::Compare(state)) => {
299                if let Some(state) = state {
300                    compare.device = Some(self.compare_device(&state).await?);
301                }
302            }
303            None => {}
304        }
305
306        #[cfg(feature = "files")]
307        match diff.files {
308            Some(MaybeDiff::Diff(diff)) => {
309                self.merge_files(diff, outcome).await?;
310            }
311            Some(MaybeDiff::Compare(state)) => {
312                if let Some(state) = state {
313                    compare.files = Some(self.compare_files(&state).await?);
314                }
315            }
316            None => {}
317        }
318
319        for (id, maybe_diff) in diff.folders {
320            // Don't bother trying to merge folders that
321            // have been deleted
322            if deleted_folders.contains(&id) {
323                tracing::debug!(
324                    folder_id = %id,
325                    "merge::ignore_deleted_folder");
326                continue;
327            }
328            match maybe_diff {
329                MaybeDiff::Diff(diff) => {
330                    self.merge_folder(&id, diff, outcome).await?;
331                }
332                MaybeDiff::Compare(state) => {
333                    if let Some(state) = state {
334                        compare.folders.insert(
335                            id,
336                            self.compare_folder(&id, &state).await?,
337                        );
338                    }
339                }
340            }
341        }
342
343        tracing::debug!(num_changes = %outcome.changes, "merge complete");
344
345        Ok(compare)
346    }
347}
348
349/// Types that can force merge a diff.
350///
351/// Force merge deletes all events from the log and
352/// applies the diff patch as a new set of events.
353///
354/// Use this when event logs have completely diverged
355/// and need to be rewritten.
356#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
357#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
358pub trait ForceMerge: Merge {
359    /// Force merge from a set of updates.
360    ///
361    /// Update an account from a change set of event diffs.
362    ///
363    /// Overwrites all existing account data with the event logs
364    /// in the change set.
365    ///
366    /// Intended to be used to perform a destructive overwrite
367    /// when changing the encryption cipher or other events
368    /// which rewrite the account data.
369    async fn force_merge_update(
370        &mut self,
371        mut update_set: UpdateSet,
372        outcome: &mut MergeOutcome,
373    ) -> std::result::Result<(), Self::Error> {
374        if let Some(diff) = update_set.identity.take() {
375            self.force_merge_identity(diff, outcome).await?;
376        }
377
378        if let Some(diff) = update_set.account.take() {
379            self.force_merge_account(diff, outcome).await?;
380        }
381
382        if let Some(diff) = update_set.device.take() {
383            self.force_merge_device(diff, outcome).await?;
384        }
385
386        #[cfg(feature = "files")]
387        if let Some(diff) = update_set.files.take() {
388            self.force_merge_files(diff, outcome).await?;
389        }
390
391        for (id, folder) in update_set.folders {
392            self.force_merge_folder(&id, folder, outcome).await?;
393        }
394        Ok(())
395    }
396
397    /// Force merge changes to the identity folder.
398    async fn force_merge_identity(
399        &mut self,
400        source: FolderDiff,
401        outcome: &mut MergeOutcome,
402    ) -> std::result::Result<(), Self::Error>;
403
404    /// Force merge changes to the account event log.
405    async fn force_merge_account(
406        &mut self,
407        diff: AccountDiff,
408        outcome: &mut MergeOutcome,
409    ) -> std::result::Result<(), Self::Error> {
410        let len = diff.patch.len() as u64;
411
412        tracing::debug!(
413            checkpoint = ?diff.checkpoint,
414            num_events = len,
415            "force_merge::account",
416        );
417
418        let event_log = self.account_log().await?;
419        let mut event_log = event_log.write().await;
420        event_log.replace_all_events(&diff).await?;
421
422        outcome.changes += len;
423        outcome.tracked.account =
424            TrackedChanges::new_account_records(&diff.patch).await?;
425
426        Ok(())
427    }
428
429    /// Force merge changes to the devices event log.
430    async fn force_merge_device(
431        &mut self,
432        diff: DeviceDiff,
433        outcome: &mut MergeOutcome,
434    ) -> std::result::Result<(), Self::Error> {
435        let len = diff.patch.len() as u64;
436
437        tracing::debug!(
438            checkpoint = ?diff.checkpoint,
439            num_events = len,
440            "force_merge::device",
441        );
442
443        let event_log = self.device_log().await?;
444        let mut event_log = event_log.write().await;
445        event_log.replace_all_events(&diff).await?;
446
447        outcome.changes += len;
448        outcome.tracked.device =
449            TrackedChanges::new_device_records(&diff.patch).await?;
450
451        Ok(())
452    }
453
454    /// Force merge changes to the files event log.
455    #[cfg(feature = "files")]
456    async fn force_merge_files(
457        &mut self,
458        diff: FileDiff,
459        outcome: &mut MergeOutcome,
460    ) -> std::result::Result<(), Self::Error> {
461        let len = diff.patch.len() as u64;
462
463        tracing::debug!(
464            checkpoint = ?diff.checkpoint,
465            num_events = len,
466            "force_merge::files",
467        );
468
469        let event_log = self.file_log().await?;
470        let mut event_log = event_log.write().await;
471        event_log.replace_all_events(&diff).await?;
472
473        outcome.changes += len;
474        outcome.tracked.files =
475            TrackedChanges::new_file_records(&diff.patch).await?;
476
477        Ok(())
478    }
479
480    /// Force merge changes to a folder.
481    async fn force_merge_folder(
482        &mut self,
483        folder_id: &VaultId,
484        source: FolderDiff,
485        outcome: &mut MergeOutcome,
486    ) -> std::result::Result<(), Self::Error>;
487}
488
489/// Storage implementations that can synchronize.
490#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
491#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
492pub trait SyncStorage: ForceMerge {
493    /// Determine if this is client-side storage.
494    fn is_client_storage(&self) -> bool;
495
496    /// Sync status for the storage.
497    async fn sync_status(
498        &self,
499    ) -> std::result::Result<SyncStatus, Self::Error> {
500        // NOTE: the order for computing the cumulative
501        // NOTE: root hash must be identical to the logic
502        // NOTE: in the server implementation and the folders
503        // NOTE: collection must be sorted so that the folders
504        // NOTE: root hash is deterministic
505
506        let summaries = self.folder_details().await?;
507
508        let identity = {
509            let event_log = self.identity_log().await?;
510            let reader = event_log.read().await;
511            reader.tree().commit_state()?
512        };
513
514        let account = {
515            let event_log = self.account_log().await?;
516            let reader = event_log.read().await;
517            reader.tree().commit_state()?
518        };
519
520        let device = {
521            let event_log = self.device_log().await?;
522            let reader = event_log.read().await;
523            reader.tree().commit_state()?
524        };
525
526        #[cfg(feature = "files")]
527        let files = {
528            let event_log = self.file_log().await?;
529            let reader = event_log.read().await;
530            if reader.tree().is_empty() {
531                None
532            } else {
533                Some(reader.tree().commit_state()?)
534            }
535        };
536
537        let mut folders = IndexMap::new();
538        let mut folder_roots: Vec<(&VaultId, [u8; 32])> = Vec::new();
539        for summary in &summaries {
540            let event_log = self.folder_log(summary.id()).await?;
541            let reader = event_log.read().await;
542            let commit_state = reader.tree().commit_state()?;
543            folder_roots.push((summary.id(), commit_state.1.root().into()));
544            folders.insert(*summary.id(), commit_state);
545        }
546
547        // Compute a root hash of all the trees for an account
548        let mut root_tree = CommitTree::new();
549        let mut root_commits = vec![
550            identity.1.root().into(),
551            account.1.root().into(),
552            device.1.root().into(),
553        ];
554        #[cfg(feature = "files")]
555        if let Some(files) = &files {
556            root_commits.push(files.1.root().into());
557        }
558
559        folder_roots.sort_by(|a, b| a.0.cmp(b.0));
560        let mut folder_roots =
561            folder_roots.into_iter().map(|f| f.1).collect::<Vec<_>>();
562        root_commits.append(&mut folder_roots);
563        root_tree.append(&mut root_commits);
564        root_tree.commit();
565
566        let root = root_tree.root().ok_or(sos_core::Error::NoRootCommit)?;
567
568        Ok(SyncStatus {
569            root,
570            identity,
571            account,
572            device,
573            #[cfg(feature = "files")]
574            files,
575            folders,
576        })
577    }
578
579    /// Change set of all event logs.
580    ///
581    /// Used by network aware implementations to transfer
582    /// entire accounts.
583    async fn change_set(
584        &self,
585    ) -> std::result::Result<CreateSet, Self::Error> {
586        let identity = {
587            let log = self.identity_log().await?;
588            let reader = log.read().await;
589            reader.diff_events(None).await?
590        };
591
592        let account = {
593            let log = self.account_log().await?;
594            let reader = log.read().await;
595            reader.diff_events(None).await?
596        };
597
598        let device = {
599            let log = self.device_log().await?;
600            let reader = log.read().await;
601            reader.diff_events(None).await?
602        };
603
604        #[cfg(feature = "files")]
605        let files = {
606            let log = self.file_log().await?;
607            let reader = log.read().await;
608            reader.diff_events(None).await?
609        };
610
611        let mut folders = HashMap::new();
612        let details = self.folder_details().await?;
613
614        for folder in details {
615            if folder.flags().is_sync_disabled() {
616                tracing::debug!(
617                    folder_id = %folder.id(),
618                    "change_set::ignore::no_sync_flag");
619                continue;
620            }
621            let event_log = self.folder_log(folder.id()).await?;
622            let log_file = event_log.read().await;
623            folders.insert(*folder.id(), log_file.diff_events(None).await?);
624        }
625
626        Ok(CreateSet {
627            identity,
628            account,
629            folders,
630            device,
631            #[cfg(feature = "files")]
632            files,
633        })
634    }
635}