use serde::{Deserialize, Serialize};
use static_assertions as sa;
use thiserror::Error;
#[cfg(with_metrics)]
mod metrics {
    //! Prometheus instrumentation for the journaling layer: counts fast/slow
    //! path usage, resolution failures, pending journals found on load, and
    //! the distribution of batch lengths.
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        exponential_bucket_interval, register_histogram, register_int_counter,
    };
    use prometheus::{Histogram, IntCounter};

    /// Number of `write_batch` calls that fit within the backend limits and were
    /// written directly, without journaling.
    pub static JOURNAL_FASTPATH_COUNT: LazyLock<IntCounter> = LazyLock::new(|| {
        register_int_counter(
            "journal_fastpath_count",
            "Number of write_batch calls using the fast path",
        )
    });

    /// Number of `write_batch` calls that exceeded the backend limits and went
    /// through the journal.
    pub static JOURNAL_SLOWPATH_COUNT: LazyLock<IntCounter> = LazyLock::new(|| {
        register_int_counter(
            "journal_slowpath_count",
            "Number of write_batch calls requiring journaling",
        )
    });

    /// Number of times replaying a journal failed (storage may be inconsistent
    /// until the journal is cleared on the next reload).
    pub static JOURNAL_RESOLUTION_FAILURES: LazyLock<IntCounter> = LazyLock::new(|| {
        register_int_counter(
            "journal_resolution_failures",
            "Number of journal resolution failures (potential data inconsistency)",
        )
    });

    /// Number of pending (unresolved) journals discovered when loading a chain.
    pub static JOURNAL_PENDING_ON_LOAD: LazyLock<IntCounter> = LazyLock::new(|| {
        register_int_counter(
            "journal_pending_on_load",
            "Number of pending journals found during chain reload",
        )
    });

    /// Histogram of the number of operations per `write_batch` call.
    pub static JOURNAL_BATCH_LEN: LazyLock<Histogram> = LazyLock::new(|| {
        register_histogram(
            "journal_batch_len",
            "Number of operations in write_batch calls",
            exponential_bucket_interval(1.0, 10000.0),
        )
    });
}
use crate::{
batch::{Batch, BatchValueWriter, DeletePrefixExpander, SimplifiedBatch},
store::{
DirectKeyValueStore, KeyValueDatabase, KeyValueStoreError, ReadableKeyValueStore,
WithError, WritableKeyValueStore,
},
views::MIN_VIEW_TAG,
};
/// A database adapter that wraps a backend database `D` and adds journaling,
/// so that batches exceeding the backend's transaction limits can be written
/// across several transactions while remaining recoverable after a crash.
#[derive(Clone)]
pub struct JournalingKeyValueDatabase<D> {
    /// The wrapped backend database.
    database: D,
}
/// A store handle that wraps a backend store `S` and adds journaling for
/// oversized batches.
#[derive(Clone)]
pub struct JournalingKeyValueStore<S> {
    /// The wrapped backend store.
    store: S,
    /// Whether this handle was opened with exclusive access to the root
    /// object; the journal slow path is refused without it (see `write_batch`).
    has_exclusive_access: bool,
}
/// Errors produced by the journaling layer, wrapping the backend error type `E`.
#[derive(Error, Debug)]
pub enum JournalingError<E> {
    /// An error from the underlying store.
    #[error(transparent)]
    Inner(#[from] E),
    /// A BCS (de)serialization failure.
    #[error(transparent)]
    BcsError(bcs::Error),
    /// The batch needed the journal but the store handle is not exclusive.
    #[error("Refusing to use the journal without exclusive database access to the root object.")]
    JournalRequiresExclusiveAccess,
    /// The journal was written but could not be fully replayed.
    #[error("Journal resolution failed: {0}")]
    JournalResolutionFailed(JournalingResolutionError<E>),
}
/// Errors that can occur while replaying (resolving) a previously written journal.
#[derive(Error, Debug)]
pub enum JournalingResolutionError<E> {
    /// An error from the underlying store.
    #[error(transparent)]
    Inner(#[from] E),
    /// A BCS (de)serialization failure.
    #[error(transparent)]
    BcsError(bcs::Error),
    /// A journal block listed in the header could not be read back.
    #[error("The journal block could not be retrieved, it could be missing or corrupted.")]
    FailureToRetrieveJournalBlock,
}
impl<E: KeyValueStoreError> From<bcs::Error> for JournalingError<E> {
    /// Wraps a BCS (de)serialization failure into the journaling error type.
    fn from(error: bcs::Error) -> Self {
        Self::BcsError(error)
    }
}
impl<E: KeyValueStoreError + 'static> KeyValueStoreError for JournalingError<E> {
    const BACKEND: &'static str = "journaling";

    /// Views must be reloaded only after a failed journal resolution, since
    /// that is the one case where storage may not reflect the attempted batch.
    fn must_reload_view(&self) -> bool {
        // Exhaustive match: adding a new variant forces a decision here.
        match self {
            JournalingError::JournalResolutionFailed(_) => true,
            JournalingError::Inner(_)
            | JournalingError::BcsError(_)
            | JournalingError::JournalRequiresExclusiveAccess => false,
        }
    }
}
impl<E: KeyValueStoreError> From<bcs::Error> for JournalingResolutionError<E> {
    /// Wraps a BCS (de)serialization failure into the resolution error type.
    fn from(error: bcs::Error) -> Self {
        Self::BcsError(error)
    }
}
/// Key-space tag reserved for journal bookkeeping. It must stay strictly below
/// `MIN_VIEW_TAG` so journal keys can never collide with view data.
const JOURNAL_TAG: u8 = 0;
// Enforced at compile time.
sa::const_assert!(JOURNAL_TAG < MIN_VIEW_TAG);

/// Second-level tag distinguishing the kinds of journal keys.
#[repr(u8)]
enum KeyTag {
    /// Key of the single `JournalHeader` (tag byte 1).
    Journal = 1,
    /// Prefix for the journal block entries (tag byte 2, implicit).
    Entry,
}
/// Builds the key `[JOURNAL_TAG, tag, bcs(pos)]` addressing journal entry `pos`
/// of the given `KeyTag` kind.
fn get_journaling_key(tag: u8, pos: u32) -> Result<Vec<u8>, bcs::Error> {
    let mut key = vec![JOURNAL_TAG, tag];
    bcs::serialize_into(&mut key, &pos)?;
    Ok(key)
}
/// The journal header, stored under the `KeyTag::Journal` key.
///
/// It records how many journal blocks remain to be replayed; its presence is
/// what marks a journal as pending on reload.
#[derive(Serialize, Deserialize, Debug, Default)]
struct JournalHeader {
    // Number of blocks still to be resolved (blocks are keyed 0..block_count).
    block_count: u32,
}
impl<S> DeletePrefixExpander for &JournalingKeyValueStore<S>
where
    S: DirectKeyValueStore,
{
    type Error = S::Error;

    /// Expands a prefix deletion into the concrete keys currently stored
    /// under that prefix, by querying the wrapped store directly.
    async fn expand_delete_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, Self::Error> {
        let inner = &self.store;
        inner.find_keys_by_prefix(key_prefix).await
    }
}
// The journaling database reports its own error type, wrapping the backend's.
impl<D> WithError for JournalingKeyValueDatabase<D>
where
    D: WithError,
    D::Error: 'static,
{
    type Error = JournalingError<D::Error>;
}
// The journaling store reports its own error type, wrapping the backend's.
impl<S> WithError for JournalingKeyValueStore<S>
where
    S: WithError,
    S::Error: 'static,
{
    type Error = JournalingError<S::Error>;
}
impl<S> ReadableKeyValueStore for JournalingKeyValueStore<S>
where
S: ReadableKeyValueStore,
S::Error: 'static,
{
const MAX_KEY_SIZE: usize = S::MAX_KEY_SIZE;
fn max_stream_queries(&self) -> usize {
self.store.max_stream_queries()
}
fn root_key(&self) -> Result<Vec<u8>, Self::Error> {
Ok(self.store.root_key()?)
}
async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
Ok(self.store.read_value_bytes(key).await?)
}
async fn contains_key(&self, key: &[u8]) -> Result<bool, Self::Error> {
Ok(self.store.contains_key(key).await?)
}
async fn contains_keys(&self, keys: &[Vec<u8>]) -> Result<Vec<bool>, Self::Error> {
Ok(self.store.contains_keys(keys).await?)
}
async fn read_multi_values_bytes(
&self,
keys: &[Vec<u8>],
) -> Result<Vec<Option<Vec<u8>>>, Self::Error> {
Ok(self.store.read_multi_values_bytes(keys).await?)
}
async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, Self::Error> {
Ok(self.store.find_keys_by_prefix(key_prefix).await?)
}
async fn find_key_values_by_prefix(
&self,
key_prefix: &[u8],
) -> Result<Vec<(Vec<u8>, Vec<u8>)>, Self::Error> {
Ok(self.store.find_key_values_by_prefix(key_prefix).await?)
}
}
impl<D> KeyValueDatabase for JournalingKeyValueDatabase<D>
where
D: KeyValueDatabase,
D::Error: 'static,
{
type Config = D::Config;
type Store = JournalingKeyValueStore<D::Store>;
fn get_name() -> String {
format!("journaling {}", D::get_name())
}
async fn connect(config: &Self::Config, namespace: &str) -> Result<Self, Self::Error> {
let database = D::connect(config, namespace).await?;
Ok(Self { database })
}
fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error> {
let store = self.database.open_shared(root_key)?;
Ok(JournalingKeyValueStore {
store,
has_exclusive_access: false,
})
}
fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error> {
let store = self.database.open_exclusive(root_key)?;
Ok(JournalingKeyValueStore {
store,
has_exclusive_access: true,
})
}
async fn list_all(config: &Self::Config) -> Result<Vec<String>, Self::Error> {
Ok(D::list_all(config).await?)
}
async fn list_root_keys(&self) -> Result<Vec<Vec<u8>>, Self::Error> {
Ok(self.database.list_root_keys().await?)
}
async fn delete_all(config: &Self::Config) -> Result<(), Self::Error> {
Ok(D::delete_all(config).await?)
}
async fn exists(config: &Self::Config, namespace: &str) -> Result<bool, Self::Error> {
Ok(D::exists(config, namespace).await?)
}
async fn create(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
Ok(D::create(config, namespace).await?)
}
async fn delete(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
Ok(D::delete(config, namespace).await?)
}
}
impl<S> WritableKeyValueStore for JournalingKeyValueStore<S>
where
    S: DirectKeyValueStore,
    S::Error: 'static,
{
    const MAX_VALUE_SIZE: usize = S::MAX_VALUE_SIZE;

    /// Writes `batch` to the store.
    ///
    /// Batches that fit within the backend's transaction limits are written
    /// directly (fast path). Larger batches are first persisted as a journal
    /// of smaller blocks and then replayed via `coherently_resolve_journal`;
    /// this requires exclusive access to the root object, since a concurrent
    /// writer could otherwise interleave with the replay.
    async fn write_batch(&self, batch: Batch) -> Result<(), Self::Error> {
        let batch = S::Batch::from_batch(self, batch).await?;
        #[cfg(with_metrics)]
        metrics::JOURNAL_BATCH_LEN.observe(batch.len() as f64);
        if Self::is_fastpath_feasible(&batch) {
            tracing::trace!(
                batch_len = batch.len(),
                batch_bytes = batch.num_bytes(),
                "write_batch: using fast path"
            );
            #[cfg(with_metrics)]
            metrics::JOURNAL_FASTPATH_COUNT.inc();
            Ok(self.store.write_batch(batch).await?)
        } else {
            tracing::warn!(
                batch_len = batch.len(),
                batch_bytes = batch.num_bytes(),
                max_batch_size = S::MAX_BATCH_SIZE,
                max_batch_total_size = S::MAX_BATCH_TOTAL_SIZE,
                "write_batch: batch exceeds fast path limits, using journal"
            );
            #[cfg(with_metrics)]
            metrics::JOURNAL_SLOWPATH_COUNT.inc();
            if !self.has_exclusive_access {
                return Err(JournalingError::JournalRequiresExclusiveAccess);
            }
            let header = self.write_journal(batch).await?;
            tracing::info!(
                block_count = header.block_count,
                "write_batch: journal written, resolving"
            );
            match self.coherently_resolve_journal(header).await {
                Ok(()) => Ok(()),
                Err(e) => {
                    tracing::error!(
                        "write_batch: FAILED to resolve journal — \
                         storage may be in an inconsistent state until \
                         the journal is cleared on next reload"
                    );
                    #[cfg(with_metrics)]
                    metrics::JOURNAL_RESOLUTION_FAILURES.inc();
                    Err(JournalingError::JournalResolutionFailed(e))
                }
            }
        }
    }

    /// Replays any journal left over from a `write_batch` that was interrupted
    /// (or failed) before resolution completed. A no-op when no journal header
    /// is present.
    async fn clear_journal(&self) -> Result<(), Self::Error> {
        let key = get_journaling_key(KeyTag::Journal as u8, 0)?;
        let value = self.read_value::<JournalHeader>(&key).await?;
        if let Some(header) = value {
            tracing::warn!(
                block_count = header.block_count,
                "clear_journal: found pending journal, resolving"
            );
            #[cfg(with_metrics)]
            metrics::JOURNAL_PENDING_ON_LOAD.inc();
            match self.coherently_resolve_journal(header).await {
                Ok(()) => Ok(()),
                Err(e) => {
                    // Bug fix: this message previously said "write_batch:",
                    // copy-pasted from the path above, which made logs
                    // misattribute the failing operation.
                    tracing::error!(
                        "clear_journal: FAILED to resolve journal — \
                         storage may be in an inconsistent state until \
                         the journal is cleared on next reload"
                    );
                    #[cfg(with_metrics)]
                    metrics::JOURNAL_RESOLUTION_FAILURES.inc();
                    Err(JournalingError::JournalResolutionFailed(e))
                }
            }
        } else {
            Ok(())
        }
    }
}
impl<S> JournalingKeyValueStore<S>
where
    S: DirectKeyValueStore,
    S::Error: 'static,
{
    /// Replays the journal described by `header`, from the highest-numbered
    /// block down to block 0.
    ///
    /// Each loop iteration issues a single backend write containing: the
    /// operations of one journal block, the deletion of that block's key, and
    /// the updated header (or its deletion, for the final block). Since the
    /// header is rewritten atomically with the block it accounts for, a crash
    /// at any point leaves a header that exactly matches the blocks still
    /// present, so resolution can resume on the next load.
    async fn coherently_resolve_journal(
        &self,
        mut header: JournalHeader,
    ) -> Result<(), JournalingResolutionError<S::Error>> {
        let total_blocks = header.block_count;
        let header_key = get_journaling_key(KeyTag::Journal as u8, 0)?;
        while header.block_count > 0 {
            // Blocks are replayed in reverse index order (block_count - 1 first).
            let block_key = get_journaling_key(KeyTag::Entry as u8, header.block_count - 1)?;
            let mut batch = self
                .store
                .read_value::<S::Batch>(&block_key)
                .await?
                .ok_or(JournalingResolutionError::FailureToRetrieveJournalBlock)?;
            // Delete the block in the same transaction that applies it.
            batch.add_delete(block_key);
            header.block_count -= 1;
            if header.block_count > 0 {
                let value = bcs::to_bytes(&header)?;
                batch.add_insert(header_key.clone(), value);
            } else {
                // Last block: the journal is fully replayed, drop the header.
                batch.add_delete(header_key.clone());
            }
            tracing::debug!(
                remaining_blocks = header.block_count,
                total_blocks,
                "resolving journal block"
            );
            self.store.write_batch(batch).await?;
        }
        tracing::info!(total_blocks, "journal fully resolved");
        Ok(())
    }

    /// Splits `batch` into journal blocks, persists them, and returns the
    /// resulting header.
    ///
    /// Each block is a serialized `S::Batch` stored under a `KeyTag::Entry`
    /// key, sized so that its replay transaction (block operations + block
    /// deletion + header rewrite; see `coherently_resolve_journal`) fits the
    /// backend's limits. Blocks are in turn grouped into write transactions
    /// respecting `MAX_BATCH_SIZE` / `MAX_BATCH_TOTAL_SIZE`. The header is
    /// written last, so the journal only becomes visible to `clear_journal`
    /// once all of its blocks are durably stored.
    async fn write_journal(
        &self,
        batch: S::Batch,
    ) -> Result<JournalHeader, JournalingError<S::Error>> {
        let header_key = get_journaling_key(KeyTag::Journal as u8, 0)?;
        let key_len = header_key.len();
        // NOTE(review): assumes every header serializes to the same length as
        // the default one — true for the fixed-width `u32` field; revisit if
        // `JournalHeader` ever gains variable-length fields.
        let header_value_len = bcs::serialized_size(&JournalHeader::default())?;
        let journal_len_upper_bound = key_len + header_value_len;
        let max_transaction_size = S::MAX_BATCH_TOTAL_SIZE;
        // A block must fit in a single value, and its replay transaction
        // (block + header rewrite) must fit in one backend transaction.
        let max_block_size = std::cmp::min(
            S::MAX_VALUE_SIZE,
            S::MAX_BATCH_TOTAL_SIZE - key_len - journal_len_upper_bound,
        );
        let mut iter = batch.into_iter();
        let mut block_batch = S::Batch::default();
        let mut block_size = 0;
        let mut block_count = 0;
        let mut transaction_batch = S::Batch::default();
        let mut transaction_size = 0;
        while iter.write_next_value(&mut block_batch, &mut block_size)? {
            let (block_flush, transaction_flush) = {
                // The `- 1` / `- 2` below keep headroom in the operation count:
                // presumably for the block insert still to be added to the
                // transaction, and for the delete/header operations appended
                // during replay — TODO confirm against backend batch limits.
                if iter.is_empty() || transaction_batch.len() == S::MAX_BATCH_SIZE - 1 {
                    (true, true)
                } else {
                    let next_block_size = iter
                        .next_batch_size(&block_batch, block_size)?
                        .expect("iter is not empty");
                    let next_transaction_size = transaction_size + next_block_size + key_len;
                    let transaction_flush = next_transaction_size > max_transaction_size;
                    let block_flush = transaction_flush
                        || block_batch.len() == S::MAX_BATCH_SIZE - 2
                        || next_block_size > max_block_size;
                    (block_flush, transaction_flush)
                }
            };
            if block_flush {
                block_size += block_batch.overhead_size();
                let value = bcs::to_bytes(&block_batch)?;
                block_batch = S::Batch::default();
                // `block_size` is maintained incrementally and must agree with
                // the actual serialized length.
                assert_eq!(value.len(), block_size);
                let key = get_journaling_key(KeyTag::Entry as u8, block_count)?;
                transaction_batch.add_insert(key, value);
                block_count += 1;
                transaction_size += block_size + key_len;
                block_size = 0;
            }
            if transaction_flush {
                let batch = std::mem::take(&mut transaction_batch);
                self.store.write_batch(batch).await?;
                transaction_size = 0;
            }
        }
        // Publish the header only after every block has been written.
        let header = JournalHeader { block_count };
        if block_count > 0 {
            let value = bcs::to_bytes(&header)?;
            let mut batch = S::Batch::default();
            batch.add_insert(header_key, value);
            self.store.write_batch(batch).await?;
        }
        Ok(header)
    }

    /// Whether `batch` can be written directly in one backend transaction.
    fn is_fastpath_feasible(batch: &S::Batch) -> bool {
        batch.len() <= S::MAX_BATCH_SIZE && batch.num_bytes() <= S::MAX_BATCH_TOTAL_SIZE
    }
}
impl<S> JournalingKeyValueStore<S> {
    /// Wraps `store` in a journaling layer.
    ///
    /// The returned handle does not claim exclusive access, so batches that
    /// would require the journal slow path are refused until a handle with
    /// exclusive access is used instead.
    pub fn new(store: S) -> Self {
        let has_exclusive_access = false;
        Self {
            store,
            has_exclusive_access,
        }
    }
}