
signet_cold_sql/backend.rs

//! Unified SQL backend for cold storage.
//!
//! Supports both PostgreSQL and SQLite via [`sqlx::Any`]. The backend
//! auto-detects the database type at construction time and runs the
//! appropriate migration.
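//!
//! # Example
//!
//! A minimal read-path sketch, assuming an in-memory SQLite database and the
//! `ColdStorage` trait in scope:
//!
//! ```no_run
//! # async fn example() {
//! use signet_cold::{ColdStorage, HeaderSpecifier};
//! use signet_cold_sql::SqlColdBackend;
//!
//! let backend = SqlColdBackend::connect("sqlite::memory:").await.unwrap();
//! if let Some(tip) = backend.get_latest_block().await.unwrap() {
//!     let _header = backend.get_header(HeaderSpecifier::Number(tip)).await.unwrap();
//! }
//! # }
//! ```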

use crate::SqlColdError;
use crate::convert::{
    HeaderRow, LogRow, ReceiptRow, SignetEventRow, TxRow, ZenithHeaderRow, from_i64,
    receipt_from_rows, to_i64,
};
use alloy::{
    consensus::transaction::Recovered,
    primitives::{BlockNumber, Sealable},
};
use signet_cold::{
    BlockData, ColdReceipt, ColdResult, ColdStorage, ColdStorageError, Confirmed, Filter,
    HeaderSpecifier, ReceiptSpecifier, RpcLog, SignetEventsSpecifier, TransactionSpecifier,
    ZenithHeaderSpecifier,
};
use signet_storage_types::{
    ConfirmationMeta, DbSignetEvent, DbZenithHeader, IndexedReceipt, RecoveredTx, SealedHeader,
    TransactionSigned,
};
use sqlx::{AnyPool, Row};

/// SQL-based cold storage backend.
///
/// Uses [`sqlx::Any`] for database-agnostic access, supporting both
/// PostgreSQL and SQLite through a single implementation. The backend
/// is determined by the connection URL at construction time.
///
/// # Example
///
/// ```no_run
/// # async fn example() {
/// use signet_cold_sql::SqlColdBackend;
///
/// // SQLite (in-memory)
/// let backend = SqlColdBackend::connect("sqlite::memory:").await.unwrap();
///
/// // PostgreSQL
/// let backend = SqlColdBackend::connect("postgres://localhost/signet").await.unwrap();
/// # }
/// ```
#[derive(Debug, Clone)]
pub struct SqlColdBackend {
    pool: AnyPool,
}

impl SqlColdBackend {
    /// Create a new SQL cold storage backend from an existing [`AnyPool`].
    ///
    /// Auto-detects the database backend and creates all tables if they
    /// do not already exist. Callers must ensure
    /// [`sqlx::any::install_default_drivers`] has been called before
    /// constructing the pool.
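    ///
    /// # Example
    ///
    /// A sketch of building the backend from a pre-constructed pool; the URL
    /// and `max_connections` value below are placeholders:
    ///
    /// ```no_run
    /// # async fn example() {
    /// use signet_cold_sql::SqlColdBackend;
    /// use sqlx::AnyPool;
    ///
    /// sqlx::any::install_default_drivers();
    /// // Placeholder URL; any supported Postgres or SQLite URL works here.
    /// let pool: AnyPool = sqlx::pool::PoolOptions::new()
    ///     .max_connections(5)
    ///     .connect("postgres://localhost/signet")
    ///     .await
    ///     .unwrap();
    /// let _backend = SqlColdBackend::new(pool).await.unwrap();
    /// # }
    /// ```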
    pub async fn new(pool: AnyPool) -> Result<Self, SqlColdError> {
        // Detect backend from a pooled connection.
        let conn = pool.acquire().await?;
        let backend = conn.backend_name().to_owned();
        drop(conn);

        let migration = match backend.as_str() {
            "PostgreSQL" => include_str!("../migrations/001_initial_pg.sql"),
            "SQLite" => include_str!("../migrations/001_initial.sql"),
            other => {
                return Err(SqlColdError::Convert(format!(
                    "unsupported database backend: {other}"
                )));
            }
        };
        // Execute via the pool; for single-connection pools (such as in-memory
        // SQLite) this keeps the migration on the same connection that
        // subsequent queries will reuse.
        sqlx::raw_sql(migration).execute(&pool).await?;
        Ok(Self { pool })
    }

    /// Connect to a database URL and create the backend.
    ///
    /// Installs the default sqlx drivers on the first call. The database
    /// type is inferred from the URL scheme (`sqlite:` or `postgres:`).
    ///
    /// For SQLite in-memory databases (`sqlite::memory:`), the pool is
    /// limited to one connection to ensure all operations share the same
    /// database.
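    ///
    /// # Example
    ///
    /// A file-backed SQLite sketch; the path and the `mode=rwc`
    /// (read-write-create) query parameter are illustrative:
    ///
    /// ```no_run
    /// # async fn example() {
    /// use signet_cold_sql::SqlColdBackend;
    ///
    /// let _backend = SqlColdBackend::connect("sqlite://cold.db?mode=rwc").await.unwrap();
    /// # }
    /// ```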
    pub async fn connect(url: &str) -> Result<Self, SqlColdError> {
        sqlx::any::install_default_drivers();
        // In-memory SQLite is per-connection, so cap those pools at one connection.
        let opts = sqlx::pool::PoolOptions::new();
        let opts = if url.contains(":memory:") { opts.max_connections(1) } else { opts };
        let pool: AnyPool = opts.connect(url).await?;
        Self::new(pool).await
    }

    // ========================================================================
    // Specifier resolution
    // ========================================================================

    async fn resolve_header_spec(
        &self,
        spec: HeaderSpecifier,
    ) -> Result<Option<BlockNumber>, SqlColdError> {
        match spec {
            HeaderSpecifier::Number(n) => Ok(Some(n)),
            HeaderSpecifier::Hash(hash) => {
                let hash_bytes = hash.as_slice();
                let row = sqlx::query("SELECT block_number FROM headers WHERE block_hash = $1")
                    .bind(hash_bytes)
                    .fetch_optional(&self.pool)
                    .await?;
                Ok(row.map(|r| from_i64(r.get::<i64, _>("block_number"))))
            }
        }
    }

    // ========================================================================
    // Read helpers
    // ========================================================================

    async fn fetch_header_by_number(
        &self,
        block_num: BlockNumber,
    ) -> Result<Option<SealedHeader>, SqlColdError> {
        let bn = to_i64(block_num);
        let row = sqlx::query("SELECT * FROM headers WHERE block_number = $1")
            .bind(bn)
            .fetch_optional(&self.pool)
            .await?;

        row.map(|r| {
            let header = HeaderRow {
                block_number: r.get("block_number"),
                block_hash: r.get("block_hash"),
                parent_hash: r.get("parent_hash"),
                ommers_hash: r.get("ommers_hash"),
                beneficiary: r.get("beneficiary"),
                state_root: r.get("state_root"),
                transactions_root: r.get("transactions_root"),
                receipts_root: r.get("receipts_root"),
                logs_bloom: r.get("logs_bloom"),
                difficulty: r.get("difficulty"),
                gas_limit: r.get("gas_limit"),
                gas_used: r.get("gas_used"),
                timestamp: r.get("timestamp"),
                extra_data: r.get("extra_data"),
                mix_hash: r.get("mix_hash"),
                nonce: r.get("nonce"),
                base_fee_per_gas: r.get("base_fee_per_gas"),
                withdrawals_root: r.get("withdrawals_root"),
                blob_gas_used: r.get("blob_gas_used"),
                excess_blob_gas: r.get("excess_blob_gas"),
                parent_beacon_block_root: r.get("parent_beacon_block_root"),
                requests_hash: r.get("requests_hash"),
            }
            .into_header()?;
            Ok(header.seal_slow())
        })
        .transpose()
    }

    // ========================================================================
    // Write helpers
    // ========================================================================

    async fn insert_block(&self, data: BlockData) -> Result<(), SqlColdError> {
        let mut tx = self.pool.begin().await?;
        let block = data.block_number();
        let bn = to_i64(block);

        // Insert header
        let hr = HeaderRow::from_header(&data.header);
        sqlx::query(
            "INSERT INTO headers (
                block_number, block_hash, parent_hash, ommers_hash, beneficiary,
                state_root, transactions_root, receipts_root, logs_bloom, difficulty,
                gas_limit, gas_used, timestamp, extra_data, mix_hash, nonce,
                base_fee_per_gas, withdrawals_root, blob_gas_used, excess_blob_gas,
                parent_beacon_block_root, requests_hash
            ) VALUES (
                $1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
                $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22
            )",
        )
        .bind(hr.block_number)
        .bind(&hr.block_hash)
        .bind(&hr.parent_hash)
        .bind(&hr.ommers_hash)
        .bind(&hr.beneficiary)
        .bind(&hr.state_root)
        .bind(&hr.transactions_root)
        .bind(&hr.receipts_root)
        .bind(&hr.logs_bloom)
        .bind(&hr.difficulty)
        .bind(hr.gas_limit)
        .bind(hr.gas_used)
        .bind(hr.timestamp)
        .bind(&hr.extra_data)
        .bind(&hr.mix_hash)
        .bind(&hr.nonce)
        .bind(hr.base_fee_per_gas)
        .bind(&hr.withdrawals_root)
        .bind(hr.blob_gas_used)
        .bind(hr.excess_blob_gas)
        .bind(&hr.parent_beacon_block_root)
        .bind(&hr.requests_hash)
        .execute(&mut *tx)
        .await?;

        // Insert transactions
        for (idx, recovered_tx) in data.transactions.iter().enumerate() {
            let sender = recovered_tx.signer();
            let tx_signed: &TransactionSigned = recovered_tx;
            let tr = TxRow::from_tx(tx_signed, bn, to_i64(idx as u64), &sender)?;
            sqlx::query(
                "INSERT INTO transactions (
                    block_number, tx_index, tx_hash, tx_type,
                    sig_y_parity, sig_r, sig_s,
                    chain_id, nonce, gas_limit, to_address, value, input,
                    gas_price, max_fee_per_gas, max_priority_fee_per_gas,
                    max_fee_per_blob_gas, blob_versioned_hashes,
                    access_list, authorization_list, from_address
                ) VALUES (
                    $1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
                    $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21
                )",
            )
            .bind(tr.block_number)
            .bind(tr.tx_index)
            .bind(&tr.tx_hash)
            .bind(tr.tx_type as i32)
            .bind(tr.sig_y_parity as i32)
            .bind(&tr.sig_r)
            .bind(&tr.sig_s)
            .bind(tr.chain_id)
            .bind(tr.nonce)
            .bind(tr.gas_limit)
            .bind(&tr.to_address)
            .bind(&tr.value)
            .bind(&tr.input)
            .bind(&tr.gas_price)
            .bind(&tr.max_fee_per_gas)
            .bind(&tr.max_priority_fee_per_gas)
            .bind(&tr.max_fee_per_blob_gas)
            .bind(&tr.blob_versioned_hashes)
            .bind(&tr.access_list)
            .bind(&tr.authorization_list)
            .bind(&tr.from_address)
            .execute(&mut *tx)
            .await?;
        }

        // Insert receipts and logs
        for (idx, receipt) in data.receipts.iter().enumerate() {
            let rr = ReceiptRow::from_receipt(receipt, bn, to_i64(idx as u64));
            sqlx::query(
                "INSERT INTO receipts (block_number, tx_index, tx_type, success, cumulative_gas_used)
                 VALUES ($1, $2, $3, $4, $5)",
            )
            .bind(rr.block_number)
            .bind(rr.tx_index)
            .bind(rr.tx_type as i32)
            .bind(rr.success as i32)
            .bind(rr.cumulative_gas_used)
            .execute(&mut *tx)
            .await?;

            for (log_idx, log) in receipt.inner.logs.iter().enumerate() {
                let lr = LogRow::from_log(log, bn, to_i64(idx as u64), to_i64(log_idx as u64));
                sqlx::query(
                    "INSERT INTO logs (block_number, tx_index, log_index, address, topic0, topic1, topic2, topic3, data)
                     VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
                )
                .bind(lr.block_number)
                .bind(lr.tx_index)
                .bind(lr.log_index)
                .bind(&lr.address)
                .bind(&lr.topic0)
                .bind(&lr.topic1)
                .bind(&lr.topic2)
                .bind(&lr.topic3)
                .bind(&lr.data)
                .execute(&mut *tx)
                .await?;
            }
        }

        // Insert signet events
        for (idx, event) in data.signet_events.iter().enumerate() {
            let er = SignetEventRow::from_event(event, bn, to_i64(idx as u64));
            sqlx::query(
                "INSERT INTO signet_events (
                    block_number, event_index, event_type, order_index,
                    rollup_chain_id, sender, to_address, value, gas,
                    max_fee_per_gas, data, rollup_recipient, amount, token
                ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)",
            )
            .bind(er.block_number)
            .bind(er.event_index)
            .bind(er.event_type as i32)
            .bind(er.order_index)
            .bind(&er.rollup_chain_id)
            .bind(&er.sender)
            .bind(&er.to_address)
            .bind(&er.value)
            .bind(&er.gas)
            .bind(&er.max_fee_per_gas)
            .bind(&er.data)
            .bind(&er.rollup_recipient)
            .bind(&er.amount)
            .bind(&er.token)
            .execute(&mut *tx)
            .await?;
        }

        // Insert zenith header
        if let Some(zh) = &data.zenith_header {
            let zr = ZenithHeaderRow::from_zenith(zh, bn);
            sqlx::query(
                "INSERT INTO zenith_headers (
                    block_number, host_block_number, rollup_chain_id,
                    gas_limit, reward_address, block_data_hash
                ) VALUES ($1, $2, $3, $4, $5, $6)",
            )
            .bind(zr.block_number)
            .bind(&zr.host_block_number)
            .bind(&zr.rollup_chain_id)
            .bind(&zr.gas_limit)
            .bind(&zr.reward_address)
            .bind(&zr.block_data_hash)
            .execute(&mut *tx)
            .await?;
        }

        tx.commit().await?;
        Ok(())
    }
}

/// Convert a sqlx row to a TxRow.
fn row_to_tx_row(r: &sqlx::any::AnyRow) -> TxRow {
    TxRow {
        block_number: r.get("block_number"),
        tx_index: r.get("tx_index"),
        tx_hash: r.get("tx_hash"),
        tx_type: r.get::<i32, _>("tx_type") as i16,
        sig_y_parity: r.get::<i32, _>("sig_y_parity") != 0,
        sig_r: r.get("sig_r"),
        sig_s: r.get("sig_s"),
        chain_id: r.get("chain_id"),
        nonce: r.get("nonce"),
        gas_limit: r.get("gas_limit"),
        to_address: r.get("to_address"),
        value: r.get("value"),
        input: r.get("input"),
        gas_price: r.get("gas_price"),
        max_fee_per_gas: r.get("max_fee_per_gas"),
        max_priority_fee_per_gas: r.get("max_priority_fee_per_gas"),
        max_fee_per_blob_gas: r.get("max_fee_per_blob_gas"),
        blob_versioned_hashes: r.get("blob_versioned_hashes"),
        access_list: r.get("access_list"),
        authorization_list: r.get("authorization_list"),
        from_address: r.get("from_address"),
    }
}

fn row_to_signet_event_row(r: &sqlx::any::AnyRow) -> SignetEventRow {
    SignetEventRow {
        block_number: r.get("block_number"),
        event_index: r.get("event_index"),
        event_type: r.get::<i32, _>("event_type") as i16,
        order_index: r.get("order_index"),
        rollup_chain_id: r.get("rollup_chain_id"),
        sender: r.get("sender"),
        to_address: r.get("to_address"),
        value: r.get("value"),
        gas: r.get("gas"),
        max_fee_per_gas: r.get("max_fee_per_gas"),
        data: r.get("data"),
        rollup_recipient: r.get("rollup_recipient"),
        amount: r.get("amount"),
        token: r.get("token"),
    }
}

fn row_to_log_row(r: &sqlx::any::AnyRow) -> LogRow {
    LogRow {
        block_number: r.get("block_number"),
        tx_index: r.get("tx_index"),
        log_index: r.get("log_index"),
        address: r.get("address"),
        topic0: r.get("topic0"),
        topic1: r.get("topic1"),
        topic2: r.get("topic2"),
        topic3: r.get("topic3"),
        data: r.get("data"),
    }
}

fn row_to_zenith_header_row(r: &sqlx::any::AnyRow) -> ZenithHeaderRow {
    ZenithHeaderRow {
        block_number: r.get("block_number"),
        host_block_number: r.get("host_block_number"),
        rollup_chain_id: r.get("rollup_chain_id"),
        gas_limit: r.get("gas_limit"),
        reward_address: r.get("reward_address"),
        block_data_hash: r.get("block_data_hash"),
    }
}

impl ColdStorage for SqlColdBackend {
    async fn get_header(&self, spec: HeaderSpecifier) -> ColdResult<Option<SealedHeader>> {
        let Some(block_num) = self.resolve_header_spec(spec).await? else {
            return Ok(None);
        };
        self.fetch_header_by_number(block_num).await.map_err(ColdStorageError::from)
    }

    async fn get_headers(
        &self,
        specs: Vec<HeaderSpecifier>,
    ) -> ColdResult<Vec<Option<SealedHeader>>> {
        let mut results = Vec::with_capacity(specs.len());
        for spec in specs {
            let header = self.get_header(spec).await?;
            results.push(header);
        }
        Ok(results)
    }

    async fn get_transaction(
        &self,
        spec: TransactionSpecifier,
    ) -> ColdResult<Option<Confirmed<RecoveredTx>>> {
        let row = match spec {
            TransactionSpecifier::Hash(hash) => sqlx::query(
                "SELECT t.*, h.block_hash
                     FROM transactions t
                     JOIN headers h ON t.block_number = h.block_number
                     WHERE t.tx_hash = $1",
            )
            .bind(hash.as_slice())
            .fetch_optional(&self.pool)
            .await
            .map_err(SqlColdError::from)?,
            TransactionSpecifier::BlockAndIndex { block, index } => sqlx::query(
                "SELECT t.*, h.block_hash
                     FROM transactions t
                     JOIN headers h ON t.block_number = h.block_number
                     WHERE t.block_number = $1 AND t.tx_index = $2",
            )
            .bind(to_i64(block))
            .bind(to_i64(index))
            .fetch_optional(&self.pool)
            .await
            .map_err(SqlColdError::from)?,
            TransactionSpecifier::BlockHashAndIndex { block_hash, index } => sqlx::query(
                "SELECT t.*, h.block_hash
                     FROM transactions t
                     JOIN headers h ON t.block_number = h.block_number
                     WHERE h.block_hash = $1 AND t.tx_index = $2",
            )
            .bind(block_hash.as_slice())
            .bind(to_i64(index))
            .fetch_optional(&self.pool)
            .await
            .map_err(SqlColdError::from)?,
        };

        let Some(r) = row else {
            return Ok(None);
        };

        let tx_row = row_to_tx_row(&r);
        let sender = alloy::primitives::Address::from_slice(&tx_row.from_address);
        let tx = tx_row.into_tx().map_err(ColdStorageError::from)?;
        let block = from_i64(r.get::<i64, _>("block_number"));
        let index = from_i64(r.get::<i64, _>("tx_index"));
        let hash_bytes: Vec<u8> = r.get("block_hash");
        let block_hash = alloy::primitives::B256::from_slice(&hash_bytes);
        let meta = ConfirmationMeta::new(block, block_hash, index);
        // SAFETY: the sender was recovered at append time and stored in from_address.
        let recovered = Recovered::new_unchecked(tx, sender);
        Ok(Some(Confirmed::new(recovered, meta)))
    }

    async fn get_transactions_in_block(&self, block: BlockNumber) -> ColdResult<Vec<RecoveredTx>> {
        let bn = to_i64(block);
        let rows =
            sqlx::query("SELECT * FROM transactions WHERE block_number = $1 ORDER BY tx_index")
                .bind(bn)
                .fetch_all(&self.pool)
                .await
                .map_err(SqlColdError::from)?;

        rows.into_iter()
            .map(|r| {
                let tx_row = row_to_tx_row(&r);
                let sender = alloy::primitives::Address::from_slice(&tx_row.from_address);
                let tx = tx_row.into_tx().map_err(ColdStorageError::from)?;
                // SAFETY: the sender was recovered at append time.
                Ok(Recovered::new_unchecked(tx, sender))
            })
            .collect()
    }

    async fn get_transaction_count(&self, block: BlockNumber) -> ColdResult<u64> {
        let bn = to_i64(block);
        let row = sqlx::query("SELECT COUNT(*) as cnt FROM transactions WHERE block_number = $1")
            .bind(bn)
            .fetch_one(&self.pool)
            .await
            .map_err(SqlColdError::from)?;

        Ok(from_i64(row.get::<i64, _>("cnt")))
    }

    async fn get_receipt(&self, spec: ReceiptSpecifier) -> ColdResult<Option<ColdReceipt>> {
        // Resolve to (block, index)
        let (block, index) = match spec {
            ReceiptSpecifier::TxHash(hash) => {
                let row = sqlx::query(
                    "SELECT block_number, tx_index FROM transactions WHERE tx_hash = $1",
                )
                .bind(hash.as_slice())
                .fetch_optional(&self.pool)
                .await
                .map_err(SqlColdError::from)?;
                let Some(r) = row else { return Ok(None) };
                (from_i64(r.get::<i64, _>("block_number")), from_i64(r.get::<i64, _>("tx_index")))
            }
            ReceiptSpecifier::BlockAndIndex { block, index } => (block, index),
        };

        let Some(header) = self.fetch_header_by_number(block).await? else {
            return Ok(None);
        };

        // Fetch receipt + tx_hash + from_address
        let receipt_row = sqlx::query(
            "SELECT r.*, t.tx_hash, t.from_address
             FROM receipts r
             JOIN transactions t ON r.block_number = t.block_number AND r.tx_index = t.tx_index
             WHERE r.block_number = $1 AND r.tx_index = $2",
        )
        .bind(to_i64(block))
        .bind(to_i64(index))
        .fetch_optional(&self.pool)
        .await
        .map_err(SqlColdError::from)?;

        let Some(rr) = receipt_row else {
            return Ok(None);
        };

        let bn: i64 = rr.get("block_number");
        let tx_idx: i64 = rr.get("tx_index");
        let tx_hash_bytes: Vec<u8> = rr.get("tx_hash");
        let tx_hash = alloy::primitives::B256::from_slice(&tx_hash_bytes);
        let from_bytes: Vec<u8> = rr.get("from_address");
        let sender = alloy::primitives::Address::from_slice(&from_bytes);

        let receipt = ReceiptRow {
            block_number: bn,
            tx_index: tx_idx,
            tx_type: rr.get::<i32, _>("tx_type") as i16,
            success: rr.get::<i32, _>("success") != 0,
            cumulative_gas_used: rr.get("cumulative_gas_used"),
        };

        let log_rows = sqlx::query(
            "SELECT * FROM logs WHERE block_number = $1 AND tx_index = $2 ORDER BY log_index",
        )
        .bind(bn)
        .bind(tx_idx)
        .fetch_all(&self.pool)
        .await
        .map_err(SqlColdError::from)?;

        let logs: Vec<_> = log_rows.into_iter().map(|r| row_to_log_row(&r)).collect();

        let built = receipt_from_rows(receipt, logs).map_err(ColdStorageError::from)?;

        // Compute gas_used and first_log_index by querying prior receipts
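        // As a hypothetical example: if the block's receipts report cumulative
        // gas of 21_000, 63_000, and 90_000, the receipt at index 2 has
        // gas_used = 90_000 - 63_000 = 27_000, and its first_log_index is the
        // total number of logs emitted by transactions 0 and 1.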
        let prior = sqlx::query(
            "SELECT CAST(SUM(
                (SELECT COUNT(*) FROM logs l WHERE l.block_number = $1 AND l.tx_index = r.tx_index)
             ) AS bigint) as log_count,
             CAST(MAX(r.cumulative_gas_used) AS bigint) as prior_gas
             FROM receipts r WHERE r.block_number = $1 AND r.tx_index < $2",
        )
        .bind(to_i64(block))
        .bind(to_i64(index))
        .fetch_one(&self.pool)
        .await
        .map_err(SqlColdError::from)?;

        let first_log_index: u64 = prior.get::<Option<i64>, _>("log_count").unwrap_or(0) as u64;
        let prior_cumulative_gas: u64 =
            prior.get::<Option<i64>, _>("prior_gas").unwrap_or(0) as u64;
        let gas_used = built.inner.cumulative_gas_used - prior_cumulative_gas;

        let ir = IndexedReceipt { receipt: built, tx_hash, first_log_index, gas_used, sender };
        Ok(Some(ColdReceipt::new(ir, &header, index)))
    }

    async fn get_receipts_in_block(&self, block: BlockNumber) -> ColdResult<Vec<ColdReceipt>> {
        let Some(header) =
            self.fetch_header_by_number(block).await.map_err(ColdStorageError::from)?
        else {
            return Ok(Vec::new());
        };

        let bn = to_i64(block);

        // Fetch receipts joined with tx_hash and from_address
        let receipt_rows = sqlx::query(
            "SELECT r.*, t.tx_hash, t.from_address
             FROM receipts r
             JOIN transactions t ON r.block_number = t.block_number AND r.tx_index = t.tx_index
             WHERE r.block_number = $1
             ORDER BY r.tx_index",
        )
        .bind(bn)
        .fetch_all(&self.pool)
        .await
        .map_err(SqlColdError::from)?;

        let all_log_rows =
            sqlx::query("SELECT * FROM logs WHERE block_number = $1 ORDER BY tx_index, log_index")
                .bind(bn)
                .fetch_all(&self.pool)
                .await
                .map_err(SqlColdError::from)?;

        // Group logs by tx_index
        let mut logs_by_tx: std::collections::BTreeMap<i64, Vec<LogRow>> =
            std::collections::BTreeMap::new();
        for r in all_log_rows {
            logs_by_tx.entry(r.get::<i64, _>("tx_index")).or_default().push(row_to_log_row(&r));
        }

        let mut first_log_index = 0u64;
        let mut prior_cumulative_gas = 0u64;
        receipt_rows
            .into_iter()
            .enumerate()
            .map(|(idx, rr)| {
                let tx_idx: i64 = rr.get("tx_index");
                let tx_hash_bytes: Vec<u8> = rr.get("tx_hash");
                let tx_hash = alloy::primitives::B256::from_slice(&tx_hash_bytes);
                let from_bytes: Vec<u8> = rr.get("from_address");
                let sender = alloy::primitives::Address::from_slice(&from_bytes);
                let receipt_row = ReceiptRow {
                    block_number: rr.get("block_number"),
                    tx_index: tx_idx,
                    tx_type: rr.get::<i32, _>("tx_type") as i16,
                    success: rr.get::<i32, _>("success") != 0,
                    cumulative_gas_used: rr.get("cumulative_gas_used"),
                };
                let logs = logs_by_tx.remove(&tx_idx).unwrap_or_default();
                let receipt =
                    receipt_from_rows(receipt_row, logs).map_err(ColdStorageError::from)?;
                let gas_used = receipt.inner.cumulative_gas_used - prior_cumulative_gas;
                prior_cumulative_gas = receipt.inner.cumulative_gas_used;
                let ir = IndexedReceipt { receipt, tx_hash, first_log_index, gas_used, sender };
                first_log_index += ir.receipt.inner.logs.len() as u64;
                Ok(ColdReceipt::new(ir, &header, idx as u64))
            })
            .collect()
    }

    async fn get_signet_events(
        &self,
        spec: SignetEventsSpecifier,
    ) -> ColdResult<Vec<DbSignetEvent>> {
        let rows = match spec {
            SignetEventsSpecifier::Block(block) => {
                let bn = to_i64(block);
                sqlx::query(
                    "SELECT * FROM signet_events WHERE block_number = $1 ORDER BY event_index",
                )
                .bind(bn)
                .fetch_all(&self.pool)
                .await
                .map_err(SqlColdError::from)?
            }
            SignetEventsSpecifier::BlockRange { start, end } => {
                let s = to_i64(start);
                let e = to_i64(end);
                sqlx::query(
                    "SELECT * FROM signet_events WHERE block_number >= $1 AND block_number <= $2
                     ORDER BY block_number, event_index",
                )
                .bind(s)
                .bind(e)
                .fetch_all(&self.pool)
                .await
                .map_err(SqlColdError::from)?
            }
        };

        rows.into_iter()
            .map(|r| row_to_signet_event_row(&r).into_event().map_err(ColdStorageError::from))
            .collect()
    }

    async fn get_zenith_header(
        &self,
        spec: ZenithHeaderSpecifier,
    ) -> ColdResult<Option<DbZenithHeader>> {
        let block = match spec {
            ZenithHeaderSpecifier::Number(n) => n,
            ZenithHeaderSpecifier::Range { start, .. } => start,
        };
        let bn = to_i64(block);
        let row = sqlx::query("SELECT * FROM zenith_headers WHERE block_number = $1")
            .bind(bn)
            .fetch_optional(&self.pool)
            .await
            .map_err(SqlColdError::from)?;

        row.map(|r| row_to_zenith_header_row(&r).into_zenith().map_err(ColdStorageError::from))
            .transpose()
    }

    async fn get_zenith_headers(
        &self,
        spec: ZenithHeaderSpecifier,
    ) -> ColdResult<Vec<DbZenithHeader>> {
        let rows = match spec {
            ZenithHeaderSpecifier::Number(n) => {
                let bn = to_i64(n);
                sqlx::query("SELECT * FROM zenith_headers WHERE block_number = $1")
                    .bind(bn)
                    .fetch_all(&self.pool)
                    .await
                    .map_err(SqlColdError::from)?
            }
            ZenithHeaderSpecifier::Range { start, end } => {
                let s = to_i64(start);
                let e = to_i64(end);
                sqlx::query(
                    "SELECT * FROM zenith_headers WHERE block_number >= $1 AND block_number <= $2
                     ORDER BY block_number",
                )
                .bind(s)
                .bind(e)
                .fetch_all(&self.pool)
                .await
                .map_err(SqlColdError::from)?
            }
        };

        rows.into_iter()
            .map(|r| row_to_zenith_header_row(&r).into_zenith().map_err(ColdStorageError::from))
            .collect()
    }

    async fn get_logs(&self, filter: Filter) -> ColdResult<Vec<RpcLog>> {
        let from = filter.get_from_block().unwrap_or(0);
        let to = filter.get_to_block().unwrap_or(u64::MAX);

        // Build dynamic SQL with positional $N placeholders.
        // The correlated subquery computes block_log_index: the absolute
        // position of each log among all logs in its block, leveraging the
        // PK index on (block_number, tx_index, log_index).
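        // For example, if the block's first transaction emitted two logs, the
        // first log of the second transaction gets block_log_index = 2.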
        let mut sql = String::from(
            "SELECT l.*, h.block_hash, h.timestamp AS block_timestamp, t.tx_hash, \
               (SELECT COUNT(*) FROM logs l2 \
                WHERE l2.block_number = l.block_number \
                  AND (l2.tx_index < l.tx_index \
                       OR (l2.tx_index = l.tx_index AND l2.log_index < l.log_index)) \
               ) AS block_log_index \
             FROM logs l \
             JOIN headers h ON l.block_number = h.block_number \
             JOIN transactions t ON l.block_number = t.block_number \
               AND l.tx_index = t.tx_index \
             WHERE l.block_number >= $1 AND l.block_number <= $2",
        );
        let mut params: Vec<Vec<u8>> = Vec::new();
        let mut idx = 3u32;

        // Address filter
        if !filter.address.is_empty() {
            let addrs: Vec<_> = filter.address.iter().collect();
            if addrs.len() == 1 {
                sql.push_str(&format!(" AND l.address = ${idx}"));
                params.push(addrs[0].as_slice().to_vec());
                idx += 1;
            } else {
                let placeholders: String = addrs
                    .iter()
                    .enumerate()
                    .map(|(i, _)| format!("${}", idx + i as u32))
                    .collect::<Vec<_>>()
                    .join(", ");
                sql.push_str(&format!(" AND l.address IN ({placeholders})"));
                for addr in &addrs {
                    params.push(addr.as_slice().to_vec());
                }
                idx += addrs.len() as u32;
            }
        }

        // Topic filters
        let topic_cols = ["l.topic0", "l.topic1", "l.topic2", "l.topic3"];
        for (i, topic_filter) in filter.topics.iter().enumerate() {
            if topic_filter.is_empty() {
                continue;
            }
            let values: Vec<_> = topic_filter.iter().collect();
            if values.len() == 1 {
                sql.push_str(&format!(" AND {} = ${idx}", topic_cols[i]));
                params.push(values[0].as_slice().to_vec());
                idx += 1;
            } else {
                let placeholders: String = values
                    .iter()
                    .enumerate()
                    .map(|(j, _)| format!("${}", idx + j as u32))
                    .collect::<Vec<_>>()
                    .join(", ");
                sql.push_str(&format!(" AND {} IN ({placeholders})", topic_cols[i]));
                for v in &values {
                    params.push(v.as_slice().to_vec());
                }
                idx += values.len() as u32;
            }
        }
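
        // As an illustration, a filter with a single address and a single
        // topic0 value extends the base query with
        // " AND l.address = $3 AND l.topic0 = $4" before the ORDER BY below.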

        sql.push_str(" ORDER BY l.block_number, l.tx_index, l.log_index");

        // Bind parameters and execute.
        let mut query = sqlx::query(&sql).bind(to_i64(from)).bind(to_i64(to));
        for param in &params {
            query = query.bind(param.as_slice());
        }

        let rows = query.fetch_all(&self.pool).await.map_err(SqlColdError::from)?;

        rows.into_iter()
            .map(|r| {
                let log = row_to_log_row(&r).into_log();
                let block_number = from_i64(r.get::<i64, _>("block_number"));
                let block_hash_bytes: Vec<u8> = r.get("block_hash");
                let tx_hash_bytes: Vec<u8> = r.get("tx_hash");
                Ok(RpcLog {
                    inner: log,
                    block_hash: Some(alloy::primitives::B256::from_slice(&block_hash_bytes)),
                    block_number: Some(block_number),
                    block_timestamp: Some(from_i64(r.get::<i64, _>("block_timestamp"))),
                    transaction_hash: Some(alloy::primitives::B256::from_slice(&tx_hash_bytes)),
                    transaction_index: Some(from_i64(r.get::<i64, _>("tx_index"))),
                    log_index: Some(from_i64(r.get::<i64, _>("block_log_index"))),
                    removed: false,
                })
            })
            .collect::<ColdResult<Vec<_>>>()
    }

    async fn get_latest_block(&self) -> ColdResult<Option<BlockNumber>> {
        let row = sqlx::query("SELECT MAX(block_number) as max_bn FROM headers")
            .fetch_one(&self.pool)
            .await
            .map_err(SqlColdError::from)?;
        Ok(row.get::<Option<i64>, _>("max_bn").map(from_i64))
    }

    async fn append_block(&self, data: BlockData) -> ColdResult<()> {
        self.insert_block(data).await.map_err(ColdStorageError::from)
    }

    async fn append_blocks(&self, data: Vec<BlockData>) -> ColdResult<()> {
        for block_data in data {
            self.insert_block(block_data).await?;
        }
        Ok(())
    }

    async fn truncate_above(&self, block: BlockNumber) -> ColdResult<()> {
        let bn = to_i64(block);
        let mut tx = self.pool.begin().await.map_err(SqlColdError::from)?;

        sqlx::query("DELETE FROM logs WHERE block_number > $1")
            .bind(bn)
            .execute(&mut *tx)
            .await
            .map_err(SqlColdError::from)?;
        sqlx::query("DELETE FROM transactions WHERE block_number > $1")
            .bind(bn)
            .execute(&mut *tx)
            .await
            .map_err(SqlColdError::from)?;
        sqlx::query("DELETE FROM receipts WHERE block_number > $1")
            .bind(bn)
            .execute(&mut *tx)
            .await
            .map_err(SqlColdError::from)?;
        sqlx::query("DELETE FROM signet_events WHERE block_number > $1")
            .bind(bn)
            .execute(&mut *tx)
            .await
            .map_err(SqlColdError::from)?;
        sqlx::query("DELETE FROM zenith_headers WHERE block_number > $1")
            .bind(bn)
            .execute(&mut *tx)
            .await
            .map_err(SqlColdError::from)?;
        sqlx::query("DELETE FROM headers WHERE block_number > $1")
            .bind(bn)
            .execute(&mut *tx)
            .await
            .map_err(SqlColdError::from)?;

        tx.commit().await.map_err(SqlColdError::from)?;
        Ok(())
    }
}

#[cfg(all(test, feature = "test-utils"))]
mod tests {
    use super::*;
    use signet_cold::conformance::conformance;

    #[tokio::test]
    async fn sqlite_conformance() {
        let backend = SqlColdBackend::connect("sqlite::memory:").await.unwrap();
        conformance(&backend).await.unwrap();
    }

    #[tokio::test]
    async fn pg_conformance() {
        let Ok(url) = std::env::var("DATABASE_URL") else {
            eprintln!("skipping pg conformance: DATABASE_URL not set");
            return;
        };
        let backend = SqlColdBackend::connect(&url).await.unwrap();
        conformance(&backend).await.unwrap();
    }
}
938}