use std::{fmt::Debug, future::Future};
use serde::{de::DeserializeOwned, Serialize};
#[cfg(with_testing)]
use crate::random::generate_test_namespace;
use crate::{
batch::{Batch, SimplifiedBatch},
common::from_bytes_option,
ViewError,
};
pub trait KeyValueStoreError:
std::error::Error + From<bcs::Error> + Debug + Send + Sync + 'static
{
const BACKEND: &'static str;
fn must_reload_view(&self) -> bool {
false
}
}
impl<E: KeyValueStoreError> From<E> for ViewError {
fn from(error: E) -> Self {
let must_reload_view = error.must_reload_view();
Self::StoreError {
backend: E::BACKEND,
error: Box::new(error),
must_reload_view,
}
}
}
pub trait WithError {
type Error: KeyValueStoreError;
}
#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
pub trait ReadableKeyValueStore: WithError {
const MAX_KEY_SIZE: usize;
fn max_stream_queries(&self) -> usize;
fn root_key(&self) -> Result<Vec<u8>, Self::Error>;
async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error>;
async fn contains_key(&self, key: &[u8]) -> Result<bool, Self::Error>;
async fn contains_keys(&self, keys: &[Vec<u8>]) -> Result<Vec<bool>, Self::Error>;
async fn read_multi_values_bytes(
&self,
keys: &[Vec<u8>],
) -> Result<Vec<Option<Vec<u8>>>, Self::Error>;
async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, Self::Error>;
async fn find_key_values_by_prefix(
&self,
key_prefix: &[u8],
) -> Result<Vec<(Vec<u8>, Vec<u8>)>, Self::Error>;
fn read_value<V: DeserializeOwned>(
&self,
key: &[u8],
) -> impl Future<Output = Result<Option<V>, Self::Error>> {
async { Ok(from_bytes_option(&self.read_value_bytes(key).await?)?) }
}
fn read_multi_values<V: DeserializeOwned + Send + Sync>(
&self,
keys: &[Vec<u8>],
) -> impl Future<Output = Result<Vec<Option<V>>, Self::Error>> {
async {
let mut values = Vec::with_capacity(keys.len());
for entry in self.read_multi_values_bytes(keys).await? {
values.push(from_bytes_option(&entry)?);
}
Ok(values)
}
}
}
#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
pub trait WritableKeyValueStore: WithError {
const MAX_VALUE_SIZE: usize;
async fn write_batch(&self, batch: Batch) -> Result<(), Self::Error>;
async fn clear_journal(&self) -> Result<(), Self::Error>;
}
#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
pub trait DirectWritableKeyValueStore: WithError {
const MAX_BATCH_SIZE: usize;
const MAX_BATCH_TOTAL_SIZE: usize;
const MAX_VALUE_SIZE: usize;
type Batch: SimplifiedBatch + Serialize + DeserializeOwned + Default;
async fn write_batch(&self, batch: Self::Batch) -> Result<(), Self::Error>;
}
#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
pub trait KeyValueDatabase: WithError + linera_base::util::traits::AutoTraits + Sized {
type Config: Send + Sync;
type Store;
fn get_name() -> String;
async fn connect(config: &Self::Config, namespace: &str) -> Result<Self, Self::Error>;
fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error>;
fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error>;
async fn list_all(config: &Self::Config) -> Result<Vec<String>, Self::Error>;
async fn list_root_keys(&self) -> Result<Vec<Vec<u8>>, Self::Error>;
fn delete_all(config: &Self::Config) -> impl Future<Output = Result<(), Self::Error>> {
async {
let namespaces = Self::list_all(config).await?;
for namespace in namespaces {
Self::delete(config, &namespace).await?;
}
Ok(())
}
}
async fn exists(config: &Self::Config, namespace: &str) -> Result<bool, Self::Error>;
async fn create(config: &Self::Config, namespace: &str) -> Result<(), Self::Error>;
async fn delete(config: &Self::Config, namespace: &str) -> Result<(), Self::Error>;
fn maybe_create_and_connect(
config: &Self::Config,
namespace: &str,
) -> impl Future<Output = Result<Self, Self::Error>> {
async {
if !Self::exists(config, namespace).await? {
Self::create(config, namespace).await?;
}
Self::connect(config, namespace).await
}
}
fn recreate_and_connect(
config: &Self::Config,
namespace: &str,
) -> impl Future<Output = Result<Self, Self::Error>> {
async {
if Self::exists(config, namespace).await? {
Self::delete(config, namespace).await?;
}
Self::create(config, namespace).await?;
Self::connect(config, namespace).await
}
}
}
pub trait DirectKeyValueStore: ReadableKeyValueStore + DirectWritableKeyValueStore {}
impl<T> DirectKeyValueStore for T where T: ReadableKeyValueStore + DirectWritableKeyValueStore {}
pub trait KeyValueStore: ReadableKeyValueStore + WritableKeyValueStore {}
impl<T> KeyValueStore for T where T: ReadableKeyValueStore + WritableKeyValueStore {}
#[cfg(with_testing)]
pub trait TestKeyValueDatabase: KeyValueDatabase {
async fn new_test_config() -> Result<Self::Config, Self::Error>;
async fn connect_test_namespace() -> Result<Self, Self::Error> {
let config = Self::new_test_config().await?;
let namespace = generate_test_namespace();
Self::recreate_and_connect(&config, &namespace).await
}
async fn new_test_store() -> Result<Self::Store, Self::Error> {
let database = Self::connect_test_namespace().await?;
database.open_shared(&[])
}
}
pub mod inactive_store {
use super::*;
pub struct InactiveStore;
#[derive(Clone, Copy, Debug)]
pub struct InactiveStoreError;
impl std::fmt::Display for InactiveStoreError {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "inactive store error")
}
}
impl From<bcs::Error> for InactiveStoreError {
fn from(_other: bcs::Error) -> Self {
Self
}
}
impl std::error::Error for InactiveStoreError {}
impl KeyValueStoreError for InactiveStoreError {
const BACKEND: &'static str = "inactive";
}
impl WithError for InactiveStore {
type Error = InactiveStoreError;
}
impl ReadableKeyValueStore for InactiveStore {
const MAX_KEY_SIZE: usize = 0;
fn max_stream_queries(&self) -> usize {
0
}
fn root_key(&self) -> Result<Vec<u8>, Self::Error> {
panic!("attempt to read from an inactive store!")
}
async fn read_value_bytes(&self, _key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
panic!("attempt to read from an inactive store!")
}
async fn contains_key(&self, _key: &[u8]) -> Result<bool, Self::Error> {
panic!("attempt to read from an inactive store!")
}
async fn contains_keys(&self, _keys: &[Vec<u8>]) -> Result<Vec<bool>, Self::Error> {
panic!("attempt to read from an inactive store!")
}
async fn read_multi_values_bytes(
&self,
_keys: &[Vec<u8>],
) -> Result<Vec<Option<Vec<u8>>>, Self::Error> {
panic!("attempt to read from an inactive store!")
}
async fn find_keys_by_prefix(
&self,
_key_prefix: &[u8],
) -> Result<Vec<Vec<u8>>, Self::Error> {
panic!("attempt to read from an inactive store!")
}
async fn find_key_values_by_prefix(
&self,
_key_prefix: &[u8],
) -> Result<Vec<(Vec<u8>, Vec<u8>)>, Self::Error> {
panic!("attempt to read from an inactive store!")
}
}
impl WritableKeyValueStore for InactiveStore {
const MAX_VALUE_SIZE: usize = 0;
async fn write_batch(&self, _batch: Batch) -> Result<(), Self::Error> {
panic!("attempt to write to an inactive store!")
}
async fn clear_journal(&self) -> Result<(), Self::Error> {
panic!("attempt to write to an inactive store!")
}
}
}