1use crate::Lb;
2use crate::io::network::ApiError;
3use crate::model::access_info::UserAccessMode;
4use crate::model::api::{
5 ChangeDocRequestV2, GetDocRequest, GetFileIdsRequest, GetUpdatesRequestV2,
6 GetUpdatesResponseV2, GetUsernameError, GetUsernameRequest, UpsertRequestV2,
7};
8use crate::model::errors::{LbErrKind, LbResult};
9use crate::model::file::ShareMode;
10use crate::model::file_like::FileLike;
11use crate::model::file_metadata::{DocumentHmac, FileDiff, FileType, Owner};
12use crate::model::filename::{DocumentType, NameComponents};
13use crate::model::signed_meta::SignedMeta;
14use crate::model::staged::StagedTreeLikeMut;
15use crate::model::svg::buffer::u_transform_to_bezier;
16use crate::model::svg::element::Element;
17use crate::model::text::buffer::Buffer;
18use crate::model::tree_like::TreeLike;
19use crate::model::work_unit::WorkUnit;
20use crate::model::{ValidationFailure, clock, svg, symkey};
21pub use basic_human_duration::ChronoHumanDuration;
22use futures::{StreamExt, stream};
23use serde::Serialize;
24use std::collections::{HashMap, HashSet, hash_map};
25use std::fmt::{Display, Formatter};
26use std::num::NonZeroUsize;
27use std::sync::Arc;
28use std::sync::atomic::{AtomicBool, Ordering};
29use std::thread;
30use time::Duration;
31use usvg::Transform;
32use uuid::Uuid;
33use web_time::Instant;
34
35use super::events::Actor;
36
37#[cfg(not(target_family = "wasm"))]
38use crate::model::api::UpsertDebugInfoRequest;
39
40pub type SyncFlag = Arc<AtomicBool>;
41
42pub struct SyncContext {
43 progress: Option<Box<dyn Fn(SyncProgress) + Send>>,
44 current: usize,
45 total: usize,
46
47 pk_cache: HashMap<Owner, String>,
48 last_synced: u64,
49 remote_changes: Vec<SignedMeta>,
50 update_as_of: u64,
51
52 new_root: Option<Uuid>,
54 pushed_metas: Vec<FileDiff<SignedMeta>>,
55 pushed_docs: Vec<FileDiff<SignedMeta>>,
56 pulled_docs: Vec<Uuid>,
57}
58
59impl Lb {
60 #[instrument(level = "debug", skip_all, err(Debug))]
61 pub async fn calculate_work(&self) -> LbResult<SyncStatus> {
62 let tx = self.ro_tx().await;
63 let db = tx.db();
64 let last_synced = db.last_synced.get().copied().unwrap_or_default() as u64;
65 drop(tx);
66
67 let remote_changes = self
68 .client
69 .request(
70 self.get_account()?,
71 GetUpdatesRequestV2 { since_metadata_version: last_synced },
72 )
73 .await?;
74 let (deduped, latest_server_ts, _) = self.dedup(remote_changes).await?;
75 let remote_dirty = deduped
76 .into_iter()
77 .map(|f| *f.id())
78 .map(WorkUnit::ServerChange);
79
80 self.prune().await?;
81
82 let tx = self.ro_tx().await;
83 let db = tx.db();
84
85 let locally_dirty = db
86 .local_metadata
87 .get()
88 .keys()
89 .copied()
90 .map(WorkUnit::LocalChange);
91
92 let mut work_units: Vec<WorkUnit> = Vec::new();
93 work_units.extend(locally_dirty.chain(remote_dirty));
94 Ok(SyncStatus { work_units, latest_server_ts })
95 }
96
97 #[instrument(level = "debug", skip_all, err(Debug))]
98 pub async fn sync(&self, f: Option<Box<dyn Fn(SyncProgress) + Send>>) -> LbResult<SyncStatus> {
99 let old = self.syncing.swap(true, Ordering::SeqCst);
100 if old {
101 return Err(LbErrKind::AlreadySyncing.into());
102 }
103
104 let mut ctx = self.setup_sync(f).await?;
105
106 let mut got_updates = false;
107 let mut pipeline: LbResult<()> = async {
108 ctx.msg("Preparing Sync..."); self.events.sync(SyncIncrement::SyncStarted);
110 self.prune().await?;
111 got_updates = self.fetch_meta(&mut ctx).await?;
112 self.populate_pk_cache(&mut ctx).await?;
113 self.docs.dont_delete.store(true, Ordering::SeqCst);
114 self.fetch_docs(&mut ctx).await?;
115 self.merge(&mut ctx).await?;
116 self.push_meta(&mut ctx).await?;
117 self.push_docs(&mut ctx).await?;
118 Ok(())
119 }
120 .await;
121
122 self.docs.dont_delete.store(false, Ordering::SeqCst);
123
124 if pipeline.is_ok() {
125 pipeline = self.commit_last_synced(&mut ctx).await;
126 }
127
128 let cleanup = self.cleanup().await;
129
130 let ekind = pipeline.as_ref().err().map(|err| err.kind.clone());
131 self.events.sync(SyncIncrement::SyncFinished(ekind));
132
133 self.syncing.store(false, Ordering::Relaxed);
134 pipeline?;
135 cleanup?;
136
137 ctx.done_msg();
139
140 if got_updates {
141 self.events.meta_changed();
143 let owner = self.keychain.get_pk().map(Owner).ok();
144 if ctx.remote_changes.iter().any(|f| Some(f.owner()) != owner) {
147 self.events.pending_shares_changed();
148 }
149 for id in &ctx.pulled_docs {
150 self.events.doc_written(*id, Some(Actor::Sync));
151 }
152 }
153
154 #[cfg(not(target_family = "wasm"))]
155 {
156 let account = self.get_account()?;
157 if account.is_beta() {
158 let debug_info = self.debug_info("".into(), false).await?;
159
160 self.client
161 .request(account, UpsertDebugInfoRequest { debug_info })
162 .await?;
163 }
164 }
165
166 Ok(ctx.summarize())
167 }
168
169 async fn setup_sync(
170 &self, progress: Option<Box<dyn Fn(SyncProgress) + Send>>,
171 ) -> LbResult<SyncContext> {
172 let tx = self.ro_tx().await;
173 let db = tx.db();
174
175 let last_synced = db.last_synced.get().copied().unwrap_or_default() as u64;
176 let pk_cache = db.pub_key_lookup.get().clone();
177
178 let current = 0;
179 let total = 7;
180
181 Ok(SyncContext {
182 last_synced,
183 pk_cache,
184
185 progress,
186 current,
187 total,
188
189 new_root: Default::default(),
190 update_as_of: Default::default(),
191 remote_changes: Default::default(),
192 pushed_docs: Default::default(),
193 pushed_metas: Default::default(),
194 pulled_docs: Default::default(),
195 })
196 }
197
198 async fn prune(&self) -> LbResult<()> {
199 let server_ids = self
200 .client
201 .request(self.get_account()?, GetFileIdsRequest {})
202 .await?
203 .ids;
204
205 let mut tx = self.begin_tx().await;
206 let db = tx.db();
207
208 let mut local = db.base_metadata.stage(&db.local_metadata).to_lazy();
209 let base_ids = local.tree.base.ids();
210
211 let mut prunable_ids = base_ids;
212 prunable_ids.retain(|id| !server_ids.contains(id));
213 for id in prunable_ids.clone() {
214 prunable_ids.extend(local.descendants(&id)?.into_iter());
215 }
216 for id in &prunable_ids {
217 if let Some(base_file) = local.tree.base.maybe_find(id) {
218 self.docs
219 .delete(*id, base_file.document_hmac().copied())
220 .await?;
221 }
222 if let Some(local_file) = local.maybe_find(id) {
223 self.docs
224 .delete(*id, local_file.document_hmac().copied())
225 .await?;
226 }
227 }
228
229 let mut base_staged = (&mut db.base_metadata).to_lazy().stage(None);
230 base_staged.tree.removed = prunable_ids.clone().into_iter().collect();
231 base_staged.promote()?;
232
233 let mut local_staged = (&mut db.local_metadata).to_lazy().stage(None);
234 local_staged.tree.removed = prunable_ids.into_iter().collect();
235 local_staged.promote()?;
236
237 Ok(())
238 }
239
240 async fn fetch_meta(&self, ctx: &mut SyncContext) -> LbResult<bool> {
242 ctx.msg("Fetching tree updates...");
243 let updates = self
244 .client
245 .request(
246 self.get_account()?,
247 GetUpdatesRequestV2 { since_metadata_version: ctx.last_synced },
248 )
249 .await?;
250
251 let empty = updates.file_metadata.is_empty();
252 let (remote, as_of, root) = self.dedup(updates).await?;
253
254 ctx.remote_changes = remote;
255 ctx.update_as_of = as_of;
256 ctx.new_root = root;
257
258 Ok(!empty)
259 }
260
261 async fn populate_pk_cache(&self, ctx: &mut SyncContext) -> LbResult<()> {
262 ctx.msg("Updating public key cache...");
263 let mut all_owners = HashSet::new();
264 for file in &ctx.remote_changes {
265 for user_access_key in file.user_access_keys() {
266 all_owners.insert(Owner(user_access_key.encrypted_by));
267 all_owners.insert(Owner(user_access_key.encrypted_for));
268 }
269 }
270
271 let mut new_entries = HashMap::new();
272
273 for owner in all_owners {
274 if let hash_map::Entry::Vacant(e) = ctx.pk_cache.entry(owner) {
275 let username_result = self
276 .client
277 .request(self.get_account()?, GetUsernameRequest { key: owner.0 })
278 .await;
279 let username = match username_result {
280 Err(ApiError::Endpoint(GetUsernameError::UserNotFound)) => {
281 "<unknown>".to_string()
282 }
283 _ => username_result?.username.clone(),
284 };
285 new_entries.insert(owner, username.clone());
286 e.insert(username.clone());
287 }
288 }
289
290 let mut tx = self.begin_tx().await;
291 let db = tx.db();
292
293 for (owner, username) in new_entries {
294 db.pub_key_lookup.insert(owner, username)?;
295 }
296 Ok(())
297 }
298
299 async fn fetch_docs(&self, ctx: &mut SyncContext) -> LbResult<()> {
300 ctx.msg("Fetching documents...");
301 let mut docs_to_pull = vec![];
302
303 let tx = self.ro_tx().await;
304 let db = tx.db();
305 let start = Instant::now();
306
307 let mut remote = db.base_metadata.stage(ctx.remote_changes.clone()).to_lazy(); for id in remote.tree.staged.ids() {
309 if remote.calculate_deleted(&id)? {
310 continue;
311 }
312 let remote_hmac = remote.find(&id)?.document_hmac().cloned();
313 let base_hmac = remote
314 .tree
315 .base
316 .maybe_find(&id)
317 .and_then(|f| f.document_hmac())
318 .cloned();
319 if base_hmac == remote_hmac {
320 continue;
321 }
322
323 if let Some(remote_hmac) = remote_hmac {
324 docs_to_pull.push((id, remote_hmac));
325 self.events.sync(SyncIncrement::PullingDocument(id, true));
326 }
327 }
328
329 drop(tx);
330 if start.elapsed() > web_time::Duration::from_millis(100) {
331 warn!("sync fetch_docs held lock for {:?}", start.elapsed());
332 }
333
334 let num_docs = docs_to_pull.len();
335 ctx.total += num_docs;
336
337 let futures = docs_to_pull
338 .into_iter()
339 .map(|(id, hmac)| self.fetch_doc(id, hmac));
340
341 let mut stream = stream::iter(futures).buffer_unordered(
342 thread::available_parallelism()
343 .unwrap_or(NonZeroUsize::new(4).unwrap())
344 .into(),
345 );
346
347 let mut idx = 0;
348 while let Some(fut) = stream.next().await {
349 let id = fut?;
350 ctx.pulled_docs.push(id);
351 self.events.sync(SyncIncrement::PullingDocument(id, false));
352 ctx.file_msg(id, &format!("Downloaded file {idx} of {num_docs}."));
353 idx += 1;
354 }
355 Ok(())
356 }
357
358 async fn fetch_doc(&self, id: Uuid, hmac: DocumentHmac) -> LbResult<Uuid> {
359 let remote_document = self
360 .client
361 .request(self.get_account()?, GetDocRequest { id, hmac })
362 .await?;
363 self.docs
364 .insert(id, Some(hmac), &remote_document.content)
365 .await?;
366
367 Ok(id)
368 }
369
370 async fn merge(&self, ctx: &mut SyncContext) -> LbResult<()> {
373 let mut tx = self.begin_tx().await;
374 let db = tx.db();
375 let start = Instant::now();
376
377 let remote_changes = &ctx.remote_changes;
378
379 let me = Owner(self.keychain.get_pk()?);
381
382 let merge_changes = {
384 let mut base = (&db.base_metadata).to_lazy();
386 let remote_unlazy = (&db.base_metadata).to_staged(remote_changes);
387 let mut remote = remote_unlazy.as_lazy();
388 let mut local = (&db.base_metadata).to_staged(&db.local_metadata).to_lazy();
389
390 let mut files_to_unmove: HashSet<Uuid> = HashSet::new();
392 let mut files_to_unshare: HashSet<Uuid> = HashSet::new();
393 let mut links_to_delete: HashSet<Uuid> = HashSet::new();
394 let mut rename_increments: HashMap<Uuid, usize> = HashMap::new();
395 let mut duplicate_file_ids: HashMap<Uuid, Uuid> = HashMap::new();
396
397 'merge_construction: loop {
398 let mut deletions = {
400 let mut deletions = remote_unlazy.stage(Vec::new()).to_lazy();
401
402 let mut deletion_creations = HashSet::new();
404 for id in db.local_metadata.ids() {
405 if remote.maybe_find(&id).is_none() && !links_to_delete.contains(&id) {
406 deletion_creations.insert(id);
407 }
408 }
409 'drain_creations: while !deletion_creations.is_empty() {
410 'choose_a_creation: for id in &deletion_creations {
411 let id = *id;
413 let local_file = local.find(&id)?.clone();
414 let result = deletions.create_unvalidated(
415 id,
416 symkey::generate_key(),
417 local_file.parent(),
418 &local.name(&id, &self.keychain)?,
419 local_file.file_type(),
420 &self.keychain,
421 );
422 match result {
423 Ok(_) => {
424 deletion_creations.remove(&id);
425 continue 'drain_creations;
426 }
427 Err(ref err) => match err.kind {
428 LbErrKind::FileParentNonexistent => {
429 continue 'choose_a_creation;
430 }
431 _ => {
432 result?;
433 }
434 },
435 }
436 }
437 return Err(LbErrKind::Unexpected(format!(
438 "sync failed to find a topomodelal order for file creations: {deletion_creations:?}"
439 ))
440 .into());
441 }
442
443 for id in db.local_metadata.ids() {
445 let local_file = local.find(&id)?.clone();
446 if let Some(base_file) = db.base_metadata.maybe_find(&id).cloned() {
447 if !local_file.explicitly_deleted()
448 && local_file.parent() != base_file.parent()
449 && !files_to_unmove.contains(&id)
450 {
451 deletions.move_unvalidated(
453 &id,
454 local_file.parent(),
455 &self.keychain,
456 )?;
457 }
458 }
459 }
460
461 for id in db.local_metadata.ids() {
463 let local_file = local.find(&id)?.clone();
464 if local_file.explicitly_deleted() {
465 deletions.delete_unvalidated(&id, &self.keychain)?;
467 }
468 }
469 deletions
470 };
471
472 let mut merge = {
474 let mut merge = remote_unlazy.stage(Vec::new()).to_lazy();
475
476 let mut creations = HashSet::new();
478 for id in db.local_metadata.ids() {
479 if deletions.maybe_find(&id).is_some()
480 && !deletions.calculate_deleted(&id)?
481 && remote.maybe_find(&id).is_none()
482 && !links_to_delete.contains(&id)
483 {
484 creations.insert(id);
485 }
486 }
487 'drain_creations: while !creations.is_empty() {
488 'choose_a_creation: for id in &creations {
489 let id = *id;
491 let local_file = local.find(&id)?.clone();
492 let result = merge.create_unvalidated(
493 id,
494 local.decrypt_key(&id, &self.keychain)?,
495 local_file.parent(),
496 &local.name(&id, &self.keychain)?,
497 local_file.file_type(),
498 &self.keychain,
499 );
500 match result {
501 Ok(_) => {
502 creations.remove(&id);
503 continue 'drain_creations;
504 }
505 Err(ref err) => match err.kind {
506 LbErrKind::FileParentNonexistent => {
507 continue 'choose_a_creation;
508 }
509 _ => {
510 result?;
511 }
512 },
513 }
514 }
515 return Err(LbErrKind::Unexpected(format!(
516 "sync failed to find a topomodelal order for file creations: {creations:?}"
517 ))
518 .into());
519 }
520
521 for id in db.local_metadata.ids() {
524 if deletions.maybe_find(&id).is_none()
526 || deletions.calculate_deleted(&id)?
527 || (remote.maybe_find(&id).is_some()
528 && remote.calculate_deleted(&id)?)
529 {
530 continue;
531 }
532
533 let local_file = local.find(&id)?.clone();
534 let local_name = local.name(&id, &self.keychain)?;
535 let maybe_base_file = base.maybe_find(&id).cloned();
536 let maybe_remote_file = remote.maybe_find(&id).cloned();
537 if let Some(ref base_file) = maybe_base_file {
538 let base_name = base.name(&id, &self.keychain)?;
539 let remote_file = remote.find(&id)?.clone();
540 let remote_name = remote.name(&id, &self.keychain)?;
541
542 if local_file.parent() != base_file.parent()
544 && remote_file.parent() == base_file.parent()
545 && !files_to_unmove.contains(&id)
546 {
547 merge.move_unvalidated(&id, local_file.parent(), &self.keychain)?;
548 }
549
550 if local_name != base_name && remote_name == base_name {
552 merge.rename_unvalidated(&id, &local_name, &self.keychain)?;
553 }
554 }
555
556 let mut remote_keys = HashMap::new();
558 if let Some(ref remote_file) = maybe_remote_file {
559 for key in remote_file.user_access_keys() {
560 remote_keys.insert(
561 (Owner(key.encrypted_by), Owner(key.encrypted_for)),
562 (key.mode, key.deleted),
563 );
564 }
565 }
566 for key in local_file.user_access_keys() {
567 let (by, for_) = (Owner(key.encrypted_by), Owner(key.encrypted_for));
568 if let Some(&(remote_mode, remote_deleted)) =
569 remote_keys.get(&(by, for_))
570 {
571 if key.mode > remote_mode || !key.deleted && remote_deleted {
573 let mode = match key.mode {
574 UserAccessMode::Read => ShareMode::Read,
575 UserAccessMode::Write => ShareMode::Write,
576 UserAccessMode::Owner => continue,
577 };
578 merge.add_share_unvalidated(id, for_, mode, &self.keychain)?;
579 }
580 if key.deleted && !remote_deleted {
582 merge.delete_share_unvalidated(
583 &id,
584 Some(for_.0),
585 &self.keychain,
586 )?;
587 }
588 } else {
589 let mode = match key.mode {
591 UserAccessMode::Read => ShareMode::Read,
592 UserAccessMode::Write => ShareMode::Write,
593 UserAccessMode::Owner => continue,
594 };
595 merge.add_share_unvalidated(id, for_, mode, &self.keychain)?;
596 }
597 }
598
599 if files_to_unshare.contains(&id) {
601 merge.delete_share_unvalidated(&id, None, &self.keychain)?;
602 }
603
604 if let Some(&rename_increment) = rename_increments.get(&id) {
606 let name = NameComponents::from(&local_name)
607 .generate_incremented(rename_increment)
608 .to_name();
609 merge.rename_unvalidated(&id, &name, &self.keychain)?;
610 }
611
612 let base_hmac = maybe_base_file.and_then(|f| f.document_hmac().cloned());
614 let remote_hmac =
615 maybe_remote_file.and_then(|f| f.document_hmac().cloned());
616 let local_hmac = local_file.document_hmac().cloned();
617 if merge.access_mode(me, &id)? >= Some(UserAccessMode::Write)
618 && local_hmac != base_hmac
619 {
620 if remote_hmac != base_hmac && remote_hmac != local_hmac {
621 let merge_name = merge.name(&id, &self.keychain)?;
623 let document_type =
624 DocumentType::from_file_name_using_extension(&merge_name);
625
626 let base_document =
629 self.read_document_helper(id, &mut base).await?;
630 let remote_document =
631 self.read_document_helper(id, &mut remote).await?;
632 let local_document =
633 self.read_document_helper(id, &mut local).await?;
634
635 match document_type {
636 DocumentType::Text => {
637 let base_document =
640 String::from_utf8_lossy(&base_document).to_string();
641 let remote_document =
642 String::from_utf8_lossy(&remote_document).to_string();
643 let local_document =
644 String::from_utf8_lossy(&local_document).to_string();
645 let merged_document = Buffer::from(base_document.as_str())
646 .merge(local_document, remote_document);
647 let encrypted_document = merge
648 .update_document_unvalidated(
649 &id,
650 &merged_document.into_bytes(),
651 &self.keychain,
652 )?;
653 let hmac = merge.find(&id)?.document_hmac().copied();
654 self.docs.insert(id, hmac, &encrypted_document).await?;
655 }
656 DocumentType::Drawing => {
657 let base_document =
658 String::from_utf8_lossy(&base_document).to_string();
659 let remote_document =
660 String::from_utf8_lossy(&remote_document).to_string();
661 let local_document =
662 String::from_utf8_lossy(&local_document).to_string();
663
664 let base_buffer = svg::buffer::Buffer::new(&base_document);
665 let remote_buffer =
666 svg::buffer::Buffer::new(&remote_document);
667 let mut local_buffer =
668 svg::buffer::Buffer::new(&local_document);
669
670 for (_, el) in local_buffer.elements.iter_mut() {
671 if let Element::Path(path) = el {
672 path.data.apply_transform(u_transform_to_bezier(
673 &Transform::from(
674 local_buffer
675 .weak_viewport_settings
676 .master_transform,
677 ),
678 ));
679 }
680 }
681 svg::buffer::Buffer::reload(
682 &mut local_buffer.elements,
683 &mut local_buffer.weak_images,
684 &mut local_buffer.weak_path_pressures,
685 &mut local_buffer.weak_viewport_settings,
686 &base_buffer,
687 &remote_buffer,
688 );
689
690 let merged_document = local_buffer.serialize();
691 let encrypted_document = merge
692 .update_document_unvalidated(
693 &id,
694 &merged_document.into_bytes(),
695 &self.keychain,
696 )?;
697 let hmac = merge.find(&id)?.document_hmac().copied();
698 self.docs.insert(id, hmac, &encrypted_document).await?;
699 }
700 DocumentType::Other => {
701 let merge_parent = *merge.find(&id)?.parent();
703 let duplicate_id = if let Some(&duplicate_id) =
704 duplicate_file_ids.get(&id)
705 {
706 duplicate_id
707 } else {
708 let duplicate_id = Uuid::new_v4();
709 duplicate_file_ids.insert(id, duplicate_id);
710 rename_increments.insert(duplicate_id, 1);
711 duplicate_id
712 };
713
714 let mut merge_name = merge_name;
715 merge_name = NameComponents::from(&merge_name)
716 .generate_incremented(
717 rename_increments
718 .get(&duplicate_id)
719 .copied()
720 .unwrap_or_default(),
721 )
722 .to_name();
723
724 merge.create_unvalidated(
725 duplicate_id,
726 symkey::generate_key(),
727 &merge_parent,
728 &merge_name,
729 FileType::Document,
730 &self.keychain,
731 )?;
732 let encrypted_document = merge
733 .update_document_unvalidated(
734 &duplicate_id,
735 &local_document,
736 &self.keychain,
737 )?;
738 let duplicate_hmac =
739 merge.find(&duplicate_id)?.document_hmac().copied();
740 self.docs
741 .insert(
742 duplicate_id,
743 duplicate_hmac,
744 &encrypted_document,
745 )
746 .await?;
747 }
748 }
749 } else {
750 let document = self.read_document_helper(id, &mut local).await?;
752 merge.update_document_unvalidated(
753 &id,
754 &document,
755 &self.keychain,
756 )?;
757 }
758 }
759 }
760
761 for id in db.local_metadata.ids() {
764 if db.base_metadata.maybe_find(&id).is_some()
765 && deletions.calculate_deleted(&id)?
766 && !merge.calculate_deleted(&id)?
767 {
768 merge.delete_unvalidated(&id, &self.keychain)?;
770 }
771 }
772 for &id in &links_to_delete {
773 if merge.maybe_find(&id).is_some() && !merge.calculate_deleted(&id)? {
775 merge.delete_unvalidated(&id, &self.keychain)?;
776 }
777 }
778
779 merge
780 };
781
782 for link in merge.ids() {
784 if !merge.calculate_deleted(&link)? {
785 if let FileType::Link { target } = merge.find(&link)?.file_type() {
786 if merge.maybe_find(&target).is_some()
787 && merge.calculate_deleted(&target)?
788 {
789 if links_to_delete.insert(link) {
791 continue 'merge_construction;
792 } else {
793 return Err(LbErrKind::Unexpected(format!(
794 "sync failed to resolve broken link (deletion): {link:?}"
795 ))
796 .into());
797 }
798 }
799 }
800 }
801 }
802
803 let validate_result = merge.validate(me);
804 match validate_result {
805 Ok(_) => {
807 let (_, merge_changes) = merge.unstage();
808 break merge_changes;
809 }
810 Err(ref err) => match err.kind {
811 LbErrKind::Validation(ref vf) => match vf {
812 ValidationFailure::Cycle(ids) => {
814 let mut progress = false;
816 for &id in ids {
817 if db.local_metadata.maybe_find(&id).is_some()
818 && files_to_unmove.insert(id)
819 {
820 progress = true;
821 }
822 }
823 if !progress {
824 return Err(LbErrKind::Unexpected(format!(
825 "sync failed to resolve cycle: {ids:?}"
826 ))
827 .into());
828 }
829 }
830 ValidationFailure::PathConflict(ids) => {
831 let mut progress = false;
833 for &id in ids {
834 if duplicate_file_ids.values().any(|&dup| dup == id) {
835 *rename_increments.entry(id).or_insert(0) += 1;
836 progress = true;
837 break;
838 }
839 }
840 if !progress {
841 for &id in ids {
842 if db.local_metadata.maybe_find(&id).is_some() {
843 *rename_increments.entry(id).or_insert(0) += 1;
844 progress = true;
845 break;
846 }
847 }
848 }
849 if !progress {
850 return Err(LbErrKind::Unexpected(format!(
851 "sync failed to resolve path conflict: {ids:?}"
852 ))
853 .into());
854 }
855 }
856 ValidationFailure::SharedLink { link, shared_ancestor } => {
857 let mut progress = false;
859 if let Some(base_shared_ancestor) = base.maybe_find(shared_ancestor)
860 {
861 if !base_shared_ancestor.is_shared()
862 && files_to_unshare.insert(*shared_ancestor)
863 {
864 progress = true;
865 }
866 }
867 if !progress && links_to_delete.insert(*link) {
868 progress = true;
869 }
870 if !progress {
871 return Err(LbErrKind::Unexpected(format!(
872 "sync failed to resolve shared link: link: {link:?}, shared_ancestor: {shared_ancestor:?}"
873 )).into());
874 }
875 }
876 ValidationFailure::DuplicateLink { target } => {
877 let mut progress = false;
879 if let Some(link) = local.linked_by(target)? {
880 if links_to_delete.insert(link) {
881 progress = true;
882 }
883 }
884 if !progress {
885 return Err(LbErrKind::Unexpected(format!(
886 "sync failed to resolve duplicate link: target: {target:?}"
887 ))
888 .into());
889 }
890 }
891 ValidationFailure::BrokenLink(link) => {
892 if !links_to_delete.insert(*link) {
894 return Err(LbErrKind::Unexpected(format!(
895 "sync failed to resolve broken link: {link:?}"
896 ))
897 .into());
898 }
899 }
900 ValidationFailure::OwnedLink(link) => {
901 let mut progress = false;
903 if let Some(remote_link) = remote.maybe_find(link) {
904 if let FileType::Link { target } = remote_link.file_type() {
905 let remote_target = remote.find(&target)?;
906 if remote_target.owner() != me
907 && files_to_unmove.insert(target)
908 {
909 progress = true;
910 }
911 }
912 }
913 if !progress && links_to_delete.insert(*link) {
914 progress = true;
915 }
916 if !progress {
917 return Err(LbErrKind::Unexpected(format!(
918 "sync failed to resolve owned link: {link:?}"
919 ))
920 .into());
921 }
922 }
923 ValidationFailure::Orphan(_)
925 | ValidationFailure::NonFolderWithChildren(_)
926 | ValidationFailure::FileWithDifferentOwnerParent(_)
927 | ValidationFailure::FileNameTooLong(_)
928 | ValidationFailure::DeletedFileUpdated(_)
929 | ValidationFailure::NonDecryptableFileName(_) => {
930 validate_result?;
931 }
932 },
933 _ => {
935 validate_result?;
936 }
937 },
938 }
939 }
940 };
941
942 (&mut db.base_metadata)
944 .to_staged(remote_changes.clone())
945 .to_lazy()
946 .promote()?;
947 db.local_metadata.clear()?;
948 (&mut db.local_metadata)
949 .to_staged(merge_changes)
950 .to_lazy()
951 .promote()?;
952
953 db.base_metadata.stage(&mut db.local_metadata).prune()?;
956
957 if start.elapsed() > web_time::Duration::from_millis(100) {
958 warn!("sync merge held lock for {:?}", start.elapsed());
959 }
960
961 Ok(())
962 }
963
964 async fn push_meta(&self, ctx: &mut SyncContext) -> LbResult<()> {
966 ctx.msg("Pushing tree changes...");
967 let mut updates = vec![];
968 let mut local_changes_no_digests = Vec::new();
969
970 let tx = self.ro_tx().await;
971 let db = tx.db();
972
973 let local = db.base_metadata.stage(&db.local_metadata).to_lazy();
975
976 for id in local.tree.staged.ids() {
977 let mut local_change = local.tree.staged.find(&id)?.timestamped_value.value.clone();
978 let maybe_base_file = local.tree.base.maybe_find(&id);
979
980 local_change.set_hmac_and_size(
982 maybe_base_file.and_then(|f| f.document_hmac().copied()),
983 maybe_base_file.and_then(|f| *f.timestamped_value.value.doc_size()),
984 );
985 let local_change = local_change.sign(&self.keychain)?;
986
987 local_changes_no_digests.push(local_change.clone());
988 let file_diff = FileDiff { old: maybe_base_file.cloned(), new: local_change };
989 updates.push(file_diff);
990 }
991
992 drop(tx);
993
994 if !updates.is_empty() {
995 self.client
996 .request(self.get_account()?, UpsertRequestV2 { updates: updates.clone() })
997 .await?;
998 ctx.pushed_metas = updates;
999 }
1000
1001 let mut tx = self.begin_tx().await;
1002 let db = tx.db();
1003
1004 (&mut db.base_metadata)
1006 .to_lazy()
1007 .stage(local_changes_no_digests)
1008 .promote()?;
1009 db.base_metadata.stage(&mut db.local_metadata).prune()?;
1010
1011 tx.end();
1012
1013 Ok(())
1014 }
1015
1016 async fn push_docs(&self, ctx: &mut SyncContext) -> LbResult<()> {
1018 ctx.msg("Pushing document changes...");
1019 let mut updates = vec![];
1020 let mut local_changes_digests_only = vec![];
1021
1022 let tx = self.ro_tx().await;
1023 let db = tx.db();
1024 let start = Instant::now();
1025
1026 let local = db.base_metadata.stage(&db.local_metadata).to_lazy();
1027
1028 for id in local.tree.staged.ids() {
1029 let base_file = local.tree.base.find(&id)?.clone();
1030
1031 let mut local_change = base_file.timestamped_value.value.clone();
1033 local_change.set_hmac_and_size(
1034 local.find(&id)?.document_hmac().copied(),
1035 *local.find(&id)?.timestamped_value.value.doc_size(),
1036 );
1037
1038 if base_file.document_hmac() == local_change.document_hmac()
1039 || local_change.document_hmac().is_none()
1040 {
1041 continue;
1042 }
1043
1044 let local_change = local_change.sign(&self.keychain)?;
1045
1046 updates.push(FileDiff { old: Some(base_file), new: local_change.clone() });
1047 local_changes_digests_only.push(local_change);
1048 self.events.sync(SyncIncrement::PushingDocument(id, true));
1049 }
1050
1051 drop(tx);
1052 if start.elapsed() > web_time::Duration::from_millis(100) {
1053 warn!("sync push_docs held lock for {:?}", start.elapsed());
1054 }
1055
1056 let docs_count = updates.len();
1057 ctx.total += docs_count;
1058 let futures = updates.clone().into_iter().map(|diff| self.push_doc(diff));
1059
1060 let mut stream = stream::iter(futures).buffer_unordered(
1061 thread::available_parallelism()
1062 .unwrap_or(NonZeroUsize::new(4).unwrap())
1063 .into(),
1064 );
1065
1066 let mut idx = 0;
1067 while let Some(fut) = stream.next().await {
1068 let id = fut?;
1069 self.events.sync(SyncIncrement::PushingDocument(id, false));
1070 ctx.file_msg(id, &format!("Pushed file {idx} of {docs_count}."));
1071 idx += 1;
1072 }
1073 ctx.pushed_docs = updates;
1074
1075 let mut tx = self.begin_tx().await;
1076 let db = tx.db();
1077 (&mut db.base_metadata)
1079 .to_lazy()
1080 .stage(local_changes_digests_only)
1081 .promote()?;
1082
1083 db.base_metadata.stage(&mut db.local_metadata).prune()?;
1084
1085 tx.end();
1086
1087 Ok(())
1088 }
1089
1090 async fn push_doc(&self, diff: FileDiff<SignedMeta>) -> LbResult<Uuid> {
1091 let id = *diff.new.id();
1092 let hmac = diff.new.document_hmac();
1093 let local_document_change = self.docs.get(id, hmac.copied()).await?;
1094 self.client
1095 .request(
1096 self.get_account()?,
1097 ChangeDocRequestV2 { diff, new_content: local_document_change },
1098 )
1099 .await?;
1100
1101 Ok(id)
1102 }
1103
1104 async fn dedup(
1105 &self, updates: GetUpdatesResponseV2,
1106 ) -> LbResult<(Vec<SignedMeta>, u64, Option<Uuid>)> {
1107 let tx = self.ro_tx().await;
1108 let db = tx.db();
1109
1110 let mut root_id = None;
1111 let (remote_changes, update_as_of) = {
1112 let mut remote_changes = updates.file_metadata;
1113 let update_as_of = updates.as_of_metadata_version;
1114
1115 remote_changes = {
1116 let me = Owner(self.keychain.get_pk()?);
1117 let remote = db.base_metadata.stage(remote_changes).to_lazy();
1118 let mut result = Vec::new();
1119
1120 for id in remote.tree.staged.ids() {
1121 let meta = remote.find(&id)?;
1122 if remote.maybe_find_parent(meta).is_some()
1123 || meta
1124 .user_access_keys()
1125 .iter()
1126 .any(|k| k.encrypted_for == me.0)
1127 {
1128 result.push(remote.find(&id)?.clone()); }
1130 }
1131
1132 result
1133 };
1134
1135 let remote = db.base_metadata.stage(remote_changes).pruned()?.to_lazy();
1136
1137 let (_, remote_changes) = remote.unstage();
1138 (remote_changes, update_as_of)
1139 };
1140
1141 if db.root.get().is_none() {
1143 let root = remote_changes
1144 .all_files()?
1145 .into_iter()
1146 .find(|f| f.is_root())
1147 .ok_or(LbErrKind::RootNonexistent)?;
1148 root_id = Some(*root.id());
1149 }
1150
1151 Ok((remote_changes, update_as_of, root_id))
1152 }
1153
1154 async fn commit_last_synced(&self, ctx: &mut SyncContext) -> LbResult<()> {
1155 ctx.msg("Cleaning up...");
1156 let mut tx = self.begin_tx().await;
1157 let db = tx.db();
1158 db.last_synced.insert(ctx.update_as_of as i64)?;
1159
1160 if let Some(root) = ctx.new_root {
1161 db.root.insert(root)?;
1162 }
1163
1164 Ok(())
1165 }
1166
1167 pub async fn get_last_synced_human(&self) -> LbResult<String> {
1168 let tx = self.ro_tx().await;
1169 let db = tx.db();
1170 let last_synced = db.last_synced.get().copied().unwrap_or(0);
1171
1172 Ok(self.get_timestamp_human_string(last_synced))
1173 }
1174
1175 pub fn get_timestamp_human_string(&self, timestamp: i64) -> String {
1176 if timestamp != 0 {
1177 Duration::milliseconds(clock::get_time().0 - timestamp)
1178 .format_human()
1179 .to_string()
1180 } else {
1181 "never".to_string()
1182 }
1183 }
1184}
1185
1186impl SyncContext {
1187 fn summarize(&self) -> SyncStatus {
1188 let mut local = HashSet::new();
1189 let mut server = HashSet::new();
1190 let mut work_units = vec![];
1191
1192 for meta in &self.pushed_metas {
1193 local.insert(meta.new.id());
1194 }
1195
1196 for meta in &self.pushed_docs {
1197 local.insert(meta.new.id());
1198 }
1199
1200 for meta in &self.remote_changes {
1201 server.insert(meta.id());
1202 }
1203
1204 for id in local {
1205 work_units.push(WorkUnit::LocalChange(*id));
1206 }
1207
1208 for id in server {
1209 work_units.push(WorkUnit::ServerChange(*id));
1210 }
1211
1212 SyncStatus { work_units, latest_server_ts: self.update_as_of }
1213 }
1214
1215 fn msg(&mut self, msg: &str) {
1216 self.current += 1;
1217 if let Some(f) = &self.progress {
1218 f(SyncProgress {
1219 total: self.total,
1220 progress: self.current,
1221 file_being_processed: Default::default(),
1222 msg: msg.to_string(),
1223 })
1224 }
1225 }
1226
1227 fn file_msg(&mut self, id: Uuid, msg: &str) {
1228 self.current += 1;
1229 if let Some(f) = &self.progress {
1230 f(SyncProgress {
1231 total: self.total,
1232 progress: self.current,
1233 file_being_processed: Some(id),
1234 msg: msg.to_string(),
1235 })
1236 }
1237 }
1238
1239 fn done_msg(&mut self) {
1240 self.current = self.total;
1241 if let Some(f) = &self.progress {
1242 f(SyncProgress {
1243 total: self.total,
1244 progress: self.current,
1245 file_being_processed: None,
1246 msg: "Sync successful!".to_string(),
1247 })
1248 }
1249 }
1250}
1251
1252#[derive(Debug, Serialize, Clone)]
1253pub struct SyncStatus {
1254 pub work_units: Vec<WorkUnit>,
1255 pub latest_server_ts: u64,
1256}
1257
1258#[derive(Clone)]
1259pub struct SyncProgress {
1260 pub total: usize,
1261 pub progress: usize,
1262 pub file_being_processed: Option<Uuid>,
1263 pub msg: String,
1264}
1265
1266impl Display for SyncProgress {
1267 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1268 write!(f, "[{} / {}]: {}", self.progress, self.total, self.msg)
1269 }
1270}
1271
1272#[derive(Debug, Clone)]
1273pub enum SyncIncrement {
1274 SyncStarted,
1275 PullingDocument(Uuid, bool),
1276 PushingDocument(Uuid, bool),
1277 SyncFinished(Option<LbErrKind>),
1278}