amadeus_runtime/consensus/
consensus_kv.rs1use crate::Result;
2use crate::consensus::consensus_apply::ApplyEnv;
3use crate::consensus::consensus_muts::Mutation;
4use amadeus_utils::rocksdb::{BoundColumnFamily, MultiThreaded, RocksDb, RocksDbTxn, Transaction, TransactionDB};
5use std::sync::Arc;
6
7pub fn kv_put(env: &mut ApplyEnv, key: &[u8], value: &[u8]) -> Result<()> {
8 let old_value = env.txn.get_cf(&env.cf, key).map_err(|_| "kv_get_failed")?;
9 env.txn.put_cf(&env.cf, key, value).map_err(|_| "kv_put_failed")?;
10
11 env.muts.push(Mutation::Put { op: b"put".to_vec(), key: key.to_vec(), value: value.to_vec() });
12 match old_value {
13 None => env.muts_rev.push(Mutation::Delete { op: b"delete".to_vec(), key: key.to_vec() }),
14 Some(old) => env.muts_rev.push(Mutation::Put { op: b"put".to_vec(), key: key.to_vec(), value: old }),
15 }
16 Ok(())
17}
18
19pub fn kv_increment(env: &mut ApplyEnv, key: &[u8], value: i128) -> Result<i128> {
20 match env.txn.get_cf(&env.cf, key).map_err(|_| "kv_get_failed")? {
21 None => {
22 env.muts.push(Mutation::Put {
23 op: b"put".to_vec(),
24 key: key.to_vec(),
25 value: value.to_string().into_bytes(),
26 });
27 env.muts_rev.push(Mutation::Delete { op: b"delete".to_vec(), key: key.to_vec() });
28 env.txn.put_cf(&env.cf, key, value.to_string().into_bytes()).map_err(|_| "kv_put_failed")?;
29 Ok(value)
30 }
31 Some(old) => {
32 let new_value: i128 = atoi::atoi::<i128>(&old).ok_or("invalid_integer")? + value;
33 env.muts.push(Mutation::Put {
34 op: b"put".to_vec(),
35 key: key.to_vec(),
36 value: new_value.to_string().into_bytes(),
37 });
38 env.muts_rev.push(Mutation::Put { op: b"put".to_vec(), key: key.to_vec(), value: old });
39 env.txn.put_cf(&env.cf, key, new_value.to_string().into_bytes()).map_err(|_| "kv_put_failed")?;
40 Ok(new_value)
41 }
42 }
43}
44
45pub fn kv_delete(env: &mut ApplyEnv, key: &[u8]) -> Result<()> {
46 match env.txn.get_cf(&env.cf, key).map_err(|_| "kv_get_failed")? {
47 None => (),
48 Some(old) => {
49 env.muts.push(Mutation::Delete { op: b"delete".to_vec(), key: key.to_vec() });
50 env.muts_rev.push(Mutation::Put { op: b"put".to_vec(), key: key.to_vec(), value: old.to_vec() })
51 }
52 }
53 env.txn.delete_cf(&env.cf, key).map_err(|_| "kv_delete_failed")?;
54 Ok(())
55}
56
57pub fn kv_set_bit(env: &mut ApplyEnv, key: &[u8], bit_idx: u64) -> Result<bool> {
58 let (mut old, exists) = match env.txn.get_cf(&env.cf, key).map_err(|_| "kv_get_failed")? {
59 None => (vec![0u8; crate::consensus::bic::sol_bloom::PAGE_SIZE as usize], false),
60 Some(value) => (value, true),
61 };
62
63 let byte_idx = (bit_idx / 8) as usize;
64 let bit_in = (bit_idx % 8) as u8;
65
66 let mask: u8 = 1u8 << (7 - bit_in);
67
68 if (old[byte_idx] & mask) != 0 {
69 Ok(false)
70 } else {
71 env.muts.push(Mutation::SetBit {
72 op: b"set_bit".to_vec(),
73 key: key.to_vec(),
74 value: bit_idx,
75 bloomsize: crate::consensus::bic::sol_bloom::PAGE_SIZE,
76 });
77 match exists {
78 true => {
79 env.muts_rev.push(Mutation::ClearBit { op: b"clear_bit".to_vec(), key: key.to_vec(), value: bit_idx })
80 }
81 false => env.muts_rev.push(Mutation::Delete { op: b"delete".to_vec(), key: key.to_vec() }),
82 };
83 old[byte_idx] |= mask;
84 env.txn.put_cf(&env.cf, key, &old).map_err(|_| "kv_put_failed")?;
85 Ok(true)
86 }
87}
88
89pub fn kv_exists(env: &mut ApplyEnv, key: &[u8]) -> Result<bool> {
90 Ok(env.txn.get_cf(&env.cf, key).map_err(|_| "kv_get_failed")?.is_some())
91}
92
93pub fn kv_get(env: &ApplyEnv, key: &[u8]) -> Result<Option<Vec<u8>>> {
94 env.txn.get_cf(&env.cf, key).map_err(|_| "kv_get_failed")
95}
96
97pub fn kv_get_next(env: &mut ApplyEnv, prefix: &[u8], key: &[u8]) -> Option<(Vec<u8>, Vec<u8>)> {
98 let mut seek = Vec::with_capacity(prefix.len() + key.len());
99 seek.extend_from_slice(prefix);
100 seek.extend_from_slice(key);
101
102 let mut iter = env.txn.raw_iterator_cf(&env.cf);
103 iter.seek(&seek);
104
105 if !iter.valid() {
106 return None;
107 }
108
109 if let Some(k) = iter.key() {
111 if k == &seek[..] {
112 iter.next();
113 }
114 }
115
116 match (iter.key(), iter.value()) {
117 (Some(k), Some(v)) if k.starts_with(prefix) => {
118 let next_key_wo_prefix = k[prefix.len()..].to_vec();
120 Some((next_key_wo_prefix, v.to_vec()))
121 }
122 _ => None,
123 }
124}
125
126pub fn kv_get_prev(env: &mut ApplyEnv, prefix: &[u8], key: &[u8]) -> Option<(Vec<u8>, Vec<u8>)> {
127 let mut seek = Vec::with_capacity(prefix.len() + key.len());
128 seek.extend_from_slice(prefix);
129 seek.extend_from_slice(key);
130
131 let mut iter = env.txn.raw_iterator_cf(&env.cf);
132 iter.seek_for_prev(&seek);
133
134 if !iter.valid() {
135 return None;
136 }
137
138 if let Some(k) = iter.key() {
139 if k == &seek[..] {
140 iter.prev();
141 }
142 }
143
144 match (iter.key(), iter.value()) {
145 (Some(k), Some(v)) if k.starts_with(prefix) => {
146 let prev_key_wo_prefix = k[prefix.len()..].to_vec();
147 Some((prev_key_wo_prefix, v.to_vec()))
148 }
149 _ => None,
150 }
151}
152
153pub fn kv_get_prev_or_exact(env: &ApplyEnv, prefix: &[u8], key: &[u8]) -> Option<(Vec<u8>, Vec<u8>)> {
154 let mut seek = Vec::with_capacity(prefix.len() + key.len());
155 seek.extend_from_slice(prefix);
156 seek.extend_from_slice(key);
157
158 let mut iter = env.txn.raw_iterator_cf(&env.cf);
159 iter.seek_for_prev(&seek);
160
161 match (iter.key(), iter.value()) {
162 (Some(k), Some(v)) if k.starts_with(prefix) => {
163 let key_wo_prefix = k[prefix.len()..].to_vec();
164 Some((key_wo_prefix, v.to_vec()))
165 }
166 _ => None,
167 }
168}
169
170pub fn kv_get_prefix(env: &ApplyEnv, prefix: &[u8]) -> Vec<(Vec<u8>, Vec<u8>)> {
173 let mut results = Vec::new();
174 let mut iter = env.txn.raw_iterator_cf(&env.cf);
175 iter.seek(prefix);
176
177 while iter.valid() {
178 if let (Some(key), Some(value)) = (iter.key(), iter.value())
179 && key.starts_with(prefix)
180 {
181 let key_without_prefix = key[prefix.len()..].to_vec();
183 results.push((key_without_prefix, value.to_vec()));
184
185 iter.next();
186 } else {
187 break;
188 }
189 }
190
191 results
192}
193
194pub fn kv_clear_prefix(env: &mut ApplyEnv, prefix: &[u8]) -> Result<()> {
196 let mut keys_to_delete = Vec::new();
198 let mut iter = env.txn.raw_iterator_cf(&env.cf);
199 iter.seek(prefix);
200
201 while iter.valid() {
202 if let Some(key) = iter.key()
203 && key.starts_with(prefix)
204 {
205 keys_to_delete.push(key.to_vec());
206 iter.next();
207 } else {
208 break;
209 }
210 }
211
212 drop(iter); for key in keys_to_delete {
216 let _ = kv_delete(env, &key)?;
217 }
218
219 Ok(())
220}
221
222pub fn revert(env: &mut ApplyEnv) -> Result<()> {
223 for m in env.muts_rev.clone() {
224 match m {
225 Mutation::Put { op: _, key, value } => {
226 kv_put(env, key.as_slice(), value.as_slice())?;
227 }
228 Mutation::Delete { op: _, key } => {
229 kv_delete(env, key.as_slice())?;
230 }
231 Mutation::SetBit { op: _, key: _, value: _, bloomsize: _ } => {
232 panic!("SetBit should not be in reverse mutations");
233 }
234 Mutation::ClearBit { op: _, key, value } => {
235 let bit_idx = value;
236 if let Some(mut old) = kv_get(env, key.as_slice())? {
237 let byte_idx = (bit_idx / 8) as usize;
238 let bit_in = (bit_idx % 8) as u8;
239 if byte_idx < old.len() {
240 let mask: u8 = 1u8 << (7 - bit_in);
241 old[byte_idx] &= !mask;
242 kv_put(env, key.as_slice(), old.as_slice())?;
243 }
244 }
245 }
246 }
247 }
248
249 Ok(())
250}
251
252pub fn apply_mutations(db: &RocksDb, cf_name: &str, muts_rev: &[Mutation]) -> Result<()> {
253 let cf = db.inner.cf_handle(cf_name).ok_or("cf_handle_failed")?;
254 let txn = db.begin_transaction();
255
256 for m in muts_rev {
257 match m {
258 Mutation::Put { op: _, key, value } => {
259 txn.put_cf(&cf, key, value).map_err(|_| "kv_put_failed")?;
260 }
261 Mutation::Delete { op: _, key } => {
262 txn.delete_cf(&cf, key).map_err(|_| "kv_delete_failed")?;
263 }
264 Mutation::SetBit { op: _, key: _, value: _, bloomsize: _ } => {
265 panic!("SetBit should not be in reverse mutations");
266 }
267 Mutation::ClearBit { op: _, key, value } => {
268 let bit_idx = value;
269 if let Some(mut old) = txn.get_cf(&cf, key).map_err(|_| "kv_get_failed")? {
270 let byte_idx = (bit_idx / 8) as usize;
271 let bit_in = (bit_idx % 8) as u8;
272 if byte_idx < old.len() {
273 let mask: u8 = 1u8 << (7 - bit_in);
274 old[byte_idx] &= !mask;
275 txn.put_cf(&cf, key, old).map_err(|_| "kv_put_failed")?;
276 }
277 }
278 }
279 }
280 }
281
282 Ok(())
283}