1use crate::io::network::ApiError;
2use crate::model::access_info::UserAccessMode;
3use crate::model::api::{
4 ChangeDocRequest, GetDocRequest, GetFileIdsRequest, GetUpdatesRequest, GetUpdatesResponse,
5 GetUsernameError, GetUsernameRequest, UpsertRequest,
6};
7use crate::model::errors::{LbErrKind, LbResult};
8use crate::model::file::ShareMode;
9use crate::model::file_like::FileLike;
10use crate::model::file_metadata::{DocumentHmac, FileDiff, FileType, Owner};
11use crate::model::filename::{DocumentType, NameComponents};
12use crate::model::signed_file::SignedFile;
13use crate::model::staged::StagedTreeLikeMut;
14use crate::model::svg::buffer::u_transform_to_bezier;
15use crate::model::svg::element::Element;
16use crate::model::text::buffer::Buffer;
17use crate::model::tree_like::TreeLike;
18use crate::model::work_unit::WorkUnit;
19use crate::model::{clock, svg};
20use crate::model::{symkey, ValidationFailure};
21use crate::Lb;
22pub use basic_human_duration::ChronoHumanDuration;
23use futures::stream;
24use futures::StreamExt;
25use serde::Serialize;
26use std::collections::{hash_map, HashMap, HashSet};
27use std::fmt::{Display, Formatter};
28use std::num::NonZeroUsize;
29use std::sync::atomic::{AtomicBool, Ordering};
30use std::sync::Arc;
31use std::thread;
32use std::time::Instant;
33use time::Duration;
34use uuid::Uuid;
35
/// A cloneable, shared atomic boolean flag (e.g. for observing sync state across tasks).
pub type SyncFlag = Arc<AtomicBool>;
37
/// Mutable state threaded through a single sync run.
pub struct SyncContext {
    // Optional callback invoked with progress updates (see msg / file_msg / done_msg).
    progress: Option<Box<dyn Fn(SyncProgress) + Send>>,
    // Number of progress steps reported so far.
    current: usize,
    // Total expected progress steps; grows once document counts become known.
    total: usize,

    // Owner -> username lookup, seeded from the db and extended in populate_pk_cache.
    pk_cache: HashMap<Owner, String>,
    // Metadata version of the last successful sync; server updates are fetched since this.
    last_synced: u64,
    // Deduplicated metadata pulled from the server for this sync.
    remote_changes: Vec<SignedFile>,
    // Server metadata version this sync brings us up to (persisted in commit_last_synced).
    update_as_of: u64,
    // Root id discovered during this sync, if the local db had none yet.
    root: Option<Uuid>,
    // Diffs successfully upserted to the server (metadata pushes).
    pushed_metas: Vec<FileDiff<SignedFile>>,
    // Diffs whose document contents were successfully pushed.
    pushed_docs: Vec<FileDiff<SignedFile>>,
    // Ids of documents whose contents were pulled during this sync.
    pulled_docs: Vec<Uuid>,
}
52
53impl Lb {
54 #[instrument(level = "debug", skip_all, err(Debug))]
55 pub async fn calculate_work(&self) -> LbResult<SyncStatus> {
56 let tx = self.ro_tx().await;
57 let db = tx.db();
58 let last_synced = db.last_synced.get().copied().unwrap_or_default() as u64;
59 drop(tx);
60
61 let remote_changes = self
62 .client
63 .request(self.get_account()?, GetUpdatesRequest { since_metadata_version: last_synced })
64 .await?;
65 let (deduped, latest_server_ts, _) = self.dedup(remote_changes).await?;
66 let remote_dirty = deduped
67 .into_iter()
68 .map(|f| *f.id())
69 .map(WorkUnit::ServerChange);
70
71 self.prune().await?;
72
73 let tx = self.ro_tx().await;
74 let db = tx.db();
75
76 let locally_dirty = db
77 .local_metadata
78 .get()
79 .keys()
80 .copied()
81 .map(WorkUnit::LocalChange);
82
83 let mut work_units: Vec<WorkUnit> = Vec::new();
84 work_units.extend(locally_dirty.chain(remote_dirty));
85 Ok(SyncStatus { work_units, latest_server_ts })
86 }
87
    /// Runs a full sync: prune, pull metadata, pull documents, three-way merge,
    /// push metadata, push documents, then commit the new last-synced version.
    ///
    /// Returns a summary of the work performed. Fails fast with `AlreadySyncing`
    /// if another sync is in flight.
    #[instrument(level = "debug", skip_all, err(Debug))]
    pub async fn sync(&self, f: Option<Box<dyn Fn(SyncProgress) + Send>>) -> LbResult<SyncStatus> {
        // Claim the sync flag; only one sync may run at a time.
        let old = self.syncing.swap(true, Ordering::SeqCst);
        if old {
            return Err(LbErrKind::AlreadySyncing.into());
        }

        let mut ctx = self.setup_sync(f).await?;

        let mut got_updates = false;
        // The pipeline runs as one fallible unit so the cleanup below happens
        // regardless of where it fails.
        let mut pipeline: LbResult<()> = async {
            ctx.msg("Preparing Sync...");
            self.events.sync(SyncIncrement::SyncStarted);
            self.prune().await?;
            got_updates = self.fetch_meta(&mut ctx).await?;
            self.populate_pk_cache(&mut ctx).await?;
            // Protect cached documents from deletion while the sync is in flight.
            self.docs.dont_delete.store(true, Ordering::SeqCst);
            self.fetch_docs(&mut ctx).await?;
            self.merge(&mut ctx).await?;
            self.push_meta(&mut ctx).await?;
            self.push_docs(&mut ctx).await?;
            Ok(())
        }
        .await;

        self.docs.dont_delete.store(false, Ordering::SeqCst);

        // Only persist the new last-synced version if the whole pipeline succeeded.
        if pipeline.is_ok() {
            pipeline = self.commit_last_synced(&mut ctx).await;
        }

        let cleanup = self.cleanup().await;

        let ekind = pipeline.as_ref().err().map(|err| err.kind.clone());
        self.events.sync(SyncIncrement::SyncFinished(ekind));

        // Release the sync flag before propagating any error.
        self.syncing.store(false, Ordering::Relaxed);
        pipeline?;
        cleanup?;

        ctx.done_msg();

        if got_updates {
            self.events.meta_changed();
            for id in &ctx.pulled_docs {
                self.events.doc_written(*id);
            }
        }

        Ok(ctx.summarize())
    }
141
142 async fn setup_sync(
143 &self, progress: Option<Box<dyn Fn(SyncProgress) + Send>>,
144 ) -> LbResult<SyncContext> {
145 let tx = self.ro_tx().await;
146 let db = tx.db();
147
148 let last_synced = db.last_synced.get().copied().unwrap_or_default() as u64;
149 let pk_cache = db.pub_key_lookup.get().clone();
150
151 let current = 0;
152 let total = 7;
153
154 Ok(SyncContext {
155 last_synced,
156 pk_cache,
157
158 progress,
159 current,
160 total,
161
162 root: Default::default(),
163 update_as_of: Default::default(),
164 remote_changes: Default::default(),
165 pushed_docs: Default::default(),
166 pushed_metas: Default::default(),
167 pulled_docs: Default::default(),
168 })
169 }
170
    /// Removes files from the local store that no longer exist on the server.
    ///
    /// Fetches the complete set of server file ids, then removes any base-tree
    /// file absent from that set — plus all of its local descendants — from both
    /// metadata trees, deleting any cached document contents along the way.
    async fn prune(&self) -> LbResult<()> {
        let server_ids = self
            .client
            .request(self.get_account()?, GetFileIdsRequest {})
            .await?
            .ids;

        let mut tx = self.begin_tx().await;
        let db = tx.db();

        let mut local = db.base_metadata.stage(&db.local_metadata).to_lazy();
        let base_ids = local.tree.base.ids();

        // Base files the server no longer knows about are prunable...
        let mut prunable_ids = base_ids;
        prunable_ids.retain(|id| !server_ids.contains(id));
        // ...along with all of their descendants in the staged (base+local) tree.
        for id in prunable_ids.clone() {
            prunable_ids.extend(local.descendants(&id)?.into_iter());
        }
        // Delete cached document contents for both the base and local versions.
        for id in &prunable_ids {
            if let Some(base_file) = local.tree.base.maybe_find(id) {
                self.docs
                    .delete(*id, base_file.document_hmac().copied())
                    .await?;
            }
            if let Some(local_file) = local.maybe_find(id) {
                self.docs
                    .delete(*id, local_file.document_hmac().copied())
                    .await?;
            }
        }

        // Remove the pruned ids from the base tree...
        let mut base_staged = (&mut db.base_metadata).to_lazy().stage(None);
        base_staged.tree.removed = prunable_ids.clone().into_iter().collect();
        base_staged.promote()?;

        // ...and from the local tree.
        let mut local_staged = (&mut db.local_metadata).to_lazy().stage(None);
        local_staged.tree.removed = prunable_ids.into_iter().collect();
        local_staged.promote()?;

        Ok(())
    }
212
213 async fn fetch_meta(&self, ctx: &mut SyncContext) -> LbResult<bool> {
215 ctx.msg("Fetching tree updates...");
216 let updates = self
217 .client
218 .request(
219 self.get_account()?,
220 GetUpdatesRequest { since_metadata_version: ctx.last_synced },
221 )
222 .await?;
223
224 let empty = updates.file_metadata.is_empty();
225 let (remote, as_of, root) = self.dedup(updates).await?;
226
227 ctx.remote_changes = remote;
228 ctx.update_as_of = as_of;
229 ctx.root = root;
230
231 Ok(!empty)
232 }
233
234 async fn populate_pk_cache(&self, ctx: &mut SyncContext) -> LbResult<()> {
235 ctx.msg("Updating public key cache...");
236 let mut all_owners = HashSet::new();
237 for file in &ctx.remote_changes {
238 for user_access_key in file.user_access_keys() {
239 all_owners.insert(Owner(user_access_key.encrypted_by));
240 all_owners.insert(Owner(user_access_key.encrypted_for));
241 }
242 }
243
244 let mut new_entries = HashMap::new();
245
246 for owner in all_owners {
247 if let hash_map::Entry::Vacant(e) = ctx.pk_cache.entry(owner) {
248 let username_result = self
249 .client
250 .request(self.get_account()?, GetUsernameRequest { key: owner.0 })
251 .await;
252 let username = match username_result {
253 Err(ApiError::Endpoint(GetUsernameError::UserNotFound)) => {
254 "<unknown>".to_string()
255 }
256 _ => username_result?.username.clone(),
257 };
258 new_entries.insert(owner, username.clone());
259 e.insert(username.clone());
260 }
261 }
262
263 let mut tx = self.begin_tx().await;
264 let db = tx.db();
265
266 for (owner, username) in new_entries {
267 db.pub_key_lookup.insert(owner, username)?;
268 }
269 Ok(())
270 }
271
272 async fn fetch_docs(&self, ctx: &mut SyncContext) -> LbResult<()> {
273 ctx.msg("Fetching documents...");
274 let mut docs_to_pull = vec![];
275
276 let tx = self.ro_tx().await;
277 let db = tx.db();
278 let start = Instant::now();
279
280 let mut remote = db.base_metadata.stage(ctx.remote_changes.clone()).to_lazy(); for id in remote.tree.staged.ids() {
282 if remote.calculate_deleted(&id)? {
283 continue;
284 }
285 let remote_hmac = remote.find(&id)?.document_hmac().cloned();
286 let base_hmac = remote
287 .tree
288 .base
289 .maybe_find(&id)
290 .and_then(|f| f.document_hmac())
291 .cloned();
292 if base_hmac == remote_hmac {
293 continue;
294 }
295
296 if let Some(remote_hmac) = remote_hmac {
297 docs_to_pull.push((id, remote_hmac));
298 self.events.sync(SyncIncrement::PullingDocument(id, true));
299 }
300 }
301
302 drop(tx);
303 if start.elapsed() > std::time::Duration::from_millis(100) {
304 warn!("sync fetch_docs held lock for {:?}", start.elapsed());
305 }
306
307 let num_docs = docs_to_pull.len();
308 ctx.total += num_docs;
309
310 let futures = docs_to_pull
311 .into_iter()
312 .map(|(id, hmac)| self.fetch_doc(id, hmac));
313
314 let mut stream = stream::iter(futures).buffer_unordered(
315 thread::available_parallelism()
316 .unwrap_or(NonZeroUsize::new(4).unwrap())
317 .into(),
318 );
319
320 let mut idx = 0;
321 while let Some(fut) = stream.next().await {
322 let id = fut?;
323 ctx.pulled_docs.push(id);
324 self.events.sync(SyncIncrement::PullingDocument(id, false));
325 ctx.file_msg(id, &format!("Downloaded file {idx} of {num_docs}."));
326 idx += 1;
327 }
328 Ok(())
329 }
330
331 async fn fetch_doc(&self, id: Uuid, hmac: DocumentHmac) -> LbResult<Uuid> {
332 let remote_document = self
333 .client
334 .request(self.get_account()?, GetDocRequest { id, hmac })
335 .await?;
336 self.docs
337 .insert(id, Some(hmac), &remote_document.content)
338 .await?;
339
340 Ok(id)
341 }
342
343 async fn merge(&self, ctx: &mut SyncContext) -> LbResult<()> {
346 let mut tx = self.begin_tx().await;
347 let db = tx.db();
348 let start = Instant::now();
349
350 let remote_changes = &ctx.remote_changes;
351
352 let me = Owner(self.keychain.get_pk()?);
354
355 let merge_changes = {
357 let mut base = (&db.base_metadata).to_lazy();
359 let remote_unlazy = (&db.base_metadata).to_staged(remote_changes);
360 let mut remote = remote_unlazy.as_lazy();
361 let mut local = (&db.base_metadata).to_staged(&db.local_metadata).to_lazy();
362
363 let mut files_to_unmove: HashSet<Uuid> = HashSet::new();
365 let mut files_to_unshare: HashSet<Uuid> = HashSet::new();
366 let mut links_to_delete: HashSet<Uuid> = HashSet::new();
367 let mut rename_increments: HashMap<Uuid, usize> = HashMap::new();
368 let mut duplicate_file_ids: HashMap<Uuid, Uuid> = HashMap::new();
369
370 'merge_construction: loop {
371 let mut deletions = {
373 let mut deletions = remote_unlazy.stage(Vec::new()).to_lazy();
374
375 let mut deletion_creations = HashSet::new();
377 for id in db.local_metadata.ids() {
378 if remote.maybe_find(&id).is_none() && !links_to_delete.contains(&id) {
379 deletion_creations.insert(id);
380 }
381 }
382 'drain_creations: while !deletion_creations.is_empty() {
383 'choose_a_creation: for id in &deletion_creations {
384 let id = *id;
386 let local_file = local.find(&id)?.clone();
387 let result = deletions.create_unvalidated(
388 id,
389 symkey::generate_key(),
390 local_file.parent(),
391 &local.name(&id, &self.keychain)?,
392 local_file.file_type(),
393 &self.keychain,
394 );
395 match result {
396 Ok(_) => {
397 deletion_creations.remove(&id);
398 continue 'drain_creations;
399 }
400 Err(ref err) => match err.kind {
401 LbErrKind::FileParentNonexistent => {
402 continue 'choose_a_creation;
403 }
404 _ => {
405 result?;
406 }
407 },
408 }
409 }
410 return Err(LbErrKind::Unexpected(format!(
411 "sync failed to find a topomodelal order for file creations: {:?}",
412 deletion_creations
413 ))
414 .into());
415 }
416
417 for id in db.local_metadata.ids() {
419 let local_file = local.find(&id)?.clone();
420 if let Some(base_file) = db.base_metadata.maybe_find(&id).cloned() {
421 if !local_file.explicitly_deleted()
422 && local_file.parent() != base_file.parent()
423 && !files_to_unmove.contains(&id)
424 {
425 deletions.move_unvalidated(
427 &id,
428 local_file.parent(),
429 &self.keychain,
430 )?;
431 }
432 }
433 }
434
435 for id in db.local_metadata.ids() {
437 let local_file = local.find(&id)?.clone();
438 if local_file.explicitly_deleted() {
439 deletions.delete_unvalidated(&id, &self.keychain)?;
441 }
442 }
443 deletions
444 };
445
446 let mut merge = {
448 let mut merge = remote_unlazy.stage(Vec::new()).to_lazy();
449
450 let mut creations = HashSet::new();
452 for id in db.local_metadata.ids() {
453 if deletions.maybe_find(&id).is_some()
454 && !deletions.calculate_deleted(&id)?
455 && remote.maybe_find(&id).is_none()
456 && !links_to_delete.contains(&id)
457 {
458 creations.insert(id);
459 }
460 }
461 'drain_creations: while !creations.is_empty() {
462 'choose_a_creation: for id in &creations {
463 let id = *id;
465 let local_file = local.find(&id)?.clone();
466 let result = merge.create_unvalidated(
467 id,
468 local.decrypt_key(&id, &self.keychain)?,
469 local_file.parent(),
470 &local.name(&id, &self.keychain)?,
471 local_file.file_type(),
472 &self.keychain,
473 );
474 match result {
475 Ok(_) => {
476 creations.remove(&id);
477 continue 'drain_creations;
478 }
479 Err(ref err) => match err.kind {
480 LbErrKind::FileParentNonexistent => {
481 continue 'choose_a_creation;
482 }
483 _ => {
484 result?;
485 }
486 },
487 }
488 }
489 return Err(LbErrKind::Unexpected(format!(
490 "sync failed to find a topomodelal order for file creations: {:?}",
491 creations
492 ))
493 .into());
494 }
495
496 for id in db.local_metadata.ids() {
499 if deletions.maybe_find(&id).is_none()
501 || deletions.calculate_deleted(&id)?
502 || (remote.maybe_find(&id).is_some()
503 && remote.calculate_deleted(&id)?)
504 {
505 continue;
506 }
507
508 let local_file = local.find(&id)?.clone();
509 let local_name = local.name(&id, &self.keychain)?;
510 let maybe_base_file = base.maybe_find(&id).cloned();
511 let maybe_remote_file = remote.maybe_find(&id).cloned();
512 if let Some(ref base_file) = maybe_base_file {
513 let base_name = base.name(&id, &self.keychain)?;
514 let remote_file = remote.find(&id)?.clone();
515 let remote_name = remote.name(&id, &self.keychain)?;
516
517 if local_file.parent() != base_file.parent()
519 && remote_file.parent() == base_file.parent()
520 && !files_to_unmove.contains(&id)
521 {
522 merge.move_unvalidated(&id, local_file.parent(), &self.keychain)?;
523 }
524
525 if local_name != base_name && remote_name == base_name {
527 merge.rename_unvalidated(&id, &local_name, &self.keychain)?;
528 }
529 }
530
531 let mut remote_keys = HashMap::new();
533 if let Some(ref remote_file) = maybe_remote_file {
534 for key in remote_file.user_access_keys() {
535 remote_keys.insert(
536 (Owner(key.encrypted_by), Owner(key.encrypted_for)),
537 (key.mode, key.deleted),
538 );
539 }
540 }
541 for key in local_file.user_access_keys() {
542 let (by, for_) = (Owner(key.encrypted_by), Owner(key.encrypted_for));
543 if let Some(&(remote_mode, remote_deleted)) =
544 remote_keys.get(&(by, for_))
545 {
546 if key.mode > remote_mode || !key.deleted && remote_deleted {
548 let mode = match key.mode {
549 UserAccessMode::Read => ShareMode::Read,
550 UserAccessMode::Write => ShareMode::Write,
551 UserAccessMode::Owner => continue,
552 };
553 merge.add_share_unvalidated(id, for_, mode, &self.keychain)?;
554 }
555 if key.deleted && !remote_deleted {
557 merge.delete_share_unvalidated(
558 &id,
559 Some(for_.0),
560 &self.keychain,
561 )?;
562 }
563 } else {
564 let mode = match key.mode {
566 UserAccessMode::Read => ShareMode::Read,
567 UserAccessMode::Write => ShareMode::Write,
568 UserAccessMode::Owner => continue,
569 };
570 merge.add_share_unvalidated(id, for_, mode, &self.keychain)?;
571 }
572 }
573
574 if files_to_unshare.contains(&id) {
576 merge.delete_share_unvalidated(&id, None, &self.keychain)?;
577 }
578
579 if let Some(&rename_increment) = rename_increments.get(&id) {
581 let name = NameComponents::from(&local_name)
582 .generate_incremented(rename_increment)
583 .to_name();
584 merge.rename_unvalidated(&id, &name, &self.keychain)?;
585 }
586
587 let base_hmac = maybe_base_file.and_then(|f| f.document_hmac().cloned());
589 let remote_hmac =
590 maybe_remote_file.and_then(|f| f.document_hmac().cloned());
591 let local_hmac = local_file.document_hmac().cloned();
592 if merge.access_mode(me, &id)? >= Some(UserAccessMode::Write)
593 && local_hmac != base_hmac
594 {
595 if remote_hmac != base_hmac && remote_hmac != local_hmac {
596 let merge_name = merge.name(&id, &self.keychain)?;
598 let document_type =
599 DocumentType::from_file_name_using_extension(&merge_name);
600
601 let base_document =
604 self.read_document_helper(id, &mut base).await?;
605 let remote_document =
606 self.read_document_helper(id, &mut remote).await?;
607 let local_document =
608 self.read_document_helper(id, &mut local).await?;
609
610 match document_type {
611 DocumentType::Text => {
612 let base_document =
615 String::from_utf8_lossy(&base_document).to_string();
616 let remote_document =
617 String::from_utf8_lossy(&remote_document).to_string();
618 let local_document =
619 String::from_utf8_lossy(&local_document).to_string();
620 let merged_document = Buffer::from(base_document.as_str())
621 .merge(local_document, remote_document);
622 let encrypted_document = merge
623 .update_document_unvalidated(
624 &id,
625 &merged_document.into_bytes(),
626 &self.keychain,
627 )?;
628 let hmac = merge.find(&id)?.document_hmac().copied();
629 self.docs.insert(id, hmac, &encrypted_document).await?;
630 }
631 DocumentType::Drawing => {
632 println!("sync merge");
633 let base_document =
634 String::from_utf8_lossy(&base_document).to_string();
635 let remote_document =
636 String::from_utf8_lossy(&remote_document).to_string();
637 let local_document =
638 String::from_utf8_lossy(&local_document).to_string();
639
640 let base_buffer = svg::buffer::Buffer::new(&base_document);
641 let remote_buffer =
642 svg::buffer::Buffer::new(&remote_document);
643 let mut local_buffer =
644 svg::buffer::Buffer::new(&local_document);
645
646 for (_, el) in local_buffer.elements.iter_mut() {
647 if let Element::Path(path) = el {
648 path.data.apply_transform(u_transform_to_bezier(
649 &local_buffer.master_transform,
650 ));
651 }
652 }
653 svg::buffer::Buffer::reload(
654 &mut local_buffer.elements,
655 &mut local_buffer.weak_images,
656 local_buffer.master_transform,
657 &base_buffer,
658 &remote_buffer,
659 );
660
661 let merged_document = local_buffer.serialize();
662 let encrypted_document = merge
663 .update_document_unvalidated(
664 &id,
665 &merged_document.into_bytes(),
666 &self.keychain,
667 )?;
668 let hmac = merge.find(&id)?.document_hmac().copied();
669 self.docs.insert(id, hmac, &encrypted_document).await?;
670 }
671 DocumentType::Other => {
672 let merge_parent = *merge.find(&id)?.parent();
674 let duplicate_id = if let Some(&duplicate_id) =
675 duplicate_file_ids.get(&id)
676 {
677 duplicate_id
678 } else {
679 let duplicate_id = Uuid::new_v4();
680 duplicate_file_ids.insert(id, duplicate_id);
681 rename_increments.insert(duplicate_id, 1);
682 duplicate_id
683 };
684
685 let mut merge_name = merge_name;
686 merge_name = NameComponents::from(&merge_name)
687 .generate_incremented(
688 rename_increments
689 .get(&duplicate_id)
690 .copied()
691 .unwrap_or_default(),
692 )
693 .to_name();
694
695 merge.create_unvalidated(
696 duplicate_id,
697 symkey::generate_key(),
698 &merge_parent,
699 &merge_name,
700 FileType::Document,
701 &self.keychain,
702 )?;
703 let encrypted_document = merge
704 .update_document_unvalidated(
705 &duplicate_id,
706 &local_document,
707 &self.keychain,
708 )?;
709 let duplicate_hmac =
710 merge.find(&duplicate_id)?.document_hmac().copied();
711 self.docs
712 .insert(
713 duplicate_id,
714 duplicate_hmac,
715 &encrypted_document,
716 )
717 .await?;
718 }
719 }
720 } else {
721 let document = self.read_document_helper(id, &mut local).await?;
723 merge.update_document_unvalidated(
724 &id,
725 &document,
726 &self.keychain,
727 )?;
728 }
729 }
730 }
731
732 for id in db.local_metadata.ids() {
735 if db.base_metadata.maybe_find(&id).is_some()
736 && deletions.calculate_deleted(&id)?
737 && !merge.calculate_deleted(&id)?
738 {
739 merge.delete_unvalidated(&id, &self.keychain)?;
741 }
742 }
743 for &id in &links_to_delete {
744 if merge.maybe_find(&id).is_some() && !merge.calculate_deleted(&id)? {
746 merge.delete_unvalidated(&id, &self.keychain)?;
747 }
748 }
749
750 merge
751 };
752
753 for link in merge.ids() {
755 if !merge.calculate_deleted(&link)? {
756 if let FileType::Link { target } = merge.find(&link)?.file_type() {
757 if merge.maybe_find(&target).is_some()
758 && merge.calculate_deleted(&target)?
759 {
760 if links_to_delete.insert(link) {
762 continue 'merge_construction;
763 } else {
764 return Err(LbErrKind::Unexpected(format!(
765 "sync failed to resolve broken link (deletion): {:?}",
766 link
767 ))
768 .into());
769 }
770 }
771 }
772 }
773 }
774
775 let validate_result = merge.validate(me);
776 match validate_result {
777 Ok(_) => {
779 let (_, merge_changes) = merge.unstage();
780 break merge_changes;
781 }
782 Err(ref err) => match err.kind {
783 LbErrKind::Validation(ref vf) => match vf {
784 ValidationFailure::Cycle(ids) => {
786 let mut progress = false;
788 for &id in ids {
789 if db.local_metadata.maybe_find(&id).is_some()
790 && files_to_unmove.insert(id)
791 {
792 progress = true;
793 }
794 }
795 if !progress {
796 return Err(LbErrKind::Unexpected(format!(
797 "sync failed to resolve cycle: {:?}",
798 ids
799 ))
800 .into());
801 }
802 }
803 ValidationFailure::PathConflict(ids) => {
804 let mut progress = false;
806 for &id in ids {
807 if duplicate_file_ids.values().any(|&dup| dup == id) {
808 *rename_increments.entry(id).or_insert(0) += 1;
809 progress = true;
810 break;
811 }
812 }
813 if !progress {
814 for &id in ids {
815 if db.local_metadata.maybe_find(&id).is_some() {
816 *rename_increments.entry(id).or_insert(0) += 1;
817 progress = true;
818 break;
819 }
820 }
821 }
822 if !progress {
823 return Err(LbErrKind::Unexpected(format!(
824 "sync failed to resolve path conflict: {:?}",
825 ids
826 ))
827 .into());
828 }
829 }
830 ValidationFailure::SharedLink { link, shared_ancestor } => {
831 let mut progress = false;
833 if let Some(base_shared_ancestor) = base.maybe_find(shared_ancestor)
834 {
835 if !base_shared_ancestor.is_shared()
836 && files_to_unshare.insert(*shared_ancestor)
837 {
838 progress = true;
839 }
840 }
841 if !progress && links_to_delete.insert(*link) {
842 progress = true;
843 }
844 if !progress {
845 return Err(LbErrKind::Unexpected(format!(
846 "sync failed to resolve shared link: link: {:?}, shared_ancestor: {:?}",
847 link, shared_ancestor
848 )).into());
849 }
850 }
851 ValidationFailure::DuplicateLink { target } => {
852 let mut progress = false;
854 if let Some(link) = local.linked_by(target)? {
855 if links_to_delete.insert(link) {
856 progress = true;
857 }
858 }
859 if !progress {
860 return Err(LbErrKind::Unexpected(format!(
861 "sync failed to resolve duplicate link: target: {:?}",
862 target
863 ))
864 .into());
865 }
866 }
867 ValidationFailure::BrokenLink(link) => {
868 if !links_to_delete.insert(*link) {
870 return Err(LbErrKind::Unexpected(format!(
871 "sync failed to resolve broken link: {:?}",
872 link
873 ))
874 .into());
875 }
876 }
877 ValidationFailure::OwnedLink(link) => {
878 let mut progress = false;
880 if let Some(remote_link) = remote.maybe_find(link) {
881 if let FileType::Link { target } = remote_link.file_type() {
882 let remote_target = remote.find(&target)?;
883 if remote_target.owner() != me
884 && files_to_unmove.insert(target)
885 {
886 progress = true;
887 }
888 }
889 }
890 if !progress && links_to_delete.insert(*link) {
891 progress = true;
892 }
893 if !progress {
894 return Err(LbErrKind::Unexpected(format!(
895 "sync failed to resolve owned link: {:?}",
896 link
897 ))
898 .into());
899 }
900 }
901 ValidationFailure::Orphan(_)
903 | ValidationFailure::NonFolderWithChildren(_)
904 | ValidationFailure::FileWithDifferentOwnerParent(_)
905 | ValidationFailure::FileNameTooLong(_)
906 | ValidationFailure::DeletedFileUpdated(_)
907 | ValidationFailure::NonDecryptableFileName(_) => {
908 validate_result?;
909 }
910 },
911 _ => {
913 validate_result?;
914 }
915 },
916 }
917 }
918 };
919
920 (&mut db.base_metadata)
922 .to_staged(remote_changes.clone())
923 .to_lazy()
924 .promote()?;
925 db.local_metadata.clear()?;
926 (&mut db.local_metadata)
927 .to_staged(merge_changes)
928 .to_lazy()
929 .promote()?;
930
931 db.base_metadata.stage(&mut db.local_metadata).prune()?;
934
935 if start.elapsed() > std::time::Duration::from_millis(100) {
936 warn!("sync merge held lock for {:?}", start.elapsed());
937 }
938
939 Ok(())
940 }
941
942 async fn push_meta(&self, ctx: &mut SyncContext) -> LbResult<()> {
944 ctx.msg("Pushing tree changes...");
945 let mut updates = vec![];
946 let mut local_changes_no_digests = Vec::new();
947
948 let tx = self.ro_tx().await;
949 let db = tx.db();
950
951 let local = db.base_metadata.stage(&db.local_metadata).to_lazy();
953
954 for id in local.tree.staged.ids() {
955 let mut local_change = local.tree.staged.find(&id)?.timestamped_value.value.clone();
956 let maybe_base_file = local.tree.base.maybe_find(&id);
957
958 local_change.document_hmac =
960 maybe_base_file.and_then(|f| f.timestamped_value.value.document_hmac);
961 let local_change = local_change.sign(&self.keychain)?;
962
963 local_changes_no_digests.push(local_change.clone());
964 let file_diff = FileDiff { old: maybe_base_file.cloned(), new: local_change };
965 updates.push(file_diff);
966 }
967
968 drop(tx);
969
970 if !updates.is_empty() {
971 self.client
972 .request(self.get_account()?, UpsertRequest { updates: updates.clone() })
973 .await?;
974 ctx.pushed_metas = updates;
975 }
976
977 let mut tx = self.begin_tx().await;
978 let db = tx.db();
979
980 (&mut db.base_metadata)
982 .to_lazy()
983 .stage(local_changes_no_digests)
984 .promote()?;
985 db.base_metadata.stage(&mut db.local_metadata).prune()?;
986
987 tx.end();
988
989 Ok(())
990 }
991
992 async fn push_docs(&self, ctx: &mut SyncContext) -> LbResult<()> {
994 ctx.msg("Pushing document changes...");
995 let mut updates = vec![];
996 let mut local_changes_digests_only = vec![];
997
998 let tx = self.ro_tx().await;
999 let db = tx.db();
1000 let start = Instant::now();
1001
1002 let local = db.base_metadata.stage(&db.local_metadata).to_lazy();
1003
1004 for id in local.tree.staged.ids() {
1005 let base_file = local.tree.base.find(&id)?.clone();
1006
1007 let mut local_change = base_file.timestamped_value.value.clone();
1009 local_change.document_hmac = local.find(&id)?.timestamped_value.value.document_hmac;
1010
1011 if base_file.document_hmac() == local_change.document_hmac()
1012 || local_change.document_hmac.is_none()
1013 {
1014 continue;
1015 }
1016
1017 let local_change = local_change.sign(&self.keychain)?;
1018
1019 updates.push(FileDiff { old: Some(base_file), new: local_change.clone() });
1020 local_changes_digests_only.push(local_change);
1021 self.events.sync(SyncIncrement::PushingDocument(id, true));
1022 }
1023
1024 drop(tx);
1025 if start.elapsed() > std::time::Duration::from_millis(100) {
1026 warn!("sync push_docs held lock for {:?}", start.elapsed());
1027 }
1028
1029 let docs_count = updates.len();
1030 ctx.total += docs_count;
1031 let futures = updates.clone().into_iter().map(|diff| self.push_doc(diff));
1032
1033 let mut stream = stream::iter(futures).buffer_unordered(
1034 thread::available_parallelism()
1035 .unwrap_or(NonZeroUsize::new(4).unwrap())
1036 .into(),
1037 );
1038
1039 let mut idx = 0;
1040 while let Some(fut) = stream.next().await {
1041 let id = fut?;
1042 self.events.sync(SyncIncrement::PushingDocument(id, false));
1043 ctx.file_msg(id, &format!("Pushed file {idx} of {docs_count}."));
1044 idx += 1;
1045 }
1046 ctx.pushed_docs = updates;
1047
1048 let mut tx = self.begin_tx().await;
1049 let db = tx.db();
1050 (&mut db.base_metadata)
1052 .to_lazy()
1053 .stage(local_changes_digests_only)
1054 .promote()?;
1055
1056 db.base_metadata.stage(&mut db.local_metadata).prune()?;
1057
1058 tx.end();
1059
1060 Ok(())
1061 }
1062
1063 async fn push_doc(&self, diff: FileDiff<SignedFile>) -> LbResult<Uuid> {
1064 let id = *diff.new.id();
1065 let hmac = diff.new.document_hmac();
1066 let local_document_change = self.docs.get(id, hmac.copied()).await?;
1067 self.client
1068 .request(
1069 self.get_account()?,
1070 ChangeDocRequest { diff, new_content: local_document_change },
1071 )
1072 .await?;
1073
1074 Ok(id)
1075 }
1076
1077 async fn dedup(
1078 &self, updates: GetUpdatesResponse,
1079 ) -> LbResult<(Vec<SignedFile>, u64, Option<Uuid>)> {
1080 let tx = self.ro_tx().await;
1081 let db = tx.db();
1082
1083 let mut root_id = None;
1084 let (remote_changes, update_as_of) = {
1085 let mut remote_changes = updates.file_metadata;
1086 let update_as_of = updates.as_of_metadata_version;
1087
1088 remote_changes = self.prune_remote_orphans(remote_changes).await?;
1089
1090 let remote = db.base_metadata.stage(remote_changes).pruned()?.to_lazy();
1091
1092 let (_, remote_changes) = remote.unstage();
1093 (remote_changes, update_as_of)
1094 };
1095
1096 if db.root.get().is_none() {
1098 let root = remote_changes
1099 .all_files()?
1100 .into_iter()
1101 .find(|f| f.is_root())
1102 .ok_or(LbErrKind::RootNonexistent)?;
1103 root_id = Some(*root.id());
1104 }
1105
1106 Ok((remote_changes, update_as_of, root_id))
1107 }
1108
1109 async fn prune_remote_orphans(
1110 &self, remote_changes: Vec<SignedFile>,
1111 ) -> LbResult<Vec<SignedFile>> {
1112 let tx = self.ro_tx().await;
1113 let db = tx.db();
1114
1115 let me = Owner(self.keychain.get_pk()?);
1116 let remote = db.base_metadata.stage(remote_changes).to_lazy();
1117 let mut result = Vec::new();
1118
1119 for id in remote.tree.staged.ids() {
1120 let meta = remote.find(&id)?;
1121 if remote.maybe_find_parent(meta).is_some()
1122 || meta
1123 .user_access_keys()
1124 .iter()
1125 .any(|k| k.encrypted_for == me.0)
1126 {
1127 result.push(remote.find(&id)?.clone()); }
1129 }
1130 Ok(result)
1131 }
1132
1133 async fn commit_last_synced(&self, ctx: &mut SyncContext) -> LbResult<()> {
1134 ctx.msg("Cleaning up...");
1135 let mut tx = self.begin_tx().await;
1136 let db = tx.db();
1137 db.last_synced.insert(ctx.update_as_of as i64)?;
1138
1139 if let Some(root) = ctx.root {
1140 db.root.insert(root)?;
1141 }
1142
1143 Ok(())
1144 }
1145
1146 pub async fn get_last_synced_human(&self) -> LbResult<String> {
1147 let tx = self.ro_tx().await;
1148 let db = tx.db();
1149 let last_synced = db.last_synced.get().copied().unwrap_or(0);
1150
1151 Ok(self.get_timestamp_human_string(last_synced))
1152 }
1153
1154 pub fn get_timestamp_human_string(&self, timestamp: i64) -> String {
1155 if timestamp != 0 {
1156 Duration::milliseconds(clock::get_time().0 - timestamp)
1157 .format_human()
1158 .to_string()
1159 } else {
1160 "never".to_string()
1161 }
1162 }
1163}
1164
1165impl SyncContext {
1166 fn summarize(&self) -> SyncStatus {
1167 let mut local = HashSet::new();
1168 let mut server = HashSet::new();
1169 let mut work_units = vec![];
1170
1171 for meta in &self.pushed_metas {
1172 local.insert(meta.new.id());
1173 }
1174
1175 for meta in &self.pushed_docs {
1176 local.insert(meta.new.id());
1177 }
1178
1179 for meta in &self.remote_changes {
1180 server.insert(meta.id());
1181 }
1182
1183 for id in local {
1184 work_units.push(WorkUnit::LocalChange(*id));
1185 }
1186
1187 for id in server {
1188 work_units.push(WorkUnit::ServerChange(*id));
1189 }
1190
1191 SyncStatus { work_units, latest_server_ts: self.update_as_of }
1192 }
1193
1194 fn msg(&mut self, msg: &str) {
1195 self.current += 1;
1196 if let Some(f) = &self.progress {
1197 f(SyncProgress {
1198 total: self.total,
1199 progress: self.current,
1200 file_being_processed: Default::default(),
1201 msg: msg.to_string(),
1202 })
1203 }
1204 }
1205
1206 fn file_msg(&mut self, id: Uuid, msg: &str) {
1207 self.current += 1;
1208 if let Some(f) = &self.progress {
1209 f(SyncProgress {
1210 total: self.total,
1211 progress: self.current,
1212 file_being_processed: Some(id),
1213 msg: msg.to_string(),
1214 })
1215 }
1216 }
1217
1218 fn done_msg(&mut self) {
1219 self.current = self.total;
1220 if let Some(f) = &self.progress {
1221 f(SyncProgress {
1222 total: self.total,
1223 progress: self.current,
1224 file_being_processed: None,
1225 msg: "Sync successful!".to_string(),
1226 })
1227 }
1228 }
1229}
1230
/// Result of a sync, or of a dry-run via `calculate_work`.
#[derive(Debug, Serialize, Clone)]
pub struct SyncStatus {
    // Files changed locally and/or on the server.
    pub work_units: Vec<WorkUnit>,
    // Latest server metadata version observed.
    pub latest_server_ts: u64,
}
1236
/// A single update delivered to the sync progress callback.
#[derive(Clone)]
pub struct SyncProgress {
    // Total number of expected progress steps.
    pub total: usize,
    // Steps completed so far.
    pub progress: usize,
    // The file the current step relates to, if any.
    pub file_being_processed: Option<Uuid>,
    // Human-readable description of the current step.
    pub msg: String,
}
1244
1245impl Display for SyncProgress {
1246 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1247 write!(f, "[{} / {}]: {}", self.progress, self.total, self.msg)
1248 }
1249}
1250
/// Fine-grained sync lifecycle events emitted via the event stream.
#[derive(Debug, Clone)]
pub enum SyncIncrement {
    /// Emitted once when the sync pipeline begins.
    SyncStarted,
    /// A document pull; the bool is true when the pull starts, false when it completes.
    PullingDocument(Uuid, bool),
    /// A document push; the bool is true when the push starts, false when it completes.
    PushingDocument(Uuid, bool),
    /// Sync ended; carries the error kind when it failed.
    SyncFinished(Option<LbErrKind>),
}