Skip to main content

signet_cold_sql/
backend.rs

1//! Unified SQL backend for cold storage.
2//!
3//! Supports both PostgreSQL and SQLite via [`sqlx::Any`]. The backend
4//! auto-detects the database type at construction time and runs the
5//! appropriate migration.
6
7use crate::SqlColdError;
8use crate::convert::{
9    HeaderRow, LogRow, ReceiptRow, SignetEventRow, TxRow, ZenithHeaderRow, from_i64,
10    receipt_from_rows, to_i64,
11};
12use alloy::{
13    consensus::transaction::Recovered,
14    primitives::{BlockNumber, Sealable},
15};
16use signet_cold::{
17    BlockData, ColdReceipt, ColdResult, ColdStorage, ColdStorageError, Confirmed, Filter,
18    HeaderSpecifier, ReceiptSpecifier, RpcLog, SignetEventsSpecifier, TransactionSpecifier,
19    ZenithHeaderSpecifier,
20};
21use signet_storage_types::{
22    ConfirmationMeta, DbSignetEvent, DbZenithHeader, IndexedReceipt, RecoveredTx, SealedHeader,
23    TransactionSigned,
24};
25use sqlx::{AnyPool, Row};
26
27/// SQL-based cold storage backend.
28///
29/// Uses [`sqlx::Any`] for database-agnostic access, supporting both
30/// PostgreSQL and SQLite through a single implementation. The backend
31/// is determined by the connection URL at construction time.
32///
33/// # Example
34///
35/// ```no_run
36/// # async fn example() {
37/// use signet_cold_sql::SqlColdBackend;
38///
39/// // SQLite (in-memory)
40/// let backend = SqlColdBackend::connect("sqlite::memory:").await.unwrap();
41///
42/// // PostgreSQL
43/// let backend = SqlColdBackend::connect("postgres://localhost/signet").await.unwrap();
44/// # }
45/// ```
46#[derive(Debug, Clone)]
47pub struct SqlColdBackend {
48    pool: AnyPool,
49}
50
51impl SqlColdBackend {
52    /// Create a new SQL cold storage backend from an existing [`AnyPool`].
53    ///
54    /// Auto-detects the database backend and creates all tables if they
55    /// do not already exist. Callers must ensure
56    /// [`sqlx::any::install_default_drivers`] has been called before
57    /// constructing the pool.
58    pub async fn new(pool: AnyPool) -> Result<Self, SqlColdError> {
59        // Detect backend from a pooled connection.
60        let conn = pool.acquire().await?;
61        let backend = conn.backend_name().to_owned();
62        drop(conn);
63
64        let migration = match backend.as_str() {
65            "PostgreSQL" => include_str!("../migrations/001_initial_pg.sql"),
66            "SQLite" => include_str!("../migrations/001_initial.sql"),
67            other => {
68                return Err(SqlColdError::Convert(format!(
69                    "unsupported database backend: {other}"
70                )));
71            }
72        };
73        // Execute via pool to ensure the migration uses the same
74        // connection that subsequent queries will use.
75        sqlx::raw_sql(migration).execute(&pool).await?;
76        Ok(Self { pool })
77    }
78
79    /// Connect to a database URL and create the backend.
80    ///
81    /// Installs the default sqlx drivers on the first call. The database
82    /// type is inferred from the URL scheme (`sqlite:` or `postgres:`).
83    ///
84    /// For SQLite in-memory databases (`sqlite::memory:`), the pool is
85    /// limited to one connection to ensure all operations share the same
86    /// database.
87    pub async fn connect(url: &str) -> Result<Self, SqlColdError> {
88        sqlx::any::install_default_drivers();
89        let pool: AnyPool = sqlx::pool::PoolOptions::new().max_connections(1).connect(url).await?;
90        Self::new(pool).await
91    }
92
93    // ========================================================================
94    // Specifier resolution
95    // ========================================================================
96
97    async fn resolve_header_spec(
98        &self,
99        spec: HeaderSpecifier,
100    ) -> Result<Option<BlockNumber>, SqlColdError> {
101        match spec {
102            HeaderSpecifier::Number(n) => Ok(Some(n)),
103            HeaderSpecifier::Hash(hash) => {
104                let hash_bytes = hash.as_slice();
105                let row = sqlx::query("SELECT block_number FROM headers WHERE block_hash = $1")
106                    .bind(hash_bytes)
107                    .fetch_optional(&self.pool)
108                    .await?;
109                Ok(row.map(|r| from_i64(r.get::<i64, _>("block_number"))))
110            }
111        }
112    }
113
114    // ========================================================================
115    // Read helpers
116    // ========================================================================
117
118    async fn fetch_header_by_number(
119        &self,
120        block_num: BlockNumber,
121    ) -> Result<Option<SealedHeader>, SqlColdError> {
122        let bn = to_i64(block_num);
123        let row = sqlx::query("SELECT * FROM headers WHERE block_number = $1")
124            .bind(bn)
125            .fetch_optional(&self.pool)
126            .await?;
127
128        row.map(|r| {
129            let header = HeaderRow {
130                block_number: r.get("block_number"),
131                block_hash: r.get("block_hash"),
132                parent_hash: r.get("parent_hash"),
133                ommers_hash: r.get("ommers_hash"),
134                beneficiary: r.get("beneficiary"),
135                state_root: r.get("state_root"),
136                transactions_root: r.get("transactions_root"),
137                receipts_root: r.get("receipts_root"),
138                logs_bloom: r.get("logs_bloom"),
139                difficulty: r.get("difficulty"),
140                gas_limit: r.get("gas_limit"),
141                gas_used: r.get("gas_used"),
142                timestamp: r.get("timestamp"),
143                extra_data: r.get("extra_data"),
144                mix_hash: r.get("mix_hash"),
145                nonce: r.get("nonce"),
146                base_fee_per_gas: r.get("base_fee_per_gas"),
147                withdrawals_root: r.get("withdrawals_root"),
148                blob_gas_used: r.get("blob_gas_used"),
149                excess_blob_gas: r.get("excess_blob_gas"),
150                parent_beacon_block_root: r.get("parent_beacon_block_root"),
151                requests_hash: r.get("requests_hash"),
152            }
153            .into_header()?;
154            Ok(header.seal_slow())
155        })
156        .transpose()
157    }
158
159    // ========================================================================
160    // Write helpers
161    // ========================================================================
162
163    async fn insert_block(&self, data: BlockData) -> Result<(), SqlColdError> {
164        let mut tx = self.pool.begin().await?;
165        write_block_to_tx(&mut tx, data).await?;
166        tx.commit().await?;
167        Ok(())
168    }
169}
170
171/// Write a single block's data into an open SQL transaction.
172async fn write_block_to_tx(
173    tx: &mut sqlx::Transaction<'_, sqlx::Any>,
174    data: BlockData,
175) -> Result<(), SqlColdError> {
176    let block = data.block_number();
177    let bn = to_i64(block);
178
179    // Insert header
180    let hr = HeaderRow::from_header(&data.header);
181    sqlx::query(
182        "INSERT INTO headers (
183            block_number, block_hash, parent_hash, ommers_hash, beneficiary,
184            state_root, transactions_root, receipts_root, logs_bloom, difficulty,
185            gas_limit, gas_used, timestamp, extra_data, mix_hash, nonce,
186            base_fee_per_gas, withdrawals_root, blob_gas_used, excess_blob_gas,
187            parent_beacon_block_root, requests_hash
188        ) VALUES (
189            $1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
190            $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22
191        )",
192    )
193    .bind(hr.block_number)
194    .bind(&hr.block_hash)
195    .bind(&hr.parent_hash)
196    .bind(&hr.ommers_hash)
197    .bind(&hr.beneficiary)
198    .bind(&hr.state_root)
199    .bind(&hr.transactions_root)
200    .bind(&hr.receipts_root)
201    .bind(&hr.logs_bloom)
202    .bind(&hr.difficulty)
203    .bind(hr.gas_limit)
204    .bind(hr.gas_used)
205    .bind(hr.timestamp)
206    .bind(&hr.extra_data)
207    .bind(&hr.mix_hash)
208    .bind(&hr.nonce)
209    .bind(hr.base_fee_per_gas)
210    .bind(&hr.withdrawals_root)
211    .bind(hr.blob_gas_used)
212    .bind(hr.excess_blob_gas)
213    .bind(&hr.parent_beacon_block_root)
214    .bind(&hr.requests_hash)
215    .execute(&mut **tx)
216    .await?;
217
218    // Insert transactions
219    for (idx, recovered_tx) in data.transactions.iter().enumerate() {
220        let sender = recovered_tx.signer();
221        let tx_signed: &TransactionSigned = recovered_tx;
222        let tr = TxRow::from_tx(tx_signed, bn, to_i64(idx as u64), &sender)?;
223        sqlx::query(
224            "INSERT INTO transactions (
225                block_number, tx_index, tx_hash, tx_type,
226                sig_y_parity, sig_r, sig_s,
227                chain_id, nonce, gas_limit, to_address, value, input,
228                gas_price, max_fee_per_gas, max_priority_fee_per_gas,
229                max_fee_per_blob_gas, blob_versioned_hashes,
230                access_list, authorization_list, from_address
231            ) VALUES (
232                $1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
233                $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21
234            )",
235        )
236        .bind(tr.block_number)
237        .bind(tr.tx_index)
238        .bind(&tr.tx_hash)
239        .bind(tr.tx_type as i32)
240        .bind(tr.sig_y_parity as i32)
241        .bind(&tr.sig_r)
242        .bind(&tr.sig_s)
243        .bind(tr.chain_id)
244        .bind(tr.nonce)
245        .bind(tr.gas_limit)
246        .bind(&tr.to_address)
247        .bind(&tr.value)
248        .bind(&tr.input)
249        .bind(&tr.gas_price)
250        .bind(&tr.max_fee_per_gas)
251        .bind(&tr.max_priority_fee_per_gas)
252        .bind(&tr.max_fee_per_blob_gas)
253        .bind(&tr.blob_versioned_hashes)
254        .bind(&tr.access_list)
255        .bind(&tr.authorization_list)
256        .bind(&tr.from_address)
257        .execute(&mut **tx)
258        .await?;
259    }
260
261    // Insert receipts and logs
262    for (idx, receipt) in data.receipts.iter().enumerate() {
263        let rr = ReceiptRow::from_receipt(receipt, bn, to_i64(idx as u64));
264        sqlx::query(
265            "INSERT INTO receipts (block_number, tx_index, tx_type, success, cumulative_gas_used)
266             VALUES ($1, $2, $3, $4, $5)",
267        )
268        .bind(rr.block_number)
269        .bind(rr.tx_index)
270        .bind(rr.tx_type as i32)
271        .bind(rr.success as i32)
272        .bind(rr.cumulative_gas_used)
273        .execute(&mut **tx)
274        .await?;
275
276        for (log_idx, log) in receipt.inner.logs.iter().enumerate() {
277            let lr = LogRow::from_log(log, bn, to_i64(idx as u64), to_i64(log_idx as u64));
278            sqlx::query(
279                "INSERT INTO logs (block_number, tx_index, log_index, address, topic0, topic1, topic2, topic3, data)
280                 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
281            )
282            .bind(lr.block_number)
283            .bind(lr.tx_index)
284            .bind(lr.log_index)
285            .bind(&lr.address)
286            .bind(&lr.topic0)
287            .bind(&lr.topic1)
288            .bind(&lr.topic2)
289            .bind(&lr.topic3)
290            .bind(&lr.data)
291            .execute(&mut **tx)
292            .await?;
293        }
294    }
295
296    // Insert signet events
297    for (idx, event) in data.signet_events.iter().enumerate() {
298        let er = SignetEventRow::from_event(event, bn, to_i64(idx as u64));
299        sqlx::query(
300            "INSERT INTO signet_events (
301                block_number, event_index, event_type, order_index,
302                rollup_chain_id, sender, to_address, value, gas,
303                max_fee_per_gas, data, rollup_recipient, amount, token
304            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)",
305        )
306        .bind(er.block_number)
307        .bind(er.event_index)
308        .bind(er.event_type as i32)
309        .bind(er.order_index)
310        .bind(&er.rollup_chain_id)
311        .bind(&er.sender)
312        .bind(&er.to_address)
313        .bind(&er.value)
314        .bind(&er.gas)
315        .bind(&er.max_fee_per_gas)
316        .bind(&er.data)
317        .bind(&er.rollup_recipient)
318        .bind(&er.amount)
319        .bind(&er.token)
320        .execute(&mut **tx)
321        .await?;
322    }
323
324    // Insert zenith header
325    if let Some(zh) = &data.zenith_header {
326        let zr = ZenithHeaderRow::from_zenith(zh, bn);
327        sqlx::query(
328            "INSERT INTO zenith_headers (
329                block_number, host_block_number, rollup_chain_id,
330                gas_limit, reward_address, block_data_hash
331            ) VALUES ($1, $2, $3, $4, $5, $6)",
332        )
333        .bind(zr.block_number)
334        .bind(&zr.host_block_number)
335        .bind(&zr.rollup_chain_id)
336        .bind(&zr.gas_limit)
337        .bind(&zr.reward_address)
338        .bind(&zr.block_data_hash)
339        .execute(&mut **tx)
340        .await?;
341    }
342
343    Ok(())
344}
345
346/// Convert a sqlx row to a TxRow.
347fn row_to_tx_row(r: &sqlx::any::AnyRow) -> TxRow {
348    TxRow {
349        block_number: r.get("block_number"),
350        tx_index: r.get("tx_index"),
351        tx_hash: r.get("tx_hash"),
352        tx_type: r.get::<i32, _>("tx_type") as i16,
353        sig_y_parity: r.get::<i32, _>("sig_y_parity") != 0,
354        sig_r: r.get("sig_r"),
355        sig_s: r.get("sig_s"),
356        chain_id: r.get("chain_id"),
357        nonce: r.get("nonce"),
358        gas_limit: r.get("gas_limit"),
359        to_address: r.get("to_address"),
360        value: r.get("value"),
361        input: r.get("input"),
362        gas_price: r.get("gas_price"),
363        max_fee_per_gas: r.get("max_fee_per_gas"),
364        max_priority_fee_per_gas: r.get("max_priority_fee_per_gas"),
365        max_fee_per_blob_gas: r.get("max_fee_per_blob_gas"),
366        blob_versioned_hashes: r.get("blob_versioned_hashes"),
367        access_list: r.get("access_list"),
368        authorization_list: r.get("authorization_list"),
369        from_address: r.get("from_address"),
370    }
371}
372
373fn row_to_signet_event_row(r: &sqlx::any::AnyRow) -> SignetEventRow {
374    SignetEventRow {
375        block_number: r.get("block_number"),
376        event_index: r.get("event_index"),
377        event_type: r.get::<i32, _>("event_type") as i16,
378        order_index: r.get("order_index"),
379        rollup_chain_id: r.get("rollup_chain_id"),
380        sender: r.get("sender"),
381        to_address: r.get("to_address"),
382        value: r.get("value"),
383        gas: r.get("gas"),
384        max_fee_per_gas: r.get("max_fee_per_gas"),
385        data: r.get("data"),
386        rollup_recipient: r.get("rollup_recipient"),
387        amount: r.get("amount"),
388        token: r.get("token"),
389    }
390}
391
392fn row_to_log_row(r: &sqlx::any::AnyRow) -> LogRow {
393    LogRow {
394        block_number: r.get("block_number"),
395        tx_index: r.get("tx_index"),
396        log_index: r.get("log_index"),
397        address: r.get("address"),
398        topic0: r.get("topic0"),
399        topic1: r.get("topic1"),
400        topic2: r.get("topic2"),
401        topic3: r.get("topic3"),
402        data: r.get("data"),
403    }
404}
405
406fn row_to_zenith_header_row(r: &sqlx::any::AnyRow) -> ZenithHeaderRow {
407    ZenithHeaderRow {
408        block_number: r.get("block_number"),
409        host_block_number: r.get("host_block_number"),
410        rollup_chain_id: r.get("rollup_chain_id"),
411        gas_limit: r.get("gas_limit"),
412        reward_address: r.get("reward_address"),
413        block_data_hash: r.get("block_data_hash"),
414    }
415}
416
417impl ColdStorage for SqlColdBackend {
418    async fn get_header(&self, spec: HeaderSpecifier) -> ColdResult<Option<SealedHeader>> {
419        let Some(block_num) = self.resolve_header_spec(spec).await? else {
420            return Ok(None);
421        };
422        self.fetch_header_by_number(block_num).await.map_err(ColdStorageError::from)
423    }
424
425    async fn get_headers(
426        &self,
427        specs: Vec<HeaderSpecifier>,
428    ) -> ColdResult<Vec<Option<SealedHeader>>> {
429        let mut results = Vec::with_capacity(specs.len());
430        for spec in specs {
431            let header = self.get_header(spec).await?;
432            results.push(header);
433        }
434        Ok(results)
435    }
436
437    async fn get_transaction(
438        &self,
439        spec: TransactionSpecifier,
440    ) -> ColdResult<Option<Confirmed<RecoveredTx>>> {
441        let row = match spec {
442            TransactionSpecifier::Hash(hash) => sqlx::query(
443                "SELECT t.*, h.block_hash
444                     FROM transactions t
445                     JOIN headers h ON t.block_number = h.block_number
446                     WHERE t.tx_hash = $1",
447            )
448            .bind(hash.as_slice())
449            .fetch_optional(&self.pool)
450            .await
451            .map_err(SqlColdError::from)?,
452            TransactionSpecifier::BlockAndIndex { block, index } => sqlx::query(
453                "SELECT t.*, h.block_hash
454                     FROM transactions t
455                     JOIN headers h ON t.block_number = h.block_number
456                     WHERE t.block_number = $1 AND t.tx_index = $2",
457            )
458            .bind(to_i64(block))
459            .bind(to_i64(index))
460            .fetch_optional(&self.pool)
461            .await
462            .map_err(SqlColdError::from)?,
463            TransactionSpecifier::BlockHashAndIndex { block_hash, index } => sqlx::query(
464                "SELECT t.*, h.block_hash
465                     FROM transactions t
466                     JOIN headers h ON t.block_number = h.block_number
467                     WHERE h.block_hash = $1 AND t.tx_index = $2",
468            )
469            .bind(block_hash.as_slice())
470            .bind(to_i64(index))
471            .fetch_optional(&self.pool)
472            .await
473            .map_err(SqlColdError::from)?,
474        };
475
476        let Some(r) = row else {
477            return Ok(None);
478        };
479
480        let tx_row = row_to_tx_row(&r);
481        let sender = alloy::primitives::Address::from_slice(&tx_row.from_address);
482        let tx = tx_row.into_tx().map_err(ColdStorageError::from)?;
483        let block = from_i64(r.get::<i64, _>("block_number"));
484        let index = from_i64(r.get::<i64, _>("tx_index"));
485        let hash_bytes: Vec<u8> = r.get("block_hash");
486        let block_hash = alloy::primitives::B256::from_slice(&hash_bytes);
487        let meta = ConfirmationMeta::new(block, block_hash, index);
488        // SAFETY: the sender was recovered at append time and stored in from_address.
489        let recovered = Recovered::new_unchecked(tx, sender);
490        Ok(Some(Confirmed::new(recovered, meta)))
491    }
492
493    async fn get_transactions_in_block(&self, block: BlockNumber) -> ColdResult<Vec<RecoveredTx>> {
494        let bn = to_i64(block);
495        let rows =
496            sqlx::query("SELECT * FROM transactions WHERE block_number = $1 ORDER BY tx_index")
497                .bind(bn)
498                .fetch_all(&self.pool)
499                .await
500                .map_err(SqlColdError::from)?;
501
502        rows.into_iter()
503            .map(|r| {
504                let tx_row = row_to_tx_row(&r);
505                let sender = alloy::primitives::Address::from_slice(&tx_row.from_address);
506                let tx = tx_row.into_tx().map_err(ColdStorageError::from)?;
507                // SAFETY: the sender was recovered at append time.
508                Ok(Recovered::new_unchecked(tx, sender))
509            })
510            .collect()
511    }
512
513    async fn get_transaction_count(&self, block: BlockNumber) -> ColdResult<u64> {
514        let bn = to_i64(block);
515        let row = sqlx::query("SELECT COUNT(*) as cnt FROM transactions WHERE block_number = $1")
516            .bind(bn)
517            .fetch_one(&self.pool)
518            .await
519            .map_err(SqlColdError::from)?;
520
521        Ok(from_i64(row.get::<i64, _>("cnt")))
522    }
523
524    async fn get_receipt(&self, spec: ReceiptSpecifier) -> ColdResult<Option<ColdReceipt>> {
525        // Resolve to (block, index)
526        let (block, index) = match spec {
527            ReceiptSpecifier::TxHash(hash) => {
528                let row = sqlx::query(
529                    "SELECT block_number, tx_index FROM transactions WHERE tx_hash = $1",
530                )
531                .bind(hash.as_slice())
532                .fetch_optional(&self.pool)
533                .await
534                .map_err(SqlColdError::from)?;
535                let Some(r) = row else { return Ok(None) };
536                (from_i64(r.get::<i64, _>("block_number")), from_i64(r.get::<i64, _>("tx_index")))
537            }
538            ReceiptSpecifier::BlockAndIndex { block, index } => (block, index),
539        };
540
541        let Some(header) = self.fetch_header_by_number(block).await? else {
542            return Ok(None);
543        };
544
545        // Fetch receipt + tx_hash + from_address
546        let receipt_row = sqlx::query(
547            "SELECT r.*, t.tx_hash, t.from_address
548             FROM receipts r
549             JOIN transactions t ON r.block_number = t.block_number AND r.tx_index = t.tx_index
550             WHERE r.block_number = $1 AND r.tx_index = $2",
551        )
552        .bind(to_i64(block))
553        .bind(to_i64(index))
554        .fetch_optional(&self.pool)
555        .await
556        .map_err(SqlColdError::from)?;
557
558        let Some(rr) = receipt_row else {
559            return Ok(None);
560        };
561
562        let bn: i64 = rr.get("block_number");
563        let tx_idx: i64 = rr.get("tx_index");
564        let tx_hash_bytes: Vec<u8> = rr.get("tx_hash");
565        let tx_hash = alloy::primitives::B256::from_slice(&tx_hash_bytes);
566        let from_bytes: Vec<u8> = rr.get("from_address");
567        let sender = alloy::primitives::Address::from_slice(&from_bytes);
568
569        let receipt = ReceiptRow {
570            block_number: bn,
571            tx_index: tx_idx,
572            tx_type: rr.get::<i32, _>("tx_type") as i16,
573            success: rr.get::<i32, _>("success") != 0,
574            cumulative_gas_used: rr.get("cumulative_gas_used"),
575        };
576
577        let log_rows = sqlx::query(
578            "SELECT * FROM logs WHERE block_number = $1 AND tx_index = $2 ORDER BY log_index",
579        )
580        .bind(bn)
581        .bind(tx_idx)
582        .fetch_all(&self.pool)
583        .await
584        .map_err(SqlColdError::from)?;
585
586        let logs: Vec<_> = log_rows.into_iter().map(|r| row_to_log_row(&r)).collect();
587
588        let built = receipt_from_rows(receipt, logs).map_err(ColdStorageError::from)?;
589
590        // Compute gas_used and first_log_index by querying prior receipts
591        let prior = sqlx::query(
592            "SELECT CAST(SUM(
593                (SELECT COUNT(*) FROM logs l WHERE l.block_number = $1 AND l.tx_index = r.tx_index)
594             ) AS bigint) as log_count,
595             CAST(MAX(r.cumulative_gas_used) AS bigint) as prior_gas
596             FROM receipts r WHERE r.block_number = $1 AND r.tx_index < $2",
597        )
598        .bind(to_i64(block))
599        .bind(to_i64(index))
600        .fetch_one(&self.pool)
601        .await
602        .map_err(SqlColdError::from)?;
603
604        let first_log_index: u64 = prior.get::<Option<i64>, _>("log_count").unwrap_or(0) as u64;
605        let prior_cumulative_gas: u64 =
606            prior.get::<Option<i64>, _>("prior_gas").unwrap_or(0) as u64;
607        let gas_used = built.inner.cumulative_gas_used - prior_cumulative_gas;
608
609        let ir = IndexedReceipt { receipt: built, tx_hash, first_log_index, gas_used, sender };
610        Ok(Some(ColdReceipt::new(ir, &header, index)))
611    }
612
613    async fn get_receipts_in_block(&self, block: BlockNumber) -> ColdResult<Vec<ColdReceipt>> {
614        let Some(header) =
615            self.fetch_header_by_number(block).await.map_err(ColdStorageError::from)?
616        else {
617            return Ok(Vec::new());
618        };
619
620        let bn = to_i64(block);
621
622        // Fetch receipts joined with tx_hash and from_address
623        let receipt_rows = sqlx::query(
624            "SELECT r.*, t.tx_hash, t.from_address
625             FROM receipts r
626             JOIN transactions t ON r.block_number = t.block_number AND r.tx_index = t.tx_index
627             WHERE r.block_number = $1
628             ORDER BY r.tx_index",
629        )
630        .bind(bn)
631        .fetch_all(&self.pool)
632        .await
633        .map_err(SqlColdError::from)?;
634
635        let all_log_rows =
636            sqlx::query("SELECT * FROM logs WHERE block_number = $1 ORDER BY tx_index, log_index")
637                .bind(bn)
638                .fetch_all(&self.pool)
639                .await
640                .map_err(SqlColdError::from)?;
641
642        // Group logs by tx_index
643        let mut logs_by_tx: std::collections::BTreeMap<i64, Vec<LogRow>> =
644            std::collections::BTreeMap::new();
645        for r in all_log_rows {
646            logs_by_tx.entry(r.get::<i64, _>("tx_index")).or_default().push(row_to_log_row(&r));
647        }
648
649        let mut first_log_index = 0u64;
650        let mut prior_cumulative_gas = 0u64;
651        receipt_rows
652            .into_iter()
653            .enumerate()
654            .map(|(idx, rr)| {
655                let tx_idx: i64 = rr.get("tx_index");
656                let tx_hash_bytes: Vec<u8> = rr.get("tx_hash");
657                let tx_hash = alloy::primitives::B256::from_slice(&tx_hash_bytes);
658                let from_bytes: Vec<u8> = rr.get("from_address");
659                let sender = alloy::primitives::Address::from_slice(&from_bytes);
660                let receipt_row = ReceiptRow {
661                    block_number: rr.get("block_number"),
662                    tx_index: tx_idx,
663                    tx_type: rr.get::<i32, _>("tx_type") as i16,
664                    success: rr.get::<i32, _>("success") != 0,
665                    cumulative_gas_used: rr.get("cumulative_gas_used"),
666                };
667                let logs = logs_by_tx.remove(&tx_idx).unwrap_or_default();
668                let receipt =
669                    receipt_from_rows(receipt_row, logs).map_err(ColdStorageError::from)?;
670                let gas_used = receipt.inner.cumulative_gas_used - prior_cumulative_gas;
671                prior_cumulative_gas = receipt.inner.cumulative_gas_used;
672                let ir = IndexedReceipt { receipt, tx_hash, first_log_index, gas_used, sender };
673                first_log_index += ir.receipt.inner.logs.len() as u64;
674                Ok(ColdReceipt::new(ir, &header, idx as u64))
675            })
676            .collect()
677    }
678
679    async fn get_signet_events(
680        &self,
681        spec: SignetEventsSpecifier,
682    ) -> ColdResult<Vec<DbSignetEvent>> {
683        let rows = match spec {
684            SignetEventsSpecifier::Block(block) => {
685                let bn = to_i64(block);
686                sqlx::query(
687                    "SELECT * FROM signet_events WHERE block_number = $1 ORDER BY event_index",
688                )
689                .bind(bn)
690                .fetch_all(&self.pool)
691                .await
692                .map_err(SqlColdError::from)?
693            }
694            SignetEventsSpecifier::BlockRange { start, end } => {
695                let s = to_i64(start);
696                let e = to_i64(end);
697                sqlx::query(
698                    "SELECT * FROM signet_events WHERE block_number >= $1 AND block_number <= $2
699                     ORDER BY block_number, event_index",
700                )
701                .bind(s)
702                .bind(e)
703                .fetch_all(&self.pool)
704                .await
705                .map_err(SqlColdError::from)?
706            }
707        };
708
709        rows.into_iter()
710            .map(|r| row_to_signet_event_row(&r).into_event().map_err(ColdStorageError::from))
711            .collect()
712    }
713
714    async fn get_zenith_header(
715        &self,
716        spec: ZenithHeaderSpecifier,
717    ) -> ColdResult<Option<DbZenithHeader>> {
718        let block = match spec {
719            ZenithHeaderSpecifier::Number(n) => n,
720            ZenithHeaderSpecifier::Range { start, .. } => start,
721        };
722        let bn = to_i64(block);
723        let row = sqlx::query("SELECT * FROM zenith_headers WHERE block_number = $1")
724            .bind(bn)
725            .fetch_optional(&self.pool)
726            .await
727            .map_err(SqlColdError::from)?;
728
729        row.map(|r| row_to_zenith_header_row(&r).into_zenith().map_err(ColdStorageError::from))
730            .transpose()
731    }
732
733    async fn get_zenith_headers(
734        &self,
735        spec: ZenithHeaderSpecifier,
736    ) -> ColdResult<Vec<DbZenithHeader>> {
737        let rows = match spec {
738            ZenithHeaderSpecifier::Number(n) => {
739                let bn = to_i64(n);
740                sqlx::query("SELECT * FROM zenith_headers WHERE block_number = $1")
741                    .bind(bn)
742                    .fetch_all(&self.pool)
743                    .await
744                    .map_err(SqlColdError::from)?
745            }
746            ZenithHeaderSpecifier::Range { start, end } => {
747                let s = to_i64(start);
748                let e = to_i64(end);
749                sqlx::query(
750                    "SELECT * FROM zenith_headers WHERE block_number >= $1 AND block_number <= $2
751                     ORDER BY block_number",
752                )
753                .bind(s)
754                .bind(e)
755                .fetch_all(&self.pool)
756                .await
757                .map_err(SqlColdError::from)?
758            }
759        };
760
761        rows.into_iter()
762            .map(|r| row_to_zenith_header_row(&r).into_zenith().map_err(ColdStorageError::from))
763            .collect()
764    }
765
766    async fn get_logs(&self, filter: Filter) -> ColdResult<Vec<RpcLog>> {
767        let from = filter.get_from_block().unwrap_or(0);
768        let to = filter.get_to_block().unwrap_or(u64::MAX);
769
770        // Build dynamic SQL with positional $N placeholders.
771        // The correlated subquery computes block_log_index: the absolute
772        // position of each log among all logs in its block, leveraging the
773        // PK index on (block_number, tx_index, log_index).
774        let mut sql = String::from(
775            "SELECT l.*, h.block_hash, h.timestamp AS block_timestamp, t.tx_hash, \
776               (SELECT COUNT(*) FROM logs l2 \
777                WHERE l2.block_number = l.block_number \
778                  AND (l2.tx_index < l.tx_index \
779                       OR (l2.tx_index = l.tx_index AND l2.log_index < l.log_index)) \
780               ) AS block_log_index \
781             FROM logs l \
782             JOIN headers h ON l.block_number = h.block_number \
783             JOIN transactions t ON l.block_number = t.block_number \
784               AND l.tx_index = t.tx_index \
785             WHERE l.block_number >= $1 AND l.block_number <= $2",
786        );
787        let mut params: Vec<Vec<u8>> = Vec::new();
788        let mut idx = 3u32;
789
790        // Address filter
791        if !filter.address.is_empty() {
792            let addrs: Vec<_> = filter.address.iter().collect();
793            if addrs.len() == 1 {
794                sql.push_str(&format!(" AND l.address = ${idx}"));
795                params.push(addrs[0].as_slice().to_vec());
796                idx += 1;
797            } else {
798                let placeholders: String = addrs
799                    .iter()
800                    .enumerate()
801                    .map(|(i, _)| format!("${}", idx + i as u32))
802                    .collect::<Vec<_>>()
803                    .join(", ");
804                sql.push_str(&format!(" AND l.address IN ({placeholders})"));
805                for addr in &addrs {
806                    params.push(addr.as_slice().to_vec());
807                }
808                idx += addrs.len() as u32;
809            }
810        }
811
812        // Topic filters
813        let topic_cols = ["l.topic0", "l.topic1", "l.topic2", "l.topic3"];
814        for (i, topic_filter) in filter.topics.iter().enumerate() {
815            if topic_filter.is_empty() {
816                continue;
817            }
818            let values: Vec<_> = topic_filter.iter().collect();
819            if values.len() == 1 {
820                sql.push_str(&format!(" AND {} = ${idx}", topic_cols[i]));
821                params.push(values[0].as_slice().to_vec());
822                idx += 1;
823            } else {
824                let placeholders: String = values
825                    .iter()
826                    .enumerate()
827                    .map(|(j, _)| format!("${}", idx + j as u32))
828                    .collect::<Vec<_>>()
829                    .join(", ");
830                sql.push_str(&format!(" AND {} IN ({placeholders})", topic_cols[i]));
831                for v in &values {
832                    params.push(v.as_slice().to_vec());
833                }
834                idx += values.len() as u32;
835            }
836        }
837
838        sql.push_str(" ORDER BY l.block_number, l.tx_index, l.log_index");
839
840        // Bind parameters and execute.
841        let mut query = sqlx::query(&sql).bind(to_i64(from)).bind(to_i64(to));
842        for param in &params {
843            query = query.bind(param.as_slice());
844        }
845
846        let rows = query.fetch_all(&self.pool).await.map_err(SqlColdError::from)?;
847
848        rows.into_iter()
849            .map(|r| {
850                let log = row_to_log_row(&r).into_log();
851                let block_number = from_i64(r.get::<i64, _>("block_number"));
852                let block_hash_bytes: Vec<u8> = r.get("block_hash");
853                let tx_hash_bytes: Vec<u8> = r.get("tx_hash");
854                Ok(RpcLog {
855                    inner: log,
856                    block_hash: Some(alloy::primitives::B256::from_slice(&block_hash_bytes)),
857                    block_number: Some(block_number),
858                    block_timestamp: Some(from_i64(r.get::<i64, _>("block_timestamp"))),
859                    transaction_hash: Some(alloy::primitives::B256::from_slice(&tx_hash_bytes)),
860                    transaction_index: Some(from_i64(r.get::<i64, _>("tx_index"))),
861                    log_index: Some(from_i64(r.get::<i64, _>("block_log_index"))),
862                    removed: false,
863                })
864            })
865            .collect::<ColdResult<Vec<_>>>()
866    }
867
868    async fn get_latest_block(&self) -> ColdResult<Option<BlockNumber>> {
869        let row = sqlx::query("SELECT MAX(block_number) as max_bn FROM headers")
870            .fetch_one(&self.pool)
871            .await
872            .map_err(SqlColdError::from)?;
873        Ok(row.get::<Option<i64>, _>("max_bn").map(from_i64))
874    }
875
876    async fn append_block(&self, data: BlockData) -> ColdResult<()> {
877        self.insert_block(data).await.map_err(ColdStorageError::from)
878    }
879
880    async fn append_blocks(&self, data: Vec<BlockData>) -> ColdResult<()> {
881        let mut tx = self.pool.begin().await.map_err(SqlColdError::from)?;
882        for block_data in data {
883            write_block_to_tx(&mut tx, block_data).await.map_err(ColdStorageError::from)?;
884        }
885        tx.commit().await.map_err(SqlColdError::from)?;
886        Ok(())
887    }
888
889    async fn truncate_above(&self, block: BlockNumber) -> ColdResult<()> {
890        let bn = to_i64(block);
891        let mut tx = self.pool.begin().await.map_err(SqlColdError::from)?;
892
893        sqlx::query("DELETE FROM logs WHERE block_number > $1")
894            .bind(bn)
895            .execute(&mut *tx)
896            .await
897            .map_err(SqlColdError::from)?;
898        sqlx::query("DELETE FROM transactions WHERE block_number > $1")
899            .bind(bn)
900            .execute(&mut *tx)
901            .await
902            .map_err(SqlColdError::from)?;
903        sqlx::query("DELETE FROM receipts WHERE block_number > $1")
904            .bind(bn)
905            .execute(&mut *tx)
906            .await
907            .map_err(SqlColdError::from)?;
908        sqlx::query("DELETE FROM signet_events WHERE block_number > $1")
909            .bind(bn)
910            .execute(&mut *tx)
911            .await
912            .map_err(SqlColdError::from)?;
913        sqlx::query("DELETE FROM zenith_headers WHERE block_number > $1")
914            .bind(bn)
915            .execute(&mut *tx)
916            .await
917            .map_err(SqlColdError::from)?;
918        sqlx::query("DELETE FROM headers WHERE block_number > $1")
919            .bind(bn)
920            .execute(&mut *tx)
921            .await
922            .map_err(SqlColdError::from)?;
923
924        tx.commit().await.map_err(SqlColdError::from)?;
925        Ok(())
926    }
927}
928
929#[cfg(all(test, feature = "test-utils"))]
930mod tests {
931    use super::*;
932    use signet_cold::conformance::conformance;
933
934    #[tokio::test]
935    async fn sqlite_conformance() {
936        let backend = SqlColdBackend::connect("sqlite::memory:").await.unwrap();
937        conformance(&backend).await.unwrap();
938    }
939
940    #[tokio::test]
941    async fn pg_conformance() {
942        let Ok(url) = std::env::var("DATABASE_URL") else {
943            eprintln!("skipping pg conformance: DATABASE_URL not set");
944            return;
945        };
946        let backend = SqlColdBackend::connect(&url).await.unwrap();
947        conformance(&backend).await.unwrap();
948    }
949}