Skip to main content

jj_lib/
op_walk.rs

// Copyright 2020-2023 The Jujutsu Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15//! Utility for operation id resolution and traversal.
16
17use std::cmp::Ordering;
18use std::collections::HashMap;
19use std::collections::HashSet;
20use std::slice;
21use std::sync::Arc;
22
23use futures::Stream;
24use futures::StreamExt as _;
25use futures::TryStreamExt as _;
26use futures::future::ready;
27use futures::future::try_join_all;
28use futures::stream;
29use itertools::Itertools as _;
30use pollster::FutureExt as _;
31use thiserror::Error;
32
33use crate::dag_walk_async;
34use crate::object_id::HexPrefix;
35use crate::object_id::PrefixResolution;
36use crate::op_heads_store;
37use crate::op_heads_store::OpHeadsStore;
38use crate::op_heads_store::OpHeadsStoreError;
39use crate::op_store::OpStore;
40use crate::op_store::OpStoreError;
41use crate::op_store::OpStoreResult;
42use crate::op_store::OperationId;
43use crate::operation::Operation;
44use crate::repo::ReadonlyRepo;
45use crate::repo::Repo as _;
46use crate::repo::RepoLoader;
47
48/// Error that may occur during evaluation of operation set expression.
49#[derive(Debug, Error)]
50pub enum OpsetEvaluationError {
51    /// Failed to resolve operation set expression.
52    #[error(transparent)]
53    OpsetResolution(#[from] OpsetResolutionError),
54    /// Failed to read op heads.
55    #[error(transparent)]
56    OpHeadsStore(#[from] OpHeadsStoreError),
57    /// Failed to access operation object.
58    #[error(transparent)]
59    OpStore(#[from] OpStoreError),
60}
61
62/// Error that may occur during parsing and resolution of operation set
63/// expression.
64#[derive(Debug, Error)]
65pub enum OpsetResolutionError {
66    // TODO: Maybe empty/multiple operations should be allowed, and rejected by
67    // caller as needed.
68    /// Expression resolved to multiple operations.
69    #[error(r#"The "{expr}" expression resolved to more than one operation"#)]
70    MultipleOperations {
71        /// Source expression.
72        expr: String,
73        /// Matched operation ids.
74        candidates: Vec<OperationId>,
75    },
76    /// Expression resolved to no operations.
77    #[error(r#"The "{0}" expression resolved to no operations"#)]
78    EmptyOperations(String),
79    /// Invalid symbol as an operation ID.
80    #[error(r#"Operation ID "{0}" is not a valid hexadecimal prefix"#)]
81    InvalidIdPrefix(String),
82    /// Operation ID not found.
83    #[error(r#"No operation ID matching "{0}""#)]
84    NoSuchOperation(String),
85    /// Operation ID prefix matches multiple operations.
86    #[error(r#"Operation ID prefix "{0}" is ambiguous"#)]
87    AmbiguousIdPrefix(String),
88}
89
90/// Resolves operation set expression without loading a repo.
91pub async fn resolve_op_for_load(
92    repo_loader: &RepoLoader,
93    op_str: &str,
94) -> Result<Operation, OpsetEvaluationError> {
95    let op_store = repo_loader.op_store();
96    let op_heads_store = repo_loader.op_heads_store().as_ref();
97    let get_current_op = async || {
98        op_heads_store::resolve_op_heads(op_heads_store, op_store, async |op_heads| {
99            Err(OpsetResolutionError::MultipleOperations {
100                expr: "@".to_owned(),
101                candidates: op_heads.iter().map(|op| op.id().clone()).collect(),
102            }
103            .into())
104        })
105        .await
106    };
107    let get_head_ops = async || get_current_head_ops(op_store, op_heads_store).await;
108    resolve_single_op(op_store, get_current_op, get_head_ops, op_str).await
109}
110
111/// Resolves operation set expression against the loaded repo.
112///
113/// The "@" symbol will be resolved to the operation the repo was loaded at.
114pub async fn resolve_op_with_repo(
115    repo: &ReadonlyRepo,
116    op_str: &str,
117) -> Result<Operation, OpsetEvaluationError> {
118    resolve_op_at(repo.op_store(), slice::from_ref(repo.operation()), op_str).await
119}
120
121/// Resolves operation set expression at the given head operations.
122pub async fn resolve_op_at(
123    op_store: &Arc<dyn OpStore>,
124    head_ops: &[Operation],
125    op_str: &str,
126) -> Result<Operation, OpsetEvaluationError> {
127    let get_current_op = async || match head_ops {
128        [head_op] => Ok(head_op.clone()),
129        [] => Err(OpsetResolutionError::EmptyOperations("@".to_owned()).into()),
130        _ => Err(OpsetResolutionError::MultipleOperations {
131            expr: "@".to_owned(),
132            candidates: head_ops.iter().map(|op| op.id().clone()).collect(),
133        }
134        .into()),
135    };
136    let get_head_ops = async || Ok(head_ops.to_vec());
137    resolve_single_op(op_store, get_current_op, get_head_ops, op_str).await
138}
139
140/// Resolves operation set expression with the given "@" symbol resolution
141/// callbacks.
142async fn resolve_single_op(
143    op_store: &Arc<dyn OpStore>,
144    get_current_op: impl AsyncFnOnce() -> Result<Operation, OpsetEvaluationError>,
145    get_head_ops: impl AsyncFnOnce() -> Result<Vec<Operation>, OpsetEvaluationError>,
146    op_str: &str,
147) -> Result<Operation, OpsetEvaluationError> {
148    let op_symbol = op_str.trim_end_matches(['-', '+']);
149    let op_postfix = &op_str[op_symbol.len()..];
150    let head_ops = if op_postfix.contains('+') {
151        Some(get_head_ops().await?)
152    } else {
153        None
154    };
155    let mut operation = match op_symbol {
156        "@" => get_current_op().await,
157        s => resolve_single_op_from_store(op_store, s).await,
158    }?;
159    for (i, c) in op_postfix.chars().enumerate() {
160        let mut neighbor_ops = match c {
161            '-' => operation.parents().await?,
162            '+' => find_child_ops(head_ops.as_ref().unwrap(), operation.id()).await?,
163            _ => unreachable!(),
164        };
165        operation = match neighbor_ops.len() {
166            // Since there is no hint provided for `EmptyOperations` in
167            // `opset_resolution_error_hint()` (there would be no useful hint for the
168            // user to take action on anyway), we don't have to worry about op ids being
169            // incoherent with the op set expression shown to the user, unlike for the
170            // `MultipleOperations` variant.
171            //
172            // The full op set expression is guaranteed to be empty in this case,
173            // because ancestors/descendants of an empty operation are empty.
174            0 => Err(OpsetResolutionError::EmptyOperations(op_str.to_owned()))?,
175            1 => neighbor_ops.pop().unwrap(),
176            // Returns the exact subexpression that resolves to multiple operations,
177            // rather than the full expression provided by the user.
178            _ => Err(OpsetResolutionError::MultipleOperations {
179                expr: op_str[..=op_symbol.len() + i].to_owned(),
180                candidates: neighbor_ops.iter().map(|op| op.id().clone()).collect(),
181            })?,
182        };
183    }
184    Ok(operation)
185}
186
187async fn resolve_single_op_from_store(
188    op_store: &Arc<dyn OpStore>,
189    op_str: &str,
190) -> Result<Operation, OpsetEvaluationError> {
191    if op_str.is_empty() {
192        return Err(OpsetResolutionError::InvalidIdPrefix(op_str.to_owned()).into());
193    }
194    let prefix = HexPrefix::try_from_hex(op_str)
195        .ok_or_else(|| OpsetResolutionError::InvalidIdPrefix(op_str.to_owned()))?;
196    match op_store.resolve_operation_id_prefix(&prefix).await? {
197        PrefixResolution::NoMatch => {
198            Err(OpsetResolutionError::NoSuchOperation(op_str.to_owned()).into())
199        }
200        PrefixResolution::SingleMatch(op_id) => {
201            let data = op_store.read_operation(&op_id).await?;
202            Ok(Operation::new(op_store.clone(), op_id, data))
203        }
204        PrefixResolution::AmbiguousMatch => {
205            Err(OpsetResolutionError::AmbiguousIdPrefix(op_str.to_owned()).into())
206        }
207    }
208}
209
210/// Loads the current head operations. The returned operations may contain
211/// redundant ones which are ancestors of the other heads.
212pub async fn get_current_head_ops(
213    op_store: &Arc<dyn OpStore>,
214    op_heads_store: &dyn OpHeadsStore,
215) -> Result<Vec<Operation>, OpsetEvaluationError> {
216    let head_ops_futures = op_heads_store.get_op_heads().await?.into_iter().map(
217        async |id| -> OpStoreResult<Operation> {
218            let data = op_store.read_operation(&id).await?;
219            Ok(Operation::new(op_store.clone(), id, data))
220        },
221    );
222    let mut head_ops = try_join_all(head_ops_futures).await?;
223    // To stabilize output, sort in the same order as resolve_op_heads()
224    head_ops.sort_by_key(|op| op.metadata().time.end.timestamp);
225    Ok(head_ops)
226}
227
228/// Looks up children of the `root_op_id` by traversing from the `head_ops`.
229///
230/// This will be slow if the `root_op_id` is far away (or unreachable) from the
231/// `head_ops`.
232async fn find_child_ops(
233    head_ops: &[Operation],
234    root_op_id: &OperationId,
235) -> OpStoreResult<Vec<Operation>> {
236    walk_ancestors(head_ops)
237        .take_while(|res| ready(res.as_ref().map_or(true, |op| op.id() != root_op_id)))
238        .try_filter(|op| ready(op.parent_ids().iter().any(|id| id == root_op_id)))
239        .try_collect()
240        .await
241}
242
243#[derive(Clone, Debug, Eq, Hash, PartialEq)]
244struct OperationByEndTime(Operation);
245
246impl Ord for OperationByEndTime {
247    fn cmp(&self, other: &Self) -> Ordering {
248        let self_end_time = &self.0.metadata().time.end;
249        let other_end_time = &other.0.metadata().time.end;
250        self_end_time
251            .cmp(other_end_time)
252            .then_with(|| self.0.cmp(&other.0)) // to comply with Eq
253    }
254}
255
256impl PartialOrd for OperationByEndTime {
257    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
258        Some(self.cmp(other))
259    }
260}
261
262/// Walks `head_ops` and their ancestors in reverse topological order.
263pub fn walk_ancestors(
264    head_ops: &[Operation],
265) -> impl Stream<Item = OpStoreResult<Operation>> + use<> {
266    let head_ops = head_ops
267        .iter()
268        .cloned()
269        .map(OperationByEndTime)
270        .collect_vec();
271    // Lazily load operations based on timestamp-based heuristic. This works so long
272    // as the operation history is mostly linear.
273    dag_walk_async::topo_order_reverse_lazy(
274        head_ops.into_iter().map(Ok),
275        |OperationByEndTime(op)| op.id().clone(),
276        async |OperationByEndTime(op)| match op.parents().await {
277            Ok(parents) => parents
278                .into_iter()
279                .map(|parent| Ok(OperationByEndTime(parent)))
280                .collect_vec(),
281            Err(err) => vec![Err(err)],
282        },
283        |_| panic!("graph has cycle"),
284    )
285    .map_ok(|OperationByEndTime(op)| op)
286}
287
288/// Walks ancestors from `head_ops` in reverse topological order, excluding
289/// ancestors of `root_ops`.
290pub fn walk_ancestors_range(
291    head_ops: &[Operation],
292    root_ops: &[Operation],
293) -> impl Stream<Item = OpStoreResult<Operation>> + use<> {
294    let mut start_ops = itertools::chain(head_ops, root_ops)
295        .cloned()
296        .map(OperationByEndTime)
297        .collect_vec();
298
299    // Consume items until root_ops to get rid of unwanted ops.
300    let leading_items = if root_ops.is_empty() {
301        vec![]
302    } else {
303        let unwanted_ids = root_ops.iter().map(|op| op.id().clone()).collect();
304        collect_ancestors_until_roots(&mut start_ops, unwanted_ids)
305    };
306
307    // Lazily load operations based on timestamp-based heuristic. This works so long
308    // as the operation history is mostly linear.
309    let trailing_stream = dag_walk_async::topo_order_reverse_lazy(
310        start_ops.into_iter().map(Ok),
311        |OperationByEndTime(op)| op.id().clone(),
312        async |OperationByEndTime(op)| match op.parents().await {
313            Ok(parents) => parents
314                .into_iter()
315                .map(|op| Ok(OperationByEndTime(op)))
316                .collect_vec(),
317            Err(err) => vec![Err(err)],
318        },
319        |_| panic!("graph has cycle"),
320    )
321    .map_ok(|OperationByEndTime(op)| op);
322    stream::iter(leading_items).chain(trailing_stream)
323}
324
325fn collect_ancestors_until_roots(
326    start_ops: &mut Vec<OperationByEndTime>,
327    mut unwanted_ids: HashSet<OperationId>,
328) -> Vec<OpStoreResult<Operation>> {
329    let sorted_ops = match dag_walk_async::topo_order_reverse_chunked(
330        start_ops,
331        |OperationByEndTime(op)| op.id().clone(),
332        async |OperationByEndTime(op)| match op.parents().await {
333            Ok(parents) => parents
334                .into_iter()
335                .map(|op| Ok(OperationByEndTime(op)))
336                .collect_vec(),
337            Err(err) => vec![Err(err)],
338        },
339        |_| panic!("graph has cycle"),
340    )
341    .block_on()
342    {
343        Ok(sorted_ops) => sorted_ops,
344        Err(err) => return vec![Err(err)],
345    };
346    let mut items = Vec::new();
347    for OperationByEndTime(op) in sorted_ops {
348        if unwanted_ids.contains(op.id()) {
349            unwanted_ids.extend(op.parent_ids().iter().cloned());
350        } else {
351            items.push(Ok(op));
352        }
353    }
354    // Don't visit ancestors of unwanted ops further.
355    start_ops.retain(|OperationByEndTime(op)| !unwanted_ids.contains(op.id()));
356    items
357}
358
359/// Finds the closest common ancestor of `set1` and `set2`. Uses the end
360/// timestamp as a heuristic.
361// TODO: We should probably make this a function on `OpStore` instead of
362// relying on heuristics (even if the implementation of the trait might rely on
363// the same heuristic initially).
364pub async fn closest_common_ancestors(
365    set1: impl IntoIterator<Item = Operation>,
366    set2: impl IntoIterator<Item = Operation>,
367) -> OpStoreResult<Vec<Operation>> {
368    let ancestor_ops = dag_walk_async::closest_common_nodes(
369        set1.into_iter().map(OperationByEndTime),
370        set2.into_iter().map(OperationByEndTime),
371        |op: &OperationByEndTime| op.0.id().clone(),
372        async |op: &OperationByEndTime| {
373            op.0.parents()
374                .await
375                .map(|parents| parents.into_iter().map(OperationByEndTime))
376        },
377    )
378    .await?;
379    Ok(ancestor_ops.into_iter().map(|op| op.0).collect())
380}
381
382/// Stats about `reparent_range()`.
383#[derive(Clone, Debug, Eq, PartialEq)]
384pub struct ReparentStats {
385    /// New head operation ids in order of the old `head_ops`.
386    pub new_head_ids: Vec<OperationId>,
387    /// The number of rewritten operations.
388    pub rewritten_count: usize,
389    /// The number of ancestor operations that become unreachable from the
390    /// rewritten heads.
391    pub unreachable_count: usize,
392}
393
394/// Reparents the operation range `root_ops..head_ops` onto the `dest_op`.
395///
396/// Returns the new head operation ids as well as some stats. If the old
397/// operation heads are remapped to the new heads, the operations within the
398/// range `dest_op..root_ops` become unreachable.
399///
400/// If the source operation range `root_ops..head_ops` was empty, the
401/// `new_head_ids` will be `[dest_op.id()]`, meaning the `dest_op` is the head.
402// TODO: Find better place to host this function. It might be an OpStore method.
403pub async fn reparent_range(
404    op_store: &dyn OpStore,
405    root_ops: &[Operation],
406    head_ops: &[Operation],
407    dest_op: &Operation,
408) -> OpStoreResult<ReparentStats> {
409    let ops_to_reparent: Vec<_> = walk_ancestors_range(head_ops, root_ops)
410        .try_collect()
411        .await?;
412    let unreachable_count = walk_ancestors_range(root_ops, slice::from_ref(dest_op))
413        .try_fold(0, |acc, _| async move { Ok(acc + 1) })
414        .await?;
415
416    assert!(
417        ops_to_reparent
418            .last()
419            .is_none_or(|op| op.id() != op_store.root_operation_id()),
420        "root operation cannot be rewritten"
421    );
422    let mut rewritten_ids = HashMap::new();
423    for old_op in ops_to_reparent.into_iter().rev() {
424        let mut data = old_op.store_operation().clone();
425        let mut dest_once = Some(dest_op.id());
426        data.parents = data
427            .parents
428            .iter()
429            .filter_map(|id| rewritten_ids.get(id).or_else(|| dest_once.take()))
430            .cloned()
431            .collect();
432        let new_id = op_store.write_operation(&data).await?;
433        rewritten_ids.insert(old_op.id().clone(), new_id);
434    }
435
436    let mut dest_once = Some(dest_op.id());
437    let new_head_ids = head_ops
438        .iter()
439        .filter_map(|op| rewritten_ids.get(op.id()).or_else(|| dest_once.take()))
440        .cloned()
441        .collect();
442    Ok(ReparentStats {
443        new_head_ids,
444        rewritten_count: rewritten_ids.len(),
445        unreachable_count,
446    })
447}