reifydb_sub_flow/transaction/
commit.rs1use diagnostic::flow::flow_transaction_keyspace_overlap;
5use reifydb_core::interface::MultiVersionCommandTransaction;
6use reifydb_engine::StandardCommandTransaction;
7use reifydb_type::{diagnostic, return_error, util::hex};
8use tracing::instrument;
9
10use super::{FlowTransaction, FlowTransactionMetrics, Pending};
11
12impl FlowTransaction {
13 #[instrument(name = "flow::transaction::commit", level = "debug", skip(self, parent), fields(
28 pending_count = self.pending.len(),
29 writes,
30 removes
31 ))]
32 pub async fn commit(
33 &mut self,
34 parent: &mut StandardCommandTransaction,
35 ) -> crate::Result<FlowTransactionMetrics> {
36 {
39 let parent_pending = parent.pending_writes();
40 for (key, _) in self.pending.iter_sorted() {
41 if parent_pending.contains_key(key) {
43 return_error!(flow_transaction_keyspace_overlap(hex::encode(key.as_ref())));
44 }
45 }
46 }
47
48 let mut set_count = 0;
49 let mut remove_count = 0;
50 for (key, pending) in self.pending.iter_sorted() {
51 match pending {
52 Pending::Set(value) => {
53 parent.set(key, value.clone()).await?;
54 set_count += 1;
55 }
56 Pending::Remove => {
57 parent.remove(key).await?;
58 remove_count += 1;
59 }
60 }
61 }
62
63 tracing::Span::current().record("sets", set_count);
64 tracing::Span::current().record("removes", remove_count);
65
66 self.pending.clear();
67 Ok(self.metrics.clone())
68 }
69}
70
71#[cfg(test)]
72mod tests {
73 use reifydb_core::CommitVersion;
74
75 use super::*;
76 use crate::{
77 operator::stateful::test_utils::test::create_test_transaction,
78 transaction::utils::test::{from_store, make_key, make_value},
79 };
80
81 #[tokio::test]
82 async fn test_commit_empty_pending() {
83 let mut parent = create_test_transaction().await;
84 let mut txn = FlowTransaction::new(&parent, CommitVersion(1)).await;
85
86 let metrics = txn.commit(&mut parent).await.unwrap();
87
88 assert_eq!(metrics.reads, 0);
90 assert_eq!(metrics.writes, 0);
91 assert_eq!(metrics.removes, 0);
92 }
93
94 #[tokio::test]
95 async fn test_commit_single_write() {
96 let mut parent = create_test_transaction().await;
97 let mut txn = FlowTransaction::new(&parent, CommitVersion(1)).await;
98
99 let key = make_key("key1");
100 let value = make_value("value1");
101 txn.set(&key, value.clone()).unwrap();
102
103 txn.commit(&mut parent).await.unwrap();
104
105 assert_eq!(from_store(&mut parent, &key).await, Some(value));
107 }
108
109 #[tokio::test]
110 async fn test_commit_multiple_writes() {
111 let mut parent = create_test_transaction().await;
112 let mut txn = FlowTransaction::new(&parent, CommitVersion(1)).await;
113
114 txn.set(&make_key("key1"), make_value("value1")).unwrap();
115 txn.set(&make_key("key2"), make_value("value2")).unwrap();
116 txn.set(&make_key("key3"), make_value("value3")).unwrap();
117
118 txn.commit(&mut parent).await.unwrap();
119
120 assert_eq!(from_store(&mut parent, &make_key("key1")).await, Some(make_value("value1")));
122 assert_eq!(from_store(&mut parent, &make_key("key2")).await, Some(make_value("value2")));
123 assert_eq!(from_store(&mut parent, &make_key("key3")).await, Some(make_value("value3")));
124 }
125
126 #[tokio::test]
127 async fn test_commit_removes() {
128 use reifydb_core::interface::Engine;
129
130 use crate::operator::stateful::test_utils::test::create_test_engine;
131
132 let engine = create_test_engine().await;
133 let mut parent = engine.begin_command().await.unwrap();
134
135 let key1 = make_key("key1");
137 let key2 = make_key("key2");
138 parent.set(&key1, make_value("value1")).await.unwrap();
139 parent.set(&key2, make_value("value2")).await.unwrap();
140 let commit_version = parent.commit().await.unwrap();
141
142 let mut parent = engine.begin_command().await.unwrap();
144
145 assert_eq!(from_store(&mut parent, &key1).await, Some(make_value("value1")));
147 assert_eq!(from_store(&mut parent, &key2).await, Some(make_value("value2")));
148
149 let mut txn = FlowTransaction::new(&parent, commit_version).await;
151 txn.remove(&key1).unwrap();
152 txn.remove(&key2).unwrap();
153
154 txn.commit(&mut parent).await.unwrap();
155
156 parent.commit().await.unwrap();
158
159 let mut parent = engine.begin_command().await.unwrap();
161 assert_eq!(from_store(&mut parent, &key1).await, None);
162 assert_eq!(from_store(&mut parent, &key2).await, None);
163 }
164
165 #[tokio::test]
166 async fn test_commit_mixed_writes_and_removes() {
167 use reifydb_core::interface::Engine;
168
169 use crate::operator::stateful::test_utils::test::create_test_engine;
170
171 let engine = create_test_engine().await;
172 let mut parent = engine.begin_command().await.unwrap();
173
174 let existing_key = make_key("existing");
176 parent.set(&existing_key, make_value("old")).await.unwrap();
177 let commit_version = parent.commit().await.unwrap();
178
179 let mut parent = engine.begin_command().await.unwrap();
181
182 assert_eq!(from_store(&mut parent, &existing_key).await, Some(make_value("old")));
184
185 let mut txn = FlowTransaction::new(&parent, commit_version).await;
187
188 let new_key = make_key("new");
190 txn.set(&new_key, make_value("value")).unwrap();
191 txn.remove(&existing_key).unwrap();
192
193 txn.commit(&mut parent).await.unwrap();
194
195 parent.commit().await.unwrap();
197
198 let mut parent = engine.begin_command().await.unwrap();
200 assert_eq!(from_store(&mut parent, &new_key).await, Some(make_value("value")));
201 assert_eq!(from_store(&mut parent, &existing_key).await, None);
202 }
203
204 #[tokio::test]
205 async fn test_commit_returns_metrics() {
206 let mut parent = create_test_transaction().await;
207 let mut txn = FlowTransaction::new(&parent, CommitVersion(1)).await;
208
209 txn.set(&make_key("key1"), make_value("value1")).unwrap();
210 txn.get(&make_key("key2")).await.unwrap();
211 txn.remove(&make_key("key3")).unwrap();
212
213 let metrics = txn.commit(&mut parent).await.unwrap();
214
215 assert_eq!(metrics.writes, 1);
216 assert_eq!(metrics.reads, 1);
217 assert_eq!(metrics.removes, 1);
218 }
219
220 #[tokio::test]
221 async fn test_commit_overwrites_storage_value() {
222 use reifydb_core::interface::Engine;
223
224 use crate::operator::stateful::test_utils::test::create_test_engine;
225
226 let engine = create_test_engine().await;
227 let mut parent = engine.begin_command().await.unwrap();
228
229 let key = make_key("key1");
231 parent.set(&key, make_value("old")).await.unwrap();
232 let commit_version = parent.commit().await.unwrap();
233
234 let mut parent = engine.begin_command().await.unwrap();
236
237 assert_eq!(from_store(&mut parent, &key).await, Some(make_value("old")));
239
240 let mut txn = FlowTransaction::new(&parent, commit_version).await;
242 txn.set(&key, make_value("new")).unwrap();
243 txn.commit(&mut parent).await.unwrap();
244
245 assert_eq!(from_store(&mut parent, &key).await, Some(make_value("new")));
247 }
248
249 #[tokio::test]
250 async fn test_sequential_commits_different_keys() {
251 let mut parent = create_test_transaction().await;
252
253 let mut txn1 = FlowTransaction::new(&parent, CommitVersion(1)).await;
257 txn1.set(&make_key("key1"), make_value("value1")).unwrap();
258 txn1.commit(&mut parent).await.unwrap();
259
260 let mut txn2 = FlowTransaction::new(&parent, CommitVersion(2)).await;
262 txn2.set(&make_key("key2"), make_value("value2")).unwrap();
263 txn2.commit(&mut parent).await.unwrap();
264
265 assert_eq!(from_store(&mut parent, &make_key("key1")).await, Some(make_value("value1")));
267 assert_eq!(from_store(&mut parent, &make_key("key2")).await, Some(make_value("value2")));
268 }
269
270 #[tokio::test]
271 async fn test_same_key_multiple_overwrites() {
272 let mut parent = create_test_transaction().await;
273 let mut txn = FlowTransaction::new(&parent, CommitVersion(1)).await;
274
275 let key = make_key("key1");
276
277 txn.set(&key, make_value("first")).unwrap();
279 txn.remove(&key).unwrap();
280
281 assert!(txn.pending.is_removed(&key));
283
284 txn.set(&key, make_value("second")).unwrap();
286 txn.remove(&key).unwrap();
287 txn.set(&key, make_value("final")).unwrap();
288
289 assert_eq!(txn.pending.get(&key), Some(&make_value("final")));
291
292 txn.commit(&mut parent).await.unwrap();
294
295 assert_eq!(from_store(&mut parent, &key).await, Some(make_value("final")));
297 }
298
299 #[tokio::test]
300 async fn test_commit_detects_overlapping_writes() {
301 let mut parent = create_test_transaction().await;
302
303 let key = make_key("key1");
304
305 let mut txn1 = FlowTransaction::new(&parent, CommitVersion(1)).await;
307 let mut txn2 = FlowTransaction::new(&parent, CommitVersion(2)).await;
308
309 txn1.set(&key, make_value("value1")).unwrap();
311 txn2.set(&key, make_value("value2")).unwrap();
312
313 txn1.commit(&mut parent).await.unwrap();
315
316 let result = txn2.commit(&mut parent).await;
319 assert!(result.is_err());
320
321 let err = result.unwrap_err();
323 assert_eq!(err.code, "FLOW_002");
324 }
325
326 #[tokio::test]
327 async fn test_double_commit_prevention() {
328 let mut parent = create_test_transaction().await;
329
330 let mut txn = FlowTransaction::new(&parent, CommitVersion(1)).await;
331 txn.set(&make_key("key1"), make_value("value1")).unwrap();
332
333 let metrics = txn.commit(&mut parent).await;
335 assert!(metrics.is_ok(), "First commit should succeed");
336
337 }
343
344 #[tokio::test]
345 async fn test_commit_allows_nonoverlapping_writes() {
346 let mut parent = create_test_transaction().await;
347
348 let mut txn1 = FlowTransaction::new(&parent, CommitVersion(1)).await;
350 txn1.set(&make_key("key1"), make_value("value1")).unwrap();
351 txn1.commit(&mut parent).await.unwrap();
352
353 let mut txn2 = FlowTransaction::new(&parent, CommitVersion(2)).await;
356 txn2.set(&make_key("key2"), make_value("value2")).unwrap();
357 let result = txn2.commit(&mut parent).await;
358
359 assert!(result.is_ok());
361
362 assert_eq!(from_store(&mut parent, &make_key("key1")).await, Some(make_value("value1")));
364 assert_eq!(from_store(&mut parent, &make_key("key2")).await, Some(make_value("value2")));
365 }
366}