Skip to main content

lb_rs/subscribers/
syncer.rs

1use std::{
2    collections::{HashMap, HashSet},
3    num::NonZeroUsize,
4    sync::Arc,
5    thread,
6    time::{Duration, Instant},
7};
8
9use futures::{StreamExt, stream};
10use tokio::sync::{Mutex, broadcast::error::TryRecvError};
11use tokio::time;
12use usvg::Transform;
13use uuid::Uuid;
14
15use crate::{
16    LbErrKind, LbResult, LocalLb,
17    io::network::ApiError,
18    model::{
19        ValidationFailure,
20        access_info::UserAccessMode,
21        account::Account,
22        api::{
23            ChangeDocRequestV2, GetDocRequest, GetFileIdsRequest, GetUpdatesRequestV2,
24            GetUsernameError, GetUsernameRequest, UpsertDebugInfoRequest, UpsertRequestV2,
25        },
26        crypto::{DecryptedDocument, EncryptedDocument},
27        errors::{LbErr, Unexpected},
28        file::ShareMode,
29        file_like::FileLike,
30        file_metadata::{DocumentHmac, FileDiff, FileType, Owner},
31        filename::{DocumentType, NameComponents},
32        lazy::LazyTree,
33        signed_meta::SignedMeta,
34        staged::StagedTreeLikeMut,
35        svg::{self, buffer::u_transform_to_bezier, element::Element},
36        symkey, text,
37        tree_like::TreeLike,
38        validate,
39    },
40    service::events::{Actor, Event, SyncIncrement},
41};
42
43pub type Syncer = Arc<Mutex<SyncState>>;
44
45#[derive(Default)]
46pub struct SyncState {
47    /// the starting point for updates for this sync pass
48    last_synced: u64,
49
50    /// if our pull is successful, this is the timestamp we will commit
51    updates_as_of: u64,
52
53    /// changes we pulled from the server, post deduplication
54    remote_changes: Vec<SignedMeta>,
55
56    /// did we pull a root on this pass?
57    new_root: Option<Uuid>,
58
59    /// what docs did we pull as a result of this sync
60    pulled_docs: Vec<Uuid>,
61}
62
// plan: a fetch-metadata fn will pull the docs it needs, i.e. the ones that match `should_fetch`
//
// `should_fetch` is going to be a tree fn that returns true if the file:
//     is md or svg that descends from ... (todo: this note was left unfinished)
68
69impl LocalLb {
70    #[instrument(level = "debug", skip(self), err(Debug))]
71    pub async fn sync(&self) -> LbResult<()> {
72        let mut sync_state = self.syncer.lock().await;
73        self.events.sync_update(SyncIncrement::SyncStarted);
74
75        let pipeline: LbResult<()> = async {
76            self.pull_updates(&mut sync_state).await?;
77            self.push_local_changes().await?;
78            Ok(())
79        }
80        .await;
81
82        self.events.sync_update(SyncIncrement::SyncFinished(
83            pipeline.as_ref().err().map(|err| err.kind.clone()),
84        ));
85
86        self.cleanup().await?;
87
88        pipeline?;
89
90        let account = self.get_account()?.clone();
91
92        #[cfg(not(target_family = "wasm"))]
93        if account.is_beta() {
94            self.send_debug_info(account).await;
95        }
96
97        Ok(())
98    }
99
100    pub(crate) async fn pull_updates(&self, sync_state: &mut SyncState) -> LbResult<()> {
101        self.inital_sync_state(sync_state).await?;
102        self.process_deletions().await?;
103        self.fetch_meta(sync_state).await?;
104        self.fetch_required_docs(sync_state).await?;
105        // todo: should this inform a re-pull?
106        self.merge(sync_state).await?;
107        self.commit_last_synced(sync_state).await?;
108        self.send_pull_events(sync_state).await?;
109
110        if !self.config.background_work {
111            self.populate_pk_cache().await?;
112        }
113
114        Ok(())
115    }
116
117    pub(crate) async fn push_local_changes(&self) -> LbResult<()> {
118        self.push_meta().await?;
119        self.push_docs().await?;
120
121        Ok(())
122    }
123
124    async fn inital_sync_state(&self, state: &mut SyncState) -> LbResult<()> {
125        let tx = self.ro_tx().await;
126        let db = tx.db();
127
128        *state = Default::default();
129        state.last_synced = db.last_synced.get().copied().unwrap_or_default() as u64;
130
131        Ok(())
132    }
133
134    pub(crate) async fn process_deletions(&self) -> LbResult<()> {
135        let server_ids = self
136            .client
137            .request(self.get_account()?, GetFileIdsRequest {})
138            .await?
139            .ids;
140
141        let mut tx = self.begin_tx().await;
142        let db = tx.db();
143
144        let mut local = db.base_metadata.stage(&db.local_metadata).to_lazy();
145        let base_ids = local.tree.base.ids();
146
147        let mut prunable_ids = base_ids;
148        prunable_ids.retain(|id| !server_ids.contains(id));
149        for id in prunable_ids.clone() {
150            prunable_ids.extend(local.descendants(&id)?.into_iter());
151        }
152        for id in &prunable_ids {
153            if let Some(base_file) = local.tree.base.maybe_find(id) {
154                self.docs
155                    .delete(*id, base_file.document_hmac().copied())
156                    .await?;
157            }
158            if let Some(local_file) = local.maybe_find(id) {
159                self.docs
160                    .delete(*id, local_file.document_hmac().copied())
161                    .await?;
162            }
163        }
164
165        let mut base_staged = (&mut db.base_metadata).to_lazy().stage(None);
166        base_staged.tree.removed = prunable_ids.iter().copied().collect();
167        base_staged.promote()?;
168
169        let mut local_staged = (&mut db.local_metadata).to_lazy().stage(None);
170        local_staged.tree.removed = prunable_ids.iter().copied().collect();
171        local_staged.promote()?;
172
173        if !prunable_ids.is_empty() {
174            self.events.meta_changed(Actor::Sync);
175        }
176
177        Ok(())
178    }
179
180    async fn fetch_meta(&self, state: &mut SyncState) -> LbResult<()> {
181        let updates = self
182            .client
183            .request(
184                self.get_account()?,
185                GetUpdatesRequestV2 { since_metadata_version: state.last_synced },
186            )
187            .await?;
188
189        let tx = self.ro_tx().await;
190        let db = tx.db();
191
192        // this loop implicitly prunes remote orphans
193        let mut without_orphans = Vec::new();
194        let me = Owner(self.keychain.get_pk()?);
195        let remote = db.base_metadata.stage(updates.file_metadata).to_lazy();
196        for id in remote.tree.staged.ids() {
197            let meta = remote.find(&id)?;
198            if remote.maybe_find_parent(meta).is_some()
199                || meta
200                    .user_access_keys()
201                    .iter()
202                    .any(|k| k.encrypted_for == me.0)
203            {
204                without_orphans.push(remote.find(&id)?.clone());
205            }
206        }
207
208        // this is what actually performs the deduplication
209        let pruned_tree = db.base_metadata.stage(without_orphans).pruned()?.to_lazy();
210        let (_, deduped_changes) = pruned_tree.unstage();
211
212        // initialize root if this is the first pull on this device
213        let mut root_id = None;
214        if db.root.get().is_none() {
215            let root = deduped_changes
216                .all_files()?
217                .into_iter()
218                .find(|f| f.is_root())
219                .ok_or(LbErrKind::RootNonexistent)?;
220            root_id = Some(*root.id());
221        }
222
223        state.remote_changes = deduped_changes;
224        state.updates_as_of = updates.as_of_metadata_version;
225        state.new_root = root_id;
226
227        Ok(())
228    }
229
230    async fn fetch_required_docs(&self, state: &mut SyncState) -> LbResult<()> {
231        let mut docs_to_pull = vec![];
232
233        let tx = self.ro_tx().await;
234        let db = tx.db();
235
236        let mut files_with_local_edits = vec![];
237        let local = db.base_metadata.stage(&db.local_metadata);
238        for id in local.staged.ids() {
239            if let Some(base) = local.base.maybe_find(&id) {
240                if let Some(local_hmac) = local.find(&id)?.document_hmac() {
241                    if Some(local_hmac) != base.document_hmac() {
242                        files_with_local_edits.push(id);
243                        println!("local edits found");
244                    }
245                }
246            }
247        }
248
249        let mut remote = db
250            .base_metadata
251            .stage(state.remote_changes.clone())
252            .to_lazy();
253
254        for id in remote.tree.staged.ids() {
255            if remote.calculate_deleted(&id)? {
256                continue;
257            }
258            let remote_hmac = remote.find(&id)?.document_hmac().cloned();
259            let base_hmac = remote
260                .tree
261                .base
262                .maybe_find(&id)
263                .and_then(|f| f.document_hmac())
264                .cloned();
265            if base_hmac == remote_hmac {
266                continue;
267            }
268
269            if let Some(remote_hmac) = remote_hmac {
270                // pull a file if we have a prior base, this is our heuristic -- do they have the
271                // ability to edit this file while we release the lock and are pulling all the
272                // files
273                if self.docs.exists(id, base_hmac) && !self.docs.exists(id, Some(remote_hmac)) {
274                    docs_to_pull.push((id, remote_hmac));
275                }
276
277                // this clause captures documents which went from being new -> multiple parties
278                // having updates. We'll still need the updates
279                if files_with_local_edits.contains(&id)
280                    && !docs_to_pull
281                        .iter()
282                        .any(|(already_pulling, _)| already_pulling == &id)
283                {
284                    if let Some(base_hmac) = base_hmac {
285                        if !self.docs.exists(id, Some(base_hmac)) {
286                            // this scenario basically only comes up in tests
287                            // someone modifies a file directly without reading the prior version
288                            docs_to_pull.push((id, base_hmac));
289                        }
290                    }
291                    docs_to_pull.push((id, remote_hmac));
292                }
293            }
294        }
295        drop(tx);
296
297        let futures = docs_to_pull
298            .into_iter()
299            .map(|(id, hmac)| async move { self.ensure_doc_available(id, hmac).await.map(|_| id) });
300
301        let mut stream = stream::iter(futures).buffer_unordered(
302            thread::available_parallelism()
303                .unwrap_or(NonZeroUsize::new(4).unwrap())
304                .into(),
305        );
306
307        while let Some(fut) = stream.next().await {
308            let id = fut?;
309            state.pulled_docs.push(id);
310        }
311
312        Ok(())
313    }
314
315    pub(crate) async fn ensure_doc_available(
316        &self, id: Uuid, hmac: DocumentHmac,
317    ) -> LbResult<Option<EncryptedDocument>> {
318        // todo: in a lot of cases there is a list of ids we're trying to get, it would be better
319        // if the caller managed the event updates, the status would be more meaningful for longer
320
321        // in this world, can we get stuck pushing a doc as well? Probably fine for now
322        if self.docs.exists(id, Some(hmac)) {
323            return Ok(None);
324        }
325
326        self.events
327            .sync_update(SyncIncrement::PullingDocument(id, true));
328        let remote_document = self
329            .client
330            .request(self.get_account()?, GetDocRequest { id, hmac })
331            .await?;
332        self.docs
333            .insert(id, Some(hmac), &remote_document.content)
334            .await?;
335        self.events
336            .sync_update(SyncIncrement::PullingDocument(id, false));
337
338        Ok(Some(remote_document.content))
339    }
340
341    pub(crate) async fn fetch_doc(
342        &self, id: Uuid, hmac: DocumentHmac,
343    ) -> LbResult<EncryptedDocument> {
344        match self.ensure_doc_available(id, hmac).await? {
345            Some(doc) => Ok(doc),
346            None => self.docs.get(id, Some(hmac)).await,
347        }
348    }
349
350    /// Pulls remote changes and constructs a changeset Merge such that Stage<Stage<Stage<Base, Remote>, Local>, Merge> is valid.
351    /// Promotes Base to Stage<Base, Remote> and Local to Stage<Local, Merge>
352    async fn merge(&self, state: &mut SyncState) -> LbResult<()> {
353        let mut tx = self.begin_tx().await;
354        let db = tx.db();
355        let start = Instant::now();
356
357        let remote_changes = &state.remote_changes;
358
359        // fetch document updates and local documents for merge
360        let me = Owner(self.keychain.get_pk()?);
361
362        // compute merge changes
363        let merge_changes = {
364            // assemble trees
365            let mut base = (&db.base_metadata).to_lazy();
366            let remote_unlazy = (&db.base_metadata).to_staged(remote_changes);
367            let mut remote = remote_unlazy.as_lazy();
368            let mut local = (&db.base_metadata).to_staged(&db.local_metadata).to_lazy();
369
370            // changeset constraints - these evolve as we try to assemble changes and encounter validation failures
371            let mut files_to_unmove: HashSet<Uuid> = HashSet::new();
372            let mut files_to_unshare: HashSet<Uuid> = HashSet::new();
373            let mut links_to_delete: HashSet<Uuid> = HashSet::new();
374            let mut rename_increments: HashMap<Uuid, usize> = HashMap::new();
375            let mut duplicate_file_ids: HashMap<Uuid, Uuid> = HashMap::new();
376
377            'merge_construction: loop {
378                // process just the edits which allow us to check deletions in the result
379                let mut deletions = {
380                    let mut deletions = remote_unlazy.stage(Vec::new()).to_lazy();
381
382                    // creations
383                    let mut deletion_creations = HashSet::new();
384                    for id in db.local_metadata.ids() {
385                        if remote.maybe_find(&id).is_none() && !links_to_delete.contains(&id) {
386                            deletion_creations.insert(id);
387                        }
388                    }
389                    'drain_creations: while !deletion_creations.is_empty() {
390                        'choose_a_creation: for id in &deletion_creations {
391                            // create
392                            let id = *id;
393                            let local_file = local.find(&id)?.clone();
394                            let result = deletions.create_unvalidated(
395                                id,
396                                symkey::generate_key(),
397                                local_file.parent(),
398                                &local.name(&id, &self.keychain)?,
399                                local_file.file_type(),
400                                &self.keychain,
401                            );
402                            match result {
403                                Ok(_) => {
404                                    deletion_creations.remove(&id);
405                                    continue 'drain_creations;
406                                }
407                                Err(ref err) => match err.kind {
408                                    LbErrKind::FileParentNonexistent => {
409                                        continue 'choose_a_creation;
410                                    }
411                                    _ => {
412                                        result?;
413                                    }
414                                },
415                            }
416                        }
417                        return Err(LbErrKind::Unexpected(format!(
418                            "sync failed to find a topomodelal order for file creations: {deletion_creations:?}"
419                        ))
420                        .into());
421                    }
422
423                    // moves (creations happen first in case a file is moved into a new folder)
424                    for id in db.local_metadata.ids() {
425                        let local_file = local.find(&id)?.clone();
426                        if let Some(base_file) = db.base_metadata.maybe_find(&id).cloned() {
427                            if !local_file.explicitly_deleted()
428                                && local_file.parent() != base_file.parent()
429                                && !files_to_unmove.contains(&id)
430                            {
431                                // move
432                                deletions.move_unvalidated(
433                                    &id,
434                                    local_file.parent(),
435                                    &self.keychain,
436                                )?;
437                            }
438                        }
439                    }
440
441                    // deletions (moves happen first in case a file is moved into a deleted folder)
442                    for id in db.local_metadata.ids() {
443                        let local_file = local.find(&id)?.clone();
444                        if local_file.explicitly_deleted() {
445                            // delete
446                            deletions.delete_unvalidated(&id, &self.keychain)?;
447                        }
448                    }
449                    deletions
450                };
451
452                // process all edits, dropping non-deletion edits for files that will be implicitly deleted
453                let mut merge = {
454                    let mut merge = remote_unlazy.stage(Vec::new()).to_lazy();
455
456                    // creations and edits of created documents
457                    let mut creations = HashSet::new();
458                    for id in db.local_metadata.ids() {
459                        if deletions.maybe_find(&id).is_some()
460                            && !deletions.calculate_deleted(&id)?
461                            && remote.maybe_find(&id).is_none()
462                            && !links_to_delete.contains(&id)
463                        {
464                            creations.insert(id);
465                        }
466                    }
467                    'drain_creations: while !creations.is_empty() {
468                        'choose_a_creation: for id in &creations {
469                            // create
470                            let id = *id;
471                            let local_file = local.find(&id)?.clone();
472                            let result = merge.create_unvalidated(
473                                id,
474                                local.decrypt_key(&id, &self.keychain)?,
475                                local_file.parent(),
476                                &local.name(&id, &self.keychain)?,
477                                local_file.file_type(),
478                                &self.keychain,
479                            );
480                            match result {
481                                Ok(_) => {
482                                    creations.remove(&id);
483                                    continue 'drain_creations;
484                                }
485                                Err(ref err) => match err.kind {
486                                    LbErrKind::FileParentNonexistent => {
487                                        continue 'choose_a_creation;
488                                    }
489                                    _ => {
490                                        result?;
491                                    }
492                                },
493                            }
494                        }
495                        return Err(LbErrKind::Unexpected(format!(
496                            "sync failed to find a topomodelal order for file creations: {creations:?}"
497                        ))
498                        .into());
499                    }
500
501                    // moves, renames, edits, and shares
502                    // creations happen first in case a file is moved into a new folder
503                    for id in db.local_metadata.ids() {
504                        // skip files that are already deleted or will be deleted
505                        if deletions.maybe_find(&id).is_none()
506                            || deletions.calculate_deleted(&id)?
507                            || (remote.maybe_find(&id).is_some()
508                                && remote.calculate_deleted(&id)?)
509                        {
510                            continue;
511                        }
512
513                        let local_file = local.find(&id)?.clone();
514                        let local_name = local.name(&id, &self.keychain)?;
515                        let maybe_base_file = base.maybe_find(&id).cloned();
516                        let maybe_remote_file = remote.maybe_find(&id).cloned();
517                        if let Some(ref base_file) = maybe_base_file {
518                            let base_name = base.name(&id, &self.keychain)?;
519                            let remote_file = remote.find(&id)?.clone();
520                            let remote_name = remote.name(&id, &self.keychain)?;
521
522                            // move
523                            if local_file.parent() != base_file.parent()
524                                && remote_file.parent() == base_file.parent()
525                                && !files_to_unmove.contains(&id)
526                            {
527                                merge.move_unvalidated(&id, local_file.parent(), &self.keychain)?;
528                            }
529
530                            // rename
531                            if local_name != base_name && remote_name == base_name {
532                                merge.rename_unvalidated(&id, &local_name, &self.keychain)?;
533                            }
534                        }
535
536                        // share
537                        let mut remote_keys = HashMap::new();
538                        if let Some(ref remote_file) = maybe_remote_file {
539                            for key in remote_file.user_access_keys() {
540                                remote_keys.insert(
541                                    (Owner(key.encrypted_by), Owner(key.encrypted_for)),
542                                    (key.mode, key.deleted),
543                                );
544                            }
545                        }
546                        for key in local_file.user_access_keys() {
547                            let (by, for_) = (Owner(key.encrypted_by), Owner(key.encrypted_for));
548                            if let Some(&(remote_mode, remote_deleted)) =
549                                remote_keys.get(&(by, for_))
550                            {
551                                // upgrade share
552                                if key.mode > remote_mode || !key.deleted && remote_deleted {
553                                    let mode = match key.mode {
554                                        UserAccessMode::Read => ShareMode::Read,
555                                        UserAccessMode::Write => ShareMode::Write,
556                                        UserAccessMode::Owner => continue,
557                                    };
558                                    merge.add_share_unvalidated(id, for_, mode, &self.keychain)?;
559                                }
560                                // delete share
561                                if key.deleted && !remote_deleted {
562                                    merge.delete_share_unvalidated(
563                                        &id,
564                                        Some(for_.0),
565                                        &self.keychain,
566                                    )?;
567                                }
568                            } else {
569                                // add share
570                                let mode = match key.mode {
571                                    UserAccessMode::Read => ShareMode::Read,
572                                    UserAccessMode::Write => ShareMode::Write,
573                                    UserAccessMode::Owner => continue,
574                                };
575                                merge.add_share_unvalidated(id, for_, mode, &self.keychain)?;
576                            }
577                        }
578
579                        // share deletion due to conflicts
580                        if files_to_unshare.contains(&id) {
581                            merge.delete_share_unvalidated(&id, None, &self.keychain)?;
582                        }
583
584                        // rename due to path conflict
585                        if let Some(&rename_increment) = rename_increments.get(&id) {
586                            let name = NameComponents::from(&local_name)
587                                .generate_incremented(rename_increment)
588                                .to_name();
589                            merge.rename_unvalidated(&id, &name, &self.keychain)?;
590                        }
591
592                        // edit
593                        let base_hmac = maybe_base_file.and_then(|f| f.document_hmac().cloned());
594                        let remote_hmac =
595                            maybe_remote_file.and_then(|f| f.document_hmac().cloned());
596                        let local_hmac = local_file.document_hmac().cloned();
597                        if merge.access_mode(me, &id)? >= Some(UserAccessMode::Write)
598                            && local_hmac != base_hmac
599                        {
600                            if remote_hmac != base_hmac && remote_hmac != local_hmac {
601                                // merge
602                                let merge_name = merge.name(&id, &self.keychain)?;
603                                let document_type =
604                                    DocumentType::from_file_name_using_extension(&merge_name);
605
606                                // todo these accesses are potentially problematic
607                                // maybe not if service/docs is the persion doing network io
608                                let base_document =
609                                    self.read_document_helper(id, &mut base).await?;
610                                let remote_document =
611                                    self.read_document_helper(id, &mut remote).await?;
612                                let local_document =
613                                    self.read_document_helper(id, &mut local).await?;
614
615                                match document_type {
616                                    DocumentType::Text => {
617                                        // 3-way merge
618                                        // todo: a couple more clones than necessary
619                                        let base_document =
620                                            String::from_utf8_lossy(&base_document).to_string();
621                                        let remote_document =
622                                            String::from_utf8_lossy(&remote_document).to_string();
623                                        let local_document =
624                                            String::from_utf8_lossy(&local_document).to_string();
625                                        let merged_document =
626                                            text::buffer::Buffer::from(base_document.as_str())
627                                                .merge(local_document, remote_document);
628                                        let encrypted_document = merge
629                                            .update_document_unvalidated(
630                                                &id,
631                                                &merged_document.into_bytes(),
632                                                &self.keychain,
633                                            )?;
634                                        let hmac = merge.find(&id)?.document_hmac().copied();
635                                        self.docs.insert(id, hmac, &encrypted_document).await?;
636                                    }
637                                    DocumentType::Drawing => {
638                                        let base_document =
639                                            String::from_utf8_lossy(&base_document).to_string();
640                                        let remote_document =
641                                            String::from_utf8_lossy(&remote_document).to_string();
642                                        let local_document =
643                                            String::from_utf8_lossy(&local_document).to_string();
644
645                                        let base_buffer = svg::buffer::Buffer::new(&base_document);
646                                        let remote_buffer =
647                                            svg::buffer::Buffer::new(&remote_document);
648                                        let mut local_buffer =
649                                            svg::buffer::Buffer::new(&local_document);
650
651                                        for (_, el) in local_buffer.elements.iter_mut() {
652                                            if let Element::Path(path) = el {
653                                                path.data.apply_transform(u_transform_to_bezier(
654                                                    &Transform::from(
655                                                        local_buffer
656                                                            .weak_viewport_settings
657                                                            .master_transform,
658                                                    ),
659                                                ));
660                                            }
661                                        }
662                                        svg::buffer::Buffer::reload(
663                                            &mut local_buffer.elements,
664                                            &mut local_buffer.weak_images,
665                                            &mut local_buffer.weak_path_pressures,
666                                            &mut local_buffer.weak_viewport_settings,
667                                            &base_buffer,
668                                            &remote_buffer,
669                                        );
670
671                                        let merged_document = local_buffer.serialize();
672                                        let encrypted_document = merge
673                                            .update_document_unvalidated(
674                                                &id,
675                                                &merged_document.into_bytes(),
676                                                &self.keychain,
677                                            )?;
678                                        let hmac = merge.find(&id)?.document_hmac().copied();
679                                        self.docs.insert(id, hmac, &encrypted_document).await?;
680                                    }
681                                    DocumentType::Other => {
682                                        // duplicate file
683                                        let merge_parent = *merge.find(&id)?.parent();
684                                        let duplicate_id = if let Some(&duplicate_id) =
685                                            duplicate_file_ids.get(&id)
686                                        {
687                                            duplicate_id
688                                        } else {
689                                            let duplicate_id = Uuid::new_v4();
690                                            duplicate_file_ids.insert(id, duplicate_id);
691                                            rename_increments.insert(duplicate_id, 1);
692                                            duplicate_id
693                                        };
694
695                                        let mut merge_name = merge_name;
696                                        merge_name = NameComponents::from(&merge_name)
697                                            .generate_incremented(
698                                                rename_increments
699                                                    .get(&duplicate_id)
700                                                    .copied()
701                                                    .unwrap_or_default(),
702                                            )
703                                            .to_name();
704
705                                        merge.create_unvalidated(
706                                            duplicate_id,
707                                            symkey::generate_key(),
708                                            &merge_parent,
709                                            &merge_name,
710                                            FileType::Document,
711                                            &self.keychain,
712                                        )?;
713                                        let encrypted_document = merge
714                                            .update_document_unvalidated(
715                                                &duplicate_id,
716                                                &local_document,
717                                                &self.keychain,
718                                            )?;
719                                        let duplicate_hmac =
720                                            merge.find(&duplicate_id)?.document_hmac().copied();
721                                        self.docs
722                                            .insert(
723                                                duplicate_id,
724                                                duplicate_hmac,
725                                                &encrypted_document,
726                                            )
727                                            .await?;
728                                    }
729                                }
730                            } else {
731                                // overwrite (todo: avoid reading/decrypting/encrypting document)
732                                let document = self.read_document_helper(id, &mut local).await?;
733                                merge.update_document_unvalidated(
734                                    &id,
735                                    &document,
736                                    &self.keychain,
737                                )?;
738                            }
739                        }
740                    }
741
742                    // deletes
743                    // moves happen first in case a file is moved into a deleted folder
744                    for id in db.local_metadata.ids() {
745                        if db.base_metadata.maybe_find(&id).is_some()
746                            && deletions.calculate_deleted(&id)?
747                            && !merge.calculate_deleted(&id)?
748                        {
749                            // delete
750                            merge.delete_unvalidated(&id, &self.keychain)?;
751                        }
752                    }
753                    for &id in &links_to_delete {
754                        // delete
755                        if merge.maybe_find(&id).is_some() && !merge.calculate_deleted(&id)? {
756                            merge.delete_unvalidated(&id, &self.keychain)?;
757                        }
758                    }
759
760                    merge
761                };
762
763                // validate; handle failures by introducing changeset constraints
764                for link in merge.ids() {
765                    if !merge.calculate_deleted(&link)? {
766                        if let FileType::Link { target } = merge.find(&link)?.file_type() {
767                            if merge.maybe_find(&target).is_some()
768                                && merge.calculate_deleted(&target)?
769                            {
770                                // delete links to deleted files
771                                if links_to_delete.insert(link) {
772                                    continue 'merge_construction;
773                                } else {
774                                    return Err(LbErrKind::Unexpected(format!(
775                                        "sync failed to resolve broken link (deletion): {link:?}"
776                                    ))
777                                    .into());
778                                }
779                            }
780                        }
781                    }
782                }
783
784                let validate_result = merge.validate(me);
785                match validate_result {
786                    // merge changeset is valid
787                    Ok(_) => {
788                        let (_, merge_changes) = merge.unstage();
789                        break merge_changes;
790                    }
791                    Err(ref err) => match err.kind {
792                        LbErrKind::Validation(ref vf) => match vf {
793                            // merge changeset has resolvable validation errors and needs modification
794                            ValidationFailure::Cycle(ids) => {
795                                // revert all local moves in the cycle
796                                let mut progress = false;
797                                for &id in ids {
798                                    if db.local_metadata.maybe_find(&id).is_some()
799                                        && files_to_unmove.insert(id)
800                                    {
801                                        progress = true;
802                                    }
803                                }
804                                if !progress {
805                                    return Err(LbErrKind::Unexpected(format!(
806                                        "sync failed to resolve cycle: {ids:?}"
807                                    ))
808                                    .into());
809                                }
810                            }
811                            ValidationFailure::PathConflict(ids) => {
812                                // pick one local id and generate a non-conflicting filename
813                                let mut progress = false;
814                                for &id in ids {
815                                    if duplicate_file_ids.values().any(|&dup| dup == id) {
816                                        *rename_increments.entry(id).or_insert(0) += 1;
817                                        progress = true;
818                                        break;
819                                    }
820                                }
821                                if !progress {
822                                    for &id in ids {
823                                        if db.local_metadata.maybe_find(&id).is_some() {
824                                            *rename_increments.entry(id).or_insert(0) += 1;
825                                            progress = true;
826                                            break;
827                                        }
828                                    }
829                                }
830                                if !progress {
831                                    return Err(LbErrKind::Unexpected(format!(
832                                        "sync failed to resolve path conflict: {ids:?}"
833                                    ))
834                                    .into());
835                                }
836                            }
837                            ValidationFailure::SharedLink { link, shared_ancestor } => {
838                                // if ancestor is newly shared, delete share, otherwise delete link
839                                let mut progress = false;
840                                if let Some(base_shared_ancestor) = base.maybe_find(shared_ancestor)
841                                {
842                                    if !base_shared_ancestor.is_shared()
843                                        && files_to_unshare.insert(*shared_ancestor)
844                                    {
845                                        progress = true;
846                                    }
847                                }
848                                if !progress && links_to_delete.insert(*link) {
849                                    progress = true;
850                                }
851                                if !progress {
852                                    return Err(LbErrKind::Unexpected(format!(
853                                    "sync failed to resolve shared link: link: {link:?}, shared_ancestor: {shared_ancestor:?}"
854                                )).into());
855                                }
856                            }
857                            ValidationFailure::DuplicateLink { target } => {
858                                // delete local link with this target
859                                let mut progress = false;
860                                if let Some(link) = local.linked_by(target)? {
861                                    if links_to_delete.insert(link) {
862                                        progress = true;
863                                    }
864                                }
865                                if !progress {
866                                    return Err(LbErrKind::Unexpected(format!(
867                                        "sync failed to resolve duplicate link: target: {target:?}"
868                                    ))
869                                    .into());
870                                }
871                            }
872                            ValidationFailure::BrokenLink(link) => {
873                                // delete local link with this target
874                                if !links_to_delete.insert(*link) {
875                                    return Err(LbErrKind::Unexpected(format!(
876                                        "sync failed to resolve broken link: {link:?}"
877                                    ))
878                                    .into());
879                                }
880                            }
881                            ValidationFailure::OwnedLink(link) => {
882                                // if target is newly owned, unmove target, otherwise delete link
883                                let mut progress = false;
884                                if let Some(remote_link) = remote.maybe_find(link) {
885                                    if let FileType::Link { target } = remote_link.file_type() {
886                                        let remote_target = remote.find(&target)?;
887                                        if remote_target.owner() != me
888                                            && files_to_unmove.insert(target)
889                                        {
890                                            progress = true;
891                                        }
892                                    }
893                                }
894                                if !progress && links_to_delete.insert(*link) {
895                                    progress = true;
896                                }
897                                if !progress {
898                                    return Err(LbErrKind::Unexpected(format!(
899                                        "sync failed to resolve owned link: {link:?}"
900                                    ))
901                                    .into());
902                                }
903                            }
904                            // merge changeset has unexpected validation errors
905                            ValidationFailure::Orphan(_)
906                            | ValidationFailure::NonFolderWithChildren(_)
907                            | ValidationFailure::FileWithDifferentOwnerParent(_)
908                            | ValidationFailure::FileNameTooLong(_)
909                            | ValidationFailure::DeletedFileUpdated(_)
910                            | ValidationFailure::NonDecryptableFileName(_) => {
911                                validate_result?;
912                            }
913                        },
914                        // merge changeset has unexpected errors
915                        _ => {
916                            validate_result?;
917                        }
918                    },
919                }
920            }
921        };
922
923        // base = remote; local = merge
924        (&mut db.base_metadata)
925            .to_staged(remote_changes.clone())
926            .to_lazy()
927            .promote()?;
928        db.local_metadata.clear()?;
929        (&mut db.local_metadata)
930            .to_staged(merge_changes)
931            .to_lazy()
932            .promote()?;
933
934        // todo who else calls this did they manage locks right?
935        // self.cleanup_local_metadata()?;
936        db.base_metadata.stage(&mut db.local_metadata).prune()?;
937
938        if start.elapsed() > web_time::Duration::from_millis(100) {
939            warn!("sync merge held lock for {:?}", start.elapsed());
940        }
941
942        Ok(())
943    }
944
945    async fn send_pull_events(&self, state: &mut SyncState) -> LbResult<()> {
946        if state.new_root.is_some() {
947            self.events.signed_in();
948        }
949
950        if !state.remote_changes.is_empty() {
951            self.events.meta_changed(Actor::Sync);
952
953            let owner = Owner(self.keychain.get_pk()?);
954            if state.remote_changes.iter().any(|f| f.owner() != owner) {
955                self.events.pending_shares_changed();
956            }
957        }
958
959        for &doc in &state.pulled_docs {
960            self.events.doc_written(doc, Actor::Sync);
961        }
962
963        Ok(())
964    }
965
966    async fn commit_last_synced(&self, state: &mut SyncState) -> LbResult<()> {
967        let mut tx = self.begin_tx().await;
968        let db = tx.db();
969        db.last_synced.insert(state.updates_as_of as i64)?;
970
971        if let Some(root) = state.new_root {
972            db.root.insert(root)?;
973        }
974
975        Ok(())
976    }
977
978    async fn populate_pk_cache(&self) -> LbResult<()> {
979        // todo: is this the move?
980        let mut missing_owners = HashSet::new();
981        {
982            let tx = self.ro_tx().await;
983            let db = tx.db();
984            for file in db.base_metadata.get().values() {
985                for user_access_key in file.user_access_keys() {
986                    let enc_by = Owner(user_access_key.encrypted_by);
987                    let enc_for = Owner(user_access_key.encrypted_for);
988
989                    if !db.pub_key_lookup.get().contains_key(&enc_by) {
990                        missing_owners.insert(enc_by);
991                    }
992
993                    if !db.pub_key_lookup.get().contains_key(&enc_for) {
994                        missing_owners.insert(enc_for);
995                    }
996                }
997            }
998        }
999
1000        let mut new_owners = HashMap::new();
1001        {
1002            for owner in missing_owners {
1003                let username_result = self
1004                    .client
1005                    .request(self.get_account().unwrap(), GetUsernameRequest { key: owner.0 })
1006                    .await;
1007                new_owners.insert(owner, username_result);
1008            }
1009        }
1010
1011        let mut tx = self.begin_tx().await;
1012        let db = tx.db();
1013
1014        let have_updates = !new_owners.is_empty();
1015        for (owner, username) in new_owners {
1016            let username = match username {
1017                Err(ApiError::Endpoint(GetUsernameError::UserNotFound)) => "<unknown>".to_string(),
1018                Ok(username) => username.username,
1019                _ => continue, // todo: possibly add some logging here
1020            };
1021
1022            db.pub_key_lookup.insert(owner, username).unwrap();
1023        }
1024
1025        if have_updates {
1026            self.events.meta_changed(Actor::Sync);
1027        }
1028
1029        Ok(())
1030    }
1031
1032    /// Updates remote and base metadata to local.
1033    async fn push_meta(&self) -> LbResult<()> {
1034        let mut updates = vec![];
1035        let mut local_changes_no_digests = Vec::new();
1036
1037        let tx = self.ro_tx().await;
1038        let db = tx.db();
1039
1040        // remote = local
1041        let local = db.base_metadata.stage(&db.local_metadata).to_lazy();
1042
1043        for id in local.tree.staged.ids() {
1044            let mut local_change = local.tree.staged.find(&id)?.timestamped_value.value.clone();
1045            let maybe_base_file = local.tree.base.maybe_find(&id);
1046
1047            // change everything but document hmac and re-sign
1048            local_change.set_hmac_and_size(
1049                maybe_base_file.and_then(|f| f.document_hmac().copied()),
1050                maybe_base_file.and_then(|f| *f.timestamped_value.value.doc_size()),
1051            );
1052            let local_change = local_change.sign(&self.keychain)?;
1053
1054            local_changes_no_digests.push(local_change.clone());
1055            let file_diff = FileDiff { old: maybe_base_file.cloned(), new: local_change };
1056            updates.push(file_diff);
1057        }
1058
1059        drop(tx);
1060
1061        if !updates.is_empty() {
1062            self.client
1063                .request(self.get_account()?, UpsertRequestV2 { updates: updates.clone() })
1064                .await?;
1065        }
1066
1067        let mut tx = self.begin_tx().await;
1068        let db = tx.db();
1069
1070        // base = local
1071        (&mut db.base_metadata)
1072            .to_lazy()
1073            .stage(local_changes_no_digests)
1074            .promote()?;
1075        db.base_metadata.stage(&mut db.local_metadata).prune()?;
1076
1077        tx.end();
1078
1079        Ok(())
1080    }
1081
1082    /// Updates remote and base files to local. Assumes metadata is already pushed for all new files.
1083    // todo: make this so that all document updates are attempted and we don't just return the
1084    // first error. Once an attempt is made we can return any or all errors, either would be an
1085    // improvement
1086    async fn push_docs(&self) -> LbResult<()> {
1087        let mut updates = vec![];
1088        let mut local_changes_digests_only = vec![];
1089
1090        let tx = self.ro_tx().await;
1091        let db = tx.db();
1092        let start = Instant::now();
1093
1094        let local = db.base_metadata.stage(&db.local_metadata).to_lazy();
1095
1096        for id in local.tree.staged.ids() {
1097            let base_file = local.tree.base.find(&id)?.clone();
1098
1099            // change only document hmac and re-sign
1100            let mut local_change = base_file.timestamped_value.value.clone();
1101            local_change.set_hmac_and_size(
1102                local.find(&id)?.document_hmac().copied(),
1103                *local.find(&id)?.timestamped_value.value.doc_size(),
1104            );
1105
1106            if base_file.document_hmac() == local_change.document_hmac()
1107                || local_change.document_hmac().is_none()
1108            {
1109                continue;
1110            }
1111
1112            let local_change = local_change.sign(&self.keychain)?;
1113
1114            updates.push(FileDiff { old: Some(base_file), new: local_change.clone() });
1115            local_changes_digests_only.push(local_change);
1116            self.events
1117                .sync_update(SyncIncrement::PushingDocument(id, true));
1118        }
1119
1120        drop(tx);
1121        if start.elapsed() > web_time::Duration::from_millis(100) {
1122            warn!("sync push_docs held lock for {:?}", start.elapsed());
1123        }
1124
1125        let futures = updates.clone().into_iter().map(|diff| self.push_doc(diff));
1126
1127        let mut stream = stream::iter(futures).buffer_unordered(
1128            thread::available_parallelism()
1129                .unwrap_or(NonZeroUsize::new(4).unwrap())
1130                .into(),
1131        );
1132
1133        let mut docs_without_errors = vec![];
1134        let mut last_error: Option<LbErr> = None;
1135
1136        while let Some(fut) = stream.next().await {
1137            match fut {
1138                Ok(id) => {
1139                    docs_without_errors.push(id);
1140                    self.events
1141                        .sync_update(SyncIncrement::PushingDocument(id, false));
1142                }
1143                Err(err) => {
1144                    last_error = Some(err);
1145                }
1146            }
1147        }
1148
1149        local_changes_digests_only.retain(|f| docs_without_errors.contains(f.id()));
1150
1151        let mut tx = self.begin_tx().await;
1152        let db = tx.db();
1153        // base = local (metadata)
1154        (&mut db.base_metadata)
1155            .to_lazy()
1156            .stage(local_changes_digests_only)
1157            .promote()?;
1158
1159        db.base_metadata.stage(&mut db.local_metadata).prune()?;
1160
1161        tx.end();
1162
1163        if let Some(err) = last_error { Err(err) } else { Ok(()) }
1164    }
1165
1166    async fn push_doc(&self, diff: FileDiff<SignedMeta>) -> LbResult<Uuid> {
1167        let id = *diff.new.id();
1168        let hmac = diff.new.document_hmac();
1169        let local_document_change = self.docs.get(id, hmac.copied()).await?;
1170        self.client
1171            .request(
1172                self.get_account()?,
1173                ChangeDocRequestV2 { diff, new_content: local_document_change },
1174            )
1175            .await?;
1176
1177        Ok(id)
1178    }
1179
1180    #[cfg(not(target_family = "wasm"))]
1181    async fn send_debug_info(&self, account: Account) {
1182        use crate::service::debug;
1183
1184        let max_panic_time = debug::latest_panic_time(&self.config.writeable_path)
1185            .await
1186            .unwrap_or_else(|e| {
1187                warn!("could not enumerate panic files: {e:?}");
1188                None
1189            });
1190
1191        let last_sent = {
1192            let tx = self.ro_tx().await;
1193            tx.db().last_extracted_panic.get().copied()
1194        };
1195
1196        let should_send = match (last_sent, max_panic_time) {
1197            (None, _) => true,
1198            (Some(prev), Some(cur)) => cur > prev,
1199            (Some(_), None) => false,
1200        };
1201
1202        if !should_send {
1203            return;
1204        }
1205
1206        let debug_info = self
1207            .debug_info("none provided - sync".to_string(), false)
1208            .await
1209            .unwrap();
1210
1211        let new_marker = max_panic_time.unwrap_or(0);
1212        let bg_self = self.clone();
1213        let task = async move {
1214            match bg_self
1215                .client
1216                .request(&account, UpsertDebugInfoRequest { debug_info })
1217                .await
1218            {
1219                Ok(_) => {
1220                    let mut tx = bg_self.begin_tx().await;
1221                    if let Err(e) = tx.db().last_extracted_panic.insert(new_marker) {
1222                        warn!("could not record last_extracted_panic: {e:?}");
1223                    }
1224                    tx.end();
1225                }
1226                Err(e) => warn!("send_debug_info failed: {e:?}"),
1227            }
1228        };
1229
1230        if self.config.background_work {
1231            tokio::spawn(task);
1232        } else {
1233            task.await;
1234        }
1235    }
1236
1237    async fn read_document_helper<T>(
1238        &self, id: Uuid, tree: &mut LazyTree<T>,
1239    ) -> LbResult<DecryptedDocument>
1240    where
1241        T: TreeLike<F = SignedMeta>,
1242    {
1243        let file = tree.find(&id)?;
1244        validate::is_document(file)?;
1245        let hmac = file.document_hmac().copied();
1246
1247        if tree.calculate_deleted(&id)? {
1248            return Err(LbErrKind::FileNonexistent.into());
1249        }
1250
1251        let doc = match hmac {
1252            Some(hmac) => {
1253                let doc = self.docs.get(id, Some(hmac)).await?;
1254                tree.decrypt_document(&id, &doc, &self.keychain)?
1255            }
1256            None => vec![],
1257        };
1258
1259        Ok(doc)
1260    }
1261
1262    /// for tests only
1263    #[doc(hidden)]
1264    pub async fn server_dirty_ids(&self) -> LbResult<Vec<Uuid>> {
1265        let mut state = self.syncer.lock().await;
1266        self.inital_sync_state(&mut state).await?;
1267        self.process_deletions().await?;
1268        self.fetch_meta(&mut state).await?;
1269
1270        let server_ids = state.remote_changes.iter().map(|f| *f.id()).collect();
1271
1272        Ok(server_ids)
1273    }
1274
1275    pub(crate) fn setup_syncer(&self) {
1276        if self.config.background_work {
1277            self.clone().local_change_worker();
1278            self.clone().periodic_sync_worker();
1279            self.clone().post_sync_worker();
1280        }
1281    }
1282
1283    fn local_change_worker(self) {
1284        #[cfg(not(target_family = "wasm"))]
1285        tokio::spawn(async move {
1286            let mut events = self.subscribe();
1287
1288            let sync_criteria = |e: Event| {
1289                matches!(
1290                    e,
1291                    Event::MetadataChanged(Actor::User) | Event::DocumentWritten(_, Actor::User)
1292                )
1293            };
1294
1295            loop {
1296                time::sleep(Duration::from_millis(500)).await;
1297                let mut should_sync = false;
1298
1299                // drain the current channel, so we don't sync for each keystroke if they pile up
1300                loop {
1301                    let event = events.try_recv();
1302                    match event {
1303                        Ok(event) => {
1304                            if sync_criteria(event) {
1305                                should_sync = true;
1306                            }
1307                        }
1308                        Err(TryRecvError::Empty) => break,
1309                        _ => {
1310                            panic!(
1311                                "unexpected broadcast receive error, returning local_change_worker"
1312                            );
1313                        }
1314                    }
1315                }
1316
1317                // empty channel + nothing interesting has happened, sit and wait for something
1318                // interesting
1319                if !should_sync {
1320                    let event = events.recv().await.unwrap();
1321                    if sync_criteria(event) {
1322                        self.sync().await.map_unexpected().log_and_ignore();
1323                    } else {
1324                        continue;
1325                    }
1326                }
1327            }
1328        });
1329    }
1330
1331    fn periodic_sync_worker(self) {
1332        #[cfg(not(target_family = "wasm"))]
1333        tokio::spawn(async move {
1334            loop {
1335                self.sync().await.map_unexpected().log_and_ignore();
1336                if self.user_active().await {
1337                    tokio::time::sleep(Duration::from_secs(3)).await;
1338                } else {
1339                    tokio::time::sleep(Duration::from_secs(5 * 60)).await;
1340                }
1341            }
1342        });
1343    }
1344
1345    async fn user_active(&self) -> bool {
1346        let last_seen = self.user_last_seen.read().await;
1347        last_seen.elapsed() < Duration::from_secs(3 * 60)
1348    }
1349
1350    fn post_sync_worker(self) {
1351        #[cfg(not(target_family = "wasm"))]
1352        tokio::spawn(async move {
1353            let mut events = self.subscribe();
1354
1355            loop {
1356                let event = events.recv().await.unwrap();
1357                if let Event::Sync(SyncIncrement::SyncFinished(_)) = event {
1358                    self.fetcher().await.map_unexpected().log_and_ignore();
1359                    self.populate_pk_cache()
1360                        .await
1361                        .map_unexpected()
1362                        .log_and_ignore();
1363                };
1364            }
1365        });
1366    }
1367
1368    async fn fetcher(&self) -> LbResult<()> {
1369        let mut files_to_pull = vec![];
1370
1371        let tx = self.ro_tx().await;
1372        let db = tx.db();
1373
1374        let Some(root) = db.root.get() else {
1375            return Ok(());
1376        };
1377
1378        // we can only fetch things we know the server knows about
1379        let mut tree = db.base_metadata.stage(None).to_lazy();
1380
1381        for id in tree.descendants_using_links(root)? {
1382            let file = tree.find(&id)?;
1383            let hmac = file.document_hmac().copied();
1384
1385            // skip non-documents
1386            if !file.is_document() {
1387                continue;
1388            }
1389
1390            // skip deleted files
1391            if tree.calculate_deleted(&id)? {
1392                continue;
1393            }
1394
1395            if self.docs.exists(id, hmac) {
1396                continue;
1397            }
1398
1399            // skip non-first-party files
1400            let name = tree.name(&id, &self.keychain)?;
1401            if !name.ends_with(".md") && !name.ends_with(".svg") {
1402                continue;
1403            }
1404
1405            files_to_pull.push((id, hmac));
1406        }
1407
1408        drop(tx);
1409
1410        // this could all be done in parallel, but for now going to not do it that way
1411        // benefits: less work, but also ensures that a file that needs to be fetched immediately
1412        // can be
1413        for (id, hmac) in files_to_pull {
1414            if let Some(hmac) = hmac {
1415                self.ensure_doc_available(id, hmac).await?;
1416            }
1417        }
1418
1419        Ok(())
1420    }
1421}