1use crate::SqlColdError;
8use crate::columns::{
9 COL_ACCESS_LIST, COL_ADDRESS, COL_AMOUNT, COL_AUTHORIZATION_LIST, COL_BASE_FEE_PER_GAS,
10 COL_BENEFICIARY, COL_BLOB_GAS_USED, COL_BLOB_VERSIONED_HASHES, COL_BLOCK_DATA_HASH,
11 COL_BLOCK_HASH, COL_BLOCK_LOG_INDEX, COL_BLOCK_NUMBER, COL_BLOCK_TIMESTAMP, COL_CHAIN_ID,
12 COL_CNT, COL_CUMULATIVE_GAS_USED, COL_DATA, COL_DIFFICULTY, COL_EVENT_TYPE,
13 COL_EXCESS_BLOB_GAS, COL_EXTRA_DATA, COL_FIRST_LOG_INDEX, COL_FROM_ADDRESS, COL_GAS,
14 COL_GAS_LIMIT, COL_GAS_PRICE, COL_GAS_USED, COL_HOST_BLOCK_NUMBER, COL_INPUT, COL_LOGS_BLOOM,
15 COL_MAX_BN, COL_MAX_FEE_PER_BLOB_GAS, COL_MAX_FEE_PER_GAS, COL_MAX_PRIORITY_FEE_PER_GAS,
16 COL_MIX_HASH, COL_NONCE, COL_OMMERS_HASH, COL_ORDER_INDEX, COL_PARENT_BEACON_BLOCK_ROOT,
17 COL_PARENT_HASH, COL_PRIOR_GAS, COL_R_CUMULATIVE_GAS_USED, COL_R_FIRST_LOG_INDEX,
18 COL_R_FROM_ADDRESS, COL_R_SUCCESS, COL_R_TX_HASH, COL_R_TX_TYPE, COL_RECEIPTS_ROOT,
19 COL_REQUESTS_HASH, COL_REWARD_ADDRESS, COL_ROLLUP_CHAIN_ID, COL_ROLLUP_RECIPIENT, COL_SENDER,
20 COL_SIG_R, COL_SIG_S, COL_SIG_Y_PARITY, COL_STATE_ROOT, COL_SUCCESS, COL_TIMESTAMP,
21 COL_TO_ADDRESS, COL_TOKEN, COL_TOPIC0, COL_TOPIC1, COL_TOPIC2, COL_TOPIC3,
22 COL_TRANSACTIONS_ROOT, COL_TX_HASH, COL_TX_INDEX, COL_TX_TYPE, COL_VALUE, COL_WITHDRAWALS_ROOT,
23};
24use crate::convert::{
25 EVENT_ENTER, EVENT_ENTER_TOKEN, EVENT_TRANSACT, build_receipt, decode_access_list_or_empty,
26 decode_authorization_list, decode_b256_vec, decode_u128_required, encode_access_list,
27 encode_authorization_list, encode_b256_vec, encode_u128, from_i64, to_i64,
28};
29use alloy::{
30 consensus::{
31 Header, Signed, TxEip1559, TxEip2930, TxEip4844, TxEip7702, TxLegacy, TxType,
32 transaction::Recovered,
33 },
34 primitives::{
35 Address, B256, BlockNumber, Bloom, Bytes, Log, LogData, Sealable, Signature, TxKind, U256,
36 },
37};
38use signet_cold::{
39 BlockData, ColdReceipt, ColdResult, ColdStorage, ColdStorageError, ColdStorageRead,
40 ColdStorageWrite, Confirmed, Filter, HeaderSpecifier, ReceiptSpecifier, RpcLog,
41 SignetEventsSpecifier, TransactionSpecifier, ZenithHeaderSpecifier,
42};
43use signet_storage_types::{
44 ConfirmationMeta, DbSignetEvent, DbZenithHeader, IndexedReceipt, RecoveredTx, SealedHeader,
45 TransactionSigned,
46};
47use signet_zenith::{
48 Passage::{Enter, EnterToken},
49 Transactor::Transact,
50 Zenith,
51};
52use sqlx::{AnyPool, Row};
53use std::collections::BTreeMap;
54
55#[derive(Debug, Clone)]
75pub struct SqlColdBackend {
76 pool: AnyPool,
77 is_postgres: bool,
78}
79
80fn is_in_memory_sqlite(url: &str) -> bool {
86 url == "sqlite::memory:" || url.contains("mode=memory")
87}
88
89async fn delete_above_in_tx(
92 tx: &mut sqlx::Transaction<'_, sqlx::Any>,
93 bn: i64,
94) -> Result<(), SqlColdError> {
95 for table in ["logs", "transactions", "receipts", "signet_events", "zenith_headers", "headers"]
96 {
97 sqlx::query(&format!("DELETE FROM {table} WHERE block_number > $1"))
98 .bind(bn)
99 .execute(&mut **tx)
100 .await?;
101 }
102 Ok(())
103}
104
105impl SqlColdBackend {
106 pub async fn new(pool: AnyPool) -> Result<Self, SqlColdError> {
113 let conn = pool.acquire().await?;
115 let backend = conn.backend_name();
116
117 let (migration, is_postgres) = match backend {
118 "PostgreSQL" => (include_str!("../migrations/001_initial_pg.sql"), true),
119 "SQLite" => (include_str!("../migrations/001_initial.sql"), false),
120 other => {
121 return Err(SqlColdError::Convert(format!(
122 "unsupported database backend: {other}"
123 )));
124 }
125 };
126 drop(conn);
127 sqlx::raw_sql(migration).execute(&pool).await?;
130 Ok(Self { pool, is_postgres })
131 }
132
133 pub async fn connect_with(
142 url: &str,
143 pool_opts: sqlx::pool::PoolOptions<sqlx::Any>,
144 ) -> Result<Self, SqlColdError> {
145 sqlx::any::install_default_drivers();
146 let pool_opts =
147 if is_in_memory_sqlite(url) { pool_opts.max_connections(1) } else { pool_opts };
148 let pool: AnyPool = pool_opts.connect(url).await?;
149 Self::new(pool).await
150 }
151
152 pub async fn connect(url: &str) -> Result<Self, SqlColdError> {
156 Self::connect_with(url, sqlx::pool::PoolOptions::new()).await
157 }
158
159 async fn resolve_header_spec(
164 &self,
165 spec: HeaderSpecifier,
166 ) -> Result<Option<BlockNumber>, SqlColdError> {
167 match spec {
168 HeaderSpecifier::Number(n) => Ok(Some(n)),
169 HeaderSpecifier::Hash(hash) => {
170 let row = sqlx::query("SELECT block_number FROM headers WHERE block_hash = $1")
171 .bind(hash)
172 .fetch_optional(&self.pool)
173 .await?;
174 Ok(row.map(|r| from_i64(r.get::<i64, _>(COL_BLOCK_NUMBER))))
175 }
176 }
177 }
178
179 async fn fetch_header_by_number(
184 &self,
185 block_num: BlockNumber,
186 ) -> Result<Option<SealedHeader>, SqlColdError> {
187 let bn = to_i64(block_num);
188 let row = sqlx::query("SELECT * FROM headers WHERE block_number = $1")
189 .bind(bn)
190 .fetch_optional(&self.pool)
191 .await?;
192
193 row.map(|r| header_from_row(&r).map(|h| h.seal_slow())).transpose()
194 }
195
196 async fn insert_block(&self, data: BlockData) -> Result<(), SqlColdError> {
201 let mut tx = self.pool.begin().await?;
202 write_block_to_tx(&mut tx, data).await?;
203 tx.commit().await?;
204 Ok(())
205 }
206
207 #[cfg(feature = "postgres")]
217 async fn produce_log_stream_pg(&self, filter: &Filter, params: signet_cold::StreamParams) {
218 use tokio_stream::StreamExt;
219
220 macro_rules! try_stream {
222 ($sender:expr, $expr:expr) => {
223 match $expr {
224 Ok(v) => v,
225 Err(e) => {
226 let _ = $sender
227 .send(Err(ColdStorageError::backend(SqlColdError::from(e))))
228 .await;
229 return;
230 }
231 }
232 };
233 }
234
235 let signet_cold::StreamParams { from, to, max_logs, sender, deadline } = params;
236
237 let mut tx = try_stream!(sender, self.pool.begin().await);
241 try_stream!(
242 sender,
243 sqlx::query("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ").execute(&mut *tx).await
244 );
245
246 let (filter_clause, filter_params) = build_log_filter_clause(filter, 2);
254 let data_sql = format!(
255 "SELECT l.*, h.block_hash, h.timestamp AS block_timestamp, t.tx_hash, \
256 (r.first_log_index + l.log_index) AS block_log_index \
257 FROM logs l \
258 JOIN headers h ON l.block_number = h.block_number \
259 JOIN transactions t ON l.block_number = t.block_number \
260 AND l.tx_index = t.tx_index \
261 JOIN receipts r ON l.block_number = r.block_number \
262 AND l.tx_index = r.tx_index \
263 WHERE l.block_number = $1{filter_clause} \
264 ORDER BY l.tx_index, l.log_index"
265 );
266
267 let mut total = 0usize;
268
269 for block_num in from..=to {
272 if tokio::time::Instant::now() > deadline {
275 let _ = sender.send(Err(ColdStorageError::StreamDeadlineExceeded)).await;
276 return;
277 }
278
279 let mut query = sqlx::query(&data_sql).bind(to_i64(block_num));
280 for param in &filter_params {
281 query = query.bind(*param);
282 }
283
284 let mut stream = query.fetch(&mut *tx);
288 while let Some(row_result) = stream.next().await {
289 let r = try_stream!(sender, row_result);
290
291 total += 1;
293 if total > max_logs {
294 let _ =
295 sender.send(Err(ColdStorageError::TooManyLogs { limit: max_logs })).await;
296 return;
297 }
298
299 let log = log_from_row(&r);
300 let rpc_log = RpcLog {
301 inner: log,
302 block_hash: Some(r.get(COL_BLOCK_HASH)),
303 block_number: Some(from_i64(r.get::<i64, _>(COL_BLOCK_NUMBER))),
304 block_timestamp: Some(from_i64(r.get::<i64, _>(COL_BLOCK_TIMESTAMP))),
305 transaction_hash: Some(r.get(COL_TX_HASH)),
306 transaction_index: Some(from_i64(r.get::<i64, _>(COL_TX_INDEX))),
307 log_index: Some(from_i64(r.get::<i64, _>(COL_BLOCK_LOG_INDEX))),
308 removed: false,
309 };
310 match tokio::time::timeout_at(deadline, sender.send(Ok(rpc_log))).await {
313 Ok(Ok(())) => {}
314 Ok(Err(_)) => return, Err(_) => {
316 let _ = sender.send(Err(ColdStorageError::StreamDeadlineExceeded)).await;
317 return;
318 }
319 }
320 }
321
322 if total >= max_logs {
325 return;
326 }
327 }
328 }
329}
330
331fn blob<'r>(r: &'r sqlx::any::AnyRow, col: &str) -> &'r [u8] {
337 r.get(col)
338}
339
340fn opt_blob<'r>(r: &'r sqlx::any::AnyRow, col: &str) -> Option<&'r [u8]> {
342 r.get(col)
343}
344
345fn header_from_row(r: &sqlx::any::AnyRow) -> Result<Header, SqlColdError> {
347 Ok(Header {
348 parent_hash: r.get(COL_PARENT_HASH),
349 ommers_hash: r.get(COL_OMMERS_HASH),
350 beneficiary: r.get(COL_BENEFICIARY),
351 state_root: r.get(COL_STATE_ROOT),
352 transactions_root: r.get(COL_TRANSACTIONS_ROOT),
353 receipts_root: r.get(COL_RECEIPTS_ROOT),
354 logs_bloom: Bloom::from_slice(blob(r, COL_LOGS_BLOOM)),
355 difficulty: r.get(COL_DIFFICULTY),
356 number: from_i64(r.get(COL_BLOCK_NUMBER)),
357 gas_limit: from_i64(r.get(COL_GAS_LIMIT)),
358 gas_used: from_i64(r.get(COL_GAS_USED)),
359 timestamp: from_i64(r.get(COL_TIMESTAMP)),
360 extra_data: r.get(COL_EXTRA_DATA),
361 mix_hash: r.get(COL_MIX_HASH),
362 nonce: r.get(COL_NONCE),
363 base_fee_per_gas: r.get::<Option<i64>, _>(COL_BASE_FEE_PER_GAS).map(from_i64),
364 withdrawals_root: r.get(COL_WITHDRAWALS_ROOT),
365 blob_gas_used: r.get::<Option<i64>, _>(COL_BLOB_GAS_USED).map(from_i64),
366 excess_blob_gas: r.get::<Option<i64>, _>(COL_EXCESS_BLOB_GAS).map(from_i64),
367 parent_beacon_block_root: r.get(COL_PARENT_BEACON_BLOCK_ROOT),
368 requests_hash: r.get(COL_REQUESTS_HASH),
369 })
370}
371
372fn tx_from_row(r: &sqlx::any::AnyRow) -> Result<TransactionSigned, SqlColdError> {
374 use alloy::consensus::EthereumTxEnvelope;
375
376 let sig =
377 Signature::new(r.get(COL_SIG_R), r.get(COL_SIG_S), r.get::<i32, _>(COL_SIG_Y_PARITY) != 0);
378
379 let tx_type_raw: i32 = r.get(COL_TX_TYPE);
380 let tx_type_u8: u8 = tx_type_raw
381 .try_into()
382 .map_err(|_| SqlColdError::Convert(format!("tx_type out of u8 range: {tx_type_raw}")))?;
383 let tx_type = TxType::try_from(tx_type_u8)
384 .map_err(|_| SqlColdError::Convert(format!("invalid tx_type: {tx_type_u8}")))?;
385
386 let chain_id: Option<i64> = r.get(COL_CHAIN_ID);
387 let nonce = from_i64(r.get(COL_NONCE));
388 let gas_limit = from_i64(r.get(COL_GAS_LIMIT));
389 let to_addr: Option<Address> = r.get(COL_TO_ADDRESS);
390 let value: U256 = r.get(COL_VALUE);
391 let input: Bytes = r.get(COL_INPUT);
392
393 match tx_type {
394 TxType::Legacy => {
395 let tx = TxLegacy {
396 chain_id: chain_id.map(from_i64),
397 nonce,
398 gas_price: decode_u128_required(opt_blob(r, COL_GAS_PRICE), COL_GAS_PRICE)?,
399 gas_limit,
400 to: to_addr.map_or(TxKind::Create, TxKind::Call),
401 value,
402 input,
403 };
404 Ok(EthereumTxEnvelope::Legacy(Signed::new_unhashed(tx, sig)))
405 }
406 TxType::Eip2930 => {
407 let tx = TxEip2930 {
408 chain_id: from_i64(
409 chain_id
410 .ok_or_else(|| SqlColdError::Convert("EIP2930 requires chain_id".into()))?,
411 ),
412 nonce,
413 gas_price: decode_u128_required(opt_blob(r, COL_GAS_PRICE), COL_GAS_PRICE)?,
414 gas_limit,
415 to: to_addr.map_or(TxKind::Create, TxKind::Call),
416 value,
417 input,
418 access_list: decode_access_list_or_empty(opt_blob(r, COL_ACCESS_LIST))?,
419 };
420 Ok(EthereumTxEnvelope::Eip2930(Signed::new_unhashed(tx, sig)))
421 }
422 TxType::Eip1559 => {
423 let tx = TxEip1559 {
424 chain_id: from_i64(
425 chain_id
426 .ok_or_else(|| SqlColdError::Convert("EIP1559 requires chain_id".into()))?,
427 ),
428 nonce,
429 gas_limit,
430 max_fee_per_gas: decode_u128_required(
431 opt_blob(r, COL_MAX_FEE_PER_GAS),
432 COL_MAX_FEE_PER_GAS,
433 )?,
434 max_priority_fee_per_gas: decode_u128_required(
435 opt_blob(r, COL_MAX_PRIORITY_FEE_PER_GAS),
436 COL_MAX_PRIORITY_FEE_PER_GAS,
437 )?,
438 to: to_addr.map_or(TxKind::Create, TxKind::Call),
439 value,
440 input,
441 access_list: decode_access_list_or_empty(opt_blob(r, COL_ACCESS_LIST))?,
442 };
443 Ok(EthereumTxEnvelope::Eip1559(Signed::new_unhashed(tx, sig)))
444 }
445 TxType::Eip4844 => {
446 let tx = TxEip4844 {
447 chain_id: from_i64(
448 chain_id
449 .ok_or_else(|| SqlColdError::Convert("EIP4844 requires chain_id".into()))?,
450 ),
451 nonce,
452 gas_limit,
453 max_fee_per_gas: decode_u128_required(
454 opt_blob(r, COL_MAX_FEE_PER_GAS),
455 COL_MAX_FEE_PER_GAS,
456 )?,
457 max_priority_fee_per_gas: decode_u128_required(
458 opt_blob(r, COL_MAX_PRIORITY_FEE_PER_GAS),
459 COL_MAX_PRIORITY_FEE_PER_GAS,
460 )?,
461 to: to_addr
462 .ok_or_else(|| SqlColdError::Convert("EIP4844 requires to_address".into()))?,
463 value,
464 input,
465 access_list: decode_access_list_or_empty(opt_blob(r, COL_ACCESS_LIST))?,
466 blob_versioned_hashes: decode_b256_vec(
467 opt_blob(r, COL_BLOB_VERSIONED_HASHES).ok_or_else(|| {
468 SqlColdError::Convert("EIP4844 requires blob_versioned_hashes".into())
469 })?,
470 )?,
471 max_fee_per_blob_gas: decode_u128_required(
472 opt_blob(r, COL_MAX_FEE_PER_BLOB_GAS),
473 COL_MAX_FEE_PER_BLOB_GAS,
474 )?,
475 };
476 Ok(EthereumTxEnvelope::Eip4844(Signed::new_unhashed(tx, sig)))
477 }
478 TxType::Eip7702 => {
479 let tx = TxEip7702 {
480 chain_id: from_i64(
481 chain_id
482 .ok_or_else(|| SqlColdError::Convert("EIP7702 requires chain_id".into()))?,
483 ),
484 nonce,
485 gas_limit,
486 max_fee_per_gas: decode_u128_required(
487 opt_blob(r, COL_MAX_FEE_PER_GAS),
488 COL_MAX_FEE_PER_GAS,
489 )?,
490 max_priority_fee_per_gas: decode_u128_required(
491 opt_blob(r, COL_MAX_PRIORITY_FEE_PER_GAS),
492 COL_MAX_PRIORITY_FEE_PER_GAS,
493 )?,
494 to: to_addr
495 .ok_or_else(|| SqlColdError::Convert("EIP7702 requires to_address".into()))?,
496 value,
497 input,
498 access_list: decode_access_list_or_empty(opt_blob(r, COL_ACCESS_LIST))?,
499 authorization_list: decode_authorization_list(
500 opt_blob(r, COL_AUTHORIZATION_LIST).ok_or_else(|| {
501 SqlColdError::Convert("EIP7702 requires authorization_list".into())
502 })?,
503 )?,
504 };
505 Ok(EthereumTxEnvelope::Eip7702(Signed::new_unhashed(tx, sig)))
506 }
507 }
508}
509
510fn recovered_tx_from_row(r: &sqlx::any::AnyRow) -> Result<RecoveredTx, SqlColdError> {
512 let sender: Address = r.get(COL_FROM_ADDRESS);
513 let tx = tx_from_row(r)?;
514 Ok(Recovered::new_unchecked(tx, sender))
516}
517
518fn log_from_row(r: &sqlx::any::AnyRow) -> Log {
520 let topics = [COL_TOPIC0, COL_TOPIC1, COL_TOPIC2, COL_TOPIC3]
521 .into_iter()
522 .filter_map(|col| r.get::<Option<B256>, _>(col))
523 .collect();
524 Log { address: r.get(COL_ADDRESS), data: LogData::new_unchecked(topics, r.get(COL_DATA)) }
525}
526
527fn signet_event_from_row(r: &sqlx::any::AnyRow) -> Result<DbSignetEvent, SqlColdError> {
529 let event_type = r.get::<i32, _>(COL_EVENT_TYPE) as i16;
530 let order = from_i64(r.get(COL_ORDER_INDEX));
531 let rollup_chain_id: U256 = r.get(COL_ROLLUP_CHAIN_ID);
532
533 match event_type {
534 EVENT_TRANSACT => {
535 let sender: Address = r
536 .get::<Option<Address>, _>(COL_SENDER)
537 .ok_or_else(|| SqlColdError::Convert("Transact requires sender".into()))?;
538 let to: Address = r
539 .get::<Option<Address>, _>(COL_TO_ADDRESS)
540 .ok_or_else(|| SqlColdError::Convert("Transact requires to".into()))?;
541 let value: U256 = r
542 .get::<Option<U256>, _>(COL_VALUE)
543 .ok_or_else(|| SqlColdError::Convert("Transact requires value".into()))?;
544 let gas: U256 = r
545 .get::<Option<U256>, _>(COL_GAS)
546 .ok_or_else(|| SqlColdError::Convert("Transact requires gas".into()))?;
547 let max_fee: U256 = r
548 .get::<Option<U256>, _>(COL_MAX_FEE_PER_GAS)
549 .ok_or_else(|| SqlColdError::Convert("Transact requires max_fee_per_gas".into()))?;
550 let data: Bytes = r.get::<Option<Bytes>, _>(COL_DATA).unwrap_or_default();
551
552 Ok(DbSignetEvent::Transact(
553 order,
554 Transact {
555 rollupChainId: rollup_chain_id,
556 sender,
557 to,
558 value,
559 gas,
560 maxFeePerGas: max_fee,
561 data,
562 },
563 ))
564 }
565 EVENT_ENTER => {
566 let recipient: Address = r
567 .get::<Option<Address>, _>(COL_ROLLUP_RECIPIENT)
568 .ok_or_else(|| SqlColdError::Convert("Enter requires rollup_recipient".into()))?;
569 let amount: U256 = r
570 .get::<Option<U256>, _>(COL_AMOUNT)
571 .ok_or_else(|| SqlColdError::Convert("Enter requires amount".into()))?;
572
573 Ok(DbSignetEvent::Enter(
574 order,
575 Enter { rollupChainId: rollup_chain_id, rollupRecipient: recipient, amount },
576 ))
577 }
578 EVENT_ENTER_TOKEN => {
579 let token: Address = r
580 .get::<Option<Address>, _>(COL_TOKEN)
581 .ok_or_else(|| SqlColdError::Convert("EnterToken requires token".into()))?;
582 let recipient: Address =
583 r.get::<Option<Address>, _>(COL_ROLLUP_RECIPIENT).ok_or_else(|| {
584 SqlColdError::Convert("EnterToken requires rollup_recipient".into())
585 })?;
586 let amount: U256 = r
587 .get::<Option<U256>, _>(COL_AMOUNT)
588 .ok_or_else(|| SqlColdError::Convert("EnterToken requires amount".into()))?;
589
590 Ok(DbSignetEvent::EnterToken(
591 order,
592 EnterToken {
593 rollupChainId: rollup_chain_id,
594 token,
595 rollupRecipient: recipient,
596 amount,
597 },
598 ))
599 }
600 _ => Err(SqlColdError::Convert(format!("invalid event_type: {event_type}"))),
601 }
602}
603
604fn zenith_header_from_row(r: &sqlx::any::AnyRow) -> Result<DbZenithHeader, SqlColdError> {
606 Ok(DbZenithHeader(Zenith::BlockHeader {
607 hostBlockNumber: r.get(COL_HOST_BLOCK_NUMBER),
608 rollupChainId: r.get(COL_ROLLUP_CHAIN_ID),
609 gasLimit: r.get(COL_GAS_LIMIT),
610 rewardAddress: r.get(COL_REWARD_ADDRESS),
611 blockDataHash: r.get(COL_BLOCK_DATA_HASH),
612 }))
613}
614
615async fn write_block_to_tx(
621 tx: &mut sqlx::Transaction<'_, sqlx::Any>,
622 data: BlockData,
623) -> Result<(), SqlColdError> {
624 let bn = to_i64(data.block_number());
625
626 let block_hash = data.header.hash_slow();
628 let difficulty = &data.header.difficulty;
629 sqlx::query(
630 "INSERT INTO headers (
631 block_number, block_hash, parent_hash, ommers_hash, beneficiary,
632 state_root, transactions_root, receipts_root, logs_bloom, difficulty,
633 gas_limit, gas_used, timestamp, extra_data, mix_hash, nonce,
634 base_fee_per_gas, withdrawals_root, blob_gas_used, excess_blob_gas,
635 parent_beacon_block_root, requests_hash
636 ) VALUES (
637 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
638 $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22
639 )",
640 )
641 .bind(bn)
642 .bind(block_hash)
643 .bind(data.header.parent_hash)
644 .bind(data.header.ommers_hash)
645 .bind(data.header.beneficiary)
646 .bind(data.header.state_root)
647 .bind(data.header.transactions_root)
648 .bind(data.header.receipts_root)
649 .bind(data.header.logs_bloom)
650 .bind(difficulty)
651 .bind(to_i64(data.header.gas_limit))
652 .bind(to_i64(data.header.gas_used))
653 .bind(to_i64(data.header.timestamp))
654 .bind(&data.header.extra_data)
655 .bind(data.header.mix_hash)
656 .bind(data.header.nonce)
657 .bind(data.header.base_fee_per_gas.map(to_i64))
658 .bind(data.header.withdrawals_root.as_ref())
659 .bind(data.header.blob_gas_used.map(to_i64))
660 .bind(data.header.excess_blob_gas.map(to_i64))
661 .bind(data.header.parent_beacon_block_root.as_ref())
662 .bind(data.header.requests_hash.as_ref())
663 .execute(&mut **tx)
664 .await?;
665
666 for (idx, recovered_tx) in data.transactions.iter().enumerate() {
668 insert_transaction(tx, bn, to_i64(idx as u64), recovered_tx).await?;
669 }
670
671 let mut first_log_index = 0i64;
674 for (idx, receipt) in data.receipts.iter().enumerate() {
675 let tx_idx = to_i64(idx as u64);
676 sqlx::query(
677 "INSERT INTO receipts (block_number, tx_index, tx_type, success, cumulative_gas_used, first_log_index)
678 VALUES ($1, $2, $3, $4, $5, $6)",
679 )
680 .bind(bn)
681 .bind(tx_idx)
682 .bind(receipt.tx_type as i32)
683 .bind(receipt.inner.status.coerce_status() as i32)
684 .bind(to_i64(receipt.inner.cumulative_gas_used))
685 .bind(first_log_index)
686 .execute(&mut **tx)
687 .await?;
688 first_log_index += receipt.inner.logs.len() as i64;
689
690 for (log_idx, log) in receipt.inner.logs.iter().enumerate() {
691 let topics = log.topics();
692 sqlx::query(
693 "INSERT INTO logs (block_number, tx_index, log_index, address, topic0, topic1, topic2, topic3, data)
694 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
695 )
696 .bind(bn)
697 .bind(tx_idx)
698 .bind(to_i64(log_idx as u64))
699 .bind(log.address)
700 .bind(topics.first())
701 .bind(topics.get(1))
702 .bind(topics.get(2))
703 .bind(topics.get(3))
704 .bind(&log.data.data)
705 .execute(&mut **tx)
706 .await?;
707 }
708 }
709
710 for (idx, event) in data.signet_events.iter().enumerate() {
712 insert_signet_event(tx, bn, to_i64(idx as u64), event).await?;
713 }
714
715 if let Some(zh) = &data.zenith_header {
717 let h = &zh.0;
718 sqlx::query(
719 "INSERT INTO zenith_headers (
720 block_number, host_block_number, rollup_chain_id,
721 gas_limit, reward_address, block_data_hash
722 ) VALUES ($1, $2, $3, $4, $5, $6)",
723 )
724 .bind(bn)
725 .bind(h.hostBlockNumber)
726 .bind(h.rollupChainId)
727 .bind(h.gasLimit)
728 .bind(h.rewardAddress)
729 .bind(h.blockDataHash)
730 .execute(&mut **tx)
731 .await?;
732 }
733
734 Ok(())
735}
736
737async fn insert_transaction(
739 conn: &mut sqlx::AnyConnection,
740 bn: i64,
741 tx_index: i64,
742 recovered: &RecoveredTx,
743) -> Result<(), SqlColdError> {
744 use alloy::consensus::EthereumTxEnvelope;
745
746 let sender = recovered.signer();
747 let tx: &TransactionSigned = recovered;
748 let tx_hash = tx.tx_hash();
749 let tx_type = tx.tx_type() as i32;
750
751 macro_rules! sig {
752 ($s:expr) => {{
753 let sig = $s.signature();
754 (sig.v() as i32, sig.r(), sig.s())
755 }};
756 }
757 let (sig_y, sig_r, sig_s) = match tx {
758 EthereumTxEnvelope::Legacy(s) => sig!(s),
759 EthereumTxEnvelope::Eip2930(s) => sig!(s),
760 EthereumTxEnvelope::Eip1559(s) => sig!(s),
761 EthereumTxEnvelope::Eip4844(s) => sig!(s),
762 EthereumTxEnvelope::Eip7702(s) => sig!(s),
763 };
764
765 let (chain_id, nonce, gas_limit) = match tx {
766 EthereumTxEnvelope::Legacy(s) => {
767 (s.tx().chain_id.map(to_i64), to_i64(s.tx().nonce), to_i64(s.tx().gas_limit))
768 }
769 EthereumTxEnvelope::Eip2930(s) => {
770 (Some(to_i64(s.tx().chain_id)), to_i64(s.tx().nonce), to_i64(s.tx().gas_limit))
771 }
772 EthereumTxEnvelope::Eip1559(s) => {
773 (Some(to_i64(s.tx().chain_id)), to_i64(s.tx().nonce), to_i64(s.tx().gas_limit))
774 }
775 EthereumTxEnvelope::Eip4844(s) => {
776 (Some(to_i64(s.tx().chain_id)), to_i64(s.tx().nonce), to_i64(s.tx().gas_limit))
777 }
778 EthereumTxEnvelope::Eip7702(s) => {
779 (Some(to_i64(s.tx().chain_id)), to_i64(s.tx().nonce), to_i64(s.tx().gas_limit))
780 }
781 };
782
783 let (value, to_addr) = match tx {
784 EthereumTxEnvelope::Legacy(s) => (s.tx().value, s.tx().to.to()),
785 EthereumTxEnvelope::Eip2930(s) => (s.tx().value, s.tx().to.to()),
786 EthereumTxEnvelope::Eip1559(s) => (s.tx().value, s.tx().to.to()),
787 EthereumTxEnvelope::Eip4844(s) => (s.tx().value, Some(&s.tx().to)),
788 EthereumTxEnvelope::Eip7702(s) => (s.tx().value, Some(&s.tx().to)),
789 };
790
791 let input: &[u8] = match tx {
792 EthereumTxEnvelope::Legacy(s) => s.tx().input.as_ref(),
793 EthereumTxEnvelope::Eip2930(s) => s.tx().input.as_ref(),
794 EthereumTxEnvelope::Eip1559(s) => s.tx().input.as_ref(),
795 EthereumTxEnvelope::Eip4844(s) => s.tx().input.as_ref(),
796 EthereumTxEnvelope::Eip7702(s) => s.tx().input.as_ref(),
797 };
798
799 let (gas_price, max_fee, max_priority_fee, max_blob_fee) = match tx {
800 EthereumTxEnvelope::Legacy(s) => (Some(encode_u128(s.tx().gas_price)), None, None, None),
801 EthereumTxEnvelope::Eip2930(s) => (Some(encode_u128(s.tx().gas_price)), None, None, None),
802 EthereumTxEnvelope::Eip1559(s) => (
803 None,
804 Some(encode_u128(s.tx().max_fee_per_gas)),
805 Some(encode_u128(s.tx().max_priority_fee_per_gas)),
806 None,
807 ),
808 EthereumTxEnvelope::Eip4844(s) => (
809 None,
810 Some(encode_u128(s.tx().max_fee_per_gas)),
811 Some(encode_u128(s.tx().max_priority_fee_per_gas)),
812 Some(encode_u128(s.tx().max_fee_per_blob_gas)),
813 ),
814 EthereumTxEnvelope::Eip7702(s) => (
815 None,
816 Some(encode_u128(s.tx().max_fee_per_gas)),
817 Some(encode_u128(s.tx().max_priority_fee_per_gas)),
818 None,
819 ),
820 };
821
822 let (access_list, blob_hashes, auth_list) = match tx {
823 EthereumTxEnvelope::Legacy(_) => (None, None, None),
824 EthereumTxEnvelope::Eip2930(s) => {
825 (Some(encode_access_list(&s.tx().access_list)), None, None)
826 }
827 EthereumTxEnvelope::Eip1559(s) => {
828 (Some(encode_access_list(&s.tx().access_list)), None, None)
829 }
830 EthereumTxEnvelope::Eip4844(s) => (
831 Some(encode_access_list(&s.tx().access_list)),
832 Some(encode_b256_vec(&s.tx().blob_versioned_hashes)),
833 None,
834 ),
835 EthereumTxEnvelope::Eip7702(s) => (
836 Some(encode_access_list(&s.tx().access_list)),
837 None,
838 Some(encode_authorization_list(&s.tx().authorization_list)),
839 ),
840 };
841
842 sqlx::query(
843 "INSERT INTO transactions (
844 block_number, tx_index, tx_hash, tx_type,
845 sig_y_parity, sig_r, sig_s,
846 chain_id, nonce, gas_limit, to_address, value, input,
847 gas_price, max_fee_per_gas, max_priority_fee_per_gas,
848 max_fee_per_blob_gas, blob_versioned_hashes,
849 access_list, authorization_list, from_address
850 ) VALUES (
851 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
852 $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21
853 )",
854 )
855 .bind(bn)
856 .bind(tx_index)
857 .bind(tx_hash)
858 .bind(tx_type)
859 .bind(sig_y)
860 .bind(sig_r)
861 .bind(sig_s)
862 .bind(chain_id)
863 .bind(nonce)
864 .bind(gas_limit)
865 .bind(to_addr)
866 .bind(value)
867 .bind(input)
868 .bind(gas_price.as_ref().map(|v| v.as_slice()))
869 .bind(max_fee.as_ref().map(|v| v.as_slice()))
870 .bind(max_priority_fee.as_ref().map(|v| v.as_slice()))
871 .bind(max_blob_fee.as_ref().map(|v| v.as_slice()))
872 .bind(blob_hashes.as_deref())
873 .bind(access_list.as_deref())
874 .bind(auth_list.as_deref())
875 .bind(sender)
876 .execute(&mut *conn)
877 .await?;
878
879 Ok(())
880}
881
882async fn insert_signet_event(
884 conn: &mut sqlx::AnyConnection,
885 block_number: i64,
886 event_index: i64,
887 event: &DbSignetEvent,
888) -> Result<(), SqlColdError> {
889 let (event_type, order, chain_id) = match event {
890 DbSignetEvent::Transact(o, t) => (0i32, to_i64(*o), &t.rollupChainId),
891 DbSignetEvent::Enter(o, e) => (1i32, to_i64(*o), &e.rollupChainId),
892 DbSignetEvent::EnterToken(o, e) => (2i32, to_i64(*o), &e.rollupChainId),
893 };
894
895 let (value, gas, max_fee, amount) = match event {
896 DbSignetEvent::Transact(_, t) => {
897 (Some(&t.value), Some(&t.gas), Some(&t.maxFeePerGas), None)
898 }
899 DbSignetEvent::Enter(_, e) => (None, None, None, Some(&e.amount)),
900 DbSignetEvent::EnterToken(_, e) => (None, None, None, Some(&e.amount)),
901 };
902
903 sqlx::query(
904 "INSERT INTO signet_events (
905 block_number, event_index, event_type, order_index,
906 rollup_chain_id, sender, to_address, value, gas,
907 max_fee_per_gas, data, rollup_recipient, amount, token
908 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)",
909 )
910 .bind(block_number)
911 .bind(event_index)
912 .bind(event_type)
913 .bind(order)
914 .bind(chain_id)
915 .bind(match event {
916 DbSignetEvent::Transact(_, t) => Some(&t.sender),
917 _ => None,
918 })
919 .bind(match event {
920 DbSignetEvent::Transact(_, t) => Some(&t.to),
921 _ => None,
922 })
923 .bind(value)
924 .bind(gas)
925 .bind(max_fee)
926 .bind(match event {
927 DbSignetEvent::Transact(_, t) => Some(&t.data),
928 _ => None,
929 })
930 .bind(match event {
931 DbSignetEvent::Enter(_, e) => Some(&e.rollupRecipient),
932 DbSignetEvent::EnterToken(_, e) => Some(&e.rollupRecipient),
933 _ => None,
934 })
935 .bind(amount)
936 .bind(match event {
937 DbSignetEvent::EnterToken(_, e) => Some(&e.token),
938 _ => None,
939 })
940 .execute(&mut *conn)
941 .await?;
942
943 Ok(())
944}
945
946fn append_filter_clause<'a>(
956 clause: &mut String,
957 params: &mut Vec<&'a [u8]>,
958 mut idx: u32,
959 column: &str,
960 values: impl ExactSizeIterator<Item = &'a [u8]>,
961) -> u32 {
962 use std::fmt::Write;
963
964 let len = values.len();
965 if len == 1 {
966 write!(clause, " AND {column} = ${idx}").unwrap();
967 values.for_each(|v| params.push(v));
968 return idx + 1;
969 }
970 write!(clause, " AND {column} IN (").unwrap();
971 for (i, v) in values.enumerate() {
972 if i > 0 {
973 clause.push_str(", ");
974 }
975 write!(clause, "${idx}").unwrap();
976 params.push(v);
977 idx += 1;
978 }
979 clause.push(')');
980 idx
981}
982
983fn build_log_filter_clause(filter: &Filter, start_idx: u32) -> (String, Vec<&[u8]>) {
984 let mut clause = String::new();
985 let mut params: Vec<&[u8]> = Vec::new();
986 let mut idx = start_idx;
987
988 if !filter.address.is_empty() {
989 idx = append_filter_clause(
990 &mut clause,
991 &mut params,
992 idx,
993 "l.address",
994 filter.address.iter().map(|a| a.as_slice()),
995 );
996 }
997
998 let topic_cols = ["l.topic0", "l.topic1", "l.topic2", "l.topic3"];
999 for (i, topic_filter) in filter.topics.iter().enumerate() {
1000 if topic_filter.is_empty() {
1001 continue;
1002 }
1003 idx = append_filter_clause(
1004 &mut clause,
1005 &mut params,
1006 idx,
1007 topic_cols[i],
1008 topic_filter.iter().map(|v| v.as_slice()),
1009 );
1010 }
1011
1012 (clause, params)
1013}
1014
1015impl ColdStorageRead for SqlColdBackend {
1020 async fn get_header(&self, spec: HeaderSpecifier) -> ColdResult<Option<SealedHeader>> {
1021 let Some(block_num) = self.resolve_header_spec(spec).await? else {
1022 return Ok(None);
1023 };
1024 self.fetch_header_by_number(block_num).await.map_err(ColdStorageError::from)
1025 }
1026
1027 async fn get_headers(
1028 &self,
1029 specs: Vec<HeaderSpecifier>,
1030 ) -> ColdResult<Vec<Option<SealedHeader>>> {
1031 let mut results = Vec::with_capacity(specs.len());
1032 for spec in specs {
1033 let header = self.get_header(spec).await?;
1034 results.push(header);
1035 }
1036 Ok(results)
1037 }
1038
1039 async fn get_transaction(
1040 &self,
1041 spec: TransactionSpecifier,
1042 ) -> ColdResult<Option<Confirmed<RecoveredTx>>> {
1043 let row = match spec {
1044 TransactionSpecifier::Hash(hash) => sqlx::query(
1045 "SELECT t.*, h.block_hash
1046 FROM transactions t
1047 JOIN headers h ON t.block_number = h.block_number
1048 WHERE t.tx_hash = $1",
1049 )
1050 .bind(hash)
1051 .fetch_optional(&self.pool)
1052 .await
1053 .map_err(SqlColdError::from)?,
1054 TransactionSpecifier::BlockAndIndex { block, index } => sqlx::query(
1055 "SELECT t.*, h.block_hash
1056 FROM transactions t
1057 JOIN headers h ON t.block_number = h.block_number
1058 WHERE t.block_number = $1 AND t.tx_index = $2",
1059 )
1060 .bind(to_i64(block))
1061 .bind(to_i64(index))
1062 .fetch_optional(&self.pool)
1063 .await
1064 .map_err(SqlColdError::from)?,
1065 TransactionSpecifier::BlockHashAndIndex { block_hash, index } => sqlx::query(
1066 "SELECT t.*, h.block_hash
1067 FROM transactions t
1068 JOIN headers h ON t.block_number = h.block_number
1069 WHERE h.block_hash = $1 AND t.tx_index = $2",
1070 )
1071 .bind(block_hash)
1072 .bind(to_i64(index))
1073 .fetch_optional(&self.pool)
1074 .await
1075 .map_err(SqlColdError::from)?,
1076 };
1077
1078 let Some(r) = row else {
1079 return Ok(None);
1080 };
1081
1082 let block = from_i64(r.get::<i64, _>(COL_BLOCK_NUMBER));
1083 let index = from_i64(r.get::<i64, _>(COL_TX_INDEX));
1084 let block_hash = r.get(COL_BLOCK_HASH);
1085 let recovered = recovered_tx_from_row(&r).map_err(ColdStorageError::from)?;
1086 let meta = ConfirmationMeta::new(block, block_hash, index);
1087 Ok(Some(Confirmed::new(recovered, meta)))
1088 }
1089
1090 async fn get_transactions_in_block(&self, block: BlockNumber) -> ColdResult<Vec<RecoveredTx>> {
1091 let bn = to_i64(block);
1092 let rows =
1093 sqlx::query("SELECT * FROM transactions WHERE block_number = $1 ORDER BY tx_index")
1094 .bind(bn)
1095 .fetch_all(&self.pool)
1096 .await
1097 .map_err(SqlColdError::from)?;
1098
1099 rows.iter().map(|r| recovered_tx_from_row(r).map_err(ColdStorageError::from)).collect()
1100 }
1101
1102 async fn get_transaction_count(&self, block: BlockNumber) -> ColdResult<u64> {
1103 let bn = to_i64(block);
1104 let row = sqlx::query("SELECT COUNT(*) as cnt FROM transactions WHERE block_number = $1")
1105 .bind(bn)
1106 .fetch_one(&self.pool)
1107 .await
1108 .map_err(SqlColdError::from)?;
1109
1110 Ok(from_i64(row.get::<i64, _>(COL_CNT)))
1111 }
1112
1113 async fn get_receipt(&self, spec: ReceiptSpecifier) -> ColdResult<Option<ColdReceipt>> {
1114 let (block, index) = match spec {
1116 ReceiptSpecifier::TxHash(hash) => {
1117 let row = sqlx::query(
1118 "SELECT block_number, tx_index FROM transactions WHERE tx_hash = $1",
1119 )
1120 .bind(hash)
1121 .fetch_optional(&self.pool)
1122 .await
1123 .map_err(SqlColdError::from)?;
1124 let Some(r) = row else { return Ok(None) };
1125 (
1126 from_i64(r.get::<i64, _>(COL_BLOCK_NUMBER)),
1127 from_i64(r.get::<i64, _>(COL_TX_INDEX)),
1128 )
1129 }
1130 ReceiptSpecifier::BlockAndIndex { block, index } => (block, index),
1131 };
1132
1133 let combined = sqlx::query(
1137 "SELECT h.*, \
1138 r.tx_type AS r_tx_type, r.success AS r_success, \
1139 r.cumulative_gas_used AS r_cumulative_gas_used, \
1140 r.first_log_index AS r_first_log_index, \
1141 t.tx_hash AS r_tx_hash, t.from_address AS r_from_address, \
1142 COALESCE( \
1143 (SELECT CAST(MAX(r2.cumulative_gas_used) AS bigint) \
1144 FROM receipts r2 \
1145 WHERE r2.block_number = r.block_number \
1146 AND r2.tx_index < r.tx_index), \
1147 0 \
1148 ) AS prior_gas \
1149 FROM receipts r \
1150 JOIN transactions t ON r.block_number = t.block_number \
1151 AND r.tx_index = t.tx_index \
1152 JOIN headers h ON r.block_number = h.block_number \
1153 WHERE r.block_number = $1 AND r.tx_index = $2",
1154 )
1155 .bind(to_i64(block))
1156 .bind(to_i64(index))
1157 .fetch_optional(&self.pool)
1158 .await
1159 .map_err(SqlColdError::from)?;
1160
1161 let Some(rr) = combined else {
1162 return Ok(None);
1163 };
1164
1165 let header = header_from_row(&rr).map_err(ColdStorageError::from)?.seal_slow();
1167
1168 let tx_hash = rr.get(COL_R_TX_HASH);
1170 let sender = rr.get(COL_R_FROM_ADDRESS);
1171 let tx_type: i32 = rr.get(COL_R_TX_TYPE);
1172 let success = rr.get::<i32, _>(COL_R_SUCCESS) != 0;
1173 let cumulative_gas_used: i64 = rr.get(COL_R_CUMULATIVE_GAS_USED);
1174 let first_log_index: u64 = from_i64(rr.get::<i64, _>(COL_R_FIRST_LOG_INDEX));
1175 let prior_cumulative_gas: u64 = from_i64(rr.get::<i64, _>(COL_PRIOR_GAS));
1176
1177 let log_rows = sqlx::query(
1179 "SELECT * FROM logs WHERE block_number = $1 AND tx_index = $2 ORDER BY log_index",
1180 )
1181 .bind(to_i64(block))
1182 .bind(to_i64(index))
1183 .fetch_all(&self.pool)
1184 .await
1185 .map_err(SqlColdError::from)?;
1186
1187 let logs = log_rows.iter().map(log_from_row).collect();
1188 let built = build_receipt(tx_type, success, cumulative_gas_used, logs)
1189 .map_err(ColdStorageError::from)?;
1190 debug_assert!(
1193 built.inner.cumulative_gas_used >= prior_cumulative_gas,
1194 "cumulative gas decreased: {} < {}",
1195 built.inner.cumulative_gas_used,
1196 prior_cumulative_gas,
1197 );
1198 let gas_used = built.inner.cumulative_gas_used - prior_cumulative_gas;
1199
1200 let ir = IndexedReceipt { receipt: built, tx_hash, first_log_index, gas_used, sender };
1201 Ok(Some(ColdReceipt::new(ir, &header, index)))
1202 }
1203
1204 async fn get_receipts_in_block(&self, block: BlockNumber) -> ColdResult<Vec<ColdReceipt>> {
1205 let Some(header) =
1206 self.fetch_header_by_number(block).await.map_err(ColdStorageError::from)?
1207 else {
1208 return Ok(Vec::new());
1209 };
1210
1211 let bn = to_i64(block);
1212
1213 let receipt_rows = sqlx::query(
1215 "SELECT r.block_number, r.tx_index, r.tx_type, r.success, \
1216 r.cumulative_gas_used, r.first_log_index, \
1217 t.tx_hash, t.from_address \
1218 FROM receipts r \
1219 JOIN transactions t ON r.block_number = t.block_number \
1220 AND r.tx_index = t.tx_index \
1221 WHERE r.block_number = $1 \
1222 ORDER BY r.tx_index",
1223 )
1224 .bind(bn)
1225 .fetch_all(&self.pool)
1226 .await
1227 .map_err(SqlColdError::from)?;
1228
1229 let all_log_rows =
1230 sqlx::query("SELECT * FROM logs WHERE block_number = $1 ORDER BY tx_index, log_index")
1231 .bind(bn)
1232 .fetch_all(&self.pool)
1233 .await
1234 .map_err(SqlColdError::from)?;
1235
1236 let mut logs_by_tx: BTreeMap<i64, Vec<Log>> = BTreeMap::new();
1238 for r in &all_log_rows {
1239 let tx_idx: i64 = r.get(COL_TX_INDEX);
1240 logs_by_tx.entry(tx_idx).or_default().push(log_from_row(r));
1241 }
1242
1243 let mut first_log_index = 0u64;
1244 let mut prior_cumulative_gas = 0u64;
1245 receipt_rows
1246 .into_iter()
1247 .enumerate()
1248 .map(|(idx, rr)| {
1249 let tx_idx: i64 = rr.get(COL_TX_INDEX);
1250 let tx_hash = rr.get(COL_TX_HASH);
1251 let sender = rr.get(COL_FROM_ADDRESS);
1252 let tx_type = rr.get::<i32, _>(COL_TX_TYPE);
1253 let success = rr.get::<i32, _>(COL_SUCCESS) != 0;
1254 let cumulative_gas_used: i64 = rr.get(COL_CUMULATIVE_GAS_USED);
1255 let logs = logs_by_tx.remove(&tx_idx).unwrap_or_default();
1256 let receipt = build_receipt(tx_type, success, cumulative_gas_used, logs)
1257 .map_err(ColdStorageError::from)?;
1258 let gas_used = receipt.inner.cumulative_gas_used - prior_cumulative_gas;
1259 prior_cumulative_gas = receipt.inner.cumulative_gas_used;
1260 let ir = IndexedReceipt { receipt, tx_hash, first_log_index, gas_used, sender };
1261 first_log_index += ir.receipt.inner.logs.len() as u64;
1262 Ok(ColdReceipt::new(ir, &header, idx as u64))
1263 })
1264 .collect()
1265 }
1266
1267 async fn get_signet_events(
1268 &self,
1269 spec: SignetEventsSpecifier,
1270 ) -> ColdResult<Vec<DbSignetEvent>> {
1271 let rows = match spec {
1272 SignetEventsSpecifier::Block(block) => {
1273 let bn = to_i64(block);
1274 sqlx::query(
1275 "SELECT * FROM signet_events WHERE block_number = $1 ORDER BY event_index",
1276 )
1277 .bind(bn)
1278 .fetch_all(&self.pool)
1279 .await
1280 .map_err(SqlColdError::from)?
1281 }
1282 SignetEventsSpecifier::BlockRange { start, end } => {
1283 let s = to_i64(start);
1284 let e = to_i64(end);
1285 sqlx::query(
1286 "SELECT * FROM signet_events WHERE block_number >= $1 AND block_number <= $2
1287 ORDER BY block_number, event_index",
1288 )
1289 .bind(s)
1290 .bind(e)
1291 .fetch_all(&self.pool)
1292 .await
1293 .map_err(SqlColdError::from)?
1294 }
1295 };
1296
1297 rows.iter().map(|r| signet_event_from_row(r).map_err(ColdStorageError::from)).collect()
1298 }
1299
1300 async fn get_zenith_header(
1301 &self,
1302 spec: ZenithHeaderSpecifier,
1303 ) -> ColdResult<Option<DbZenithHeader>> {
1304 let block = match spec {
1305 ZenithHeaderSpecifier::Number(n) => n,
1306 ZenithHeaderSpecifier::Range { start, .. } => start,
1307 };
1308 let bn = to_i64(block);
1309 let row = sqlx::query("SELECT * FROM zenith_headers WHERE block_number = $1")
1310 .bind(bn)
1311 .fetch_optional(&self.pool)
1312 .await
1313 .map_err(SqlColdError::from)?;
1314
1315 row.map(|r| zenith_header_from_row(&r)).transpose().map_err(ColdStorageError::from)
1316 }
1317
1318 async fn get_zenith_headers(
1319 &self,
1320 spec: ZenithHeaderSpecifier,
1321 ) -> ColdResult<Vec<DbZenithHeader>> {
1322 let rows = match spec {
1323 ZenithHeaderSpecifier::Number(n) => {
1324 let bn = to_i64(n);
1325 sqlx::query("SELECT * FROM zenith_headers WHERE block_number = $1")
1326 .bind(bn)
1327 .fetch_all(&self.pool)
1328 .await
1329 .map_err(SqlColdError::from)?
1330 }
1331 ZenithHeaderSpecifier::Range { start, end } => {
1332 let s = to_i64(start);
1333 let e = to_i64(end);
1334 sqlx::query(
1335 "SELECT * FROM zenith_headers WHERE block_number >= $1 AND block_number <= $2
1336 ORDER BY block_number",
1337 )
1338 .bind(s)
1339 .bind(e)
1340 .fetch_all(&self.pool)
1341 .await
1342 .map_err(SqlColdError::from)?
1343 }
1344 };
1345
1346 rows.iter().map(|r| zenith_header_from_row(r).map_err(ColdStorageError::from)).collect()
1347 }
1348
1349 async fn get_logs(&self, filter: &Filter, max_logs: usize) -> ColdResult<Vec<RpcLog>> {
1350 let from = filter.get_from_block().unwrap_or(0);
1351 let to = filter.get_to_block().unwrap_or(u64::MAX);
1352
1353 let (filter_clause, params) = build_log_filter_clause(filter, 3);
1357 let where_clause = format!("l.block_number >= $1 AND l.block_number <= $2{filter_clause}");
1358
1359 let limit_idx = 3 + params.len() as u32;
1364 let data_sql = format!(
1365 "SELECT l.*, h.block_hash, h.timestamp AS block_timestamp, t.tx_hash, \
1366 (r.first_log_index + l.log_index) AS block_log_index \
1367 FROM logs l \
1368 JOIN headers h ON l.block_number = h.block_number \
1369 JOIN transactions t ON l.block_number = t.block_number \
1370 AND l.tx_index = t.tx_index \
1371 JOIN receipts r ON l.block_number = r.block_number \
1372 AND l.tx_index = r.tx_index \
1373 WHERE {where_clause} \
1374 ORDER BY l.block_number, l.tx_index, l.log_index \
1375 LIMIT ${limit_idx}"
1376 );
1377 let mut query = sqlx::query(&data_sql).bind(to_i64(from)).bind(to_i64(to));
1378 for param in ¶ms {
1379 query = query.bind(*param);
1380 }
1381 let limit = max_logs.saturating_add(1).min(i64::MAX as usize);
1384 query = query.bind(to_i64(limit as u64));
1385
1386 let rows = query.fetch_all(&self.pool).await.map_err(SqlColdError::from)?;
1387
1388 if rows.len() > max_logs {
1389 return Err(ColdStorageError::TooManyLogs { limit: max_logs });
1390 }
1391
1392 rows.into_iter()
1393 .map(|r| {
1394 let log = log_from_row(&r);
1395 Ok(RpcLog {
1396 inner: log,
1397 block_hash: Some(r.get(COL_BLOCK_HASH)),
1398 block_number: Some(from_i64(r.get::<i64, _>(COL_BLOCK_NUMBER))),
1399 block_timestamp: Some(from_i64(r.get::<i64, _>(COL_BLOCK_TIMESTAMP))),
1400 transaction_hash: Some(r.get(COL_TX_HASH)),
1401 transaction_index: Some(from_i64(r.get::<i64, _>(COL_TX_INDEX))),
1402 log_index: Some(from_i64(r.get::<i64, _>(COL_BLOCK_LOG_INDEX))),
1403 removed: false,
1404 })
1405 })
1406 .collect::<ColdResult<Vec<_>>>()
1407 }
1408
1409 async fn produce_log_stream(&self, filter: &Filter, params: signet_cold::StreamParams) {
1410 #[cfg(feature = "postgres")]
1411 if self.is_postgres {
1412 return self.produce_log_stream_pg(filter, params).await;
1413 }
1414 signet_cold::produce_log_stream_default(self, filter, params).await;
1415 }
1416
1417 async fn get_latest_block(&self) -> ColdResult<Option<BlockNumber>> {
1418 let row = sqlx::query("SELECT MAX(block_number) as max_bn FROM headers")
1419 .fetch_one(&self.pool)
1420 .await
1421 .map_err(SqlColdError::from)?;
1422 Ok(row.get::<Option<i64>, _>(COL_MAX_BN).map(from_i64))
1423 }
1424}
1425
1426impl ColdStorageWrite for SqlColdBackend {
1427 async fn append_block(&mut self, data: BlockData) -> ColdResult<()> {
1428 self.insert_block(data).await.map_err(ColdStorageError::from)
1429 }
1430
1431 async fn append_blocks(&mut self, data: Vec<BlockData>) -> ColdResult<()> {
1432 let mut tx = self.pool.begin().await.map_err(SqlColdError::from)?;
1433 for block_data in data {
1434 write_block_to_tx(&mut tx, block_data).await.map_err(ColdStorageError::from)?;
1435 }
1436 tx.commit().await.map_err(SqlColdError::from)?;
1437 Ok(())
1438 }
1439
1440 async fn truncate_above(&mut self, block: BlockNumber) -> ColdResult<()> {
1441 let bn = to_i64(block);
1442 let mut tx = self.pool.begin().await.map_err(SqlColdError::from)?;
1443
1444 delete_above_in_tx(&mut tx, bn).await.map_err(ColdStorageError::from)?;
1445
1446 tx.commit().await.map_err(SqlColdError::from)?;
1447 Ok(())
1448 }
1449}
1450
1451impl ColdStorage for SqlColdBackend {
1452 async fn drain_above(&mut self, block: BlockNumber) -> ColdResult<Vec<Vec<ColdReceipt>>> {
1453 let bn = to_i64(block);
1454 let mut tx = self.pool.begin().await.map_err(SqlColdError::from)?;
1455
1456 let header_rows =
1458 sqlx::query("SELECT * FROM headers WHERE block_number > $1 ORDER BY block_number")
1459 .bind(bn)
1460 .fetch_all(&mut *tx)
1461 .await
1462 .map_err(SqlColdError::from)?;
1463
1464 if header_rows.is_empty() {
1465 tx.commit().await.map_err(SqlColdError::from)?;
1466 return Ok(Vec::new());
1467 }
1468
1469 let mut headers: BTreeMap<i64, SealedHeader> = BTreeMap::new();
1470 for r in &header_rows {
1471 let num: i64 = r.get(COL_BLOCK_NUMBER);
1472 let h = header_from_row(r).map_err(ColdStorageError::from)?.seal_slow();
1473 headers.insert(num, h);
1474 }
1475
1476 let receipt_rows = sqlx::query(
1478 "SELECT r.block_number, r.tx_index, r.tx_type, r.success, \
1479 r.cumulative_gas_used, r.first_log_index, \
1480 t.tx_hash, t.from_address \
1481 FROM receipts r \
1482 JOIN transactions t ON r.block_number = t.block_number \
1483 AND r.tx_index = t.tx_index \
1484 WHERE r.block_number > $1 \
1485 ORDER BY r.block_number, r.tx_index",
1486 )
1487 .bind(bn)
1488 .fetch_all(&mut *tx)
1489 .await
1490 .map_err(SqlColdError::from)?;
1491
1492 let log_rows = sqlx::query(
1494 "SELECT * FROM logs WHERE block_number > $1 \
1495 ORDER BY block_number, tx_index, log_index",
1496 )
1497 .bind(bn)
1498 .fetch_all(&mut *tx)
1499 .await
1500 .map_err(SqlColdError::from)?;
1501
1502 let mut logs_by_block_tx: BTreeMap<(i64, i64), Vec<Log>> = BTreeMap::new();
1504 for r in &log_rows {
1505 let block_num: i64 = r.get(COL_BLOCK_NUMBER);
1506 let tx_idx: i64 = r.get(COL_TX_INDEX);
1507 logs_by_block_tx.entry((block_num, tx_idx)).or_default().push(log_from_row(r));
1508 }
1509
1510 let mut receipts_by_block: BTreeMap<i64, Vec<&sqlx::any::AnyRow>> = BTreeMap::new();
1512 for r in &receipt_rows {
1513 let block_num: i64 = r.get(COL_BLOCK_NUMBER);
1514 receipts_by_block.entry(block_num).or_default().push(r);
1515 }
1516
1517 let mut all_receipts = Vec::with_capacity(headers.len());
1519 for (&block_num, header) in &headers {
1520 let block_receipt_rows = receipts_by_block.remove(&block_num).unwrap_or_default();
1521 let mut prior_cumulative_gas = 0u64;
1522 let block_receipts: ColdResult<Vec<ColdReceipt>> = block_receipt_rows
1523 .into_iter()
1524 .map(|rr: &sqlx::any::AnyRow| {
1525 let tx_idx: i64 = rr.get(COL_TX_INDEX);
1526 let tx_hash = rr.get(COL_TX_HASH);
1527 let sender = rr.get(COL_FROM_ADDRESS);
1528 let tx_type: i32 = rr.get(COL_TX_TYPE);
1529 let success = rr.get::<i32, _>(COL_SUCCESS) != 0;
1530 let cumulative_gas_used: i64 = rr.get(COL_CUMULATIVE_GAS_USED);
1531 let first_log_index = from_i64(rr.get::<i64, _>(COL_FIRST_LOG_INDEX));
1532 let logs = logs_by_block_tx.remove(&(block_num, tx_idx)).unwrap_or_default();
1533 let receipt = build_receipt(tx_type, success, cumulative_gas_used, logs)
1534 .map_err(ColdStorageError::from)?;
1535 debug_assert!(
1538 receipt.inner.cumulative_gas_used >= prior_cumulative_gas,
1539 "cumulative gas decreased: {} < {}",
1540 receipt.inner.cumulative_gas_used,
1541 prior_cumulative_gas,
1542 );
1543 let gas_used = receipt.inner.cumulative_gas_used - prior_cumulative_gas;
1544 prior_cumulative_gas = receipt.inner.cumulative_gas_used;
1545 let ir = IndexedReceipt { receipt, tx_hash, first_log_index, gas_used, sender };
1546 Ok(ColdReceipt::new(ir, header, from_i64(tx_idx)))
1547 })
1548 .collect();
1549 all_receipts.push(block_receipts?);
1550 }
1551
1552 delete_above_in_tx(&mut tx, bn).await.map_err(ColdStorageError::from)?;
1554
1555 tx.commit().await.map_err(SqlColdError::from)?;
1556 Ok(all_receipts)
1557 }
1558}
1559
1560#[cfg(all(test, feature = "test-utils"))]
1561mod tests {
1562 use super::*;
1563 use signet_cold::conformance::conformance;
1564
1565 #[tokio::test]
1566 async fn sqlite_conformance() {
1567 let backend = SqlColdBackend::connect("sqlite::memory:").await.unwrap();
1568 conformance(backend).await.unwrap();
1569 }
1570
1571 #[tokio::test]
1572 async fn pg_conformance() {
1573 let Ok(url) = std::env::var("DATABASE_URL") else {
1574 eprintln!("skipping pg conformance: DATABASE_URL not set");
1575 return;
1576 };
1577 let backend = SqlColdBackend::connect(&url).await.unwrap();
1578 conformance(backend).await.unwrap();
1579 }
1580}