Skip to main content

signet_cold_sql/
backend.rs

1//! Unified SQL backend for cold storage.
2//!
3//! Supports both PostgreSQL and SQLite via [`sqlx::Any`]. The backend
4//! auto-detects the database type at construction time and runs the
5//! appropriate migration.
6
7use crate::SqlColdError;
8use crate::convert::{
9    HeaderRow, LogRow, ReceiptRow, SignetEventRow, TxRow, ZenithHeaderRow, from_i64,
10    receipt_from_rows, to_i64,
11};
12use alloy::{consensus::Header, primitives::BlockNumber};
13use signet_cold::{
14    BlockData, ColdResult, ColdStorage, ColdStorageError, Confirmed, HeaderSpecifier,
15    ReceiptSpecifier, SignetEventsSpecifier, TransactionSpecifier, ZenithHeaderSpecifier,
16};
17use signet_storage_types::{
18    ConfirmationMeta, DbSignetEvent, DbZenithHeader, Receipt, TransactionSigned,
19};
20use sqlx::{AnyPool, Row};
21
/// SQL-based cold storage backend.
///
/// Uses [`sqlx::Any`] for database-agnostic access, supporting both
/// PostgreSQL and SQLite through a single implementation. The backend
/// is determined by the connection URL at construction time.
///
/// Every query issued by this type goes through the wrapped [`AnyPool`];
/// the derived `Clone` clones that pool handle.
///
/// # Example
///
/// ```no_run
/// # async fn example() {
/// use signet_cold_sql::SqlColdBackend;
///
/// // SQLite (in-memory)
/// let backend = SqlColdBackend::connect("sqlite::memory:").await.unwrap();
///
/// // PostgreSQL
/// let backend = SqlColdBackend::connect("postgres://localhost/signet").await.unwrap();
/// # }
/// ```
#[derive(Debug, Clone)]
pub struct SqlColdBackend {
    /// Connection pool used for all reads, writes and transactions.
    pool: AnyPool,
}
45
46impl SqlColdBackend {
47    /// Create a new SQL cold storage backend from an existing [`AnyPool`].
48    ///
49    /// Auto-detects the database backend and creates all tables if they
50    /// do not already exist. Callers must ensure
51    /// [`sqlx::any::install_default_drivers`] has been called before
52    /// constructing the pool.
53    pub async fn new(pool: AnyPool) -> Result<Self, SqlColdError> {
54        // Detect backend from a pooled connection.
55        let conn = pool.acquire().await?;
56        let backend = conn.backend_name().to_owned();
57        drop(conn);
58
59        let migration = match backend.as_str() {
60            "PostgreSQL" => include_str!("../migrations/001_initial_pg.sql"),
61            "SQLite" => include_str!("../migrations/001_initial.sql"),
62            other => {
63                return Err(SqlColdError::Convert(format!(
64                    "unsupported database backend: {other}"
65                )));
66            }
67        };
68        // Execute via pool to ensure the migration uses the same
69        // connection that subsequent queries will use.
70        sqlx::raw_sql(migration).execute(&pool).await?;
71        Ok(Self { pool })
72    }
73
74    /// Connect to a database URL and create the backend.
75    ///
76    /// Installs the default sqlx drivers on the first call. The database
77    /// type is inferred from the URL scheme (`sqlite:` or `postgres:`).
78    ///
79    /// For SQLite in-memory databases (`sqlite::memory:`), the pool is
80    /// limited to one connection to ensure all operations share the same
81    /// database.
82    pub async fn connect(url: &str) -> Result<Self, SqlColdError> {
83        sqlx::any::install_default_drivers();
84        let pool: AnyPool = sqlx::pool::PoolOptions::new().max_connections(1).connect(url).await?;
85        Self::new(pool).await
86    }
87
88    // ========================================================================
89    // Specifier resolution
90    // ========================================================================
91
92    async fn resolve_header_spec(
93        &self,
94        spec: HeaderSpecifier,
95    ) -> Result<Option<BlockNumber>, SqlColdError> {
96        match spec {
97            HeaderSpecifier::Number(n) => Ok(Some(n)),
98            HeaderSpecifier::Hash(hash) => {
99                let hash_bytes = hash.as_slice();
100                let row = sqlx::query("SELECT block_number FROM headers WHERE block_hash = $1")
101                    .bind(hash_bytes)
102                    .fetch_optional(&self.pool)
103                    .await?;
104                Ok(row.map(|r| from_i64(r.get::<i64, _>("block_number"))))
105            }
106        }
107    }
108
109    async fn resolve_tx_spec(
110        &self,
111        spec: TransactionSpecifier,
112    ) -> Result<Option<(BlockNumber, u64)>, SqlColdError> {
113        match spec {
114            TransactionSpecifier::Hash(hash) => {
115                let hash_bytes = hash.as_slice();
116                let row = sqlx::query(
117                    "SELECT block_number, tx_index FROM transactions WHERE tx_hash = $1",
118                )
119                .bind(hash_bytes)
120                .fetch_optional(&self.pool)
121                .await?;
122                Ok(row.map(|r| {
123                    (
124                        from_i64(r.get::<i64, _>("block_number")),
125                        from_i64(r.get::<i64, _>("tx_index")),
126                    )
127                }))
128            }
129            TransactionSpecifier::BlockAndIndex { block, index } => Ok(Some((block, index))),
130            TransactionSpecifier::BlockHashAndIndex { block_hash, index } => {
131                let block = self.resolve_header_spec(HeaderSpecifier::Hash(block_hash)).await?;
132                Ok(block.map(|b| (b, index)))
133            }
134        }
135    }
136
137    async fn resolve_receipt_spec(
138        &self,
139        spec: ReceiptSpecifier,
140    ) -> Result<Option<(BlockNumber, u64)>, SqlColdError> {
141        match spec {
142            ReceiptSpecifier::TxHash(hash) => {
143                let hash_bytes = hash.as_slice();
144                let row = sqlx::query(
145                    "SELECT block_number, tx_index FROM transactions WHERE tx_hash = $1",
146                )
147                .bind(hash_bytes)
148                .fetch_optional(&self.pool)
149                .await?;
150                Ok(row.map(|r| {
151                    (
152                        from_i64(r.get::<i64, _>("block_number")),
153                        from_i64(r.get::<i64, _>("tx_index")),
154                    )
155                }))
156            }
157            ReceiptSpecifier::BlockAndIndex { block, index } => Ok(Some((block, index))),
158        }
159    }
160
161    // ========================================================================
162    // Read helpers
163    // ========================================================================
164
165    async fn fetch_header_by_number(
166        &self,
167        block_num: BlockNumber,
168    ) -> Result<Option<Header>, SqlColdError> {
169        let bn = to_i64(block_num);
170        let row = sqlx::query("SELECT * FROM headers WHERE block_number = $1")
171            .bind(bn)
172            .fetch_optional(&self.pool)
173            .await?;
174
175        row.map(|r| {
176            HeaderRow {
177                block_number: r.get("block_number"),
178                block_hash: r.get("block_hash"),
179                parent_hash: r.get("parent_hash"),
180                ommers_hash: r.get("ommers_hash"),
181                beneficiary: r.get("beneficiary"),
182                state_root: r.get("state_root"),
183                transactions_root: r.get("transactions_root"),
184                receipts_root: r.get("receipts_root"),
185                logs_bloom: r.get("logs_bloom"),
186                difficulty: r.get("difficulty"),
187                gas_limit: r.get("gas_limit"),
188                gas_used: r.get("gas_used"),
189                timestamp: r.get("timestamp"),
190                extra_data: r.get("extra_data"),
191                mix_hash: r.get("mix_hash"),
192                nonce: r.get("nonce"),
193                base_fee_per_gas: r.get("base_fee_per_gas"),
194                withdrawals_root: r.get("withdrawals_root"),
195                blob_gas_used: r.get("blob_gas_used"),
196                excess_blob_gas: r.get("excess_blob_gas"),
197                parent_beacon_block_root: r.get("parent_beacon_block_root"),
198                requests_hash: r.get("requests_hash"),
199            }
200            .into_header()
201        })
202        .transpose()
203    }
204
205    async fn fetch_block_hash(
206        &self,
207        block_num: BlockNumber,
208    ) -> Result<Option<alloy::primitives::B256>, SqlColdError> {
209        let bn = to_i64(block_num);
210        let row = sqlx::query("SELECT block_hash FROM headers WHERE block_number = $1")
211            .bind(bn)
212            .fetch_optional(&self.pool)
213            .await?;
214        Ok(row.map(|r| {
215            let bytes: Vec<u8> = r.get("block_hash");
216            alloy::primitives::B256::from_slice(&bytes)
217        }))
218    }
219
220    async fn fetch_tx_by_location(
221        &self,
222        block: BlockNumber,
223        index: u64,
224    ) -> Result<Option<TransactionSigned>, SqlColdError> {
225        let bn = to_i64(block);
226        let idx = to_i64(index);
227        let row =
228            sqlx::query("SELECT * FROM transactions WHERE block_number = $1 AND tx_index = $2")
229                .bind(bn)
230                .bind(idx)
231                .fetch_optional(&self.pool)
232                .await?;
233
234        row.map(|r| row_to_tx_row(&r).into_tx()).transpose()
235    }
236
237    async fn fetch_receipt_by_location(
238        &self,
239        block: BlockNumber,
240        index: u64,
241    ) -> Result<Option<Receipt>, SqlColdError> {
242        let bn = to_i64(block);
243        let idx = to_i64(index);
244
245        let receipt_row =
246            sqlx::query("SELECT * FROM receipts WHERE block_number = $1 AND tx_index = $2")
247                .bind(bn)
248                .bind(idx)
249                .fetch_optional(&self.pool)
250                .await?;
251
252        let Some(rr) = receipt_row else {
253            return Ok(None);
254        };
255
256        let receipt = ReceiptRow {
257            block_number: rr.get("block_number"),
258            tx_index: rr.get("tx_index"),
259            tx_type: rr.get::<i32, _>("tx_type") as i16,
260            success: rr.get::<i32, _>("success") != 0,
261            cumulative_gas_used: rr.get("cumulative_gas_used"),
262        };
263
264        let log_rows = sqlx::query(
265            "SELECT * FROM logs WHERE block_number = $1 AND tx_index = $2 ORDER BY log_index",
266        )
267        .bind(bn)
268        .bind(idx)
269        .fetch_all(&self.pool)
270        .await?;
271
272        let logs = log_rows
273            .into_iter()
274            .map(|r| LogRow {
275                block_number: r.get("block_number"),
276                tx_index: r.get("tx_index"),
277                log_index: r.get("log_index"),
278                address: r.get("address"),
279                topic0: r.get("topic0"),
280                topic1: r.get("topic1"),
281                topic2: r.get("topic2"),
282                topic3: r.get("topic3"),
283                data: r.get("data"),
284            })
285            .collect();
286
287        receipt_from_rows(receipt, logs).map(Some)
288    }
289
290    // ========================================================================
291    // Write helpers
292    // ========================================================================
293
    /// Persist one block and everything attached to it.
    ///
    /// All statements run on a single database transaction opened at the
    /// top and committed at the very end, in this order: header,
    /// transactions, receipts (each followed by its logs), signet events,
    /// the optional zenith header, and finally the `latest_block` /
    /// `earliest_block` rows of the `metadata` table.
    ///
    /// # Errors
    ///
    /// Any failing statement or row conversion returns early through `?`,
    /// before `commit` is reached.
    // NOTE(review): an early return drops `tx` uncommitted; presumably sqlx
    // rolls the transaction back on drop — confirm against the sqlx docs.
    async fn insert_block(&self, data: BlockData) -> Result<(), SqlColdError> {
        let mut tx = self.pool.begin().await?;
        let block = data.block_number();
        let bn = to_i64(block);

        // Insert header
        // The bind order below must match the column list one-to-one.
        let hr = HeaderRow::from_header(&data.header);
        sqlx::query(
            "INSERT INTO headers (
                block_number, block_hash, parent_hash, ommers_hash, beneficiary,
                state_root, transactions_root, receipts_root, logs_bloom, difficulty,
                gas_limit, gas_used, timestamp, extra_data, mix_hash, nonce,
                base_fee_per_gas, withdrawals_root, blob_gas_used, excess_blob_gas,
                parent_beacon_block_root, requests_hash
            ) VALUES (
                $1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
                $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22
            )",
        )
        .bind(hr.block_number)
        .bind(&hr.block_hash)
        .bind(&hr.parent_hash)
        .bind(&hr.ommers_hash)
        .bind(&hr.beneficiary)
        .bind(&hr.state_root)
        .bind(&hr.transactions_root)
        .bind(&hr.receipts_root)
        .bind(&hr.logs_bloom)
        .bind(&hr.difficulty)
        .bind(hr.gas_limit)
        .bind(hr.gas_used)
        .bind(hr.timestamp)
        .bind(&hr.extra_data)
        .bind(&hr.mix_hash)
        .bind(&hr.nonce)
        .bind(hr.base_fee_per_gas)
        .bind(&hr.withdrawals_root)
        .bind(hr.blob_gas_used)
        .bind(hr.excess_blob_gas)
        .bind(&hr.parent_beacon_block_root)
        .bind(&hr.requests_hash)
        .execute(&mut *tx)
        .await?;

        // Insert transactions
        // `tx_index` is the transaction's position in `data.transactions`.
        for (idx, tx_signed) in data.transactions.iter().enumerate() {
            let tr = TxRow::from_tx(tx_signed, bn, to_i64(idx as u64))?;
            sqlx::query(
                "INSERT INTO transactions (
                    block_number, tx_index, tx_hash, tx_type,
                    sig_y_parity, sig_r, sig_s,
                    chain_id, nonce, gas_limit, to_address, value, input,
                    gas_price, max_fee_per_gas, max_priority_fee_per_gas,
                    max_fee_per_blob_gas, blob_versioned_hashes,
                    access_list, authorization_list
                ) VALUES (
                    $1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
                    $11, $12, $13, $14, $15, $16, $17, $18, $19, $20
                )",
            )
            .bind(tr.block_number)
            .bind(tr.tx_index)
            .bind(&tr.tx_hash)
            // Small integers and booleans are bound as i32; the read paths
            // decode them as i32 and narrow them back.
            .bind(tr.tx_type as i32)
            .bind(tr.sig_y_parity as i32)
            .bind(&tr.sig_r)
            .bind(&tr.sig_s)
            .bind(tr.chain_id)
            .bind(tr.nonce)
            .bind(tr.gas_limit)
            .bind(&tr.to_address)
            .bind(&tr.value)
            .bind(&tr.input)
            .bind(&tr.gas_price)
            .bind(&tr.max_fee_per_gas)
            .bind(&tr.max_priority_fee_per_gas)
            .bind(&tr.max_fee_per_blob_gas)
            .bind(&tr.blob_versioned_hashes)
            .bind(&tr.access_list)
            .bind(&tr.authorization_list)
            .execute(&mut *tx)
            .await?;
        }

        // Insert receipts and logs
        // A receipt's `tx_index` is its position in `data.receipts`; each
        // log's `log_index` is its position within that receipt.
        for (idx, receipt) in data.receipts.iter().enumerate() {
            let rr = ReceiptRow::from_receipt(receipt, bn, to_i64(idx as u64));
            sqlx::query(
                "INSERT INTO receipts (block_number, tx_index, tx_type, success, cumulative_gas_used)
                 VALUES ($1, $2, $3, $4, $5)",
            )
            .bind(rr.block_number)
            .bind(rr.tx_index)
            .bind(rr.tx_type as i32)
            .bind(rr.success as i32)
            .bind(rr.cumulative_gas_used)
            .execute(&mut *tx)
            .await?;

            for (log_idx, log) in receipt.inner.logs.iter().enumerate() {
                let lr = LogRow::from_log(log, bn, to_i64(idx as u64), to_i64(log_idx as u64));
                sqlx::query(
                    "INSERT INTO logs (block_number, tx_index, log_index, address, topic0, topic1, topic2, topic3, data)
                     VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
                )
                .bind(lr.block_number)
                .bind(lr.tx_index)
                .bind(lr.log_index)
                .bind(&lr.address)
                .bind(&lr.topic0)
                .bind(&lr.topic1)
                .bind(&lr.topic2)
                .bind(&lr.topic3)
                .bind(&lr.data)
                .execute(&mut *tx)
                .await?;
            }
        }

        // Insert signet events
        // `event_index` is the event's position in `data.signet_events`.
        for (idx, event) in data.signet_events.iter().enumerate() {
            let er = SignetEventRow::from_event(event, bn, to_i64(idx as u64));
            sqlx::query(
                "INSERT INTO signet_events (
                    block_number, event_index, event_type, order_index,
                    rollup_chain_id, sender, to_address, value, gas,
                    max_fee_per_gas, data, rollup_recipient, amount, token
                ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)",
            )
            .bind(er.block_number)
            .bind(er.event_index)
            .bind(er.event_type as i32)
            .bind(er.order_index)
            .bind(&er.rollup_chain_id)
            .bind(&er.sender)
            .bind(&er.to_address)
            .bind(&er.value)
            .bind(&er.gas)
            .bind(&er.max_fee_per_gas)
            .bind(&er.data)
            .bind(&er.rollup_recipient)
            .bind(&er.amount)
            .bind(&er.token)
            .execute(&mut *tx)
            .await?;
        }

        // Insert zenith header
        // Written only when the block carries one.
        if let Some(zh) = &data.zenith_header {
            let zr = ZenithHeaderRow::from_zenith(zh, bn);
            sqlx::query(
                "INSERT INTO zenith_headers (
                    block_number, host_block_number, rollup_chain_id,
                    gas_limit, reward_address, block_data_hash
                ) VALUES ($1, $2, $3, $4, $5, $6)",
            )
            .bind(zr.block_number)
            .bind(&zr.host_block_number)
            .bind(&zr.rollup_chain_id)
            .bind(&zr.gas_limit)
            .bind(&zr.reward_address)
            .bind(&zr.block_data_hash)
            .execute(&mut *tx)
            .await?;
        }

        // Update metadata
        // `latest_block` becomes max(stored, this block), or this block when
        // nothing is stored yet. The read and the upsert are two statements
        // on the same transaction.
        let current_latest: Option<i64> =
            sqlx::query("SELECT block_number FROM metadata WHERE key = 'latest_block'")
                .fetch_optional(&mut *tx)
                .await?
                .map(|r| r.get("block_number"));

        let new_latest = current_latest.map_or(bn, |prev| prev.max(bn));
        sqlx::query(
            "INSERT INTO metadata (key, block_number) VALUES ('latest_block', $1)
             ON CONFLICT(key) DO UPDATE SET block_number = $1",
        )
        .bind(new_latest)
        .execute(&mut *tx)
        .await?;

        // `earliest_block` is maintained the same way, using min().
        let current_earliest: Option<i64> =
            sqlx::query("SELECT block_number FROM metadata WHERE key = 'earliest_block'")
                .fetch_optional(&mut *tx)
                .await?
                .map(|r| r.get("block_number"));

        let new_earliest = current_earliest.map_or(bn, |prev| prev.min(bn));
        sqlx::query(
            "INSERT INTO metadata (key, block_number) VALUES ('earliest_block', $1)
             ON CONFLICT(key) DO UPDATE SET block_number = $1",
        )
        .bind(new_earliest)
        .execute(&mut *tx)
        .await?;

        tx.commit().await?;
        Ok(())
    }
494}
495
496/// Convert a sqlx row to a TxRow.
497fn row_to_tx_row(r: &sqlx::any::AnyRow) -> TxRow {
498    TxRow {
499        block_number: r.get("block_number"),
500        tx_index: r.get("tx_index"),
501        tx_hash: r.get("tx_hash"),
502        tx_type: r.get::<i32, _>("tx_type") as i16,
503        sig_y_parity: r.get::<i32, _>("sig_y_parity") != 0,
504        sig_r: r.get("sig_r"),
505        sig_s: r.get("sig_s"),
506        chain_id: r.get("chain_id"),
507        nonce: r.get("nonce"),
508        gas_limit: r.get("gas_limit"),
509        to_address: r.get("to_address"),
510        value: r.get("value"),
511        input: r.get("input"),
512        gas_price: r.get("gas_price"),
513        max_fee_per_gas: r.get("max_fee_per_gas"),
514        max_priority_fee_per_gas: r.get("max_priority_fee_per_gas"),
515        max_fee_per_blob_gas: r.get("max_fee_per_blob_gas"),
516        blob_versioned_hashes: r.get("blob_versioned_hashes"),
517        access_list: r.get("access_list"),
518        authorization_list: r.get("authorization_list"),
519    }
520}
521
522fn row_to_signet_event_row(r: &sqlx::any::AnyRow) -> SignetEventRow {
523    SignetEventRow {
524        block_number: r.get("block_number"),
525        event_index: r.get("event_index"),
526        event_type: r.get::<i32, _>("event_type") as i16,
527        order_index: r.get("order_index"),
528        rollup_chain_id: r.get("rollup_chain_id"),
529        sender: r.get("sender"),
530        to_address: r.get("to_address"),
531        value: r.get("value"),
532        gas: r.get("gas"),
533        max_fee_per_gas: r.get("max_fee_per_gas"),
534        data: r.get("data"),
535        rollup_recipient: r.get("rollup_recipient"),
536        amount: r.get("amount"),
537        token: r.get("token"),
538    }
539}
540
/// Convert a sqlx row to a ZenithHeaderRow.
///
/// Each field is read from the column of the same name. `Row::get` panics
/// if a column is missing or cannot be decoded into the field's type.
fn row_to_zenith_header_row(r: &sqlx::any::AnyRow) -> ZenithHeaderRow {
    ZenithHeaderRow {
        block_number: r.get("block_number"),
        host_block_number: r.get("host_block_number"),
        rollup_chain_id: r.get("rollup_chain_id"),
        gas_limit: r.get("gas_limit"),
        reward_address: r.get("reward_address"),
        block_data_hash: r.get("block_data_hash"),
    }
}
551
552impl ColdStorage for SqlColdBackend {
553    async fn get_header(&self, spec: HeaderSpecifier) -> ColdResult<Option<Header>> {
554        let Some(block_num) = self.resolve_header_spec(spec).await? else {
555            return Ok(None);
556        };
557        self.fetch_header_by_number(block_num).await.map_err(ColdStorageError::from)
558    }
559
560    async fn get_headers(&self, specs: Vec<HeaderSpecifier>) -> ColdResult<Vec<Option<Header>>> {
561        let mut results = Vec::with_capacity(specs.len());
562        for spec in specs {
563            let header = self.get_header(spec).await?;
564            results.push(header);
565        }
566        Ok(results)
567    }
568
569    async fn get_transaction(
570        &self,
571        spec: TransactionSpecifier,
572    ) -> ColdResult<Option<Confirmed<TransactionSigned>>> {
573        let Some((block, index)) = self.resolve_tx_spec(spec).await? else {
574            return Ok(None);
575        };
576        let Some(tx) = self.fetch_tx_by_location(block, index).await? else {
577            return Ok(None);
578        };
579        let Some(block_hash) = self.fetch_block_hash(block).await? else {
580            return Ok(None);
581        };
582        let meta = ConfirmationMeta::new(block, block_hash, index);
583        Ok(Some(Confirmed::new(tx, meta)))
584    }
585
586    async fn get_transactions_in_block(
587        &self,
588        block: BlockNumber,
589    ) -> ColdResult<Vec<TransactionSigned>> {
590        let bn = to_i64(block);
591        let rows =
592            sqlx::query("SELECT * FROM transactions WHERE block_number = $1 ORDER BY tx_index")
593                .bind(bn)
594                .fetch_all(&self.pool)
595                .await
596                .map_err(SqlColdError::from)?;
597
598        rows.into_iter()
599            .map(|r| row_to_tx_row(&r).into_tx().map_err(ColdStorageError::from))
600            .collect()
601    }
602
603    async fn get_transaction_count(&self, block: BlockNumber) -> ColdResult<u64> {
604        let bn = to_i64(block);
605        let row = sqlx::query("SELECT COUNT(*) as cnt FROM transactions WHERE block_number = $1")
606            .bind(bn)
607            .fetch_one(&self.pool)
608            .await
609            .map_err(SqlColdError::from)?;
610
611        Ok(from_i64(row.get::<i64, _>("cnt")))
612    }
613
614    async fn get_receipt(&self, spec: ReceiptSpecifier) -> ColdResult<Option<Confirmed<Receipt>>> {
615        let Some((block, index)) = self.resolve_receipt_spec(spec).await? else {
616            return Ok(None);
617        };
618        let Some(receipt) = self.fetch_receipt_by_location(block, index).await? else {
619            return Ok(None);
620        };
621        let Some(block_hash) = self.fetch_block_hash(block).await? else {
622            return Ok(None);
623        };
624        let meta = ConfirmationMeta::new(block, block_hash, index);
625        Ok(Some(Confirmed::new(receipt, meta)))
626    }
627
628    async fn get_receipts_in_block(&self, block: BlockNumber) -> ColdResult<Vec<Receipt>> {
629        let bn = to_i64(block);
630
631        let receipt_rows =
632            sqlx::query("SELECT * FROM receipts WHERE block_number = $1 ORDER BY tx_index")
633                .bind(bn)
634                .fetch_all(&self.pool)
635                .await
636                .map_err(SqlColdError::from)?;
637
638        let mut receipts = Vec::with_capacity(receipt_rows.len());
639        for rr in receipt_rows {
640            let tx_idx: i64 = rr.get("tx_index");
641            let receipt = ReceiptRow {
642                block_number: rr.get("block_number"),
643                tx_index: tx_idx,
644                tx_type: rr.get::<i32, _>("tx_type") as i16,
645                success: rr.get::<i32, _>("success") != 0,
646                cumulative_gas_used: rr.get("cumulative_gas_used"),
647            };
648
649            let log_rows = sqlx::query(
650                "SELECT * FROM logs WHERE block_number = $1 AND tx_index = $2 ORDER BY log_index",
651            )
652            .bind(bn)
653            .bind(tx_idx)
654            .fetch_all(&self.pool)
655            .await
656            .map_err(SqlColdError::from)?;
657
658            let logs = log_rows
659                .into_iter()
660                .map(|r| LogRow {
661                    block_number: r.get("block_number"),
662                    tx_index: r.get("tx_index"),
663                    log_index: r.get("log_index"),
664                    address: r.get("address"),
665                    topic0: r.get("topic0"),
666                    topic1: r.get("topic1"),
667                    topic2: r.get("topic2"),
668                    topic3: r.get("topic3"),
669                    data: r.get("data"),
670                })
671                .collect();
672
673            receipts.push(receipt_from_rows(receipt, logs)?);
674        }
675
676        Ok(receipts)
677    }
678
679    async fn get_signet_events(
680        &self,
681        spec: SignetEventsSpecifier,
682    ) -> ColdResult<Vec<DbSignetEvent>> {
683        let rows = match spec {
684            SignetEventsSpecifier::Block(block) => {
685                let bn = to_i64(block);
686                sqlx::query(
687                    "SELECT * FROM signet_events WHERE block_number = $1 ORDER BY event_index",
688                )
689                .bind(bn)
690                .fetch_all(&self.pool)
691                .await
692                .map_err(SqlColdError::from)?
693            }
694            SignetEventsSpecifier::BlockRange { start, end } => {
695                let s = to_i64(start);
696                let e = to_i64(end);
697                sqlx::query(
698                    "SELECT * FROM signet_events WHERE block_number >= $1 AND block_number <= $2
699                     ORDER BY block_number, event_index",
700                )
701                .bind(s)
702                .bind(e)
703                .fetch_all(&self.pool)
704                .await
705                .map_err(SqlColdError::from)?
706            }
707        };
708
709        rows.into_iter()
710            .map(|r| row_to_signet_event_row(&r).into_event().map_err(ColdStorageError::from))
711            .collect()
712    }
713
714    async fn get_zenith_header(
715        &self,
716        spec: ZenithHeaderSpecifier,
717    ) -> ColdResult<Option<DbZenithHeader>> {
718        let block = match spec {
719            ZenithHeaderSpecifier::Number(n) => n,
720            ZenithHeaderSpecifier::Range { start, .. } => start,
721        };
722        let bn = to_i64(block);
723        let row = sqlx::query("SELECT * FROM zenith_headers WHERE block_number = $1")
724            .bind(bn)
725            .fetch_optional(&self.pool)
726            .await
727            .map_err(SqlColdError::from)?;
728
729        row.map(|r| row_to_zenith_header_row(&r).into_zenith().map_err(ColdStorageError::from))
730            .transpose()
731    }
732
733    async fn get_zenith_headers(
734        &self,
735        spec: ZenithHeaderSpecifier,
736    ) -> ColdResult<Vec<DbZenithHeader>> {
737        let rows = match spec {
738            ZenithHeaderSpecifier::Number(n) => {
739                let bn = to_i64(n);
740                sqlx::query("SELECT * FROM zenith_headers WHERE block_number = $1")
741                    .bind(bn)
742                    .fetch_all(&self.pool)
743                    .await
744                    .map_err(SqlColdError::from)?
745            }
746            ZenithHeaderSpecifier::Range { start, end } => {
747                let s = to_i64(start);
748                let e = to_i64(end);
749                sqlx::query(
750                    "SELECT * FROM zenith_headers WHERE block_number >= $1 AND block_number <= $2
751                     ORDER BY block_number",
752                )
753                .bind(s)
754                .bind(e)
755                .fetch_all(&self.pool)
756                .await
757                .map_err(SqlColdError::from)?
758            }
759        };
760
761        rows.into_iter()
762            .map(|r| row_to_zenith_header_row(&r).into_zenith().map_err(ColdStorageError::from))
763            .collect()
764    }
765
766    async fn get_latest_block(&self) -> ColdResult<Option<BlockNumber>> {
767        let row = sqlx::query("SELECT block_number FROM metadata WHERE key = $1")
768            .bind("latest_block")
769            .fetch_optional(&self.pool)
770            .await
771            .map_err(SqlColdError::from)?;
772        Ok(row.map(|r| from_i64(r.get::<i64, _>("block_number"))))
773    }
774
775    async fn append_block(&self, data: BlockData) -> ColdResult<()> {
776        self.insert_block(data).await.map_err(ColdStorageError::from)
777    }
778
779    async fn append_blocks(&self, data: Vec<BlockData>) -> ColdResult<()> {
780        for block_data in data {
781            self.insert_block(block_data).await?;
782        }
783        Ok(())
784    }
785
786    async fn truncate_above(&self, block: BlockNumber) -> ColdResult<()> {
787        let bn = to_i64(block);
788        let mut tx = self.pool.begin().await.map_err(SqlColdError::from)?;
789
790        sqlx::query("DELETE FROM logs WHERE block_number > $1")
791            .bind(bn)
792            .execute(&mut *tx)
793            .await
794            .map_err(SqlColdError::from)?;
795        sqlx::query("DELETE FROM transactions WHERE block_number > $1")
796            .bind(bn)
797            .execute(&mut *tx)
798            .await
799            .map_err(SqlColdError::from)?;
800        sqlx::query("DELETE FROM receipts WHERE block_number > $1")
801            .bind(bn)
802            .execute(&mut *tx)
803            .await
804            .map_err(SqlColdError::from)?;
805        sqlx::query("DELETE FROM signet_events WHERE block_number > $1")
806            .bind(bn)
807            .execute(&mut *tx)
808            .await
809            .map_err(SqlColdError::from)?;
810        sqlx::query("DELETE FROM zenith_headers WHERE block_number > $1")
811            .bind(bn)
812            .execute(&mut *tx)
813            .await
814            .map_err(SqlColdError::from)?;
815        sqlx::query("DELETE FROM headers WHERE block_number > $1")
816            .bind(bn)
817            .execute(&mut *tx)
818            .await
819            .map_err(SqlColdError::from)?;
820
821        // Update latest block metadata
822        let new_latest: Option<i64> =
823            sqlx::query("SELECT MAX(block_number) as max_bn FROM headers")
824                .fetch_one(&mut *tx)
825                .await
826                .map_err(SqlColdError::from)?
827                .get("max_bn");
828
829        match new_latest {
830            Some(latest) => {
831                sqlx::query(
832                    "INSERT INTO metadata (key, block_number) VALUES ('latest_block', $1)
833                     ON CONFLICT(key) DO UPDATE SET block_number = $1",
834                )
835                .bind(latest)
836                .execute(&mut *tx)
837                .await
838                .map_err(SqlColdError::from)?;
839            }
840            None => {
841                sqlx::query("DELETE FROM metadata WHERE key = 'latest_block'")
842                    .execute(&mut *tx)
843                    .await
844                    .map_err(SqlColdError::from)?;
845            }
846        }
847
848        tx.commit().await.map_err(SqlColdError::from)?;
849        Ok(())
850    }
851}
852
#[cfg(all(test, feature = "test-utils"))]
mod tests {
    use super::*;
    use signet_cold::conformance::conformance;

    /// Runs the shared conformance suite against an in-memory SQLite database.
    #[tokio::test]
    async fn sqlite_conformance() {
        let backend = SqlColdBackend::connect("sqlite::memory:").await.unwrap();
        conformance(&backend).await.unwrap();
    }

    /// Runs the shared conformance suite against the PostgreSQL instance
    /// named by `DATABASE_URL`; skipped when that variable is not set.
    #[tokio::test]
    async fn pg_conformance() {
        match std::env::var("DATABASE_URL") {
            Ok(url) => {
                let backend = SqlColdBackend::connect(&url).await.unwrap();
                conformance(&backend).await.unwrap();
            }
            Err(_) => eprintln!("skipping pg conformance: DATABASE_URL not set"),
        }
    }
}
873}