1use std::{
2 collections::{HashMap, HashSet},
3 num::NonZeroUsize,
4 sync::Arc,
5 thread,
6 time::{Duration, Instant},
7};
8
9use futures::{StreamExt, stream};
10use tokio::sync::{Mutex, broadcast::error::TryRecvError};
11use tokio::time;
12use usvg::Transform;
13use uuid::Uuid;
14
15use crate::{
16 LbErrKind, LbResult, LocalLb,
17 io::network::ApiError,
18 model::{
19 ValidationFailure,
20 access_info::UserAccessMode,
21 account::Account,
22 api::{
23 ChangeDocRequestV2, GetDocRequest, GetFileIdsRequest, GetUpdatesRequestV2,
24 GetUsernameError, GetUsernameRequest, UpsertDebugInfoRequest, UpsertRequestV2,
25 },
26 crypto::{DecryptedDocument, EncryptedDocument},
27 errors::{LbErr, Unexpected},
28 file::ShareMode,
29 file_like::FileLike,
30 file_metadata::{DocumentHmac, FileDiff, FileType, Owner},
31 filename::{DocumentType, NameComponents},
32 lazy::LazyTree,
33 signed_meta::SignedMeta,
34 staged::StagedTreeLikeMut,
35 svg::{self, buffer::u_transform_to_bezier, element::Element},
36 symkey, text,
37 tree_like::TreeLike,
38 validate,
39 },
40 service::events::{Actor, Event, SyncIncrement},
41};
42
43pub type Syncer = Arc<Mutex<SyncState>>;
44
45#[derive(Default)]
46pub struct SyncState {
47 last_synced: u64,
49
50 updates_as_of: u64,
52
53 remote_changes: Vec<SignedMeta>,
55
56 new_root: Option<Uuid>,
58
59 pulled_docs: Vec<Uuid>,
61}
62
63impl LocalLb {
70 #[instrument(level = "debug", skip(self), err(Debug))]
71 pub async fn sync(&self) -> LbResult<()> {
72 let mut sync_state = self.syncer.lock().await;
73 self.events.sync_update(SyncIncrement::SyncStarted);
74
75 let pipeline: LbResult<()> = async {
76 self.pull_updates(&mut sync_state).await?;
77 self.push_local_changes().await?;
78 Ok(())
79 }
80 .await;
81
82 self.events.sync_update(SyncIncrement::SyncFinished(
83 pipeline.as_ref().err().map(|err| err.kind.clone()),
84 ));
85
86 self.cleanup().await?;
87
88 pipeline?;
89
90 let account = self.get_account()?.clone();
91
92 #[cfg(not(target_family = "wasm"))]
93 if account.is_beta() {
94 self.send_debug_info(account).await;
95 }
96
97 Ok(())
98 }
99
100 pub(crate) async fn pull_updates(&self, sync_state: &mut SyncState) -> LbResult<()> {
101 self.inital_sync_state(sync_state).await?;
102 self.process_deletions().await?;
103 self.fetch_meta(sync_state).await?;
104 self.fetch_required_docs(sync_state).await?;
105 self.merge(sync_state).await?;
107 self.commit_last_synced(sync_state).await?;
108 self.send_pull_events(sync_state).await?;
109
110 if !self.config.background_work {
111 self.populate_pk_cache().await?;
112 }
113
114 Ok(())
115 }
116
117 pub(crate) async fn push_local_changes(&self) -> LbResult<()> {
118 self.push_meta().await?;
119 self.push_docs().await?;
120
121 Ok(())
122 }
123
124 async fn inital_sync_state(&self, state: &mut SyncState) -> LbResult<()> {
125 let tx = self.ro_tx().await;
126 let db = tx.db();
127
128 *state = Default::default();
129 state.last_synced = db.last_synced.get().copied().unwrap_or_default() as u64;
130
131 Ok(())
132 }
133
134 pub(crate) async fn process_deletions(&self) -> LbResult<()> {
135 let server_ids = self
136 .client
137 .request(self.get_account()?, GetFileIdsRequest {})
138 .await?
139 .ids;
140
141 let mut tx = self.begin_tx().await;
142 let db = tx.db();
143
144 let mut local = db.base_metadata.stage(&db.local_metadata).to_lazy();
145 let base_ids = local.tree.base.ids();
146
147 let mut prunable_ids = base_ids;
148 prunable_ids.retain(|id| !server_ids.contains(id));
149 for id in prunable_ids.clone() {
150 prunable_ids.extend(local.descendants(&id)?.into_iter());
151 }
152 for id in &prunable_ids {
153 if let Some(base_file) = local.tree.base.maybe_find(id) {
154 self.docs
155 .delete(*id, base_file.document_hmac().copied())
156 .await?;
157 }
158 if let Some(local_file) = local.maybe_find(id) {
159 self.docs
160 .delete(*id, local_file.document_hmac().copied())
161 .await?;
162 }
163 }
164
165 let mut base_staged = (&mut db.base_metadata).to_lazy().stage(None);
166 base_staged.tree.removed = prunable_ids.iter().copied().collect();
167 base_staged.promote()?;
168
169 let mut local_staged = (&mut db.local_metadata).to_lazy().stage(None);
170 local_staged.tree.removed = prunable_ids.iter().copied().collect();
171 local_staged.promote()?;
172
173 if !prunable_ids.is_empty() {
174 self.events.meta_changed(Actor::Sync);
175 }
176
177 Ok(())
178 }
179
180 async fn fetch_meta(&self, state: &mut SyncState) -> LbResult<()> {
181 let updates = self
182 .client
183 .request(
184 self.get_account()?,
185 GetUpdatesRequestV2 { since_metadata_version: state.last_synced },
186 )
187 .await?;
188
189 let tx = self.ro_tx().await;
190 let db = tx.db();
191
192 let mut without_orphans = Vec::new();
194 let me = Owner(self.keychain.get_pk()?);
195 let remote = db.base_metadata.stage(updates.file_metadata).to_lazy();
196 for id in remote.tree.staged.ids() {
197 let meta = remote.find(&id)?;
198 if remote.maybe_find_parent(meta).is_some()
199 || meta
200 .user_access_keys()
201 .iter()
202 .any(|k| k.encrypted_for == me.0)
203 {
204 without_orphans.push(remote.find(&id)?.clone());
205 }
206 }
207
208 let pruned_tree = db.base_metadata.stage(without_orphans).pruned()?.to_lazy();
210 let (_, deduped_changes) = pruned_tree.unstage();
211
212 let mut root_id = None;
214 if db.root.get().is_none() {
215 let root = deduped_changes
216 .all_files()?
217 .into_iter()
218 .find(|f| f.is_root())
219 .ok_or(LbErrKind::RootNonexistent)?;
220 root_id = Some(*root.id());
221 }
222
223 state.remote_changes = deduped_changes;
224 state.updates_as_of = updates.as_of_metadata_version;
225 state.new_root = root_id;
226
227 Ok(())
228 }
229
230 async fn fetch_required_docs(&self, state: &mut SyncState) -> LbResult<()> {
231 let mut docs_to_pull = vec![];
232
233 let tx = self.ro_tx().await;
234 let db = tx.db();
235
236 let mut files_with_local_edits = vec![];
237 let local = db.base_metadata.stage(&db.local_metadata);
238 for id in local.staged.ids() {
239 if let Some(base) = local.base.maybe_find(&id) {
240 if let Some(local_hmac) = local.find(&id)?.document_hmac() {
241 if Some(local_hmac) != base.document_hmac() {
242 files_with_local_edits.push(id);
243 println!("local edits found");
244 }
245 }
246 }
247 }
248
249 let mut remote = db
250 .base_metadata
251 .stage(state.remote_changes.clone())
252 .to_lazy();
253
254 for id in remote.tree.staged.ids() {
255 if remote.calculate_deleted(&id)? {
256 continue;
257 }
258 let remote_hmac = remote.find(&id)?.document_hmac().cloned();
259 let base_hmac = remote
260 .tree
261 .base
262 .maybe_find(&id)
263 .and_then(|f| f.document_hmac())
264 .cloned();
265 if base_hmac == remote_hmac {
266 continue;
267 }
268
269 if let Some(remote_hmac) = remote_hmac {
270 if self.docs.exists(id, base_hmac) && !self.docs.exists(id, Some(remote_hmac)) {
274 docs_to_pull.push((id, remote_hmac));
275 }
276
277 if files_with_local_edits.contains(&id)
280 && !docs_to_pull
281 .iter()
282 .any(|(already_pulling, _)| already_pulling == &id)
283 {
284 if let Some(base_hmac) = base_hmac {
285 if !self.docs.exists(id, Some(base_hmac)) {
286 docs_to_pull.push((id, base_hmac));
289 }
290 }
291 docs_to_pull.push((id, remote_hmac));
292 }
293 }
294 }
295 drop(tx);
296
297 let futures = docs_to_pull
298 .into_iter()
299 .map(|(id, hmac)| async move { self.ensure_doc_available(id, hmac).await.map(|_| id) });
300
301 let mut stream = stream::iter(futures).buffer_unordered(
302 thread::available_parallelism()
303 .unwrap_or(NonZeroUsize::new(4).unwrap())
304 .into(),
305 );
306
307 while let Some(fut) = stream.next().await {
308 let id = fut?;
309 state.pulled_docs.push(id);
310 }
311
312 Ok(())
313 }
314
315 pub(crate) async fn ensure_doc_available(
316 &self, id: Uuid, hmac: DocumentHmac,
317 ) -> LbResult<Option<EncryptedDocument>> {
318 if self.docs.exists(id, Some(hmac)) {
323 return Ok(None);
324 }
325
326 self.events
327 .sync_update(SyncIncrement::PullingDocument(id, true));
328 let remote_document = self
329 .client
330 .request(self.get_account()?, GetDocRequest { id, hmac })
331 .await?;
332 self.docs
333 .insert(id, Some(hmac), &remote_document.content)
334 .await?;
335 self.events
336 .sync_update(SyncIncrement::PullingDocument(id, false));
337
338 Ok(Some(remote_document.content))
339 }
340
341 pub(crate) async fn fetch_doc(
342 &self, id: Uuid, hmac: DocumentHmac,
343 ) -> LbResult<EncryptedDocument> {
344 match self.ensure_doc_available(id, hmac).await? {
345 Some(doc) => Ok(doc),
346 None => self.docs.get(id, Some(hmac)).await,
347 }
348 }
349
350 async fn merge(&self, state: &mut SyncState) -> LbResult<()> {
353 let mut tx = self.begin_tx().await;
354 let db = tx.db();
355 let start = Instant::now();
356
357 let remote_changes = &state.remote_changes;
358
359 let me = Owner(self.keychain.get_pk()?);
361
362 let merge_changes = {
364 let mut base = (&db.base_metadata).to_lazy();
366 let remote_unlazy = (&db.base_metadata).to_staged(remote_changes);
367 let mut remote = remote_unlazy.as_lazy();
368 let mut local = (&db.base_metadata).to_staged(&db.local_metadata).to_lazy();
369
370 let mut files_to_unmove: HashSet<Uuid> = HashSet::new();
372 let mut files_to_unshare: HashSet<Uuid> = HashSet::new();
373 let mut links_to_delete: HashSet<Uuid> = HashSet::new();
374 let mut rename_increments: HashMap<Uuid, usize> = HashMap::new();
375 let mut duplicate_file_ids: HashMap<Uuid, Uuid> = HashMap::new();
376
377 'merge_construction: loop {
378 let mut deletions = {
380 let mut deletions = remote_unlazy.stage(Vec::new()).to_lazy();
381
382 let mut deletion_creations = HashSet::new();
384 for id in db.local_metadata.ids() {
385 if remote.maybe_find(&id).is_none() && !links_to_delete.contains(&id) {
386 deletion_creations.insert(id);
387 }
388 }
389 'drain_creations: while !deletion_creations.is_empty() {
390 'choose_a_creation: for id in &deletion_creations {
391 let id = *id;
393 let local_file = local.find(&id)?.clone();
394 let result = deletions.create_unvalidated(
395 id,
396 symkey::generate_key(),
397 local_file.parent(),
398 &local.name(&id, &self.keychain)?,
399 local_file.file_type(),
400 &self.keychain,
401 );
402 match result {
403 Ok(_) => {
404 deletion_creations.remove(&id);
405 continue 'drain_creations;
406 }
407 Err(ref err) => match err.kind {
408 LbErrKind::FileParentNonexistent => {
409 continue 'choose_a_creation;
410 }
411 _ => {
412 result?;
413 }
414 },
415 }
416 }
417 return Err(LbErrKind::Unexpected(format!(
418 "sync failed to find a topomodelal order for file creations: {deletion_creations:?}"
419 ))
420 .into());
421 }
422
423 for id in db.local_metadata.ids() {
425 let local_file = local.find(&id)?.clone();
426 if let Some(base_file) = db.base_metadata.maybe_find(&id).cloned() {
427 if !local_file.explicitly_deleted()
428 && local_file.parent() != base_file.parent()
429 && !files_to_unmove.contains(&id)
430 {
431 deletions.move_unvalidated(
433 &id,
434 local_file.parent(),
435 &self.keychain,
436 )?;
437 }
438 }
439 }
440
441 for id in db.local_metadata.ids() {
443 let local_file = local.find(&id)?.clone();
444 if local_file.explicitly_deleted() {
445 deletions.delete_unvalidated(&id, &self.keychain)?;
447 }
448 }
449 deletions
450 };
451
452 let mut merge = {
454 let mut merge = remote_unlazy.stage(Vec::new()).to_lazy();
455
456 let mut creations = HashSet::new();
458 for id in db.local_metadata.ids() {
459 if deletions.maybe_find(&id).is_some()
460 && !deletions.calculate_deleted(&id)?
461 && remote.maybe_find(&id).is_none()
462 && !links_to_delete.contains(&id)
463 {
464 creations.insert(id);
465 }
466 }
467 'drain_creations: while !creations.is_empty() {
468 'choose_a_creation: for id in &creations {
469 let id = *id;
471 let local_file = local.find(&id)?.clone();
472 let result = merge.create_unvalidated(
473 id,
474 local.decrypt_key(&id, &self.keychain)?,
475 local_file.parent(),
476 &local.name(&id, &self.keychain)?,
477 local_file.file_type(),
478 &self.keychain,
479 );
480 match result {
481 Ok(_) => {
482 creations.remove(&id);
483 continue 'drain_creations;
484 }
485 Err(ref err) => match err.kind {
486 LbErrKind::FileParentNonexistent => {
487 continue 'choose_a_creation;
488 }
489 _ => {
490 result?;
491 }
492 },
493 }
494 }
495 return Err(LbErrKind::Unexpected(format!(
496 "sync failed to find a topomodelal order for file creations: {creations:?}"
497 ))
498 .into());
499 }
500
501 for id in db.local_metadata.ids() {
504 if deletions.maybe_find(&id).is_none()
506 || deletions.calculate_deleted(&id)?
507 || (remote.maybe_find(&id).is_some()
508 && remote.calculate_deleted(&id)?)
509 {
510 continue;
511 }
512
513 let local_file = local.find(&id)?.clone();
514 let local_name = local.name(&id, &self.keychain)?;
515 let maybe_base_file = base.maybe_find(&id).cloned();
516 let maybe_remote_file = remote.maybe_find(&id).cloned();
517 if let Some(ref base_file) = maybe_base_file {
518 let base_name = base.name(&id, &self.keychain)?;
519 let remote_file = remote.find(&id)?.clone();
520 let remote_name = remote.name(&id, &self.keychain)?;
521
522 if local_file.parent() != base_file.parent()
524 && remote_file.parent() == base_file.parent()
525 && !files_to_unmove.contains(&id)
526 {
527 merge.move_unvalidated(&id, local_file.parent(), &self.keychain)?;
528 }
529
530 if local_name != base_name && remote_name == base_name {
532 merge.rename_unvalidated(&id, &local_name, &self.keychain)?;
533 }
534 }
535
536 let mut remote_keys = HashMap::new();
538 if let Some(ref remote_file) = maybe_remote_file {
539 for key in remote_file.user_access_keys() {
540 remote_keys.insert(
541 (Owner(key.encrypted_by), Owner(key.encrypted_for)),
542 (key.mode, key.deleted),
543 );
544 }
545 }
546 for key in local_file.user_access_keys() {
547 let (by, for_) = (Owner(key.encrypted_by), Owner(key.encrypted_for));
548 if let Some(&(remote_mode, remote_deleted)) =
549 remote_keys.get(&(by, for_))
550 {
551 if key.mode > remote_mode || !key.deleted && remote_deleted {
553 let mode = match key.mode {
554 UserAccessMode::Read => ShareMode::Read,
555 UserAccessMode::Write => ShareMode::Write,
556 UserAccessMode::Owner => continue,
557 };
558 merge.add_share_unvalidated(id, for_, mode, &self.keychain)?;
559 }
560 if key.deleted && !remote_deleted {
562 merge.delete_share_unvalidated(
563 &id,
564 Some(for_.0),
565 &self.keychain,
566 )?;
567 }
568 } else {
569 let mode = match key.mode {
571 UserAccessMode::Read => ShareMode::Read,
572 UserAccessMode::Write => ShareMode::Write,
573 UserAccessMode::Owner => continue,
574 };
575 merge.add_share_unvalidated(id, for_, mode, &self.keychain)?;
576 }
577 }
578
579 if files_to_unshare.contains(&id) {
581 merge.delete_share_unvalidated(&id, None, &self.keychain)?;
582 }
583
584 if let Some(&rename_increment) = rename_increments.get(&id) {
586 let name = NameComponents::from(&local_name)
587 .generate_incremented(rename_increment)
588 .to_name();
589 merge.rename_unvalidated(&id, &name, &self.keychain)?;
590 }
591
592 let base_hmac = maybe_base_file.and_then(|f| f.document_hmac().cloned());
594 let remote_hmac =
595 maybe_remote_file.and_then(|f| f.document_hmac().cloned());
596 let local_hmac = local_file.document_hmac().cloned();
597 if merge.access_mode(me, &id)? >= Some(UserAccessMode::Write)
598 && local_hmac != base_hmac
599 {
600 if remote_hmac != base_hmac && remote_hmac != local_hmac {
601 let merge_name = merge.name(&id, &self.keychain)?;
603 let document_type =
604 DocumentType::from_file_name_using_extension(&merge_name);
605
606 let base_document =
609 self.read_document_helper(id, &mut base).await?;
610 let remote_document =
611 self.read_document_helper(id, &mut remote).await?;
612 let local_document =
613 self.read_document_helper(id, &mut local).await?;
614
615 match document_type {
616 DocumentType::Text => {
617 let base_document =
620 String::from_utf8_lossy(&base_document).to_string();
621 let remote_document =
622 String::from_utf8_lossy(&remote_document).to_string();
623 let local_document =
624 String::from_utf8_lossy(&local_document).to_string();
625 let merged_document =
626 text::buffer::Buffer::from(base_document.as_str())
627 .merge(local_document, remote_document);
628 let encrypted_document = merge
629 .update_document_unvalidated(
630 &id,
631 &merged_document.into_bytes(),
632 &self.keychain,
633 )?;
634 let hmac = merge.find(&id)?.document_hmac().copied();
635 self.docs.insert(id, hmac, &encrypted_document).await?;
636 }
637 DocumentType::Drawing => {
638 let base_document =
639 String::from_utf8_lossy(&base_document).to_string();
640 let remote_document =
641 String::from_utf8_lossy(&remote_document).to_string();
642 let local_document =
643 String::from_utf8_lossy(&local_document).to_string();
644
645 let base_buffer = svg::buffer::Buffer::new(&base_document);
646 let remote_buffer =
647 svg::buffer::Buffer::new(&remote_document);
648 let mut local_buffer =
649 svg::buffer::Buffer::new(&local_document);
650
651 for (_, el) in local_buffer.elements.iter_mut() {
652 if let Element::Path(path) = el {
653 path.data.apply_transform(u_transform_to_bezier(
654 &Transform::from(
655 local_buffer
656 .weak_viewport_settings
657 .master_transform,
658 ),
659 ));
660 }
661 }
662 svg::buffer::Buffer::reload(
663 &mut local_buffer.elements,
664 &mut local_buffer.weak_images,
665 &mut local_buffer.weak_path_pressures,
666 &mut local_buffer.weak_viewport_settings,
667 &base_buffer,
668 &remote_buffer,
669 );
670
671 let merged_document = local_buffer.serialize();
672 let encrypted_document = merge
673 .update_document_unvalidated(
674 &id,
675 &merged_document.into_bytes(),
676 &self.keychain,
677 )?;
678 let hmac = merge.find(&id)?.document_hmac().copied();
679 self.docs.insert(id, hmac, &encrypted_document).await?;
680 }
681 DocumentType::Other => {
682 let merge_parent = *merge.find(&id)?.parent();
684 let duplicate_id = if let Some(&duplicate_id) =
685 duplicate_file_ids.get(&id)
686 {
687 duplicate_id
688 } else {
689 let duplicate_id = Uuid::new_v4();
690 duplicate_file_ids.insert(id, duplicate_id);
691 rename_increments.insert(duplicate_id, 1);
692 duplicate_id
693 };
694
695 let mut merge_name = merge_name;
696 merge_name = NameComponents::from(&merge_name)
697 .generate_incremented(
698 rename_increments
699 .get(&duplicate_id)
700 .copied()
701 .unwrap_or_default(),
702 )
703 .to_name();
704
705 merge.create_unvalidated(
706 duplicate_id,
707 symkey::generate_key(),
708 &merge_parent,
709 &merge_name,
710 FileType::Document,
711 &self.keychain,
712 )?;
713 let encrypted_document = merge
714 .update_document_unvalidated(
715 &duplicate_id,
716 &local_document,
717 &self.keychain,
718 )?;
719 let duplicate_hmac =
720 merge.find(&duplicate_id)?.document_hmac().copied();
721 self.docs
722 .insert(
723 duplicate_id,
724 duplicate_hmac,
725 &encrypted_document,
726 )
727 .await?;
728 }
729 }
730 } else {
731 let document = self.read_document_helper(id, &mut local).await?;
733 merge.update_document_unvalidated(
734 &id,
735 &document,
736 &self.keychain,
737 )?;
738 }
739 }
740 }
741
742 for id in db.local_metadata.ids() {
745 if db.base_metadata.maybe_find(&id).is_some()
746 && deletions.calculate_deleted(&id)?
747 && !merge.calculate_deleted(&id)?
748 {
749 merge.delete_unvalidated(&id, &self.keychain)?;
751 }
752 }
753 for &id in &links_to_delete {
754 if merge.maybe_find(&id).is_some() && !merge.calculate_deleted(&id)? {
756 merge.delete_unvalidated(&id, &self.keychain)?;
757 }
758 }
759
760 merge
761 };
762
763 for link in merge.ids() {
765 if !merge.calculate_deleted(&link)? {
766 if let FileType::Link { target } = merge.find(&link)?.file_type() {
767 if merge.maybe_find(&target).is_some()
768 && merge.calculate_deleted(&target)?
769 {
770 if links_to_delete.insert(link) {
772 continue 'merge_construction;
773 } else {
774 return Err(LbErrKind::Unexpected(format!(
775 "sync failed to resolve broken link (deletion): {link:?}"
776 ))
777 .into());
778 }
779 }
780 }
781 }
782 }
783
784 let validate_result = merge.validate(me);
785 match validate_result {
786 Ok(_) => {
788 let (_, merge_changes) = merge.unstage();
789 break merge_changes;
790 }
791 Err(ref err) => match err.kind {
792 LbErrKind::Validation(ref vf) => match vf {
793 ValidationFailure::Cycle(ids) => {
795 let mut progress = false;
797 for &id in ids {
798 if db.local_metadata.maybe_find(&id).is_some()
799 && files_to_unmove.insert(id)
800 {
801 progress = true;
802 }
803 }
804 if !progress {
805 return Err(LbErrKind::Unexpected(format!(
806 "sync failed to resolve cycle: {ids:?}"
807 ))
808 .into());
809 }
810 }
811 ValidationFailure::PathConflict(ids) => {
812 let mut progress = false;
814 for &id in ids {
815 if duplicate_file_ids.values().any(|&dup| dup == id) {
816 *rename_increments.entry(id).or_insert(0) += 1;
817 progress = true;
818 break;
819 }
820 }
821 if !progress {
822 for &id in ids {
823 if db.local_metadata.maybe_find(&id).is_some() {
824 *rename_increments.entry(id).or_insert(0) += 1;
825 progress = true;
826 break;
827 }
828 }
829 }
830 if !progress {
831 return Err(LbErrKind::Unexpected(format!(
832 "sync failed to resolve path conflict: {ids:?}"
833 ))
834 .into());
835 }
836 }
837 ValidationFailure::SharedLink { link, shared_ancestor } => {
838 let mut progress = false;
840 if let Some(base_shared_ancestor) = base.maybe_find(shared_ancestor)
841 {
842 if !base_shared_ancestor.is_shared()
843 && files_to_unshare.insert(*shared_ancestor)
844 {
845 progress = true;
846 }
847 }
848 if !progress && links_to_delete.insert(*link) {
849 progress = true;
850 }
851 if !progress {
852 return Err(LbErrKind::Unexpected(format!(
853 "sync failed to resolve shared link: link: {link:?}, shared_ancestor: {shared_ancestor:?}"
854 )).into());
855 }
856 }
857 ValidationFailure::DuplicateLink { target } => {
858 let mut progress = false;
860 if let Some(link) = local.linked_by(target)? {
861 if links_to_delete.insert(link) {
862 progress = true;
863 }
864 }
865 if !progress {
866 return Err(LbErrKind::Unexpected(format!(
867 "sync failed to resolve duplicate link: target: {target:?}"
868 ))
869 .into());
870 }
871 }
872 ValidationFailure::BrokenLink(link) => {
873 if !links_to_delete.insert(*link) {
875 return Err(LbErrKind::Unexpected(format!(
876 "sync failed to resolve broken link: {link:?}"
877 ))
878 .into());
879 }
880 }
881 ValidationFailure::OwnedLink(link) => {
882 let mut progress = false;
884 if let Some(remote_link) = remote.maybe_find(link) {
885 if let FileType::Link { target } = remote_link.file_type() {
886 let remote_target = remote.find(&target)?;
887 if remote_target.owner() != me
888 && files_to_unmove.insert(target)
889 {
890 progress = true;
891 }
892 }
893 }
894 if !progress && links_to_delete.insert(*link) {
895 progress = true;
896 }
897 if !progress {
898 return Err(LbErrKind::Unexpected(format!(
899 "sync failed to resolve owned link: {link:?}"
900 ))
901 .into());
902 }
903 }
904 ValidationFailure::Orphan(_)
906 | ValidationFailure::NonFolderWithChildren(_)
907 | ValidationFailure::FileWithDifferentOwnerParent(_)
908 | ValidationFailure::FileNameTooLong(_)
909 | ValidationFailure::DeletedFileUpdated(_)
910 | ValidationFailure::NonDecryptableFileName(_) => {
911 validate_result?;
912 }
913 },
914 _ => {
916 validate_result?;
917 }
918 },
919 }
920 }
921 };
922
923 (&mut db.base_metadata)
925 .to_staged(remote_changes.clone())
926 .to_lazy()
927 .promote()?;
928 db.local_metadata.clear()?;
929 (&mut db.local_metadata)
930 .to_staged(merge_changes)
931 .to_lazy()
932 .promote()?;
933
934 db.base_metadata.stage(&mut db.local_metadata).prune()?;
937
938 if start.elapsed() > web_time::Duration::from_millis(100) {
939 warn!("sync merge held lock for {:?}", start.elapsed());
940 }
941
942 Ok(())
943 }
944
945 async fn send_pull_events(&self, state: &mut SyncState) -> LbResult<()> {
946 if state.new_root.is_some() {
947 self.events.signed_in();
948 }
949
950 if !state.remote_changes.is_empty() {
951 self.events.meta_changed(Actor::Sync);
952
953 let owner = Owner(self.keychain.get_pk()?);
954 if state.remote_changes.iter().any(|f| f.owner() != owner) {
955 self.events.pending_shares_changed();
956 }
957 }
958
959 for &doc in &state.pulled_docs {
960 self.events.doc_written(doc, Actor::Sync);
961 }
962
963 Ok(())
964 }
965
966 async fn commit_last_synced(&self, state: &mut SyncState) -> LbResult<()> {
967 let mut tx = self.begin_tx().await;
968 let db = tx.db();
969 db.last_synced.insert(state.updates_as_of as i64)?;
970
971 if let Some(root) = state.new_root {
972 db.root.insert(root)?;
973 }
974
975 Ok(())
976 }
977
978 async fn populate_pk_cache(&self) -> LbResult<()> {
979 let mut missing_owners = HashSet::new();
981 {
982 let tx = self.ro_tx().await;
983 let db = tx.db();
984 for file in db.base_metadata.get().values() {
985 for user_access_key in file.user_access_keys() {
986 let enc_by = Owner(user_access_key.encrypted_by);
987 let enc_for = Owner(user_access_key.encrypted_for);
988
989 if !db.pub_key_lookup.get().contains_key(&enc_by) {
990 missing_owners.insert(enc_by);
991 }
992
993 if !db.pub_key_lookup.get().contains_key(&enc_for) {
994 missing_owners.insert(enc_for);
995 }
996 }
997 }
998 }
999
1000 let mut new_owners = HashMap::new();
1001 {
1002 for owner in missing_owners {
1003 let username_result = self
1004 .client
1005 .request(self.get_account().unwrap(), GetUsernameRequest { key: owner.0 })
1006 .await;
1007 new_owners.insert(owner, username_result);
1008 }
1009 }
1010
1011 let mut tx = self.begin_tx().await;
1012 let db = tx.db();
1013
1014 let have_updates = !new_owners.is_empty();
1015 for (owner, username) in new_owners {
1016 let username = match username {
1017 Err(ApiError::Endpoint(GetUsernameError::UserNotFound)) => "<unknown>".to_string(),
1018 Ok(username) => username.username,
1019 _ => continue, };
1021
1022 db.pub_key_lookup.insert(owner, username).unwrap();
1023 }
1024
1025 if have_updates {
1026 self.events.meta_changed(Actor::Sync);
1027 }
1028
1029 Ok(())
1030 }
1031
1032 async fn push_meta(&self) -> LbResult<()> {
1034 let mut updates = vec![];
1035 let mut local_changes_no_digests = Vec::new();
1036
1037 let tx = self.ro_tx().await;
1038 let db = tx.db();
1039
1040 let local = db.base_metadata.stage(&db.local_metadata).to_lazy();
1042
1043 for id in local.tree.staged.ids() {
1044 let mut local_change = local.tree.staged.find(&id)?.timestamped_value.value.clone();
1045 let maybe_base_file = local.tree.base.maybe_find(&id);
1046
1047 local_change.set_hmac_and_size(
1049 maybe_base_file.and_then(|f| f.document_hmac().copied()),
1050 maybe_base_file.and_then(|f| *f.timestamped_value.value.doc_size()),
1051 );
1052 let local_change = local_change.sign(&self.keychain)?;
1053
1054 local_changes_no_digests.push(local_change.clone());
1055 let file_diff = FileDiff { old: maybe_base_file.cloned(), new: local_change };
1056 updates.push(file_diff);
1057 }
1058
1059 drop(tx);
1060
1061 if !updates.is_empty() {
1062 self.client
1063 .request(self.get_account()?, UpsertRequestV2 { updates: updates.clone() })
1064 .await?;
1065 }
1066
1067 let mut tx = self.begin_tx().await;
1068 let db = tx.db();
1069
1070 (&mut db.base_metadata)
1072 .to_lazy()
1073 .stage(local_changes_no_digests)
1074 .promote()?;
1075 db.base_metadata.stage(&mut db.local_metadata).prune()?;
1076
1077 tx.end();
1078
1079 Ok(())
1080 }
1081
1082 async fn push_docs(&self) -> LbResult<()> {
1087 let mut updates = vec![];
1088 let mut local_changes_digests_only = vec![];
1089
1090 let tx = self.ro_tx().await;
1091 let db = tx.db();
1092 let start = Instant::now();
1093
1094 let local = db.base_metadata.stage(&db.local_metadata).to_lazy();
1095
1096 for id in local.tree.staged.ids() {
1097 let base_file = local.tree.base.find(&id)?.clone();
1098
1099 let mut local_change = base_file.timestamped_value.value.clone();
1101 local_change.set_hmac_and_size(
1102 local.find(&id)?.document_hmac().copied(),
1103 *local.find(&id)?.timestamped_value.value.doc_size(),
1104 );
1105
1106 if base_file.document_hmac() == local_change.document_hmac()
1107 || local_change.document_hmac().is_none()
1108 {
1109 continue;
1110 }
1111
1112 let local_change = local_change.sign(&self.keychain)?;
1113
1114 updates.push(FileDiff { old: Some(base_file), new: local_change.clone() });
1115 local_changes_digests_only.push(local_change);
1116 self.events
1117 .sync_update(SyncIncrement::PushingDocument(id, true));
1118 }
1119
1120 drop(tx);
1121 if start.elapsed() > web_time::Duration::from_millis(100) {
1122 warn!("sync push_docs held lock for {:?}", start.elapsed());
1123 }
1124
1125 let futures = updates.clone().into_iter().map(|diff| self.push_doc(diff));
1126
1127 let mut stream = stream::iter(futures).buffer_unordered(
1128 thread::available_parallelism()
1129 .unwrap_or(NonZeroUsize::new(4).unwrap())
1130 .into(),
1131 );
1132
1133 let mut docs_without_errors = vec![];
1134 let mut last_error: Option<LbErr> = None;
1135
1136 while let Some(fut) = stream.next().await {
1137 match fut {
1138 Ok(id) => {
1139 docs_without_errors.push(id);
1140 self.events
1141 .sync_update(SyncIncrement::PushingDocument(id, false));
1142 }
1143 Err(err) => {
1144 last_error = Some(err);
1145 }
1146 }
1147 }
1148
1149 local_changes_digests_only.retain(|f| docs_without_errors.contains(f.id()));
1150
1151 let mut tx = self.begin_tx().await;
1152 let db = tx.db();
1153 (&mut db.base_metadata)
1155 .to_lazy()
1156 .stage(local_changes_digests_only)
1157 .promote()?;
1158
1159 db.base_metadata.stage(&mut db.local_metadata).prune()?;
1160
1161 tx.end();
1162
1163 if let Some(err) = last_error { Err(err) } else { Ok(()) }
1164 }
1165
1166 async fn push_doc(&self, diff: FileDiff<SignedMeta>) -> LbResult<Uuid> {
1167 let id = *diff.new.id();
1168 let hmac = diff.new.document_hmac();
1169 let local_document_change = self.docs.get(id, hmac.copied()).await?;
1170 self.client
1171 .request(
1172 self.get_account()?,
1173 ChangeDocRequestV2 { diff, new_content: local_document_change },
1174 )
1175 .await?;
1176
1177 Ok(id)
1178 }
1179
1180 #[cfg(not(target_family = "wasm"))]
1181 async fn send_debug_info(&self, account: Account) {
1182 use crate::service::debug;
1183
1184 let max_panic_time = debug::latest_panic_time(&self.config.writeable_path)
1185 .await
1186 .unwrap_or_else(|e| {
1187 warn!("could not enumerate panic files: {e:?}");
1188 None
1189 });
1190
1191 let last_sent = {
1192 let tx = self.ro_tx().await;
1193 tx.db().last_extracted_panic.get().copied()
1194 };
1195
1196 let should_send = match (last_sent, max_panic_time) {
1197 (None, _) => true,
1198 (Some(prev), Some(cur)) => cur > prev,
1199 (Some(_), None) => false,
1200 };
1201
1202 if !should_send {
1203 return;
1204 }
1205
1206 let debug_info = self
1207 .debug_info("none provided - sync".to_string(), false)
1208 .await
1209 .unwrap();
1210
1211 let new_marker = max_panic_time.unwrap_or(0);
1212 let bg_self = self.clone();
1213 let task = async move {
1214 match bg_self
1215 .client
1216 .request(&account, UpsertDebugInfoRequest { debug_info })
1217 .await
1218 {
1219 Ok(_) => {
1220 let mut tx = bg_self.begin_tx().await;
1221 if let Err(e) = tx.db().last_extracted_panic.insert(new_marker) {
1222 warn!("could not record last_extracted_panic: {e:?}");
1223 }
1224 tx.end();
1225 }
1226 Err(e) => warn!("send_debug_info failed: {e:?}"),
1227 }
1228 };
1229
1230 if self.config.background_work {
1231 tokio::spawn(task);
1232 } else {
1233 task.await;
1234 }
1235 }
1236
1237 async fn read_document_helper<T>(
1238 &self, id: Uuid, tree: &mut LazyTree<T>,
1239 ) -> LbResult<DecryptedDocument>
1240 where
1241 T: TreeLike<F = SignedMeta>,
1242 {
1243 let file = tree.find(&id)?;
1244 validate::is_document(file)?;
1245 let hmac = file.document_hmac().copied();
1246
1247 if tree.calculate_deleted(&id)? {
1248 return Err(LbErrKind::FileNonexistent.into());
1249 }
1250
1251 let doc = match hmac {
1252 Some(hmac) => {
1253 let doc = self.docs.get(id, Some(hmac)).await?;
1254 tree.decrypt_document(&id, &doc, &self.keychain)?
1255 }
1256 None => vec![],
1257 };
1258
1259 Ok(doc)
1260 }
1261
1262 #[doc(hidden)]
1264 pub async fn server_dirty_ids(&self) -> LbResult<Vec<Uuid>> {
1265 let mut state = self.syncer.lock().await;
1266 self.inital_sync_state(&mut state).await?;
1267 self.process_deletions().await?;
1268 self.fetch_meta(&mut state).await?;
1269
1270 let server_ids = state.remote_changes.iter().map(|f| *f.id()).collect();
1271
1272 Ok(server_ids)
1273 }
1274
1275 pub(crate) fn setup_syncer(&self) {
1276 if self.config.background_work {
1277 self.clone().local_change_worker();
1278 self.clone().periodic_sync_worker();
1279 self.clone().post_sync_worker();
1280 }
1281 }
1282
1283 fn local_change_worker(self) {
1284 #[cfg(not(target_family = "wasm"))]
1285 tokio::spawn(async move {
1286 let mut events = self.subscribe();
1287
1288 let sync_criteria = |e: Event| {
1289 matches!(
1290 e,
1291 Event::MetadataChanged(Actor::User) | Event::DocumentWritten(_, Actor::User)
1292 )
1293 };
1294
1295 loop {
1296 time::sleep(Duration::from_millis(500)).await;
1297 let mut should_sync = false;
1298
1299 loop {
1301 let event = events.try_recv();
1302 match event {
1303 Ok(event) => {
1304 if sync_criteria(event) {
1305 should_sync = true;
1306 }
1307 }
1308 Err(TryRecvError::Empty) => break,
1309 _ => {
1310 panic!(
1311 "unexpected broadcast receive error, returning local_change_worker"
1312 );
1313 }
1314 }
1315 }
1316
1317 if !should_sync {
1320 let event = events.recv().await.unwrap();
1321 if sync_criteria(event) {
1322 self.sync().await.map_unexpected().log_and_ignore();
1323 } else {
1324 continue;
1325 }
1326 }
1327 }
1328 });
1329 }
1330
1331 fn periodic_sync_worker(self) {
1332 #[cfg(not(target_family = "wasm"))]
1333 tokio::spawn(async move {
1334 loop {
1335 self.sync().await.map_unexpected().log_and_ignore();
1336 if self.user_active().await {
1337 tokio::time::sleep(Duration::from_secs(3)).await;
1338 } else {
1339 tokio::time::sleep(Duration::from_secs(5 * 60)).await;
1340 }
1341 }
1342 });
1343 }
1344
1345 async fn user_active(&self) -> bool {
1346 let last_seen = self.user_last_seen.read().await;
1347 last_seen.elapsed() < Duration::from_secs(3 * 60)
1348 }
1349
1350 fn post_sync_worker(self) {
1351 #[cfg(not(target_family = "wasm"))]
1352 tokio::spawn(async move {
1353 let mut events = self.subscribe();
1354
1355 loop {
1356 let event = events.recv().await.unwrap();
1357 if let Event::Sync(SyncIncrement::SyncFinished(_)) = event {
1358 self.fetcher().await.map_unexpected().log_and_ignore();
1359 self.populate_pk_cache()
1360 .await
1361 .map_unexpected()
1362 .log_and_ignore();
1363 };
1364 }
1365 });
1366 }
1367
1368 async fn fetcher(&self) -> LbResult<()> {
1369 let mut files_to_pull = vec![];
1370
1371 let tx = self.ro_tx().await;
1372 let db = tx.db();
1373
1374 let Some(root) = db.root.get() else {
1375 return Ok(());
1376 };
1377
1378 let mut tree = db.base_metadata.stage(None).to_lazy();
1380
1381 for id in tree.descendants_using_links(root)? {
1382 let file = tree.find(&id)?;
1383 let hmac = file.document_hmac().copied();
1384
1385 if !file.is_document() {
1387 continue;
1388 }
1389
1390 if tree.calculate_deleted(&id)? {
1392 continue;
1393 }
1394
1395 if self.docs.exists(id, hmac) {
1396 continue;
1397 }
1398
1399 let name = tree.name(&id, &self.keychain)?;
1401 if !name.ends_with(".md") && !name.ends_with(".svg") {
1402 continue;
1403 }
1404
1405 files_to_pull.push((id, hmac));
1406 }
1407
1408 drop(tx);
1409
1410 for (id, hmac) in files_to_pull {
1414 if let Some(hmac) = hmac {
1415 self.ensure_doc_available(id, hmac).await?;
1416 }
1417 }
1418
1419 Ok(())
1420 }
1421}