Skip to main content

lb_rs/service/
sync.rs

1use crate::Lb;
2use crate::io::network::ApiError;
3use crate::model::access_info::UserAccessMode;
4use crate::model::api::{
5    ChangeDocRequestV2, GetDocRequest, GetFileIdsRequest, GetUpdatesRequestV2,
6    GetUpdatesResponseV2, GetUsernameError, GetUsernameRequest, UpsertDebugInfoRequest,
7    UpsertRequestV2,
8};
9use crate::model::errors::{LbErrKind, LbResult};
10use crate::model::file::ShareMode;
11use crate::model::file_like::FileLike;
12use crate::model::file_metadata::{DocumentHmac, FileDiff, FileType, Owner};
13use crate::model::filename::{DocumentType, NameComponents};
14use crate::model::signed_meta::SignedMeta;
15use crate::model::staged::StagedTreeLikeMut;
16use crate::model::svg::buffer::u_transform_to_bezier;
17use crate::model::svg::element::Element;
18use crate::model::text::buffer::Buffer;
19use crate::model::tree_like::TreeLike;
20use crate::model::work_unit::WorkUnit;
21use crate::model::{ValidationFailure, clock, svg, symkey};
22pub use basic_human_duration::ChronoHumanDuration;
23use futures::{StreamExt, stream};
24use serde::Serialize;
25use std::collections::{HashMap, HashSet, hash_map};
26use std::fmt::{Display, Formatter};
27use std::num::NonZeroUsize;
28use std::sync::Arc;
29use std::sync::atomic::{AtomicBool, Ordering};
30use std::thread;
31use std::time::Instant;
32use time::Duration;
33use usvg::Transform;
34use uuid::Uuid;
35
36use super::events::Actor;
37
/// A boolean flag that can be shared between owners and updated atomically.
// NOTE(review): presumably the type behind flags such as `Lb::syncing` — confirm at the
// field's declaration.
pub type SyncFlag = Arc<AtomicBool>;
39
40pub struct SyncContext {
41    progress: Option<Box<dyn Fn(SyncProgress) + Send>>,
42    current: usize,
43    total: usize,
44
45    pk_cache: HashMap<Owner, String>,
46    last_synced: u64,
47    remote_changes: Vec<SignedMeta>,
48    update_as_of: u64,
49
50    /// is this the sync that populated root?
51    new_root: Option<Uuid>,
52    pushed_metas: Vec<FileDiff<SignedMeta>>,
53    pushed_docs: Vec<FileDiff<SignedMeta>>,
54    pulled_docs: Vec<Uuid>,
55}
56
57impl Lb {
58    #[instrument(level = "debug", skip_all, err(Debug))]
59    pub async fn calculate_work(&self) -> LbResult<SyncStatus> {
60        let tx = self.ro_tx().await;
61        let db = tx.db();
62        let last_synced = db.last_synced.get().copied().unwrap_or_default() as u64;
63        drop(tx);
64
65        let remote_changes = self
66            .client
67            .request(
68                self.get_account()?,
69                GetUpdatesRequestV2 { since_metadata_version: last_synced },
70            )
71            .await?;
72        let (deduped, latest_server_ts, _) = self.dedup(remote_changes).await?;
73        let remote_dirty = deduped
74            .into_iter()
75            .map(|f| *f.id())
76            .map(WorkUnit::ServerChange);
77
78        self.prune().await?;
79
80        let tx = self.ro_tx().await;
81        let db = tx.db();
82
83        let locally_dirty = db
84            .local_metadata
85            .get()
86            .keys()
87            .copied()
88            .map(WorkUnit::LocalChange);
89
90        let mut work_units: Vec<WorkUnit> = Vec::new();
91        work_units.extend(locally_dirty.chain(remote_dirty));
92        Ok(SyncStatus { work_units, latest_server_ts })
93    }
94
95    #[instrument(level = "debug", skip_all, err(Debug))]
96    pub async fn sync(&self, f: Option<Box<dyn Fn(SyncProgress) + Send>>) -> LbResult<SyncStatus> {
97        let old = self.syncing.swap(true, Ordering::SeqCst);
98        if old {
99            return Err(LbErrKind::AlreadySyncing.into());
100        }
101
102        let mut ctx = self.setup_sync(f).await?;
103
104        let mut got_updates = false;
105        let mut pipeline: LbResult<()> = async {
106            ctx.msg("Preparing Sync..."); // todo remove
107            self.events.sync(SyncIncrement::SyncStarted);
108            self.prune().await?;
109            got_updates = self.fetch_meta(&mut ctx).await?;
110            self.populate_pk_cache(&mut ctx).await?;
111            self.docs.dont_delete.store(true, Ordering::SeqCst);
112            self.fetch_docs(&mut ctx).await?;
113            self.merge(&mut ctx).await?;
114            self.push_meta(&mut ctx).await?;
115            self.push_docs(&mut ctx).await?;
116            Ok(())
117        }
118        .await;
119
120        self.docs.dont_delete.store(false, Ordering::SeqCst);
121
122        if pipeline.is_ok() {
123            pipeline = self.commit_last_synced(&mut ctx).await;
124        }
125
126        let cleanup = self.cleanup().await;
127
128        let ekind = pipeline.as_ref().err().map(|err| err.kind.clone());
129        self.events.sync(SyncIncrement::SyncFinished(ekind));
130
131        self.syncing.store(false, Ordering::Relaxed);
132        pipeline?;
133        cleanup?;
134
135        // done not being sent if pipeline is an error is likely the reason we get stuck offline
136        ctx.done_msg();
137
138        if got_updates {
139            // did it?
140            self.events.meta_changed();
141            let owner = self.keychain.get_pk().map(Owner).ok();
142            // this is overly agressive as it'll notify on shares that have been accepted
143            // another strategy could be to diff the changes coming before and after a sync
144            if ctx.remote_changes.iter().any(|f| Some(f.owner()) != owner) {
145                self.events.pending_shares_changed();
146            }
147            for id in &ctx.pulled_docs {
148                self.events.doc_written(*id, Some(Actor::Sync));
149            }
150        }
151
152        let account = self.get_account()?;
153        if account.is_beta() {
154            let debug_info = self.debug_info("".into()).await?;
155
156            self.client
157                .request(account, UpsertDebugInfoRequest { debug_info })
158                .await?;
159        }
160
161        Ok(ctx.summarize())
162    }
163
164    async fn setup_sync(
165        &self, progress: Option<Box<dyn Fn(SyncProgress) + Send>>,
166    ) -> LbResult<SyncContext> {
167        let tx = self.ro_tx().await;
168        let db = tx.db();
169
170        let last_synced = db.last_synced.get().copied().unwrap_or_default() as u64;
171        let pk_cache = db.pub_key_lookup.get().clone();
172
173        let current = 0;
174        let total = 7;
175
176        Ok(SyncContext {
177            last_synced,
178            pk_cache,
179
180            progress,
181            current,
182            total,
183
184            new_root: Default::default(),
185            update_as_of: Default::default(),
186            remote_changes: Default::default(),
187            pushed_docs: Default::default(),
188            pushed_metas: Default::default(),
189            pulled_docs: Default::default(),
190        })
191    }
192
193    async fn prune(&self) -> LbResult<()> {
194        let server_ids = self
195            .client
196            .request(self.get_account()?, GetFileIdsRequest {})
197            .await?
198            .ids;
199
200        let mut tx = self.begin_tx().await;
201        let db = tx.db();
202
203        let mut local = db.base_metadata.stage(&db.local_metadata).to_lazy();
204        let base_ids = local.tree.base.ids();
205
206        let mut prunable_ids = base_ids;
207        prunable_ids.retain(|id| !server_ids.contains(id));
208        for id in prunable_ids.clone() {
209            prunable_ids.extend(local.descendants(&id)?.into_iter());
210        }
211        for id in &prunable_ids {
212            if let Some(base_file) = local.tree.base.maybe_find(id) {
213                self.docs
214                    .delete(*id, base_file.document_hmac().copied())
215                    .await?;
216            }
217            if let Some(local_file) = local.maybe_find(id) {
218                self.docs
219                    .delete(*id, local_file.document_hmac().copied())
220                    .await?;
221            }
222        }
223
224        let mut base_staged = (&mut db.base_metadata).to_lazy().stage(None);
225        base_staged.tree.removed = prunable_ids.clone().into_iter().collect();
226        base_staged.promote()?;
227
228        let mut local_staged = (&mut db.local_metadata).to_lazy().stage(None);
229        local_staged.tree.removed = prunable_ids.into_iter().collect();
230        local_staged.promote()?;
231
232        Ok(())
233    }
234
235    /// Returns true if there were any updates
236    async fn fetch_meta(&self, ctx: &mut SyncContext) -> LbResult<bool> {
237        ctx.msg("Fetching tree updates...");
238        let updates = self
239            .client
240            .request(
241                self.get_account()?,
242                GetUpdatesRequestV2 { since_metadata_version: ctx.last_synced },
243            )
244            .await?;
245
246        let empty = updates.file_metadata.is_empty();
247        let (remote, as_of, root) = self.dedup(updates).await?;
248
249        ctx.remote_changes = remote;
250        ctx.update_as_of = as_of;
251        ctx.new_root = root;
252
253        Ok(!empty)
254    }
255
256    async fn populate_pk_cache(&self, ctx: &mut SyncContext) -> LbResult<()> {
257        ctx.msg("Updating public key cache...");
258        let mut all_owners = HashSet::new();
259        for file in &ctx.remote_changes {
260            for user_access_key in file.user_access_keys() {
261                all_owners.insert(Owner(user_access_key.encrypted_by));
262                all_owners.insert(Owner(user_access_key.encrypted_for));
263            }
264        }
265
266        let mut new_entries = HashMap::new();
267
268        for owner in all_owners {
269            if let hash_map::Entry::Vacant(e) = ctx.pk_cache.entry(owner) {
270                let username_result = self
271                    .client
272                    .request(self.get_account()?, GetUsernameRequest { key: owner.0 })
273                    .await;
274                let username = match username_result {
275                    Err(ApiError::Endpoint(GetUsernameError::UserNotFound)) => {
276                        "<unknown>".to_string()
277                    }
278                    _ => username_result?.username.clone(),
279                };
280                new_entries.insert(owner, username.clone());
281                e.insert(username.clone());
282            }
283        }
284
285        let mut tx = self.begin_tx().await;
286        let db = tx.db();
287
288        for (owner, username) in new_entries {
289            db.pub_key_lookup.insert(owner, username)?;
290        }
291        Ok(())
292    }
293
294    async fn fetch_docs(&self, ctx: &mut SyncContext) -> LbResult<()> {
295        ctx.msg("Fetching documents...");
296        let mut docs_to_pull = vec![];
297
298        let tx = self.ro_tx().await;
299        let db = tx.db();
300        let start = Instant::now();
301
302        let mut remote = db.base_metadata.stage(ctx.remote_changes.clone()).to_lazy(); // this used to be owned remote changes
303        for id in remote.tree.staged.ids() {
304            if remote.calculate_deleted(&id)? {
305                continue;
306            }
307            let remote_hmac = remote.find(&id)?.document_hmac().cloned();
308            let base_hmac = remote
309                .tree
310                .base
311                .maybe_find(&id)
312                .and_then(|f| f.document_hmac())
313                .cloned();
314            if base_hmac == remote_hmac {
315                continue;
316            }
317
318            if let Some(remote_hmac) = remote_hmac {
319                docs_to_pull.push((id, remote_hmac));
320                self.events.sync(SyncIncrement::PullingDocument(id, true));
321            }
322        }
323
324        drop(tx);
325        if start.elapsed() > std::time::Duration::from_millis(100) {
326            warn!("sync fetch_docs held lock for {:?}", start.elapsed());
327        }
328
329        let num_docs = docs_to_pull.len();
330        ctx.total += num_docs;
331
332        let futures = docs_to_pull
333            .into_iter()
334            .map(|(id, hmac)| self.fetch_doc(id, hmac));
335
336        let mut stream = stream::iter(futures).buffer_unordered(
337            thread::available_parallelism()
338                .unwrap_or(NonZeroUsize::new(4).unwrap())
339                .into(),
340        );
341
342        let mut idx = 0;
343        while let Some(fut) = stream.next().await {
344            let id = fut?;
345            ctx.pulled_docs.push(id);
346            self.events.sync(SyncIncrement::PullingDocument(id, false));
347            ctx.file_msg(id, &format!("Downloaded file {idx} of {num_docs}."));
348            idx += 1;
349        }
350        Ok(())
351    }
352
353    async fn fetch_doc(&self, id: Uuid, hmac: DocumentHmac) -> LbResult<Uuid> {
354        let remote_document = self
355            .client
356            .request(self.get_account()?, GetDocRequest { id, hmac })
357            .await?;
358        self.docs
359            .insert(id, Some(hmac), &remote_document.content)
360            .await?;
361
362        Ok(id)
363    }
364
365    /// Pulls remote changes and constructs a changeset Merge such that Stage<Stage<Stage<Base, Remote>, Local>, Merge> is valid.
366    /// Promotes Base to Stage<Base, Remote> and Local to Stage<Local, Merge>
367    async fn merge(&self, ctx: &mut SyncContext) -> LbResult<()> {
368        let mut tx = self.begin_tx().await;
369        let db = tx.db();
370        let start = Instant::now();
371
372        let remote_changes = &ctx.remote_changes;
373
374        // fetch document updates and local documents for merge
375        let me = Owner(self.keychain.get_pk()?);
376
377        // compute merge changes
378        let merge_changes = {
379            // assemble trees
380            let mut base = (&db.base_metadata).to_lazy();
381            let remote_unlazy = (&db.base_metadata).to_staged(remote_changes);
382            let mut remote = remote_unlazy.as_lazy();
383            let mut local = (&db.base_metadata).to_staged(&db.local_metadata).to_lazy();
384
385            // changeset constraints - these evolve as we try to assemble changes and encounter validation failures
386            let mut files_to_unmove: HashSet<Uuid> = HashSet::new();
387            let mut files_to_unshare: HashSet<Uuid> = HashSet::new();
388            let mut links_to_delete: HashSet<Uuid> = HashSet::new();
389            let mut rename_increments: HashMap<Uuid, usize> = HashMap::new();
390            let mut duplicate_file_ids: HashMap<Uuid, Uuid> = HashMap::new();
391
392            'merge_construction: loop {
393                // process just the edits which allow us to check deletions in the result
394                let mut deletions = {
395                    let mut deletions = remote_unlazy.stage(Vec::new()).to_lazy();
396
397                    // creations
398                    let mut deletion_creations = HashSet::new();
399                    for id in db.local_metadata.ids() {
400                        if remote.maybe_find(&id).is_none() && !links_to_delete.contains(&id) {
401                            deletion_creations.insert(id);
402                        }
403                    }
404                    'drain_creations: while !deletion_creations.is_empty() {
405                        'choose_a_creation: for id in &deletion_creations {
406                            // create
407                            let id = *id;
408                            let local_file = local.find(&id)?.clone();
409                            let result = deletions.create_unvalidated(
410                                id,
411                                symkey::generate_key(),
412                                local_file.parent(),
413                                &local.name(&id, &self.keychain)?,
414                                local_file.file_type(),
415                                &self.keychain,
416                            );
417                            match result {
418                                Ok(_) => {
419                                    deletion_creations.remove(&id);
420                                    continue 'drain_creations;
421                                }
422                                Err(ref err) => match err.kind {
423                                    LbErrKind::FileParentNonexistent => {
424                                        continue 'choose_a_creation;
425                                    }
426                                    _ => {
427                                        result?;
428                                    }
429                                },
430                            }
431                        }
432                        return Err(LbErrKind::Unexpected(format!(
433                            "sync failed to find a topomodelal order for file creations: {deletion_creations:?}"
434                        ))
435                        .into());
436                    }
437
438                    // moves (creations happen first in case a file is moved into a new folder)
439                    for id in db.local_metadata.ids() {
440                        let local_file = local.find(&id)?.clone();
441                        if let Some(base_file) = db.base_metadata.maybe_find(&id).cloned() {
442                            if !local_file.explicitly_deleted()
443                                && local_file.parent() != base_file.parent()
444                                && !files_to_unmove.contains(&id)
445                            {
446                                // move
447                                deletions.move_unvalidated(
448                                    &id,
449                                    local_file.parent(),
450                                    &self.keychain,
451                                )?;
452                            }
453                        }
454                    }
455
456                    // deletions (moves happen first in case a file is moved into a deleted folder)
457                    for id in db.local_metadata.ids() {
458                        let local_file = local.find(&id)?.clone();
459                        if local_file.explicitly_deleted() {
460                            // delete
461                            deletions.delete_unvalidated(&id, &self.keychain)?;
462                        }
463                    }
464                    deletions
465                };
466
467                // process all edits, dropping non-deletion edits for files that will be implicitly deleted
468                let mut merge = {
469                    let mut merge = remote_unlazy.stage(Vec::new()).to_lazy();
470
471                    // creations and edits of created documents
472                    let mut creations = HashSet::new();
473                    for id in db.local_metadata.ids() {
474                        if deletions.maybe_find(&id).is_some()
475                            && !deletions.calculate_deleted(&id)?
476                            && remote.maybe_find(&id).is_none()
477                            && !links_to_delete.contains(&id)
478                        {
479                            creations.insert(id);
480                        }
481                    }
482                    'drain_creations: while !creations.is_empty() {
483                        'choose_a_creation: for id in &creations {
484                            // create
485                            let id = *id;
486                            let local_file = local.find(&id)?.clone();
487                            let result = merge.create_unvalidated(
488                                id,
489                                local.decrypt_key(&id, &self.keychain)?,
490                                local_file.parent(),
491                                &local.name(&id, &self.keychain)?,
492                                local_file.file_type(),
493                                &self.keychain,
494                            );
495                            match result {
496                                Ok(_) => {
497                                    creations.remove(&id);
498                                    continue 'drain_creations;
499                                }
500                                Err(ref err) => match err.kind {
501                                    LbErrKind::FileParentNonexistent => {
502                                        continue 'choose_a_creation;
503                                    }
504                                    _ => {
505                                        result?;
506                                    }
507                                },
508                            }
509                        }
510                        return Err(LbErrKind::Unexpected(format!(
511                            "sync failed to find a topomodelal order for file creations: {creations:?}"
512                        ))
513                        .into());
514                    }
515
516                    // moves, renames, edits, and shares
517                    // creations happen first in case a file is moved into a new folder
518                    for id in db.local_metadata.ids() {
519                        // skip files that are already deleted or will be deleted
520                        if deletions.maybe_find(&id).is_none()
521                            || deletions.calculate_deleted(&id)?
522                            || (remote.maybe_find(&id).is_some()
523                                && remote.calculate_deleted(&id)?)
524                        {
525                            continue;
526                        }
527
528                        let local_file = local.find(&id)?.clone();
529                        let local_name = local.name(&id, &self.keychain)?;
530                        let maybe_base_file = base.maybe_find(&id).cloned();
531                        let maybe_remote_file = remote.maybe_find(&id).cloned();
532                        if let Some(ref base_file) = maybe_base_file {
533                            let base_name = base.name(&id, &self.keychain)?;
534                            let remote_file = remote.find(&id)?.clone();
535                            let remote_name = remote.name(&id, &self.keychain)?;
536
537                            // move
538                            if local_file.parent() != base_file.parent()
539                                && remote_file.parent() == base_file.parent()
540                                && !files_to_unmove.contains(&id)
541                            {
542                                merge.move_unvalidated(&id, local_file.parent(), &self.keychain)?;
543                            }
544
545                            // rename
546                            if local_name != base_name && remote_name == base_name {
547                                merge.rename_unvalidated(&id, &local_name, &self.keychain)?;
548                            }
549                        }
550
551                        // share
552                        let mut remote_keys = HashMap::new();
553                        if let Some(ref remote_file) = maybe_remote_file {
554                            for key in remote_file.user_access_keys() {
555                                remote_keys.insert(
556                                    (Owner(key.encrypted_by), Owner(key.encrypted_for)),
557                                    (key.mode, key.deleted),
558                                );
559                            }
560                        }
561                        for key in local_file.user_access_keys() {
562                            let (by, for_) = (Owner(key.encrypted_by), Owner(key.encrypted_for));
563                            if let Some(&(remote_mode, remote_deleted)) =
564                                remote_keys.get(&(by, for_))
565                            {
566                                // upgrade share
567                                if key.mode > remote_mode || !key.deleted && remote_deleted {
568                                    let mode = match key.mode {
569                                        UserAccessMode::Read => ShareMode::Read,
570                                        UserAccessMode::Write => ShareMode::Write,
571                                        UserAccessMode::Owner => continue,
572                                    };
573                                    merge.add_share_unvalidated(id, for_, mode, &self.keychain)?;
574                                }
575                                // delete share
576                                if key.deleted && !remote_deleted {
577                                    merge.delete_share_unvalidated(
578                                        &id,
579                                        Some(for_.0),
580                                        &self.keychain,
581                                    )?;
582                                }
583                            } else {
584                                // add share
585                                let mode = match key.mode {
586                                    UserAccessMode::Read => ShareMode::Read,
587                                    UserAccessMode::Write => ShareMode::Write,
588                                    UserAccessMode::Owner => continue,
589                                };
590                                merge.add_share_unvalidated(id, for_, mode, &self.keychain)?;
591                            }
592                        }
593
594                        // share deletion due to conflicts
595                        if files_to_unshare.contains(&id) {
596                            merge.delete_share_unvalidated(&id, None, &self.keychain)?;
597                        }
598
599                        // rename due to path conflict
600                        if let Some(&rename_increment) = rename_increments.get(&id) {
601                            let name = NameComponents::from(&local_name)
602                                .generate_incremented(rename_increment)
603                                .to_name();
604                            merge.rename_unvalidated(&id, &name, &self.keychain)?;
605                        }
606
607                        // edit
608                        let base_hmac = maybe_base_file.and_then(|f| f.document_hmac().cloned());
609                        let remote_hmac =
610                            maybe_remote_file.and_then(|f| f.document_hmac().cloned());
611                        let local_hmac = local_file.document_hmac().cloned();
612                        if merge.access_mode(me, &id)? >= Some(UserAccessMode::Write)
613                            && local_hmac != base_hmac
614                        {
615                            if remote_hmac != base_hmac && remote_hmac != local_hmac {
616                                // merge
617                                let merge_name = merge.name(&id, &self.keychain)?;
618                                let document_type =
619                                    DocumentType::from_file_name_using_extension(&merge_name);
620
621                                // todo these accesses are potentially problematic
622                                // maybe not if service/docs is the persion doing network io
623                                let base_document =
624                                    self.read_document_helper(id, &mut base).await?;
625                                let remote_document =
626                                    self.read_document_helper(id, &mut remote).await?;
627                                let local_document =
628                                    self.read_document_helper(id, &mut local).await?;
629
630                                match document_type {
631                                    DocumentType::Text => {
632                                        // 3-way merge
633                                        // todo: a couple more clones than necessary
634                                        let base_document =
635                                            String::from_utf8_lossy(&base_document).to_string();
636                                        let remote_document =
637                                            String::from_utf8_lossy(&remote_document).to_string();
638                                        let local_document =
639                                            String::from_utf8_lossy(&local_document).to_string();
640                                        let merged_document = Buffer::from(base_document.as_str())
641                                            .merge(local_document, remote_document);
642                                        let encrypted_document = merge
643                                            .update_document_unvalidated(
644                                                &id,
645                                                &merged_document.into_bytes(),
646                                                &self.keychain,
647                                            )?;
648                                        let hmac = merge.find(&id)?.document_hmac().copied();
649                                        self.docs.insert(id, hmac, &encrypted_document).await?;
650                                    }
651                                    DocumentType::Drawing => {
652                                        let base_document =
653                                            String::from_utf8_lossy(&base_document).to_string();
654                                        let remote_document =
655                                            String::from_utf8_lossy(&remote_document).to_string();
656                                        let local_document =
657                                            String::from_utf8_lossy(&local_document).to_string();
658
659                                        let base_buffer = svg::buffer::Buffer::new(&base_document);
660                                        let remote_buffer =
661                                            svg::buffer::Buffer::new(&remote_document);
662                                        let mut local_buffer =
663                                            svg::buffer::Buffer::new(&local_document);
664
665                                        for (_, el) in local_buffer.elements.iter_mut() {
666                                            if let Element::Path(path) = el {
667                                                path.data.apply_transform(u_transform_to_bezier(
668                                                    &Transform::from(
669                                                        local_buffer
670                                                            .weak_viewport_settings
671                                                            .master_transform,
672                                                    ),
673                                                ));
674                                            }
675                                        }
676                                        svg::buffer::Buffer::reload(
677                                            &mut local_buffer.elements,
678                                            &mut local_buffer.weak_images,
679                                            &mut local_buffer.weak_path_pressures,
680                                            &mut local_buffer.weak_viewport_settings,
681                                            &base_buffer,
682                                            &remote_buffer,
683                                        );
684
685                                        let merged_document = local_buffer.serialize();
686                                        let encrypted_document = merge
687                                            .update_document_unvalidated(
688                                                &id,
689                                                &merged_document.into_bytes(),
690                                                &self.keychain,
691                                            )?;
692                                        let hmac = merge.find(&id)?.document_hmac().copied();
693                                        self.docs.insert(id, hmac, &encrypted_document).await?;
694                                    }
695                                    DocumentType::Other => {
696                                        // duplicate file
697                                        let merge_parent = *merge.find(&id)?.parent();
698                                        let duplicate_id = if let Some(&duplicate_id) =
699                                            duplicate_file_ids.get(&id)
700                                        {
701                                            duplicate_id
702                                        } else {
703                                            let duplicate_id = Uuid::new_v4();
704                                            duplicate_file_ids.insert(id, duplicate_id);
705                                            rename_increments.insert(duplicate_id, 1);
706                                            duplicate_id
707                                        };
708
709                                        let mut merge_name = merge_name;
710                                        merge_name = NameComponents::from(&merge_name)
711                                            .generate_incremented(
712                                                rename_increments
713                                                    .get(&duplicate_id)
714                                                    .copied()
715                                                    .unwrap_or_default(),
716                                            )
717                                            .to_name();
718
719                                        merge.create_unvalidated(
720                                            duplicate_id,
721                                            symkey::generate_key(),
722                                            &merge_parent,
723                                            &merge_name,
724                                            FileType::Document,
725                                            &self.keychain,
726                                        )?;
727                                        let encrypted_document = merge
728                                            .update_document_unvalidated(
729                                                &duplicate_id,
730                                                &local_document,
731                                                &self.keychain,
732                                            )?;
733                                        let duplicate_hmac =
734                                            merge.find(&duplicate_id)?.document_hmac().copied();
735                                        self.docs
736                                            .insert(
737                                                duplicate_id,
738                                                duplicate_hmac,
739                                                &encrypted_document,
740                                            )
741                                            .await?;
742                                    }
743                                }
744                            } else {
745                                // overwrite (todo: avoid reading/decrypting/encrypting document)
746                                let document = self.read_document_helper(id, &mut local).await?;
747                                merge.update_document_unvalidated(
748                                    &id,
749                                    &document,
750                                    &self.keychain,
751                                )?;
752                            }
753                        }
754                    }
755
756                    // deletes
757                    // moves happen first in case a file is moved into a deleted folder
758                    for id in db.local_metadata.ids() {
759                        if db.base_metadata.maybe_find(&id).is_some()
760                            && deletions.calculate_deleted(&id)?
761                            && !merge.calculate_deleted(&id)?
762                        {
763                            // delete
764                            merge.delete_unvalidated(&id, &self.keychain)?;
765                        }
766                    }
767                    for &id in &links_to_delete {
768                        // delete
769                        if merge.maybe_find(&id).is_some() && !merge.calculate_deleted(&id)? {
770                            merge.delete_unvalidated(&id, &self.keychain)?;
771                        }
772                    }
773
774                    merge
775                };
776
777                // validate; handle failures by introducing changeset constraints
778                for link in merge.ids() {
779                    if !merge.calculate_deleted(&link)? {
780                        if let FileType::Link { target } = merge.find(&link)?.file_type() {
781                            if merge.maybe_find(&target).is_some()
782                                && merge.calculate_deleted(&target)?
783                            {
784                                // delete links to deleted files
785                                if links_to_delete.insert(link) {
786                                    continue 'merge_construction;
787                                } else {
788                                    return Err(LbErrKind::Unexpected(format!(
789                                        "sync failed to resolve broken link (deletion): {link:?}"
790                                    ))
791                                    .into());
792                                }
793                            }
794                        }
795                    }
796                }
797
798                let validate_result = merge.validate(me);
799                match validate_result {
800                    // merge changeset is valid
801                    Ok(_) => {
802                        let (_, merge_changes) = merge.unstage();
803                        break merge_changes;
804                    }
805                    Err(ref err) => match err.kind {
806                        LbErrKind::Validation(ref vf) => match vf {
807                            // merge changeset has resolvable validation errors and needs modification
808                            ValidationFailure::Cycle(ids) => {
809                                // revert all local moves in the cycle
810                                let mut progress = false;
811                                for &id in ids {
812                                    if db.local_metadata.maybe_find(&id).is_some()
813                                        && files_to_unmove.insert(id)
814                                    {
815                                        progress = true;
816                                    }
817                                }
818                                if !progress {
819                                    return Err(LbErrKind::Unexpected(format!(
820                                        "sync failed to resolve cycle: {ids:?}"
821                                    ))
822                                    .into());
823                                }
824                            }
825                            ValidationFailure::PathConflict(ids) => {
826                                // pick one local id and generate a non-conflicting filename
827                                let mut progress = false;
828                                for &id in ids {
829                                    if duplicate_file_ids.values().any(|&dup| dup == id) {
830                                        *rename_increments.entry(id).or_insert(0) += 1;
831                                        progress = true;
832                                        break;
833                                    }
834                                }
835                                if !progress {
836                                    for &id in ids {
837                                        if db.local_metadata.maybe_find(&id).is_some() {
838                                            *rename_increments.entry(id).or_insert(0) += 1;
839                                            progress = true;
840                                            break;
841                                        }
842                                    }
843                                }
844                                if !progress {
845                                    return Err(LbErrKind::Unexpected(format!(
846                                        "sync failed to resolve path conflict: {ids:?}"
847                                    ))
848                                    .into());
849                                }
850                            }
851                            ValidationFailure::SharedLink { link, shared_ancestor } => {
852                                // if ancestor is newly shared, delete share, otherwise delete link
853                                let mut progress = false;
854                                if let Some(base_shared_ancestor) = base.maybe_find(shared_ancestor)
855                                {
856                                    if !base_shared_ancestor.is_shared()
857                                        && files_to_unshare.insert(*shared_ancestor)
858                                    {
859                                        progress = true;
860                                    }
861                                }
862                                if !progress && links_to_delete.insert(*link) {
863                                    progress = true;
864                                }
865                                if !progress {
866                                    return Err(LbErrKind::Unexpected(format!(
867                                    "sync failed to resolve shared link: link: {link:?}, shared_ancestor: {shared_ancestor:?}"
868                                )).into());
869                                }
870                            }
871                            ValidationFailure::DuplicateLink { target } => {
872                                // delete local link with this target
873                                let mut progress = false;
874                                if let Some(link) = local.linked_by(target)? {
875                                    if links_to_delete.insert(link) {
876                                        progress = true;
877                                    }
878                                }
879                                if !progress {
880                                    return Err(LbErrKind::Unexpected(format!(
881                                        "sync failed to resolve duplicate link: target: {target:?}"
882                                    ))
883                                    .into());
884                                }
885                            }
886                            ValidationFailure::BrokenLink(link) => {
887                                // delete local link with this target
888                                if !links_to_delete.insert(*link) {
889                                    return Err(LbErrKind::Unexpected(format!(
890                                        "sync failed to resolve broken link: {link:?}"
891                                    ))
892                                    .into());
893                                }
894                            }
895                            ValidationFailure::OwnedLink(link) => {
896                                // if target is newly owned, unmove target, otherwise delete link
897                                let mut progress = false;
898                                if let Some(remote_link) = remote.maybe_find(link) {
899                                    if let FileType::Link { target } = remote_link.file_type() {
900                                        let remote_target = remote.find(&target)?;
901                                        if remote_target.owner() != me
902                                            && files_to_unmove.insert(target)
903                                        {
904                                            progress = true;
905                                        }
906                                    }
907                                }
908                                if !progress && links_to_delete.insert(*link) {
909                                    progress = true;
910                                }
911                                if !progress {
912                                    return Err(LbErrKind::Unexpected(format!(
913                                        "sync failed to resolve owned link: {link:?}"
914                                    ))
915                                    .into());
916                                }
917                            }
918                            // merge changeset has unexpected validation errors
919                            ValidationFailure::Orphan(_)
920                            | ValidationFailure::NonFolderWithChildren(_)
921                            | ValidationFailure::FileWithDifferentOwnerParent(_)
922                            | ValidationFailure::FileNameTooLong(_)
923                            | ValidationFailure::DeletedFileUpdated(_)
924                            | ValidationFailure::NonDecryptableFileName(_) => {
925                                validate_result?;
926                            }
927                        },
928                        // merge changeset has unexpected errors
929                        _ => {
930                            validate_result?;
931                        }
932                    },
933                }
934            }
935        };
936
937        // base = remote; local = merge
938        (&mut db.base_metadata)
939            .to_staged(remote_changes.clone())
940            .to_lazy()
941            .promote()?;
942        db.local_metadata.clear()?;
943        (&mut db.local_metadata)
944            .to_staged(merge_changes)
945            .to_lazy()
946            .promote()?;
947
948        // todo who else calls this did they manage locks right?
949        // self.cleanup_local_metadata()?;
950        db.base_metadata.stage(&mut db.local_metadata).prune()?;
951
952        if start.elapsed() > std::time::Duration::from_millis(100) {
953            warn!("sync merge held lock for {:?}", start.elapsed());
954        }
955
956        Ok(())
957    }
958
959    /// Updates remote and base metadata to local.
960    async fn push_meta(&self, ctx: &mut SyncContext) -> LbResult<()> {
961        ctx.msg("Pushing tree changes...");
962        let mut updates = vec![];
963        let mut local_changes_no_digests = Vec::new();
964
965        let tx = self.ro_tx().await;
966        let db = tx.db();
967
968        // remote = local
969        let local = db.base_metadata.stage(&db.local_metadata).to_lazy();
970
971        for id in local.tree.staged.ids() {
972            let mut local_change = local.tree.staged.find(&id)?.timestamped_value.value.clone();
973            let maybe_base_file = local.tree.base.maybe_find(&id);
974
975            // change everything but document hmac and re-sign
976            local_change.set_hmac_and_size(
977                maybe_base_file.and_then(|f| f.document_hmac().copied()),
978                maybe_base_file.and_then(|f| *f.timestamped_value.value.doc_size()),
979            );
980            let local_change = local_change.sign(&self.keychain)?;
981
982            local_changes_no_digests.push(local_change.clone());
983            let file_diff = FileDiff { old: maybe_base_file.cloned(), new: local_change };
984            updates.push(file_diff);
985        }
986
987        drop(tx);
988
989        if !updates.is_empty() {
990            self.client
991                .request(self.get_account()?, UpsertRequestV2 { updates: updates.clone() })
992                .await?;
993            ctx.pushed_metas = updates;
994        }
995
996        let mut tx = self.begin_tx().await;
997        let db = tx.db();
998
999        // base = local
1000        (&mut db.base_metadata)
1001            .to_lazy()
1002            .stage(local_changes_no_digests)
1003            .promote()?;
1004        db.base_metadata.stage(&mut db.local_metadata).prune()?;
1005
1006        tx.end();
1007
1008        Ok(())
1009    }
1010
1011    /// Updates remote and base files to local. Assumes metadata is already pushed for all new files.
1012    async fn push_docs(&self, ctx: &mut SyncContext) -> LbResult<()> {
1013        ctx.msg("Pushing document changes...");
1014        let mut updates = vec![];
1015        let mut local_changes_digests_only = vec![];
1016
1017        let tx = self.ro_tx().await;
1018        let db = tx.db();
1019        let start = Instant::now();
1020
1021        let local = db.base_metadata.stage(&db.local_metadata).to_lazy();
1022
1023        for id in local.tree.staged.ids() {
1024            let base_file = local.tree.base.find(&id)?.clone();
1025
1026            // change only document hmac and re-sign
1027            let mut local_change = base_file.timestamped_value.value.clone();
1028            local_change.set_hmac_and_size(
1029                local.find(&id)?.document_hmac().copied(),
1030                *local.find(&id)?.timestamped_value.value.doc_size(),
1031            );
1032
1033            if base_file.document_hmac() == local_change.document_hmac()
1034                || local_change.document_hmac().is_none()
1035            {
1036                continue;
1037            }
1038
1039            let local_change = local_change.sign(&self.keychain)?;
1040
1041            updates.push(FileDiff { old: Some(base_file), new: local_change.clone() });
1042            local_changes_digests_only.push(local_change);
1043            self.events.sync(SyncIncrement::PushingDocument(id, true));
1044        }
1045
1046        drop(tx);
1047        if start.elapsed() > std::time::Duration::from_millis(100) {
1048            warn!("sync push_docs held lock for {:?}", start.elapsed());
1049        }
1050
1051        let docs_count = updates.len();
1052        ctx.total += docs_count;
1053        let futures = updates.clone().into_iter().map(|diff| self.push_doc(diff));
1054
1055        let mut stream = stream::iter(futures).buffer_unordered(
1056            thread::available_parallelism()
1057                .unwrap_or(NonZeroUsize::new(4).unwrap())
1058                .into(),
1059        );
1060
1061        let mut idx = 0;
1062        while let Some(fut) = stream.next().await {
1063            let id = fut?;
1064            self.events.sync(SyncIncrement::PushingDocument(id, false));
1065            ctx.file_msg(id, &format!("Pushed file {idx} of {docs_count}."));
1066            idx += 1;
1067        }
1068        ctx.pushed_docs = updates;
1069
1070        let mut tx = self.begin_tx().await;
1071        let db = tx.db();
1072        // base = local (metadata)
1073        (&mut db.base_metadata)
1074            .to_lazy()
1075            .stage(local_changes_digests_only)
1076            .promote()?;
1077
1078        db.base_metadata.stage(&mut db.local_metadata).prune()?;
1079
1080        tx.end();
1081
1082        Ok(())
1083    }
1084
1085    async fn push_doc(&self, diff: FileDiff<SignedMeta>) -> LbResult<Uuid> {
1086        let id = *diff.new.id();
1087        let hmac = diff.new.document_hmac();
1088        let local_document_change = self.docs.get(id, hmac.copied()).await?;
1089        self.client
1090            .request(
1091                self.get_account()?,
1092                ChangeDocRequestV2 { diff, new_content: local_document_change },
1093            )
1094            .await?;
1095
1096        Ok(id)
1097    }
1098
1099    async fn dedup(
1100        &self, updates: GetUpdatesResponseV2,
1101    ) -> LbResult<(Vec<SignedMeta>, u64, Option<Uuid>)> {
1102        let tx = self.ro_tx().await;
1103        let db = tx.db();
1104
1105        let mut root_id = None;
1106        let (remote_changes, update_as_of) = {
1107            let mut remote_changes = updates.file_metadata;
1108            let update_as_of = updates.as_of_metadata_version;
1109
1110            remote_changes = self.prune_remote_orphans(remote_changes).await?;
1111
1112            let remote = db.base_metadata.stage(remote_changes).pruned()?.to_lazy();
1113
1114            let (_, remote_changes) = remote.unstage();
1115            (remote_changes, update_as_of)
1116        };
1117
1118        // initialize root if this is the first pull on this device
1119        if db.root.get().is_none() {
1120            let root = remote_changes
1121                .all_files()?
1122                .into_iter()
1123                .find(|f| f.is_root())
1124                .ok_or(LbErrKind::RootNonexistent)?;
1125            root_id = Some(*root.id());
1126        }
1127
1128        Ok((remote_changes, update_as_of, root_id))
1129    }
1130
1131    async fn prune_remote_orphans(
1132        &self, remote_changes: Vec<SignedMeta>,
1133    ) -> LbResult<Vec<SignedMeta>> {
1134        let tx = self.ro_tx().await;
1135        let db = tx.db();
1136
1137        let me = Owner(self.keychain.get_pk()?);
1138        let remote = db.base_metadata.stage(remote_changes).to_lazy();
1139        let mut result = Vec::new();
1140
1141        for id in remote.tree.staged.ids() {
1142            let meta = remote.find(&id)?;
1143            if remote.maybe_find_parent(meta).is_some()
1144                || meta
1145                    .user_access_keys()
1146                    .iter()
1147                    .any(|k| k.encrypted_for == me.0)
1148            {
1149                result.push(remote.find(&id)?.clone()); // todo: don't clone
1150            }
1151        }
1152        Ok(result)
1153    }
1154
1155    async fn commit_last_synced(&self, ctx: &mut SyncContext) -> LbResult<()> {
1156        ctx.msg("Cleaning up...");
1157        let mut tx = self.begin_tx().await;
1158        let db = tx.db();
1159        db.last_synced.insert(ctx.update_as_of as i64)?;
1160
1161        if let Some(root) = ctx.new_root {
1162            db.root.insert(root)?;
1163        }
1164
1165        Ok(())
1166    }
1167
1168    pub async fn get_last_synced_human(&self) -> LbResult<String> {
1169        let tx = self.ro_tx().await;
1170        let db = tx.db();
1171        let last_synced = db.last_synced.get().copied().unwrap_or(0);
1172
1173        Ok(self.get_timestamp_human_string(last_synced))
1174    }
1175
1176    pub fn get_timestamp_human_string(&self, timestamp: i64) -> String {
1177        if timestamp != 0 {
1178            Duration::milliseconds(clock::get_time().0 - timestamp)
1179                .format_human()
1180                .to_string()
1181        } else {
1182            "never".to_string()
1183        }
1184    }
1185}
1186
1187impl SyncContext {
1188    fn summarize(&self) -> SyncStatus {
1189        let mut local = HashSet::new();
1190        let mut server = HashSet::new();
1191        let mut work_units = vec![];
1192
1193        for meta in &self.pushed_metas {
1194            local.insert(meta.new.id());
1195        }
1196
1197        for meta in &self.pushed_docs {
1198            local.insert(meta.new.id());
1199        }
1200
1201        for meta in &self.remote_changes {
1202            server.insert(meta.id());
1203        }
1204
1205        for id in local {
1206            work_units.push(WorkUnit::LocalChange(*id));
1207        }
1208
1209        for id in server {
1210            work_units.push(WorkUnit::ServerChange(*id));
1211        }
1212
1213        SyncStatus { work_units, latest_server_ts: self.update_as_of }
1214    }
1215
1216    fn msg(&mut self, msg: &str) {
1217        self.current += 1;
1218        if let Some(f) = &self.progress {
1219            f(SyncProgress {
1220                total: self.total,
1221                progress: self.current,
1222                file_being_processed: Default::default(),
1223                msg: msg.to_string(),
1224            })
1225        }
1226    }
1227
1228    fn file_msg(&mut self, id: Uuid, msg: &str) {
1229        self.current += 1;
1230        if let Some(f) = &self.progress {
1231            f(SyncProgress {
1232                total: self.total,
1233                progress: self.current,
1234                file_being_processed: Some(id),
1235                msg: msg.to_string(),
1236            })
1237        }
1238    }
1239
1240    fn done_msg(&mut self) {
1241        self.current = self.total;
1242        if let Some(f) = &self.progress {
1243            f(SyncProgress {
1244                total: self.total,
1245                progress: self.current,
1246                file_being_processed: None,
1247                msg: "Sync successful!".to_string(),
1248            })
1249        }
1250    }
1251}
1252
1253#[derive(Debug, Serialize, Clone)]
1254pub struct SyncStatus {
1255    pub work_units: Vec<WorkUnit>,
1256    pub latest_server_ts: u64,
1257}
1258
1259#[derive(Clone)]
1260pub struct SyncProgress {
1261    pub total: usize,
1262    pub progress: usize,
1263    pub file_being_processed: Option<Uuid>,
1264    pub msg: String,
1265}
1266
1267impl Display for SyncProgress {
1268    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1269        write!(f, "[{} / {}]: {}", self.progress, self.total, self.msg)
1270    }
1271}
1272
1273#[derive(Debug, Clone)]
1274pub enum SyncIncrement {
1275    SyncStarted,
1276    PullingDocument(Uuid, bool),
1277    PushingDocument(Uuid, bool),
1278    SyncFinished(Option<LbErrKind>),
1279}