Skip to main content

reifydb_transaction/single/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::sync::Arc;
5
6use crossbeam_skiplist::SkipMap;
7use reifydb_core::{
8	delta::Delta,
9	encoded::{key::EncodedKey, row::EncodedRow},
10	event::EventBus,
11	interface::WithEventBus,
12};
13use reifydb_runtime::sync::rwlock::RwLock;
14use reifydb_store_single::SingleStore;
15
16pub mod read;
17pub mod write;
18
19use read::{KeyReadLock, SingleReadTransaction};
20use reifydb_runtime::{
21	actor::system::ActorSystem,
22	context::clock::Clock,
23	pool::{PoolConfig, Pools},
24};
25use reifydb_type::Result;
26use write::{KeyWriteLock, SingleWriteTransaction};
27
28#[derive(Clone)]
29pub struct SingleTransaction {
30	inner: Arc<SingleTransactionInner>,
31}
32
33struct SingleTransactionInner {
34	store: RwLock<SingleStore>,
35	event_bus: EventBus,
36	key_locks: SkipMap<EncodedKey, Arc<RwLock<()>>>,
37}
38
39impl SingleTransactionInner {
40	fn get_or_create_lock(&self, key: &EncodedKey) -> Arc<RwLock<()>> {
41		// Check if lock exists
42		if let Some(entry) = self.key_locks.get(key) {
43			return entry.value().clone();
44		}
45
46		// Create new lock
47		let lock = Arc::new(RwLock::new(()));
48		self.key_locks.insert(key.clone(), lock.clone());
49		lock
50	}
51}
52
53impl SingleTransaction {
54	pub fn new(store: SingleStore, event_bus: EventBus) -> Self {
55		Self {
56			inner: Arc::new(SingleTransactionInner {
57				store: RwLock::new(store),
58				event_bus,
59				key_locks: SkipMap::new(),
60			}),
61		}
62	}
63
64	pub fn testing() -> Self {
65		let pools = Pools::new(PoolConfig::default());
66		let actor_system = ActorSystem::new(pools, Clock::Real);
67		Self::new(SingleStore::testing_memory(), EventBus::new(&actor_system))
68	}
69
70	/// Helper for single-version queries.
71	pub fn with_query<'a, I, F, R>(&self, keys: I, f: F) -> Result<R>
72	where
73		I: IntoIterator<Item = &'a EncodedKey>,
74		F: FnOnce(&mut SingleReadTransaction<'_>) -> Result<R>,
75	{
76		let mut tx = self.begin_query(keys)?;
77		f(&mut tx)
78	}
79
80	/// Helper for single-version commands.
81	pub fn with_command<'a, I, F, R>(&self, keys: I, f: F) -> Result<R>
82	where
83		I: IntoIterator<Item = &'a EncodedKey>,
84		F: FnOnce(&mut SingleWriteTransaction<'_>) -> Result<R>,
85	{
86		let mut tx = self.begin_command(keys)?;
87		let result = f(&mut tx)?;
88		tx.commit()?;
89		Ok(result)
90	}
91
92	pub fn begin_query<'a, I>(&self, keys: I) -> Result<SingleReadTransaction<'_>>
93	where
94		I: IntoIterator<Item = &'a EncodedKey>,
95	{
96		let mut keys_vec: Vec<EncodedKey> = keys.into_iter().cloned().collect();
97		assert!(
98			!keys_vec.is_empty(),
99			"SVL transactions must declare keys upfront - empty keysets are not allowed"
100		);
101
102		// Sort keys to establish consistent lock ordering and prevent deadlocks
103		keys_vec.sort();
104
105		// Acquire read locks on all keys in sorted order
106		let mut locks = Vec::new();
107		for key in &keys_vec {
108			let arc = self.inner.get_or_create_lock(key);
109			locks.push(KeyReadLock::new(arc));
110		}
111
112		Ok(SingleReadTransaction {
113			inner: &self.inner,
114			keys: keys_vec,
115			_key_locks: locks,
116		})
117	}
118
119	pub fn begin_command<'a, I>(&self, keys: I) -> Result<SingleWriteTransaction<'_>>
120	where
121		I: IntoIterator<Item = &'a EncodedKey>,
122	{
123		let mut keys_vec: Vec<EncodedKey> = keys.into_iter().cloned().collect();
124		assert!(
125			!keys_vec.is_empty(),
126			"SVL transactions must declare keys upfront - empty keysets are not allowed"
127		);
128
129		// Sort keys to establish consistent lock ordering and prevent deadlocks
130		keys_vec.sort();
131
132		// Acquire write locks on all keys in sorted order
133		let mut locks = Vec::new();
134		for key in &keys_vec {
135			let arc = self.inner.get_or_create_lock(key);
136			locks.push(KeyWriteLock::new(arc));
137		}
138
139		Ok(SingleWriteTransaction::new(&self.inner, keys_vec, locks))
140	}
141}
142
143impl WithEventBus for SingleTransaction {
144	fn event_bus(&self) -> &EventBus {
145		&self.inner.event_bus
146	}
147}
148
149#[cfg(test)]
150pub mod tests {
151	use std::{
152		iter,
153		sync::{Arc, Barrier},
154		thread,
155		time::Duration,
156	};
157
158	use reifydb_type::util::cowvec::CowVec;
159
160	use super::*;
161
162	fn make_key(s: &str) -> EncodedKey {
163		EncodedKey(CowVec::new(s.as_bytes().to_vec()))
164	}
165
166	fn make_value(s: &str) -> EncodedRow {
167		EncodedRow(CowVec::new(s.as_bytes().to_vec()))
168	}
169
170	fn create_test_svl() -> SingleTransaction {
171		SingleTransaction::testing()
172	}
173
174	#[test]
175	fn test_allowed_key_query() {
176		let svl = create_test_svl();
177		let key = make_key("test_key");
178
179		// Start scoped query with the key
180		let mut tx = svl.begin_query(vec![&key]).unwrap();
181
182		// Should be able to get the key
183		let result = tx.get(&key);
184		assert!(result.is_ok());
185	}
186
187	#[test]
188	fn test_disallowed_key_query() {
189		let svl = create_test_svl();
190		let key1 = make_key("allowed");
191		let key2 = make_key("disallowed");
192
193		// Start scoped query with only key1
194		let mut tx = svl.begin_query(vec![&key1]).unwrap();
195
196		// Should succeed for key1
197		assert!(tx.get(&key1).is_ok());
198
199		// Should fail for key2
200		let result = tx.get(&key2);
201		assert!(result.is_err());
202		let err = result.unwrap_err();
203		assert_eq!(err.0.code, "TXN_010");
204	}
205
206	#[test]
207	#[should_panic(expected = "SVL transactions must declare keys upfront - empty keysets are not allowed")]
208	fn test_empty_keyset_query_panics() {
209		let svl = create_test_svl();
210
211		// Should panic when trying to create transaction with empty keys
212		let _tx = svl.begin_query(iter::empty());
213	}
214
215	#[test]
216	#[should_panic(expected = "SVL transactions must declare keys upfront - empty keysets are not allowed")]
217	fn test_empty_keyset_command_panics() {
218		let svl = create_test_svl();
219
220		// Should panic when trying to create transaction with empty keys
221		let _tx = svl.begin_command(iter::empty());
222	}
223
224	#[test]
225	fn test_allowed_key_command() {
226		let svl = create_test_svl();
227		let key = make_key("test_key");
228		let value = make_value("test_value");
229
230		// Start scoped command with the key
231		let mut tx = svl.begin_command(vec![&key]).unwrap();
232
233		// Should be able to set and get the key
234		assert!(tx.set(&key, value.clone()).is_ok());
235		assert!(tx.get(&key).is_ok());
236		assert!(tx.commit().is_ok());
237	}
238
239	#[test]
240	fn test_disallowed_key_command() {
241		let svl = create_test_svl();
242		let key1 = make_key("allowed");
243		let key2 = make_key("disallowed");
244		let value = make_value("test_value");
245
246		// Start scoped command with only key1
247		let mut tx = svl.begin_command(vec![&key1]).unwrap();
248
249		// Should succeed for key1
250		assert!(tx.set(&key1, value.clone()).is_ok());
251
252		// Should fail for key2
253		let result = tx.set(&key2, value);
254		assert!(result.is_err());
255		let err = result.unwrap_err();
256		assert_eq!(err.0.code, "TXN_010");
257	}
258
259	#[test]
260	fn test_command_commit_with_valid_keys() {
261		let svl = create_test_svl();
262		let key1 = make_key("key1");
263		let key2 = make_key("key2");
264		let value1 = make_value("value1");
265		let value2 = make_value("value2");
266
267		// Write with scoped transaction
268		{
269			let mut tx = svl.begin_command(vec![&key1, &key2]).unwrap();
270			tx.set(&key1, value1.clone()).unwrap();
271			tx.set(&key2, value2.clone()).unwrap();
272			tx.commit().unwrap();
273		}
274
275		// Verify with query
276		{
277			let mut tx = svl.begin_query(vec![&key1, &key2]).unwrap();
278			let result1 = tx.get(&key1).unwrap();
279			let result2 = tx.get(&key2).unwrap();
280			assert!(result1.is_some());
281			assert!(result2.is_some());
282			assert_eq!(result1.unwrap().row, value1);
283			assert_eq!(result2.unwrap().row, value2);
284		}
285	}
286
287	#[test]
288	fn test_rollback_with_scoped_keys() {
289		let svl = create_test_svl();
290		let key = make_key("test_key");
291		let value = make_value("test_value");
292
293		// Start transaction and rollback
294		{
295			let mut tx = svl.begin_command(vec![&key]).unwrap();
296			tx.set(&key, value).unwrap();
297			tx.rollback().unwrap();
298		}
299
300		// Verify nothing was committed
301		{
302			let mut tx = svl.begin_query(vec![&key]).unwrap();
303			let result = tx.get(&key).unwrap();
304			assert!(result.is_none());
305		}
306	}
307
308	#[test]
309	fn test_concurrent_reads() {
310		let svl = Arc::new(create_test_svl());
311		let key = make_key("shared_key");
312		let value = make_value("shared_value");
313
314		// Write initial value
315		{
316			let mut tx = svl.begin_command(vec![&key]).unwrap();
317			tx.set(&key, value.clone()).unwrap();
318			tx.commit().unwrap();
319		}
320
321		// Spawn multiple readers
322		let mut handles = vec![];
323		for _ in 0..5 {
324			let svl_clone = Arc::clone(&svl);
325			let key_clone = key.clone();
326			let value_clone = value.clone();
327
328			let handle = thread::spawn(move || {
329				let mut tx = svl_clone.begin_query(vec![&key_clone]).unwrap();
330				let result = tx.get(&key_clone).unwrap();
331				assert!(result.is_some());
332				assert_eq!(result.unwrap().row, value_clone);
333			});
334			handles.push(handle);
335		}
336
337		// Wait for all threads
338		for handle in handles {
339			handle.join().unwrap();
340		}
341	}
342
343	#[test]
344	fn test_concurrent_writers_disjoint_keys() {
345		let svl = Arc::new(create_test_svl());
346
347		// Spawn multiple writers with disjoint keys
348		let mut handles = vec![];
349		for i in 0..5 {
350			let svl_clone = Arc::clone(&svl);
351			let key = make_key(&format!("key_{}", i));
352			let value = make_value(&format!("value_{}", i));
353
354			let handle = thread::spawn(move || {
355				let mut tx = svl_clone.begin_command(vec![&key]).unwrap();
356				tx.set(&key, value).unwrap();
357				tx.commit().unwrap();
358			});
359			handles.push(handle);
360		}
361
362		// Wait for all threads
363		for handle in handles {
364			handle.join().unwrap();
365		}
366
367		// Verify all values were written
368		for i in 0..5 {
369			let key = make_key(&format!("key_{}", i));
370			let expected_value = make_value(&format!("value_{}", i));
371
372			let mut tx = svl.begin_query(vec![&key]).unwrap();
373			let result = tx.get(&key).unwrap();
374			assert!(result.is_some());
375			assert_eq!(result.unwrap().row, expected_value);
376		}
377	}
378
379	#[test]
380	fn test_concurrent_readers_and_writer() {
381		let svl = Arc::new(create_test_svl());
382		let key1 = make_key("key1");
383		let key2 = make_key("key2");
384		let value1 = make_value("value1");
385		let value2 = make_value("value2");
386
387		// Write initial values
388		{
389			let mut tx = svl.begin_command(vec![&key1, &key2]).unwrap();
390			tx.set(&key1, value1.clone()).unwrap();
391			tx.set(&key2, value2.clone()).unwrap();
392			tx.commit().unwrap();
393		}
394
395		// Spawn readers for key1
396		let mut handles = vec![];
397		for _ in 0..3 {
398			let svl_clone = Arc::clone(&svl);
399			let key_clone = key1.clone();
400			let value_clone = value1.clone();
401
402			let handle = thread::spawn(move || {
403				let mut tx = svl_clone.begin_query(vec![&key_clone]).unwrap();
404				let result = tx.get(&key_clone).unwrap();
405				assert!(result.is_some());
406				assert_eq!(result.unwrap().row, value_clone);
407			});
408			handles.push(handle);
409		}
410
411		// Spawn a writer for key2 (different key, should not block readers)
412		let svl_clone = Arc::clone(&svl);
413		let new_value = make_value("new_value2");
414		let handle = thread::spawn(move || {
415			let mut tx = svl_clone.begin_command(vec![&key2]).unwrap();
416			tx.set(&key2, new_value).unwrap();
417			tx.commit().unwrap();
418		});
419		handles.push(handle);
420
421		// Wait for all threads
422		for handle in handles {
423			handle.join().unwrap();
424		}
425	}
426
427	#[test]
428	fn test_no_panics_with_rwlock() {
429		let svl = Arc::new(create_test_svl());
430
431		// Mix of operations across multiple threads
432		let mut handles = vec![];
433		for i in 0..10 {
434			let svl_clone = Arc::clone(&svl);
435			let key = make_key(&format!("key_{}", i % 3)); // Some key overlap
436			let value = make_value(&format!("value_{}", i));
437
438			let handle = thread::spawn(move || {
439				// Alternate between reads and writes
440				if i % 2 == 0 {
441					let mut tx = svl_clone.begin_command(vec![&key]).unwrap();
442					let _ = tx.set(&key, value);
443					let _ = tx.commit();
444				} else {
445					let mut tx = svl_clone.begin_query(vec![&key]).unwrap();
446					let _ = tx.get(&key);
447				}
448			});
449			handles.push(handle);
450		}
451
452		// Wait for all threads - should not panic
453		for handle in handles {
454			handle.join().unwrap();
455		}
456	}
457
458	#[test]
459	fn test_write_blocks_concurrent_write() {
460		let svl = Arc::new(create_test_svl());
461		let key = make_key("blocking_key");
462		let barrier = Arc::new(Barrier::new(2));
463
464		// Thread 1: Hold write lock on key
465		let svl1 = Arc::clone(&svl);
466		let key1 = key.clone();
467		let barrier1 = Arc::clone(&barrier);
468		let handle1 = thread::spawn(move || {
469			let mut tx = svl1.begin_command(vec![&key1]).unwrap();
470			tx.set(&key1, make_value("value1")).unwrap();
471
472			// Signal that we have the lock
473			barrier1.wait();
474
475			// Hold the transaction (and locks) for a bit
476			thread::sleep(Duration::from_millis(100));
477
478			tx.commit().unwrap();
479		});
480
481		// Thread 2: Try to acquire write lock on same key (should block)
482		let svl2 = Arc::clone(&svl);
483		let key2 = key.clone();
484		let barrier2 = Arc::clone(&barrier);
485		let handle2 = thread::spawn(move || {
486			// Wait for thread 1 to acquire its lock
487			barrier2.wait();
488
489			// Small delay to ensure thread 1 is holding the lock
490			thread::sleep(Duration::from_millis(10));
491
492			// This should block until thread 1 commits
493			let mut tx = svl2.begin_command(vec![&key2]).unwrap();
494			tx.set(&key2, make_value("value2")).unwrap();
495			tx.commit().unwrap();
496		});
497
498		handle1.join().unwrap();
499		handle2.join().unwrap();
500
501		// Verify final value is from thread 2
502		let mut tx = svl.begin_query(vec![&key]).unwrap();
503		let result = tx.get(&key).unwrap();
504		assert!(result.is_some());
505		assert_eq!(result.unwrap().row, make_value("value2"));
506	}
507
508	#[test]
509	fn test_write_blocks_concurrent_read() {
510		let svl = Arc::new(create_test_svl());
511		let key = make_key("blocking_key");
512
513		// Write initial value
514		{
515			let mut tx = svl.begin_command(vec![&key]).unwrap();
516			tx.set(&key, make_value("initial")).unwrap();
517			tx.commit().unwrap();
518		}
519
520		let barrier = Arc::new(Barrier::new(2));
521
522		// Thread 1: Hold write lock
523		let svl1 = Arc::clone(&svl);
524		let key1 = key.clone();
525		let barrier1 = Arc::clone(&barrier);
526		let handle1 = thread::spawn(move || {
527			let mut tx = svl1.begin_command(vec![&key1]).unwrap();
528			tx.set(&key1, make_value("updated")).unwrap();
529
530			// Signal that we have the lock
531			barrier1.wait();
532
533			// Hold the transaction for a bit
534			thread::sleep(Duration::from_millis(100));
535
536			tx.commit().unwrap();
537		});
538
539		// Thread 2: Try to read (should block until write commits)
540		let svl2 = Arc::clone(&svl);
541		let key2 = key.clone();
542		let barrier2 = Arc::clone(&barrier);
543		let handle2 = thread::spawn(move || {
544			// Wait for thread 1 to acquire its lock
545			barrier2.wait();
546
547			// Small delay to ensure thread 1 is holding the lock
548			thread::sleep(Duration::from_millis(10));
549
550			// This should block until thread 1 commits
551			let mut tx = svl2.begin_query(vec![&key2]).unwrap();
552			let result = tx.get(&key2).unwrap();
553
554			// Should see the updated value after blocking
555			assert!(result.is_some());
556			assert_eq!(result.unwrap().row, make_value("updated"));
557		});
558
559		handle1.join().unwrap();
560		handle2.join().unwrap();
561	}
562
563	#[test]
564	fn test_concurrent_reads_allowed() {
565		let svl = Arc::new(create_test_svl());
566		let key = make_key("shared_read_key");
567
568		// Write initial value
569		{
570			let mut tx = svl.begin_command(vec![&key]).unwrap();
571			tx.set(&key, make_value("shared")).unwrap();
572			tx.commit().unwrap();
573		}
574
575		let barrier = Arc::new(Barrier::new(3));
576		let mut handles = vec![];
577
578		// Spawn 3 concurrent readers
579		for _ in 0..3 {
580			let svl_clone = Arc::clone(&svl);
581			let key_clone = key.clone();
582			let barrier_clone = Arc::clone(&barrier);
583
584			let handle = thread::spawn(move || {
585				let mut tx = svl_clone.begin_query(vec![&key_clone]).unwrap();
586
587				// Wait for all readers to start
588				barrier_clone.wait();
589
590				// All should be able to read concurrently
591				let result = tx.get(&key_clone).unwrap();
592				assert!(result.is_some());
593				assert_eq!(result.unwrap().row, make_value("shared"));
594
595				// Hold for a bit to ensure overlap
596				thread::sleep(Duration::from_millis(50));
597			});
598			handles.push(handle);
599		}
600
601		for handle in handles {
602			handle.join().unwrap();
603		}
604	}
605
606	#[test]
607	fn test_overlapping_keys_different_order() {
608		let svl = Arc::new(create_test_svl());
609		let key1 = make_key("deadlock_key1");
610		let key2 = make_key("deadlock_key2");
611		let barrier = Arc::new(Barrier::new(2));
612
613		// Thread 1: locks [key1, key2]
614		let svl1 = Arc::clone(&svl);
615		let key1_clone = key1.clone();
616		let key2_clone = key2.clone();
617		let barrier1 = Arc::clone(&barrier);
618		let handle1 = thread::spawn(move || {
619			barrier1.wait();
620			let mut tx = svl1.begin_command(vec![&key1_clone, &key2_clone]).unwrap();
621			tx.set(&key1_clone, make_value("from_thread1")).unwrap();
622			thread::sleep(Duration::from_millis(10)); // Hold locks briefly
623			tx.commit().unwrap();
624		});
625
626		// Thread 2: locks [key2, key1] - REVERSED ORDER
627		// With sorted locking, this should not deadlock
628		let svl2 = Arc::clone(&svl);
629		let key1_clone2 = key1.clone();
630		let key2_clone2 = key2.clone();
631		let barrier2 = Arc::clone(&barrier);
632		let handle2 = thread::spawn(move || {
633			barrier2.wait();
634			let mut tx = svl2.begin_command(vec![&key2_clone2, &key1_clone2]).unwrap();
635			tx.set(&key2_clone2, make_value("from_thread2")).unwrap();
636			thread::sleep(Duration::from_millis(10)); // Hold locks briefly
637			tx.commit().unwrap();
638		});
639
640		// Both threads should complete without deadlock
641		handle1.join().unwrap();
642		handle2.join().unwrap();
643
644		// Verify both commits succeeded
645		let mut tx = svl.begin_query(vec![&key1, &key2]).unwrap();
646		let result1 = tx.get(&key1).unwrap();
647		let result2 = tx.get(&key2).unwrap();
648		assert!(result1.is_some());
649		assert!(result2.is_some());
650	}
651
652	#[test]
653	fn test_circular_dependency_three_transactions() {
654		let svl = Arc::new(create_test_svl());
655		let key1 = make_key("circular_key1");
656		let key2 = make_key("circular_key2");
657		let key3 = make_key("circular_key3");
658		let barrier = Arc::new(Barrier::new(3));
659
660		// Thread 1: locks [key1, key2]
661		let svl1 = Arc::clone(&svl);
662		let k1_1 = key1.clone();
663		let k2_1 = key2.clone();
664		let barrier1 = Arc::clone(&barrier);
665		let handle1 = thread::spawn(move || {
666			barrier1.wait();
667			let mut tx = svl1.begin_command(vec![&k1_1, &k2_1]).unwrap();
668			tx.set(&k1_1, make_value("t1")).unwrap();
669			thread::sleep(Duration::from_millis(10));
670			tx.commit().unwrap();
671		});
672
673		// Thread 2: locks [key2, key3]
674		let svl2 = Arc::clone(&svl);
675		let k2_2 = key2.clone();
676		let k3_2 = key3.clone();
677		let barrier2 = Arc::clone(&barrier);
678		let handle2 = thread::spawn(move || {
679			barrier2.wait();
680			let mut tx = svl2.begin_command(vec![&k2_2, &k3_2]).unwrap();
681			tx.set(&k2_2, make_value("t2")).unwrap();
682			thread::sleep(Duration::from_millis(10));
683			tx.commit().unwrap();
684		});
685
686		// Thread 3: locks [key3, key1] - completes the potential cycle
687		// With sorted locking, this should not create a circular dependency
688		let svl3 = Arc::clone(&svl);
689		let barrier3 = Arc::clone(&barrier);
690		let handle3 = thread::spawn(move || {
691			barrier3.wait();
692			let mut tx = svl3.begin_command(vec![&key3, &key1]).unwrap();
693			tx.set(&key3, make_value("t3")).unwrap();
694			thread::sleep(Duration::from_millis(10));
695			tx.commit().unwrap();
696		});
697
698		// All threads should complete without circular deadlock
699		handle1.join().unwrap();
700		handle2.join().unwrap();
701		handle3.join().unwrap();
702	}
703
704	#[test]
705	fn test_locks_released_on_drop() {
706		let svl = Arc::new(create_test_svl());
707		let key = make_key("drop_test_key");
708
709		// Thread 1: Acquire lock and drop without commit
710		let svl1 = Arc::clone(&svl);
711		let key_clone = key.clone();
712		let handle1 = thread::spawn(move || {
713			let mut tx = svl1.begin_command(vec![&key_clone]).unwrap();
714			tx.set(&key_clone, make_value("dropped")).unwrap();
715			// Transaction dropped here without commit
716		});
717
718		handle1.join().unwrap();
719
720		// Small delay to ensure drop completes
721		thread::sleep(Duration::from_millis(10));
722
723		// Thread 2: Should be able to acquire the lock immediately
724		// If locks weren't released on drop, this would block indefinitely
725		let svl2 = Arc::clone(&svl);
726		let key_clone2 = key.clone();
727		let handle2 = thread::spawn(move || {
728			let mut tx = svl2.begin_command(vec![&key_clone2]).unwrap();
729			tx.set(&key_clone2, make_value("success")).unwrap();
730			tx.commit().unwrap();
731		});
732
733		// This should complete quickly if locks are released properly
734		handle2.join().unwrap();
735
736		// Verify the second transaction succeeded
737		let mut tx = svl.begin_query(vec![&key]).unwrap();
738		let result = tx.get(&key).unwrap();
739		assert!(result.is_some());
740		assert_eq!(result.unwrap().row, make_value("success"));
741	}
742}