// jj_lib/tree_merge.rs

1// Copyright 2023-2025 The Jujutsu Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Merge trees by recursing into entries (subtrees, files)
16
17use std::borrow::Borrow;
18use std::collections::BTreeMap;
19use std::collections::HashSet;
20use std::sync::Arc;
21use std::vec;
22
23use futures::FutureExt as _;
24use futures::StreamExt as _;
25use futures::future::BoxFuture;
26use futures::future::try_join_all;
27use futures::stream::FuturesUnordered;
28use itertools::Itertools as _;
29use tokio::io::AsyncReadExt as _;
30
31use crate::backend;
32use crate::backend::BackendError;
33use crate::backend::BackendResult;
34use crate::backend::TreeValue;
35use crate::config::ConfigGetError;
36use crate::files;
37use crate::files::FileMergeHunkLevel;
38use crate::merge::Merge;
39use crate::merge::MergedTreeVal;
40use crate::merge::MergedTreeValue;
41use crate::merge::SameChange;
42use crate::merged_tree::all_merged_tree_entries;
43use crate::object_id::ObjectId as _;
44use crate::repo_path::RepoPath;
45use crate::repo_path::RepoPathBuf;
46use crate::repo_path::RepoPathComponentBuf;
47use crate::settings::UserSettings;
48use crate::store::Store;
49use crate::tree::Tree;
50
/// Options for tree/file conflict resolution.
#[derive(Clone, Debug)]
pub struct MergeOptions {
    /// Granularity of hunks when merging files.
    pub hunk_level: FileMergeHunkLevel,
    /// Whether to resolve a conflict in which all sides make the same change.
    pub same_change: SameChange,
}
59
60impl MergeOptions {
61    /// Loads merge options from `settings`.
62    pub fn from_settings(settings: &UserSettings) -> Result<Self, ConfigGetError> {
63        Ok(Self {
64            // Maybe we can add hunk-level=file to disable content merging if
65            // needed. It wouldn't be translated to FileMergeHunkLevel.
66            hunk_level: settings.get("merge.hunk-level")?,
67            same_change: settings.get("merge.same-change")?,
68        })
69    }
70}
71
72/// The returned conflict will either be resolved or have the same number of
73/// sides as the input.
74pub async fn merge_trees(merge: Merge<Tree>) -> BackendResult<Merge<Tree>> {
75    let merge = match merge.into_resolved() {
76        Ok(tree) => return Ok(Merge::resolved(tree)),
77        Err(merge) => merge,
78    };
79
80    let store = merge.first().store().clone();
81    let merger = TreeMerger {
82        store,
83        trees_to_resolve: BTreeMap::new(),
84        work: FuturesUnordered::new(),
85        unstarted_work: BTreeMap::new(),
86    };
87    merger.work.push(Box::pin(std::future::ready(
88        TreeMergerWorkOutput::ReadTrees {
89            dir: RepoPathBuf::root(),
90            result: Ok(merge),
91        },
92    )));
93    merger.merge().await
94}
95
/// Accumulates per-entry merge results for a single directory until every
/// entry has either been resolved or determined to be a conflict.
struct MergedTreeInput {
    /// Entries whose merge resolved to a single present value.
    resolved: BTreeMap<RepoPathComponentBuf, TreeValue>,
    /// Entries that we're currently waiting for data for in order to resolve
    /// them. When this set becomes empty, we're ready to write the tree(s).
    pending_lookup: HashSet<RepoPathComponentBuf>,
    /// Entries that could not be resolved and remain conflicted.
    conflicts: BTreeMap<RepoPathComponentBuf, MergedTreeValue>,
}
103
impl MergedTreeInput {
    /// Creates an input holding the already-resolved entries; no lookups are
    /// pending yet.
    fn new(resolved: BTreeMap<RepoPathComponentBuf, TreeValue>) -> Self {
        Self {
            resolved,
            pending_lookup: HashSet::new(),
            conflicts: BTreeMap::new(),
        }
    }

    /// Records the merge result for `basename`, which must have a pending
    /// lookup. A trivially resolvable value is moved to `resolved` (or
    /// dropped entirely if it resolved to absent); anything else is kept as a
    /// conflict.
    fn mark_completed(
        &mut self,
        basename: RepoPathComponentBuf,
        value: MergedTreeValue,
        same_change: SameChange,
    ) {
        let was_pending = self.pending_lookup.remove(&basename);
        assert!(was_pending, "No pending lookup for {basename:?}");
        if let Some(resolved) = value.resolve_trivial(same_change) {
            if let Some(resolved) = resolved.as_ref() {
                self.resolved.insert(basename, resolved.clone());
            }
        } else {
            self.conflicts.insert(basename, value);
        }
    }

    /// Converts the collected entries into backend tree(s): a single resolved
    /// tree when there are no conflicts, otherwise one tree per conflict
    /// side. All pending lookups must have completed.
    fn into_backend_trees(self) -> Merge<backend::Tree> {
        assert!(self.pending_lookup.is_empty());

        // Comparator for `merge_by` below, combining two name-sorted entry
        // streams into one. `resolved` and `conflicts` have disjoint keys
        // (an entry goes into exactly one of them), so ties cannot occur.
        fn by_name(
            (name1, _): &(RepoPathComponentBuf, TreeValue),
            (name2, _): &(RepoPathComponentBuf, TreeValue),
        ) -> bool {
            name1 < name2
        }

        if self.conflicts.is_empty() {
            let all_entries = self.resolved.into_iter().collect();
            Merge::resolved(backend::Tree::from_sorted_entries(all_entries))
        } else {
            // Create a Merge with the conflict entries for each side.
            let mut conflict_entries = self.conflicts.first_key_value().unwrap().1.map(|_| vec![]);
            for (basename, value) in self.conflicts {
                assert_eq!(value.num_sides(), conflict_entries.num_sides());
                for (entries, value) in conflict_entries.iter_mut().zip(value.into_iter()) {
                    if let Some(value) = value {
                        entries.push((basename.clone(), value));
                    }
                }
            }

            // Each side's tree gets all resolved entries plus that side's
            // conflict entries, interleaved back into sorted order.
            let mut backend_trees = vec![];
            for entries in conflict_entries.into_iter() {
                let backend_tree = backend::Tree::from_sorted_entries(
                    self.resolved
                        .iter()
                        .map(|(name, value)| (name.clone(), value.clone()))
                        .merge_by(entries, by_name)
                        .collect(),
                );
                backend_trees.push(backend_tree);
            }
            Merge::from_vec(backend_trees)
        }
    }
}
170
/// The result from an asynchronously scheduled work item.
enum TreeMergerWorkOutput {
    /// Trees that have been read (i.e. `Read` is past tense)
    ReadTrees {
        dir: RepoPathBuf,
        result: BackendResult<Merge<Tree>>,
    },
    /// Trees that have been written to the backend for directory `dir`.
    WrittenTrees {
        dir: RepoPathBuf,
        result: BackendResult<Merge<Tree>>,
    },
    /// A finished attempt at content-merging the file values at `path`.
    MergedFiles {
        path: RepoPathBuf,
        result: BackendResult<MergedTreeValue>,
    },
}
187
/// Ordering key for queued-but-unstarted work items; `pop_first()` on a map
/// keyed by this type yields file merges before tree reads.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
enum TreeMergeWorkItemKey {
    // `MergeFiles` variant before `ReadTrees` so files are polled before trees because they
    // typically take longer to process.
    MergeFiles { path: RepoPathBuf },
    ReadTrees { dir: RepoPathBuf },
}
195
/// Drives a recursive tree merge by scheduling reads, file merges, and tree
/// writes as asynchronous work items.
struct TreeMerger {
    store: Arc<Store>,
    // Trees we're currently working on, keyed by directory path.
    trees_to_resolve: BTreeMap<RepoPathBuf, MergedTreeInput>,
    // Futures we're currently polling. Kept at or below the backend's
    // concurrency limit (tree writes are allowed to exceed it; see
    // `enqueue_tree_write()`).
    work: FuturesUnordered<BoxFuture<'static, TreeMergerWorkOutput>>,
    // Futures we haven't started polling yet, in order to respect the backend's concurrency limit.
    unstarted_work: BTreeMap<TreeMergeWorkItemKey, BoxFuture<'static, TreeMergerWorkOutput>>,
}
205
impl TreeMerger {
    /// Drives all scheduled work to completion and returns the merged root
    /// tree(s). Returns the first backend error encountered, if any.
    async fn merge(mut self) -> BackendResult<Merge<Tree>> {
        while let Some(work_item) = self.work.next().await {
            match work_item {
                TreeMergerWorkOutput::ReadTrees { dir, result } => {
                    let tree = result?;
                    self.process_tree(dir, tree);
                }
                TreeMergerWorkOutput::WrittenTrees { dir, result } => {
                    let tree = result?;
                    if dir.is_root() {
                        // Writing the root tree is always the last work item,
                        // so everything else must have drained by now.
                        assert!(self.trees_to_resolve.is_empty());
                        assert!(self.work.is_empty());
                        assert!(self.unstarted_work.is_empty());
                        return Ok(tree);
                    }
                    // Propagate the write to the parent tree, replacing empty trees by `None`.
                    let new_value = tree.map(|tree| {
                        (tree.id() != self.store.empty_tree_id())
                            .then(|| TreeValue::Tree(tree.id().clone()))
                    });
                    self.mark_completed(&dir, new_value);
                }
                TreeMergerWorkOutput::MergedFiles { path, result } => {
                    let value = result?;
                    self.mark_completed(&path, value);
                }
            }

            // Top up the polled set from the unstarted queue, respecting the
            // backend's concurrency limit.
            while self.work.len() < self.store.concurrency() {
                if let Some((_key, work)) = self.unstarted_work.pop_first() {
                    self.work.push(work);
                } else {
                    break;
                }
            }
        }

        unreachable!("There was no work item for writing the root tree");
    }

    /// Handles trees read for directory `dir`: resolves what it can without
    /// further data and schedules tree reads / file merges for the rest.
    fn process_tree(&mut self, dir: RepoPathBuf, tree: Merge<Tree>) {
        // First resolve trivial merges (those that we don't need to load any more data
        // for)
        let same_change = self.store.merge_options().same_change;
        let mut resolved = vec![];
        let mut non_trivial = vec![];
        for (basename, path_merge) in all_merged_tree_entries(&tree) {
            if let Some(value) = path_merge.resolve_trivial(same_change) {
                // A trivial resolution to absent (None) drops the entry.
                if let Some(value) = value.cloned() {
                    resolved.push((basename.to_owned(), value));
                }
            } else {
                non_trivial.push((basename.to_owned(), path_merge.cloned()));
            }
        }

        // If there are no non-trivial merges, we can write the tree now.
        if non_trivial.is_empty() {
            let backend_trees = Merge::resolved(backend::Tree::from_sorted_entries(resolved));
            self.enqueue_tree_write(dir, backend_trees);
            return;
        }

        let mut unmerged_tree = MergedTreeInput::new(resolved.into_iter().collect());
        for (basename, value) in non_trivial {
            let path = dir.join(&basename);
            unmerged_tree.pending_lookup.insert(basename);
            if value.is_tree() {
                self.enqueue_tree_read(path, value);
            } else {
                // TODO: If it's e.g. a dir/file conflict, there's no need to try to
                // resolve it as a file. We should mark them to
                // `unmerged_tree.conflicts` instead.
                self.enqueue_file_merge(path, value);
            }
        }

        self.trees_to_resolve.insert(dir, unmerged_tree);
    }

    /// Schedules reading of the subtrees referenced by `value` under `dir`,
    /// deferring to the unstarted queue if at the concurrency limit.
    fn enqueue_tree_read(&mut self, dir: RepoPathBuf, value: MergedTreeValue) {
        let key = TreeMergeWorkItemKey::ReadTrees { dir: dir.clone() };
        let work_fut = read_trees(self.store.clone(), dir.clone(), value)
            .map(|result| TreeMergerWorkOutput::ReadTrees { dir, result });
        if self.work.len() < self.store.concurrency() {
            self.work.push(Box::pin(work_fut));
        } else {
            self.unstarted_work.insert(key, Box::pin(work_fut));
        }
    }

    /// Schedules writing of `backend_trees` for directory `dir`.
    fn enqueue_tree_write(&mut self, dir: RepoPathBuf, backend_trees: Merge<backend::Tree>) {
        let work_fut = write_trees(self.store.clone(), dir.clone(), backend_trees)
            .map(|result| TreeMergerWorkOutput::WrittenTrees { dir, result });
        // Bypass the `unstarted_work` queue because writing trees usually results in
        // saving memory (each tree gets replaced by a `TreeValue::Tree`)
        self.work.push(Box::pin(work_fut));
    }

    /// Schedules a content-level merge of the file values at `path`,
    /// deferring to the unstarted queue if at the concurrency limit.
    fn enqueue_file_merge(&mut self, path: RepoPathBuf, value: MergedTreeValue) {
        let key = TreeMergeWorkItemKey::MergeFiles { path: path.clone() };
        let work_fut = resolve_file_values_owned(self.store.clone(), path.clone(), value)
            .map(|result| TreeMergerWorkOutput::MergedFiles { path, result });
        if self.work.len() < self.store.concurrency() {
            self.work.push(Box::pin(work_fut));
        } else {
            self.unstarted_work.insert(key, Box::pin(work_fut));
        }
    }

    /// Records the merge result for `path` in its parent directory's input.
    /// `path` must not be the root, and its parent must be in
    /// `trees_to_resolve`.
    fn mark_completed(&mut self, path: &RepoPath, value: MergedTreeValue) {
        let (dir, basename) = path.split().unwrap();
        let tree = self.trees_to_resolve.get_mut(dir).unwrap();
        let same_change = self.store.merge_options().same_change;
        tree.mark_completed(basename.to_owned(), value, same_change);
        // If all entries in this tree have been processed (either resolved or still a
        // conflict), schedule the writing of the tree(s) to the backend.
        if tree.pending_lookup.is_empty() {
            let tree = self.trees_to_resolve.remove(dir).unwrap();
            self.enqueue_tree_write(dir.to_owned(), tree.into_backend_trees());
        }
    }
}
330
331async fn read_trees(
332    store: Arc<Store>,
333    dir: RepoPathBuf,
334    value: MergedTreeValue,
335) -> BackendResult<Merge<Tree>> {
336    let trees = value
337        .to_tree_merge(&store, &dir)
338        .await?
339        .expect("Should be tree merge");
340    Ok(trees)
341}
342
343async fn write_trees(
344    store: Arc<Store>,
345    dir: RepoPathBuf,
346    backend_trees: Merge<backend::Tree>,
347) -> BackendResult<Merge<Tree>> {
348    // TODO: Could use `backend_trees.try_map_async()` here if it took `self` by
349    // value or if `Backend::write_tree()` to an `Arc<backend::Tree>`.
350    let trees = try_join_all(
351        backend_trees
352            .into_iter()
353            .map(|backend_tree| store.write_tree(&dir, backend_tree)),
354    )
355    .await?;
356    Ok(Merge::from_vec(trees))
357}
358
359async fn resolve_file_values_owned(
360    store: Arc<Store>,
361    path: RepoPathBuf,
362    values: MergedTreeValue,
363) -> BackendResult<MergedTreeValue> {
364    let maybe_resolved = try_resolve_file_values(&store, &path, &values).await?;
365    Ok(maybe_resolved.unwrap_or(values))
366}
367
368/// Tries to resolve file conflicts by merging the file contents. Treats missing
369/// files as empty. If the file conflict cannot be resolved, returns the passed
370/// `values` unmodified.
371pub async fn resolve_file_values(
372    store: &Arc<Store>,
373    path: &RepoPath,
374    values: MergedTreeValue,
375) -> BackendResult<MergedTreeValue> {
376    let same_change = store.merge_options().same_change;
377    if let Some(resolved) = values.resolve_trivial(same_change) {
378        return Ok(Merge::resolved(resolved.clone()));
379    }
380
381    let maybe_resolved = try_resolve_file_values(store, path, &values).await?;
382    Ok(maybe_resolved.unwrap_or(values))
383}
384
385async fn try_resolve_file_values<T: Borrow<TreeValue>>(
386    store: &Arc<Store>,
387    path: &RepoPath,
388    values: &Merge<Option<T>>,
389) -> BackendResult<Option<MergedTreeValue>> {
390    // The values may contain trees canceling each other (notably padded absent
391    // trees), so we need to simplify them first.
392    let simplified = values
393        .map(|value| value.as_ref().map(Borrow::borrow))
394        .simplify();
395    // No fast path for simplified.is_resolved(). If it could be resolved, it would
396    // have been caught by values.resolve_trivial() above.
397    if let Some(resolved) = try_resolve_file_conflict(store, path, &simplified).await? {
398        Ok(Some(Merge::normal(resolved)))
399    } else {
400        // Failed to merge the files, or the paths are not files
401        Ok(None)
402    }
403}
404
/// Resolves file-level conflict by merging content hunks.
///
/// The input `conflict` is supposed to be simplified. It shouldn't contain
/// non-file values that cancel each other.
///
/// Returns `Ok(None)` when the conflict cannot be resolved as a file merge
/// (non-file or absent terms, disagreeing executable bits or copy IDs, or
/// content hunks that fail to merge).
async fn try_resolve_file_conflict(
    store: &Store,
    filename: &RepoPath,
    conflict: &MergedTreeVal<'_>,
) -> BackendResult<Option<TreeValue>> {
    let options = store.merge_options();
    // If there are any non-file or any missing parts in the conflict, we can't
    // merge it. We check early so we don't waste time reading file contents if
    // we can't merge them anyway. At the same time we determine whether the
    // resulting file should be executable.
    let Ok(file_id_conflict) = conflict.try_map(|term| match term {
        Some(TreeValue::File {
            id,
            executable: _,
            copy_id: _,
        }) => Ok(id),
        _ => Err(()),
    }) else {
        return Ok(None);
    };
    let Ok(executable_conflict) = conflict.try_map(|term| match term {
        Some(TreeValue::File {
            id: _,
            executable,
            copy_id: _,
        }) => Ok(executable),
        _ => Err(()),
    }) else {
        return Ok(None);
    };
    let Ok(copy_id_conflict) = conflict.try_map(|term| match term {
        Some(TreeValue::File {
            id: _,
            executable: _,
            copy_id,
        }) => Ok(copy_id),
        _ => Err(()),
    }) else {
        return Ok(None);
    };
    // TODO: Whether to respect options.same_change to merge executable and
    // copy_id? Should also update conflicts::resolve_file_executable().
    let Some(&&executable) = executable_conflict.resolve_trivial(SameChange::Accept) else {
        // We're unable to determine whether the result should be executable
        return Ok(None);
    };
    let Some(&copy_id) = copy_id_conflict.resolve_trivial(SameChange::Accept) else {
        // We're unable to determine the file's copy ID
        return Ok(None);
    };
    if let Some(&resolved_file_id) = file_id_conflict.resolve_trivial(options.same_change) {
        // Don't bother reading the file contents if the conflict can be trivially
        // resolved.
        return Ok(Some(TreeValue::File {
            id: resolved_file_id.clone(),
            executable,
            copy_id: copy_id.clone(),
        }));
    }

    // While the input conflict should be simplified by caller, it might contain
    // terms which only differ in executable bits. Simplify the conflict further
    // for two reasons:
    // 1. Avoid reading unchanged file contents
    // 2. The simplified conflict can sometimes be resolved when the unsimplified
    //    one cannot
    let file_id_conflict = file_id_conflict.simplify();

    // Read each side's full content into memory for the hunk-level merge.
    let contents = file_id_conflict
        .try_map_async(async |file_id| {
            let mut content = vec![];
            let mut reader = store.read_file(filename, file_id).await?;
            reader
                .read_to_end(&mut content)
                .await
                .map_err(|err| BackendError::ReadObject {
                    object_type: file_id.object_type(),
                    hash: file_id.hex(),
                    source: err.into(),
                })?;
            BackendResult::Ok(content)
        })
        .await?;
    if let Some(merged_content) = files::try_merge(&contents, options) {
        // The hunks merged cleanly; store the merged content as a new file.
        let id = store
            .write_file(filename, &mut merged_content.as_slice())
            .await?;
        Ok(Some(TreeValue::File {
            id,
            executable,
            copy_id: copy_id.clone(),
        }))
    } else {
        Ok(None)
    }
}