use std::collections::{HashMap, HashSet, VecDeque};
use std::convert::TryFrom;
use std::sync::{
mpsc::{channel, Receiver, RecvTimeoutError, Sender},
Arc, RwLock,
};
use std::time::{Duration, Instant, SystemTime};
use std::{fmt, path::Path};
use protobuf::Message;
use sawtooth::store::lmdb::LmdbOrderedStore;
use sawtooth::store::receipt_store::TransactionReceiptStore;
use sawtooth_sabre::handler::SabreTransactionHandler;
use sawtooth_sabre::{ADMINISTRATORS_SETTING_ADDRESS, ADMINISTRATORS_SETTING_KEY};
use transact::context::manager::sync::ContextManager;
use transact::database::{
lmdb::{LmdbContext, LmdbDatabase},
Database,
};
use transact::sawtooth::SawtoothToTransactHandlerAdapter;
use transact::scheduler::{serial::SerialScheduler, BatchExecutionResult, Scheduler};
use transact::state::{
merkle::{MerkleRadixTree, MerkleState, INDEXES},
StateChange as TransactStateChange, Write,
};
use transact::{
execution::{adapter::static_adapter::StaticExecutionAdapter, executor::Executor},
protocol::{
batch::BatchPair,
receipt::{TransactionReceipt, TransactionResult},
},
};
#[cfg(feature = "events")]
use crate::events::{ParseBytes, ParseError};
use crate::hex;
use crate::protos::scabbard::{Setting, Setting_Entry};
use super::error::{ScabbardStateError, StateSubscriberError};
/// Maximum time (in seconds) to wait for a batch execution result from the executor.
const EXECUTION_TIMEOUT: u64 = 300;
/// LMDB index name under which the current state root hash is persisted.
const CURRENT_STATE_ROOT_INDEX: &str = "current_state_root";
/// Number of events fetched from the receipt store per cache reload in `Events`.
const ITER_CACHE_SIZE: usize = 64;
/// Polling interval (in milliseconds) used by `ChannelBatchInfoIter` between receive attempts.
const COMPLETED_BATCH_INFO_ITER_RETRY_MILLIS: u64 = 100;
/// Maximum number of batches retained in `BatchHistory` before the oldest is evicted.
const DEFAULT_BATCH_HISTORY_SIZE: usize = 100;
/// Iterator over `(address, value)` pairs of state entries, as returned by
/// `ScabbardState::get_state_with_prefix`.
#[cfg(feature = "scabbard-get-state")]
pub type StateIter = dyn Iterator<Item = Result<(String, Vec<u8>), ScabbardStateError>>;
/// Manages scabbard's merkle state: transaction execution, state commits and
/// rollbacks, receipt storage, event delivery, and batch status tracking.
pub struct ScabbardState {
    // Backing LMDB database for merkle state (plus the current-state-root index).
    db: Box<dyn Database>,
    // Shared context manager handed to the executor's execution adapter.
    context_manager: ContextManager,
    // Transaction executor; started in `new`.
    executor: Executor,
    // Hex-encoded root of the currently committed state.
    current_state_root: String,
    // Store of receipts for committed transactions, shared with `Events` iterators.
    transaction_receipt_store: Arc<RwLock<TransactionReceiptStore>>,
    // (batch signature, receipts) produced by `prepare_change`, awaiting
    // `commit` or `rollback`.
    pending_changes: Option<(String, Vec<TransactionReceipt>)>,
    // Subscribers notified of state-change events on each commit.
    event_subscribers: Vec<Box<dyn StateSubscriber>>,
    // Status history for submitted batches.
    batch_history: BatchHistory,
}
impl ScabbardState {
/// Creates a new `ScabbardState`, opening (or initializing) the LMDB state and
/// receipt databases and starting a transaction executor.
///
/// If no state root has been persisted yet, genesis state is created by
/// committing the given `admin_keys` to the Sabre administrators setting
/// address.
///
/// # Errors
///
/// Returns a `ScabbardStateError` if the databases cannot be opened, the
/// initial state cannot be committed, or the executor fails to start.
pub fn new(
    state_db_path: &Path,
    state_db_size: usize,
    receipt_db_path: &Path,
    receipt_db_size: usize,
    admin_keys: Vec<String>,
) -> Result<Self, ScabbardStateError> {
    // Register the extra index used to persist the current state root across
    // restarts, alongside the merkle tree's standard indexes.
    let mut indexes = INDEXES.to_vec();
    indexes.push(CURRENT_STATE_ROOT_INDEX);
    let db = Box::new(LmdbDatabase::new(
        LmdbContext::new(state_db_path, indexes.len(), Some(state_db_size))?,
        &indexes,
    )?);
    let current_state_root = if let Some(current_state_root) =
        Self::read_current_state_root(&*db)?
    {
        // A previous run left a persisted root: resume from it.
        debug!("Restoring scabbard state on root {}", current_state_root);
        current_state_root
    } else {
        // No persisted root: build genesis state containing the comma-joined
        // admin keys at the Sabre administrators setting address.
        let mut admin_keys_entry = Setting_Entry::new();
        admin_keys_entry.set_key(ADMINISTRATORS_SETTING_KEY.into());
        admin_keys_entry.set_value(admin_keys.join(","));
        let mut admin_keys_setting = Setting::new();
        admin_keys_setting.set_entries(vec![admin_keys_entry].into());
        let admin_keys_setting_bytes = admin_keys_setting.write_to_bytes().map_err(|err| {
            ScabbardStateError(format!(
                "failed to write admin keys setting to bytes: {}",
                err
            ))
        })?;
        let admin_keys_state_change = TransactStateChange::Set {
            key: ADMINISTRATORS_SETTING_ADDRESS.into(),
            value: admin_keys_setting_bytes,
        };
        let initial_state_root = MerkleRadixTree::new(db.clone_box(), None)?.get_merkle_root();
        MerkleState::new(db.clone()).commit(
            &initial_state_root,
            vec![admin_keys_state_change].as_slice(),
        )?
    };
    let context_manager = ContextManager::new(Box::new(MerkleState::new(db.clone())));
    // Single static adapter wrapping the Sabre handler (via the
    // sawtooth-to-transact adapter).
    let mut executor = Executor::new(vec![Box::new(StaticExecutionAdapter::new_adapter(
        vec![Box::new(SawtoothToTransactHandlerAdapter::new(
            SabreTransactionHandler::new(),
        ))],
        context_manager.clone(),
    )?)]);
    executor
        .start()
        .map_err(|err| ScabbardStateError(format!("failed to start executor: {}", err)))?;
    Ok(ScabbardState {
        db,
        context_manager,
        executor,
        current_state_root,
        transaction_receipt_store: Arc::new(RwLock::new(TransactionReceiptStore::new(
            Box::new(
                LmdbOrderedStore::new(receipt_db_path, Some(receipt_db_size))
                    .map_err(|err| ScabbardStateError(err.to_string()))?,
            ),
        ))),
        pending_changes: None,
        event_subscribers: vec![],
        batch_history: BatchHistory::new(),
    })
}
/// Reads the persisted state root hash (the `HEAD` entry of the
/// current-state-root index), returning `None` if none has been written yet.
fn read_current_state_root(db: &dyn Database) -> Result<Option<String>, ScabbardStateError> {
    let reader = db
        .get_reader()
        .map_err(|e| ScabbardStateError(format!("Unable to read HEAD entry: {}", e)))?;
    let head = reader
        .index_get(CURRENT_STATE_ROOT_INDEX, b"HEAD")
        .map_err(|e| ScabbardStateError(format!("Unable to read HEAD entry: {}", e)))?;
    Ok(head.map(|bytes| hex::to_hex(&bytes)))
}
/// Persists the in-memory current state root as the `HEAD` entry of the
/// current-state-root index so it can be restored on restart.
///
/// # Errors
///
/// Returns a `ScabbardStateError` if the in-memory root is not valid hex, or
/// if the database write or commit fails.
fn write_current_state_root(&self) -> Result<(), ScabbardStateError> {
    let current_root_bytes = hex::parse_hex(&self.current_state_root).map_err(|e| {
        ScabbardStateError(format!(
            "The in-memory current state root is invalid: {}",
            e
        ))
    })?;
    let mut writer = self.db.get_writer().map_err(|e| {
        ScabbardStateError(format!(
            "Unable to start write transaction for HEAD entry: {}",
            e
        ))
    })?;
    // Fix: the reference here had been mangled to `¤t_root_bytes` (mojibake
    // of `&current_root_bytes` via the HTML entity `&curren;`), which does not
    // compile.
    writer
        .index_put(CURRENT_STATE_ROOT_INDEX, b"HEAD", &current_root_bytes)
        .map_err(|e| ScabbardStateError(format!("Unable to write HEAD entry: {}", e)))?;
    writer
        .commit()
        .map_err(|e| ScabbardStateError(format!("Unable to commit HEAD entry: {}", e)))?;
    Ok(())
}
/// Fetches the value stored at `address` under the current state root, if any.
#[cfg(feature = "scabbard-get-state")]
pub fn get_state_at_address(
    &self,
    address: &str,
) -> Result<Option<Vec<u8>>, ScabbardStateError> {
    let tree = MerkleRadixTree::new(self.db.clone(), Some(&self.current_state_root))?;
    let value = tree.get_value(address)?;
    Ok(value)
}
/// Returns an iterator over all state entries whose addresses start with
/// `prefix` (all entries when `prefix` is `None`), under the current state
/// root.
#[cfg(feature = "scabbard-get-state")]
pub fn get_state_with_prefix(
    &self,
    prefix: Option<&str>,
) -> Result<Box<StateIter>, ScabbardStateError> {
    let tree = MerkleRadixTree::new(self.db.clone(), Some(&self.current_state_root))?;
    let leaves = tree.leaves(prefix)?;
    Ok(Box::new(
        leaves.map(|res| res.map_err(ScabbardStateError::from)),
    ))
}
/// Executes `batch` against the current state root and computes (but does not
/// commit) the resulting state root.
///
/// The receipts are stored in `pending_changes`, to be applied by `commit` or
/// discarded by `rollback`. The batch's valid/invalid status is also recorded
/// in the batch history.
///
/// # Errors
///
/// Returns a `ScabbardStateError` if scheduling or execution fails, no result
/// arrives within `EXECUTION_TIMEOUT` seconds, or any transaction in the batch
/// is invalid.
pub fn prepare_change(&mut self, batch: BatchPair) -> Result<String, ScabbardStateError> {
    // Schedule the batch; the executor delivers the result through the
    // scheduler's callback into this channel.
    let (result_tx, result_rx) = std::sync::mpsc::channel();
    let mut scheduler = SerialScheduler::new(
        Box::new(self.context_manager.clone()),
        self.current_state_root.clone(),
    )?;
    scheduler.set_result_callback(Box::new(move |batch_result| {
        if result_tx.send(batch_result).is_err() {
            error!("Unable to send batch result; receiver must have dropped");
        }
    }))?;
    scheduler.add_batch(batch.clone())?;
    scheduler.finalize()?;
    self.executor
        .execute(scheduler.take_task_iterator()?, scheduler.new_notifier()?)?;
    // Block until the executor reports a result, or time out.
    let batch_result = result_rx
        .recv_timeout(Duration::from_secs(EXECUTION_TIMEOUT))
        .map_err(|_| ScabbardStateError("failed to receive result in reasonable time".into()))?
        .ok_or_else(|| ScabbardStateError("no result returned from executor".into()))?;
    // Record the batch's status before deciding whether to abort below.
    let batch_status = batch_result.clone().into();
    let signature = batch.batch().header_signature();
    self.batch_history
        .update_batch_status(&signature, batch_status);
    // Any invalid transaction aborts the whole batch with an error.
    let txn_receipts = batch_result
        .receipts
        .into_iter()
        .map(|receipt| match receipt.transaction_result {
            TransactionResult::Valid { .. } => Ok(receipt),
            TransactionResult::Invalid { error_message, .. } => Err(ScabbardStateError(
                format!("transaction failed: {:?}", error_message),
            )),
        })
        .collect::<Result<Vec<_>, _>>()?;
    scheduler.shutdown();
    // Compute the would-be state root without committing the changes; the
    // receipts are kept pending until `commit` or `rollback`.
    let state_root = MerkleState::new(self.db.clone()).compute_state_id(
        &self.current_state_root,
        &receipts_into_transact_state_changes(&txn_receipts)?,
    )?;
    self.pending_changes = Some((signature.to_string(), txn_receipts));
    Ok(state_root)
}
/// Applies the pending changes from the last `prepare_change`: commits the
/// state changes to the merkle tree, persists the new state root, appends the
/// receipts to the receipt store, notifies event subscribers, and marks the
/// batch as committed in the history.
///
/// # Errors
///
/// Returns a `ScabbardStateError` if there are no pending changes or any of
/// the above steps fails.
pub fn commit(&mut self) -> Result<(), ScabbardStateError> {
    match self.pending_changes.take() {
        Some((signature, txn_receipts)) => {
            let state_changes = receipts_into_transact_state_changes(&txn_receipts)?;
            // Apply the changes and persist the resulting root so it survives
            // a restart.
            self.current_state_root = MerkleState::new(self.db.clone())
                .commit(&self.current_state_root, &state_changes)?;
            self.write_current_state_root()?;
            info!(
                "committed {} change(s) for new state root {}",
                state_changes.len(),
                self.current_state_root,
            );
            // Build subscriber events before the receipts are moved into the
            // receipt store below.
            let events = txn_receipts
                .iter()
                .cloned()
                .map(StateChangeEvent::try_from)
                .collect::<Result<Vec<_>, _>>()?;
            self.transaction_receipt_store
                .write()
                .map_err(|err| {
                    ScabbardStateError(format!(
                        "transaction receipt store lock poisoned: {}",
                        err
                    ))
                })?
                .append(txn_receipts)
                .map_err(|err| {
                    ScabbardStateError(format!(
                        "failed to add transaction receipts to store: {}",
                        err
                    ))
                })?;
            // Deliver events, dropping subscribers that ask to unsubscribe and
            // keeping (but logging) those that merely fail to handle an event.
            for event in events {
                self.event_subscribers.retain(|subscriber| {
                    match subscriber.handle_event(event.clone()) {
                        Ok(()) => true,
                        Err(StateSubscriberError::Unsubscribe) => false,
                        Err(err @ StateSubscriberError::UnableToHandleEvent(_)) => {
                            error!("{}", err);
                            true
                        }
                    }
                });
            }
            self.batch_history.commit(&signature);
            Ok(())
        }
        None => Err(ScabbardStateError("no pending changes to commit".into())),
    }
}
pub fn rollback(&mut self) -> Result<(), ScabbardStateError> {
match self.pending_changes.take() {
Some((_, txn_receipts)) => info!(
"discarded {} change(s)",
receipts_into_transact_state_changes(&txn_receipts)?.len()
),
None => debug!("no changes to rollback"),
}
Ok(())
}
/// Returns a mutable reference to the batch history, e.g. for registering
/// newly submitted batches.
pub fn batch_history(&mut self) -> &mut BatchHistory {
    &mut self.batch_history
}
/// Returns an iterator over state-change events, starting after `event_id`
/// (from the beginning when `None`).
pub fn get_events_since(&self, event_id: Option<String>) -> Result<Events, ScabbardStateError> {
    Events::new(self.transaction_receipt_store.clone(), event_id)
}
/// Registers a subscriber to be notified of state-change events on commit.
pub fn add_subscriber(&mut self, subscriber: Box<dyn StateSubscriber>) {
    self.event_subscribers.push(subscriber);
}
/// Removes all registered event subscribers.
pub fn clear_subscribers(&mut self) {
    self.event_subscribers.clear();
}
}
/// Flattens the transact state changes contained in a slice of transaction
/// receipts, failing on the first receipt that cannot be converted.
fn receipts_into_transact_state_changes(
    receipts: &[TransactionReceipt],
) -> Result<Vec<TransactStateChange>, ScabbardStateError> {
    let mut changes = Vec::new();
    for receipt in receipts {
        let receipt_changes = Vec::<TransactStateChange>::try_from(receipt.clone())
            .map_err(|err| ScabbardStateError(err.to_string()))?;
        changes.extend(receipt_changes);
    }
    Ok(changes)
}
/// An event describing the state changes produced by a single valid
/// transaction, identified by that transaction's ID.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct StateChangeEvent {
    /// The ID of the transaction that produced the changes.
    pub id: String,
    /// The state changes made by the transaction.
    pub state_changes: Vec<StateChange>,
}
#[cfg(feature = "events")]
impl ParseBytes<StateChangeEvent> for StateChangeEvent {
    /// Deserializes a JSON-encoded `StateChangeEvent`.
    fn from_bytes(bytes: &[u8]) -> Result<StateChangeEvent, ParseError> {
        // The variant constructor is used directly instead of the redundant
        // closure `|err| ParseError::MalformedMessage(err)` (clippy:
        // redundant_closure).
        serde_json::from_slice(bytes)
            .map_err(Box::new)
            .map_err(ParseError::MalformedMessage)
    }
}
impl TryFrom<TransactionReceipt> for StateChangeEvent {
    type Error = ScabbardStateError;

    /// Converts a valid transaction receipt into a state-change event; invalid
    /// receipts carry no state changes and cannot be converted.
    fn try_from(receipt: TransactionReceipt) -> Result<Self, Self::Error> {
        let TransactionReceipt {
            transaction_id,
            transaction_result,
        } = receipt;
        match transaction_result {
            TransactionResult::Valid { state_changes, .. } => Ok(StateChangeEvent {
                id: transaction_id,
                state_changes: state_changes.into_iter().map(StateChange::from).collect(),
            }),
            // Fixed typos in the error message ("cahnge" -> "change",
            // "transction" -> "transaction").
            TransactionResult::Invalid { .. } => Err(ScabbardStateError(format!(
                "cannot convert transaction receipt ({}) to state change event because transaction result is `Invalid`",
                transaction_id
            ))),
        }
    }
}
/// A single state change produced by a transaction, as exposed in events.
#[derive(Clone, Serialize, Deserialize)]
pub enum StateChange {
    /// The entry at `key` was set to `value`.
    Set { key: String, value: Vec<u8> },
    /// The entry at `key` was deleted.
    Delete { key: String },
}
impl fmt::Display for StateChange {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
StateChange::Set { key, value } => {
write!(f, "Set(key: {}, payload_size: {})", key, value.len())
}
StateChange::Delete { key } => write!(f, "Delete(key: {})", key),
}
}
}
impl fmt::Debug for StateChange {
    /// Debug output is identical to the `Display` representation.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        fmt::Display::fmt(self, f)
    }
}
impl From<transact::protocol::receipt::StateChange> for StateChange {
fn from(change: transact::protocol::receipt::StateChange) -> Self {
match change {
transact::protocol::receipt::StateChange::Set { key, value } => {
StateChange::Set { key, value }
}
transact::protocol::receipt::StateChange::Delete { key } => StateChange::Delete { key },
}
}
}
/// A receiver of state-change events emitted on each commit.
pub trait StateSubscriber: Send {
    /// Handles a single event. Returning
    /// `Err(StateSubscriberError::Unsubscribe)` causes the subscriber to be
    /// removed; other errors are logged and the subscriber is retained.
    fn handle_event(&self, event: StateChangeEvent) -> Result<(), StateSubscriberError>;
}
/// Tracks the paging position of an `Events` iterator within the receipt store.
#[derive(PartialEq)]
enum EventQuery {
    /// Fetch the next page, starting after the contained event ID (or from the
    /// beginning when `None`).
    Fetch(Option<String>),
    /// The underlying store has been fully consumed.
    Exhausted,
}
/// Iterator over `StateChangeEvent`s backed by the transaction receipt store,
/// fetching receipts in pages of `ITER_CACHE_SIZE`.
pub struct Events {
    // Shared handle to the store the events are read from.
    transaction_receipt_store: Arc<RwLock<TransactionReceiptStore>>,
    // Where the next page fetch should start.
    query: EventQuery,
    // Locally cached events not yet yielded.
    cache: VecDeque<StateChangeEvent>,
}
impl Events {
    /// Creates an events iterator starting after `start_id` (from the
    /// beginning when `None`), pre-loading the first page of events.
    fn new(
        transaction_receipt_store: Arc<RwLock<TransactionReceiptStore>>,
        start_id: Option<String>,
    ) -> Result<Self, ScabbardStateError> {
        let mut iter = Events {
            transaction_receipt_store,
            query: EventQuery::Fetch(start_id),
            cache: VecDeque::default(),
        };
        iter.reload_cache()?;
        Ok(iter)
    }

    /// Refills the cache with the next page of up to `ITER_CACHE_SIZE` events
    /// from the receipt store, advancing `query` past the last cached event
    /// (or to `Exhausted` when the page comes back empty). A no-op when the
    /// store is already exhausted.
    fn reload_cache(&mut self) -> Result<(), ScabbardStateError> {
        match self.query {
            EventQuery::Fetch(ref start_id) => {
                let transaction_receipt_store =
                    self.transaction_receipt_store.read().map_err(|err| {
                        ScabbardStateError(format!(
                            "transaction receipt store lock poisoned: {}",
                            err
                        ))
                    })?;
                // Start after the given ID when one is known, otherwise from
                // the beginning of the store.
                self.cache = if let Some(id) = start_id.as_ref() {
                    transaction_receipt_store.iter_since_id(id.clone())
                } else {
                    transaction_receipt_store.iter()
                }
                .map_err(|err| {
                    ScabbardStateError(format!(
                        "failed to get transaction receipts from store: {}",
                        err
                    ))
                })?
                .take(ITER_CACHE_SIZE)
                .map(StateChangeEvent::try_from)
                .collect::<Result<VecDeque<_>, _>>()?;
                // Remember where the next page starts; an empty page means the
                // store is exhausted.
                self.query = self
                    .cache
                    .back()
                    .map(|event| EventQuery::Fetch(Some(event.id.clone())))
                    .unwrap_or(EventQuery::Exhausted);
                Ok(())
            }
            EventQuery::Exhausted => Ok(()),
        }
    }
}
impl Iterator for Events {
    type Item = StateChangeEvent;

    /// Pops the next cached event, reloading the cache from the store first if
    /// it has run dry and more events may remain.
    fn next(&mut self) -> Option<Self::Item> {
        let needs_reload = self.cache.is_empty() && self.query != EventQuery::Exhausted;
        if needs_reload {
            if let Err(err) = self.reload_cache() {
                error!("Unable to reload iterator cache: {}", err);
            }
        }
        self.cache.pop_front()
    }
}
/// The lifecycle status of a submitted batch, serialized for status responses.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(tag = "statusType", content = "message")]
pub enum BatchStatus {
    /// The batch is not known to this history.
    Unknown,
    /// The batch has been submitted but not yet executed.
    Pending,
    /// At least one transaction in the batch failed.
    Invalid(Vec<InvalidTransaction>),
    /// All transactions executed successfully but are not yet committed.
    Valid(Vec<ValidTransaction>),
    /// The batch's results have been committed to state.
    Committed(Vec<ValidTransaction>),
}
impl From<BatchExecutionResult> for BatchStatus {
    /// Derives a batch status from the execution receipts: `Invalid` if any
    /// transaction failed, otherwise `Valid`.
    fn from(batch_result: BatchExecutionResult) -> Self {
        let mut valid = Vec::new();
        let mut invalid = Vec::new();
        for receipt in batch_result.receipts {
            match receipt.transaction_result {
                TransactionResult::Valid { .. } => {
                    valid.push(ValidTransaction::new(receipt.transaction_id))
                }
                TransactionResult::Invalid {
                    error_message,
                    error_data,
                } => invalid.push(InvalidTransaction::new(
                    receipt.transaction_id,
                    error_message,
                    error_data,
                )),
            }
        }
        if invalid.is_empty() {
            BatchStatus::Valid(valid)
        } else {
            BatchStatus::Invalid(invalid)
        }
    }
}
/// Identifies a transaction that executed successfully.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct ValidTransaction {
    // The transaction's ID.
    transaction_id: String,
}
impl ValidTransaction {
    /// Creates a record for a successfully executed transaction.
    fn new(transaction_id: String) -> Self {
        Self { transaction_id }
    }
}
/// Identifies a transaction that failed, along with the failure details
/// reported in its receipt.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct InvalidTransaction {
    // The transaction's ID.
    transaction_id: String,
    // Human-readable description of the failure.
    error_message: String,
    // Opaque error payload from the receipt.
    error_data: Vec<u8>,
}
impl InvalidTransaction {
    /// Creates a record for a failed transaction.
    fn new(transaction_id: String, error_message: String, error_data: Vec<u8>) -> Self {
        Self {
            transaction_id,
            error_message,
            error_data,
        }
    }
}
/// Status information for a single batch, tracked by `BatchHistory`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BatchInfo {
    /// The batch's signature (its identifier in the history).
    pub id: String,
    /// The batch's current status.
    pub status: BatchStatus,
    /// When this entry was created; not serialized, used only for evicting the
    /// oldest history entries.
    #[serde(skip, default = "SystemTime::now")]
    pub timestamp: SystemTime,
}
impl BatchInfo {
    /// Replaces the batch's status.
    fn set_status(&mut self, status: BatchStatus) {
        self.status = status;
    }
}
/// Tracks the status of recently submitted batches and notifies subscribers
/// when batches they are waiting on complete.
pub struct BatchHistory {
    // Batch signature -> latest known info; bounded by `limit`.
    history: HashMap<String, BatchInfo>,
    // Maximum number of entries retained in `history`.
    limit: usize,
    // Subscribers waiting on sets of batch IDs, with channels to notify them.
    batch_subscribers: Vec<(HashSet<String>, Sender<BatchInfo>)>,
}
impl BatchHistory {
/// Creates an empty batch history with the default size limit.
pub fn new() -> Self {
    Self::default()
}
/// Records a newly submitted batch with `Pending` status.
pub fn add_batch(&mut self, signature: &str) {
    self.upsert_batch(signature.into(), BatchStatus::Pending);
}
/// Updates (or inserts) a batch's status; if the batch has finished execution
/// (`Valid` or `Invalid`), any subscribers waiting on it are notified.
fn update_batch_status(&mut self, signature: &str, status: BatchStatus) {
    let batch_info = self.upsert_batch(signature.into(), status);
    if matches!(
        batch_info.status,
        BatchStatus::Invalid(_) | BatchStatus::Valid(_)
    ) {
        self.send_completed_batch_info_to_subscribers(batch_info);
    }
}
/// Transitions a batch from `Valid` to `Committed`. Logs (but otherwise
/// ignores) commits for unknown or non-valid batches.
fn commit(&mut self, signature: &str) {
    let info = match self.history.get_mut(signature) {
        Some(info) => info,
        None => {
            debug!(
                "Received commit for batch that is not in the history: {:?}",
                signature
            );
            return;
        }
    };
    if let BatchStatus::Valid(txns) = info.status.clone() {
        info.set_status(BatchStatus::Committed(txns));
    } else {
        error!(
            "Received commit for batch that was not valid: {:?}",
            signature
        );
    }
}
/// Inserts a new batch entry or updates the status of an existing one,
/// returning a copy of the resulting info. When the history exceeds its
/// limit, the entry with the oldest timestamp is evicted.
fn upsert_batch(&mut self, signature: String, status: BatchStatus) -> BatchInfo {
    match self.history.get_mut(&signature) {
        Some(info) => {
            info.set_status(status);
            info.clone()
        }
        None => {
            let batch_info = BatchInfo {
                id: signature.clone(),
                status,
                timestamp: SystemTime::now(),
            };
            self.history.insert(signature, batch_info.clone());
            if self.history.len() > self.limit {
                // Find the oldest entry by iterating over references; the
                // previous implementation cloned the ENTIRE map just to find
                // the minimum key, which was an O(n) allocation per insert.
                if let Some(oldest_key) = self
                    .history
                    .iter()
                    .min_by_key(|(_, v)| v.timestamp)
                    .map(|(k, _)| k.clone())
                {
                    self.history.remove(&oldest_key);
                }
            }
            batch_info
        }
    }
}
/// Returns an iterator of `BatchInfo` for the given IDs. With `wait`, the
/// iterator blocks (up to the timeout) until the batches complete; without it,
/// current statuses are returned immediately (`Unknown` for unseen IDs).
pub fn get_batch_info(
    &mut self,
    ids: HashSet<String>,
    wait: Option<Duration>,
) -> Result<BatchInfoIter, ScabbardStateError> {
    if let Some(timeout) = wait {
        self.completed_batch_info_iter(ids, timeout)
    } else {
        Ok(self.no_wait_batch_info_iter(&ids))
    }
}
fn no_wait_batch_info_iter(&self, ids: &HashSet<String>) -> BatchInfoIter {
Box::new(
ids.iter()
.map(|id| {
Ok(if let Some(info) = self.history.get(id) {
info.clone()
} else {
BatchInfo {
id: id.to_string(),
status: BatchStatus::Unknown,
timestamp: SystemTime::now(),
}
})
})
.collect::<Vec<_>>()
.into_iter(),
)
}
/// Returns an iterator that first yields any already-completed (`Invalid` or
/// `Committed`) batches from `ids`, then waits up to `timeout` for the
/// remaining ones to complete via a subscription channel.
fn completed_batch_info_iter(
    &mut self,
    mut ids: HashSet<String>,
    timeout: Duration,
) -> Result<BatchInfoIter, ScabbardStateError> {
    // Collect the batches that are already complete; each one found is
    // removed from `ids`, so only the still-pending IDs are subscribed below.
    let iter = self
        .no_wait_batch_info_iter(&ids)
        .filter_map(|res| {
            let info = res.ok()?;
            match info.status {
                BatchStatus::Invalid(_) | BatchStatus::Committed(_) => {
                    ids.remove(&info.id);
                    Some(Ok(info))
                }
                _ => None,
            }
        })
        .collect::<Vec<_>>()
        .into_iter();
    // Subscribe for completion notifications of the remaining batches and
    // chain a channel-backed iterator that waits for them.
    let (sender, receiver) = channel();
    self.batch_subscribers.push((ids.clone(), sender));
    Ok(Box::new(
        iter.chain(ChannelBatchInfoIter::new(receiver, timeout, ids)?),
    ))
}
/// Sends `info` to every subscriber waiting on that batch ID, then prunes
/// subscribers whose receivers hung up or that have nothing left to wait for.
fn send_completed_batch_info_to_subscribers(&mut self, info: BatchInfo) {
    let subscribers = std::mem::take(&mut self.batch_subscribers);
    self.batch_subscribers = subscribers
        .into_iter()
        .filter_map(|(mut pending_signatures, sender)| {
            // Drop the subscriber if it was waiting on this batch but its
            // receiver has been dropped.
            if pending_signatures.remove(&info.id) && sender.send(info.clone()).is_err() {
                return None;
            }
            // Drop subscribers with no remaining pending batches.
            if pending_signatures.is_empty() {
                None
            } else {
                Some((pending_signatures, sender))
            }
        })
        .collect();
}
}
impl Default for BatchHistory {
    /// Creates an empty history limited to `DEFAULT_BATCH_HISTORY_SIZE`
    /// entries, with no subscribers.
    fn default() -> Self {
        Self {
            history: HashMap::new(),
            limit: DEFAULT_BATCH_HISTORY_SIZE,
            batch_subscribers: vec![],
        }
    }
}
/// Boxed iterator of batch info results, as returned by
/// `BatchHistory::get_batch_info`; errors are rendered as `String`s.
pub type BatchInfoIter = Box<dyn Iterator<Item = Result<BatchInfo, String>>>;
/// Iterator that receives completed `BatchInfo`s over a channel until all
/// pending IDs have been seen or a deadline passes.
pub struct ChannelBatchInfoIter {
    // Channel on which completed batch infos arrive.
    receiver: Receiver<BatchInfo>,
    // How long each `recv_timeout` poll waits before re-checking the deadline.
    retry_interval: Duration,
    // Absolute deadline after which the iterator yields an error.
    timeout: Instant,
    // IDs of batches that have not yet been received.
    pending_ids: HashSet<String>,
}
impl ChannelBatchInfoIter {
    /// Creates an iterator that waits up to `timeout` for each of
    /// `pending_ids` to arrive on `receiver`.
    ///
    /// # Errors
    ///
    /// Returns a `ScabbardStateError` if the deadline cannot be represented
    /// (i.e. `Instant::now() + timeout` overflows).
    fn new(
        receiver: Receiver<BatchInfo>,
        timeout: Duration,
        pending_ids: HashSet<String>,
    ) -> Result<Self, ScabbardStateError> {
        let deadline = Instant::now()
            .checked_add(timeout)
            .ok_or_else(|| ScabbardStateError("failed to schedule timeout".into()))?;
        Ok(Self {
            receiver,
            retry_interval: Duration::from_millis(COMPLETED_BATCH_INFO_ITER_RETRY_MILLIS),
            timeout: deadline,
            pending_ids,
        })
    }
}
impl Iterator for ChannelBatchInfoIter {
    type Item = Result<BatchInfo, String>;

    /// Yields completed `BatchInfo`s as they arrive, ending when all pending
    /// IDs have been seen or the sender disconnects; if the deadline passes
    /// first, a final `Err` naming the outstanding batches is yielded.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            // All expected batches have been received.
            if self.pending_ids.is_empty() {
                return None;
            }
            // Deadline reached; report which batches are still outstanding.
            // (Fixed grammar in the message: "incompleted" -> "incomplete".)
            if Instant::now() >= self.timeout {
                return Some(Err(format!(
                    "timeout expired while waiting for incomplete batches: {:?}",
                    self.pending_ids
                )));
            }
            match self.receiver.recv_timeout(self.retry_interval) {
                Ok(batch_info) => {
                    self.pending_ids.remove(&batch_info.id);
                    return Some(Ok(batch_info));
                }
                // No message yet; loop to re-check the deadline.
                Err(RecvTimeoutError::Timeout) => {}
                // Sender dropped; no more results will ever arrive.
                Err(RecvTimeoutError::Disconnected) => return None,
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // 1 GiB map size for the temporary LMDB files used by these tests.
    const TEMP_DB_SIZE: usize = 1 << 30;

    /// Verifies that an `Events` iterator over an empty receipt store yields
    /// no events.
    #[test]
    fn empty_event_iterator() {
        let temp_db_path = get_temp_db_path();
        // Run the body under `catch_unwind` so the temp DB file is removed
        // even when an assertion fails.
        let test_result = std::panic::catch_unwind(|| {
            let transaction_receipt_store =
                Arc::new(RwLock::new(TransactionReceiptStore::new(Box::new(
                    LmdbOrderedStore::new(&temp_db_path, Some(TEMP_DB_SIZE))
                        .expect("Failed to create LMDB store"),
                ))));
            // Iterate all events from an empty store.
            let all_events = Events::new(transaction_receipt_store.clone(), None)
                .expect("failed to get iterator for all events");
            let all_event_ids = all_events.map(|event| event.id.clone()).collect::<Vec<_>>();
            assert!(
                all_event_ids.is_empty(),
                "All events should have been empty"
            );
        });
        std::fs::remove_file(temp_db_path.as_path()).expect("Failed to remove temp DB file");
        assert!(test_result.is_ok());
    }

    /// Verifies that an `Events` iterator yields all events in receipt order,
    /// and that starting from a given event ID skips it and everything before.
    #[test]
    fn event_iterator() {
        let temp_db_path = get_temp_db_path();
        // Run the body under `catch_unwind` so the temp DB file is removed
        // even when an assertion fails.
        let test_result = std::panic::catch_unwind(|| {
            let receipts = vec![
                mock_transaction_receipt("ab"),
                mock_transaction_receipt("cd"),
                mock_transaction_receipt("ef"),
            ];
            let receipt_ids = receipts
                .iter()
                .map(|receipt| receipt.transaction_id.clone())
                .collect::<Vec<_>>();
            let transaction_receipt_store =
                Arc::new(RwLock::new(TransactionReceiptStore::new(Box::new(
                    LmdbOrderedStore::new(&temp_db_path, Some(TEMP_DB_SIZE))
                        .expect("Failed to create LMDB store"),
                ))));
            transaction_receipt_store
                .write()
                .expect("failed to get write lock")
                .append(receipts.clone())
                .expect("failed to add receipts to store");
            // All events, in insertion order.
            let all_events = Events::new(transaction_receipt_store.clone(), None)
                .expect("failed to get iterator for all events");
            let all_event_ids = all_events.map(|event| event.id.clone()).collect::<Vec<_>>();
            assert_eq!(all_event_ids, receipt_ids);
            // Events strictly after the first receipt's ID.
            let some_events = Events::new(
                transaction_receipt_store.clone(),
                Some(receipt_ids[0].clone()),
            )
            .expect("failed to get iterator for some events");
            let some_event_ids = some_events
                .map(|event| event.id.clone())
                .collect::<Vec<_>>();
            assert_eq!(some_event_ids, receipt_ids[1..].to_vec());
        });
        std::fs::remove_file(temp_db_path.as_path()).expect("Failed to remove temp DB file");
        assert!(test_result.is_ok());
    }

    /// Builds a per-thread temp-file path for an LMDB store, so parallel tests
    /// do not collide.
    fn get_temp_db_path() -> std::path::PathBuf {
        let mut temp_db_path = std::env::temp_dir();
        let thread_id = std::thread::current().id();
        temp_db_path.push(format!("store-{:?}.lmdb", thread_id));
        temp_db_path
    }

    /// Creates a minimal valid receipt with the given transaction ID and no
    /// state changes, events, or data.
    fn mock_transaction_receipt(id: &str) -> TransactionReceipt {
        TransactionReceipt {
            transaction_id: id.into(),
            transaction_result: TransactionResult::Valid {
                state_changes: vec![],
                events: vec![],
                data: vec![],
            },
        }
    }
}