1#![expect(missing_docs)]
16
17use std::sync::Arc;
18
19use thiserror::Error;
20
21use crate::backend::Timestamp;
22use crate::index::IndexStoreError;
23use crate::index::ReadonlyIndex;
24use crate::op_heads_store::OpHeadsStore;
25use crate::op_heads_store::OpHeadsStoreError;
26use crate::op_store;
27use crate::op_store::OpStoreError;
28use crate::op_store::OperationMetadata;
29use crate::op_store::TimestampRange;
30use crate::op_walk;
31use crate::operation::Operation;
32use crate::ref_name::WorkspaceName;
33use crate::repo::MutableRepo;
34use crate::repo::ReadonlyRepo;
35use crate::repo::Repo as _;
36use crate::repo::RepoLoader;
37use crate::repo::RepoLoaderError;
38use crate::settings::UserSettings;
39use crate::view::View;
40
41#[derive(Debug, Error)]
43#[error("Failed to commit new operation")]
44pub enum TransactionCommitError {
45 IndexStore(#[from] IndexStoreError),
46 OpHeadsStore(#[from] OpHeadsStoreError),
47 OpStore(#[from] OpStoreError),
48}
49
50pub struct Transaction {
62 mut_repo: MutableRepo,
63 parent_ops: Vec<Operation>,
64 op_metadata: OperationMetadata,
65 end_time: Option<Timestamp>,
66}
67
68impl Transaction {
69 pub fn new(mut_repo: MutableRepo, user_settings: &UserSettings) -> Self {
70 let parent_ops = vec![mut_repo.base_repo().operation().clone()];
71 let op_metadata = create_op_metadata(user_settings, "".to_string(), false);
72 let end_time = user_settings.operation_timestamp();
73 Self {
74 mut_repo,
75 parent_ops,
76 op_metadata,
77 end_time,
78 }
79 }
80
81 pub fn base_repo(&self) -> &Arc<ReadonlyRepo> {
82 self.mut_repo.base_repo()
83 }
84
85 pub fn set_attribute(&mut self, key: String, value: String) {
86 self.op_metadata.attributes.insert(key, value);
87 }
88
89 pub fn repo(&self) -> &MutableRepo {
90 &self.mut_repo
91 }
92
93 pub fn repo_mut(&mut self) -> &mut MutableRepo {
94 &mut self.mut_repo
95 }
96
97 pub async fn merge_operation(&mut self, other_op: Operation) -> Result<(), RepoLoaderError> {
98 let ancestor_ops =
99 op_walk::closest_common_ancestors(self.parent_ops.iter().cloned(), [other_op.clone()])
100 .await?;
101 let repo_loader = self.base_repo().loader();
102 let ancestor_op = Box::pin(repo_loader.merge_operations(ancestor_ops, None)).await?;
103 let base_repo = repo_loader.load_at(&ancestor_op).await?;
104 let other_repo = repo_loader.load_at(&other_op).await?;
105 self.parent_ops.push(other_op);
106 let merged_repo = self.repo_mut();
107 merged_repo.merge(&base_repo, &other_repo).await?;
108 Ok(())
109 }
110
111 pub fn set_is_snapshot(&mut self, is_snapshot: bool) {
112 self.op_metadata.is_snapshot = is_snapshot;
113 }
114
115 pub fn set_workspace_name(&mut self, workspace_name: &WorkspaceName) {
116 self.op_metadata.workspace_name = Some(workspace_name.to_owned());
117 }
118
119 pub async fn commit(
121 self,
122 description: impl Into<String>,
123 ) -> Result<Arc<ReadonlyRepo>, TransactionCommitError> {
124 self.write(description).await?.publish().await
125 }
126
127 pub async fn write(
131 mut self,
132 description: impl Into<String>,
133 ) -> Result<UnpublishedOperation, TransactionCommitError> {
134 let mut_repo = self.mut_repo;
135 assert!(
137 !mut_repo.has_rewrites(),
138 "BUG: Descendants have not been rebased after the last rewrites."
139 );
140 let base_repo = mut_repo.base_repo().clone();
141 let (mut_index, view, predecessors) = mut_repo.consume();
142
143 let operation = {
144 let view_id = base_repo.op_store().write_view(view.store_view()).await?;
145 self.op_metadata.description = description.into();
146 self.op_metadata.time.end = self.end_time.unwrap_or_else(Timestamp::now);
147 let parents = self.parent_ops.iter().map(|op| op.id().clone()).collect();
148 let store_operation = op_store::Operation {
149 view_id,
150 parents,
151 metadata: self.op_metadata,
152 commit_predecessors: Some(predecessors),
153 };
154 let new_op_id = base_repo
155 .op_store()
156 .write_operation(&store_operation)
157 .await?;
158 Operation::new(base_repo.op_store().clone(), new_op_id, store_operation)
159 };
160
161 let index = base_repo.index_store().write_index(mut_index, &operation)?;
162 let unpublished = UnpublishedOperation::new(base_repo.loader(), operation, view, index);
163 Ok(unpublished)
164 }
165}
166
167pub fn create_op_metadata(
168 user_settings: &UserSettings,
169 description: String,
170 is_snapshot: bool,
171) -> OperationMetadata {
172 let timestamp = user_settings
173 .operation_timestamp()
174 .unwrap_or_else(Timestamp::now);
175 let hostname = user_settings.operation_hostname().to_owned();
176 let username = user_settings.operation_username().to_owned();
177 OperationMetadata {
178 time: TimestampRange {
179 start: timestamp,
180 end: timestamp,
181 },
182 description,
183 hostname,
184 username,
185 is_snapshot,
186 workspace_name: None,
187 attributes: Default::default(),
188 }
189}
190
191#[must_use = "Either publish() or leave_unpublished() must be called to finish the operation."]
200pub struct UnpublishedOperation {
201 op_heads_store: Arc<dyn OpHeadsStore>,
202 repo: Arc<ReadonlyRepo>,
203}
204
205impl UnpublishedOperation {
206 fn new(
207 repo_loader: &RepoLoader,
208 operation: Operation,
209 view: View,
210 index: Box<dyn ReadonlyIndex>,
211 ) -> Self {
212 Self {
213 op_heads_store: repo_loader.op_heads_store().clone(),
214 repo: repo_loader.create_from(operation, view, index),
215 }
216 }
217
218 pub fn operation(&self) -> &Operation {
219 self.repo.operation()
220 }
221
222 pub async fn publish(self) -> Result<Arc<ReadonlyRepo>, TransactionCommitError> {
223 let _lock = self.op_heads_store.lock().await?;
224 self.op_heads_store
225 .update_op_heads(self.operation().parent_ids(), self.operation().id())
226 .await?;
227 Ok(self.repo)
228 }
229
230 pub fn leave_unpublished(self) -> Arc<ReadonlyRepo> {
231 self.repo
232 }
233}