1use foundationdb::{
2 options::{MutationType, TransactionOption},
3 tuple::Subspace,
4 FdbBindingError,
5};
6use foundationdb_simulation::{
7 details, register_workload, Metric, Metrics, RustWorkload, Severity, SimDatabase,
8 SingleRustWorkload, WorkloadContext,
9};
10
11pub struct AtomicWorkload {
12 context: WorkloadContext,
13 client_id: i32,
14 expected_count: usize,
16 success_count: usize,
18 error_count: usize,
20 maybe_committed_count: usize,
22}
23
/// Raw bytes of the counter's key. Both `start` and `check` pack this value through
/// `Subspace::all().pack(..)` to obtain the actual database key.
const COUNT_KEY: &[u8] = b"count";
25
26impl SingleRustWorkload for AtomicWorkload {
27 fn new(_name: String, context: WorkloadContext) -> Self {
28 Self {
29 client_id: context.client_id(),
30 expected_count: context.get_option("count").expect("Could not get count"),
31 context,
32 success_count: 0,
33 error_count: 0,
34 maybe_committed_count: 0,
35 }
36 }
37}
38
39impl RustWorkload for AtomicWorkload {
40 async fn setup(&mut self, _db: SimDatabase) {
41 println!("rust_setup({})", self.client_id);
42 }
43 async fn start(&mut self, db: SimDatabase) {
44 println!("rust_start({})", self.client_id);
45 if self.client_id == 0 {
47 for _ in 0..self.expected_count {
48 let trx = db.create_trx().expect("Could not create transaction");
49
50 trx.set_option(TransactionOption::AutomaticIdempotency)
52 .expect("could not setup automatic idempotency");
53
54 let buf: [u8; 8] = 1i64.to_le_bytes();
55
56 trx.atomic_op(&Subspace::all().pack(&COUNT_KEY), &buf, MutationType::Add);
57
58 match trx.commit().await {
59 Ok(_) => self.success_count += 1,
60 Err(err) => {
61 if err.is_maybe_committed() {
62 self.context.trace(
63 Severity::Warn,
64 "Detected an maybe_committed transactions with idempotency",
65 details![
66 "Layer" => "Rust",
67 "Client" => self.client_id
68 ],
69 );
70 self.maybe_committed_count += 1;
71 } else {
72 self.error_count += 1;
73 }
74 }
75 }
76
77 self.context.trace(
78 Severity::Info,
79 "Successfully setup workload",
80 details![
81 "Layer" => "Rust",
82 "Client" => self.client_id
83 ],
84 );
85 }
86 }
87 }
88 async fn check(&mut self, db: SimDatabase) {
89 println!("rust_check({})", self.client_id);
90 if self.client_id == 0 {
91 let count = db
94 .run(|trx, _maybe_committed| async move {
95 match trx.get(&Subspace::all().pack(&COUNT_KEY), true).await {
96 Err(e) => Err(FdbBindingError::from(e)),
97 Ok(None) => Ok(0),
98 Ok(Some(byte_count)) => {
99 let count = i64::from_le_bytes(byte_count[..8].try_into().unwrap());
100 Ok(count as usize)
101 }
102 }
103 })
104 .await
105 .expect("could not check using db.run");
106
107 if self.success_count == count {
108 self.context.trace(
109 Severity::Info,
110 "Atomic count match",
111 details![
112 "Layer" => "Rust",
113 "Client" => self.client_id,
114 "Expected" => self.expected_count,
115 "Found" => count,
116 "CommittedCount" => self.success_count,
117 "MaybeCommitted" => self.maybe_committed_count,
118 ],
119 );
120 } else {
121 self.context.trace(
122 Severity::Error,
123 "Atomic count doesn't match",
124 details![
125 "Layer" => "Rust",
126 "Client" => self.client_id,
127 "Expected" => self.expected_count,
128 "Found" => count,
129 "CommittedCount" => self.success_count,
130 "MaybeCommitted" => self.maybe_committed_count,
131 ],
132 );
133 }
134 }
135 }
136 fn get_metrics(&self, mut out: Metrics) {
137 println!("rust_get_metrics({})", self.client_id);
138 out.extend([
139 Metric::val("expected_count", self.expected_count as f64),
140 Metric::val("success_count", self.success_count as f64),
141 Metric::val("error_count", self.error_count as f64),
142 ]);
143 }
144 fn get_check_timeout(&self) -> f64 {
145 println!("rust_get_check_timeout({})", self.client_id);
146 5000.0
147 }
148}
149
// Passes `AtomicWorkload` to the `register_workload!` macro imported from
// `foundationdb_simulation`; presumably this is what exposes the workload to the
// simulator — confirm against the crate documentation.
register_workload!(AtomicWorkload);