1use std::collections::BTreeMap;
18use std::collections::VecDeque;
19use std::slice;
20use std::sync::Arc;
21
22use itertools::Itertools as _;
23use thiserror::Error;
24
25use crate::backend::BackendError;
26use crate::backend::BackendResult;
27use crate::backend::CommitId;
28use crate::commit::Commit;
29use crate::dag_walk;
30use crate::op_store::OpStoreError;
31use crate::op_store::OpStoreResult;
32use crate::op_walk;
33use crate::operation::Operation;
34use crate::repo::ReadonlyRepo;
35use crate::repo::Repo as _;
36use crate::store::Store;
37
/// A commit together with the source its predecessor ids are read from.
#[derive(Clone, Debug)]
pub struct CommitEvolutionEntry {
    /// The commit this entry describes.
    pub commit: Commit,
    /// Operation whose predecessor records are consulted for `commit`.
    ///
    /// When `None`, the predecessor ids stored in the commit object itself
    /// are used instead.
    pub operation: Option<Operation>,
}
46
47impl CommitEvolutionEntry {
48 pub fn predecessor_ids(&self) -> &[CommitId] {
50 match &self.operation {
51 Some(op) => op.predecessors_for_commit(self.commit.id()).unwrap(),
52 None => &self.commit.store_commit().predecessors,
53 }
54 }
55
56 pub fn predecessors(&self) -> impl ExactSizeIterator<Item = BackendResult<Commit>> + use<'_> {
58 let store = self.commit.store();
59 self.predecessor_ids().iter().map(|id| store.get_commit(id))
60 }
61}
62
/// Error that can occur while walking or accumulating commit predecessors.
#[allow(missing_docs)]
#[derive(Debug, Error)]
pub enum WalkPredecessorsError {
    /// A `BackendError`, forwarded unchanged (e.g. from loading a commit).
    #[error(transparent)]
    Backend(#[from] BackendError),
    /// An `OpStoreError`, forwarded unchanged (e.g. from walking operations).
    #[error(transparent)]
    OpStore(#[from] OpStoreError),
    /// The predecessor graph contains a cycle. The payload is the commit id
    /// reported by the cycle detection.
    #[error("Predecessors cycle detected around commit {0}")]
    CycleDetected(CommitId),
}
73
74pub fn walk_predecessors(
76 repo: &ReadonlyRepo,
77 start_commits: &[CommitId],
78) -> impl Iterator<Item = Result<CommitEvolutionEntry, WalkPredecessorsError>> {
79 WalkPredecessors {
80 store: repo.store().clone(),
81 op_ancestors: op_walk::walk_ancestors(slice::from_ref(repo.operation())),
82 to_visit: start_commits.to_vec(),
83 queued: VecDeque::new(),
84 }
85}
86
/// Iterator state for walking commit predecessors across operations.
struct WalkPredecessors<I> {
    /// Store used to load commit objects by id.
    store: Arc<Store>,
    /// Source of operations to inspect, consumed one at a time.
    op_ancestors: I,
    /// Commit ids whose predecessor records have not been found yet.
    to_visit: Vec<CommitId>,
    /// Entries that are ready to be returned, in output order.
    queued: VecDeque<CommitEvolutionEntry>,
}
93
94impl<I> WalkPredecessors<I>
95where
96 I: Iterator<Item = OpStoreResult<Operation>>,
97{
98 fn try_next(&mut self) -> Result<Option<CommitEvolutionEntry>, WalkPredecessorsError> {
99 while !self.to_visit.is_empty() && self.queued.is_empty() {
100 let Some(op) = self.op_ancestors.next().transpose()? else {
101 self.flush_commits()?;
103 break;
104 };
105 if !op.stores_commit_predecessors() {
106 self.scan_commits()?;
110 break;
111 }
112 self.visit_op(&op)?;
113 }
114 Ok(self.queued.pop_front())
115 }
116
117 fn visit_op(&mut self, op: &Operation) -> Result<(), WalkPredecessorsError> {
119 let mut to_emit = Vec::new(); let mut has_dup = false;
121 let mut i = 0;
122 while let Some(cur_id) = self.to_visit.get(i) {
123 if let Some(next_ids) = op.predecessors_for_commit(cur_id) {
124 if to_emit.contains(cur_id) {
125 self.to_visit.remove(i);
126 has_dup = true;
127 continue;
128 }
129 to_emit.extend(self.to_visit.splice(i..=i, next_ids.iter().cloned()));
130 } else {
131 i += 1;
132 }
133 }
134
135 let mut emit = |id: &CommitId| -> BackendResult<()> {
136 let commit = self.store.get_commit(id)?;
137 self.queued.push_back(CommitEvolutionEntry {
138 commit,
139 operation: Some(op.clone()),
140 });
141 Ok(())
142 };
143 match &*to_emit {
144 [] => {}
145 [id] if !has_dup => emit(id)?,
146 _ => {
147 let sorted_ids = dag_walk::topo_order_reverse_ok(
148 to_emit.iter().map(Ok),
149 |&id| id,
150 |&id| op.predecessors_for_commit(id).into_iter().flatten().map(Ok),
151 |id| id, )
153 .map_err(|id| WalkPredecessorsError::CycleDetected(id.clone()))?;
154 for &id in &sorted_ids {
155 if op.predecessors_for_commit(id).is_some() {
156 emit(id)?;
157 }
158 }
159 }
160 }
161 Ok(())
162 }
163
164 fn scan_commits(&mut self) -> BackendResult<()> {
166 let commits = dag_walk::topo_order_reverse_ok(
169 self.to_visit.drain(..).map(|id| self.store.get_commit(&id)),
170 |commit: &Commit| commit.id().clone(),
171 |commit: &Commit| {
172 let ids = &commit.store_commit().predecessors;
173 ids.iter().map(|id| self.store.get_commit(id)).collect_vec()
174 },
175 |_| panic!("graph has cycle"),
176 )?;
177 self.queued
178 .extend(commits.into_iter().map(|commit| CommitEvolutionEntry {
179 commit,
180 operation: None,
181 }));
182 Ok(())
183 }
184
185 fn flush_commits(&mut self) -> BackendResult<()> {
187 self.queued.reserve(self.to_visit.len());
190 for id in self.to_visit.drain(..) {
191 let commit = self.store.get_commit(&id)?;
192 self.queued.push_back(CommitEvolutionEntry {
193 commit,
194 operation: None,
195 });
196 }
197 Ok(())
198 }
199}
200
201impl<I> Iterator for WalkPredecessors<I>
202where
203 I: Iterator<Item = OpStoreResult<Operation>>,
204{
205 type Item = Result<CommitEvolutionEntry, WalkPredecessorsError>;
206
207 fn next(&mut self) -> Option<Self::Item> {
208 self.try_next().transpose()
209 }
210}
211
212pub fn accumulate_predecessors(
220 new_ops: &[Operation],
221 old_ops: &[Operation],
222) -> Result<BTreeMap<CommitId, Vec<CommitId>>, WalkPredecessorsError> {
223 if new_ops.is_empty() || old_ops.is_empty() {
224 return Ok(BTreeMap::new()); }
226
227 if let [op] = new_ops {
229 if op.parent_ids().iter().eq(old_ops.iter().map(|op| op.id())) {
230 let Some(map) = &op.store_operation().commit_predecessors else {
231 return Ok(BTreeMap::new());
232 };
233 return resolve_transitive_edges(map, map.keys())
234 .map_err(|id| WalkPredecessorsError::CycleDetected(id.clone()));
235 }
236 }
237
238 let mut accumulated = BTreeMap::new();
241 let reverse_ops = op_walk::walk_ancestors_range(old_ops, new_ops);
242 if !try_collect_predecessors_into(&mut accumulated, reverse_ops)? {
243 return Ok(BTreeMap::new());
244 }
245 let mut accumulated = reverse_edges(accumulated);
246 let forward_ops = op_walk::walk_ancestors_range(new_ops, old_ops);
248 if !try_collect_predecessors_into(&mut accumulated, forward_ops)? {
249 return Ok(BTreeMap::new());
250 }
251 let new_commit_ids = new_ops
252 .iter()
253 .filter_map(|op| op.store_operation().commit_predecessors.as_ref())
254 .flat_map(|map| map.keys());
255 resolve_transitive_edges(&accumulated, new_commit_ids)
256 .map_err(|id| WalkPredecessorsError::CycleDetected(id.clone()))
257}
258
259fn try_collect_predecessors_into(
260 collected: &mut BTreeMap<CommitId, Vec<CommitId>>,
261 ops: impl IntoIterator<Item = OpStoreResult<Operation>>,
262) -> OpStoreResult<bool> {
263 for op in ops {
264 let op = op?;
265 let Some(map) = &op.store_operation().commit_predecessors else {
266 return Ok(false);
267 };
268 collected.extend(map.iter().map(|(k, v)| (k.clone(), v.clone())));
270 }
271 Ok(true)
272}
273
/// Builds a new map from `graph` in which edges passing through other nodes
/// of `graph` are replaced by the edges of those nodes.
///
/// Only nodes visited from `start` that have an entry in `graph` appear as
/// keys in the result.
///
/// # Errors
///
/// Returns the id reported by `dag_walk::topo_order_forward_ok` if the
/// visited part of `graph` contains a cycle.
fn resolve_transitive_edges<'a: 'b, 'b>(
    graph: &'a BTreeMap<CommitId, Vec<CommitId>>,
    start: impl IntoIterator<Item = &'b CommitId>,
) -> Result<BTreeMap<CommitId, Vec<CommitId>>, &'b CommitId> {
    let mut new_graph: BTreeMap<CommitId, Vec<CommitId>> = BTreeMap::new();
    // NOTE(review): the loop below relies on neighbors being ordered before
    // the nodes that point at them — confirm against `dag_walk`.
    let sorted_ids = dag_walk::topo_order_forward_ok(
        start.into_iter().map(Ok),
        |&id| id,
        |&id| graph.get(id).into_iter().flatten().map(Ok),
        |id| id, // the id handed to the cycle callback becomes the error
    )?;
    for cur_id in sorted_ids {
        // Ids without an entry in `graph` have no edges and get no entry in
        // the result either.
        let Some(neighbors) = graph.get(cur_id) else {
            continue;
        };
        // A neighbor that already has an entry in `new_graph` is substituted
        // by that entry's ids; any other neighbor is kept as itself.
        let lookup = |id| new_graph.get(id).map_or(slice::from_ref(id), Vec::as_slice);
        let new_neighbors = match &neighbors[..] {
            // With a single neighbor, the substituted list is copied as is.
            [id] => lookup(id).to_vec(),
            // With several neighbors, their lists are concatenated and
            // repeated ids are dropped, keeping the first occurrence.
            ids => ids.iter().flat_map(lookup).unique().cloned().collect(),
        };
        new_graph.insert(cur_id.clone(), new_neighbors);
    }
    Ok(new_graph)
}
301
302fn reverse_edges(graph: BTreeMap<CommitId, Vec<CommitId>>) -> BTreeMap<CommitId, Vec<CommitId>> {
303 let mut new_graph: BTreeMap<CommitId, Vec<CommitId>> = BTreeMap::new();
304 for (node1, neighbors) in graph {
305 for node2 in neighbors {
306 new_graph.entry(node2).or_default().push(node1.clone());
307 }
308 }
309 new_graph
310}