Skip to main content

jj_lib/
evolution.rs

1// Copyright 2025 The Jujutsu Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Utility for commit evolution history.
16
17use std::collections::BTreeMap;
18use std::collections::HashMap;
19use std::collections::VecDeque;
20use std::collections::hash_map::Entry;
21use std::pin::pin;
22use std::slice;
23
24use futures::Stream;
25use futures::StreamExt as _;
26use futures::TryStreamExt as _;
27use futures::future::join_all;
28use futures::future::try_join_all;
29use futures::stream;
30use itertools::Itertools as _;
31use thiserror::Error;
32
33use crate::backend::BackendError;
34use crate::backend::BackendResult;
35use crate::backend::CommitId;
36use crate::commit::Commit;
37use crate::dag_walk;
38use crate::dag_walk_async;
39use crate::index::IndexError;
40use crate::op_store::OpStoreError;
41use crate::op_store::OpStoreResult;
42use crate::op_walk;
43use crate::operation::Operation;
44use crate::repo::ReadonlyRepo;
45use crate::repo::Repo as _;
46
47/// Commit with predecessor information.
48#[derive(Clone, Debug, serde::Serialize)]
49pub struct CommitEvolutionEntry {
50    /// Commit id and metadata.
51    pub commit: Commit,
52    /// Operation where the commit was created or rewritten.
53    pub operation: Option<Operation>,
54    /// Reachable predecessor ids reconstructed from the commit metadata. This
55    /// should be set if the associated `operation` is unknown.
56    // TODO: remove with legacy commit.predecessors support
57    #[serde(skip)]
58    reachable_predecessors: Option<Vec<CommitId>>,
59}
60
61impl CommitEvolutionEntry {
62    /// Predecessor ids of this commit.
63    pub fn predecessor_ids(&self) -> &[CommitId] {
64        match &self.operation {
65            Some(op) => op.predecessors_for_commit(self.commit.id()).unwrap(),
66            None => self.reachable_predecessors.as_ref().unwrap(),
67        }
68    }
69
70    /// Predecessor commit objects of this commit.
71    pub async fn predecessors(&self) -> BackendResult<Vec<Commit>> {
72        let store = self.commit.store();
73        try_join_all(
74            self.predecessor_ids()
75                .iter()
76                .map(|id| store.get_commit_async(id)),
77        )
78        .await
79    }
80}
81
/// Error that can occur while walking or accumulating commit predecessors.
// The variants are described with plain `//` comments on purpose: the
// `#[expect(missing_docs)]` below would become unfulfilled if every item were
// given a doc comment.
#[expect(missing_docs)]
#[derive(Debug, Error)]
pub enum WalkPredecessorsError {
    // Error reported by the commit backend, e.g. while loading a commit.
    #[error(transparent)]
    Backend(#[from] BackendError),
    // Error reported by the index, e.g. while testing whether a legacy
    // predecessor id is known to it.
    #[error(transparent)]
    Index(#[from] IndexError),
    // Error reported by the operation store while reading operations.
    #[error(transparent)]
    OpStore(#[from] OpStoreError),
    // The predecessor records form a cycle. The payload is the id of a commit
    // at which the cycle was detected.
    #[error("Predecessors cycle detected around commit {0}")]
    CycleDetected(CommitId),
}
94
95/// Walks operations to emit commit predecessors in reverse topological order.
96pub fn walk_predecessors<'repo>(
97    repo: &'repo ReadonlyRepo,
98    start_commits: &[CommitId],
99) -> impl Stream<Item = Result<CommitEvolutionEntry, WalkPredecessorsError>> + use<'repo> {
100    let op_ancestors = op_walk::walk_ancestors(slice::from_ref(repo.operation())).boxed_local();
101    let state = WalkPredecessors {
102        repo,
103        op_ancestors,
104        to_visit: start_commits.to_vec(),
105        queued: VecDeque::new(),
106    };
107    stream::unfold(state, |mut state| async move {
108        let result = state.try_next_impl().await.transpose()?;
109        Some((result, state))
110    })
111}
112
113struct WalkPredecessors<'repo, I> {
114    repo: &'repo ReadonlyRepo,
115    op_ancestors: I,
116    to_visit: Vec<CommitId>,
117    queued: VecDeque<CommitEvolutionEntry>,
118}
119
120impl<I> WalkPredecessors<'_, I>
121where
122    I: Stream<Item = OpStoreResult<Operation>> + Unpin,
123{
124    async fn try_next_impl(
125        &mut self,
126    ) -> Result<Option<CommitEvolutionEntry>, WalkPredecessorsError> {
127        while !self.to_visit.is_empty() && self.queued.is_empty() {
128            let Some(op) = self.op_ancestors.try_next().await? else {
129                // Scanned all operations, no fallback needed.
130                self.flush_commits().await?;
131                break;
132            };
133            if !op.stores_commit_predecessors() {
134                // There may be concurrent ops, but let's simply switch to the
135                // legacy commit traversal. Operation history should be mostly
136                // linear.
137                self.scan_commits().await?;
138                break;
139            }
140            self.visit_op(&op).await?;
141        }
142        Ok(self.queued.pop_front())
143    }
144
145    /// Looks for predecessors within the given operation.
146    async fn visit_op(&mut self, op: &Operation) -> Result<(), WalkPredecessorsError> {
147        let mut to_emit = Vec::new(); // transitive edges should be short
148        let mut has_dup = false;
149        let mut i = 0;
150        while let Some(cur_id) = self.to_visit.get(i) {
151            if let Some(next_ids) = op.predecessors_for_commit(cur_id) {
152                if to_emit.contains(cur_id) {
153                    self.to_visit.remove(i);
154                    has_dup = true;
155                    continue;
156                }
157                to_emit.extend(self.to_visit.splice(i..=i, next_ids.iter().cloned()));
158            } else {
159                i += 1;
160            }
161        }
162
163        let store = self.repo.store();
164        let mut emit = async |id: &CommitId| -> BackendResult<()> {
165            let commit = store.get_commit_async(id).await?;
166            self.queued.push_back(CommitEvolutionEntry {
167                commit,
168                operation: Some(op.clone()),
169                reachable_predecessors: None,
170            });
171            Ok(())
172        };
173        match &*to_emit {
174            [] => {}
175            [id] if !has_dup => emit(id).await?,
176            _ => {
177                let sorted_ids = dag_walk::topo_order_reverse_ok(
178                    to_emit.iter().map(Ok),
179                    |&id| id,
180                    |&id| op.predecessors_for_commit(id).into_iter().flatten().map(Ok),
181                    |id| id, // Err(&CommitId) if graph has cycle
182                )
183                .map_err(|id| WalkPredecessorsError::CycleDetected(id.clone()))?;
184                for &id in &sorted_ids {
185                    if op.predecessors_for_commit(id).is_some() {
186                        emit(id).await?;
187                    }
188                }
189            }
190        }
191        Ok(())
192    }
193
194    /// Traverses predecessors from remainder commits.
195    async fn scan_commits(&mut self) -> Result<(), WalkPredecessorsError> {
196        let store = self.repo.store();
197        let index = self.repo.index();
198        let mut commit_predecessors: HashMap<CommitId, Vec<CommitId>> = HashMap::new();
199        let commits = dag_walk_async::topo_order_reverse(
200            join_all(self.to_visit.drain(..).map(async |id| {
201                store
202                    .get_commit_async(&id)
203                    .await
204                    .map_err(WalkPredecessorsError::Backend)
205            }))
206            .await,
207            |commit: &Commit| commit.id().clone(),
208            async |commit: &Commit| {
209                let ids = match commit_predecessors.entry(commit.id().clone()) {
210                    Entry::Occupied(entry) => entry.into_mut(),
211                    Entry::Vacant(entry) => {
212                        let mut filtered = vec![];
213                        for id in &commit.store_commit().predecessors {
214                            match index.has_id(id) {
215                                Ok(true) => {
216                                    filtered.push(id.clone());
217                                }
218                                Ok(false) => {
219                                    // Ignore unreachable predecessors
220                                }
221                                Err(err) => {
222                                    return vec![Err(WalkPredecessorsError::Index(err))];
223                                }
224                            }
225                        }
226                        entry.insert(filtered)
227                    }
228                };
229
230                join_all(ids.iter().map(async |id| {
231                    store
232                        .get_commit_async(id)
233                        .await
234                        .map_err(WalkPredecessorsError::Backend)
235                }))
236                .await
237            },
238            |_| panic!("graph has cycle"),
239        )
240        .await?;
241        self.queued.extend(commits.into_iter().map(|commit| {
242            let predecessors = commit_predecessors
243                .remove(commit.id())
244                .expect("commit must be visited once");
245            CommitEvolutionEntry {
246                commit,
247                operation: None,
248                reachable_predecessors: Some(predecessors),
249            }
250        }));
251        Ok(())
252    }
253
254    /// Moves remainder commits to output queue.
255    async fn flush_commits(&mut self) -> BackendResult<()> {
256        self.queued.reserve(self.to_visit.len());
257        for id in self.to_visit.drain(..) {
258            let commit = self.repo.store().get_commit_async(&id).await?;
259            self.queued.push_back(CommitEvolutionEntry {
260                commit,
261                operation: None,
262                // There were no legacy operations, so the commit should have no
263                // predecessors.
264                reachable_predecessors: Some(vec![]),
265            });
266        }
267        Ok(())
268    }
269}
270
271/// Collects predecessor records from `new_ops` to `old_ops`, and resolves
272/// transitive entries.
273///
274/// This function assumes that there exists a single greatest common ancestors
275/// between `old_ops` and `new_ops`. If `old_ops` and `new_ops` have ancestors
276/// and descendants each other, or if criss-crossed merges exist between these
277/// operations, the returned mapping would be lossy.
278pub async fn accumulate_predecessors(
279    new_ops: &[Operation],
280    old_ops: &[Operation],
281) -> Result<BTreeMap<CommitId, Vec<CommitId>>, WalkPredecessorsError> {
282    if new_ops.is_empty() || old_ops.is_empty() {
283        return Ok(BTreeMap::new()); // No common ancestor exists
284    }
285
286    // Fast path for the single forward operation case.
287    if let [op] = new_ops
288        && op.parent_ids().iter().eq(old_ops.iter().map(|op| op.id()))
289    {
290        let Some(map) = &op.store_operation().commit_predecessors else {
291            return Ok(BTreeMap::new());
292        };
293        return resolve_transitive_edges(map, map.keys())
294            .await
295            .map_err(|id| WalkPredecessorsError::CycleDetected(id.clone()));
296    }
297
298    // Follow reverse edges from the common ancestor to old_ops. Here we use
299    // BTreeMap to stabilize order of the reversed edges.
300    let mut accumulated = BTreeMap::new();
301    let reverse_ops = op_walk::walk_ancestors_range(old_ops, new_ops);
302    if !try_collect_predecessors_into(&mut accumulated, reverse_ops).await? {
303        return Ok(BTreeMap::new());
304    }
305    let mut accumulated = reverse_edges(accumulated);
306    // Follow forward edges from new_ops to the common ancestor.
307    let forward_ops = op_walk::walk_ancestors_range(new_ops, old_ops);
308    if !try_collect_predecessors_into(&mut accumulated, forward_ops).await? {
309        return Ok(BTreeMap::new());
310    }
311    let new_commit_ids = new_ops
312        .iter()
313        .filter_map(|op| op.store_operation().commit_predecessors.as_ref())
314        .flat_map(|map| map.keys());
315    resolve_transitive_edges(&accumulated, new_commit_ids)
316        .await
317        .map_err(|id| WalkPredecessorsError::CycleDetected(id.clone()))
318}
319
320async fn try_collect_predecessors_into(
321    collected: &mut BTreeMap<CommitId, Vec<CommitId>>,
322    ops: impl Stream<Item = OpStoreResult<Operation>>,
323) -> OpStoreResult<bool> {
324    let mut ops = pin!(ops);
325    while let Some(op) = ops.try_next().await? {
326        let Some(map) = &op.store_operation().commit_predecessors else {
327            return Ok(false);
328        };
329        // Just insert. There should be no duplicate entries.
330        collected.extend(map.iter().map(|(k, v)| (k.clone(), v.clone())));
331    }
332    Ok(true)
333}
334
335/// Resolves transitive edges in `graph` starting from the `start` nodes,
336/// returns new DAG. The returned DAG only includes edges reachable from the
337/// `start` nodes.
338async fn resolve_transitive_edges<'a: 'b, 'b>(
339    graph: &'a BTreeMap<CommitId, Vec<CommitId>>,
340    start: impl IntoIterator<Item = &'b CommitId>,
341) -> Result<BTreeMap<CommitId, Vec<CommitId>>, &'b CommitId> {
342    let mut new_graph: BTreeMap<CommitId, Vec<CommitId>> = BTreeMap::new();
343    let sorted_ids = dag_walk::topo_order_forward_ok(
344        start.into_iter().map(Ok),
345        |&id| id,
346        |&id| graph.get(id).into_iter().flatten().map(Ok),
347        |id| id, // Err(&CommitId) if graph has cycle
348    )?;
349    for cur_id in sorted_ids {
350        let Some(neighbors) = graph.get(cur_id) else {
351            continue;
352        };
353        let lookup = |id| new_graph.get(id).map_or(slice::from_ref(id), Vec::as_slice);
354        let new_neighbors = match &neighbors[..] {
355            [id] => lookup(id).to_vec(), // unique() not needed
356            ids => ids.iter().flat_map(lookup).unique().cloned().collect(),
357        };
358        new_graph.insert(cur_id.clone(), new_neighbors);
359    }
360    Ok(new_graph)
361}
362
363fn reverse_edges(graph: BTreeMap<CommitId, Vec<CommitId>>) -> BTreeMap<CommitId, Vec<CommitId>> {
364    let mut new_graph: BTreeMap<CommitId, Vec<CommitId>> = BTreeMap::new();
365    for (node1, neighbors) in graph {
366        for node2 in neighbors {
367            new_graph.entry(node2).or_default().push(node1.clone());
368        }
369    }
370    new_graph
371}