1use crate::SqlColdError;
8use crate::columns::{
9 COL_ACCESS_LIST, COL_ADDRESS, COL_AMOUNT, COL_AUTHORIZATION_LIST, COL_BASE_FEE_PER_GAS,
10 COL_BENEFICIARY, COL_BLOB_GAS_USED, COL_BLOB_VERSIONED_HASHES, COL_BLOCK_DATA_HASH,
11 COL_BLOCK_HASH, COL_BLOCK_LOG_INDEX, COL_BLOCK_NUMBER, COL_BLOCK_TIMESTAMP, COL_CHAIN_ID,
12 COL_CNT, COL_CUMULATIVE_GAS_USED, COL_DATA, COL_DIFFICULTY, COL_EVENT_TYPE,
13 COL_EXCESS_BLOB_GAS, COL_EXTRA_DATA, COL_FIRST_LOG_INDEX, COL_FROM_ADDRESS, COL_GAS,
14 COL_GAS_LIMIT, COL_GAS_PRICE, COL_GAS_USED, COL_HOST_BLOCK_NUMBER, COL_INPUT, COL_LOGS_BLOOM,
15 COL_MAX_BN, COL_MAX_FEE_PER_BLOB_GAS, COL_MAX_FEE_PER_GAS, COL_MAX_PRIORITY_FEE_PER_GAS,
16 COL_MIX_HASH, COL_NONCE, COL_OMMERS_HASH, COL_ORDER_INDEX, COL_PARENT_BEACON_BLOCK_ROOT,
17 COL_PARENT_HASH, COL_PRIOR_GAS, COL_R_CUMULATIVE_GAS_USED, COL_R_FIRST_LOG_INDEX,
18 COL_R_FROM_ADDRESS, COL_R_SUCCESS, COL_R_TX_HASH, COL_R_TX_TYPE, COL_RECEIPTS_ROOT,
19 COL_REQUESTS_HASH, COL_REWARD_ADDRESS, COL_ROLLUP_CHAIN_ID, COL_ROLLUP_RECIPIENT, COL_SENDER,
20 COL_SIG_R, COL_SIG_S, COL_SIG_Y_PARITY, COL_STATE_ROOT, COL_SUCCESS, COL_TIMESTAMP,
21 COL_TO_ADDRESS, COL_TOKEN, COL_TOPIC0, COL_TOPIC1, COL_TOPIC2, COL_TOPIC3,
22 COL_TRANSACTIONS_ROOT, COL_TX_HASH, COL_TX_INDEX, COL_TX_TYPE, COL_VALUE, COL_WITHDRAWALS_ROOT,
23};
24use crate::convert::{
25 EVENT_ENTER, EVENT_ENTER_TOKEN, EVENT_TRANSACT, build_receipt, decode_access_list_or_empty,
26 decode_authorization_list, decode_b256_vec, decode_u128_required, encode_access_list,
27 encode_authorization_list, encode_b256_vec, encode_u128, from_i64, to_i64,
28};
29use alloy::{
30 consensus::{
31 Header, Signed, TxEip1559, TxEip2930, TxEip4844, TxEip7702, TxLegacy, TxType,
32 transaction::Recovered,
33 },
34 primitives::{
35 Address, B256, BlockNumber, Bloom, Bytes, Log, LogData, Sealable, Signature, TxKind, U256,
36 },
37};
38use signet_cold::{
39 BlockData, ColdReceipt, ColdResult, ColdStorage, ColdStorageError, ColdStorageRead,
40 ColdStorageWrite, Confirmed, Filter, HeaderSpecifier, ReceiptSpecifier, RpcLog,
41 SignetEventsSpecifier, TransactionSpecifier, ZenithHeaderSpecifier,
42};
43use signet_storage_types::{
44 ConfirmationMeta, DbSignetEvent, DbZenithHeader, IndexedReceipt, RecoveredTx, SealedHeader,
45 TransactionSigned,
46};
47use signet_zenith::{
48 Passage::{Enter, EnterToken},
49 Transactor::Transact,
50 Zenith,
51};
52use sqlx::{AnyPool, Row};
53use std::collections::BTreeMap;
54
/// SQL-backed cold storage handle built on an sqlx [`AnyPool`].
///
/// Instances are created through `SqlColdBackend::new`, `connect` or
/// `connect_with`; `new` accepts pools whose backend reports itself as
/// `"PostgreSQL"` or `"SQLite"`.
#[derive(Debug, Clone)]
pub struct SqlColdBackend {
    /// Connection pool used for every query issued by this backend.
    pool: AnyPool,
    /// `true` when the pool's backend name is `"PostgreSQL"`.
    is_postgres: bool,
}
79
/// Returns `true` when `url` designates an in-memory SQLite database.
///
/// Recognised forms:
/// - `sqlite::memory:` and `sqlite://:memory:`, with or without a trailing
///   `?param=value` query string;
/// - any URL whose text contains `mode=memory`.
///
/// `connect_with` uses the result to cap the pool at a single connection.
fn is_in_memory_sqlite(url: &str) -> bool {
    // Kept as a plain substring test so every URL accepted before is still accepted.
    if url.contains("mode=memory") {
        return true;
    }

    // Strip the scheme; anything that is not a SQLite URL is not in-memory SQLite.
    let Some(rest) = url.strip_prefix("sqlite://").or_else(|| url.strip_prefix("sqlite:")) else {
        return false;
    };

    // The database name is everything before the optional query string.
    let database = rest.split('?').next().unwrap_or(rest);
    database == ":memory:"
}
88
89async fn delete_above_in_tx(
92 tx: &mut sqlx::Transaction<'_, sqlx::Any>,
93 bn: i64,
94) -> Result<(), SqlColdError> {
95 for table in ["logs", "transactions", "receipts", "signet_events", "zenith_headers", "headers"]
96 {
97 sqlx::query(&format!("DELETE FROM {table} WHERE block_number > $1"))
98 .bind(bn)
99 .execute(&mut **tx)
100 .await?;
101 }
102 Ok(())
103}
104
impl SqlColdBackend {
    /// Builds a backend on top of an existing pool and runs the initial migration.
    ///
    /// The backend name is read from a pooled connection to pick the matching
    /// migration script (`001_initial_pg.sql` for PostgreSQL, `001_initial.sql`
    /// for SQLite), which is then executed on the pool.
    ///
    /// # Errors
    ///
    /// Returns [`SqlColdError::Convert`] for any backend other than
    /// `"PostgreSQL"` or `"SQLite"`, and propagates database errors from
    /// acquiring the connection or running the migration.
    pub async fn new(pool: AnyPool) -> Result<Self, SqlColdError> {
        // A connection is only needed long enough to read the backend name.
        let conn = pool.acquire().await?;
        let backend = conn.backend_name().to_owned();
        drop(conn);

        let migration = match backend.as_str() {
            "PostgreSQL" => include_str!("../migrations/001_initial_pg.sql"),
            "SQLite" => include_str!("../migrations/001_initial.sql"),
            other => {
                return Err(SqlColdError::Convert(format!(
                    "unsupported database backend: {other}"
                )));
            }
        };
        let is_postgres = backend == "PostgreSQL";
        // NOTE(review): the migration runs on every construction, so the
        // scripts are presumably idempotent — confirm.
        sqlx::raw_sql(migration).execute(&pool).await?;
        Ok(Self { pool, is_postgres })
    }

    /// Connects to `url` using the supplied pool options and builds a backend.
    ///
    /// Installs sqlx's default `Any` drivers first. When `url` is recognised
    /// as an in-memory SQLite database, the pool is limited to one connection.
    ///
    /// # Errors
    ///
    /// Propagates connection errors and any error from [`Self::new`].
    pub async fn connect_with(
        url: &str,
        pool_opts: sqlx::pool::PoolOptions<sqlx::Any>,
    ) -> Result<Self, SqlColdError> {
        sqlx::any::install_default_drivers();
        // NOTE(review): presumably each extra connection to an in-memory
        // SQLite URL would see its own empty database — confirm.
        let pool_opts =
            if is_in_memory_sqlite(url) { pool_opts.max_connections(1) } else { pool_opts };
        let pool: AnyPool = pool_opts.connect(url).await?;
        Self::new(pool).await
    }

    /// Connects to `url` with default pool options; see [`Self::connect_with`].
    pub async fn connect(url: &str) -> Result<Self, SqlColdError> {
        Self::connect_with(url, sqlx::pool::PoolOptions::new()).await
    }

    /// Resolves a header specifier to a block number.
    ///
    /// A number is returned as-is without touching the database; a hash is
    /// looked up in `headers` and yields `None` when no row matches.
    async fn resolve_header_spec(
        &self,
        spec: HeaderSpecifier,
    ) -> Result<Option<BlockNumber>, SqlColdError> {
        match spec {
            HeaderSpecifier::Number(n) => Ok(Some(n)),
            HeaderSpecifier::Hash(hash) => {
                let row = sqlx::query("SELECT block_number FROM headers WHERE block_hash = $1")
                    .bind(hash)
                    .fetch_optional(&self.pool)
                    .await?;
                Ok(row.map(|r| from_i64(r.get::<i64, _>(COL_BLOCK_NUMBER))))
            }
        }
    }

    /// Loads the header stored for `block_num`, or `None` if there is no row.
    ///
    /// The seal is recomputed from the decoded header with `seal_slow`.
    async fn fetch_header_by_number(
        &self,
        block_num: BlockNumber,
    ) -> Result<Option<SealedHeader>, SqlColdError> {
        let bn = to_i64(block_num);
        let row = sqlx::query("SELECT * FROM headers WHERE block_number = $1")
            .bind(bn)
            .fetch_optional(&self.pool)
            .await?;

        row.map(|r| header_from_row(&r).map(|h| h.seal_slow())).transpose()
    }

    /// Writes one block and all of its associated rows in a single transaction.
    async fn insert_block(&self, data: BlockData) -> Result<(), SqlColdError> {
        let mut tx = self.pool.begin().await?;
        write_block_to_tx(&mut tx, data).await?;
        tx.commit().await?;
        Ok(())
    }

    /// Streams logs matching `filter` for blocks `from..=to` into `params.sender`.
    ///
    /// All reads happen inside one `REPEATABLE READ` transaction. Blocks are
    /// queried one at a time, ordered by `(tx_index, log_index)`. Failures are
    /// reported through the channel rather than returned:
    /// - a database error is wrapped with `ColdStorageError::backend`;
    /// - `StreamDeadlineExceeded` once `deadline` has passed;
    /// - `TooManyLogs` when more than `max_logs` rows are produced.
    ///
    /// The function returns silently if the receiver has been dropped.
    #[cfg(feature = "postgres")]
    async fn produce_log_stream_pg(&self, filter: &Filter, params: signet_cold::StreamParams) {
        use tokio_stream::StreamExt;

        // Unwraps a `Result`, or forwards the error through the channel and
        // ends the stream.
        macro_rules! try_stream {
            ($sender:expr, $expr:expr) => {
                match $expr {
                    Ok(v) => v,
                    Err(e) => {
                        let _ = $sender
                            .send(Err(ColdStorageError::backend(SqlColdError::from(e))))
                            .await;
                        return;
                    }
                }
            };
        }

        let signet_cold::StreamParams { from, to, max_logs, sender, deadline } = params;

        let mut tx = try_stream!(sender, self.pool.begin().await);
        try_stream!(
            sender,
            sqlx::query("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ").execute(&mut *tx).await
        );

        // `$1` is the block number, so filter placeholders start at `$2`.
        let (filter_clause, filter_params) = build_log_filter_clause(filter, 2);
        let data_sql = format!(
            "SELECT l.*, h.block_hash, h.timestamp AS block_timestamp, t.tx_hash, \
             (r.first_log_index + l.log_index) AS block_log_index \
             FROM logs l \
             JOIN headers h ON l.block_number = h.block_number \
             JOIN transactions t ON l.block_number = t.block_number \
             AND l.tx_index = t.tx_index \
             JOIN receipts r ON l.block_number = r.block_number \
             AND l.tx_index = r.tx_index \
             WHERE l.block_number = $1{filter_clause} \
             ORDER BY l.tx_index, l.log_index"
        );

        // Number of rows produced so far, across all blocks.
        let mut total = 0usize;

        for block_num in from..=to {
            // The deadline is checked once per block, before querying it.
            if tokio::time::Instant::now() > deadline {
                let _ = sender.send(Err(ColdStorageError::StreamDeadlineExceeded)).await;
                return;
            }

            let mut query = sqlx::query(&data_sql).bind(to_i64(block_num));
            for param in &filter_params {
                query = query.bind(*param);
            }

            let mut stream = query.fetch(&mut *tx);
            while let Some(row_result) = stream.next().await {
                let r = try_stream!(sender, row_result);

                // The row that pushes the count past `max_logs` is not sent;
                // the error is sent instead.
                total += 1;
                if total > max_logs {
                    let _ =
                        sender.send(Err(ColdStorageError::TooManyLogs { limit: max_logs })).await;
                    return;
                }

                let log = log_from_row(&r);
                let rpc_log = RpcLog {
                    inner: log,
                    block_hash: Some(r.get(COL_BLOCK_HASH)),
                    block_number: Some(from_i64(r.get::<i64, _>(COL_BLOCK_NUMBER))),
                    block_timestamp: Some(from_i64(r.get::<i64, _>(COL_BLOCK_TIMESTAMP))),
                    transaction_hash: Some(r.get(COL_TX_HASH)),
                    transaction_index: Some(from_i64(r.get::<i64, _>(COL_TX_INDEX))),
                    log_index: Some(from_i64(r.get::<i64, _>(COL_BLOCK_LOG_INDEX))),
                    removed: false,
                };
                // The send itself is bounded by the deadline.
                match tokio::time::timeout_at(deadline, sender.send(Ok(rpc_log))).await {
                    Ok(Ok(())) => {}
                    // Send failed: the receiver is gone, nobody is listening.
                    Ok(Err(_)) => return,
                    Err(_) => {
                        // NOTE(review): this follow-up send is not bounded by
                        // a timeout; if the send timed out because the channel
                        // is full it may wait until the receiver drains or
                        // drops — confirm this is acceptable.
                        let _ = sender.send(Err(ColdStorageError::StreamDeadlineExceeded)).await;
                        return;
                    }
                }
            }

            // Stop once exactly `max_logs` rows have been sent; later blocks
            // are not queried.
            // NOTE(review): matching logs in later blocks are dropped without
            // a `TooManyLogs` error — confirm this is intended.
            if total >= max_logs {
                return;
            }
        }
    }
}
327
328fn blob<'r>(r: &'r sqlx::any::AnyRow, col: &str) -> &'r [u8] {
334 r.get(col)
335}
336
337fn opt_blob<'r>(r: &'r sqlx::any::AnyRow, col: &str) -> Option<&'r [u8]> {
339 r.get(col)
340}
341
342fn header_from_row(r: &sqlx::any::AnyRow) -> Result<Header, SqlColdError> {
344 Ok(Header {
345 parent_hash: r.get(COL_PARENT_HASH),
346 ommers_hash: r.get(COL_OMMERS_HASH),
347 beneficiary: r.get(COL_BENEFICIARY),
348 state_root: r.get(COL_STATE_ROOT),
349 transactions_root: r.get(COL_TRANSACTIONS_ROOT),
350 receipts_root: r.get(COL_RECEIPTS_ROOT),
351 logs_bloom: Bloom::from_slice(blob(r, COL_LOGS_BLOOM)),
352 difficulty: r.get(COL_DIFFICULTY),
353 number: from_i64(r.get(COL_BLOCK_NUMBER)),
354 gas_limit: from_i64(r.get(COL_GAS_LIMIT)),
355 gas_used: from_i64(r.get(COL_GAS_USED)),
356 timestamp: from_i64(r.get(COL_TIMESTAMP)),
357 extra_data: r.get(COL_EXTRA_DATA),
358 mix_hash: r.get(COL_MIX_HASH),
359 nonce: r.get(COL_NONCE),
360 base_fee_per_gas: r.get::<Option<i64>, _>(COL_BASE_FEE_PER_GAS).map(from_i64),
361 withdrawals_root: r.get(COL_WITHDRAWALS_ROOT),
362 blob_gas_used: r.get::<Option<i64>, _>(COL_BLOB_GAS_USED).map(from_i64),
363 excess_blob_gas: r.get::<Option<i64>, _>(COL_EXCESS_BLOB_GAS).map(from_i64),
364 parent_beacon_block_root: r.get(COL_PARENT_BEACON_BLOCK_ROOT),
365 requests_hash: r.get(COL_REQUESTS_HASH),
366 })
367}
368
369fn tx_from_row(r: &sqlx::any::AnyRow) -> Result<TransactionSigned, SqlColdError> {
371 use alloy::consensus::EthereumTxEnvelope;
372
373 let sig =
374 Signature::new(r.get(COL_SIG_R), r.get(COL_SIG_S), r.get::<i32, _>(COL_SIG_Y_PARITY) != 0);
375
376 let tx_type_raw: i32 = r.get(COL_TX_TYPE);
377 let tx_type_u8: u8 = tx_type_raw
378 .try_into()
379 .map_err(|_| SqlColdError::Convert(format!("tx_type out of u8 range: {tx_type_raw}")))?;
380 let tx_type = TxType::try_from(tx_type_u8)
381 .map_err(|_| SqlColdError::Convert(format!("invalid tx_type: {tx_type_u8}")))?;
382
383 let chain_id: Option<i64> = r.get(COL_CHAIN_ID);
384 let nonce = from_i64(r.get(COL_NONCE));
385 let gas_limit = from_i64(r.get(COL_GAS_LIMIT));
386 let to_addr: Option<Address> = r.get(COL_TO_ADDRESS);
387 let value: U256 = r.get(COL_VALUE);
388 let input: Bytes = r.get(COL_INPUT);
389
390 match tx_type {
391 TxType::Legacy => {
392 let tx = TxLegacy {
393 chain_id: chain_id.map(from_i64),
394 nonce,
395 gas_price: decode_u128_required(opt_blob(r, COL_GAS_PRICE), COL_GAS_PRICE)?,
396 gas_limit,
397 to: to_addr.map_or(TxKind::Create, TxKind::Call),
398 value,
399 input,
400 };
401 Ok(EthereumTxEnvelope::Legacy(Signed::new_unhashed(tx, sig)))
402 }
403 TxType::Eip2930 => {
404 let tx = TxEip2930 {
405 chain_id: from_i64(
406 chain_id
407 .ok_or_else(|| SqlColdError::Convert("EIP2930 requires chain_id".into()))?,
408 ),
409 nonce,
410 gas_price: decode_u128_required(opt_blob(r, COL_GAS_PRICE), COL_GAS_PRICE)?,
411 gas_limit,
412 to: to_addr.map_or(TxKind::Create, TxKind::Call),
413 value,
414 input,
415 access_list: decode_access_list_or_empty(opt_blob(r, COL_ACCESS_LIST))?,
416 };
417 Ok(EthereumTxEnvelope::Eip2930(Signed::new_unhashed(tx, sig)))
418 }
419 TxType::Eip1559 => {
420 let tx = TxEip1559 {
421 chain_id: from_i64(
422 chain_id
423 .ok_or_else(|| SqlColdError::Convert("EIP1559 requires chain_id".into()))?,
424 ),
425 nonce,
426 gas_limit,
427 max_fee_per_gas: decode_u128_required(
428 opt_blob(r, COL_MAX_FEE_PER_GAS),
429 COL_MAX_FEE_PER_GAS,
430 )?,
431 max_priority_fee_per_gas: decode_u128_required(
432 opt_blob(r, COL_MAX_PRIORITY_FEE_PER_GAS),
433 COL_MAX_PRIORITY_FEE_PER_GAS,
434 )?,
435 to: to_addr.map_or(TxKind::Create, TxKind::Call),
436 value,
437 input,
438 access_list: decode_access_list_or_empty(opt_blob(r, COL_ACCESS_LIST))?,
439 };
440 Ok(EthereumTxEnvelope::Eip1559(Signed::new_unhashed(tx, sig)))
441 }
442 TxType::Eip4844 => {
443 let tx = TxEip4844 {
444 chain_id: from_i64(
445 chain_id
446 .ok_or_else(|| SqlColdError::Convert("EIP4844 requires chain_id".into()))?,
447 ),
448 nonce,
449 gas_limit,
450 max_fee_per_gas: decode_u128_required(
451 opt_blob(r, COL_MAX_FEE_PER_GAS),
452 COL_MAX_FEE_PER_GAS,
453 )?,
454 max_priority_fee_per_gas: decode_u128_required(
455 opt_blob(r, COL_MAX_PRIORITY_FEE_PER_GAS),
456 COL_MAX_PRIORITY_FEE_PER_GAS,
457 )?,
458 to: to_addr
459 .ok_or_else(|| SqlColdError::Convert("EIP4844 requires to_address".into()))?,
460 value,
461 input,
462 access_list: decode_access_list_or_empty(opt_blob(r, COL_ACCESS_LIST))?,
463 blob_versioned_hashes: decode_b256_vec(
464 opt_blob(r, COL_BLOB_VERSIONED_HASHES).ok_or_else(|| {
465 SqlColdError::Convert("EIP4844 requires blob_versioned_hashes".into())
466 })?,
467 )?,
468 max_fee_per_blob_gas: decode_u128_required(
469 opt_blob(r, COL_MAX_FEE_PER_BLOB_GAS),
470 COL_MAX_FEE_PER_BLOB_GAS,
471 )?,
472 };
473 Ok(EthereumTxEnvelope::Eip4844(Signed::new_unhashed(tx, sig)))
474 }
475 TxType::Eip7702 => {
476 let tx = TxEip7702 {
477 chain_id: from_i64(
478 chain_id
479 .ok_or_else(|| SqlColdError::Convert("EIP7702 requires chain_id".into()))?,
480 ),
481 nonce,
482 gas_limit,
483 max_fee_per_gas: decode_u128_required(
484 opt_blob(r, COL_MAX_FEE_PER_GAS),
485 COL_MAX_FEE_PER_GAS,
486 )?,
487 max_priority_fee_per_gas: decode_u128_required(
488 opt_blob(r, COL_MAX_PRIORITY_FEE_PER_GAS),
489 COL_MAX_PRIORITY_FEE_PER_GAS,
490 )?,
491 to: to_addr
492 .ok_or_else(|| SqlColdError::Convert("EIP7702 requires to_address".into()))?,
493 value,
494 input,
495 access_list: decode_access_list_or_empty(opt_blob(r, COL_ACCESS_LIST))?,
496 authorization_list: decode_authorization_list(
497 opt_blob(r, COL_AUTHORIZATION_LIST).ok_or_else(|| {
498 SqlColdError::Convert("EIP7702 requires authorization_list".into())
499 })?,
500 )?,
501 };
502 Ok(EthereumTxEnvelope::Eip7702(Signed::new_unhashed(tx, sig)))
503 }
504 }
505}
506
507fn recovered_tx_from_row(r: &sqlx::any::AnyRow) -> Result<RecoveredTx, SqlColdError> {
509 let sender: Address = r.get(COL_FROM_ADDRESS);
510 let tx = tx_from_row(r)?;
511 Ok(Recovered::new_unchecked(tx, sender))
513}
514
515fn log_from_row(r: &sqlx::any::AnyRow) -> Log {
517 let topics = [COL_TOPIC0, COL_TOPIC1, COL_TOPIC2, COL_TOPIC3]
518 .into_iter()
519 .filter_map(|col| r.get::<Option<B256>, _>(col))
520 .collect();
521 Log { address: r.get(COL_ADDRESS), data: LogData::new_unchecked(topics, r.get(COL_DATA)) }
522}
523
524fn signet_event_from_row(r: &sqlx::any::AnyRow) -> Result<DbSignetEvent, SqlColdError> {
526 let event_type = r.get::<i32, _>(COL_EVENT_TYPE) as i16;
527 let order = from_i64(r.get(COL_ORDER_INDEX));
528 let rollup_chain_id: U256 = r.get(COL_ROLLUP_CHAIN_ID);
529
530 match event_type {
531 EVENT_TRANSACT => {
532 let sender: Address = r
533 .get::<Option<Address>, _>(COL_SENDER)
534 .ok_or_else(|| SqlColdError::Convert("Transact requires sender".into()))?;
535 let to: Address = r
536 .get::<Option<Address>, _>(COL_TO_ADDRESS)
537 .ok_or_else(|| SqlColdError::Convert("Transact requires to".into()))?;
538 let value: U256 = r
539 .get::<Option<U256>, _>(COL_VALUE)
540 .ok_or_else(|| SqlColdError::Convert("Transact requires value".into()))?;
541 let gas: U256 = r
542 .get::<Option<U256>, _>(COL_GAS)
543 .ok_or_else(|| SqlColdError::Convert("Transact requires gas".into()))?;
544 let max_fee: U256 = r
545 .get::<Option<U256>, _>(COL_MAX_FEE_PER_GAS)
546 .ok_or_else(|| SqlColdError::Convert("Transact requires max_fee_per_gas".into()))?;
547 let data: Bytes = r.get::<Option<Bytes>, _>(COL_DATA).unwrap_or_default();
548
549 Ok(DbSignetEvent::Transact(
550 order,
551 Transact {
552 rollupChainId: rollup_chain_id,
553 sender,
554 to,
555 value,
556 gas,
557 maxFeePerGas: max_fee,
558 data,
559 },
560 ))
561 }
562 EVENT_ENTER => {
563 let recipient: Address = r
564 .get::<Option<Address>, _>(COL_ROLLUP_RECIPIENT)
565 .ok_or_else(|| SqlColdError::Convert("Enter requires rollup_recipient".into()))?;
566 let amount: U256 = r
567 .get::<Option<U256>, _>(COL_AMOUNT)
568 .ok_or_else(|| SqlColdError::Convert("Enter requires amount".into()))?;
569
570 Ok(DbSignetEvent::Enter(
571 order,
572 Enter { rollupChainId: rollup_chain_id, rollupRecipient: recipient, amount },
573 ))
574 }
575 EVENT_ENTER_TOKEN => {
576 let token: Address = r
577 .get::<Option<Address>, _>(COL_TOKEN)
578 .ok_or_else(|| SqlColdError::Convert("EnterToken requires token".into()))?;
579 let recipient: Address =
580 r.get::<Option<Address>, _>(COL_ROLLUP_RECIPIENT).ok_or_else(|| {
581 SqlColdError::Convert("EnterToken requires rollup_recipient".into())
582 })?;
583 let amount: U256 = r
584 .get::<Option<U256>, _>(COL_AMOUNT)
585 .ok_or_else(|| SqlColdError::Convert("EnterToken requires amount".into()))?;
586
587 Ok(DbSignetEvent::EnterToken(
588 order,
589 EnterToken {
590 rollupChainId: rollup_chain_id,
591 token,
592 rollupRecipient: recipient,
593 amount,
594 },
595 ))
596 }
597 _ => Err(SqlColdError::Convert(format!("invalid event_type: {event_type}"))),
598 }
599}
600
601fn zenith_header_from_row(r: &sqlx::any::AnyRow) -> Result<DbZenithHeader, SqlColdError> {
603 Ok(DbZenithHeader(Zenith::BlockHeader {
604 hostBlockNumber: r.get(COL_HOST_BLOCK_NUMBER),
605 rollupChainId: r.get(COL_ROLLUP_CHAIN_ID),
606 gasLimit: r.get(COL_GAS_LIMIT),
607 rewardAddress: r.get(COL_REWARD_ADDRESS),
608 blockDataHash: r.get(COL_BLOCK_DATA_HASH),
609 }))
610}
611
/// Writes every row belonging to one block into the open transaction `tx`.
///
/// Rows are inserted in this order: the header, each transaction, each
/// receipt followed by its logs, each signet event, and finally the zenith
/// header when one is present. Nothing is committed here; the caller owns
/// the transaction.
///
/// # Errors
///
/// Returns the first database error encountered; later statements are not
/// executed.
async fn write_block_to_tx(
    tx: &mut sqlx::Transaction<'_, sqlx::Any>,
    data: BlockData,
) -> Result<(), SqlColdError> {
    let bn = to_i64(data.block_number());

    // The block hash is computed from the header, not supplied by the caller.
    let block_hash = data.header.hash_slow();
    let difficulty = &data.header.difficulty;
    // The bind order below must match the column list in the statement.
    sqlx::query(
        "INSERT INTO headers (
            block_number, block_hash, parent_hash, ommers_hash, beneficiary,
            state_root, transactions_root, receipts_root, logs_bloom, difficulty,
            gas_limit, gas_used, timestamp, extra_data, mix_hash, nonce,
            base_fee_per_gas, withdrawals_root, blob_gas_used, excess_blob_gas,
            parent_beacon_block_root, requests_hash
        ) VALUES (
            $1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
            $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22
        )",
    )
    .bind(bn)
    .bind(block_hash)
    .bind(data.header.parent_hash)
    .bind(data.header.ommers_hash)
    .bind(data.header.beneficiary)
    .bind(data.header.state_root)
    .bind(data.header.transactions_root)
    .bind(data.header.receipts_root)
    .bind(data.header.logs_bloom)
    .bind(difficulty)
    .bind(to_i64(data.header.gas_limit))
    .bind(to_i64(data.header.gas_used))
    .bind(to_i64(data.header.timestamp))
    .bind(&data.header.extra_data)
    .bind(data.header.mix_hash)
    .bind(data.header.nonce)
    .bind(data.header.base_fee_per_gas.map(to_i64))
    .bind(data.header.withdrawals_root.as_ref())
    .bind(data.header.blob_gas_used.map(to_i64))
    .bind(data.header.excess_blob_gas.map(to_i64))
    .bind(data.header.parent_beacon_block_root.as_ref())
    .bind(data.header.requests_hash.as_ref())
    .execute(&mut **tx)
    .await?;

    // A transaction's position in the block becomes its `tx_index`.
    for (idx, recovered_tx) in data.transactions.iter().enumerate() {
        insert_transaction(tx, bn, to_i64(idx as u64), recovered_tx).await?;
    }

    // Running count of logs in all earlier receipts of this block; stored per
    // receipt so a block-wide log index can be derived as
    // `first_log_index + log_index`.
    let mut first_log_index = 0i64;
    for (idx, receipt) in data.receipts.iter().enumerate() {
        let tx_idx = to_i64(idx as u64);
        sqlx::query(
            "INSERT INTO receipts (block_number, tx_index, tx_type, success, cumulative_gas_used, first_log_index)
             VALUES ($1, $2, $3, $4, $5, $6)",
        )
        .bind(bn)
        .bind(tx_idx)
        .bind(receipt.tx_type as i32)
        .bind(receipt.inner.status.coerce_status() as i32)
        .bind(to_i64(receipt.inner.cumulative_gas_used))
        .bind(first_log_index)
        .execute(&mut **tx)
        .await?;
        first_log_index += receipt.inner.logs.len() as i64;

        // `log_index` is the position within this receipt, not within the block.
        for (log_idx, log) in receipt.inner.logs.iter().enumerate() {
            let topics = log.topics();
            sqlx::query(
                "INSERT INTO logs (block_number, tx_index, log_index, address, topic0, topic1, topic2, topic3, data)
                 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
            )
            .bind(bn)
            .bind(tx_idx)
            .bind(to_i64(log_idx as u64))
            .bind(log.address)
            // Absent topics bind as `None`.
            .bind(topics.first())
            .bind(topics.get(1))
            .bind(topics.get(2))
            .bind(topics.get(3))
            .bind(&log.data.data)
            .execute(&mut **tx)
            .await?;
        }
    }

    // An event's position in the block becomes its `event_index`.
    for (idx, event) in data.signet_events.iter().enumerate() {
        insert_signet_event(tx, bn, to_i64(idx as u64), event).await?;
    }

    // The zenith header is optional; no row is written when it is absent.
    if let Some(zh) = &data.zenith_header {
        let h = &zh.0;
        sqlx::query(
            "INSERT INTO zenith_headers (
                block_number, host_block_number, rollup_chain_id,
                gas_limit, reward_address, block_data_hash
            ) VALUES ($1, $2, $3, $4, $5, $6)",
        )
        .bind(bn)
        .bind(h.hostBlockNumber)
        .bind(h.rollupChainId)
        .bind(h.gasLimit)
        .bind(h.rewardAddress)
        .bind(h.blockDataHash)
        .execute(&mut **tx)
        .await?;
    }

    Ok(())
}
733
/// Inserts one transaction row at position `tx_index` of block `bn`.
///
/// Fields common to every envelope variant are stored in dedicated columns.
/// Variant-specific fields (fee fields, access list, blob hashes,
/// authorization list) are encoded with the `encode_*` helpers and stored as
/// `None` for variants that do not carry them. The sender comes from
/// `recovered.signer()` and is stored in `from_address`.
///
/// # Errors
///
/// Returns the database error if the insert fails.
async fn insert_transaction(
    conn: &mut sqlx::AnyConnection,
    bn: i64,
    tx_index: i64,
    recovered: &RecoveredTx,
) -> Result<(), SqlColdError> {
    use alloy::consensus::EthereumTxEnvelope;

    let sender = recovered.signer();
    let tx: &TransactionSigned = recovered;
    let tx_hash = tx.tx_hash();
    let tx_type = tx.tx_type() as i32;

    // Extracts `(y_parity, r, s)` from a signed transaction of any variant.
    macro_rules! sig {
        ($s:expr) => {{
            let sig = $s.signature();
            (sig.v() as i32, sig.r(), sig.s())
        }};
    }
    let (sig_y, sig_r, sig_s) = match tx {
        EthereumTxEnvelope::Legacy(s) => sig!(s),
        EthereumTxEnvelope::Eip2930(s) => sig!(s),
        EthereumTxEnvelope::Eip1559(s) => sig!(s),
        EthereumTxEnvelope::Eip4844(s) => sig!(s),
        EthereumTxEnvelope::Eip7702(s) => sig!(s),
    };

    // Only legacy transactions may lack a chain id.
    let (chain_id, nonce, gas_limit) = match tx {
        EthereumTxEnvelope::Legacy(s) => {
            (s.tx().chain_id.map(to_i64), to_i64(s.tx().nonce), to_i64(s.tx().gas_limit))
        }
        EthereumTxEnvelope::Eip2930(s) => {
            (Some(to_i64(s.tx().chain_id)), to_i64(s.tx().nonce), to_i64(s.tx().gas_limit))
        }
        EthereumTxEnvelope::Eip1559(s) => {
            (Some(to_i64(s.tx().chain_id)), to_i64(s.tx().nonce), to_i64(s.tx().gas_limit))
        }
        EthereumTxEnvelope::Eip4844(s) => {
            (Some(to_i64(s.tx().chain_id)), to_i64(s.tx().nonce), to_i64(s.tx().gas_limit))
        }
        EthereumTxEnvelope::Eip7702(s) => {
            (Some(to_i64(s.tx().chain_id)), to_i64(s.tx().nonce), to_i64(s.tx().gas_limit))
        }
    };

    // For the first three variants `to` is a `TxKind`, so `.to()` yields an
    // optional address; EIP-4844 and EIP-7702 always carry an address.
    let (value, to_addr) = match tx {
        EthereumTxEnvelope::Legacy(s) => (s.tx().value, s.tx().to.to()),
        EthereumTxEnvelope::Eip2930(s) => (s.tx().value, s.tx().to.to()),
        EthereumTxEnvelope::Eip1559(s) => (s.tx().value, s.tx().to.to()),
        EthereumTxEnvelope::Eip4844(s) => (s.tx().value, Some(&s.tx().to)),
        EthereumTxEnvelope::Eip7702(s) => (s.tx().value, Some(&s.tx().to)),
    };

    let input: &[u8] = match tx {
        EthereumTxEnvelope::Legacy(s) => s.tx().input.as_ref(),
        EthereumTxEnvelope::Eip2930(s) => s.tx().input.as_ref(),
        EthereumTxEnvelope::Eip1559(s) => s.tx().input.as_ref(),
        EthereumTxEnvelope::Eip4844(s) => s.tx().input.as_ref(),
        EthereumTxEnvelope::Eip7702(s) => s.tx().input.as_ref(),
    };

    // Fee columns: `gas_price` for Legacy/EIP-2930, the max-fee pair for the
    // later variants, and the blob fee only for EIP-4844.
    let (gas_price, max_fee, max_priority_fee, max_blob_fee) = match tx {
        EthereumTxEnvelope::Legacy(s) => (Some(encode_u128(s.tx().gas_price)), None, None, None),
        EthereumTxEnvelope::Eip2930(s) => (Some(encode_u128(s.tx().gas_price)), None, None, None),
        EthereumTxEnvelope::Eip1559(s) => (
            None,
            Some(encode_u128(s.tx().max_fee_per_gas)),
            Some(encode_u128(s.tx().max_priority_fee_per_gas)),
            None,
        ),
        EthereumTxEnvelope::Eip4844(s) => (
            None,
            Some(encode_u128(s.tx().max_fee_per_gas)),
            Some(encode_u128(s.tx().max_priority_fee_per_gas)),
            Some(encode_u128(s.tx().max_fee_per_blob_gas)),
        ),
        EthereumTxEnvelope::Eip7702(s) => (
            None,
            Some(encode_u128(s.tx().max_fee_per_gas)),
            Some(encode_u128(s.tx().max_priority_fee_per_gas)),
            None,
        ),
    };

    // List-valued columns; legacy transactions have none of them.
    let (access_list, blob_hashes, auth_list) = match tx {
        EthereumTxEnvelope::Legacy(_) => (None, None, None),
        EthereumTxEnvelope::Eip2930(s) => {
            (Some(encode_access_list(&s.tx().access_list)), None, None)
        }
        EthereumTxEnvelope::Eip1559(s) => {
            (Some(encode_access_list(&s.tx().access_list)), None, None)
        }
        EthereumTxEnvelope::Eip4844(s) => (
            Some(encode_access_list(&s.tx().access_list)),
            Some(encode_b256_vec(&s.tx().blob_versioned_hashes)),
            None,
        ),
        EthereumTxEnvelope::Eip7702(s) => (
            Some(encode_access_list(&s.tx().access_list)),
            None,
            Some(encode_authorization_list(&s.tx().authorization_list)),
        ),
    };

    // The bind order below must match the column list in the statement.
    sqlx::query(
        "INSERT INTO transactions (
            block_number, tx_index, tx_hash, tx_type,
            sig_y_parity, sig_r, sig_s,
            chain_id, nonce, gas_limit, to_address, value, input,
            gas_price, max_fee_per_gas, max_priority_fee_per_gas,
            max_fee_per_blob_gas, blob_versioned_hashes,
            access_list, authorization_list, from_address
        ) VALUES (
            $1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
            $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21
        )",
    )
    .bind(bn)
    .bind(tx_index)
    .bind(tx_hash)
    .bind(tx_type)
    .bind(sig_y)
    .bind(sig_r)
    .bind(sig_s)
    .bind(chain_id)
    .bind(nonce)
    .bind(gas_limit)
    .bind(to_addr)
    .bind(value)
    .bind(input)
    .bind(gas_price.as_ref().map(|v| v.as_slice()))
    .bind(max_fee.as_ref().map(|v| v.as_slice()))
    .bind(max_priority_fee.as_ref().map(|v| v.as_slice()))
    .bind(max_blob_fee.as_ref().map(|v| v.as_slice()))
    .bind(blob_hashes.as_deref())
    .bind(access_list.as_deref())
    .bind(auth_list.as_deref())
    .bind(sender)
    .execute(&mut *conn)
    .await?;

    Ok(())
}
878
879async fn insert_signet_event(
881 conn: &mut sqlx::AnyConnection,
882 block_number: i64,
883 event_index: i64,
884 event: &DbSignetEvent,
885) -> Result<(), SqlColdError> {
886 let (event_type, order, chain_id) = match event {
887 DbSignetEvent::Transact(o, t) => (0i32, to_i64(*o), &t.rollupChainId),
888 DbSignetEvent::Enter(o, e) => (1i32, to_i64(*o), &e.rollupChainId),
889 DbSignetEvent::EnterToken(o, e) => (2i32, to_i64(*o), &e.rollupChainId),
890 };
891
892 let (value, gas, max_fee, amount) = match event {
893 DbSignetEvent::Transact(_, t) => {
894 (Some(&t.value), Some(&t.gas), Some(&t.maxFeePerGas), None)
895 }
896 DbSignetEvent::Enter(_, e) => (None, None, None, Some(&e.amount)),
897 DbSignetEvent::EnterToken(_, e) => (None, None, None, Some(&e.amount)),
898 };
899
900 sqlx::query(
901 "INSERT INTO signet_events (
902 block_number, event_index, event_type, order_index,
903 rollup_chain_id, sender, to_address, value, gas,
904 max_fee_per_gas, data, rollup_recipient, amount, token
905 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)",
906 )
907 .bind(block_number)
908 .bind(event_index)
909 .bind(event_type)
910 .bind(order)
911 .bind(chain_id)
912 .bind(match event {
913 DbSignetEvent::Transact(_, t) => Some(&t.sender),
914 _ => None,
915 })
916 .bind(match event {
917 DbSignetEvent::Transact(_, t) => Some(&t.to),
918 _ => None,
919 })
920 .bind(value)
921 .bind(gas)
922 .bind(max_fee)
923 .bind(match event {
924 DbSignetEvent::Transact(_, t) => Some(&t.data),
925 _ => None,
926 })
927 .bind(match event {
928 DbSignetEvent::Enter(_, e) => Some(&e.rollupRecipient),
929 DbSignetEvent::EnterToken(_, e) => Some(&e.rollupRecipient),
930 _ => None,
931 })
932 .bind(amount)
933 .bind(match event {
934 DbSignetEvent::EnterToken(_, e) => Some(&e.token),
935 _ => None,
936 })
937 .execute(&mut *conn)
938 .await?;
939
940 Ok(())
941}
942
/// Appends an `AND` condition on `column` to `clause` and queues its bind values.
///
/// - No values: nothing is appended and `idx` is returned unchanged, so an
///   empty set means "no constraint" instead of producing `IN ()`.
/// - One value: appends ` AND {column} = ${idx}`.
/// - Several values: appends ` AND {column} IN (${idx}, ${idx+1}, ...)`.
///
/// Each value is pushed onto `params` in the order its placeholder appears.
///
/// # Parameters
///
/// - `clause`: SQL fragment being built; appended to in place.
/// - `params`: bind values collected so far; appended to in place.
/// - `idx`: number to use for the first placeholder emitted.
/// - `column`: column expression, inserted into the SQL verbatim (callers
///   pass fixed literals, never user input).
/// - `values`: the values to match against.
///
/// # Returns
///
/// The next unused placeholder number.
fn append_filter_clause<'a>(
    clause: &mut String,
    params: &mut Vec<&'a [u8]>,
    mut idx: u32,
    column: &str,
    values: impl ExactSizeIterator<Item = &'a [u8]>,
) -> u32 {
    use std::fmt::Write;

    let len = values.len();
    if len == 0 {
        // `IN ()` is rejected by PostgreSQL; emit no condition at all.
        return idx;
    }

    params.reserve(len);

    // `write!` into a `String` cannot fail, hence the `unwrap`s below.
    if len == 1 {
        write!(clause, " AND {column} = ${idx}").unwrap();
        params.extend(values);
        return idx + 1;
    }

    write!(clause, " AND {column} IN (").unwrap();
    for (i, v) in values.enumerate() {
        if i > 0 {
            clause.push_str(", ");
        }
        write!(clause, "${idx}").unwrap();
        params.push(v);
        idx += 1;
    }
    clause.push(')');
    idx
}
979
980fn build_log_filter_clause(filter: &Filter, start_idx: u32) -> (String, Vec<&[u8]>) {
981 let mut clause = String::new();
982 let mut params: Vec<&[u8]> = Vec::new();
983 let mut idx = start_idx;
984
985 if !filter.address.is_empty() {
986 idx = append_filter_clause(
987 &mut clause,
988 &mut params,
989 idx,
990 "l.address",
991 filter.address.iter().map(|a| a.as_slice()),
992 );
993 }
994
995 let topic_cols = ["l.topic0", "l.topic1", "l.topic2", "l.topic3"];
996 for (i, topic_filter) in filter.topics.iter().enumerate() {
997 if topic_filter.is_empty() {
998 continue;
999 }
1000 idx = append_filter_clause(
1001 &mut clause,
1002 &mut params,
1003 idx,
1004 topic_cols[i],
1005 topic_filter.iter().map(|v| v.as_slice()),
1006 );
1007 }
1008
1009 (clause, params)
1010}
1011
1012impl ColdStorageRead for SqlColdBackend {
1017 async fn get_header(&self, spec: HeaderSpecifier) -> ColdResult<Option<SealedHeader>> {
1018 let Some(block_num) = self.resolve_header_spec(spec).await? else {
1019 return Ok(None);
1020 };
1021 self.fetch_header_by_number(block_num).await.map_err(ColdStorageError::from)
1022 }
1023
1024 async fn get_headers(
1025 &self,
1026 specs: Vec<HeaderSpecifier>,
1027 ) -> ColdResult<Vec<Option<SealedHeader>>> {
1028 let mut results = Vec::with_capacity(specs.len());
1029 for spec in specs {
1030 let header = self.get_header(spec).await?;
1031 results.push(header);
1032 }
1033 Ok(results)
1034 }
1035
1036 async fn get_transaction(
1037 &self,
1038 spec: TransactionSpecifier,
1039 ) -> ColdResult<Option<Confirmed<RecoveredTx>>> {
1040 let row = match spec {
1041 TransactionSpecifier::Hash(hash) => sqlx::query(
1042 "SELECT t.*, h.block_hash
1043 FROM transactions t
1044 JOIN headers h ON t.block_number = h.block_number
1045 WHERE t.tx_hash = $1",
1046 )
1047 .bind(hash)
1048 .fetch_optional(&self.pool)
1049 .await
1050 .map_err(SqlColdError::from)?,
1051 TransactionSpecifier::BlockAndIndex { block, index } => sqlx::query(
1052 "SELECT t.*, h.block_hash
1053 FROM transactions t
1054 JOIN headers h ON t.block_number = h.block_number
1055 WHERE t.block_number = $1 AND t.tx_index = $2",
1056 )
1057 .bind(to_i64(block))
1058 .bind(to_i64(index))
1059 .fetch_optional(&self.pool)
1060 .await
1061 .map_err(SqlColdError::from)?,
1062 TransactionSpecifier::BlockHashAndIndex { block_hash, index } => sqlx::query(
1063 "SELECT t.*, h.block_hash
1064 FROM transactions t
1065 JOIN headers h ON t.block_number = h.block_number
1066 WHERE h.block_hash = $1 AND t.tx_index = $2",
1067 )
1068 .bind(block_hash)
1069 .bind(to_i64(index))
1070 .fetch_optional(&self.pool)
1071 .await
1072 .map_err(SqlColdError::from)?,
1073 };
1074
1075 let Some(r) = row else {
1076 return Ok(None);
1077 };
1078
1079 let block = from_i64(r.get::<i64, _>(COL_BLOCK_NUMBER));
1080 let index = from_i64(r.get::<i64, _>(COL_TX_INDEX));
1081 let block_hash = r.get(COL_BLOCK_HASH);
1082 let recovered = recovered_tx_from_row(&r).map_err(ColdStorageError::from)?;
1083 let meta = ConfirmationMeta::new(block, block_hash, index);
1084 Ok(Some(Confirmed::new(recovered, meta)))
1085 }
1086
1087 async fn get_transactions_in_block(&self, block: BlockNumber) -> ColdResult<Vec<RecoveredTx>> {
1088 let bn = to_i64(block);
1089 let rows =
1090 sqlx::query("SELECT * FROM transactions WHERE block_number = $1 ORDER BY tx_index")
1091 .bind(bn)
1092 .fetch_all(&self.pool)
1093 .await
1094 .map_err(SqlColdError::from)?;
1095
1096 rows.iter().map(|r| recovered_tx_from_row(r).map_err(ColdStorageError::from)).collect()
1097 }
1098
1099 async fn get_transaction_count(&self, block: BlockNumber) -> ColdResult<u64> {
1100 let bn = to_i64(block);
1101 let row = sqlx::query("SELECT COUNT(*) as cnt FROM transactions WHERE block_number = $1")
1102 .bind(bn)
1103 .fetch_one(&self.pool)
1104 .await
1105 .map_err(SqlColdError::from)?;
1106
1107 Ok(from_i64(row.get::<i64, _>(COL_CNT)))
1108 }
1109
1110 async fn get_receipt(&self, spec: ReceiptSpecifier) -> ColdResult<Option<ColdReceipt>> {
1111 let (block, index) = match spec {
1113 ReceiptSpecifier::TxHash(hash) => {
1114 let row = sqlx::query(
1115 "SELECT block_number, tx_index FROM transactions WHERE tx_hash = $1",
1116 )
1117 .bind(hash)
1118 .fetch_optional(&self.pool)
1119 .await
1120 .map_err(SqlColdError::from)?;
1121 let Some(r) = row else { return Ok(None) };
1122 (
1123 from_i64(r.get::<i64, _>(COL_BLOCK_NUMBER)),
1124 from_i64(r.get::<i64, _>(COL_TX_INDEX)),
1125 )
1126 }
1127 ReceiptSpecifier::BlockAndIndex { block, index } => (block, index),
1128 };
1129
1130 let combined = sqlx::query(
1134 "SELECT h.*, \
1135 r.tx_type AS r_tx_type, r.success AS r_success, \
1136 r.cumulative_gas_used AS r_cumulative_gas_used, \
1137 r.first_log_index AS r_first_log_index, \
1138 t.tx_hash AS r_tx_hash, t.from_address AS r_from_address, \
1139 COALESCE( \
1140 (SELECT CAST(MAX(r2.cumulative_gas_used) AS bigint) \
1141 FROM receipts r2 \
1142 WHERE r2.block_number = r.block_number \
1143 AND r2.tx_index < r.tx_index), \
1144 0 \
1145 ) AS prior_gas \
1146 FROM receipts r \
1147 JOIN transactions t ON r.block_number = t.block_number \
1148 AND r.tx_index = t.tx_index \
1149 JOIN headers h ON r.block_number = h.block_number \
1150 WHERE r.block_number = $1 AND r.tx_index = $2",
1151 )
1152 .bind(to_i64(block))
1153 .bind(to_i64(index))
1154 .fetch_optional(&self.pool)
1155 .await
1156 .map_err(SqlColdError::from)?;
1157
1158 let Some(rr) = combined else {
1159 return Ok(None);
1160 };
1161
1162 let header = header_from_row(&rr).map_err(ColdStorageError::from)?.seal_slow();
1164
1165 let tx_hash = rr.get(COL_R_TX_HASH);
1167 let sender = rr.get(COL_R_FROM_ADDRESS);
1168 let tx_type: i32 = rr.get(COL_R_TX_TYPE);
1169 let success = rr.get::<i32, _>(COL_R_SUCCESS) != 0;
1170 let cumulative_gas_used: i64 = rr.get(COL_R_CUMULATIVE_GAS_USED);
1171 let first_log_index: u64 = from_i64(rr.get::<i64, _>(COL_R_FIRST_LOG_INDEX));
1172 let prior_cumulative_gas: u64 = from_i64(rr.get::<i64, _>(COL_PRIOR_GAS));
1173
1174 let log_rows = sqlx::query(
1176 "SELECT * FROM logs WHERE block_number = $1 AND tx_index = $2 ORDER BY log_index",
1177 )
1178 .bind(to_i64(block))
1179 .bind(to_i64(index))
1180 .fetch_all(&self.pool)
1181 .await
1182 .map_err(SqlColdError::from)?;
1183
1184 let logs = log_rows.iter().map(log_from_row).collect();
1185 let built = build_receipt(tx_type, success, cumulative_gas_used, logs)
1186 .map_err(ColdStorageError::from)?;
1187 debug_assert!(
1190 built.inner.cumulative_gas_used >= prior_cumulative_gas,
1191 "cumulative gas decreased: {} < {}",
1192 built.inner.cumulative_gas_used,
1193 prior_cumulative_gas,
1194 );
1195 let gas_used = built.inner.cumulative_gas_used - prior_cumulative_gas;
1196
1197 let ir = IndexedReceipt { receipt: built, tx_hash, first_log_index, gas_used, sender };
1198 Ok(Some(ColdReceipt::new(ir, &header, index)))
1199 }
1200
1201 async fn get_receipts_in_block(&self, block: BlockNumber) -> ColdResult<Vec<ColdReceipt>> {
1202 let Some(header) =
1203 self.fetch_header_by_number(block).await.map_err(ColdStorageError::from)?
1204 else {
1205 return Ok(Vec::new());
1206 };
1207
1208 let bn = to_i64(block);
1209
1210 let receipt_rows = sqlx::query(
1212 "SELECT r.block_number, r.tx_index, r.tx_type, r.success, \
1213 r.cumulative_gas_used, r.first_log_index, \
1214 t.tx_hash, t.from_address \
1215 FROM receipts r \
1216 JOIN transactions t ON r.block_number = t.block_number \
1217 AND r.tx_index = t.tx_index \
1218 WHERE r.block_number = $1 \
1219 ORDER BY r.tx_index",
1220 )
1221 .bind(bn)
1222 .fetch_all(&self.pool)
1223 .await
1224 .map_err(SqlColdError::from)?;
1225
1226 let all_log_rows =
1227 sqlx::query("SELECT * FROM logs WHERE block_number = $1 ORDER BY tx_index, log_index")
1228 .bind(bn)
1229 .fetch_all(&self.pool)
1230 .await
1231 .map_err(SqlColdError::from)?;
1232
1233 let mut logs_by_tx: BTreeMap<i64, Vec<Log>> = BTreeMap::new();
1235 for r in &all_log_rows {
1236 let tx_idx: i64 = r.get(COL_TX_INDEX);
1237 logs_by_tx.entry(tx_idx).or_default().push(log_from_row(r));
1238 }
1239
1240 let mut first_log_index = 0u64;
1241 let mut prior_cumulative_gas = 0u64;
1242 receipt_rows
1243 .into_iter()
1244 .enumerate()
1245 .map(|(idx, rr)| {
1246 let tx_idx: i64 = rr.get(COL_TX_INDEX);
1247 let tx_hash = rr.get(COL_TX_HASH);
1248 let sender = rr.get(COL_FROM_ADDRESS);
1249 let tx_type = rr.get::<i32, _>(COL_TX_TYPE);
1250 let success = rr.get::<i32, _>(COL_SUCCESS) != 0;
1251 let cumulative_gas_used: i64 = rr.get(COL_CUMULATIVE_GAS_USED);
1252 let logs = logs_by_tx.remove(&tx_idx).unwrap_or_default();
1253 let receipt = build_receipt(tx_type, success, cumulative_gas_used, logs)
1254 .map_err(ColdStorageError::from)?;
1255 let gas_used = receipt.inner.cumulative_gas_used - prior_cumulative_gas;
1256 prior_cumulative_gas = receipt.inner.cumulative_gas_used;
1257 let ir = IndexedReceipt { receipt, tx_hash, first_log_index, gas_used, sender };
1258 first_log_index += ir.receipt.inner.logs.len() as u64;
1259 Ok(ColdReceipt::new(ir, &header, idx as u64))
1260 })
1261 .collect()
1262 }
1263
1264 async fn get_signet_events(
1265 &self,
1266 spec: SignetEventsSpecifier,
1267 ) -> ColdResult<Vec<DbSignetEvent>> {
1268 let rows = match spec {
1269 SignetEventsSpecifier::Block(block) => {
1270 let bn = to_i64(block);
1271 sqlx::query(
1272 "SELECT * FROM signet_events WHERE block_number = $1 ORDER BY event_index",
1273 )
1274 .bind(bn)
1275 .fetch_all(&self.pool)
1276 .await
1277 .map_err(SqlColdError::from)?
1278 }
1279 SignetEventsSpecifier::BlockRange { start, end } => {
1280 let s = to_i64(start);
1281 let e = to_i64(end);
1282 sqlx::query(
1283 "SELECT * FROM signet_events WHERE block_number >= $1 AND block_number <= $2
1284 ORDER BY block_number, event_index",
1285 )
1286 .bind(s)
1287 .bind(e)
1288 .fetch_all(&self.pool)
1289 .await
1290 .map_err(SqlColdError::from)?
1291 }
1292 };
1293
1294 rows.iter().map(|r| signet_event_from_row(r).map_err(ColdStorageError::from)).collect()
1295 }
1296
1297 async fn get_zenith_header(
1298 &self,
1299 spec: ZenithHeaderSpecifier,
1300 ) -> ColdResult<Option<DbZenithHeader>> {
1301 let block = match spec {
1302 ZenithHeaderSpecifier::Number(n) => n,
1303 ZenithHeaderSpecifier::Range { start, .. } => start,
1304 };
1305 let bn = to_i64(block);
1306 let row = sqlx::query("SELECT * FROM zenith_headers WHERE block_number = $1")
1307 .bind(bn)
1308 .fetch_optional(&self.pool)
1309 .await
1310 .map_err(SqlColdError::from)?;
1311
1312 row.map(|r| zenith_header_from_row(&r)).transpose().map_err(ColdStorageError::from)
1313 }
1314
1315 async fn get_zenith_headers(
1316 &self,
1317 spec: ZenithHeaderSpecifier,
1318 ) -> ColdResult<Vec<DbZenithHeader>> {
1319 let rows = match spec {
1320 ZenithHeaderSpecifier::Number(n) => {
1321 let bn = to_i64(n);
1322 sqlx::query("SELECT * FROM zenith_headers WHERE block_number = $1")
1323 .bind(bn)
1324 .fetch_all(&self.pool)
1325 .await
1326 .map_err(SqlColdError::from)?
1327 }
1328 ZenithHeaderSpecifier::Range { start, end } => {
1329 let s = to_i64(start);
1330 let e = to_i64(end);
1331 sqlx::query(
1332 "SELECT * FROM zenith_headers WHERE block_number >= $1 AND block_number <= $2
1333 ORDER BY block_number",
1334 )
1335 .bind(s)
1336 .bind(e)
1337 .fetch_all(&self.pool)
1338 .await
1339 .map_err(SqlColdError::from)?
1340 }
1341 };
1342
1343 rows.iter().map(|r| zenith_header_from_row(r).map_err(ColdStorageError::from)).collect()
1344 }
1345
1346 async fn get_logs(&self, filter: &Filter, max_logs: usize) -> ColdResult<Vec<RpcLog>> {
1347 let from = filter.get_from_block().unwrap_or(0);
1348 let to = filter.get_to_block().unwrap_or(u64::MAX);
1349
1350 let (filter_clause, params) = build_log_filter_clause(filter, 3);
1352 let where_clause = format!("l.block_number >= $1 AND l.block_number <= $2{filter_clause}");
1353
1354 let limit_idx = 3 + params.len() as u32;
1359 let data_sql = format!(
1360 "SELECT l.*, h.block_hash, h.timestamp AS block_timestamp, t.tx_hash, \
1361 (r.first_log_index + l.log_index) AS block_log_index \
1362 FROM logs l \
1363 JOIN headers h ON l.block_number = h.block_number \
1364 JOIN transactions t ON l.block_number = t.block_number \
1365 AND l.tx_index = t.tx_index \
1366 JOIN receipts r ON l.block_number = r.block_number \
1367 AND l.tx_index = r.tx_index \
1368 WHERE {where_clause} \
1369 ORDER BY l.block_number, l.tx_index, l.log_index \
1370 LIMIT ${limit_idx}"
1371 );
1372 let mut query = sqlx::query(&data_sql).bind(to_i64(from)).bind(to_i64(to));
1373 for param in ¶ms {
1374 query = query.bind(*param);
1375 }
1376 let limit = max_logs.saturating_add(1).min(i64::MAX as usize);
1379 query = query.bind(to_i64(limit as u64));
1380
1381 let rows = query.fetch_all(&self.pool).await.map_err(SqlColdError::from)?;
1382
1383 if rows.len() > max_logs {
1384 return Err(ColdStorageError::TooManyLogs { limit: max_logs });
1385 }
1386
1387 rows.into_iter()
1388 .map(|r| {
1389 let log = log_from_row(&r);
1390 Ok(RpcLog {
1391 inner: log,
1392 block_hash: Some(r.get(COL_BLOCK_HASH)),
1393 block_number: Some(from_i64(r.get::<i64, _>(COL_BLOCK_NUMBER))),
1394 block_timestamp: Some(from_i64(r.get::<i64, _>(COL_BLOCK_TIMESTAMP))),
1395 transaction_hash: Some(r.get(COL_TX_HASH)),
1396 transaction_index: Some(from_i64(r.get::<i64, _>(COL_TX_INDEX))),
1397 log_index: Some(from_i64(r.get::<i64, _>(COL_BLOCK_LOG_INDEX))),
1398 removed: false,
1399 })
1400 })
1401 .collect::<ColdResult<Vec<_>>>()
1402 }
1403
1404 async fn produce_log_stream(&self, filter: &Filter, params: signet_cold::StreamParams) {
1405 #[cfg(feature = "postgres")]
1406 if self.is_postgres {
1407 return self.produce_log_stream_pg(filter, params).await;
1408 }
1409 signet_cold::produce_log_stream_default(self, filter, params).await;
1410 }
1411
1412 async fn get_latest_block(&self) -> ColdResult<Option<BlockNumber>> {
1413 let row = sqlx::query("SELECT MAX(block_number) as max_bn FROM headers")
1414 .fetch_one(&self.pool)
1415 .await
1416 .map_err(SqlColdError::from)?;
1417 Ok(row.get::<Option<i64>, _>(COL_MAX_BN).map(from_i64))
1418 }
1419}
1420
1421impl ColdStorageWrite for SqlColdBackend {
1422 async fn append_block(&mut self, data: BlockData) -> ColdResult<()> {
1423 self.insert_block(data).await.map_err(ColdStorageError::from)
1424 }
1425
1426 async fn append_blocks(&mut self, data: Vec<BlockData>) -> ColdResult<()> {
1427 let mut tx = self.pool.begin().await.map_err(SqlColdError::from)?;
1428 for block_data in data {
1429 write_block_to_tx(&mut tx, block_data).await.map_err(ColdStorageError::from)?;
1430 }
1431 tx.commit().await.map_err(SqlColdError::from)?;
1432 Ok(())
1433 }
1434
1435 async fn truncate_above(&mut self, block: BlockNumber) -> ColdResult<()> {
1436 let bn = to_i64(block);
1437 let mut tx = self.pool.begin().await.map_err(SqlColdError::from)?;
1438
1439 delete_above_in_tx(&mut tx, bn).await.map_err(ColdStorageError::from)?;
1440
1441 tx.commit().await.map_err(SqlColdError::from)?;
1442 Ok(())
1443 }
1444}
1445
1446impl ColdStorage for SqlColdBackend {
1447 async fn drain_above(&mut self, block: BlockNumber) -> ColdResult<Vec<Vec<ColdReceipt>>> {
1448 let bn = to_i64(block);
1449 let mut tx = self.pool.begin().await.map_err(SqlColdError::from)?;
1450
1451 let header_rows =
1453 sqlx::query("SELECT * FROM headers WHERE block_number > $1 ORDER BY block_number")
1454 .bind(bn)
1455 .fetch_all(&mut *tx)
1456 .await
1457 .map_err(SqlColdError::from)?;
1458
1459 if header_rows.is_empty() {
1460 tx.commit().await.map_err(SqlColdError::from)?;
1461 return Ok(Vec::new());
1462 }
1463
1464 let mut headers: BTreeMap<i64, SealedHeader> = BTreeMap::new();
1465 for r in &header_rows {
1466 let num: i64 = r.get(COL_BLOCK_NUMBER);
1467 let h = header_from_row(r).map_err(ColdStorageError::from)?.seal_slow();
1468 headers.insert(num, h);
1469 }
1470
1471 let receipt_rows = sqlx::query(
1473 "SELECT r.block_number, r.tx_index, r.tx_type, r.success, \
1474 r.cumulative_gas_used, r.first_log_index, \
1475 t.tx_hash, t.from_address \
1476 FROM receipts r \
1477 JOIN transactions t ON r.block_number = t.block_number \
1478 AND r.tx_index = t.tx_index \
1479 WHERE r.block_number > $1 \
1480 ORDER BY r.block_number, r.tx_index",
1481 )
1482 .bind(bn)
1483 .fetch_all(&mut *tx)
1484 .await
1485 .map_err(SqlColdError::from)?;
1486
1487 let log_rows = sqlx::query(
1489 "SELECT * FROM logs WHERE block_number > $1 \
1490 ORDER BY block_number, tx_index, log_index",
1491 )
1492 .bind(bn)
1493 .fetch_all(&mut *tx)
1494 .await
1495 .map_err(SqlColdError::from)?;
1496
1497 let mut logs_by_block_tx: BTreeMap<(i64, i64), Vec<Log>> = BTreeMap::new();
1499 for r in &log_rows {
1500 let block_num: i64 = r.get(COL_BLOCK_NUMBER);
1501 let tx_idx: i64 = r.get(COL_TX_INDEX);
1502 logs_by_block_tx.entry((block_num, tx_idx)).or_default().push(log_from_row(r));
1503 }
1504
1505 let mut receipts_by_block: BTreeMap<i64, Vec<&sqlx::any::AnyRow>> = BTreeMap::new();
1507 for r in &receipt_rows {
1508 let block_num: i64 = r.get(COL_BLOCK_NUMBER);
1509 receipts_by_block.entry(block_num).or_default().push(r);
1510 }
1511
1512 let mut all_receipts = Vec::with_capacity(headers.len());
1514 for (&block_num, header) in &headers {
1515 let block_receipt_rows = receipts_by_block.remove(&block_num).unwrap_or_default();
1516 let mut prior_cumulative_gas = 0u64;
1517 let block_receipts: ColdResult<Vec<ColdReceipt>> = block_receipt_rows
1518 .into_iter()
1519 .map(|rr: &sqlx::any::AnyRow| {
1520 let tx_idx: i64 = rr.get(COL_TX_INDEX);
1521 let tx_hash = rr.get(COL_TX_HASH);
1522 let sender = rr.get(COL_FROM_ADDRESS);
1523 let tx_type: i32 = rr.get(COL_TX_TYPE);
1524 let success = rr.get::<i32, _>(COL_SUCCESS) != 0;
1525 let cumulative_gas_used: i64 = rr.get(COL_CUMULATIVE_GAS_USED);
1526 let first_log_index = from_i64(rr.get::<i64, _>(COL_FIRST_LOG_INDEX));
1527 let logs = logs_by_block_tx.remove(&(block_num, tx_idx)).unwrap_or_default();
1528 let receipt = build_receipt(tx_type, success, cumulative_gas_used, logs)
1529 .map_err(ColdStorageError::from)?;
1530 debug_assert!(
1533 receipt.inner.cumulative_gas_used >= prior_cumulative_gas,
1534 "cumulative gas decreased: {} < {}",
1535 receipt.inner.cumulative_gas_used,
1536 prior_cumulative_gas,
1537 );
1538 let gas_used = receipt.inner.cumulative_gas_used - prior_cumulative_gas;
1539 prior_cumulative_gas = receipt.inner.cumulative_gas_used;
1540 let ir = IndexedReceipt { receipt, tx_hash, first_log_index, gas_used, sender };
1541 Ok(ColdReceipt::new(ir, header, from_i64(tx_idx)))
1542 })
1543 .collect();
1544 all_receipts.push(block_receipts?);
1545 }
1546
1547 delete_above_in_tx(&mut tx, bn).await.map_err(ColdStorageError::from)?;
1549
1550 tx.commit().await.map_err(SqlColdError::from)?;
1551 Ok(all_receipts)
1552 }
1553}
1554
#[cfg(all(test, feature = "test-utils"))]
mod tests {
    use super::*;
    use signet_cold::conformance::conformance;

    /// Runs the shared conformance suite against an in-memory SQLite database.
    #[tokio::test]
    async fn sqlite_conformance() {
        let sqlite = SqlColdBackend::connect("sqlite::memory:").await.unwrap();
        conformance(sqlite).await.unwrap();
    }

    /// Runs the shared conformance suite against the database named by
    /// `DATABASE_URL`; the test is a no-op when the variable cannot be read.
    #[tokio::test]
    async fn pg_conformance() {
        match std::env::var("DATABASE_URL") {
            Ok(url) => {
                let pg = SqlColdBackend::connect(&url).await.unwrap();
                conformance(pg).await.unwrap();
            }
            Err(_) => {
                eprintln!("skipping pg conformance: DATABASE_URL not set");
            }
        }
    }
}