#[cfg(test)]
mod tests;
use std::{cmp, collections::VecDeque, mem, time::Instant};
use engine_shared::{
logging::{log_duration, log_metric},
newtypes::{Blake2bHash, CorrelationId},
};
use types::bytesrepr::{self, FromBytes, ToBytes};
use crate::{
transaction_source::{Readable, Writable},
trie::{self, Parents, Pointer, Trie, RADIX},
trie_store::TrieStore,
GAUGE_METRIC_KEY,
};
// Metric names recorded for trie-store operations (gauges and durations).
const TRIE_STORE_READ_DURATION: &str = "trie_store_read_duration";
const TRIE_STORE_READ_GETS: &str = "trie_store_read_gets";
const TRIE_STORE_SCAN_DURATION: &str = "trie_store_scan_duration";
const TRIE_STORE_SCAN_GETS: &str = "trie_store_scan_gets";
const TRIE_STORE_WRITE_DURATION: &str = "trie_store_write_duration";
const TRIE_STORE_WRITE_PUTS: &str = "trie_store_write_puts";
// Operation tags passed alongside the metric names above when logging.
const READ: &str = "read";
const GET: &str = "get";
const SCAN: &str = "scan";
const WRITE: &str = "write";
const PUT: &str = "put";
/// The result of a trie read. Distinguishes "no such state root" from
/// "root exists but the key is absent".
#[derive(Debug, PartialEq, Eq)]
pub enum ReadResult<V> {
    /// The key was present under the given root; carries its value.
    Found(V),
    /// The root exists, but the key is not in the trie.
    NotFound,
    /// No trie element is stored under the requested root hash.
    RootNotFound,
}
pub fn read<K, V, T, S, E>(
correlation_id: CorrelationId,
txn: &T,
store: &S,
root: &Blake2bHash,
key: &K,
) -> Result<ReadResult<V>, E>
where
K: ToBytes + FromBytes + Eq + std::fmt::Debug,
V: ToBytes + FromBytes,
T: Readable<Handle = S::Handle>,
S: TrieStore<K, V>,
S::Error: From<T::Error>,
E: From<S::Error> + From<types::bytesrepr::Error>,
{
let path: Vec<u8> = key.to_bytes()?;
let mut depth: usize = 0;
let mut current: Trie<K, V> = match store.get(txn, root)? {
Some(root) => root,
None => return Ok(ReadResult::RootNotFound),
};
let start = Instant::now();
let mut get_counter: i32 = 0;
loop {
match current {
Trie::Leaf {
key: leaf_key,
value: leaf_value,
} => {
let result = if *key == leaf_key {
ReadResult::Found(leaf_value)
} else {
ReadResult::NotFound
};
log_metric(
correlation_id,
TRIE_STORE_READ_GETS,
GET,
GAUGE_METRIC_KEY,
f64::from(get_counter),
);
log_duration(
correlation_id,
TRIE_STORE_READ_DURATION,
READ,
start.elapsed(),
);
return Ok(result);
}
Trie::Node { pointer_block } => {
let index: usize = {
assert!(depth < path.len(), "depth must be < {}", path.len());
path[depth].into()
};
let maybe_pointer: Option<Pointer> = {
assert!(index < trie::RADIX, "key length must be < {}", trie::RADIX);
pointer_block[index]
};
match maybe_pointer {
Some(pointer) => match store.get(txn, pointer.hash())? {
Some(next) => {
get_counter += 1;
depth += 1;
current = next;
}
None => {
get_counter += 1;
log_metric(
correlation_id,
TRIE_STORE_READ_GETS,
GET,
GAUGE_METRIC_KEY,
f64::from(get_counter),
);
log_duration(
correlation_id,
TRIE_STORE_READ_DURATION,
READ,
start.elapsed(),
);
panic!(
"No trie value at key: {:?} (reading from key: {:?})",
pointer.hash(),
key
);
}
},
None => {
log_metric(
correlation_id,
TRIE_STORE_READ_GETS,
GET,
GAUGE_METRIC_KEY,
f64::from(get_counter),
);
log_duration(
correlation_id,
TRIE_STORE_READ_DURATION,
READ,
start.elapsed(),
);
return Ok(ReadResult::NotFound);
}
}
}
Trie::Extension { affix, pointer } => {
let sub_path = &path[depth..depth + affix.len()];
if sub_path == affix.as_slice() {
get_counter += 1;
match store.get(txn, pointer.hash())? {
Some(next) => {
get_counter += 1;
depth += affix.len();
current = next;
}
None => {
get_counter += 1;
log_metric(
correlation_id,
TRIE_STORE_READ_GETS,
GET,
GAUGE_METRIC_KEY,
f64::from(get_counter),
);
log_duration(
correlation_id,
TRIE_STORE_READ_DURATION,
READ,
start.elapsed(),
);
panic!(
"No trie value at key: {:?} (reading from key: {:?})",
pointer.hash(),
key
);
}
}
} else {
log_metric(
correlation_id,
TRIE_STORE_READ_GETS,
GET,
GAUGE_METRIC_KEY,
f64::from(get_counter),
);
log_duration(
correlation_id,
TRIE_STORE_READ_DURATION,
READ,
start.elapsed(),
);
return Ok(ReadResult::NotFound);
}
}
}
}
}
/// The result of a trie scan: the deepest trie element reached (`tip`) plus
/// the stack of parents traversed to get there, each paired with the branch
/// index taken out of it (root first, immediate parent last).
struct TrieScan<K, V> {
    // The element at which the scan stopped (leaf, or divergence point).
    tip: Trie<K, V>,
    // Visited parents with the branch index taken from each.
    parents: Parents<K, V>,
}
impl<K, V> TrieScan<K, V> {
    /// Bundles a scan tip with its parent stack.
    fn new(tip: Trie<K, V>, parents: Parents<K, V>) -> Self {
        TrieScan { tip, parents }
    }
}
/// Walks the trie under `root` along the byte path `key_bytes`, returning the
/// deepest element reached (`tip`) together with the stack of parents visited
/// on the way down (root first), each paired with the branch index taken.
///
/// `tip` is a leaf when the whole path was matched; otherwise it is the node
/// with no pointer at the next index, or the extension whose affix diverged
/// from the path. Emits scan metrics (store gets, elapsed time) on every
/// exit path.
///
/// # Panics
/// Panics when a pointer in the trie references a hash that is missing from
/// the store (i.e. the store is corrupt).
fn scan<K, V, T, S, E>(
    correlation_id: CorrelationId,
    txn: &T,
    store: &S,
    key_bytes: &[u8],
    root: &Trie<K, V>,
) -> Result<TrieScan<K, V>, E>
where
    K: ToBytes + FromBytes + Clone,
    V: ToBytes + FromBytes + Clone,
    T: Readable<Handle = S::Handle>,
    S: TrieStore<K, V>,
    S::Error: From<T::Error>,
    E: From<S::Error> + From<types::bytesrepr::Error>,
{
    let start = Instant::now();
    let mut get_counter: i32 = 0;
    let path = key_bytes;
    // Traversal consumes elements by value, so start from an owned root copy.
    let mut current = root.to_owned();
    let mut depth: usize = 0;
    // Parents accumulated so far: (branch index, parent element).
    let mut acc: Parents<K, V> = Vec::new();
    loop {
        match current {
            leaf @ Trie::Leaf { .. } => {
                // A leaf ends the scan; callers compare its key to theirs.
                log_metric(
                    correlation_id,
                    TRIE_STORE_SCAN_GETS,
                    GET,
                    GAUGE_METRIC_KEY,
                    f64::from(get_counter),
                );
                log_duration(
                    correlation_id,
                    TRIE_STORE_SCAN_DURATION,
                    SCAN,
                    start.elapsed(),
                );
                return Ok(TrieScan::new(leaf, acc));
            }
            Trie::Node { pointer_block } => {
                // The next byte of the path selects which branch to follow.
                let index = {
                    assert!(depth < path.len(), "depth must be < {}", path.len());
                    path[depth]
                };
                let maybe_pointer: Option<Pointer> = {
                    let index: usize = index.into();
                    assert!(index < trie::RADIX, "index must be < {}", trie::RADIX);
                    pointer_block[index]
                };
                let pointer = match maybe_pointer {
                    Some(pointer) => pointer,
                    None => {
                        // No branch at this byte: the node itself is the tip.
                        log_metric(
                            correlation_id,
                            TRIE_STORE_SCAN_GETS,
                            GET,
                            GAUGE_METRIC_KEY,
                            f64::from(get_counter),
                        );
                        log_duration(
                            correlation_id,
                            TRIE_STORE_SCAN_DURATION,
                            SCAN,
                            start.elapsed(),
                        );
                        return Ok(TrieScan::new(Trie::Node { pointer_block }, acc));
                    }
                };
                match store.get(txn, pointer.hash())? {
                    Some(next) => {
                        get_counter += 1;
                        current = next;
                        depth += 1;
                        // Record the node (and the branch taken) as a parent.
                        acc.push((index, Trie::Node { pointer_block }))
                    }
                    None => {
                        get_counter += 1;
                        log_metric(
                            correlation_id,
                            TRIE_STORE_SCAN_GETS,
                            GET,
                            GAUGE_METRIC_KEY,
                            f64::from(get_counter),
                        );
                        log_duration(
                            correlation_id,
                            TRIE_STORE_SCAN_DURATION,
                            SCAN,
                            start.elapsed(),
                        );
                        // A dangling pointer means the store is corrupt.
                        panic!(
                            "No trie value at key: {:?} (reading from path: {:?})",
                            pointer.hash(),
                            path
                        );
                    }
                }
            }
            Trie::Extension { affix, pointer } => {
                let sub_path = &path[depth..depth + affix.len()];
                if sub_path != affix.as_slice() {
                    // Path diverges inside the affix: the extension is the tip.
                    log_metric(
                        correlation_id,
                        TRIE_STORE_SCAN_GETS,
                        GET,
                        GAUGE_METRIC_KEY,
                        f64::from(get_counter),
                    );
                    log_duration(
                        correlation_id,
                        TRIE_STORE_SCAN_DURATION,
                        SCAN,
                        start.elapsed(),
                    );
                    return Ok(TrieScan::new(Trie::Extension { affix, pointer }, acc));
                }
                match store.get(txn, pointer.hash())? {
                    Some(next) => {
                        get_counter += 1;
                        // The extension's parent index is the first affix byte
                        // position on the path (depth before skipping affix).
                        let index = {
                            assert!(depth < path.len(), "depth must be < {}", path.len());
                            path[depth]
                        };
                        current = next;
                        depth += affix.len();
                        acc.push((index, Trie::Extension { affix, pointer }))
                    }
                    None => {
                        get_counter += 1;
                        log_metric(
                            correlation_id,
                            TRIE_STORE_SCAN_GETS,
                            GET,
                            GAUGE_METRIC_KEY,
                            f64::from(get_counter),
                        );
                        log_duration(
                            correlation_id,
                            TRIE_STORE_SCAN_DURATION,
                            SCAN,
                            start.elapsed(),
                        );
                        panic!(
                            "No trie value at key: {:?} (reading from path: {:?})",
                            pointer.hash(),
                            path
                        );
                    }
                }
            }
        }
    }
}
#[allow(clippy::type_complexity)]
fn rehash<K, V>(
mut tip: Trie<K, V>,
parents: Parents<K, V>,
) -> Result<Vec<(Blake2bHash, Trie<K, V>)>, bytesrepr::Error>
where
K: ToBytes + Clone,
V: ToBytes + Clone,
{
let mut ret: Vec<(Blake2bHash, Trie<K, V>)> = Vec::new();
let mut tip_hash = {
let trie_bytes = tip.to_bytes()?;
Blake2bHash::new(&trie_bytes)
};
ret.push((tip_hash, tip.to_owned()));
for (index, parent) in parents.into_iter().rev() {
match parent {
Trie::Leaf { .. } => {
panic!("parents should not contain any leaves");
}
Trie::Node { mut pointer_block } => {
tip = {
let pointer = match tip {
Trie::Leaf { .. } => Pointer::LeafPointer(tip_hash),
Trie::Node { .. } => Pointer::NodePointer(tip_hash),
Trie::Extension { .. } => Pointer::NodePointer(tip_hash),
};
pointer_block[index.into()] = Some(pointer);
Trie::Node { pointer_block }
};
tip_hash = {
let node_bytes = tip.to_bytes()?;
Blake2bHash::new(&node_bytes)
};
ret.push((tip_hash, tip.to_owned()))
}
Trie::Extension { affix, pointer } => {
tip = {
let pointer = pointer.update(tip_hash);
Trie::Extension { affix, pointer }
};
tip_hash = {
let extension_bytes = tip.to_bytes()?;
Blake2bHash::new(&extension_bytes)
};
ret.push((tip_hash, tip.to_owned()))
}
}
}
Ok(ret)
}
/// Returns the longest shared prefix of `ls` and `rs` as an owned vector.
fn common_prefix<A: Eq + Clone>(ls: &[A], rs: &[A]) -> Vec<A> {
    let mut prefix = Vec::new();
    for (l, r) in ls.iter().zip(rs) {
        if l != r {
            break;
        }
        prefix.push(l.clone());
    }
    prefix
}
/// Reconstructs the byte path represented by a parent stack: an extension
/// contributes its entire affix, any other element contributes the single
/// branch index that was taken out of it.
fn get_parents_path<K, V>(parents: &[(u8, Trie<K, V>)]) -> Vec<u8> {
    let mut path = Vec::new();
    for (index, element) in parents {
        match element {
            Trie::Extension { affix, .. } => path.extend(affix),
            _ => path.push(*index),
        }
    }
    path
}
/// Pushes `new_parent_node` onto the parent stack, paired with the branch
/// index at which the eventual leaf will hang off it.
///
/// That index is the byte of `path_to_leaf` immediately following the path
/// already covered by `parents`, which must itself be a prefix of
/// `path_to_leaf`.
///
/// # Panics
/// Panics if `new_parent_node` is not a node, if the parents' path is not a
/// prefix of `path_to_leaf`, or if `path_to_leaf` is already fully consumed.
fn add_node_to_parents<K, V>(
    path_to_leaf: &[u8],
    new_parent_node: Trie<K, V>,
    mut parents: Parents<K, V>,
) -> Result<Parents<K, V>, bytesrepr::Error>
where
    K: ToBytes,
    V: ToBytes,
{
    // Only nodes may become parents here; anything else is a caller bug.
    match new_parent_node {
        Trie::Node { .. } => (),
        _ => panic!("new_parent must be a node"),
    }
    // The path covered by the current parents must lead toward the leaf.
    let path_to_node: Vec<u8> = get_parents_path(&parents);
    let current_path = common_prefix(path_to_leaf, &path_to_node);
    assert_eq!(current_path, path_to_node);
    let depth = path_to_node.len();
    assert!(
        depth < path_to_leaf.len(),
        "depth must be < {}",
        path_to_leaf.len()
    );
    let index = path_to_leaf[depth];
    parents.push((index, new_parent_node));
    Ok(parents)
}
/// Rearranges the parent stack so a new leaf can be added alongside an
/// existing leaf that shares a path prefix with it.
///
/// Pops the existing leaf's parent node, creates a fresh node holding only
/// the existing leaf's pointer (keyed by the first diverging byte), restores
/// the original parent to the stack, and — when the shared path extends
/// beyond the parents' current path — adds an extension covering the shared
/// bytes that leads to the fresh node. Returns the fresh node (to which the
/// caller attaches the new leaf) and the updated stack.
///
/// # Panics
/// Panics if `parents` is empty, if its top element is not a node, or if
/// that node no longer holds a pointer at the leaf's recorded index.
#[allow(clippy::type_complexity)]
fn reparent_leaf<K, V>(
    new_leaf_path: &[u8],
    existing_leaf_path: &[u8],
    parents: Parents<K, V>,
) -> Result<(Trie<K, V>, Parents<K, V>), bytesrepr::Error>
where
    K: ToBytes,
    V: ToBytes,
{
    let mut parents = parents;
    let (child_index, parent) = parents.pop().expect("parents should not be empty");
    let pointer_block = match parent {
        Trie::Node { pointer_block } => pointer_block,
        _ => panic!("A leaf should have a node for its parent"),
    };
    let shared_path = common_prefix(&new_leaf_path, &existing_leaf_path);
    // Build a node holding only the existing leaf, keyed by the first byte
    // at which the two leaf paths diverge.
    let new_node = {
        let index: usize = existing_leaf_path[shared_path.len()].into();
        let existing_leaf_pointer =
            pointer_block[<usize>::from(child_index)].expect("parent has lost the existing leaf");
        Trie::node(&[(index, existing_leaf_pointer)])
    };
    // Restore the original parent unchanged; `rehash` later overwrites its
    // pointer at `child_index` with the rehashed child.
    parents.push((child_index, Trie::Node { pointer_block }));
    // Shared bytes below the parents' current path, if any, become the affix
    // of an extension leading to the new node.
    let affix = {
        let parents_path = get_parents_path(&parents);
        &shared_path[parents_path.len()..]
    };
    if !affix.is_empty() {
        let new_node_bytes = new_node.to_bytes()?;
        let new_node_hash = Blake2bHash::new(&new_node_bytes);
        let new_extension = Trie::extension(affix.to_vec(), Pointer::NodePointer(new_node_hash));
        parents.push((child_index, new_extension));
    }
    Ok((new_node, parents))
}
/// The outcome of splitting an extension (see `split_extension`).
struct SplitResult<K, V> {
    // The node created at the split point.
    new_node: Trie<K, V>,
    // The parent stack, possibly extended with a new parent extension.
    parents: Parents<K, V>,
    // Extension carrying the affix bytes after the split point, when any
    // remain, paired with its hash.
    maybe_hashed_child_extension: Option<(Blake2bHash, Trie<K, V>)>,
}
/// Splits `existing_extension` at the point where `new_leaf_path` diverges
/// from its affix, producing a node at the split point plus (optionally) an
/// extension on either side of it:
/// - affix bytes *before* the split become a new parent extension pushed onto
///   `parents`;
/// - affix bytes *after* the split (skipping the branching byte itself)
///   become a child extension that the new node points at; when none remain,
///   the node points directly at the original extension's target.
///
/// # Panics
/// Panics if `existing_extension` is not a `Trie::Extension`.
fn split_extension<K, V>(
    new_leaf_path: &[u8],
    existing_extension: Trie<K, V>,
    mut parents: Parents<K, V>,
) -> Result<SplitResult<K, V>, bytesrepr::Error>
where
    K: ToBytes + Clone,
    V: ToBytes + Clone,
{
    let (affix, pointer) = match existing_extension {
        Trie::Extension { affix, pointer } => (affix, pointer),
        _ => panic!("existing_extension must be an extension"),
    };
    // Full path of the extension's end: parents' path plus the affix.
    let parents_path = get_parents_path(&parents);
    let existing_extension_path: Vec<u8> =
        parents_path.iter().chain(affix.iter()).cloned().collect();
    let shared_path = common_prefix(&new_leaf_path, &existing_extension_path);
    // Affix bytes before the split point (relative to the parents' path).
    let parent_extension_affix = shared_path[parents_path.len()..].to_vec();
    // Affix bytes after the split point; the `+ 1` skips the branching byte,
    // which is represented by the new node itself.
    let child_extension_affix = affix[parent_extension_affix.len() + 1..].to_vec();
    let maybe_hashed_child_extension: Option<(Blake2bHash, Trie<K, V>)> =
        if child_extension_affix.is_empty() {
            None
        } else {
            let child_extension = Trie::extension(child_extension_affix.to_vec(), pointer);
            let child_extension_bytes = child_extension.to_bytes()?;
            let child_extension_hash = Blake2bHash::new(&child_extension_bytes);
            Some((child_extension_hash, child_extension))
        };
    // The node at the split point holds the surviving branch: either the
    // child extension (when one exists) or the extension's original target.
    let new_node: Trie<K, V> = {
        let index: usize = existing_extension_path[shared_path.len()].into();
        let pointer = maybe_hashed_child_extension
            .to_owned()
            .map_or(pointer, |(hash, _)| Pointer::NodePointer(hash));
        Trie::node(&[(index, pointer)])
    };
    if !parent_extension_affix.is_empty() {
        let new_node_bytes = new_node.to_bytes()?;
        let new_node_hash = Blake2bHash::new(&new_node_bytes);
        let parent_extension = Trie::extension(
            parent_extension_affix.to_vec(),
            Pointer::NodePointer(new_node_hash),
        );
        parents.push((parent_extension_affix[0], parent_extension));
    }
    Ok(SplitResult {
        new_node,
        parents,
        maybe_hashed_child_extension,
    })
}
/// The result of a trie write.
#[derive(Debug, PartialEq, Eq)]
pub enum WriteResult {
    /// The write succeeded; carries the hash of the new root.
    Written(Blake2bHash),
    /// The exact key/value pair is already present; nothing was written.
    AlreadyExists,
    /// No trie is stored under the requested root hash.
    RootNotFound,
}
/// Writes `value` under `key` into the trie rooted at `root`, returning the
/// hash of the resulting new root on success.
///
/// The trie is persistent: existing elements are never mutated in place.
/// Instead, a scan locates the insertion point, new elements are built for
/// the changed path (splitting nodes/extensions as required), and every new
/// element is stored under its hash. Emits a puts gauge and a write-duration
/// metric.
///
/// Returns `WriteResult::AlreadyExists` when the identical key/value leaf is
/// already present, and `WriteResult::RootNotFound` when `root` is absent.
pub fn write<K, V, T, S, E>(
    correlation_id: CorrelationId,
    txn: &mut T,
    store: &S,
    root: &Blake2bHash,
    key: &K,
    value: &V,
) -> Result<WriteResult, E>
where
    K: ToBytes + FromBytes + Clone + Eq + std::fmt::Debug,
    V: ToBytes + FromBytes + Clone + Eq,
    T: Readable<Handle = S::Handle> + Writable<Handle = S::Handle>,
    S: TrieStore<K, V>,
    S::Error: From<T::Error>,
    E: From<S::Error> + From<types::bytesrepr::Error>,
{
    let start = Instant::now();
    let mut put_counter: i32 = 0;
    match store.get(txn, root)? {
        None => Ok(WriteResult::RootNotFound),
        Some(current_root) => {
            let new_leaf = Trie::Leaf {
                key: key.to_owned(),
                value: value.to_owned(),
            };
            let path: Vec<u8> = key.to_bytes()?;
            // BUG FIX: this argument was mojibake (`¤t_root`, an HTML-entity
            // corruption of `&current_root`), which did not compile.
            let TrieScan { tip, parents } =
                scan::<K, V, T, S, E>(correlation_id, txn, store, &path, &current_root)?;
            // New elements to store, in leaf-to-root order, chosen by what
            // the scan stopped at.
            let new_elements: Vec<(Blake2bHash, Trie<K, V>)> = match tip {
                // The identical leaf already exists: nothing to write.
                Trie::Leaf { .. } if new_leaf == tip => Vec::new(),
                // Same key, different value: replace the leaf and rehash up.
                Trie::Leaf {
                    key: ref leaf_key,
                    value: ref leaf_value,
                } if key == leaf_key && value != leaf_value => rehash(new_leaf, parents)?,
                // Different key sharing a path prefix: re-parent the existing
                // leaf, then hang the new leaf off the fresh node.
                Trie::Leaf {
                    key: ref existing_leaf_key,
                    ..
                } if key != existing_leaf_key => {
                    let existing_leaf_path = existing_leaf_key.to_bytes()?;
                    let (new_node, parents) = reparent_leaf(&path, &existing_leaf_path, parents)?;
                    let parents = add_node_to_parents(&path, new_node, parents)?;
                    rehash(new_leaf, parents)?
                }
                // The guards above are exhaustive for leaves.
                Trie::Leaf { .. } => unreachable!(),
                // Scan stopped at a node with a free branch: use it directly.
                node @ Trie::Node { .. } => {
                    let parents = add_node_to_parents(&path, node, parents)?;
                    rehash(new_leaf, parents)?
                }
                // Scan diverged inside an extension: split it first.
                extension @ Trie::Extension { .. } => {
                    let SplitResult {
                        new_node,
                        parents,
                        maybe_hashed_child_extension,
                    } = split_extension(&path, extension, parents)?;
                    let parents = add_node_to_parents(&path, new_node, parents)?;
                    if let Some(hashed_extension) = maybe_hashed_child_extension {
                        let mut ret = vec![hashed_extension];
                        ret.extend(rehash(new_leaf, parents)?);
                        ret
                    } else {
                        rehash(new_leaf, parents)?
                    }
                }
            };
            if new_elements.is_empty() {
                log_duration(
                    correlation_id,
                    TRIE_STORE_WRITE_DURATION,
                    WRITE,
                    start.elapsed(),
                );
                return Ok(WriteResult::AlreadyExists);
            }
            // Persist every new element; `new_elements` ends with the new
            // root, so the last hash written becomes the returned root hash.
            let mut root_hash = root.to_owned();
            for (hash, element) in new_elements.iter() {
                put_counter += 1;
                store.put(txn, hash, element)?;
                root_hash = *hash;
            }
            log_metric(
                correlation_id,
                TRIE_STORE_WRITE_PUTS,
                PUT,
                GAUGE_METRIC_KEY,
                f64::from(put_counter),
            );
            log_duration(
                correlation_id,
                TRIE_STORE_WRITE_DURATION,
                WRITE,
                start.elapsed(),
            );
            Ok(WriteResult::Written(root_hash))
        }
    }
}
/// Internal state of `KeysIterator`: healthy, holding a store error to be
/// yielded on the next call, or permanently finished after a failure.
enum KeysIteratorState<K, V, S: TrieStore<K, V>> {
    Ok,
    ReturnError(S::Error),
    Failed,
}
/// A stack frame of the iterator's depth-first traversal.
struct VisitedTrieNode<K, V> {
    trie: Trie<K, V>,
    // For nodes: the child index to resume from when this frame is revisited;
    // `None` when the frame has not been descended into yet.
    maybe_index: Option<usize>,
    // Byte path from the root down to (but not including) this element.
    path: Vec<u8>,
}
/// Iterator over the keys of a trie, optionally restricted to keys whose
/// byte path starts with a prefix (see `keys_with_prefix`).
pub struct KeysIterator<'a, 'b, K, V, T, S: TrieStore<K, V>> {
    // Prefix bytes not yet consumed while descending toward the prefix.
    initial_descend: VecDeque<u8>,
    // Explicit DFS stack of elements still to visit.
    visited: Vec<VisitedTrieNode<K, V>>,
    store: &'a S,
    txn: &'b T,
    state: KeysIteratorState<K, V, S>,
}
impl<'a, 'b, K, V, T, S> Iterator for KeysIterator<'a, 'b, K, V, T, S>
where
    K: ToBytes + FromBytes + Clone + Eq + std::fmt::Debug,
    V: ToBytes + FromBytes + Clone + Eq + std::fmt::Debug,
    T: Readable<Handle = S::Handle>,
    S: TrieStore<K, V>,
    S::Error: From<T::Error> + From<types::bytesrepr::Error>,
{
    type Item = Result<K, S::Error>;
    /// Depth-first traversal yielding each leaf key reachable under the
    /// iterator's prefix. A store/serialization error is yielded once and
    /// then the iterator terminates.
    fn next(&mut self) -> Option<Self::Item> {
        // A deferred error is surfaced exactly once; afterwards the iterator
        // is permanently exhausted.
        match mem::replace(&mut self.state, KeysIteratorState::Ok) {
            KeysIteratorState::Ok => (),
            KeysIteratorState::ReturnError(e) => {
                self.state = KeysIteratorState::Failed;
                return Some(Err(e));
            }
            KeysIteratorState::Failed => {
                return None;
            }
        }
        while let Some(VisitedTrieNode {
            trie,
            maybe_index,
            mut path,
        }) = self.visited.pop()
        {
            let mut maybe_next_trie: Option<Trie<K, V>> = None;
            match trie {
                Trie::Leaf { key, .. } => {
                    let key_bytes = match key.to_bytes() {
                        Ok(bytes) => bytes,
                        Err(e) => {
                            self.state = KeysIteratorState::Failed;
                            return Some(Err(e.into()));
                        }
                    };
                    debug_assert!(key_bytes.starts_with(&path));
                    // Any prefix bytes not yet consumed during the descent
                    // must also match for the key to be yielded.
                    path.extend(&self.initial_descend);
                    if key_bytes.starts_with(&path) {
                        return Some(Ok(key));
                    }
                }
                Trie::Node { ref pointer_block } => {
                    // Resume at: the forced descend index (while still
                    // navigating to the prefix), else the saved position of
                    // this frame, else child 0.
                    let mut index: usize = self
                        .initial_descend
                        .front()
                        .map(|i| *i as usize)
                        .or(maybe_index)
                        .unwrap_or_default();
                    while index < RADIX {
                        if let Some(ref pointer) = pointer_block[index] {
                            maybe_next_trie = match self.store.get(self.txn, pointer.hash()) {
                                Ok(trie) => trie,
                                Err(e) => {
                                    self.state = KeysIteratorState::Failed;
                                    return Some(Err(e));
                                }
                            };
                            debug_assert!(maybe_next_trie.is_some());
                            // While still descending toward the prefix there
                            // is nothing to come back to, so the frame is not
                            // re-pushed; otherwise save the resume index.
                            if self.initial_descend.pop_front().is_none() {
                                self.visited.push(VisitedTrieNode {
                                    trie,
                                    maybe_index: Some(index + 1),
                                    path: path.clone(),
                                });
                            }
                            path.push(index as u8);
                            break;
                        }
                        // A forced descend index with no pointer: the prefix
                        // does not exist in the trie.
                        if !self.initial_descend.is_empty() {
                            break;
                        }
                        index += 1;
                    }
                }
                Trie::Extension { affix, pointer } => {
                    // Consume as much of the remaining prefix as the affix
                    // covers, and follow the extension only if they agree.
                    let descend_len = cmp::min(self.initial_descend.len(), affix.len());
                    let check_prefix = self
                        .initial_descend
                        .drain(..descend_len)
                        .collect::<Vec<_>>();
                    if affix.starts_with(&check_prefix) {
                        maybe_next_trie = match self.store.get(self.txn, pointer.hash()) {
                            Ok(trie) => trie,
                            Err(e) => {
                                self.state = KeysIteratorState::Failed;
                                return Some(Err(e));
                            }
                        };
                        debug_assert!({
                            match &maybe_next_trie {
                                Some(Trie::Node { .. }) => true,
                                _ => false,
                            }
                        });
                        path.extend(affix);
                    }
                }
            }
            if let Some(next_trie) = maybe_next_trie {
                self.visited.push(VisitedTrieNode {
                    trie: next_trie,
                    maybe_index: None,
                    path,
                });
            }
        }
        None
    }
}
/// Returns an iterator over all keys in the trie under `root`.
///
/// Equivalent to `keys_with_prefix` with an empty prefix.
#[allow(dead_code)]
pub fn keys<'a, 'b, K, V, T, S>(
    correlation_id: CorrelationId,
    txn: &'b T,
    store: &'a S,
    root: &Blake2bHash,
) -> KeysIterator<'a, 'b, K, V, T, S>
where
    K: ToBytes + FromBytes + Clone + Eq + std::fmt::Debug,
    V: ToBytes + FromBytes + Clone + Eq + std::fmt::Debug,
    T: Readable<Handle = S::Handle>,
    S: TrieStore<K, V>,
    S::Error: From<T::Error>,
{
    keys_with_prefix(correlation_id, txn, store, root, &[])
}
/// Returns an iterator over the keys in the trie under `root` whose byte
/// path starts with `prefix`.
///
/// A missing root yields an empty iterator; a store error while fetching the
/// root is deferred and yielded as the iterator's first (and only) item.
#[allow(dead_code)]
pub fn keys_with_prefix<'a, 'b, K, V, T, S>(
    _correlation_id: CorrelationId,
    txn: &'b T,
    store: &'a S,
    root: &Blake2bHash,
    prefix: &[u8],
) -> KeysIterator<'a, 'b, K, V, T, S>
where
    K: ToBytes + FromBytes + Clone + Eq + std::fmt::Debug,
    V: ToBytes + FromBytes + Clone + Eq + std::fmt::Debug,
    T: Readable<Handle = S::Handle>,
    S: TrieStore<K, V>,
    S::Error: From<T::Error>,
{
    let mut visited: Vec<VisitedTrieNode<K, V>> = Vec::new();
    let mut state = KeysIteratorState::Ok;
    match store.get(txn, root) {
        // Seed the traversal stack with the root element.
        Ok(Some(current_root)) => visited.push(VisitedTrieNode {
            trie: current_root,
            maybe_index: None,
            path: vec![],
        }),
        // No root: the iterator starts (and stays) empty.
        Ok(None) => {}
        // Defer the error so `next()` can yield it once.
        Err(e) => state = KeysIteratorState::ReturnError(e),
    }
    KeysIterator {
        initial_descend: prefix.iter().copied().collect(),
        visited,
        store,
        txn,
        state,
    }
}