reifydb_transaction/single/
mod.rs1use std::sync::Arc;
5
6use crossbeam_skiplist::SkipMap;
7use reifydb_core::{
8 delta::Delta,
9 encoded::{key::EncodedKey, row::EncodedRow},
10 event::EventBus,
11 interface::WithEventBus,
12};
13use reifydb_runtime::sync::rwlock::RwLock;
14use reifydb_store_single::SingleStore;
15
16pub mod read;
17pub mod write;
18
19use read::{KeyReadLock, SingleReadTransaction};
20use reifydb_runtime::{
21 actor::system::ActorSystem,
22 context::clock::Clock,
23 pool::{PoolConfig, Pools},
24};
25use reifydb_type::Result;
26use write::{KeyWriteLock, SingleWriteTransaction};
27
/// Handle to the single-version transaction layer.
///
/// Cloning the handle only clones the inner `Arc`, so every clone shares the
/// same store, event bus and per-key lock table.
#[derive(Clone)]
pub struct SingleTransaction {
    // Shared state behind the handle; see `SingleTransactionInner`.
    inner: Arc<SingleTransactionInner>,
}
32
33struct SingleTransactionInner {
34 store: RwLock<SingleStore>,
35 event_bus: EventBus,
36 key_locks: SkipMap<EncodedKey, Arc<RwLock<()>>>,
37}
38
39impl SingleTransactionInner {
40 fn get_or_create_lock(&self, key: &EncodedKey) -> Arc<RwLock<()>> {
41 if let Some(entry) = self.key_locks.get(key) {
43 return entry.value().clone();
44 }
45
46 let lock = Arc::new(RwLock::new(()));
48 self.key_locks.insert(key.clone(), lock.clone());
49 lock
50 }
51}
52
53impl SingleTransaction {
54 pub fn new(store: SingleStore, event_bus: EventBus) -> Self {
55 Self {
56 inner: Arc::new(SingleTransactionInner {
57 store: RwLock::new(store),
58 event_bus,
59 key_locks: SkipMap::new(),
60 }),
61 }
62 }
63
64 pub fn testing() -> Self {
65 let pools = Pools::new(PoolConfig::default());
66 let actor_system = ActorSystem::new(pools, Clock::Real);
67 Self::new(SingleStore::testing_memory(), EventBus::new(&actor_system))
68 }
69
70 pub fn with_query<'a, I, F, R>(&self, keys: I, f: F) -> Result<R>
72 where
73 I: IntoIterator<Item = &'a EncodedKey>,
74 F: FnOnce(&mut SingleReadTransaction<'_>) -> Result<R>,
75 {
76 let mut tx = self.begin_query(keys)?;
77 f(&mut tx)
78 }
79
80 pub fn with_command<'a, I, F, R>(&self, keys: I, f: F) -> Result<R>
82 where
83 I: IntoIterator<Item = &'a EncodedKey>,
84 F: FnOnce(&mut SingleWriteTransaction<'_>) -> Result<R>,
85 {
86 let mut tx = self.begin_command(keys)?;
87 let result = f(&mut tx)?;
88 tx.commit()?;
89 Ok(result)
90 }
91
92 pub fn begin_query<'a, I>(&self, keys: I) -> Result<SingleReadTransaction<'_>>
93 where
94 I: IntoIterator<Item = &'a EncodedKey>,
95 {
96 let mut keys_vec: Vec<EncodedKey> = keys.into_iter().cloned().collect();
97 assert!(
98 !keys_vec.is_empty(),
99 "SVL transactions must declare keys upfront - empty keysets are not allowed"
100 );
101
102 keys_vec.sort();
104
105 let mut locks = Vec::new();
107 for key in &keys_vec {
108 let arc = self.inner.get_or_create_lock(key);
109 locks.push(KeyReadLock::new(arc));
110 }
111
112 Ok(SingleReadTransaction {
113 inner: &self.inner,
114 keys: keys_vec,
115 _key_locks: locks,
116 })
117 }
118
119 pub fn begin_command<'a, I>(&self, keys: I) -> Result<SingleWriteTransaction<'_>>
120 where
121 I: IntoIterator<Item = &'a EncodedKey>,
122 {
123 let mut keys_vec: Vec<EncodedKey> = keys.into_iter().cloned().collect();
124 assert!(
125 !keys_vec.is_empty(),
126 "SVL transactions must declare keys upfront - empty keysets are not allowed"
127 );
128
129 keys_vec.sort();
131
132 let mut locks = Vec::new();
134 for key in &keys_vec {
135 let arc = self.inner.get_or_create_lock(key);
136 locks.push(KeyWriteLock::new(arc));
137 }
138
139 Ok(SingleWriteTransaction::new(&self.inner, keys_vec, locks))
140 }
141}
142
143impl WithEventBus for SingleTransaction {
144 fn event_bus(&self) -> &EventBus {
145 &self.inner.event_bus
146 }
147}
148
149#[cfg(test)]
150pub mod tests {
151 use std::{
152 iter,
153 sync::{Arc, Barrier},
154 thread,
155 time::Duration,
156 };
157
158 use reifydb_type::util::cowvec::CowVec;
159
160 use super::*;
161
162 fn make_key(s: &str) -> EncodedKey {
163 EncodedKey(CowVec::new(s.as_bytes().to_vec()))
164 }
165
166 fn make_value(s: &str) -> EncodedRow {
167 EncodedRow(CowVec::new(s.as_bytes().to_vec()))
168 }
169
170 fn create_test_svl() -> SingleTransaction {
171 SingleTransaction::testing()
172 }
173
/// A query may read a key it declared when it began.
#[test]
fn test_allowed_key_query() {
    let svl = create_test_svl();
    let declared = make_key("test_key");

    let mut query = svl.begin_query([&declared]).unwrap();

    // The key was never written; only the success of the lookup matters.
    let outcome = query.get(&declared);
    assert!(outcome.is_ok());
}
186
/// A query reading a key outside its declared set fails with `TXN_010`.
#[test]
fn test_disallowed_key_query() {
    let svl = create_test_svl();
    let declared = make_key("allowed");
    let undeclared = make_key("disallowed");

    let mut query = svl.begin_query([&declared]).unwrap();

    // The declared key is readable.
    assert!(query.get(&declared).is_ok());

    // The undeclared key is rejected with the expected error code.
    let outcome = query.get(&undeclared);
    assert!(outcome.is_err());
    let err = outcome.unwrap_err();
    assert_eq!(err.0.code, "TXN_010");
}
205
/// Beginning a query without any declared key must panic.
#[test]
#[should_panic(expected = "SVL transactions must declare keys upfront - empty keysets are not allowed")]
fn test_empty_keyset_query_panics() {
    let svl = create_test_svl();

    // The panic is raised while the transaction is being set up.
    let _query = svl.begin_query(iter::empty());
}
214
/// Beginning a command without any declared key must panic.
#[test]
#[should_panic(expected = "SVL transactions must declare keys upfront - empty keysets are not allowed")]
fn test_empty_keyset_command_panics() {
    let svl = create_test_svl();

    // The panic is raised while the transaction is being set up.
    let _command = svl.begin_command(iter::empty());
}
223
/// A command may write, read back and commit a key it declared.
#[test]
fn test_allowed_key_command() {
    let svl = create_test_svl();
    let declared = make_key("test_key");
    let payload = make_value("test_value");

    let mut command = svl.begin_command([&declared]).unwrap();

    let written = command.set(&declared, payload.clone());
    assert!(written.is_ok());

    let read_back = command.get(&declared);
    assert!(read_back.is_ok());

    let committed = command.commit();
    assert!(committed.is_ok());
}
238
/// A command writing a key outside its declared set fails with `TXN_010`.
#[test]
fn test_disallowed_key_command() {
    let svl = create_test_svl();
    let declared = make_key("allowed");
    let undeclared = make_key("disallowed");
    let payload = make_value("test_value");

    let mut command = svl.begin_command([&declared]).unwrap();

    // Writing the declared key succeeds.
    assert!(command.set(&declared, payload.clone()).is_ok());

    // Writing the undeclared key is rejected with the expected error code.
    let outcome = command.set(&undeclared, payload);
    assert!(outcome.is_err());
    let err = outcome.unwrap_err();
    assert_eq!(err.0.code, "TXN_010");
}
258
/// Values committed by a command are visible to a later query.
#[test]
fn test_command_commit_with_valid_keys() {
    let svl = create_test_svl();
    let first_key = make_key("key1");
    let second_key = make_key("key2");
    let first_value = make_value("value1");
    let second_value = make_value("value2");

    // Write both keys and commit; the scope ends the command before the query.
    {
        let mut command = svl.begin_command([&first_key, &second_key]).unwrap();
        command.set(&first_key, first_value.clone()).unwrap();
        command.set(&second_key, second_value.clone()).unwrap();
        command.commit().unwrap();
    }

    // Read both keys back in a separate transaction.
    {
        let mut query = svl.begin_query([&first_key, &second_key]).unwrap();
        let first_read = query.get(&first_key).unwrap();
        let second_read = query.get(&second_key).unwrap();
        assert!(first_read.is_some());
        assert!(second_read.is_some());
        assert_eq!(first_read.unwrap().row, first_value);
        assert_eq!(second_read.unwrap().row, second_value);
    }
}
286
/// A value written and then rolled back is not visible afterwards.
#[test]
fn test_rollback_with_scoped_keys() {
    let svl = create_test_svl();
    let key = make_key("test_key");
    let payload = make_value("test_value");

    // Write the key, then discard the change.
    {
        let mut command = svl.begin_command([&key]).unwrap();
        command.set(&key, payload).unwrap();
        command.rollback().unwrap();
    }

    // A later query must not find the key.
    {
        let mut query = svl.begin_query([&key]).unwrap();
        let found = query.get(&key).unwrap();
        assert!(found.is_none());
    }
}
307
/// Five threads read the same committed key and all see the committed value.
#[test]
fn test_concurrent_reads() {
    let svl = Arc::new(create_test_svl());
    let key = make_key("shared_key");
    let value = make_value("shared_value");

    // Seed the store before any reader starts.
    {
        let mut command = svl.begin_command([&key]).unwrap();
        command.set(&key, value.clone()).unwrap();
        command.commit().unwrap();
    }

    // `collect` spawns every reader before the first join below.
    let readers: Vec<_> = (0..5)
        .map(|_| {
            let svl_handle = Arc::clone(&svl);
            let reader_key = key.clone();
            let expected = value.clone();

            thread::spawn(move || {
                let mut query = svl_handle.begin_query([&reader_key]).unwrap();
                let found = query.get(&reader_key).unwrap();
                assert!(found.is_some());
                assert_eq!(found.unwrap().row, expected);
            })
        })
        .collect();

    // A panic inside a reader surfaces here as a failed join.
    for reader in readers {
        reader.join().unwrap();
    }
}
342
/// Five threads each write their own key; every write is visible afterwards.
#[test]
fn test_concurrent_writers_disjoint_keys() {
    let svl = Arc::new(create_test_svl());

    // Writer `i` only touches `key_i`, so no two writers share a key.
    let writers: Vec<_> = (0..5)
        .map(|i| {
            let svl_handle = Arc::clone(&svl);
            let key = make_key(&format!("key_{}", i));
            let value = make_value(&format!("value_{}", i));

            thread::spawn(move || {
                let mut command = svl_handle.begin_command([&key]).unwrap();
                command.set(&key, value).unwrap();
                command.commit().unwrap();
            })
        })
        .collect();

    for writer in writers {
        writer.join().unwrap();
    }

    // Each key must now hold the value written by its own thread.
    for i in 0..5 {
        let key = make_key(&format!("key_{}", i));
        let expected = make_value(&format!("value_{}", i));

        let mut query = svl.begin_query([&key]).unwrap();
        let found = query.get(&key).unwrap();
        assert!(found.is_some());
        assert_eq!(found.unwrap().row, expected);
    }
}
378
/// Three readers of `key1` run alongside one writer of `key2`.
#[test]
fn test_concurrent_readers_and_writer() {
    let svl = Arc::new(create_test_svl());
    let key1 = make_key("key1");
    let key2 = make_key("key2");
    let value1 = make_value("value1");
    let value2 = make_value("value2");

    // Seed both keys.
    {
        let mut command = svl.begin_command([&key1, &key2]).unwrap();
        command.set(&key1, value1.clone()).unwrap();
        command.set(&key2, value2.clone()).unwrap();
        command.commit().unwrap();
    }

    // Readers only touch key1, which the writer never modifies.
    let mut workers: Vec<_> = (0..3)
        .map(|_| {
            let svl_handle = Arc::clone(&svl);
            let reader_key = key1.clone();
            let expected = value1.clone();

            thread::spawn(move || {
                let mut query = svl_handle.begin_query([&reader_key]).unwrap();
                let found = query.get(&reader_key).unwrap();
                assert!(found.is_some());
                assert_eq!(found.unwrap().row, expected);
            })
        })
        .collect();

    // The single writer takes ownership of key2 and overwrites it.
    let writer_svl = Arc::clone(&svl);
    let replacement = make_value("new_value2");
    workers.push(thread::spawn(move || {
        let mut command = writer_svl.begin_command([&key2]).unwrap();
        command.set(&key2, replacement).unwrap();
        command.commit().unwrap();
    }));

    for worker in workers {
        worker.join().unwrap();
    }
}
426
/// Ten threads mix reads and writes over three keys; none of them may panic.
#[test]
fn test_no_panics_with_rwlock() {
    let svl = Arc::new(create_test_svl());

    let workers: Vec<_> = (0..10)
        .map(|i| {
            let svl_handle = Arc::clone(&svl);
            // `i % 3` folds the ten workers onto three keys so they contend.
            let key = make_key(&format!("key_{}", i % 3));
            let value = make_value(&format!("value_{}", i));

            thread::spawn(move || {
                let is_writer = i % 2 == 0;
                if is_writer {
                    // Results are ignored on purpose: only panics matter here.
                    let mut command = svl_handle.begin_command([&key]).unwrap();
                    let _ = command.set(&key, value);
                    let _ = command.commit();
                } else {
                    let mut query = svl_handle.begin_query([&key]).unwrap();
                    let _ = query.get(&key);
                }
            })
        })
        .collect();

    // A worker panic would make its join return `Err`.
    for worker in workers {
        worker.join().unwrap();
    }
}
457
/// Two commands on the same key: the one started later must commit last, so
/// its value is the one that remains.
#[test]
fn test_write_blocks_concurrent_write() {
    let svl = Arc::new(create_test_svl());
    let key = make_key("blocking_key");
    let gate = Arc::new(Barrier::new(2));

    // First writer: begins and writes before the barrier, commits ~100ms after.
    let first = {
        let svl_handle = Arc::clone(&svl);
        let writer_key = key.clone();
        let gate = Arc::clone(&gate);
        thread::spawn(move || {
            let mut command = svl_handle.begin_command([&writer_key]).unwrap();
            command.set(&writer_key, make_value("value1")).unwrap();

            gate.wait();

            thread::sleep(Duration::from_millis(100));

            command.commit().unwrap();
        })
    };

    // Second writer: starts only after the barrier, i.e. after the first
    // command has already begun.
    let second = {
        let svl_handle = Arc::clone(&svl);
        let writer_key = key.clone();
        let gate = Arc::clone(&gate);
        thread::spawn(move || {
            gate.wait();

            thread::sleep(Duration::from_millis(10));

            let mut command = svl_handle.begin_command([&writer_key]).unwrap();
            command.set(&writer_key, make_value("value2")).unwrap();
            command.commit().unwrap();
        })
    };

    first.join().unwrap();
    second.join().unwrap();

    // The second writer's value must be the surviving one.
    let mut query = svl.begin_query([&key]).unwrap();
    let found = query.get(&key).unwrap();
    assert!(found.is_some());
    assert_eq!(found.unwrap().row, make_value("value2"));
}
507
/// A query started while a command on the same key is still open must
/// observe the value that command commits.
#[test]
fn test_write_blocks_concurrent_read() {
    let svl = Arc::new(create_test_svl());
    let key = make_key("blocking_key");

    // Seed an initial value so the reader could, in principle, see stale data.
    {
        let mut command = svl.begin_command([&key]).unwrap();
        command.set(&key, make_value("initial")).unwrap();
        command.commit().unwrap();
    }

    let gate = Arc::new(Barrier::new(2));

    // Writer: begins and writes before the barrier, commits ~100ms after.
    let writer = {
        let svl_handle = Arc::clone(&svl);
        let writer_key = key.clone();
        let gate = Arc::clone(&gate);
        thread::spawn(move || {
            let mut command = svl_handle.begin_command([&writer_key]).unwrap();
            command.set(&writer_key, make_value("updated")).unwrap();

            gate.wait();

            thread::sleep(Duration::from_millis(100));

            command.commit().unwrap();
        })
    };

    // Reader: starts only after the barrier, while the command is still open.
    let reader = {
        let svl_handle = Arc::clone(&svl);
        let reader_key = key.clone();
        let gate = Arc::clone(&gate);
        thread::spawn(move || {
            gate.wait();

            thread::sleep(Duration::from_millis(10));

            let mut query = svl_handle.begin_query([&reader_key]).unwrap();
            let found = query.get(&reader_key).unwrap();

            // Seeing "initial" here would mean the read ran before the commit.
            assert!(found.is_some());
            assert_eq!(found.unwrap().row, make_value("updated"));
        })
    };

    writer.join().unwrap();
    reader.join().unwrap();
}
562
/// Three queries on the same key must be open at the same time.
#[test]
fn test_concurrent_reads_allowed() {
    let svl = Arc::new(create_test_svl());
    let key = make_key("shared_read_key");

    // Seed the value the readers expect.
    {
        let mut command = svl.begin_command([&key]).unwrap();
        command.set(&key, make_value("shared")).unwrap();
        command.commit().unwrap();
    }

    let gate = Arc::new(Barrier::new(3));

    let readers: Vec<_> = (0..3)
        .map(|_| {
            let svl_handle = Arc::clone(&svl);
            let reader_key = key.clone();
            let gate = Arc::clone(&gate);

            thread::spawn(move || {
                let mut query = svl_handle.begin_query([&reader_key]).unwrap();

                // The barrier only releases once all three queries are open,
                // so the test would hang if reads excluded each other.
                gate.wait();

                let found = query.get(&reader_key).unwrap();
                assert!(found.is_some());
                assert_eq!(found.unwrap().row, make_value("shared"));

                // Keep the query open a little longer before it is dropped.
                thread::sleep(Duration::from_millis(50));
            })
        })
        .collect();

    for reader in readers {
        reader.join().unwrap();
    }
}
605
/// Two commands declare the same two keys in opposite order; both must
/// finish instead of deadlocking each other.
#[test]
fn test_overlapping_keys_different_order() {
    let svl = Arc::new(create_test_svl());
    let key1 = make_key("deadlock_key1");
    let key2 = make_key("deadlock_key2");
    let gate = Arc::new(Barrier::new(2));

    // Declares (key1, key2) and writes key1.
    let forward = {
        let svl_handle = Arc::clone(&svl);
        let k1 = key1.clone();
        let k2 = key2.clone();
        let gate = Arc::clone(&gate);
        thread::spawn(move || {
            gate.wait();
            let mut command = svl_handle.begin_command([&k1, &k2]).unwrap();
            command.set(&k1, make_value("from_thread1")).unwrap();
            thread::sleep(Duration::from_millis(10));
            command.commit().unwrap();
        })
    };

    // Declares (key2, key1) — the reverse order — and writes key2.
    let reverse = {
        let svl_handle = Arc::clone(&svl);
        let k1 = key1.clone();
        let k2 = key2.clone();
        let gate = Arc::clone(&gate);
        thread::spawn(move || {
            gate.wait();
            let mut command = svl_handle.begin_command([&k2, &k1]).unwrap();
            command.set(&k2, make_value("from_thread2")).unwrap();
            thread::sleep(Duration::from_millis(10));
            command.commit().unwrap();
        })
    };

    forward.join().unwrap();
    reverse.join().unwrap();

    // Both writes must have landed.
    let mut query = svl.begin_query([&key1, &key2]).unwrap();
    let first_read = query.get(&key1).unwrap();
    let second_read = query.get(&key2).unwrap();
    assert!(first_read.is_some());
    assert!(second_read.is_some());
}
651
/// Three commands whose key pairs form a cycle (1-2, 2-3, 3-1) must all
/// complete.
#[test]
fn test_circular_dependency_three_transactions() {
    let svl = Arc::new(create_test_svl());
    let key1 = make_key("circular_key1");
    let key2 = make_key("circular_key2");
    let key3 = make_key("circular_key3");
    let gate = Arc::new(Barrier::new(3));

    // Declares (key1, key2), writes key1.
    let first = {
        let svl_handle = Arc::clone(&svl);
        let ka = key1.clone();
        let kb = key2.clone();
        let gate = Arc::clone(&gate);
        thread::spawn(move || {
            gate.wait();
            let mut command = svl_handle.begin_command([&ka, &kb]).unwrap();
            command.set(&ka, make_value("t1")).unwrap();
            thread::sleep(Duration::from_millis(10));
            command.commit().unwrap();
        })
    };

    // Declares (key2, key3), writes key2.
    let second = {
        let svl_handle = Arc::clone(&svl);
        let ka = key2.clone();
        let kb = key3.clone();
        let gate = Arc::clone(&gate);
        thread::spawn(move || {
            gate.wait();
            let mut command = svl_handle.begin_command([&ka, &kb]).unwrap();
            command.set(&ka, make_value("t2")).unwrap();
            thread::sleep(Duration::from_millis(10));
            command.commit().unwrap();
        })
    };

    // Declares (key3, key1), writes key3; takes ownership of the originals.
    let third = {
        let svl_handle = Arc::clone(&svl);
        let gate = Arc::clone(&gate);
        thread::spawn(move || {
            gate.wait();
            let mut command = svl_handle.begin_command([&key3, &key1]).unwrap();
            command.set(&key3, make_value("t3")).unwrap();
            thread::sleep(Duration::from_millis(10));
            command.commit().unwrap();
        })
    };

    // A deadlock would hang these joins; a panic would fail them.
    first.join().unwrap();
    second.join().unwrap();
    third.join().unwrap();
}
703
/// A command dropped without commit or rollback must not keep its key
/// locked, and its write must not survive.
#[test]
fn test_locks_released_on_drop() {
    let svl = Arc::new(create_test_svl());
    let key = make_key("drop_test_key");

    // The first command writes and then simply goes out of scope.
    let abandoned = {
        let svl_handle = Arc::clone(&svl);
        let writer_key = key.clone();
        thread::spawn(move || {
            let mut command = svl_handle.begin_command([&writer_key]).unwrap();
            command.set(&writer_key, make_value("dropped")).unwrap();
        })
    };

    abandoned.join().unwrap();

    thread::sleep(Duration::from_millis(10));

    // The second command on the same key must be able to begin and commit.
    let committed = {
        let svl_handle = Arc::clone(&svl);
        let writer_key = key.clone();
        thread::spawn(move || {
            let mut command = svl_handle.begin_command([&writer_key]).unwrap();
            command.set(&writer_key, make_value("success")).unwrap();
            command.commit().unwrap();
        })
    };

    committed.join().unwrap();

    // Only the committed value may be visible.
    let mut query = svl.begin_query([&key]).unwrap();
    let found = query.get(&key).unwrap();
    assert!(found.is_some());
    assert_eq!(found.unwrap().row, make_value("success"));
}
742}