1use std::collections::BTreeMap;
18use std::iter;
19use std::iter::zip;
20use std::pin::Pin;
21use std::sync::Arc;
22use std::task::Context;
23use std::task::Poll;
24use std::task::ready;
25use std::vec;
26
27use either::Either;
28use futures::Stream;
29use futures::StreamExt as _;
30use futures::future::BoxFuture;
31use futures::future::try_join;
32use futures::stream::BoxStream;
33use itertools::EitherOrBoth;
34use itertools::Itertools as _;
35use pollster::FutureExt as _;
36
37use crate::backend::BackendResult;
38use crate::backend::MergedTreeId;
39use crate::backend::TreeId;
40use crate::backend::TreeValue;
41use crate::copies::CopiesTreeDiffEntry;
42use crate::copies::CopiesTreeDiffStream;
43use crate::copies::CopyRecords;
44use crate::matchers::EverythingMatcher;
45use crate::matchers::Matcher;
46use crate::merge::Diff;
47use crate::merge::Merge;
48use crate::merge::MergeBuilder;
49use crate::merge::MergedTreeVal;
50use crate::merge::MergedTreeValue;
51use crate::repo_path::RepoPath;
52use crate::repo_path::RepoPathBuf;
53use crate::repo_path::RepoPathComponent;
54use crate::store::Store;
55use crate::tree::Tree;
56use crate::tree_builder::TreeBuilder;
57use crate::tree_merge::merge_trees;
58
59#[derive(PartialEq, Eq, Clone, Debug)]
61pub struct MergedTree {
62 trees: Merge<Tree>,
63}
64
65impl MergedTree {
66 pub fn resolved(tree: Tree) -> Self {
68 Self::new(Merge::resolved(tree))
69 }
70
71 pub fn new(trees: Merge<Tree>) -> Self {
74 debug_assert!(trees.iter().map(|tree| tree.dir()).all_equal());
75 debug_assert!(
76 trees
77 .iter()
78 .map(|tree| Arc::as_ptr(tree.store()))
79 .all_equal()
80 );
81 Self { trees }
82 }
83
84 pub fn as_merge(&self) -> &Merge<Tree> {
86 &self.trees
87 }
88
89 pub fn take(self) -> Merge<Tree> {
91 self.trees
92 }
93
94 pub fn dir(&self) -> &RepoPath {
96 self.trees.first().dir()
97 }
98
99 pub fn store(&self) -> &Arc<Store> {
101 self.trees.first().store()
102 }
103
104 pub fn names<'a>(&'a self) -> Box<dyn Iterator<Item = &'a RepoPathComponent> + 'a> {
106 Box::new(all_tree_basenames(&self.trees))
107 }
108
109 pub fn value(&self, basename: &RepoPathComponent) -> MergedTreeVal<'_> {
114 trees_value(&self.trees, basename)
115 }
116
117 pub async fn resolve(self) -> BackendResult<Self> {
120 let merged = merge_trees(self.trees).await?;
121 let simplified = merged.simplify();
126 if cfg!(debug_assertions) {
130 let re_merged = merge_trees(simplified.clone()).await.unwrap();
131 debug_assert_eq!(re_merged, simplified);
132 }
133 Ok(Self { trees: simplified })
134 }
135
136 pub fn conflicts(
142 &self,
143 ) -> impl Iterator<Item = (RepoPathBuf, BackendResult<MergedTreeValue>)> + use<> {
144 ConflictIterator::new(self)
145 }
146
147 pub fn has_conflict(&self) -> bool {
149 !self.trees.is_resolved()
150 }
151
152 pub async fn sub_tree(&self, name: &RepoPathComponent) -> BackendResult<Option<Self>> {
156 match self.value(name).into_resolved() {
157 Ok(Some(TreeValue::Tree(sub_tree_id))) => {
158 let subdir = self.dir().join(name);
159 Ok(Some(Self::resolved(
160 self.store().get_tree_async(subdir, sub_tree_id).await?,
161 )))
162 }
163 Ok(_) => Ok(None),
164 Err(merge) => {
165 if !merge.is_tree() {
166 return Ok(None);
167 }
168 let trees = merge
169 .try_map_async(async |value| match value {
170 Some(TreeValue::Tree(sub_tree_id)) => {
171 let subdir = self.dir().join(name);
172 self.store().get_tree_async(subdir, sub_tree_id).await
173 }
174 Some(_) => unreachable!(),
175 None => {
176 let subdir = self.dir().join(name);
177 Ok(Tree::empty(self.store().clone(), subdir))
178 }
179 })
180 .await?;
181 Ok(Some(Self { trees }))
182 }
183 }
184 }
185
186 pub fn path_value(&self, path: &RepoPath) -> BackendResult<MergedTreeValue> {
190 self.path_value_async(path).block_on()
191 }
192
193 pub async fn path_value_async(&self, path: &RepoPath) -> BackendResult<MergedTreeValue> {
195 assert_eq!(self.dir(), RepoPath::root());
196 match path.split() {
197 Some((dir, basename)) => match self.sub_tree_recursive(dir).await? {
198 None => Ok(Merge::absent()),
199 Some(tree) => Ok(tree.value(basename).cloned()),
200 },
201 None => Ok(self
202 .trees
203 .map(|tree| Some(TreeValue::Tree(tree.id().clone())))),
204 }
205 }
206
207 pub fn id(&self) -> MergedTreeId {
209 MergedTreeId::Merge(self.trees.map(|tree| tree.id().clone()))
210 }
211
212 pub async fn sub_tree_recursive(&self, path: &RepoPath) -> BackendResult<Option<Self>> {
214 let mut current_tree = self.clone();
215 for name in path.components() {
216 match current_tree.sub_tree(name).await? {
217 None => {
218 return Ok(None);
219 }
220 Some(sub_tree) => {
221 current_tree = sub_tree;
222 }
223 }
224 }
225 Ok(Some(current_tree))
226 }
227
228 pub fn entries(&self) -> TreeEntriesIterator<'static> {
238 self.entries_matching(&EverythingMatcher)
239 }
240
241 pub fn entries_matching<'matcher>(
243 &self,
244 matcher: &'matcher dyn Matcher,
245 ) -> TreeEntriesIterator<'matcher> {
246 TreeEntriesIterator::new(&self.trees, matcher)
247 }
248
249 fn diff_stream_internal<'matcher>(
254 &self,
255 other: &Self,
256 matcher: &'matcher dyn Matcher,
257 ) -> TreeDiffStream<'matcher> {
258 let concurrency = self.store().concurrency();
259 if concurrency <= 1 {
260 Box::pin(futures::stream::iter(TreeDiffIterator::new(
261 &self.trees,
262 &other.trees,
263 matcher,
264 )))
265 } else {
266 Box::pin(TreeDiffStreamImpl::new(
267 &self.trees,
268 &other.trees,
269 matcher,
270 concurrency,
271 ))
272 }
273 }
274
275 pub fn diff_stream<'matcher>(
277 &self,
278 other: &Self,
279 matcher: &'matcher dyn Matcher,
280 ) -> TreeDiffStream<'matcher> {
281 stream_without_trees(self.diff_stream_internal(other, matcher))
282 }
283
284 pub fn diff_stream_for_file_system<'matcher>(
287 &self,
288 other: &Self,
289 matcher: &'matcher dyn Matcher,
290 ) -> TreeDiffStream<'matcher> {
291 Box::pin(DiffStreamForFileSystem::new(
292 self.diff_stream_internal(other, matcher),
293 ))
294 }
295
296 pub fn diff_stream_with_copies<'a>(
298 &self,
299 other: &Self,
300 matcher: &'a dyn Matcher,
301 copy_records: &'a CopyRecords,
302 ) -> BoxStream<'a, CopiesTreeDiffEntry> {
303 let stream = self.diff_stream(other, matcher);
304 Box::pin(CopiesTreeDiffStream::new(
305 stream,
306 self.clone(),
307 other.clone(),
308 copy_records,
309 ))
310 }
311
312 pub async fn merge(self, base: Self, other: Self) -> BackendResult<Self> {
315 self.merge_no_resolve(base, other).resolve().await
316 }
317
318 pub fn merge_no_resolve(self, base: Self, other: Self) -> Self {
321 let nested = Merge::from_vec(vec![self.trees, base.trees, other.trees]);
322 Self {
323 trees: nested.flatten().simplify(),
324 }
325 }
326}
327
328pub struct TreeDiffEntry {
330 pub path: RepoPathBuf,
332 pub values: BackendResult<Diff<MergedTreeValue>>,
334}
335
336pub type TreeDiffStream<'matcher> = BoxStream<'matcher, TreeDiffEntry>;
340
341fn all_tree_basenames(trees: &Merge<Tree>) -> impl Iterator<Item = &RepoPathComponent> {
342 trees
343 .iter()
344 .map(|tree| tree.data().names())
345 .kmerge()
346 .dedup()
347}
348
349fn all_tree_entries(
350 trees: &Merge<Tree>,
351) -> impl Iterator<Item = (&RepoPathComponent, MergedTreeVal<'_>)> {
352 if let Some(tree) = trees.as_resolved() {
353 let iter = tree
354 .entries_non_recursive()
355 .map(|entry| (entry.name(), Merge::normal(entry.value())));
356 Either::Left(iter)
357 } else {
358 let same_change = trees.first().store().merge_options().same_change;
359 let iter = all_merged_tree_entries(trees).map(move |(name, values)| {
360 let values = match values.resolve_trivial(same_change) {
362 Some(resolved) => Merge::resolved(*resolved),
363 None => values,
364 };
365 (name, values)
366 });
367 Either::Right(iter)
368 }
369}
370
371pub fn all_merged_tree_entries(
375 trees: &Merge<Tree>,
376) -> impl Iterator<Item = (&RepoPathComponent, MergedTreeVal<'_>)> {
377 let mut entries_iters = trees
378 .iter()
379 .map(|tree| tree.entries_non_recursive().peekable())
380 .collect_vec();
381 iter::from_fn(move || {
382 let next_name = entries_iters
383 .iter_mut()
384 .filter_map(|iter| iter.peek())
385 .map(|entry| entry.name())
386 .min()?;
387 let values: MergeBuilder<_> = entries_iters
388 .iter_mut()
389 .map(|iter| {
390 let entry = iter.next_if(|entry| entry.name() == next_name)?;
391 Some(entry.value())
392 })
393 .collect();
394 Some((next_name, values.build()))
395 })
396}
397
398fn merged_tree_entry_diff<'a>(
399 trees1: &'a Merge<Tree>,
400 trees2: &'a Merge<Tree>,
401) -> impl Iterator<Item = (&'a RepoPathComponent, Diff<MergedTreeVal<'a>>)> {
402 itertools::merge_join_by(
403 all_tree_entries(trees1),
404 all_tree_entries(trees2),
405 |(name1, _), (name2, _)| name1.cmp(name2),
406 )
407 .map(|entry| match entry {
408 EitherOrBoth::Both((name, value1), (_, value2)) => (name, Diff::new(value1, value2)),
409 EitherOrBoth::Left((name, value1)) => (name, Diff::new(value1, Merge::absent())),
410 EitherOrBoth::Right((name, value2)) => (name, Diff::new(Merge::absent(), value2)),
411 })
412 .filter(|(_, diff)| diff.is_changed())
413}
414
415fn trees_value<'a>(trees: &'a Merge<Tree>, basename: &RepoPathComponent) -> MergedTreeVal<'a> {
416 if let Some(tree) = trees.as_resolved() {
417 return Merge::resolved(tree.value(basename));
418 }
419 let same_change = trees.first().store().merge_options().same_change;
420 let value = trees.map(|tree| tree.value(basename));
421 if let Some(resolved) = value.resolve_trivial(same_change) {
422 return Merge::resolved(*resolved);
423 }
424 value
425}
426
427pub struct TreeEntriesIterator<'matcher> {
429 store: Arc<Store>,
430 stack: Vec<TreeEntriesDirItem>,
431 matcher: &'matcher dyn Matcher,
432}
433
434struct TreeEntriesDirItem {
435 entries: Vec<(RepoPathBuf, MergedTreeValue)>,
436}
437
438impl TreeEntriesDirItem {
439 fn new(trees: &Merge<Tree>, matcher: &dyn Matcher) -> Self {
440 let mut entries = vec![];
441 let dir = trees.first().dir();
442 for (name, value) in all_tree_entries(trees) {
443 let path = dir.join(name);
444 if value.is_tree() {
445 if matcher.visit(&path).is_nothing() {
447 continue;
448 }
449 } else if !matcher.matches(&path) {
450 continue;
451 }
452 entries.push((path, value.cloned()));
453 }
454 entries.reverse();
455 Self { entries }
456 }
457}
458
459impl<'matcher> TreeEntriesIterator<'matcher> {
460 fn new(trees: &Merge<Tree>, matcher: &'matcher dyn Matcher) -> Self {
461 Self {
462 store: trees.first().store().clone(),
463 stack: vec![TreeEntriesDirItem::new(trees, matcher)],
464 matcher,
465 }
466 }
467}
468
469impl Iterator for TreeEntriesIterator<'_> {
470 type Item = (RepoPathBuf, BackendResult<MergedTreeValue>);
471
472 fn next(&mut self) -> Option<Self::Item> {
473 while let Some(top) = self.stack.last_mut() {
474 if let Some((path, value)) = top.entries.pop() {
475 let maybe_trees = match value.to_tree_merge(&self.store, &path).block_on() {
476 Ok(maybe_trees) => maybe_trees,
477 Err(err) => return Some((path, Err(err))),
478 };
479 if let Some(trees) = maybe_trees {
480 self.stack
481 .push(TreeEntriesDirItem::new(&trees, self.matcher));
482 } else {
483 return Some((path, Ok(value)));
484 }
485 } else {
486 self.stack.pop();
487 }
488 }
489 None
490 }
491}
492
493struct ConflictsDirItem {
496 entries: Vec<(RepoPathBuf, MergedTreeValue)>,
497}
498
499impl From<&Merge<Tree>> for ConflictsDirItem {
500 fn from(trees: &Merge<Tree>) -> Self {
501 let dir = trees.first().dir();
502 if trees.is_resolved() {
503 return Self { entries: vec![] };
504 }
505
506 let mut entries = vec![];
507 for (basename, value) in all_tree_entries(trees) {
508 if !value.is_resolved() {
509 entries.push((dir.join(basename), value.cloned()));
510 }
511 }
512 entries.reverse();
513 Self { entries }
514 }
515}
516
517struct ConflictIterator {
518 store: Arc<Store>,
519 stack: Vec<ConflictsDirItem>,
520}
521
522impl ConflictIterator {
523 fn new(tree: &MergedTree) -> Self {
524 Self {
525 store: tree.store().clone(),
526 stack: vec![ConflictsDirItem::from(&tree.trees)],
527 }
528 }
529}
530
531impl Iterator for ConflictIterator {
532 type Item = (RepoPathBuf, BackendResult<MergedTreeValue>);
533
534 fn next(&mut self) -> Option<Self::Item> {
535 while let Some(top) = self.stack.last_mut() {
536 if let Some((path, tree_values)) = top.entries.pop() {
537 match tree_values.to_tree_merge(&self.store, &path).block_on() {
538 Ok(Some(trees)) => {
539 self.stack.push(ConflictsDirItem::from(&trees));
541 }
542 Ok(None) => {
543 return Some((path, Ok(tree_values)));
547 }
548 Err(err) => {
549 return Some((path, Err(err)));
550 }
551 }
552 } else {
553 self.stack.pop();
554 }
555 }
556 None
557 }
558}
559
560pub struct TreeDiffIterator<'matcher> {
565 store: Arc<Store>,
566 stack: Vec<TreeDiffDir>,
567 matcher: &'matcher dyn Matcher,
568}
569
570struct TreeDiffDir {
571 entries: Vec<(RepoPathBuf, Diff<MergedTreeValue>)>,
572}
573
574impl<'matcher> TreeDiffIterator<'matcher> {
575 pub fn new(trees1: &Merge<Tree>, trees2: &Merge<Tree>, matcher: &'matcher dyn Matcher) -> Self {
577 assert!(Arc::ptr_eq(trees1.first().store(), trees2.first().store()));
578 let root_dir = RepoPath::root();
579 let mut stack = Vec::new();
580 if !matcher.visit(root_dir).is_nothing() {
581 stack.push(TreeDiffDir::from_trees(root_dir, trees1, trees2, matcher));
582 };
583 Self {
584 store: trees1.first().store().clone(),
585 stack,
586 matcher,
587 }
588 }
589
590 fn trees(
592 store: &Arc<Store>,
593 dir: &RepoPath,
594 values: &MergedTreeValue,
595 ) -> BackendResult<Merge<Tree>> {
596 if let Some(trees) = values.to_tree_merge(store, dir).block_on()? {
597 Ok(trees)
598 } else {
599 Ok(Merge::resolved(Tree::empty(store.clone(), dir.to_owned())))
600 }
601 }
602}
603
604impl TreeDiffDir {
605 fn from_trees(
606 dir: &RepoPath,
607 trees1: &Merge<Tree>,
608 trees2: &Merge<Tree>,
609 matcher: &dyn Matcher,
610 ) -> Self {
611 let mut entries = vec![];
612 for (name, diff) in merged_tree_entry_diff(trees1, trees2) {
613 let path = dir.join(name);
614 let tree_before = diff.before.is_tree();
615 let tree_after = diff.after.is_tree();
616 let tree_matches = (tree_before || tree_after) && !matcher.visit(&path).is_nothing();
619 let file_matches = (!tree_before || !tree_after) && matcher.matches(&path);
620
621 let before = if (tree_before && tree_matches) || (!tree_before && file_matches) {
623 diff.before
624 } else {
625 Merge::absent()
626 };
627 let after = if (tree_after && tree_matches) || (!tree_after && file_matches) {
628 diff.after
629 } else {
630 Merge::absent()
631 };
632 if before.is_absent() && after.is_absent() {
633 continue;
634 }
635 entries.push((path, Diff::new(before.cloned(), after.cloned())));
636 }
637 entries.reverse();
638 Self { entries }
639 }
640}
641
642impl Iterator for TreeDiffIterator<'_> {
643 type Item = TreeDiffEntry;
644
645 fn next(&mut self) -> Option<Self::Item> {
646 while let Some(top) = self.stack.last_mut() {
647 let (path, diff) = match top.entries.pop() {
648 Some(entry) => entry,
649 None => {
650 self.stack.pop().unwrap();
651 continue;
652 }
653 };
654
655 if diff.before.is_tree() || diff.after.is_tree() {
656 let (before_tree, after_tree) = match (
657 Self::trees(&self.store, &path, &diff.before),
658 Self::trees(&self.store, &path, &diff.after),
659 ) {
660 (Ok(before_tree), Ok(after_tree)) => (before_tree, after_tree),
661 (Err(before_err), _) => {
662 return Some(TreeDiffEntry {
663 path,
664 values: Err(before_err),
665 });
666 }
667 (_, Err(after_err)) => {
668 return Some(TreeDiffEntry {
669 path,
670 values: Err(after_err),
671 });
672 }
673 };
674 let subdir =
675 TreeDiffDir::from_trees(&path, &before_tree, &after_tree, self.matcher);
676 self.stack.push(subdir);
677 };
678 if diff.before.is_file_like() || diff.after.is_file_like() {
679 return Some(TreeDiffEntry {
680 path,
681 values: Ok(diff),
682 });
683 }
684 }
685 None
686 }
687}
688
689pub struct TreeDiffStreamImpl<'matcher> {
694 store: Arc<Store>,
695 matcher: &'matcher dyn Matcher,
696 items: BTreeMap<RepoPathBuf, BackendResult<Diff<MergedTreeValue>>>,
701 #[expect(clippy::type_complexity)]
703 pending_trees:
704 BTreeMap<RepoPathBuf, BoxFuture<'matcher, BackendResult<(Merge<Tree>, Merge<Tree>)>>>,
705 max_concurrent_reads: usize,
710 max_queued_items: usize,
716}
717
718impl<'matcher> TreeDiffStreamImpl<'matcher> {
719 pub fn new(
722 trees1: &Merge<Tree>,
723 trees2: &Merge<Tree>,
724 matcher: &'matcher dyn Matcher,
725 max_concurrent_reads: usize,
726 ) -> Self {
727 assert!(Arc::ptr_eq(trees1.first().store(), trees2.first().store()));
728 let mut stream = Self {
729 store: trees1.first().store().clone(),
730 matcher,
731 items: BTreeMap::new(),
732 pending_trees: BTreeMap::new(),
733 max_concurrent_reads,
734 max_queued_items: 10000,
735 };
736 stream.add_dir_diff_items(RepoPath::root(), trees1, trees2);
737 stream
738 }
739
740 async fn single_tree(
741 store: &Arc<Store>,
742 dir: RepoPathBuf,
743 value: Option<&TreeValue>,
744 ) -> BackendResult<Tree> {
745 match value {
746 Some(TreeValue::Tree(tree_id)) => store.get_tree_async(dir, tree_id).await,
747 _ => Ok(Tree::empty(store.clone(), dir.clone())),
748 }
749 }
750
751 async fn trees(
753 store: Arc<Store>,
754 dir: RepoPathBuf,
755 values: MergedTreeValue,
756 ) -> BackendResult<Merge<Tree>> {
757 if values.is_tree() {
758 values
759 .try_map_async(|value| Self::single_tree(&store, dir.clone(), value.as_ref()))
760 .await
761 } else {
762 Ok(Merge::resolved(Tree::empty(store, dir)))
763 }
764 }
765
766 fn add_dir_diff_items(&mut self, dir: &RepoPath, trees1: &Merge<Tree>, trees2: &Merge<Tree>) {
767 for (basename, diff) in merged_tree_entry_diff(trees1, trees2) {
768 let path = dir.join(basename);
769 let tree_before = diff.before.is_tree();
770 let tree_after = diff.after.is_tree();
771 let tree_matches =
774 (tree_before || tree_after) && !self.matcher.visit(&path).is_nothing();
775 let file_matches = (!tree_before || !tree_after) && self.matcher.matches(&path);
776
777 let before = if (tree_before && tree_matches) || (!tree_before && file_matches) {
779 diff.before
780 } else {
781 Merge::absent()
782 };
783 let after = if (tree_after && tree_matches) || (!tree_after && file_matches) {
784 diff.after
785 } else {
786 Merge::absent()
787 };
788 if before.is_absent() && after.is_absent() {
789 continue;
790 }
791
792 if tree_matches {
794 let before_tree_future =
795 Self::trees(self.store.clone(), path.clone(), before.cloned());
796 let after_tree_future =
797 Self::trees(self.store.clone(), path.clone(), after.cloned());
798 let both_trees_future = try_join(before_tree_future, after_tree_future);
799 self.pending_trees
800 .insert(path.clone(), Box::pin(both_trees_future));
801 }
802
803 if before.is_file_like() || after.is_file_like() {
804 self.items
805 .insert(path, Ok(Diff::new(before.cloned(), after.cloned())));
806 }
807 }
808 }
809
810 fn poll_tree_futures(&mut self, cx: &mut Context<'_>) {
811 loop {
812 let mut tree_diffs = vec![];
813 let mut some_pending = false;
814 let mut all_pending = true;
815 for (dir, future) in self
816 .pending_trees
817 .iter_mut()
818 .take(self.max_concurrent_reads)
819 {
820 if let Poll::Ready(tree_diff) = future.as_mut().poll(cx) {
821 all_pending = false;
822 tree_diffs.push((dir.clone(), tree_diff));
823 } else {
824 some_pending = true;
825 }
826 }
827
828 for (dir, tree_diff) in tree_diffs {
829 let _ = self.pending_trees.remove_entry(&dir).unwrap();
830 match tree_diff {
831 Ok((trees1, trees2)) => {
832 self.add_dir_diff_items(&dir, &trees1, &trees2);
833 }
834 Err(err) => {
835 self.items.insert(dir, Err(err));
836 }
837 }
838 }
839
840 if all_pending || (some_pending && self.items.len() >= self.max_queued_items) {
844 return;
845 }
846 }
847 }
848}
849
850impl Stream for TreeDiffStreamImpl<'_> {
851 type Item = TreeDiffEntry;
852
853 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
854 self.poll_tree_futures(cx);
856
857 if let Some((path, _)) = self.items.first_key_value() {
859 if let Some((dir, _)) = self.pending_trees.first_key_value()
862 && dir < path
863 {
864 return Poll::Pending;
865 }
866
867 let (path, values) = self.items.pop_first().unwrap();
868 Poll::Ready(Some(TreeDiffEntry { path, values }))
869 } else if self.pending_trees.is_empty() {
870 Poll::Ready(None)
871 } else {
872 Poll::Pending
873 }
874 }
875}
876
877fn stream_without_trees(stream: TreeDiffStream) -> TreeDiffStream {
878 Box::pin(stream.map(|mut entry| {
879 let skip_tree = |merge: MergedTreeValue| {
880 if merge.is_tree() {
881 Merge::absent()
882 } else {
883 merge
884 }
885 };
886 entry.values = entry.values.map(|diff| diff.map(skip_tree));
887 entry
888 }))
889}
890
891struct DiffStreamForFileSystem<'a> {
894 inner: TreeDiffStream<'a>,
895 next_item: Option<TreeDiffEntry>,
896 held_file: Option<TreeDiffEntry>,
897}
898
899impl<'a> DiffStreamForFileSystem<'a> {
900 fn new(inner: TreeDiffStream<'a>) -> Self {
901 Self {
902 inner,
903 next_item: None,
904 held_file: None,
905 }
906 }
907}
908
909impl Stream for DiffStreamForFileSystem<'_> {
910 type Item = TreeDiffEntry;
911
912 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
913 while let Some(next) = match self.next_item.take() {
914 Some(next) => Some(next),
915 None => ready!(self.inner.as_mut().poll_next(cx)),
916 } {
917 if let Some(held_entry) = self
921 .held_file
922 .take_if(|held_entry| !next.path.starts_with(&held_entry.path))
923 {
924 self.next_item = Some(next);
925 return Poll::Ready(Some(held_entry));
926 }
927
928 match next.values {
929 Ok(diff) if diff.before.is_tree() => {
930 assert!(diff.after.is_present());
931 assert!(self.held_file.is_none());
932 self.held_file = Some(TreeDiffEntry {
933 path: next.path,
934 values: Ok(Diff::new(Merge::absent(), diff.after)),
935 });
936 }
937 Ok(diff) if diff.after.is_tree() => {
938 assert!(diff.before.is_present());
939 return Poll::Ready(Some(TreeDiffEntry {
940 path: next.path,
941 values: Ok(Diff::new(diff.before, Merge::absent())),
942 }));
943 }
944 _ => {
945 return Poll::Ready(Some(next));
946 }
947 }
948 }
949 Poll::Ready(self.held_file.take())
950 }
951}
952
953pub struct MergedTreeBuilder {
961 base_tree_id: MergedTreeId,
962 overrides: BTreeMap<RepoPathBuf, MergedTreeValue>,
963}
964
965impl MergedTreeBuilder {
966 pub fn new(base_tree_id: MergedTreeId) -> Self {
968 Self {
969 base_tree_id,
970 overrides: BTreeMap::new(),
971 }
972 }
973
974 pub fn set_or_remove(&mut self, path: RepoPathBuf, values: MergedTreeValue) {
979 self.overrides.insert(path, values);
980 }
981
982 pub fn write_tree(self, store: &Arc<Store>) -> BackendResult<MergedTreeId> {
984 let base_tree_ids = match self.base_tree_id.clone() {
985 MergedTreeId::Legacy(base_tree_id) => Merge::resolved(base_tree_id),
986 MergedTreeId::Merge(base_tree_ids) => base_tree_ids,
987 };
988 let new_tree_ids = self.write_merged_trees(base_tree_ids, store)?;
989 match new_tree_ids.simplify().into_resolved() {
990 Ok(single_tree_id) => Ok(MergedTreeId::resolved(single_tree_id)),
991 Err(tree_id) => {
992 let tree = store.get_root_tree(&MergedTreeId::Merge(tree_id))?;
993 let resolved = tree.resolve().block_on()?;
994 Ok(resolved.id())
995 }
996 }
997 }
998
999 fn write_merged_trees(
1000 self,
1001 mut base_tree_ids: Merge<TreeId>,
1002 store: &Arc<Store>,
1003 ) -> BackendResult<Merge<TreeId>> {
1004 let num_sides = self
1005 .overrides
1006 .values()
1007 .map(|value| value.num_sides())
1008 .max()
1009 .unwrap_or(0);
1010 base_tree_ids.pad_to(num_sides, store.empty_tree_id());
1011 let mut tree_builders =
1013 base_tree_ids.map(|base_tree_id| TreeBuilder::new(store.clone(), base_tree_id.clone()));
1014 for (path, values) in self.overrides {
1015 match values.into_resolved() {
1016 Ok(value) => {
1017 for builder in tree_builders.iter_mut() {
1020 builder.set_or_remove(path.clone(), value.clone());
1021 }
1022 }
1023 Err(mut values) => {
1024 values.pad_to(num_sides, &None);
1025 for (builder, value) in zip(tree_builders.iter_mut(), values) {
1028 builder.set_or_remove(path.clone(), value);
1029 }
1030 }
1031 }
1032 }
1033 let merge_builder: MergeBuilder<TreeId> = tree_builders
1037 .into_iter()
1038 .map(|builder| builder.write_tree())
1039 .try_collect()?;
1040 Ok(merge_builder.build())
1041 }
1042}