1use crate::SqlColdError;
8use crate::columns::{
9 COL_ACCESS_LIST, COL_ADDRESS, COL_AMOUNT, COL_AUTHORIZATION_LIST, COL_BASE_FEE_PER_GAS,
10 COL_BENEFICIARY, COL_BLOB_GAS_USED, COL_BLOB_VERSIONED_HASHES, COL_BLOCK_DATA_HASH,
11 COL_BLOCK_HASH, COL_BLOCK_LOG_INDEX, COL_BLOCK_NUMBER, COL_BLOCK_TIMESTAMP, COL_CHAIN_ID,
12 COL_CNT, COL_CUMULATIVE_GAS_USED, COL_DATA, COL_DIFFICULTY, COL_EVENT_TYPE,
13 COL_EXCESS_BLOB_GAS, COL_EXTRA_DATA, COL_FIRST_LOG_INDEX, COL_FROM_ADDRESS, COL_GAS,
14 COL_GAS_LIMIT, COL_GAS_PRICE, COL_GAS_USED, COL_HOST_BLOCK_NUMBER, COL_INPUT, COL_LOGS_BLOOM,
15 COL_MAX_BN, COL_MAX_FEE_PER_BLOB_GAS, COL_MAX_FEE_PER_GAS, COL_MAX_PRIORITY_FEE_PER_GAS,
16 COL_MIX_HASH, COL_NONCE, COL_OMMERS_HASH, COL_ORDER_INDEX, COL_PARENT_BEACON_BLOCK_ROOT,
17 COL_PARENT_HASH, COL_PRIOR_GAS, COL_R_CUMULATIVE_GAS_USED, COL_R_FIRST_LOG_INDEX,
18 COL_R_FROM_ADDRESS, COL_R_SUCCESS, COL_R_TX_HASH, COL_R_TX_TYPE, COL_RECEIPTS_ROOT,
19 COL_REQUESTS_HASH, COL_REWARD_ADDRESS, COL_ROLLUP_CHAIN_ID, COL_ROLLUP_RECIPIENT, COL_SENDER,
20 COL_SIG_R, COL_SIG_S, COL_SIG_Y_PARITY, COL_STATE_ROOT, COL_SUCCESS, COL_TIMESTAMP,
21 COL_TO_ADDRESS, COL_TOKEN, COL_TOPIC0, COL_TOPIC1, COL_TOPIC2, COL_TOPIC3,
22 COL_TRANSACTIONS_ROOT, COL_TX_HASH, COL_TX_INDEX, COL_TX_TYPE, COL_VALUE, COL_WITHDRAWALS_ROOT,
23};
24use crate::convert::{
25 EVENT_ENTER, EVENT_ENTER_TOKEN, EVENT_TRANSACT, build_receipt, decode_access_list_or_empty,
26 decode_authorization_list, decode_b256_vec, decode_u128_required, encode_access_list,
27 encode_authorization_list, encode_b256_vec, encode_u128, from_i64, to_i64,
28};
29use alloy::{
30 consensus::{
31 Header, Signed, TxEip1559, TxEip2930, TxEip4844, TxEip7702, TxLegacy, TxType,
32 transaction::Recovered,
33 },
34 primitives::{
35 Address, B256, BlockNumber, Bloom, Bytes, Log, LogData, Sealable, Signature, TxKind, U256,
36 },
37};
38use signet_cold::{
39 BlockData, ColdReceipt, ColdResult, ColdStorage, ColdStorageError, ColdStorageRead,
40 ColdStorageWrite, Confirmed, Filter, HeaderSpecifier, ReceiptSpecifier, RpcLog,
41 SignetEventsSpecifier, TransactionSpecifier, ZenithHeaderSpecifier,
42};
43use signet_storage_types::{
44 ConfirmationMeta, DbSignetEvent, DbZenithHeader, IndexedReceipt, RecoveredTx, SealedHeader,
45 TransactionSigned,
46};
47use signet_zenith::{
48 Passage::{Enter, EnterToken},
49 Transactor::Transact,
50 Zenith,
51};
52use sqlx::{AnyPool, Row};
53use std::collections::BTreeMap;
54
/// SQL-backed cold storage handle.
///
/// Holds the connection pool used by every query in this file together with a
/// flag recording which database backend the pool talks to.
#[derive(Debug, Clone)]
pub struct SqlColdBackend {
    /// Connection pool all reads and writes are executed against.
    pool: AnyPool,
    /// `true` when [`SqlColdBackend::new`] detected the `"PostgreSQL"`
    /// backend, `false` when it detected `"SQLite"`.
    is_postgres: bool,
}
79
/// Returns `true` when `url` designates an in-memory SQLite database.
///
/// Recognised forms:
/// - the `:memory:` database name behind either the `sqlite:` or the
///   `sqlite://` scheme prefix, with or without a trailing `?query` string
///   (e.g. `sqlite::memory:`, `sqlite://:memory:`,
///   `sqlite::memory:?cache=shared`);
/// - any URL containing the `mode=memory` parameter.
///
/// Any other URL (file-backed SQLite, other backends) yields `false`.
fn is_in_memory_sqlite(url: &str) -> bool {
    // Strip the scheme the same way for both spellings, then drop the query
    // string so that only the database name is compared.
    let database = url
        .strip_prefix("sqlite://")
        .or_else(|| url.strip_prefix("sqlite:"))
        .map(|rest| rest.split('?').next().unwrap_or(rest));

    database == Some(":memory:") || url.contains("mode=memory")
}
88
89async fn delete_above_in_tx(
92 tx: &mut sqlx::Transaction<'_, sqlx::Any>,
93 bn: i64,
94) -> Result<(), SqlColdError> {
95 for table in ["logs", "transactions", "receipts", "signet_events", "zenith_headers", "headers"]
96 {
97 sqlx::query(&format!("DELETE FROM {table} WHERE block_number > $1"))
98 .bind(bn)
99 .execute(&mut **tx)
100 .await?;
101 }
102 Ok(())
103}
104
105impl SqlColdBackend {
106 pub async fn new(pool: AnyPool) -> Result<Self, SqlColdError> {
113 let conn = pool.acquire().await?;
115 let backend = conn.backend_name();
116
117 let (migration, is_postgres) = match backend {
118 "PostgreSQL" => (include_str!("../migrations/001_initial_pg.sql"), true),
119 "SQLite" => (include_str!("../migrations/001_initial.sql"), false),
120 other => {
121 return Err(SqlColdError::Convert(format!(
122 "unsupported database backend: {other}"
123 )));
124 }
125 };
126 drop(conn);
127 sqlx::raw_sql(migration).execute(&pool).await?;
130 sqlx::raw_sql(include_str!("../migrations/002_add_topic_indexes.sql"))
131 .execute(&pool)
132 .await?;
133 Ok(Self { pool, is_postgres })
134 }
135
136 pub async fn connect_with(
145 url: &str,
146 pool_opts: sqlx::pool::PoolOptions<sqlx::Any>,
147 ) -> Result<Self, SqlColdError> {
148 sqlx::any::install_default_drivers();
149 let pool_opts =
150 if is_in_memory_sqlite(url) { pool_opts.max_connections(1) } else { pool_opts };
151 let pool: AnyPool = pool_opts.connect(url).await?;
152 Self::new(pool).await
153 }
154
155 pub async fn connect(url: &str) -> Result<Self, SqlColdError> {
159 Self::connect_with(url, sqlx::pool::PoolOptions::new()).await
160 }
161
162 async fn resolve_header_spec(
167 &self,
168 spec: HeaderSpecifier,
169 ) -> Result<Option<BlockNumber>, SqlColdError> {
170 match spec {
171 HeaderSpecifier::Number(n) => Ok(Some(n)),
172 HeaderSpecifier::Hash(hash) => {
173 let row = sqlx::query("SELECT block_number FROM headers WHERE block_hash = $1")
174 .bind(hash)
175 .fetch_optional(&self.pool)
176 .await?;
177 Ok(row.map(|r| from_i64(r.get::<i64, _>(COL_BLOCK_NUMBER))))
178 }
179 }
180 }
181
182 async fn fetch_header_by_number(
187 &self,
188 block_num: BlockNumber,
189 ) -> Result<Option<SealedHeader>, SqlColdError> {
190 let bn = to_i64(block_num);
191 let row = sqlx::query("SELECT * FROM headers WHERE block_number = $1")
192 .bind(bn)
193 .fetch_optional(&self.pool)
194 .await?;
195
196 row.map(|r| header_from_row(&r).map(|h| h.seal_slow())).transpose()
197 }
198
199 async fn insert_block(&self, data: BlockData) -> Result<(), SqlColdError> {
204 let mut tx = self.pool.begin().await?;
205 write_block_to_tx(&mut tx, data).await?;
206 tx.commit().await?;
207 Ok(())
208 }
209
    /// Streams the logs matching `filter` for blocks `from..=to` into
    /// `params.sender`, one block at a time.
    ///
    /// Only compiled when the `postgres` feature is enabled. All queries run
    /// inside one transaction switched to `REPEATABLE READ`; the transaction
    /// is never committed and is dropped when the function returns.
    ///
    /// The function returns nothing. Every outcome is reported through the
    /// sender:
    /// - a database error is forwarded as a backend error, then the function
    ///   returns;
    /// - `StreamDeadlineExceeded` is sent when `deadline` has passed before a
    ///   block is queried, or while waiting to send a log;
    /// - `TooManyLogs` is sent as soon as more than `max_logs` rows have been
    ///   read;
    /// - if sending a log fails, the function returns without sending anything
    ///   further (presumably the receiver was dropped — confirm against the
    ///   channel type in `signet_cold::StreamParams`);
    /// - once `total >= max_logs` after finishing a block, the function
    ///   returns without querying the remaining blocks.
    #[cfg(feature = "postgres")]
    async fn produce_log_stream_pg(&self, filter: &Filter, params: signet_cold::StreamParams) {
        use tokio_stream::StreamExt;

        // Unwraps an `Ok`, or forwards the error to the receiver as a backend
        // error and returns from the enclosing function. The result of that
        // final send is deliberately ignored.
        macro_rules! try_stream {
            ($sender:expr, $expr:expr) => {
                match $expr {
                    Ok(v) => v,
                    Err(e) => {
                        let _ = $sender
                            .send(Err(ColdStorageError::backend(SqlColdError::from(e))))
                            .await;
                        return;
                    }
                }
            };
        }

        let signet_cold::StreamParams { from, to, max_logs, sender, deadline } = params;

        // One transaction for the whole stream, switched to REPEATABLE READ
        // before the first data query.
        let mut tx = try_stream!(sender, self.pool.begin().await);
        try_stream!(
            sender,
            sqlx::query("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ").execute(&mut *tx).await
        );

        // Placeholder numbering for the filter starts at $2 because $1 is the
        // block number in the query below.
        let (filter_clause, filter_params) = build_log_filter_clause(filter, 2);
        // `block_log_index` is the receipt's `first_log_index` plus the log's
        // index within its receipt.
        let data_sql = format!(
            "SELECT l.*, h.block_hash, h.timestamp AS block_timestamp, t.tx_hash, \
             (r.first_log_index + l.log_index) AS block_log_index \
             FROM logs l \
             JOIN headers h ON l.block_number = h.block_number \
             JOIN transactions t ON l.block_number = t.block_number \
             AND l.tx_index = t.tx_index \
             JOIN receipts r ON l.block_number = r.block_number \
             AND l.tx_index = r.tx_index \
             WHERE l.block_number = $1{filter_clause} \
             ORDER BY l.tx_index, l.log_index"
        );

        // Number of rows read so far across all blocks.
        let mut total = 0usize;

        for block_num in from..=to {
            // Deadline check before each block query.
            if tokio::time::Instant::now() > deadline {
                let _ = sender.send(Err(ColdStorageError::StreamDeadlineExceeded)).await;
                return;
            }

            // Bind the block number first, then the filter parameters in the
            // order `build_log_filter_clause` produced them.
            let mut query = sqlx::query(&data_sql).bind(to_i64(block_num));
            for param in &filter_params {
                query = query.bind(*param);
            }

            let mut stream = query.fetch(&mut *tx);
            while let Some(row_result) = stream.next().await {
                let r = try_stream!(sender, row_result);

                // The row is counted before it is sent, so the limit error
                // fires on row `max_logs + 1` and that row is not delivered.
                total += 1;
                if total > max_logs {
                    let _ =
                        sender.send(Err(ColdStorageError::TooManyLogs { limit: max_logs })).await;
                    return;
                }

                let log = log_from_row(&r);
                let rpc_log = RpcLog {
                    inner: log,
                    block_hash: Some(r.get(COL_BLOCK_HASH)),
                    block_number: Some(from_i64(r.get::<i64, _>(COL_BLOCK_NUMBER))),
                    block_timestamp: Some(from_i64(r.get::<i64, _>(COL_BLOCK_TIMESTAMP))),
                    transaction_hash: Some(r.get(COL_TX_HASH)),
                    transaction_index: Some(from_i64(r.get::<i64, _>(COL_TX_INDEX))),
                    log_index: Some(from_i64(r.get::<i64, _>(COL_BLOCK_LOG_INDEX))),
                    removed: false,
                };
                // The send itself is bounded by the deadline.
                match tokio::time::timeout_at(deadline, sender.send(Ok(rpc_log))).await {
                    Ok(Ok(())) => {}
                    // The send failed: stop without reporting anything else.
                    Ok(Err(_)) => return,
                    // The deadline elapsed while waiting to send.
                    Err(_) => {
                        let _ = sender.send(Err(ColdStorageError::StreamDeadlineExceeded)).await;
                        return;
                    }
                }
            }

            // The limit has been reached: skip the remaining blocks.
            if total >= max_logs {
                return;
            }
        }
    }
332}
333
334fn blob<'r>(r: &'r sqlx::any::AnyRow, col: &str) -> &'r [u8] {
340 r.get(col)
341}
342
343fn opt_blob<'r>(r: &'r sqlx::any::AnyRow, col: &str) -> Option<&'r [u8]> {
345 r.get(col)
346}
347
348fn header_from_row(r: &sqlx::any::AnyRow) -> Result<Header, SqlColdError> {
350 Ok(Header {
351 parent_hash: r.get(COL_PARENT_HASH),
352 ommers_hash: r.get(COL_OMMERS_HASH),
353 beneficiary: r.get(COL_BENEFICIARY),
354 state_root: r.get(COL_STATE_ROOT),
355 transactions_root: r.get(COL_TRANSACTIONS_ROOT),
356 receipts_root: r.get(COL_RECEIPTS_ROOT),
357 logs_bloom: Bloom::from_slice(blob(r, COL_LOGS_BLOOM)),
358 difficulty: r.get(COL_DIFFICULTY),
359 number: from_i64(r.get(COL_BLOCK_NUMBER)),
360 gas_limit: from_i64(r.get(COL_GAS_LIMIT)),
361 gas_used: from_i64(r.get(COL_GAS_USED)),
362 timestamp: from_i64(r.get(COL_TIMESTAMP)),
363 extra_data: r.get(COL_EXTRA_DATA),
364 mix_hash: r.get(COL_MIX_HASH),
365 nonce: r.get(COL_NONCE),
366 base_fee_per_gas: r.get::<Option<i64>, _>(COL_BASE_FEE_PER_GAS).map(from_i64),
367 withdrawals_root: r.get(COL_WITHDRAWALS_ROOT),
368 blob_gas_used: r.get::<Option<i64>, _>(COL_BLOB_GAS_USED).map(from_i64),
369 excess_blob_gas: r.get::<Option<i64>, _>(COL_EXCESS_BLOB_GAS).map(from_i64),
370 parent_beacon_block_root: r.get(COL_PARENT_BEACON_BLOCK_ROOT),
371 requests_hash: r.get(COL_REQUESTS_HASH),
372 })
373}
374
375fn tx_from_row(r: &sqlx::any::AnyRow) -> Result<TransactionSigned, SqlColdError> {
377 use alloy::consensus::EthereumTxEnvelope;
378
379 let sig =
380 Signature::new(r.get(COL_SIG_R), r.get(COL_SIG_S), r.get::<i32, _>(COL_SIG_Y_PARITY) != 0);
381
382 let tx_type_raw: i32 = r.get(COL_TX_TYPE);
383 let tx_type_u8: u8 = tx_type_raw
384 .try_into()
385 .map_err(|_| SqlColdError::Convert(format!("tx_type out of u8 range: {tx_type_raw}")))?;
386 let tx_type = TxType::try_from(tx_type_u8)
387 .map_err(|_| SqlColdError::Convert(format!("invalid tx_type: {tx_type_u8}")))?;
388
389 let chain_id: Option<i64> = r.get(COL_CHAIN_ID);
390 let nonce = from_i64(r.get(COL_NONCE));
391 let gas_limit = from_i64(r.get(COL_GAS_LIMIT));
392 let to_addr: Option<Address> = r.get(COL_TO_ADDRESS);
393 let value: U256 = r.get(COL_VALUE);
394 let input: Bytes = r.get(COL_INPUT);
395
396 match tx_type {
397 TxType::Legacy => {
398 let tx = TxLegacy {
399 chain_id: chain_id.map(from_i64),
400 nonce,
401 gas_price: decode_u128_required(opt_blob(r, COL_GAS_PRICE), COL_GAS_PRICE)?,
402 gas_limit,
403 to: to_addr.map_or(TxKind::Create, TxKind::Call),
404 value,
405 input,
406 };
407 Ok(EthereumTxEnvelope::Legacy(Signed::new_unhashed(tx, sig)))
408 }
409 TxType::Eip2930 => {
410 let tx = TxEip2930 {
411 chain_id: from_i64(
412 chain_id
413 .ok_or_else(|| SqlColdError::Convert("EIP2930 requires chain_id".into()))?,
414 ),
415 nonce,
416 gas_price: decode_u128_required(opt_blob(r, COL_GAS_PRICE), COL_GAS_PRICE)?,
417 gas_limit,
418 to: to_addr.map_or(TxKind::Create, TxKind::Call),
419 value,
420 input,
421 access_list: decode_access_list_or_empty(opt_blob(r, COL_ACCESS_LIST))?,
422 };
423 Ok(EthereumTxEnvelope::Eip2930(Signed::new_unhashed(tx, sig)))
424 }
425 TxType::Eip1559 => {
426 let tx = TxEip1559 {
427 chain_id: from_i64(
428 chain_id
429 .ok_or_else(|| SqlColdError::Convert("EIP1559 requires chain_id".into()))?,
430 ),
431 nonce,
432 gas_limit,
433 max_fee_per_gas: decode_u128_required(
434 opt_blob(r, COL_MAX_FEE_PER_GAS),
435 COL_MAX_FEE_PER_GAS,
436 )?,
437 max_priority_fee_per_gas: decode_u128_required(
438 opt_blob(r, COL_MAX_PRIORITY_FEE_PER_GAS),
439 COL_MAX_PRIORITY_FEE_PER_GAS,
440 )?,
441 to: to_addr.map_or(TxKind::Create, TxKind::Call),
442 value,
443 input,
444 access_list: decode_access_list_or_empty(opt_blob(r, COL_ACCESS_LIST))?,
445 };
446 Ok(EthereumTxEnvelope::Eip1559(Signed::new_unhashed(tx, sig)))
447 }
448 TxType::Eip4844 => {
449 let tx = TxEip4844 {
450 chain_id: from_i64(
451 chain_id
452 .ok_or_else(|| SqlColdError::Convert("EIP4844 requires chain_id".into()))?,
453 ),
454 nonce,
455 gas_limit,
456 max_fee_per_gas: decode_u128_required(
457 opt_blob(r, COL_MAX_FEE_PER_GAS),
458 COL_MAX_FEE_PER_GAS,
459 )?,
460 max_priority_fee_per_gas: decode_u128_required(
461 opt_blob(r, COL_MAX_PRIORITY_FEE_PER_GAS),
462 COL_MAX_PRIORITY_FEE_PER_GAS,
463 )?,
464 to: to_addr
465 .ok_or_else(|| SqlColdError::Convert("EIP4844 requires to_address".into()))?,
466 value,
467 input,
468 access_list: decode_access_list_or_empty(opt_blob(r, COL_ACCESS_LIST))?,
469 blob_versioned_hashes: decode_b256_vec(
470 opt_blob(r, COL_BLOB_VERSIONED_HASHES).ok_or_else(|| {
471 SqlColdError::Convert("EIP4844 requires blob_versioned_hashes".into())
472 })?,
473 )?,
474 max_fee_per_blob_gas: decode_u128_required(
475 opt_blob(r, COL_MAX_FEE_PER_BLOB_GAS),
476 COL_MAX_FEE_PER_BLOB_GAS,
477 )?,
478 };
479 Ok(EthereumTxEnvelope::Eip4844(Signed::new_unhashed(tx, sig)))
480 }
481 TxType::Eip7702 => {
482 let tx = TxEip7702 {
483 chain_id: from_i64(
484 chain_id
485 .ok_or_else(|| SqlColdError::Convert("EIP7702 requires chain_id".into()))?,
486 ),
487 nonce,
488 gas_limit,
489 max_fee_per_gas: decode_u128_required(
490 opt_blob(r, COL_MAX_FEE_PER_GAS),
491 COL_MAX_FEE_PER_GAS,
492 )?,
493 max_priority_fee_per_gas: decode_u128_required(
494 opt_blob(r, COL_MAX_PRIORITY_FEE_PER_GAS),
495 COL_MAX_PRIORITY_FEE_PER_GAS,
496 )?,
497 to: to_addr
498 .ok_or_else(|| SqlColdError::Convert("EIP7702 requires to_address".into()))?,
499 value,
500 input,
501 access_list: decode_access_list_or_empty(opt_blob(r, COL_ACCESS_LIST))?,
502 authorization_list: decode_authorization_list(
503 opt_blob(r, COL_AUTHORIZATION_LIST).ok_or_else(|| {
504 SqlColdError::Convert("EIP7702 requires authorization_list".into())
505 })?,
506 )?,
507 };
508 Ok(EthereumTxEnvelope::Eip7702(Signed::new_unhashed(tx, sig)))
509 }
510 }
511}
512
513fn recovered_tx_from_row(r: &sqlx::any::AnyRow) -> Result<RecoveredTx, SqlColdError> {
515 let sender: Address = r.get(COL_FROM_ADDRESS);
516 let tx = tx_from_row(r)?;
517 Ok(Recovered::new_unchecked(tx, sender))
519}
520
521fn log_from_row(r: &sqlx::any::AnyRow) -> Log {
523 let topics = [COL_TOPIC0, COL_TOPIC1, COL_TOPIC2, COL_TOPIC3]
524 .into_iter()
525 .filter_map(|col| r.get::<Option<B256>, _>(col))
526 .collect();
527 Log { address: r.get(COL_ADDRESS), data: LogData::new_unchecked(topics, r.get(COL_DATA)) }
528}
529
530fn signet_event_from_row(r: &sqlx::any::AnyRow) -> Result<DbSignetEvent, SqlColdError> {
532 let event_type = r.get::<i32, _>(COL_EVENT_TYPE) as i16;
533 let order = from_i64(r.get(COL_ORDER_INDEX));
534 let rollup_chain_id: U256 = r.get(COL_ROLLUP_CHAIN_ID);
535
536 match event_type {
537 EVENT_TRANSACT => {
538 let sender: Address = r
539 .get::<Option<Address>, _>(COL_SENDER)
540 .ok_or_else(|| SqlColdError::Convert("Transact requires sender".into()))?;
541 let to: Address = r
542 .get::<Option<Address>, _>(COL_TO_ADDRESS)
543 .ok_or_else(|| SqlColdError::Convert("Transact requires to".into()))?;
544 let value: U256 = r
545 .get::<Option<U256>, _>(COL_VALUE)
546 .ok_or_else(|| SqlColdError::Convert("Transact requires value".into()))?;
547 let gas: U256 = r
548 .get::<Option<U256>, _>(COL_GAS)
549 .ok_or_else(|| SqlColdError::Convert("Transact requires gas".into()))?;
550 let max_fee: U256 = r
551 .get::<Option<U256>, _>(COL_MAX_FEE_PER_GAS)
552 .ok_or_else(|| SqlColdError::Convert("Transact requires max_fee_per_gas".into()))?;
553 let data: Bytes = r.get::<Option<Bytes>, _>(COL_DATA).unwrap_or_default();
554
555 Ok(DbSignetEvent::Transact(
556 order,
557 Transact {
558 rollupChainId: rollup_chain_id,
559 sender,
560 to,
561 value,
562 gas,
563 maxFeePerGas: max_fee,
564 data,
565 },
566 ))
567 }
568 EVENT_ENTER => {
569 let recipient: Address = r
570 .get::<Option<Address>, _>(COL_ROLLUP_RECIPIENT)
571 .ok_or_else(|| SqlColdError::Convert("Enter requires rollup_recipient".into()))?;
572 let amount: U256 = r
573 .get::<Option<U256>, _>(COL_AMOUNT)
574 .ok_or_else(|| SqlColdError::Convert("Enter requires amount".into()))?;
575
576 Ok(DbSignetEvent::Enter(
577 order,
578 Enter { rollupChainId: rollup_chain_id, rollupRecipient: recipient, amount },
579 ))
580 }
581 EVENT_ENTER_TOKEN => {
582 let token: Address = r
583 .get::<Option<Address>, _>(COL_TOKEN)
584 .ok_or_else(|| SqlColdError::Convert("EnterToken requires token".into()))?;
585 let recipient: Address =
586 r.get::<Option<Address>, _>(COL_ROLLUP_RECIPIENT).ok_or_else(|| {
587 SqlColdError::Convert("EnterToken requires rollup_recipient".into())
588 })?;
589 let amount: U256 = r
590 .get::<Option<U256>, _>(COL_AMOUNT)
591 .ok_or_else(|| SqlColdError::Convert("EnterToken requires amount".into()))?;
592
593 Ok(DbSignetEvent::EnterToken(
594 order,
595 EnterToken {
596 rollupChainId: rollup_chain_id,
597 token,
598 rollupRecipient: recipient,
599 amount,
600 },
601 ))
602 }
603 _ => Err(SqlColdError::Convert(format!("invalid event_type: {event_type}"))),
604 }
605}
606
607fn zenith_header_from_row(r: &sqlx::any::AnyRow) -> Result<DbZenithHeader, SqlColdError> {
609 Ok(DbZenithHeader(Zenith::BlockHeader {
610 hostBlockNumber: r.get(COL_HOST_BLOCK_NUMBER),
611 rollupChainId: r.get(COL_ROLLUP_CHAIN_ID),
612 gasLimit: r.get(COL_GAS_LIMIT),
613 rewardAddress: r.get(COL_REWARD_ADDRESS),
614 blockDataHash: r.get(COL_BLOCK_DATA_HASH),
615 }))
616}
617
/// Writes every part of one block into the supplied open transaction.
///
/// Rows are inserted in this order: the header, the transactions, each
/// receipt followed by its logs, the signet events, and finally the zenith
/// header when one is present. The function does not commit; that is left to
/// the caller.
///
/// # Errors
///
/// Returns the first database error encountered. Statements after the
/// failing one are not executed.
async fn write_block_to_tx(
    tx: &mut sqlx::Transaction<'_, sqlx::Any>,
    data: BlockData,
) -> Result<(), SqlColdError> {
    let bn = to_i64(data.block_number());

    // The hash is computed from the header here (`hash_slow`), not read from
    // a cached value.
    let block_hash = data.header.hash_slow();
    let difficulty = &data.header.difficulty;
    sqlx::query(
        "INSERT INTO headers (
            block_number, block_hash, parent_hash, ommers_hash, beneficiary,
            state_root, transactions_root, receipts_root, logs_bloom, difficulty,
            gas_limit, gas_used, timestamp, extra_data, mix_hash, nonce,
            base_fee_per_gas, withdrawals_root, blob_gas_used, excess_blob_gas,
            parent_beacon_block_root, requests_hash
        ) VALUES (
            $1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
            $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22
        )",
    )
    .bind(bn)
    .bind(block_hash)
    .bind(data.header.parent_hash)
    .bind(data.header.ommers_hash)
    .bind(data.header.beneficiary)
    .bind(data.header.state_root)
    .bind(data.header.transactions_root)
    .bind(data.header.receipts_root)
    .bind(data.header.logs_bloom)
    .bind(difficulty)
    .bind(to_i64(data.header.gas_limit))
    .bind(to_i64(data.header.gas_used))
    .bind(to_i64(data.header.timestamp))
    .bind(&data.header.extra_data)
    .bind(data.header.mix_hash)
    .bind(data.header.nonce)
    .bind(data.header.base_fee_per_gas.map(to_i64))
    .bind(data.header.withdrawals_root.as_ref())
    .bind(data.header.blob_gas_used.map(to_i64))
    .bind(data.header.excess_blob_gas.map(to_i64))
    .bind(data.header.parent_beacon_block_root.as_ref())
    .bind(data.header.requests_hash.as_ref())
    .execute(&mut **tx)
    .await?;

    // Transactions: the position in `data.transactions` becomes `tx_index`.
    for (idx, recovered_tx) in data.transactions.iter().enumerate() {
        insert_transaction(tx, bn, to_i64(idx as u64), recovered_tx).await?;
    }

    // Receipts and logs. `first_log_index` is the number of logs emitted by
    // all earlier receipts in this block, i.e. the block-wide index of this
    // receipt's first log.
    let mut first_log_index = 0i64;
    for (idx, receipt) in data.receipts.iter().enumerate() {
        let tx_idx = to_i64(idx as u64);
        sqlx::query(
            "INSERT INTO receipts (block_number, tx_index, tx_type, success, cumulative_gas_used, first_log_index)
             VALUES ($1, $2, $3, $4, $5, $6)",
        )
        .bind(bn)
        .bind(tx_idx)
        .bind(receipt.tx_type as i32)
        .bind(receipt.inner.status.coerce_status() as i32)
        .bind(to_i64(receipt.inner.cumulative_gas_used))
        .bind(first_log_index)
        .execute(&mut **tx)
        .await?;
        // Advance the running total only after this receipt's value is bound.
        first_log_index += receipt.inner.logs.len() as i64;

        // `log_index` is the position within this receipt, not within the
        // block. Topics beyond those present are bound as `None`.
        for (log_idx, log) in receipt.inner.logs.iter().enumerate() {
            let topics = log.topics();
            sqlx::query(
                "INSERT INTO logs (block_number, tx_index, log_index, address, topic0, topic1, topic2, topic3, data)
                 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
            )
            .bind(bn)
            .bind(tx_idx)
            .bind(to_i64(log_idx as u64))
            .bind(log.address)
            .bind(topics.first())
            .bind(topics.get(1))
            .bind(topics.get(2))
            .bind(topics.get(3))
            .bind(&log.data.data)
            .execute(&mut **tx)
            .await?;
        }
    }

    // Signet events: the position in `data.signet_events` becomes
    // `event_index`.
    for (idx, event) in data.signet_events.iter().enumerate() {
        insert_signet_event(tx, bn, to_i64(idx as u64), event).await?;
    }

    // The zenith header is optional; nothing is written when it is absent.
    if let Some(zh) = &data.zenith_header {
        let h = &zh.0;
        sqlx::query(
            "INSERT INTO zenith_headers (
                block_number, host_block_number, rollup_chain_id,
                gas_limit, reward_address, block_data_hash
            ) VALUES ($1, $2, $3, $4, $5, $6)",
        )
        .bind(bn)
        .bind(h.hostBlockNumber)
        .bind(h.rollupChainId)
        .bind(h.gasLimit)
        .bind(h.rewardAddress)
        .bind(h.blockDataHash)
        .execute(&mut **tx)
        .await?;
    }

    Ok(())
}
739
740async fn insert_transaction(
742 conn: &mut sqlx::AnyConnection,
743 bn: i64,
744 tx_index: i64,
745 recovered: &RecoveredTx,
746) -> Result<(), SqlColdError> {
747 use alloy::consensus::EthereumTxEnvelope;
748
749 let sender = recovered.signer();
750 let tx: &TransactionSigned = recovered;
751 let tx_hash = tx.tx_hash();
752 let tx_type = tx.tx_type() as i32;
753
754 macro_rules! sig {
755 ($s:expr) => {{
756 let sig = $s.signature();
757 (sig.v() as i32, sig.r(), sig.s())
758 }};
759 }
760 let (sig_y, sig_r, sig_s) = match tx {
761 EthereumTxEnvelope::Legacy(s) => sig!(s),
762 EthereumTxEnvelope::Eip2930(s) => sig!(s),
763 EthereumTxEnvelope::Eip1559(s) => sig!(s),
764 EthereumTxEnvelope::Eip4844(s) => sig!(s),
765 EthereumTxEnvelope::Eip7702(s) => sig!(s),
766 };
767
768 let (chain_id, nonce, gas_limit) = match tx {
769 EthereumTxEnvelope::Legacy(s) => {
770 (s.tx().chain_id.map(to_i64), to_i64(s.tx().nonce), to_i64(s.tx().gas_limit))
771 }
772 EthereumTxEnvelope::Eip2930(s) => {
773 (Some(to_i64(s.tx().chain_id)), to_i64(s.tx().nonce), to_i64(s.tx().gas_limit))
774 }
775 EthereumTxEnvelope::Eip1559(s) => {
776 (Some(to_i64(s.tx().chain_id)), to_i64(s.tx().nonce), to_i64(s.tx().gas_limit))
777 }
778 EthereumTxEnvelope::Eip4844(s) => {
779 (Some(to_i64(s.tx().chain_id)), to_i64(s.tx().nonce), to_i64(s.tx().gas_limit))
780 }
781 EthereumTxEnvelope::Eip7702(s) => {
782 (Some(to_i64(s.tx().chain_id)), to_i64(s.tx().nonce), to_i64(s.tx().gas_limit))
783 }
784 };
785
786 let (value, to_addr) = match tx {
787 EthereumTxEnvelope::Legacy(s) => (s.tx().value, s.tx().to.to()),
788 EthereumTxEnvelope::Eip2930(s) => (s.tx().value, s.tx().to.to()),
789 EthereumTxEnvelope::Eip1559(s) => (s.tx().value, s.tx().to.to()),
790 EthereumTxEnvelope::Eip4844(s) => (s.tx().value, Some(&s.tx().to)),
791 EthereumTxEnvelope::Eip7702(s) => (s.tx().value, Some(&s.tx().to)),
792 };
793
794 let input: &[u8] = match tx {
795 EthereumTxEnvelope::Legacy(s) => s.tx().input.as_ref(),
796 EthereumTxEnvelope::Eip2930(s) => s.tx().input.as_ref(),
797 EthereumTxEnvelope::Eip1559(s) => s.tx().input.as_ref(),
798 EthereumTxEnvelope::Eip4844(s) => s.tx().input.as_ref(),
799 EthereumTxEnvelope::Eip7702(s) => s.tx().input.as_ref(),
800 };
801
802 let (gas_price, max_fee, max_priority_fee, max_blob_fee) = match tx {
803 EthereumTxEnvelope::Legacy(s) => (Some(encode_u128(s.tx().gas_price)), None, None, None),
804 EthereumTxEnvelope::Eip2930(s) => (Some(encode_u128(s.tx().gas_price)), None, None, None),
805 EthereumTxEnvelope::Eip1559(s) => (
806 None,
807 Some(encode_u128(s.tx().max_fee_per_gas)),
808 Some(encode_u128(s.tx().max_priority_fee_per_gas)),
809 None,
810 ),
811 EthereumTxEnvelope::Eip4844(s) => (
812 None,
813 Some(encode_u128(s.tx().max_fee_per_gas)),
814 Some(encode_u128(s.tx().max_priority_fee_per_gas)),
815 Some(encode_u128(s.tx().max_fee_per_blob_gas)),
816 ),
817 EthereumTxEnvelope::Eip7702(s) => (
818 None,
819 Some(encode_u128(s.tx().max_fee_per_gas)),
820 Some(encode_u128(s.tx().max_priority_fee_per_gas)),
821 None,
822 ),
823 };
824
825 let (access_list, blob_hashes, auth_list) = match tx {
826 EthereumTxEnvelope::Legacy(_) => (None, None, None),
827 EthereumTxEnvelope::Eip2930(s) => {
828 (Some(encode_access_list(&s.tx().access_list)), None, None)
829 }
830 EthereumTxEnvelope::Eip1559(s) => {
831 (Some(encode_access_list(&s.tx().access_list)), None, None)
832 }
833 EthereumTxEnvelope::Eip4844(s) => (
834 Some(encode_access_list(&s.tx().access_list)),
835 Some(encode_b256_vec(&s.tx().blob_versioned_hashes)),
836 None,
837 ),
838 EthereumTxEnvelope::Eip7702(s) => (
839 Some(encode_access_list(&s.tx().access_list)),
840 None,
841 Some(encode_authorization_list(&s.tx().authorization_list)),
842 ),
843 };
844
845 sqlx::query(
846 "INSERT INTO transactions (
847 block_number, tx_index, tx_hash, tx_type,
848 sig_y_parity, sig_r, sig_s,
849 chain_id, nonce, gas_limit, to_address, value, input,
850 gas_price, max_fee_per_gas, max_priority_fee_per_gas,
851 max_fee_per_blob_gas, blob_versioned_hashes,
852 access_list, authorization_list, from_address
853 ) VALUES (
854 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
855 $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21
856 )",
857 )
858 .bind(bn)
859 .bind(tx_index)
860 .bind(tx_hash)
861 .bind(tx_type)
862 .bind(sig_y)
863 .bind(sig_r)
864 .bind(sig_s)
865 .bind(chain_id)
866 .bind(nonce)
867 .bind(gas_limit)
868 .bind(to_addr)
869 .bind(value)
870 .bind(input)
871 .bind(gas_price.as_ref().map(|v| v.as_slice()))
872 .bind(max_fee.as_ref().map(|v| v.as_slice()))
873 .bind(max_priority_fee.as_ref().map(|v| v.as_slice()))
874 .bind(max_blob_fee.as_ref().map(|v| v.as_slice()))
875 .bind(blob_hashes.as_deref())
876 .bind(access_list.as_deref())
877 .bind(auth_list.as_deref())
878 .bind(sender)
879 .execute(&mut *conn)
880 .await?;
881
882 Ok(())
883}
884
885async fn insert_signet_event(
887 conn: &mut sqlx::AnyConnection,
888 block_number: i64,
889 event_index: i64,
890 event: &DbSignetEvent,
891) -> Result<(), SqlColdError> {
892 let (event_type, order, chain_id) = match event {
893 DbSignetEvent::Transact(o, t) => (0i32, to_i64(*o), &t.rollupChainId),
894 DbSignetEvent::Enter(o, e) => (1i32, to_i64(*o), &e.rollupChainId),
895 DbSignetEvent::EnterToken(o, e) => (2i32, to_i64(*o), &e.rollupChainId),
896 };
897
898 let (value, gas, max_fee, amount) = match event {
899 DbSignetEvent::Transact(_, t) => {
900 (Some(&t.value), Some(&t.gas), Some(&t.maxFeePerGas), None)
901 }
902 DbSignetEvent::Enter(_, e) => (None, None, None, Some(&e.amount)),
903 DbSignetEvent::EnterToken(_, e) => (None, None, None, Some(&e.amount)),
904 };
905
906 sqlx::query(
907 "INSERT INTO signet_events (
908 block_number, event_index, event_type, order_index,
909 rollup_chain_id, sender, to_address, value, gas,
910 max_fee_per_gas, data, rollup_recipient, amount, token
911 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)",
912 )
913 .bind(block_number)
914 .bind(event_index)
915 .bind(event_type)
916 .bind(order)
917 .bind(chain_id)
918 .bind(match event {
919 DbSignetEvent::Transact(_, t) => Some(&t.sender),
920 _ => None,
921 })
922 .bind(match event {
923 DbSignetEvent::Transact(_, t) => Some(&t.to),
924 _ => None,
925 })
926 .bind(value)
927 .bind(gas)
928 .bind(max_fee)
929 .bind(match event {
930 DbSignetEvent::Transact(_, t) => Some(&t.data),
931 _ => None,
932 })
933 .bind(match event {
934 DbSignetEvent::Enter(_, e) => Some(&e.rollupRecipient),
935 DbSignetEvent::EnterToken(_, e) => Some(&e.rollupRecipient),
936 _ => None,
937 })
938 .bind(amount)
939 .bind(match event {
940 DbSignetEvent::EnterToken(_, e) => Some(&e.token),
941 _ => None,
942 })
943 .execute(&mut *conn)
944 .await?;
945
946 Ok(())
947}
948
/// Appends an `AND` condition on `column` to `clause` and pushes the matching
/// bind values onto `params`.
///
/// - No values: nothing is appended and `idx` is returned unchanged. An empty
///   set is treated as "no constraint", which is how the callers in this file
///   treat empty filters. Emitting `IN ()` instead would be malformed SQL on
///   PostgreSQL.
/// - One value: emits ` AND {column} = ${idx}`.
/// - Several values: emits ` AND {column} IN ($i, $i+1, ...)`.
///
/// `idx` is the number of the first placeholder to use. The return value is
/// the next unused placeholder number.
fn append_filter_clause<'a>(
    clause: &mut String,
    params: &mut Vec<&'a [u8]>,
    mut idx: u32,
    column: &str,
    values: impl ExactSizeIterator<Item = &'a [u8]>,
) -> u32 {
    use std::fmt::Write;

    let len = values.len();
    if len == 0 {
        return idx;
    }
    params.reserve(len);

    if len == 1 {
        write!(clause, " AND {column} = ${idx}").expect("writing to a String cannot fail");
        params.extend(values);
        return idx + 1;
    }

    write!(clause, " AND {column} IN (").expect("writing to a String cannot fail");
    for (i, v) in values.enumerate() {
        if i > 0 {
            clause.push_str(", ");
        }
        write!(clause, "${idx}").expect("writing to a String cannot fail");
        params.push(v);
        idx += 1;
    }
    clause.push(')');
    idx
}
985
986fn build_log_filter_clause(filter: &Filter, start_idx: u32) -> (String, Vec<&[u8]>) {
987 let mut clause = String::new();
988 let mut params: Vec<&[u8]> = Vec::new();
989 let mut idx = start_idx;
990
991 if !filter.address.is_empty() {
992 idx = append_filter_clause(
993 &mut clause,
994 &mut params,
995 idx,
996 "l.address",
997 filter.address.iter().map(|a| a.as_slice()),
998 );
999 }
1000
1001 let topic_cols = ["l.topic0", "l.topic1", "l.topic2", "l.topic3"];
1002 for (i, topic_filter) in filter.topics.iter().enumerate() {
1003 if topic_filter.is_empty() {
1004 continue;
1005 }
1006 idx = append_filter_clause(
1007 &mut clause,
1008 &mut params,
1009 idx,
1010 topic_cols[i],
1011 topic_filter.iter().map(|v| v.as_slice()),
1012 );
1013 }
1014
1015 (clause, params)
1016}
1017
1018impl ColdStorageRead for SqlColdBackend {
1023 async fn get_header(&self, spec: HeaderSpecifier) -> ColdResult<Option<SealedHeader>> {
1024 let Some(block_num) = self.resolve_header_spec(spec).await? else {
1025 return Ok(None);
1026 };
1027 self.fetch_header_by_number(block_num).await.map_err(ColdStorageError::from)
1028 }
1029
1030 async fn get_headers(
1031 &self,
1032 specs: Vec<HeaderSpecifier>,
1033 ) -> ColdResult<Vec<Option<SealedHeader>>> {
1034 let mut results = Vec::with_capacity(specs.len());
1035 for spec in specs {
1036 let header = self.get_header(spec).await?;
1037 results.push(header);
1038 }
1039 Ok(results)
1040 }
1041
1042 async fn get_transaction(
1043 &self,
1044 spec: TransactionSpecifier,
1045 ) -> ColdResult<Option<Confirmed<RecoveredTx>>> {
1046 let row = match spec {
1047 TransactionSpecifier::Hash(hash) => sqlx::query(
1048 "SELECT t.*, h.block_hash
1049 FROM transactions t
1050 JOIN headers h ON t.block_number = h.block_number
1051 WHERE t.tx_hash = $1",
1052 )
1053 .bind(hash)
1054 .fetch_optional(&self.pool)
1055 .await
1056 .map_err(SqlColdError::from)?,
1057 TransactionSpecifier::BlockAndIndex { block, index } => sqlx::query(
1058 "SELECT t.*, h.block_hash
1059 FROM transactions t
1060 JOIN headers h ON t.block_number = h.block_number
1061 WHERE t.block_number = $1 AND t.tx_index = $2",
1062 )
1063 .bind(to_i64(block))
1064 .bind(to_i64(index))
1065 .fetch_optional(&self.pool)
1066 .await
1067 .map_err(SqlColdError::from)?,
1068 TransactionSpecifier::BlockHashAndIndex { block_hash, index } => sqlx::query(
1069 "SELECT t.*, h.block_hash
1070 FROM transactions t
1071 JOIN headers h ON t.block_number = h.block_number
1072 WHERE h.block_hash = $1 AND t.tx_index = $2",
1073 )
1074 .bind(block_hash)
1075 .bind(to_i64(index))
1076 .fetch_optional(&self.pool)
1077 .await
1078 .map_err(SqlColdError::from)?,
1079 };
1080
1081 let Some(r) = row else {
1082 return Ok(None);
1083 };
1084
1085 let block = from_i64(r.get::<i64, _>(COL_BLOCK_NUMBER));
1086 let index = from_i64(r.get::<i64, _>(COL_TX_INDEX));
1087 let block_hash = r.get(COL_BLOCK_HASH);
1088 let recovered = recovered_tx_from_row(&r).map_err(ColdStorageError::from)?;
1089 let meta = ConfirmationMeta::new(block, block_hash, index);
1090 Ok(Some(Confirmed::new(recovered, meta)))
1091 }
1092
1093 async fn get_transactions_in_block(&self, block: BlockNumber) -> ColdResult<Vec<RecoveredTx>> {
1094 let bn = to_i64(block);
1095 let rows =
1096 sqlx::query("SELECT * FROM transactions WHERE block_number = $1 ORDER BY tx_index")
1097 .bind(bn)
1098 .fetch_all(&self.pool)
1099 .await
1100 .map_err(SqlColdError::from)?;
1101
1102 rows.iter().map(|r| recovered_tx_from_row(r).map_err(ColdStorageError::from)).collect()
1103 }
1104
1105 async fn get_transaction_count(&self, block: BlockNumber) -> ColdResult<u64> {
1106 let bn = to_i64(block);
1107 let row = sqlx::query("SELECT COUNT(*) as cnt FROM transactions WHERE block_number = $1")
1108 .bind(bn)
1109 .fetch_one(&self.pool)
1110 .await
1111 .map_err(SqlColdError::from)?;
1112
1113 Ok(from_i64(row.get::<i64, _>(COL_CNT)))
1114 }
1115
1116 async fn get_receipt(&self, spec: ReceiptSpecifier) -> ColdResult<Option<ColdReceipt>> {
1117 let (block, index) = match spec {
1119 ReceiptSpecifier::TxHash(hash) => {
1120 let row = sqlx::query(
1121 "SELECT block_number, tx_index FROM transactions WHERE tx_hash = $1",
1122 )
1123 .bind(hash)
1124 .fetch_optional(&self.pool)
1125 .await
1126 .map_err(SqlColdError::from)?;
1127 let Some(r) = row else { return Ok(None) };
1128 (
1129 from_i64(r.get::<i64, _>(COL_BLOCK_NUMBER)),
1130 from_i64(r.get::<i64, _>(COL_TX_INDEX)),
1131 )
1132 }
1133 ReceiptSpecifier::BlockAndIndex { block, index } => (block, index),
1134 };
1135
1136 let combined = sqlx::query(
1140 "SELECT h.*, \
1141 r.tx_type AS r_tx_type, r.success AS r_success, \
1142 r.cumulative_gas_used AS r_cumulative_gas_used, \
1143 r.first_log_index AS r_first_log_index, \
1144 t.tx_hash AS r_tx_hash, t.from_address AS r_from_address, \
1145 COALESCE( \
1146 (SELECT CAST(MAX(r2.cumulative_gas_used) AS bigint) \
1147 FROM receipts r2 \
1148 WHERE r2.block_number = r.block_number \
1149 AND r2.tx_index < r.tx_index), \
1150 0 \
1151 ) AS prior_gas \
1152 FROM receipts r \
1153 JOIN transactions t ON r.block_number = t.block_number \
1154 AND r.tx_index = t.tx_index \
1155 JOIN headers h ON r.block_number = h.block_number \
1156 WHERE r.block_number = $1 AND r.tx_index = $2",
1157 )
1158 .bind(to_i64(block))
1159 .bind(to_i64(index))
1160 .fetch_optional(&self.pool)
1161 .await
1162 .map_err(SqlColdError::from)?;
1163
1164 let Some(rr) = combined else {
1165 return Ok(None);
1166 };
1167
1168 let header = header_from_row(&rr).map_err(ColdStorageError::from)?.seal_slow();
1170
1171 let tx_hash = rr.get(COL_R_TX_HASH);
1173 let sender = rr.get(COL_R_FROM_ADDRESS);
1174 let tx_type: i32 = rr.get(COL_R_TX_TYPE);
1175 let success = rr.get::<i32, _>(COL_R_SUCCESS) != 0;
1176 let cumulative_gas_used: i64 = rr.get(COL_R_CUMULATIVE_GAS_USED);
1177 let first_log_index: u64 = from_i64(rr.get::<i64, _>(COL_R_FIRST_LOG_INDEX));
1178 let prior_cumulative_gas: u64 = from_i64(rr.get::<i64, _>(COL_PRIOR_GAS));
1179
1180 let log_rows = sqlx::query(
1182 "SELECT * FROM logs WHERE block_number = $1 AND tx_index = $2 ORDER BY log_index",
1183 )
1184 .bind(to_i64(block))
1185 .bind(to_i64(index))
1186 .fetch_all(&self.pool)
1187 .await
1188 .map_err(SqlColdError::from)?;
1189
1190 let logs = log_rows.iter().map(log_from_row).collect();
1191 let built = build_receipt(tx_type, success, cumulative_gas_used, logs)
1192 .map_err(ColdStorageError::from)?;
1193 debug_assert!(
1196 built.inner.cumulative_gas_used >= prior_cumulative_gas,
1197 "cumulative gas decreased: {} < {}",
1198 built.inner.cumulative_gas_used,
1199 prior_cumulative_gas,
1200 );
1201 let gas_used = built.inner.cumulative_gas_used - prior_cumulative_gas;
1202
1203 let ir = IndexedReceipt { receipt: built, tx_hash, first_log_index, gas_used, sender };
1204 Ok(Some(ColdReceipt::new(ir, &header, index)))
1205 }
1206
1207 async fn get_receipts_in_block(&self, block: BlockNumber) -> ColdResult<Vec<ColdReceipt>> {
1208 let Some(header) =
1209 self.fetch_header_by_number(block).await.map_err(ColdStorageError::from)?
1210 else {
1211 return Ok(Vec::new());
1212 };
1213
1214 let bn = to_i64(block);
1215
1216 let receipt_rows = sqlx::query(
1218 "SELECT r.block_number, r.tx_index, r.tx_type, r.success, \
1219 r.cumulative_gas_used, r.first_log_index, \
1220 t.tx_hash, t.from_address \
1221 FROM receipts r \
1222 JOIN transactions t ON r.block_number = t.block_number \
1223 AND r.tx_index = t.tx_index \
1224 WHERE r.block_number = $1 \
1225 ORDER BY r.tx_index",
1226 )
1227 .bind(bn)
1228 .fetch_all(&self.pool)
1229 .await
1230 .map_err(SqlColdError::from)?;
1231
1232 let all_log_rows =
1233 sqlx::query("SELECT * FROM logs WHERE block_number = $1 ORDER BY tx_index, log_index")
1234 .bind(bn)
1235 .fetch_all(&self.pool)
1236 .await
1237 .map_err(SqlColdError::from)?;
1238
1239 let mut logs_by_tx: BTreeMap<i64, Vec<Log>> = BTreeMap::new();
1241 for r in &all_log_rows {
1242 let tx_idx: i64 = r.get(COL_TX_INDEX);
1243 logs_by_tx.entry(tx_idx).or_default().push(log_from_row(r));
1244 }
1245
1246 let mut first_log_index = 0u64;
1247 let mut prior_cumulative_gas = 0u64;
1248 receipt_rows
1249 .into_iter()
1250 .enumerate()
1251 .map(|(idx, rr)| {
1252 let tx_idx: i64 = rr.get(COL_TX_INDEX);
1253 let tx_hash = rr.get(COL_TX_HASH);
1254 let sender = rr.get(COL_FROM_ADDRESS);
1255 let tx_type = rr.get::<i32, _>(COL_TX_TYPE);
1256 let success = rr.get::<i32, _>(COL_SUCCESS) != 0;
1257 let cumulative_gas_used: i64 = rr.get(COL_CUMULATIVE_GAS_USED);
1258 let logs = logs_by_tx.remove(&tx_idx).unwrap_or_default();
1259 let receipt = build_receipt(tx_type, success, cumulative_gas_used, logs)
1260 .map_err(ColdStorageError::from)?;
1261 let gas_used = receipt.inner.cumulative_gas_used - prior_cumulative_gas;
1262 prior_cumulative_gas = receipt.inner.cumulative_gas_used;
1263 let ir = IndexedReceipt { receipt, tx_hash, first_log_index, gas_used, sender };
1264 first_log_index += ir.receipt.inner.logs.len() as u64;
1265 Ok(ColdReceipt::new(ir, &header, idx as u64))
1266 })
1267 .collect()
1268 }
1269
1270 async fn get_signet_events(
1271 &self,
1272 spec: SignetEventsSpecifier,
1273 ) -> ColdResult<Vec<DbSignetEvent>> {
1274 let rows = match spec {
1275 SignetEventsSpecifier::Block(block) => {
1276 let bn = to_i64(block);
1277 sqlx::query(
1278 "SELECT * FROM signet_events WHERE block_number = $1 ORDER BY event_index",
1279 )
1280 .bind(bn)
1281 .fetch_all(&self.pool)
1282 .await
1283 .map_err(SqlColdError::from)?
1284 }
1285 SignetEventsSpecifier::BlockRange { start, end } => {
1286 let s = to_i64(start);
1287 let e = to_i64(end);
1288 sqlx::query(
1289 "SELECT * FROM signet_events WHERE block_number >= $1 AND block_number <= $2
1290 ORDER BY block_number, event_index",
1291 )
1292 .bind(s)
1293 .bind(e)
1294 .fetch_all(&self.pool)
1295 .await
1296 .map_err(SqlColdError::from)?
1297 }
1298 };
1299
1300 rows.iter().map(|r| signet_event_from_row(r).map_err(ColdStorageError::from)).collect()
1301 }
1302
1303 async fn get_zenith_header(
1304 &self,
1305 spec: ZenithHeaderSpecifier,
1306 ) -> ColdResult<Option<DbZenithHeader>> {
1307 let block = match spec {
1308 ZenithHeaderSpecifier::Number(n) => n,
1309 ZenithHeaderSpecifier::Range { start, .. } => start,
1310 };
1311 let bn = to_i64(block);
1312 let row = sqlx::query("SELECT * FROM zenith_headers WHERE block_number = $1")
1313 .bind(bn)
1314 .fetch_optional(&self.pool)
1315 .await
1316 .map_err(SqlColdError::from)?;
1317
1318 row.map(|r| zenith_header_from_row(&r)).transpose().map_err(ColdStorageError::from)
1319 }
1320
1321 async fn get_zenith_headers(
1322 &self,
1323 spec: ZenithHeaderSpecifier,
1324 ) -> ColdResult<Vec<DbZenithHeader>> {
1325 let rows = match spec {
1326 ZenithHeaderSpecifier::Number(n) => {
1327 let bn = to_i64(n);
1328 sqlx::query("SELECT * FROM zenith_headers WHERE block_number = $1")
1329 .bind(bn)
1330 .fetch_all(&self.pool)
1331 .await
1332 .map_err(SqlColdError::from)?
1333 }
1334 ZenithHeaderSpecifier::Range { start, end } => {
1335 let s = to_i64(start);
1336 let e = to_i64(end);
1337 sqlx::query(
1338 "SELECT * FROM zenith_headers WHERE block_number >= $1 AND block_number <= $2
1339 ORDER BY block_number",
1340 )
1341 .bind(s)
1342 .bind(e)
1343 .fetch_all(&self.pool)
1344 .await
1345 .map_err(SqlColdError::from)?
1346 }
1347 };
1348
1349 rows.iter().map(|r| zenith_header_from_row(r).map_err(ColdStorageError::from)).collect()
1350 }
1351
1352 async fn get_logs(&self, filter: &Filter, max_logs: usize) -> ColdResult<Vec<RpcLog>> {
1353 let from = filter.get_from_block().unwrap_or(0);
1354 let to = filter.get_to_block().unwrap_or(u64::MAX);
1355
1356 let (filter_clause, params) = build_log_filter_clause(filter, 3);
1360 let where_clause = format!("l.block_number >= $1 AND l.block_number <= $2{filter_clause}");
1361
1362 let limit_idx = 3 + params.len() as u32;
1367 let data_sql = format!(
1368 "SELECT l.*, h.block_hash, h.timestamp AS block_timestamp, t.tx_hash, \
1369 (r.first_log_index + l.log_index) AS block_log_index \
1370 FROM logs l \
1371 JOIN headers h ON l.block_number = h.block_number \
1372 JOIN transactions t ON l.block_number = t.block_number \
1373 AND l.tx_index = t.tx_index \
1374 JOIN receipts r ON l.block_number = r.block_number \
1375 AND l.tx_index = r.tx_index \
1376 WHERE {where_clause} \
1377 ORDER BY l.block_number, l.tx_index, l.log_index \
1378 LIMIT ${limit_idx}"
1379 );
1380 let mut query = sqlx::query(&data_sql).bind(to_i64(from)).bind(to_i64(to));
1381 for param in ¶ms {
1382 query = query.bind(*param);
1383 }
1384 let limit = max_logs.saturating_add(1).min(i64::MAX as usize);
1387 query = query.bind(to_i64(limit as u64));
1388
1389 let rows = query.fetch_all(&self.pool).await.map_err(SqlColdError::from)?;
1390
1391 if rows.len() > max_logs {
1392 return Err(ColdStorageError::TooManyLogs { limit: max_logs });
1393 }
1394
1395 rows.into_iter()
1396 .map(|r| {
1397 let log = log_from_row(&r);
1398 Ok(RpcLog {
1399 inner: log,
1400 block_hash: Some(r.get(COL_BLOCK_HASH)),
1401 block_number: Some(from_i64(r.get::<i64, _>(COL_BLOCK_NUMBER))),
1402 block_timestamp: Some(from_i64(r.get::<i64, _>(COL_BLOCK_TIMESTAMP))),
1403 transaction_hash: Some(r.get(COL_TX_HASH)),
1404 transaction_index: Some(from_i64(r.get::<i64, _>(COL_TX_INDEX))),
1405 log_index: Some(from_i64(r.get::<i64, _>(COL_BLOCK_LOG_INDEX))),
1406 removed: false,
1407 })
1408 })
1409 .collect::<ColdResult<Vec<_>>>()
1410 }
1411
1412 async fn produce_log_stream(&self, filter: &Filter, params: signet_cold::StreamParams) {
1413 #[cfg(feature = "postgres")]
1414 if self.is_postgres {
1415 return self.produce_log_stream_pg(filter, params).await;
1416 }
1417 signet_cold::produce_log_stream_default(self, filter, params).await;
1418 }
1419
1420 async fn get_latest_block(&self) -> ColdResult<Option<BlockNumber>> {
1421 let row = sqlx::query("SELECT MAX(block_number) as max_bn FROM headers")
1422 .fetch_one(&self.pool)
1423 .await
1424 .map_err(SqlColdError::from)?;
1425 Ok(row.get::<Option<i64>, _>(COL_MAX_BN).map(from_i64))
1426 }
1427}
1428
1429impl ColdStorageWrite for SqlColdBackend {
1430 async fn append_block(&mut self, data: BlockData) -> ColdResult<()> {
1431 self.insert_block(data).await.map_err(ColdStorageError::from)
1432 }
1433
1434 async fn append_blocks(&mut self, data: Vec<BlockData>) -> ColdResult<()> {
1435 let mut tx = self.pool.begin().await.map_err(SqlColdError::from)?;
1436 for block_data in data {
1437 write_block_to_tx(&mut tx, block_data).await.map_err(ColdStorageError::from)?;
1438 }
1439 tx.commit().await.map_err(SqlColdError::from)?;
1440 Ok(())
1441 }
1442
1443 async fn truncate_above(&mut self, block: BlockNumber) -> ColdResult<()> {
1444 let bn = to_i64(block);
1445 let mut tx = self.pool.begin().await.map_err(SqlColdError::from)?;
1446
1447 delete_above_in_tx(&mut tx, bn).await.map_err(ColdStorageError::from)?;
1448
1449 tx.commit().await.map_err(SqlColdError::from)?;
1450 Ok(())
1451 }
1452}
1453
1454impl ColdStorage for SqlColdBackend {
1455 async fn drain_above(&mut self, block: BlockNumber) -> ColdResult<Vec<Vec<ColdReceipt>>> {
1456 let bn = to_i64(block);
1457 let mut tx = self.pool.begin().await.map_err(SqlColdError::from)?;
1458
1459 let header_rows =
1461 sqlx::query("SELECT * FROM headers WHERE block_number > $1 ORDER BY block_number")
1462 .bind(bn)
1463 .fetch_all(&mut *tx)
1464 .await
1465 .map_err(SqlColdError::from)?;
1466
1467 if header_rows.is_empty() {
1468 tx.commit().await.map_err(SqlColdError::from)?;
1469 return Ok(Vec::new());
1470 }
1471
1472 let mut headers: BTreeMap<i64, SealedHeader> = BTreeMap::new();
1473 for r in &header_rows {
1474 let num: i64 = r.get(COL_BLOCK_NUMBER);
1475 let h = header_from_row(r).map_err(ColdStorageError::from)?.seal_slow();
1476 headers.insert(num, h);
1477 }
1478
1479 let receipt_rows = sqlx::query(
1481 "SELECT r.block_number, r.tx_index, r.tx_type, r.success, \
1482 r.cumulative_gas_used, r.first_log_index, \
1483 t.tx_hash, t.from_address \
1484 FROM receipts r \
1485 JOIN transactions t ON r.block_number = t.block_number \
1486 AND r.tx_index = t.tx_index \
1487 WHERE r.block_number > $1 \
1488 ORDER BY r.block_number, r.tx_index",
1489 )
1490 .bind(bn)
1491 .fetch_all(&mut *tx)
1492 .await
1493 .map_err(SqlColdError::from)?;
1494
1495 let log_rows = sqlx::query(
1497 "SELECT * FROM logs WHERE block_number > $1 \
1498 ORDER BY block_number, tx_index, log_index",
1499 )
1500 .bind(bn)
1501 .fetch_all(&mut *tx)
1502 .await
1503 .map_err(SqlColdError::from)?;
1504
1505 let mut logs_by_block_tx: BTreeMap<(i64, i64), Vec<Log>> = BTreeMap::new();
1507 for r in &log_rows {
1508 let block_num: i64 = r.get(COL_BLOCK_NUMBER);
1509 let tx_idx: i64 = r.get(COL_TX_INDEX);
1510 logs_by_block_tx.entry((block_num, tx_idx)).or_default().push(log_from_row(r));
1511 }
1512
1513 let mut receipts_by_block: BTreeMap<i64, Vec<&sqlx::any::AnyRow>> = BTreeMap::new();
1515 for r in &receipt_rows {
1516 let block_num: i64 = r.get(COL_BLOCK_NUMBER);
1517 receipts_by_block.entry(block_num).or_default().push(r);
1518 }
1519
1520 let mut all_receipts = Vec::with_capacity(headers.len());
1522 for (&block_num, header) in &headers {
1523 let block_receipt_rows = receipts_by_block.remove(&block_num).unwrap_or_default();
1524 let mut prior_cumulative_gas = 0u64;
1525 let block_receipts: ColdResult<Vec<ColdReceipt>> = block_receipt_rows
1526 .into_iter()
1527 .map(|rr: &sqlx::any::AnyRow| {
1528 let tx_idx: i64 = rr.get(COL_TX_INDEX);
1529 let tx_hash = rr.get(COL_TX_HASH);
1530 let sender = rr.get(COL_FROM_ADDRESS);
1531 let tx_type: i32 = rr.get(COL_TX_TYPE);
1532 let success = rr.get::<i32, _>(COL_SUCCESS) != 0;
1533 let cumulative_gas_used: i64 = rr.get(COL_CUMULATIVE_GAS_USED);
1534 let first_log_index = from_i64(rr.get::<i64, _>(COL_FIRST_LOG_INDEX));
1535 let logs = logs_by_block_tx.remove(&(block_num, tx_idx)).unwrap_or_default();
1536 let receipt = build_receipt(tx_type, success, cumulative_gas_used, logs)
1537 .map_err(ColdStorageError::from)?;
1538 debug_assert!(
1541 receipt.inner.cumulative_gas_used >= prior_cumulative_gas,
1542 "cumulative gas decreased: {} < {}",
1543 receipt.inner.cumulative_gas_used,
1544 prior_cumulative_gas,
1545 );
1546 let gas_used = receipt.inner.cumulative_gas_used - prior_cumulative_gas;
1547 prior_cumulative_gas = receipt.inner.cumulative_gas_used;
1548 let ir = IndexedReceipt { receipt, tx_hash, first_log_index, gas_used, sender };
1549 Ok(ColdReceipt::new(ir, header, from_i64(tx_idx)))
1550 })
1551 .collect();
1552 all_receipts.push(block_receipts?);
1553 }
1554
1555 delete_above_in_tx(&mut tx, bn).await.map_err(ColdStorageError::from)?;
1557
1558 tx.commit().await.map_err(SqlColdError::from)?;
1559 Ok(all_receipts)
1560 }
1561}
1562
#[cfg(all(test, feature = "test-utils"))]
mod tests {
    use super::*;
    use signet_cold::conformance::conformance;

    /// Connects a backend to `url` and runs the shared conformance suite
    /// against it, panicking on any failure.
    async fn run_conformance(url: &str) {
        let backend = SqlColdBackend::connect(url).await.unwrap();
        conformance(backend).await.unwrap();
    }

    /// Runs the conformance suite against an in-memory SQLite database.
    #[tokio::test]
    async fn sqlite_conformance() {
        run_conformance("sqlite::memory:").await;
    }

    /// Runs the conformance suite against the database named by
    /// `DATABASE_URL`; the test is skipped when the variable is not set.
    #[tokio::test]
    async fn pg_conformance() {
        match std::env::var("DATABASE_URL") {
            Ok(url) => run_conformance(&url).await,
            Err(_) => eprintln!("skipping pg conformance: DATABASE_URL not set"),
        }
    }
}