Skip to main content

MultiTransaction

Struct MultiTransaction 

Source
pub struct MultiTransaction(/* private fields */);

Implementations§

Source§

impl MultiTransaction

Source

pub fn testing() -> Self

Examples found in repository?
examples/oracle_performance.rs (line 33)
24pub fn oracle_performance_benchmark() {
25	println!("=== Oracle Performance Benchmark ===\n");
26
27	// Test different transaction counts to show scaling behavior
28	let test_sizes = vec![1000, 5000, 10000, 25000];
29
30	for &num_txns in &test_sizes {
31		println!("Testing with {} transactions...", num_txns);
32
33		let engine = MultiTransaction::testing();
34
35		let start = Instant::now();
36
37		// Create transactions sequentially (worst case for O(N²) algorithm)
38		for i in 0..num_txns {
39			let mut tx = engine.begin_command().unwrap();
40
41			let key = as_key!(format!("key_{}", i));
42			let value = as_values!(format!("value_{}", i));
43
44			tx.set(&key, value).unwrap();
45			tx.commit().unwrap();
46		}
47
48		let duration = start.elapsed();
49		let tps = num_txns as f64 / duration.as_secs_f64();
50
51		println!("  {} transactions in {:?}", num_txns, duration);
52		println!("  {:.0} TPS (transactions per second)", tps);
53		println!("  {:.2} μs per transaction\n", duration.as_micros() as f64 / num_txns as f64);
54	}
55}
56
57/// Benchmark concurrent performance
58pub fn concurrent_oracle_benchmark() {
59	println!("=== Concurrent Oracle Performance Benchmark ===\n");
60
61	let test_configs = vec![
62		(10, 1000), // 10 threads, 1000 txns each
63		(50, 500),  // 50 threads, 500 txns each
64		(100, 250), // 100 threads, 250 txns each
65		(1000, 50), // 1000 threads, 50 txns each
66	];
67
68	for &(num_threads, txns_per_thread) in &test_configs {
69		let total_txns = num_threads * txns_per_thread;
70		println!(
71			"Testing {} threads × {} transactions = {} total...",
72			num_threads, txns_per_thread, total_txns
73		);
74
75		let engine = Arc::new(MultiTransaction::testing());
76		let start = Instant::now();
77
78		let mut handles = vec![];
79
80		for thread_id in 0..num_threads {
81			let engine_clone = engine.clone();
82			let handle = spawn(move || {
83				let base_key = thread_id * txns_per_thread;
84				for i in 0..txns_per_thread {
85					let mut tx = engine_clone.begin_command().unwrap();
86
87					let key = as_key!(base_key + i);
88					let value = as_values!(i);
89
90					tx.set(&key, value).unwrap();
91					tx.commit().unwrap();
92				}
93			});
94			handles.push(handle);
95		}
96
97		for handle in handles {
98			handle.join().expect("Task panicked");
99		}
100
101		let duration = start.elapsed();
102		let tps = total_txns as f64 / duration.as_secs_f64();
103
104		println!("  {} total transactions in {:?}", total_txns, duration);
105		println!("  {:.0} TPS (transactions per second)", tps);
106		println!("  {:.2} μs per transaction\n", duration.as_micros() as f64 / total_txns as f64);
107	}
108}
109
110/// Benchmark with actual conflicts to test conflict detection performance
111pub fn conflict_detection_benchmark() {
112	println!("=== Conflict Detection Performance Benchmark ===\n");
113
114	let engine = MultiTransaction::testing();
115
116	// Pre-populate with some data to create realistic conflict scenarios
117	for i in 0..1000 {
118		let mut tx = engine.begin_command().unwrap();
119		let key = as_key!(format!("shared_key_{}", i % 100)); // 100 different keys
120		let value = as_values!(i);
121		tx.set(&key, value).unwrap();
122		tx.commit().unwrap();
123	}
124
125	println!("Pre-populated with 1000 transactions across 100 keys");
126
127	// Now test conflict detection performance
128	let num_conflict_txns = 10000;
129	let start = Instant::now();
130	let mut conflicts = 0;
131
132	for i in 0..num_conflict_txns {
133		let mut tx = engine.begin_command().unwrap();
134
135		// Try to modify keys that might conflict
136		let key = as_key!(format!("shared_key_{}", i % 100));
137		let value = as_values!(i + 1000);
138
139		tx.set(&key, value).unwrap();
140
141		match tx.commit() {
142			Ok(_) => {}
143			Err(e) if e.code == "TXN_001" => {
144				conflicts += 1;
145			}
146			Err(e) => panic!("Unexpected error: {:?}", e),
147		};
148	}
149
150	let duration = start.elapsed();
151	let tps = num_conflict_txns as f64 / duration.as_secs_f64();
152
153	println!("  {} transactions with potential conflicts in {:?}", num_conflict_txns, duration);
154	println!(
155		"  {} actual conflicts detected ({:.1}%)",
156		conflicts,
157		conflicts as f64 / num_conflict_txns as f64 * 100.0
158	);
159	println!("  {:.0} TPS (transactions per second)", tps);
160	println!("  {:.2} μs per transaction", duration.as_micros() as f64 / num_conflict_txns as f64);
161}
Source§

impl MultiTransaction

Source

pub fn new( store: MultiStore, single: SingleTransaction, event_bus: EventBus, actor_system: ActorSystem, metrics_clock: Clock, system_config: SystemConfig, ) -> Result<Self>

Source

pub fn actor_system(&self) -> ActorSystem

Gets the actor system.

Source

pub fn system_config(&self) -> SystemConfig

Get the shared system config from the oracle.

Source§

impl MultiTransaction

Source

pub fn version(&self) -> Result<CommitVersion>

Source

pub fn begin_query(&self) -> Result<MultiReadTransaction>

Source

pub fn begin_query_at_version( &self, version: CommitVersion, ) -> Result<MultiReadTransaction>

Begin a query transaction at a specific version.

This is used for parallel query execution where multiple tasks need to read from the same snapshot (same CommitVersion) for consistency.

Source§

impl MultiTransaction

Source

pub fn begin_command(&self) -> Result<MultiWriteTransaction>

Examples found in repository?
examples/oracle_performance.rs (line 39)
24pub fn oracle_performance_benchmark() {
25	println!("=== Oracle Performance Benchmark ===\n");
26
27	// Test different transaction counts to show scaling behavior
28	let test_sizes = vec![1000, 5000, 10000, 25000];
29
30	for &num_txns in &test_sizes {
31		println!("Testing with {} transactions...", num_txns);
32
33		let engine = MultiTransaction::testing();
34
35		let start = Instant::now();
36
37		// Create transactions sequentially (worst case for O(N²) algorithm)
38		for i in 0..num_txns {
39			let mut tx = engine.begin_command().unwrap();
40
41			let key = as_key!(format!("key_{}", i));
42			let value = as_values!(format!("value_{}", i));
43
44			tx.set(&key, value).unwrap();
45			tx.commit().unwrap();
46		}
47
48		let duration = start.elapsed();
49		let tps = num_txns as f64 / duration.as_secs_f64();
50
51		println!("  {} transactions in {:?}", num_txns, duration);
52		println!("  {:.0} TPS (transactions per second)", tps);
53		println!("  {:.2} μs per transaction\n", duration.as_micros() as f64 / num_txns as f64);
54	}
55}
56
57/// Benchmark concurrent performance
58pub fn concurrent_oracle_benchmark() {
59	println!("=== Concurrent Oracle Performance Benchmark ===\n");
60
61	let test_configs = vec![
62		(10, 1000), // 10 threads, 1000 txns each
63		(50, 500),  // 50 threads, 500 txns each
64		(100, 250), // 100 threads, 250 txns each
65		(1000, 50), // 1000 threads, 50 txns each
66	];
67
68	for &(num_threads, txns_per_thread) in &test_configs {
69		let total_txns = num_threads * txns_per_thread;
70		println!(
71			"Testing {} threads × {} transactions = {} total...",
72			num_threads, txns_per_thread, total_txns
73		);
74
75		let engine = Arc::new(MultiTransaction::testing());
76		let start = Instant::now();
77
78		let mut handles = vec![];
79
80		for thread_id in 0..num_threads {
81			let engine_clone = engine.clone();
82			let handle = spawn(move || {
83				let base_key = thread_id * txns_per_thread;
84				for i in 0..txns_per_thread {
85					let mut tx = engine_clone.begin_command().unwrap();
86
87					let key = as_key!(base_key + i);
88					let value = as_values!(i);
89
90					tx.set(&key, value).unwrap();
91					tx.commit().unwrap();
92				}
93			});
94			handles.push(handle);
95		}
96
97		for handle in handles {
98			handle.join().expect("Task panicked");
99		}
100
101		let duration = start.elapsed();
102		let tps = total_txns as f64 / duration.as_secs_f64();
103
104		println!("  {} total transactions in {:?}", total_txns, duration);
105		println!("  {:.0} TPS (transactions per second)", tps);
106		println!("  {:.2} μs per transaction\n", duration.as_micros() as f64 / total_txns as f64);
107	}
108}
109
110/// Benchmark with actual conflicts to test conflict detection performance
111pub fn conflict_detection_benchmark() {
112	println!("=== Conflict Detection Performance Benchmark ===\n");
113
114	let engine = MultiTransaction::testing();
115
116	// Pre-populate with some data to create realistic conflict scenarios
117	for i in 0..1000 {
118		let mut tx = engine.begin_command().unwrap();
119		let key = as_key!(format!("shared_key_{}", i % 100)); // 100 different keys
120		let value = as_values!(i);
121		tx.set(&key, value).unwrap();
122		tx.commit().unwrap();
123	}
124
125	println!("Pre-populated with 1000 transactions across 100 keys");
126
127	// Now test conflict detection performance
128	let num_conflict_txns = 10000;
129	let start = Instant::now();
130	let mut conflicts = 0;
131
132	for i in 0..num_conflict_txns {
133		let mut tx = engine.begin_command().unwrap();
134
135		// Try to modify keys that might conflict
136		let key = as_key!(format!("shared_key_{}", i % 100));
137		let value = as_values!(i + 1000);
138
139		tx.set(&key, value).unwrap();
140
141		match tx.commit() {
142			Ok(_) => {}
143			Err(e) if e.code == "TXN_001" => {
144				conflicts += 1;
145			}
146			Err(e) => panic!("Unexpected error: {:?}", e),
147		};
148	}
149
150	let duration = start.elapsed();
151	let tps = num_conflict_txns as f64 / duration.as_secs_f64();
152
153	println!("  {} transactions with potential conflicts in {:?}", num_conflict_txns, duration);
154	println!(
155		"  {} actual conflicts detected ({:.1}%)",
156		conflicts,
157		conflicts as f64 / num_conflict_txns as f64 * 100.0
158	);
159	println!("  {:.0} TPS (transactions per second)", tps);
160	println!("  {:.2} μs per transaction", duration.as_micros() as f64 / num_conflict_txns as f64);
161}
Source§

impl MultiTransaction

Source

pub fn get( &self, key: &EncodedKey, version: CommitVersion, ) -> Result<Option<Committed>>

Source

pub fn contains_key( &self, key: &EncodedKey, version: CommitVersion, ) -> Result<bool>

Source

pub fn store(&self) -> &MultiStore

Get a reference to the underlying transaction store.

Source§

impl MultiTransaction

Source

pub fn current_version(&self) -> Result<CommitVersion>

Gets the current version from the transaction manager.

Source

pub fn done_until(&self) -> CommitVersion

Returns the highest version for which all prior versions have completed. This is useful for CDC polling, as it gives the safe upper bound for fetching CDC events: all events up to this version are guaranteed to be in storage.

Source

pub fn wait_for_mark_timeout( &self, version: CommitVersion, timeout: Duration, ) -> bool

Waits, up to the given timeout, for the watermark to reach the given version. Returns true if the watermark reached the target, false if the timeout occurred first.

Trait Implementations§

Source§

impl Clone for MultiTransaction

Source§

fn clone(&self) -> Self

Returns a duplicate of the value. Read more
1.0.0 · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
Source§

impl Deref for MultiTransaction

Source§

type Target = Inner

The resulting type after dereferencing.
Source§

fn deref(&self) -> &Self::Target

Dereferences the value.
Source§

impl WithEventBus for MultiTransaction

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

impl<T> Pointable for T

Source§

const ALIGN: usize

The alignment of pointer.
Source§

type Init = T

The type for initializers.
Source§

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a value with the given initializer. Read more
Source§

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer. Read more
Source§

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer. Read more
Source§

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer. Read more
Source§

impl<P, T> Receiver for P
where P: Deref<Target = T> + ?Sized, T: ?Sized,

Source§

type Target = T

🔬This is a nightly-only experimental API. (arbitrary_self_types)
The target type on which the method may be called.
Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more