// jj_lib/tree_merge.rs

// Copyright 2023-2025 The Jujutsu Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Merge trees by recursing into entries (subtrees, files)

use std::borrow::Borrow;
use std::collections::BTreeMap;
use std::collections::HashSet;
use std::sync::Arc;
use std::vec;

use futures::FutureExt as _;
use futures::StreamExt as _;
use futures::future::BoxFuture;
use futures::future::try_join_all;
use futures::stream::FuturesUnordered;
use itertools::Itertools as _;
use tokio::io::AsyncReadExt as _;

use crate::backend;
use crate::backend::BackendError;
use crate::backend::BackendResult;
use crate::backend::TreeId;
use crate::backend::TreeValue;
use crate::config::ConfigGetError;
use crate::files;
use crate::files::FileMergeHunkLevel;
use crate::merge::Merge;
use crate::merge::MergedTreeVal;
use crate::merge::MergedTreeValue;
use crate::merge::SameChange;
use crate::merged_tree::all_merged_tree_entries;
use crate::object_id::ObjectId as _;
use crate::repo_path::RepoPath;
use crate::repo_path::RepoPathBuf;
use crate::repo_path::RepoPathComponentBuf;
use crate::settings::UserSettings;
use crate::store::Store;
use crate::tree::Tree;

/// Options for tree/file conflict resolution.
#[derive(Clone, Debug)]
pub struct MergeOptions {
    /// Granularity of hunks when merging files.
    pub hunk_level: FileMergeHunkLevel,
    /// Whether to resolve conflicts in which all sides make the same change.
    pub same_change: SameChange,
}

impl MergeOptions {
    /// Loads merge options from `settings`.
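    ///
    /// A minimal usage sketch (illustrative, not from the source); it assumes
    /// only that a `UserSettings` value has already been loaded elsewhere:
    ///
    /// ```ignore
    /// let options = MergeOptions::from_settings(&settings)?;
    /// ```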
    pub fn from_settings(settings: &UserSettings) -> Result<Self, ConfigGetError> {
        Ok(Self {
            // Maybe we can add hunk-level=file to disable content merging if
            // needed. It wouldn't be translated to FileMergeHunkLevel.
            hunk_level: settings.get("merge.hunk-level")?,
            same_change: settings.get("merge.same-change")?,
        })
    }
}

/// Merges the given trees, recursing into subtrees and attempting to resolve
/// file conflicts by merging their contents.
///
/// The returned conflict will either be resolved or have the same number of
/// sides as the input.
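///
/// A minimal usage sketch (illustrative, not from the source); the `store` and
/// the input `Merge<TreeId>` are assumed to have been built elsewhere, e.g.
/// from the trees of the commits being merged:
///
/// ```ignore
/// match merge_trees(&store, merge).await?.into_resolved() {
///     Ok(tree_id) => { /* everything merged cleanly into a single tree */ }
///     Err(conflict) => { /* still conflicted, same number of sides as the input */ }
/// }
/// ```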
pub async fn merge_trees(store: &Arc<Store>, merge: Merge<TreeId>) -> BackendResult<Merge<TreeId>> {
    let merge = match merge.into_resolved() {
        Ok(tree) => return Ok(Merge::resolved(tree)),
        Err(merge) => merge,
    };

    let mut merger = TreeMerger {
        store: store.clone(),
        trees_to_resolve: BTreeMap::new(),
        work: FuturesUnordered::new(),
        unstarted_work: BTreeMap::new(),
    };
    merger.enqueue_tree_read(
        RepoPathBuf::root(),
        merge.map(|tree_id| Some(TreeValue::Tree(tree_id.clone()))),
    );
    let trees = merger.merge().await?;
    Ok(trees.map(|tree| tree.id().clone()))
}

struct MergedTreeInput {
    resolved: BTreeMap<RepoPathComponentBuf, TreeValue>,
    /// Entries that we're still waiting for data on in order to resolve
    /// them. When this set becomes empty, we're ready to write the tree(s).
    pending_lookup: HashSet<RepoPathComponentBuf>,
    conflicts: BTreeMap<RepoPathComponentBuf, MergedTreeValue>,
}

impl MergedTreeInput {
    fn new(resolved: BTreeMap<RepoPathComponentBuf, TreeValue>) -> Self {
        Self {
            resolved,
            pending_lookup: HashSet::new(),
            conflicts: BTreeMap::new(),
        }
    }

    fn mark_completed(
        &mut self,
        basename: RepoPathComponentBuf,
        value: MergedTreeValue,
        same_change: SameChange,
    ) {
        let was_pending = self.pending_lookup.remove(&basename);
        assert!(was_pending, "No pending lookup for {basename:?}");
        if let Some(resolved) = value.resolve_trivial(same_change) {
            if let Some(resolved) = resolved.as_ref() {
                self.resolved.insert(basename, resolved.clone());
            }
        } else {
            self.conflicts.insert(basename, value);
        }
    }

    fn into_backend_trees(self) -> Merge<backend::Tree> {
        assert!(self.pending_lookup.is_empty());

        fn by_name(
            (name1, _): &(RepoPathComponentBuf, TreeValue),
            (name2, _): &(RepoPathComponentBuf, TreeValue),
        ) -> bool {
            name1 < name2
        }

        if self.conflicts.is_empty() {
            let all_entries = self.resolved.into_iter().collect();
            Merge::resolved(backend::Tree::from_sorted_entries(all_entries))
        } else {
            // Create a Merge with the conflict entries for each side.
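            // Illustrative sketch (not from the source): with two conflicted
            // entries `a` and `b`, each a merge over the same terms, this
            // transposes
            //   a -> [a_0, a_1, a_2], b -> [b_0, b_1, b_2]
            // into one name-sorted entry list per term:
            //   term 0: [(a, a_0), (b, b_0)], term 1: [(a, a_1), (b, b_1)], ...
            // skipping entries whose value is absent in a given term.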
            let mut conflict_entries = self.conflicts.first_key_value().unwrap().1.map(|_| vec![]);
            for (basename, value) in self.conflicts {
                assert_eq!(value.num_sides(), conflict_entries.num_sides());
                for (entries, value) in conflict_entries.iter_mut().zip(value.into_iter()) {
                    if let Some(value) = value {
                        entries.push((basename.clone(), value));
                    }
                }
            }

            let mut backend_trees = vec![];
            for entries in conflict_entries.into_iter() {
                let backend_tree = backend::Tree::from_sorted_entries(
                    self.resolved
                        .iter()
                        .map(|(name, value)| (name.clone(), value.clone()))
                        .merge_by(entries, by_name)
                        .collect(),
                );
                backend_trees.push(backend_tree);
            }
            Merge::from_vec(backend_trees)
        }
    }
}

/// The result from an asynchronously scheduled work item.
enum TreeMergerWorkOutput {
    /// Trees that have been read (i.e. `Read` is past tense)
    ReadTrees {
        dir: RepoPathBuf,
        result: BackendResult<Merge<Tree>>,
    },
    WrittenTrees {
        dir: RepoPathBuf,
        result: BackendResult<Merge<Tree>>,
    },
    MergedFiles {
        path: RepoPathBuf,
        result: BackendResult<MergedTreeValue>,
    },
}

#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
enum TreeMergeWorkItemKey {
    // The `MergeFiles` variant is ordered before `ReadTrees` so that, via the
    // derived `Ord` and `BTreeMap::pop_first()`, queued file merges are polled
    // before tree reads, because they typically take longer to process.
    MergeFiles { path: RepoPathBuf },
    ReadTrees { dir: RepoPathBuf },
}

struct TreeMerger {
    store: Arc<Store>,
    // Trees we're currently working on.
    trees_to_resolve: BTreeMap<RepoPathBuf, MergedTreeInput>,
    // Futures we're currently processing, kept within the backend's concurrency limit.
    work: FuturesUnordered<BoxFuture<'static, TreeMergerWorkOutput>>,
    // Futures we haven't started polling yet, in order to respect the backend's concurrency limit.
    unstarted_work: BTreeMap<TreeMergeWorkItemKey, BoxFuture<'static, TreeMergerWorkOutput>>,
}

impl TreeMerger {
    async fn merge(mut self) -> BackendResult<Merge<Tree>> {
        while let Some(work_item) = self.work.next().await {
            match work_item {
                TreeMergerWorkOutput::ReadTrees { dir, result } => {
                    let tree = result?;
                    self.process_tree(dir, tree);
                }
                TreeMergerWorkOutput::WrittenTrees { dir, result } => {
                    let tree = result?;
                    if dir.is_root() {
                        assert!(self.trees_to_resolve.is_empty());
                        assert!(self.work.is_empty());
                        assert!(self.unstarted_work.is_empty());
                        return Ok(tree);
                    }
                    // Propagate the write to the parent tree, replacing empty trees by `None`.
                    let new_value = tree.map(|tree| {
                        (tree.id() != self.store.empty_tree_id())
                            .then(|| TreeValue::Tree(tree.id().clone()))
                    });
                    self.mark_completed(&dir, new_value);
                }
                TreeMergerWorkOutput::MergedFiles { path, result } => {
                    let value = result?;
                    self.mark_completed(&path, value);
                }
            }

            while self.work.len() < self.store.concurrency() {
                if let Some((_key, work)) = self.unstarted_work.pop_first() {
                    self.work.push(work);
                } else {
                    break;
                }
            }
        }

        unreachable!("There was no work item for writing the root tree");
    }

    fn process_tree(&mut self, dir: RepoPathBuf, tree: Merge<Tree>) {
        // First resolve trivial merges (those that we don't need to load any more data
        // for)
        let same_change = self.store.merge_options().same_change;
        let mut resolved = vec![];
        let mut non_trivial = vec![];
        for (basename, path_merge) in all_merged_tree_entries(&tree) {
            if let Some(value) = path_merge.resolve_trivial(same_change) {
                if let Some(value) = value.cloned() {
                    resolved.push((basename.to_owned(), value));
                }
            } else {
                non_trivial.push((basename.to_owned(), path_merge.cloned()));
            }
        }

        // If there are no non-trivial merges, we can write the tree now.
        if non_trivial.is_empty() {
            let backend_trees = Merge::resolved(backend::Tree::from_sorted_entries(resolved));
            self.enqueue_tree_write(dir, backend_trees);
            return;
        }

        let mut unmerged_tree = MergedTreeInput::new(resolved.into_iter().collect());
        for (basename, value) in non_trivial {
            let path = dir.join(&basename);
            unmerged_tree.pending_lookup.insert(basename);
            if value.is_tree() {
                self.enqueue_tree_read(path, value);
            } else {
                // TODO: If it's e.g. a dir/file conflict, there's no need to try to
                // resolve it as a file. We should add it to
                // `unmerged_tree.conflicts` instead.
                self.enqueue_file_merge(path, value);
            }
        }

        self.trees_to_resolve.insert(dir, unmerged_tree);
    }

    fn enqueue_tree_read(&mut self, dir: RepoPathBuf, value: MergedTreeValue) {
        let key = TreeMergeWorkItemKey::ReadTrees { dir: dir.clone() };
        let work_fut = read_trees(self.store.clone(), dir.clone(), value)
            .map(|result| TreeMergerWorkOutput::ReadTrees { dir, result });
        if self.work.len() < self.store.concurrency() {
            self.work.push(Box::pin(work_fut));
        } else {
            self.unstarted_work.insert(key, Box::pin(work_fut));
        }
    }

    fn enqueue_tree_write(&mut self, dir: RepoPathBuf, backend_trees: Merge<backend::Tree>) {
        let work_fut = write_trees(self.store.clone(), dir.clone(), backend_trees)
            .map(|result| TreeMergerWorkOutput::WrittenTrees { dir, result });
        // Bypass the `unstarted_work` queue because writing trees usually results in
        // saving memory (each tree gets replaced by a `TreeValue::Tree`)
        self.work.push(Box::pin(work_fut));
    }

    fn enqueue_file_merge(&mut self, path: RepoPathBuf, value: MergedTreeValue) {
        let key = TreeMergeWorkItemKey::MergeFiles { path: path.clone() };
        let work_fut = resolve_file_values_owned(self.store.clone(), path.clone(), value)
            .map(|result| TreeMergerWorkOutput::MergedFiles { path, result });
        if self.work.len() < self.store.concurrency() {
            self.work.push(Box::pin(work_fut));
        } else {
            self.unstarted_work.insert(key, Box::pin(work_fut));
        }
    }

    fn mark_completed(&mut self, path: &RepoPath, value: MergedTreeValue) {
        let (dir, basename) = path.split().unwrap();
        let tree = self.trees_to_resolve.get_mut(dir).unwrap();
        let same_change = self.store.merge_options().same_change;
        tree.mark_completed(basename.to_owned(), value, same_change);
        // If all entries in this tree have been processed (either resolved or still a
        // conflict), schedule the writing of the tree(s) to the backend.
        if tree.pending_lookup.is_empty() {
            let tree = self.trees_to_resolve.remove(dir).unwrap();
            self.enqueue_tree_write(dir.to_owned(), tree.into_backend_trees());
        }
    }
}

async fn read_trees(
    store: Arc<Store>,
    dir: RepoPathBuf,
    value: MergedTreeValue,
) -> BackendResult<Merge<Tree>> {
    let trees = value
        .to_tree_merge(&store, &dir)
        .await?
        .expect("Should be tree merge");
    Ok(trees)
}

async fn write_trees(
    store: Arc<Store>,
    dir: RepoPathBuf,
    backend_trees: Merge<backend::Tree>,
) -> BackendResult<Merge<Tree>> {
    // TODO: Could use `backend_trees.try_map_async()` here if it took `self` by
    // value or if `Backend::write_tree()` took an `Arc<backend::Tree>`.
    let trees = try_join_all(
        backend_trees
            .into_iter()
            .map(|backend_tree| store.write_tree(&dir, backend_tree)),
    )
    .await?;
    Ok(Merge::from_vec(trees))
}

async fn resolve_file_values_owned(
    store: Arc<Store>,
    path: RepoPathBuf,
    values: MergedTreeValue,
) -> BackendResult<MergedTreeValue> {
    let maybe_resolved = try_resolve_file_values(&store, &path, &values).await?;
    Ok(maybe_resolved.unwrap_or(values))
}

/// Tries to resolve file conflicts by merging the file contents. If the
/// conflict cannot be resolved (for example, because some side is missing or
/// is not a file), returns the passed `values` unmodified.
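///
/// A minimal usage sketch (illustrative, not from the source); `store`, `path`,
/// and the conflicted `values` are assumed to come from an existing tree merge:
///
/// ```ignore
/// let resolved = resolve_file_values(&store, path, values).await?;
/// if resolved.is_resolved() {
///     // The contents merged cleanly for this path.
/// }
/// ```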
pub async fn resolve_file_values(
    store: &Arc<Store>,
    path: &RepoPath,
    values: MergedTreeValue,
) -> BackendResult<MergedTreeValue> {
    let same_change = store.merge_options().same_change;
    if let Some(resolved) = values.resolve_trivial(same_change) {
        return Ok(Merge::resolved(resolved.clone()));
    }

    let maybe_resolved = try_resolve_file_values(store, path, &values).await?;
    Ok(maybe_resolved.unwrap_or(values))
}

async fn try_resolve_file_values<T: Borrow<TreeValue>>(
    store: &Arc<Store>,
    path: &RepoPath,
    values: &Merge<Option<T>>,
) -> BackendResult<Option<MergedTreeValue>> {
    // The values may contain trees canceling each other (notably padded absent
    // trees), so we need to simplify them first.
    let simplified = values
        .map(|value| value.as_ref().map(Borrow::borrow))
        .simplify();
    // No fast path for simplified.is_resolved(). If it could be resolved, it would
    // have been caught by values.resolve_trivial() above.
    if let Some(resolved) = try_resolve_file_conflict(store, path, &simplified).await? {
        Ok(Some(Merge::normal(resolved)))
    } else {
        // Failed to merge the files, or the paths are not files
        Ok(None)
    }
}

/// Resolves a file-level conflict by merging content hunks.
///
/// The input `conflict` is supposed to be simplified. It shouldn't contain
/// non-file values that cancel each other.
async fn try_resolve_file_conflict(
    store: &Store,
    filename: &RepoPath,
    conflict: &MergedTreeVal<'_>,
) -> BackendResult<Option<TreeValue>> {
    let options = store.merge_options();
    // If there are any non-file or any missing parts in the conflict, we can't
    // merge it. We check early so we don't waste time reading file contents if
    // we can't merge them anyway. At the same time we determine whether the
    // resulting file should be executable.
    let Ok(file_id_conflict) = conflict.try_map(|term| match term {
        Some(TreeValue::File {
            id,
            executable: _,
            copy_id: _,
        }) => Ok(id),
        _ => Err(()),
    }) else {
        return Ok(None);
    };
    let Ok(executable_conflict) = conflict.try_map(|term| match term {
        Some(TreeValue::File {
            id: _,
            executable,
            copy_id: _,
        }) => Ok(executable),
        _ => Err(()),
    }) else {
        return Ok(None);
    };
    let Ok(copy_id_conflict) = conflict.try_map(|term| match term {
        Some(TreeValue::File {
            id: _,
            executable: _,
            copy_id,
        }) => Ok(copy_id),
        _ => Err(()),
    }) else {
        return Ok(None);
    };
    // TODO: Should we respect options.same_change when merging executable and
    // copy_id? If so, also update conflicts::resolve_file_executable().
    let Some(&&executable) = executable_conflict.resolve_trivial(SameChange::Accept) else {
        // We're unable to determine whether the result should be executable
        return Ok(None);
    };
    let Some(&copy_id) = copy_id_conflict.resolve_trivial(SameChange::Accept) else {
        // We're unable to determine the file's copy ID
        return Ok(None);
    };
    if let Some(&resolved_file_id) = file_id_conflict.resolve_trivial(options.same_change) {
        // Don't bother reading the file contents if the conflict can be trivially
        // resolved.
        return Ok(Some(TreeValue::File {
            id: resolved_file_id.clone(),
            executable,
            copy_id: copy_id.clone(),
        }));
    }

    // While the input conflict should be simplified by the caller, it might contain
    // terms which only differ in executable bits. Simplify the conflict further
    // for two reasons:
    // 1. Avoid reading unchanged file contents
    // 2. The simplified conflict can sometimes be resolved when the unsimplified one
    //    cannot
    let file_id_conflict = file_id_conflict.simplify();

    let contents = file_id_conflict
        .try_map_async(async |file_id| {
            let mut content = vec![];
            let mut reader = store.read_file(filename, file_id).await?;
            reader
                .read_to_end(&mut content)
                .await
                .map_err(|err| BackendError::ReadObject {
                    object_type: file_id.object_type(),
                    hash: file_id.hex(),
                    source: err.into(),
                })?;
            BackendResult::Ok(content)
        })
        .await?;
    if let Some(merged_content) = files::try_merge(&contents, options) {
        let id = store
            .write_file(filename, &mut merged_content.as_slice())
            .await?;
        Ok(Some(TreeValue::File {
            id,
            executable,
            copy_id: copy_id.clone(),
        }))
    } else {
        Ok(None)
    }
}