1use std::borrow::Borrow;
18use std::collections::BTreeMap;
19use std::collections::HashSet;
20use std::sync::Arc;
21use std::vec;
22
23use futures::FutureExt as _;
24use futures::StreamExt as _;
25use futures::future::BoxFuture;
26use futures::future::try_join_all;
27use futures::stream::FuturesUnordered;
28use itertools::Itertools as _;
29use tokio::io::AsyncReadExt as _;
30
31use crate::backend;
32use crate::backend::BackendError;
33use crate::backend::BackendResult;
34use crate::backend::TreeId;
35use crate::backend::TreeValue;
36use crate::config::ConfigGetError;
37use crate::files;
38use crate::files::FileMergeHunkLevel;
39use crate::merge::Merge;
40use crate::merge::MergedTreeVal;
41use crate::merge::MergedTreeValue;
42use crate::merge::SameChange;
43use crate::merged_tree::all_merged_tree_entries;
44use crate::object_id::ObjectId as _;
45use crate::repo_path::RepoPath;
46use crate::repo_path::RepoPathBuf;
47use crate::repo_path::RepoPathComponentBuf;
48use crate::settings::UserSettings;
49use crate::store::Store;
50use crate::tree::Tree;
51
/// User-configurable options controlling how trees and files are merged.
#[derive(Clone, Debug)]
pub struct MergeOptions {
    /// Granularity of hunks when merging conflicting file contents
    /// (read from the "merge.hunk-level" setting).
    pub hunk_level: FileMergeHunkLevel,
    /// How to treat a change made identically on multiple sides
    /// (read from the "merge.same-change" setting).
    pub same_change: SameChange,
}
60
impl MergeOptions {
    /// Builds merge options from user settings, reading the
    /// "merge.hunk-level" and "merge.same-change" keys.
    ///
    /// Returns a `ConfigGetError` if either setting cannot be read or
    /// deserialized.
    pub fn from_settings(settings: &UserSettings) -> Result<Self, ConfigGetError> {
        Ok(Self {
            hunk_level: settings.get("merge.hunk-level")?,
            same_change: settings.get("merge.same-change")?,
        })
    }
}
72
73pub async fn merge_trees(store: &Arc<Store>, merge: Merge<TreeId>) -> BackendResult<Merge<TreeId>> {
76 let merge = match merge.into_resolved() {
77 Ok(tree) => return Ok(Merge::resolved(tree)),
78 Err(merge) => merge,
79 };
80
81 let mut merger = TreeMerger {
82 store: store.clone(),
83 trees_to_resolve: BTreeMap::new(),
84 work: FuturesUnordered::new(),
85 unstarted_work: BTreeMap::new(),
86 };
87 merger.enqueue_tree_read(
88 RepoPathBuf::root(),
89 merge.map(|tree_id| Some(TreeValue::Tree(tree_id.clone()))),
90 );
91 let trees = merger.merge().await?;
92 Ok(trees.map(|tree| tree.id().clone()))
93}
94
/// In-progress merge state for the children of a single directory.
struct MergedTreeInput {
    /// Entries whose merge already resolved to a single value.
    resolved: BTreeMap<RepoPathComponentBuf, TreeValue>,
    /// Basenames whose merged value is still being computed asynchronously.
    pending_lookup: HashSet<RepoPathComponentBuf>,
    /// Entries that remained conflicted after resolution was attempted.
    conflicts: BTreeMap<RepoPathComponentBuf, MergedTreeValue>,
}
102
impl MergedTreeInput {
    /// Creates the state from the entries that resolved trivially up front;
    /// no lookups are pending yet.
    fn new(resolved: BTreeMap<RepoPathComponentBuf, TreeValue>) -> Self {
        Self {
            resolved,
            pending_lookup: HashSet::new(),
            conflicts: BTreeMap::new(),
        }
    }

    /// Records the merged value for `basename`, which must have a pending
    /// lookup (asserted). Trivially-resolved values go into `resolved`
    /// (an absent resolved value just drops the entry); everything else is
    /// kept in `conflicts`.
    fn mark_completed(
        &mut self,
        basename: RepoPathComponentBuf,
        value: MergedTreeValue,
        same_change: SameChange,
    ) {
        let was_pending = self.pending_lookup.remove(&basename);
        assert!(was_pending, "No pending lookup for {basename:?}");
        if let Some(resolved) = value.resolve_trivial(same_change) {
            if let Some(resolved) = resolved.as_ref() {
                self.resolved.insert(basename, resolved.clone());
            }
        } else {
            self.conflicts.insert(basename, value);
        }
    }

    /// Converts the completed state into one backend tree per merge side.
    /// Must not be called while lookups are pending (asserted).
    ///
    /// With no remaining conflicts the result is a single resolved tree.
    /// Otherwise each side's tree contains all shared resolved entries plus
    /// that side's term of every conflicted entry.
    fn into_backend_trees(self) -> Merge<backend::Tree> {
        assert!(self.pending_lookup.is_empty());

        // Ordering predicate for interleaving two name-sorted entry streams.
        fn by_name(
            (name1, _): &(RepoPathComponentBuf, TreeValue),
            (name2, _): &(RepoPathComponentBuf, TreeValue),
        ) -> bool {
            name1 < name2
        }

        if self.conflicts.is_empty() {
            let all_entries = self.resolved.into_iter().collect();
            Merge::resolved(backend::Tree::from_sorted_entries(all_entries))
        } else {
            // Transpose the per-entry conflicts into per-side entry lists.
            // All conflicts must have the same number of sides (asserted).
            let mut conflict_entries = self.conflicts.first_key_value().unwrap().1.map(|_| vec![]);
            for (basename, value) in self.conflicts {
                assert_eq!(value.num_sides(), conflict_entries.num_sides());
                for (entries, value) in conflict_entries.iter_mut().zip(value.into_iter()) {
                    // An absent term on this side means the entry doesn't
                    // exist there; skip it.
                    if let Some(value) = value {
                        entries.push((basename.clone(), value));
                    }
                }
            }

            // For each side, merge the shared resolved entries with that
            // side's conflict entries, keeping the result sorted by name.
            let mut backend_trees = vec![];
            for entries in conflict_entries.into_iter() {
                let backend_tree = backend::Tree::from_sorted_entries(
                    self.resolved
                        .iter()
                        .map(|(name, value)| (name.clone(), value.clone()))
                        .merge_by(entries, by_name)
                        .collect(),
                );
                backend_trees.push(backend_tree);
            }
            Merge::from_vec(backend_trees)
        }
    }
}
169
/// Result of one completed unit of asynchronous merge work.
enum TreeMergerWorkOutput {
    /// The trees (one per merge side) at `dir` have been read from the store.
    ReadTrees {
        dir: RepoPathBuf,
        result: BackendResult<Merge<Tree>>,
    },
    /// The merged trees for `dir` have been written back to the store.
    WrittenTrees {
        dir: RepoPathBuf,
        result: BackendResult<Merge<Tree>>,
    },
    /// The file values at `path` have been content-merged.
    MergedFiles {
        path: RepoPathBuf,
        result: BackendResult<MergedTreeValue>,
    },
}
186
/// Key identifying a deferred (not-yet-started) work item.
///
/// NOTE: variant order matters — the derived `Ord` sorts `MergeFiles` before
/// `ReadTrees`, so when capacity frees up, deferred file merges are started
/// before deferred tree reads (see `unstarted_work.pop_first()`).
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
enum TreeMergeWorkItemKey {
    MergeFiles { path: RepoPathBuf },
    ReadTrees { dir: RepoPathBuf },
}
194
/// Drives the recursive, concurrent merge of a set of trees.
struct TreeMerger {
    store: Arc<Store>,
    /// Directories whose children are still being resolved, keyed by path.
    trees_to_resolve: BTreeMap<RepoPathBuf, MergedTreeInput>,
    /// Currently running work items; kept at or below `store.concurrency()`
    /// for reads and file merges (writes are pushed unconditionally).
    work: FuturesUnordered<BoxFuture<'static, TreeMergerWorkOutput>>,
    /// Work deferred because the concurrency limit was reached, ordered by
    /// key so file merges are picked up before tree reads.
    unstarted_work: BTreeMap<TreeMergeWorkItemKey, BoxFuture<'static, TreeMergerWorkOutput>>,
}
204
impl TreeMerger {
    /// Runs queued work to completion and returns the merged trees for the
    /// root directory.
    ///
    /// The loop is event-driven: each finished work item may enqueue more
    /// work, and writing the root tree terminates the merge. An error from
    /// any work item aborts the whole operation.
    async fn merge(mut self) -> BackendResult<Merge<Tree>> {
        while let Some(work_item) = self.work.next().await {
            match work_item {
                TreeMergerWorkOutput::ReadTrees { dir, result } => {
                    let tree = result?;
                    self.process_tree(dir, tree);
                }
                TreeMergerWorkOutput::WrittenTrees { dir, result } => {
                    let tree = result?;
                    if dir.is_root() {
                        // The root tree is only written after everything
                        // below it resolved, so all queues must be empty.
                        assert!(self.trees_to_resolve.is_empty());
                        assert!(self.work.is_empty());
                        assert!(self.unstarted_work.is_empty());
                        return Ok(tree);
                    }
                    // Represent an empty tree as an absent entry in the
                    // parent, so empty directories don't survive the merge.
                    let new_value = tree.map(|tree| {
                        (tree.id() != self.store.empty_tree_id())
                            .then(|| TreeValue::Tree(tree.id().clone()))
                    });
                    self.mark_completed(&dir, new_value);
                }
                TreeMergerWorkOutput::MergedFiles { path, result } => {
                    let value = result?;
                    self.mark_completed(&path, value);
                }
            }

            // Top up the running set from the deferred queue, up to the
            // store's concurrency limit.
            while self.work.len() < self.store.concurrency() {
                if let Some((_key, work)) = self.unstarted_work.pop_first() {
                    self.work.push(work);
                } else {
                    break;
                }
            }
        }

        unreachable!("There was no work item for writing the root tree");
    }

    /// Handles trees read for `dir`: entries that merge trivially are
    /// collected directly; the rest spawn recursive tree reads (for subtree
    /// entries) or file-content merges (for everything else).
    fn process_tree(&mut self, dir: RepoPathBuf, tree: Merge<Tree>) {
        let same_change = self.store.merge_options().same_change;
        let mut resolved = vec![];
        let mut non_trivial = vec![];
        for (basename, path_merge) in all_merged_tree_entries(&tree) {
            if let Some(value) = path_merge.resolve_trivial(same_change) {
                // A trivially-resolved absent value means the entry is gone;
                // only present values are recorded.
                if let Some(value) = value.cloned() {
                    resolved.push((basename.to_owned(), value));
                }
            } else {
                non_trivial.push((basename.to_owned(), path_merge.cloned()));
            }
        }

        // Fully resolved directory: its tree can be written right away.
        if non_trivial.is_empty() {
            let backend_trees = Merge::resolved(backend::Tree::from_sorted_entries(resolved));
            self.enqueue_tree_write(dir, backend_trees);
            return;
        }

        // Track the unresolved entries and queue follow-up work for each.
        let mut unmerged_tree = MergedTreeInput::new(resolved.into_iter().collect());
        for (basename, value) in non_trivial {
            let path = dir.join(&basename);
            unmerged_tree.pending_lookup.insert(basename);
            if value.is_tree() {
                self.enqueue_tree_read(path, value);
            } else {
                self.enqueue_file_merge(path, value);
            }
        }

        self.trees_to_resolve.insert(dir, unmerged_tree);
    }

    /// Queues reading the trees referenced by `value` (one per merge side),
    /// deferring the work if the concurrency limit is reached.
    fn enqueue_tree_read(&mut self, dir: RepoPathBuf, value: MergedTreeValue) {
        let key = TreeMergeWorkItemKey::ReadTrees { dir: dir.clone() };
        let work_fut = read_trees(self.store.clone(), dir.clone(), value)
            .map(|result| TreeMergerWorkOutput::ReadTrees { dir, result });
        if self.work.len() < self.store.concurrency() {
            self.work.push(Box::pin(work_fut));
        } else {
            self.unstarted_work.insert(key, Box::pin(work_fut));
        }
    }

    /// Queues writing one backend tree per merge side for `dir`. Writes are
    /// started immediately and never deferred (unlike reads and file merges).
    fn enqueue_tree_write(&mut self, dir: RepoPathBuf, backend_trees: Merge<backend::Tree>) {
        let work_fut = write_trees(self.store.clone(), dir.clone(), backend_trees)
            .map(|result| TreeMergerWorkOutput::WrittenTrees { dir, result });
        self.work.push(Box::pin(work_fut));
    }

    /// Queues a content-level merge of the file values at `path`, deferring
    /// the work if the concurrency limit is reached.
    fn enqueue_file_merge(&mut self, path: RepoPathBuf, value: MergedTreeValue) {
        let key = TreeMergeWorkItemKey::MergeFiles { path: path.clone() };
        let work_fut = resolve_file_values_owned(self.store.clone(), path.clone(), value)
            .map(|result| TreeMergerWorkOutput::MergedFiles { path, result });
        if self.work.len() < self.store.concurrency() {
            self.work.push(Box::pin(work_fut));
        } else {
            self.unstarted_work.insert(key, Box::pin(work_fut));
        }
    }

    /// Records the merged value for `path` in its parent directory's state.
    /// Once the parent has no more pending entries, its trees are queued for
    /// writing.
    fn mark_completed(&mut self, path: &RepoPath, value: MergedTreeValue) {
        // `path` is never the root here, so it always splits.
        let (dir, basename) = path.split().unwrap();
        let tree = self.trees_to_resolve.get_mut(dir).unwrap();
        let same_change = self.store.merge_options().same_change;
        tree.mark_completed(basename.to_owned(), value, same_change);
        if tree.pending_lookup.is_empty() {
            let tree = self.trees_to_resolve.remove(dir).unwrap();
            self.enqueue_tree_write(dir.to_owned(), tree.into_backend_trees());
        }
    }
}
329
330async fn read_trees(
331 store: Arc<Store>,
332 dir: RepoPathBuf,
333 value: MergedTreeValue,
334) -> BackendResult<Merge<Tree>> {
335 let trees = value
336 .to_tree_merge(&store, &dir)
337 .await?
338 .expect("Should be tree merge");
339 Ok(trees)
340}
341
342async fn write_trees(
343 store: Arc<Store>,
344 dir: RepoPathBuf,
345 backend_trees: Merge<backend::Tree>,
346) -> BackendResult<Merge<Tree>> {
347 let trees = try_join_all(
350 backend_trees
351 .into_iter()
352 .map(|backend_tree| store.write_tree(&dir, backend_tree)),
353 )
354 .await?;
355 Ok(Merge::from_vec(trees))
356}
357
358async fn resolve_file_values_owned(
359 store: Arc<Store>,
360 path: RepoPathBuf,
361 values: MergedTreeValue,
362) -> BackendResult<MergedTreeValue> {
363 let maybe_resolved = try_resolve_file_values(&store, &path, &values).await?;
364 Ok(maybe_resolved.unwrap_or(values))
365}
366
367pub async fn resolve_file_values(
371 store: &Arc<Store>,
372 path: &RepoPath,
373 values: MergedTreeValue,
374) -> BackendResult<MergedTreeValue> {
375 let same_change = store.merge_options().same_change;
376 if let Some(resolved) = values.resolve_trivial(same_change) {
377 return Ok(Merge::resolved(resolved.clone()));
378 }
379
380 let maybe_resolved = try_resolve_file_values(store, path, &values).await?;
381 Ok(maybe_resolved.unwrap_or(values))
382}
383
384async fn try_resolve_file_values<T: Borrow<TreeValue>>(
385 store: &Arc<Store>,
386 path: &RepoPath,
387 values: &Merge<Option<T>>,
388) -> BackendResult<Option<MergedTreeValue>> {
389 let simplified = values
392 .map(|value| value.as_ref().map(Borrow::borrow))
393 .simplify();
394 if let Some(resolved) = try_resolve_file_conflict(store, path, &simplified).await? {
397 Ok(Some(Merge::normal(resolved)))
398 } else {
399 Ok(None)
401 }
402}
403
404async fn try_resolve_file_conflict(
409 store: &Store,
410 filename: &RepoPath,
411 conflict: &MergedTreeVal<'_>,
412) -> BackendResult<Option<TreeValue>> {
413 let options = store.merge_options();
414 let Ok(file_id_conflict) = conflict.try_map(|term| match term {
419 Some(TreeValue::File {
420 id,
421 executable: _,
422 copy_id: _,
423 }) => Ok(id),
424 _ => Err(()),
425 }) else {
426 return Ok(None);
427 };
428 let Ok(executable_conflict) = conflict.try_map(|term| match term {
429 Some(TreeValue::File {
430 id: _,
431 executable,
432 copy_id: _,
433 }) => Ok(executable),
434 _ => Err(()),
435 }) else {
436 return Ok(None);
437 };
438 let Ok(copy_id_conflict) = conflict.try_map(|term| match term {
439 Some(TreeValue::File {
440 id: _,
441 executable: _,
442 copy_id,
443 }) => Ok(copy_id),
444 _ => Err(()),
445 }) else {
446 return Ok(None);
447 };
448 let Some(&&executable) = executable_conflict.resolve_trivial(SameChange::Accept) else {
451 return Ok(None);
453 };
454 let Some(©_id) = copy_id_conflict.resolve_trivial(SameChange::Accept) else {
455 return Ok(None);
457 };
458 if let Some(&resolved_file_id) = file_id_conflict.resolve_trivial(options.same_change) {
459 return Ok(Some(TreeValue::File {
462 id: resolved_file_id.clone(),
463 executable,
464 copy_id: copy_id.clone(),
465 }));
466 }
467
468 let file_id_conflict = file_id_conflict.simplify();
475
476 let contents = file_id_conflict
477 .try_map_async(async |file_id| {
478 let mut content = vec![];
479 let mut reader = store.read_file(filename, file_id).await?;
480 reader
481 .read_to_end(&mut content)
482 .await
483 .map_err(|err| BackendError::ReadObject {
484 object_type: file_id.object_type(),
485 hash: file_id.hex(),
486 source: err.into(),
487 })?;
488 BackendResult::Ok(content)
489 })
490 .await?;
491 if let Some(merged_content) = files::try_merge(&contents, options) {
492 let id = store
493 .write_file(filename, &mut merged_content.as_slice())
494 .await?;
495 Ok(Some(TreeValue::File {
496 id,
497 executable,
498 copy_id: copy_id.clone(),
499 }))
500 } else {
501 Ok(None)
502 }
503}