Skip to main content

lb_rs/subscribers/
syncer.rs

1use std::{
2    collections::{HashMap, HashSet},
3    num::NonZeroUsize,
4    sync::Arc,
5    thread,
6    time::{Duration, Instant},
7};
8
9use futures::{StreamExt, stream};
10use tokio::sync::{Mutex, broadcast::error::TryRecvError};
11use usvg::Transform;
12use uuid::Uuid;
13
14use crate::{
15    Lb, LbErrKind, LbResult,
16    io::network::ApiError,
17    model::{
18        ValidationFailure,
19        access_info::UserAccessMode,
20        account::Account,
21        api::{
22            ChangeDocRequestV2, GetDocRequest, GetFileIdsRequest, GetUpdatesRequestV2,
23            GetUsernameError, GetUsernameRequest, UpsertDebugInfoRequest, UpsertRequestV2,
24        },
25        crypto::{DecryptedDocument, EncryptedDocument},
26        errors::{LbErr, Unexpected},
27        file::ShareMode,
28        file_like::FileLike,
29        file_metadata::{DocumentHmac, FileDiff, FileType, Owner},
30        filename::{DocumentType, NameComponents},
31        lazy::LazyTree,
32        signed_meta::SignedMeta,
33        staged::StagedTreeLikeMut,
34        svg::{self, buffer::u_transform_to_bezier, element::Element},
35        symkey, text,
36        tree_like::TreeLike,
37        validate,
38    },
39    service::events::{Actor, Event, SyncIncrement},
40};
41
/// Shared, async-lockable handle to the [`SyncState`] of the current sync pass.
pub type Syncer = Arc<Mutex<SyncState>>;
43
/// Scratch state for a single pull; it is reset to its default at the start of every pull and
/// then filled in step by step.
#[derive(Default)]
pub struct SyncState {
    /// the starting point for updates for this sync pass; read from the db's `last_synced` and
    /// sent to the server as `since_metadata_version`
    last_synced: u64,

    /// if our pull is successful, this is the timestamp we will commit; taken from the server's
    /// `as_of_metadata_version`
    updates_as_of: u64,

    /// changes we pulled from the server, post orphan-pruning and deduplication against base
    remote_changes: Vec<SignedMeta>,

    /// did we pull a root on this pass? only populated when the db had no root before the pull
    new_root: Option<Uuid>,

    /// what docs did we pull as a result of this sync (ids, in completion order)
    pulled_docs: Vec<Uuid>,
}
61
// Planned: a fetch-metadata fn that fetches the docs it needs to get, i.e. the ones that match
// should_fetch.
//
// should_fetch is going to be a tree fn that will return true if the file:
//     is md or svg that descends from ... (TODO: this note was left unfinished)
67
68impl Lb {
69    #[instrument(level = "debug", skip(self), err(Debug))]
70    pub async fn sync(&self) -> LbResult<()> {
71        let mut sync_state = self.syncer.lock().await;
72        self.events.sync_update(SyncIncrement::SyncStarted);
73
74        let pipeline: LbResult<()> = async {
75            self.pull_updates(&mut sync_state).await?;
76            self.push_local_changes().await?;
77            Ok(())
78        }
79        .await;
80
81        self.events.sync_update(SyncIncrement::SyncFinished(
82            pipeline.as_ref().err().map(|err| err.kind.clone()),
83        ));
84
85        self.cleanup().await?;
86
87        pipeline?;
88
89        let account = self.get_account()?.clone();
90
91        #[cfg(not(target_family = "wasm"))]
92        if account.is_beta() {
93            self.send_debug_info(account).await;
94        }
95
96        Ok(())
97    }
98
99    pub(crate) async fn pull_updates(&self, sync_state: &mut SyncState) -> LbResult<()> {
100        self.inital_sync_state(sync_state).await?;
101        self.process_deletions().await?;
102        self.fetch_meta(sync_state).await?;
103        self.fetch_required_docs(sync_state).await?;
104        // todo: should this inform a re-pull?
105        self.merge(sync_state).await?;
106        self.commit_last_synced(sync_state).await?;
107        self.send_pull_events(sync_state).await?;
108
109        if !self.config.background_work {
110            self.populate_pk_cache().await?;
111        }
112
113        Ok(())
114    }
115
    /// The push half of a sync pass: metadata is pushed first, then documents. A failure while
    /// pushing metadata means documents are not attempted.
    pub(crate) async fn push_local_changes(&self) -> LbResult<()> {
        self.push_meta().await?;
        self.push_docs().await?;

        Ok(())
    }
122
123    async fn inital_sync_state(&self, state: &mut SyncState) -> LbResult<()> {
124        let tx = self.ro_tx().await;
125        let db = tx.db();
126
127        *state = Default::default();
128        state.last_synced = db.last_synced.get().copied().unwrap_or_default() as u64;
129
130        Ok(())
131    }
132
133    pub(crate) async fn process_deletions(&self) -> LbResult<()> {
134        let server_ids = self
135            .client
136            .request(self.get_account()?, GetFileIdsRequest {})
137            .await?
138            .ids;
139
140        let mut tx = self.begin_tx().await;
141        let db = tx.db();
142
143        let mut local = db.base_metadata.stage(&db.local_metadata).to_lazy();
144        let base_ids = local.tree.base.ids();
145
146        let mut prunable_ids = base_ids;
147        prunable_ids.retain(|id| !server_ids.contains(id));
148        for id in prunable_ids.clone() {
149            prunable_ids.extend(local.descendants(&id)?.into_iter());
150        }
151        for id in &prunable_ids {
152            if let Some(base_file) = local.tree.base.maybe_find(id) {
153                self.docs
154                    .delete(*id, base_file.document_hmac().copied())
155                    .await?;
156            }
157            if let Some(local_file) = local.maybe_find(id) {
158                self.docs
159                    .delete(*id, local_file.document_hmac().copied())
160                    .await?;
161            }
162        }
163
164        let mut base_staged = (&mut db.base_metadata).to_lazy().stage(None);
165        base_staged.tree.removed = prunable_ids.iter().copied().collect();
166        base_staged.promote()?;
167
168        let mut local_staged = (&mut db.local_metadata).to_lazy().stage(None);
169        local_staged.tree.removed = prunable_ids.iter().copied().collect();
170        local_staged.promote()?;
171
172        if !prunable_ids.is_empty() {
173            self.events.meta_changed(Actor::Sync);
174        }
175
176        Ok(())
177    }
178
179    async fn fetch_meta(&self, state: &mut SyncState) -> LbResult<()> {
180        let updates = self
181            .client
182            .request(
183                self.get_account()?,
184                GetUpdatesRequestV2 { since_metadata_version: state.last_synced },
185            )
186            .await?;
187
188        let tx = self.ro_tx().await;
189        let db = tx.db();
190
191        // this loop implicitly prunes remote orphans
192        let mut without_orphans = Vec::new();
193        let me = Owner(self.keychain.get_pk()?);
194        let remote = db.base_metadata.stage(updates.file_metadata).to_lazy();
195        for id in remote.tree.staged.ids() {
196            let meta = remote.find(&id)?;
197            if remote.maybe_find_parent(meta).is_some()
198                || meta
199                    .user_access_keys()
200                    .iter()
201                    .any(|k| k.encrypted_for == me.0)
202            {
203                without_orphans.push(remote.find(&id)?.clone());
204            }
205        }
206
207        // this is what actually performs the deduplication
208        let pruned_tree = db.base_metadata.stage(without_orphans).pruned()?.to_lazy();
209        let (_, deduped_changes) = pruned_tree.unstage();
210
211        // initialize root if this is the first pull on this device
212        let mut root_id = None;
213        if db.root.get().is_none() {
214            let root = deduped_changes
215                .all_files()?
216                .into_iter()
217                .find(|f| f.is_root())
218                .ok_or(LbErrKind::RootNonexistent)?;
219            root_id = Some(*root.id());
220        }
221
222        state.remote_changes = deduped_changes;
223        state.updates_as_of = updates.as_of_metadata_version;
224        state.new_root = root_id;
225
226        Ok(())
227    }
228
229    async fn fetch_required_docs(&self, state: &mut SyncState) -> LbResult<()> {
230        let mut docs_to_pull = vec![];
231
232        let tx = self.ro_tx().await;
233        let db = tx.db();
234
235        let mut files_with_local_edits = vec![];
236        let local = db.base_metadata.stage(&db.local_metadata);
237        for id in local.staged.ids() {
238            if let Some(base) = local.base.maybe_find(&id) {
239                if let Some(local_hmac) = local.find(&id)?.document_hmac() {
240                    if Some(local_hmac) != base.document_hmac() {
241                        files_with_local_edits.push(id);
242                        println!("local edits found");
243                    }
244                }
245            }
246        }
247
248        let mut remote = db
249            .base_metadata
250            .stage(state.remote_changes.clone())
251            .to_lazy();
252
253        for id in remote.tree.staged.ids() {
254            if remote.calculate_deleted(&id)? {
255                continue;
256            }
257            let remote_hmac = remote.find(&id)?.document_hmac().cloned();
258            let base_hmac = remote
259                .tree
260                .base
261                .maybe_find(&id)
262                .and_then(|f| f.document_hmac())
263                .cloned();
264            if base_hmac == remote_hmac {
265                continue;
266            }
267
268            if let Some(remote_hmac) = remote_hmac {
269                // pull a file if we have a prior base, this is our heuristic -- do they have the
270                // ability to edit this file while we release the lock and are pulling all the
271                // files
272                if self.docs.exists(id, base_hmac) && !self.docs.exists(id, Some(remote_hmac)) {
273                    docs_to_pull.push((id, remote_hmac));
274                }
275
276                // this clause captures documents which went from being new -> multiple parties
277                // having updates. We'll still need the updates
278                if files_with_local_edits.contains(&id)
279                    && !docs_to_pull
280                        .iter()
281                        .any(|(already_pulling, _)| already_pulling == &id)
282                {
283                    if let Some(base_hmac) = base_hmac {
284                        if !self.docs.exists(id, Some(base_hmac)) {
285                            // this scenario basically only comes up in tests
286                            // someone modifies a file directly without reading the prior version
287                            docs_to_pull.push((id, base_hmac));
288                        }
289                    }
290                    docs_to_pull.push((id, remote_hmac));
291                }
292            }
293        }
294        drop(tx);
295
296        let futures = docs_to_pull
297            .into_iter()
298            .map(|(id, hmac)| async move { self.fetch_doc(id, hmac).await.map(|_| id) });
299
300        let mut stream = stream::iter(futures).buffer_unordered(
301            thread::available_parallelism()
302                .unwrap_or(NonZeroUsize::new(4).unwrap())
303                .into(),
304        );
305
306        while let Some(fut) = stream.next().await {
307            let id = fut?;
308            state.pulled_docs.push(id);
309        }
310
311        Ok(())
312    }
313
314    pub(crate) async fn fetch_doc(
315        &self, id: Uuid, hmac: DocumentHmac,
316    ) -> LbResult<EncryptedDocument> {
317        // todo: in a lot of cases there is a list of ids we're trying to get, it would be better
318        // if the caller managed the event updates, the status would be more meaningful for longer
319
320        // in this world, can we get stuck pushing a doc as well? Probably fine for now
321        if let Ok(Some(doc)) = self.docs.maybe_get(id, Some(hmac)).await {
322            return Ok(doc);
323        }
324
325        self.events
326            .sync_update(SyncIncrement::PullingDocument(id, true));
327        let remote_document = self
328            .client
329            .request(self.get_account()?, GetDocRequest { id, hmac })
330            .await?;
331        self.docs
332            .insert(id, Some(hmac), &remote_document.content)
333            .await?;
334        self.events
335            .sync_update(SyncIncrement::PullingDocument(id, false));
336
337        Ok(remote_document.content)
338    }
339
340    /// Pulls remote changes and constructs a changeset Merge such that Stage<Stage<Stage<Base, Remote>, Local>, Merge> is valid.
341    /// Promotes Base to Stage<Base, Remote> and Local to Stage<Local, Merge>
342    async fn merge(&self, state: &mut SyncState) -> LbResult<()> {
343        let mut tx = self.begin_tx().await;
344        let db = tx.db();
345        let start = Instant::now();
346
347        let remote_changes = &state.remote_changes;
348
349        // fetch document updates and local documents for merge
350        let me = Owner(self.keychain.get_pk()?);
351
352        // compute merge changes
353        let merge_changes = {
354            // assemble trees
355            let mut base = (&db.base_metadata).to_lazy();
356            let remote_unlazy = (&db.base_metadata).to_staged(remote_changes);
357            let mut remote = remote_unlazy.as_lazy();
358            let mut local = (&db.base_metadata).to_staged(&db.local_metadata).to_lazy();
359
360            // changeset constraints - these evolve as we try to assemble changes and encounter validation failures
361            let mut files_to_unmove: HashSet<Uuid> = HashSet::new();
362            let mut files_to_unshare: HashSet<Uuid> = HashSet::new();
363            let mut links_to_delete: HashSet<Uuid> = HashSet::new();
364            let mut rename_increments: HashMap<Uuid, usize> = HashMap::new();
365            let mut duplicate_file_ids: HashMap<Uuid, Uuid> = HashMap::new();
366
367            'merge_construction: loop {
368                // process just the edits which allow us to check deletions in the result
369                let mut deletions = {
370                    let mut deletions = remote_unlazy.stage(Vec::new()).to_lazy();
371
372                    // creations
373                    let mut deletion_creations = HashSet::new();
374                    for id in db.local_metadata.ids() {
375                        if remote.maybe_find(&id).is_none() && !links_to_delete.contains(&id) {
376                            deletion_creations.insert(id);
377                        }
378                    }
379                    'drain_creations: while !deletion_creations.is_empty() {
380                        'choose_a_creation: for id in &deletion_creations {
381                            // create
382                            let id = *id;
383                            let local_file = local.find(&id)?.clone();
384                            let result = deletions.create_unvalidated(
385                                id,
386                                symkey::generate_key(),
387                                local_file.parent(),
388                                &local.name(&id, &self.keychain)?,
389                                local_file.file_type(),
390                                &self.keychain,
391                            );
392                            match result {
393                                Ok(_) => {
394                                    deletion_creations.remove(&id);
395                                    continue 'drain_creations;
396                                }
397                                Err(ref err) => match err.kind {
398                                    LbErrKind::FileParentNonexistent => {
399                                        continue 'choose_a_creation;
400                                    }
401                                    _ => {
402                                        result?;
403                                    }
404                                },
405                            }
406                        }
407                        return Err(LbErrKind::Unexpected(format!(
408                            "sync failed to find a topomodelal order for file creations: {deletion_creations:?}"
409                        ))
410                        .into());
411                    }
412
413                    // moves (creations happen first in case a file is moved into a new folder)
414                    for id in db.local_metadata.ids() {
415                        let local_file = local.find(&id)?.clone();
416                        if let Some(base_file) = db.base_metadata.maybe_find(&id).cloned() {
417                            if !local_file.explicitly_deleted()
418                                && local_file.parent() != base_file.parent()
419                                && !files_to_unmove.contains(&id)
420                            {
421                                // move
422                                deletions.move_unvalidated(
423                                    &id,
424                                    local_file.parent(),
425                                    &self.keychain,
426                                )?;
427                            }
428                        }
429                    }
430
431                    // deletions (moves happen first in case a file is moved into a deleted folder)
432                    for id in db.local_metadata.ids() {
433                        let local_file = local.find(&id)?.clone();
434                        if local_file.explicitly_deleted() {
435                            // delete
436                            deletions.delete_unvalidated(&id, &self.keychain)?;
437                        }
438                    }
439                    deletions
440                };
441
442                // process all edits, dropping non-deletion edits for files that will be implicitly deleted
443                let mut merge = {
444                    let mut merge = remote_unlazy.stage(Vec::new()).to_lazy();
445
446                    // creations and edits of created documents
447                    let mut creations = HashSet::new();
448                    for id in db.local_metadata.ids() {
449                        if deletions.maybe_find(&id).is_some()
450                            && !deletions.calculate_deleted(&id)?
451                            && remote.maybe_find(&id).is_none()
452                            && !links_to_delete.contains(&id)
453                        {
454                            creations.insert(id);
455                        }
456                    }
457                    'drain_creations: while !creations.is_empty() {
458                        'choose_a_creation: for id in &creations {
459                            // create
460                            let id = *id;
461                            let local_file = local.find(&id)?.clone();
462                            let result = merge.create_unvalidated(
463                                id,
464                                local.decrypt_key(&id, &self.keychain)?,
465                                local_file.parent(),
466                                &local.name(&id, &self.keychain)?,
467                                local_file.file_type(),
468                                &self.keychain,
469                            );
470                            match result {
471                                Ok(_) => {
472                                    creations.remove(&id);
473                                    continue 'drain_creations;
474                                }
475                                Err(ref err) => match err.kind {
476                                    LbErrKind::FileParentNonexistent => {
477                                        continue 'choose_a_creation;
478                                    }
479                                    _ => {
480                                        result?;
481                                    }
482                                },
483                            }
484                        }
485                        return Err(LbErrKind::Unexpected(format!(
486                            "sync failed to find a topomodelal order for file creations: {creations:?}"
487                        ))
488                        .into());
489                    }
490
491                    // moves, renames, edits, and shares
492                    // creations happen first in case a file is moved into a new folder
493                    for id in db.local_metadata.ids() {
494                        // skip files that are already deleted or will be deleted
495                        if deletions.maybe_find(&id).is_none()
496                            || deletions.calculate_deleted(&id)?
497                            || (remote.maybe_find(&id).is_some()
498                                && remote.calculate_deleted(&id)?)
499                        {
500                            continue;
501                        }
502
503                        let local_file = local.find(&id)?.clone();
504                        let local_name = local.name(&id, &self.keychain)?;
505                        let maybe_base_file = base.maybe_find(&id).cloned();
506                        let maybe_remote_file = remote.maybe_find(&id).cloned();
507                        if let Some(ref base_file) = maybe_base_file {
508                            let base_name = base.name(&id, &self.keychain)?;
509                            let remote_file = remote.find(&id)?.clone();
510                            let remote_name = remote.name(&id, &self.keychain)?;
511
512                            // move
513                            if local_file.parent() != base_file.parent()
514                                && remote_file.parent() == base_file.parent()
515                                && !files_to_unmove.contains(&id)
516                            {
517                                merge.move_unvalidated(&id, local_file.parent(), &self.keychain)?;
518                            }
519
520                            // rename
521                            if local_name != base_name && remote_name == base_name {
522                                merge.rename_unvalidated(&id, &local_name, &self.keychain)?;
523                            }
524                        }
525
526                        // share
527                        let mut remote_keys = HashMap::new();
528                        if let Some(ref remote_file) = maybe_remote_file {
529                            for key in remote_file.user_access_keys() {
530                                remote_keys.insert(
531                                    (Owner(key.encrypted_by), Owner(key.encrypted_for)),
532                                    (key.mode, key.deleted),
533                                );
534                            }
535                        }
536                        for key in local_file.user_access_keys() {
537                            let (by, for_) = (Owner(key.encrypted_by), Owner(key.encrypted_for));
538                            if let Some(&(remote_mode, remote_deleted)) =
539                                remote_keys.get(&(by, for_))
540                            {
541                                // upgrade share
542                                if key.mode > remote_mode || !key.deleted && remote_deleted {
543                                    let mode = match key.mode {
544                                        UserAccessMode::Read => ShareMode::Read,
545                                        UserAccessMode::Write => ShareMode::Write,
546                                        UserAccessMode::Owner => continue,
547                                    };
548                                    merge.add_share_unvalidated(id, for_, mode, &self.keychain)?;
549                                }
550                                // delete share
551                                if key.deleted && !remote_deleted {
552                                    merge.delete_share_unvalidated(
553                                        &id,
554                                        Some(for_.0),
555                                        &self.keychain,
556                                    )?;
557                                }
558                            } else {
559                                // add share
560                                let mode = match key.mode {
561                                    UserAccessMode::Read => ShareMode::Read,
562                                    UserAccessMode::Write => ShareMode::Write,
563                                    UserAccessMode::Owner => continue,
564                                };
565                                merge.add_share_unvalidated(id, for_, mode, &self.keychain)?;
566                            }
567                        }
568
569                        // share deletion due to conflicts
570                        if files_to_unshare.contains(&id) {
571                            merge.delete_share_unvalidated(&id, None, &self.keychain)?;
572                        }
573
574                        // rename due to path conflict
575                        if let Some(&rename_increment) = rename_increments.get(&id) {
576                            let name = NameComponents::from(&local_name)
577                                .generate_incremented(rename_increment)
578                                .to_name();
579                            merge.rename_unvalidated(&id, &name, &self.keychain)?;
580                        }
581
582                        // edit
583                        let base_hmac = maybe_base_file.and_then(|f| f.document_hmac().cloned());
584                        let remote_hmac =
585                            maybe_remote_file.and_then(|f| f.document_hmac().cloned());
586                        let local_hmac = local_file.document_hmac().cloned();
587                        if merge.access_mode(me, &id)? >= Some(UserAccessMode::Write)
588                            && local_hmac != base_hmac
589                        {
590                            if remote_hmac != base_hmac && remote_hmac != local_hmac {
591                                // merge
592                                let merge_name = merge.name(&id, &self.keychain)?;
593                                let document_type =
594                                    DocumentType::from_file_name_using_extension(&merge_name);
595
596                                // todo these accesses are potentially problematic
597                                // maybe not if service/docs is the persion doing network io
598                                let base_document =
599                                    self.read_document_helper(id, &mut base).await?;
600                                let remote_document =
601                                    self.read_document_helper(id, &mut remote).await?;
602                                let local_document =
603                                    self.read_document_helper(id, &mut local).await?;
604
605                                match document_type {
606                                    DocumentType::Text => {
607                                        // 3-way merge
608                                        // todo: a couple more clones than necessary
609                                        let base_document =
610                                            String::from_utf8_lossy(&base_document).to_string();
611                                        let remote_document =
612                                            String::from_utf8_lossy(&remote_document).to_string();
613                                        let local_document =
614                                            String::from_utf8_lossy(&local_document).to_string();
615                                        let merged_document =
616                                            text::buffer::Buffer::from(base_document.as_str())
617                                                .merge(local_document, remote_document);
618                                        let encrypted_document = merge
619                                            .update_document_unvalidated(
620                                                &id,
621                                                &merged_document.into_bytes(),
622                                                &self.keychain,
623                                            )?;
624                                        let hmac = merge.find(&id)?.document_hmac().copied();
625                                        self.docs.insert(id, hmac, &encrypted_document).await?;
626                                    }
627                                    DocumentType::Drawing => {
628                                        let base_document =
629                                            String::from_utf8_lossy(&base_document).to_string();
630                                        let remote_document =
631                                            String::from_utf8_lossy(&remote_document).to_string();
632                                        let local_document =
633                                            String::from_utf8_lossy(&local_document).to_string();
634
635                                        let base_buffer = svg::buffer::Buffer::new(&base_document);
636                                        let remote_buffer =
637                                            svg::buffer::Buffer::new(&remote_document);
638                                        let mut local_buffer =
639                                            svg::buffer::Buffer::new(&local_document);
640
641                                        for (_, el) in local_buffer.elements.iter_mut() {
642                                            if let Element::Path(path) = el {
643                                                path.data.apply_transform(u_transform_to_bezier(
644                                                    &Transform::from(
645                                                        local_buffer
646                                                            .weak_viewport_settings
647                                                            .master_transform,
648                                                    ),
649                                                ));
650                                            }
651                                        }
652                                        svg::buffer::Buffer::reload(
653                                            &mut local_buffer.elements,
654                                            &mut local_buffer.weak_images,
655                                            &mut local_buffer.weak_path_pressures,
656                                            &mut local_buffer.weak_viewport_settings,
657                                            &base_buffer,
658                                            &remote_buffer,
659                                        );
660
661                                        let merged_document = local_buffer.serialize();
662                                        let encrypted_document = merge
663                                            .update_document_unvalidated(
664                                                &id,
665                                                &merged_document.into_bytes(),
666                                                &self.keychain,
667                                            )?;
668                                        let hmac = merge.find(&id)?.document_hmac().copied();
669                                        self.docs.insert(id, hmac, &encrypted_document).await?;
670                                    }
671                                    DocumentType::Other => {
672                                        // duplicate file
673                                        let merge_parent = *merge.find(&id)?.parent();
674                                        let duplicate_id = if let Some(&duplicate_id) =
675                                            duplicate_file_ids.get(&id)
676                                        {
677                                            duplicate_id
678                                        } else {
679                                            let duplicate_id = Uuid::new_v4();
680                                            duplicate_file_ids.insert(id, duplicate_id);
681                                            rename_increments.insert(duplicate_id, 1);
682                                            duplicate_id
683                                        };
684
685                                        let mut merge_name = merge_name;
686                                        merge_name = NameComponents::from(&merge_name)
687                                            .generate_incremented(
688                                                rename_increments
689                                                    .get(&duplicate_id)
690                                                    .copied()
691                                                    .unwrap_or_default(),
692                                            )
693                                            .to_name();
694
695                                        merge.create_unvalidated(
696                                            duplicate_id,
697                                            symkey::generate_key(),
698                                            &merge_parent,
699                                            &merge_name,
700                                            FileType::Document,
701                                            &self.keychain,
702                                        )?;
703                                        let encrypted_document = merge
704                                            .update_document_unvalidated(
705                                                &duplicate_id,
706                                                &local_document,
707                                                &self.keychain,
708                                            )?;
709                                        let duplicate_hmac =
710                                            merge.find(&duplicate_id)?.document_hmac().copied();
711                                        self.docs
712                                            .insert(
713                                                duplicate_id,
714                                                duplicate_hmac,
715                                                &encrypted_document,
716                                            )
717                                            .await?;
718                                    }
719                                }
720                            } else {
721                                // overwrite (todo: avoid reading/decrypting/encrypting document)
722                                let document = self.read_document_helper(id, &mut local).await?;
723                                merge.update_document_unvalidated(
724                                    &id,
725                                    &document,
726                                    &self.keychain,
727                                )?;
728                            }
729                        }
730                    }
731
732                    // deletes
733                    // moves happen first in case a file is moved into a deleted folder
734                    for id in db.local_metadata.ids() {
735                        if db.base_metadata.maybe_find(&id).is_some()
736                            && deletions.calculate_deleted(&id)?
737                            && !merge.calculate_deleted(&id)?
738                        {
739                            // delete
740                            merge.delete_unvalidated(&id, &self.keychain)?;
741                        }
742                    }
743                    for &id in &links_to_delete {
744                        // delete
745                        if merge.maybe_find(&id).is_some() && !merge.calculate_deleted(&id)? {
746                            merge.delete_unvalidated(&id, &self.keychain)?;
747                        }
748                    }
749
750                    merge
751                };
752
753                // validate; handle failures by introducing changeset constraints
754                for link in merge.ids() {
755                    if !merge.calculate_deleted(&link)? {
756                        if let FileType::Link { target } = merge.find(&link)?.file_type() {
757                            if merge.maybe_find(&target).is_some()
758                                && merge.calculate_deleted(&target)?
759                            {
760                                // delete links to deleted files
761                                if links_to_delete.insert(link) {
762                                    continue 'merge_construction;
763                                } else {
764                                    return Err(LbErrKind::Unexpected(format!(
765                                        "sync failed to resolve broken link (deletion): {link:?}"
766                                    ))
767                                    .into());
768                                }
769                            }
770                        }
771                    }
772                }
773
774                let validate_result = merge.validate(me);
775                match validate_result {
776                    // merge changeset is valid
777                    Ok(_) => {
778                        let (_, merge_changes) = merge.unstage();
779                        break merge_changes;
780                    }
781                    Err(ref err) => match err.kind {
782                        LbErrKind::Validation(ref vf) => match vf {
783                            // merge changeset has resolvable validation errors and needs modification
784                            ValidationFailure::Cycle(ids) => {
785                                // revert all local moves in the cycle
786                                let mut progress = false;
787                                for &id in ids {
788                                    if db.local_metadata.maybe_find(&id).is_some()
789                                        && files_to_unmove.insert(id)
790                                    {
791                                        progress = true;
792                                    }
793                                }
794                                if !progress {
795                                    return Err(LbErrKind::Unexpected(format!(
796                                        "sync failed to resolve cycle: {ids:?}"
797                                    ))
798                                    .into());
799                                }
800                            }
801                            ValidationFailure::PathConflict(ids) => {
802                                // pick one local id and generate a non-conflicting filename
803                                let mut progress = false;
804                                for &id in ids {
805                                    if duplicate_file_ids.values().any(|&dup| dup == id) {
806                                        *rename_increments.entry(id).or_insert(0) += 1;
807                                        progress = true;
808                                        break;
809                                    }
810                                }
811                                if !progress {
812                                    for &id in ids {
813                                        if db.local_metadata.maybe_find(&id).is_some() {
814                                            *rename_increments.entry(id).or_insert(0) += 1;
815                                            progress = true;
816                                            break;
817                                        }
818                                    }
819                                }
820                                if !progress {
821                                    return Err(LbErrKind::Unexpected(format!(
822                                        "sync failed to resolve path conflict: {ids:?}"
823                                    ))
824                                    .into());
825                                }
826                            }
827                            ValidationFailure::SharedLink { link, shared_ancestor } => {
828                                // if ancestor is newly shared, delete share, otherwise delete link
829                                let mut progress = false;
830                                if let Some(base_shared_ancestor) = base.maybe_find(shared_ancestor)
831                                {
832                                    if !base_shared_ancestor.is_shared()
833                                        && files_to_unshare.insert(*shared_ancestor)
834                                    {
835                                        progress = true;
836                                    }
837                                }
838                                if !progress && links_to_delete.insert(*link) {
839                                    progress = true;
840                                }
841                                if !progress {
842                                    return Err(LbErrKind::Unexpected(format!(
843                                    "sync failed to resolve shared link: link: {link:?}, shared_ancestor: {shared_ancestor:?}"
844                                )).into());
845                                }
846                            }
847                            ValidationFailure::DuplicateLink { target } => {
848                                // delete local link with this target
849                                let mut progress = false;
850                                if let Some(link) = local.linked_by(target)? {
851                                    if links_to_delete.insert(link) {
852                                        progress = true;
853                                    }
854                                }
855                                if !progress {
856                                    return Err(LbErrKind::Unexpected(format!(
857                                        "sync failed to resolve duplicate link: target: {target:?}"
858                                    ))
859                                    .into());
860                                }
861                            }
862                            ValidationFailure::BrokenLink(link) => {
863                                // delete local link with this target
864                                if !links_to_delete.insert(*link) {
865                                    return Err(LbErrKind::Unexpected(format!(
866                                        "sync failed to resolve broken link: {link:?}"
867                                    ))
868                                    .into());
869                                }
870                            }
871                            ValidationFailure::OwnedLink(link) => {
872                                // if target is newly owned, unmove target, otherwise delete link
873                                let mut progress = false;
874                                if let Some(remote_link) = remote.maybe_find(link) {
875                                    if let FileType::Link { target } = remote_link.file_type() {
876                                        let remote_target = remote.find(&target)?;
877                                        if remote_target.owner() != me
878                                            && files_to_unmove.insert(target)
879                                        {
880                                            progress = true;
881                                        }
882                                    }
883                                }
884                                if !progress && links_to_delete.insert(*link) {
885                                    progress = true;
886                                }
887                                if !progress {
888                                    return Err(LbErrKind::Unexpected(format!(
889                                        "sync failed to resolve owned link: {link:?}"
890                                    ))
891                                    .into());
892                                }
893                            }
894                            // merge changeset has unexpected validation errors
895                            ValidationFailure::Orphan(_)
896                            | ValidationFailure::NonFolderWithChildren(_)
897                            | ValidationFailure::FileWithDifferentOwnerParent(_)
898                            | ValidationFailure::FileNameTooLong(_)
899                            | ValidationFailure::DeletedFileUpdated(_)
900                            | ValidationFailure::NonDecryptableFileName(_) => {
901                                validate_result?;
902                            }
903                        },
904                        // merge changeset has unexpected errors
905                        _ => {
906                            validate_result?;
907                        }
908                    },
909                }
910            }
911        };
912
913        // base = remote; local = merge
914        (&mut db.base_metadata)
915            .to_staged(remote_changes.clone())
916            .to_lazy()
917            .promote()?;
918        db.local_metadata.clear()?;
919        (&mut db.local_metadata)
920            .to_staged(merge_changes)
921            .to_lazy()
922            .promote()?;
923
924        // todo who else calls this did they manage locks right?
925        // self.cleanup_local_metadata()?;
926        db.base_metadata.stage(&mut db.local_metadata).prune()?;
927
928        if start.elapsed() > web_time::Duration::from_millis(100) {
929            warn!("sync merge held lock for {:?}", start.elapsed());
930        }
931
932        Ok(())
933    }
934
935    async fn send_pull_events(&self, state: &mut SyncState) -> LbResult<()> {
936        if state.new_root.is_some() {
937            self.events.signed_in();
938        }
939
940        if !state.remote_changes.is_empty() {
941            self.events.meta_changed(Actor::Sync);
942
943            let owner = Owner(self.keychain.get_pk()?);
944            if state.remote_changes.iter().any(|f| f.owner() != owner) {
945                self.events.pending_shares_changed();
946            }
947        }
948
949        for &doc in &state.pulled_docs {
950            self.events.doc_written(doc, Actor::Sync);
951        }
952
953        Ok(())
954    }
955
956    async fn commit_last_synced(&self, state: &mut SyncState) -> LbResult<()> {
957        let mut tx = self.begin_tx().await;
958        let db = tx.db();
959        db.last_synced.insert(state.updates_as_of as i64)?;
960
961        if let Some(root) = state.new_root {
962            db.root.insert(root)?;
963        }
964
965        Ok(())
966    }
967
968    async fn populate_pk_cache(&self) -> LbResult<()> {
969        // todo: is this the move?
970        let mut missing_owners = HashSet::new();
971        {
972            let tx = self.ro_tx().await;
973            let db = tx.db();
974            for file in db.base_metadata.get().values() {
975                for user_access_key in file.user_access_keys() {
976                    let enc_by = Owner(user_access_key.encrypted_by);
977                    let enc_for = Owner(user_access_key.encrypted_for);
978
979                    if !db.pub_key_lookup.get().contains_key(&enc_by) {
980                        missing_owners.insert(enc_by);
981                    }
982
983                    if !db.pub_key_lookup.get().contains_key(&enc_for) {
984                        missing_owners.insert(enc_for);
985                    }
986                }
987            }
988        }
989
990        let mut new_owners = HashMap::new();
991        {
992            for owner in missing_owners {
993                let username_result = self
994                    .client
995                    .request(self.get_account().unwrap(), GetUsernameRequest { key: owner.0 })
996                    .await;
997                new_owners.insert(owner, username_result);
998            }
999        }
1000
1001        let mut tx = self.begin_tx().await;
1002        let db = tx.db();
1003
1004        let have_updates = !new_owners.is_empty();
1005        for (owner, username) in new_owners {
1006            let username = match username {
1007                Err(ApiError::Endpoint(GetUsernameError::UserNotFound)) => "<unknown>".to_string(),
1008                Ok(username) => username.username,
1009                _ => continue, // todo: possibly add some logging here
1010            };
1011
1012            db.pub_key_lookup.insert(owner, username).unwrap();
1013        }
1014
1015        if have_updates {
1016            self.events.meta_changed(Actor::Sync);
1017        }
1018
1019        Ok(())
1020    }
1021
1022    /// Updates remote and base metadata to local.
1023    async fn push_meta(&self) -> LbResult<()> {
1024        let mut updates = vec![];
1025        let mut local_changes_no_digests = Vec::new();
1026
1027        let tx = self.ro_tx().await;
1028        let db = tx.db();
1029
1030        // remote = local
1031        let local = db.base_metadata.stage(&db.local_metadata).to_lazy();
1032
1033        for id in local.tree.staged.ids() {
1034            let mut local_change = local.tree.staged.find(&id)?.timestamped_value.value.clone();
1035            let maybe_base_file = local.tree.base.maybe_find(&id);
1036
1037            // change everything but document hmac and re-sign
1038            local_change.set_hmac_and_size(
1039                maybe_base_file.and_then(|f| f.document_hmac().copied()),
1040                maybe_base_file.and_then(|f| *f.timestamped_value.value.doc_size()),
1041            );
1042            let local_change = local_change.sign(&self.keychain)?;
1043
1044            local_changes_no_digests.push(local_change.clone());
1045            let file_diff = FileDiff { old: maybe_base_file.cloned(), new: local_change };
1046            updates.push(file_diff);
1047        }
1048
1049        drop(tx);
1050
1051        if !updates.is_empty() {
1052            self.client
1053                .request(self.get_account()?, UpsertRequestV2 { updates: updates.clone() })
1054                .await?;
1055        }
1056
1057        let mut tx = self.begin_tx().await;
1058        let db = tx.db();
1059
1060        // base = local
1061        (&mut db.base_metadata)
1062            .to_lazy()
1063            .stage(local_changes_no_digests)
1064            .promote()?;
1065        db.base_metadata.stage(&mut db.local_metadata).prune()?;
1066
1067        tx.end();
1068
1069        Ok(())
1070    }
1071
1072    /// Updates remote and base files to local. Assumes metadata is already pushed for all new files.
1073    // todo: make this so that all document updates are attempted and we don't just return the
1074    // first error. Once an attempt is made we can return any or all errors, either would be an
1075    // improvement
1076    async fn push_docs(&self) -> LbResult<()> {
1077        let mut updates = vec![];
1078        let mut local_changes_digests_only = vec![];
1079
1080        let tx = self.ro_tx().await;
1081        let db = tx.db();
1082        let start = Instant::now();
1083
1084        let local = db.base_metadata.stage(&db.local_metadata).to_lazy();
1085
1086        for id in local.tree.staged.ids() {
1087            let base_file = local.tree.base.find(&id)?.clone();
1088
1089            // change only document hmac and re-sign
1090            let mut local_change = base_file.timestamped_value.value.clone();
1091            local_change.set_hmac_and_size(
1092                local.find(&id)?.document_hmac().copied(),
1093                *local.find(&id)?.timestamped_value.value.doc_size(),
1094            );
1095
1096            if base_file.document_hmac() == local_change.document_hmac()
1097                || local_change.document_hmac().is_none()
1098            {
1099                continue;
1100            }
1101
1102            let local_change = local_change.sign(&self.keychain)?;
1103
1104            updates.push(FileDiff { old: Some(base_file), new: local_change.clone() });
1105            local_changes_digests_only.push(local_change);
1106            self.events
1107                .sync_update(SyncIncrement::PushingDocument(id, true));
1108        }
1109
1110        drop(tx);
1111        if start.elapsed() > web_time::Duration::from_millis(100) {
1112            warn!("sync push_docs held lock for {:?}", start.elapsed());
1113        }
1114
1115        let futures = updates.clone().into_iter().map(|diff| self.push_doc(diff));
1116
1117        let mut stream = stream::iter(futures).buffer_unordered(
1118            thread::available_parallelism()
1119                .unwrap_or(NonZeroUsize::new(4).unwrap())
1120                .into(),
1121        );
1122
1123        let mut docs_without_errors = vec![];
1124        let mut last_error: Option<LbErr> = None;
1125
1126        while let Some(fut) = stream.next().await {
1127            match fut {
1128                Ok(id) => {
1129                    docs_without_errors.push(id);
1130                    self.events
1131                        .sync_update(SyncIncrement::PushingDocument(id, false));
1132                }
1133                Err(err) => {
1134                    last_error = Some(err);
1135                }
1136            }
1137        }
1138
1139        local_changes_digests_only.retain(|f| docs_without_errors.contains(f.id()));
1140
1141        let mut tx = self.begin_tx().await;
1142        let db = tx.db();
1143        // base = local (metadata)
1144        (&mut db.base_metadata)
1145            .to_lazy()
1146            .stage(local_changes_digests_only)
1147            .promote()?;
1148
1149        db.base_metadata.stage(&mut db.local_metadata).prune()?;
1150
1151        tx.end();
1152
1153        if let Some(err) = last_error { Err(err) } else { Ok(()) }
1154    }
1155
1156    async fn push_doc(&self, diff: FileDiff<SignedMeta>) -> LbResult<Uuid> {
1157        let id = *diff.new.id();
1158        let hmac = diff.new.document_hmac();
1159        let local_document_change = self.docs.get(id, hmac.copied()).await?;
1160        self.client
1161            .request(
1162                self.get_account()?,
1163                ChangeDocRequestV2 { diff, new_content: local_document_change },
1164            )
1165            .await?;
1166
1167        Ok(id)
1168    }
1169
1170    #[cfg(not(target_family = "wasm"))]
1171    async fn send_debug_info(&self, account: Account) {
1172        let debug_info = self
1173            .debug_info("none provided - sync".to_string(), false)
1174            .await
1175            .unwrap();
1176
1177        if self.config.background_work {
1178            let bg_self = self.clone();
1179            tokio::spawn(async move {
1180                bg_self
1181                    .client
1182                    .request(&account, UpsertDebugInfoRequest { debug_info })
1183                    .await
1184                    .log_and_ignore();
1185            });
1186        } else {
1187            self.client
1188                .request(&account, UpsertDebugInfoRequest { debug_info })
1189                .await
1190                .log_and_ignore();
1191        }
1192    }
1193
1194    async fn read_document_helper<T>(
1195        &self, id: Uuid, tree: &mut LazyTree<T>,
1196    ) -> LbResult<DecryptedDocument>
1197    where
1198        T: TreeLike<F = SignedMeta>,
1199    {
1200        let file = tree.find(&id)?;
1201        validate::is_document(file)?;
1202        let hmac = file.document_hmac().copied();
1203
1204        if tree.calculate_deleted(&id)? {
1205            return Err(LbErrKind::FileNonexistent.into());
1206        }
1207
1208        let doc = match hmac {
1209            Some(hmac) => {
1210                let doc = self.docs.get(id, Some(hmac)).await?;
1211                tree.decrypt_document(&id, &doc, &self.keychain)?
1212            }
1213            None => vec![],
1214        };
1215
1216        Ok(doc)
1217    }
1218
1219    /// for tests only
1220    #[doc(hidden)]
1221    pub async fn server_dirty_ids(&self) -> LbResult<Vec<Uuid>> {
1222        let mut state = self.syncer.lock().await;
1223        self.inital_sync_state(&mut state).await?;
1224        self.process_deletions().await?;
1225        self.fetch_meta(&mut state).await?;
1226
1227        let server_ids = state.remote_changes.iter().map(|f| *f.id()).collect();
1228
1229        Ok(server_ids)
1230    }
1231
1232    pub(crate) fn setup_syncer(&self) {
1233        if self.config.background_work {
1234            self.clone().local_change_worker();
1235            self.clone().periodic_sync_worker();
1236            self.clone().post_sync_worker();
1237        }
1238    }
1239
1240    fn local_change_worker(self) {
1241        #[cfg(not(target_family = "wasm"))]
1242        tokio::spawn(async move {
1243            let mut events = self.subscribe();
1244
1245            let sync_criteria = |e: Event| {
1246                matches!(
1247                    e,
1248                    Event::MetadataChanged(Actor::User) | Event::DocumentWritten(_, Actor::User)
1249                )
1250            };
1251
1252            loop {
1253                let mut should_sync = false;
1254
1255                // drain the current channel, so we don't sync for each keystroke if they pile up
1256                loop {
1257                    let event = events.try_recv();
1258                    match event {
1259                        Ok(event) => {
1260                            if sync_criteria(event) {
1261                                should_sync = true;
1262                            }
1263                        }
1264                        Err(TryRecvError::Empty) => break,
1265                        _ => {
1266                            panic!(
1267                                "unexpected broadcast receive error, returning local_change_worker"
1268                            );
1269                        }
1270                    }
1271                }
1272
1273                // empty channel + nothing interesting has happened, sit and wait for something
1274                // interesting
1275                if !should_sync {
1276                    let event = events.recv().await.unwrap();
1277                    if sync_criteria(event) {
1278                        self.sync().await.map_unexpected().log_and_ignore();
1279                    } else {
1280                        continue;
1281                    }
1282                }
1283            }
1284        });
1285    }
1286
1287    fn periodic_sync_worker(self) {
1288        #[cfg(not(target_family = "wasm"))]
1289        tokio::spawn(async move {
1290            loop {
1291                self.sync().await.map_unexpected().log_and_ignore();
1292                if self.user_active().await {
1293                    tokio::time::sleep(Duration::from_secs(3)).await;
1294                } else {
1295                    tokio::time::sleep(Duration::from_secs(5 * 60)).await;
1296                }
1297            }
1298        });
1299    }
1300
1301    async fn user_active(&self) -> bool {
1302        let last_seen = self.user_last_seen.read().await;
1303        last_seen.elapsed() < Duration::from_secs(3 * 60)
1304    }
1305
1306    fn post_sync_worker(self) {
1307        #[cfg(not(target_family = "wasm"))]
1308        tokio::spawn(async move {
1309            let mut events = self.subscribe();
1310
1311            loop {
1312                let event = events.recv().await.unwrap();
1313                if let Event::Sync(SyncIncrement::SyncFinished(_)) = event {
1314                    self.fetcher().await.map_unexpected().log_and_ignore();
1315                    self.populate_pk_cache()
1316                        .await
1317                        .map_unexpected()
1318                        .log_and_ignore();
1319                };
1320            }
1321        });
1322    }
1323
1324    async fn fetcher(&self) -> LbResult<()> {
1325        let mut files_to_pull = vec![];
1326
1327        let tx = self.ro_tx().await;
1328        let db = tx.db();
1329
1330        let Some(root) = db.root.get() else {
1331            return Ok(());
1332        };
1333
1334        // we can only fetch things we know the server knows about
1335        let mut tree = db.base_metadata.stage(None).to_lazy();
1336
1337        for id in tree.descendants_using_links(root)? {
1338            let file = tree.find(&id)?;
1339            let hmac = file.document_hmac().copied();
1340
1341            // skip non-documents
1342            if !file.is_document() {
1343                continue;
1344            }
1345
1346            // skip deleted files
1347            if tree.calculate_deleted(&id)? {
1348                continue;
1349            }
1350
1351            // skip non-first-party files
1352            let name = tree.name(&id, &self.keychain)?;
1353            if !name.ends_with(".md") && !name.ends_with(".svg") {
1354                continue;
1355            }
1356
1357            files_to_pull.push((id, hmac));
1358        }
1359
1360        drop(tx);
1361
1362        // this could all be done in parallel, but for now going to not do it that way
1363        // benefits: less work, but also ensures that a file that needs to be fetched immediately
1364        // can be
1365        for (id, hmac) in files_to_pull {
1366            if let Some(hmac) = hmac {
1367                self.fetch_doc(id, hmac).await?;
1368            }
1369        }
1370
1371        Ok(())
1372    }
1373}