// lb_rs/service/sync.rs
1use crate::Lb;
2use crate::io::network::ApiError;
3use crate::model::access_info::UserAccessMode;
4use crate::model::api::{
5    ChangeDocRequestV2, GetDocRequest, GetFileIdsRequest, GetUpdatesRequestV2,
6    GetUpdatesResponseV2, GetUsernameError, GetUsernameRequest, UpsertRequestV2,
7};
8use crate::model::errors::{LbErrKind, LbResult};
9use crate::model::file::ShareMode;
10use crate::model::file_like::FileLike;
11use crate::model::file_metadata::{DocumentHmac, FileDiff, FileType, Owner};
12use crate::model::filename::{DocumentType, NameComponents};
13use crate::model::signed_meta::SignedMeta;
14use crate::model::staged::StagedTreeLikeMut;
15use crate::model::svg::buffer::u_transform_to_bezier;
16use crate::model::svg::element::Element;
17use crate::model::text::buffer::Buffer;
18use crate::model::tree_like::TreeLike;
19use crate::model::work_unit::WorkUnit;
20use crate::model::{ValidationFailure, clock, svg, symkey};
21pub use basic_human_duration::ChronoHumanDuration;
22use futures::{StreamExt, stream};
23use serde::Serialize;
24use std::collections::{HashMap, HashSet, hash_map};
25use std::fmt::{Display, Formatter};
26use std::num::NonZeroUsize;
27use std::sync::Arc;
28use std::sync::atomic::{AtomicBool, Ordering};
29use std::thread;
30use time::Duration;
31use usvg::Transform;
32use uuid::Uuid;
33use web_time::Instant;
34
35use super::events::Actor;
36
37#[cfg(not(target_family = "wasm"))]
38use crate::model::api::UpsertDebugInfoRequest;
39
/// Shared flag indicating whether a sync is currently in progress.
pub type SyncFlag = Arc<AtomicBool>;
41
/// Mutable state threaded through a single sync run: progress reporting,
/// values snapshotted at setup time, and a record of what was pushed/pulled.
pub struct SyncContext {
    // Optional callback invoked with progress updates as the sync advances.
    progress: Option<Box<dyn Fn(SyncProgress) + Send>>,
    // Progress steps completed so far.
    current: usize,
    // Total progress steps; grows as documents to pull are discovered.
    total: usize,

    // Cached owner -> username lookups, seeded from the db at setup.
    pk_cache: HashMap<Owner, String>,
    // Metadata version last synced to; used as the `since` for fetching updates.
    last_synced: u64,
    // Metadata received from the server during this sync.
    remote_changes: Vec<SignedMeta>,
    // Server timestamp the fetched updates are current as-of.
    update_as_of: u64,

    /// is this the sync that populated root?
    new_root: Option<Uuid>,
    // Metadata diffs pushed to the server during this sync.
    pushed_metas: Vec<FileDiff<SignedMeta>>,
    // Document diffs pushed to the server during this sync.
    pushed_docs: Vec<FileDiff<SignedMeta>>,
    // Ids of documents whose contents were pulled during this sync.
    pulled_docs: Vec<Uuid>,
}
58
59impl Lb {
60    #[instrument(level = "debug", skip_all, err(Debug))]
61    pub async fn calculate_work(&self) -> LbResult<SyncStatus> {
62        let tx = self.ro_tx().await;
63        let db = tx.db();
64        let last_synced = db.last_synced.get().copied().unwrap_or_default() as u64;
65        drop(tx);
66
67        let remote_changes = self
68            .client
69            .request(
70                self.get_account()?,
71                GetUpdatesRequestV2 { since_metadata_version: last_synced },
72            )
73            .await?;
74        let (deduped, latest_server_ts, _) = self.dedup(remote_changes).await?;
75        let remote_dirty = deduped
76            .into_iter()
77            .map(|f| *f.id())
78            .map(WorkUnit::ServerChange);
79
80        self.prune().await?;
81
82        let tx = self.ro_tx().await;
83        let db = tx.db();
84
85        let locally_dirty = db
86            .local_metadata
87            .get()
88            .keys()
89            .copied()
90            .map(WorkUnit::LocalChange);
91
92        let mut work_units: Vec<WorkUnit> = Vec::new();
93        work_units.extend(locally_dirty.chain(remote_dirty));
94        Ok(SyncStatus { work_units, latest_server_ts })
95    }
96
97    #[instrument(level = "debug", skip_all, err(Debug))]
98    pub async fn sync(&self, f: Option<Box<dyn Fn(SyncProgress) + Send>>) -> LbResult<SyncStatus> {
99        let old = self.syncing.swap(true, Ordering::SeqCst);
100        if old {
101            return Err(LbErrKind::AlreadySyncing.into());
102        }
103
104        let mut ctx = self.setup_sync(f).await?;
105
106        let mut got_updates = false;
107        let mut pipeline: LbResult<()> = async {
108            ctx.msg("Preparing Sync..."); // todo remove
109            self.events.sync(SyncIncrement::SyncStarted);
110            self.prune().await?;
111            got_updates = self.fetch_meta(&mut ctx).await?;
112            self.populate_pk_cache(&mut ctx).await?;
113            self.docs.dont_delete.store(true, Ordering::SeqCst);
114            self.fetch_docs(&mut ctx).await?;
115            self.merge(&mut ctx).await?;
116            self.push_meta(&mut ctx).await?;
117            self.push_docs(&mut ctx).await?;
118            Ok(())
119        }
120        .await;
121
122        self.docs.dont_delete.store(false, Ordering::SeqCst);
123
124        if pipeline.is_ok() {
125            pipeline = self.commit_last_synced(&mut ctx).await;
126        }
127
128        let cleanup = self.cleanup().await;
129
130        let ekind = pipeline.as_ref().err().map(|err| err.kind.clone());
131        self.events.sync(SyncIncrement::SyncFinished(ekind));
132
133        self.syncing.store(false, Ordering::Relaxed);
134        pipeline?;
135        cleanup?;
136
137        // done not being sent if pipeline is an error is likely the reason we get stuck offline
138        ctx.done_msg();
139
140        if got_updates {
141            // did it?
142            self.events.meta_changed();
143            let owner = self.keychain.get_pk().map(Owner).ok();
144            // this is overly agressive as it'll notify on shares that have been accepted
145            // another strategy could be to diff the changes coming before and after a sync
146            if ctx.remote_changes.iter().any(|f| Some(f.owner()) != owner) {
147                self.events.pending_shares_changed();
148            }
149            for id in &ctx.pulled_docs {
150                self.events.doc_written(*id, Some(Actor::Sync));
151            }
152        }
153
154        #[cfg(not(target_family = "wasm"))]
155        {
156            let account = self.get_account()?;
157            if account.is_beta() {
158                let debug_info = self.debug_info("".into(), false).await?;
159
160                self.client
161                    .request(account, UpsertDebugInfoRequest { debug_info })
162                    .await?;
163            }
164        }
165
166        Ok(ctx.summarize())
167    }
168
169    async fn setup_sync(
170        &self, progress: Option<Box<dyn Fn(SyncProgress) + Send>>,
171    ) -> LbResult<SyncContext> {
172        let tx = self.ro_tx().await;
173        let db = tx.db();
174
175        let last_synced = db.last_synced.get().copied().unwrap_or_default() as u64;
176        let pk_cache = db.pub_key_lookup.get().clone();
177
178        let current = 0;
179        let total = 7;
180
181        Ok(SyncContext {
182            last_synced,
183            pk_cache,
184
185            progress,
186            current,
187            total,
188
189            new_root: Default::default(),
190            update_as_of: Default::default(),
191            remote_changes: Default::default(),
192            pushed_docs: Default::default(),
193            pushed_metas: Default::default(),
194            pulled_docs: Default::default(),
195        })
196    }
197
198    async fn prune(&self) -> LbResult<()> {
199        let server_ids = self
200            .client
201            .request(self.get_account()?, GetFileIdsRequest {})
202            .await?
203            .ids;
204
205        let mut tx = self.begin_tx().await;
206        let db = tx.db();
207
208        let mut local = db.base_metadata.stage(&db.local_metadata).to_lazy();
209        let base_ids = local.tree.base.ids();
210
211        let mut prunable_ids = base_ids;
212        prunable_ids.retain(|id| !server_ids.contains(id));
213        for id in prunable_ids.clone() {
214            prunable_ids.extend(local.descendants(&id)?.into_iter());
215        }
216        for id in &prunable_ids {
217            if let Some(base_file) = local.tree.base.maybe_find(id) {
218                self.docs
219                    .delete(*id, base_file.document_hmac().copied())
220                    .await?;
221            }
222            if let Some(local_file) = local.maybe_find(id) {
223                self.docs
224                    .delete(*id, local_file.document_hmac().copied())
225                    .await?;
226            }
227        }
228
229        let mut base_staged = (&mut db.base_metadata).to_lazy().stage(None);
230        base_staged.tree.removed = prunable_ids.clone().into_iter().collect();
231        base_staged.promote()?;
232
233        let mut local_staged = (&mut db.local_metadata).to_lazy().stage(None);
234        local_staged.tree.removed = prunable_ids.into_iter().collect();
235        local_staged.promote()?;
236
237        Ok(())
238    }
239
240    /// Returns true if there were any updates
241    async fn fetch_meta(&self, ctx: &mut SyncContext) -> LbResult<bool> {
242        ctx.msg("Fetching tree updates...");
243        let updates = self
244            .client
245            .request(
246                self.get_account()?,
247                GetUpdatesRequestV2 { since_metadata_version: ctx.last_synced },
248            )
249            .await?;
250
251        let empty = updates.file_metadata.is_empty();
252        let (remote, as_of, root) = self.dedup(updates).await?;
253
254        ctx.remote_changes = remote;
255        ctx.update_as_of = as_of;
256        ctx.new_root = root;
257
258        Ok(!empty)
259    }
260
261    async fn populate_pk_cache(&self, ctx: &mut SyncContext) -> LbResult<()> {
262        ctx.msg("Updating public key cache...");
263        let mut all_owners = HashSet::new();
264        for file in &ctx.remote_changes {
265            for user_access_key in file.user_access_keys() {
266                all_owners.insert(Owner(user_access_key.encrypted_by));
267                all_owners.insert(Owner(user_access_key.encrypted_for));
268            }
269        }
270
271        let mut new_entries = HashMap::new();
272
273        for owner in all_owners {
274            if let hash_map::Entry::Vacant(e) = ctx.pk_cache.entry(owner) {
275                let username_result = self
276                    .client
277                    .request(self.get_account()?, GetUsernameRequest { key: owner.0 })
278                    .await;
279                let username = match username_result {
280                    Err(ApiError::Endpoint(GetUsernameError::UserNotFound)) => {
281                        "<unknown>".to_string()
282                    }
283                    _ => username_result?.username.clone(),
284                };
285                new_entries.insert(owner, username.clone());
286                e.insert(username.clone());
287            }
288        }
289
290        let mut tx = self.begin_tx().await;
291        let db = tx.db();
292
293        for (owner, username) in new_entries {
294            db.pub_key_lookup.insert(owner, username)?;
295        }
296        Ok(())
297    }
298
299    async fn fetch_docs(&self, ctx: &mut SyncContext) -> LbResult<()> {
300        ctx.msg("Fetching documents...");
301        let mut docs_to_pull = vec![];
302
303        let tx = self.ro_tx().await;
304        let db = tx.db();
305        let start = Instant::now();
306
307        let mut remote = db.base_metadata.stage(ctx.remote_changes.clone()).to_lazy(); // this used to be owned remote changes
308        for id in remote.tree.staged.ids() {
309            if remote.calculate_deleted(&id)? {
310                continue;
311            }
312            let remote_hmac = remote.find(&id)?.document_hmac().cloned();
313            let base_hmac = remote
314                .tree
315                .base
316                .maybe_find(&id)
317                .and_then(|f| f.document_hmac())
318                .cloned();
319            if base_hmac == remote_hmac {
320                continue;
321            }
322
323            if let Some(remote_hmac) = remote_hmac {
324                docs_to_pull.push((id, remote_hmac));
325                self.events.sync(SyncIncrement::PullingDocument(id, true));
326            }
327        }
328
329        drop(tx);
330        if start.elapsed() > web_time::Duration::from_millis(100) {
331            warn!("sync fetch_docs held lock for {:?}", start.elapsed());
332        }
333
334        let num_docs = docs_to_pull.len();
335        ctx.total += num_docs;
336
337        let futures = docs_to_pull
338            .into_iter()
339            .map(|(id, hmac)| self.fetch_doc(id, hmac));
340
341        let mut stream = stream::iter(futures).buffer_unordered(
342            thread::available_parallelism()
343                .unwrap_or(NonZeroUsize::new(4).unwrap())
344                .into(),
345        );
346
347        let mut idx = 0;
348        while let Some(fut) = stream.next().await {
349            let id = fut?;
350            ctx.pulled_docs.push(id);
351            self.events.sync(SyncIncrement::PullingDocument(id, false));
352            ctx.file_msg(id, &format!("Downloaded file {idx} of {num_docs}."));
353            idx += 1;
354        }
355        Ok(())
356    }
357
358    async fn fetch_doc(&self, id: Uuid, hmac: DocumentHmac) -> LbResult<Uuid> {
359        let remote_document = self
360            .client
361            .request(self.get_account()?, GetDocRequest { id, hmac })
362            .await?;
363        self.docs
364            .insert(id, Some(hmac), &remote_document.content)
365            .await?;
366
367        Ok(id)
368    }
369
370    /// Pulls remote changes and constructs a changeset Merge such that Stage<Stage<Stage<Base, Remote>, Local>, Merge> is valid.
371    /// Promotes Base to Stage<Base, Remote> and Local to Stage<Local, Merge>
372    async fn merge(&self, ctx: &mut SyncContext) -> LbResult<()> {
373        let mut tx = self.begin_tx().await;
374        let db = tx.db();
375        let start = Instant::now();
376
377        let remote_changes = &ctx.remote_changes;
378
379        // fetch document updates and local documents for merge
380        let me = Owner(self.keychain.get_pk()?);
381
382        // compute merge changes
383        let merge_changes = {
384            // assemble trees
385            let mut base = (&db.base_metadata).to_lazy();
386            let remote_unlazy = (&db.base_metadata).to_staged(remote_changes);
387            let mut remote = remote_unlazy.as_lazy();
388            let mut local = (&db.base_metadata).to_staged(&db.local_metadata).to_lazy();
389
390            // changeset constraints - these evolve as we try to assemble changes and encounter validation failures
391            let mut files_to_unmove: HashSet<Uuid> = HashSet::new();
392            let mut files_to_unshare: HashSet<Uuid> = HashSet::new();
393            let mut links_to_delete: HashSet<Uuid> = HashSet::new();
394            let mut rename_increments: HashMap<Uuid, usize> = HashMap::new();
395            let mut duplicate_file_ids: HashMap<Uuid, Uuid> = HashMap::new();
396
397            'merge_construction: loop {
398                // process just the edits which allow us to check deletions in the result
399                let mut deletions = {
400                    let mut deletions = remote_unlazy.stage(Vec::new()).to_lazy();
401
402                    // creations
403                    let mut deletion_creations = HashSet::new();
404                    for id in db.local_metadata.ids() {
405                        if remote.maybe_find(&id).is_none() && !links_to_delete.contains(&id) {
406                            deletion_creations.insert(id);
407                        }
408                    }
409                    'drain_creations: while !deletion_creations.is_empty() {
410                        'choose_a_creation: for id in &deletion_creations {
411                            // create
412                            let id = *id;
413                            let local_file = local.find(&id)?.clone();
414                            let result = deletions.create_unvalidated(
415                                id,
416                                symkey::generate_key(),
417                                local_file.parent(),
418                                &local.name(&id, &self.keychain)?,
419                                local_file.file_type(),
420                                &self.keychain,
421                            );
422                            match result {
423                                Ok(_) => {
424                                    deletion_creations.remove(&id);
425                                    continue 'drain_creations;
426                                }
427                                Err(ref err) => match err.kind {
428                                    LbErrKind::FileParentNonexistent => {
429                                        continue 'choose_a_creation;
430                                    }
431                                    _ => {
432                                        result?;
433                                    }
434                                },
435                            }
436                        }
437                        return Err(LbErrKind::Unexpected(format!(
438                            "sync failed to find a topomodelal order for file creations: {deletion_creations:?}"
439                        ))
440                        .into());
441                    }
442
443                    // moves (creations happen first in case a file is moved into a new folder)
444                    for id in db.local_metadata.ids() {
445                        let local_file = local.find(&id)?.clone();
446                        if let Some(base_file) = db.base_metadata.maybe_find(&id).cloned() {
447                            if !local_file.explicitly_deleted()
448                                && local_file.parent() != base_file.parent()
449                                && !files_to_unmove.contains(&id)
450                            {
451                                // move
452                                deletions.move_unvalidated(
453                                    &id,
454                                    local_file.parent(),
455                                    &self.keychain,
456                                )?;
457                            }
458                        }
459                    }
460
461                    // deletions (moves happen first in case a file is moved into a deleted folder)
462                    for id in db.local_metadata.ids() {
463                        let local_file = local.find(&id)?.clone();
464                        if local_file.explicitly_deleted() {
465                            // delete
466                            deletions.delete_unvalidated(&id, &self.keychain)?;
467                        }
468                    }
469                    deletions
470                };
471
472                // process all edits, dropping non-deletion edits for files that will be implicitly deleted
473                let mut merge = {
474                    let mut merge = remote_unlazy.stage(Vec::new()).to_lazy();
475
476                    // creations and edits of created documents
477                    let mut creations = HashSet::new();
478                    for id in db.local_metadata.ids() {
479                        if deletions.maybe_find(&id).is_some()
480                            && !deletions.calculate_deleted(&id)?
481                            && remote.maybe_find(&id).is_none()
482                            && !links_to_delete.contains(&id)
483                        {
484                            creations.insert(id);
485                        }
486                    }
487                    'drain_creations: while !creations.is_empty() {
488                        'choose_a_creation: for id in &creations {
489                            // create
490                            let id = *id;
491                            let local_file = local.find(&id)?.clone();
492                            let result = merge.create_unvalidated(
493                                id,
494                                local.decrypt_key(&id, &self.keychain)?,
495                                local_file.parent(),
496                                &local.name(&id, &self.keychain)?,
497                                local_file.file_type(),
498                                &self.keychain,
499                            );
500                            match result {
501                                Ok(_) => {
502                                    creations.remove(&id);
503                                    continue 'drain_creations;
504                                }
505                                Err(ref err) => match err.kind {
506                                    LbErrKind::FileParentNonexistent => {
507                                        continue 'choose_a_creation;
508                                    }
509                                    _ => {
510                                        result?;
511                                    }
512                                },
513                            }
514                        }
515                        return Err(LbErrKind::Unexpected(format!(
516                            "sync failed to find a topomodelal order for file creations: {creations:?}"
517                        ))
518                        .into());
519                    }
520
521                    // moves, renames, edits, and shares
522                    // creations happen first in case a file is moved into a new folder
523                    for id in db.local_metadata.ids() {
524                        // skip files that are already deleted or will be deleted
525                        if deletions.maybe_find(&id).is_none()
526                            || deletions.calculate_deleted(&id)?
527                            || (remote.maybe_find(&id).is_some()
528                                && remote.calculate_deleted(&id)?)
529                        {
530                            continue;
531                        }
532
533                        let local_file = local.find(&id)?.clone();
534                        let local_name = local.name(&id, &self.keychain)?;
535                        let maybe_base_file = base.maybe_find(&id).cloned();
536                        let maybe_remote_file = remote.maybe_find(&id).cloned();
537                        if let Some(ref base_file) = maybe_base_file {
538                            let base_name = base.name(&id, &self.keychain)?;
539                            let remote_file = remote.find(&id)?.clone();
540                            let remote_name = remote.name(&id, &self.keychain)?;
541
542                            // move
543                            if local_file.parent() != base_file.parent()
544                                && remote_file.parent() == base_file.parent()
545                                && !files_to_unmove.contains(&id)
546                            {
547                                merge.move_unvalidated(&id, local_file.parent(), &self.keychain)?;
548                            }
549
550                            // rename
551                            if local_name != base_name && remote_name == base_name {
552                                merge.rename_unvalidated(&id, &local_name, &self.keychain)?;
553                            }
554                        }
555
556                        // share
557                        let mut remote_keys = HashMap::new();
558                        if let Some(ref remote_file) = maybe_remote_file {
559                            for key in remote_file.user_access_keys() {
560                                remote_keys.insert(
561                                    (Owner(key.encrypted_by), Owner(key.encrypted_for)),
562                                    (key.mode, key.deleted),
563                                );
564                            }
565                        }
566                        for key in local_file.user_access_keys() {
567                            let (by, for_) = (Owner(key.encrypted_by), Owner(key.encrypted_for));
568                            if let Some(&(remote_mode, remote_deleted)) =
569                                remote_keys.get(&(by, for_))
570                            {
571                                // upgrade share
572                                if key.mode > remote_mode || !key.deleted && remote_deleted {
573                                    let mode = match key.mode {
574                                        UserAccessMode::Read => ShareMode::Read,
575                                        UserAccessMode::Write => ShareMode::Write,
576                                        UserAccessMode::Owner => continue,
577                                    };
578                                    merge.add_share_unvalidated(id, for_, mode, &self.keychain)?;
579                                }
580                                // delete share
581                                if key.deleted && !remote_deleted {
582                                    merge.delete_share_unvalidated(
583                                        &id,
584                                        Some(for_.0),
585                                        &self.keychain,
586                                    )?;
587                                }
588                            } else {
589                                // add share
590                                let mode = match key.mode {
591                                    UserAccessMode::Read => ShareMode::Read,
592                                    UserAccessMode::Write => ShareMode::Write,
593                                    UserAccessMode::Owner => continue,
594                                };
595                                merge.add_share_unvalidated(id, for_, mode, &self.keychain)?;
596                            }
597                        }
598
599                        // share deletion due to conflicts
600                        if files_to_unshare.contains(&id) {
601                            merge.delete_share_unvalidated(&id, None, &self.keychain)?;
602                        }
603
604                        // rename due to path conflict
605                        if let Some(&rename_increment) = rename_increments.get(&id) {
606                            let name = NameComponents::from(&local_name)
607                                .generate_incremented(rename_increment)
608                                .to_name();
609                            merge.rename_unvalidated(&id, &name, &self.keychain)?;
610                        }
611
612                        // edit
613                        let base_hmac = maybe_base_file.and_then(|f| f.document_hmac().cloned());
614                        let remote_hmac =
615                            maybe_remote_file.and_then(|f| f.document_hmac().cloned());
616                        let local_hmac = local_file.document_hmac().cloned();
617                        if merge.access_mode(me, &id)? >= Some(UserAccessMode::Write)
618                            && local_hmac != base_hmac
619                        {
620                            if remote_hmac != base_hmac && remote_hmac != local_hmac {
621                                // merge
622                                let merge_name = merge.name(&id, &self.keychain)?;
623                                let document_type =
624                                    DocumentType::from_file_name_using_extension(&merge_name);
625
626                                // todo these accesses are potentially problematic
627                                // maybe not if service/docs is the persion doing network io
628                                let base_document =
629                                    self.read_document_helper(id, &mut base).await?;
630                                let remote_document =
631                                    self.read_document_helper(id, &mut remote).await?;
632                                let local_document =
633                                    self.read_document_helper(id, &mut local).await?;
634
635                                match document_type {
636                                    DocumentType::Text => {
637                                        // 3-way merge
638                                        // todo: a couple more clones than necessary
639                                        let base_document =
640                                            String::from_utf8_lossy(&base_document).to_string();
641                                        let remote_document =
642                                            String::from_utf8_lossy(&remote_document).to_string();
643                                        let local_document =
644                                            String::from_utf8_lossy(&local_document).to_string();
645                                        let merged_document = Buffer::from(base_document.as_str())
646                                            .merge(local_document, remote_document);
647                                        let encrypted_document = merge
648                                            .update_document_unvalidated(
649                                                &id,
650                                                &merged_document.into_bytes(),
651                                                &self.keychain,
652                                            )?;
653                                        let hmac = merge.find(&id)?.document_hmac().copied();
654                                        self.docs.insert(id, hmac, &encrypted_document).await?;
655                                    }
656                                    DocumentType::Drawing => {
657                                        let base_document =
658                                            String::from_utf8_lossy(&base_document).to_string();
659                                        let remote_document =
660                                            String::from_utf8_lossy(&remote_document).to_string();
661                                        let local_document =
662                                            String::from_utf8_lossy(&local_document).to_string();
663
664                                        let base_buffer = svg::buffer::Buffer::new(&base_document);
665                                        let remote_buffer =
666                                            svg::buffer::Buffer::new(&remote_document);
667                                        let mut local_buffer =
668                                            svg::buffer::Buffer::new(&local_document);
669
670                                        for (_, el) in local_buffer.elements.iter_mut() {
671                                            if let Element::Path(path) = el {
672                                                path.data.apply_transform(u_transform_to_bezier(
673                                                    &Transform::from(
674                                                        local_buffer
675                                                            .weak_viewport_settings
676                                                            .master_transform,
677                                                    ),
678                                                ));
679                                            }
680                                        }
681                                        svg::buffer::Buffer::reload(
682                                            &mut local_buffer.elements,
683                                            &mut local_buffer.weak_images,
684                                            &mut local_buffer.weak_path_pressures,
685                                            &mut local_buffer.weak_viewport_settings,
686                                            &base_buffer,
687                                            &remote_buffer,
688                                        );
689
690                                        let merged_document = local_buffer.serialize();
691                                        let encrypted_document = merge
692                                            .update_document_unvalidated(
693                                                &id,
694                                                &merged_document.into_bytes(),
695                                                &self.keychain,
696                                            )?;
697                                        let hmac = merge.find(&id)?.document_hmac().copied();
698                                        self.docs.insert(id, hmac, &encrypted_document).await?;
699                                    }
700                                    DocumentType::Other => {
701                                        // duplicate file
702                                        let merge_parent = *merge.find(&id)?.parent();
703                                        let duplicate_id = if let Some(&duplicate_id) =
704                                            duplicate_file_ids.get(&id)
705                                        {
706                                            duplicate_id
707                                        } else {
708                                            let duplicate_id = Uuid::new_v4();
709                                            duplicate_file_ids.insert(id, duplicate_id);
710                                            rename_increments.insert(duplicate_id, 1);
711                                            duplicate_id
712                                        };
713
714                                        let mut merge_name = merge_name;
715                                        merge_name = NameComponents::from(&merge_name)
716                                            .generate_incremented(
717                                                rename_increments
718                                                    .get(&duplicate_id)
719                                                    .copied()
720                                                    .unwrap_or_default(),
721                                            )
722                                            .to_name();
723
724                                        merge.create_unvalidated(
725                                            duplicate_id,
726                                            symkey::generate_key(),
727                                            &merge_parent,
728                                            &merge_name,
729                                            FileType::Document,
730                                            &self.keychain,
731                                        )?;
732                                        let encrypted_document = merge
733                                            .update_document_unvalidated(
734                                                &duplicate_id,
735                                                &local_document,
736                                                &self.keychain,
737                                            )?;
738                                        let duplicate_hmac =
739                                            merge.find(&duplicate_id)?.document_hmac().copied();
740                                        self.docs
741                                            .insert(
742                                                duplicate_id,
743                                                duplicate_hmac,
744                                                &encrypted_document,
745                                            )
746                                            .await?;
747                                    }
748                                }
749                            } else {
750                                // overwrite (todo: avoid reading/decrypting/encrypting document)
751                                let document = self.read_document_helper(id, &mut local).await?;
752                                merge.update_document_unvalidated(
753                                    &id,
754                                    &document,
755                                    &self.keychain,
756                                )?;
757                            }
758                        }
759                    }
760
761                    // deletes
762                    // moves happen first in case a file is moved into a deleted folder
763                    for id in db.local_metadata.ids() {
764                        if db.base_metadata.maybe_find(&id).is_some()
765                            && deletions.calculate_deleted(&id)?
766                            && !merge.calculate_deleted(&id)?
767                        {
768                            // delete
769                            merge.delete_unvalidated(&id, &self.keychain)?;
770                        }
771                    }
772                    for &id in &links_to_delete {
773                        // delete
774                        if merge.maybe_find(&id).is_some() && !merge.calculate_deleted(&id)? {
775                            merge.delete_unvalidated(&id, &self.keychain)?;
776                        }
777                    }
778
779                    merge
780                };
781
782                // validate; handle failures by introducing changeset constraints
783                for link in merge.ids() {
784                    if !merge.calculate_deleted(&link)? {
785                        if let FileType::Link { target } = merge.find(&link)?.file_type() {
786                            if merge.maybe_find(&target).is_some()
787                                && merge.calculate_deleted(&target)?
788                            {
789                                // delete links to deleted files
790                                if links_to_delete.insert(link) {
791                                    continue 'merge_construction;
792                                } else {
793                                    return Err(LbErrKind::Unexpected(format!(
794                                        "sync failed to resolve broken link (deletion): {link:?}"
795                                    ))
796                                    .into());
797                                }
798                            }
799                        }
800                    }
801                }
802
803                let validate_result = merge.validate(me);
804                match validate_result {
805                    // merge changeset is valid
806                    Ok(_) => {
807                        let (_, merge_changes) = merge.unstage();
808                        break merge_changes;
809                    }
810                    Err(ref err) => match err.kind {
811                        LbErrKind::Validation(ref vf) => match vf {
812                            // merge changeset has resolvable validation errors and needs modification
813                            ValidationFailure::Cycle(ids) => {
814                                // revert all local moves in the cycle
815                                let mut progress = false;
816                                for &id in ids {
817                                    if db.local_metadata.maybe_find(&id).is_some()
818                                        && files_to_unmove.insert(id)
819                                    {
820                                        progress = true;
821                                    }
822                                }
823                                if !progress {
824                                    return Err(LbErrKind::Unexpected(format!(
825                                        "sync failed to resolve cycle: {ids:?}"
826                                    ))
827                                    .into());
828                                }
829                            }
830                            ValidationFailure::PathConflict(ids) => {
831                                // pick one local id and generate a non-conflicting filename
832                                let mut progress = false;
833                                for &id in ids {
834                                    if duplicate_file_ids.values().any(|&dup| dup == id) {
835                                        *rename_increments.entry(id).or_insert(0) += 1;
836                                        progress = true;
837                                        break;
838                                    }
839                                }
840                                if !progress {
841                                    for &id in ids {
842                                        if db.local_metadata.maybe_find(&id).is_some() {
843                                            *rename_increments.entry(id).or_insert(0) += 1;
844                                            progress = true;
845                                            break;
846                                        }
847                                    }
848                                }
849                                if !progress {
850                                    return Err(LbErrKind::Unexpected(format!(
851                                        "sync failed to resolve path conflict: {ids:?}"
852                                    ))
853                                    .into());
854                                }
855                            }
856                            ValidationFailure::SharedLink { link, shared_ancestor } => {
857                                // if ancestor is newly shared, delete share, otherwise delete link
858                                let mut progress = false;
859                                if let Some(base_shared_ancestor) = base.maybe_find(shared_ancestor)
860                                {
861                                    if !base_shared_ancestor.is_shared()
862                                        && files_to_unshare.insert(*shared_ancestor)
863                                    {
864                                        progress = true;
865                                    }
866                                }
867                                if !progress && links_to_delete.insert(*link) {
868                                    progress = true;
869                                }
870                                if !progress {
871                                    return Err(LbErrKind::Unexpected(format!(
872                                    "sync failed to resolve shared link: link: {link:?}, shared_ancestor: {shared_ancestor:?}"
873                                )).into());
874                                }
875                            }
876                            ValidationFailure::DuplicateLink { target } => {
877                                // delete local link with this target
878                                let mut progress = false;
879                                if let Some(link) = local.linked_by(target)? {
880                                    if links_to_delete.insert(link) {
881                                        progress = true;
882                                    }
883                                }
884                                if !progress {
885                                    return Err(LbErrKind::Unexpected(format!(
886                                        "sync failed to resolve duplicate link: target: {target:?}"
887                                    ))
888                                    .into());
889                                }
890                            }
891                            ValidationFailure::BrokenLink(link) => {
892                                // delete local link with this target
893                                if !links_to_delete.insert(*link) {
894                                    return Err(LbErrKind::Unexpected(format!(
895                                        "sync failed to resolve broken link: {link:?}"
896                                    ))
897                                    .into());
898                                }
899                            }
900                            ValidationFailure::OwnedLink(link) => {
901                                // if target is newly owned, unmove target, otherwise delete link
902                                let mut progress = false;
903                                if let Some(remote_link) = remote.maybe_find(link) {
904                                    if let FileType::Link { target } = remote_link.file_type() {
905                                        let remote_target = remote.find(&target)?;
906                                        if remote_target.owner() != me
907                                            && files_to_unmove.insert(target)
908                                        {
909                                            progress = true;
910                                        }
911                                    }
912                                }
913                                if !progress && links_to_delete.insert(*link) {
914                                    progress = true;
915                                }
916                                if !progress {
917                                    return Err(LbErrKind::Unexpected(format!(
918                                        "sync failed to resolve owned link: {link:?}"
919                                    ))
920                                    .into());
921                                }
922                            }
923                            // merge changeset has unexpected validation errors
924                            ValidationFailure::Orphan(_)
925                            | ValidationFailure::NonFolderWithChildren(_)
926                            | ValidationFailure::FileWithDifferentOwnerParent(_)
927                            | ValidationFailure::FileNameTooLong(_)
928                            | ValidationFailure::DeletedFileUpdated(_)
929                            | ValidationFailure::NonDecryptableFileName(_) => {
930                                validate_result?;
931                            }
932                        },
933                        // merge changeset has unexpected errors
934                        _ => {
935                            validate_result?;
936                        }
937                    },
938                }
939            }
940        };
941
942        // base = remote; local = merge
943        (&mut db.base_metadata)
944            .to_staged(remote_changes.clone())
945            .to_lazy()
946            .promote()?;
947        db.local_metadata.clear()?;
948        (&mut db.local_metadata)
949            .to_staged(merge_changes)
950            .to_lazy()
951            .promote()?;
952
953        // todo who else calls this did they manage locks right?
954        // self.cleanup_local_metadata()?;
955        db.base_metadata.stage(&mut db.local_metadata).prune()?;
956
957        if start.elapsed() > web_time::Duration::from_millis(100) {
958            warn!("sync merge held lock for {:?}", start.elapsed());
959        }
960
961        Ok(())
962    }
963
    /// Updates remote and base metadata to local.
    ///
    /// Re-signs every locally changed metadata entry while preserving the
    /// base document hmac/size (document content changes are pushed
    /// separately by `push_docs`), upserts the diffs to the server, then
    /// promotes the signed changes into base metadata and prunes.
    async fn push_meta(&self, ctx: &mut SyncContext) -> LbResult<()> {
        ctx.msg("Pushing tree changes...");
        let mut updates = vec![];
        let mut local_changes_no_digests = Vec::new();

        let tx = self.ro_tx().await;
        let db = tx.db();

        // remote = local
        let local = db.base_metadata.stage(&db.local_metadata).to_lazy();

        for id in local.tree.staged.ids() {
            let mut local_change = local.tree.staged.find(&id)?.timestamped_value.value.clone();
            let maybe_base_file = local.tree.base.maybe_find(&id);

            // change everything but document hmac and re-sign
            local_change.set_hmac_and_size(
                maybe_base_file.and_then(|f| f.document_hmac().copied()),
                maybe_base_file.and_then(|f| *f.timestamped_value.value.doc_size()),
            );
            let local_change = local_change.sign(&self.keychain)?;

            local_changes_no_digests.push(local_change.clone());
            // `old` is None for files created locally since the last sync
            let file_diff = FileDiff { old: maybe_base_file.cloned(), new: local_change };
            updates.push(file_diff);
        }

        // release the read lock before the network round-trip
        drop(tx);

        if !updates.is_empty() {
            self.client
                .request(self.get_account()?, UpsertRequestV2 { updates: updates.clone() })
                .await?;
            ctx.pushed_metas = updates;
        }

        let mut tx = self.begin_tx().await;
        let db = tx.db();

        // base = local
        (&mut db.base_metadata)
            .to_lazy()
            .stage(local_changes_no_digests)
            .promote()?;
        db.base_metadata.stage(&mut db.local_metadata).prune()?;

        tx.end();

        Ok(())
    }
1015
1016    /// Updates remote and base files to local. Assumes metadata is already pushed for all new files.
1017    async fn push_docs(&self, ctx: &mut SyncContext) -> LbResult<()> {
1018        ctx.msg("Pushing document changes...");
1019        let mut updates = vec![];
1020        let mut local_changes_digests_only = vec![];
1021
1022        let tx = self.ro_tx().await;
1023        let db = tx.db();
1024        let start = Instant::now();
1025
1026        let local = db.base_metadata.stage(&db.local_metadata).to_lazy();
1027
1028        for id in local.tree.staged.ids() {
1029            let base_file = local.tree.base.find(&id)?.clone();
1030
1031            // change only document hmac and re-sign
1032            let mut local_change = base_file.timestamped_value.value.clone();
1033            local_change.set_hmac_and_size(
1034                local.find(&id)?.document_hmac().copied(),
1035                *local.find(&id)?.timestamped_value.value.doc_size(),
1036            );
1037
1038            if base_file.document_hmac() == local_change.document_hmac()
1039                || local_change.document_hmac().is_none()
1040            {
1041                continue;
1042            }
1043
1044            let local_change = local_change.sign(&self.keychain)?;
1045
1046            updates.push(FileDiff { old: Some(base_file), new: local_change.clone() });
1047            local_changes_digests_only.push(local_change);
1048            self.events.sync(SyncIncrement::PushingDocument(id, true));
1049        }
1050
1051        drop(tx);
1052        if start.elapsed() > web_time::Duration::from_millis(100) {
1053            warn!("sync push_docs held lock for {:?}", start.elapsed());
1054        }
1055
1056        let docs_count = updates.len();
1057        ctx.total += docs_count;
1058        let futures = updates.clone().into_iter().map(|diff| self.push_doc(diff));
1059
1060        let mut stream = stream::iter(futures).buffer_unordered(
1061            thread::available_parallelism()
1062                .unwrap_or(NonZeroUsize::new(4).unwrap())
1063                .into(),
1064        );
1065
1066        let mut idx = 0;
1067        while let Some(fut) = stream.next().await {
1068            let id = fut?;
1069            self.events.sync(SyncIncrement::PushingDocument(id, false));
1070            ctx.file_msg(id, &format!("Pushed file {idx} of {docs_count}."));
1071            idx += 1;
1072        }
1073        ctx.pushed_docs = updates;
1074
1075        let mut tx = self.begin_tx().await;
1076        let db = tx.db();
1077        // base = local (metadata)
1078        (&mut db.base_metadata)
1079            .to_lazy()
1080            .stage(local_changes_digests_only)
1081            .promote()?;
1082
1083        db.base_metadata.stage(&mut db.local_metadata).prune()?;
1084
1085        tx.end();
1086
1087        Ok(())
1088    }
1089
1090    async fn push_doc(&self, diff: FileDiff<SignedMeta>) -> LbResult<Uuid> {
1091        let id = *diff.new.id();
1092        let hmac = diff.new.document_hmac();
1093        let local_document_change = self.docs.get(id, hmac.copied()).await?;
1094        self.client
1095            .request(
1096                self.get_account()?,
1097                ChangeDocRequestV2 { diff, new_content: local_document_change },
1098            )
1099            .await?;
1100
1101        Ok(id)
1102    }
1103
    /// Filters server updates down to those relevant to this device,
    /// returning the deduplicated changes, the server metadata version they
    /// represent, and the root id if this is the device's first pull.
    async fn dedup(
        &self, updates: GetUpdatesResponseV2,
    ) -> LbResult<(Vec<SignedMeta>, u64, Option<Uuid>)> {
        let tx = self.ro_tx().await;
        let db = tx.db();

        let mut root_id = None;
        let (remote_changes, update_as_of) = {
            let mut remote_changes = updates.file_metadata;
            let update_as_of = updates.as_of_metadata_version;

            // keep only files we can place in the tree: those with a known
            // parent, or those shared directly with this user
            remote_changes = {
                let me = Owner(self.keychain.get_pk()?);
                let remote = db.base_metadata.stage(remote_changes).to_lazy();
                let mut result = Vec::new();

                for id in remote.tree.staged.ids() {
                    let meta = remote.find(&id)?;
                    if remote.maybe_find_parent(meta).is_some()
                        || meta
                            .user_access_keys()
                            .iter()
                            .any(|k| k.encrypted_for == me.0)
                    {
                        result.push(remote.find(&id)?.clone()); // todo: don't clone
                    }
                }

                result
            };

            // prune the staged changes before extracting them —
            // NOTE(review): presumably drops entries unreachable from a root;
            // confirm against the `pruned` implementation
            let remote = db.base_metadata.stage(remote_changes).pruned()?.to_lazy();

            let (_, remote_changes) = remote.unstage();
            (remote_changes, update_as_of)
        };

        // initialize root if this is the first pull on this device
        if db.root.get().is_none() {
            let root = remote_changes
                .all_files()?
                .into_iter()
                .find(|f| f.is_root())
                .ok_or(LbErrKind::RootNonexistent)?;
            root_id = Some(*root.id());
        }

        Ok((remote_changes, update_as_of, root_id))
    }
1153
1154    async fn commit_last_synced(&self, ctx: &mut SyncContext) -> LbResult<()> {
1155        ctx.msg("Cleaning up...");
1156        let mut tx = self.begin_tx().await;
1157        let db = tx.db();
1158        db.last_synced.insert(ctx.update_as_of as i64)?;
1159
1160        if let Some(root) = ctx.new_root {
1161            db.root.insert(root)?;
1162        }
1163
1164        Ok(())
1165    }
1166
1167    pub async fn get_last_synced_human(&self) -> LbResult<String> {
1168        let tx = self.ro_tx().await;
1169        let db = tx.db();
1170        let last_synced = db.last_synced.get().copied().unwrap_or(0);
1171
1172        Ok(self.get_timestamp_human_string(last_synced))
1173    }
1174
1175    pub fn get_timestamp_human_string(&self, timestamp: i64) -> String {
1176        if timestamp != 0 {
1177            Duration::milliseconds(clock::get_time().0 - timestamp)
1178                .format_human()
1179                .to_string()
1180        } else {
1181            "never".to_string()
1182        }
1183    }
1184}
1185
1186impl SyncContext {
1187    fn summarize(&self) -> SyncStatus {
1188        let mut local = HashSet::new();
1189        let mut server = HashSet::new();
1190        let mut work_units = vec![];
1191
1192        for meta in &self.pushed_metas {
1193            local.insert(meta.new.id());
1194        }
1195
1196        for meta in &self.pushed_docs {
1197            local.insert(meta.new.id());
1198        }
1199
1200        for meta in &self.remote_changes {
1201            server.insert(meta.id());
1202        }
1203
1204        for id in local {
1205            work_units.push(WorkUnit::LocalChange(*id));
1206        }
1207
1208        for id in server {
1209            work_units.push(WorkUnit::ServerChange(*id));
1210        }
1211
1212        SyncStatus { work_units, latest_server_ts: self.update_as_of }
1213    }
1214
1215    fn msg(&mut self, msg: &str) {
1216        self.current += 1;
1217        if let Some(f) = &self.progress {
1218            f(SyncProgress {
1219                total: self.total,
1220                progress: self.current,
1221                file_being_processed: Default::default(),
1222                msg: msg.to_string(),
1223            })
1224        }
1225    }
1226
1227    fn file_msg(&mut self, id: Uuid, msg: &str) {
1228        self.current += 1;
1229        if let Some(f) = &self.progress {
1230            f(SyncProgress {
1231                total: self.total,
1232                progress: self.current,
1233                file_being_processed: Some(id),
1234                msg: msg.to_string(),
1235            })
1236        }
1237    }
1238
1239    fn done_msg(&mut self) {
1240        self.current = self.total;
1241        if let Some(f) = &self.progress {
1242            f(SyncProgress {
1243                total: self.total,
1244                progress: self.current,
1245                file_being_processed: None,
1246                msg: "Sync successful!".to_string(),
1247            })
1248        }
1249    }
1250}
1251
/// Summary of the work a sync performed (or, when dry-run, would perform).
#[derive(Debug, Serialize, Clone)]
pub struct SyncStatus {
    /// One entry per file changed locally or on the server.
    pub work_units: Vec<WorkUnit>,
    /// The server metadata version this status reflects.
    pub latest_server_ts: u64,
}
1257
/// A point-in-time progress report delivered to the sync progress callback.
#[derive(Clone)]
pub struct SyncProgress {
    /// Total number of progress steps expected for this sync.
    pub total: usize,
    /// Steps completed so far (increases up to `total`).
    pub progress: usize,
    /// The file this step concerns, if any.
    pub file_being_processed: Option<Uuid>,
    /// Human-readable description of the current step.
    pub msg: String,
}
1265
1266impl Display for SyncProgress {
1267    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1268        write!(f, "[{} / {}]: {}", self.progress, self.total, self.msg)
1269    }
1270}
1271
/// Fine-grained sync lifecycle events published to subscribers.
#[derive(Debug, Clone)]
pub enum SyncIncrement {
    /// A sync has begun.
    SyncStarted,
    /// A document download; the flag is presumably true when the transfer
    /// starts and false when it completes — TODO confirm against the pull path.
    PullingDocument(Uuid, bool),
    /// A document upload; true when the upload is queued, false once complete.
    PushingDocument(Uuid, bool),
    /// A sync has finished, carrying the error kind if one occurred.
    SyncFinished(Option<LbErrKind>),
}