jj_lib/
transaction.rs

1// Copyright 2020 The Jujutsu Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![allow(missing_docs)]
16
17use std::sync::Arc;
18
19use itertools::Itertools as _;
20use thiserror::Error;
21
22use crate::backend::Timestamp;
23use crate::dag_walk;
24use crate::index::IndexWriteError;
25use crate::index::ReadonlyIndex;
26use crate::op_heads_store::OpHeadsStore;
27use crate::op_heads_store::OpHeadsStoreError;
28use crate::op_store;
29use crate::op_store::OpStoreError;
30use crate::op_store::OperationMetadata;
31use crate::operation::Operation;
32use crate::repo::MutableRepo;
33use crate::repo::ReadonlyRepo;
34use crate::repo::Repo as _;
35use crate::repo::RepoLoader;
36use crate::repo::RepoLoaderError;
37use crate::settings::UserSettings;
38use crate::view::View;
39
40/// Error from attempts to write and publish transaction.
41#[derive(Debug, Error)]
42#[error("Failed to commit new operation")]
43pub enum TransactionCommitError {
44    IndexWrite(#[from] IndexWriteError),
45    OpHeadsStore(#[from] OpHeadsStoreError),
46    OpStore(#[from] OpStoreError),
47}
48
49/// An in-memory representation of a repo and any changes being made to it.
50///
51/// Within the scope of a transaction, changes to the repository are made
52/// in-memory to `mut_repo` and published to the repo backend when
53/// [`Transaction::commit`] is called. When a transaction is committed, it
54/// becomes atomically visible as an Operation in the op log that represents the
55/// transaction itself, and as a View that represents the state of the repo
56/// after the transaction. This is similar to how a Commit represents a change
57/// to the contents of the repository and a Tree represents the repository's
58/// contents after the change. See the documentation for [`op_store::Operation`]
59/// and [`op_store::View`] for more information.
60pub struct Transaction {
61    mut_repo: MutableRepo,
62    parent_ops: Vec<Operation>,
63    op_metadata: OperationMetadata,
64    end_time: Option<Timestamp>,
65}
66
67impl Transaction {
68    pub fn new(mut_repo: MutableRepo, user_settings: &UserSettings) -> Transaction {
69        let parent_ops = vec![mut_repo.base_repo().operation().clone()];
70        let op_metadata = create_op_metadata(user_settings, "".to_string(), false);
71        let end_time = user_settings.operation_timestamp();
72        Transaction {
73            mut_repo,
74            parent_ops,
75            op_metadata,
76            end_time,
77        }
78    }
79
80    pub fn base_repo(&self) -> &Arc<ReadonlyRepo> {
81        self.mut_repo.base_repo()
82    }
83
84    pub fn set_tag(&mut self, key: String, value: String) {
85        self.op_metadata.tags.insert(key, value);
86    }
87
88    pub fn repo(&self) -> &MutableRepo {
89        &self.mut_repo
90    }
91
92    pub fn repo_mut(&mut self) -> &mut MutableRepo {
93        &mut self.mut_repo
94    }
95
96    pub fn merge_operation(&mut self, other_op: Operation) -> Result<(), RepoLoaderError> {
97        let ancestor_op = dag_walk::closest_common_node_ok(
98            self.parent_ops.iter().cloned().map(Ok),
99            [Ok(other_op.clone())],
100            |op: &Operation| op.id().clone(),
101            |op: &Operation| op.parents().collect_vec(),
102        )?
103        .unwrap();
104        let repo_loader = self.base_repo().loader();
105        let base_repo = repo_loader.load_at(&ancestor_op)?;
106        let other_repo = repo_loader.load_at(&other_op)?;
107        self.parent_ops.push(other_op);
108        let merged_repo = self.repo_mut();
109        merged_repo.merge(&base_repo, &other_repo)?;
110        Ok(())
111    }
112
113    pub fn set_is_snapshot(&mut self, is_snapshot: bool) {
114        self.op_metadata.is_snapshot = is_snapshot;
115    }
116
117    /// Writes the transaction to the operation store and publishes it.
118    pub fn commit(
119        self,
120        description: impl Into<String>,
121    ) -> Result<Arc<ReadonlyRepo>, TransactionCommitError> {
122        self.write(description)?.publish()
123    }
124
125    /// Writes the transaction to the operation store, but does not publish it.
126    /// That means that a repo can be loaded at the operation, but the
127    /// operation will not be seen when loading the repo at head.
128    pub fn write(
129        mut self,
130        description: impl Into<String>,
131    ) -> Result<UnpublishedOperation, TransactionCommitError> {
132        let mut_repo = self.mut_repo;
133        // TODO: Should we instead just do the rebasing here if necessary?
134        assert!(
135            !mut_repo.has_rewrites(),
136            "BUG: Descendants have not been rebased after the last rewrites."
137        );
138        let base_repo = mut_repo.base_repo().clone();
139        let (mut_index, view, predecessors) = mut_repo.consume();
140
141        let operation = {
142            let view_id = base_repo.op_store().write_view(view.store_view())?;
143            self.op_metadata.description = description.into();
144            self.op_metadata.end_time = self.end_time.unwrap_or_else(Timestamp::now);
145            let parents = self.parent_ops.iter().map(|op| op.id().clone()).collect();
146            let store_operation = op_store::Operation {
147                view_id,
148                parents,
149                metadata: self.op_metadata,
150                commit_predecessors: Some(predecessors),
151            };
152            let new_op_id = base_repo.op_store().write_operation(&store_operation)?;
153            Operation::new(base_repo.op_store().clone(), new_op_id, store_operation)
154        };
155
156        let index = base_repo.index_store().write_index(mut_index, &operation)?;
157        let unpublished = UnpublishedOperation::new(base_repo.loader(), operation, view, index);
158        Ok(unpublished)
159    }
160}
161
162pub fn create_op_metadata(
163    user_settings: &UserSettings,
164    description: String,
165    is_snapshot: bool,
166) -> OperationMetadata {
167    let start_time = user_settings
168        .operation_timestamp()
169        .unwrap_or_else(Timestamp::now);
170    let end_time = start_time;
171    let hostname = user_settings.operation_hostname().to_owned();
172    let username = user_settings.operation_username().to_owned();
173    OperationMetadata {
174        start_time,
175        end_time,
176        description,
177        hostname,
178        username,
179        is_snapshot,
180        tags: Default::default(),
181    }
182}
183
184/// An unpublished operation in the store.
185///
186/// An Operation which has been written to the operation store but not
187/// published. The repo can be loaded at an unpublished Operation, but the
188/// Operation will not be visible in the op log if the repo is loaded at head.
189///
190/// Either [`Self::publish`] or [`Self::leave_unpublished`] must be called to
191/// finish the operation.
192#[must_use = "Either publish() or leave_unpublished() must be called to finish the operation."]
193pub struct UnpublishedOperation {
194    op_heads_store: Arc<dyn OpHeadsStore>,
195    repo: Arc<ReadonlyRepo>,
196}
197
198impl UnpublishedOperation {
199    fn new(
200        repo_loader: &RepoLoader,
201        operation: Operation,
202        view: View,
203        index: Box<dyn ReadonlyIndex>,
204    ) -> Self {
205        UnpublishedOperation {
206            op_heads_store: repo_loader.op_heads_store().clone(),
207            repo: repo_loader.create_from(operation, view, index),
208        }
209    }
210
211    pub fn operation(&self) -> &Operation {
212        self.repo.operation()
213    }
214
215    pub fn publish(self) -> Result<Arc<ReadonlyRepo>, TransactionCommitError> {
216        let _lock = self.op_heads_store.lock()?;
217        self.op_heads_store
218            .update_op_heads(self.operation().parent_ids(), self.operation().id())?;
219        Ok(self.repo)
220    }
221
222    pub fn leave_unpublished(self) -> Arc<ReadonlyRepo> {
223        self.repo
224    }
225}