1use crate::{NetworkAccount, Result};
6use async_trait::async_trait;
7use indexmap::IndexSet;
8use sos_backend::{AccountEventLog, DeviceEventLog, FolderEventLog};
9use sos_core::events::WriteEvent;
10use sos_core::{
11 commit::{CommitState, Comparison},
12 events::patch::{AccountDiff, CheckedPatch, DeviceDiff, FolderDiff},
13 Origin, VaultId,
14};
15use sos_protocol::{
16 AccountSync, RemoteSync, SyncClient, SyncOptions, SyncResult,
17};
18use sos_sync::{
19 ForceMerge, Merge, MergeOutcome, StorageEventLogs, SyncStatus,
20 SyncStorage, UpdateSet,
21};
22use sos_vault::Summary;
23use std::{
24 collections::{HashMap, HashSet},
25 sync::Arc,
26};
27use tokio::sync::RwLock;
28
29#[cfg(feature = "files")]
30use {
31 sos_backend::FileEventLog,
32 sos_core::events::patch::FileDiff,
33 sos_protocol::transfer::{FileSet, FileSyncClient, FileTransfersSet},
34};
35
36pub type ServerStatus =
38 HashMap<Origin, std::result::Result<SyncStatus, sos_protocol::Error>>;
39
/// Per-origin outcome of a file comparison request.
///
/// Each entry is keyed by the remote's [`Origin`] and holds either the
/// [`FileTransfersSet`] that was received or the protocol error produced
/// while requesting it.
///
/// Only available with the `files` feature.
#[cfg(feature = "files")]
pub type TransferStatus =
    HashMap<Origin, std::result::Result<FileTransfersSet, sos_protocol::Error>>;
46
47impl NetworkAccount {
48 pub async fn server_status(&self, options: &SyncOptions) -> ServerStatus {
50 if self.offline {
51 tracing::warn!("offline mode active, ignoring server status");
52 return Default::default();
53 }
54
55 let remotes = self.remotes.read().await;
56 let mut server_status = HashMap::new();
57 for (origin, remote) in &*remotes {
58 let sync_remote = options.origins.is_empty()
59 || options.origins.contains(origin);
60
61 if sync_remote {
62 match remote.client.sync_status().await {
63 Ok(status) => {
64 server_status.insert(origin.clone(), Ok(status));
65 }
66 Err(e) => {
67 server_status.insert(origin.clone(), Err(e));
68 }
69 }
70 }
71 }
72 server_status
73 }
74
75 #[cfg(feature = "files")]
77 pub async fn transfer_status(
78 &self,
79 options: &SyncOptions,
80 ) -> Result<TransferStatus> {
81 let external_files = self.canonical_files().await?;
82 let local_files = FileSet(external_files);
83
84 let remotes = self.remotes.read().await;
85 let mut transfer_status = HashMap::new();
86 for (origin, remote) in &*remotes {
87 let sync_remote = options.origins.is_empty()
88 || options.origins.contains(origin);
89
90 if sync_remote {
91 match remote.client.compare_files(local_files.clone()).await {
92 Ok(status) => {
93 transfer_status.insert(origin.clone(), Ok(status));
94 }
95 Err(e) => {
96 transfer_status.insert(origin.clone(), Err(e));
97 }
98 }
99 }
100 }
101 Ok(transfer_status)
102 }
103}
104
105#[async_trait]
106impl AccountSync for NetworkAccount {
107 type Error = crate::Error;
108
109 async fn sync(&self) -> SyncResult<Self::Error> {
110 self.sync_with_options(&Default::default()).await
111 }
112
113 async fn sync_with_options(
114 &self,
115 options: &SyncOptions,
116 ) -> SyncResult<Self::Error> {
117 let mut result = SyncResult::default();
118 if self.offline {
119 tracing::warn!("offline mode active, ignoring sync");
120 return result;
121 }
122
123 let _ = self.sync_lock.lock().await;
124 let remotes = self.remotes.read().await;
125
126 for (origin, remote) in &*remotes {
127 let sync_remote = options.origins.is_empty()
128 || options.origins.contains(origin);
129
130 if !sync_remote {
131 tracing::warn!(origin = %origin, "skip_sync::sync_with_options");
132 continue;
133 }
134
135 let remote_result = remote.sync_with_options(options).await;
136 result.remotes.push(remote_result);
137 }
138 result
139 }
140
141 #[cfg(feature = "files")]
142 async fn sync_file_transfers(
143 &self,
144 options: &SyncOptions,
145 ) -> SyncResult<Self::Error> {
146 let mut result = SyncResult::default();
147 if self.offline {
148 tracing::warn!(
149 "offline mode active, ignoring sync file transfers"
150 );
151 return result;
152 }
153
154 let _ = self.sync_lock.lock().await;
155 let remotes = self.remotes.read().await;
156
157 for (origin, remote) in &*remotes {
158 let sync_remote = options.origins.is_empty()
159 || options.origins.contains(origin);
160
161 if !sync_remote {
162 tracing::warn!(origin = %origin, "skip_sync::sync_file_transfers");
163 continue;
164 }
165
166 let remote_result = remote.sync_file_transfers().await;
167 result.remotes.push(remote_result);
168 }
169 result
170 }
171
172 async fn force_update(
173 &self,
174 account_data: UpdateSet,
175 options: &SyncOptions,
176 ) -> SyncResult<Self::Error> {
177 let mut result = SyncResult::default();
178 if self.offline {
179 tracing::warn!("offline mode active, ignoring force update");
180 return result;
181 }
182
183 let _ = self.sync_lock.lock().await;
184 let remotes = self.remotes.read().await;
185
186 for (origin, remote) in &*remotes {
187 let sync_remote = options.origins.is_empty()
188 || options.origins.contains(origin);
189
190 if !sync_remote {
191 tracing::warn!(origin = %origin, "skip_sync::force_update");
192 continue;
193 }
194
195 let remote_result =
196 remote.force_update(account_data.clone()).await;
197 result.remotes.push(remote_result);
198 }
199 result
200 }
201}
202
203#[async_trait]
204impl StorageEventLogs for NetworkAccount {
205 type Error = crate::Error;
206
207 async fn identity_log(&self) -> Result<Arc<RwLock<FolderEventLog>>> {
208 let account = self.account.lock().await;
209 Ok(account.identity_log().await?)
210 }
211
212 async fn account_log(&self) -> Result<Arc<RwLock<AccountEventLog>>> {
213 let account = self.account.lock().await;
214 Ok(account.account_log().await?)
215 }
216
217 async fn device_log(&self) -> Result<Arc<RwLock<DeviceEventLog>>> {
218 let account = self.account.lock().await;
219 Ok(account.device_log().await?)
220 }
221
222 #[cfg(feature = "files")]
223 async fn file_log(&self) -> Result<Arc<RwLock<FileEventLog>>> {
224 let account = self.account.lock().await;
225 Ok(account.file_log().await?)
226 }
227
228 async fn folder_details(&self) -> Result<IndexSet<Summary>> {
229 let account = self.account.lock().await;
230 Ok(account.folder_details().await?)
231 }
232
233 async fn folder_log(
234 &self,
235 id: &VaultId,
236 ) -> Result<Arc<RwLock<FolderEventLog>>> {
237 let account = self.account.lock().await;
238 Ok(account.folder_log(id).await?)
239 }
240}
241
242#[async_trait]
243impl Merge for NetworkAccount {
244 async fn merge_identity(
245 &mut self,
246 diff: FolderDiff,
247 outcome: &mut MergeOutcome,
248 ) -> Result<CheckedPatch> {
249 let mut account = self.account.lock().await;
250 Ok(account.merge_identity(diff, outcome).await?)
251 }
252
253 async fn compare_identity(
254 &self,
255 state: &CommitState,
256 ) -> Result<Comparison> {
257 let account = self.account.lock().await;
258 Ok(account.compare_identity(state).await?)
259 }
260
261 async fn merge_account(
262 &mut self,
263 diff: AccountDiff,
264 outcome: &mut MergeOutcome,
265 ) -> Result<(CheckedPatch, HashSet<VaultId>)> {
266 let mut account = self.account.lock().await;
267 Ok(account.merge_account(diff, outcome).await?)
268 }
269
270 async fn compare_account(
271 &self,
272 state: &CommitState,
273 ) -> Result<Comparison> {
274 let account = self.account.lock().await;
275 Ok(account.compare_account(state).await?)
276 }
277
278 async fn merge_device(
279 &mut self,
280 diff: DeviceDiff,
281 outcome: &mut MergeOutcome,
282 ) -> Result<CheckedPatch> {
283 let mut account = self.account.lock().await;
284 Ok(account.merge_device(diff, outcome).await?)
285 }
286
287 async fn compare_device(
288 &self,
289 state: &CommitState,
290 ) -> Result<Comparison> {
291 let account = self.account.lock().await;
292 Ok(account.compare_device(state).await?)
293 }
294
295 #[cfg(feature = "files")]
296 async fn merge_files(
297 &mut self,
298 diff: FileDiff,
299 outcome: &mut MergeOutcome,
300 ) -> Result<CheckedPatch> {
301 let mut account = self.account.lock().await;
302 Ok(account.merge_files(diff, outcome).await?)
303 }
304
305 #[cfg(feature = "files")]
306 async fn compare_files(&self, state: &CommitState) -> Result<Comparison> {
307 let account = self.account.lock().await;
308 Ok(account.compare_files(state).await?)
309 }
310
311 async fn merge_folder(
312 &mut self,
313 folder_id: &VaultId,
314 diff: FolderDiff,
315 outcome: &mut MergeOutcome,
316 ) -> Result<(CheckedPatch, Vec<WriteEvent>)> {
317 let mut account = self.account.lock().await;
318 Ok(account.merge_folder(folder_id, diff, outcome).await?)
319 }
320
321 async fn compare_folder(
322 &self,
323 folder_id: &VaultId,
324 state: &CommitState,
325 ) -> Result<Comparison> {
326 let account = self.account.lock().await;
327 Ok(account.compare_folder(folder_id, state).await?)
328 }
329}
330
331#[async_trait]
332impl ForceMerge for NetworkAccount {
333 async fn force_merge_identity(
334 &mut self,
335 diff: FolderDiff,
336 outcome: &mut MergeOutcome,
337 ) -> Result<()> {
338 let mut account = self.account.lock().await;
339 Ok(account.force_merge_identity(diff, outcome).await?)
340 }
341
342 async fn force_merge_account(
343 &mut self,
344 diff: AccountDiff,
345 outcome: &mut MergeOutcome,
346 ) -> Result<()> {
347 let mut account = self.account.lock().await;
348 Ok(account.force_merge_account(diff, outcome).await?)
349 }
350
351 async fn force_merge_device(
352 &mut self,
353 diff: DeviceDiff,
354 outcome: &mut MergeOutcome,
355 ) -> Result<()> {
356 let mut account = self.account.lock().await;
357 Ok(account.force_merge_device(diff, outcome).await?)
358 }
359
360 #[cfg(feature = "files")]
362 async fn force_merge_files(
363 &mut self,
364 diff: FileDiff,
365 outcome: &mut MergeOutcome,
366 ) -> Result<()> {
367 let mut account = self.account.lock().await;
368 Ok(account.force_merge_files(diff, outcome).await?)
369 }
370
371 async fn force_merge_folder(
372 &mut self,
373 folder_id: &VaultId,
374 diff: FolderDiff,
375 outcome: &mut MergeOutcome,
376 ) -> Result<()> {
377 let mut account = self.account.lock().await;
378 Ok(account.force_merge_folder(folder_id, diff, outcome).await?)
379 }
380}
381
382#[async_trait]
383impl SyncStorage for NetworkAccount {
384 fn is_client_storage(&self) -> bool {
385 true
386 }
387
388 async fn sync_status(&self) -> Result<SyncStatus> {
389 let account = self.account.lock().await;
390 Ok(account.sync_status().await?)
391 }
392}