1#![allow(missing_docs)]
16
17use std::sync::Arc;
18
19use itertools::Itertools as _;
20use thiserror::Error;
21
22use crate::backend::Timestamp;
23use crate::dag_walk;
24use crate::index::IndexWriteError;
25use crate::index::ReadonlyIndex;
26use crate::op_heads_store::OpHeadsStore;
27use crate::op_heads_store::OpHeadsStoreError;
28use crate::op_store;
29use crate::op_store::OpStoreError;
30use crate::op_store::OperationMetadata;
31use crate::operation::Operation;
32use crate::repo::MutableRepo;
33use crate::repo::ReadonlyRepo;
34use crate::repo::Repo as _;
35use crate::repo::RepoLoader;
36use crate::repo::RepoLoaderError;
37use crate::settings::UserSettings;
38use crate::view::View;
39
40#[derive(Debug, Error)]
42#[error("Failed to commit new operation")]
43pub enum TransactionCommitError {
44 IndexWrite(#[from] IndexWriteError),
45 OpHeadsStore(#[from] OpHeadsStoreError),
46 OpStore(#[from] OpStoreError),
47}
48
49pub struct Transaction {
61 mut_repo: MutableRepo,
62 parent_ops: Vec<Operation>,
63 op_metadata: OperationMetadata,
64 end_time: Option<Timestamp>,
65}
66
67impl Transaction {
68 pub fn new(mut_repo: MutableRepo, user_settings: &UserSettings) -> Transaction {
69 let parent_ops = vec![mut_repo.base_repo().operation().clone()];
70 let op_metadata = create_op_metadata(user_settings, "".to_string(), false);
71 let end_time = user_settings.operation_timestamp();
72 Transaction {
73 mut_repo,
74 parent_ops,
75 op_metadata,
76 end_time,
77 }
78 }
79
80 pub fn base_repo(&self) -> &Arc<ReadonlyRepo> {
81 self.mut_repo.base_repo()
82 }
83
84 pub fn set_tag(&mut self, key: String, value: String) {
85 self.op_metadata.tags.insert(key, value);
86 }
87
88 pub fn repo(&self) -> &MutableRepo {
89 &self.mut_repo
90 }
91
92 pub fn repo_mut(&mut self) -> &mut MutableRepo {
93 &mut self.mut_repo
94 }
95
96 pub fn merge_operation(&mut self, other_op: Operation) -> Result<(), RepoLoaderError> {
97 let ancestor_op = dag_walk::closest_common_node_ok(
98 self.parent_ops.iter().cloned().map(Ok),
99 [Ok(other_op.clone())],
100 |op: &Operation| op.id().clone(),
101 |op: &Operation| op.parents().collect_vec(),
102 )?
103 .unwrap();
104 let repo_loader = self.base_repo().loader();
105 let base_repo = repo_loader.load_at(&ancestor_op)?;
106 let other_repo = repo_loader.load_at(&other_op)?;
107 self.parent_ops.push(other_op);
108 let merged_repo = self.repo_mut();
109 merged_repo.merge(&base_repo, &other_repo)?;
110 Ok(())
111 }
112
113 pub fn set_is_snapshot(&mut self, is_snapshot: bool) {
114 self.op_metadata.is_snapshot = is_snapshot;
115 }
116
117 pub fn commit(
119 self,
120 description: impl Into<String>,
121 ) -> Result<Arc<ReadonlyRepo>, TransactionCommitError> {
122 self.write(description)?.publish()
123 }
124
125 pub fn write(
129 mut self,
130 description: impl Into<String>,
131 ) -> Result<UnpublishedOperation, TransactionCommitError> {
132 let mut_repo = self.mut_repo;
133 assert!(
135 !mut_repo.has_rewrites(),
136 "BUG: Descendants have not been rebased after the last rewrites."
137 );
138 let base_repo = mut_repo.base_repo().clone();
139 let (mut_index, view, predecessors) = mut_repo.consume();
140
141 let operation = {
142 let view_id = base_repo.op_store().write_view(view.store_view())?;
143 self.op_metadata.description = description.into();
144 self.op_metadata.end_time = self.end_time.unwrap_or_else(Timestamp::now);
145 let parents = self.parent_ops.iter().map(|op| op.id().clone()).collect();
146 let store_operation = op_store::Operation {
147 view_id,
148 parents,
149 metadata: self.op_metadata,
150 commit_predecessors: Some(predecessors),
151 };
152 let new_op_id = base_repo.op_store().write_operation(&store_operation)?;
153 Operation::new(base_repo.op_store().clone(), new_op_id, store_operation)
154 };
155
156 let index = base_repo.index_store().write_index(mut_index, &operation)?;
157 let unpublished = UnpublishedOperation::new(base_repo.loader(), operation, view, index);
158 Ok(unpublished)
159 }
160}
161
162pub fn create_op_metadata(
163 user_settings: &UserSettings,
164 description: String,
165 is_snapshot: bool,
166) -> OperationMetadata {
167 let start_time = user_settings
168 .operation_timestamp()
169 .unwrap_or_else(Timestamp::now);
170 let end_time = start_time;
171 let hostname = user_settings.operation_hostname().to_owned();
172 let username = user_settings.operation_username().to_owned();
173 OperationMetadata {
174 start_time,
175 end_time,
176 description,
177 hostname,
178 username,
179 is_snapshot,
180 tags: Default::default(),
181 }
182}
183
184#[must_use = "Either publish() or leave_unpublished() must be called to finish the operation."]
193pub struct UnpublishedOperation {
194 op_heads_store: Arc<dyn OpHeadsStore>,
195 repo: Arc<ReadonlyRepo>,
196}
197
198impl UnpublishedOperation {
199 fn new(
200 repo_loader: &RepoLoader,
201 operation: Operation,
202 view: View,
203 index: Box<dyn ReadonlyIndex>,
204 ) -> Self {
205 UnpublishedOperation {
206 op_heads_store: repo_loader.op_heads_store().clone(),
207 repo: repo_loader.create_from(operation, view, index),
208 }
209 }
210
211 pub fn operation(&self) -> &Operation {
212 self.repo.operation()
213 }
214
215 pub fn publish(self) -> Result<Arc<ReadonlyRepo>, TransactionCommitError> {
216 let _lock = self.op_heads_store.lock()?;
217 self.op_heads_store
218 .update_op_heads(self.operation().parent_ids(), self.operation().id())?;
219 Ok(self.repo)
220 }
221
222 pub fn leave_unpublished(self) -> Arc<ReadonlyRepo> {
223 self.repo
224 }
225}