use crate::Lb;
use crate::io::network::ApiError;
use crate::model::access_info::UserAccessMode;
use crate::model::api::{
    ChangeDocRequestV2, GetDocRequest, GetFileIdsRequest, GetUpdatesRequestV2,
    GetUpdatesResponseV2, GetUsernameError, GetUsernameRequest, UpsertDebugInfoRequest,
    UpsertRequestV2,
};
use crate::model::errors::{LbErrKind, LbResult};
use crate::model::file::ShareMode;
use crate::model::file_like::FileLike;
use crate::model::file_metadata::{DocumentHmac, FileDiff, FileType, Owner};
use crate::model::filename::{DocumentType, NameComponents};
use crate::model::signed_meta::SignedMeta;
use crate::model::staged::StagedTreeLikeMut;
use crate::model::svg::buffer::u_transform_to_bezier;
use crate::model::svg::element::Element;
use crate::model::text::buffer::Buffer;
use crate::model::tree_like::TreeLike;
use crate::model::work_unit::WorkUnit;
use crate::model::{ValidationFailure, clock, svg, symkey};
pub use basic_human_duration::ChronoHumanDuration;
use futures::{StreamExt, stream};
use serde::Serialize;
use std::collections::{HashMap, HashSet, hash_map};
use std::fmt::{Display, Formatter};
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::Instant;
use time::Duration;
use usvg::Transform;
use uuid::Uuid;

use super::events::Actor;

pub type SyncFlag = Arc<AtomicBool>;

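/// State for a sync in progress: the progress callback and its counters, plus
/// everything learned along the way (remote changes, pushed diffs, pulled docs).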
pub struct SyncContext {
    progress: Option<Box<dyn Fn(SyncProgress) + Send>>,
    current: usize,
    total: usize,

    pk_cache: HashMap<Owner, String>,
    last_synced: u64,
    remote_changes: Vec<SignedMeta>,
    update_as_of: u64,

    new_root: Option<Uuid>,
    pushed_metas: Vec<FileDiff<SignedMeta>>,
    pushed_docs: Vec<FileDiff<SignedMeta>>,
    pulled_docs: Vec<Uuid>,
}

impl Lb {
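    /// Computes the work a sync would perform: one `WorkUnit` per locally
    /// changed file and per file changed on the server.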
    #[instrument(level = "debug", skip_all, err(Debug))]
    pub async fn calculate_work(&self) -> LbResult<SyncStatus> {
        let tx = self.ro_tx().await;
        let db = tx.db();
        let last_synced = db.last_synced.get().copied().unwrap_or_default() as u64;
        drop(tx);

        let remote_changes = self
            .client
            .request(
                self.get_account()?,
                GetUpdatesRequestV2 { since_metadata_version: last_synced },
            )
            .await?;
        let (deduped, latest_server_ts, _) = self.dedup(remote_changes).await?;
        let remote_dirty = deduped
            .into_iter()
            .map(|f| *f.id())
            .map(WorkUnit::ServerChange);

        self.prune().await?;

        let tx = self.ro_tx().await;
        let db = tx.db();

        let locally_dirty = db
            .local_metadata
            .get()
            .keys()
            .copied()
            .map(WorkUnit::LocalChange);

        let mut work_units: Vec<WorkUnit> = Vec::new();
        work_units.extend(locally_dirty.chain(remote_dirty));
        Ok(SyncStatus { work_units, latest_server_ts })
    }

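    /// Performs a full sync: prunes, pulls metadata and documents, merges
    /// remote changes with local ones, then pushes metadata and documents.
    /// Returns an error if a sync is already in progress.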
    #[instrument(level = "debug", skip_all, err(Debug))]
    pub async fn sync(&self, f: Option<Box<dyn Fn(SyncProgress) + Send>>) -> LbResult<SyncStatus> {
        let old = self.syncing.swap(true, Ordering::SeqCst);
        if old {
            return Err(LbErrKind::AlreadySyncing.into());
        }

        let mut ctx = self.setup_sync(f).await?;

        let mut got_updates = false;
        let mut pipeline: LbResult<()> = async {
            ctx.msg("Preparing Sync...");
            self.events.sync(SyncIncrement::SyncStarted);
            self.prune().await?;
            got_updates = self.fetch_meta(&mut ctx).await?;
            self.populate_pk_cache(&mut ctx).await?;
            self.docs.dont_delete.store(true, Ordering::SeqCst);
            self.fetch_docs(&mut ctx).await?;
            self.merge(&mut ctx).await?;
            self.push_meta(&mut ctx).await?;
            self.push_docs(&mut ctx).await?;
            Ok(())
        }
        .await;

        self.docs.dont_delete.store(false, Ordering::SeqCst);

        if pipeline.is_ok() {
            pipeline = self.commit_last_synced(&mut ctx).await;
        }

        let cleanup = self.cleanup().await;

        let ekind = pipeline.as_ref().err().map(|err| err.kind.clone());
        self.events.sync(SyncIncrement::SyncFinished(ekind));

        self.syncing.store(false, Ordering::Relaxed);
        pipeline?;
        cleanup?;

        ctx.done_msg();

        if got_updates {
            self.events.meta_changed();
            let owner = self.keychain.get_pk().map(Owner).ok();
            if ctx.remote_changes.iter().any(|f| Some(f.owner()) != owner) {
                self.events.pending_shares_changed();
            }
            for id in &ctx.pulled_docs {
                self.events.doc_written(*id, Some(Actor::Sync));
            }
        }

        let account = self.get_account()?;
        if account.is_beta() {
            let debug_info = self.debug_info("".into()).await?;

            self.client
                .request(account, UpsertDebugInfoRequest { debug_info })
                .await?;
        }

        Ok(ctx.summarize())
    }

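    /// Captures `last_synced` and the public key cache and initializes
    /// progress reporting: 7 fixed steps plus one per transferred document.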
    async fn setup_sync(
        &self, progress: Option<Box<dyn Fn(SyncProgress) + Send>>,
    ) -> LbResult<SyncContext> {
        let tx = self.ro_tx().await;
        let db = tx.db();

        let last_synced = db.last_synced.get().copied().unwrap_or_default() as u64;
        let pk_cache = db.pub_key_lookup.get().clone();

        let current = 0;
        let total = 7;

        Ok(SyncContext {
            last_synced,
            pk_cache,

            progress,
            current,
            total,

            new_root: Default::default(),
            update_as_of: Default::default(),
            remote_changes: Default::default(),
            pushed_docs: Default::default(),
            pushed_metas: Default::default(),
            pulled_docs: Default::default(),
        })
    }

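    /// Removes files that no longer exist on the server (along with their
    /// descendants and cached document contents) from both local trees.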
    async fn prune(&self) -> LbResult<()> {
        let server_ids = self
            .client
            .request(self.get_account()?, GetFileIdsRequest {})
            .await?
            .ids;

        let mut tx = self.begin_tx().await;
        let db = tx.db();

        let mut local = db.base_metadata.stage(&db.local_metadata).to_lazy();
        let base_ids = local.tree.base.ids();

        let mut prunable_ids = base_ids;
        prunable_ids.retain(|id| !server_ids.contains(id));
        for id in prunable_ids.clone() {
            prunable_ids.extend(local.descendants(&id)?.into_iter());
        }
        for id in &prunable_ids {
            if let Some(base_file) = local.tree.base.maybe_find(id) {
                self.docs
                    .delete(*id, base_file.document_hmac().copied())
                    .await?;
            }
            if let Some(local_file) = local.maybe_find(id) {
                self.docs
                    .delete(*id, local_file.document_hmac().copied())
                    .await?;
            }
        }

        let mut base_staged = (&mut db.base_metadata).to_lazy().stage(None);
        base_staged.tree.removed = prunable_ids.clone().into_iter().collect();
        base_staged.promote()?;

        let mut local_staged = (&mut db.local_metadata).to_lazy().stage(None);
        local_staged.tree.removed = prunable_ids.into_iter().collect();
        local_staged.promote()?;

        Ok(())
    }

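    /// Pulls metadata updates since `last_synced`, dedups them against base,
    /// and records them on the context. Returns whether there were updates.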
    async fn fetch_meta(&self, ctx: &mut SyncContext) -> LbResult<bool> {
        ctx.msg("Fetching tree updates...");
        let updates = self
            .client
            .request(
                self.get_account()?,
                GetUpdatesRequestV2 { since_metadata_version: ctx.last_synced },
            )
            .await?;

        let empty = updates.file_metadata.is_empty();
        let (remote, as_of, root) = self.dedup(updates).await?;

        ctx.remote_changes = remote;
        ctx.update_as_of = as_of;
        ctx.new_root = root;

        Ok(!empty)
    }

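    /// Resolves usernames for any public keys that appear in incoming share
    /// records and persists new entries to the on-disk lookup.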
    async fn populate_pk_cache(&self, ctx: &mut SyncContext) -> LbResult<()> {
        ctx.msg("Updating public key cache...");
        let mut all_owners = HashSet::new();
        for file in &ctx.remote_changes {
            for user_access_key in file.user_access_keys() {
                all_owners.insert(Owner(user_access_key.encrypted_by));
                all_owners.insert(Owner(user_access_key.encrypted_for));
            }
        }

        let mut new_entries = HashMap::new();

        for owner in all_owners {
            if let hash_map::Entry::Vacant(e) = ctx.pk_cache.entry(owner) {
                let username_result = self
                    .client
                    .request(self.get_account()?, GetUsernameRequest { key: owner.0 })
                    .await;
                let username = match username_result {
                    Err(ApiError::Endpoint(GetUsernameError::UserNotFound)) => {
                        "<unknown>".to_string()
                    }
                    _ => username_result?.username.clone(),
                };
                new_entries.insert(owner, username.clone());
                e.insert(username);
            }
        }

        let mut tx = self.begin_tx().await;
        let db = tx.db();

        for (owner, username) in new_entries {
            db.pub_key_lookup.insert(owner, username)?;
        }
        Ok(())
    }

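    /// Downloads the contents of every document whose remote hmac differs
    /// from the base hmac, with bounded parallelism.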
    async fn fetch_docs(&self, ctx: &mut SyncContext) -> LbResult<()> {
        ctx.msg("Fetching documents...");
        let mut docs_to_pull = vec![];

        let tx = self.ro_tx().await;
        let db = tx.db();
        let start = Instant::now();

        // compare remote and base document revisions to decide what to pull
        let mut remote = db.base_metadata.stage(ctx.remote_changes.clone()).to_lazy();
        for id in remote.tree.staged.ids() {
            if remote.calculate_deleted(&id)? {
                continue;
            }
            let remote_hmac = remote.find(&id)?.document_hmac().cloned();
            let base_hmac = remote
                .tree
                .base
                .maybe_find(&id)
                .and_then(|f| f.document_hmac())
                .cloned();
            if base_hmac == remote_hmac {
                continue;
            }

            if let Some(remote_hmac) = remote_hmac {
                docs_to_pull.push((id, remote_hmac));
                self.events.sync(SyncIncrement::PullingDocument(id, true));
            }
        }

        drop(tx);
        if start.elapsed() > std::time::Duration::from_millis(100) {
            warn!("sync fetch_docs held lock for {:?}", start.elapsed());
        }

        let num_docs = docs_to_pull.len();
        ctx.total += num_docs;

        let futures = docs_to_pull
            .into_iter()
            .map(|(id, hmac)| self.fetch_doc(id, hmac));

        let mut stream = stream::iter(futures).buffer_unordered(
            thread::available_parallelism()
                .unwrap_or(NonZeroUsize::new(4).unwrap())
                .into(),
        );

        let mut idx = 0;
        while let Some(fut) = stream.next().await {
            let id = fut?;
            idx += 1;
            ctx.pulled_docs.push(id);
            self.events.sync(SyncIncrement::PullingDocument(id, false));
            ctx.file_msg(id, &format!("Downloaded file {idx} of {num_docs}."));
        }
        Ok(())
    }

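    /// Fetches one document revision and stores it in the local document cache.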
    async fn fetch_doc(&self, id: Uuid, hmac: DocumentHmac) -> LbResult<Uuid> {
        let remote_document = self
            .client
            .request(self.get_account()?, GetDocRequest { id, hmac })
            .await?;
        self.docs
            .insert(id, Some(hmac), &remote_document.content)
            .await?;

        Ok(id)
    }

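    /// Reconciles remote changes with local changes on top of the shared base
    /// tree. Starts from the remote tree, replays local changes onto it, and
    /// resolves validation failures (cycles, path conflicts, link issues) by
    /// un-moving, renaming, un-sharing, or deleting files and retrying.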
    async fn merge(&self, ctx: &mut SyncContext) -> LbResult<()> {
        let mut tx = self.begin_tx().await;
        let db = tx.db();
        let start = Instant::now();

        let remote_changes = &ctx.remote_changes;

        let me = Owner(self.keychain.get_pk()?);

        let merge_changes = {
            // assemble trees
            let mut base = (&db.base_metadata).to_lazy();
            let remote_unlazy = (&db.base_metadata).to_staged(remote_changes);
            let mut remote = remote_unlazy.as_lazy();
            let mut local = (&db.base_metadata).to_staged(&db.local_metadata).to_lazy();

            // changeset constraints; these grow as validation failures are resolved
            let mut files_to_unmove: HashSet<Uuid> = HashSet::new();
            let mut files_to_unshare: HashSet<Uuid> = HashSet::new();
            let mut links_to_delete: HashSet<Uuid> = HashSet::new();
            let mut rename_increments: HashMap<Uuid, usize> = HashMap::new();
            let mut duplicate_file_ids: HashMap<Uuid, Uuid> = HashMap::new();

            'merge_construction: loop {
                // process deletions on a separate tree so local edits to files
                // deleted elsewhere can be dropped from the merge
                let mut deletions = {
                    let mut deletions = remote_unlazy.stage(Vec::new()).to_lazy();

                    // creations, in topological order (parents before children)
                    let mut deletion_creations = HashSet::new();
                    for id in db.local_metadata.ids() {
                        if remote.maybe_find(&id).is_none() && !links_to_delete.contains(&id) {
                            deletion_creations.insert(id);
                        }
                    }
                    'drain_creations: while !deletion_creations.is_empty() {
                        'choose_a_creation: for id in &deletion_creations {
                            let id = *id;
                            let local_file = local.find(&id)?.clone();
                            let result = deletions.create_unvalidated(
                                id,
                                symkey::generate_key(),
                                local_file.parent(),
                                &local.name(&id, &self.keychain)?,
                                local_file.file_type(),
                                &self.keychain,
                            );
                            match result {
                                Ok(_) => {
                                    deletion_creations.remove(&id);
                                    continue 'drain_creations;
                                }
                                Err(ref err) => match err.kind {
                                    LbErrKind::FileParentNonexistent => {
                                        continue 'choose_a_creation;
                                    }
                                    _ => {
                                        result?;
                                    }
                                },
                            }
                        }
                        return Err(LbErrKind::Unexpected(format!(
                            "sync failed to find a topological order for file creations: {deletion_creations:?}"
                        ))
                        .into());
                    }

                    // moves
                    for id in db.local_metadata.ids() {
                        let local_file = local.find(&id)?.clone();
                        if let Some(base_file) = db.base_metadata.maybe_find(&id).cloned() {
                            if !local_file.explicitly_deleted()
                                && local_file.parent() != base_file.parent()
                                && !files_to_unmove.contains(&id)
                            {
                                deletions.move_unvalidated(
                                    &id,
                                    local_file.parent(),
                                    &self.keychain,
                                )?;
                            }
                        }
                    }

                    // deletions
                    for id in db.local_metadata.ids() {
                        let local_file = local.find(&id)?.clone();
                        if local_file.explicitly_deleted() {
                            deletions.delete_unvalidated(&id, &self.keychain)?;
                        }
                    }
                    deletions
                };

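                // construct the merge tree: remote changes plus local changes
                // replayed on top, honoring the current constraint sets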
                let mut merge = {
                    let mut merge = remote_unlazy.stage(Vec::new()).to_lazy();

                    // creations, in topological order
                    let mut creations = HashSet::new();
                    for id in db.local_metadata.ids() {
                        if deletions.maybe_find(&id).is_some()
                            && !deletions.calculate_deleted(&id)?
                            && remote.maybe_find(&id).is_none()
                            && !links_to_delete.contains(&id)
                        {
                            creations.insert(id);
                        }
                    }
                    'drain_creations: while !creations.is_empty() {
                        'choose_a_creation: for id in &creations {
                            let id = *id;
                            let local_file = local.find(&id)?.clone();
                            let result = merge.create_unvalidated(
                                id,
                                local.decrypt_key(&id, &self.keychain)?,
                                local_file.parent(),
                                &local.name(&id, &self.keychain)?,
                                local_file.file_type(),
                                &self.keychain,
                            );
                            match result {
                                Ok(_) => {
                                    creations.remove(&id);
                                    continue 'drain_creations;
                                }
                                Err(ref err) => match err.kind {
                                    LbErrKind::FileParentNonexistent => {
                                        continue 'choose_a_creation;
                                    }
                                    _ => {
                                        result?;
                                    }
                                },
                            }
                        }
                        return Err(LbErrKind::Unexpected(format!(
                            "sync failed to find a topological order for file creations: {creations:?}"
                        ))
                        .into());
                    }

                    // moves, renames, share changes, and document edits
                    for id in db.local_metadata.ids() {
                        // skip files that are deleted in the merge or remotely
                        if deletions.maybe_find(&id).is_none()
                            || deletions.calculate_deleted(&id)?
                            || (remote.maybe_find(&id).is_some()
                                && remote.calculate_deleted(&id)?)
                        {
                            continue;
                        }

                        let local_file = local.find(&id)?.clone();
                        let local_name = local.name(&id, &self.keychain)?;
                        let maybe_base_file = base.maybe_find(&id).cloned();
                        let maybe_remote_file = remote.maybe_find(&id).cloned();
                        if let Some(ref base_file) = maybe_base_file {
                            let base_name = base.name(&id, &self.keychain)?;
                            let remote_file = remote.find(&id)?.clone();
                            let remote_name = remote.name(&id, &self.keychain)?;

                            // move if moved locally and not remotely
                            if local_file.parent() != base_file.parent()
                                && remote_file.parent() == base_file.parent()
                                && !files_to_unmove.contains(&id)
                            {
                                merge.move_unvalidated(&id, local_file.parent(), &self.keychain)?;
                            }

                            // rename if renamed locally and not remotely
                            if local_name != base_name && remote_name == base_name {
                                merge.rename_unvalidated(&id, &local_name, &self.keychain)?;
                            }
                        }

                        // re-apply local share changes on top of the remote share state
                        let mut remote_keys = HashMap::new();
                        if let Some(ref remote_file) = maybe_remote_file {
                            for key in remote_file.user_access_keys() {
                                remote_keys.insert(
                                    (Owner(key.encrypted_by), Owner(key.encrypted_for)),
                                    (key.mode, key.deleted),
                                );
                            }
                        }
                        for key in local_file.user_access_keys() {
                            let (by, for_) = (Owner(key.encrypted_by), Owner(key.encrypted_for));
                            if let Some(&(remote_mode, remote_deleted)) =
                                remote_keys.get(&(by, for_))
                            {
                                // re-add: local mode higher, or deleted remotely but not locally
                                if key.mode > remote_mode || !key.deleted && remote_deleted {
                                    let mode = match key.mode {
                                        UserAccessMode::Read => ShareMode::Read,
                                        UserAccessMode::Write => ShareMode::Write,
                                        UserAccessMode::Owner => continue,
                                    };
                                    merge.add_share_unvalidated(id, for_, mode, &self.keychain)?;
                                }
                                // delete share if deleted locally but not remotely
                                if key.deleted && !remote_deleted {
                                    merge.delete_share_unvalidated(
                                        &id,
                                        Some(for_.0),
                                        &self.keychain,
                                    )?;
                                }
                            } else {
                                // add share if added locally
                                let mode = match key.mode {
                                    UserAccessMode::Read => ShareMode::Read,
                                    UserAccessMode::Write => ShareMode::Write,
                                    UserAccessMode::Owner => continue,
                                };
                                merge.add_share_unvalidated(id, for_, mode, &self.keychain)?;
                            }
                        }

                        // unshare files if required to resolve a validation failure
                        if files_to_unshare.contains(&id) {
                            merge.delete_share_unvalidated(&id, None, &self.keychain)?;
                        }

                        // rename with increments assigned while resolving path conflicts
                        if let Some(&rename_increment) = rename_increments.get(&id) {
                            let name = NameComponents::from(&local_name)
                                .generate_incremented(rename_increment)
                                .to_name();
                            merge.rename_unvalidated(&id, &name, &self.keychain)?;
                        }

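                        // carry forward local document edits; if the document also
                        // changed remotely, merge the contents (or duplicate the
                        // file if the format can't be merged)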
                        let base_hmac = maybe_base_file.and_then(|f| f.document_hmac().cloned());
                        let remote_hmac =
                            maybe_remote_file.and_then(|f| f.document_hmac().cloned());
                        let local_hmac = local_file.document_hmac().cloned();
                        if merge.access_mode(me, &id)? >= Some(UserAccessMode::Write)
                            && local_hmac != base_hmac
                        {
                            if remote_hmac != base_hmac && remote_hmac != local_hmac {
                                let merge_name = merge.name(&id, &self.keychain)?;
                                let document_type =
                                    DocumentType::from_file_name_using_extension(&merge_name);

                                let base_document =
                                    self.read_document_helper(id, &mut base).await?;
                                let remote_document =
                                    self.read_document_helper(id, &mut remote).await?;
                                let local_document =
                                    self.read_document_helper(id, &mut local).await?;

                                match document_type {
                                    DocumentType::Text => {
                                        // 3-way merge of text documents
                                        let base_document =
                                            String::from_utf8_lossy(&base_document).to_string();
                                        let remote_document =
                                            String::from_utf8_lossy(&remote_document).to_string();
                                        let local_document =
                                            String::from_utf8_lossy(&local_document).to_string();
                                        let merged_document = Buffer::from(base_document.as_str())
                                            .merge(local_document, remote_document);
                                        let encrypted_document = merge
                                            .update_document_unvalidated(
                                                &id,
                                                &merged_document.into_bytes(),
                                                &self.keychain,
                                            )?;
                                        let hmac = merge.find(&id)?.document_hmac().copied();
                                        self.docs.insert(id, hmac, &encrypted_document).await?;
                                    }
                                    DocumentType::Drawing => {
                                        // 3-way merge of drawings
                                        let base_document =
                                            String::from_utf8_lossy(&base_document).to_string();
                                        let remote_document =
                                            String::from_utf8_lossy(&remote_document).to_string();
                                        let local_document =
                                            String::from_utf8_lossy(&local_document).to_string();

                                        let base_buffer = svg::buffer::Buffer::new(&base_document);
                                        let remote_buffer =
                                            svg::buffer::Buffer::new(&remote_document);
                                        let mut local_buffer =
                                            svg::buffer::Buffer::new(&local_document);

                                        for (_, el) in local_buffer.elements.iter_mut() {
                                            if let Element::Path(path) = el {
                                                path.data.apply_transform(u_transform_to_bezier(
                                                    &Transform::from(
                                                        local_buffer
                                                            .weak_viewport_settings
                                                            .master_transform,
                                                    ),
                                                ));
                                            }
                                        }
                                        svg::buffer::Buffer::reload(
                                            &mut local_buffer.elements,
                                            &mut local_buffer.weak_images,
                                            &mut local_buffer.weak_path_pressures,
                                            &mut local_buffer.weak_viewport_settings,
                                            &base_buffer,
                                            &remote_buffer,
                                        );

                                        let merged_document = local_buffer.serialize();
                                        let encrypted_document = merge
                                            .update_document_unvalidated(
                                                &id,
                                                &merged_document.into_bytes(),
                                                &self.keychain,
                                            )?;
                                        let hmac = merge.find(&id)?.document_hmac().copied();
                                        self.docs.insert(id, hmac, &encrypted_document).await?;
                                    }
                                    DocumentType::Other => {
                                        // non-mergeable document: duplicate the local version
                                        let merge_parent = *merge.find(&id)?.parent();
                                        let duplicate_id = if let Some(&duplicate_id) =
                                            duplicate_file_ids.get(&id)
                                        {
                                            duplicate_id
                                        } else {
                                            let duplicate_id = Uuid::new_v4();
                                            duplicate_file_ids.insert(id, duplicate_id);
                                            rename_increments.insert(duplicate_id, 1);
                                            duplicate_id
                                        };

                                        let mut merge_name = merge_name;
                                        merge_name = NameComponents::from(&merge_name)
                                            .generate_incremented(
                                                rename_increments
                                                    .get(&duplicate_id)
                                                    .copied()
                                                    .unwrap_or_default(),
                                            )
                                            .to_name();

                                        merge.create_unvalidated(
                                            duplicate_id,
                                            symkey::generate_key(),
                                            &merge_parent,
                                            &merge_name,
                                            FileType::Document,
                                            &self.keychain,
                                        )?;
                                        let encrypted_document = merge
                                            .update_document_unvalidated(
                                                &duplicate_id,
                                                &local_document,
                                                &self.keychain,
                                            )?;
                                        let duplicate_hmac =
                                            merge.find(&duplicate_id)?.document_hmac().copied();
                                        self.docs
                                            .insert(
                                                duplicate_id,
                                                duplicate_hmac,
                                                &encrypted_document,
                                            )
                                            .await?;
                                    }
                                }
                            } else {
                                // document changed locally only; keep the local edit
                                let document = self.read_document_helper(id, &mut local).await?;
                                merge.update_document_unvalidated(
                                    &id,
                                    &document,
                                    &self.keychain,
                                )?;
                            }
                        }
                    }

                    // apply deletions computed on the deletions tree
                    for id in db.local_metadata.ids() {
                        if db.base_metadata.maybe_find(&id).is_some()
                            && deletions.calculate_deleted(&id)?
                            && !merge.calculate_deleted(&id)?
                        {
                            merge.delete_unvalidated(&id, &self.keychain)?;
                        }
                    }
                    for &id in &links_to_delete {
                        // delete links selected for deletion in a prior iteration
                        if merge.maybe_find(&id).is_some() && !merge.calculate_deleted(&id)? {
                            merge.delete_unvalidated(&id, &self.keychain)?;
                        }
                    }

                    merge
                };

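                // if a link's target was deleted in the merge, mark the link for
                // deletion and rebuild the merge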
                for link in merge.ids() {
                    if !merge.calculate_deleted(&link)? {
                        if let FileType::Link { target } = merge.find(&link)?.file_type() {
                            if merge.maybe_find(&target).is_some()
                                && merge.calculate_deleted(&target)?
                            {
                                if links_to_delete.insert(link) {
                                    continue 'merge_construction;
                                } else {
                                    return Err(LbErrKind::Unexpected(format!(
                                        "sync failed to resolve broken link (deletion): {link:?}"
                                    ))
                                    .into());
                                }
                            }
                        }
                    }
                }

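                // validate the merge; on failure, adjust the constraint sets and
                // rebuild it, erroring if no progress can be made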
                let validate_result = merge.validate(me);
                match validate_result {
                    // the merge is valid: take the changes staged on top of remote
                    Ok(_) => {
                        let (_, merge_changes) = merge.unstage();
                        break merge_changes;
                    }
                    Err(ref err) => match err.kind {
                        LbErrKind::Validation(ref vf) => match vf {
                            ValidationFailure::Cycle(ids) => {
                                // a locally moved file is part of a cycle; un-move it
                                let mut progress = false;
                                for &id in ids {
                                    if db.local_metadata.maybe_find(&id).is_some()
                                        && files_to_unmove.insert(id)
                                    {
                                        progress = true;
                                    }
                                }
                                if !progress {
                                    return Err(LbErrKind::Unexpected(format!(
                                        "sync failed to resolve cycle: {ids:?}"
                                    ))
                                    .into());
                                }
                            }
                            ValidationFailure::PathConflict(ids) => {
                                // two files share a path; rename one of them
                                let mut progress = false;
                                for &id in ids {
                                    if duplicate_file_ids.values().any(|&dup| dup == id) {
                                        *rename_increments.entry(id).or_insert(0) += 1;
                                        progress = true;
                                        break;
                                    }
                                }
                                if !progress {
                                    for &id in ids {
                                        if db.local_metadata.maybe_find(&id).is_some() {
                                            *rename_increments.entry(id).or_insert(0) += 1;
                                            progress = true;
                                            break;
                                        }
                                    }
                                }
                                if !progress {
                                    return Err(LbErrKind::Unexpected(format!(
                                        "sync failed to resolve path conflict: {ids:?}"
                                    ))
                                    .into());
                                }
                            }
                            ValidationFailure::SharedLink { link, shared_ancestor } => {
                                // a link is in a shared folder; un-share the ancestor
                                // or delete the link
                                let mut progress = false;
                                if let Some(base_shared_ancestor) = base.maybe_find(shared_ancestor)
                                {
                                    if !base_shared_ancestor.is_shared()
                                        && files_to_unshare.insert(*shared_ancestor)
                                    {
                                        progress = true;
                                    }
                                }
                                if !progress && links_to_delete.insert(*link) {
                                    progress = true;
                                }
                                if !progress {
                                    return Err(LbErrKind::Unexpected(format!(
                                        "sync failed to resolve shared link: link: {link:?}, shared_ancestor: {shared_ancestor:?}"
                                    )).into());
                                }
                            }
                            ValidationFailure::DuplicateLink { target } => {
                                // a file is linked more than once; delete the local link
                                let mut progress = false;
                                if let Some(link) = local.linked_by(target)? {
                                    if links_to_delete.insert(link) {
                                        progress = true;
                                    }
                                }
                                if !progress {
                                    return Err(LbErrKind::Unexpected(format!(
                                        "sync failed to resolve duplicate link: target: {target:?}"
                                    ))
                                    .into());
                                }
                            }
                            ValidationFailure::BrokenLink(link) => {
                                // a link's target doesn't exist; delete the link
                                if !links_to_delete.insert(*link) {
                                    return Err(LbErrKind::Unexpected(format!(
                                        "sync failed to resolve broken link: {link:?}"
                                    ))
                                    .into());
                                }
                            }
                            ValidationFailure::OwnedLink(link) => {
                                // a link points to an owned file; un-move the target
                                // or delete the link
                                let mut progress = false;
                                if let Some(remote_link) = remote.maybe_find(link) {
                                    if let FileType::Link { target } = remote_link.file_type() {
                                        let remote_target = remote.find(&target)?;
                                        if remote_target.owner() != me
                                            && files_to_unmove.insert(target)
                                        {
                                            progress = true;
                                        }
                                    }
                                }
                                if !progress && links_to_delete.insert(*link) {
                                    progress = true;
                                }
                                if !progress {
                                    return Err(LbErrKind::Unexpected(format!(
                                        "sync failed to resolve owned link: {link:?}"
                                    ))
                                    .into());
                                }
                            }
                            ValidationFailure::Orphan(_)
                            | ValidationFailure::NonFolderWithChildren(_)
                            | ValidationFailure::FileWithDifferentOwnerParent(_)
                            | ValidationFailure::FileNameTooLong(_)
                            | ValidationFailure::DeletedFileUpdated(_)
                            | ValidationFailure::NonDecryptableFileName(_) => {
                                validate_result?;
                            }
                        },
                        _ => {
                            validate_result?;
                        }
                    },
                }
            }
        };

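        // commit: base absorbs the remote changes and local is replaced with
        // the computed merge changes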
        (&mut db.base_metadata)
            .to_staged(remote_changes.clone())
            .to_lazy()
            .promote()?;
        db.local_metadata.clear()?;
        (&mut db.local_metadata)
            .to_staged(merge_changes)
            .to_lazy()
            .promote()?;

        db.base_metadata.stage(&mut db.local_metadata).prune()?;

        if start.elapsed() > std::time::Duration::from_millis(100) {
            warn!("sync merge held lock for {:?}", start.elapsed());
        }

        Ok(())
    }

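    /// Signs local metadata changes and upserts them to the server, then
    /// promotes them into base. Document hmacs and sizes are left at their
    /// base values; `push_docs` pushes those.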
    async fn push_meta(&self, ctx: &mut SyncContext) -> LbResult<()> {
        ctx.msg("Pushing tree changes...");
        let mut updates = vec![];
        let mut local_changes_no_digests = Vec::new();

        let tx = self.ro_tx().await;
        let db = tx.db();

        let local = db.base_metadata.stage(&db.local_metadata).to_lazy();

        for id in local.tree.staged.ids() {
            let mut local_change = local.tree.staged.find(&id)?.timestamped_value.value.clone();
            let maybe_base_file = local.tree.base.maybe_find(&id);

            // reset the hmac and size to the base values; document content
            // changes are pushed separately by push_docs
            local_change.set_hmac_and_size(
                maybe_base_file.and_then(|f| f.document_hmac().copied()),
                maybe_base_file.and_then(|f| *f.timestamped_value.value.doc_size()),
            );
            let local_change = local_change.sign(&self.keychain)?;

            local_changes_no_digests.push(local_change.clone());
            let file_diff = FileDiff { old: maybe_base_file.cloned(), new: local_change };
            updates.push(file_diff);
        }

        drop(tx);

        if !updates.is_empty() {
            self.client
                .request(self.get_account()?, UpsertRequestV2 { updates: updates.clone() })
                .await?;
            ctx.pushed_metas = updates;
        }

        let mut tx = self.begin_tx().await;
        let db = tx.db();

        (&mut db.base_metadata)
            .to_lazy()
            .stage(local_changes_no_digests)
            .promote()?;
        db.base_metadata.stage(&mut db.local_metadata).prune()?;

        tx.end();

        Ok(())
    }

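    /// Uploads changed document contents with bounded parallelism, then
    /// promotes the new hmacs and sizes into base.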
    async fn push_docs(&self, ctx: &mut SyncContext) -> LbResult<()> {
        ctx.msg("Pushing document changes...");
        let mut updates = vec![];
        let mut local_changes_digests_only = vec![];

        let tx = self.ro_tx().await;
        let db = tx.db();
        let start = Instant::now();

        let local = db.base_metadata.stage(&db.local_metadata).to_lazy();

        for id in local.tree.staged.ids() {
            let base_file = local.tree.base.find(&id)?.clone();

            // change only the document hmac and size
            let mut local_change = base_file.timestamped_value.value.clone();
            local_change.set_hmac_and_size(
                local.find(&id)?.document_hmac().copied(),
                *local.find(&id)?.timestamped_value.value.doc_size(),
            );

            if base_file.document_hmac() == local_change.document_hmac()
                || local_change.document_hmac().is_none()
            {
                continue;
            }

            let local_change = local_change.sign(&self.keychain)?;

            updates.push(FileDiff { old: Some(base_file), new: local_change.clone() });
            local_changes_digests_only.push(local_change);
            self.events.sync(SyncIncrement::PushingDocument(id, true));
        }

        drop(tx);
        if start.elapsed() > std::time::Duration::from_millis(100) {
            warn!("sync push_docs held lock for {:?}", start.elapsed());
        }

        let docs_count = updates.len();
        ctx.total += docs_count;
        let futures = updates.clone().into_iter().map(|diff| self.push_doc(diff));

        let mut stream = stream::iter(futures).buffer_unordered(
            thread::available_parallelism()
                .unwrap_or(NonZeroUsize::new(4).unwrap())
                .into(),
        );

        let mut idx = 0;
        while let Some(fut) = stream.next().await {
            let id = fut?;
            idx += 1;
            self.events.sync(SyncIncrement::PushingDocument(id, false));
            ctx.file_msg(id, &format!("Pushed file {idx} of {docs_count}."));
        }
        ctx.pushed_docs = updates;

        let mut tx = self.begin_tx().await;
        let db = tx.db();
        (&mut db.base_metadata)
            .to_lazy()
            .stage(local_changes_digests_only)
            .promote()?;

        db.base_metadata.stage(&mut db.local_metadata).prune()?;

        tx.end();

        Ok(())
    }

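    /// Pushes a single document change: the signed diff plus the new content.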
    async fn push_doc(&self, diff: FileDiff<SignedMeta>) -> LbResult<Uuid> {
        let id = *diff.new.id();
        let hmac = diff.new.document_hmac();
        let local_document_change = self.docs.get(id, hmac.copied()).await?;
        self.client
            .request(
                self.get_account()?,
                ChangeDocRequestV2 { diff, new_content: local_document_change },
            )
            .await?;

        Ok(id)
    }

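    /// Drops remote changes that are already present in base (and remote
    /// orphans), and locates the root for accounts that don't have one yet.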
    async fn dedup(
        &self, updates: GetUpdatesResponseV2,
    ) -> LbResult<(Vec<SignedMeta>, u64, Option<Uuid>)> {
        let tx = self.ro_tx().await;
        let db = tx.db();

        let mut root_id = None;
        let (remote_changes, update_as_of) = {
            let mut remote_changes = updates.file_metadata;
            let update_as_of = updates.as_of_metadata_version;

            remote_changes = self.prune_remote_orphans(remote_changes).await?;

            let remote = db.base_metadata.stage(remote_changes).pruned()?.to_lazy();

            let (_, remote_changes) = remote.unstage();
            (remote_changes, update_as_of)
        };

        // initialize root for a new account
        if db.root.get().is_none() {
            let root = remote_changes
                .all_files()?
                .into_iter()
                .find(|f| f.is_root())
                .ok_or(LbErrKind::RootNonexistent)?;
            root_id = Some(*root.id());
        }

        Ok((remote_changes, update_as_of, root_id))
    }

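    /// Keeps only remote changes whose parent is known, plus files shared
    /// directly with this user (pending shares arrive without their parents).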
    async fn prune_remote_orphans(
        &self, remote_changes: Vec<SignedMeta>,
    ) -> LbResult<Vec<SignedMeta>> {
        let tx = self.ro_tx().await;
        let db = tx.db();

        let me = Owner(self.keychain.get_pk()?);
        let remote = db.base_metadata.stage(remote_changes).to_lazy();
        let mut result = Vec::new();

        for id in remote.tree.staged.ids() {
            let meta = remote.find(&id)?;
            if remote.maybe_find_parent(meta).is_some()
                || meta
                    .user_access_keys()
                    .iter()
                    .any(|k| k.encrypted_for == me.0)
            {
                result.push(remote.find(&id)?.clone());
            }
        }
        Ok(result)
    }

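    /// Records the server version this sync caught up to, and the root id if
    /// one was just discovered.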
    async fn commit_last_synced(&self, ctx: &mut SyncContext) -> LbResult<()> {
        ctx.msg("Cleaning up...");
        let mut tx = self.begin_tx().await;
        let db = tx.db();
        db.last_synced.insert(ctx.update_as_of as i64)?;

        if let Some(root) = ctx.new_root {
            db.root.insert(root)?;
        }

        Ok(())
    }

    pub async fn get_last_synced_human(&self) -> LbResult<String> {
        let tx = self.ro_tx().await;
        let db = tx.db();
        let last_synced = db.last_synced.get().copied().unwrap_or(0);

        Ok(self.get_timestamp_human_string(last_synced))
    }

    pub fn get_timestamp_human_string(&self, timestamp: i64) -> String {
        if timestamp != 0 {
            Duration::milliseconds(clock::get_time().0 - timestamp)
                .format_human()
                .to_string()
        } else {
            "never".to_string()
        }
    }
}

impl SyncContext {
    fn summarize(&self) -> SyncStatus {
        let mut local = HashSet::new();
        let mut server = HashSet::new();
        let mut work_units = vec![];

        for meta in &self.pushed_metas {
            local.insert(meta.new.id());
        }

        for meta in &self.pushed_docs {
            local.insert(meta.new.id());
        }

        for meta in &self.remote_changes {
            server.insert(meta.id());
        }

        for id in local {
            work_units.push(WorkUnit::LocalChange(*id));
        }

        for id in server {
            work_units.push(WorkUnit::ServerChange(*id));
        }

        SyncStatus { work_units, latest_server_ts: self.update_as_of }
    }

    fn msg(&mut self, msg: &str) {
        self.current += 1;
        if let Some(f) = &self.progress {
            f(SyncProgress {
                total: self.total,
                progress: self.current,
                file_being_processed: Default::default(),
                msg: msg.to_string(),
            })
        }
    }

    fn file_msg(&mut self, id: Uuid, msg: &str) {
        self.current += 1;
        if let Some(f) = &self.progress {
            f(SyncProgress {
                total: self.total,
                progress: self.current,
                file_being_processed: Some(id),
                msg: msg.to_string(),
            })
        }
    }

    fn done_msg(&mut self) {
        self.current = self.total;
        if let Some(f) = &self.progress {
            f(SyncProgress {
                total: self.total,
                progress: self.current,
                file_being_processed: None,
                msg: "Sync successful!".to_string(),
            })
        }
    }
}

#[derive(Debug, Serialize, Clone)]
pub struct SyncStatus {
    pub work_units: Vec<WorkUnit>,
    pub latest_server_ts: u64,
}

#[derive(Clone)]
pub struct SyncProgress {
    pub total: usize,
    pub progress: usize,
    pub file_being_processed: Option<Uuid>,
    pub msg: String,
}

impl Display for SyncProgress {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "[{} / {}]: {}", self.progress, self.total, self.msg)
    }
}

#[derive(Debug, Clone)]
pub enum SyncIncrement {
    SyncStarted,
    PullingDocument(Uuid, bool),
    PushingDocument(Uuid, bool),
    SyncFinished(Option<LbErrKind>),
}