use std::collections::HashMap;
use crate::error::{InternalError, InvalidStateError};
use crate::state::merkle::{node::Node, MerkleRadixLeafReadError, MerkleRadixLeafReader};
#[cfg(feature = "state-in-transaction")]
use crate::state::{
Committer, DryRunCommitter, Pruner, Reader, StateError, ValueIter, ValueIterResult,
};
use crate::state::{
Prune, Read, StateChange, StatePruneError, StateReadError, StateWriteError, Write,
};
#[cfg(feature = "state-merkle-sql-in-transaction")]
use super::backend::InTransactionSqliteBackend;
use super::backend::{Backend, Connection, SqliteBackend, WriteExclusiveExecute};
use super::encode_and_hash;
use super::{
store::{MerkleRadixStore, SqlMerkleRadixStore},
MerkleRadixOverlay, MerkleRadixPruner, SqlMerkleState, SqlMerkleStateBuildError,
SqlMerkleStateBuilder,
};
/// Default for the builder's `min_cached_data_size` option: 100 KiB.
const DEFAULT_MIN_CACHED_DATA_SIZE: usize = 100 * 1024;

/// Default for the builder's `cache_size` option.
const DEFAULT_CACHE_SIZE: u16 = 512;

impl SqlMerkleStateBuilder<SqliteBackend> {
    /// Builds a [`SqlMerkleState`] backed by a SQLite database.
    ///
    /// # Errors
    ///
    /// Returns a [`SqlMerkleStateBuildError`] if a required builder field is
    /// missing or the underlying store operations fail.
    pub fn build(self) -> Result<SqlMerkleState<SqliteBackend>, SqlMerkleStateBuildError> {
        do_build(self)
    }
}
#[cfg(feature = "state-merkle-sql-in-transaction")]
impl<'a> SqlMerkleStateBuilder<InTransactionSqliteBackend<'a>> {
    /// Builds a [`SqlMerkleState`] that operates over a backend scoped to an
    /// existing SQLite transaction.
    ///
    /// # Errors
    ///
    /// Returns a [`SqlMerkleStateBuildError`] if a required builder field is
    /// missing or the underlying store operations fail.
    pub fn build(
        self,
    ) -> Result<SqlMerkleState<InTransactionSqliteBackend<'a>>, SqlMerkleStateBuildError> {
        do_build(self)
    }
}
/// Shared build logic for [`SqlMerkleStateBuilder`] over any SQLite-flavored
/// backend.
///
/// Validates that a backend and a tree name were provided, constructs the
/// data cache (substituting the module defaults for unset options), and then
/// resolves the tree id: creating the tree when the builder's `create_tree`
/// flag is set, otherwise requiring it to already exist.
///
/// # Errors
///
/// Returns a [`SqlMerkleStateBuildError`] if a required builder field is
/// missing, if the named tree does not exist when `create_tree` is false, or
/// if a store operation fails.
fn do_build<B>(
    builder: SqlMerkleStateBuilder<B>,
) -> Result<SqlMerkleState<B>, SqlMerkleStateBuildError>
where
    B: Backend + WriteExclusiveExecute,
    <B as Backend>::Connection: Connection<ConnectionType = diesel::SqliteConnection>,
{
    let backend = builder
        .backend
        .ok_or_else(|| InvalidStateError::with_message("must provide a backend".into()))?;
    let tree_name = builder
        .tree_name
        .ok_or_else(|| InvalidStateError::with_message("must provide a tree name".into()))?;

    // Idiom fix: the cache construction was wrapped in a redundant block
    // expression; the defaults are applied per-option directly.
    let cache = super::cache::DataCache::new(
        builder
            .min_cached_data_size
            .unwrap_or(DEFAULT_MIN_CACHED_DATA_SIZE),
        builder.cache_size.unwrap_or(DEFAULT_CACHE_SIZE),
    );

    let store = SqlMerkleRadixStore::new(&backend);

    // The initial state root is the hash of an empty (default) node; it is
    // recorded when a tree is first created.
    let (initial_state_root_hash, _) = encode_and_hash(Node::default())?;
    let tree_id: i64 = if builder.create_tree {
        store.get_or_create_tree(&tree_name, &hex::encode(initial_state_root_hash))?
    } else {
        store.get_tree_id_by_name(&tree_name)?.ok_or_else(|| {
            InvalidStateError::with_message("must provide the name of an existing tree".into())
        })?
    };

    Ok(SqlMerkleState {
        backend,
        tree_id,
        cache,
    })
}
impl SqlMerkleState<SqliteBackend> {
    /// Deletes this instance's tree from the database, consuming the state.
    pub fn delete_tree(self) -> Result<(), InternalError> {
        self.new_store().delete_tree(self.tree_id)?;
        Ok(())
    }

    /// Removes previously pruned entries from the underlying store.
    pub fn remove_pruned_entries(&self) -> Result<(), InternalError> {
        self.new_store().remove_pruned_entries(self.tree_id)?;
        Ok(())
    }

    // Builds a store handle that shares this state's backend and data cache.
    fn new_store(&self) -> SqlMerkleRadixStore<SqliteBackend, diesel::SqliteConnection> {
        SqlMerkleRadixStore::new_with_cache(&self.backend, &self.cache)
    }
}
impl Write for SqlMerkleState<SqliteBackend> {
    type StateId = String;
    type Key = String;
    type Value = Vec<u8>;

    /// Applies `state_changes` on top of `state_id`, persists the resulting
    /// tree update, and returns the new state root.
    fn commit(
        &self,
        state_id: &Self::StateId,
        state_changes: &[StateChange],
    ) -> Result<Self::StateId, StateWriteError> {
        let overlay = MerkleRadixOverlay::new(self.tree_id, state_id, self.new_store());
        match overlay.generate_updates(state_changes) {
            Ok((next_state_id, tree_update)) => {
                overlay
                    .write_updates(&next_state_id, tree_update)
                    .map_err(|e| StateWriteError::StorageError(Box::new(e)))?;
                Ok(next_state_id)
            }
            Err(e) => Err(StateWriteError::StorageError(Box::new(e))),
        }
    }

    /// Computes the state root that `state_changes` would produce without
    /// writing anything.
    fn compute_state_id(
        &self,
        state_id: &Self::StateId,
        state_changes: &[StateChange],
    ) -> Result<Self::StateId, StateWriteError> {
        let overlay = MerkleRadixOverlay::new(self.tree_id, state_id, self.new_store());
        overlay
            .generate_updates(state_changes)
            .map(|(next_state_id, _)| next_state_id)
            .map_err(|e| StateWriteError::StorageError(Box::new(e)))
    }
}
impl Read for SqlMerkleState<SqliteBackend> {
    type StateId = String;
    type Key = String;
    type Value = Vec<u8>;

    /// Looks up the values stored at `keys` under the root `state_id`.
    ///
    /// Returns `InvalidStateId` when the given root is not present in the
    /// tree.
    fn get(
        &self,
        state_id: &Self::StateId,
        keys: &[Self::Key],
    ) -> Result<HashMap<Self::Key, Self::Value>, StateReadError> {
        let overlay = MerkleRadixOverlay::new(self.tree_id, state_id, self.new_store());
        let root_exists = overlay
            .has_root()
            .map_err(|e| StateReadError::StorageError(Box::new(e)))?;
        if root_exists {
            overlay
                .get_entries(keys)
                .map_err(|e| StateReadError::StorageError(Box::new(e)))
        } else {
            Err(StateReadError::InvalidStateId(state_id.into()))
        }
    }

    fn clone_box(
        &self,
    ) -> Box<dyn Read<StateId = Self::StateId, Key = Self::Key, Value = Self::Value>> {
        Box::new(self.clone())
    }
}
impl Prune for SqlMerkleState<SqliteBackend> {
    type StateId = String;
    type Key = String;
    type Value = Vec<u8>;

    /// Prunes the given state roots from the tree.
    fn prune(&self, state_ids: Vec<Self::StateId>) -> Result<Vec<Self::Key>, StatePruneError> {
        MerkleRadixPruner::new(self.tree_id, self.new_store())
            .prune(&state_ids)
            .map_err(|e| StatePruneError::StorageError(Box::new(e)))
    }
}
#[cfg(feature = "state-in-transaction")]
impl Reader for SqlMerkleState<SqliteBackend> {
    type Filter = str;

    /// Looks up the values stored at `keys` under the root `state_id`,
    /// returning an invalid-state error when the root does not exist.
    fn get(
        &self,
        state_id: &Self::StateId,
        keys: &[Self::Key],
    ) -> Result<HashMap<Self::Key, Self::Value>, StateError> {
        let overlay = MerkleRadixOverlay::new(self.tree_id, state_id, self.new_store());
        if overlay.has_root()? {
            Ok(overlay.get_entries(keys)?)
        } else {
            Err(InvalidStateError::with_message(state_id.into()).into())
        }
    }

    /// Iterates the entries under `state_id`, optionally restricted by
    /// `filter` (passed through to the store's `list_entries`).
    fn filter_iter(
        &self,
        state_id: &Self::StateId,
        filter: Option<&Self::Filter>,
    ) -> ValueIterResult<ValueIter<(Self::Key, Self::Value)>> {
        // The initial (empty-tree) root has no entries to list.
        if state_id == &self.initial_state_root_hash()? {
            return Ok(Box::new(std::iter::empty()));
        }
        let store = self.new_store();
        let leaves = store.list_entries(self.tree_id, state_id, filter)?;
        Ok(Box::new(leaves.into_iter().map(Ok)))
    }
}
#[cfg(feature = "state-in-transaction")]
impl Committer for SqlMerkleState<SqliteBackend> {
    type StateChange = StateChange;

    /// Applies `state_changes` on top of `state_id`, persists the resulting
    /// tree update, and returns the new state root.
    fn commit(
        &self,
        state_id: &Self::StateId,
        state_changes: &[Self::StateChange],
    ) -> Result<Self::StateId, StateError> {
        let overlay = MerkleRadixOverlay::new(self.tree_id, state_id, self.new_store());
        match overlay.generate_updates(state_changes) {
            Ok((next_state_id, tree_update)) => {
                overlay.write_updates(&next_state_id, tree_update)?;
                Ok(next_state_id)
            }
            Err(e) => Err(InternalError::from_source(Box::new(e)).into()),
        }
    }
}
#[cfg(feature = "state-in-transaction")]
impl DryRunCommitter for SqlMerkleState<SqliteBackend> {
    type StateChange = StateChange;

    /// Computes the state root that `state_changes` would produce without
    /// writing anything.
    fn dry_run_commit(
        &self,
        state_id: &Self::StateId,
        state_changes: &[Self::StateChange],
    ) -> Result<Self::StateId, StateError> {
        let overlay = MerkleRadixOverlay::new(self.tree_id, state_id, self.new_store());
        let updates = overlay
            .generate_updates(state_changes)
            .map_err(|e| InternalError::from_source(Box::new(e)))?;
        let (next_state_id, _) = updates;
        Ok(next_state_id)
    }
}
#[cfg(feature = "state-in-transaction")]
impl Pruner for SqlMerkleState<SqliteBackend> {
    /// Prunes the given state roots from the tree.
    fn prune(&self, state_ids: Vec<Self::StateId>) -> Result<Vec<Self::Key>, StateError> {
        let pruner = MerkleRadixPruner::new(self.tree_id, self.new_store());
        // `?` converts the pruner's error into `StateError` via `From`.
        Ok(pruner.prune(&state_ids)?)
    }
}
#[cfg(feature = "state-merkle-sql-in-transaction")]
impl<'a> SqlMerkleState<InTransactionSqliteBackend<'a>> {
    /// Deletes this instance's tree from the database, consuming the state.
    pub fn delete_tree(self) -> Result<(), InternalError> {
        self.new_store().delete_tree(self.tree_id)?;
        Ok(())
    }

    /// Removes previously pruned entries from the underlying store.
    pub fn remove_pruned_entries(&self) -> Result<(), InternalError> {
        self.new_store().remove_pruned_entries(self.tree_id)?;
        Ok(())
    }

    // Builds a store handle that shares this state's backend and data cache.
    fn new_store(
        &self,
    ) -> SqlMerkleRadixStore<InTransactionSqliteBackend<'a>, diesel::SqliteConnection> {
        SqlMerkleRadixStore::new_with_cache(&self.backend, &self.cache)
    }
}
#[cfg(all(
    feature = "state-merkle-sql-in-transaction",
    feature = "state-in-transaction"
))]
impl<'a> Reader for SqlMerkleState<InTransactionSqliteBackend<'a>> {
    type Filter = str;

    /// Looks up the values stored at `keys` under the root `state_id`,
    /// returning an invalid-state error when the root does not exist.
    fn get(
        &self,
        state_id: &Self::StateId,
        keys: &[Self::Key],
    ) -> Result<HashMap<Self::Key, Self::Value>, StateError> {
        let overlay = MerkleRadixOverlay::new(self.tree_id, state_id, self.new_store());
        if overlay.has_root()? {
            Ok(overlay.get_entries(keys)?)
        } else {
            Err(InvalidStateError::with_message(state_id.into()).into())
        }
    }

    /// Iterates the entries under `state_id`, optionally restricted by
    /// `filter` (passed through to the store's `list_entries`).
    fn filter_iter(
        &self,
        state_id: &Self::StateId,
        filter: Option<&Self::Filter>,
    ) -> ValueIterResult<ValueIter<(Self::Key, Self::Value)>> {
        // The initial (empty-tree) root has no entries to list.
        if state_id == &self.initial_state_root_hash()? {
            return Ok(Box::new(std::iter::empty()));
        }
        let store = self.new_store();
        let leaves = store.list_entries(self.tree_id, state_id, filter)?;
        Ok(Box::new(leaves.into_iter().map(Ok)))
    }
}
#[cfg(all(
    feature = "state-merkle-sql-in-transaction",
    feature = "state-in-transaction"
))]
impl<'a> Committer for SqlMerkleState<InTransactionSqliteBackend<'a>> {
    type StateChange = StateChange;

    /// Applies `state_changes` on top of `state_id`, persists the resulting
    /// tree update, and returns the new state root.
    fn commit(
        &self,
        state_id: &Self::StateId,
        state_changes: &[Self::StateChange],
    ) -> Result<Self::StateId, StateError> {
        let overlay = MerkleRadixOverlay::new(self.tree_id, state_id, self.new_store());
        match overlay.generate_updates(state_changes) {
            Ok((next_state_id, tree_update)) => {
                overlay.write_updates(&next_state_id, tree_update)?;
                Ok(next_state_id)
            }
            Err(e) => Err(InternalError::from_source(Box::new(e)).into()),
        }
    }
}
#[cfg(all(
    feature = "state-merkle-sql-in-transaction",
    feature = "state-in-transaction"
))]
impl<'a> DryRunCommitter for SqlMerkleState<InTransactionSqliteBackend<'a>> {
    type StateChange = StateChange;

    /// Computes the state root that `state_changes` would produce without
    /// writing anything.
    fn dry_run_commit(
        &self,
        state_id: &Self::StateId,
        state_changes: &[Self::StateChange],
    ) -> Result<Self::StateId, StateError> {
        let overlay = MerkleRadixOverlay::new(self.tree_id, state_id, self.new_store());
        let updates = overlay
            .generate_updates(state_changes)
            .map_err(|e| InternalError::from_source(Box::new(e)))?;
        let (next_state_id, _) = updates;
        Ok(next_state_id)
    }
}
#[cfg(all(
    feature = "state-merkle-sql-in-transaction",
    feature = "state-in-transaction"
))]
impl<'a> Pruner for SqlMerkleState<InTransactionSqliteBackend<'a>> {
    /// Prunes the given state roots from the tree.
    fn prune(&self, state_ids: Vec<Self::StateId>) -> Result<Vec<Self::Key>, StateError> {
        let pruner = MerkleRadixPruner::new(self.tree_id, self.new_store());
        // `?` converts the pruner's error into `StateError` via `From`.
        Ok(pruner.prune(&state_ids)?)
    }
}
/// Result type used by leaf-reading operations.
type IterResult<T> = Result<T, MerkleRadixLeafReadError>;

/// Boxed iterator over leaf-read results.
type LeafIter<T> = Box<dyn Iterator<Item = IterResult<T>>>;

impl MerkleRadixLeafReader for SqlMerkleState<SqliteBackend> {
    /// Returns an iterator over the leaves under `state_id`, optionally
    /// restricted by `subtree` (passed through to the store's
    /// `list_entries`).
    fn leaves(
        &self,
        state_id: &Self::StateId,
        subtree: Option<&str>,
    ) -> IterResult<LeafIter<(Self::Key, Self::Value)>> {
        // The initial (empty-tree) root has no leaves to list.
        if state_id == &self.initial_state_root_hash()? {
            return Ok(Box::new(std::iter::empty()));
        }
        let store = self.new_store();
        let entries = store.list_entries(self.tree_id, state_id, subtree)?;
        Ok(Box::new(entries.into_iter().map(Ok)))
    }
}
#[cfg(test)]
mod test {
    use super::*;
    use std::sync::{Arc, RwLock};
    use diesel::{
        dsl::sql_query,
        prelude::*,
        r2d2::{ConnectionManager, Pool},
        sqlite,
    };
    #[cfg(all(
        feature = "state-merkle-sql-in-transaction",
        feature = "state-in-transaction"
    ))]
    use crate::state::merkle::sql::backend;
    use crate::state::merkle::sql::backend::SqliteBackendBuilder;
    use crate::state::merkle::sql::migration::MigrationManager;
    #[cfg(all(
        feature = "state-merkle-sql-in-transaction",
        feature = "state-in-transaction"
    ))]
    use crate::state::Committer;

    /// Two trees sharing a backend are isolated: a root committed to one
    /// tree is not readable from the other.
    #[test]
    fn test_multiple_trees() -> Result<(), Box<dyn std::error::Error>> {
        let backend = SqliteBackendBuilder::new().with_memory_database().build()?;
        backend.run_migrations()?;
        let tree_1 = SqlMerkleStateBuilder::new()
            .with_backend(backend.clone())
            .with_tree("test-1")
            .create_tree_if_necessary()
            .build()?;
        let initial_state_root_hash = tree_1.initial_state_root_hash()?;
        let state_change_set = StateChange::Set {
            key: "1234".to_string(),
            value: "state_value".as_bytes().to_vec(),
        };
        let new_root =
            Write::commit(&tree_1, &initial_state_root_hash, &[state_change_set]).unwrap();
        assert_read_value_at_address(&tree_1, &new_root, "1234", Some("state_value"));
        let tree_2 = SqlMerkleStateBuilder::new()
            .with_backend(backend)
            .with_tree("test-2")
            .create_tree_if_necessary()
            .build()?;
        // tree-2 never saw tree-1's root, so the read must fail.
        assert!(Read::get(&tree_2, &new_root, &["1234".to_string()]).is_err());
        Ok(())
    }

    /// Building against a tree name that was never created fails unless
    /// `create_tree_if_necessary` is requested.
    #[test]
    fn test_build_fails_without_explicit_create() -> Result<(), Box<dyn std::error::Error>> {
        let backend = SqliteBackendBuilder::new().with_memory_database().build()?;
        backend.run_migrations()?;
        assert!(SqlMerkleStateBuilder::new()
            .with_backend(backend.clone())
            .with_tree("test-1")
            .build()
            .is_err());
        Ok(())
    }

    /// A deleted tree can no longer be opened by name, even though it was
    /// readable (without the create flag) before deletion.
    #[test]
    fn test_delete_tree() -> Result<(), Box<dyn std::error::Error>> {
        let backend = SqliteBackendBuilder::new().with_memory_database().build()?;
        backend.run_migrations()?;
        let state = SqlMerkleStateBuilder::new()
            .with_backend(backend.clone())
            .with_tree("test-1")
            .create_tree_if_necessary()
            .build()?;
        let initial_state_root_hash = state.initial_state_root_hash()?;
        let state_change_set = StateChange::Set {
            key: "1234".to_string(),
            value: "state_value".as_bytes().to_vec(),
        };
        let new_root = Write::commit(&state, &initial_state_root_hash, &[state_change_set])?;
        assert_read_value_at_address(&state, &new_root, "1234", Some("state_value"));
        // Re-open the same tree without the create flag to prove it persists
        // beyond the original handle.
        drop(state);
        let state = SqlMerkleStateBuilder::new()
            .with_backend(backend.clone())
            .with_tree("test-1")
            .build()?;
        assert_read_value_at_address(&state, &new_root, "1234", Some("state_value"));
        state.delete_tree()?;
        assert!(
            SqlMerkleStateBuilder::new()
                .with_backend(backend)
                .with_tree("test-1")
                .build()
                .is_err(),
            "The tree should no longer exist"
        );
        Ok(())
    }

    /// A committed leaf is visible through the `MerkleRadixLeafReader`
    /// iterator with its address and raw value.
    #[test]
    fn test_list_leaves() -> Result<(), Box<dyn std::error::Error>> {
        let backend = SqliteBackendBuilder::new().with_memory_database().build()?;
        backend.run_migrations()?;
        let tree_1 = SqlMerkleStateBuilder::new()
            .with_backend(backend.clone())
            .with_tree("test-1")
            .create_tree_if_necessary()
            .build()?;
        let initial_state_root_hash = tree_1.initial_state_root_hash()?;
        let state_change_set = StateChange::Set {
            key: "012345".to_string(),
            value: "state_value".as_bytes().to_vec(),
        };
        let new_root =
            Write::commit(&tree_1, &initial_state_root_hash, &[state_change_set]).unwrap();
        assert_read_value_at_address(&tree_1, &new_root, "012345", Some("state_value"));
        let entry = tree_1
            .leaves(&new_root, None)?
            .next()
            .expect("A value should have been listed")?;
        assert_eq!(("012345".to_string(), b"state_value".to_vec()), entry);
        Ok(())
    }

    /// A commit performed inside a write-exclusive transaction (via the
    /// in-transaction backend) is visible after the transaction completes.
    #[cfg(all(
        feature = "state-merkle-sql-in-transaction",
        feature = "state-in-transaction"
    ))]
    #[test]
    fn test_in_transaction() -> Result<(), Box<dyn std::error::Error>> {
        let backend = SqliteBackendBuilder::new().with_memory_database().build()?;
        backend.run_migrations()?;
        let new_root = backend.execute_write(|conn| {
            // Wrap the borrowed connection in an in-transaction backend so a
            // tree can be built and committed within this closure.
            let in_txn_backend: backend::InTransactionSqliteBackend = conn.as_inner().into();
            let tree = SqlMerkleStateBuilder::new()
                .with_backend(in_txn_backend)
                .with_tree("test-1")
                .create_tree_if_necessary()
                .build()
                .map_err(|e| InternalError::from_source(Box::new(e)))?;
            let initial_state_root_hash = tree.initial_state_root_hash()?;
            let state_change_set = StateChange::Set {
                key: "012345".to_string(),
                value: "state_value".as_bytes().to_vec(),
            };
            tree.commit(&initial_state_root_hash, &[state_change_set])
                .map_err(|e| InternalError::from_source(Box::new(e)))
        })?;
        // Re-open the tree with a regular backend and verify the committed
        // value is visible outside the transaction.
        let tree = SqlMerkleStateBuilder::new()
            .with_backend(backend.clone())
            .with_tree("test-1")
            .build()?;
        assert_read_value_at_address(&tree, &new_root, "012345", Some("state_value"));
        Ok(())
    }

    /// Pruning marks entries, and `remove_pruned_entries` then physically
    /// deletes them from `merkle_radix_leaf` — verified with raw SQL counts.
    #[test]
    fn test_remove_pruned_entries() -> Result<(), Box<dyn std::error::Error>> {
        let backend = SqliteBackendBuilder::new().with_memory_database().build()?;
        backend.run_migrations()?;
        let tree = SqlMerkleStateBuilder::new()
            .with_backend(backend.clone())
            .with_tree("test-1")
            .create_tree_if_necessary()
            .build()?;
        let initial_state_root_hash = tree.initial_state_root_hash()?;
        let state_change_set = StateChange::Set {
            key: "012345".to_string(),
            value: "state_value".as_bytes().to_vec(),
        };
        let first_root = Write::commit(&tree, &initial_state_root_hash, &[state_change_set])?;
        assert_read_value_at_address(&tree, &first_root, "012345", Some("state_value"));
        // Second commit deletes the first key, making the first root eligible
        // for pruning.
        let new_state_changes = [
            StateChange::Set {
                key: "ab0000".to_string(),
                value: "second_value".as_bytes().to_vec(),
            },
            StateChange::Delete {
                key: "012345".to_string(),
            },
        ];
        let second_root = Write::commit(&tree, &first_root, &new_state_changes)?;
        assert_read_value_at_address(&tree, &second_root, "ab0000", Some("second_value"));
        assert_read_value_at_address(&tree, &second_root, "012345", None);
        Prune::prune(&tree, vec![first_root])?;
        // Row type for the raw count query below (diesel 1.x attribute
        // syntax).
        #[derive(Debug, QueryableByName)]
        struct Entries {
            #[column_name = "entries"]
            #[sql_type = "diesel::sql_types::BigInt"]
            pub entries: i64,
        }
        // Extract the underlying connection pool to inspect the table
        // directly.
        let pool: Arc<RwLock<Pool<ConnectionManager<sqlite::SqliteConnection>>>> = backend.into();
        {
            let conn = pool.read().unwrap().get()?;
            let count: Entries =
                sql_query("SELECT count(id) as entries FROM merkle_radix_leaf WHERE address = ?")
                    .bind::<diesel::sql_types::Text, _>("012345")
                    .get_result(&*conn)?;
            // Pruned but not yet removed: the leaf row is still present.
            assert_eq!(count.entries, 1);
        }
        tree.remove_pruned_entries()?;
        {
            let conn = pool.read().unwrap().get()?;
            let count: Entries =
                sql_query("SELECT count(id) as entries FROM merkle_radix_leaf WHERE address = ?")
                    .bind::<diesel::sql_types::Text, _>("012345")
                    .get_result(&*conn)?;
            // After removal, the leaf row is gone.
            assert_eq!(count.entries, 0);
        }
        Ok(())
    }

    /// Asserts that reading `address` at `root_hash` yields
    /// `expected_value` (decoded as UTF-8), panicking on read errors.
    fn assert_read_value_at_address<R>(
        merkle_read: &R,
        root_hash: &str,
        address: &str,
        expected_value: Option<&str>,
    ) where
        R: Read<StateId = String, Key = String, Value = Vec<u8>>,
    {
        let value = merkle_read
            .get(&root_hash.to_string(), &[address.to_string()])
            .and_then(|mut values| {
                Ok(values.remove(address).map(|value| {
                    String::from_utf8(value).expect("could not convert bytes to string")
                }))
            });
        match value {
            Ok(value) => assert_eq!(expected_value, value.as_deref()),
            Err(err) => panic!("value at address {} produced an error: {}", address, err),
        }
    }
}