lb_rs/service/
sync.rs

1use crate::Lb;
2use crate::io::network::ApiError;
3use crate::model::access_info::UserAccessMode;
4use crate::model::api::{
5    ChangeDocRequestV2, GetDocRequest, GetFileIdsRequest, GetUpdatesRequestV2,
6    GetUpdatesResponseV2, GetUsernameError, GetUsernameRequest, UpsertRequestV2,
7};
8use crate::model::errors::{LbErrKind, LbResult};
9use crate::model::file::ShareMode;
10use crate::model::file_like::FileLike;
11use crate::model::file_metadata::{DocumentHmac, FileDiff, FileType, Owner};
12use crate::model::filename::{DocumentType, NameComponents};
13use crate::model::signed_meta::SignedMeta;
14use crate::model::staged::StagedTreeLikeMut;
15use crate::model::svg::buffer::u_transform_to_bezier;
16use crate::model::svg::element::Element;
17use crate::model::text::buffer::Buffer;
18use crate::model::tree_like::TreeLike;
19use crate::model::work_unit::WorkUnit;
20use crate::model::{ValidationFailure, clock, svg, symkey};
21pub use basic_human_duration::ChronoHumanDuration;
22use futures::{StreamExt, stream};
23use serde::Serialize;
24use std::collections::{HashMap, HashSet, hash_map};
25use std::fmt::{Display, Formatter};
26use std::num::NonZeroUsize;
27use std::sync::Arc;
28use std::sync::atomic::{AtomicBool, Ordering};
29use std::thread;
30use std::time::Instant;
31use time::Duration;
32use usvg::Transform;
33use uuid::Uuid;
34
35use super::events::Actor;
36
37pub type SyncFlag = Arc<AtomicBool>;
38
39pub struct SyncContext {
40    progress: Option<Box<dyn Fn(SyncProgress) + Send>>,
41    current: usize,
42    total: usize,
43
44    pk_cache: HashMap<Owner, String>,
45    last_synced: u64,
46    remote_changes: Vec<SignedMeta>,
47    update_as_of: u64,
48
49    /// is this the sync that populated root?
50    new_root: Option<Uuid>,
51    pushed_metas: Vec<FileDiff<SignedMeta>>,
52    pushed_docs: Vec<FileDiff<SignedMeta>>,
53    pulled_docs: Vec<Uuid>,
54}
55
56impl Lb {
57    #[instrument(level = "debug", skip_all, err(Debug))]
58    pub async fn calculate_work(&self) -> LbResult<SyncStatus> {
59        let tx = self.ro_tx().await;
60        let db = tx.db();
61        let last_synced = db.last_synced.get().copied().unwrap_or_default() as u64;
62        drop(tx);
63
64        let remote_changes = self
65            .client
66            .request(
67                self.get_account()?,
68                GetUpdatesRequestV2 { since_metadata_version: last_synced },
69            )
70            .await?;
71        let (deduped, latest_server_ts, _) = self.dedup(remote_changes).await?;
72        let remote_dirty = deduped
73            .into_iter()
74            .map(|f| *f.id())
75            .map(WorkUnit::ServerChange);
76
77        self.prune().await?;
78
79        let tx = self.ro_tx().await;
80        let db = tx.db();
81
82        let locally_dirty = db
83            .local_metadata
84            .get()
85            .keys()
86            .copied()
87            .map(WorkUnit::LocalChange);
88
89        let mut work_units: Vec<WorkUnit> = Vec::new();
90        work_units.extend(locally_dirty.chain(remote_dirty));
91        Ok(SyncStatus { work_units, latest_server_ts })
92    }
93
94    #[instrument(level = "debug", skip_all, err(Debug))]
95    pub async fn sync(&self, f: Option<Box<dyn Fn(SyncProgress) + Send>>) -> LbResult<SyncStatus> {
96        let old = self.syncing.swap(true, Ordering::SeqCst);
97        if old {
98            return Err(LbErrKind::AlreadySyncing.into());
99        }
100
101        let mut ctx = self.setup_sync(f).await?;
102
103        let mut got_updates = false;
104        let mut pipeline: LbResult<()> = async {
105            ctx.msg("Preparing Sync..."); // todo remove
106            self.events.sync(SyncIncrement::SyncStarted);
107            self.prune().await?;
108            got_updates = self.fetch_meta(&mut ctx).await?;
109            self.populate_pk_cache(&mut ctx).await?;
110            self.docs.dont_delete.store(true, Ordering::SeqCst);
111            self.fetch_docs(&mut ctx).await?;
112            self.merge(&mut ctx).await?;
113            self.push_meta(&mut ctx).await?;
114            self.push_docs(&mut ctx).await?;
115            Ok(())
116        }
117        .await;
118
119        self.docs.dont_delete.store(false, Ordering::SeqCst);
120
121        if pipeline.is_ok() {
122            pipeline = self.commit_last_synced(&mut ctx).await;
123        }
124
125        let cleanup = self.cleanup().await;
126
127        let ekind = pipeline.as_ref().err().map(|err| err.kind.clone());
128        self.events.sync(SyncIncrement::SyncFinished(ekind));
129
130        self.syncing.store(false, Ordering::Relaxed);
131        pipeline?;
132        cleanup?;
133
134        // done not being sent if pipeline is an error is likely the reason we get stuck offline
135        ctx.done_msg();
136
137        if got_updates {
138            // did it?
139            self.events.meta_changed();
140            let owner = self.keychain.get_pk().map(Owner).ok();
141            // this is overly agressive as it'll notify on shares that have been accepted
142            // another strategy could be to diff the changes coming before and after a sync
143            if ctx.remote_changes.iter().any(|f| Some(f.owner()) != owner) {
144                self.events.pending_shares_changed();
145            }
146            for id in &ctx.pulled_docs {
147                self.events.doc_written(*id, Some(Actor::Sync));
148            }
149        }
150
151        Ok(ctx.summarize())
152    }
153
154    async fn setup_sync(
155        &self, progress: Option<Box<dyn Fn(SyncProgress) + Send>>,
156    ) -> LbResult<SyncContext> {
157        let tx = self.ro_tx().await;
158        let db = tx.db();
159
160        let last_synced = db.last_synced.get().copied().unwrap_or_default() as u64;
161        let pk_cache = db.pub_key_lookup.get().clone();
162
163        let current = 0;
164        let total = 7;
165
166        Ok(SyncContext {
167            last_synced,
168            pk_cache,
169
170            progress,
171            current,
172            total,
173
174            new_root: Default::default(),
175            update_as_of: Default::default(),
176            remote_changes: Default::default(),
177            pushed_docs: Default::default(),
178            pushed_metas: Default::default(),
179            pulled_docs: Default::default(),
180        })
181    }
182
183    async fn prune(&self) -> LbResult<()> {
184        let server_ids = self
185            .client
186            .request(self.get_account()?, GetFileIdsRequest {})
187            .await?
188            .ids;
189
190        let mut tx = self.begin_tx().await;
191        let db = tx.db();
192
193        let mut local = db.base_metadata.stage(&db.local_metadata).to_lazy();
194        let base_ids = local.tree.base.ids();
195
196        let mut prunable_ids = base_ids;
197        prunable_ids.retain(|id| !server_ids.contains(id));
198        for id in prunable_ids.clone() {
199            prunable_ids.extend(local.descendants(&id)?.into_iter());
200        }
201        for id in &prunable_ids {
202            if let Some(base_file) = local.tree.base.maybe_find(id) {
203                self.docs
204                    .delete(*id, base_file.document_hmac().copied())
205                    .await?;
206            }
207            if let Some(local_file) = local.maybe_find(id) {
208                self.docs
209                    .delete(*id, local_file.document_hmac().copied())
210                    .await?;
211            }
212        }
213
214        let mut base_staged = (&mut db.base_metadata).to_lazy().stage(None);
215        base_staged.tree.removed = prunable_ids.clone().into_iter().collect();
216        base_staged.promote()?;
217
218        let mut local_staged = (&mut db.local_metadata).to_lazy().stage(None);
219        local_staged.tree.removed = prunable_ids.into_iter().collect();
220        local_staged.promote()?;
221
222        Ok(())
223    }
224
225    /// Returns true if there were any updates
226    async fn fetch_meta(&self, ctx: &mut SyncContext) -> LbResult<bool> {
227        ctx.msg("Fetching tree updates...");
228        let updates = self
229            .client
230            .request(
231                self.get_account()?,
232                GetUpdatesRequestV2 { since_metadata_version: ctx.last_synced },
233            )
234            .await?;
235
236        let empty = updates.file_metadata.is_empty();
237        let (remote, as_of, root) = self.dedup(updates).await?;
238
239        ctx.remote_changes = remote;
240        ctx.update_as_of = as_of;
241        ctx.new_root = root;
242
243        Ok(!empty)
244    }
245
246    async fn populate_pk_cache(&self, ctx: &mut SyncContext) -> LbResult<()> {
247        ctx.msg("Updating public key cache...");
248        let mut all_owners = HashSet::new();
249        for file in &ctx.remote_changes {
250            for user_access_key in file.user_access_keys() {
251                all_owners.insert(Owner(user_access_key.encrypted_by));
252                all_owners.insert(Owner(user_access_key.encrypted_for));
253            }
254        }
255
256        let mut new_entries = HashMap::new();
257
258        for owner in all_owners {
259            if let hash_map::Entry::Vacant(e) = ctx.pk_cache.entry(owner) {
260                let username_result = self
261                    .client
262                    .request(self.get_account()?, GetUsernameRequest { key: owner.0 })
263                    .await;
264                let username = match username_result {
265                    Err(ApiError::Endpoint(GetUsernameError::UserNotFound)) => {
266                        "<unknown>".to_string()
267                    }
268                    _ => username_result?.username.clone(),
269                };
270                new_entries.insert(owner, username.clone());
271                e.insert(username.clone());
272            }
273        }
274
275        let mut tx = self.begin_tx().await;
276        let db = tx.db();
277
278        for (owner, username) in new_entries {
279            db.pub_key_lookup.insert(owner, username)?;
280        }
281        Ok(())
282    }
283
284    async fn fetch_docs(&self, ctx: &mut SyncContext) -> LbResult<()> {
285        ctx.msg("Fetching documents...");
286        let mut docs_to_pull = vec![];
287
288        let tx = self.ro_tx().await;
289        let db = tx.db();
290        let start = Instant::now();
291
292        let mut remote = db.base_metadata.stage(ctx.remote_changes.clone()).to_lazy(); // this used to be owned remote changes
293        for id in remote.tree.staged.ids() {
294            if remote.calculate_deleted(&id)? {
295                continue;
296            }
297            let remote_hmac = remote.find(&id)?.document_hmac().cloned();
298            let base_hmac = remote
299                .tree
300                .base
301                .maybe_find(&id)
302                .and_then(|f| f.document_hmac())
303                .cloned();
304            if base_hmac == remote_hmac {
305                continue;
306            }
307
308            if let Some(remote_hmac) = remote_hmac {
309                docs_to_pull.push((id, remote_hmac));
310                self.events.sync(SyncIncrement::PullingDocument(id, true));
311            }
312        }
313
314        drop(tx);
315        if start.elapsed() > std::time::Duration::from_millis(100) {
316            warn!("sync fetch_docs held lock for {:?}", start.elapsed());
317        }
318
319        let num_docs = docs_to_pull.len();
320        ctx.total += num_docs;
321
322        let futures = docs_to_pull
323            .into_iter()
324            .map(|(id, hmac)| self.fetch_doc(id, hmac));
325
326        let mut stream = stream::iter(futures).buffer_unordered(
327            thread::available_parallelism()
328                .unwrap_or(NonZeroUsize::new(4).unwrap())
329                .into(),
330        );
331
332        let mut idx = 0;
333        while let Some(fut) = stream.next().await {
334            let id = fut?;
335            ctx.pulled_docs.push(id);
336            self.events.sync(SyncIncrement::PullingDocument(id, false));
337            ctx.file_msg(id, &format!("Downloaded file {idx} of {num_docs}."));
338            idx += 1;
339        }
340        Ok(())
341    }
342
343    async fn fetch_doc(&self, id: Uuid, hmac: DocumentHmac) -> LbResult<Uuid> {
344        let remote_document = self
345            .client
346            .request(self.get_account()?, GetDocRequest { id, hmac })
347            .await?;
348        self.docs
349            .insert(id, Some(hmac), &remote_document.content)
350            .await?;
351
352        Ok(id)
353    }
354
355    /// Pulls remote changes and constructs a changeset Merge such that Stage<Stage<Stage<Base, Remote>, Local>, Merge> is valid.
356    /// Promotes Base to Stage<Base, Remote> and Local to Stage<Local, Merge>
357    async fn merge(&self, ctx: &mut SyncContext) -> LbResult<()> {
358        let mut tx = self.begin_tx().await;
359        let db = tx.db();
360        let start = Instant::now();
361
362        let remote_changes = &ctx.remote_changes;
363
364        // fetch document updates and local documents for merge
365        let me = Owner(self.keychain.get_pk()?);
366
367        // compute merge changes
368        let merge_changes = {
369            // assemble trees
370            let mut base = (&db.base_metadata).to_lazy();
371            let remote_unlazy = (&db.base_metadata).to_staged(remote_changes);
372            let mut remote = remote_unlazy.as_lazy();
373            let mut local = (&db.base_metadata).to_staged(&db.local_metadata).to_lazy();
374
375            // changeset constraints - these evolve as we try to assemble changes and encounter validation failures
376            let mut files_to_unmove: HashSet<Uuid> = HashSet::new();
377            let mut files_to_unshare: HashSet<Uuid> = HashSet::new();
378            let mut links_to_delete: HashSet<Uuid> = HashSet::new();
379            let mut rename_increments: HashMap<Uuid, usize> = HashMap::new();
380            let mut duplicate_file_ids: HashMap<Uuid, Uuid> = HashMap::new();
381
382            'merge_construction: loop {
383                // process just the edits which allow us to check deletions in the result
384                let mut deletions = {
385                    let mut deletions = remote_unlazy.stage(Vec::new()).to_lazy();
386
387                    // creations
388                    let mut deletion_creations = HashSet::new();
389                    for id in db.local_metadata.ids() {
390                        if remote.maybe_find(&id).is_none() && !links_to_delete.contains(&id) {
391                            deletion_creations.insert(id);
392                        }
393                    }
394                    'drain_creations: while !deletion_creations.is_empty() {
395                        'choose_a_creation: for id in &deletion_creations {
396                            // create
397                            let id = *id;
398                            let local_file = local.find(&id)?.clone();
399                            let result = deletions.create_unvalidated(
400                                id,
401                                symkey::generate_key(),
402                                local_file.parent(),
403                                &local.name(&id, &self.keychain)?,
404                                local_file.file_type(),
405                                &self.keychain,
406                            );
407                            match result {
408                                Ok(_) => {
409                                    deletion_creations.remove(&id);
410                                    continue 'drain_creations;
411                                }
412                                Err(ref err) => match err.kind {
413                                    LbErrKind::FileParentNonexistent => {
414                                        continue 'choose_a_creation;
415                                    }
416                                    _ => {
417                                        result?;
418                                    }
419                                },
420                            }
421                        }
422                        return Err(LbErrKind::Unexpected(format!(
423                            "sync failed to find a topomodelal order for file creations: {deletion_creations:?}"
424                        ))
425                        .into());
426                    }
427
428                    // moves (creations happen first in case a file is moved into a new folder)
429                    for id in db.local_metadata.ids() {
430                        let local_file = local.find(&id)?.clone();
431                        if let Some(base_file) = db.base_metadata.maybe_find(&id).cloned() {
432                            if !local_file.explicitly_deleted()
433                                && local_file.parent() != base_file.parent()
434                                && !files_to_unmove.contains(&id)
435                            {
436                                // move
437                                deletions.move_unvalidated(
438                                    &id,
439                                    local_file.parent(),
440                                    &self.keychain,
441                                )?;
442                            }
443                        }
444                    }
445
446                    // deletions (moves happen first in case a file is moved into a deleted folder)
447                    for id in db.local_metadata.ids() {
448                        let local_file = local.find(&id)?.clone();
449                        if local_file.explicitly_deleted() {
450                            // delete
451                            deletions.delete_unvalidated(&id, &self.keychain)?;
452                        }
453                    }
454                    deletions
455                };
456
457                // process all edits, dropping non-deletion edits for files that will be implicitly deleted
458                let mut merge = {
459                    let mut merge = remote_unlazy.stage(Vec::new()).to_lazy();
460
461                    // creations and edits of created documents
462                    let mut creations = HashSet::new();
463                    for id in db.local_metadata.ids() {
464                        if deletions.maybe_find(&id).is_some()
465                            && !deletions.calculate_deleted(&id)?
466                            && remote.maybe_find(&id).is_none()
467                            && !links_to_delete.contains(&id)
468                        {
469                            creations.insert(id);
470                        }
471                    }
472                    'drain_creations: while !creations.is_empty() {
473                        'choose_a_creation: for id in &creations {
474                            // create
475                            let id = *id;
476                            let local_file = local.find(&id)?.clone();
477                            let result = merge.create_unvalidated(
478                                id,
479                                local.decrypt_key(&id, &self.keychain)?,
480                                local_file.parent(),
481                                &local.name(&id, &self.keychain)?,
482                                local_file.file_type(),
483                                &self.keychain,
484                            );
485                            match result {
486                                Ok(_) => {
487                                    creations.remove(&id);
488                                    continue 'drain_creations;
489                                }
490                                Err(ref err) => match err.kind {
491                                    LbErrKind::FileParentNonexistent => {
492                                        continue 'choose_a_creation;
493                                    }
494                                    _ => {
495                                        result?;
496                                    }
497                                },
498                            }
499                        }
500                        return Err(LbErrKind::Unexpected(format!(
501                            "sync failed to find a topomodelal order for file creations: {creations:?}"
502                        ))
503                        .into());
504                    }
505
506                    // moves, renames, edits, and shares
507                    // creations happen first in case a file is moved into a new folder
508                    for id in db.local_metadata.ids() {
509                        // skip files that are already deleted or will be deleted
510                        if deletions.maybe_find(&id).is_none()
511                            || deletions.calculate_deleted(&id)?
512                            || (remote.maybe_find(&id).is_some()
513                                && remote.calculate_deleted(&id)?)
514                        {
515                            continue;
516                        }
517
518                        let local_file = local.find(&id)?.clone();
519                        let local_name = local.name(&id, &self.keychain)?;
520                        let maybe_base_file = base.maybe_find(&id).cloned();
521                        let maybe_remote_file = remote.maybe_find(&id).cloned();
522                        if let Some(ref base_file) = maybe_base_file {
523                            let base_name = base.name(&id, &self.keychain)?;
524                            let remote_file = remote.find(&id)?.clone();
525                            let remote_name = remote.name(&id, &self.keychain)?;
526
527                            // move
528                            if local_file.parent() != base_file.parent()
529                                && remote_file.parent() == base_file.parent()
530                                && !files_to_unmove.contains(&id)
531                            {
532                                merge.move_unvalidated(&id, local_file.parent(), &self.keychain)?;
533                            }
534
535                            // rename
536                            if local_name != base_name && remote_name == base_name {
537                                merge.rename_unvalidated(&id, &local_name, &self.keychain)?;
538                            }
539                        }
540
541                        // share
542                        let mut remote_keys = HashMap::new();
543                        if let Some(ref remote_file) = maybe_remote_file {
544                            for key in remote_file.user_access_keys() {
545                                remote_keys.insert(
546                                    (Owner(key.encrypted_by), Owner(key.encrypted_for)),
547                                    (key.mode, key.deleted),
548                                );
549                            }
550                        }
551                        for key in local_file.user_access_keys() {
552                            let (by, for_) = (Owner(key.encrypted_by), Owner(key.encrypted_for));
553                            if let Some(&(remote_mode, remote_deleted)) =
554                                remote_keys.get(&(by, for_))
555                            {
556                                // upgrade share
557                                if key.mode > remote_mode || !key.deleted && remote_deleted {
558                                    let mode = match key.mode {
559                                        UserAccessMode::Read => ShareMode::Read,
560                                        UserAccessMode::Write => ShareMode::Write,
561                                        UserAccessMode::Owner => continue,
562                                    };
563                                    merge.add_share_unvalidated(id, for_, mode, &self.keychain)?;
564                                }
565                                // delete share
566                                if key.deleted && !remote_deleted {
567                                    merge.delete_share_unvalidated(
568                                        &id,
569                                        Some(for_.0),
570                                        &self.keychain,
571                                    )?;
572                                }
573                            } else {
574                                // add share
575                                let mode = match key.mode {
576                                    UserAccessMode::Read => ShareMode::Read,
577                                    UserAccessMode::Write => ShareMode::Write,
578                                    UserAccessMode::Owner => continue,
579                                };
580                                merge.add_share_unvalidated(id, for_, mode, &self.keychain)?;
581                            }
582                        }
583
584                        // share deletion due to conflicts
585                        if files_to_unshare.contains(&id) {
586                            merge.delete_share_unvalidated(&id, None, &self.keychain)?;
587                        }
588
589                        // rename due to path conflict
590                        if let Some(&rename_increment) = rename_increments.get(&id) {
591                            let name = NameComponents::from(&local_name)
592                                .generate_incremented(rename_increment)
593                                .to_name();
594                            merge.rename_unvalidated(&id, &name, &self.keychain)?;
595                        }
596
597                        // edit
598                        let base_hmac = maybe_base_file.and_then(|f| f.document_hmac().cloned());
599                        let remote_hmac =
600                            maybe_remote_file.and_then(|f| f.document_hmac().cloned());
601                        let local_hmac = local_file.document_hmac().cloned();
602                        if merge.access_mode(me, &id)? >= Some(UserAccessMode::Write)
603                            && local_hmac != base_hmac
604                        {
605                            if remote_hmac != base_hmac && remote_hmac != local_hmac {
606                                // merge
607                                let merge_name = merge.name(&id, &self.keychain)?;
608                                let document_type =
609                                    DocumentType::from_file_name_using_extension(&merge_name);
610
611                                // todo these accesses are potentially problematic
612                                // maybe not if service/docs is the persion doing network io
613                                let base_document =
614                                    self.read_document_helper(id, &mut base).await?;
615                                let remote_document =
616                                    self.read_document_helper(id, &mut remote).await?;
617                                let local_document =
618                                    self.read_document_helper(id, &mut local).await?;
619
620                                match document_type {
621                                    DocumentType::Text => {
622                                        // 3-way merge
623                                        // todo: a couple more clones than necessary
624                                        let base_document =
625                                            String::from_utf8_lossy(&base_document).to_string();
626                                        let remote_document =
627                                            String::from_utf8_lossy(&remote_document).to_string();
628                                        let local_document =
629                                            String::from_utf8_lossy(&local_document).to_string();
630                                        let merged_document = Buffer::from(base_document.as_str())
631                                            .merge(local_document, remote_document);
632                                        let encrypted_document = merge
633                                            .update_document_unvalidated(
634                                                &id,
635                                                &merged_document.into_bytes(),
636                                                &self.keychain,
637                                            )?;
638                                        let hmac = merge.find(&id)?.document_hmac().copied();
639                                        self.docs.insert(id, hmac, &encrypted_document).await?;
640                                    }
641                                    DocumentType::Drawing => {
642                                        let base_document =
643                                            String::from_utf8_lossy(&base_document).to_string();
644                                        let remote_document =
645                                            String::from_utf8_lossy(&remote_document).to_string();
646                                        let local_document =
647                                            String::from_utf8_lossy(&local_document).to_string();
648
649                                        let base_buffer = svg::buffer::Buffer::new(&base_document);
650                                        let remote_buffer =
651                                            svg::buffer::Buffer::new(&remote_document);
652                                        let mut local_buffer =
653                                            svg::buffer::Buffer::new(&local_document);
654
655                                        for (_, el) in local_buffer.elements.iter_mut() {
656                                            if let Element::Path(path) = el {
657                                                path.data.apply_transform(u_transform_to_bezier(
658                                                    &Transform::from(
659                                                        local_buffer
660                                                            .weak_viewport_settings
661                                                            .master_transform,
662                                                    ),
663                                                ));
664                                            }
665                                        }
666                                        svg::buffer::Buffer::reload(
667                                            &mut local_buffer.elements,
668                                            &mut local_buffer.weak_images,
669                                            &mut local_buffer.weak_path_pressures,
670                                            &mut local_buffer.weak_viewport_settings,
671                                            &base_buffer,
672                                            &remote_buffer,
673                                        );
674
675                                        let merged_document = local_buffer.serialize();
676                                        let encrypted_document = merge
677                                            .update_document_unvalidated(
678                                                &id,
679                                                &merged_document.into_bytes(),
680                                                &self.keychain,
681                                            )?;
682                                        let hmac = merge.find(&id)?.document_hmac().copied();
683                                        self.docs.insert(id, hmac, &encrypted_document).await?;
684                                    }
685                                    DocumentType::Other => {
686                                        // duplicate file
687                                        let merge_parent = *merge.find(&id)?.parent();
688                                        let duplicate_id = if let Some(&duplicate_id) =
689                                            duplicate_file_ids.get(&id)
690                                        {
691                                            duplicate_id
692                                        } else {
693                                            let duplicate_id = Uuid::new_v4();
694                                            duplicate_file_ids.insert(id, duplicate_id);
695                                            rename_increments.insert(duplicate_id, 1);
696                                            duplicate_id
697                                        };
698
699                                        let mut merge_name = merge_name;
700                                        merge_name = NameComponents::from(&merge_name)
701                                            .generate_incremented(
702                                                rename_increments
703                                                    .get(&duplicate_id)
704                                                    .copied()
705                                                    .unwrap_or_default(),
706                                            )
707                                            .to_name();
708
709                                        merge.create_unvalidated(
710                                            duplicate_id,
711                                            symkey::generate_key(),
712                                            &merge_parent,
713                                            &merge_name,
714                                            FileType::Document,
715                                            &self.keychain,
716                                        )?;
717                                        let encrypted_document = merge
718                                            .update_document_unvalidated(
719                                                &duplicate_id,
720                                                &local_document,
721                                                &self.keychain,
722                                            )?;
723                                        let duplicate_hmac =
724                                            merge.find(&duplicate_id)?.document_hmac().copied();
725                                        self.docs
726                                            .insert(
727                                                duplicate_id,
728                                                duplicate_hmac,
729                                                &encrypted_document,
730                                            )
731                                            .await?;
732                                    }
733                                }
734                            } else {
735                                // overwrite (todo: avoid reading/decrypting/encrypting document)
736                                let document = self.read_document_helper(id, &mut local).await?;
737                                merge.update_document_unvalidated(
738                                    &id,
739                                    &document,
740                                    &self.keychain,
741                                )?;
742                            }
743                        }
744                    }
745
746                    // deletes
747                    // moves happen first in case a file is moved into a deleted folder
748                    for id in db.local_metadata.ids() {
749                        if db.base_metadata.maybe_find(&id).is_some()
750                            && deletions.calculate_deleted(&id)?
751                            && !merge.calculate_deleted(&id)?
752                        {
753                            // delete
754                            merge.delete_unvalidated(&id, &self.keychain)?;
755                        }
756                    }
757                    for &id in &links_to_delete {
758                        // delete
759                        if merge.maybe_find(&id).is_some() && !merge.calculate_deleted(&id)? {
760                            merge.delete_unvalidated(&id, &self.keychain)?;
761                        }
762                    }
763
764                    merge
765                };
766
767                // validate; handle failures by introducing changeset constraints
768                for link in merge.ids() {
769                    if !merge.calculate_deleted(&link)? {
770                        if let FileType::Link { target } = merge.find(&link)?.file_type() {
771                            if merge.maybe_find(&target).is_some()
772                                && merge.calculate_deleted(&target)?
773                            {
774                                // delete links to deleted files
775                                if links_to_delete.insert(link) {
776                                    continue 'merge_construction;
777                                } else {
778                                    return Err(LbErrKind::Unexpected(format!(
779                                        "sync failed to resolve broken link (deletion): {link:?}"
780                                    ))
781                                    .into());
782                                }
783                            }
784                        }
785                    }
786                }
787
788                let validate_result = merge.validate(me);
789                match validate_result {
790                    // merge changeset is valid
791                    Ok(_) => {
792                        let (_, merge_changes) = merge.unstage();
793                        break merge_changes;
794                    }
795                    Err(ref err) => match err.kind {
796                        LbErrKind::Validation(ref vf) => match vf {
797                            // merge changeset has resolvable validation errors and needs modification
798                            ValidationFailure::Cycle(ids) => {
799                                // revert all local moves in the cycle
800                                let mut progress = false;
801                                for &id in ids {
802                                    if db.local_metadata.maybe_find(&id).is_some()
803                                        && files_to_unmove.insert(id)
804                                    {
805                                        progress = true;
806                                    }
807                                }
808                                if !progress {
809                                    return Err(LbErrKind::Unexpected(format!(
810                                        "sync failed to resolve cycle: {ids:?}"
811                                    ))
812                                    .into());
813                                }
814                            }
815                            ValidationFailure::PathConflict(ids) => {
816                                // pick one local id and generate a non-conflicting filename
817                                let mut progress = false;
818                                for &id in ids {
819                                    if duplicate_file_ids.values().any(|&dup| dup == id) {
820                                        *rename_increments.entry(id).or_insert(0) += 1;
821                                        progress = true;
822                                        break;
823                                    }
824                                }
825                                if !progress {
826                                    for &id in ids {
827                                        if db.local_metadata.maybe_find(&id).is_some() {
828                                            *rename_increments.entry(id).or_insert(0) += 1;
829                                            progress = true;
830                                            break;
831                                        }
832                                    }
833                                }
834                                if !progress {
835                                    return Err(LbErrKind::Unexpected(format!(
836                                        "sync failed to resolve path conflict: {ids:?}"
837                                    ))
838                                    .into());
839                                }
840                            }
841                            ValidationFailure::SharedLink { link, shared_ancestor } => {
842                                // if ancestor is newly shared, delete share, otherwise delete link
843                                let mut progress = false;
844                                if let Some(base_shared_ancestor) = base.maybe_find(shared_ancestor)
845                                {
846                                    if !base_shared_ancestor.is_shared()
847                                        && files_to_unshare.insert(*shared_ancestor)
848                                    {
849                                        progress = true;
850                                    }
851                                }
852                                if !progress && links_to_delete.insert(*link) {
853                                    progress = true;
854                                }
855                                if !progress {
856                                    return Err(LbErrKind::Unexpected(format!(
857                                    "sync failed to resolve shared link: link: {link:?}, shared_ancestor: {shared_ancestor:?}"
858                                )).into());
859                                }
860                            }
861                            ValidationFailure::DuplicateLink { target } => {
862                                // delete local link with this target
863                                let mut progress = false;
864                                if let Some(link) = local.linked_by(target)? {
865                                    if links_to_delete.insert(link) {
866                                        progress = true;
867                                    }
868                                }
869                                if !progress {
870                                    return Err(LbErrKind::Unexpected(format!(
871                                        "sync failed to resolve duplicate link: target: {target:?}"
872                                    ))
873                                    .into());
874                                }
875                            }
876                            ValidationFailure::BrokenLink(link) => {
877                                // delete local link with this target
878                                if !links_to_delete.insert(*link) {
879                                    return Err(LbErrKind::Unexpected(format!(
880                                        "sync failed to resolve broken link: {link:?}"
881                                    ))
882                                    .into());
883                                }
884                            }
885                            ValidationFailure::OwnedLink(link) => {
886                                // if target is newly owned, unmove target, otherwise delete link
887                                let mut progress = false;
888                                if let Some(remote_link) = remote.maybe_find(link) {
889                                    if let FileType::Link { target } = remote_link.file_type() {
890                                        let remote_target = remote.find(&target)?;
891                                        if remote_target.owner() != me
892                                            && files_to_unmove.insert(target)
893                                        {
894                                            progress = true;
895                                        }
896                                    }
897                                }
898                                if !progress && links_to_delete.insert(*link) {
899                                    progress = true;
900                                }
901                                if !progress {
902                                    return Err(LbErrKind::Unexpected(format!(
903                                        "sync failed to resolve owned link: {link:?}"
904                                    ))
905                                    .into());
906                                }
907                            }
908                            // merge changeset has unexpected validation errors
909                            ValidationFailure::Orphan(_)
910                            | ValidationFailure::NonFolderWithChildren(_)
911                            | ValidationFailure::FileWithDifferentOwnerParent(_)
912                            | ValidationFailure::FileNameTooLong(_)
913                            | ValidationFailure::DeletedFileUpdated(_)
914                            | ValidationFailure::NonDecryptableFileName(_) => {
915                                validate_result?;
916                            }
917                        },
918                        // merge changeset has unexpected errors
919                        _ => {
920                            validate_result?;
921                        }
922                    },
923                }
924            }
925        };
926
927        // base = remote; local = merge
928        (&mut db.base_metadata)
929            .to_staged(remote_changes.clone())
930            .to_lazy()
931            .promote()?;
932        db.local_metadata.clear()?;
933        (&mut db.local_metadata)
934            .to_staged(merge_changes)
935            .to_lazy()
936            .promote()?;
937
938        // todo who else calls this did they manage locks right?
939        // self.cleanup_local_metadata()?;
940        db.base_metadata.stage(&mut db.local_metadata).prune()?;
941
942        if start.elapsed() > std::time::Duration::from_millis(100) {
943            warn!("sync merge held lock for {:?}", start.elapsed());
944        }
945
946        Ok(())
947    }
948
949    /// Updates remote and base metadata to local.
950    async fn push_meta(&self, ctx: &mut SyncContext) -> LbResult<()> {
951        ctx.msg("Pushing tree changes...");
952        let mut updates = vec![];
953        let mut local_changes_no_digests = Vec::new();
954
955        let tx = self.ro_tx().await;
956        let db = tx.db();
957
958        // remote = local
959        let local = db.base_metadata.stage(&db.local_metadata).to_lazy();
960
961        for id in local.tree.staged.ids() {
962            let mut local_change = local.tree.staged.find(&id)?.timestamped_value.value.clone();
963            let maybe_base_file = local.tree.base.maybe_find(&id);
964
965            // change everything but document hmac and re-sign
966            local_change.set_hmac_and_size(
967                maybe_base_file.and_then(|f| f.document_hmac().copied()),
968                maybe_base_file.and_then(|f| *f.timestamped_value.value.doc_size()),
969            );
970            let local_change = local_change.sign(&self.keychain)?;
971
972            local_changes_no_digests.push(local_change.clone());
973            let file_diff = FileDiff { old: maybe_base_file.cloned(), new: local_change };
974            updates.push(file_diff);
975        }
976
977        drop(tx);
978
979        if !updates.is_empty() {
980            self.client
981                .request(self.get_account()?, UpsertRequestV2 { updates: updates.clone() })
982                .await?;
983            ctx.pushed_metas = updates;
984        }
985
986        let mut tx = self.begin_tx().await;
987        let db = tx.db();
988
989        // base = local
990        (&mut db.base_metadata)
991            .to_lazy()
992            .stage(local_changes_no_digests)
993            .promote()?;
994        db.base_metadata.stage(&mut db.local_metadata).prune()?;
995
996        tx.end();
997
998        Ok(())
999    }
1000
1001    /// Updates remote and base files to local. Assumes metadata is already pushed for all new files.
1002    async fn push_docs(&self, ctx: &mut SyncContext) -> LbResult<()> {
1003        ctx.msg("Pushing document changes...");
1004        let mut updates = vec![];
1005        let mut local_changes_digests_only = vec![];
1006
1007        let tx = self.ro_tx().await;
1008        let db = tx.db();
1009        let start = Instant::now();
1010
1011        let local = db.base_metadata.stage(&db.local_metadata).to_lazy();
1012
1013        for id in local.tree.staged.ids() {
1014            let base_file = local.tree.base.find(&id)?.clone();
1015
1016            // change only document hmac and re-sign
1017            let mut local_change = base_file.timestamped_value.value.clone();
1018            local_change.set_hmac_and_size(
1019                local.find(&id)?.document_hmac().copied(),
1020                *local.find(&id)?.timestamped_value.value.doc_size(),
1021            );
1022
1023            if base_file.document_hmac() == local_change.document_hmac()
1024                || local_change.document_hmac().is_none()
1025            {
1026                continue;
1027            }
1028
1029            let local_change = local_change.sign(&self.keychain)?;
1030
1031            updates.push(FileDiff { old: Some(base_file), new: local_change.clone() });
1032            local_changes_digests_only.push(local_change);
1033            self.events.sync(SyncIncrement::PushingDocument(id, true));
1034        }
1035
1036        drop(tx);
1037        if start.elapsed() > std::time::Duration::from_millis(100) {
1038            warn!("sync push_docs held lock for {:?}", start.elapsed());
1039        }
1040
1041        let docs_count = updates.len();
1042        ctx.total += docs_count;
1043        let futures = updates.clone().into_iter().map(|diff| self.push_doc(diff));
1044
1045        let mut stream = stream::iter(futures).buffer_unordered(
1046            thread::available_parallelism()
1047                .unwrap_or(NonZeroUsize::new(4).unwrap())
1048                .into(),
1049        );
1050
1051        let mut idx = 0;
1052        while let Some(fut) = stream.next().await {
1053            let id = fut?;
1054            self.events.sync(SyncIncrement::PushingDocument(id, false));
1055            ctx.file_msg(id, &format!("Pushed file {idx} of {docs_count}."));
1056            idx += 1;
1057        }
1058        ctx.pushed_docs = updates;
1059
1060        let mut tx = self.begin_tx().await;
1061        let db = tx.db();
1062        // base = local (metadata)
1063        (&mut db.base_metadata)
1064            .to_lazy()
1065            .stage(local_changes_digests_only)
1066            .promote()?;
1067
1068        db.base_metadata.stage(&mut db.local_metadata).prune()?;
1069
1070        tx.end();
1071
1072        Ok(())
1073    }
1074
1075    async fn push_doc(&self, diff: FileDiff<SignedMeta>) -> LbResult<Uuid> {
1076        let id = *diff.new.id();
1077        let hmac = diff.new.document_hmac();
1078        let local_document_change = self.docs.get(id, hmac.copied()).await?;
1079        self.client
1080            .request(
1081                self.get_account()?,
1082                ChangeDocRequestV2 { diff, new_content: local_document_change },
1083            )
1084            .await?;
1085
1086        Ok(id)
1087    }
1088
1089    async fn dedup(
1090        &self, updates: GetUpdatesResponseV2,
1091    ) -> LbResult<(Vec<SignedMeta>, u64, Option<Uuid>)> {
1092        let tx = self.ro_tx().await;
1093        let db = tx.db();
1094
1095        let mut root_id = None;
1096        let (remote_changes, update_as_of) = {
1097            let mut remote_changes = updates.file_metadata;
1098            let update_as_of = updates.as_of_metadata_version;
1099
1100            remote_changes = self.prune_remote_orphans(remote_changes).await?;
1101
1102            let remote = db.base_metadata.stage(remote_changes).pruned()?.to_lazy();
1103
1104            let (_, remote_changes) = remote.unstage();
1105            (remote_changes, update_as_of)
1106        };
1107
1108        // initialize root if this is the first pull on this device
1109        if db.root.get().is_none() {
1110            let root = remote_changes
1111                .all_files()?
1112                .into_iter()
1113                .find(|f| f.is_root())
1114                .ok_or(LbErrKind::RootNonexistent)?;
1115            root_id = Some(*root.id());
1116        }
1117
1118        Ok((remote_changes, update_as_of, root_id))
1119    }
1120
1121    async fn prune_remote_orphans(
1122        &self, remote_changes: Vec<SignedMeta>,
1123    ) -> LbResult<Vec<SignedMeta>> {
1124        let tx = self.ro_tx().await;
1125        let db = tx.db();
1126
1127        let me = Owner(self.keychain.get_pk()?);
1128        let remote = db.base_metadata.stage(remote_changes).to_lazy();
1129        let mut result = Vec::new();
1130
1131        for id in remote.tree.staged.ids() {
1132            let meta = remote.find(&id)?;
1133            if remote.maybe_find_parent(meta).is_some()
1134                || meta
1135                    .user_access_keys()
1136                    .iter()
1137                    .any(|k| k.encrypted_for == me.0)
1138            {
1139                result.push(remote.find(&id)?.clone()); // todo: don't clone
1140            }
1141        }
1142        Ok(result)
1143    }
1144
1145    async fn commit_last_synced(&self, ctx: &mut SyncContext) -> LbResult<()> {
1146        ctx.msg("Cleaning up...");
1147        let mut tx = self.begin_tx().await;
1148        let db = tx.db();
1149        db.last_synced.insert(ctx.update_as_of as i64)?;
1150
1151        if let Some(root) = ctx.new_root {
1152            db.root.insert(root)?;
1153        }
1154
1155        Ok(())
1156    }
1157
1158    pub async fn get_last_synced_human(&self) -> LbResult<String> {
1159        let tx = self.ro_tx().await;
1160        let db = tx.db();
1161        let last_synced = db.last_synced.get().copied().unwrap_or(0);
1162
1163        Ok(self.get_timestamp_human_string(last_synced))
1164    }
1165
1166    pub fn get_timestamp_human_string(&self, timestamp: i64) -> String {
1167        if timestamp != 0 {
1168            Duration::milliseconds(clock::get_time().0 - timestamp)
1169                .format_human()
1170                .to_string()
1171        } else {
1172            "never".to_string()
1173        }
1174    }
1175}
1176
1177impl SyncContext {
1178    fn summarize(&self) -> SyncStatus {
1179        let mut local = HashSet::new();
1180        let mut server = HashSet::new();
1181        let mut work_units = vec![];
1182
1183        for meta in &self.pushed_metas {
1184            local.insert(meta.new.id());
1185        }
1186
1187        for meta in &self.pushed_docs {
1188            local.insert(meta.new.id());
1189        }
1190
1191        for meta in &self.remote_changes {
1192            server.insert(meta.id());
1193        }
1194
1195        for id in local {
1196            work_units.push(WorkUnit::LocalChange(*id));
1197        }
1198
1199        for id in server {
1200            work_units.push(WorkUnit::ServerChange(*id));
1201        }
1202
1203        SyncStatus { work_units, latest_server_ts: self.update_as_of }
1204    }
1205
1206    fn msg(&mut self, msg: &str) {
1207        self.current += 1;
1208        if let Some(f) = &self.progress {
1209            f(SyncProgress {
1210                total: self.total,
1211                progress: self.current,
1212                file_being_processed: Default::default(),
1213                msg: msg.to_string(),
1214            })
1215        }
1216    }
1217
1218    fn file_msg(&mut self, id: Uuid, msg: &str) {
1219        self.current += 1;
1220        if let Some(f) = &self.progress {
1221            f(SyncProgress {
1222                total: self.total,
1223                progress: self.current,
1224                file_being_processed: Some(id),
1225                msg: msg.to_string(),
1226            })
1227        }
1228    }
1229
1230    fn done_msg(&mut self) {
1231        self.current = self.total;
1232        if let Some(f) = &self.progress {
1233            f(SyncProgress {
1234                total: self.total,
1235                progress: self.current,
1236                file_being_processed: None,
1237                msg: "Sync successful!".to_string(),
1238            })
1239        }
1240    }
1241}
1242
1243#[derive(Debug, Serialize, Clone)]
1244pub struct SyncStatus {
1245    pub work_units: Vec<WorkUnit>,
1246    pub latest_server_ts: u64,
1247}
1248
1249#[derive(Clone)]
1250pub struct SyncProgress {
1251    pub total: usize,
1252    pub progress: usize,
1253    pub file_being_processed: Option<Uuid>,
1254    pub msg: String,
1255}
1256
1257impl Display for SyncProgress {
1258    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1259        write!(f, "[{} / {}]: {}", self.progress, self.total, self.msg)
1260    }
1261}
1262
1263#[derive(Debug, Clone)]
1264pub enum SyncIncrement {
1265    SyncStarted,
1266    PullingDocument(Uuid, bool),
1267    PushingDocument(Uuid, bool),
1268    SyncFinished(Option<LbErrKind>),
1269}