Skip to main content

reifydb_transaction/single/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::sync::Arc;
5
6use crossbeam_skiplist::SkipMap;
7use reifydb_core::{
8	delta::Delta,
9	encoded::{encoded::EncodedValues, key::EncodedKey},
10	event::EventBus,
11	interface::WithEventBus,
12};
13use reifydb_runtime::sync::rwlock::RwLock;
14use reifydb_store_single::SingleStore;
15
16pub mod read;
17pub mod write;
18
19use read::{KeyReadLock, SingleReadTransaction};
20use reifydb_runtime::{SharedRuntimeConfig, actor::system::ActorSystem};
21use reifydb_type::Result;
22use write::{KeyWriteLock, SingleWriteTransaction};
23
24#[derive(Clone)]
25pub struct SingleTransaction {
26	inner: Arc<SingleTransactionInner>,
27}
28
29struct SingleTransactionInner {
30	store: RwLock<SingleStore>,
31	event_bus: EventBus,
32	key_locks: SkipMap<EncodedKey, Arc<RwLock<()>>>,
33}
34
35impl SingleTransactionInner {
36	fn get_or_create_lock(&self, key: &EncodedKey) -> Arc<RwLock<()>> {
37		// Check if lock exists
38		if let Some(entry) = self.key_locks.get(key) {
39			return entry.value().clone();
40		}
41
42		// Create new lock
43		let lock = Arc::new(RwLock::new(()));
44		self.key_locks.insert(key.clone(), lock.clone());
45		lock
46	}
47}
48
49impl SingleTransaction {
50	pub fn new(store: SingleStore, event_bus: EventBus) -> Self {
51		Self {
52			inner: Arc::new(SingleTransactionInner {
53				store: RwLock::new(store),
54				event_bus,
55				key_locks: SkipMap::new(),
56			}),
57		}
58	}
59
60	pub fn testing() -> Self {
61		let actor_system = ActorSystem::new(SharedRuntimeConfig::default().actor_system_config());
62		Self::new(SingleStore::testing_memory(), EventBus::new(&actor_system))
63	}
64
65	/// Helper for single-version queries.
66	pub fn with_query<'a, I, F, R>(&self, keys: I, f: F) -> Result<R>
67	where
68		I: IntoIterator<Item = &'a EncodedKey>,
69		F: FnOnce(&mut SingleReadTransaction<'_>) -> Result<R>,
70	{
71		let mut tx = self.begin_query(keys)?;
72		f(&mut tx)
73	}
74
75	/// Helper for single-version commands.
76	pub fn with_command<'a, I, F, R>(&self, keys: I, f: F) -> Result<R>
77	where
78		I: IntoIterator<Item = &'a EncodedKey>,
79		F: FnOnce(&mut SingleWriteTransaction<'_>) -> Result<R>,
80	{
81		let mut tx = self.begin_command(keys)?;
82		let result = f(&mut tx)?;
83		tx.commit()?;
84		Ok(result)
85	}
86
87	pub fn begin_query<'a, I>(&self, keys: I) -> Result<SingleReadTransaction<'_>>
88	where
89		I: IntoIterator<Item = &'a EncodedKey>,
90	{
91		let mut keys_vec: Vec<EncodedKey> = keys.into_iter().cloned().collect();
92		assert!(
93			!keys_vec.is_empty(),
94			"SVL transactions must declare keys upfront - empty keysets are not allowed"
95		);
96
97		// Sort keys to establish consistent lock ordering and prevent deadlocks
98		keys_vec.sort();
99
100		// Acquire read locks on all keys in sorted order
101		let mut locks = Vec::new();
102		for key in &keys_vec {
103			let arc = self.inner.get_or_create_lock(key);
104			locks.push(KeyReadLock::new(arc));
105		}
106
107		Ok(SingleReadTransaction {
108			inner: &self.inner,
109			keys: keys_vec,
110			_key_locks: locks,
111		})
112	}
113
114	pub fn begin_command<'a, I>(&self, keys: I) -> Result<SingleWriteTransaction<'_>>
115	where
116		I: IntoIterator<Item = &'a EncodedKey>,
117	{
118		let mut keys_vec: Vec<EncodedKey> = keys.into_iter().cloned().collect();
119		assert!(
120			!keys_vec.is_empty(),
121			"SVL transactions must declare keys upfront - empty keysets are not allowed"
122		);
123
124		// Sort keys to establish consistent lock ordering and prevent deadlocks
125		keys_vec.sort();
126
127		// Acquire write locks on all keys in sorted order
128		let mut locks = Vec::new();
129		for key in &keys_vec {
130			let arc = self.inner.get_or_create_lock(key);
131			locks.push(KeyWriteLock::new(arc));
132		}
133
134		Ok(SingleWriteTransaction::new(&self.inner, keys_vec, locks))
135	}
136}
137
138impl WithEventBus for SingleTransaction {
139	fn event_bus(&self) -> &EventBus {
140		&self.inner.event_bus
141	}
142}
143
144#[cfg(test)]
145pub mod tests {
146	use std::{
147		iter,
148		sync::{Arc, Barrier},
149		thread,
150		time::Duration,
151	};
152
153	use reifydb_type::util::cowvec::CowVec;
154
155	use super::*;
156
157	fn make_key(s: &str) -> EncodedKey {
158		EncodedKey(CowVec::new(s.as_bytes().to_vec()))
159	}
160
161	fn make_value(s: &str) -> EncodedValues {
162		EncodedValues(CowVec::new(s.as_bytes().to_vec()))
163	}
164
165	fn create_test_svl() -> SingleTransaction {
166		SingleTransaction::testing()
167	}
168
169	#[test]
170	fn test_allowed_key_query() {
171		let svl = create_test_svl();
172		let key = make_key("test_key");
173
174		// Start scoped query with the key
175		let mut tx = svl.begin_query(vec![&key]).unwrap();
176
177		// Should be able to get the key
178		let result = tx.get(&key);
179		assert!(result.is_ok());
180	}
181
182	#[test]
183	fn test_disallowed_key_query() {
184		let svl = create_test_svl();
185		let key1 = make_key("allowed");
186		let key2 = make_key("disallowed");
187
188		// Start scoped query with only key1
189		let mut tx = svl.begin_query(vec![&key1]).unwrap();
190
191		// Should succeed for key1
192		assert!(tx.get(&key1).is_ok());
193
194		// Should fail for key2
195		let result = tx.get(&key2);
196		assert!(result.is_err());
197		let err = result.unwrap_err();
198		assert_eq!(err.0.code, "TXN_010");
199	}
200
201	#[test]
202	#[should_panic(expected = "SVL transactions must declare keys upfront - empty keysets are not allowed")]
203	fn test_empty_keyset_query_panics() {
204		let svl = create_test_svl();
205
206		// Should panic when trying to create transaction with empty keys
207		let _tx = svl.begin_query(iter::empty());
208	}
209
210	#[test]
211	#[should_panic(expected = "SVL transactions must declare keys upfront - empty keysets are not allowed")]
212	fn test_empty_keyset_command_panics() {
213		let svl = create_test_svl();
214
215		// Should panic when trying to create transaction with empty keys
216		let _tx = svl.begin_command(iter::empty());
217	}
218
219	#[test]
220	fn test_allowed_key_command() {
221		let svl = create_test_svl();
222		let key = make_key("test_key");
223		let value = make_value("test_value");
224
225		// Start scoped command with the key
226		let mut tx = svl.begin_command(vec![&key]).unwrap();
227
228		// Should be able to set and get the key
229		assert!(tx.set(&key, value.clone()).is_ok());
230		assert!(tx.get(&key).is_ok());
231		assert!(tx.commit().is_ok());
232	}
233
234	#[test]
235	fn test_disallowed_key_command() {
236		let svl = create_test_svl();
237		let key1 = make_key("allowed");
238		let key2 = make_key("disallowed");
239		let value = make_value("test_value");
240
241		// Start scoped command with only key1
242		let mut tx = svl.begin_command(vec![&key1]).unwrap();
243
244		// Should succeed for key1
245		assert!(tx.set(&key1, value.clone()).is_ok());
246
247		// Should fail for key2
248		let result = tx.set(&key2, value);
249		assert!(result.is_err());
250		let err = result.unwrap_err();
251		assert_eq!(err.0.code, "TXN_010");
252	}
253
254	#[test]
255	fn test_command_commit_with_valid_keys() {
256		let svl = create_test_svl();
257		let key1 = make_key("key1");
258		let key2 = make_key("key2");
259		let value1 = make_value("value1");
260		let value2 = make_value("value2");
261
262		// Write with scoped transaction
263		{
264			let mut tx = svl.begin_command(vec![&key1, &key2]).unwrap();
265			tx.set(&key1, value1.clone()).unwrap();
266			tx.set(&key2, value2.clone()).unwrap();
267			tx.commit().unwrap();
268		}
269
270		// Verify with query
271		{
272			let mut tx = svl.begin_query(vec![&key1, &key2]).unwrap();
273			let result1 = tx.get(&key1).unwrap();
274			let result2 = tx.get(&key2).unwrap();
275			assert!(result1.is_some());
276			assert!(result2.is_some());
277			assert_eq!(result1.unwrap().values, value1);
278			assert_eq!(result2.unwrap().values, value2);
279		}
280	}
281
282	#[test]
283	fn test_rollback_with_scoped_keys() {
284		let svl = create_test_svl();
285		let key = make_key("test_key");
286		let value = make_value("test_value");
287
288		// Start transaction and rollback
289		{
290			let mut tx = svl.begin_command(vec![&key]).unwrap();
291			tx.set(&key, value).unwrap();
292			tx.rollback().unwrap();
293		}
294
295		// Verify nothing was committed
296		{
297			let mut tx = svl.begin_query(vec![&key]).unwrap();
298			let result = tx.get(&key).unwrap();
299			assert!(result.is_none());
300		}
301	}
302
303	#[test]
304	fn test_concurrent_reads() {
305		let svl = Arc::new(create_test_svl());
306		let key = make_key("shared_key");
307		let value = make_value("shared_value");
308
309		// Write initial value
310		{
311			let mut tx = svl.begin_command(vec![&key]).unwrap();
312			tx.set(&key, value.clone()).unwrap();
313			tx.commit().unwrap();
314		}
315
316		// Spawn multiple readers
317		let mut handles = vec![];
318		for _ in 0..5 {
319			let svl_clone = Arc::clone(&svl);
320			let key_clone = key.clone();
321			let value_clone = value.clone();
322
323			let handle = thread::spawn(move || {
324				let mut tx = svl_clone.begin_query(vec![&key_clone]).unwrap();
325				let result = tx.get(&key_clone).unwrap();
326				assert!(result.is_some());
327				assert_eq!(result.unwrap().values, value_clone);
328			});
329			handles.push(handle);
330		}
331
332		// Wait for all threads
333		for handle in handles {
334			handle.join().unwrap();
335		}
336	}
337
338	#[test]
339	fn test_concurrent_writers_disjoint_keys() {
340		let svl = Arc::new(create_test_svl());
341
342		// Spawn multiple writers with disjoint keys
343		let mut handles = vec![];
344		for i in 0..5 {
345			let svl_clone = Arc::clone(&svl);
346			let key = make_key(&format!("key_{}", i));
347			let value = make_value(&format!("value_{}", i));
348
349			let handle = thread::spawn(move || {
350				let mut tx = svl_clone.begin_command(vec![&key]).unwrap();
351				tx.set(&key, value).unwrap();
352				tx.commit().unwrap();
353			});
354			handles.push(handle);
355		}
356
357		// Wait for all threads
358		for handle in handles {
359			handle.join().unwrap();
360		}
361
362		// Verify all values were written
363		for i in 0..5 {
364			let key = make_key(&format!("key_{}", i));
365			let expected_value = make_value(&format!("value_{}", i));
366
367			let mut tx = svl.begin_query(vec![&key]).unwrap();
368			let result = tx.get(&key).unwrap();
369			assert!(result.is_some());
370			assert_eq!(result.unwrap().values, expected_value);
371		}
372	}
373
374	#[test]
375	fn test_concurrent_readers_and_writer() {
376		let svl = Arc::new(create_test_svl());
377		let key1 = make_key("key1");
378		let key2 = make_key("key2");
379		let value1 = make_value("value1");
380		let value2 = make_value("value2");
381
382		// Write initial values
383		{
384			let mut tx = svl.begin_command(vec![&key1, &key2]).unwrap();
385			tx.set(&key1, value1.clone()).unwrap();
386			tx.set(&key2, value2.clone()).unwrap();
387			tx.commit().unwrap();
388		}
389
390		// Spawn readers for key1
391		let mut handles = vec![];
392		for _ in 0..3 {
393			let svl_clone = Arc::clone(&svl);
394			let key_clone = key1.clone();
395			let value_clone = value1.clone();
396
397			let handle = thread::spawn(move || {
398				let mut tx = svl_clone.begin_query(vec![&key_clone]).unwrap();
399				let result = tx.get(&key_clone).unwrap();
400				assert!(result.is_some());
401				assert_eq!(result.unwrap().values, value_clone);
402			});
403			handles.push(handle);
404		}
405
406		// Spawn a writer for key2 (different key, should not block readers)
407		let svl_clone = Arc::clone(&svl);
408		let new_value = make_value("new_value2");
409		let handle = thread::spawn(move || {
410			let mut tx = svl_clone.begin_command(vec![&key2]).unwrap();
411			tx.set(&key2, new_value).unwrap();
412			tx.commit().unwrap();
413		});
414		handles.push(handle);
415
416		// Wait for all threads
417		for handle in handles {
418			handle.join().unwrap();
419		}
420	}
421
422	#[test]
423	fn test_no_panics_with_rwlock() {
424		let svl = Arc::new(create_test_svl());
425
426		// Mix of operations across multiple threads
427		let mut handles = vec![];
428		for i in 0..10 {
429			let svl_clone = Arc::clone(&svl);
430			let key = make_key(&format!("key_{}", i % 3)); // Some key overlap
431			let value = make_value(&format!("value_{}", i));
432
433			let handle = thread::spawn(move || {
434				// Alternate between reads and writes
435				if i % 2 == 0 {
436					let mut tx = svl_clone.begin_command(vec![&key]).unwrap();
437					let _ = tx.set(&key, value);
438					let _ = tx.commit();
439				} else {
440					let mut tx = svl_clone.begin_query(vec![&key]).unwrap();
441					let _ = tx.get(&key);
442				}
443			});
444			handles.push(handle);
445		}
446
447		// Wait for all threads - should not panic
448		for handle in handles {
449			handle.join().unwrap();
450		}
451	}
452
453	#[test]
454	fn test_write_blocks_concurrent_write() {
455		let svl = Arc::new(create_test_svl());
456		let key = make_key("blocking_key");
457		let barrier = Arc::new(Barrier::new(2));
458
459		// Thread 1: Hold write lock on key
460		let svl1 = Arc::clone(&svl);
461		let key1 = key.clone();
462		let barrier1 = Arc::clone(&barrier);
463		let handle1 = thread::spawn(move || {
464			let mut tx = svl1.begin_command(vec![&key1]).unwrap();
465			tx.set(&key1, make_value("value1")).unwrap();
466
467			// Signal that we have the lock
468			barrier1.wait();
469
470			// Hold the transaction (and locks) for a bit
471			thread::sleep(Duration::from_millis(100));
472
473			tx.commit().unwrap();
474		});
475
476		// Thread 2: Try to acquire write lock on same key (should block)
477		let svl2 = Arc::clone(&svl);
478		let key2 = key.clone();
479		let barrier2 = Arc::clone(&barrier);
480		let handle2 = thread::spawn(move || {
481			// Wait for thread 1 to acquire its lock
482			barrier2.wait();
483
484			// Small delay to ensure thread 1 is holding the lock
485			thread::sleep(Duration::from_millis(10));
486
487			// This should block until thread 1 commits
488			let mut tx = svl2.begin_command(vec![&key2]).unwrap();
489			tx.set(&key2, make_value("value2")).unwrap();
490			tx.commit().unwrap();
491		});
492
493		handle1.join().unwrap();
494		handle2.join().unwrap();
495
496		// Verify final value is from thread 2
497		let mut tx = svl.begin_query(vec![&key]).unwrap();
498		let result = tx.get(&key).unwrap();
499		assert!(result.is_some());
500		assert_eq!(result.unwrap().values, make_value("value2"));
501	}
502
503	#[test]
504	fn test_write_blocks_concurrent_read() {
505		let svl = Arc::new(create_test_svl());
506		let key = make_key("blocking_key");
507
508		// Write initial value
509		{
510			let mut tx = svl.begin_command(vec![&key]).unwrap();
511			tx.set(&key, make_value("initial")).unwrap();
512			tx.commit().unwrap();
513		}
514
515		let barrier = Arc::new(Barrier::new(2));
516
517		// Thread 1: Hold write lock
518		let svl1 = Arc::clone(&svl);
519		let key1 = key.clone();
520		let barrier1 = Arc::clone(&barrier);
521		let handle1 = thread::spawn(move || {
522			let mut tx = svl1.begin_command(vec![&key1]).unwrap();
523			tx.set(&key1, make_value("updated")).unwrap();
524
525			// Signal that we have the lock
526			barrier1.wait();
527
528			// Hold the transaction for a bit
529			thread::sleep(Duration::from_millis(100));
530
531			tx.commit().unwrap();
532		});
533
534		// Thread 2: Try to read (should block until write commits)
535		let svl2 = Arc::clone(&svl);
536		let key2 = key.clone();
537		let barrier2 = Arc::clone(&barrier);
538		let handle2 = thread::spawn(move || {
539			// Wait for thread 1 to acquire its lock
540			barrier2.wait();
541
542			// Small delay to ensure thread 1 is holding the lock
543			thread::sleep(Duration::from_millis(10));
544
545			// This should block until thread 1 commits
546			let mut tx = svl2.begin_query(vec![&key2]).unwrap();
547			let result = tx.get(&key2).unwrap();
548
549			// Should see the updated value after blocking
550			assert!(result.is_some());
551			assert_eq!(result.unwrap().values, make_value("updated"));
552		});
553
554		handle1.join().unwrap();
555		handle2.join().unwrap();
556	}
557
558	#[test]
559	fn test_concurrent_reads_allowed() {
560		let svl = Arc::new(create_test_svl());
561		let key = make_key("shared_read_key");
562
563		// Write initial value
564		{
565			let mut tx = svl.begin_command(vec![&key]).unwrap();
566			tx.set(&key, make_value("shared")).unwrap();
567			tx.commit().unwrap();
568		}
569
570		let barrier = Arc::new(Barrier::new(3));
571		let mut handles = vec![];
572
573		// Spawn 3 concurrent readers
574		for _ in 0..3 {
575			let svl_clone = Arc::clone(&svl);
576			let key_clone = key.clone();
577			let barrier_clone = Arc::clone(&barrier);
578
579			let handle = thread::spawn(move || {
580				let mut tx = svl_clone.begin_query(vec![&key_clone]).unwrap();
581
582				// Wait for all readers to start
583				barrier_clone.wait();
584
585				// All should be able to read concurrently
586				let result = tx.get(&key_clone).unwrap();
587				assert!(result.is_some());
588				assert_eq!(result.unwrap().values, make_value("shared"));
589
590				// Hold for a bit to ensure overlap
591				thread::sleep(Duration::from_millis(50));
592			});
593			handles.push(handle);
594		}
595
596		for handle in handles {
597			handle.join().unwrap();
598		}
599	}
600
601	#[test]
602	fn test_overlapping_keys_different_order() {
603		let svl = Arc::new(create_test_svl());
604		let key1 = make_key("deadlock_key1");
605		let key2 = make_key("deadlock_key2");
606		let barrier = Arc::new(Barrier::new(2));
607
608		// Thread 1: locks [key1, key2]
609		let svl1 = Arc::clone(&svl);
610		let key1_clone = key1.clone();
611		let key2_clone = key2.clone();
612		let barrier1 = Arc::clone(&barrier);
613		let handle1 = thread::spawn(move || {
614			barrier1.wait();
615			let mut tx = svl1.begin_command(vec![&key1_clone, &key2_clone]).unwrap();
616			tx.set(&key1_clone, make_value("from_thread1")).unwrap();
617			thread::sleep(Duration::from_millis(10)); // Hold locks briefly
618			tx.commit().unwrap();
619		});
620
621		// Thread 2: locks [key2, key1] - REVERSED ORDER
622		// With sorted locking, this should not deadlock
623		let svl2 = Arc::clone(&svl);
624		let key1_clone2 = key1.clone();
625		let key2_clone2 = key2.clone();
626		let barrier2 = Arc::clone(&barrier);
627		let handle2 = thread::spawn(move || {
628			barrier2.wait();
629			let mut tx = svl2.begin_command(vec![&key2_clone2, &key1_clone2]).unwrap();
630			tx.set(&key2_clone2, make_value("from_thread2")).unwrap();
631			thread::sleep(Duration::from_millis(10)); // Hold locks briefly
632			tx.commit().unwrap();
633		});
634
635		// Both threads should complete without deadlock
636		handle1.join().unwrap();
637		handle2.join().unwrap();
638
639		// Verify both commits succeeded
640		let mut tx = svl.begin_query(vec![&key1, &key2]).unwrap();
641		let result1 = tx.get(&key1).unwrap();
642		let result2 = tx.get(&key2).unwrap();
643		assert!(result1.is_some());
644		assert!(result2.is_some());
645	}
646
647	#[test]
648	fn test_circular_dependency_three_transactions() {
649		let svl = Arc::new(create_test_svl());
650		let key1 = make_key("circular_key1");
651		let key2 = make_key("circular_key2");
652		let key3 = make_key("circular_key3");
653		let barrier = Arc::new(Barrier::new(3));
654
655		// Thread 1: locks [key1, key2]
656		let svl1 = Arc::clone(&svl);
657		let k1_1 = key1.clone();
658		let k2_1 = key2.clone();
659		let barrier1 = Arc::clone(&barrier);
660		let handle1 = thread::spawn(move || {
661			barrier1.wait();
662			let mut tx = svl1.begin_command(vec![&k1_1, &k2_1]).unwrap();
663			tx.set(&k1_1, make_value("t1")).unwrap();
664			thread::sleep(Duration::from_millis(10));
665			tx.commit().unwrap();
666		});
667
668		// Thread 2: locks [key2, key3]
669		let svl2 = Arc::clone(&svl);
670		let k2_2 = key2.clone();
671		let k3_2 = key3.clone();
672		let barrier2 = Arc::clone(&barrier);
673		let handle2 = thread::spawn(move || {
674			barrier2.wait();
675			let mut tx = svl2.begin_command(vec![&k2_2, &k3_2]).unwrap();
676			tx.set(&k2_2, make_value("t2")).unwrap();
677			thread::sleep(Duration::from_millis(10));
678			tx.commit().unwrap();
679		});
680
681		// Thread 3: locks [key3, key1] - completes the potential cycle
682		// With sorted locking, this should not create a circular dependency
683		let svl3 = Arc::clone(&svl);
684		let barrier3 = Arc::clone(&barrier);
685		let handle3 = thread::spawn(move || {
686			barrier3.wait();
687			let mut tx = svl3.begin_command(vec![&key3, &key1]).unwrap();
688			tx.set(&key3, make_value("t3")).unwrap();
689			thread::sleep(Duration::from_millis(10));
690			tx.commit().unwrap();
691		});
692
693		// All threads should complete without circular deadlock
694		handle1.join().unwrap();
695		handle2.join().unwrap();
696		handle3.join().unwrap();
697	}
698
699	#[test]
700	fn test_locks_released_on_drop() {
701		let svl = Arc::new(create_test_svl());
702		let key = make_key("drop_test_key");
703
704		// Thread 1: Acquire lock and drop without commit
705		let svl1 = Arc::clone(&svl);
706		let key_clone = key.clone();
707		let handle1 = thread::spawn(move || {
708			let mut tx = svl1.begin_command(vec![&key_clone]).unwrap();
709			tx.set(&key_clone, make_value("dropped")).unwrap();
710			// Transaction dropped here without commit
711		});
712
713		handle1.join().unwrap();
714
715		// Small delay to ensure drop completes
716		thread::sleep(Duration::from_millis(10));
717
718		// Thread 2: Should be able to acquire the lock immediately
719		// If locks weren't released on drop, this would block indefinitely
720		let svl2 = Arc::clone(&svl);
721		let key_clone2 = key.clone();
722		let handle2 = thread::spawn(move || {
723			let mut tx = svl2.begin_command(vec![&key_clone2]).unwrap();
724			tx.set(&key_clone2, make_value("success")).unwrap();
725			tx.commit().unwrap();
726		});
727
728		// This should complete quickly if locks are released properly
729		handle2.join().unwrap();
730
731		// Verify the second transaction succeeded
732		let mut tx = svl.begin_query(vec![&key]).unwrap();
733		let result = tx.get(&key).unwrap();
734		assert!(result.is_some());
735		assert_eq!(result.unwrap().values, make_value("success"));
736	}
737}