1use std::{
2 collections::{HashMap, HashSet},
3 num::NonZeroUsize,
4 sync::Arc,
5 thread,
6 time::{Duration, Instant},
7};
8
9use futures::{StreamExt, stream};
10use tokio::sync::{Mutex, broadcast::error::TryRecvError};
11use tokio::time;
12use usvg::Transform;
13use uuid::Uuid;
14
15use crate::{
16 Lb, LbErrKind, LbResult,
17 io::network::ApiError,
18 model::{
19 ValidationFailure,
20 access_info::UserAccessMode,
21 account::Account,
22 api::{
23 ChangeDocRequestV2, GetDocRequest, GetFileIdsRequest, GetUpdatesRequestV2,
24 GetUsernameError, GetUsernameRequest, UpsertDebugInfoRequest, UpsertRequestV2,
25 },
26 crypto::{DecryptedDocument, EncryptedDocument},
27 errors::{LbErr, Unexpected},
28 file::ShareMode,
29 file_like::FileLike,
30 file_metadata::{DocumentHmac, FileDiff, FileType, Owner},
31 filename::{DocumentType, NameComponents},
32 lazy::LazyTree,
33 signed_meta::SignedMeta,
34 staged::StagedTreeLikeMut,
35 svg::{self, buffer::u_transform_to_bezier, element::Element},
36 symkey, text,
37 tree_like::TreeLike,
38 validate,
39 },
40 service::events::{Actor, Event, SyncIncrement},
41};
42
/// Shared handle to the sync state; locked for the full duration of a sync so
/// only one sync runs at a time.
pub type Syncer = Arc<Mutex<SyncState>>;

/// Per-sync scratch state, reset at the start of every sync
/// (see `inital_sync_state`).
#[derive(Default)]
pub struct SyncState {
    // Metadata version this client had synced up to before this sync began.
    last_synced: u64,

    // Server's version watermark from the latest GetUpdates response; persisted
    // as the new `last_synced` by `commit_last_synced`.
    updates_as_of: u64,

    // Metadata fetched from the server that is newer than `last_synced`,
    // orphan-filtered and deduped against base (see `fetch_meta`).
    remote_changes: Vec<SignedMeta>,

    // Set when this sync discovered the account root for the first time.
    new_root: Option<Uuid>,

    // Ids of documents whose contents were downloaded during this sync.
    pulled_docs: Vec<Uuid>,
}
62
63impl Lb {
    /// Runs a full sync: pulls remote updates, then pushes local changes.
    ///
    /// Emits `SyncStarted` before the pipeline and `SyncFinished` (carrying the
    /// error kind, if any) after it, so observers always see a matched pair.
    /// `cleanup` runs even when the pipeline failed; the pipeline error is then
    /// re-raised afterwards via `pipeline?`.
    #[instrument(level = "debug", skip(self), err(Debug))]
    pub async fn sync(&self) -> LbResult<()> {
        // Hold the syncer lock for the whole sync so only one runs at a time.
        let mut sync_state = self.syncer.lock().await;
        self.events.sync_update(SyncIncrement::SyncStarted);

        // Run pull + push as one pipeline so a single SyncFinished event can
        // report the outcome of either phase.
        let pipeline: LbResult<()> = async {
            self.pull_updates(&mut sync_state).await?;
            self.push_local_changes().await?;
            Ok(())
        }
        .await;

        self.events.sync_update(SyncIncrement::SyncFinished(
            pipeline.as_ref().err().map(|err| err.kind.clone()),
        ));

        // Cleanup happens regardless of pipeline success.
        self.cleanup().await?;

        pipeline?;

        let account = self.get_account()?.clone();

        // Beta builds upload debug info after a successful sync (not on wasm).
        #[cfg(not(target_family = "wasm"))]
        if account.is_beta() {
            self.send_debug_info(account).await;
        }

        Ok(())
    }
99
    /// Pull phase of sync: fetch server-side changes and merge them locally.
    ///
    /// Steps, in order: reset per-sync state, prune files the server deleted,
    /// fetch new metadata, download documents needed for merging, perform the
    /// three-way merge, persist the new sync point, then emit change events.
    pub(crate) async fn pull_updates(&self, sync_state: &mut SyncState) -> LbResult<()> {
        self.inital_sync_state(sync_state).await?;
        self.process_deletions().await?;
        self.fetch_meta(sync_state).await?;
        self.fetch_required_docs(sync_state).await?;
        self.merge(sync_state).await?;
        self.commit_last_synced(sync_state).await?;
        self.send_pull_events(sync_state).await?;

        // Without background workers the pk cache is refreshed inline here;
        // otherwise `post_sync_worker` does it after SyncFinished.
        if !self.config.background_work {
            self.populate_pk_cache().await?;
        }

        Ok(())
    }
116
    /// Push phase of sync: upload local metadata changes, then changed
    /// document contents (metadata must land first — see `push_meta`).
    pub(crate) async fn push_local_changes(&self) -> LbResult<()> {
        self.push_meta().await?;
        self.push_docs().await?;

        Ok(())
    }
123
    /// Resets `state` and seeds it with the last-synced metadata version from
    /// the database.
    ///
    /// NOTE(review): "inital" is a typo for "initial"; kept because callers in
    /// this file (`pull_updates`, `server_dirty_ids`) use this name.
    async fn inital_sync_state(&self, state: &mut SyncState) -> LbResult<()> {
        let tx = self.ro_tx().await;
        let db = tx.db();

        *state = Default::default();
        state.last_synced = db.last_synced.get().copied().unwrap_or_default() as u64;

        Ok(())
    }
133
    /// Removes files that no longer exist on the server from local state.
    ///
    /// Asks the server for the full set of live file ids, then prunes every
    /// base file missing from that set (plus its descendants in the local
    /// tree) from both metadata trees, and deletes any cached document
    /// contents for those files.
    pub(crate) async fn process_deletions(&self) -> LbResult<()> {
        let server_ids = self
            .client
            .request(self.get_account()?, GetFileIdsRequest {})
            .await?
            .ids;

        let mut tx = self.begin_tx().await;
        let db = tx.db();

        let mut local = db.base_metadata.stage(&db.local_metadata).to_lazy();
        let base_ids = local.tree.base.ids();

        // Base files the server no longer knows about...
        let mut prunable_ids = base_ids;
        prunable_ids.retain(|id| !server_ids.contains(id));
        // ...plus everything beneath them in the staged (base + local) tree.
        for id in prunable_ids.clone() {
            prunable_ids.extend(local.descendants(&id)?.into_iter());
        }
        // Drop cached document contents for both base and local versions.
        for id in &prunable_ids {
            if let Some(base_file) = local.tree.base.maybe_find(id) {
                self.docs
                    .delete(*id, base_file.document_hmac().copied())
                    .await?;
            }
            if let Some(local_file) = local.maybe_find(id) {
                self.docs
                    .delete(*id, local_file.document_hmac().copied())
                    .await?;
            }
        }

        // Remove the pruned ids from the base tree...
        let mut base_staged = (&mut db.base_metadata).to_lazy().stage(None);
        base_staged.tree.removed = prunable_ids.iter().copied().collect();
        base_staged.promote()?;

        // ...and from the local tree.
        let mut local_staged = (&mut db.local_metadata).to_lazy().stage(None);
        local_staged.tree.removed = prunable_ids.iter().copied().collect();
        local_staged.promote()?;

        if !prunable_ids.is_empty() {
            self.events.meta_changed(Actor::Sync);
        }

        Ok(())
    }
179
    /// Fetches metadata updates newer than `state.last_synced` and records
    /// them in `state`.
    ///
    /// Incoming entries with no reachable parent that are not shared directly
    /// with this user are dropped as orphans; the remainder is deduped against
    /// base metadata. When no root is known locally yet (first sync), the root
    /// is located among the incoming files.
    async fn fetch_meta(&self, state: &mut SyncState) -> LbResult<()> {
        let updates = self
            .client
            .request(
                self.get_account()?,
                GetUpdatesRequestV2 { since_metadata_version: state.last_synced },
            )
            .await?;

        let tx = self.ro_tx().await;
        let db = tx.db();

        // Keep only files that have a visible parent or are shared with us.
        let mut without_orphans = Vec::new();
        let me = Owner(self.keychain.get_pk()?);
        let remote = db.base_metadata.stage(updates.file_metadata).to_lazy();
        for id in remote.tree.staged.ids() {
            let meta = remote.find(&id)?;
            if remote.maybe_find_parent(meta).is_some()
                || meta
                    .user_access_keys()
                    .iter()
                    .any(|k| k.encrypted_for == me.0)
            {
                without_orphans.push(remote.find(&id)?.clone());
            }
        }

        // Drop changes that are identical to what base already holds.
        let pruned_tree = db.base_metadata.stage(without_orphans).pruned()?.to_lazy();
        let (_, deduped_changes) = pruned_tree.unstage();

        // First sync for this account: find the root among the updates.
        let mut root_id = None;
        if db.root.get().is_none() {
            let root = deduped_changes
                .all_files()?
                .into_iter()
                .find(|f| f.is_root())
                .ok_or(LbErrKind::RootNonexistent)?;
            root_id = Some(*root.id());
        }

        state.remote_changes = deduped_changes;
        state.updates_as_of = updates.as_of_metadata_version;
        state.new_root = root_id;

        Ok(())
    }
229
230 async fn fetch_required_docs(&self, state: &mut SyncState) -> LbResult<()> {
231 let mut docs_to_pull = vec![];
232
233 let tx = self.ro_tx().await;
234 let db = tx.db();
235
236 let mut files_with_local_edits = vec![];
237 let local = db.base_metadata.stage(&db.local_metadata);
238 for id in local.staged.ids() {
239 if let Some(base) = local.base.maybe_find(&id) {
240 if let Some(local_hmac) = local.find(&id)?.document_hmac() {
241 if Some(local_hmac) != base.document_hmac() {
242 files_with_local_edits.push(id);
243 println!("local edits found");
244 }
245 }
246 }
247 }
248
249 let mut remote = db
250 .base_metadata
251 .stage(state.remote_changes.clone())
252 .to_lazy();
253
254 for id in remote.tree.staged.ids() {
255 if remote.calculate_deleted(&id)? {
256 continue;
257 }
258 let remote_hmac = remote.find(&id)?.document_hmac().cloned();
259 let base_hmac = remote
260 .tree
261 .base
262 .maybe_find(&id)
263 .and_then(|f| f.document_hmac())
264 .cloned();
265 if base_hmac == remote_hmac {
266 continue;
267 }
268
269 if let Some(remote_hmac) = remote_hmac {
270 if self.docs.exists(id, base_hmac) && !self.docs.exists(id, Some(remote_hmac)) {
274 docs_to_pull.push((id, remote_hmac));
275 }
276
277 if files_with_local_edits.contains(&id)
280 && !docs_to_pull
281 .iter()
282 .any(|(already_pulling, _)| already_pulling == &id)
283 {
284 if let Some(base_hmac) = base_hmac {
285 if !self.docs.exists(id, Some(base_hmac)) {
286 docs_to_pull.push((id, base_hmac));
289 }
290 }
291 docs_to_pull.push((id, remote_hmac));
292 }
293 }
294 }
295 drop(tx);
296
297 let futures = docs_to_pull
298 .into_iter()
299 .map(|(id, hmac)| async move { self.fetch_doc(id, hmac).await.map(|_| id) });
300
301 let mut stream = stream::iter(futures).buffer_unordered(
302 thread::available_parallelism()
303 .unwrap_or(NonZeroUsize::new(4).unwrap())
304 .into(),
305 );
306
307 while let Some(fut) = stream.next().await {
308 let id = fut?;
309 state.pulled_docs.push(id);
310 }
311
312 Ok(())
313 }
314
    /// Returns document `id` at version `hmac`, fetching it from the server
    /// when it is not already cached locally.
    ///
    /// Emits `PullingDocument(id, true/false)` around a network fetch; cache
    /// hits emit nothing. Fetched content is stored in the local doc store
    /// before being returned.
    pub(crate) async fn fetch_doc(
        &self, id: Uuid, hmac: DocumentHmac,
    ) -> LbResult<EncryptedDocument> {
        // Cache hit: nothing to download.
        if let Ok(Some(doc)) = self.docs.maybe_get(id, Some(hmac)).await {
            return Ok(doc);
        }

        self.events
            .sync_update(SyncIncrement::PullingDocument(id, true));
        let remote_document = self
            .client
            .request(self.get_account()?, GetDocRequest { id, hmac })
            .await?;
        self.docs
            .insert(id, Some(hmac), &remote_document.content)
            .await?;
        self.events
            .sync_update(SyncIncrement::PullingDocument(id, false));

        Ok(remote_document.content)
    }
340
341 async fn merge(&self, state: &mut SyncState) -> LbResult<()> {
344 let mut tx = self.begin_tx().await;
345 let db = tx.db();
346 let start = Instant::now();
347
348 let remote_changes = &state.remote_changes;
349
350 let me = Owner(self.keychain.get_pk()?);
352
353 let merge_changes = {
355 let mut base = (&db.base_metadata).to_lazy();
357 let remote_unlazy = (&db.base_metadata).to_staged(remote_changes);
358 let mut remote = remote_unlazy.as_lazy();
359 let mut local = (&db.base_metadata).to_staged(&db.local_metadata).to_lazy();
360
361 let mut files_to_unmove: HashSet<Uuid> = HashSet::new();
363 let mut files_to_unshare: HashSet<Uuid> = HashSet::new();
364 let mut links_to_delete: HashSet<Uuid> = HashSet::new();
365 let mut rename_increments: HashMap<Uuid, usize> = HashMap::new();
366 let mut duplicate_file_ids: HashMap<Uuid, Uuid> = HashMap::new();
367
368 'merge_construction: loop {
369 let mut deletions = {
371 let mut deletions = remote_unlazy.stage(Vec::new()).to_lazy();
372
373 let mut deletion_creations = HashSet::new();
375 for id in db.local_metadata.ids() {
376 if remote.maybe_find(&id).is_none() && !links_to_delete.contains(&id) {
377 deletion_creations.insert(id);
378 }
379 }
380 'drain_creations: while !deletion_creations.is_empty() {
381 'choose_a_creation: for id in &deletion_creations {
382 let id = *id;
384 let local_file = local.find(&id)?.clone();
385 let result = deletions.create_unvalidated(
386 id,
387 symkey::generate_key(),
388 local_file.parent(),
389 &local.name(&id, &self.keychain)?,
390 local_file.file_type(),
391 &self.keychain,
392 );
393 match result {
394 Ok(_) => {
395 deletion_creations.remove(&id);
396 continue 'drain_creations;
397 }
398 Err(ref err) => match err.kind {
399 LbErrKind::FileParentNonexistent => {
400 continue 'choose_a_creation;
401 }
402 _ => {
403 result?;
404 }
405 },
406 }
407 }
408 return Err(LbErrKind::Unexpected(format!(
409 "sync failed to find a topomodelal order for file creations: {deletion_creations:?}"
410 ))
411 .into());
412 }
413
414 for id in db.local_metadata.ids() {
416 let local_file = local.find(&id)?.clone();
417 if let Some(base_file) = db.base_metadata.maybe_find(&id).cloned() {
418 if !local_file.explicitly_deleted()
419 && local_file.parent() != base_file.parent()
420 && !files_to_unmove.contains(&id)
421 {
422 deletions.move_unvalidated(
424 &id,
425 local_file.parent(),
426 &self.keychain,
427 )?;
428 }
429 }
430 }
431
432 for id in db.local_metadata.ids() {
434 let local_file = local.find(&id)?.clone();
435 if local_file.explicitly_deleted() {
436 deletions.delete_unvalidated(&id, &self.keychain)?;
438 }
439 }
440 deletions
441 };
442
443 let mut merge = {
445 let mut merge = remote_unlazy.stage(Vec::new()).to_lazy();
446
447 let mut creations = HashSet::new();
449 for id in db.local_metadata.ids() {
450 if deletions.maybe_find(&id).is_some()
451 && !deletions.calculate_deleted(&id)?
452 && remote.maybe_find(&id).is_none()
453 && !links_to_delete.contains(&id)
454 {
455 creations.insert(id);
456 }
457 }
458 'drain_creations: while !creations.is_empty() {
459 'choose_a_creation: for id in &creations {
460 let id = *id;
462 let local_file = local.find(&id)?.clone();
463 let result = merge.create_unvalidated(
464 id,
465 local.decrypt_key(&id, &self.keychain)?,
466 local_file.parent(),
467 &local.name(&id, &self.keychain)?,
468 local_file.file_type(),
469 &self.keychain,
470 );
471 match result {
472 Ok(_) => {
473 creations.remove(&id);
474 continue 'drain_creations;
475 }
476 Err(ref err) => match err.kind {
477 LbErrKind::FileParentNonexistent => {
478 continue 'choose_a_creation;
479 }
480 _ => {
481 result?;
482 }
483 },
484 }
485 }
486 return Err(LbErrKind::Unexpected(format!(
487 "sync failed to find a topomodelal order for file creations: {creations:?}"
488 ))
489 .into());
490 }
491
492 for id in db.local_metadata.ids() {
495 if deletions.maybe_find(&id).is_none()
497 || deletions.calculate_deleted(&id)?
498 || (remote.maybe_find(&id).is_some()
499 && remote.calculate_deleted(&id)?)
500 {
501 continue;
502 }
503
504 let local_file = local.find(&id)?.clone();
505 let local_name = local.name(&id, &self.keychain)?;
506 let maybe_base_file = base.maybe_find(&id).cloned();
507 let maybe_remote_file = remote.maybe_find(&id).cloned();
508 if let Some(ref base_file) = maybe_base_file {
509 let base_name = base.name(&id, &self.keychain)?;
510 let remote_file = remote.find(&id)?.clone();
511 let remote_name = remote.name(&id, &self.keychain)?;
512
513 if local_file.parent() != base_file.parent()
515 && remote_file.parent() == base_file.parent()
516 && !files_to_unmove.contains(&id)
517 {
518 merge.move_unvalidated(&id, local_file.parent(), &self.keychain)?;
519 }
520
521 if local_name != base_name && remote_name == base_name {
523 merge.rename_unvalidated(&id, &local_name, &self.keychain)?;
524 }
525 }
526
527 let mut remote_keys = HashMap::new();
529 if let Some(ref remote_file) = maybe_remote_file {
530 for key in remote_file.user_access_keys() {
531 remote_keys.insert(
532 (Owner(key.encrypted_by), Owner(key.encrypted_for)),
533 (key.mode, key.deleted),
534 );
535 }
536 }
537 for key in local_file.user_access_keys() {
538 let (by, for_) = (Owner(key.encrypted_by), Owner(key.encrypted_for));
539 if let Some(&(remote_mode, remote_deleted)) =
540 remote_keys.get(&(by, for_))
541 {
542 if key.mode > remote_mode || !key.deleted && remote_deleted {
544 let mode = match key.mode {
545 UserAccessMode::Read => ShareMode::Read,
546 UserAccessMode::Write => ShareMode::Write,
547 UserAccessMode::Owner => continue,
548 };
549 merge.add_share_unvalidated(id, for_, mode, &self.keychain)?;
550 }
551 if key.deleted && !remote_deleted {
553 merge.delete_share_unvalidated(
554 &id,
555 Some(for_.0),
556 &self.keychain,
557 )?;
558 }
559 } else {
560 let mode = match key.mode {
562 UserAccessMode::Read => ShareMode::Read,
563 UserAccessMode::Write => ShareMode::Write,
564 UserAccessMode::Owner => continue,
565 };
566 merge.add_share_unvalidated(id, for_, mode, &self.keychain)?;
567 }
568 }
569
570 if files_to_unshare.contains(&id) {
572 merge.delete_share_unvalidated(&id, None, &self.keychain)?;
573 }
574
575 if let Some(&rename_increment) = rename_increments.get(&id) {
577 let name = NameComponents::from(&local_name)
578 .generate_incremented(rename_increment)
579 .to_name();
580 merge.rename_unvalidated(&id, &name, &self.keychain)?;
581 }
582
583 let base_hmac = maybe_base_file.and_then(|f| f.document_hmac().cloned());
585 let remote_hmac =
586 maybe_remote_file.and_then(|f| f.document_hmac().cloned());
587 let local_hmac = local_file.document_hmac().cloned();
588 if merge.access_mode(me, &id)? >= Some(UserAccessMode::Write)
589 && local_hmac != base_hmac
590 {
591 if remote_hmac != base_hmac && remote_hmac != local_hmac {
592 let merge_name = merge.name(&id, &self.keychain)?;
594 let document_type =
595 DocumentType::from_file_name_using_extension(&merge_name);
596
597 let base_document =
600 self.read_document_helper(id, &mut base).await?;
601 let remote_document =
602 self.read_document_helper(id, &mut remote).await?;
603 let local_document =
604 self.read_document_helper(id, &mut local).await?;
605
606 match document_type {
607 DocumentType::Text => {
608 let base_document =
611 String::from_utf8_lossy(&base_document).to_string();
612 let remote_document =
613 String::from_utf8_lossy(&remote_document).to_string();
614 let local_document =
615 String::from_utf8_lossy(&local_document).to_string();
616 let merged_document =
617 text::buffer::Buffer::from(base_document.as_str())
618 .merge(local_document, remote_document);
619 let encrypted_document = merge
620 .update_document_unvalidated(
621 &id,
622 &merged_document.into_bytes(),
623 &self.keychain,
624 )?;
625 let hmac = merge.find(&id)?.document_hmac().copied();
626 self.docs.insert(id, hmac, &encrypted_document).await?;
627 }
628 DocumentType::Drawing => {
629 let base_document =
630 String::from_utf8_lossy(&base_document).to_string();
631 let remote_document =
632 String::from_utf8_lossy(&remote_document).to_string();
633 let local_document =
634 String::from_utf8_lossy(&local_document).to_string();
635
636 let base_buffer = svg::buffer::Buffer::new(&base_document);
637 let remote_buffer =
638 svg::buffer::Buffer::new(&remote_document);
639 let mut local_buffer =
640 svg::buffer::Buffer::new(&local_document);
641
642 for (_, el) in local_buffer.elements.iter_mut() {
643 if let Element::Path(path) = el {
644 path.data.apply_transform(u_transform_to_bezier(
645 &Transform::from(
646 local_buffer
647 .weak_viewport_settings
648 .master_transform,
649 ),
650 ));
651 }
652 }
653 svg::buffer::Buffer::reload(
654 &mut local_buffer.elements,
655 &mut local_buffer.weak_images,
656 &mut local_buffer.weak_path_pressures,
657 &mut local_buffer.weak_viewport_settings,
658 &base_buffer,
659 &remote_buffer,
660 );
661
662 let merged_document = local_buffer.serialize();
663 let encrypted_document = merge
664 .update_document_unvalidated(
665 &id,
666 &merged_document.into_bytes(),
667 &self.keychain,
668 )?;
669 let hmac = merge.find(&id)?.document_hmac().copied();
670 self.docs.insert(id, hmac, &encrypted_document).await?;
671 }
672 DocumentType::Other => {
673 let merge_parent = *merge.find(&id)?.parent();
675 let duplicate_id = if let Some(&duplicate_id) =
676 duplicate_file_ids.get(&id)
677 {
678 duplicate_id
679 } else {
680 let duplicate_id = Uuid::new_v4();
681 duplicate_file_ids.insert(id, duplicate_id);
682 rename_increments.insert(duplicate_id, 1);
683 duplicate_id
684 };
685
686 let mut merge_name = merge_name;
687 merge_name = NameComponents::from(&merge_name)
688 .generate_incremented(
689 rename_increments
690 .get(&duplicate_id)
691 .copied()
692 .unwrap_or_default(),
693 )
694 .to_name();
695
696 merge.create_unvalidated(
697 duplicate_id,
698 symkey::generate_key(),
699 &merge_parent,
700 &merge_name,
701 FileType::Document,
702 &self.keychain,
703 )?;
704 let encrypted_document = merge
705 .update_document_unvalidated(
706 &duplicate_id,
707 &local_document,
708 &self.keychain,
709 )?;
710 let duplicate_hmac =
711 merge.find(&duplicate_id)?.document_hmac().copied();
712 self.docs
713 .insert(
714 duplicate_id,
715 duplicate_hmac,
716 &encrypted_document,
717 )
718 .await?;
719 }
720 }
721 } else {
722 let document = self.read_document_helper(id, &mut local).await?;
724 merge.update_document_unvalidated(
725 &id,
726 &document,
727 &self.keychain,
728 )?;
729 }
730 }
731 }
732
733 for id in db.local_metadata.ids() {
736 if db.base_metadata.maybe_find(&id).is_some()
737 && deletions.calculate_deleted(&id)?
738 && !merge.calculate_deleted(&id)?
739 {
740 merge.delete_unvalidated(&id, &self.keychain)?;
742 }
743 }
744 for &id in &links_to_delete {
745 if merge.maybe_find(&id).is_some() && !merge.calculate_deleted(&id)? {
747 merge.delete_unvalidated(&id, &self.keychain)?;
748 }
749 }
750
751 merge
752 };
753
754 for link in merge.ids() {
756 if !merge.calculate_deleted(&link)? {
757 if let FileType::Link { target } = merge.find(&link)?.file_type() {
758 if merge.maybe_find(&target).is_some()
759 && merge.calculate_deleted(&target)?
760 {
761 if links_to_delete.insert(link) {
763 continue 'merge_construction;
764 } else {
765 return Err(LbErrKind::Unexpected(format!(
766 "sync failed to resolve broken link (deletion): {link:?}"
767 ))
768 .into());
769 }
770 }
771 }
772 }
773 }
774
775 let validate_result = merge.validate(me);
776 match validate_result {
777 Ok(_) => {
779 let (_, merge_changes) = merge.unstage();
780 break merge_changes;
781 }
782 Err(ref err) => match err.kind {
783 LbErrKind::Validation(ref vf) => match vf {
784 ValidationFailure::Cycle(ids) => {
786 let mut progress = false;
788 for &id in ids {
789 if db.local_metadata.maybe_find(&id).is_some()
790 && files_to_unmove.insert(id)
791 {
792 progress = true;
793 }
794 }
795 if !progress {
796 return Err(LbErrKind::Unexpected(format!(
797 "sync failed to resolve cycle: {ids:?}"
798 ))
799 .into());
800 }
801 }
802 ValidationFailure::PathConflict(ids) => {
803 let mut progress = false;
805 for &id in ids {
806 if duplicate_file_ids.values().any(|&dup| dup == id) {
807 *rename_increments.entry(id).or_insert(0) += 1;
808 progress = true;
809 break;
810 }
811 }
812 if !progress {
813 for &id in ids {
814 if db.local_metadata.maybe_find(&id).is_some() {
815 *rename_increments.entry(id).or_insert(0) += 1;
816 progress = true;
817 break;
818 }
819 }
820 }
821 if !progress {
822 return Err(LbErrKind::Unexpected(format!(
823 "sync failed to resolve path conflict: {ids:?}"
824 ))
825 .into());
826 }
827 }
828 ValidationFailure::SharedLink { link, shared_ancestor } => {
829 let mut progress = false;
831 if let Some(base_shared_ancestor) = base.maybe_find(shared_ancestor)
832 {
833 if !base_shared_ancestor.is_shared()
834 && files_to_unshare.insert(*shared_ancestor)
835 {
836 progress = true;
837 }
838 }
839 if !progress && links_to_delete.insert(*link) {
840 progress = true;
841 }
842 if !progress {
843 return Err(LbErrKind::Unexpected(format!(
844 "sync failed to resolve shared link: link: {link:?}, shared_ancestor: {shared_ancestor:?}"
845 )).into());
846 }
847 }
848 ValidationFailure::DuplicateLink { target } => {
849 let mut progress = false;
851 if let Some(link) = local.linked_by(target)? {
852 if links_to_delete.insert(link) {
853 progress = true;
854 }
855 }
856 if !progress {
857 return Err(LbErrKind::Unexpected(format!(
858 "sync failed to resolve duplicate link: target: {target:?}"
859 ))
860 .into());
861 }
862 }
863 ValidationFailure::BrokenLink(link) => {
864 if !links_to_delete.insert(*link) {
866 return Err(LbErrKind::Unexpected(format!(
867 "sync failed to resolve broken link: {link:?}"
868 ))
869 .into());
870 }
871 }
872 ValidationFailure::OwnedLink(link) => {
873 let mut progress = false;
875 if let Some(remote_link) = remote.maybe_find(link) {
876 if let FileType::Link { target } = remote_link.file_type() {
877 let remote_target = remote.find(&target)?;
878 if remote_target.owner() != me
879 && files_to_unmove.insert(target)
880 {
881 progress = true;
882 }
883 }
884 }
885 if !progress && links_to_delete.insert(*link) {
886 progress = true;
887 }
888 if !progress {
889 return Err(LbErrKind::Unexpected(format!(
890 "sync failed to resolve owned link: {link:?}"
891 ))
892 .into());
893 }
894 }
895 ValidationFailure::Orphan(_)
897 | ValidationFailure::NonFolderWithChildren(_)
898 | ValidationFailure::FileWithDifferentOwnerParent(_)
899 | ValidationFailure::FileNameTooLong(_)
900 | ValidationFailure::DeletedFileUpdated(_)
901 | ValidationFailure::NonDecryptableFileName(_) => {
902 validate_result?;
903 }
904 },
905 _ => {
907 validate_result?;
908 }
909 },
910 }
911 }
912 };
913
914 (&mut db.base_metadata)
916 .to_staged(remote_changes.clone())
917 .to_lazy()
918 .promote()?;
919 db.local_metadata.clear()?;
920 (&mut db.local_metadata)
921 .to_staged(merge_changes)
922 .to_lazy()
923 .promote()?;
924
925 db.base_metadata.stage(&mut db.local_metadata).prune()?;
928
929 if start.elapsed() > web_time::Duration::from_millis(100) {
930 warn!("sync merge held lock for {:?}", start.elapsed());
931 }
932
933 Ok(())
934 }
935
    /// Emits events describing what this sync pulled.
    ///
    /// Emission order is part of the observable behavior: `signed_in` first
    /// (when a root was discovered), then metadata/share notifications, then
    /// one `doc_written` per pulled document.
    async fn send_pull_events(&self, state: &mut SyncState) -> LbResult<()> {
        // A newly discovered root means this was the account's first sync.
        if state.new_root.is_some() {
            self.events.signed_in();
        }

        if !state.remote_changes.is_empty() {
            self.events.meta_changed(Actor::Sync);

            // Changes owned by someone else may carry new incoming shares.
            let owner = Owner(self.keychain.get_pk()?);
            if state.remote_changes.iter().any(|f| f.owner() != owner) {
                self.events.pending_shares_changed();
            }
        }

        for &doc in &state.pulled_docs {
            self.events.doc_written(doc, Actor::Sync);
        }

        Ok(())
    }
956
    /// Persists the server version watermark as the new sync point, plus the
    /// root id when this sync discovered it for the first time.
    async fn commit_last_synced(&self, state: &mut SyncState) -> LbResult<()> {
        let mut tx = self.begin_tx().await;
        let db = tx.db();
        db.last_synced.insert(state.updates_as_of as i64)?;

        if let Some(root) = state.new_root {
            db.root.insert(root)?;
        }

        Ok(())
    }
968
969 async fn populate_pk_cache(&self) -> LbResult<()> {
970 let mut missing_owners = HashSet::new();
972 {
973 let tx = self.ro_tx().await;
974 let db = tx.db();
975 for file in db.base_metadata.get().values() {
976 for user_access_key in file.user_access_keys() {
977 let enc_by = Owner(user_access_key.encrypted_by);
978 let enc_for = Owner(user_access_key.encrypted_for);
979
980 if !db.pub_key_lookup.get().contains_key(&enc_by) {
981 missing_owners.insert(enc_by);
982 }
983
984 if !db.pub_key_lookup.get().contains_key(&enc_for) {
985 missing_owners.insert(enc_for);
986 }
987 }
988 }
989 }
990
991 let mut new_owners = HashMap::new();
992 {
993 for owner in missing_owners {
994 let username_result = self
995 .client
996 .request(self.get_account().unwrap(), GetUsernameRequest { key: owner.0 })
997 .await;
998 new_owners.insert(owner, username_result);
999 }
1000 }
1001
1002 let mut tx = self.begin_tx().await;
1003 let db = tx.db();
1004
1005 let have_updates = !new_owners.is_empty();
1006 for (owner, username) in new_owners {
1007 let username = match username {
1008 Err(ApiError::Endpoint(GetUsernameError::UserNotFound)) => "<unknown>".to_string(),
1009 Ok(username) => username.username,
1010 _ => continue, };
1012
1013 db.pub_key_lookup.insert(owner, username).unwrap();
1014 }
1015
1016 if have_updates {
1017 self.events.meta_changed(Actor::Sync);
1018 }
1019
1020 Ok(())
1021 }
1022
1023 async fn push_meta(&self) -> LbResult<()> {
1025 let mut updates = vec![];
1026 let mut local_changes_no_digests = Vec::new();
1027
1028 let tx = self.ro_tx().await;
1029 let db = tx.db();
1030
1031 let local = db.base_metadata.stage(&db.local_metadata).to_lazy();
1033
1034 for id in local.tree.staged.ids() {
1035 let mut local_change = local.tree.staged.find(&id)?.timestamped_value.value.clone();
1036 let maybe_base_file = local.tree.base.maybe_find(&id);
1037
1038 local_change.set_hmac_and_size(
1040 maybe_base_file.and_then(|f| f.document_hmac().copied()),
1041 maybe_base_file.and_then(|f| *f.timestamped_value.value.doc_size()),
1042 );
1043 let local_change = local_change.sign(&self.keychain)?;
1044
1045 local_changes_no_digests.push(local_change.clone());
1046 let file_diff = FileDiff { old: maybe_base_file.cloned(), new: local_change };
1047 updates.push(file_diff);
1048 }
1049
1050 drop(tx);
1051
1052 if !updates.is_empty() {
1053 self.client
1054 .request(self.get_account()?, UpsertRequestV2 { updates: updates.clone() })
1055 .await?;
1056 }
1057
1058 let mut tx = self.begin_tx().await;
1059 let db = tx.db();
1060
1061 (&mut db.base_metadata)
1063 .to_lazy()
1064 .stage(local_changes_no_digests)
1065 .promote()?;
1066 db.base_metadata.stage(&mut db.local_metadata).prune()?;
1067
1068 tx.end();
1069
1070 Ok(())
1071 }
1072
1073 async fn push_docs(&self) -> LbResult<()> {
1078 let mut updates = vec![];
1079 let mut local_changes_digests_only = vec![];
1080
1081 let tx = self.ro_tx().await;
1082 let db = tx.db();
1083 let start = Instant::now();
1084
1085 let local = db.base_metadata.stage(&db.local_metadata).to_lazy();
1086
1087 for id in local.tree.staged.ids() {
1088 let base_file = local.tree.base.find(&id)?.clone();
1089
1090 let mut local_change = base_file.timestamped_value.value.clone();
1092 local_change.set_hmac_and_size(
1093 local.find(&id)?.document_hmac().copied(),
1094 *local.find(&id)?.timestamped_value.value.doc_size(),
1095 );
1096
1097 if base_file.document_hmac() == local_change.document_hmac()
1098 || local_change.document_hmac().is_none()
1099 {
1100 continue;
1101 }
1102
1103 let local_change = local_change.sign(&self.keychain)?;
1104
1105 updates.push(FileDiff { old: Some(base_file), new: local_change.clone() });
1106 local_changes_digests_only.push(local_change);
1107 self.events
1108 .sync_update(SyncIncrement::PushingDocument(id, true));
1109 }
1110
1111 drop(tx);
1112 if start.elapsed() > web_time::Duration::from_millis(100) {
1113 warn!("sync push_docs held lock for {:?}", start.elapsed());
1114 }
1115
1116 let futures = updates.clone().into_iter().map(|diff| self.push_doc(diff));
1117
1118 let mut stream = stream::iter(futures).buffer_unordered(
1119 thread::available_parallelism()
1120 .unwrap_or(NonZeroUsize::new(4).unwrap())
1121 .into(),
1122 );
1123
1124 let mut docs_without_errors = vec![];
1125 let mut last_error: Option<LbErr> = None;
1126
1127 while let Some(fut) = stream.next().await {
1128 match fut {
1129 Ok(id) => {
1130 docs_without_errors.push(id);
1131 self.events
1132 .sync_update(SyncIncrement::PushingDocument(id, false));
1133 }
1134 Err(err) => {
1135 last_error = Some(err);
1136 }
1137 }
1138 }
1139
1140 local_changes_digests_only.retain(|f| docs_without_errors.contains(f.id()));
1141
1142 let mut tx = self.begin_tx().await;
1143 let db = tx.db();
1144 (&mut db.base_metadata)
1146 .to_lazy()
1147 .stage(local_changes_digests_only)
1148 .promote()?;
1149
1150 db.base_metadata.stage(&mut db.local_metadata).prune()?;
1151
1152 tx.end();
1153
1154 if let Some(err) = last_error { Err(err) } else { Ok(()) }
1155 }
1156
1157 async fn push_doc(&self, diff: FileDiff<SignedMeta>) -> LbResult<Uuid> {
1158 let id = *diff.new.id();
1159 let hmac = diff.new.document_hmac();
1160 let local_document_change = self.docs.get(id, hmac.copied()).await?;
1161 self.client
1162 .request(
1163 self.get_account()?,
1164 ChangeDocRequestV2 { diff, new_content: local_document_change },
1165 )
1166 .await?;
1167
1168 Ok(id)
1169 }
1170
1171 #[cfg(not(target_family = "wasm"))]
1172 async fn send_debug_info(&self, account: Account) {
1173 let debug_info = self
1174 .debug_info("none provided - sync".to_string(), false)
1175 .await
1176 .unwrap();
1177
1178 if self.config.background_work {
1179 let bg_self = self.clone();
1180 tokio::spawn(async move {
1181 bg_self
1182 .client
1183 .request(&account, UpsertDebugInfoRequest { debug_info })
1184 .await
1185 .log_and_ignore();
1186 });
1187 } else {
1188 self.client
1189 .request(&account, UpsertDebugInfoRequest { debug_info })
1190 .await
1191 .log_and_ignore();
1192 }
1193 }
1194
    /// Reads and decrypts document `id` as seen by `tree`.
    ///
    /// Errors if `id` is not a document or is deleted in `tree`. A document
    /// with no hmac has no stored content and is treated as empty.
    async fn read_document_helper<T>(
        &self, id: Uuid, tree: &mut LazyTree<T>,
    ) -> LbResult<DecryptedDocument>
    where
        T: TreeLike<F = SignedMeta>,
    {
        let file = tree.find(&id)?;
        validate::is_document(file)?;
        let hmac = file.document_hmac().copied();

        if tree.calculate_deleted(&id)? {
            return Err(LbErrKind::FileNonexistent.into());
        }

        let doc = match hmac {
            Some(hmac) => {
                let doc = self.docs.get(id, Some(hmac)).await?;
                tree.decrypt_document(&id, &doc, &self.keychain)?
            }
            // No hmac: content was never written; read as empty.
            None => vec![],
        };

        Ok(doc)
    }
1219
1220 #[doc(hidden)]
1222 pub async fn server_dirty_ids(&self) -> LbResult<Vec<Uuid>> {
1223 let mut state = self.syncer.lock().await;
1224 self.inital_sync_state(&mut state).await?;
1225 self.process_deletions().await?;
1226 self.fetch_meta(&mut state).await?;
1227
1228 let server_ids = state.remote_changes.iter().map(|f| *f.id()).collect();
1229
1230 Ok(server_ids)
1231 }
1232
1233 pub(crate) fn setup_syncer(&self) {
1234 if self.config.background_work {
1235 self.clone().local_change_worker();
1236 self.clone().periodic_sync_worker();
1237 self.clone().post_sync_worker();
1238 }
1239 }
1240
1241 fn local_change_worker(self) {
1242 #[cfg(not(target_family = "wasm"))]
1243 tokio::spawn(async move {
1244 let mut events = self.subscribe();
1245
1246 let sync_criteria = |e: Event| {
1247 matches!(
1248 e,
1249 Event::MetadataChanged(Actor::User) | Event::DocumentWritten(_, Actor::User)
1250 )
1251 };
1252
1253 loop {
1254 time::sleep(Duration::from_millis(500)).await;
1255 let mut should_sync = false;
1256
1257 loop {
1259 let event = events.try_recv();
1260 match event {
1261 Ok(event) => {
1262 if sync_criteria(event) {
1263 should_sync = true;
1264 }
1265 }
1266 Err(TryRecvError::Empty) => break,
1267 _ => {
1268 panic!(
1269 "unexpected broadcast receive error, returning local_change_worker"
1270 );
1271 }
1272 }
1273 }
1274
1275 if !should_sync {
1278 let event = events.recv().await.unwrap();
1279 if sync_criteria(event) {
1280 self.sync().await.map_unexpected().log_and_ignore();
1281 } else {
1282 continue;
1283 }
1284 }
1285 }
1286 });
1287 }
1288
1289 fn periodic_sync_worker(self) {
1290 #[cfg(not(target_family = "wasm"))]
1291 tokio::spawn(async move {
1292 loop {
1293 self.sync().await.map_unexpected().log_and_ignore();
1294 if self.user_active().await {
1295 tokio::time::sleep(Duration::from_secs(3)).await;
1296 } else {
1297 tokio::time::sleep(Duration::from_secs(5 * 60)).await;
1298 }
1299 }
1300 });
1301 }
1302
1303 async fn user_active(&self) -> bool {
1304 let last_seen = self.user_last_seen.read().await;
1305 last_seen.elapsed() < Duration::from_secs(3 * 60)
1306 }
1307
    /// Worker that reacts to each completed sync: prefetches selected
    /// documents (`fetcher`) and refreshes the public-key cache.
    fn post_sync_worker(self) {
        #[cfg(not(target_family = "wasm"))]
        tokio::spawn(async move {
            let mut events = self.subscribe();

            loop {
                // NOTE(review): recv().unwrap() panics if the sender closes or
                // this receiver lags — confirm that's acceptable for this worker.
                let event = events.recv().await.unwrap();
                if let Event::Sync(SyncIncrement::SyncFinished(_)) = event {
                    self.fetcher().await.map_unexpected().log_and_ignore();
                    self.populate_pk_cache()
                        .await
                        .map_unexpected()
                        .log_and_ignore();
                };
            }
        });
    }
1325
    /// Prefetches document contents for all live `.md` and `.svg` files under
    /// the root, so they are already cached when opened.
    async fn fetcher(&self) -> LbResult<()> {
        let mut files_to_pull = vec![];

        let tx = self.ro_tx().await;
        let db = tx.db();

        // No root yet (account not fully synced): nothing to prefetch.
        let Some(root) = db.root.get() else {
            return Ok(());
        };

        let mut tree = db.base_metadata.stage(None).to_lazy();

        for id in tree.descendants_using_links(root)? {
            let file = tree.find(&id)?;
            let hmac = file.document_hmac().copied();

            if !file.is_document() {
                continue;
            }

            if tree.calculate_deleted(&id)? {
                continue;
            }

            // Only markdown and drawings are prefetched.
            let name = tree.name(&id, &self.keychain)?;
            if !name.ends_with(".md") && !name.ends_with(".svg") {
                continue;
            }

            files_to_pull.push((id, hmac));
        }

        drop(tx);

        // Fetch outside the transaction; files without an hmac have no
        // stored content to pull.
        for (id, hmac) in files_to_pull {
            if let Some(hmac) = hmac {
                self.fetch_doc(id, hmac).await?;
            }
        }

        Ok(())
    }
1375}