1use std::cmp::Ordering;
5use std::collections::HashMap;
6use std::fmt::Debug;
7use std::future::Future;
8use std::sync::{Arc, Mutex};
9
10use async_trait::async_trait;
11use eden_dag::namedag::MemNameDag;
12use eden_dag::nameset::hints::Hints;
13use eden_dag::ops::{DagPersistent, Parents};
14use eden_dag::{DagAlgorithm, Group, VertexListWithOptions, VertexOptions};
15use eyre::Context;
16use futures::{StreamExt, TryStreamExt};
17use itertools::Itertools;
18use once_cell::sync::OnceCell;
19use tracing::{instrument, trace, warn};
20
21use crate::core::effects::{Effects, OperationType};
22use crate::core::eventlog::{CommitActivityStatus, EventCursor, EventReplayer};
23use crate::git::{Commit, MaybeZeroOid, NonZeroOid, Repo, Time};
24
25use super::repo_ext::RepoReferencesSnapshot;
26
27impl From<NonZeroOid> for eden_dag::VertexName {
28 fn from(oid: NonZeroOid) -> Self {
29 eden_dag::VertexName::copy_from(oid.as_bytes())
30 }
31}
32
33impl TryFrom<eden_dag::VertexName> for MaybeZeroOid {
34 type Error = eyre::Error;
35
36 fn try_from(value: eden_dag::VertexName) -> Result<Self, Self::Error> {
37 let oid = git2::Oid::from_bytes(value.as_ref())?;
38 let oid = MaybeZeroOid::from(oid);
39 Ok(oid)
40 }
41}
42
43impl TryFrom<eden_dag::VertexName> for NonZeroOid {
44 type Error = eyre::Error;
45
46 fn try_from(value: eden_dag::VertexName) -> Result<Self, Self::Error> {
47 let oid = MaybeZeroOid::try_from(value)?;
48 let oid = NonZeroOid::try_from(oid)?;
49 Ok(oid)
50 }
51}
52
/// A set of commits. Alias for `eden_dag::NameSet`, whose elements are
/// vertex names built from commit OIDs in this file.
pub type CommitSet = eden_dag::NameSet;
55
/// A single vertex in the commit graph. Alias for `eden_dag::VertexName`;
/// in this file it is built from the raw bytes of a commit OID.
pub type CommitVertex = eden_dag::VertexName;
58
59impl From<NonZeroOid> for CommitSet {
60 fn from(oid: NonZeroOid) -> Self {
61 let vertex = CommitVertex::from(oid);
62 CommitSet::from_static_names([vertex])
63 }
64}
65
66impl FromIterator<NonZeroOid> for CommitSet {
67 fn from_iter<T: IntoIterator<Item = NonZeroOid>>(iter: T) -> Self {
68 let oids = iter
69 .into_iter()
70 .map(CommitVertex::from)
71 .map(Ok)
72 .collect_vec();
73 CommitSet::from_iter(oids, Hints::default())
74 }
75}
76
77pub fn union_all(commits: &[CommitSet]) -> CommitSet {
79 commits
80 .iter()
81 .fold(CommitSet::empty(), |acc, elem| acc.union(elem))
82}
83
/// Adapter that answers parent lookups for the DAG by reading commits from a
/// Git repository. The repository is kept behind `Arc<Mutex<_>>` and locked
/// for each lookup.
struct GitParentsBlocking {
    repo: Arc<Mutex<Repo>>,
}
87
88#[async_trait]
89impl Parents for GitParentsBlocking {
90 async fn parent_names(&self, v: CommitVertex) -> eden_dag::Result<Vec<CommitVertex>> {
91 use eden_dag::errors::BackendError;
92 trace!(?v, "visiting Git commit");
93
94 let oid = MaybeZeroOid::from_bytes(v.as_ref())
95 .map_err(|_e| anyhow::anyhow!("Could not convert to Git oid: {:?}", &v))
96 .map_err(BackendError::Other)?;
97 let oid = match oid {
98 MaybeZeroOid::NonZero(oid) => oid,
99 MaybeZeroOid::Zero => return Ok(Vec::new()),
100 };
101
102 let repo = self.repo.lock().unwrap();
103 let commit = repo
104 .find_commit(oid)
105 .map_err(|_e| anyhow::anyhow!("Could not resolve to Git commit: {:?}", &v))
106 .map_err(BackendError::Other)?;
107 let commit = match commit {
108 Some(commit) => commit,
109 None => {
110 return Ok(Vec::new());
113 }
114 };
115
116 Ok(commit
117 .get_parent_oids()
118 .into_iter()
119 .map(CommitVertex::from)
120 .collect())
121 }
122
123 async fn hint_subdag_for_insertion(
124 &self,
125 _heads: &[CommitVertex],
126 ) -> Result<MemNameDag, eden_dag::Error> {
127 Ok(MemNameDag::new())
128 }
129}
130
/// Commit graph wrapper: an on-disk `eden_dag::Dag` plus the commit sets
/// describing the repository state it was opened for, and lazily computed
/// query results derived from them.
pub struct Dag {
    /// The underlying DAG, opened from the repository's DAG directory.
    inner: eden_dag::Dag,

    /// The commit `HEAD` points to; empty when the references snapshot has no
    /// `HEAD` OID.
    pub head_commit: CommitSet,

    /// The commit at the tip of the main branch, per the references snapshot.
    pub main_branch_commit: CommitSet,

    /// The commits that branches point to (the keys of the snapshot's
    /// branch-OID-to-names map).
    pub branch_commits: CommitSet,

    /// The commits reported by the event replayer for the event cursor this
    /// `Dag` was opened with.
    observed_commits: CommitSet,

    /// The subset of observed commits whose activity status is `Obsolete`.
    obsolete_commits: CommitSet,

    // Lazily initialised caches, filled by the corresponding `query_*`
    // methods on first use.
    public_commits: OnceCell<CommitSet>,
    visible_heads: OnceCell<CommitSet>,
    visible_commits: OnceCell<CommitSet>,
    draft_commits: OnceCell<CommitSet>,
}
159
160impl Dag {
161 pub fn try_clone(&self, repo: &Repo) -> eyre::Result<Self> {
163 let inner = Self::open_inner_dag(repo)?;
164 Ok(Self {
165 inner,
166 head_commit: self.head_commit.clone(),
167 main_branch_commit: self.main_branch_commit.clone(),
168 branch_commits: self.branch_commits.clone(),
169 observed_commits: self.observed_commits.clone(),
170 obsolete_commits: self.obsolete_commits.clone(),
171 public_commits: OnceCell::new(),
172 visible_heads: OnceCell::new(),
173 visible_commits: OnceCell::new(),
174 draft_commits: OnceCell::new(),
175 })
176 }
177
178 #[instrument]
181 pub fn open_and_sync(
182 effects: &Effects,
183 repo: &Repo,
184 event_replayer: &EventReplayer,
185 event_cursor: EventCursor,
186 references_snapshot: &RepoReferencesSnapshot,
187 ) -> eyre::Result<Self> {
188 let mut dag = Self::open_without_syncing(
189 effects,
190 repo,
191 event_replayer,
192 event_cursor,
193 references_snapshot,
194 )?;
195 dag.sync(effects, repo)?;
196 Ok(dag)
197 }
198
199 #[instrument]
206 pub fn open_without_syncing(
207 effects: &Effects,
208 repo: &Repo,
209 event_replayer: &EventReplayer,
210 event_cursor: EventCursor,
211 references_snapshot: &RepoReferencesSnapshot,
212 ) -> eyre::Result<Self> {
213 let observed_commits = event_replayer.get_cursor_oids(event_cursor);
214 let RepoReferencesSnapshot {
215 head_oid,
216 main_branch_oid,
217 branch_oid_to_names,
218 } = references_snapshot;
219
220 let obsolete_commits: CommitSet = observed_commits
221 .iter()
222 .copied()
223 .filter(|commit_oid| {
224 match event_replayer.get_cursor_commit_activity_status(event_cursor, *commit_oid) {
225 CommitActivityStatus::Active | CommitActivityStatus::Inactive => false,
226 CommitActivityStatus::Obsolete => true,
227 }
228 })
229 .collect();
230
231 let dag = Self::open_inner_dag(repo)?;
232
233 let observed_commits: CommitSet = observed_commits.into_iter().collect();
234 let head_commit = match head_oid {
235 Some(head_oid) => CommitSet::from(*head_oid),
236 None => CommitSet::empty(),
237 };
238 let main_branch_commit = CommitSet::from(*main_branch_oid);
239 let branch_commits: CommitSet = branch_oid_to_names.keys().copied().collect();
240
241 Ok(Self {
242 inner: dag,
243 head_commit,
244 main_branch_commit,
245 branch_commits,
246 observed_commits,
247 obsolete_commits,
248 public_commits: Default::default(),
249 visible_heads: Default::default(),
250 visible_commits: Default::default(),
251 draft_commits: Default::default(),
252 })
253 }
254
255 #[instrument]
256 fn open_inner_dag(repo: &Repo) -> eyre::Result<eden_dag::Dag> {
257 let dag_dir = repo.get_dag_dir()?;
258 std::fs::create_dir_all(&dag_dir).wrap_err("Creating .git/branchless/dag dir")?;
259 let dag = eden_dag::Dag::open(&dag_dir)
260 .wrap_err_with(|| format!("Opening DAG directory at: {:?}", &dag_dir))?;
261 Ok(dag)
262 }
263
264 fn run_blocking<T>(&self, fut: impl Future<Output = T>) -> T {
265 futures::executor::block_on(fut)
266 }
267
268 #[instrument]
270 fn sync(&mut self, effects: &Effects, repo: &Repo) -> eyre::Result<()> {
271 let master_heads = self.main_branch_commit.clone();
272 let non_master_heads = self
273 .observed_commits
274 .union(&self.head_commit)
275 .union(&self.branch_commits);
276 self.sync_from_oids(effects, repo, master_heads, non_master_heads)
277 }
278
279 #[instrument]
281 pub fn sync_from_oids(
282 &mut self,
283 effects: &Effects,
284 repo: &Repo,
285 master_heads: CommitSet,
286 non_master_heads: CommitSet,
287 ) -> eyre::Result<()> {
288 let (effects, _progress) = effects.start_operation(OperationType::UpdateCommitGraph);
289 let _effects = effects;
290
291 let master_group_options = {
292 let mut options = VertexOptions::default();
293 options.highest_group = Group::MASTER;
294 options
295 };
296 let master_heads = self
297 .commit_set_to_vec(&master_heads)?
298 .into_iter()
299 .map(|vertex| (CommitVertex::from(vertex), master_group_options.clone()))
300 .collect_vec();
301 let non_master_heads = self
302 .commit_set_to_vec(&non_master_heads)?
303 .into_iter()
304 .map(|vertex| (CommitVertex::from(vertex), VertexOptions::default()))
305 .collect_vec();
306 let heads = [master_heads, non_master_heads].concat();
307
308 let repo = repo.try_clone()?;
309 futures::executor::block_on(self.inner.add_heads_and_flush(
310 &GitParentsBlocking {
311 repo: Arc::new(Mutex::new(repo)),
312 },
313 &VertexListWithOptions::from(heads),
314 ))?;
315 Ok(())
316 }
317
318 pub fn set_cursor(
321 &self,
322 effects: &Effects,
323 repo: &Repo,
324 event_replayer: &EventReplayer,
325 event_cursor: EventCursor,
326 ) -> eyre::Result<Self> {
327 let references_snapshot = event_replayer.get_references_snapshot(repo, event_cursor)?;
328 let dag = Self::open_without_syncing(
329 effects,
330 repo,
331 event_replayer,
332 event_cursor,
333 &references_snapshot,
334 )?;
335 Ok(dag)
336 }
337
338 #[instrument]
340 pub fn clear_obsolete_commits(&self, repo: &Repo) -> eyre::Result<Self> {
341 let inner = Self::open_inner_dag(repo)?;
342 Ok(Self {
343 inner,
344 head_commit: self.head_commit.clone(),
345 branch_commits: self.branch_commits.clone(),
346 main_branch_commit: self.main_branch_commit.clone(),
347 observed_commits: self.observed_commits.clone(),
348 obsolete_commits: CommitSet::empty(),
349 draft_commits: Default::default(),
350 public_commits: Default::default(),
351 visible_heads: Default::default(),
352 visible_commits: Default::default(),
353 })
354 }
355
356 #[instrument]
358 pub fn sort(&self, commit_set: &CommitSet) -> eyre::Result<Vec<NonZeroOid>> {
359 let commit_set = self.run_blocking(self.inner.sort(commit_set))?;
360 let commit_oids = self.commit_set_to_vec(&commit_set)?;
361
362 Ok(commit_oids.into_iter().rev().collect())
365 }
366
367 #[instrument]
369 pub fn commit_set_to_vec(&self, commit_set: &CommitSet) -> eyre::Result<Vec<NonZeroOid>> {
370 async fn map_vertex(
371 vertex: Result<CommitVertex, eden_dag::Error>,
372 ) -> eyre::Result<NonZeroOid> {
373 let vertex = vertex.wrap_err("Evaluating vertex")?;
374 let vertex = NonZeroOid::try_from(vertex.clone())
375 .wrap_err_with(|| format!("Converting vertex to NonZeroOid: {:?}", &vertex))?;
376 Ok(vertex)
377 }
378 let stream = self
379 .run_blocking(commit_set.iter())
380 .wrap_err("Iterating commit set")?;
381 let result = self.run_blocking(stream.then(map_vertex).try_collect())?;
382 Ok(result)
383 }
384
385 #[instrument]
388 pub fn get_only_parent_oid(&self, oid: NonZeroOid) -> eyre::Result<NonZeroOid> {
389 let parents: CommitSet = self.run_blocking(self.inner.parents(CommitSet::from(oid)))?;
390 match self.commit_set_to_vec(&parents)?[..] {
391 [oid] => Ok(oid),
392 [] => Err(eyre::eyre!("Commit {} has no parents.", oid)),
393 _ => Err(eyre::eyre!("Commit {} has more than 1 parents.", oid)),
394 }
395 }
396
    /// Borrow the underlying `eden_dag::Dag` so callers can run queries on it
    /// directly.
    pub fn query(&self) -> &eden_dag::Dag {
        &self.inner
    }
401
402 #[instrument]
405 pub fn is_public_commit(&self, commit_oid: NonZeroOid) -> eyre::Result<bool> {
406 let main_branch_commits = self.commit_set_to_vec(&self.main_branch_commit)?;
407 for main_branch_commit in main_branch_commits {
408 if self.run_blocking(
409 self.inner
410 .is_ancestor(commit_oid.into(), main_branch_commit.into()),
411 )? {
412 return Ok(true);
413 }
414 }
415 Ok(false)
416 }
417
418 #[instrument]
420 pub fn query_is_ancestor(
421 &self,
422 ancestor: NonZeroOid,
423 descendant: NonZeroOid,
424 ) -> eden_dag::Result<bool> {
425 let result =
426 self.run_blocking(self.inner.is_ancestor(ancestor.into(), descendant.into()))?;
427 Ok(result)
428 }
429
430 #[instrument]
432 pub fn set_is_empty(&self, commit_set: &CommitSet) -> eden_dag::Result<bool> {
433 let result = self.run_blocking(commit_set.is_empty())?;
434 Ok(result)
435 }
436
437 #[instrument]
439 pub fn set_contains<T: Into<CommitVertex> + Debug>(
440 &self,
441 commit_set: &CommitSet,
442 oid: T,
443 ) -> eden_dag::Result<bool> {
444 let result = self.run_blocking(commit_set.contains(&oid.into()))?;
445 Ok(result)
446 }
447
448 #[instrument]
450 pub fn set_count(&self, commit_set: &CommitSet) -> eden_dag::Result<usize> {
451 let result = self.run_blocking(commit_set.count())?;
452 Ok(result)
453 }
454
455 #[instrument]
457 pub fn set_first(&self, commit_set: &CommitSet) -> eden_dag::Result<Option<CommitVertex>> {
458 let result = self.run_blocking(commit_set.first())?;
459 Ok(result)
460 }
461
462 #[instrument]
466 pub fn query_public_commits_slow(&self) -> eyre::Result<&CommitSet> {
467 self.public_commits.get_or_try_init(|| {
468 let public_commits =
469 self.run_blocking(self.inner.ancestors(self.main_branch_commit.clone()))?;
470 Ok(public_commits)
471 })
472 }
473
474 #[instrument]
478 pub fn query_visible_heads(&self) -> eyre::Result<&CommitSet> {
479 self.visible_heads.get_or_try_init(|| {
480 let visible_heads = CommitSet::empty()
481 .union(&self.observed_commits.difference(&self.obsolete_commits))
482 .union(&self.head_commit)
483 .union(&self.main_branch_commit)
484 .union(&self.branch_commits);
485 let visible_heads = self.run_blocking(self.inner.heads(visible_heads))?;
486 Ok(visible_heads)
487 })
488 }
489
490 #[instrument]
494 pub fn query_visible_commits_slow(&self) -> eyre::Result<&CommitSet> {
495 self.visible_commits.get_or_try_init(|| {
496 let visible_heads = self.query_visible_heads()?;
497 let result = self.run_blocking(self.inner.ancestors(visible_heads.clone()))?;
498 Ok(result)
499 })
500 }
501
502 #[instrument]
505 pub fn filter_visible_commits(&self, commits: CommitSet) -> eyre::Result<CommitSet> {
506 let visible_heads = self.query_visible_heads()?;
507 Ok(commits.intersection(
508 &self.run_blocking(self.inner.range(commits.clone(), visible_heads.clone()))?,
509 ))
510 }
511
512 #[instrument]
515 pub fn query_obsolete_commits(&self) -> CommitSet {
516 self.obsolete_commits.clone()
517 }
518
519 #[instrument]
522 pub fn query_draft_commits(&self) -> eyre::Result<&CommitSet> {
523 self.draft_commits.get_or_try_init(|| {
524 let visible_heads = self.query_visible_heads()?;
525 let draft_commits = self.run_blocking(
526 self.inner
527 .only(visible_heads.clone(), self.main_branch_commit.clone()),
528 )?;
529 Ok(draft_commits)
530 })
531 }
532
533 #[instrument]
536 pub fn query_stack_commits(&self, commit_set: CommitSet) -> eyre::Result<CommitSet> {
537 let draft_commits = self.query_draft_commits()?;
538 let stack_roots = self.query_roots(draft_commits.clone())?;
539 let stack_ancestors = self.query_range(stack_roots, commit_set)?;
540 let stack = self
541 .query_range(stack_ancestors, draft_commits.clone())?;
554 Ok(stack)
555 }
556
557 #[instrument]
559 pub fn query_all(&self) -> eyre::Result<CommitSet> {
560 let result = self.run_blocking(self.inner.all())?;
561 Ok(result)
562 }
563
564 #[instrument]
566 pub fn query_parents(&self, commit_set: CommitSet) -> eden_dag::Result<CommitSet> {
567 let result = self.run_blocking(self.inner.parents(commit_set))?;
568 Ok(result)
569 }
570
571 #[instrument]
573 pub fn query_parent_names<T: Into<CommitVertex> + Debug>(
574 &self,
575 vertex: T,
576 ) -> eden_dag::Result<Vec<CommitVertex>> {
577 let result = self.run_blocking(self.inner.parent_names(vertex.into()))?;
578 Ok(result)
579 }
580
581 #[instrument]
583 pub fn query_ancestors(&self, commit_set: CommitSet) -> eden_dag::Result<CommitSet> {
584 let result = self.run_blocking(self.inner.ancestors(commit_set))?;
585 Ok(result)
586 }
587
588 #[instrument]
590 pub fn query_first_ancestor_nth(
591 &self,
592 vertex: CommitVertex,
593 n: u64,
594 ) -> eden_dag::Result<Option<CommitVertex>> {
595 let result = self.run_blocking(self.inner.first_ancestor_nth(vertex, n))?;
596 Ok(result)
597 }
598
599 #[instrument]
601 pub fn query_children(&self, commit_set: CommitSet) -> eden_dag::Result<CommitSet> {
602 let result = self.run_blocking(self.inner.children(commit_set))?;
603 Ok(result)
604 }
605
606 #[instrument]
608 pub fn query_descendants(&self, commit_set: CommitSet) -> eden_dag::Result<CommitSet> {
609 let result = self.run_blocking(self.inner.descendants(commit_set))?;
610 Ok(result)
611 }
612
613 #[instrument]
615 pub fn query_roots(&self, commit_set: CommitSet) -> eden_dag::Result<CommitSet> {
616 let result = self.run_blocking(self.inner.roots(commit_set))?;
617 Ok(result)
618 }
619
620 #[instrument]
622 pub fn query_heads(&self, commit_set: CommitSet) -> eden_dag::Result<CommitSet> {
623 let result = self.run_blocking(self.inner.heads(commit_set))?;
624 Ok(result)
625 }
626
627 #[instrument]
629 pub fn query_heads_ancestors(&self, commit_set: CommitSet) -> eden_dag::Result<CommitSet> {
630 let result = self.run_blocking(self.inner.heads_ancestors(commit_set))?;
631 Ok(result)
632 }
633
634 #[instrument]
636 pub fn query_only(
637 &self,
638 reachable: CommitSet,
639 unreachable: CommitSet,
640 ) -> eden_dag::Result<CommitSet> {
641 let result = self.run_blocking(self.inner.only(reachable, unreachable))?;
642 Ok(result)
643 }
644
645 #[instrument]
647 pub fn query_range(&self, roots: CommitSet, heads: CommitSet) -> eden_dag::Result<CommitSet> {
648 let result = self.run_blocking(self.inner.range(roots, heads))?;
649 Ok(result)
650 }
651
652 #[instrument]
654 pub fn query_common_ancestors(&self, commit_set: CommitSet) -> eden_dag::Result<CommitSet> {
655 let result = self.run_blocking(self.inner.common_ancestors(commit_set))?;
656 Ok(result)
657 }
658
659 #[instrument]
661 pub fn query_gca_one(&self, commit_set: CommitSet) -> eden_dag::Result<Option<CommitVertex>> {
662 let result = self.run_blocking(self.inner.gca_one(commit_set))?;
663 Ok(result)
664 }
665
666 #[instrument]
668 pub fn query_gca_all(&self, commit_set: CommitSet) -> eden_dag::Result<CommitSet> {
669 let result = self.run_blocking(self.inner.gca_all(commit_set))?;
670 Ok(result)
671 }
672
673 #[instrument]
680 pub fn get_connected_components(&self, commit_set: &CommitSet) -> eyre::Result<Vec<CommitSet>> {
681 let mut components: Vec<CommitSet> = Vec::new();
682 let mut component = CommitSet::empty();
683 let mut commits_to_connect = commit_set.clone();
684
685 for commit in self.commit_set_to_vec(commit_set)? {
688 if self.run_blocking(commits_to_connect.is_empty())? {
689 break;
690 }
691
692 if !self.run_blocking(commits_to_connect.contains(&commit.into()))? {
693 continue;
694 }
695
696 let mut commits = CommitSet::from(commit);
697 while !self.run_blocking(commits.is_empty())? {
698 component = component.union(&commits);
699 commits_to_connect = commits_to_connect.difference(&commits);
700
701 let parents = self.run_blocking(self.inner.parents(commits.clone()))?;
702 let children = self.run_blocking(self.inner.children(commits.clone()))?;
703 commits = parents.union(&children).intersection(&commits_to_connect);
704 }
705
706 components.push(component);
707 component = CommitSet::empty();
708 }
709
710 let connected_commits = union_all(&components);
711 assert_eq!(
712 self.run_blocking(commit_set.count())?,
713 self.run_blocking(connected_commits.count())?
714 );
715 let connected_commits = commit_set.intersection(&connected_commits);
716 assert_eq!(
717 self.run_blocking(commit_set.count())?,
718 self.run_blocking(connected_commits.count())?
719 );
720
721 Ok(components)
722 }
723}
724
725impl std::fmt::Debug for Dag {
726 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
727 write!(f, "<Dag>")
728 }
729}
730
731pub fn sorted_commit_set<'repo>(
746 repo: &'repo Repo,
747 dag: &Dag,
748 commit_set: &CommitSet,
749) -> eyre::Result<Vec<Commit<'repo>>> {
750 let commit_oids = dag.commit_set_to_vec(commit_set)?;
751 let mut commits: Vec<Commit> = {
752 let mut commits = Vec::new();
753 for commit_oid in commit_oids {
754 if let Some(commit) = repo.find_commit(commit_oid)? {
755 commits.push(commit)
756 }
757 }
758 commits
759 };
760
761 let commit_times: HashMap<NonZeroOid, Time> = commits
762 .iter()
763 .map(|commit| (commit.get_oid(), commit.get_time()))
764 .collect();
765
766 commits.sort_by(|lhs, rhs| {
767 let lhs_vertex = CommitVertex::from(lhs.get_oid());
768 let rhs_vertex = CommitVertex::from(rhs.get_oid());
769 if dag
770 .query_is_ancestor(lhs.get_oid(), rhs.get_oid())
771 .unwrap_or_else(|_| {
772 warn!(
773 ?lhs_vertex,
774 ?rhs_vertex,
775 "Could not calculate `is_ancestor`"
776 );
777 false
778 })
779 {
780 return Ordering::Less;
781 } else if dag
782 .query_is_ancestor(rhs.get_oid(), lhs.get_oid())
783 .unwrap_or_else(|_| {
784 warn!(
785 ?lhs_vertex,
786 ?rhs_vertex,
787 "Could not calculate `is_ancestor`"
788 );
789 false
790 })
791 {
792 return Ordering::Greater;
793 }
794
795 (&commit_times[&lhs.get_oid()], lhs.get_oid())
796 .cmp(&(&commit_times[&rhs.get_oid()], rhs.get_oid()))
797 });
798
799 Ok(commits)
800}