1#![expect(missing_docs)]
16
17use std::sync::Arc;
18
19use itertools::Itertools as _;
20use thiserror::Error;
21
22use crate::backend::Timestamp;
23use crate::dag_walk;
24use crate::index::IndexStoreError;
25use crate::index::ReadonlyIndex;
26use crate::op_heads_store::OpHeadsStore;
27use crate::op_heads_store::OpHeadsStoreError;
28use crate::op_store;
29use crate::op_store::OpStoreError;
30use crate::op_store::OperationMetadata;
31use crate::op_store::TimestampRange;
32use crate::operation::Operation;
33use crate::repo::MutableRepo;
34use crate::repo::ReadonlyRepo;
35use crate::repo::Repo as _;
36use crate::repo::RepoLoader;
37use crate::repo::RepoLoaderError;
38use crate::settings::UserSettings;
39use crate::view::View;
40
/// Error that can occur when committing a transaction as a new operation.
///
/// Wraps failures from the three stores touched while writing an operation:
/// the index store, the operation-heads store, and the operation store.
#[derive(Debug, Error)]
#[error("Failed to commit new operation")]
pub enum TransactionCommitError {
    /// Failed to write the updated index.
    IndexStore(#[from] IndexStoreError),
    /// Failed to lock or update the operation heads.
    OpHeadsStore(#[from] OpHeadsStoreError),
    /// Failed to write the view or operation objects.
    OpStore(#[from] OpStoreError),
}
49
/// An in-progress change to the repo, to be recorded as a new operation.
pub struct Transaction {
    // The repo state being mutated by this transaction.
    mut_repo: MutableRepo,
    // Parents of the operation to be written. Starts as the base repo's
    // operation; `merge_operation()` appends additional parents.
    parent_ops: Vec<Operation>,
    // Metadata (description, tags, timestamps, ...) for the new operation.
    op_metadata: OperationMetadata,
    // If set, used as the operation's end time instead of `Timestamp::now()`
    // when the transaction is written.
    end_time: Option<Timestamp>,
}
67
68impl Transaction {
69 pub fn new(mut_repo: MutableRepo, user_settings: &UserSettings) -> Self {
70 let parent_ops = vec![mut_repo.base_repo().operation().clone()];
71 let op_metadata = create_op_metadata(user_settings, "".to_string(), false);
72 let end_time = user_settings.operation_timestamp();
73 Self {
74 mut_repo,
75 parent_ops,
76 op_metadata,
77 end_time,
78 }
79 }
80
81 pub fn base_repo(&self) -> &Arc<ReadonlyRepo> {
82 self.mut_repo.base_repo()
83 }
84
85 pub fn set_tag(&mut self, key: String, value: String) {
86 self.op_metadata.tags.insert(key, value);
87 }
88
89 pub fn repo(&self) -> &MutableRepo {
90 &self.mut_repo
91 }
92
93 pub fn repo_mut(&mut self) -> &mut MutableRepo {
94 &mut self.mut_repo
95 }
96
97 pub async fn merge_operation(&mut self, other_op: Operation) -> Result<(), RepoLoaderError> {
98 let ancestor_op = dag_walk::closest_common_node_ok(
99 self.parent_ops.iter().cloned().map(Ok),
100 [Ok(other_op.clone())],
101 |op: &Operation| op.id().clone(),
102 |op: &Operation| op.parents().collect_vec(),
103 )?
104 .unwrap();
105 let repo_loader = self.base_repo().loader();
106 let base_repo = repo_loader.load_at(&ancestor_op).await?;
107 let other_repo = repo_loader.load_at(&other_op).await?;
108 self.parent_ops.push(other_op);
109 let merged_repo = self.repo_mut();
110 merged_repo.merge(&base_repo, &other_repo).await?;
111 Ok(())
112 }
113
114 pub fn set_is_snapshot(&mut self, is_snapshot: bool) {
115 self.op_metadata.is_snapshot = is_snapshot;
116 }
117
118 pub async fn commit(
120 self,
121 description: impl Into<String>,
122 ) -> Result<Arc<ReadonlyRepo>, TransactionCommitError> {
123 self.write(description).await?.publish().await
124 }
125
126 pub async fn write(
130 mut self,
131 description: impl Into<String>,
132 ) -> Result<UnpublishedOperation, TransactionCommitError> {
133 let mut_repo = self.mut_repo;
134 assert!(
136 !mut_repo.has_rewrites(),
137 "BUG: Descendants have not been rebased after the last rewrites."
138 );
139 let base_repo = mut_repo.base_repo().clone();
140 let (mut_index, view, predecessors) = mut_repo.consume();
141
142 let operation = {
143 let view_id = base_repo.op_store().write_view(view.store_view()).await?;
144 self.op_metadata.description = description.into();
145 self.op_metadata.time.end = self.end_time.unwrap_or_else(Timestamp::now);
146 let parents = self.parent_ops.iter().map(|op| op.id().clone()).collect();
147 let store_operation = op_store::Operation {
148 view_id,
149 parents,
150 metadata: self.op_metadata,
151 commit_predecessors: Some(predecessors),
152 };
153 let new_op_id = base_repo
154 .op_store()
155 .write_operation(&store_operation)
156 .await?;
157 Operation::new(base_repo.op_store().clone(), new_op_id, store_operation)
158 };
159
160 let index = base_repo.index_store().write_index(mut_index, &operation)?;
161 let unpublished = UnpublishedOperation::new(base_repo.loader(), operation, view, index);
162 Ok(unpublished)
163 }
164}
165
166pub fn create_op_metadata(
167 user_settings: &UserSettings,
168 description: String,
169 is_snapshot: bool,
170) -> OperationMetadata {
171 let timestamp = user_settings
172 .operation_timestamp()
173 .unwrap_or_else(Timestamp::now);
174 let hostname = user_settings.operation_hostname().to_owned();
175 let username = user_settings.operation_username().to_owned();
176 OperationMetadata {
177 time: TimestampRange {
178 start: timestamp,
179 end: timestamp,
180 },
181 description,
182 hostname,
183 username,
184 is_snapshot,
185 tags: Default::default(),
186 }
187}
188
/// An operation that has been written to the operation store but not
/// published as an operation head yet.
#[must_use = "Either publish() or leave_unpublished() must be called to finish the operation."]
pub struct UnpublishedOperation {
    // Store used to atomically update the operation heads on publish.
    op_heads_store: Arc<dyn OpHeadsStore>,
    // The repo loaded at the newly written operation.
    repo: Arc<ReadonlyRepo>,
}
202
203impl UnpublishedOperation {
204 fn new(
205 repo_loader: &RepoLoader,
206 operation: Operation,
207 view: View,
208 index: Box<dyn ReadonlyIndex>,
209 ) -> Self {
210 Self {
211 op_heads_store: repo_loader.op_heads_store().clone(),
212 repo: repo_loader.create_from(operation, view, index),
213 }
214 }
215
216 pub fn operation(&self) -> &Operation {
217 self.repo.operation()
218 }
219
220 pub async fn publish(self) -> Result<Arc<ReadonlyRepo>, TransactionCommitError> {
221 let _lock = self.op_heads_store.lock().await?;
222 self.op_heads_store
223 .update_op_heads(self.operation().parent_ids(), self.operation().id())
224 .await?;
225 Ok(self.repo)
226 }
227
228 pub fn leave_unpublished(self) -> Arc<ReadonlyRepo> {
229 self.repo
230 }
231}