lb_rs/service/
sync.rs

1use crate::Lb;
2use crate::io::network::ApiError;
3use crate::model::access_info::UserAccessMode;
4use crate::model::api::{
5    ChangeDocRequestV2, GetDocRequest, GetFileIdsRequest, GetUpdatesRequestV2,
6    GetUpdatesResponseV2, GetUsernameError, GetUsernameRequest, UpsertRequestV2,
7};
8use crate::model::errors::{LbErrKind, LbResult};
9use crate::model::file::ShareMode;
10use crate::model::file_like::FileLike;
11use crate::model::file_metadata::{DocumentHmac, FileDiff, FileType, Owner};
12use crate::model::filename::{DocumentType, NameComponents};
13use crate::model::signed_meta::SignedMeta;
14use crate::model::staged::StagedTreeLikeMut;
15use crate::model::svg::buffer::u_transform_to_bezier;
16use crate::model::svg::element::Element;
17use crate::model::text::buffer::Buffer;
18use crate::model::tree_like::TreeLike;
19use crate::model::work_unit::WorkUnit;
20use crate::model::{ValidationFailure, clock, svg, symkey};
21pub use basic_human_duration::ChronoHumanDuration;
22use futures::{StreamExt, stream};
23use serde::Serialize;
24use std::collections::{HashMap, HashSet, hash_map};
25use std::fmt::{Display, Formatter};
26use std::num::NonZeroUsize;
27use std::sync::Arc;
28use std::sync::atomic::{AtomicBool, Ordering};
29use std::thread;
30use std::time::Instant;
31use time::Duration;
32use usvg::Transform;
33use uuid::Uuid;
34
35use super::events::Actor;
36
37pub type SyncFlag = Arc<AtomicBool>;
38
39pub struct SyncContext {
40    progress: Option<Box<dyn Fn(SyncProgress) + Send>>,
41    current: usize,
42    total: usize,
43
44    pk_cache: HashMap<Owner, String>,
45    last_synced: u64,
46    remote_changes: Vec<SignedMeta>,
47    update_as_of: u64,
48    root: Option<Uuid>,
49    pushed_metas: Vec<FileDiff<SignedMeta>>,
50    pushed_docs: Vec<FileDiff<SignedMeta>>,
51    pulled_docs: Vec<Uuid>,
52}
53
54impl Lb {
55    #[instrument(level = "debug", skip_all, err(Debug))]
56    pub async fn calculate_work(&self) -> LbResult<SyncStatus> {
57        let tx = self.ro_tx().await;
58        let db = tx.db();
59        let last_synced = db.last_synced.get().copied().unwrap_or_default() as u64;
60        drop(tx);
61
62        let remote_changes = self
63            .client
64            .request(
65                self.get_account()?,
66                GetUpdatesRequestV2 { since_metadata_version: last_synced },
67            )
68            .await?;
69        let (deduped, latest_server_ts, _) = self.dedup(remote_changes).await?;
70        let remote_dirty = deduped
71            .into_iter()
72            .map(|f| *f.id())
73            .map(WorkUnit::ServerChange);
74
75        self.prune().await?;
76
77        let tx = self.ro_tx().await;
78        let db = tx.db();
79
80        let locally_dirty = db
81            .local_metadata
82            .get()
83            .keys()
84            .copied()
85            .map(WorkUnit::LocalChange);
86
87        let mut work_units: Vec<WorkUnit> = Vec::new();
88        work_units.extend(locally_dirty.chain(remote_dirty));
89        Ok(SyncStatus { work_units, latest_server_ts })
90    }
91
92    #[instrument(level = "debug", skip_all, err(Debug))]
93    pub async fn sync(&self, f: Option<Box<dyn Fn(SyncProgress) + Send>>) -> LbResult<SyncStatus> {
94        let old = self.syncing.swap(true, Ordering::SeqCst);
95        if old {
96            return Err(LbErrKind::AlreadySyncing.into());
97        }
98
99        let mut ctx = self.setup_sync(f).await?;
100
101        let mut got_updates = false;
102        let mut pipeline: LbResult<()> = async {
103            ctx.msg("Preparing Sync..."); // todo remove
104            self.events.sync(SyncIncrement::SyncStarted);
105            self.prune().await?;
106            got_updates = self.fetch_meta(&mut ctx).await?;
107            self.populate_pk_cache(&mut ctx).await?;
108            self.docs.dont_delete.store(true, Ordering::SeqCst);
109            self.fetch_docs(&mut ctx).await?;
110            self.merge(&mut ctx).await?;
111            self.push_meta(&mut ctx).await?;
112            self.push_docs(&mut ctx).await?;
113            Ok(())
114        }
115        .await;
116
117        self.docs.dont_delete.store(false, Ordering::SeqCst);
118
119        if pipeline.is_ok() {
120            pipeline = self.commit_last_synced(&mut ctx).await;
121        }
122
123        let cleanup = self.cleanup().await;
124
125        let ekind = pipeline.as_ref().err().map(|err| err.kind.clone());
126        self.events.sync(SyncIncrement::SyncFinished(ekind));
127
128        self.syncing.store(false, Ordering::Relaxed);
129        pipeline?;
130        cleanup?;
131
132        // done not being sent if pipeline is an error is likely the reason we get stuck offline
133        ctx.done_msg();
134
135        if got_updates {
136            // did it?
137            self.events.meta_changed();
138            for id in &ctx.pulled_docs {
139                self.events.doc_written(*id, Some(Actor::Sync));
140            }
141        }
142
143        Ok(ctx.summarize())
144    }
145
146    async fn setup_sync(
147        &self, progress: Option<Box<dyn Fn(SyncProgress) + Send>>,
148    ) -> LbResult<SyncContext> {
149        let tx = self.ro_tx().await;
150        let db = tx.db();
151
152        let last_synced = db.last_synced.get().copied().unwrap_or_default() as u64;
153        let pk_cache = db.pub_key_lookup.get().clone();
154
155        let current = 0;
156        let total = 7;
157
158        Ok(SyncContext {
159            last_synced,
160            pk_cache,
161
162            progress,
163            current,
164            total,
165
166            root: Default::default(),
167            update_as_of: Default::default(),
168            remote_changes: Default::default(),
169            pushed_docs: Default::default(),
170            pushed_metas: Default::default(),
171            pulled_docs: Default::default(),
172        })
173    }
174
175    async fn prune(&self) -> LbResult<()> {
176        let server_ids = self
177            .client
178            .request(self.get_account()?, GetFileIdsRequest {})
179            .await?
180            .ids;
181
182        let mut tx = self.begin_tx().await;
183        let db = tx.db();
184
185        let mut local = db.base_metadata.stage(&db.local_metadata).to_lazy();
186        let base_ids = local.tree.base.ids();
187
188        let mut prunable_ids = base_ids;
189        prunable_ids.retain(|id| !server_ids.contains(id));
190        for id in prunable_ids.clone() {
191            prunable_ids.extend(local.descendants(&id)?.into_iter());
192        }
193        for id in &prunable_ids {
194            if let Some(base_file) = local.tree.base.maybe_find(id) {
195                self.docs
196                    .delete(*id, base_file.document_hmac().copied())
197                    .await?;
198            }
199            if let Some(local_file) = local.maybe_find(id) {
200                self.docs
201                    .delete(*id, local_file.document_hmac().copied())
202                    .await?;
203            }
204        }
205
206        let mut base_staged = (&mut db.base_metadata).to_lazy().stage(None);
207        base_staged.tree.removed = prunable_ids.clone().into_iter().collect();
208        base_staged.promote()?;
209
210        let mut local_staged = (&mut db.local_metadata).to_lazy().stage(None);
211        local_staged.tree.removed = prunable_ids.into_iter().collect();
212        local_staged.promote()?;
213
214        Ok(())
215    }
216
217    /// Returns true if there were any updates
218    async fn fetch_meta(&self, ctx: &mut SyncContext) -> LbResult<bool> {
219        ctx.msg("Fetching tree updates...");
220        let updates = self
221            .client
222            .request(
223                self.get_account()?,
224                GetUpdatesRequestV2 { since_metadata_version: ctx.last_synced },
225            )
226            .await?;
227
228        let empty = updates.file_metadata.is_empty();
229        let (remote, as_of, root) = self.dedup(updates).await?;
230
231        ctx.remote_changes = remote;
232        ctx.update_as_of = as_of;
233        ctx.root = root;
234
235        Ok(!empty)
236    }
237
238    async fn populate_pk_cache(&self, ctx: &mut SyncContext) -> LbResult<()> {
239        ctx.msg("Updating public key cache...");
240        let mut all_owners = HashSet::new();
241        for file in &ctx.remote_changes {
242            for user_access_key in file.user_access_keys() {
243                all_owners.insert(Owner(user_access_key.encrypted_by));
244                all_owners.insert(Owner(user_access_key.encrypted_for));
245            }
246        }
247
248        let mut new_entries = HashMap::new();
249
250        for owner in all_owners {
251            if let hash_map::Entry::Vacant(e) = ctx.pk_cache.entry(owner) {
252                let username_result = self
253                    .client
254                    .request(self.get_account()?, GetUsernameRequest { key: owner.0 })
255                    .await;
256                let username = match username_result {
257                    Err(ApiError::Endpoint(GetUsernameError::UserNotFound)) => {
258                        "<unknown>".to_string()
259                    }
260                    _ => username_result?.username.clone(),
261                };
262                new_entries.insert(owner, username.clone());
263                e.insert(username.clone());
264            }
265        }
266
267        let mut tx = self.begin_tx().await;
268        let db = tx.db();
269
270        for (owner, username) in new_entries {
271            db.pub_key_lookup.insert(owner, username)?;
272        }
273        Ok(())
274    }
275
276    async fn fetch_docs(&self, ctx: &mut SyncContext) -> LbResult<()> {
277        ctx.msg("Fetching documents...");
278        let mut docs_to_pull = vec![];
279
280        let tx = self.ro_tx().await;
281        let db = tx.db();
282        let start = Instant::now();
283
284        let mut remote = db.base_metadata.stage(ctx.remote_changes.clone()).to_lazy(); // this used to be owned remote changes
285        for id in remote.tree.staged.ids() {
286            if remote.calculate_deleted(&id)? {
287                continue;
288            }
289            let remote_hmac = remote.find(&id)?.document_hmac().cloned();
290            let base_hmac = remote
291                .tree
292                .base
293                .maybe_find(&id)
294                .and_then(|f| f.document_hmac())
295                .cloned();
296            if base_hmac == remote_hmac {
297                continue;
298            }
299
300            if let Some(remote_hmac) = remote_hmac {
301                docs_to_pull.push((id, remote_hmac));
302                self.events.sync(SyncIncrement::PullingDocument(id, true));
303            }
304        }
305
306        drop(tx);
307        if start.elapsed() > std::time::Duration::from_millis(100) {
308            warn!("sync fetch_docs held lock for {:?}", start.elapsed());
309        }
310
311        let num_docs = docs_to_pull.len();
312        ctx.total += num_docs;
313
314        let futures = docs_to_pull
315            .into_iter()
316            .map(|(id, hmac)| self.fetch_doc(id, hmac));
317
318        let mut stream = stream::iter(futures).buffer_unordered(
319            thread::available_parallelism()
320                .unwrap_or(NonZeroUsize::new(4).unwrap())
321                .into(),
322        );
323
324        let mut idx = 0;
325        while let Some(fut) = stream.next().await {
326            let id = fut?;
327            ctx.pulled_docs.push(id);
328            self.events.sync(SyncIncrement::PullingDocument(id, false));
329            ctx.file_msg(id, &format!("Downloaded file {idx} of {num_docs}."));
330            idx += 1;
331        }
332        Ok(())
333    }
334
335    async fn fetch_doc(&self, id: Uuid, hmac: DocumentHmac) -> LbResult<Uuid> {
336        let remote_document = self
337            .client
338            .request(self.get_account()?, GetDocRequest { id, hmac })
339            .await?;
340        self.docs
341            .insert(id, Some(hmac), &remote_document.content)
342            .await?;
343
344        Ok(id)
345    }
346
347    /// Pulls remote changes and constructs a changeset Merge such that Stage<Stage<Stage<Base, Remote>, Local>, Merge> is valid.
348    /// Promotes Base to Stage<Base, Remote> and Local to Stage<Local, Merge>
349    async fn merge(&self, ctx: &mut SyncContext) -> LbResult<()> {
350        let mut tx = self.begin_tx().await;
351        let db = tx.db();
352        let start = Instant::now();
353
354        let remote_changes = &ctx.remote_changes;
355
356        // fetch document updates and local documents for merge
357        let me = Owner(self.keychain.get_pk()?);
358
359        // compute merge changes
360        let merge_changes = {
361            // assemble trees
362            let mut base = (&db.base_metadata).to_lazy();
363            let remote_unlazy = (&db.base_metadata).to_staged(remote_changes);
364            let mut remote = remote_unlazy.as_lazy();
365            let mut local = (&db.base_metadata).to_staged(&db.local_metadata).to_lazy();
366
367            // changeset constraints - these evolve as we try to assemble changes and encounter validation failures
368            let mut files_to_unmove: HashSet<Uuid> = HashSet::new();
369            let mut files_to_unshare: HashSet<Uuid> = HashSet::new();
370            let mut links_to_delete: HashSet<Uuid> = HashSet::new();
371            let mut rename_increments: HashMap<Uuid, usize> = HashMap::new();
372            let mut duplicate_file_ids: HashMap<Uuid, Uuid> = HashMap::new();
373
374            'merge_construction: loop {
375                // process just the edits which allow us to check deletions in the result
376                let mut deletions = {
377                    let mut deletions = remote_unlazy.stage(Vec::new()).to_lazy();
378
379                    // creations
380                    let mut deletion_creations = HashSet::new();
381                    for id in db.local_metadata.ids() {
382                        if remote.maybe_find(&id).is_none() && !links_to_delete.contains(&id) {
383                            deletion_creations.insert(id);
384                        }
385                    }
386                    'drain_creations: while !deletion_creations.is_empty() {
387                        'choose_a_creation: for id in &deletion_creations {
388                            // create
389                            let id = *id;
390                            let local_file = local.find(&id)?.clone();
391                            let result = deletions.create_unvalidated(
392                                id,
393                                symkey::generate_key(),
394                                local_file.parent(),
395                                &local.name(&id, &self.keychain)?,
396                                local_file.file_type(),
397                                &self.keychain,
398                            );
399                            match result {
400                                Ok(_) => {
401                                    deletion_creations.remove(&id);
402                                    continue 'drain_creations;
403                                }
404                                Err(ref err) => match err.kind {
405                                    LbErrKind::FileParentNonexistent => {
406                                        continue 'choose_a_creation;
407                                    }
408                                    _ => {
409                                        result?;
410                                    }
411                                },
412                            }
413                        }
414                        return Err(LbErrKind::Unexpected(format!(
415                            "sync failed to find a topomodelal order for file creations: {deletion_creations:?}"
416                        ))
417                        .into());
418                    }
419
420                    // moves (creations happen first in case a file is moved into a new folder)
421                    for id in db.local_metadata.ids() {
422                        let local_file = local.find(&id)?.clone();
423                        if let Some(base_file) = db.base_metadata.maybe_find(&id).cloned() {
424                            if !local_file.explicitly_deleted()
425                                && local_file.parent() != base_file.parent()
426                                && !files_to_unmove.contains(&id)
427                            {
428                                // move
429                                deletions.move_unvalidated(
430                                    &id,
431                                    local_file.parent(),
432                                    &self.keychain,
433                                )?;
434                            }
435                        }
436                    }
437
438                    // deletions (moves happen first in case a file is moved into a deleted folder)
439                    for id in db.local_metadata.ids() {
440                        let local_file = local.find(&id)?.clone();
441                        if local_file.explicitly_deleted() {
442                            // delete
443                            deletions.delete_unvalidated(&id, &self.keychain)?;
444                        }
445                    }
446                    deletions
447                };
448
449                // process all edits, dropping non-deletion edits for files that will be implicitly deleted
450                let mut merge = {
451                    let mut merge = remote_unlazy.stage(Vec::new()).to_lazy();
452
453                    // creations and edits of created documents
454                    let mut creations = HashSet::new();
455                    for id in db.local_metadata.ids() {
456                        if deletions.maybe_find(&id).is_some()
457                            && !deletions.calculate_deleted(&id)?
458                            && remote.maybe_find(&id).is_none()
459                            && !links_to_delete.contains(&id)
460                        {
461                            creations.insert(id);
462                        }
463                    }
464                    'drain_creations: while !creations.is_empty() {
465                        'choose_a_creation: for id in &creations {
466                            // create
467                            let id = *id;
468                            let local_file = local.find(&id)?.clone();
469                            let result = merge.create_unvalidated(
470                                id,
471                                local.decrypt_key(&id, &self.keychain)?,
472                                local_file.parent(),
473                                &local.name(&id, &self.keychain)?,
474                                local_file.file_type(),
475                                &self.keychain,
476                            );
477                            match result {
478                                Ok(_) => {
479                                    creations.remove(&id);
480                                    continue 'drain_creations;
481                                }
482                                Err(ref err) => match err.kind {
483                                    LbErrKind::FileParentNonexistent => {
484                                        continue 'choose_a_creation;
485                                    }
486                                    _ => {
487                                        result?;
488                                    }
489                                },
490                            }
491                        }
492                        return Err(LbErrKind::Unexpected(format!(
493                            "sync failed to find a topomodelal order for file creations: {creations:?}"
494                        ))
495                        .into());
496                    }
497
498                    // moves, renames, edits, and shares
499                    // creations happen first in case a file is moved into a new folder
500                    for id in db.local_metadata.ids() {
501                        // skip files that are already deleted or will be deleted
502                        if deletions.maybe_find(&id).is_none()
503                            || deletions.calculate_deleted(&id)?
504                            || (remote.maybe_find(&id).is_some()
505                                && remote.calculate_deleted(&id)?)
506                        {
507                            continue;
508                        }
509
510                        let local_file = local.find(&id)?.clone();
511                        let local_name = local.name(&id, &self.keychain)?;
512                        let maybe_base_file = base.maybe_find(&id).cloned();
513                        let maybe_remote_file = remote.maybe_find(&id).cloned();
514                        if let Some(ref base_file) = maybe_base_file {
515                            let base_name = base.name(&id, &self.keychain)?;
516                            let remote_file = remote.find(&id)?.clone();
517                            let remote_name = remote.name(&id, &self.keychain)?;
518
519                            // move
520                            if local_file.parent() != base_file.parent()
521                                && remote_file.parent() == base_file.parent()
522                                && !files_to_unmove.contains(&id)
523                            {
524                                merge.move_unvalidated(&id, local_file.parent(), &self.keychain)?;
525                            }
526
527                            // rename
528                            if local_name != base_name && remote_name == base_name {
529                                merge.rename_unvalidated(&id, &local_name, &self.keychain)?;
530                            }
531                        }
532
533                        // share
534                        let mut remote_keys = HashMap::new();
535                        if let Some(ref remote_file) = maybe_remote_file {
536                            for key in remote_file.user_access_keys() {
537                                remote_keys.insert(
538                                    (Owner(key.encrypted_by), Owner(key.encrypted_for)),
539                                    (key.mode, key.deleted),
540                                );
541                            }
542                        }
543                        for key in local_file.user_access_keys() {
544                            let (by, for_) = (Owner(key.encrypted_by), Owner(key.encrypted_for));
545                            if let Some(&(remote_mode, remote_deleted)) =
546                                remote_keys.get(&(by, for_))
547                            {
548                                // upgrade share
549                                if key.mode > remote_mode || !key.deleted && remote_deleted {
550                                    let mode = match key.mode {
551                                        UserAccessMode::Read => ShareMode::Read,
552                                        UserAccessMode::Write => ShareMode::Write,
553                                        UserAccessMode::Owner => continue,
554                                    };
555                                    merge.add_share_unvalidated(id, for_, mode, &self.keychain)?;
556                                }
557                                // delete share
558                                if key.deleted && !remote_deleted {
559                                    merge.delete_share_unvalidated(
560                                        &id,
561                                        Some(for_.0),
562                                        &self.keychain,
563                                    )?;
564                                }
565                            } else {
566                                // add share
567                                let mode = match key.mode {
568                                    UserAccessMode::Read => ShareMode::Read,
569                                    UserAccessMode::Write => ShareMode::Write,
570                                    UserAccessMode::Owner => continue,
571                                };
572                                merge.add_share_unvalidated(id, for_, mode, &self.keychain)?;
573                            }
574                        }
575
576                        // share deletion due to conflicts
577                        if files_to_unshare.contains(&id) {
578                            merge.delete_share_unvalidated(&id, None, &self.keychain)?;
579                        }
580
581                        // rename due to path conflict
582                        if let Some(&rename_increment) = rename_increments.get(&id) {
583                            let name = NameComponents::from(&local_name)
584                                .generate_incremented(rename_increment)
585                                .to_name();
586                            merge.rename_unvalidated(&id, &name, &self.keychain)?;
587                        }
588
589                        // edit
590                        let base_hmac = maybe_base_file.and_then(|f| f.document_hmac().cloned());
591                        let remote_hmac =
592                            maybe_remote_file.and_then(|f| f.document_hmac().cloned());
593                        let local_hmac = local_file.document_hmac().cloned();
594                        if merge.access_mode(me, &id)? >= Some(UserAccessMode::Write)
595                            && local_hmac != base_hmac
596                        {
597                            if remote_hmac != base_hmac && remote_hmac != local_hmac {
598                                // merge
599                                let merge_name = merge.name(&id, &self.keychain)?;
600                                let document_type =
601                                    DocumentType::from_file_name_using_extension(&merge_name);
602
603                                // todo these accesses are potentially problematic
604                                // maybe not if service/docs is the persion doing network io
605                                let base_document =
606                                    self.read_document_helper(id, &mut base).await?;
607                                let remote_document =
608                                    self.read_document_helper(id, &mut remote).await?;
609                                let local_document =
610                                    self.read_document_helper(id, &mut local).await?;
611
612                                match document_type {
613                                    DocumentType::Text => {
614                                        // 3-way merge
615                                        // todo: a couple more clones than necessary
616                                        let base_document =
617                                            String::from_utf8_lossy(&base_document).to_string();
618                                        let remote_document =
619                                            String::from_utf8_lossy(&remote_document).to_string();
620                                        let local_document =
621                                            String::from_utf8_lossy(&local_document).to_string();
622                                        let merged_document = Buffer::from(base_document.as_str())
623                                            .merge(local_document, remote_document);
624                                        let encrypted_document = merge
625                                            .update_document_unvalidated(
626                                                &id,
627                                                &merged_document.into_bytes(),
628                                                &self.keychain,
629                                            )?;
630                                        let hmac = merge.find(&id)?.document_hmac().copied();
631                                        self.docs.insert(id, hmac, &encrypted_document).await?;
632                                    }
633                                    DocumentType::Drawing => {
634                                        let base_document =
635                                            String::from_utf8_lossy(&base_document).to_string();
636                                        let remote_document =
637                                            String::from_utf8_lossy(&remote_document).to_string();
638                                        let local_document =
639                                            String::from_utf8_lossy(&local_document).to_string();
640
641                                        let base_buffer = svg::buffer::Buffer::new(&base_document);
642                                        let remote_buffer =
643                                            svg::buffer::Buffer::new(&remote_document);
644                                        let mut local_buffer =
645                                            svg::buffer::Buffer::new(&local_document);
646
647                                        for (_, el) in local_buffer.elements.iter_mut() {
648                                            if let Element::Path(path) = el {
649                                                path.data.apply_transform(u_transform_to_bezier(
650                                                    &Transform::from(
651                                                        local_buffer
652                                                            .weak_viewport_settings
653                                                            .master_transform,
654                                                    ),
655                                                ));
656                                            }
657                                        }
658                                        svg::buffer::Buffer::reload(
659                                            &mut local_buffer.elements,
660                                            &mut local_buffer.weak_images,
661                                            &mut local_buffer.weak_path_pressures,
662                                            &mut local_buffer.weak_viewport_settings,
663                                            &base_buffer,
664                                            &remote_buffer,
665                                        );
666
667                                        let merged_document = local_buffer.serialize();
668                                        let encrypted_document = merge
669                                            .update_document_unvalidated(
670                                                &id,
671                                                &merged_document.into_bytes(),
672                                                &self.keychain,
673                                            )?;
674                                        let hmac = merge.find(&id)?.document_hmac().copied();
675                                        self.docs.insert(id, hmac, &encrypted_document).await?;
676                                    }
677                                    DocumentType::Other => {
678                                        // duplicate file
679                                        let merge_parent = *merge.find(&id)?.parent();
680                                        let duplicate_id = if let Some(&duplicate_id) =
681                                            duplicate_file_ids.get(&id)
682                                        {
683                                            duplicate_id
684                                        } else {
685                                            let duplicate_id = Uuid::new_v4();
686                                            duplicate_file_ids.insert(id, duplicate_id);
687                                            rename_increments.insert(duplicate_id, 1);
688                                            duplicate_id
689                                        };
690
691                                        let mut merge_name = merge_name;
692                                        merge_name = NameComponents::from(&merge_name)
693                                            .generate_incremented(
694                                                rename_increments
695                                                    .get(&duplicate_id)
696                                                    .copied()
697                                                    .unwrap_or_default(),
698                                            )
699                                            .to_name();
700
701                                        merge.create_unvalidated(
702                                            duplicate_id,
703                                            symkey::generate_key(),
704                                            &merge_parent,
705                                            &merge_name,
706                                            FileType::Document,
707                                            &self.keychain,
708                                        )?;
709                                        let encrypted_document = merge
710                                            .update_document_unvalidated(
711                                                &duplicate_id,
712                                                &local_document,
713                                                &self.keychain,
714                                            )?;
715                                        let duplicate_hmac =
716                                            merge.find(&duplicate_id)?.document_hmac().copied();
717                                        self.docs
718                                            .insert(
719                                                duplicate_id,
720                                                duplicate_hmac,
721                                                &encrypted_document,
722                                            )
723                                            .await?;
724                                    }
725                                }
726                            } else {
727                                // overwrite (todo: avoid reading/decrypting/encrypting document)
728                                let document = self.read_document_helper(id, &mut local).await?;
729                                merge.update_document_unvalidated(
730                                    &id,
731                                    &document,
732                                    &self.keychain,
733                                )?;
734                            }
735                        }
736                    }
737
738                    // deletes
739                    // moves happen first in case a file is moved into a deleted folder
740                    for id in db.local_metadata.ids() {
741                        if db.base_metadata.maybe_find(&id).is_some()
742                            && deletions.calculate_deleted(&id)?
743                            && !merge.calculate_deleted(&id)?
744                        {
745                            // delete
746                            merge.delete_unvalidated(&id, &self.keychain)?;
747                        }
748                    }
749                    for &id in &links_to_delete {
750                        // delete
751                        if merge.maybe_find(&id).is_some() && !merge.calculate_deleted(&id)? {
752                            merge.delete_unvalidated(&id, &self.keychain)?;
753                        }
754                    }
755
756                    merge
757                };
758
759                // validate; handle failures by introducing changeset constraints
760                for link in merge.ids() {
761                    if !merge.calculate_deleted(&link)? {
762                        if let FileType::Link { target } = merge.find(&link)?.file_type() {
763                            if merge.maybe_find(&target).is_some()
764                                && merge.calculate_deleted(&target)?
765                            {
766                                // delete links to deleted files
767                                if links_to_delete.insert(link) {
768                                    continue 'merge_construction;
769                                } else {
770                                    return Err(LbErrKind::Unexpected(format!(
771                                        "sync failed to resolve broken link (deletion): {link:?}"
772                                    ))
773                                    .into());
774                                }
775                            }
776                        }
777                    }
778                }
779
780                let validate_result = merge.validate(me);
781                match validate_result {
782                    // merge changeset is valid
783                    Ok(_) => {
784                        let (_, merge_changes) = merge.unstage();
785                        break merge_changes;
786                    }
787                    Err(ref err) => match err.kind {
788                        LbErrKind::Validation(ref vf) => match vf {
789                            // merge changeset has resolvable validation errors and needs modification
790                            ValidationFailure::Cycle(ids) => {
791                                // revert all local moves in the cycle
792                                let mut progress = false;
793                                for &id in ids {
794                                    if db.local_metadata.maybe_find(&id).is_some()
795                                        && files_to_unmove.insert(id)
796                                    {
797                                        progress = true;
798                                    }
799                                }
800                                if !progress {
801                                    return Err(LbErrKind::Unexpected(format!(
802                                        "sync failed to resolve cycle: {ids:?}"
803                                    ))
804                                    .into());
805                                }
806                            }
807                            ValidationFailure::PathConflict(ids) => {
808                                // pick one local id and generate a non-conflicting filename
809                                let mut progress = false;
810                                for &id in ids {
811                                    if duplicate_file_ids.values().any(|&dup| dup == id) {
812                                        *rename_increments.entry(id).or_insert(0) += 1;
813                                        progress = true;
814                                        break;
815                                    }
816                                }
817                                if !progress {
818                                    for &id in ids {
819                                        if db.local_metadata.maybe_find(&id).is_some() {
820                                            *rename_increments.entry(id).or_insert(0) += 1;
821                                            progress = true;
822                                            break;
823                                        }
824                                    }
825                                }
826                                if !progress {
827                                    return Err(LbErrKind::Unexpected(format!(
828                                        "sync failed to resolve path conflict: {ids:?}"
829                                    ))
830                                    .into());
831                                }
832                            }
833                            ValidationFailure::SharedLink { link, shared_ancestor } => {
834                                // if ancestor is newly shared, delete share, otherwise delete link
835                                let mut progress = false;
836                                if let Some(base_shared_ancestor) = base.maybe_find(shared_ancestor)
837                                {
838                                    if !base_shared_ancestor.is_shared()
839                                        && files_to_unshare.insert(*shared_ancestor)
840                                    {
841                                        progress = true;
842                                    }
843                                }
844                                if !progress && links_to_delete.insert(*link) {
845                                    progress = true;
846                                }
847                                if !progress {
848                                    return Err(LbErrKind::Unexpected(format!(
849                                    "sync failed to resolve shared link: link: {link:?}, shared_ancestor: {shared_ancestor:?}"
850                                )).into());
851                                }
852                            }
853                            ValidationFailure::DuplicateLink { target } => {
854                                // delete local link with this target
855                                let mut progress = false;
856                                if let Some(link) = local.linked_by(target)? {
857                                    if links_to_delete.insert(link) {
858                                        progress = true;
859                                    }
860                                }
861                                if !progress {
862                                    return Err(LbErrKind::Unexpected(format!(
863                                        "sync failed to resolve duplicate link: target: {target:?}"
864                                    ))
865                                    .into());
866                                }
867                            }
868                            ValidationFailure::BrokenLink(link) => {
869                                // delete local link with this target
870                                if !links_to_delete.insert(*link) {
871                                    return Err(LbErrKind::Unexpected(format!(
872                                        "sync failed to resolve broken link: {link:?}"
873                                    ))
874                                    .into());
875                                }
876                            }
877                            ValidationFailure::OwnedLink(link) => {
878                                // if target is newly owned, unmove target, otherwise delete link
879                                let mut progress = false;
880                                if let Some(remote_link) = remote.maybe_find(link) {
881                                    if let FileType::Link { target } = remote_link.file_type() {
882                                        let remote_target = remote.find(&target)?;
883                                        if remote_target.owner() != me
884                                            && files_to_unmove.insert(target)
885                                        {
886                                            progress = true;
887                                        }
888                                    }
889                                }
890                                if !progress && links_to_delete.insert(*link) {
891                                    progress = true;
892                                }
893                                if !progress {
894                                    return Err(LbErrKind::Unexpected(format!(
895                                        "sync failed to resolve owned link: {link:?}"
896                                    ))
897                                    .into());
898                                }
899                            }
900                            // merge changeset has unexpected validation errors
901                            ValidationFailure::Orphan(_)
902                            | ValidationFailure::NonFolderWithChildren(_)
903                            | ValidationFailure::FileWithDifferentOwnerParent(_)
904                            | ValidationFailure::FileNameTooLong(_)
905                            | ValidationFailure::DeletedFileUpdated(_)
906                            | ValidationFailure::NonDecryptableFileName(_) => {
907                                validate_result?;
908                            }
909                        },
910                        // merge changeset has unexpected errors
911                        _ => {
912                            validate_result?;
913                        }
914                    },
915                }
916            }
917        };
918
919        // base = remote; local = merge
920        (&mut db.base_metadata)
921            .to_staged(remote_changes.clone())
922            .to_lazy()
923            .promote()?;
924        db.local_metadata.clear()?;
925        (&mut db.local_metadata)
926            .to_staged(merge_changes)
927            .to_lazy()
928            .promote()?;
929
930        // todo who else calls this did they manage locks right?
931        // self.cleanup_local_metadata()?;
932        db.base_metadata.stage(&mut db.local_metadata).prune()?;
933
934        if start.elapsed() > std::time::Duration::from_millis(100) {
935            warn!("sync merge held lock for {:?}", start.elapsed());
936        }
937
938        Ok(())
939    }
940
941    /// Updates remote and base metadata to local.
942    async fn push_meta(&self, ctx: &mut SyncContext) -> LbResult<()> {
943        ctx.msg("Pushing tree changes...");
944        let mut updates = vec![];
945        let mut local_changes_no_digests = Vec::new();
946
947        let tx = self.ro_tx().await;
948        let db = tx.db();
949
950        // remote = local
951        let local = db.base_metadata.stage(&db.local_metadata).to_lazy();
952
953        for id in local.tree.staged.ids() {
954            let mut local_change = local.tree.staged.find(&id)?.timestamped_value.value.clone();
955            let maybe_base_file = local.tree.base.maybe_find(&id);
956
957            // change everything but document hmac and re-sign
958            local_change.set_hmac_and_size(
959                maybe_base_file.and_then(|f| f.document_hmac().copied()),
960                maybe_base_file.and_then(|f| *f.timestamped_value.value.doc_size()),
961            );
962            let local_change = local_change.sign(&self.keychain)?;
963
964            local_changes_no_digests.push(local_change.clone());
965            let file_diff = FileDiff { old: maybe_base_file.cloned(), new: local_change };
966            updates.push(file_diff);
967        }
968
969        drop(tx);
970
971        if !updates.is_empty() {
972            self.client
973                .request(self.get_account()?, UpsertRequestV2 { updates: updates.clone() })
974                .await?;
975            ctx.pushed_metas = updates;
976        }
977
978        let mut tx = self.begin_tx().await;
979        let db = tx.db();
980
981        // base = local
982        (&mut db.base_metadata)
983            .to_lazy()
984            .stage(local_changes_no_digests)
985            .promote()?;
986        db.base_metadata.stage(&mut db.local_metadata).prune()?;
987
988        tx.end();
989
990        Ok(())
991    }
992
993    /// Updates remote and base files to local. Assumes metadata is already pushed for all new files.
994    async fn push_docs(&self, ctx: &mut SyncContext) -> LbResult<()> {
995        ctx.msg("Pushing document changes...");
996        let mut updates = vec![];
997        let mut local_changes_digests_only = vec![];
998
999        let tx = self.ro_tx().await;
1000        let db = tx.db();
1001        let start = Instant::now();
1002
1003        let local = db.base_metadata.stage(&db.local_metadata).to_lazy();
1004
1005        for id in local.tree.staged.ids() {
1006            let base_file = local.tree.base.find(&id)?.clone();
1007
1008            // change only document hmac and re-sign
1009            let mut local_change = base_file.timestamped_value.value.clone();
1010            local_change.set_hmac_and_size(
1011                local.find(&id)?.document_hmac().copied(),
1012                *local.find(&id)?.timestamped_value.value.doc_size(),
1013            );
1014
1015            if base_file.document_hmac() == local_change.document_hmac()
1016                || local_change.document_hmac().is_none()
1017            {
1018                continue;
1019            }
1020
1021            let local_change = local_change.sign(&self.keychain)?;
1022
1023            updates.push(FileDiff { old: Some(base_file), new: local_change.clone() });
1024            local_changes_digests_only.push(local_change);
1025            self.events.sync(SyncIncrement::PushingDocument(id, true));
1026        }
1027
1028        drop(tx);
1029        if start.elapsed() > std::time::Duration::from_millis(100) {
1030            warn!("sync push_docs held lock for {:?}", start.elapsed());
1031        }
1032
1033        let docs_count = updates.len();
1034        ctx.total += docs_count;
1035        let futures = updates.clone().into_iter().map(|diff| self.push_doc(diff));
1036
1037        let mut stream = stream::iter(futures).buffer_unordered(
1038            thread::available_parallelism()
1039                .unwrap_or(NonZeroUsize::new(4).unwrap())
1040                .into(),
1041        );
1042
1043        let mut idx = 0;
1044        while let Some(fut) = stream.next().await {
1045            let id = fut?;
1046            self.events.sync(SyncIncrement::PushingDocument(id, false));
1047            ctx.file_msg(id, &format!("Pushed file {idx} of {docs_count}."));
1048            idx += 1;
1049        }
1050        ctx.pushed_docs = updates;
1051
1052        let mut tx = self.begin_tx().await;
1053        let db = tx.db();
1054        // base = local (metadata)
1055        (&mut db.base_metadata)
1056            .to_lazy()
1057            .stage(local_changes_digests_only)
1058            .promote()?;
1059
1060        db.base_metadata.stage(&mut db.local_metadata).prune()?;
1061
1062        tx.end();
1063
1064        Ok(())
1065    }
1066
1067    async fn push_doc(&self, diff: FileDiff<SignedMeta>) -> LbResult<Uuid> {
1068        let id = *diff.new.id();
1069        let hmac = diff.new.document_hmac();
1070        let local_document_change = self.docs.get(id, hmac.copied()).await?;
1071        self.client
1072            .request(
1073                self.get_account()?,
1074                ChangeDocRequestV2 { diff, new_content: local_document_change },
1075            )
1076            .await?;
1077
1078        Ok(id)
1079    }
1080
1081    async fn dedup(
1082        &self, updates: GetUpdatesResponseV2,
1083    ) -> LbResult<(Vec<SignedMeta>, u64, Option<Uuid>)> {
1084        let tx = self.ro_tx().await;
1085        let db = tx.db();
1086
1087        let mut root_id = None;
1088        let (remote_changes, update_as_of) = {
1089            let mut remote_changes = updates.file_metadata;
1090            let update_as_of = updates.as_of_metadata_version;
1091
1092            remote_changes = self.prune_remote_orphans(remote_changes).await?;
1093
1094            let remote = db.base_metadata.stage(remote_changes).pruned()?.to_lazy();
1095
1096            let (_, remote_changes) = remote.unstage();
1097            (remote_changes, update_as_of)
1098        };
1099
1100        // initialize root if this is the first pull on this device
1101        if db.root.get().is_none() {
1102            let root = remote_changes
1103                .all_files()?
1104                .into_iter()
1105                .find(|f| f.is_root())
1106                .ok_or(LbErrKind::RootNonexistent)?;
1107            root_id = Some(*root.id());
1108        }
1109
1110        Ok((remote_changes, update_as_of, root_id))
1111    }
1112
1113    async fn prune_remote_orphans(
1114        &self, remote_changes: Vec<SignedMeta>,
1115    ) -> LbResult<Vec<SignedMeta>> {
1116        let tx = self.ro_tx().await;
1117        let db = tx.db();
1118
1119        let me = Owner(self.keychain.get_pk()?);
1120        let remote = db.base_metadata.stage(remote_changes).to_lazy();
1121        let mut result = Vec::new();
1122
1123        for id in remote.tree.staged.ids() {
1124            let meta = remote.find(&id)?;
1125            if remote.maybe_find_parent(meta).is_some()
1126                || meta
1127                    .user_access_keys()
1128                    .iter()
1129                    .any(|k| k.encrypted_for == me.0)
1130            {
1131                result.push(remote.find(&id)?.clone()); // todo: don't clone
1132            }
1133        }
1134        Ok(result)
1135    }
1136
1137    async fn commit_last_synced(&self, ctx: &mut SyncContext) -> LbResult<()> {
1138        ctx.msg("Cleaning up...");
1139        let mut tx = self.begin_tx().await;
1140        let db = tx.db();
1141        db.last_synced.insert(ctx.update_as_of as i64)?;
1142
1143        if let Some(root) = ctx.root {
1144            db.root.insert(root)?;
1145        }
1146
1147        Ok(())
1148    }
1149
1150    pub async fn get_last_synced_human(&self) -> LbResult<String> {
1151        let tx = self.ro_tx().await;
1152        let db = tx.db();
1153        let last_synced = db.last_synced.get().copied().unwrap_or(0);
1154
1155        Ok(self.get_timestamp_human_string(last_synced))
1156    }
1157
1158    pub fn get_timestamp_human_string(&self, timestamp: i64) -> String {
1159        if timestamp != 0 {
1160            Duration::milliseconds(clock::get_time().0 - timestamp)
1161                .format_human()
1162                .to_string()
1163        } else {
1164            "never".to_string()
1165        }
1166    }
1167}
1168
1169impl SyncContext {
1170    fn summarize(&self) -> SyncStatus {
1171        let mut local = HashSet::new();
1172        let mut server = HashSet::new();
1173        let mut work_units = vec![];
1174
1175        for meta in &self.pushed_metas {
1176            local.insert(meta.new.id());
1177        }
1178
1179        for meta in &self.pushed_docs {
1180            local.insert(meta.new.id());
1181        }
1182
1183        for meta in &self.remote_changes {
1184            server.insert(meta.id());
1185        }
1186
1187        for id in local {
1188            work_units.push(WorkUnit::LocalChange(*id));
1189        }
1190
1191        for id in server {
1192            work_units.push(WorkUnit::ServerChange(*id));
1193        }
1194
1195        SyncStatus { work_units, latest_server_ts: self.update_as_of }
1196    }
1197
1198    fn msg(&mut self, msg: &str) {
1199        self.current += 1;
1200        if let Some(f) = &self.progress {
1201            f(SyncProgress {
1202                total: self.total,
1203                progress: self.current,
1204                file_being_processed: Default::default(),
1205                msg: msg.to_string(),
1206            })
1207        }
1208    }
1209
1210    fn file_msg(&mut self, id: Uuid, msg: &str) {
1211        self.current += 1;
1212        if let Some(f) = &self.progress {
1213            f(SyncProgress {
1214                total: self.total,
1215                progress: self.current,
1216                file_being_processed: Some(id),
1217                msg: msg.to_string(),
1218            })
1219        }
1220    }
1221
1222    fn done_msg(&mut self) {
1223        self.current = self.total;
1224        if let Some(f) = &self.progress {
1225            f(SyncProgress {
1226                total: self.total,
1227                progress: self.current,
1228                file_being_processed: None,
1229                msg: "Sync successful!".to_string(),
1230            })
1231        }
1232    }
1233}
1234
1235#[derive(Debug, Serialize, Clone)]
1236pub struct SyncStatus {
1237    pub work_units: Vec<WorkUnit>,
1238    pub latest_server_ts: u64,
1239}
1240
1241#[derive(Clone)]
1242pub struct SyncProgress {
1243    pub total: usize,
1244    pub progress: usize,
1245    pub file_being_processed: Option<Uuid>,
1246    pub msg: String,
1247}
1248
1249impl Display for SyncProgress {
1250    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1251        write!(f, "[{} / {}]: {}", self.progress, self.total, self.msg)
1252    }
1253}
1254
1255#[derive(Debug, Clone)]
1256pub enum SyncIncrement {
1257    SyncStarted,
1258    PullingDocument(Uuid, bool),
1259    PushingDocument(Uuid, bool),
1260    SyncFinished(Option<LbErrKind>),
1261}