1use crate::SqlColdError;
8use crate::convert::{
9 HeaderRow, LogRow, ReceiptRow, SignetEventRow, TxRow, ZenithHeaderRow, from_i64,
10 receipt_from_rows, to_i64,
11};
12use alloy::{
13 consensus::transaction::Recovered,
14 primitives::{BlockNumber, Sealable},
15};
16use signet_cold::{
17 BlockData, ColdReceipt, ColdResult, ColdStorage, ColdStorageError, Confirmed, Filter,
18 HeaderSpecifier, ReceiptSpecifier, RpcLog, SignetEventsSpecifier, TransactionSpecifier,
19 ZenithHeaderSpecifier,
20};
21use signet_storage_types::{
22 ConfirmationMeta, DbSignetEvent, DbZenithHeader, IndexedReceipt, RecoveredTx, SealedHeader,
23 TransactionSigned,
24};
25use sqlx::{AnyPool, Row};
26
27#[derive(Debug, Clone)]
47pub struct SqlColdBackend {
48 pool: AnyPool,
49}
50
51impl SqlColdBackend {
52 pub async fn new(pool: AnyPool) -> Result<Self, SqlColdError> {
59 let conn = pool.acquire().await?;
61 let backend = conn.backend_name().to_owned();
62 drop(conn);
63
64 let migration = match backend.as_str() {
65 "PostgreSQL" => include_str!("../migrations/001_initial_pg.sql"),
66 "SQLite" => include_str!("../migrations/001_initial.sql"),
67 other => {
68 return Err(SqlColdError::Convert(format!(
69 "unsupported database backend: {other}"
70 )));
71 }
72 };
73 sqlx::raw_sql(migration).execute(&pool).await?;
76 Ok(Self { pool })
77 }
78
79 pub async fn connect(url: &str) -> Result<Self, SqlColdError> {
88 sqlx::any::install_default_drivers();
89 let pool: AnyPool = sqlx::pool::PoolOptions::new().max_connections(1).connect(url).await?;
90 Self::new(pool).await
91 }
92
93 async fn resolve_header_spec(
98 &self,
99 spec: HeaderSpecifier,
100 ) -> Result<Option<BlockNumber>, SqlColdError> {
101 match spec {
102 HeaderSpecifier::Number(n) => Ok(Some(n)),
103 HeaderSpecifier::Hash(hash) => {
104 let hash_bytes = hash.as_slice();
105 let row = sqlx::query("SELECT block_number FROM headers WHERE block_hash = $1")
106 .bind(hash_bytes)
107 .fetch_optional(&self.pool)
108 .await?;
109 Ok(row.map(|r| from_i64(r.get::<i64, _>("block_number"))))
110 }
111 }
112 }
113
114 async fn fetch_header_by_number(
119 &self,
120 block_num: BlockNumber,
121 ) -> Result<Option<SealedHeader>, SqlColdError> {
122 let bn = to_i64(block_num);
123 let row = sqlx::query("SELECT * FROM headers WHERE block_number = $1")
124 .bind(bn)
125 .fetch_optional(&self.pool)
126 .await?;
127
128 row.map(|r| {
129 let header = HeaderRow {
130 block_number: r.get("block_number"),
131 block_hash: r.get("block_hash"),
132 parent_hash: r.get("parent_hash"),
133 ommers_hash: r.get("ommers_hash"),
134 beneficiary: r.get("beneficiary"),
135 state_root: r.get("state_root"),
136 transactions_root: r.get("transactions_root"),
137 receipts_root: r.get("receipts_root"),
138 logs_bloom: r.get("logs_bloom"),
139 difficulty: r.get("difficulty"),
140 gas_limit: r.get("gas_limit"),
141 gas_used: r.get("gas_used"),
142 timestamp: r.get("timestamp"),
143 extra_data: r.get("extra_data"),
144 mix_hash: r.get("mix_hash"),
145 nonce: r.get("nonce"),
146 base_fee_per_gas: r.get("base_fee_per_gas"),
147 withdrawals_root: r.get("withdrawals_root"),
148 blob_gas_used: r.get("blob_gas_used"),
149 excess_blob_gas: r.get("excess_blob_gas"),
150 parent_beacon_block_root: r.get("parent_beacon_block_root"),
151 requests_hash: r.get("requests_hash"),
152 }
153 .into_header()?;
154 Ok(header.seal_slow())
155 })
156 .transpose()
157 }
158
159 async fn insert_block(&self, data: BlockData) -> Result<(), SqlColdError> {
164 let mut tx = self.pool.begin().await?;
165 let block = data.block_number();
166 let bn = to_i64(block);
167
168 let hr = HeaderRow::from_header(&data.header);
170 sqlx::query(
171 "INSERT INTO headers (
172 block_number, block_hash, parent_hash, ommers_hash, beneficiary,
173 state_root, transactions_root, receipts_root, logs_bloom, difficulty,
174 gas_limit, gas_used, timestamp, extra_data, mix_hash, nonce,
175 base_fee_per_gas, withdrawals_root, blob_gas_used, excess_blob_gas,
176 parent_beacon_block_root, requests_hash
177 ) VALUES (
178 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
179 $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22
180 )",
181 )
182 .bind(hr.block_number)
183 .bind(&hr.block_hash)
184 .bind(&hr.parent_hash)
185 .bind(&hr.ommers_hash)
186 .bind(&hr.beneficiary)
187 .bind(&hr.state_root)
188 .bind(&hr.transactions_root)
189 .bind(&hr.receipts_root)
190 .bind(&hr.logs_bloom)
191 .bind(&hr.difficulty)
192 .bind(hr.gas_limit)
193 .bind(hr.gas_used)
194 .bind(hr.timestamp)
195 .bind(&hr.extra_data)
196 .bind(&hr.mix_hash)
197 .bind(&hr.nonce)
198 .bind(hr.base_fee_per_gas)
199 .bind(&hr.withdrawals_root)
200 .bind(hr.blob_gas_used)
201 .bind(hr.excess_blob_gas)
202 .bind(&hr.parent_beacon_block_root)
203 .bind(&hr.requests_hash)
204 .execute(&mut *tx)
205 .await?;
206
207 for (idx, recovered_tx) in data.transactions.iter().enumerate() {
209 let sender = recovered_tx.signer();
210 let tx_signed: &TransactionSigned = recovered_tx;
211 let tr = TxRow::from_tx(tx_signed, bn, to_i64(idx as u64), &sender)?;
212 sqlx::query(
213 "INSERT INTO transactions (
214 block_number, tx_index, tx_hash, tx_type,
215 sig_y_parity, sig_r, sig_s,
216 chain_id, nonce, gas_limit, to_address, value, input,
217 gas_price, max_fee_per_gas, max_priority_fee_per_gas,
218 max_fee_per_blob_gas, blob_versioned_hashes,
219 access_list, authorization_list, from_address
220 ) VALUES (
221 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
222 $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21
223 )",
224 )
225 .bind(tr.block_number)
226 .bind(tr.tx_index)
227 .bind(&tr.tx_hash)
228 .bind(tr.tx_type as i32)
229 .bind(tr.sig_y_parity as i32)
230 .bind(&tr.sig_r)
231 .bind(&tr.sig_s)
232 .bind(tr.chain_id)
233 .bind(tr.nonce)
234 .bind(tr.gas_limit)
235 .bind(&tr.to_address)
236 .bind(&tr.value)
237 .bind(&tr.input)
238 .bind(&tr.gas_price)
239 .bind(&tr.max_fee_per_gas)
240 .bind(&tr.max_priority_fee_per_gas)
241 .bind(&tr.max_fee_per_blob_gas)
242 .bind(&tr.blob_versioned_hashes)
243 .bind(&tr.access_list)
244 .bind(&tr.authorization_list)
245 .bind(&tr.from_address)
246 .execute(&mut *tx)
247 .await?;
248 }
249
250 for (idx, receipt) in data.receipts.iter().enumerate() {
252 let rr = ReceiptRow::from_receipt(receipt, bn, to_i64(idx as u64));
253 sqlx::query(
254 "INSERT INTO receipts (block_number, tx_index, tx_type, success, cumulative_gas_used)
255 VALUES ($1, $2, $3, $4, $5)",
256 )
257 .bind(rr.block_number)
258 .bind(rr.tx_index)
259 .bind(rr.tx_type as i32)
260 .bind(rr.success as i32)
261 .bind(rr.cumulative_gas_used)
262 .execute(&mut *tx)
263 .await?;
264
265 for (log_idx, log) in receipt.inner.logs.iter().enumerate() {
266 let lr = LogRow::from_log(log, bn, to_i64(idx as u64), to_i64(log_idx as u64));
267 sqlx::query(
268 "INSERT INTO logs (block_number, tx_index, log_index, address, topic0, topic1, topic2, topic3, data)
269 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
270 )
271 .bind(lr.block_number)
272 .bind(lr.tx_index)
273 .bind(lr.log_index)
274 .bind(&lr.address)
275 .bind(&lr.topic0)
276 .bind(&lr.topic1)
277 .bind(&lr.topic2)
278 .bind(&lr.topic3)
279 .bind(&lr.data)
280 .execute(&mut *tx)
281 .await?;
282 }
283 }
284
285 for (idx, event) in data.signet_events.iter().enumerate() {
287 let er = SignetEventRow::from_event(event, bn, to_i64(idx as u64));
288 sqlx::query(
289 "INSERT INTO signet_events (
290 block_number, event_index, event_type, order_index,
291 rollup_chain_id, sender, to_address, value, gas,
292 max_fee_per_gas, data, rollup_recipient, amount, token
293 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)",
294 )
295 .bind(er.block_number)
296 .bind(er.event_index)
297 .bind(er.event_type as i32)
298 .bind(er.order_index)
299 .bind(&er.rollup_chain_id)
300 .bind(&er.sender)
301 .bind(&er.to_address)
302 .bind(&er.value)
303 .bind(&er.gas)
304 .bind(&er.max_fee_per_gas)
305 .bind(&er.data)
306 .bind(&er.rollup_recipient)
307 .bind(&er.amount)
308 .bind(&er.token)
309 .execute(&mut *tx)
310 .await?;
311 }
312
313 if let Some(zh) = &data.zenith_header {
315 let zr = ZenithHeaderRow::from_zenith(zh, bn);
316 sqlx::query(
317 "INSERT INTO zenith_headers (
318 block_number, host_block_number, rollup_chain_id,
319 gas_limit, reward_address, block_data_hash
320 ) VALUES ($1, $2, $3, $4, $5, $6)",
321 )
322 .bind(zr.block_number)
323 .bind(&zr.host_block_number)
324 .bind(&zr.rollup_chain_id)
325 .bind(&zr.gas_limit)
326 .bind(&zr.reward_address)
327 .bind(&zr.block_data_hash)
328 .execute(&mut *tx)
329 .await?;
330 }
331
332 tx.commit().await?;
333 Ok(())
334 }
335}
336
337fn row_to_tx_row(r: &sqlx::any::AnyRow) -> TxRow {
339 TxRow {
340 block_number: r.get("block_number"),
341 tx_index: r.get("tx_index"),
342 tx_hash: r.get("tx_hash"),
343 tx_type: r.get::<i32, _>("tx_type") as i16,
344 sig_y_parity: r.get::<i32, _>("sig_y_parity") != 0,
345 sig_r: r.get("sig_r"),
346 sig_s: r.get("sig_s"),
347 chain_id: r.get("chain_id"),
348 nonce: r.get("nonce"),
349 gas_limit: r.get("gas_limit"),
350 to_address: r.get("to_address"),
351 value: r.get("value"),
352 input: r.get("input"),
353 gas_price: r.get("gas_price"),
354 max_fee_per_gas: r.get("max_fee_per_gas"),
355 max_priority_fee_per_gas: r.get("max_priority_fee_per_gas"),
356 max_fee_per_blob_gas: r.get("max_fee_per_blob_gas"),
357 blob_versioned_hashes: r.get("blob_versioned_hashes"),
358 access_list: r.get("access_list"),
359 authorization_list: r.get("authorization_list"),
360 from_address: r.get("from_address"),
361 }
362}
363
364fn row_to_signet_event_row(r: &sqlx::any::AnyRow) -> SignetEventRow {
365 SignetEventRow {
366 block_number: r.get("block_number"),
367 event_index: r.get("event_index"),
368 event_type: r.get::<i32, _>("event_type") as i16,
369 order_index: r.get("order_index"),
370 rollup_chain_id: r.get("rollup_chain_id"),
371 sender: r.get("sender"),
372 to_address: r.get("to_address"),
373 value: r.get("value"),
374 gas: r.get("gas"),
375 max_fee_per_gas: r.get("max_fee_per_gas"),
376 data: r.get("data"),
377 rollup_recipient: r.get("rollup_recipient"),
378 amount: r.get("amount"),
379 token: r.get("token"),
380 }
381}
382
383fn row_to_log_row(r: &sqlx::any::AnyRow) -> LogRow {
384 LogRow {
385 block_number: r.get("block_number"),
386 tx_index: r.get("tx_index"),
387 log_index: r.get("log_index"),
388 address: r.get("address"),
389 topic0: r.get("topic0"),
390 topic1: r.get("topic1"),
391 topic2: r.get("topic2"),
392 topic3: r.get("topic3"),
393 data: r.get("data"),
394 }
395}
396
397fn row_to_zenith_header_row(r: &sqlx::any::AnyRow) -> ZenithHeaderRow {
398 ZenithHeaderRow {
399 block_number: r.get("block_number"),
400 host_block_number: r.get("host_block_number"),
401 rollup_chain_id: r.get("rollup_chain_id"),
402 gas_limit: r.get("gas_limit"),
403 reward_address: r.get("reward_address"),
404 block_data_hash: r.get("block_data_hash"),
405 }
406}
407
408impl ColdStorage for SqlColdBackend {
409 async fn get_header(&self, spec: HeaderSpecifier) -> ColdResult<Option<SealedHeader>> {
410 let Some(block_num) = self.resolve_header_spec(spec).await? else {
411 return Ok(None);
412 };
413 self.fetch_header_by_number(block_num).await.map_err(ColdStorageError::from)
414 }
415
416 async fn get_headers(
417 &self,
418 specs: Vec<HeaderSpecifier>,
419 ) -> ColdResult<Vec<Option<SealedHeader>>> {
420 let mut results = Vec::with_capacity(specs.len());
421 for spec in specs {
422 let header = self.get_header(spec).await?;
423 results.push(header);
424 }
425 Ok(results)
426 }
427
428 async fn get_transaction(
429 &self,
430 spec: TransactionSpecifier,
431 ) -> ColdResult<Option<Confirmed<RecoveredTx>>> {
432 let row = match spec {
433 TransactionSpecifier::Hash(hash) => sqlx::query(
434 "SELECT t.*, h.block_hash
435 FROM transactions t
436 JOIN headers h ON t.block_number = h.block_number
437 WHERE t.tx_hash = $1",
438 )
439 .bind(hash.as_slice())
440 .fetch_optional(&self.pool)
441 .await
442 .map_err(SqlColdError::from)?,
443 TransactionSpecifier::BlockAndIndex { block, index } => sqlx::query(
444 "SELECT t.*, h.block_hash
445 FROM transactions t
446 JOIN headers h ON t.block_number = h.block_number
447 WHERE t.block_number = $1 AND t.tx_index = $2",
448 )
449 .bind(to_i64(block))
450 .bind(to_i64(index))
451 .fetch_optional(&self.pool)
452 .await
453 .map_err(SqlColdError::from)?,
454 TransactionSpecifier::BlockHashAndIndex { block_hash, index } => sqlx::query(
455 "SELECT t.*, h.block_hash
456 FROM transactions t
457 JOIN headers h ON t.block_number = h.block_number
458 WHERE h.block_hash = $1 AND t.tx_index = $2",
459 )
460 .bind(block_hash.as_slice())
461 .bind(to_i64(index))
462 .fetch_optional(&self.pool)
463 .await
464 .map_err(SqlColdError::from)?,
465 };
466
467 let Some(r) = row else {
468 return Ok(None);
469 };
470
471 let tx_row = row_to_tx_row(&r);
472 let sender = alloy::primitives::Address::from_slice(&tx_row.from_address);
473 let tx = tx_row.into_tx().map_err(ColdStorageError::from)?;
474 let block = from_i64(r.get::<i64, _>("block_number"));
475 let index = from_i64(r.get::<i64, _>("tx_index"));
476 let hash_bytes: Vec<u8> = r.get("block_hash");
477 let block_hash = alloy::primitives::B256::from_slice(&hash_bytes);
478 let meta = ConfirmationMeta::new(block, block_hash, index);
479 let recovered = Recovered::new_unchecked(tx, sender);
481 Ok(Some(Confirmed::new(recovered, meta)))
482 }
483
484 async fn get_transactions_in_block(&self, block: BlockNumber) -> ColdResult<Vec<RecoveredTx>> {
485 let bn = to_i64(block);
486 let rows =
487 sqlx::query("SELECT * FROM transactions WHERE block_number = $1 ORDER BY tx_index")
488 .bind(bn)
489 .fetch_all(&self.pool)
490 .await
491 .map_err(SqlColdError::from)?;
492
493 rows.into_iter()
494 .map(|r| {
495 let tx_row = row_to_tx_row(&r);
496 let sender = alloy::primitives::Address::from_slice(&tx_row.from_address);
497 let tx = tx_row.into_tx().map_err(ColdStorageError::from)?;
498 Ok(Recovered::new_unchecked(tx, sender))
500 })
501 .collect()
502 }
503
504 async fn get_transaction_count(&self, block: BlockNumber) -> ColdResult<u64> {
505 let bn = to_i64(block);
506 let row = sqlx::query("SELECT COUNT(*) as cnt FROM transactions WHERE block_number = $1")
507 .bind(bn)
508 .fetch_one(&self.pool)
509 .await
510 .map_err(SqlColdError::from)?;
511
512 Ok(from_i64(row.get::<i64, _>("cnt")))
513 }
514
515 async fn get_receipt(&self, spec: ReceiptSpecifier) -> ColdResult<Option<ColdReceipt>> {
516 let (block, index) = match spec {
518 ReceiptSpecifier::TxHash(hash) => {
519 let row = sqlx::query(
520 "SELECT block_number, tx_index FROM transactions WHERE tx_hash = $1",
521 )
522 .bind(hash.as_slice())
523 .fetch_optional(&self.pool)
524 .await
525 .map_err(SqlColdError::from)?;
526 let Some(r) = row else { return Ok(None) };
527 (from_i64(r.get::<i64, _>("block_number")), from_i64(r.get::<i64, _>("tx_index")))
528 }
529 ReceiptSpecifier::BlockAndIndex { block, index } => (block, index),
530 };
531
532 let Some(header) = self.fetch_header_by_number(block).await? else {
533 return Ok(None);
534 };
535
536 let receipt_row = sqlx::query(
538 "SELECT r.*, t.tx_hash, t.from_address
539 FROM receipts r
540 JOIN transactions t ON r.block_number = t.block_number AND r.tx_index = t.tx_index
541 WHERE r.block_number = $1 AND r.tx_index = $2",
542 )
543 .bind(to_i64(block))
544 .bind(to_i64(index))
545 .fetch_optional(&self.pool)
546 .await
547 .map_err(SqlColdError::from)?;
548
549 let Some(rr) = receipt_row else {
550 return Ok(None);
551 };
552
553 let bn: i64 = rr.get("block_number");
554 let tx_idx: i64 = rr.get("tx_index");
555 let tx_hash_bytes: Vec<u8> = rr.get("tx_hash");
556 let tx_hash = alloy::primitives::B256::from_slice(&tx_hash_bytes);
557 let from_bytes: Vec<u8> = rr.get("from_address");
558 let sender = alloy::primitives::Address::from_slice(&from_bytes);
559
560 let receipt = ReceiptRow {
561 block_number: bn,
562 tx_index: tx_idx,
563 tx_type: rr.get::<i32, _>("tx_type") as i16,
564 success: rr.get::<i32, _>("success") != 0,
565 cumulative_gas_used: rr.get("cumulative_gas_used"),
566 };
567
568 let log_rows = sqlx::query(
569 "SELECT * FROM logs WHERE block_number = $1 AND tx_index = $2 ORDER BY log_index",
570 )
571 .bind(bn)
572 .bind(tx_idx)
573 .fetch_all(&self.pool)
574 .await
575 .map_err(SqlColdError::from)?;
576
577 let logs: Vec<_> = log_rows.into_iter().map(|r| row_to_log_row(&r)).collect();
578
579 let built = receipt_from_rows(receipt, logs).map_err(ColdStorageError::from)?;
580
581 let prior = sqlx::query(
583 "SELECT CAST(SUM(
584 (SELECT COUNT(*) FROM logs l WHERE l.block_number = $1 AND l.tx_index = r.tx_index)
585 ) AS bigint) as log_count,
586 CAST(MAX(r.cumulative_gas_used) AS bigint) as prior_gas
587 FROM receipts r WHERE r.block_number = $1 AND r.tx_index < $2",
588 )
589 .bind(to_i64(block))
590 .bind(to_i64(index))
591 .fetch_one(&self.pool)
592 .await
593 .map_err(SqlColdError::from)?;
594
595 let first_log_index: u64 = prior.get::<Option<i64>, _>("log_count").unwrap_or(0) as u64;
596 let prior_cumulative_gas: u64 =
597 prior.get::<Option<i64>, _>("prior_gas").unwrap_or(0) as u64;
598 let gas_used = built.inner.cumulative_gas_used - prior_cumulative_gas;
599
600 let ir = IndexedReceipt { receipt: built, tx_hash, first_log_index, gas_used, sender };
601 Ok(Some(ColdReceipt::new(ir, &header, index)))
602 }
603
604 async fn get_receipts_in_block(&self, block: BlockNumber) -> ColdResult<Vec<ColdReceipt>> {
605 let Some(header) =
606 self.fetch_header_by_number(block).await.map_err(ColdStorageError::from)?
607 else {
608 return Ok(Vec::new());
609 };
610
611 let bn = to_i64(block);
612
613 let receipt_rows = sqlx::query(
615 "SELECT r.*, t.tx_hash, t.from_address
616 FROM receipts r
617 JOIN transactions t ON r.block_number = t.block_number AND r.tx_index = t.tx_index
618 WHERE r.block_number = $1
619 ORDER BY r.tx_index",
620 )
621 .bind(bn)
622 .fetch_all(&self.pool)
623 .await
624 .map_err(SqlColdError::from)?;
625
626 let all_log_rows =
627 sqlx::query("SELECT * FROM logs WHERE block_number = $1 ORDER BY tx_index, log_index")
628 .bind(bn)
629 .fetch_all(&self.pool)
630 .await
631 .map_err(SqlColdError::from)?;
632
633 let mut logs_by_tx: std::collections::BTreeMap<i64, Vec<LogRow>> =
635 std::collections::BTreeMap::new();
636 for r in all_log_rows {
637 logs_by_tx.entry(r.get::<i64, _>("tx_index")).or_default().push(row_to_log_row(&r));
638 }
639
640 let mut first_log_index = 0u64;
641 let mut prior_cumulative_gas = 0u64;
642 receipt_rows
643 .into_iter()
644 .enumerate()
645 .map(|(idx, rr)| {
646 let tx_idx: i64 = rr.get("tx_index");
647 let tx_hash_bytes: Vec<u8> = rr.get("tx_hash");
648 let tx_hash = alloy::primitives::B256::from_slice(&tx_hash_bytes);
649 let from_bytes: Vec<u8> = rr.get("from_address");
650 let sender = alloy::primitives::Address::from_slice(&from_bytes);
651 let receipt_row = ReceiptRow {
652 block_number: rr.get("block_number"),
653 tx_index: tx_idx,
654 tx_type: rr.get::<i32, _>("tx_type") as i16,
655 success: rr.get::<i32, _>("success") != 0,
656 cumulative_gas_used: rr.get("cumulative_gas_used"),
657 };
658 let logs = logs_by_tx.remove(&tx_idx).unwrap_or_default();
659 let receipt =
660 receipt_from_rows(receipt_row, logs).map_err(ColdStorageError::from)?;
661 let gas_used = receipt.inner.cumulative_gas_used - prior_cumulative_gas;
662 prior_cumulative_gas = receipt.inner.cumulative_gas_used;
663 let ir = IndexedReceipt { receipt, tx_hash, first_log_index, gas_used, sender };
664 first_log_index += ir.receipt.inner.logs.len() as u64;
665 Ok(ColdReceipt::new(ir, &header, idx as u64))
666 })
667 .collect()
668 }
669
670 async fn get_signet_events(
671 &self,
672 spec: SignetEventsSpecifier,
673 ) -> ColdResult<Vec<DbSignetEvent>> {
674 let rows = match spec {
675 SignetEventsSpecifier::Block(block) => {
676 let bn = to_i64(block);
677 sqlx::query(
678 "SELECT * FROM signet_events WHERE block_number = $1 ORDER BY event_index",
679 )
680 .bind(bn)
681 .fetch_all(&self.pool)
682 .await
683 .map_err(SqlColdError::from)?
684 }
685 SignetEventsSpecifier::BlockRange { start, end } => {
686 let s = to_i64(start);
687 let e = to_i64(end);
688 sqlx::query(
689 "SELECT * FROM signet_events WHERE block_number >= $1 AND block_number <= $2
690 ORDER BY block_number, event_index",
691 )
692 .bind(s)
693 .bind(e)
694 .fetch_all(&self.pool)
695 .await
696 .map_err(SqlColdError::from)?
697 }
698 };
699
700 rows.into_iter()
701 .map(|r| row_to_signet_event_row(&r).into_event().map_err(ColdStorageError::from))
702 .collect()
703 }
704
705 async fn get_zenith_header(
706 &self,
707 spec: ZenithHeaderSpecifier,
708 ) -> ColdResult<Option<DbZenithHeader>> {
709 let block = match spec {
710 ZenithHeaderSpecifier::Number(n) => n,
711 ZenithHeaderSpecifier::Range { start, .. } => start,
712 };
713 let bn = to_i64(block);
714 let row = sqlx::query("SELECT * FROM zenith_headers WHERE block_number = $1")
715 .bind(bn)
716 .fetch_optional(&self.pool)
717 .await
718 .map_err(SqlColdError::from)?;
719
720 row.map(|r| row_to_zenith_header_row(&r).into_zenith().map_err(ColdStorageError::from))
721 .transpose()
722 }
723
724 async fn get_zenith_headers(
725 &self,
726 spec: ZenithHeaderSpecifier,
727 ) -> ColdResult<Vec<DbZenithHeader>> {
728 let rows = match spec {
729 ZenithHeaderSpecifier::Number(n) => {
730 let bn = to_i64(n);
731 sqlx::query("SELECT * FROM zenith_headers WHERE block_number = $1")
732 .bind(bn)
733 .fetch_all(&self.pool)
734 .await
735 .map_err(SqlColdError::from)?
736 }
737 ZenithHeaderSpecifier::Range { start, end } => {
738 let s = to_i64(start);
739 let e = to_i64(end);
740 sqlx::query(
741 "SELECT * FROM zenith_headers WHERE block_number >= $1 AND block_number <= $2
742 ORDER BY block_number",
743 )
744 .bind(s)
745 .bind(e)
746 .fetch_all(&self.pool)
747 .await
748 .map_err(SqlColdError::from)?
749 }
750 };
751
752 rows.into_iter()
753 .map(|r| row_to_zenith_header_row(&r).into_zenith().map_err(ColdStorageError::from))
754 .collect()
755 }
756
757 async fn get_logs(&self, filter: Filter) -> ColdResult<Vec<RpcLog>> {
758 let from = filter.get_from_block().unwrap_or(0);
759 let to = filter.get_to_block().unwrap_or(u64::MAX);
760
761 let mut sql = String::from(
766 "SELECT l.*, h.block_hash, h.timestamp AS block_timestamp, t.tx_hash, \
767 (SELECT COUNT(*) FROM logs l2 \
768 WHERE l2.block_number = l.block_number \
769 AND (l2.tx_index < l.tx_index \
770 OR (l2.tx_index = l.tx_index AND l2.log_index < l.log_index)) \
771 ) AS block_log_index \
772 FROM logs l \
773 JOIN headers h ON l.block_number = h.block_number \
774 JOIN transactions t ON l.block_number = t.block_number \
775 AND l.tx_index = t.tx_index \
776 WHERE l.block_number >= $1 AND l.block_number <= $2",
777 );
778 let mut params: Vec<Vec<u8>> = Vec::new();
779 let mut idx = 3u32;
780
781 if !filter.address.is_empty() {
783 let addrs: Vec<_> = filter.address.iter().collect();
784 if addrs.len() == 1 {
785 sql.push_str(&format!(" AND l.address = ${idx}"));
786 params.push(addrs[0].as_slice().to_vec());
787 idx += 1;
788 } else {
789 let placeholders: String = addrs
790 .iter()
791 .enumerate()
792 .map(|(i, _)| format!("${}", idx + i as u32))
793 .collect::<Vec<_>>()
794 .join(", ");
795 sql.push_str(&format!(" AND l.address IN ({placeholders})"));
796 for addr in &addrs {
797 params.push(addr.as_slice().to_vec());
798 }
799 idx += addrs.len() as u32;
800 }
801 }
802
803 let topic_cols = ["l.topic0", "l.topic1", "l.topic2", "l.topic3"];
805 for (i, topic_filter) in filter.topics.iter().enumerate() {
806 if topic_filter.is_empty() {
807 continue;
808 }
809 let values: Vec<_> = topic_filter.iter().collect();
810 if values.len() == 1 {
811 sql.push_str(&format!(" AND {} = ${idx}", topic_cols[i]));
812 params.push(values[0].as_slice().to_vec());
813 idx += 1;
814 } else {
815 let placeholders: String = values
816 .iter()
817 .enumerate()
818 .map(|(j, _)| format!("${}", idx + j as u32))
819 .collect::<Vec<_>>()
820 .join(", ");
821 sql.push_str(&format!(" AND {} IN ({placeholders})", topic_cols[i]));
822 for v in &values {
823 params.push(v.as_slice().to_vec());
824 }
825 idx += values.len() as u32;
826 }
827 }
828
829 sql.push_str(" ORDER BY l.block_number, l.tx_index, l.log_index");
830
831 let mut query = sqlx::query(&sql).bind(to_i64(from)).bind(to_i64(to));
833 for param in ¶ms {
834 query = query.bind(param.as_slice());
835 }
836
837 let rows = query.fetch_all(&self.pool).await.map_err(SqlColdError::from)?;
838
839 rows.into_iter()
840 .map(|r| {
841 let log = row_to_log_row(&r).into_log();
842 let block_number = from_i64(r.get::<i64, _>("block_number"));
843 let block_hash_bytes: Vec<u8> = r.get("block_hash");
844 let tx_hash_bytes: Vec<u8> = r.get("tx_hash");
845 Ok(RpcLog {
846 inner: log,
847 block_hash: Some(alloy::primitives::B256::from_slice(&block_hash_bytes)),
848 block_number: Some(block_number),
849 block_timestamp: Some(from_i64(r.get::<i64, _>("block_timestamp"))),
850 transaction_hash: Some(alloy::primitives::B256::from_slice(&tx_hash_bytes)),
851 transaction_index: Some(from_i64(r.get::<i64, _>("tx_index"))),
852 log_index: Some(from_i64(r.get::<i64, _>("block_log_index"))),
853 removed: false,
854 })
855 })
856 .collect::<ColdResult<Vec<_>>>()
857 }
858
859 async fn get_latest_block(&self) -> ColdResult<Option<BlockNumber>> {
860 let row = sqlx::query("SELECT MAX(block_number) as max_bn FROM headers")
861 .fetch_one(&self.pool)
862 .await
863 .map_err(SqlColdError::from)?;
864 Ok(row.get::<Option<i64>, _>("max_bn").map(from_i64))
865 }
866
867 async fn append_block(&self, data: BlockData) -> ColdResult<()> {
868 self.insert_block(data).await.map_err(ColdStorageError::from)
869 }
870
871 async fn append_blocks(&self, data: Vec<BlockData>) -> ColdResult<()> {
872 for block_data in data {
873 self.insert_block(block_data).await?;
874 }
875 Ok(())
876 }
877
878 async fn truncate_above(&self, block: BlockNumber) -> ColdResult<()> {
879 let bn = to_i64(block);
880 let mut tx = self.pool.begin().await.map_err(SqlColdError::from)?;
881
882 sqlx::query("DELETE FROM logs WHERE block_number > $1")
883 .bind(bn)
884 .execute(&mut *tx)
885 .await
886 .map_err(SqlColdError::from)?;
887 sqlx::query("DELETE FROM transactions WHERE block_number > $1")
888 .bind(bn)
889 .execute(&mut *tx)
890 .await
891 .map_err(SqlColdError::from)?;
892 sqlx::query("DELETE FROM receipts WHERE block_number > $1")
893 .bind(bn)
894 .execute(&mut *tx)
895 .await
896 .map_err(SqlColdError::from)?;
897 sqlx::query("DELETE FROM signet_events WHERE block_number > $1")
898 .bind(bn)
899 .execute(&mut *tx)
900 .await
901 .map_err(SqlColdError::from)?;
902 sqlx::query("DELETE FROM zenith_headers WHERE block_number > $1")
903 .bind(bn)
904 .execute(&mut *tx)
905 .await
906 .map_err(SqlColdError::from)?;
907 sqlx::query("DELETE FROM headers WHERE block_number > $1")
908 .bind(bn)
909 .execute(&mut *tx)
910 .await
911 .map_err(SqlColdError::from)?;
912
913 tx.commit().await.map_err(SqlColdError::from)?;
914 Ok(())
915 }
916}
917
918#[cfg(all(test, feature = "test-utils"))]
919mod tests {
920 use super::*;
921 use signet_cold::conformance::conformance;
922
923 #[tokio::test]
924 async fn sqlite_conformance() {
925 let backend = SqlColdBackend::connect("sqlite::memory:").await.unwrap();
926 conformance(&backend).await.unwrap();
927 }
928
929 #[tokio::test]
930 async fn pg_conformance() {
931 let Ok(url) = std::env::var("DATABASE_URL") else {
932 eprintln!("skipping pg conformance: DATABASE_URL not set");
933 return;
934 };
935 let backend = SqlColdBackend::connect(&url).await.unwrap();
936 conformance(&backend).await.unwrap();
937 }
938}