1use crate::{
17 BlockData, ColdReceipt, ColdResult, ColdStorageBackend, ColdStorageError, Confirmed,
18 ErasedBackend, Filter, HeaderSpecifier, LogStream, ReceiptSpecifier, RpcLog,
19 SignetEventsSpecifier, StreamParams, TransactionSpecifier, ZenithHeaderSpecifier,
20 cache::ColdCache, metrics,
21};
22use alloy::primitives::{B256, BlockNumber};
23use parking_lot::Mutex;
24use signet_storage_types::{DbSignetEvent, DbZenithHeader, RecoveredTx, SealedHeader};
25use std::{
26 sync::{Arc, Weak},
27 time::Duration,
28};
29use tokio::{
30 sync::{Semaphore, mpsc},
31 time::Instant,
32};
33use tokio_stream::wrappers::ReceiverStream;
34use tokio_util::{
35 sync::{CancellationToken, DropGuard},
36 task::TaskTracker,
37};
38use tracing::Instrument;
39
/// Default ceiling applied to the caller-supplied deadline of a log stream.
///
/// `ColdStorage::new` stores this value as `max_stream_deadline`, and
/// `ColdStorage::stream_logs` clamps the requested deadline to it.
const DEFAULT_MAX_STREAM_DEADLINE: Duration = Duration::from_secs(60);

/// Fallback bound on the latest-block lookup performed while setting up a log
/// stream, used when the backend reports no read timeout of its own.
const DEFAULT_STREAM_SETUP_TIMEOUT: Duration = Duration::from_millis(500);
49
50fn warn_on_write_overrun(
55 op: &'static str,
56 elapsed: Duration,
57 threshold: Option<Duration>,
58 is_ok: bool,
59) {
60 let Some(threshold) = threshold else { return };
61 if is_ok && elapsed > threshold {
62 tracing::warn!(
63 op,
64 elapsed_ms = elapsed.as_millis() as u64,
65 threshold_ms = threshold.as_millis() as u64,
66 "cold write exceeded end-to-end write timeout (queue + drain + commit)",
67 );
68 }
69}
70
71fn log_join_error(op: &'static str, e: &tokio::task::JoinError) {
76 if e.is_panic() {
77 tracing::error!(op, error = %e, "cold storage spawned task panicked");
78 } else if e.is_cancelled() {
79 tracing::debug!(op, "cold storage spawned task cancelled");
80 }
81}
82
/// Number of permits in the read semaphore.
///
/// Each read task holds one permit; a write acquires all of them before it
/// runs, so no read task is in flight while a write executes.
const MAX_CONCURRENT_READERS: usize = 64;

/// Number of permits in the write semaphore; a single permit means writes run
/// one at a time.
const MAX_CONCURRENT_WRITES: usize = 1;

/// Number of permits in the stream semaphore, i.e. how many log-stream
/// producer tasks may run at once.
const MAX_CONCURRENT_STREAMS: usize = 8;

/// Capacity of the bounded channel that carries a log stream from its
/// producer task to the consumer.
const STREAM_CHANNEL_BUFFER: usize = 256;
94
95pub(crate) struct Inner<B> {
97 pub(crate) backend: B,
98 pub(crate) cache: Mutex<ColdCache>,
99 pub(crate) max_stream_deadline: Duration,
100 pub(crate) read_sem: Arc<Semaphore>,
101 pub(crate) write_sem: Arc<Semaphore>,
102 pub(crate) stream_sem: Arc<Semaphore>,
103 pub(crate) tracker: TaskTracker,
104 _shutdown_guard: DropGuard,
108}
109
110pub struct ColdStorage<B: ColdStorageBackend = ErasedBackend> {
116 inner: Arc<Inner<B>>,
117}
118
119impl<B: ColdStorageBackend> Clone for ColdStorage<B> {
120 fn clone(&self) -> Self {
121 Self { inner: Arc::clone(&self.inner) }
122 }
123}
124
125impl<B: ColdStorageBackend> std::fmt::Debug for ColdStorage<B> {
126 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
127 f.debug_struct("ColdStorage").finish_non_exhaustive()
128 }
129}
130
131impl<B: ColdStorageBackend> ColdStorage<B> {
132 pub fn new(backend: B, cancel: CancellationToken) -> Self {
140 let shutdown = cancel.child_token();
144 let shutdown_guard = shutdown.clone().drop_guard();
145 let inner = Arc::new(Inner {
146 backend,
147 cache: Mutex::new(ColdCache::new()),
148 max_stream_deadline: DEFAULT_MAX_STREAM_DEADLINE,
149 read_sem: Arc::new(Semaphore::new(MAX_CONCURRENT_READERS)),
150 write_sem: Arc::new(Semaphore::new(MAX_CONCURRENT_WRITES)),
151 stream_sem: Arc::new(Semaphore::new(MAX_CONCURRENT_STREAMS)),
152 tracker: TaskTracker::new(),
153 _shutdown_guard: shutdown_guard,
154 });
155 let weak: Weak<Inner<B>> = Arc::downgrade(&inner);
156 tokio::spawn(async move {
161 shutdown.cancelled().await;
162 let Some(inner) = weak.upgrade() else { return };
163 inner.read_sem.close();
164 inner.write_sem.close();
165 inner.stream_sem.close();
166 inner.tracker.close();
167 });
168 Self { inner }
169 }
170
171 pub async fn wait_shutdown(&self) {
176 self.inner.tracker.close();
177 self.inner.tracker.wait().await;
178 }
179
180 async fn spawn_read<T, F, Fut>(&self, op: &'static str, f: F) -> ColdResult<T>
182 where
183 T: Send + 'static,
184 F: FnOnce(Arc<Inner<B>>) -> Fut + Send + 'static,
185 Fut: std::future::Future<Output = ColdResult<T>> + Send,
186 {
187 let wait = Instant::now();
188 let permit = self
189 .inner
190 .read_sem
191 .clone()
192 .acquire_owned()
193 .await
194 .map_err(|_| ColdStorageError::TaskTerminated)?;
195 metrics::record_permit_wait("read", wait.elapsed());
196 let inner = Arc::clone(&self.inner);
197 self.inner
198 .tracker
199 .spawn(
200 async move {
201 let _p = permit;
202 let _guard = metrics::InFlightGuard::new("read");
203 let start = Instant::now();
204 let result = f(inner).await;
205 metrics::record_op_duration(op, start.elapsed());
206 if let Err(ref e) = result {
207 metrics::record_op_error(op, e.kind());
208 }
209 result
210 }
211 .in_current_span(),
212 )
213 .await
214 .map_err(|e| {
215 log_join_error(op, &e);
216 ColdStorageError::TaskTerminated
217 })?
218 }
219
220 async fn spawn_write<T, F, Fut>(&self, op: &'static str, f: F) -> ColdResult<T>
231 where
232 T: Send + 'static,
233 F: FnOnce(Arc<Inner<B>>) -> Fut + Send + 'static,
234 Fut: std::future::Future<Output = ColdResult<T>> + Send,
235 {
236 let e2e_start = Instant::now();
242 let threshold = self.inner.backend.write_timeout();
243
244 let write_permit = self
245 .inner
246 .write_sem
247 .clone()
248 .acquire_owned()
249 .await
250 .map_err(|_| ColdStorageError::TaskTerminated)?;
251 metrics::record_permit_wait("write", e2e_start.elapsed());
252
253 let drain_wait = Instant::now();
254 let drain = self
255 .inner
256 .read_sem
257 .clone()
258 .acquire_many_owned(MAX_CONCURRENT_READERS as u32)
259 .await
260 .map_err(|_| ColdStorageError::TaskTerminated)?;
261 metrics::record_permit_wait("drain", drain_wait.elapsed());
262
263 let inner = Arc::clone(&self.inner);
264 self.inner
265 .tracker
266 .spawn(
267 async move {
268 let _w = write_permit;
269 let _d = drain;
270 let _guard = metrics::InFlightGuard::new("write");
271 let start = Instant::now();
272 let result = f(inner).await;
273 metrics::record_op_duration(op, start.elapsed());
274 if let Err(ref e) = result {
275 metrics::record_op_error(op, e.kind());
276 }
277 warn_on_write_overrun(op, e2e_start.elapsed(), threshold, result.is_ok());
278 result
279 }
280 .in_current_span(),
281 )
282 .await
283 .map_err(|e| {
284 log_join_error(op, &e);
285 ColdStorageError::TaskTerminated
286 })?
287 }
288
289 #[tracing::instrument(skip(self, spec), fields(op = "get_header"))]
295 pub async fn get_header(&self, spec: HeaderSpecifier) -> ColdResult<Option<SealedHeader>> {
296 let op_start = Instant::now();
297 if let HeaderSpecifier::Number(n) = &spec
298 && let Some(hit) = self.inner.cache.lock().get_header(n)
299 {
300 metrics::record_op_duration("get_header", op_start.elapsed());
301 return Ok(Some(hit));
302 }
303 self.spawn_read("get_header", move |inner| async move {
304 let result = inner.backend.get_header(spec).await;
305 if let Ok(Some(ref h)) = result {
306 inner.cache.lock().put_header(h.number, h.clone());
307 }
308 result
309 })
310 .await
311 }
312
313 pub async fn get_header_by_number(
315 &self,
316 block: BlockNumber,
317 ) -> ColdResult<Option<SealedHeader>> {
318 self.get_header(HeaderSpecifier::Number(block)).await
319 }
320
321 pub async fn get_header_by_hash(&self, hash: B256) -> ColdResult<Option<SealedHeader>> {
323 self.get_header(HeaderSpecifier::Hash(hash)).await
324 }
325
326 #[tracing::instrument(skip(self, specs), fields(op = "get_headers"))]
328 pub async fn get_headers(
329 &self,
330 specs: Vec<HeaderSpecifier>,
331 ) -> ColdResult<Vec<Option<SealedHeader>>> {
332 self.spawn_read("get_headers", move |inner| async move {
333 inner.backend.get_headers(specs).await
334 })
335 .await
336 }
337
338 #[tracing::instrument(skip(self, spec), fields(op = "get_transaction"))]
344 pub async fn get_transaction(
345 &self,
346 spec: TransactionSpecifier,
347 ) -> ColdResult<Option<Confirmed<RecoveredTx>>> {
348 let op_start = Instant::now();
349 if let TransactionSpecifier::BlockAndIndex { block, index } = &spec
350 && let Some(hit) = self.inner.cache.lock().get_tx(&(*block, *index))
351 {
352 metrics::record_op_duration("get_transaction", op_start.elapsed());
353 return Ok(Some(hit));
354 }
355 self.spawn_read("get_transaction", move |inner| async move {
356 let result = inner.backend.get_transaction(spec).await;
357 if let Ok(Some(ref c)) = result {
358 let meta = c.meta();
359 inner
360 .cache
361 .lock()
362 .put_tx((meta.block_number(), meta.transaction_index()), c.clone());
363 }
364 result
365 })
366 .await
367 }
368
369 pub async fn get_tx_by_hash(&self, hash: B256) -> ColdResult<Option<Confirmed<RecoveredTx>>> {
371 self.get_transaction(TransactionSpecifier::Hash(hash)).await
372 }
373
374 pub async fn get_tx_by_block_and_index(
376 &self,
377 block: BlockNumber,
378 index: u64,
379 ) -> ColdResult<Option<Confirmed<RecoveredTx>>> {
380 self.get_transaction(TransactionSpecifier::BlockAndIndex { block, index }).await
381 }
382
383 pub async fn get_tx_by_block_hash_and_index(
385 &self,
386 block_hash: B256,
387 index: u64,
388 ) -> ColdResult<Option<Confirmed<RecoveredTx>>> {
389 self.get_transaction(TransactionSpecifier::BlockHashAndIndex { block_hash, index }).await
390 }
391
392 #[tracing::instrument(skip(self), fields(op = "get_transactions_in_block"))]
394 pub async fn get_transactions_in_block(
395 &self,
396 block: BlockNumber,
397 ) -> ColdResult<Vec<RecoveredTx>> {
398 self.spawn_read("get_transactions_in_block", move |inner| async move {
399 inner.backend.get_transactions_in_block(block).await
400 })
401 .await
402 }
403
404 #[tracing::instrument(skip(self), fields(op = "get_transaction_count"))]
406 pub async fn get_transaction_count(&self, block: BlockNumber) -> ColdResult<u64> {
407 self.spawn_read("get_transaction_count", move |inner| async move {
408 inner.backend.get_transaction_count(block).await
409 })
410 .await
411 }
412
413 #[tracing::instrument(skip(self, spec), fields(op = "get_receipt"))]
419 pub async fn get_receipt(&self, spec: ReceiptSpecifier) -> ColdResult<Option<ColdReceipt>> {
420 let op_start = Instant::now();
421 if let ReceiptSpecifier::BlockAndIndex { block, index } = &spec
422 && let Some(hit) = self.inner.cache.lock().get_receipt(&(*block, *index))
423 {
424 metrics::record_op_duration("get_receipt", op_start.elapsed());
425 return Ok(Some(hit));
426 }
427 self.spawn_read("get_receipt", move |inner| async move {
428 let result = inner.backend.get_receipt(spec).await;
429 if let Ok(Some(ref c)) = result {
430 inner.cache.lock().put_receipt((c.block_number, c.transaction_index), c.clone());
431 }
432 result
433 })
434 .await
435 }
436
437 pub async fn get_receipt_by_tx_hash(&self, hash: B256) -> ColdResult<Option<ColdReceipt>> {
439 self.get_receipt(ReceiptSpecifier::TxHash(hash)).await
440 }
441
442 pub async fn get_receipt_by_block_and_index(
444 &self,
445 block: BlockNumber,
446 index: u64,
447 ) -> ColdResult<Option<ColdReceipt>> {
448 self.get_receipt(ReceiptSpecifier::BlockAndIndex { block, index }).await
449 }
450
451 #[tracing::instrument(skip(self), fields(op = "get_receipts_in_block"))]
453 pub async fn get_receipts_in_block(&self, block: BlockNumber) -> ColdResult<Vec<ColdReceipt>> {
454 self.spawn_read("get_receipts_in_block", move |inner| async move {
455 inner.backend.get_receipts_in_block(block).await
456 })
457 .await
458 }
459
460 #[tracing::instrument(skip(self, spec), fields(op = "get_signet_events"))]
466 pub async fn get_signet_events(
467 &self,
468 spec: SignetEventsSpecifier,
469 ) -> ColdResult<Vec<DbSignetEvent>> {
470 self.spawn_read("get_signet_events", move |inner| async move {
471 inner.backend.get_signet_events(spec).await
472 })
473 .await
474 }
475
476 pub async fn get_signet_events_in_block(
478 &self,
479 block: BlockNumber,
480 ) -> ColdResult<Vec<DbSignetEvent>> {
481 self.get_signet_events(SignetEventsSpecifier::Block(block)).await
482 }
483
484 pub async fn get_signet_events_in_range(
486 &self,
487 start: BlockNumber,
488 end: BlockNumber,
489 ) -> ColdResult<Vec<DbSignetEvent>> {
490 self.get_signet_events(SignetEventsSpecifier::BlockRange { start, end }).await
491 }
492
493 pub async fn get_zenith_header(
499 &self,
500 block: BlockNumber,
501 ) -> ColdResult<Option<DbZenithHeader>> {
502 self.get_zenith_header_by_spec(ZenithHeaderSpecifier::Number(block)).await
503 }
504
505 #[tracing::instrument(skip(self, spec), fields(op = "get_zenith_header_by_spec"))]
507 async fn get_zenith_header_by_spec(
508 &self,
509 spec: ZenithHeaderSpecifier,
510 ) -> ColdResult<Option<DbZenithHeader>> {
511 self.spawn_read("get_zenith_header_by_spec", move |inner| async move {
512 inner.backend.get_zenith_header(spec).await
513 })
514 .await
515 }
516
517 #[tracing::instrument(skip(self, spec), fields(op = "get_zenith_headers"))]
519 pub async fn get_zenith_headers(
520 &self,
521 spec: ZenithHeaderSpecifier,
522 ) -> ColdResult<Vec<DbZenithHeader>> {
523 self.spawn_read("get_zenith_headers", move |inner| async move {
524 inner.backend.get_zenith_headers(spec).await
525 })
526 .await
527 }
528
529 pub async fn get_zenith_headers_in_range(
531 &self,
532 start: BlockNumber,
533 end: BlockNumber,
534 ) -> ColdResult<Vec<DbZenithHeader>> {
535 self.get_zenith_headers(ZenithHeaderSpecifier::Range { start, end }).await
536 }
537
538 #[tracing::instrument(skip(self, filter), fields(op = "get_logs"))]
547 pub async fn get_logs(&self, filter: Filter, max_logs: usize) -> ColdResult<Vec<RpcLog>> {
548 self.spawn_read("get_logs", move |inner| async move {
549 inner.backend.get_logs(&filter, max_logs).await
550 })
551 .await
552 }
553
554 #[tracing::instrument(skip(self, filter), fields(op = "stream_logs"))]
562 pub async fn stream_logs(
563 &self,
564 filter: Filter,
565 max_logs: usize,
566 deadline: Duration,
567 ) -> ColdResult<LogStream> {
568 let from = filter.get_from_block().unwrap_or(0);
569 let to = match filter.get_to_block() {
583 Some(to) => to,
584 None => {
585 let setup_to =
586 self.inner.backend.read_timeout().unwrap_or(DEFAULT_STREAM_SETUP_TIMEOUT);
587 let latest = tokio::time::timeout(setup_to, self.inner.backend.get_latest_block())
588 .await
589 .map_err(|_| ColdStorageError::DeadlineExceeded(setup_to))??;
590 match latest {
591 Some(latest) => latest,
592 None => {
593 let (_tx, rx) = mpsc::channel(1);
594 return Ok(ReceiverStream::new(rx));
595 }
596 }
597 }
598 };
599
600 let wait = Instant::now();
601 let permit = self
602 .inner
603 .stream_sem
604 .clone()
605 .acquire_owned()
606 .await
607 .map_err(|_| ColdStorageError::TaskTerminated)?;
608 metrics::record_permit_wait("stream", wait.elapsed());
609
610 let effective = deadline.min(self.inner.max_stream_deadline);
611 let deadline_instant = Instant::now() + effective;
612 let (sender, rx) = mpsc::channel(STREAM_CHANNEL_BUFFER);
613 let inner = Arc::clone(&self.inner);
614 let started = Instant::now();
615 self.inner.tracker.spawn(
616 async move {
617 let _p = permit;
618 let _guard = metrics::InFlightGuard::new("stream");
619 let params =
620 StreamParams { from, to, max_logs, sender, deadline: deadline_instant };
621 inner.backend.produce_log_stream(&filter, params).await;
622 metrics::record_stream_lifetime(started.elapsed());
623 }
624 .in_current_span(),
625 );
626 Ok(ReceiverStream::new(rx))
627 }
628
629 #[tracing::instrument(skip(self), fields(op = "get_latest_block"))]
635 pub async fn get_latest_block(&self) -> ColdResult<Option<BlockNumber>> {
636 self.spawn_read("get_latest_block", move |inner| async move {
637 inner.backend.get_latest_block().await
638 })
639 .await
640 }
641
642 #[tracing::instrument(skip(self, data), fields(op = "append_block"))]
648 pub async fn append_block(&self, data: BlockData) -> ColdResult<()> {
649 self.spawn_write("append_block", move |inner| async move {
650 inner.backend.append_block(data).await
651 })
652 .await
653 }
654
655 #[tracing::instrument(skip(self, data), fields(op = "append_blocks"))]
657 pub async fn append_blocks(&self, data: Vec<BlockData>) -> ColdResult<()> {
658 self.spawn_write("append_blocks", move |inner| async move {
659 inner.backend.append_blocks(data).await
660 })
661 .await
662 }
663
664 #[tracing::instrument(skip(self), fields(op = "truncate_above"))]
669 pub async fn truncate_above(&self, block: BlockNumber) -> ColdResult<()> {
670 self.spawn_write("truncate_above", move |inner| async move {
671 let result = inner.backend.truncate_above(block).await;
672 if result.is_ok() {
673 inner.cache.lock().invalidate_above(block);
674 }
675 result
676 })
677 .await
678 }
679
680 #[tracing::instrument(skip(self), fields(op = "drain_above"))]
685 pub async fn drain_above(&self, block: BlockNumber) -> ColdResult<Vec<Vec<ColdReceipt>>> {
686 self.spawn_write("drain_above", move |inner| async move {
687 let result = inner.backend.drain_above(block).await;
688 if result.is_ok() {
689 inner.cache.lock().invalidate_above(block);
690 }
691 result
692 })
693 .await
694 }
695}
696
697impl ColdStorage<ErasedBackend> {
698 pub fn new_erased<B: ColdStorageBackend>(backend: B, cancel: CancellationToken) -> Self {
709 Self::new(ErasedBackend::new(backend), cancel)
710 }
711}