lb_rs/service/
sync.rs

1use crate::io::network::ApiError;
2use crate::model::access_info::UserAccessMode;
3use crate::model::api::{
4    ChangeDocRequest, GetDocRequest, GetFileIdsRequest, GetUpdatesRequest, GetUpdatesResponse,
5    GetUsernameError, GetUsernameRequest, UpsertRequest,
6};
7use crate::model::errors::{LbErrKind, LbResult};
8use crate::model::file::ShareMode;
9use crate::model::file_like::FileLike;
10use crate::model::file_metadata::{DocumentHmac, FileDiff, FileType, Owner};
11use crate::model::filename::{DocumentType, NameComponents};
12use crate::model::signed_file::SignedFile;
13use crate::model::staged::StagedTreeLikeMut;
14use crate::model::svg::buffer::u_transform_to_bezier;
15use crate::model::svg::element::Element;
16use crate::model::text::buffer::Buffer;
17use crate::model::tree_like::TreeLike;
18use crate::model::work_unit::WorkUnit;
19use crate::model::{clock, svg};
20use crate::model::{symkey, ValidationFailure};
21use crate::Lb;
22pub use basic_human_duration::ChronoHumanDuration;
23use futures::stream;
24use futures::StreamExt;
25use serde::Serialize;
26use std::collections::{hash_map, HashMap, HashSet};
27use std::fmt::{Display, Formatter};
28use std::num::NonZeroUsize;
29use std::sync::atomic::{AtomicBool, Ordering};
30use std::sync::Arc;
31use std::thread;
32use std::time::Instant;
33use time::Duration;
34use uuid::Uuid;
35
/// A reference-counted atomic boolean: clones of the handle share one underlying flag.
///
/// NOTE(review): presumably the type used for flags such as `Lb::syncing` — confirm against
/// the `Lb` definition, which is not visible from this file.
pub type SyncFlag = Arc<AtomicBool>;
37
/// Mutable state threaded through the individual steps of a single `Lb::sync` run.
///
/// Created by `setup_sync` and passed by `&mut` to each step.
pub struct SyncContext {
    /// Optional caller-supplied callback that accepts `SyncProgress` values.
    progress: Option<Box<dyn Fn(SyncProgress) + Send>>,
    /// Progress counter; starts at 0 in `setup_sync`.
    current: usize,
    /// Expected number of progress steps; starts at 7 in `setup_sync` and `fetch_docs` adds
    /// the number of documents it is about to download.
    total: usize,

    /// Username per account key; seeded from `db.pub_key_lookup` and extended by
    /// `populate_pk_cache`.
    pk_cache: HashMap<Owner, String>,
    /// Value of `db.last_synced` when the sync started (0 if unset); sent to the server as
    /// `since_metadata_version`.
    last_synced: u64,
    /// Remote file metadata returned by `dedup` in `fetch_meta`.
    remote_changes: Vec<SignedFile>,
    /// Second element of the `dedup` result stored by `fetch_meta`.
    update_as_of: u64,
    /// Third element of the `dedup` result stored by `fetch_meta`.
    root: Option<Uuid>,
    /// NOTE(review): presumably filled by `push_meta` — its body is not visible here, confirm.
    pushed_metas: Vec<FileDiff<SignedFile>>,
    /// NOTE(review): presumably filled by `push_docs` — its body is not visible here, confirm.
    pushed_docs: Vec<FileDiff<SignedFile>>,
    /// Ids of the documents downloaded by `fetch_docs`; `sync` emits a `doc_written` event for
    /// each of them.
    pulled_docs: Vec<Uuid>,
}
52
53impl Lb {
54    #[instrument(level = "debug", skip_all, err(Debug))]
55    pub async fn calculate_work(&self) -> LbResult<SyncStatus> {
56        let tx = self.ro_tx().await;
57        let db = tx.db();
58        let last_synced = db.last_synced.get().copied().unwrap_or_default() as u64;
59        drop(tx);
60
61        let remote_changes = self
62            .client
63            .request(self.get_account()?, GetUpdatesRequest { since_metadata_version: last_synced })
64            .await?;
65        let (deduped, latest_server_ts, _) = self.dedup(remote_changes).await?;
66        let remote_dirty = deduped
67            .into_iter()
68            .map(|f| *f.id())
69            .map(WorkUnit::ServerChange);
70
71        self.prune().await?;
72
73        let tx = self.ro_tx().await;
74        let db = tx.db();
75
76        let locally_dirty = db
77            .local_metadata
78            .get()
79            .keys()
80            .copied()
81            .map(WorkUnit::LocalChange);
82
83        let mut work_units: Vec<WorkUnit> = Vec::new();
84        work_units.extend(locally_dirty.chain(remote_dirty));
85        Ok(SyncStatus { work_units, latest_server_ts })
86    }
87
88    #[instrument(level = "debug", skip_all, err(Debug))]
89    pub async fn sync(&self, f: Option<Box<dyn Fn(SyncProgress) + Send>>) -> LbResult<SyncStatus> {
90        let old = self.syncing.swap(true, Ordering::SeqCst);
91        if old {
92            return Err(LbErrKind::AlreadySyncing.into());
93        }
94
95        let mut ctx = self.setup_sync(f).await?;
96
97        let mut got_updates = false;
98        let mut pipeline: LbResult<()> = async {
99            ctx.msg("Preparing Sync...");
100            self.prune().await?;
101            got_updates = self.fetch_meta(&mut ctx).await?;
102            self.populate_pk_cache(&mut ctx).await?;
103            self.docs.dont_delete.store(true, Ordering::SeqCst);
104            self.fetch_docs(&mut ctx).await?;
105            self.merge(&mut ctx).await?;
106            self.push_meta(&mut ctx).await?;
107            self.push_docs(&mut ctx).await?;
108            Ok(())
109        }
110        .await;
111
112        self.docs.dont_delete.store(false, Ordering::SeqCst);
113
114        if pipeline.is_ok() {
115            pipeline = self.commit_last_synced(&mut ctx).await;
116        }
117
118        let cleanup = self.cleanup().await;
119
120        self.syncing.store(false, Ordering::Relaxed);
121        pipeline?;
122        cleanup?;
123        ctx.done_msg();
124
125        if got_updates {
126            self.events.meta_changed(self.root().await?.id);
127            for id in &ctx.pulled_docs {
128                self.events.doc_written(*id);
129            }
130        }
131
132        Ok(ctx.summarize())
133    }
134
135    async fn setup_sync(
136        &self, progress: Option<Box<dyn Fn(SyncProgress) + Send>>,
137    ) -> LbResult<SyncContext> {
138        let tx = self.ro_tx().await;
139        let db = tx.db();
140
141        let last_synced = db.last_synced.get().copied().unwrap_or_default() as u64;
142        let pk_cache = db.pub_key_lookup.get().clone();
143
144        let current = 0;
145        let total = 7;
146
147        Ok(SyncContext {
148            last_synced,
149            pk_cache,
150
151            progress,
152            current,
153            total,
154
155            root: Default::default(),
156            update_as_of: Default::default(),
157            remote_changes: Default::default(),
158            pushed_docs: Default::default(),
159            pushed_metas: Default::default(),
160            pulled_docs: Default::default(),
161        })
162    }
163
164    async fn prune(&self) -> LbResult<()> {
165        let server_ids = self
166            .client
167            .request(self.get_account()?, GetFileIdsRequest {})
168            .await?
169            .ids;
170
171        let mut tx = self.begin_tx().await;
172        let db = tx.db();
173
174        let mut local = db.base_metadata.stage(&db.local_metadata).to_lazy();
175        let base_ids = local.tree.base.ids();
176
177        let mut prunable_ids = base_ids;
178        prunable_ids.retain(|id| !server_ids.contains(id));
179        for id in prunable_ids.clone() {
180            prunable_ids.extend(local.descendants(&id)?.into_iter());
181        }
182        for id in &prunable_ids {
183            if let Some(base_file) = local.tree.base.maybe_find(id) {
184                self.docs
185                    .delete(*id, base_file.document_hmac().copied())
186                    .await?;
187            }
188            if let Some(local_file) = local.maybe_find(id) {
189                self.docs
190                    .delete(*id, local_file.document_hmac().copied())
191                    .await?;
192            }
193        }
194
195        let mut base_staged = (&mut db.base_metadata).to_lazy().stage(None);
196        base_staged.tree.removed = prunable_ids.clone().into_iter().collect();
197        base_staged.promote()?;
198
199        let mut local_staged = (&mut db.local_metadata).to_lazy().stage(None);
200        local_staged.tree.removed = prunable_ids.into_iter().collect();
201        local_staged.promote()?;
202
203        Ok(())
204    }
205
206    /// Returns true if there were any updates
207    async fn fetch_meta(&self, ctx: &mut SyncContext) -> LbResult<bool> {
208        ctx.msg("Fetching tree updates...");
209        let updates = self
210            .client
211            .request(
212                self.get_account()?,
213                GetUpdatesRequest { since_metadata_version: ctx.last_synced },
214            )
215            .await?;
216
217        let empty = updates.file_metadata.is_empty();
218        let (remote, as_of, root) = self.dedup(updates).await?;
219        ctx.remote_changes = remote;
220        ctx.update_as_of = as_of;
221        ctx.root = root;
222
223        Ok(!empty)
224    }
225
226    async fn populate_pk_cache(&self, ctx: &mut SyncContext) -> LbResult<()> {
227        ctx.msg("Updating public key cache...");
228        let mut all_owners = HashSet::new();
229        for file in &ctx.remote_changes {
230            for user_access_key in file.user_access_keys() {
231                all_owners.insert(Owner(user_access_key.encrypted_by));
232                all_owners.insert(Owner(user_access_key.encrypted_for));
233            }
234        }
235
236        let mut new_entries = HashMap::new();
237
238        for owner in all_owners {
239            if let hash_map::Entry::Vacant(e) = ctx.pk_cache.entry(owner) {
240                let username_result = self
241                    .client
242                    .request(self.get_account()?, GetUsernameRequest { key: owner.0 })
243                    .await;
244                let username = match username_result {
245                    Err(ApiError::Endpoint(GetUsernameError::UserNotFound)) => {
246                        "<unknown>".to_string()
247                    }
248                    _ => username_result?.username.clone(),
249                };
250                new_entries.insert(owner, username.clone());
251                e.insert(username.clone());
252            }
253        }
254
255        let mut tx = self.begin_tx().await;
256        let db = tx.db();
257
258        for (owner, username) in new_entries {
259            db.pub_key_lookup.insert(owner, username)?;
260        }
261        Ok(())
262    }
263
264    async fn fetch_docs(&self, ctx: &mut SyncContext) -> LbResult<()> {
265        ctx.msg("Fetching documents...");
266        let mut docs_to_pull = vec![];
267
268        let tx = self.ro_tx().await;
269        let db = tx.db();
270        let start = Instant::now();
271
272        let mut remote = db.base_metadata.stage(ctx.remote_changes.clone()).to_lazy(); // this used to be owned remote changes
273        for id in remote.tree.staged.ids() {
274            if remote.calculate_deleted(&id)? {
275                continue;
276            }
277            let remote_hmac = remote.find(&id)?.document_hmac().cloned();
278            let base_hmac = remote
279                .tree
280                .base
281                .maybe_find(&id)
282                .and_then(|f| f.document_hmac())
283                .cloned();
284            if base_hmac == remote_hmac {
285                continue;
286            }
287
288            if let Some(remote_hmac) = remote_hmac {
289                docs_to_pull.push((id, remote_hmac));
290            }
291        }
292
293        drop(tx);
294        if start.elapsed() > std::time::Duration::from_millis(100) {
295            warn!("sync fetch_docs held lock for {:?}", start.elapsed());
296        }
297
298        let num_docs = docs_to_pull.len();
299        ctx.total += num_docs;
300
301        let futures = docs_to_pull
302            .into_iter()
303            .map(|(id, hmac)| self.fetch_doc(id, hmac));
304
305        let mut stream = stream::iter(futures).buffer_unordered(
306            thread::available_parallelism()
307                .unwrap_or(NonZeroUsize::new(4).unwrap())
308                .into(),
309        );
310
311        let mut idx = 0;
312        while let Some(fut) = stream.next().await {
313            let id = fut?;
314            ctx.pulled_docs.push(id);
315            ctx.file_msg(id, &format!("Downloaded file {idx} of {num_docs}."));
316            idx += 1;
317        }
318        Ok(())
319    }
320
321    async fn fetch_doc(&self, id: Uuid, hmac: DocumentHmac) -> LbResult<Uuid> {
322        let remote_document = self
323            .client
324            .request(self.get_account()?, GetDocRequest { id, hmac })
325            .await?;
326        self.docs
327            .insert(id, Some(hmac), &remote_document.content)
328            .await?;
329
330        Ok(id)
331    }
332
333    /// Pulls remote changes and constructs a changeset Merge such that Stage<Stage<Stage<Base, Remote>, Local>, Merge> is valid.
334    /// Promotes Base to Stage<Base, Remote> and Local to Stage<Local, Merge>
335    async fn merge(&self, ctx: &mut SyncContext) -> LbResult<()> {
336        let mut tx = self.begin_tx().await;
337        let db = tx.db();
338        let start = Instant::now();
339
340        let remote_changes = &ctx.remote_changes;
341
342        // fetch document updates and local documents for merge
343        let me = Owner(self.keychain.get_pk()?);
344
345        // compute merge changes
346        let merge_changes = {
347            // assemble trees
348            let mut base = (&db.base_metadata).to_lazy();
349            let remote_unlazy = (&db.base_metadata).to_staged(remote_changes);
350            let mut remote = remote_unlazy.as_lazy();
351            let mut local = (&db.base_metadata).to_staged(&db.local_metadata).to_lazy();
352
353            // changeset constraints - these evolve as we try to assemble changes and encounter validation failures
354            let mut files_to_unmove: HashSet<Uuid> = HashSet::new();
355            let mut files_to_unshare: HashSet<Uuid> = HashSet::new();
356            let mut links_to_delete: HashSet<Uuid> = HashSet::new();
357            let mut rename_increments: HashMap<Uuid, usize> = HashMap::new();
358            let mut duplicate_file_ids: HashMap<Uuid, Uuid> = HashMap::new();
359
360            'merge_construction: loop {
361                // process just the edits which allow us to check deletions in the result
362                let mut deletions = {
363                    let mut deletions = remote_unlazy.stage(Vec::new()).to_lazy();
364
365                    // creations
366                    let mut deletion_creations = HashSet::new();
367                    for id in db.local_metadata.ids() {
368                        if remote.maybe_find(&id).is_none() && !links_to_delete.contains(&id) {
369                            deletion_creations.insert(id);
370                        }
371                    }
372                    'drain_creations: while !deletion_creations.is_empty() {
373                        'choose_a_creation: for id in &deletion_creations {
374                            // create
375                            let id = *id;
376                            let local_file = local.find(&id)?.clone();
377                            let result = deletions.create_unvalidated(
378                                id,
379                                symkey::generate_key(),
380                                local_file.parent(),
381                                &local.name(&id, &self.keychain)?,
382                                local_file.file_type(),
383                                &self.keychain,
384                            );
385                            match result {
386                                Ok(_) => {
387                                    deletion_creations.remove(&id);
388                                    continue 'drain_creations;
389                                }
390                                Err(ref err) => match err.kind {
391                                    LbErrKind::FileParentNonexistent => {
392                                        continue 'choose_a_creation;
393                                    }
394                                    _ => {
395                                        result?;
396                                    }
397                                },
398                            }
399                        }
400                        return Err(LbErrKind::Unexpected(format!(
401                            "sync failed to find a topomodelal order for file creations: {:?}",
402                            deletion_creations
403                        ))
404                        .into());
405                    }
406
407                    // moves (creations happen first in case a file is moved into a new folder)
408                    for id in db.local_metadata.ids() {
409                        let local_file = local.find(&id)?.clone();
410                        if let Some(base_file) = db.base_metadata.maybe_find(&id).cloned() {
411                            if !local_file.explicitly_deleted()
412                                && local_file.parent() != base_file.parent()
413                                && !files_to_unmove.contains(&id)
414                            {
415                                // move
416                                deletions.move_unvalidated(
417                                    &id,
418                                    local_file.parent(),
419                                    &self.keychain,
420                                )?;
421                            }
422                        }
423                    }
424
425                    // deletions (moves happen first in case a file is moved into a deleted folder)
426                    for id in db.local_metadata.ids() {
427                        let local_file = local.find(&id)?.clone();
428                        if local_file.explicitly_deleted() {
429                            // delete
430                            deletions.delete_unvalidated(&id, &self.keychain)?;
431                        }
432                    }
433                    deletions
434                };
435
436                // process all edits, dropping non-deletion edits for files that will be implicitly deleted
437                let mut merge = {
438                    let mut merge = remote_unlazy.stage(Vec::new()).to_lazy();
439
440                    // creations and edits of created documents
441                    let mut creations = HashSet::new();
442                    for id in db.local_metadata.ids() {
443                        if deletions.maybe_find(&id).is_some()
444                            && !deletions.calculate_deleted(&id)?
445                            && remote.maybe_find(&id).is_none()
446                            && !links_to_delete.contains(&id)
447                        {
448                            creations.insert(id);
449                        }
450                    }
451                    'drain_creations: while !creations.is_empty() {
452                        'choose_a_creation: for id in &creations {
453                            // create
454                            let id = *id;
455                            let local_file = local.find(&id)?.clone();
456                            let result = merge.create_unvalidated(
457                                id,
458                                local.decrypt_key(&id, &self.keychain)?,
459                                local_file.parent(),
460                                &local.name(&id, &self.keychain)?,
461                                local_file.file_type(),
462                                &self.keychain,
463                            );
464                            match result {
465                                Ok(_) => {
466                                    creations.remove(&id);
467                                    continue 'drain_creations;
468                                }
469                                Err(ref err) => match err.kind {
470                                    LbErrKind::FileParentNonexistent => {
471                                        continue 'choose_a_creation;
472                                    }
473                                    _ => {
474                                        result?;
475                                    }
476                                },
477                            }
478                        }
479                        return Err(LbErrKind::Unexpected(format!(
480                            "sync failed to find a topomodelal order for file creations: {:?}",
481                            creations
482                        ))
483                        .into());
484                    }
485
486                    // moves, renames, edits, and shares
487                    // creations happen first in case a file is moved into a new folder
488                    for id in db.local_metadata.ids() {
489                        // skip files that are already deleted or will be deleted
490                        if deletions.maybe_find(&id).is_none()
491                            || deletions.calculate_deleted(&id)?
492                            || (remote.maybe_find(&id).is_some()
493                                && remote.calculate_deleted(&id)?)
494                        {
495                            continue;
496                        }
497
498                        let local_file = local.find(&id)?.clone();
499                        let local_name = local.name(&id, &self.keychain)?;
500                        let maybe_base_file = base.maybe_find(&id).cloned();
501                        let maybe_remote_file = remote.maybe_find(&id).cloned();
502                        if let Some(ref base_file) = maybe_base_file {
503                            let base_name = base.name(&id, &self.keychain)?;
504                            let remote_file = remote.find(&id)?.clone();
505                            let remote_name = remote.name(&id, &self.keychain)?;
506
507                            // move
508                            if local_file.parent() != base_file.parent()
509                                && remote_file.parent() == base_file.parent()
510                                && !files_to_unmove.contains(&id)
511                            {
512                                merge.move_unvalidated(&id, local_file.parent(), &self.keychain)?;
513                            }
514
515                            // rename
516                            if local_name != base_name && remote_name == base_name {
517                                merge.rename_unvalidated(&id, &local_name, &self.keychain)?;
518                            }
519                        }
520
521                        // share
522                        let mut remote_keys = HashMap::new();
523                        if let Some(ref remote_file) = maybe_remote_file {
524                            for key in remote_file.user_access_keys() {
525                                remote_keys.insert(
526                                    (Owner(key.encrypted_by), Owner(key.encrypted_for)),
527                                    (key.mode, key.deleted),
528                                );
529                            }
530                        }
531                        for key in local_file.user_access_keys() {
532                            let (by, for_) = (Owner(key.encrypted_by), Owner(key.encrypted_for));
533                            if let Some(&(remote_mode, remote_deleted)) =
534                                remote_keys.get(&(by, for_))
535                            {
536                                // upgrade share
537                                if key.mode > remote_mode || !key.deleted && remote_deleted {
538                                    let mode = match key.mode {
539                                        UserAccessMode::Read => ShareMode::Read,
540                                        UserAccessMode::Write => ShareMode::Write,
541                                        UserAccessMode::Owner => continue,
542                                    };
543                                    merge.add_share_unvalidated(id, for_, mode, &self.keychain)?;
544                                }
545                                // delete share
546                                if key.deleted && !remote_deleted {
547                                    merge.delete_share_unvalidated(
548                                        &id,
549                                        Some(for_.0),
550                                        &self.keychain,
551                                    )?;
552                                }
553                            } else {
554                                // add share
555                                let mode = match key.mode {
556                                    UserAccessMode::Read => ShareMode::Read,
557                                    UserAccessMode::Write => ShareMode::Write,
558                                    UserAccessMode::Owner => continue,
559                                };
560                                merge.add_share_unvalidated(id, for_, mode, &self.keychain)?;
561                            }
562                        }
563
564                        // share deletion due to conflicts
565                        if files_to_unshare.contains(&id) {
566                            merge.delete_share_unvalidated(&id, None, &self.keychain)?;
567                        }
568
569                        // rename due to path conflict
570                        if let Some(&rename_increment) = rename_increments.get(&id) {
571                            let name = NameComponents::from(&local_name)
572                                .generate_incremented(rename_increment)
573                                .to_name();
574                            merge.rename_unvalidated(&id, &name, &self.keychain)?;
575                        }
576
577                        // edit
578                        let base_hmac = maybe_base_file.and_then(|f| f.document_hmac().cloned());
579                        let remote_hmac =
580                            maybe_remote_file.and_then(|f| f.document_hmac().cloned());
581                        let local_hmac = local_file.document_hmac().cloned();
582                        if merge.access_mode(me, &id)? >= Some(UserAccessMode::Write)
583                            && local_hmac != base_hmac
584                        {
585                            if remote_hmac != base_hmac && remote_hmac != local_hmac {
586                                // merge
587                                let merge_name = merge.name(&id, &self.keychain)?;
588                                let document_type =
589                                    DocumentType::from_file_name_using_extension(&merge_name);
590
591                                // todo these accesses are potentially problematic
592                                // maybe not if service/docs is the persion doing network io
593                                let base_document =
594                                    self.read_document_helper(id, &mut base).await?;
595                                let remote_document =
596                                    self.read_document_helper(id, &mut remote).await?;
597                                let local_document =
598                                    self.read_document_helper(id, &mut local).await?;
599
600                                match document_type {
601                                    DocumentType::Text => {
602                                        // 3-way merge
603                                        // todo: a couple more clones than necessary
604                                        let base_document =
605                                            String::from_utf8_lossy(&base_document).to_string();
606                                        let remote_document =
607                                            String::from_utf8_lossy(&remote_document).to_string();
608                                        let local_document =
609                                            String::from_utf8_lossy(&local_document).to_string();
610                                        let merged_document = Buffer::from(base_document.as_str())
611                                            .merge(local_document, remote_document);
612                                        let encrypted_document = merge
613                                            .update_document_unvalidated(
614                                                &id,
615                                                &merged_document.into_bytes(),
616                                                &self.keychain,
617                                            )?;
618                                        let hmac = merge.find(&id)?.document_hmac().copied();
619                                        self.docs.insert(id, hmac, &encrypted_document).await?;
620                                    }
621                                    DocumentType::Drawing => {
622                                        println!("sync merge");
623                                        let base_document =
624                                            String::from_utf8_lossy(&base_document).to_string();
625                                        let remote_document =
626                                            String::from_utf8_lossy(&remote_document).to_string();
627                                        let local_document =
628                                            String::from_utf8_lossy(&local_document).to_string();
629
630                                        let base_buffer = svg::buffer::Buffer::new(&base_document);
631                                        let remote_buffer =
632                                            svg::buffer::Buffer::new(&remote_document);
633                                        let mut local_buffer =
634                                            svg::buffer::Buffer::new(&local_document);
635
636                                        for (_, el) in local_buffer.elements.iter_mut() {
637                                            if let Element::Path(path) = el {
638                                                path.data.apply_transform(u_transform_to_bezier(
639                                                    &local_buffer.master_transform,
640                                                ));
641                                            }
642                                        }
643                                        svg::buffer::Buffer::reload(
644                                            &mut local_buffer.elements,
645                                            &mut local_buffer.weak_images,
646                                            local_buffer.master_transform,
647                                            &base_buffer,
648                                            &remote_buffer,
649                                        );
650
651                                        let merged_document = local_buffer.serialize();
652                                        let encrypted_document = merge
653                                            .update_document_unvalidated(
654                                                &id,
655                                                &merged_document.into_bytes(),
656                                                &self.keychain,
657                                            )?;
658                                        let hmac = merge.find(&id)?.document_hmac().copied();
659                                        self.docs.insert(id, hmac, &encrypted_document).await?;
660                                    }
661                                    DocumentType::Other => {
662                                        // duplicate file
663                                        let merge_parent = *merge.find(&id)?.parent();
664                                        let duplicate_id = if let Some(&duplicate_id) =
665                                            duplicate_file_ids.get(&id)
666                                        {
667                                            duplicate_id
668                                        } else {
669                                            let duplicate_id = Uuid::new_v4();
670                                            duplicate_file_ids.insert(id, duplicate_id);
671                                            rename_increments.insert(duplicate_id, 1);
672                                            duplicate_id
673                                        };
674
675                                        let mut merge_name = merge_name;
676                                        merge_name = NameComponents::from(&merge_name)
677                                            .generate_incremented(
678                                                rename_increments
679                                                    .get(&duplicate_id)
680                                                    .copied()
681                                                    .unwrap_or_default(),
682                                            )
683                                            .to_name();
684
685                                        merge.create_unvalidated(
686                                            duplicate_id,
687                                            symkey::generate_key(),
688                                            &merge_parent,
689                                            &merge_name,
690                                            FileType::Document,
691                                            &self.keychain,
692                                        )?;
693                                        let encrypted_document = merge
694                                            .update_document_unvalidated(
695                                                &duplicate_id,
696                                                &local_document,
697                                                &self.keychain,
698                                            )?;
699                                        let duplicate_hmac =
700                                            merge.find(&duplicate_id)?.document_hmac().copied();
701                                        self.docs
702                                            .insert(
703                                                duplicate_id,
704                                                duplicate_hmac,
705                                                &encrypted_document,
706                                            )
707                                            .await?;
708                                    }
709                                }
710                            } else {
711                                // overwrite (todo: avoid reading/decrypting/encrypting document)
712                                let document = self.read_document_helper(id, &mut local).await?;
713                                merge.update_document_unvalidated(
714                                    &id,
715                                    &document,
716                                    &self.keychain,
717                                )?;
718                            }
719                        }
720                    }
721
722                    // deletes
723                    // moves happen first in case a file is moved into a deleted folder
724                    for id in db.local_metadata.ids() {
725                        if db.base_metadata.maybe_find(&id).is_some()
726                            && deletions.calculate_deleted(&id)?
727                            && !merge.calculate_deleted(&id)?
728                        {
729                            // delete
730                            merge.delete_unvalidated(&id, &self.keychain)?;
731                        }
732                    }
733                    for &id in &links_to_delete {
734                        // delete
735                        if merge.maybe_find(&id).is_some() && !merge.calculate_deleted(&id)? {
736                            merge.delete_unvalidated(&id, &self.keychain)?;
737                        }
738                    }
739
740                    merge
741                };
742
743                // validate; handle failures by introducing changeset constraints
744                for link in merge.ids() {
745                    if !merge.calculate_deleted(&link)? {
746                        if let FileType::Link { target } = merge.find(&link)?.file_type() {
747                            if merge.maybe_find(&target).is_some()
748                                && merge.calculate_deleted(&target)?
749                            {
750                                // delete links to deleted files
751                                if links_to_delete.insert(link) {
752                                    continue 'merge_construction;
753                                } else {
754                                    return Err(LbErrKind::Unexpected(format!(
755                                        "sync failed to resolve broken link (deletion): {:?}",
756                                        link
757                                    ))
758                                    .into());
759                                }
760                            }
761                        }
762                    }
763                }
764
765                let validate_result = merge.validate(me);
766                match validate_result {
767                    // merge changeset is valid
768                    Ok(_) => {
769                        let (_, merge_changes) = merge.unstage();
770                        break merge_changes;
771                    }
772                    Err(ref err) => match err.kind {
773                        LbErrKind::Validation(ref vf) => match vf {
774                            // merge changeset has resolvable validation errors and needs modification
775                            ValidationFailure::Cycle(ids) => {
776                                // revert all local moves in the cycle
777                                let mut progress = false;
778                                for &id in ids {
779                                    if db.local_metadata.maybe_find(&id).is_some()
780                                        && files_to_unmove.insert(id)
781                                    {
782                                        progress = true;
783                                    }
784                                }
785                                if !progress {
786                                    return Err(LbErrKind::Unexpected(format!(
787                                        "sync failed to resolve cycle: {:?}",
788                                        ids
789                                    ))
790                                    .into());
791                                }
792                            }
793                            ValidationFailure::PathConflict(ids) => {
794                                // pick one local id and generate a non-conflicting filename
795                                let mut progress = false;
796                                for &id in ids {
797                                    if duplicate_file_ids.values().any(|&dup| dup == id) {
798                                        *rename_increments.entry(id).or_insert(0) += 1;
799                                        progress = true;
800                                        break;
801                                    }
802                                }
803                                if !progress {
804                                    for &id in ids {
805                                        if db.local_metadata.maybe_find(&id).is_some() {
806                                            *rename_increments.entry(id).or_insert(0) += 1;
807                                            progress = true;
808                                            break;
809                                        }
810                                    }
811                                }
812                                if !progress {
813                                    return Err(LbErrKind::Unexpected(format!(
814                                        "sync failed to resolve path conflict: {:?}",
815                                        ids
816                                    ))
817                                    .into());
818                                }
819                            }
820                            ValidationFailure::SharedLink { link, shared_ancestor } => {
821                                // if ancestor is newly shared, delete share, otherwise delete link
822                                let mut progress = false;
823                                if let Some(base_shared_ancestor) = base.maybe_find(shared_ancestor)
824                                {
825                                    if !base_shared_ancestor.is_shared()
826                                        && files_to_unshare.insert(*shared_ancestor)
827                                    {
828                                        progress = true;
829                                    }
830                                }
831                                if !progress && links_to_delete.insert(*link) {
832                                    progress = true;
833                                }
834                                if !progress {
835                                    return Err(LbErrKind::Unexpected(format!(
836                                    "sync failed to resolve shared link: link: {:?}, shared_ancestor: {:?}",
837                                    link, shared_ancestor
838                                )).into());
839                                }
840                            }
841                            ValidationFailure::DuplicateLink { target } => {
842                                // delete local link with this target
843                                let mut progress = false;
844                                if let Some(link) = local.linked_by(target)? {
845                                    if links_to_delete.insert(link) {
846                                        progress = true;
847                                    }
848                                }
849                                if !progress {
850                                    return Err(LbErrKind::Unexpected(format!(
851                                        "sync failed to resolve duplicate link: target: {:?}",
852                                        target
853                                    ))
854                                    .into());
855                                }
856                            }
857                            ValidationFailure::BrokenLink(link) => {
858                                // delete local link with this target
859                                if !links_to_delete.insert(*link) {
860                                    return Err(LbErrKind::Unexpected(format!(
861                                        "sync failed to resolve broken link: {:?}",
862                                        link
863                                    ))
864                                    .into());
865                                }
866                            }
867                            ValidationFailure::OwnedLink(link) => {
868                                // if target is newly owned, unmove target, otherwise delete link
869                                let mut progress = false;
870                                if let Some(remote_link) = remote.maybe_find(link) {
871                                    if let FileType::Link { target } = remote_link.file_type() {
872                                        let remote_target = remote.find(&target)?;
873                                        if remote_target.owner() != me
874                                            && files_to_unmove.insert(target)
875                                        {
876                                            progress = true;
877                                        }
878                                    }
879                                }
880                                if !progress && links_to_delete.insert(*link) {
881                                    progress = true;
882                                }
883                                if !progress {
884                                    return Err(LbErrKind::Unexpected(format!(
885                                        "sync failed to resolve owned link: {:?}",
886                                        link
887                                    ))
888                                    .into());
889                                }
890                            }
891                            // merge changeset has unexpected validation errors
892                            ValidationFailure::Orphan(_)
893                            | ValidationFailure::NonFolderWithChildren(_)
894                            | ValidationFailure::FileWithDifferentOwnerParent(_)
895                            | ValidationFailure::FileNameTooLong(_)
896                            | ValidationFailure::DeletedFileUpdated(_)
897                            | ValidationFailure::NonDecryptableFileName(_) => {
898                                validate_result?;
899                            }
900                        },
901                        // merge changeset has unexpected errors
902                        _ => {
903                            validate_result?;
904                        }
905                    },
906                }
907            }
908        };
909
910        // base = remote; local = merge
911        (&mut db.base_metadata)
912            .to_staged(remote_changes.clone())
913            .to_lazy()
914            .promote()?;
915        db.local_metadata.clear()?;
916        (&mut db.local_metadata)
917            .to_staged(merge_changes)
918            .to_lazy()
919            .promote()?;
920
921        // todo who else calls this did they manage locks right?
922        // self.cleanup_local_metadata()?;
923        db.base_metadata.stage(&mut db.local_metadata).prune()?;
924
925        if start.elapsed() > std::time::Duration::from_millis(100) {
926            warn!("sync merge held lock for {:?}", start.elapsed());
927        }
928
929        Ok(())
930    }
931
932    /// Updates remote and base metadata to local.
933    async fn push_meta(&self, ctx: &mut SyncContext) -> LbResult<()> {
934        ctx.msg("Pushing tree changes...");
935        let mut updates = vec![];
936        let mut local_changes_no_digests = Vec::new();
937
938        let tx = self.ro_tx().await;
939        let db = tx.db();
940
941        // remote = local
942        let local = db.base_metadata.stage(&db.local_metadata).to_lazy();
943
944        for id in local.tree.staged.ids() {
945            let mut local_change = local.tree.staged.find(&id)?.timestamped_value.value.clone();
946            let maybe_base_file = local.tree.base.maybe_find(&id);
947
948            // change everything but document hmac and re-sign
949            local_change.document_hmac =
950                maybe_base_file.and_then(|f| f.timestamped_value.value.document_hmac);
951            let local_change = local_change.sign(&self.keychain)?;
952
953            local_changes_no_digests.push(local_change.clone());
954            let file_diff = FileDiff { old: maybe_base_file.cloned(), new: local_change };
955            updates.push(file_diff);
956        }
957
958        drop(tx);
959
960        if !updates.is_empty() {
961            self.client
962                .request(self.get_account()?, UpsertRequest { updates: updates.clone() })
963                .await?;
964            ctx.pushed_metas = updates;
965        }
966
967        let mut tx = self.begin_tx().await;
968        let db = tx.db();
969
970        // base = local
971        (&mut db.base_metadata)
972            .to_lazy()
973            .stage(local_changes_no_digests)
974            .promote()?;
975        db.base_metadata.stage(&mut db.local_metadata).prune()?;
976
977        tx.end();
978
979        Ok(())
980    }
981
982    /// Updates remote and base files to local. Assumes metadata is already pushed for all new files.
983    async fn push_docs(&self, ctx: &mut SyncContext) -> LbResult<()> {
984        ctx.msg("Pushing document changes...");
985        let mut updates = vec![];
986        let mut local_changes_digests_only = vec![];
987
988        let tx = self.ro_tx().await;
989        let db = tx.db();
990        let start = Instant::now();
991
992        let local = db.base_metadata.stage(&db.local_metadata).to_lazy();
993
994        for id in local.tree.staged.ids() {
995            let base_file = local.tree.base.find(&id)?.clone();
996
997            // change only document hmac and re-sign
998            let mut local_change = base_file.timestamped_value.value.clone();
999            local_change.document_hmac = local.find(&id)?.timestamped_value.value.document_hmac;
1000
1001            if base_file.document_hmac() == local_change.document_hmac()
1002                || local_change.document_hmac.is_none()
1003            {
1004                continue;
1005            }
1006
1007            let local_change = local_change.sign(&self.keychain)?;
1008
1009            updates.push(FileDiff { old: Some(base_file), new: local_change.clone() });
1010            local_changes_digests_only.push(local_change);
1011        }
1012
1013        drop(tx);
1014        if start.elapsed() > std::time::Duration::from_millis(100) {
1015            warn!("sync push_docs held lock for {:?}", start.elapsed());
1016        }
1017
1018        let docs_count = updates.len();
1019        ctx.total += docs_count;
1020        let futures = updates.clone().into_iter().map(|diff| self.push_doc(diff));
1021
1022        let mut stream = stream::iter(futures).buffer_unordered(
1023            thread::available_parallelism()
1024                .unwrap_or(NonZeroUsize::new(4).unwrap())
1025                .into(),
1026        );
1027
1028        let mut idx = 0;
1029        while let Some(fut) = stream.next().await {
1030            let id = fut?;
1031            ctx.file_msg(id, &format!("Pushed file {idx} of {docs_count}."));
1032            idx += 1;
1033        }
1034        ctx.pushed_docs = updates;
1035
1036        let mut tx = self.begin_tx().await;
1037        let db = tx.db();
1038        // base = local (metadata)
1039        (&mut db.base_metadata)
1040            .to_lazy()
1041            .stage(local_changes_digests_only)
1042            .promote()?;
1043
1044        db.base_metadata.stage(&mut db.local_metadata).prune()?;
1045
1046        tx.end();
1047
1048        Ok(())
1049    }
1050
1051    async fn push_doc(&self, diff: FileDiff<SignedFile>) -> LbResult<Uuid> {
1052        let id = *diff.new.id();
1053        let hmac = diff.new.document_hmac();
1054        let local_document_change = self.docs.get(id, hmac.copied()).await?;
1055        self.client
1056            .request(
1057                self.get_account()?,
1058                ChangeDocRequest { diff, new_content: local_document_change },
1059            )
1060            .await?;
1061
1062        Ok(id)
1063    }
1064
1065    async fn dedup(
1066        &self, updates: GetUpdatesResponse,
1067    ) -> LbResult<(Vec<SignedFile>, u64, Option<Uuid>)> {
1068        let tx = self.ro_tx().await;
1069        let db = tx.db();
1070
1071        let mut root_id = None;
1072        let (remote_changes, update_as_of) = {
1073            let mut remote_changes = updates.file_metadata;
1074            let update_as_of = updates.as_of_metadata_version;
1075
1076            remote_changes = self.prune_remote_orphans(remote_changes).await?;
1077
1078            let remote = db.base_metadata.stage(remote_changes).pruned()?.to_lazy();
1079
1080            let (_, remote_changes) = remote.unstage();
1081            (remote_changes, update_as_of)
1082        };
1083
1084        // initialize root if this is the first pull on this device
1085        if db.root.get().is_none() {
1086            let root = remote_changes
1087                .all_files()?
1088                .into_iter()
1089                .find(|f| f.is_root())
1090                .ok_or(LbErrKind::RootNonexistent)?;
1091            root_id = Some(*root.id());
1092        }
1093
1094        Ok((remote_changes, update_as_of, root_id))
1095    }
1096
1097    async fn prune_remote_orphans(
1098        &self, remote_changes: Vec<SignedFile>,
1099    ) -> LbResult<Vec<SignedFile>> {
1100        let tx = self.ro_tx().await;
1101        let db = tx.db();
1102
1103        let me = Owner(self.keychain.get_pk()?);
1104        let remote = db.base_metadata.stage(remote_changes).to_lazy();
1105        let mut result = Vec::new();
1106
1107        for id in remote.tree.staged.ids() {
1108            let meta = remote.find(&id)?;
1109            if remote.maybe_find_parent(meta).is_some()
1110                || meta
1111                    .user_access_keys()
1112                    .iter()
1113                    .any(|k| k.encrypted_for == me.0)
1114            {
1115                result.push(remote.find(&id)?.clone()); // todo: don't clone
1116            }
1117        }
1118        Ok(result)
1119    }
1120
1121    async fn commit_last_synced(&self, ctx: &mut SyncContext) -> LbResult<()> {
1122        ctx.msg("Cleaning up...");
1123        let mut tx = self.begin_tx().await;
1124        let db = tx.db();
1125        db.last_synced.insert(ctx.update_as_of as i64)?;
1126
1127        if let Some(root) = ctx.root {
1128            db.root.insert(root)?;
1129        }
1130
1131        Ok(())
1132    }
1133
1134    pub async fn get_last_synced_human(&self) -> LbResult<String> {
1135        let tx = self.ro_tx().await;
1136        let db = tx.db();
1137        let last_synced = db.last_synced.get().copied().unwrap_or(0);
1138
1139        Ok(self.get_timestamp_human_string(last_synced))
1140    }
1141
1142    pub fn get_timestamp_human_string(&self, timestamp: i64) -> String {
1143        if timestamp != 0 {
1144            Duration::milliseconds(clock::get_time().0 - timestamp)
1145                .format_human()
1146                .to_string()
1147        } else {
1148            "never".to_string()
1149        }
1150    }
1151}
1152
1153impl SyncContext {
1154    fn summarize(&self) -> SyncStatus {
1155        let mut local = HashSet::new();
1156        let mut server = HashSet::new();
1157        let mut work_units = vec![];
1158
1159        for meta in &self.pushed_metas {
1160            local.insert(meta.new.id());
1161        }
1162
1163        for meta in &self.pushed_docs {
1164            local.insert(meta.new.id());
1165        }
1166
1167        for meta in &self.remote_changes {
1168            server.insert(meta.id());
1169        }
1170
1171        for id in local {
1172            work_units.push(WorkUnit::LocalChange(*id));
1173        }
1174
1175        for id in server {
1176            work_units.push(WorkUnit::ServerChange(*id));
1177        }
1178
1179        SyncStatus { work_units, latest_server_ts: self.update_as_of }
1180    }
1181
1182    fn msg(&mut self, msg: &str) {
1183        self.current += 1;
1184        if let Some(f) = &self.progress {
1185            f(SyncProgress {
1186                total: self.total,
1187                progress: self.current,
1188                file_being_processed: Default::default(),
1189                msg: msg.to_string(),
1190            })
1191        }
1192    }
1193
1194    fn file_msg(&mut self, id: Uuid, msg: &str) {
1195        self.current += 1;
1196        if let Some(f) = &self.progress {
1197            f(SyncProgress {
1198                total: self.total,
1199                progress: self.current,
1200                file_being_processed: Some(id),
1201                msg: msg.to_string(),
1202            })
1203        }
1204    }
1205
1206    fn done_msg(&mut self) {
1207        self.current = self.total;
1208        if let Some(f) = &self.progress {
1209            f(SyncProgress {
1210                total: self.total,
1211                progress: self.current,
1212                file_being_processed: None,
1213                msg: "Sync successful!".to_string(),
1214            })
1215        }
1216    }
1217}
1218
1219#[derive(Debug, Serialize, Clone)]
1220pub struct SyncStatus {
1221    pub work_units: Vec<WorkUnit>,
1222    pub latest_server_ts: u64,
1223}
1224
1225#[derive(Clone)]
1226pub struct SyncProgress {
1227    pub total: usize,
1228    pub progress: usize,
1229    pub file_being_processed: Option<Uuid>,
1230    pub msg: String,
1231}
1232
1233impl Display for SyncProgress {
1234    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1235        write!(f, "[{} / {}]: {}", self.progress, self.total, self.msg)
1236    }
1237}