sos_remote_sync/
remote.rs

1//! Handler that can synchronize account data between a
2//! remote data source and local account.
3use async_trait::async_trait;
4use sos_account::Account;
5use sos_core::{AccountId, Origin};
6use sos_protocol::{AsConflict, ConflictError, SyncClient};
7use sos_sync::{
8    MaybeDiff, Merge, MergeOutcome, StorageEventLogs, SyncDirection,
9    SyncPacket, SyncStatus, SyncStorage,
10};
11use sos_vfs as vfs;
12use std::{collections::HashMap, sync::Arc};
13use tokio::sync::Mutex;
14
15#[cfg(feature = "files")]
16use sos_protocol::transfer::{
17    FileOperation, FileTransferQueueSender, TransferOperation,
18};
19
20/// Trait for types that bridge between a remote data source
21/// and a local account.
22#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
23#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
24pub trait RemoteSyncHandler {
25    /// Client used to fetch data from the data source.
26    type Client: SyncClient + Send + Sync + 'static;
27
28    /// Local account.
29    type Account: Account + SyncStorage;
30
31    /// Error implementation.
32    type Error: std::error::Error
33        + std::fmt::Debug
34        + AsConflict
35        + From<ConflictError>
36        + From<crate::Error>
37        + From<sos_core::Error>
38        + From<sos_backend::StorageError>
39        + From<sos_account::Error>
40        + From<sos_backend::Error>
41        + From<std::io::Error>
42        + From<<Self::Account as Account>::Error>
43        + From<<Self::Account as StorageEventLogs>::Error>
44        + From<<Self::Client as SyncClient>::Error>
45        + Send
46        + Sync
47        + 'static;
48
49    /// Client implementation.
50    fn client(&self) -> &Self::Client;
51
52    /// Remote origin.
53    fn origin(&self) -> &Origin;
54
55    /// Account identifier.
56    fn account_id(&self) -> &AccountId;
57
58    /// Local account.
59    fn account(&self) -> Arc<Mutex<Self::Account>>;
60
61    /// Direction for account creation and auto merge.
62    fn direction(&self) -> SyncDirection;
63
64    /// Queue for file transfers.
65    #[cfg(feature = "files")]
66    fn file_transfer_queue(&self) -> &FileTransferQueueSender;
67
68    /// Sync file transfers.
69    #[cfg(feature = "files")]
70    async fn execute_sync_file_transfers(&self) -> Result<(), Self::Error>;
71
72    /// Push an account to the remote.
73    #[doc(hidden)]
74    async fn create_push_account(&self) -> Result<(), Self::Error> {
75        {
76            let account = self.account();
77            let account = account.lock().await;
78            let public_account = account.create_set().await?;
79            self.client().create_account(public_account).await?;
80        }
81
82        #[cfg(feature = "files")]
83        self.execute_sync_file_transfers().await?;
84
85        Ok(())
86    }
87
88    /// Pull an account from the remote.
89    #[doc(hidden)]
90    async fn create_pull_account(&self) -> Result<(), Self::Error> {
91        tracing::info!("create_pull_account");
92
93        // Get account data from the remote.
94        let public_account = self.client().fetch_account().await?;
95
96        tracing::info!("create_pull_account::fetch_completed");
97
98        {
99            let account = self.account();
100            let mut account = account.lock().await;
101            account.import_account_events(public_account).await?;
102        }
103
104        /*
105        #[cfg(feature = "files")]
106        self.execute_sync_file_transfers().await?;
107        */
108
109        Ok(())
110    }
111
112    /// Create an account on local or remote depending
113    /// on the sync direction.
114    async fn create_account(&self) -> Result<(), Self::Error> {
115        match self.direction() {
116            SyncDirection::Push => self.create_push_account().await,
117            SyncDirection::Pull => self.create_pull_account().await,
118        }
119    }
120
121    /// Sync the account.
122    async fn sync_account(
123        &self,
124        remote_status: SyncStatus,
125    ) -> Result<MergeOutcome, Self::Error> {
126        let account = self.account();
127        let mut account = account.lock().await;
128
129        tracing::debug!("merge_client");
130
131        let (needs_sync, local_status, local_changes) =
132            sos_protocol::diff::<_, Self::Error>(&*account, remote_status)
133                .await?;
134
135        tracing::debug!(needs_sync = %needs_sync, "merge_client");
136
137        let mut outcome = MergeOutcome::default();
138
139        if needs_sync {
140            let packet = SyncPacket {
141                status: local_status,
142                diff: local_changes,
143                compare: None,
144            };
145            let remote_changes = self.client().sync(packet.clone()).await?;
146
147            let maybe_conflict = remote_changes
148                .compare
149                .as_ref()
150                .map(|c| c.maybe_conflict())
151                .unwrap_or_default();
152            let has_conflicts = maybe_conflict.has_conflicts();
153
154            if !has_conflicts {
155                account.merge(remote_changes.diff, &mut outcome).await?;
156
157                // Compute which external files need to be downloaded
158                // and add to the transfers queue
159                #[cfg(feature = "files")]
160                if !outcome.external_files.is_empty() {
161                    use sos_account::Account;
162                    let paths = account.paths();
163                    // let mut writer = self.transfers.write().await;
164
165                    for file in outcome.external_files.drain(..) {
166                        let file_path = paths.into_file_path(&file);
167                        if !vfs::try_exists(file_path).await? {
168                            tracing::debug!(
169                                file = ?file,
170                                "add file download to transfers",
171                            );
172
173                            if self.file_transfer_queue().receiver_count() > 0
174                            {
175                                let _ =
176                                    self.file_transfer_queue().send(vec![
177                                        FileOperation(
178                                            file,
179                                            TransferOperation::Download,
180                                        ),
181                                    ]);
182                            }
183                        }
184                    }
185                }
186
187                // self.compare(&mut *account, remote_changes).await?;
188            } else {
189                // Some parts of the remote patch may not
190                // be in conflict and must still be merged
191                if !maybe_conflict.identity {
192                    if let Some(MaybeDiff::Diff(diff)) =
193                        remote_changes.diff.identity
194                    {
195                        account.merge_identity(diff, &mut outcome).await?;
196                    }
197                }
198                if !maybe_conflict.account {
199                    if let Some(MaybeDiff::Diff(diff)) =
200                        remote_changes.diff.account
201                    {
202                        account.merge_account(diff, &mut outcome).await?;
203                    }
204                }
205                if !maybe_conflict.device {
206                    if let Some(MaybeDiff::Diff(diff)) =
207                        remote_changes.diff.device
208                    {
209                        account.merge_device(diff, &mut outcome).await?;
210                    }
211                }
212                #[cfg(feature = "files")]
213                if !maybe_conflict.files {
214                    if let Some(MaybeDiff::Diff(diff)) =
215                        remote_changes.diff.files
216                    {
217                        account.merge_files(diff, &mut outcome).await?;
218                    }
219                }
220
221                let merge_folders = remote_changes
222                    .diff
223                    .folders
224                    .into_iter()
225                    .filter(|(k, _)| maybe_conflict.folders.get(k).is_none())
226                    .collect::<HashMap<_, _>>();
227                for (id, maybe_diff) in merge_folders {
228                    if let MaybeDiff::Diff(diff) = maybe_diff {
229                        account.merge_folder(&id, diff, &mut outcome).await?;
230                    }
231                }
232                return Err(ConflictError::Soft {
233                    conflict: maybe_conflict,
234                    local: packet.status,
235                    remote: remote_changes.status,
236                }
237                .into());
238            }
239        }
240
241        Ok(outcome)
242    }
243}