1#![expect(missing_docs)]
16
17use std::sync::Arc;
18
19use itertools::Itertools as _;
20use pollster::FutureExt as _;
21use thiserror::Error;
22
23use crate::backend::Timestamp;
24use crate::dag_walk;
25use crate::index::IndexStoreError;
26use crate::index::ReadonlyIndex;
27use crate::op_heads_store::OpHeadsStore;
28use crate::op_heads_store::OpHeadsStoreError;
29use crate::op_store;
30use crate::op_store::OpStoreError;
31use crate::op_store::OperationMetadata;
32use crate::op_store::TimestampRange;
33use crate::operation::Operation;
34use crate::repo::MutableRepo;
35use crate::repo::ReadonlyRepo;
36use crate::repo::Repo as _;
37use crate::repo::RepoLoader;
38use crate::repo::RepoLoaderError;
39use crate::settings::UserSettings;
40use crate::view::View;
41
42#[derive(Debug, Error)]
44#[error("Failed to commit new operation")]
45pub enum TransactionCommitError {
46 IndexStore(#[from] IndexStoreError),
47 OpHeadsStore(#[from] OpHeadsStoreError),
48 OpStore(#[from] OpStoreError),
49}
50
51pub struct Transaction {
63 mut_repo: MutableRepo,
64 parent_ops: Vec<Operation>,
65 op_metadata: OperationMetadata,
66 end_time: Option<Timestamp>,
67}
68
69impl Transaction {
70 pub fn new(mut_repo: MutableRepo, user_settings: &UserSettings) -> Self {
71 let parent_ops = vec![mut_repo.base_repo().operation().clone()];
72 let op_metadata = create_op_metadata(user_settings, "".to_string(), false);
73 let end_time = user_settings.operation_timestamp();
74 Self {
75 mut_repo,
76 parent_ops,
77 op_metadata,
78 end_time,
79 }
80 }
81
82 pub fn base_repo(&self) -> &Arc<ReadonlyRepo> {
83 self.mut_repo.base_repo()
84 }
85
86 pub fn set_tag(&mut self, key: String, value: String) {
87 self.op_metadata.tags.insert(key, value);
88 }
89
90 pub fn repo(&self) -> &MutableRepo {
91 &self.mut_repo
92 }
93
94 pub fn repo_mut(&mut self) -> &mut MutableRepo {
95 &mut self.mut_repo
96 }
97
98 pub fn merge_operation(&mut self, other_op: Operation) -> Result<(), RepoLoaderError> {
99 let ancestor_op = dag_walk::closest_common_node_ok(
100 self.parent_ops.iter().cloned().map(Ok),
101 [Ok(other_op.clone())],
102 |op: &Operation| op.id().clone(),
103 |op: &Operation| op.parents().collect_vec(),
104 )?
105 .unwrap();
106 let repo_loader = self.base_repo().loader();
107 let base_repo = repo_loader.load_at(&ancestor_op)?;
108 let other_repo = repo_loader.load_at(&other_op)?;
109 self.parent_ops.push(other_op);
110 let merged_repo = self.repo_mut();
111 merged_repo.merge(&base_repo, &other_repo)?;
112 Ok(())
113 }
114
115 pub fn set_is_snapshot(&mut self, is_snapshot: bool) {
116 self.op_metadata.is_snapshot = is_snapshot;
117 }
118
119 pub fn commit(
121 self,
122 description: impl Into<String>,
123 ) -> Result<Arc<ReadonlyRepo>, TransactionCommitError> {
124 self.write(description)?.publish()
125 }
126
127 pub fn write(
131 mut self,
132 description: impl Into<String>,
133 ) -> Result<UnpublishedOperation, TransactionCommitError> {
134 let mut_repo = self.mut_repo;
135 assert!(
137 !mut_repo.has_rewrites(),
138 "BUG: Descendants have not been rebased after the last rewrites."
139 );
140 let base_repo = mut_repo.base_repo().clone();
141 let (mut_index, view, predecessors) = mut_repo.consume();
142
143 let operation = {
144 let view_id = base_repo
145 .op_store()
146 .write_view(view.store_view())
147 .block_on()?;
148 self.op_metadata.description = description.into();
149 self.op_metadata.time.end = self.end_time.unwrap_or_else(Timestamp::now);
150 let parents = self.parent_ops.iter().map(|op| op.id().clone()).collect();
151 let store_operation = op_store::Operation {
152 view_id,
153 parents,
154 metadata: self.op_metadata,
155 commit_predecessors: Some(predecessors),
156 };
157 let new_op_id = base_repo
158 .op_store()
159 .write_operation(&store_operation)
160 .block_on()?;
161 Operation::new(base_repo.op_store().clone(), new_op_id, store_operation)
162 };
163
164 let index = base_repo.index_store().write_index(mut_index, &operation)?;
165 let unpublished = UnpublishedOperation::new(base_repo.loader(), operation, view, index);
166 Ok(unpublished)
167 }
168}
169
170pub fn create_op_metadata(
171 user_settings: &UserSettings,
172 description: String,
173 is_snapshot: bool,
174) -> OperationMetadata {
175 let timestamp = user_settings
176 .operation_timestamp()
177 .unwrap_or_else(Timestamp::now);
178 let hostname = user_settings.operation_hostname().to_owned();
179 let username = user_settings.operation_username().to_owned();
180 OperationMetadata {
181 time: TimestampRange {
182 start: timestamp,
183 end: timestamp,
184 },
185 description,
186 hostname,
187 username,
188 is_snapshot,
189 tags: Default::default(),
190 }
191}
192
193#[must_use = "Either publish() or leave_unpublished() must be called to finish the operation."]
202pub struct UnpublishedOperation {
203 op_heads_store: Arc<dyn OpHeadsStore>,
204 repo: Arc<ReadonlyRepo>,
205}
206
207impl UnpublishedOperation {
208 fn new(
209 repo_loader: &RepoLoader,
210 operation: Operation,
211 view: View,
212 index: Box<dyn ReadonlyIndex>,
213 ) -> Self {
214 Self {
215 op_heads_store: repo_loader.op_heads_store().clone(),
216 repo: repo_loader.create_from(operation, view, index),
217 }
218 }
219
220 pub fn operation(&self) -> &Operation {
221 self.repo.operation()
222 }
223
224 pub fn publish(self) -> Result<Arc<ReadonlyRepo>, TransactionCommitError> {
225 let _lock = self.op_heads_store.lock().block_on()?;
226 self.op_heads_store
227 .update_op_heads(self.operation().parent_ids(), self.operation().id())
228 .block_on()?;
229 Ok(self.repo)
230 }
231
232 pub fn leave_unpublished(self) -> Arc<ReadonlyRepo> {
233 self.repo
234 }
235}