1use std::cmp::Ordering;
5use std::collections::HashMap;
6use std::fmt::Debug;
7use std::future::Future;
8use std::sync::{Arc, Mutex};
9
10use async_trait::async_trait;
11use eden_dag::ops::{DagPersistent, Parents};
12use eden_dag::set::hints::Hints;
13use eden_dag::{DagAlgorithm, Group, VertexListWithOptions, VertexOptions};
14use eyre::Context;
15use futures::{StreamExt, TryStreamExt};
16use itertools::Itertools;
17use once_cell::sync::OnceCell;
18use tracing::{instrument, trace, warn};
19
20use crate::core::effects::{Effects, OperationType};
21use crate::core::eventlog::{CommitActivityStatus, EventCursor, EventReplayer};
22use crate::git::{Commit, MaybeZeroOid, NonZeroOid, Repo, Time};
23
24use super::repo_ext::RepoReferencesSnapshot;
25
26impl From<NonZeroOid> for eden_dag::Vertex {
27 fn from(oid: NonZeroOid) -> Self {
28 eden_dag::Vertex::copy_from(oid.as_bytes())
29 }
30}
31
32impl TryFrom<eden_dag::Vertex> for MaybeZeroOid {
33 type Error = eyre::Error;
34
35 fn try_from(value: eden_dag::Vertex) -> Result<Self, Self::Error> {
36 let oid = git2::Oid::from_bytes(value.as_ref())?;
37 let oid = MaybeZeroOid::from(oid);
38 Ok(oid)
39 }
40}
41
42impl TryFrom<eden_dag::Vertex> for NonZeroOid {
43 type Error = eyre::Error;
44
45 fn try_from(value: eden_dag::Vertex) -> Result<Self, Self::Error> {
46 let oid = MaybeZeroOid::try_from(value)?;
47 let oid = NonZeroOid::try_from(oid)?;
48 Ok(oid)
49 }
50}
51
/// A set of commits, represented as an `eden_dag::Set` of commit vertices.
pub type CommitSet = eden_dag::Set;
54
/// A single commit in the DAG, represented as an `eden_dag::Vertex` holding the
/// commit OID's bytes.
pub type CommitVertex = eden_dag::Vertex;
57
58impl From<NonZeroOid> for CommitSet {
59 fn from(oid: NonZeroOid) -> Self {
60 let vertex = CommitVertex::from(oid);
61 CommitSet::from_static_names([vertex])
62 }
63}
64
65impl FromIterator<NonZeroOid> for CommitSet {
66 fn from_iter<T: IntoIterator<Item = NonZeroOid>>(iter: T) -> Self {
67 let oids = iter
68 .into_iter()
69 .map(CommitVertex::from)
70 .map(Ok)
71 .collect_vec();
72 CommitSet::from_iter(oids, Hints::default())
73 }
74}
75
76pub fn union_all(commits: &[CommitSet]) -> CommitSet {
78 commits
79 .iter()
80 .fold(CommitSet::empty(), |acc, elem| acc.union(elem))
81}
82
/// Adapter that lets the DAG look up commit parents in a Git repository.
///
/// It is passed to the inner DAG by `Dag::sync_from_oids`; the lookup itself
/// lives in its `Parents` implementation.
struct GitParentsBlocking {
    /// Repository used to resolve commits; locked for each parent lookup.
    repo: Arc<Mutex<Repo>>,
}
86
87#[async_trait]
88impl Parents for GitParentsBlocking {
89 async fn parent_names(&self, v: CommitVertex) -> eden_dag::Result<Vec<CommitVertex>> {
90 use eden_dag::errors::BackendError;
91 trace!(?v, "visiting Git commit");
92
93 let oid = MaybeZeroOid::from_bytes(v.as_ref())
94 .map_err(|_e| anyhow::anyhow!("Could not convert to Git oid: {:?}", &v))
95 .map_err(BackendError::Other)?;
96 let oid = match oid {
97 MaybeZeroOid::NonZero(oid) => oid,
98 MaybeZeroOid::Zero => return Ok(Vec::new()),
99 };
100
101 let repo = self.repo.lock().unwrap();
102 let commit = repo
103 .find_commit(oid)
104 .map_err(|_e| anyhow::anyhow!("Could not resolve to Git commit: {:?}", &v))
105 .map_err(BackendError::Other)?;
106 let commit = match commit {
107 Some(commit) => commit,
108 None => {
109 return Ok(Vec::new());
112 }
113 };
114
115 Ok(commit
116 .get_parent_oids()
117 .into_iter()
118 .map(CommitVertex::from)
119 .collect())
120 }
121
122 async fn hint_subdag_for_insertion(
123 &self,
124 _heads: &[CommitVertex],
125 ) -> Result<eden_dag::MemDag, eden_dag::Error> {
126 Ok(eden_dag::MemDag::new())
127 }
128}
129
/// Wrapper around an on-disk `eden_dag::Dag` together with the commit sets
/// that describe the repository state at a particular event cursor.
pub struct Dag {
    /// The underlying DAG, opened from the repository's DAG directory.
    inner: eden_dag::Dag,

    /// The commit that `HEAD` points to, or the empty set if the references
    /// snapshot had no `HEAD` OID.
    pub head_commit: CommitSet,

    /// The commit that the main branch points to.
    pub main_branch_commit: CommitSet,

    /// The commits that any branch in the references snapshot points to.
    pub branch_commits: CommitSet,

    /// The commit OIDs reported by the event replayer for the event cursor.
    observed_commits: CommitSet,

    /// The subset of observed commits whose activity status at the event
    /// cursor is `Obsolete`.
    obsolete_commits: CommitSet,

    /// Lazily computed cache for `query_public_commits_slow`.
    public_commits: OnceCell<CommitSet>,
    /// Lazily computed cache for `query_visible_heads`.
    visible_heads: OnceCell<CommitSet>,
    /// Lazily computed cache for `query_visible_commits_slow`.
    visible_commits: OnceCell<CommitSet>,
    /// Lazily computed cache for `query_draft_commits`.
    draft_commits: OnceCell<CommitSet>,
}
158
159impl Dag {
160 pub fn try_clone(&self, repo: &Repo) -> eyre::Result<Self> {
162 let inner = Self::open_inner_dag(repo)?;
163 Ok(Self {
164 inner,
165 head_commit: self.head_commit.clone(),
166 main_branch_commit: self.main_branch_commit.clone(),
167 branch_commits: self.branch_commits.clone(),
168 observed_commits: self.observed_commits.clone(),
169 obsolete_commits: self.obsolete_commits.clone(),
170 public_commits: OnceCell::new(),
171 visible_heads: OnceCell::new(),
172 visible_commits: OnceCell::new(),
173 draft_commits: OnceCell::new(),
174 })
175 }
176
177 #[instrument]
180 pub fn open_and_sync(
181 effects: &Effects,
182 repo: &Repo,
183 event_replayer: &EventReplayer,
184 event_cursor: EventCursor,
185 references_snapshot: &RepoReferencesSnapshot,
186 ) -> eyre::Result<Self> {
187 let mut dag = Self::open_without_syncing(
188 effects,
189 repo,
190 event_replayer,
191 event_cursor,
192 references_snapshot,
193 )?;
194 dag.sync(effects, repo)?;
195 Ok(dag)
196 }
197
198 #[instrument]
205 pub fn open_without_syncing(
206 effects: &Effects,
207 repo: &Repo,
208 event_replayer: &EventReplayer,
209 event_cursor: EventCursor,
210 references_snapshot: &RepoReferencesSnapshot,
211 ) -> eyre::Result<Self> {
212 let observed_commits = event_replayer.get_cursor_oids(event_cursor);
213 let RepoReferencesSnapshot {
214 head_oid,
215 main_branch_oid,
216 branch_oid_to_names,
217 } = references_snapshot;
218
219 let obsolete_commits: CommitSet = observed_commits
220 .iter()
221 .copied()
222 .filter(|commit_oid| {
223 match event_replayer.get_cursor_commit_activity_status(event_cursor, *commit_oid) {
224 CommitActivityStatus::Active | CommitActivityStatus::Inactive => false,
225 CommitActivityStatus::Obsolete => true,
226 }
227 })
228 .collect();
229
230 let dag = Self::open_inner_dag(repo)?;
231
232 let observed_commits: CommitSet = observed_commits.into_iter().collect();
233 let head_commit = match head_oid {
234 Some(head_oid) => CommitSet::from(*head_oid),
235 None => CommitSet::empty(),
236 };
237 let main_branch_commit = CommitSet::from(*main_branch_oid);
238 let branch_commits: CommitSet = branch_oid_to_names.keys().copied().collect();
239
240 Ok(Self {
241 inner: dag,
242 head_commit,
243 main_branch_commit,
244 branch_commits,
245 observed_commits,
246 obsolete_commits,
247 public_commits: Default::default(),
248 visible_heads: Default::default(),
249 visible_commits: Default::default(),
250 draft_commits: Default::default(),
251 })
252 }
253
254 #[instrument]
255 fn open_inner_dag(repo: &Repo) -> eyre::Result<eden_dag::Dag> {
256 let dag_dir = repo.get_dag_dir()?;
257 std::fs::create_dir_all(&dag_dir).wrap_err("Creating .git/branchless/dag dir")?;
258 let dag = eden_dag::Dag::open(&dag_dir)
259 .wrap_err_with(|| format!("Opening DAG directory at: {:?}", &dag_dir))?;
260 Ok(dag)
261 }
262
263 fn run_blocking<T>(&self, fut: impl Future<Output = T>) -> T {
264 futures::executor::block_on(fut)
265 }
266
267 #[instrument]
269 fn sync(&mut self, effects: &Effects, repo: &Repo) -> eyre::Result<()> {
270 let master_heads = self.main_branch_commit.clone();
271 let non_master_heads = self
272 .observed_commits
273 .union(&self.head_commit)
274 .union(&self.branch_commits);
275 self.sync_from_oids(effects, repo, master_heads, non_master_heads)
276 }
277
278 #[instrument]
280 pub fn sync_from_oids(
281 &mut self,
282 effects: &Effects,
283 repo: &Repo,
284 master_heads: CommitSet,
285 non_master_heads: CommitSet,
286 ) -> eyre::Result<()> {
287 let (effects, _progress) = effects.start_operation(OperationType::UpdateCommitGraph);
288 let _effects = effects;
289
290 let master_group_options = {
291 let mut options = VertexOptions::default();
292 options.desired_group = Group::MASTER;
293 options
294 };
295 let master_heads = self
296 .commit_set_to_vec(&master_heads)?
297 .into_iter()
298 .map(|vertex| (CommitVertex::from(vertex), master_group_options.clone()))
299 .collect_vec();
300 let non_master_heads = self
301 .commit_set_to_vec(&non_master_heads)?
302 .into_iter()
303 .map(|vertex| (CommitVertex::from(vertex), VertexOptions::default()))
304 .collect_vec();
305 let heads = [master_heads, non_master_heads].concat();
306
307 let repo = repo.try_clone()?;
308 futures::executor::block_on(self.inner.add_heads_and_flush(
309 &GitParentsBlocking {
310 repo: Arc::new(Mutex::new(repo)),
311 },
312 &VertexListWithOptions::from(heads),
313 ))?;
314 Ok(())
315 }
316
317 pub fn set_cursor(
320 &self,
321 effects: &Effects,
322 repo: &Repo,
323 event_replayer: &EventReplayer,
324 event_cursor: EventCursor,
325 ) -> eyre::Result<Self> {
326 let references_snapshot = event_replayer.get_references_snapshot(repo, event_cursor)?;
327 let dag = Self::open_without_syncing(
328 effects,
329 repo,
330 event_replayer,
331 event_cursor,
332 &references_snapshot,
333 )?;
334 Ok(dag)
335 }
336
337 #[instrument]
339 pub fn clear_obsolete_commits(&self, repo: &Repo) -> eyre::Result<Self> {
340 let inner = Self::open_inner_dag(repo)?;
341 Ok(Self {
342 inner,
343 head_commit: self.head_commit.clone(),
344 branch_commits: self.branch_commits.clone(),
345 main_branch_commit: self.main_branch_commit.clone(),
346 observed_commits: self.observed_commits.clone(),
347 obsolete_commits: CommitSet::empty(),
348 draft_commits: Default::default(),
349 public_commits: Default::default(),
350 visible_heads: Default::default(),
351 visible_commits: Default::default(),
352 })
353 }
354
355 #[instrument]
357 pub fn sort(&self, commit_set: &CommitSet) -> eyre::Result<Vec<NonZeroOid>> {
358 let commit_set = self.run_blocking(self.inner.sort(commit_set))?;
359 let commit_oids = self.commit_set_to_vec(&commit_set)?;
360
361 Ok(commit_oids.into_iter().rev().collect())
364 }
365
366 #[instrument]
368 pub fn commit_set_to_vec(&self, commit_set: &CommitSet) -> eyre::Result<Vec<NonZeroOid>> {
369 async fn map_vertex(
370 vertex: Result<CommitVertex, eden_dag::Error>,
371 ) -> eyre::Result<NonZeroOid> {
372 let vertex = vertex.wrap_err("Evaluating vertex")?;
373 let vertex = NonZeroOid::try_from(vertex.clone())
374 .wrap_err_with(|| format!("Converting vertex to NonZeroOid: {:?}", &vertex))?;
375 Ok(vertex)
376 }
377 let stream = self
378 .run_blocking(commit_set.iter())
379 .wrap_err("Iterating commit set")?;
380 let result = self.run_blocking(stream.then(map_vertex).try_collect())?;
381 Ok(result)
382 }
383
384 #[instrument]
387 pub fn get_only_parent_oid(&self, oid: NonZeroOid) -> eyre::Result<NonZeroOid> {
388 let parents: CommitSet = self.run_blocking(self.inner.parents(CommitSet::from(oid)))?;
389 match self.commit_set_to_vec(&parents)?[..] {
390 [oid] => Ok(oid),
391 [] => Err(eyre::eyre!("Commit {} has no parents.", oid)),
392 _ => Err(eyre::eyre!("Commit {} has more than 1 parents.", oid)),
393 }
394 }
395
396 pub fn query(&self) -> &eden_dag::Dag {
398 &self.inner
399 }
400
401 #[instrument]
404 pub fn is_public_commit(&self, commit_oid: NonZeroOid) -> eyre::Result<bool> {
405 let main_branch_commits = self.commit_set_to_vec(&self.main_branch_commit)?;
406 for main_branch_commit in main_branch_commits {
407 if self.run_blocking(
408 self.inner
409 .is_ancestor(commit_oid.into(), main_branch_commit.into()),
410 )? {
411 return Ok(true);
412 }
413 }
414 Ok(false)
415 }
416
417 #[instrument]
419 pub fn query_is_ancestor(
420 &self,
421 ancestor: NonZeroOid,
422 descendant: NonZeroOid,
423 ) -> eden_dag::Result<bool> {
424 let result =
425 self.run_blocking(self.inner.is_ancestor(ancestor.into(), descendant.into()))?;
426 Ok(result)
427 }
428
429 #[instrument]
431 pub fn set_is_empty(&self, commit_set: &CommitSet) -> eden_dag::Result<bool> {
432 let result = self.run_blocking(commit_set.is_empty())?;
433 Ok(result)
434 }
435
436 #[instrument]
438 pub fn set_contains<T: Into<CommitVertex> + Debug>(
439 &self,
440 commit_set: &CommitSet,
441 oid: T,
442 ) -> eden_dag::Result<bool> {
443 let result = self.run_blocking(commit_set.contains(&oid.into()))?;
444 Ok(result)
445 }
446
447 #[instrument]
449 pub fn set_count(&self, commit_set: &CommitSet) -> eden_dag::Result<usize> {
450 let result = self.run_blocking(commit_set.count())?;
451 Ok(result.try_into().unwrap())
452 }
453
454 #[instrument]
456 pub fn set_first(&self, commit_set: &CommitSet) -> eden_dag::Result<Option<CommitVertex>> {
457 let result = self.run_blocking(commit_set.first())?;
458 Ok(result)
459 }
460
461 #[instrument]
465 pub fn query_public_commits_slow(&self) -> eyre::Result<&CommitSet> {
466 self.public_commits.get_or_try_init(|| {
467 let public_commits =
468 self.run_blocking(self.inner.ancestors(self.main_branch_commit.clone()))?;
469 Ok(public_commits)
470 })
471 }
472
473 #[instrument]
477 pub fn query_visible_heads(&self) -> eyre::Result<&CommitSet> {
478 self.visible_heads.get_or_try_init(|| {
479 let visible_heads = CommitSet::empty()
480 .union(&self.observed_commits.difference(&self.obsolete_commits))
481 .union(&self.head_commit)
482 .union(&self.main_branch_commit)
483 .union(&self.branch_commits);
484 let visible_heads = self.run_blocking(self.inner.heads(visible_heads))?;
485 Ok(visible_heads)
486 })
487 }
488
489 #[instrument]
493 pub fn query_visible_commits_slow(&self) -> eyre::Result<&CommitSet> {
494 self.visible_commits.get_or_try_init(|| {
495 let visible_heads = self.query_visible_heads()?;
496 let result = self.run_blocking(self.inner.ancestors(visible_heads.clone()))?;
497 Ok(result)
498 })
499 }
500
501 #[instrument]
504 pub fn filter_visible_commits(&self, commits: CommitSet) -> eyre::Result<CommitSet> {
505 let visible_heads = self.query_visible_heads()?;
506 Ok(commits.intersection(
507 &self.run_blocking(self.inner.range(commits.clone(), visible_heads.clone()))?,
508 ))
509 }
510
511 #[instrument]
514 pub fn query_obsolete_commits(&self) -> CommitSet {
515 self.obsolete_commits.clone()
516 }
517
518 #[instrument]
521 pub fn query_draft_commits(&self) -> eyre::Result<&CommitSet> {
522 self.draft_commits.get_or_try_init(|| {
523 let visible_heads = self.query_visible_heads()?;
524 let draft_commits = self.run_blocking(
525 self.inner
526 .only(visible_heads.clone(), self.main_branch_commit.clone()),
527 )?;
528 Ok(draft_commits)
529 })
530 }
531
532 #[instrument]
535 pub fn query_stack_commits(&self, commit_set: CommitSet) -> eyre::Result<CommitSet> {
536 let draft_commits = self.query_draft_commits()?;
537 let stack_roots = self.query_roots(draft_commits.clone())?;
538 let stack_ancestors = self.query_range(stack_roots, commit_set)?;
539 let stack = self
540 .query_range(stack_ancestors, draft_commits.clone())?;
553 Ok(stack)
554 }
555
556 #[instrument]
558 pub fn query_all(&self) -> eyre::Result<CommitSet> {
559 let result = self.run_blocking(self.inner.all())?;
560 Ok(result)
561 }
562
563 #[instrument]
565 pub fn query_parents(&self, commit_set: CommitSet) -> eden_dag::Result<CommitSet> {
566 let result = self.run_blocking(self.inner.parents(commit_set))?;
567 Ok(result)
568 }
569
570 #[instrument]
572 pub fn query_parent_names<T: Into<CommitVertex> + Debug>(
573 &self,
574 vertex: T,
575 ) -> eden_dag::Result<Vec<CommitVertex>> {
576 let result = self.run_blocking(self.inner.parent_names(vertex.into()))?;
577 Ok(result)
578 }
579
580 #[instrument]
582 pub fn query_ancestors(&self, commit_set: CommitSet) -> eden_dag::Result<CommitSet> {
583 let result = self.run_blocking(self.inner.ancestors(commit_set))?;
584 Ok(result)
585 }
586
587 #[instrument]
589 pub fn query_first_ancestor_nth(
590 &self,
591 vertex: CommitVertex,
592 n: u64,
593 ) -> eden_dag::Result<Option<CommitVertex>> {
594 let result = self.run_blocking(self.inner.first_ancestor_nth(vertex, n))?;
595 Ok(result)
596 }
597
598 #[instrument]
600 pub fn query_children(&self, commit_set: CommitSet) -> eden_dag::Result<CommitSet> {
601 let result = self.run_blocking(self.inner.children(commit_set))?;
602 Ok(result)
603 }
604
605 #[instrument]
607 pub fn query_descendants(&self, commit_set: CommitSet) -> eden_dag::Result<CommitSet> {
608 let result = self.run_blocking(self.inner.descendants(commit_set))?;
609 Ok(result)
610 }
611
612 #[instrument]
614 pub fn query_roots(&self, commit_set: CommitSet) -> eden_dag::Result<CommitSet> {
615 let result = self.run_blocking(self.inner.roots(commit_set))?;
616 Ok(result)
617 }
618
619 #[instrument]
621 pub fn query_heads(&self, commit_set: CommitSet) -> eden_dag::Result<CommitSet> {
622 let result = self.run_blocking(self.inner.heads(commit_set))?;
623 Ok(result)
624 }
625
626 #[instrument]
628 pub fn query_heads_ancestors(&self, commit_set: CommitSet) -> eden_dag::Result<CommitSet> {
629 let result = self.run_blocking(self.inner.heads_ancestors(commit_set))?;
630 Ok(result)
631 }
632
633 #[instrument]
635 pub fn query_only(
636 &self,
637 reachable: CommitSet,
638 unreachable: CommitSet,
639 ) -> eden_dag::Result<CommitSet> {
640 let result = self.run_blocking(self.inner.only(reachable, unreachable))?;
641 Ok(result)
642 }
643
644 #[instrument]
646 pub fn query_range(&self, roots: CommitSet, heads: CommitSet) -> eden_dag::Result<CommitSet> {
647 let result = self.run_blocking(self.inner.range(roots, heads))?;
648 Ok(result)
649 }
650
651 #[instrument]
653 pub fn query_common_ancestors(&self, commit_set: CommitSet) -> eden_dag::Result<CommitSet> {
654 let result = self.run_blocking(self.inner.common_ancestors(commit_set))?;
655 Ok(result)
656 }
657
658 #[instrument]
660 pub fn query_gca_one(&self, commit_set: CommitSet) -> eden_dag::Result<Option<CommitVertex>> {
661 let result = self.run_blocking(self.inner.gca_one(commit_set))?;
662 Ok(result)
663 }
664
665 #[instrument]
667 pub fn query_gca_all(&self, commit_set: CommitSet) -> eden_dag::Result<CommitSet> {
668 let result = self.run_blocking(self.inner.gca_all(commit_set))?;
669 Ok(result)
670 }
671
672 #[instrument]
679 pub fn get_connected_components(&self, commit_set: &CommitSet) -> eyre::Result<Vec<CommitSet>> {
680 let mut components: Vec<CommitSet> = Vec::new();
681 let mut component = CommitSet::empty();
682 let mut commits_to_connect = commit_set.clone();
683
684 for commit in self.commit_set_to_vec(commit_set)? {
687 if self.run_blocking(commits_to_connect.is_empty())? {
688 break;
689 }
690
691 if !self.run_blocking(commits_to_connect.contains(&commit.into()))? {
692 continue;
693 }
694
695 let mut commits = CommitSet::from(commit);
696 while !self.run_blocking(commits.is_empty())? {
697 component = component.union(&commits);
698 commits_to_connect = commits_to_connect.difference(&commits);
699
700 let parents = self.run_blocking(self.inner.parents(commits.clone()))?;
701 let children = self.run_blocking(self.inner.children(commits.clone()))?;
702 commits = parents.union(&children).intersection(&commits_to_connect);
703 }
704
705 components.push(component);
706 component = CommitSet::empty();
707 }
708
709 let connected_commits = union_all(&components);
710 assert_eq!(
711 self.run_blocking(commit_set.count())?,
712 self.run_blocking(connected_commits.count())?
713 );
714 let connected_commits = commit_set.intersection(&connected_commits);
715 assert_eq!(
716 self.run_blocking(commit_set.count())?,
717 self.run_blocking(connected_commits.count())?
718 );
719
720 Ok(components)
721 }
722}
723
724impl std::fmt::Debug for Dag {
725 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
726 write!(f, "<Dag>")
727 }
728}
729
730pub fn sorted_commit_set<'repo>(
745 repo: &'repo Repo,
746 dag: &Dag,
747 commit_set: &CommitSet,
748) -> eyre::Result<Vec<Commit<'repo>>> {
749 let commit_oids = dag.commit_set_to_vec(commit_set)?;
750 let mut commits: Vec<Commit> = {
751 let mut commits = Vec::new();
752 for commit_oid in commit_oids {
753 if let Some(commit) = repo.find_commit(commit_oid)? {
754 commits.push(commit)
755 }
756 }
757 commits
758 };
759
760 let commit_times: HashMap<NonZeroOid, Time> = commits
761 .iter()
762 .map(|commit| (commit.get_oid(), commit.get_time()))
763 .collect();
764
765 commits.sort_by(|lhs, rhs| {
766 let lhs_vertex = CommitVertex::from(lhs.get_oid());
767 let rhs_vertex = CommitVertex::from(rhs.get_oid());
768 if dag
769 .query_is_ancestor(lhs.get_oid(), rhs.get_oid())
770 .unwrap_or_else(|_| {
771 warn!(
772 ?lhs_vertex,
773 ?rhs_vertex,
774 "Could not calculate `is_ancestor`"
775 );
776 false
777 })
778 {
779 return Ordering::Less;
780 } else if dag
781 .query_is_ancestor(rhs.get_oid(), lhs.get_oid())
782 .unwrap_or_else(|_| {
783 warn!(
784 ?lhs_vertex,
785 ?rhs_vertex,
786 "Could not calculate `is_ancestor`"
787 );
788 false
789 })
790 {
791 return Ordering::Greater;
792 }
793
794 (&commit_times[&lhs.get_oid()], lhs.get_oid())
795 .cmp(&(&commit_times[&rhs.get_oid()], rhs.get_oid()))
796 });
797
798 Ok(commits)
799}