1use std::collections::BTreeMap;
18use std::collections::VecDeque;
19use std::pin::pin;
20use std::slice;
21use std::sync::Arc;
22
23use futures::Stream;
24use futures::StreamExt as _;
25use futures::TryStreamExt as _;
26use futures::future::try_join_all;
27use futures::stream;
28use itertools::Itertools as _;
29use thiserror::Error;
30
31use crate::backend::BackendError;
32use crate::backend::BackendResult;
33use crate::backend::CommitId;
34use crate::commit::Commit;
35use crate::dag_walk;
36use crate::op_store::OpStoreError;
37use crate::op_store::OpStoreResult;
38use crate::op_walk;
39use crate::operation::Operation;
40use crate::repo::ReadonlyRepo;
41use crate::repo::Repo as _;
42use crate::store::Store;
43
44#[derive(Clone, Debug, serde::Serialize)]
46pub struct CommitEvolutionEntry {
47 pub commit: Commit,
49 pub operation: Option<Operation>,
51}
52
53impl CommitEvolutionEntry {
54 pub fn predecessor_ids(&self) -> &[CommitId] {
56 match &self.operation {
57 Some(op) => op.predecessors_for_commit(self.commit.id()).unwrap(),
58 None => &[],
59 }
60 }
61
62 pub async fn predecessors(&self) -> BackendResult<Vec<Commit>> {
64 let store = self.commit.store();
65 try_join_all(
66 self.predecessor_ids()
67 .iter()
68 .map(|id| store.get_commit_async(id)),
69 )
70 .await
71 }
72}
73
74#[expect(missing_docs)]
75#[derive(Debug, Error)]
76pub enum WalkPredecessorsError {
77 #[error(transparent)]
78 Backend(#[from] BackendError),
79 #[error(transparent)]
80 OpStore(#[from] OpStoreError),
81 #[error("Predecessors cycle detected around commit {0}")]
82 CycleDetected(CommitId),
83}
84
85pub fn walk_predecessors(
87 repo: &ReadonlyRepo,
88 start_commits: &[CommitId],
89) -> impl Stream<Item = Result<CommitEvolutionEntry, WalkPredecessorsError>> + use<> {
90 let op_ancestors = op_walk::walk_ancestors(slice::from_ref(repo.operation())).boxed_local();
91 let state = WalkPredecessors {
92 store: repo.store().clone(),
93 op_ancestors,
94 to_visit: start_commits.to_vec(),
95 queued: VecDeque::new(),
96 };
97 stream::unfold(state, |mut state| async move {
98 let result = state.try_next_impl().await.transpose()?;
99 Some((result, state))
100 })
101}
102
103struct WalkPredecessors<I> {
104 store: Arc<Store>,
105 op_ancestors: I,
106 to_visit: Vec<CommitId>,
107 queued: VecDeque<CommitEvolutionEntry>,
108}
109
110impl<I> WalkPredecessors<I>
111where
112 I: Stream<Item = OpStoreResult<Operation>> + Unpin,
113{
114 async fn try_next_impl(
115 &mut self,
116 ) -> Result<Option<CommitEvolutionEntry>, WalkPredecessorsError> {
117 while !self.to_visit.is_empty() && self.queued.is_empty() {
118 let Some(op) = self.op_ancestors.try_next().await? else {
119 self.flush_commits().await?;
120 break;
121 };
122 if !op.stores_commit_predecessors() {
123 self.flush_commits().await?;
126 break;
127 }
128 self.visit_op(&op).await?;
129 }
130 Ok(self.queued.pop_front())
131 }
132
133 async fn visit_op(&mut self, op: &Operation) -> Result<(), WalkPredecessorsError> {
135 let mut to_emit = Vec::new(); let mut has_dup = false;
137 let mut i = 0;
138 while let Some(cur_id) = self.to_visit.get(i) {
139 if let Some(next_ids) = op.predecessors_for_commit(cur_id) {
140 if to_emit.contains(cur_id) {
141 self.to_visit.remove(i);
142 has_dup = true;
143 continue;
144 }
145 to_emit.extend(self.to_visit.splice(i..=i, next_ids.iter().cloned()));
146 } else {
147 i += 1;
148 }
149 }
150
151 let mut emit = async |id: &CommitId| -> BackendResult<()> {
154 let commit = self.store.get_commit_async(id).await?;
155 self.queued.push_back(CommitEvolutionEntry {
156 commit,
157 operation: Some(op.clone()),
158 });
159 Ok(())
160 };
161 match &*to_emit {
162 [] => {}
163 [id] if !has_dup => emit(id).await?,
164 _ => {
165 let sorted_ids = dag_walk::topo_order_reverse_ok(
166 to_emit.iter().map(Ok),
167 |&id| id,
168 |&id| op.predecessors_for_commit(id).into_iter().flatten().map(Ok),
169 |id| id, )
171 .map_err(|id| WalkPredecessorsError::CycleDetected(id.clone()))?;
172 for &id in &sorted_ids {
173 if op.predecessors_for_commit(id).is_some() {
174 emit(id).await?;
175 }
176 }
177 }
178 }
179 Ok(())
180 }
181
182 async fn flush_commits(&mut self) -> BackendResult<()> {
184 self.queued.reserve(self.to_visit.len());
185 for id in self.to_visit.drain(..) {
186 let commit = self.store.get_commit_async(&id).await?;
187 self.queued.push_back(CommitEvolutionEntry {
188 commit,
189 operation: None,
190 });
191 }
192 Ok(())
193 }
194}
195
196pub async fn accumulate_predecessors(
204 new_ops: &[Operation],
205 old_ops: &[Operation],
206) -> Result<BTreeMap<CommitId, Vec<CommitId>>, WalkPredecessorsError> {
207 if new_ops.is_empty() || old_ops.is_empty() {
208 return Ok(BTreeMap::new()); }
210
211 if let [op] = new_ops
213 && op.parent_ids().iter().eq(old_ops.iter().map(|op| op.id()))
214 {
215 let Some(map) = &op.store_operation().commit_predecessors else {
216 return Ok(BTreeMap::new());
217 };
218 return resolve_transitive_edges(map, map.keys())
219 .await
220 .map_err(|id| WalkPredecessorsError::CycleDetected(id.clone()));
221 }
222
223 let mut accumulated = BTreeMap::new();
226 let reverse_ops = op_walk::walk_ancestors_range(old_ops, new_ops);
227 if !try_collect_predecessors_into(&mut accumulated, reverse_ops).await? {
228 return Ok(BTreeMap::new());
229 }
230 let mut accumulated = reverse_edges(accumulated);
231 let forward_ops = op_walk::walk_ancestors_range(new_ops, old_ops);
233 if !try_collect_predecessors_into(&mut accumulated, forward_ops).await? {
234 return Ok(BTreeMap::new());
235 }
236 let new_commit_ids = new_ops
237 .iter()
238 .filter_map(|op| op.store_operation().commit_predecessors.as_ref())
239 .flat_map(|map| map.keys());
240 resolve_transitive_edges(&accumulated, new_commit_ids)
241 .await
242 .map_err(|id| WalkPredecessorsError::CycleDetected(id.clone()))
243}
244
245async fn try_collect_predecessors_into(
246 collected: &mut BTreeMap<CommitId, Vec<CommitId>>,
247 ops: impl Stream<Item = OpStoreResult<Operation>>,
248) -> OpStoreResult<bool> {
249 let mut ops = pin!(ops);
250 while let Some(op) = ops.try_next().await? {
251 let Some(map) = &op.store_operation().commit_predecessors else {
252 return Ok(false);
253 };
254 collected.extend(map.iter().map(|(k, v)| (k.clone(), v.clone())));
256 }
257 Ok(true)
258}
259
260async fn resolve_transitive_edges<'a: 'b, 'b>(
264 graph: &'a BTreeMap<CommitId, Vec<CommitId>>,
265 start: impl IntoIterator<Item = &'b CommitId>,
266) -> Result<BTreeMap<CommitId, Vec<CommitId>>, &'b CommitId> {
267 let mut new_graph: BTreeMap<CommitId, Vec<CommitId>> = BTreeMap::new();
268 let sorted_ids = dag_walk::topo_order_forward_ok(
269 start.into_iter().map(Ok),
270 |&id| id,
271 |&id| graph.get(id).into_iter().flatten().map(Ok),
272 |id| id, )?;
274 for cur_id in sorted_ids {
275 let Some(neighbors) = graph.get(cur_id) else {
276 continue;
277 };
278 let lookup = |id| new_graph.get(id).map_or(slice::from_ref(id), Vec::as_slice);
279 let new_neighbors = match &neighbors[..] {
280 [id] => lookup(id).to_vec(), ids => ids.iter().flat_map(lookup).unique().cloned().collect(),
282 };
283 new_graph.insert(cur_id.clone(), new_neighbors);
284 }
285 Ok(new_graph)
286}
287
288fn reverse_edges(graph: BTreeMap<CommitId, Vec<CommitId>>) -> BTreeMap<CommitId, Vec<CommitId>> {
289 let mut new_graph: BTreeMap<CommitId, Vec<CommitId>> = BTreeMap::new();
290 for (node1, neighbors) in graph {
291 for node2 in neighbors {
292 new_graph.entry(node2).or_default().push(node1.clone());
293 }
294 }
295 new_graph
296}