use crate::stateful::db::{
ManagedDb, Merkleized as MerkleizedTrait, StateSyncDb, SyncEngineConfig,
Unmerkleized as UnmerkleizedTrait,
};
use commonware_codec::{Codec, Read as CodecRead};
use commonware_cryptography::Hasher;
use commonware_parallel::Strategy;
use commonware_runtime::{Clock, Metrics, Storage};
use commonware_storage::{
index::{
unordered::Index as UnorderedIdx, Ordered as OrderedIndex, Unordered as UnorderedIndex,
},
journal::contiguous::{
fixed::Journal as FixedJournal, variable::Journal as VariableJournal, Contiguous, Mutable,
},
merkle::{Family, Location},
qmdb::{
any::{
batch::{MerkleizedBatch, UnmerkleizedBatch},
db::Db,
operation::{Operation, Update},
ordered, unordered,
value::{self, FixedEncoding, ValueEncoding, VariableEncoding},
FixedConfig, VariableConfig,
},
operation::Key,
sync::{self, resolver::Resolver, Target as AnySyncTarget},
Error,
},
translator::Translator,
Persistable,
};
use commonware_utils::{channel::mpsc, non_empty_range, sync::AsyncRwLock, Array};
use std::{ops::Deref, sync::Arc};
const ANY_BITMAP_CHUNK_BYTES: usize = 64;
type AnyDbHandle<F, E, C, I, H, U, S> =
Arc<AsyncRwLock<Db<F, E, C, I, H, U, ANY_BITMAP_CHUNK_BYTES, S>>>;
pub struct AnyUnmerkleized<F, E, C, I, H, U, S>
where
F: Family,
E: Storage + Clock + Metrics,
U: Update,
C: Contiguous<Item = Operation<F, U>>,
I: UnorderedIndex<Value = Location<F>>,
H: Hasher,
S: Strategy,
Operation<F, U>: Codec,
{
batch: UnmerkleizedBatch<F, H, U, S>,
db: AnyDbHandle<F, E, C, I, H, U, S>,
metadata: Option<U::Value>,
}
impl<F, E, C, I, H, K, V, S> AnyUnmerkleized<F, E, C, I, H, unordered::Update<K, V>, S>
where
F: Family,
E: Storage + Clock + Metrics,
K: Key,
V: ValueEncoding + 'static,
C: Mutable<Item = Operation<F, unordered::Update<K, V>>>
+ Persistable<Error = commonware_storage::journal::Error>,
I: UnorderedIndex<Value = Location<F>> + 'static,
H: Hasher,
S: Strategy,
Operation<F, unordered::Update<K, V>>: Codec,
{
pub fn with_metadata(mut self, metadata: V::Value) -> Self {
self.metadata = Some(metadata);
self
}
pub async fn get(&self, key: &K) -> Result<Option<V::Value>, Error<F>> {
let db = self.db.read().await;
self.batch.get(key, &*db).await
}
pub async fn get_many(&self, keys: &[&K]) -> Result<Vec<Option<V::Value>>, Error<F>> {
let db = self.db.read().await;
self.batch.get_many(keys, &*db).await
}
pub fn write(mut self, key: K, value: Option<V::Value>) -> Self {
self.batch = self.batch.write(key, value);
self
}
}
pub struct AnyMerkleized<F, E, C, I, H, U, S>
where
F: Family,
E: Storage + Clock + Metrics,
U: Update,
C: Contiguous<Item = Operation<F, U>>,
I: UnorderedIndex<Value = Location<F>>,
H: Hasher,
S: Strategy,
Operation<F, U>: Codec,
{
inner: Arc<MerkleizedBatch<F, H::Digest, U, S>>,
db: AnyDbHandle<F, E, C, I, H, U, S>,
}
impl<F, E, C, I, H, U, S> Deref for AnyUnmerkleized<F, E, C, I, H, U, S>
where
F: Family,
E: Storage + Clock + Metrics,
U: Update,
C: Contiguous<Item = Operation<F, U>>,
I: UnorderedIndex<Value = Location<F>>,
H: Hasher,
S: Strategy,
Operation<F, U>: Codec,
{
type Target = UnmerkleizedBatch<F, H, U, S>;
fn deref(&self) -> &Self::Target {
&self.batch
}
}
impl<F, E, C, I, H, U, S> Deref for AnyMerkleized<F, E, C, I, H, U, S>
where
F: Family,
E: Storage + Clock + Metrics,
U: Update,
C: Contiguous<Item = Operation<F, U>>,
I: UnorderedIndex<Value = Location<F>>,
H: Hasher,
S: Strategy,
Operation<F, U>: Codec,
{
type Target = MerkleizedBatch<F, H::Digest, U, S>;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl<F, E, C, I, H, K, V, S> AnyUnmerkleized<F, E, C, I, H, ordered::Update<K, V>, S>
where
F: Family,
E: Storage + Clock + Metrics,
K: Key,
V: ValueEncoding + 'static,
C: Mutable<Item = Operation<F, ordered::Update<K, V>>>
+ Persistable<Error = commonware_storage::journal::Error>,
I: OrderedIndex<Value = Location<F>> + 'static,
H: Hasher,
S: Strategy,
Operation<F, ordered::Update<K, V>>: Codec,
{
pub fn with_metadata(mut self, metadata: V::Value) -> Self {
self.metadata = Some(metadata);
self
}
pub async fn get(&self, key: &K) -> Result<Option<V::Value>, Error<F>> {
let db = self.db.read().await;
self.batch.get(key, &*db).await
}
pub async fn get_many(&self, keys: &[&K]) -> Result<Vec<Option<V::Value>>, Error<F>> {
let db = self.db.read().await;
self.batch.get_many(keys, &*db).await
}
pub fn write(mut self, key: K, value: Option<V::Value>) -> Self {
self.batch = self.batch.write(key, value);
self
}
}
impl<F, E, C, I, H, U, S> AnyMerkleized<F, E, C, I, H, U, S>
where
F: Family,
E: Storage + Clock + Metrics,
U: Update,
C: Contiguous<Item = Operation<F, U>>,
I: UnorderedIndex<Value = Location<F>> + 'static,
H: Hasher,
S: Strategy,
Operation<F, U>: Codec,
{
pub async fn get(&self, key: &U::Key) -> Result<Option<U::Value>, Error<F>> {
let db = self.db.read().await;
self.inner.get(key, &*db).await
}
pub async fn get_many(&self, keys: &[&U::Key]) -> Result<Vec<Option<U::Value>>, Error<F>> {
let db = self.db.read().await;
self.inner.get_many(keys, &*db).await
}
}
impl<F, E, C, I, H, K, V, S> UnmerkleizedTrait
for AnyUnmerkleized<F, E, C, I, H, unordered::Update<K, V>, S>
where
F: Family,
E: Storage + Clock + Metrics,
K: Key,
V: ValueEncoding + 'static,
C: Mutable<Item = Operation<F, unordered::Update<K, V>>>
+ Persistable<Error = commonware_storage::journal::Error>,
I: UnorderedIndex<Value = Location<F>> + 'static,
H: Hasher,
S: Strategy,
Operation<F, unordered::Update<K, V>>: Codec,
{
type Merkleized = AnyMerkleized<F, E, C, I, H, unordered::Update<K, V>, S>;
type Error = Error<F>;
async fn merkleize(self) -> Result<Self::Merkleized, Error<F>> {
let db = self.db.read().await;
let merkleized = self.batch.merkleize(&*db, self.metadata).await?;
Ok(AnyMerkleized {
inner: merkleized,
db: self.db.clone(),
})
}
}
impl<F, E, C, I, H, K, V, S> UnmerkleizedTrait
for AnyUnmerkleized<F, E, C, I, H, ordered::Update<K, V>, S>
where
F: Family,
E: Storage + Clock + Metrics,
K: Key,
V: ValueEncoding + 'static,
C: Mutable<Item = Operation<F, ordered::Update<K, V>>>
+ Persistable<Error = commonware_storage::journal::Error>,
I: OrderedIndex<Value = Location<F>> + 'static,
H: Hasher,
S: Strategy,
Operation<F, ordered::Update<K, V>>: Codec,
{
type Merkleized = AnyMerkleized<F, E, C, I, H, ordered::Update<K, V>, S>;
type Error = Error<F>;
async fn merkleize(self) -> Result<Self::Merkleized, Error<F>> {
let db = self.db.read().await;
let merkleized = self.batch.merkleize(&*db, self.metadata).await?;
Ok(AnyMerkleized {
inner: merkleized,
db: self.db.clone(),
})
}
}
impl<F, E, C, I, H, U, S> MerkleizedTrait for AnyMerkleized<F, E, C, I, H, U, S>
where
F: Family,
E: Storage + Clock + Metrics,
U: Update,
C: Mutable<Item = Operation<F, U>> + Persistable<Error = commonware_storage::journal::Error>,
I: UnorderedIndex<Value = Location<F>> + 'static,
H: Hasher,
S: Strategy,
Operation<F, U>: Codec,
AnyUnmerkleized<F, E, C, I, H, U, S>: UnmerkleizedTrait,
{
type Digest = H::Digest;
type Unmerkleized = AnyUnmerkleized<F, E, C, I, H, U, S>;
fn root(&self) -> H::Digest {
self.inner.root()
}
fn new_batch(&self) -> Self::Unmerkleized {
AnyUnmerkleized {
batch: self.inner.new_batch::<H>(),
db: self.db.clone(),
metadata: None,
}
}
}
impl<F, E, K, V, H, T, S> ManagedDb<E>
for Db<
F,
E,
FixedJournal<E, Operation<F, unordered::Update<K, FixedEncoding<V>>>>,
UnorderedIdx<T, Location<F>>,
H,
unordered::Update<K, FixedEncoding<V>>,
ANY_BITMAP_CHUNK_BYTES,
S,
>
where
F: Family,
E: Storage + Clock + Metrics,
K: Array,
V: value::FixedValue + 'static,
H: Hasher + 'static,
T: Translator,
S: Strategy,
{
type Unmerkleized = AnyUnmerkleized<
F,
E,
FixedJournal<E, Operation<F, unordered::Update<K, FixedEncoding<V>>>>,
UnorderedIdx<T, Location<F>>,
H,
unordered::Update<K, FixedEncoding<V>>,
S,
>;
type Merkleized = AnyMerkleized<
F,
E,
FixedJournal<E, Operation<F, unordered::Update<K, FixedEncoding<V>>>>,
UnorderedIdx<T, Location<F>>,
H,
unordered::Update<K, FixedEncoding<V>>,
S,
>;
type Error = Error<F>;
type Config = FixedConfig<T, S>;
type SyncTarget = AnySyncTarget<F, H::Digest>;
async fn init(context: E, config: Self::Config) -> Result<Self, Error<F>> {
<Self>::init(context, config).await
}
async fn new_batch(db: &Arc<AsyncRwLock<Self>>) -> Self::Unmerkleized {
let inner = db.read().await;
AnyUnmerkleized {
batch: inner.new_batch(),
db: db.clone(),
metadata: None,
}
}
fn matches_sync_target(batch: &Self::Merkleized, target: &Self::SyncTarget) -> bool {
batch.root() == target.root
&& *target.range.start() == batch.bounds().inactivity_floor
&& *target.range.end() == Location::<F>::new(batch.bounds().total_size)
}
async fn finalize(&mut self, batch: Self::Merkleized) -> Result<(), Error<F>> {
self.apply_batch(batch.inner).await?;
self.sync().await?;
Ok(())
}
async fn sync_target(&self) -> Self::SyncTarget {
let bounds = self.bounds().await;
AnySyncTarget::new(
self.root(),
non_empty_range!(self.sync_boundary(), bounds.end),
)
}
async fn rewind_to_target(&mut self, target: Self::SyncTarget) -> Result<(), Error<F>> {
self.rewind(target.range.end()).await?;
self.sync().await?;
let rewound_target = self.sync_target().await;
assert_eq!(
rewound_target, target,
"rewound database target mismatch after rewind",
);
Ok(())
}
}
impl<F, E, K, V, H, T, S> ManagedDb<E>
for Db<
F,
E,
VariableJournal<E, Operation<F, unordered::Update<K, VariableEncoding<V>>>>,
UnorderedIdx<T, Location<F>>,
H,
unordered::Update<K, VariableEncoding<V>>,
ANY_BITMAP_CHUNK_BYTES,
S,
>
where
F: Family,
E: Storage + Clock + Metrics,
K: Key,
V: value::VariableValue + 'static,
H: Hasher,
T: Translator,
S: Strategy,
Operation<F, unordered::Update<K, VariableEncoding<V>>>: Codec,
{
type Unmerkleized = AnyUnmerkleized<
F,
E,
VariableJournal<E, Operation<F, unordered::Update<K, VariableEncoding<V>>>>,
UnorderedIdx<T, Location<F>>,
H,
unordered::Update<K, VariableEncoding<V>>,
S,
>;
type Merkleized = AnyMerkleized<
F,
E,
VariableJournal<E, Operation<F, unordered::Update<K, VariableEncoding<V>>>>,
UnorderedIdx<T, Location<F>>,
H,
unordered::Update<K, VariableEncoding<V>>,
S,
>;
type Error = Error<F>;
type Config = VariableConfig<
T,
<Operation<F, unordered::Update<K, VariableEncoding<V>>> as CodecRead>::Cfg,
S,
>;
type SyncTarget = AnySyncTarget<F, H::Digest>;
async fn init(context: E, config: Self::Config) -> Result<Self, Error<F>> {
<Self>::init(context, config).await
}
async fn new_batch(db: &Arc<AsyncRwLock<Self>>) -> Self::Unmerkleized {
let inner = db.read().await;
AnyUnmerkleized {
batch: inner.new_batch(),
db: db.clone(),
metadata: None,
}
}
fn matches_sync_target(batch: &Self::Merkleized, target: &Self::SyncTarget) -> bool {
batch.root() == target.root
&& *target.range.start() == batch.bounds().inactivity_floor
&& *target.range.end() == Location::<F>::new(batch.bounds().total_size)
}
async fn finalize(&mut self, batch: Self::Merkleized) -> Result<(), Error<F>> {
self.apply_batch(batch.inner).await?;
self.sync().await?;
Ok(())
}
async fn sync_target(&self) -> Self::SyncTarget {
let bounds = self.bounds().await;
AnySyncTarget::new(
self.root(),
non_empty_range!(self.sync_boundary(), bounds.end),
)
}
async fn rewind_to_target(&mut self, target: Self::SyncTarget) -> Result<(), Error<F>> {
self.rewind(target.range.end()).await?;
self.sync().await?;
let rewound_target = self.sync_target().await;
assert_eq!(
rewound_target, target,
"rewound database target mismatch after rewind",
);
Ok(())
}
}
impl<F, E, K, V, H, T, S, R> StateSyncDb<E, R>
for Db<
F,
E,
FixedJournal<E, Operation<F, unordered::Update<K, FixedEncoding<V>>>>,
UnorderedIdx<T, Location<F>>,
H,
unordered::Update<K, FixedEncoding<V>>,
ANY_BITMAP_CHUNK_BYTES,
S,
>
where
F: Family,
E: Storage + Clock + Metrics,
K: Array,
V: value::FixedValue + 'static,
H: Hasher,
T: Translator,
S: Strategy,
R: Resolver<
Family = F,
Op = Operation<F, unordered::Update<K, FixedEncoding<V>>>,
Digest = H::Digest,
>,
{
type SyncError = sync::Error<F, R::Error, H::Digest>;
async fn sync_db(
context: E,
config: Self::Config,
resolver: R,
target: Self::SyncTarget,
tip_updates: mpsc::Receiver<Self::SyncTarget>,
finish: Option<mpsc::Receiver<()>>,
reached_target: Option<mpsc::Sender<Self::SyncTarget>>,
sync_config: SyncEngineConfig,
) -> Result<Self, Self::SyncError> {
sync::sync(sync::engine::Config {
context,
resolver,
target,
max_outstanding_requests: sync_config.max_outstanding_requests,
fetch_batch_size: sync_config.fetch_batch_size,
apply_batch_size: sync_config.apply_batch_size,
db_config: config,
update_rx: Some(tip_updates),
finish_rx: finish,
reached_target_tx: reached_target,
max_retained_roots: sync_config.max_retained_roots,
})
.await
}
}
impl<F, E, K, V, H, T, S, R> StateSyncDb<E, R>
for Db<
F,
E,
VariableJournal<E, Operation<F, unordered::Update<K, VariableEncoding<V>>>>,
UnorderedIdx<T, Location<F>>,
H,
unordered::Update<K, VariableEncoding<V>>,
ANY_BITMAP_CHUNK_BYTES,
S,
>
where
F: Family,
E: Storage + Clock + Metrics,
K: Key,
V: value::VariableValue + 'static,
H: Hasher,
T: Translator,
S: Strategy,
Operation<F, unordered::Update<K, VariableEncoding<V>>>: Codec,
R: Resolver<
Family = F,
Op = Operation<F, unordered::Update<K, VariableEncoding<V>>>,
Digest = H::Digest,
>,
{
type SyncError = sync::Error<F, R::Error, H::Digest>;
async fn sync_db(
context: E,
config: Self::Config,
resolver: R,
target: Self::SyncTarget,
tip_updates: mpsc::Receiver<Self::SyncTarget>,
finish: Option<mpsc::Receiver<()>>,
reached_target: Option<mpsc::Sender<Self::SyncTarget>>,
sync_config: SyncEngineConfig,
) -> Result<Self, Self::SyncError> {
sync::sync(sync::engine::Config {
context,
resolver,
target,
max_outstanding_requests: sync_config.max_outstanding_requests,
fetch_batch_size: sync_config.fetch_batch_size,
apply_batch_size: sync_config.apply_batch_size,
db_config: config,
update_rx: Some(tip_updates),
finish_rx: finish,
reached_target_tx: reached_target,
max_retained_roots: sync_config.max_retained_roots,
})
.await
}
}