jj_lib/
op_heads_store.rs

1// Copyright 2021 The Jujutsu Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![allow(missing_docs)]
16
17use std::any::Any;
18use std::collections::HashSet;
19use std::fmt::Debug;
20use std::sync::Arc;
21
22use itertools::Itertools as _;
23use thiserror::Error;
24
25use crate::dag_walk;
26use crate::op_store::OpStore;
27use crate::op_store::OpStoreError;
28use crate::op_store::OperationId;
29use crate::operation::Operation;
30
31#[derive(Debug, Error)]
32pub enum OpHeadsStoreError {
33    #[error("Failed to read operation heads")]
34    Read(#[source] Box<dyn std::error::Error + Send + Sync>),
35    #[error("Failed to record operation head {new_op_id}")]
36    Write {
37        new_op_id: OperationId,
38        source: Box<dyn std::error::Error + Send + Sync>,
39    },
40    #[error("Failed to lock operation heads store")]
41    Lock(#[source] Box<dyn std::error::Error + Send + Sync>),
42}
43
44#[derive(Debug, Error)]
45pub enum OpHeadResolutionError {
46    #[error("Operation log has no heads")]
47    NoHeads,
48}
49
50pub trait OpHeadsStoreLock {}
51
52/// Manages the set of current heads of the operation log.
53pub trait OpHeadsStore: Send + Sync + Debug {
54    fn as_any(&self) -> &dyn Any;
55
56    fn name(&self) -> &str;
57
58    /// Remove the old op heads and add the new one.
59    ///
60    /// The old op heads must not contain the new one.
61    fn update_op_heads(
62        &self,
63        old_ids: &[OperationId],
64        new_id: &OperationId,
65    ) -> Result<(), OpHeadsStoreError>;
66
67    fn get_op_heads(&self) -> Result<Vec<OperationId>, OpHeadsStoreError>;
68
69    /// Optionally takes a lock on the op heads store. The purpose of the lock
70    /// is to prevent concurrent processes from resolving the same divergent
71    /// operations. It is not needed for correctness; implementations are free
72    /// to return a type that doesn't hold a lock.
73    fn lock(&self) -> Result<Box<dyn OpHeadsStoreLock + '_>, OpHeadsStoreError>;
74}
75
76// Given an OpHeadsStore, fetch and resolve its op heads down to one under a
77// lock.
78//
79// This routine is defined outside the trait because it must support generics.
80pub fn resolve_op_heads<E>(
81    op_heads_store: &dyn OpHeadsStore,
82    op_store: &Arc<dyn OpStore>,
83    resolver: impl FnOnce(Vec<Operation>) -> Result<Operation, E>,
84) -> Result<Operation, E>
85where
86    E: From<OpHeadResolutionError> + From<OpHeadsStoreError> + From<OpStoreError>,
87{
88    // This can be empty if the OpHeadsStore doesn't support atomic updates.
89    // For example, all entries ahead of a readdir() pointer could be deleted by
90    // another concurrent process.
91    let mut op_heads = op_heads_store.get_op_heads()?;
92
93    if op_heads.len() == 1 {
94        let operation_id = op_heads.pop().unwrap();
95        let operation = op_store.read_operation(&operation_id)?;
96        return Ok(Operation::new(op_store.clone(), operation_id, operation));
97    }
98
99    // There are no/multiple heads. We take a lock, then check if there are
100    // still no/multiple heads (it's likely that another process was in the
101    // process of deleting on of them). If there are still multiple heads, we
102    // attempt to merge all the views into one. We then write that view and a
103    // corresponding operation to the op-store.
104    // Note that the locking isn't necessary for correctness of merge; we take
105    // the lock only to prevent other concurrent processes from doing the same
106    // work (and producing another set of divergent heads).
107    let _lock = op_heads_store.lock()?;
108    let op_head_ids = op_heads_store.get_op_heads()?;
109
110    if op_head_ids.is_empty() {
111        return Err(OpHeadResolutionError::NoHeads.into());
112    }
113
114    if op_head_ids.len() == 1 {
115        let op_head_id = op_head_ids[0].clone();
116        let op_head = op_store.read_operation(&op_head_id)?;
117        return Ok(Operation::new(op_store.clone(), op_head_id, op_head));
118    }
119
120    let op_heads: Vec<_> = op_head_ids
121        .iter()
122        .map(|op_id: &OperationId| -> Result<Operation, OpStoreError> {
123            let data = op_store.read_operation(op_id)?;
124            Ok(Operation::new(op_store.clone(), op_id.clone(), data))
125        })
126        .try_collect()?;
127    // Remove ancestors so we don't create merge operation with an operation and its
128    // ancestor
129    let op_head_ids_before: HashSet<_> = op_heads.iter().map(|op| op.id().clone()).collect();
130    let filtered_op_heads = dag_walk::heads_ok(
131        op_heads.into_iter().map(Ok),
132        |op: &Operation| op.id().clone(),
133        |op: &Operation| op.parents().collect_vec(),
134    )?;
135    let op_head_ids_after: HashSet<_> =
136        filtered_op_heads.iter().map(|op| op.id().clone()).collect();
137    let ancestor_op_heads = op_head_ids_before
138        .difference(&op_head_ids_after)
139        .cloned()
140        .collect_vec();
141    let mut op_heads = filtered_op_heads.into_iter().collect_vec();
142
143    // Return without creating a merge operation
144    if let [op_head] = &*op_heads {
145        op_heads_store.update_op_heads(&ancestor_op_heads, op_head.id())?;
146        return Ok(op_head.clone());
147    }
148
149    op_heads.sort_by_key(|op| op.metadata().end_time.timestamp);
150    let new_op = resolver(op_heads)?;
151    let mut old_op_heads = ancestor_op_heads;
152    old_op_heads.extend_from_slice(new_op.parent_ids());
153    op_heads_store.update_op_heads(&old_op_heads, new_op.id())?;
154    Ok(new_op)
155}