1#![expect(missing_docs)]
16
17use std::sync::Arc;
18
19use itertools::Itertools as _;
20use thiserror::Error;
21
22use crate::backend::Timestamp;
23use crate::dag_walk;
24use crate::index::IndexWriteError;
25use crate::index::ReadonlyIndex;
26use crate::op_heads_store::OpHeadsStore;
27use crate::op_heads_store::OpHeadsStoreError;
28use crate::op_store;
29use crate::op_store::OpStoreError;
30use crate::op_store::OperationMetadata;
31use crate::op_store::TimestampRange;
32use crate::operation::Operation;
33use crate::repo::MutableRepo;
34use crate::repo::ReadonlyRepo;
35use crate::repo::Repo as _;
36use crate::repo::RepoLoader;
37use crate::repo::RepoLoaderError;
38use crate::settings::UserSettings;
39use crate::view::View;
40
41#[derive(Debug, Error)]
43#[error("Failed to commit new operation")]
44pub enum TransactionCommitError {
45 IndexWrite(#[from] IndexWriteError),
46 OpHeadsStore(#[from] OpHeadsStoreError),
47 OpStore(#[from] OpStoreError),
48}
49
50pub struct Transaction {
62 mut_repo: MutableRepo,
63 parent_ops: Vec<Operation>,
64 op_metadata: OperationMetadata,
65 end_time: Option<Timestamp>,
66}
67
68impl Transaction {
69 pub fn new(mut_repo: MutableRepo, user_settings: &UserSettings) -> Self {
70 let parent_ops = vec![mut_repo.base_repo().operation().clone()];
71 let op_metadata = create_op_metadata(user_settings, "".to_string(), false);
72 let end_time = user_settings.operation_timestamp();
73 Self {
74 mut_repo,
75 parent_ops,
76 op_metadata,
77 end_time,
78 }
79 }
80
81 pub fn base_repo(&self) -> &Arc<ReadonlyRepo> {
82 self.mut_repo.base_repo()
83 }
84
85 pub fn set_tag(&mut self, key: String, value: String) {
86 self.op_metadata.tags.insert(key, value);
87 }
88
89 pub fn repo(&self) -> &MutableRepo {
90 &self.mut_repo
91 }
92
93 pub fn repo_mut(&mut self) -> &mut MutableRepo {
94 &mut self.mut_repo
95 }
96
97 pub fn merge_operation(&mut self, other_op: Operation) -> Result<(), RepoLoaderError> {
98 let ancestor_op = dag_walk::closest_common_node_ok(
99 self.parent_ops.iter().cloned().map(Ok),
100 [Ok(other_op.clone())],
101 |op: &Operation| op.id().clone(),
102 |op: &Operation| op.parents().collect_vec(),
103 )?
104 .unwrap();
105 let repo_loader = self.base_repo().loader();
106 let base_repo = repo_loader.load_at(&ancestor_op)?;
107 let other_repo = repo_loader.load_at(&other_op)?;
108 self.parent_ops.push(other_op);
109 let merged_repo = self.repo_mut();
110 merged_repo.merge(&base_repo, &other_repo)?;
111 Ok(())
112 }
113
114 pub fn set_is_snapshot(&mut self, is_snapshot: bool) {
115 self.op_metadata.is_snapshot = is_snapshot;
116 }
117
118 pub fn commit(
120 self,
121 description: impl Into<String>,
122 ) -> Result<Arc<ReadonlyRepo>, TransactionCommitError> {
123 self.write(description)?.publish()
124 }
125
126 pub fn write(
130 mut self,
131 description: impl Into<String>,
132 ) -> Result<UnpublishedOperation, TransactionCommitError> {
133 let mut_repo = self.mut_repo;
134 assert!(
136 !mut_repo.has_rewrites(),
137 "BUG: Descendants have not been rebased after the last rewrites."
138 );
139 let base_repo = mut_repo.base_repo().clone();
140 let (mut_index, view, predecessors) = mut_repo.consume();
141
142 let operation = {
143 let view_id = base_repo.op_store().write_view(view.store_view())?;
144 self.op_metadata.description = description.into();
145 self.op_metadata.time.end = self.end_time.unwrap_or_else(Timestamp::now);
146 let parents = self.parent_ops.iter().map(|op| op.id().clone()).collect();
147 let store_operation = op_store::Operation {
148 view_id,
149 parents,
150 metadata: self.op_metadata,
151 commit_predecessors: Some(predecessors),
152 };
153 let new_op_id = base_repo.op_store().write_operation(&store_operation)?;
154 Operation::new(base_repo.op_store().clone(), new_op_id, store_operation)
155 };
156
157 let index = base_repo.index_store().write_index(mut_index, &operation)?;
158 let unpublished = UnpublishedOperation::new(base_repo.loader(), operation, view, index);
159 Ok(unpublished)
160 }
161}
162
163pub fn create_op_metadata(
164 user_settings: &UserSettings,
165 description: String,
166 is_snapshot: bool,
167) -> OperationMetadata {
168 let timestamp = user_settings
169 .operation_timestamp()
170 .unwrap_or_else(Timestamp::now);
171 let hostname = user_settings.operation_hostname().to_owned();
172 let username = user_settings.operation_username().to_owned();
173 OperationMetadata {
174 time: TimestampRange {
175 start: timestamp,
176 end: timestamp,
177 },
178 description,
179 hostname,
180 username,
181 is_snapshot,
182 tags: Default::default(),
183 }
184}
185
186#[must_use = "Either publish() or leave_unpublished() must be called to finish the operation."]
195pub struct UnpublishedOperation {
196 op_heads_store: Arc<dyn OpHeadsStore>,
197 repo: Arc<ReadonlyRepo>,
198}
199
200impl UnpublishedOperation {
201 fn new(
202 repo_loader: &RepoLoader,
203 operation: Operation,
204 view: View,
205 index: Box<dyn ReadonlyIndex>,
206 ) -> Self {
207 Self {
208 op_heads_store: repo_loader.op_heads_store().clone(),
209 repo: repo_loader.create_from(operation, view, index),
210 }
211 }
212
213 pub fn operation(&self) -> &Operation {
214 self.repo.operation()
215 }
216
217 pub fn publish(self) -> Result<Arc<ReadonlyRepo>, TransactionCommitError> {
218 let _lock = self.op_heads_store.lock()?;
219 self.op_heads_store
220 .update_op_heads(self.operation().parent_ids(), self.operation().id())?;
221 Ok(self.repo)
222 }
223
224 pub fn leave_unpublished(self) -> Arc<ReadonlyRepo> {
225 self.repo
226 }
227}