// signet_cold_sql/backend.rs

1//! Unified SQL backend for cold storage.
2//!
3//! Supports both PostgreSQL and SQLite via [`sqlx::Any`]. The backend
4//! auto-detects the database type at construction time and runs the
5//! appropriate migration.
6
7use crate::SqlColdError;
8use crate::columns::{
9    COL_ACCESS_LIST, COL_ADDRESS, COL_AMOUNT, COL_AUTHORIZATION_LIST, COL_BASE_FEE_PER_GAS,
10    COL_BENEFICIARY, COL_BLOB_GAS_USED, COL_BLOB_VERSIONED_HASHES, COL_BLOCK_DATA_HASH,
11    COL_BLOCK_HASH, COL_BLOCK_LOG_INDEX, COL_BLOCK_NUMBER, COL_BLOCK_TIMESTAMP, COL_CHAIN_ID,
12    COL_CNT, COL_CUMULATIVE_GAS_USED, COL_DATA, COL_DIFFICULTY, COL_EVENT_TYPE,
13    COL_EXCESS_BLOB_GAS, COL_EXTRA_DATA, COL_FIRST_LOG_INDEX, COL_FROM_ADDRESS, COL_GAS,
14    COL_GAS_LIMIT, COL_GAS_PRICE, COL_GAS_USED, COL_HOST_BLOCK_NUMBER, COL_INPUT, COL_LOGS_BLOOM,
15    COL_MAX_BN, COL_MAX_FEE_PER_BLOB_GAS, COL_MAX_FEE_PER_GAS, COL_MAX_PRIORITY_FEE_PER_GAS,
16    COL_MIX_HASH, COL_NONCE, COL_OMMERS_HASH, COL_ORDER_INDEX, COL_PARENT_BEACON_BLOCK_ROOT,
17    COL_PARENT_HASH, COL_PRIOR_GAS, COL_R_CUMULATIVE_GAS_USED, COL_R_FIRST_LOG_INDEX,
18    COL_R_FROM_ADDRESS, COL_R_SUCCESS, COL_R_TX_HASH, COL_R_TX_TYPE, COL_RECEIPTS_ROOT,
19    COL_REQUESTS_HASH, COL_REWARD_ADDRESS, COL_ROLLUP_CHAIN_ID, COL_ROLLUP_RECIPIENT, COL_SENDER,
20    COL_SIG_R, COL_SIG_S, COL_SIG_Y_PARITY, COL_STATE_ROOT, COL_SUCCESS, COL_TIMESTAMP,
21    COL_TO_ADDRESS, COL_TOKEN, COL_TOPIC0, COL_TOPIC1, COL_TOPIC2, COL_TOPIC3,
22    COL_TRANSACTIONS_ROOT, COL_TX_HASH, COL_TX_INDEX, COL_TX_TYPE, COL_VALUE, COL_WITHDRAWALS_ROOT,
23};
24use crate::convert::{
25    EVENT_ENTER, EVENT_ENTER_TOKEN, EVENT_TRANSACT, build_receipt, decode_access_list_or_empty,
26    decode_authorization_list, decode_b256_vec, decode_u128_required, encode_access_list,
27    encode_authorization_list, encode_b256_vec, encode_u128, from_i64, to_i64,
28};
29use alloy::{
30    consensus::{
31        Header, Signed, TxEip1559, TxEip2930, TxEip4844, TxEip7702, TxLegacy, TxType,
32        transaction::Recovered,
33    },
34    primitives::{
35        Address, B256, BlockNumber, Bloom, Bytes, Log, LogData, Sealable, Signature, TxKind, U256,
36    },
37};
38use signet_cold::{
39    BlockData, ColdReceipt, ColdResult, ColdStorage, ColdStorageError, ColdStorageRead,
40    ColdStorageWrite, Confirmed, Filter, HeaderSpecifier, ReceiptSpecifier, RpcLog,
41    SignetEventsSpecifier, TransactionSpecifier, ZenithHeaderSpecifier,
42};
43use signet_storage_types::{
44    ConfirmationMeta, DbSignetEvent, DbZenithHeader, IndexedReceipt, RecoveredTx, SealedHeader,
45    TransactionSigned,
46};
47use signet_zenith::{
48    Passage::{Enter, EnterToken},
49    Transactor::Transact,
50    Zenith,
51};
52use sqlx::{AnyPool, Row};
53use std::collections::BTreeMap;
54
55/// SQL-based cold storage backend.
56///
57/// Uses [`sqlx::Any`] for database-agnostic access, supporting both
58/// PostgreSQL and SQLite through a single implementation. The backend
59/// is determined by the connection URL at construction time.
60///
61/// # Example
62///
63/// ```no_run
64/// # async fn example() {
65/// use signet_cold_sql::SqlColdBackend;
66///
67/// // SQLite (in-memory)
68/// let backend = SqlColdBackend::connect("sqlite::memory:").await.unwrap();
69///
70/// // PostgreSQL
71/// let backend = SqlColdBackend::connect("postgres://localhost/signet").await.unwrap();
72/// # }
73/// ```
#[derive(Debug, Clone)]
pub struct SqlColdBackend {
    /// Database connection pool used by every read and write path.
    pool: AnyPool,
    /// Whether the detected backend is PostgreSQL. Set once in
    /// [`SqlColdBackend::new`] from the connection's backend name.
    is_postgres: bool,
}
79
/// Returns `true` if the URL refers to an in-memory SQLite database.
///
/// Recognises both the special `:memory:` path (covering
/// `sqlite::memory:` and `sqlite://:memory:`) and URI-style
/// `mode=memory` query parameters.
///
/// In-memory SQLite databases are per-connection, so the pool must be
/// limited to a single connection to ensure all queries see the same
/// state.
fn is_in_memory_sqlite(url: &str) -> bool {
    // `:memory:` matches every spelling of the special in-memory path;
    // the previous exact-equality check missed `sqlite://:memory:`,
    // which sqlx also accepts, leaving that pool uncapped.
    url.contains(":memory:") || url.contains("mode=memory")
}
88
89/// Delete all rows with `block_number > bn` from every table, in
90/// child-first order to preserve foreign-key constraints.
91async fn delete_above_in_tx(
92    tx: &mut sqlx::Transaction<'_, sqlx::Any>,
93    bn: i64,
94) -> Result<(), SqlColdError> {
95    for table in ["logs", "transactions", "receipts", "signet_events", "zenith_headers", "headers"]
96    {
97        sqlx::query(&format!("DELETE FROM {table} WHERE block_number > $1"))
98            .bind(bn)
99            .execute(&mut **tx)
100            .await?;
101    }
102    Ok(())
103}
104
impl SqlColdBackend {
    /// Create a new SQL cold storage backend from an existing [`AnyPool`].
    ///
    /// Auto-detects the database backend and creates all tables if they
    /// do not already exist. Callers must ensure
    /// [`sqlx::any::install_default_drivers`] has been called before
    /// constructing the pool.
    ///
    /// # Errors
    ///
    /// Returns [`SqlColdError::Convert`] for any backend other than
    /// PostgreSQL or SQLite, or a database error if acquiring a
    /// connection or running a migration fails.
    pub async fn new(pool: AnyPool) -> Result<Self, SqlColdError> {
        // Detect backend from a pooled connection.
        let conn = pool.acquire().await?;
        let backend = conn.backend_name();

        // Each backend has its own initial migration script; the
        // topic-index migration below is shared by both.
        let (migration, is_postgres) = match backend {
            "PostgreSQL" => (include_str!("../migrations/001_initial_pg.sql"), true),
            "SQLite" => (include_str!("../migrations/001_initial.sql"), false),
            other => {
                return Err(SqlColdError::Convert(format!(
                    "unsupported database backend: {other}"
                )));
            }
        };
        // Return the detection connection to the pool before migrating.
        drop(conn);
        // Execute via pool to ensure the migration uses the same
        // connection that subsequent queries will use.
        sqlx::raw_sql(migration).execute(&pool).await?;
        sqlx::raw_sql(include_str!("../migrations/002_add_topic_indexes.sql"))
            .execute(&pool)
            .await?;
        Ok(Self { pool, is_postgres })
    }

    /// Connect to a database URL with explicit pool options.
    ///
    /// Installs the default sqlx drivers on the first call. The database
    /// type is inferred from the URL scheme (`sqlite:` or `postgres:`).
    ///
    /// For in-memory SQLite URLs (`sqlite::memory:` or `mode=memory`),
    /// `max_connections` is forced to 1 regardless of the provided
    /// options, because in-memory databases are per-connection.
    pub async fn connect_with(
        url: &str,
        pool_opts: sqlx::pool::PoolOptions<sqlx::Any>,
    ) -> Result<Self, SqlColdError> {
        sqlx::any::install_default_drivers();
        let pool_opts =
            if is_in_memory_sqlite(url) { pool_opts.max_connections(1) } else { pool_opts };
        let pool: AnyPool = pool_opts.connect(url).await?;
        Self::new(pool).await
    }

    /// Connect to a database URL with default pool settings.
    ///
    /// Convenience wrapper around [`connect_with`](Self::connect_with).
    pub async fn connect(url: &str) -> Result<Self, SqlColdError> {
        Self::connect_with(url, sqlx::pool::PoolOptions::new()).await
    }

    // ========================================================================
    // Specifier resolution
    // ========================================================================

    /// Resolve a [`HeaderSpecifier`] to a concrete block number.
    ///
    /// A `Number` specifier resolves without touching the database; a
    /// `Hash` specifier is looked up in the `headers` table. Returns
    /// `Ok(None)` when the hash is not stored.
    async fn resolve_header_spec(
        &self,
        spec: HeaderSpecifier,
    ) -> Result<Option<BlockNumber>, SqlColdError> {
        match spec {
            HeaderSpecifier::Number(n) => Ok(Some(n)),
            HeaderSpecifier::Hash(hash) => {
                let row = sqlx::query("SELECT block_number FROM headers WHERE block_hash = $1")
                    .bind(hash)
                    .fetch_optional(&self.pool)
                    .await?;
                Ok(row.map(|r| from_i64(r.get::<i64, _>(COL_BLOCK_NUMBER))))
            }
        }
    }

    // ========================================================================
    // Read helpers
    // ========================================================================

    /// Fetch the header for `block_num` and seal it (recomputing the
    /// block hash via `seal_slow`). Returns `Ok(None)` when the block
    /// is not stored.
    async fn fetch_header_by_number(
        &self,
        block_num: BlockNumber,
    ) -> Result<Option<SealedHeader>, SqlColdError> {
        let bn = to_i64(block_num);
        let row = sqlx::query("SELECT * FROM headers WHERE block_number = $1")
            .bind(bn)
            .fetch_optional(&self.pool)
            .await?;

        row.map(|r| header_from_row(&r).map(|h| h.seal_slow())).transpose()
    }

    // ========================================================================
    // Write helpers
    // ========================================================================

    /// Insert one block atomically: every row goes into a single
    /// transaction that commits only after all inserts succeed.
    async fn insert_block(&self, data: BlockData) -> Result<(), SqlColdError> {
        let mut tx = self.pool.begin().await?;
        write_block_to_tx(&mut tx, data).await?;
        tx.commit().await?;
        Ok(())
    }

    // ========================================================================
    // Streaming helpers
    // ========================================================================

    /// Stream logs using a PostgreSQL REPEATABLE READ transaction.
    ///
    /// The transaction provides a consistent snapshot across all per-block
    /// queries, eliminating the need for anchor-hash reorg detection.
    /// Rows are streamed individually rather than materialised per block.
    ///
    /// All errors, limit overruns, and deadline violations are reported
    /// through `params.sender`. The snapshot transaction is never
    /// committed; it is dropped on every return path.
    #[cfg(feature = "postgres")]
    async fn produce_log_stream_pg(&self, filter: &Filter, params: signet_cold::StreamParams) {
        use tokio_stream::StreamExt;

        /// Unwrap a `Result` or send the error through the stream and return.
        macro_rules! try_stream {
            ($sender:expr, $expr:expr) => {
                match $expr {
                    Ok(v) => v,
                    Err(e) => {
                        let _ = $sender
                            .send(Err(ColdStorageError::backend(SqlColdError::from(e))))
                            .await;
                        return;
                    }
                }
            };
        }

        let signet_cold::StreamParams { from, to, max_logs, sender, deadline } = params;

        // Open a REPEATABLE READ transaction so all per-block queries see a
        // consistent snapshot. This makes reorg detection unnecessary — if a
        // reorg lands mid-stream the transaction still reads the old data.
        let mut tx = try_stream!(sender, self.pool.begin().await);
        try_stream!(
            sender,
            sqlx::query("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ").execute(&mut *tx).await
        );

        // Build the parameterised query once. $1 is the block number
        // (bound per iteration); remaining parameters are the address
        // and topic filters from the user's request.
        //
        // ENG-2036: format!() allocates here, but the dynamic filter clause
        // makes this unavoidable. sqlx's prepared-statement cache mitigates
        // repeated execution cost.
        let (filter_clause, filter_params) = build_log_filter_clause(filter, 2);
        let data_sql = format!(
            "SELECT l.*, h.block_hash, h.timestamp AS block_timestamp, t.tx_hash, \
               (r.first_log_index + l.log_index) AS block_log_index \
             FROM logs l \
             JOIN headers h ON l.block_number = h.block_number \
             JOIN transactions t ON l.block_number = t.block_number \
               AND l.tx_index = t.tx_index \
             JOIN receipts r ON l.block_number = r.block_number \
               AND l.tx_index = r.tx_index \
             WHERE l.block_number = $1{filter_clause} \
             ORDER BY l.tx_index, l.log_index"
        );

        let mut total = 0usize;

        // Walk through blocks one at a time, streaming matching log rows
        // from each block directly to the channel.
        for block_num in from..=to {
            // Check the deadline before starting each block so we
            // don't begin a new query after the caller's timeout.
            if tokio::time::Instant::now() > deadline {
                let _ = sender.send(Err(ColdStorageError::StreamDeadlineExceeded)).await;
                return;
            }

            let mut query = sqlx::query(&data_sql).bind(to_i64(block_num));
            for param in &filter_params {
                query = query.bind(*param);
            }

            // Stream rows from this block's query. Each row is converted
            // to an RpcLog and sent over the channel immediately rather
            // than being collected into a Vec first.
            let mut stream = query.fetch(&mut *tx);
            while let Some(row_result) = stream.next().await {
                let r = try_stream!(sender, row_result);

                // Enforce the global log limit across all blocks.
                total += 1;
                if total > max_logs {
                    let _ =
                        sender.send(Err(ColdStorageError::TooManyLogs { limit: max_logs })).await;
                    return;
                }

                let log = log_from_row(&r);
                let rpc_log = RpcLog {
                    inner: log,
                    block_hash: Some(r.get(COL_BLOCK_HASH)),
                    block_number: Some(from_i64(r.get::<i64, _>(COL_BLOCK_NUMBER))),
                    block_timestamp: Some(from_i64(r.get::<i64, _>(COL_BLOCK_TIMESTAMP))),
                    transaction_hash: Some(r.get(COL_TX_HASH)),
                    transaction_index: Some(from_i64(r.get::<i64, _>(COL_TX_INDEX))),
                    // block_log_index is computed in SQL as
                    // first_log_index + per-tx log_index.
                    log_index: Some(from_i64(r.get::<i64, _>(COL_BLOCK_LOG_INDEX))),
                    removed: false,
                };
                // Send the log to the caller. The timeout ensures we
                // stop if the deadline passes while back-pressured.
                match tokio::time::timeout_at(deadline, sender.send(Ok(rpc_log))).await {
                    Ok(Ok(())) => {}
                    Ok(Err(_)) => return, // receiver dropped
                    Err(_) => {
                        let _ = sender.send(Err(ColdStorageError::StreamDeadlineExceeded)).await;
                        return;
                    }
                }
            }

            // Early exit if we've already hit the limit — no need to
            // query the next block.
            if total >= max_logs {
                return;
            }
        }
    }
}
333
334// ============================================================================
335// Row → domain type conversion (read path)
336// ============================================================================
337
338/// Extract a required BLOB column from a row as a borrowed slice.
339fn blob<'r>(r: &'r sqlx::any::AnyRow, col: &str) -> &'r [u8] {
340    r.get(col)
341}
342
343/// Extract an optional BLOB column from a row as a borrowed slice.
344fn opt_blob<'r>(r: &'r sqlx::any::AnyRow, col: &str) -> Option<&'r [u8]> {
345    r.get(col)
346}
347
/// Build a [`Header`] from an [`sqlx::any::AnyRow`].
///
/// Hash/address columns decode directly via `Row::get`; `u64` fields
/// are stored as `i64` and widened back with [`from_i64`]. Nullable
/// post-merge fields (base fee, blob gas, beacon root, requests hash)
/// map `NULL` to `None`.
///
/// Currently infallible; the `Result` return keeps the signature
/// uniform with the other row-decoding helpers.
fn header_from_row(r: &sqlx::any::AnyRow) -> Result<Header, SqlColdError> {
    Ok(Header {
        parent_hash: r.get(COL_PARENT_HASH),
        ommers_hash: r.get(COL_OMMERS_HASH),
        beneficiary: r.get(COL_BENEFICIARY),
        state_root: r.get(COL_STATE_ROOT),
        transactions_root: r.get(COL_TRANSACTIONS_ROOT),
        receipts_root: r.get(COL_RECEIPTS_ROOT),
        // The bloom is reconstructed from the raw BLOB bytes.
        logs_bloom: Bloom::from_slice(blob(r, COL_LOGS_BLOOM)),
        difficulty: r.get(COL_DIFFICULTY),
        number: from_i64(r.get(COL_BLOCK_NUMBER)),
        gas_limit: from_i64(r.get(COL_GAS_LIMIT)),
        gas_used: from_i64(r.get(COL_GAS_USED)),
        timestamp: from_i64(r.get(COL_TIMESTAMP)),
        extra_data: r.get(COL_EXTRA_DATA),
        mix_hash: r.get(COL_MIX_HASH),
        nonce: r.get(COL_NONCE),
        base_fee_per_gas: r.get::<Option<i64>, _>(COL_BASE_FEE_PER_GAS).map(from_i64),
        withdrawals_root: r.get(COL_WITHDRAWALS_ROOT),
        blob_gas_used: r.get::<Option<i64>, _>(COL_BLOB_GAS_USED).map(from_i64),
        excess_blob_gas: r.get::<Option<i64>, _>(COL_EXCESS_BLOB_GAS).map(from_i64),
        parent_beacon_block_root: r.get(COL_PARENT_BEACON_BLOCK_ROOT),
        requests_hash: r.get(COL_REQUESTS_HASH),
    })
}
374
/// Build a [`TransactionSigned`] from an [`sqlx::any::AnyRow`].
///
/// Decodes the signature, the `tx_type` discriminant, and the columns
/// shared by every envelope variant, then constructs the
/// variant-specific transaction body.
///
/// # Errors
///
/// Returns [`SqlColdError::Convert`] when `tx_type` is out of `u8`
/// range or unknown, or when a column required by the decoded type is
/// missing (`chain_id` for typed txs, `to_address` for EIP-4844/7702,
/// `blob_versioned_hashes` for EIP-4844, `authorization_list` for
/// EIP-7702, and the per-type fee columns).
fn tx_from_row(r: &sqlx::any::AnyRow) -> Result<TransactionSigned, SqlColdError> {
    use alloy::consensus::EthereumTxEnvelope;

    // y_parity is stored as 0/1 in an integer column.
    let sig =
        Signature::new(r.get(COL_SIG_R), r.get(COL_SIG_S), r.get::<i32, _>(COL_SIG_Y_PARITY) != 0);

    // Narrow the stored i32 discriminant to a known TxType, surfacing
    // a Convert error (not a panic) on bad data.
    let tx_type_raw: i32 = r.get(COL_TX_TYPE);
    let tx_type_u8: u8 = tx_type_raw
        .try_into()
        .map_err(|_| SqlColdError::Convert(format!("tx_type out of u8 range: {tx_type_raw}")))?;
    let tx_type = TxType::try_from(tx_type_u8)
        .map_err(|_| SqlColdError::Convert(format!("invalid tx_type: {tx_type_u8}")))?;

    // Columns shared by every transaction type. `to_address` NULL
    // means a contract-creation transaction for pre-4844 types.
    let chain_id: Option<i64> = r.get(COL_CHAIN_ID);
    let nonce = from_i64(r.get(COL_NONCE));
    let gas_limit = from_i64(r.get(COL_GAS_LIMIT));
    let to_addr: Option<Address> = r.get(COL_TO_ADDRESS);
    let value: U256 = r.get(COL_VALUE);
    let input: Bytes = r.get(COL_INPUT);

    match tx_type {
        // Legacy: optional chain_id (EIP-155), single gas_price.
        TxType::Legacy => {
            let tx = TxLegacy {
                chain_id: chain_id.map(from_i64),
                nonce,
                gas_price: decode_u128_required(opt_blob(r, COL_GAS_PRICE), COL_GAS_PRICE)?,
                gas_limit,
                to: to_addr.map_or(TxKind::Create, TxKind::Call),
                value,
                input,
            };
            Ok(EthereumTxEnvelope::Legacy(Signed::new_unhashed(tx, sig)))
        }
        // EIP-2930: mandatory chain_id, adds the access list.
        TxType::Eip2930 => {
            let tx = TxEip2930 {
                chain_id: from_i64(
                    chain_id
                        .ok_or_else(|| SqlColdError::Convert("EIP2930 requires chain_id".into()))?,
                ),
                nonce,
                gas_price: decode_u128_required(opt_blob(r, COL_GAS_PRICE), COL_GAS_PRICE)?,
                gas_limit,
                to: to_addr.map_or(TxKind::Create, TxKind::Call),
                value,
                input,
                access_list: decode_access_list_or_empty(opt_blob(r, COL_ACCESS_LIST))?,
            };
            Ok(EthereumTxEnvelope::Eip2930(Signed::new_unhashed(tx, sig)))
        }
        // EIP-1559: dynamic fee fields replace gas_price.
        TxType::Eip1559 => {
            let tx = TxEip1559 {
                chain_id: from_i64(
                    chain_id
                        .ok_or_else(|| SqlColdError::Convert("EIP1559 requires chain_id".into()))?,
                ),
                nonce,
                gas_limit,
                max_fee_per_gas: decode_u128_required(
                    opt_blob(r, COL_MAX_FEE_PER_GAS),
                    COL_MAX_FEE_PER_GAS,
                )?,
                max_priority_fee_per_gas: decode_u128_required(
                    opt_blob(r, COL_MAX_PRIORITY_FEE_PER_GAS),
                    COL_MAX_PRIORITY_FEE_PER_GAS,
                )?,
                to: to_addr.map_or(TxKind::Create, TxKind::Call),
                value,
                input,
                access_list: decode_access_list_or_empty(opt_blob(r, COL_ACCESS_LIST))?,
            };
            Ok(EthereumTxEnvelope::Eip1559(Signed::new_unhashed(tx, sig)))
        }
        // EIP-4844: blob tx; `to` is a plain Address (no create), plus
        // blob hashes and blob fee cap.
        TxType::Eip4844 => {
            let tx = TxEip4844 {
                chain_id: from_i64(
                    chain_id
                        .ok_or_else(|| SqlColdError::Convert("EIP4844 requires chain_id".into()))?,
                ),
                nonce,
                gas_limit,
                max_fee_per_gas: decode_u128_required(
                    opt_blob(r, COL_MAX_FEE_PER_GAS),
                    COL_MAX_FEE_PER_GAS,
                )?,
                max_priority_fee_per_gas: decode_u128_required(
                    opt_blob(r, COL_MAX_PRIORITY_FEE_PER_GAS),
                    COL_MAX_PRIORITY_FEE_PER_GAS,
                )?,
                to: to_addr
                    .ok_or_else(|| SqlColdError::Convert("EIP4844 requires to_address".into()))?,
                value,
                input,
                access_list: decode_access_list_or_empty(opt_blob(r, COL_ACCESS_LIST))?,
                blob_versioned_hashes: decode_b256_vec(
                    opt_blob(r, COL_BLOB_VERSIONED_HASHES).ok_or_else(|| {
                        SqlColdError::Convert("EIP4844 requires blob_versioned_hashes".into())
                    })?,
                )?,
                max_fee_per_blob_gas: decode_u128_required(
                    opt_blob(r, COL_MAX_FEE_PER_BLOB_GAS),
                    COL_MAX_FEE_PER_BLOB_GAS,
                )?,
            };
            Ok(EthereumTxEnvelope::Eip4844(Signed::new_unhashed(tx, sig)))
        }
        // EIP-7702: like 1559 but with a mandatory `to` and an
        // authorization list.
        TxType::Eip7702 => {
            let tx = TxEip7702 {
                chain_id: from_i64(
                    chain_id
                        .ok_or_else(|| SqlColdError::Convert("EIP7702 requires chain_id".into()))?,
                ),
                nonce,
                gas_limit,
                max_fee_per_gas: decode_u128_required(
                    opt_blob(r, COL_MAX_FEE_PER_GAS),
                    COL_MAX_FEE_PER_GAS,
                )?,
                max_priority_fee_per_gas: decode_u128_required(
                    opt_blob(r, COL_MAX_PRIORITY_FEE_PER_GAS),
                    COL_MAX_PRIORITY_FEE_PER_GAS,
                )?,
                to: to_addr
                    .ok_or_else(|| SqlColdError::Convert("EIP7702 requires to_address".into()))?,
                value,
                input,
                access_list: decode_access_list_or_empty(opt_blob(r, COL_ACCESS_LIST))?,
                authorization_list: decode_authorization_list(
                    opt_blob(r, COL_AUTHORIZATION_LIST).ok_or_else(|| {
                        SqlColdError::Convert("EIP7702 requires authorization_list".into())
                    })?,
                )?,
            };
            Ok(EthereumTxEnvelope::Eip7702(Signed::new_unhashed(tx, sig)))
        }
    }
}
512
513/// Build a [`RecoveredTx`] from a row that includes `from_address`.
514fn recovered_tx_from_row(r: &sqlx::any::AnyRow) -> Result<RecoveredTx, SqlColdError> {
515    let sender: Address = r.get(COL_FROM_ADDRESS);
516    let tx = tx_from_row(r)?;
517    // SAFETY: the sender was recovered at append time and stored in from_address.
518    Ok(Recovered::new_unchecked(tx, sender))
519}
520
/// Build a [`Log`] from an [`sqlx::any::AnyRow`].
///
/// Topics are stored in four nullable columns; only the non-NULL ones
/// are collected, in column order. `new_unchecked` skips the topic
/// count check because at most four columns exist.
fn log_from_row(r: &sqlx::any::AnyRow) -> Log {
    let topics = [COL_TOPIC0, COL_TOPIC1, COL_TOPIC2, COL_TOPIC3]
        .into_iter()
        .filter_map(|col| r.get::<Option<B256>, _>(col))
        .collect();
    Log { address: r.get(COL_ADDRESS), data: LogData::new_unchecked(topics, r.get(COL_DATA)) }
}
529
/// Build a [`DbSignetEvent`] from an [`sqlx::any::AnyRow`].
///
/// The `event_type` column selects the variant ([`EVENT_TRANSACT`],
/// [`EVENT_ENTER`], or [`EVENT_ENTER_TOKEN`]); the columns each variant
/// requires are nullable in the shared schema, so they are validated
/// here.
///
/// # Errors
///
/// Returns [`SqlColdError::Convert`] for an unknown `event_type` or a
/// `NULL` column the decoded variant requires.
fn signet_event_from_row(r: &sqlx::any::AnyRow) -> Result<DbSignetEvent, SqlColdError> {
    // event_type is stored as an i32; the EVENT_* constants are i16.
    let event_type = r.get::<i32, _>(COL_EVENT_TYPE) as i16;
    let order = from_i64(r.get(COL_ORDER_INDEX));
    let rollup_chain_id: U256 = r.get(COL_ROLLUP_CHAIN_ID);

    match event_type {
        EVENT_TRANSACT => {
            let sender: Address = r
                .get::<Option<Address>, _>(COL_SENDER)
                .ok_or_else(|| SqlColdError::Convert("Transact requires sender".into()))?;
            let to: Address = r
                .get::<Option<Address>, _>(COL_TO_ADDRESS)
                .ok_or_else(|| SqlColdError::Convert("Transact requires to".into()))?;
            let value: U256 = r
                .get::<Option<U256>, _>(COL_VALUE)
                .ok_or_else(|| SqlColdError::Convert("Transact requires value".into()))?;
            let gas: U256 = r
                .get::<Option<U256>, _>(COL_GAS)
                .ok_or_else(|| SqlColdError::Convert("Transact requires gas".into()))?;
            let max_fee: U256 = r
                .get::<Option<U256>, _>(COL_MAX_FEE_PER_GAS)
                .ok_or_else(|| SqlColdError::Convert("Transact requires max_fee_per_gas".into()))?;
            // Calldata is the one Transact field allowed to be NULL;
            // it defaults to empty bytes.
            let data: Bytes = r.get::<Option<Bytes>, _>(COL_DATA).unwrap_or_default();

            Ok(DbSignetEvent::Transact(
                order,
                Transact {
                    rollupChainId: rollup_chain_id,
                    sender,
                    to,
                    value,
                    gas,
                    maxFeePerGas: max_fee,
                    data,
                },
            ))
        }
        EVENT_ENTER => {
            let recipient: Address = r
                .get::<Option<Address>, _>(COL_ROLLUP_RECIPIENT)
                .ok_or_else(|| SqlColdError::Convert("Enter requires rollup_recipient".into()))?;
            let amount: U256 = r
                .get::<Option<U256>, _>(COL_AMOUNT)
                .ok_or_else(|| SqlColdError::Convert("Enter requires amount".into()))?;

            Ok(DbSignetEvent::Enter(
                order,
                Enter { rollupChainId: rollup_chain_id, rollupRecipient: recipient, amount },
            ))
        }
        EVENT_ENTER_TOKEN => {
            let token: Address = r
                .get::<Option<Address>, _>(COL_TOKEN)
                .ok_or_else(|| SqlColdError::Convert("EnterToken requires token".into()))?;
            let recipient: Address =
                r.get::<Option<Address>, _>(COL_ROLLUP_RECIPIENT).ok_or_else(|| {
                    SqlColdError::Convert("EnterToken requires rollup_recipient".into())
                })?;
            let amount: U256 = r
                .get::<Option<U256>, _>(COL_AMOUNT)
                .ok_or_else(|| SqlColdError::Convert("EnterToken requires amount".into()))?;

            Ok(DbSignetEvent::EnterToken(
                order,
                EnterToken {
                    rollupChainId: rollup_chain_id,
                    token,
                    rollupRecipient: recipient,
                    amount,
                },
            ))
        }
        _ => Err(SqlColdError::Convert(format!("invalid event_type: {event_type}"))),
    }
}
606
607/// Build a [`DbZenithHeader`] from an [`sqlx::any::AnyRow`].
608fn zenith_header_from_row(r: &sqlx::any::AnyRow) -> Result<DbZenithHeader, SqlColdError> {
609    Ok(DbZenithHeader(Zenith::BlockHeader {
610        hostBlockNumber: r.get(COL_HOST_BLOCK_NUMBER),
611        rollupChainId: r.get(COL_ROLLUP_CHAIN_ID),
612        gasLimit: r.get(COL_GAS_LIMIT),
613        rewardAddress: r.get(COL_REWARD_ADDRESS),
614        blockDataHash: r.get(COL_BLOCK_DATA_HASH),
615    }))
616}
617
618// ============================================================================
619// Domain type → SQL INSERT (write path)
620// ============================================================================
621
/// Write a single block's data into an open SQL transaction.
///
/// Inserts, in order: the header, all transactions, all receipts with
/// their logs, all signet events, and (if present) the zenith header.
/// Parents are inserted before children so foreign-key constraints are
/// satisfied. The caller owns the transaction and decides when to
/// commit.
async fn write_block_to_tx(
    tx: &mut sqlx::Transaction<'_, sqlx::Any>,
    data: BlockData,
) -> Result<(), SqlColdError> {
    let bn = to_i64(data.block_number());

    // Insert header. The hash is recomputed here rather than trusted
    // from the caller.
    let block_hash = data.header.hash_slow();
    let difficulty = &data.header.difficulty;
    sqlx::query(
        "INSERT INTO headers (
            block_number, block_hash, parent_hash, ommers_hash, beneficiary,
            state_root, transactions_root, receipts_root, logs_bloom, difficulty,
            gas_limit, gas_used, timestamp, extra_data, mix_hash, nonce,
            base_fee_per_gas, withdrawals_root, blob_gas_used, excess_blob_gas,
            parent_beacon_block_root, requests_hash
        ) VALUES (
            $1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
            $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22
        )",
    )
    .bind(bn)
    .bind(block_hash)
    .bind(data.header.parent_hash)
    .bind(data.header.ommers_hash)
    .bind(data.header.beneficiary)
    .bind(data.header.state_root)
    .bind(data.header.transactions_root)
    .bind(data.header.receipts_root)
    .bind(data.header.logs_bloom)
    .bind(difficulty)
    .bind(to_i64(data.header.gas_limit))
    .bind(to_i64(data.header.gas_used))
    .bind(to_i64(data.header.timestamp))
    .bind(&data.header.extra_data)
    .bind(data.header.mix_hash)
    .bind(data.header.nonce)
    // Optional fields bind as NULL when None.
    .bind(data.header.base_fee_per_gas.map(to_i64))
    .bind(data.header.withdrawals_root.as_ref())
    .bind(data.header.blob_gas_used.map(to_i64))
    .bind(data.header.excess_blob_gas.map(to_i64))
    .bind(data.header.parent_beacon_block_root.as_ref())
    .bind(data.header.requests_hash.as_ref())
    .execute(&mut **tx)
    .await?;

    // Insert transactions, keyed by (block_number, tx_index).
    for (idx, recovered_tx) in data.transactions.iter().enumerate() {
        insert_transaction(tx, bn, to_i64(idx as u64), recovered_tx).await?;
    }

    // Insert receipts and logs, computing first_log_index as a running
    // sum of log counts (same algorithm as the MDBX IndexedReceipt path).
    let mut first_log_index = 0i64;
    for (idx, receipt) in data.receipts.iter().enumerate() {
        let tx_idx = to_i64(idx as u64);
        sqlx::query(
            "INSERT INTO receipts (block_number, tx_index, tx_type, success, cumulative_gas_used, first_log_index)
             VALUES ($1, $2, $3, $4, $5, $6)",
        )
        .bind(bn)
        .bind(tx_idx)
        .bind(receipt.tx_type as i32)
        // Status is stored as 0/1.
        .bind(receipt.inner.status.coerce_status() as i32)
        .bind(to_i64(receipt.inner.cumulative_gas_used))
        .bind(first_log_index)
        .execute(&mut **tx)
        .await?;
        // Advance the running offset for the next receipt.
        first_log_index += receipt.inner.logs.len() as i64;

        for (log_idx, log) in receipt.inner.logs.iter().enumerate() {
            let topics = log.topics();
            sqlx::query(
                "INSERT INTO logs (block_number, tx_index, log_index, address, topic0, topic1, topic2, topic3, data)
                 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
            )
            .bind(bn)
            .bind(tx_idx)
            .bind(to_i64(log_idx as u64))
            .bind(log.address)
            // Absent topics bind as NULL via Option from slice .get().
            .bind(topics.first())
            .bind(topics.get(1))
            .bind(topics.get(2))
            .bind(topics.get(3))
            .bind(&log.data.data)
            .execute(&mut **tx)
            .await?;
        }
    }

    // Insert signet events, preserving their per-block order index.
    for (idx, event) in data.signet_events.iter().enumerate() {
        insert_signet_event(tx, bn, to_i64(idx as u64), event).await?;
    }

    // Insert zenith header (at most one per block).
    if let Some(zh) = &data.zenith_header {
        let h = &zh.0;
        sqlx::query(
            "INSERT INTO zenith_headers (
                block_number, host_block_number, rollup_chain_id,
                gas_limit, reward_address, block_data_hash
            ) VALUES ($1, $2, $3, $4, $5, $6)",
        )
        .bind(bn)
        .bind(h.hostBlockNumber)
        .bind(h.rollupChainId)
        .bind(h.gasLimit)
        .bind(h.rewardAddress)
        .bind(h.blockDataHash)
        .execute(&mut **tx)
        .await?;
    }

    Ok(())
}
739
/// Insert a transaction, binding directly from the source type.
///
/// Flattens the five `EthereumTxEnvelope` variants into one wide
/// `transactions` row. Fields common to every variant (hash, type,
/// signature, nonce, gas limit, value, input, sender) are always
/// populated; variant-specific fields (legacy `gas_price` vs. EIP-1559
/// fee caps, access list, blob hashes, authorization list) are bound as
/// `NULL` when the variant does not carry them.
///
/// # Errors
///
/// Returns [`SqlColdError`] if the `INSERT` fails.
async fn insert_transaction(
    conn: &mut sqlx::AnyConnection,
    bn: i64,
    tx_index: i64,
    recovered: &RecoveredTx,
) -> Result<(), SqlColdError> {
    use alloy::consensus::EthereumTxEnvelope;

    let sender = recovered.signer();
    let tx: &TransactionSigned = recovered;
    let tx_hash = tx.tx_hash();
    let tx_type = tx.tx_type() as i32;

    // Signature extraction is identical for every variant; a small macro
    // avoids repeating the destructuring five times.
    macro_rules! sig {
        ($s:expr) => {{
            let sig = $s.signature();
            (sig.v() as i32, sig.r(), sig.s())
        }};
    }
    let (sig_y, sig_r, sig_s) = match tx {
        EthereumTxEnvelope::Legacy(s) => sig!(s),
        EthereumTxEnvelope::Eip2930(s) => sig!(s),
        EthereumTxEnvelope::Eip1559(s) => sig!(s),
        EthereumTxEnvelope::Eip4844(s) => sig!(s),
        EthereumTxEnvelope::Eip7702(s) => sig!(s),
    };

    // chain_id is optional only for legacy (pre-EIP-155) transactions; all
    // typed transactions carry it unconditionally.
    let (chain_id, nonce, gas_limit) = match tx {
        EthereumTxEnvelope::Legacy(s) => {
            (s.tx().chain_id.map(to_i64), to_i64(s.tx().nonce), to_i64(s.tx().gas_limit))
        }
        EthereumTxEnvelope::Eip2930(s) => {
            (Some(to_i64(s.tx().chain_id)), to_i64(s.tx().nonce), to_i64(s.tx().gas_limit))
        }
        EthereumTxEnvelope::Eip1559(s) => {
            (Some(to_i64(s.tx().chain_id)), to_i64(s.tx().nonce), to_i64(s.tx().gas_limit))
        }
        EthereumTxEnvelope::Eip4844(s) => {
            (Some(to_i64(s.tx().chain_id)), to_i64(s.tx().nonce), to_i64(s.tx().gas_limit))
        }
        EthereumTxEnvelope::Eip7702(s) => {
            (Some(to_i64(s.tx().chain_id)), to_i64(s.tx().nonce), to_i64(s.tx().gas_limit))
        }
    };

    // Legacy/2930/1559 use a TxKind, so `.to()` is None for contract
    // creation; 4844/7702 always have a concrete recipient address.
    let (value, to_addr) = match tx {
        EthereumTxEnvelope::Legacy(s) => (s.tx().value, s.tx().to.to()),
        EthereumTxEnvelope::Eip2930(s) => (s.tx().value, s.tx().to.to()),
        EthereumTxEnvelope::Eip1559(s) => (s.tx().value, s.tx().to.to()),
        EthereumTxEnvelope::Eip4844(s) => (s.tx().value, Some(&s.tx().to)),
        EthereumTxEnvelope::Eip7702(s) => (s.tx().value, Some(&s.tx().to)),
    };

    let input: &[u8] = match tx {
        EthereumTxEnvelope::Legacy(s) => s.tx().input.as_ref(),
        EthereumTxEnvelope::Eip2930(s) => s.tx().input.as_ref(),
        EthereumTxEnvelope::Eip1559(s) => s.tx().input.as_ref(),
        EthereumTxEnvelope::Eip4844(s) => s.tx().input.as_ref(),
        EthereumTxEnvelope::Eip7702(s) => s.tx().input.as_ref(),
    };

    // Fee columns are mutually exclusive: legacy/2930 carry `gas_price`
    // only; 1559/7702 carry the two EIP-1559 fee caps; 4844 additionally
    // carries the blob-gas fee cap. u128 fees are serialized via
    // `encode_u128` and bound as byte slices below.
    let (gas_price, max_fee, max_priority_fee, max_blob_fee) = match tx {
        EthereumTxEnvelope::Legacy(s) => (Some(encode_u128(s.tx().gas_price)), None, None, None),
        EthereumTxEnvelope::Eip2930(s) => (Some(encode_u128(s.tx().gas_price)), None, None, None),
        EthereumTxEnvelope::Eip1559(s) => (
            None,
            Some(encode_u128(s.tx().max_fee_per_gas)),
            Some(encode_u128(s.tx().max_priority_fee_per_gas)),
            None,
        ),
        EthereumTxEnvelope::Eip4844(s) => (
            None,
            Some(encode_u128(s.tx().max_fee_per_gas)),
            Some(encode_u128(s.tx().max_priority_fee_per_gas)),
            Some(encode_u128(s.tx().max_fee_per_blob_gas)),
        ),
        EthereumTxEnvelope::Eip7702(s) => (
            None,
            Some(encode_u128(s.tx().max_fee_per_gas)),
            Some(encode_u128(s.tx().max_priority_fee_per_gas)),
            None,
        ),
    };

    // List-shaped fields: access list (2930+), blob hashes (4844 only),
    // authorization list (7702 only).
    let (access_list, blob_hashes, auth_list) = match tx {
        EthereumTxEnvelope::Legacy(_) => (None, None, None),
        EthereumTxEnvelope::Eip2930(s) => {
            (Some(encode_access_list(&s.tx().access_list)), None, None)
        }
        EthereumTxEnvelope::Eip1559(s) => {
            (Some(encode_access_list(&s.tx().access_list)), None, None)
        }
        EthereumTxEnvelope::Eip4844(s) => (
            Some(encode_access_list(&s.tx().access_list)),
            Some(encode_b256_vec(&s.tx().blob_versioned_hashes)),
            None,
        ),
        EthereumTxEnvelope::Eip7702(s) => (
            Some(encode_access_list(&s.tx().access_list)),
            None,
            Some(encode_authorization_list(&s.tx().authorization_list)),
        ),
    };

    // NOTE: the bind order below must match the column list / $1..$21
    // placeholders exactly; a misalignment silently stores values in the
    // wrong columns.
    sqlx::query(
        "INSERT INTO transactions (
            block_number, tx_index, tx_hash, tx_type,
            sig_y_parity, sig_r, sig_s,
            chain_id, nonce, gas_limit, to_address, value, input,
            gas_price, max_fee_per_gas, max_priority_fee_per_gas,
            max_fee_per_blob_gas, blob_versioned_hashes,
            access_list, authorization_list, from_address
        ) VALUES (
            $1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
            $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21
        )",
    )
    .bind(bn)
    .bind(tx_index)
    .bind(tx_hash)
    .bind(tx_type)
    .bind(sig_y)
    .bind(sig_r)
    .bind(sig_s)
    .bind(chain_id)
    .bind(nonce)
    .bind(gas_limit)
    .bind(to_addr)
    .bind(value)
    .bind(input)
    .bind(gas_price.as_ref().map(|v| v.as_slice()))
    .bind(max_fee.as_ref().map(|v| v.as_slice()))
    .bind(max_priority_fee.as_ref().map(|v| v.as_slice()))
    .bind(max_blob_fee.as_ref().map(|v| v.as_slice()))
    .bind(blob_hashes.as_deref())
    .bind(access_list.as_deref())
    .bind(auth_list.as_deref())
    .bind(sender)
    .execute(&mut *conn)
    .await?;

    Ok(())
}
884
885/// Insert a signet event, binding directly from the source type.
886async fn insert_signet_event(
887    conn: &mut sqlx::AnyConnection,
888    block_number: i64,
889    event_index: i64,
890    event: &DbSignetEvent,
891) -> Result<(), SqlColdError> {
892    let (event_type, order, chain_id) = match event {
893        DbSignetEvent::Transact(o, t) => (0i32, to_i64(*o), &t.rollupChainId),
894        DbSignetEvent::Enter(o, e) => (1i32, to_i64(*o), &e.rollupChainId),
895        DbSignetEvent::EnterToken(o, e) => (2i32, to_i64(*o), &e.rollupChainId),
896    };
897
898    let (value, gas, max_fee, amount) = match event {
899        DbSignetEvent::Transact(_, t) => {
900            (Some(&t.value), Some(&t.gas), Some(&t.maxFeePerGas), None)
901        }
902        DbSignetEvent::Enter(_, e) => (None, None, None, Some(&e.amount)),
903        DbSignetEvent::EnterToken(_, e) => (None, None, None, Some(&e.amount)),
904    };
905
906    sqlx::query(
907        "INSERT INTO signet_events (
908            block_number, event_index, event_type, order_index,
909            rollup_chain_id, sender, to_address, value, gas,
910            max_fee_per_gas, data, rollup_recipient, amount, token
911        ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)",
912    )
913    .bind(block_number)
914    .bind(event_index)
915    .bind(event_type)
916    .bind(order)
917    .bind(chain_id)
918    .bind(match event {
919        DbSignetEvent::Transact(_, t) => Some(&t.sender),
920        _ => None,
921    })
922    .bind(match event {
923        DbSignetEvent::Transact(_, t) => Some(&t.to),
924        _ => None,
925    })
926    .bind(value)
927    .bind(gas)
928    .bind(max_fee)
929    .bind(match event {
930        DbSignetEvent::Transact(_, t) => Some(&t.data),
931        _ => None,
932    })
933    .bind(match event {
934        DbSignetEvent::Enter(_, e) => Some(&e.rollupRecipient),
935        DbSignetEvent::EnterToken(_, e) => Some(&e.rollupRecipient),
936        _ => None,
937    })
938    .bind(amount)
939    .bind(match event {
940        DbSignetEvent::EnterToken(_, e) => Some(&e.token),
941        _ => None,
942    })
943    .execute(&mut *conn)
944    .await?;
945
946    Ok(())
947}
948
949// ============================================================================
950// Log filter helpers
951// ============================================================================
952
/// Append a SQL filter clause for a set of byte-encoded values.
///
/// For a single value, generates ` AND {column} = ${idx}`.
/// For multiple values, generates ` AND {column} IN (${idx}, ...)`.
/// For an empty set, appends nothing — the previous behavior emitted the
/// invalid SQL fragment ` AND {column} IN ()`, which would fail at query
/// time if a caller ever passed an empty iterator.
///
/// Values are pushed onto `params` in the same order their placeholders
/// are written, keeping positional binds aligned with the clause text.
///
/// Returns the next available parameter index.
fn append_filter_clause<'a>(
    clause: &mut String,
    params: &mut Vec<&'a [u8]>,
    mut idx: u32,
    column: &str,
    values: impl ExactSizeIterator<Item = &'a [u8]>,
) -> u32 {
    use std::fmt::Write;

    match values.len() {
        // Nothing to filter on: leave clause and params untouched.
        0 => idx,
        // Single value: plain equality avoids the `IN` form entirely.
        1 => {
            write!(clause, " AND {column} = ${idx}").unwrap();
            params.extend(values);
            idx + 1
        }
        // Multiple values: comma-separated `IN ($n, $n+1, ...)` list.
        _ => {
            write!(clause, " AND {column} IN (").unwrap();
            for (i, v) in values.enumerate() {
                if i > 0 {
                    clause.push_str(", ");
                }
                write!(clause, "${idx}").unwrap();
                params.push(v);
                idx += 1;
            }
            clause.push(')');
            idx
        }
    }
}
985
986fn build_log_filter_clause(filter: &Filter, start_idx: u32) -> (String, Vec<&[u8]>) {
987    let mut clause = String::new();
988    let mut params: Vec<&[u8]> = Vec::new();
989    let mut idx = start_idx;
990
991    if !filter.address.is_empty() {
992        idx = append_filter_clause(
993            &mut clause,
994            &mut params,
995            idx,
996            "l.address",
997            filter.address.iter().map(|a| a.as_slice()),
998        );
999    }
1000
1001    let topic_cols = ["l.topic0", "l.topic1", "l.topic2", "l.topic3"];
1002    for (i, topic_filter) in filter.topics.iter().enumerate() {
1003        if topic_filter.is_empty() {
1004            continue;
1005        }
1006        idx = append_filter_clause(
1007            &mut clause,
1008            &mut params,
1009            idx,
1010            topic_cols[i],
1011            topic_filter.iter().map(|v| v.as_slice()),
1012        );
1013    }
1014
1015    (clause, params)
1016}
1017
1018// ============================================================================
1019// ColdStorage implementation
1020// ============================================================================
1021
// Read-side implementation: every method is a SQL query (or small set of
// queries) against `self.pool`, converting rows back into domain types via
// the `*_from_row` helpers.
impl ColdStorageRead for SqlColdBackend {
    // Resolve the specifier to a concrete block number first; an
    // unresolvable specifier yields `None` rather than an error.
    async fn get_header(&self, spec: HeaderSpecifier) -> ColdResult<Option<SealedHeader>> {
        let Some(block_num) = self.resolve_header_spec(spec).await? else {
            return Ok(None);
        };
        self.fetch_header_by_number(block_num).await.map_err(ColdStorageError::from)
    }

    // Sequential per-spec lookups; a missing header produces `None` in
    // its slot while any query error aborts the whole batch.
    async fn get_headers(
        &self,
        specs: Vec<HeaderSpecifier>,
    ) -> ColdResult<Vec<Option<SealedHeader>>> {
        let mut results = Vec::with_capacity(specs.len());
        for spec in specs {
            let header = self.get_header(spec).await?;
            results.push(header);
        }
        Ok(results)
    }

    // Each specifier variant maps to its own query; all of them join
    // `headers` so the block hash is available for the confirmation meta.
    async fn get_transaction(
        &self,
        spec: TransactionSpecifier,
    ) -> ColdResult<Option<Confirmed<RecoveredTx>>> {
        let row = match spec {
            TransactionSpecifier::Hash(hash) => sqlx::query(
                "SELECT t.*, h.block_hash
                     FROM transactions t
                     JOIN headers h ON t.block_number = h.block_number
                     WHERE t.tx_hash = $1",
            )
            .bind(hash)
            .fetch_optional(&self.pool)
            .await
            .map_err(SqlColdError::from)?,
            TransactionSpecifier::BlockAndIndex { block, index } => sqlx::query(
                "SELECT t.*, h.block_hash
                     FROM transactions t
                     JOIN headers h ON t.block_number = h.block_number
                     WHERE t.block_number = $1 AND t.tx_index = $2",
            )
            .bind(to_i64(block))
            .bind(to_i64(index))
            .fetch_optional(&self.pool)
            .await
            .map_err(SqlColdError::from)?,
            TransactionSpecifier::BlockHashAndIndex { block_hash, index } => sqlx::query(
                "SELECT t.*, h.block_hash
                     FROM transactions t
                     JOIN headers h ON t.block_number = h.block_number
                     WHERE h.block_hash = $1 AND t.tx_index = $2",
            )
            .bind(block_hash)
            .bind(to_i64(index))
            .fetch_optional(&self.pool)
            .await
            .map_err(SqlColdError::from)?,
        };

        let Some(r) = row else {
            return Ok(None);
        };

        // Rebuild the transaction and wrap it with its confirmation
        // metadata (block number, block hash, index within the block).
        let block = from_i64(r.get::<i64, _>(COL_BLOCK_NUMBER));
        let index = from_i64(r.get::<i64, _>(COL_TX_INDEX));
        let block_hash = r.get(COL_BLOCK_HASH);
        let recovered = recovered_tx_from_row(&r).map_err(ColdStorageError::from)?;
        let meta = ConfirmationMeta::new(block, block_hash, index);
        Ok(Some(Confirmed::new(recovered, meta)))
    }

    // All transactions of one block, in intra-block order.
    async fn get_transactions_in_block(&self, block: BlockNumber) -> ColdResult<Vec<RecoveredTx>> {
        let bn = to_i64(block);
        let rows =
            sqlx::query("SELECT * FROM transactions WHERE block_number = $1 ORDER BY tx_index")
                .bind(bn)
                .fetch_all(&self.pool)
                .await
                .map_err(SqlColdError::from)?;

        rows.iter().map(|r| recovered_tx_from_row(r).map_err(ColdStorageError::from)).collect()
    }

    // COUNT(*) over the block's transactions; an unknown block simply
    // counts zero.
    async fn get_transaction_count(&self, block: BlockNumber) -> ColdResult<u64> {
        let bn = to_i64(block);
        let row = sqlx::query("SELECT COUNT(*) as cnt FROM transactions WHERE block_number = $1")
            .bind(bn)
            .fetch_one(&self.pool)
            .await
            .map_err(SqlColdError::from)?;

        Ok(from_i64(row.get::<i64, _>(COL_CNT)))
    }

    async fn get_receipt(&self, spec: ReceiptSpecifier) -> ColdResult<Option<ColdReceipt>> {
        // Resolve to (block, index)
        let (block, index) = match spec {
            ReceiptSpecifier::TxHash(hash) => {
                let row = sqlx::query(
                    "SELECT block_number, tx_index FROM transactions WHERE tx_hash = $1",
                )
                .bind(hash)
                .fetch_optional(&self.pool)
                .await
                .map_err(SqlColdError::from)?;
                let Some(r) = row else { return Ok(None) };
                (
                    from_i64(r.get::<i64, _>(COL_BLOCK_NUMBER)),
                    from_i64(r.get::<i64, _>(COL_TX_INDEX)),
                )
            }
            ReceiptSpecifier::BlockAndIndex { block, index } => (block, index),
        };

        // Combined query: receipt + tx metadata + full header + prior gas.
        // Header columns use standard names (h.*) so header_from_row works.
        // Receipt/tx columns use r_ prefix to avoid name collisions.
        // `prior_gas` is the highest cumulative gas among the block's
        // earlier receipts (COALESCEd to 0 for the first transaction), so
        // this receipt's own gas usage can be derived by subtraction.
        let combined = sqlx::query(
            "SELECT h.*, \
               r.tx_type AS r_tx_type, r.success AS r_success, \
               r.cumulative_gas_used AS r_cumulative_gas_used, \
               r.first_log_index AS r_first_log_index, \
               t.tx_hash AS r_tx_hash, t.from_address AS r_from_address, \
               COALESCE( \
                 (SELECT CAST(MAX(r2.cumulative_gas_used) AS bigint) \
                  FROM receipts r2 \
                  WHERE r2.block_number = r.block_number \
                    AND r2.tx_index < r.tx_index), \
                 0 \
               ) AS prior_gas \
             FROM receipts r \
             JOIN transactions t ON r.block_number = t.block_number \
               AND r.tx_index = t.tx_index \
             JOIN headers h ON r.block_number = h.block_number \
             WHERE r.block_number = $1 AND r.tx_index = $2",
        )
        .bind(to_i64(block))
        .bind(to_i64(index))
        .fetch_optional(&self.pool)
        .await
        .map_err(SqlColdError::from)?;

        let Some(rr) = combined else {
            return Ok(None);
        };

        // Extract header using existing helper (h.* columns are unaliased).
        let header = header_from_row(&rr).map_err(ColdStorageError::from)?.seal_slow();

        // Extract receipt fields from r_ prefixed aliases.
        let tx_hash = rr.get(COL_R_TX_HASH);
        let sender = rr.get(COL_R_FROM_ADDRESS);
        let tx_type: i32 = rr.get(COL_R_TX_TYPE);
        let success = rr.get::<i32, _>(COL_R_SUCCESS) != 0;
        let cumulative_gas_used: i64 = rr.get(COL_R_CUMULATIVE_GAS_USED);
        let first_log_index: u64 = from_i64(rr.get::<i64, _>(COL_R_FIRST_LOG_INDEX));
        let prior_cumulative_gas: u64 = from_i64(rr.get::<i64, _>(COL_PRIOR_GAS));

        // Logs still require a separate query (variable row count).
        let log_rows = sqlx::query(
            "SELECT * FROM logs WHERE block_number = $1 AND tx_index = $2 ORDER BY log_index",
        )
        .bind(to_i64(block))
        .bind(to_i64(index))
        .fetch_all(&self.pool)
        .await
        .map_err(SqlColdError::from)?;

        let logs = log_rows.iter().map(log_from_row).collect();
        let built = build_receipt(tx_type, success, cumulative_gas_used, logs)
            .map_err(ColdStorageError::from)?;
        // Cumulative gas must be non-decreasing within a block; a violation
        // indicates database corruption.
        debug_assert!(
            built.inner.cumulative_gas_used >= prior_cumulative_gas,
            "cumulative gas decreased: {} < {}",
            built.inner.cumulative_gas_used,
            prior_cumulative_gas,
        );
        let gas_used = built.inner.cumulative_gas_used - prior_cumulative_gas;

        let ir = IndexedReceipt { receipt: built, tx_hash, first_log_index, gas_used, sender };
        Ok(Some(ColdReceipt::new(ir, &header, index)))
    }

    async fn get_receipts_in_block(&self, block: BlockNumber) -> ColdResult<Vec<ColdReceipt>> {
        // Unknown block -> empty result rather than an error.
        let Some(header) =
            self.fetch_header_by_number(block).await.map_err(ColdStorageError::from)?
        else {
            return Ok(Vec::new());
        };

        let bn = to_i64(block);

        // Fetch receipts joined with tx_hash and from_address
        let receipt_rows = sqlx::query(
            "SELECT r.block_number, r.tx_index, r.tx_type, r.success, \
               r.cumulative_gas_used, r.first_log_index, \
               t.tx_hash, t.from_address \
             FROM receipts r \
             JOIN transactions t ON r.block_number = t.block_number \
               AND r.tx_index = t.tx_index \
             WHERE r.block_number = $1 \
             ORDER BY r.tx_index",
        )
        .bind(bn)
        .fetch_all(&self.pool)
        .await
        .map_err(SqlColdError::from)?;

        // One query for all of the block's logs, grouped below.
        let all_log_rows =
            sqlx::query("SELECT * FROM logs WHERE block_number = $1 ORDER BY tx_index, log_index")
                .bind(bn)
                .fetch_all(&self.pool)
                .await
                .map_err(SqlColdError::from)?;

        // Group logs by tx_index
        let mut logs_by_tx: BTreeMap<i64, Vec<Log>> = BTreeMap::new();
        for r in &all_log_rows {
            let tx_idx: i64 = r.get(COL_TX_INDEX);
            logs_by_tx.entry(tx_idx).or_default().push(log_from_row(r));
        }

        // Running counters across the ORDER BY tx_index iteration:
        // first_log_index accumulates log counts, prior_cumulative_gas
        // lets each receipt's own gas_used be derived by subtraction.
        let mut first_log_index = 0u64;
        let mut prior_cumulative_gas = 0u64;
        receipt_rows
            .into_iter()
            .enumerate()
            .map(|(idx, rr)| {
                let tx_idx: i64 = rr.get(COL_TX_INDEX);
                let tx_hash = rr.get(COL_TX_HASH);
                let sender = rr.get(COL_FROM_ADDRESS);
                let tx_type = rr.get::<i32, _>(COL_TX_TYPE);
                let success = rr.get::<i32, _>(COL_SUCCESS) != 0;
                let cumulative_gas_used: i64 = rr.get(COL_CUMULATIVE_GAS_USED);
                let logs = logs_by_tx.remove(&tx_idx).unwrap_or_default();
                let receipt = build_receipt(tx_type, success, cumulative_gas_used, logs)
                    .map_err(ColdStorageError::from)?;
                let gas_used = receipt.inner.cumulative_gas_used - prior_cumulative_gas;
                prior_cumulative_gas = receipt.inner.cumulative_gas_used;
                let ir = IndexedReceipt { receipt, tx_hash, first_log_index, gas_used, sender };
                first_log_index += ir.receipt.inner.logs.len() as u64;
                Ok(ColdReceipt::new(ir, &header, idx as u64))
            })
            .collect()
    }

    // Events for either a single block or an inclusive block range, in
    // (block_number, event_index) order.
    async fn get_signet_events(
        &self,
        spec: SignetEventsSpecifier,
    ) -> ColdResult<Vec<DbSignetEvent>> {
        let rows = match spec {
            SignetEventsSpecifier::Block(block) => {
                let bn = to_i64(block);
                sqlx::query(
                    "SELECT * FROM signet_events WHERE block_number = $1 ORDER BY event_index",
                )
                .bind(bn)
                .fetch_all(&self.pool)
                .await
                .map_err(SqlColdError::from)?
            }
            SignetEventsSpecifier::BlockRange { start, end } => {
                let s = to_i64(start);
                let e = to_i64(end);
                sqlx::query(
                    "SELECT * FROM signet_events WHERE block_number >= $1 AND block_number <= $2
                     ORDER BY block_number, event_index",
                )
                .bind(s)
                .bind(e)
                .fetch_all(&self.pool)
                .await
                .map_err(SqlColdError::from)?
            }
        };

        rows.iter().map(|r| signet_event_from_row(r).map_err(ColdStorageError::from)).collect()
    }

    async fn get_zenith_header(
        &self,
        spec: ZenithHeaderSpecifier,
    ) -> ColdResult<Option<DbZenithHeader>> {
        // NOTE(review): a Range specifier resolves to its `start` block
        // only — confirm "first header of the range" is the intended
        // single-result semantics for this API.
        let block = match spec {
            ZenithHeaderSpecifier::Number(n) => n,
            ZenithHeaderSpecifier::Range { start, .. } => start,
        };
        let bn = to_i64(block);
        let row = sqlx::query("SELECT * FROM zenith_headers WHERE block_number = $1")
            .bind(bn)
            .fetch_optional(&self.pool)
            .await
            .map_err(SqlColdError::from)?;

        row.map(|r| zenith_header_from_row(&r)).transpose().map_err(ColdStorageError::from)
    }

    // Multi-result variant: Number fetches at most one row, Range fetches
    // the inclusive span ordered by block number.
    async fn get_zenith_headers(
        &self,
        spec: ZenithHeaderSpecifier,
    ) -> ColdResult<Vec<DbZenithHeader>> {
        let rows = match spec {
            ZenithHeaderSpecifier::Number(n) => {
                let bn = to_i64(n);
                sqlx::query("SELECT * FROM zenith_headers WHERE block_number = $1")
                    .bind(bn)
                    .fetch_all(&self.pool)
                    .await
                    .map_err(SqlColdError::from)?
            }
            ZenithHeaderSpecifier::Range { start, end } => {
                let s = to_i64(start);
                let e = to_i64(end);
                sqlx::query(
                    "SELECT * FROM zenith_headers WHERE block_number >= $1 AND block_number <= $2
                     ORDER BY block_number",
                )
                .bind(s)
                .bind(e)
                .fetch_all(&self.pool)
                .await
                .map_err(SqlColdError::from)?
            }
        };

        rows.iter().map(|r| zenith_header_from_row(r).map_err(ColdStorageError::from)).collect()
    }

    async fn get_logs(&self, filter: &Filter, max_logs: usize) -> ColdResult<Vec<RpcLog>> {
        // Missing range bounds default to the full history.
        let from = filter.get_from_block().unwrap_or(0);
        let to = filter.get_to_block().unwrap_or(u64::MAX);

        // Build WHERE clause: block range ($1, $2) + address/topic filters.
        // ENG-2036: dynamic filter clause requires format!(); mitigated by
        // sqlx prepared-statement cache.
        let (filter_clause, params) = build_log_filter_clause(filter, 3);
        let where_clause = format!("l.block_number >= $1 AND l.block_number <= $2{filter_clause}");

        // Use LIMIT to cap results. Fetch one extra row to detect overflow
        // without a separate COUNT query. PostgreSQL stops scanning after
        // finding enough rows, making this faster than COUNT in both the
        // under-limit and over-limit cases.
        let limit_idx = 3 + params.len() as u32;
        let data_sql = format!(
            "SELECT l.*, h.block_hash, h.timestamp AS block_timestamp, t.tx_hash, \
               (r.first_log_index + l.log_index) AS block_log_index \
             FROM logs l \
             JOIN headers h ON l.block_number = h.block_number \
             JOIN transactions t ON l.block_number = t.block_number \
               AND l.tx_index = t.tx_index \
             JOIN receipts r ON l.block_number = r.block_number \
               AND l.tx_index = r.tx_index \
             WHERE {where_clause} \
             ORDER BY l.block_number, l.tx_index, l.log_index \
             LIMIT ${limit_idx}"
        );
        let mut query = sqlx::query(&data_sql).bind(to_i64(from)).bind(to_i64(to));
        // Bind the filter params in the same order their placeholders were
        // appended to the clause.
        for param in &params {
            query = query.bind(*param);
        }
        // Clamp to i64::MAX before converting: SQL LIMIT is an i64, and
        // saturating_add avoids overflow when max_logs is usize::MAX.
        let limit = max_logs.saturating_add(1).min(i64::MAX as usize);
        query = query.bind(to_i64(limit as u64));

        let rows = query.fetch_all(&self.pool).await.map_err(SqlColdError::from)?;

        // The sentinel extra row fired: the filter matches more than
        // max_logs entries.
        if rows.len() > max_logs {
            return Err(ColdStorageError::TooManyLogs { limit: max_logs });
        }

        rows.into_iter()
            .map(|r| {
                let log = log_from_row(&r);
                Ok(RpcLog {
                    inner: log,
                    block_hash: Some(r.get(COL_BLOCK_HASH)),
                    block_number: Some(from_i64(r.get::<i64, _>(COL_BLOCK_NUMBER))),
                    block_timestamp: Some(from_i64(r.get::<i64, _>(COL_BLOCK_TIMESTAMP))),
                    transaction_hash: Some(r.get(COL_TX_HASH)),
                    transaction_index: Some(from_i64(r.get::<i64, _>(COL_TX_INDEX))),
                    // block_log_index is computed in SQL as
                    // r.first_log_index + l.log_index.
                    log_index: Some(from_i64(r.get::<i64, _>(COL_BLOCK_LOG_INDEX))),
                    removed: false,
                })
            })
            .collect::<ColdResult<Vec<_>>>()
    }

    // Prefer the PostgreSQL-specific streaming path when compiled in and
    // the backend detected Postgres; otherwise use the generic default.
    async fn produce_log_stream(&self, filter: &Filter, params: signet_cold::StreamParams) {
        #[cfg(feature = "postgres")]
        if self.is_postgres {
            return self.produce_log_stream_pg(filter, params).await;
        }
        signet_cold::produce_log_stream_default(self, filter, params).await;
    }

    // MAX over an empty headers table is NULL, which maps to Ok(None).
    async fn get_latest_block(&self) -> ColdResult<Option<BlockNumber>> {
        let row = sqlx::query("SELECT MAX(block_number) as max_bn FROM headers")
            .fetch_one(&self.pool)
            .await
            .map_err(SqlColdError::from)?;
        Ok(row.get::<Option<i64>, _>(COL_MAX_BN).map(from_i64))
    }
}
1428
1429impl ColdStorageWrite for SqlColdBackend {
1430    async fn append_block(&mut self, data: BlockData) -> ColdResult<()> {
1431        self.insert_block(data).await.map_err(ColdStorageError::from)
1432    }
1433
1434    async fn append_blocks(&mut self, data: Vec<BlockData>) -> ColdResult<()> {
1435        let mut tx = self.pool.begin().await.map_err(SqlColdError::from)?;
1436        for block_data in data {
1437            write_block_to_tx(&mut tx, block_data).await.map_err(ColdStorageError::from)?;
1438        }
1439        tx.commit().await.map_err(SqlColdError::from)?;
1440        Ok(())
1441    }
1442
1443    async fn truncate_above(&mut self, block: BlockNumber) -> ColdResult<()> {
1444        let bn = to_i64(block);
1445        let mut tx = self.pool.begin().await.map_err(SqlColdError::from)?;
1446
1447        delete_above_in_tx(&mut tx, bn).await.map_err(ColdStorageError::from)?;
1448
1449        tx.commit().await.map_err(SqlColdError::from)?;
1450        Ok(())
1451    }
1452}
1453
impl ColdStorage for SqlColdBackend {
    /// Removes every block above `block` from cold storage and returns the
    /// receipts of the removed blocks, grouped per block in ascending
    /// block-number order.
    ///
    /// Runs entirely inside one transaction — reads (headers, receipts joined
    /// with transactions, logs), in-memory assembly, then the same deletes as
    /// `truncate_above` — so a failure at any step leaves the database
    /// untouched.
    async fn drain_above(&mut self, block: BlockNumber) -> ColdResult<Vec<Vec<ColdReceipt>>> {
        let bn = to_i64(block);
        let mut tx = self.pool.begin().await.map_err(SqlColdError::from)?;

        // 1. Fetch all headers above block.
        let header_rows =
            sqlx::query("SELECT * FROM headers WHERE block_number > $1 ORDER BY block_number")
                .bind(bn)
                .fetch_all(&mut *tx)
                .await
                .map_err(SqlColdError::from)?;

        // Nothing above the cutoff: commit the (read-only) transaction and
        // return early with no receipts.
        if header_rows.is_empty() {
            tx.commit().await.map_err(SqlColdError::from)?;
            return Ok(Vec::new());
        }

        // Re-seal each header. BTreeMap keeps blocks sorted by number, which
        // fixes the ascending order of the returned outer Vec.
        let mut headers: BTreeMap<i64, SealedHeader> = BTreeMap::new();
        for r in &header_rows {
            let num: i64 = r.get(COL_BLOCK_NUMBER);
            let h = header_from_row(r).map_err(ColdStorageError::from)?.seal_slow();
            headers.insert(num, h);
        }

        // 2. Fetch all receipt + tx metadata above block. Rows are ordered by
        // (block_number, tx_index) so per-block groups arrive in tx order.
        let receipt_rows = sqlx::query(
            "SELECT r.block_number, r.tx_index, r.tx_type, r.success, \
               r.cumulative_gas_used, r.first_log_index, \
               t.tx_hash, t.from_address \
             FROM receipts r \
             JOIN transactions t ON r.block_number = t.block_number \
               AND r.tx_index = t.tx_index \
             WHERE r.block_number > $1 \
             ORDER BY r.block_number, r.tx_index",
        )
        .bind(bn)
        .fetch_all(&mut *tx)
        .await
        .map_err(SqlColdError::from)?;

        // 3. Fetch all logs above block, ordered so each (block, tx) group is
        // pushed in log-index order below.
        let log_rows = sqlx::query(
            "SELECT * FROM logs WHERE block_number > $1 \
             ORDER BY block_number, tx_index, log_index",
        )
        .bind(bn)
        .fetch_all(&mut *tx)
        .await
        .map_err(SqlColdError::from)?;

        // Group logs by (block_number, tx_index).
        let mut logs_by_block_tx: BTreeMap<(i64, i64), Vec<Log>> = BTreeMap::new();
        for r in &log_rows {
            let block_num: i64 = r.get(COL_BLOCK_NUMBER);
            let tx_idx: i64 = r.get(COL_TX_INDEX);
            logs_by_block_tx.entry((block_num, tx_idx)).or_default().push(log_from_row(r));
        }

        // Group receipt rows by block_number.
        let mut receipts_by_block: BTreeMap<i64, Vec<&sqlx::any::AnyRow>> = BTreeMap::new();
        for r in &receipt_rows {
            let block_num: i64 = r.get(COL_BLOCK_NUMBER);
            receipts_by_block.entry(block_num).or_default().push(r);
        }

        // 4. Assemble ColdReceipts per block.
        let mut all_receipts = Vec::with_capacity(headers.len());
        for (&block_num, header) in &headers {
            // A header with no receipt rows yields an empty Vec (empty block).
            let block_receipt_rows = receipts_by_block.remove(&block_num).unwrap_or_default();
            // Per-tx gas is recovered by differencing consecutive cumulative
            // values; correctness relies on the ORDER BY tx_index above.
            let mut prior_cumulative_gas = 0u64;
            let block_receipts: ColdResult<Vec<ColdReceipt>> = block_receipt_rows
                .into_iter()
                .map(|rr: &sqlx::any::AnyRow| {
                    let tx_idx: i64 = rr.get(COL_TX_INDEX);
                    let tx_hash = rr.get(COL_TX_HASH);
                    let sender = rr.get(COL_FROM_ADDRESS);
                    let tx_type: i32 = rr.get(COL_TX_TYPE);
                    let success = rr.get::<i32, _>(COL_SUCCESS) != 0;
                    let cumulative_gas_used: i64 = rr.get(COL_CUMULATIVE_GAS_USED);
                    let first_log_index = from_i64(rr.get::<i64, _>(COL_FIRST_LOG_INDEX));
                    // `remove` hands over ownership of this tx's logs without
                    // cloning; each (block, tx) key is consumed exactly once.
                    let logs = logs_by_block_tx.remove(&(block_num, tx_idx)).unwrap_or_default();
                    let receipt = build_receipt(tx_type, success, cumulative_gas_used, logs)
                        .map_err(ColdStorageError::from)?;
                    // Cumulative gas must be non-decreasing within a block; a violation
                    // indicates database corruption.
                    // NOTE(review): this check is compiled out in release
                    // builds, where a decreasing value would make the u64
                    // subtraction below wrap — consider checked_sub + error.
                    debug_assert!(
                        receipt.inner.cumulative_gas_used >= prior_cumulative_gas,
                        "cumulative gas decreased: {} < {}",
                        receipt.inner.cumulative_gas_used,
                        prior_cumulative_gas,
                    );
                    let gas_used = receipt.inner.cumulative_gas_used - prior_cumulative_gas;
                    prior_cumulative_gas = receipt.inner.cumulative_gas_used;
                    let ir = IndexedReceipt { receipt, tx_hash, first_log_index, gas_used, sender };
                    Ok(ColdReceipt::new(ir, header, from_i64(tx_idx)))
                })
                .collect();
            all_receipts.push(block_receipts?);
        }

        // 5. Delete from all tables (same order as truncate_above).
        delete_above_in_tx(&mut tx, bn).await.map_err(ColdStorageError::from)?;

        tx.commit().await.map_err(SqlColdError::from)?;
        Ok(all_receipts)
    }
}
1562
#[cfg(all(test, feature = "test-utils"))]
mod tests {
    use super::*;
    use signet_cold::conformance::conformance;

    /// Runs the shared conformance suite against an in-memory SQLite backend.
    #[tokio::test]
    async fn sqlite_conformance() {
        let backend = SqlColdBackend::connect("sqlite::memory:").await.unwrap();
        conformance(backend).await.unwrap();
    }

    /// Runs the shared conformance suite against PostgreSQL. Skipped (with a
    /// message on stderr) when `DATABASE_URL` is not set in the environment.
    #[tokio::test]
    async fn pg_conformance() {
        let url = match std::env::var("DATABASE_URL") {
            Ok(url) => url,
            Err(_) => {
                eprintln!("skipping pg conformance: DATABASE_URL not set");
                return;
            }
        };
        let backend = SqlColdBackend::connect(&url).await.unwrap();
        conformance(backend).await.unwrap();
    }
}