1use crate::SqlColdError;
8use crate::convert::{
9 HeaderRow, LogRow, ReceiptRow, SignetEventRow, TxRow, ZenithHeaderRow, from_i64,
10 receipt_from_rows, to_i64,
11};
12use alloy::{consensus::Header, primitives::BlockNumber};
13use signet_cold::{
14 BlockData, ColdResult, ColdStorage, ColdStorageError, Confirmed, HeaderSpecifier,
15 ReceiptSpecifier, SignetEventsSpecifier, TransactionSpecifier, ZenithHeaderSpecifier,
16};
17use signet_storage_types::{
18 ConfirmationMeta, DbSignetEvent, DbZenithHeader, Receipt, TransactionSigned,
19};
20use sqlx::{AnyPool, Row};
21
22#[derive(Debug, Clone)]
42pub struct SqlColdBackend {
43 pool: AnyPool,
44}
45
46impl SqlColdBackend {
47 pub async fn new(pool: AnyPool) -> Result<Self, SqlColdError> {
54 let conn = pool.acquire().await?;
56 let backend = conn.backend_name().to_owned();
57 drop(conn);
58
59 let migration = match backend.as_str() {
60 "PostgreSQL" => include_str!("../migrations/001_initial_pg.sql"),
61 "SQLite" => include_str!("../migrations/001_initial.sql"),
62 other => {
63 return Err(SqlColdError::Convert(format!(
64 "unsupported database backend: {other}"
65 )));
66 }
67 };
68 sqlx::raw_sql(migration).execute(&pool).await?;
71 Ok(Self { pool })
72 }
73
74 pub async fn connect(url: &str) -> Result<Self, SqlColdError> {
83 sqlx::any::install_default_drivers();
84 let pool: AnyPool = sqlx::pool::PoolOptions::new().max_connections(1).connect(url).await?;
85 Self::new(pool).await
86 }
87
88 async fn resolve_header_spec(
93 &self,
94 spec: HeaderSpecifier,
95 ) -> Result<Option<BlockNumber>, SqlColdError> {
96 match spec {
97 HeaderSpecifier::Number(n) => Ok(Some(n)),
98 HeaderSpecifier::Hash(hash) => {
99 let hash_bytes = hash.as_slice();
100 let row = sqlx::query("SELECT block_number FROM headers WHERE block_hash = $1")
101 .bind(hash_bytes)
102 .fetch_optional(&self.pool)
103 .await?;
104 Ok(row.map(|r| from_i64(r.get::<i64, _>("block_number"))))
105 }
106 }
107 }
108
109 async fn resolve_tx_spec(
110 &self,
111 spec: TransactionSpecifier,
112 ) -> Result<Option<(BlockNumber, u64)>, SqlColdError> {
113 match spec {
114 TransactionSpecifier::Hash(hash) => {
115 let hash_bytes = hash.as_slice();
116 let row = sqlx::query(
117 "SELECT block_number, tx_index FROM transactions WHERE tx_hash = $1",
118 )
119 .bind(hash_bytes)
120 .fetch_optional(&self.pool)
121 .await?;
122 Ok(row.map(|r| {
123 (
124 from_i64(r.get::<i64, _>("block_number")),
125 from_i64(r.get::<i64, _>("tx_index")),
126 )
127 }))
128 }
129 TransactionSpecifier::BlockAndIndex { block, index } => Ok(Some((block, index))),
130 TransactionSpecifier::BlockHashAndIndex { block_hash, index } => {
131 let block = self.resolve_header_spec(HeaderSpecifier::Hash(block_hash)).await?;
132 Ok(block.map(|b| (b, index)))
133 }
134 }
135 }
136
137 async fn resolve_receipt_spec(
138 &self,
139 spec: ReceiptSpecifier,
140 ) -> Result<Option<(BlockNumber, u64)>, SqlColdError> {
141 match spec {
142 ReceiptSpecifier::TxHash(hash) => {
143 let hash_bytes = hash.as_slice();
144 let row = sqlx::query(
145 "SELECT block_number, tx_index FROM transactions WHERE tx_hash = $1",
146 )
147 .bind(hash_bytes)
148 .fetch_optional(&self.pool)
149 .await?;
150 Ok(row.map(|r| {
151 (
152 from_i64(r.get::<i64, _>("block_number")),
153 from_i64(r.get::<i64, _>("tx_index")),
154 )
155 }))
156 }
157 ReceiptSpecifier::BlockAndIndex { block, index } => Ok(Some((block, index))),
158 }
159 }
160
161 async fn fetch_header_by_number(
166 &self,
167 block_num: BlockNumber,
168 ) -> Result<Option<Header>, SqlColdError> {
169 let bn = to_i64(block_num);
170 let row = sqlx::query("SELECT * FROM headers WHERE block_number = $1")
171 .bind(bn)
172 .fetch_optional(&self.pool)
173 .await?;
174
175 row.map(|r| {
176 HeaderRow {
177 block_number: r.get("block_number"),
178 block_hash: r.get("block_hash"),
179 parent_hash: r.get("parent_hash"),
180 ommers_hash: r.get("ommers_hash"),
181 beneficiary: r.get("beneficiary"),
182 state_root: r.get("state_root"),
183 transactions_root: r.get("transactions_root"),
184 receipts_root: r.get("receipts_root"),
185 logs_bloom: r.get("logs_bloom"),
186 difficulty: r.get("difficulty"),
187 gas_limit: r.get("gas_limit"),
188 gas_used: r.get("gas_used"),
189 timestamp: r.get("timestamp"),
190 extra_data: r.get("extra_data"),
191 mix_hash: r.get("mix_hash"),
192 nonce: r.get("nonce"),
193 base_fee_per_gas: r.get("base_fee_per_gas"),
194 withdrawals_root: r.get("withdrawals_root"),
195 blob_gas_used: r.get("blob_gas_used"),
196 excess_blob_gas: r.get("excess_blob_gas"),
197 parent_beacon_block_root: r.get("parent_beacon_block_root"),
198 requests_hash: r.get("requests_hash"),
199 }
200 .into_header()
201 })
202 .transpose()
203 }
204
205 async fn fetch_block_hash(
206 &self,
207 block_num: BlockNumber,
208 ) -> Result<Option<alloy::primitives::B256>, SqlColdError> {
209 let bn = to_i64(block_num);
210 let row = sqlx::query("SELECT block_hash FROM headers WHERE block_number = $1")
211 .bind(bn)
212 .fetch_optional(&self.pool)
213 .await?;
214 Ok(row.map(|r| {
215 let bytes: Vec<u8> = r.get("block_hash");
216 alloy::primitives::B256::from_slice(&bytes)
217 }))
218 }
219
220 async fn fetch_tx_by_location(
221 &self,
222 block: BlockNumber,
223 index: u64,
224 ) -> Result<Option<TransactionSigned>, SqlColdError> {
225 let bn = to_i64(block);
226 let idx = to_i64(index);
227 let row =
228 sqlx::query("SELECT * FROM transactions WHERE block_number = $1 AND tx_index = $2")
229 .bind(bn)
230 .bind(idx)
231 .fetch_optional(&self.pool)
232 .await?;
233
234 row.map(|r| row_to_tx_row(&r).into_tx()).transpose()
235 }
236
237 async fn fetch_receipt_by_location(
238 &self,
239 block: BlockNumber,
240 index: u64,
241 ) -> Result<Option<Receipt>, SqlColdError> {
242 let bn = to_i64(block);
243 let idx = to_i64(index);
244
245 let receipt_row =
246 sqlx::query("SELECT * FROM receipts WHERE block_number = $1 AND tx_index = $2")
247 .bind(bn)
248 .bind(idx)
249 .fetch_optional(&self.pool)
250 .await?;
251
252 let Some(rr) = receipt_row else {
253 return Ok(None);
254 };
255
256 let receipt = ReceiptRow {
257 block_number: rr.get("block_number"),
258 tx_index: rr.get("tx_index"),
259 tx_type: rr.get::<i32, _>("tx_type") as i16,
260 success: rr.get::<i32, _>("success") != 0,
261 cumulative_gas_used: rr.get("cumulative_gas_used"),
262 };
263
264 let log_rows = sqlx::query(
265 "SELECT * FROM logs WHERE block_number = $1 AND tx_index = $2 ORDER BY log_index",
266 )
267 .bind(bn)
268 .bind(idx)
269 .fetch_all(&self.pool)
270 .await?;
271
272 let logs = log_rows
273 .into_iter()
274 .map(|r| LogRow {
275 block_number: r.get("block_number"),
276 tx_index: r.get("tx_index"),
277 log_index: r.get("log_index"),
278 address: r.get("address"),
279 topic0: r.get("topic0"),
280 topic1: r.get("topic1"),
281 topic2: r.get("topic2"),
282 topic3: r.get("topic3"),
283 data: r.get("data"),
284 })
285 .collect();
286
287 receipt_from_rows(receipt, logs).map(Some)
288 }
289
290 async fn insert_block(&self, data: BlockData) -> Result<(), SqlColdError> {
295 let mut tx = self.pool.begin().await?;
296 let block = data.block_number();
297 let bn = to_i64(block);
298
299 let hr = HeaderRow::from_header(&data.header);
301 sqlx::query(
302 "INSERT INTO headers (
303 block_number, block_hash, parent_hash, ommers_hash, beneficiary,
304 state_root, transactions_root, receipts_root, logs_bloom, difficulty,
305 gas_limit, gas_used, timestamp, extra_data, mix_hash, nonce,
306 base_fee_per_gas, withdrawals_root, blob_gas_used, excess_blob_gas,
307 parent_beacon_block_root, requests_hash
308 ) VALUES (
309 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
310 $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22
311 )",
312 )
313 .bind(hr.block_number)
314 .bind(&hr.block_hash)
315 .bind(&hr.parent_hash)
316 .bind(&hr.ommers_hash)
317 .bind(&hr.beneficiary)
318 .bind(&hr.state_root)
319 .bind(&hr.transactions_root)
320 .bind(&hr.receipts_root)
321 .bind(&hr.logs_bloom)
322 .bind(&hr.difficulty)
323 .bind(hr.gas_limit)
324 .bind(hr.gas_used)
325 .bind(hr.timestamp)
326 .bind(&hr.extra_data)
327 .bind(&hr.mix_hash)
328 .bind(&hr.nonce)
329 .bind(hr.base_fee_per_gas)
330 .bind(&hr.withdrawals_root)
331 .bind(hr.blob_gas_used)
332 .bind(hr.excess_blob_gas)
333 .bind(&hr.parent_beacon_block_root)
334 .bind(&hr.requests_hash)
335 .execute(&mut *tx)
336 .await?;
337
338 for (idx, tx_signed) in data.transactions.iter().enumerate() {
340 let tr = TxRow::from_tx(tx_signed, bn, to_i64(idx as u64))?;
341 sqlx::query(
342 "INSERT INTO transactions (
343 block_number, tx_index, tx_hash, tx_type,
344 sig_y_parity, sig_r, sig_s,
345 chain_id, nonce, gas_limit, to_address, value, input,
346 gas_price, max_fee_per_gas, max_priority_fee_per_gas,
347 max_fee_per_blob_gas, blob_versioned_hashes,
348 access_list, authorization_list
349 ) VALUES (
350 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
351 $11, $12, $13, $14, $15, $16, $17, $18, $19, $20
352 )",
353 )
354 .bind(tr.block_number)
355 .bind(tr.tx_index)
356 .bind(&tr.tx_hash)
357 .bind(tr.tx_type as i32)
358 .bind(tr.sig_y_parity as i32)
359 .bind(&tr.sig_r)
360 .bind(&tr.sig_s)
361 .bind(tr.chain_id)
362 .bind(tr.nonce)
363 .bind(tr.gas_limit)
364 .bind(&tr.to_address)
365 .bind(&tr.value)
366 .bind(&tr.input)
367 .bind(&tr.gas_price)
368 .bind(&tr.max_fee_per_gas)
369 .bind(&tr.max_priority_fee_per_gas)
370 .bind(&tr.max_fee_per_blob_gas)
371 .bind(&tr.blob_versioned_hashes)
372 .bind(&tr.access_list)
373 .bind(&tr.authorization_list)
374 .execute(&mut *tx)
375 .await?;
376 }
377
378 for (idx, receipt) in data.receipts.iter().enumerate() {
380 let rr = ReceiptRow::from_receipt(receipt, bn, to_i64(idx as u64));
381 sqlx::query(
382 "INSERT INTO receipts (block_number, tx_index, tx_type, success, cumulative_gas_used)
383 VALUES ($1, $2, $3, $4, $5)",
384 )
385 .bind(rr.block_number)
386 .bind(rr.tx_index)
387 .bind(rr.tx_type as i32)
388 .bind(rr.success as i32)
389 .bind(rr.cumulative_gas_used)
390 .execute(&mut *tx)
391 .await?;
392
393 for (log_idx, log) in receipt.inner.logs.iter().enumerate() {
394 let lr = LogRow::from_log(log, bn, to_i64(idx as u64), to_i64(log_idx as u64));
395 sqlx::query(
396 "INSERT INTO logs (block_number, tx_index, log_index, address, topic0, topic1, topic2, topic3, data)
397 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
398 )
399 .bind(lr.block_number)
400 .bind(lr.tx_index)
401 .bind(lr.log_index)
402 .bind(&lr.address)
403 .bind(&lr.topic0)
404 .bind(&lr.topic1)
405 .bind(&lr.topic2)
406 .bind(&lr.topic3)
407 .bind(&lr.data)
408 .execute(&mut *tx)
409 .await?;
410 }
411 }
412
413 for (idx, event) in data.signet_events.iter().enumerate() {
415 let er = SignetEventRow::from_event(event, bn, to_i64(idx as u64));
416 sqlx::query(
417 "INSERT INTO signet_events (
418 block_number, event_index, event_type, order_index,
419 rollup_chain_id, sender, to_address, value, gas,
420 max_fee_per_gas, data, rollup_recipient, amount, token
421 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)",
422 )
423 .bind(er.block_number)
424 .bind(er.event_index)
425 .bind(er.event_type as i32)
426 .bind(er.order_index)
427 .bind(&er.rollup_chain_id)
428 .bind(&er.sender)
429 .bind(&er.to_address)
430 .bind(&er.value)
431 .bind(&er.gas)
432 .bind(&er.max_fee_per_gas)
433 .bind(&er.data)
434 .bind(&er.rollup_recipient)
435 .bind(&er.amount)
436 .bind(&er.token)
437 .execute(&mut *tx)
438 .await?;
439 }
440
441 if let Some(zh) = &data.zenith_header {
443 let zr = ZenithHeaderRow::from_zenith(zh, bn);
444 sqlx::query(
445 "INSERT INTO zenith_headers (
446 block_number, host_block_number, rollup_chain_id,
447 gas_limit, reward_address, block_data_hash
448 ) VALUES ($1, $2, $3, $4, $5, $6)",
449 )
450 .bind(zr.block_number)
451 .bind(&zr.host_block_number)
452 .bind(&zr.rollup_chain_id)
453 .bind(&zr.gas_limit)
454 .bind(&zr.reward_address)
455 .bind(&zr.block_data_hash)
456 .execute(&mut *tx)
457 .await?;
458 }
459
460 let current_latest: Option<i64> =
462 sqlx::query("SELECT block_number FROM metadata WHERE key = 'latest_block'")
463 .fetch_optional(&mut *tx)
464 .await?
465 .map(|r| r.get("block_number"));
466
467 let new_latest = current_latest.map_or(bn, |prev| prev.max(bn));
468 sqlx::query(
469 "INSERT INTO metadata (key, block_number) VALUES ('latest_block', $1)
470 ON CONFLICT(key) DO UPDATE SET block_number = $1",
471 )
472 .bind(new_latest)
473 .execute(&mut *tx)
474 .await?;
475
476 let current_earliest: Option<i64> =
477 sqlx::query("SELECT block_number FROM metadata WHERE key = 'earliest_block'")
478 .fetch_optional(&mut *tx)
479 .await?
480 .map(|r| r.get("block_number"));
481
482 let new_earliest = current_earliest.map_or(bn, |prev| prev.min(bn));
483 sqlx::query(
484 "INSERT INTO metadata (key, block_number) VALUES ('earliest_block', $1)
485 ON CONFLICT(key) DO UPDATE SET block_number = $1",
486 )
487 .bind(new_earliest)
488 .execute(&mut *tx)
489 .await?;
490
491 tx.commit().await?;
492 Ok(())
493 }
494}
495
496fn row_to_tx_row(r: &sqlx::any::AnyRow) -> TxRow {
498 TxRow {
499 block_number: r.get("block_number"),
500 tx_index: r.get("tx_index"),
501 tx_hash: r.get("tx_hash"),
502 tx_type: r.get::<i32, _>("tx_type") as i16,
503 sig_y_parity: r.get::<i32, _>("sig_y_parity") != 0,
504 sig_r: r.get("sig_r"),
505 sig_s: r.get("sig_s"),
506 chain_id: r.get("chain_id"),
507 nonce: r.get("nonce"),
508 gas_limit: r.get("gas_limit"),
509 to_address: r.get("to_address"),
510 value: r.get("value"),
511 input: r.get("input"),
512 gas_price: r.get("gas_price"),
513 max_fee_per_gas: r.get("max_fee_per_gas"),
514 max_priority_fee_per_gas: r.get("max_priority_fee_per_gas"),
515 max_fee_per_blob_gas: r.get("max_fee_per_blob_gas"),
516 blob_versioned_hashes: r.get("blob_versioned_hashes"),
517 access_list: r.get("access_list"),
518 authorization_list: r.get("authorization_list"),
519 }
520}
521
522fn row_to_signet_event_row(r: &sqlx::any::AnyRow) -> SignetEventRow {
523 SignetEventRow {
524 block_number: r.get("block_number"),
525 event_index: r.get("event_index"),
526 event_type: r.get::<i32, _>("event_type") as i16,
527 order_index: r.get("order_index"),
528 rollup_chain_id: r.get("rollup_chain_id"),
529 sender: r.get("sender"),
530 to_address: r.get("to_address"),
531 value: r.get("value"),
532 gas: r.get("gas"),
533 max_fee_per_gas: r.get("max_fee_per_gas"),
534 data: r.get("data"),
535 rollup_recipient: r.get("rollup_recipient"),
536 amount: r.get("amount"),
537 token: r.get("token"),
538 }
539}
540
541fn row_to_zenith_header_row(r: &sqlx::any::AnyRow) -> ZenithHeaderRow {
542 ZenithHeaderRow {
543 block_number: r.get("block_number"),
544 host_block_number: r.get("host_block_number"),
545 rollup_chain_id: r.get("rollup_chain_id"),
546 gas_limit: r.get("gas_limit"),
547 reward_address: r.get("reward_address"),
548 block_data_hash: r.get("block_data_hash"),
549 }
550}
551
552impl ColdStorage for SqlColdBackend {
553 async fn get_header(&self, spec: HeaderSpecifier) -> ColdResult<Option<Header>> {
554 let Some(block_num) = self.resolve_header_spec(spec).await? else {
555 return Ok(None);
556 };
557 self.fetch_header_by_number(block_num).await.map_err(ColdStorageError::from)
558 }
559
560 async fn get_headers(&self, specs: Vec<HeaderSpecifier>) -> ColdResult<Vec<Option<Header>>> {
561 let mut results = Vec::with_capacity(specs.len());
562 for spec in specs {
563 let header = self.get_header(spec).await?;
564 results.push(header);
565 }
566 Ok(results)
567 }
568
569 async fn get_transaction(
570 &self,
571 spec: TransactionSpecifier,
572 ) -> ColdResult<Option<Confirmed<TransactionSigned>>> {
573 let Some((block, index)) = self.resolve_tx_spec(spec).await? else {
574 return Ok(None);
575 };
576 let Some(tx) = self.fetch_tx_by_location(block, index).await? else {
577 return Ok(None);
578 };
579 let Some(block_hash) = self.fetch_block_hash(block).await? else {
580 return Ok(None);
581 };
582 let meta = ConfirmationMeta::new(block, block_hash, index);
583 Ok(Some(Confirmed::new(tx, meta)))
584 }
585
586 async fn get_transactions_in_block(
587 &self,
588 block: BlockNumber,
589 ) -> ColdResult<Vec<TransactionSigned>> {
590 let bn = to_i64(block);
591 let rows =
592 sqlx::query("SELECT * FROM transactions WHERE block_number = $1 ORDER BY tx_index")
593 .bind(bn)
594 .fetch_all(&self.pool)
595 .await
596 .map_err(SqlColdError::from)?;
597
598 rows.into_iter()
599 .map(|r| row_to_tx_row(&r).into_tx().map_err(ColdStorageError::from))
600 .collect()
601 }
602
603 async fn get_transaction_count(&self, block: BlockNumber) -> ColdResult<u64> {
604 let bn = to_i64(block);
605 let row = sqlx::query("SELECT COUNT(*) as cnt FROM transactions WHERE block_number = $1")
606 .bind(bn)
607 .fetch_one(&self.pool)
608 .await
609 .map_err(SqlColdError::from)?;
610
611 Ok(from_i64(row.get::<i64, _>("cnt")))
612 }
613
614 async fn get_receipt(&self, spec: ReceiptSpecifier) -> ColdResult<Option<Confirmed<Receipt>>> {
615 let Some((block, index)) = self.resolve_receipt_spec(spec).await? else {
616 return Ok(None);
617 };
618 let Some(receipt) = self.fetch_receipt_by_location(block, index).await? else {
619 return Ok(None);
620 };
621 let Some(block_hash) = self.fetch_block_hash(block).await? else {
622 return Ok(None);
623 };
624 let meta = ConfirmationMeta::new(block, block_hash, index);
625 Ok(Some(Confirmed::new(receipt, meta)))
626 }
627
628 async fn get_receipts_in_block(&self, block: BlockNumber) -> ColdResult<Vec<Receipt>> {
629 let bn = to_i64(block);
630
631 let receipt_rows =
632 sqlx::query("SELECT * FROM receipts WHERE block_number = $1 ORDER BY tx_index")
633 .bind(bn)
634 .fetch_all(&self.pool)
635 .await
636 .map_err(SqlColdError::from)?;
637
638 let mut receipts = Vec::with_capacity(receipt_rows.len());
639 for rr in receipt_rows {
640 let tx_idx: i64 = rr.get("tx_index");
641 let receipt = ReceiptRow {
642 block_number: rr.get("block_number"),
643 tx_index: tx_idx,
644 tx_type: rr.get::<i32, _>("tx_type") as i16,
645 success: rr.get::<i32, _>("success") != 0,
646 cumulative_gas_used: rr.get("cumulative_gas_used"),
647 };
648
649 let log_rows = sqlx::query(
650 "SELECT * FROM logs WHERE block_number = $1 AND tx_index = $2 ORDER BY log_index",
651 )
652 .bind(bn)
653 .bind(tx_idx)
654 .fetch_all(&self.pool)
655 .await
656 .map_err(SqlColdError::from)?;
657
658 let logs = log_rows
659 .into_iter()
660 .map(|r| LogRow {
661 block_number: r.get("block_number"),
662 tx_index: r.get("tx_index"),
663 log_index: r.get("log_index"),
664 address: r.get("address"),
665 topic0: r.get("topic0"),
666 topic1: r.get("topic1"),
667 topic2: r.get("topic2"),
668 topic3: r.get("topic3"),
669 data: r.get("data"),
670 })
671 .collect();
672
673 receipts.push(receipt_from_rows(receipt, logs)?);
674 }
675
676 Ok(receipts)
677 }
678
679 async fn get_signet_events(
680 &self,
681 spec: SignetEventsSpecifier,
682 ) -> ColdResult<Vec<DbSignetEvent>> {
683 let rows = match spec {
684 SignetEventsSpecifier::Block(block) => {
685 let bn = to_i64(block);
686 sqlx::query(
687 "SELECT * FROM signet_events WHERE block_number = $1 ORDER BY event_index",
688 )
689 .bind(bn)
690 .fetch_all(&self.pool)
691 .await
692 .map_err(SqlColdError::from)?
693 }
694 SignetEventsSpecifier::BlockRange { start, end } => {
695 let s = to_i64(start);
696 let e = to_i64(end);
697 sqlx::query(
698 "SELECT * FROM signet_events WHERE block_number >= $1 AND block_number <= $2
699 ORDER BY block_number, event_index",
700 )
701 .bind(s)
702 .bind(e)
703 .fetch_all(&self.pool)
704 .await
705 .map_err(SqlColdError::from)?
706 }
707 };
708
709 rows.into_iter()
710 .map(|r| row_to_signet_event_row(&r).into_event().map_err(ColdStorageError::from))
711 .collect()
712 }
713
714 async fn get_zenith_header(
715 &self,
716 spec: ZenithHeaderSpecifier,
717 ) -> ColdResult<Option<DbZenithHeader>> {
718 let block = match spec {
719 ZenithHeaderSpecifier::Number(n) => n,
720 ZenithHeaderSpecifier::Range { start, .. } => start,
721 };
722 let bn = to_i64(block);
723 let row = sqlx::query("SELECT * FROM zenith_headers WHERE block_number = $1")
724 .bind(bn)
725 .fetch_optional(&self.pool)
726 .await
727 .map_err(SqlColdError::from)?;
728
729 row.map(|r| row_to_zenith_header_row(&r).into_zenith().map_err(ColdStorageError::from))
730 .transpose()
731 }
732
733 async fn get_zenith_headers(
734 &self,
735 spec: ZenithHeaderSpecifier,
736 ) -> ColdResult<Vec<DbZenithHeader>> {
737 let rows = match spec {
738 ZenithHeaderSpecifier::Number(n) => {
739 let bn = to_i64(n);
740 sqlx::query("SELECT * FROM zenith_headers WHERE block_number = $1")
741 .bind(bn)
742 .fetch_all(&self.pool)
743 .await
744 .map_err(SqlColdError::from)?
745 }
746 ZenithHeaderSpecifier::Range { start, end } => {
747 let s = to_i64(start);
748 let e = to_i64(end);
749 sqlx::query(
750 "SELECT * FROM zenith_headers WHERE block_number >= $1 AND block_number <= $2
751 ORDER BY block_number",
752 )
753 .bind(s)
754 .bind(e)
755 .fetch_all(&self.pool)
756 .await
757 .map_err(SqlColdError::from)?
758 }
759 };
760
761 rows.into_iter()
762 .map(|r| row_to_zenith_header_row(&r).into_zenith().map_err(ColdStorageError::from))
763 .collect()
764 }
765
766 async fn get_latest_block(&self) -> ColdResult<Option<BlockNumber>> {
767 let row = sqlx::query("SELECT block_number FROM metadata WHERE key = $1")
768 .bind("latest_block")
769 .fetch_optional(&self.pool)
770 .await
771 .map_err(SqlColdError::from)?;
772 Ok(row.map(|r| from_i64(r.get::<i64, _>("block_number"))))
773 }
774
775 async fn append_block(&self, data: BlockData) -> ColdResult<()> {
776 self.insert_block(data).await.map_err(ColdStorageError::from)
777 }
778
779 async fn append_blocks(&self, data: Vec<BlockData>) -> ColdResult<()> {
780 for block_data in data {
781 self.insert_block(block_data).await?;
782 }
783 Ok(())
784 }
785
786 async fn truncate_above(&self, block: BlockNumber) -> ColdResult<()> {
787 let bn = to_i64(block);
788 let mut tx = self.pool.begin().await.map_err(SqlColdError::from)?;
789
790 sqlx::query("DELETE FROM logs WHERE block_number > $1")
791 .bind(bn)
792 .execute(&mut *tx)
793 .await
794 .map_err(SqlColdError::from)?;
795 sqlx::query("DELETE FROM transactions WHERE block_number > $1")
796 .bind(bn)
797 .execute(&mut *tx)
798 .await
799 .map_err(SqlColdError::from)?;
800 sqlx::query("DELETE FROM receipts WHERE block_number > $1")
801 .bind(bn)
802 .execute(&mut *tx)
803 .await
804 .map_err(SqlColdError::from)?;
805 sqlx::query("DELETE FROM signet_events WHERE block_number > $1")
806 .bind(bn)
807 .execute(&mut *tx)
808 .await
809 .map_err(SqlColdError::from)?;
810 sqlx::query("DELETE FROM zenith_headers WHERE block_number > $1")
811 .bind(bn)
812 .execute(&mut *tx)
813 .await
814 .map_err(SqlColdError::from)?;
815 sqlx::query("DELETE FROM headers WHERE block_number > $1")
816 .bind(bn)
817 .execute(&mut *tx)
818 .await
819 .map_err(SqlColdError::from)?;
820
821 let new_latest: Option<i64> =
823 sqlx::query("SELECT MAX(block_number) as max_bn FROM headers")
824 .fetch_one(&mut *tx)
825 .await
826 .map_err(SqlColdError::from)?
827 .get("max_bn");
828
829 match new_latest {
830 Some(latest) => {
831 sqlx::query(
832 "INSERT INTO metadata (key, block_number) VALUES ('latest_block', $1)
833 ON CONFLICT(key) DO UPDATE SET block_number = $1",
834 )
835 .bind(latest)
836 .execute(&mut *tx)
837 .await
838 .map_err(SqlColdError::from)?;
839 }
840 None => {
841 sqlx::query("DELETE FROM metadata WHERE key = 'latest_block'")
842 .execute(&mut *tx)
843 .await
844 .map_err(SqlColdError::from)?;
845 }
846 }
847
848 tx.commit().await.map_err(SqlColdError::from)?;
849 Ok(())
850 }
851}
852
853#[cfg(all(test, feature = "test-utils"))]
854mod tests {
855 use super::*;
856 use signet_cold::conformance::conformance;
857
858 #[tokio::test]
859 async fn sqlite_conformance() {
860 let backend = SqlColdBackend::connect("sqlite::memory:").await.unwrap();
861 conformance(&backend).await.unwrap();
862 }
863
864 #[tokio::test]
865 async fn pg_conformance() {
866 let Ok(url) = std::env::var("DATABASE_URL") else {
867 eprintln!("skipping pg conformance: DATABASE_URL not set");
868 return;
869 };
870 let backend = SqlColdBackend::connect(&url).await.unwrap();
871 conformance(&backend).await.unwrap();
872 }
873}