use std::borrow::Borrow;
use std::collections::BTreeMap;
use std::collections::HashSet;
use std::sync::Arc;
use std::vec;

use futures::FutureExt as _;
use futures::StreamExt as _;
use futures::future::BoxFuture;
use futures::future::try_join_all;
use futures::stream::FuturesUnordered;
use itertools::Itertools as _;
use tokio::io::AsyncReadExt as _;

use crate::backend;
use crate::backend::BackendError;
use crate::backend::BackendResult;
use crate::backend::TreeValue;
use crate::config::ConfigGetError;
use crate::files;
use crate::files::FileMergeHunkLevel;
use crate::merge::Merge;
use crate::merge::MergedTreeVal;
use crate::merge::MergedTreeValue;
use crate::merge::SameChange;
use crate::merged_tree::all_merged_tree_entries;
use crate::object_id::ObjectId as _;
use crate::repo_path::RepoPath;
use crate::repo_path::RepoPathBuf;
use crate::repo_path::RepoPathComponentBuf;
use crate::settings::UserSettings;
use crate::store::Store;
use crate::tree::Tree;

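/// Options controlling how conflicting tree values are merged, typically
/// loaded from the `merge.*` user settings.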
#[derive(Clone, Debug)]
pub struct MergeOptions {
    /// Granularity at which file hunks are merged.
    pub hunk_level: FileMergeHunkLevel,
    /// Whether identical changes made on both sides are treated as resolved.
    pub same_change: SameChange,
}

impl MergeOptions {
    /// Loads merge options from the user settings.
    pub fn from_settings(settings: &UserSettings) -> Result<Self, ConfigGetError> {
        Ok(Self {
            hunk_level: settings.get("merge.hunk-level")?,
            same_change: settings.get("merge.same-change")?,
        })
    }
}

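/// Merges the given tree merge recursively, resolving subtree and file
/// conflicts where possible, and returns the merge of the newly written
/// trees. Backend reads and writes are scheduled concurrently based on the
/// store's concurrency setting.
///
/// A minimal usage sketch (the `tree_merge` variable is hypothetical, not
/// defined in this module):
///
/// ```ignore
/// match merge_trees(tree_merge).await?.into_resolved() {
///     Ok(tree) => { /* every path was resolved */ }
///     Err(partial) => { /* some paths still conflict */ }
/// }
/// ```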
pub async fn merge_trees(merge: Merge<Tree>) -> BackendResult<Merge<Tree>> {
    let merge = match merge.into_resolved() {
        Ok(tree) => return Ok(Merge::resolved(tree)),
        Err(merge) => merge,
    };

    let store = merge.first().store().clone();
    let merger = TreeMerger {
        store,
        trees_to_resolve: BTreeMap::new(),
        work: FuturesUnordered::new(),
        unstarted_work: BTreeMap::new(),
    };
    // Seed the queue with an already-read merge of the root trees.
    merger.work.push(Box::pin(std::future::ready(
        TreeMergerWorkOutput::ReadTrees {
            dir: RepoPathBuf::root(),
            result: Ok(merge),
        },
    )));
    merger.merge().await
}

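/// In-progress merge state for a single directory: entries that have been
/// resolved, entries whose resolution is still pending, and entries that
/// remain conflicted.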
struct MergedTreeInput {
    /// Entries that resolved trivially or have already been merged.
    resolved: BTreeMap<RepoPathComponentBuf, TreeValue>,
    /// Entries whose merge result we're still waiting for.
    pending_lookup: HashSet<RepoPathComponentBuf>,
    /// Entries whose merge produced a conflict.
    conflicts: BTreeMap<RepoPathComponentBuf, MergedTreeValue>,
}

impl MergedTreeInput {
    fn new(resolved: BTreeMap<RepoPathComponentBuf, TreeValue>) -> Self {
        Self {
            resolved,
            pending_lookup: HashSet::new(),
            conflicts: BTreeMap::new(),
        }
    }

    fn mark_completed(
        &mut self,
        basename: RepoPathComponentBuf,
        value: MergedTreeValue,
        same_change: SameChange,
    ) {
        let was_pending = self.pending_lookup.remove(&basename);
        assert!(was_pending, "No pending lookup for {basename:?}");
        if let Some(resolved) = value.resolve_trivial(same_change) {
            if let Some(resolved) = resolved.as_ref() {
                self.resolved.insert(basename, resolved.clone());
            }
        } else {
            self.conflicts.insert(basename, value);
        }
    }

    fn into_backend_trees(self) -> Merge<backend::Tree> {
        assert!(self.pending_lookup.is_empty());

        fn by_name(
            (name1, _): &(RepoPathComponentBuf, TreeValue),
            (name2, _): &(RepoPathComponentBuf, TreeValue),
        ) -> bool {
            name1 < name2
        }

        if self.conflicts.is_empty() {
            let all_entries = self.resolved.into_iter().collect();
            Merge::resolved(backend::Tree::from_sorted_entries(all_entries))
        } else {
            // Distribute each conflicted entry to the corresponding side of the merge.
            let mut conflict_entries =
                self.conflicts.first_key_value().unwrap().1.map(|_| vec![]);
            for (basename, value) in self.conflicts {
                assert_eq!(value.num_sides(), conflict_entries.num_sides());
                for (entries, value) in conflict_entries.iter_mut().zip(value.into_iter()) {
                    if let Some(value) = value {
                        entries.push((basename.clone(), value));
                    }
                }
            }

            let mut backend_trees = vec![];
            for entries in conflict_entries.into_iter() {
                let backend_tree = backend::Tree::from_sorted_entries(
                    self.resolved
                        .iter()
                        .map(|(name, value)| (name.clone(), value.clone()))
                        .merge_by(entries, by_name)
                        .collect(),
                );
                backend_trees.push(backend_tree);
            }
            Merge::from_vec(backend_trees)
        }
    }
}

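/// The output of one unit of asynchronous work processed by the `TreeMerger`
/// driver loop.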
enum TreeMergerWorkOutput {
    ReadTrees {
        dir: RepoPathBuf,
        result: BackendResult<Merge<Tree>>,
    },
    WrittenTrees {
        dir: RepoPathBuf,
        result: BackendResult<Merge<Tree>>,
    },
    MergedFiles {
        path: RepoPathBuf,
        result: BackendResult<MergedTreeValue>,
    },
}

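/// Key for work items that have been created but not yet started. Since
/// `unstarted_work` is a `BTreeMap` drained with `pop_first()`, the derived
/// ordering (file merges before tree reads, then by path) decides which
/// pending item starts next.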
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
enum TreeMergeWorkItemKey {
    MergeFiles { path: RepoPathBuf },
    ReadTrees { dir: RepoPathBuf },
}

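/// Drives the recursive tree merge: reads conflicting subtrees, merges file
/// contents, and writes back merged trees, scheduling the work concurrently.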
struct TreeMerger {
    store: Arc<Store>,
    /// Partially merged directories, keyed by path.
    trees_to_resolve: BTreeMap<RepoPathBuf, MergedTreeInput>,
    /// Work items that are currently running.
    work: FuturesUnordered<BoxFuture<'static, TreeMergerWorkOutput>>,
    /// Work items that have not been started yet because too many items were
    /// already running.
    unstarted_work: BTreeMap<TreeMergeWorkItemKey, BoxFuture<'static, TreeMergerWorkOutput>>,
}

impl TreeMerger {
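    /// Runs all queued work to completion and returns the merge of the
    /// written root trees.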
    async fn merge(mut self) -> BackendResult<Merge<Tree>> {
        while let Some(work_item) = self.work.next().await {
            match work_item {
                TreeMergerWorkOutput::ReadTrees { dir, result } => {
                    let tree = result?;
                    self.process_tree(dir, tree);
                }
                TreeMergerWorkOutput::WrittenTrees { dir, result } => {
                    let tree = result?;
                    if dir.is_root() {
                        assert!(self.trees_to_resolve.is_empty());
                        assert!(self.work.is_empty());
                        assert!(self.unstarted_work.is_empty());
                        return Ok(tree);
                    }
                    let new_value = tree.map(|tree| {
                        (tree.id() != self.store.empty_tree_id())
                            .then(|| TreeValue::Tree(tree.id().clone()))
                    });
                    self.mark_completed(&dir, new_value);
                }
                TreeMergerWorkOutput::MergedFiles { path, result } => {
                    let value = result?;
                    self.mark_completed(&path, value);
                }
            }

            // Backfill the running set from the unstarted queue, up to the
            // store's concurrency limit.
            while self.work.len() < self.store.concurrency() {
                if let Some((_key, work)) = self.unstarted_work.pop_first() {
                    self.work.push(work);
                } else {
                    break;
                }
            }
        }

        unreachable!("There was no work item for writing the root tree");
    }

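    /// Processes the tree merge read for `dir`: trivially resolvable entries
    /// are collected directly, subtree conflicts trigger recursive tree reads,
    /// and file conflicts are queued for content merging.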
    fn process_tree(&mut self, dir: RepoPathBuf, tree: Merge<Tree>) {
        let same_change = self.store.merge_options().same_change;
        let mut resolved = vec![];
        let mut non_trivial = vec![];
        for (basename, path_merge) in all_merged_tree_entries(&tree) {
            if let Some(value) = path_merge.resolve_trivial(same_change) {
                if let Some(value) = value.cloned() {
                    resolved.push((basename.to_owned(), value));
                }
            } else {
                non_trivial.push((basename.to_owned(), path_merge.cloned()));
            }
        }

        if non_trivial.is_empty() {
            // Every entry resolved trivially, so the merged tree can be written right away.
            let backend_trees = Merge::resolved(backend::Tree::from_sorted_entries(resolved));
            self.enqueue_tree_write(dir, backend_trees);
            return;
        }

        let mut unmerged_tree = MergedTreeInput::new(resolved.into_iter().collect());
        for (basename, value) in non_trivial {
            let path = dir.join(&basename);
            unmerged_tree.pending_lookup.insert(basename);
            if value.is_tree() {
                self.enqueue_tree_read(path, value);
            } else {
                self.enqueue_file_merge(path, value);
            }
        }

        self.trees_to_resolve.insert(dir, unmerged_tree);
    }

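    /// Queues a recursive merge of the subtrees at `dir`.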
    fn enqueue_tree_read(&mut self, dir: RepoPathBuf, value: MergedTreeValue) {
        let key = TreeMergeWorkItemKey::ReadTrees { dir: dir.clone() };
        let work_fut = read_trees(self.store.clone(), dir.clone(), value)
            .map(|result| TreeMergerWorkOutput::ReadTrees { dir, result });
        if self.work.len() < self.store.concurrency() {
            self.work.push(Box::pin(work_fut));
        } else {
            self.unstarted_work.insert(key, Box::pin(work_fut));
        }
    }

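    /// Queues writing of the merged trees for `dir`. Unlike reads and file
    /// merges, tree writes are always started immediately.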
    fn enqueue_tree_write(&mut self, dir: RepoPathBuf, backend_trees: Merge<backend::Tree>) {
        let work_fut = write_trees(self.store.clone(), dir.clone(), backend_trees)
            .map(|result| TreeMergerWorkOutput::WrittenTrees { dir, result });
        self.work.push(Box::pin(work_fut));
    }

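    /// Queues a content-level merge of the file values at `path`.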
    fn enqueue_file_merge(&mut self, path: RepoPathBuf, value: MergedTreeValue) {
        let key = TreeMergeWorkItemKey::MergeFiles { path: path.clone() };
        let work_fut = resolve_file_values_owned(self.store.clone(), path.clone(), value)
            .map(|result| TreeMergerWorkOutput::MergedFiles { path, result });
        if self.work.len() < self.store.concurrency() {
            self.work.push(Box::pin(work_fut));
        } else {
            self.unstarted_work.insert(key, Box::pin(work_fut));
        }
    }

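    /// Records the merge result for `path` in its parent directory and, if
    /// that directory has no more pending entries, queues writing its trees.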
    fn mark_completed(&mut self, path: &RepoPath, value: MergedTreeValue) {
        let (dir, basename) = path.split().unwrap();
        let tree = self.trees_to_resolve.get_mut(dir).unwrap();
        let same_change = self.store.merge_options().same_change;
        tree.mark_completed(basename.to_owned(), value, same_change);
        if tree.pending_lookup.is_empty() {
            let tree = self.trees_to_resolve.remove(dir).unwrap();
            self.enqueue_tree_write(dir.to_owned(), tree.into_backend_trees());
        }
    }
}

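/// Reads the trees pointed to by the tree values in `value`.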
async fn read_trees(
    store: Arc<Store>,
    dir: RepoPathBuf,
    value: MergedTreeValue,
) -> BackendResult<Merge<Tree>> {
    let trees = value
        .to_tree_merge(&store, &dir)
        .await?
        .expect("Should be tree merge");
    Ok(trees)
}

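/// Writes each side of the backend tree merge to the store.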
async fn write_trees(
    store: Arc<Store>,
    dir: RepoPathBuf,
    backend_trees: Merge<backend::Tree>,
) -> BackendResult<Merge<Tree>> {
    let trees = try_join_all(
        backend_trees
            .into_iter()
            .map(|backend_tree| store.write_tree(&dir, backend_tree)),
    )
    .await?;
    Ok(Merge::from_vec(trees))
}

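/// Owned wrapper around `try_resolve_file_values()` suitable for use as a
/// queued work item; returns the input values unchanged if the conflict
/// could not be resolved.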
async fn resolve_file_values_owned(
    store: Arc<Store>,
    path: RepoPathBuf,
    values: MergedTreeValue,
) -> BackendResult<MergedTreeValue> {
    let maybe_resolved = try_resolve_file_values(&store, &path, &values).await?;
    Ok(maybe_resolved.unwrap_or(values))
}

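/// Tries to resolve the conflicting values at `path`, either trivially or by
/// merging file contents. Returns the input values unchanged if the conflict
/// could not be resolved.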
pub async fn resolve_file_values(
    store: &Arc<Store>,
    path: &RepoPath,
    values: MergedTreeValue,
) -> BackendResult<MergedTreeValue> {
    let same_change = store.merge_options().same_change;
    if let Some(resolved) = values.resolve_trivial(same_change) {
        return Ok(Merge::resolved(resolved.clone()));
    }

    let maybe_resolved = try_resolve_file_values(store, path, &values).await?;
    Ok(maybe_resolved.unwrap_or(values))
}

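/// Simplifies the conflict and attempts a file-content merge, returning
/// `Some(value)` if it could be resolved and `None` otherwise.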
async fn try_resolve_file_values<T: Borrow<TreeValue>>(
    store: &Arc<Store>,
    path: &RepoPath,
    values: &Merge<Option<T>>,
) -> BackendResult<Option<MergedTreeValue>> {
    // Simplify the conflict first so fewer sides need to be merged.
    let simplified = values
        .map(|value| value.as_ref().map(Borrow::borrow))
        .simplify();
    if let Some(resolved) = try_resolve_file_conflict(store, path, &simplified).await? {
        Ok(Some(Merge::normal(resolved)))
    } else {
        // The conflict is not between regular files, or the contents didn't merge cleanly.
        Ok(None)
    }
}

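/// Tries to resolve a conflict between file entries by merging the file
/// contents. Returns `None` if the conflict involves non-file values, if the
/// executable bits or copy IDs conflict, or if the content merge itself
/// produces conflicts.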
async fn try_resolve_file_conflict(
    store: &Store,
    filename: &RepoPath,
    conflict: &MergedTreeVal<'_>,
) -> BackendResult<Option<TreeValue>> {
    let options = store.merge_options();
    // Give up unless every side of the conflict is a regular file.
    let Ok(file_id_conflict) = conflict.try_map(|term| match term {
        Some(TreeValue::File {
            id,
            executable: _,
            copy_id: _,
        }) => Ok(id),
        _ => Err(()),
    }) else {
        return Ok(None);
    };
    let Ok(executable_conflict) = conflict.try_map(|term| match term {
        Some(TreeValue::File {
            id: _,
            executable,
            copy_id: _,
        }) => Ok(executable),
        _ => Err(()),
    }) else {
        return Ok(None);
    };
    let Ok(copy_id_conflict) = conflict.try_map(|term| match term {
        Some(TreeValue::File {
            id: _,
            executable: _,
            copy_id,
        }) => Ok(copy_id),
        _ => Err(()),
    }) else {
        return Ok(None);
    };
    // The executable bit and copy ID must resolve trivially; otherwise give up.
    let Some(&&executable) = executable_conflict.resolve_trivial(SameChange::Accept) else {
        return Ok(None);
    };
    let Some(&copy_id) = copy_id_conflict.resolve_trivial(SameChange::Accept) else {
        return Ok(None);
    };
    if let Some(&resolved_file_id) = file_id_conflict.resolve_trivial(options.same_change) {
        return Ok(Some(TreeValue::File {
            id: resolved_file_id.clone(),
            executable,
            copy_id: copy_id.clone(),
        }));
    }

    // Simplify the conflict before reading file contents.
    let file_id_conflict = file_id_conflict.simplify();

    // Read the contents of all sides and try to merge them.
    let contents = file_id_conflict
        .try_map_async(async |file_id| {
            let mut content = vec![];
            let mut reader = store.read_file(filename, file_id).await?;
            reader
                .read_to_end(&mut content)
                .await
                .map_err(|err| BackendError::ReadObject {
                    object_type: file_id.object_type(),
                    hash: file_id.hex(),
                    source: err.into(),
                })?;
            BackendResult::Ok(content)
        })
        .await?;
    if let Some(merged_content) = files::try_merge(&contents, options) {
        let id = store
            .write_file(filename, &mut merged_content.as_slice())
            .await?;
        Ok(Some(TreeValue::File {
            id,
            executable,
            copy_id: copy_id.clone(),
        }))
    } else {
        Ok(None)
    }
}