// jj_lib/evolution.rs
// Copyright 2025 The Jujutsu Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Utility for commit evolution history.
17use std::collections::BTreeMap;
18use std::collections::HashMap;
19use std::collections::VecDeque;
20use std::collections::hash_map::Entry;
21use std::pin::pin;
22use std::slice;
23
24use futures::Stream;
25use futures::StreamExt as _;
26use futures::TryStreamExt as _;
27use itertools::Itertools as _;
28use pollster::FutureExt as _;
29use thiserror::Error;
30
31use crate::backend::BackendError;
32use crate::backend::BackendResult;
33use crate::backend::CommitId;
34use crate::commit::Commit;
35use crate::dag_walk;
36use crate::index::IndexError;
37use crate::op_store::OpStoreError;
38use crate::op_store::OpStoreResult;
39use crate::op_walk;
40use crate::operation::Operation;
41use crate::repo::ReadonlyRepo;
42use crate::repo::Repo as _;
43
/// Commit with predecessor information.
///
/// Produced by [`walk_predecessors()`]; exactly one of `operation` or
/// `reachable_predecessors` provides the predecessor ids (see
/// `predecessor_ids()`).
#[derive(Clone, Debug, serde::Serialize)]
pub struct CommitEvolutionEntry {
    /// Commit id and metadata.
    pub commit: Commit,
    /// Operation where the commit was created or rewritten.
    pub operation: Option<Operation>,
    /// Reachable predecessor ids reconstructed from the commit metadata. This
    /// should be set if the associated `operation` is unknown.
    // TODO: remove with legacy commit.predecessors support
    #[serde(skip)]
    reachable_predecessors: Option<Vec<CommitId>>,
}
57
58impl CommitEvolutionEntry {
59    /// Predecessor ids of this commit.
60    pub fn predecessor_ids(&self) -> &[CommitId] {
61        match &self.operation {
62            Some(op) => op.predecessors_for_commit(self.commit.id()).unwrap(),
63            None => self.reachable_predecessors.as_ref().unwrap(),
64        }
65    }
66
67    /// Predecessor commit objects of this commit.
68    pub fn predecessors(&self) -> impl ExactSizeIterator<Item = BackendResult<Commit>> {
69        let store = self.commit.store();
70        self.predecessor_ids().iter().map(|id| store.get_commit(id))
71    }
72}
73
// Variant-level `//` notes only: `///` docs would leave the
// `#[expect(missing_docs)]` below unfulfilled.
#[expect(missing_docs)]
#[derive(Debug, Error)]
pub enum WalkPredecessorsError {
    // Commit could not be loaded from the backend.
    #[error(transparent)]
    Backend(#[from] BackendError),
    // Index lookup failed while filtering legacy predecessors.
    #[error(transparent)]
    Index(#[from] IndexError),
    // Operation log could not be read.
    #[error(transparent)]
    OpStore(#[from] OpStoreError),
    // The recorded predecessor graph is cyclic, which indicates broken
    // metadata.
    #[error("Predecessors cycle detected around commit {0}")]
    CycleDetected(CommitId),
}
86
87/// Walks operations to emit commit predecessors in reverse topological order.
88pub fn walk_predecessors<'repo>(
89    repo: &'repo ReadonlyRepo,
90    start_commits: &[CommitId],
91) -> impl Iterator<Item = Result<CommitEvolutionEntry, WalkPredecessorsError>> + use<'repo> {
92    let op_ancestors = op_walk::walk_ancestors(slice::from_ref(repo.operation())).boxed();
93    WalkPredecessors {
94        repo,
95        op_ancestors,
96        to_visit: start_commits.to_vec(),
97        queued: VecDeque::new(),
98    }
99}
100
/// Iterator state backing [`walk_predecessors()`].
struct WalkPredecessors<'repo, I> {
    // Repository to load commits and index data from.
    repo: &'repo ReadonlyRepo,
    // Stream of ancestor operations of the repo's current operation.
    op_ancestors: I,
    // Commit ids whose originating operation hasn't been found yet.
    to_visit: Vec<CommitId>,
    // Entries resolved and ready to be emitted by `next()`.
    queued: VecDeque<CommitEvolutionEntry>,
}
107
impl<I> WalkPredecessors<'_, I>
where
    I: Stream<Item = OpStoreResult<Operation>> + Unpin,
{
    /// Produces the next entry, pulling operations from `op_ancestors` until
    /// the output queue is refilled. Returns `Ok(None)` when exhausted.
    fn try_next(&mut self) -> Result<Option<CommitEvolutionEntry>, WalkPredecessorsError> {
        while !self.to_visit.is_empty() && self.queued.is_empty() {
            // The async stream is polled synchronously here.
            let Some(op) = self.op_ancestors.next().block_on().transpose()? else {
                // Scanned all operations, no fallback needed.
                self.flush_commits()?;
                break;
            };
            if !op.stores_commit_predecessors() {
                // There may be concurrent ops, but let's simply switch to the
                // legacy commit traversal. Operation history should be mostly
                // linear.
                self.scan_commits()?;
                break;
            }
            self.visit_op(&op)?;
        }
        Ok(self.queued.pop_front())
    }

    /// Looks for predecessors within the given operation.
    fn visit_op(&mut self, op: &Operation) -> Result<(), WalkPredecessorsError> {
        let mut to_emit = Vec::new(); // transitive edges should be short
        let mut has_dup = false;
        let mut i = 0;
        while let Some(cur_id) = self.to_visit.get(i) {
            if let Some(next_ids) = op.predecessors_for_commit(cur_id) {
                if to_emit.contains(cur_id) {
                    // Already scheduled for emission: drop the duplicate.
                    // Index i is not advanced because removal shifted the
                    // next element into place.
                    self.to_visit.remove(i);
                    has_dup = true;
                    continue;
                }
                // Replace the current id in place with its predecessors; the
                // splice iterator yields the removed id, which is recorded in
                // to_emit. Index i intentionally stays put so the inserted
                // predecessors are themselves expanded (transitive edges
                // within the same op).
                to_emit.extend(self.to_visit.splice(i..=i, next_ids.iter().cloned()));
            } else {
                // Not rewritten by this op; keep for older operations.
                i += 1;
            }
        }

        let store = self.repo.store();
        let mut emit = |id: &CommitId| -> BackendResult<()> {
            let commit = store.get_commit(id)?;
            self.queued.push_back(CommitEvolutionEntry {
                commit,
                operation: Some(op.clone()),
                reachable_predecessors: None,
            });
            Ok(())
        };
        match &*to_emit {
            [] => {}
            // A single unique id needs no topological sorting.
            [id] if !has_dup => emit(id)?,
            _ => {
                // Multiple (or deduplicated) ids: sort them by this op's
                // predecessor edges so output stays reverse-topological.
                let sorted_ids = dag_walk::topo_order_reverse_ok(
                    to_emit.iter().map(Ok),
                    |&id| id,
                    |&id| op.predecessors_for_commit(id).into_iter().flatten().map(Ok),
                    |id| id, // Err(&CommitId) if graph has cycle
                )
                .map_err(|id| WalkPredecessorsError::CycleDetected(id.clone()))?;
                for &id in &sorted_ids {
                    // The sort may also visit predecessor ids that this op
                    // didn't rewrite; those are left in to_visit and skipped
                    // here.
                    if op.predecessors_for_commit(id).is_some() {
                        emit(id)?;
                    }
                }
            }
        }
        Ok(())
    }

    /// Traverses predecessors from remainder commits.
    ///
    /// Legacy fallback: predecessor ids are read from the commit objects
    /// themselves rather than from operation metadata, filtered to ids
    /// reachable in the index.
    fn scan_commits(&mut self) -> Result<(), WalkPredecessorsError> {
        let store = self.repo.store();
        let index = self.repo.index();
        // Memoizes the index-filtered predecessor ids per commit; entries are
        // consumed below when building the output.
        let mut commit_predecessors: HashMap<CommitId, Vec<CommitId>> = HashMap::new();
        let commits = dag_walk::topo_order_reverse_ok(
            self.to_visit.drain(..).map(|id| {
                store
                    .get_commit(&id)
                    .map_err(WalkPredecessorsError::Backend)
            }),
            |commit: &Commit| commit.id().clone(),
            |commit: &Commit| {
                let ids = match commit_predecessors.entry(commit.id().clone()) {
                    Entry::Occupied(entry) => entry.into_mut(),
                    Entry::Vacant(entry) => {
                        let mut filtered = vec![];
                        for id in &commit.store_commit().predecessors {
                            match index.has_id(id) {
                                Ok(true) => {
                                    filtered.push(id.clone());
                                }
                                Ok(false) => {
                                    // Ignore unreachable predecessors
                                }
                                Err(err) => {
                                    // Surfaced as Err through the dag walk's
                                    // result propagation.
                                    return vec![Err(WalkPredecessorsError::Index(err))];
                                }
                            }
                        }
                        entry.insert(filtered)
                    }
                };

                ids.iter()
                    .map(|id| store.get_commit(id).map_err(WalkPredecessorsError::Backend))
                    .collect_vec()
            },
            // A cycle here would mean broken commit metadata.
            |_| panic!("graph has cycle"),
        )?;
        self.queued.extend(commits.into_iter().map(|commit| {
            let predecessors = commit_predecessors
                .remove(commit.id())
                .expect("commit must be visited once");
            CommitEvolutionEntry {
                commit,
                operation: None,
                reachable_predecessors: Some(predecessors),
            }
        }));
        Ok(())
    }

    /// Moves remainder commits to output queue.
    fn flush_commits(&mut self) -> BackendResult<()> {
        self.queued.reserve(self.to_visit.len());
        for id in self.to_visit.drain(..) {
            let commit = self.repo.store().get_commit(&id)?;
            self.queued.push_back(CommitEvolutionEntry {
                commit,
                operation: None,
                // There were no legacy operations, so the commit should have no
                // predecessors.
                reachable_predecessors: Some(vec![]),
            });
        }
        Ok(())
    }
}
249
250impl<I> Iterator for WalkPredecessors<'_, I>
251where
252    I: Stream<Item = OpStoreResult<Operation>> + Unpin,
253{
254    type Item = Result<CommitEvolutionEntry, WalkPredecessorsError>;
255
256    fn next(&mut self) -> Option<Self::Item> {
257        self.try_next().transpose()
258    }
259}
260
/// Collects predecessor records from `new_ops` to `old_ops`, and resolves
/// transitive entries.
///
/// This function assumes that there exists a single greatest common ancestors
/// between `old_ops` and `new_ops`. If `old_ops` and `new_ops` have ancestors
/// and descendants each other, or if criss-crossed merges exist between these
/// operations, the returned mapping would be lossy.
pub async fn accumulate_predecessors(
    new_ops: &[Operation],
    old_ops: &[Operation],
) -> Result<BTreeMap<CommitId, Vec<CommitId>>, WalkPredecessorsError> {
    if new_ops.is_empty() || old_ops.is_empty() {
        return Ok(BTreeMap::new()); // No common ancestor exists
    }

    // Fast path for the single forward operation case.
    if let [op] = new_ops
        && op.parent_ids().iter().eq(old_ops.iter().map(|op| op.id()))
    {
        // No predecessor records in this op means nothing to resolve.
        let Some(map) = &op.store_operation().commit_predecessors else {
            return Ok(BTreeMap::new());
        };
        return resolve_transitive_edges(map, map.keys())
            .map_err(|id| WalkPredecessorsError::CycleDetected(id.clone()));
    }

    // Follow reverse edges from the common ancestor to old_ops. Here we use
    // BTreeMap to stabilize order of the reversed edges.
    let mut accumulated = BTreeMap::new();
    let reverse_ops = op_walk::walk_ancestors_range(old_ops, new_ops);
    // Bail out with an empty map if any op in the range lacks predecessor
    // records (legacy data).
    if !try_collect_predecessors_into(&mut accumulated, reverse_ops).await? {
        return Ok(BTreeMap::new());
    }
    let mut accumulated = reverse_edges(accumulated);
    // Follow forward edges from new_ops to the common ancestor.
    let forward_ops = op_walk::walk_ancestors_range(new_ops, old_ops);
    if !try_collect_predecessors_into(&mut accumulated, forward_ops).await? {
        return Ok(BTreeMap::new());
    }
    // Resolve transitive chains starting from commits recorded in new_ops.
    let new_commit_ids = new_ops
        .iter()
        .filter_map(|op| op.store_operation().commit_predecessors.as_ref())
        .flat_map(|map| map.keys());
    resolve_transitive_edges(&accumulated, new_commit_ids)
        .map_err(|id| WalkPredecessorsError::CycleDetected(id.clone()))
}
307
308async fn try_collect_predecessors_into(
309    collected: &mut BTreeMap<CommitId, Vec<CommitId>>,
310    ops: impl Stream<Item = OpStoreResult<Operation>>,
311) -> OpStoreResult<bool> {
312    let mut ops = pin!(ops);
313    while let Some(op) = ops.try_next().await? {
314        let Some(map) = &op.store_operation().commit_predecessors else {
315            return Ok(false);
316        };
317        // Just insert. There should be no duplicate entries.
318        collected.extend(map.iter().map(|(k, v)| (k.clone(), v.clone())));
319    }
320    Ok(true)
321}
322
/// Resolves transitive edges in `graph` starting from the `start` nodes,
/// returns new DAG. The returned DAG only includes edges reachable from the
/// `start` nodes.
///
/// Returns `Err(id)` naming a node on a cycle if `graph` is cyclic.
fn resolve_transitive_edges<'a: 'b, 'b>(
    graph: &'a BTreeMap<CommitId, Vec<CommitId>>,
    start: impl IntoIterator<Item = &'b CommitId>,
) -> Result<BTreeMap<CommitId, Vec<CommitId>>, &'b CommitId> {
    let mut new_graph: BTreeMap<CommitId, Vec<CommitId>> = BTreeMap::new();
    // Order nodes so neighbors are processed before the nodes pointing at
    // them; by the time a node is visited, its neighbors' resolved edges are
    // already in new_graph.
    let sorted_ids = dag_walk::topo_order_forward_ok(
        start.into_iter().map(Ok),
        |&id| id,
        |&id| graph.get(id).into_iter().flatten().map(Ok),
        |id| id, // Err(&CommitId) if graph has cycle
    )?;
    for cur_id in sorted_ids {
        let Some(neighbors) = graph.get(cur_id) else {
            continue; // terminal node: no outgoing edges to resolve
        };
        // A neighbor without a resolved entry maps to itself (it wasn't
        // rewritten any further).
        let lookup = |id| new_graph.get(id).map_or(slice::from_ref(id), Vec::as_slice);
        let new_neighbors = match &neighbors[..] {
            [id] => lookup(id).to_vec(), // unique() not needed
            ids => ids.iter().flat_map(lookup).unique().cloned().collect(),
        };
        new_graph.insert(cur_id.clone(), new_neighbors);
    }
    Ok(new_graph)
}
350
351fn reverse_edges(graph: BTreeMap<CommitId, Vec<CommitId>>) -> BTreeMap<CommitId, Vec<CommitId>> {
352    let mut new_graph: BTreeMap<CommitId, Vec<CommitId>> = BTreeMap::new();
353    for (node1, neighbors) in graph {
354        for node2 in neighbors {
355            new_graph.entry(node2).or_default().push(node1.clone());
356        }
357    }
358    new_graph
359}