sos_net/account/
sync.rs

1//! Implements syncing for a network account.
2//!
3//! Delegates to the inner local account for merge and
4//! sync status operations.
5use crate::{NetworkAccount, Result};
6use async_trait::async_trait;
7use indexmap::IndexSet;
8use sos_backend::{AccountEventLog, DeviceEventLog, FolderEventLog};
9use sos_core::events::WriteEvent;
10use sos_core::{
11    commit::{CommitState, Comparison},
12    events::patch::{AccountDiff, CheckedPatch, DeviceDiff, FolderDiff},
13    Origin, VaultId,
14};
15use sos_protocol::{
16    AccountSync, RemoteSync, SyncClient, SyncOptions, SyncResult,
17};
18use sos_sync::{
19    ForceMerge, Merge, MergeOutcome, StorageEventLogs, SyncStatus,
20    SyncStorage, UpdateSet,
21};
22use sos_vault::Summary;
23use std::{
24    collections::{HashMap, HashSet},
25    sync::Arc,
26};
27use tokio::sync::RwLock;
28
29#[cfg(feature = "files")]
30use {
31    sos_backend::FileEventLog,
32    sos_core::events::patch::FileDiff,
33    sos_protocol::transfer::{FileSet, FileSyncClient, FileTransfersSet},
34};
35
36/// Server status for all remote origins.
37pub type ServerStatus =
38    HashMap<Origin, std::result::Result<SyncStatus, sos_protocol::Error>>;
39
40/// Transfer status for all remote origins.
41#[cfg(feature = "files")]
42pub type TransferStatus = HashMap<
43    Origin,
44    std::result::Result<FileTransfersSet, sos_protocol::Error>,
45>;
46
47impl NetworkAccount {
48    /// Sync status for remote servers.
49    pub async fn server_status(&self, options: &SyncOptions) -> ServerStatus {
50        if self.offline {
51            tracing::warn!("offline mode active, ignoring server status");
52            return Default::default();
53        }
54
55        let remotes = self.remotes.read().await;
56        let mut server_status = HashMap::new();
57        for (origin, remote) in &*remotes {
58            let sync_remote = options.origins.is_empty()
59                || options.origins.contains(origin);
60
61            if sync_remote {
62                match remote.client.sync_status().await {
63                    Ok(status) => {
64                        server_status.insert(origin.clone(), Ok(status));
65                    }
66                    Err(e) => {
67                        server_status.insert(origin.clone(), Err(e));
68                    }
69                }
70            }
71        }
72        server_status
73    }
74
75    /// Transfer status for remote servers.
76    #[cfg(feature = "files")]
77    pub async fn transfer_status(
78        &self,
79        options: &SyncOptions,
80    ) -> Result<TransferStatus> {
81        let external_files = self.canonical_files().await?;
82        let local_files = FileSet(external_files);
83
84        let remotes = self.remotes.read().await;
85        let mut transfer_status = HashMap::new();
86        for (origin, remote) in &*remotes {
87            let sync_remote = options.origins.is_empty()
88                || options.origins.contains(origin);
89
90            if sync_remote {
91                match remote.client.compare_files(local_files.clone()).await {
92                    Ok(status) => {
93                        transfer_status.insert(origin.clone(), Ok(status));
94                    }
95                    Err(e) => {
96                        transfer_status.insert(origin.clone(), Err(e));
97                    }
98                }
99            }
100        }
101        Ok(transfer_status)
102    }
103}
104
105#[async_trait]
106impl AccountSync for NetworkAccount {
107    type Error = crate::Error;
108
109    async fn sync(&self) -> SyncResult<Self::Error> {
110        self.sync_with_options(&Default::default()).await
111    }
112
113    async fn sync_with_options(
114        &self,
115        options: &SyncOptions,
116    ) -> SyncResult<Self::Error> {
117        let mut result = SyncResult::default();
118        if self.offline {
119            tracing::warn!("offline mode active, ignoring sync");
120            return result;
121        }
122
123        let _ = self.sync_lock.lock().await;
124        let remotes = self.remotes.read().await;
125
126        for (origin, remote) in &*remotes {
127            let sync_remote = options.origins.is_empty()
128                || options.origins.contains(origin);
129
130            if !sync_remote {
131                tracing::warn!(origin = %origin, "skip_sync::sync_with_options");
132                continue;
133            }
134
135            let remote_result = remote.sync_with_options(options).await;
136            result.remotes.push(remote_result);
137        }
138        result
139    }
140
141    #[cfg(feature = "files")]
142    async fn sync_file_transfers(
143        &self,
144        options: &SyncOptions,
145    ) -> SyncResult<Self::Error> {
146        let mut result = SyncResult::default();
147        if self.offline {
148            tracing::warn!(
149                "offline mode active, ignoring sync file transfers"
150            );
151            return result;
152        }
153
154        let _ = self.sync_lock.lock().await;
155        let remotes = self.remotes.read().await;
156
157        for (origin, remote) in &*remotes {
158            let sync_remote = options.origins.is_empty()
159                || options.origins.contains(origin);
160
161            if !sync_remote {
162                tracing::warn!(origin = %origin, "skip_sync::sync_file_transfers");
163                continue;
164            }
165
166            let remote_result = remote.sync_file_transfers().await;
167            result.remotes.push(remote_result);
168        }
169        result
170    }
171
172    async fn force_update(
173        &self,
174        account_data: UpdateSet,
175        options: &SyncOptions,
176    ) -> SyncResult<Self::Error> {
177        let mut result = SyncResult::default();
178        if self.offline {
179            tracing::warn!("offline mode active, ignoring force update");
180            return result;
181        }
182
183        let _ = self.sync_lock.lock().await;
184        let remotes = self.remotes.read().await;
185
186        for (origin, remote) in &*remotes {
187            let sync_remote = options.origins.is_empty()
188                || options.origins.contains(origin);
189
190            if !sync_remote {
191                tracing::warn!(origin = %origin, "skip_sync::force_update");
192                continue;
193            }
194
195            let remote_result =
196                remote.force_update(account_data.clone()).await;
197            result.remotes.push(remote_result);
198        }
199        result
200    }
201}
202
203#[async_trait]
204impl StorageEventLogs for NetworkAccount {
205    type Error = crate::Error;
206
207    async fn identity_log(&self) -> Result<Arc<RwLock<FolderEventLog>>> {
208        let account = self.account.lock().await;
209        Ok(account.identity_log().await?)
210    }
211
212    async fn account_log(&self) -> Result<Arc<RwLock<AccountEventLog>>> {
213        let account = self.account.lock().await;
214        Ok(account.account_log().await?)
215    }
216
217    async fn device_log(&self) -> Result<Arc<RwLock<DeviceEventLog>>> {
218        let account = self.account.lock().await;
219        Ok(account.device_log().await?)
220    }
221
222    #[cfg(feature = "files")]
223    async fn file_log(&self) -> Result<Arc<RwLock<FileEventLog>>> {
224        let account = self.account.lock().await;
225        Ok(account.file_log().await?)
226    }
227
228    async fn folder_details(&self) -> Result<IndexSet<Summary>> {
229        let account = self.account.lock().await;
230        Ok(account.folder_details().await?)
231    }
232
233    async fn folder_log(
234        &self,
235        id: &VaultId,
236    ) -> Result<Arc<RwLock<FolderEventLog>>> {
237        let account = self.account.lock().await;
238        Ok(account.folder_log(id).await?)
239    }
240}
241
242#[async_trait]
243impl Merge for NetworkAccount {
244    async fn merge_identity(
245        &mut self,
246        diff: FolderDiff,
247        outcome: &mut MergeOutcome,
248    ) -> Result<CheckedPatch> {
249        let mut account = self.account.lock().await;
250        Ok(account.merge_identity(diff, outcome).await?)
251    }
252
253    async fn compare_identity(
254        &self,
255        state: &CommitState,
256    ) -> Result<Comparison> {
257        let account = self.account.lock().await;
258        Ok(account.compare_identity(state).await?)
259    }
260
261    async fn merge_account(
262        &mut self,
263        diff: AccountDiff,
264        outcome: &mut MergeOutcome,
265    ) -> Result<(CheckedPatch, HashSet<VaultId>)> {
266        let mut account = self.account.lock().await;
267        Ok(account.merge_account(diff, outcome).await?)
268    }
269
270    async fn compare_account(
271        &self,
272        state: &CommitState,
273    ) -> Result<Comparison> {
274        let account = self.account.lock().await;
275        Ok(account.compare_account(state).await?)
276    }
277
278    async fn merge_device(
279        &mut self,
280        diff: DeviceDiff,
281        outcome: &mut MergeOutcome,
282    ) -> Result<CheckedPatch> {
283        let mut account = self.account.lock().await;
284        Ok(account.merge_device(diff, outcome).await?)
285    }
286
287    async fn compare_device(
288        &self,
289        state: &CommitState,
290    ) -> Result<Comparison> {
291        let account = self.account.lock().await;
292        Ok(account.compare_device(state).await?)
293    }
294
295    #[cfg(feature = "files")]
296    async fn merge_files(
297        &mut self,
298        diff: FileDiff,
299        outcome: &mut MergeOutcome,
300    ) -> Result<CheckedPatch> {
301        let mut account = self.account.lock().await;
302        Ok(account.merge_files(diff, outcome).await?)
303    }
304
305    #[cfg(feature = "files")]
306    async fn compare_files(&self, state: &CommitState) -> Result<Comparison> {
307        let account = self.account.lock().await;
308        Ok(account.compare_files(state).await?)
309    }
310
311    async fn merge_folder(
312        &mut self,
313        folder_id: &VaultId,
314        diff: FolderDiff,
315        outcome: &mut MergeOutcome,
316    ) -> Result<(CheckedPatch, Vec<WriteEvent>)> {
317        let mut account = self.account.lock().await;
318        Ok(account.merge_folder(folder_id, diff, outcome).await?)
319    }
320
321    async fn compare_folder(
322        &self,
323        folder_id: &VaultId,
324        state: &CommitState,
325    ) -> Result<Comparison> {
326        let account = self.account.lock().await;
327        Ok(account.compare_folder(folder_id, state).await?)
328    }
329}
330
331#[async_trait]
332impl ForceMerge for NetworkAccount {
333    async fn force_merge_identity(
334        &mut self,
335        diff: FolderDiff,
336        outcome: &mut MergeOutcome,
337    ) -> Result<()> {
338        let mut account = self.account.lock().await;
339        Ok(account.force_merge_identity(diff, outcome).await?)
340    }
341
342    async fn force_merge_account(
343        &mut self,
344        diff: AccountDiff,
345        outcome: &mut MergeOutcome,
346    ) -> Result<()> {
347        let mut account = self.account.lock().await;
348        Ok(account.force_merge_account(diff, outcome).await?)
349    }
350
351    async fn force_merge_device(
352        &mut self,
353        diff: DeviceDiff,
354        outcome: &mut MergeOutcome,
355    ) -> Result<()> {
356        let mut account = self.account.lock().await;
357        Ok(account.force_merge_device(diff, outcome).await?)
358    }
359
360    /// Force merge changes to the files event log.
361    #[cfg(feature = "files")]
362    async fn force_merge_files(
363        &mut self,
364        diff: FileDiff,
365        outcome: &mut MergeOutcome,
366    ) -> Result<()> {
367        let mut account = self.account.lock().await;
368        Ok(account.force_merge_files(diff, outcome).await?)
369    }
370
371    async fn force_merge_folder(
372        &mut self,
373        folder_id: &VaultId,
374        diff: FolderDiff,
375        outcome: &mut MergeOutcome,
376    ) -> Result<()> {
377        let mut account = self.account.lock().await;
378        Ok(account.force_merge_folder(folder_id, diff, outcome).await?)
379    }
380}
381
382#[async_trait]
383impl SyncStorage for NetworkAccount {
384    fn is_client_storage(&self) -> bool {
385        true
386    }
387
388    async fn sync_status(&self) -> Result<SyncStatus> {
389        let account = self.account.lock().await;
390        Ok(account.sync_status().await?)
391    }
392}