use crate::SqlColdError;
use crate::convert::{
    HeaderRow, LogRow, ReceiptRow, SignetEventRow, TxRow, ZenithHeaderRow, from_i64,
    receipt_from_rows, to_i64,
};
use alloy::{
    consensus::transaction::Recovered,
    primitives::{BlockNumber, Sealable},
};
use signet_cold::{
    BlockData, ColdReceipt, ColdResult, ColdStorage, ColdStorageError, Confirmed, Filter,
    HeaderSpecifier, ReceiptSpecifier, RpcLog, SignetEventsSpecifier, TransactionSpecifier,
    ZenithHeaderSpecifier,
};
use signet_storage_types::{
    ConfirmationMeta, DbSignetEvent, DbZenithHeader, IndexedReceipt, RecoveredTx, SealedHeader,
    TransactionSigned,
};
use sqlx::{AnyPool, Row};

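/// SQL-backed cold-storage backend.
///
/// Wraps an [`AnyPool`] so the same implementation can run against either
/// SQLite or PostgreSQL; the database backend is detected when the pool is
/// handed over and the matching initial migration is applied.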
#[derive(Debug, Clone)]
pub struct SqlColdBackend {
    pool: AnyPool,
}

impl SqlColdBackend {
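    /// Create a backend from an existing [`AnyPool`].
    ///
    /// Detects the underlying database backend from a pooled connection and
    /// runs the corresponding initial migration before returning. Backends
    /// other than PostgreSQL and SQLite are rejected.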
    pub async fn new(pool: AnyPool) -> Result<Self, SqlColdError> {
        let conn = pool.acquire().await?;
        let backend = conn.backend_name().to_owned();
        drop(conn);

        let migration = match backend.as_str() {
            "PostgreSQL" => include_str!("../migrations/001_initial_pg.sql"),
            "SQLite" => include_str!("../migrations/001_initial.sql"),
            other => {
                return Err(SqlColdError::Convert(format!(
                    "unsupported database backend: {other}"
                )));
            }
        };
        sqlx::raw_sql(migration).execute(&pool).await?;
        Ok(Self { pool })
    }

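    /// Install the default `sqlx` `Any` drivers, open a single-connection pool
    /// for `url`, and initialize the schema via [`Self::new`].
    ///
    /// A minimal usage sketch (not compiled as a doctest; assumes an enclosing
    /// async context). The in-memory SQLite URL is the same one the
    /// conformance test below uses:
    ///
    /// ```ignore
    /// let backend = SqlColdBackend::connect("sqlite::memory:").await?;
    /// let latest = backend.get_latest_block().await?;
    /// ```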
    pub async fn connect(url: &str) -> Result<Self, SqlColdError> {
        sqlx::any::install_default_drivers();
        let pool: AnyPool = sqlx::pool::PoolOptions::new().max_connections(1).connect(url).await?;
        Self::new(pool).await
    }

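    /// Resolve a [`HeaderSpecifier`] to a block number, looking the hash up in
    /// the `headers` table when necessary. Returns `None` if the hash is
    /// unknown.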
    async fn resolve_header_spec(
        &self,
        spec: HeaderSpecifier,
    ) -> Result<Option<BlockNumber>, SqlColdError> {
        match spec {
            HeaderSpecifier::Number(n) => Ok(Some(n)),
            HeaderSpecifier::Hash(hash) => {
                let hash_bytes = hash.as_slice();
                let row = sqlx::query("SELECT block_number FROM headers WHERE block_hash = $1")
                    .bind(hash_bytes)
                    .fetch_optional(&self.pool)
                    .await?;
                Ok(row.map(|r| from_i64(r.get::<i64, _>("block_number"))))
            }
        }
    }

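    /// Load the header stored for `block_num`, rebuild it from its row
    /// representation, and seal it. Returns `None` if the block is not stored.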
    async fn fetch_header_by_number(
        &self,
        block_num: BlockNumber,
    ) -> Result<Option<SealedHeader>, SqlColdError> {
        let bn = to_i64(block_num);
        let row = sqlx::query("SELECT * FROM headers WHERE block_number = $1")
            .bind(bn)
            .fetch_optional(&self.pool)
            .await?;

        row.map(|r| {
            let header = HeaderRow {
                block_number: r.get("block_number"),
                block_hash: r.get("block_hash"),
                parent_hash: r.get("parent_hash"),
                ommers_hash: r.get("ommers_hash"),
                beneficiary: r.get("beneficiary"),
                state_root: r.get("state_root"),
                transactions_root: r.get("transactions_root"),
                receipts_root: r.get("receipts_root"),
                logs_bloom: r.get("logs_bloom"),
                difficulty: r.get("difficulty"),
                gas_limit: r.get("gas_limit"),
                gas_used: r.get("gas_used"),
                timestamp: r.get("timestamp"),
                extra_data: r.get("extra_data"),
                mix_hash: r.get("mix_hash"),
                nonce: r.get("nonce"),
                base_fee_per_gas: r.get("base_fee_per_gas"),
                withdrawals_root: r.get("withdrawals_root"),
                blob_gas_used: r.get("blob_gas_used"),
                excess_blob_gas: r.get("excess_blob_gas"),
                parent_beacon_block_root: r.get("parent_beacon_block_root"),
                requests_hash: r.get("requests_hash"),
            }
            .into_header()?;
            Ok(header.seal_slow())
        })
        .transpose()
    }

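    /// Write a single block and all of its associated rows inside one database
    /// transaction.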
    async fn insert_block(&self, data: BlockData) -> Result<(), SqlColdError> {
        let mut tx = self.pool.begin().await?;
        write_block_to_tx(&mut tx, data).await?;
        tx.commit().await?;
        Ok(())
    }
}

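/// Insert the header, transactions, receipts, logs, signet events, and the
/// optional zenith header for one block into an open database transaction.
/// The caller is responsible for committing.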
async fn write_block_to_tx(
    tx: &mut sqlx::Transaction<'_, sqlx::Any>,
    data: BlockData,
) -> Result<(), SqlColdError> {
    let block = data.block_number();
    let bn = to_i64(block);

    let hr = HeaderRow::from_header(&data.header);
    sqlx::query(
        "INSERT INTO headers (
            block_number, block_hash, parent_hash, ommers_hash, beneficiary,
            state_root, transactions_root, receipts_root, logs_bloom, difficulty,
            gas_limit, gas_used, timestamp, extra_data, mix_hash, nonce,
            base_fee_per_gas, withdrawals_root, blob_gas_used, excess_blob_gas,
            parent_beacon_block_root, requests_hash
        ) VALUES (
            $1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
            $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22
        )",
    )
    .bind(hr.block_number)
    .bind(&hr.block_hash)
    .bind(&hr.parent_hash)
    .bind(&hr.ommers_hash)
    .bind(&hr.beneficiary)
    .bind(&hr.state_root)
    .bind(&hr.transactions_root)
    .bind(&hr.receipts_root)
    .bind(&hr.logs_bloom)
    .bind(&hr.difficulty)
    .bind(hr.gas_limit)
    .bind(hr.gas_used)
    .bind(hr.timestamp)
    .bind(&hr.extra_data)
    .bind(&hr.mix_hash)
    .bind(&hr.nonce)
    .bind(hr.base_fee_per_gas)
    .bind(&hr.withdrawals_root)
    .bind(hr.blob_gas_used)
    .bind(hr.excess_blob_gas)
    .bind(&hr.parent_beacon_block_root)
    .bind(&hr.requests_hash)
    .execute(&mut **tx)
    .await?;

    for (idx, recovered_tx) in data.transactions.iter().enumerate() {
        let sender = recovered_tx.signer();
        let tx_signed: &TransactionSigned = recovered_tx;
        let tr = TxRow::from_tx(tx_signed, bn, to_i64(idx as u64), &sender)?;
        sqlx::query(
            "INSERT INTO transactions (
                block_number, tx_index, tx_hash, tx_type,
                sig_y_parity, sig_r, sig_s,
                chain_id, nonce, gas_limit, to_address, value, input,
                gas_price, max_fee_per_gas, max_priority_fee_per_gas,
                max_fee_per_blob_gas, blob_versioned_hashes,
                access_list, authorization_list, from_address
            ) VALUES (
                $1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
                $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21
            )",
        )
        .bind(tr.block_number)
        .bind(tr.tx_index)
        .bind(&tr.tx_hash)
        .bind(tr.tx_type as i32)
        .bind(tr.sig_y_parity as i32)
        .bind(&tr.sig_r)
        .bind(&tr.sig_s)
        .bind(tr.chain_id)
        .bind(tr.nonce)
        .bind(tr.gas_limit)
        .bind(&tr.to_address)
        .bind(&tr.value)
        .bind(&tr.input)
        .bind(&tr.gas_price)
        .bind(&tr.max_fee_per_gas)
        .bind(&tr.max_priority_fee_per_gas)
        .bind(&tr.max_fee_per_blob_gas)
        .bind(&tr.blob_versioned_hashes)
        .bind(&tr.access_list)
        .bind(&tr.authorization_list)
        .bind(&tr.from_address)
        .execute(&mut **tx)
        .await?;
    }

    for (idx, receipt) in data.receipts.iter().enumerate() {
        let rr = ReceiptRow::from_receipt(receipt, bn, to_i64(idx as u64));
        sqlx::query(
            "INSERT INTO receipts (block_number, tx_index, tx_type, success, cumulative_gas_used)
             VALUES ($1, $2, $3, $4, $5)",
        )
        .bind(rr.block_number)
        .bind(rr.tx_index)
        .bind(rr.tx_type as i32)
        .bind(rr.success as i32)
        .bind(rr.cumulative_gas_used)
        .execute(&mut **tx)
        .await?;

        for (log_idx, log) in receipt.inner.logs.iter().enumerate() {
            let lr = LogRow::from_log(log, bn, to_i64(idx as u64), to_i64(log_idx as u64));
            sqlx::query(
                "INSERT INTO logs (block_number, tx_index, log_index, address, topic0, topic1, topic2, topic3, data)
                 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
            )
            .bind(lr.block_number)
            .bind(lr.tx_index)
            .bind(lr.log_index)
            .bind(&lr.address)
            .bind(&lr.topic0)
            .bind(&lr.topic1)
            .bind(&lr.topic2)
            .bind(&lr.topic3)
            .bind(&lr.data)
            .execute(&mut **tx)
            .await?;
        }
    }

    for (idx, event) in data.signet_events.iter().enumerate() {
        let er = SignetEventRow::from_event(event, bn, to_i64(idx as u64));
        sqlx::query(
            "INSERT INTO signet_events (
                block_number, event_index, event_type, order_index,
                rollup_chain_id, sender, to_address, value, gas,
                max_fee_per_gas, data, rollup_recipient, amount, token
            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)",
        )
        .bind(er.block_number)
        .bind(er.event_index)
        .bind(er.event_type as i32)
        .bind(er.order_index)
        .bind(&er.rollup_chain_id)
        .bind(&er.sender)
        .bind(&er.to_address)
        .bind(&er.value)
        .bind(&er.gas)
        .bind(&er.max_fee_per_gas)
        .bind(&er.data)
        .bind(&er.rollup_recipient)
        .bind(&er.amount)
        .bind(&er.token)
        .execute(&mut **tx)
        .await?;
    }

    if let Some(zh) = &data.zenith_header {
        let zr = ZenithHeaderRow::from_zenith(zh, bn);
        sqlx::query(
            "INSERT INTO zenith_headers (
                block_number, host_block_number, rollup_chain_id,
                gas_limit, reward_address, block_data_hash
            ) VALUES ($1, $2, $3, $4, $5, $6)",
        )
        .bind(zr.block_number)
        .bind(&zr.host_block_number)
        .bind(&zr.rollup_chain_id)
        .bind(&zr.gas_limit)
        .bind(&zr.reward_address)
        .bind(&zr.block_data_hash)
        .execute(&mut **tx)
        .await?;
    }

    Ok(())
}

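/// Rebuild a [`TxRow`] from a raw `transactions` row, converting the columns
/// stored as SQL integers (`tx_type`, `sig_y_parity`) back to the row's types.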
fn row_to_tx_row(r: &sqlx::any::AnyRow) -> TxRow {
    TxRow {
        block_number: r.get("block_number"),
        tx_index: r.get("tx_index"),
        tx_hash: r.get("tx_hash"),
        tx_type: r.get::<i32, _>("tx_type") as i16,
        sig_y_parity: r.get::<i32, _>("sig_y_parity") != 0,
        sig_r: r.get("sig_r"),
        sig_s: r.get("sig_s"),
        chain_id: r.get("chain_id"),
        nonce: r.get("nonce"),
        gas_limit: r.get("gas_limit"),
        to_address: r.get("to_address"),
        value: r.get("value"),
        input: r.get("input"),
        gas_price: r.get("gas_price"),
        max_fee_per_gas: r.get("max_fee_per_gas"),
        max_priority_fee_per_gas: r.get("max_priority_fee_per_gas"),
        max_fee_per_blob_gas: r.get("max_fee_per_blob_gas"),
        blob_versioned_hashes: r.get("blob_versioned_hashes"),
        access_list: r.get("access_list"),
        authorization_list: r.get("authorization_list"),
        from_address: r.get("from_address"),
    }
}

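/// Rebuild a [`SignetEventRow`] from a raw `signet_events` row.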
fn row_to_signet_event_row(r: &sqlx::any::AnyRow) -> SignetEventRow {
    SignetEventRow {
        block_number: r.get("block_number"),
        event_index: r.get("event_index"),
        event_type: r.get::<i32, _>("event_type") as i16,
        order_index: r.get("order_index"),
        rollup_chain_id: r.get("rollup_chain_id"),
        sender: r.get("sender"),
        to_address: r.get("to_address"),
        value: r.get("value"),
        gas: r.get("gas"),
        max_fee_per_gas: r.get("max_fee_per_gas"),
        data: r.get("data"),
        rollup_recipient: r.get("rollup_recipient"),
        amount: r.get("amount"),
        token: r.get("token"),
    }
}

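/// Rebuild a [`LogRow`] from a raw `logs` row.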
fn row_to_log_row(r: &sqlx::any::AnyRow) -> LogRow {
    LogRow {
        block_number: r.get("block_number"),
        tx_index: r.get("tx_index"),
        log_index: r.get("log_index"),
        address: r.get("address"),
        topic0: r.get("topic0"),
        topic1: r.get("topic1"),
        topic2: r.get("topic2"),
        topic3: r.get("topic3"),
        data: r.get("data"),
    }
}

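/// Rebuild a [`ZenithHeaderRow`] from a raw `zenith_headers` row.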
fn row_to_zenith_header_row(r: &sqlx::any::AnyRow) -> ZenithHeaderRow {
    ZenithHeaderRow {
        block_number: r.get("block_number"),
        host_block_number: r.get("host_block_number"),
        rollup_chain_id: r.get("rollup_chain_id"),
        gas_limit: r.get("gas_limit"),
        reward_address: r.get("reward_address"),
        block_data_hash: r.get("block_data_hash"),
    }
}

impl ColdStorage for SqlColdBackend {
    async fn get_header(&self, spec: HeaderSpecifier) -> ColdResult<Option<SealedHeader>> {
        let Some(block_num) = self.resolve_header_spec(spec).await? else {
            return Ok(None);
        };
        self.fetch_header_by_number(block_num).await.map_err(ColdStorageError::from)
    }

    async fn get_headers(
        &self,
        specs: Vec<HeaderSpecifier>,
    ) -> ColdResult<Vec<Option<SealedHeader>>> {
        let mut results = Vec::with_capacity(specs.len());
        for spec in specs {
            let header = self.get_header(spec).await?;
            results.push(header);
        }
        Ok(results)
    }

    async fn get_transaction(
        &self,
        spec: TransactionSpecifier,
    ) -> ColdResult<Option<Confirmed<RecoveredTx>>> {
        let row = match spec {
            TransactionSpecifier::Hash(hash) => sqlx::query(
                "SELECT t.*, h.block_hash
                 FROM transactions t
                 JOIN headers h ON t.block_number = h.block_number
                 WHERE t.tx_hash = $1",
            )
            .bind(hash.as_slice())
            .fetch_optional(&self.pool)
            .await
            .map_err(SqlColdError::from)?,
            TransactionSpecifier::BlockAndIndex { block, index } => sqlx::query(
                "SELECT t.*, h.block_hash
                 FROM transactions t
                 JOIN headers h ON t.block_number = h.block_number
                 WHERE t.block_number = $1 AND t.tx_index = $2",
            )
            .bind(to_i64(block))
            .bind(to_i64(index))
            .fetch_optional(&self.pool)
            .await
            .map_err(SqlColdError::from)?,
            TransactionSpecifier::BlockHashAndIndex { block_hash, index } => sqlx::query(
                "SELECT t.*, h.block_hash
                 FROM transactions t
                 JOIN headers h ON t.block_number = h.block_number
                 WHERE h.block_hash = $1 AND t.tx_index = $2",
            )
            .bind(block_hash.as_slice())
            .bind(to_i64(index))
            .fetch_optional(&self.pool)
            .await
            .map_err(SqlColdError::from)?,
        };

        let Some(r) = row else {
            return Ok(None);
        };

        let tx_row = row_to_tx_row(&r);
        let sender = alloy::primitives::Address::from_slice(&tx_row.from_address);
        let tx = tx_row.into_tx().map_err(ColdStorageError::from)?;
        let block = from_i64(r.get::<i64, _>("block_number"));
        let index = from_i64(r.get::<i64, _>("tx_index"));
        let hash_bytes: Vec<u8> = r.get("block_hash");
        let block_hash = alloy::primitives::B256::from_slice(&hash_bytes);
        let meta = ConfirmationMeta::new(block, block_hash, index);
        let recovered = Recovered::new_unchecked(tx, sender);
        Ok(Some(Confirmed::new(recovered, meta)))
    }

    async fn get_transactions_in_block(&self, block: BlockNumber) -> ColdResult<Vec<RecoveredTx>> {
        let bn = to_i64(block);
        let rows =
            sqlx::query("SELECT * FROM transactions WHERE block_number = $1 ORDER BY tx_index")
                .bind(bn)
                .fetch_all(&self.pool)
                .await
                .map_err(SqlColdError::from)?;

        rows.into_iter()
            .map(|r| {
                let tx_row = row_to_tx_row(&r);
                let sender = alloy::primitives::Address::from_slice(&tx_row.from_address);
                let tx = tx_row.into_tx().map_err(ColdStorageError::from)?;
                Ok(Recovered::new_unchecked(tx, sender))
            })
            .collect()
    }

    async fn get_transaction_count(&self, block: BlockNumber) -> ColdResult<u64> {
        let bn = to_i64(block);
        let row = sqlx::query("SELECT COUNT(*) as cnt FROM transactions WHERE block_number = $1")
            .bind(bn)
            .fetch_one(&self.pool)
            .await
            .map_err(SqlColdError::from)?;

        Ok(from_i64(row.get::<i64, _>("cnt")))
    }

    async fn get_receipt(&self, spec: ReceiptSpecifier) -> ColdResult<Option<ColdReceipt>> {
        let (block, index) = match spec {
            ReceiptSpecifier::TxHash(hash) => {
                let row = sqlx::query(
                    "SELECT block_number, tx_index FROM transactions WHERE tx_hash = $1",
                )
                .bind(hash.as_slice())
                .fetch_optional(&self.pool)
                .await
                .map_err(SqlColdError::from)?;
                let Some(r) = row else { return Ok(None) };
                (from_i64(r.get::<i64, _>("block_number")), from_i64(r.get::<i64, _>("tx_index")))
            }
            ReceiptSpecifier::BlockAndIndex { block, index } => (block, index),
        };

        let Some(header) = self.fetch_header_by_number(block).await? else {
            return Ok(None);
        };

        let receipt_row = sqlx::query(
            "SELECT r.*, t.tx_hash, t.from_address
             FROM receipts r
             JOIN transactions t ON r.block_number = t.block_number AND r.tx_index = t.tx_index
             WHERE r.block_number = $1 AND r.tx_index = $2",
        )
        .bind(to_i64(block))
        .bind(to_i64(index))
        .fetch_optional(&self.pool)
        .await
        .map_err(SqlColdError::from)?;

        let Some(rr) = receipt_row else {
            return Ok(None);
        };

        let bn: i64 = rr.get("block_number");
        let tx_idx: i64 = rr.get("tx_index");
        let tx_hash_bytes: Vec<u8> = rr.get("tx_hash");
        let tx_hash = alloy::primitives::B256::from_slice(&tx_hash_bytes);
        let from_bytes: Vec<u8> = rr.get("from_address");
        let sender = alloy::primitives::Address::from_slice(&from_bytes);

        let receipt = ReceiptRow {
            block_number: bn,
            tx_index: tx_idx,
            tx_type: rr.get::<i32, _>("tx_type") as i16,
            success: rr.get::<i32, _>("success") != 0,
            cumulative_gas_used: rr.get("cumulative_gas_used"),
        };

        let log_rows = sqlx::query(
            "SELECT * FROM logs WHERE block_number = $1 AND tx_index = $2 ORDER BY log_index",
        )
        .bind(bn)
        .bind(tx_idx)
        .fetch_all(&self.pool)
        .await
        .map_err(SqlColdError::from)?;

        let logs: Vec<_> = log_rows.into_iter().map(|r| row_to_log_row(&r)).collect();

        let built = receipt_from_rows(receipt, logs).map_err(ColdStorageError::from)?;

        let prior = sqlx::query(
            "SELECT CAST(SUM(
                (SELECT COUNT(*) FROM logs l WHERE l.block_number = $1 AND l.tx_index = r.tx_index)
             ) AS bigint) as log_count,
             CAST(MAX(r.cumulative_gas_used) AS bigint) as prior_gas
             FROM receipts r WHERE r.block_number = $1 AND r.tx_index < $2",
        )
        .bind(to_i64(block))
        .bind(to_i64(index))
        .fetch_one(&self.pool)
        .await
        .map_err(SqlColdError::from)?;

        let first_log_index: u64 = prior.get::<Option<i64>, _>("log_count").unwrap_or(0) as u64;
        let prior_cumulative_gas: u64 =
            prior.get::<Option<i64>, _>("prior_gas").unwrap_or(0) as u64;
        let gas_used = built.inner.cumulative_gas_used - prior_cumulative_gas;

        let ir = IndexedReceipt { receipt: built, tx_hash, first_log_index, gas_used, sender };
        Ok(Some(ColdReceipt::new(ir, &header, index)))
    }

    async fn get_receipts_in_block(&self, block: BlockNumber) -> ColdResult<Vec<ColdReceipt>> {
        let Some(header) =
            self.fetch_header_by_number(block).await.map_err(ColdStorageError::from)?
        else {
            return Ok(Vec::new());
        };

        let bn = to_i64(block);

        let receipt_rows = sqlx::query(
            "SELECT r.*, t.tx_hash, t.from_address
             FROM receipts r
             JOIN transactions t ON r.block_number = t.block_number AND r.tx_index = t.tx_index
             WHERE r.block_number = $1
             ORDER BY r.tx_index",
        )
        .bind(bn)
        .fetch_all(&self.pool)
        .await
        .map_err(SqlColdError::from)?;

        let all_log_rows =
            sqlx::query("SELECT * FROM logs WHERE block_number = $1 ORDER BY tx_index, log_index")
                .bind(bn)
                .fetch_all(&self.pool)
                .await
                .map_err(SqlColdError::from)?;

        let mut logs_by_tx: std::collections::BTreeMap<i64, Vec<LogRow>> =
            std::collections::BTreeMap::new();
        for r in all_log_rows {
            logs_by_tx.entry(r.get::<i64, _>("tx_index")).or_default().push(row_to_log_row(&r));
        }

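        // Receipts are walked in tx_index order; two running counters recover
        // values that are not stored per receipt: `first_log_index` counts the
        // logs emitted by earlier receipts in the block, and `gas_used` is the
        // difference between consecutive `cumulative_gas_used` values.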
        let mut first_log_index = 0u64;
        let mut prior_cumulative_gas = 0u64;
        receipt_rows
            .into_iter()
            .enumerate()
            .map(|(idx, rr)| {
                let tx_idx: i64 = rr.get("tx_index");
                let tx_hash_bytes: Vec<u8> = rr.get("tx_hash");
                let tx_hash = alloy::primitives::B256::from_slice(&tx_hash_bytes);
                let from_bytes: Vec<u8> = rr.get("from_address");
                let sender = alloy::primitives::Address::from_slice(&from_bytes);
                let receipt_row = ReceiptRow {
                    block_number: rr.get("block_number"),
                    tx_index: tx_idx,
                    tx_type: rr.get::<i32, _>("tx_type") as i16,
                    success: rr.get::<i32, _>("success") != 0,
                    cumulative_gas_used: rr.get("cumulative_gas_used"),
                };
                let logs = logs_by_tx.remove(&tx_idx).unwrap_or_default();
                let receipt =
                    receipt_from_rows(receipt_row, logs).map_err(ColdStorageError::from)?;
                let gas_used = receipt.inner.cumulative_gas_used - prior_cumulative_gas;
                prior_cumulative_gas = receipt.inner.cumulative_gas_used;
                let ir = IndexedReceipt { receipt, tx_hash, first_log_index, gas_used, sender };
                first_log_index += ir.receipt.inner.logs.len() as u64;
                Ok(ColdReceipt::new(ir, &header, idx as u64))
            })
            .collect()
    }

    async fn get_signet_events(
        &self,
        spec: SignetEventsSpecifier,
    ) -> ColdResult<Vec<DbSignetEvent>> {
        let rows = match spec {
            SignetEventsSpecifier::Block(block) => {
                let bn = to_i64(block);
                sqlx::query(
                    "SELECT * FROM signet_events WHERE block_number = $1 ORDER BY event_index",
                )
                .bind(bn)
                .fetch_all(&self.pool)
                .await
                .map_err(SqlColdError::from)?
            }
            SignetEventsSpecifier::BlockRange { start, end } => {
                let s = to_i64(start);
                let e = to_i64(end);
                sqlx::query(
                    "SELECT * FROM signet_events WHERE block_number >= $1 AND block_number <= $2
                     ORDER BY block_number, event_index",
                )
                .bind(s)
                .bind(e)
                .fetch_all(&self.pool)
                .await
                .map_err(SqlColdError::from)?
            }
        };

        rows.into_iter()
            .map(|r| row_to_signet_event_row(&r).into_event().map_err(ColdStorageError::from))
            .collect()
    }

    async fn get_zenith_header(
        &self,
        spec: ZenithHeaderSpecifier,
    ) -> ColdResult<Option<DbZenithHeader>> {
        let block = match spec {
            ZenithHeaderSpecifier::Number(n) => n,
            ZenithHeaderSpecifier::Range { start, .. } => start,
        };
        let bn = to_i64(block);
        let row = sqlx::query("SELECT * FROM zenith_headers WHERE block_number = $1")
            .bind(bn)
            .fetch_optional(&self.pool)
            .await
            .map_err(SqlColdError::from)?;

        row.map(|r| row_to_zenith_header_row(&r).into_zenith().map_err(ColdStorageError::from))
            .transpose()
    }

    async fn get_zenith_headers(
        &self,
        spec: ZenithHeaderSpecifier,
    ) -> ColdResult<Vec<DbZenithHeader>> {
        let rows = match spec {
            ZenithHeaderSpecifier::Number(n) => {
                let bn = to_i64(n);
                sqlx::query("SELECT * FROM zenith_headers WHERE block_number = $1")
                    .bind(bn)
                    .fetch_all(&self.pool)
                    .await
                    .map_err(SqlColdError::from)?
            }
            ZenithHeaderSpecifier::Range { start, end } => {
                let s = to_i64(start);
                let e = to_i64(end);
                sqlx::query(
                    "SELECT * FROM zenith_headers WHERE block_number >= $1 AND block_number <= $2
                     ORDER BY block_number",
                )
                .bind(s)
                .bind(e)
                .fetch_all(&self.pool)
                .await
                .map_err(SqlColdError::from)?
            }
        };

        rows.into_iter()
            .map(|r| row_to_zenith_header_row(&r).into_zenith().map_err(ColdStorageError::from))
            .collect()
    }

    async fn get_logs(&self, filter: Filter) -> ColdResult<Vec<RpcLog>> {
        let from = filter.get_from_block().unwrap_or(0);
        let to = filter.get_to_block().unwrap_or(u64::MAX);
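
        // The filter query is built dynamically: logs are joined with their
        // header and transaction, a correlated subquery computes the log's
        // position within its block (`block_log_index`), and address/topic
        // constraints are appended below with numbered placeholders starting
        // at $3 ($1 and $2 cover the block range).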
        let mut sql = String::from(
            "SELECT l.*, h.block_hash, h.timestamp AS block_timestamp, t.tx_hash, \
             (SELECT COUNT(*) FROM logs l2 \
              WHERE l2.block_number = l.block_number \
              AND (l2.tx_index < l.tx_index \
              OR (l2.tx_index = l.tx_index AND l2.log_index < l.log_index)) \
             ) AS block_log_index \
             FROM logs l \
             JOIN headers h ON l.block_number = h.block_number \
             JOIN transactions t ON l.block_number = t.block_number \
             AND l.tx_index = t.tx_index \
             WHERE l.block_number >= $1 AND l.block_number <= $2",
        );
        let mut params: Vec<Vec<u8>> = Vec::new();
        let mut idx = 3u32;

        if !filter.address.is_empty() {
            let addrs: Vec<_> = filter.address.iter().collect();
            if addrs.len() == 1 {
                sql.push_str(&format!(" AND l.address = ${idx}"));
                params.push(addrs[0].as_slice().to_vec());
                idx += 1;
            } else {
                let placeholders: String = addrs
                    .iter()
                    .enumerate()
                    .map(|(i, _)| format!("${}", idx + i as u32))
                    .collect::<Vec<_>>()
                    .join(", ");
                sql.push_str(&format!(" AND l.address IN ({placeholders})"));
                for addr in &addrs {
                    params.push(addr.as_slice().to_vec());
                }
                idx += addrs.len() as u32;
            }
        }

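        // Topic filters are positional: topics[i] constrains column topic{i}.
        // A single value becomes an equality check, several values an IN list.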
        let topic_cols = ["l.topic0", "l.topic1", "l.topic2", "l.topic3"];
        for (i, topic_filter) in filter.topics.iter().enumerate() {
            if topic_filter.is_empty() {
                continue;
            }
            let values: Vec<_> = topic_filter.iter().collect();
            if values.len() == 1 {
                sql.push_str(&format!(" AND {} = ${idx}", topic_cols[i]));
                params.push(values[0].as_slice().to_vec());
                idx += 1;
            } else {
                let placeholders: String = values
                    .iter()
                    .enumerate()
                    .map(|(j, _)| format!("${}", idx + j as u32))
                    .collect::<Vec<_>>()
                    .join(", ");
                sql.push_str(&format!(" AND {} IN ({placeholders})", topic_cols[i]));
                for v in &values {
                    params.push(v.as_slice().to_vec());
                }
                idx += values.len() as u32;
            }
        }

        sql.push_str(" ORDER BY l.block_number, l.tx_index, l.log_index");

        let mut query = sqlx::query(&sql).bind(to_i64(from)).bind(to_i64(to));
        for param in &params {
            query = query.bind(param.as_slice());
        }

        let rows = query.fetch_all(&self.pool).await.map_err(SqlColdError::from)?;

        rows.into_iter()
            .map(|r| {
                let log = row_to_log_row(&r).into_log();
                let block_number = from_i64(r.get::<i64, _>("block_number"));
                let block_hash_bytes: Vec<u8> = r.get("block_hash");
                let tx_hash_bytes: Vec<u8> = r.get("tx_hash");
                Ok(RpcLog {
                    inner: log,
                    block_hash: Some(alloy::primitives::B256::from_slice(&block_hash_bytes)),
                    block_number: Some(block_number),
                    block_timestamp: Some(from_i64(r.get::<i64, _>("block_timestamp"))),
                    transaction_hash: Some(alloy::primitives::B256::from_slice(&tx_hash_bytes)),
                    transaction_index: Some(from_i64(r.get::<i64, _>("tx_index"))),
                    log_index: Some(from_i64(r.get::<i64, _>("block_log_index"))),
                    removed: false,
                })
            })
            .collect::<ColdResult<Vec<_>>>()
    }

    async fn get_latest_block(&self) -> ColdResult<Option<BlockNumber>> {
        let row = sqlx::query("SELECT MAX(block_number) as max_bn FROM headers")
            .fetch_one(&self.pool)
            .await
            .map_err(SqlColdError::from)?;
        Ok(row.get::<Option<i64>, _>("max_bn").map(from_i64))
    }

    async fn append_block(&self, data: BlockData) -> ColdResult<()> {
        self.insert_block(data).await.map_err(ColdStorageError::from)
    }

    async fn append_blocks(&self, data: Vec<BlockData>) -> ColdResult<()> {
        let mut tx = self.pool.begin().await.map_err(SqlColdError::from)?;
        for block_data in data {
            write_block_to_tx(&mut tx, block_data).await.map_err(ColdStorageError::from)?;
        }
        tx.commit().await.map_err(SqlColdError::from)?;
        Ok(())
    }

    async fn truncate_above(&self, block: BlockNumber) -> ColdResult<()> {
        let bn = to_i64(block);
        let mut tx = self.pool.begin().await.map_err(SqlColdError::from)?;

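        // Delete dependent rows first and remove the headers last, all within
        // a single transaction so a partial truncation is never visible.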
        sqlx::query("DELETE FROM logs WHERE block_number > $1")
            .bind(bn)
            .execute(&mut *tx)
            .await
            .map_err(SqlColdError::from)?;
        sqlx::query("DELETE FROM transactions WHERE block_number > $1")
            .bind(bn)
            .execute(&mut *tx)
            .await
            .map_err(SqlColdError::from)?;
        sqlx::query("DELETE FROM receipts WHERE block_number > $1")
            .bind(bn)
            .execute(&mut *tx)
            .await
            .map_err(SqlColdError::from)?;
        sqlx::query("DELETE FROM signet_events WHERE block_number > $1")
            .bind(bn)
            .execute(&mut *tx)
            .await
            .map_err(SqlColdError::from)?;
        sqlx::query("DELETE FROM zenith_headers WHERE block_number > $1")
            .bind(bn)
            .execute(&mut *tx)
            .await
            .map_err(SqlColdError::from)?;
        sqlx::query("DELETE FROM headers WHERE block_number > $1")
            .bind(bn)
            .execute(&mut *tx)
            .await
            .map_err(SqlColdError::from)?;

        tx.commit().await.map_err(SqlColdError::from)?;
        Ok(())
    }
}

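// Conformance tests provided by `signet_cold`. The SQLite test runs against an
// in-memory database; the Postgres test is skipped unless DATABASE_URL is set.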
#[cfg(all(test, feature = "test-utils"))]
mod tests {
    use super::*;
    use signet_cold::conformance::conformance;

    #[tokio::test]
    async fn sqlite_conformance() {
        let backend = SqlColdBackend::connect("sqlite::memory:").await.unwrap();
        conformance(&backend).await.unwrap();
    }

    #[tokio::test]
    async fn pg_conformance() {
        let Ok(url) = std::env::var("DATABASE_URL") else {
            eprintln!("skipping pg conformance: DATABASE_URL not set");
            return;
        };
        let backend = SqlColdBackend::connect(&url).await.unwrap();
        conformance(&backend).await.unwrap();
    }
}