mod chain_guards;
#[cfg(feature = "aws")]
mod dynamo_db;
mod memory;
#[cfg(feature = "rocksdb")]
mod rocks_db;
#[cfg(feature = "scylladb")]
mod scylla_db;
#[cfg(feature = "aws")]
pub use crate::dynamo_db::DynamoDbStoreClient;
pub use crate::memory::MemoryStoreClient;
#[cfg(feature = "rocksdb")]
pub use crate::rocks_db::RocksDbStoreClient;
#[cfg(feature = "scylladb")]
pub use crate::scylla_db::ScyllaDbStoreClient;
use crate::chain_guards::ChainGuards;
use async_trait::async_trait;
use chain_guards::ChainGuard;
use dashmap::{mapref::entry::Entry, DashMap};
use futures::future;
use linera_base::{
crypto::{CryptoHash, PublicKey},
data_types::{Amount, BlockHeight, Timestamp},
identifiers::{ChainDescription, ChainId},
};
use linera_chain::{
data_types::{Certificate, CertificateValue, HashedValue, LiteCertificate},
ChainError, ChainStateView,
};
use linera_execution::{
committee::{Committee, Epoch},
ChainOwnership, ExecutionError, ExecutionRuntimeContext, UserApplicationCode,
UserApplicationDescription, UserApplicationId, WasmRuntime,
};
use linera_views::{
batch::Batch,
common::{Context, ContextFromDb, KeyValueStoreClient},
value_splitting::DatabaseConsistencyError,
views::{CryptoHashView, RootView, View, ViewError},
};
use metrics::increment_counter;
use serde::{Deserialize, Serialize};
use std::{fmt::Debug, sync::Arc};
/// Name of the metrics counter incremented by `read_value` on each value lookup
/// (labelled with the chain ID, or `"not found"` when no value is stored).
pub const READ_VALUE_COUNTER: &str = "read_value";
/// Name of the metrics counter incremented each time a value is added to a write batch.
pub const WRITE_VALUE_COUNTER: &str = "write_value";
/// Name of the metrics counter incremented by `read_certificate` when the value lookup
/// itself did not fail (labelled with the chain ID, or `"not found"`).
pub const READ_CERTIFICATE_COUNTER: &str = "read_certificate";
/// Name of the metrics counter incremented each time a certificate is added to a write batch.
pub const WRITE_CERTIFICATE_COUNTER: &str = "write_certificate";
#[cfg(any(feature = "wasmer", feature = "wasmtime"))]
use linera_execution::{Operation, SystemOperation, WasmApplication};
/// Asynchronous access to the storage of chain states, values and certificates.
///
/// Implementors provide the primitive read/write operations; the trait supplies
/// default implementations for loading active chains, reading certificates in bulk,
/// creating chains and loading user applications.
#[async_trait]
pub trait Store: Sized {
    /// The view context used by the chain states returned from this store. Its extra
    /// data is a [`ChainRuntimeContext`] over this store.
    type Context: Context<Extra = ChainRuntimeContext<Self>, Error = Self::ContextError>
        + Clone
        + Send
        + Sync
        + 'static;

    /// The error type reported by [`Store::Context`].
    type ContextError: std::error::Error + Debug + Sync + Send;

    /// Returns the current time according to this store.
    fn current_time(&self) -> Timestamp;

    /// Loads the state view of the chain with the given `id`.
    async fn load_chain(&self, id: ChainId) -> Result<ChainStateView<Self::Context>, ViewError>;

    /// Reads the value stored under the given `hash`.
    async fn read_value(&self, hash: CryptoHash) -> Result<HashedValue, ViewError>;

    /// Reads a sequence of values starting with the one identified by `from`,
    /// returning at most `limit` of them.
    async fn read_values_downward(
        &self,
        from: CryptoHash,
        limit: u32,
    ) -> Result<Vec<HashedValue>, ViewError>;

    /// Writes a single value.
    async fn write_value(&self, value: &HashedValue) -> Result<(), ViewError>;

    /// Writes several values.
    async fn write_values(&self, values: &[HashedValue]) -> Result<(), ViewError>;

    /// Reads the certificate stored under the given `hash`.
    async fn read_certificate(&self, hash: CryptoHash) -> Result<Certificate, ViewError>;

    /// Writes a single certificate.
    async fn write_certificate(&self, certificate: &Certificate) -> Result<(), ViewError>;

    /// Writes several certificates.
    async fn write_certificates(&self, certificate: &[Certificate]) -> Result<(), ViewError>;

    /// Loads the chain with the given `id` and fails unless `ensure_is_active` succeeds
    /// on the loaded view.
    async fn load_active_chain(
        &self,
        id: ChainId,
    ) -> Result<ChainStateView<Self::Context>, linera_chain::ChainError>
    where
        ChainRuntimeContext<Self>: ExecutionRuntimeContext,
        ViewError: From<Self::ContextError>,
    {
        let chain_view = self.load_chain(id).await?;
        chain_view.ensure_is_active()?;
        Ok(chain_view)
    }

    /// Reads the certificates for all `keys`, spawning one Tokio task per key so the
    /// reads can proceed concurrently. Results come back in the order of `keys`; the
    /// first failed read (in that order) is returned as the error.
    ///
    /// # Panics
    ///
    /// Panics if one of the spawned tasks cannot be joined.
    async fn read_certificates<I: IntoIterator<Item = CryptoHash> + Send>(
        &self,
        keys: I,
    ) -> Result<Vec<Certificate>, ViewError>
    where
        Self: Clone + Send + 'static,
    {
        // Every task is spawned before any of them is awaited.
        let handles: Vec<_> = keys
            .into_iter()
            .map(|key| {
                let client = self.clone();
                tokio::task::spawn(async move { client.read_certificate(key).await })
            })
            .collect();
        future::join_all(handles)
            .await
            .into_iter()
            .map(|joined| joined.expect("storage access should not cancel or crash"))
            .collect::<Result<Vec<Certificate>, ViewError>>()
    }

    /// Initializes the chain identified by `description` with a single owner, the given
    /// committee at epoch zero, an initial balance and a timestamp, then saves it.
    ///
    /// # Panics
    ///
    /// Panics if the chain is already active.
    async fn create_chain(
        &self,
        committee: Committee,
        admin_id: ChainId,
        description: ChainDescription,
        public_key: PublicKey,
        balance: Amount,
        timestamp: Timestamp,
    ) -> Result<(), ChainError>
    where
        ChainRuntimeContext<Self>: ExecutionRuntimeContext,
        ViewError: From<Self::ContextError>,
    {
        let chain_id: ChainId = description.into();
        let mut chain = self.load_chain(chain_id).await?;
        assert!(!chain.is_active(), "Attempting to create a chain twice");
        {
            // Populate the system part of the execution state.
            let system = &mut chain.execution_state.system;
            system.description.set(Some(description));
            system.epoch.set(Some(Epoch::ZERO));
            system.admin_id.set(Some(admin_id));
            system.committees.get_mut().insert(Epoch::ZERO, committee);
            system.ownership.set(ChainOwnership::single(public_key));
            system.balance.set(balance);
            system.timestamp.set(timestamp);
        }
        // Record the hash of the freshly initialized execution state.
        let execution_hash = chain.execution_state.crypto_hash().await?;
        chain.execution_state_hash.set(Some(execution_hash));
        chain.manager.get_mut().reset(
            &ChainOwnership::single(public_key),
            BlockHeight(0),
            self.current_time(),
        )?;
        chain.save().await?;
        Ok(())
    }

    /// Returns the Wasm runtime configured for this store, if any.
    fn wasm_runtime(&self) -> Option<WasmRuntime>;

    /// Builds the application code described by `application_description` from the
    /// `PublishBytecode` operation found at the recorded bytecode location.
    ///
    /// # Errors
    ///
    /// Returns `ApplicationBytecodeNotFound` if the referenced value is not in storage,
    /// and `InvalidBytecodeId` if the value is not a confirmed block or the operation at
    /// the recorded index does not publish bytecode.
    ///
    /// # Panics
    ///
    /// Panics if [`Store::wasm_runtime`] returns `None`.
    #[cfg(any(feature = "wasmer", feature = "wasmtime"))]
    async fn load_application(
        &self,
        application_description: &UserApplicationDescription,
    ) -> Result<UserApplicationCode, ExecutionError> {
        let wasm_runtime = match self.wasm_runtime() {
            Some(runtime) => runtime,
            None => panic!("A Wasm runtime is required to load user applications."),
        };
        let bytecode_id = &application_description.bytecode_id;
        let bytecode_location = &application_description.bytecode_location;

        let hashed_value = match self.read_value(bytecode_location.certificate_hash).await {
            Ok(hashed_value) => hashed_value,
            Err(ViewError::NotFound(_)) => {
                return Err(ExecutionError::ApplicationBytecodeNotFound(Box::new(
                    application_description.clone(),
                )));
            }
            Err(other) => return Err(ExecutionError::from(other)),
        };
        let operations = match hashed_value.into_inner() {
            CertificateValue::ConfirmedBlock { executed_block, .. } => {
                executed_block.block.operations
            }
            _ => return Err(ExecutionError::InvalidBytecodeId(*bytecode_id)),
        };
        let operation_index = usize::try_from(bytecode_location.operation_index)
            .map_err(|_| linera_base::data_types::ArithmeticError::Overflow)?;

        let Some(Operation::System(SystemOperation::PublishBytecode { contract, service })) =
            operations.get(operation_index)
        else {
            return Err(ExecutionError::InvalidBytecodeId(*bytecode_id));
        };
        let application =
            WasmApplication::new(contract.clone(), service.clone(), wasm_runtime).await?;
        Ok(Arc::new(application))
    }

    /// Fallback used when no Wasm runtime feature is compiled in.
    ///
    /// # Panics
    ///
    /// Always panics, because user applications cannot be loaded without a Wasm runtime.
    #[cfg(not(any(feature = "wasmer", feature = "wasmtime")))]
    async fn load_application(
        &self,
        _application_description: &UserApplicationDescription,
    ) -> Result<UserApplicationCode, ExecutionError> {
        panic!(
            "A Wasm runtime is required to load user applications. \
            Please enable the `wasmer` or the `wasmtime` feature flags \
            when compiling `linera-storage`."
        );
    }
}
/// State held behind an `Arc` by [`DbStoreClient`], so that clones of the client
/// refer to the same instance.
pub struct DbStore<Client> {
/// The underlying client; `DbStoreClient` uses it for `read_key` and `write_batch`.
client: Client,
/// Per-chain guards; `load_chain` obtains one before building the chain context.
guards: ChainGuards,
/// Map of user application code, handed to every `ChainRuntimeContext`.
user_applications: Arc<DashMap<UserApplicationId, UserApplicationCode>>,
/// The Wasm runtime reported by `Store::wasm_runtime`, if one is configured.
wasm_runtime: Option<WasmRuntime>,
}
/// A cloneable handle to a [`DbStore`], paired with the clock used to report the
/// current time.
#[derive(Clone)]
pub struct DbStoreClient<Client, Clock> {
/// The store state behind an `Arc`; cloning the handle clones only the `Arc`.
client: Arc<DbStore<Client>>,
/// The clock consulted by `current_time`.
pub clock: Clock,
}
/// The kinds of keys under which data is stored. Keys are obtained by serializing a
/// variant with `bcs::to_bytes`.
#[derive(Debug, Serialize, Deserialize)]
enum BaseKey {
/// Base key of the state of the given chain.
ChainState(ChainId),
/// Key of the lite certificate with the given hash.
Certificate(CryptoHash),
/// Key of the certificate value with the given hash.
Value(CryptoHash),
}
/// A source of the current time, used by [`DbStoreClient`] to implement `current_time`.
pub trait Clock {
/// Returns the current time according to this clock.
fn current_time(&self) -> Timestamp;
}
/// A [`Clock`] whose time is taken from `Timestamp::now()`.
#[derive(Clone)]
pub struct WallClock;
impl Clock for WallClock {
/// Returns `Timestamp::now()`.
fn current_time(&self) -> Timestamp {
Timestamp::now()
}
}
/// A [`Clock`] for tests whose time is set explicitly. The value lives in a shared
/// atomic counter of microseconds, so every clone of a `TestClock` sees the same time.
#[cfg(any(test, feature = "test"))]
#[derive(Clone, Default)]
pub struct TestClock(Arc<std::sync::atomic::AtomicU64>);

#[cfg(any(test, feature = "test"))]
impl Clock for TestClock {
    /// Returns the timestamp built from the currently stored number of microseconds.
    fn current_time(&self) -> Timestamp {
        let micros = self.0.load(std::sync::atomic::Ordering::SeqCst);
        Timestamp::from(micros)
    }
}

#[cfg(any(test, feature = "test"))]
impl TestClock {
    /// Creates a clock whose stored value is zero.
    pub fn new() -> Self {
        Self(Arc::new(std::sync::atomic::AtomicU64::new(0)))
    }

    /// Overwrites the stored time with `timestamp`.
    pub fn set(&self, timestamp: Timestamp) {
        let micros = timestamp.micros();
        self.0.store(micros, std::sync::atomic::Ordering::SeqCst);
    }

    /// Moves the stored time forward by `micros` microseconds.
    pub fn add_micros(&self, micros: u64) {
        let _previous = self
            .0
            .fetch_add(micros, std::sync::atomic::Ordering::SeqCst);
    }
}
/// [`Store`] implementation backed by a key-value store client.
#[async_trait]
impl<Client, C> Store for DbStoreClient<Client, C>
where
    Client: KeyValueStoreClient + Clone + Send + Sync + 'static,
    C: Clock + Clone + Send + Sync + 'static,
    ViewError: From<<Client as KeyValueStoreClient>::Error>,
    <Client as KeyValueStoreClient>::Error:
        From<bcs::Error> + From<DatabaseConsistencyError> + Send + Sync + serde::ser::StdError,
{
    type Context = ContextFromDb<ChainRuntimeContext<Self>, Client>;
    type ContextError = <Client as KeyValueStoreClient>::Error;

    /// Delegates to the configured clock.
    fn current_time(&self) -> Timestamp {
        self.clock.current_time()
    }

    /// Obtains the guard for `chain_id`, then loads the chain state view from the
    /// context rooted at the chain's base key. The guard is stored in the runtime
    /// context attached to the returned view.
    async fn load_chain(
        &self,
        chain_id: ChainId,
    ) -> Result<ChainStateView<Self::Context>, ViewError> {
        tracing::trace!("Acquiring lock on {:?}", chain_id);
        let chain_guard = self.client.guards.guard(chain_id).await;
        let extra = ChainRuntimeContext {
            store: self.clone(),
            chain_id,
            user_applications: Arc::clone(&self.client.user_applications),
            chain_guard: Some(Arc::new(chain_guard)),
        };
        let db_client = self.client.client.clone();
        let prefix = bcs::to_bytes(&BaseKey::ChainState(chain_id))?;
        let context = ContextFromDb::create(db_client, prefix, extra).await?;
        ChainStateView::load(context).await
    }

    /// Reads the value stored under `hash` and bumps the read-value counter, labelled
    /// with the value's chain ID or `"not found"`.
    async fn read_value(&self, hash: CryptoHash) -> Result<HashedValue, ViewError> {
        let key = bcs::to_bytes(&BaseKey::Value(hash))?;
        let stored: Option<CertificateValue> = self.client.client.read_key(&key).await?;
        let chain_label = stored.as_ref().map_or_else(
            || "not found".to_string(),
            |value| value.chain_id().to_string(),
        );
        increment_counter!(READ_VALUE_COUNTER, &[("chain_id", chain_label)]);
        match stored {
            Some(value) => Ok(value.with_hash_unchecked(hash)),
            None => Err(ViewError::not_found("value for hash", hash)),
        }
    }

    /// Collects up to `limit` values, starting at `from` and following each executed
    /// block's `previous_block_hash`. Stops early when there is no previous hash or
    /// when a value without an executed block is reached (that value is not included).
    async fn read_values_downward(
        &self,
        from: CryptoHash,
        limit: u32,
    ) -> Result<Vec<HashedValue>, ViewError> {
        let mut collected = Vec::new();
        let mut cursor = Some(from);
        for _ in 0..limit {
            let Some(current) = cursor else {
                break;
            };
            let value = self.read_value(current).await?;
            cursor = match value.inner().executed_block() {
                Some(executed_block) => executed_block.block.previous_block_hash,
                None => break,
            };
            collected.push(value);
        }
        Ok(collected)
    }

    /// Writes a single value in its own batch.
    async fn write_value(&self, value: &HashedValue) -> Result<(), ViewError> {
        let mut pending = Batch::new();
        self.add_value_to_batch(value, &mut pending)?;
        self.write_batch(pending).await
    }

    /// Writes all `values` in one batch.
    async fn write_values(&self, values: &[HashedValue]) -> Result<(), ViewError> {
        let mut pending = Batch::new();
        values
            .iter()
            .try_for_each(|value| self.add_value_to_batch(value, &mut pending))?;
        self.write_batch(pending).await
    }

    /// Reads the lite certificate and its value concurrently and combines them.
    ///
    /// Errors concerning the value take precedence over errors concerning the lite
    /// certificate. `ViewError::InconsistentEntries` is returned when the two entries
    /// cannot be combined.
    async fn read_certificate(&self, hash: CryptoHash) -> Result<Certificate, ViewError> {
        let cert_key = bcs::to_bytes(&BaseKey::Certificate(hash))?;
        let value_key = bcs::to_bytes(&BaseKey::Value(hash))?;
        let (cert_result, value_result) = tokio::join!(
            self.client.client.read_key::<LiteCertificate>(&cert_key),
            self.client.client.read_key::<CertificateValue>(&value_key)
        );
        // The counter is only bumped when the value lookup itself did not fail.
        if let Ok(maybe_value) = &value_result {
            let chain_label = maybe_value.as_ref().map_or_else(
                || "not found".to_string(),
                |value| value.chain_id().to_string(),
            );
            increment_counter!(READ_CERTIFICATE_COUNTER, &[("chain_id", chain_label)]);
        }
        let value: CertificateValue = match value_result? {
            Some(value) => value,
            None => return Err(ViewError::not_found("value for hash", hash)),
        };
        let lite_certificate: LiteCertificate = match cert_result? {
            Some(lite_certificate) => lite_certificate,
            None => return Err(ViewError::not_found("certificate for hash", hash)),
        };
        match lite_certificate.with_value(value.with_hash_unchecked(hash)) {
            Some(certificate) => Ok(certificate),
            None => Err(ViewError::InconsistentEntries),
        }
    }

    /// Writes a single certificate (lite certificate and value) in its own batch.
    async fn write_certificate(&self, certificate: &Certificate) -> Result<(), ViewError> {
        let mut pending = Batch::new();
        self.add_certificate_to_batch(certificate, &mut pending)?;
        self.write_batch(pending).await
    }

    /// Writes all `certificates` in one batch.
    async fn write_certificates(&self, certificates: &[Certificate]) -> Result<(), ViewError> {
        let mut pending = Batch::new();
        certificates
            .iter()
            .try_for_each(|certificate| self.add_certificate_to_batch(certificate, &mut pending))?;
        self.write_batch(pending).await
    }

    /// Returns the Wasm runtime configured on the shared store state.
    fn wasm_runtime(&self) -> Option<WasmRuntime> {
        self.client.wasm_runtime
    }
}
/// Batch-building helpers shared by the `Store` write methods.
impl<Client, C> DbStoreClient<Client, C>
where
    Client: KeyValueStoreClient + Clone + Send + Sync + 'static,
    C: Clock,
    ViewError: From<<Client as KeyValueStoreClient>::Error>,
    <Client as KeyValueStoreClient>::Error: From<bcs::Error> + Send + Sync + serde::ser::StdError,
{
    /// Queues `value` in `batch` under its `BaseKey::Value` key and bumps the
    /// write-value counter for the value's chain.
    ///
    /// Nothing is persisted until the batch is passed to `write_batch`.
    fn add_value_to_batch(&self, value: &HashedValue, batch: &mut Batch) -> Result<(), ViewError> {
        let id = value.inner().chain_id().to_string();
        increment_counter!(WRITE_VALUE_COUNTER, &[("chain_id", id)]);
        let value_key = bcs::to_bytes(&BaseKey::Value(value.hash()))?;
        // `bcs::to_bytes` already yields an owned `Vec<u8>`; move it instead of copying.
        batch.put_key_value(value_key, value)?;
        Ok(())
    }

    /// Queues both parts of `certificate` in `batch` — the lite certificate under its
    /// `BaseKey::Certificate` key and the value under its `BaseKey::Value` key — and
    /// bumps the write-certificate counter for the certificate's chain.
    ///
    /// Nothing is persisted until the batch is passed to `write_batch`.
    fn add_certificate_to_batch(
        &self,
        certificate: &Certificate,
        batch: &mut Batch,
    ) -> Result<(), ViewError> {
        let id = certificate.value().chain_id().to_string();
        increment_counter!(WRITE_CERTIFICATE_COUNTER, &[("chain_id", id)]);
        let hash = certificate.hash();
        let cert_key = bcs::to_bytes(&BaseKey::Certificate(hash))?;
        let value_key = bcs::to_bytes(&BaseKey::Value(hash))?;
        // Both keys are owned vectors already, so they are moved into the batch.
        batch.put_key_value(cert_key, &certificate.lite_certificate())?;
        batch.put_key_value(value_key, &certificate.value)?;
        Ok(())
    }

    /// Submits `batch` to the underlying key-value client.
    async fn write_batch(&self, batch: Batch) -> Result<(), ViewError> {
        self.client.client.write_batch(batch, &[]).await?;
        Ok(())
    }
}
/// The extra data attached to the view context of a chain loaded from a store; it
/// implements `ExecutionRuntimeContext`.
#[derive(Clone)]
pub struct ChainRuntimeContext<StoreClient> {
/// The store used to load user applications that are not yet in `user_applications`.
store: StoreClient,
/// The ID of the chain this context was created for.
pub chain_id: ChainId,
/// Map of user application code, keyed by application ID.
pub user_applications: Arc<DashMap<UserApplicationId, UserApplicationCode>>,
/// The guard obtained for the chain when it was loaded, if any.
pub chain_guard: Option<Arc<ChainGuard>>,
}
/// Runtime context handed to the execution layer for a single chain.
#[async_trait]
impl<StoreClient> ExecutionRuntimeContext for ChainRuntimeContext<StoreClient>
where
    StoreClient: Store + Send + Sync,
{
    /// Returns the ID of the chain this context belongs to.
    fn chain_id(&self) -> ChainId {
        self.chain_id
    }

    /// Returns the map of user application code.
    fn user_applications(&self) -> &Arc<DashMap<UserApplicationId, UserApplicationCode>> {
        &self.user_applications
    }

    /// Returns the code of the application described by `description`, loading it
    /// through the store and caching it on first use.
    ///
    /// No map lock is held while the application is being loaded: a `DashMap` entry
    /// holds a synchronous shard lock, and keeping it across an `.await` would block
    /// every other task that touches the same shard (and can stall the executor).
    /// As a consequence, two concurrent misses for the same application may both load
    /// it; the first one to publish wins and all callers receive that cached instance.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the store's `load_application`; nothing is
    /// cached in that case.
    async fn get_user_application(
        &self,
        description: &UserApplicationDescription,
    ) -> Result<UserApplicationCode, ExecutionError> {
        let application_id: UserApplicationId = description.into();

        // Fast path: the read guard is dropped at the end of this statement.
        let cached = self
            .user_applications
            .get(&application_id)
            .map(|cached| cached.value().clone());
        if let Some(application) = cached {
            return Ok(application);
        }

        // Slow path: load without holding any lock on the map.
        let application = self.store.load_application(description).await?;

        // Publish the result, re-checking in case another task finished loading first.
        match self.user_applications.entry(application_id) {
            Entry::Occupied(entry) => Ok(entry.get().clone()),
            Entry::Vacant(entry) => {
                entry.insert(application.clone());
                Ok(application)
            }
        }
    }
}