Skip to main content

amadeus_runtime/consensus/
consensus_kv.rs

1use crate::Result;
2use crate::consensus::consensus_apply::ApplyEnv;
3use crate::consensus::consensus_muts::Mutation;
4use amadeus_utils::rocksdb::{BoundColumnFamily, MultiThreaded, RocksDb, RocksDbTxn, Transaction, TransactionDB};
5use std::sync::Arc;
6
7pub fn kv_put(env: &mut ApplyEnv, key: &[u8], value: &[u8]) -> Result<()> {
8    let old_value = env.txn.get_cf(&env.cf, key).map_err(|_| "kv_get_failed")?;
9    env.txn.put_cf(&env.cf, key, value).map_err(|_| "kv_put_failed")?;
10
11    env.muts.push(Mutation::Put { op: b"put".to_vec(), key: key.to_vec(), value: value.to_vec() });
12    match old_value {
13        None => env.muts_rev.push(Mutation::Delete { op: b"delete".to_vec(), key: key.to_vec() }),
14        Some(old) => env.muts_rev.push(Mutation::Put { op: b"put".to_vec(), key: key.to_vec(), value: old }),
15    }
16    Ok(())
17}
18
19pub fn kv_increment(env: &mut ApplyEnv, key: &[u8], value: i128) -> Result<i128> {
20    match env.txn.get_cf(&env.cf, key).map_err(|_| "kv_get_failed")? {
21        None => {
22            env.muts.push(Mutation::Put {
23                op: b"put".to_vec(),
24                key: key.to_vec(),
25                value: value.to_string().into_bytes(),
26            });
27            env.muts_rev.push(Mutation::Delete { op: b"delete".to_vec(), key: key.to_vec() });
28            env.txn.put_cf(&env.cf, key, value.to_string().into_bytes()).map_err(|_| "kv_put_failed")?;
29            Ok(value)
30        }
31        Some(old) => {
32            let new_value: i128 = atoi::atoi::<i128>(&old).ok_or("invalid_integer")? + value;
33            env.muts.push(Mutation::Put {
34                op: b"put".to_vec(),
35                key: key.to_vec(),
36                value: new_value.to_string().into_bytes(),
37            });
38            env.muts_rev.push(Mutation::Put { op: b"put".to_vec(), key: key.to_vec(), value: old });
39            env.txn.put_cf(&env.cf, key, new_value.to_string().into_bytes()).map_err(|_| "kv_put_failed")?;
40            Ok(new_value)
41        }
42    }
43}
44
45pub fn kv_delete(env: &mut ApplyEnv, key: &[u8]) -> Result<()> {
46    match env.txn.get_cf(&env.cf, key).map_err(|_| "kv_get_failed")? {
47        None => (),
48        Some(old) => {
49            env.muts.push(Mutation::Delete { op: b"delete".to_vec(), key: key.to_vec() });
50            env.muts_rev.push(Mutation::Put { op: b"put".to_vec(), key: key.to_vec(), value: old.to_vec() })
51        }
52    }
53    env.txn.delete_cf(&env.cf, key).map_err(|_| "kv_delete_failed")?;
54    Ok(())
55}
56
57pub fn kv_set_bit(env: &mut ApplyEnv, key: &[u8], bit_idx: u64) -> Result<bool> {
58    let (mut old, exists) = match env.txn.get_cf(&env.cf, key).map_err(|_| "kv_get_failed")? {
59        None => (vec![0u8; crate::consensus::bic::sol_bloom::PAGE_SIZE as usize], false),
60        Some(value) => (value, true),
61    };
62
63    let byte_idx = (bit_idx / 8) as usize;
64    let bit_in = (bit_idx % 8) as u8;
65
66    let mask: u8 = 1u8 << (7 - bit_in);
67
68    if (old[byte_idx] & mask) != 0 {
69        Ok(false)
70    } else {
71        env.muts.push(Mutation::SetBit {
72            op: b"set_bit".to_vec(),
73            key: key.to_vec(),
74            value: bit_idx,
75            bloomsize: crate::consensus::bic::sol_bloom::PAGE_SIZE,
76        });
77        match exists {
78            true => {
79                env.muts_rev.push(Mutation::ClearBit { op: b"clear_bit".to_vec(), key: key.to_vec(), value: bit_idx })
80            }
81            false => env.muts_rev.push(Mutation::Delete { op: b"delete".to_vec(), key: key.to_vec() }),
82        };
83        old[byte_idx] |= mask;
84        env.txn.put_cf(&env.cf, key, &old).map_err(|_| "kv_put_failed")?;
85        Ok(true)
86    }
87}
88
89pub fn kv_exists(env: &mut ApplyEnv, key: &[u8]) -> Result<bool> {
90    Ok(env.txn.get_cf(&env.cf, key).map_err(|_| "kv_get_failed")?.is_some())
91}
92
93pub fn kv_get(env: &ApplyEnv, key: &[u8]) -> Result<Option<Vec<u8>>> {
94    env.txn.get_cf(&env.cf, key).map_err(|_| "kv_get_failed")
95}
96
97pub fn kv_get_next(env: &mut ApplyEnv, prefix: &[u8], key: &[u8]) -> Option<(Vec<u8>, Vec<u8>)> {
98    let mut seek = Vec::with_capacity(prefix.len() + key.len());
99    seek.extend_from_slice(prefix);
100    seek.extend_from_slice(key);
101
102    let mut iter = env.txn.raw_iterator_cf(&env.cf);
103    iter.seek(&seek);
104
105    if !iter.valid() {
106        return None;
107    }
108
109    // skip the exact match key if found
110    if let Some(k) = iter.key() {
111        if k == &seek[..] {
112            iter.next();
113        }
114    }
115
116    match (iter.key(), iter.value()) {
117        (Some(k), Some(v)) if k.starts_with(prefix) => {
118            // return key without prefix
119            let next_key_wo_prefix = k[prefix.len()..].to_vec();
120            Some((next_key_wo_prefix, v.to_vec()))
121        }
122        _ => None,
123    }
124}
125
126pub fn kv_get_prev(env: &mut ApplyEnv, prefix: &[u8], key: &[u8]) -> Option<(Vec<u8>, Vec<u8>)> {
127    let mut seek = Vec::with_capacity(prefix.len() + key.len());
128    seek.extend_from_slice(prefix);
129    seek.extend_from_slice(key);
130
131    let mut iter = env.txn.raw_iterator_cf(&env.cf);
132    iter.seek_for_prev(&seek);
133
134    if !iter.valid() {
135        return None;
136    }
137
138    if let Some(k) = iter.key() {
139        if k == &seek[..] {
140            iter.prev();
141        }
142    }
143
144    match (iter.key(), iter.value()) {
145        (Some(k), Some(v)) if k.starts_with(prefix) => {
146            let prev_key_wo_prefix = k[prefix.len()..].to_vec();
147            Some((prev_key_wo_prefix, v.to_vec()))
148        }
149        _ => None,
150    }
151}
152
153pub fn kv_get_prev_or_exact(env: &ApplyEnv, prefix: &[u8], key: &[u8]) -> Option<(Vec<u8>, Vec<u8>)> {
154    let mut seek = Vec::with_capacity(prefix.len() + key.len());
155    seek.extend_from_slice(prefix);
156    seek.extend_from_slice(key);
157
158    let mut iter = env.txn.raw_iterator_cf(&env.cf);
159    iter.seek_for_prev(&seek);
160
161    match (iter.key(), iter.value()) {
162        (Some(k), Some(v)) if k.starts_with(prefix) => {
163            let key_wo_prefix = k[prefix.len()..].to_vec();
164            Some((key_wo_prefix, v.to_vec()))
165        }
166        _ => None,
167    }
168}
169
170/// Get all key-value pairs with the given prefix
171/// Returns vector of (key_without_prefix, value_bytes)
172pub fn kv_get_prefix(env: &ApplyEnv, prefix: &[u8]) -> Vec<(Vec<u8>, Vec<u8>)> {
173    let mut results = Vec::new();
174    let mut iter = env.txn.raw_iterator_cf(&env.cf);
175    iter.seek(prefix);
176
177    while iter.valid() {
178        if let (Some(key), Some(value)) = (iter.key(), iter.value())
179            && key.starts_with(prefix)
180        {
181            // extract key without prefix
182            let key_without_prefix = key[prefix.len()..].to_vec();
183            results.push((key_without_prefix, value.to_vec()));
184
185            iter.next();
186        } else {
187            break;
188        }
189    }
190
191    results
192}
193
194/// Clear all keys with the given prefix, tracking mutations for revert
195pub fn kv_clear_prefix(env: &mut ApplyEnv, prefix: &[u8]) -> Result<()> {
196    // collect all keys with this prefix in a separate scope to drop the iterator
197    let mut keys_to_delete = Vec::new();
198    let mut iter = env.txn.raw_iterator_cf(&env.cf);
199    iter.seek(prefix);
200
201    while iter.valid() {
202        if let Some(key) = iter.key()
203            && key.starts_with(prefix)
204        {
205            keys_to_delete.push(key.to_vec());
206            iter.next();
207        } else {
208            break;
209        }
210    }
211
212    drop(iter); // must explicitly go out of scope before modifying env
213
214    // now we can delete each key and track mutations
215    for key in keys_to_delete {
216        let _ = kv_delete(env, &key)?;
217    }
218
219    Ok(())
220}
221
222pub fn revert(env: &mut ApplyEnv) -> Result<()> {
223    for m in env.muts_rev.clone() {
224        match m {
225            Mutation::Put { op: _, key, value } => {
226                kv_put(env, key.as_slice(), value.as_slice())?;
227            }
228            Mutation::Delete { op: _, key } => {
229                kv_delete(env, key.as_slice())?;
230            }
231            Mutation::SetBit { op: _, key: _, value: _, bloomsize: _ } => {
232                panic!("SetBit should not be in reverse mutations");
233            }
234            Mutation::ClearBit { op: _, key, value } => {
235                let bit_idx = value;
236                if let Some(mut old) = kv_get(env, key.as_slice())? {
237                    let byte_idx = (bit_idx / 8) as usize;
238                    let bit_in = (bit_idx % 8) as u8;
239                    if byte_idx < old.len() {
240                        let mask: u8 = 1u8 << (7 - bit_in);
241                        old[byte_idx] &= !mask;
242                        kv_put(env, key.as_slice(), old.as_slice())?;
243                    }
244                }
245            }
246        }
247    }
248
249    Ok(())
250}
251
252pub fn apply_mutations(db: &RocksDb, cf_name: &str, muts_rev: &[Mutation]) -> Result<()> {
253    let cf = db.inner.cf_handle(cf_name).ok_or("cf_handle_failed")?;
254    let txn = db.begin_transaction();
255
256    for m in muts_rev {
257        match m {
258            Mutation::Put { op: _, key, value } => {
259                txn.put_cf(&cf, key, value).map_err(|_| "kv_put_failed")?;
260            }
261            Mutation::Delete { op: _, key } => {
262                txn.delete_cf(&cf, key).map_err(|_| "kv_delete_failed")?;
263            }
264            Mutation::SetBit { op: _, key: _, value: _, bloomsize: _ } => {
265                panic!("SetBit should not be in reverse mutations");
266            }
267            Mutation::ClearBit { op: _, key, value } => {
268                let bit_idx = value;
269                if let Some(mut old) = txn.get_cf(&cf, key).map_err(|_| "kv_get_failed")? {
270                    let byte_idx = (bit_idx / 8) as usize;
271                    let bit_in = (bit_idx % 8) as u8;
272                    if byte_idx < old.len() {
273                        let mask: u8 = 1u8 << (7 - bit_in);
274                        old[byte_idx] &= !mask;
275                        txn.put_cf(&cf, key, old).map_err(|_| "kv_put_failed")?;
276                    }
277                }
278            }
279        }
280    }
281
282    Ok(())
283}