Skip to main content

jj_lib/
evolution.rs

1// Copyright 2025 The Jujutsu Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Utility for commit evolution history.
16
17use std::collections::BTreeMap;
18use std::collections::VecDeque;
19use std::pin::pin;
20use std::slice;
21use std::sync::Arc;
22
23use futures::Stream;
24use futures::StreamExt as _;
25use futures::TryStreamExt as _;
26use futures::future::try_join_all;
27use futures::stream;
28use itertools::Itertools as _;
29use thiserror::Error;
30
31use crate::backend::BackendError;
32use crate::backend::BackendResult;
33use crate::backend::CommitId;
34use crate::commit::Commit;
35use crate::dag_walk;
36use crate::op_store::OpStoreError;
37use crate::op_store::OpStoreResult;
38use crate::op_walk;
39use crate::operation::Operation;
40use crate::repo::ReadonlyRepo;
41use crate::repo::Repo as _;
42use crate::store::Store;
43
/// Commit with predecessor information.
///
/// Items of the stream returned by [`walk_predecessors()`] have this type.
#[derive(Clone, Debug, serde::Serialize)]
pub struct CommitEvolutionEntry {
    /// Commit id and metadata.
    pub commit: Commit,
    /// Operation where the commit was created or rewritten.
    ///
    /// `None` when no scanned operation recorded this commit's predecessors;
    /// such an entry reports no predecessors.
    pub operation: Option<Operation>,
}
52
53impl CommitEvolutionEntry {
54    /// Predecessor ids of this commit.
55    pub fn predecessor_ids(&self) -> &[CommitId] {
56        match &self.operation {
57            Some(op) => op.predecessors_for_commit(self.commit.id()).unwrap(),
58            None => &[],
59        }
60    }
61
62    /// Predecessor commit objects of this commit.
63    pub async fn predecessors(&self) -> BackendResult<Vec<Commit>> {
64        let store = self.commit.store();
65        try_join_all(
66            self.predecessor_ids()
67                .iter()
68                .map(|id| store.get_commit_async(id)),
69        )
70        .await
71    }
72}
73
74#[expect(missing_docs)]
75#[derive(Debug, Error)]
76pub enum WalkPredecessorsError {
77    #[error(transparent)]
78    Backend(#[from] BackendError),
79    #[error(transparent)]
80    OpStore(#[from] OpStoreError),
81    #[error("Predecessors cycle detected around commit {0}")]
82    CycleDetected(CommitId),
83}
84
85/// Walks operations to emit commit predecessors in reverse topological order.
86pub fn walk_predecessors(
87    repo: &ReadonlyRepo,
88    start_commits: &[CommitId],
89) -> impl Stream<Item = Result<CommitEvolutionEntry, WalkPredecessorsError>> + use<> {
90    let op_ancestors = op_walk::walk_ancestors(slice::from_ref(repo.operation())).boxed_local();
91    let state = WalkPredecessors {
92        store: repo.store().clone(),
93        op_ancestors,
94        to_visit: start_commits.to_vec(),
95        queued: VecDeque::new(),
96    };
97    stream::unfold(state, |mut state| async move {
98        let result = state.try_next_impl().await.transpose()?;
99        Some((result, state))
100    })
101}
102
/// State of the stream returned by [`walk_predecessors()`].
struct WalkPredecessors<I> {
    /// Store used to load the commit objects of emitted entries.
    store: Arc<Store>,
    /// Operations still to be scanned, in the order yielded by
    /// `op_walk::walk_ancestors`.
    op_ancestors: I,
    /// Commits whose creating/rewriting operation has not been found yet.
    to_visit: Vec<CommitId>,
    /// Entries ready to be handed out by the stream, front first.
    queued: VecDeque<CommitEvolutionEntry>,
}
109
110impl<I> WalkPredecessors<I>
111where
112    I: Stream<Item = OpStoreResult<Operation>> + Unpin,
113{
114    async fn try_next_impl(
115        &mut self,
116    ) -> Result<Option<CommitEvolutionEntry>, WalkPredecessorsError> {
117        while !self.to_visit.is_empty() && self.queued.is_empty() {
118            let Some(op) = self.op_ancestors.try_next().await? else {
119                self.flush_commits().await?;
120                break;
121            };
122            if !op.stores_commit_predecessors() {
123                // There may be concurrent ops, but let's ignore the rest.
124                // Operation history should be mostly linear.
125                self.flush_commits().await?;
126                break;
127            }
128            self.visit_op(&op).await?;
129        }
130        Ok(self.queued.pop_front())
131    }
132
133    /// Looks for predecessors within the given operation.
134    async fn visit_op(&mut self, op: &Operation) -> Result<(), WalkPredecessorsError> {
135        let mut to_emit = Vec::new(); // transitive edges should be short
136        let mut has_dup = false;
137        let mut i = 0;
138        while let Some(cur_id) = self.to_visit.get(i) {
139            if let Some(next_ids) = op.predecessors_for_commit(cur_id) {
140                if to_emit.contains(cur_id) {
141                    self.to_visit.remove(i);
142                    has_dup = true;
143                    continue;
144                }
145                to_emit.extend(self.to_visit.splice(i..=i, next_ids.iter().cloned()));
146            } else {
147                i += 1;
148            }
149        }
150
151        // TODO: We no longer need Commit objects. Should we move
152        // get_commit_async(id) to callers?
153        let mut emit = async |id: &CommitId| -> BackendResult<()> {
154            let commit = self.store.get_commit_async(id).await?;
155            self.queued.push_back(CommitEvolutionEntry {
156                commit,
157                operation: Some(op.clone()),
158            });
159            Ok(())
160        };
161        match &*to_emit {
162            [] => {}
163            [id] if !has_dup => emit(id).await?,
164            _ => {
165                let sorted_ids = dag_walk::topo_order_reverse_ok(
166                    to_emit.iter().map(Ok),
167                    |&id| id,
168                    |&id| op.predecessors_for_commit(id).into_iter().flatten().map(Ok),
169                    |id| id, // Err(&CommitId) if graph has cycle
170                )
171                .map_err(|id| WalkPredecessorsError::CycleDetected(id.clone()))?;
172                for &id in &sorted_ids {
173                    if op.predecessors_for_commit(id).is_some() {
174                        emit(id).await?;
175                    }
176                }
177            }
178        }
179        Ok(())
180    }
181
182    /// Moves remainder commits to output queue.
183    async fn flush_commits(&mut self) -> BackendResult<()> {
184        self.queued.reserve(self.to_visit.len());
185        for id in self.to_visit.drain(..) {
186            let commit = self.store.get_commit_async(&id).await?;
187            self.queued.push_back(CommitEvolutionEntry {
188                commit,
189                operation: None,
190            });
191        }
192        Ok(())
193    }
194}
195
196/// Collects predecessor records from `new_ops` to `old_ops`, and resolves
197/// transitive entries.
198///
199/// This function assumes that there exists a single greatest common ancestors
200/// between `old_ops` and `new_ops`. If `old_ops` and `new_ops` have ancestors
201/// and descendants each other, or if criss-crossed merges exist between these
202/// operations, the returned mapping would be lossy.
203pub async fn accumulate_predecessors(
204    new_ops: &[Operation],
205    old_ops: &[Operation],
206) -> Result<BTreeMap<CommitId, Vec<CommitId>>, WalkPredecessorsError> {
207    if new_ops.is_empty() || old_ops.is_empty() {
208        return Ok(BTreeMap::new()); // No common ancestor exists
209    }
210
211    // Fast path for the single forward operation case.
212    if let [op] = new_ops
213        && op.parent_ids().iter().eq(old_ops.iter().map(|op| op.id()))
214    {
215        let Some(map) = &op.store_operation().commit_predecessors else {
216            return Ok(BTreeMap::new());
217        };
218        return resolve_transitive_edges(map, map.keys())
219            .await
220            .map_err(|id| WalkPredecessorsError::CycleDetected(id.clone()));
221    }
222
223    // Follow reverse edges from the common ancestor to old_ops. Here we use
224    // BTreeMap to stabilize order of the reversed edges.
225    let mut accumulated = BTreeMap::new();
226    let reverse_ops = op_walk::walk_ancestors_range(old_ops, new_ops);
227    if !try_collect_predecessors_into(&mut accumulated, reverse_ops).await? {
228        return Ok(BTreeMap::new());
229    }
230    let mut accumulated = reverse_edges(accumulated);
231    // Follow forward edges from new_ops to the common ancestor.
232    let forward_ops = op_walk::walk_ancestors_range(new_ops, old_ops);
233    if !try_collect_predecessors_into(&mut accumulated, forward_ops).await? {
234        return Ok(BTreeMap::new());
235    }
236    let new_commit_ids = new_ops
237        .iter()
238        .filter_map(|op| op.store_operation().commit_predecessors.as_ref())
239        .flat_map(|map| map.keys());
240    resolve_transitive_edges(&accumulated, new_commit_ids)
241        .await
242        .map_err(|id| WalkPredecessorsError::CycleDetected(id.clone()))
243}
244
245async fn try_collect_predecessors_into(
246    collected: &mut BTreeMap<CommitId, Vec<CommitId>>,
247    ops: impl Stream<Item = OpStoreResult<Operation>>,
248) -> OpStoreResult<bool> {
249    let mut ops = pin!(ops);
250    while let Some(op) = ops.try_next().await? {
251        let Some(map) = &op.store_operation().commit_predecessors else {
252            return Ok(false);
253        };
254        // Just insert. There should be no duplicate entries.
255        collected.extend(map.iter().map(|(k, v)| (k.clone(), v.clone())));
256    }
257    Ok(true)
258}
259
260/// Resolves transitive edges in `graph` starting from the `start` nodes,
261/// returns new DAG. The returned DAG only includes edges reachable from the
262/// `start` nodes.
263async fn resolve_transitive_edges<'a: 'b, 'b>(
264    graph: &'a BTreeMap<CommitId, Vec<CommitId>>,
265    start: impl IntoIterator<Item = &'b CommitId>,
266) -> Result<BTreeMap<CommitId, Vec<CommitId>>, &'b CommitId> {
267    let mut new_graph: BTreeMap<CommitId, Vec<CommitId>> = BTreeMap::new();
268    let sorted_ids = dag_walk::topo_order_forward_ok(
269        start.into_iter().map(Ok),
270        |&id| id,
271        |&id| graph.get(id).into_iter().flatten().map(Ok),
272        |id| id, // Err(&CommitId) if graph has cycle
273    )?;
274    for cur_id in sorted_ids {
275        let Some(neighbors) = graph.get(cur_id) else {
276            continue;
277        };
278        let lookup = |id| new_graph.get(id).map_or(slice::from_ref(id), Vec::as_slice);
279        let new_neighbors = match &neighbors[..] {
280            [id] => lookup(id).to_vec(), // unique() not needed
281            ids => ids.iter().flat_map(lookup).unique().cloned().collect(),
282        };
283        new_graph.insert(cur_id.clone(), new_neighbors);
284    }
285    Ok(new_graph)
286}
287
288fn reverse_edges(graph: BTreeMap<CommitId, Vec<CommitId>>) -> BTreeMap<CommitId, Vec<CommitId>> {
289    let mut new_graph: BTreeMap<CommitId, Vec<CommitId>> = BTreeMap::new();
290    for (node1, neighbors) in graph {
291        for node2 in neighbors {
292            new_graph.entry(node2).or_default().push(node1.clone());
293        }
294    }
295    new_graph
296}