Skip to main content

lb_rs/subscribers/
syncer.rs

1use std::{
2    collections::{HashMap, HashSet},
3    num::NonZeroUsize,
4    sync::Arc,
5    thread,
6    time::{Duration, Instant},
7};
8
9use futures::{StreamExt, stream};
10use tokio::sync::{Mutex, broadcast::error::TryRecvError};
11use tokio::time;
12use usvg::Transform;
13use uuid::Uuid;
14
15use crate::{
16    Lb, LbErrKind, LbResult,
17    io::network::ApiError,
18    model::{
19        ValidationFailure,
20        access_info::UserAccessMode,
21        account::Account,
22        api::{
23            ChangeDocRequestV2, GetDocRequest, GetFileIdsRequest, GetUpdatesRequestV2,
24            GetUsernameError, GetUsernameRequest, UpsertDebugInfoRequest, UpsertRequestV2,
25        },
26        crypto::{DecryptedDocument, EncryptedDocument},
27        errors::{LbErr, Unexpected},
28        file::ShareMode,
29        file_like::FileLike,
30        file_metadata::{DocumentHmac, FileDiff, FileType, Owner},
31        filename::{DocumentType, NameComponents},
32        lazy::LazyTree,
33        signed_meta::SignedMeta,
34        staged::StagedTreeLikeMut,
35        svg::{self, buffer::u_transform_to_bezier, element::Element},
36        symkey, text,
37        tree_like::TreeLike,
38        validate,
39    },
40    service::events::{Actor, Event, SyncIncrement},
41};
42
/// Shared handle to the per-pass sync state; the async mutex ensures only one
/// sync pass runs at a time (see `Lb::sync`, which holds it for the full pass).
pub type Syncer = Arc<Mutex<SyncState>>;
44
/// Scratch state accumulated over a single sync pass.
///
/// Reset to `Default` at the start of every pull (see `inital_sync_state`) and
/// then filled in by the fetch/merge steps before being committed.
#[derive(Default)]
pub struct SyncState {
    /// the starting point for updates for this sync pass
    last_synced: u64,

    /// if our pull is successful, this is the timestamp we will commit
    updates_as_of: u64,

    /// changes we pulled from the server, post deduplication
    remote_changes: Vec<SignedMeta>,

    /// did we pull a root on this pass?
    new_root: Option<Uuid>,

    /// what docs did we pull as a result of this sync
    pulled_docs: Vec<Uuid>,
}
62
// todo: add a fetch-metadata fn that fetches only the docs it needs -- the
// ones matching should_fetch
//
// should_fetch would be a tree fn that returns true if the file:
//     is an md or svg that descends from ... (design sketch unfinished)
69impl Lb {
    /// Runs a full sync pass: pull remote updates, then push local changes.
    ///
    /// The syncer lock is held for the entire pass, so concurrent callers
    /// serialize here. `SyncStarted` is emitted before any work and
    /// `SyncFinished` (carrying the error kind, if any) is emitted whether or
    /// not the pipeline succeeded, so subscribers always see a balanced
    /// start/finish pair.
    ///
    /// `cleanup` runs regardless of pipeline outcome; note that a cleanup
    /// error is returned in preference to a pipeline error because its `?`
    /// comes first.
    #[instrument(level = "debug", skip(self), err(Debug))]
    pub async fn sync(&self) -> LbResult<()> {
        let mut sync_state = self.syncer.lock().await;
        self.events.sync_update(SyncIncrement::SyncStarted);

        // run pull + push as a single fallible pipeline so one SyncFinished
        // event can report the outcome of either stage
        let pipeline: LbResult<()> = async {
            self.pull_updates(&mut sync_state).await?;
            self.push_local_changes().await?;
            Ok(())
        }
        .await;

        self.events.sync_update(SyncIncrement::SyncFinished(
            pipeline.as_ref().err().map(|err| err.kind.clone()),
        ));

        self.cleanup().await?;

        pipeline?;

        let account = self.get_account()?.clone();

        // best-effort telemetry for beta accounts; not built on wasm targets
        #[cfg(not(target_family = "wasm"))]
        if account.is_beta() {
            self.send_debug_info(account).await;
        }

        Ok(())
    }
99
    /// Pull half of a sync: brings local state up to date with the server.
    ///
    /// Steps, in order: reset `sync_state` and snapshot the last-synced
    /// version; prune files the server no longer reports; fetch metadata
    /// updates; fetch the document contents those updates require; merge
    /// remote changes with local edits; persist the new sync timestamp; emit
    /// pull events.
    pub(crate) async fn pull_updates(&self, sync_state: &mut SyncState) -> LbResult<()> {
        self.inital_sync_state(sync_state).await?;
        self.process_deletions().await?;
        self.fetch_meta(sync_state).await?;
        self.fetch_required_docs(sync_state).await?;
        // todo: should this inform a re-pull?
        self.merge(sync_state).await?;
        self.commit_last_synced(sync_state).await?;
        self.send_pull_events(sync_state).await?;

        // with background work disabled nothing else refreshes the pk cache,
        // so populate it as part of the pull
        if !self.config.background_work {
            self.populate_pk_cache().await?;
        }

        Ok(())
    }
116
    /// Push half of a sync: metadata is pushed before document contents
    /// (presumably so the server can relate incoming doc bytes to known
    /// metadata — order is load-bearing; confirm before reordering).
    pub(crate) async fn push_local_changes(&self) -> LbResult<()> {
        self.push_meta().await?;
        self.push_docs().await?;

        Ok(())
    }
123
    // NOTE(review): fn name is misspelled ("inital" -> "initial"); renaming
    // would touch the caller in pull_updates, so it is left as-is here.
    /// Resets `state` to defaults and seeds `last_synced` from the db,
    /// establishing the starting point for this pass's update request.
    async fn inital_sync_state(&self, state: &mut SyncState) -> LbResult<()> {
        let tx = self.ro_tx().await;
        let db = tx.db();

        *state = Default::default();
        // missing value means this device has never synced; the `as u64` cast
        // suggests the db stores the timestamp signed -- TODO confirm
        state.last_synced = db.last_synced.get().copied().unwrap_or_default() as u64;

        Ok(())
    }
133
    /// Prunes files the server no longer knows about.
    ///
    /// Fetches the full set of live file ids from the server, then removes
    /// every base file absent from that set — plus its local descendants —
    /// from both metadata trees, deleting the on-disk document contents for
    /// both the base and local versions along the way.
    pub(crate) async fn process_deletions(&self) -> LbResult<()> {
        let server_ids = self
            .client
            .request(self.get_account()?, GetFileIdsRequest {})
            .await?
            .ids;

        let mut tx = self.begin_tx().await;
        let db = tx.db();

        let mut local = db.base_metadata.stage(&db.local_metadata).to_lazy();
        let base_ids = local.tree.base.ids();

        // anything in base that the server no longer reports is prunable
        let mut prunable_ids = base_ids;
        prunable_ids.retain(|id| !server_ids.contains(id));
        // descendants of prunable files are prunable too; iterate a snapshot
        // so extending the collection doesn't invalidate the loop
        for id in prunable_ids.clone() {
            prunable_ids.extend(local.descendants(&id)?.into_iter());
        }
        // drop document contents for base and local versions separately —
        // they may carry different hmacs
        for id in &prunable_ids {
            if let Some(base_file) = local.tree.base.maybe_find(id) {
                self.docs
                    .delete(*id, base_file.document_hmac().copied())
                    .await?;
            }
            if let Some(local_file) = local.maybe_find(id) {
                self.docs
                    .delete(*id, local_file.document_hmac().copied())
                    .await?;
            }
        }

        // remove the pruned ids from each metadata tree via a staged removal
        let mut base_staged = (&mut db.base_metadata).to_lazy().stage(None);
        base_staged.tree.removed = prunable_ids.iter().copied().collect();
        base_staged.promote()?;

        let mut local_staged = (&mut db.local_metadata).to_lazy().stage(None);
        local_staged.tree.removed = prunable_ids.iter().copied().collect();
        local_staged.promote()?;

        // only notify subscribers if something actually changed
        if !prunable_ids.is_empty() {
            self.events.meta_changed(Actor::Sync);
        }

        Ok(())
    }
179
    /// Fetches metadata updates since `state.last_synced`, prunes orphans and
    /// already-known entries, and records the results in `state`.
    ///
    /// A remote file is kept only if we can find its parent or it is shared
    /// directly with us (a user access key encrypted for our public key);
    /// everything else is an unreachable orphan and is dropped.
    async fn fetch_meta(&self, state: &mut SyncState) -> LbResult<()> {
        let updates = self
            .client
            .request(
                self.get_account()?,
                GetUpdatesRequestV2 { since_metadata_version: state.last_synced },
            )
            .await?;

        let tx = self.ro_tx().await;
        let db = tx.db();

        // this loop implicitly prunes remote orphans
        let mut without_orphans = Vec::new();
        let me = Owner(self.keychain.get_pk()?);
        let remote = db.base_metadata.stage(updates.file_metadata).to_lazy();
        for id in remote.tree.staged.ids() {
            let meta = remote.find(&id)?;
            if remote.maybe_find_parent(meta).is_some()
                || meta
                    .user_access_keys()
                    .iter()
                    .any(|k| k.encrypted_for == me.0)
            {
                without_orphans.push(remote.find(&id)?.clone());
            }
        }

        // this is what actually performs the deduplication
        let pruned_tree = db.base_metadata.stage(without_orphans).pruned()?.to_lazy();
        let (_, deduped_changes) = pruned_tree.unstage();

        // initialize root if this is the first pull on this device; a first
        // pull with no root in the changeset is an error
        let mut root_id = None;
        if db.root.get().is_none() {
            let root = deduped_changes
                .all_files()?
                .into_iter()
                .find(|f| f.is_root())
                .ok_or(LbErrKind::RootNonexistent)?;
            root_id = Some(*root.id());
        }

        state.remote_changes = deduped_changes;
        state.updates_as_of = updates.as_of_metadata_version;
        state.new_root = root_id;

        Ok(())
    }
229
230    async fn fetch_required_docs(&self, state: &mut SyncState) -> LbResult<()> {
231        let mut docs_to_pull = vec![];
232
233        let tx = self.ro_tx().await;
234        let db = tx.db();
235
236        let mut files_with_local_edits = vec![];
237        let local = db.base_metadata.stage(&db.local_metadata);
238        for id in local.staged.ids() {
239            if let Some(base) = local.base.maybe_find(&id) {
240                if let Some(local_hmac) = local.find(&id)?.document_hmac() {
241                    if Some(local_hmac) != base.document_hmac() {
242                        files_with_local_edits.push(id);
243                        println!("local edits found");
244                    }
245                }
246            }
247        }
248
249        let mut remote = db
250            .base_metadata
251            .stage(state.remote_changes.clone())
252            .to_lazy();
253
254        for id in remote.tree.staged.ids() {
255            if remote.calculate_deleted(&id)? {
256                continue;
257            }
258            let remote_hmac = remote.find(&id)?.document_hmac().cloned();
259            let base_hmac = remote
260                .tree
261                .base
262                .maybe_find(&id)
263                .and_then(|f| f.document_hmac())
264                .cloned();
265            if base_hmac == remote_hmac {
266                continue;
267            }
268
269            if let Some(remote_hmac) = remote_hmac {
270                // pull a file if we have a prior base, this is our heuristic -- do they have the
271                // ability to edit this file while we release the lock and are pulling all the
272                // files
273                if self.docs.exists(id, base_hmac) && !self.docs.exists(id, Some(remote_hmac)) {
274                    docs_to_pull.push((id, remote_hmac));
275                }
276
277                // this clause captures documents which went from being new -> multiple parties
278                // having updates. We'll still need the updates
279                if files_with_local_edits.contains(&id)
280                    && !docs_to_pull
281                        .iter()
282                        .any(|(already_pulling, _)| already_pulling == &id)
283                {
284                    if let Some(base_hmac) = base_hmac {
285                        if !self.docs.exists(id, Some(base_hmac)) {
286                            // this scenario basically only comes up in tests
287                            // someone modifies a file directly without reading the prior version
288                            docs_to_pull.push((id, base_hmac));
289                        }
290                    }
291                    docs_to_pull.push((id, remote_hmac));
292                }
293            }
294        }
295        drop(tx);
296
297        let futures = docs_to_pull
298            .into_iter()
299            .map(|(id, hmac)| async move { self.fetch_doc(id, hmac).await.map(|_| id) });
300
301        let mut stream = stream::iter(futures).buffer_unordered(
302            thread::available_parallelism()
303                .unwrap_or(NonZeroUsize::new(4).unwrap())
304                .into(),
305        );
306
307        while let Some(fut) = stream.next().await {
308            let id = fut?;
309            state.pulled_docs.push(id);
310        }
311
312        Ok(())
313    }
314
    /// Returns the encrypted contents of document `id` at version `hmac`,
    /// fetching from the server and caching locally when that exact version
    /// isn't already on disk.
    ///
    /// Emits paired `PullingDocument(id, true/false)` events around the
    /// network fetch; a cache hit deliberately emits nothing.
    pub(crate) async fn fetch_doc(
        &self, id: Uuid, hmac: DocumentHmac,
    ) -> LbResult<EncryptedDocument> {
        // todo: in a lot of cases there is a list of ids we're trying to get, it would be better
        // if the caller managed the event updates, the status would be more meaningful for longer

        // in this world, can we get stuck pushing a doc as well? Probably fine for now
        // note: errors from the local cache lookup are swallowed here and we
        // fall through to a network fetch
        if let Ok(Some(doc)) = self.docs.maybe_get(id, Some(hmac)).await {
            return Ok(doc);
        }

        self.events
            .sync_update(SyncIncrement::PullingDocument(id, true));
        let remote_document = self
            .client
            .request(self.get_account()?, GetDocRequest { id, hmac })
            .await?;
        // cache under the same hmac we requested so future lookups hit
        self.docs
            .insert(id, Some(hmac), &remote_document.content)
            .await?;
        self.events
            .sync_update(SyncIncrement::PullingDocument(id, false));

        Ok(remote_document.content)
    }
340
341    /// Pulls remote changes and constructs a changeset Merge such that Stage<Stage<Stage<Base, Remote>, Local>, Merge> is valid.
342    /// Promotes Base to Stage<Base, Remote> and Local to Stage<Local, Merge>
343    async fn merge(&self, state: &mut SyncState) -> LbResult<()> {
344        let mut tx = self.begin_tx().await;
345        let db = tx.db();
346        let start = Instant::now();
347
348        let remote_changes = &state.remote_changes;
349
350        // fetch document updates and local documents for merge
351        let me = Owner(self.keychain.get_pk()?);
352
353        // compute merge changes
354        let merge_changes = {
355            // assemble trees
356            let mut base = (&db.base_metadata).to_lazy();
357            let remote_unlazy = (&db.base_metadata).to_staged(remote_changes);
358            let mut remote = remote_unlazy.as_lazy();
359            let mut local = (&db.base_metadata).to_staged(&db.local_metadata).to_lazy();
360
361            // changeset constraints - these evolve as we try to assemble changes and encounter validation failures
362            let mut files_to_unmove: HashSet<Uuid> = HashSet::new();
363            let mut files_to_unshare: HashSet<Uuid> = HashSet::new();
364            let mut links_to_delete: HashSet<Uuid> = HashSet::new();
365            let mut rename_increments: HashMap<Uuid, usize> = HashMap::new();
366            let mut duplicate_file_ids: HashMap<Uuid, Uuid> = HashMap::new();
367
368            'merge_construction: loop {
369                // process just the edits which allow us to check deletions in the result
370                let mut deletions = {
371                    let mut deletions = remote_unlazy.stage(Vec::new()).to_lazy();
372
373                    // creations
374                    let mut deletion_creations = HashSet::new();
375                    for id in db.local_metadata.ids() {
376                        if remote.maybe_find(&id).is_none() && !links_to_delete.contains(&id) {
377                            deletion_creations.insert(id);
378                        }
379                    }
380                    'drain_creations: while !deletion_creations.is_empty() {
381                        'choose_a_creation: for id in &deletion_creations {
382                            // create
383                            let id = *id;
384                            let local_file = local.find(&id)?.clone();
385                            let result = deletions.create_unvalidated(
386                                id,
387                                symkey::generate_key(),
388                                local_file.parent(),
389                                &local.name(&id, &self.keychain)?,
390                                local_file.file_type(),
391                                &self.keychain,
392                            );
393                            match result {
394                                Ok(_) => {
395                                    deletion_creations.remove(&id);
396                                    continue 'drain_creations;
397                                }
398                                Err(ref err) => match err.kind {
399                                    LbErrKind::FileParentNonexistent => {
400                                        continue 'choose_a_creation;
401                                    }
402                                    _ => {
403                                        result?;
404                                    }
405                                },
406                            }
407                        }
408                        return Err(LbErrKind::Unexpected(format!(
409                            "sync failed to find a topomodelal order for file creations: {deletion_creations:?}"
410                        ))
411                        .into());
412                    }
413
414                    // moves (creations happen first in case a file is moved into a new folder)
415                    for id in db.local_metadata.ids() {
416                        let local_file = local.find(&id)?.clone();
417                        if let Some(base_file) = db.base_metadata.maybe_find(&id).cloned() {
418                            if !local_file.explicitly_deleted()
419                                && local_file.parent() != base_file.parent()
420                                && !files_to_unmove.contains(&id)
421                            {
422                                // move
423                                deletions.move_unvalidated(
424                                    &id,
425                                    local_file.parent(),
426                                    &self.keychain,
427                                )?;
428                            }
429                        }
430                    }
431
432                    // deletions (moves happen first in case a file is moved into a deleted folder)
433                    for id in db.local_metadata.ids() {
434                        let local_file = local.find(&id)?.clone();
435                        if local_file.explicitly_deleted() {
436                            // delete
437                            deletions.delete_unvalidated(&id, &self.keychain)?;
438                        }
439                    }
440                    deletions
441                };
442
443                // process all edits, dropping non-deletion edits for files that will be implicitly deleted
444                let mut merge = {
445                    let mut merge = remote_unlazy.stage(Vec::new()).to_lazy();
446
447                    // creations and edits of created documents
448                    let mut creations = HashSet::new();
449                    for id in db.local_metadata.ids() {
450                        if deletions.maybe_find(&id).is_some()
451                            && !deletions.calculate_deleted(&id)?
452                            && remote.maybe_find(&id).is_none()
453                            && !links_to_delete.contains(&id)
454                        {
455                            creations.insert(id);
456                        }
457                    }
458                    'drain_creations: while !creations.is_empty() {
459                        'choose_a_creation: for id in &creations {
460                            // create
461                            let id = *id;
462                            let local_file = local.find(&id)?.clone();
463                            let result = merge.create_unvalidated(
464                                id,
465                                local.decrypt_key(&id, &self.keychain)?,
466                                local_file.parent(),
467                                &local.name(&id, &self.keychain)?,
468                                local_file.file_type(),
469                                &self.keychain,
470                            );
471                            match result {
472                                Ok(_) => {
473                                    creations.remove(&id);
474                                    continue 'drain_creations;
475                                }
476                                Err(ref err) => match err.kind {
477                                    LbErrKind::FileParentNonexistent => {
478                                        continue 'choose_a_creation;
479                                    }
480                                    _ => {
481                                        result?;
482                                    }
483                                },
484                            }
485                        }
486                        return Err(LbErrKind::Unexpected(format!(
487                            "sync failed to find a topomodelal order for file creations: {creations:?}"
488                        ))
489                        .into());
490                    }
491
492                    // moves, renames, edits, and shares
493                    // creations happen first in case a file is moved into a new folder
494                    for id in db.local_metadata.ids() {
495                        // skip files that are already deleted or will be deleted
496                        if deletions.maybe_find(&id).is_none()
497                            || deletions.calculate_deleted(&id)?
498                            || (remote.maybe_find(&id).is_some()
499                                && remote.calculate_deleted(&id)?)
500                        {
501                            continue;
502                        }
503
504                        let local_file = local.find(&id)?.clone();
505                        let local_name = local.name(&id, &self.keychain)?;
506                        let maybe_base_file = base.maybe_find(&id).cloned();
507                        let maybe_remote_file = remote.maybe_find(&id).cloned();
508                        if let Some(ref base_file) = maybe_base_file {
509                            let base_name = base.name(&id, &self.keychain)?;
510                            let remote_file = remote.find(&id)?.clone();
511                            let remote_name = remote.name(&id, &self.keychain)?;
512
513                            // move
514                            if local_file.parent() != base_file.parent()
515                                && remote_file.parent() == base_file.parent()
516                                && !files_to_unmove.contains(&id)
517                            {
518                                merge.move_unvalidated(&id, local_file.parent(), &self.keychain)?;
519                            }
520
521                            // rename
522                            if local_name != base_name && remote_name == base_name {
523                                merge.rename_unvalidated(&id, &local_name, &self.keychain)?;
524                            }
525                        }
526
527                        // share
528                        let mut remote_keys = HashMap::new();
529                        if let Some(ref remote_file) = maybe_remote_file {
530                            for key in remote_file.user_access_keys() {
531                                remote_keys.insert(
532                                    (Owner(key.encrypted_by), Owner(key.encrypted_for)),
533                                    (key.mode, key.deleted),
534                                );
535                            }
536                        }
537                        for key in local_file.user_access_keys() {
538                            let (by, for_) = (Owner(key.encrypted_by), Owner(key.encrypted_for));
539                            if let Some(&(remote_mode, remote_deleted)) =
540                                remote_keys.get(&(by, for_))
541                            {
542                                // upgrade share
543                                if key.mode > remote_mode || !key.deleted && remote_deleted {
544                                    let mode = match key.mode {
545                                        UserAccessMode::Read => ShareMode::Read,
546                                        UserAccessMode::Write => ShareMode::Write,
547                                        UserAccessMode::Owner => continue,
548                                    };
549                                    merge.add_share_unvalidated(id, for_, mode, &self.keychain)?;
550                                }
551                                // delete share
552                                if key.deleted && !remote_deleted {
553                                    merge.delete_share_unvalidated(
554                                        &id,
555                                        Some(for_.0),
556                                        &self.keychain,
557                                    )?;
558                                }
559                            } else {
560                                // add share
561                                let mode = match key.mode {
562                                    UserAccessMode::Read => ShareMode::Read,
563                                    UserAccessMode::Write => ShareMode::Write,
564                                    UserAccessMode::Owner => continue,
565                                };
566                                merge.add_share_unvalidated(id, for_, mode, &self.keychain)?;
567                            }
568                        }
569
570                        // share deletion due to conflicts
571                        if files_to_unshare.contains(&id) {
572                            merge.delete_share_unvalidated(&id, None, &self.keychain)?;
573                        }
574
575                        // rename due to path conflict
576                        if let Some(&rename_increment) = rename_increments.get(&id) {
577                            let name = NameComponents::from(&local_name)
578                                .generate_incremented(rename_increment)
579                                .to_name();
580                            merge.rename_unvalidated(&id, &name, &self.keychain)?;
581                        }
582
583                        // edit
584                        let base_hmac = maybe_base_file.and_then(|f| f.document_hmac().cloned());
585                        let remote_hmac =
586                            maybe_remote_file.and_then(|f| f.document_hmac().cloned());
587                        let local_hmac = local_file.document_hmac().cloned();
588                        if merge.access_mode(me, &id)? >= Some(UserAccessMode::Write)
589                            && local_hmac != base_hmac
590                        {
591                            if remote_hmac != base_hmac && remote_hmac != local_hmac {
592                                // merge
593                                let merge_name = merge.name(&id, &self.keychain)?;
594                                let document_type =
595                                    DocumentType::from_file_name_using_extension(&merge_name);
596
597                                // todo these accesses are potentially problematic
598                                // maybe not if service/docs is the persion doing network io
599                                let base_document =
600                                    self.read_document_helper(id, &mut base).await?;
601                                let remote_document =
602                                    self.read_document_helper(id, &mut remote).await?;
603                                let local_document =
604                                    self.read_document_helper(id, &mut local).await?;
605
606                                match document_type {
607                                    DocumentType::Text => {
608                                        // 3-way merge
609                                        // todo: a couple more clones than necessary
610                                        let base_document =
611                                            String::from_utf8_lossy(&base_document).to_string();
612                                        let remote_document =
613                                            String::from_utf8_lossy(&remote_document).to_string();
614                                        let local_document =
615                                            String::from_utf8_lossy(&local_document).to_string();
616                                        let merged_document =
617                                            text::buffer::Buffer::from(base_document.as_str())
618                                                .merge(local_document, remote_document);
619                                        let encrypted_document = merge
620                                            .update_document_unvalidated(
621                                                &id,
622                                                &merged_document.into_bytes(),
623                                                &self.keychain,
624                                            )?;
625                                        let hmac = merge.find(&id)?.document_hmac().copied();
626                                        self.docs.insert(id, hmac, &encrypted_document).await?;
627                                    }
628                                    DocumentType::Drawing => {
629                                        let base_document =
630                                            String::from_utf8_lossy(&base_document).to_string();
631                                        let remote_document =
632                                            String::from_utf8_lossy(&remote_document).to_string();
633                                        let local_document =
634                                            String::from_utf8_lossy(&local_document).to_string();
635
636                                        let base_buffer = svg::buffer::Buffer::new(&base_document);
637                                        let remote_buffer =
638                                            svg::buffer::Buffer::new(&remote_document);
639                                        let mut local_buffer =
640                                            svg::buffer::Buffer::new(&local_document);
641
642                                        for (_, el) in local_buffer.elements.iter_mut() {
643                                            if let Element::Path(path) = el {
644                                                path.data.apply_transform(u_transform_to_bezier(
645                                                    &Transform::from(
646                                                        local_buffer
647                                                            .weak_viewport_settings
648                                                            .master_transform,
649                                                    ),
650                                                ));
651                                            }
652                                        }
653                                        svg::buffer::Buffer::reload(
654                                            &mut local_buffer.elements,
655                                            &mut local_buffer.weak_images,
656                                            &mut local_buffer.weak_path_pressures,
657                                            &mut local_buffer.weak_viewport_settings,
658                                            &base_buffer,
659                                            &remote_buffer,
660                                        );
661
662                                        let merged_document = local_buffer.serialize();
663                                        let encrypted_document = merge
664                                            .update_document_unvalidated(
665                                                &id,
666                                                &merged_document.into_bytes(),
667                                                &self.keychain,
668                                            )?;
669                                        let hmac = merge.find(&id)?.document_hmac().copied();
670                                        self.docs.insert(id, hmac, &encrypted_document).await?;
671                                    }
672                                    DocumentType::Other => {
673                                        // duplicate file
674                                        let merge_parent = *merge.find(&id)?.parent();
675                                        let duplicate_id = if let Some(&duplicate_id) =
676                                            duplicate_file_ids.get(&id)
677                                        {
678                                            duplicate_id
679                                        } else {
680                                            let duplicate_id = Uuid::new_v4();
681                                            duplicate_file_ids.insert(id, duplicate_id);
682                                            rename_increments.insert(duplicate_id, 1);
683                                            duplicate_id
684                                        };
685
686                                        let mut merge_name = merge_name;
687                                        merge_name = NameComponents::from(&merge_name)
688                                            .generate_incremented(
689                                                rename_increments
690                                                    .get(&duplicate_id)
691                                                    .copied()
692                                                    .unwrap_or_default(),
693                                            )
694                                            .to_name();
695
696                                        merge.create_unvalidated(
697                                            duplicate_id,
698                                            symkey::generate_key(),
699                                            &merge_parent,
700                                            &merge_name,
701                                            FileType::Document,
702                                            &self.keychain,
703                                        )?;
704                                        let encrypted_document = merge
705                                            .update_document_unvalidated(
706                                                &duplicate_id,
707                                                &local_document,
708                                                &self.keychain,
709                                            )?;
710                                        let duplicate_hmac =
711                                            merge.find(&duplicate_id)?.document_hmac().copied();
712                                        self.docs
713                                            .insert(
714                                                duplicate_id,
715                                                duplicate_hmac,
716                                                &encrypted_document,
717                                            )
718                                            .await?;
719                                    }
720                                }
721                            } else {
722                                // overwrite (todo: avoid reading/decrypting/encrypting document)
723                                let document = self.read_document_helper(id, &mut local).await?;
724                                merge.update_document_unvalidated(
725                                    &id,
726                                    &document,
727                                    &self.keychain,
728                                )?;
729                            }
730                        }
731                    }
732
733                    // deletes
734                    // moves happen first in case a file is moved into a deleted folder
735                    for id in db.local_metadata.ids() {
736                        if db.base_metadata.maybe_find(&id).is_some()
737                            && deletions.calculate_deleted(&id)?
738                            && !merge.calculate_deleted(&id)?
739                        {
740                            // delete
741                            merge.delete_unvalidated(&id, &self.keychain)?;
742                        }
743                    }
744                    for &id in &links_to_delete {
745                        // delete
746                        if merge.maybe_find(&id).is_some() && !merge.calculate_deleted(&id)? {
747                            merge.delete_unvalidated(&id, &self.keychain)?;
748                        }
749                    }
750
751                    merge
752                };
753
754                // validate; handle failures by introducing changeset constraints
755                for link in merge.ids() {
756                    if !merge.calculate_deleted(&link)? {
757                        if let FileType::Link { target } = merge.find(&link)?.file_type() {
758                            if merge.maybe_find(&target).is_some()
759                                && merge.calculate_deleted(&target)?
760                            {
761                                // delete links to deleted files
762                                if links_to_delete.insert(link) {
763                                    continue 'merge_construction;
764                                } else {
765                                    return Err(LbErrKind::Unexpected(format!(
766                                        "sync failed to resolve broken link (deletion): {link:?}"
767                                    ))
768                                    .into());
769                                }
770                            }
771                        }
772                    }
773                }
774
775                let validate_result = merge.validate(me);
776                match validate_result {
777                    // merge changeset is valid
778                    Ok(_) => {
779                        let (_, merge_changes) = merge.unstage();
780                        break merge_changes;
781                    }
782                    Err(ref err) => match err.kind {
783                        LbErrKind::Validation(ref vf) => match vf {
784                            // merge changeset has resolvable validation errors and needs modification
785                            ValidationFailure::Cycle(ids) => {
786                                // revert all local moves in the cycle
787                                let mut progress = false;
788                                for &id in ids {
789                                    if db.local_metadata.maybe_find(&id).is_some()
790                                        && files_to_unmove.insert(id)
791                                    {
792                                        progress = true;
793                                    }
794                                }
795                                if !progress {
796                                    return Err(LbErrKind::Unexpected(format!(
797                                        "sync failed to resolve cycle: {ids:?}"
798                                    ))
799                                    .into());
800                                }
801                            }
802                            ValidationFailure::PathConflict(ids) => {
803                                // pick one local id and generate a non-conflicting filename
804                                let mut progress = false;
805                                for &id in ids {
806                                    if duplicate_file_ids.values().any(|&dup| dup == id) {
807                                        *rename_increments.entry(id).or_insert(0) += 1;
808                                        progress = true;
809                                        break;
810                                    }
811                                }
812                                if !progress {
813                                    for &id in ids {
814                                        if db.local_metadata.maybe_find(&id).is_some() {
815                                            *rename_increments.entry(id).or_insert(0) += 1;
816                                            progress = true;
817                                            break;
818                                        }
819                                    }
820                                }
821                                if !progress {
822                                    return Err(LbErrKind::Unexpected(format!(
823                                        "sync failed to resolve path conflict: {ids:?}"
824                                    ))
825                                    .into());
826                                }
827                            }
828                            ValidationFailure::SharedLink { link, shared_ancestor } => {
829                                // if ancestor is newly shared, delete share, otherwise delete link
830                                let mut progress = false;
831                                if let Some(base_shared_ancestor) = base.maybe_find(shared_ancestor)
832                                {
833                                    if !base_shared_ancestor.is_shared()
834                                        && files_to_unshare.insert(*shared_ancestor)
835                                    {
836                                        progress = true;
837                                    }
838                                }
839                                if !progress && links_to_delete.insert(*link) {
840                                    progress = true;
841                                }
842                                if !progress {
843                                    return Err(LbErrKind::Unexpected(format!(
844                                    "sync failed to resolve shared link: link: {link:?}, shared_ancestor: {shared_ancestor:?}"
845                                )).into());
846                                }
847                            }
848                            ValidationFailure::DuplicateLink { target } => {
849                                // delete local link with this target
850                                let mut progress = false;
851                                if let Some(link) = local.linked_by(target)? {
852                                    if links_to_delete.insert(link) {
853                                        progress = true;
854                                    }
855                                }
856                                if !progress {
857                                    return Err(LbErrKind::Unexpected(format!(
858                                        "sync failed to resolve duplicate link: target: {target:?}"
859                                    ))
860                                    .into());
861                                }
862                            }
863                            ValidationFailure::BrokenLink(link) => {
864                                // delete local link with this target
865                                if !links_to_delete.insert(*link) {
866                                    return Err(LbErrKind::Unexpected(format!(
867                                        "sync failed to resolve broken link: {link:?}"
868                                    ))
869                                    .into());
870                                }
871                            }
872                            ValidationFailure::OwnedLink(link) => {
873                                // if target is newly owned, unmove target, otherwise delete link
874                                let mut progress = false;
875                                if let Some(remote_link) = remote.maybe_find(link) {
876                                    if let FileType::Link { target } = remote_link.file_type() {
877                                        let remote_target = remote.find(&target)?;
878                                        if remote_target.owner() != me
879                                            && files_to_unmove.insert(target)
880                                        {
881                                            progress = true;
882                                        }
883                                    }
884                                }
885                                if !progress && links_to_delete.insert(*link) {
886                                    progress = true;
887                                }
888                                if !progress {
889                                    return Err(LbErrKind::Unexpected(format!(
890                                        "sync failed to resolve owned link: {link:?}"
891                                    ))
892                                    .into());
893                                }
894                            }
895                            // merge changeset has unexpected validation errors
896                            ValidationFailure::Orphan(_)
897                            | ValidationFailure::NonFolderWithChildren(_)
898                            | ValidationFailure::FileWithDifferentOwnerParent(_)
899                            | ValidationFailure::FileNameTooLong(_)
900                            | ValidationFailure::DeletedFileUpdated(_)
901                            | ValidationFailure::NonDecryptableFileName(_) => {
902                                validate_result?;
903                            }
904                        },
905                        // merge changeset has unexpected errors
906                        _ => {
907                            validate_result?;
908                        }
909                    },
910                }
911            }
912        };
913
914        // base = remote; local = merge
915        (&mut db.base_metadata)
916            .to_staged(remote_changes.clone())
917            .to_lazy()
918            .promote()?;
919        db.local_metadata.clear()?;
920        (&mut db.local_metadata)
921            .to_staged(merge_changes)
922            .to_lazy()
923            .promote()?;
924
925        // todo who else calls this did they manage locks right?
926        // self.cleanup_local_metadata()?;
927        db.base_metadata.stage(&mut db.local_metadata).prune()?;
928
929        if start.elapsed() > web_time::Duration::from_millis(100) {
930            warn!("sync merge held lock for {:?}", start.elapsed());
931        }
932
933        Ok(())
934    }
935
936    async fn send_pull_events(&self, state: &mut SyncState) -> LbResult<()> {
937        if state.new_root.is_some() {
938            self.events.signed_in();
939        }
940
941        if !state.remote_changes.is_empty() {
942            self.events.meta_changed(Actor::Sync);
943
944            let owner = Owner(self.keychain.get_pk()?);
945            if state.remote_changes.iter().any(|f| f.owner() != owner) {
946                self.events.pending_shares_changed();
947            }
948        }
949
950        for &doc in &state.pulled_docs {
951            self.events.doc_written(doc, Actor::Sync);
952        }
953
954        Ok(())
955    }
956
    /// Persists the post-sync watermark: the `updates_as_of` timestamp captured
    /// during the pull becomes the new `last_synced` value, and a newly pulled
    /// root (first sync of an account) is recorded.
    async fn commit_last_synced(&self, state: &mut SyncState) -> LbResult<()> {
        let mut tx = self.begin_tx().await;
        let db = tx.db();
        // timestamp is stored signed; the server supplies it as a u64
        db.last_synced.insert(state.updates_as_of as i64)?;

        if let Some(root) = state.new_root {
            db.root.insert(root)?;
        }

        // NOTE(review): unlike push_meta/push_docs this never calls tx.end()
        // explicitly — presumably the transaction commits on drop; confirm
        Ok(())
    }
968
969    async fn populate_pk_cache(&self) -> LbResult<()> {
970        // todo: is this the move?
971        let mut missing_owners = HashSet::new();
972        {
973            let tx = self.ro_tx().await;
974            let db = tx.db();
975            for file in db.base_metadata.get().values() {
976                for user_access_key in file.user_access_keys() {
977                    let enc_by = Owner(user_access_key.encrypted_by);
978                    let enc_for = Owner(user_access_key.encrypted_for);
979
980                    if !db.pub_key_lookup.get().contains_key(&enc_by) {
981                        missing_owners.insert(enc_by);
982                    }
983
984                    if !db.pub_key_lookup.get().contains_key(&enc_for) {
985                        missing_owners.insert(enc_for);
986                    }
987                }
988            }
989        }
990
991        let mut new_owners = HashMap::new();
992        {
993            for owner in missing_owners {
994                let username_result = self
995                    .client
996                    .request(self.get_account().unwrap(), GetUsernameRequest { key: owner.0 })
997                    .await;
998                new_owners.insert(owner, username_result);
999            }
1000        }
1001
1002        let mut tx = self.begin_tx().await;
1003        let db = tx.db();
1004
1005        let have_updates = !new_owners.is_empty();
1006        for (owner, username) in new_owners {
1007            let username = match username {
1008                Err(ApiError::Endpoint(GetUsernameError::UserNotFound)) => "<unknown>".to_string(),
1009                Ok(username) => username.username,
1010                _ => continue, // todo: possibly add some logging here
1011            };
1012
1013            db.pub_key_lookup.insert(owner, username).unwrap();
1014        }
1015
1016        if have_updates {
1017            self.events.meta_changed(Actor::Sync);
1018        }
1019
1020        Ok(())
1021    }
1022
    /// Updates remote and base metadata to local.
    ///
    /// For every locally staged metadata change: re-attach the *base* file's
    /// document hmac/size (content changes are pushed separately by
    /// `push_docs`), re-sign, upsert the diffs to the server, then promote the
    /// same signed changes into base and prune redundant local entries.
    async fn push_meta(&self) -> LbResult<()> {
        let mut updates = vec![];
        let mut local_changes_no_digests = Vec::new();

        let tx = self.ro_tx().await;
        let db = tx.db();

        // remote = local
        let local = db.base_metadata.stage(&db.local_metadata).to_lazy();

        for id in local.tree.staged.ids() {
            let mut local_change = local.tree.staged.find(&id)?.timestamped_value.value.clone();
            // None for files created locally and never pushed before
            let maybe_base_file = local.tree.base.maybe_find(&id);

            // change everything but document hmac and re-sign
            local_change.set_hmac_and_size(
                maybe_base_file.and_then(|f| f.document_hmac().copied()),
                maybe_base_file.and_then(|f| *f.timestamped_value.value.doc_size()),
            );
            let local_change = local_change.sign(&self.keychain)?;

            local_changes_no_digests.push(local_change.clone());
            let file_diff = FileDiff { old: maybe_base_file.cloned(), new: local_change };
            updates.push(file_diff);
        }

        // release the read lock before hitting the network
        drop(tx);

        if !updates.is_empty() {
            self.client
                .request(self.get_account()?, UpsertRequestV2 { updates: updates.clone() })
                .await?;
        }

        let mut tx = self.begin_tx().await;
        let db = tx.db();

        // base = local
        (&mut db.base_metadata)
            .to_lazy()
            .stage(local_changes_no_digests)
            .promote()?;
        db.base_metadata.stage(&mut db.local_metadata).prune()?;

        tx.end();

        Ok(())
    }
1072
1073    /// Updates remote and base files to local. Assumes metadata is already pushed for all new files.
1074    // todo: make this so that all document updates are attempted and we don't just return the
1075    // first error. Once an attempt is made we can return any or all errors, either would be an
1076    // improvement
1077    async fn push_docs(&self) -> LbResult<()> {
1078        let mut updates = vec![];
1079        let mut local_changes_digests_only = vec![];
1080
1081        let tx = self.ro_tx().await;
1082        let db = tx.db();
1083        let start = Instant::now();
1084
1085        let local = db.base_metadata.stage(&db.local_metadata).to_lazy();
1086
1087        for id in local.tree.staged.ids() {
1088            let base_file = local.tree.base.find(&id)?.clone();
1089
1090            // change only document hmac and re-sign
1091            let mut local_change = base_file.timestamped_value.value.clone();
1092            local_change.set_hmac_and_size(
1093                local.find(&id)?.document_hmac().copied(),
1094                *local.find(&id)?.timestamped_value.value.doc_size(),
1095            );
1096
1097            if base_file.document_hmac() == local_change.document_hmac()
1098                || local_change.document_hmac().is_none()
1099            {
1100                continue;
1101            }
1102
1103            let local_change = local_change.sign(&self.keychain)?;
1104
1105            updates.push(FileDiff { old: Some(base_file), new: local_change.clone() });
1106            local_changes_digests_only.push(local_change);
1107            self.events
1108                .sync_update(SyncIncrement::PushingDocument(id, true));
1109        }
1110
1111        drop(tx);
1112        if start.elapsed() > web_time::Duration::from_millis(100) {
1113            warn!("sync push_docs held lock for {:?}", start.elapsed());
1114        }
1115
1116        let futures = updates.clone().into_iter().map(|diff| self.push_doc(diff));
1117
1118        let mut stream = stream::iter(futures).buffer_unordered(
1119            thread::available_parallelism()
1120                .unwrap_or(NonZeroUsize::new(4).unwrap())
1121                .into(),
1122        );
1123
1124        let mut docs_without_errors = vec![];
1125        let mut last_error: Option<LbErr> = None;
1126
1127        while let Some(fut) = stream.next().await {
1128            match fut {
1129                Ok(id) => {
1130                    docs_without_errors.push(id);
1131                    self.events
1132                        .sync_update(SyncIncrement::PushingDocument(id, false));
1133                }
1134                Err(err) => {
1135                    last_error = Some(err);
1136                }
1137            }
1138        }
1139
1140        local_changes_digests_only.retain(|f| docs_without_errors.contains(f.id()));
1141
1142        let mut tx = self.begin_tx().await;
1143        let db = tx.db();
1144        // base = local (metadata)
1145        (&mut db.base_metadata)
1146            .to_lazy()
1147            .stage(local_changes_digests_only)
1148            .promote()?;
1149
1150        db.base_metadata.stage(&mut db.local_metadata).prune()?;
1151
1152        tx.end();
1153
1154        if let Some(err) = last_error { Err(err) } else { Ok(()) }
1155    }
1156
1157    async fn push_doc(&self, diff: FileDiff<SignedMeta>) -> LbResult<Uuid> {
1158        let id = *diff.new.id();
1159        let hmac = diff.new.document_hmac();
1160        let local_document_change = self.docs.get(id, hmac.copied()).await?;
1161        self.client
1162            .request(
1163                self.get_account()?,
1164                ChangeDocRequestV2 { diff, new_content: local_document_change },
1165            )
1166            .await?;
1167
1168        Ok(id)
1169    }
1170
1171    #[cfg(not(target_family = "wasm"))]
1172    async fn send_debug_info(&self, account: Account) {
1173        let debug_info = self
1174            .debug_info("none provided - sync".to_string(), false)
1175            .await
1176            .unwrap();
1177
1178        if self.config.background_work {
1179            let bg_self = self.clone();
1180            tokio::spawn(async move {
1181                bg_self
1182                    .client
1183                    .request(&account, UpsertDebugInfoRequest { debug_info })
1184                    .await
1185                    .log_and_ignore();
1186            });
1187        } else {
1188            self.client
1189                .request(&account, UpsertDebugInfoRequest { debug_info })
1190                .await
1191                .log_and_ignore();
1192        }
1193    }
1194
1195    async fn read_document_helper<T>(
1196        &self, id: Uuid, tree: &mut LazyTree<T>,
1197    ) -> LbResult<DecryptedDocument>
1198    where
1199        T: TreeLike<F = SignedMeta>,
1200    {
1201        let file = tree.find(&id)?;
1202        validate::is_document(file)?;
1203        let hmac = file.document_hmac().copied();
1204
1205        if tree.calculate_deleted(&id)? {
1206            return Err(LbErrKind::FileNonexistent.into());
1207        }
1208
1209        let doc = match hmac {
1210            Some(hmac) => {
1211                let doc = self.docs.get(id, Some(hmac)).await?;
1212                tree.decrypt_document(&id, &doc, &self.keychain)?
1213            }
1214            None => vec![],
1215        };
1216
1217        Ok(doc)
1218    }
1219
1220    /// for tests only
1221    #[doc(hidden)]
1222    pub async fn server_dirty_ids(&self) -> LbResult<Vec<Uuid>> {
1223        let mut state = self.syncer.lock().await;
1224        self.inital_sync_state(&mut state).await?;
1225        self.process_deletions().await?;
1226        self.fetch_meta(&mut state).await?;
1227
1228        let server_ids = state.remote_changes.iter().map(|f| *f.id()).collect();
1229
1230        Ok(server_ids)
1231    }
1232
1233    pub(crate) fn setup_syncer(&self) {
1234        if self.config.background_work {
1235            self.clone().local_change_worker();
1236            self.clone().periodic_sync_worker();
1237            self.clone().post_sync_worker();
1238        }
1239    }
1240
1241    fn local_change_worker(self) {
1242        #[cfg(not(target_family = "wasm"))]
1243        tokio::spawn(async move {
1244            let mut events = self.subscribe();
1245
1246            let sync_criteria = |e: Event| {
1247                matches!(
1248                    e,
1249                    Event::MetadataChanged(Actor::User) | Event::DocumentWritten(_, Actor::User)
1250                )
1251            };
1252
1253            loop {
1254                time::sleep(Duration::from_millis(500)).await;
1255                let mut should_sync = false;
1256
1257                // drain the current channel, so we don't sync for each keystroke if they pile up
1258                loop {
1259                    let event = events.try_recv();
1260                    match event {
1261                        Ok(event) => {
1262                            if sync_criteria(event) {
1263                                should_sync = true;
1264                            }
1265                        }
1266                        Err(TryRecvError::Empty) => break,
1267                        _ => {
1268                            panic!(
1269                                "unexpected broadcast receive error, returning local_change_worker"
1270                            );
1271                        }
1272                    }
1273                }
1274
1275                // empty channel + nothing interesting has happened, sit and wait for something
1276                // interesting
1277                if !should_sync {
1278                    let event = events.recv().await.unwrap();
1279                    if sync_criteria(event) {
1280                        self.sync().await.map_unexpected().log_and_ignore();
1281                    } else {
1282                        continue;
1283                    }
1284                }
1285            }
1286        });
1287    }
1288
1289    fn periodic_sync_worker(self) {
1290        #[cfg(not(target_family = "wasm"))]
1291        tokio::spawn(async move {
1292            loop {
1293                self.sync().await.map_unexpected().log_and_ignore();
1294                if self.user_active().await {
1295                    tokio::time::sleep(Duration::from_secs(3)).await;
1296                } else {
1297                    tokio::time::sleep(Duration::from_secs(5 * 60)).await;
1298                }
1299            }
1300        });
1301    }
1302
1303    async fn user_active(&self) -> bool {
1304        let last_seen = self.user_last_seen.read().await;
1305        last_seen.elapsed() < Duration::from_secs(3 * 60)
1306    }
1307
1308    fn post_sync_worker(self) {
1309        #[cfg(not(target_family = "wasm"))]
1310        tokio::spawn(async move {
1311            let mut events = self.subscribe();
1312
1313            loop {
1314                let event = events.recv().await.unwrap();
1315                if let Event::Sync(SyncIncrement::SyncFinished(_)) = event {
1316                    self.fetcher().await.map_unexpected().log_and_ignore();
1317                    self.populate_pk_cache()
1318                        .await
1319                        .map_unexpected()
1320                        .log_and_ignore();
1321                };
1322            }
1323        });
1324    }
1325
1326    async fn fetcher(&self) -> LbResult<()> {
1327        let mut files_to_pull = vec![];
1328
1329        let tx = self.ro_tx().await;
1330        let db = tx.db();
1331
1332        let Some(root) = db.root.get() else {
1333            return Ok(());
1334        };
1335
1336        // we can only fetch things we know the server knows about
1337        let mut tree = db.base_metadata.stage(None).to_lazy();
1338
1339        for id in tree.descendants_using_links(root)? {
1340            let file = tree.find(&id)?;
1341            let hmac = file.document_hmac().copied();
1342
1343            // skip non-documents
1344            if !file.is_document() {
1345                continue;
1346            }
1347
1348            // skip deleted files
1349            if tree.calculate_deleted(&id)? {
1350                continue;
1351            }
1352
1353            // skip non-first-party files
1354            let name = tree.name(&id, &self.keychain)?;
1355            if !name.ends_with(".md") && !name.ends_with(".svg") {
1356                continue;
1357            }
1358
1359            files_to_pull.push((id, hmac));
1360        }
1361
1362        drop(tx);
1363
1364        // this could all be done in parallel, but for now going to not do it that way
1365        // benefits: less work, but also ensures that a file that needs to be fetched immediately
1366        // can be
1367        for (id, hmac) in files_to_pull {
1368            if let Some(hmac) = hmac {
1369                self.fetch_doc(id, hmac).await?;
1370            }
1371        }
1372
1373        Ok(())
1374    }
1375}