// signet-storage 0.7.2
//
// Unified storage interface for Signet hot and cold storage.
// Documentation
//! Unified storage combining hot and cold backends.
//!
//! The [`UnifiedStorage`] struct provides a single interface for writing
//! execution data to both hot storage (for fast state access) and cold storage
//! (for historical archival).

use crate::StorageResult;
use alloy::primitives::BlockNumber;
use signet_cold::{
    BlockData, ColdReceipt, ColdStorage, ColdStorageError, ColdStorageHandle,
    ColdStorageReadHandle, ColdStorageTask,
};
use signet_hot::{
    HistoryRead, HistoryWrite, HotKv,
    model::{HotKvReadError, HotKvWrite, RevmRead},
};
use signet_storage_types::{ExecutedBlock, SealedHeader};
use tokio_util::sync::CancellationToken;

/// A block removed from storage by a reorg unwind.
///
/// The header comes from hot storage and is always present. Receipts come
/// from cold storage on a best-effort basis and are empty when cold storage
/// lags behind hot.
#[derive(Clone, Debug)]
pub struct DrainedBlock {
    /// Sealed header of the block that was removed.
    pub header: SealedHeader,
    /// Receipts read from cold storage; empty when cold has not indexed this
    /// block yet.
    pub receipts: Vec<ColdReceipt>,
}

/// A single facade over the hot and cold storage backends.
///
/// Execution data written through this type lands in hot storage (for fast
/// state access) and is forwarded to cold storage (for historical archival).
///
/// # Write Semantics
///
/// - Hot storage writes are synchronous and go through database transactions.
/// - Cold storage writes are dispatched through the handle from synchronous
///   code, without blocking.
/// - `append_blocks` writes hot storage first and only then notifies cold
///   storage.
///
/// # Error Handling
///
/// `append_blocks` and `unwind_above` return errors from both hot and cold
/// storage. A cold storage dispatch fails immediately when the channel is
/// full, because the send is non-blocking.
///
/// # Backpressure and Failure Recovery
///
/// Because cold dispatch never blocks, a failed dispatch shows up as one of
/// two distinct errors:
///
/// - [`ColdStorageError::Backpressure`]: the channel is full. The task is
///   alive but cannot keep up. This is transient and may resolve on its own
///   or after a retry.
/// - [`ColdStorageError::TaskTerminated`]: the channel is closed. The task has
///   stopped and must be restarted. This persists until recovery.
///
/// **Important**: hot storage is always authoritative. When a cold dispatch
/// fails:
///
/// 1. Hot storage already contains the committed data.
/// 2. Cold storage may be behind (backpressure) or unavailable (task failure).
/// 3. [`cold_lag`](Self::cold_lag) detects gaps between hot and cold.
/// 4. [`replay_to_cold`](Self::replay_to_cold) recovers cold storage.
///
/// What to do next depends on the caller's requirements and the error type:
///
/// - **Backpressure**: log and continue, or retry with backoff. The task will
///   catch up eventually; monitor it with `cold_lag()`.
/// - **TaskTerminated**: restart the task. Halt or run in a degraded mode
///   until it is back, then replay the missing blocks.
///
/// [`ColdStorageError::Backpressure`]: signet_cold::ColdStorageError::Backpressure
/// [`ColdStorageError::TaskTerminated`]: signet_cold::ColdStorageError::TaskTerminated
///
/// # Example
///
/// ```ignore
/// use signet_storage::UnifiedStorage;
///
/// let storage = UnifiedStorage::new(hot_db, cold_handle);
///
/// // Append executed blocks (takes ownership)
/// storage.append_blocks(blocks)?;
///
/// // Handle reorgs
/// storage.unwind_above(reorg_block)?;
/// ```
#[derive(Debug)]
pub struct UnifiedStorage<H>
where
    H: HotKv,
{
    hot: H,
    cold: ColdStorageHandle,
}

impl<H: HotKv> UnifiedStorage<H> {
    /// Create a new unified storage instance from an existing hot backend and
    /// an already-spawned cold storage handle.
    pub const fn new(hot: H, cold: ColdStorageHandle) -> Self {
        Self { hot, cold }
    }

    /// Spawn a unified storage instance from hot and cold backends.
    ///
    /// This spawns the [`ColdStorageTask`] internally and returns a
    /// fully-assembled [`UnifiedStorage`]. The cold storage task runs
    /// until the cancellation token is triggered or all handles are
    /// dropped.
    ///
    /// Use [`new`](Self::new) instead if you need manual control over
    /// the cold storage task lifecycle or need to share the
    /// [`ColdStorageHandle`] before constructing unified storage.
    pub fn spawn<B: ColdStorage>(hot: H, cold_backend: B, cancel_token: CancellationToken) -> Self {
        let cold = ColdStorageTask::spawn(cold_backend, cancel_token);
        Self::new(hot, cold)
    }

    /// Get a reference to the hot storage backend.
    pub const fn hot(&self) -> &H {
        &self.hot
    }

    /// Consume self and return the hot storage backend.
    ///
    /// The cold storage handle held by this instance is dropped.
    pub fn into_hot(self) -> H {
        self.hot
    }

    /// Get a reference to the cold storage handle.
    pub const fn cold(&self) -> &ColdStorageHandle {
        &self.cold
    }

    /// Get a read-only cold storage handle.
    ///
    /// The returned handle can only perform read operations and cannot modify
    /// storage. Use this for components that only need to query historical data.
    pub fn cold_reader(&self) -> ColdStorageReadHandle {
        self.cold.reader()
    }

    /// Create a read-only transaction for hot storage.
    ///
    /// # Errors
    ///
    /// Returns an error if the transaction cannot be created.
    pub fn reader(&self) -> StorageResult<H::RoTx> {
        self.hot.reader().map_err(Into::into)
    }

    /// Create a revm-compatible read-only database adapter.
    ///
    /// The returned [`RevmRead`] implements revm's `Database` and `DatabaseRef`
    /// traits, allowing it to be used directly with revm for EVM execution.
    ///
    /// # Errors
    ///
    /// Returns an error if the transaction cannot be created.
    pub fn revm_reader(&self) -> StorageResult<RevmRead<H::RoTx>> {
        self.hot.revm_reader().map_err(Into::into)
    }

    /// Create a revm-compatible read-only database adapter that reads state
    /// at a specific block height.
    ///
    /// The returned [`RevmRead`] uses history and change set tables to
    /// reconstruct state as it was at `height`.
    ///
    /// # Errors
    ///
    /// - [`NoBlocks`] if the database has no blocks.
    /// - [`HeightOutOfRange`] if `height` is outside the
    ///   stored block range.
    /// - [`Inner`] if the transaction cannot be created.
    ///
    /// [`NoBlocks`]: signet_hot::model::HotKvError::NoBlocks
    /// [`HeightOutOfRange`]: signet_hot::model::HotKvError::HeightOutOfRange
    /// [`Inner`]: signet_hot::model::HotKvError::Inner
    pub fn revm_reader_at_height(&self, height: u64) -> StorageResult<RevmRead<H::RoTx>> {
        self.hot.revm_reader_at_height(height).map_err(Into::into)
    }

    /// Append executed blocks to both hot and cold storage.
    ///
    /// This method:
    /// 1. Writes to hot storage synchronously (validates chain extension, updates state)
    /// 2. Dispatches writes to cold storage synchronously (non-blocking)
    ///
    /// An empty `blocks` vec is a no-op: neither backend is touched.
    ///
    /// # Errors
    ///
    /// - [`Hot`]: Hot storage write failed. No data was written.
    /// - [`Cold`]: Hot storage succeeded but cold dispatch failed.
    ///   Check the inner [`ColdStorageError`] variant:
    ///   - [`Backpressure`]: Task alive but channel full. May retry or continue.
    ///   - [`TaskTerminated`]: Task stopped. Requires restart.
    ///
    /// In both cold error cases, data is safely in hot storage and can be
    /// recovered later via [`replay_to_cold`](Self::replay_to_cold).
    ///
    /// [`Hot`]: crate::StorageError::Hot
    /// [`Cold`]: crate::StorageError::Cold
    /// [`Backpressure`]: signet_cold::ColdStorageError::Backpressure
    /// [`TaskTerminated`]: signet_cold::ColdStorageError::TaskTerminated
    pub fn append_blocks(&self, blocks: Vec<ExecutedBlock>) -> StorageResult<()> {
        if blocks.is_empty() {
            return Ok(());
        }

        // 1. Write to hot storage (borrows blocks)
        self.write_hot(&blocks)?;

        // 2. Dispatch to cold storage (consumes blocks)
        self.dispatch_cold(blocks)
    }

    /// Write blocks to hot storage.
    ///
    /// Opens a write transaction, appends each block's header and bundle, and
    /// commits. An append error returns before the commit is attempted.
    fn write_hot(&self, blocks: &[ExecutedBlock]) -> StorageResult<()> {
        let writer = self.hot.writer()?;

        writer
            .append_blocks(blocks.iter().map(|b| (&b.header, &b.bundle)))
            .map_err(|e| e.map_db(|e| e.into_hot_kv_error()))?;
        writer.raw_commit().map_err(|e| e.into_hot_kv_error())?;

        Ok(())
    }

    /// Dispatch blocks to cold storage synchronously (non-blocking).
    ///
    /// Consumes the blocks to avoid cloning.
    fn dispatch_cold(&self, blocks: Vec<ExecutedBlock>) -> StorageResult<()> {
        let cold_data: Vec<_> = blocks.into_iter().map(BlockData::from).collect();
        self.cold.dispatch_append_blocks(cold_data).map_err(Into::into)
    }

    /// Read and remove all blocks above the given block number.
    ///
    /// This combines reading the about-to-be-removed data with unwinding,
    /// returning the drained blocks so callers can emit reorg notifications.
    ///
    /// # Implementation
    ///
    /// 1. Reads headers above `block` and unwinds hot storage in a single
    ///    write transaction (sync)
    /// 2. Drains cold storage above `block` (async, best-effort)
    /// 3. Pairs each header with its receipts by position, using an empty
    ///    receipt list wherever cold storage returned fewer entries than hot
    ///
    /// If hot storage has nothing above `block`, an empty vec is returned and
    /// cold storage is not touched.
    ///
    /// # Cold Lag
    ///
    /// If cold storage hasn't processed a block yet, its receipts will be
    /// empty. This is correct: no subscriber has seen those logs, so there
    /// is nothing to "remove" from their perspective.
    ///
    /// # Errors
    ///
    /// - [`Hot`]: Hot storage read or unwind failed.
    ///
    /// Cold storage failures are **not** reported. A failed cold drain is
    /// treated like cold lag: the drained blocks are still returned, with
    /// empty receipts.
    ///
    /// [`Hot`]: crate::StorageError::Hot
    pub async fn drain_above(&self, block: BlockNumber) -> StorageResult<Vec<DrainedBlock>> {
        // 1. Read headers and unwind hot storage synchronously.
        //    Extracted to a sync helper so the `!Send` write transaction
        //    does not appear in the async state machine.
        let headers = self.unwind_hot_above(block)?;
        if headers.is_empty() {
            return Ok(Vec::new());
        }

        // 2. Atomically drain cold (best-effort — failure = normal cold lag).
        //    The error is deliberately discarded: hot storage is already
        //    unwound and the caller still needs the drained headers.
        let cold_receipts = self.cold.drain_above(block).await.unwrap_or_default();

        // 3. Assemble drained blocks (zip headers with receipts, default empty).
        // Pad cold_receipts to match headers length so zip consumes both
        // without cloning.
        // NOTE(review): pairing is positional — assumes cold returns one
        // receipt list per block in the same order as the hot headers; confirm
        // against the cold storage handle's contract.
        let drained = headers
            .into_iter()
            .zip(cold_receipts.into_iter().chain(std::iter::repeat_with(Vec::new)))
            .map(|(header, receipts)| DrainedBlock { header, receipts })
            .collect();

        Ok(drained)
    }

    /// Read headers above `block` and unwind hot storage in a single write
    /// transaction to avoid TOCTOU races. Returns an empty vec if there is
    /// nothing to unwind.
    fn unwind_hot_above(&self, block: BlockNumber) -> StorageResult<Vec<SealedHeader>> {
        let writer = self.hot.writer()?;
        // `last > block` also guarantees `block + 1` below cannot overflow.
        let last = match writer.get_execution_range().map_err(|e| e.into_hot_kv_error())? {
            Some((_, last)) if last > block => last,
            _ => return Ok(Vec::new()),
        };
        let headers =
            writer.get_headers_range(block + 1, last).map_err(|e| e.into_hot_kv_error())?;
        writer.unwind_above(block).map_err(|e| e.map_db(|e| e.into_hot_kv_error()))?;
        writer.raw_commit().map_err(|e| e.into_hot_kv_error())?;
        Ok(headers)
    }

    /// Unwind storage above the given block number (reorg handling).
    ///
    /// This method:
    /// 1. Unwinds hot storage synchronously (restores previous state)
    /// 2. Truncates cold storage synchronously (non-blocking dispatch)
    ///
    /// # Errors
    ///
    /// - [`Hot`]: Hot storage unwind failed. State is unchanged.
    /// - [`Cold`]: Hot storage unwound but cold truncate dispatch
    ///   failed. Check the inner [`ColdStorageError`] variant:
    ///   - [`Backpressure`]: Task alive but channel full.
    ///   - [`TaskTerminated`]: Task stopped.
    ///
    /// Cold storage may temporarily contain stale blocks until the truncate is
    /// replayed. This is safe: hot storage is authoritative.
    ///
    /// [`Hot`]: crate::StorageError::Hot
    /// [`Cold`]: crate::StorageError::Cold
    /// [`Backpressure`]: signet_cold::ColdStorageError::Backpressure
    /// [`TaskTerminated`]: signet_cold::ColdStorageError::TaskTerminated
    pub fn unwind_above(&self, block: BlockNumber) -> StorageResult<()> {
        // 1. Unwind hot storage synchronously
        let writer = self.hot.writer()?;

        writer.unwind_above(block).map_err(|e| e.map_db(|e| e.into_hot_kv_error()))?;
        writer.raw_commit().map_err(|e| e.into_hot_kv_error())?;

        // 2. Truncate cold storage synchronously (non-blocking dispatch)
        self.cold.dispatch_truncate_above(block).map_err(Into::into)
    }

    /// Check how far behind cold storage is compared to hot storage.
    ///
    /// Returns `Some(first_missing_block)` if cold is behind, `None` if synced.
    ///
    /// - Cold tip below hot tip: `Some(cold_tip + 1)`.
    /// - Hot has a tip but cold has no blocks: `Some(0)`.
    /// - Otherwise (hot is empty, or cold is at or past the hot tip): `None`.
    ///
    /// # Errors
    ///
    /// Returns an error if either storage cannot be queried.
    pub async fn cold_lag(&self) -> StorageResult<Option<BlockNumber>> {
        // Read the hot tip inside its own scope so the read transaction is
        // released before awaiting cold storage; it is not needed afterwards
        // and should not be held open for the duration of the cold query.
        let hot_tip = {
            let reader = self.reader()?;
            let tip = reader.get_chain_tip().map_err(|e| e.into_hot_kv_error())?;
            // Only the block number is needed for the comparison.
            tip.map(|(number, _)| number)
        };

        let cold_tip = self.cold.get_latest_block().await?;

        match (hot_tip, cold_tip) {
            (Some(hot_num), Some(cold_num)) if cold_num < hot_num => Ok(Some(cold_num + 1)),
            // NOTE(review): reports block 0 as the first missing block when
            // cold is empty — presumably the chain starts at 0; confirm for
            // deployments whose hot storage starts at a later height.
            (Some(_), None) => Ok(Some(0)),
            _ => Ok(None),
        }
    }

    /// Replay blocks to cold storage from an external source.
    ///
    /// Use this to recover cold storage after failures. The caller is
    /// responsible for fetching the missing block data.
    ///
    /// Unlike [`append_blocks`](Self::append_blocks), this awaits the cold
    /// storage write and does not touch hot storage.
    ///
    /// Consumes the blocks to avoid cloning.
    ///
    /// # Errors
    ///
    /// Returns an error if cold storage write fails.
    pub async fn replay_to_cold(&self, blocks: Vec<ExecutedBlock>) -> Result<(), ColdStorageError> {
        let cold_data: Vec<_> = blocks.into_iter().map(BlockData::from).collect();
        self.cold.append_blocks(cold_data).await
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use signet_hot_mdbx::DatabaseEnv;

    // Compile-time canaries. Every async method on
    // `UnifiedStorage<DatabaseEnv>` has to produce a `Send` future even though
    // MDBX write transactions are `!Send`. Should a `!Send` value end up
    // inside one of the async state machines, the functions below stop
    // compiling. None of them is ever called.

    /// Accepts any `Send` value by move.
    fn _assert_send(_: impl Send) {}

    /// The `drain_above` future must be `Send`.
    fn _drain_above_is_send(storage: &UnifiedStorage<DatabaseEnv>) {
        _assert_send(storage.drain_above(0));
    }

    /// The `cold_lag` future must be `Send`.
    fn _cold_lag_is_send(storage: &UnifiedStorage<DatabaseEnv>) {
        _assert_send(storage.cold_lag());
    }

    /// The `replay_to_cold` future must be `Send`.
    fn _replay_to_cold_is_send(storage: &UnifiedStorage<DatabaseEnv>) {
        _assert_send(storage.replay_to_cold(Vec::new()));
    }
}