use crate::Lb;
use crate::io::network::ApiError;
use crate::model::access_info::UserAccessMode;
use crate::model::api::{
    ChangeDocRequest, GetDocRequest, GetFileIdsRequest, GetUpdatesRequest, GetUpdatesResponse,
    GetUsernameError, GetUsernameRequest, UpsertRequest,
};
use crate::model::errors::{LbErrKind, LbResult};
use crate::model::file::ShareMode;
use crate::model::file_like::FileLike;
use crate::model::file_metadata::{DocumentHmac, FileDiff, FileType, Owner};
use crate::model::filename::{DocumentType, NameComponents};
use crate::model::signed_file::SignedFile;
use crate::model::staged::StagedTreeLikeMut;
use crate::model::svg::buffer::u_transform_to_bezier;
use crate::model::svg::element::Element;
use crate::model::text::buffer::Buffer;
use crate::model::tree_like::TreeLike;
use crate::model::work_unit::WorkUnit;
use crate::model::{ValidationFailure, clock, svg, symkey};
pub use basic_human_duration::ChronoHumanDuration;
use futures::{StreamExt, stream};
use serde::Serialize;
use std::collections::{HashMap, HashSet, hash_map};
use std::fmt::{Display, Formatter};
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::Instant;
use time::Duration;
use usvg::Transform;
use uuid::Uuid;

use super::events::Actor;

pub type SyncFlag = Arc<AtomicBool>;

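/// State for a single sync run: the progress callback and its step counters,
/// plus the metadata and document changes accumulated as the sync proceeds
/// (used by `summarize` to report the work performed).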
pub struct SyncContext {
    progress: Option<Box<dyn Fn(SyncProgress) + Send>>,
    current: usize,
    total: usize,

    pk_cache: HashMap<Owner, String>,
    last_synced: u64,
    remote_changes: Vec<SignedFile>,
    update_as_of: u64,
    root: Option<Uuid>,
    pushed_metas: Vec<FileDiff<SignedFile>>,
    pushed_docs: Vec<FileDiff<SignedFile>>,
    pulled_docs: Vec<Uuid>,
}

impl Lb {
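    /// Computes the work a sync would perform: one `WorkUnit` per locally
    /// changed file and per file changed on the server since the last sync.
    /// Also prunes files that no longer exist on the server.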
    #[instrument(level = "debug", skip_all, err(Debug))]
    pub async fn calculate_work(&self) -> LbResult<SyncStatus> {
        let tx = self.ro_tx().await;
        let db = tx.db();
        let last_synced = db.last_synced.get().copied().unwrap_or_default() as u64;
        drop(tx);

        let remote_changes = self
            .client
            .request(self.get_account()?, GetUpdatesRequest { since_metadata_version: last_synced })
            .await?;
        let (deduped, latest_server_ts, _) = self.dedup(remote_changes).await?;
        let remote_dirty = deduped
            .into_iter()
            .map(|f| *f.id())
            .map(WorkUnit::ServerChange);

        self.prune().await?;

        let tx = self.ro_tx().await;
        let db = tx.db();

        let locally_dirty = db
            .local_metadata
            .get()
            .keys()
            .copied()
            .map(WorkUnit::LocalChange);

        let mut work_units: Vec<WorkUnit> = Vec::new();
        work_units.extend(locally_dirty.chain(remote_dirty));
        Ok(SyncStatus { work_units, latest_server_ts })
    }

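    /// Runs a full sync: prune, fetch remote metadata, pull changed documents,
    /// merge remote changes with local ones, then push local metadata and
    /// documents. Progress is reported through the optional callback and as
    /// sync events.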
    #[instrument(level = "debug", skip_all, err(Debug))]
    pub async fn sync(&self, f: Option<Box<dyn Fn(SyncProgress) + Send>>) -> LbResult<SyncStatus> {
        let old = self.syncing.swap(true, Ordering::SeqCst);
        if old {
            return Err(LbErrKind::AlreadySyncing.into());
        }

        let mut ctx = self.setup_sync(f).await?;

        let mut got_updates = false;
        let mut pipeline: LbResult<()> = async {
            ctx.msg("Preparing Sync...");
            self.events.sync(SyncIncrement::SyncStarted);
            self.prune().await?;
            got_updates = self.fetch_meta(&mut ctx).await?;
            self.populate_pk_cache(&mut ctx).await?;
            self.docs.dont_delete.store(true, Ordering::SeqCst);
            self.fetch_docs(&mut ctx).await?;
            self.merge(&mut ctx).await?;
            self.push_meta(&mut ctx).await?;
            self.push_docs(&mut ctx).await?;
            Ok(())
        }
        .await;

        self.docs.dont_delete.store(false, Ordering::SeqCst);

        if pipeline.is_ok() {
            pipeline = self.commit_last_synced(&mut ctx).await;
        }

        let cleanup = self.cleanup().await;

        let ekind = pipeline.as_ref().err().map(|err| err.kind.clone());
        self.events.sync(SyncIncrement::SyncFinished(ekind));

        self.syncing.store(false, Ordering::Relaxed);
        pipeline?;
        cleanup?;

        ctx.done_msg();

        if got_updates {
            self.events.meta_changed();
            for id in &ctx.pulled_docs {
                self.events.doc_written(*id, Some(Actor::Sync));
            }
        }

        Ok(ctx.summarize())
    }

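    /// Builds a `SyncContext`, capturing the progress callback and the cached
    /// last-synced version and public key lookup from the database.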
    async fn setup_sync(
        &self, progress: Option<Box<dyn Fn(SyncProgress) + Send>>,
    ) -> LbResult<SyncContext> {
        let tx = self.ro_tx().await;
        let db = tx.db();

        let last_synced = db.last_synced.get().copied().unwrap_or_default() as u64;
        let pk_cache = db.pub_key_lookup.get().clone();

        let current = 0;
        let total = 7;

        Ok(SyncContext {
            last_synced,
            pk_cache,

            progress,
            current,
            total,

            root: Default::default(),
            update_as_of: Default::default(),
            remote_changes: Default::default(),
            pushed_docs: Default::default(),
            pushed_metas: Default::default(),
            pulled_docs: Default::default(),
        })
    }

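    /// Removes files that no longer exist on the server, along with their
    /// descendants and on-disk document contents, from both the base and
    /// local trees.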
    async fn prune(&self) -> LbResult<()> {
        let server_ids = self
            .client
            .request(self.get_account()?, GetFileIdsRequest {})
            .await?
            .ids;

        let mut tx = self.begin_tx().await;
        let db = tx.db();

        let mut local = db.base_metadata.stage(&db.local_metadata).to_lazy();
        let base_ids = local.tree.base.ids();

        let mut prunable_ids = base_ids;
        prunable_ids.retain(|id| !server_ids.contains(id));
        for id in prunable_ids.clone() {
            prunable_ids.extend(local.descendants(&id)?.into_iter());
        }
        for id in &prunable_ids {
            if let Some(base_file) = local.tree.base.maybe_find(id) {
                self.docs
                    .delete(*id, base_file.document_hmac().copied())
                    .await?;
            }
            if let Some(local_file) = local.maybe_find(id) {
                self.docs
                    .delete(*id, local_file.document_hmac().copied())
                    .await?;
            }
        }

        let mut base_staged = (&mut db.base_metadata).to_lazy().stage(None);
        base_staged.tree.removed = prunable_ids.clone().into_iter().collect();
        base_staged.promote()?;

        let mut local_staged = (&mut db.local_metadata).to_lazy().stage(None);
        local_staged.tree.removed = prunable_ids.into_iter().collect();
        local_staged.promote()?;

        Ok(())
    }

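    /// Fetches metadata updates since the last sync and stores the
    /// deduplicated result on the context. Returns whether the server reported
    /// any updates.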
    async fn fetch_meta(&self, ctx: &mut SyncContext) -> LbResult<bool> {
        ctx.msg("Fetching tree updates...");
        let updates = self
            .client
            .request(
                self.get_account()?,
                GetUpdatesRequest { since_metadata_version: ctx.last_synced },
            )
            .await?;

        let empty = updates.file_metadata.is_empty();
        let (remote, as_of, root) = self.dedup(updates).await?;

        ctx.remote_changes = remote;
        ctx.update_as_of = as_of;
        ctx.root = root;

        Ok(!empty)
    }

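    /// Resolves usernames for any owners referenced by the fetched metadata
    /// that are missing from the public key cache, then persists the new
    /// entries.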
    async fn populate_pk_cache(&self, ctx: &mut SyncContext) -> LbResult<()> {
        ctx.msg("Updating public key cache...");
        let mut all_owners = HashSet::new();
        for file in &ctx.remote_changes {
            for user_access_key in file.user_access_keys() {
                all_owners.insert(Owner(user_access_key.encrypted_by));
                all_owners.insert(Owner(user_access_key.encrypted_for));
            }
        }

        let mut new_entries = HashMap::new();

        for owner in all_owners {
            if let hash_map::Entry::Vacant(e) = ctx.pk_cache.entry(owner) {
                let username_result = self
                    .client
                    .request(self.get_account()?, GetUsernameRequest { key: owner.0 })
                    .await;
                let username = match username_result {
                    Err(ApiError::Endpoint(GetUsernameError::UserNotFound)) => {
                        "<unknown>".to_string()
                    }
                    _ => username_result?.username.clone(),
                };
                new_entries.insert(owner, username.clone());
                e.insert(username);
            }
        }

        let mut tx = self.begin_tx().await;
        let db = tx.db();

        for (owner, username) in new_entries {
            db.pub_key_lookup.insert(owner, username)?;
        }
        Ok(())
    }

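    /// Determines which documents have new content on the server (their hmac
    /// differs from base) and downloads them concurrently.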
    async fn fetch_docs(&self, ctx: &mut SyncContext) -> LbResult<()> {
        ctx.msg("Fetching documents...");
        let mut docs_to_pull = vec![];

        let tx = self.ro_tx().await;
        let db = tx.db();
        let start = Instant::now();

        let mut remote = db.base_metadata.stage(ctx.remote_changes.clone()).to_lazy();
        for id in remote.tree.staged.ids() {
            if remote.calculate_deleted(&id)? {
                continue;
            }
            let remote_hmac = remote.find(&id)?.document_hmac().cloned();
            let base_hmac = remote
                .tree
                .base
                .maybe_find(&id)
                .and_then(|f| f.document_hmac())
                .cloned();
            if base_hmac == remote_hmac {
                continue;
            }

            if let Some(remote_hmac) = remote_hmac {
                docs_to_pull.push((id, remote_hmac));
                self.events.sync(SyncIncrement::PullingDocument(id, true));
            }
        }

        drop(tx);
        if start.elapsed() > std::time::Duration::from_millis(100) {
            warn!("sync fetch_docs held lock for {:?}", start.elapsed());
        }

        let num_docs = docs_to_pull.len();
        ctx.total += num_docs;

        let futures = docs_to_pull
            .into_iter()
            .map(|(id, hmac)| self.fetch_doc(id, hmac));

        let mut stream = stream::iter(futures).buffer_unordered(
            thread::available_parallelism()
                .unwrap_or(NonZeroUsize::new(4).unwrap())
                .into(),
        );

        let mut idx = 0;
        while let Some(fut) = stream.next().await {
            let id = fut?;
            ctx.pulled_docs.push(id);
            idx += 1;
            self.events.sync(SyncIncrement::PullingDocument(id, false));
            ctx.file_msg(id, &format!("Downloaded file {idx} of {num_docs}."));
        }
        Ok(())
    }

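    /// Downloads a single document and writes it to local document storage.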
    async fn fetch_doc(&self, id: Uuid, hmac: DocumentHmac) -> LbResult<Uuid> {
        let remote_document = self
            .client
            .request(self.get_account()?, GetDocRequest { id, hmac })
            .await?;
        self.docs
            .insert(id, Some(hmac), &remote_document.content)
            .await?;

        Ok(id)
    }

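    /// Three-way merges remote and local changes against base. Conflicts are
    /// resolved by unmoving or unsharing files, deleting links, incrementing
    /// names, or duplicating documents; construction retries until the merged
    /// tree validates.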
    async fn merge(&self, ctx: &mut SyncContext) -> LbResult<()> {
        let mut tx = self.begin_tx().await;
        let db = tx.db();
        let start = Instant::now();

        let remote_changes = &ctx.remote_changes;

        let me = Owner(self.keychain.get_pk()?);

        let merge_changes = {
            let mut base = (&db.base_metadata).to_lazy();
            let remote_unlazy = (&db.base_metadata).to_staged(remote_changes);
            let mut remote = remote_unlazy.as_lazy();
            let mut local = (&db.base_metadata).to_staged(&db.local_metadata).to_lazy();

            let mut files_to_unmove: HashSet<Uuid> = HashSet::new();
            let mut files_to_unshare: HashSet<Uuid> = HashSet::new();
            let mut links_to_delete: HashSet<Uuid> = HashSet::new();
            let mut rename_increments: HashMap<Uuid, usize> = HashMap::new();
            let mut duplicate_file_ids: HashMap<Uuid, Uuid> = HashMap::new();
            'merge_construction: loop {
                let mut deletions = {
                    let mut deletions = remote_unlazy.stage(Vec::new()).to_lazy();

                    let mut deletion_creations = HashSet::new();
                    for id in db.local_metadata.ids() {
                        if remote.maybe_find(&id).is_none() && !links_to_delete.contains(&id) {
                            deletion_creations.insert(id);
                        }
                    }
                    'drain_creations: while !deletion_creations.is_empty() {
                        'choose_a_creation: for id in &deletion_creations {
                            let id = *id;
                            let local_file = local.find(&id)?.clone();
                            let result = deletions.create_unvalidated(
                                id,
                                symkey::generate_key(),
                                local_file.parent(),
                                &local.name(&id, &self.keychain)?,
                                local_file.file_type(),
                                &self.keychain,
                            );
                            match result {
                                Ok(_) => {
                                    deletion_creations.remove(&id);
                                    continue 'drain_creations;
                                }
                                Err(ref err) => match err.kind {
                                    LbErrKind::FileParentNonexistent => {
                                        continue 'choose_a_creation;
                                    }
                                    _ => {
                                        result?;
                                    }
                                },
                            }
                        }
                        return Err(LbErrKind::Unexpected(format!(
                            "sync failed to find a topological order for file creations: {deletion_creations:?}"
                        ))
                        .into());
                    }

                    for id in db.local_metadata.ids() {
                        let local_file = local.find(&id)?.clone();
                        if let Some(base_file) = db.base_metadata.maybe_find(&id).cloned() {
                            if !local_file.explicitly_deleted()
                                && local_file.parent() != base_file.parent()
                                && !files_to_unmove.contains(&id)
                            {
                                deletions.move_unvalidated(
                                    &id,
                                    local_file.parent(),
                                    &self.keychain,
                                )?;
                            }
                        }
                    }

                    for id in db.local_metadata.ids() {
                        let local_file = local.find(&id)?.clone();
                        if local_file.explicitly_deleted() {
                            deletions.delete_unvalidated(&id, &self.keychain)?;
                        }
                    }
                    deletions
                };

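                // Stage the remaining local changes on top of the remote tree.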
                let mut merge = {
                    let mut merge = remote_unlazy.stage(Vec::new()).to_lazy();

                    let mut creations = HashSet::new();
                    for id in db.local_metadata.ids() {
                        if deletions.maybe_find(&id).is_some()
                            && !deletions.calculate_deleted(&id)?
                            && remote.maybe_find(&id).is_none()
                            && !links_to_delete.contains(&id)
                        {
                            creations.insert(id);
                        }
                    }
                    'drain_creations: while !creations.is_empty() {
                        'choose_a_creation: for id in &creations {
                            let id = *id;
                            let local_file = local.find(&id)?.clone();
                            let result = merge.create_unvalidated(
                                id,
                                local.decrypt_key(&id, &self.keychain)?,
                                local_file.parent(),
                                &local.name(&id, &self.keychain)?,
                                local_file.file_type(),
                                &self.keychain,
                            );
                            match result {
                                Ok(_) => {
                                    creations.remove(&id);
                                    continue 'drain_creations;
                                }
                                Err(ref err) => match err.kind {
                                    LbErrKind::FileParentNonexistent => {
                                        continue 'choose_a_creation;
                                    }
                                    _ => {
                                        result?;
                                    }
                                },
                            }
                        }
                        return Err(LbErrKind::Unexpected(format!(
                            "sync failed to find a topological order for file creations: {creations:?}"
                        ))
                        .into());
                    }

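                    // For each locally changed file, merge moves, renames,
                    // shares, and document contents against the remote version.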
                    for id in db.local_metadata.ids() {
                        if deletions.maybe_find(&id).is_none()
                            || deletions.calculate_deleted(&id)?
                            || (remote.maybe_find(&id).is_some()
                                && remote.calculate_deleted(&id)?)
                        {
                            continue;
                        }

                        let local_file = local.find(&id)?.clone();
                        let local_name = local.name(&id, &self.keychain)?;
                        let maybe_base_file = base.maybe_find(&id).cloned();
                        let maybe_remote_file = remote.maybe_find(&id).cloned();
                        if let Some(ref base_file) = maybe_base_file {
                            let base_name = base.name(&id, &self.keychain)?;
                            let remote_file = remote.find(&id)?.clone();
                            let remote_name = remote.name(&id, &self.keychain)?;

                            if local_file.parent() != base_file.parent()
                                && remote_file.parent() == base_file.parent()
                                && !files_to_unmove.contains(&id)
                            {
                                merge.move_unvalidated(&id, local_file.parent(), &self.keychain)?;
                            }

                            if local_name != base_name && remote_name == base_name {
                                merge.rename_unvalidated(&id, &local_name, &self.keychain)?;
                            }
                        }

                        let mut remote_keys = HashMap::new();
                        if let Some(ref remote_file) = maybe_remote_file {
                            for key in remote_file.user_access_keys() {
                                remote_keys.insert(
                                    (Owner(key.encrypted_by), Owner(key.encrypted_for)),
                                    (key.mode, key.deleted),
                                );
                            }
                        }
                        for key in local_file.user_access_keys() {
                            let (by, for_) = (Owner(key.encrypted_by), Owner(key.encrypted_for));
                            if let Some(&(remote_mode, remote_deleted)) =
                                remote_keys.get(&(by, for_))
                            {
                                if key.mode > remote_mode || !key.deleted && remote_deleted {
                                    let mode = match key.mode {
                                        UserAccessMode::Read => ShareMode::Read,
                                        UserAccessMode::Write => ShareMode::Write,
                                        UserAccessMode::Owner => continue,
                                    };
                                    merge.add_share_unvalidated(id, for_, mode, &self.keychain)?;
                                }
                                if key.deleted && !remote_deleted {
                                    merge.delete_share_unvalidated(
                                        &id,
                                        Some(for_.0),
                                        &self.keychain,
                                    )?;
                                }
                            } else {
                                let mode = match key.mode {
                                    UserAccessMode::Read => ShareMode::Read,
                                    UserAccessMode::Write => ShareMode::Write,
                                    UserAccessMode::Owner => continue,
                                };
                                merge.add_share_unvalidated(id, for_, mode, &self.keychain)?;
                            }
                        }

                        if files_to_unshare.contains(&id) {
                            merge.delete_share_unvalidated(&id, None, &self.keychain)?;
                        }

                        if let Some(&rename_increment) = rename_increments.get(&id) {
                            let name = NameComponents::from(&local_name)
                                .generate_incremented(rename_increment)
                                .to_name();
                            merge.rename_unvalidated(&id, &name, &self.keychain)?;
                        }

                        let base_hmac = maybe_base_file.and_then(|f| f.document_hmac().cloned());
                        let remote_hmac =
                            maybe_remote_file.and_then(|f| f.document_hmac().cloned());
                        let local_hmac = local_file.document_hmac().cloned();
                        if merge.access_mode(me, &id)? >= Some(UserAccessMode::Write)
                            && local_hmac != base_hmac
                        {
                            if remote_hmac != base_hmac && remote_hmac != local_hmac {
                                let merge_name = merge.name(&id, &self.keychain)?;
                                let document_type =
                                    DocumentType::from_file_name_using_extension(&merge_name);

                                let base_document =
                                    self.read_document_helper(id, &mut base).await?;
                                let remote_document =
                                    self.read_document_helper(id, &mut remote).await?;
                                let local_document =
                                    self.read_document_helper(id, &mut local).await?;

                                match document_type {
                                    DocumentType::Text => {
                                        let base_document =
                                            String::from_utf8_lossy(&base_document).to_string();
                                        let remote_document =
                                            String::from_utf8_lossy(&remote_document).to_string();
                                        let local_document =
                                            String::from_utf8_lossy(&local_document).to_string();
                                        let merged_document = Buffer::from(base_document.as_str())
                                            .merge(local_document, remote_document);
                                        let encrypted_document = merge
                                            .update_document_unvalidated(
                                                &id,
                                                &merged_document.into_bytes(),
                                                &self.keychain,
                                            )?;
                                        let hmac = merge.find(&id)?.document_hmac().copied();
                                        self.docs.insert(id, hmac, &encrypted_document).await?;
                                    }
                                    DocumentType::Drawing => {
                                        let base_document =
                                            String::from_utf8_lossy(&base_document).to_string();
                                        let remote_document =
                                            String::from_utf8_lossy(&remote_document).to_string();
                                        let local_document =
                                            String::from_utf8_lossy(&local_document).to_string();

                                        let base_buffer = svg::buffer::Buffer::new(&base_document);
                                        let remote_buffer =
                                            svg::buffer::Buffer::new(&remote_document);
                                        let mut local_buffer =
                                            svg::buffer::Buffer::new(&local_document);

                                        for (_, el) in local_buffer.elements.iter_mut() {
                                            if let Element::Path(path) = el {
                                                path.data.apply_transform(u_transform_to_bezier(
                                                    &Transform::from(
                                                        local_buffer
                                                            .weak_viewport_settings
                                                            .master_transform,
                                                    ),
                                                ));
                                            }
                                        }
                                        svg::buffer::Buffer::reload(
                                            &mut local_buffer.elements,
                                            &mut local_buffer.weak_images,
                                            &mut local_buffer.weak_path_pressures,
                                            &mut local_buffer.weak_viewport_settings,
                                            &base_buffer,
                                            &remote_buffer,
                                        );

                                        let merged_document = local_buffer.serialize();
                                        let encrypted_document = merge
                                            .update_document_unvalidated(
                                                &id,
                                                &merged_document.into_bytes(),
                                                &self.keychain,
                                            )?;
                                        let hmac = merge.find(&id)?.document_hmac().copied();
                                        self.docs.insert(id, hmac, &encrypted_document).await?;
                                    }
                                    DocumentType::Other => {
                                        let merge_parent = *merge.find(&id)?.parent();
                                        let duplicate_id = if let Some(&duplicate_id) =
                                            duplicate_file_ids.get(&id)
                                        {
                                            duplicate_id
                                        } else {
                                            let duplicate_id = Uuid::new_v4();
                                            duplicate_file_ids.insert(id, duplicate_id);
                                            rename_increments.insert(duplicate_id, 1);
                                            duplicate_id
                                        };

                                        let mut merge_name = merge_name;
                                        merge_name = NameComponents::from(&merge_name)
                                            .generate_incremented(
                                                rename_increments
                                                    .get(&duplicate_id)
                                                    .copied()
                                                    .unwrap_or_default(),
                                            )
                                            .to_name();

                                        merge.create_unvalidated(
                                            duplicate_id,
                                            symkey::generate_key(),
                                            &merge_parent,
                                            &merge_name,
                                            FileType::Document,
                                            &self.keychain,
                                        )?;
                                        let encrypted_document = merge
                                            .update_document_unvalidated(
                                                &duplicate_id,
                                                &local_document,
                                                &self.keychain,
                                            )?;
                                        let duplicate_hmac =
                                            merge.find(&duplicate_id)?.document_hmac().copied();
                                        self.docs
                                            .insert(
                                                duplicate_id,
                                                duplicate_hmac,
                                                &encrypted_document,
                                            )
                                            .await?;
                                    }
                                }
                            } else {
                                let document = self.read_document_helper(id, &mut local).await?;
                                merge.update_document_unvalidated(
                                    &id,
                                    &document,
                                    &self.keychain,
                                )?;
                            }
                        }
                    }

                    for id in db.local_metadata.ids() {
                        if db.base_metadata.maybe_find(&id).is_some()
                            && deletions.calculate_deleted(&id)?
                            && !merge.calculate_deleted(&id)?
                        {
                            merge.delete_unvalidated(&id, &self.keychain)?;
                        }
                    }
                    for &id in &links_to_delete {
                        if merge.maybe_find(&id).is_some() && !merge.calculate_deleted(&id)? {
                            merge.delete_unvalidated(&id, &self.keychain)?;
                        }
                    }

                    merge
                };

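                // If any link targets a file deleted in the merged tree,
                // delete the link and rebuild the merge.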
                for link in merge.ids() {
                    if !merge.calculate_deleted(&link)? {
                        if let FileType::Link { target } = merge.find(&link)?.file_type() {
                            if merge.maybe_find(&target).is_some()
                                && merge.calculate_deleted(&target)?
                            {
                                if links_to_delete.insert(link) {
                                    continue 'merge_construction;
                                } else {
                                    return Err(LbErrKind::Unexpected(format!(
                                        "sync failed to resolve broken link (deletion): {link:?}"
                                    ))
                                    .into());
                                }
                            }
                        }
                    }
                }

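                // Validate the candidate merge; each failure mode maps to a
                // resolution that must make progress, otherwise sync bails.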
                let validate_result = merge.validate(me);
                match validate_result {
                    Ok(_) => {
                        let (_, merge_changes) = merge.unstage();
                        break merge_changes;
                    }
                    Err(ref err) => match err.kind {
                        LbErrKind::Validation(ref vf) => match vf {
                            ValidationFailure::Cycle(ids) => {
                                let mut progress = false;
                                for &id in ids {
                                    if db.local_metadata.maybe_find(&id).is_some()
                                        && files_to_unmove.insert(id)
                                    {
                                        progress = true;
                                    }
                                }
                                if !progress {
                                    return Err(LbErrKind::Unexpected(format!(
                                        "sync failed to resolve cycle: {ids:?}"
                                    ))
                                    .into());
                                }
                            }
                            ValidationFailure::PathConflict(ids) => {
                                let mut progress = false;
                                for &id in ids {
                                    if duplicate_file_ids.values().any(|&dup| dup == id) {
                                        *rename_increments.entry(id).or_insert(0) += 1;
                                        progress = true;
                                        break;
                                    }
                                }
                                if !progress {
                                    for &id in ids {
                                        if db.local_metadata.maybe_find(&id).is_some() {
                                            *rename_increments.entry(id).or_insert(0) += 1;
                                            progress = true;
                                            break;
                                        }
                                    }
                                }
                                if !progress {
                                    return Err(LbErrKind::Unexpected(format!(
                                        "sync failed to resolve path conflict: {ids:?}"
                                    ))
                                    .into());
                                }
                            }
                            ValidationFailure::SharedLink { link, shared_ancestor } => {
                                let mut progress = false;
                                if let Some(base_shared_ancestor) = base.maybe_find(shared_ancestor)
                                {
                                    if !base_shared_ancestor.is_shared()
                                        && files_to_unshare.insert(*shared_ancestor)
                                    {
                                        progress = true;
                                    }
                                }
                                if !progress && links_to_delete.insert(*link) {
                                    progress = true;
                                }
                                if !progress {
                                    return Err(LbErrKind::Unexpected(format!(
                                        "sync failed to resolve shared link: link: {link:?}, shared_ancestor: {shared_ancestor:?}"
                                    )).into());
                                }
                            }
                            ValidationFailure::DuplicateLink { target } => {
                                let mut progress = false;
                                if let Some(link) = local.linked_by(target)? {
                                    if links_to_delete.insert(link) {
                                        progress = true;
                                    }
                                }
                                if !progress {
                                    return Err(LbErrKind::Unexpected(format!(
                                        "sync failed to resolve duplicate link: target: {target:?}"
                                    ))
                                    .into());
                                }
                            }
                            ValidationFailure::BrokenLink(link) => {
                                if !links_to_delete.insert(*link) {
                                    return Err(LbErrKind::Unexpected(format!(
                                        "sync failed to resolve broken link: {link:?}"
                                    ))
                                    .into());
                                }
                            }
                            ValidationFailure::OwnedLink(link) => {
                                let mut progress = false;
                                if let Some(remote_link) = remote.maybe_find(link) {
                                    if let FileType::Link { target } = remote_link.file_type() {
                                        let remote_target = remote.find(&target)?;
                                        if remote_target.owner() != me
                                            && files_to_unmove.insert(target)
                                        {
                                            progress = true;
                                        }
                                    }
                                }
                                if !progress && links_to_delete.insert(*link) {
                                    progress = true;
                                }
                                if !progress {
                                    return Err(LbErrKind::Unexpected(format!(
                                        "sync failed to resolve owned link: {link:?}"
                                    ))
                                    .into());
                                }
                            }
                            ValidationFailure::Orphan(_)
                            | ValidationFailure::NonFolderWithChildren(_)
                            | ValidationFailure::FileWithDifferentOwnerParent(_)
                            | ValidationFailure::FileNameTooLong(_)
                            | ValidationFailure::DeletedFileUpdated(_)
                            | ValidationFailure::NonDecryptableFileName(_) => {
                                validate_result?;
                            }
                        },
                        _ => {
                            validate_result?;
                        }
                    },
                }
            }
        };

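        // Merge succeeded: promote remote changes into base, replace local
        // changes with the merged changes, and prune unreferenced files.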
        (&mut db.base_metadata)
            .to_staged(remote_changes.clone())
            .to_lazy()
            .promote()?;
        db.local_metadata.clear()?;
        (&mut db.local_metadata)
            .to_staged(merge_changes)
            .to_lazy()
            .promote()?;

        db.base_metadata.stage(&mut db.local_metadata).prune()?;

        if start.elapsed() > std::time::Duration::from_millis(100) {
            warn!("sync merge held lock for {:?}", start.elapsed());
        }

        Ok(())
    }

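    /// Signs local metadata changes and upserts them to the server, then
    /// promotes them into base. Document hmacs are carried over from base
    /// here; content changes are pushed by `push_docs`.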
    async fn push_meta(&self, ctx: &mut SyncContext) -> LbResult<()> {
        ctx.msg("Pushing tree changes...");
        let mut updates = vec![];
        let mut local_changes_no_digests = Vec::new();

        let tx = self.ro_tx().await;
        let db = tx.db();

        let local = db.base_metadata.stage(&db.local_metadata).to_lazy();

        for id in local.tree.staged.ids() {
            let mut local_change = local.tree.staged.find(&id)?.timestamped_value.value.clone();
            let maybe_base_file = local.tree.base.maybe_find(&id);

            local_change.document_hmac =
                maybe_base_file.and_then(|f| f.timestamped_value.value.document_hmac);
            let local_change = local_change.sign(&self.keychain)?;

            local_changes_no_digests.push(local_change.clone());
            let file_diff = FileDiff { old: maybe_base_file.cloned(), new: local_change };
            updates.push(file_diff);
        }

        drop(tx);

        if !updates.is_empty() {
            self.client
                .request(self.get_account()?, UpsertRequest { updates: updates.clone() })
                .await?;
            ctx.pushed_metas = updates;
        }

        let mut tx = self.begin_tx().await;
        let db = tx.db();

        (&mut db.base_metadata)
            .to_lazy()
            .stage(local_changes_no_digests)
            .promote()?;
        db.base_metadata.stage(&mut db.local_metadata).prune()?;

        tx.end();

        Ok(())
    }

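    /// Uploads documents whose content changed locally (concurrently), then
    /// promotes the new hmacs into base.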
    async fn push_docs(&self, ctx: &mut SyncContext) -> LbResult<()> {
        ctx.msg("Pushing document changes...");
        let mut updates = vec![];
        let mut local_changes_digests_only = vec![];

        let tx = self.ro_tx().await;
        let db = tx.db();
        let start = Instant::now();

        let local = db.base_metadata.stage(&db.local_metadata).to_lazy();

        for id in local.tree.staged.ids() {
            let base_file = local.tree.base.find(&id)?.clone();

            let mut local_change = base_file.timestamped_value.value.clone();
            local_change.document_hmac = local.find(&id)?.timestamped_value.value.document_hmac;

            if base_file.document_hmac() == local_change.document_hmac()
                || local_change.document_hmac.is_none()
            {
                continue;
            }

            let local_change = local_change.sign(&self.keychain)?;

            updates.push(FileDiff { old: Some(base_file), new: local_change.clone() });
            local_changes_digests_only.push(local_change);
            self.events.sync(SyncIncrement::PushingDocument(id, true));
        }

        drop(tx);
        if start.elapsed() > std::time::Duration::from_millis(100) {
            warn!("sync push_docs held lock for {:?}", start.elapsed());
        }

        let docs_count = updates.len();
        ctx.total += docs_count;
        let futures = updates.clone().into_iter().map(|diff| self.push_doc(diff));

        let mut stream = stream::iter(futures).buffer_unordered(
            thread::available_parallelism()
                .unwrap_or(NonZeroUsize::new(4).unwrap())
                .into(),
        );

        let mut idx = 0;
        while let Some(fut) = stream.next().await {
            let id = fut?;
            idx += 1;
            self.events.sync(SyncIncrement::PushingDocument(id, false));
            ctx.file_msg(id, &format!("Pushed file {idx} of {docs_count}."));
        }
        ctx.pushed_docs = updates;

        let mut tx = self.begin_tx().await;
        let db = tx.db();
        (&mut db.base_metadata)
            .to_lazy()
            .stage(local_changes_digests_only)
            .promote()?;

        db.base_metadata.stage(&mut db.local_metadata).prune()?;

        tx.end();

        Ok(())
    }

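    /// Uploads a single document diff along with its new encrypted content.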
    async fn push_doc(&self, diff: FileDiff<SignedFile>) -> LbResult<Uuid> {
        let id = *diff.new.id();
        let hmac = diff.new.document_hmac();
        let local_document_change = self.docs.get(id, hmac.copied()).await?;
        self.client
            .request(
                self.get_account()?,
                ChangeDocRequest { diff, new_content: local_document_change },
            )
            .await?;

        Ok(id)
    }

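    /// Dedups server updates against base and prunes remote orphans. Returns
    /// the remaining changes, the server version they are current as of, and
    /// the root id if none is stored locally yet.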
    async fn dedup(
        &self, updates: GetUpdatesResponse,
    ) -> LbResult<(Vec<SignedFile>, u64, Option<Uuid>)> {
        let tx = self.ro_tx().await;
        let db = tx.db();

        let mut root_id = None;
        let (remote_changes, update_as_of) = {
            let mut remote_changes = updates.file_metadata;
            let update_as_of = updates.as_of_metadata_version;

            remote_changes = self.prune_remote_orphans(remote_changes).await?;

            let remote = db.base_metadata.stage(remote_changes).pruned()?.to_lazy();

            let (_, remote_changes) = remote.unstage();
            (remote_changes, update_as_of)
        };

        if db.root.get().is_none() {
            let root = remote_changes
                .all_files()?
                .into_iter()
                .find(|f| f.is_root())
                .ok_or(LbErrKind::RootNonexistent)?;
            root_id = Some(*root.id());
        }

        Ok((remote_changes, update_as_of, root_id))
    }

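    /// Drops remote files that have no visible parent and are not shared
    /// directly with this user; such files would be unreachable orphans.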
    async fn prune_remote_orphans(
        &self, remote_changes: Vec<SignedFile>,
    ) -> LbResult<Vec<SignedFile>> {
        let tx = self.ro_tx().await;
        let db = tx.db();

        let me = Owner(self.keychain.get_pk()?);
        let remote = db.base_metadata.stage(remote_changes).to_lazy();
        let mut result = Vec::new();

        for id in remote.tree.staged.ids() {
            let meta = remote.find(&id)?;
            if remote.maybe_find_parent(meta).is_some()
                || meta
                    .user_access_keys()
                    .iter()
                    .any(|k| k.encrypted_for == me.0)
            {
                result.push(remote.find(&id)?.clone());
            }
        }
        Ok(result)
    }

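    /// Persists the server version this sync is current as of, and the root id
    /// on first sync.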
    async fn commit_last_synced(&self, ctx: &mut SyncContext) -> LbResult<()> {
        ctx.msg("Cleaning up...");
        let mut tx = self.begin_tx().await;
        let db = tx.db();
        db.last_synced.insert(ctx.update_as_of as i64)?;

        if let Some(root) = ctx.root {
            db.root.insert(root)?;
        }

        Ok(())
    }

    pub async fn get_last_synced_human(&self) -> LbResult<String> {
        let tx = self.ro_tx().await;
        let db = tx.db();
        let last_synced = db.last_synced.get().copied().unwrap_or(0);

        Ok(self.get_timestamp_human_string(last_synced))
    }

    pub fn get_timestamp_human_string(&self, timestamp: i64) -> String {
        if timestamp != 0 {
            Duration::milliseconds(clock::get_time().0 - timestamp)
                .format_human()
                .to_string()
        } else {
            "never".to_string()
        }
    }
}

impl SyncContext {
    fn summarize(&self) -> SyncStatus {
        let mut local = HashSet::new();
        let mut server = HashSet::new();
        let mut work_units = vec![];

        for meta in &self.pushed_metas {
            local.insert(meta.new.id());
        }

        for meta in &self.pushed_docs {
            local.insert(meta.new.id());
        }

        for meta in &self.remote_changes {
            server.insert(meta.id());
        }

        for id in local {
            work_units.push(WorkUnit::LocalChange(*id));
        }

        for id in server {
            work_units.push(WorkUnit::ServerChange(*id));
        }

        SyncStatus { work_units, latest_server_ts: self.update_as_of }
    }

    fn msg(&mut self, msg: &str) {
        self.current += 1;
        if let Some(f) = &self.progress {
            f(SyncProgress {
                total: self.total,
                progress: self.current,
                file_being_processed: Default::default(),
                msg: msg.to_string(),
            })
        }
    }

    fn file_msg(&mut self, id: Uuid, msg: &str) {
        self.current += 1;
        if let Some(f) = &self.progress {
            f(SyncProgress {
                total: self.total,
                progress: self.current,
                file_being_processed: Some(id),
                msg: msg.to_string(),
            })
        }
    }

    fn done_msg(&mut self) {
        self.current = self.total;
        if let Some(f) = &self.progress {
            f(SyncProgress {
                total: self.total,
                progress: self.current,
                file_being_processed: None,
                msg: "Sync successful!".to_string(),
            })
        }
    }
}

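/// The set of changes a sync processed (or would process) and the server
/// timestamp those changes are current as of.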
#[derive(Debug, Serialize, Clone)]
pub struct SyncStatus {
    pub work_units: Vec<WorkUnit>,
    pub latest_server_ts: u64,
}

#[derive(Clone)]
pub struct SyncProgress {
    pub total: usize,
    pub progress: usize,
    pub file_being_processed: Option<Uuid>,
    pub msg: String,
}

impl Display for SyncProgress {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "[{} / {}]: {}", self.progress, self.total, self.msg)
    }
}

#[derive(Debug, Clone)]
pub enum SyncIncrement {
    SyncStarted,
    PullingDocument(Uuid, bool),
    PushingDocument(Uuid, bool),
    SyncFinished(Option<LbErrKind>),
}