use crate::Lb;
use crate::io::network::ApiError;
use crate::model::access_info::UserAccessMode;
use crate::model::api::{
    ChangeDocRequestV2, GetDocRequest, GetFileIdsRequest, GetUpdatesRequestV2,
    GetUpdatesResponseV2, GetUsernameError, GetUsernameRequest, UpsertRequestV2,
};
use crate::model::errors::{LbErrKind, LbResult};
use crate::model::file::ShareMode;
use crate::model::file_like::FileLike;
use crate::model::file_metadata::{DocumentHmac, FileDiff, FileType, Owner};
use crate::model::filename::{DocumentType, NameComponents};
use crate::model::signed_meta::SignedMeta;
use crate::model::staged::StagedTreeLikeMut;
use crate::model::svg::buffer::u_transform_to_bezier;
use crate::model::svg::element::Element;
use crate::model::text::buffer::Buffer;
use crate::model::tree_like::TreeLike;
use crate::model::work_unit::WorkUnit;
use crate::model::{ValidationFailure, clock, svg, symkey};
pub use basic_human_duration::ChronoHumanDuration;
use futures::{StreamExt, stream};
use serde::Serialize;
use std::collections::{HashMap, HashSet, hash_map};
use std::fmt::{Display, Formatter};
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::Instant;
use time::Duration;
use tracing::{instrument, warn};
use usvg::Transform;
use uuid::Uuid;

use super::events::Actor;

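/// A cloneable atomic flag shared across tasks to signal sync state (e.g. that a
/// sync is in progress).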
pub type SyncFlag = Arc<AtomicBool>;

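/// State carried through a single sync: the optional progress callback with its
/// step counters, plus everything accumulated while pulling and pushing (remote
/// changes, pushed diffs, pulled document ids).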
pub struct SyncContext {
    progress: Option<Box<dyn Fn(SyncProgress) + Send>>,
    current: usize,
    total: usize,

    pk_cache: HashMap<Owner, String>,
    last_synced: u64,
    remote_changes: Vec<SignedMeta>,
    update_as_of: u64,

    new_root: Option<Uuid>,
    pushed_metas: Vec<FileDiff<SignedMeta>>,
    pushed_docs: Vec<FileDiff<SignedMeta>>,
    pulled_docs: Vec<Uuid>,
}

impl Lb {
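    /// Calculates outstanding work without syncing: fetches metadata updates since
    /// the last synced version, prunes files deleted on the server, and reports
    /// remote and local dirty files as [`WorkUnit`]s.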
    #[instrument(level = "debug", skip_all, err(Debug))]
    pub async fn calculate_work(&self) -> LbResult<SyncStatus> {
        let tx = self.ro_tx().await;
        let db = tx.db();
        let last_synced = db.last_synced.get().copied().unwrap_or_default() as u64;
        drop(tx);

        let remote_changes = self
            .client
            .request(
                self.get_account()?,
                GetUpdatesRequestV2 { since_metadata_version: last_synced },
            )
            .await?;
        let (deduped, latest_server_ts, _) = self.dedup(remote_changes).await?;
        let remote_dirty = deduped
            .into_iter()
            .map(|f| *f.id())
            .map(WorkUnit::ServerChange);

        self.prune().await?;

        let tx = self.ro_tx().await;
        let db = tx.db();

        let locally_dirty = db
            .local_metadata
            .get()
            .keys()
            .copied()
            .map(WorkUnit::LocalChange);

        let mut work_units: Vec<WorkUnit> = Vec::new();
        work_units.extend(locally_dirty.chain(remote_dirty));
        Ok(SyncStatus { work_units, latest_server_ts })
    }

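    /// Runs a full sync: prune files deleted on the server, pull remote metadata
    /// and documents, merge them with local changes, then push local metadata and
    /// documents. Only one sync may run at a time; a concurrent call fails with
    /// [`LbErrKind::AlreadySyncing`]. A minimal usage sketch, assuming an
    /// initialized `lb: Lb`:
    ///
    /// ```ignore
    /// // sync without progress reporting:
    /// let status = lb.sync(None).await?;
    ///
    /// // or log each step (SyncProgress implements Display):
    /// let status = lb.sync(Some(Box::new(|p| println!("{p}")))).await?;
    /// println!("{} work units processed", status.work_units.len());
    /// ```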
    #[instrument(level = "debug", skip_all, err(Debug))]
    pub async fn sync(&self, f: Option<Box<dyn Fn(SyncProgress) + Send>>) -> LbResult<SyncStatus> {
        let old = self.syncing.swap(true, Ordering::SeqCst);
        if old {
            return Err(LbErrKind::AlreadySyncing.into());
        }

        let mut ctx = self.setup_sync(f).await?;

        let mut got_updates = false;
        let mut pipeline: LbResult<()> = async {
            ctx.msg("Preparing Sync...");
            self.events.sync(SyncIncrement::SyncStarted);
            self.prune().await?;
            got_updates = self.fetch_meta(&mut ctx).await?;
            self.populate_pk_cache(&mut ctx).await?;
            self.docs.dont_delete.store(true, Ordering::SeqCst);
            self.fetch_docs(&mut ctx).await?;
            self.merge(&mut ctx).await?;
            self.push_meta(&mut ctx).await?;
            self.push_docs(&mut ctx).await?;
            Ok(())
        }
        .await;

        self.docs.dont_delete.store(false, Ordering::SeqCst);

        if pipeline.is_ok() {
            pipeline = self.commit_last_synced(&mut ctx).await;
        }

        let cleanup = self.cleanup().await;

        let ekind = pipeline.as_ref().err().map(|err| err.kind.clone());
        self.events.sync(SyncIncrement::SyncFinished(ekind));

        self.syncing.store(false, Ordering::Relaxed);
        pipeline?;
        cleanup?;

        ctx.done_msg();

        if got_updates {
            self.events.meta_changed();
            let owner = self.keychain.get_pk().map(Owner).ok();
            if ctx.remote_changes.iter().any(|f| Some(f.owner()) != owner) {
                self.events.pending_shares_changed();
            }
            for id in &ctx.pulled_docs {
                self.events.doc_written(*id, Some(Actor::Sync));
            }
        }

        Ok(ctx.summarize())
    }

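    /// Snapshots `last_synced` and the public key cache and initializes progress
    /// counters: seven fixed steps, one per status message, with document
    /// transfers added to the total as they are discovered.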
    async fn setup_sync(
        &self, progress: Option<Box<dyn Fn(SyncProgress) + Send>>,
    ) -> LbResult<SyncContext> {
        let tx = self.ro_tx().await;
        let db = tx.db();

        let last_synced = db.last_synced.get().copied().unwrap_or_default() as u64;
        let pk_cache = db.pub_key_lookup.get().clone();

        let current = 0;
        let total = 7;

        Ok(SyncContext {
            last_synced,
            pk_cache,

            progress,
            current,
            total,

            new_root: Default::default(),
            update_as_of: Default::default(),
            remote_changes: Default::default(),
            pushed_docs: Default::default(),
            pushed_metas: Default::default(),
            pulled_docs: Default::default(),
        })
    }

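    /// Removes files that no longer exist on the server, and their descendants,
    /// from both the base and local trees, deleting their on-disk documents.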
    async fn prune(&self) -> LbResult<()> {
        let server_ids = self
            .client
            .request(self.get_account()?, GetFileIdsRequest {})
            .await?
            .ids;

        let mut tx = self.begin_tx().await;
        let db = tx.db();

        let mut local = db.base_metadata.stage(&db.local_metadata).to_lazy();
        let base_ids = local.tree.base.ids();

        let mut prunable_ids = base_ids;
        prunable_ids.retain(|id| !server_ids.contains(id));
        for id in prunable_ids.clone() {
            prunable_ids.extend(local.descendants(&id)?.into_iter());
        }
        for id in &prunable_ids {
            if let Some(base_file) = local.tree.base.maybe_find(id) {
                self.docs
                    .delete(*id, base_file.document_hmac().copied())
                    .await?;
            }
            if let Some(local_file) = local.maybe_find(id) {
                self.docs
                    .delete(*id, local_file.document_hmac().copied())
                    .await?;
            }
        }

        let mut base_staged = (&mut db.base_metadata).to_lazy().stage(None);
        base_staged.tree.removed = prunable_ids.clone().into_iter().collect();
        base_staged.promote()?;

        let mut local_staged = (&mut db.local_metadata).to_lazy().stage(None);
        local_staged.tree.removed = prunable_ids.into_iter().collect();
        local_staged.promote()?;

        Ok(())
    }

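    /// Pulls metadata updates since `last_synced` into the context. Returns
    /// whether the server reported any changes.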
    async fn fetch_meta(&self, ctx: &mut SyncContext) -> LbResult<bool> {
        ctx.msg("Fetching tree updates...");
        let updates = self
            .client
            .request(
                self.get_account()?,
                GetUpdatesRequestV2 { since_metadata_version: ctx.last_synced },
            )
            .await?;

        let empty = updates.file_metadata.is_empty();
        let (remote, as_of, root) = self.dedup(updates).await?;

        ctx.remote_changes = remote;
        ctx.update_as_of = as_of;
        ctx.new_root = root;

        Ok(!empty)
    }

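    /// Resolves any unfamiliar public keys referenced by share entries to
    /// usernames and persists them; users the server no longer knows are recorded
    /// as `<unknown>`.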
    async fn populate_pk_cache(&self, ctx: &mut SyncContext) -> LbResult<()> {
        ctx.msg("Updating public key cache...");
        let mut all_owners = HashSet::new();
        for file in &ctx.remote_changes {
            for user_access_key in file.user_access_keys() {
                all_owners.insert(Owner(user_access_key.encrypted_by));
                all_owners.insert(Owner(user_access_key.encrypted_for));
            }
        }

        let mut new_entries = HashMap::new();

        for owner in all_owners {
            if let hash_map::Entry::Vacant(e) = ctx.pk_cache.entry(owner) {
                let username_result = self
                    .client
                    .request(self.get_account()?, GetUsernameRequest { key: owner.0 })
                    .await;
                let username = match username_result {
                    Err(ApiError::Endpoint(GetUsernameError::UserNotFound)) => {
                        "<unknown>".to_string()
                    }
                    _ => username_result?.username,
                };
                new_entries.insert(owner, username.clone());
                e.insert(username);
            }
        }

        let mut tx = self.begin_tx().await;
        let db = tx.db();

        for (owner, username) in new_entries {
            db.pub_key_lookup.insert(owner, username)?;
        }
        Ok(())
    }

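    /// Determines which documents have new remote content (remote hmac differs
    /// from base) and downloads them concurrently, up to one task per available
    /// core.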
    async fn fetch_docs(&self, ctx: &mut SyncContext) -> LbResult<()> {
        ctx.msg("Fetching documents...");
        let mut docs_to_pull = vec![];

        let tx = self.ro_tx().await;
        let db = tx.db();
        let start = Instant::now();

        let mut remote = db.base_metadata.stage(ctx.remote_changes.clone()).to_lazy();
        for id in remote.tree.staged.ids() {
            if remote.calculate_deleted(&id)? {
                continue;
            }
            let remote_hmac = remote.find(&id)?.document_hmac().cloned();
            let base_hmac = remote
                .tree
                .base
                .maybe_find(&id)
                .and_then(|f| f.document_hmac())
                .cloned();
            if base_hmac == remote_hmac {
                continue;
            }

            if let Some(remote_hmac) = remote_hmac {
                docs_to_pull.push((id, remote_hmac));
                self.events.sync(SyncIncrement::PullingDocument(id, true));
            }
        }

        drop(tx);
        if start.elapsed() > std::time::Duration::from_millis(100) {
            warn!("sync fetch_docs held lock for {:?}", start.elapsed());
        }

        let num_docs = docs_to_pull.len();
        ctx.total += num_docs;

        let futures = docs_to_pull
            .into_iter()
            .map(|(id, hmac)| self.fetch_doc(id, hmac));

        let mut stream = stream::iter(futures).buffer_unordered(
            thread::available_parallelism()
                .unwrap_or(NonZeroUsize::new(4).unwrap())
                .into(),
        );

        let mut idx = 0;
        while let Some(fut) = stream.next().await {
            let id = fut?;
            ctx.pulled_docs.push(id);
            self.events.sync(SyncIncrement::PullingDocument(id, false));
            idx += 1;
            ctx.file_msg(id, &format!("Downloaded file {idx} of {num_docs}."));
        }
        Ok(())
    }

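    /// Downloads a single document version by id and hmac and stores it locally.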
    async fn fetch_doc(&self, id: Uuid, hmac: DocumentHmac) -> LbResult<Uuid> {
        let remote_document = self
            .client
            .request(self.get_account()?, GetDocRequest { id, hmac })
            .await?;
        self.docs
            .insert(id, Some(hmac), &remote_document.content)
            .await?;

        Ok(id)
    }

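    /// Merges remote changes with local changes. Remote metadata is promoted to
    /// the new base; local moves, renames, shares, and edits are replayed on top
    /// of it. Documents edited on both sides are merged content-wise where the
    /// format allows (text, drawings); otherwise both versions are kept as
    /// separate files.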
    async fn merge(&self, ctx: &mut SyncContext) -> LbResult<()> {
        let mut tx = self.begin_tx().await;
        let db = tx.db();
        let start = Instant::now();

        let remote_changes = &ctx.remote_changes;

        let me = Owner(self.keychain.get_pk()?);

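        // Build the merged set of changes. Conflict resolutions discovered during
        // validation are recorded in the collections below and construction
        // restarts until the tree validates.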
        let merge_changes = {
            let mut base = (&db.base_metadata).to_lazy();
            let remote_unlazy = (&db.base_metadata).to_staged(remote_changes);
            let mut remote = remote_unlazy.as_lazy();
            let mut local = (&db.base_metadata).to_staged(&db.local_metadata).to_lazy();

            let mut files_to_unmove: HashSet<Uuid> = HashSet::new();
            let mut files_to_unshare: HashSet<Uuid> = HashSet::new();
            let mut links_to_delete: HashSet<Uuid> = HashSet::new();
            let mut rename_increments: HashMap<Uuid, usize> = HashMap::new();
            let mut duplicate_file_ids: HashMap<Uuid, Uuid> = HashMap::new();

            'merge_construction: loop {
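                // Replay local creations, moves, and explicit deletes on top of the
                // remote tree so that deletion status accounts for both sides.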
                let mut deletions = {
                    let mut deletions = remote_unlazy.stage(Vec::new()).to_lazy();

                    let mut deletion_creations = HashSet::new();
                    for id in db.local_metadata.ids() {
                        if remote.maybe_find(&id).is_none() && !links_to_delete.contains(&id) {
                            deletion_creations.insert(id);
                        }
                    }
                    'drain_creations: while !deletion_creations.is_empty() {
                        'choose_a_creation: for id in &deletion_creations {
                            let id = *id;
                            let local_file = local.find(&id)?.clone();
                            let result = deletions.create_unvalidated(
                                id,
                                symkey::generate_key(),
                                local_file.parent(),
                                &local.name(&id, &self.keychain)?,
                                local_file.file_type(),
                                &self.keychain,
                            );
                            match result {
                                Ok(_) => {
                                    deletion_creations.remove(&id);
                                    continue 'drain_creations;
                                }
                                Err(ref err) => match err.kind {
                                    LbErrKind::FileParentNonexistent => {
                                        continue 'choose_a_creation;
                                    }
                                    _ => {
                                        result?;
                                    }
                                },
                            }
                        }
                        return Err(LbErrKind::Unexpected(format!(
                            "sync failed to find a topological order for file creations: {deletion_creations:?}"
                        ))
                        .into());
                    }

                    for id in db.local_metadata.ids() {
                        let local_file = local.find(&id)?.clone();
                        if let Some(base_file) = db.base_metadata.maybe_find(&id).cloned() {
                            if !local_file.explicitly_deleted()
                                && local_file.parent() != base_file.parent()
                                && !files_to_unmove.contains(&id)
                            {
                                deletions.move_unvalidated(
                                    &id,
                                    local_file.parent(),
                                    &self.keychain,
                                )?;
                            }
                        }
                    }

                    for id in db.local_metadata.ids() {
                        let local_file = local.find(&id)?.clone();
                        if local_file.explicitly_deleted() {
                            deletions.delete_unvalidated(&id, &self.keychain)?;
                        }
                    }
                    deletions
                };

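                // Build the merge itself: recreate surviving local-only files, then
                // reconcile moves, renames, shares, and document contents per file.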
                let mut merge = {
                    let mut merge = remote_unlazy.stage(Vec::new()).to_lazy();

                    let mut creations = HashSet::new();
                    for id in db.local_metadata.ids() {
                        if deletions.maybe_find(&id).is_some()
                            && !deletions.calculate_deleted(&id)?
                            && remote.maybe_find(&id).is_none()
                            && !links_to_delete.contains(&id)
                        {
                            creations.insert(id);
                        }
                    }
                    'drain_creations: while !creations.is_empty() {
                        'choose_a_creation: for id in &creations {
                            let id = *id;
                            let local_file = local.find(&id)?.clone();
                            let result = merge.create_unvalidated(
                                id,
                                local.decrypt_key(&id, &self.keychain)?,
                                local_file.parent(),
                                &local.name(&id, &self.keychain)?,
                                local_file.file_type(),
                                &self.keychain,
                            );
                            match result {
                                Ok(_) => {
                                    creations.remove(&id);
                                    continue 'drain_creations;
                                }
                                Err(ref err) => match err.kind {
                                    LbErrKind::FileParentNonexistent => {
                                        continue 'choose_a_creation;
                                    }
                                    _ => {
                                        result?;
                                    }
                                },
                            }
                        }
                        return Err(LbErrKind::Unexpected(format!(
                            "sync failed to find a topological order for file creations: {creations:?}"
                        ))
                        .into());
                    }

                    for id in db.local_metadata.ids() {
                        if deletions.maybe_find(&id).is_none()
                            || deletions.calculate_deleted(&id)?
                            || (remote.maybe_find(&id).is_some()
                                && remote.calculate_deleted(&id)?)
                        {
                            continue;
                        }

                        let local_file = local.find(&id)?.clone();
                        let local_name = local.name(&id, &self.keychain)?;
                        let maybe_base_file = base.maybe_find(&id).cloned();
                        let maybe_remote_file = remote.maybe_find(&id).cloned();
                        if let Some(ref base_file) = maybe_base_file {
                            let base_name = base.name(&id, &self.keychain)?;
                            let remote_file = remote.find(&id)?.clone();
                            let remote_name = remote.name(&id, &self.keychain)?;

                            if local_file.parent() != base_file.parent()
                                && remote_file.parent() == base_file.parent()
                                && !files_to_unmove.contains(&id)
                            {
                                merge.move_unvalidated(&id, local_file.parent(), &self.keychain)?;
                            }

                            if local_name != base_name && remote_name == base_name {
                                merge.rename_unvalidated(&id, &local_name, &self.keychain)?;
                            }
                        }

                        let mut remote_keys = HashMap::new();
                        if let Some(ref remote_file) = maybe_remote_file {
                            for key in remote_file.user_access_keys() {
                                remote_keys.insert(
                                    (Owner(key.encrypted_by), Owner(key.encrypted_for)),
                                    (key.mode, key.deleted),
                                );
                            }
                        }
                        for key in local_file.user_access_keys() {
                            let (by, for_) = (Owner(key.encrypted_by), Owner(key.encrypted_for));
                            if let Some(&(remote_mode, remote_deleted)) =
                                remote_keys.get(&(by, for_))
                            {
                                if key.mode > remote_mode || !key.deleted && remote_deleted {
                                    let mode = match key.mode {
                                        UserAccessMode::Read => ShareMode::Read,
                                        UserAccessMode::Write => ShareMode::Write,
                                        UserAccessMode::Owner => continue,
                                    };
                                    merge.add_share_unvalidated(id, for_, mode, &self.keychain)?;
                                }
                                if key.deleted && !remote_deleted {
                                    merge.delete_share_unvalidated(
                                        &id,
                                        Some(for_.0),
                                        &self.keychain,
                                    )?;
                                }
                            } else {
                                let mode = match key.mode {
                                    UserAccessMode::Read => ShareMode::Read,
                                    UserAccessMode::Write => ShareMode::Write,
                                    UserAccessMode::Owner => continue,
                                };
                                merge.add_share_unvalidated(id, for_, mode, &self.keychain)?;
                            }
                        }

                        if files_to_unshare.contains(&id) {
                            merge.delete_share_unvalidated(&id, None, &self.keychain)?;
                        }

                        if let Some(&rename_increment) = rename_increments.get(&id) {
                            let name = NameComponents::from(&local_name)
                                .generate_incremented(rename_increment)
                                .to_name();
                            merge.rename_unvalidated(&id, &name, &self.keychain)?;
                        }

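                        // Reconcile document contents. If only this client edited
                        // the document, its contents are staged as-is; if both
                        // sides edited it, text and drawings are merged three-way
                        // and other formats keep both copies via a duplicate file.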
                        let base_hmac = maybe_base_file.and_then(|f| f.document_hmac().cloned());
                        let remote_hmac =
                            maybe_remote_file.and_then(|f| f.document_hmac().cloned());
                        let local_hmac = local_file.document_hmac().cloned();
                        if merge.access_mode(me, &id)? >= Some(UserAccessMode::Write)
                            && local_hmac != base_hmac
                        {
                            if remote_hmac != base_hmac && remote_hmac != local_hmac {
                                let merge_name = merge.name(&id, &self.keychain)?;
                                let document_type =
                                    DocumentType::from_file_name_using_extension(&merge_name);

                                let base_document =
                                    self.read_document_helper(id, &mut base).await?;
                                let remote_document =
                                    self.read_document_helper(id, &mut remote).await?;
                                let local_document =
                                    self.read_document_helper(id, &mut local).await?;

                                match document_type {
                                    DocumentType::Text => {
                                        let base_document =
                                            String::from_utf8_lossy(&base_document).to_string();
                                        let remote_document =
                                            String::from_utf8_lossy(&remote_document).to_string();
                                        let local_document =
                                            String::from_utf8_lossy(&local_document).to_string();
                                        let merged_document = Buffer::from(base_document.as_str())
                                            .merge(local_document, remote_document);
                                        let encrypted_document = merge
                                            .update_document_unvalidated(
                                                &id,
                                                &merged_document.into_bytes(),
                                                &self.keychain,
                                            )?;
                                        let hmac = merge.find(&id)?.document_hmac().copied();
                                        self.docs.insert(id, hmac, &encrypted_document).await?;
                                    }
                                    DocumentType::Drawing => {
                                        let base_document =
                                            String::from_utf8_lossy(&base_document).to_string();
                                        let remote_document =
                                            String::from_utf8_lossy(&remote_document).to_string();
                                        let local_document =
                                            String::from_utf8_lossy(&local_document).to_string();

                                        let base_buffer = svg::buffer::Buffer::new(&base_document);
                                        let remote_buffer =
                                            svg::buffer::Buffer::new(&remote_document);
                                        let mut local_buffer =
                                            svg::buffer::Buffer::new(&local_document);

                                        for (_, el) in local_buffer.elements.iter_mut() {
                                            if let Element::Path(path) = el {
                                                path.data.apply_transform(u_transform_to_bezier(
                                                    &Transform::from(
                                                        local_buffer
                                                            .weak_viewport_settings
                                                            .master_transform,
                                                    ),
                                                ));
                                            }
                                        }
                                        svg::buffer::Buffer::reload(
                                            &mut local_buffer.elements,
                                            &mut local_buffer.weak_images,
                                            &mut local_buffer.weak_path_pressures,
                                            &mut local_buffer.weak_viewport_settings,
                                            &base_buffer,
                                            &remote_buffer,
                                        );

                                        let merged_document = local_buffer.serialize();
                                        let encrypted_document = merge
                                            .update_document_unvalidated(
                                                &id,
                                                &merged_document.into_bytes(),
                                                &self.keychain,
                                            )?;
                                        let hmac = merge.find(&id)?.document_hmac().copied();
                                        self.docs.insert(id, hmac, &encrypted_document).await?;
                                    }
                                    DocumentType::Other => {
                                        let merge_parent = *merge.find(&id)?.parent();
                                        let duplicate_id = if let Some(&duplicate_id) =
                                            duplicate_file_ids.get(&id)
                                        {
                                            duplicate_id
                                        } else {
                                            let duplicate_id = Uuid::new_v4();
                                            duplicate_file_ids.insert(id, duplicate_id);
                                            rename_increments.insert(duplicate_id, 1);
                                            duplicate_id
                                        };

                                        let merge_name = NameComponents::from(&merge_name)
                                            .generate_incremented(
                                                rename_increments
                                                    .get(&duplicate_id)
                                                    .copied()
                                                    .unwrap_or_default(),
                                            )
                                            .to_name();

                                        merge.create_unvalidated(
                                            duplicate_id,
                                            symkey::generate_key(),
                                            &merge_parent,
                                            &merge_name,
                                            FileType::Document,
                                            &self.keychain,
                                        )?;
                                        let encrypted_document = merge
                                            .update_document_unvalidated(
                                                &duplicate_id,
                                                &local_document,
                                                &self.keychain,
                                            )?;
                                        let duplicate_hmac =
                                            merge.find(&duplicate_id)?.document_hmac().copied();
                                        self.docs
                                            .insert(
                                                duplicate_id,
                                                duplicate_hmac,
                                                &encrypted_document,
                                            )
                                            .await?;
                                    }
                                }
                            } else {
                                let document = self.read_document_helper(id, &mut local).await?;
                                merge.update_document_unvalidated(
                                    &id,
                                    &document,
                                    &self.keychain,
                                )?;
                            }
                        }
                    }

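                    // Apply deletions: files deleted per local changes, plus any
                    // links scheduled for deletion.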
                    for id in db.local_metadata.ids() {
                        if db.base_metadata.maybe_find(&id).is_some()
                            && deletions.calculate_deleted(&id)?
                            && !merge.calculate_deleted(&id)?
                        {
                            merge.delete_unvalidated(&id, &self.keychain)?;
                        }
                    }
                    for &id in &links_to_delete {
                        if merge.maybe_find(&id).is_some() && !merge.calculate_deleted(&id)? {
                            merge.delete_unvalidated(&id, &self.keychain)?;
                        }
                    }

                    merge
                };

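                // A link whose target ended up deleted must be deleted as well;
                // schedule it and rebuild the merge.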
                for link in merge.ids() {
                    if !merge.calculate_deleted(&link)? {
                        if let FileType::Link { target } = merge.find(&link)?.file_type() {
                            if merge.maybe_find(&target).is_some()
                                && merge.calculate_deleted(&target)?
                            {
                                if links_to_delete.insert(link) {
                                    continue 'merge_construction;
                                } else {
                                    return Err(LbErrKind::Unexpected(format!(
                                        "sync failed to resolve broken link (deletion): {link:?}"
                                    ))
                                    .into());
                                }
                            }
                        }
                    }
                }

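                // Validate the merged tree. Each failure type has a resolution
                // that is recorded before reconstruction; failing to make progress
                // on one is an internal error.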
                let validate_result = merge.validate(me);
                match validate_result {
                    Ok(_) => {
                        let (_, merge_changes) = merge.unstage();
                        break merge_changes;
                    }
                    Err(ref err) => match err.kind {
                        LbErrKind::Validation(ref vf) => match vf {
                            ValidationFailure::Cycle(ids) => {
                                let mut progress = false;
                                for &id in ids {
                                    if db.local_metadata.maybe_find(&id).is_some()
                                        && files_to_unmove.insert(id)
                                    {
                                        progress = true;
                                    }
                                }
                                if !progress {
                                    return Err(LbErrKind::Unexpected(format!(
                                        "sync failed to resolve cycle: {ids:?}"
                                    ))
                                    .into());
                                }
                            }
                            ValidationFailure::PathConflict(ids) => {
                                let mut progress = false;
                                for &id in ids {
                                    if duplicate_file_ids.values().any(|&dup| dup == id) {
                                        *rename_increments.entry(id).or_insert(0) += 1;
                                        progress = true;
                                        break;
                                    }
                                }
                                if !progress {
                                    for &id in ids {
                                        if db.local_metadata.maybe_find(&id).is_some() {
                                            *rename_increments.entry(id).or_insert(0) += 1;
                                            progress = true;
                                            break;
                                        }
                                    }
                                }
                                if !progress {
                                    return Err(LbErrKind::Unexpected(format!(
                                        "sync failed to resolve path conflict: {ids:?}"
                                    ))
                                    .into());
                                }
                            }
                            ValidationFailure::SharedLink { link, shared_ancestor } => {
                                let mut progress = false;
                                if let Some(base_shared_ancestor) = base.maybe_find(shared_ancestor)
                                {
                                    if !base_shared_ancestor.is_shared()
                                        && files_to_unshare.insert(*shared_ancestor)
                                    {
                                        progress = true;
                                    }
                                }
                                if !progress && links_to_delete.insert(*link) {
                                    progress = true;
                                }
                                if !progress {
                                    return Err(LbErrKind::Unexpected(format!(
                                        "sync failed to resolve shared link: link: {link:?}, shared_ancestor: {shared_ancestor:?}"
                                    )).into());
                                }
                            }
                            ValidationFailure::DuplicateLink { target } => {
                                let mut progress = false;
                                if let Some(link) = local.linked_by(target)? {
                                    if links_to_delete.insert(link) {
                                        progress = true;
                                    }
                                }
                                if !progress {
                                    return Err(LbErrKind::Unexpected(format!(
                                        "sync failed to resolve duplicate link: target: {target:?}"
                                    ))
                                    .into());
                                }
                            }
                            ValidationFailure::BrokenLink(link) => {
                                if !links_to_delete.insert(*link) {
                                    return Err(LbErrKind::Unexpected(format!(
                                        "sync failed to resolve broken link: {link:?}"
                                    ))
                                    .into());
                                }
                            }
                            ValidationFailure::OwnedLink(link) => {
                                let mut progress = false;
                                if let Some(remote_link) = remote.maybe_find(link) {
                                    if let FileType::Link { target } = remote_link.file_type() {
                                        let remote_target = remote.find(&target)?;
                                        if remote_target.owner() != me
                                            && files_to_unmove.insert(target)
                                        {
                                            progress = true;
                                        }
                                    }
                                }
                                if !progress && links_to_delete.insert(*link) {
                                    progress = true;
                                }
                                if !progress {
                                    return Err(LbErrKind::Unexpected(format!(
                                        "sync failed to resolve owned link: {link:?}"
                                    ))
                                    .into());
                                }
                            }
                            ValidationFailure::Orphan(_)
                            | ValidationFailure::NonFolderWithChildren(_)
                            | ValidationFailure::FileWithDifferentOwnerParent(_)
                            | ValidationFailure::FileNameTooLong(_)
                            | ValidationFailure::DeletedFileUpdated(_)
                            | ValidationFailure::NonDecryptableFileName(_) => {
                                validate_result?;
                            }
                        },
                        _ => {
                            validate_result?;
                        }
                    },
                }
            }
        };

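        // Commit: promote remote changes into base, replace local changes with
        // the merge result, and prune unreferenced files.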
        (&mut db.base_metadata)
            .to_staged(remote_changes.clone())
            .to_lazy()
            .promote()?;
        db.local_metadata.clear()?;
        (&mut db.local_metadata)
            .to_staged(merge_changes)
            .to_lazy()
            .promote()?;

        db.base_metadata.stage(&mut db.local_metadata).prune()?;

        if start.elapsed() > std::time::Duration::from_millis(100) {
            warn!("sync merge held lock for {:?}", start.elapsed());
        }

        Ok(())
    }

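    /// Signs local metadata changes and upserts them to the server (new content
    /// digests are pushed separately by push_docs), then promotes them into base.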
    async fn push_meta(&self, ctx: &mut SyncContext) -> LbResult<()> {
        ctx.msg("Pushing tree changes...");
        let mut updates = vec![];
        let mut local_changes_no_digests = Vec::new();

        let tx = self.ro_tx().await;
        let db = tx.db();

        let local = db.base_metadata.stage(&db.local_metadata).to_lazy();

        for id in local.tree.staged.ids() {
            let mut local_change = local.tree.staged.find(&id)?.timestamped_value.value.clone();
            let maybe_base_file = local.tree.base.maybe_find(&id);

            local_change.set_hmac_and_size(
                maybe_base_file.and_then(|f| f.document_hmac().copied()),
                maybe_base_file.and_then(|f| *f.timestamped_value.value.doc_size()),
            );
            let local_change = local_change.sign(&self.keychain)?;

            local_changes_no_digests.push(local_change.clone());
            let file_diff = FileDiff { old: maybe_base_file.cloned(), new: local_change };
            updates.push(file_diff);
        }

        drop(tx);

        if !updates.is_empty() {
            self.client
                .request(self.get_account()?, UpsertRequestV2 { updates: updates.clone() })
                .await?;
            ctx.pushed_metas = updates;
        }

        let mut tx = self.begin_tx().await;
        let db = tx.db();

        (&mut db.base_metadata)
            .to_lazy()
            .stage(local_changes_no_digests)
            .promote()?;
        db.base_metadata.stage(&mut db.local_metadata).prune()?;

        tx.end();

        Ok(())
    }

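    /// Uploads changed document contents concurrently, then promotes the signed
    /// digests into base.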
    async fn push_docs(&self, ctx: &mut SyncContext) -> LbResult<()> {
        ctx.msg("Pushing document changes...");
        let mut updates = vec![];
        let mut local_changes_digests_only = vec![];

        let tx = self.ro_tx().await;
        let db = tx.db();
        let start = Instant::now();

        let local = db.base_metadata.stage(&db.local_metadata).to_lazy();

        for id in local.tree.staged.ids() {
            let base_file = local.tree.base.find(&id)?.clone();

            let mut local_change = base_file.timestamped_value.value.clone();
            local_change.set_hmac_and_size(
                local.find(&id)?.document_hmac().copied(),
                *local.find(&id)?.timestamped_value.value.doc_size(),
            );

            if base_file.document_hmac() == local_change.document_hmac()
                || local_change.document_hmac().is_none()
            {
                continue;
            }

            let local_change = local_change.sign(&self.keychain)?;

            updates.push(FileDiff { old: Some(base_file), new: local_change.clone() });
            local_changes_digests_only.push(local_change);
            self.events.sync(SyncIncrement::PushingDocument(id, true));
        }

        drop(tx);
        if start.elapsed() > std::time::Duration::from_millis(100) {
            warn!("sync push_docs held lock for {:?}", start.elapsed());
        }

        let docs_count = updates.len();
        ctx.total += docs_count;
        let futures = updates.clone().into_iter().map(|diff| self.push_doc(diff));

        let mut stream = stream::iter(futures).buffer_unordered(
            thread::available_parallelism()
                .unwrap_or(NonZeroUsize::new(4).unwrap())
                .into(),
        );

        let mut idx = 0;
        while let Some(fut) = stream.next().await {
            let id = fut?;
            self.events.sync(SyncIncrement::PushingDocument(id, false));
            idx += 1;
            ctx.file_msg(id, &format!("Pushed file {idx} of {docs_count}."));
        }
        ctx.pushed_docs = updates;

        let mut tx = self.begin_tx().await;
        let db = tx.db();
        (&mut db.base_metadata)
            .to_lazy()
            .stage(local_changes_digests_only)
            .promote()?;

        db.base_metadata.stage(&mut db.local_metadata).prune()?;

        tx.end();

        Ok(())
    }

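    /// Pushes a single document diff along with its new encrypted contents.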
    async fn push_doc(&self, diff: FileDiff<SignedMeta>) -> LbResult<Uuid> {
        let id = *diff.new.id();
        let hmac = diff.new.document_hmac();
        let local_document_change = self.docs.get(id, hmac.copied()).await?;
        self.client
            .request(
                self.get_account()?,
                ChangeDocRequestV2 { diff, new_content: local_document_change },
            )
            .await?;

        Ok(id)
    }

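    /// Stages remote updates against base and prunes no-op and orphaned entries.
    /// Returns the deduplicated changes, the server timestamp they reflect, and
    /// the root id if this client doesn't have one yet.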
    async fn dedup(
        &self, updates: GetUpdatesResponseV2,
    ) -> LbResult<(Vec<SignedMeta>, u64, Option<Uuid>)> {
        let tx = self.ro_tx().await;
        let db = tx.db();

        let mut root_id = None;
        let (remote_changes, update_as_of) = {
            let mut remote_changes = updates.file_metadata;
            let update_as_of = updates.as_of_metadata_version;

            remote_changes = self.prune_remote_orphans(remote_changes).await?;

            let remote = db.base_metadata.stage(remote_changes).pruned()?.to_lazy();

            let (_, remote_changes) = remote.unstage();
            (remote_changes, update_as_of)
        };

        if db.root.get().is_none() {
            let root = remote_changes
                .all_files()?
                .into_iter()
                .find(|f| f.is_root())
                .ok_or(LbErrKind::RootNonexistent)?;
            root_id = Some(*root.id());
        }

        Ok((remote_changes, update_as_of, root_id))
    }

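    /// Drops remote changes whose parent isn't visible to this client, unless the
    /// file is shared directly with us.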
    async fn prune_remote_orphans(
        &self, remote_changes: Vec<SignedMeta>,
    ) -> LbResult<Vec<SignedMeta>> {
        let tx = self.ro_tx().await;
        let db = tx.db();

        let me = Owner(self.keychain.get_pk()?);
        let remote = db.base_metadata.stage(remote_changes).to_lazy();
        let mut result = Vec::new();

        for id in remote.tree.staged.ids() {
            let meta = remote.find(&id)?;
            if remote.maybe_find_parent(meta).is_some()
                || meta
                    .user_access_keys()
                    .iter()
                    .any(|k| k.encrypted_for == me.0)
            {
                result.push(meta.clone());
            }
        }
        Ok(result)
    }

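    /// Persists the server timestamp this sync reflects, along with the root id
    /// if one was discovered.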
    async fn commit_last_synced(&self, ctx: &mut SyncContext) -> LbResult<()> {
        ctx.msg("Cleaning up...");
        let mut tx = self.begin_tx().await;
        let db = tx.db();
        db.last_synced.insert(ctx.update_as_of as i64)?;

        if let Some(root) = ctx.new_root {
            db.root.insert(root)?;
        }

        Ok(())
    }

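    /// Returns a human-readable duration since the last successful sync, or
    /// "never".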
    pub async fn get_last_synced_human(&self) -> LbResult<String> {
        let tx = self.ro_tx().await;
        let db = tx.db();
        let last_synced = db.last_synced.get().copied().unwrap_or(0);

        Ok(self.get_timestamp_human_string(last_synced))
    }

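    /// Formats a millisecond timestamp as a human-readable duration relative to
    /// now; `0` is treated as "never".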
    pub fn get_timestamp_human_string(&self, timestamp: i64) -> String {
        if timestamp != 0 {
            Duration::milliseconds(clock::get_time().0 - timestamp)
                .format_human()
                .to_string()
        } else {
            "never".to_string()
        }
    }
}

impl SyncContext {
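    /// Collapses what this sync pushed and pulled into a [`SyncStatus`].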
    fn summarize(&self) -> SyncStatus {
        let mut local = HashSet::new();
        let mut server = HashSet::new();
        let mut work_units = vec![];

        for meta in &self.pushed_metas {
            local.insert(meta.new.id());
        }

        for meta in &self.pushed_docs {
            local.insert(meta.new.id());
        }

        for meta in &self.remote_changes {
            server.insert(meta.id());
        }

        for id in local {
            work_units.push(WorkUnit::LocalChange(*id));
        }

        for id in server {
            work_units.push(WorkUnit::ServerChange(*id));
        }

        SyncStatus { work_units, latest_server_ts: self.update_as_of }
    }

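    /// Advances overall progress by one step and notifies the progress callback,
    /// if any.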
    fn msg(&mut self, msg: &str) {
        self.current += 1;
        if let Some(f) = &self.progress {
            f(SyncProgress {
                total: self.total,
                progress: self.current,
                file_being_processed: Default::default(),
                msg: msg.to_string(),
            })
        }
    }

    fn file_msg(&mut self, id: Uuid, msg: &str) {
        self.current += 1;
        if let Some(f) = &self.progress {
            f(SyncProgress {
                total: self.total,
                progress: self.current,
                file_being_processed: Some(id),
                msg: msg.to_string(),
            })
        }
    }

    fn done_msg(&mut self) {
        self.current = self.total;
        if let Some(f) = &self.progress {
            f(SyncProgress {
                total: self.total,
                progress: self.current,
                file_being_processed: None,
                msg: "Sync successful!".to_string(),
            })
        }
    }
}

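/// The result of [`Lb::calculate_work`] or [`Lb::sync`]: the work units involved
/// and the server timestamp they reflect.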
#[derive(Debug, Serialize, Clone)]
pub struct SyncStatus {
    pub work_units: Vec<WorkUnit>,
    pub latest_server_ts: u64,
}

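/// A progress update delivered to the sync callback: `progress` out of `total`
/// steps, optionally attributed to a specific file.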
#[derive(Clone)]
pub struct SyncProgress {
    pub total: usize,
    pub progress: usize,
    pub file_being_processed: Option<Uuid>,
    pub msg: String,
}

impl Display for SyncProgress {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "[{} / {}]: {}", self.progress, self.total, self.msg)
    }
}

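/// Fine-grained sync events emitted on the event stream. For the document
/// variants, the `bool` is `true` when the transfer is queued and `false` when
/// it completes.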
#[derive(Debug, Clone)]
pub enum SyncIncrement {
    SyncStarted,
    PullingDocument(Uuid, bool),
    PushingDocument(Uuid, bool),
    SyncFinished(Option<LbErrKind>),
}