use std::collections::BTreeMap;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::collections::hash_map::Entry;
use std::pin::pin;
use std::slice;
use futures::Stream;
use futures::StreamExt as _;
use futures::TryStreamExt as _;
use itertools::Itertools as _;
use pollster::FutureExt as _;
use thiserror::Error;
use crate::backend::BackendError;
use crate::backend::BackendResult;
use crate::backend::CommitId;
use crate::commit::Commit;
use crate::dag_walk;
use crate::index::IndexError;
use crate::op_store::OpStoreError;
use crate::op_store::OpStoreResult;
use crate::op_walk;
use crate::operation::Operation;
use crate::repo::ReadonlyRepo;
use crate::repo::Repo as _;
/// One step in the evolution history of a commit: the commit itself plus
/// where its predecessor information came from.
#[derive(Clone, Debug, serde::Serialize)]
pub struct CommitEvolutionEntry {
    pub commit: Commit,
    /// Operation that recorded predecessors for `commit`, or `None` if the
    /// predecessors were reconstructed by scanning commit objects instead.
    pub operation: Option<Operation>,
    // Set iff `operation` is `None`: predecessor ids gathered from the
    // commit itself, filtered to ids present in the index (see
    // `scan_commits()`); empty when emitted by `flush_commits()`.
    #[serde(skip)]
    reachable_predecessors: Option<Vec<CommitId>>,
}
impl CommitEvolutionEntry {
    /// Ids of the direct predecessors of this entry's commit.
    pub fn predecessor_ids(&self) -> &[CommitId] {
        if let Some(op) = &self.operation {
            op.predecessors_for_commit(self.commit.id()).unwrap()
        } else {
            self.reachable_predecessors.as_ref().unwrap()
        }
    }

    /// Direct predecessors of this entry's commit, loaded from the store.
    pub fn predecessors(&self) -> impl ExactSizeIterator<Item = BackendResult<Commit>> {
        let store = self.commit.store();
        let ids = self.predecessor_ids();
        ids.iter().map(|id| store.get_commit(id))
    }
}
/// Error that can occur while walking commit predecessors.
#[derive(Debug, Error)]
pub enum WalkPredecessorsError {
    /// Error while accessing the commit backend.
    #[error(transparent)]
    Backend(#[from] BackendError),
    /// Error while querying the commit index.
    #[error(transparent)]
    Index(#[from] IndexError),
    /// Error while reading from the operation store.
    #[error(transparent)]
    OpStore(#[from] OpStoreError),
    /// The recorded predecessor graph contains a cycle.
    #[error("Predecessors cycle detected around commit {0}")]
    CycleDetected(CommitId),
}
/// Returns an iterator over the evolution entries of `start_commits` and
/// their predecessors, walking the repo's operation ancestry.
pub fn walk_predecessors<'repo>(
    repo: &'repo ReadonlyRepo,
    start_commits: &[CommitId],
) -> impl Iterator<Item = Result<CommitEvolutionEntry, WalkPredecessorsError>> + use<'repo> {
    let head_op = slice::from_ref(repo.operation());
    WalkPredecessors {
        repo,
        op_ancestors: op_walk::walk_ancestors(head_op).boxed(),
        to_visit: start_commits.to_vec(),
        queued: VecDeque::new(),
    }
}
/// Iterator state for `walk_predecessors()`.
struct WalkPredecessors<'repo, I> {
    repo: &'repo ReadonlyRepo,
    // Stream of ancestor operations still to be visited.
    op_ancestors: I,
    // Commit ids whose evolution entries haven't been emitted yet.
    to_visit: Vec<CommitId>,
    // Entries ready to be yielded by the iterator.
    queued: VecDeque<CommitEvolutionEntry>,
}
impl<I> WalkPredecessors<'_, I>
where
    I: Stream<Item = OpStoreResult<Operation>> + Unpin,
{
    /// Produces the next entry, pulling further operations as needed to
    /// refill the queue. Returns `Ok(None)` when the walk is exhausted.
    fn try_next(&mut self) -> Result<Option<CommitEvolutionEntry>, WalkPredecessorsError> {
        while !self.to_visit.is_empty() && self.queued.is_empty() {
            // Synchronously pull the next ancestor operation.
            let Some(op) = self.op_ancestors.next().block_on().transpose()? else {
                // Ran out of operations: emit the remaining commits with
                // empty predecessor lists.
                self.flush_commits()?;
                break;
            };
            if !op.stores_commit_predecessors() {
                // This operation predates predecessor records; fall back
                // to scanning the commits' own stored predecessor ids.
                self.scan_commits()?;
                break;
            }
            self.visit_op(&op)?;
        }
        Ok(self.queued.pop_front())
    }

    /// Emits entries for commits that `op` recorded predecessors for, and
    /// replaces those ids in `to_visit` with their predecessor ids.
    fn visit_op(&mut self, op: &Operation) -> Result<(), WalkPredecessorsError> {
        let mut to_emit = Vec::new();
        let mut has_dup = false;
        let mut i = 0;
        while let Some(cur_id) = self.to_visit.get(i) {
            if let Some(next_ids) = op.predecessors_for_commit(cur_id) {
                if to_emit.contains(cur_id) {
                    // Already collected under this op; drop the duplicate.
                    self.to_visit.remove(i);
                    has_dup = true;
                    continue;
                }
                // Substitute the current id with its predecessors in place;
                // `splice()` yields the removed element, which we collect
                // for emission. `i` stays put to re-examine the inserted
                // ids against this same op.
                to_emit.extend(self.to_visit.splice(i..=i, next_ids.iter().cloned()));
            } else {
                // `op` didn't rewrite this commit; keep it for older ops.
                i += 1;
            }
        }
        let store = self.repo.store();
        let mut emit = |id: &CommitId| -> BackendResult<()> {
            let commit = store.get_commit(id)?;
            self.queued.push_back(CommitEvolutionEntry {
                commit,
                operation: Some(op.clone()),
                reachable_predecessors: None,
            });
            Ok(())
        };
        match &*to_emit {
            [] => {}
            // Fast path: a single collected commit with no duplicates
            // needs no ordering pass.
            [id] if !has_dup => emit(id)?,
            _ => {
                // Several commits may have been rewritten within the same
                // op; order them by the predecessor relation before
                // emitting.
                let sorted_ids = dag_walk::topo_order_reverse_ok(
                    to_emit.iter().map(Ok),
                    |&id| id,
                    |&id| op.predecessors_for_commit(id).into_iter().flatten().map(Ok),
                    |id| id,
                )
                .map_err(|id| WalkPredecessorsError::CycleDetected(id.clone()))?;
                for &id in &sorted_ids {
                    // The topo walk may pull in predecessor ids that this
                    // op didn't rewrite; those belong to older ops.
                    if op.predecessors_for_commit(id).is_some() {
                        emit(id)?;
                    }
                }
            }
        }
        Ok(())
    }

    /// Emits entries for the remaining commits by reading predecessor ids
    /// from the commit objects themselves, filtered to ids reachable in
    /// the index. Used when operations don't record predecessors.
    fn scan_commits(&mut self) -> Result<(), WalkPredecessorsError> {
        let store = self.repo.store();
        let index = self.repo.index();
        // Cache of index-filtered predecessor ids per visited commit.
        let mut commit_predecessors: HashMap<CommitId, Vec<CommitId>> = HashMap::new();
        let commits = dag_walk::topo_order_reverse_ok(
            self.to_visit.drain(..).map(|id| {
                store
                    .get_commit(&id)
                    .map_err(WalkPredecessorsError::Backend)
            }),
            |commit: &Commit| commit.id().clone(),
            |commit: &Commit| {
                let ids = match commit_predecessors.entry(commit.id().clone()) {
                    Entry::Occupied(entry) => entry.into_mut(),
                    Entry::Vacant(entry) => {
                        let mut filtered = vec![];
                        for id in &commit.store_commit().predecessors {
                            match index.has_id(id) {
                                Ok(true) => {
                                    filtered.push(id.clone());
                                }
                                Ok(false) => {
                                    // Predecessor isn't reachable; skip it.
                                }
                                Err(err) => {
                                    // Closures can't use `?`; surface the
                                    // error through the item type instead.
                                    return vec![Err(WalkPredecessorsError::Index(err))];
                                }
                            }
                        }
                        entry.insert(filtered)
                    }
                };
                ids.iter()
                    .map(|id| store.get_commit(id).map_err(WalkPredecessorsError::Backend))
                    .collect_vec()
            },
            |_| panic!("graph has cycle"),
        )?;
        self.queued.extend(commits.into_iter().map(|commit| {
            let predecessors = commit_predecessors
                .remove(commit.id())
                .expect("commit must be visited once");
            CommitEvolutionEntry {
                commit,
                operation: None,
                reachable_predecessors: Some(predecessors),
            }
        }));
        Ok(())
    }

    /// Emits the remaining commits with empty predecessor lists, ending
    /// the walk.
    fn flush_commits(&mut self) -> BackendResult<()> {
        self.queued.reserve(self.to_visit.len());
        for id in self.to_visit.drain(..) {
            let commit = self.repo.store().get_commit(&id)?;
            self.queued.push_back(CommitEvolutionEntry {
                commit,
                operation: None,
                reachable_predecessors: Some(vec![]),
            });
        }
        Ok(())
    }
}
impl<I> Iterator for WalkPredecessors<'_, I>
where
    I: Stream<Item = OpStoreResult<Operation>> + Unpin,
{
    type Item = Result<CommitEvolutionEntry, WalkPredecessorsError>;

    fn next(&mut self) -> Option<Self::Item> {
        // Flip `Result<Option<_>, _>` into the `Option<Result<_, _>>`
        // shape the Iterator contract wants.
        match self.try_next() {
            Ok(entry) => entry.map(Ok),
            Err(err) => Some(Err(err)),
        }
    }
}
/// Accumulates predecessor records over the operation range between
/// `old_ops` and `new_ops`, resolving transitive edges so entries map the
/// newest commit ids toward their counterparts at the old operations.
///
/// Returns an empty map if either side is empty or if any operation in the
/// range lacks predecessor records.
pub async fn accumulate_predecessors(
    new_ops: &[Operation],
    old_ops: &[Operation],
) -> Result<BTreeMap<CommitId, Vec<CommitId>>, WalkPredecessorsError> {
    if new_ops.is_empty() || old_ops.is_empty() {
        return Ok(BTreeMap::new());
    }
    // Fast path: exactly one new op whose parents are exactly the old
    // heads — its own record covers the whole range.
    if let [op] = new_ops
        && op.parent_ids().iter().eq(old_ops.iter().map(|op| op.id()))
    {
        let Some(map) = &op.store_operation().commit_predecessors else {
            return Ok(BTreeMap::new());
        };
        return resolve_transitive_edges(map, map.keys())
            .map_err(|id| WalkPredecessorsError::CycleDetected(id.clone()));
    }
    let mut accumulated = BTreeMap::new();
    // Ops on the old side of the range record edges in the opposite
    // direction relative to new_ops, so collect them first and reverse.
    let reverse_ops = op_walk::walk_ancestors_range(old_ops, new_ops);
    if !try_collect_predecessors_into(&mut accumulated, reverse_ops).await? {
        return Ok(BTreeMap::new());
    }
    let mut accumulated = reverse_edges(accumulated);
    // Then layer on the records from ops on the new side of the range.
    let forward_ops = op_walk::walk_ancestors_range(new_ops, old_ops);
    if !try_collect_predecessors_into(&mut accumulated, forward_ops).await? {
        return Ok(BTreeMap::new());
    }
    // Resolve transitive chains starting from the commit ids recorded by
    // the new operations themselves.
    let new_commit_ids = new_ops
        .iter()
        .filter_map(|op| op.store_operation().commit_predecessors.as_ref())
        .flat_map(|map| map.keys());
    resolve_transitive_edges(&accumulated, new_commit_ids)
        .map_err(|id| WalkPredecessorsError::CycleDetected(id.clone()))
}
/// Merges the predecessor records of every operation in `ops` into
/// `collected`. Returns `Ok(false)` if any operation lacks the records,
/// in which case `collected` may be partially populated.
async fn try_collect_predecessors_into(
    collected: &mut BTreeMap<CommitId, Vec<CommitId>>,
    ops: impl Stream<Item = OpStoreResult<Operation>>,
) -> OpStoreResult<bool> {
    let mut ops = pin!(ops);
    while let Some(result) = ops.next().await {
        let op = result?;
        match &op.store_operation().commit_predecessors {
            Some(map) => {
                let entries = map.iter().map(|(k, v)| (k.clone(), v.clone()));
                collected.extend(entries);
            }
            // Missing records make a complete mapping impossible.
            None => return Ok(false),
        }
    }
    Ok(true)
}
/// Rewrites edges of `graph` reachable from `start` so each node points
/// directly at its terminal ancestors (ids with no entry in `graph`.)
/// Returns the offending node id if a cycle is detected.
fn resolve_transitive_edges<'a: 'b, 'b>(
    graph: &'a BTreeMap<CommitId, Vec<CommitId>>,
    start: impl IntoIterator<Item = &'b CommitId>,
) -> Result<BTreeMap<CommitId, Vec<CommitId>>, &'b CommitId> {
    let mut new_graph: BTreeMap<CommitId, Vec<CommitId>> = BTreeMap::new();
    // Visit ancestors before descendants so that every neighbor is already
    // resolved in `new_graph` by the time its successor is processed.
    let sorted_ids = dag_walk::topo_order_forward_ok(
        start.into_iter().map(Ok),
        |&id| id,
        |&id| graph.get(id).into_iter().flatten().map(Ok),
        |id| id,
    )?;
    for cur_id in sorted_ids {
        let Some(neighbors) = graph.get(cur_id) else {
            // Terminal node: nothing to rewrite.
            continue;
        };
        // A neighbor already resolved in `new_graph` is replaced by its
        // resolved targets; an unresolved neighbor is terminal and kept.
        let lookup = |id| new_graph.get(id).map_or(slice::from_ref(id), Vec::as_slice);
        let new_neighbors = match &neighbors[..] {
            // Single-predecessor fast path needs no dedup.
            [id] => lookup(id).to_vec(),
            ids => ids.iter().flat_map(lookup).unique().cloned().collect(),
        };
        new_graph.insert(cur_id.clone(), new_neighbors);
    }
    Ok(new_graph)
}
/// Builds a new graph with every edge of `graph` turned around.
fn reverse_edges(graph: BTreeMap<CommitId, Vec<CommitId>>) -> BTreeMap<CommitId, Vec<CommitId>> {
    let mut reversed: BTreeMap<CommitId, Vec<CommitId>> = BTreeMap::new();
    graph.into_iter().for_each(|(source, targets)| {
        for target in targets {
            reversed.entry(target).or_default().push(source.clone());
        }
    });
    reversed
}