reifydb_transaction/single/
mod.rs1use std::sync::Arc;
5
6use crossbeam_skiplist::SkipMap;
7use reifydb_core::{
8 delta::Delta,
9 encoded::{encoded::EncodedValues, key::EncodedKey},
10 event::EventBus,
11 interface::WithEventBus,
12};
13use reifydb_runtime::sync::rwlock::RwLock;
14use reifydb_store_single::SingleStore;
15
16pub mod read;
17pub mod write;
18
19use read::{KeyReadLock, SingleReadTransaction};
20use reifydb_runtime::{SharedRuntimeConfig, actor::system::ActorSystem};
21use reifydb_type::Result;
22use write::{KeyWriteLock, SingleWriteTransaction};
23
24#[derive(Clone)]
25pub struct SingleTransaction {
26 inner: Arc<SingleTransactionInner>,
27}
28
29struct SingleTransactionInner {
30 store: RwLock<SingleStore>,
31 event_bus: EventBus,
32 key_locks: SkipMap<EncodedKey, Arc<RwLock<()>>>,
33}
34
35impl SingleTransactionInner {
36 fn get_or_create_lock(&self, key: &EncodedKey) -> Arc<RwLock<()>> {
37 if let Some(entry) = self.key_locks.get(key) {
39 return entry.value().clone();
40 }
41
42 let lock = Arc::new(RwLock::new(()));
44 self.key_locks.insert(key.clone(), lock.clone());
45 lock
46 }
47}
48
49impl SingleTransaction {
50 pub fn new(store: SingleStore, event_bus: EventBus) -> Self {
51 Self {
52 inner: Arc::new(SingleTransactionInner {
53 store: RwLock::new(store),
54 event_bus,
55 key_locks: SkipMap::new(),
56 }),
57 }
58 }
59
60 pub fn testing() -> Self {
61 let actor_system = ActorSystem::new(SharedRuntimeConfig::default().actor_system_config());
62 Self::new(SingleStore::testing_memory(), EventBus::new(&actor_system))
63 }
64
65 pub fn with_query<'a, I, F, R>(&self, keys: I, f: F) -> Result<R>
67 where
68 I: IntoIterator<Item = &'a EncodedKey>,
69 F: FnOnce(&mut SingleReadTransaction<'_>) -> Result<R>,
70 {
71 let mut tx = self.begin_query(keys)?;
72 f(&mut tx)
73 }
74
75 pub fn with_command<'a, I, F, R>(&self, keys: I, f: F) -> Result<R>
77 where
78 I: IntoIterator<Item = &'a EncodedKey>,
79 F: FnOnce(&mut SingleWriteTransaction<'_>) -> Result<R>,
80 {
81 let mut tx = self.begin_command(keys)?;
82 let result = f(&mut tx)?;
83 tx.commit()?;
84 Ok(result)
85 }
86
87 pub fn begin_query<'a, I>(&self, keys: I) -> Result<SingleReadTransaction<'_>>
88 where
89 I: IntoIterator<Item = &'a EncodedKey>,
90 {
91 let mut keys_vec: Vec<EncodedKey> = keys.into_iter().cloned().collect();
92 assert!(
93 !keys_vec.is_empty(),
94 "SVL transactions must declare keys upfront - empty keysets are not allowed"
95 );
96
97 keys_vec.sort();
99
100 let mut locks = Vec::new();
102 for key in &keys_vec {
103 let arc = self.inner.get_or_create_lock(key);
104 locks.push(KeyReadLock::new(arc));
105 }
106
107 Ok(SingleReadTransaction {
108 inner: &self.inner,
109 keys: keys_vec,
110 _key_locks: locks,
111 })
112 }
113
114 pub fn begin_command<'a, I>(&self, keys: I) -> Result<SingleWriteTransaction<'_>>
115 where
116 I: IntoIterator<Item = &'a EncodedKey>,
117 {
118 let mut keys_vec: Vec<EncodedKey> = keys.into_iter().cloned().collect();
119 assert!(
120 !keys_vec.is_empty(),
121 "SVL transactions must declare keys upfront - empty keysets are not allowed"
122 );
123
124 keys_vec.sort();
126
127 let mut locks = Vec::new();
129 for key in &keys_vec {
130 let arc = self.inner.get_or_create_lock(key);
131 locks.push(KeyWriteLock::new(arc));
132 }
133
134 Ok(SingleWriteTransaction::new(&self.inner, keys_vec, locks))
135 }
136}
137
138impl WithEventBus for SingleTransaction {
139 fn event_bus(&self) -> &EventBus {
140 &self.inner.event_bus
141 }
142}
143
144#[cfg(test)]
145pub mod tests {
146 use std::{
147 iter,
148 sync::{Arc, Barrier},
149 thread,
150 time::Duration,
151 };
152
153 use reifydb_type::util::cowvec::CowVec;
154
155 use super::*;
156
157 fn make_key(s: &str) -> EncodedKey {
158 EncodedKey(CowVec::new(s.as_bytes().to_vec()))
159 }
160
161 fn make_value(s: &str) -> EncodedValues {
162 EncodedValues(CowVec::new(s.as_bytes().to_vec()))
163 }
164
165 fn create_test_svl() -> SingleTransaction {
166 SingleTransaction::testing()
167 }
168
/// Reading a key that was declared when the query began succeeds.
#[test]
fn test_allowed_key_query() {
    let svl = create_test_svl();
    let declared = make_key("test_key");

    let mut query = svl.begin_query(vec![&declared]).unwrap();

    assert!(query.get(&declared).is_ok());
}
181
/// Reading a key that was not declared at `begin_query` is rejected with `TXN_010`.
#[test]
fn test_disallowed_key_query() {
    let svl = create_test_svl();
    let declared = make_key("allowed");
    let undeclared = make_key("disallowed");

    let mut query = svl.begin_query(vec![&declared]).unwrap();

    // The declared key is readable.
    assert!(query.get(&declared).is_ok());

    // The undeclared key is refused with the expected error code.
    let outcome = query.get(&undeclared);
    assert!(outcome.is_err());
    let rejection = outcome.unwrap_err();
    assert_eq!(rejection.0.code, "TXN_010");
}
200
/// Starting a query without declaring any key panics.
#[test]
#[should_panic(expected = "SVL transactions must declare keys upfront - empty keysets are not allowed")]
fn test_empty_keyset_query_panics() {
    let svl = create_test_svl();
    let no_keys = iter::empty::<&EncodedKey>();

    let _never_created = svl.begin_query(no_keys);
}
209
/// Starting a command without declaring any key panics.
#[test]
#[should_panic(expected = "SVL transactions must declare keys upfront - empty keysets are not allowed")]
fn test_empty_keyset_command_panics() {
    let svl = create_test_svl();
    let no_keys = iter::empty::<&EncodedKey>();

    let _never_created = svl.begin_command(no_keys);
}
218
/// Writing, reading back and committing a declared key all succeed.
#[test]
fn test_allowed_key_command() {
    let svl = create_test_svl();
    let declared = make_key("test_key");
    let payload = make_value("test_value");

    let mut command = svl.begin_command(vec![&declared]).unwrap();

    let written = command.set(&declared, payload.clone());
    assert!(written.is_ok());

    let read_back = command.get(&declared);
    assert!(read_back.is_ok());

    let committed = command.commit();
    assert!(committed.is_ok());
}
233
/// Writing a key that was not declared at `begin_command` is rejected with `TXN_010`.
#[test]
fn test_disallowed_key_command() {
    let svl = create_test_svl();
    let declared = make_key("allowed");
    let undeclared = make_key("disallowed");
    let payload = make_value("test_value");

    let mut command = svl.begin_command(vec![&declared]).unwrap();

    // The declared key is writable.
    assert!(command.set(&declared, payload.clone()).is_ok());

    // The undeclared key is refused with the expected error code.
    let outcome = command.set(&undeclared, payload);
    assert!(outcome.is_err());
    let rejection = outcome.unwrap_err();
    assert_eq!(rejection.0.code, "TXN_010");
}
253
/// Values committed for two declared keys are visible to a later query.
#[test]
fn test_command_commit_with_valid_keys() {
    let svl = create_test_svl();
    let first_key = make_key("key1");
    let second_key = make_key("key2");
    let first_value = make_value("value1");
    let second_value = make_value("value2");

    // Write both keys in one command and commit.
    {
        let mut command = svl.begin_command(vec![&first_key, &second_key]).unwrap();
        command.set(&first_key, first_value.clone()).unwrap();
        command.set(&second_key, second_value.clone()).unwrap();
        command.commit().unwrap();
    }

    // Read both keys back in a fresh query.
    {
        let mut query = svl.begin_query(vec![&first_key, &second_key]).unwrap();
        let first_found = query.get(&first_key).unwrap();
        let second_found = query.get(&second_key).unwrap();

        assert!(first_found.is_some());
        assert!(second_found.is_some());
        assert_eq!(first_found.unwrap().values, first_value);
        assert_eq!(second_found.unwrap().values, second_value);
    }
}
281
/// A value written in a rolled-back command is not visible afterwards.
#[test]
fn test_rollback_with_scoped_keys() {
    let svl = create_test_svl();
    let key = make_key("test_key");
    let payload = make_value("test_value");

    // Write, then roll back instead of committing.
    {
        let mut command = svl.begin_command(vec![&key]).unwrap();
        command.set(&key, payload).unwrap();
        command.rollback().unwrap();
    }

    // The key must still be absent.
    {
        let mut query = svl.begin_query(vec![&key]).unwrap();
        let found = query.get(&key).unwrap();
        assert!(found.is_none());
    }
}
302
/// Five threads querying the same committed key all observe the committed value.
#[test]
fn test_concurrent_reads() {
    let svl = Arc::new(create_test_svl());
    let key = make_key("shared_key");
    let value = make_value("shared_value");

    // Seed the key.
    {
        let mut seed = svl.begin_command(vec![&key]).unwrap();
        seed.set(&key, value.clone()).unwrap();
        seed.commit().unwrap();
    }

    // Spawn the readers.
    let readers: Vec<_> = (0..5)
        .map(|_| {
            let svl = Arc::clone(&svl);
            let key = key.clone();
            let expected = value.clone();

            thread::spawn(move || {
                let mut query = svl.begin_query(vec![&key]).unwrap();
                let found = query.get(&key).unwrap();
                assert!(found.is_some());
                assert_eq!(found.unwrap().values, expected);
            })
        })
        .collect();

    // A panic inside any reader surfaces here.
    for reader in readers {
        reader.join().unwrap();
    }
}
337
/// Five threads writing five different keys all commit, and every value is readable afterwards.
#[test]
fn test_concurrent_writers_disjoint_keys() {
    let svl = Arc::new(create_test_svl());

    // One writer thread per key.
    let writers: Vec<_> = (0..5)
        .map(|i| {
            let svl = Arc::clone(&svl);
            let key = make_key(&format!("key_{}", i));
            let value = make_value(&format!("value_{}", i));

            thread::spawn(move || {
                let mut command = svl.begin_command(vec![&key]).unwrap();
                command.set(&key, value).unwrap();
                command.commit().unwrap();
            })
        })
        .collect();

    for writer in writers {
        writer.join().unwrap();
    }

    // Every key must now hold the value written by its thread.
    for i in 0..5 {
        let key = make_key(&format!("key_{}", i));
        let expected = make_value(&format!("value_{}", i));

        let mut query = svl.begin_query(vec![&key]).unwrap();
        let found = query.get(&key).unwrap();
        assert!(found.is_some());
        assert_eq!(found.unwrap().values, expected);
    }
}
373
/// Readers of one key and a writer of a different key run side by side without failing.
#[test]
fn test_concurrent_readers_and_writer() {
    let svl = Arc::new(create_test_svl());
    let read_key = make_key("key1");
    let write_key = make_key("key2");
    let read_value = make_value("value1");
    let initial_write_value = make_value("value2");

    // Seed both keys.
    {
        let mut seed = svl.begin_command(vec![&read_key, &write_key]).unwrap();
        seed.set(&read_key, read_value.clone()).unwrap();
        seed.set(&write_key, initial_write_value.clone()).unwrap();
        seed.commit().unwrap();
    }

    // Three readers on the first key.
    let mut workers: Vec<_> = (0..3)
        .map(|_| {
            let svl = Arc::clone(&svl);
            let key = read_key.clone();
            let expected = read_value.clone();

            thread::spawn(move || {
                let mut query = svl.begin_query(vec![&key]).unwrap();
                let found = query.get(&key).unwrap();
                assert!(found.is_some());
                assert_eq!(found.unwrap().values, expected);
            })
        })
        .collect();

    // One writer on the second key.
    let writer_svl = Arc::clone(&svl);
    let replacement = make_value("new_value2");
    workers.push(thread::spawn(move || {
        let mut command = writer_svl.begin_command(vec![&write_key]).unwrap();
        command.set(&write_key, replacement).unwrap();
        command.commit().unwrap();
    }));

    for worker in workers {
        worker.join().unwrap();
    }
}
421
/// Ten threads mixing writes and reads over three shared keys all finish without panicking.
///
/// Results of `set`, `commit` and `get` are deliberately ignored; only panics
/// (surfaced through `join`) fail the test.
#[test]
fn test_no_panics_with_rwlock() {
    let svl = Arc::new(create_test_svl());

    let workers: Vec<_> = (0..10)
        .map(|i| {
            let svl = Arc::clone(&svl);
            // Only three distinct keys, so threads contend with each other.
            let key = make_key(&format!("key_{}", i % 3));
            let value = make_value(&format!("value_{}", i));
            // Even indices write, odd indices read.
            let is_writer = i % 2 == 0;

            thread::spawn(move || {
                if is_writer {
                    let mut command = svl.begin_command(vec![&key]).unwrap();
                    let _ = command.set(&key, value);
                    let _ = command.commit();
                } else {
                    let mut query = svl.begin_query(vec![&key]).unwrap();
                    let _ = query.get(&key);
                }
            })
        })
        .collect();

    for worker in workers {
        worker.join().unwrap();
    }
}
452
/// Two writers on the same key: the one that starts later must end up as the final value.
///
/// The first writer opens its transaction before the barrier and commits only
/// after a 100 ms pause; the second writer opens its transaction after the
/// barrier. The test expects `"value2"` to be what is stored at the end.
#[test]
fn test_write_blocks_concurrent_write() {
    let svl = Arc::new(create_test_svl());
    let key = make_key("blocking_key");
    let rendezvous = Arc::new(Barrier::new(2));

    // First writer: holds its transaction open across the barrier.
    let early_writer = {
        let svl = Arc::clone(&svl);
        let key = key.clone();
        let rendezvous = Arc::clone(&rendezvous);

        thread::spawn(move || {
            let mut command = svl.begin_command(vec![&key]).unwrap();
            command.set(&key, make_value("value1")).unwrap();

            rendezvous.wait();

            thread::sleep(Duration::from_millis(100));

            command.commit().unwrap();
        })
    };

    // Second writer: only starts once the first transaction is already open.
    let late_writer = {
        let svl = Arc::clone(&svl);
        let key = key.clone();
        let rendezvous = Arc::clone(&rendezvous);

        thread::spawn(move || {
            rendezvous.wait();

            thread::sleep(Duration::from_millis(10));

            let mut command = svl.begin_command(vec![&key]).unwrap();
            command.set(&key, make_value("value2")).unwrap();
            command.commit().unwrap();
        })
    };

    early_writer.join().unwrap();
    late_writer.join().unwrap();

    let mut query = svl.begin_query(vec![&key]).unwrap();
    let found = query.get(&key).unwrap();
    assert!(found.is_some());
    assert_eq!(found.unwrap().values, make_value("value2"));
}
502
/// A reader that starts while a writer holds the key must observe the writer's committed value.
///
/// The writer opens its transaction before the barrier and commits after a
/// 100 ms pause; the reader starts its query after the barrier and is expected
/// to see `"updated"`, not `"initial"`.
#[test]
fn test_write_blocks_concurrent_read() {
    let svl = Arc::new(create_test_svl());
    let key = make_key("blocking_key");

    // Seed the key with its initial value.
    {
        let mut seed = svl.begin_command(vec![&key]).unwrap();
        seed.set(&key, make_value("initial")).unwrap();
        seed.commit().unwrap();
    }

    let rendezvous = Arc::new(Barrier::new(2));

    // Writer: holds its transaction open across the barrier.
    let writer = {
        let svl = Arc::clone(&svl);
        let key = key.clone();
        let rendezvous = Arc::clone(&rendezvous);

        thread::spawn(move || {
            let mut command = svl.begin_command(vec![&key]).unwrap();
            command.set(&key, make_value("updated")).unwrap();

            rendezvous.wait();

            thread::sleep(Duration::from_millis(100));

            command.commit().unwrap();
        })
    };

    // Reader: only starts once the write transaction is already open.
    let reader = {
        let svl = Arc::clone(&svl);
        let key = key.clone();
        let rendezvous = Arc::clone(&rendezvous);

        thread::spawn(move || {
            rendezvous.wait();

            thread::sleep(Duration::from_millis(10));

            let mut query = svl.begin_query(vec![&key]).unwrap();
            let found = query.get(&key).unwrap();

            assert!(found.is_some());
            assert_eq!(found.unwrap().values, make_value("updated"));
        })
    };

    writer.join().unwrap();
    reader.join().unwrap();
}
557
/// Three queries on the same key can be open at the same time.
///
/// Each reader opens its query *before* waiting on a three-party barrier, so the
/// barrier can only be passed if all three queries are open simultaneously.
#[test]
fn test_concurrent_reads_allowed() {
    let svl = Arc::new(create_test_svl());
    let key = make_key("shared_read_key");

    // Seed the key.
    {
        let mut seed = svl.begin_command(vec![&key]).unwrap();
        seed.set(&key, make_value("shared")).unwrap();
        seed.commit().unwrap();
    }

    let rendezvous = Arc::new(Barrier::new(3));

    let readers: Vec<_> = (0..3)
        .map(|_| {
            let svl = Arc::clone(&svl);
            let key = key.clone();
            let rendezvous = Arc::clone(&rendezvous);

            thread::spawn(move || {
                let mut query = svl.begin_query(vec![&key]).unwrap();

                rendezvous.wait();

                let found = query.get(&key).unwrap();
                assert!(found.is_some());
                assert_eq!(found.unwrap().values, make_value("shared"));

                // Keep the query open a little longer before it is dropped.
                thread::sleep(Duration::from_millis(50));
            })
        })
        .collect();

    for reader in readers {
        reader.join().unwrap();
    }
}
600
/// Two commands declaring the same two keys in opposite order both complete.
///
/// Each thread writes the first key of its own declaration; afterwards both keys
/// must hold a value.
#[test]
fn test_overlapping_keys_different_order() {
    let svl = Arc::new(create_test_svl());
    let key1 = make_key("deadlock_key1");
    let key2 = make_key("deadlock_key2");
    let rendezvous = Arc::new(Barrier::new(2));

    // (first declared key, second declared key, value written to the first key)
    let plans = [
        (key1.clone(), key2.clone(), "from_thread1"),
        (key2.clone(), key1.clone(), "from_thread2"),
    ];

    let workers: Vec<_> = plans
        .into_iter()
        .map(|(first, second, tag)| {
            let svl = Arc::clone(&svl);
            let rendezvous = Arc::clone(&rendezvous);

            thread::spawn(move || {
                rendezvous.wait();
                let mut command = svl.begin_command(vec![&first, &second]).unwrap();
                command.set(&first, make_value(tag)).unwrap();
                thread::sleep(Duration::from_millis(10));
                command.commit().unwrap();
            })
        })
        .collect();

    for worker in workers {
        worker.join().unwrap();
    }

    let mut query = svl.begin_query(vec![&key1, &key2]).unwrap();
    let found1 = query.get(&key1).unwrap();
    let found2 = query.get(&key2).unwrap();
    assert!(found1.is_some());
    assert!(found2.is_some());
}
646
/// Three commands whose declared key pairs form a cycle (1-2, 2-3, 3-1) all complete.
///
/// Each thread writes the first key of its own declaration and commits after a
/// short pause; the test passes when every thread can be joined.
#[test]
fn test_circular_dependency_three_transactions() {
    let svl = Arc::new(create_test_svl());
    let key1 = make_key("circular_key1");
    let key2 = make_key("circular_key2");
    let key3 = make_key("circular_key3");
    let rendezvous = Arc::new(Barrier::new(3));

    // (first declared key, second declared key, value written to the first key)
    let plans = [
        (key1.clone(), key2.clone(), "t1"),
        (key2.clone(), key3.clone(), "t2"),
        (key3, key1, "t3"),
    ];

    let workers: Vec<_> = plans
        .into_iter()
        .map(|(first, second, tag)| {
            let svl = Arc::clone(&svl);
            let rendezvous = Arc::clone(&rendezvous);

            thread::spawn(move || {
                rendezvous.wait();
                let mut command = svl.begin_command(vec![&first, &second]).unwrap();
                command.set(&first, make_value(tag)).unwrap();
                thread::sleep(Duration::from_millis(10));
                command.commit().unwrap();
            })
        })
        .collect();

    for worker in workers {
        worker.join().unwrap();
    }
}
698
/// A command dropped without commit does not prevent a later command on the same key.
///
/// The first thread writes and lets its transaction go out of scope; the second
/// thread must then be able to write and commit, and its value is what a final
/// query observes.
#[test]
fn test_locks_released_on_drop() {
    let svl = Arc::new(create_test_svl());
    let key = make_key("drop_test_key");

    // First command: write, then drop without committing.
    {
        let svl = Arc::clone(&svl);
        let key = key.clone();
        thread::spawn(move || {
            let mut abandoned = svl.begin_command(vec![&key]).unwrap();
            abandoned.set(&key, make_value("dropped")).unwrap();
        })
        .join()
        .unwrap();
    }

    thread::sleep(Duration::from_millis(10));

    // Second command: must be able to take the same key and commit.
    {
        let svl = Arc::clone(&svl);
        let key = key.clone();
        thread::spawn(move || {
            let mut command = svl.begin_command(vec![&key]).unwrap();
            command.set(&key, make_value("success")).unwrap();
            command.commit().unwrap();
        })
        .join()
        .unwrap();
    }

    let mut query = svl.begin_query(vec![&key]).unwrap();
    let found = query.get(&key).unwrap();
    assert!(found.is_some());
    assert_eq!(found.unwrap().values, make_value("success"));
}
737}