1use std::collections::BTreeMap;
18use std::fmt;
19use std::iter;
20use std::pin::Pin;
21use std::sync::Arc;
22use std::task::Context;
23use std::task::Poll;
24use std::task::ready;
25use std::vec;
26
27use either::Either;
28use futures::Stream;
29use futures::StreamExt as _;
30use futures::future::BoxFuture;
31use futures::future::try_join;
32use futures::stream::BoxStream;
33use itertools::EitherOrBoth;
34use itertools::Itertools as _;
35use pollster::FutureExt as _;
36
37use crate::backend::BackendResult;
38use crate::backend::TreeId;
39use crate::backend::TreeValue;
40use crate::conflict_labels::ConflictLabels;
41use crate::copies::CopiesTreeDiffEntry;
42use crate::copies::CopiesTreeDiffStream;
43use crate::copies::CopyRecords;
44use crate::matchers::EverythingMatcher;
45use crate::matchers::Matcher;
46use crate::merge::Diff;
47use crate::merge::Merge;
48use crate::merge::MergeBuilder;
49use crate::merge::MergedTreeVal;
50use crate::merge::MergedTreeValue;
51use crate::repo_path::RepoPath;
52use crate::repo_path::RepoPathBuf;
53use crate::repo_path::RepoPathComponent;
54use crate::store::Store;
55use crate::tree::Tree;
56use crate::tree_merge::merge_trees;
57
58#[derive(Clone)]
61pub struct MergedTree {
62 store: Arc<Store>,
63 tree_ids: Merge<TreeId>,
64 labels: ConflictLabels,
65}
66
67impl fmt::Debug for MergedTree {
68 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
69 f.debug_struct("MergedTree")
70 .field("tree_ids", &self.tree_ids)
71 .field("labels", &self.labels)
72 .finish_non_exhaustive()
73 }
74}
75
76impl MergedTree {
77 pub fn resolved(store: Arc<Store>, tree_id: TreeId) -> Self {
79 Self {
80 store,
81 tree_ids: Merge::resolved(tree_id),
82 labels: ConflictLabels::unlabeled(),
83 }
84 }
85
86 pub fn new(store: Arc<Store>, tree_ids: Merge<TreeId>, labels: ConflictLabels) -> Self {
88 if let Some(num_sides) = labels.num_sides() {
89 assert_eq!(tree_ids.num_sides(), num_sides);
90 }
91 Self {
92 store,
93 tree_ids,
94 labels,
95 }
96 }
97
98 pub fn store(&self) -> &Arc<Store> {
100 &self.store
101 }
102
103 pub fn tree_ids(&self) -> &Merge<TreeId> {
106 &self.tree_ids
107 }
108
109 pub fn into_tree_ids(self) -> Merge<TreeId> {
112 self.tree_ids
113 }
114
115 pub fn labels(&self) -> &ConflictLabels {
117 &self.labels
118 }
119
120 pub fn tree_ids_and_labels(&self) -> (&Merge<TreeId>, &ConflictLabels) {
124 (&self.tree_ids, &self.labels)
125 }
126
127 pub fn into_tree_ids_and_labels(self) -> (Merge<TreeId>, ConflictLabels) {
129 (self.tree_ids, self.labels)
130 }
131
132 pub async fn trees(&self) -> BackendResult<Merge<Tree>> {
134 self.tree_ids
135 .try_map_async(|id| self.store.get_tree_async(RepoPathBuf::root(), id))
136 .await
137 }
138
139 pub fn labels_by_term<'a>(&'a self, label: &'a str) -> Merge<&'a str> {
143 if self.tree_ids.is_resolved() {
144 assert!(!self.labels.has_labels());
145 Merge::resolved(label)
146 } else if self.labels.has_labels() {
147 let labels = self.labels.as_merge();
151 assert_eq!(labels.num_sides(), self.tree_ids.num_sides());
152 labels.map(|label| label.as_str())
153 } else {
154 Merge::repeated("", self.tree_ids.num_sides())
159 }
160 }
161
162 pub async fn resolve(self) -> BackendResult<Self> {
165 let merged = merge_trees(&self.store, self.tree_ids).await?;
166 let (simplified_labels, simplified) = if merged.is_resolved() {
171 (ConflictLabels::unlabeled(), merged)
172 } else {
173 self.labels.simplify_with(&merged)
174 };
175 if cfg!(debug_assertions) {
179 let re_merged = merge_trees(&self.store, simplified.clone()).await.unwrap();
180 debug_assert_eq!(re_merged, simplified);
181 }
182 Ok(Self {
183 store: self.store,
184 tree_ids: simplified,
185 labels: simplified_labels,
186 })
187 }
188
189 pub fn conflicts(
194 &self,
195 ) -> impl Iterator<Item = (RepoPathBuf, BackendResult<MergedTreeValue>)> + use<> {
196 self.conflicts_matching(&EverythingMatcher)
197 }
198
199 pub fn conflicts_matching<'matcher>(
201 &self,
202 matcher: &'matcher dyn Matcher,
203 ) -> impl Iterator<Item = (RepoPathBuf, BackendResult<MergedTreeValue>)> + use<'matcher> {
204 ConflictIterator::new(self, matcher)
205 }
206
207 pub fn has_conflict(&self) -> bool {
209 !self.tree_ids.is_resolved()
210 }
211
212 pub fn path_value(&self, path: &RepoPath) -> BackendResult<MergedTreeValue> {
216 self.path_value_async(path).block_on()
217 }
218
219 pub async fn path_value_async(&self, path: &RepoPath) -> BackendResult<MergedTreeValue> {
221 match path.split() {
222 Some((dir, basename)) => {
223 let trees = self.trees().await?;
224 match trees.sub_tree_recursive(dir).await? {
225 None => Ok(Merge::absent()),
226 Some(tree) => Ok(tree.value(basename).cloned()),
227 }
228 }
229 None => Ok(self.to_merged_tree_value()),
230 }
231 }
232
233 fn to_merged_tree_value(&self) -> MergedTreeValue {
234 self.tree_ids
235 .map(|tree_id| Some(TreeValue::Tree(tree_id.clone())))
236 }
237
238 pub fn entries(&self) -> TreeEntriesIterator<'static> {
248 self.entries_matching(&EverythingMatcher)
249 }
250
251 pub fn entries_matching<'matcher>(
253 &self,
254 matcher: &'matcher dyn Matcher,
255 ) -> TreeEntriesIterator<'matcher> {
256 TreeEntriesIterator::new(self, matcher)
257 }
258
259 fn diff_stream_internal<'matcher>(
261 &self,
262 other: &Self,
263 matcher: &'matcher dyn Matcher,
264 ) -> TreeDiffStream<'matcher> {
265 let concurrency = self.store().concurrency();
266 if concurrency <= 1 {
267 Box::pin(futures::stream::iter(TreeDiffIterator::new(
268 self, other, matcher,
269 )))
270 } else {
271 Box::pin(TreeDiffStreamImpl::new(self, other, matcher, concurrency))
272 }
273 }
274
275 pub fn diff_stream<'matcher>(
277 &self,
278 other: &Self,
279 matcher: &'matcher dyn Matcher,
280 ) -> TreeDiffStream<'matcher> {
281 stream_without_trees(self.diff_stream_internal(other, matcher))
282 }
283
284 pub fn diff_stream_with_trees<'matcher>(
286 &self,
287 other: &Self,
288 matcher: &'matcher dyn Matcher,
289 ) -> TreeDiffStream<'matcher> {
290 self.diff_stream_internal(other, matcher)
291 }
292
293 pub fn diff_stream_for_file_system<'matcher>(
296 &self,
297 other: &Self,
298 matcher: &'matcher dyn Matcher,
299 ) -> TreeDiffStream<'matcher> {
300 Box::pin(DiffStreamForFileSystem::new(
301 self.diff_stream_internal(other, matcher),
302 ))
303 }
304
305 pub fn diff_stream_with_copies<'a>(
307 &self,
308 other: &Self,
309 matcher: &'a dyn Matcher,
310 copy_records: &'a CopyRecords,
311 ) -> BoxStream<'a, CopiesTreeDiffEntry> {
312 let stream = self.diff_stream(other, matcher);
313 Box::pin(CopiesTreeDiffStream::new(
314 stream,
315 self.clone(),
316 other.clone(),
317 copy_records,
318 ))
319 }
320
321 pub async fn merge(merge: Merge<(Self, String)>) -> BackendResult<Self> {
327 Self::merge_no_resolve(merge).resolve().await
328 }
329
330 pub fn merge_no_resolve(merge: Merge<(Self, String)>) -> Self {
333 debug_assert!(
334 merge
335 .iter()
336 .map(|(tree, _)| Arc::as_ptr(tree.store()))
337 .all_equal()
338 );
339 let store = merge.first().0.store().clone();
340 let flattened_labels = ConflictLabels::from_merge(
341 merge
342 .map(|(tree, label)| tree.labels_by_term(label))
343 .flatten()
344 .map(|&label| label.to_owned()),
345 );
346 let flattened_tree_ids: Merge<TreeId> = merge
347 .into_iter()
348 .map(|(tree, _label)| tree.into_tree_ids())
349 .collect::<MergeBuilder<_>>()
350 .build()
351 .flatten();
352
353 let (labels, tree_ids) = flattened_labels.simplify_with(&flattened_tree_ids);
354 Self::new(store, tree_ids, labels)
355 }
356}
357
358#[derive(Debug)]
360pub struct TreeDiffEntry {
361 pub path: RepoPathBuf,
363 pub values: BackendResult<Diff<MergedTreeValue>>,
365}
366
367pub type TreeDiffStream<'matcher> = BoxStream<'matcher, TreeDiffEntry>;
371
372fn all_tree_entries(
373 trees: &Merge<Tree>,
374) -> impl Iterator<Item = (&RepoPathComponent, MergedTreeVal<'_>)> {
375 if let Some(tree) = trees.as_resolved() {
376 let iter = tree
377 .entries_non_recursive()
378 .map(|entry| (entry.name(), Merge::normal(entry.value())));
379 Either::Left(iter)
380 } else {
381 let same_change = trees.first().store().merge_options().same_change;
382 let iter = all_merged_tree_entries(trees).map(move |(name, values)| {
383 let values = match values.resolve_trivial(same_change) {
385 Some(resolved) => Merge::resolved(*resolved),
386 None => values,
387 };
388 (name, values)
389 });
390 Either::Right(iter)
391 }
392}
393
394pub fn all_merged_tree_entries(
398 trees: &Merge<Tree>,
399) -> impl Iterator<Item = (&RepoPathComponent, MergedTreeVal<'_>)> {
400 let mut entries_iters = trees
401 .iter()
402 .map(|tree| tree.entries_non_recursive().peekable())
403 .collect_vec();
404 iter::from_fn(move || {
405 let next_name = entries_iters
406 .iter_mut()
407 .filter_map(|iter| iter.peek())
408 .map(|entry| entry.name())
409 .min()?;
410 let values: MergeBuilder<_> = entries_iters
411 .iter_mut()
412 .map(|iter| {
413 let entry = iter.next_if(|entry| entry.name() == next_name)?;
414 Some(entry.value())
415 })
416 .collect();
417 Some((next_name, values.build()))
418 })
419}
420
421fn merged_tree_entry_diff<'a>(
422 trees1: &'a Merge<Tree>,
423 trees2: &'a Merge<Tree>,
424) -> impl Iterator<Item = (&'a RepoPathComponent, Diff<MergedTreeVal<'a>>)> {
425 itertools::merge_join_by(
426 all_tree_entries(trees1),
427 all_tree_entries(trees2),
428 |(name1, _), (name2, _)| name1.cmp(name2),
429 )
430 .map(|entry| match entry {
431 EitherOrBoth::Both((name, value1), (_, value2)) => (name, Diff::new(value1, value2)),
432 EitherOrBoth::Left((name, value1)) => (name, Diff::new(value1, Merge::absent())),
433 EitherOrBoth::Right((name, value2)) => (name, Diff::new(Merge::absent(), value2)),
434 })
435 .filter(|(_, diff)| diff.is_changed())
436}
437
438pub struct TreeEntriesIterator<'matcher> {
440 store: Arc<Store>,
441 stack: Vec<TreeEntriesDirItem>,
442 matcher: &'matcher dyn Matcher,
443}
444
445struct TreeEntriesDirItem {
446 entries: Vec<(RepoPathBuf, MergedTreeValue)>,
447}
448
449impl TreeEntriesDirItem {
450 fn new(trees: &Merge<Tree>, matcher: &dyn Matcher) -> Self {
451 let mut entries = vec![];
452 let dir = trees.first().dir();
453 for (name, value) in all_tree_entries(trees) {
454 let path = dir.join(name);
455 if value.is_tree() {
456 if matcher.visit(&path).is_nothing() {
458 continue;
459 }
460 } else if !matcher.matches(&path) {
461 continue;
462 }
463 entries.push((path, value.cloned()));
464 }
465 entries.reverse();
466 Self { entries }
467 }
468}
469
470impl<'matcher> TreeEntriesIterator<'matcher> {
471 fn new(trees: &MergedTree, matcher: &'matcher dyn Matcher) -> Self {
472 Self {
473 store: trees.store.clone(),
474 stack: vec![TreeEntriesDirItem {
475 entries: vec![(RepoPathBuf::root(), trees.to_merged_tree_value())],
476 }],
477 matcher,
478 }
479 }
480}
481
482impl Iterator for TreeEntriesIterator<'_> {
483 type Item = (RepoPathBuf, BackendResult<MergedTreeValue>);
484
485 fn next(&mut self) -> Option<Self::Item> {
486 while let Some(top) = self.stack.last_mut() {
487 if let Some((path, value)) = top.entries.pop() {
488 let maybe_trees = match value.to_tree_merge(&self.store, &path).block_on() {
489 Ok(maybe_trees) => maybe_trees,
490 Err(err) => return Some((path, Err(err))),
491 };
492 if let Some(trees) = maybe_trees {
493 self.stack
494 .push(TreeEntriesDirItem::new(&trees, self.matcher));
495 } else {
496 return Some((path, Ok(value)));
497 }
498 } else {
499 self.stack.pop();
500 }
501 }
502 None
503 }
504}
505
506struct ConflictsDirItem {
509 entries: Vec<(RepoPathBuf, MergedTreeValue)>,
510}
511
512impl ConflictsDirItem {
513 fn new(trees: &Merge<Tree>, matcher: &dyn Matcher) -> Self {
514 if trees.is_resolved() {
515 return Self { entries: vec![] };
516 }
517
518 let dir = trees.first().dir();
519 let mut entries = vec![];
520 for (basename, value) in all_tree_entries(trees) {
521 if value.is_resolved() {
522 continue;
523 }
524 let path = dir.join(basename);
525 if value.is_tree() {
526 if matcher.visit(&path).is_nothing() {
527 continue;
528 }
529 } else if !matcher.matches(&path) {
530 continue;
531 }
532 entries.push((path, value.cloned()));
533 }
534 entries.reverse();
535 Self { entries }
536 }
537}
538
539struct ConflictIterator<'matcher> {
540 store: Arc<Store>,
541 stack: Vec<ConflictsDirItem>,
542 matcher: &'matcher dyn Matcher,
543}
544
545impl<'matcher> ConflictIterator<'matcher> {
546 fn new(tree: &MergedTree, matcher: &'matcher dyn Matcher) -> Self {
547 Self {
548 store: tree.store().clone(),
549 stack: vec![ConflictsDirItem {
550 entries: vec![(RepoPathBuf::root(), tree.to_merged_tree_value())],
551 }],
552 matcher,
553 }
554 }
555}
556
557impl Iterator for ConflictIterator<'_> {
558 type Item = (RepoPathBuf, BackendResult<MergedTreeValue>);
559
560 fn next(&mut self) -> Option<Self::Item> {
561 while let Some(top) = self.stack.last_mut() {
562 if let Some((path, tree_values)) = top.entries.pop() {
563 match tree_values.to_tree_merge(&self.store, &path).block_on() {
564 Ok(Some(trees)) => {
565 self.stack.push(ConflictsDirItem::new(&trees, self.matcher));
567 }
568 Ok(None) => {
569 return Some((path, Ok(tree_values)));
573 }
574 Err(err) => {
575 return Some((path, Err(err)));
576 }
577 }
578 } else {
579 self.stack.pop();
580 }
581 }
582 None
583 }
584}
585
586pub struct TreeDiffIterator<'matcher> {
588 store: Arc<Store>,
589 stack: Vec<TreeDiffDir>,
590 matcher: &'matcher dyn Matcher,
591}
592
593struct TreeDiffDir {
594 entries: Vec<(RepoPathBuf, Diff<MergedTreeValue>)>,
595}
596
597impl<'matcher> TreeDiffIterator<'matcher> {
598 pub fn new(tree1: &MergedTree, tree2: &MergedTree, matcher: &'matcher dyn Matcher) -> Self {
600 assert!(Arc::ptr_eq(tree1.store(), tree2.store()));
601 let root_dir = RepoPath::root();
602 let mut stack = Vec::new();
603 let root_diff = Diff::new(tree1.to_merged_tree_value(), tree2.to_merged_tree_value());
604 if root_diff.is_changed() && !matcher.visit(root_dir).is_nothing() {
605 stack.push(TreeDiffDir {
606 entries: vec![(root_dir.to_owned(), root_diff)],
607 });
608 }
609 Self {
610 store: tree1.store().clone(),
611 stack,
612 matcher,
613 }
614 }
615
616 fn trees(
618 store: &Arc<Store>,
619 dir: &RepoPath,
620 values: &MergedTreeValue,
621 ) -> BackendResult<Merge<Tree>> {
622 if let Some(trees) = values.to_tree_merge(store, dir).block_on()? {
623 Ok(trees)
624 } else {
625 Ok(Merge::resolved(Tree::empty(store.clone(), dir.to_owned())))
626 }
627 }
628}
629
630impl TreeDiffDir {
631 fn from_trees(
632 dir: &RepoPath,
633 trees1: &Merge<Tree>,
634 trees2: &Merge<Tree>,
635 matcher: &dyn Matcher,
636 ) -> Self {
637 let mut entries = vec![];
638 for (name, diff) in merged_tree_entry_diff(trees1, trees2) {
639 let path = dir.join(name);
640 let tree_before = diff.before.is_tree();
641 let tree_after = diff.after.is_tree();
642 let tree_matches = (tree_before || tree_after) && !matcher.visit(&path).is_nothing();
645 let file_matches = (!tree_before || !tree_after) && matcher.matches(&path);
646
647 let before = if (tree_before && tree_matches) || (!tree_before && file_matches) {
649 diff.before
650 } else {
651 Merge::absent()
652 };
653 let after = if (tree_after && tree_matches) || (!tree_after && file_matches) {
654 diff.after
655 } else {
656 Merge::absent()
657 };
658 if before.is_absent() && after.is_absent() {
659 continue;
660 }
661 entries.push((path, Diff::new(before.cloned(), after.cloned())));
662 }
663 entries.reverse();
664 Self { entries }
665 }
666}
667
668impl Iterator for TreeDiffIterator<'_> {
669 type Item = TreeDiffEntry;
670
671 fn next(&mut self) -> Option<Self::Item> {
672 while let Some(top) = self.stack.last_mut() {
673 let Some((path, diff)) = top.entries.pop() else {
674 self.stack.pop().unwrap();
675 continue;
676 };
677
678 if diff.before.is_tree() || diff.after.is_tree() {
679 let (before_tree, after_tree) = match (
680 Self::trees(&self.store, &path, &diff.before),
681 Self::trees(&self.store, &path, &diff.after),
682 ) {
683 (Ok(before_tree), Ok(after_tree)) => (before_tree, after_tree),
684 (Err(before_err), _) => {
685 return Some(TreeDiffEntry {
686 path,
687 values: Err(before_err),
688 });
689 }
690 (_, Err(after_err)) => {
691 return Some(TreeDiffEntry {
692 path,
693 values: Err(after_err),
694 });
695 }
696 };
697 let subdir =
698 TreeDiffDir::from_trees(&path, &before_tree, &after_tree, self.matcher);
699 self.stack.push(subdir);
700 }
701 if diff.before.is_file_like()
702 || diff.after.is_file_like()
703 || self.matcher.matches(&path)
704 {
705 return Some(TreeDiffEntry {
706 path,
707 values: Ok(diff),
708 });
709 }
710 }
711 None
712 }
713}
714
715pub struct TreeDiffStreamImpl<'matcher> {
717 store: Arc<Store>,
718 matcher: &'matcher dyn Matcher,
719 items: BTreeMap<RepoPathBuf, BackendResult<Diff<MergedTreeValue>>>,
724 #[expect(clippy::type_complexity)]
726 pending_trees:
727 BTreeMap<RepoPathBuf, BoxFuture<'matcher, BackendResult<(Merge<Tree>, Merge<Tree>)>>>,
728 max_concurrent_reads: usize,
733 max_queued_items: usize,
739}
740
741impl<'matcher> TreeDiffStreamImpl<'matcher> {
742 pub fn new(
745 tree1: &MergedTree,
746 tree2: &MergedTree,
747 matcher: &'matcher dyn Matcher,
748 max_concurrent_reads: usize,
749 ) -> Self {
750 assert!(Arc::ptr_eq(tree1.store(), tree2.store()));
751 let store = tree1.store().clone();
752 let mut stream = Self {
753 store: store.clone(),
754 matcher,
755 items: BTreeMap::new(),
756 pending_trees: BTreeMap::new(),
757 max_concurrent_reads,
758 max_queued_items: 10000,
759 };
760 let dir = RepoPathBuf::root();
761 let merged_tree1 = tree1.to_merged_tree_value();
762 let merged_tree2 = tree2.to_merged_tree_value();
763 let root_diff = Diff::new(merged_tree1.clone(), merged_tree2.clone());
764 if root_diff.is_changed() && matcher.matches(&dir) {
765 stream.items.insert(dir.clone(), Ok(root_diff));
766 }
767 let root_tree_fut = Box::pin(try_join(
768 Self::trees(store.clone(), dir.clone(), merged_tree1),
769 Self::trees(store, dir.clone(), merged_tree2),
770 ));
771 stream.pending_trees.insert(dir, root_tree_fut);
772 stream
773 }
774
775 async fn single_tree(
776 store: &Arc<Store>,
777 dir: RepoPathBuf,
778 value: Option<&TreeValue>,
779 ) -> BackendResult<Tree> {
780 match value {
781 Some(TreeValue::Tree(tree_id)) => store.get_tree_async(dir, tree_id).await,
782 _ => Ok(Tree::empty(store.clone(), dir.clone())),
783 }
784 }
785
786 async fn trees(
788 store: Arc<Store>,
789 dir: RepoPathBuf,
790 values: MergedTreeValue,
791 ) -> BackendResult<Merge<Tree>> {
792 if values.is_tree() {
793 values
794 .try_map_async(|value| Self::single_tree(&store, dir.clone(), value.as_ref()))
795 .await
796 } else {
797 Ok(Merge::resolved(Tree::empty(store, dir)))
798 }
799 }
800
801 fn add_dir_diff_items(&mut self, dir: &RepoPath, trees1: &Merge<Tree>, trees2: &Merge<Tree>) {
802 for (basename, diff) in merged_tree_entry_diff(trees1, trees2) {
803 let path = dir.join(basename);
804 let tree_before = diff.before.is_tree();
805 let tree_after = diff.after.is_tree();
806 let tree_matches =
809 (tree_before || tree_after) && !self.matcher.visit(&path).is_nothing();
810 let file_matches = (!tree_before || !tree_after) && self.matcher.matches(&path);
811
812 let before = if (tree_before && tree_matches) || (!tree_before && file_matches) {
814 diff.before
815 } else {
816 Merge::absent()
817 };
818 let after = if (tree_after && tree_matches) || (!tree_after && file_matches) {
819 diff.after
820 } else {
821 Merge::absent()
822 };
823 if before.is_absent() && after.is_absent() {
824 continue;
825 }
826
827 if tree_matches {
829 let before_tree_future =
830 Self::trees(self.store.clone(), path.clone(), before.cloned());
831 let after_tree_future =
832 Self::trees(self.store.clone(), path.clone(), after.cloned());
833 let both_trees_future = try_join(before_tree_future, after_tree_future);
834 self.pending_trees
835 .insert(path.clone(), Box::pin(both_trees_future));
836 }
837
838 if file_matches || self.matcher.matches(&path) {
839 self.items
840 .insert(path, Ok(Diff::new(before.cloned(), after.cloned())));
841 }
842 }
843 }
844
845 fn poll_tree_futures(&mut self, cx: &mut Context<'_>) {
846 loop {
847 let mut tree_diffs = vec![];
848 let mut some_pending = false;
849 let mut all_pending = true;
850 for (dir, future) in self
851 .pending_trees
852 .iter_mut()
853 .take(self.max_concurrent_reads)
854 {
855 if let Poll::Ready(tree_diff) = future.as_mut().poll(cx) {
856 all_pending = false;
857 tree_diffs.push((dir.clone(), tree_diff));
858 } else {
859 some_pending = true;
860 }
861 }
862
863 for (dir, tree_diff) in tree_diffs {
864 drop(self.pending_trees.remove_entry(&dir).unwrap());
865 match tree_diff {
866 Ok((trees1, trees2)) => {
867 self.add_dir_diff_items(&dir, &trees1, &trees2);
868 }
869 Err(err) => {
870 self.items.insert(dir, Err(err));
871 }
872 }
873 }
874
875 if all_pending || (some_pending && self.items.len() >= self.max_queued_items) {
879 return;
880 }
881 }
882 }
883}
884
885impl Stream for TreeDiffStreamImpl<'_> {
886 type Item = TreeDiffEntry;
887
888 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
889 self.poll_tree_futures(cx);
891
892 if let Some((path, _)) = self.items.first_key_value() {
894 if let Some((dir, _)) = self.pending_trees.first_key_value()
897 && dir < path
898 {
899 return Poll::Pending;
900 }
901
902 let (path, values) = self.items.pop_first().unwrap();
903 Poll::Ready(Some(TreeDiffEntry { path, values }))
904 } else if self.pending_trees.is_empty() {
905 Poll::Ready(None)
906 } else {
907 Poll::Pending
908 }
909 }
910}
911
912fn stream_without_trees(stream: TreeDiffStream) -> TreeDiffStream {
913 Box::pin(stream.filter_map(|mut entry| async move {
914 let skip_tree = |merge: MergedTreeValue| {
915 if merge.is_tree() {
916 Merge::absent()
917 } else {
918 merge
919 }
920 };
921 entry.values = entry.values.map(|diff| diff.map(skip_tree));
922
923 let any_present = entry.values.as_ref().map_or(true, |diff| {
925 diff.before.is_present() || diff.after.is_present()
926 });
927 any_present.then_some(entry)
928 }))
929}
930
931struct DiffStreamForFileSystem<'a> {
934 inner: TreeDiffStream<'a>,
935 next_item: Option<TreeDiffEntry>,
936 held_file: Option<TreeDiffEntry>,
937}
938
939impl<'a> DiffStreamForFileSystem<'a> {
940 fn new(inner: TreeDiffStream<'a>) -> Self {
941 Self {
942 inner,
943 next_item: None,
944 held_file: None,
945 }
946 }
947}
948
949impl Stream for DiffStreamForFileSystem<'_> {
950 type Item = TreeDiffEntry;
951
952 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
953 while let Some(next) = match self.next_item.take() {
954 Some(next) => Some(next),
955 None => ready!(self.inner.as_mut().poll_next(cx)),
956 } {
957 if let Ok(diff) = &next.values
960 && !diff.before.is_file_like()
961 && !diff.after.is_file_like()
962 {
963 continue;
964 }
965
966 if let Some(held_entry) = self
970 .held_file
971 .take_if(|held_entry| !next.path.starts_with(&held_entry.path))
972 {
973 self.next_item = Some(next);
974 return Poll::Ready(Some(held_entry));
975 }
976
977 match next.values {
978 Ok(diff) if diff.before.is_tree() => {
979 assert!(diff.after.is_present());
980 assert!(self.held_file.is_none());
981 self.held_file = Some(TreeDiffEntry {
982 path: next.path,
983 values: Ok(Diff::new(Merge::absent(), diff.after)),
984 });
985 }
986 Ok(diff) if diff.after.is_tree() => {
987 assert!(diff.before.is_present());
988 return Poll::Ready(Some(TreeDiffEntry {
989 path: next.path,
990 values: Ok(Diff::new(diff.before, Merge::absent())),
991 }));
992 }
993 _ => {
994 return Poll::Ready(Some(next));
995 }
996 }
997 }
998 Poll::Ready(self.held_file.take())
999 }
1000}