use std::collections::BTreeMap;
use std::error::Error;
use std::fmt::{self, Debug};
use std::marker::{self, PhantomData};
use std::ops::{self, DerefMut};
use std::pin::Pin;
use std::sync::Arc;
use anyhow::{bail, Context, Result};
use fedimint_core::util::BoxFuture;
use fedimint_logging::LOG_DB;
use futures::{Stream, StreamExt};
use macro_rules_attribute::apply;
use serde::Serialize;
use strum_macros::EnumIter;
use thiserror::Error;
use tracing::{debug, error, info, instrument, trace, warn};
use crate::core::ModuleInstanceId;
use crate::encoding::{Decodable, Encodable};
use crate::fmt_utils::AbbreviateHexBytes;
use crate::task::{MaybeSend, MaybeSync};
use crate::{async_trait_maybe_send, maybe_add_send, timing};
pub mod mem_impl;
pub mod notifications;
pub use test_utils::*;
use self::notifications::{Notifications, NotifyQueue};
use crate::module::registry::ModuleDecoderRegistry;
/// First byte of every module-specific key prefix.
///
/// `module_instance_id_to_byte_prefix` places this byte in front of the
/// encoded module instance id.
pub const MODULE_GLOBAL_PREFIX: u8 = 0xff;
/// Something that can be turned into the raw bytes of a database key or key
/// prefix.
pub trait DatabaseKeyPrefix: Debug {
    /// Returns the raw byte representation of this key (prefix).
    fn to_bytes(&self) -> Vec<u8>;
}
/// Describes a typed database record: its key type, the value type stored
/// under that key, and associated constants.
pub trait DatabaseRecord: DatabaseKeyPrefix {
    /// Prefix byte associated with this record type.
    ///
    /// NOTE(review): presumably the first byte of every encoded key of this
    /// record type — confirm against the key encoding.
    const DB_PREFIX: u8;
    /// Defaults to `false`.
    ///
    /// NOTE(review): presumably opts the record into change notifications;
    /// nothing in the visible part of the file reads it — confirm.
    const NOTIFY_ON_MODIFY: bool = false;
    /// Type of the key of this record.
    type Key: DatabaseKey + Debug;
    /// Type of the value stored under the key.
    type Value: DatabaseValue + Debug;
}
/// A key or key prefix that can be used to look up entries of a particular
/// [`DatabaseRecord`] type (see `find_by_prefix` and `remove_by_prefix`).
pub trait DatabaseLookup: DatabaseKeyPrefix {
    /// The record type the lookup refers to.
    type Record: DatabaseRecord;
}
/// Every encodable and decodable [`DatabaseRecord`] is a lookup for itself.
impl<Record> DatabaseLookup for Record
where
    Record: DatabaseRecord + Debug + Decodable + Encodable,
{
    type Record = Record;
}
/// A database key that can be reconstructed from its raw bytes.
pub trait DatabaseKey: Sized {
    /// Defaults to `false`.
    ///
    /// NOTE(review): presumably signals that modifications of this key should
    /// trigger notifications; not read in the visible part of the file —
    /// confirm.
    const NOTIFY_ON_MODIFY: bool = false;
    /// Decodes a key from `data`; `modules` is the decoder registry made
    /// available to the decoding. Fails with a `DecodingError`.
    fn from_bytes(data: &[u8], modules: &ModuleDecoderRegistry) -> Result<Self, DecodingError>;
}
/// Marker trait that `Database::wait_key_check` and
/// `Database::wait_key_exists` require of their key type, in addition to
/// [`DatabaseKey`] and [`DatabaseRecord`].
pub trait DatabaseKeyWithNotify {}
/// A database value that can be converted to and from raw bytes.
pub trait DatabaseValue: Sized + Debug {
    /// Decodes a value from `data`; `modules` is the decoder registry made
    /// available to the decoding. Fails with a `DecodingError`.
    fn from_bytes(data: &[u8], modules: &ModuleDecoderRegistry) -> Result<Self, DecodingError>;
    /// Returns the raw byte representation of this value.
    fn to_bytes(&self) -> Vec<u8>;
}
/// Pinned, boxed stream of raw `(key bytes, value bytes)` pairs, as returned
/// by the `raw_find_by_prefix*` methods.
pub type PrefixStream<'a> = Pin<Box<maybe_add_send!(dyn Stream<Item = (Vec<u8>, Vec<u8>)> + 'a)>>;
/// Zero-sized marker relating two lifetimes, passed as the second argument
/// of the closure given to `Database::autocommit`.
///
/// NOTE(review): `&'small &'big ()` is only well-formed when `'big: 'small`,
/// so this presumably exists to carry that implied bound into the closure's
/// higher-ranked signature — confirm.
pub type PhantomBound<'big, 'small> = PhantomData<&'small &'big ()>;
/// Error returned by `Database::autocommit`.
///
/// NOTE(review): no `#[error(...)]` attributes are present on the variants;
/// the `Error` derive needs a `Display` implementation from somewhere —
/// confirm one exists elsewhere.
#[derive(Debug, Error)]
pub enum AutocommitError<E> {
    /// Committing the transaction failed and the maximum number of attempts
    /// was reached.
    CommitFailed {
        /// Number of attempts made, including the last one.
        attempts: usize,
        /// Error returned by the last failed commit.
        last_error: anyhow::Error,
    },
    /// The closure passed to `autocommit` returned an error.
    ClosureError {
        /// Number of the attempt (starting at 1) during which the closure
        /// failed.
        attempts: usize,
        /// Error returned by the closure.
        error: E,
    },
}
/// Interface of a raw database backend: the only thing it has to provide is
/// a way to start a transaction.
#[apply(async_trait_maybe_send!)]
pub trait IRawDatabase: Debug + MaybeSend + MaybeSync + 'static {
    /// Transaction type of this backend; it may borrow from the database for
    /// `'a`.
    type Transaction<'a>: IRawDatabaseTransaction;
    /// Starts a new transaction.
    async fn begin_transaction<'a>(&'a self) -> Self::Transaction<'a>;
}
/// A boxed raw database is itself a raw database; everything is forwarded to
/// the boxed value.
#[apply(async_trait_maybe_send!)]
impl<T> IRawDatabase for Box<T>
where
    T: IRawDatabase,
{
    type Transaction<'a> = <T as IRawDatabase>::Transaction<'a>;
    /// Starts a transaction on the boxed database.
    async fn begin_transaction<'a>(&'a self) -> Self::Transaction<'a> {
        <T as IRawDatabase>::begin_transaction(&**self).await
    }
}
pub trait IRawDatabaseExt: IRawDatabase + Sized {
    fn into_database(self) -> Database {
        Database::new(self, Default::default())
    }
}
/// Every raw database backend gets [`IRawDatabaseExt`] for free.
impl<T> IRawDatabaseExt for T where T: IRawDatabase {}
impl<T> From<T> for Database
where
    T: IRawDatabase,
{
    fn from(raw: T) -> Self {
        Database::new(raw, Default::default())
    }
}
/// Database interface that [`Database`] holds as an `Arc<dyn IDatabase>`.
#[apply(async_trait_maybe_send!)]
pub trait IDatabase: Debug + MaybeSend + MaybeSync + 'static {
    /// Starts a new transaction that may borrow from this database.
    async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a>;
    /// Registers interest in `key`; `Database::wait_key_check` awaits the
    /// returned future to wait for a change of the key.
    async fn register(&self, key: &[u8]);
    /// Sends a notification for `key`.
    async fn notify(&self, key: &[u8]);
    /// Total length in bytes of the key prefix this database instance
    /// applies; `0` means no prefix.
    fn prefix_len(&self) -> usize;
}
/// An `Arc` around a database is itself a database; every call is forwarded
/// to the shared value.
#[apply(async_trait_maybe_send!)]
impl<T> IDatabase for Arc<T>
where
    T: IDatabase + ?Sized,
{
    async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a> {
        T::begin_transaction(&**self).await
    }
    async fn register(&self, key: &[u8]) {
        T::register(&**self, key).await
    }
    async fn notify(&self, key: &[u8]) {
        T::notify(&**self, key).await
    }
    fn prefix_len(&self) -> usize {
        T::prefix_len(&**self)
    }
}
/// Prefix-less database built directly on a raw database backend, together
/// with the notification registry handed to its transactions.
struct BaseDatabase<RawDatabase> {
    /// Shared with every transaction started from this database.
    notifications: Arc<Notifications>,
    /// The underlying raw database backend.
    raw: RawDatabase,
}
/// Opaque `Debug` output: only the type name is printed, so `RawDatabase`
/// needs no `Debug` bound here.
impl<RawDatabase> fmt::Debug for BaseDatabase<RawDatabase> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "BaseDatabase")
    }
}
#[apply(async_trait_maybe_send!)]
impl<RawDatabase: IRawDatabase + MaybeSend + 'static> IDatabase for BaseDatabase<RawDatabase> {
    async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a> {
        Box::new(BaseDatabaseTransaction::new(
            self.raw.begin_transaction().await,
            self.notifications.clone(),
        ))
    }
    async fn register(&self, key: &[u8]) {
        self.notifications.register(key).await
    }
    async fn notify(&self, key: &[u8]) {
        self.notifications.notify(key).await
    }
    fn prefix_len(&self) -> usize {
        0
    }
}
/// Cloneable handle to a database, bundled with the decoder registry used to
/// decode values read through it.
#[derive(Clone, Debug)]
pub struct Database {
    /// The underlying (possibly prefixed) database implementation.
    inner: Arc<dyn IDatabase + 'static>,
    /// Decoders handed to every transaction started from this handle.
    module_decoders: ModuleDecoderRegistry,
}
impl Database {
    /// Creates a database on top of a raw backend, with a fresh notification
    /// registry.
    pub fn new(raw: impl IRawDatabase + 'static, module_decoders: ModuleDecoderRegistry) -> Self {
        let inner = BaseDatabase {
            raw,
            notifications: Arc::new(Notifications::new()),
        };
        Self::new_from_arc(
            Arc::new(inner) as Arc<dyn IDatabase + 'static>,
            module_decoders,
        )
    }

    /// Creates a database handle from an already shared [`IDatabase`]
    /// implementation.
    pub fn new_from_arc(
        inner: Arc<dyn IDatabase + 'static>,
        module_decoders: ModuleDecoderRegistry,
    ) -> Self {
        Self {
            inner,
            module_decoders,
        }
    }

    /// Returns a handle whose keys are all transparently prepended with
    /// `prefix`. The decoders are carried over unchanged.
    pub fn with_prefix(&self, prefix: Vec<u8>) -> Self {
        Self {
            inner: Arc::new(PrefixDatabase {
                inner: self.inner.clone(),
                prefix,
            }),
            module_decoders: self.module_decoders.clone(),
        }
    }

    /// Returns a handle prefixed with the byte prefix derived from
    /// `module_instance_id` (see `module_instance_id_to_byte_prefix`).
    pub fn with_prefix_module_id(&self, module_instance_id: ModuleInstanceId) -> Self {
        let prefix = module_instance_id_to_byte_prefix(module_instance_id);
        self.with_prefix(prefix)
    }

    /// Returns a handle to the same underlying database that uses
    /// `module_decoders` instead of the current decoders.
    pub fn with_decoders(&self, module_decoders: ModuleDecoderRegistry) -> Self {
        Self {
            inner: self.inner.clone(),
            module_decoders,
        }
    }

    /// Returns `true` if no key prefix is applied by this handle.
    pub fn is_global(&self) -> bool {
        self.inner.prefix_len() == 0
    }

    /// # Errors
    ///
    /// Fails if this handle applies a key prefix.
    pub fn ensure_global(&self) -> Result<()> {
        if !self.is_global() {
            bail!("Database instance not global");
        }
        Ok(())
    }

    /// # Errors
    ///
    /// Fails if this handle applies no key prefix.
    pub fn ensure_isolated(&self) -> Result<()> {
        if self.is_global() {
            bail!("Database instance not isolated");
        }
        Ok(())
    }

    /// Starts a new committable transaction.
    pub async fn begin_transaction<'s, 'tx>(&'s self) -> DatabaseTransaction<'tx, Committable>
    where
        's: 'tx,
    {
        DatabaseTransaction::<Committable>::new(
            self.inner.begin_transaction().await,
            self.module_decoders.clone(),
        )
    }

    /// Starts a new transaction and immediately converts it into its
    /// non-committable form.
    pub async fn begin_transaction_nc<'s, 'tx>(&'s self) -> DatabaseTransaction<'tx, NonCommittable>
    where
        's: 'tx,
    {
        self.begin_transaction().await.into_nc()
    }

    /// Runs `tx_fn` inside a fresh transaction and commits it, retrying the
    /// whole closure when the commit fails.
    ///
    /// With `max_attempts == None` the commit is retried indefinitely.
    ///
    /// # Errors
    ///
    /// * [`AutocommitError::ClosureError`] as soon as `tx_fn` returns an
    ///   error; the closure is not retried in that case.
    /// * [`AutocommitError::CommitFailed`] once the commit has failed
    ///   `max_attempts` times.
    ///
    /// # Panics
    ///
    /// Panics if `max_attempts` is `Some(0)` or if the attempt counter
    /// overflows.
    pub async fn autocommit<'s, 'dbtx, F, T, E>(
        &'s self,
        tx_fn: F,
        max_attempts: Option<usize>,
    ) -> Result<T, AutocommitError<E>>
    where
        's: 'dbtx,
        for<'r, 'o> F: Fn(
            &'r mut DatabaseTransaction<'o>,
            PhantomBound<'dbtx, 'o>,
        ) -> BoxFuture<'r, Result<T, E>>,
    {
        assert_ne!(max_attempts, Some(0));
        let mut curr_attempts: usize = 0;
        loop {
            curr_attempts = curr_attempts
                .checked_add(1)
                .expect("db autocommit attempt counter overflowed");
            let mut dbtx = self.begin_transaction().await;
            let val = tx_fn(&mut dbtx.to_ref_nc(), PhantomData)
                .await
                .map_err(|err| AutocommitError::ClosureError {
                    attempts: curr_attempts,
                    error: err,
                })?;
            let _timing = timing::TimeReporter::new("autocommit - commit_tx");
            match dbtx.commit_tx_result().await {
                Ok(()) => {
                    return Ok(val);
                }
                Err(err) => {
                    // Log the error on every failed attempt: when the commit
                    // is retried the error value is otherwise lost.
                    warn!(
                        target: LOG_DB,
                        curr_attempts,
                        ?err,
                        "Database commit failed in an autocommit block"
                    );
                    if matches!(max_attempts, Some(max_att) if max_att <= curr_attempts) {
                        return Err(AutocommitError::CommitFailed {
                            attempts: curr_attempts,
                            last_error: err,
                        });
                    }
                }
            }
        }
    }

    /// Waits until `checker` accepts the value stored under `key`.
    ///
    /// In a loop, the current value (if any) is read in a fresh transaction
    /// and passed to `checker`. When `checker` returns `Some`, that result is
    /// returned together with the transaction the value was read in;
    /// otherwise the notification for `key` is awaited and the check is
    /// repeated.
    ///
    /// # Panics
    ///
    /// Panics if reading from the database or decoding the value fails.
    pub async fn wait_key_check<'a, K, T>(
        &'a self,
        key: &K,
        checker: impl Fn(Option<K::Value>) -> Option<T>,
    ) -> (T, DatabaseTransaction<'a, Committable>)
    where
        K: DatabaseKey + DatabaseRecord + DatabaseKeyWithNotify,
    {
        let key_bytes = key.to_bytes();
        loop {
            // NOTE(review): `register` is an `async fn`, so this line only
            // creates the future; whether the registration is already in
            // effect before the read below depends on the implementation —
            // confirm.
            let notify = self.inner.register(&key_bytes);
            let mut tx = self.inner.begin_transaction().await;
            let maybe_value_bytes = tx
                .raw_get_bytes(&key_bytes)
                .await
                .expect("Unrecoverable error when reading from database")
                .map(|value_bytes| {
                    trace!(
                        "get_value: Decoding {} from bytes {:?}",
                        std::any::type_name::<K::Value>(),
                        value_bytes
                    );
                    K::Value::from_bytes(&value_bytes, &self.module_decoders)
                        .expect("Unrecoverable error when decoding the database value")
                });
            if let Some(value) = checker(maybe_value_bytes) {
                return (
                    value,
                    DatabaseTransaction::new(tx, self.module_decoders.clone()),
                );
            } else {
                notify.await;
            }
        }
    }

    /// Waits until a value exists under `key` and returns it.
    pub async fn wait_key_exists<K>(&self, key: &K) -> K::Value
    where
        K: DatabaseKey + DatabaseRecord + DatabaseKeyWithNotify,
    {
        self.wait_key_check(key, std::convert::identity).await.0
    }
}
fn module_instance_id_to_byte_prefix(module_instance_id: u16) -> Vec<u8> {
    let mut prefix = vec![MODULE_GLOBAL_PREFIX];
    module_instance_id
        .consensus_encode(&mut prefix)
        .expect("Error encoding module instance id as prefix");
    prefix
}
/// Database wrapper that prepends `prefix` to every key before passing it on
/// to `inner`.
#[derive(Clone, Debug)]
struct PrefixDatabase<Inner>
where
    Inner: Debug,
{
    /// Bytes prepended to every key.
    prefix: Vec<u8>,
    /// The wrapped database.
    inner: Inner,
}
impl<Inner> PrefixDatabase<Inner>
where
    Inner: Debug,
{
    /// Returns `key` with this database's prefix prepended.
    fn get_full_key(&self, key: &[u8]) -> Vec<u8> {
        [self.prefix.as_slice(), key].concat()
    }
}
/// Database interface of a prefixed database: keys are extended with the
/// prefix and then passed to the wrapped database.
#[apply(async_trait_maybe_send!)]
impl<Inner> IDatabase for PrefixDatabase<Inner>
where
    Inner: Debug + MaybeSend + MaybeSync + 'static + IDatabase,
{
    /// Starts a transaction on the wrapped database and wraps it so that it
    /// applies the same prefix.
    async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a> {
        let inner = self.inner.begin_transaction().await;
        let prefix = self.prefix.clone();
        Box::new(PrefixDatabaseTransaction { inner, prefix })
    }
    async fn register(&self, key: &[u8]) {
        let full_key = self.get_full_key(key);
        self.inner.register(&full_key).await
    }
    async fn notify(&self, key: &[u8]) {
        let full_key = self.get_full_key(key);
        self.inner.notify(&full_key).await
    }
    /// Prefix length of the wrapped database plus the length of our own
    /// prefix.
    fn prefix_len(&self) -> usize {
        let own_len = self.prefix.len();
        self.inner.prefix_len() + own_len
    }
}
/// Transaction wrapper that prepends `prefix` to every key before passing it
/// on to `inner`, and strips it again from keys returned by prefix searches.
struct PrefixDatabaseTransaction<Inner> {
    /// The wrapped transaction.
    inner: Inner,
    /// Bytes prepended to every key.
    prefix: Vec<u8>,
}
impl<Inner> PrefixDatabaseTransaction<Inner> {
    /// Returns `key` with this transaction's prefix prepended.
    fn get_full_key(&self, key: &[u8]) -> Vec<u8> {
        [self.prefix.as_slice(), key].concat()
    }

    /// Maps `stream` so that the first `prefix_len` bytes of every key are
    /// dropped; values are passed through untouched.
    ///
    /// The mapping panics on a key shorter than `prefix_len`.
    fn adapt_prefix_stream(stream: PrefixStream<'_>, prefix_len: usize) -> PrefixStream<'_> {
        let strip_prefix = move |(full_key, value): (Vec<u8>, Vec<u8>)| {
            let key = full_key[prefix_len..].to_owned();
            (key, value)
        };
        Box::pin(stream.map(strip_prefix))
    }
}
/// Committing is forwarded to the wrapped transaction unchanged.
#[apply(async_trait_maybe_send!)]
impl<Inner> IDatabaseTransaction for PrefixDatabaseTransaction<Inner>
where
    Inner: IDatabaseTransaction,
{
    async fn commit_tx(&mut self) -> Result<()> {
        Inner::commit_tx(&mut self.inner).await
    }
    /// Prefix length of the wrapped transaction plus the length of our own
    /// prefix.
    fn prefix_len(&self) -> usize {
        let own_len = self.prefix.len();
        Inner::prefix_len(&self.inner) + own_len
    }
}
/// Raw operations of a prefixed transaction: every key or key prefix is
/// extended with our prefix before the call is forwarded, and keys coming
/// back from prefix searches have it stripped again.
#[apply(async_trait_maybe_send!)]
impl<Inner> IDatabaseTransactionOpsCore for PrefixDatabaseTransaction<Inner>
where
    Inner: IDatabaseTransactionOpsCore,
{
    async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
        let full_key = self.get_full_key(key);
        self.inner.raw_insert_bytes(&full_key, value).await
    }
    async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
        let full_key = self.get_full_key(key);
        self.inner.raw_get_bytes(&full_key).await
    }
    async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
        let full_key = self.get_full_key(key);
        self.inner.raw_remove_entry(&full_key).await
    }
    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
        let full_prefix = self.get_full_key(key_prefix);
        let prefixed_stream = self.inner.raw_find_by_prefix(&full_prefix).await?;
        let own_len = self.prefix.len();
        Ok(Self::adapt_prefix_stream(prefixed_stream, own_len))
    }
    async fn raw_find_by_prefix_sorted_descending(
        &mut self,
        key_prefix: &[u8],
    ) -> Result<PrefixStream<'_>> {
        let full_prefix = self.get_full_key(key_prefix);
        let prefixed_stream = self
            .inner
            .raw_find_by_prefix_sorted_descending(&full_prefix)
            .await?;
        let own_len = self.prefix.len();
        Ok(Self::adapt_prefix_stream(prefixed_stream, own_len))
    }
    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
        let full_prefix = self.get_full_key(key_prefix);
        self.inner.raw_remove_by_prefix(&full_prefix).await
    }
}
/// Savepoint handling does not involve keys, so both operations are
/// forwarded to the wrapped transaction as they are.
#[apply(async_trait_maybe_send!)]
impl<Inner> IDatabaseTransactionOps for PrefixDatabaseTransaction<Inner>
where
    Inner: IDatabaseTransactionOps,
{
    async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
        self.inner.rollback_tx_to_savepoint().await
    }
    async fn set_tx_savepoint(&mut self) -> Result<()> {
        // Must delegate to `self.inner`: calling `self.set_tx_savepoint()`
        // here would recurse into this very method without end.
        self.inner.set_tx_savepoint().await
    }
}
/// Raw, byte-level operations available on a database transaction.
#[apply(async_trait_maybe_send!)]
pub trait IDatabaseTransactionOpsCore: MaybeSend {
    /// Stores `value` under `key`. The typed layer decodes a returned
    /// `Some` as the value previously stored under `key`.
    async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>>;
    /// Returns the bytes stored under `key`, if any.
    async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>>;
    /// Removes the entry stored under `key`. The typed layer decodes a
    /// returned `Some` as the removed value.
    async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>>;
    /// Returns a stream of the `(key, value)` pairs found for `key_prefix`.
    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>>;
    /// Like `raw_find_by_prefix`.
    ///
    /// NOTE(review): by its name the entries are yielded in descending key
    /// order — confirm against the implementations.
    async fn raw_find_by_prefix_sorted_descending(
        &mut self,
        key_prefix: &[u8],
    ) -> Result<PrefixStream<'_>>;
    /// Removes the entries found for `key_prefix`.
    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()>;
}
/// A boxed transaction supports the raw operations of the boxed value; every
/// call is forwarded.
#[apply(async_trait_maybe_send!)]
impl<T> IDatabaseTransactionOpsCore for Box<T>
where
    T: IDatabaseTransactionOpsCore + ?Sized,
{
    async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
        T::raw_insert_bytes(&mut **self, key, value).await
    }
    async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
        T::raw_get_bytes(&mut **self, key).await
    }
    async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
        T::raw_remove_entry(&mut **self, key).await
    }
    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
        T::raw_find_by_prefix(&mut **self, key_prefix).await
    }
    async fn raw_find_by_prefix_sorted_descending(
        &mut self,
        key_prefix: &[u8],
    ) -> Result<PrefixStream<'_>> {
        T::raw_find_by_prefix_sorted_descending(&mut **self, key_prefix).await
    }
    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
        T::raw_remove_by_prefix(&mut **self, key_prefix).await
    }
}
/// A mutable reference to a transaction supports the raw operations of the
/// referenced value; every call is forwarded.
#[apply(async_trait_maybe_send!)]
impl<T> IDatabaseTransactionOpsCore for &mut T
where
    T: IDatabaseTransactionOpsCore + ?Sized,
{
    async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
        T::raw_insert_bytes(&mut **self, key, value).await
    }
    async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
        T::raw_get_bytes(&mut **self, key).await
    }
    async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
        T::raw_remove_entry(&mut **self, key).await
    }
    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
        T::raw_find_by_prefix(&mut **self, key_prefix).await
    }
    async fn raw_find_by_prefix_sorted_descending(
        &mut self,
        key_prefix: &[u8],
    ) -> Result<PrefixStream<'_>> {
        T::raw_find_by_prefix_sorted_descending(&mut **self, key_prefix).await
    }
    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
        T::raw_remove_by_prefix(&mut **self, key_prefix).await
    }
}
/// Raw transaction operations extended with savepoint support.
#[apply(async_trait_maybe_send!)]
pub trait IDatabaseTransactionOps: IDatabaseTransactionOpsCore + MaybeSend {
    /// Sets a savepoint in the transaction.
    async fn set_tx_savepoint(&mut self) -> Result<()>;
    /// Rolls the transaction back to the savepoint.
    ///
    /// NOTE(review): behaviour when no savepoint was set is up to the
    /// implementation — confirm.
    async fn rollback_tx_to_savepoint(&mut self) -> Result<()>;
}
/// Savepoint operations of a boxed transaction are forwarded to the boxed
/// value.
#[apply(async_trait_maybe_send!)]
impl<T> IDatabaseTransactionOps for Box<T>
where
    T: IDatabaseTransactionOps + ?Sized,
{
    async fn set_tx_savepoint(&mut self) -> Result<()> {
        T::set_tx_savepoint(&mut **self).await
    }
    async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
        T::rollback_tx_to_savepoint(&mut **self).await
    }
}
/// Savepoint operations of a mutable reference are forwarded to the
/// referenced transaction.
#[apply(async_trait_maybe_send!)]
impl<T> IDatabaseTransactionOps for &mut T
where
    T: IDatabaseTransactionOps + ?Sized,
{
    async fn set_tx_savepoint(&mut self) -> Result<()> {
        T::set_tx_savepoint(&mut **self).await
    }
    async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
        T::rollback_tx_to_savepoint(&mut **self).await
    }
}
/// Typed counterpart of [`IDatabaseTransactionOpsCore`]: keys and values are
/// passed as Rust types instead of raw bytes.
///
/// The methods return plain values rather than `Result`s; the blanket
/// implementation in this file panics when the underlying raw operation or
/// the decoding fails.
#[apply(async_trait_maybe_send!)]
pub trait IDatabaseTransactionOpsCoreTyped<'a> {
    /// Returns the value stored under `key`, if any.
    async fn get_value<K>(&mut self, key: &K) -> Option<K::Value>
    where
        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync;
    /// Stores `value` under `key` and returns the decoded result of the raw
    /// insert (the previously stored value, if any).
    async fn insert_entry<K>(&mut self, key: &K, value: &K::Value) -> Option<K::Value>
    where
        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
        K::Value: MaybeSend + MaybeSync;
    /// Stores `value` under `key`, expecting that no value was stored there
    /// before. The blanket implementation logs a warning if there was one.
    async fn insert_new_entry<K>(&mut self, key: &K, value: &K::Value)
    where
        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
        K::Value: MaybeSend + MaybeSync;
    /// Returns a stream of the decoded `(key, value)` pairs found for
    /// `key_prefix`.
    async fn find_by_prefix<KP>(
        &mut self,
        key_prefix: &KP,
    ) -> Pin<
        Box<
            maybe_add_send!(
                dyn Stream<
                        Item = (
                            KP::Record,
                            <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
                        ),
                    > + '_
            ),
        >,
    >
    where
        KP: DatabaseLookup + MaybeSend + MaybeSync,
        KP::Record: DatabaseKey;
    /// Like `find_by_prefix`, but built on
    /// `raw_find_by_prefix_sorted_descending`.
    async fn find_by_prefix_sorted_descending<KP>(
        &mut self,
        key_prefix: &KP,
    ) -> Pin<
        Box<
            maybe_add_send!(
                dyn Stream<
                        Item = (
                            KP::Record,
                            <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
                        ),
                    > + '_
            ),
        >,
    >
    where
        KP: DatabaseLookup + MaybeSend + MaybeSync,
        KP::Record: DatabaseKey;
    /// Removes the entry stored under `key` and returns the decoded result
    /// of the raw removal (the removed value, if any).
    async fn remove_entry<K>(&mut self, key: &K) -> Option<K::Value>
    where
        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync;
    /// Removes the entries found for `key_prefix`.
    async fn remove_by_prefix<KP>(&mut self, key_prefix: &KP)
    where
        KP: DatabaseLookup + MaybeSend + MaybeSync;
}
/// Typed operations for everything that offers the raw operations and has
/// access to decoders.
///
/// Keys and values are converted to bytes for the raw call, and raw results
/// are decoded with `self.decoders()`. Failures of the raw operations and of
/// the decoding are treated as unrecoverable and cause a panic.
#[apply(async_trait_maybe_send!)]
impl<'a, T> IDatabaseTransactionOpsCoreTyped<'a> for T
where
    T: IDatabaseTransactionOpsCore + WithDecoders,
{
    /// # Panics
    ///
    /// Panics if reading the entry or decoding the stored value fails.
    async fn get_value<K>(&mut self, key: &K) -> Option<K::Value>
    where
        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
    {
        let raw = self
            .raw_get_bytes(&key.to_bytes())
            .await
            .expect("Unrecoverable error occurred while reading an entry from the database");
        raw.map(|value_bytes| {
            decode_value::<K::Value>(&value_bytes, self.decoders())
                .expect("Unrecoverable error when decoding the database value")
        })
    }

    /// # Panics
    ///
    /// Panics if inserting the entry or decoding the returned bytes fails.
    async fn insert_entry<K>(&mut self, key: &K, value: &K::Value) -> Option<K::Value>
    where
        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
        K::Value: MaybeSend + MaybeSync,
    {
        self.raw_insert_bytes(&key.to_bytes(), &value.to_bytes())
            .await
            .expect("Unrecoverable error occurred while inserting entry into the database")
            .map(|value_bytes| {
                decode_value::<K::Value>(&value_bytes, self.decoders())
                    .expect("Unrecoverable error when decoding the database value")
            })
    }

    /// Inserts like `insert_entry`, but logs a warning if the insert returned
    /// a previous value. The new value is stored either way.
    async fn insert_new_entry<K>(&mut self, key: &K, value: &K::Value)
    where
        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
        K::Value: MaybeSend + MaybeSync,
    {
        if let Some(prev) = self.insert_entry(key, value).await {
            // Keep the message on one source line: a string literal that
            // spans lines embeds the newline and the indentation in the log.
            warn!(
                target: LOG_DB,
                "Database overwriting element when expecting insertion of new entry. Key: {:?} Prev Value: {:?}",
                key,
                prev,
            );
        }
    }

    /// # Panics
    ///
    /// Panics if starting the search fails; the returned stream panics when a
    /// key or value cannot be decoded.
    async fn find_by_prefix<KP>(
        &mut self,
        key_prefix: &KP,
    ) -> Pin<
        Box<
            maybe_add_send!(
                dyn Stream<
                        Item = (
                            KP::Record,
                            <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
                        ),
                    > + '_
            ),
        >,
    >
    where
        KP: DatabaseLookup + MaybeSend + MaybeSync,
        KP::Record: DatabaseKey,
    {
        // Cloned up front so the stream owns its decoders and does not need
        // to borrow `self` a second time.
        let decoders = self.decoders().clone();
        Box::pin(
            self.raw_find_by_prefix(&key_prefix.to_bytes())
                .await
                .expect("Unrecoverable error occurred while listing entries from the database")
                .map(move |(key_bytes, value_bytes)| {
                    let key = KP::Record::from_bytes(&key_bytes, &decoders)
                        .with_context(|| anyhow::anyhow!("key: {}", AbbreviateHexBytes(&key_bytes)))
                        .expect("Unrecoverable error reading DatabaseKey");
                    let value = decode_value(&value_bytes, &decoders)
                        .with_context(|| anyhow::anyhow!("key: {}", AbbreviateHexBytes(&key_bytes)))
                        .expect("Unrecoverable decoding DatabaseValue");
                    (key, value)
                }),
        )
    }

    /// # Panics
    ///
    /// Panics if starting the search fails; the returned stream panics when a
    /// key or value cannot be decoded.
    async fn find_by_prefix_sorted_descending<KP>(
        &mut self,
        key_prefix: &KP,
    ) -> Pin<
        Box<
            maybe_add_send!(
                dyn Stream<
                        Item = (
                            KP::Record,
                            <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
                        ),
                    > + '_
            ),
        >,
    >
    where
        KP: DatabaseLookup + MaybeSend + MaybeSync,
        KP::Record: DatabaseKey,
    {
        // Cloned up front so the stream owns its decoders and does not need
        // to borrow `self` a second time.
        let decoders = self.decoders().clone();
        Box::pin(
            self.raw_find_by_prefix_sorted_descending(&key_prefix.to_bytes())
                .await
                .expect("Unrecoverable error occurred while listing entries from the database")
                .map(move |(key_bytes, value_bytes)| {
                    let key = KP::Record::from_bytes(&key_bytes, &decoders)
                        .with_context(|| anyhow::anyhow!("key: {}", AbbreviateHexBytes(&key_bytes)))
                        .expect("Unrecoverable error reading DatabaseKey");
                    let value = decode_value(&value_bytes, &decoders)
                        .with_context(|| anyhow::anyhow!("key: {}", AbbreviateHexBytes(&key_bytes)))
                        .expect("Unrecoverable decoding DatabaseValue");
                    (key, value)
                }),
        )
    }

    /// # Panics
    ///
    /// Panics if removing the entry or decoding the returned bytes fails.
    async fn remove_entry<K>(&mut self, key: &K) -> Option<K::Value>
    where
        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
    {
        self.raw_remove_entry(&key.to_bytes())
            .await
            .expect("Unrecoverable error occurred while removing entry from the database")
            .map(|value_bytes| {
                decode_value::<K::Value>(&value_bytes, self.decoders())
                    .expect("Unrecoverable error when decoding the database value")
            })
    }

    /// # Panics
    ///
    /// Panics if the raw removal fails.
    async fn remove_by_prefix<KP>(&mut self, key_prefix: &KP)
    where
        KP: DatabaseLookup + MaybeSend + MaybeSync,
    {
        self.raw_remove_by_prefix(&key_prefix.to_bytes())
            .await
            .expect("Unrecoverable error when removing entries from the database")
    }
}
/// Gives access to the decoder registry used for decoding keys and values.
pub trait WithDecoders {
    /// Returns the decoder registry.
    fn decoders(&self) -> &ModuleDecoderRegistry;
}
/// Transaction of a raw database backend.
#[apply(async_trait_maybe_send!)]
pub trait IRawDatabaseTransaction: MaybeSend + IDatabaseTransactionOps {
    /// Commits the transaction, consuming it.
    async fn commit_tx(self) -> Result<()>;
}
/// Transaction interface used behind `Box<dyn IDatabaseTransaction>`.
///
/// Unlike [`IRawDatabaseTransaction::commit_tx`], committing takes
/// `&mut self`.
#[apply(async_trait_maybe_send!)]
pub trait IDatabaseTransaction: MaybeSend + IDatabaseTransactionOps {
    /// Commits the transaction.
    async fn commit_tx(&mut self) -> Result<()>;
    /// Total length in bytes of the key prefix this transaction applies;
    /// `0` means no prefix.
    fn prefix_len(&self) -> usize;
}
/// A boxed transaction is itself a transaction; calls are forwarded to the
/// boxed value.
#[apply(async_trait_maybe_send!)]
impl<T> IDatabaseTransaction for Box<T>
where
    T: IDatabaseTransaction + ?Sized,
{
    async fn commit_tx(&mut self) -> Result<()> {
        T::commit_tx(&mut **self).await
    }
    fn prefix_len(&self) -> usize {
        T::prefix_len(&**self)
    }
}
/// A mutable reference to a transaction is itself a transaction; calls are
/// forwarded to the referenced value.
#[apply(async_trait_maybe_send!)]
impl<'a, T> IDatabaseTransaction for &'a mut T
where
    T: IDatabaseTransaction + ?Sized,
{
    async fn commit_tx(&mut self) -> Result<()> {
        T::commit_tx(&mut **self).await
    }
    fn prefix_len(&self) -> usize {
        T::prefix_len(&**self)
    }
}
/// Transaction of a `BaseDatabase`: wraps a raw transaction and collects the
/// keys to notify about when the transaction is committed.
struct BaseDatabaseTransaction<Tx> {
    /// The raw transaction; `None` once `commit_tx` has taken it.
    raw: Option<Tx>,
    /// Keys recorded by inserts and removals; `None` once it has been
    /// submitted by a commit.
    notify_queue: Option<NotifyQueue>,
    /// Registry the queue is submitted to after a successful commit.
    notifications: Arc<Notifications>,
}
impl<Tx> BaseDatabaseTransaction<Tx>
where
    Tx: IRawDatabaseTransaction,
{
    /// Wraps the raw transaction `dbtx`, starting with an empty notification
    /// queue.
    fn new(dbtx: Tx, notifications: Arc<Notifications>) -> BaseDatabaseTransaction<Tx> {
        Self {
            raw: Some(dbtx),
            notify_queue: Some(NotifyQueue::new()),
            notifications,
        }
    }

    /// Records `key` in the notification queue.
    ///
    /// # Errors
    ///
    /// Fails if the queue is no longer present.
    fn add_notification_key(&mut self, key: &[u8]) -> Result<()> {
        let queue = self
            .notify_queue
            .as_mut()
            .context("can not call add_notification_key after commit")?;
        queue.add(&key);
        Ok(())
    }
}
/// Raw operations are forwarded to the wrapped raw transaction; each of them
/// fails if the raw transaction has already been taken by a commit.
///
/// Single-key inserts and removals also record the key in the notification
/// queue; `raw_remove_by_prefix` records nothing.
#[apply(async_trait_maybe_send!)]
impl<Tx: IRawDatabaseTransaction> IDatabaseTransactionOpsCore for BaseDatabaseTransaction<Tx> {
    async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
        self.add_notification_key(key)?;
        let raw = self
            .raw
            .as_mut()
            .context("Cannot insert into already consumed transaction")?;
        raw.raw_insert_bytes(key, value).await
    }
    async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
        let raw = self
            .raw
            .as_mut()
            .context("Cannot retrieve from already consumed transaction")?;
        raw.raw_get_bytes(key).await
    }
    async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
        self.add_notification_key(key)?;
        let raw = self
            .raw
            .as_mut()
            .context("Cannot remove from already consumed transaction")?;
        raw.raw_remove_entry(key).await
    }
    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
        let raw = self
            .raw
            .as_mut()
            .context("Cannot retrieve from already consumed transaction")?;
        raw.raw_find_by_prefix(key_prefix).await
    }
    async fn raw_find_by_prefix_sorted_descending(
        &mut self,
        key_prefix: &[u8],
    ) -> Result<PrefixStream<'_>> {
        let raw = self
            .raw
            .as_mut()
            .context("Cannot retrieve from already consumed transaction")?;
        raw.raw_find_by_prefix_sorted_descending(key_prefix).await
    }
    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
        let raw = self
            .raw
            .as_mut()
            .context("Cannot remove from already consumed transaction")?;
        raw.raw_remove_by_prefix(key_prefix).await
    }
}
/// Savepoint operations are forwarded to the wrapped raw transaction; both
/// fail if the raw transaction has already been taken by a commit.
#[apply(async_trait_maybe_send!)]
impl<Tx: IRawDatabaseTransaction> IDatabaseTransactionOps for BaseDatabaseTransaction<Tx> {
    async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
        let raw = self
            .raw
            .as_mut()
            .context("Cannot rollback to a savepoint on an already consumed transaction")?;
        raw.rollback_tx_to_savepoint().await
    }
    async fn set_tx_savepoint(&mut self) -> Result<()> {
        let raw = self
            .raw
            .as_mut()
            .context("Cannot set a tx savepoint on an already consumed transaction")?;
        raw.set_tx_savepoint().await
    }
}
#[apply(async_trait_maybe_send!)]
impl<Tx: IRawDatabaseTransaction> IDatabaseTransaction for BaseDatabaseTransaction<Tx> {
    /// Commits the raw transaction and, if that succeeded, submits the
    /// collected notification keys.
    ///
    /// # Errors
    ///
    /// Fails if the raw transaction was already taken by an earlier commit,
    /// or if the raw commit itself fails.
    ///
    /// # Panics
    ///
    /// Panics if the notification queue is already gone although the raw
    /// transaction was still present.
    async fn commit_tx(&mut self) -> Result<()> {
        let raw_tx = self
            .raw
            .take()
            .context("Cannot commit an already committed transaction")?;
        raw_tx.commit_tx().await?;
        let queue = self
            .notify_queue
            .take()
            .expect("commit must be called only once");
        self.notifications.submit_queue(queue);
        Ok(())
    }
    /// The base transaction applies no prefix.
    fn prefix_len(&self) -> usize {
        0
    }
}
/// Bookkeeping used to report, on drop, transactions that wrote something
/// but were never committed.
#[derive(Clone)]
struct CommitTracker {
    /// Whether the transaction has been committed.
    is_committed: bool,
    /// Whether the transaction has performed any writes.
    has_writes: bool,
    /// When set, an uncommitted transaction with writes is only reported at
    /// debug level instead of as a warning.
    ignore_uncommitted: bool,
}
/// Reports transactions that had writes but were dropped without a commit.
impl Drop for CommitTracker {
    fn drop(&mut self) {
        let dropped_with_pending_writes = self.has_writes && !self.is_committed;
        if !dropped_with_pending_writes {
            return;
        }
        if !self.ignore_uncommitted {
            // Unexpected: include a backtrace to make the culprit findable.
            warn!(
                target: LOG_DB,
                location = ?backtrace::Backtrace::new(),
                "DatabaseTransaction has writes and has not called commit."
            );
            return;
        }
        debug!(
            target: LOG_DB,
            "DatabaseTransaction has writes and has not called commit, but that's expected."
        );
    }
}
/// Either an owned `T` or a mutable borrow of one; both variants dereference
/// to `T`.
enum MaybeRef<'a, T> {
    /// The value is owned.
    Owned(T),
    /// The value is mutably borrowed from somewhere else.
    Borrowed(&'a mut T),
}
impl<'a, T> ops::Deref for MaybeRef<'a, T> {
    type Target = T;
    fn deref(&self) -> &Self::Target {
        match self {
            MaybeRef::Owned(o) => o,
            MaybeRef::Borrowed(r) => r,
        }
    }
}
impl<'a, T> ops::DerefMut for MaybeRef<'a, T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        match self {
            MaybeRef::Owned(o) => o,
            MaybeRef::Borrowed(r) => r,
        }
    }
}
/// Marker type for the `Cap` parameter of [`DatabaseTransaction`]; it is the
/// capability of the transactions returned by `Database::begin_transaction`.
pub struct Committable;
/// Marker type for the `Cap` parameter of [`DatabaseTransaction`], and its
/// default; `DatabaseTransaction::into_nc` converts a transaction to this
/// capability.
pub struct NonCommittable;
/// A database transaction together with the decoders needed for typed
/// access. `Cap` is a type-level marker ([`Committable`] or
/// [`NonCommittable`]).
pub struct DatabaseTransaction<'tx, Cap = NonCommittable> {
    /// The underlying (possibly prefixed or borrowed) transaction.
    tx: Box<dyn IDatabaseTransaction + 'tx>,
    /// Decoders used when decoding keys and values.
    decoders: ModuleDecoderRegistry,
    /// Commit bookkeeping; borrowed when this value was created by `to_ref`.
    commit_tracker: MaybeRef<'tx, CommitTracker>,
    /// Closures of type `FnOnce()`.
    ///
    /// NOTE(review): by the field name these run on commit — the code doing
    /// so is not in the visible part of the file.
    on_commit_hooks: MaybeRef<'tx, Vec<Box<maybe_add_send!(dyn FnOnce())>>>,
    /// Zero-sized marker carrying `Cap`.
    capability: marker::PhantomData<Cap>,
}
/// Exposes the transaction's decoder registry, which makes the typed
/// operations available on it.
impl<'tx, Cap> WithDecoders for DatabaseTransaction<'tx, Cap> {
    fn decoders(&self) -> &ModuleDecoderRegistry {
        let registry = &self.decoders;
        registry
    }
}
/// Decodes a database value of type `V` from `value_bytes`, emitting a
/// trace-level event with the abbreviated bytes first.
///
/// # Errors
///
/// Returns the `DecodingError` produced by `V::from_bytes`.
#[instrument(level = "trace", skip_all, fields(value_type = std::any::type_name::<V>()), err)]
fn decode_value<V: DatabaseValue>(
    value_bytes: &[u8],
    decoders: &ModuleDecoderRegistry,
) -> Result<V, DecodingError> {
    trace!(bytes = %AbbreviateHexBytes(value_bytes), "decoding value");
    <V as DatabaseValue>::from_bytes(value_bytes, decoders)
}
impl<'tx, Cap> DatabaseTransaction<'tx, Cap> {
    pub fn into_nc(self) -> DatabaseTransaction<'tx, NonCommittable> {
        DatabaseTransaction {
            tx: self.tx,
            decoders: self.decoders,
            commit_tracker: self.commit_tracker,
            on_commit_hooks: self.on_commit_hooks,
            capability: PhantomData::<NonCommittable>,
        }
    }
    pub fn to_ref_nc<'s, 'a>(&'s mut self) -> DatabaseTransaction<'a, NonCommittable>
    where
        's: 'a,
    {
        self.to_ref().into_nc()
    }
    pub fn with_prefix<'a: 'tx>(self, prefix: Vec<u8>) -> DatabaseTransaction<'a, Cap>
    where
        'tx: 'a,
    {
        DatabaseTransaction {
            tx: Box::new(PrefixDatabaseTransaction {
                inner: self.tx,
                prefix,
            }),
            decoders: self.decoders,
            commit_tracker: self.commit_tracker,
            on_commit_hooks: self.on_commit_hooks,
            capability: self.capability,
        }
    }
    pub fn with_prefix_module_id<'a: 'tx>(
        self,
        module_instance_id: ModuleInstanceId,
    ) -> DatabaseTransaction<'a, Cap>
    where
        'tx: 'a,
    {
        let prefix = module_instance_id_to_byte_prefix(module_instance_id);
        self.with_prefix(prefix)
    }
    pub fn to_ref<'s, 'a>(&'s mut self) -> DatabaseTransaction<'a, Cap>
    where
        's: 'a,
    {
        let decoders = self.decoders.clone();
        DatabaseTransaction {
            tx: Box::new(&mut self.tx),
            decoders,
            commit_tracker: match self.commit_tracker {
                MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
                MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
            },
            on_commit_hooks: match self.on_commit_hooks {
                MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
                MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
            },
            capability: self.capability,
        }
    }
    pub fn to_ref_with_prefix<'a>(&'a mut self, prefix: Vec<u8>) -> DatabaseTransaction<'a, Cap>
    where
        'tx: 'a,
    {
        DatabaseTransaction {
            tx: Box::new(PrefixDatabaseTransaction {
                inner: &mut self.tx,
                prefix,
            }),
            decoders: self.decoders.clone(),
            commit_tracker: match self.commit_tracker {
                MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
                MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
            },
            on_commit_hooks: match self.on_commit_hooks {
                MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
                MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
            },
            capability: self.capability,
        }
    }
    pub fn to_ref_with_prefix_module_id<'a>(
        &'a mut self,
        module_instance_id: ModuleInstanceId,
    ) -> DatabaseTransaction<'a, Cap>
    where
        'tx: 'a,
    {
        let prefix = module_instance_id_to_byte_prefix(module_instance_id);
        self.to_ref_with_prefix(prefix)
    }
    pub fn is_global(&self) -> bool {
        self.tx.prefix_len() == 0
    }
    pub fn ensure_global(&self) -> Result<()> {
        if !self.is_global() {
            bail!("Database instance not global");
        }
        Ok(())
    }
    pub fn ensure_isolated(&self) -> Result<()> {
        if self.is_global() {
            bail!("Database instance not isolated");
        }
        Ok(())
    }
    pub fn ignore_uncommitted(&mut self) -> &mut Self {
        self.commit_tracker.ignore_uncommitted = true;
        self
    }
    pub fn warn_uncommitted(&mut self) -> &mut Self {
        self.commit_tracker.ignore_uncommitted = false;
        self
    }
    #[instrument(level = "debug", skip_all, ret)]
    pub fn on_commit(&mut self, f: maybe_add_send!(impl FnOnce() + 'static)) {
        self.on_commit_hooks.push(Box::new(f));
    }
}
impl<'tx> DatabaseTransaction<'tx, Committable> {
    /// Wraps `dbtx` into a committable transaction with a fresh commit
    /// tracker and no on-commit hooks.
    pub fn new(
        dbtx: Box<dyn IDatabaseTransaction + 'tx>,
        decoders: ModuleDecoderRegistry,
    ) -> DatabaseTransaction<'tx, Committable> {
        let fresh_tracker = CommitTracker {
            is_committed: false,
            has_writes: false,
            ignore_uncommitted: false,
        };
        Self {
            tx: dbtx,
            decoders,
            commit_tracker: MaybeRef::Owned(fresh_tracker),
            on_commit_hooks: MaybeRef::Owned(Vec::new()),
            capability: PhantomData,
        }
    }
    /// Commits the transaction and returns the outcome.
    ///
    /// The tracker is marked as committed before the commit is attempted.
    /// The registered on-commit hooks run only if the commit succeeded.
    pub async fn commit_tx_result(mut self) -> Result<()> {
        self.commit_tracker.is_committed = true;
        let outcome = self.tx.commit_tx().await;
        if outcome.is_ok() {
            self.on_commit_hooks
                .deref_mut()
                .drain(..)
                .for_each(|hook| hook());
        }
        outcome
    }
    /// Commits the transaction.
    ///
    /// # Panics
    ///
    /// Panics if the commit fails.
    pub async fn commit_tx(mut self) {
        self.commit_tracker.is_committed = true;
        let outcome = self.commit_tx_result().await;
        outcome.expect("Unrecoverable error occurred while committing to the database.");
    }
}
/// Raw byte-level operations, forwarded to the wrapped transaction.
///
/// Every mutating operation (`raw_insert_bytes`, `raw_remove_entry`,
/// `raw_remove_by_prefix`) marks the shared commit tracker as having writes,
/// so dropping the transaction without committing can be reported.
#[apply(async_trait_maybe_send!)]
impl<'a, Cap> IDatabaseTransactionOpsCore for DatabaseTransaction<'a, Cap>
where
    Cap: Send,
{
    /// Inserts `value` under `key`; counts as a write.
    async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
        self.commit_tracker.has_writes = true;
        self.tx.raw_insert_bytes(key, value).await
    }
    /// Reads the bytes stored under `key`.
    async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
        self.tx.raw_get_bytes(key).await
    }
    /// Removes the entry stored under `key`; counts as a write.
    async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
        // A removal mutates the database just like an insert, so it must be
        // tracked; otherwise a transaction consisting only of removals could
        // be dropped uncommitted without any report.
        self.commit_tracker.has_writes = true;
        self.tx.raw_remove_entry(key).await
    }
    /// Streams all entries whose key starts with `key_prefix`.
    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
        self.tx.raw_find_by_prefix(key_prefix).await
    }
    /// Same as `raw_find_by_prefix`, forwarded to the descending-order
    /// variant of the wrapped transaction.
    async fn raw_find_by_prefix_sorted_descending(
        &mut self,
        key_prefix: &[u8],
    ) -> Result<PrefixStream<'_>> {
        self.tx
            .raw_find_by_prefix_sorted_descending(key_prefix)
            .await
    }
    /// Removes all entries whose key starts with `key_prefix`; counts as a
    /// write.
    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
        self.commit_tracker.has_writes = true;
        self.tx.raw_remove_by_prefix(key_prefix).await
    }
}
/// Savepoint operations, implemented only for the [`Committable`] capability
/// and forwarded unchanged to the wrapped transaction.
#[apply(async_trait_maybe_send!)]
impl<'a> IDatabaseTransactionOps for DatabaseTransaction<'a, Committable> {
    /// Forwards to the wrapped transaction's `set_tx_savepoint`.
    async fn set_tx_savepoint(&mut self) -> Result<()> {
        self.tx.set_tx_savepoint().await
    }
    /// Forwards to the wrapped transaction's `rollback_tx_to_savepoint`.
    async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
        self.tx.rollback_tx_to_savepoint().await
    }
}
impl<T> DatabaseKeyPrefix for T
where
    T: DatabaseLookup + crate::encoding::Encodable + Debug,
{
    fn to_bytes(&self) -> Vec<u8> {
        let mut data = vec![<Self as DatabaseLookup>::Record::DB_PREFIX];
        self.consensus_encode(&mut data)
            .expect("Writing to vec is infallible");
        data
    }
}
impl<T> DatabaseKey for T
where
    T: DatabaseRecord + crate::encoding::Decodable + Sized,
{
    const NOTIFY_ON_MODIFY: bool = <T as DatabaseRecord>::NOTIFY_ON_MODIFY;
    /// Decodes a key from `[DB_PREFIX] ++ consensus encoding`.
    ///
    /// # Errors
    ///
    /// * `WrongLength` if `data` is empty,
    /// * `WrongPrefix` if the first byte is not `Self::DB_PREFIX`,
    /// * `Other` if consensus decoding of the remaining bytes fails.
    fn from_bytes(data: &[u8], modules: &ModuleDecoderRegistry) -> Result<Self, DecodingError> {
        let (found_prefix, payload) = match data.split_first() {
            Some((&prefix, rest)) => (prefix, rest),
            None => return Err(DecodingError::wrong_length(1, 0)),
        };
        if found_prefix != Self::DB_PREFIX {
            return Err(DecodingError::wrong_prefix(Self::DB_PREFIX, found_prefix));
        }
        let mut reader = std::io::Cursor::new(payload);
        <Self as crate::encoding::Decodable>::consensus_decode(&mut reader, modules)
            .map_err(|decode_error| DecodingError::Other(decode_error.0))
    }
}
impl<T> DatabaseValue for T
where
    T: Debug + Encodable + Decodable,
{
    fn from_bytes(data: &[u8], modules: &ModuleDecoderRegistry) -> Result<Self, DecodingError> {
        T::consensus_decode(&mut std::io::Cursor::new(data), modules)
            .map_err(|e| DecodingError::Other(e.0))
    }
    fn to_bytes(&self) -> Vec<u8> {
        let mut bytes = Vec::new();
        self.consensus_encode(&mut bytes)
            .expect("writing to vec can't fail");
        bytes
    }
}
/// Implements `DatabaseRecord` for a key type.
///
/// Arguments: `key = <type>`, `value = <type>`, `db_prefix = <expr>` (cast
/// to `u8`), and optionally `notify_on_modify = true|false`. When
/// `notify_on_modify = true` is given, `DatabaseKeyWithNotify` is
/// implemented for the key as well; when the argument is omitted the trait's
/// default for `NOTIFY_ON_MODIFY` applies.
#[macro_export]
macro_rules! impl_db_record {
    (key = $key:ty, value = $val:ty, db_prefix = $db_prefix:expr $(, notify_on_modify = $notify:tt)? $(,)?) => {
        impl $crate::db::DatabaseRecord for $key {
            const DB_PREFIX: u8 = $db_prefix as u8;
            $(const NOTIFY_ON_MODIFY: bool = $notify;)?
            type Key = Self;
            type Value = $val;
        }
        $(
            impl_db_record! {
                @impl_notify_marker key = $key, notify_on_modify = $notify
            }
        )?
    };
    // Internal arm: emit the marker impl only for `notify_on_modify = true`.
    (@impl_notify_marker key = $key:ty, notify_on_modify = true) => {
        impl $crate::db::DatabaseKeyWithNotify for $key {}
    };
    // Internal arm: nothing to emit for `notify_on_modify = false`.
    (@impl_notify_marker key = $key:ty, notify_on_modify = false) => {};
}
/// Implements `DatabaseLookup` (with `Record = key`) for every type passed
/// as `query_prefix = <type>`; any number of `query_prefix` arguments,
/// including none, is accepted.
#[macro_export]
macro_rules! impl_db_lookup{
    (key = $key:ty $(, query_prefix = $query_prefix:ty)* $(,)?) => {
        $(
            impl $crate::db::DatabaseLookup for $query_prefix {
                type Record = $key;
            }
        )*
    };
}
/// Key under which the database version is stored.
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct DatabaseVersionKey;
/// Version number of the database schema; ordered by its inner `u64`.
#[derive(Debug, Encodable, Decodable, Serialize, Clone, PartialOrd, Ord, PartialEq, Eq)]
pub struct DatabaseVersion(pub u64);
// Registers `DatabaseVersionKey -> DatabaseVersion` under the
// `DbKeyPrefix::DatabaseVersion` prefix.
impl_db_record!(
    key = DatabaseVersionKey,
    value = DatabaseVersion,
    db_prefix = DbKeyPrefix::DatabaseVersion
);
impl std::fmt::Display for DatabaseVersion {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "{}", self.0)
    }
}
impl DatabaseVersion {
    pub fn increment(&mut self) {
        self.0 += 1;
    }
}
impl std::fmt::Display for DbKeyPrefix {
    /// Displays the prefix using its `Debug` representation.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "{:?}", self)
    }
}
/// Key prefixes declared by this module; the discriminant is the prefix byte.
#[repr(u8)]
#[derive(Clone, EnumIter, Debug)]
pub enum DbKeyPrefix {
    /// Prefix of the `DatabaseVersionKey` record.
    DatabaseVersion = 0x50,
    // NOTE(review): no record using this prefix is visible in this part of
    // the file; presumably used by client backup records defined elsewhere.
    ClientBackup = 0x51,
}
/// Errors produced while decoding database keys or values.
#[derive(Debug, Error)]
pub enum DecodingError {
    /// The first byte of a key did not match the expected record prefix.
    #[error("Key had a wrong prefix, expected {expected} but got {found}")]
    WrongPrefix { expected: u8, found: u8 },
    /// The input had a different length than required.
    #[error("Key had a wrong length, expected {expected} but got {found}")]
    WrongLength { expected: usize, found: usize },
    /// Any other decoding failure, wrapped as an `anyhow::Error`.
    #[error("Other decoding error: {0}")]
    Other(anyhow::Error),
}
impl DecodingError {
    /// Wraps an arbitrary error into [`DecodingError::Other`].
    pub fn other<E: Error + Send + Sync + 'static>(error: E) -> DecodingError {
        Self::Other(error.into())
    }
    /// Builds a [`DecodingError::WrongPrefix`].
    pub fn wrong_prefix(expected: u8, found: u8) -> DecodingError {
        Self::WrongPrefix { expected, found }
    }
    /// Builds a [`DecodingError::WrongLength`].
    pub fn wrong_length(expected: usize, found: usize) -> DecodingError {
        Self::WrongLength { expected, found }
    }
}
/// Collects all entries found under `$prefix_type` into a
/// `BTreeMap<String, $value_type>` keyed by the hex consensus encoding of
/// each key, and inserts the boxed map into `$map` under `$key_literal`.
///
/// Must be expanded inside an `async` context. `BTreeMap` is referenced
/// unqualified, so it has to be in scope at the call site (presumably
/// together with the stream extension trait providing `map`/`collect` —
/// verify against callers). `$key_type` is accepted but not used by the
/// expansion.
#[macro_export]
macro_rules! push_db_pair_items {
    ($dbtx:ident, $prefix_type:expr, $key_type:ty, $value_type:ty, $map:ident, $key_literal:literal) => {
        let db_items =
            $crate::db::IDatabaseTransactionOpsCoreTyped::find_by_prefix($dbtx, &$prefix_type)
                .await
                .map(|(key, val)| {
                    (
                        $crate::encoding::Encodable::consensus_encode_to_hex(&key)
                            .expect("can't fail"),
                        val,
                    )
                })
                .collect::<BTreeMap<String, $value_type>>()
                .await;
        $map.insert($key_literal.to_string(), Box::new(db_items));
    };
}
/// Variant of `push_db_pair_items!` that wraps every value with
/// `SerdeWrapper::from_encodable` before collecting it into the map.
///
/// Must be expanded inside an `async` context. `BTreeMap` and `SerdeWrapper`
/// are referenced unqualified and therefore have to be in scope at the call
/// site. `$key_type` and `$value_type` are accepted but not used by the
/// expansion.
#[macro_export]
macro_rules! push_db_pair_items_no_serde {
    ($dbtx:ident, $prefix_type:expr, $key_type:ty, $value_type:ty, $map:ident, $key_literal:literal) => {
        let db_items =
            $crate::db::IDatabaseTransactionOpsCoreTyped::find_by_prefix($dbtx, &$prefix_type)
                .await
                .map(|(key, val)| {
                    (
                        $crate::encoding::Encodable::consensus_encode_to_hex(&key)
                            .expect("can't fail"),
                        SerdeWrapper::from_encodable(val),
                    )
                })
                .collect::<BTreeMap<_, _>>()
                .await;
        $map.insert($key_literal.to_string(), Box::new(db_items));
    };
}
/// Collects only the keys found under `$prefix_type` into a
/// `Vec<$key_type>` and inserts the boxed vector into `$map` under
/// `$key_literal`.
///
/// Must be expanded inside an `async` context.
#[macro_export]
macro_rules! push_db_key_items {
    ($dbtx:ident, $prefix_type:expr, $key_type:ty, $map:ident, $key_literal:literal) => {
        let db_items =
            $crate::db::IDatabaseTransactionOpsCoreTyped::find_by_prefix($dbtx, &$prefix_type)
                .await
                .map(|(key, _)| key)
                .collect::<Vec<$key_type>>()
                .await;
        $map.insert($key_literal.to_string(), Box::new(db_items));
    };
}
/// Maps a database version to the migration function that upgrades a
/// database *from* that version to the next one.
///
/// Each migration receives a (non-committable) transaction and returns a
/// boxed future resolving to `anyhow::Result<()>`.
pub type MigrationMap = BTreeMap<
    DatabaseVersion,
    for<'r, 'tx> fn(
        &'r mut DatabaseTransaction<'tx>,
    ) -> Pin<Box<dyn futures::Future<Output = anyhow::Result<()>> + Send + 'r>>,
>;
/// Brings the database up to `target_db_version` by running the registered
/// `migrations` one version at a time inside a single transaction.
///
/// * If no version is stored yet, `target_db_version` is written directly
///   and no migration runs.
/// * Otherwise the migration registered for the current version runs, the
///   version is incremented and stored, and this repeats until the target is
///   reached.
///
/// `kind` is only used in log and error messages.
///
/// # Errors
///
/// Returns an error if the stored version is higher than
/// `target_db_version`, if a migration fails, or if the final commit fails.
///
/// # Panics
///
/// Panics if no migration is registered for a version below the target.
pub async fn apply_migrations(
    db: &Database,
    kind: String,
    target_db_version: DatabaseVersion,
    migrations: MigrationMap,
) -> Result<(), anyhow::Error> {
    let mut dbtx = db.begin_transaction().await;
    let disk_version = dbtx.get_value(&DatabaseVersionKey).await;
    let db_version = if let Some(disk_version) = disk_version {
        let mut current_db_version = disk_version;
        if current_db_version > target_db_version {
            return Err(anyhow::anyhow!(format!(
                "On disk database version for module {kind} was higher than the code database version."
            )));
        }
        while current_db_version < target_db_version {
            if let Some(migration) = migrations.get(&current_db_version) {
                // Migrations only get a non-committable view; the commit
                // happens once, after all migrations succeeded.
                migration(&mut dbtx.to_ref_nc()).await?;
            } else {
                panic!("Missing migration for version {current_db_version}");
            }
            current_db_version.increment();
            dbtx.insert_entry(&DatabaseVersionKey, &current_db_version)
                .await;
        }
        current_db_version
    } else {
        // Fresh database: nothing to migrate, just record the target version.
        dbtx.insert_entry(&DatabaseVersionKey, &target_db_version)
            .await;
        target_db_version
    };
    dbtx.commit_tx_result().await?;
    info!(target: LOG_DB, "{} module db version: {}", kind, db_version);
    Ok(())
}
#[allow(unused_imports)]
mod test_utils {
    use std::time::Duration;
    use futures::{Future, FutureExt, StreamExt};
    use super::{
        apply_migrations, Database, DatabaseTransaction, DatabaseVersion, DatabaseVersionKey,
        MigrationMap,
    };
    use crate::core::ModuleKind;
    use crate::db::mem_impl::MemDatabase;
    use crate::db::{IDatabaseTransactionOps, IDatabaseTransactionOpsCoreTyped};
    use crate::encoding::{Decodable, Encodable};
    use crate::module::registry::ModuleDecoderRegistry;
    /// Awaits `fut` for at most 10 milliseconds; returns `Some(output)` if it
    /// finished in time and `None` if the timeout fired first.
    pub async fn future_returns_shortly<F: Future>(fut: F) -> Option<F::Output> {
        let deadline = Duration::from_millis(10);
        match crate::task::timeout(deadline, fut).await {
            Ok(output) => Some(output),
            Err(_) => None,
        }
    }
    /// Key prefixes used by the test records below; the discriminant is the
    /// prefix byte.
    #[repr(u8)]
    #[derive(Clone)]
    pub enum TestDbKeyPrefix {
        Test = 0x42,
        AltTest = 0x43,
        // 0x25 is the ASCII code of '%'.
        PercentTestKey = 0x25,
    }
    /// Primary test key, stored under `TestDbKeyPrefix::Test`.
    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestKey(pub u64);
    /// Prefix lookup matching all `TestKey` records.
    #[derive(Debug, Encodable, Decodable)]
    struct DbPrefixTestPrefix;
    // `TestKey -> TestVal`, with modification notifications enabled.
    impl_db_record!(
        key = TestKey,
        value = TestVal,
        db_prefix = TestDbKeyPrefix::Test,
        notify_on_modify = true,
    );
    impl_db_lookup!(key = TestKey, query_prefix = DbPrefixTestPrefix);
    /// Two-field key layout that shares the `TestDbKeyPrefix::Test` prefix
    /// with `TestKey`; used by the migration test.
    #[derive(Debug, Encodable, Decodable)]
    struct TestKeyV0(u64, u64);
    /// Prefix lookup matching all `TestKeyV0` records.
    #[derive(Debug, Encodable, Decodable)]
    struct DbPrefixTestPrefixV0;
    impl_db_record!(
        key = TestKeyV0,
        value = TestVal,
        db_prefix = TestDbKeyPrefix::Test,
    );
    impl_db_lookup!(key = TestKeyV0, query_prefix = DbPrefixTestPrefixV0);
    /// Second test key, stored under the separate `TestDbKeyPrefix::AltTest`
    /// prefix.
    #[derive(Debug, Eq, PartialEq, PartialOrd, Ord, Encodable, Decodable)]
    struct AltTestKey(u64);
    /// Prefix lookup matching all `AltTestKey` records.
    #[derive(Debug, Encodable, Decodable)]
    struct AltDbPrefixTestPrefix;
    impl_db_record!(
        key = AltTestKey,
        value = TestVal,
        db_prefix = TestDbKeyPrefix::AltTest,
    );
    impl_db_lookup!(key = AltTestKey, query_prefix = AltDbPrefixTestPrefix);
    /// Test key stored under `TestDbKeyPrefix::PercentTestKey` (byte `0x25`).
    #[derive(Debug, Encodable, Decodable)]
    struct PercentTestKey(u64);
    /// Prefix lookup matching all `PercentTestKey` records.
    #[derive(Debug, Encodable, Decodable)]
    struct PercentPrefixTestPrefix;
    impl_db_record!(
        key = PercentTestKey,
        value = TestVal,
        db_prefix = TestDbKeyPrefix::PercentTestKey,
    );
    impl_db_lookup!(key = PercentTestKey, query_prefix = PercentPrefixTestPrefix);
    /// Value type shared by all test records.
    #[derive(Debug, Encodable, Decodable, Eq, PartialEq, PartialOrd, Ord)]
    pub(super) struct TestVal(pub u64);
    // Module instance ids used by `verify_module_prefix` to derive two
    // distinct key prefixes.
    const TEST_MODULE_PREFIX: u16 = 1;
    const ALT_MODULE_PREFIX: u16 = 2;
    /// Checks that inserts return the previous value (if any) and that
    /// committed values are visible to later transactions.
    pub async fn verify_insert_elements(db: Database) {
        // First insert: no previous value exists.
        let mut tx = db.begin_transaction().await;
        for (key, value) in [(1, 2), (2, 3)] {
            assert!(tx
                .insert_entry(&TestKey(key), &TestVal(value))
                .await
                .is_none());
        }
        tx.commit_tx().await;
        // Committed values are readable.
        let mut tx = db.begin_transaction().await;
        for (key, stored) in [(1, 2), (2, 3)] {
            assert_eq!(tx.get_value(&TestKey(key)).await, Some(TestVal(stored)));
        }
        tx.commit_tx().await;
        // Overwriting returns the old value.
        let mut tx = db.begin_transaction().await;
        for (key, replacement, previous) in [(1, 4, 2), (2, 5, 3)] {
            assert_eq!(
                tx.insert_entry(&TestKey(key), &TestVal(replacement)).await,
                Some(TestVal(previous))
            );
        }
        tx.commit_tx().await;
        // The replacements are what is stored now.
        let mut tx = db.begin_transaction().await;
        for (key, stored) in [(1, 4), (2, 5)] {
            assert_eq!(tx.get_value(&TestKey(key)).await, Some(TestVal(stored)));
        }
        tx.commit_tx().await;
    }
    /// Removing a key that was never inserted must yield `None`.
    pub async fn verify_remove_nonexisting(db: Database) {
        let mut tx = db.begin_transaction().await;
        let absent_key = TestKey(1);
        assert_eq!(tx.get_value(&absent_key).await, None);
        assert!(tx.remove_entry(&absent_key).await.is_none());
        tx.commit_tx().await;
    }
    /// Removing an existing key returns its value and makes it unreadable
    /// within the same transaction.
    pub async fn verify_remove_existing(db: Database) {
        let mut tx = db.begin_transaction().await;
        let key = TestKey(1);
        assert!(tx.insert_entry(&key, &TestVal(2)).await.is_none());
        assert_eq!(tx.get_value(&key).await, Some(TestVal(2)));
        let previous = tx.remove_entry(&key).await;
        assert_eq!(previous, Some(TestVal(2)));
        assert_eq!(tx.get_value(&key).await, None);
        tx.commit_tx().await;
    }
    /// A transaction must see its own uncommitted writes.
    pub async fn verify_read_own_writes(db: Database) {
        let mut tx = db.begin_transaction().await;
        let key = TestKey(1);
        assert!(tx.insert_entry(&key, &TestVal(2)).await.is_none());
        assert_eq!(tx.get_value(&key).await, Some(TestVal(2)));
        tx.commit_tx().await;
    }
    /// Uncommitted writes of one transaction must not be visible to another.
    pub async fn verify_prevent_dirty_reads(db: Database) {
        let mut writer = db.begin_transaction().await;
        assert!(writer
            .insert_entry(&TestKey(1), &TestVal(2))
            .await
            .is_none());
        // Started while the write above is still uncommitted.
        let mut reader = db.begin_transaction().await;
        assert_eq!(reader.get_value(&TestKey(1)).await, None);
        writer.commit_tx().await;
    }
    /// Prefix scans must return exactly the records of the queried prefix,
    /// and the descending variant must return them in reverse key order.
    pub async fn verify_find_by_prefix(db: Database) {
        let mut seed_tx = db.begin_transaction().await;
        seed_tx.insert_entry(&TestKey(55), &TestVal(9999)).await;
        seed_tx.insert_entry(&TestKey(54), &TestVal(8888)).await;
        seed_tx.insert_entry(&AltTestKey(55), &TestVal(7777)).await;
        seed_tx.insert_entry(&AltTestKey(54), &TestVal(6666)).await;
        seed_tx.commit_tx().await;
        let mut read_tx = db.begin_transaction().await;
        // Primary prefix.
        let mut ascending = read_tx
            .find_by_prefix(&DbPrefixTestPrefix)
            .await
            .collect::<Vec<_>>()
            .await;
        ascending.sort();
        let expected_ascending = vec![(TestKey(54), TestVal(8888)), (TestKey(55), TestVal(9999))];
        assert_eq!(ascending, expected_ascending);
        let descending = read_tx
            .find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
            .await
            .collect::<Vec<_>>()
            .await;
        let expected_descending: Vec<_> = expected_ascending.into_iter().rev().collect();
        assert_eq!(descending, expected_descending);
        // Alternative prefix.
        let mut alt_ascending = read_tx
            .find_by_prefix(&AltDbPrefixTestPrefix)
            .await
            .collect::<Vec<_>>()
            .await;
        alt_ascending.sort();
        let expected_alt_ascending = vec![
            (AltTestKey(54), TestVal(6666)),
            (AltTestKey(55), TestVal(7777)),
        ];
        assert_eq!(alt_ascending, expected_alt_ascending);
        let alt_descending = read_tx
            .find_by_prefix_sorted_descending(&AltDbPrefixTestPrefix)
            .await
            .collect::<Vec<_>>()
            .await;
        let expected_alt_descending: Vec<_> = expected_alt_ascending.into_iter().rev().collect();
        assert_eq!(alt_descending, expected_alt_descending);
    }
    /// A committed write must be visible to a transaction started afterwards.
    pub async fn verify_commit(db: Database) {
        let mut writer = db.begin_transaction().await;
        assert!(writer
            .insert_entry(&TestKey(1), &TestVal(2))
            .await
            .is_none());
        writer.commit_tx().await;
        let mut reader = db.begin_transaction().await;
        assert_eq!(reader.get_value(&TestKey(1)).await, Some(TestVal(2)));
    }
    /// Writes made after a savepoint must disappear on rollback, while
    /// writes made before it must survive.
    pub async fn verify_rollback_to_savepoint(db: Database) {
        let mut dbtx_rollback = db.begin_transaction().await;
        // Written before the savepoint: must survive the rollback.
        dbtx_rollback
            .insert_entry(&TestKey(20), &TestVal(2000))
            .await;
        dbtx_rollback
            .set_tx_savepoint()
            .await
            .expect("Error setting transaction savepoint");
        // Written after the savepoint: must be discarded by the rollback.
        dbtx_rollback
            .insert_entry(&TestKey(21), &TestVal(2001))
            .await;
        assert_eq!(
            dbtx_rollback.get_value(&TestKey(20)).await,
            Some(TestVal(2000))
        );
        assert_eq!(
            dbtx_rollback.get_value(&TestKey(21)).await,
            Some(TestVal(2001))
        );
        dbtx_rollback
            .rollback_tx_to_savepoint()
            .await
            .expect("Error rolling back to transaction savepoint");
        assert_eq!(
            dbtx_rollback.get_value(&TestKey(20)).await,
            Some(TestVal(2000))
        );
        assert_eq!(dbtx_rollback.get_value(&TestKey(21)).await, None);
        dbtx_rollback.commit_tx().await;
    }
    /// A transaction must keep seeing the state from before a concurrent
    /// transaction's write, even after that write is committed.
    pub async fn verify_prevent_nonrepeatable_reads(db: Database) {
        let mut reader = db.begin_transaction().await;
        assert_eq!(reader.get_value(&TestKey(100)).await, None);
        let mut writer = db.begin_transaction().await;
        writer.insert_entry(&TestKey(100), &TestVal(101)).await;
        // Not visible while uncommitted ...
        assert_eq!(reader.get_value(&TestKey(100)).await, None);
        writer.commit_tx().await;
        // ... and still not visible after the concurrent commit.
        assert_eq!(reader.get_value(&TestKey(100)).await, None);
        let expected_keys = 0;
        let returned_keys = reader
            .find_by_prefix(&DbPrefixTestPrefix)
            .await
            .fold(0, |seen, (key, value)| async move {
                if matches!(key, TestKey(100)) {
                    assert!(value.eq(&TestVal(101)));
                }
                seen + 1
            })
            .await;
        assert_eq!(returned_keys, expected_keys);
    }
    /// A prefix scan repeated inside one transaction must not pick up
    /// entries committed by another transaction in between.
    pub async fn verify_phantom_entry(db: Database) {
        let mut seed_tx = db.begin_transaction().await;
        seed_tx.insert_entry(&TestKey(100), &TestVal(101)).await;
        seed_tx.insert_entry(&TestKey(101), &TestVal(102)).await;
        seed_tx.commit_tx().await;
        let mut reader = db.begin_transaction().await;
        let expected_keys = 2;
        let first_scan = reader
            .find_by_prefix(&DbPrefixTestPrefix)
            .await
            .fold(0, |seen, (key, value)| async move {
                if key == TestKey(100) {
                    assert!(value.eq(&TestVal(101)));
                } else if key == TestKey(101) {
                    assert!(value.eq(&TestVal(102)));
                }
                seen + 1
            })
            .await;
        assert_eq!(first_scan, expected_keys);
        // A concurrent transaction commits an additional matching entry.
        let mut writer = db.begin_transaction().await;
        writer.insert_entry(&TestKey(102), &TestVal(103)).await;
        writer.commit_tx().await;
        let second_scan = reader
            .find_by_prefix(&DbPrefixTestPrefix)
            .await
            .fold(0, |seen, (key, value)| async move {
                if key == TestKey(100) {
                    assert!(value.eq(&TestVal(101)));
                } else if key == TestKey(101) {
                    assert!(value.eq(&TestVal(102)));
                }
                seen + 1
            })
            .await;
        assert_eq!(second_scan, expected_keys);
    }
    /// Two transactions writing the same key concurrently: the first commit
    /// succeeds, the second commit must return an error.
    pub async fn expect_write_conflict(db: Database) {
        let contested_key = TestKey(100);
        let mut seed_tx = db.begin_transaction().await;
        seed_tx.insert_entry(&contested_key, &TestVal(101)).await;
        seed_tx.commit_tx().await;
        let mut first_writer = db.begin_transaction().await;
        let mut second_writer = db.begin_transaction().await;
        first_writer
            .insert_entry(&contested_key, &TestVal(102))
            .await;
        second_writer
            .insert_entry(&contested_key, &TestVal(103))
            .await;
        first_writer.commit_tx().await;
        let conflicting_commit = second_writer.commit_tx_result().await;
        conflicting_commit.expect_err("Expecting an error to be returned because this transaction is in a write-write conflict with dbtx");
    }
    /// Exercises prefix scans for records whose prefix byte is `0x25` ('%').
    ///
    /// NOTE(review): only two distinct `PercentTestKey`s are inserted here
    /// (100 once, 101 three times) yet 4 results are expected — confirm
    /// against the backends this helper is run on.
    pub async fn verify_string_prefix(db: Database) {
        let mut tx = db.begin_transaction().await;
        tx.insert_entry(&PercentTestKey(100), &TestVal(101)).await;
        assert_eq!(
            tx.get_value(&PercentTestKey(100)).await,
            Some(TestVal(101))
        );
        for _ in 0..3 {
            tx.insert_entry(&PercentTestKey(101), &TestVal(100)).await;
        }
        // Record under a different prefix; must not show up in the scan below.
        tx.insert_entry(&TestKey(101), &TestVal(100)).await;
        let expected_keys = 4;
        let returned_keys = tx
            .find_by_prefix(&PercentPrefixTestPrefix)
            .await
            .fold(0, |seen, (key, value)| async move {
                if matches!(key, PercentTestKey(101)) {
                    assert!(value.eq(&TestVal(100)));
                }
                seen + 1
            })
            .await;
        assert_eq!(returned_keys, expected_keys);
    }
    /// After `remove_by_prefix` is committed, a scan of that prefix must be
    /// empty.
    pub async fn verify_remove_by_prefix(db: Database) {
        let mut seed_tx = db.begin_transaction().await;
        seed_tx.insert_entry(&TestKey(100), &TestVal(101)).await;
        seed_tx.insert_entry(&TestKey(101), &TestVal(102)).await;
        seed_tx.commit_tx().await;
        let mut remove_tx = db.begin_transaction().await;
        remove_tx.remove_by_prefix(&DbPrefixTestPrefix).await;
        remove_tx.commit_tx().await;
        let mut check_tx = db.begin_transaction().await;
        let expected_keys = 0;
        let returned_keys = check_tx
            .find_by_prefix(&DbPrefixTestPrefix)
            .await
            .fold(0, |seen, (key, value)| async move {
                if key == TestKey(100) {
                    assert!(value.eq(&TestVal(101)));
                } else if key == TestKey(101) {
                    assert!(value.eq(&TestVal(102)));
                }
                seen + 1
            })
            .await;
        assert_eq!(returned_keys, expected_keys);
    }
    /// Data written through `db` and through `module_db` must not be visible
    /// to, or affected by, the other database handle.
    pub async fn verify_module_db(db: Database, module_db: Database) {
        let mut seed_tx = db.begin_transaction().await;
        seed_tx.insert_entry(&TestKey(100), &TestVal(101)).await;
        seed_tx.insert_entry(&TestKey(101), &TestVal(102)).await;
        seed_tx.commit_tx().await;
        // The module database does not see entries written through `db`.
        let mut module_probe_tx = module_db.begin_transaction().await;
        assert_eq!(module_probe_tx.get_value(&TestKey(100)).await, None);
        assert_eq!(module_probe_tx.get_value(&TestKey(101)).await, None);
        let mut probe_tx = db.begin_transaction().await;
        assert_eq!(probe_tx.get_value(&TestKey(100)).await, Some(TestVal(101)));
        assert_eq!(probe_tx.get_value(&TestKey(101)).await, Some(TestVal(102)));
        // Same keys, different values, written through the module database.
        let mut module_write_tx = module_db.begin_transaction().await;
        module_write_tx
            .insert_entry(&TestKey(100), &TestVal(103))
            .await;
        module_write_tx
            .insert_entry(&TestKey(101), &TestVal(104))
            .await;
        module_write_tx.commit_tx().await;
        let expected_keys = 2;
        let mut scan_tx = db.begin_transaction().await;
        let returned_keys = scan_tx
            .find_by_prefix(&DbPrefixTestPrefix)
            .await
            .fold(0, |seen, (key, value)| async move {
                if key == TestKey(100) {
                    assert!(value.eq(&TestVal(101)));
                } else if key == TestKey(101) {
                    assert!(value.eq(&TestVal(102)));
                }
                seen + 1
            })
            .await;
        assert_eq!(returned_keys, expected_keys);
        // Removing through `db` leaves the module database's entry intact.
        let previous = scan_tx.remove_entry(&TestKey(100)).await;
        assert_eq!(previous, Some(TestVal(101)));
        assert_eq!(scan_tx.get_value(&TestKey(100)).await, None);
        let mut module_check_tx = module_db.begin_transaction().await;
        assert_eq!(
            module_check_tx.get_value(&TestKey(100)).await,
            Some(TestVal(103))
        );
    }
    /// Entries written through handles with different module-id prefixes
    /// must be isolated from each other and from the un-prefixed view.
    pub async fn verify_module_prefix(db: Database) {
        // Populate the first module's key space.
        let mut first_tx = db.begin_transaction().await;
        {
            let mut first_module_tx = first_tx.to_ref_with_prefix_module_id(TEST_MODULE_PREFIX);
            first_module_tx
                .insert_entry(&TestKey(100), &TestVal(101))
                .await;
            first_module_tx
                .insert_entry(&TestKey(101), &TestVal(102))
                .await;
        }
        first_tx.commit_tx().await;
        // Populate the second module's key space with the same keys.
        let mut second_tx = db.begin_transaction().await;
        {
            let mut second_module_tx = second_tx.to_ref_with_prefix_module_id(ALT_MODULE_PREFIX);
            second_module_tx
                .insert_entry(&TestKey(100), &TestVal(103))
                .await;
            second_module_tx
                .insert_entry(&TestKey(101), &TestVal(104))
                .await;
        }
        second_tx.commit_tx().await;
        // The first module still sees only its own values.
        let mut check_tx = db.begin_transaction().await;
        let mut check_module_tx = check_tx.to_ref_with_prefix_module_id(TEST_MODULE_PREFIX);
        assert_eq!(
            check_module_tx.get_value(&TestKey(100)).await,
            Some(TestVal(101))
        );
        assert_eq!(
            check_module_tx.get_value(&TestKey(101)).await,
            Some(TestVal(102))
        );
        let expected_keys = 2;
        let returned_keys = check_module_tx
            .find_by_prefix(&DbPrefixTestPrefix)
            .await
            .fold(0, |seen, (key, value)| async move {
                if key == TestKey(100) {
                    assert!(value.eq(&TestVal(101)));
                } else if key == TestKey(101) {
                    assert!(value.eq(&TestVal(102)));
                }
                seen + 1
            })
            .await;
        assert_eq!(returned_keys, expected_keys);
        let previous = check_module_tx.remove_entry(&TestKey(100)).await;
        assert_eq!(previous, Some(TestVal(101)));
        assert_eq!(check_module_tx.get_value(&TestKey(100)).await, None);
        // The un-prefixed view does not contain the module's keys.
        let mut global_tx = db.begin_transaction().await;
        assert_eq!(global_tx.get_value(&TestKey(101)).await, None);
        global_tx.commit_tx().await;
    }
    /// Runs a single version-0 migration against an in-memory database and
    /// checks the migrated entries.
    ///
    /// The database is seeded with 100 `TestKeyV0(i, i + 1) -> TestVal(i)`
    /// entries plus `DatabaseVersion(0)`. After `apply_migrations` has run
    /// `migrate_test_db_version_0`, the `DbPrefixTestPrefix` scan must return
    /// the same number of entries, each satisfying `key.0 == val.0 + 1`.
    #[cfg(test)]
    #[tokio::test]
    pub async fn verify_test_migration() {
        let db = Database::new(MemDatabase::new(), ModuleDecoderRegistry::default());
        let seeded_entries: usize = 100;

        // Seed the legacy (V0) layout together with the version marker.
        let mut seed_tx = db.begin_transaction().await;
        for idx in 0..seeded_entries {
            let n = idx as u64;
            seed_tx
                .insert_new_entry(&TestKeyV0(n, n + 1), &TestVal(n))
                .await;
        }
        seed_tx
            .insert_new_entry(&DatabaseVersionKey, &DatabaseVersion(0))
            .await;
        seed_tx.commit_tx().await;

        // Register the only migration step: version 0 -> version 1.
        let mut migrations = MigrationMap::new();
        migrations.insert(DatabaseVersion(0), |dbtx| {
            migrate_test_db_version_0(dbtx).boxed()
        });
        let target_version = DatabaseVersion(1);
        apply_migrations(&db, String::from("TestModule"), target_version, migrations)
            .await
            .expect("Error applying migrations for TestModule");

        // Every seeded entry must now be present under the new key layout.
        let mut check_tx = db.begin_transaction().await;
        let migrated = check_tx
            .find_by_prefix(&DbPrefixTestPrefix)
            .await
            .collect::<Vec<_>>()
            .await;
        assert_eq!(migrated.len(), seeded_entries);
        for (key, val) in migrated {
            assert_eq!(key.0, val.0 + 1);
        }
    }
    #[allow(dead_code)]
    async fn migrate_test_db_version_0<'a, 'b>(
        dbtx: &'b mut DatabaseTransaction<'a>,
    ) -> Result<(), anyhow::Error> {
        let example_keys_v0 = dbtx
            .find_by_prefix(&DbPrefixTestPrefixV0)
            .await
            .collect::<Vec<_>>()
            .await;
        dbtx.remove_by_prefix(&DbPrefixTestPrefixV0).await;
        for (key, val) in example_keys_v0 {
            let key_v2 = TestKey(key.1);
            dbtx.insert_new_entry(&key_v2, &val).await;
        }
        Ok(())
    }
    /// Checks that `Database::autocommit` gives up after the requested number
    /// of attempts when committing never succeeds.
    ///
    /// A stub database hands out transactions whose `commit_tx` always
    /// returns an error. With `Some(5)` as the attempt limit, `autocommit`
    /// must fail with `AutocommitError::CommitFailed` reporting 5 attempts.
    #[cfg(test)]
    #[tokio::test]
    async fn test_autocommit() {
        use std::marker::PhantomData;

        use anyhow::anyhow;
        use async_trait::async_trait;

        use crate::db::{
            AutocommitError, BaseDatabaseTransaction, IDatabaseTransaction,
            IDatabaseTransactionOps, IDatabaseTransactionOpsCore, IRawDatabase,
            IRawDatabaseTransaction,
        };
        use crate::ModuleDecoderRegistry;

        /// Stub database that only knows how to create `FakeTransaction`s.
        #[derive(Debug)]
        struct FakeDatabase;

        /// Stub transaction: committing always fails, and every other
        /// operation is `unimplemented!()`.
        #[derive(Debug)]
        struct FakeTransaction<'a>(PhantomData<&'a ()>);

        #[async_trait]
        impl IRawDatabase for FakeDatabase {
            type Transaction<'a> = FakeTransaction<'a>;

            async fn begin_transaction(&self) -> FakeTransaction {
                FakeTransaction(PhantomData)
            }
        }

        #[async_trait]
        impl<'a> IRawDatabaseTransaction for FakeTransaction<'a> {
            // The one behavior this test relies on: every commit fails.
            async fn commit_tx(self) -> anyhow::Result<()> {
                Err(anyhow!("Can't commit!"))
            }
        }

        #[async_trait]
        impl<'a> IDatabaseTransactionOps for FakeTransaction<'a> {
            async fn rollback_tx_to_savepoint(&mut self) -> anyhow::Result<()> {
                unimplemented!()
            }

            async fn set_tx_savepoint(&mut self) -> anyhow::Result<()> {
                unimplemented!()
            }
        }

        #[async_trait]
        impl<'a> IDatabaseTransactionOpsCore for FakeTransaction<'a> {
            async fn raw_insert_bytes(
                &mut self,
                _key: &[u8],
                _value: &[u8],
            ) -> anyhow::Result<Option<Vec<u8>>> {
                unimplemented!()
            }

            async fn raw_get_bytes(&mut self, _key: &[u8]) -> anyhow::Result<Option<Vec<u8>>> {
                unimplemented!()
            }

            async fn raw_remove_entry(&mut self, _key: &[u8]) -> anyhow::Result<Option<Vec<u8>>> {
                unimplemented!()
            }

            async fn raw_find_by_prefix(
                &mut self,
                _key_prefix: &[u8],
            ) -> anyhow::Result<crate::db::PrefixStream<'_>> {
                unimplemented!()
            }

            async fn raw_remove_by_prefix(&mut self, _key_prefix: &[u8]) -> anyhow::Result<()> {
                unimplemented!()
            }

            async fn raw_find_by_prefix_sorted_descending(
                &mut self,
                _key_prefix: &[u8],
            ) -> anyhow::Result<crate::db::PrefixStream<'_>> {
                unimplemented!()
            }
        }

        let db = Database::new(FakeDatabase, ModuleDecoderRegistry::default());

        // The closure itself succeeds, so the only possible failure is the
        // commit.
        let failure = db
            .autocommit::<_, _, ()>(|_dbtx, _| Box::pin(async { Ok(()) }), Some(5))
            .await
            .unwrap_err();

        let AutocommitError::CommitFailed { attempts, .. } = failure else {
            panic!("Closure did not return error")
        };
        assert_eq!(attempts, 5);
    }
}
/// Streams the decoded `(key, value)` pairs produced by the transaction's
/// `raw_find_by_prefix_sorted_descending` for the bytes of `key_prefix`.
///
/// Keys are decoded with `DatabaseKey::from_bytes` and values with
/// `decode_value`, both using `decoders`.
///
/// # Panics
///
/// Panics if the raw prefix search returns an error. The returned stream
/// panics when an entry's key or value cannot be decoded; the panic carries
/// the abbreviated hex of the offending key as context.
pub async fn find_by_prefix_sorted_descending<'r, 'inner, KP>(
    tx: &'r mut (dyn IDatabaseTransaction + 'inner),
    decoders: ModuleDecoderRegistry,
    key_prefix: &KP,
) -> impl Stream<
    Item = (
        KP::Record,
        <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
    ),
> + 'r
where
    'inner: 'r,
    KP: DatabaseLookup,
    KP::Record: DatabaseKey,
{
    debug!("find by prefix sorted descending");

    let prefix_bytes = key_prefix.to_bytes();
    let raw_entries = tx
        .raw_find_by_prefix_sorted_descending(&prefix_bytes)
        .await
        .expect("Error doing prefix search in database");

    raw_entries.map(move |(raw_key, raw_value)| {
        // Shared by both decode steps so either failure names the key.
        let key_context = || anyhow::anyhow!("key: {}", AbbreviateHexBytes(&raw_key));

        let decoded_key = <KP::Record as DatabaseKey>::from_bytes(&raw_key, &decoders)
            .with_context(key_context)
            .expect("Unrecoverable error reading DatabaseKey");
        let decoded_value = decode_value(&raw_value, &decoders)
            .with_context(key_context)
            .expect("Unrecoverable decoding DatabaseValue");

        (decoded_key, decoded_value)
    })
}
/// Tests for `Database::wait_key_exists`, covering the different orderings of
/// "start waiting" relative to beginning a transaction, inserting and
/// committing, as well as module-prefixed databases and transactions.
#[cfg(test)]
mod tests {
    use tokio::sync::oneshot;

    use super::mem_impl::MemDatabase;
    use super::*;
    use crate::task::spawn;

    /// Spawns a task that waits for `key` via `wait_key_exists`.
    ///
    /// This function only returns once the spawned task has signalled,
    /// through a oneshot channel, that it has created the `wait_key_exists`
    /// future (the signal is sent before that future is first awaited).
    async fn waiter(db: &Database, key: TestKey) -> tokio::task::JoinHandle<TestVal> {
        let db = db.clone();
        let (created_tx, created_rx) = oneshot::channel::<()>();

        let handle = spawn("wait key exists", async move {
            let pending = db.wait_key_exists(&key);
            created_tx.send(()).unwrap();
            pending.await
        })
        .expect("some handle on non-wasm");

        created_rx.await.unwrap();
        handle
    }

    /// Awaits the waiter task through `future_returns_shortly` and returns
    /// whatever that helper reports.
    async fn waiter_outcome(key_task: tokio::task::JoinHandle<TestVal>) -> Option<TestVal> {
        future_returns_shortly(async { key_task.await.unwrap() }).await
    }

    /// Waiter is started before the writing transaction begins.
    #[tokio::test]
    async fn test_wait_key_before_transaction() {
        let db = MemDatabase::new().into_database();

        let key_task = waiter(&db, TestKey(1)).await;

        let mut dbtx = db.begin_transaction().await;
        dbtx.insert_new_entry(&TestKey(1), &TestVal(2)).await;
        dbtx.commit_tx().await;

        assert_eq!(
            waiter_outcome(key_task).await,
            Some(TestVal(2)),
            "should notify"
        );
    }

    /// Waiter is started after the transaction begins but before the insert.
    #[tokio::test]
    async fn test_wait_key_before_insert() {
        let db = MemDatabase::new().into_database();

        let mut dbtx = db.begin_transaction().await;
        let key_task = waiter(&db, TestKey(1)).await;
        dbtx.insert_new_entry(&TestKey(1), &TestVal(2)).await;
        dbtx.commit_tx().await;

        assert_eq!(
            waiter_outcome(key_task).await,
            Some(TestVal(2)),
            "should notify"
        );
    }

    /// Waiter is started after the insert but before the commit.
    #[tokio::test]
    async fn test_wait_key_after_insert() {
        let db = MemDatabase::new().into_database();

        let mut dbtx = db.begin_transaction().await;
        dbtx.insert_new_entry(&TestKey(1), &TestVal(2)).await;
        let key_task = waiter(&db, TestKey(1)).await;
        dbtx.commit_tx().await;

        assert_eq!(
            waiter_outcome(key_task).await,
            Some(TestVal(2)),
            "should notify"
        );
    }

    /// Waiter is started only after the entry has been committed.
    #[tokio::test]
    async fn test_wait_key_after_commit() {
        let db = MemDatabase::new().into_database();

        let mut dbtx = db.begin_transaction().await;
        dbtx.insert_new_entry(&TestKey(1), &TestVal(2)).await;
        dbtx.commit_tx().await;

        let key_task = waiter(&db, TestKey(1)).await;

        assert_eq!(
            waiter_outcome(key_task).await,
            Some(TestVal(2)),
            "should notify"
        );
    }

    /// Both the waiter and the writer use the same module-prefixed database.
    #[tokio::test]
    async fn test_wait_key_isolated_db() {
        let instance_id = 10;
        let base_db = MemDatabase::new().into_database();
        let module_db = base_db.with_prefix_module_id(instance_id);

        let key_task = waiter(&module_db, TestKey(1)).await;

        let mut dbtx = module_db.begin_transaction().await;
        dbtx.insert_new_entry(&TestKey(1), &TestVal(2)).await;
        dbtx.commit_tx().await;

        assert_eq!(
            waiter_outcome(key_task).await,
            Some(TestVal(2)),
            "should notify"
        );
    }

    /// The waiter uses a module-prefixed database while the writer uses a
    /// module-prefixed view of a transaction on the unprefixed database.
    #[tokio::test]
    async fn test_wait_key_isolated_tx() {
        let instance_id = 10;
        let db = MemDatabase::new().into_database();

        let key_task = waiter(&db.with_prefix_module_id(instance_id), TestKey(1)).await;

        let mut dbtx = db.begin_transaction().await;
        {
            // The prefixed view borrows `dbtx`; it must end before the commit.
            let mut module_dbtx = dbtx.to_ref_with_prefix_module_id(instance_id);
            module_dbtx
                .insert_new_entry(&TestKey(1), &TestVal(2))
                .await;
        }
        dbtx.commit_tx().await;

        assert_eq!(
            waiter_outcome(key_task).await,
            Some(TestVal(2)),
            "should notify"
        );
    }

    /// Nothing is ever written, so the waiter must not complete.
    #[tokio::test]
    async fn test_wait_key_no_transaction() {
        let db = MemDatabase::new().into_database();

        let key_task = waiter(&db, TestKey(1)).await;

        assert_eq!(waiter_outcome(key_task).await, None, "should not notify");
    }
}