1use std::collections::BTreeMap;
18use std::fmt;
19use std::iter;
20use std::pin::Pin;
21use std::sync::Arc;
22use std::task::Context;
23use std::task::Poll;
24use std::task::ready;
25use std::vec;
26
27use either::Either;
28use futures::Stream;
29use futures::StreamExt as _;
30use futures::future::BoxFuture;
31use futures::future::try_join;
32use futures::stream::BoxStream;
33use itertools::EitherOrBoth;
34use itertools::Itertools as _;
35use pollster::FutureExt as _;
36
37use crate::backend::BackendResult;
38use crate::backend::CopyId;
39use crate::backend::TreeId;
40use crate::backend::TreeValue;
41use crate::conflict_labels::ConflictLabels;
42use crate::copies::CopiesTreeDiffEntry;
43use crate::copies::CopiesTreeDiffStream;
44use crate::copies::CopyHistoryDiffStream;
45use crate::copies::CopyHistoryTreeDiffEntry;
46use crate::copies::CopyRecords;
47use crate::matchers::EverythingMatcher;
48use crate::matchers::Matcher;
49use crate::merge::Diff;
50use crate::merge::Merge;
51use crate::merge::MergeBuilder;
52use crate::merge::MergedTreeVal;
53use crate::merge::MergedTreeValue;
54use crate::repo_path::RepoPath;
55use crate::repo_path::RepoPathBuf;
56use crate::repo_path::RepoPathComponent;
57use crate::store::Store;
58use crate::tree::Tree;
59use crate::tree_merge::merge_trees;
60
61#[derive(Clone)]
64pub struct MergedTree {
65 store: Arc<Store>,
66 tree_ids: Merge<TreeId>,
67 labels: ConflictLabels,
68}
69
70impl fmt::Debug for MergedTree {
71 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
72 f.debug_struct("MergedTree")
73 .field("tree_ids", &self.tree_ids)
74 .field("labels", &self.labels)
75 .finish_non_exhaustive()
76 }
77}
78
79impl MergedTree {
80 pub fn resolved(store: Arc<Store>, tree_id: TreeId) -> Self {
82 Self {
83 store,
84 tree_ids: Merge::resolved(tree_id),
85 labels: ConflictLabels::unlabeled(),
86 }
87 }
88
89 pub fn new(store: Arc<Store>, tree_ids: Merge<TreeId>, labels: ConflictLabels) -> Self {
91 if let Some(num_sides) = labels.num_sides() {
92 assert_eq!(tree_ids.num_sides(), num_sides);
93 }
94 Self {
95 store,
96 tree_ids,
97 labels,
98 }
99 }
100
101 pub fn store(&self) -> &Arc<Store> {
103 &self.store
104 }
105
106 pub fn tree_ids(&self) -> &Merge<TreeId> {
109 &self.tree_ids
110 }
111
112 pub fn into_tree_ids(self) -> Merge<TreeId> {
115 self.tree_ids
116 }
117
118 pub fn labels(&self) -> &ConflictLabels {
120 &self.labels
121 }
122
123 pub fn tree_ids_and_labels(&self) -> (&Merge<TreeId>, &ConflictLabels) {
127 (&self.tree_ids, &self.labels)
128 }
129
130 pub fn into_tree_ids_and_labels(self) -> (Merge<TreeId>, ConflictLabels) {
132 (self.tree_ids, self.labels)
133 }
134
135 pub async fn trees(&self) -> BackendResult<Merge<Tree>> {
137 self.tree_ids
138 .try_map_async(|id| self.store.get_tree(RepoPathBuf::root(), id))
139 .await
140 }
141
142 pub fn labels_by_term<'a>(&'a self, label: &'a str) -> Merge<&'a str> {
146 if self.tree_ids.is_resolved() {
147 assert!(!self.labels.has_labels());
148 Merge::resolved(label)
149 } else if self.labels.has_labels() {
150 let labels = self.labels.as_merge();
154 assert_eq!(labels.num_sides(), self.tree_ids.num_sides());
155 labels.map(|label| label.as_str())
156 } else {
157 Merge::repeated("", self.tree_ids.num_sides())
162 }
163 }
164
165 pub async fn resolve(self) -> BackendResult<Self> {
168 let merged = merge_trees(&self.store, self.tree_ids).await?;
169 let (simplified_labels, simplified) = if merged.is_resolved() {
174 (ConflictLabels::unlabeled(), merged)
175 } else {
176 self.labels.simplify_with(&merged)
177 };
178 if cfg!(debug_assertions) {
182 let re_merged = merge_trees(&self.store, simplified.clone()).await.unwrap();
183 debug_assert_eq!(re_merged, simplified);
184 }
185 Ok(Self {
186 store: self.store,
187 tree_ids: simplified,
188 labels: simplified_labels,
189 })
190 }
191
192 pub fn conflicts(
197 &self,
198 ) -> impl Iterator<Item = (RepoPathBuf, BackendResult<MergedTreeValue>)> + use<> {
199 self.conflicts_matching(&EverythingMatcher)
200 }
201
202 pub fn conflicts_matching<'matcher>(
204 &self,
205 matcher: &'matcher dyn Matcher,
206 ) -> impl Iterator<Item = (RepoPathBuf, BackendResult<MergedTreeValue>)> + use<'matcher> {
207 ConflictIterator::new(self, matcher)
208 }
209
210 pub fn has_conflict(&self) -> bool {
212 !self.tree_ids.is_resolved()
213 }
214
215 pub async fn path_value(&self, path: &RepoPath) -> BackendResult<MergedTreeValue> {
219 match path.split() {
220 Some((dir, basename)) => {
221 let trees = self.trees().await?;
222 match trees.sub_tree_recursive(dir).await? {
223 None => Ok(Merge::absent()),
224 Some(tree) => Ok(tree.value(basename).cloned()),
225 }
226 }
227 None => Ok(self.to_merged_tree_value()),
228 }
229 }
230
231 pub async fn copy_value(&self, id: &CopyId) -> BackendResult<Option<TreeValue>> {
234 let copy = self.store().backend().read_copy(id).await?;
235 let merged_val = self.path_value(©.current_path).await?;
236 match merged_val.into_resolved() {
237 Ok(Some(val)) if val.copy_id() == Some(id) => Ok(Some(val)),
238 _ => Ok(None),
239 }
240 }
241
242 fn to_merged_tree_value(&self) -> MergedTreeValue {
243 self.tree_ids
244 .map(|tree_id| Some(TreeValue::Tree(tree_id.clone())))
245 }
246
247 pub fn entries(&self) -> TreeEntriesIterator<'static> {
257 self.entries_matching(&EverythingMatcher)
258 }
259
260 pub fn entries_matching<'matcher>(
262 &self,
263 matcher: &'matcher dyn Matcher,
264 ) -> TreeEntriesIterator<'matcher> {
265 TreeEntriesIterator::new(self, matcher)
266 }
267
268 fn diff_stream_internal<'matcher>(
270 &self,
271 other: &Self,
272 matcher: &'matcher dyn Matcher,
273 ) -> TreeDiffStream<'matcher> {
274 let concurrency = self.store().concurrency();
275 if concurrency <= 1 {
276 futures::stream::iter(TreeDiffIterator::new(self, other, matcher)).boxed()
277 } else {
278 TreeDiffStreamImpl::new(self, other, matcher, concurrency).boxed()
279 }
280 }
281
282 pub fn diff_stream<'matcher>(
284 &self,
285 other: &Self,
286 matcher: &'matcher dyn Matcher,
287 ) -> TreeDiffStream<'matcher> {
288 stream_without_trees(self.diff_stream_internal(other, matcher))
289 }
290
291 pub fn diff_stream_with_trees<'matcher>(
293 &self,
294 other: &Self,
295 matcher: &'matcher dyn Matcher,
296 ) -> TreeDiffStream<'matcher> {
297 self.diff_stream_internal(other, matcher)
298 }
299
300 pub fn diff_stream_for_file_system<'matcher>(
303 &self,
304 other: &Self,
305 matcher: &'matcher dyn Matcher,
306 ) -> TreeDiffStream<'matcher> {
307 DiffStreamForFileSystem::new(self.diff_stream_internal(other, matcher)).boxed()
308 }
309
310 pub fn diff_stream_with_copies<'a>(
312 &self,
313 other: &Self,
314 matcher: &'a dyn Matcher,
315 copy_records: &'a CopyRecords,
316 ) -> BoxStream<'a, CopiesTreeDiffEntry> {
317 let stream = self.diff_stream(other, matcher);
318 CopiesTreeDiffStream::new(stream, self.clone(), other.clone(), copy_records).boxed()
319 }
320
321 pub fn diff_stream_with_copy_history<'a>(
323 &'a self,
324 other: &'a Self,
325 matcher: &'a dyn Matcher,
326 ) -> BoxStream<'a, CopyHistoryTreeDiffEntry> {
327 let stream = self.diff_stream(other, matcher);
328 CopyHistoryDiffStream::new(stream, self, other).boxed()
329 }
330
331 pub async fn merge(merge: Merge<(Self, String)>) -> BackendResult<Self> {
337 Self::merge_no_resolve(merge).resolve().await
338 }
339
340 pub fn merge_no_resolve(merge: Merge<(Self, String)>) -> Self {
343 debug_assert!(
344 merge
345 .iter()
346 .map(|(tree, _)| Arc::as_ptr(tree.store()))
347 .all_equal()
348 );
349 let store = merge.first().0.store().clone();
350 let flattened_labels = ConflictLabels::from_merge(
351 merge
352 .map(|(tree, label)| tree.labels_by_term(label))
353 .flatten()
354 .map(|&label| label.to_owned()),
355 );
356 let flattened_tree_ids: Merge<TreeId> = merge
357 .into_map(|(tree, _label)| tree.into_tree_ids())
358 .flatten();
359
360 let (labels, tree_ids) = flattened_labels.simplify_with(&flattened_tree_ids);
361 Self::new(store, tree_ids, labels)
362 }
363}
364
365#[derive(Debug)]
367pub struct TreeDiffEntry {
368 pub path: RepoPathBuf,
370 pub values: BackendResult<Diff<MergedTreeValue>>,
372}
373
374pub type TreeDiffStream<'matcher> = BoxStream<'matcher, TreeDiffEntry>;
378
379fn all_tree_entries(
380 trees: &Merge<Tree>,
381) -> impl Iterator<Item = (&RepoPathComponent, MergedTreeVal<'_>)> {
382 if let Some(tree) = trees.as_resolved() {
383 let iter = tree
384 .entries_non_recursive()
385 .map(|entry| (entry.name(), Merge::normal(entry.value())));
386 Either::Left(iter)
387 } else {
388 let same_change = trees.first().store().merge_options().same_change;
389 let iter = all_merged_tree_entries(trees).map(move |(name, values)| {
390 let values = match values.resolve_trivial(same_change) {
392 Some(resolved) => Merge::resolved(*resolved),
393 None => values,
394 };
395 (name, values)
396 });
397 Either::Right(iter)
398 }
399}
400
401pub fn all_merged_tree_entries(
405 trees: &Merge<Tree>,
406) -> impl Iterator<Item = (&RepoPathComponent, MergedTreeVal<'_>)> {
407 let mut entries_iters = trees
408 .iter()
409 .map(|tree| tree.entries_non_recursive().peekable())
410 .collect_vec();
411 iter::from_fn(move || {
412 let next_name = entries_iters
413 .iter_mut()
414 .filter_map(|iter| iter.peek())
415 .map(|entry| entry.name())
416 .min()?;
417 let values: MergeBuilder<_> = entries_iters
418 .iter_mut()
419 .map(|iter| {
420 let entry = iter.next_if(|entry| entry.name() == next_name)?;
421 Some(entry.value())
422 })
423 .collect();
424 Some((next_name, values.build()))
425 })
426}
427
428fn merged_tree_entry_diff<'a>(
429 trees1: &'a Merge<Tree>,
430 trees2: &'a Merge<Tree>,
431) -> impl Iterator<Item = (&'a RepoPathComponent, Diff<MergedTreeVal<'a>>)> {
432 itertools::merge_join_by(
433 all_tree_entries(trees1),
434 all_tree_entries(trees2),
435 |(name1, _), (name2, _)| name1.cmp(name2),
436 )
437 .map(|entry| match entry {
438 EitherOrBoth::Both((name, value1), (_, value2)) => (name, Diff::new(value1, value2)),
439 EitherOrBoth::Left((name, value1)) => (name, Diff::new(value1, Merge::absent())),
440 EitherOrBoth::Right((name, value2)) => (name, Diff::new(Merge::absent(), value2)),
441 })
442 .filter(|(_, diff)| diff.is_changed())
443}
444
445pub struct TreeEntriesIterator<'matcher> {
447 store: Arc<Store>,
448 stack: Vec<TreeEntriesDirItem>,
449 matcher: &'matcher dyn Matcher,
450}
451
452struct TreeEntriesDirItem {
453 entries: Vec<(RepoPathBuf, MergedTreeValue)>,
454}
455
456impl TreeEntriesDirItem {
457 fn new(trees: &Merge<Tree>, matcher: &dyn Matcher) -> Self {
458 let mut entries = vec![];
459 let dir = trees.first().dir();
460 for (name, value) in all_tree_entries(trees) {
461 let path = dir.join(name);
462 if value.is_tree() {
463 if matcher.visit(&path).is_nothing() {
465 continue;
466 }
467 } else if !matcher.matches(&path) {
468 continue;
469 }
470 entries.push((path, value.cloned()));
471 }
472 entries.reverse();
473 Self { entries }
474 }
475}
476
477impl<'matcher> TreeEntriesIterator<'matcher> {
478 fn new(trees: &MergedTree, matcher: &'matcher dyn Matcher) -> Self {
479 Self {
480 store: trees.store.clone(),
481 stack: vec![TreeEntriesDirItem {
482 entries: vec![(RepoPathBuf::root(), trees.to_merged_tree_value())],
483 }],
484 matcher,
485 }
486 }
487}
488
489impl Iterator for TreeEntriesIterator<'_> {
490 type Item = (RepoPathBuf, BackendResult<MergedTreeValue>);
491
492 fn next(&mut self) -> Option<Self::Item> {
493 while let Some(top) = self.stack.last_mut() {
494 if let Some((path, value)) = top.entries.pop() {
495 let maybe_trees = match value.to_tree_merge(&self.store, &path).block_on() {
496 Ok(maybe_trees) => maybe_trees,
497 Err(err) => return Some((path, Err(err))),
498 };
499 if let Some(trees) = maybe_trees {
500 self.stack
501 .push(TreeEntriesDirItem::new(&trees, self.matcher));
502 } else {
503 return Some((path, Ok(value)));
504 }
505 } else {
506 self.stack.pop();
507 }
508 }
509 None
510 }
511}
512
513struct ConflictsDirItem {
516 entries: Vec<(RepoPathBuf, MergedTreeValue)>,
517}
518
519impl ConflictsDirItem {
520 fn new(trees: &Merge<Tree>, matcher: &dyn Matcher) -> Self {
521 if trees.is_resolved() {
522 return Self { entries: vec![] };
523 }
524
525 let dir = trees.first().dir();
526 let mut entries = vec![];
527 for (basename, value) in all_tree_entries(trees) {
528 if value.is_resolved() {
529 continue;
530 }
531 let path = dir.join(basename);
532 if value.is_tree() {
533 if matcher.visit(&path).is_nothing() {
534 continue;
535 }
536 } else if !matcher.matches(&path) {
537 continue;
538 }
539 entries.push((path, value.cloned()));
540 }
541 entries.reverse();
542 Self { entries }
543 }
544}
545
546struct ConflictIterator<'matcher> {
547 store: Arc<Store>,
548 stack: Vec<ConflictsDirItem>,
549 matcher: &'matcher dyn Matcher,
550}
551
552impl<'matcher> ConflictIterator<'matcher> {
553 fn new(tree: &MergedTree, matcher: &'matcher dyn Matcher) -> Self {
554 Self {
555 store: tree.store().clone(),
556 stack: vec![ConflictsDirItem {
557 entries: vec![(RepoPathBuf::root(), tree.to_merged_tree_value())],
558 }],
559 matcher,
560 }
561 }
562}
563
564impl Iterator for ConflictIterator<'_> {
565 type Item = (RepoPathBuf, BackendResult<MergedTreeValue>);
566
567 fn next(&mut self) -> Option<Self::Item> {
568 while let Some(top) = self.stack.last_mut() {
569 if let Some((path, tree_values)) = top.entries.pop() {
570 match tree_values.to_tree_merge(&self.store, &path).block_on() {
571 Ok(Some(trees)) => {
572 self.stack.push(ConflictsDirItem::new(&trees, self.matcher));
574 }
575 Ok(None) => {
576 return Some((path, Ok(tree_values)));
580 }
581 Err(err) => {
582 return Some((path, Err(err)));
583 }
584 }
585 } else {
586 self.stack.pop();
587 }
588 }
589 None
590 }
591}
592
593pub struct TreeDiffIterator<'matcher> {
595 store: Arc<Store>,
596 stack: Vec<TreeDiffDir>,
597 matcher: &'matcher dyn Matcher,
598}
599
600struct TreeDiffDir {
601 entries: Vec<(RepoPathBuf, Diff<MergedTreeValue>)>,
602}
603
604impl<'matcher> TreeDiffIterator<'matcher> {
605 pub fn new(tree1: &MergedTree, tree2: &MergedTree, matcher: &'matcher dyn Matcher) -> Self {
607 assert!(Arc::ptr_eq(tree1.store(), tree2.store()));
608 let root_dir = RepoPath::root();
609 let mut stack = Vec::new();
610 let root_diff = Diff::new(tree1.to_merged_tree_value(), tree2.to_merged_tree_value());
611 if root_diff.is_changed() && !matcher.visit(root_dir).is_nothing() {
612 stack.push(TreeDiffDir {
613 entries: vec![(root_dir.to_owned(), root_diff)],
614 });
615 }
616 Self {
617 store: tree1.store().clone(),
618 stack,
619 matcher,
620 }
621 }
622
623 fn trees(
625 store: &Arc<Store>,
626 dir: &RepoPath,
627 values: &MergedTreeValue,
628 ) -> BackendResult<Merge<Tree>> {
629 if let Some(trees) = values.to_tree_merge(store, dir).block_on()? {
630 Ok(trees)
631 } else {
632 Ok(Merge::resolved(Tree::empty(store.clone(), dir.to_owned())))
633 }
634 }
635}
636
637impl TreeDiffDir {
638 fn from_trees(
639 dir: &RepoPath,
640 trees1: &Merge<Tree>,
641 trees2: &Merge<Tree>,
642 matcher: &dyn Matcher,
643 ) -> Self {
644 let mut entries = vec![];
645 for (name, diff) in merged_tree_entry_diff(trees1, trees2) {
646 let path = dir.join(name);
647 let tree_before = diff.before.is_tree();
648 let tree_after = diff.after.is_tree();
649 let tree_matches = (tree_before || tree_after) && !matcher.visit(&path).is_nothing();
652 let file_matches = (!tree_before || !tree_after) && matcher.matches(&path);
653
654 let before = if (tree_before && tree_matches) || (!tree_before && file_matches) {
656 diff.before
657 } else {
658 Merge::absent()
659 };
660 let after = if (tree_after && tree_matches) || (!tree_after && file_matches) {
661 diff.after
662 } else {
663 Merge::absent()
664 };
665 if before.is_absent() && after.is_absent() {
666 continue;
667 }
668 entries.push((path, Diff::new(before.cloned(), after.cloned())));
669 }
670 entries.reverse();
671 Self { entries }
672 }
673}
674
675impl Iterator for TreeDiffIterator<'_> {
676 type Item = TreeDiffEntry;
677
678 fn next(&mut self) -> Option<Self::Item> {
679 while let Some(top) = self.stack.last_mut() {
680 let Some((path, diff)) = top.entries.pop() else {
681 self.stack.pop().unwrap();
682 continue;
683 };
684
685 if diff.before.is_tree() || diff.after.is_tree() {
686 let (before_tree, after_tree) = match (
687 Self::trees(&self.store, &path, &diff.before),
688 Self::trees(&self.store, &path, &diff.after),
689 ) {
690 (Ok(before_tree), Ok(after_tree)) => (before_tree, after_tree),
691 (Err(before_err), _) => {
692 return Some(TreeDiffEntry {
693 path,
694 values: Err(before_err),
695 });
696 }
697 (_, Err(after_err)) => {
698 return Some(TreeDiffEntry {
699 path,
700 values: Err(after_err),
701 });
702 }
703 };
704 let subdir =
705 TreeDiffDir::from_trees(&path, &before_tree, &after_tree, self.matcher);
706 self.stack.push(subdir);
707 }
708 if diff.before.is_file_like()
709 || diff.after.is_file_like()
710 || self.matcher.matches(&path)
711 {
712 return Some(TreeDiffEntry {
713 path,
714 values: Ok(diff),
715 });
716 }
717 }
718 None
719 }
720}
721
722pub struct TreeDiffStreamImpl<'matcher> {
724 store: Arc<Store>,
725 matcher: &'matcher dyn Matcher,
726 items: BTreeMap<RepoPathBuf, BackendResult<Diff<MergedTreeValue>>>,
731 #[expect(clippy::type_complexity)]
733 pending_trees:
734 BTreeMap<RepoPathBuf, BoxFuture<'matcher, BackendResult<(Merge<Tree>, Merge<Tree>)>>>,
735 max_concurrent_reads: usize,
740 max_queued_items: usize,
746}
747
748impl<'matcher> TreeDiffStreamImpl<'matcher> {
749 pub fn new(
752 tree1: &MergedTree,
753 tree2: &MergedTree,
754 matcher: &'matcher dyn Matcher,
755 max_concurrent_reads: usize,
756 ) -> Self {
757 assert!(Arc::ptr_eq(tree1.store(), tree2.store()));
758 let store = tree1.store().clone();
759 let mut stream = Self {
760 store: store.clone(),
761 matcher,
762 items: BTreeMap::new(),
763 pending_trees: BTreeMap::new(),
764 max_concurrent_reads,
765 max_queued_items: 10000,
766 };
767 let dir = RepoPathBuf::root();
768 let merged_tree1 = tree1.to_merged_tree_value();
769 let merged_tree2 = tree2.to_merged_tree_value();
770 let root_diff = Diff::new(merged_tree1.clone(), merged_tree2.clone());
771 if root_diff.is_changed() && matcher.matches(&dir) {
772 stream.items.insert(dir.clone(), Ok(root_diff));
773 }
774 let root_tree_fut = Box::pin(try_join(
775 Self::trees(store.clone(), dir.clone(), merged_tree1),
776 Self::trees(store, dir.clone(), merged_tree2),
777 ));
778 stream.pending_trees.insert(dir, root_tree_fut);
779 stream
780 }
781
782 async fn single_tree(
783 store: &Arc<Store>,
784 dir: RepoPathBuf,
785 value: Option<&TreeValue>,
786 ) -> BackendResult<Tree> {
787 match value {
788 Some(TreeValue::Tree(tree_id)) => store.get_tree(dir, tree_id).await,
789 _ => Ok(Tree::empty(store.clone(), dir.clone())),
790 }
791 }
792
793 async fn trees(
795 store: Arc<Store>,
796 dir: RepoPathBuf,
797 values: MergedTreeValue,
798 ) -> BackendResult<Merge<Tree>> {
799 if values.is_tree() {
800 values
801 .try_map_async(|value| Self::single_tree(&store, dir.clone(), value.as_ref()))
802 .await
803 } else {
804 Ok(Merge::resolved(Tree::empty(store, dir)))
805 }
806 }
807
808 fn add_dir_diff_items(&mut self, dir: &RepoPath, trees1: &Merge<Tree>, trees2: &Merge<Tree>) {
809 for (basename, diff) in merged_tree_entry_diff(trees1, trees2) {
810 let path = dir.join(basename);
811 let tree_before = diff.before.is_tree();
812 let tree_after = diff.after.is_tree();
813 let tree_matches =
816 (tree_before || tree_after) && !self.matcher.visit(&path).is_nothing();
817 let file_matches = (!tree_before || !tree_after) && self.matcher.matches(&path);
818
819 let before = if (tree_before && tree_matches) || (!tree_before && file_matches) {
821 diff.before
822 } else {
823 Merge::absent()
824 };
825 let after = if (tree_after && tree_matches) || (!tree_after && file_matches) {
826 diff.after
827 } else {
828 Merge::absent()
829 };
830 if before.is_absent() && after.is_absent() {
831 continue;
832 }
833
834 if tree_matches {
836 let before_tree_future =
837 Self::trees(self.store.clone(), path.clone(), before.cloned());
838 let after_tree_future =
839 Self::trees(self.store.clone(), path.clone(), after.cloned());
840 let both_trees_future = try_join(before_tree_future, after_tree_future);
841 self.pending_trees
842 .insert(path.clone(), Box::pin(both_trees_future));
843 }
844
845 if file_matches || self.matcher.matches(&path) {
846 self.items
847 .insert(path, Ok(Diff::new(before.cloned(), after.cloned())));
848 }
849 }
850 }
851
852 fn poll_tree_futures(&mut self, cx: &mut Context<'_>) {
853 loop {
854 let mut tree_diffs = vec![];
855 let mut some_pending = false;
856 let mut all_pending = true;
857 for (dir, future) in self
858 .pending_trees
859 .iter_mut()
860 .take(self.max_concurrent_reads)
861 {
862 if let Poll::Ready(tree_diff) = future.as_mut().poll(cx) {
863 all_pending = false;
864 tree_diffs.push((dir.clone(), tree_diff));
865 } else {
866 some_pending = true;
867 }
868 }
869
870 for (dir, tree_diff) in tree_diffs {
871 drop(self.pending_trees.remove_entry(&dir).unwrap());
872 match tree_diff {
873 Ok((trees1, trees2)) => {
874 self.add_dir_diff_items(&dir, &trees1, &trees2);
875 }
876 Err(err) => {
877 self.items.insert(dir, Err(err));
878 }
879 }
880 }
881
882 if all_pending || (some_pending && self.items.len() >= self.max_queued_items) {
886 return;
887 }
888 }
889 }
890}
891
892impl Stream for TreeDiffStreamImpl<'_> {
893 type Item = TreeDiffEntry;
894
895 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
896 self.poll_tree_futures(cx);
898
899 if let Some((path, _)) = self.items.first_key_value() {
901 if let Some((dir, _)) = self.pending_trees.first_key_value()
904 && dir < path
905 {
906 return Poll::Pending;
907 }
908
909 let (path, values) = self.items.pop_first().unwrap();
910 Poll::Ready(Some(TreeDiffEntry { path, values }))
911 } else if self.pending_trees.is_empty() {
912 Poll::Ready(None)
913 } else {
914 Poll::Pending
915 }
916 }
917}
918
919fn stream_without_trees(stream: TreeDiffStream) -> TreeDiffStream {
920 stream
921 .filter_map(|mut entry| async move {
922 let skip_tree = |merge: MergedTreeValue| {
923 if merge.is_tree() {
924 Merge::absent()
925 } else {
926 merge
927 }
928 };
929 entry.values = entry.values.map(|diff| diff.map(skip_tree));
930
931 let any_present = entry.values.as_ref().map_or(true, |diff| {
933 diff.before.is_present() || diff.after.is_present()
934 });
935 any_present.then_some(entry)
936 })
937 .boxed()
938}
939
940struct DiffStreamForFileSystem<'a> {
943 inner: TreeDiffStream<'a>,
944 next_item: Option<TreeDiffEntry>,
945 held_file: Option<TreeDiffEntry>,
946}
947
948impl<'a> DiffStreamForFileSystem<'a> {
949 fn new(inner: TreeDiffStream<'a>) -> Self {
950 Self {
951 inner,
952 next_item: None,
953 held_file: None,
954 }
955 }
956}
957
958impl Stream for DiffStreamForFileSystem<'_> {
959 type Item = TreeDiffEntry;
960
961 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
962 while let Some(next) = match self.next_item.take() {
963 Some(next) => Some(next),
964 None => ready!(self.inner.as_mut().poll_next(cx)),
965 } {
966 if let Ok(diff) = &next.values
969 && !diff.before.is_file_like()
970 && !diff.after.is_file_like()
971 {
972 continue;
973 }
974
975 if let Some(held_entry) = self
979 .held_file
980 .take_if(|held_entry| !next.path.starts_with(&held_entry.path))
981 {
982 self.next_item = Some(next);
983 return Poll::Ready(Some(held_entry));
984 }
985
986 match next.values {
987 Ok(diff) if diff.before.is_tree() => {
988 assert!(diff.after.is_present());
989 assert!(self.held_file.is_none());
990 self.held_file = Some(TreeDiffEntry {
991 path: next.path,
992 values: Ok(Diff::new(Merge::absent(), diff.after)),
993 });
994 }
995 Ok(diff) if diff.after.is_tree() => {
996 assert!(diff.before.is_present());
997 return Poll::Ready(Some(TreeDiffEntry {
998 path: next.path,
999 values: Ok(Diff::new(diff.before, Merge::absent())),
1000 }));
1001 }
1002 _ => {
1003 return Poll::Ready(Some(next));
1004 }
1005 }
1006 }
1007 Poll::Ready(self.held_file.take())
1008 }
1009}