sos_remote_sync/
remote.rs1use async_trait::async_trait;
4use sos_account::Account;
5use sos_core::{AccountId, Origin};
6use sos_protocol::{AsConflict, ConflictError, SyncClient};
7use sos_sync::{
8 MaybeDiff, Merge, MergeOutcome, StorageEventLogs, SyncDirection,
9 SyncPacket, SyncStatus, SyncStorage,
10};
11use sos_vfs as vfs;
12use std::{collections::HashMap, sync::Arc};
13use tokio::sync::Mutex;
14
15#[cfg(feature = "files")]
16use sos_protocol::transfer::{
17 FileOperation, FileTransferQueueSender, TransferOperation,
18};
19
20#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
23#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
24pub trait RemoteSyncHandler {
25 type Client: SyncClient + Send + Sync + 'static;
27
28 type Account: Account + SyncStorage;
30
31 type Error: std::error::Error
33 + std::fmt::Debug
34 + AsConflict
35 + From<ConflictError>
36 + From<crate::Error>
37 + From<sos_core::Error>
38 + From<sos_backend::StorageError>
39 + From<sos_account::Error>
40 + From<sos_backend::Error>
41 + From<std::io::Error>
42 + From<<Self::Account as Account>::Error>
43 + From<<Self::Account as StorageEventLogs>::Error>
44 + From<<Self::Client as SyncClient>::Error>
45 + Send
46 + Sync
47 + 'static;
48
49 fn client(&self) -> &Self::Client;
51
52 fn origin(&self) -> &Origin;
54
55 fn account_id(&self) -> &AccountId;
57
58 fn account(&self) -> Arc<Mutex<Self::Account>>;
60
61 fn direction(&self) -> SyncDirection;
63
64 #[cfg(feature = "files")]
66 fn file_transfer_queue(&self) -> &FileTransferQueueSender;
67
68 #[cfg(feature = "files")]
70 async fn execute_sync_file_transfers(&self) -> Result<(), Self::Error>;
71
72 #[doc(hidden)]
74 async fn create_push_account(&self) -> Result<(), Self::Error> {
75 {
76 let account = self.account();
77 let account = account.lock().await;
78 let public_account = account.create_set().await?;
79 self.client().create_account(public_account).await?;
80 }
81
82 #[cfg(feature = "files")]
83 self.execute_sync_file_transfers().await?;
84
85 Ok(())
86 }
87
88 #[doc(hidden)]
90 async fn create_pull_account(&self) -> Result<(), Self::Error> {
91 tracing::info!("create_pull_account");
92
93 let public_account = self.client().fetch_account().await?;
95
96 tracing::info!("create_pull_account::fetch_completed");
97
98 {
99 let account = self.account();
100 let mut account = account.lock().await;
101 account.import_account_events(public_account).await?;
102 }
103
104 Ok(())
110 }
111
112 async fn create_account(&self) -> Result<(), Self::Error> {
115 match self.direction() {
116 SyncDirection::Push => self.create_push_account().await,
117 SyncDirection::Pull => self.create_pull_account().await,
118 }
119 }
120
121 async fn sync_account(
123 &self,
124 remote_status: SyncStatus,
125 ) -> Result<MergeOutcome, Self::Error> {
126 let account = self.account();
127 let mut account = account.lock().await;
128
129 tracing::debug!("merge_client");
130
131 let (needs_sync, local_status, local_changes) =
132 sos_protocol::diff::<_, Self::Error>(&*account, remote_status)
133 .await?;
134
135 tracing::debug!(needs_sync = %needs_sync, "merge_client");
136
137 let mut outcome = MergeOutcome::default();
138
139 if needs_sync {
140 let packet = SyncPacket {
141 status: local_status,
142 diff: local_changes,
143 compare: None,
144 };
145 let remote_changes = self.client().sync(packet.clone()).await?;
146
147 let maybe_conflict = remote_changes
148 .compare
149 .as_ref()
150 .map(|c| c.maybe_conflict())
151 .unwrap_or_default();
152 let has_conflicts = maybe_conflict.has_conflicts();
153
154 if !has_conflicts {
155 account.merge(remote_changes.diff, &mut outcome).await?;
156
157 #[cfg(feature = "files")]
160 if !outcome.external_files.is_empty() {
161 use sos_account::Account;
162 let paths = account.paths();
163 for file in outcome.external_files.drain(..) {
166 let file_path = paths.into_file_path(&file);
167 if !vfs::try_exists(file_path).await? {
168 tracing::debug!(
169 file = ?file,
170 "add file download to transfers",
171 );
172
173 if self.file_transfer_queue().receiver_count() > 0
174 {
175 let _ =
176 self.file_transfer_queue().send(vec![
177 FileOperation(
178 file,
179 TransferOperation::Download,
180 ),
181 ]);
182 }
183 }
184 }
185 }
186
187 } else {
189 if !maybe_conflict.identity {
192 if let Some(MaybeDiff::Diff(diff)) =
193 remote_changes.diff.identity
194 {
195 account.merge_identity(diff, &mut outcome).await?;
196 }
197 }
198 if !maybe_conflict.account {
199 if let Some(MaybeDiff::Diff(diff)) =
200 remote_changes.diff.account
201 {
202 account.merge_account(diff, &mut outcome).await?;
203 }
204 }
205 if !maybe_conflict.device {
206 if let Some(MaybeDiff::Diff(diff)) =
207 remote_changes.diff.device
208 {
209 account.merge_device(diff, &mut outcome).await?;
210 }
211 }
212 #[cfg(feature = "files")]
213 if !maybe_conflict.files {
214 if let Some(MaybeDiff::Diff(diff)) =
215 remote_changes.diff.files
216 {
217 account.merge_files(diff, &mut outcome).await?;
218 }
219 }
220
221 let merge_folders = remote_changes
222 .diff
223 .folders
224 .into_iter()
225 .filter(|(k, _)| maybe_conflict.folders.get(k).is_none())
226 .collect::<HashMap<_, _>>();
227 for (id, maybe_diff) in merge_folders {
228 if let MaybeDiff::Diff(diff) = maybe_diff {
229 account.merge_folder(&id, diff, &mut outcome).await?;
230 }
231 }
232 return Err(ConflictError::Soft {
233 conflict: maybe_conflict,
234 local: packet.status,
235 remote: remote_changes.status,
236 }
237 .into());
238 }
239 }
240
241 Ok(outcome)
242 }
243}