use crate::StorageResult;
use alloy::primitives::BlockNumber;
use signet_cold::{
BlockData, ColdStorage, ColdStorageError, ColdStorageHandle, ColdStorageReadHandle,
ColdStorageTask,
};
use signet_hot::{
HistoryRead, HistoryWrite, HotKv,
model::{HotKvReadError, HotKvWrite, RevmRead},
};
use signet_storage_types::ExecutedBlock;
use tokio_util::sync::CancellationToken;
/// Storage facade that pairs a hot key-value backend with a handle to cold
/// storage.
///
/// The hot backend `H` is owned directly; cold storage is only reachable
/// through its [`ColdStorageHandle`].
#[derive(Debug)]
pub struct UnifiedStorage<H>
where
    H: HotKv,
{
    /// Hot storage backend that readers and writers are opened from.
    hot: H,
    /// Handle used to send requests to cold storage.
    cold: ColdStorageHandle,
}
impl<H: HotKv> UnifiedStorage<H> {
    /// Builds a unified storage from a hot backend and an already-created
    /// cold storage handle.
    pub const fn new(hot: H, cold: ColdStorageHandle) -> Self {
        Self { hot, cold }
    }

    /// Starts cold storage for `cold_backend` via [`ColdStorageTask::spawn`]
    /// and wraps the returned handle together with `hot`.
    ///
    /// `cancel_token` is forwarded to [`ColdStorageTask::spawn`] unchanged.
    pub fn spawn<B: ColdStorage>(hot: H, cold_backend: B, cancel_token: CancellationToken) -> Self {
        let cold = ColdStorageTask::spawn(cold_backend, cancel_token);
        Self::new(hot, cold)
    }

    /// Returns a reference to the hot storage backend.
    pub const fn hot(&self) -> &H {
        &self.hot
    }

    /// Returns a reference to the cold storage handle.
    pub const fn cold(&self) -> &ColdStorageHandle {
        &self.cold
    }

    /// Returns a read handle obtained from the cold storage handle.
    pub fn cold_reader(&self) -> ColdStorageReadHandle {
        self.cold.reader()
    }

    /// Opens a read handle (`H::RoTx`) on hot storage.
    ///
    /// # Errors
    ///
    /// Returns the hot backend's error, converted into the crate's storage
    /// error, if the reader cannot be opened.
    pub fn reader(&self) -> StorageResult<H::RoTx> {
        self.hot.reader().map_err(Into::into)
    }

    /// Opens a [`RevmRead`] view over hot storage.
    ///
    /// # Errors
    ///
    /// Returns the hot backend's error, converted into the crate's storage
    /// error, if the reader cannot be opened.
    pub fn revm_reader(&self) -> StorageResult<RevmRead<H::RoTx>> {
        self.hot.revm_reader().map_err(Into::into)
    }

    /// Opens a [`RevmRead`] view over hot storage for the given `height`.
    ///
    /// # Errors
    ///
    /// Returns the hot backend's error, converted into the crate's storage
    /// error, if the reader cannot be opened.
    pub fn revm_reader_at_height(&self, height: u64) -> StorageResult<RevmRead<H::RoTx>> {
        self.hot.revm_reader_at_height(height).map_err(Into::into)
    }

    /// Appends executed blocks to hot storage, then hands them to cold
    /// storage.
    ///
    /// An empty `blocks` vector is a no-op that returns `Ok(())` without
    /// touching either store. Otherwise the blocks are written and committed
    /// to hot storage first; only after that commit succeeds are they
    /// dispatched to cold storage. The cold dispatch is not awaited here.
    ///
    /// # Errors
    ///
    /// Returns an error if the hot write or commit fails, in which case
    /// nothing is dispatched to cold storage. Also returns an error if the
    /// cold dispatch fails; the hot commit has already happened at that point
    /// and is not rolled back.
    pub fn append_blocks(&self, blocks: Vec<ExecutedBlock>) -> StorageResult<()> {
        if blocks.is_empty() {
            return Ok(());
        }
        // Hot first: a failure here must prevent the cold dispatch.
        self.write_hot(&blocks)?;
        self.dispatch_cold(blocks)
    }

    /// Writes each block's header and bundle to hot storage through a single
    /// writer and commits it.
    fn write_hot(&self, blocks: &[ExecutedBlock]) -> StorageResult<()> {
        let writer = self.hot.writer()?;
        writer
            .append_blocks(blocks.iter().map(|b| (&b.header, &b.bundle)))
            .map_err(|e| e.map_db(|e| e.into_hot_kv_error()))?;
        writer.raw_commit().map_err(|e| e.into_hot_kv_error())?;
        Ok(())
    }

    /// Converts the blocks into [`BlockData`] and dispatches them to cold
    /// storage without awaiting the result of the append itself.
    fn dispatch_cold(&self, blocks: Vec<ExecutedBlock>) -> StorageResult<()> {
        let cold_data: Vec<_> = blocks.into_iter().map(BlockData::from).collect();
        self.cold.dispatch_append_blocks(cold_data).map_err(Into::into)
    }

    /// Unwinds hot storage above `block`, then dispatches the matching
    /// truncation to cold storage.
    ///
    /// The hot unwind is committed before the cold truncation is dispatched,
    /// and the cold truncation is not awaited here.
    ///
    /// # Errors
    ///
    /// Returns an error if the hot unwind or commit fails, in which case no
    /// truncation is dispatched. Also returns an error if the cold dispatch
    /// fails; the hot commit has already happened at that point and is not
    /// rolled back.
    pub fn unwind_above(&self, block: BlockNumber) -> StorageResult<()> {
        let writer = self.hot.writer()?;
        writer.unwind_above(block).map_err(|e| e.map_db(|e| e.into_hot_kv_error()))?;
        writer.raw_commit().map_err(|e| e.into_hot_kv_error())?;
        self.cold.dispatch_truncate_above(block).map_err(Into::into)
    }

    /// Compares the hot chain tip with the latest block known to cold
    /// storage.
    ///
    /// Returns:
    /// - `Some(cold_tip + 1)` when both tips exist and the cold tip is
    ///   strictly below the hot tip;
    /// - `Some(0)` when hot storage has a tip but cold storage reports no
    ///   latest block;
    /// - `None` in every other case (hot storage has no tip, or the cold tip
    ///   is at or above the hot tip).
    ///
    /// # Errors
    ///
    /// Returns an error if the hot reader cannot be opened, the hot tip
    /// cannot be read, or the cold storage query fails.
    pub async fn cold_lag(&self) -> StorageResult<Option<BlockNumber>> {
        // Keep the hot reader in its own scope so it is dropped before the
        // await below. Otherwise it would stay open while waiting on cold
        // storage and would be captured in this future's state.
        let hot_tip = {
            let reader = self.reader()?;
            reader.get_chain_tip().map_err(|e| e.into_hot_kv_error())?
        };
        let cold_tip = self.cold.get_latest_block().await?;
        match (hot_tip, cold_tip) {
            // `cold_num < hot_num` guarantees `cold_num + 1` cannot overflow.
            (Some((hot_num, _)), Some(cold_num)) if cold_num < hot_num => Ok(Some(cold_num + 1)),
            (Some((_, _)), None) => Ok(Some(0)),
            _ => Ok(None),
        }
    }

    /// Converts the blocks into [`BlockData`] and appends them to cold
    /// storage, awaiting the result.
    ///
    /// Hot storage is not touched by this method.
    ///
    /// # Errors
    ///
    /// Returns the [`ColdStorageError`] reported by the cold storage append.
    pub async fn replay_to_cold(&self, blocks: Vec<ExecutedBlock>) -> Result<(), ColdStorageError> {
        let cold_data: Vec<_> = blocks.into_iter().map(BlockData::from).collect();
        self.cold.append_blocks(cold_data).await
    }
}