use crate::io::network::ApiError;
use crate::model::access_info::UserAccessMode;
use crate::model::api::{
    ChangeDocRequest, GetDocRequest, GetFileIdsRequest, GetUpdatesRequest, GetUpdatesResponse,
    GetUsernameError, GetUsernameRequest, UpsertRequest,
};
use crate::model::errors::{LbErrKind, LbResult};
use crate::model::file::ShareMode;
use crate::model::file_like::FileLike;
use crate::model::file_metadata::{DocumentHmac, FileDiff, FileType, Owner};
use crate::model::filename::{DocumentType, NameComponents};
use crate::model::signed_file::SignedFile;
use crate::model::staged::StagedTreeLikeMut;
use crate::model::svg::buffer::u_transform_to_bezier;
use crate::model::svg::element::Element;
use crate::model::text::buffer::Buffer;
use crate::model::tree_like::TreeLike;
use crate::model::work_unit::WorkUnit;
use crate::model::{clock, svg};
use crate::model::{symkey, ValidationFailure};
use crate::Lb;
pub use basic_human_duration::ChronoHumanDuration;
use futures::stream;
use futures::StreamExt;
use serde::Serialize;
use std::collections::{hash_map, HashMap, HashSet};
use std::fmt::{Display, Formatter};
use std::num::NonZeroUsize;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Instant;
use time::Duration;
use uuid::Uuid;

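/// Shared atomic flag used to coordinate sync state across threads.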
pub type SyncFlag = Arc<AtomicBool>;

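/// Book-keeping for a single sync run: the progress callback and counters,
/// plus the intermediate state (fetched metadata, pushed diffs, pulled
/// documents) that later phases and the final summary draw on.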
pub struct SyncContext {
    progress: Option<Box<dyn Fn(SyncProgress) + Send>>,
    current: usize,
    total: usize,

    pk_cache: HashMap<Owner, String>,
    last_synced: u64,
    remote_changes: Vec<SignedFile>,
    update_as_of: u64,
    root: Option<Uuid>,
    pushed_metas: Vec<FileDiff<SignedFile>>,
    pushed_docs: Vec<FileDiff<SignedFile>>,
    pulled_docs: Vec<Uuid>,
}

impl Lb {
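    /// Computes the work a sync would perform without doing any of it: one
    /// `WorkUnit` per locally changed file and one per file changed on the
    /// server since the last sync.
    ///
    /// A minimal usage sketch, assuming an initialized `Lb` handle named `lb`:
    ///
    /// ```ignore
    /// let status = lb.calculate_work().await?;
    /// println!("{} file(s) to sync", status.work_units.len());
    /// ```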
    #[instrument(level = "debug", skip_all, err(Debug))]
    pub async fn calculate_work(&self) -> LbResult<SyncStatus> {
        let tx = self.ro_tx().await;
        let db = tx.db();
        let last_synced = db.last_synced.get().copied().unwrap_or_default() as u64;
        drop(tx);

        let remote_changes = self
            .client
            .request(self.get_account()?, GetUpdatesRequest { since_metadata_version: last_synced })
            .await?;
        let (deduped, latest_server_ts, _) = self.dedup(remote_changes).await?;
        let remote_dirty = deduped
            .into_iter()
            .map(|f| *f.id())
            .map(WorkUnit::ServerChange);

        self.prune().await?;

        let tx = self.ro_tx().await;
        let db = tx.db();

        let locally_dirty = db
            .local_metadata
            .get()
            .keys()
            .copied()
            .map(WorkUnit::LocalChange);

        let mut work_units: Vec<WorkUnit> = Vec::new();
        work_units.extend(locally_dirty.chain(remote_dirty));
        Ok(SyncStatus { work_units, latest_server_ts })
    }

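    /// Runs a full sync: prunes files deleted on the server, pulls remote
    /// metadata and documents, merges them with local changes, then pushes
    /// local metadata and documents. Only one sync may run at a time; a
    /// concurrent call fails with `LbErrKind::AlreadySyncing`.
    ///
    /// A minimal usage sketch, assuming an initialized `Lb` handle named `lb`
    /// (pass `None` to skip progress reporting):
    ///
    /// ```ignore
    /// let status = lb.sync(Some(Box::new(|p| println!("{p}")))).await?;
    /// ```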
    #[instrument(level = "debug", skip_all, err(Debug))]
    pub async fn sync(&self, f: Option<Box<dyn Fn(SyncProgress) + Send>>) -> LbResult<SyncStatus> {
        let old = self.syncing.swap(true, Ordering::SeqCst);
        if old {
            return Err(LbErrKind::AlreadySyncing.into());
        }

        let mut ctx = self.setup_sync(f).await?;

        let mut got_updates = false;
        let mut pipeline: LbResult<()> = async {
            ctx.msg("Preparing Sync...");
            self.prune().await?;
            got_updates = self.fetch_meta(&mut ctx).await?;
            self.populate_pk_cache(&mut ctx).await?;
            self.docs.dont_delete.store(true, Ordering::SeqCst);
            self.fetch_docs(&mut ctx).await?;
            self.merge(&mut ctx).await?;
            self.push_meta(&mut ctx).await?;
            self.push_docs(&mut ctx).await?;
            Ok(())
        }
        .await;

        self.docs.dont_delete.store(false, Ordering::SeqCst);

        if pipeline.is_ok() {
            pipeline = self.commit_last_synced(&mut ctx).await;
        }

        let cleanup = self.cleanup().await;

        self.syncing.store(false, Ordering::Relaxed);
        pipeline?;
        cleanup?;
        ctx.done_msg();

        if got_updates {
            self.events.meta_changed(self.root().await?.id);
            for id in &ctx.pulled_docs {
                self.events.doc_written(*id);
            }
        }

        Ok(ctx.summarize())
    }

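    /// Captures the state a sync starts from: the last-synced timestamp and
    /// the public key cache, plus fresh progress counters. The initial `total`
    /// of 7 matches the seven fixed `msg` steps of the pipeline; document
    /// transfers add to it as they are discovered.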
    async fn setup_sync(
        &self, progress: Option<Box<dyn Fn(SyncProgress) + Send>>,
    ) -> LbResult<SyncContext> {
        let tx = self.ro_tx().await;
        let db = tx.db();

        let last_synced = db.last_synced.get().copied().unwrap_or_default() as u64;
        let pk_cache = db.pub_key_lookup.get().clone();

        let current = 0;
        let total = 7;

        Ok(SyncContext {
            last_synced,
            pk_cache,

            progress,
            current,
            total,

            root: Default::default(),
            update_as_of: Default::default(),
            remote_changes: Default::default(),
            pushed_docs: Default::default(),
            pushed_metas: Default::default(),
            pulled_docs: Default::default(),
        })
    }

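    /// Removes files that no longer exist on the server (and their
    /// descendants) from both the base and local trees, deleting their
    /// document contents from document storage along the way.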
    async fn prune(&self) -> LbResult<()> {
        let server_ids = self
            .client
            .request(self.get_account()?, GetFileIdsRequest {})
            .await?
            .ids;

        let mut tx = self.begin_tx().await;
        let db = tx.db();

        let mut local = db.base_metadata.stage(&db.local_metadata).to_lazy();

        // files the server no longer knows about, plus their descendants
        let mut prunable_ids = local.tree.base.ids();
        prunable_ids.retain(|id| !server_ids.contains(id));
        for id in prunable_ids.clone() {
            prunable_ids.extend(local.descendants(&id)?.into_iter());
        }
        for id in &prunable_ids {
            if let Some(base_file) = local.tree.base.maybe_find(id) {
                self.docs
                    .delete(*id, base_file.document_hmac().copied())
                    .await?;
            }
            if let Some(local_file) = local.maybe_find(id) {
                self.docs
                    .delete(*id, local_file.document_hmac().copied())
                    .await?;
            }
        }

        let mut base_staged = (&mut db.base_metadata).to_lazy().stage(None);
        base_staged.tree.removed = prunable_ids.clone().into_iter().collect();
        base_staged.promote()?;

        let mut local_staged = (&mut db.local_metadata).to_lazy().stage(None);
        local_staged.tree.removed = prunable_ids.into_iter().collect();
        local_staged.promote()?;

        Ok(())
    }

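    /// Pulls metadata updates since the last sync and stores the deduplicated
    /// result on the context. Returns whether the server reported any updates.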
    async fn fetch_meta(&self, ctx: &mut SyncContext) -> LbResult<bool> {
        ctx.msg("Fetching tree updates...");
        let updates = self
            .client
            .request(
                self.get_account()?,
                GetUpdatesRequest { since_metadata_version: ctx.last_synced },
            )
            .await?;

        let empty = updates.file_metadata.is_empty();
        let (remote, as_of, root) = self.dedup(updates).await?;
        ctx.remote_changes = remote;
        ctx.update_as_of = as_of;
        ctx.root = root;

        Ok(!empty)
    }

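    /// Resolves the usernames of any owners referenced by the fetched
    /// metadata that aren't yet in the public key cache, and persists the
    /// new entries.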
    async fn populate_pk_cache(&self, ctx: &mut SyncContext) -> LbResult<()> {
        ctx.msg("Updating public key cache...");
        let mut all_owners = HashSet::new();
        for file in &ctx.remote_changes {
            for user_access_key in file.user_access_keys() {
                all_owners.insert(Owner(user_access_key.encrypted_by));
                all_owners.insert(Owner(user_access_key.encrypted_for));
            }
        }

        let mut new_entries = HashMap::new();

        for owner in all_owners {
            if let hash_map::Entry::Vacant(e) = ctx.pk_cache.entry(owner) {
                let username_result = self
                    .client
                    .request(self.get_account()?, GetUsernameRequest { key: owner.0 })
                    .await;
                let username = match username_result {
                    Err(ApiError::Endpoint(GetUsernameError::UserNotFound)) => {
                        "<unknown>".to_string()
                    }
                    _ => username_result?.username.clone(),
                };
                new_entries.insert(owner, username.clone());
                e.insert(username);
            }
        }

        let mut tx = self.begin_tx().await;
        let db = tx.db();

        for (owner, username) in new_entries {
            db.pub_key_lookup.insert(owner, username)?;
        }
        Ok(())
    }

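    /// Determines which documents have new remote content (remote hmac
    /// differs from base) and downloads them concurrently, reporting
    /// per-file progress.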
    async fn fetch_docs(&self, ctx: &mut SyncContext) -> LbResult<()> {
        ctx.msg("Fetching documents...");
        let mut docs_to_pull = vec![];

        let tx = self.ro_tx().await;
        let db = tx.db();
        let start = Instant::now();

        let mut remote = db.base_metadata.stage(ctx.remote_changes.clone()).to_lazy();
        for id in remote.tree.staged.ids() {
            if remote.calculate_deleted(&id)? {
                continue;
            }
            let remote_hmac = remote.find(&id)?.document_hmac().cloned();
            let base_hmac = remote
                .tree
                .base
                .maybe_find(&id)
                .and_then(|f| f.document_hmac())
                .cloned();
            if base_hmac == remote_hmac {
                continue;
            }

            if let Some(remote_hmac) = remote_hmac {
                docs_to_pull.push((id, remote_hmac));
            }
        }

        drop(tx);
        if start.elapsed() > std::time::Duration::from_millis(100) {
            warn!("sync fetch_docs held lock for {:?}", start.elapsed());
        }

        let num_docs = docs_to_pull.len();
        ctx.total += num_docs;

        let futures = docs_to_pull
            .into_iter()
            .map(|(id, hmac)| self.fetch_doc(id, hmac));

        let mut stream = stream::iter(futures).buffer_unordered(
            thread::available_parallelism()
                .unwrap_or(NonZeroUsize::new(4).unwrap())
                .into(),
        );

        let mut idx = 0;
        while let Some(fut) = stream.next().await {
            let id = fut?;
            ctx.pulled_docs.push(id);
            idx += 1;
            ctx.file_msg(id, &format!("Downloaded file {idx} of {num_docs}."));
        }
        Ok(())
    }

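    /// Downloads a single document revision and stores it locally under its
    /// hmac.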
    async fn fetch_doc(&self, id: Uuid, hmac: DocumentHmac) -> LbResult<Uuid> {
        let remote_document = self
            .client
            .request(self.get_account()?, GetDocRequest { id, hmac })
            .await?;
        self.docs
            .insert(id, Some(hmac), &remote_document.content)
            .await?;

        Ok(id)
    }

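    /// Reconciles remote changes with local ones. Local changes are replayed
    /// on top of the updated remote base: concurrently edited text documents
    /// and drawings are merged content-wise, other concurrently edited
    /// documents are duplicated, and validation failures (cycles, path
    /// conflicts, link problems) are resolved by unmoving, renaming,
    /// unsharing, or deleting links, retrying until the merged tree validates.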
    async fn merge(&self, ctx: &mut SyncContext) -> LbResult<()> {
        let mut tx = self.begin_tx().await;
        let db = tx.db();
        let start = Instant::now();

        let remote_changes = &ctx.remote_changes;

        let me = Owner(self.keychain.get_pk()?);

        let merge_changes = {
            // set up the trees: base, base + remote, and base + local
            let mut base = (&db.base_metadata).to_lazy();
            let remote_unlazy = (&db.base_metadata).to_staged(remote_changes);
            let mut remote = remote_unlazy.as_lazy();
            let mut local = (&db.base_metadata).to_staged(&db.local_metadata).to_lazy();

            // conflict resolutions, accumulated across retries of the loop below
            let mut files_to_unmove: HashSet<Uuid> = HashSet::new();
            let mut files_to_unshare: HashSet<Uuid> = HashSet::new();
            let mut links_to_delete: HashSet<Uuid> = HashSet::new();
            let mut rename_increments: HashMap<Uuid, usize> = HashMap::new();
            let mut duplicate_file_ids: HashMap<Uuid, Uuid> = HashMap::new();

            'merge_construction: loop {
                // replay local creations, moves, and deletes on top of the remote
                // tree; used below to decide which files end up deleted
                let mut deletions = {
                    let mut deletions = remote_unlazy.stage(Vec::new()).to_lazy();

                    // creations, drained in topological order (parents first)
                    let mut deletion_creations = HashSet::new();
                    for id in db.local_metadata.ids() {
                        if remote.maybe_find(&id).is_none() && !links_to_delete.contains(&id) {
                            deletion_creations.insert(id);
                        }
                    }
                    'drain_creations: while !deletion_creations.is_empty() {
                        'choose_a_creation: for id in &deletion_creations {
                            let id = *id;
                            let local_file = local.find(&id)?.clone();
                            let result = deletions.create_unvalidated(
                                id,
                                symkey::generate_key(),
                                local_file.parent(),
                                &local.name(&id, &self.keychain)?,
                                local_file.file_type(),
                                &self.keychain,
                            );
                            match result {
                                Ok(_) => {
                                    deletion_creations.remove(&id);
                                    continue 'drain_creations;
                                }
                                Err(ref err) => match err.kind {
                                    LbErrKind::FileParentNonexistent => {
                                        continue 'choose_a_creation;
                                    }
                                    _ => {
                                        result?;
                                    }
                                },
                            }
                        }
                        return Err(LbErrKind::Unexpected(format!(
                            "sync failed to find a topological order for file creations: {:?}",
                            deletion_creations
                        ))
                        .into());
                    }

                    // moves
                    for id in db.local_metadata.ids() {
                        let local_file = local.find(&id)?.clone();
                        if let Some(base_file) = db.base_metadata.maybe_find(&id).cloned() {
                            if !local_file.explicitly_deleted()
                                && local_file.parent() != base_file.parent()
                                && !files_to_unmove.contains(&id)
                            {
                                deletions.move_unvalidated(
                                    &id,
                                    local_file.parent(),
                                    &self.keychain,
                                )?;
                            }
                        }
                    }

                    // deletions
                    for id in db.local_metadata.ids() {
                        let local_file = local.find(&id)?.clone();
                        if local_file.explicitly_deleted() {
                            deletions.delete_unvalidated(&id, &self.keychain)?;
                        }
                    }
                    deletions
                };

                // replay all local changes on top of the remote tree
                let mut merge = {
                    let mut merge = remote_unlazy.stage(Vec::new()).to_lazy();

                    // creations, drained in topological order (parents first)
                    let mut creations = HashSet::new();
                    for id in db.local_metadata.ids() {
                        if deletions.maybe_find(&id).is_some()
                            && !deletions.calculate_deleted(&id)?
                            && remote.maybe_find(&id).is_none()
                            && !links_to_delete.contains(&id)
                        {
                            creations.insert(id);
                        }
                    }
                    'drain_creations: while !creations.is_empty() {
                        'choose_a_creation: for id in &creations {
                            let id = *id;
                            let local_file = local.find(&id)?.clone();
                            let result = merge.create_unvalidated(
                                id,
                                local.decrypt_key(&id, &self.keychain)?,
                                local_file.parent(),
                                &local.name(&id, &self.keychain)?,
                                local_file.file_type(),
                                &self.keychain,
                            );
                            match result {
                                Ok(_) => {
                                    creations.remove(&id);
                                    continue 'drain_creations;
                                }
                                Err(ref err) => match err.kind {
                                    LbErrKind::FileParentNonexistent => {
                                        continue 'choose_a_creation;
                                    }
                                    _ => {
                                        result?;
                                    }
                                },
                            }
                        }
                        return Err(LbErrKind::Unexpected(format!(
                            "sync failed to find a topological order for file creations: {:?}",
                            creations
                        ))
                        .into());
                    }

                    // updates for files that survive deletion on both sides
                    for id in db.local_metadata.ids() {
                        if deletions.maybe_find(&id).is_none()
                            || deletions.calculate_deleted(&id)?
                            || (remote.maybe_find(&id).is_some()
                                && remote.calculate_deleted(&id)?)
                        {
                            continue;
                        }

                        let local_file = local.find(&id)?.clone();
                        let local_name = local.name(&id, &self.keychain)?;
                        let maybe_base_file = base.maybe_find(&id).cloned();
                        let maybe_remote_file = remote.maybe_find(&id).cloned();
                        if let Some(ref base_file) = maybe_base_file {
                            let base_name = base.name(&id, &self.keychain)?;
                            let remote_file = remote.find(&id)?.clone();
                            let remote_name = remote.name(&id, &self.keychain)?;

                            // move, if moved locally but not remotely
                            if local_file.parent() != base_file.parent()
                                && remote_file.parent() == base_file.parent()
                                && !files_to_unmove.contains(&id)
                            {
                                merge.move_unvalidated(&id, local_file.parent(), &self.keychain)?;
                            }

                            // rename, if renamed locally but not remotely
                            if local_name != base_name && remote_name == base_name {
                                merge.rename_unvalidated(&id, &local_name, &self.keychain)?;
                            }
                        }

                        // replay local share changes
                        let mut remote_keys = HashMap::new();
                        if let Some(ref remote_file) = maybe_remote_file {
                            for key in remote_file.user_access_keys() {
                                remote_keys.insert(
                                    (Owner(key.encrypted_by), Owner(key.encrypted_for)),
                                    (key.mode, key.deleted),
                                );
                            }
                        }
                        for key in local_file.user_access_keys() {
                            let (by, for_) = (Owner(key.encrypted_by), Owner(key.encrypted_for));
                            if let Some(&(remote_mode, remote_deleted)) =
                                remote_keys.get(&(by, for_))
                            {
                                // share upgraded locally, or restored after remote deletion
                                if key.mode > remote_mode || !key.deleted && remote_deleted {
                                    let mode = match key.mode {
                                        UserAccessMode::Read => ShareMode::Read,
                                        UserAccessMode::Write => ShareMode::Write,
                                        UserAccessMode::Owner => continue,
                                    };
                                    merge.add_share_unvalidated(id, for_, mode, &self.keychain)?;
                                }
                                // share deleted locally but not remotely
                                if key.deleted && !remote_deleted {
                                    merge.delete_share_unvalidated(
                                        &id,
                                        Some(for_.0),
                                        &self.keychain,
                                    )?;
                                }
                            } else {
                                // share created locally
                                let mode = match key.mode {
                                    UserAccessMode::Read => ShareMode::Read,
                                    UserAccessMode::Write => ShareMode::Write,
                                    UserAccessMode::Owner => continue,
                                };
                                merge.add_share_unvalidated(id, for_, mode, &self.keychain)?;
                            }
                        }

                        // unshare, if required to resolve a validation failure
                        if files_to_unshare.contains(&id) {
                            merge.delete_share_unvalidated(&id, None, &self.keychain)?;
                        }

                        // rename, if required to resolve a path conflict
                        if let Some(&rename_increment) = rename_increments.get(&id) {
                            let name = NameComponents::from(&local_name)
                                .generate_incremented(rename_increment)
                                .to_name();
                            merge.rename_unvalidated(&id, &name, &self.keychain)?;
                        }

                        // document edits
                        let base_hmac = maybe_base_file.and_then(|f| f.document_hmac().cloned());
                        let remote_hmac =
                            maybe_remote_file.and_then(|f| f.document_hmac().cloned());
                        let local_hmac = local_file.document_hmac().cloned();
                        if merge.access_mode(me, &id)? >= Some(UserAccessMode::Write)
                            && local_hmac != base_hmac
                        {
                            if remote_hmac != base_hmac && remote_hmac != local_hmac {
                                // edited both locally and remotely; merge the contents
                                let merge_name = merge.name(&id, &self.keychain)?;
                                let document_type =
                                    DocumentType::from_file_name_using_extension(&merge_name);

                                let base_document =
                                    self.read_document_helper(id, &mut base).await?;
                                let remote_document =
                                    self.read_document_helper(id, &mut remote).await?;
                                let local_document =
                                    self.read_document_helper(id, &mut local).await?;

                                match document_type {
                                    DocumentType::Text => {
                                        // three-way merge of text contents
                                        let base_document =
                                            String::from_utf8_lossy(&base_document).to_string();
                                        let remote_document =
                                            String::from_utf8_lossy(&remote_document).to_string();
                                        let local_document =
                                            String::from_utf8_lossy(&local_document).to_string();
                                        let merged_document = Buffer::from(base_document.as_str())
                                            .merge(local_document, remote_document);
                                        let encrypted_document = merge
                                            .update_document_unvalidated(
                                                &id,
                                                &merged_document.into_bytes(),
                                                &self.keychain,
                                            )?;
                                        let hmac = merge.find(&id)?.document_hmac().copied();
                                        self.docs.insert(id, hmac, &encrypted_document).await?;
                                    }
                                    DocumentType::Drawing => {
                                        // merge drawings element-by-element
                                        let base_document =
                                            String::from_utf8_lossy(&base_document).to_string();
                                        let remote_document =
                                            String::from_utf8_lossy(&remote_document).to_string();
                                        let local_document =
                                            String::from_utf8_lossy(&local_document).to_string();

                                        let base_buffer = svg::buffer::Buffer::new(&base_document);
                                        let remote_buffer =
                                            svg::buffer::Buffer::new(&remote_document);
                                        let mut local_buffer =
                                            svg::buffer::Buffer::new(&local_document);

                                        for (_, el) in local_buffer.elements.iter_mut() {
                                            if let Element::Path(path) = el {
                                                path.data.apply_transform(u_transform_to_bezier(
                                                    &local_buffer.master_transform,
                                                ));
                                            }
                                        }
                                        svg::buffer::Buffer::reload(
                                            &mut local_buffer.elements,
                                            &mut local_buffer.weak_images,
                                            local_buffer.master_transform,
                                            &base_buffer,
                                            &remote_buffer,
                                        );

                                        let merged_document = local_buffer.serialize();
                                        let encrypted_document = merge
                                            .update_document_unvalidated(
                                                &id,
                                                &merged_document.into_bytes(),
                                                &self.keychain,
                                            )?;
                                        let hmac = merge.find(&id)?.document_hmac().copied();
                                        self.docs.insert(id, hmac, &encrypted_document).await?;
                                    }
                                    DocumentType::Other => {
                                        // non-mergeable; duplicate the local version
                                        let merge_parent = *merge.find(&id)?.parent();
                                        let duplicate_id = if let Some(&duplicate_id) =
                                            duplicate_file_ids.get(&id)
                                        {
                                            duplicate_id
                                        } else {
                                            let duplicate_id = Uuid::new_v4();
                                            duplicate_file_ids.insert(id, duplicate_id);
                                            rename_increments.insert(duplicate_id, 1);
                                            duplicate_id
                                        };

                                        let mut merge_name = merge_name;
                                        merge_name = NameComponents::from(&merge_name)
                                            .generate_incremented(
                                                rename_increments
                                                    .get(&duplicate_id)
                                                    .copied()
                                                    .unwrap_or_default(),
                                            )
                                            .to_name();

                                        merge.create_unvalidated(
                                            duplicate_id,
                                            symkey::generate_key(),
                                            &merge_parent,
                                            &merge_name,
                                            FileType::Document,
                                            &self.keychain,
                                        )?;
                                        let encrypted_document = merge
                                            .update_document_unvalidated(
                                                &duplicate_id,
                                                &local_document,
                                                &self.keychain,
                                            )?;
                                        let duplicate_hmac =
                                            merge.find(&duplicate_id)?.document_hmac().copied();
                                        self.docs
                                            .insert(
                                                duplicate_id,
                                                duplicate_hmac,
                                                &encrypted_document,
                                            )
                                            .await?;
                                    }
                                }
                            } else {
                                // edited locally only; use the local contents
                                let document = self.read_document_helper(id, &mut local).await?;
                                merge.update_document_unvalidated(
                                    &id,
                                    &document,
                                    &self.keychain,
                                )?;
                            }
                        }
                    }

                    // deletions
                    for id in db.local_metadata.ids() {
                        if db.base_metadata.maybe_find(&id).is_some()
                            && deletions.calculate_deleted(&id)?
                            && !merge.calculate_deleted(&id)?
                        {
                            merge.delete_unvalidated(&id, &self.keychain)?;
                        }
                    }
                    for &id in &links_to_delete {
                        if merge.maybe_find(&id).is_some() && !merge.calculate_deleted(&id)? {
                            merge.delete_unvalidated(&id, &self.keychain)?;
                        }
                    }

                    merge
                };

                // if a link's target ended up deleted, delete the link and retry
                for link in merge.ids() {
                    if !merge.calculate_deleted(&link)? {
                        if let FileType::Link { target } = merge.find(&link)?.file_type() {
                            if merge.maybe_find(&target).is_some()
                                && merge.calculate_deleted(&target)?
                            {
                                if links_to_delete.insert(link) {
                                    continue 'merge_construction;
                                } else {
                                    return Err(LbErrKind::Unexpected(format!(
                                        "sync failed to resolve broken link (deletion): {:?}",
                                        link
                                    ))
                                    .into());
                                }
                            }
                        }
                    }
                }

                // validate the merged tree; on failure, record a resolution and retry
                let validate_result = merge.validate(me);
                match validate_result {
                    Ok(_) => {
                        let (_, merge_changes) = merge.unstage();
                        break merge_changes;
                    }
                    Err(ref err) => match err.kind {
                        LbErrKind::Validation(ref vf) => match vf {
                            ValidationFailure::Cycle(ids) => {
                                // revert local moves that contributed to the cycle
                                let mut progress = false;
                                for &id in ids {
                                    if db.local_metadata.maybe_find(&id).is_some()
                                        && files_to_unmove.insert(id)
                                    {
                                        progress = true;
                                    }
                                }
                                if !progress {
                                    return Err(LbErrKind::Unexpected(format!(
                                        "sync failed to resolve cycle: {:?}",
                                        ids
                                    ))
                                    .into());
                                }
                            }
                            ValidationFailure::PathConflict(ids) => {
                                // increment the rename of a duplicated or locally known file
                                let mut progress = false;
                                for &id in ids {
                                    if duplicate_file_ids.values().any(|&dup| dup == id) {
                                        *rename_increments.entry(id).or_insert(0) += 1;
                                        progress = true;
                                        break;
                                    }
                                }
                                if !progress {
                                    for &id in ids {
                                        if db.local_metadata.maybe_find(&id).is_some() {
                                            *rename_increments.entry(id).or_insert(0) += 1;
                                            progress = true;
                                            break;
                                        }
                                    }
                                }
                                if !progress {
                                    return Err(LbErrKind::Unexpected(format!(
                                        "sync failed to resolve path conflict: {:?}",
                                        ids
                                    ))
                                    .into());
                                }
                            }
                            ValidationFailure::SharedLink { link, shared_ancestor } => {
                                // unshare the ancestor or delete the link
                                let mut progress = false;
                                if let Some(base_shared_ancestor) = base.maybe_find(shared_ancestor)
                                {
                                    if !base_shared_ancestor.is_shared()
                                        && files_to_unshare.insert(*shared_ancestor)
                                    {
                                        progress = true;
                                    }
                                }
                                if !progress && links_to_delete.insert(*link) {
                                    progress = true;
                                }
                                if !progress {
                                    return Err(LbErrKind::Unexpected(format!(
                                        "sync failed to resolve shared link: link: {:?}, shared_ancestor: {:?}",
                                        link, shared_ancestor
                                    ))
                                    .into());
                                }
                            }
                            ValidationFailure::DuplicateLink { target } => {
                                // delete the local link to the target
                                let mut progress = false;
                                if let Some(link) = local.linked_by(target)? {
                                    if links_to_delete.insert(link) {
                                        progress = true;
                                    }
                                }
                                if !progress {
                                    return Err(LbErrKind::Unexpected(format!(
                                        "sync failed to resolve duplicate link: target: {:?}",
                                        target
                                    ))
                                    .into());
                                }
                            }
                            ValidationFailure::BrokenLink(link) => {
                                // delete the link whose target is missing
                                if !links_to_delete.insert(*link) {
                                    return Err(LbErrKind::Unexpected(format!(
                                        "sync failed to resolve broken link: {:?}",
                                        link
                                    ))
                                    .into());
                                }
                            }
                            ValidationFailure::OwnedLink(link) => {
                                // revert the target's move or delete the link
                                let mut progress = false;
                                if let Some(remote_link) = remote.maybe_find(link) {
                                    if let FileType::Link { target } = remote_link.file_type() {
                                        let remote_target = remote.find(&target)?;
                                        if remote_target.owner() != me
                                            && files_to_unmove.insert(target)
                                        {
                                            progress = true;
                                        }
                                    }
                                }
                                if !progress && links_to_delete.insert(*link) {
                                    progress = true;
                                }
                                if !progress {
                                    return Err(LbErrKind::Unexpected(format!(
                                        "sync failed to resolve owned link: {:?}",
                                        link
                                    ))
                                    .into());
                                }
                            }
                            // the remaining failures aren't resolvable during sync
                            ValidationFailure::Orphan(_)
                            | ValidationFailure::NonFolderWithChildren(_)
                            | ValidationFailure::FileWithDifferentOwnerParent(_)
                            | ValidationFailure::FileNameTooLong(_)
                            | ValidationFailure::DeletedFileUpdated(_)
                            | ValidationFailure::NonDecryptableFileName(_) => {
                                validate_result?;
                            }
                        },
                        _ => {
                            validate_result?;
                        }
                    },
                }
            }
        };

        // promote: base absorbs the remote changes; local becomes the merge result
        (&mut db.base_metadata)
            .to_staged(remote_changes.clone())
            .to_lazy()
            .promote()?;
        db.local_metadata.clear()?;
        (&mut db.local_metadata)
            .to_staged(merge_changes)
            .to_lazy()
            .promote()?;

        db.base_metadata.stage(&mut db.local_metadata).prune()?;

        if start.elapsed() > std::time::Duration::from_millis(100) {
            warn!("sync merge held lock for {:?}", start.elapsed());
        }

        Ok(())
    }

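    /// Pushes local metadata changes (with document hmacs held back for
    /// `push_docs`) to the server and promotes them into the base tree on
    /// success.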
    async fn push_meta(&self, ctx: &mut SyncContext) -> LbResult<()> {
        ctx.msg("Pushing tree changes...");
        let mut updates = vec![];
        let mut local_changes_no_digests = Vec::new();

        let tx = self.ro_tx().await;
        let db = tx.db();

        let local = db.base_metadata.stage(&db.local_metadata).to_lazy();

        for id in local.tree.staged.ids() {
            let mut local_change = local.tree.staged.find(&id)?.timestamped_value.value.clone();
            let maybe_base_file = local.tree.base.maybe_find(&id);

            // push everything except document changes; hmacs are pushed with documents
            local_change.document_hmac =
                maybe_base_file.and_then(|f| f.timestamped_value.value.document_hmac);
            let local_change = local_change.sign(&self.keychain)?;

            local_changes_no_digests.push(local_change.clone());
            let file_diff = FileDiff { old: maybe_base_file.cloned(), new: local_change };
            updates.push(file_diff);
        }

        drop(tx);

        if !updates.is_empty() {
            self.client
                .request(self.get_account()?, UpsertRequest { updates: updates.clone() })
                .await?;
            ctx.pushed_metas = updates;
        }

        let mut tx = self.begin_tx().await;
        let db = tx.db();

        (&mut db.base_metadata)
            .to_lazy()
            .stage(local_changes_no_digests)
            .promote()?;
        db.base_metadata.stage(&mut db.local_metadata).prune()?;

        tx.end();

        Ok(())
    }

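    /// Pushes locally changed document contents to the server concurrently,
    /// then promotes the corresponding hmac updates into the base tree.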
    async fn push_docs(&self, ctx: &mut SyncContext) -> LbResult<()> {
        ctx.msg("Pushing document changes...");
        let mut updates = vec![];
        let mut local_changes_digests_only = vec![];

        let tx = self.ro_tx().await;
        let db = tx.db();
        let start = Instant::now();

        let local = db.base_metadata.stage(&db.local_metadata).to_lazy();

        for id in local.tree.staged.ids() {
            let base_file = local.tree.base.find(&id)?.clone();

            // change only the document hmac and re-sign
            let mut local_change = base_file.timestamped_value.value.clone();
            local_change.document_hmac = local.find(&id)?.timestamped_value.value.document_hmac;

            if base_file.document_hmac() == local_change.document_hmac()
                || local_change.document_hmac.is_none()
            {
                continue;
            }

            let local_change = local_change.sign(&self.keychain)?;

            updates.push(FileDiff { old: Some(base_file), new: local_change.clone() });
            local_changes_digests_only.push(local_change);
        }

        drop(tx);
        if start.elapsed() > std::time::Duration::from_millis(100) {
            warn!("sync push_docs held lock for {:?}", start.elapsed());
        }

        let docs_count = updates.len();
        ctx.total += docs_count;
        let futures = updates.clone().into_iter().map(|diff| self.push_doc(diff));

        let mut stream = stream::iter(futures).buffer_unordered(
            thread::available_parallelism()
                .unwrap_or(NonZeroUsize::new(4).unwrap())
                .into(),
        );

        let mut idx = 0;
        while let Some(fut) = stream.next().await {
            let id = fut?;
            idx += 1;
            ctx.file_msg(id, &format!("Pushed file {idx} of {docs_count}."));
        }
        ctx.pushed_docs = updates;

        let mut tx = self.begin_tx().await;
        let db = tx.db();
        (&mut db.base_metadata)
            .to_lazy()
            .stage(local_changes_digests_only)
            .promote()?;

        db.base_metadata.stage(&mut db.local_metadata).prune()?;

        tx.end();

        Ok(())
    }

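    /// Uploads one document change along with the metadata diff that
    /// justifies it.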
    async fn push_doc(&self, diff: FileDiff<SignedFile>) -> LbResult<Uuid> {
        let id = *diff.new.id();
        let hmac = diff.new.document_hmac();
        let local_document_change = self.docs.get(id, hmac.copied()).await?;
        self.client
            .request(
                self.get_account()?,
                ChangeDocRequest { diff, new_content: local_document_change },
            )
            .await?;

        Ok(id)
    }

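    /// Drops fetched updates that are already reflected in the base tree,
    /// returning the surviving changes, the server timestamp they are current
    /// as of, and the root id if this client doesn't know it yet (first sync).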
    async fn dedup(
        &self, updates: GetUpdatesResponse,
    ) -> LbResult<(Vec<SignedFile>, u64, Option<Uuid>)> {
        let tx = self.ro_tx().await;
        let db = tx.db();

        let mut root_id = None;
        let (remote_changes, update_as_of) = {
            let mut remote_changes = updates.file_metadata;
            let update_as_of = updates.as_of_metadata_version;

            remote_changes = self.prune_remote_orphans(remote_changes).await?;

            let remote = db.base_metadata.stage(remote_changes).pruned()?.to_lazy();

            let (_, remote_changes) = remote.unstage();
            (remote_changes, update_as_of)
        };

        if db.root.get().is_none() {
            let root = remote_changes
                .all_files()?
                .into_iter()
                .find(|f| f.is_root())
                .ok_or(LbErrKind::RootNonexistent)?;
            root_id = Some(*root.id());
        }

        Ok((remote_changes, update_as_of, root_id))
    }

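    /// Filters out remote files whose parents are unknown to this client and
    /// which aren't shared with it directly.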
    async fn prune_remote_orphans(
        &self, remote_changes: Vec<SignedFile>,
    ) -> LbResult<Vec<SignedFile>> {
        let tx = self.ro_tx().await;
        let db = tx.db();

        let me = Owner(self.keychain.get_pk()?);
        let remote = db.base_metadata.stage(remote_changes).to_lazy();
        let mut result = Vec::new();

        for id in remote.tree.staged.ids() {
            let meta = remote.find(&id)?;
            if remote.maybe_find_parent(meta).is_some()
                || meta
                    .user_access_keys()
                    .iter()
                    .any(|k| k.encrypted_for == me.0)
            {
                result.push(remote.find(&id)?.clone());
            }
        }
        Ok(result)
    }

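    /// Persists the server timestamp this sync is current as of, and the root
    /// id if one was discovered.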
    async fn commit_last_synced(&self, ctx: &mut SyncContext) -> LbResult<()> {
        ctx.msg("Cleaning up...");
        let mut tx = self.begin_tx().await;
        let db = tx.db();
        db.last_synced.insert(ctx.update_as_of as i64)?;

        if let Some(root) = ctx.root {
            db.root.insert(root)?;
        }

        Ok(())
    }

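    /// Returns the time since the last successful sync as a human-readable
    /// string.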
    pub async fn get_last_synced_human(&self) -> LbResult<String> {
        let tx = self.ro_tx().await;
        let db = tx.db();
        let last_synced = db.last_synced.get().copied().unwrap_or(0);

        Ok(self.get_timestamp_human_string(last_synced))
    }

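    /// Formats a millisecond timestamp as a human-readable duration relative
    /// to now; a timestamp of zero reads as "never".
    ///
    /// For example, assuming an initialized `Lb` handle named `lb`:
    ///
    /// ```ignore
    /// assert_eq!(lb.get_timestamp_human_string(0), "never");
    /// ```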
    pub fn get_timestamp_human_string(&self, timestamp: i64) -> String {
        if timestamp != 0 {
            Duration::milliseconds(clock::get_time().0 - timestamp)
                .format_human()
                .to_string()
        } else {
            "never".to_string()
        }
    }
}

impl SyncContext {
    fn summarize(&self) -> SyncStatus {
        let mut local = HashSet::new();
        let mut server = HashSet::new();
        let mut work_units = vec![];

        for meta in &self.pushed_metas {
            local.insert(meta.new.id());
        }

        for meta in &self.pushed_docs {
            local.insert(meta.new.id());
        }

        for meta in &self.remote_changes {
            server.insert(meta.id());
        }

        for id in local {
            work_units.push(WorkUnit::LocalChange(*id));
        }

        for id in server {
            work_units.push(WorkUnit::ServerChange(*id));
        }

        SyncStatus { work_units, latest_server_ts: self.update_as_of }
    }

    fn msg(&mut self, msg: &str) {
        self.current += 1;
        if let Some(f) = &self.progress {
            f(SyncProgress {
                total: self.total,
                progress: self.current,
                file_being_processed: Default::default(),
                msg: msg.to_string(),
            })
        }
    }

    fn file_msg(&mut self, id: Uuid, msg: &str) {
        self.current += 1;
        if let Some(f) = &self.progress {
            f(SyncProgress {
                total: self.total,
                progress: self.current,
                file_being_processed: Some(id),
                msg: msg.to_string(),
            })
        }
    }

    fn done_msg(&mut self) {
        self.current = self.total;
        if let Some(f) = &self.progress {
            f(SyncProgress {
                total: self.total,
                progress: self.current,
                file_being_processed: None,
                msg: "Sync successful!".to_string(),
            })
        }
    }
}

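/// Summary of a sync (or of `calculate_work`): the affected files and the
/// server timestamp the client is current as of.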
#[derive(Debug, Serialize, Clone)]
pub struct SyncStatus {
    pub work_units: Vec<WorkUnit>,
    pub latest_server_ts: u64,
}

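/// A point-in-time progress report delivered to the callback passed to `sync`.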
#[derive(Clone)]
pub struct SyncProgress {
    pub total: usize,
    pub progress: usize,
    pub file_being_processed: Option<Uuid>,
    pub msg: String,
}

impl Display for SyncProgress {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "[{} / {}]: {}", self.progress, self.total, self.msg)
    }
}