pub struct MultiTransaction(/* private fields */);
Implementations§
Source§impl MultiTransaction
impl MultiTransaction
Sourcepub fn testing() -> Self
pub fn testing() -> Self
Examples found in repository?
examples/oracle_performance.rs (line 33)
24pub fn oracle_performance_benchmark() {
25 println!("=== Oracle Performance Benchmark ===\n");
26
27 // Test different transaction counts to show scaling behavior
28 let test_sizes = vec![1000, 5000, 10000, 25000];
29
30 for &num_txns in &test_sizes {
31 println!("Testing with {} transactions...", num_txns);
32
33 let engine = MultiTransaction::testing();
34
35 let start = Instant::now();
36
37 // Create transactions sequentially (worst case for O(N²) algorithm)
38 for i in 0..num_txns {
39 let mut tx = engine.begin_command().unwrap();
40
41 let key = as_key!(format!("key_{}", i));
42 let value = as_values!(format!("value_{}", i));
43
44 tx.set(&key, value).unwrap();
45 tx.commit().unwrap();
46 }
47
48 let duration = start.elapsed();
49 let tps = num_txns as f64 / duration.as_secs_f64();
50
51 println!(" {} transactions in {:?}", num_txns, duration);
52 println!(" {:.0} TPS (transactions per second)", tps);
53 println!(" {:.2} μs per transaction\n", duration.as_micros() as f64 / num_txns as f64);
54 }
55}
56
57/// Benchmark concurrent performance
58pub fn concurrent_oracle_benchmark() {
59 println!("=== Concurrent Oracle Performance Benchmark ===\n");
60
61 let test_configs = vec![
62 (10, 1000), // 10 threads, 1000 txns each
63 (50, 500), // 50 threads, 500 txns each
64 (100, 250), // 100 threads, 250 txns each
65 (1000, 50), // 1000 threads, 50 txns each
66 ];
67
68 for &(num_threads, txns_per_thread) in &test_configs {
69 let total_txns = num_threads * txns_per_thread;
70 println!(
71 "Testing {} threads × {} transactions = {} total...",
72 num_threads, txns_per_thread, total_txns
73 );
74
75 let engine = Arc::new(MultiTransaction::testing());
76 let start = Instant::now();
77
78 let mut handles = vec![];
79
80 for thread_id in 0..num_threads {
81 let engine_clone = engine.clone();
82 let handle = spawn(move || {
83 let base_key = thread_id * txns_per_thread;
84 for i in 0..txns_per_thread {
85 let mut tx = engine_clone.begin_command().unwrap();
86
87 let key = as_key!(base_key + i);
88 let value = as_values!(i);
89
90 tx.set(&key, value).unwrap();
91 tx.commit().unwrap();
92 }
93 });
94 handles.push(handle);
95 }
96
97 for handle in handles {
98 handle.join().expect("Task panicked");
99 }
100
101 let duration = start.elapsed();
102 let tps = total_txns as f64 / duration.as_secs_f64();
103
104 println!(" {} total transactions in {:?}", total_txns, duration);
105 println!(" {:.0} TPS (transactions per second)", tps);
106 println!(" {:.2} μs per transaction\n", duration.as_micros() as f64 / total_txns as f64);
107 }
108}
109
110/// Benchmark with actual conflicts to test conflict detection performance
111pub fn conflict_detection_benchmark() {
112 println!("=== Conflict Detection Performance Benchmark ===\n");
113
114 let engine = MultiTransaction::testing();
115
116 // Pre-populate with some data to create realistic conflict scenarios
117 for i in 0..1000 {
118 let mut tx = engine.begin_command().unwrap();
119 let key = as_key!(format!("shared_key_{}", i % 100)); // 100 different keys
120 let value = as_values!(i);
121 tx.set(&key, value).unwrap();
122 tx.commit().unwrap();
123 }
124
125 println!("Pre-populated with 1000 transactions across 100 keys");
126
127 // Now test conflict detection performance
128 let num_conflict_txns = 10000;
129 let start = Instant::now();
130 let mut conflicts = 0;
131
132 for i in 0..num_conflict_txns {
133 let mut tx = engine.begin_command().unwrap();
134
135 // Try to modify keys that might conflict
136 let key = as_key!(format!("shared_key_{}", i % 100));
137 let value = as_values!(i + 1000);
138
139 tx.set(&key, value).unwrap();
140
141 match tx.commit() {
142 Ok(_) => {}
143 Err(e) if e.code == "TXN_001" => {
144 conflicts += 1;
145 }
146 Err(e) => panic!("Unexpected error: {:?}", e),
147 };
148 }
149
150 let duration = start.elapsed();
151 let tps = num_conflict_txns as f64 / duration.as_secs_f64();
152
153 println!(" {} transactions with potential conflicts in {:?}", num_conflict_txns, duration);
154 println!(
155 " {} actual conflicts detected ({:.1}%)",
156 conflicts,
157 conflicts as f64 / num_conflict_txns as f64 * 100.0
158 );
159 println!(" {:.0} TPS (transactions per second)", tps);
160 println!(" {:.2} μs per transaction", duration.as_micros() as f64 / num_conflict_txns as f64);
161}
Source§impl MultiTransaction
impl MultiTransaction
pub fn new( store: MultiStore, single: SingleTransaction, event_bus: EventBus, actor_system: ActorSystem, metrics_clock: Clock, system_config: SystemConfig, ) -> Result<Self>
Sourcepub fn actor_system(&self) -> ActorSystem
pub fn actor_system(&self) -> ActorSystem
Get the actor system
Sourcepub fn system_config(&self) -> SystemConfig
pub fn system_config(&self) -> SystemConfig
Get the shared system config from the oracle.
Source§impl MultiTransaction
impl MultiTransaction
pub fn version(&self) -> Result<CommitVersion>
pub fn begin_query(&self) -> Result<MultiReadTransaction>
Sourcepub fn begin_query_at_version(
&self,
version: CommitVersion,
) -> Result<MultiReadTransaction>
pub fn begin_query_at_version( &self, version: CommitVersion, ) -> Result<MultiReadTransaction>
Begin a query transaction at a specific version.
This is used for parallel query execution where multiple tasks need to read from the same snapshot (same CommitVersion) for consistency.
Source§impl MultiTransaction
impl MultiTransaction
Sourcepub fn begin_command(&self) -> Result<MultiWriteTransaction>
pub fn begin_command(&self) -> Result<MultiWriteTransaction>
Examples found in repository?
examples/oracle_performance.rs (line 39)
24pub fn oracle_performance_benchmark() {
25 println!("=== Oracle Performance Benchmark ===\n");
26
27 // Test different transaction counts to show scaling behavior
28 let test_sizes = vec![1000, 5000, 10000, 25000];
29
30 for &num_txns in &test_sizes {
31 println!("Testing with {} transactions...", num_txns);
32
33 let engine = MultiTransaction::testing();
34
35 let start = Instant::now();
36
37 // Create transactions sequentially (worst case for O(N²) algorithm)
38 for i in 0..num_txns {
39 let mut tx = engine.begin_command().unwrap();
40
41 let key = as_key!(format!("key_{}", i));
42 let value = as_values!(format!("value_{}", i));
43
44 tx.set(&key, value).unwrap();
45 tx.commit().unwrap();
46 }
47
48 let duration = start.elapsed();
49 let tps = num_txns as f64 / duration.as_secs_f64();
50
51 println!(" {} transactions in {:?}", num_txns, duration);
52 println!(" {:.0} TPS (transactions per second)", tps);
53 println!(" {:.2} μs per transaction\n", duration.as_micros() as f64 / num_txns as f64);
54 }
55}
56
57/// Benchmark concurrent performance
58pub fn concurrent_oracle_benchmark() {
59 println!("=== Concurrent Oracle Performance Benchmark ===\n");
60
61 let test_configs = vec![
62 (10, 1000), // 10 threads, 1000 txns each
63 (50, 500), // 50 threads, 500 txns each
64 (100, 250), // 100 threads, 250 txns each
65 (1000, 50), // 1000 threads, 50 txns each
66 ];
67
68 for &(num_threads, txns_per_thread) in &test_configs {
69 let total_txns = num_threads * txns_per_thread;
70 println!(
71 "Testing {} threads × {} transactions = {} total...",
72 num_threads, txns_per_thread, total_txns
73 );
74
75 let engine = Arc::new(MultiTransaction::testing());
76 let start = Instant::now();
77
78 let mut handles = vec![];
79
80 for thread_id in 0..num_threads {
81 let engine_clone = engine.clone();
82 let handle = spawn(move || {
83 let base_key = thread_id * txns_per_thread;
84 for i in 0..txns_per_thread {
85 let mut tx = engine_clone.begin_command().unwrap();
86
87 let key = as_key!(base_key + i);
88 let value = as_values!(i);
89
90 tx.set(&key, value).unwrap();
91 tx.commit().unwrap();
92 }
93 });
94 handles.push(handle);
95 }
96
97 for handle in handles {
98 handle.join().expect("Task panicked");
99 }
100
101 let duration = start.elapsed();
102 let tps = total_txns as f64 / duration.as_secs_f64();
103
104 println!(" {} total transactions in {:?}", total_txns, duration);
105 println!(" {:.0} TPS (transactions per second)", tps);
106 println!(" {:.2} μs per transaction\n", duration.as_micros() as f64 / total_txns as f64);
107 }
108}
109
110/// Benchmark with actual conflicts to test conflict detection performance
111pub fn conflict_detection_benchmark() {
112 println!("=== Conflict Detection Performance Benchmark ===\n");
113
114 let engine = MultiTransaction::testing();
115
116 // Pre-populate with some data to create realistic conflict scenarios
117 for i in 0..1000 {
118 let mut tx = engine.begin_command().unwrap();
119 let key = as_key!(format!("shared_key_{}", i % 100)); // 100 different keys
120 let value = as_values!(i);
121 tx.set(&key, value).unwrap();
122 tx.commit().unwrap();
123 }
124
125 println!("Pre-populated with 1000 transactions across 100 keys");
126
127 // Now test conflict detection performance
128 let num_conflict_txns = 10000;
129 let start = Instant::now();
130 let mut conflicts = 0;
131
132 for i in 0..num_conflict_txns {
133 let mut tx = engine.begin_command().unwrap();
134
135 // Try to modify keys that might conflict
136 let key = as_key!(format!("shared_key_{}", i % 100));
137 let value = as_values!(i + 1000);
138
139 tx.set(&key, value).unwrap();
140
141 match tx.commit() {
142 Ok(_) => {}
143 Err(e) if e.code == "TXN_001" => {
144 conflicts += 1;
145 }
146 Err(e) => panic!("Unexpected error: {:?}", e),
147 };
148 }
149
150 let duration = start.elapsed();
151 let tps = num_conflict_txns as f64 / duration.as_secs_f64();
152
153 println!(" {} transactions with potential conflicts in {:?}", num_conflict_txns, duration);
154 println!(
155 " {} actual conflicts detected ({:.1}%)",
156 conflicts,
157 conflicts as f64 / num_conflict_txns as f64 * 100.0
158 );
159 println!(" {:.0} TPS (transactions per second)", tps);
160 println!(" {:.2} μs per transaction", duration.as_micros() as f64 / num_conflict_txns as f64);
161}
Source§impl MultiTransaction
impl MultiTransaction
pub fn get( &self, key: &EncodedKey, version: CommitVersion, ) -> Result<Option<Committed>>
pub fn contains_key( &self, key: &EncodedKey, version: CommitVersion, ) -> Result<bool>
Sourcepub fn store(&self) -> &MultiStore
pub fn store(&self) -> &MultiStore
Get a reference to the underlying transaction store.
Source§impl MultiTransaction
impl MultiTransaction
Sourcepub fn current_version(&self) -> Result<CommitVersion>
pub fn current_version(&self) -> Result<CommitVersion>
Get the current version from the transaction manager
Sourcepub fn done_until(&self) -> CommitVersion
pub fn done_until(&self) -> CommitVersion
Returns the highest version for which ALL prior versions have completed. This is useful for CDC polling, which needs a safe upper bound for fetching CDC events: all events up to this version are guaranteed to be in storage.
Sourcepub fn wait_for_mark_timeout(
&self,
version: CommitVersion,
timeout: Duration,
) -> bool
pub fn wait_for_mark_timeout( &self, version: CommitVersion, timeout: Duration, ) -> bool
Wait for the watermark to reach the given version, with a timeout. Returns true if the watermark reached the target, false if the timeout occurred first.
Trait Implementations§
Source§impl Clone for MultiTransaction
impl Clone for MultiTransaction
Source§impl Deref for MultiTransaction
impl Deref for MultiTransaction
Source§impl WithEventBus for MultiTransaction
impl WithEventBus for MultiTransaction
Auto Trait Implementations§
impl Freeze for MultiTransaction
impl !RefUnwindSafe for MultiTransaction
impl Send for MultiTransaction
impl Sync for MultiTransaction
impl Unpin for MultiTransaction
impl UnsafeUnpin for MultiTransaction
impl !UnwindSafe for MultiTransaction
Blanket Implementations§
Source§impl<T> BorrowMut<T> for T where
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more
Source§impl<T> CloneToUninit for T where
T: Clone,
impl<T> CloneToUninit for T where
T: Clone,
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more