use super::cache::ColdCache;
use crate::{
ColdReadRequest, ColdReceipt, ColdResult, ColdStorage, ColdStorageError, ColdStorageHandle,
ColdStorageRead, ColdWriteRequest, Confirmed, HeaderSpecifier, LogStream, ReceiptSpecifier,
TransactionSpecifier,
};
use signet_storage_types::{RecoveredTx, SealedHeader};
use std::{sync::Arc, time::Duration};
use tokio::sync::{Mutex, Semaphore, mpsc};
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::{sync::CancellationToken, task::TaskTracker};
use tracing::{debug, instrument};
/// Upper bound applied to caller-supplied log-stream deadlines.
const DEFAULT_MAX_STREAM_DEADLINE: Duration = Duration::from_secs(60);
/// Capacity of the mpsc channel carrying read requests to the task.
const READ_CHANNEL_SIZE: usize = 256;
/// Capacity of the mpsc channel carrying write requests to the task.
const WRITE_CHANNEL_SIZE: usize = 256;
/// Maximum number of spawned read handlers allowed in flight at once.
const MAX_CONCURRENT_READERS: usize = 64;
/// Number of permits on the stream semaphore, i.e. the maximum number of
/// concurrently running log-stream producers.
const MAX_CONCURRENT_STREAMS: usize = 8;
/// Buffer size of the per-stream channel between producer and consumer.
const STREAM_CHANNEL_BUFFER: usize = 256;
/// State shared (via `Arc`) between the main task loop, the spawned read
/// handlers, and the spawned stream producers.
struct ColdStorageTaskInner<B> {
    /// Backend used to service all read requests.
    read_backend: B,
    /// Cache of headers, transactions, and receipts. Guarded by a tokio
    /// `Mutex` because it is locked across `.await` points by read handlers.
    cache: Mutex<ColdCache>,
    /// Cap applied to caller-supplied stream deadlines (see
    /// `DEFAULT_MAX_STREAM_DEADLINE`).
    max_stream_deadline: Duration,
    /// Limits the number of concurrently running log-stream producers.
    stream_semaphore: Arc<Semaphore>,
    /// Tracks spawned stream-producer tasks so shutdown can wait for them.
    stream_tracker: TaskTracker,
}
impl<B: ColdStorageRead> ColdStorageTaskInner<B> {
    /// Fetch a header from the backend and, on a successful hit, insert it
    /// into the cache keyed by block number. Errors and misses are passed
    /// through unchanged.
    async fn fetch_and_cache_header(
        &self,
        spec: HeaderSpecifier,
    ) -> ColdResult<Option<SealedHeader>> {
        let r = self.read_backend.get_header(spec).await;
        if let Ok(Some(ref h)) = r {
            self.cache.lock().await.put_header(h.number, h.clone());
        }
        r
    }

    /// Fetch a transaction from the backend and, on a successful hit, insert
    /// it into the cache keyed by `(block_number, transaction_index)`.
    async fn fetch_and_cache_tx(
        &self,
        spec: TransactionSpecifier,
    ) -> ColdResult<Option<Confirmed<RecoveredTx>>> {
        let r = self.read_backend.get_transaction(spec).await;
        if let Ok(Some(ref c)) = r {
            let meta = c.meta();
            self.cache
                .lock()
                .await
                .put_tx((meta.block_number(), meta.transaction_index()), c.clone());
        }
        r
    }

    /// Fetch a receipt from the backend and, on a successful hit, insert it
    /// into the cache keyed by `(block_number, transaction_index)`.
    async fn fetch_and_cache_receipt(
        &self,
        spec: ReceiptSpecifier,
    ) -> ColdResult<Option<ColdReceipt>> {
        let r = self.read_backend.get_receipt(spec).await;
        if let Ok(Some(ref c)) = r {
            self.cache.lock().await.put_receipt((c.block_number, c.transaction_index), c.clone());
        }
        r
    }

    /// Dispatch a single read request, consulting the cache for the
    /// point-lookup variants (header by number, tx/receipt by block+index)
    /// and falling through to the backend for everything else. Send errors
    /// on `resp` are ignored: the requester may have given up.
    ///
    /// NOTE: each cache lookup is bound to a local in its own statement so
    /// the `MutexGuard` temporary is dropped *before* any backend fetch.
    /// Prior to the 2024 edition, a guard created in an `if let` scrutinee
    /// lives through the `else` branch, and the `fetch_and_cache_*` helpers
    /// re-lock the same non-reentrant tokio mutex — the previous
    /// `if let Some(hit) = self.cache.lock().await.get_...` shape therefore
    /// deadlocked on every cache miss.
    async fn handle_read(self: &Arc<Self>, req: ColdReadRequest) {
        match req {
            ColdReadRequest::GetHeader { spec, resp } => {
                let result = if let HeaderSpecifier::Number(n) = &spec {
                    // Guard dropped at the end of this statement.
                    let cached = self.cache.lock().await.get_header(n);
                    match cached {
                        Some(hit) => Ok(Some(hit)),
                        None => self.fetch_and_cache_header(spec).await,
                    }
                } else {
                    self.fetch_and_cache_header(spec).await
                };
                let _ = resp.send(result);
            }
            ColdReadRequest::GetHeaders { specs, resp } => {
                let _ = resp.send(self.read_backend.get_headers(specs).await);
            }
            ColdReadRequest::GetTransaction { spec, resp } => {
                let result = if let TransactionSpecifier::BlockAndIndex { block, index } = &spec {
                    // Guard dropped at the end of this statement.
                    let cached = self.cache.lock().await.get_tx(&(*block, *index));
                    match cached {
                        Some(hit) => Ok(Some(hit)),
                        None => self.fetch_and_cache_tx(spec).await,
                    }
                } else {
                    self.fetch_and_cache_tx(spec).await
                };
                let _ = resp.send(result);
            }
            ColdReadRequest::GetTransactionsInBlock { block, resp } => {
                let _ = resp.send(self.read_backend.get_transactions_in_block(block).await);
            }
            ColdReadRequest::GetTransactionCount { block, resp } => {
                let _ = resp.send(self.read_backend.get_transaction_count(block).await);
            }
            ColdReadRequest::GetReceipt { spec, resp } => {
                let result = if let ReceiptSpecifier::BlockAndIndex { block, index } = &spec {
                    // Guard dropped at the end of this statement.
                    let cached = self.cache.lock().await.get_receipt(&(*block, *index));
                    match cached {
                        Some(hit) => Ok(Some(hit)),
                        None => self.fetch_and_cache_receipt(spec).await,
                    }
                } else {
                    self.fetch_and_cache_receipt(spec).await
                };
                let _ = resp.send(result);
            }
            ColdReadRequest::GetReceiptsInBlock { block, resp } => {
                let _ = resp.send(self.read_backend.get_receipts_in_block(block).await);
            }
            ColdReadRequest::GetSignetEvents { spec, resp } => {
                let _ = resp.send(self.read_backend.get_signet_events(spec).await);
            }
            ColdReadRequest::GetZenithHeader { spec, resp } => {
                let _ = resp.send(self.read_backend.get_zenith_header(spec).await);
            }
            ColdReadRequest::GetZenithHeaders { spec, resp } => {
                let _ = resp.send(self.read_backend.get_zenith_headers(spec).await);
            }
            ColdReadRequest::GetLogs { filter, max_logs, resp } => {
                let _ = resp.send(self.read_backend.get_logs(&filter, max_logs).await);
            }
            ColdReadRequest::StreamLogs { filter, max_logs, deadline, resp } => {
                let _ = resp.send(self.handle_stream_logs(*filter, max_logs, deadline).await);
            }
            ColdReadRequest::GetLatestBlock { resp } => {
                let _ = resp.send(self.read_backend.get_latest_block().await);
            }
        }
    }

    /// Set up a log stream: acquire a concurrency permit, resolve the block
    /// range from the filter (defaulting the upper bound to the latest known
    /// block), clamp the deadline to `max_stream_deadline`, and spawn the
    /// producer on `stream_tracker`. The permit is held by the producer task
    /// and released when it finishes.
    ///
    /// Returns `ColdStorageError::Cancelled` if the semaphore has been
    /// closed, and an immediately-ending stream if no latest block exists.
    async fn handle_stream_logs(
        self: &Arc<Self>,
        filter: crate::Filter,
        max_logs: usize,
        deadline: Duration,
    ) -> ColdResult<LogStream> {
        let permit = self
            .stream_semaphore
            .clone()
            .acquire_owned()
            .await
            .map_err(|_| ColdStorageError::Cancelled)?;
        let from = filter.get_from_block().unwrap_or(0);
        let to = match filter.get_to_block() {
            Some(to) => to,
            None => {
                let Some(latest) = self.read_backend.get_latest_block().await? else {
                    // No blocks at all: return an empty (immediately-closed)
                    // stream rather than an error.
                    let (_tx, rx) = mpsc::channel(1);
                    return Ok(ReceiverStream::new(rx));
                };
                latest
            }
        };
        let effective = deadline.min(self.max_stream_deadline);
        let deadline_instant = tokio::time::Instant::now() + effective;
        let (sender, rx) = mpsc::channel(STREAM_CHANNEL_BUFFER);
        let stream_backend = self.read_backend.clone();
        self.stream_tracker.spawn(async move {
            // Hold the permit for the lifetime of the producer.
            let _permit = permit;
            let params =
                crate::StreamParams { from, to, max_logs, sender, deadline: deadline_instant };
            stream_backend.produce_log_stream(&filter, params).await;
        });
        Ok(ReceiverStream::new(rx))
    }
}
/// Actor task that owns the cold-storage backend and serializes writes
/// while fanning out reads to concurrent handler tasks.
pub struct ColdStorageTask<B: ColdStorage> {
    /// Read-side state shared with spawned read handlers.
    inner: Arc<ColdStorageTaskInner<B>>,
    /// Backend clone reserved for write operations, used only by the task
    /// loop itself.
    write_backend: B,
    /// Incoming read requests from `ColdStorageHandle`.
    read_receiver: mpsc::Receiver<ColdReadRequest>,
    /// Incoming write requests from `ColdStorageHandle`.
    write_receiver: mpsc::Receiver<ColdWriteRequest>,
    /// Signals graceful shutdown of the task loop.
    cancel_token: CancellationToken,
    /// Tracks spawned read-handler tasks for barriers and shutdown.
    task_tracker: TaskTracker,
}
impl<B: ColdStorage> std::fmt::Debug for ColdStorageTask<B> {
    /// Opaque representation (`ColdStorageTask { .. }`): the fields
    /// (channel receivers, backend) are not themselves `Debug`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("ColdStorageTask");
        builder.finish_non_exhaustive()
    }
}
impl<B: ColdStorage> ColdStorageTask<B> {
    /// Build the task and its paired [`ColdStorageHandle`] without spawning.
    ///
    /// The backend is cloned: one clone serves reads (shared via `inner`),
    /// the original serves writes on the task loop.
    pub fn new(backend: B, cancel_token: CancellationToken) -> (Self, ColdStorageHandle) {
        let (read_sender, read_receiver) = mpsc::channel(READ_CHANNEL_SIZE);
        let (write_sender, write_receiver) = mpsc::channel(WRITE_CHANNEL_SIZE);
        let read_backend = backend.clone();
        let task = Self {
            inner: Arc::new(ColdStorageTaskInner {
                read_backend,
                cache: Mutex::new(ColdCache::new()),
                max_stream_deadline: DEFAULT_MAX_STREAM_DEADLINE,
                stream_semaphore: Arc::new(Semaphore::new(MAX_CONCURRENT_STREAMS)),
                stream_tracker: TaskTracker::new(),
            }),
            write_backend: backend,
            read_receiver,
            write_receiver,
            cancel_token,
            task_tracker: TaskTracker::new(),
        };
        let handle = ColdStorageHandle::new(read_sender, write_sender);
        (task, handle)
    }

    /// Build the task and spawn its run loop on the current tokio runtime,
    /// returning only the handle.
    pub fn spawn(backend: B, cancel_token: CancellationToken) -> ColdStorageHandle {
        let (task, handle) = Self::new(backend, cancel_token);
        tokio::spawn(task.run());
        handle
    }

    /// Apply a single write request to the backend. Truncation/drain
    /// invalidates the cached entries above the affected block on success.
    /// Send errors on the response channel are ignored.
    async fn handle_write(&mut self, req: ColdWriteRequest) {
        match req {
            ColdWriteRequest::AppendBlock(boxed) => {
                let result = self.write_backend.append_block(boxed.data).await;
                let _ = boxed.resp.send(result);
            }
            ColdWriteRequest::AppendBlocks { data, resp } => {
                let result = self.write_backend.append_blocks(data).await;
                let _ = resp.send(result);
            }
            ColdWriteRequest::TruncateAbove { block, resp } => {
                let result = self.write_backend.truncate_above(block).await;
                if result.is_ok() {
                    self.inner.cache.lock().await.invalidate_above(block);
                }
                let _ = resp.send(result);
            }
            ColdWriteRequest::DrainAbove { block, resp } => {
                let result = self.write_backend.drain_above(block).await;
                if result.is_ok() {
                    self.inner.cache.lock().await.invalidate_above(block);
                }
                let _ = resp.send(result);
            }
        }
    }

    /// Main task loop. Writes take priority (`biased`) and act as a barrier:
    /// all in-flight readers are drained before a write runs. Reads are
    /// spawned onto `task_tracker`, bounded by `MAX_CONCURRENT_READERS`.
    /// Exits on cancellation or when either channel closes, then waits for
    /// outstanding readers and stream producers.
    #[instrument(skip(self), name = "cold_storage_task")]
    pub async fn run(mut self) {
        debug!("Cold storage task started");
        'main: loop {
            tokio::select! {
                biased;
                _ = self.cancel_token.cancelled() => {
                    debug!("Cold storage task received cancellation signal");
                    break 'main;
                }
                maybe_write = self.write_receiver.recv() => {
                    let Some(req) = maybe_write else {
                        debug!("Cold storage write channel closed");
                        break 'main;
                    };
                    // Write barrier: drain all in-flight readers so a write
                    // never races a concurrent read.
                    self.task_tracker.close();
                    self.task_tracker.wait().await;
                    self.task_tracker.reopen();
                    self.handle_write(req).await;
                }
                maybe_read = self.read_receiver.recv() => {
                    let Some(req) = maybe_read else {
                        debug!("Cold storage read channel closed");
                        break 'main;
                    };
                    // Backpressure at capacity. `TaskTracker::wait()` only
                    // resolves once the tracker is BOTH closed and empty, so
                    // awaiting it on an open tracker (as the previous code
                    // did) hangs until cancellation even as readers finish.
                    // Instead, drain via close/wait/reopen, mirroring the
                    // write barrier above.
                    if self.task_tracker.len() >= MAX_CONCURRENT_READERS {
                        self.task_tracker.close();
                        tokio::select! {
                            _ = self.cancel_token.cancelled() => {
                                debug!("Cancellation while waiting for read task slot");
                                // Exit the task entirely; a plain `break`
                                // here previously only left the wait loop
                                // and the read was spawned anyway.
                                break 'main;
                            }
                            _ = self.task_tracker.wait() => {}
                        }
                        self.task_tracker.reopen();
                    }
                    let inner = Arc::clone(&self.inner);
                    self.task_tracker.spawn(async move {
                        inner.handle_read(req).await;
                    });
                }
            }
        }
        debug!("Waiting for in-progress read handlers to complete");
        // Closing an already-closed tracker is a no-op, so this is safe even
        // when we broke out mid-barrier.
        self.task_tracker.close();
        self.task_tracker.wait().await;
        debug!("Waiting for in-progress stream producers to complete");
        self.inner.stream_tracker.close();
        self.inner.stream_tracker.wait().await;
        debug!("Cold storage task shut down gracefully");
    }
}