1use crate::io::network::ApiError;
2use crate::model::access_info::UserAccessMode;
3use crate::model::api::{
4 ChangeDocRequest, GetDocRequest, GetFileIdsRequest, GetUpdatesRequest, GetUpdatesResponse,
5 GetUsernameError, GetUsernameRequest, UpsertRequest,
6};
7use crate::model::errors::{LbErrKind, LbResult};
8use crate::model::file::ShareMode;
9use crate::model::file_like::FileLike;
10use crate::model::file_metadata::{DocumentHmac, FileDiff, FileType, Owner};
11use crate::model::filename::{DocumentType, NameComponents};
12use crate::model::signed_file::SignedFile;
13use crate::model::staged::StagedTreeLikeMut;
14use crate::model::svg::buffer::u_transform_to_bezier;
15use crate::model::svg::element::Element;
16use crate::model::text::buffer::Buffer;
17use crate::model::tree_like::TreeLike;
18use crate::model::work_unit::WorkUnit;
19use crate::model::{clock, svg};
20use crate::model::{symkey, ValidationFailure};
21use crate::Lb;
22pub use basic_human_duration::ChronoHumanDuration;
23use futures::stream;
24use futures::StreamExt;
25use serde::Serialize;
26use std::collections::{hash_map, HashMap, HashSet};
27use std::fmt::{Display, Formatter};
28use std::num::NonZeroUsize;
29use std::sync::atomic::{AtomicBool, Ordering};
30use std::sync::Arc;
31use std::thread;
32use std::time::Instant;
33use time::Duration;
34use usvg::Transform;
35use uuid::Uuid;
36
37use super::events::Actor;
38
/// Shared flag indicating whether a sync is currently in flight.
pub type SyncFlag = Arc<AtomicBool>;
40
/// Mutable state threaded through a single sync run.
pub struct SyncContext {
    /// Optional caller-supplied progress callback.
    progress: Option<Box<dyn Fn(SyncProgress) + Send>>,
    /// Steps completed so far.
    current: usize,
    /// Total expected steps; grows as document transfers are discovered.
    total: usize,

    /// Cached owner -> username lookup, extended during this sync.
    pk_cache: HashMap<Owner, String>,
    /// Metadata version this client was current as of before this sync.
    last_synced: u64,
    /// Deduplicated metadata updates received from the server.
    remote_changes: Vec<SignedFile>,
    /// Server timestamp the fetched updates are current as of.
    update_as_of: u64,
    /// Root id discovered during this sync, if none was persisted yet.
    root: Option<Uuid>,
    /// Metadata diffs pushed to the server (used by `summarize`).
    pushed_metas: Vec<FileDiff<SignedFile>>,
    /// Document diffs pushed to the server (used by `summarize`).
    pushed_docs: Vec<FileDiff<SignedFile>>,
    /// Documents whose content was downloaded during this sync.
    pulled_docs: Vec<Uuid>,
}
55
56impl Lb {
57 #[instrument(level = "debug", skip_all, err(Debug))]
58 pub async fn calculate_work(&self) -> LbResult<SyncStatus> {
59 let tx = self.ro_tx().await;
60 let db = tx.db();
61 let last_synced = db.last_synced.get().copied().unwrap_or_default() as u64;
62 drop(tx);
63
64 let remote_changes = self
65 .client
66 .request(self.get_account()?, GetUpdatesRequest { since_metadata_version: last_synced })
67 .await?;
68 let (deduped, latest_server_ts, _) = self.dedup(remote_changes).await?;
69 let remote_dirty = deduped
70 .into_iter()
71 .map(|f| *f.id())
72 .map(WorkUnit::ServerChange);
73
74 self.prune().await?;
75
76 let tx = self.ro_tx().await;
77 let db = tx.db();
78
79 let locally_dirty = db
80 .local_metadata
81 .get()
82 .keys()
83 .copied()
84 .map(WorkUnit::LocalChange);
85
86 let mut work_units: Vec<WorkUnit> = Vec::new();
87 work_units.extend(locally_dirty.chain(remote_dirty));
88 Ok(SyncStatus { work_units, latest_server_ts })
89 }
90
91 #[instrument(level = "debug", skip_all, err(Debug))]
92 pub async fn sync(&self, f: Option<Box<dyn Fn(SyncProgress) + Send>>) -> LbResult<SyncStatus> {
93 let old = self.syncing.swap(true, Ordering::SeqCst);
94 if old {
95 return Err(LbErrKind::AlreadySyncing.into());
96 }
97
98 let mut ctx = self.setup_sync(f).await?;
99
100 let mut got_updates = false;
101 let mut pipeline: LbResult<()> = async {
102 ctx.msg("Preparing Sync..."); self.events.sync(SyncIncrement::SyncStarted);
104 self.prune().await?;
105 got_updates = self.fetch_meta(&mut ctx).await?;
106 self.populate_pk_cache(&mut ctx).await?;
107 self.docs.dont_delete.store(true, Ordering::SeqCst);
108 self.fetch_docs(&mut ctx).await?;
109 self.merge(&mut ctx).await?;
110 self.push_meta(&mut ctx).await?;
111 self.push_docs(&mut ctx).await?;
112 Ok(())
113 }
114 .await;
115
116 self.docs.dont_delete.store(false, Ordering::SeqCst);
117
118 if pipeline.is_ok() {
119 pipeline = self.commit_last_synced(&mut ctx).await;
120 }
121
122 let cleanup = self.cleanup().await;
123
124 let ekind = pipeline.as_ref().err().map(|err| err.kind.clone());
125 self.events.sync(SyncIncrement::SyncFinished(ekind));
126
127 self.syncing.store(false, Ordering::Relaxed);
128 pipeline?;
129 cleanup?;
130
131 ctx.done_msg();
133
134 if got_updates {
135 self.events.meta_changed();
137 for id in &ctx.pulled_docs {
138 self.events.doc_written(*id, Some(Actor::Sync));
139 }
140 }
141
142 Ok(ctx.summarize())
143 }
144
145 async fn setup_sync(
146 &self, progress: Option<Box<dyn Fn(SyncProgress) + Send>>,
147 ) -> LbResult<SyncContext> {
148 let tx = self.ro_tx().await;
149 let db = tx.db();
150
151 let last_synced = db.last_synced.get().copied().unwrap_or_default() as u64;
152 let pk_cache = db.pub_key_lookup.get().clone();
153
154 let current = 0;
155 let total = 7;
156
157 Ok(SyncContext {
158 last_synced,
159 pk_cache,
160
161 progress,
162 current,
163 total,
164
165 root: Default::default(),
166 update_as_of: Default::default(),
167 remote_changes: Default::default(),
168 pushed_docs: Default::default(),
169 pushed_metas: Default::default(),
170 pulled_docs: Default::default(),
171 })
172 }
173
174 async fn prune(&self) -> LbResult<()> {
175 let server_ids = self
176 .client
177 .request(self.get_account()?, GetFileIdsRequest {})
178 .await?
179 .ids;
180
181 let mut tx = self.begin_tx().await;
182 let db = tx.db();
183
184 let mut local = db.base_metadata.stage(&db.local_metadata).to_lazy();
185 let base_ids = local.tree.base.ids();
186
187 let mut prunable_ids = base_ids;
188 prunable_ids.retain(|id| !server_ids.contains(id));
189 for id in prunable_ids.clone() {
190 prunable_ids.extend(local.descendants(&id)?.into_iter());
191 }
192 for id in &prunable_ids {
193 if let Some(base_file) = local.tree.base.maybe_find(id) {
194 self.docs
195 .delete(*id, base_file.document_hmac().copied())
196 .await?;
197 }
198 if let Some(local_file) = local.maybe_find(id) {
199 self.docs
200 .delete(*id, local_file.document_hmac().copied())
201 .await?;
202 }
203 }
204
205 let mut base_staged = (&mut db.base_metadata).to_lazy().stage(None);
206 base_staged.tree.removed = prunable_ids.clone().into_iter().collect();
207 base_staged.promote()?;
208
209 let mut local_staged = (&mut db.local_metadata).to_lazy().stage(None);
210 local_staged.tree.removed = prunable_ids.into_iter().collect();
211 local_staged.promote()?;
212
213 Ok(())
214 }
215
216 async fn fetch_meta(&self, ctx: &mut SyncContext) -> LbResult<bool> {
218 ctx.msg("Fetching tree updates...");
219 let updates = self
220 .client
221 .request(
222 self.get_account()?,
223 GetUpdatesRequest { since_metadata_version: ctx.last_synced },
224 )
225 .await?;
226
227 let empty = updates.file_metadata.is_empty();
228 let (remote, as_of, root) = self.dedup(updates).await?;
229
230 ctx.remote_changes = remote;
231 ctx.update_as_of = as_of;
232 ctx.root = root;
233
234 Ok(!empty)
235 }
236
237 async fn populate_pk_cache(&self, ctx: &mut SyncContext) -> LbResult<()> {
238 ctx.msg("Updating public key cache...");
239 let mut all_owners = HashSet::new();
240 for file in &ctx.remote_changes {
241 for user_access_key in file.user_access_keys() {
242 all_owners.insert(Owner(user_access_key.encrypted_by));
243 all_owners.insert(Owner(user_access_key.encrypted_for));
244 }
245 }
246
247 let mut new_entries = HashMap::new();
248
249 for owner in all_owners {
250 if let hash_map::Entry::Vacant(e) = ctx.pk_cache.entry(owner) {
251 let username_result = self
252 .client
253 .request(self.get_account()?, GetUsernameRequest { key: owner.0 })
254 .await;
255 let username = match username_result {
256 Err(ApiError::Endpoint(GetUsernameError::UserNotFound)) => {
257 "<unknown>".to_string()
258 }
259 _ => username_result?.username.clone(),
260 };
261 new_entries.insert(owner, username.clone());
262 e.insert(username.clone());
263 }
264 }
265
266 let mut tx = self.begin_tx().await;
267 let db = tx.db();
268
269 for (owner, username) in new_entries {
270 db.pub_key_lookup.insert(owner, username)?;
271 }
272 Ok(())
273 }
274
275 async fn fetch_docs(&self, ctx: &mut SyncContext) -> LbResult<()> {
276 ctx.msg("Fetching documents...");
277 let mut docs_to_pull = vec![];
278
279 let tx = self.ro_tx().await;
280 let db = tx.db();
281 let start = Instant::now();
282
283 let mut remote = db.base_metadata.stage(ctx.remote_changes.clone()).to_lazy(); for id in remote.tree.staged.ids() {
285 if remote.calculate_deleted(&id)? {
286 continue;
287 }
288 let remote_hmac = remote.find(&id)?.document_hmac().cloned();
289 let base_hmac = remote
290 .tree
291 .base
292 .maybe_find(&id)
293 .and_then(|f| f.document_hmac())
294 .cloned();
295 if base_hmac == remote_hmac {
296 continue;
297 }
298
299 if let Some(remote_hmac) = remote_hmac {
300 docs_to_pull.push((id, remote_hmac));
301 self.events.sync(SyncIncrement::PullingDocument(id, true));
302 }
303 }
304
305 drop(tx);
306 if start.elapsed() > std::time::Duration::from_millis(100) {
307 warn!("sync fetch_docs held lock for {:?}", start.elapsed());
308 }
309
310 let num_docs = docs_to_pull.len();
311 ctx.total += num_docs;
312
313 let futures = docs_to_pull
314 .into_iter()
315 .map(|(id, hmac)| self.fetch_doc(id, hmac));
316
317 let mut stream = stream::iter(futures).buffer_unordered(
318 thread::available_parallelism()
319 .unwrap_or(NonZeroUsize::new(4).unwrap())
320 .into(),
321 );
322
323 let mut idx = 0;
324 while let Some(fut) = stream.next().await {
325 let id = fut?;
326 ctx.pulled_docs.push(id);
327 self.events.sync(SyncIncrement::PullingDocument(id, false));
328 ctx.file_msg(id, &format!("Downloaded file {idx} of {num_docs}."));
329 idx += 1;
330 }
331 Ok(())
332 }
333
334 async fn fetch_doc(&self, id: Uuid, hmac: DocumentHmac) -> LbResult<Uuid> {
335 let remote_document = self
336 .client
337 .request(self.get_account()?, GetDocRequest { id, hmac })
338 .await?;
339 self.docs
340 .insert(id, Some(hmac), &remote_document.content)
341 .await?;
342
343 Ok(id)
344 }
345
346 async fn merge(&self, ctx: &mut SyncContext) -> LbResult<()> {
349 let mut tx = self.begin_tx().await;
350 let db = tx.db();
351 let start = Instant::now();
352
353 let remote_changes = &ctx.remote_changes;
354
355 let me = Owner(self.keychain.get_pk()?);
357
358 let merge_changes = {
360 let mut base = (&db.base_metadata).to_lazy();
362 let remote_unlazy = (&db.base_metadata).to_staged(remote_changes);
363 let mut remote = remote_unlazy.as_lazy();
364 let mut local = (&db.base_metadata).to_staged(&db.local_metadata).to_lazy();
365
366 let mut files_to_unmove: HashSet<Uuid> = HashSet::new();
368 let mut files_to_unshare: HashSet<Uuid> = HashSet::new();
369 let mut links_to_delete: HashSet<Uuid> = HashSet::new();
370 let mut rename_increments: HashMap<Uuid, usize> = HashMap::new();
371 let mut duplicate_file_ids: HashMap<Uuid, Uuid> = HashMap::new();
372
373 'merge_construction: loop {
374 let mut deletions = {
376 let mut deletions = remote_unlazy.stage(Vec::new()).to_lazy();
377
378 let mut deletion_creations = HashSet::new();
380 for id in db.local_metadata.ids() {
381 if remote.maybe_find(&id).is_none() && !links_to_delete.contains(&id) {
382 deletion_creations.insert(id);
383 }
384 }
385 'drain_creations: while !deletion_creations.is_empty() {
386 'choose_a_creation: for id in &deletion_creations {
387 let id = *id;
389 let local_file = local.find(&id)?.clone();
390 let result = deletions.create_unvalidated(
391 id,
392 symkey::generate_key(),
393 local_file.parent(),
394 &local.name(&id, &self.keychain)?,
395 local_file.file_type(),
396 &self.keychain,
397 );
398 match result {
399 Ok(_) => {
400 deletion_creations.remove(&id);
401 continue 'drain_creations;
402 }
403 Err(ref err) => match err.kind {
404 LbErrKind::FileParentNonexistent => {
405 continue 'choose_a_creation;
406 }
407 _ => {
408 result?;
409 }
410 },
411 }
412 }
413 return Err(LbErrKind::Unexpected(format!(
414 "sync failed to find a topomodelal order for file creations: {:?}",
415 deletion_creations
416 ))
417 .into());
418 }
419
420 for id in db.local_metadata.ids() {
422 let local_file = local.find(&id)?.clone();
423 if let Some(base_file) = db.base_metadata.maybe_find(&id).cloned() {
424 if !local_file.explicitly_deleted()
425 && local_file.parent() != base_file.parent()
426 && !files_to_unmove.contains(&id)
427 {
428 deletions.move_unvalidated(
430 &id,
431 local_file.parent(),
432 &self.keychain,
433 )?;
434 }
435 }
436 }
437
438 for id in db.local_metadata.ids() {
440 let local_file = local.find(&id)?.clone();
441 if local_file.explicitly_deleted() {
442 deletions.delete_unvalidated(&id, &self.keychain)?;
444 }
445 }
446 deletions
447 };
448
449 let mut merge = {
451 let mut merge = remote_unlazy.stage(Vec::new()).to_lazy();
452
453 let mut creations = HashSet::new();
455 for id in db.local_metadata.ids() {
456 if deletions.maybe_find(&id).is_some()
457 && !deletions.calculate_deleted(&id)?
458 && remote.maybe_find(&id).is_none()
459 && !links_to_delete.contains(&id)
460 {
461 creations.insert(id);
462 }
463 }
464 'drain_creations: while !creations.is_empty() {
465 'choose_a_creation: for id in &creations {
466 let id = *id;
468 let local_file = local.find(&id)?.clone();
469 let result = merge.create_unvalidated(
470 id,
471 local.decrypt_key(&id, &self.keychain)?,
472 local_file.parent(),
473 &local.name(&id, &self.keychain)?,
474 local_file.file_type(),
475 &self.keychain,
476 );
477 match result {
478 Ok(_) => {
479 creations.remove(&id);
480 continue 'drain_creations;
481 }
482 Err(ref err) => match err.kind {
483 LbErrKind::FileParentNonexistent => {
484 continue 'choose_a_creation;
485 }
486 _ => {
487 result?;
488 }
489 },
490 }
491 }
492 return Err(LbErrKind::Unexpected(format!(
493 "sync failed to find a topomodelal order for file creations: {:?}",
494 creations
495 ))
496 .into());
497 }
498
499 for id in db.local_metadata.ids() {
502 if deletions.maybe_find(&id).is_none()
504 || deletions.calculate_deleted(&id)?
505 || (remote.maybe_find(&id).is_some()
506 && remote.calculate_deleted(&id)?)
507 {
508 continue;
509 }
510
511 let local_file = local.find(&id)?.clone();
512 let local_name = local.name(&id, &self.keychain)?;
513 let maybe_base_file = base.maybe_find(&id).cloned();
514 let maybe_remote_file = remote.maybe_find(&id).cloned();
515 if let Some(ref base_file) = maybe_base_file {
516 let base_name = base.name(&id, &self.keychain)?;
517 let remote_file = remote.find(&id)?.clone();
518 let remote_name = remote.name(&id, &self.keychain)?;
519
520 if local_file.parent() != base_file.parent()
522 && remote_file.parent() == base_file.parent()
523 && !files_to_unmove.contains(&id)
524 {
525 merge.move_unvalidated(&id, local_file.parent(), &self.keychain)?;
526 }
527
528 if local_name != base_name && remote_name == base_name {
530 merge.rename_unvalidated(&id, &local_name, &self.keychain)?;
531 }
532 }
533
534 let mut remote_keys = HashMap::new();
536 if let Some(ref remote_file) = maybe_remote_file {
537 for key in remote_file.user_access_keys() {
538 remote_keys.insert(
539 (Owner(key.encrypted_by), Owner(key.encrypted_for)),
540 (key.mode, key.deleted),
541 );
542 }
543 }
544 for key in local_file.user_access_keys() {
545 let (by, for_) = (Owner(key.encrypted_by), Owner(key.encrypted_for));
546 if let Some(&(remote_mode, remote_deleted)) =
547 remote_keys.get(&(by, for_))
548 {
549 if key.mode > remote_mode || !key.deleted && remote_deleted {
551 let mode = match key.mode {
552 UserAccessMode::Read => ShareMode::Read,
553 UserAccessMode::Write => ShareMode::Write,
554 UserAccessMode::Owner => continue,
555 };
556 merge.add_share_unvalidated(id, for_, mode, &self.keychain)?;
557 }
558 if key.deleted && !remote_deleted {
560 merge.delete_share_unvalidated(
561 &id,
562 Some(for_.0),
563 &self.keychain,
564 )?;
565 }
566 } else {
567 let mode = match key.mode {
569 UserAccessMode::Read => ShareMode::Read,
570 UserAccessMode::Write => ShareMode::Write,
571 UserAccessMode::Owner => continue,
572 };
573 merge.add_share_unvalidated(id, for_, mode, &self.keychain)?;
574 }
575 }
576
577 if files_to_unshare.contains(&id) {
579 merge.delete_share_unvalidated(&id, None, &self.keychain)?;
580 }
581
582 if let Some(&rename_increment) = rename_increments.get(&id) {
584 let name = NameComponents::from(&local_name)
585 .generate_incremented(rename_increment)
586 .to_name();
587 merge.rename_unvalidated(&id, &name, &self.keychain)?;
588 }
589
590 let base_hmac = maybe_base_file.and_then(|f| f.document_hmac().cloned());
592 let remote_hmac =
593 maybe_remote_file.and_then(|f| f.document_hmac().cloned());
594 let local_hmac = local_file.document_hmac().cloned();
595 if merge.access_mode(me, &id)? >= Some(UserAccessMode::Write)
596 && local_hmac != base_hmac
597 {
598 if remote_hmac != base_hmac && remote_hmac != local_hmac {
599 let merge_name = merge.name(&id, &self.keychain)?;
601 let document_type =
602 DocumentType::from_file_name_using_extension(&merge_name);
603
604 let base_document =
607 self.read_document_helper(id, &mut base).await?;
608 let remote_document =
609 self.read_document_helper(id, &mut remote).await?;
610 let local_document =
611 self.read_document_helper(id, &mut local).await?;
612
613 match document_type {
614 DocumentType::Text => {
615 let base_document =
618 String::from_utf8_lossy(&base_document).to_string();
619 let remote_document =
620 String::from_utf8_lossy(&remote_document).to_string();
621 let local_document =
622 String::from_utf8_lossy(&local_document).to_string();
623 let merged_document = Buffer::from(base_document.as_str())
624 .merge(local_document, remote_document);
625 let encrypted_document = merge
626 .update_document_unvalidated(
627 &id,
628 &merged_document.into_bytes(),
629 &self.keychain,
630 )?;
631 let hmac = merge.find(&id)?.document_hmac().copied();
632 self.docs.insert(id, hmac, &encrypted_document).await?;
633 }
634 DocumentType::Drawing => {
635 let base_document =
636 String::from_utf8_lossy(&base_document).to_string();
637 let remote_document =
638 String::from_utf8_lossy(&remote_document).to_string();
639 let local_document =
640 String::from_utf8_lossy(&local_document).to_string();
641
642 let base_buffer = svg::buffer::Buffer::new(&base_document);
643 let remote_buffer =
644 svg::buffer::Buffer::new(&remote_document);
645 let mut local_buffer =
646 svg::buffer::Buffer::new(&local_document);
647
648 for (_, el) in local_buffer.elements.iter_mut() {
649 if let Element::Path(path) = el {
650 path.data.apply_transform(u_transform_to_bezier(
651 &Transform::from(
652 local_buffer
653 .weak_viewport_settings
654 .master_transform,
655 ),
656 ));
657 }
658 }
659 svg::buffer::Buffer::reload(
660 &mut local_buffer.elements,
661 &mut local_buffer.weak_images,
662 &mut local_buffer.weak_path_pressures,
663 &mut local_buffer.weak_viewport_settings,
664 &base_buffer,
665 &remote_buffer,
666 );
667
668 let merged_document = local_buffer.serialize();
669 let encrypted_document = merge
670 .update_document_unvalidated(
671 &id,
672 &merged_document.into_bytes(),
673 &self.keychain,
674 )?;
675 let hmac = merge.find(&id)?.document_hmac().copied();
676 self.docs.insert(id, hmac, &encrypted_document).await?;
677 }
678 DocumentType::Other => {
679 let merge_parent = *merge.find(&id)?.parent();
681 let duplicate_id = if let Some(&duplicate_id) =
682 duplicate_file_ids.get(&id)
683 {
684 duplicate_id
685 } else {
686 let duplicate_id = Uuid::new_v4();
687 duplicate_file_ids.insert(id, duplicate_id);
688 rename_increments.insert(duplicate_id, 1);
689 duplicate_id
690 };
691
692 let mut merge_name = merge_name;
693 merge_name = NameComponents::from(&merge_name)
694 .generate_incremented(
695 rename_increments
696 .get(&duplicate_id)
697 .copied()
698 .unwrap_or_default(),
699 )
700 .to_name();
701
702 merge.create_unvalidated(
703 duplicate_id,
704 symkey::generate_key(),
705 &merge_parent,
706 &merge_name,
707 FileType::Document,
708 &self.keychain,
709 )?;
710 let encrypted_document = merge
711 .update_document_unvalidated(
712 &duplicate_id,
713 &local_document,
714 &self.keychain,
715 )?;
716 let duplicate_hmac =
717 merge.find(&duplicate_id)?.document_hmac().copied();
718 self.docs
719 .insert(
720 duplicate_id,
721 duplicate_hmac,
722 &encrypted_document,
723 )
724 .await?;
725 }
726 }
727 } else {
728 let document = self.read_document_helper(id, &mut local).await?;
730 merge.update_document_unvalidated(
731 &id,
732 &document,
733 &self.keychain,
734 )?;
735 }
736 }
737 }
738
739 for id in db.local_metadata.ids() {
742 if db.base_metadata.maybe_find(&id).is_some()
743 && deletions.calculate_deleted(&id)?
744 && !merge.calculate_deleted(&id)?
745 {
746 merge.delete_unvalidated(&id, &self.keychain)?;
748 }
749 }
750 for &id in &links_to_delete {
751 if merge.maybe_find(&id).is_some() && !merge.calculate_deleted(&id)? {
753 merge.delete_unvalidated(&id, &self.keychain)?;
754 }
755 }
756
757 merge
758 };
759
760 for link in merge.ids() {
762 if !merge.calculate_deleted(&link)? {
763 if let FileType::Link { target } = merge.find(&link)?.file_type() {
764 if merge.maybe_find(&target).is_some()
765 && merge.calculate_deleted(&target)?
766 {
767 if links_to_delete.insert(link) {
769 continue 'merge_construction;
770 } else {
771 return Err(LbErrKind::Unexpected(format!(
772 "sync failed to resolve broken link (deletion): {:?}",
773 link
774 ))
775 .into());
776 }
777 }
778 }
779 }
780 }
781
782 let validate_result = merge.validate(me);
783 match validate_result {
784 Ok(_) => {
786 let (_, merge_changes) = merge.unstage();
787 break merge_changes;
788 }
789 Err(ref err) => match err.kind {
790 LbErrKind::Validation(ref vf) => match vf {
791 ValidationFailure::Cycle(ids) => {
793 let mut progress = false;
795 for &id in ids {
796 if db.local_metadata.maybe_find(&id).is_some()
797 && files_to_unmove.insert(id)
798 {
799 progress = true;
800 }
801 }
802 if !progress {
803 return Err(LbErrKind::Unexpected(format!(
804 "sync failed to resolve cycle: {:?}",
805 ids
806 ))
807 .into());
808 }
809 }
810 ValidationFailure::PathConflict(ids) => {
811 let mut progress = false;
813 for &id in ids {
814 if duplicate_file_ids.values().any(|&dup| dup == id) {
815 *rename_increments.entry(id).or_insert(0) += 1;
816 progress = true;
817 break;
818 }
819 }
820 if !progress {
821 for &id in ids {
822 if db.local_metadata.maybe_find(&id).is_some() {
823 *rename_increments.entry(id).or_insert(0) += 1;
824 progress = true;
825 break;
826 }
827 }
828 }
829 if !progress {
830 return Err(LbErrKind::Unexpected(format!(
831 "sync failed to resolve path conflict: {:?}",
832 ids
833 ))
834 .into());
835 }
836 }
837 ValidationFailure::SharedLink { link, shared_ancestor } => {
838 let mut progress = false;
840 if let Some(base_shared_ancestor) = base.maybe_find(shared_ancestor)
841 {
842 if !base_shared_ancestor.is_shared()
843 && files_to_unshare.insert(*shared_ancestor)
844 {
845 progress = true;
846 }
847 }
848 if !progress && links_to_delete.insert(*link) {
849 progress = true;
850 }
851 if !progress {
852 return Err(LbErrKind::Unexpected(format!(
853 "sync failed to resolve shared link: link: {:?}, shared_ancestor: {:?}",
854 link, shared_ancestor
855 )).into());
856 }
857 }
858 ValidationFailure::DuplicateLink { target } => {
859 let mut progress = false;
861 if let Some(link) = local.linked_by(target)? {
862 if links_to_delete.insert(link) {
863 progress = true;
864 }
865 }
866 if !progress {
867 return Err(LbErrKind::Unexpected(format!(
868 "sync failed to resolve duplicate link: target: {:?}",
869 target
870 ))
871 .into());
872 }
873 }
874 ValidationFailure::BrokenLink(link) => {
875 if !links_to_delete.insert(*link) {
877 return Err(LbErrKind::Unexpected(format!(
878 "sync failed to resolve broken link: {:?}",
879 link
880 ))
881 .into());
882 }
883 }
884 ValidationFailure::OwnedLink(link) => {
885 let mut progress = false;
887 if let Some(remote_link) = remote.maybe_find(link) {
888 if let FileType::Link { target } = remote_link.file_type() {
889 let remote_target = remote.find(&target)?;
890 if remote_target.owner() != me
891 && files_to_unmove.insert(target)
892 {
893 progress = true;
894 }
895 }
896 }
897 if !progress && links_to_delete.insert(*link) {
898 progress = true;
899 }
900 if !progress {
901 return Err(LbErrKind::Unexpected(format!(
902 "sync failed to resolve owned link: {:?}",
903 link
904 ))
905 .into());
906 }
907 }
908 ValidationFailure::Orphan(_)
910 | ValidationFailure::NonFolderWithChildren(_)
911 | ValidationFailure::FileWithDifferentOwnerParent(_)
912 | ValidationFailure::FileNameTooLong(_)
913 | ValidationFailure::DeletedFileUpdated(_)
914 | ValidationFailure::NonDecryptableFileName(_) => {
915 validate_result?;
916 }
917 },
918 _ => {
920 validate_result?;
921 }
922 },
923 }
924 }
925 };
926
927 (&mut db.base_metadata)
929 .to_staged(remote_changes.clone())
930 .to_lazy()
931 .promote()?;
932 db.local_metadata.clear()?;
933 (&mut db.local_metadata)
934 .to_staged(merge_changes)
935 .to_lazy()
936 .promote()?;
937
938 db.base_metadata.stage(&mut db.local_metadata).prune()?;
941
942 if start.elapsed() > std::time::Duration::from_millis(100) {
943 warn!("sync merge held lock for {:?}", start.elapsed());
944 }
945
946 Ok(())
947 }
948
949 async fn push_meta(&self, ctx: &mut SyncContext) -> LbResult<()> {
951 ctx.msg("Pushing tree changes...");
952 let mut updates = vec![];
953 let mut local_changes_no_digests = Vec::new();
954
955 let tx = self.ro_tx().await;
956 let db = tx.db();
957
958 let local = db.base_metadata.stage(&db.local_metadata).to_lazy();
960
961 for id in local.tree.staged.ids() {
962 let mut local_change = local.tree.staged.find(&id)?.timestamped_value.value.clone();
963 let maybe_base_file = local.tree.base.maybe_find(&id);
964
965 local_change.document_hmac =
967 maybe_base_file.and_then(|f| f.timestamped_value.value.document_hmac);
968 let local_change = local_change.sign(&self.keychain)?;
969
970 local_changes_no_digests.push(local_change.clone());
971 let file_diff = FileDiff { old: maybe_base_file.cloned(), new: local_change };
972 updates.push(file_diff);
973 }
974
975 drop(tx);
976
977 if !updates.is_empty() {
978 self.client
979 .request(self.get_account()?, UpsertRequest { updates: updates.clone() })
980 .await?;
981 ctx.pushed_metas = updates;
982 }
983
984 let mut tx = self.begin_tx().await;
985 let db = tx.db();
986
987 (&mut db.base_metadata)
989 .to_lazy()
990 .stage(local_changes_no_digests)
991 .promote()?;
992 db.base_metadata.stage(&mut db.local_metadata).prune()?;
993
994 tx.end();
995
996 Ok(())
997 }
998
999 async fn push_docs(&self, ctx: &mut SyncContext) -> LbResult<()> {
1001 ctx.msg("Pushing document changes...");
1002 let mut updates = vec![];
1003 let mut local_changes_digests_only = vec![];
1004
1005 let tx = self.ro_tx().await;
1006 let db = tx.db();
1007 let start = Instant::now();
1008
1009 let local = db.base_metadata.stage(&db.local_metadata).to_lazy();
1010
1011 for id in local.tree.staged.ids() {
1012 let base_file = local.tree.base.find(&id)?.clone();
1013
1014 let mut local_change = base_file.timestamped_value.value.clone();
1016 local_change.document_hmac = local.find(&id)?.timestamped_value.value.document_hmac;
1017
1018 if base_file.document_hmac() == local_change.document_hmac()
1019 || local_change.document_hmac.is_none()
1020 {
1021 continue;
1022 }
1023
1024 let local_change = local_change.sign(&self.keychain)?;
1025
1026 updates.push(FileDiff { old: Some(base_file), new: local_change.clone() });
1027 local_changes_digests_only.push(local_change);
1028 self.events.sync(SyncIncrement::PushingDocument(id, true));
1029 }
1030
1031 drop(tx);
1032 if start.elapsed() > std::time::Duration::from_millis(100) {
1033 warn!("sync push_docs held lock for {:?}", start.elapsed());
1034 }
1035
1036 let docs_count = updates.len();
1037 ctx.total += docs_count;
1038 let futures = updates.clone().into_iter().map(|diff| self.push_doc(diff));
1039
1040 let mut stream = stream::iter(futures).buffer_unordered(
1041 thread::available_parallelism()
1042 .unwrap_or(NonZeroUsize::new(4).unwrap())
1043 .into(),
1044 );
1045
1046 let mut idx = 0;
1047 while let Some(fut) = stream.next().await {
1048 let id = fut?;
1049 self.events.sync(SyncIncrement::PushingDocument(id, false));
1050 ctx.file_msg(id, &format!("Pushed file {idx} of {docs_count}."));
1051 idx += 1;
1052 }
1053 ctx.pushed_docs = updates;
1054
1055 let mut tx = self.begin_tx().await;
1056 let db = tx.db();
1057 (&mut db.base_metadata)
1059 .to_lazy()
1060 .stage(local_changes_digests_only)
1061 .promote()?;
1062
1063 db.base_metadata.stage(&mut db.local_metadata).prune()?;
1064
1065 tx.end();
1066
1067 Ok(())
1068 }
1069
1070 async fn push_doc(&self, diff: FileDiff<SignedFile>) -> LbResult<Uuid> {
1071 let id = *diff.new.id();
1072 let hmac = diff.new.document_hmac();
1073 let local_document_change = self.docs.get(id, hmac.copied()).await?;
1074 self.client
1075 .request(
1076 self.get_account()?,
1077 ChangeDocRequest { diff, new_content: local_document_change },
1078 )
1079 .await?;
1080
1081 Ok(id)
1082 }
1083
1084 async fn dedup(
1085 &self, updates: GetUpdatesResponse,
1086 ) -> LbResult<(Vec<SignedFile>, u64, Option<Uuid>)> {
1087 let tx = self.ro_tx().await;
1088 let db = tx.db();
1089
1090 let mut root_id = None;
1091 let (remote_changes, update_as_of) = {
1092 let mut remote_changes = updates.file_metadata;
1093 let update_as_of = updates.as_of_metadata_version;
1094
1095 remote_changes = self.prune_remote_orphans(remote_changes).await?;
1096
1097 let remote = db.base_metadata.stage(remote_changes).pruned()?.to_lazy();
1098
1099 let (_, remote_changes) = remote.unstage();
1100 (remote_changes, update_as_of)
1101 };
1102
1103 if db.root.get().is_none() {
1105 let root = remote_changes
1106 .all_files()?
1107 .into_iter()
1108 .find(|f| f.is_root())
1109 .ok_or(LbErrKind::RootNonexistent)?;
1110 root_id = Some(*root.id());
1111 }
1112
1113 Ok((remote_changes, update_as_of, root_id))
1114 }
1115
1116 async fn prune_remote_orphans(
1117 &self, remote_changes: Vec<SignedFile>,
1118 ) -> LbResult<Vec<SignedFile>> {
1119 let tx = self.ro_tx().await;
1120 let db = tx.db();
1121
1122 let me = Owner(self.keychain.get_pk()?);
1123 let remote = db.base_metadata.stage(remote_changes).to_lazy();
1124 let mut result = Vec::new();
1125
1126 for id in remote.tree.staged.ids() {
1127 let meta = remote.find(&id)?;
1128 if remote.maybe_find_parent(meta).is_some()
1129 || meta
1130 .user_access_keys()
1131 .iter()
1132 .any(|k| k.encrypted_for == me.0)
1133 {
1134 result.push(remote.find(&id)?.clone()); }
1136 }
1137 Ok(result)
1138 }
1139
1140 async fn commit_last_synced(&self, ctx: &mut SyncContext) -> LbResult<()> {
1141 ctx.msg("Cleaning up...");
1142 let mut tx = self.begin_tx().await;
1143 let db = tx.db();
1144 db.last_synced.insert(ctx.update_as_of as i64)?;
1145
1146 if let Some(root) = ctx.root {
1147 db.root.insert(root)?;
1148 }
1149
1150 Ok(())
1151 }
1152
1153 pub async fn get_last_synced_human(&self) -> LbResult<String> {
1154 let tx = self.ro_tx().await;
1155 let db = tx.db();
1156 let last_synced = db.last_synced.get().copied().unwrap_or(0);
1157
1158 Ok(self.get_timestamp_human_string(last_synced))
1159 }
1160
1161 pub fn get_timestamp_human_string(&self, timestamp: i64) -> String {
1162 if timestamp != 0 {
1163 Duration::milliseconds(clock::get_time().0 - timestamp)
1164 .format_human()
1165 .to_string()
1166 } else {
1167 "never".to_string()
1168 }
1169 }
1170}
1171
1172impl SyncContext {
1173 fn summarize(&self) -> SyncStatus {
1174 let mut local = HashSet::new();
1175 let mut server = HashSet::new();
1176 let mut work_units = vec![];
1177
1178 for meta in &self.pushed_metas {
1179 local.insert(meta.new.id());
1180 }
1181
1182 for meta in &self.pushed_docs {
1183 local.insert(meta.new.id());
1184 }
1185
1186 for meta in &self.remote_changes {
1187 server.insert(meta.id());
1188 }
1189
1190 for id in local {
1191 work_units.push(WorkUnit::LocalChange(*id));
1192 }
1193
1194 for id in server {
1195 work_units.push(WorkUnit::ServerChange(*id));
1196 }
1197
1198 SyncStatus { work_units, latest_server_ts: self.update_as_of }
1199 }
1200
1201 fn msg(&mut self, msg: &str) {
1202 self.current += 1;
1203 if let Some(f) = &self.progress {
1204 f(SyncProgress {
1205 total: self.total,
1206 progress: self.current,
1207 file_being_processed: Default::default(),
1208 msg: msg.to_string(),
1209 })
1210 }
1211 }
1212
1213 fn file_msg(&mut self, id: Uuid, msg: &str) {
1214 self.current += 1;
1215 if let Some(f) = &self.progress {
1216 f(SyncProgress {
1217 total: self.total,
1218 progress: self.current,
1219 file_being_processed: Some(id),
1220 msg: msg.to_string(),
1221 })
1222 }
1223 }
1224
1225 fn done_msg(&mut self) {
1226 self.current = self.total;
1227 if let Some(f) = &self.progress {
1228 f(SyncProgress {
1229 total: self.total,
1230 progress: self.current,
1231 file_being_processed: None,
1232 msg: "Sync successful!".to_string(),
1233 })
1234 }
1235 }
1236}
1237
/// Outcome of a sync (or of `calculate_work`): the set of files that changed
/// and the server timestamp the client is current as of.
#[derive(Debug, Serialize, Clone)]
pub struct SyncStatus {
    pub work_units: Vec<WorkUnit>,
    pub latest_server_ts: u64,
}
1243
/// A point-in-time progress report delivered to the sync progress callback.
#[derive(Clone)]
pub struct SyncProgress {
    /// Total number of steps expected for this sync.
    pub total: usize,
    /// Steps completed so far.
    pub progress: usize,
    /// The document being transferred, when the step concerns a single file.
    pub file_being_processed: Option<Uuid>,
    /// Human-readable description of the current step.
    pub msg: String,
}
1251
1252impl Display for SyncProgress {
1253 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1254 write!(f, "[{} / {}]: {}", self.progress, self.total, self.msg)
1255 }
1256}
1257
/// Fine-grained sync lifecycle events emitted through the event bus.
#[derive(Debug, Clone)]
pub enum SyncIncrement {
    /// A sync run has begun.
    SyncStarted,
    /// A document download started (`true`) or finished (`false`).
    PullingDocument(Uuid, bool),
    /// A document upload started (`true`) or finished (`false`).
    PushingDocument(Uuid, bool),
    /// The sync finished; carries the error kind on failure, `None` on success.
    SyncFinished(Option<LbErrKind>),
}