use crate::StorageResult;
use alloy::primitives::BlockNumber;
use signet_cold::{BlockData, ColdReceipt, ColdStorage, ColdStorageBackend, ColdStorageError};
use signet_hot::{
HistoryRead, HistoryWrite, HotKv,
model::{HotKvReadError, HotKvWrite, RevmRead},
};
use signet_storage_types::{ExecutedBlock, SealedHeader};
use tokio_util::sync::CancellationToken;
/// One block removed from storage by [`UnifiedStorage::drain_above`].
///
/// Pairs the sealed header that was read from hot storage with the receipts
/// that cold storage returned at the same position in the drained range.
#[derive(Debug, Clone)]
pub struct DrainedBlock {
/// Sealed header of the drained block, as read from hot storage.
pub header: SealedHeader,
/// Receipts drained from cold storage for this block. Empty when cold
/// storage supplied no entry for this position.
pub receipts: Vec<ColdReceipt>,
}
/// Storage facade that pairs a hot key-value backend with a cold storage
/// handle.
///
/// `H` is the hot backend type and `B` the cold backend type; `B` defaults to
/// [`signet_cold::ErasedBackend`].
pub struct UnifiedStorage<H: HotKv, B: ColdStorageBackend = signet_cold::ErasedBackend> {
/// Hot backend; written and unwound synchronously by the methods below.
hot: H,
/// Cold storage handle; its operations are awaited after the hot step.
cold: ColdStorage<B>,
}
impl<H: HotKv, B: ColdStorageBackend> std::fmt::Debug for UnifiedStorage<H, B> {
    /// Renders as `UnifiedStorage { .. }`.
    ///
    /// Neither field is printed, so this impl places no `Debug` requirement
    /// on `H` or `B`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut rendered = f.debug_struct("UnifiedStorage");
        rendered.finish_non_exhaustive()
    }
}
impl<H: HotKv, B: ColdStorageBackend> UnifiedStorage<H, B> {
pub const fn new(hot: H, cold: ColdStorage<B>) -> Self {
Self { hot, cold }
}
pub fn spawn(hot: H, cold_backend: B, cancel_token: CancellationToken) -> Self {
let cold = ColdStorage::new(cold_backend, cancel_token);
Self::new(hot, cold)
}
}
/// Constructor for the default, type-erased cold backend.
impl<H: HotKv> UnifiedStorage<H, signet_cold::ErasedBackend> {
    /// Like `spawn`, but builds the cold handle with
    /// [`ColdStorage::new_erased`], so the resulting storage is typed over
    /// [`signet_cold::ErasedBackend`] rather than the concrete `B`.
    pub fn spawn_erased<B>(hot: H, cold_backend: B, cancel_token: CancellationToken) -> Self
    where
        B: ColdStorageBackend,
    {
        Self { hot, cold: ColdStorage::new_erased(cold_backend, cancel_token) }
    }
}
impl<H: HotKv, B: ColdStorageBackend> UnifiedStorage<H, B> {
pub const fn hot(&self) -> &H {
&self.hot
}
pub fn into_hot(self) -> H {
self.hot
}
pub const fn cold(&self) -> &ColdStorage<B> {
&self.cold
}
/// Returns an owned clone of the cold storage handle.
pub fn cold_reader(&self) -> ColdStorage<B> {
    Clone::clone(&self.cold)
}
/// Obtains a reader (`H::RoTx`) from the hot backend.
///
/// # Errors
///
/// Returns the hot backend's error, converted into the storage error type,
/// if the reader cannot be created.
pub fn reader(&self) -> StorageResult<H::RoTx> {
    let tx = self.hot.reader()?;
    Ok(tx)
}
/// Obtains a [`RevmRead`] wrapper over a hot reader by delegating to the hot
/// backend's `revm_reader`.
///
/// # Errors
///
/// Returns the hot backend's error, converted into the storage error type.
pub fn revm_reader(&self) -> StorageResult<RevmRead<H::RoTx>> {
    let revm = self.hot.revm_reader()?;
    Ok(revm)
}
/// Obtains a [`RevmRead`] wrapper for the given `height` by delegating to the
/// hot backend's `revm_reader_at_height`.
///
/// NOTE(review): the exact meaning of `height` (e.g. state as of the end of
/// that block) is defined by the hot backend — confirm there.
///
/// # Errors
///
/// Returns the hot backend's error, converted into the storage error type.
pub fn revm_reader_at_height(&self, height: u64) -> StorageResult<RevmRead<H::RoTx>> {
    let revm = self.hot.revm_reader_at_height(height)?;
    Ok(revm)
}
/// Appends executed blocks to hot storage, then to cold storage.
///
/// An empty `blocks` vector is a no-op. Otherwise the blocks are written and
/// committed to hot storage first; only after that succeeds are they
/// converted to [`BlockData`] and sent to cold storage.
///
/// # Errors
///
/// Returns an error if the hot write fails (cold is then not touched) or if
/// the cold append fails. In the latter case the hot write has already been
/// committed, so hot may be ahead of cold.
pub async fn append_blocks(&self, blocks: Vec<ExecutedBlock>) -> StorageResult<()> {
    if !blocks.is_empty() {
        // Hot goes first and is committed before cold is contacted.
        self.write_hot(&blocks)?;
        let cold_batch = blocks.into_iter().map(BlockData::from).collect::<Vec<_>>();
        self.cold.append_blocks(cold_batch).await?;
    }
    Ok(())
}
fn write_hot(&self, blocks: &[ExecutedBlock]) -> StorageResult<()> {
let writer = self.hot.writer()?;
writer
.append_blocks(blocks.iter().map(|b| (&b.header, &b.bundle)))
.map_err(|e| e.map_db(|e| e.into_hot_kv_error()))?;
writer.raw_commit().map_err(|e| e.into_hot_kv_error())?;
Ok(())
}
/// Removes everything above `block` from both stores and returns what was
/// removed.
///
/// Hot storage is unwound first and supplies the headers. If hot had nothing
/// above `block`, cold storage is not contacted and an empty vector is
/// returned. Otherwise cold storage is drained and its receipts are paired
/// with the headers by position; a header with no corresponding cold entry
/// gets an empty receipt list.
///
/// # Errors
///
/// Only hot-storage failures are reported. A failure of the cold drain is
/// discarded and treated as "no receipts".
///
/// NOTE(review): because the hot unwind is committed before cold is drained
/// and the cold error is dropped, a cold failure here is silent — confirm
/// this best-effort behaviour is intended.
pub async fn drain_above(&self, block: BlockNumber) -> StorageResult<Vec<DrainedBlock>> {
    let headers = self.unwind_hot_above(block)?;
    if headers.is_empty() {
        return Ok(Vec::new());
    }

    // Best effort: a cold error yields an empty receipt list.
    let mut receipts_by_block =
        self.cold.drain_above(block).await.unwrap_or_default().into_iter();

    let drained = headers
        .into_iter()
        .map(|header| DrainedBlock {
            header,
            // Once cold receipts run out, remaining headers get empty ones.
            receipts: receipts_by_block.next().unwrap_or_default(),
        })
        .collect();
    Ok(drained)
}
fn unwind_hot_above(&self, block: BlockNumber) -> StorageResult<Vec<SealedHeader>> {
let writer = self.hot.writer()?;
let last = match writer.get_execution_range().map_err(|e| e.into_hot_kv_error())? {
Some((_, last)) if last > block => last,
_ => return Ok(Vec::new()),
};
let headers =
writer.get_headers_range(block + 1, last).map_err(|e| e.into_hot_kv_error())?;
writer.unwind_above(block).map_err(|e| e.map_db(|e| e.into_hot_kv_error()))?;
writer.raw_commit().map_err(|e| e.into_hot_kv_error())?;
Ok(headers)
}
/// Unwinds hot storage above `block`, then truncates cold storage above
/// `block`.
///
/// The hot unwind is committed before cold storage is contacted.
///
/// # Errors
///
/// Returns an error if the hot unwind fails (cold is then not touched) or if
/// the cold truncation fails.
pub async fn unwind_above(&self, block: BlockNumber) -> StorageResult<()> {
    self.unwind_hot_sync(block)?;
    self.cold.truncate_above(block).await?;
    Ok(())
}
fn unwind_hot_sync(&self, block: BlockNumber) -> StorageResult<()> {
let writer = self.hot.writer()?;
writer.unwind_above(block).map_err(|e| e.map_db(|e| e.into_hot_kv_error()))?;
writer.raw_commit().map_err(|e| e.into_hot_kv_error())?;
Ok(())
}
/// Reports the first block number that cold storage is missing relative to
/// hot storage.
///
/// Reads the hot chain tip and the latest cold block, then returns:
///
/// - `Some(cold + 1)` when cold has a latest block strictly below the hot tip;
/// - `Some(0)` when hot has a tip but cold reports no latest block;
/// - `None` otherwise (hot has no tip, or cold is at or above the hot tip).
///
/// The hot reader is confined to an inner scope so that it is dropped before
/// cold storage is awaited; the reader is only needed to fetch the tip and
/// should not be kept open for the duration of the cold round-trip.
///
/// # Errors
///
/// Returns an error if the hot reader cannot be opened, the hot chain tip
/// cannot be read, or the cold query fails.
pub async fn cold_lag(&self) -> StorageResult<Option<BlockNumber>> {
    // Release the hot reader before awaiting cold storage.
    let hot_tip = {
        let reader = self.reader()?;
        reader.get_chain_tip().map_err(|e| e.into_hot_kv_error())?
    };
    let cold_tip = self.cold.get_latest_block().await?;

    let first_missing = match (hot_tip, cold_tip) {
        // Nothing in hot storage, so cold cannot be behind.
        (None, _) => None,
        // Hot has data but cold is empty: everything from 0 is missing.
        (Some(_), None) => Some(0),
        // `then` is lazy, so `cold_num + 1` is only evaluated when
        // `cold_num < hot_num`, where it cannot overflow.
        (Some((hot_num, _)), Some(cold_num)) => (cold_num < hot_num).then(|| cold_num + 1),
    };
    Ok(first_missing)
}
/// Sends `blocks` to cold storage only, leaving hot storage untouched.
///
/// Each block is converted to [`BlockData`] and the batch is handed to the
/// cold handle's `append_blocks`. Unlike `append_blocks` on this type, an
/// empty input is still forwarded to cold storage.
///
/// # Errors
///
/// Returns the [`ColdStorageError`] from the cold append unchanged.
pub async fn replay_to_cold(&self, blocks: Vec<ExecutedBlock>) -> Result<(), ColdStorageError> {
    let mut batch = Vec::with_capacity(blocks.len());
    batch.extend(blocks.into_iter().map(BlockData::from));
    self.cold.append_blocks(batch).await
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use signet_cold_mdbx::MdbxColdBackend;
    use signet_hot_mdbx::DatabaseEnv;

    /// Compile-time probe: accepts only values that are `Send`.
    fn _assert_send(_: impl Send) {}

    // The functions below are never called. They exist so that the build
    // fails if a future returned by one of the async methods stops being
    // `Send` for the MDBX-backed storage.

    fn _drain_above_is_send(s: &UnifiedStorage<DatabaseEnv, MdbxColdBackend>) {
        let fut = s.drain_above(0);
        _assert_send(fut);
    }

    fn _cold_lag_is_send(s: &UnifiedStorage<DatabaseEnv, MdbxColdBackend>) {
        let fut = s.cold_lag();
        _assert_send(fut);
    }

    fn _replay_to_cold_is_send(s: &UnifiedStorage<DatabaseEnv, MdbxColdBackend>) {
        let fut = s.replay_to_cold(Vec::new());
        _assert_send(fut);
    }

    fn _append_blocks_is_send(s: &UnifiedStorage<DatabaseEnv, MdbxColdBackend>) {
        let fut = s.append_blocks(Vec::new());
        _assert_send(fut);
    }

    fn _unwind_above_is_send(s: &UnifiedStorage<DatabaseEnv, MdbxColdBackend>) {
        let fut = s.unwind_above(0);
        _assert_send(fut);
    }
}