use crate::{
AppendBlockRequest, BlockData, ColdReadRequest, ColdReceipt, ColdResult, ColdStorageError,
ColdWriteRequest, Confirmed, Filter, HeaderSpecifier, LogStream, ReceiptSpecifier, RpcLog,
SignetEventsSpecifier, TransactionSpecifier, ZenithHeaderSpecifier,
};
use alloy::primitives::{B256, BlockNumber};
use signet_storage_types::{DbSignetEvent, DbZenithHeader, RecoveredTx, SealedHeader};
use std::time::Duration;
use tokio::sync::{mpsc, oneshot};
fn map_dispatch_error<T>(e: mpsc::error::TrySendError<T>) -> ColdStorageError {
match e {
mpsc::error::TrySendError::Full(_) => ColdStorageError::Backpressure,
mpsc::error::TrySendError::Closed(_) => ColdStorageError::TaskTerminated,
}
}
#[derive(Clone, Debug)]
pub struct ColdStorageReadHandle {
sender: mpsc::Sender<ColdReadRequest>,
}
impl ColdStorageReadHandle {
pub(crate) const fn new(sender: mpsc::Sender<ColdReadRequest>) -> Self {
Self { sender }
}
async fn send<T>(
&self,
req: ColdReadRequest,
rx: oneshot::Receiver<ColdResult<T>>,
) -> ColdResult<T> {
self.sender.send(req).await.map_err(|_| ColdStorageError::Cancelled)?;
rx.await.map_err(|_| ColdStorageError::Cancelled)?
}
pub async fn get_header(&self, spec: HeaderSpecifier) -> ColdResult<Option<SealedHeader>> {
let (resp, rx) = oneshot::channel();
self.send(ColdReadRequest::GetHeader { spec, resp }, rx).await
}
pub async fn get_header_by_number(
&self,
block: BlockNumber,
) -> ColdResult<Option<SealedHeader>> {
self.get_header(HeaderSpecifier::Number(block)).await
}
pub async fn get_header_by_hash(&self, hash: B256) -> ColdResult<Option<SealedHeader>> {
self.get_header(HeaderSpecifier::Hash(hash)).await
}
pub async fn get_headers(
&self,
specs: Vec<HeaderSpecifier>,
) -> ColdResult<Vec<Option<SealedHeader>>> {
let (resp, rx) = oneshot::channel();
self.send(ColdReadRequest::GetHeaders { specs, resp }, rx).await
}
pub async fn get_transaction(
&self,
spec: TransactionSpecifier,
) -> ColdResult<Option<Confirmed<RecoveredTx>>> {
let (resp, rx) = oneshot::channel();
self.send(ColdReadRequest::GetTransaction { spec, resp }, rx).await
}
pub async fn get_tx_by_hash(&self, hash: B256) -> ColdResult<Option<Confirmed<RecoveredTx>>> {
self.get_transaction(TransactionSpecifier::Hash(hash)).await
}
pub async fn get_tx_by_block_and_index(
&self,
block: BlockNumber,
index: u64,
) -> ColdResult<Option<Confirmed<RecoveredTx>>> {
self.get_transaction(TransactionSpecifier::BlockAndIndex { block, index }).await
}
pub async fn get_tx_by_block_hash_and_index(
&self,
block_hash: B256,
index: u64,
) -> ColdResult<Option<Confirmed<RecoveredTx>>> {
self.get_transaction(TransactionSpecifier::BlockHashAndIndex { block_hash, index }).await
}
pub async fn get_transactions_in_block(
&self,
block: BlockNumber,
) -> ColdResult<Vec<RecoveredTx>> {
let (resp, rx) = oneshot::channel();
self.send(ColdReadRequest::GetTransactionsInBlock { block, resp }, rx).await
}
pub async fn get_transaction_count(&self, block: BlockNumber) -> ColdResult<u64> {
let (resp, rx) = oneshot::channel();
self.send(ColdReadRequest::GetTransactionCount { block, resp }, rx).await
}
pub async fn get_receipt(&self, spec: ReceiptSpecifier) -> ColdResult<Option<ColdReceipt>> {
let (resp, rx) = oneshot::channel();
self.send(ColdReadRequest::GetReceipt { spec, resp }, rx).await
}
pub async fn get_receipt_by_tx_hash(&self, hash: B256) -> ColdResult<Option<ColdReceipt>> {
self.get_receipt(ReceiptSpecifier::TxHash(hash)).await
}
pub async fn get_receipt_by_block_and_index(
&self,
block: BlockNumber,
index: u64,
) -> ColdResult<Option<ColdReceipt>> {
self.get_receipt(ReceiptSpecifier::BlockAndIndex { block, index }).await
}
pub async fn get_receipts_in_block(&self, block: BlockNumber) -> ColdResult<Vec<ColdReceipt>> {
let (resp, rx) = oneshot::channel();
self.send(ColdReadRequest::GetReceiptsInBlock { block, resp }, rx).await
}
pub async fn get_signet_events(
&self,
spec: SignetEventsSpecifier,
) -> ColdResult<Vec<DbSignetEvent>> {
let (resp, rx) = oneshot::channel();
self.send(ColdReadRequest::GetSignetEvents { spec, resp }, rx).await
}
pub async fn get_signet_events_in_block(
&self,
block: BlockNumber,
) -> ColdResult<Vec<DbSignetEvent>> {
self.get_signet_events(SignetEventsSpecifier::Block(block)).await
}
pub async fn get_signet_events_in_range(
&self,
start: BlockNumber,
end: BlockNumber,
) -> ColdResult<Vec<DbSignetEvent>> {
self.get_signet_events(SignetEventsSpecifier::BlockRange { start, end }).await
}
pub async fn get_zenith_header(
&self,
block: BlockNumber,
) -> ColdResult<Option<DbZenithHeader>> {
let (resp, rx) = oneshot::channel();
self.send(
ColdReadRequest::GetZenithHeader { spec: ZenithHeaderSpecifier::Number(block), resp },
rx,
)
.await
}
pub async fn get_zenith_headers(
&self,
spec: ZenithHeaderSpecifier,
) -> ColdResult<Vec<DbZenithHeader>> {
let (resp, rx) = oneshot::channel();
self.send(ColdReadRequest::GetZenithHeaders { spec, resp }, rx).await
}
pub async fn get_zenith_headers_in_range(
&self,
start: BlockNumber,
end: BlockNumber,
) -> ColdResult<Vec<DbZenithHeader>> {
self.get_zenith_headers(ZenithHeaderSpecifier::Range { start, end }).await
}
pub async fn get_logs(&self, filter: Filter, max_logs: usize) -> ColdResult<Vec<RpcLog>> {
let (resp, rx) = oneshot::channel();
self.send(ColdReadRequest::GetLogs { filter: Box::new(filter), max_logs, resp }, rx).await
}
pub async fn stream_logs(
&self,
filter: Filter,
max_logs: usize,
deadline: Duration,
) -> ColdResult<LogStream> {
let (resp, rx) = oneshot::channel();
self.send(
ColdReadRequest::StreamLogs { filter: Box::new(filter), max_logs, deadline, resp },
rx,
)
.await
}
pub async fn get_latest_block(&self) -> ColdResult<Option<BlockNumber>> {
let (resp, rx) = oneshot::channel();
self.send(ColdReadRequest::GetLatestBlock { resp }, rx).await
}
}
#[derive(Clone, Debug)]
pub struct ColdStorageHandle {
reader: ColdStorageReadHandle,
write_sender: mpsc::Sender<ColdWriteRequest>,
}
impl ColdStorageHandle {
pub(crate) const fn new(
read_sender: mpsc::Sender<ColdReadRequest>,
write_sender: mpsc::Sender<ColdWriteRequest>,
) -> Self {
Self { reader: ColdStorageReadHandle::new(read_sender), write_sender }
}
pub fn reader(&self) -> ColdStorageReadHandle {
self.reader.clone()
}
async fn send_write<T>(
&self,
req: ColdWriteRequest,
rx: oneshot::Receiver<ColdResult<T>>,
) -> ColdResult<T> {
self.write_sender.send(req).await.map_err(|_| ColdStorageError::Cancelled)?;
rx.await.map_err(|_| ColdStorageError::Cancelled)?
}
pub async fn append_block(&self, data: BlockData) -> ColdResult<()> {
let (resp, rx) = oneshot::channel();
self.send_write(
ColdWriteRequest::AppendBlock(Box::new(AppendBlockRequest { data, resp })),
rx,
)
.await
}
pub async fn append_blocks(&self, data: Vec<BlockData>) -> ColdResult<()> {
let (resp, rx) = oneshot::channel();
self.send_write(ColdWriteRequest::AppendBlocks { data, resp }, rx).await
}
pub async fn truncate_above(&self, block: BlockNumber) -> ColdResult<()> {
let (resp, rx) = oneshot::channel();
self.send_write(ColdWriteRequest::TruncateAbove { block, resp }, rx).await
}
pub fn dispatch_append_blocks(&self, data: Vec<BlockData>) -> ColdResult<()> {
let (resp, _rx) = oneshot::channel();
self.write_sender
.try_send(ColdWriteRequest::AppendBlocks { data, resp })
.map_err(map_dispatch_error)
}
pub async fn drain_above(&self, block: BlockNumber) -> ColdResult<Vec<Vec<ColdReceipt>>> {
let (resp, rx) = oneshot::channel();
self.send_write(ColdWriteRequest::DrainAbove { block, resp }, rx).await
}
pub fn dispatch_truncate_above(&self, block: BlockNumber) -> ColdResult<()> {
let (resp, _rx) = oneshot::channel();
self.write_sender
.try_send(ColdWriteRequest::TruncateAbove { block, resp })
.map_err(map_dispatch_error)
}
pub async fn get_header(&self, spec: HeaderSpecifier) -> ColdResult<Option<SealedHeader>> {
self.reader.get_header(spec).await
}
pub async fn get_header_by_number(
&self,
block: BlockNumber,
) -> ColdResult<Option<SealedHeader>> {
self.reader.get_header_by_number(block).await
}
pub async fn get_header_by_hash(&self, hash: B256) -> ColdResult<Option<SealedHeader>> {
self.reader.get_header_by_hash(hash).await
}
pub async fn get_headers(
&self,
specs: Vec<HeaderSpecifier>,
) -> ColdResult<Vec<Option<SealedHeader>>> {
self.reader.get_headers(specs).await
}
pub async fn get_transaction(
&self,
spec: TransactionSpecifier,
) -> ColdResult<Option<Confirmed<RecoveredTx>>> {
self.reader.get_transaction(spec).await
}
pub async fn get_tx_by_hash(&self, hash: B256) -> ColdResult<Option<Confirmed<RecoveredTx>>> {
self.reader.get_tx_by_hash(hash).await
}
pub async fn get_tx_by_block_and_index(
&self,
block: BlockNumber,
index: u64,
) -> ColdResult<Option<Confirmed<RecoveredTx>>> {
self.reader.get_tx_by_block_and_index(block, index).await
}
pub async fn get_tx_by_block_hash_and_index(
&self,
block_hash: B256,
index: u64,
) -> ColdResult<Option<Confirmed<RecoveredTx>>> {
self.reader.get_tx_by_block_hash_and_index(block_hash, index).await
}
pub async fn get_transactions_in_block(
&self,
block: BlockNumber,
) -> ColdResult<Vec<RecoveredTx>> {
self.reader.get_transactions_in_block(block).await
}
pub async fn get_transaction_count(&self, block: BlockNumber) -> ColdResult<u64> {
self.reader.get_transaction_count(block).await
}
pub async fn get_receipt(&self, spec: ReceiptSpecifier) -> ColdResult<Option<ColdReceipt>> {
self.reader.get_receipt(spec).await
}
pub async fn get_receipt_by_tx_hash(&self, hash: B256) -> ColdResult<Option<ColdReceipt>> {
self.reader.get_receipt_by_tx_hash(hash).await
}
pub async fn get_receipt_by_block_and_index(
&self,
block: BlockNumber,
index: u64,
) -> ColdResult<Option<ColdReceipt>> {
self.reader.get_receipt_by_block_and_index(block, index).await
}
pub async fn get_receipts_in_block(&self, block: BlockNumber) -> ColdResult<Vec<ColdReceipt>> {
self.reader.get_receipts_in_block(block).await
}
pub async fn get_signet_events(
&self,
spec: SignetEventsSpecifier,
) -> ColdResult<Vec<DbSignetEvent>> {
self.reader.get_signet_events(spec).await
}
pub async fn get_signet_events_in_block(
&self,
block: BlockNumber,
) -> ColdResult<Vec<DbSignetEvent>> {
self.reader.get_signet_events_in_block(block).await
}
pub async fn get_signet_events_in_range(
&self,
start: BlockNumber,
end: BlockNumber,
) -> ColdResult<Vec<DbSignetEvent>> {
self.reader.get_signet_events_in_range(start, end).await
}
pub async fn get_zenith_header(
&self,
block: BlockNumber,
) -> ColdResult<Option<DbZenithHeader>> {
self.reader.get_zenith_header(block).await
}
pub async fn get_zenith_headers(
&self,
spec: ZenithHeaderSpecifier,
) -> ColdResult<Vec<DbZenithHeader>> {
self.reader.get_zenith_headers(spec).await
}
pub async fn get_zenith_headers_in_range(
&self,
start: BlockNumber,
end: BlockNumber,
) -> ColdResult<Vec<DbZenithHeader>> {
self.reader.get_zenith_headers_in_range(start, end).await
}
pub async fn get_logs(&self, filter: Filter, max_logs: usize) -> ColdResult<Vec<RpcLog>> {
self.reader.get_logs(filter, max_logs).await
}
pub async fn stream_logs(
&self,
filter: Filter,
max_logs: usize,
deadline: Duration,
) -> ColdResult<LogStream> {
self.reader.stream_logs(filter, max_logs, deadline).await
}
pub async fn get_latest_block(&self) -> ColdResult<Option<BlockNumber>> {
self.reader.get_latest_block().await
}
}