1use std::collections::BTreeMap;
18use std::collections::HashMap;
19use std::collections::VecDeque;
20use std::collections::hash_map::Entry;
21use std::pin::pin;
22use std::slice;
23
24use futures::Stream;
25use futures::StreamExt as _;
26use futures::TryStreamExt as _;
27use futures::future::join_all;
28use futures::future::try_join_all;
29use futures::stream;
30use itertools::Itertools as _;
31use thiserror::Error;
32
33use crate::backend::BackendError;
34use crate::backend::BackendResult;
35use crate::backend::CommitId;
36use crate::commit::Commit;
37use crate::dag_walk;
38use crate::dag_walk_async;
39use crate::index::IndexError;
40use crate::op_store::OpStoreError;
41use crate::op_store::OpStoreResult;
42use crate::op_walk;
43use crate::operation::Operation;
44use crate::repo::ReadonlyRepo;
45use crate::repo::Repo as _;
46
47#[derive(Clone, Debug, serde::Serialize)]
49pub struct CommitEvolutionEntry {
50 pub commit: Commit,
52 pub operation: Option<Operation>,
54 #[serde(skip)]
58 reachable_predecessors: Option<Vec<CommitId>>,
59}
60
61impl CommitEvolutionEntry {
62 pub fn predecessor_ids(&self) -> &[CommitId] {
64 match &self.operation {
65 Some(op) => op.predecessors_for_commit(self.commit.id()).unwrap(),
66 None => self.reachable_predecessors.as_ref().unwrap(),
67 }
68 }
69
70 pub async fn predecessors(&self) -> BackendResult<Vec<Commit>> {
72 let store = self.commit.store();
73 try_join_all(
74 self.predecessor_ids()
75 .iter()
76 .map(|id| store.get_commit_async(id)),
77 )
78 .await
79 }
80}
81
82#[expect(missing_docs)]
83#[derive(Debug, Error)]
84pub enum WalkPredecessorsError {
85 #[error(transparent)]
86 Backend(#[from] BackendError),
87 #[error(transparent)]
88 Index(#[from] IndexError),
89 #[error(transparent)]
90 OpStore(#[from] OpStoreError),
91 #[error("Predecessors cycle detected around commit {0}")]
92 CycleDetected(CommitId),
93}
94
95pub fn walk_predecessors<'repo>(
97 repo: &'repo ReadonlyRepo,
98 start_commits: &[CommitId],
99) -> impl Stream<Item = Result<CommitEvolutionEntry, WalkPredecessorsError>> + use<'repo> {
100 let op_ancestors = op_walk::walk_ancestors(slice::from_ref(repo.operation())).boxed_local();
101 let state = WalkPredecessors {
102 repo,
103 op_ancestors,
104 to_visit: start_commits.to_vec(),
105 queued: VecDeque::new(),
106 };
107 stream::unfold(state, |mut state| async move {
108 let result = state.try_next_impl().await.transpose()?;
109 Some((result, state))
110 })
111}
112
113struct WalkPredecessors<'repo, I> {
114 repo: &'repo ReadonlyRepo,
115 op_ancestors: I,
116 to_visit: Vec<CommitId>,
117 queued: VecDeque<CommitEvolutionEntry>,
118}
119
120impl<I> WalkPredecessors<'_, I>
121where
122 I: Stream<Item = OpStoreResult<Operation>> + Unpin,
123{
124 async fn try_next_impl(
125 &mut self,
126 ) -> Result<Option<CommitEvolutionEntry>, WalkPredecessorsError> {
127 while !self.to_visit.is_empty() && self.queued.is_empty() {
128 let Some(op) = self.op_ancestors.try_next().await? else {
129 self.flush_commits().await?;
131 break;
132 };
133 if !op.stores_commit_predecessors() {
134 self.scan_commits().await?;
138 break;
139 }
140 self.visit_op(&op).await?;
141 }
142 Ok(self.queued.pop_front())
143 }
144
145 async fn visit_op(&mut self, op: &Operation) -> Result<(), WalkPredecessorsError> {
147 let mut to_emit = Vec::new(); let mut has_dup = false;
149 let mut i = 0;
150 while let Some(cur_id) = self.to_visit.get(i) {
151 if let Some(next_ids) = op.predecessors_for_commit(cur_id) {
152 if to_emit.contains(cur_id) {
153 self.to_visit.remove(i);
154 has_dup = true;
155 continue;
156 }
157 to_emit.extend(self.to_visit.splice(i..=i, next_ids.iter().cloned()));
158 } else {
159 i += 1;
160 }
161 }
162
163 let store = self.repo.store();
164 let mut emit = async |id: &CommitId| -> BackendResult<()> {
165 let commit = store.get_commit_async(id).await?;
166 self.queued.push_back(CommitEvolutionEntry {
167 commit,
168 operation: Some(op.clone()),
169 reachable_predecessors: None,
170 });
171 Ok(())
172 };
173 match &*to_emit {
174 [] => {}
175 [id] if !has_dup => emit(id).await?,
176 _ => {
177 let sorted_ids = dag_walk::topo_order_reverse_ok(
178 to_emit.iter().map(Ok),
179 |&id| id,
180 |&id| op.predecessors_for_commit(id).into_iter().flatten().map(Ok),
181 |id| id, )
183 .map_err(|id| WalkPredecessorsError::CycleDetected(id.clone()))?;
184 for &id in &sorted_ids {
185 if op.predecessors_for_commit(id).is_some() {
186 emit(id).await?;
187 }
188 }
189 }
190 }
191 Ok(())
192 }
193
194 async fn scan_commits(&mut self) -> Result<(), WalkPredecessorsError> {
196 let store = self.repo.store();
197 let index = self.repo.index();
198 let mut commit_predecessors: HashMap<CommitId, Vec<CommitId>> = HashMap::new();
199 let commits = dag_walk_async::topo_order_reverse(
200 join_all(self.to_visit.drain(..).map(async |id| {
201 store
202 .get_commit_async(&id)
203 .await
204 .map_err(WalkPredecessorsError::Backend)
205 }))
206 .await,
207 |commit: &Commit| commit.id().clone(),
208 async |commit: &Commit| {
209 let ids = match commit_predecessors.entry(commit.id().clone()) {
210 Entry::Occupied(entry) => entry.into_mut(),
211 Entry::Vacant(entry) => {
212 let mut filtered = vec![];
213 for id in &commit.store_commit().predecessors {
214 match index.has_id(id) {
215 Ok(true) => {
216 filtered.push(id.clone());
217 }
218 Ok(false) => {
219 }
221 Err(err) => {
222 return vec![Err(WalkPredecessorsError::Index(err))];
223 }
224 }
225 }
226 entry.insert(filtered)
227 }
228 };
229
230 join_all(ids.iter().map(async |id| {
231 store
232 .get_commit_async(id)
233 .await
234 .map_err(WalkPredecessorsError::Backend)
235 }))
236 .await
237 },
238 |_| panic!("graph has cycle"),
239 )
240 .await?;
241 self.queued.extend(commits.into_iter().map(|commit| {
242 let predecessors = commit_predecessors
243 .remove(commit.id())
244 .expect("commit must be visited once");
245 CommitEvolutionEntry {
246 commit,
247 operation: None,
248 reachable_predecessors: Some(predecessors),
249 }
250 }));
251 Ok(())
252 }
253
254 async fn flush_commits(&mut self) -> BackendResult<()> {
256 self.queued.reserve(self.to_visit.len());
257 for id in self.to_visit.drain(..) {
258 let commit = self.repo.store().get_commit_async(&id).await?;
259 self.queued.push_back(CommitEvolutionEntry {
260 commit,
261 operation: None,
262 reachable_predecessors: Some(vec![]),
265 });
266 }
267 Ok(())
268 }
269}
270
271pub async fn accumulate_predecessors(
279 new_ops: &[Operation],
280 old_ops: &[Operation],
281) -> Result<BTreeMap<CommitId, Vec<CommitId>>, WalkPredecessorsError> {
282 if new_ops.is_empty() || old_ops.is_empty() {
283 return Ok(BTreeMap::new()); }
285
286 if let [op] = new_ops
288 && op.parent_ids().iter().eq(old_ops.iter().map(|op| op.id()))
289 {
290 let Some(map) = &op.store_operation().commit_predecessors else {
291 return Ok(BTreeMap::new());
292 };
293 return resolve_transitive_edges(map, map.keys())
294 .await
295 .map_err(|id| WalkPredecessorsError::CycleDetected(id.clone()));
296 }
297
298 let mut accumulated = BTreeMap::new();
301 let reverse_ops = op_walk::walk_ancestors_range(old_ops, new_ops);
302 if !try_collect_predecessors_into(&mut accumulated, reverse_ops).await? {
303 return Ok(BTreeMap::new());
304 }
305 let mut accumulated = reverse_edges(accumulated);
306 let forward_ops = op_walk::walk_ancestors_range(new_ops, old_ops);
308 if !try_collect_predecessors_into(&mut accumulated, forward_ops).await? {
309 return Ok(BTreeMap::new());
310 }
311 let new_commit_ids = new_ops
312 .iter()
313 .filter_map(|op| op.store_operation().commit_predecessors.as_ref())
314 .flat_map(|map| map.keys());
315 resolve_transitive_edges(&accumulated, new_commit_ids)
316 .await
317 .map_err(|id| WalkPredecessorsError::CycleDetected(id.clone()))
318}
319
320async fn try_collect_predecessors_into(
321 collected: &mut BTreeMap<CommitId, Vec<CommitId>>,
322 ops: impl Stream<Item = OpStoreResult<Operation>>,
323) -> OpStoreResult<bool> {
324 let mut ops = pin!(ops);
325 while let Some(op) = ops.try_next().await? {
326 let Some(map) = &op.store_operation().commit_predecessors else {
327 return Ok(false);
328 };
329 collected.extend(map.iter().map(|(k, v)| (k.clone(), v.clone())));
331 }
332 Ok(true)
333}
334
335async fn resolve_transitive_edges<'a: 'b, 'b>(
339 graph: &'a BTreeMap<CommitId, Vec<CommitId>>,
340 start: impl IntoIterator<Item = &'b CommitId>,
341) -> Result<BTreeMap<CommitId, Vec<CommitId>>, &'b CommitId> {
342 let mut new_graph: BTreeMap<CommitId, Vec<CommitId>> = BTreeMap::new();
343 let sorted_ids = dag_walk::topo_order_forward_ok(
344 start.into_iter().map(Ok),
345 |&id| id,
346 |&id| graph.get(id).into_iter().flatten().map(Ok),
347 |id| id, )?;
349 for cur_id in sorted_ids {
350 let Some(neighbors) = graph.get(cur_id) else {
351 continue;
352 };
353 let lookup = |id| new_graph.get(id).map_or(slice::from_ref(id), Vec::as_slice);
354 let new_neighbors = match &neighbors[..] {
355 [id] => lookup(id).to_vec(), ids => ids.iter().flat_map(lookup).unique().cloned().collect(),
357 };
358 new_graph.insert(cur_id.clone(), new_neighbors);
359 }
360 Ok(new_graph)
361}
362
363fn reverse_edges(graph: BTreeMap<CommitId, Vec<CommitId>>) -> BTreeMap<CommitId, Vec<CommitId>> {
364 let mut new_graph: BTreeMap<CommitId, Vec<CommitId>> = BTreeMap::new();
365 for (node1, neighbors) in graph {
366 for node2 in neighbors {
367 new_graph.entry(node2).or_default().push(node1.clone());
368 }
369 }
370 new_graph
371}