// jj_lib/op_heads_store.rs

// Copyright 2021 The Jujutsu Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15#![expect(missing_docs)]
16
17use std::any::Any;
18use std::collections::HashSet;
19use std::fmt::Debug;
20use std::sync::Arc;
21
22use async_trait::async_trait;
23use itertools::Itertools as _;
24use pollster::FutureExt as _;
25use thiserror::Error;
26
27use crate::dag_walk;
28use crate::op_store::OpStore;
29use crate::op_store::OpStoreError;
30use crate::op_store::OperationId;
31use crate::operation::Operation;
32
33#[derive(Debug, Error)]
34pub enum OpHeadsStoreError {
35    #[error("Failed to read operation heads")]
36    Read(#[source] Box<dyn std::error::Error + Send + Sync>),
37    #[error("Failed to record operation head {new_op_id}")]
38    Write {
39        new_op_id: OperationId,
40        source: Box<dyn std::error::Error + Send + Sync>,
41    },
42    #[error("Failed to lock operation heads store")]
43    Lock(#[source] Box<dyn std::error::Error + Send + Sync>),
44}
45
46#[derive(Debug, Error)]
47pub enum OpHeadResolutionError {
48    #[error("Operation log has no heads")]
49    NoHeads,
50}
51
52pub trait OpHeadsStoreLock {}
53
/// Manages the set of current heads of the operation log.
#[async_trait]
pub trait OpHeadsStore: Any + Send + Sync + Debug {
    /// Name identifying this store implementation.
    fn name(&self) -> &str;

    /// Remove the old op heads and add the new one.
    ///
    /// The old op heads must not contain the new one.
    async fn update_op_heads(
        &self,
        old_ids: &[OperationId],
        new_id: &OperationId,
    ) -> Result<(), OpHeadsStoreError>;

    /// Returns the current op head ids. May return multiple heads (if
    /// concurrent processes created divergent operations) or none (see the
    /// note in `resolve_op_heads()` about non-atomic backends).
    async fn get_op_heads(&self) -> Result<Vec<OperationId>, OpHeadsStoreError>;

    /// Optionally takes a lock on the op heads store. The purpose of the lock
    /// is to prevent concurrent processes from resolving the same divergent
    /// operations. It is not needed for correctness; implementations are free
    /// to return a type that doesn't hold a lock.
    async fn lock(&self) -> Result<Box<dyn OpHeadsStoreLock + '_>, OpHeadsStoreError>;
}
76
impl dyn OpHeadsStore {
    /// Returns reference of the implementation type.
    ///
    /// Returns `None` if the concrete type behind this trait object is not
    /// `T`. Goes through `dyn Any` because trait objects can't be downcast
    /// directly.
    pub fn downcast_ref<T: OpHeadsStore>(&self) -> Option<&T> {
        (self as &dyn Any).downcast_ref()
    }
}
83
// Given an OpHeadsStore, fetch and resolve its op heads down to one under a
// lock.
//
// If there are multiple heads, the caller-supplied `resolver` is invoked to
// merge them into a single operation, which is then recorded as the new head.
//
// This routine is defined outside the trait because it must support generics.
pub fn resolve_op_heads<E>(
    op_heads_store: &dyn OpHeadsStore,
    op_store: &Arc<dyn OpStore>,
    resolver: impl FnOnce(Vec<Operation>) -> Result<Operation, E>,
) -> Result<Operation, E>
where
    E: From<OpHeadResolutionError> + From<OpHeadsStoreError> + From<OpStoreError>,
{
    // This can be empty if the OpHeadsStore doesn't support atomic updates.
    // For example, all entries ahead of a readdir() pointer could be deleted by
    // another concurrent process.
    let mut op_heads = op_heads_store.get_op_heads().block_on()?;

    // Fast path: exactly one head, so no locking or merging is needed.
    if op_heads.len() == 1 {
        let operation_id = op_heads.pop().unwrap();
        let operation = op_store.read_operation(&operation_id).block_on()?;
        return Ok(Operation::new(op_store.clone(), operation_id, operation));
    }

    // There are no/multiple heads. We take a lock, then check if there are
    // still no/multiple heads (it's likely that another process was in the
    // process of deleting one of them). If there are still multiple heads, we
    // attempt to merge all the views into one. We then write that view and a
    // corresponding operation to the op-store.
    // Note that the locking isn't necessary for correctness of merge; we take
    // the lock only to prevent other concurrent processes from doing the same
    // work (and producing another set of divergent heads).
    let _lock = op_heads_store.lock().block_on()?;
    let op_head_ids = op_heads_store.get_op_heads().block_on()?;

    if op_head_ids.is_empty() {
        return Err(OpHeadResolutionError::NoHeads.into());
    }

    // Re-check under the lock: another process may have already resolved the
    // heads down to one while we were waiting.
    if op_head_ids.len() == 1 {
        let op_head_id = op_head_ids[0].clone();
        let op_head = op_store.read_operation(&op_head_id).block_on()?;
        return Ok(Operation::new(op_store.clone(), op_head_id, op_head));
    }

    // Load all head operations; fails fast on the first read error.
    let op_heads: Vec<_> = op_head_ids
        .iter()
        .map(|op_id: &OperationId| -> Result<Operation, OpStoreError> {
            let data = op_store.read_operation(op_id).block_on()?;
            Ok(Operation::new(op_store.clone(), op_id.clone(), data))
        })
        .try_collect()?;
    // Remove ancestors so we don't create merge operation with an operation and its
    // ancestor
    let op_head_ids_before: HashSet<_> = op_heads.iter().map(|op| op.id().clone()).collect();
    let filtered_op_heads = dag_walk::heads_ok(
        op_heads.into_iter().map(Ok),
        |op: &Operation| op.id().clone(),
        |op: &Operation| op.parents().collect_vec(),
    )?;
    let op_head_ids_after: HashSet<_> =
        filtered_op_heads.iter().map(|op| op.id().clone()).collect();
    // The heads that were filtered out as ancestors still need to be removed
    // from the store when we record the surviving/merged head.
    let ancestor_op_heads = op_head_ids_before
        .difference(&op_head_ids_after)
        .cloned()
        .collect_vec();
    let mut op_heads = filtered_op_heads.into_iter().collect_vec();

    // Return without creating a merge operation
    if let [op_head] = &*op_heads {
        op_heads_store
            .update_op_heads(&ancestor_op_heads, op_head.id())
            .block_on()?;
        return Ok(op_head.clone());
    }

    // Sort the heads by end time so the resolver sees them in a deterministic
    // order regardless of the store's iteration order.
    op_heads.sort_by_key(|op| op.metadata().time.end.timestamp);
    let new_op = resolver(op_heads)?;
    // The heads to retire are the filtered-out ancestors plus the parents of
    // the merge operation the resolver produced.
    let mut old_op_heads = ancestor_op_heads;
    old_op_heads.extend_from_slice(new_op.parent_ids());
    op_heads_store
        .update_op_heads(&old_op_heads, new_op.id())
        .block_on()?;
    Ok(new_op)
}
167}