1use crate::SqlColdError;
8use crate::columns::{
9 COL_ACCESS_LIST, COL_ADDRESS, COL_AMOUNT, COL_AUTHORIZATION_LIST, COL_BASE_FEE_PER_GAS,
10 COL_BENEFICIARY, COL_BLOB_GAS_USED, COL_BLOB_VERSIONED_HASHES, COL_BLOCK_DATA_HASH,
11 COL_BLOCK_HASH, COL_BLOCK_LOG_INDEX, COL_BLOCK_NUMBER, COL_BLOCK_TIMESTAMP, COL_CHAIN_ID,
12 COL_CNT, COL_CUMULATIVE_GAS_USED, COL_DATA, COL_DIFFICULTY, COL_EVENT_TYPE,
13 COL_EXCESS_BLOB_GAS, COL_EXTRA_DATA, COL_FROM_ADDRESS, COL_GAS, COL_GAS_LIMIT, COL_GAS_PRICE,
14 COL_GAS_USED, COL_HOST_BLOCK_NUMBER, COL_INPUT, COL_LOG_COUNT, COL_LOGS_BLOOM, COL_MAX_BN,
15 COL_MAX_FEE_PER_BLOB_GAS, COL_MAX_FEE_PER_GAS, COL_MAX_PRIORITY_FEE_PER_GAS, COL_MIX_HASH,
16 COL_NONCE, COL_OMMERS_HASH, COL_ORDER_INDEX, COL_PARENT_BEACON_BLOCK_ROOT, COL_PARENT_HASH,
17 COL_PRIOR_GAS, COL_RECEIPTS_ROOT, COL_REQUESTS_HASH, COL_REWARD_ADDRESS, COL_ROLLUP_CHAIN_ID,
18 COL_ROLLUP_RECIPIENT, COL_SENDER, COL_SIG_R, COL_SIG_S, COL_SIG_Y_PARITY, COL_STATE_ROOT,
19 COL_SUCCESS, COL_TIMESTAMP, COL_TO_ADDRESS, COL_TOKEN, COL_TOPIC0, COL_TOPIC1, COL_TOPIC2,
20 COL_TOPIC3, COL_TRANSACTIONS_ROOT, COL_TX_HASH, COL_TX_INDEX, COL_TX_TYPE, COL_VALUE,
21 COL_WITHDRAWALS_ROOT,
22};
23use crate::convert::{
24 EVENT_ENTER, EVENT_ENTER_TOKEN, EVENT_TRANSACT, build_receipt, decode_access_list_or_empty,
25 decode_authorization_list, decode_b256_vec, decode_u128_required, decode_u256,
26 encode_access_list, encode_authorization_list, encode_b256_vec, encode_u128, encode_u256,
27 from_address, from_i64, to_address, to_i64,
28};
29use alloy::{
30 consensus::{
31 Header, Signed, TxEip1559, TxEip2930, TxEip4844, TxEip7702, TxLegacy, TxType,
32 transaction::Recovered,
33 },
34 primitives::{Address, B256, BlockNumber, Bloom, Bytes, Log, LogData, Sealable, Signature},
35};
36use signet_cold::{
37 BlockData, ColdReceipt, ColdResult, ColdStorage, ColdStorageError, Confirmed, Filter,
38 HeaderSpecifier, ReceiptSpecifier, RpcLog, SignetEventsSpecifier, TransactionSpecifier,
39 ZenithHeaderSpecifier,
40};
41use signet_storage_types::{
42 ConfirmationMeta, DbSignetEvent, DbZenithHeader, IndexedReceipt, RecoveredTx, SealedHeader,
43 TransactionSigned,
44};
45use signet_zenith::{
46 Passage::{Enter, EnterToken},
47 Transactor::Transact,
48 Zenith,
49};
50use sqlx::{AnyPool, Row};
51
52#[derive(Debug, Clone)]
72pub struct SqlColdBackend {
73 pool: AnyPool,
74}
75
76impl SqlColdBackend {
77 pub async fn new(pool: AnyPool) -> Result<Self, SqlColdError> {
84 let conn = pool.acquire().await?;
86 let backend = conn.backend_name().to_owned();
87 drop(conn);
88
89 let migration = match backend.as_str() {
90 "PostgreSQL" => include_str!("../migrations/001_initial_pg.sql"),
91 "SQLite" => include_str!("../migrations/001_initial.sql"),
92 other => {
93 return Err(SqlColdError::Convert(format!(
94 "unsupported database backend: {other}"
95 )));
96 }
97 };
98 sqlx::raw_sql(migration).execute(&pool).await?;
101 Ok(Self { pool })
102 }
103
104 pub async fn connect(url: &str) -> Result<Self, SqlColdError> {
113 sqlx::any::install_default_drivers();
114 let pool: AnyPool = sqlx::pool::PoolOptions::new().max_connections(1).connect(url).await?;
115 Self::new(pool).await
116 }
117
118 async fn resolve_header_spec(
123 &self,
124 spec: HeaderSpecifier,
125 ) -> Result<Option<BlockNumber>, SqlColdError> {
126 match spec {
127 HeaderSpecifier::Number(n) => Ok(Some(n)),
128 HeaderSpecifier::Hash(hash) => {
129 let hash_bytes = hash.as_slice();
130 let row = sqlx::query("SELECT block_number FROM headers WHERE block_hash = $1")
131 .bind(hash_bytes)
132 .fetch_optional(&self.pool)
133 .await?;
134 Ok(row.map(|r| from_i64(r.get::<i64, _>(COL_BLOCK_NUMBER))))
135 }
136 }
137 }
138
139 async fn fetch_header_by_number(
144 &self,
145 block_num: BlockNumber,
146 ) -> Result<Option<SealedHeader>, SqlColdError> {
147 let bn = to_i64(block_num);
148 let row = sqlx::query("SELECT * FROM headers WHERE block_number = $1")
149 .bind(bn)
150 .fetch_optional(&self.pool)
151 .await?;
152
153 row.map(|r| header_from_row(&r).map(|h| h.seal_slow())).transpose()
154 }
155
156 async fn insert_block(&self, data: BlockData) -> Result<(), SqlColdError> {
161 let mut tx = self.pool.begin().await?;
162 write_block_to_tx(&mut tx, data).await?;
163 tx.commit().await?;
164 Ok(())
165 }
166}
167
168fn blob(r: &sqlx::any::AnyRow, col: &str) -> Vec<u8> {
174 r.get(col)
175}
176
177fn opt_blob(r: &sqlx::any::AnyRow, col: &str) -> Option<Vec<u8>> {
179 r.get(col)
180}
181
182fn header_from_row(r: &sqlx::any::AnyRow) -> Result<Header, SqlColdError> {
184 Ok(Header {
185 parent_hash: B256::from_slice(&blob(r, COL_PARENT_HASH)),
186 ommers_hash: B256::from_slice(&blob(r, COL_OMMERS_HASH)),
187 beneficiary: Address::from_slice(&blob(r, COL_BENEFICIARY)),
188 state_root: B256::from_slice(&blob(r, COL_STATE_ROOT)),
189 transactions_root: B256::from_slice(&blob(r, COL_TRANSACTIONS_ROOT)),
190 receipts_root: B256::from_slice(&blob(r, COL_RECEIPTS_ROOT)),
191 logs_bloom: Bloom::from_slice(&blob(r, COL_LOGS_BLOOM)),
192 difficulty: decode_u256(&blob(r, COL_DIFFICULTY))?,
193 number: from_i64(r.get(COL_BLOCK_NUMBER)),
194 gas_limit: from_i64(r.get(COL_GAS_LIMIT)),
195 gas_used: from_i64(r.get(COL_GAS_USED)),
196 timestamp: from_i64(r.get(COL_TIMESTAMP)),
197 extra_data: Bytes::from(blob(r, COL_EXTRA_DATA)),
198 mix_hash: B256::from_slice(&blob(r, COL_MIX_HASH)),
199 nonce: alloy::primitives::B64::from_slice(&blob(r, COL_NONCE)),
200 base_fee_per_gas: r.get::<Option<i64>, _>(COL_BASE_FEE_PER_GAS).map(from_i64),
201 withdrawals_root: opt_blob(r, COL_WITHDRAWALS_ROOT).map(|b| B256::from_slice(&b)),
202 blob_gas_used: r.get::<Option<i64>, _>(COL_BLOB_GAS_USED).map(from_i64),
203 excess_blob_gas: r.get::<Option<i64>, _>(COL_EXCESS_BLOB_GAS).map(from_i64),
204 parent_beacon_block_root: opt_blob(r, COL_PARENT_BEACON_BLOCK_ROOT)
205 .map(|b| B256::from_slice(&b)),
206 requests_hash: opt_blob(r, COL_REQUESTS_HASH).map(|b| B256::from_slice(&b)),
207 })
208}
209
210fn tx_from_row(r: &sqlx::any::AnyRow) -> Result<TransactionSigned, SqlColdError> {
212 use alloy::consensus::EthereumTxEnvelope;
213
214 let sig = Signature::new(
215 decode_u256(&r.get::<Vec<u8>, _>(COL_SIG_R))?,
216 decode_u256(&r.get::<Vec<u8>, _>(COL_SIG_S))?,
217 r.get::<i32, _>(COL_SIG_Y_PARITY) != 0,
218 );
219
220 let tx_type_raw = r.get::<i32, _>(COL_TX_TYPE) as u8;
221 let tx_type = TxType::try_from(tx_type_raw)
222 .map_err(|_| SqlColdError::Convert(format!("invalid tx_type: {tx_type_raw}")))?;
223
224 let chain_id: Option<i64> = r.get(COL_CHAIN_ID);
225 let nonce = from_i64(r.get(COL_NONCE));
226 let gas_limit = from_i64(r.get(COL_GAS_LIMIT));
227 let to_addr = opt_blob(r, COL_TO_ADDRESS);
228 let value = decode_u256(&r.get::<Vec<u8>, _>(COL_VALUE))?;
229 let input = Bytes::from(r.get::<Vec<u8>, _>(COL_INPUT));
230
231 match tx_type {
232 TxType::Legacy => {
233 let tx = TxLegacy {
234 chain_id: chain_id.map(from_i64),
235 nonce,
236 gas_price: decode_u128_required(&opt_blob(r, COL_GAS_PRICE), COL_GAS_PRICE)?,
237 gas_limit,
238 to: from_address(to_addr.as_deref()),
239 value,
240 input,
241 };
242 Ok(EthereumTxEnvelope::Legacy(Signed::new_unhashed(tx, sig)))
243 }
244 TxType::Eip2930 => {
245 let tx = TxEip2930 {
246 chain_id: from_i64(
247 chain_id
248 .ok_or_else(|| SqlColdError::Convert("EIP2930 requires chain_id".into()))?,
249 ),
250 nonce,
251 gas_price: decode_u128_required(&opt_blob(r, COL_GAS_PRICE), COL_GAS_PRICE)?,
252 gas_limit,
253 to: from_address(to_addr.as_deref()),
254 value,
255 input,
256 access_list: decode_access_list_or_empty(&opt_blob(r, COL_ACCESS_LIST))?,
257 };
258 Ok(EthereumTxEnvelope::Eip2930(Signed::new_unhashed(tx, sig)))
259 }
260 TxType::Eip1559 => {
261 let tx = TxEip1559 {
262 chain_id: from_i64(
263 chain_id
264 .ok_or_else(|| SqlColdError::Convert("EIP1559 requires chain_id".into()))?,
265 ),
266 nonce,
267 gas_limit,
268 max_fee_per_gas: decode_u128_required(
269 &opt_blob(r, COL_MAX_FEE_PER_GAS),
270 COL_MAX_FEE_PER_GAS,
271 )?,
272 max_priority_fee_per_gas: decode_u128_required(
273 &opt_blob(r, COL_MAX_PRIORITY_FEE_PER_GAS),
274 COL_MAX_PRIORITY_FEE_PER_GAS,
275 )?,
276 to: from_address(to_addr.as_deref()),
277 value,
278 input,
279 access_list: decode_access_list_or_empty(&opt_blob(r, COL_ACCESS_LIST))?,
280 };
281 Ok(EthereumTxEnvelope::Eip1559(Signed::new_unhashed(tx, sig)))
282 }
283 TxType::Eip4844 => {
284 let tx =
285 TxEip4844 {
286 chain_id: from_i64(chain_id.ok_or_else(|| {
287 SqlColdError::Convert("EIP4844 requires chain_id".into())
288 })?),
289 nonce,
290 gas_limit,
291 max_fee_per_gas: decode_u128_required(
292 &opt_blob(r, COL_MAX_FEE_PER_GAS),
293 COL_MAX_FEE_PER_GAS,
294 )?,
295 max_priority_fee_per_gas: decode_u128_required(
296 &opt_blob(r, COL_MAX_PRIORITY_FEE_PER_GAS),
297 COL_MAX_PRIORITY_FEE_PER_GAS,
298 )?,
299 to: Address::from_slice(to_addr.as_deref().ok_or_else(|| {
300 SqlColdError::Convert("EIP4844 requires to_address".into())
301 })?),
302 value,
303 input,
304 access_list: decode_access_list_or_empty(&opt_blob(r, COL_ACCESS_LIST))?,
305 blob_versioned_hashes: decode_b256_vec(
306 opt_blob(r, COL_BLOB_VERSIONED_HASHES).as_deref().ok_or_else(|| {
307 SqlColdError::Convert("EIP4844 requires blob_versioned_hashes".into())
308 })?,
309 )?,
310 max_fee_per_blob_gas: decode_u128_required(
311 &opt_blob(r, COL_MAX_FEE_PER_BLOB_GAS),
312 COL_MAX_FEE_PER_BLOB_GAS,
313 )?,
314 };
315 Ok(EthereumTxEnvelope::Eip4844(Signed::new_unhashed(tx, sig)))
316 }
317 TxType::Eip7702 => {
318 let tx =
319 TxEip7702 {
320 chain_id: from_i64(chain_id.ok_or_else(|| {
321 SqlColdError::Convert("EIP7702 requires chain_id".into())
322 })?),
323 nonce,
324 gas_limit,
325 max_fee_per_gas: decode_u128_required(
326 &opt_blob(r, COL_MAX_FEE_PER_GAS),
327 COL_MAX_FEE_PER_GAS,
328 )?,
329 max_priority_fee_per_gas: decode_u128_required(
330 &opt_blob(r, COL_MAX_PRIORITY_FEE_PER_GAS),
331 COL_MAX_PRIORITY_FEE_PER_GAS,
332 )?,
333 to: Address::from_slice(to_addr.as_deref().ok_or_else(|| {
334 SqlColdError::Convert("EIP7702 requires to_address".into())
335 })?),
336 value,
337 input,
338 access_list: decode_access_list_or_empty(&opt_blob(r, COL_ACCESS_LIST))?,
339 authorization_list: decode_authorization_list(
340 opt_blob(r, COL_AUTHORIZATION_LIST).as_deref().ok_or_else(|| {
341 SqlColdError::Convert("EIP7702 requires authorization_list".into())
342 })?,
343 )?,
344 };
345 Ok(EthereumTxEnvelope::Eip7702(Signed::new_unhashed(tx, sig)))
346 }
347 }
348}
349
350fn recovered_tx_from_row(r: &sqlx::any::AnyRow) -> Result<RecoveredTx, SqlColdError> {
352 let sender = Address::from_slice(&r.get::<Vec<u8>, _>(COL_FROM_ADDRESS));
353 let tx = tx_from_row(r)?;
354 Ok(Recovered::new_unchecked(tx, sender))
356}
357
358fn log_from_row(r: &sqlx::any::AnyRow) -> Log {
360 let topics = [COL_TOPIC0, COL_TOPIC1, COL_TOPIC2, COL_TOPIC3]
361 .into_iter()
362 .filter_map(|col| r.get::<Option<Vec<u8>>, _>(col))
363 .map(|t| B256::from_slice(&t))
364 .collect();
365 Log {
366 address: Address::from_slice(&r.get::<Vec<u8>, _>(COL_ADDRESS)),
367 data: LogData::new_unchecked(topics, Bytes::from(r.get::<Vec<u8>, _>(COL_DATA))),
368 }
369}
370
371fn signet_event_from_row(r: &sqlx::any::AnyRow) -> Result<DbSignetEvent, SqlColdError> {
373 let event_type = r.get::<i32, _>(COL_EVENT_TYPE) as i16;
374 let order = from_i64(r.get(COL_ORDER_INDEX));
375 let rollup_chain_id = decode_u256(&r.get::<Vec<u8>, _>(COL_ROLLUP_CHAIN_ID))?;
376
377 match event_type {
378 EVENT_TRANSACT => {
379 let sender = Address::from_slice(
380 opt_blob(r, COL_SENDER)
381 .as_deref()
382 .ok_or_else(|| SqlColdError::Convert("Transact requires sender".into()))?,
383 );
384 let to = Address::from_slice(
385 opt_blob(r, COL_TO_ADDRESS)
386 .as_deref()
387 .ok_or_else(|| SqlColdError::Convert("Transact requires to".into()))?,
388 );
389 let value = decode_u256(
390 opt_blob(r, COL_VALUE)
391 .as_deref()
392 .ok_or_else(|| SqlColdError::Convert("Transact requires value".into()))?,
393 )?;
394 let gas = decode_u256(
395 opt_blob(r, COL_GAS)
396 .as_deref()
397 .ok_or_else(|| SqlColdError::Convert("Transact requires gas".into()))?,
398 )?;
399 let max_fee =
400 decode_u256(opt_blob(r, COL_MAX_FEE_PER_GAS).as_deref().ok_or_else(|| {
401 SqlColdError::Convert("Transact requires max_fee_per_gas".into())
402 })?)?;
403 let data = Bytes::from(opt_blob(r, COL_DATA).unwrap_or_default());
404
405 Ok(DbSignetEvent::Transact(
406 order,
407 Transact {
408 rollupChainId: rollup_chain_id,
409 sender,
410 to,
411 value,
412 gas,
413 maxFeePerGas: max_fee,
414 data,
415 },
416 ))
417 }
418 EVENT_ENTER => {
419 let recipient =
420 Address::from_slice(opt_blob(r, COL_ROLLUP_RECIPIENT).as_deref().ok_or_else(
421 || SqlColdError::Convert("Enter requires rollup_recipient".into()),
422 )?);
423 let amount = decode_u256(
424 opt_blob(r, COL_AMOUNT)
425 .as_deref()
426 .ok_or_else(|| SqlColdError::Convert("Enter requires amount".into()))?,
427 )?;
428
429 Ok(DbSignetEvent::Enter(
430 order,
431 Enter { rollupChainId: rollup_chain_id, rollupRecipient: recipient, amount },
432 ))
433 }
434 EVENT_ENTER_TOKEN => {
435 let token = Address::from_slice(
436 opt_blob(r, COL_TOKEN)
437 .as_deref()
438 .ok_or_else(|| SqlColdError::Convert("EnterToken requires token".into()))?,
439 );
440 let recipient =
441 Address::from_slice(opt_blob(r, COL_ROLLUP_RECIPIENT).as_deref().ok_or_else(
442 || SqlColdError::Convert("EnterToken requires rollup_recipient".into()),
443 )?);
444 let amount = decode_u256(
445 opt_blob(r, COL_AMOUNT)
446 .as_deref()
447 .ok_or_else(|| SqlColdError::Convert("EnterToken requires amount".into()))?,
448 )?;
449
450 Ok(DbSignetEvent::EnterToken(
451 order,
452 EnterToken {
453 rollupChainId: rollup_chain_id,
454 token,
455 rollupRecipient: recipient,
456 amount,
457 },
458 ))
459 }
460 _ => Err(SqlColdError::Convert(format!("invalid event_type: {event_type}"))),
461 }
462}
463
464fn zenith_header_from_row(r: &sqlx::any::AnyRow) -> Result<DbZenithHeader, SqlColdError> {
466 Ok(DbZenithHeader(Zenith::BlockHeader {
467 hostBlockNumber: decode_u256(&blob(r, COL_HOST_BLOCK_NUMBER))?,
468 rollupChainId: decode_u256(&blob(r, COL_ROLLUP_CHAIN_ID))?,
469 gasLimit: decode_u256(&blob(r, COL_GAS_LIMIT))?,
470 rewardAddress: Address::from_slice(&blob(r, COL_REWARD_ADDRESS)),
471 blockDataHash: alloy::primitives::FixedBytes::<32>::from_slice(&blob(
472 r,
473 COL_BLOCK_DATA_HASH,
474 )),
475 }))
476}
477
478async fn write_block_to_tx(
484 tx: &mut sqlx::Transaction<'_, sqlx::Any>,
485 data: BlockData,
486) -> Result<(), SqlColdError> {
487 let bn = to_i64(data.block_number());
488
489 let block_hash = data.header.hash_slow();
491 let difficulty = encode_u256(&data.header.difficulty);
492 sqlx::query(
493 "INSERT INTO headers (
494 block_number, block_hash, parent_hash, ommers_hash, beneficiary,
495 state_root, transactions_root, receipts_root, logs_bloom, difficulty,
496 gas_limit, gas_used, timestamp, extra_data, mix_hash, nonce,
497 base_fee_per_gas, withdrawals_root, blob_gas_used, excess_blob_gas,
498 parent_beacon_block_root, requests_hash
499 ) VALUES (
500 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
501 $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22
502 )",
503 )
504 .bind(bn)
505 .bind(block_hash.as_slice())
506 .bind(data.header.parent_hash.as_slice())
507 .bind(data.header.ommers_hash.as_slice())
508 .bind(data.header.beneficiary.as_slice())
509 .bind(data.header.state_root.as_slice())
510 .bind(data.header.transactions_root.as_slice())
511 .bind(data.header.receipts_root.as_slice())
512 .bind(data.header.logs_bloom.as_slice())
513 .bind(difficulty.as_slice())
514 .bind(to_i64(data.header.gas_limit))
515 .bind(to_i64(data.header.gas_used))
516 .bind(to_i64(data.header.timestamp))
517 .bind(data.header.extra_data.as_ref())
518 .bind(data.header.mix_hash.as_slice())
519 .bind(data.header.nonce.as_slice())
520 .bind(data.header.base_fee_per_gas.map(to_i64))
521 .bind(data.header.withdrawals_root.as_ref().map(|r| r.as_slice()))
522 .bind(data.header.blob_gas_used.map(to_i64))
523 .bind(data.header.excess_blob_gas.map(to_i64))
524 .bind(data.header.parent_beacon_block_root.as_ref().map(|r| r.as_slice()))
525 .bind(data.header.requests_hash.as_ref().map(|r| r.as_slice()))
526 .execute(&mut **tx)
527 .await?;
528
529 for (idx, recovered_tx) in data.transactions.iter().enumerate() {
531 insert_transaction(tx, bn, to_i64(idx as u64), recovered_tx).await?;
532 }
533
534 for (idx, receipt) in data.receipts.iter().enumerate() {
536 let tx_idx = to_i64(idx as u64);
537 sqlx::query(
538 "INSERT INTO receipts (block_number, tx_index, tx_type, success, cumulative_gas_used)
539 VALUES ($1, $2, $3, $4, $5)",
540 )
541 .bind(bn)
542 .bind(tx_idx)
543 .bind(receipt.tx_type as i32)
544 .bind(receipt.inner.status.coerce_status() as i32)
545 .bind(to_i64(receipt.inner.cumulative_gas_used))
546 .execute(&mut **tx)
547 .await?;
548
549 for (log_idx, log) in receipt.inner.logs.iter().enumerate() {
550 let topics = log.topics();
551 sqlx::query(
552 "INSERT INTO logs (block_number, tx_index, log_index, address, topic0, topic1, topic2, topic3, data)
553 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
554 )
555 .bind(bn)
556 .bind(tx_idx)
557 .bind(to_i64(log_idx as u64))
558 .bind(log.address.as_slice())
559 .bind(topics.first().map(|t| t.as_slice()))
560 .bind(topics.get(1).map(|t| t.as_slice()))
561 .bind(topics.get(2).map(|t| t.as_slice()))
562 .bind(topics.get(3).map(|t| t.as_slice()))
563 .bind(log.data.data.as_ref())
564 .execute(&mut **tx)
565 .await?;
566 }
567 }
568
569 for (idx, event) in data.signet_events.iter().enumerate() {
571 insert_signet_event(tx, bn, to_i64(idx as u64), event).await?;
572 }
573
574 if let Some(zh) = &data.zenith_header {
576 let h = &zh.0;
577 let host_bn = encode_u256(&h.hostBlockNumber);
578 let chain_id = encode_u256(&h.rollupChainId);
579 let gas_limit = encode_u256(&h.gasLimit);
580 sqlx::query(
581 "INSERT INTO zenith_headers (
582 block_number, host_block_number, rollup_chain_id,
583 gas_limit, reward_address, block_data_hash
584 ) VALUES ($1, $2, $3, $4, $5, $6)",
585 )
586 .bind(bn)
587 .bind(host_bn.as_slice())
588 .bind(chain_id.as_slice())
589 .bind(gas_limit.as_slice())
590 .bind(h.rewardAddress.as_slice())
591 .bind(h.blockDataHash.as_slice())
592 .execute(&mut **tx)
593 .await?;
594 }
595
596 Ok(())
597}
598
599async fn insert_transaction(
601 conn: &mut sqlx::AnyConnection,
602 bn: i64,
603 tx_index: i64,
604 recovered: &RecoveredTx,
605) -> Result<(), SqlColdError> {
606 use alloy::consensus::EthereumTxEnvelope;
607
608 let sender = recovered.signer();
609 let tx: &TransactionSigned = recovered;
610 let tx_hash = tx.tx_hash();
611 let tx_type = tx.tx_type() as i32;
612
613 macro_rules! sig {
614 ($s:expr) => {{
615 let sig = $s.signature();
616 (sig.v() as i32, encode_u256(&sig.r()), encode_u256(&sig.s()))
617 }};
618 }
619 let (sig_y, sig_r, sig_s) = match tx {
620 EthereumTxEnvelope::Legacy(s) => sig!(s),
621 EthereumTxEnvelope::Eip2930(s) => sig!(s),
622 EthereumTxEnvelope::Eip1559(s) => sig!(s),
623 EthereumTxEnvelope::Eip4844(s) => sig!(s),
624 EthereumTxEnvelope::Eip7702(s) => sig!(s),
625 };
626
627 let (chain_id, nonce, gas_limit) = match tx {
628 EthereumTxEnvelope::Legacy(s) => {
629 (s.tx().chain_id.map(to_i64), to_i64(s.tx().nonce), to_i64(s.tx().gas_limit))
630 }
631 EthereumTxEnvelope::Eip2930(s) => {
632 (Some(to_i64(s.tx().chain_id)), to_i64(s.tx().nonce), to_i64(s.tx().gas_limit))
633 }
634 EthereumTxEnvelope::Eip1559(s) => {
635 (Some(to_i64(s.tx().chain_id)), to_i64(s.tx().nonce), to_i64(s.tx().gas_limit))
636 }
637 EthereumTxEnvelope::Eip4844(s) => {
638 (Some(to_i64(s.tx().chain_id)), to_i64(s.tx().nonce), to_i64(s.tx().gas_limit))
639 }
640 EthereumTxEnvelope::Eip7702(s) => {
641 (Some(to_i64(s.tx().chain_id)), to_i64(s.tx().nonce), to_i64(s.tx().gas_limit))
642 }
643 };
644
645 let (value, to_addr) = match tx {
646 EthereumTxEnvelope::Legacy(s) => (encode_u256(&s.tx().value), to_address(&s.tx().to)),
647 EthereumTxEnvelope::Eip2930(s) => (encode_u256(&s.tx().value), to_address(&s.tx().to)),
648 EthereumTxEnvelope::Eip1559(s) => (encode_u256(&s.tx().value), to_address(&s.tx().to)),
649 EthereumTxEnvelope::Eip4844(s) => {
650 (encode_u256(&s.tx().value), Some(s.tx().to.as_slice().to_vec()))
651 }
652 EthereumTxEnvelope::Eip7702(s) => {
653 (encode_u256(&s.tx().value), Some(s.tx().to.as_slice().to_vec()))
654 }
655 };
656
657 let input: &[u8] = match tx {
658 EthereumTxEnvelope::Legacy(s) => s.tx().input.as_ref(),
659 EthereumTxEnvelope::Eip2930(s) => s.tx().input.as_ref(),
660 EthereumTxEnvelope::Eip1559(s) => s.tx().input.as_ref(),
661 EthereumTxEnvelope::Eip4844(s) => s.tx().input.as_ref(),
662 EthereumTxEnvelope::Eip7702(s) => s.tx().input.as_ref(),
663 };
664
665 let (gas_price, max_fee, max_priority_fee, max_blob_fee) = match tx {
666 EthereumTxEnvelope::Legacy(s) => (Some(encode_u128(s.tx().gas_price)), None, None, None),
667 EthereumTxEnvelope::Eip2930(s) => (Some(encode_u128(s.tx().gas_price)), None, None, None),
668 EthereumTxEnvelope::Eip1559(s) => (
669 None,
670 Some(encode_u128(s.tx().max_fee_per_gas)),
671 Some(encode_u128(s.tx().max_priority_fee_per_gas)),
672 None,
673 ),
674 EthereumTxEnvelope::Eip4844(s) => (
675 None,
676 Some(encode_u128(s.tx().max_fee_per_gas)),
677 Some(encode_u128(s.tx().max_priority_fee_per_gas)),
678 Some(encode_u128(s.tx().max_fee_per_blob_gas)),
679 ),
680 EthereumTxEnvelope::Eip7702(s) => (
681 None,
682 Some(encode_u128(s.tx().max_fee_per_gas)),
683 Some(encode_u128(s.tx().max_priority_fee_per_gas)),
684 None,
685 ),
686 };
687
688 let (access_list, blob_hashes, auth_list) = match tx {
689 EthereumTxEnvelope::Legacy(_) => (None, None, None),
690 EthereumTxEnvelope::Eip2930(s) => {
691 (Some(encode_access_list(&s.tx().access_list)), None, None)
692 }
693 EthereumTxEnvelope::Eip1559(s) => {
694 (Some(encode_access_list(&s.tx().access_list)), None, None)
695 }
696 EthereumTxEnvelope::Eip4844(s) => (
697 Some(encode_access_list(&s.tx().access_list)),
698 Some(encode_b256_vec(&s.tx().blob_versioned_hashes)),
699 None,
700 ),
701 EthereumTxEnvelope::Eip7702(s) => (
702 Some(encode_access_list(&s.tx().access_list)),
703 None,
704 Some(encode_authorization_list(&s.tx().authorization_list)),
705 ),
706 };
707
708 sqlx::query(
709 "INSERT INTO transactions (
710 block_number, tx_index, tx_hash, tx_type,
711 sig_y_parity, sig_r, sig_s,
712 chain_id, nonce, gas_limit, to_address, value, input,
713 gas_price, max_fee_per_gas, max_priority_fee_per_gas,
714 max_fee_per_blob_gas, blob_versioned_hashes,
715 access_list, authorization_list, from_address
716 ) VALUES (
717 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
718 $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21
719 )",
720 )
721 .bind(bn)
722 .bind(tx_index)
723 .bind(tx_hash.as_slice())
724 .bind(tx_type)
725 .bind(sig_y)
726 .bind(sig_r.as_slice())
727 .bind(sig_s.as_slice())
728 .bind(chain_id)
729 .bind(nonce)
730 .bind(gas_limit)
731 .bind(to_addr.as_deref())
732 .bind(value.as_slice())
733 .bind(input)
734 .bind(gas_price.as_ref().map(|v| v.as_slice()))
735 .bind(max_fee.as_ref().map(|v| v.as_slice()))
736 .bind(max_priority_fee.as_ref().map(|v| v.as_slice()))
737 .bind(max_blob_fee.as_ref().map(|v| v.as_slice()))
738 .bind(blob_hashes.as_deref())
739 .bind(access_list.as_deref())
740 .bind(auth_list.as_deref())
741 .bind(sender.as_slice())
742 .execute(&mut *conn)
743 .await?;
744
745 Ok(())
746}
747
748async fn insert_signet_event(
750 conn: &mut sqlx::AnyConnection,
751 block_number: i64,
752 event_index: i64,
753 event: &DbSignetEvent,
754) -> Result<(), SqlColdError> {
755 let (event_type, order, chain_id) = match event {
756 DbSignetEvent::Transact(o, t) => (0i32, to_i64(*o), encode_u256(&t.rollupChainId)),
757 DbSignetEvent::Enter(o, e) => (1i32, to_i64(*o), encode_u256(&e.rollupChainId)),
758 DbSignetEvent::EnterToken(o, e) => (2i32, to_i64(*o), encode_u256(&e.rollupChainId)),
759 };
760
761 let (value, gas, max_fee, amount) = match event {
762 DbSignetEvent::Transact(_, t) => (
763 Some(encode_u256(&t.value)),
764 Some(encode_u256(&t.gas)),
765 Some(encode_u256(&t.maxFeePerGas)),
766 None,
767 ),
768 DbSignetEvent::Enter(_, e) => (None, None, None, Some(encode_u256(&e.amount))),
769 DbSignetEvent::EnterToken(_, e) => (None, None, None, Some(encode_u256(&e.amount))),
770 };
771
772 sqlx::query(
773 "INSERT INTO signet_events (
774 block_number, event_index, event_type, order_index,
775 rollup_chain_id, sender, to_address, value, gas,
776 max_fee_per_gas, data, rollup_recipient, amount, token
777 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)",
778 )
779 .bind(block_number)
780 .bind(event_index)
781 .bind(event_type)
782 .bind(order)
783 .bind(chain_id.as_slice())
784 .bind(match event {
785 DbSignetEvent::Transact(_, t) => Some(t.sender.as_slice()),
786 _ => None,
787 })
788 .bind(match event {
789 DbSignetEvent::Transact(_, t) => Some(t.to.as_slice()),
790 _ => None,
791 })
792 .bind(value.as_ref().map(|v| v.as_slice()))
793 .bind(gas.as_ref().map(|v| v.as_slice()))
794 .bind(max_fee.as_ref().map(|v| v.as_slice()))
795 .bind(match event {
796 DbSignetEvent::Transact(_, t) => Some(t.data.as_ref()),
797 _ => None,
798 })
799 .bind(match event {
800 DbSignetEvent::Enter(_, e) => Some(e.rollupRecipient.as_slice()),
801 DbSignetEvent::EnterToken(_, e) => Some(e.rollupRecipient.as_slice()),
802 _ => None,
803 })
804 .bind(amount.as_ref().map(|v| v.as_slice()))
805 .bind(match event {
806 DbSignetEvent::EnterToken(_, e) => Some(e.token.as_slice()),
807 _ => None,
808 })
809 .execute(&mut *conn)
810 .await?;
811
812 Ok(())
813}
814
815impl ColdStorage for SqlColdBackend {
820 async fn get_header(&self, spec: HeaderSpecifier) -> ColdResult<Option<SealedHeader>> {
821 let Some(block_num) = self.resolve_header_spec(spec).await? else {
822 return Ok(None);
823 };
824 self.fetch_header_by_number(block_num).await.map_err(ColdStorageError::from)
825 }
826
827 async fn get_headers(
828 &self,
829 specs: Vec<HeaderSpecifier>,
830 ) -> ColdResult<Vec<Option<SealedHeader>>> {
831 let mut results = Vec::with_capacity(specs.len());
832 for spec in specs {
833 let header = self.get_header(spec).await?;
834 results.push(header);
835 }
836 Ok(results)
837 }
838
839 async fn get_transaction(
840 &self,
841 spec: TransactionSpecifier,
842 ) -> ColdResult<Option<Confirmed<RecoveredTx>>> {
843 let row = match spec {
844 TransactionSpecifier::Hash(hash) => sqlx::query(
845 "SELECT t.*, h.block_hash
846 FROM transactions t
847 JOIN headers h ON t.block_number = h.block_number
848 WHERE t.tx_hash = $1",
849 )
850 .bind(hash.as_slice())
851 .fetch_optional(&self.pool)
852 .await
853 .map_err(SqlColdError::from)?,
854 TransactionSpecifier::BlockAndIndex { block, index } => sqlx::query(
855 "SELECT t.*, h.block_hash
856 FROM transactions t
857 JOIN headers h ON t.block_number = h.block_number
858 WHERE t.block_number = $1 AND t.tx_index = $2",
859 )
860 .bind(to_i64(block))
861 .bind(to_i64(index))
862 .fetch_optional(&self.pool)
863 .await
864 .map_err(SqlColdError::from)?,
865 TransactionSpecifier::BlockHashAndIndex { block_hash, index } => sqlx::query(
866 "SELECT t.*, h.block_hash
867 FROM transactions t
868 JOIN headers h ON t.block_number = h.block_number
869 WHERE h.block_hash = $1 AND t.tx_index = $2",
870 )
871 .bind(block_hash.as_slice())
872 .bind(to_i64(index))
873 .fetch_optional(&self.pool)
874 .await
875 .map_err(SqlColdError::from)?,
876 };
877
878 let Some(r) = row else {
879 return Ok(None);
880 };
881
882 let block = from_i64(r.get::<i64, _>(COL_BLOCK_NUMBER));
883 let index = from_i64(r.get::<i64, _>(COL_TX_INDEX));
884 let hash_bytes: Vec<u8> = r.get(COL_BLOCK_HASH);
885 let block_hash = B256::from_slice(&hash_bytes);
886 let recovered = recovered_tx_from_row(&r).map_err(ColdStorageError::from)?;
887 let meta = ConfirmationMeta::new(block, block_hash, index);
888 Ok(Some(Confirmed::new(recovered, meta)))
889 }
890
891 async fn get_transactions_in_block(&self, block: BlockNumber) -> ColdResult<Vec<RecoveredTx>> {
892 let bn = to_i64(block);
893 let rows =
894 sqlx::query("SELECT * FROM transactions WHERE block_number = $1 ORDER BY tx_index")
895 .bind(bn)
896 .fetch_all(&self.pool)
897 .await
898 .map_err(SqlColdError::from)?;
899
900 rows.iter().map(|r| recovered_tx_from_row(r).map_err(ColdStorageError::from)).collect()
901 }
902
903 async fn get_transaction_count(&self, block: BlockNumber) -> ColdResult<u64> {
904 let bn = to_i64(block);
905 let row = sqlx::query("SELECT COUNT(*) as cnt FROM transactions WHERE block_number = $1")
906 .bind(bn)
907 .fetch_one(&self.pool)
908 .await
909 .map_err(SqlColdError::from)?;
910
911 Ok(from_i64(row.get::<i64, _>(COL_CNT)))
912 }
913
914 async fn get_receipt(&self, spec: ReceiptSpecifier) -> ColdResult<Option<ColdReceipt>> {
915 let (block, index) = match spec {
917 ReceiptSpecifier::TxHash(hash) => {
918 let row = sqlx::query(
919 "SELECT block_number, tx_index FROM transactions WHERE tx_hash = $1",
920 )
921 .bind(hash.as_slice())
922 .fetch_optional(&self.pool)
923 .await
924 .map_err(SqlColdError::from)?;
925 let Some(r) = row else { return Ok(None) };
926 (
927 from_i64(r.get::<i64, _>(COL_BLOCK_NUMBER)),
928 from_i64(r.get::<i64, _>(COL_TX_INDEX)),
929 )
930 }
931 ReceiptSpecifier::BlockAndIndex { block, index } => (block, index),
932 };
933
934 let Some(header) = self.fetch_header_by_number(block).await? else {
935 return Ok(None);
936 };
937
938 let receipt_row = sqlx::query(
940 "SELECT r.*, t.tx_hash, t.from_address
941 FROM receipts r
942 JOIN transactions t ON r.block_number = t.block_number AND r.tx_index = t.tx_index
943 WHERE r.block_number = $1 AND r.tx_index = $2",
944 )
945 .bind(to_i64(block))
946 .bind(to_i64(index))
947 .fetch_optional(&self.pool)
948 .await
949 .map_err(SqlColdError::from)?;
950
951 let Some(rr) = receipt_row else {
952 return Ok(None);
953 };
954
955 let bn: i64 = rr.get(COL_BLOCK_NUMBER);
956 let tx_idx: i64 = rr.get(COL_TX_INDEX);
957 let tx_hash = B256::from_slice(&rr.get::<Vec<u8>, _>(COL_TX_HASH));
958 let sender = Address::from_slice(&rr.get::<Vec<u8>, _>(COL_FROM_ADDRESS));
959 let tx_type = rr.get::<i32, _>(COL_TX_TYPE) as i16;
960 let success = rr.get::<i32, _>(COL_SUCCESS) != 0;
961 let cumulative_gas_used: i64 = rr.get(COL_CUMULATIVE_GAS_USED);
962
963 let log_rows = sqlx::query(
964 "SELECT * FROM logs WHERE block_number = $1 AND tx_index = $2 ORDER BY log_index",
965 )
966 .bind(bn)
967 .bind(tx_idx)
968 .fetch_all(&self.pool)
969 .await
970 .map_err(SqlColdError::from)?;
971
972 let logs = log_rows.iter().map(log_from_row).collect();
973 let built = build_receipt(tx_type, success, cumulative_gas_used, logs)
974 .map_err(ColdStorageError::from)?;
975
976 let prior = sqlx::query(
978 "SELECT CAST(SUM(
979 (SELECT COUNT(*) FROM logs l WHERE l.block_number = $1 AND l.tx_index = r.tx_index)
980 ) AS bigint) as log_count,
981 CAST(MAX(r.cumulative_gas_used) AS bigint) as prior_gas
982 FROM receipts r WHERE r.block_number = $1 AND r.tx_index < $2",
983 )
984 .bind(to_i64(block))
985 .bind(to_i64(index))
986 .fetch_one(&self.pool)
987 .await
988 .map_err(SqlColdError::from)?;
989
990 let first_log_index: u64 = prior.get::<Option<i64>, _>(COL_LOG_COUNT).unwrap_or(0) as u64;
991 let prior_cumulative_gas: u64 =
992 prior.get::<Option<i64>, _>(COL_PRIOR_GAS).unwrap_or(0) as u64;
993 let gas_used = built.inner.cumulative_gas_used - prior_cumulative_gas;
994
995 let ir = IndexedReceipt { receipt: built, tx_hash, first_log_index, gas_used, sender };
996 Ok(Some(ColdReceipt::new(ir, &header, index)))
997 }
998
999 async fn get_receipts_in_block(&self, block: BlockNumber) -> ColdResult<Vec<ColdReceipt>> {
1000 let Some(header) =
1001 self.fetch_header_by_number(block).await.map_err(ColdStorageError::from)?
1002 else {
1003 return Ok(Vec::new());
1004 };
1005
1006 let bn = to_i64(block);
1007
1008 let receipt_rows = sqlx::query(
1010 "SELECT r.*, t.tx_hash, t.from_address
1011 FROM receipts r
1012 JOIN transactions t ON r.block_number = t.block_number AND r.tx_index = t.tx_index
1013 WHERE r.block_number = $1
1014 ORDER BY r.tx_index",
1015 )
1016 .bind(bn)
1017 .fetch_all(&self.pool)
1018 .await
1019 .map_err(SqlColdError::from)?;
1020
1021 let all_log_rows =
1022 sqlx::query("SELECT * FROM logs WHERE block_number = $1 ORDER BY tx_index, log_index")
1023 .bind(bn)
1024 .fetch_all(&self.pool)
1025 .await
1026 .map_err(SqlColdError::from)?;
1027
1028 let mut logs_by_tx: std::collections::BTreeMap<i64, Vec<Log>> =
1030 std::collections::BTreeMap::new();
1031 for r in &all_log_rows {
1032 let tx_idx: i64 = r.get(COL_TX_INDEX);
1033 logs_by_tx.entry(tx_idx).or_default().push(log_from_row(r));
1034 }
1035
1036 let mut first_log_index = 0u64;
1037 let mut prior_cumulative_gas = 0u64;
1038 receipt_rows
1039 .into_iter()
1040 .enumerate()
1041 .map(|(idx, rr)| {
1042 let tx_idx: i64 = rr.get(COL_TX_INDEX);
1043 let tx_hash = B256::from_slice(&rr.get::<Vec<u8>, _>(COL_TX_HASH));
1044 let sender = Address::from_slice(&rr.get::<Vec<u8>, _>(COL_FROM_ADDRESS));
1045 let tx_type = rr.get::<i32, _>(COL_TX_TYPE) as i16;
1046 let success = rr.get::<i32, _>(COL_SUCCESS) != 0;
1047 let cumulative_gas_used: i64 = rr.get(COL_CUMULATIVE_GAS_USED);
1048 let logs = logs_by_tx.remove(&tx_idx).unwrap_or_default();
1049 let receipt = build_receipt(tx_type, success, cumulative_gas_used, logs)
1050 .map_err(ColdStorageError::from)?;
1051 let gas_used = receipt.inner.cumulative_gas_used - prior_cumulative_gas;
1052 prior_cumulative_gas = receipt.inner.cumulative_gas_used;
1053 let ir = IndexedReceipt { receipt, tx_hash, first_log_index, gas_used, sender };
1054 first_log_index += ir.receipt.inner.logs.len() as u64;
1055 Ok(ColdReceipt::new(ir, &header, idx as u64))
1056 })
1057 .collect()
1058 }
1059
1060 async fn get_signet_events(
1061 &self,
1062 spec: SignetEventsSpecifier,
1063 ) -> ColdResult<Vec<DbSignetEvent>> {
1064 let rows = match spec {
1065 SignetEventsSpecifier::Block(block) => {
1066 let bn = to_i64(block);
1067 sqlx::query(
1068 "SELECT * FROM signet_events WHERE block_number = $1 ORDER BY event_index",
1069 )
1070 .bind(bn)
1071 .fetch_all(&self.pool)
1072 .await
1073 .map_err(SqlColdError::from)?
1074 }
1075 SignetEventsSpecifier::BlockRange { start, end } => {
1076 let s = to_i64(start);
1077 let e = to_i64(end);
1078 sqlx::query(
1079 "SELECT * FROM signet_events WHERE block_number >= $1 AND block_number <= $2
1080 ORDER BY block_number, event_index",
1081 )
1082 .bind(s)
1083 .bind(e)
1084 .fetch_all(&self.pool)
1085 .await
1086 .map_err(SqlColdError::from)?
1087 }
1088 };
1089
1090 rows.iter().map(|r| signet_event_from_row(r).map_err(ColdStorageError::from)).collect()
1091 }
1092
1093 async fn get_zenith_header(
1094 &self,
1095 spec: ZenithHeaderSpecifier,
1096 ) -> ColdResult<Option<DbZenithHeader>> {
1097 let block = match spec {
1098 ZenithHeaderSpecifier::Number(n) => n,
1099 ZenithHeaderSpecifier::Range { start, .. } => start,
1100 };
1101 let bn = to_i64(block);
1102 let row = sqlx::query("SELECT * FROM zenith_headers WHERE block_number = $1")
1103 .bind(bn)
1104 .fetch_optional(&self.pool)
1105 .await
1106 .map_err(SqlColdError::from)?;
1107
1108 row.map(|r| zenith_header_from_row(&r)).transpose().map_err(ColdStorageError::from)
1109 }
1110
1111 async fn get_zenith_headers(
1112 &self,
1113 spec: ZenithHeaderSpecifier,
1114 ) -> ColdResult<Vec<DbZenithHeader>> {
1115 let rows = match spec {
1116 ZenithHeaderSpecifier::Number(n) => {
1117 let bn = to_i64(n);
1118 sqlx::query("SELECT * FROM zenith_headers WHERE block_number = $1")
1119 .bind(bn)
1120 .fetch_all(&self.pool)
1121 .await
1122 .map_err(SqlColdError::from)?
1123 }
1124 ZenithHeaderSpecifier::Range { start, end } => {
1125 let s = to_i64(start);
1126 let e = to_i64(end);
1127 sqlx::query(
1128 "SELECT * FROM zenith_headers WHERE block_number >= $1 AND block_number <= $2
1129 ORDER BY block_number",
1130 )
1131 .bind(s)
1132 .bind(e)
1133 .fetch_all(&self.pool)
1134 .await
1135 .map_err(SqlColdError::from)?
1136 }
1137 };
1138
1139 rows.iter().map(|r| zenith_header_from_row(r).map_err(ColdStorageError::from)).collect()
1140 }
1141
1142 async fn get_logs(&self, filter: Filter, max_logs: usize) -> ColdResult<Vec<RpcLog>> {
1143 let from = filter.get_from_block().unwrap_or(0);
1144 let to = filter.get_to_block().unwrap_or(u64::MAX);
1145
1146 let mut where_clause = String::from("l.block_number >= $1 AND l.block_number <= $2");
1148 let mut params: Vec<Vec<u8>> = Vec::new();
1149 let mut idx = 3u32;
1150
1151 if !filter.address.is_empty() {
1153 let addrs: Vec<_> = filter.address.iter().collect();
1154 if addrs.len() == 1 {
1155 where_clause.push_str(&format!(" AND l.address = ${idx}"));
1156 params.push(addrs[0].as_slice().to_vec());
1157 idx += 1;
1158 } else {
1159 let placeholders: String = addrs
1160 .iter()
1161 .enumerate()
1162 .map(|(i, _)| format!("${}", idx + i as u32))
1163 .collect::<Vec<_>>()
1164 .join(", ");
1165 where_clause.push_str(&format!(" AND l.address IN ({placeholders})"));
1166 for addr in &addrs {
1167 params.push(addr.as_slice().to_vec());
1168 }
1169 idx += addrs.len() as u32;
1170 }
1171 }
1172
1173 let topic_cols = ["l.topic0", "l.topic1", "l.topic2", "l.topic3"];
1175 for (i, topic_filter) in filter.topics.iter().enumerate() {
1176 if topic_filter.is_empty() {
1177 continue;
1178 }
1179 let values: Vec<_> = topic_filter.iter().collect();
1180 if values.len() == 1 {
1181 where_clause.push_str(&format!(" AND {} = ${idx}", topic_cols[i]));
1182 params.push(values[0].as_slice().to_vec());
1183 idx += 1;
1184 } else {
1185 let placeholders: String = values
1186 .iter()
1187 .enumerate()
1188 .map(|(j, _)| format!("${}", idx + j as u32))
1189 .collect::<Vec<_>>()
1190 .join(", ");
1191 where_clause.push_str(&format!(" AND {} IN ({placeholders})", topic_cols[i]));
1192 for v in &values {
1193 params.push(v.as_slice().to_vec());
1194 }
1195 idx += values.len() as u32;
1196 }
1197 }
1198
1199 let count_sql = format!("SELECT COUNT(*) as cnt FROM logs l WHERE {where_clause}");
1202 let mut count_query = sqlx::query(&count_sql).bind(to_i64(from)).bind(to_i64(to));
1203 for param in ¶ms {
1204 count_query = count_query.bind(param.as_slice());
1205 }
1206 let count_row = count_query.fetch_one(&self.pool).await.map_err(SqlColdError::from)?;
1207 let count = from_i64(count_row.get::<i64, _>(COL_CNT)) as usize;
1208 if count > max_logs {
1209 return Err(ColdStorageError::TooManyLogs { limit: max_logs });
1210 }
1211
1212 let data_sql = format!(
1215 "SELECT l.*, h.block_hash, h.timestamp AS block_timestamp, t.tx_hash, \
1216 (SELECT COUNT(*) FROM logs l2 \
1217 WHERE l2.block_number = l.block_number \
1218 AND (l2.tx_index < l.tx_index \
1219 OR (l2.tx_index = l.tx_index AND l2.log_index < l.log_index)) \
1220 ) AS block_log_index \
1221 FROM logs l \
1222 JOIN headers h ON l.block_number = h.block_number \
1223 JOIN transactions t ON l.block_number = t.block_number \
1224 AND l.tx_index = t.tx_index \
1225 WHERE {where_clause} \
1226 ORDER BY l.block_number, l.tx_index, l.log_index"
1227 );
1228 let mut query = sqlx::query(&data_sql).bind(to_i64(from)).bind(to_i64(to));
1229 for param in ¶ms {
1230 query = query.bind(param.as_slice());
1231 }
1232
1233 let rows = query.fetch_all(&self.pool).await.map_err(SqlColdError::from)?;
1234
1235 rows.into_iter()
1236 .map(|r| {
1237 let log = log_from_row(&r);
1238 let block_number = from_i64(r.get::<i64, _>(COL_BLOCK_NUMBER));
1239 let block_hash_bytes: Vec<u8> = r.get(COL_BLOCK_HASH);
1240 let tx_hash_bytes: Vec<u8> = r.get(COL_TX_HASH);
1241 Ok(RpcLog {
1242 inner: log,
1243 block_hash: Some(B256::from_slice(&block_hash_bytes)),
1244 block_number: Some(block_number),
1245 block_timestamp: Some(from_i64(r.get::<i64, _>(COL_BLOCK_TIMESTAMP))),
1246 transaction_hash: Some(B256::from_slice(&tx_hash_bytes)),
1247 transaction_index: Some(from_i64(r.get::<i64, _>(COL_TX_INDEX))),
1248 log_index: Some(from_i64(r.get::<i64, _>(COL_BLOCK_LOG_INDEX))),
1249 removed: false,
1250 })
1251 })
1252 .collect::<ColdResult<Vec<_>>>()
1253 }
1254
1255 async fn get_latest_block(&self) -> ColdResult<Option<BlockNumber>> {
1256 let row = sqlx::query("SELECT MAX(block_number) as max_bn FROM headers")
1257 .fetch_one(&self.pool)
1258 .await
1259 .map_err(SqlColdError::from)?;
1260 Ok(row.get::<Option<i64>, _>(COL_MAX_BN).map(from_i64))
1261 }
1262
1263 async fn append_block(&self, data: BlockData) -> ColdResult<()> {
1264 self.insert_block(data).await.map_err(ColdStorageError::from)
1265 }
1266
1267 async fn append_blocks(&self, data: Vec<BlockData>) -> ColdResult<()> {
1268 let mut tx = self.pool.begin().await.map_err(SqlColdError::from)?;
1269 for block_data in data {
1270 write_block_to_tx(&mut tx, block_data).await.map_err(ColdStorageError::from)?;
1271 }
1272 tx.commit().await.map_err(SqlColdError::from)?;
1273 Ok(())
1274 }
1275
1276 async fn truncate_above(&self, block: BlockNumber) -> ColdResult<()> {
1277 let bn = to_i64(block);
1278 let mut tx = self.pool.begin().await.map_err(SqlColdError::from)?;
1279
1280 sqlx::query("DELETE FROM logs WHERE block_number > $1")
1281 .bind(bn)
1282 .execute(&mut *tx)
1283 .await
1284 .map_err(SqlColdError::from)?;
1285 sqlx::query("DELETE FROM transactions WHERE block_number > $1")
1286 .bind(bn)
1287 .execute(&mut *tx)
1288 .await
1289 .map_err(SqlColdError::from)?;
1290 sqlx::query("DELETE FROM receipts WHERE block_number > $1")
1291 .bind(bn)
1292 .execute(&mut *tx)
1293 .await
1294 .map_err(SqlColdError::from)?;
1295 sqlx::query("DELETE FROM signet_events WHERE block_number > $1")
1296 .bind(bn)
1297 .execute(&mut *tx)
1298 .await
1299 .map_err(SqlColdError::from)?;
1300 sqlx::query("DELETE FROM zenith_headers WHERE block_number > $1")
1301 .bind(bn)
1302 .execute(&mut *tx)
1303 .await
1304 .map_err(SqlColdError::from)?;
1305 sqlx::query("DELETE FROM headers WHERE block_number > $1")
1306 .bind(bn)
1307 .execute(&mut *tx)
1308 .await
1309 .map_err(SqlColdError::from)?;
1310
1311 tx.commit().await.map_err(SqlColdError::from)?;
1312 Ok(())
1313 }
1314}
1315
#[cfg(all(test, feature = "test-utils"))]
mod tests {
    use super::*;
    use signet_cold::conformance::conformance;

    /// Connects a backend to `url` and runs `signet_cold`'s `conformance`
    /// check against it, panicking if either step fails.
    async fn run_conformance(url: &str) {
        let backend = SqlColdBackend::connect(url).await.unwrap();
        conformance(&backend).await.unwrap();
    }

    /// Runs the conformance check against an in-memory SQLite database.
    #[tokio::test]
    async fn sqlite_conformance() {
        run_conformance("sqlite::memory:").await;
    }

    /// Runs the conformance check against the database named by the
    /// `DATABASE_URL` environment variable; skipped (with a note on stderr)
    /// when the variable is not set.
    #[tokio::test]
    async fn pg_conformance() {
        match std::env::var("DATABASE_URL") {
            Ok(url) => run_conformance(&url).await,
            Err(_) => eprintln!("skipping pg conformance: DATABASE_URL not set"),
        }
    }
}