1use crate::SqlColdError;
8use crate::columns::{
9 COL_ACCESS_LIST, COL_ADDRESS, COL_AMOUNT, COL_AUTHORIZATION_LIST, COL_BASE_FEE_PER_GAS,
10 COL_BENEFICIARY, COL_BLOB_GAS_USED, COL_BLOB_VERSIONED_HASHES, COL_BLOCK_DATA_HASH,
11 COL_BLOCK_HASH, COL_BLOCK_LOG_INDEX, COL_BLOCK_NUMBER, COL_BLOCK_TIMESTAMP, COL_CHAIN_ID,
12 COL_CNT, COL_CUMULATIVE_GAS_USED, COL_DATA, COL_DIFFICULTY, COL_EVENT_TYPE,
13 COL_EXCESS_BLOB_GAS, COL_EXTRA_DATA, COL_FIRST_LOG_INDEX, COL_FROM_ADDRESS, COL_GAS,
14 COL_GAS_LIMIT, COL_GAS_PRICE, COL_GAS_USED, COL_HOST_BLOCK_NUMBER, COL_INPUT, COL_LOGS_BLOOM,
15 COL_MAX_BN, COL_MAX_FEE_PER_BLOB_GAS, COL_MAX_FEE_PER_GAS, COL_MAX_PRIORITY_FEE_PER_GAS,
16 COL_MIX_HASH, COL_NONCE, COL_OMMERS_HASH, COL_ORDER_INDEX, COL_PARENT_BEACON_BLOCK_ROOT,
17 COL_PARENT_HASH, COL_PRIOR_GAS, COL_R_CUMULATIVE_GAS_USED, COL_R_FIRST_LOG_INDEX,
18 COL_R_FROM_ADDRESS, COL_R_SUCCESS, COL_R_TX_HASH, COL_R_TX_TYPE, COL_RECEIPTS_ROOT,
19 COL_REQUESTS_HASH, COL_REWARD_ADDRESS, COL_ROLLUP_CHAIN_ID, COL_ROLLUP_RECIPIENT, COL_SENDER,
20 COL_SIG_R, COL_SIG_S, COL_SIG_Y_PARITY, COL_STATE_ROOT, COL_SUCCESS, COL_TIMESTAMP,
21 COL_TO_ADDRESS, COL_TOKEN, COL_TOPIC0, COL_TOPIC1, COL_TOPIC2, COL_TOPIC3,
22 COL_TRANSACTIONS_ROOT, COL_TX_HASH, COL_TX_INDEX, COL_TX_TYPE, COL_VALUE, COL_WITHDRAWALS_ROOT,
23};
24use crate::convert::{
25 EVENT_ENTER, EVENT_ENTER_TOKEN, EVENT_TRANSACT, build_receipt, decode_access_list_or_empty,
26 decode_authorization_list, decode_b256_vec, decode_u128_required, encode_access_list,
27 encode_authorization_list, encode_b256_vec, encode_u128, from_i64, to_i64,
28};
29use alloy::{
30 consensus::{
31 Header, Signed, TxEip1559, TxEip2930, TxEip4844, TxEip7702, TxLegacy, TxType,
32 transaction::Recovered,
33 },
34 primitives::{
35 Address, B256, BlockNumber, Bloom, Bytes, Log, LogData, Sealable, Signature, TxKind, U256,
36 },
37};
38use signet_cold::{
39 BlockData, ColdReceipt, ColdResult, ColdStorageBackend, ColdStorageError, ColdStorageRead,
40 ColdStorageWrite, Confirmed, Filter, HeaderSpecifier, ReceiptSpecifier, RpcLog,
41 SignetEventsSpecifier, TransactionSpecifier, ZenithHeaderSpecifier,
42};
43use signet_storage_types::{
44 ConfirmationMeta, DbSignetEvent, DbZenithHeader, IndexedReceipt, RecoveredTx, SealedHeader,
45 TransactionSigned,
46};
47use signet_zenith::{
48 Passage::{Enter, EnterToken},
49 Transactor::Transact,
50 Zenith,
51};
52use sqlx::{AnyPool, Row};
53use std::{collections::BTreeMap, time::Duration};
54
/// Statement timeout installed on a new backend for read transactions.
///
/// `SqlColdBackend::new` stores this value in `read_timeout`; it can be
/// replaced with `SqlColdBackend::with_read_timeout`.
pub(crate) const DEFAULT_READ_TIMEOUT: Duration = Duration::from_millis(500);
/// Statement timeout installed on a new backend for write transactions.
///
/// `SqlColdBackend::new` stores this value in `write_timeout`; it can be
/// replaced with `SqlColdBackend::with_write_timeout`.
pub(crate) const DEFAULT_WRITE_TIMEOUT: Duration = Duration::from_secs(2);
59
/// Cold-storage backend that talks to a SQL database through an sqlx
/// [`AnyPool`].
///
/// The constructor accepts the `"PostgreSQL"` and `"SQLite"` backends and
/// rejects any other backend name with an error.
#[derive(Debug, Clone)]
pub struct SqlColdBackend {
    /// Connection pool every transaction is started from.
    pool: AnyPool,
    /// `true` when the pool's backend reported itself as `"PostgreSQL"`;
    /// gates the `SET LOCAL statement_timeout` statement.
    is_postgres: bool,
    /// Statement timeout applied to transactions opened by `begin_read`.
    read_timeout: Duration,
    /// Statement timeout applied to transactions opened by `begin_write`.
    write_timeout: Duration,
}
86
/// Reports whether `url` names an in-memory SQLite database.
///
/// A URL qualifies when it is exactly `sqlite::memory:` or when it contains
/// the `mode=memory` parameter anywhere in the string.
fn is_in_memory_sqlite(url: &str) -> bool {
    if url == "sqlite::memory:" {
        return true;
    }
    url.contains("mode=memory")
}
95
96async fn delete_above_in_tx(
99 tx: &mut sqlx::Transaction<'_, sqlx::Any>,
100 bn: i64,
101) -> Result<(), SqlColdError> {
102 for table in ["logs", "transactions", "receipts", "signet_events", "zenith_headers", "headers"]
103 {
104 sqlx::query(&format!("DELETE FROM {table} WHERE block_number > $1"))
105 .bind(bn)
106 .execute(&mut **tx)
107 .await?;
108 }
109 Ok(())
110}
111
112impl SqlColdBackend {
113 pub async fn new(pool: AnyPool) -> Result<Self, SqlColdError> {
120 let conn = pool.acquire().await?;
122 let backend = conn.backend_name();
123
124 let (migration, is_postgres) = match backend {
125 "PostgreSQL" => (include_str!("../migrations/001_initial_pg.sql"), true),
126 "SQLite" => (include_str!("../migrations/001_initial.sql"), false),
127 other => {
128 return Err(SqlColdError::Convert(format!(
129 "unsupported database backend: {other}"
130 )));
131 }
132 };
133 drop(conn);
134 sqlx::raw_sql(migration).execute(&pool).await?;
137 sqlx::raw_sql(include_str!("../migrations/002_add_topic_indexes.sql"))
138 .execute(&pool)
139 .await?;
140 Ok(Self {
141 pool,
142 is_postgres,
143 read_timeout: DEFAULT_READ_TIMEOUT,
144 write_timeout: DEFAULT_WRITE_TIMEOUT,
145 })
146 }
147
148 #[must_use]
160 pub fn with_read_timeout(mut self, d: Duration) -> Self {
161 assert!(d.as_millis() >= 1, "read_timeout must be >= 1ms (got {d:?})");
162 self.read_timeout = d;
163 self
164 }
165
166 #[must_use]
176 pub fn with_write_timeout(mut self, d: Duration) -> Self {
177 assert!(d.as_millis() >= 1, "write_timeout must be >= 1ms (got {d:?})");
178 self.write_timeout = d;
179 self
180 }
181
182 async fn begin_read(&self) -> Result<sqlx::Transaction<'_, sqlx::Any>, SqlColdError> {
188 let tx = self.pool.begin().await?;
189 self.apply_statement_timeout(tx, self.read_timeout).await
190 }
191
192 async fn begin_write(&self) -> Result<sqlx::Transaction<'_, sqlx::Any>, SqlColdError> {
197 let tx = self.pool.begin().await?;
198 self.apply_statement_timeout(tx, self.write_timeout).await
199 }
200
201 #[cfg(any(test, feature = "test-utils"))]
207 pub async fn debug_pg_sleep(&self, d: Duration) -> Result<(), SqlColdError> {
208 let secs = d.as_secs_f64();
209 let mut tx = self.begin_read().await?;
210 sqlx::query(&format!("SELECT pg_sleep({secs})")).execute(&mut *tx).await?;
211 tx.commit().await?;
212 Ok(())
213 }
214
215 async fn apply_statement_timeout<'a>(
222 &self,
223 mut tx: sqlx::Transaction<'a, sqlx::Any>,
224 d: Duration,
225 ) -> Result<sqlx::Transaction<'a, sqlx::Any>, SqlColdError> {
226 if self.is_postgres {
227 let ms = i64::try_from(d.as_millis()).unwrap_or(i64::MAX);
228 sqlx::query(&format!("SET LOCAL statement_timeout = {ms}")).execute(&mut *tx).await?;
229 }
230 Ok(tx)
231 }
232
233 pub async fn connect_with(
242 url: &str,
243 pool_opts: sqlx::pool::PoolOptions<sqlx::Any>,
244 ) -> Result<Self, SqlColdError> {
245 sqlx::any::install_default_drivers();
246 let pool_opts =
247 if is_in_memory_sqlite(url) { pool_opts.max_connections(1) } else { pool_opts };
248 let pool: AnyPool = pool_opts.connect(url).await?;
249 Self::new(pool).await
250 }
251
252 pub async fn connect(url: &str) -> Result<Self, SqlColdError> {
256 Self::connect_with(url, sqlx::pool::PoolOptions::new()).await
257 }
258
259 async fn resolve_header_spec(
264 &self,
265 spec: HeaderSpecifier,
266 ) -> Result<Option<BlockNumber>, SqlColdError> {
267 match spec {
268 HeaderSpecifier::Number(n) => Ok(Some(n)),
269 HeaderSpecifier::Hash(hash) => {
270 let mut tx = self.begin_read().await?;
271 let row = sqlx::query("SELECT block_number FROM headers WHERE block_hash = $1")
272 .bind(hash)
273 .fetch_optional(&mut *tx)
274 .await?;
275 tx.commit().await?;
276 Ok(row.map(|r| from_i64(r.get::<i64, _>(COL_BLOCK_NUMBER))))
277 }
278 }
279 }
280
281 async fn fetch_header_by_number(
286 &self,
287 block_num: BlockNumber,
288 ) -> Result<Option<SealedHeader>, SqlColdError> {
289 let bn = to_i64(block_num);
290 let mut tx = self.begin_read().await?;
291 let row = sqlx::query("SELECT * FROM headers WHERE block_number = $1")
292 .bind(bn)
293 .fetch_optional(&mut *tx)
294 .await?;
295 tx.commit().await?;
296
297 row.map(|r| header_from_row(&r).map(|h| h.seal_slow())).transpose()
298 }
299
300 async fn insert_block(&self, data: BlockData) -> Result<(), SqlColdError> {
305 let mut tx = self.begin_write().await?;
306 write_block_to_tx(&mut tx, data).await?;
307 tx.commit().await?;
308 Ok(())
309 }
310
    /// Streams logs matching `filter` for blocks `from..=to` into the
    /// channel carried by `params`, one block per query.
    ///
    /// All queries run inside one `REPEATABLE READ` transaction with the
    /// read statement timeout applied. Errors are reported by sending an
    /// `Err` item on the channel and then returning; the function itself
    /// returns nothing. The transaction is never committed explicitly; it
    /// is dropped when the function returns.
    ///
    /// Termination conditions visible in the body:
    /// - the deadline has passed before a block starts, or a send does not
    ///   complete before the deadline → `StreamDeadlineExceeded` is sent;
    /// - more than `max_logs` rows are seen → `TooManyLogs` is sent;
    /// - the receiver is gone (send fails) → silent return;
    /// - `max_logs` rows have been delivered at the end of a block → silent
    ///   return without querying further blocks.
    #[cfg(feature = "postgres")]
    async fn produce_log_stream_pg(&self, filter: &Filter, params: signet_cold::StreamParams) {
        use tokio_stream::StreamExt;

        // Unwraps an `Ok`, or forwards the error to the receiver (converted
        // through `SqlColdError`) and returns from the whole function.
        macro_rules! try_stream {
            ($sender:expr, $expr:expr) => {
                match $expr {
                    Ok(v) => v,
                    Err(e) => {
                        let _ = $sender
                            .send(Err(ColdStorageError::backend(SqlColdError::from(e))))
                            .await;
                        return;
                    }
                }
            };
        }

        let signet_cold::StreamParams { from, to, max_logs, sender, deadline } = params;

        let mut tx = try_stream!(sender, self.pool.begin().await);
        // Raise the isolation level before any data is read so every
        // per-block query sees the same snapshot.
        try_stream!(
            sender,
            sqlx::query("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ").execute(&mut *tx).await
        );
        // Same millisecond conversion as `apply_statement_timeout`.
        let read_ms = i64::try_from(self.read_timeout.as_millis()).unwrap_or(i64::MAX);
        try_stream!(
            sender,
            sqlx::query(&format!("SET LOCAL statement_timeout = {read_ms}"))
                .execute(&mut *tx)
                .await
        );

        // Filter placeholders start at $2 because $1 is the block number.
        let (filter_clause, filter_params) = build_log_filter_clause(filter, 2);
        let data_sql = format!(
            "SELECT l.*, h.block_hash, h.timestamp AS block_timestamp, t.tx_hash, \
             (r.first_log_index + l.log_index) AS block_log_index \
             FROM logs l \
             JOIN headers h ON l.block_number = h.block_number \
             JOIN transactions t ON l.block_number = t.block_number \
             AND l.tx_index = t.tx_index \
             JOIN receipts r ON l.block_number = r.block_number \
             AND l.tx_index = r.tx_index \
             WHERE l.block_number = $1{filter_clause} \
             ORDER BY l.tx_index, l.log_index"
        );

        // Number of rows seen so far across all blocks.
        let mut total = 0usize;

        for block_num in from..=to {
            // Do not start another block query once the deadline has passed.
            if tokio::time::Instant::now() > deadline {
                let _ = sender.send(Err(ColdStorageError::StreamDeadlineExceeded)).await;
                return;
            }

            let mut query = sqlx::query(&data_sql).bind(to_i64(block_num));
            for param in &filter_params {
                query = query.bind(*param);
            }

            let mut stream = query.fetch(&mut *tx);
            while let Some(row_result) = stream.next().await {
                let r = try_stream!(sender, row_result);

                total += 1;
                // The limit is checked before sending, so at most `max_logs`
                // logs are ever delivered.
                if total > max_logs {
                    let _ =
                        sender.send(Err(ColdStorageError::TooManyLogs { limit: max_logs })).await;
                    return;
                }

                let log = log_from_row(&r);
                let rpc_log = RpcLog {
                    inner: log,
                    block_hash: Some(r.get(COL_BLOCK_HASH)),
                    block_number: Some(from_i64(r.get::<i64, _>(COL_BLOCK_NUMBER))),
                    block_timestamp: Some(from_i64(r.get::<i64, _>(COL_BLOCK_TIMESTAMP))),
                    transaction_hash: Some(r.get(COL_TX_HASH)),
                    transaction_index: Some(from_i64(r.get::<i64, _>(COL_TX_INDEX))),
                    log_index: Some(from_i64(r.get::<i64, _>(COL_BLOCK_LOG_INDEX))),
                    removed: false,
                };
                // Bound the send by the deadline so a slow receiver cannot
                // hold the transaction open indefinitely.
                match tokio::time::timeout_at(deadline, sender.send(Ok(rpc_log))).await {
                    Ok(Ok(())) => {}
                    // Send failed: the receiver is gone, nobody to notify.
                    Ok(Err(_)) => return,
                    // The send did not finish before the deadline.
                    Err(_) => {
                        let _ = sender.send(Err(ColdStorageError::StreamDeadlineExceeded)).await;
                        return;
                    }
                }
            }

            // The limit has been reached exactly; stop without querying the
            // remaining blocks.
            if total >= max_logs {
                return;
            }
        }
    }
443}
444
445fn blob<'r>(r: &'r sqlx::any::AnyRow, col: &str) -> &'r [u8] {
451 r.get(col)
452}
453
454fn opt_blob<'r>(r: &'r sqlx::any::AnyRow, col: &str) -> Option<&'r [u8]> {
456 r.get(col)
457}
458
459fn header_from_row(r: &sqlx::any::AnyRow) -> Result<Header, SqlColdError> {
461 Ok(Header {
462 parent_hash: r.get(COL_PARENT_HASH),
463 ommers_hash: r.get(COL_OMMERS_HASH),
464 beneficiary: r.get(COL_BENEFICIARY),
465 state_root: r.get(COL_STATE_ROOT),
466 transactions_root: r.get(COL_TRANSACTIONS_ROOT),
467 receipts_root: r.get(COL_RECEIPTS_ROOT),
468 logs_bloom: Bloom::from_slice(blob(r, COL_LOGS_BLOOM)),
469 difficulty: r.get(COL_DIFFICULTY),
470 number: from_i64(r.get(COL_BLOCK_NUMBER)),
471 gas_limit: from_i64(r.get(COL_GAS_LIMIT)),
472 gas_used: from_i64(r.get(COL_GAS_USED)),
473 timestamp: from_i64(r.get(COL_TIMESTAMP)),
474 extra_data: r.get(COL_EXTRA_DATA),
475 mix_hash: r.get(COL_MIX_HASH),
476 nonce: r.get(COL_NONCE),
477 base_fee_per_gas: r.get::<Option<i64>, _>(COL_BASE_FEE_PER_GAS).map(from_i64),
478 withdrawals_root: r.get(COL_WITHDRAWALS_ROOT),
479 blob_gas_used: r.get::<Option<i64>, _>(COL_BLOB_GAS_USED).map(from_i64),
480 excess_blob_gas: r.get::<Option<i64>, _>(COL_EXCESS_BLOB_GAS).map(from_i64),
481 parent_beacon_block_root: r.get(COL_PARENT_BEACON_BLOCK_ROOT),
482 requests_hash: r.get(COL_REQUESTS_HASH),
483 })
484}
485
486fn tx_from_row(r: &sqlx::any::AnyRow) -> Result<TransactionSigned, SqlColdError> {
488 use alloy::consensus::EthereumTxEnvelope;
489
490 let sig =
491 Signature::new(r.get(COL_SIG_R), r.get(COL_SIG_S), r.get::<i32, _>(COL_SIG_Y_PARITY) != 0);
492
493 let tx_type_raw: i32 = r.get(COL_TX_TYPE);
494 let tx_type_u8: u8 = tx_type_raw
495 .try_into()
496 .map_err(|_| SqlColdError::Convert(format!("tx_type out of u8 range: {tx_type_raw}")))?;
497 let tx_type = TxType::try_from(tx_type_u8)
498 .map_err(|_| SqlColdError::Convert(format!("invalid tx_type: {tx_type_u8}")))?;
499
500 let chain_id: Option<i64> = r.get(COL_CHAIN_ID);
501 let nonce = from_i64(r.get(COL_NONCE));
502 let gas_limit = from_i64(r.get(COL_GAS_LIMIT));
503 let to_addr: Option<Address> = r.get(COL_TO_ADDRESS);
504 let value: U256 = r.get(COL_VALUE);
505 let input: Bytes = r.get(COL_INPUT);
506
507 match tx_type {
508 TxType::Legacy => {
509 let tx = TxLegacy {
510 chain_id: chain_id.map(from_i64),
511 nonce,
512 gas_price: decode_u128_required(opt_blob(r, COL_GAS_PRICE), COL_GAS_PRICE)?,
513 gas_limit,
514 to: to_addr.map_or(TxKind::Create, TxKind::Call),
515 value,
516 input,
517 };
518 Ok(EthereumTxEnvelope::Legacy(Signed::new_unhashed(tx, sig)))
519 }
520 TxType::Eip2930 => {
521 let tx = TxEip2930 {
522 chain_id: from_i64(
523 chain_id
524 .ok_or_else(|| SqlColdError::Convert("EIP2930 requires chain_id".into()))?,
525 ),
526 nonce,
527 gas_price: decode_u128_required(opt_blob(r, COL_GAS_PRICE), COL_GAS_PRICE)?,
528 gas_limit,
529 to: to_addr.map_or(TxKind::Create, TxKind::Call),
530 value,
531 input,
532 access_list: decode_access_list_or_empty(opt_blob(r, COL_ACCESS_LIST))?,
533 };
534 Ok(EthereumTxEnvelope::Eip2930(Signed::new_unhashed(tx, sig)))
535 }
536 TxType::Eip1559 => {
537 let tx = TxEip1559 {
538 chain_id: from_i64(
539 chain_id
540 .ok_or_else(|| SqlColdError::Convert("EIP1559 requires chain_id".into()))?,
541 ),
542 nonce,
543 gas_limit,
544 max_fee_per_gas: decode_u128_required(
545 opt_blob(r, COL_MAX_FEE_PER_GAS),
546 COL_MAX_FEE_PER_GAS,
547 )?,
548 max_priority_fee_per_gas: decode_u128_required(
549 opt_blob(r, COL_MAX_PRIORITY_FEE_PER_GAS),
550 COL_MAX_PRIORITY_FEE_PER_GAS,
551 )?,
552 to: to_addr.map_or(TxKind::Create, TxKind::Call),
553 value,
554 input,
555 access_list: decode_access_list_or_empty(opt_blob(r, COL_ACCESS_LIST))?,
556 };
557 Ok(EthereumTxEnvelope::Eip1559(Signed::new_unhashed(tx, sig)))
558 }
559 TxType::Eip4844 => {
560 let tx = TxEip4844 {
561 chain_id: from_i64(
562 chain_id
563 .ok_or_else(|| SqlColdError::Convert("EIP4844 requires chain_id".into()))?,
564 ),
565 nonce,
566 gas_limit,
567 max_fee_per_gas: decode_u128_required(
568 opt_blob(r, COL_MAX_FEE_PER_GAS),
569 COL_MAX_FEE_PER_GAS,
570 )?,
571 max_priority_fee_per_gas: decode_u128_required(
572 opt_blob(r, COL_MAX_PRIORITY_FEE_PER_GAS),
573 COL_MAX_PRIORITY_FEE_PER_GAS,
574 )?,
575 to: to_addr
576 .ok_or_else(|| SqlColdError::Convert("EIP4844 requires to_address".into()))?,
577 value,
578 input,
579 access_list: decode_access_list_or_empty(opt_blob(r, COL_ACCESS_LIST))?,
580 blob_versioned_hashes: decode_b256_vec(
581 opt_blob(r, COL_BLOB_VERSIONED_HASHES).ok_or_else(|| {
582 SqlColdError::Convert("EIP4844 requires blob_versioned_hashes".into())
583 })?,
584 )?,
585 max_fee_per_blob_gas: decode_u128_required(
586 opt_blob(r, COL_MAX_FEE_PER_BLOB_GAS),
587 COL_MAX_FEE_PER_BLOB_GAS,
588 )?,
589 };
590 Ok(EthereumTxEnvelope::Eip4844(Signed::new_unhashed(tx, sig)))
591 }
592 TxType::Eip7702 => {
593 let tx = TxEip7702 {
594 chain_id: from_i64(
595 chain_id
596 .ok_or_else(|| SqlColdError::Convert("EIP7702 requires chain_id".into()))?,
597 ),
598 nonce,
599 gas_limit,
600 max_fee_per_gas: decode_u128_required(
601 opt_blob(r, COL_MAX_FEE_PER_GAS),
602 COL_MAX_FEE_PER_GAS,
603 )?,
604 max_priority_fee_per_gas: decode_u128_required(
605 opt_blob(r, COL_MAX_PRIORITY_FEE_PER_GAS),
606 COL_MAX_PRIORITY_FEE_PER_GAS,
607 )?,
608 to: to_addr
609 .ok_or_else(|| SqlColdError::Convert("EIP7702 requires to_address".into()))?,
610 value,
611 input,
612 access_list: decode_access_list_or_empty(opt_blob(r, COL_ACCESS_LIST))?,
613 authorization_list: decode_authorization_list(
614 opt_blob(r, COL_AUTHORIZATION_LIST).ok_or_else(|| {
615 SqlColdError::Convert("EIP7702 requires authorization_list".into())
616 })?,
617 )?,
618 };
619 Ok(EthereumTxEnvelope::Eip7702(Signed::new_unhashed(tx, sig)))
620 }
621 }
622}
623
624fn recovered_tx_from_row(r: &sqlx::any::AnyRow) -> Result<RecoveredTx, SqlColdError> {
626 let sender: Address = r.get(COL_FROM_ADDRESS);
627 let tx = tx_from_row(r)?;
628 Ok(Recovered::new_unchecked(tx, sender))
630}
631
632fn log_from_row(r: &sqlx::any::AnyRow) -> Log {
634 let topics = [COL_TOPIC0, COL_TOPIC1, COL_TOPIC2, COL_TOPIC3]
635 .into_iter()
636 .filter_map(|col| r.get::<Option<B256>, _>(col))
637 .collect();
638 Log { address: r.get(COL_ADDRESS), data: LogData::new_unchecked(topics, r.get(COL_DATA)) }
639}
640
641fn signet_event_from_row(r: &sqlx::any::AnyRow) -> Result<DbSignetEvent, SqlColdError> {
643 let event_type = r.get::<i32, _>(COL_EVENT_TYPE) as i16;
644 let order = from_i64(r.get(COL_ORDER_INDEX));
645 let rollup_chain_id: U256 = r.get(COL_ROLLUP_CHAIN_ID);
646
647 match event_type {
648 EVENT_TRANSACT => {
649 let sender: Address = r
650 .get::<Option<Address>, _>(COL_SENDER)
651 .ok_or_else(|| SqlColdError::Convert("Transact requires sender".into()))?;
652 let to: Address = r
653 .get::<Option<Address>, _>(COL_TO_ADDRESS)
654 .ok_or_else(|| SqlColdError::Convert("Transact requires to".into()))?;
655 let value: U256 = r
656 .get::<Option<U256>, _>(COL_VALUE)
657 .ok_or_else(|| SqlColdError::Convert("Transact requires value".into()))?;
658 let gas: U256 = r
659 .get::<Option<U256>, _>(COL_GAS)
660 .ok_or_else(|| SqlColdError::Convert("Transact requires gas".into()))?;
661 let max_fee: U256 = r
662 .get::<Option<U256>, _>(COL_MAX_FEE_PER_GAS)
663 .ok_or_else(|| SqlColdError::Convert("Transact requires max_fee_per_gas".into()))?;
664 let data: Bytes = r.get::<Option<Bytes>, _>(COL_DATA).unwrap_or_default();
665
666 Ok(DbSignetEvent::Transact(
667 order,
668 Transact {
669 rollupChainId: rollup_chain_id,
670 sender,
671 to,
672 value,
673 gas,
674 maxFeePerGas: max_fee,
675 data,
676 },
677 ))
678 }
679 EVENT_ENTER => {
680 let recipient: Address = r
681 .get::<Option<Address>, _>(COL_ROLLUP_RECIPIENT)
682 .ok_or_else(|| SqlColdError::Convert("Enter requires rollup_recipient".into()))?;
683 let amount: U256 = r
684 .get::<Option<U256>, _>(COL_AMOUNT)
685 .ok_or_else(|| SqlColdError::Convert("Enter requires amount".into()))?;
686
687 Ok(DbSignetEvent::Enter(
688 order,
689 Enter { rollupChainId: rollup_chain_id, rollupRecipient: recipient, amount },
690 ))
691 }
692 EVENT_ENTER_TOKEN => {
693 let token: Address = r
694 .get::<Option<Address>, _>(COL_TOKEN)
695 .ok_or_else(|| SqlColdError::Convert("EnterToken requires token".into()))?;
696 let recipient: Address =
697 r.get::<Option<Address>, _>(COL_ROLLUP_RECIPIENT).ok_or_else(|| {
698 SqlColdError::Convert("EnterToken requires rollup_recipient".into())
699 })?;
700 let amount: U256 = r
701 .get::<Option<U256>, _>(COL_AMOUNT)
702 .ok_or_else(|| SqlColdError::Convert("EnterToken requires amount".into()))?;
703
704 Ok(DbSignetEvent::EnterToken(
705 order,
706 EnterToken {
707 rollupChainId: rollup_chain_id,
708 token,
709 rollupRecipient: recipient,
710 amount,
711 },
712 ))
713 }
714 _ => Err(SqlColdError::Convert(format!("invalid event_type: {event_type}"))),
715 }
716}
717
718fn zenith_header_from_row(r: &sqlx::any::AnyRow) -> Result<DbZenithHeader, SqlColdError> {
720 Ok(DbZenithHeader(Zenith::BlockHeader {
721 hostBlockNumber: r.get(COL_HOST_BLOCK_NUMBER),
722 rollupChainId: r.get(COL_ROLLUP_CHAIN_ID),
723 gasLimit: r.get(COL_GAS_LIMIT),
724 rewardAddress: r.get(COL_REWARD_ADDRESS),
725 blockDataHash: r.get(COL_BLOCK_DATA_HASH),
726 }))
727}
728
729async fn write_block_to_tx(
735 tx: &mut sqlx::Transaction<'_, sqlx::Any>,
736 data: BlockData,
737) -> Result<(), SqlColdError> {
738 let bn = to_i64(data.block_number());
739
740 let block_hash = data.header.hash_slow();
742 let difficulty = &data.header.difficulty;
743 sqlx::query(
744 "INSERT INTO headers (
745 block_number, block_hash, parent_hash, ommers_hash, beneficiary,
746 state_root, transactions_root, receipts_root, logs_bloom, difficulty,
747 gas_limit, gas_used, timestamp, extra_data, mix_hash, nonce,
748 base_fee_per_gas, withdrawals_root, blob_gas_used, excess_blob_gas,
749 parent_beacon_block_root, requests_hash
750 ) VALUES (
751 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
752 $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22
753 )",
754 )
755 .bind(bn)
756 .bind(block_hash)
757 .bind(data.header.parent_hash)
758 .bind(data.header.ommers_hash)
759 .bind(data.header.beneficiary)
760 .bind(data.header.state_root)
761 .bind(data.header.transactions_root)
762 .bind(data.header.receipts_root)
763 .bind(data.header.logs_bloom)
764 .bind(difficulty)
765 .bind(to_i64(data.header.gas_limit))
766 .bind(to_i64(data.header.gas_used))
767 .bind(to_i64(data.header.timestamp))
768 .bind(&data.header.extra_data)
769 .bind(data.header.mix_hash)
770 .bind(data.header.nonce)
771 .bind(data.header.base_fee_per_gas.map(to_i64))
772 .bind(data.header.withdrawals_root.as_ref())
773 .bind(data.header.blob_gas_used.map(to_i64))
774 .bind(data.header.excess_blob_gas.map(to_i64))
775 .bind(data.header.parent_beacon_block_root.as_ref())
776 .bind(data.header.requests_hash.as_ref())
777 .execute(&mut **tx)
778 .await?;
779
780 for (idx, recovered_tx) in data.transactions.iter().enumerate() {
782 insert_transaction(tx, bn, to_i64(idx as u64), recovered_tx).await?;
783 }
784
785 let mut first_log_index = 0i64;
788 for (idx, receipt) in data.receipts.iter().enumerate() {
789 let tx_idx = to_i64(idx as u64);
790 sqlx::query(
791 "INSERT INTO receipts (block_number, tx_index, tx_type, success, cumulative_gas_used, first_log_index)
792 VALUES ($1, $2, $3, $4, $5, $6)",
793 )
794 .bind(bn)
795 .bind(tx_idx)
796 .bind(receipt.tx_type as i32)
797 .bind(receipt.inner.status.coerce_status() as i32)
798 .bind(to_i64(receipt.inner.cumulative_gas_used))
799 .bind(first_log_index)
800 .execute(&mut **tx)
801 .await?;
802 first_log_index += receipt.inner.logs.len() as i64;
803
804 for (log_idx, log) in receipt.inner.logs.iter().enumerate() {
805 let topics = log.topics();
806 sqlx::query(
807 "INSERT INTO logs (block_number, tx_index, log_index, address, topic0, topic1, topic2, topic3, data)
808 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
809 )
810 .bind(bn)
811 .bind(tx_idx)
812 .bind(to_i64(log_idx as u64))
813 .bind(log.address)
814 .bind(topics.first())
815 .bind(topics.get(1))
816 .bind(topics.get(2))
817 .bind(topics.get(3))
818 .bind(&log.data.data)
819 .execute(&mut **tx)
820 .await?;
821 }
822 }
823
824 for (idx, event) in data.signet_events.iter().enumerate() {
826 insert_signet_event(tx, bn, to_i64(idx as u64), event).await?;
827 }
828
829 if let Some(zh) = &data.zenith_header {
831 let h = &zh.0;
832 sqlx::query(
833 "INSERT INTO zenith_headers (
834 block_number, host_block_number, rollup_chain_id,
835 gas_limit, reward_address, block_data_hash
836 ) VALUES ($1, $2, $3, $4, $5, $6)",
837 )
838 .bind(bn)
839 .bind(h.hostBlockNumber)
840 .bind(h.rollupChainId)
841 .bind(h.gasLimit)
842 .bind(h.rewardAddress)
843 .bind(h.blockDataHash)
844 .execute(&mut **tx)
845 .await?;
846 }
847
848 Ok(())
849}
850
851async fn insert_transaction(
853 conn: &mut sqlx::AnyConnection,
854 bn: i64,
855 tx_index: i64,
856 recovered: &RecoveredTx,
857) -> Result<(), SqlColdError> {
858 use alloy::consensus::EthereumTxEnvelope;
859
860 let sender = recovered.signer();
861 let tx: &TransactionSigned = recovered;
862 let tx_hash = tx.tx_hash();
863 let tx_type = tx.tx_type() as i32;
864
865 macro_rules! sig {
866 ($s:expr) => {{
867 let sig = $s.signature();
868 (sig.v() as i32, sig.r(), sig.s())
869 }};
870 }
871 let (sig_y, sig_r, sig_s) = match tx {
872 EthereumTxEnvelope::Legacy(s) => sig!(s),
873 EthereumTxEnvelope::Eip2930(s) => sig!(s),
874 EthereumTxEnvelope::Eip1559(s) => sig!(s),
875 EthereumTxEnvelope::Eip4844(s) => sig!(s),
876 EthereumTxEnvelope::Eip7702(s) => sig!(s),
877 };
878
879 let (chain_id, nonce, gas_limit) = match tx {
880 EthereumTxEnvelope::Legacy(s) => {
881 (s.tx().chain_id.map(to_i64), to_i64(s.tx().nonce), to_i64(s.tx().gas_limit))
882 }
883 EthereumTxEnvelope::Eip2930(s) => {
884 (Some(to_i64(s.tx().chain_id)), to_i64(s.tx().nonce), to_i64(s.tx().gas_limit))
885 }
886 EthereumTxEnvelope::Eip1559(s) => {
887 (Some(to_i64(s.tx().chain_id)), to_i64(s.tx().nonce), to_i64(s.tx().gas_limit))
888 }
889 EthereumTxEnvelope::Eip4844(s) => {
890 (Some(to_i64(s.tx().chain_id)), to_i64(s.tx().nonce), to_i64(s.tx().gas_limit))
891 }
892 EthereumTxEnvelope::Eip7702(s) => {
893 (Some(to_i64(s.tx().chain_id)), to_i64(s.tx().nonce), to_i64(s.tx().gas_limit))
894 }
895 };
896
897 let (value, to_addr) = match tx {
898 EthereumTxEnvelope::Legacy(s) => (s.tx().value, s.tx().to.to()),
899 EthereumTxEnvelope::Eip2930(s) => (s.tx().value, s.tx().to.to()),
900 EthereumTxEnvelope::Eip1559(s) => (s.tx().value, s.tx().to.to()),
901 EthereumTxEnvelope::Eip4844(s) => (s.tx().value, Some(&s.tx().to)),
902 EthereumTxEnvelope::Eip7702(s) => (s.tx().value, Some(&s.tx().to)),
903 };
904
905 let input: &[u8] = match tx {
906 EthereumTxEnvelope::Legacy(s) => s.tx().input.as_ref(),
907 EthereumTxEnvelope::Eip2930(s) => s.tx().input.as_ref(),
908 EthereumTxEnvelope::Eip1559(s) => s.tx().input.as_ref(),
909 EthereumTxEnvelope::Eip4844(s) => s.tx().input.as_ref(),
910 EthereumTxEnvelope::Eip7702(s) => s.tx().input.as_ref(),
911 };
912
913 let (gas_price, max_fee, max_priority_fee, max_blob_fee) = match tx {
914 EthereumTxEnvelope::Legacy(s) => (Some(encode_u128(s.tx().gas_price)), None, None, None),
915 EthereumTxEnvelope::Eip2930(s) => (Some(encode_u128(s.tx().gas_price)), None, None, None),
916 EthereumTxEnvelope::Eip1559(s) => (
917 None,
918 Some(encode_u128(s.tx().max_fee_per_gas)),
919 Some(encode_u128(s.tx().max_priority_fee_per_gas)),
920 None,
921 ),
922 EthereumTxEnvelope::Eip4844(s) => (
923 None,
924 Some(encode_u128(s.tx().max_fee_per_gas)),
925 Some(encode_u128(s.tx().max_priority_fee_per_gas)),
926 Some(encode_u128(s.tx().max_fee_per_blob_gas)),
927 ),
928 EthereumTxEnvelope::Eip7702(s) => (
929 None,
930 Some(encode_u128(s.tx().max_fee_per_gas)),
931 Some(encode_u128(s.tx().max_priority_fee_per_gas)),
932 None,
933 ),
934 };
935
936 let (access_list, blob_hashes, auth_list) = match tx {
937 EthereumTxEnvelope::Legacy(_) => (None, None, None),
938 EthereumTxEnvelope::Eip2930(s) => {
939 (Some(encode_access_list(&s.tx().access_list)), None, None)
940 }
941 EthereumTxEnvelope::Eip1559(s) => {
942 (Some(encode_access_list(&s.tx().access_list)), None, None)
943 }
944 EthereumTxEnvelope::Eip4844(s) => (
945 Some(encode_access_list(&s.tx().access_list)),
946 Some(encode_b256_vec(&s.tx().blob_versioned_hashes)),
947 None,
948 ),
949 EthereumTxEnvelope::Eip7702(s) => (
950 Some(encode_access_list(&s.tx().access_list)),
951 None,
952 Some(encode_authorization_list(&s.tx().authorization_list)),
953 ),
954 };
955
956 sqlx::query(
957 "INSERT INTO transactions (
958 block_number, tx_index, tx_hash, tx_type,
959 sig_y_parity, sig_r, sig_s,
960 chain_id, nonce, gas_limit, to_address, value, input,
961 gas_price, max_fee_per_gas, max_priority_fee_per_gas,
962 max_fee_per_blob_gas, blob_versioned_hashes,
963 access_list, authorization_list, from_address
964 ) VALUES (
965 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
966 $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21
967 )",
968 )
969 .bind(bn)
970 .bind(tx_index)
971 .bind(tx_hash)
972 .bind(tx_type)
973 .bind(sig_y)
974 .bind(sig_r)
975 .bind(sig_s)
976 .bind(chain_id)
977 .bind(nonce)
978 .bind(gas_limit)
979 .bind(to_addr)
980 .bind(value)
981 .bind(input)
982 .bind(gas_price.as_ref().map(|v| v.as_slice()))
983 .bind(max_fee.as_ref().map(|v| v.as_slice()))
984 .bind(max_priority_fee.as_ref().map(|v| v.as_slice()))
985 .bind(max_blob_fee.as_ref().map(|v| v.as_slice()))
986 .bind(blob_hashes.as_deref())
987 .bind(access_list.as_deref())
988 .bind(auth_list.as_deref())
989 .bind(sender)
990 .execute(&mut *conn)
991 .await?;
992
993 Ok(())
994}
995
996async fn insert_signet_event(
998 conn: &mut sqlx::AnyConnection,
999 block_number: i64,
1000 event_index: i64,
1001 event: &DbSignetEvent,
1002) -> Result<(), SqlColdError> {
1003 let (event_type, order, chain_id) = match event {
1004 DbSignetEvent::Transact(o, t) => (0i32, to_i64(*o), &t.rollupChainId),
1005 DbSignetEvent::Enter(o, e) => (1i32, to_i64(*o), &e.rollupChainId),
1006 DbSignetEvent::EnterToken(o, e) => (2i32, to_i64(*o), &e.rollupChainId),
1007 };
1008
1009 let (value, gas, max_fee, amount) = match event {
1010 DbSignetEvent::Transact(_, t) => {
1011 (Some(&t.value), Some(&t.gas), Some(&t.maxFeePerGas), None)
1012 }
1013 DbSignetEvent::Enter(_, e) => (None, None, None, Some(&e.amount)),
1014 DbSignetEvent::EnterToken(_, e) => (None, None, None, Some(&e.amount)),
1015 };
1016
1017 sqlx::query(
1018 "INSERT INTO signet_events (
1019 block_number, event_index, event_type, order_index,
1020 rollup_chain_id, sender, to_address, value, gas,
1021 max_fee_per_gas, data, rollup_recipient, amount, token
1022 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)",
1023 )
1024 .bind(block_number)
1025 .bind(event_index)
1026 .bind(event_type)
1027 .bind(order)
1028 .bind(chain_id)
1029 .bind(match event {
1030 DbSignetEvent::Transact(_, t) => Some(&t.sender),
1031 _ => None,
1032 })
1033 .bind(match event {
1034 DbSignetEvent::Transact(_, t) => Some(&t.to),
1035 _ => None,
1036 })
1037 .bind(value)
1038 .bind(gas)
1039 .bind(max_fee)
1040 .bind(match event {
1041 DbSignetEvent::Transact(_, t) => Some(&t.data),
1042 _ => None,
1043 })
1044 .bind(match event {
1045 DbSignetEvent::Enter(_, e) => Some(&e.rollupRecipient),
1046 DbSignetEvent::EnterToken(_, e) => Some(&e.rollupRecipient),
1047 _ => None,
1048 })
1049 .bind(amount)
1050 .bind(match event {
1051 DbSignetEvent::EnterToken(_, e) => Some(&e.token),
1052 _ => None,
1053 })
1054 .execute(&mut *conn)
1055 .await?;
1056
1057 Ok(())
1058}
1059
/// Appends an ` AND <column> ...` predicate to `clause` and pushes the
/// matching bind values onto `params`.
///
/// * One value produces ` AND column = $idx`.
/// * Several values produce ` AND column IN ($idx, $idx+1, ...)`.
/// * No values leaves `clause` and `params` untouched, so the column stays
///   unconstrained. Without this guard an empty iterator would emit the
///   invalid SQL `IN ()`.
///
/// `idx` is the placeholder number to use for the first value. The return
/// value is the next unused placeholder number.
fn append_filter_clause<'a>(
    clause: &mut String,
    params: &mut Vec<&'a [u8]>,
    mut idx: u32,
    column: &str,
    values: impl ExactSizeIterator<Item = &'a [u8]>,
) -> u32 {
    use std::fmt::Write;

    let len = values.len();
    if len == 0 {
        return idx;
    }
    params.reserve(len);

    if len == 1 {
        write!(clause, " AND {column} = ${idx}").expect("writing to a String cannot fail");
        params.extend(values);
        return idx + 1;
    }

    write!(clause, " AND {column} IN (").expect("writing to a String cannot fail");
    for (i, v) in values.enumerate() {
        if i > 0 {
            clause.push_str(", ");
        }
        write!(clause, "${idx}").expect("writing to a String cannot fail");
        params.push(v);
        idx += 1;
    }
    clause.push(')');
    idx
}
1096
1097fn build_log_filter_clause(filter: &Filter, start_idx: u32) -> (String, Vec<&[u8]>) {
1098 let mut clause = String::new();
1099 let mut params: Vec<&[u8]> = Vec::new();
1100 let mut idx = start_idx;
1101
1102 if !filter.address.is_empty() {
1103 idx = append_filter_clause(
1104 &mut clause,
1105 &mut params,
1106 idx,
1107 "l.address",
1108 filter.address.iter().map(|a| a.as_slice()),
1109 );
1110 }
1111
1112 let topic_cols = ["l.topic0", "l.topic1", "l.topic2", "l.topic3"];
1113 for (i, topic_filter) in filter.topics.iter().enumerate() {
1114 if topic_filter.is_empty() {
1115 continue;
1116 }
1117 idx = append_filter_clause(
1118 &mut clause,
1119 &mut params,
1120 idx,
1121 topic_cols[i],
1122 topic_filter.iter().map(|v| v.as_slice()),
1123 );
1124 }
1125
1126 (clause, params)
1127}
1128
1129impl ColdStorageRead for SqlColdBackend {
1134 async fn get_header(&self, spec: HeaderSpecifier) -> ColdResult<Option<SealedHeader>> {
1135 let Some(block_num) = self.resolve_header_spec(spec).await? else {
1136 return Ok(None);
1137 };
1138 self.fetch_header_by_number(block_num).await.map_err(ColdStorageError::from)
1139 }
1140
1141 async fn get_headers(
1142 &self,
1143 specs: Vec<HeaderSpecifier>,
1144 ) -> ColdResult<Vec<Option<SealedHeader>>> {
1145 let mut results = Vec::with_capacity(specs.len());
1146 for spec in specs {
1147 let header = self.get_header(spec).await?;
1148 results.push(header);
1149 }
1150 Ok(results)
1151 }
1152
1153 async fn get_transaction(
1154 &self,
1155 spec: TransactionSpecifier,
1156 ) -> ColdResult<Option<Confirmed<RecoveredTx>>> {
1157 let mut tx = self.begin_read().await.map_err(ColdStorageError::from)?;
1158 let row = match spec {
1159 TransactionSpecifier::Hash(hash) => sqlx::query(
1160 "SELECT t.*, h.block_hash
1161 FROM transactions t
1162 JOIN headers h ON t.block_number = h.block_number
1163 WHERE t.tx_hash = $1",
1164 )
1165 .bind(hash)
1166 .fetch_optional(&mut *tx)
1167 .await
1168 .map_err(SqlColdError::from)?,
1169 TransactionSpecifier::BlockAndIndex { block, index } => sqlx::query(
1170 "SELECT t.*, h.block_hash
1171 FROM transactions t
1172 JOIN headers h ON t.block_number = h.block_number
1173 WHERE t.block_number = $1 AND t.tx_index = $2",
1174 )
1175 .bind(to_i64(block))
1176 .bind(to_i64(index))
1177 .fetch_optional(&mut *tx)
1178 .await
1179 .map_err(SqlColdError::from)?,
1180 TransactionSpecifier::BlockHashAndIndex { block_hash, index } => sqlx::query(
1181 "SELECT t.*, h.block_hash
1182 FROM transactions t
1183 JOIN headers h ON t.block_number = h.block_number
1184 WHERE h.block_hash = $1 AND t.tx_index = $2",
1185 )
1186 .bind(block_hash)
1187 .bind(to_i64(index))
1188 .fetch_optional(&mut *tx)
1189 .await
1190 .map_err(SqlColdError::from)?,
1191 };
1192 tx.commit().await.map_err(SqlColdError::from)?;
1193
1194 let Some(r) = row else {
1195 return Ok(None);
1196 };
1197
1198 let block = from_i64(r.get::<i64, _>(COL_BLOCK_NUMBER));
1199 let index = from_i64(r.get::<i64, _>(COL_TX_INDEX));
1200 let block_hash = r.get(COL_BLOCK_HASH);
1201 let recovered = recovered_tx_from_row(&r).map_err(ColdStorageError::from)?;
1202 let meta = ConfirmationMeta::new(block, block_hash, index);
1203 Ok(Some(Confirmed::new(recovered, meta)))
1204 }
1205
1206 async fn get_transactions_in_block(&self, block: BlockNumber) -> ColdResult<Vec<RecoveredTx>> {
1207 let bn = to_i64(block);
1208 let mut tx = self.begin_read().await.map_err(ColdStorageError::from)?;
1209 let rows =
1210 sqlx::query("SELECT * FROM transactions WHERE block_number = $1 ORDER BY tx_index")
1211 .bind(bn)
1212 .fetch_all(&mut *tx)
1213 .await
1214 .map_err(SqlColdError::from)?;
1215 tx.commit().await.map_err(SqlColdError::from)?;
1216
1217 rows.iter().map(|r| recovered_tx_from_row(r).map_err(ColdStorageError::from)).collect()
1218 }
1219
1220 async fn get_transaction_count(&self, block: BlockNumber) -> ColdResult<u64> {
1221 let bn = to_i64(block);
1222 let mut tx = self.begin_read().await.map_err(ColdStorageError::from)?;
1223 let row = sqlx::query("SELECT COUNT(*) as cnt FROM transactions WHERE block_number = $1")
1224 .bind(bn)
1225 .fetch_one(&mut *tx)
1226 .await
1227 .map_err(SqlColdError::from)?;
1228 tx.commit().await.map_err(SqlColdError::from)?;
1229
1230 Ok(from_i64(row.get::<i64, _>(COL_CNT)))
1231 }
1232
1233 async fn get_receipt(&self, spec: ReceiptSpecifier) -> ColdResult<Option<ColdReceipt>> {
1234 let mut tx = self.begin_read().await.map_err(ColdStorageError::from)?;
1235 let (block, index) = match spec {
1237 ReceiptSpecifier::TxHash(hash) => {
1238 let row = sqlx::query(
1239 "SELECT block_number, tx_index FROM transactions WHERE tx_hash = $1",
1240 )
1241 .bind(hash)
1242 .fetch_optional(&mut *tx)
1243 .await
1244 .map_err(SqlColdError::from)?;
1245 let Some(r) = row else { return Ok(None) };
1246 (
1247 from_i64(r.get::<i64, _>(COL_BLOCK_NUMBER)),
1248 from_i64(r.get::<i64, _>(COL_TX_INDEX)),
1249 )
1250 }
1251 ReceiptSpecifier::BlockAndIndex { block, index } => (block, index),
1252 };
1253
1254 let combined = sqlx::query(
1258 "SELECT h.*, \
1259 r.tx_type AS r_tx_type, r.success AS r_success, \
1260 r.cumulative_gas_used AS r_cumulative_gas_used, \
1261 r.first_log_index AS r_first_log_index, \
1262 t.tx_hash AS r_tx_hash, t.from_address AS r_from_address, \
1263 COALESCE( \
1264 (SELECT CAST(MAX(r2.cumulative_gas_used) AS bigint) \
1265 FROM receipts r2 \
1266 WHERE r2.block_number = r.block_number \
1267 AND r2.tx_index < r.tx_index), \
1268 0 \
1269 ) AS prior_gas \
1270 FROM receipts r \
1271 JOIN transactions t ON r.block_number = t.block_number \
1272 AND r.tx_index = t.tx_index \
1273 JOIN headers h ON r.block_number = h.block_number \
1274 WHERE r.block_number = $1 AND r.tx_index = $2",
1275 )
1276 .bind(to_i64(block))
1277 .bind(to_i64(index))
1278 .fetch_optional(&mut *tx)
1279 .await
1280 .map_err(SqlColdError::from)?;
1281
1282 let Some(rr) = combined else {
1283 return Ok(None);
1284 };
1285
1286 let header = header_from_row(&rr).map_err(ColdStorageError::from)?.seal_slow();
1288
1289 let tx_hash = rr.get(COL_R_TX_HASH);
1291 let sender = rr.get(COL_R_FROM_ADDRESS);
1292 let tx_type: i32 = rr.get(COL_R_TX_TYPE);
1293 let success = rr.get::<i32, _>(COL_R_SUCCESS) != 0;
1294 let cumulative_gas_used: i64 = rr.get(COL_R_CUMULATIVE_GAS_USED);
1295 let first_log_index: u64 = from_i64(rr.get::<i64, _>(COL_R_FIRST_LOG_INDEX));
1296 let prior_cumulative_gas: u64 = from_i64(rr.get::<i64, _>(COL_PRIOR_GAS));
1297
1298 let log_rows = sqlx::query(
1300 "SELECT * FROM logs WHERE block_number = $1 AND tx_index = $2 ORDER BY log_index",
1301 )
1302 .bind(to_i64(block))
1303 .bind(to_i64(index))
1304 .fetch_all(&mut *tx)
1305 .await
1306 .map_err(SqlColdError::from)?;
1307 tx.commit().await.map_err(SqlColdError::from)?;
1308
1309 let logs = log_rows.iter().map(log_from_row).collect();
1310 let built = build_receipt(tx_type, success, cumulative_gas_used, logs)
1311 .map_err(ColdStorageError::from)?;
1312 debug_assert!(
1315 built.inner.cumulative_gas_used >= prior_cumulative_gas,
1316 "cumulative gas decreased: {} < {}",
1317 built.inner.cumulative_gas_used,
1318 prior_cumulative_gas,
1319 );
1320 let gas_used = built.inner.cumulative_gas_used - prior_cumulative_gas;
1321
1322 let ir = IndexedReceipt { receipt: built, tx_hash, first_log_index, gas_used, sender };
1323 Ok(Some(ColdReceipt::new(ir, &header, index)))
1324 }
1325
1326 async fn get_receipts_in_block(&self, block: BlockNumber) -> ColdResult<Vec<ColdReceipt>> {
1327 let Some(header) =
1328 self.fetch_header_by_number(block).await.map_err(ColdStorageError::from)?
1329 else {
1330 return Ok(Vec::new());
1331 };
1332
1333 let bn = to_i64(block);
1334 let mut tx = self.begin_read().await.map_err(ColdStorageError::from)?;
1335
1336 let receipt_rows = sqlx::query(
1338 "SELECT r.block_number, r.tx_index, r.tx_type, r.success, \
1339 r.cumulative_gas_used, r.first_log_index, \
1340 t.tx_hash, t.from_address \
1341 FROM receipts r \
1342 JOIN transactions t ON r.block_number = t.block_number \
1343 AND r.tx_index = t.tx_index \
1344 WHERE r.block_number = $1 \
1345 ORDER BY r.tx_index",
1346 )
1347 .bind(bn)
1348 .fetch_all(&mut *tx)
1349 .await
1350 .map_err(SqlColdError::from)?;
1351
1352 let all_log_rows =
1353 sqlx::query("SELECT * FROM logs WHERE block_number = $1 ORDER BY tx_index, log_index")
1354 .bind(bn)
1355 .fetch_all(&mut *tx)
1356 .await
1357 .map_err(SqlColdError::from)?;
1358 tx.commit().await.map_err(SqlColdError::from)?;
1359
1360 let mut logs_by_tx: BTreeMap<i64, Vec<Log>> = BTreeMap::new();
1362 for r in &all_log_rows {
1363 let tx_idx: i64 = r.get(COL_TX_INDEX);
1364 logs_by_tx.entry(tx_idx).or_default().push(log_from_row(r));
1365 }
1366
1367 let mut first_log_index = 0u64;
1368 let mut prior_cumulative_gas = 0u64;
1369 receipt_rows
1370 .into_iter()
1371 .enumerate()
1372 .map(|(idx, rr)| {
1373 let tx_idx: i64 = rr.get(COL_TX_INDEX);
1374 let tx_hash = rr.get(COL_TX_HASH);
1375 let sender = rr.get(COL_FROM_ADDRESS);
1376 let tx_type = rr.get::<i32, _>(COL_TX_TYPE);
1377 let success = rr.get::<i32, _>(COL_SUCCESS) != 0;
1378 let cumulative_gas_used: i64 = rr.get(COL_CUMULATIVE_GAS_USED);
1379 let logs = logs_by_tx.remove(&tx_idx).unwrap_or_default();
1380 let receipt = build_receipt(tx_type, success, cumulative_gas_used, logs)
1381 .map_err(ColdStorageError::from)?;
1382 let gas_used = receipt.inner.cumulative_gas_used - prior_cumulative_gas;
1383 prior_cumulative_gas = receipt.inner.cumulative_gas_used;
1384 let ir = IndexedReceipt { receipt, tx_hash, first_log_index, gas_used, sender };
1385 first_log_index += ir.receipt.inner.logs.len() as u64;
1386 Ok(ColdReceipt::new(ir, &header, idx as u64))
1387 })
1388 .collect()
1389 }
1390
1391 async fn get_signet_events(
1392 &self,
1393 spec: SignetEventsSpecifier,
1394 ) -> ColdResult<Vec<DbSignetEvent>> {
1395 let mut tx = self.begin_read().await.map_err(ColdStorageError::from)?;
1396 let rows = match spec {
1397 SignetEventsSpecifier::Block(block) => {
1398 let bn = to_i64(block);
1399 sqlx::query(
1400 "SELECT * FROM signet_events WHERE block_number = $1 ORDER BY event_index",
1401 )
1402 .bind(bn)
1403 .fetch_all(&mut *tx)
1404 .await
1405 .map_err(SqlColdError::from)?
1406 }
1407 SignetEventsSpecifier::BlockRange { start, end } => {
1408 let s = to_i64(start);
1409 let e = to_i64(end);
1410 sqlx::query(
1411 "SELECT * FROM signet_events WHERE block_number >= $1 AND block_number <= $2
1412 ORDER BY block_number, event_index",
1413 )
1414 .bind(s)
1415 .bind(e)
1416 .fetch_all(&mut *tx)
1417 .await
1418 .map_err(SqlColdError::from)?
1419 }
1420 };
1421 tx.commit().await.map_err(SqlColdError::from)?;
1422
1423 rows.iter().map(|r| signet_event_from_row(r).map_err(ColdStorageError::from)).collect()
1424 }
1425
1426 async fn get_zenith_header(
1427 &self,
1428 spec: ZenithHeaderSpecifier,
1429 ) -> ColdResult<Option<DbZenithHeader>> {
1430 let block = match spec {
1431 ZenithHeaderSpecifier::Number(n) => n,
1432 ZenithHeaderSpecifier::Range { start, .. } => start,
1433 };
1434 let bn = to_i64(block);
1435 let mut tx = self.begin_read().await.map_err(ColdStorageError::from)?;
1436 let row = sqlx::query("SELECT * FROM zenith_headers WHERE block_number = $1")
1437 .bind(bn)
1438 .fetch_optional(&mut *tx)
1439 .await
1440 .map_err(SqlColdError::from)?;
1441 tx.commit().await.map_err(SqlColdError::from)?;
1442
1443 row.map(|r| zenith_header_from_row(&r)).transpose().map_err(ColdStorageError::from)
1444 }
1445
1446 async fn get_zenith_headers(
1447 &self,
1448 spec: ZenithHeaderSpecifier,
1449 ) -> ColdResult<Vec<DbZenithHeader>> {
1450 let mut tx = self.begin_read().await.map_err(ColdStorageError::from)?;
1451 let rows = match spec {
1452 ZenithHeaderSpecifier::Number(n) => {
1453 let bn = to_i64(n);
1454 sqlx::query("SELECT * FROM zenith_headers WHERE block_number = $1")
1455 .bind(bn)
1456 .fetch_all(&mut *tx)
1457 .await
1458 .map_err(SqlColdError::from)?
1459 }
1460 ZenithHeaderSpecifier::Range { start, end } => {
1461 let s = to_i64(start);
1462 let e = to_i64(end);
1463 sqlx::query(
1464 "SELECT * FROM zenith_headers WHERE block_number >= $1 AND block_number <= $2
1465 ORDER BY block_number",
1466 )
1467 .bind(s)
1468 .bind(e)
1469 .fetch_all(&mut *tx)
1470 .await
1471 .map_err(SqlColdError::from)?
1472 }
1473 };
1474 tx.commit().await.map_err(SqlColdError::from)?;
1475
1476 rows.iter().map(|r| zenith_header_from_row(r).map_err(ColdStorageError::from)).collect()
1477 }
1478
1479 async fn get_logs(&self, filter: &Filter, max_logs: usize) -> ColdResult<Vec<RpcLog>> {
1480 let from = filter.get_from_block().unwrap_or(0);
1481 let to = filter.get_to_block().unwrap_or(u64::MAX);
1482
1483 let (filter_clause, params) = build_log_filter_clause(filter, 3);
1487 let where_clause = format!("l.block_number >= $1 AND l.block_number <= $2{filter_clause}");
1488
1489 let limit_idx = 3 + params.len() as u32;
1494 let data_sql = format!(
1495 "SELECT l.*, h.block_hash, h.timestamp AS block_timestamp, t.tx_hash, \
1496 (r.first_log_index + l.log_index) AS block_log_index \
1497 FROM logs l \
1498 JOIN headers h ON l.block_number = h.block_number \
1499 JOIN transactions t ON l.block_number = t.block_number \
1500 AND l.tx_index = t.tx_index \
1501 JOIN receipts r ON l.block_number = r.block_number \
1502 AND l.tx_index = r.tx_index \
1503 WHERE {where_clause} \
1504 ORDER BY l.block_number, l.tx_index, l.log_index \
1505 LIMIT ${limit_idx}"
1506 );
1507 let mut query = sqlx::query(&data_sql).bind(to_i64(from)).bind(to_i64(to));
1508 for param in ¶ms {
1509 query = query.bind(*param);
1510 }
1511 let limit = max_logs.saturating_add(1).min(i64::MAX as usize);
1514 query = query.bind(to_i64(limit as u64));
1515
1516 let mut tx = self.begin_read().await.map_err(ColdStorageError::from)?;
1517 let rows = query.fetch_all(&mut *tx).await.map_err(SqlColdError::from)?;
1518 tx.commit().await.map_err(SqlColdError::from)?;
1519
1520 if rows.len() > max_logs {
1521 return Err(ColdStorageError::TooManyLogs { limit: max_logs });
1522 }
1523
1524 rows.into_iter()
1525 .map(|r| {
1526 let log = log_from_row(&r);
1527 Ok(RpcLog {
1528 inner: log,
1529 block_hash: Some(r.get(COL_BLOCK_HASH)),
1530 block_number: Some(from_i64(r.get::<i64, _>(COL_BLOCK_NUMBER))),
1531 block_timestamp: Some(from_i64(r.get::<i64, _>(COL_BLOCK_TIMESTAMP))),
1532 transaction_hash: Some(r.get(COL_TX_HASH)),
1533 transaction_index: Some(from_i64(r.get::<i64, _>(COL_TX_INDEX))),
1534 log_index: Some(from_i64(r.get::<i64, _>(COL_BLOCK_LOG_INDEX))),
1535 removed: false,
1536 })
1537 })
1538 .collect::<ColdResult<Vec<_>>>()
1539 }
1540
1541 async fn produce_log_stream(&self, filter: &Filter, params: signet_cold::StreamParams) {
1542 #[cfg(feature = "postgres")]
1543 if self.is_postgres {
1544 return self.produce_log_stream_pg(filter, params).await;
1545 }
1546 signet_cold::produce_log_stream_default(self, filter, params).await;
1547 }
1548
1549 async fn get_latest_block(&self) -> ColdResult<Option<BlockNumber>> {
1550 let mut tx = self.begin_read().await.map_err(ColdStorageError::from)?;
1551 let row = sqlx::query("SELECT MAX(block_number) as max_bn FROM headers")
1552 .fetch_one(&mut *tx)
1553 .await
1554 .map_err(SqlColdError::from)?;
1555 tx.commit().await.map_err(SqlColdError::from)?;
1556 Ok(row.get::<Option<i64>, _>(COL_MAX_BN).map(from_i64))
1557 }
1558}
1559
1560impl ColdStorageWrite for SqlColdBackend {
1561 async fn append_block(&self, data: BlockData) -> ColdResult<()> {
1562 self.insert_block(data).await.map_err(ColdStorageError::from)
1563 }
1564
1565 async fn append_blocks(&self, data: Vec<BlockData>) -> ColdResult<()> {
1566 let mut tx = self.begin_write().await.map_err(ColdStorageError::from)?;
1567 for block_data in data {
1568 write_block_to_tx(&mut tx, block_data).await.map_err(ColdStorageError::from)?;
1569 }
1570 tx.commit().await.map_err(SqlColdError::from)?;
1571 Ok(())
1572 }
1573
1574 async fn truncate_above(&self, block: BlockNumber) -> ColdResult<()> {
1575 let bn = to_i64(block);
1576 let mut tx = self.begin_write().await.map_err(ColdStorageError::from)?;
1577
1578 delete_above_in_tx(&mut tx, bn).await.map_err(ColdStorageError::from)?;
1579
1580 tx.commit().await.map_err(SqlColdError::from)?;
1581 Ok(())
1582 }
1583}
1584
1585impl ColdStorageBackend for SqlColdBackend {
1586 fn read_timeout(&self) -> Option<Duration> {
1587 Some(self.read_timeout)
1588 }
1589
1590 fn write_timeout(&self) -> Option<Duration> {
1591 Some(self.write_timeout)
1592 }
1593
1594 async fn drain_above(&self, block: BlockNumber) -> ColdResult<Vec<Vec<ColdReceipt>>> {
1595 let bn = to_i64(block);
1596 let mut tx = self.begin_write().await.map_err(ColdStorageError::from)?;
1597
1598 let header_rows =
1600 sqlx::query("SELECT * FROM headers WHERE block_number > $1 ORDER BY block_number")
1601 .bind(bn)
1602 .fetch_all(&mut *tx)
1603 .await
1604 .map_err(SqlColdError::from)?;
1605
1606 if header_rows.is_empty() {
1607 tx.commit().await.map_err(SqlColdError::from)?;
1608 return Ok(Vec::new());
1609 }
1610
1611 let mut headers: BTreeMap<i64, SealedHeader> = BTreeMap::new();
1612 for r in &header_rows {
1613 let num: i64 = r.get(COL_BLOCK_NUMBER);
1614 let h = header_from_row(r).map_err(ColdStorageError::from)?.seal_slow();
1615 headers.insert(num, h);
1616 }
1617
1618 let receipt_rows = sqlx::query(
1620 "SELECT r.block_number, r.tx_index, r.tx_type, r.success, \
1621 r.cumulative_gas_used, r.first_log_index, \
1622 t.tx_hash, t.from_address \
1623 FROM receipts r \
1624 JOIN transactions t ON r.block_number = t.block_number \
1625 AND r.tx_index = t.tx_index \
1626 WHERE r.block_number > $1 \
1627 ORDER BY r.block_number, r.tx_index",
1628 )
1629 .bind(bn)
1630 .fetch_all(&mut *tx)
1631 .await
1632 .map_err(SqlColdError::from)?;
1633
1634 let log_rows = sqlx::query(
1636 "SELECT * FROM logs WHERE block_number > $1 \
1637 ORDER BY block_number, tx_index, log_index",
1638 )
1639 .bind(bn)
1640 .fetch_all(&mut *tx)
1641 .await
1642 .map_err(SqlColdError::from)?;
1643
1644 let mut logs_by_block_tx: BTreeMap<(i64, i64), Vec<Log>> = BTreeMap::new();
1646 for r in &log_rows {
1647 let block_num: i64 = r.get(COL_BLOCK_NUMBER);
1648 let tx_idx: i64 = r.get(COL_TX_INDEX);
1649 logs_by_block_tx.entry((block_num, tx_idx)).or_default().push(log_from_row(r));
1650 }
1651
1652 let mut receipts_by_block: BTreeMap<i64, Vec<&sqlx::any::AnyRow>> = BTreeMap::new();
1654 for r in &receipt_rows {
1655 let block_num: i64 = r.get(COL_BLOCK_NUMBER);
1656 receipts_by_block.entry(block_num).or_default().push(r);
1657 }
1658
1659 let mut all_receipts = Vec::with_capacity(headers.len());
1661 for (&block_num, header) in &headers {
1662 let block_receipt_rows = receipts_by_block.remove(&block_num).unwrap_or_default();
1663 let mut prior_cumulative_gas = 0u64;
1664 let block_receipts: ColdResult<Vec<ColdReceipt>> = block_receipt_rows
1665 .into_iter()
1666 .map(|rr: &sqlx::any::AnyRow| {
1667 let tx_idx: i64 = rr.get(COL_TX_INDEX);
1668 let tx_hash = rr.get(COL_TX_HASH);
1669 let sender = rr.get(COL_FROM_ADDRESS);
1670 let tx_type: i32 = rr.get(COL_TX_TYPE);
1671 let success = rr.get::<i32, _>(COL_SUCCESS) != 0;
1672 let cumulative_gas_used: i64 = rr.get(COL_CUMULATIVE_GAS_USED);
1673 let first_log_index = from_i64(rr.get::<i64, _>(COL_FIRST_LOG_INDEX));
1674 let logs = logs_by_block_tx.remove(&(block_num, tx_idx)).unwrap_or_default();
1675 let receipt = build_receipt(tx_type, success, cumulative_gas_used, logs)
1676 .map_err(ColdStorageError::from)?;
1677 debug_assert!(
1680 receipt.inner.cumulative_gas_used >= prior_cumulative_gas,
1681 "cumulative gas decreased: {} < {}",
1682 receipt.inner.cumulative_gas_used,
1683 prior_cumulative_gas,
1684 );
1685 let gas_used = receipt.inner.cumulative_gas_used - prior_cumulative_gas;
1686 prior_cumulative_gas = receipt.inner.cumulative_gas_used;
1687 let ir = IndexedReceipt { receipt, tx_hash, first_log_index, gas_used, sender };
1688 Ok(ColdReceipt::new(ir, header, from_i64(tx_idx)))
1689 })
1690 .collect();
1691 all_receipts.push(block_receipts?);
1692 }
1693
1694 delete_above_in_tx(&mut tx, bn).await.map_err(ColdStorageError::from)?;
1696
1697 tx.commit().await.map_err(SqlColdError::from)?;
1698 Ok(all_receipts)
1699 }
1700}
1701
#[cfg(all(test, feature = "test-utils"))]
mod tests {
    use super::*;
    use signet_cold::conformance::conformance;

    /// Runs the `signet_cold` conformance suite against a backend connected
    /// to the `sqlite::memory:` URL.
    #[tokio::test]
    async fn sqlite_conformance() {
        let sqlite = SqlColdBackend::connect("sqlite::memory:").await.unwrap();
        conformance(sqlite).await.unwrap();
    }

    /// Runs the `signet_cold` conformance suite against the database named by
    /// the `DATABASE_URL` environment variable; skipped when it is unset.
    #[tokio::test]
    async fn pg_conformance() {
        let url = match std::env::var("DATABASE_URL") {
            Ok(url) => url,
            Err(_) => {
                eprintln!("skipping pg conformance: DATABASE_URL not set");
                return;
            }
        };
        let postgres = SqlColdBackend::connect(&url).await.unwrap();
        conformance(postgres).await.unwrap();
    }
}
1722}