lb_rs/service/
sync.rs

1use crate::io::network::ApiError;
2use crate::model::access_info::UserAccessMode;
3use crate::model::api::{
4    ChangeDocRequest, GetDocRequest, GetFileIdsRequest, GetUpdatesRequest, GetUpdatesResponse,
5    GetUsernameError, GetUsernameRequest, UpsertRequest,
6};
7use crate::model::errors::{LbErrKind, LbResult};
8use crate::model::file::ShareMode;
9use crate::model::file_like::FileLike;
10use crate::model::file_metadata::{DocumentHmac, FileDiff, FileType, Owner};
11use crate::model::filename::{DocumentType, NameComponents};
12use crate::model::signed_file::SignedFile;
13use crate::model::staged::StagedTreeLikeMut;
14use crate::model::svg::buffer::u_transform_to_bezier;
15use crate::model::svg::element::Element;
16use crate::model::text::buffer::Buffer;
17use crate::model::tree_like::TreeLike;
18use crate::model::work_unit::WorkUnit;
19use crate::model::{clock, svg};
20use crate::model::{symkey, ValidationFailure};
21use crate::Lb;
22pub use basic_human_duration::ChronoHumanDuration;
23use futures::stream;
24use futures::StreamExt;
25use serde::Serialize;
26use std::collections::{hash_map, HashMap, HashSet};
27use std::fmt::{Display, Formatter};
28use std::num::NonZeroUsize;
29use std::sync::atomic::{AtomicBool, Ordering};
30use std::sync::Arc;
31use std::thread;
32use std::time::Instant;
33use time::Duration;
34use uuid::Uuid;
35
/// A reference-counted, atomically updated boolean flag that can be shared between owners.
pub type SyncFlag = Arc<AtomicBool>;
37
38pub struct SyncContext {
39    progress: Option<Box<dyn Fn(SyncProgress) + Send>>,
40    current: usize,
41    total: usize,
42
43    pk_cache: HashMap<Owner, String>,
44    last_synced: u64,
45    remote_changes: Vec<SignedFile>,
46    update_as_of: u64,
47    root: Option<Uuid>,
48    pushed_metas: Vec<FileDiff<SignedFile>>,
49    pushed_docs: Vec<FileDiff<SignedFile>>,
50    pulled_docs: Vec<Uuid>,
51}
52
53impl Lb {
54    #[instrument(level = "debug", skip_all, err(Debug))]
55    pub async fn calculate_work(&self) -> LbResult<SyncStatus> {
56        let tx = self.ro_tx().await;
57        let db = tx.db();
58        let last_synced = db.last_synced.get().copied().unwrap_or_default() as u64;
59        drop(tx);
60
61        let remote_changes = self
62            .client
63            .request(self.get_account()?, GetUpdatesRequest { since_metadata_version: last_synced })
64            .await?;
65        let (deduped, latest_server_ts, _) = self.dedup(remote_changes).await?;
66        let remote_dirty = deduped
67            .into_iter()
68            .map(|f| *f.id())
69            .map(WorkUnit::ServerChange);
70
71        self.prune().await?;
72
73        let tx = self.ro_tx().await;
74        let db = tx.db();
75
76        let locally_dirty = db
77            .local_metadata
78            .get()
79            .keys()
80            .copied()
81            .map(WorkUnit::LocalChange);
82
83        let mut work_units: Vec<WorkUnit> = Vec::new();
84        work_units.extend(locally_dirty.chain(remote_dirty));
85        Ok(SyncStatus { work_units, latest_server_ts })
86    }
87
88    #[instrument(level = "debug", skip_all, err(Debug))]
89    pub async fn sync(&self, f: Option<Box<dyn Fn(SyncProgress) + Send>>) -> LbResult<SyncStatus> {
90        let old = self.syncing.swap(true, Ordering::SeqCst);
91        if old {
92            return Err(LbErrKind::AlreadySyncing.into());
93        }
94
95        let mut ctx = self.setup_sync(f).await?;
96
97        let mut got_updates = false;
98        let mut pipeline: LbResult<()> = async {
99            ctx.msg("Preparing Sync..."); // todo remove
100            self.events.sync(SyncIncrement::SyncStarted);
101            self.prune().await?;
102            got_updates = self.fetch_meta(&mut ctx).await?;
103            self.populate_pk_cache(&mut ctx).await?;
104            self.docs.dont_delete.store(true, Ordering::SeqCst);
105            self.fetch_docs(&mut ctx).await?;
106            self.merge(&mut ctx).await?;
107            self.push_meta(&mut ctx).await?;
108            self.push_docs(&mut ctx).await?;
109            Ok(())
110        }
111        .await;
112
113        self.docs.dont_delete.store(false, Ordering::SeqCst);
114
115        if pipeline.is_ok() {
116            pipeline = self.commit_last_synced(&mut ctx).await;
117        }
118
119        let cleanup = self.cleanup().await;
120
121        let ekind = pipeline.as_ref().err().map(|err| err.kind.clone());
122        self.events.sync(SyncIncrement::SyncFinished(ekind));
123
124        self.syncing.store(false, Ordering::Relaxed);
125        pipeline?;
126        cleanup?;
127
128        // done not being sent if pipeline is an error is likely the reason we get stuck offline
129        ctx.done_msg();
130
131        if got_updates {
132            // did it?
133            self.events.meta_changed();
134            for id in &ctx.pulled_docs {
135                self.events.doc_written(*id);
136            }
137        }
138
139        Ok(ctx.summarize())
140    }
141
142    async fn setup_sync(
143        &self, progress: Option<Box<dyn Fn(SyncProgress) + Send>>,
144    ) -> LbResult<SyncContext> {
145        let tx = self.ro_tx().await;
146        let db = tx.db();
147
148        let last_synced = db.last_synced.get().copied().unwrap_or_default() as u64;
149        let pk_cache = db.pub_key_lookup.get().clone();
150
151        let current = 0;
152        let total = 7;
153
154        Ok(SyncContext {
155            last_synced,
156            pk_cache,
157
158            progress,
159            current,
160            total,
161
162            root: Default::default(),
163            update_as_of: Default::default(),
164            remote_changes: Default::default(),
165            pushed_docs: Default::default(),
166            pushed_metas: Default::default(),
167            pulled_docs: Default::default(),
168        })
169    }
170
171    async fn prune(&self) -> LbResult<()> {
172        let server_ids = self
173            .client
174            .request(self.get_account()?, GetFileIdsRequest {})
175            .await?
176            .ids;
177
178        let mut tx = self.begin_tx().await;
179        let db = tx.db();
180
181        let mut local = db.base_metadata.stage(&db.local_metadata).to_lazy();
182        let base_ids = local.tree.base.ids();
183
184        let mut prunable_ids = base_ids;
185        prunable_ids.retain(|id| !server_ids.contains(id));
186        for id in prunable_ids.clone() {
187            prunable_ids.extend(local.descendants(&id)?.into_iter());
188        }
189        for id in &prunable_ids {
190            if let Some(base_file) = local.tree.base.maybe_find(id) {
191                self.docs
192                    .delete(*id, base_file.document_hmac().copied())
193                    .await?;
194            }
195            if let Some(local_file) = local.maybe_find(id) {
196                self.docs
197                    .delete(*id, local_file.document_hmac().copied())
198                    .await?;
199            }
200        }
201
202        let mut base_staged = (&mut db.base_metadata).to_lazy().stage(None);
203        base_staged.tree.removed = prunable_ids.clone().into_iter().collect();
204        base_staged.promote()?;
205
206        let mut local_staged = (&mut db.local_metadata).to_lazy().stage(None);
207        local_staged.tree.removed = prunable_ids.into_iter().collect();
208        local_staged.promote()?;
209
210        Ok(())
211    }
212
213    /// Returns true if there were any updates
214    async fn fetch_meta(&self, ctx: &mut SyncContext) -> LbResult<bool> {
215        ctx.msg("Fetching tree updates...");
216        let updates = self
217            .client
218            .request(
219                self.get_account()?,
220                GetUpdatesRequest { since_metadata_version: ctx.last_synced },
221            )
222            .await?;
223
224        let empty = updates.file_metadata.is_empty();
225        let (remote, as_of, root) = self.dedup(updates).await?;
226
227        ctx.remote_changes = remote;
228        ctx.update_as_of = as_of;
229        ctx.root = root;
230
231        Ok(!empty)
232    }
233
234    async fn populate_pk_cache(&self, ctx: &mut SyncContext) -> LbResult<()> {
235        ctx.msg("Updating public key cache...");
236        let mut all_owners = HashSet::new();
237        for file in &ctx.remote_changes {
238            for user_access_key in file.user_access_keys() {
239                all_owners.insert(Owner(user_access_key.encrypted_by));
240                all_owners.insert(Owner(user_access_key.encrypted_for));
241            }
242        }
243
244        let mut new_entries = HashMap::new();
245
246        for owner in all_owners {
247            if let hash_map::Entry::Vacant(e) = ctx.pk_cache.entry(owner) {
248                let username_result = self
249                    .client
250                    .request(self.get_account()?, GetUsernameRequest { key: owner.0 })
251                    .await;
252                let username = match username_result {
253                    Err(ApiError::Endpoint(GetUsernameError::UserNotFound)) => {
254                        "<unknown>".to_string()
255                    }
256                    _ => username_result?.username.clone(),
257                };
258                new_entries.insert(owner, username.clone());
259                e.insert(username.clone());
260            }
261        }
262
263        let mut tx = self.begin_tx().await;
264        let db = tx.db();
265
266        for (owner, username) in new_entries {
267            db.pub_key_lookup.insert(owner, username)?;
268        }
269        Ok(())
270    }
271
272    async fn fetch_docs(&self, ctx: &mut SyncContext) -> LbResult<()> {
273        ctx.msg("Fetching documents...");
274        let mut docs_to_pull = vec![];
275
276        let tx = self.ro_tx().await;
277        let db = tx.db();
278        let start = Instant::now();
279
280        let mut remote = db.base_metadata.stage(ctx.remote_changes.clone()).to_lazy(); // this used to be owned remote changes
281        for id in remote.tree.staged.ids() {
282            if remote.calculate_deleted(&id)? {
283                continue;
284            }
285            let remote_hmac = remote.find(&id)?.document_hmac().cloned();
286            let base_hmac = remote
287                .tree
288                .base
289                .maybe_find(&id)
290                .and_then(|f| f.document_hmac())
291                .cloned();
292            if base_hmac == remote_hmac {
293                continue;
294            }
295
296            if let Some(remote_hmac) = remote_hmac {
297                docs_to_pull.push((id, remote_hmac));
298                self.events.sync(SyncIncrement::PullingDocument(id, true));
299            }
300        }
301
302        drop(tx);
303        if start.elapsed() > std::time::Duration::from_millis(100) {
304            warn!("sync fetch_docs held lock for {:?}", start.elapsed());
305        }
306
307        let num_docs = docs_to_pull.len();
308        ctx.total += num_docs;
309
310        let futures = docs_to_pull
311            .into_iter()
312            .map(|(id, hmac)| self.fetch_doc(id, hmac));
313
314        let mut stream = stream::iter(futures).buffer_unordered(
315            thread::available_parallelism()
316                .unwrap_or(NonZeroUsize::new(4).unwrap())
317                .into(),
318        );
319
320        let mut idx = 0;
321        while let Some(fut) = stream.next().await {
322            let id = fut?;
323            ctx.pulled_docs.push(id);
324            self.events.sync(SyncIncrement::PullingDocument(id, false));
325            ctx.file_msg(id, &format!("Downloaded file {idx} of {num_docs}."));
326            idx += 1;
327        }
328        Ok(())
329    }
330
331    async fn fetch_doc(&self, id: Uuid, hmac: DocumentHmac) -> LbResult<Uuid> {
332        let remote_document = self
333            .client
334            .request(self.get_account()?, GetDocRequest { id, hmac })
335            .await?;
336        self.docs
337            .insert(id, Some(hmac), &remote_document.content)
338            .await?;
339
340        Ok(id)
341    }
342
343    /// Pulls remote changes and constructs a changeset Merge such that Stage<Stage<Stage<Base, Remote>, Local>, Merge> is valid.
344    /// Promotes Base to Stage<Base, Remote> and Local to Stage<Local, Merge>
345    async fn merge(&self, ctx: &mut SyncContext) -> LbResult<()> {
346        let mut tx = self.begin_tx().await;
347        let db = tx.db();
348        let start = Instant::now();
349
350        let remote_changes = &ctx.remote_changes;
351
352        // fetch document updates and local documents for merge
353        let me = Owner(self.keychain.get_pk()?);
354
355        // compute merge changes
356        let merge_changes = {
357            // assemble trees
358            let mut base = (&db.base_metadata).to_lazy();
359            let remote_unlazy = (&db.base_metadata).to_staged(remote_changes);
360            let mut remote = remote_unlazy.as_lazy();
361            let mut local = (&db.base_metadata).to_staged(&db.local_metadata).to_lazy();
362
363            // changeset constraints - these evolve as we try to assemble changes and encounter validation failures
364            let mut files_to_unmove: HashSet<Uuid> = HashSet::new();
365            let mut files_to_unshare: HashSet<Uuid> = HashSet::new();
366            let mut links_to_delete: HashSet<Uuid> = HashSet::new();
367            let mut rename_increments: HashMap<Uuid, usize> = HashMap::new();
368            let mut duplicate_file_ids: HashMap<Uuid, Uuid> = HashMap::new();
369
370            'merge_construction: loop {
371                // process just the edits which allow us to check deletions in the result
372                let mut deletions = {
373                    let mut deletions = remote_unlazy.stage(Vec::new()).to_lazy();
374
375                    // creations
376                    let mut deletion_creations = HashSet::new();
377                    for id in db.local_metadata.ids() {
378                        if remote.maybe_find(&id).is_none() && !links_to_delete.contains(&id) {
379                            deletion_creations.insert(id);
380                        }
381                    }
382                    'drain_creations: while !deletion_creations.is_empty() {
383                        'choose_a_creation: for id in &deletion_creations {
384                            // create
385                            let id = *id;
386                            let local_file = local.find(&id)?.clone();
387                            let result = deletions.create_unvalidated(
388                                id,
389                                symkey::generate_key(),
390                                local_file.parent(),
391                                &local.name(&id, &self.keychain)?,
392                                local_file.file_type(),
393                                &self.keychain,
394                            );
395                            match result {
396                                Ok(_) => {
397                                    deletion_creations.remove(&id);
398                                    continue 'drain_creations;
399                                }
400                                Err(ref err) => match err.kind {
401                                    LbErrKind::FileParentNonexistent => {
402                                        continue 'choose_a_creation;
403                                    }
404                                    _ => {
405                                        result?;
406                                    }
407                                },
408                            }
409                        }
410                        return Err(LbErrKind::Unexpected(format!(
411                            "sync failed to find a topomodelal order for file creations: {:?}",
412                            deletion_creations
413                        ))
414                        .into());
415                    }
416
417                    // moves (creations happen first in case a file is moved into a new folder)
418                    for id in db.local_metadata.ids() {
419                        let local_file = local.find(&id)?.clone();
420                        if let Some(base_file) = db.base_metadata.maybe_find(&id).cloned() {
421                            if !local_file.explicitly_deleted()
422                                && local_file.parent() != base_file.parent()
423                                && !files_to_unmove.contains(&id)
424                            {
425                                // move
426                                deletions.move_unvalidated(
427                                    &id,
428                                    local_file.parent(),
429                                    &self.keychain,
430                                )?;
431                            }
432                        }
433                    }
434
435                    // deletions (moves happen first in case a file is moved into a deleted folder)
436                    for id in db.local_metadata.ids() {
437                        let local_file = local.find(&id)?.clone();
438                        if local_file.explicitly_deleted() {
439                            // delete
440                            deletions.delete_unvalidated(&id, &self.keychain)?;
441                        }
442                    }
443                    deletions
444                };
445
446                // process all edits, dropping non-deletion edits for files that will be implicitly deleted
447                let mut merge = {
448                    let mut merge = remote_unlazy.stage(Vec::new()).to_lazy();
449
450                    // creations and edits of created documents
451                    let mut creations = HashSet::new();
452                    for id in db.local_metadata.ids() {
453                        if deletions.maybe_find(&id).is_some()
454                            && !deletions.calculate_deleted(&id)?
455                            && remote.maybe_find(&id).is_none()
456                            && !links_to_delete.contains(&id)
457                        {
458                            creations.insert(id);
459                        }
460                    }
461                    'drain_creations: while !creations.is_empty() {
462                        'choose_a_creation: for id in &creations {
463                            // create
464                            let id = *id;
465                            let local_file = local.find(&id)?.clone();
466                            let result = merge.create_unvalidated(
467                                id,
468                                local.decrypt_key(&id, &self.keychain)?,
469                                local_file.parent(),
470                                &local.name(&id, &self.keychain)?,
471                                local_file.file_type(),
472                                &self.keychain,
473                            );
474                            match result {
475                                Ok(_) => {
476                                    creations.remove(&id);
477                                    continue 'drain_creations;
478                                }
479                                Err(ref err) => match err.kind {
480                                    LbErrKind::FileParentNonexistent => {
481                                        continue 'choose_a_creation;
482                                    }
483                                    _ => {
484                                        result?;
485                                    }
486                                },
487                            }
488                        }
489                        return Err(LbErrKind::Unexpected(format!(
490                            "sync failed to find a topomodelal order for file creations: {:?}",
491                            creations
492                        ))
493                        .into());
494                    }
495
496                    // moves, renames, edits, and shares
497                    // creations happen first in case a file is moved into a new folder
498                    for id in db.local_metadata.ids() {
499                        // skip files that are already deleted or will be deleted
500                        if deletions.maybe_find(&id).is_none()
501                            || deletions.calculate_deleted(&id)?
502                            || (remote.maybe_find(&id).is_some()
503                                && remote.calculate_deleted(&id)?)
504                        {
505                            continue;
506                        }
507
508                        let local_file = local.find(&id)?.clone();
509                        let local_name = local.name(&id, &self.keychain)?;
510                        let maybe_base_file = base.maybe_find(&id).cloned();
511                        let maybe_remote_file = remote.maybe_find(&id).cloned();
512                        if let Some(ref base_file) = maybe_base_file {
513                            let base_name = base.name(&id, &self.keychain)?;
514                            let remote_file = remote.find(&id)?.clone();
515                            let remote_name = remote.name(&id, &self.keychain)?;
516
517                            // move
518                            if local_file.parent() != base_file.parent()
519                                && remote_file.parent() == base_file.parent()
520                                && !files_to_unmove.contains(&id)
521                            {
522                                merge.move_unvalidated(&id, local_file.parent(), &self.keychain)?;
523                            }
524
525                            // rename
526                            if local_name != base_name && remote_name == base_name {
527                                merge.rename_unvalidated(&id, &local_name, &self.keychain)?;
528                            }
529                        }
530
531                        // share
532                        let mut remote_keys = HashMap::new();
533                        if let Some(ref remote_file) = maybe_remote_file {
534                            for key in remote_file.user_access_keys() {
535                                remote_keys.insert(
536                                    (Owner(key.encrypted_by), Owner(key.encrypted_for)),
537                                    (key.mode, key.deleted),
538                                );
539                            }
540                        }
541                        for key in local_file.user_access_keys() {
542                            let (by, for_) = (Owner(key.encrypted_by), Owner(key.encrypted_for));
543                            if let Some(&(remote_mode, remote_deleted)) =
544                                remote_keys.get(&(by, for_))
545                            {
546                                // upgrade share
547                                if key.mode > remote_mode || !key.deleted && remote_deleted {
548                                    let mode = match key.mode {
549                                        UserAccessMode::Read => ShareMode::Read,
550                                        UserAccessMode::Write => ShareMode::Write,
551                                        UserAccessMode::Owner => continue,
552                                    };
553                                    merge.add_share_unvalidated(id, for_, mode, &self.keychain)?;
554                                }
555                                // delete share
556                                if key.deleted && !remote_deleted {
557                                    merge.delete_share_unvalidated(
558                                        &id,
559                                        Some(for_.0),
560                                        &self.keychain,
561                                    )?;
562                                }
563                            } else {
564                                // add share
565                                let mode = match key.mode {
566                                    UserAccessMode::Read => ShareMode::Read,
567                                    UserAccessMode::Write => ShareMode::Write,
568                                    UserAccessMode::Owner => continue,
569                                };
570                                merge.add_share_unvalidated(id, for_, mode, &self.keychain)?;
571                            }
572                        }
573
574                        // share deletion due to conflicts
575                        if files_to_unshare.contains(&id) {
576                            merge.delete_share_unvalidated(&id, None, &self.keychain)?;
577                        }
578
579                        // rename due to path conflict
580                        if let Some(&rename_increment) = rename_increments.get(&id) {
581                            let name = NameComponents::from(&local_name)
582                                .generate_incremented(rename_increment)
583                                .to_name();
584                            merge.rename_unvalidated(&id, &name, &self.keychain)?;
585                        }
586
587                        // edit
588                        let base_hmac = maybe_base_file.and_then(|f| f.document_hmac().cloned());
589                        let remote_hmac =
590                            maybe_remote_file.and_then(|f| f.document_hmac().cloned());
591                        let local_hmac = local_file.document_hmac().cloned();
592                        if merge.access_mode(me, &id)? >= Some(UserAccessMode::Write)
593                            && local_hmac != base_hmac
594                        {
595                            if remote_hmac != base_hmac && remote_hmac != local_hmac {
596                                // merge
597                                let merge_name = merge.name(&id, &self.keychain)?;
598                                let document_type =
599                                    DocumentType::from_file_name_using_extension(&merge_name);
600
601                                // todo these accesses are potentially problematic
602                                // maybe not if service/docs is the persion doing network io
603                                let base_document =
604                                    self.read_document_helper(id, &mut base).await?;
605                                let remote_document =
606                                    self.read_document_helper(id, &mut remote).await?;
607                                let local_document =
608                                    self.read_document_helper(id, &mut local).await?;
609
610                                match document_type {
611                                    DocumentType::Text => {
612                                        // 3-way merge
613                                        // todo: a couple more clones than necessary
614                                        let base_document =
615                                            String::from_utf8_lossy(&base_document).to_string();
616                                        let remote_document =
617                                            String::from_utf8_lossy(&remote_document).to_string();
618                                        let local_document =
619                                            String::from_utf8_lossy(&local_document).to_string();
620                                        let merged_document = Buffer::from(base_document.as_str())
621                                            .merge(local_document, remote_document);
622                                        let encrypted_document = merge
623                                            .update_document_unvalidated(
624                                                &id,
625                                                &merged_document.into_bytes(),
626                                                &self.keychain,
627                                            )?;
628                                        let hmac = merge.find(&id)?.document_hmac().copied();
629                                        self.docs.insert(id, hmac, &encrypted_document).await?;
630                                    }
631                                    DocumentType::Drawing => {
632                                        println!("sync merge");
633                                        let base_document =
634                                            String::from_utf8_lossy(&base_document).to_string();
635                                        let remote_document =
636                                            String::from_utf8_lossy(&remote_document).to_string();
637                                        let local_document =
638                                            String::from_utf8_lossy(&local_document).to_string();
639
640                                        let base_buffer = svg::buffer::Buffer::new(&base_document);
641                                        let remote_buffer =
642                                            svg::buffer::Buffer::new(&remote_document);
643                                        let mut local_buffer =
644                                            svg::buffer::Buffer::new(&local_document);
645
646                                        for (_, el) in local_buffer.elements.iter_mut() {
647                                            if let Element::Path(path) = el {
648                                                path.data.apply_transform(u_transform_to_bezier(
649                                                    &local_buffer.master_transform,
650                                                ));
651                                            }
652                                        }
653                                        svg::buffer::Buffer::reload(
654                                            &mut local_buffer.elements,
655                                            &mut local_buffer.weak_images,
656                                            local_buffer.master_transform,
657                                            &base_buffer,
658                                            &remote_buffer,
659                                        );
660
661                                        let merged_document = local_buffer.serialize();
662                                        let encrypted_document = merge
663                                            .update_document_unvalidated(
664                                                &id,
665                                                &merged_document.into_bytes(),
666                                                &self.keychain,
667                                            )?;
668                                        let hmac = merge.find(&id)?.document_hmac().copied();
669                                        self.docs.insert(id, hmac, &encrypted_document).await?;
670                                    }
671                                    DocumentType::Other => {
672                                        // duplicate file
673                                        let merge_parent = *merge.find(&id)?.parent();
674                                        let duplicate_id = if let Some(&duplicate_id) =
675                                            duplicate_file_ids.get(&id)
676                                        {
677                                            duplicate_id
678                                        } else {
679                                            let duplicate_id = Uuid::new_v4();
680                                            duplicate_file_ids.insert(id, duplicate_id);
681                                            rename_increments.insert(duplicate_id, 1);
682                                            duplicate_id
683                                        };
684
685                                        let mut merge_name = merge_name;
686                                        merge_name = NameComponents::from(&merge_name)
687                                            .generate_incremented(
688                                                rename_increments
689                                                    .get(&duplicate_id)
690                                                    .copied()
691                                                    .unwrap_or_default(),
692                                            )
693                                            .to_name();
694
695                                        merge.create_unvalidated(
696                                            duplicate_id,
697                                            symkey::generate_key(),
698                                            &merge_parent,
699                                            &merge_name,
700                                            FileType::Document,
701                                            &self.keychain,
702                                        )?;
703                                        let encrypted_document = merge
704                                            .update_document_unvalidated(
705                                                &duplicate_id,
706                                                &local_document,
707                                                &self.keychain,
708                                            )?;
709                                        let duplicate_hmac =
710                                            merge.find(&duplicate_id)?.document_hmac().copied();
711                                        self.docs
712                                            .insert(
713                                                duplicate_id,
714                                                duplicate_hmac,
715                                                &encrypted_document,
716                                            )
717                                            .await?;
718                                    }
719                                }
720                            } else {
721                                // overwrite (todo: avoid reading/decrypting/encrypting document)
722                                let document = self.read_document_helper(id, &mut local).await?;
723                                merge.update_document_unvalidated(
724                                    &id,
725                                    &document,
726                                    &self.keychain,
727                                )?;
728                            }
729                        }
730                    }
731
732                    // deletes
733                    // moves happen first in case a file is moved into a deleted folder
734                    for id in db.local_metadata.ids() {
735                        if db.base_metadata.maybe_find(&id).is_some()
736                            && deletions.calculate_deleted(&id)?
737                            && !merge.calculate_deleted(&id)?
738                        {
739                            // delete
740                            merge.delete_unvalidated(&id, &self.keychain)?;
741                        }
742                    }
743                    for &id in &links_to_delete {
744                        // delete
745                        if merge.maybe_find(&id).is_some() && !merge.calculate_deleted(&id)? {
746                            merge.delete_unvalidated(&id, &self.keychain)?;
747                        }
748                    }
749
750                    merge
751                };
752
753                // validate; handle failures by introducing changeset constraints
754                for link in merge.ids() {
755                    if !merge.calculate_deleted(&link)? {
756                        if let FileType::Link { target } = merge.find(&link)?.file_type() {
757                            if merge.maybe_find(&target).is_some()
758                                && merge.calculate_deleted(&target)?
759                            {
760                                // delete links to deleted files
761                                if links_to_delete.insert(link) {
762                                    continue 'merge_construction;
763                                } else {
764                                    return Err(LbErrKind::Unexpected(format!(
765                                        "sync failed to resolve broken link (deletion): {:?}",
766                                        link
767                                    ))
768                                    .into());
769                                }
770                            }
771                        }
772                    }
773                }
774
775                let validate_result = merge.validate(me);
776                match validate_result {
777                    // merge changeset is valid
778                    Ok(_) => {
779                        let (_, merge_changes) = merge.unstage();
780                        break merge_changes;
781                    }
782                    Err(ref err) => match err.kind {
783                        LbErrKind::Validation(ref vf) => match vf {
784                            // merge changeset has resolvable validation errors and needs modification
785                            ValidationFailure::Cycle(ids) => {
786                                // revert all local moves in the cycle
787                                let mut progress = false;
788                                for &id in ids {
789                                    if db.local_metadata.maybe_find(&id).is_some()
790                                        && files_to_unmove.insert(id)
791                                    {
792                                        progress = true;
793                                    }
794                                }
795                                if !progress {
796                                    return Err(LbErrKind::Unexpected(format!(
797                                        "sync failed to resolve cycle: {:?}",
798                                        ids
799                                    ))
800                                    .into());
801                                }
802                            }
803                            ValidationFailure::PathConflict(ids) => {
804                                // pick one local id and generate a non-conflicting filename
805                                let mut progress = false;
806                                for &id in ids {
807                                    if duplicate_file_ids.values().any(|&dup| dup == id) {
808                                        *rename_increments.entry(id).or_insert(0) += 1;
809                                        progress = true;
810                                        break;
811                                    }
812                                }
813                                if !progress {
814                                    for &id in ids {
815                                        if db.local_metadata.maybe_find(&id).is_some() {
816                                            *rename_increments.entry(id).or_insert(0) += 1;
817                                            progress = true;
818                                            break;
819                                        }
820                                    }
821                                }
822                                if !progress {
823                                    return Err(LbErrKind::Unexpected(format!(
824                                        "sync failed to resolve path conflict: {:?}",
825                                        ids
826                                    ))
827                                    .into());
828                                }
829                            }
830                            ValidationFailure::SharedLink { link, shared_ancestor } => {
831                                // if ancestor is newly shared, delete share, otherwise delete link
832                                let mut progress = false;
833                                if let Some(base_shared_ancestor) = base.maybe_find(shared_ancestor)
834                                {
835                                    if !base_shared_ancestor.is_shared()
836                                        && files_to_unshare.insert(*shared_ancestor)
837                                    {
838                                        progress = true;
839                                    }
840                                }
841                                if !progress && links_to_delete.insert(*link) {
842                                    progress = true;
843                                }
844                                if !progress {
845                                    return Err(LbErrKind::Unexpected(format!(
846                                    "sync failed to resolve shared link: link: {:?}, shared_ancestor: {:?}",
847                                    link, shared_ancestor
848                                )).into());
849                                }
850                            }
851                            ValidationFailure::DuplicateLink { target } => {
852                                // delete local link with this target
853                                let mut progress = false;
854                                if let Some(link) = local.linked_by(target)? {
855                                    if links_to_delete.insert(link) {
856                                        progress = true;
857                                    }
858                                }
859                                if !progress {
860                                    return Err(LbErrKind::Unexpected(format!(
861                                        "sync failed to resolve duplicate link: target: {:?}",
862                                        target
863                                    ))
864                                    .into());
865                                }
866                            }
867                            ValidationFailure::BrokenLink(link) => {
868                                // delete local link with this target
869                                if !links_to_delete.insert(*link) {
870                                    return Err(LbErrKind::Unexpected(format!(
871                                        "sync failed to resolve broken link: {:?}",
872                                        link
873                                    ))
874                                    .into());
875                                }
876                            }
877                            ValidationFailure::OwnedLink(link) => {
878                                // if target is newly owned, unmove target, otherwise delete link
879                                let mut progress = false;
880                                if let Some(remote_link) = remote.maybe_find(link) {
881                                    if let FileType::Link { target } = remote_link.file_type() {
882                                        let remote_target = remote.find(&target)?;
883                                        if remote_target.owner() != me
884                                            && files_to_unmove.insert(target)
885                                        {
886                                            progress = true;
887                                        }
888                                    }
889                                }
890                                if !progress && links_to_delete.insert(*link) {
891                                    progress = true;
892                                }
893                                if !progress {
894                                    return Err(LbErrKind::Unexpected(format!(
895                                        "sync failed to resolve owned link: {:?}",
896                                        link
897                                    ))
898                                    .into());
899                                }
900                            }
901                            // merge changeset has unexpected validation errors
902                            ValidationFailure::Orphan(_)
903                            | ValidationFailure::NonFolderWithChildren(_)
904                            | ValidationFailure::FileWithDifferentOwnerParent(_)
905                            | ValidationFailure::FileNameTooLong(_)
906                            | ValidationFailure::DeletedFileUpdated(_)
907                            | ValidationFailure::NonDecryptableFileName(_) => {
908                                validate_result?;
909                            }
910                        },
911                        // merge changeset has unexpected errors
912                        _ => {
913                            validate_result?;
914                        }
915                    },
916                }
917            }
918        };
919
920        // base = remote; local = merge
921        (&mut db.base_metadata)
922            .to_staged(remote_changes.clone())
923            .to_lazy()
924            .promote()?;
925        db.local_metadata.clear()?;
926        (&mut db.local_metadata)
927            .to_staged(merge_changes)
928            .to_lazy()
929            .promote()?;
930
931        // todo who else calls this did they manage locks right?
932        // self.cleanup_local_metadata()?;
933        db.base_metadata.stage(&mut db.local_metadata).prune()?;
934
935        if start.elapsed() > std::time::Duration::from_millis(100) {
936            warn!("sync merge held lock for {:?}", start.elapsed());
937        }
938
939        Ok(())
940    }
941
942    /// Updates remote and base metadata to local.
943    async fn push_meta(&self, ctx: &mut SyncContext) -> LbResult<()> {
944        ctx.msg("Pushing tree changes...");
945        let mut updates = vec![];
946        let mut local_changes_no_digests = Vec::new();
947
948        let tx = self.ro_tx().await;
949        let db = tx.db();
950
951        // remote = local
952        let local = db.base_metadata.stage(&db.local_metadata).to_lazy();
953
954        for id in local.tree.staged.ids() {
955            let mut local_change = local.tree.staged.find(&id)?.timestamped_value.value.clone();
956            let maybe_base_file = local.tree.base.maybe_find(&id);
957
958            // change everything but document hmac and re-sign
959            local_change.document_hmac =
960                maybe_base_file.and_then(|f| f.timestamped_value.value.document_hmac);
961            let local_change = local_change.sign(&self.keychain)?;
962
963            local_changes_no_digests.push(local_change.clone());
964            let file_diff = FileDiff { old: maybe_base_file.cloned(), new: local_change };
965            updates.push(file_diff);
966        }
967
968        drop(tx);
969
970        if !updates.is_empty() {
971            self.client
972                .request(self.get_account()?, UpsertRequest { updates: updates.clone() })
973                .await?;
974            ctx.pushed_metas = updates;
975        }
976
977        let mut tx = self.begin_tx().await;
978        let db = tx.db();
979
980        // base = local
981        (&mut db.base_metadata)
982            .to_lazy()
983            .stage(local_changes_no_digests)
984            .promote()?;
985        db.base_metadata.stage(&mut db.local_metadata).prune()?;
986
987        tx.end();
988
989        Ok(())
990    }
991
992    /// Updates remote and base files to local. Assumes metadata is already pushed for all new files.
993    async fn push_docs(&self, ctx: &mut SyncContext) -> LbResult<()> {
994        ctx.msg("Pushing document changes...");
995        let mut updates = vec![];
996        let mut local_changes_digests_only = vec![];
997
998        let tx = self.ro_tx().await;
999        let db = tx.db();
1000        let start = Instant::now();
1001
1002        let local = db.base_metadata.stage(&db.local_metadata).to_lazy();
1003
1004        for id in local.tree.staged.ids() {
1005            let base_file = local.tree.base.find(&id)?.clone();
1006
1007            // change only document hmac and re-sign
1008            let mut local_change = base_file.timestamped_value.value.clone();
1009            local_change.document_hmac = local.find(&id)?.timestamped_value.value.document_hmac;
1010
1011            if base_file.document_hmac() == local_change.document_hmac()
1012                || local_change.document_hmac.is_none()
1013            {
1014                continue;
1015            }
1016
1017            let local_change = local_change.sign(&self.keychain)?;
1018
1019            updates.push(FileDiff { old: Some(base_file), new: local_change.clone() });
1020            local_changes_digests_only.push(local_change);
1021            self.events.sync(SyncIncrement::PushingDocument(id, true));
1022        }
1023
1024        drop(tx);
1025        if start.elapsed() > std::time::Duration::from_millis(100) {
1026            warn!("sync push_docs held lock for {:?}", start.elapsed());
1027        }
1028
1029        let docs_count = updates.len();
1030        ctx.total += docs_count;
1031        let futures = updates.clone().into_iter().map(|diff| self.push_doc(diff));
1032
1033        let mut stream = stream::iter(futures).buffer_unordered(
1034            thread::available_parallelism()
1035                .unwrap_or(NonZeroUsize::new(4).unwrap())
1036                .into(),
1037        );
1038
1039        let mut idx = 0;
1040        while let Some(fut) = stream.next().await {
1041            let id = fut?;
1042            self.events.sync(SyncIncrement::PushingDocument(id, false));
1043            ctx.file_msg(id, &format!("Pushed file {idx} of {docs_count}."));
1044            idx += 1;
1045        }
1046        ctx.pushed_docs = updates;
1047
1048        let mut tx = self.begin_tx().await;
1049        let db = tx.db();
1050        // base = local (metadata)
1051        (&mut db.base_metadata)
1052            .to_lazy()
1053            .stage(local_changes_digests_only)
1054            .promote()?;
1055
1056        db.base_metadata.stage(&mut db.local_metadata).prune()?;
1057
1058        tx.end();
1059
1060        Ok(())
1061    }
1062
1063    async fn push_doc(&self, diff: FileDiff<SignedFile>) -> LbResult<Uuid> {
1064        let id = *diff.new.id();
1065        let hmac = diff.new.document_hmac();
1066        let local_document_change = self.docs.get(id, hmac.copied()).await?;
1067        self.client
1068            .request(
1069                self.get_account()?,
1070                ChangeDocRequest { diff, new_content: local_document_change },
1071            )
1072            .await?;
1073
1074        Ok(id)
1075    }
1076
1077    async fn dedup(
1078        &self, updates: GetUpdatesResponse,
1079    ) -> LbResult<(Vec<SignedFile>, u64, Option<Uuid>)> {
1080        let tx = self.ro_tx().await;
1081        let db = tx.db();
1082
1083        let mut root_id = None;
1084        let (remote_changes, update_as_of) = {
1085            let mut remote_changes = updates.file_metadata;
1086            let update_as_of = updates.as_of_metadata_version;
1087
1088            remote_changes = self.prune_remote_orphans(remote_changes).await?;
1089
1090            let remote = db.base_metadata.stage(remote_changes).pruned()?.to_lazy();
1091
1092            let (_, remote_changes) = remote.unstage();
1093            (remote_changes, update_as_of)
1094        };
1095
1096        // initialize root if this is the first pull on this device
1097        if db.root.get().is_none() {
1098            let root = remote_changes
1099                .all_files()?
1100                .into_iter()
1101                .find(|f| f.is_root())
1102                .ok_or(LbErrKind::RootNonexistent)?;
1103            root_id = Some(*root.id());
1104        }
1105
1106        Ok((remote_changes, update_as_of, root_id))
1107    }
1108
1109    async fn prune_remote_orphans(
1110        &self, remote_changes: Vec<SignedFile>,
1111    ) -> LbResult<Vec<SignedFile>> {
1112        let tx = self.ro_tx().await;
1113        let db = tx.db();
1114
1115        let me = Owner(self.keychain.get_pk()?);
1116        let remote = db.base_metadata.stage(remote_changes).to_lazy();
1117        let mut result = Vec::new();
1118
1119        for id in remote.tree.staged.ids() {
1120            let meta = remote.find(&id)?;
1121            if remote.maybe_find_parent(meta).is_some()
1122                || meta
1123                    .user_access_keys()
1124                    .iter()
1125                    .any(|k| k.encrypted_for == me.0)
1126            {
1127                result.push(remote.find(&id)?.clone()); // todo: don't clone
1128            }
1129        }
1130        Ok(result)
1131    }
1132
1133    async fn commit_last_synced(&self, ctx: &mut SyncContext) -> LbResult<()> {
1134        ctx.msg("Cleaning up...");
1135        let mut tx = self.begin_tx().await;
1136        let db = tx.db();
1137        db.last_synced.insert(ctx.update_as_of as i64)?;
1138
1139        if let Some(root) = ctx.root {
1140            db.root.insert(root)?;
1141        }
1142
1143        Ok(())
1144    }
1145
1146    pub async fn get_last_synced_human(&self) -> LbResult<String> {
1147        let tx = self.ro_tx().await;
1148        let db = tx.db();
1149        let last_synced = db.last_synced.get().copied().unwrap_or(0);
1150
1151        Ok(self.get_timestamp_human_string(last_synced))
1152    }
1153
1154    pub fn get_timestamp_human_string(&self, timestamp: i64) -> String {
1155        if timestamp != 0 {
1156            Duration::milliseconds(clock::get_time().0 - timestamp)
1157                .format_human()
1158                .to_string()
1159        } else {
1160            "never".to_string()
1161        }
1162    }
1163}
1164
1165impl SyncContext {
1166    fn summarize(&self) -> SyncStatus {
1167        let mut local = HashSet::new();
1168        let mut server = HashSet::new();
1169        let mut work_units = vec![];
1170
1171        for meta in &self.pushed_metas {
1172            local.insert(meta.new.id());
1173        }
1174
1175        for meta in &self.pushed_docs {
1176            local.insert(meta.new.id());
1177        }
1178
1179        for meta in &self.remote_changes {
1180            server.insert(meta.id());
1181        }
1182
1183        for id in local {
1184            work_units.push(WorkUnit::LocalChange(*id));
1185        }
1186
1187        for id in server {
1188            work_units.push(WorkUnit::ServerChange(*id));
1189        }
1190
1191        SyncStatus { work_units, latest_server_ts: self.update_as_of }
1192    }
1193
1194    fn msg(&mut self, msg: &str) {
1195        self.current += 1;
1196        if let Some(f) = &self.progress {
1197            f(SyncProgress {
1198                total: self.total,
1199                progress: self.current,
1200                file_being_processed: Default::default(),
1201                msg: msg.to_string(),
1202            })
1203        }
1204    }
1205
1206    fn file_msg(&mut self, id: Uuid, msg: &str) {
1207        self.current += 1;
1208        if let Some(f) = &self.progress {
1209            f(SyncProgress {
1210                total: self.total,
1211                progress: self.current,
1212                file_being_processed: Some(id),
1213                msg: msg.to_string(),
1214            })
1215        }
1216    }
1217
1218    fn done_msg(&mut self) {
1219        self.current = self.total;
1220        if let Some(f) = &self.progress {
1221            f(SyncProgress {
1222                total: self.total,
1223                progress: self.current,
1224                file_being_processed: None,
1225                msg: "Sync successful!".to_string(),
1226            })
1227        }
1228    }
1229}
1230
/// Summary of the work involved in a sync: which files changed locally and which changed on
/// the server.
#[derive(Debug, Serialize, Clone)]
pub struct SyncStatus {
    /// One entry per changed file, tagged as a local change or a server change.
    pub work_units: Vec<WorkUnit>,
    /// Server metadata version the summary is based on (`SyncContext::summarize` fills this from
    /// `update_as_of`).
    pub latest_server_ts: u64,
}
1236
/// Progress report passed to the sync progress callback by `SyncContext`.
#[derive(Clone)]
pub struct SyncProgress {
    /// Total number of steps currently expected; grows as documents to push are discovered.
    pub total: usize,
    /// Number of steps reported so far; set equal to `total` by the final "done" message.
    pub progress: usize,
    /// File the message refers to, if it concerns a specific file.
    pub file_being_processed: Option<Uuid>,
    /// Human-readable description of the current step.
    pub msg: String,
}
1244
1245impl Display for SyncProgress {
1246    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1247        write!(f, "[{} / {}]: {}", self.progress, self.total, self.msg)
1248    }
1249}
1250
/// Fine-grained sync events published through `self.events.sync(..)`.
#[derive(Debug, Clone)]
pub enum SyncIncrement {
    /// Presumably emitted when a sync pass begins; the emitting code is not in this section.
    SyncStarted,
    /// Document pull event for the given file id.
    /// NOTE(review): the flag presumably mirrors `PushingDocument` (true = in progress); confirm.
    PullingDocument(Uuid, bool),
    /// Document push event for the given file id: `push_docs` emits `true` when the document is
    /// queued for upload and `false` once its upload has finished.
    PushingDocument(Uuid, bool),
    /// Presumably emitted when a sync pass ends, carrying the error kind on failure; confirm.
    SyncFinished(Option<LbErrKind>),
}