1use crate::Lb;
2use crate::io::network::ApiError;
3use crate::model::access_info::UserAccessMode;
4use crate::model::api::{
5 ChangeDocRequestV2, GetDocRequest, GetFileIdsRequest, GetUpdatesRequestV2,
6 GetUpdatesResponseV2, GetUsernameError, GetUsernameRequest, UpsertRequestV2,
7};
8use crate::model::errors::{LbErrKind, LbResult};
9use crate::model::file::ShareMode;
10use crate::model::file_like::FileLike;
11use crate::model::file_metadata::{DocumentHmac, FileDiff, FileType, Owner};
12use crate::model::filename::{DocumentType, NameComponents};
13use crate::model::signed_meta::SignedMeta;
14use crate::model::staged::StagedTreeLikeMut;
15use crate::model::svg::buffer::u_transform_to_bezier;
16use crate::model::svg::element::Element;
17use crate::model::text::buffer::Buffer;
18use crate::model::tree_like::TreeLike;
19use crate::model::work_unit::WorkUnit;
20use crate::model::{ValidationFailure, clock, svg, symkey};
21pub use basic_human_duration::ChronoHumanDuration;
22use futures::{StreamExt, stream};
23use serde::Serialize;
24use std::collections::{HashMap, HashSet, hash_map};
25use std::fmt::{Display, Formatter};
26use std::num::NonZeroUsize;
27use std::sync::Arc;
28use std::sync::atomic::{AtomicBool, Ordering};
29use std::thread;
30use std::time::Instant;
31use time::Duration;
32use usvg::Transform;
33use uuid::Uuid;
34
35use super::events::Actor;
36
/// Shared flag used to publish/observe whether a sync is currently in flight.
pub type SyncFlag = Arc<AtomicBool>;

/// Mutable state threaded through a single sync run.
pub struct SyncContext {
    // Optional progress callback plus the step counters reported through it.
    progress: Option<Box<dyn Fn(SyncProgress) + Send>>,
    current: usize,
    total: usize,

    // Owner -> username cache, seeded from the db and extended during the run.
    pk_cache: HashMap<Owner, String>,
    // Server metadata version we had synced to when the run started.
    last_synced: u64,
    // Deduplicated metadata updates fetched from the server this run.
    remote_changes: Vec<SignedMeta>,
    // Server version reported by the update fetch; persisted on success.
    update_as_of: u64,
    // Root id discovered on first sync (None when already known).
    root: Option<Uuid>,
    // Records of what was pushed/pulled, for the final summary and events.
    pushed_metas: Vec<FileDiff<SignedMeta>>,
    pushed_docs: Vec<FileDiff<SignedMeta>>,
    pulled_docs: Vec<Uuid>,
}
53
54impl Lb {
55 #[instrument(level = "debug", skip_all, err(Debug))]
56 pub async fn calculate_work(&self) -> LbResult<SyncStatus> {
57 let tx = self.ro_tx().await;
58 let db = tx.db();
59 let last_synced = db.last_synced.get().copied().unwrap_or_default() as u64;
60 drop(tx);
61
62 let remote_changes = self
63 .client
64 .request(
65 self.get_account()?,
66 GetUpdatesRequestV2 { since_metadata_version: last_synced },
67 )
68 .await?;
69 let (deduped, latest_server_ts, _) = self.dedup(remote_changes).await?;
70 let remote_dirty = deduped
71 .into_iter()
72 .map(|f| *f.id())
73 .map(WorkUnit::ServerChange);
74
75 self.prune().await?;
76
77 let tx = self.ro_tx().await;
78 let db = tx.db();
79
80 let locally_dirty = db
81 .local_metadata
82 .get()
83 .keys()
84 .copied()
85 .map(WorkUnit::LocalChange);
86
87 let mut work_units: Vec<WorkUnit> = Vec::new();
88 work_units.extend(locally_dirty.chain(remote_dirty));
89 Ok(SyncStatus { work_units, latest_server_ts })
90 }
91
92 #[instrument(level = "debug", skip_all, err(Debug))]
93 pub async fn sync(&self, f: Option<Box<dyn Fn(SyncProgress) + Send>>) -> LbResult<SyncStatus> {
94 let old = self.syncing.swap(true, Ordering::SeqCst);
95 if old {
96 return Err(LbErrKind::AlreadySyncing.into());
97 }
98
99 let mut ctx = self.setup_sync(f).await?;
100
101 let mut got_updates = false;
102 let mut pipeline: LbResult<()> = async {
103 ctx.msg("Preparing Sync..."); self.events.sync(SyncIncrement::SyncStarted);
105 self.prune().await?;
106 got_updates = self.fetch_meta(&mut ctx).await?;
107 self.populate_pk_cache(&mut ctx).await?;
108 self.docs.dont_delete.store(true, Ordering::SeqCst);
109 self.fetch_docs(&mut ctx).await?;
110 self.merge(&mut ctx).await?;
111 self.push_meta(&mut ctx).await?;
112 self.push_docs(&mut ctx).await?;
113 Ok(())
114 }
115 .await;
116
117 self.docs.dont_delete.store(false, Ordering::SeqCst);
118
119 if pipeline.is_ok() {
120 pipeline = self.commit_last_synced(&mut ctx).await;
121 }
122
123 let cleanup = self.cleanup().await;
124
125 let ekind = pipeline.as_ref().err().map(|err| err.kind.clone());
126 self.events.sync(SyncIncrement::SyncFinished(ekind));
127
128 self.syncing.store(false, Ordering::Relaxed);
129 pipeline?;
130 cleanup?;
131
132 ctx.done_msg();
134
135 if got_updates {
136 self.events.meta_changed();
138 for id in &ctx.pulled_docs {
139 self.events.doc_written(*id, Some(Actor::Sync));
140 }
141 }
142
143 Ok(ctx.summarize())
144 }
145
146 async fn setup_sync(
147 &self, progress: Option<Box<dyn Fn(SyncProgress) + Send>>,
148 ) -> LbResult<SyncContext> {
149 let tx = self.ro_tx().await;
150 let db = tx.db();
151
152 let last_synced = db.last_synced.get().copied().unwrap_or_default() as u64;
153 let pk_cache = db.pub_key_lookup.get().clone();
154
155 let current = 0;
156 let total = 7;
157
158 Ok(SyncContext {
159 last_synced,
160 pk_cache,
161
162 progress,
163 current,
164 total,
165
166 root: Default::default(),
167 update_as_of: Default::default(),
168 remote_changes: Default::default(),
169 pushed_docs: Default::default(),
170 pushed_metas: Default::default(),
171 pulled_docs: Default::default(),
172 })
173 }
174
    /// Removes files the server no longer knows about from both the base and
    /// local trees, deleting their on-disk document contents as well.
    async fn prune(&self) -> LbResult<()> {
        // Authoritative set of ids that still exist server-side.
        let server_ids = self
            .client
            .request(self.get_account()?, GetFileIdsRequest {})
            .await?
            .ids;

        let mut tx = self.begin_tx().await;
        let db = tx.db();

        let mut local = db.base_metadata.stage(&db.local_metadata).to_lazy();
        let base_ids = local.tree.base.ids();

        // Anything in base that the server no longer lists is prunable...
        let mut prunable_ids = base_ids;
        prunable_ids.retain(|id| !server_ids.contains(id));
        // ...along with every descendant of such a file. Iterates a snapshot
        // (`clone()`) while extending the live set.
        for id in prunable_ids.clone() {
            prunable_ids.extend(local.descendants(&id)?.into_iter());
        }
        // Delete document content for both the base and local versions of
        // each pruned file (their hmacs may differ).
        for id in &prunable_ids {
            if let Some(base_file) = local.tree.base.maybe_find(id) {
                self.docs
                    .delete(*id, base_file.document_hmac().copied())
                    .await?;
            }
            if let Some(local_file) = local.maybe_find(id) {
                self.docs
                    .delete(*id, local_file.document_hmac().copied())
                    .await?;
            }
        }

        // Drop the pruned ids from the persisted base tree...
        let mut base_staged = (&mut db.base_metadata).to_lazy().stage(None);
        base_staged.tree.removed = prunable_ids.clone().into_iter().collect();
        base_staged.promote()?;

        // ...and from the persisted local tree.
        let mut local_staged = (&mut db.local_metadata).to_lazy().stage(None);
        local_staged.tree.removed = prunable_ids.into_iter().collect();
        local_staged.promote()?;

        Ok(())
    }
216
217 async fn fetch_meta(&self, ctx: &mut SyncContext) -> LbResult<bool> {
219 ctx.msg("Fetching tree updates...");
220 let updates = self
221 .client
222 .request(
223 self.get_account()?,
224 GetUpdatesRequestV2 { since_metadata_version: ctx.last_synced },
225 )
226 .await?;
227
228 let empty = updates.file_metadata.is_empty();
229 let (remote, as_of, root) = self.dedup(updates).await?;
230
231 ctx.remote_changes = remote;
232 ctx.update_as_of = as_of;
233 ctx.root = root;
234
235 Ok(!empty)
236 }
237
238 async fn populate_pk_cache(&self, ctx: &mut SyncContext) -> LbResult<()> {
239 ctx.msg("Updating public key cache...");
240 let mut all_owners = HashSet::new();
241 for file in &ctx.remote_changes {
242 for user_access_key in file.user_access_keys() {
243 all_owners.insert(Owner(user_access_key.encrypted_by));
244 all_owners.insert(Owner(user_access_key.encrypted_for));
245 }
246 }
247
248 let mut new_entries = HashMap::new();
249
250 for owner in all_owners {
251 if let hash_map::Entry::Vacant(e) = ctx.pk_cache.entry(owner) {
252 let username_result = self
253 .client
254 .request(self.get_account()?, GetUsernameRequest { key: owner.0 })
255 .await;
256 let username = match username_result {
257 Err(ApiError::Endpoint(GetUsernameError::UserNotFound)) => {
258 "<unknown>".to_string()
259 }
260 _ => username_result?.username.clone(),
261 };
262 new_entries.insert(owner, username.clone());
263 e.insert(username.clone());
264 }
265 }
266
267 let mut tx = self.begin_tx().await;
268 let db = tx.db();
269
270 for (owner, username) in new_entries {
271 db.pub_key_lookup.insert(owner, username)?;
272 }
273 Ok(())
274 }
275
276 async fn fetch_docs(&self, ctx: &mut SyncContext) -> LbResult<()> {
277 ctx.msg("Fetching documents...");
278 let mut docs_to_pull = vec![];
279
280 let tx = self.ro_tx().await;
281 let db = tx.db();
282 let start = Instant::now();
283
284 let mut remote = db.base_metadata.stage(ctx.remote_changes.clone()).to_lazy(); for id in remote.tree.staged.ids() {
286 if remote.calculate_deleted(&id)? {
287 continue;
288 }
289 let remote_hmac = remote.find(&id)?.document_hmac().cloned();
290 let base_hmac = remote
291 .tree
292 .base
293 .maybe_find(&id)
294 .and_then(|f| f.document_hmac())
295 .cloned();
296 if base_hmac == remote_hmac {
297 continue;
298 }
299
300 if let Some(remote_hmac) = remote_hmac {
301 docs_to_pull.push((id, remote_hmac));
302 self.events.sync(SyncIncrement::PullingDocument(id, true));
303 }
304 }
305
306 drop(tx);
307 if start.elapsed() > std::time::Duration::from_millis(100) {
308 warn!("sync fetch_docs held lock for {:?}", start.elapsed());
309 }
310
311 let num_docs = docs_to_pull.len();
312 ctx.total += num_docs;
313
314 let futures = docs_to_pull
315 .into_iter()
316 .map(|(id, hmac)| self.fetch_doc(id, hmac));
317
318 let mut stream = stream::iter(futures).buffer_unordered(
319 thread::available_parallelism()
320 .unwrap_or(NonZeroUsize::new(4).unwrap())
321 .into(),
322 );
323
324 let mut idx = 0;
325 while let Some(fut) = stream.next().await {
326 let id = fut?;
327 ctx.pulled_docs.push(id);
328 self.events.sync(SyncIncrement::PullingDocument(id, false));
329 ctx.file_msg(id, &format!("Downloaded file {idx} of {num_docs}."));
330 idx += 1;
331 }
332 Ok(())
333 }
334
335 async fn fetch_doc(&self, id: Uuid, hmac: DocumentHmac) -> LbResult<Uuid> {
336 let remote_document = self
337 .client
338 .request(self.get_account()?, GetDocRequest { id, hmac })
339 .await?;
340 self.docs
341 .insert(id, Some(hmac), &remote_document.content)
342 .await?;
343
344 Ok(id)
345 }
346
    /// Three-way merges the fetched remote changes with local changes on top
    /// of the shared base tree, resolving conflicts (cycles, path conflicts,
    /// broken/shared/owned links, concurrent edits) and rewriting the local
    /// tree as a set of changes relative to the new base.
    ///
    /// Conflict resolution is iterative: each pass builds a candidate merge,
    /// validates it, records a resolution (unmove/unshare/rename/delete-link)
    /// for any failure, and retries until validation passes or no progress
    /// can be made.
    async fn merge(&self, ctx: &mut SyncContext) -> LbResult<()> {
        let mut tx = self.begin_tx().await;
        let db = tx.db();
        let start = Instant::now();

        let remote_changes = &ctx.remote_changes;

        let me = Owner(self.keychain.get_pk()?);

        let merge_changes = {
            // Four views over the db: plain base, base+remote (owned and lazy),
            // and base+local.
            let mut base = (&db.base_metadata).to_lazy();
            let remote_unlazy = (&db.base_metadata).to_staged(remote_changes);
            let mut remote = remote_unlazy.as_lazy();
            let mut local = (&db.base_metadata).to_staged(&db.local_metadata).to_lazy();

            // Accumulated conflict resolutions, grown across retries of the
            // loop below.
            let mut files_to_unmove: HashSet<Uuid> = HashSet::new();
            let mut files_to_unshare: HashSet<Uuid> = HashSet::new();
            let mut links_to_delete: HashSet<Uuid> = HashSet::new();
            let mut rename_increments: HashMap<Uuid, usize> = HashMap::new();
            let mut duplicate_file_ids: HashMap<Uuid, Uuid> = HashMap::new();

            'merge_construction: loop {
                // Auxiliary tree used only to answer "would this file end up
                // deleted?": local creations replayed, local moves applied
                // (unless unmoved), local deletions applied.
                let mut deletions = {
                    let mut deletions = remote_unlazy.stage(Vec::new()).to_lazy();

                    // Replay local creations (files unknown to remote).
                    let mut deletion_creations = HashSet::new();
                    for id in db.local_metadata.ids() {
                        if remote.maybe_find(&id).is_none() && !links_to_delete.contains(&id) {
                            deletion_creations.insert(id);
                        }
                    }
                    // Creations must happen parent-before-child; retry any
                    // creation whose parent doesn't exist yet until the set
                    // drains (or no candidate succeeds, which is a bug).
                    'drain_creations: while !deletion_creations.is_empty() {
                        'choose_a_creation: for id in &deletion_creations {
                            let id = *id;
                            let local_file = local.find(&id)?.clone();
                            let result = deletions.create_unvalidated(
                                id,
                                symkey::generate_key(),
                                local_file.parent(),
                                &local.name(&id, &self.keychain)?,
                                local_file.file_type(),
                                &self.keychain,
                            );
                            match result {
                                Ok(_) => {
                                    deletion_creations.remove(&id);
                                    continue 'drain_creations;
                                }
                                Err(ref err) => match err.kind {
                                    LbErrKind::FileParentNonexistent => {
                                        continue 'choose_a_creation;
                                    }
                                    _ => {
                                        result?;
                                    }
                                },
                            }
                        }
                        return Err(LbErrKind::Unexpected(format!(
                            "sync failed to find a topomodelal order for file creations: {deletion_creations:?}"
                        ))
                        .into());
                    }

                    // Replay local moves (skipped for unmoved files).
                    for id in db.local_metadata.ids() {
                        let local_file = local.find(&id)?.clone();
                        if let Some(base_file) = db.base_metadata.maybe_find(&id).cloned() {
                            if !local_file.explicitly_deleted()
                                && local_file.parent() != base_file.parent()
                                && !files_to_unmove.contains(&id)
                            {
                                deletions.move_unvalidated(
                                    &id,
                                    local_file.parent(),
                                    &self.keychain,
                                )?;
                            }
                        }
                    }

                    // Replay local deletions.
                    for id in db.local_metadata.ids() {
                        let local_file = local.find(&id)?.clone();
                        if local_file.explicitly_deleted() {
                            deletions.delete_unvalidated(&id, &self.keychain)?;
                        }
                    }
                    deletions
                };

                // The candidate merge tree: remote changes plus surviving
                // local changes.
                let mut merge = {
                    let mut merge = remote_unlazy.stage(Vec::new()).to_lazy();

                    // Replay local creations that survive deletion.
                    let mut creations = HashSet::new();
                    for id in db.local_metadata.ids() {
                        if deletions.maybe_find(&id).is_some()
                            && !deletions.calculate_deleted(&id)?
                            && remote.maybe_find(&id).is_none()
                            && !links_to_delete.contains(&id)
                        {
                            creations.insert(id);
                        }
                    }
                    // Same parent-before-child retry scheme as above, but
                    // reusing the real (decrypted) file key.
                    'drain_creations: while !creations.is_empty() {
                        'choose_a_creation: for id in &creations {
                            let id = *id;
                            let local_file = local.find(&id)?.clone();
                            let result = merge.create_unvalidated(
                                id,
                                local.decrypt_key(&id, &self.keychain)?,
                                local_file.parent(),
                                &local.name(&id, &self.keychain)?,
                                local_file.file_type(),
                                &self.keychain,
                            );
                            match result {
                                Ok(_) => {
                                    creations.remove(&id);
                                    continue 'drain_creations;
                                }
                                Err(ref err) => match err.kind {
                                    LbErrKind::FileParentNonexistent => {
                                        continue 'choose_a_creation;
                                    }
                                    _ => {
                                        result?;
                                    }
                                },
                            }
                        }
                        return Err(LbErrKind::Unexpected(format!(
                            "sync failed to find a topomodelal order for file creations: {creations:?}"
                        ))
                        .into());
                    }

                    // Per-file 3-way merge of moves, renames, shares, and
                    // document contents.
                    for id in db.local_metadata.ids() {
                        // Skip files that end up deleted locally or remotely.
                        if deletions.maybe_find(&id).is_none()
                            || deletions.calculate_deleted(&id)?
                            || (remote.maybe_find(&id).is_some()
                                && remote.calculate_deleted(&id)?)
                        {
                            continue;
                        }

                        let local_file = local.find(&id)?.clone();
                        let local_name = local.name(&id, &self.keychain)?;
                        let maybe_base_file = base.maybe_find(&id).cloned();
                        let maybe_remote_file = remote.maybe_find(&id).cloned();
                        if let Some(ref base_file) = maybe_base_file {
                            let base_name = base.name(&id, &self.keychain)?;
                            let remote_file = remote.find(&id)?.clone();
                            let remote_name = remote.name(&id, &self.keychain)?;

                            // Local move wins only if remote didn't also move
                            // the file (and it wasn't unmoved by resolution).
                            if local_file.parent() != base_file.parent()
                                && remote_file.parent() == base_file.parent()
                                && !files_to_unmove.contains(&id)
                            {
                                merge.move_unvalidated(&id, local_file.parent(), &self.keychain)?;
                            }

                            // Local rename wins only if remote kept the base name.
                            if local_name != base_name && remote_name == base_name {
                                merge.rename_unvalidated(&id, &local_name, &self.keychain)?;
                            }
                        }

                        // Merge user-access keys: re-apply local share grants
                        // and share deletions that remote doesn't reflect.
                        let mut remote_keys = HashMap::new();
                        if let Some(ref remote_file) = maybe_remote_file {
                            for key in remote_file.user_access_keys() {
                                remote_keys.insert(
                                    (Owner(key.encrypted_by), Owner(key.encrypted_for)),
                                    (key.mode, key.deleted),
                                );
                            }
                        }
                        for key in local_file.user_access_keys() {
                            let (by, for_) = (Owner(key.encrypted_by), Owner(key.encrypted_for));
                            if let Some(&(remote_mode, remote_deleted)) =
                                remote_keys.get(&(by, for_))
                            {
                                // Local grants a stronger mode, or revives a
                                // share remote deleted.
                                if key.mode > remote_mode || !key.deleted && remote_deleted {
                                    let mode = match key.mode {
                                        UserAccessMode::Read => ShareMode::Read,
                                        UserAccessMode::Write => ShareMode::Write,
                                        UserAccessMode::Owner => continue,
                                    };
                                    merge.add_share_unvalidated(id, for_, mode, &self.keychain)?;
                                }
                                // Local deleted a share remote still has.
                                if key.deleted && !remote_deleted {
                                    merge.delete_share_unvalidated(
                                        &id,
                                        Some(for_.0),
                                        &self.keychain,
                                    )?;
                                }
                            } else {
                                // Share that only exists locally.
                                let mode = match key.mode {
                                    UserAccessMode::Read => ShareMode::Read,
                                    UserAccessMode::Write => ShareMode::Write,
                                    UserAccessMode::Owner => continue,
                                };
                                merge.add_share_unvalidated(id, for_, mode, &self.keychain)?;
                            }
                        }

                        // Resolutions from earlier validation failures.
                        if files_to_unshare.contains(&id) {
                            merge.delete_share_unvalidated(&id, None, &self.keychain)?;
                        }

                        if let Some(&rename_increment) = rename_increments.get(&id) {
                            let name = NameComponents::from(&local_name)
                                .generate_incremented(rename_increment)
                                .to_name();
                            merge.rename_unvalidated(&id, &name, &self.keychain)?;
                        }

                        // Document content: only relevant when we can write
                        // and local content diverged from base.
                        let base_hmac = maybe_base_file.and_then(|f| f.document_hmac().cloned());
                        let remote_hmac =
                            maybe_remote_file.and_then(|f| f.document_hmac().cloned());
                        let local_hmac = local_file.document_hmac().cloned();
                        if merge.access_mode(me, &id)? >= Some(UserAccessMode::Write)
                            && local_hmac != base_hmac
                        {
                            if remote_hmac != base_hmac && remote_hmac != local_hmac {
                                // True 3-way content conflict: both sides edited.
                                let merge_name = merge.name(&id, &self.keychain)?;
                                let document_type =
                                    DocumentType::from_file_name_using_extension(&merge_name);

                                let base_document =
                                    self.read_document_helper(id, &mut base).await?;
                                let remote_document =
                                    self.read_document_helper(id, &mut remote).await?;
                                let local_document =
                                    self.read_document_helper(id, &mut local).await?;

                                match document_type {
                                    DocumentType::Text => {
                                        // Text: 3-way merge via the text buffer.
                                        let base_document =
                                            String::from_utf8_lossy(&base_document).to_string();
                                        let remote_document =
                                            String::from_utf8_lossy(&remote_document).to_string();
                                        let local_document =
                                            String::from_utf8_lossy(&local_document).to_string();
                                        let merged_document = Buffer::from(base_document.as_str())
                                            .merge(local_document, remote_document);
                                        let encrypted_document = merge
                                            .update_document_unvalidated(
                                                &id,
                                                &merged_document.into_bytes(),
                                                &self.keychain,
                                            )?;
                                        let hmac = merge.find(&id)?.document_hmac().copied();
                                        self.docs.insert(id, hmac, &encrypted_document).await?;
                                    }
                                    DocumentType::Drawing => {
                                        // Drawing: merge via the svg buffer's
                                        // reload against base and remote.
                                        let base_document =
                                            String::from_utf8_lossy(&base_document).to_string();
                                        let remote_document =
                                            String::from_utf8_lossy(&remote_document).to_string();
                                        let local_document =
                                            String::from_utf8_lossy(&local_document).to_string();

                                        let base_buffer = svg::buffer::Buffer::new(&base_document);
                                        let remote_buffer =
                                            svg::buffer::Buffer::new(&remote_document);
                                        let mut local_buffer =
                                            svg::buffer::Buffer::new(&local_document);

                                        // Bake the viewport's master transform
                                        // into local path data before merging.
                                        for (_, el) in local_buffer.elements.iter_mut() {
                                            if let Element::Path(path) = el {
                                                path.data.apply_transform(u_transform_to_bezier(
                                                    &Transform::from(
                                                        local_buffer
                                                            .weak_viewport_settings
                                                            .master_transform,
                                                    ),
                                                ));
                                            }
                                        }
                                        svg::buffer::Buffer::reload(
                                            &mut local_buffer.elements,
                                            &mut local_buffer.weak_images,
                                            &mut local_buffer.weak_path_pressures,
                                            &mut local_buffer.weak_viewport_settings,
                                            &base_buffer,
                                            &remote_buffer,
                                        );

                                        let merged_document = local_buffer.serialize();
                                        let encrypted_document = merge
                                            .update_document_unvalidated(
                                                &id,
                                                &merged_document.into_bytes(),
                                                &self.keychain,
                                            )?;
                                        let hmac = merge.find(&id)?.document_hmac().copied();
                                        self.docs.insert(id, hmac, &encrypted_document).await?;
                                    }
                                    DocumentType::Other => {
                                        // Unmergeable: keep remote content for
                                        // `id`, save the local edit as a new
                                        // sibling duplicate file. The duplicate
                                        // id is stable across retry passes.
                                        let merge_parent = *merge.find(&id)?.parent();
                                        let duplicate_id = if let Some(&duplicate_id) =
                                            duplicate_file_ids.get(&id)
                                        {
                                            duplicate_id
                                        } else {
                                            let duplicate_id = Uuid::new_v4();
                                            duplicate_file_ids.insert(id, duplicate_id);
                                            rename_increments.insert(duplicate_id, 1);
                                            duplicate_id
                                        };

                                        let mut merge_name = merge_name;
                                        merge_name = NameComponents::from(&merge_name)
                                            .generate_incremented(
                                                rename_increments
                                                    .get(&duplicate_id)
                                                    .copied()
                                                    .unwrap_or_default(),
                                            )
                                            .to_name();

                                        merge.create_unvalidated(
                                            duplicate_id,
                                            symkey::generate_key(),
                                            &merge_parent,
                                            &merge_name,
                                            FileType::Document,
                                            &self.keychain,
                                        )?;
                                        let encrypted_document = merge
                                            .update_document_unvalidated(
                                                &duplicate_id,
                                                &local_document,
                                                &self.keychain,
                                            )?;
                                        let duplicate_hmac =
                                            merge.find(&duplicate_id)?.document_hmac().copied();
                                        self.docs
                                            .insert(
                                                duplicate_id,
                                                duplicate_hmac,
                                                &encrypted_document,
                                            )
                                            .await?;
                                    }
                                }
                            } else {
                                // Only local edited: carry the local content over.
                                let document = self.read_document_helper(id, &mut local).await?;
                                merge.update_document_unvalidated(
                                    &id,
                                    &document,
                                    &self.keychain,
                                )?;
                            }
                        }
                    }

                    // Apply local deletions that survived into `deletions` but
                    // aren't yet reflected in the merge tree.
                    for id in db.local_metadata.ids() {
                        if db.base_metadata.maybe_find(&id).is_some()
                            && deletions.calculate_deleted(&id)?
                            && !merge.calculate_deleted(&id)?
                        {
                            merge.delete_unvalidated(&id, &self.keychain)?;
                        }
                    }
                    // Apply link deletions recorded by conflict resolution.
                    for &id in &links_to_delete {
                        if merge.maybe_find(&id).is_some() && !merge.calculate_deleted(&id)? {
                            merge.delete_unvalidated(&id, &self.keychain)?;
                        }
                    }

                    merge
                };

                // Links whose target got deleted must themselves be deleted;
                // recording one restarts merge construction.
                for link in merge.ids() {
                    if !merge.calculate_deleted(&link)? {
                        if let FileType::Link { target } = merge.find(&link)?.file_type() {
                            if merge.maybe_find(&target).is_some()
                                && merge.calculate_deleted(&target)?
                            {
                                if links_to_delete.insert(link) {
                                    continue 'merge_construction;
                                } else {
                                    // Already recorded and still failing: bail.
                                    return Err(LbErrKind::Unexpected(format!(
                                        "sync failed to resolve broken link (deletion): {link:?}"
                                    ))
                                    .into());
                                }
                            }
                        }
                    }
                }

                // Validate the candidate; on failure record a resolution and
                // retry. Each arm errors out if it cannot make progress, which
                // guarantees the loop terminates.
                let validate_result = merge.validate(me);
                match validate_result {
                    Ok(_) => {
                        let (_, merge_changes) = merge.unstage();
                        break merge_changes;
                    }
                    Err(ref err) => match err.kind {
                        LbErrKind::Validation(ref vf) => match vf {
                            ValidationFailure::Cycle(ids) => {
                                // Resolve cycles by undoing local moves.
                                let mut progress = false;
                                for &id in ids {
                                    if db.local_metadata.maybe_find(&id).is_some()
                                        && files_to_unmove.insert(id)
                                    {
                                        progress = true;
                                    }
                                }
                                if !progress {
                                    return Err(LbErrKind::Unexpected(format!(
                                        "sync failed to resolve cycle: {ids:?}"
                                    ))
                                    .into());
                                }
                            }
                            ValidationFailure::PathConflict(ids) => {
                                // Prefer renaming a conflict-duplicate file;
                                // otherwise rename a locally changed file.
                                let mut progress = false;
                                for &id in ids {
                                    if duplicate_file_ids.values().any(|&dup| dup == id) {
                                        *rename_increments.entry(id).or_insert(0) += 1;
                                        progress = true;
                                        break;
                                    }
                                }
                                if !progress {
                                    for &id in ids {
                                        if db.local_metadata.maybe_find(&id).is_some() {
                                            *rename_increments.entry(id).or_insert(0) += 1;
                                            progress = true;
                                            break;
                                        }
                                    }
                                }
                                if !progress {
                                    return Err(LbErrKind::Unexpected(format!(
                                        "sync failed to resolve path conflict: {ids:?}"
                                    ))
                                    .into());
                                }
                            }
                            ValidationFailure::SharedLink { link, shared_ancestor } => {
                                // Undo the local share if base wasn't shared;
                                // otherwise delete the link.
                                let mut progress = false;
                                if let Some(base_shared_ancestor) = base.maybe_find(shared_ancestor)
                                {
                                    if !base_shared_ancestor.is_shared()
                                        && files_to_unshare.insert(*shared_ancestor)
                                    {
                                        progress = true;
                                    }
                                }
                                if !progress && links_to_delete.insert(*link) {
                                    progress = true;
                                }
                                if !progress {
                                    return Err(LbErrKind::Unexpected(format!(
                                        "sync failed to resolve shared link: link: {link:?}, shared_ancestor: {shared_ancestor:?}"
                                    )).into());
                                }
                            }
                            ValidationFailure::DuplicateLink { target } => {
                                // Delete the local link to the twice-linked target.
                                let mut progress = false;
                                if let Some(link) = local.linked_by(target)? {
                                    if links_to_delete.insert(link) {
                                        progress = true;
                                    }
                                }
                                if !progress {
                                    return Err(LbErrKind::Unexpected(format!(
                                        "sync failed to resolve duplicate link: target: {target:?}"
                                    ))
                                    .into());
                                }
                            }
                            ValidationFailure::BrokenLink(link) => {
                                if !links_to_delete.insert(*link) {
                                    return Err(LbErrKind::Unexpected(format!(
                                        "sync failed to resolve broken link: {link:?}"
                                    ))
                                    .into());
                                }
                            }
                            ValidationFailure::OwnedLink(link) => {
                                // Undo the move that put our own file behind a
                                // link, or failing that, delete the link.
                                let mut progress = false;
                                if let Some(remote_link) = remote.maybe_find(link) {
                                    if let FileType::Link { target } = remote_link.file_type() {
                                        let remote_target = remote.find(&target)?;
                                        if remote_target.owner() != me
                                            && files_to_unmove.insert(target)
                                        {
                                            progress = true;
                                        }
                                    }
                                }
                                if !progress && links_to_delete.insert(*link) {
                                    progress = true;
                                }
                                if !progress {
                                    return Err(LbErrKind::Unexpected(format!(
                                        "sync failed to resolve owned link: {link:?}"
                                    ))
                                    .into());
                                }
                            }
                            // These failures have no automatic resolution:
                            // propagate the validation error.
                            ValidationFailure::Orphan(_)
                            | ValidationFailure::NonFolderWithChildren(_)
                            | ValidationFailure::FileWithDifferentOwnerParent(_)
                            | ValidationFailure::FileNameTooLong(_)
                            | ValidationFailure::DeletedFileUpdated(_)
                            | ValidationFailure::NonDecryptableFileName(_) => {
                                validate_result?;
                            }
                        },
                        _ => {
                            validate_result?;
                        }
                    },
                }
            }
        };

        // Commit: remote changes become the new base; local changes are
        // replaced by the merge result relative to that base, then pruned.
        (&mut db.base_metadata)
            .to_staged(remote_changes.clone())
            .to_lazy()
            .promote()?;
        db.local_metadata.clear()?;
        (&mut db.local_metadata)
            .to_staged(merge_changes)
            .to_lazy()
            .promote()?;

        db.base_metadata.stage(&mut db.local_metadata).prune()?;

        if start.elapsed() > std::time::Duration::from_millis(100) {
            warn!("sync merge held lock for {:?}", start.elapsed());
        }

        Ok(())
    }
940
941 async fn push_meta(&self, ctx: &mut SyncContext) -> LbResult<()> {
943 ctx.msg("Pushing tree changes...");
944 let mut updates = vec![];
945 let mut local_changes_no_digests = Vec::new();
946
947 let tx = self.ro_tx().await;
948 let db = tx.db();
949
950 let local = db.base_metadata.stage(&db.local_metadata).to_lazy();
952
953 for id in local.tree.staged.ids() {
954 let mut local_change = local.tree.staged.find(&id)?.timestamped_value.value.clone();
955 let maybe_base_file = local.tree.base.maybe_find(&id);
956
957 local_change.set_hmac_and_size(
959 maybe_base_file.and_then(|f| f.document_hmac().copied()),
960 maybe_base_file.and_then(|f| *f.timestamped_value.value.doc_size()),
961 );
962 let local_change = local_change.sign(&self.keychain)?;
963
964 local_changes_no_digests.push(local_change.clone());
965 let file_diff = FileDiff { old: maybe_base_file.cloned(), new: local_change };
966 updates.push(file_diff);
967 }
968
969 drop(tx);
970
971 if !updates.is_empty() {
972 self.client
973 .request(self.get_account()?, UpsertRequestV2 { updates: updates.clone() })
974 .await?;
975 ctx.pushed_metas = updates;
976 }
977
978 let mut tx = self.begin_tx().await;
979 let db = tx.db();
980
981 (&mut db.base_metadata)
983 .to_lazy()
984 .stage(local_changes_no_digests)
985 .promote()?;
986 db.base_metadata.stage(&mut db.local_metadata).prune()?;
987
988 tx.end();
989
990 Ok(())
991 }
992
993 async fn push_docs(&self, ctx: &mut SyncContext) -> LbResult<()> {
995 ctx.msg("Pushing document changes...");
996 let mut updates = vec![];
997 let mut local_changes_digests_only = vec![];
998
999 let tx = self.ro_tx().await;
1000 let db = tx.db();
1001 let start = Instant::now();
1002
1003 let local = db.base_metadata.stage(&db.local_metadata).to_lazy();
1004
1005 for id in local.tree.staged.ids() {
1006 let base_file = local.tree.base.find(&id)?.clone();
1007
1008 let mut local_change = base_file.timestamped_value.value.clone();
1010 local_change.set_hmac_and_size(
1011 local.find(&id)?.document_hmac().copied(),
1012 *local.find(&id)?.timestamped_value.value.doc_size(),
1013 );
1014
1015 if base_file.document_hmac() == local_change.document_hmac()
1016 || local_change.document_hmac().is_none()
1017 {
1018 continue;
1019 }
1020
1021 let local_change = local_change.sign(&self.keychain)?;
1022
1023 updates.push(FileDiff { old: Some(base_file), new: local_change.clone() });
1024 local_changes_digests_only.push(local_change);
1025 self.events.sync(SyncIncrement::PushingDocument(id, true));
1026 }
1027
1028 drop(tx);
1029 if start.elapsed() > std::time::Duration::from_millis(100) {
1030 warn!("sync push_docs held lock for {:?}", start.elapsed());
1031 }
1032
1033 let docs_count = updates.len();
1034 ctx.total += docs_count;
1035 let futures = updates.clone().into_iter().map(|diff| self.push_doc(diff));
1036
1037 let mut stream = stream::iter(futures).buffer_unordered(
1038 thread::available_parallelism()
1039 .unwrap_or(NonZeroUsize::new(4).unwrap())
1040 .into(),
1041 );
1042
1043 let mut idx = 0;
1044 while let Some(fut) = stream.next().await {
1045 let id = fut?;
1046 self.events.sync(SyncIncrement::PushingDocument(id, false));
1047 ctx.file_msg(id, &format!("Pushed file {idx} of {docs_count}."));
1048 idx += 1;
1049 }
1050 ctx.pushed_docs = updates;
1051
1052 let mut tx = self.begin_tx().await;
1053 let db = tx.db();
1054 (&mut db.base_metadata)
1056 .to_lazy()
1057 .stage(local_changes_digests_only)
1058 .promote()?;
1059
1060 db.base_metadata.stage(&mut db.local_metadata).prune()?;
1061
1062 tx.end();
1063
1064 Ok(())
1065 }
1066
1067 async fn push_doc(&self, diff: FileDiff<SignedMeta>) -> LbResult<Uuid> {
1068 let id = *diff.new.id();
1069 let hmac = diff.new.document_hmac();
1070 let local_document_change = self.docs.get(id, hmac.copied()).await?;
1071 self.client
1072 .request(
1073 self.get_account()?,
1074 ChangeDocRequestV2 { diff, new_content: local_document_change },
1075 )
1076 .await?;
1077
1078 Ok(id)
1079 }
1080
    /// Reduces a server update response to the changes relevant to our base
    /// tree (orphans removed, staged-and-pruned against base) and returns them
    /// with the server version they are current as of. Also discovers the root
    /// id when this is the first sync (db.root not yet set).
    async fn dedup(
        &self, updates: GetUpdatesResponseV2,
    ) -> LbResult<(Vec<SignedMeta>, u64, Option<Uuid>)> {
        let tx = self.ro_tx().await;
        let db = tx.db();

        let mut root_id = None;
        let (remote_changes, update_as_of) = {
            let mut remote_changes = updates.file_metadata;
            let update_as_of = updates.as_of_metadata_version;

            // Drop entries we can't place in our tree (unknown parent and
            // not shared with us).
            remote_changes = self.prune_remote_orphans(remote_changes).await?;

            // Stage onto base and prune, then unstage to recover only the
            // surviving staged changes.
            let remote = db.base_metadata.stage(remote_changes).pruned()?.to_lazy();

            let (_, remote_changes) = remote.unstage();
            (remote_changes, update_as_of)
        };

        // First sync: locate the root among the incoming files.
        if db.root.get().is_none() {
            let root = remote_changes
                .all_files()?
                .into_iter()
                .find(|f| f.is_root())
                .ok_or(LbErrKind::RootNonexistent)?;
            root_id = Some(*root.id());
        }

        Ok((remote_changes, update_as_of, root_id))
    }
1112
1113 async fn prune_remote_orphans(
1114 &self, remote_changes: Vec<SignedMeta>,
1115 ) -> LbResult<Vec<SignedMeta>> {
1116 let tx = self.ro_tx().await;
1117 let db = tx.db();
1118
1119 let me = Owner(self.keychain.get_pk()?);
1120 let remote = db.base_metadata.stage(remote_changes).to_lazy();
1121 let mut result = Vec::new();
1122
1123 for id in remote.tree.staged.ids() {
1124 let meta = remote.find(&id)?;
1125 if remote.maybe_find_parent(meta).is_some()
1126 || meta
1127 .user_access_keys()
1128 .iter()
1129 .any(|k| k.encrypted_for == me.0)
1130 {
1131 result.push(remote.find(&id)?.clone()); }
1133 }
1134 Ok(result)
1135 }
1136
1137 async fn commit_last_synced(&self, ctx: &mut SyncContext) -> LbResult<()> {
1138 ctx.msg("Cleaning up...");
1139 let mut tx = self.begin_tx().await;
1140 let db = tx.db();
1141 db.last_synced.insert(ctx.update_as_of as i64)?;
1142
1143 if let Some(root) = ctx.root {
1144 db.root.insert(root)?;
1145 }
1146
1147 Ok(())
1148 }
1149
1150 pub async fn get_last_synced_human(&self) -> LbResult<String> {
1151 let tx = self.ro_tx().await;
1152 let db = tx.db();
1153 let last_synced = db.last_synced.get().copied().unwrap_or(0);
1154
1155 Ok(self.get_timestamp_human_string(last_synced))
1156 }
1157
1158 pub fn get_timestamp_human_string(&self, timestamp: i64) -> String {
1159 if timestamp != 0 {
1160 Duration::milliseconds(clock::get_time().0 - timestamp)
1161 .format_human()
1162 .to_string()
1163 } else {
1164 "never".to_string()
1165 }
1166 }
1167}
1168
1169impl SyncContext {
1170 fn summarize(&self) -> SyncStatus {
1171 let mut local = HashSet::new();
1172 let mut server = HashSet::new();
1173 let mut work_units = vec![];
1174
1175 for meta in &self.pushed_metas {
1176 local.insert(meta.new.id());
1177 }
1178
1179 for meta in &self.pushed_docs {
1180 local.insert(meta.new.id());
1181 }
1182
1183 for meta in &self.remote_changes {
1184 server.insert(meta.id());
1185 }
1186
1187 for id in local {
1188 work_units.push(WorkUnit::LocalChange(*id));
1189 }
1190
1191 for id in server {
1192 work_units.push(WorkUnit::ServerChange(*id));
1193 }
1194
1195 SyncStatus { work_units, latest_server_ts: self.update_as_of }
1196 }
1197
1198 fn msg(&mut self, msg: &str) {
1199 self.current += 1;
1200 if let Some(f) = &self.progress {
1201 f(SyncProgress {
1202 total: self.total,
1203 progress: self.current,
1204 file_being_processed: Default::default(),
1205 msg: msg.to_string(),
1206 })
1207 }
1208 }
1209
1210 fn file_msg(&mut self, id: Uuid, msg: &str) {
1211 self.current += 1;
1212 if let Some(f) = &self.progress {
1213 f(SyncProgress {
1214 total: self.total,
1215 progress: self.current,
1216 file_being_processed: Some(id),
1217 msg: msg.to_string(),
1218 })
1219 }
1220 }
1221
1222 fn done_msg(&mut self) {
1223 self.current = self.total;
1224 if let Some(f) = &self.progress {
1225 f(SyncProgress {
1226 total: self.total,
1227 progress: self.current,
1228 file_being_processed: None,
1229 msg: "Sync successful!".to_string(),
1230 })
1231 }
1232 }
1233}
1234
/// Result of a sync or of `calculate_work`: the units of work involved and
/// the latest server metadata version observed.
#[derive(Debug, Serialize, Clone)]
pub struct SyncStatus {
    pub work_units: Vec<WorkUnit>,
    pub latest_server_ts: u64,
}
1240
/// Snapshot passed to the sync progress callback after each step.
#[derive(Clone)]
pub struct SyncProgress {
    // Total number of steps expected (grows as document transfers are counted).
    pub total: usize,
    // Steps completed so far.
    pub progress: usize,
    // The file a per-file step refers to, if any.
    pub file_being_processed: Option<Uuid>,
    pub msg: String,
}
1248
// Renders progress as "[current / total]: message".
impl Display for SyncProgress {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "[{} / {}]: {}", self.progress, self.total, self.msg)
    }
}
1254
/// Fine-grained sync lifecycle events emitted through the event system.
#[derive(Debug, Clone)]
pub enum SyncIncrement {
    SyncStarted,
    /// Document pull started (`true`) or finished (`false`) for the given id.
    PullingDocument(Uuid, bool),
    /// Document push started (`true`) or finished (`false`) for the given id.
    PushingDocument(Uuid, bool),
    /// Sync ended; carries the error kind when it failed.
    SyncFinished(Option<LbErrKind>),
}