1use std::{
2 collections::{HashMap, HashSet},
3 num::NonZeroUsize,
4 sync::Arc,
5 thread,
6 time::{Duration, Instant},
7};
8
9use futures::{StreamExt, stream};
10use tokio::sync::{Mutex, broadcast::error::TryRecvError};
11use usvg::Transform;
12use uuid::Uuid;
13
14use crate::{
15 Lb, LbErrKind, LbResult,
16 io::network::ApiError,
17 model::{
18 ValidationFailure,
19 access_info::UserAccessMode,
20 account::Account,
21 api::{
22 ChangeDocRequestV2, GetDocRequest, GetFileIdsRequest, GetUpdatesRequestV2,
23 GetUsernameError, GetUsernameRequest, UpsertDebugInfoRequest, UpsertRequestV2,
24 },
25 crypto::{DecryptedDocument, EncryptedDocument},
26 errors::{LbErr, Unexpected},
27 file::ShareMode,
28 file_like::FileLike,
29 file_metadata::{DocumentHmac, FileDiff, FileType, Owner},
30 filename::{DocumentType, NameComponents},
31 lazy::LazyTree,
32 signed_meta::SignedMeta,
33 staged::StagedTreeLikeMut,
34 svg::{self, buffer::u_transform_to_bezier, element::Element},
35 symkey, text,
36 tree_like::TreeLike,
37 validate,
38 },
39 service::events::{Actor, Event, SyncIncrement},
40};
41
/// Shared handle to the sync state; the mutex ensures only one sync pass runs at a time.
pub type Syncer = Arc<Mutex<SyncState>>;
43
/// Scratch state carried through a single sync pass; reset at the start of each pull.
#[derive(Default)]
pub struct SyncState {
    // Server metadata version as of the previous successful sync.
    last_synced: u64,

    // Server metadata version reported by the current pull.
    updates_as_of: u64,

    // Metadata changes received from the server during this pass.
    remote_changes: Vec<SignedMeta>,

    // Set when this pull discovered the account root (first sync on a device).
    new_root: Option<Uuid>,

    // Documents whose contents were downloaded during this pass.
    pulled_docs: Vec<Uuid>,
}
61
62impl Lb {
69 #[instrument(level = "debug", skip(self), err(Debug))]
70 pub async fn sync(&self) -> LbResult<()> {
71 let mut sync_state = self.syncer.lock().await;
72 self.events.sync_update(SyncIncrement::SyncStarted);
73
74 let pipeline: LbResult<()> = async {
75 self.pull_updates(&mut sync_state).await?;
76 self.push_local_changes().await?;
77 Ok(())
78 }
79 .await;
80
81 self.events.sync_update(SyncIncrement::SyncFinished(
82 pipeline.as_ref().err().map(|err| err.kind.clone()),
83 ));
84
85 self.cleanup().await?;
86
87 pipeline?;
88
89 let account = self.get_account()?.clone();
90
91 #[cfg(not(target_family = "wasm"))]
92 if account.is_beta() {
93 self.send_debug_info(account).await;
94 }
95
96 Ok(())
97 }
98
    /// Pulls server-side changes into the local state.
    ///
    /// Steps are strictly order-dependent: reset state, prune server-deleted
    /// files, fetch metadata, fetch the document contents the merge needs,
    /// 3-way merge, persist the new sync point, then emit events.
    pub(crate) async fn pull_updates(&self, sync_state: &mut SyncState) -> LbResult<()> {
        self.inital_sync_state(sync_state).await?;
        self.process_deletions().await?;
        self.fetch_meta(sync_state).await?;
        self.fetch_required_docs(sync_state).await?;
        self.merge(sync_state).await?;
        self.commit_last_synced(sync_state).await?;
        self.send_pull_events(sync_state).await?;

        // With background work enabled, the post-sync worker refreshes the
        // public-key cache instead (see post_sync_worker).
        if !self.config.background_work {
            self.populate_pk_cache().await?;
        }

        Ok(())
    }
115
    /// Pushes local changes to the server: metadata first, then document
    /// contents (push_docs relies on push_meta having promoted metadata).
    pub(crate) async fn push_local_changes(&self) -> LbResult<()> {
        self.push_meta().await?;
        self.push_docs().await?;

        Ok(())
    }
122
123 async fn inital_sync_state(&self, state: &mut SyncState) -> LbResult<()> {
124 let tx = self.ro_tx().await;
125 let db = tx.db();
126
127 *state = Default::default();
128 state.last_synced = db.last_synced.get().copied().unwrap_or_default() as u64;
129
130 Ok(())
131 }
132
    /// Prunes files the server no longer knows about from both the base and
    /// local trees, and deletes their on-disk document contents.
    pub(crate) async fn process_deletions(&self) -> LbResult<()> {
        // Full set of ids the server still tracks for this account.
        let server_ids = self
            .client
            .request(self.get_account()?, GetFileIdsRequest {})
            .await?
            .ids;

        let mut tx = self.begin_tx().await;
        let db = tx.db();

        let mut local = db.base_metadata.stage(&db.local_metadata).to_lazy();
        let base_ids = local.tree.base.ids();

        // Anything in base that the server no longer reports is prunable,
        // along with all of its descendants.
        let mut prunable_ids = base_ids;
        prunable_ids.retain(|id| !server_ids.contains(id));
        for id in prunable_ids.clone() {
            prunable_ids.extend(local.descendants(&id)?.into_iter());
        }
        // Remove stored document contents for both the base and local versions.
        for id in &prunable_ids {
            if let Some(base_file) = local.tree.base.maybe_find(id) {
                self.docs
                    .delete(*id, base_file.document_hmac().copied())
                    .await?;
            }
            if let Some(local_file) = local.maybe_find(id) {
                self.docs
                    .delete(*id, local_file.document_hmac().copied())
                    .await?;
            }
        }

        // Drop the pruned ids from base metadata...
        let mut base_staged = (&mut db.base_metadata).to_lazy().stage(None);
        base_staged.tree.removed = prunable_ids.iter().copied().collect();
        base_staged.promote()?;

        // ...and from local metadata.
        let mut local_staged = (&mut db.local_metadata).to_lazy().stage(None);
        local_staged.tree.removed = prunable_ids.iter().copied().collect();
        local_staged.promote()?;

        if !prunable_ids.is_empty() {
            self.events.meta_changed(Actor::Sync);
        }

        Ok(())
    }
178
    /// Fetches metadata updates from the server since `state.last_synced`,
    /// filters out orphans, dedupes against base, and records the result in
    /// `state` along with the new root id (if this device had none).
    async fn fetch_meta(&self, state: &mut SyncState) -> LbResult<()> {
        let updates = self
            .client
            .request(
                self.get_account()?,
                GetUpdatesRequestV2 { since_metadata_version: state.last_synced },
            )
            .await?;

        let tx = self.ro_tx().await;
        let db = tx.db();

        // Keep only files that either have a locatable parent or are shared
        // directly with this user; anything else can't be placed in the tree.
        let mut without_orphans = Vec::new();
        let me = Owner(self.keychain.get_pk()?);
        let remote = db.base_metadata.stage(updates.file_metadata).to_lazy();
        for id in remote.tree.staged.ids() {
            let meta = remote.find(&id)?;
            if remote.maybe_find_parent(meta).is_some()
                || meta
                    .user_access_keys()
                    .iter()
                    .any(|k| k.encrypted_for == me.0)
            {
                without_orphans.push(remote.find(&id)?.clone());
            }
        }

        // Drop staged entries identical to base (no real change).
        let pruned_tree = db.base_metadata.stage(without_orphans).pruned()?.to_lazy();
        let (_, deduped_changes) = pruned_tree.unstage();

        // On a device's first sync the root isn't known locally yet; it must
        // appear in the incoming changes or the account is unusable.
        let mut root_id = None;
        if db.root.get().is_none() {
            let root = deduped_changes
                .all_files()?
                .into_iter()
                .find(|f| f.is_root())
                .ok_or(LbErrKind::RootNonexistent)?;
            root_id = Some(*root.id());
        }

        state.remote_changes = deduped_changes;
        state.updates_as_of = updates.as_of_metadata_version;
        state.new_root = root_id;

        Ok(())
    }
228
229 async fn fetch_required_docs(&self, state: &mut SyncState) -> LbResult<()> {
230 let mut docs_to_pull = vec![];
231
232 let tx = self.ro_tx().await;
233 let db = tx.db();
234
235 let mut files_with_local_edits = vec![];
236 let local = db.base_metadata.stage(&db.local_metadata);
237 for id in local.staged.ids() {
238 if let Some(base) = local.base.maybe_find(&id) {
239 if let Some(local_hmac) = local.find(&id)?.document_hmac() {
240 if Some(local_hmac) != base.document_hmac() {
241 files_with_local_edits.push(id);
242 println!("local edits found");
243 }
244 }
245 }
246 }
247
248 let mut remote = db
249 .base_metadata
250 .stage(state.remote_changes.clone())
251 .to_lazy();
252
253 for id in remote.tree.staged.ids() {
254 if remote.calculate_deleted(&id)? {
255 continue;
256 }
257 let remote_hmac = remote.find(&id)?.document_hmac().cloned();
258 let base_hmac = remote
259 .tree
260 .base
261 .maybe_find(&id)
262 .and_then(|f| f.document_hmac())
263 .cloned();
264 if base_hmac == remote_hmac {
265 continue;
266 }
267
268 if let Some(remote_hmac) = remote_hmac {
269 if self.docs.exists(id, base_hmac) && !self.docs.exists(id, Some(remote_hmac)) {
273 docs_to_pull.push((id, remote_hmac));
274 }
275
276 if files_with_local_edits.contains(&id)
279 && !docs_to_pull
280 .iter()
281 .any(|(already_pulling, _)| already_pulling == &id)
282 {
283 if let Some(base_hmac) = base_hmac {
284 if !self.docs.exists(id, Some(base_hmac)) {
285 docs_to_pull.push((id, base_hmac));
288 }
289 }
290 docs_to_pull.push((id, remote_hmac));
291 }
292 }
293 }
294 drop(tx);
295
296 let futures = docs_to_pull
297 .into_iter()
298 .map(|(id, hmac)| async move { self.fetch_doc(id, hmac).await.map(|_| id) });
299
300 let mut stream = stream::iter(futures).buffer_unordered(
301 thread::available_parallelism()
302 .unwrap_or(NonZeroUsize::new(4).unwrap())
303 .into(),
304 );
305
306 while let Some(fut) = stream.next().await {
307 let id = fut?;
308 state.pulled_docs.push(id);
309 }
310
311 Ok(())
312 }
313
314 pub(crate) async fn fetch_doc(
315 &self, id: Uuid, hmac: DocumentHmac,
316 ) -> LbResult<EncryptedDocument> {
317 if let Ok(Some(doc)) = self.docs.maybe_get(id, Some(hmac)).await {
322 return Ok(doc);
323 }
324
325 self.events
326 .sync_update(SyncIncrement::PullingDocument(id, true));
327 let remote_document = self
328 .client
329 .request(self.get_account()?, GetDocRequest { id, hmac })
330 .await?;
331 self.docs
332 .insert(id, Some(hmac), &remote_document.content)
333 .await?;
334 self.events
335 .sync_update(SyncIncrement::PullingDocument(id, false));
336
337 Ok(remote_document.content)
338 }
339
340 async fn merge(&self, state: &mut SyncState) -> LbResult<()> {
343 let mut tx = self.begin_tx().await;
344 let db = tx.db();
345 let start = Instant::now();
346
347 let remote_changes = &state.remote_changes;
348
349 let me = Owner(self.keychain.get_pk()?);
351
352 let merge_changes = {
354 let mut base = (&db.base_metadata).to_lazy();
356 let remote_unlazy = (&db.base_metadata).to_staged(remote_changes);
357 let mut remote = remote_unlazy.as_lazy();
358 let mut local = (&db.base_metadata).to_staged(&db.local_metadata).to_lazy();
359
360 let mut files_to_unmove: HashSet<Uuid> = HashSet::new();
362 let mut files_to_unshare: HashSet<Uuid> = HashSet::new();
363 let mut links_to_delete: HashSet<Uuid> = HashSet::new();
364 let mut rename_increments: HashMap<Uuid, usize> = HashMap::new();
365 let mut duplicate_file_ids: HashMap<Uuid, Uuid> = HashMap::new();
366
367 'merge_construction: loop {
368 let mut deletions = {
370 let mut deletions = remote_unlazy.stage(Vec::new()).to_lazy();
371
372 let mut deletion_creations = HashSet::new();
374 for id in db.local_metadata.ids() {
375 if remote.maybe_find(&id).is_none() && !links_to_delete.contains(&id) {
376 deletion_creations.insert(id);
377 }
378 }
379 'drain_creations: while !deletion_creations.is_empty() {
380 'choose_a_creation: for id in &deletion_creations {
381 let id = *id;
383 let local_file = local.find(&id)?.clone();
384 let result = deletions.create_unvalidated(
385 id,
386 symkey::generate_key(),
387 local_file.parent(),
388 &local.name(&id, &self.keychain)?,
389 local_file.file_type(),
390 &self.keychain,
391 );
392 match result {
393 Ok(_) => {
394 deletion_creations.remove(&id);
395 continue 'drain_creations;
396 }
397 Err(ref err) => match err.kind {
398 LbErrKind::FileParentNonexistent => {
399 continue 'choose_a_creation;
400 }
401 _ => {
402 result?;
403 }
404 },
405 }
406 }
407 return Err(LbErrKind::Unexpected(format!(
408 "sync failed to find a topomodelal order for file creations: {deletion_creations:?}"
409 ))
410 .into());
411 }
412
413 for id in db.local_metadata.ids() {
415 let local_file = local.find(&id)?.clone();
416 if let Some(base_file) = db.base_metadata.maybe_find(&id).cloned() {
417 if !local_file.explicitly_deleted()
418 && local_file.parent() != base_file.parent()
419 && !files_to_unmove.contains(&id)
420 {
421 deletions.move_unvalidated(
423 &id,
424 local_file.parent(),
425 &self.keychain,
426 )?;
427 }
428 }
429 }
430
431 for id in db.local_metadata.ids() {
433 let local_file = local.find(&id)?.clone();
434 if local_file.explicitly_deleted() {
435 deletions.delete_unvalidated(&id, &self.keychain)?;
437 }
438 }
439 deletions
440 };
441
442 let mut merge = {
444 let mut merge = remote_unlazy.stage(Vec::new()).to_lazy();
445
446 let mut creations = HashSet::new();
448 for id in db.local_metadata.ids() {
449 if deletions.maybe_find(&id).is_some()
450 && !deletions.calculate_deleted(&id)?
451 && remote.maybe_find(&id).is_none()
452 && !links_to_delete.contains(&id)
453 {
454 creations.insert(id);
455 }
456 }
457 'drain_creations: while !creations.is_empty() {
458 'choose_a_creation: for id in &creations {
459 let id = *id;
461 let local_file = local.find(&id)?.clone();
462 let result = merge.create_unvalidated(
463 id,
464 local.decrypt_key(&id, &self.keychain)?,
465 local_file.parent(),
466 &local.name(&id, &self.keychain)?,
467 local_file.file_type(),
468 &self.keychain,
469 );
470 match result {
471 Ok(_) => {
472 creations.remove(&id);
473 continue 'drain_creations;
474 }
475 Err(ref err) => match err.kind {
476 LbErrKind::FileParentNonexistent => {
477 continue 'choose_a_creation;
478 }
479 _ => {
480 result?;
481 }
482 },
483 }
484 }
485 return Err(LbErrKind::Unexpected(format!(
486 "sync failed to find a topomodelal order for file creations: {creations:?}"
487 ))
488 .into());
489 }
490
491 for id in db.local_metadata.ids() {
494 if deletions.maybe_find(&id).is_none()
496 || deletions.calculate_deleted(&id)?
497 || (remote.maybe_find(&id).is_some()
498 && remote.calculate_deleted(&id)?)
499 {
500 continue;
501 }
502
503 let local_file = local.find(&id)?.clone();
504 let local_name = local.name(&id, &self.keychain)?;
505 let maybe_base_file = base.maybe_find(&id).cloned();
506 let maybe_remote_file = remote.maybe_find(&id).cloned();
507 if let Some(ref base_file) = maybe_base_file {
508 let base_name = base.name(&id, &self.keychain)?;
509 let remote_file = remote.find(&id)?.clone();
510 let remote_name = remote.name(&id, &self.keychain)?;
511
512 if local_file.parent() != base_file.parent()
514 && remote_file.parent() == base_file.parent()
515 && !files_to_unmove.contains(&id)
516 {
517 merge.move_unvalidated(&id, local_file.parent(), &self.keychain)?;
518 }
519
520 if local_name != base_name && remote_name == base_name {
522 merge.rename_unvalidated(&id, &local_name, &self.keychain)?;
523 }
524 }
525
526 let mut remote_keys = HashMap::new();
528 if let Some(ref remote_file) = maybe_remote_file {
529 for key in remote_file.user_access_keys() {
530 remote_keys.insert(
531 (Owner(key.encrypted_by), Owner(key.encrypted_for)),
532 (key.mode, key.deleted),
533 );
534 }
535 }
536 for key in local_file.user_access_keys() {
537 let (by, for_) = (Owner(key.encrypted_by), Owner(key.encrypted_for));
538 if let Some(&(remote_mode, remote_deleted)) =
539 remote_keys.get(&(by, for_))
540 {
541 if key.mode > remote_mode || !key.deleted && remote_deleted {
543 let mode = match key.mode {
544 UserAccessMode::Read => ShareMode::Read,
545 UserAccessMode::Write => ShareMode::Write,
546 UserAccessMode::Owner => continue,
547 };
548 merge.add_share_unvalidated(id, for_, mode, &self.keychain)?;
549 }
550 if key.deleted && !remote_deleted {
552 merge.delete_share_unvalidated(
553 &id,
554 Some(for_.0),
555 &self.keychain,
556 )?;
557 }
558 } else {
559 let mode = match key.mode {
561 UserAccessMode::Read => ShareMode::Read,
562 UserAccessMode::Write => ShareMode::Write,
563 UserAccessMode::Owner => continue,
564 };
565 merge.add_share_unvalidated(id, for_, mode, &self.keychain)?;
566 }
567 }
568
569 if files_to_unshare.contains(&id) {
571 merge.delete_share_unvalidated(&id, None, &self.keychain)?;
572 }
573
574 if let Some(&rename_increment) = rename_increments.get(&id) {
576 let name = NameComponents::from(&local_name)
577 .generate_incremented(rename_increment)
578 .to_name();
579 merge.rename_unvalidated(&id, &name, &self.keychain)?;
580 }
581
582 let base_hmac = maybe_base_file.and_then(|f| f.document_hmac().cloned());
584 let remote_hmac =
585 maybe_remote_file.and_then(|f| f.document_hmac().cloned());
586 let local_hmac = local_file.document_hmac().cloned();
587 if merge.access_mode(me, &id)? >= Some(UserAccessMode::Write)
588 && local_hmac != base_hmac
589 {
590 if remote_hmac != base_hmac && remote_hmac != local_hmac {
591 let merge_name = merge.name(&id, &self.keychain)?;
593 let document_type =
594 DocumentType::from_file_name_using_extension(&merge_name);
595
596 let base_document =
599 self.read_document_helper(id, &mut base).await?;
600 let remote_document =
601 self.read_document_helper(id, &mut remote).await?;
602 let local_document =
603 self.read_document_helper(id, &mut local).await?;
604
605 match document_type {
606 DocumentType::Text => {
607 let base_document =
610 String::from_utf8_lossy(&base_document).to_string();
611 let remote_document =
612 String::from_utf8_lossy(&remote_document).to_string();
613 let local_document =
614 String::from_utf8_lossy(&local_document).to_string();
615 let merged_document =
616 text::buffer::Buffer::from(base_document.as_str())
617 .merge(local_document, remote_document);
618 let encrypted_document = merge
619 .update_document_unvalidated(
620 &id,
621 &merged_document.into_bytes(),
622 &self.keychain,
623 )?;
624 let hmac = merge.find(&id)?.document_hmac().copied();
625 self.docs.insert(id, hmac, &encrypted_document).await?;
626 }
627 DocumentType::Drawing => {
628 let base_document =
629 String::from_utf8_lossy(&base_document).to_string();
630 let remote_document =
631 String::from_utf8_lossy(&remote_document).to_string();
632 let local_document =
633 String::from_utf8_lossy(&local_document).to_string();
634
635 let base_buffer = svg::buffer::Buffer::new(&base_document);
636 let remote_buffer =
637 svg::buffer::Buffer::new(&remote_document);
638 let mut local_buffer =
639 svg::buffer::Buffer::new(&local_document);
640
641 for (_, el) in local_buffer.elements.iter_mut() {
642 if let Element::Path(path) = el {
643 path.data.apply_transform(u_transform_to_bezier(
644 &Transform::from(
645 local_buffer
646 .weak_viewport_settings
647 .master_transform,
648 ),
649 ));
650 }
651 }
652 svg::buffer::Buffer::reload(
653 &mut local_buffer.elements,
654 &mut local_buffer.weak_images,
655 &mut local_buffer.weak_path_pressures,
656 &mut local_buffer.weak_viewport_settings,
657 &base_buffer,
658 &remote_buffer,
659 );
660
661 let merged_document = local_buffer.serialize();
662 let encrypted_document = merge
663 .update_document_unvalidated(
664 &id,
665 &merged_document.into_bytes(),
666 &self.keychain,
667 )?;
668 let hmac = merge.find(&id)?.document_hmac().copied();
669 self.docs.insert(id, hmac, &encrypted_document).await?;
670 }
671 DocumentType::Other => {
672 let merge_parent = *merge.find(&id)?.parent();
674 let duplicate_id = if let Some(&duplicate_id) =
675 duplicate_file_ids.get(&id)
676 {
677 duplicate_id
678 } else {
679 let duplicate_id = Uuid::new_v4();
680 duplicate_file_ids.insert(id, duplicate_id);
681 rename_increments.insert(duplicate_id, 1);
682 duplicate_id
683 };
684
685 let mut merge_name = merge_name;
686 merge_name = NameComponents::from(&merge_name)
687 .generate_incremented(
688 rename_increments
689 .get(&duplicate_id)
690 .copied()
691 .unwrap_or_default(),
692 )
693 .to_name();
694
695 merge.create_unvalidated(
696 duplicate_id,
697 symkey::generate_key(),
698 &merge_parent,
699 &merge_name,
700 FileType::Document,
701 &self.keychain,
702 )?;
703 let encrypted_document = merge
704 .update_document_unvalidated(
705 &duplicate_id,
706 &local_document,
707 &self.keychain,
708 )?;
709 let duplicate_hmac =
710 merge.find(&duplicate_id)?.document_hmac().copied();
711 self.docs
712 .insert(
713 duplicate_id,
714 duplicate_hmac,
715 &encrypted_document,
716 )
717 .await?;
718 }
719 }
720 } else {
721 let document = self.read_document_helper(id, &mut local).await?;
723 merge.update_document_unvalidated(
724 &id,
725 &document,
726 &self.keychain,
727 )?;
728 }
729 }
730 }
731
732 for id in db.local_metadata.ids() {
735 if db.base_metadata.maybe_find(&id).is_some()
736 && deletions.calculate_deleted(&id)?
737 && !merge.calculate_deleted(&id)?
738 {
739 merge.delete_unvalidated(&id, &self.keychain)?;
741 }
742 }
743 for &id in &links_to_delete {
744 if merge.maybe_find(&id).is_some() && !merge.calculate_deleted(&id)? {
746 merge.delete_unvalidated(&id, &self.keychain)?;
747 }
748 }
749
750 merge
751 };
752
753 for link in merge.ids() {
755 if !merge.calculate_deleted(&link)? {
756 if let FileType::Link { target } = merge.find(&link)?.file_type() {
757 if merge.maybe_find(&target).is_some()
758 && merge.calculate_deleted(&target)?
759 {
760 if links_to_delete.insert(link) {
762 continue 'merge_construction;
763 } else {
764 return Err(LbErrKind::Unexpected(format!(
765 "sync failed to resolve broken link (deletion): {link:?}"
766 ))
767 .into());
768 }
769 }
770 }
771 }
772 }
773
774 let validate_result = merge.validate(me);
775 match validate_result {
776 Ok(_) => {
778 let (_, merge_changes) = merge.unstage();
779 break merge_changes;
780 }
781 Err(ref err) => match err.kind {
782 LbErrKind::Validation(ref vf) => match vf {
783 ValidationFailure::Cycle(ids) => {
785 let mut progress = false;
787 for &id in ids {
788 if db.local_metadata.maybe_find(&id).is_some()
789 && files_to_unmove.insert(id)
790 {
791 progress = true;
792 }
793 }
794 if !progress {
795 return Err(LbErrKind::Unexpected(format!(
796 "sync failed to resolve cycle: {ids:?}"
797 ))
798 .into());
799 }
800 }
801 ValidationFailure::PathConflict(ids) => {
802 let mut progress = false;
804 for &id in ids {
805 if duplicate_file_ids.values().any(|&dup| dup == id) {
806 *rename_increments.entry(id).or_insert(0) += 1;
807 progress = true;
808 break;
809 }
810 }
811 if !progress {
812 for &id in ids {
813 if db.local_metadata.maybe_find(&id).is_some() {
814 *rename_increments.entry(id).or_insert(0) += 1;
815 progress = true;
816 break;
817 }
818 }
819 }
820 if !progress {
821 return Err(LbErrKind::Unexpected(format!(
822 "sync failed to resolve path conflict: {ids:?}"
823 ))
824 .into());
825 }
826 }
827 ValidationFailure::SharedLink { link, shared_ancestor } => {
828 let mut progress = false;
830 if let Some(base_shared_ancestor) = base.maybe_find(shared_ancestor)
831 {
832 if !base_shared_ancestor.is_shared()
833 && files_to_unshare.insert(*shared_ancestor)
834 {
835 progress = true;
836 }
837 }
838 if !progress && links_to_delete.insert(*link) {
839 progress = true;
840 }
841 if !progress {
842 return Err(LbErrKind::Unexpected(format!(
843 "sync failed to resolve shared link: link: {link:?}, shared_ancestor: {shared_ancestor:?}"
844 )).into());
845 }
846 }
847 ValidationFailure::DuplicateLink { target } => {
848 let mut progress = false;
850 if let Some(link) = local.linked_by(target)? {
851 if links_to_delete.insert(link) {
852 progress = true;
853 }
854 }
855 if !progress {
856 return Err(LbErrKind::Unexpected(format!(
857 "sync failed to resolve duplicate link: target: {target:?}"
858 ))
859 .into());
860 }
861 }
862 ValidationFailure::BrokenLink(link) => {
863 if !links_to_delete.insert(*link) {
865 return Err(LbErrKind::Unexpected(format!(
866 "sync failed to resolve broken link: {link:?}"
867 ))
868 .into());
869 }
870 }
871 ValidationFailure::OwnedLink(link) => {
872 let mut progress = false;
874 if let Some(remote_link) = remote.maybe_find(link) {
875 if let FileType::Link { target } = remote_link.file_type() {
876 let remote_target = remote.find(&target)?;
877 if remote_target.owner() != me
878 && files_to_unmove.insert(target)
879 {
880 progress = true;
881 }
882 }
883 }
884 if !progress && links_to_delete.insert(*link) {
885 progress = true;
886 }
887 if !progress {
888 return Err(LbErrKind::Unexpected(format!(
889 "sync failed to resolve owned link: {link:?}"
890 ))
891 .into());
892 }
893 }
894 ValidationFailure::Orphan(_)
896 | ValidationFailure::NonFolderWithChildren(_)
897 | ValidationFailure::FileWithDifferentOwnerParent(_)
898 | ValidationFailure::FileNameTooLong(_)
899 | ValidationFailure::DeletedFileUpdated(_)
900 | ValidationFailure::NonDecryptableFileName(_) => {
901 validate_result?;
902 }
903 },
904 _ => {
906 validate_result?;
907 }
908 },
909 }
910 }
911 };
912
913 (&mut db.base_metadata)
915 .to_staged(remote_changes.clone())
916 .to_lazy()
917 .promote()?;
918 db.local_metadata.clear()?;
919 (&mut db.local_metadata)
920 .to_staged(merge_changes)
921 .to_lazy()
922 .promote()?;
923
924 db.base_metadata.stage(&mut db.local_metadata).prune()?;
927
928 if start.elapsed() > web_time::Duration::from_millis(100) {
929 warn!("sync merge held lock for {:?}", start.elapsed());
930 }
931
932 Ok(())
933 }
934
935 async fn send_pull_events(&self, state: &mut SyncState) -> LbResult<()> {
936 if state.new_root.is_some() {
937 self.events.signed_in();
938 }
939
940 if !state.remote_changes.is_empty() {
941 self.events.meta_changed(Actor::Sync);
942
943 let owner = Owner(self.keychain.get_pk()?);
944 if state.remote_changes.iter().any(|f| f.owner() != owner) {
945 self.events.pending_shares_changed();
946 }
947 }
948
949 for &doc in &state.pulled_docs {
950 self.events.doc_written(doc, Actor::Sync);
951 }
952
953 Ok(())
954 }
955
956 async fn commit_last_synced(&self, state: &mut SyncState) -> LbResult<()> {
957 let mut tx = self.begin_tx().await;
958 let db = tx.db();
959 db.last_synced.insert(state.updates_as_of as i64)?;
960
961 if let Some(root) = state.new_root {
962 db.root.insert(root)?;
963 }
964
965 Ok(())
966 }
967
968 async fn populate_pk_cache(&self) -> LbResult<()> {
969 let mut missing_owners = HashSet::new();
971 {
972 let tx = self.ro_tx().await;
973 let db = tx.db();
974 for file in db.base_metadata.get().values() {
975 for user_access_key in file.user_access_keys() {
976 let enc_by = Owner(user_access_key.encrypted_by);
977 let enc_for = Owner(user_access_key.encrypted_for);
978
979 if !db.pub_key_lookup.get().contains_key(&enc_by) {
980 missing_owners.insert(enc_by);
981 }
982
983 if !db.pub_key_lookup.get().contains_key(&enc_for) {
984 missing_owners.insert(enc_for);
985 }
986 }
987 }
988 }
989
990 let mut new_owners = HashMap::new();
991 {
992 for owner in missing_owners {
993 let username_result = self
994 .client
995 .request(self.get_account().unwrap(), GetUsernameRequest { key: owner.0 })
996 .await;
997 new_owners.insert(owner, username_result);
998 }
999 }
1000
1001 let mut tx = self.begin_tx().await;
1002 let db = tx.db();
1003
1004 let have_updates = !new_owners.is_empty();
1005 for (owner, username) in new_owners {
1006 let username = match username {
1007 Err(ApiError::Endpoint(GetUsernameError::UserNotFound)) => "<unknown>".to_string(),
1008 Ok(username) => username.username,
1009 _ => continue, };
1011
1012 db.pub_key_lookup.insert(owner, username).unwrap();
1013 }
1014
1015 if have_updates {
1016 self.events.meta_changed(Actor::Sync);
1017 }
1018
1019 Ok(())
1020 }
1021
    /// Pushes local metadata changes to the server, then promotes them into
    /// base (without document digests — content changes go up in push_docs).
    async fn push_meta(&self) -> LbResult<()> {
        let mut updates = vec![];
        let mut local_changes_no_digests = Vec::new();

        let tx = self.ro_tx().await;
        let db = tx.db();

        let local = db.base_metadata.stage(&db.local_metadata).to_lazy();

        for id in local.tree.staged.ids() {
            let mut local_change = local.tree.staged.find(&id)?.timestamped_value.value.clone();
            let maybe_base_file = local.tree.base.maybe_find(&id);

            // Keep the base hmac/size so this upsert carries metadata only.
            local_change.set_hmac_and_size(
                maybe_base_file.and_then(|f| f.document_hmac().copied()),
                maybe_base_file.and_then(|f| *f.timestamped_value.value.doc_size()),
            );
            let local_change = local_change.sign(&self.keychain)?;

            local_changes_no_digests.push(local_change.clone());
            let file_diff = FileDiff { old: maybe_base_file.cloned(), new: local_change };
            updates.push(file_diff);
        }

        drop(tx);

        if !updates.is_empty() {
            self.client
                .request(self.get_account()?, UpsertRequestV2 { updates: updates.clone() })
                .await?;
        }

        let mut tx = self.begin_tx().await;
        let db = tx.db();

        // Server accepted the changes: fold them into base, then drop local
        // entries now identical to base.
        (&mut db.base_metadata)
            .to_lazy()
            .stage(local_changes_no_digests)
            .promote()?;
        db.base_metadata.stage(&mut db.local_metadata).prune()?;

        tx.end();

        Ok(())
    }
1071
    /// Pushes changed document contents to the server concurrently, then
    /// promotes the successfully-pushed versions into base. Failures don't
    /// abort the batch; the last error is reported after all pushes finish.
    async fn push_docs(&self) -> LbResult<()> {
        let mut updates = vec![];
        let mut local_changes_digests_only = vec![];

        let tx = self.ro_tx().await;
        let db = tx.db();
        let start = Instant::now();

        let local = db.base_metadata.stage(&db.local_metadata).to_lazy();

        for id in local.tree.staged.ids() {
            // push_meta ran first, so every staged file already exists in base.
            let base_file = local.tree.base.find(&id)?.clone();

            // Base metadata with the local hmac/size: this diff carries
            // content changes only.
            let mut local_change = base_file.timestamped_value.value.clone();
            local_change.set_hmac_and_size(
                local.find(&id)?.document_hmac().copied(),
                *local.find(&id)?.timestamped_value.value.doc_size(),
            );

            // Skip files whose content is unchanged or that have no content.
            if base_file.document_hmac() == local_change.document_hmac()
                || local_change.document_hmac().is_none()
            {
                continue;
            }

            let local_change = local_change.sign(&self.keychain)?;

            updates.push(FileDiff { old: Some(base_file), new: local_change.clone() });
            local_changes_digests_only.push(local_change);
            self.events
                .sync_update(SyncIncrement::PushingDocument(id, true));
        }

        drop(tx);
        if start.elapsed() > web_time::Duration::from_millis(100) {
            warn!("sync push_docs held lock for {:?}", start.elapsed());
        }

        // Upload concurrently, bounded by available parallelism.
        let futures = updates.clone().into_iter().map(|diff| self.push_doc(diff));

        let mut stream = stream::iter(futures).buffer_unordered(
            thread::available_parallelism()
                .unwrap_or(NonZeroUsize::new(4).unwrap())
                .into(),
        );

        let mut docs_without_errors = vec![];
        let mut last_error: Option<LbErr> = None;

        while let Some(fut) = stream.next().await {
            match fut {
                Ok(id) => {
                    docs_without_errors.push(id);
                    self.events
                        .sync_update(SyncIncrement::PushingDocument(id, false));
                }
                Err(err) => {
                    // Keep pushing the rest; surface the last failure below.
                    last_error = Some(err);
                }
            }
        }

        // Only promote files that actually made it to the server.
        local_changes_digests_only.retain(|f| docs_without_errors.contains(f.id()));

        let mut tx = self.begin_tx().await;
        let db = tx.db();
        (&mut db.base_metadata)
            .to_lazy()
            .stage(local_changes_digests_only)
            .promote()?;

        db.base_metadata.stage(&mut db.local_metadata).prune()?;

        tx.end();

        if let Some(err) = last_error { Err(err) } else { Ok(()) }
    }
1155
1156 async fn push_doc(&self, diff: FileDiff<SignedMeta>) -> LbResult<Uuid> {
1157 let id = *diff.new.id();
1158 let hmac = diff.new.document_hmac();
1159 let local_document_change = self.docs.get(id, hmac.copied()).await?;
1160 self.client
1161 .request(
1162 self.get_account()?,
1163 ChangeDocRequestV2 { diff, new_content: local_document_change },
1164 )
1165 .await?;
1166
1167 Ok(id)
1168 }
1169
1170 #[cfg(not(target_family = "wasm"))]
1171 async fn send_debug_info(&self, account: Account) {
1172 let debug_info = self
1173 .debug_info("none provided - sync".to_string(), false)
1174 .await
1175 .unwrap();
1176
1177 if self.config.background_work {
1178 let bg_self = self.clone();
1179 tokio::spawn(async move {
1180 bg_self
1181 .client
1182 .request(&account, UpsertDebugInfoRequest { debug_info })
1183 .await
1184 .log_and_ignore();
1185 });
1186 } else {
1187 self.client
1188 .request(&account, UpsertDebugInfoRequest { debug_info })
1189 .await
1190 .log_and_ignore();
1191 }
1192 }
1193
1194 async fn read_document_helper<T>(
1195 &self, id: Uuid, tree: &mut LazyTree<T>,
1196 ) -> LbResult<DecryptedDocument>
1197 where
1198 T: TreeLike<F = SignedMeta>,
1199 {
1200 let file = tree.find(&id)?;
1201 validate::is_document(file)?;
1202 let hmac = file.document_hmac().copied();
1203
1204 if tree.calculate_deleted(&id)? {
1205 return Err(LbErrKind::FileNonexistent.into());
1206 }
1207
1208 let doc = match hmac {
1209 Some(hmac) => {
1210 let doc = self.docs.get(id, Some(hmac)).await?;
1211 tree.decrypt_document(&id, &doc, &self.keychain)?
1212 }
1213 None => vec![],
1214 };
1215
1216 Ok(doc)
1217 }
1218
1219 #[doc(hidden)]
1221 pub async fn server_dirty_ids(&self) -> LbResult<Vec<Uuid>> {
1222 let mut state = self.syncer.lock().await;
1223 self.inital_sync_state(&mut state).await?;
1224 self.process_deletions().await?;
1225 self.fetch_meta(&mut state).await?;
1226
1227 let server_ids = state.remote_changes.iter().map(|f| *f.id()).collect();
1228
1229 Ok(server_ids)
1230 }
1231
1232 pub(crate) fn setup_syncer(&self) {
1233 if self.config.background_work {
1234 self.clone().local_change_worker();
1235 self.clone().periodic_sync_worker();
1236 self.clone().post_sync_worker();
1237 }
1238 }
1239
1240 fn local_change_worker(self) {
1241 #[cfg(not(target_family = "wasm"))]
1242 tokio::spawn(async move {
1243 let mut events = self.subscribe();
1244
1245 let sync_criteria = |e: Event| {
1246 matches!(
1247 e,
1248 Event::MetadataChanged(Actor::User) | Event::DocumentWritten(_, Actor::User)
1249 )
1250 };
1251
1252 loop {
1253 let mut should_sync = false;
1254
1255 loop {
1257 let event = events.try_recv();
1258 match event {
1259 Ok(event) => {
1260 if sync_criteria(event) {
1261 should_sync = true;
1262 }
1263 }
1264 Err(TryRecvError::Empty) => break,
1265 _ => {
1266 panic!(
1267 "unexpected broadcast receive error, returning local_change_worker"
1268 );
1269 }
1270 }
1271 }
1272
1273 if !should_sync {
1276 let event = events.recv().await.unwrap();
1277 if sync_criteria(event) {
1278 self.sync().await.map_unexpected().log_and_ignore();
1279 } else {
1280 continue;
1281 }
1282 }
1283 }
1284 });
1285 }
1286
1287 fn periodic_sync_worker(self) {
1288 #[cfg(not(target_family = "wasm"))]
1289 tokio::spawn(async move {
1290 loop {
1291 self.sync().await.map_unexpected().log_and_ignore();
1292 if self.user_active().await {
1293 tokio::time::sleep(Duration::from_secs(3)).await;
1294 } else {
1295 tokio::time::sleep(Duration::from_secs(5 * 60)).await;
1296 }
1297 }
1298 });
1299 }
1300
1301 async fn user_active(&self) -> bool {
1302 let last_seen = self.user_last_seen.read().await;
1303 last_seen.elapsed() < Duration::from_secs(3 * 60)
1304 }
1305
1306 fn post_sync_worker(self) {
1307 #[cfg(not(target_family = "wasm"))]
1308 tokio::spawn(async move {
1309 let mut events = self.subscribe();
1310
1311 loop {
1312 let event = events.recv().await.unwrap();
1313 if let Event::Sync(SyncIncrement::SyncFinished(_)) = event {
1314 self.fetcher().await.map_unexpected().log_and_ignore();
1315 self.populate_pk_cache()
1316 .await
1317 .map_unexpected()
1318 .log_and_ignore();
1319 };
1320 }
1321 });
1322 }
1323
    /// Prefetches documents with editor-openable extensions (.md / .svg)
    /// under the root so they open without a network round-trip.
    async fn fetcher(&self) -> LbResult<()> {
        let mut files_to_pull = vec![];

        let tx = self.ro_tx().await;
        let db = tx.db();

        // No root yet means no successful sync has happened on this device.
        let Some(root) = db.root.get() else {
            return Ok(());
        };

        let mut tree = db.base_metadata.stage(None).to_lazy();

        for id in tree.descendants_using_links(root)? {
            let file = tree.find(&id)?;
            let hmac = file.document_hmac().copied();

            if !file.is_document() {
                continue;
            }

            if tree.calculate_deleted(&id)? {
                continue;
            }

            // Only prefetch formats the editors open directly.
            let name = tree.name(&id, &self.keychain)?;
            if !name.ends_with(".md") && !name.ends_with(".svg") {
                continue;
            }

            files_to_pull.push((id, hmac));
        }

        drop(tx);

        // Fetch sequentially; fetch_doc skips anything already stored locally.
        for (id, hmac) in files_to_pull {
            if let Some(hmac) = hmac {
                self.fetch_doc(id, hmac).await?;
            }
        }

        Ok(())
    }
1373}