Skip to main content

signet_cold_sql/
backend.rs

1//! Unified SQL backend for cold storage.
2//!
3//! Supports both PostgreSQL and SQLite via [`sqlx::Any`]. The backend
4//! auto-detects the database type at construction time and runs the
5//! appropriate migration.
6
7use crate::SqlColdError;
8use crate::columns::{
9    COL_ACCESS_LIST, COL_ADDRESS, COL_AMOUNT, COL_AUTHORIZATION_LIST, COL_BASE_FEE_PER_GAS,
10    COL_BENEFICIARY, COL_BLOB_GAS_USED, COL_BLOB_VERSIONED_HASHES, COL_BLOCK_DATA_HASH,
11    COL_BLOCK_HASH, COL_BLOCK_LOG_INDEX, COL_BLOCK_NUMBER, COL_BLOCK_TIMESTAMP, COL_CHAIN_ID,
12    COL_CNT, COL_CUMULATIVE_GAS_USED, COL_DATA, COL_DIFFICULTY, COL_EVENT_TYPE,
13    COL_EXCESS_BLOB_GAS, COL_EXTRA_DATA, COL_FIRST_LOG_INDEX, COL_FROM_ADDRESS, COL_GAS,
14    COL_GAS_LIMIT, COL_GAS_PRICE, COL_GAS_USED, COL_HOST_BLOCK_NUMBER, COL_INPUT, COL_LOGS_BLOOM,
15    COL_MAX_BN, COL_MAX_FEE_PER_BLOB_GAS, COL_MAX_FEE_PER_GAS, COL_MAX_PRIORITY_FEE_PER_GAS,
16    COL_MIX_HASH, COL_NONCE, COL_OMMERS_HASH, COL_ORDER_INDEX, COL_PARENT_BEACON_BLOCK_ROOT,
17    COL_PARENT_HASH, COL_PRIOR_GAS, COL_R_CUMULATIVE_GAS_USED, COL_R_FIRST_LOG_INDEX,
18    COL_R_FROM_ADDRESS, COL_R_SUCCESS, COL_R_TX_HASH, COL_R_TX_TYPE, COL_RECEIPTS_ROOT,
19    COL_REQUESTS_HASH, COL_REWARD_ADDRESS, COL_ROLLUP_CHAIN_ID, COL_ROLLUP_RECIPIENT, COL_SENDER,
20    COL_SIG_R, COL_SIG_S, COL_SIG_Y_PARITY, COL_STATE_ROOT, COL_SUCCESS, COL_TIMESTAMP,
21    COL_TO_ADDRESS, COL_TOKEN, COL_TOPIC0, COL_TOPIC1, COL_TOPIC2, COL_TOPIC3,
22    COL_TRANSACTIONS_ROOT, COL_TX_HASH, COL_TX_INDEX, COL_TX_TYPE, COL_VALUE, COL_WITHDRAWALS_ROOT,
23};
24use crate::convert::{
25    EVENT_ENTER, EVENT_ENTER_TOKEN, EVENT_TRANSACT, build_receipt, decode_access_list_or_empty,
26    decode_authorization_list, decode_b256_vec, decode_u128_required, encode_access_list,
27    encode_authorization_list, encode_b256_vec, encode_u128, from_i64, to_i64,
28};
29use alloy::{
30    consensus::{
31        Header, Signed, TxEip1559, TxEip2930, TxEip4844, TxEip7702, TxLegacy, TxType,
32        transaction::Recovered,
33    },
34    primitives::{
35        Address, B256, BlockNumber, Bloom, Bytes, Log, LogData, Sealable, Signature, TxKind, U256,
36    },
37};
38use signet_cold::{
39    BlockData, ColdReceipt, ColdResult, ColdStorageBackend, ColdStorageError, ColdStorageRead,
40    ColdStorageWrite, Confirmed, Filter, HeaderSpecifier, ReceiptSpecifier, RpcLog,
41    SignetEventsSpecifier, TransactionSpecifier, ZenithHeaderSpecifier,
42};
43use signet_storage_types::{
44    ConfirmationMeta, DbSignetEvent, DbZenithHeader, IndexedReceipt, RecoveredTx, SealedHeader,
45    TransactionSigned,
46};
47use signet_zenith::{
48    Passage::{Enter, EnterToken},
49    Transactor::Transact,
50    Zenith,
51};
52use sqlx::{AnyPool, Row};
53use std::{collections::BTreeMap, time::Duration};
54
55/// Default statement timeout for read transactions (matches MDBX).
56pub(crate) const DEFAULT_READ_TIMEOUT: Duration = Duration::from_millis(500);
57/// Default statement timeout for write transactions (matches MDBX).
58pub(crate) const DEFAULT_WRITE_TIMEOUT: Duration = Duration::from_secs(2);
59
60/// SQL-based cold storage backend.
61///
62/// Uses [`sqlx::Any`] for database-agnostic access, supporting both
63/// PostgreSQL and SQLite through a single implementation. The backend
64/// is determined by the connection URL at construction time.
65///
66/// # Example
67///
68/// ```no_run
69/// # async fn example() {
70/// use signet_cold_sql::SqlColdBackend;
71///
72/// // SQLite (in-memory)
73/// let backend = SqlColdBackend::connect("sqlite::memory:").await.unwrap();
74///
75/// // PostgreSQL
76/// let backend = SqlColdBackend::connect("postgres://localhost/signet").await.unwrap();
77/// # }
78/// ```
79#[derive(Debug, Clone)]
80pub struct SqlColdBackend {
81    pool: AnyPool,
82    is_postgres: bool,
83    read_timeout: Duration,
84    write_timeout: Duration,
85}
86
87/// Returns `true` if the URL refers to an in-memory SQLite database.
88///
89/// In-memory SQLite databases are per-connection, so the pool must be
90/// limited to a single connection to ensure all queries see the same
91/// state.
92fn is_in_memory_sqlite(url: &str) -> bool {
93    url == "sqlite::memory:" || url.contains("mode=memory")
94}
95
96/// Delete all rows with `block_number > bn` from every table, in
97/// child-first order to preserve foreign-key constraints.
98async fn delete_above_in_tx(
99    tx: &mut sqlx::Transaction<'_, sqlx::Any>,
100    bn: i64,
101) -> Result<(), SqlColdError> {
102    for table in ["logs", "transactions", "receipts", "signet_events", "zenith_headers", "headers"]
103    {
104        sqlx::query(&format!("DELETE FROM {table} WHERE block_number > $1"))
105            .bind(bn)
106            .execute(&mut **tx)
107            .await?;
108    }
109    Ok(())
110}
111
112impl SqlColdBackend {
113    /// Create a new SQL cold storage backend from an existing [`AnyPool`].
114    ///
115    /// Auto-detects the database backend and creates all tables if they
116    /// do not already exist. Callers must ensure
117    /// [`sqlx::any::install_default_drivers`] has been called before
118    /// constructing the pool.
119    pub async fn new(pool: AnyPool) -> Result<Self, SqlColdError> {
120        // Detect backend from a pooled connection.
121        let conn = pool.acquire().await?;
122        let backend = conn.backend_name();
123
124        let (migration, is_postgres) = match backend {
125            "PostgreSQL" => (include_str!("../migrations/001_initial_pg.sql"), true),
126            "SQLite" => (include_str!("../migrations/001_initial.sql"), false),
127            other => {
128                return Err(SqlColdError::Convert(format!(
129                    "unsupported database backend: {other}"
130                )));
131            }
132        };
133        drop(conn);
134        // Execute via pool to ensure the migration uses the same
135        // connection that subsequent queries will use.
136        sqlx::raw_sql(migration).execute(&pool).await?;
137        sqlx::raw_sql(include_str!("../migrations/002_add_topic_indexes.sql"))
138            .execute(&pool)
139            .await?;
140        Ok(Self {
141            pool,
142            is_postgres,
143            read_timeout: DEFAULT_READ_TIMEOUT,
144            write_timeout: DEFAULT_WRITE_TIMEOUT,
145        })
146    }
147
148    /// Override the per-transaction read timeout (default 500 ms).
149    ///
150    /// On Postgres this sets `statement_timeout` on every transaction
151    /// opened by a read method. On SQLite the value is stored but
152    /// not enforced — SQLite has no equivalent mechanism.
153    ///
154    /// # Panics
155    ///
156    /// Panics if `d` rounds to 0 ms. Postgres interprets
157    /// `statement_timeout = 0` as "no timeout", which would silently
158    /// disable the trait-level mandatory-timeout contract.
159    #[must_use]
160    pub fn with_read_timeout(mut self, d: Duration) -> Self {
161        assert!(d.as_millis() >= 1, "read_timeout must be >= 1ms (got {d:?})");
162        self.read_timeout = d;
163        self
164    }
165
166    /// Override the per-transaction write timeout (default 2 s).
167    ///
168    /// On Postgres this sets `statement_timeout` on every transaction
169    /// opened by a write method. On SQLite the value is stored but
170    /// not enforced — SQLite has no equivalent mechanism.
171    ///
172    /// # Panics
173    ///
174    /// Panics if `d` rounds to 0 ms. See [`with_read_timeout`](Self::with_read_timeout).
175    #[must_use]
176    pub fn with_write_timeout(mut self, d: Duration) -> Self {
177        assert!(d.as_millis() >= 1, "write_timeout must be >= 1ms (got {d:?})");
178        self.write_timeout = d;
179        self
180    }
181
182    /// Open a transaction for a read operation.
183    ///
184    /// On Postgres, issues `SET LOCAL statement_timeout = <ms>` scoped
185    /// to the transaction. On SQLite the timeout is advisory (there is
186    /// no equivalent mechanism).
187    async fn begin_read(&self) -> Result<sqlx::Transaction<'_, sqlx::Any>, SqlColdError> {
188        let tx = self.pool.begin().await?;
189        self.apply_statement_timeout(tx, self.read_timeout).await
190    }
191
192    /// Open a transaction for a write operation.
193    ///
194    /// Same semantics as [`begin_read`](Self::begin_read) but uses
195    /// `write_timeout` for the `SET LOCAL`.
196    async fn begin_write(&self) -> Result<sqlx::Transaction<'_, sqlx::Any>, SqlColdError> {
197        let tx = self.pool.begin().await?;
198        self.apply_statement_timeout(tx, self.write_timeout).await
199    }
200
201    /// Run `SELECT pg_sleep($1)` inside a `begin_read` transaction.
202    ///
203    /// Intended only for verifying the `statement_timeout` behaviour in
204    /// integration tests. Returns whatever error the server produces if
205    /// the sleep trips the configured read timeout.
206    #[cfg(any(test, feature = "test-utils"))]
207    pub async fn debug_pg_sleep(&self, d: Duration) -> Result<(), SqlColdError> {
208        let secs = d.as_secs_f64();
209        let mut tx = self.begin_read().await?;
210        sqlx::query(&format!("SELECT pg_sleep({secs})")).execute(&mut *tx).await?;
211        tx.commit().await?;
212        Ok(())
213    }
214
215    /// Apply `SET LOCAL statement_timeout` on Postgres; no-op on SQLite.
216    ///
217    /// `SET` does not accept bind parameters in Postgres the same way
218    /// `SELECT` does, so the value is inline-formatted. This is SAFE
219    /// because `ms` is derived from an internal `Duration` field, never
220    /// from user input.
221    async fn apply_statement_timeout<'a>(
222        &self,
223        mut tx: sqlx::Transaction<'a, sqlx::Any>,
224        d: Duration,
225    ) -> Result<sqlx::Transaction<'a, sqlx::Any>, SqlColdError> {
226        if self.is_postgres {
227            let ms = i64::try_from(d.as_millis()).unwrap_or(i64::MAX);
228            sqlx::query(&format!("SET LOCAL statement_timeout = {ms}")).execute(&mut *tx).await?;
229        }
230        Ok(tx)
231    }
232
233    /// Connect to a database URL with explicit pool options.
234    ///
235    /// Installs the default sqlx drivers on the first call. The database
236    /// type is inferred from the URL scheme (`sqlite:` or `postgres:`).
237    ///
238    /// For in-memory SQLite URLs (`sqlite::memory:` or `mode=memory`),
239    /// `max_connections` is forced to 1 regardless of the provided
240    /// options, because in-memory databases are per-connection.
241    pub async fn connect_with(
242        url: &str,
243        pool_opts: sqlx::pool::PoolOptions<sqlx::Any>,
244    ) -> Result<Self, SqlColdError> {
245        sqlx::any::install_default_drivers();
246        let pool_opts =
247            if is_in_memory_sqlite(url) { pool_opts.max_connections(1) } else { pool_opts };
248        let pool: AnyPool = pool_opts.connect(url).await?;
249        Self::new(pool).await
250    }
251
252    /// Connect to a database URL with default pool settings.
253    ///
254    /// Convenience wrapper around [`connect_with`](Self::connect_with).
255    pub async fn connect(url: &str) -> Result<Self, SqlColdError> {
256        Self::connect_with(url, sqlx::pool::PoolOptions::new()).await
257    }
258
259    // ========================================================================
260    // Specifier resolution
261    // ========================================================================
262
263    async fn resolve_header_spec(
264        &self,
265        spec: HeaderSpecifier,
266    ) -> Result<Option<BlockNumber>, SqlColdError> {
267        match spec {
268            HeaderSpecifier::Number(n) => Ok(Some(n)),
269            HeaderSpecifier::Hash(hash) => {
270                let mut tx = self.begin_read().await?;
271                let row = sqlx::query("SELECT block_number FROM headers WHERE block_hash = $1")
272                    .bind(hash)
273                    .fetch_optional(&mut *tx)
274                    .await?;
275                tx.commit().await?;
276                Ok(row.map(|r| from_i64(r.get::<i64, _>(COL_BLOCK_NUMBER))))
277            }
278        }
279    }
280
281    // ========================================================================
282    // Read helpers
283    // ========================================================================
284
285    async fn fetch_header_by_number(
286        &self,
287        block_num: BlockNumber,
288    ) -> Result<Option<SealedHeader>, SqlColdError> {
289        let bn = to_i64(block_num);
290        let mut tx = self.begin_read().await?;
291        let row = sqlx::query("SELECT * FROM headers WHERE block_number = $1")
292            .bind(bn)
293            .fetch_optional(&mut *tx)
294            .await?;
295        tx.commit().await?;
296
297        row.map(|r| header_from_row(&r).map(|h| h.seal_slow())).transpose()
298    }
299
300    // ========================================================================
301    // Write helpers
302    // ========================================================================
303
304    async fn insert_block(&self, data: BlockData) -> Result<(), SqlColdError> {
305        let mut tx = self.begin_write().await?;
306        write_block_to_tx(&mut tx, data).await?;
307        tx.commit().await?;
308        Ok(())
309    }
310
311    // ========================================================================
312    // Streaming helpers
313    // ========================================================================
314
315    /// Stream logs using a PostgreSQL REPEATABLE READ transaction.
316    ///
317    /// The transaction provides a consistent snapshot across all per-block
318    /// queries, eliminating the need for anchor-hash reorg detection.
319    /// Rows are streamed individually rather than materialised per block.
320    #[cfg(feature = "postgres")]
321    async fn produce_log_stream_pg(&self, filter: &Filter, params: signet_cold::StreamParams) {
322        use tokio_stream::StreamExt;
323
324        /// Unwrap a `Result` or send the error through the stream and return.
325        macro_rules! try_stream {
326            ($sender:expr, $expr:expr) => {
327                match $expr {
328                    Ok(v) => v,
329                    Err(e) => {
330                        let _ = $sender
331                            .send(Err(ColdStorageError::backend(SqlColdError::from(e))))
332                            .await;
333                        return;
334                    }
335                }
336            };
337        }
338
339        let signet_cold::StreamParams { from, to, max_logs, sender, deadline } = params;
340
341        // Open a REPEATABLE READ transaction so all per-block queries see a
342        // consistent snapshot. This makes reorg detection unnecessary — if a
343        // reorg lands mid-stream the transaction still reads the old data.
344        // Isolation level must be the first statement in the transaction,
345        // so we bypass `begin_read`'s `SET LOCAL statement_timeout` here
346        // and issue both settings ourselves in the correct order.
347        let mut tx = try_stream!(sender, self.pool.begin().await);
348        try_stream!(
349            sender,
350            sqlx::query("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ").execute(&mut *tx).await
351        );
352        let read_ms = i64::try_from(self.read_timeout.as_millis()).unwrap_or(i64::MAX);
353        try_stream!(
354            sender,
355            sqlx::query(&format!("SET LOCAL statement_timeout = {read_ms}"))
356                .execute(&mut *tx)
357                .await
358        );
359
360        // Build the parameterised query once. $1 is the block number
361        // (bound per iteration); remaining parameters are the address
362        // and topic filters from the user's request.
363        //
364        // ENG-2036: format!() allocates here, but the dynamic filter clause
365        // makes this unavoidable. sqlx's prepared-statement cache mitigates
366        // repeated execution cost.
367        let (filter_clause, filter_params) = build_log_filter_clause(filter, 2);
368        let data_sql = format!(
369            "SELECT l.*, h.block_hash, h.timestamp AS block_timestamp, t.tx_hash, \
370               (r.first_log_index + l.log_index) AS block_log_index \
371             FROM logs l \
372             JOIN headers h ON l.block_number = h.block_number \
373             JOIN transactions t ON l.block_number = t.block_number \
374               AND l.tx_index = t.tx_index \
375             JOIN receipts r ON l.block_number = r.block_number \
376               AND l.tx_index = r.tx_index \
377             WHERE l.block_number = $1{filter_clause} \
378             ORDER BY l.tx_index, l.log_index"
379        );
380
381        let mut total = 0usize;
382
383        // Walk through blocks one at a time, streaming matching log rows
384        // from each block directly to the channel.
385        for block_num in from..=to {
386            // Check the deadline before starting each block so we
387            // don't begin a new query after the caller's timeout.
388            if tokio::time::Instant::now() > deadline {
389                let _ = sender.send(Err(ColdStorageError::StreamDeadlineExceeded)).await;
390                return;
391            }
392
393            let mut query = sqlx::query(&data_sql).bind(to_i64(block_num));
394            for param in &filter_params {
395                query = query.bind(*param);
396            }
397
398            // Stream rows from this block's query. Each row is converted
399            // to an RpcLog and sent over the channel immediately rather
400            // than being collected into a Vec first.
401            let mut stream = query.fetch(&mut *tx);
402            while let Some(row_result) = stream.next().await {
403                let r = try_stream!(sender, row_result);
404
405                // Enforce the global log limit across all blocks.
406                total += 1;
407                if total > max_logs {
408                    let _ =
409                        sender.send(Err(ColdStorageError::TooManyLogs { limit: max_logs })).await;
410                    return;
411                }
412
413                let log = log_from_row(&r);
414                let rpc_log = RpcLog {
415                    inner: log,
416                    block_hash: Some(r.get(COL_BLOCK_HASH)),
417                    block_number: Some(from_i64(r.get::<i64, _>(COL_BLOCK_NUMBER))),
418                    block_timestamp: Some(from_i64(r.get::<i64, _>(COL_BLOCK_TIMESTAMP))),
419                    transaction_hash: Some(r.get(COL_TX_HASH)),
420                    transaction_index: Some(from_i64(r.get::<i64, _>(COL_TX_INDEX))),
421                    log_index: Some(from_i64(r.get::<i64, _>(COL_BLOCK_LOG_INDEX))),
422                    removed: false,
423                };
424                // Send the log to the caller. The timeout ensures we
425                // stop if the deadline passes while back-pressured.
426                match tokio::time::timeout_at(deadline, sender.send(Ok(rpc_log))).await {
427                    Ok(Ok(())) => {}
428                    Ok(Err(_)) => return, // receiver dropped
429                    Err(_) => {
430                        let _ = sender.send(Err(ColdStorageError::StreamDeadlineExceeded)).await;
431                        return;
432                    }
433                }
434            }
435
436            // Early exit if we've already hit the limit — no need to
437            // query the next block.
438            if total >= max_logs {
439                return;
440            }
441        }
442    }
443}
444
445// ============================================================================
446// Row → domain type conversion (read path)
447// ============================================================================
448
449/// Extract a required BLOB column from a row as a borrowed slice.
450fn blob<'r>(r: &'r sqlx::any::AnyRow, col: &str) -> &'r [u8] {
451    r.get(col)
452}
453
454/// Extract an optional BLOB column from a row as a borrowed slice.
455fn opt_blob<'r>(r: &'r sqlx::any::AnyRow, col: &str) -> Option<&'r [u8]> {
456    r.get(col)
457}
458
459/// Build a [`Header`] from an [`sqlx::any::AnyRow`].
460fn header_from_row(r: &sqlx::any::AnyRow) -> Result<Header, SqlColdError> {
461    Ok(Header {
462        parent_hash: r.get(COL_PARENT_HASH),
463        ommers_hash: r.get(COL_OMMERS_HASH),
464        beneficiary: r.get(COL_BENEFICIARY),
465        state_root: r.get(COL_STATE_ROOT),
466        transactions_root: r.get(COL_TRANSACTIONS_ROOT),
467        receipts_root: r.get(COL_RECEIPTS_ROOT),
468        logs_bloom: Bloom::from_slice(blob(r, COL_LOGS_BLOOM)),
469        difficulty: r.get(COL_DIFFICULTY),
470        number: from_i64(r.get(COL_BLOCK_NUMBER)),
471        gas_limit: from_i64(r.get(COL_GAS_LIMIT)),
472        gas_used: from_i64(r.get(COL_GAS_USED)),
473        timestamp: from_i64(r.get(COL_TIMESTAMP)),
474        extra_data: r.get(COL_EXTRA_DATA),
475        mix_hash: r.get(COL_MIX_HASH),
476        nonce: r.get(COL_NONCE),
477        base_fee_per_gas: r.get::<Option<i64>, _>(COL_BASE_FEE_PER_GAS).map(from_i64),
478        withdrawals_root: r.get(COL_WITHDRAWALS_ROOT),
479        blob_gas_used: r.get::<Option<i64>, _>(COL_BLOB_GAS_USED).map(from_i64),
480        excess_blob_gas: r.get::<Option<i64>, _>(COL_EXCESS_BLOB_GAS).map(from_i64),
481        parent_beacon_block_root: r.get(COL_PARENT_BEACON_BLOCK_ROOT),
482        requests_hash: r.get(COL_REQUESTS_HASH),
483    })
484}
485
486/// Build a [`TransactionSigned`] from an [`sqlx::any::AnyRow`].
487fn tx_from_row(r: &sqlx::any::AnyRow) -> Result<TransactionSigned, SqlColdError> {
488    use alloy::consensus::EthereumTxEnvelope;
489
490    let sig =
491        Signature::new(r.get(COL_SIG_R), r.get(COL_SIG_S), r.get::<i32, _>(COL_SIG_Y_PARITY) != 0);
492
493    let tx_type_raw: i32 = r.get(COL_TX_TYPE);
494    let tx_type_u8: u8 = tx_type_raw
495        .try_into()
496        .map_err(|_| SqlColdError::Convert(format!("tx_type out of u8 range: {tx_type_raw}")))?;
497    let tx_type = TxType::try_from(tx_type_u8)
498        .map_err(|_| SqlColdError::Convert(format!("invalid tx_type: {tx_type_u8}")))?;
499
500    let chain_id: Option<i64> = r.get(COL_CHAIN_ID);
501    let nonce = from_i64(r.get(COL_NONCE));
502    let gas_limit = from_i64(r.get(COL_GAS_LIMIT));
503    let to_addr: Option<Address> = r.get(COL_TO_ADDRESS);
504    let value: U256 = r.get(COL_VALUE);
505    let input: Bytes = r.get(COL_INPUT);
506
507    match tx_type {
508        TxType::Legacy => {
509            let tx = TxLegacy {
510                chain_id: chain_id.map(from_i64),
511                nonce,
512                gas_price: decode_u128_required(opt_blob(r, COL_GAS_PRICE), COL_GAS_PRICE)?,
513                gas_limit,
514                to: to_addr.map_or(TxKind::Create, TxKind::Call),
515                value,
516                input,
517            };
518            Ok(EthereumTxEnvelope::Legacy(Signed::new_unhashed(tx, sig)))
519        }
520        TxType::Eip2930 => {
521            let tx = TxEip2930 {
522                chain_id: from_i64(
523                    chain_id
524                        .ok_or_else(|| SqlColdError::Convert("EIP2930 requires chain_id".into()))?,
525                ),
526                nonce,
527                gas_price: decode_u128_required(opt_blob(r, COL_GAS_PRICE), COL_GAS_PRICE)?,
528                gas_limit,
529                to: to_addr.map_or(TxKind::Create, TxKind::Call),
530                value,
531                input,
532                access_list: decode_access_list_or_empty(opt_blob(r, COL_ACCESS_LIST))?,
533            };
534            Ok(EthereumTxEnvelope::Eip2930(Signed::new_unhashed(tx, sig)))
535        }
536        TxType::Eip1559 => {
537            let tx = TxEip1559 {
538                chain_id: from_i64(
539                    chain_id
540                        .ok_or_else(|| SqlColdError::Convert("EIP1559 requires chain_id".into()))?,
541                ),
542                nonce,
543                gas_limit,
544                max_fee_per_gas: decode_u128_required(
545                    opt_blob(r, COL_MAX_FEE_PER_GAS),
546                    COL_MAX_FEE_PER_GAS,
547                )?,
548                max_priority_fee_per_gas: decode_u128_required(
549                    opt_blob(r, COL_MAX_PRIORITY_FEE_PER_GAS),
550                    COL_MAX_PRIORITY_FEE_PER_GAS,
551                )?,
552                to: to_addr.map_or(TxKind::Create, TxKind::Call),
553                value,
554                input,
555                access_list: decode_access_list_or_empty(opt_blob(r, COL_ACCESS_LIST))?,
556            };
557            Ok(EthereumTxEnvelope::Eip1559(Signed::new_unhashed(tx, sig)))
558        }
559        TxType::Eip4844 => {
560            let tx = TxEip4844 {
561                chain_id: from_i64(
562                    chain_id
563                        .ok_or_else(|| SqlColdError::Convert("EIP4844 requires chain_id".into()))?,
564                ),
565                nonce,
566                gas_limit,
567                max_fee_per_gas: decode_u128_required(
568                    opt_blob(r, COL_MAX_FEE_PER_GAS),
569                    COL_MAX_FEE_PER_GAS,
570                )?,
571                max_priority_fee_per_gas: decode_u128_required(
572                    opt_blob(r, COL_MAX_PRIORITY_FEE_PER_GAS),
573                    COL_MAX_PRIORITY_FEE_PER_GAS,
574                )?,
575                to: to_addr
576                    .ok_or_else(|| SqlColdError::Convert("EIP4844 requires to_address".into()))?,
577                value,
578                input,
579                access_list: decode_access_list_or_empty(opt_blob(r, COL_ACCESS_LIST))?,
580                blob_versioned_hashes: decode_b256_vec(
581                    opt_blob(r, COL_BLOB_VERSIONED_HASHES).ok_or_else(|| {
582                        SqlColdError::Convert("EIP4844 requires blob_versioned_hashes".into())
583                    })?,
584                )?,
585                max_fee_per_blob_gas: decode_u128_required(
586                    opt_blob(r, COL_MAX_FEE_PER_BLOB_GAS),
587                    COL_MAX_FEE_PER_BLOB_GAS,
588                )?,
589            };
590            Ok(EthereumTxEnvelope::Eip4844(Signed::new_unhashed(tx, sig)))
591        }
592        TxType::Eip7702 => {
593            let tx = TxEip7702 {
594                chain_id: from_i64(
595                    chain_id
596                        .ok_or_else(|| SqlColdError::Convert("EIP7702 requires chain_id".into()))?,
597                ),
598                nonce,
599                gas_limit,
600                max_fee_per_gas: decode_u128_required(
601                    opt_blob(r, COL_MAX_FEE_PER_GAS),
602                    COL_MAX_FEE_PER_GAS,
603                )?,
604                max_priority_fee_per_gas: decode_u128_required(
605                    opt_blob(r, COL_MAX_PRIORITY_FEE_PER_GAS),
606                    COL_MAX_PRIORITY_FEE_PER_GAS,
607                )?,
608                to: to_addr
609                    .ok_or_else(|| SqlColdError::Convert("EIP7702 requires to_address".into()))?,
610                value,
611                input,
612                access_list: decode_access_list_or_empty(opt_blob(r, COL_ACCESS_LIST))?,
613                authorization_list: decode_authorization_list(
614                    opt_blob(r, COL_AUTHORIZATION_LIST).ok_or_else(|| {
615                        SqlColdError::Convert("EIP7702 requires authorization_list".into())
616                    })?,
617                )?,
618            };
619            Ok(EthereumTxEnvelope::Eip7702(Signed::new_unhashed(tx, sig)))
620        }
621    }
622}
623
624/// Build a [`RecoveredTx`] from a row that includes `from_address`.
625fn recovered_tx_from_row(r: &sqlx::any::AnyRow) -> Result<RecoveredTx, SqlColdError> {
626    let sender: Address = r.get(COL_FROM_ADDRESS);
627    let tx = tx_from_row(r)?;
628    // SAFETY: the sender was recovered at append time and stored in from_address.
629    Ok(Recovered::new_unchecked(tx, sender))
630}
631
632/// Build a [`Log`] from an [`sqlx::any::AnyRow`].
633fn log_from_row(r: &sqlx::any::AnyRow) -> Log {
634    let topics = [COL_TOPIC0, COL_TOPIC1, COL_TOPIC2, COL_TOPIC3]
635        .into_iter()
636        .filter_map(|col| r.get::<Option<B256>, _>(col))
637        .collect();
638    Log { address: r.get(COL_ADDRESS), data: LogData::new_unchecked(topics, r.get(COL_DATA)) }
639}
640
641/// Build a [`DbSignetEvent`] from an [`sqlx::any::AnyRow`].
642fn signet_event_from_row(r: &sqlx::any::AnyRow) -> Result<DbSignetEvent, SqlColdError> {
643    let event_type = r.get::<i32, _>(COL_EVENT_TYPE) as i16;
644    let order = from_i64(r.get(COL_ORDER_INDEX));
645    let rollup_chain_id: U256 = r.get(COL_ROLLUP_CHAIN_ID);
646
647    match event_type {
648        EVENT_TRANSACT => {
649            let sender: Address = r
650                .get::<Option<Address>, _>(COL_SENDER)
651                .ok_or_else(|| SqlColdError::Convert("Transact requires sender".into()))?;
652            let to: Address = r
653                .get::<Option<Address>, _>(COL_TO_ADDRESS)
654                .ok_or_else(|| SqlColdError::Convert("Transact requires to".into()))?;
655            let value: U256 = r
656                .get::<Option<U256>, _>(COL_VALUE)
657                .ok_or_else(|| SqlColdError::Convert("Transact requires value".into()))?;
658            let gas: U256 = r
659                .get::<Option<U256>, _>(COL_GAS)
660                .ok_or_else(|| SqlColdError::Convert("Transact requires gas".into()))?;
661            let max_fee: U256 = r
662                .get::<Option<U256>, _>(COL_MAX_FEE_PER_GAS)
663                .ok_or_else(|| SqlColdError::Convert("Transact requires max_fee_per_gas".into()))?;
664            let data: Bytes = r.get::<Option<Bytes>, _>(COL_DATA).unwrap_or_default();
665
666            Ok(DbSignetEvent::Transact(
667                order,
668                Transact {
669                    rollupChainId: rollup_chain_id,
670                    sender,
671                    to,
672                    value,
673                    gas,
674                    maxFeePerGas: max_fee,
675                    data,
676                },
677            ))
678        }
679        EVENT_ENTER => {
680            let recipient: Address = r
681                .get::<Option<Address>, _>(COL_ROLLUP_RECIPIENT)
682                .ok_or_else(|| SqlColdError::Convert("Enter requires rollup_recipient".into()))?;
683            let amount: U256 = r
684                .get::<Option<U256>, _>(COL_AMOUNT)
685                .ok_or_else(|| SqlColdError::Convert("Enter requires amount".into()))?;
686
687            Ok(DbSignetEvent::Enter(
688                order,
689                Enter { rollupChainId: rollup_chain_id, rollupRecipient: recipient, amount },
690            ))
691        }
692        EVENT_ENTER_TOKEN => {
693            let token: Address = r
694                .get::<Option<Address>, _>(COL_TOKEN)
695                .ok_or_else(|| SqlColdError::Convert("EnterToken requires token".into()))?;
696            let recipient: Address =
697                r.get::<Option<Address>, _>(COL_ROLLUP_RECIPIENT).ok_or_else(|| {
698                    SqlColdError::Convert("EnterToken requires rollup_recipient".into())
699                })?;
700            let amount: U256 = r
701                .get::<Option<U256>, _>(COL_AMOUNT)
702                .ok_or_else(|| SqlColdError::Convert("EnterToken requires amount".into()))?;
703
704            Ok(DbSignetEvent::EnterToken(
705                order,
706                EnterToken {
707                    rollupChainId: rollup_chain_id,
708                    token,
709                    rollupRecipient: recipient,
710                    amount,
711                },
712            ))
713        }
714        _ => Err(SqlColdError::Convert(format!("invalid event_type: {event_type}"))),
715    }
716}
717
718/// Build a [`DbZenithHeader`] from an [`sqlx::any::AnyRow`].
719fn zenith_header_from_row(r: &sqlx::any::AnyRow) -> Result<DbZenithHeader, SqlColdError> {
720    Ok(DbZenithHeader(Zenith::BlockHeader {
721        hostBlockNumber: r.get(COL_HOST_BLOCK_NUMBER),
722        rollupChainId: r.get(COL_ROLLUP_CHAIN_ID),
723        gasLimit: r.get(COL_GAS_LIMIT),
724        rewardAddress: r.get(COL_REWARD_ADDRESS),
725        blockDataHash: r.get(COL_BLOCK_DATA_HASH),
726    }))
727}
728
729// ============================================================================
730// Domain type → SQL INSERT (write path)
731// ============================================================================
732
733/// Write a single block's data into an open SQL transaction.
734async fn write_block_to_tx(
735    tx: &mut sqlx::Transaction<'_, sqlx::Any>,
736    data: BlockData,
737) -> Result<(), SqlColdError> {
738    let bn = to_i64(data.block_number());
739
740    // Insert header
741    let block_hash = data.header.hash_slow();
742    let difficulty = &data.header.difficulty;
743    sqlx::query(
744        "INSERT INTO headers (
745            block_number, block_hash, parent_hash, ommers_hash, beneficiary,
746            state_root, transactions_root, receipts_root, logs_bloom, difficulty,
747            gas_limit, gas_used, timestamp, extra_data, mix_hash, nonce,
748            base_fee_per_gas, withdrawals_root, blob_gas_used, excess_blob_gas,
749            parent_beacon_block_root, requests_hash
750        ) VALUES (
751            $1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
752            $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22
753        )",
754    )
755    .bind(bn)
756    .bind(block_hash)
757    .bind(data.header.parent_hash)
758    .bind(data.header.ommers_hash)
759    .bind(data.header.beneficiary)
760    .bind(data.header.state_root)
761    .bind(data.header.transactions_root)
762    .bind(data.header.receipts_root)
763    .bind(data.header.logs_bloom)
764    .bind(difficulty)
765    .bind(to_i64(data.header.gas_limit))
766    .bind(to_i64(data.header.gas_used))
767    .bind(to_i64(data.header.timestamp))
768    .bind(&data.header.extra_data)
769    .bind(data.header.mix_hash)
770    .bind(data.header.nonce)
771    .bind(data.header.base_fee_per_gas.map(to_i64))
772    .bind(data.header.withdrawals_root.as_ref())
773    .bind(data.header.blob_gas_used.map(to_i64))
774    .bind(data.header.excess_blob_gas.map(to_i64))
775    .bind(data.header.parent_beacon_block_root.as_ref())
776    .bind(data.header.requests_hash.as_ref())
777    .execute(&mut **tx)
778    .await?;
779
780    // Insert transactions
781    for (idx, recovered_tx) in data.transactions.iter().enumerate() {
782        insert_transaction(tx, bn, to_i64(idx as u64), recovered_tx).await?;
783    }
784
785    // Insert receipts and logs, computing first_log_index as a running
786    // sum of log counts (same algorithm as the MDBX IndexedReceipt path).
787    let mut first_log_index = 0i64;
788    for (idx, receipt) in data.receipts.iter().enumerate() {
789        let tx_idx = to_i64(idx as u64);
790        sqlx::query(
791            "INSERT INTO receipts (block_number, tx_index, tx_type, success, cumulative_gas_used, first_log_index)
792             VALUES ($1, $2, $3, $4, $5, $6)",
793        )
794        .bind(bn)
795        .bind(tx_idx)
796        .bind(receipt.tx_type as i32)
797        .bind(receipt.inner.status.coerce_status() as i32)
798        .bind(to_i64(receipt.inner.cumulative_gas_used))
799        .bind(first_log_index)
800        .execute(&mut **tx)
801        .await?;
802        first_log_index += receipt.inner.logs.len() as i64;
803
804        for (log_idx, log) in receipt.inner.logs.iter().enumerate() {
805            let topics = log.topics();
806            sqlx::query(
807                "INSERT INTO logs (block_number, tx_index, log_index, address, topic0, topic1, topic2, topic3, data)
808                 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
809            )
810            .bind(bn)
811            .bind(tx_idx)
812            .bind(to_i64(log_idx as u64))
813            .bind(log.address)
814            .bind(topics.first())
815            .bind(topics.get(1))
816            .bind(topics.get(2))
817            .bind(topics.get(3))
818            .bind(&log.data.data)
819            .execute(&mut **tx)
820            .await?;
821        }
822    }
823
824    // Insert signet events
825    for (idx, event) in data.signet_events.iter().enumerate() {
826        insert_signet_event(tx, bn, to_i64(idx as u64), event).await?;
827    }
828
829    // Insert zenith header
830    if let Some(zh) = &data.zenith_header {
831        let h = &zh.0;
832        sqlx::query(
833            "INSERT INTO zenith_headers (
834                block_number, host_block_number, rollup_chain_id,
835                gas_limit, reward_address, block_data_hash
836            ) VALUES ($1, $2, $3, $4, $5, $6)",
837        )
838        .bind(bn)
839        .bind(h.hostBlockNumber)
840        .bind(h.rollupChainId)
841        .bind(h.gasLimit)
842        .bind(h.rewardAddress)
843        .bind(h.blockDataHash)
844        .execute(&mut **tx)
845        .await?;
846    }
847
848    Ok(())
849}
850
851/// Insert a transaction, binding directly from the source type.
852async fn insert_transaction(
853    conn: &mut sqlx::AnyConnection,
854    bn: i64,
855    tx_index: i64,
856    recovered: &RecoveredTx,
857) -> Result<(), SqlColdError> {
858    use alloy::consensus::EthereumTxEnvelope;
859
860    let sender = recovered.signer();
861    let tx: &TransactionSigned = recovered;
862    let tx_hash = tx.tx_hash();
863    let tx_type = tx.tx_type() as i32;
864
865    macro_rules! sig {
866        ($s:expr) => {{
867            let sig = $s.signature();
868            (sig.v() as i32, sig.r(), sig.s())
869        }};
870    }
871    let (sig_y, sig_r, sig_s) = match tx {
872        EthereumTxEnvelope::Legacy(s) => sig!(s),
873        EthereumTxEnvelope::Eip2930(s) => sig!(s),
874        EthereumTxEnvelope::Eip1559(s) => sig!(s),
875        EthereumTxEnvelope::Eip4844(s) => sig!(s),
876        EthereumTxEnvelope::Eip7702(s) => sig!(s),
877    };
878
879    let (chain_id, nonce, gas_limit) = match tx {
880        EthereumTxEnvelope::Legacy(s) => {
881            (s.tx().chain_id.map(to_i64), to_i64(s.tx().nonce), to_i64(s.tx().gas_limit))
882        }
883        EthereumTxEnvelope::Eip2930(s) => {
884            (Some(to_i64(s.tx().chain_id)), to_i64(s.tx().nonce), to_i64(s.tx().gas_limit))
885        }
886        EthereumTxEnvelope::Eip1559(s) => {
887            (Some(to_i64(s.tx().chain_id)), to_i64(s.tx().nonce), to_i64(s.tx().gas_limit))
888        }
889        EthereumTxEnvelope::Eip4844(s) => {
890            (Some(to_i64(s.tx().chain_id)), to_i64(s.tx().nonce), to_i64(s.tx().gas_limit))
891        }
892        EthereumTxEnvelope::Eip7702(s) => {
893            (Some(to_i64(s.tx().chain_id)), to_i64(s.tx().nonce), to_i64(s.tx().gas_limit))
894        }
895    };
896
897    let (value, to_addr) = match tx {
898        EthereumTxEnvelope::Legacy(s) => (s.tx().value, s.tx().to.to()),
899        EthereumTxEnvelope::Eip2930(s) => (s.tx().value, s.tx().to.to()),
900        EthereumTxEnvelope::Eip1559(s) => (s.tx().value, s.tx().to.to()),
901        EthereumTxEnvelope::Eip4844(s) => (s.tx().value, Some(&s.tx().to)),
902        EthereumTxEnvelope::Eip7702(s) => (s.tx().value, Some(&s.tx().to)),
903    };
904
905    let input: &[u8] = match tx {
906        EthereumTxEnvelope::Legacy(s) => s.tx().input.as_ref(),
907        EthereumTxEnvelope::Eip2930(s) => s.tx().input.as_ref(),
908        EthereumTxEnvelope::Eip1559(s) => s.tx().input.as_ref(),
909        EthereumTxEnvelope::Eip4844(s) => s.tx().input.as_ref(),
910        EthereumTxEnvelope::Eip7702(s) => s.tx().input.as_ref(),
911    };
912
913    let (gas_price, max_fee, max_priority_fee, max_blob_fee) = match tx {
914        EthereumTxEnvelope::Legacy(s) => (Some(encode_u128(s.tx().gas_price)), None, None, None),
915        EthereumTxEnvelope::Eip2930(s) => (Some(encode_u128(s.tx().gas_price)), None, None, None),
916        EthereumTxEnvelope::Eip1559(s) => (
917            None,
918            Some(encode_u128(s.tx().max_fee_per_gas)),
919            Some(encode_u128(s.tx().max_priority_fee_per_gas)),
920            None,
921        ),
922        EthereumTxEnvelope::Eip4844(s) => (
923            None,
924            Some(encode_u128(s.tx().max_fee_per_gas)),
925            Some(encode_u128(s.tx().max_priority_fee_per_gas)),
926            Some(encode_u128(s.tx().max_fee_per_blob_gas)),
927        ),
928        EthereumTxEnvelope::Eip7702(s) => (
929            None,
930            Some(encode_u128(s.tx().max_fee_per_gas)),
931            Some(encode_u128(s.tx().max_priority_fee_per_gas)),
932            None,
933        ),
934    };
935
936    let (access_list, blob_hashes, auth_list) = match tx {
937        EthereumTxEnvelope::Legacy(_) => (None, None, None),
938        EthereumTxEnvelope::Eip2930(s) => {
939            (Some(encode_access_list(&s.tx().access_list)), None, None)
940        }
941        EthereumTxEnvelope::Eip1559(s) => {
942            (Some(encode_access_list(&s.tx().access_list)), None, None)
943        }
944        EthereumTxEnvelope::Eip4844(s) => (
945            Some(encode_access_list(&s.tx().access_list)),
946            Some(encode_b256_vec(&s.tx().blob_versioned_hashes)),
947            None,
948        ),
949        EthereumTxEnvelope::Eip7702(s) => (
950            Some(encode_access_list(&s.tx().access_list)),
951            None,
952            Some(encode_authorization_list(&s.tx().authorization_list)),
953        ),
954    };
955
956    sqlx::query(
957        "INSERT INTO transactions (
958            block_number, tx_index, tx_hash, tx_type,
959            sig_y_parity, sig_r, sig_s,
960            chain_id, nonce, gas_limit, to_address, value, input,
961            gas_price, max_fee_per_gas, max_priority_fee_per_gas,
962            max_fee_per_blob_gas, blob_versioned_hashes,
963            access_list, authorization_list, from_address
964        ) VALUES (
965            $1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
966            $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21
967        )",
968    )
969    .bind(bn)
970    .bind(tx_index)
971    .bind(tx_hash)
972    .bind(tx_type)
973    .bind(sig_y)
974    .bind(sig_r)
975    .bind(sig_s)
976    .bind(chain_id)
977    .bind(nonce)
978    .bind(gas_limit)
979    .bind(to_addr)
980    .bind(value)
981    .bind(input)
982    .bind(gas_price.as_ref().map(|v| v.as_slice()))
983    .bind(max_fee.as_ref().map(|v| v.as_slice()))
984    .bind(max_priority_fee.as_ref().map(|v| v.as_slice()))
985    .bind(max_blob_fee.as_ref().map(|v| v.as_slice()))
986    .bind(blob_hashes.as_deref())
987    .bind(access_list.as_deref())
988    .bind(auth_list.as_deref())
989    .bind(sender)
990    .execute(&mut *conn)
991    .await?;
992
993    Ok(())
994}
995
996/// Insert a signet event, binding directly from the source type.
997async fn insert_signet_event(
998    conn: &mut sqlx::AnyConnection,
999    block_number: i64,
1000    event_index: i64,
1001    event: &DbSignetEvent,
1002) -> Result<(), SqlColdError> {
1003    let (event_type, order, chain_id) = match event {
1004        DbSignetEvent::Transact(o, t) => (0i32, to_i64(*o), &t.rollupChainId),
1005        DbSignetEvent::Enter(o, e) => (1i32, to_i64(*o), &e.rollupChainId),
1006        DbSignetEvent::EnterToken(o, e) => (2i32, to_i64(*o), &e.rollupChainId),
1007    };
1008
1009    let (value, gas, max_fee, amount) = match event {
1010        DbSignetEvent::Transact(_, t) => {
1011            (Some(&t.value), Some(&t.gas), Some(&t.maxFeePerGas), None)
1012        }
1013        DbSignetEvent::Enter(_, e) => (None, None, None, Some(&e.amount)),
1014        DbSignetEvent::EnterToken(_, e) => (None, None, None, Some(&e.amount)),
1015    };
1016
1017    sqlx::query(
1018        "INSERT INTO signet_events (
1019            block_number, event_index, event_type, order_index,
1020            rollup_chain_id, sender, to_address, value, gas,
1021            max_fee_per_gas, data, rollup_recipient, amount, token
1022        ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)",
1023    )
1024    .bind(block_number)
1025    .bind(event_index)
1026    .bind(event_type)
1027    .bind(order)
1028    .bind(chain_id)
1029    .bind(match event {
1030        DbSignetEvent::Transact(_, t) => Some(&t.sender),
1031        _ => None,
1032    })
1033    .bind(match event {
1034        DbSignetEvent::Transact(_, t) => Some(&t.to),
1035        _ => None,
1036    })
1037    .bind(value)
1038    .bind(gas)
1039    .bind(max_fee)
1040    .bind(match event {
1041        DbSignetEvent::Transact(_, t) => Some(&t.data),
1042        _ => None,
1043    })
1044    .bind(match event {
1045        DbSignetEvent::Enter(_, e) => Some(&e.rollupRecipient),
1046        DbSignetEvent::EnterToken(_, e) => Some(&e.rollupRecipient),
1047        _ => None,
1048    })
1049    .bind(amount)
1050    .bind(match event {
1051        DbSignetEvent::EnterToken(_, e) => Some(&e.token),
1052        _ => None,
1053    })
1054    .execute(&mut *conn)
1055    .await?;
1056
1057    Ok(())
1058}
1059
1060// ============================================================================
1061// Log filter helpers
1062// ============================================================================
1063
1064/// Append a SQL filter clause for a set of byte-encoded values.
1065///
1066/// For a single value, generates ` AND {column} = ${idx}`.
1067/// For multiple values, generates ` AND {column} IN (${idx}, ...)`.
1068/// Returns the next available parameter index.
1069fn append_filter_clause<'a>(
1070    clause: &mut String,
1071    params: &mut Vec<&'a [u8]>,
1072    mut idx: u32,
1073    column: &str,
1074    values: impl ExactSizeIterator<Item = &'a [u8]>,
1075) -> u32 {
1076    use std::fmt::Write;
1077
1078    let len = values.len();
1079    if len == 1 {
1080        write!(clause, " AND {column} = ${idx}").unwrap();
1081        values.for_each(|v| params.push(v));
1082        return idx + 1;
1083    }
1084    write!(clause, " AND {column} IN (").unwrap();
1085    for (i, v) in values.enumerate() {
1086        if i > 0 {
1087            clause.push_str(", ");
1088        }
1089        write!(clause, "${idx}").unwrap();
1090        params.push(v);
1091        idx += 1;
1092    }
1093    clause.push(')');
1094    idx
1095}
1096
1097fn build_log_filter_clause(filter: &Filter, start_idx: u32) -> (String, Vec<&[u8]>) {
1098    let mut clause = String::new();
1099    let mut params: Vec<&[u8]> = Vec::new();
1100    let mut idx = start_idx;
1101
1102    if !filter.address.is_empty() {
1103        idx = append_filter_clause(
1104            &mut clause,
1105            &mut params,
1106            idx,
1107            "l.address",
1108            filter.address.iter().map(|a| a.as_slice()),
1109        );
1110    }
1111
1112    let topic_cols = ["l.topic0", "l.topic1", "l.topic2", "l.topic3"];
1113    for (i, topic_filter) in filter.topics.iter().enumerate() {
1114        if topic_filter.is_empty() {
1115            continue;
1116        }
1117        idx = append_filter_clause(
1118            &mut clause,
1119            &mut params,
1120            idx,
1121            topic_cols[i],
1122            topic_filter.iter().map(|v| v.as_slice()),
1123        );
1124    }
1125
1126    (clause, params)
1127}
1128
1129// ============================================================================
1130// ColdStorageBackend implementation
1131// ============================================================================
1132
1133impl ColdStorageRead for SqlColdBackend {
1134    async fn get_header(&self, spec: HeaderSpecifier) -> ColdResult<Option<SealedHeader>> {
1135        let Some(block_num) = self.resolve_header_spec(spec).await? else {
1136            return Ok(None);
1137        };
1138        self.fetch_header_by_number(block_num).await.map_err(ColdStorageError::from)
1139    }
1140
1141    async fn get_headers(
1142        &self,
1143        specs: Vec<HeaderSpecifier>,
1144    ) -> ColdResult<Vec<Option<SealedHeader>>> {
1145        let mut results = Vec::with_capacity(specs.len());
1146        for spec in specs {
1147            let header = self.get_header(spec).await?;
1148            results.push(header);
1149        }
1150        Ok(results)
1151    }
1152
1153    async fn get_transaction(
1154        &self,
1155        spec: TransactionSpecifier,
1156    ) -> ColdResult<Option<Confirmed<RecoveredTx>>> {
1157        let mut tx = self.begin_read().await.map_err(ColdStorageError::from)?;
1158        let row = match spec {
1159            TransactionSpecifier::Hash(hash) => sqlx::query(
1160                "SELECT t.*, h.block_hash
1161                     FROM transactions t
1162                     JOIN headers h ON t.block_number = h.block_number
1163                     WHERE t.tx_hash = $1",
1164            )
1165            .bind(hash)
1166            .fetch_optional(&mut *tx)
1167            .await
1168            .map_err(SqlColdError::from)?,
1169            TransactionSpecifier::BlockAndIndex { block, index } => sqlx::query(
1170                "SELECT t.*, h.block_hash
1171                     FROM transactions t
1172                     JOIN headers h ON t.block_number = h.block_number
1173                     WHERE t.block_number = $1 AND t.tx_index = $2",
1174            )
1175            .bind(to_i64(block))
1176            .bind(to_i64(index))
1177            .fetch_optional(&mut *tx)
1178            .await
1179            .map_err(SqlColdError::from)?,
1180            TransactionSpecifier::BlockHashAndIndex { block_hash, index } => sqlx::query(
1181                "SELECT t.*, h.block_hash
1182                     FROM transactions t
1183                     JOIN headers h ON t.block_number = h.block_number
1184                     WHERE h.block_hash = $1 AND t.tx_index = $2",
1185            )
1186            .bind(block_hash)
1187            .bind(to_i64(index))
1188            .fetch_optional(&mut *tx)
1189            .await
1190            .map_err(SqlColdError::from)?,
1191        };
1192        tx.commit().await.map_err(SqlColdError::from)?;
1193
1194        let Some(r) = row else {
1195            return Ok(None);
1196        };
1197
1198        let block = from_i64(r.get::<i64, _>(COL_BLOCK_NUMBER));
1199        let index = from_i64(r.get::<i64, _>(COL_TX_INDEX));
1200        let block_hash = r.get(COL_BLOCK_HASH);
1201        let recovered = recovered_tx_from_row(&r).map_err(ColdStorageError::from)?;
1202        let meta = ConfirmationMeta::new(block, block_hash, index);
1203        Ok(Some(Confirmed::new(recovered, meta)))
1204    }
1205
1206    async fn get_transactions_in_block(&self, block: BlockNumber) -> ColdResult<Vec<RecoveredTx>> {
1207        let bn = to_i64(block);
1208        let mut tx = self.begin_read().await.map_err(ColdStorageError::from)?;
1209        let rows =
1210            sqlx::query("SELECT * FROM transactions WHERE block_number = $1 ORDER BY tx_index")
1211                .bind(bn)
1212                .fetch_all(&mut *tx)
1213                .await
1214                .map_err(SqlColdError::from)?;
1215        tx.commit().await.map_err(SqlColdError::from)?;
1216
1217        rows.iter().map(|r| recovered_tx_from_row(r).map_err(ColdStorageError::from)).collect()
1218    }
1219
1220    async fn get_transaction_count(&self, block: BlockNumber) -> ColdResult<u64> {
1221        let bn = to_i64(block);
1222        let mut tx = self.begin_read().await.map_err(ColdStorageError::from)?;
1223        let row = sqlx::query("SELECT COUNT(*) as cnt FROM transactions WHERE block_number = $1")
1224            .bind(bn)
1225            .fetch_one(&mut *tx)
1226            .await
1227            .map_err(SqlColdError::from)?;
1228        tx.commit().await.map_err(SqlColdError::from)?;
1229
1230        Ok(from_i64(row.get::<i64, _>(COL_CNT)))
1231    }
1232
1233    async fn get_receipt(&self, spec: ReceiptSpecifier) -> ColdResult<Option<ColdReceipt>> {
1234        let mut tx = self.begin_read().await.map_err(ColdStorageError::from)?;
1235        // Resolve to (block, index)
1236        let (block, index) = match spec {
1237            ReceiptSpecifier::TxHash(hash) => {
1238                let row = sqlx::query(
1239                    "SELECT block_number, tx_index FROM transactions WHERE tx_hash = $1",
1240                )
1241                .bind(hash)
1242                .fetch_optional(&mut *tx)
1243                .await
1244                .map_err(SqlColdError::from)?;
1245                let Some(r) = row else { return Ok(None) };
1246                (
1247                    from_i64(r.get::<i64, _>(COL_BLOCK_NUMBER)),
1248                    from_i64(r.get::<i64, _>(COL_TX_INDEX)),
1249                )
1250            }
1251            ReceiptSpecifier::BlockAndIndex { block, index } => (block, index),
1252        };
1253
1254        // Combined query: receipt + tx metadata + full header + prior gas.
1255        // Header columns use standard names (h.*) so header_from_row works.
1256        // Receipt/tx columns use r_ prefix to avoid name collisions.
1257        let combined = sqlx::query(
1258            "SELECT h.*, \
1259               r.tx_type AS r_tx_type, r.success AS r_success, \
1260               r.cumulative_gas_used AS r_cumulative_gas_used, \
1261               r.first_log_index AS r_first_log_index, \
1262               t.tx_hash AS r_tx_hash, t.from_address AS r_from_address, \
1263               COALESCE( \
1264                 (SELECT CAST(MAX(r2.cumulative_gas_used) AS bigint) \
1265                  FROM receipts r2 \
1266                  WHERE r2.block_number = r.block_number \
1267                    AND r2.tx_index < r.tx_index), \
1268                 0 \
1269               ) AS prior_gas \
1270             FROM receipts r \
1271             JOIN transactions t ON r.block_number = t.block_number \
1272               AND r.tx_index = t.tx_index \
1273             JOIN headers h ON r.block_number = h.block_number \
1274             WHERE r.block_number = $1 AND r.tx_index = $2",
1275        )
1276        .bind(to_i64(block))
1277        .bind(to_i64(index))
1278        .fetch_optional(&mut *tx)
1279        .await
1280        .map_err(SqlColdError::from)?;
1281
1282        let Some(rr) = combined else {
1283            return Ok(None);
1284        };
1285
1286        // Extract header using existing helper (h.* columns are unaliased).
1287        let header = header_from_row(&rr).map_err(ColdStorageError::from)?.seal_slow();
1288
1289        // Extract receipt fields from r_ prefixed aliases.
1290        let tx_hash = rr.get(COL_R_TX_HASH);
1291        let sender = rr.get(COL_R_FROM_ADDRESS);
1292        let tx_type: i32 = rr.get(COL_R_TX_TYPE);
1293        let success = rr.get::<i32, _>(COL_R_SUCCESS) != 0;
1294        let cumulative_gas_used: i64 = rr.get(COL_R_CUMULATIVE_GAS_USED);
1295        let first_log_index: u64 = from_i64(rr.get::<i64, _>(COL_R_FIRST_LOG_INDEX));
1296        let prior_cumulative_gas: u64 = from_i64(rr.get::<i64, _>(COL_PRIOR_GAS));
1297
1298        // Logs still require a separate query (variable row count).
1299        let log_rows = sqlx::query(
1300            "SELECT * FROM logs WHERE block_number = $1 AND tx_index = $2 ORDER BY log_index",
1301        )
1302        .bind(to_i64(block))
1303        .bind(to_i64(index))
1304        .fetch_all(&mut *tx)
1305        .await
1306        .map_err(SqlColdError::from)?;
1307        tx.commit().await.map_err(SqlColdError::from)?;
1308
1309        let logs = log_rows.iter().map(log_from_row).collect();
1310        let built = build_receipt(tx_type, success, cumulative_gas_used, logs)
1311            .map_err(ColdStorageError::from)?;
1312        // Cumulative gas must be non-decreasing within a block; a violation
1313        // indicates database corruption.
1314        debug_assert!(
1315            built.inner.cumulative_gas_used >= prior_cumulative_gas,
1316            "cumulative gas decreased: {} < {}",
1317            built.inner.cumulative_gas_used,
1318            prior_cumulative_gas,
1319        );
1320        let gas_used = built.inner.cumulative_gas_used - prior_cumulative_gas;
1321
1322        let ir = IndexedReceipt { receipt: built, tx_hash, first_log_index, gas_used, sender };
1323        Ok(Some(ColdReceipt::new(ir, &header, index)))
1324    }
1325
1326    async fn get_receipts_in_block(&self, block: BlockNumber) -> ColdResult<Vec<ColdReceipt>> {
1327        let Some(header) =
1328            self.fetch_header_by_number(block).await.map_err(ColdStorageError::from)?
1329        else {
1330            return Ok(Vec::new());
1331        };
1332
1333        let bn = to_i64(block);
1334        let mut tx = self.begin_read().await.map_err(ColdStorageError::from)?;
1335
1336        // Fetch receipts joined with tx_hash and from_address
1337        let receipt_rows = sqlx::query(
1338            "SELECT r.block_number, r.tx_index, r.tx_type, r.success, \
1339               r.cumulative_gas_used, r.first_log_index, \
1340               t.tx_hash, t.from_address \
1341             FROM receipts r \
1342             JOIN transactions t ON r.block_number = t.block_number \
1343               AND r.tx_index = t.tx_index \
1344             WHERE r.block_number = $1 \
1345             ORDER BY r.tx_index",
1346        )
1347        .bind(bn)
1348        .fetch_all(&mut *tx)
1349        .await
1350        .map_err(SqlColdError::from)?;
1351
1352        let all_log_rows =
1353            sqlx::query("SELECT * FROM logs WHERE block_number = $1 ORDER BY tx_index, log_index")
1354                .bind(bn)
1355                .fetch_all(&mut *tx)
1356                .await
1357                .map_err(SqlColdError::from)?;
1358        tx.commit().await.map_err(SqlColdError::from)?;
1359
1360        // Group logs by tx_index
1361        let mut logs_by_tx: BTreeMap<i64, Vec<Log>> = BTreeMap::new();
1362        for r in &all_log_rows {
1363            let tx_idx: i64 = r.get(COL_TX_INDEX);
1364            logs_by_tx.entry(tx_idx).or_default().push(log_from_row(r));
1365        }
1366
1367        let mut first_log_index = 0u64;
1368        let mut prior_cumulative_gas = 0u64;
1369        receipt_rows
1370            .into_iter()
1371            .enumerate()
1372            .map(|(idx, rr)| {
1373                let tx_idx: i64 = rr.get(COL_TX_INDEX);
1374                let tx_hash = rr.get(COL_TX_HASH);
1375                let sender = rr.get(COL_FROM_ADDRESS);
1376                let tx_type = rr.get::<i32, _>(COL_TX_TYPE);
1377                let success = rr.get::<i32, _>(COL_SUCCESS) != 0;
1378                let cumulative_gas_used: i64 = rr.get(COL_CUMULATIVE_GAS_USED);
1379                let logs = logs_by_tx.remove(&tx_idx).unwrap_or_default();
1380                let receipt = build_receipt(tx_type, success, cumulative_gas_used, logs)
1381                    .map_err(ColdStorageError::from)?;
1382                let gas_used = receipt.inner.cumulative_gas_used - prior_cumulative_gas;
1383                prior_cumulative_gas = receipt.inner.cumulative_gas_used;
1384                let ir = IndexedReceipt { receipt, tx_hash, first_log_index, gas_used, sender };
1385                first_log_index += ir.receipt.inner.logs.len() as u64;
1386                Ok(ColdReceipt::new(ir, &header, idx as u64))
1387            })
1388            .collect()
1389    }
1390
1391    async fn get_signet_events(
1392        &self,
1393        spec: SignetEventsSpecifier,
1394    ) -> ColdResult<Vec<DbSignetEvent>> {
1395        let mut tx = self.begin_read().await.map_err(ColdStorageError::from)?;
1396        let rows = match spec {
1397            SignetEventsSpecifier::Block(block) => {
1398                let bn = to_i64(block);
1399                sqlx::query(
1400                    "SELECT * FROM signet_events WHERE block_number = $1 ORDER BY event_index",
1401                )
1402                .bind(bn)
1403                .fetch_all(&mut *tx)
1404                .await
1405                .map_err(SqlColdError::from)?
1406            }
1407            SignetEventsSpecifier::BlockRange { start, end } => {
1408                let s = to_i64(start);
1409                let e = to_i64(end);
1410                sqlx::query(
1411                    "SELECT * FROM signet_events WHERE block_number >= $1 AND block_number <= $2
1412                     ORDER BY block_number, event_index",
1413                )
1414                .bind(s)
1415                .bind(e)
1416                .fetch_all(&mut *tx)
1417                .await
1418                .map_err(SqlColdError::from)?
1419            }
1420        };
1421        tx.commit().await.map_err(SqlColdError::from)?;
1422
1423        rows.iter().map(|r| signet_event_from_row(r).map_err(ColdStorageError::from)).collect()
1424    }
1425
1426    async fn get_zenith_header(
1427        &self,
1428        spec: ZenithHeaderSpecifier,
1429    ) -> ColdResult<Option<DbZenithHeader>> {
1430        let block = match spec {
1431            ZenithHeaderSpecifier::Number(n) => n,
1432            ZenithHeaderSpecifier::Range { start, .. } => start,
1433        };
1434        let bn = to_i64(block);
1435        let mut tx = self.begin_read().await.map_err(ColdStorageError::from)?;
1436        let row = sqlx::query("SELECT * FROM zenith_headers WHERE block_number = $1")
1437            .bind(bn)
1438            .fetch_optional(&mut *tx)
1439            .await
1440            .map_err(SqlColdError::from)?;
1441        tx.commit().await.map_err(SqlColdError::from)?;
1442
1443        row.map(|r| zenith_header_from_row(&r)).transpose().map_err(ColdStorageError::from)
1444    }
1445
1446    async fn get_zenith_headers(
1447        &self,
1448        spec: ZenithHeaderSpecifier,
1449    ) -> ColdResult<Vec<DbZenithHeader>> {
1450        let mut tx = self.begin_read().await.map_err(ColdStorageError::from)?;
1451        let rows = match spec {
1452            ZenithHeaderSpecifier::Number(n) => {
1453                let bn = to_i64(n);
1454                sqlx::query("SELECT * FROM zenith_headers WHERE block_number = $1")
1455                    .bind(bn)
1456                    .fetch_all(&mut *tx)
1457                    .await
1458                    .map_err(SqlColdError::from)?
1459            }
1460            ZenithHeaderSpecifier::Range { start, end } => {
1461                let s = to_i64(start);
1462                let e = to_i64(end);
1463                sqlx::query(
1464                    "SELECT * FROM zenith_headers WHERE block_number >= $1 AND block_number <= $2
1465                     ORDER BY block_number",
1466                )
1467                .bind(s)
1468                .bind(e)
1469                .fetch_all(&mut *tx)
1470                .await
1471                .map_err(SqlColdError::from)?
1472            }
1473        };
1474        tx.commit().await.map_err(SqlColdError::from)?;
1475
1476        rows.iter().map(|r| zenith_header_from_row(r).map_err(ColdStorageError::from)).collect()
1477    }
1478
1479    async fn get_logs(&self, filter: &Filter, max_logs: usize) -> ColdResult<Vec<RpcLog>> {
1480        let from = filter.get_from_block().unwrap_or(0);
1481        let to = filter.get_to_block().unwrap_or(u64::MAX);
1482
1483        // Build WHERE clause: block range ($1, $2) + address/topic filters.
1484        // ENG-2036: dynamic filter clause requires format!(); mitigated by
1485        // sqlx prepared-statement cache.
1486        let (filter_clause, params) = build_log_filter_clause(filter, 3);
1487        let where_clause = format!("l.block_number >= $1 AND l.block_number <= $2{filter_clause}");
1488
1489        // Use LIMIT to cap results. Fetch one extra row to detect overflow
1490        // without a separate COUNT query. PostgreSQL stops scanning after
1491        // finding enough rows, making this faster than COUNT in both the
1492        // under-limit and over-limit cases.
1493        let limit_idx = 3 + params.len() as u32;
1494        let data_sql = format!(
1495            "SELECT l.*, h.block_hash, h.timestamp AS block_timestamp, t.tx_hash, \
1496               (r.first_log_index + l.log_index) AS block_log_index \
1497             FROM logs l \
1498             JOIN headers h ON l.block_number = h.block_number \
1499             JOIN transactions t ON l.block_number = t.block_number \
1500               AND l.tx_index = t.tx_index \
1501             JOIN receipts r ON l.block_number = r.block_number \
1502               AND l.tx_index = r.tx_index \
1503             WHERE {where_clause} \
1504             ORDER BY l.block_number, l.tx_index, l.log_index \
1505             LIMIT ${limit_idx}"
1506        );
1507        let mut query = sqlx::query(&data_sql).bind(to_i64(from)).bind(to_i64(to));
1508        for param in &params {
1509            query = query.bind(*param);
1510        }
1511        // Clamp to i64::MAX before converting: SQL LIMIT is an i64, and
1512        // saturating_add avoids overflow when max_logs is usize::MAX.
1513        let limit = max_logs.saturating_add(1).min(i64::MAX as usize);
1514        query = query.bind(to_i64(limit as u64));
1515
1516        let mut tx = self.begin_read().await.map_err(ColdStorageError::from)?;
1517        let rows = query.fetch_all(&mut *tx).await.map_err(SqlColdError::from)?;
1518        tx.commit().await.map_err(SqlColdError::from)?;
1519
1520        if rows.len() > max_logs {
1521            return Err(ColdStorageError::TooManyLogs { limit: max_logs });
1522        }
1523
1524        rows.into_iter()
1525            .map(|r| {
1526                let log = log_from_row(&r);
1527                Ok(RpcLog {
1528                    inner: log,
1529                    block_hash: Some(r.get(COL_BLOCK_HASH)),
1530                    block_number: Some(from_i64(r.get::<i64, _>(COL_BLOCK_NUMBER))),
1531                    block_timestamp: Some(from_i64(r.get::<i64, _>(COL_BLOCK_TIMESTAMP))),
1532                    transaction_hash: Some(r.get(COL_TX_HASH)),
1533                    transaction_index: Some(from_i64(r.get::<i64, _>(COL_TX_INDEX))),
1534                    log_index: Some(from_i64(r.get::<i64, _>(COL_BLOCK_LOG_INDEX))),
1535                    removed: false,
1536                })
1537            })
1538            .collect::<ColdResult<Vec<_>>>()
1539    }
1540
1541    async fn produce_log_stream(&self, filter: &Filter, params: signet_cold::StreamParams) {
1542        #[cfg(feature = "postgres")]
1543        if self.is_postgres {
1544            return self.produce_log_stream_pg(filter, params).await;
1545        }
1546        signet_cold::produce_log_stream_default(self, filter, params).await;
1547    }
1548
1549    async fn get_latest_block(&self) -> ColdResult<Option<BlockNumber>> {
1550        let mut tx = self.begin_read().await.map_err(ColdStorageError::from)?;
1551        let row = sqlx::query("SELECT MAX(block_number) as max_bn FROM headers")
1552            .fetch_one(&mut *tx)
1553            .await
1554            .map_err(SqlColdError::from)?;
1555        tx.commit().await.map_err(SqlColdError::from)?;
1556        Ok(row.get::<Option<i64>, _>(COL_MAX_BN).map(from_i64))
1557    }
1558}
1559
1560impl ColdStorageWrite for SqlColdBackend {
1561    async fn append_block(&self, data: BlockData) -> ColdResult<()> {
1562        self.insert_block(data).await.map_err(ColdStorageError::from)
1563    }
1564
1565    async fn append_blocks(&self, data: Vec<BlockData>) -> ColdResult<()> {
1566        let mut tx = self.begin_write().await.map_err(ColdStorageError::from)?;
1567        for block_data in data {
1568            write_block_to_tx(&mut tx, block_data).await.map_err(ColdStorageError::from)?;
1569        }
1570        tx.commit().await.map_err(SqlColdError::from)?;
1571        Ok(())
1572    }
1573
1574    async fn truncate_above(&self, block: BlockNumber) -> ColdResult<()> {
1575        let bn = to_i64(block);
1576        let mut tx = self.begin_write().await.map_err(ColdStorageError::from)?;
1577
1578        delete_above_in_tx(&mut tx, bn).await.map_err(ColdStorageError::from)?;
1579
1580        tx.commit().await.map_err(SqlColdError::from)?;
1581        Ok(())
1582    }
1583}
1584
1585impl ColdStorageBackend for SqlColdBackend {
1586    fn read_timeout(&self) -> Option<Duration> {
1587        Some(self.read_timeout)
1588    }
1589
1590    fn write_timeout(&self) -> Option<Duration> {
1591        Some(self.write_timeout)
1592    }
1593
1594    async fn drain_above(&self, block: BlockNumber) -> ColdResult<Vec<Vec<ColdReceipt>>> {
1595        let bn = to_i64(block);
1596        let mut tx = self.begin_write().await.map_err(ColdStorageError::from)?;
1597
1598        // 1. Fetch all headers above block.
1599        let header_rows =
1600            sqlx::query("SELECT * FROM headers WHERE block_number > $1 ORDER BY block_number")
1601                .bind(bn)
1602                .fetch_all(&mut *tx)
1603                .await
1604                .map_err(SqlColdError::from)?;
1605
1606        if header_rows.is_empty() {
1607            tx.commit().await.map_err(SqlColdError::from)?;
1608            return Ok(Vec::new());
1609        }
1610
1611        let mut headers: BTreeMap<i64, SealedHeader> = BTreeMap::new();
1612        for r in &header_rows {
1613            let num: i64 = r.get(COL_BLOCK_NUMBER);
1614            let h = header_from_row(r).map_err(ColdStorageError::from)?.seal_slow();
1615            headers.insert(num, h);
1616        }
1617
1618        // 2. Fetch all receipt + tx metadata above block.
1619        let receipt_rows = sqlx::query(
1620            "SELECT r.block_number, r.tx_index, r.tx_type, r.success, \
1621               r.cumulative_gas_used, r.first_log_index, \
1622               t.tx_hash, t.from_address \
1623             FROM receipts r \
1624             JOIN transactions t ON r.block_number = t.block_number \
1625               AND r.tx_index = t.tx_index \
1626             WHERE r.block_number > $1 \
1627             ORDER BY r.block_number, r.tx_index",
1628        )
1629        .bind(bn)
1630        .fetch_all(&mut *tx)
1631        .await
1632        .map_err(SqlColdError::from)?;
1633
1634        // 3. Fetch all logs above block.
1635        let log_rows = sqlx::query(
1636            "SELECT * FROM logs WHERE block_number > $1 \
1637             ORDER BY block_number, tx_index, log_index",
1638        )
1639        .bind(bn)
1640        .fetch_all(&mut *tx)
1641        .await
1642        .map_err(SqlColdError::from)?;
1643
1644        // Group logs by (block_number, tx_index).
1645        let mut logs_by_block_tx: BTreeMap<(i64, i64), Vec<Log>> = BTreeMap::new();
1646        for r in &log_rows {
1647            let block_num: i64 = r.get(COL_BLOCK_NUMBER);
1648            let tx_idx: i64 = r.get(COL_TX_INDEX);
1649            logs_by_block_tx.entry((block_num, tx_idx)).or_default().push(log_from_row(r));
1650        }
1651
1652        // Group receipt rows by block_number.
1653        let mut receipts_by_block: BTreeMap<i64, Vec<&sqlx::any::AnyRow>> = BTreeMap::new();
1654        for r in &receipt_rows {
1655            let block_num: i64 = r.get(COL_BLOCK_NUMBER);
1656            receipts_by_block.entry(block_num).or_default().push(r);
1657        }
1658
1659        // 4. Assemble ColdReceipts per block.
1660        let mut all_receipts = Vec::with_capacity(headers.len());
1661        for (&block_num, header) in &headers {
1662            let block_receipt_rows = receipts_by_block.remove(&block_num).unwrap_or_default();
1663            let mut prior_cumulative_gas = 0u64;
1664            let block_receipts: ColdResult<Vec<ColdReceipt>> = block_receipt_rows
1665                .into_iter()
1666                .map(|rr: &sqlx::any::AnyRow| {
1667                    let tx_idx: i64 = rr.get(COL_TX_INDEX);
1668                    let tx_hash = rr.get(COL_TX_HASH);
1669                    let sender = rr.get(COL_FROM_ADDRESS);
1670                    let tx_type: i32 = rr.get(COL_TX_TYPE);
1671                    let success = rr.get::<i32, _>(COL_SUCCESS) != 0;
1672                    let cumulative_gas_used: i64 = rr.get(COL_CUMULATIVE_GAS_USED);
1673                    let first_log_index = from_i64(rr.get::<i64, _>(COL_FIRST_LOG_INDEX));
1674                    let logs = logs_by_block_tx.remove(&(block_num, tx_idx)).unwrap_or_default();
1675                    let receipt = build_receipt(tx_type, success, cumulative_gas_used, logs)
1676                        .map_err(ColdStorageError::from)?;
1677                    // Cumulative gas must be non-decreasing within a block; a violation
1678                    // indicates database corruption.
1679                    debug_assert!(
1680                        receipt.inner.cumulative_gas_used >= prior_cumulative_gas,
1681                        "cumulative gas decreased: {} < {}",
1682                        receipt.inner.cumulative_gas_used,
1683                        prior_cumulative_gas,
1684                    );
1685                    let gas_used = receipt.inner.cumulative_gas_used - prior_cumulative_gas;
1686                    prior_cumulative_gas = receipt.inner.cumulative_gas_used;
1687                    let ir = IndexedReceipt { receipt, tx_hash, first_log_index, gas_used, sender };
1688                    Ok(ColdReceipt::new(ir, header, from_i64(tx_idx)))
1689                })
1690                .collect();
1691            all_receipts.push(block_receipts?);
1692        }
1693
1694        // 5. Delete from all tables (same order as truncate_above).
1695        delete_above_in_tx(&mut tx, bn).await.map_err(ColdStorageError::from)?;
1696
1697        tx.commit().await.map_err(SqlColdError::from)?;
1698        Ok(all_receipts)
1699    }
1700}
1701
1702#[cfg(all(test, feature = "test-utils"))]
1703mod tests {
1704    use super::*;
1705    use signet_cold::conformance::conformance;
1706
1707    #[tokio::test]
1708    async fn sqlite_conformance() {
1709        let backend = SqlColdBackend::connect("sqlite::memory:").await.unwrap();
1710        conformance(backend).await.unwrap();
1711    }
1712
1713    #[tokio::test]
1714    async fn pg_conformance() {
1715        let Ok(url) = std::env::var("DATABASE_URL") else {
1716            eprintln!("skipping pg conformance: DATABASE_URL not set");
1717            return;
1718        };
1719        let backend = SqlColdBackend::connect(&url).await.unwrap();
1720        conformance(backend).await.unwrap();
1721    }
1722}