atomic/
lib.rs

1use foundationdb::{
2    options::{MutationType, TransactionOption},
3    tuple::Subspace,
4    FdbBindingError,
5};
6use foundationdb_simulation::{
7    details, register_workload, Metric, Metrics, RustWorkload, Severity, SimDatabase,
8    SingleRustWorkload, WorkloadContext,
9};
10
11pub struct AtomicWorkload {
12    context: WorkloadContext,
13    client_id: i32,
14    // how many transactions will be run
15    expected_count: usize,
16    // how many transactions succeeded
17    success_count: usize,
18    // how many transactions failed
19    error_count: usize,
20    // how many maybe_committed transactions we encountered
21    maybe_committed_count: usize,
22}
23
24const COUNT_KEY: &[u8] = b"count";
25
26impl SingleRustWorkload for AtomicWorkload {
27    fn new(_name: String, context: WorkloadContext) -> Self {
28        Self {
29            client_id: context.client_id(),
30            expected_count: context.get_option("count").expect("Could not get count"),
31            context,
32            success_count: 0,
33            error_count: 0,
34            maybe_committed_count: 0,
35        }
36    }
37}
38
39impl RustWorkload for AtomicWorkload {
40    async fn setup(&mut self, _db: SimDatabase) {
41        println!("rust_setup({})", self.client_id);
42    }
43    async fn start(&mut self, db: SimDatabase) {
44        println!("rust_start({})", self.client_id);
45        // Only use a single client
46        if self.client_id == 0 {
47            for _ in 0..self.expected_count {
48                let trx = db.create_trx().expect("Could not create transaction");
49
50                // Enable idempotent txn
51                trx.set_option(TransactionOption::AutomaticIdempotency)
52                    .expect("could not setup automatic idempotency");
53
54                let buf: [u8; 8] = 1i64.to_le_bytes();
55
56                trx.atomic_op(&Subspace::all().pack(&COUNT_KEY), &buf, MutationType::Add);
57
58                match trx.commit().await {
59                    Ok(_) => self.success_count += 1,
60                    Err(err) => {
61                        if err.is_maybe_committed() {
62                            self.context.trace(
63                                Severity::Warn,
64                                "Detected an maybe_committed transactions with idempotency",
65                                details![
66                                    "Layer" => "Rust",
67                                    "Client" => self.client_id
68                                ],
69                            );
70                            self.maybe_committed_count += 1;
71                        } else {
72                            self.error_count += 1;
73                        }
74                    }
75                }
76
77                self.context.trace(
78                    Severity::Info,
79                    "Successfully setup workload",
80                    details![
81                        "Layer" => "Rust",
82                        "Client" => self.client_id
83                    ],
84                );
85            }
86        }
87    }
88    async fn check(&mut self, db: SimDatabase) {
89        println!("rust_check({})", self.client_id);
90        if self.client_id == 0 {
91            // even if buggify is off in checks, transactions can failed because of the randomized knob,
92            // so we need to wrap the check in a db.run
93            let count = db
94                .run(|trx, _maybe_committed| async move {
95                    match trx.get(&Subspace::all().pack(&COUNT_KEY), true).await {
96                        Err(e) => Err(FdbBindingError::from(e)),
97                        Ok(None) => Ok(0),
98                        Ok(Some(byte_count)) => {
99                            let count = i64::from_le_bytes(byte_count[..8].try_into().unwrap());
100                            Ok(count as usize)
101                        }
102                    }
103                })
104                .await
105                .expect("could not check using db.run");
106
107            if self.success_count == count {
108                self.context.trace(
109                    Severity::Info,
110                    "Atomic count match",
111                    details![
112                        "Layer" => "Rust",
113                        "Client" => self.client_id,
114                        "Expected" => self.expected_count,
115                        "Found" => count,
116                        "CommittedCount" => self.success_count,
117                        "MaybeCommitted" => self.maybe_committed_count,
118                    ],
119                );
120            } else {
121                self.context.trace(
122                    Severity::Error,
123                    "Atomic count doesn't match",
124                    details![
125                        "Layer" => "Rust",
126                        "Client" => self.client_id,
127                        "Expected" => self.expected_count,
128                        "Found" => count,
129                        "CommittedCount" => self.success_count,
130                        "MaybeCommitted" => self.maybe_committed_count,
131                    ],
132                );
133            }
134        }
135    }
136    fn get_metrics(&self, mut out: Metrics) {
137        println!("rust_get_metrics({})", self.client_id);
138        out.extend([
139            Metric::val("expected_count", self.expected_count as f64),
140            Metric::val("success_count", self.success_count as f64),
141            Metric::val("error_count", self.error_count as f64),
142        ]);
143    }
144    fn get_check_timeout(&self) -> f64 {
145        println!("rust_get_check_timeout({})", self.client_id);
146        5000.0
147    }
148}
149
150register_workload!(AtomicWorkload);