1#![expect(missing_docs)]
16
17use std::sync::Arc;
18
19use pollster::FutureExt as _;
20use thiserror::Error;
21
22use crate::backend::Timestamp;
23use crate::dag_walk;
24use crate::index::IndexStoreError;
25use crate::index::ReadonlyIndex;
26use crate::op_heads_store::OpHeadsStore;
27use crate::op_heads_store::OpHeadsStoreError;
28use crate::op_store;
29use crate::op_store::OpStoreError;
30use crate::op_store::OperationMetadata;
31use crate::op_store::TimestampRange;
32use crate::operation::Operation;
33use crate::ref_name::WorkspaceName;
34use crate::repo::MutableRepo;
35use crate::repo::ReadonlyRepo;
36use crate::repo::Repo as _;
37use crate::repo::RepoLoader;
38use crate::repo::RepoLoaderError;
39use crate::settings::UserSettings;
40use crate::view::View;
41
/// Error from attempting to commit (write + publish) a [`Transaction`].
///
/// The `#[error]` attribute on the enum gives every variant the same
/// top-level message; the wrapped `#[from]` values carry the underlying
/// source error.
#[derive(Debug, Error)]
#[error("Failed to commit new operation")]
pub enum TransactionCommitError {
    /// Failed to write the updated index for the new operation.
    IndexStore(#[from] IndexStoreError),
    /// Failed to lock or update the operation heads.
    OpHeadsStore(#[from] OpHeadsStoreError),
    /// Failed to write the view or operation to the op store.
    OpStore(#[from] OpStoreError),
}
50
/// An in-progress mutation of a repo, accumulated in memory until it is
/// written as a new operation via [`Transaction::write`] or
/// [`Transaction::commit`].
pub struct Transaction {
    // The mutable repo state being built up by this transaction.
    mut_repo: MutableRepo,
    // Parents of the operation to be written. Starts as the base repo's
    // operation; `merge_operation()` appends to it.
    parent_ops: Vec<Operation>,
    // Metadata (description, tags, timestamps, ...) for the new operation.
    op_metadata: OperationMetadata,
    // Fixed end timestamp from user settings, if configured; otherwise
    // `write()` falls back to `Timestamp::now`.
    end_time: Option<Timestamp>,
}
68
impl Transaction {
    /// Creates a transaction whose (initially single) parent operation is
    /// `mut_repo`'s base operation. The description starts empty and
    /// `is_snapshot` starts false; both can be set later.
    pub fn new(mut_repo: MutableRepo, user_settings: &UserSettings) -> Self {
        let parent_ops = vec![mut_repo.base_repo().operation().clone()];
        let op_metadata = create_op_metadata(user_settings, "".to_string(), false);
        // May be configured in settings (presumably for reproducible
        // timestamps — e.g. tests); `write()` substitutes the wall clock
        // when this is `None`.
        let end_time = user_settings.operation_timestamp();
        Self {
            mut_repo,
            parent_ops,
            op_metadata,
            end_time,
        }
    }

    /// The immutable repo this transaction started from.
    pub fn base_repo(&self) -> &Arc<ReadonlyRepo> {
        self.mut_repo.base_repo()
    }

    /// Attaches (or overwrites) a metadata tag on the operation to be written.
    pub fn set_tag(&mut self, key: String, value: String) {
        self.op_metadata.tags.insert(key, value);
    }

    /// Read-only access to the in-progress repo state.
    pub fn repo(&self) -> &MutableRepo {
        &self.mut_repo
    }

    /// Mutable access to the in-progress repo state.
    pub fn repo_mut(&mut self) -> &mut MutableRepo {
        &mut self.mut_repo
    }

    /// Merges `other_op`'s effects into this transaction and records it as an
    /// additional parent of the operation to be written.
    pub async fn merge_operation(&mut self, other_op: Operation) -> Result<(), RepoLoaderError> {
        // Find the closest common ancestor operation of our current parents
        // and `other_op`, to use as the merge base.
        let ancestor_op = dag_walk::closest_common_node_ok(
            self.parent_ops.iter().cloned().map(Ok),
            [Ok(other_op.clone())],
            |op: &Operation| op.id().clone(),
            // NOTE(review): `closest_common_node_ok` takes a sync neighbor
            // callback, so the async `parents()` is driven with pollster's
            // `block_on` even though we are inside an async fn.
            |op: &Operation| op.parents().block_on(),
        )?
        .unwrap();
        let repo_loader = self.base_repo().loader();
        let base_repo = repo_loader.load_at(&ancestor_op).await?;
        let other_repo = repo_loader.load_at(&other_op).await?;
        self.parent_ops.push(other_op);
        let merged_repo = self.repo_mut();
        // Three-way merge of the other repo's changes relative to the common
        // ancestor into our mutable repo.
        merged_repo.merge(&base_repo, &other_repo).await?;
        Ok(())
    }

    /// Marks (or unmarks) the operation to be written as a snapshot.
    pub fn set_is_snapshot(&mut self, is_snapshot: bool) {
        self.op_metadata.is_snapshot = is_snapshot;
    }

    /// Records the workspace this operation is associated with.
    pub fn set_workspace_name(&mut self, workspace_name: &WorkspaceName) {
        self.op_metadata.workspace_name = Some(workspace_name.to_owned());
    }

    /// Writes the new operation and immediately publishes it as an op head.
    /// Equivalent to `write(..)` followed by `publish()`.
    pub async fn commit(
        self,
        description: impl Into<String>,
    ) -> Result<Arc<ReadonlyRepo>, TransactionCommitError> {
        self.write(description).await?.publish().await
    }

    /// Writes the accumulated changes as a new operation (view, operation
    /// record, and index), but does not yet make it an operation head. The
    /// caller must `publish()` or `leave_unpublished()` the result.
    pub async fn write(
        mut self,
        description: impl Into<String>,
    ) -> Result<UnpublishedOperation, TransactionCommitError> {
        // Partially moves `mut_repo` out of `self`; `op_metadata`,
        // `parent_ops`, and `end_time` are still used below.
        let mut_repo = self.mut_repo;
        assert!(
            !mut_repo.has_rewrites(),
            "BUG: Descendants have not been rebased after the last rewrites."
        );
        let base_repo = mut_repo.base_repo().clone();
        let (mut_index, view, predecessors) = mut_repo.consume();

        let operation = {
            // Persist the view first so the operation record can reference it.
            let view_id = base_repo.op_store().write_view(view.store_view()).await?;
            self.op_metadata.description = description.into();
            // Stamp the end of the operation's time range now that the work
            // is done (start was set when the metadata was created).
            self.op_metadata.time.end = self.end_time.unwrap_or_else(Timestamp::now);
            let parents = self.parent_ops.iter().map(|op| op.id().clone()).collect();
            let store_operation = op_store::Operation {
                view_id,
                parents,
                metadata: self.op_metadata,
                commit_predecessors: Some(predecessors),
            };
            let new_op_id = base_repo
                .op_store()
                .write_operation(&store_operation)
                .await?;
            Operation::new(base_repo.op_store().clone(), new_op_id, store_operation)
        };

        // Write the index keyed by the new operation, then wrap everything in
        // an UnpublishedOperation so the caller decides when it becomes
        // visible as an op head.
        let index = base_repo.index_store().write_index(mut_index, &operation)?;
        let unpublished = UnpublishedOperation::new(base_repo.loader(), operation, view, index);
        Ok(unpublished)
    }
}
170
171pub fn create_op_metadata(
172 user_settings: &UserSettings,
173 description: String,
174 is_snapshot: bool,
175) -> OperationMetadata {
176 let timestamp = user_settings
177 .operation_timestamp()
178 .unwrap_or_else(Timestamp::now);
179 let hostname = user_settings.operation_hostname().to_owned();
180 let username = user_settings.operation_username().to_owned();
181 OperationMetadata {
182 time: TimestampRange {
183 start: timestamp,
184 end: timestamp,
185 },
186 description,
187 hostname,
188 username,
189 is_snapshot,
190 workspace_name: None,
191 tags: Default::default(),
192 }
193}
194
/// An operation that has been written to the op store but not yet made
/// visible as an operation head.
#[must_use = "Either publish() or leave_unpublished() must be called to finish the operation."]
pub struct UnpublishedOperation {
    // Store used to lock and update the operation heads on publish.
    op_heads_store: Arc<dyn OpHeadsStore>,
    // Readonly repo at the newly written (still unpublished) operation.
    repo: Arc<ReadonlyRepo>,
}
208
209impl UnpublishedOperation {
210 fn new(
211 repo_loader: &RepoLoader,
212 operation: Operation,
213 view: View,
214 index: Box<dyn ReadonlyIndex>,
215 ) -> Self {
216 Self {
217 op_heads_store: repo_loader.op_heads_store().clone(),
218 repo: repo_loader.create_from(operation, view, index),
219 }
220 }
221
222 pub fn operation(&self) -> &Operation {
223 self.repo.operation()
224 }
225
226 pub async fn publish(self) -> Result<Arc<ReadonlyRepo>, TransactionCommitError> {
227 let _lock = self.op_heads_store.lock().await?;
228 self.op_heads_store
229 .update_op_heads(self.operation().parent_ids(), self.operation().id())
230 .await?;
231 Ok(self.repo)
232 }
233
234 pub fn leave_unpublished(self) -> Arc<ReadonlyRepo> {
235 self.repo
236 }
237}