Skip to main content

jj_lib/
transaction.rs

1// Copyright 2020 The Jujutsu Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![expect(missing_docs)]
16
17use std::sync::Arc;
18
19use pollster::FutureExt as _;
20use thiserror::Error;
21
22use crate::backend::Timestamp;
23use crate::dag_walk;
24use crate::index::IndexStoreError;
25use crate::index::ReadonlyIndex;
26use crate::op_heads_store::OpHeadsStore;
27use crate::op_heads_store::OpHeadsStoreError;
28use crate::op_store;
29use crate::op_store::OpStoreError;
30use crate::op_store::OperationMetadata;
31use crate::op_store::TimestampRange;
32use crate::operation::Operation;
33use crate::ref_name::WorkspaceName;
34use crate::repo::MutableRepo;
35use crate::repo::ReadonlyRepo;
36use crate::repo::Repo as _;
37use crate::repo::RepoLoader;
38use crate::repo::RepoLoaderError;
39use crate::settings::UserSettings;
40use crate::view::View;
41
42/// Error from attempts to write and publish transaction.
43#[derive(Debug, Error)]
44#[error("Failed to commit new operation")]
45pub enum TransactionCommitError {
46    IndexStore(#[from] IndexStoreError),
47    OpHeadsStore(#[from] OpHeadsStoreError),
48    OpStore(#[from] OpStoreError),
49}
50
51/// An in-memory representation of a repo and any changes being made to it.
52///
53/// Within the scope of a transaction, changes to the repository are made
54/// in-memory to `mut_repo` and published to the repo backend when
55/// [`Transaction::commit`] is called. When a transaction is committed, it
56/// becomes atomically visible as an Operation in the op log that represents the
57/// transaction itself, and as a View that represents the state of the repo
58/// after the transaction. This is similar to how a Commit represents a change
59/// to the contents of the repository and a Tree represents the repository's
60/// contents after the change. See the documentation for [`op_store::Operation`]
61/// and [`op_store::View`] for more information.
62pub struct Transaction {
63    mut_repo: MutableRepo,
64    parent_ops: Vec<Operation>,
65    op_metadata: OperationMetadata,
66    end_time: Option<Timestamp>,
67}
68
69impl Transaction {
70    pub fn new(mut_repo: MutableRepo, user_settings: &UserSettings) -> Self {
71        let parent_ops = vec![mut_repo.base_repo().operation().clone()];
72        let op_metadata = create_op_metadata(user_settings, "".to_string(), false);
73        let end_time = user_settings.operation_timestamp();
74        Self {
75            mut_repo,
76            parent_ops,
77            op_metadata,
78            end_time,
79        }
80    }
81
82    pub fn base_repo(&self) -> &Arc<ReadonlyRepo> {
83        self.mut_repo.base_repo()
84    }
85
86    pub fn set_tag(&mut self, key: String, value: String) {
87        self.op_metadata.tags.insert(key, value);
88    }
89
90    pub fn repo(&self) -> &MutableRepo {
91        &self.mut_repo
92    }
93
94    pub fn repo_mut(&mut self) -> &mut MutableRepo {
95        &mut self.mut_repo
96    }
97
98    pub async fn merge_operation(&mut self, other_op: Operation) -> Result<(), RepoLoaderError> {
99        let ancestor_op = dag_walk::closest_common_node_ok(
100            self.parent_ops.iter().cloned().map(Ok),
101            [Ok(other_op.clone())],
102            |op: &Operation| op.id().clone(),
103            |op: &Operation| op.parents().block_on(),
104        )?
105        .unwrap();
106        let repo_loader = self.base_repo().loader();
107        let base_repo = repo_loader.load_at(&ancestor_op).await?;
108        let other_repo = repo_loader.load_at(&other_op).await?;
109        self.parent_ops.push(other_op);
110        let merged_repo = self.repo_mut();
111        merged_repo.merge(&base_repo, &other_repo).await?;
112        Ok(())
113    }
114
115    pub fn set_is_snapshot(&mut self, is_snapshot: bool) {
116        self.op_metadata.is_snapshot = is_snapshot;
117    }
118
119    pub fn set_workspace_name(&mut self, workspace_name: &WorkspaceName) {
120        self.op_metadata.workspace_name = Some(workspace_name.to_owned());
121    }
122
123    /// Writes the transaction to the operation store and publishes it.
124    pub async fn commit(
125        self,
126        description: impl Into<String>,
127    ) -> Result<Arc<ReadonlyRepo>, TransactionCommitError> {
128        self.write(description).await?.publish().await
129    }
130
131    /// Writes the transaction to the operation store, but does not publish it.
132    /// That means that a repo can be loaded at the operation, but the
133    /// operation will not be seen when loading the repo at head.
134    pub async fn write(
135        mut self,
136        description: impl Into<String>,
137    ) -> Result<UnpublishedOperation, TransactionCommitError> {
138        let mut_repo = self.mut_repo;
139        // TODO: Should we instead just do the rebasing here if necessary?
140        assert!(
141            !mut_repo.has_rewrites(),
142            "BUG: Descendants have not been rebased after the last rewrites."
143        );
144        let base_repo = mut_repo.base_repo().clone();
145        let (mut_index, view, predecessors) = mut_repo.consume();
146
147        let operation = {
148            let view_id = base_repo.op_store().write_view(view.store_view()).await?;
149            self.op_metadata.description = description.into();
150            self.op_metadata.time.end = self.end_time.unwrap_or_else(Timestamp::now);
151            let parents = self.parent_ops.iter().map(|op| op.id().clone()).collect();
152            let store_operation = op_store::Operation {
153                view_id,
154                parents,
155                metadata: self.op_metadata,
156                commit_predecessors: Some(predecessors),
157            };
158            let new_op_id = base_repo
159                .op_store()
160                .write_operation(&store_operation)
161                .await?;
162            Operation::new(base_repo.op_store().clone(), new_op_id, store_operation)
163        };
164
165        let index = base_repo.index_store().write_index(mut_index, &operation)?;
166        let unpublished = UnpublishedOperation::new(base_repo.loader(), operation, view, index);
167        Ok(unpublished)
168    }
169}
170
171pub fn create_op_metadata(
172    user_settings: &UserSettings,
173    description: String,
174    is_snapshot: bool,
175) -> OperationMetadata {
176    let timestamp = user_settings
177        .operation_timestamp()
178        .unwrap_or_else(Timestamp::now);
179    let hostname = user_settings.operation_hostname().to_owned();
180    let username = user_settings.operation_username().to_owned();
181    OperationMetadata {
182        time: TimestampRange {
183            start: timestamp,
184            end: timestamp,
185        },
186        description,
187        hostname,
188        username,
189        is_snapshot,
190        workspace_name: None,
191        tags: Default::default(),
192    }
193}
194
195/// An unpublished operation in the store.
196///
197/// An Operation which has been written to the operation store but not
198/// published. The repo can be loaded at an unpublished Operation, but the
199/// Operation will not be visible in the op log if the repo is loaded at head.
200///
201/// Either [`Self::publish`] or [`Self::leave_unpublished`] must be called to
202/// finish the operation.
203#[must_use = "Either publish() or leave_unpublished() must be called to finish the operation."]
204pub struct UnpublishedOperation {
205    op_heads_store: Arc<dyn OpHeadsStore>,
206    repo: Arc<ReadonlyRepo>,
207}
208
209impl UnpublishedOperation {
210    fn new(
211        repo_loader: &RepoLoader,
212        operation: Operation,
213        view: View,
214        index: Box<dyn ReadonlyIndex>,
215    ) -> Self {
216        Self {
217            op_heads_store: repo_loader.op_heads_store().clone(),
218            repo: repo_loader.create_from(operation, view, index),
219        }
220    }
221
222    pub fn operation(&self) -> &Operation {
223        self.repo.operation()
224    }
225
226    pub async fn publish(self) -> Result<Arc<ReadonlyRepo>, TransactionCommitError> {
227        let _lock = self.op_heads_store.lock().await?;
228        self.op_heads_store
229            .update_op_heads(self.operation().parent_ids(), self.operation().id())
230            .await?;
231        Ok(self.repo)
232    }
233
234    pub fn leave_unpublished(self) -> Arc<ReadonlyRepo> {
235        self.repo
236    }
237}