// jj_lib/op_walk.rs

1// Copyright 2020-2023 The Jujutsu Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Utility for operation id resolution and traversal.
16
17use std::cmp::Ordering;
18use std::collections::HashMap;
19use std::collections::HashSet;
20use std::slice;
21use std::sync::Arc;
22
23use futures::Stream;
24use futures::StreamExt as _;
25use futures::TryStreamExt as _;
26use futures::future::ready;
27use futures::future::try_join_all;
28use futures::stream;
29use itertools::Itertools as _;
30use pollster::FutureExt as _;
31use thiserror::Error;
32
33use crate::dag_walk;
34use crate::object_id::HexPrefix;
35use crate::object_id::PrefixResolution;
36use crate::op_heads_store;
37use crate::op_heads_store::OpHeadResolutionError;
38use crate::op_heads_store::OpHeadsStore;
39use crate::op_heads_store::OpHeadsStoreError;
40use crate::op_store::OpStore;
41use crate::op_store::OpStoreError;
42use crate::op_store::OpStoreResult;
43use crate::op_store::OperationId;
44use crate::operation::Operation;
45use crate::repo::ReadonlyRepo;
46use crate::repo::Repo as _;
47use crate::repo::RepoLoader;
48
/// Error that may occur during evaluation of operation set expression.
///
/// Every variant wraps a lower-level error via `#[from]`, so `?` converts
/// them implicitly; `#[error(transparent)]` forwards the inner message.
#[derive(Debug, Error)]
pub enum OpsetEvaluationError {
    /// Failed to resolve operation set expression.
    #[error(transparent)]
    OpsetResolution(#[from] OpsetResolutionError),
    /// Failed to read op heads.
    #[error(transparent)]
    OpHeadsStore(#[from] OpHeadsStoreError),
    /// Failed to resolve the current operation heads.
    #[error(transparent)]
    OpHeadResolution(#[from] OpHeadResolutionError),
    /// Failed to access operation object.
    #[error(transparent)]
    OpStore(#[from] OpStoreError),
}
65
/// Error that may occur during parsing and resolution of operation set
/// expression.
#[derive(Debug, Error)]
pub enum OpsetResolutionError {
    // TODO: Maybe empty/multiple operations should be allowed, and rejected by
    // caller as needed.
    /// Expression resolved to multiple operations.
    #[error(r#"The "{expr}" expression resolved to more than one operation"#)]
    MultipleOperations {
        /// Source expression.
        expr: String,
        /// Matched operation ids.
        candidates: Vec<OperationId>,
    },
    /// Expression resolved to no operations.
    #[error(r#"The "{0}" expression resolved to no operations"#)]
    EmptyOperations(String),
    /// Invalid symbol as an operation ID (not a hexadecimal string).
    #[error(r#"Operation ID "{0}" is not a valid hexadecimal prefix"#)]
    InvalidIdPrefix(String),
    /// Operation ID not found in the op store.
    #[error(r#"No operation ID matching "{0}""#)]
    NoSuchOperation(String),
    /// Operation ID prefix matches multiple operations.
    #[error(r#"Operation ID prefix "{0}" is ambiguous"#)]
    AmbiguousIdPrefix(String),
}
93
94/// Resolves operation set expression without loading a repo.
95pub async fn resolve_op_for_load(
96    repo_loader: &RepoLoader,
97    op_str: &str,
98) -> Result<Operation, OpsetEvaluationError> {
99    let op_store = repo_loader.op_store();
100    let op_heads_store = repo_loader.op_heads_store().as_ref();
101    let get_current_op = async || {
102        op_heads_store::resolve_op_heads(op_heads_store, op_store, async |op_heads| {
103            Err(OpsetResolutionError::MultipleOperations {
104                expr: "@".to_owned(),
105                candidates: op_heads.iter().map(|op| op.id().clone()).collect(),
106            }
107            .into())
108        })
109        .await
110    };
111    let get_head_ops = async || get_current_head_ops(op_store, op_heads_store).await;
112    resolve_single_op(op_store, get_current_op, get_head_ops, op_str).await
113}
114
115/// Resolves operation set expression against the loaded repo.
116///
117/// The "@" symbol will be resolved to the operation the repo was loaded at.
118pub async fn resolve_op_with_repo(
119    repo: &ReadonlyRepo,
120    op_str: &str,
121) -> Result<Operation, OpsetEvaluationError> {
122    resolve_op_at(repo.op_store(), slice::from_ref(repo.operation()), op_str).await
123}
124
125/// Resolves operation set expression at the given head operations.
126pub async fn resolve_op_at(
127    op_store: &Arc<dyn OpStore>,
128    head_ops: &[Operation],
129    op_str: &str,
130) -> Result<Operation, OpsetEvaluationError> {
131    let get_current_op = async || match head_ops {
132        [head_op] => Ok(head_op.clone()),
133        [] => Err(OpsetResolutionError::EmptyOperations("@".to_owned()).into()),
134        _ => Err(OpsetResolutionError::MultipleOperations {
135            expr: "@".to_owned(),
136            candidates: head_ops.iter().map(|op| op.id().clone()).collect(),
137        }
138        .into()),
139    };
140    let get_head_ops = async || Ok(head_ops.to_vec());
141    resolve_single_op(op_store, get_current_op, get_head_ops, op_str).await
142}
143
144/// Resolves operation set expression with the given "@" symbol resolution
145/// callbacks.
146async fn resolve_single_op(
147    op_store: &Arc<dyn OpStore>,
148    get_current_op: impl AsyncFnOnce() -> Result<Operation, OpsetEvaluationError>,
149    get_head_ops: impl AsyncFnOnce() -> Result<Vec<Operation>, OpsetEvaluationError>,
150    op_str: &str,
151) -> Result<Operation, OpsetEvaluationError> {
152    let op_symbol = op_str.trim_end_matches(['-', '+']);
153    let op_postfix = &op_str[op_symbol.len()..];
154    let head_ops = if op_postfix.contains('+') {
155        Some(get_head_ops().await?)
156    } else {
157        None
158    };
159    let mut operation = match op_symbol {
160        "@" => get_current_op().await,
161        s => resolve_single_op_from_store(op_store, s).await,
162    }?;
163    for (i, c) in op_postfix.chars().enumerate() {
164        let mut neighbor_ops = match c {
165            '-' => operation.parents().await?,
166            '+' => find_child_ops(head_ops.as_ref().unwrap(), operation.id()).await?,
167            _ => unreachable!(),
168        };
169        operation = match neighbor_ops.len() {
170            // Since there is no hint provided for `EmptyOperations` in
171            // `opset_resolution_error_hint()` (there would be no useful hint for the
172            // user to take action on anyway), we don't have to worry about op ids being
173            // incoherent with the op set expression shown to the user, unlike for the
174            // `MultipleOperations` variant.
175            //
176            // The full op set expression is guaranteed to be empty in this case,
177            // because ancestors/descendants of an empty operation are empty.
178            0 => Err(OpsetResolutionError::EmptyOperations(op_str.to_owned()))?,
179            1 => neighbor_ops.pop().unwrap(),
180            // Returns the exact subexpression that resolves to multiple operations,
181            // rather than the full expression provided by the user.
182            _ => Err(OpsetResolutionError::MultipleOperations {
183                expr: op_str[..=op_symbol.len() + i].to_owned(),
184                candidates: neighbor_ops.iter().map(|op| op.id().clone()).collect(),
185            })?,
186        };
187    }
188    Ok(operation)
189}
190
191async fn resolve_single_op_from_store(
192    op_store: &Arc<dyn OpStore>,
193    op_str: &str,
194) -> Result<Operation, OpsetEvaluationError> {
195    if op_str.is_empty() {
196        return Err(OpsetResolutionError::InvalidIdPrefix(op_str.to_owned()).into());
197    }
198    let prefix = HexPrefix::try_from_hex(op_str)
199        .ok_or_else(|| OpsetResolutionError::InvalidIdPrefix(op_str.to_owned()))?;
200    match op_store.resolve_operation_id_prefix(&prefix).await? {
201        PrefixResolution::NoMatch => {
202            Err(OpsetResolutionError::NoSuchOperation(op_str.to_owned()).into())
203        }
204        PrefixResolution::SingleMatch(op_id) => {
205            let data = op_store.read_operation(&op_id).await?;
206            Ok(Operation::new(op_store.clone(), op_id, data))
207        }
208        PrefixResolution::AmbiguousMatch => {
209            Err(OpsetResolutionError::AmbiguousIdPrefix(op_str.to_owned()).into())
210        }
211    }
212}
213
214/// Loads the current head operations. The returned operations may contain
215/// redundant ones which are ancestors of the other heads.
216pub async fn get_current_head_ops(
217    op_store: &Arc<dyn OpStore>,
218    op_heads_store: &dyn OpHeadsStore,
219) -> Result<Vec<Operation>, OpsetEvaluationError> {
220    let head_ops_futures = op_heads_store.get_op_heads().await?.into_iter().map(
221        async |id| -> OpStoreResult<Operation> {
222            let data = op_store.read_operation(&id).await?;
223            Ok(Operation::new(op_store.clone(), id, data))
224        },
225    );
226    let mut head_ops = try_join_all(head_ops_futures).await?;
227    // To stabilize output, sort in the same order as resolve_op_heads()
228    head_ops.sort_by_key(|op| op.metadata().time.end.timestamp);
229    Ok(head_ops)
230}
231
232/// Looks up children of the `root_op_id` by traversing from the `head_ops`.
233///
234/// This will be slow if the `root_op_id` is far away (or unreachable) from the
235/// `head_ops`.
236async fn find_child_ops(
237    head_ops: &[Operation],
238    root_op_id: &OperationId,
239) -> OpStoreResult<Vec<Operation>> {
240    walk_ancestors(head_ops)
241        .take_while(|res| ready(res.as_ref().map_or(true, |op| op.id() != root_op_id)))
242        .try_filter(|op| ready(op.parent_ids().iter().any(|id| id == root_op_id)))
243        .try_collect()
244        .await
245}
246
247#[derive(Clone, Debug, Eq, Hash, PartialEq)]
248struct OperationByEndTime(Operation);
249
250impl Ord for OperationByEndTime {
251    fn cmp(&self, other: &Self) -> Ordering {
252        let self_end_time = &self.0.metadata().time.end;
253        let other_end_time = &other.0.metadata().time.end;
254        self_end_time
255            .cmp(other_end_time)
256            .then_with(|| self.0.cmp(&other.0)) // to comply with Eq
257    }
258}
259
260impl PartialOrd for OperationByEndTime {
261    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
262        Some(self.cmp(other))
263    }
264}
265
266/// Walks `head_ops` and their ancestors in reverse topological order.
267pub fn walk_ancestors(
268    head_ops: &[Operation],
269) -> impl Stream<Item = OpStoreResult<Operation>> + use<> {
270    let head_ops = head_ops
271        .iter()
272        .cloned()
273        .map(OperationByEndTime)
274        .collect_vec();
275    // Lazily load operations based on timestamp-based heuristic. This works so long
276    // as the operation history is mostly linear.
277    stream::iter(dag_walk::topo_order_reverse_lazy_ok(
278        head_ops.into_iter().map(Ok),
279        |OperationByEndTime(op)| op.id().clone(),
280        |OperationByEndTime(op)| match op.parents().block_on() {
281            Ok(parents) => parents
282                .into_iter()
283                .map(|parent| Ok(OperationByEndTime(parent)))
284                .collect_vec(),
285            Err(err) => vec![Err(err)],
286        },
287        |_| panic!("graph has cycle"),
288    ))
289    .map_ok(|OperationByEndTime(op)| op)
290}
291
292/// Walks ancestors from `head_ops` in reverse topological order, excluding
293/// ancestors of `root_ops`.
294pub fn walk_ancestors_range(
295    head_ops: &[Operation],
296    root_ops: &[Operation],
297) -> impl Stream<Item = OpStoreResult<Operation>> + use<> {
298    let mut start_ops = itertools::chain(head_ops, root_ops)
299        .cloned()
300        .map(OperationByEndTime)
301        .collect_vec();
302
303    // Consume items until root_ops to get rid of unwanted ops.
304    let leading_items = if root_ops.is_empty() {
305        vec![]
306    } else {
307        let unwanted_ids = root_ops.iter().map(|op| op.id().clone()).collect();
308        collect_ancestors_until_roots(&mut start_ops, unwanted_ids)
309    };
310
311    // Lazily load operations based on timestamp-based heuristic. This works so long
312    // as the operation history is mostly linear.
313    let trailing_iter = dag_walk::topo_order_reverse_lazy_ok(
314        start_ops.into_iter().map(Ok),
315        |OperationByEndTime(op)| op.id().clone(),
316        |OperationByEndTime(op)| match op.parents().block_on() {
317            Ok(parents) => parents
318                .into_iter()
319                .map(|op| Ok(OperationByEndTime(op)))
320                .collect_vec(),
321            Err(err) => vec![Err(err)],
322        },
323        |_| panic!("graph has cycle"),
324    )
325    .map_ok(|OperationByEndTime(op)| op);
326    stream::iter(leading_items).chain(stream::iter(trailing_iter))
327}
328
329fn collect_ancestors_until_roots(
330    start_ops: &mut Vec<OperationByEndTime>,
331    mut unwanted_ids: HashSet<OperationId>,
332) -> Vec<OpStoreResult<Operation>> {
333    let sorted_ops = match dag_walk::topo_order_reverse_chunked(
334        start_ops,
335        |OperationByEndTime(op)| op.id().clone(),
336        |OperationByEndTime(op)| match op.parents().block_on() {
337            Ok(parents) => parents
338                .into_iter()
339                .map(|op| Ok(OperationByEndTime(op)))
340                .collect_vec(),
341            Err(err) => vec![Err(err)],
342        },
343        |_| panic!("graph has cycle"),
344    ) {
345        Ok(sorted_ops) => sorted_ops,
346        Err(err) => return vec![Err(err)],
347    };
348    let mut items = Vec::new();
349    for OperationByEndTime(op) in sorted_ops {
350        if unwanted_ids.contains(op.id()) {
351            unwanted_ids.extend(op.parent_ids().iter().cloned());
352        } else {
353            items.push(Ok(op));
354        }
355    }
356    // Don't visit ancestors of unwanted ops further.
357    start_ops.retain(|OperationByEndTime(op)| !unwanted_ids.contains(op.id()));
358    items
359}
360
/// Stats about `reparent_range()`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ReparentStats {
    /// New head operation ids in order of the old `head_ops`. Contains the
    /// destination op id if the source range was empty.
    pub new_head_ids: Vec<OperationId>,
    /// The number of rewritten operations.
    pub rewritten_count: usize,
    /// The number of ancestor operations that become unreachable from the
    /// rewritten heads.
    pub unreachable_count: usize,
}
372
373/// Reparents the operation range `root_ops..head_ops` onto the `dest_op`.
374///
375/// Returns the new head operation ids as well as some stats. If the old
376/// operation heads are remapped to the new heads, the operations within the
377/// range `dest_op..root_ops` become unreachable.
378///
379/// If the source operation range `root_ops..head_ops` was empty, the
380/// `new_head_ids` will be `[dest_op.id()]`, meaning the `dest_op` is the head.
381// TODO: Find better place to host this function. It might be an OpStore method.
382pub async fn reparent_range(
383    op_store: &dyn OpStore,
384    root_ops: &[Operation],
385    head_ops: &[Operation],
386    dest_op: &Operation,
387) -> OpStoreResult<ReparentStats> {
388    let ops_to_reparent: Vec<_> = walk_ancestors_range(head_ops, root_ops)
389        .try_collect()
390        .await?;
391    let unreachable_count = walk_ancestors_range(root_ops, slice::from_ref(dest_op))
392        .try_fold(0, |acc, _| async move { Ok(acc + 1) })
393        .await?;
394
395    assert!(
396        ops_to_reparent
397            .last()
398            .is_none_or(|op| op.id() != op_store.root_operation_id()),
399        "root operation cannot be rewritten"
400    );
401    let mut rewritten_ids = HashMap::new();
402    for old_op in ops_to_reparent.into_iter().rev() {
403        let mut data = old_op.store_operation().clone();
404        let mut dest_once = Some(dest_op.id());
405        data.parents = data
406            .parents
407            .iter()
408            .filter_map(|id| rewritten_ids.get(id).or_else(|| dest_once.take()))
409            .cloned()
410            .collect();
411        let new_id = op_store.write_operation(&data).await?;
412        rewritten_ids.insert(old_op.id().clone(), new_id);
413    }
414
415    let mut dest_once = Some(dest_op.id());
416    let new_head_ids = head_ops
417        .iter()
418        .filter_map(|op| rewritten_ids.get(op.id()).or_else(|| dest_once.take()))
419        .cloned()
420        .collect();
421    Ok(ReparentStats {
422        new_head_ids,
423        rewritten_count: rewritten_ids.len(),
424        unreachable_count,
425    })
426}