sos_protocol/
diff.rs

1//! Types and functions to compute diffs.
2use indexmap::IndexMap;
3use sos_core::{
4    commit::Comparison,
5    events::{patch::FolderDiff, EventLog},
6    VaultId,
7};
8use sos_sync::{
9    MaybeDiff, StorageEventLogs, SyncDiff, SyncStatus, SyncStorage,
10};
11
12#[cfg(feature = "files")]
13use sos_core::events::patch::FileDiff;
14
15/// Comparison between local and remote status.
16#[derive(Debug)]
17pub struct SyncComparison {
18    /// Local sync status.
19    pub local_status: SyncStatus,
20    /// Remote sync status.
21    pub remote_status: SyncStatus,
22    /// Comparison of the identity event log.
23    pub identity: Comparison,
24    /// Comparison of the account event log.
25    pub account: Comparison,
26    /// Comparison of the device event log.
27    pub device: Comparison,
28    /// Comparison of the files event log.
29    #[cfg(feature = "files")]
30    pub files: Option<Comparison>,
31    /// Comparison for each folder in the account.
32    pub folders: IndexMap<VaultId, Comparison>,
33}
34
35impl SyncComparison {
36    /// Create a new sync comparison.
37    pub async fn new<S, E>(
38        storage: &S,
39        remote_status: SyncStatus,
40    ) -> std::result::Result<SyncComparison, E>
41    where
42        S: SyncStorage,
43        E: From<<S as StorageEventLogs>::Error> + From<sos_core::Error>,
44    {
45        let local_status = storage.sync_status().await?;
46
47        let identity = {
48            let identity = storage.identity_log().await?;
49            let reader = identity.read().await;
50            reader.tree().compare(&remote_status.identity.1)?
51        };
52
53        let account = {
54            let account = storage.account_log().await?;
55            let reader = account.read().await;
56            reader.tree().compare(&remote_status.account.1)?
57        };
58
59        let device = {
60            let device = storage.device_log().await?;
61            let reader = device.read().await;
62            reader.tree().compare(&remote_status.device.1)?
63        };
64
65        #[cfg(feature = "files")]
66        let files = {
67            let files = storage.file_log().await?;
68            let reader = files.read().await;
69            if let Some(files) = &remote_status.files {
70                if reader.tree().is_empty() {
71                    None
72                } else {
73                    Some(reader.tree().compare(&files.1)?)
74                }
75            } else if reader.tree().is_empty() {
76                None
77            } else {
78                Some(Comparison::Unknown)
79            }
80        };
81
82        let folders = {
83            let mut folders = IndexMap::new();
84            for (id, folder) in &remote_status.folders {
85                // Folder may exist on remote but not locally
86                // if we have just deleted a folder
87                if let Ok(event_log) = storage.folder_log(id).await {
88                    let event_log = event_log.read().await;
89                    folders.insert(*id, event_log.tree().compare(&folder.1)?);
90                }
91            }
92
93            folders
94        };
95
96        Ok(SyncComparison {
97            local_status,
98            remote_status,
99            identity,
100            account,
101            device,
102            #[cfg(feature = "files")]
103            files,
104            folders,
105        })
106    }
107
108    /// Determine if synchronization is required.
109    pub fn needs_sync(&self) -> bool {
110        self.local_status != self.remote_status
111    }
112
113    /// Build a diff from this comparison.
114    ///
115    /// The diff includes changes on local that are not yet
116    /// present on the remote or information that will allow
117    /// a comparison on the remote.
118    pub async fn diff<S, E>(
119        &self,
120        storage: &S,
121    ) -> std::result::Result<SyncDiff, E>
122    where
123        S: SyncStorage,
124        E: std::error::Error
125            + std::fmt::Debug
126            + From<<S as StorageEventLogs>::Error>
127            + From<sos_backend::Error>
128            + From<sos_backend::StorageError>
129            + From<sos_core::Error>,
130    {
131        let mut diff: SyncDiff = Default::default();
132
133        match self.identity {
134            Comparison::Equal => {}
135            Comparison::Contains(_) => {
136                // Need to push changes to remote
137                let log = storage.identity_log().await?;
138                let reader = log.read().await;
139                let is_last_commit = Some(&self.remote_status.identity.0)
140                    == reader.tree().last_commit().as_ref();
141
142                // Avoid empty patches when commit is already the last
143                if !is_last_commit {
144                    let identity = reader
145                        .diff_checked(
146                            Some(self.remote_status.identity.0),
147                            self.remote_status.identity.1.clone(),
148                        )
149                        .await?;
150                    diff.identity = Some(MaybeDiff::Diff(identity));
151                }
152            }
153            Comparison::Unknown => {
154                tracing::info!(
155                    local = ?self.local_status.identity,
156                    remote = ?self.remote_status.identity,
157                    "identity folder divergence"
158                );
159
160                diff.identity = Some(MaybeDiff::Compare(Some(
161                    self.local_status.identity.clone(),
162                )));
163            }
164        }
165
166        match self.account {
167            Comparison::Equal => {}
168            Comparison::Contains(_) => {
169                // Need to push changes to remote
170                let log = storage.account_log().await?;
171                let reader = log.read().await;
172
173                let is_last_commit = Some(&self.remote_status.account.0)
174                    == reader.tree().last_commit().as_ref();
175
176                // Avoid empty patches when commit is already the last
177                if !is_last_commit {
178                    let account = reader
179                        .diff_checked(
180                            Some(self.remote_status.account.0),
181                            self.remote_status.account.1.clone(),
182                        )
183                        .await?;
184                    diff.account = Some(MaybeDiff::Diff(account));
185                }
186            }
187            Comparison::Unknown => {
188                tracing::info!(
189                    local = ?self.local_status.account,
190                    remote = ?self.remote_status.account,
191                    "account events divergence"
192                );
193
194                diff.account = Some(MaybeDiff::Compare(Some(
195                    self.local_status.account.clone(),
196                )));
197            }
198        }
199
200        match self.device {
201            Comparison::Equal => {}
202            Comparison::Contains(_) => {
203                // Need to push changes to remote
204                let log = storage.device_log().await?;
205                let reader = log.read().await;
206
207                let is_last_commit = Some(&self.remote_status.device.0)
208                    == reader.tree().last_commit().as_ref();
209
210                // Avoid empty patches when commit is already the last
211                if !is_last_commit {
212                    let device = reader
213                        .diff_checked(
214                            Some(self.remote_status.device.0),
215                            self.remote_status.device.1.clone(),
216                        )
217                        .await?;
218                    diff.device = Some(MaybeDiff::Diff(device));
219                }
220            }
221            Comparison::Unknown => {
222                tracing::info!(
223                    local = ?self.local_status.device,
224                    remote = ?self.remote_status.device,
225                    "device events divergence"
226                );
227
228                // NOTE: this will break the device revoke test spec!
229                /*
230                diff.device = Some(MaybeDiff::Compare(Some(
231                    self.local_status.device.clone(),
232                )));
233                */
234            }
235        }
236
237        #[cfg(feature = "files")]
238        match (&self.files, &self.remote_status.files) {
239            (Some(files), Some(remote_files)) => {
240                match files {
241                    Comparison::Equal => {}
242                    Comparison::Contains(_) => {
243                        // Need to push changes to remote
244                        let log = storage.file_log().await?;
245                        let reader = log.read().await;
246
247                        let is_last_commit = Some(&remote_files.0)
248                            == reader.tree().last_commit().as_ref();
249
250                        // Avoid empty patches when commit is already the last
251                        if !is_last_commit {
252                            let files = reader
253                                .diff_checked(
254                                    Some(remote_files.0),
255                                    remote_files.1.clone(),
256                                )
257                                .await?;
258
259                            diff.files = Some(MaybeDiff::Diff(files));
260                        }
261                    }
262                    Comparison::Unknown => {
263                        tracing::info!(
264                            local = ?files,
265                            remote = ?remote_files,
266                            "file events divergence"
267                        );
268
269                        diff.files = Some(MaybeDiff::Compare(
270                            self.local_status.files.clone(),
271                        ));
272                    }
273                }
274            }
275            // Remote does not have any files yet so we need
276            // to send the entire file event log
277            (Some(Comparison::Unknown), None) => {
278                // Need to push changes to remote
279                let log = storage.file_log().await?;
280                let reader = log.read().await;
281                if !reader.tree().is_empty() {
282                    let files = FileDiff {
283                        last_commit: None,
284                        patch: reader.diff_events(None).await?,
285                        checkpoint: Default::default(),
286                    };
287                    diff.files = Some(MaybeDiff::Diff(files));
288                }
289            }
290            _ => {}
291        }
292
293        for (id, folder) in &self.folders {
294            let commit_state = self
295                .remote_status
296                .folders
297                .get(id)
298                .ok_or(sos_backend::StorageError::FolderNotFound(*id))?;
299
300            match folder {
301                Comparison::Equal => {}
302                Comparison::Contains(_) => {
303                    // Need to push changes to remote
304                    let log = storage.folder_log(id).await?;
305
306                    let log = log.read().await;
307                    let folder = log
308                        .diff_checked(
309                            Some(commit_state.0),
310                            commit_state.1.clone(),
311                        )
312                        .await?;
313
314                    if !folder.patch.is_empty() {
315                        diff.folders.insert(*id, MaybeDiff::Diff(folder));
316                    }
317                }
318                Comparison::Unknown => {
319                    tracing::info!(
320                        id = %id,
321                        local = ?self.local_status.folders.get(id),
322                        remote = ?commit_state,
323                        "folder events divergence"
324                    );
325
326                    diff.folders.insert(
327                        *id,
328                        MaybeDiff::Compare(
329                            self.local_status.folders.get(id).cloned(),
330                        ),
331                    );
332                }
333            }
334        }
335
336        // Handle events for new folders on local that
337        // don't exist on remote yet
338        for (id, _) in &self.local_status.folders {
339            if self.remote_status.folders.get(id).is_none() {
340                let log = storage.folder_log(id).await?;
341                let log = log.read().await;
342                let first_commit = log.tree().first_commit()?;
343
344                let folder = FolderDiff {
345                    last_commit: Some(first_commit.0),
346                    patch: log.diff_events(Some(&first_commit.0)).await?,
347                    checkpoint: first_commit.1,
348                };
349
350                if !folder.patch.is_empty() {
351                    diff.folders.insert(*id, MaybeDiff::Diff(folder));
352                }
353            }
354        }
355
356        Ok(diff)
357    }
358}
359
360/// Difference between a local sync status and a remote
361/// sync status.
362pub async fn diff<S, E>(
363    storage: &S,
364    remote_status: SyncStatus,
365) -> std::result::Result<(bool, SyncStatus, SyncDiff), E>
366where
367    S: SyncStorage,
368    E: std::error::Error
369        + std::fmt::Debug
370        + From<<S as StorageEventLogs>::Error>
371        + From<sos_core::Error>
372        + From<sos_backend::Error>
373        + From<sos_backend::StorageError>
374        + Send
375        + Sync
376        + 'static,
377{
378    let comparison = {
379        // Compare local status to the remote
380        SyncComparison::new::<_, E>(storage, remote_status).await?
381    };
382
383    let needs_sync = comparison.needs_sync();
384    let mut diff = comparison.diff::<_, E>(storage).await?;
385
386    let is_server = !storage.is_client_storage();
387    if is_server {
388        let storage_folders = storage.folder_details().await?;
389        diff.folders.retain(|k, _| {
390            if let Some(folder) = storage_folders.iter().find(|s| s.id() == k)
391            {
392                !folder.flags().is_sync_disabled()
393            } else {
394                true
395            }
396        });
397    }
398
399    Ok((needs_sync, comparison.local_status, diff))
400}