Skip to main content

MultiWriteTransaction

Struct MultiWriteTransaction 

Source
pub struct MultiWriteTransaction { /* private fields */ }

Implementations§

Source§

impl MultiWriteTransaction

Source

pub fn new(engine: MultiTransaction) -> Result<Self>

Source§

impl MultiWriteTransaction

Source

pub fn savepoint(&self) -> WriteSavepoint

Snapshot pending writes for later restore.

Source

pub fn restore_savepoint(&mut self, sp: WriteSavepoint)

Restore pending writes from a savepoint.

Source§

impl MultiWriteTransaction

Source

pub fn commit(&mut self) -> Result<CommitVersion>

Examples found in repository?
examples/oracle_performance.rs (line 45)
24pub fn oracle_performance_benchmark() {
25	println!("=== Oracle Performance Benchmark ===\n");
26
27	// Test different transaction counts to show scaling behavior
28	let test_sizes = vec![1000, 5000, 10000, 25000];
29
30	for &num_txns in &test_sizes {
31		println!("Testing with {} transactions...", num_txns);
32
33		let engine = MultiTransaction::testing();
34
35		let start = Instant::now();
36
37		// Create transactions sequentially (worst case for O(N²) algorithm)
38		for i in 0..num_txns {
39			let mut tx = engine.begin_command().unwrap();
40
41			let key = as_key!(format!("key_{}", i));
42			let value = as_values!(format!("value_{}", i));
43
44			tx.set(&key, value).unwrap();
45			tx.commit().unwrap();
46		}
47
48		let duration = start.elapsed();
49		let tps = num_txns as f64 / duration.as_secs_f64();
50
51		println!("  {} transactions in {:?}", num_txns, duration);
52		println!("  {:.0} TPS (transactions per second)", tps);
53		println!("  {:.2} μs per transaction\n", duration.as_micros() as f64 / num_txns as f64);
54	}
55}
56
57/// Benchmark concurrent performance
58pub fn concurrent_oracle_benchmark() {
59	println!("=== Concurrent Oracle Performance Benchmark ===\n");
60
61	let test_configs = vec![
62		(10, 1000), // 10 threads, 1000 txns each
63		(50, 500),  // 50 threads, 500 txns each
64		(100, 250), // 100 threads, 250 txns each
65		(1000, 50), // 1000 threads, 50 txns each
66	];
67
68	for &(num_threads, txns_per_thread) in &test_configs {
69		let total_txns = num_threads * txns_per_thread;
70		println!(
71			"Testing {} threads × {} transactions = {} total...",
72			num_threads, txns_per_thread, total_txns
73		);
74
75		let engine = Arc::new(MultiTransaction::testing());
76		let start = Instant::now();
77
78		let mut handles = vec![];
79
80		for thread_id in 0..num_threads {
81			let engine_clone = engine.clone();
82			let handle = spawn(move || {
83				let base_key = thread_id * txns_per_thread;
84				for i in 0..txns_per_thread {
85					let mut tx = engine_clone.begin_command().unwrap();
86
87					let key = as_key!(base_key + i);
88					let value = as_values!(i);
89
90					tx.set(&key, value).unwrap();
91					tx.commit().unwrap();
92				}
93			});
94			handles.push(handle);
95		}
96
97		for handle in handles {
98			handle.join().expect("Task panicked");
99		}
100
101		let duration = start.elapsed();
102		let tps = total_txns as f64 / duration.as_secs_f64();
103
104		println!("  {} total transactions in {:?}", total_txns, duration);
105		println!("  {:.0} TPS (transactions per second)", tps);
106		println!("  {:.2} μs per transaction\n", duration.as_micros() as f64 / total_txns as f64);
107	}
108}
109
110/// Benchmark with actual conflicts to test conflict detection performance
111pub fn conflict_detection_benchmark() {
112	println!("=== Conflict Detection Performance Benchmark ===\n");
113
114	let engine = MultiTransaction::testing();
115
116	// Pre-populate with some data to create realistic conflict scenarios
117	for i in 0..1000 {
118		let mut tx = engine.begin_command().unwrap();
119		let key = as_key!(format!("shared_key_{}", i % 100)); // 100 different keys
120		let value = as_values!(i);
121		tx.set(&key, value).unwrap();
122		tx.commit().unwrap();
123	}
124
125	println!("Pre-populated with 1000 transactions across 100 keys");
126
127	// Now test conflict detection performance
128	let num_conflict_txns = 10000;
129	let start = Instant::now();
130	let mut conflicts = 0;
131
132	for i in 0..num_conflict_txns {
133		let mut tx = engine.begin_command().unwrap();
134
135		// Try to modify keys that might conflict
136		let key = as_key!(format!("shared_key_{}", i % 100));
137		let value = as_values!(i + 1000);
138
139		tx.set(&key, value).unwrap();
140
141		match tx.commit() {
142			Ok(_) => {}
143			Err(e) if e.code == "TXN_001" => {
144				conflicts += 1;
145			}
146			Err(e) => panic!("Unexpected error: {:?}", e),
147		};
148	}
149
150	let duration = start.elapsed();
151	let tps = num_conflict_txns as f64 / duration.as_secs_f64();
152
153	println!("  {} transactions with potential conflicts in {:?}", num_conflict_txns, duration);
154	println!(
155		"  {} actual conflicts detected ({:.1}%)",
156		conflicts,
157		conflicts as f64 / num_conflict_txns as f64 * 100.0
158	);
159	println!("  {:.0} TPS (transactions per second)", tps);
160	println!("  {:.2} μs per transaction", duration.as_micros() as f64 / num_conflict_txns as f64);
161}
Source§

impl MultiWriteTransaction

Source

pub fn version(&self) -> CommitVersion

Source

pub fn pending_writes(&self) -> &PendingWrites

Source

pub fn read_as_of_version_exclusive(&mut self, version: CommitVersion)

Source

pub fn read_as_of_version_inclusive( &mut self, version: CommitVersion, ) -> Result<()>

Source

pub fn rollback(&mut self) -> Result<()>

Source

pub fn contains_key(&mut self, key: &EncodedKey) -> Result<bool>

Source

pub fn get(&mut self, key: &EncodedKey) -> Result<Option<TransactionValue>>

Source

pub fn set(&mut self, key: &EncodedKey, values: EncodedValues) -> Result<()>

Examples found in repository?
examples/oracle_performance.rs (line 44)
24pub fn oracle_performance_benchmark() {
25	println!("=== Oracle Performance Benchmark ===\n");
26
27	// Test different transaction counts to show scaling behavior
28	let test_sizes = vec![1000, 5000, 10000, 25000];
29
30	for &num_txns in &test_sizes {
31		println!("Testing with {} transactions...", num_txns);
32
33		let engine = MultiTransaction::testing();
34
35		let start = Instant::now();
36
37		// Create transactions sequentially (worst case for O(N²) algorithm)
38		for i in 0..num_txns {
39			let mut tx = engine.begin_command().unwrap();
40
41			let key = as_key!(format!("key_{}", i));
42			let value = as_values!(format!("value_{}", i));
43
44			tx.set(&key, value).unwrap();
45			tx.commit().unwrap();
46		}
47
48		let duration = start.elapsed();
49		let tps = num_txns as f64 / duration.as_secs_f64();
50
51		println!("  {} transactions in {:?}", num_txns, duration);
52		println!("  {:.0} TPS (transactions per second)", tps);
53		println!("  {:.2} μs per transaction\n", duration.as_micros() as f64 / num_txns as f64);
54	}
55}
56
57/// Benchmark concurrent performance
58pub fn concurrent_oracle_benchmark() {
59	println!("=== Concurrent Oracle Performance Benchmark ===\n");
60
61	let test_configs = vec![
62		(10, 1000), // 10 threads, 1000 txns each
63		(50, 500),  // 50 threads, 500 txns each
64		(100, 250), // 100 threads, 250 txns each
65		(1000, 50), // 1000 threads, 50 txns each
66	];
67
68	for &(num_threads, txns_per_thread) in &test_configs {
69		let total_txns = num_threads * txns_per_thread;
70		println!(
71			"Testing {} threads × {} transactions = {} total...",
72			num_threads, txns_per_thread, total_txns
73		);
74
75		let engine = Arc::new(MultiTransaction::testing());
76		let start = Instant::now();
77
78		let mut handles = vec![];
79
80		for thread_id in 0..num_threads {
81			let engine_clone = engine.clone();
82			let handle = spawn(move || {
83				let base_key = thread_id * txns_per_thread;
84				for i in 0..txns_per_thread {
85					let mut tx = engine_clone.begin_command().unwrap();
86
87					let key = as_key!(base_key + i);
88					let value = as_values!(i);
89
90					tx.set(&key, value).unwrap();
91					tx.commit().unwrap();
92				}
93			});
94			handles.push(handle);
95		}
96
97		for handle in handles {
98			handle.join().expect("Task panicked");
99		}
100
101		let duration = start.elapsed();
102		let tps = total_txns as f64 / duration.as_secs_f64();
103
104		println!("  {} total transactions in {:?}", total_txns, duration);
105		println!("  {:.0} TPS (transactions per second)", tps);
106		println!("  {:.2} μs per transaction\n", duration.as_micros() as f64 / total_txns as f64);
107	}
108}
109
110/// Benchmark with actual conflicts to test conflict detection performance
111pub fn conflict_detection_benchmark() {
112	println!("=== Conflict Detection Performance Benchmark ===\n");
113
114	let engine = MultiTransaction::testing();
115
116	// Pre-populate with some data to create realistic conflict scenarios
117	for i in 0..1000 {
118		let mut tx = engine.begin_command().unwrap();
119		let key = as_key!(format!("shared_key_{}", i % 100)); // 100 different keys
120		let value = as_values!(i);
121		tx.set(&key, value).unwrap();
122		tx.commit().unwrap();
123	}
124
125	println!("Pre-populated with 1000 transactions across 100 keys");
126
127	// Now test conflict detection performance
128	let num_conflict_txns = 10000;
129	let start = Instant::now();
130	let mut conflicts = 0;
131
132	for i in 0..num_conflict_txns {
133		let mut tx = engine.begin_command().unwrap();
134
135		// Try to modify keys that might conflict
136		let key = as_key!(format!("shared_key_{}", i % 100));
137		let value = as_values!(i + 1000);
138
139		tx.set(&key, value).unwrap();
140
141		match tx.commit() {
142			Ok(_) => {}
143			Err(e) if e.code == "TXN_001" => {
144				conflicts += 1;
145			}
146			Err(e) => panic!("Unexpected error: {:?}", e),
147		};
148	}
149
150	let duration = start.elapsed();
151	let tps = num_conflict_txns as f64 / duration.as_secs_f64();
152
153	println!("  {} transactions with potential conflicts in {:?}", num_conflict_txns, duration);
154	println!(
155		"  {} actual conflicts detected ({:.1}%)",
156		conflicts,
157		conflicts as f64 / num_conflict_txns as f64 * 100.0
158	);
159	println!("  {:.0} TPS (transactions per second)", tps);
160	println!("  {:.2} μs per transaction", duration.as_micros() as f64 / num_conflict_txns as f64);
161}
Source

pub fn unset(&mut self, key: &EncodedKey, values: EncodedValues) -> Result<()>

Source

pub fn remove(&mut self, key: &EncodedKey) -> Result<()>

Source

pub fn prefix(&mut self, prefix: &EncodedKey) -> Result<MultiVersionBatch>

Source

pub fn prefix_rev(&mut self, prefix: &EncodedKey) -> Result<MultiVersionBatch>

Source

pub fn range( &mut self, range: EncodedKeyRange, batch_size: usize, ) -> Box<dyn Iterator<Item = Result<MultiVersionValues>> + Send + '_>

Create a streaming iterator for forward range queries, merging pending writes.

This properly handles high version density by scanning until batch_size unique logical keys are collected. The stream yields individual entries and maintains cursor state internally. Pending writes are merged with committed storage data.

Source

pub fn range_rev( &mut self, range: EncodedKeyRange, batch_size: usize, ) -> Box<dyn Iterator<Item = Result<MultiVersionValues>> + Send + '_>

Create a streaming iterator for reverse range queries, merging pending writes.

This properly handles high version density by scanning until batch_size unique logical keys are collected. The stream yields individual entries in reverse key order and maintains cursor state internally.

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

impl<T> Pointable for T

Source§

const ALIGN: usize

The alignment of pointer.
Source§

type Init = T

The type for initializers.
Source§

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer. Read more
Source§

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer. Read more
Source§

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer. Read more
Source§

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more