lb_rs/service/
sync.rs

1use crate::Lb;
2use crate::io::network::ApiError;
3use crate::model::access_info::UserAccessMode;
4use crate::model::api::{
5    ChangeDocRequest, GetDocRequest, GetFileIdsRequest, GetUpdatesRequest, GetUpdatesResponse,
6    GetUsernameError, GetUsernameRequest, UpsertRequest,
7};
8use crate::model::errors::{LbErrKind, LbResult};
9use crate::model::file::ShareMode;
10use crate::model::file_like::FileLike;
11use crate::model::file_metadata::{DocumentHmac, FileDiff, FileType, Owner};
12use crate::model::filename::{DocumentType, NameComponents};
13use crate::model::signed_file::SignedFile;
14use crate::model::staged::StagedTreeLikeMut;
15use crate::model::svg::buffer::u_transform_to_bezier;
16use crate::model::svg::element::Element;
17use crate::model::text::buffer::Buffer;
18use crate::model::tree_like::TreeLike;
19use crate::model::work_unit::WorkUnit;
20use crate::model::{ValidationFailure, clock, svg, symkey};
21pub use basic_human_duration::ChronoHumanDuration;
22use futures::{StreamExt, stream};
23use serde::Serialize;
24use std::collections::{HashMap, HashSet, hash_map};
25use std::fmt::{Display, Formatter};
26use std::num::NonZeroUsize;
27use std::sync::Arc;
28use std::sync::atomic::{AtomicBool, Ordering};
29use std::thread;
30use std::time::Instant;
31use time::Duration;
32use usvg::Transform;
33use uuid::Uuid;
34
35use super::events::Actor;
36
37pub type SyncFlag = Arc<AtomicBool>;
38
39pub struct SyncContext {
40    progress: Option<Box<dyn Fn(SyncProgress) + Send>>,
41    current: usize,
42    total: usize,
43
44    pk_cache: HashMap<Owner, String>,
45    last_synced: u64,
46    remote_changes: Vec<SignedFile>,
47    update_as_of: u64,
48    root: Option<Uuid>,
49    pushed_metas: Vec<FileDiff<SignedFile>>,
50    pushed_docs: Vec<FileDiff<SignedFile>>,
51    pulled_docs: Vec<Uuid>,
52}
53
54impl Lb {
55    #[instrument(level = "debug", skip_all, err(Debug))]
56    pub async fn calculate_work(&self) -> LbResult<SyncStatus> {
57        let tx = self.ro_tx().await;
58        let db = tx.db();
59        let last_synced = db.last_synced.get().copied().unwrap_or_default() as u64;
60        drop(tx);
61
62        let remote_changes = self
63            .client
64            .request(self.get_account()?, GetUpdatesRequest { since_metadata_version: last_synced })
65            .await?;
66        let (deduped, latest_server_ts, _) = self.dedup(remote_changes).await?;
67        let remote_dirty = deduped
68            .into_iter()
69            .map(|f| *f.id())
70            .map(WorkUnit::ServerChange);
71
72        self.prune().await?;
73
74        let tx = self.ro_tx().await;
75        let db = tx.db();
76
77        let locally_dirty = db
78            .local_metadata
79            .get()
80            .keys()
81            .copied()
82            .map(WorkUnit::LocalChange);
83
84        let mut work_units: Vec<WorkUnit> = Vec::new();
85        work_units.extend(locally_dirty.chain(remote_dirty));
86        Ok(SyncStatus { work_units, latest_server_ts })
87    }
88
89    #[instrument(level = "debug", skip_all, err(Debug))]
90    pub async fn sync(&self, f: Option<Box<dyn Fn(SyncProgress) + Send>>) -> LbResult<SyncStatus> {
91        let old = self.syncing.swap(true, Ordering::SeqCst);
92        if old {
93            return Err(LbErrKind::AlreadySyncing.into());
94        }
95
96        let mut ctx = self.setup_sync(f).await?;
97
98        let mut got_updates = false;
99        let mut pipeline: LbResult<()> = async {
100            ctx.msg("Preparing Sync..."); // todo remove
101            self.events.sync(SyncIncrement::SyncStarted);
102            self.prune().await?;
103            got_updates = self.fetch_meta(&mut ctx).await?;
104            self.populate_pk_cache(&mut ctx).await?;
105            self.docs.dont_delete.store(true, Ordering::SeqCst);
106            self.fetch_docs(&mut ctx).await?;
107            self.merge(&mut ctx).await?;
108            self.push_meta(&mut ctx).await?;
109            self.push_docs(&mut ctx).await?;
110            Ok(())
111        }
112        .await;
113
114        self.docs.dont_delete.store(false, Ordering::SeqCst);
115
116        if pipeline.is_ok() {
117            pipeline = self.commit_last_synced(&mut ctx).await;
118        }
119
120        let cleanup = self.cleanup().await;
121
122        let ekind = pipeline.as_ref().err().map(|err| err.kind.clone());
123        self.events.sync(SyncIncrement::SyncFinished(ekind));
124
125        self.syncing.store(false, Ordering::Relaxed);
126        pipeline?;
127        cleanup?;
128
129        // done not being sent if pipeline is an error is likely the reason we get stuck offline
130        ctx.done_msg();
131
132        if got_updates {
133            // did it?
134            self.events.meta_changed();
135            for id in &ctx.pulled_docs {
136                self.events.doc_written(*id, Some(Actor::Sync));
137            }
138        }
139
140        Ok(ctx.summarize())
141    }
142
143    async fn setup_sync(
144        &self, progress: Option<Box<dyn Fn(SyncProgress) + Send>>,
145    ) -> LbResult<SyncContext> {
146        let tx = self.ro_tx().await;
147        let db = tx.db();
148
149        let last_synced = db.last_synced.get().copied().unwrap_or_default() as u64;
150        let pk_cache = db.pub_key_lookup.get().clone();
151
152        let current = 0;
153        let total = 7;
154
155        Ok(SyncContext {
156            last_synced,
157            pk_cache,
158
159            progress,
160            current,
161            total,
162
163            root: Default::default(),
164            update_as_of: Default::default(),
165            remote_changes: Default::default(),
166            pushed_docs: Default::default(),
167            pushed_metas: Default::default(),
168            pulled_docs: Default::default(),
169        })
170    }
171
172    async fn prune(&self) -> LbResult<()> {
173        let server_ids = self
174            .client
175            .request(self.get_account()?, GetFileIdsRequest {})
176            .await?
177            .ids;
178
179        let mut tx = self.begin_tx().await;
180        let db = tx.db();
181
182        let mut local = db.base_metadata.stage(&db.local_metadata).to_lazy();
183        let base_ids = local.tree.base.ids();
184
185        let mut prunable_ids = base_ids;
186        prunable_ids.retain(|id| !server_ids.contains(id));
187        for id in prunable_ids.clone() {
188            prunable_ids.extend(local.descendants(&id)?.into_iter());
189        }
190        for id in &prunable_ids {
191            if let Some(base_file) = local.tree.base.maybe_find(id) {
192                self.docs
193                    .delete(*id, base_file.document_hmac().copied())
194                    .await?;
195            }
196            if let Some(local_file) = local.maybe_find(id) {
197                self.docs
198                    .delete(*id, local_file.document_hmac().copied())
199                    .await?;
200            }
201        }
202
203        let mut base_staged = (&mut db.base_metadata).to_lazy().stage(None);
204        base_staged.tree.removed = prunable_ids.clone().into_iter().collect();
205        base_staged.promote()?;
206
207        let mut local_staged = (&mut db.local_metadata).to_lazy().stage(None);
208        local_staged.tree.removed = prunable_ids.into_iter().collect();
209        local_staged.promote()?;
210
211        Ok(())
212    }
213
214    /// Returns true if there were any updates
215    async fn fetch_meta(&self, ctx: &mut SyncContext) -> LbResult<bool> {
216        ctx.msg("Fetching tree updates...");
217        let updates = self
218            .client
219            .request(
220                self.get_account()?,
221                GetUpdatesRequest { since_metadata_version: ctx.last_synced },
222            )
223            .await?;
224
225        let empty = updates.file_metadata.is_empty();
226        let (remote, as_of, root) = self.dedup(updates).await?;
227
228        ctx.remote_changes = remote;
229        ctx.update_as_of = as_of;
230        ctx.root = root;
231
232        Ok(!empty)
233    }
234
235    async fn populate_pk_cache(&self, ctx: &mut SyncContext) -> LbResult<()> {
236        ctx.msg("Updating public key cache...");
237        let mut all_owners = HashSet::new();
238        for file in &ctx.remote_changes {
239            for user_access_key in file.user_access_keys() {
240                all_owners.insert(Owner(user_access_key.encrypted_by));
241                all_owners.insert(Owner(user_access_key.encrypted_for));
242            }
243        }
244
245        let mut new_entries = HashMap::new();
246
247        for owner in all_owners {
248            if let hash_map::Entry::Vacant(e) = ctx.pk_cache.entry(owner) {
249                let username_result = self
250                    .client
251                    .request(self.get_account()?, GetUsernameRequest { key: owner.0 })
252                    .await;
253                let username = match username_result {
254                    Err(ApiError::Endpoint(GetUsernameError::UserNotFound)) => {
255                        "<unknown>".to_string()
256                    }
257                    _ => username_result?.username.clone(),
258                };
259                new_entries.insert(owner, username.clone());
260                e.insert(username.clone());
261            }
262        }
263
264        let mut tx = self.begin_tx().await;
265        let db = tx.db();
266
267        for (owner, username) in new_entries {
268            db.pub_key_lookup.insert(owner, username)?;
269        }
270        Ok(())
271    }
272
273    async fn fetch_docs(&self, ctx: &mut SyncContext) -> LbResult<()> {
274        ctx.msg("Fetching documents...");
275        let mut docs_to_pull = vec![];
276
277        let tx = self.ro_tx().await;
278        let db = tx.db();
279        let start = Instant::now();
280
281        let mut remote = db.base_metadata.stage(ctx.remote_changes.clone()).to_lazy(); // this used to be owned remote changes
282        for id in remote.tree.staged.ids() {
283            if remote.calculate_deleted(&id)? {
284                continue;
285            }
286            let remote_hmac = remote.find(&id)?.document_hmac().cloned();
287            let base_hmac = remote
288                .tree
289                .base
290                .maybe_find(&id)
291                .and_then(|f| f.document_hmac())
292                .cloned();
293            if base_hmac == remote_hmac {
294                continue;
295            }
296
297            if let Some(remote_hmac) = remote_hmac {
298                docs_to_pull.push((id, remote_hmac));
299                self.events.sync(SyncIncrement::PullingDocument(id, true));
300            }
301        }
302
303        drop(tx);
304        if start.elapsed() > std::time::Duration::from_millis(100) {
305            warn!("sync fetch_docs held lock for {:?}", start.elapsed());
306        }
307
308        let num_docs = docs_to_pull.len();
309        ctx.total += num_docs;
310
311        let futures = docs_to_pull
312            .into_iter()
313            .map(|(id, hmac)| self.fetch_doc(id, hmac));
314
315        let mut stream = stream::iter(futures).buffer_unordered(
316            thread::available_parallelism()
317                .unwrap_or(NonZeroUsize::new(4).unwrap())
318                .into(),
319        );
320
321        let mut idx = 0;
322        while let Some(fut) = stream.next().await {
323            let id = fut?;
324            ctx.pulled_docs.push(id);
325            self.events.sync(SyncIncrement::PullingDocument(id, false));
326            ctx.file_msg(id, &format!("Downloaded file {idx} of {num_docs}."));
327            idx += 1;
328        }
329        Ok(())
330    }
331
332    async fn fetch_doc(&self, id: Uuid, hmac: DocumentHmac) -> LbResult<Uuid> {
333        let remote_document = self
334            .client
335            .request(self.get_account()?, GetDocRequest { id, hmac })
336            .await?;
337        self.docs
338            .insert(id, Some(hmac), &remote_document.content)
339            .await?;
340
341        Ok(id)
342    }
343
344    /// Pulls remote changes and constructs a changeset Merge such that Stage<Stage<Stage<Base, Remote>, Local>, Merge> is valid.
345    /// Promotes Base to Stage<Base, Remote> and Local to Stage<Local, Merge>
346    async fn merge(&self, ctx: &mut SyncContext) -> LbResult<()> {
347        let mut tx = self.begin_tx().await;
348        let db = tx.db();
349        let start = Instant::now();
350
351        let remote_changes = &ctx.remote_changes;
352
353        // fetch document updates and local documents for merge
354        let me = Owner(self.keychain.get_pk()?);
355
356        // compute merge changes
357        let merge_changes = {
358            // assemble trees
359            let mut base = (&db.base_metadata).to_lazy();
360            let remote_unlazy = (&db.base_metadata).to_staged(remote_changes);
361            let mut remote = remote_unlazy.as_lazy();
362            let mut local = (&db.base_metadata).to_staged(&db.local_metadata).to_lazy();
363
364            // changeset constraints - these evolve as we try to assemble changes and encounter validation failures
365            let mut files_to_unmove: HashSet<Uuid> = HashSet::new();
366            let mut files_to_unshare: HashSet<Uuid> = HashSet::new();
367            let mut links_to_delete: HashSet<Uuid> = HashSet::new();
368            let mut rename_increments: HashMap<Uuid, usize> = HashMap::new();
369            let mut duplicate_file_ids: HashMap<Uuid, Uuid> = HashMap::new();
370
371            'merge_construction: loop {
372                // process just the edits which allow us to check deletions in the result
373                let mut deletions = {
374                    let mut deletions = remote_unlazy.stage(Vec::new()).to_lazy();
375
376                    // creations
377                    let mut deletion_creations = HashSet::new();
378                    for id in db.local_metadata.ids() {
379                        if remote.maybe_find(&id).is_none() && !links_to_delete.contains(&id) {
380                            deletion_creations.insert(id);
381                        }
382                    }
383                    'drain_creations: while !deletion_creations.is_empty() {
384                        'choose_a_creation: for id in &deletion_creations {
385                            // create
386                            let id = *id;
387                            let local_file = local.find(&id)?.clone();
388                            let result = deletions.create_unvalidated(
389                                id,
390                                symkey::generate_key(),
391                                local_file.parent(),
392                                &local.name(&id, &self.keychain)?,
393                                local_file.file_type(),
394                                &self.keychain,
395                            );
396                            match result {
397                                Ok(_) => {
398                                    deletion_creations.remove(&id);
399                                    continue 'drain_creations;
400                                }
401                                Err(ref err) => match err.kind {
402                                    LbErrKind::FileParentNonexistent => {
403                                        continue 'choose_a_creation;
404                                    }
405                                    _ => {
406                                        result?;
407                                    }
408                                },
409                            }
410                        }
411                        return Err(LbErrKind::Unexpected(format!(
412                            "sync failed to find a topomodelal order for file creations: {deletion_creations:?}"
413                        ))
414                        .into());
415                    }
416
417                    // moves (creations happen first in case a file is moved into a new folder)
418                    for id in db.local_metadata.ids() {
419                        let local_file = local.find(&id)?.clone();
420                        if let Some(base_file) = db.base_metadata.maybe_find(&id).cloned() {
421                            if !local_file.explicitly_deleted()
422                                && local_file.parent() != base_file.parent()
423                                && !files_to_unmove.contains(&id)
424                            {
425                                // move
426                                deletions.move_unvalidated(
427                                    &id,
428                                    local_file.parent(),
429                                    &self.keychain,
430                                )?;
431                            }
432                        }
433                    }
434
435                    // deletions (moves happen first in case a file is moved into a deleted folder)
436                    for id in db.local_metadata.ids() {
437                        let local_file = local.find(&id)?.clone();
438                        if local_file.explicitly_deleted() {
439                            // delete
440                            deletions.delete_unvalidated(&id, &self.keychain)?;
441                        }
442                    }
443                    deletions
444                };
445
446                // process all edits, dropping non-deletion edits for files that will be implicitly deleted
447                let mut merge = {
448                    let mut merge = remote_unlazy.stage(Vec::new()).to_lazy();
449
450                    // creations and edits of created documents
451                    let mut creations = HashSet::new();
452                    for id in db.local_metadata.ids() {
453                        if deletions.maybe_find(&id).is_some()
454                            && !deletions.calculate_deleted(&id)?
455                            && remote.maybe_find(&id).is_none()
456                            && !links_to_delete.contains(&id)
457                        {
458                            creations.insert(id);
459                        }
460                    }
461                    'drain_creations: while !creations.is_empty() {
462                        'choose_a_creation: for id in &creations {
463                            // create
464                            let id = *id;
465                            let local_file = local.find(&id)?.clone();
466                            let result = merge.create_unvalidated(
467                                id,
468                                local.decrypt_key(&id, &self.keychain)?,
469                                local_file.parent(),
470                                &local.name(&id, &self.keychain)?,
471                                local_file.file_type(),
472                                &self.keychain,
473                            );
474                            match result {
475                                Ok(_) => {
476                                    creations.remove(&id);
477                                    continue 'drain_creations;
478                                }
479                                Err(ref err) => match err.kind {
480                                    LbErrKind::FileParentNonexistent => {
481                                        continue 'choose_a_creation;
482                                    }
483                                    _ => {
484                                        result?;
485                                    }
486                                },
487                            }
488                        }
489                        return Err(LbErrKind::Unexpected(format!(
490                            "sync failed to find a topomodelal order for file creations: {creations:?}"
491                        ))
492                        .into());
493                    }
494
495                    // moves, renames, edits, and shares
496                    // creations happen first in case a file is moved into a new folder
497                    for id in db.local_metadata.ids() {
498                        // skip files that are already deleted or will be deleted
499                        if deletions.maybe_find(&id).is_none()
500                            || deletions.calculate_deleted(&id)?
501                            || (remote.maybe_find(&id).is_some()
502                                && remote.calculate_deleted(&id)?)
503                        {
504                            continue;
505                        }
506
507                        let local_file = local.find(&id)?.clone();
508                        let local_name = local.name(&id, &self.keychain)?;
509                        let maybe_base_file = base.maybe_find(&id).cloned();
510                        let maybe_remote_file = remote.maybe_find(&id).cloned();
511                        if let Some(ref base_file) = maybe_base_file {
512                            let base_name = base.name(&id, &self.keychain)?;
513                            let remote_file = remote.find(&id)?.clone();
514                            let remote_name = remote.name(&id, &self.keychain)?;
515
516                            // move
517                            if local_file.parent() != base_file.parent()
518                                && remote_file.parent() == base_file.parent()
519                                && !files_to_unmove.contains(&id)
520                            {
521                                merge.move_unvalidated(&id, local_file.parent(), &self.keychain)?;
522                            }
523
524                            // rename
525                            if local_name != base_name && remote_name == base_name {
526                                merge.rename_unvalidated(&id, &local_name, &self.keychain)?;
527                            }
528                        }
529
530                        // share
531                        let mut remote_keys = HashMap::new();
532                        if let Some(ref remote_file) = maybe_remote_file {
533                            for key in remote_file.user_access_keys() {
534                                remote_keys.insert(
535                                    (Owner(key.encrypted_by), Owner(key.encrypted_for)),
536                                    (key.mode, key.deleted),
537                                );
538                            }
539                        }
540                        for key in local_file.user_access_keys() {
541                            let (by, for_) = (Owner(key.encrypted_by), Owner(key.encrypted_for));
542                            if let Some(&(remote_mode, remote_deleted)) =
543                                remote_keys.get(&(by, for_))
544                            {
545                                // upgrade share
546                                if key.mode > remote_mode || !key.deleted && remote_deleted {
547                                    let mode = match key.mode {
548                                        UserAccessMode::Read => ShareMode::Read,
549                                        UserAccessMode::Write => ShareMode::Write,
550                                        UserAccessMode::Owner => continue,
551                                    };
552                                    merge.add_share_unvalidated(id, for_, mode, &self.keychain)?;
553                                }
554                                // delete share
555                                if key.deleted && !remote_deleted {
556                                    merge.delete_share_unvalidated(
557                                        &id,
558                                        Some(for_.0),
559                                        &self.keychain,
560                                    )?;
561                                }
562                            } else {
563                                // add share
564                                let mode = match key.mode {
565                                    UserAccessMode::Read => ShareMode::Read,
566                                    UserAccessMode::Write => ShareMode::Write,
567                                    UserAccessMode::Owner => continue,
568                                };
569                                merge.add_share_unvalidated(id, for_, mode, &self.keychain)?;
570                            }
571                        }
572
573                        // share deletion due to conflicts
574                        if files_to_unshare.contains(&id) {
575                            merge.delete_share_unvalidated(&id, None, &self.keychain)?;
576                        }
577
578                        // rename due to path conflict
579                        if let Some(&rename_increment) = rename_increments.get(&id) {
580                            let name = NameComponents::from(&local_name)
581                                .generate_incremented(rename_increment)
582                                .to_name();
583                            merge.rename_unvalidated(&id, &name, &self.keychain)?;
584                        }
585
586                        // edit
587                        let base_hmac = maybe_base_file.and_then(|f| f.document_hmac().cloned());
588                        let remote_hmac =
589                            maybe_remote_file.and_then(|f| f.document_hmac().cloned());
590                        let local_hmac = local_file.document_hmac().cloned();
591                        if merge.access_mode(me, &id)? >= Some(UserAccessMode::Write)
592                            && local_hmac != base_hmac
593                        {
594                            if remote_hmac != base_hmac && remote_hmac != local_hmac {
595                                // merge
596                                let merge_name = merge.name(&id, &self.keychain)?;
597                                let document_type =
598                                    DocumentType::from_file_name_using_extension(&merge_name);
599
600                                // todo these accesses are potentially problematic
601                                // maybe not if service/docs is the persion doing network io
602                                let base_document =
603                                    self.read_document_helper(id, &mut base).await?;
604                                let remote_document =
605                                    self.read_document_helper(id, &mut remote).await?;
606                                let local_document =
607                                    self.read_document_helper(id, &mut local).await?;
608
609                                match document_type {
610                                    DocumentType::Text => {
611                                        // 3-way merge
612                                        // todo: a couple more clones than necessary
613                                        let base_document =
614                                            String::from_utf8_lossy(&base_document).to_string();
615                                        let remote_document =
616                                            String::from_utf8_lossy(&remote_document).to_string();
617                                        let local_document =
618                                            String::from_utf8_lossy(&local_document).to_string();
619                                        let merged_document = Buffer::from(base_document.as_str())
620                                            .merge(local_document, remote_document);
621                                        let encrypted_document = merge
622                                            .update_document_unvalidated(
623                                                &id,
624                                                &merged_document.into_bytes(),
625                                                &self.keychain,
626                                            )?;
627                                        let hmac = merge.find(&id)?.document_hmac().copied();
628                                        self.docs.insert(id, hmac, &encrypted_document).await?;
629                                    }
630                                    DocumentType::Drawing => {
631                                        let base_document =
632                                            String::from_utf8_lossy(&base_document).to_string();
633                                        let remote_document =
634                                            String::from_utf8_lossy(&remote_document).to_string();
635                                        let local_document =
636                                            String::from_utf8_lossy(&local_document).to_string();
637
638                                        let base_buffer = svg::buffer::Buffer::new(&base_document);
639                                        let remote_buffer =
640                                            svg::buffer::Buffer::new(&remote_document);
641                                        let mut local_buffer =
642                                            svg::buffer::Buffer::new(&local_document);
643
644                                        for (_, el) in local_buffer.elements.iter_mut() {
645                                            if let Element::Path(path) = el {
646                                                path.data.apply_transform(u_transform_to_bezier(
647                                                    &Transform::from(
648                                                        local_buffer
649                                                            .weak_viewport_settings
650                                                            .master_transform,
651                                                    ),
652                                                ));
653                                            }
654                                        }
655                                        svg::buffer::Buffer::reload(
656                                            &mut local_buffer.elements,
657                                            &mut local_buffer.weak_images,
658                                            &mut local_buffer.weak_path_pressures,
659                                            &mut local_buffer.weak_viewport_settings,
660                                            &base_buffer,
661                                            &remote_buffer,
662                                        );
663
664                                        let merged_document = local_buffer.serialize();
665                                        let encrypted_document = merge
666                                            .update_document_unvalidated(
667                                                &id,
668                                                &merged_document.into_bytes(),
669                                                &self.keychain,
670                                            )?;
671                                        let hmac = merge.find(&id)?.document_hmac().copied();
672                                        self.docs.insert(id, hmac, &encrypted_document).await?;
673                                    }
674                                    DocumentType::Other => {
675                                        // duplicate file
676                                        let merge_parent = *merge.find(&id)?.parent();
677                                        let duplicate_id = if let Some(&duplicate_id) =
678                                            duplicate_file_ids.get(&id)
679                                        {
680                                            duplicate_id
681                                        } else {
682                                            let duplicate_id = Uuid::new_v4();
683                                            duplicate_file_ids.insert(id, duplicate_id);
684                                            rename_increments.insert(duplicate_id, 1);
685                                            duplicate_id
686                                        };
687
688                                        let mut merge_name = merge_name;
689                                        merge_name = NameComponents::from(&merge_name)
690                                            .generate_incremented(
691                                                rename_increments
692                                                    .get(&duplicate_id)
693                                                    .copied()
694                                                    .unwrap_or_default(),
695                                            )
696                                            .to_name();
697
698                                        merge.create_unvalidated(
699                                            duplicate_id,
700                                            symkey::generate_key(),
701                                            &merge_parent,
702                                            &merge_name,
703                                            FileType::Document,
704                                            &self.keychain,
705                                        )?;
706                                        let encrypted_document = merge
707                                            .update_document_unvalidated(
708                                                &duplicate_id,
709                                                &local_document,
710                                                &self.keychain,
711                                            )?;
712                                        let duplicate_hmac =
713                                            merge.find(&duplicate_id)?.document_hmac().copied();
714                                        self.docs
715                                            .insert(
716                                                duplicate_id,
717                                                duplicate_hmac,
718                                                &encrypted_document,
719                                            )
720                                            .await?;
721                                    }
722                                }
723                            } else {
724                                // overwrite (todo: avoid reading/decrypting/encrypting document)
725                                let document = self.read_document_helper(id, &mut local).await?;
726                                merge.update_document_unvalidated(
727                                    &id,
728                                    &document,
729                                    &self.keychain,
730                                )?;
731                            }
732                        }
733                    }
734
735                    // deletes
736                    // moves happen first in case a file is moved into a deleted folder
737                    for id in db.local_metadata.ids() {
738                        if db.base_metadata.maybe_find(&id).is_some()
739                            && deletions.calculate_deleted(&id)?
740                            && !merge.calculate_deleted(&id)?
741                        {
742                            // delete
743                            merge.delete_unvalidated(&id, &self.keychain)?;
744                        }
745                    }
746                    for &id in &links_to_delete {
747                        // delete
748                        if merge.maybe_find(&id).is_some() && !merge.calculate_deleted(&id)? {
749                            merge.delete_unvalidated(&id, &self.keychain)?;
750                        }
751                    }
752
753                    merge
754                };
755
756                // validate; handle failures by introducing changeset constraints
757                for link in merge.ids() {
758                    if !merge.calculate_deleted(&link)? {
759                        if let FileType::Link { target } = merge.find(&link)?.file_type() {
760                            if merge.maybe_find(&target).is_some()
761                                && merge.calculate_deleted(&target)?
762                            {
763                                // delete links to deleted files
764                                if links_to_delete.insert(link) {
765                                    continue 'merge_construction;
766                                } else {
767                                    return Err(LbErrKind::Unexpected(format!(
768                                        "sync failed to resolve broken link (deletion): {link:?}"
769                                    ))
770                                    .into());
771                                }
772                            }
773                        }
774                    }
775                }
776
777                let validate_result = merge.validate(me);
778                match validate_result {
779                    // merge changeset is valid
780                    Ok(_) => {
781                        let (_, merge_changes) = merge.unstage();
782                        break merge_changes;
783                    }
784                    Err(ref err) => match err.kind {
785                        LbErrKind::Validation(ref vf) => match vf {
786                            // merge changeset has resolvable validation errors and needs modification
787                            ValidationFailure::Cycle(ids) => {
788                                // revert all local moves in the cycle
789                                let mut progress = false;
790                                for &id in ids {
791                                    if db.local_metadata.maybe_find(&id).is_some()
792                                        && files_to_unmove.insert(id)
793                                    {
794                                        progress = true;
795                                    }
796                                }
797                                if !progress {
798                                    return Err(LbErrKind::Unexpected(format!(
799                                        "sync failed to resolve cycle: {ids:?}"
800                                    ))
801                                    .into());
802                                }
803                            }
804                            ValidationFailure::PathConflict(ids) => {
805                                // pick one local id and generate a non-conflicting filename
806                                let mut progress = false;
807                                for &id in ids {
808                                    if duplicate_file_ids.values().any(|&dup| dup == id) {
809                                        *rename_increments.entry(id).or_insert(0) += 1;
810                                        progress = true;
811                                        break;
812                                    }
813                                }
814                                if !progress {
815                                    for &id in ids {
816                                        if db.local_metadata.maybe_find(&id).is_some() {
817                                            *rename_increments.entry(id).or_insert(0) += 1;
818                                            progress = true;
819                                            break;
820                                        }
821                                    }
822                                }
823                                if !progress {
824                                    return Err(LbErrKind::Unexpected(format!(
825                                        "sync failed to resolve path conflict: {ids:?}"
826                                    ))
827                                    .into());
828                                }
829                            }
830                            ValidationFailure::SharedLink { link, shared_ancestor } => {
831                                // if ancestor is newly shared, delete share, otherwise delete link
832                                let mut progress = false;
833                                if let Some(base_shared_ancestor) = base.maybe_find(shared_ancestor)
834                                {
835                                    if !base_shared_ancestor.is_shared()
836                                        && files_to_unshare.insert(*shared_ancestor)
837                                    {
838                                        progress = true;
839                                    }
840                                }
841                                if !progress && links_to_delete.insert(*link) {
842                                    progress = true;
843                                }
844                                if !progress {
845                                    return Err(LbErrKind::Unexpected(format!(
846                                    "sync failed to resolve shared link: link: {link:?}, shared_ancestor: {shared_ancestor:?}"
847                                )).into());
848                                }
849                            }
850                            ValidationFailure::DuplicateLink { target } => {
851                                // delete local link with this target
852                                let mut progress = false;
853                                if let Some(link) = local.linked_by(target)? {
854                                    if links_to_delete.insert(link) {
855                                        progress = true;
856                                    }
857                                }
858                                if !progress {
859                                    return Err(LbErrKind::Unexpected(format!(
860                                        "sync failed to resolve duplicate link: target: {target:?}"
861                                    ))
862                                    .into());
863                                }
864                            }
865                            ValidationFailure::BrokenLink(link) => {
866                                // delete local link with this target
867                                if !links_to_delete.insert(*link) {
868                                    return Err(LbErrKind::Unexpected(format!(
869                                        "sync failed to resolve broken link: {link:?}"
870                                    ))
871                                    .into());
872                                }
873                            }
874                            ValidationFailure::OwnedLink(link) => {
875                                // if target is newly owned, unmove target, otherwise delete link
876                                let mut progress = false;
877                                if let Some(remote_link) = remote.maybe_find(link) {
878                                    if let FileType::Link { target } = remote_link.file_type() {
879                                        let remote_target = remote.find(&target)?;
880                                        if remote_target.owner() != me
881                                            && files_to_unmove.insert(target)
882                                        {
883                                            progress = true;
884                                        }
885                                    }
886                                }
887                                if !progress && links_to_delete.insert(*link) {
888                                    progress = true;
889                                }
890                                if !progress {
891                                    return Err(LbErrKind::Unexpected(format!(
892                                        "sync failed to resolve owned link: {link:?}"
893                                    ))
894                                    .into());
895                                }
896                            }
897                            // merge changeset has unexpected validation errors
898                            ValidationFailure::Orphan(_)
899                            | ValidationFailure::NonFolderWithChildren(_)
900                            | ValidationFailure::FileWithDifferentOwnerParent(_)
901                            | ValidationFailure::FileNameTooLong(_)
902                            | ValidationFailure::DeletedFileUpdated(_)
903                            | ValidationFailure::NonDecryptableFileName(_) => {
904                                validate_result?;
905                            }
906                        },
907                        // merge changeset has unexpected errors
908                        _ => {
909                            validate_result?;
910                        }
911                    },
912                }
913            }
914        };
915
916        // base = remote; local = merge
917        (&mut db.base_metadata)
918            .to_staged(remote_changes.clone())
919            .to_lazy()
920            .promote()?;
921        db.local_metadata.clear()?;
922        (&mut db.local_metadata)
923            .to_staged(merge_changes)
924            .to_lazy()
925            .promote()?;
926
927        // todo who else calls this did they manage locks right?
928        // self.cleanup_local_metadata()?;
929        db.base_metadata.stage(&mut db.local_metadata).prune()?;
930
931        if start.elapsed() > std::time::Duration::from_millis(100) {
932            warn!("sync merge held lock for {:?}", start.elapsed());
933        }
934
935        Ok(())
936    }
937
938    /// Updates remote and base metadata to local.
939    async fn push_meta(&self, ctx: &mut SyncContext) -> LbResult<()> {
940        ctx.msg("Pushing tree changes...");
941        let mut updates = vec![];
942        let mut local_changes_no_digests = Vec::new();
943
944        let tx = self.ro_tx().await;
945        let db = tx.db();
946
947        // remote = local
948        let local = db.base_metadata.stage(&db.local_metadata).to_lazy();
949
950        for id in local.tree.staged.ids() {
951            let mut local_change = local.tree.staged.find(&id)?.timestamped_value.value.clone();
952            let maybe_base_file = local.tree.base.maybe_find(&id);
953
954            // change everything but document hmac and re-sign
955            local_change.document_hmac =
956                maybe_base_file.and_then(|f| f.timestamped_value.value.document_hmac);
957            let local_change = local_change.sign(&self.keychain)?;
958
959            local_changes_no_digests.push(local_change.clone());
960            let file_diff = FileDiff { old: maybe_base_file.cloned(), new: local_change };
961            updates.push(file_diff);
962        }
963
964        drop(tx);
965
966        if !updates.is_empty() {
967            self.client
968                .request(self.get_account()?, UpsertRequest { updates: updates.clone() })
969                .await?;
970            ctx.pushed_metas = updates;
971        }
972
973        let mut tx = self.begin_tx().await;
974        let db = tx.db();
975
976        // base = local
977        (&mut db.base_metadata)
978            .to_lazy()
979            .stage(local_changes_no_digests)
980            .promote()?;
981        db.base_metadata.stage(&mut db.local_metadata).prune()?;
982
983        tx.end();
984
985        Ok(())
986    }
987
988    /// Updates remote and base files to local. Assumes metadata is already pushed for all new files.
989    async fn push_docs(&self, ctx: &mut SyncContext) -> LbResult<()> {
990        ctx.msg("Pushing document changes...");
991        let mut updates = vec![];
992        let mut local_changes_digests_only = vec![];
993
994        let tx = self.ro_tx().await;
995        let db = tx.db();
996        let start = Instant::now();
997
998        let local = db.base_metadata.stage(&db.local_metadata).to_lazy();
999
1000        for id in local.tree.staged.ids() {
1001            let base_file = local.tree.base.find(&id)?.clone();
1002
1003            // change only document hmac and re-sign
1004            let mut local_change = base_file.timestamped_value.value.clone();
1005            local_change.document_hmac = local.find(&id)?.timestamped_value.value.document_hmac;
1006
1007            if base_file.document_hmac() == local_change.document_hmac()
1008                || local_change.document_hmac.is_none()
1009            {
1010                continue;
1011            }
1012
1013            let local_change = local_change.sign(&self.keychain)?;
1014
1015            updates.push(FileDiff { old: Some(base_file), new: local_change.clone() });
1016            local_changes_digests_only.push(local_change);
1017            self.events.sync(SyncIncrement::PushingDocument(id, true));
1018        }
1019
1020        drop(tx);
1021        if start.elapsed() > std::time::Duration::from_millis(100) {
1022            warn!("sync push_docs held lock for {:?}", start.elapsed());
1023        }
1024
1025        let docs_count = updates.len();
1026        ctx.total += docs_count;
1027        let futures = updates.clone().into_iter().map(|diff| self.push_doc(diff));
1028
1029        let mut stream = stream::iter(futures).buffer_unordered(
1030            thread::available_parallelism()
1031                .unwrap_or(NonZeroUsize::new(4).unwrap())
1032                .into(),
1033        );
1034
1035        let mut idx = 0;
1036        while let Some(fut) = stream.next().await {
1037            let id = fut?;
1038            self.events.sync(SyncIncrement::PushingDocument(id, false));
1039            ctx.file_msg(id, &format!("Pushed file {idx} of {docs_count}."));
1040            idx += 1;
1041        }
1042        ctx.pushed_docs = updates;
1043
1044        let mut tx = self.begin_tx().await;
1045        let db = tx.db();
1046        // base = local (metadata)
1047        (&mut db.base_metadata)
1048            .to_lazy()
1049            .stage(local_changes_digests_only)
1050            .promote()?;
1051
1052        db.base_metadata.stage(&mut db.local_metadata).prune()?;
1053
1054        tx.end();
1055
1056        Ok(())
1057    }
1058
1059    async fn push_doc(&self, diff: FileDiff<SignedFile>) -> LbResult<Uuid> {
1060        let id = *diff.new.id();
1061        let hmac = diff.new.document_hmac();
1062        let local_document_change = self.docs.get(id, hmac.copied()).await?;
1063        self.client
1064            .request(
1065                self.get_account()?,
1066                ChangeDocRequest { diff, new_content: local_document_change },
1067            )
1068            .await?;
1069
1070        Ok(id)
1071    }
1072
1073    async fn dedup(
1074        &self, updates: GetUpdatesResponse,
1075    ) -> LbResult<(Vec<SignedFile>, u64, Option<Uuid>)> {
1076        let tx = self.ro_tx().await;
1077        let db = tx.db();
1078
1079        let mut root_id = None;
1080        let (remote_changes, update_as_of) = {
1081            let mut remote_changes = updates.file_metadata;
1082            let update_as_of = updates.as_of_metadata_version;
1083
1084            remote_changes = self.prune_remote_orphans(remote_changes).await?;
1085
1086            let remote = db.base_metadata.stage(remote_changes).pruned()?.to_lazy();
1087
1088            let (_, remote_changes) = remote.unstage();
1089            (remote_changes, update_as_of)
1090        };
1091
1092        // initialize root if this is the first pull on this device
1093        if db.root.get().is_none() {
1094            let root = remote_changes
1095                .all_files()?
1096                .into_iter()
1097                .find(|f| f.is_root())
1098                .ok_or(LbErrKind::RootNonexistent)?;
1099            root_id = Some(*root.id());
1100        }
1101
1102        Ok((remote_changes, update_as_of, root_id))
1103    }
1104
1105    async fn prune_remote_orphans(
1106        &self, remote_changes: Vec<SignedFile>,
1107    ) -> LbResult<Vec<SignedFile>> {
1108        let tx = self.ro_tx().await;
1109        let db = tx.db();
1110
1111        let me = Owner(self.keychain.get_pk()?);
1112        let remote = db.base_metadata.stage(remote_changes).to_lazy();
1113        let mut result = Vec::new();
1114
1115        for id in remote.tree.staged.ids() {
1116            let meta = remote.find(&id)?;
1117            if remote.maybe_find_parent(meta).is_some()
1118                || meta
1119                    .user_access_keys()
1120                    .iter()
1121                    .any(|k| k.encrypted_for == me.0)
1122            {
1123                result.push(remote.find(&id)?.clone()); // todo: don't clone
1124            }
1125        }
1126        Ok(result)
1127    }
1128
1129    async fn commit_last_synced(&self, ctx: &mut SyncContext) -> LbResult<()> {
1130        ctx.msg("Cleaning up...");
1131        let mut tx = self.begin_tx().await;
1132        let db = tx.db();
1133        db.last_synced.insert(ctx.update_as_of as i64)?;
1134
1135        if let Some(root) = ctx.root {
1136            db.root.insert(root)?;
1137        }
1138
1139        Ok(())
1140    }
1141
1142    pub async fn get_last_synced_human(&self) -> LbResult<String> {
1143        let tx = self.ro_tx().await;
1144        let db = tx.db();
1145        let last_synced = db.last_synced.get().copied().unwrap_or(0);
1146
1147        Ok(self.get_timestamp_human_string(last_synced))
1148    }
1149
1150    pub fn get_timestamp_human_string(&self, timestamp: i64) -> String {
1151        if timestamp != 0 {
1152            Duration::milliseconds(clock::get_time().0 - timestamp)
1153                .format_human()
1154                .to_string()
1155        } else {
1156            "never".to_string()
1157        }
1158    }
1159}
1160
1161impl SyncContext {
1162    fn summarize(&self) -> SyncStatus {
1163        let mut local = HashSet::new();
1164        let mut server = HashSet::new();
1165        let mut work_units = vec![];
1166
1167        for meta in &self.pushed_metas {
1168            local.insert(meta.new.id());
1169        }
1170
1171        for meta in &self.pushed_docs {
1172            local.insert(meta.new.id());
1173        }
1174
1175        for meta in &self.remote_changes {
1176            server.insert(meta.id());
1177        }
1178
1179        for id in local {
1180            work_units.push(WorkUnit::LocalChange(*id));
1181        }
1182
1183        for id in server {
1184            work_units.push(WorkUnit::ServerChange(*id));
1185        }
1186
1187        SyncStatus { work_units, latest_server_ts: self.update_as_of }
1188    }
1189
1190    fn msg(&mut self, msg: &str) {
1191        self.current += 1;
1192        if let Some(f) = &self.progress {
1193            f(SyncProgress {
1194                total: self.total,
1195                progress: self.current,
1196                file_being_processed: Default::default(),
1197                msg: msg.to_string(),
1198            })
1199        }
1200    }
1201
1202    fn file_msg(&mut self, id: Uuid, msg: &str) {
1203        self.current += 1;
1204        if let Some(f) = &self.progress {
1205            f(SyncProgress {
1206                total: self.total,
1207                progress: self.current,
1208                file_being_processed: Some(id),
1209                msg: msg.to_string(),
1210            })
1211        }
1212    }
1213
1214    fn done_msg(&mut self) {
1215        self.current = self.total;
1216        if let Some(f) = &self.progress {
1217            f(SyncProgress {
1218                total: self.total,
1219                progress: self.current,
1220                file_being_processed: None,
1221                msg: "Sync successful!".to_string(),
1222            })
1223        }
1224    }
1225}
1226
1227#[derive(Debug, Serialize, Clone)]
1228pub struct SyncStatus {
1229    pub work_units: Vec<WorkUnit>,
1230    pub latest_server_ts: u64,
1231}
1232
1233#[derive(Clone)]
1234pub struct SyncProgress {
1235    pub total: usize,
1236    pub progress: usize,
1237    pub file_being_processed: Option<Uuid>,
1238    pub msg: String,
1239}
1240
1241impl Display for SyncProgress {
1242    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1243        write!(f, "[{} / {}]: {}", self.progress, self.total, self.msg)
1244    }
1245}
1246
1247#[derive(Debug, Clone)]
1248pub enum SyncIncrement {
1249    SyncStarted,
1250    PullingDocument(Uuid, bool),
1251    PushingDocument(Uuid, bool),
1252    SyncFinished(Option<LbErrKind>),
1253}