1use std::collections::BTreeMap;
18use std::collections::HashMap;
19use std::collections::VecDeque;
20use std::collections::hash_map::Entry;
21use std::pin::pin;
22use std::slice;
23
24use futures::Stream;
25use futures::StreamExt as _;
26use futures::TryStreamExt as _;
27use itertools::Itertools as _;
28use pollster::FutureExt as _;
29use thiserror::Error;
30
31use crate::backend::BackendError;
32use crate::backend::BackendResult;
33use crate::backend::CommitId;
34use crate::commit::Commit;
35use crate::dag_walk;
36use crate::index::IndexError;
37use crate::op_store::OpStoreError;
38use crate::op_store::OpStoreResult;
39use crate::op_walk;
40use crate::operation::Operation;
41use crate::repo::ReadonlyRepo;
42use crate::repo::Repo as _;
43
/// Commit paired with the point in the operation log where it was created or
/// rewritten.
#[derive(Clone, Debug, serde::Serialize)]
pub struct CommitEvolutionEntry {
    /// The commit at this step of the evolution.
    pub commit: Commit,
    /// Operation that created or rewrote `commit`. `None` when the walk fell
    /// back to predecessors recorded in the commit objects themselves.
    pub operation: Option<Operation>,
    /// Predecessors stored in `commit` itself, filtered to commits reachable
    /// through the repo index. Set if and only if `operation` is `None`.
    #[serde(skip)]
    reachable_predecessors: Option<Vec<CommitId>>,
}
57
58impl CommitEvolutionEntry {
59 pub fn predecessor_ids(&self) -> &[CommitId] {
61 match &self.operation {
62 Some(op) => op.predecessors_for_commit(self.commit.id()).unwrap(),
63 None => self.reachable_predecessors.as_ref().unwrap(),
64 }
65 }
66
67 pub fn predecessors(&self) -> impl ExactSizeIterator<Item = BackendResult<Commit>> {
69 let store = self.commit.store();
70 self.predecessor_ids().iter().map(|id| store.get_commit(id))
71 }
72}
73
// Error from walk_predecessors()/accumulate_predecessors(): wraps the
// underlying backend/index/op-store failures, plus cycle detection in the
// recorded predecessor graph.
#[expect(missing_docs)]
#[derive(Debug, Error)]
pub enum WalkPredecessorsError {
    #[error(transparent)]
    Backend(#[from] BackendError),
    #[error(transparent)]
    Index(#[from] IndexError),
    #[error(transparent)]
    OpStore(#[from] OpStoreError),
    #[error("Predecessors cycle detected around commit {0}")]
    CycleDetected(CommitId),
}
86
87pub fn walk_predecessors<'repo>(
89 repo: &'repo ReadonlyRepo,
90 start_commits: &[CommitId],
91) -> impl Iterator<Item = Result<CommitEvolutionEntry, WalkPredecessorsError>> + use<'repo> {
92 let op_ancestors = op_walk::walk_ancestors(slice::from_ref(repo.operation())).boxed();
93 WalkPredecessors {
94 repo,
95 op_ancestors,
96 to_visit: start_commits.to_vec(),
97 queued: VecDeque::new(),
98 }
99}
100
/// Iterator state for [`walk_predecessors()`].
struct WalkPredecessors<'repo, I> {
    repo: &'repo ReadonlyRepo,
    // Stream of ancestor operations, visited newest first.
    op_ancestors: I,
    // Commit ids still to be resolved against older operations.
    to_visit: Vec<CommitId>,
    // Entries ready to be yielded by the iterator.
    queued: VecDeque<CommitEvolutionEntry>,
}
107
impl<I> WalkPredecessors<'_, I>
where
    I: Stream<Item = OpStoreResult<Operation>> + Unpin,
{
    /// Produces the next evolution entry, refilling the queue from older
    /// operations as needed. Returns `Ok(None)` when the walk is exhausted.
    fn try_next(&mut self) -> Result<Option<CommitEvolutionEntry>, WalkPredecessorsError> {
        while !self.to_visit.is_empty() && self.queued.is_empty() {
            // The async op stream is drained synchronously, one op at a time.
            let Some(op) = self.op_ancestors.next().block_on().transpose()? else {
                // Ran out of operations: emit the remaining commits with no
                // predecessor information attached.
                self.flush_commits()?;
                break;
            };
            if !op.stores_commit_predecessors() {
                // This op predates predecessor recording: fall back to the
                // predecessor lists stored in the commit objects themselves.
                self.scan_commits()?;
                break;
            }
            self.visit_op(&op)?;
        }
        Ok(self.queued.pop_front())
    }

    /// Emits commits that were rewritten at `op`, and replaces them in
    /// `self.to_visit` with their predecessors for older operations.
    fn visit_op(&mut self, op: &Operation) -> Result<(), WalkPredecessorsError> {
        let mut to_emit = Vec::new(); let mut has_dup = false;
        // Scan to_visit in place: commits rewritten at this op move to
        // to_emit and are replaced by their predecessors; others are kept.
        let mut i = 0;
        while let Some(cur_id) = self.to_visit.get(i) {
            if let Some(next_ids) = op.predecessors_for_commit(cur_id) {
                if to_emit.contains(cur_id) {
                    // Already collected earlier in this pass; drop the
                    // duplicate and remember that ordering matters below.
                    self.to_visit.remove(i);
                    has_dup = true;
                    continue;
                }
                // splice() yields the removed element (cur_id), which is
                // captured into to_emit; next_ids take its slot, and the
                // loop revisits index i to process them too.
                to_emit.extend(self.to_visit.splice(i..=i, next_ids.iter().cloned()));
            } else {
                i += 1;
            }
        }

        let store = self.repo.store();
        let mut emit = |id: &CommitId| -> BackendResult<()> {
            let commit = store.get_commit(id)?;
            self.queued.push_back(CommitEvolutionEntry {
                commit,
                operation: Some(op.clone()),
                reachable_predecessors: None,
            });
            Ok(())
        };
        match &*to_emit {
            [] => {}
            // Single unique commit: no ordering work needed.
            [id] if !has_dup => emit(id)?,
            _ => {
                // Multiple (or deduplicated) commits: order them newest-first
                // over the predecessor graph recorded at this op.
                let sorted_ids = dag_walk::topo_order_reverse_ok(
                    to_emit.iter().map(Ok),
                    |&id| id,
                    |&id| op.predecessors_for_commit(id).into_iter().flatten().map(Ok),
                    |id| id, )
                .map_err(|id| WalkPredecessorsError::CycleDetected(id.clone()))?;
                for &id in &sorted_ids {
                    // The topo sort may pull in predecessor ids that weren't
                    // themselves rewritten at this op; emit only those that
                    // were.
                    if op.predecessors_for_commit(id).is_some() {
                        emit(id)?;
                    }
                }
            }
        }
        Ok(())
    }

    /// Fallback for pre-recording operations: emits all remaining commits
    /// using the legacy predecessor lists stored in the commit objects,
    /// filtered to commits present in the index.
    fn scan_commits(&mut self) -> Result<(), WalkPredecessorsError> {
        let store = self.repo.store();
        let index = self.repo.index();
        // Caches each commit's filtered predecessor list so it is computed
        // once and reused when building the emitted entries below.
        let mut commit_predecessors: HashMap<CommitId, Vec<CommitId>> = HashMap::new();
        let commits = dag_walk::topo_order_reverse_ok(
            self.to_visit.drain(..).map(|id| {
                store
                    .get_commit(&id)
                    .map_err(WalkPredecessorsError::Backend)
            }),
            |commit: &Commit| commit.id().clone(),
            |commit: &Commit| {
                let ids = match commit_predecessors.entry(commit.id().clone()) {
                    Entry::Occupied(entry) => entry.into_mut(),
                    Entry::Vacant(entry) => {
                        let mut filtered = vec![];
                        for id in &commit.store_commit().predecessors {
                            match index.has_id(id) {
                                Ok(true) => {
                                    filtered.push(id.clone());
                                }
                                Ok(false) => {
                                    // Unindexed (unreachable) predecessor:
                                    // silently dropped.
                                }
                                Err(err) => {
                                    // The neighbor callback can't return a
                                    // Result, so surface the error as a
                                    // failing neighbor item.
                                    return vec![Err(WalkPredecessorsError::Index(err))];
                                }
                            }
                        }
                        entry.insert(filtered)
                    }
                };

                ids.iter()
                    .map(|id| store.get_commit(id).map_err(WalkPredecessorsError::Backend))
                    .collect_vec()
            },
            |_| panic!("graph has cycle"),
        )?;
        self.queued.extend(commits.into_iter().map(|commit| {
            let predecessors = commit_predecessors
                .remove(commit.id())
                .expect("commit must be visited once");
            CommitEvolutionEntry {
                commit,
                operation: None,
                reachable_predecessors: Some(predecessors),
            }
        }));
        Ok(())
    }

    /// Emits all remaining commits with empty predecessor lists (no further
    /// history is available once the operation log is exhausted.)
    fn flush_commits(&mut self) -> BackendResult<()> {
        self.queued.reserve(self.to_visit.len());
        for id in self.to_visit.drain(..) {
            let commit = self.repo.store().get_commit(&id)?;
            self.queued.push_back(CommitEvolutionEntry {
                commit,
                operation: None,
                reachable_predecessors: Some(vec![]),
            });
        }
        Ok(())
    }
}
249
250impl<I> Iterator for WalkPredecessors<'_, I>
251where
252 I: Stream<Item = OpStoreResult<Operation>> + Unpin,
253{
254 type Item = Result<CommitEvolutionEntry, WalkPredecessorsError>;
255
256 fn next(&mut self) -> Option<Self::Item> {
257 self.try_next().transpose()
258 }
259}
260
/// Accumulates predecessor mappings over the operation range between
/// `old_ops` and `new_ops`, with intermediate edges resolved so each commit
/// recorded by the new operations maps directly to its resolved
/// predecessors.
///
/// Returns an empty map when no mapping can be computed: either side of the
/// range is empty, or some operation in the range doesn't store predecessor
/// information.
pub async fn accumulate_predecessors(
    new_ops: &[Operation],
    old_ops: &[Operation],
) -> Result<BTreeMap<CommitId, Vec<CommitId>>, WalkPredecessorsError> {
    if new_ops.is_empty() || old_ops.is_empty() {
        return Ok(BTreeMap::new()); }

    // Fast path: a single new op sitting directly on top of the old ops —
    // its own predecessor map covers the whole range.
    if let [op] = new_ops
        && op.parent_ids().iter().eq(old_ops.iter().map(|op| op.id()))
    {
        let Some(map) = &op.store_operation().commit_predecessors else {
            return Ok(BTreeMap::new());
        };
        return resolve_transitive_edges(map, map.keys())
            .map_err(|id| WalkPredecessorsError::CycleDetected(id.clone()));
    }

    // Collect edges from ops on the old side of the range first, then flip
    // them so all accumulated edges point in the same direction before
    // merging in the forward (new-side) edges.
    // NOTE(review): assumes walk_ancestors_range(a, b) yields ops reachable
    // from `a` but not `b` — confirm against op_walk.
    let mut accumulated = BTreeMap::new();
    let reverse_ops = op_walk::walk_ancestors_range(old_ops, new_ops);
    if !try_collect_predecessors_into(&mut accumulated, reverse_ops).await? {
        return Ok(BTreeMap::new());
    }
    let mut accumulated = reverse_edges(accumulated);
    let forward_ops = op_walk::walk_ancestors_range(new_ops, old_ops);
    if !try_collect_predecessors_into(&mut accumulated, forward_ops).await? {
        return Ok(BTreeMap::new());
    }
    // Resolve starting from the commits recorded by the new operations.
    let new_commit_ids = new_ops
        .iter()
        .filter_map(|op| op.store_operation().commit_predecessors.as_ref())
        .flat_map(|map| map.keys());
    resolve_transitive_edges(&accumulated, new_commit_ids)
        .map_err(|id| WalkPredecessorsError::CycleDetected(id.clone()))
}
307
308async fn try_collect_predecessors_into(
309 collected: &mut BTreeMap<CommitId, Vec<CommitId>>,
310 ops: impl Stream<Item = OpStoreResult<Operation>>,
311) -> OpStoreResult<bool> {
312 let mut ops = pin!(ops);
313 while let Some(op) = ops.try_next().await? {
314 let Some(map) = &op.store_operation().commit_predecessors else {
315 return Ok(false);
316 };
317 collected.extend(map.iter().map(|(k, v)| (k.clone(), v.clone())));
319 }
320 Ok(true)
321}
322
/// Rewrites the predecessor `graph` so that every node reachable from
/// `start` maps directly to its terminal predecessors — nodes that have no
/// entry in `graph`.
///
/// On a cycle, returns the node at which the cycle was detected.
fn resolve_transitive_edges<'a: 'b, 'b>(
    graph: &'a BTreeMap<CommitId, Vec<CommitId>>,
    start: impl IntoIterator<Item = &'b CommitId>,
) -> Result<BTreeMap<CommitId, Vec<CommitId>>, &'b CommitId> {
    let mut new_graph: BTreeMap<CommitId, Vec<CommitId>> = BTreeMap::new();
    // Visit ancestors first so each node's neighbors are already resolved
    // in new_graph by the time the node itself is processed.
    let sorted_ids = dag_walk::topo_order_forward_ok(
        start.into_iter().map(Ok),
        |&id| id,
        |&id| graph.get(id).into_iter().flatten().map(Ok),
        |id| id, )?;
    for cur_id in sorted_ids {
        let Some(neighbors) = graph.get(cur_id) else {
            // Terminal node: not rewritten further, so no entry is added.
            continue;
        };
        // A neighbor absent from new_graph is terminal and stands for
        // itself.
        let lookup = |id| new_graph.get(id).map_or(slice::from_ref(id), Vec::as_slice);
        let new_neighbors = match &neighbors[..] {
            // Single edge: the resolved list is already duplicate-free.
            [id] => lookup(id).to_vec(), ids => ids.iter().flat_map(lookup).unique().cloned().collect(),
        };
        new_graph.insert(cur_id.clone(), new_neighbors);
    }
    Ok(new_graph)
}
350
351fn reverse_edges(graph: BTreeMap<CommitId, Vec<CommitId>>) -> BTreeMap<CommitId, Vec<CommitId>> {
352 let mut new_graph: BTreeMap<CommitId, Vec<CommitId>> = BTreeMap::new();
353 for (node1, neighbors) in graph {
354 for node2 in neighbors {
355 new_graph.entry(node2).or_default().push(node1.clone());
356 }
357 }
358 new_graph
359}