1use crate::UpdateSet;
3use crate::{
4 CreateSet, MaybeDiff, MergeOutcome, SyncCompare, SyncDiff, SyncStatus,
5 TrackedChanges,
6};
7use async_trait::async_trait;
8use indexmap::IndexMap;
9use indexmap::IndexSet;
10use sos_backend::{AccountEventLog, DeviceEventLog, FolderEventLog};
11use sos_core::events::WriteEvent;
12use sos_core::{
13 commit::{CommitState, CommitTree, Comparison},
14 events::{
15 patch::{AccountDiff, CheckedPatch, DeviceDiff, FolderDiff},
16 EventLog,
17 },
18 VaultId,
19};
20use sos_vault::Summary;
21use std::{
22 collections::{HashMap, HashSet},
23 sync::Arc,
24};
25use tokio::sync::RwLock;
26
27#[cfg(feature = "files")]
28use {
29 sos_backend::FileEventLog,
30 sos_core::{events::patch::FileDiff, ExternalFile},
31};
32
33#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
35#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
36pub trait StorageEventLogs: Send + Sync + 'static {
37 type Error: std::error::Error
39 + std::fmt::Debug
40 + From<sos_core::Error>
41 + From<sos_backend::Error>
42 + From<crate::Error>
43 + Send
44 + Sync
45 + 'static;
46
47 async fn identity_log(
49 &self,
50 ) -> Result<Arc<RwLock<FolderEventLog>>, Self::Error>;
51
52 async fn account_log(
54 &self,
55 ) -> Result<Arc<RwLock<AccountEventLog>>, Self::Error>;
56
57 async fn device_log(
59 &self,
60 ) -> Result<Arc<RwLock<DeviceEventLog>>, Self::Error>;
61
62 #[cfg(feature = "files")]
64 async fn file_log(
65 &self,
66 ) -> Result<Arc<RwLock<FileEventLog>>, Self::Error>;
67
68 #[cfg(feature = "files")]
70 async fn canonical_files(
71 &self,
72 ) -> Result<IndexSet<ExternalFile>, Self::Error> {
73 use sos_reducers::FileReducer;
74 let files = self.file_log().await?;
75 let event_log = files.read().await;
76
77 let reducer = FileReducer::new(&*event_log);
79 Ok(reducer.reduce(None).await?)
80 }
81
82 async fn folder_details(&self) -> Result<IndexSet<Summary>, Self::Error>;
86
87 async fn folder_log(
89 &self,
90 id: &VaultId,
91 ) -> Result<Arc<RwLock<FolderEventLog>>, Self::Error>;
92
93 }
132
133#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
135#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
136pub trait Merge: StorageEventLogs {
137 async fn merge_identity(
139 &mut self,
140 diff: FolderDiff,
141 outcome: &mut MergeOutcome,
142 ) -> std::result::Result<CheckedPatch, Self::Error>;
143
144 async fn compare_identity(
146 &self,
147 state: &CommitState,
148 ) -> std::result::Result<Comparison, Self::Error> {
149 let log = self.identity_log().await?;
150 let event_log = log.read().await;
151 Ok(event_log.tree().compare(&state.1)?)
152 }
153
154 async fn merge_account(
156 &mut self,
157 diff: AccountDiff,
158 outcome: &mut MergeOutcome,
159 ) -> std::result::Result<(CheckedPatch, HashSet<VaultId>), Self::Error>;
160
161 async fn compare_account(
163 &self,
164 state: &CommitState,
165 ) -> std::result::Result<Comparison, Self::Error> {
166 let log = self.account_log().await?;
167 let event_log = log.read().await;
168 Ok(event_log.tree().compare(&state.1)?)
169 }
170
171 async fn merge_device(
173 &mut self,
174 diff: DeviceDiff,
175 outcome: &mut MergeOutcome,
176 ) -> std::result::Result<CheckedPatch, Self::Error>;
177
178 async fn compare_device(
180 &self,
181 state: &CommitState,
182 ) -> std::result::Result<Comparison, Self::Error> {
183 let log = self.device_log().await?;
184 let event_log = log.read().await;
185 Ok(event_log.tree().compare(&state.1)?)
186 }
187
188 #[cfg(feature = "files")]
190 async fn merge_files(
191 &mut self,
192 diff: FileDiff,
193 outcome: &mut MergeOutcome,
194 ) -> std::result::Result<CheckedPatch, Self::Error>;
195
196 #[cfg(feature = "files")]
198 async fn compare_files(
199 &self,
200 state: &CommitState,
201 ) -> std::result::Result<Comparison, Self::Error> {
202 let log = self.file_log().await?;
203 let event_log = log.read().await;
204 Ok(event_log.tree().compare(&state.1)?)
205 }
206
207 async fn merge_folder(
209 &mut self,
210 folder_id: &VaultId,
211 diff: FolderDiff,
212 outcome: &mut MergeOutcome,
213 ) -> std::result::Result<(CheckedPatch, Vec<WriteEvent>), Self::Error>;
214
215 async fn compare_folder(
217 &self,
218 folder_id: &VaultId,
219 state: &CommitState,
220 ) -> std::result::Result<Comparison, Self::Error> {
221 let event_log = self.folder_log(folder_id).await?;
222 let reader = event_log.read().await;
223 Ok(reader.tree().compare(&state.1)?)
224 }
225
226 async fn compare(
228 &mut self,
229 remote_status: &SyncStatus,
230 ) -> std::result::Result<SyncCompare, Self::Error> {
231 let mut compare = SyncCompare::default();
232
233 compare.identity =
234 Some(self.compare_identity(&remote_status.identity).await?);
235
236 compare.account =
237 Some(self.compare_account(&remote_status.account).await?);
238
239 compare.device =
240 Some(self.compare_device(&remote_status.device).await?);
241
242 #[cfg(feature = "files")]
243 if let Some(files) = &remote_status.files {
244 compare.files = Some(self.compare_files(files).await?);
245 }
246
247 for (id, folder_status) in &remote_status.folders {
248 compare
249 .folders
250 .insert(*id, self.compare_folder(id, folder_status).await?);
251 }
252
253 Ok(compare)
254 }
255
256 async fn merge(
258 &mut self,
259 diff: SyncDiff,
260 outcome: &mut MergeOutcome,
261 ) -> std::result::Result<SyncCompare, Self::Error> {
262 let mut compare = SyncCompare::default();
263
264 match diff.identity {
265 Some(MaybeDiff::Diff(diff)) => {
266 self.merge_identity(diff, outcome).await?;
267 }
268 Some(MaybeDiff::Compare(state)) => {
269 if let Some(state) = state {
270 compare.identity =
271 Some(self.compare_identity(&state).await?);
272 }
273 }
274 None => {}
275 }
276
277 let mut deleted_folders = HashSet::new();
278
279 match diff.account {
280 Some(MaybeDiff::Diff(diff)) => {
281 let (_, deletions) =
282 self.merge_account(diff, outcome).await?;
283 deleted_folders = deletions;
284 }
285 Some(MaybeDiff::Compare(state)) => {
286 if let Some(state) = state {
287 compare.account =
288 Some(self.compare_account(&state).await?);
289 }
290 }
291 None => {}
292 }
293
294 match diff.device {
295 Some(MaybeDiff::Diff(diff)) => {
296 self.merge_device(diff, outcome).await?;
297 }
298 Some(MaybeDiff::Compare(state)) => {
299 if let Some(state) = state {
300 compare.device = Some(self.compare_device(&state).await?);
301 }
302 }
303 None => {}
304 }
305
306 #[cfg(feature = "files")]
307 match diff.files {
308 Some(MaybeDiff::Diff(diff)) => {
309 self.merge_files(diff, outcome).await?;
310 }
311 Some(MaybeDiff::Compare(state)) => {
312 if let Some(state) = state {
313 compare.files = Some(self.compare_files(&state).await?);
314 }
315 }
316 None => {}
317 }
318
319 for (id, maybe_diff) in diff.folders {
320 if deleted_folders.contains(&id) {
323 tracing::debug!(
324 folder_id = %id,
325 "merge::ignore_deleted_folder");
326 continue;
327 }
328 match maybe_diff {
329 MaybeDiff::Diff(diff) => {
330 self.merge_folder(&id, diff, outcome).await?;
331 }
332 MaybeDiff::Compare(state) => {
333 if let Some(state) = state {
334 compare.folders.insert(
335 id,
336 self.compare_folder(&id, &state).await?,
337 );
338 }
339 }
340 }
341 }
342
343 tracing::debug!(num_changes = %outcome.changes, "merge complete");
344
345 Ok(compare)
346 }
347}
348
349#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
357#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
358pub trait ForceMerge: Merge {
359 async fn force_merge_update(
370 &mut self,
371 mut update_set: UpdateSet,
372 outcome: &mut MergeOutcome,
373 ) -> std::result::Result<(), Self::Error> {
374 if let Some(diff) = update_set.identity.take() {
375 self.force_merge_identity(diff, outcome).await?;
376 }
377
378 if let Some(diff) = update_set.account.take() {
379 self.force_merge_account(diff, outcome).await?;
380 }
381
382 if let Some(diff) = update_set.device.take() {
383 self.force_merge_device(diff, outcome).await?;
384 }
385
386 #[cfg(feature = "files")]
387 if let Some(diff) = update_set.files.take() {
388 self.force_merge_files(diff, outcome).await?;
389 }
390
391 for (id, folder) in update_set.folders {
392 self.force_merge_folder(&id, folder, outcome).await?;
393 }
394 Ok(())
395 }
396
397 async fn force_merge_identity(
399 &mut self,
400 source: FolderDiff,
401 outcome: &mut MergeOutcome,
402 ) -> std::result::Result<(), Self::Error>;
403
404 async fn force_merge_account(
406 &mut self,
407 diff: AccountDiff,
408 outcome: &mut MergeOutcome,
409 ) -> std::result::Result<(), Self::Error> {
410 let len = diff.patch.len() as u64;
411
412 tracing::debug!(
413 checkpoint = ?diff.checkpoint,
414 num_events = len,
415 "force_merge::account",
416 );
417
418 let event_log = self.account_log().await?;
419 let mut event_log = event_log.write().await;
420 event_log.replace_all_events(&diff).await?;
421
422 outcome.changes += len;
423 outcome.tracked.account =
424 TrackedChanges::new_account_records(&diff.patch).await?;
425
426 Ok(())
427 }
428
429 async fn force_merge_device(
431 &mut self,
432 diff: DeviceDiff,
433 outcome: &mut MergeOutcome,
434 ) -> std::result::Result<(), Self::Error> {
435 let len = diff.patch.len() as u64;
436
437 tracing::debug!(
438 checkpoint = ?diff.checkpoint,
439 num_events = len,
440 "force_merge::device",
441 );
442
443 let event_log = self.device_log().await?;
444 let mut event_log = event_log.write().await;
445 event_log.replace_all_events(&diff).await?;
446
447 outcome.changes += len;
448 outcome.tracked.device =
449 TrackedChanges::new_device_records(&diff.patch).await?;
450
451 Ok(())
452 }
453
454 #[cfg(feature = "files")]
456 async fn force_merge_files(
457 &mut self,
458 diff: FileDiff,
459 outcome: &mut MergeOutcome,
460 ) -> std::result::Result<(), Self::Error> {
461 let len = diff.patch.len() as u64;
462
463 tracing::debug!(
464 checkpoint = ?diff.checkpoint,
465 num_events = len,
466 "force_merge::files",
467 );
468
469 let event_log = self.file_log().await?;
470 let mut event_log = event_log.write().await;
471 event_log.replace_all_events(&diff).await?;
472
473 outcome.changes += len;
474 outcome.tracked.files =
475 TrackedChanges::new_file_records(&diff.patch).await?;
476
477 Ok(())
478 }
479
480 async fn force_merge_folder(
482 &mut self,
483 folder_id: &VaultId,
484 source: FolderDiff,
485 outcome: &mut MergeOutcome,
486 ) -> std::result::Result<(), Self::Error>;
487}
488
489#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
491#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
492pub trait SyncStorage: ForceMerge {
493 fn is_client_storage(&self) -> bool;
495
496 async fn sync_status(
498 &self,
499 ) -> std::result::Result<SyncStatus, Self::Error> {
500 let summaries = self.folder_details().await?;
507
508 let identity = {
509 let event_log = self.identity_log().await?;
510 let reader = event_log.read().await;
511 reader.tree().commit_state()?
512 };
513
514 let account = {
515 let event_log = self.account_log().await?;
516 let reader = event_log.read().await;
517 reader.tree().commit_state()?
518 };
519
520 let device = {
521 let event_log = self.device_log().await?;
522 let reader = event_log.read().await;
523 reader.tree().commit_state()?
524 };
525
526 #[cfg(feature = "files")]
527 let files = {
528 let event_log = self.file_log().await?;
529 let reader = event_log.read().await;
530 if reader.tree().is_empty() {
531 None
532 } else {
533 Some(reader.tree().commit_state()?)
534 }
535 };
536
537 let mut folders = IndexMap::new();
538 let mut folder_roots: Vec<(&VaultId, [u8; 32])> = Vec::new();
539 for summary in &summaries {
540 let event_log = self.folder_log(summary.id()).await?;
541 let reader = event_log.read().await;
542 let commit_state = reader.tree().commit_state()?;
543 folder_roots.push((summary.id(), commit_state.1.root().into()));
544 folders.insert(*summary.id(), commit_state);
545 }
546
547 let mut root_tree = CommitTree::new();
549 let mut root_commits = vec![
550 identity.1.root().into(),
551 account.1.root().into(),
552 device.1.root().into(),
553 ];
554 #[cfg(feature = "files")]
555 if let Some(files) = &files {
556 root_commits.push(files.1.root().into());
557 }
558
559 folder_roots.sort_by(|a, b| a.0.cmp(b.0));
560 let mut folder_roots =
561 folder_roots.into_iter().map(|f| f.1).collect::<Vec<_>>();
562 root_commits.append(&mut folder_roots);
563 root_tree.append(&mut root_commits);
564 root_tree.commit();
565
566 let root = root_tree.root().ok_or(sos_core::Error::NoRootCommit)?;
567
568 Ok(SyncStatus {
569 root,
570 identity,
571 account,
572 device,
573 #[cfg(feature = "files")]
574 files,
575 folders,
576 })
577 }
578
579 async fn change_set(
584 &self,
585 ) -> std::result::Result<CreateSet, Self::Error> {
586 let identity = {
587 let log = self.identity_log().await?;
588 let reader = log.read().await;
589 reader.diff_events(None).await?
590 };
591
592 let account = {
593 let log = self.account_log().await?;
594 let reader = log.read().await;
595 reader.diff_events(None).await?
596 };
597
598 let device = {
599 let log = self.device_log().await?;
600 let reader = log.read().await;
601 reader.diff_events(None).await?
602 };
603
604 #[cfg(feature = "files")]
605 let files = {
606 let log = self.file_log().await?;
607 let reader = log.read().await;
608 reader.diff_events(None).await?
609 };
610
611 let mut folders = HashMap::new();
612 let details = self.folder_details().await?;
613
614 for folder in details {
615 if folder.flags().is_sync_disabled() {
616 tracing::debug!(
617 folder_id = %folder.id(),
618 "change_set::ignore::no_sync_flag");
619 continue;
620 }
621 let event_log = self.folder_log(folder.id()).await?;
622 let log_file = event_log.read().await;
623 folders.insert(*folder.id(), log_file.diff_events(None).await?);
624 }
625
626 Ok(CreateSet {
627 identity,
628 account,
629 folders,
630 device,
631 #[cfg(feature = "files")]
632 files,
633 })
634 }
635}