1use crate::{
3 CreateSet, DebugEventLogs, DebugEvents, DebugTree, MaybeDiff,
4 MergeOutcome, SyncCompare, SyncDiff, SyncStatus, TrackedChanges,
5 UpdateSet,
6};
7use async_trait::async_trait;
8use indexmap::{IndexMap, IndexSet};
9use sos_backend::{AccountEventLog, DeviceEventLog, FolderEventLog};
10use sos_core::commit::CommitHash;
11use sos_core::events::WriteEvent;
12use sos_core::AccountId;
13use sos_core::{
14 commit::{CommitState, CommitTree, Comparison},
15 events::{
16 patch::{AccountDiff, CheckedPatch, DeviceDiff, FolderDiff},
17 EventLog,
18 },
19 VaultId,
20};
21use sos_vault::Summary;
22use std::{
23 collections::{HashMap, HashSet},
24 sync::Arc,
25};
26use tokio::sync::RwLock;
27
28#[cfg(feature = "files")]
29use {
30 sos_backend::FileEventLog,
31 sos_core::{events::patch::FileDiff, ExternalFile},
32};
33
34macro_rules! debug_tree_events {
35 ($event_log:expr) => {{
36 let root = $event_log.tree().root();
37 let leaves: Vec<_> = $event_log
38 .tree()
39 .leaves()
40 .unwrap_or_default()
41 .into_iter()
42 .map(CommitHash)
43 .collect();
44 DebugEvents {
45 root,
46 length: leaves.len(),
47 leaves,
48 }
49 }};
50}
51
52#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
54#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
55pub trait StorageEventLogs: Send + Sync + 'static {
56 type Error: std::error::Error
58 + std::fmt::Debug
59 + From<sos_core::Error>
60 + From<sos_backend::Error>
61 + From<crate::Error>
62 + Send
63 + Sync
64 + 'static;
65
66 async fn identity_log(
68 &self,
69 ) -> Result<Arc<RwLock<FolderEventLog>>, Self::Error>;
70
71 async fn account_log(
73 &self,
74 ) -> Result<Arc<RwLock<AccountEventLog>>, Self::Error>;
75
76 async fn device_log(
78 &self,
79 ) -> Result<Arc<RwLock<DeviceEventLog>>, Self::Error>;
80
81 #[cfg(feature = "files")]
83 async fn file_log(
84 &self,
85 ) -> Result<Arc<RwLock<FileEventLog>>, Self::Error>;
86
87 #[cfg(feature = "files")]
89 async fn canonical_files(
90 &self,
91 ) -> Result<IndexSet<ExternalFile>, Self::Error> {
92 use sos_reducers::FileReducer;
93 let files = self.file_log().await?;
94 let event_log = files.read().await;
95
96 let reducer = FileReducer::new(&*event_log);
98 Ok(reducer.reduce(None).await?)
99 }
100
101 async fn folder_details(&self) -> Result<IndexSet<Summary>, Self::Error>;
105
106 async fn folder_log(
108 &self,
109 id: &VaultId,
110 ) -> Result<Arc<RwLock<FolderEventLog>>, Self::Error>;
111}
112
113#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
115#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
116pub trait Merge: StorageEventLogs {
117 async fn merge_identity(
119 &mut self,
120 diff: FolderDiff,
121 outcome: &mut MergeOutcome,
122 ) -> std::result::Result<CheckedPatch, Self::Error>;
123
124 async fn compare_identity(
126 &self,
127 state: &CommitState,
128 ) -> std::result::Result<Comparison, Self::Error> {
129 let log = self.identity_log().await?;
130 let event_log = log.read().await;
131 Ok(event_log.tree().compare(&state.1)?)
132 }
133
134 async fn merge_account(
136 &mut self,
137 diff: AccountDiff,
138 outcome: &mut MergeOutcome,
139 ) -> std::result::Result<(CheckedPatch, HashSet<VaultId>), Self::Error>;
140
141 async fn compare_account(
143 &self,
144 state: &CommitState,
145 ) -> std::result::Result<Comparison, Self::Error> {
146 let log = self.account_log().await?;
147 let event_log = log.read().await;
148 Ok(event_log.tree().compare(&state.1)?)
149 }
150
151 async fn merge_device(
153 &mut self,
154 diff: DeviceDiff,
155 outcome: &mut MergeOutcome,
156 ) -> std::result::Result<CheckedPatch, Self::Error>;
157
158 async fn compare_device(
160 &self,
161 state: &CommitState,
162 ) -> std::result::Result<Comparison, Self::Error> {
163 let log = self.device_log().await?;
164 let event_log = log.read().await;
165 Ok(event_log.tree().compare(&state.1)?)
166 }
167
168 #[cfg(feature = "files")]
170 async fn merge_files(
171 &mut self,
172 diff: FileDiff,
173 outcome: &mut MergeOutcome,
174 ) -> std::result::Result<CheckedPatch, Self::Error>;
175
176 #[cfg(feature = "files")]
178 async fn compare_files(
179 &self,
180 state: &CommitState,
181 ) -> std::result::Result<Comparison, Self::Error> {
182 let log = self.file_log().await?;
183 let event_log = log.read().await;
184 Ok(event_log.tree().compare(&state.1)?)
185 }
186
187 async fn merge_folder(
189 &mut self,
190 folder_id: &VaultId,
191 diff: FolderDiff,
192 outcome: &mut MergeOutcome,
193 ) -> std::result::Result<(CheckedPatch, Vec<WriteEvent>), Self::Error>;
194
195 async fn compare_folder(
197 &self,
198 folder_id: &VaultId,
199 state: &CommitState,
200 ) -> std::result::Result<Comparison, Self::Error> {
201 let event_log = self.folder_log(folder_id).await?;
202 let reader = event_log.read().await;
203 Ok(reader.tree().compare(&state.1)?)
204 }
205
206 async fn compare(
208 &mut self,
209 remote_status: &SyncStatus,
210 ) -> std::result::Result<SyncCompare, Self::Error> {
211 let mut compare = SyncCompare::default();
212
213 compare.identity =
214 Some(self.compare_identity(&remote_status.identity).await?);
215
216 compare.account =
217 Some(self.compare_account(&remote_status.account).await?);
218
219 compare.device =
220 Some(self.compare_device(&remote_status.device).await?);
221
222 #[cfg(feature = "files")]
223 if let Some(files) = &remote_status.files {
224 compare.files = Some(self.compare_files(files).await?);
225 }
226
227 for (id, folder_status) in &remote_status.folders {
228 compare
229 .folders
230 .insert(*id, self.compare_folder(id, folder_status).await?);
231 }
232
233 Ok(compare)
234 }
235
236 async fn merge(
238 &mut self,
239 diff: SyncDiff,
240 outcome: &mut MergeOutcome,
241 ) -> std::result::Result<SyncCompare, Self::Error> {
242 let mut compare = SyncCompare::default();
243
244 match diff.identity {
245 Some(MaybeDiff::Diff(diff)) => {
246 self.merge_identity(diff, outcome).await?;
247 }
248 Some(MaybeDiff::Compare(state)) => {
249 if let Some(state) = state {
250 compare.identity =
251 Some(self.compare_identity(&state).await?);
252 }
253 }
254 None => {}
255 }
256
257 let mut deleted_folders = HashSet::new();
258
259 match diff.account {
260 Some(MaybeDiff::Diff(diff)) => {
261 let (_, deletions) =
262 self.merge_account(diff, outcome).await?;
263 deleted_folders = deletions;
264 }
265 Some(MaybeDiff::Compare(state)) => {
266 if let Some(state) = state {
267 compare.account =
268 Some(self.compare_account(&state).await?);
269 }
270 }
271 None => {}
272 }
273
274 match diff.device {
275 Some(MaybeDiff::Diff(diff)) => {
276 self.merge_device(diff, outcome).await?;
277 }
278 Some(MaybeDiff::Compare(state)) => {
279 if let Some(state) = state {
280 compare.device = Some(self.compare_device(&state).await?);
281 }
282 }
283 None => {}
284 }
285
286 #[cfg(feature = "files")]
287 match diff.files {
288 Some(MaybeDiff::Diff(diff)) => {
289 self.merge_files(diff, outcome).await?;
290 }
291 Some(MaybeDiff::Compare(state)) => {
292 if let Some(state) = state {
293 compare.files = Some(self.compare_files(&state).await?);
294 }
295 }
296 None => {}
297 }
298
299 for (id, maybe_diff) in diff.folders {
300 if deleted_folders.contains(&id) {
303 tracing::debug!(
304 folder_id = %id,
305 "merge::ignore_deleted_folder");
306 continue;
307 }
308 match maybe_diff {
309 MaybeDiff::Diff(diff) => {
310 self.merge_folder(&id, diff, outcome).await?;
311 }
312 MaybeDiff::Compare(state) => {
313 if let Some(state) = state {
314 compare.folders.insert(
315 id,
316 self.compare_folder(&id, &state).await?,
317 );
318 }
319 }
320 }
321 }
322
323 tracing::debug!(num_changes = %outcome.changes, "merge complete");
324
325 Ok(compare)
326 }
327}
328
329#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
337#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
338pub trait ForceMerge: Merge {
339 async fn force_merge_update(
350 &mut self,
351 mut update_set: UpdateSet,
352 outcome: &mut MergeOutcome,
353 ) -> std::result::Result<(), Self::Error> {
354 if let Some(diff) = update_set.identity.take() {
355 self.force_merge_identity(diff, outcome).await?;
356 }
357
358 if let Some(diff) = update_set.account.take() {
359 self.force_merge_account(diff, outcome).await?;
360 }
361
362 if let Some(diff) = update_set.device.take() {
363 self.force_merge_device(diff, outcome).await?;
364 }
365
366 #[cfg(feature = "files")]
367 if let Some(diff) = update_set.files.take() {
368 self.force_merge_files(diff, outcome).await?;
369 }
370
371 for (id, folder) in update_set.folders {
372 self.force_merge_folder(&id, folder, outcome).await?;
373 }
374 Ok(())
375 }
376
377 async fn force_merge_identity(
379 &mut self,
380 source: FolderDiff,
381 outcome: &mut MergeOutcome,
382 ) -> std::result::Result<(), Self::Error>;
383
384 async fn force_merge_account(
386 &mut self,
387 diff: AccountDiff,
388 outcome: &mut MergeOutcome,
389 ) -> std::result::Result<(), Self::Error> {
390 let len = diff.patch.len() as u64;
391
392 tracing::debug!(
393 checkpoint = ?diff.checkpoint,
394 num_events = len,
395 "force_merge::account",
396 );
397
398 let event_log = self.account_log().await?;
399 let mut event_log = event_log.write().await;
400 event_log.replace_all_events(&diff).await?;
401
402 outcome.changes += len;
403 outcome.tracked.account =
404 TrackedChanges::new_account_records(&diff.patch).await?;
405
406 Ok(())
407 }
408
409 async fn force_merge_device(
411 &mut self,
412 diff: DeviceDiff,
413 outcome: &mut MergeOutcome,
414 ) -> std::result::Result<(), Self::Error> {
415 let len = diff.patch.len() as u64;
416
417 tracing::debug!(
418 checkpoint = ?diff.checkpoint,
419 num_events = len,
420 "force_merge::device",
421 );
422
423 let event_log = self.device_log().await?;
424 let mut event_log = event_log.write().await;
425 event_log.replace_all_events(&diff).await?;
426
427 outcome.changes += len;
428 outcome.tracked.device =
429 TrackedChanges::new_device_records(&diff.patch).await?;
430
431 Ok(())
432 }
433
434 #[cfg(feature = "files")]
436 async fn force_merge_files(
437 &mut self,
438 diff: FileDiff,
439 outcome: &mut MergeOutcome,
440 ) -> std::result::Result<(), Self::Error> {
441 let len = diff.patch.len() as u64;
442
443 tracing::debug!(
444 checkpoint = ?diff.checkpoint,
445 num_events = len,
446 "force_merge::files",
447 );
448
449 let event_log = self.file_log().await?;
450 let mut event_log = event_log.write().await;
451 event_log.replace_all_events(&diff).await?;
452
453 outcome.changes += len;
454 outcome.tracked.files =
455 TrackedChanges::new_file_records(&diff.patch).await?;
456
457 Ok(())
458 }
459
460 async fn force_merge_folder(
462 &mut self,
463 folder_id: &VaultId,
464 source: FolderDiff,
465 outcome: &mut MergeOutcome,
466 ) -> std::result::Result<(), Self::Error>;
467}
468
469#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
471#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
472pub trait SyncStorage: ForceMerge {
473 fn is_client_storage(&self) -> bool;
475
476 async fn sync_status(
478 &self,
479 ) -> std::result::Result<SyncStatus, Self::Error> {
480 let summaries = self.folder_details().await?;
487
488 let identity = {
489 let event_log = self.identity_log().await?;
490 let reader = event_log.read().await;
491 reader.tree().commit_state()?
492 };
493
494 let account = {
495 let event_log = self.account_log().await?;
496 let reader = event_log.read().await;
497 reader.tree().commit_state()?
498 };
499
500 let device = {
501 let event_log = self.device_log().await?;
502 let reader = event_log.read().await;
503 reader.tree().commit_state()?
504 };
505
506 #[cfg(feature = "files")]
507 let files = {
508 let event_log = self.file_log().await?;
509 let reader = event_log.read().await;
510 if reader.tree().is_empty() {
511 None
512 } else {
513 Some(reader.tree().commit_state()?)
514 }
515 };
516
517 let mut folders = IndexMap::new();
518 let mut folder_roots: Vec<(&VaultId, [u8; 32])> = Vec::new();
519 for summary in &summaries {
520 let event_log = self.folder_log(summary.id()).await?;
521 let reader = event_log.read().await;
522 let commit_state = reader.tree().commit_state()?;
523 folder_roots.push((summary.id(), commit_state.1.root().into()));
524 folders.insert(*summary.id(), commit_state);
525 }
526
527 let mut root_tree = CommitTree::new();
529 let mut root_commits = vec![
530 identity.1.root().into(),
531 account.1.root().into(),
532 device.1.root().into(),
533 ];
534 #[cfg(feature = "files")]
535 if let Some(files) = &files {
536 root_commits.push(files.1.root().into());
537 }
538
539 folder_roots.sort_by(|a, b| a.0.cmp(b.0));
540 let mut folder_roots =
541 folder_roots.into_iter().map(|f| f.1).collect::<Vec<_>>();
542 root_commits.append(&mut folder_roots);
543 root_tree.append(&mut root_commits);
544 root_tree.commit();
545
546 let root = root_tree.root().ok_or(sos_core::Error::NoRootCommit)?;
547
548 Ok(SyncStatus {
549 root,
550 identity,
551 account,
552 device,
553 #[cfg(feature = "files")]
554 files,
555 folders,
556 })
557 }
558
559 async fn create_set(
564 &self,
565 ) -> std::result::Result<CreateSet, Self::Error> {
566 let identity = {
567 let log = self.identity_log().await?;
568 let reader = log.read().await;
569 reader.diff_events(None).await?
570 };
571
572 let account = {
573 let log = self.account_log().await?;
574 let reader = log.read().await;
575 reader.diff_events(None).await?
576 };
577
578 let device = {
579 let log = self.device_log().await?;
580 let reader = log.read().await;
581 reader.diff_events(None).await?
582 };
583
584 #[cfg(feature = "files")]
585 let files = {
586 let log = self.file_log().await?;
587 let reader = log.read().await;
588 reader.diff_events(None).await?
589 };
590
591 let mut folders = HashMap::new();
592 let details = self.folder_details().await?;
593
594 for folder in details {
595 if folder.flags().is_sync_disabled() {
596 tracing::debug!(
597 folder_id = %folder.id(),
598 "create_set::ignore::no_sync_flag");
599 continue;
600 }
601 let event_log = self.folder_log(folder.id()).await?;
602 let log_file = event_log.read().await;
603 folders.insert(*folder.id(), log_file.diff_events(None).await?);
604 }
605
606 Ok(CreateSet {
607 identity,
608 account,
609 folders,
610 device,
611 #[cfg(feature = "files")]
612 files,
613 })
614 }
615
616 async fn debug_account_tree(
618 &self,
619 account_id: AccountId,
620 ) -> Result<DebugTree, Self::Error> {
621 let status = self.sync_status().await?;
622
623 let folder_list = {
625 let mut folder_list = self.folder_details().await?;
626 let mut redacted = IndexSet::new();
627 for mut folder in folder_list.drain(..) {
628 folder.set_name(String::new());
629 redacted.insert(folder);
630 }
631 redacted
632 };
633
634 let identity = {
635 let event_log = self.identity_log().await?;
636 let event_log = event_log.read().await;
637 debug_tree_events!(&*event_log)
638 };
639
640 let account = {
641 let event_log = self.account_log().await?;
642 let event_log = event_log.read().await;
643 debug_tree_events!(&*event_log)
644 };
645
646 let device = {
647 let event_log = self.device_log().await?;
648 let event_log = event_log.read().await;
649 debug_tree_events!(&*event_log)
650 };
651
652 #[cfg(feature = "files")]
653 let file = {
654 let event_log = self.file_log().await?;
655 let event_log = event_log.read().await;
656 debug_tree_events!(&*event_log)
657 };
658
659 let folders = {
660 let mut folders = HashMap::new();
661 for summary in &folder_list {
662 let event_log = self.folder_log(summary.id()).await?;
663 let event_log = event_log.read().await;
664 folders
665 .insert(*summary.id(), debug_tree_events!(&*event_log));
666 }
667 folders
668 };
669
670 Ok(DebugTree {
671 account_id,
672 status,
673 folders: folder_list,
674 events: DebugEventLogs {
675 identity,
676 account,
677 device,
678 #[cfg(feature = "files")]
679 file,
680 folders,
681 },
682 })
683 }
684}