1use crate::SqlColdError;
8use crate::columns::{
9 COL_ACCESS_LIST, COL_ADDRESS, COL_AMOUNT, COL_AUTHORIZATION_LIST, COL_BASE_FEE_PER_GAS,
10 COL_BENEFICIARY, COL_BLOB_GAS_USED, COL_BLOB_VERSIONED_HASHES, COL_BLOCK_DATA_HASH,
11 COL_BLOCK_HASH, COL_BLOCK_LOG_INDEX, COL_BLOCK_NUMBER, COL_BLOCK_TIMESTAMP, COL_CHAIN_ID,
12 COL_CNT, COL_CUMULATIVE_GAS_USED, COL_DATA, COL_DIFFICULTY, COL_EVENT_TYPE,
13 COL_EXCESS_BLOB_GAS, COL_EXTRA_DATA, COL_FIRST_LOG_INDEX, COL_FROM_ADDRESS, COL_GAS,
14 COL_GAS_LIMIT, COL_GAS_PRICE, COL_GAS_USED, COL_HOST_BLOCK_NUMBER, COL_INPUT, COL_LOGS_BLOOM,
15 COL_MAX_BN, COL_MAX_FEE_PER_BLOB_GAS, COL_MAX_FEE_PER_GAS, COL_MAX_PRIORITY_FEE_PER_GAS,
16 COL_MIX_HASH, COL_NONCE, COL_OMMERS_HASH, COL_ORDER_INDEX, COL_PARENT_BEACON_BLOCK_ROOT,
17 COL_PARENT_HASH, COL_PRIOR_GAS, COL_RECEIPTS_ROOT, COL_REQUESTS_HASH, COL_REWARD_ADDRESS,
18 COL_ROLLUP_CHAIN_ID, COL_ROLLUP_RECIPIENT, COL_SENDER, COL_SIG_R, COL_SIG_S, COL_SIG_Y_PARITY,
19 COL_STATE_ROOT, COL_SUCCESS, COL_TIMESTAMP, COL_TO_ADDRESS, COL_TOKEN, COL_TOPIC0, COL_TOPIC1,
20 COL_TOPIC2, COL_TOPIC3, COL_TRANSACTIONS_ROOT, COL_TX_HASH, COL_TX_INDEX, COL_TX_TYPE,
21 COL_VALUE, COL_WITHDRAWALS_ROOT,
22};
23use crate::convert::{
24 EVENT_ENTER, EVENT_ENTER_TOKEN, EVENT_TRANSACT, build_receipt, decode_access_list_or_empty,
25 decode_authorization_list, decode_b256_vec, decode_u128_required, encode_access_list,
26 encode_authorization_list, encode_b256_vec, encode_u128, from_i64, to_i64,
27};
28use alloy::{
29 consensus::{
30 Header, Signed, TxEip1559, TxEip2930, TxEip4844, TxEip7702, TxLegacy, TxType,
31 transaction::Recovered,
32 },
33 primitives::{
34 Address, B256, BlockNumber, Bloom, Bytes, Log, LogData, Sealable, Signature, TxKind, U256,
35 },
36};
37use signet_cold::{
38 BlockData, ColdReceipt, ColdResult, ColdStorage, ColdStorageError, Confirmed, Filter,
39 HeaderSpecifier, ReceiptSpecifier, RpcLog, SignetEventsSpecifier, TransactionSpecifier,
40 ZenithHeaderSpecifier,
41};
42use signet_storage_types::{
43 ConfirmationMeta, DbSignetEvent, DbZenithHeader, IndexedReceipt, RecoveredTx, SealedHeader,
44 TransactionSigned,
45};
46use signet_zenith::{
47 Passage::{Enter, EnterToken},
48 Transactor::Transact,
49 Zenith,
50};
51use sqlx::{AnyPool, Row};
52
53#[derive(Debug, Clone)]
73pub struct SqlColdBackend {
74 pool: AnyPool,
75 is_postgres: bool,
76}
77
78impl SqlColdBackend {
79 pub async fn new(pool: AnyPool) -> Result<Self, SqlColdError> {
86 let conn = pool.acquire().await?;
88 let backend = conn.backend_name().to_owned();
89 drop(conn);
90
91 let migration = match backend.as_str() {
92 "PostgreSQL" => include_str!("../migrations/001_initial_pg.sql"),
93 "SQLite" => include_str!("../migrations/001_initial.sql"),
94 other => {
95 return Err(SqlColdError::Convert(format!(
96 "unsupported database backend: {other}"
97 )));
98 }
99 };
100 let is_postgres = backend == "PostgreSQL";
103 sqlx::raw_sql(migration).execute(&pool).await?;
104 Ok(Self { pool, is_postgres })
105 }
106
107 pub async fn connect(url: &str) -> Result<Self, SqlColdError> {
116 sqlx::any::install_default_drivers();
117 let pool: AnyPool = sqlx::pool::PoolOptions::new().max_connections(1).connect(url).await?;
118 Self::new(pool).await
119 }
120
121 async fn resolve_header_spec(
126 &self,
127 spec: HeaderSpecifier,
128 ) -> Result<Option<BlockNumber>, SqlColdError> {
129 match spec {
130 HeaderSpecifier::Number(n) => Ok(Some(n)),
131 HeaderSpecifier::Hash(hash) => {
132 let row = sqlx::query("SELECT block_number FROM headers WHERE block_hash = $1")
133 .bind(hash)
134 .fetch_optional(&self.pool)
135 .await?;
136 Ok(row.map(|r| from_i64(r.get::<i64, _>(COL_BLOCK_NUMBER))))
137 }
138 }
139 }
140
141 async fn fetch_header_by_number(
146 &self,
147 block_num: BlockNumber,
148 ) -> Result<Option<SealedHeader>, SqlColdError> {
149 let bn = to_i64(block_num);
150 let row = sqlx::query("SELECT * FROM headers WHERE block_number = $1")
151 .bind(bn)
152 .fetch_optional(&self.pool)
153 .await?;
154
155 row.map(|r| header_from_row(&r).map(|h| h.seal_slow())).transpose()
156 }
157
158 async fn insert_block(&self, data: BlockData) -> Result<(), SqlColdError> {
163 let mut tx = self.pool.begin().await?;
164 write_block_to_tx(&mut tx, data).await?;
165 tx.commit().await?;
166 Ok(())
167 }
168
169 #[cfg(feature = "postgres")]
179 async fn produce_log_stream_pg(&self, filter: &Filter, params: signet_cold::StreamParams) {
180 use tokio_stream::StreamExt;
181
182 macro_rules! try_stream {
184 ($sender:expr, $expr:expr) => {
185 match $expr {
186 Ok(v) => v,
187 Err(e) => {
188 let _ = $sender
189 .send(Err(ColdStorageError::backend(SqlColdError::from(e))))
190 .await;
191 return;
192 }
193 }
194 };
195 }
196
197 let signet_cold::StreamParams { from, to, max_logs, sender, deadline } = params;
198
199 let mut tx = try_stream!(sender, self.pool.begin().await);
203 try_stream!(
204 sender,
205 sqlx::query("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ").execute(&mut *tx).await
206 );
207
208 let (filter_clause, filter_params) = build_log_filter_clause(filter, 2);
212 let data_sql = format!(
213 "SELECT l.*, h.block_hash, h.timestamp AS block_timestamp, t.tx_hash, \
214 (r.first_log_index + l.log_index) AS block_log_index \
215 FROM logs l \
216 JOIN headers h ON l.block_number = h.block_number \
217 JOIN transactions t ON l.block_number = t.block_number \
218 AND l.tx_index = t.tx_index \
219 JOIN receipts r ON l.block_number = r.block_number \
220 AND l.tx_index = r.tx_index \
221 WHERE l.block_number = $1{filter_clause} \
222 ORDER BY l.tx_index, l.log_index"
223 );
224
225 let mut total = 0usize;
226
227 for block_num in from..=to {
230 if tokio::time::Instant::now() > deadline {
233 let _ = sender.send(Err(ColdStorageError::StreamDeadlineExceeded)).await;
234 return;
235 }
236
237 let mut query = sqlx::query(&data_sql).bind(to_i64(block_num));
238 for param in &filter_params {
239 query = query.bind(*param);
240 }
241
242 let mut stream = query.fetch(&mut *tx);
246 while let Some(row_result) = stream.next().await {
247 let r = try_stream!(sender, row_result);
248
249 total += 1;
251 if total > max_logs {
252 let _ =
253 sender.send(Err(ColdStorageError::TooManyLogs { limit: max_logs })).await;
254 return;
255 }
256
257 let log = log_from_row(&r);
258 let rpc_log = RpcLog {
259 inner: log,
260 block_hash: Some(r.get(COL_BLOCK_HASH)),
261 block_number: Some(from_i64(r.get::<i64, _>(COL_BLOCK_NUMBER))),
262 block_timestamp: Some(from_i64(r.get::<i64, _>(COL_BLOCK_TIMESTAMP))),
263 transaction_hash: Some(r.get(COL_TX_HASH)),
264 transaction_index: Some(from_i64(r.get::<i64, _>(COL_TX_INDEX))),
265 log_index: Some(from_i64(r.get::<i64, _>(COL_BLOCK_LOG_INDEX))),
266 removed: false,
267 };
268 match tokio::time::timeout_at(deadline, sender.send(Ok(rpc_log))).await {
271 Ok(Ok(())) => {}
272 Ok(Err(_)) => return, Err(_) => {
274 let _ = sender.send(Err(ColdStorageError::StreamDeadlineExceeded)).await;
275 return;
276 }
277 }
278 }
279
280 if total >= max_logs {
283 return;
284 }
285 }
286 }
287}
288
289fn blob<'r>(r: &'r sqlx::any::AnyRow, col: &str) -> &'r [u8] {
295 r.get(col)
296}
297
298fn opt_blob<'r>(r: &'r sqlx::any::AnyRow, col: &str) -> Option<&'r [u8]> {
300 r.get(col)
301}
302
303fn header_from_row(r: &sqlx::any::AnyRow) -> Result<Header, SqlColdError> {
305 Ok(Header {
306 parent_hash: r.get(COL_PARENT_HASH),
307 ommers_hash: r.get(COL_OMMERS_HASH),
308 beneficiary: r.get(COL_BENEFICIARY),
309 state_root: r.get(COL_STATE_ROOT),
310 transactions_root: r.get(COL_TRANSACTIONS_ROOT),
311 receipts_root: r.get(COL_RECEIPTS_ROOT),
312 logs_bloom: Bloom::from_slice(blob(r, COL_LOGS_BLOOM)),
313 difficulty: r.get(COL_DIFFICULTY),
314 number: from_i64(r.get(COL_BLOCK_NUMBER)),
315 gas_limit: from_i64(r.get(COL_GAS_LIMIT)),
316 gas_used: from_i64(r.get(COL_GAS_USED)),
317 timestamp: from_i64(r.get(COL_TIMESTAMP)),
318 extra_data: r.get(COL_EXTRA_DATA),
319 mix_hash: r.get(COL_MIX_HASH),
320 nonce: r.get(COL_NONCE),
321 base_fee_per_gas: r.get::<Option<i64>, _>(COL_BASE_FEE_PER_GAS).map(from_i64),
322 withdrawals_root: r.get(COL_WITHDRAWALS_ROOT),
323 blob_gas_used: r.get::<Option<i64>, _>(COL_BLOB_GAS_USED).map(from_i64),
324 excess_blob_gas: r.get::<Option<i64>, _>(COL_EXCESS_BLOB_GAS).map(from_i64),
325 parent_beacon_block_root: r.get(COL_PARENT_BEACON_BLOCK_ROOT),
326 requests_hash: r.get(COL_REQUESTS_HASH),
327 })
328}
329
330fn tx_from_row(r: &sqlx::any::AnyRow) -> Result<TransactionSigned, SqlColdError> {
332 use alloy::consensus::EthereumTxEnvelope;
333
334 let sig =
335 Signature::new(r.get(COL_SIG_R), r.get(COL_SIG_S), r.get::<i32, _>(COL_SIG_Y_PARITY) != 0);
336
337 let tx_type_raw = r.get::<i32, _>(COL_TX_TYPE) as u8;
338 let tx_type = TxType::try_from(tx_type_raw)
339 .map_err(|_| SqlColdError::Convert(format!("invalid tx_type: {tx_type_raw}")))?;
340
341 let chain_id: Option<i64> = r.get(COL_CHAIN_ID);
342 let nonce = from_i64(r.get(COL_NONCE));
343 let gas_limit = from_i64(r.get(COL_GAS_LIMIT));
344 let to_addr: Option<Address> = r.get(COL_TO_ADDRESS);
345 let value: U256 = r.get(COL_VALUE);
346 let input: Bytes = r.get(COL_INPUT);
347
348 match tx_type {
349 TxType::Legacy => {
350 let tx = TxLegacy {
351 chain_id: chain_id.map(from_i64),
352 nonce,
353 gas_price: decode_u128_required(opt_blob(r, COL_GAS_PRICE), COL_GAS_PRICE)?,
354 gas_limit,
355 to: to_addr.map_or(TxKind::Create, TxKind::Call),
356 value,
357 input,
358 };
359 Ok(EthereumTxEnvelope::Legacy(Signed::new_unhashed(tx, sig)))
360 }
361 TxType::Eip2930 => {
362 let tx = TxEip2930 {
363 chain_id: from_i64(
364 chain_id
365 .ok_or_else(|| SqlColdError::Convert("EIP2930 requires chain_id".into()))?,
366 ),
367 nonce,
368 gas_price: decode_u128_required(opt_blob(r, COL_GAS_PRICE), COL_GAS_PRICE)?,
369 gas_limit,
370 to: to_addr.map_or(TxKind::Create, TxKind::Call),
371 value,
372 input,
373 access_list: decode_access_list_or_empty(opt_blob(r, COL_ACCESS_LIST))?,
374 };
375 Ok(EthereumTxEnvelope::Eip2930(Signed::new_unhashed(tx, sig)))
376 }
377 TxType::Eip1559 => {
378 let tx = TxEip1559 {
379 chain_id: from_i64(
380 chain_id
381 .ok_or_else(|| SqlColdError::Convert("EIP1559 requires chain_id".into()))?,
382 ),
383 nonce,
384 gas_limit,
385 max_fee_per_gas: decode_u128_required(
386 opt_blob(r, COL_MAX_FEE_PER_GAS),
387 COL_MAX_FEE_PER_GAS,
388 )?,
389 max_priority_fee_per_gas: decode_u128_required(
390 opt_blob(r, COL_MAX_PRIORITY_FEE_PER_GAS),
391 COL_MAX_PRIORITY_FEE_PER_GAS,
392 )?,
393 to: to_addr.map_or(TxKind::Create, TxKind::Call),
394 value,
395 input,
396 access_list: decode_access_list_or_empty(opt_blob(r, COL_ACCESS_LIST))?,
397 };
398 Ok(EthereumTxEnvelope::Eip1559(Signed::new_unhashed(tx, sig)))
399 }
400 TxType::Eip4844 => {
401 let tx = TxEip4844 {
402 chain_id: from_i64(
403 chain_id
404 .ok_or_else(|| SqlColdError::Convert("EIP4844 requires chain_id".into()))?,
405 ),
406 nonce,
407 gas_limit,
408 max_fee_per_gas: decode_u128_required(
409 opt_blob(r, COL_MAX_FEE_PER_GAS),
410 COL_MAX_FEE_PER_GAS,
411 )?,
412 max_priority_fee_per_gas: decode_u128_required(
413 opt_blob(r, COL_MAX_PRIORITY_FEE_PER_GAS),
414 COL_MAX_PRIORITY_FEE_PER_GAS,
415 )?,
416 to: to_addr
417 .ok_or_else(|| SqlColdError::Convert("EIP4844 requires to_address".into()))?,
418 value,
419 input,
420 access_list: decode_access_list_or_empty(opt_blob(r, COL_ACCESS_LIST))?,
421 blob_versioned_hashes: decode_b256_vec(
422 opt_blob(r, COL_BLOB_VERSIONED_HASHES).ok_or_else(|| {
423 SqlColdError::Convert("EIP4844 requires blob_versioned_hashes".into())
424 })?,
425 )?,
426 max_fee_per_blob_gas: decode_u128_required(
427 opt_blob(r, COL_MAX_FEE_PER_BLOB_GAS),
428 COL_MAX_FEE_PER_BLOB_GAS,
429 )?,
430 };
431 Ok(EthereumTxEnvelope::Eip4844(Signed::new_unhashed(tx, sig)))
432 }
433 TxType::Eip7702 => {
434 let tx = TxEip7702 {
435 chain_id: from_i64(
436 chain_id
437 .ok_or_else(|| SqlColdError::Convert("EIP7702 requires chain_id".into()))?,
438 ),
439 nonce,
440 gas_limit,
441 max_fee_per_gas: decode_u128_required(
442 opt_blob(r, COL_MAX_FEE_PER_GAS),
443 COL_MAX_FEE_PER_GAS,
444 )?,
445 max_priority_fee_per_gas: decode_u128_required(
446 opt_blob(r, COL_MAX_PRIORITY_FEE_PER_GAS),
447 COL_MAX_PRIORITY_FEE_PER_GAS,
448 )?,
449 to: to_addr
450 .ok_or_else(|| SqlColdError::Convert("EIP7702 requires to_address".into()))?,
451 value,
452 input,
453 access_list: decode_access_list_or_empty(opt_blob(r, COL_ACCESS_LIST))?,
454 authorization_list: decode_authorization_list(
455 opt_blob(r, COL_AUTHORIZATION_LIST).ok_or_else(|| {
456 SqlColdError::Convert("EIP7702 requires authorization_list".into())
457 })?,
458 )?,
459 };
460 Ok(EthereumTxEnvelope::Eip7702(Signed::new_unhashed(tx, sig)))
461 }
462 }
463}
464
465fn recovered_tx_from_row(r: &sqlx::any::AnyRow) -> Result<RecoveredTx, SqlColdError> {
467 let sender: Address = r.get(COL_FROM_ADDRESS);
468 let tx = tx_from_row(r)?;
469 Ok(Recovered::new_unchecked(tx, sender))
471}
472
473fn log_from_row(r: &sqlx::any::AnyRow) -> Log {
475 let topics = [COL_TOPIC0, COL_TOPIC1, COL_TOPIC2, COL_TOPIC3]
476 .into_iter()
477 .filter_map(|col| r.get::<Option<B256>, _>(col))
478 .collect();
479 Log { address: r.get(COL_ADDRESS), data: LogData::new_unchecked(topics, r.get(COL_DATA)) }
480}
481
482fn signet_event_from_row(r: &sqlx::any::AnyRow) -> Result<DbSignetEvent, SqlColdError> {
484 let event_type = r.get::<i32, _>(COL_EVENT_TYPE) as i16;
485 let order = from_i64(r.get(COL_ORDER_INDEX));
486 let rollup_chain_id: U256 = r.get(COL_ROLLUP_CHAIN_ID);
487
488 match event_type {
489 EVENT_TRANSACT => {
490 let sender: Address = r
491 .get::<Option<Address>, _>(COL_SENDER)
492 .ok_or_else(|| SqlColdError::Convert("Transact requires sender".into()))?;
493 let to: Address = r
494 .get::<Option<Address>, _>(COL_TO_ADDRESS)
495 .ok_or_else(|| SqlColdError::Convert("Transact requires to".into()))?;
496 let value: U256 = r
497 .get::<Option<U256>, _>(COL_VALUE)
498 .ok_or_else(|| SqlColdError::Convert("Transact requires value".into()))?;
499 let gas: U256 = r
500 .get::<Option<U256>, _>(COL_GAS)
501 .ok_or_else(|| SqlColdError::Convert("Transact requires gas".into()))?;
502 let max_fee: U256 = r
503 .get::<Option<U256>, _>(COL_MAX_FEE_PER_GAS)
504 .ok_or_else(|| SqlColdError::Convert("Transact requires max_fee_per_gas".into()))?;
505 let data: Bytes = r.get::<Option<Bytes>, _>(COL_DATA).unwrap_or_default();
506
507 Ok(DbSignetEvent::Transact(
508 order,
509 Transact {
510 rollupChainId: rollup_chain_id,
511 sender,
512 to,
513 value,
514 gas,
515 maxFeePerGas: max_fee,
516 data,
517 },
518 ))
519 }
520 EVENT_ENTER => {
521 let recipient: Address = r
522 .get::<Option<Address>, _>(COL_ROLLUP_RECIPIENT)
523 .ok_or_else(|| SqlColdError::Convert("Enter requires rollup_recipient".into()))?;
524 let amount: U256 = r
525 .get::<Option<U256>, _>(COL_AMOUNT)
526 .ok_or_else(|| SqlColdError::Convert("Enter requires amount".into()))?;
527
528 Ok(DbSignetEvent::Enter(
529 order,
530 Enter { rollupChainId: rollup_chain_id, rollupRecipient: recipient, amount },
531 ))
532 }
533 EVENT_ENTER_TOKEN => {
534 let token: Address = r
535 .get::<Option<Address>, _>(COL_TOKEN)
536 .ok_or_else(|| SqlColdError::Convert("EnterToken requires token".into()))?;
537 let recipient: Address =
538 r.get::<Option<Address>, _>(COL_ROLLUP_RECIPIENT).ok_or_else(|| {
539 SqlColdError::Convert("EnterToken requires rollup_recipient".into())
540 })?;
541 let amount: U256 = r
542 .get::<Option<U256>, _>(COL_AMOUNT)
543 .ok_or_else(|| SqlColdError::Convert("EnterToken requires amount".into()))?;
544
545 Ok(DbSignetEvent::EnterToken(
546 order,
547 EnterToken {
548 rollupChainId: rollup_chain_id,
549 token,
550 rollupRecipient: recipient,
551 amount,
552 },
553 ))
554 }
555 _ => Err(SqlColdError::Convert(format!("invalid event_type: {event_type}"))),
556 }
557}
558
559fn zenith_header_from_row(r: &sqlx::any::AnyRow) -> Result<DbZenithHeader, SqlColdError> {
561 Ok(DbZenithHeader(Zenith::BlockHeader {
562 hostBlockNumber: r.get(COL_HOST_BLOCK_NUMBER),
563 rollupChainId: r.get(COL_ROLLUP_CHAIN_ID),
564 gasLimit: r.get(COL_GAS_LIMIT),
565 rewardAddress: r.get(COL_REWARD_ADDRESS),
566 blockDataHash: r.get(COL_BLOCK_DATA_HASH),
567 }))
568}
569
570async fn write_block_to_tx(
576 tx: &mut sqlx::Transaction<'_, sqlx::Any>,
577 data: BlockData,
578) -> Result<(), SqlColdError> {
579 let bn = to_i64(data.block_number());
580
581 let block_hash = data.header.hash_slow();
583 let difficulty = &data.header.difficulty;
584 sqlx::query(
585 "INSERT INTO headers (
586 block_number, block_hash, parent_hash, ommers_hash, beneficiary,
587 state_root, transactions_root, receipts_root, logs_bloom, difficulty,
588 gas_limit, gas_used, timestamp, extra_data, mix_hash, nonce,
589 base_fee_per_gas, withdrawals_root, blob_gas_used, excess_blob_gas,
590 parent_beacon_block_root, requests_hash
591 ) VALUES (
592 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
593 $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22
594 )",
595 )
596 .bind(bn)
597 .bind(block_hash)
598 .bind(data.header.parent_hash)
599 .bind(data.header.ommers_hash)
600 .bind(data.header.beneficiary)
601 .bind(data.header.state_root)
602 .bind(data.header.transactions_root)
603 .bind(data.header.receipts_root)
604 .bind(data.header.logs_bloom)
605 .bind(difficulty)
606 .bind(to_i64(data.header.gas_limit))
607 .bind(to_i64(data.header.gas_used))
608 .bind(to_i64(data.header.timestamp))
609 .bind(&data.header.extra_data)
610 .bind(data.header.mix_hash)
611 .bind(data.header.nonce)
612 .bind(data.header.base_fee_per_gas.map(to_i64))
613 .bind(data.header.withdrawals_root.as_ref())
614 .bind(data.header.blob_gas_used.map(to_i64))
615 .bind(data.header.excess_blob_gas.map(to_i64))
616 .bind(data.header.parent_beacon_block_root.as_ref())
617 .bind(data.header.requests_hash.as_ref())
618 .execute(&mut **tx)
619 .await?;
620
621 for (idx, recovered_tx) in data.transactions.iter().enumerate() {
623 insert_transaction(tx, bn, to_i64(idx as u64), recovered_tx).await?;
624 }
625
626 let mut first_log_index = 0i64;
629 for (idx, receipt) in data.receipts.iter().enumerate() {
630 let tx_idx = to_i64(idx as u64);
631 sqlx::query(
632 "INSERT INTO receipts (block_number, tx_index, tx_type, success, cumulative_gas_used, first_log_index)
633 VALUES ($1, $2, $3, $4, $5, $6)",
634 )
635 .bind(bn)
636 .bind(tx_idx)
637 .bind(receipt.tx_type as i32)
638 .bind(receipt.inner.status.coerce_status() as i32)
639 .bind(to_i64(receipt.inner.cumulative_gas_used))
640 .bind(first_log_index)
641 .execute(&mut **tx)
642 .await?;
643 first_log_index += receipt.inner.logs.len() as i64;
644
645 for (log_idx, log) in receipt.inner.logs.iter().enumerate() {
646 let topics = log.topics();
647 sqlx::query(
648 "INSERT INTO logs (block_number, tx_index, log_index, address, topic0, topic1, topic2, topic3, data)
649 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
650 )
651 .bind(bn)
652 .bind(tx_idx)
653 .bind(to_i64(log_idx as u64))
654 .bind(log.address)
655 .bind(topics.first())
656 .bind(topics.get(1))
657 .bind(topics.get(2))
658 .bind(topics.get(3))
659 .bind(&log.data.data)
660 .execute(&mut **tx)
661 .await?;
662 }
663 }
664
665 for (idx, event) in data.signet_events.iter().enumerate() {
667 insert_signet_event(tx, bn, to_i64(idx as u64), event).await?;
668 }
669
670 if let Some(zh) = &data.zenith_header {
672 let h = &zh.0;
673 sqlx::query(
674 "INSERT INTO zenith_headers (
675 block_number, host_block_number, rollup_chain_id,
676 gas_limit, reward_address, block_data_hash
677 ) VALUES ($1, $2, $3, $4, $5, $6)",
678 )
679 .bind(bn)
680 .bind(h.hostBlockNumber)
681 .bind(h.rollupChainId)
682 .bind(h.gasLimit)
683 .bind(h.rewardAddress)
684 .bind(h.blockDataHash)
685 .execute(&mut **tx)
686 .await?;
687 }
688
689 Ok(())
690}
691
692async fn insert_transaction(
694 conn: &mut sqlx::AnyConnection,
695 bn: i64,
696 tx_index: i64,
697 recovered: &RecoveredTx,
698) -> Result<(), SqlColdError> {
699 use alloy::consensus::EthereumTxEnvelope;
700
701 let sender = recovered.signer();
702 let tx: &TransactionSigned = recovered;
703 let tx_hash = tx.tx_hash();
704 let tx_type = tx.tx_type() as i32;
705
706 macro_rules! sig {
707 ($s:expr) => {{
708 let sig = $s.signature();
709 (sig.v() as i32, sig.r(), sig.s())
710 }};
711 }
712 let (sig_y, sig_r, sig_s) = match tx {
713 EthereumTxEnvelope::Legacy(s) => sig!(s),
714 EthereumTxEnvelope::Eip2930(s) => sig!(s),
715 EthereumTxEnvelope::Eip1559(s) => sig!(s),
716 EthereumTxEnvelope::Eip4844(s) => sig!(s),
717 EthereumTxEnvelope::Eip7702(s) => sig!(s),
718 };
719
720 let (chain_id, nonce, gas_limit) = match tx {
721 EthereumTxEnvelope::Legacy(s) => {
722 (s.tx().chain_id.map(to_i64), to_i64(s.tx().nonce), to_i64(s.tx().gas_limit))
723 }
724 EthereumTxEnvelope::Eip2930(s) => {
725 (Some(to_i64(s.tx().chain_id)), to_i64(s.tx().nonce), to_i64(s.tx().gas_limit))
726 }
727 EthereumTxEnvelope::Eip1559(s) => {
728 (Some(to_i64(s.tx().chain_id)), to_i64(s.tx().nonce), to_i64(s.tx().gas_limit))
729 }
730 EthereumTxEnvelope::Eip4844(s) => {
731 (Some(to_i64(s.tx().chain_id)), to_i64(s.tx().nonce), to_i64(s.tx().gas_limit))
732 }
733 EthereumTxEnvelope::Eip7702(s) => {
734 (Some(to_i64(s.tx().chain_id)), to_i64(s.tx().nonce), to_i64(s.tx().gas_limit))
735 }
736 };
737
738 let (value, to_addr) = match tx {
739 EthereumTxEnvelope::Legacy(s) => (s.tx().value, s.tx().to.to()),
740 EthereumTxEnvelope::Eip2930(s) => (s.tx().value, s.tx().to.to()),
741 EthereumTxEnvelope::Eip1559(s) => (s.tx().value, s.tx().to.to()),
742 EthereumTxEnvelope::Eip4844(s) => (s.tx().value, Some(&s.tx().to)),
743 EthereumTxEnvelope::Eip7702(s) => (s.tx().value, Some(&s.tx().to)),
744 };
745
746 let input: &[u8] = match tx {
747 EthereumTxEnvelope::Legacy(s) => s.tx().input.as_ref(),
748 EthereumTxEnvelope::Eip2930(s) => s.tx().input.as_ref(),
749 EthereumTxEnvelope::Eip1559(s) => s.tx().input.as_ref(),
750 EthereumTxEnvelope::Eip4844(s) => s.tx().input.as_ref(),
751 EthereumTxEnvelope::Eip7702(s) => s.tx().input.as_ref(),
752 };
753
754 let (gas_price, max_fee, max_priority_fee, max_blob_fee) = match tx {
755 EthereumTxEnvelope::Legacy(s) => (Some(encode_u128(s.tx().gas_price)), None, None, None),
756 EthereumTxEnvelope::Eip2930(s) => (Some(encode_u128(s.tx().gas_price)), None, None, None),
757 EthereumTxEnvelope::Eip1559(s) => (
758 None,
759 Some(encode_u128(s.tx().max_fee_per_gas)),
760 Some(encode_u128(s.tx().max_priority_fee_per_gas)),
761 None,
762 ),
763 EthereumTxEnvelope::Eip4844(s) => (
764 None,
765 Some(encode_u128(s.tx().max_fee_per_gas)),
766 Some(encode_u128(s.tx().max_priority_fee_per_gas)),
767 Some(encode_u128(s.tx().max_fee_per_blob_gas)),
768 ),
769 EthereumTxEnvelope::Eip7702(s) => (
770 None,
771 Some(encode_u128(s.tx().max_fee_per_gas)),
772 Some(encode_u128(s.tx().max_priority_fee_per_gas)),
773 None,
774 ),
775 };
776
777 let (access_list, blob_hashes, auth_list) = match tx {
778 EthereumTxEnvelope::Legacy(_) => (None, None, None),
779 EthereumTxEnvelope::Eip2930(s) => {
780 (Some(encode_access_list(&s.tx().access_list)), None, None)
781 }
782 EthereumTxEnvelope::Eip1559(s) => {
783 (Some(encode_access_list(&s.tx().access_list)), None, None)
784 }
785 EthereumTxEnvelope::Eip4844(s) => (
786 Some(encode_access_list(&s.tx().access_list)),
787 Some(encode_b256_vec(&s.tx().blob_versioned_hashes)),
788 None,
789 ),
790 EthereumTxEnvelope::Eip7702(s) => (
791 Some(encode_access_list(&s.tx().access_list)),
792 None,
793 Some(encode_authorization_list(&s.tx().authorization_list)),
794 ),
795 };
796
797 sqlx::query(
798 "INSERT INTO transactions (
799 block_number, tx_index, tx_hash, tx_type,
800 sig_y_parity, sig_r, sig_s,
801 chain_id, nonce, gas_limit, to_address, value, input,
802 gas_price, max_fee_per_gas, max_priority_fee_per_gas,
803 max_fee_per_blob_gas, blob_versioned_hashes,
804 access_list, authorization_list, from_address
805 ) VALUES (
806 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
807 $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21
808 )",
809 )
810 .bind(bn)
811 .bind(tx_index)
812 .bind(tx_hash)
813 .bind(tx_type)
814 .bind(sig_y)
815 .bind(sig_r)
816 .bind(sig_s)
817 .bind(chain_id)
818 .bind(nonce)
819 .bind(gas_limit)
820 .bind(to_addr)
821 .bind(value)
822 .bind(input)
823 .bind(gas_price.as_ref().map(|v| v.as_slice()))
824 .bind(max_fee.as_ref().map(|v| v.as_slice()))
825 .bind(max_priority_fee.as_ref().map(|v| v.as_slice()))
826 .bind(max_blob_fee.as_ref().map(|v| v.as_slice()))
827 .bind(blob_hashes.as_deref())
828 .bind(access_list.as_deref())
829 .bind(auth_list.as_deref())
830 .bind(sender)
831 .execute(&mut *conn)
832 .await?;
833
834 Ok(())
835}
836
837async fn insert_signet_event(
839 conn: &mut sqlx::AnyConnection,
840 block_number: i64,
841 event_index: i64,
842 event: &DbSignetEvent,
843) -> Result<(), SqlColdError> {
844 let (event_type, order, chain_id) = match event {
845 DbSignetEvent::Transact(o, t) => (0i32, to_i64(*o), &t.rollupChainId),
846 DbSignetEvent::Enter(o, e) => (1i32, to_i64(*o), &e.rollupChainId),
847 DbSignetEvent::EnterToken(o, e) => (2i32, to_i64(*o), &e.rollupChainId),
848 };
849
850 let (value, gas, max_fee, amount) = match event {
851 DbSignetEvent::Transact(_, t) => {
852 (Some(&t.value), Some(&t.gas), Some(&t.maxFeePerGas), None)
853 }
854 DbSignetEvent::Enter(_, e) => (None, None, None, Some(&e.amount)),
855 DbSignetEvent::EnterToken(_, e) => (None, None, None, Some(&e.amount)),
856 };
857
858 sqlx::query(
859 "INSERT INTO signet_events (
860 block_number, event_index, event_type, order_index,
861 rollup_chain_id, sender, to_address, value, gas,
862 max_fee_per_gas, data, rollup_recipient, amount, token
863 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)",
864 )
865 .bind(block_number)
866 .bind(event_index)
867 .bind(event_type)
868 .bind(order)
869 .bind(chain_id)
870 .bind(match event {
871 DbSignetEvent::Transact(_, t) => Some(&t.sender),
872 _ => None,
873 })
874 .bind(match event {
875 DbSignetEvent::Transact(_, t) => Some(&t.to),
876 _ => None,
877 })
878 .bind(value)
879 .bind(gas)
880 .bind(max_fee)
881 .bind(match event {
882 DbSignetEvent::Transact(_, t) => Some(&t.data),
883 _ => None,
884 })
885 .bind(match event {
886 DbSignetEvent::Enter(_, e) => Some(&e.rollupRecipient),
887 DbSignetEvent::EnterToken(_, e) => Some(&e.rollupRecipient),
888 _ => None,
889 })
890 .bind(amount)
891 .bind(match event {
892 DbSignetEvent::EnterToken(_, e) => Some(&e.token),
893 _ => None,
894 })
895 .execute(&mut *conn)
896 .await?;
897
898 Ok(())
899}
900
901fn append_filter_clause<'a>(
911 clause: &mut String,
912 params: &mut Vec<&'a [u8]>,
913 mut idx: u32,
914 column: &str,
915 values: impl ExactSizeIterator<Item = &'a [u8]>,
916) -> u32 {
917 use std::fmt::Write;
918
919 let len = values.len();
920 if len == 1 {
921 write!(clause, " AND {column} = ${idx}").unwrap();
922 values.for_each(|v| params.push(v));
923 return idx + 1;
924 }
925 write!(clause, " AND {column} IN (").unwrap();
926 for (i, v) in values.enumerate() {
927 if i > 0 {
928 clause.push_str(", ");
929 }
930 write!(clause, "${idx}").unwrap();
931 params.push(v);
932 idx += 1;
933 }
934 clause.push(')');
935 idx
936}
937
938fn build_log_filter_clause(filter: &Filter, start_idx: u32) -> (String, Vec<&[u8]>) {
939 let mut clause = String::new();
940 let mut params: Vec<&[u8]> = Vec::new();
941 let mut idx = start_idx;
942
943 if !filter.address.is_empty() {
944 idx = append_filter_clause(
945 &mut clause,
946 &mut params,
947 idx,
948 "l.address",
949 filter.address.iter().map(|a| a.as_slice()),
950 );
951 }
952
953 let topic_cols = ["l.topic0", "l.topic1", "l.topic2", "l.topic3"];
954 for (i, topic_filter) in filter.topics.iter().enumerate() {
955 if topic_filter.is_empty() {
956 continue;
957 }
958 idx = append_filter_clause(
959 &mut clause,
960 &mut params,
961 idx,
962 topic_cols[i],
963 topic_filter.iter().map(|v| v.as_slice()),
964 );
965 }
966
967 (clause, params)
968}
969
970impl ColdStorage for SqlColdBackend {
975 async fn get_header(&self, spec: HeaderSpecifier) -> ColdResult<Option<SealedHeader>> {
976 let Some(block_num) = self.resolve_header_spec(spec).await? else {
977 return Ok(None);
978 };
979 self.fetch_header_by_number(block_num).await.map_err(ColdStorageError::from)
980 }
981
982 async fn get_headers(
983 &self,
984 specs: Vec<HeaderSpecifier>,
985 ) -> ColdResult<Vec<Option<SealedHeader>>> {
986 let mut results = Vec::with_capacity(specs.len());
987 for spec in specs {
988 let header = self.get_header(spec).await?;
989 results.push(header);
990 }
991 Ok(results)
992 }
993
994 async fn get_transaction(
995 &self,
996 spec: TransactionSpecifier,
997 ) -> ColdResult<Option<Confirmed<RecoveredTx>>> {
998 let row = match spec {
999 TransactionSpecifier::Hash(hash) => sqlx::query(
1000 "SELECT t.*, h.block_hash
1001 FROM transactions t
1002 JOIN headers h ON t.block_number = h.block_number
1003 WHERE t.tx_hash = $1",
1004 )
1005 .bind(hash)
1006 .fetch_optional(&self.pool)
1007 .await
1008 .map_err(SqlColdError::from)?,
1009 TransactionSpecifier::BlockAndIndex { block, index } => sqlx::query(
1010 "SELECT t.*, h.block_hash
1011 FROM transactions t
1012 JOIN headers h ON t.block_number = h.block_number
1013 WHERE t.block_number = $1 AND t.tx_index = $2",
1014 )
1015 .bind(to_i64(block))
1016 .bind(to_i64(index))
1017 .fetch_optional(&self.pool)
1018 .await
1019 .map_err(SqlColdError::from)?,
1020 TransactionSpecifier::BlockHashAndIndex { block_hash, index } => sqlx::query(
1021 "SELECT t.*, h.block_hash
1022 FROM transactions t
1023 JOIN headers h ON t.block_number = h.block_number
1024 WHERE h.block_hash = $1 AND t.tx_index = $2",
1025 )
1026 .bind(block_hash)
1027 .bind(to_i64(index))
1028 .fetch_optional(&self.pool)
1029 .await
1030 .map_err(SqlColdError::from)?,
1031 };
1032
1033 let Some(r) = row else {
1034 return Ok(None);
1035 };
1036
1037 let block = from_i64(r.get::<i64, _>(COL_BLOCK_NUMBER));
1038 let index = from_i64(r.get::<i64, _>(COL_TX_INDEX));
1039 let block_hash = r.get(COL_BLOCK_HASH);
1040 let recovered = recovered_tx_from_row(&r).map_err(ColdStorageError::from)?;
1041 let meta = ConfirmationMeta::new(block, block_hash, index);
1042 Ok(Some(Confirmed::new(recovered, meta)))
1043 }
1044
1045 async fn get_transactions_in_block(&self, block: BlockNumber) -> ColdResult<Vec<RecoveredTx>> {
1046 let bn = to_i64(block);
1047 let rows =
1048 sqlx::query("SELECT * FROM transactions WHERE block_number = $1 ORDER BY tx_index")
1049 .bind(bn)
1050 .fetch_all(&self.pool)
1051 .await
1052 .map_err(SqlColdError::from)?;
1053
1054 rows.iter().map(|r| recovered_tx_from_row(r).map_err(ColdStorageError::from)).collect()
1055 }
1056
1057 async fn get_transaction_count(&self, block: BlockNumber) -> ColdResult<u64> {
1058 let bn = to_i64(block);
1059 let row = sqlx::query("SELECT COUNT(*) as cnt FROM transactions WHERE block_number = $1")
1060 .bind(bn)
1061 .fetch_one(&self.pool)
1062 .await
1063 .map_err(SqlColdError::from)?;
1064
1065 Ok(from_i64(row.get::<i64, _>(COL_CNT)))
1066 }
1067
1068 async fn get_receipt(&self, spec: ReceiptSpecifier) -> ColdResult<Option<ColdReceipt>> {
1069 let (block, index) = match spec {
1071 ReceiptSpecifier::TxHash(hash) => {
1072 let row = sqlx::query(
1073 "SELECT block_number, tx_index FROM transactions WHERE tx_hash = $1",
1074 )
1075 .bind(hash)
1076 .fetch_optional(&self.pool)
1077 .await
1078 .map_err(SqlColdError::from)?;
1079 let Some(r) = row else { return Ok(None) };
1080 (
1081 from_i64(r.get::<i64, _>(COL_BLOCK_NUMBER)),
1082 from_i64(r.get::<i64, _>(COL_TX_INDEX)),
1083 )
1084 }
1085 ReceiptSpecifier::BlockAndIndex { block, index } => (block, index),
1086 };
1087
1088 let Some(header) = self.fetch_header_by_number(block).await? else {
1089 return Ok(None);
1090 };
1091
1092 let receipt_row = sqlx::query(
1094 "SELECT r.*, t.tx_hash, t.from_address
1095 FROM receipts r
1096 JOIN transactions t ON r.block_number = t.block_number AND r.tx_index = t.tx_index
1097 WHERE r.block_number = $1 AND r.tx_index = $2",
1098 )
1099 .bind(to_i64(block))
1100 .bind(to_i64(index))
1101 .fetch_optional(&self.pool)
1102 .await
1103 .map_err(SqlColdError::from)?;
1104
1105 let Some(rr) = receipt_row else {
1106 return Ok(None);
1107 };
1108
1109 let bn: i64 = rr.get(COL_BLOCK_NUMBER);
1110 let tx_idx: i64 = rr.get(COL_TX_INDEX);
1111 let tx_hash = rr.get(COL_TX_HASH);
1112 let sender = rr.get(COL_FROM_ADDRESS);
1113 let tx_type = rr.get::<i32, _>(COL_TX_TYPE) as i16;
1114 let success = rr.get::<i32, _>(COL_SUCCESS) != 0;
1115 let cumulative_gas_used: i64 = rr.get(COL_CUMULATIVE_GAS_USED);
1116
1117 let log_rows = sqlx::query(
1118 "SELECT * FROM logs WHERE block_number = $1 AND tx_index = $2 ORDER BY log_index",
1119 )
1120 .bind(bn)
1121 .bind(tx_idx)
1122 .fetch_all(&self.pool)
1123 .await
1124 .map_err(SqlColdError::from)?;
1125
1126 let logs = log_rows.iter().map(log_from_row).collect();
1127 let built = build_receipt(tx_type, success, cumulative_gas_used, logs)
1128 .map_err(ColdStorageError::from)?;
1129
1130 let first_log_index: u64 = from_i64(rr.get::<i64, _>(COL_FIRST_LOG_INDEX));
1133 let prior = sqlx::query(
1134 "SELECT CAST(MAX(r.cumulative_gas_used) AS bigint) as prior_gas
1135 FROM receipts r WHERE r.block_number = $1 AND r.tx_index < $2",
1136 )
1137 .bind(to_i64(block))
1138 .bind(to_i64(index))
1139 .fetch_one(&self.pool)
1140 .await
1141 .map_err(SqlColdError::from)?;
1142 let prior_cumulative_gas: u64 =
1143 prior.get::<Option<i64>, _>(COL_PRIOR_GAS).unwrap_or(0) as u64;
1144 let gas_used = built.inner.cumulative_gas_used - prior_cumulative_gas;
1145
1146 let ir = IndexedReceipt { receipt: built, tx_hash, first_log_index, gas_used, sender };
1147 Ok(Some(ColdReceipt::new(ir, &header, index)))
1148 }
1149
1150 async fn get_receipts_in_block(&self, block: BlockNumber) -> ColdResult<Vec<ColdReceipt>> {
1151 let Some(header) =
1152 self.fetch_header_by_number(block).await.map_err(ColdStorageError::from)?
1153 else {
1154 return Ok(Vec::new());
1155 };
1156
1157 let bn = to_i64(block);
1158
1159 let receipt_rows = sqlx::query(
1161 "SELECT r.*, t.tx_hash, t.from_address
1162 FROM receipts r
1163 JOIN transactions t ON r.block_number = t.block_number AND r.tx_index = t.tx_index
1164 WHERE r.block_number = $1
1165 ORDER BY r.tx_index",
1166 )
1167 .bind(bn)
1168 .fetch_all(&self.pool)
1169 .await
1170 .map_err(SqlColdError::from)?;
1171
1172 let all_log_rows =
1173 sqlx::query("SELECT * FROM logs WHERE block_number = $1 ORDER BY tx_index, log_index")
1174 .bind(bn)
1175 .fetch_all(&self.pool)
1176 .await
1177 .map_err(SqlColdError::from)?;
1178
1179 let mut logs_by_tx: std::collections::BTreeMap<i64, Vec<Log>> =
1181 std::collections::BTreeMap::new();
1182 for r in &all_log_rows {
1183 let tx_idx: i64 = r.get(COL_TX_INDEX);
1184 logs_by_tx.entry(tx_idx).or_default().push(log_from_row(r));
1185 }
1186
1187 let mut first_log_index = 0u64;
1188 let mut prior_cumulative_gas = 0u64;
1189 receipt_rows
1190 .into_iter()
1191 .enumerate()
1192 .map(|(idx, rr)| {
1193 let tx_idx: i64 = rr.get(COL_TX_INDEX);
1194 let tx_hash = rr.get(COL_TX_HASH);
1195 let sender = rr.get(COL_FROM_ADDRESS);
1196 let tx_type = rr.get::<i32, _>(COL_TX_TYPE) as i16;
1197 let success = rr.get::<i32, _>(COL_SUCCESS) != 0;
1198 let cumulative_gas_used: i64 = rr.get(COL_CUMULATIVE_GAS_USED);
1199 let logs = logs_by_tx.remove(&tx_idx).unwrap_or_default();
1200 let receipt = build_receipt(tx_type, success, cumulative_gas_used, logs)
1201 .map_err(ColdStorageError::from)?;
1202 let gas_used = receipt.inner.cumulative_gas_used - prior_cumulative_gas;
1203 prior_cumulative_gas = receipt.inner.cumulative_gas_used;
1204 let ir = IndexedReceipt { receipt, tx_hash, first_log_index, gas_used, sender };
1205 first_log_index += ir.receipt.inner.logs.len() as u64;
1206 Ok(ColdReceipt::new(ir, &header, idx as u64))
1207 })
1208 .collect()
1209 }
1210
1211 async fn get_signet_events(
1212 &self,
1213 spec: SignetEventsSpecifier,
1214 ) -> ColdResult<Vec<DbSignetEvent>> {
1215 let rows = match spec {
1216 SignetEventsSpecifier::Block(block) => {
1217 let bn = to_i64(block);
1218 sqlx::query(
1219 "SELECT * FROM signet_events WHERE block_number = $1 ORDER BY event_index",
1220 )
1221 .bind(bn)
1222 .fetch_all(&self.pool)
1223 .await
1224 .map_err(SqlColdError::from)?
1225 }
1226 SignetEventsSpecifier::BlockRange { start, end } => {
1227 let s = to_i64(start);
1228 let e = to_i64(end);
1229 sqlx::query(
1230 "SELECT * FROM signet_events WHERE block_number >= $1 AND block_number <= $2
1231 ORDER BY block_number, event_index",
1232 )
1233 .bind(s)
1234 .bind(e)
1235 .fetch_all(&self.pool)
1236 .await
1237 .map_err(SqlColdError::from)?
1238 }
1239 };
1240
1241 rows.iter().map(|r| signet_event_from_row(r).map_err(ColdStorageError::from)).collect()
1242 }
1243
1244 async fn get_zenith_header(
1245 &self,
1246 spec: ZenithHeaderSpecifier,
1247 ) -> ColdResult<Option<DbZenithHeader>> {
1248 let block = match spec {
1249 ZenithHeaderSpecifier::Number(n) => n,
1250 ZenithHeaderSpecifier::Range { start, .. } => start,
1251 };
1252 let bn = to_i64(block);
1253 let row = sqlx::query("SELECT * FROM zenith_headers WHERE block_number = $1")
1254 .bind(bn)
1255 .fetch_optional(&self.pool)
1256 .await
1257 .map_err(SqlColdError::from)?;
1258
1259 row.map(|r| zenith_header_from_row(&r)).transpose().map_err(ColdStorageError::from)
1260 }
1261
1262 async fn get_zenith_headers(
1263 &self,
1264 spec: ZenithHeaderSpecifier,
1265 ) -> ColdResult<Vec<DbZenithHeader>> {
1266 let rows = match spec {
1267 ZenithHeaderSpecifier::Number(n) => {
1268 let bn = to_i64(n);
1269 sqlx::query("SELECT * FROM zenith_headers WHERE block_number = $1")
1270 .bind(bn)
1271 .fetch_all(&self.pool)
1272 .await
1273 .map_err(SqlColdError::from)?
1274 }
1275 ZenithHeaderSpecifier::Range { start, end } => {
1276 let s = to_i64(start);
1277 let e = to_i64(end);
1278 sqlx::query(
1279 "SELECT * FROM zenith_headers WHERE block_number >= $1 AND block_number <= $2
1280 ORDER BY block_number",
1281 )
1282 .bind(s)
1283 .bind(e)
1284 .fetch_all(&self.pool)
1285 .await
1286 .map_err(SqlColdError::from)?
1287 }
1288 };
1289
1290 rows.iter().map(|r| zenith_header_from_row(r).map_err(ColdStorageError::from)).collect()
1291 }
1292
1293 async fn get_logs(&self, filter: &Filter, max_logs: usize) -> ColdResult<Vec<RpcLog>> {
1294 let from = filter.get_from_block().unwrap_or(0);
1295 let to = filter.get_to_block().unwrap_or(u64::MAX);
1296
1297 let (filter_clause, params) = build_log_filter_clause(filter, 3);
1299 let where_clause = format!("l.block_number >= $1 AND l.block_number <= $2{filter_clause}");
1300
1301 let count_sql = format!("SELECT COUNT(*) as cnt FROM logs l WHERE {where_clause}");
1304 let mut count_query = sqlx::query(&count_sql).bind(to_i64(from)).bind(to_i64(to));
1305 for param in ¶ms {
1306 count_query = count_query.bind(*param);
1307 }
1308 let count_row = count_query.fetch_one(&self.pool).await.map_err(SqlColdError::from)?;
1309 let count = from_i64(count_row.get::<i64, _>(COL_CNT)) as usize;
1310 if count > max_logs {
1311 return Err(ColdStorageError::TooManyLogs { limit: max_logs });
1312 }
1313
1314 let data_sql = format!(
1317 "SELECT l.*, h.block_hash, h.timestamp AS block_timestamp, t.tx_hash, \
1318 (r.first_log_index + l.log_index) AS block_log_index \
1319 FROM logs l \
1320 JOIN headers h ON l.block_number = h.block_number \
1321 JOIN transactions t ON l.block_number = t.block_number \
1322 AND l.tx_index = t.tx_index \
1323 JOIN receipts r ON l.block_number = r.block_number \
1324 AND l.tx_index = r.tx_index \
1325 WHERE {where_clause} \
1326 ORDER BY l.block_number, l.tx_index, l.log_index"
1327 );
1328 let mut query = sqlx::query(&data_sql).bind(to_i64(from)).bind(to_i64(to));
1329 for param in ¶ms {
1330 query = query.bind(*param);
1331 }
1332
1333 let rows = query.fetch_all(&self.pool).await.map_err(SqlColdError::from)?;
1334
1335 rows.into_iter()
1336 .map(|r| {
1337 let log = log_from_row(&r);
1338 Ok(RpcLog {
1339 inner: log,
1340 block_hash: Some(r.get(COL_BLOCK_HASH)),
1341 block_number: Some(from_i64(r.get::<i64, _>(COL_BLOCK_NUMBER))),
1342 block_timestamp: Some(from_i64(r.get::<i64, _>(COL_BLOCK_TIMESTAMP))),
1343 transaction_hash: Some(r.get(COL_TX_HASH)),
1344 transaction_index: Some(from_i64(r.get::<i64, _>(COL_TX_INDEX))),
1345 log_index: Some(from_i64(r.get::<i64, _>(COL_BLOCK_LOG_INDEX))),
1346 removed: false,
1347 })
1348 })
1349 .collect::<ColdResult<Vec<_>>>()
1350 }
1351
1352 async fn produce_log_stream(&self, filter: &Filter, params: signet_cold::StreamParams) {
1353 #[cfg(feature = "postgres")]
1354 if self.is_postgres {
1355 return self.produce_log_stream_pg(filter, params).await;
1356 }
1357 signet_cold::produce_log_stream_default(self, filter, params).await;
1358 }
1359
1360 async fn get_latest_block(&self) -> ColdResult<Option<BlockNumber>> {
1361 let row = sqlx::query("SELECT MAX(block_number) as max_bn FROM headers")
1362 .fetch_one(&self.pool)
1363 .await
1364 .map_err(SqlColdError::from)?;
1365 Ok(row.get::<Option<i64>, _>(COL_MAX_BN).map(from_i64))
1366 }
1367
1368 async fn append_block(&self, data: BlockData) -> ColdResult<()> {
1369 self.insert_block(data).await.map_err(ColdStorageError::from)
1370 }
1371
1372 async fn append_blocks(&self, data: Vec<BlockData>) -> ColdResult<()> {
1373 let mut tx = self.pool.begin().await.map_err(SqlColdError::from)?;
1374 for block_data in data {
1375 write_block_to_tx(&mut tx, block_data).await.map_err(ColdStorageError::from)?;
1376 }
1377 tx.commit().await.map_err(SqlColdError::from)?;
1378 Ok(())
1379 }
1380
1381 async fn truncate_above(&self, block: BlockNumber) -> ColdResult<()> {
1382 let bn = to_i64(block);
1383 let mut tx = self.pool.begin().await.map_err(SqlColdError::from)?;
1384
1385 for table in
1387 ["logs", "transactions", "receipts", "signet_events", "zenith_headers", "headers"]
1388 {
1389 sqlx::query(&format!("DELETE FROM {table} WHERE block_number > $1"))
1390 .bind(bn)
1391 .execute(&mut *tx)
1392 .await
1393 .map_err(SqlColdError::from)?;
1394 }
1395
1396 tx.commit().await.map_err(SqlColdError::from)?;
1397 Ok(())
1398 }
1399}
1400
1401#[cfg(all(test, feature = "test-utils"))]
1402mod tests {
1403 use super::*;
1404 use signet_cold::conformance::conformance;
1405
1406 #[tokio::test]
1407 async fn sqlite_conformance() {
1408 let backend = SqlColdBackend::connect("sqlite::memory:").await.unwrap();
1409 conformance(backend).await.unwrap();
1410 }
1411
1412 #[tokio::test]
1413 async fn pg_conformance() {
1414 let Ok(url) = std::env::var("DATABASE_URL") else {
1415 eprintln!("skipping pg conformance: DATABASE_URL not set");
1416 return;
1417 };
1418 let backend = SqlColdBackend::connect(&url).await.unwrap();
1419 conformance(backend).await.unwrap();
1420 }
1421}