lb_rs/service/
sync.rs

1use crate::io::network::ApiError;
2use crate::model::access_info::UserAccessMode;
3use crate::model::api::{
4    ChangeDocRequest, GetDocRequest, GetFileIdsRequest, GetUpdatesRequest, GetUpdatesResponse,
5    GetUsernameError, GetUsernameRequest, UpsertRequest,
6};
7use crate::model::errors::{LbErrKind, LbResult};
8use crate::model::file::ShareMode;
9use crate::model::file_like::FileLike;
10use crate::model::file_metadata::{DocumentHmac, FileDiff, FileType, Owner};
11use crate::model::filename::{DocumentType, NameComponents};
12use crate::model::signed_file::SignedFile;
13use crate::model::staged::StagedTreeLikeMut;
14use crate::model::svg::buffer::u_transform_to_bezier;
15use crate::model::svg::element::Element;
16use crate::model::text::buffer::Buffer;
17use crate::model::tree_like::TreeLike;
18use crate::model::work_unit::WorkUnit;
19use crate::model::{clock, svg};
20use crate::model::{symkey, ValidationFailure};
21use crate::Lb;
22pub use basic_human_duration::ChronoHumanDuration;
23use futures::stream;
24use futures::StreamExt;
25use serde::Serialize;
26use std::collections::{hash_map, HashMap, HashSet};
27use std::fmt::{Display, Formatter};
28use std::num::NonZeroUsize;
29use std::sync::atomic::{AtomicBool, Ordering};
30use std::sync::Arc;
31use std::thread;
32use std::time::Instant;
33use time::Duration;
34use usvg::Transform;
35use uuid::Uuid;
36
37use super::events::Actor;
38
39pub type SyncFlag = Arc<AtomicBool>;
40
41pub struct SyncContext {
42    progress: Option<Box<dyn Fn(SyncProgress) + Send>>,
43    current: usize,
44    total: usize,
45
46    pk_cache: HashMap<Owner, String>,
47    last_synced: u64,
48    remote_changes: Vec<SignedFile>,
49    update_as_of: u64,
50    root: Option<Uuid>,
51    pushed_metas: Vec<FileDiff<SignedFile>>,
52    pushed_docs: Vec<FileDiff<SignedFile>>,
53    pulled_docs: Vec<Uuid>,
54}
55
56impl Lb {
57    #[instrument(level = "debug", skip_all, err(Debug))]
58    pub async fn calculate_work(&self) -> LbResult<SyncStatus> {
59        let tx = self.ro_tx().await;
60        let db = tx.db();
61        let last_synced = db.last_synced.get().copied().unwrap_or_default() as u64;
62        drop(tx);
63
64        let remote_changes = self
65            .client
66            .request(self.get_account()?, GetUpdatesRequest { since_metadata_version: last_synced })
67            .await?;
68        let (deduped, latest_server_ts, _) = self.dedup(remote_changes).await?;
69        let remote_dirty = deduped
70            .into_iter()
71            .map(|f| *f.id())
72            .map(WorkUnit::ServerChange);
73
74        self.prune().await?;
75
76        let tx = self.ro_tx().await;
77        let db = tx.db();
78
79        let locally_dirty = db
80            .local_metadata
81            .get()
82            .keys()
83            .copied()
84            .map(WorkUnit::LocalChange);
85
86        let mut work_units: Vec<WorkUnit> = Vec::new();
87        work_units.extend(locally_dirty.chain(remote_dirty));
88        Ok(SyncStatus { work_units, latest_server_ts })
89    }
90
91    #[instrument(level = "debug", skip_all, err(Debug))]
92    pub async fn sync(&self, f: Option<Box<dyn Fn(SyncProgress) + Send>>) -> LbResult<SyncStatus> {
93        let old = self.syncing.swap(true, Ordering::SeqCst);
94        if old {
95            return Err(LbErrKind::AlreadySyncing.into());
96        }
97
98        let mut ctx = self.setup_sync(f).await?;
99
100        let mut got_updates = false;
101        let mut pipeline: LbResult<()> = async {
102            ctx.msg("Preparing Sync..."); // todo remove
103            self.events.sync(SyncIncrement::SyncStarted);
104            self.prune().await?;
105            got_updates = self.fetch_meta(&mut ctx).await?;
106            self.populate_pk_cache(&mut ctx).await?;
107            self.docs.dont_delete.store(true, Ordering::SeqCst);
108            self.fetch_docs(&mut ctx).await?;
109            self.merge(&mut ctx).await?;
110            self.push_meta(&mut ctx).await?;
111            self.push_docs(&mut ctx).await?;
112            Ok(())
113        }
114        .await;
115
116        self.docs.dont_delete.store(false, Ordering::SeqCst);
117
118        if pipeline.is_ok() {
119            pipeline = self.commit_last_synced(&mut ctx).await;
120        }
121
122        let cleanup = self.cleanup().await;
123
124        let ekind = pipeline.as_ref().err().map(|err| err.kind.clone());
125        self.events.sync(SyncIncrement::SyncFinished(ekind));
126
127        self.syncing.store(false, Ordering::Relaxed);
128        pipeline?;
129        cleanup?;
130
131        // done not being sent if pipeline is an error is likely the reason we get stuck offline
132        ctx.done_msg();
133
134        if got_updates {
135            // did it?
136            self.events.meta_changed();
137            for id in &ctx.pulled_docs {
138                self.events.doc_written(*id, Some(Actor::Sync));
139            }
140        }
141
142        Ok(ctx.summarize())
143    }
144
145    async fn setup_sync(
146        &self, progress: Option<Box<dyn Fn(SyncProgress) + Send>>,
147    ) -> LbResult<SyncContext> {
148        let tx = self.ro_tx().await;
149        let db = tx.db();
150
151        let last_synced = db.last_synced.get().copied().unwrap_or_default() as u64;
152        let pk_cache = db.pub_key_lookup.get().clone();
153
154        let current = 0;
155        let total = 7;
156
157        Ok(SyncContext {
158            last_synced,
159            pk_cache,
160
161            progress,
162            current,
163            total,
164
165            root: Default::default(),
166            update_as_of: Default::default(),
167            remote_changes: Default::default(),
168            pushed_docs: Default::default(),
169            pushed_metas: Default::default(),
170            pulled_docs: Default::default(),
171        })
172    }
173
174    async fn prune(&self) -> LbResult<()> {
175        let server_ids = self
176            .client
177            .request(self.get_account()?, GetFileIdsRequest {})
178            .await?
179            .ids;
180
181        let mut tx = self.begin_tx().await;
182        let db = tx.db();
183
184        let mut local = db.base_metadata.stage(&db.local_metadata).to_lazy();
185        let base_ids = local.tree.base.ids();
186
187        let mut prunable_ids = base_ids;
188        prunable_ids.retain(|id| !server_ids.contains(id));
189        for id in prunable_ids.clone() {
190            prunable_ids.extend(local.descendants(&id)?.into_iter());
191        }
192        for id in &prunable_ids {
193            if let Some(base_file) = local.tree.base.maybe_find(id) {
194                self.docs
195                    .delete(*id, base_file.document_hmac().copied())
196                    .await?;
197            }
198            if let Some(local_file) = local.maybe_find(id) {
199                self.docs
200                    .delete(*id, local_file.document_hmac().copied())
201                    .await?;
202            }
203        }
204
205        let mut base_staged = (&mut db.base_metadata).to_lazy().stage(None);
206        base_staged.tree.removed = prunable_ids.clone().into_iter().collect();
207        base_staged.promote()?;
208
209        let mut local_staged = (&mut db.local_metadata).to_lazy().stage(None);
210        local_staged.tree.removed = prunable_ids.into_iter().collect();
211        local_staged.promote()?;
212
213        Ok(())
214    }
215
216    /// Returns true if there were any updates
217    async fn fetch_meta(&self, ctx: &mut SyncContext) -> LbResult<bool> {
218        ctx.msg("Fetching tree updates...");
219        let updates = self
220            .client
221            .request(
222                self.get_account()?,
223                GetUpdatesRequest { since_metadata_version: ctx.last_synced },
224            )
225            .await?;
226
227        let empty = updates.file_metadata.is_empty();
228        let (remote, as_of, root) = self.dedup(updates).await?;
229
230        ctx.remote_changes = remote;
231        ctx.update_as_of = as_of;
232        ctx.root = root;
233
234        Ok(!empty)
235    }
236
237    async fn populate_pk_cache(&self, ctx: &mut SyncContext) -> LbResult<()> {
238        ctx.msg("Updating public key cache...");
239        let mut all_owners = HashSet::new();
240        for file in &ctx.remote_changes {
241            for user_access_key in file.user_access_keys() {
242                all_owners.insert(Owner(user_access_key.encrypted_by));
243                all_owners.insert(Owner(user_access_key.encrypted_for));
244            }
245        }
246
247        let mut new_entries = HashMap::new();
248
249        for owner in all_owners {
250            if let hash_map::Entry::Vacant(e) = ctx.pk_cache.entry(owner) {
251                let username_result = self
252                    .client
253                    .request(self.get_account()?, GetUsernameRequest { key: owner.0 })
254                    .await;
255                let username = match username_result {
256                    Err(ApiError::Endpoint(GetUsernameError::UserNotFound)) => {
257                        "<unknown>".to_string()
258                    }
259                    _ => username_result?.username.clone(),
260                };
261                new_entries.insert(owner, username.clone());
262                e.insert(username.clone());
263            }
264        }
265
266        let mut tx = self.begin_tx().await;
267        let db = tx.db();
268
269        for (owner, username) in new_entries {
270            db.pub_key_lookup.insert(owner, username)?;
271        }
272        Ok(())
273    }
274
275    async fn fetch_docs(&self, ctx: &mut SyncContext) -> LbResult<()> {
276        ctx.msg("Fetching documents...");
277        let mut docs_to_pull = vec![];
278
279        let tx = self.ro_tx().await;
280        let db = tx.db();
281        let start = Instant::now();
282
283        let mut remote = db.base_metadata.stage(ctx.remote_changes.clone()).to_lazy(); // this used to be owned remote changes
284        for id in remote.tree.staged.ids() {
285            if remote.calculate_deleted(&id)? {
286                continue;
287            }
288            let remote_hmac = remote.find(&id)?.document_hmac().cloned();
289            let base_hmac = remote
290                .tree
291                .base
292                .maybe_find(&id)
293                .and_then(|f| f.document_hmac())
294                .cloned();
295            if base_hmac == remote_hmac {
296                continue;
297            }
298
299            if let Some(remote_hmac) = remote_hmac {
300                docs_to_pull.push((id, remote_hmac));
301                self.events.sync(SyncIncrement::PullingDocument(id, true));
302            }
303        }
304
305        drop(tx);
306        if start.elapsed() > std::time::Duration::from_millis(100) {
307            warn!("sync fetch_docs held lock for {:?}", start.elapsed());
308        }
309
310        let num_docs = docs_to_pull.len();
311        ctx.total += num_docs;
312
313        let futures = docs_to_pull
314            .into_iter()
315            .map(|(id, hmac)| self.fetch_doc(id, hmac));
316
317        let mut stream = stream::iter(futures).buffer_unordered(
318            thread::available_parallelism()
319                .unwrap_or(NonZeroUsize::new(4).unwrap())
320                .into(),
321        );
322
323        let mut idx = 0;
324        while let Some(fut) = stream.next().await {
325            let id = fut?;
326            ctx.pulled_docs.push(id);
327            self.events.sync(SyncIncrement::PullingDocument(id, false));
328            ctx.file_msg(id, &format!("Downloaded file {idx} of {num_docs}."));
329            idx += 1;
330        }
331        Ok(())
332    }
333
334    async fn fetch_doc(&self, id: Uuid, hmac: DocumentHmac) -> LbResult<Uuid> {
335        let remote_document = self
336            .client
337            .request(self.get_account()?, GetDocRequest { id, hmac })
338            .await?;
339        self.docs
340            .insert(id, Some(hmac), &remote_document.content)
341            .await?;
342
343        Ok(id)
344    }
345
346    /// Pulls remote changes and constructs a changeset Merge such that Stage<Stage<Stage<Base, Remote>, Local>, Merge> is valid.
347    /// Promotes Base to Stage<Base, Remote> and Local to Stage<Local, Merge>
348    async fn merge(&self, ctx: &mut SyncContext) -> LbResult<()> {
349        let mut tx = self.begin_tx().await;
350        let db = tx.db();
351        let start = Instant::now();
352
353        let remote_changes = &ctx.remote_changes;
354
355        // fetch document updates and local documents for merge
356        let me = Owner(self.keychain.get_pk()?);
357
358        // compute merge changes
359        let merge_changes = {
360            // assemble trees
361            let mut base = (&db.base_metadata).to_lazy();
362            let remote_unlazy = (&db.base_metadata).to_staged(remote_changes);
363            let mut remote = remote_unlazy.as_lazy();
364            let mut local = (&db.base_metadata).to_staged(&db.local_metadata).to_lazy();
365
366            // changeset constraints - these evolve as we try to assemble changes and encounter validation failures
367            let mut files_to_unmove: HashSet<Uuid> = HashSet::new();
368            let mut files_to_unshare: HashSet<Uuid> = HashSet::new();
369            let mut links_to_delete: HashSet<Uuid> = HashSet::new();
370            let mut rename_increments: HashMap<Uuid, usize> = HashMap::new();
371            let mut duplicate_file_ids: HashMap<Uuid, Uuid> = HashMap::new();
372
373            'merge_construction: loop {
374                // process just the edits which allow us to check deletions in the result
375                let mut deletions = {
376                    let mut deletions = remote_unlazy.stage(Vec::new()).to_lazy();
377
378                    // creations
379                    let mut deletion_creations = HashSet::new();
380                    for id in db.local_metadata.ids() {
381                        if remote.maybe_find(&id).is_none() && !links_to_delete.contains(&id) {
382                            deletion_creations.insert(id);
383                        }
384                    }
385                    'drain_creations: while !deletion_creations.is_empty() {
386                        'choose_a_creation: for id in &deletion_creations {
387                            // create
388                            let id = *id;
389                            let local_file = local.find(&id)?.clone();
390                            let result = deletions.create_unvalidated(
391                                id,
392                                symkey::generate_key(),
393                                local_file.parent(),
394                                &local.name(&id, &self.keychain)?,
395                                local_file.file_type(),
396                                &self.keychain,
397                            );
398                            match result {
399                                Ok(_) => {
400                                    deletion_creations.remove(&id);
401                                    continue 'drain_creations;
402                                }
403                                Err(ref err) => match err.kind {
404                                    LbErrKind::FileParentNonexistent => {
405                                        continue 'choose_a_creation;
406                                    }
407                                    _ => {
408                                        result?;
409                                    }
410                                },
411                            }
412                        }
413                        return Err(LbErrKind::Unexpected(format!(
414                            "sync failed to find a topomodelal order for file creations: {:?}",
415                            deletion_creations
416                        ))
417                        .into());
418                    }
419
420                    // moves (creations happen first in case a file is moved into a new folder)
421                    for id in db.local_metadata.ids() {
422                        let local_file = local.find(&id)?.clone();
423                        if let Some(base_file) = db.base_metadata.maybe_find(&id).cloned() {
424                            if !local_file.explicitly_deleted()
425                                && local_file.parent() != base_file.parent()
426                                && !files_to_unmove.contains(&id)
427                            {
428                                // move
429                                deletions.move_unvalidated(
430                                    &id,
431                                    local_file.parent(),
432                                    &self.keychain,
433                                )?;
434                            }
435                        }
436                    }
437
438                    // deletions (moves happen first in case a file is moved into a deleted folder)
439                    for id in db.local_metadata.ids() {
440                        let local_file = local.find(&id)?.clone();
441                        if local_file.explicitly_deleted() {
442                            // delete
443                            deletions.delete_unvalidated(&id, &self.keychain)?;
444                        }
445                    }
446                    deletions
447                };
448
449                // process all edits, dropping non-deletion edits for files that will be implicitly deleted
450                let mut merge = {
451                    let mut merge = remote_unlazy.stage(Vec::new()).to_lazy();
452
453                    // creations and edits of created documents
454                    let mut creations = HashSet::new();
455                    for id in db.local_metadata.ids() {
456                        if deletions.maybe_find(&id).is_some()
457                            && !deletions.calculate_deleted(&id)?
458                            && remote.maybe_find(&id).is_none()
459                            && !links_to_delete.contains(&id)
460                        {
461                            creations.insert(id);
462                        }
463                    }
464                    'drain_creations: while !creations.is_empty() {
465                        'choose_a_creation: for id in &creations {
466                            // create
467                            let id = *id;
468                            let local_file = local.find(&id)?.clone();
469                            let result = merge.create_unvalidated(
470                                id,
471                                local.decrypt_key(&id, &self.keychain)?,
472                                local_file.parent(),
473                                &local.name(&id, &self.keychain)?,
474                                local_file.file_type(),
475                                &self.keychain,
476                            );
477                            match result {
478                                Ok(_) => {
479                                    creations.remove(&id);
480                                    continue 'drain_creations;
481                                }
482                                Err(ref err) => match err.kind {
483                                    LbErrKind::FileParentNonexistent => {
484                                        continue 'choose_a_creation;
485                                    }
486                                    _ => {
487                                        result?;
488                                    }
489                                },
490                            }
491                        }
492                        return Err(LbErrKind::Unexpected(format!(
493                            "sync failed to find a topomodelal order for file creations: {:?}",
494                            creations
495                        ))
496                        .into());
497                    }
498
499                    // moves, renames, edits, and shares
500                    // creations happen first in case a file is moved into a new folder
501                    for id in db.local_metadata.ids() {
502                        // skip files that are already deleted or will be deleted
503                        if deletions.maybe_find(&id).is_none()
504                            || deletions.calculate_deleted(&id)?
505                            || (remote.maybe_find(&id).is_some()
506                                && remote.calculate_deleted(&id)?)
507                        {
508                            continue;
509                        }
510
511                        let local_file = local.find(&id)?.clone();
512                        let local_name = local.name(&id, &self.keychain)?;
513                        let maybe_base_file = base.maybe_find(&id).cloned();
514                        let maybe_remote_file = remote.maybe_find(&id).cloned();
515                        if let Some(ref base_file) = maybe_base_file {
516                            let base_name = base.name(&id, &self.keychain)?;
517                            let remote_file = remote.find(&id)?.clone();
518                            let remote_name = remote.name(&id, &self.keychain)?;
519
520                            // move
521                            if local_file.parent() != base_file.parent()
522                                && remote_file.parent() == base_file.parent()
523                                && !files_to_unmove.contains(&id)
524                            {
525                                merge.move_unvalidated(&id, local_file.parent(), &self.keychain)?;
526                            }
527
528                            // rename
529                            if local_name != base_name && remote_name == base_name {
530                                merge.rename_unvalidated(&id, &local_name, &self.keychain)?;
531                            }
532                        }
533
534                        // share
535                        let mut remote_keys = HashMap::new();
536                        if let Some(ref remote_file) = maybe_remote_file {
537                            for key in remote_file.user_access_keys() {
538                                remote_keys.insert(
539                                    (Owner(key.encrypted_by), Owner(key.encrypted_for)),
540                                    (key.mode, key.deleted),
541                                );
542                            }
543                        }
544                        for key in local_file.user_access_keys() {
545                            let (by, for_) = (Owner(key.encrypted_by), Owner(key.encrypted_for));
546                            if let Some(&(remote_mode, remote_deleted)) =
547                                remote_keys.get(&(by, for_))
548                            {
549                                // upgrade share
550                                if key.mode > remote_mode || !key.deleted && remote_deleted {
551                                    let mode = match key.mode {
552                                        UserAccessMode::Read => ShareMode::Read,
553                                        UserAccessMode::Write => ShareMode::Write,
554                                        UserAccessMode::Owner => continue,
555                                    };
556                                    merge.add_share_unvalidated(id, for_, mode, &self.keychain)?;
557                                }
558                                // delete share
559                                if key.deleted && !remote_deleted {
560                                    merge.delete_share_unvalidated(
561                                        &id,
562                                        Some(for_.0),
563                                        &self.keychain,
564                                    )?;
565                                }
566                            } else {
567                                // add share
568                                let mode = match key.mode {
569                                    UserAccessMode::Read => ShareMode::Read,
570                                    UserAccessMode::Write => ShareMode::Write,
571                                    UserAccessMode::Owner => continue,
572                                };
573                                merge.add_share_unvalidated(id, for_, mode, &self.keychain)?;
574                            }
575                        }
576
577                        // share deletion due to conflicts
578                        if files_to_unshare.contains(&id) {
579                            merge.delete_share_unvalidated(&id, None, &self.keychain)?;
580                        }
581
582                        // rename due to path conflict
583                        if let Some(&rename_increment) = rename_increments.get(&id) {
584                            let name = NameComponents::from(&local_name)
585                                .generate_incremented(rename_increment)
586                                .to_name();
587                            merge.rename_unvalidated(&id, &name, &self.keychain)?;
588                        }
589
590                        // edit
591                        let base_hmac = maybe_base_file.and_then(|f| f.document_hmac().cloned());
592                        let remote_hmac =
593                            maybe_remote_file.and_then(|f| f.document_hmac().cloned());
594                        let local_hmac = local_file.document_hmac().cloned();
595                        if merge.access_mode(me, &id)? >= Some(UserAccessMode::Write)
596                            && local_hmac != base_hmac
597                        {
598                            if remote_hmac != base_hmac && remote_hmac != local_hmac {
599                                // merge
600                                let merge_name = merge.name(&id, &self.keychain)?;
601                                let document_type =
602                                    DocumentType::from_file_name_using_extension(&merge_name);
603
604                                // todo these accesses are potentially problematic
605                                // maybe not if service/docs is the persion doing network io
606                                let base_document =
607                                    self.read_document_helper(id, &mut base).await?;
608                                let remote_document =
609                                    self.read_document_helper(id, &mut remote).await?;
610                                let local_document =
611                                    self.read_document_helper(id, &mut local).await?;
612
613                                match document_type {
614                                    DocumentType::Text => {
615                                        // 3-way merge
616                                        // todo: a couple more clones than necessary
617                                        let base_document =
618                                            String::from_utf8_lossy(&base_document).to_string();
619                                        let remote_document =
620                                            String::from_utf8_lossy(&remote_document).to_string();
621                                        let local_document =
622                                            String::from_utf8_lossy(&local_document).to_string();
623                                        let merged_document = Buffer::from(base_document.as_str())
624                                            .merge(local_document, remote_document);
625                                        let encrypted_document = merge
626                                            .update_document_unvalidated(
627                                                &id,
628                                                &merged_document.into_bytes(),
629                                                &self.keychain,
630                                            )?;
631                                        let hmac = merge.find(&id)?.document_hmac().copied();
632                                        self.docs.insert(id, hmac, &encrypted_document).await?;
633                                    }
634                                    DocumentType::Drawing => {
635                                        let base_document =
636                                            String::from_utf8_lossy(&base_document).to_string();
637                                        let remote_document =
638                                            String::from_utf8_lossy(&remote_document).to_string();
639                                        let local_document =
640                                            String::from_utf8_lossy(&local_document).to_string();
641
642                                        let base_buffer = svg::buffer::Buffer::new(&base_document);
643                                        let remote_buffer =
644                                            svg::buffer::Buffer::new(&remote_document);
645                                        let mut local_buffer =
646                                            svg::buffer::Buffer::new(&local_document);
647
648                                        for (_, el) in local_buffer.elements.iter_mut() {
649                                            if let Element::Path(path) = el {
650                                                path.data.apply_transform(u_transform_to_bezier(
651                                                    &Transform::from(
652                                                        local_buffer
653                                                            .weak_viewport_settings
654                                                            .master_transform,
655                                                    ),
656                                                ));
657                                            }
658                                        }
659                                        svg::buffer::Buffer::reload(
660                                            &mut local_buffer.elements,
661                                            &mut local_buffer.weak_images,
662                                            &mut local_buffer.weak_path_pressures,
663                                            &mut local_buffer.weak_viewport_settings,
664                                            &base_buffer,
665                                            &remote_buffer,
666                                        );
667
668                                        let merged_document = local_buffer.serialize();
669                                        let encrypted_document = merge
670                                            .update_document_unvalidated(
671                                                &id,
672                                                &merged_document.into_bytes(),
673                                                &self.keychain,
674                                            )?;
675                                        let hmac = merge.find(&id)?.document_hmac().copied();
676                                        self.docs.insert(id, hmac, &encrypted_document).await?;
677                                    }
678                                    DocumentType::Other => {
679                                        // duplicate file
680                                        let merge_parent = *merge.find(&id)?.parent();
681                                        let duplicate_id = if let Some(&duplicate_id) =
682                                            duplicate_file_ids.get(&id)
683                                        {
684                                            duplicate_id
685                                        } else {
686                                            let duplicate_id = Uuid::new_v4();
687                                            duplicate_file_ids.insert(id, duplicate_id);
688                                            rename_increments.insert(duplicate_id, 1);
689                                            duplicate_id
690                                        };
691
692                                        let mut merge_name = merge_name;
693                                        merge_name = NameComponents::from(&merge_name)
694                                            .generate_incremented(
695                                                rename_increments
696                                                    .get(&duplicate_id)
697                                                    .copied()
698                                                    .unwrap_or_default(),
699                                            )
700                                            .to_name();
701
702                                        merge.create_unvalidated(
703                                            duplicate_id,
704                                            symkey::generate_key(),
705                                            &merge_parent,
706                                            &merge_name,
707                                            FileType::Document,
708                                            &self.keychain,
709                                        )?;
710                                        let encrypted_document = merge
711                                            .update_document_unvalidated(
712                                                &duplicate_id,
713                                                &local_document,
714                                                &self.keychain,
715                                            )?;
716                                        let duplicate_hmac =
717                                            merge.find(&duplicate_id)?.document_hmac().copied();
718                                        self.docs
719                                            .insert(
720                                                duplicate_id,
721                                                duplicate_hmac,
722                                                &encrypted_document,
723                                            )
724                                            .await?;
725                                    }
726                                }
727                            } else {
728                                // overwrite (todo: avoid reading/decrypting/encrypting document)
729                                let document = self.read_document_helper(id, &mut local).await?;
730                                merge.update_document_unvalidated(
731                                    &id,
732                                    &document,
733                                    &self.keychain,
734                                )?;
735                            }
736                        }
737                    }
738
739                    // deletes
740                    // moves happen first in case a file is moved into a deleted folder
741                    for id in db.local_metadata.ids() {
742                        if db.base_metadata.maybe_find(&id).is_some()
743                            && deletions.calculate_deleted(&id)?
744                            && !merge.calculate_deleted(&id)?
745                        {
746                            // delete
747                            merge.delete_unvalidated(&id, &self.keychain)?;
748                        }
749                    }
750                    for &id in &links_to_delete {
751                        // delete
752                        if merge.maybe_find(&id).is_some() && !merge.calculate_deleted(&id)? {
753                            merge.delete_unvalidated(&id, &self.keychain)?;
754                        }
755                    }
756
757                    merge
758                };
759
760                // validate; handle failures by introducing changeset constraints
761                for link in merge.ids() {
762                    if !merge.calculate_deleted(&link)? {
763                        if let FileType::Link { target } = merge.find(&link)?.file_type() {
764                            if merge.maybe_find(&target).is_some()
765                                && merge.calculate_deleted(&target)?
766                            {
767                                // delete links to deleted files
768                                if links_to_delete.insert(link) {
769                                    continue 'merge_construction;
770                                } else {
771                                    return Err(LbErrKind::Unexpected(format!(
772                                        "sync failed to resolve broken link (deletion): {:?}",
773                                        link
774                                    ))
775                                    .into());
776                                }
777                            }
778                        }
779                    }
780                }
781
782                let validate_result = merge.validate(me);
783                match validate_result {
784                    // merge changeset is valid
785                    Ok(_) => {
786                        let (_, merge_changes) = merge.unstage();
787                        break merge_changes;
788                    }
789                    Err(ref err) => match err.kind {
790                        LbErrKind::Validation(ref vf) => match vf {
791                            // merge changeset has resolvable validation errors and needs modification
792                            ValidationFailure::Cycle(ids) => {
793                                // revert all local moves in the cycle
794                                let mut progress = false;
795                                for &id in ids {
796                                    if db.local_metadata.maybe_find(&id).is_some()
797                                        && files_to_unmove.insert(id)
798                                    {
799                                        progress = true;
800                                    }
801                                }
802                                if !progress {
803                                    return Err(LbErrKind::Unexpected(format!(
804                                        "sync failed to resolve cycle: {:?}",
805                                        ids
806                                    ))
807                                    .into());
808                                }
809                            }
810                            ValidationFailure::PathConflict(ids) => {
811                                // pick one local id and generate a non-conflicting filename
812                                let mut progress = false;
813                                for &id in ids {
814                                    if duplicate_file_ids.values().any(|&dup| dup == id) {
815                                        *rename_increments.entry(id).or_insert(0) += 1;
816                                        progress = true;
817                                        break;
818                                    }
819                                }
820                                if !progress {
821                                    for &id in ids {
822                                        if db.local_metadata.maybe_find(&id).is_some() {
823                                            *rename_increments.entry(id).or_insert(0) += 1;
824                                            progress = true;
825                                            break;
826                                        }
827                                    }
828                                }
829                                if !progress {
830                                    return Err(LbErrKind::Unexpected(format!(
831                                        "sync failed to resolve path conflict: {:?}",
832                                        ids
833                                    ))
834                                    .into());
835                                }
836                            }
837                            ValidationFailure::SharedLink { link, shared_ancestor } => {
838                                // if ancestor is newly shared, delete share, otherwise delete link
839                                let mut progress = false;
840                                if let Some(base_shared_ancestor) = base.maybe_find(shared_ancestor)
841                                {
842                                    if !base_shared_ancestor.is_shared()
843                                        && files_to_unshare.insert(*shared_ancestor)
844                                    {
845                                        progress = true;
846                                    }
847                                }
848                                if !progress && links_to_delete.insert(*link) {
849                                    progress = true;
850                                }
851                                if !progress {
852                                    return Err(LbErrKind::Unexpected(format!(
853                                    "sync failed to resolve shared link: link: {:?}, shared_ancestor: {:?}",
854                                    link, shared_ancestor
855                                )).into());
856                                }
857                            }
858                            ValidationFailure::DuplicateLink { target } => {
859                                // delete local link with this target
860                                let mut progress = false;
861                                if let Some(link) = local.linked_by(target)? {
862                                    if links_to_delete.insert(link) {
863                                        progress = true;
864                                    }
865                                }
866                                if !progress {
867                                    return Err(LbErrKind::Unexpected(format!(
868                                        "sync failed to resolve duplicate link: target: {:?}",
869                                        target
870                                    ))
871                                    .into());
872                                }
873                            }
874                            ValidationFailure::BrokenLink(link) => {
875                                // delete local link with this target
876                                if !links_to_delete.insert(*link) {
877                                    return Err(LbErrKind::Unexpected(format!(
878                                        "sync failed to resolve broken link: {:?}",
879                                        link
880                                    ))
881                                    .into());
882                                }
883                            }
884                            ValidationFailure::OwnedLink(link) => {
885                                // if target is newly owned, unmove target, otherwise delete link
886                                let mut progress = false;
887                                if let Some(remote_link) = remote.maybe_find(link) {
888                                    if let FileType::Link { target } = remote_link.file_type() {
889                                        let remote_target = remote.find(&target)?;
890                                        if remote_target.owner() != me
891                                            && files_to_unmove.insert(target)
892                                        {
893                                            progress = true;
894                                        }
895                                    }
896                                }
897                                if !progress && links_to_delete.insert(*link) {
898                                    progress = true;
899                                }
900                                if !progress {
901                                    return Err(LbErrKind::Unexpected(format!(
902                                        "sync failed to resolve owned link: {:?}",
903                                        link
904                                    ))
905                                    .into());
906                                }
907                            }
908                            // merge changeset has unexpected validation errors
909                            ValidationFailure::Orphan(_)
910                            | ValidationFailure::NonFolderWithChildren(_)
911                            | ValidationFailure::FileWithDifferentOwnerParent(_)
912                            | ValidationFailure::FileNameTooLong(_)
913                            | ValidationFailure::DeletedFileUpdated(_)
914                            | ValidationFailure::NonDecryptableFileName(_) => {
915                                validate_result?;
916                            }
917                        },
918                        // merge changeset has unexpected errors
919                        _ => {
920                            validate_result?;
921                        }
922                    },
923                }
924            }
925        };
926
927        // base = remote; local = merge
928        (&mut db.base_metadata)
929            .to_staged(remote_changes.clone())
930            .to_lazy()
931            .promote()?;
932        db.local_metadata.clear()?;
933        (&mut db.local_metadata)
934            .to_staged(merge_changes)
935            .to_lazy()
936            .promote()?;
937
938        // todo who else calls this did they manage locks right?
939        // self.cleanup_local_metadata()?;
940        db.base_metadata.stage(&mut db.local_metadata).prune()?;
941
942        if start.elapsed() > std::time::Duration::from_millis(100) {
943            warn!("sync merge held lock for {:?}", start.elapsed());
944        }
945
946        Ok(())
947    }
948
949    /// Updates remote and base metadata to local.
950    async fn push_meta(&self, ctx: &mut SyncContext) -> LbResult<()> {
951        ctx.msg("Pushing tree changes...");
952        let mut updates = vec![];
953        let mut local_changes_no_digests = Vec::new();
954
955        let tx = self.ro_tx().await;
956        let db = tx.db();
957
958        // remote = local
959        let local = db.base_metadata.stage(&db.local_metadata).to_lazy();
960
961        for id in local.tree.staged.ids() {
962            let mut local_change = local.tree.staged.find(&id)?.timestamped_value.value.clone();
963            let maybe_base_file = local.tree.base.maybe_find(&id);
964
965            // change everything but document hmac and re-sign
966            local_change.document_hmac =
967                maybe_base_file.and_then(|f| f.timestamped_value.value.document_hmac);
968            let local_change = local_change.sign(&self.keychain)?;
969
970            local_changes_no_digests.push(local_change.clone());
971            let file_diff = FileDiff { old: maybe_base_file.cloned(), new: local_change };
972            updates.push(file_diff);
973        }
974
975        drop(tx);
976
977        if !updates.is_empty() {
978            self.client
979                .request(self.get_account()?, UpsertRequest { updates: updates.clone() })
980                .await?;
981            ctx.pushed_metas = updates;
982        }
983
984        let mut tx = self.begin_tx().await;
985        let db = tx.db();
986
987        // base = local
988        (&mut db.base_metadata)
989            .to_lazy()
990            .stage(local_changes_no_digests)
991            .promote()?;
992        db.base_metadata.stage(&mut db.local_metadata).prune()?;
993
994        tx.end();
995
996        Ok(())
997    }
998
999    /// Updates remote and base files to local. Assumes metadata is already pushed for all new files.
1000    async fn push_docs(&self, ctx: &mut SyncContext) -> LbResult<()> {
1001        ctx.msg("Pushing document changes...");
1002        let mut updates = vec![];
1003        let mut local_changes_digests_only = vec![];
1004
1005        let tx = self.ro_tx().await;
1006        let db = tx.db();
1007        let start = Instant::now();
1008
1009        let local = db.base_metadata.stage(&db.local_metadata).to_lazy();
1010
1011        for id in local.tree.staged.ids() {
1012            let base_file = local.tree.base.find(&id)?.clone();
1013
1014            // change only document hmac and re-sign
1015            let mut local_change = base_file.timestamped_value.value.clone();
1016            local_change.document_hmac = local.find(&id)?.timestamped_value.value.document_hmac;
1017
1018            if base_file.document_hmac() == local_change.document_hmac()
1019                || local_change.document_hmac.is_none()
1020            {
1021                continue;
1022            }
1023
1024            let local_change = local_change.sign(&self.keychain)?;
1025
1026            updates.push(FileDiff { old: Some(base_file), new: local_change.clone() });
1027            local_changes_digests_only.push(local_change);
1028            self.events.sync(SyncIncrement::PushingDocument(id, true));
1029        }
1030
1031        drop(tx);
1032        if start.elapsed() > std::time::Duration::from_millis(100) {
1033            warn!("sync push_docs held lock for {:?}", start.elapsed());
1034        }
1035
1036        let docs_count = updates.len();
1037        ctx.total += docs_count;
1038        let futures = updates.clone().into_iter().map(|diff| self.push_doc(diff));
1039
1040        let mut stream = stream::iter(futures).buffer_unordered(
1041            thread::available_parallelism()
1042                .unwrap_or(NonZeroUsize::new(4).unwrap())
1043                .into(),
1044        );
1045
1046        let mut idx = 0;
1047        while let Some(fut) = stream.next().await {
1048            let id = fut?;
1049            self.events.sync(SyncIncrement::PushingDocument(id, false));
1050            ctx.file_msg(id, &format!("Pushed file {idx} of {docs_count}."));
1051            idx += 1;
1052        }
1053        ctx.pushed_docs = updates;
1054
1055        let mut tx = self.begin_tx().await;
1056        let db = tx.db();
1057        // base = local (metadata)
1058        (&mut db.base_metadata)
1059            .to_lazy()
1060            .stage(local_changes_digests_only)
1061            .promote()?;
1062
1063        db.base_metadata.stage(&mut db.local_metadata).prune()?;
1064
1065        tx.end();
1066
1067        Ok(())
1068    }
1069
1070    async fn push_doc(&self, diff: FileDiff<SignedFile>) -> LbResult<Uuid> {
1071        let id = *diff.new.id();
1072        let hmac = diff.new.document_hmac();
1073        let local_document_change = self.docs.get(id, hmac.copied()).await?;
1074        self.client
1075            .request(
1076                self.get_account()?,
1077                ChangeDocRequest { diff, new_content: local_document_change },
1078            )
1079            .await?;
1080
1081        Ok(id)
1082    }
1083
1084    async fn dedup(
1085        &self, updates: GetUpdatesResponse,
1086    ) -> LbResult<(Vec<SignedFile>, u64, Option<Uuid>)> {
1087        let tx = self.ro_tx().await;
1088        let db = tx.db();
1089
1090        let mut root_id = None;
1091        let (remote_changes, update_as_of) = {
1092            let mut remote_changes = updates.file_metadata;
1093            let update_as_of = updates.as_of_metadata_version;
1094
1095            remote_changes = self.prune_remote_orphans(remote_changes).await?;
1096
1097            let remote = db.base_metadata.stage(remote_changes).pruned()?.to_lazy();
1098
1099            let (_, remote_changes) = remote.unstage();
1100            (remote_changes, update_as_of)
1101        };
1102
1103        // initialize root if this is the first pull on this device
1104        if db.root.get().is_none() {
1105            let root = remote_changes
1106                .all_files()?
1107                .into_iter()
1108                .find(|f| f.is_root())
1109                .ok_or(LbErrKind::RootNonexistent)?;
1110            root_id = Some(*root.id());
1111        }
1112
1113        Ok((remote_changes, update_as_of, root_id))
1114    }
1115
1116    async fn prune_remote_orphans(
1117        &self, remote_changes: Vec<SignedFile>,
1118    ) -> LbResult<Vec<SignedFile>> {
1119        let tx = self.ro_tx().await;
1120        let db = tx.db();
1121
1122        let me = Owner(self.keychain.get_pk()?);
1123        let remote = db.base_metadata.stage(remote_changes).to_lazy();
1124        let mut result = Vec::new();
1125
1126        for id in remote.tree.staged.ids() {
1127            let meta = remote.find(&id)?;
1128            if remote.maybe_find_parent(meta).is_some()
1129                || meta
1130                    .user_access_keys()
1131                    .iter()
1132                    .any(|k| k.encrypted_for == me.0)
1133            {
1134                result.push(remote.find(&id)?.clone()); // todo: don't clone
1135            }
1136        }
1137        Ok(result)
1138    }
1139
1140    async fn commit_last_synced(&self, ctx: &mut SyncContext) -> LbResult<()> {
1141        ctx.msg("Cleaning up...");
1142        let mut tx = self.begin_tx().await;
1143        let db = tx.db();
1144        db.last_synced.insert(ctx.update_as_of as i64)?;
1145
1146        if let Some(root) = ctx.root {
1147            db.root.insert(root)?;
1148        }
1149
1150        Ok(())
1151    }
1152
1153    pub async fn get_last_synced_human(&self) -> LbResult<String> {
1154        let tx = self.ro_tx().await;
1155        let db = tx.db();
1156        let last_synced = db.last_synced.get().copied().unwrap_or(0);
1157
1158        Ok(self.get_timestamp_human_string(last_synced))
1159    }
1160
1161    pub fn get_timestamp_human_string(&self, timestamp: i64) -> String {
1162        if timestamp != 0 {
1163            Duration::milliseconds(clock::get_time().0 - timestamp)
1164                .format_human()
1165                .to_string()
1166        } else {
1167            "never".to_string()
1168        }
1169    }
1170}
1171
1172impl SyncContext {
1173    fn summarize(&self) -> SyncStatus {
1174        let mut local = HashSet::new();
1175        let mut server = HashSet::new();
1176        let mut work_units = vec![];
1177
1178        for meta in &self.pushed_metas {
1179            local.insert(meta.new.id());
1180        }
1181
1182        for meta in &self.pushed_docs {
1183            local.insert(meta.new.id());
1184        }
1185
1186        for meta in &self.remote_changes {
1187            server.insert(meta.id());
1188        }
1189
1190        for id in local {
1191            work_units.push(WorkUnit::LocalChange(*id));
1192        }
1193
1194        for id in server {
1195            work_units.push(WorkUnit::ServerChange(*id));
1196        }
1197
1198        SyncStatus { work_units, latest_server_ts: self.update_as_of }
1199    }
1200
1201    fn msg(&mut self, msg: &str) {
1202        self.current += 1;
1203        if let Some(f) = &self.progress {
1204            f(SyncProgress {
1205                total: self.total,
1206                progress: self.current,
1207                file_being_processed: Default::default(),
1208                msg: msg.to_string(),
1209            })
1210        }
1211    }
1212
1213    fn file_msg(&mut self, id: Uuid, msg: &str) {
1214        self.current += 1;
1215        if let Some(f) = &self.progress {
1216            f(SyncProgress {
1217                total: self.total,
1218                progress: self.current,
1219                file_being_processed: Some(id),
1220                msg: msg.to_string(),
1221            })
1222        }
1223    }
1224
1225    fn done_msg(&mut self) {
1226        self.current = self.total;
1227        if let Some(f) = &self.progress {
1228            f(SyncProgress {
1229                total: self.total,
1230                progress: self.current,
1231                file_being_processed: None,
1232                msg: "Sync successful!".to_string(),
1233            })
1234        }
1235    }
1236}
1237
1238#[derive(Debug, Serialize, Clone)]
1239pub struct SyncStatus {
1240    pub work_units: Vec<WorkUnit>,
1241    pub latest_server_ts: u64,
1242}
1243
1244#[derive(Clone)]
1245pub struct SyncProgress {
1246    pub total: usize,
1247    pub progress: usize,
1248    pub file_being_processed: Option<Uuid>,
1249    pub msg: String,
1250}
1251
1252impl Display for SyncProgress {
1253    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1254        write!(f, "[{} / {}]: {}", self.progress, self.total, self.msg)
1255    }
1256}
1257
1258#[derive(Debug, Clone)]
1259pub enum SyncIncrement {
1260    SyncStarted,
1261    PullingDocument(Uuid, bool),
1262    PushingDocument(Uuid, bool),
1263    SyncFinished(Option<LbErrKind>),
1264}