1use std::collections::BTreeMap;
18use std::fmt;
19use std::iter;
20use std::pin::Pin;
21use std::sync::Arc;
22use std::task::Context;
23use std::task::Poll;
24use std::task::ready;
25use std::vec;
26
27use either::Either;
28use futures::Stream;
29use futures::StreamExt as _;
30use futures::future::BoxFuture;
31use futures::future::try_join;
32use futures::stream::BoxStream;
33use itertools::EitherOrBoth;
34use itertools::Itertools as _;
35use pollster::FutureExt as _;
36
37use crate::backend::BackendResult;
38use crate::backend::TreeId;
39use crate::backend::TreeValue;
40use crate::conflict_labels::ConflictLabels;
41use crate::copies::CopiesTreeDiffEntry;
42use crate::copies::CopiesTreeDiffStream;
43use crate::copies::CopyRecords;
44use crate::matchers::EverythingMatcher;
45use crate::matchers::Matcher;
46use crate::merge::Diff;
47use crate::merge::Merge;
48use crate::merge::MergeBuilder;
49use crate::merge::MergedTreeVal;
50use crate::merge::MergedTreeValue;
51use crate::repo_path::RepoPath;
52use crate::repo_path::RepoPathBuf;
53use crate::repo_path::RepoPathComponent;
54use crate::store::Store;
55use crate::tree::Tree;
56use crate::tree_merge::merge_trees;
57
58#[derive(Clone)]
61pub struct MergedTree {
62 store: Arc<Store>,
63 tree_ids: Merge<TreeId>,
64 labels: ConflictLabels,
65}
66
67impl fmt::Debug for MergedTree {
68 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
69 f.debug_struct("MergedTree")
70 .field("tree_ids", &self.tree_ids)
71 .field("labels", &self.labels)
72 .finish_non_exhaustive()
73 }
74}
75
76impl MergedTree {
77 pub fn resolved(store: Arc<Store>, tree_id: TreeId) -> Self {
79 Self {
80 store,
81 tree_ids: Merge::resolved(tree_id),
82 labels: ConflictLabels::unlabeled(),
83 }
84 }
85
86 pub fn new(store: Arc<Store>, tree_ids: Merge<TreeId>, labels: ConflictLabels) -> Self {
88 if let Some(num_sides) = labels.num_sides() {
89 assert_eq!(tree_ids.num_sides(), num_sides);
90 }
91 Self {
92 store,
93 tree_ids,
94 labels,
95 }
96 }
97
98 pub fn store(&self) -> &Arc<Store> {
100 &self.store
101 }
102
103 pub fn tree_ids(&self) -> &Merge<TreeId> {
106 &self.tree_ids
107 }
108
109 pub fn into_tree_ids(self) -> Merge<TreeId> {
112 self.tree_ids
113 }
114
115 pub fn labels(&self) -> &ConflictLabels {
117 &self.labels
118 }
119
120 pub fn tree_ids_and_labels(&self) -> (&Merge<TreeId>, &ConflictLabels) {
124 (&self.tree_ids, &self.labels)
125 }
126
127 pub fn into_tree_ids_and_labels(self) -> (Merge<TreeId>, ConflictLabels) {
129 (self.tree_ids, self.labels)
130 }
131
132 pub async fn trees(&self) -> BackendResult<Merge<Tree>> {
134 self.tree_ids
135 .try_map_async(|id| self.store.get_tree_async(RepoPathBuf::root(), id))
136 .await
137 }
138
139 pub fn labels_by_term<'a>(&'a self, label: &'a str) -> Merge<&'a str> {
143 if self.tree_ids.is_resolved() {
144 assert!(!self.labels.has_labels());
145 Merge::resolved(label)
146 } else if self.labels.has_labels() {
147 let labels = self.labels.as_merge();
151 assert_eq!(labels.num_sides(), self.tree_ids.num_sides());
152 labels.map(|label| label.as_str())
153 } else {
154 Merge::repeated("", self.tree_ids.num_sides())
159 }
160 }
161
162 pub async fn resolve(self) -> BackendResult<Self> {
165 let merged = merge_trees(&self.store, self.tree_ids).await?;
166 let (simplified_labels, simplified) = if merged.is_resolved() {
171 (ConflictLabels::unlabeled(), merged)
172 } else {
173 self.labels.simplify_with(&merged)
174 };
175 if cfg!(debug_assertions) {
179 let re_merged = merge_trees(&self.store, simplified.clone()).await.unwrap();
180 debug_assert_eq!(re_merged, simplified);
181 }
182 Ok(Self {
183 store: self.store,
184 tree_ids: simplified,
185 labels: simplified_labels,
186 })
187 }
188
189 pub fn conflicts(
194 &self,
195 ) -> impl Iterator<Item = (RepoPathBuf, BackendResult<MergedTreeValue>)> + use<> {
196 self.conflicts_matching(&EverythingMatcher)
197 }
198
199 pub fn conflicts_matching<'matcher>(
201 &self,
202 matcher: &'matcher dyn Matcher,
203 ) -> impl Iterator<Item = (RepoPathBuf, BackendResult<MergedTreeValue>)> + use<'matcher> {
204 ConflictIterator::new(self, matcher)
205 }
206
207 pub fn has_conflict(&self) -> bool {
209 !self.tree_ids.is_resolved()
210 }
211
212 pub fn path_value(&self, path: &RepoPath) -> BackendResult<MergedTreeValue> {
216 self.path_value_async(path).block_on()
217 }
218
219 pub async fn path_value_async(&self, path: &RepoPath) -> BackendResult<MergedTreeValue> {
221 match path.split() {
222 Some((dir, basename)) => {
223 let trees = self.trees().await?;
224 match trees.sub_tree_recursive(dir).await? {
225 None => Ok(Merge::absent()),
226 Some(tree) => Ok(tree.value(basename).cloned()),
227 }
228 }
229 None => Ok(self.to_merged_tree_value()),
230 }
231 }
232
233 fn to_merged_tree_value(&self) -> MergedTreeValue {
234 self.tree_ids
235 .map(|tree_id| Some(TreeValue::Tree(tree_id.clone())))
236 }
237
238 pub fn entries(&self) -> TreeEntriesIterator<'static> {
248 self.entries_matching(&EverythingMatcher)
249 }
250
251 pub fn entries_matching<'matcher>(
253 &self,
254 matcher: &'matcher dyn Matcher,
255 ) -> TreeEntriesIterator<'matcher> {
256 TreeEntriesIterator::new(self, matcher)
257 }
258
259 fn diff_stream_internal<'matcher>(
261 &self,
262 other: &Self,
263 matcher: &'matcher dyn Matcher,
264 ) -> TreeDiffStream<'matcher> {
265 let concurrency = self.store().concurrency();
266 if concurrency <= 1 {
267 Box::pin(futures::stream::iter(TreeDiffIterator::new(
268 self, other, matcher,
269 )))
270 } else {
271 Box::pin(TreeDiffStreamImpl::new(self, other, matcher, concurrency))
272 }
273 }
274
275 pub fn diff_stream<'matcher>(
277 &self,
278 other: &Self,
279 matcher: &'matcher dyn Matcher,
280 ) -> TreeDiffStream<'matcher> {
281 stream_without_trees(self.diff_stream_internal(other, matcher))
282 }
283
284 pub fn diff_stream_with_trees<'matcher>(
286 &self,
287 other: &Self,
288 matcher: &'matcher dyn Matcher,
289 ) -> TreeDiffStream<'matcher> {
290 self.diff_stream_internal(other, matcher)
291 }
292
293 pub fn diff_stream_for_file_system<'matcher>(
296 &self,
297 other: &Self,
298 matcher: &'matcher dyn Matcher,
299 ) -> TreeDiffStream<'matcher> {
300 Box::pin(DiffStreamForFileSystem::new(
301 self.diff_stream_internal(other, matcher),
302 ))
303 }
304
305 pub fn diff_stream_with_copies<'a>(
307 &self,
308 other: &Self,
309 matcher: &'a dyn Matcher,
310 copy_records: &'a CopyRecords,
311 ) -> BoxStream<'a, CopiesTreeDiffEntry> {
312 let stream = self.diff_stream(other, matcher);
313 Box::pin(CopiesTreeDiffStream::new(
314 stream,
315 self.clone(),
316 other.clone(),
317 copy_records,
318 ))
319 }
320
321 pub async fn merge(merge: Merge<(Self, String)>) -> BackendResult<Self> {
327 Self::merge_no_resolve(merge).resolve().await
328 }
329
330 pub fn merge_no_resolve(merge: Merge<(Self, String)>) -> Self {
333 debug_assert!(
334 merge
335 .iter()
336 .map(|(tree, _)| Arc::as_ptr(tree.store()))
337 .all_equal()
338 );
339 let store = merge.first().0.store().clone();
340 let flattened_labels = ConflictLabels::from_merge(
341 merge
342 .map(|(tree, label)| tree.labels_by_term(label))
343 .flatten()
344 .map(|&label| label.to_owned()),
345 );
346 let flattened_tree_ids: Merge<TreeId> = merge
347 .into_iter()
348 .map(|(tree, _label)| tree.into_tree_ids())
349 .collect::<MergeBuilder<_>>()
350 .build()
351 .flatten();
352
353 let (labels, tree_ids) = flattened_labels.simplify_with(&flattened_tree_ids);
354 Self::new(store, tree_ids, labels)
355 }
356}
357
358#[derive(Debug)]
360pub struct TreeDiffEntry {
361 pub path: RepoPathBuf,
363 pub values: BackendResult<Diff<MergedTreeValue>>,
365}
366
367pub type TreeDiffStream<'matcher> = BoxStream<'matcher, TreeDiffEntry>;
371
372fn all_tree_entries(
373 trees: &Merge<Tree>,
374) -> impl Iterator<Item = (&RepoPathComponent, MergedTreeVal<'_>)> {
375 if let Some(tree) = trees.as_resolved() {
376 let iter = tree
377 .entries_non_recursive()
378 .map(|entry| (entry.name(), Merge::normal(entry.value())));
379 Either::Left(iter)
380 } else {
381 let same_change = trees.first().store().merge_options().same_change;
382 let iter = all_merged_tree_entries(trees).map(move |(name, values)| {
383 let values = match values.resolve_trivial(same_change) {
385 Some(resolved) => Merge::resolved(*resolved),
386 None => values,
387 };
388 (name, values)
389 });
390 Either::Right(iter)
391 }
392}
393
394pub fn all_merged_tree_entries(
398 trees: &Merge<Tree>,
399) -> impl Iterator<Item = (&RepoPathComponent, MergedTreeVal<'_>)> {
400 let mut entries_iters = trees
401 .iter()
402 .map(|tree| tree.entries_non_recursive().peekable())
403 .collect_vec();
404 iter::from_fn(move || {
405 let next_name = entries_iters
406 .iter_mut()
407 .filter_map(|iter| iter.peek())
408 .map(|entry| entry.name())
409 .min()?;
410 let values: MergeBuilder<_> = entries_iters
411 .iter_mut()
412 .map(|iter| {
413 let entry = iter.next_if(|entry| entry.name() == next_name)?;
414 Some(entry.value())
415 })
416 .collect();
417 Some((next_name, values.build()))
418 })
419}
420
421fn merged_tree_entry_diff<'a>(
422 trees1: &'a Merge<Tree>,
423 trees2: &'a Merge<Tree>,
424) -> impl Iterator<Item = (&'a RepoPathComponent, Diff<MergedTreeVal<'a>>)> {
425 itertools::merge_join_by(
426 all_tree_entries(trees1),
427 all_tree_entries(trees2),
428 |(name1, _), (name2, _)| name1.cmp(name2),
429 )
430 .map(|entry| match entry {
431 EitherOrBoth::Both((name, value1), (_, value2)) => (name, Diff::new(value1, value2)),
432 EitherOrBoth::Left((name, value1)) => (name, Diff::new(value1, Merge::absent())),
433 EitherOrBoth::Right((name, value2)) => (name, Diff::new(Merge::absent(), value2)),
434 })
435 .filter(|(_, diff)| diff.is_changed())
436}
437
438pub struct TreeEntriesIterator<'matcher> {
440 store: Arc<Store>,
441 stack: Vec<TreeEntriesDirItem>,
442 matcher: &'matcher dyn Matcher,
443}
444
445struct TreeEntriesDirItem {
446 entries: Vec<(RepoPathBuf, MergedTreeValue)>,
447}
448
449impl TreeEntriesDirItem {
450 fn new(trees: &Merge<Tree>, matcher: &dyn Matcher) -> Self {
451 let mut entries = vec![];
452 let dir = trees.first().dir();
453 for (name, value) in all_tree_entries(trees) {
454 let path = dir.join(name);
455 if value.is_tree() {
456 if matcher.visit(&path).is_nothing() {
458 continue;
459 }
460 } else if !matcher.matches(&path) {
461 continue;
462 }
463 entries.push((path, value.cloned()));
464 }
465 entries.reverse();
466 Self { entries }
467 }
468}
469
470impl<'matcher> TreeEntriesIterator<'matcher> {
471 fn new(trees: &MergedTree, matcher: &'matcher dyn Matcher) -> Self {
472 Self {
473 store: trees.store.clone(),
474 stack: vec![TreeEntriesDirItem {
475 entries: vec![(RepoPathBuf::root(), trees.to_merged_tree_value())],
476 }],
477 matcher,
478 }
479 }
480}
481
482impl Iterator for TreeEntriesIterator<'_> {
483 type Item = (RepoPathBuf, BackendResult<MergedTreeValue>);
484
485 fn next(&mut self) -> Option<Self::Item> {
486 while let Some(top) = self.stack.last_mut() {
487 if let Some((path, value)) = top.entries.pop() {
488 let maybe_trees = match value.to_tree_merge(&self.store, &path).block_on() {
489 Ok(maybe_trees) => maybe_trees,
490 Err(err) => return Some((path, Err(err))),
491 };
492 if let Some(trees) = maybe_trees {
493 self.stack
494 .push(TreeEntriesDirItem::new(&trees, self.matcher));
495 } else {
496 return Some((path, Ok(value)));
497 }
498 } else {
499 self.stack.pop();
500 }
501 }
502 None
503 }
504}
505
506struct ConflictsDirItem {
509 entries: Vec<(RepoPathBuf, MergedTreeValue)>,
510}
511
512impl ConflictsDirItem {
513 fn new(trees: &Merge<Tree>, matcher: &dyn Matcher) -> Self {
514 if trees.is_resolved() {
515 return Self { entries: vec![] };
516 }
517
518 let dir = trees.first().dir();
519 let mut entries = vec![];
520 for (basename, value) in all_tree_entries(trees) {
521 if value.is_resolved() {
522 continue;
523 }
524 let path = dir.join(basename);
525 if value.is_tree() {
526 if matcher.visit(&path).is_nothing() {
527 continue;
528 }
529 } else if !matcher.matches(&path) {
530 continue;
531 }
532 entries.push((path, value.cloned()));
533 }
534 entries.reverse();
535 Self { entries }
536 }
537}
538
539struct ConflictIterator<'matcher> {
540 store: Arc<Store>,
541 stack: Vec<ConflictsDirItem>,
542 matcher: &'matcher dyn Matcher,
543}
544
545impl<'matcher> ConflictIterator<'matcher> {
546 fn new(tree: &MergedTree, matcher: &'matcher dyn Matcher) -> Self {
547 Self {
548 store: tree.store().clone(),
549 stack: vec![ConflictsDirItem {
550 entries: vec![(RepoPathBuf::root(), tree.to_merged_tree_value())],
551 }],
552 matcher,
553 }
554 }
555}
556
557impl Iterator for ConflictIterator<'_> {
558 type Item = (RepoPathBuf, BackendResult<MergedTreeValue>);
559
560 fn next(&mut self) -> Option<Self::Item> {
561 while let Some(top) = self.stack.last_mut() {
562 if let Some((path, tree_values)) = top.entries.pop() {
563 match tree_values.to_tree_merge(&self.store, &path).block_on() {
564 Ok(Some(trees)) => {
565 self.stack.push(ConflictsDirItem::new(&trees, self.matcher));
567 }
568 Ok(None) => {
569 return Some((path, Ok(tree_values)));
573 }
574 Err(err) => {
575 return Some((path, Err(err)));
576 }
577 }
578 } else {
579 self.stack.pop();
580 }
581 }
582 None
583 }
584}
585
586pub struct TreeDiffIterator<'matcher> {
588 store: Arc<Store>,
589 stack: Vec<TreeDiffDir>,
590 matcher: &'matcher dyn Matcher,
591}
592
593struct TreeDiffDir {
594 entries: Vec<(RepoPathBuf, Diff<MergedTreeValue>)>,
595}
596
597impl<'matcher> TreeDiffIterator<'matcher> {
598 pub fn new(tree1: &MergedTree, tree2: &MergedTree, matcher: &'matcher dyn Matcher) -> Self {
600 assert!(Arc::ptr_eq(tree1.store(), tree2.store()));
601 let root_dir = RepoPath::root();
602 let mut stack = Vec::new();
603 let root_diff = Diff::new(tree1.to_merged_tree_value(), tree2.to_merged_tree_value());
604 if root_diff.is_changed() && !matcher.visit(root_dir).is_nothing() {
605 stack.push(TreeDiffDir {
606 entries: vec![(root_dir.to_owned(), root_diff)],
607 });
608 }
609 Self {
610 store: tree1.store().clone(),
611 stack,
612 matcher,
613 }
614 }
615
616 fn trees(
618 store: &Arc<Store>,
619 dir: &RepoPath,
620 values: &MergedTreeValue,
621 ) -> BackendResult<Merge<Tree>> {
622 if let Some(trees) = values.to_tree_merge(store, dir).block_on()? {
623 Ok(trees)
624 } else {
625 Ok(Merge::resolved(Tree::empty(store.clone(), dir.to_owned())))
626 }
627 }
628}
629
630impl TreeDiffDir {
631 fn from_trees(
632 dir: &RepoPath,
633 trees1: &Merge<Tree>,
634 trees2: &Merge<Tree>,
635 matcher: &dyn Matcher,
636 ) -> Self {
637 let mut entries = vec![];
638 for (name, diff) in merged_tree_entry_diff(trees1, trees2) {
639 let path = dir.join(name);
640 let tree_before = diff.before.is_tree();
641 let tree_after = diff.after.is_tree();
642 let tree_matches = (tree_before || tree_after) && !matcher.visit(&path).is_nothing();
645 let file_matches = (!tree_before || !tree_after) && matcher.matches(&path);
646
647 let before = if (tree_before && tree_matches) || (!tree_before && file_matches) {
649 diff.before
650 } else {
651 Merge::absent()
652 };
653 let after = if (tree_after && tree_matches) || (!tree_after && file_matches) {
654 diff.after
655 } else {
656 Merge::absent()
657 };
658 if before.is_absent() && after.is_absent() {
659 continue;
660 }
661 entries.push((path, Diff::new(before.cloned(), after.cloned())));
662 }
663 entries.reverse();
664 Self { entries }
665 }
666}
667
668impl Iterator for TreeDiffIterator<'_> {
669 type Item = TreeDiffEntry;
670
671 fn next(&mut self) -> Option<Self::Item> {
672 while let Some(top) = self.stack.last_mut() {
673 let (path, diff) = match top.entries.pop() {
674 Some(entry) => entry,
675 None => {
676 self.stack.pop().unwrap();
677 continue;
678 }
679 };
680
681 if diff.before.is_tree() || diff.after.is_tree() {
682 let (before_tree, after_tree) = match (
683 Self::trees(&self.store, &path, &diff.before),
684 Self::trees(&self.store, &path, &diff.after),
685 ) {
686 (Ok(before_tree), Ok(after_tree)) => (before_tree, after_tree),
687 (Err(before_err), _) => {
688 return Some(TreeDiffEntry {
689 path,
690 values: Err(before_err),
691 });
692 }
693 (_, Err(after_err)) => {
694 return Some(TreeDiffEntry {
695 path,
696 values: Err(after_err),
697 });
698 }
699 };
700 let subdir =
701 TreeDiffDir::from_trees(&path, &before_tree, &after_tree, self.matcher);
702 self.stack.push(subdir);
703 }
704 if diff.before.is_file_like()
705 || diff.after.is_file_like()
706 || self.matcher.matches(&path)
707 {
708 return Some(TreeDiffEntry {
709 path,
710 values: Ok(diff),
711 });
712 }
713 }
714 None
715 }
716}
717
718pub struct TreeDiffStreamImpl<'matcher> {
720 store: Arc<Store>,
721 matcher: &'matcher dyn Matcher,
722 items: BTreeMap<RepoPathBuf, BackendResult<Diff<MergedTreeValue>>>,
727 #[expect(clippy::type_complexity)]
729 pending_trees:
730 BTreeMap<RepoPathBuf, BoxFuture<'matcher, BackendResult<(Merge<Tree>, Merge<Tree>)>>>,
731 max_concurrent_reads: usize,
736 max_queued_items: usize,
742}
743
744impl<'matcher> TreeDiffStreamImpl<'matcher> {
745 pub fn new(
748 tree1: &MergedTree,
749 tree2: &MergedTree,
750 matcher: &'matcher dyn Matcher,
751 max_concurrent_reads: usize,
752 ) -> Self {
753 assert!(Arc::ptr_eq(tree1.store(), tree2.store()));
754 let store = tree1.store().clone();
755 let mut stream = Self {
756 store: store.clone(),
757 matcher,
758 items: BTreeMap::new(),
759 pending_trees: BTreeMap::new(),
760 max_concurrent_reads,
761 max_queued_items: 10000,
762 };
763 let dir = RepoPathBuf::root();
764 let merged_tree1 = tree1.to_merged_tree_value();
765 let merged_tree2 = tree2.to_merged_tree_value();
766 let root_diff = Diff::new(merged_tree1.clone(), merged_tree2.clone());
767 if root_diff.is_changed() && matcher.matches(&dir) {
768 stream.items.insert(dir.clone(), Ok(root_diff));
769 }
770 let root_tree_fut = Box::pin(try_join(
771 Self::trees(store.clone(), dir.clone(), merged_tree1),
772 Self::trees(store, dir.clone(), merged_tree2),
773 ));
774 stream.pending_trees.insert(dir, root_tree_fut);
775 stream
776 }
777
778 async fn single_tree(
779 store: &Arc<Store>,
780 dir: RepoPathBuf,
781 value: Option<&TreeValue>,
782 ) -> BackendResult<Tree> {
783 match value {
784 Some(TreeValue::Tree(tree_id)) => store.get_tree_async(dir, tree_id).await,
785 _ => Ok(Tree::empty(store.clone(), dir.clone())),
786 }
787 }
788
789 async fn trees(
791 store: Arc<Store>,
792 dir: RepoPathBuf,
793 values: MergedTreeValue,
794 ) -> BackendResult<Merge<Tree>> {
795 if values.is_tree() {
796 values
797 .try_map_async(|value| Self::single_tree(&store, dir.clone(), value.as_ref()))
798 .await
799 } else {
800 Ok(Merge::resolved(Tree::empty(store, dir)))
801 }
802 }
803
804 fn add_dir_diff_items(&mut self, dir: &RepoPath, trees1: &Merge<Tree>, trees2: &Merge<Tree>) {
805 for (basename, diff) in merged_tree_entry_diff(trees1, trees2) {
806 let path = dir.join(basename);
807 let tree_before = diff.before.is_tree();
808 let tree_after = diff.after.is_tree();
809 let tree_matches =
812 (tree_before || tree_after) && !self.matcher.visit(&path).is_nothing();
813 let file_matches = (!tree_before || !tree_after) && self.matcher.matches(&path);
814
815 let before = if (tree_before && tree_matches) || (!tree_before && file_matches) {
817 diff.before
818 } else {
819 Merge::absent()
820 };
821 let after = if (tree_after && tree_matches) || (!tree_after && file_matches) {
822 diff.after
823 } else {
824 Merge::absent()
825 };
826 if before.is_absent() && after.is_absent() {
827 continue;
828 }
829
830 if tree_matches {
832 let before_tree_future =
833 Self::trees(self.store.clone(), path.clone(), before.cloned());
834 let after_tree_future =
835 Self::trees(self.store.clone(), path.clone(), after.cloned());
836 let both_trees_future = try_join(before_tree_future, after_tree_future);
837 self.pending_trees
838 .insert(path.clone(), Box::pin(both_trees_future));
839 }
840
841 if file_matches || self.matcher.matches(&path) {
842 self.items
843 .insert(path, Ok(Diff::new(before.cloned(), after.cloned())));
844 }
845 }
846 }
847
848 fn poll_tree_futures(&mut self, cx: &mut Context<'_>) {
849 loop {
850 let mut tree_diffs = vec![];
851 let mut some_pending = false;
852 let mut all_pending = true;
853 for (dir, future) in self
854 .pending_trees
855 .iter_mut()
856 .take(self.max_concurrent_reads)
857 {
858 if let Poll::Ready(tree_diff) = future.as_mut().poll(cx) {
859 all_pending = false;
860 tree_diffs.push((dir.clone(), tree_diff));
861 } else {
862 some_pending = true;
863 }
864 }
865
866 for (dir, tree_diff) in tree_diffs {
867 drop(self.pending_trees.remove_entry(&dir).unwrap());
868 match tree_diff {
869 Ok((trees1, trees2)) => {
870 self.add_dir_diff_items(&dir, &trees1, &trees2);
871 }
872 Err(err) => {
873 self.items.insert(dir, Err(err));
874 }
875 }
876 }
877
878 if all_pending || (some_pending && self.items.len() >= self.max_queued_items) {
882 return;
883 }
884 }
885 }
886}
887
888impl Stream for TreeDiffStreamImpl<'_> {
889 type Item = TreeDiffEntry;
890
891 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
892 self.poll_tree_futures(cx);
894
895 if let Some((path, _)) = self.items.first_key_value() {
897 if let Some((dir, _)) = self.pending_trees.first_key_value()
900 && dir < path
901 {
902 return Poll::Pending;
903 }
904
905 let (path, values) = self.items.pop_first().unwrap();
906 Poll::Ready(Some(TreeDiffEntry { path, values }))
907 } else if self.pending_trees.is_empty() {
908 Poll::Ready(None)
909 } else {
910 Poll::Pending
911 }
912 }
913}
914
915fn stream_without_trees(stream: TreeDiffStream) -> TreeDiffStream {
916 Box::pin(stream.filter_map(|mut entry| async move {
917 let skip_tree = |merge: MergedTreeValue| {
918 if merge.is_tree() {
919 Merge::absent()
920 } else {
921 merge
922 }
923 };
924 entry.values = entry.values.map(|diff| diff.map(skip_tree));
925
926 let any_present = entry.values.as_ref().map_or(true, |diff| {
928 diff.before.is_present() || diff.after.is_present()
929 });
930 any_present.then_some(entry)
931 }))
932}
933
934struct DiffStreamForFileSystem<'a> {
937 inner: TreeDiffStream<'a>,
938 next_item: Option<TreeDiffEntry>,
939 held_file: Option<TreeDiffEntry>,
940}
941
942impl<'a> DiffStreamForFileSystem<'a> {
943 fn new(inner: TreeDiffStream<'a>) -> Self {
944 Self {
945 inner,
946 next_item: None,
947 held_file: None,
948 }
949 }
950}
951
952impl Stream for DiffStreamForFileSystem<'_> {
953 type Item = TreeDiffEntry;
954
955 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
956 while let Some(next) = match self.next_item.take() {
957 Some(next) => Some(next),
958 None => ready!(self.inner.as_mut().poll_next(cx)),
959 } {
960 if let Ok(diff) = &next.values
963 && !diff.before.is_file_like()
964 && !diff.after.is_file_like()
965 {
966 continue;
967 }
968
969 if let Some(held_entry) = self
973 .held_file
974 .take_if(|held_entry| !next.path.starts_with(&held_entry.path))
975 {
976 self.next_item = Some(next);
977 return Poll::Ready(Some(held_entry));
978 }
979
980 match next.values {
981 Ok(diff) if diff.before.is_tree() => {
982 assert!(diff.after.is_present());
983 assert!(self.held_file.is_none());
984 self.held_file = Some(TreeDiffEntry {
985 path: next.path,
986 values: Ok(Diff::new(Merge::absent(), diff.after)),
987 });
988 }
989 Ok(diff) if diff.after.is_tree() => {
990 assert!(diff.before.is_present());
991 return Poll::Ready(Some(TreeDiffEntry {
992 path: next.path,
993 values: Ok(Diff::new(diff.before, Merge::absent())),
994 }));
995 }
996 _ => {
997 return Poll::Ready(Some(next));
998 }
999 }
1000 }
1001 Poll::Ready(self.held_file.take())
1002 }
1003}