// signet_cold_sql/backend.rs
1//! Unified SQL backend for cold storage.
2//!
3//! Supports both PostgreSQL and SQLite via [`sqlx::Any`]. The backend
4//! auto-detects the database type at construction time and runs the
5//! appropriate migration.
6
7use crate::SqlColdError;
8use crate::columns::{
9    COL_ACCESS_LIST, COL_ADDRESS, COL_AMOUNT, COL_AUTHORIZATION_LIST, COL_BASE_FEE_PER_GAS,
10    COL_BENEFICIARY, COL_BLOB_GAS_USED, COL_BLOB_VERSIONED_HASHES, COL_BLOCK_DATA_HASH,
11    COL_BLOCK_HASH, COL_BLOCK_LOG_INDEX, COL_BLOCK_NUMBER, COL_BLOCK_TIMESTAMP, COL_CHAIN_ID,
12    COL_CNT, COL_CUMULATIVE_GAS_USED, COL_DATA, COL_DIFFICULTY, COL_EVENT_TYPE,
13    COL_EXCESS_BLOB_GAS, COL_EXTRA_DATA, COL_FIRST_LOG_INDEX, COL_FROM_ADDRESS, COL_GAS,
14    COL_GAS_LIMIT, COL_GAS_PRICE, COL_GAS_USED, COL_HOST_BLOCK_NUMBER, COL_INPUT, COL_LOGS_BLOOM,
15    COL_MAX_BN, COL_MAX_FEE_PER_BLOB_GAS, COL_MAX_FEE_PER_GAS, COL_MAX_PRIORITY_FEE_PER_GAS,
16    COL_MIX_HASH, COL_NONCE, COL_OMMERS_HASH, COL_ORDER_INDEX, COL_PARENT_BEACON_BLOCK_ROOT,
17    COL_PARENT_HASH, COL_PRIOR_GAS, COL_R_CUMULATIVE_GAS_USED, COL_R_FIRST_LOG_INDEX,
18    COL_R_FROM_ADDRESS, COL_R_SUCCESS, COL_R_TX_HASH, COL_R_TX_TYPE, COL_RECEIPTS_ROOT,
19    COL_REQUESTS_HASH, COL_REWARD_ADDRESS, COL_ROLLUP_CHAIN_ID, COL_ROLLUP_RECIPIENT, COL_SENDER,
20    COL_SIG_R, COL_SIG_S, COL_SIG_Y_PARITY, COL_STATE_ROOT, COL_SUCCESS, COL_TIMESTAMP,
21    COL_TO_ADDRESS, COL_TOKEN, COL_TOPIC0, COL_TOPIC1, COL_TOPIC2, COL_TOPIC3,
22    COL_TRANSACTIONS_ROOT, COL_TX_HASH, COL_TX_INDEX, COL_TX_TYPE, COL_VALUE, COL_WITHDRAWALS_ROOT,
23};
24use crate::convert::{
25    EVENT_ENTER, EVENT_ENTER_TOKEN, EVENT_TRANSACT, build_receipt, decode_access_list_or_empty,
26    decode_authorization_list, decode_b256_vec, decode_u128_required, encode_access_list,
27    encode_authorization_list, encode_b256_vec, encode_u128, from_i64, to_i64,
28};
29use alloy::{
30    consensus::{
31        Header, Signed, TxEip1559, TxEip2930, TxEip4844, TxEip7702, TxLegacy, TxType,
32        transaction::Recovered,
33    },
34    primitives::{
35        Address, B256, BlockNumber, Bloom, Bytes, Log, LogData, Sealable, Signature, TxKind, U256,
36    },
37};
38use signet_cold::{
39    BlockData, ColdReceipt, ColdResult, ColdStorage, ColdStorageError, ColdStorageRead,
40    ColdStorageWrite, Confirmed, Filter, HeaderSpecifier, ReceiptSpecifier, RpcLog,
41    SignetEventsSpecifier, TransactionSpecifier, ZenithHeaderSpecifier,
42};
43use signet_storage_types::{
44    ConfirmationMeta, DbSignetEvent, DbZenithHeader, IndexedReceipt, RecoveredTx, SealedHeader,
45    TransactionSigned,
46};
47use signet_zenith::{
48    Passage::{Enter, EnterToken},
49    Transactor::Transact,
50    Zenith,
51};
52use sqlx::{AnyPool, Row};
53use std::collections::BTreeMap;
54
55/// SQL-based cold storage backend.
56///
57/// Uses [`sqlx::Any`] for database-agnostic access, supporting both
58/// PostgreSQL and SQLite through a single implementation. The backend
59/// is determined by the connection URL at construction time.
60///
61/// # Example
62///
63/// ```no_run
64/// # async fn example() {
65/// use signet_cold_sql::SqlColdBackend;
66///
67/// // SQLite (in-memory)
68/// let backend = SqlColdBackend::connect("sqlite::memory:").await.unwrap();
69///
70/// // PostgreSQL
71/// let backend = SqlColdBackend::connect("postgres://localhost/signet").await.unwrap();
72/// # }
73/// ```
#[derive(Debug, Clone)]
pub struct SqlColdBackend {
    /// Database-agnostic connection pool shared by all reads and writes.
    pool: AnyPool,
    /// True when the detected backend is PostgreSQL; recorded in `new`.
    /// NOTE(review): presumably gates Postgres-specific code paths — it is
    /// only written, never read, in this part of the file; confirm usage.
    is_postgres: bool,
}
79
/// Returns `true` if the URL refers to an in-memory SQLite database.
///
/// In-memory SQLite databases are per-connection, so the pool must be
/// limited to a single connection to ensure all queries see the same
/// state.
///
/// Recognizes both URL spellings sqlx accepts for an in-memory database
/// (`sqlite::memory:` and `sqlite://:memory:`) as well as file-URI style
/// URLs carrying `mode=memory`. Previously `sqlite://:memory:` was not
/// detected, so a multi-connection pool would hand each connection its
/// own empty database.
fn is_in_memory_sqlite(url: &str) -> bool {
    url == "sqlite::memory:" || url == "sqlite://:memory:" || url.contains("mode=memory")
}
88
89/// Delete all rows with `block_number > bn` from every table, in
90/// child-first order to preserve foreign-key constraints.
91async fn delete_above_in_tx(
92    tx: &mut sqlx::Transaction<'_, sqlx::Any>,
93    bn: i64,
94) -> Result<(), SqlColdError> {
95    for table in ["logs", "transactions", "receipts", "signet_events", "zenith_headers", "headers"]
96    {
97        sqlx::query(&format!("DELETE FROM {table} WHERE block_number > $1"))
98            .bind(bn)
99            .execute(&mut **tx)
100            .await?;
101    }
102    Ok(())
103}
104
impl SqlColdBackend {
    /// Create a new SQL cold storage backend from an existing [`AnyPool`].
    ///
    /// Auto-detects the database backend and creates all tables if they
    /// do not already exist. Callers must ensure
    /// [`sqlx::any::install_default_drivers`] has been called before
    /// constructing the pool.
    pub async fn new(pool: AnyPool) -> Result<Self, SqlColdError> {
        // Detect backend from a pooled connection.
        let conn = pool.acquire().await?;
        let backend = conn.backend_name();

        // Select the backend-specific migration script, embedded into the
        // binary at compile time. Anything other than PostgreSQL or SQLite
        // is rejected up front.
        let (migration, is_postgres) = match backend {
            "PostgreSQL" => (include_str!("../migrations/001_initial_pg.sql"), true),
            "SQLite" => (include_str!("../migrations/001_initial.sql"), false),
            other => {
                return Err(SqlColdError::Convert(format!(
                    "unsupported database backend: {other}"
                )));
            }
        };
        drop(conn);
        // Execute via pool to ensure the migration uses the same
        // connection that subsequent queries will use.
        sqlx::raw_sql(migration).execute(&pool).await?;
        Ok(Self { pool, is_postgres })
    }

    /// Connect to a database URL with explicit pool options.
    ///
    /// Installs the default sqlx drivers on the first call. The database
    /// type is inferred from the URL scheme (`sqlite:` or `postgres:`).
    ///
    /// For in-memory SQLite URLs (`sqlite::memory:` or `mode=memory`),
    /// `max_connections` is forced to 1 regardless of the provided
    /// options, because in-memory databases are per-connection.
    pub async fn connect_with(
        url: &str,
        pool_opts: sqlx::pool::PoolOptions<sqlx::Any>,
    ) -> Result<Self, SqlColdError> {
        sqlx::any::install_default_drivers();
        let pool_opts =
            if is_in_memory_sqlite(url) { pool_opts.max_connections(1) } else { pool_opts };
        let pool: AnyPool = pool_opts.connect(url).await?;
        Self::new(pool).await
    }

    /// Connect to a database URL with default pool settings.
    ///
    /// Convenience wrapper around [`connect_with`](Self::connect_with).
    pub async fn connect(url: &str) -> Result<Self, SqlColdError> {
        Self::connect_with(url, sqlx::pool::PoolOptions::new()).await
    }

    // ========================================================================
    // Specifier resolution
    // ========================================================================

    /// Resolve a [`HeaderSpecifier`] to a block number.
    ///
    /// Numeric specifiers pass through without an existence check; hash
    /// specifiers are looked up in the `headers` table and yield `None`
    /// when no matching row exists.
    async fn resolve_header_spec(
        &self,
        spec: HeaderSpecifier,
    ) -> Result<Option<BlockNumber>, SqlColdError> {
        match spec {
            HeaderSpecifier::Number(n) => Ok(Some(n)),
            HeaderSpecifier::Hash(hash) => {
                let row = sqlx::query("SELECT block_number FROM headers WHERE block_hash = $1")
                    .bind(hash)
                    .fetch_optional(&self.pool)
                    .await?;
                Ok(row.map(|r| from_i64(r.get::<i64, _>(COL_BLOCK_NUMBER))))
            }
        }
    }

    // ========================================================================
    // Read helpers
    // ========================================================================

    /// Fetch the header stored for `block_num` and seal it (recomputing
    /// the hash via `seal_slow`), or `None` when the block is absent.
    async fn fetch_header_by_number(
        &self,
        block_num: BlockNumber,
    ) -> Result<Option<SealedHeader>, SqlColdError> {
        let bn = to_i64(block_num);
        let row = sqlx::query("SELECT * FROM headers WHERE block_number = $1")
            .bind(bn)
            .fetch_optional(&self.pool)
            .await?;

        row.map(|r| header_from_row(&r).map(|h| h.seal_slow())).transpose()
    }

    // ========================================================================
    // Write helpers
    // ========================================================================

    /// Insert a full block atomically: all of its rows are written inside
    /// a single SQL transaction that commits at the end.
    async fn insert_block(&self, data: BlockData) -> Result<(), SqlColdError> {
        let mut tx = self.pool.begin().await?;
        write_block_to_tx(&mut tx, data).await?;
        tx.commit().await?;
        Ok(())
    }

    // ========================================================================
    // Streaming helpers
    // ========================================================================

    /// Stream logs using a PostgreSQL REPEATABLE READ transaction.
    ///
    /// The transaction provides a consistent snapshot across all per-block
    /// queries, eliminating the need for anchor-hash reorg detection.
    /// Rows are streamed individually rather than materialised per block.
    #[cfg(feature = "postgres")]
    async fn produce_log_stream_pg(&self, filter: &Filter, params: signet_cold::StreamParams) {
        use tokio_stream::StreamExt;

        /// Unwrap a `Result` or send the error through the stream and return.
        macro_rules! try_stream {
            ($sender:expr, $expr:expr) => {
                match $expr {
                    Ok(v) => v,
                    Err(e) => {
                        let _ = $sender
                            .send(Err(ColdStorageError::backend(SqlColdError::from(e))))
                            .await;
                        return;
                    }
                }
            };
        }

        let signet_cold::StreamParams { from, to, max_logs, sender, deadline } = params;

        // Open a REPEATABLE READ transaction so all per-block queries see a
        // consistent snapshot. This makes reorg detection unnecessary — if a
        // reorg lands mid-stream the transaction still reads the old data.
        let mut tx = try_stream!(sender, self.pool.begin().await);
        try_stream!(
            sender,
            sqlx::query("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ").execute(&mut *tx).await
        );

        // Build the parameterised query once. $1 is the block number
        // (bound per iteration); remaining parameters are the address
        // and topic filters from the user's request.
        //
        // ENG-2036: format!() allocates here, but the dynamic filter clause
        // makes this unavoidable. sqlx's prepared-statement cache mitigates
        // repeated execution cost.
        let (filter_clause, filter_params) = build_log_filter_clause(filter, 2);
        let data_sql = format!(
            "SELECT l.*, h.block_hash, h.timestamp AS block_timestamp, t.tx_hash, \
               (r.first_log_index + l.log_index) AS block_log_index \
             FROM logs l \
             JOIN headers h ON l.block_number = h.block_number \
             JOIN transactions t ON l.block_number = t.block_number \
               AND l.tx_index = t.tx_index \
             JOIN receipts r ON l.block_number = r.block_number \
               AND l.tx_index = r.tx_index \
             WHERE l.block_number = $1{filter_clause} \
             ORDER BY l.tx_index, l.log_index"
        );

        let mut total = 0usize;

        // Walk through blocks one at a time, streaming matching log rows
        // from each block directly to the channel.
        for block_num in from..=to {
            // Check the deadline before starting each block so we
            // don't begin a new query after the caller's timeout.
            if tokio::time::Instant::now() > deadline {
                let _ = sender.send(Err(ColdStorageError::StreamDeadlineExceeded)).await;
                return;
            }

            let mut query = sqlx::query(&data_sql).bind(to_i64(block_num));
            for param in &filter_params {
                query = query.bind(*param);
            }

            // Stream rows from this block's query. Each row is converted
            // to an RpcLog and sent over the channel immediately rather
            // than being collected into a Vec first.
            let mut stream = query.fetch(&mut *tx);
            while let Some(row_result) = stream.next().await {
                let r = try_stream!(sender, row_result);

                // Enforce the global log limit across all blocks.
                total += 1;
                if total > max_logs {
                    let _ =
                        sender.send(Err(ColdStorageError::TooManyLogs { limit: max_logs })).await;
                    return;
                }

                let log = log_from_row(&r);
                let rpc_log = RpcLog {
                    inner: log,
                    block_hash: Some(r.get(COL_BLOCK_HASH)),
                    block_number: Some(from_i64(r.get::<i64, _>(COL_BLOCK_NUMBER))),
                    block_timestamp: Some(from_i64(r.get::<i64, _>(COL_BLOCK_TIMESTAMP))),
                    transaction_hash: Some(r.get(COL_TX_HASH)),
                    transaction_index: Some(from_i64(r.get::<i64, _>(COL_TX_INDEX))),
                    log_index: Some(from_i64(r.get::<i64, _>(COL_BLOCK_LOG_INDEX))),
                    // Always false: within this snapshot no reorged
                    // (removed) log can ever be observed.
                    removed: false,
                };
                // Send the log to the caller. The timeout ensures we
                // stop if the deadline passes while back-pressured.
                match tokio::time::timeout_at(deadline, sender.send(Ok(rpc_log))).await {
                    Ok(Ok(())) => {}
                    Ok(Err(_)) => return, // receiver dropped
                    Err(_) => {
                        let _ = sender.send(Err(ColdStorageError::StreamDeadlineExceeded)).await;
                        return;
                    }
                }
            }

            // Early exit if we've already hit the limit — no need to
            // query the next block.
            if total >= max_logs {
                return;
            }
        }
    }
}
330
331// ============================================================================
332// Row → domain type conversion (read path)
333// ============================================================================
334
335/// Extract a required BLOB column from a row as a borrowed slice.
336fn blob<'r>(r: &'r sqlx::any::AnyRow, col: &str) -> &'r [u8] {
337    r.get(col)
338}
339
340/// Extract an optional BLOB column from a row as a borrowed slice.
341fn opt_blob<'r>(r: &'r sqlx::any::AnyRow, col: &str) -> Option<&'r [u8]> {
342    r.get(col)
343}
344
/// Build a [`Header`] from an [`sqlx::any::AnyRow`].
///
/// Column decoding is driven by the target field types: hashes and
/// addresses decode directly via `r.get`, `u64` fields are stored as
/// `i64` and converted with `from_i64`, and nullable fields map through
/// `Option<i64>`.
fn header_from_row(r: &sqlx::any::AnyRow) -> Result<Header, SqlColdError> {
    Ok(Header {
        parent_hash: r.get(COL_PARENT_HASH),
        ommers_hash: r.get(COL_OMMERS_HASH),
        beneficiary: r.get(COL_BENEFICIARY),
        state_root: r.get(COL_STATE_ROOT),
        transactions_root: r.get(COL_TRANSACTIONS_ROOT),
        receipts_root: r.get(COL_RECEIPTS_ROOT),
        // The bloom is stored as a raw BLOB; rebuild it from bytes.
        logs_bloom: Bloom::from_slice(blob(r, COL_LOGS_BLOOM)),
        difficulty: r.get(COL_DIFFICULTY),
        number: from_i64(r.get(COL_BLOCK_NUMBER)),
        gas_limit: from_i64(r.get(COL_GAS_LIMIT)),
        gas_used: from_i64(r.get(COL_GAS_USED)),
        timestamp: from_i64(r.get(COL_TIMESTAMP)),
        extra_data: r.get(COL_EXTRA_DATA),
        mix_hash: r.get(COL_MIX_HASH),
        nonce: r.get(COL_NONCE),
        // Nullable columns: absent on blocks predating the fork that
        // introduced each field (London / Shanghai / Cancun / Prague).
        base_fee_per_gas: r.get::<Option<i64>, _>(COL_BASE_FEE_PER_GAS).map(from_i64),
        withdrawals_root: r.get(COL_WITHDRAWALS_ROOT),
        blob_gas_used: r.get::<Option<i64>, _>(COL_BLOB_GAS_USED).map(from_i64),
        excess_blob_gas: r.get::<Option<i64>, _>(COL_EXCESS_BLOB_GAS).map(from_i64),
        parent_beacon_block_root: r.get(COL_PARENT_BEACON_BLOCK_ROOT),
        requests_hash: r.get(COL_REQUESTS_HASH),
    })
}
371
/// Build a [`TransactionSigned`] from an [`sqlx::any::AnyRow`].
///
/// Decodes the stored `tx_type` discriminant and reconstructs the
/// matching typed envelope. Columns that are mandatory for a given type
/// (e.g. `chain_id` for EIP-2930 and later) produce a
/// [`SqlColdError::Convert`] when NULL.
fn tx_from_row(r: &sqlx::any::AnyRow) -> Result<TransactionSigned, SqlColdError> {
    use alloy::consensus::EthereumTxEnvelope;

    // y-parity is stored as an integer; any non-zero value means odd parity.
    let sig =
        Signature::new(r.get(COL_SIG_R), r.get(COL_SIG_S), r.get::<i32, _>(COL_SIG_Y_PARITY) != 0);

    // tx_type round-trips through u8: the column is a wider integer, so
    // validate the range before mapping onto the TxType enum.
    let tx_type_raw: i32 = r.get(COL_TX_TYPE);
    let tx_type_u8: u8 = tx_type_raw
        .try_into()
        .map_err(|_| SqlColdError::Convert(format!("tx_type out of u8 range: {tx_type_raw}")))?;
    let tx_type = TxType::try_from(tx_type_u8)
        .map_err(|_| SqlColdError::Convert(format!("invalid tx_type: {tx_type_u8}")))?;

    // Columns shared by every transaction type.
    let chain_id: Option<i64> = r.get(COL_CHAIN_ID);
    let nonce = from_i64(r.get(COL_NONCE));
    let gas_limit = from_i64(r.get(COL_GAS_LIMIT));
    let to_addr: Option<Address> = r.get(COL_TO_ADDRESS);
    let value: U256 = r.get(COL_VALUE);
    let input: Bytes = r.get(COL_INPUT);

    match tx_type {
        TxType::Legacy => {
            // chain_id stays optional: legacy txs may lack one.
            let tx = TxLegacy {
                chain_id: chain_id.map(from_i64),
                nonce,
                gas_price: decode_u128_required(opt_blob(r, COL_GAS_PRICE), COL_GAS_PRICE)?,
                gas_limit,
                // NULL to_address encodes contract creation.
                to: to_addr.map_or(TxKind::Create, TxKind::Call),
                value,
                input,
            };
            Ok(EthereumTxEnvelope::Legacy(Signed::new_unhashed(tx, sig)))
        }
        TxType::Eip2930 => {
            let tx = TxEip2930 {
                chain_id: from_i64(
                    chain_id
                        .ok_or_else(|| SqlColdError::Convert("EIP2930 requires chain_id".into()))?,
                ),
                nonce,
                gas_price: decode_u128_required(opt_blob(r, COL_GAS_PRICE), COL_GAS_PRICE)?,
                gas_limit,
                to: to_addr.map_or(TxKind::Create, TxKind::Call),
                value,
                input,
                // Missing access list decodes as empty rather than erroring.
                access_list: decode_access_list_or_empty(opt_blob(r, COL_ACCESS_LIST))?,
            };
            Ok(EthereumTxEnvelope::Eip2930(Signed::new_unhashed(tx, sig)))
        }
        TxType::Eip1559 => {
            let tx = TxEip1559 {
                chain_id: from_i64(
                    chain_id
                        .ok_or_else(|| SqlColdError::Convert("EIP1559 requires chain_id".into()))?,
                ),
                nonce,
                gas_limit,
                // Fee fields are u128 and stored as BLOBs; both are required.
                max_fee_per_gas: decode_u128_required(
                    opt_blob(r, COL_MAX_FEE_PER_GAS),
                    COL_MAX_FEE_PER_GAS,
                )?,
                max_priority_fee_per_gas: decode_u128_required(
                    opt_blob(r, COL_MAX_PRIORITY_FEE_PER_GAS),
                    COL_MAX_PRIORITY_FEE_PER_GAS,
                )?,
                to: to_addr.map_or(TxKind::Create, TxKind::Call),
                value,
                input,
                access_list: decode_access_list_or_empty(opt_blob(r, COL_ACCESS_LIST))?,
            };
            Ok(EthereumTxEnvelope::Eip1559(Signed::new_unhashed(tx, sig)))
        }
        TxType::Eip4844 => {
            let tx = TxEip4844 {
                chain_id: from_i64(
                    chain_id
                        .ok_or_else(|| SqlColdError::Convert("EIP4844 requires chain_id".into()))?,
                ),
                nonce,
                gas_limit,
                max_fee_per_gas: decode_u128_required(
                    opt_blob(r, COL_MAX_FEE_PER_GAS),
                    COL_MAX_FEE_PER_GAS,
                )?,
                max_priority_fee_per_gas: decode_u128_required(
                    opt_blob(r, COL_MAX_PRIORITY_FEE_PER_GAS),
                    COL_MAX_PRIORITY_FEE_PER_GAS,
                )?,
                // Blob txs cannot be creations, so `to` is a plain Address.
                to: to_addr
                    .ok_or_else(|| SqlColdError::Convert("EIP4844 requires to_address".into()))?,
                value,
                input,
                access_list: decode_access_list_or_empty(opt_blob(r, COL_ACCESS_LIST))?,
                blob_versioned_hashes: decode_b256_vec(
                    opt_blob(r, COL_BLOB_VERSIONED_HASHES).ok_or_else(|| {
                        SqlColdError::Convert("EIP4844 requires blob_versioned_hashes".into())
                    })?,
                )?,
                max_fee_per_blob_gas: decode_u128_required(
                    opt_blob(r, COL_MAX_FEE_PER_BLOB_GAS),
                    COL_MAX_FEE_PER_BLOB_GAS,
                )?,
            };
            Ok(EthereumTxEnvelope::Eip4844(Signed::new_unhashed(tx, sig)))
        }
        TxType::Eip7702 => {
            let tx = TxEip7702 {
                chain_id: from_i64(
                    chain_id
                        .ok_or_else(|| SqlColdError::Convert("EIP7702 requires chain_id".into()))?,
                ),
                nonce,
                gas_limit,
                max_fee_per_gas: decode_u128_required(
                    opt_blob(r, COL_MAX_FEE_PER_GAS),
                    COL_MAX_FEE_PER_GAS,
                )?,
                max_priority_fee_per_gas: decode_u128_required(
                    opt_blob(r, COL_MAX_PRIORITY_FEE_PER_GAS),
                    COL_MAX_PRIORITY_FEE_PER_GAS,
                )?,
                to: to_addr
                    .ok_or_else(|| SqlColdError::Convert("EIP7702 requires to_address".into()))?,
                value,
                input,
                access_list: decode_access_list_or_empty(opt_blob(r, COL_ACCESS_LIST))?,
                authorization_list: decode_authorization_list(
                    opt_blob(r, COL_AUTHORIZATION_LIST).ok_or_else(|| {
                        SqlColdError::Convert("EIP7702 requires authorization_list".into())
                    })?,
                )?,
            };
            Ok(EthereumTxEnvelope::Eip7702(Signed::new_unhashed(tx, sig)))
        }
    }
}
509
510/// Build a [`RecoveredTx`] from a row that includes `from_address`.
511fn recovered_tx_from_row(r: &sqlx::any::AnyRow) -> Result<RecoveredTx, SqlColdError> {
512    let sender: Address = r.get(COL_FROM_ADDRESS);
513    let tx = tx_from_row(r)?;
514    // SAFETY: the sender was recovered at append time and stored in from_address.
515    Ok(Recovered::new_unchecked(tx, sender))
516}
517
518/// Build a [`Log`] from an [`sqlx::any::AnyRow`].
519fn log_from_row(r: &sqlx::any::AnyRow) -> Log {
520    let topics = [COL_TOPIC0, COL_TOPIC1, COL_TOPIC2, COL_TOPIC3]
521        .into_iter()
522        .filter_map(|col| r.get::<Option<B256>, _>(col))
523        .collect();
524    Log { address: r.get(COL_ADDRESS), data: LogData::new_unchecked(topics, r.get(COL_DATA)) }
525}
526
/// Build a [`DbSignetEvent`] from an [`sqlx::any::AnyRow`].
///
/// Dispatches on the stored `event_type` discriminant
/// (`EVENT_TRANSACT` / `EVENT_ENTER` / `EVENT_ENTER_TOKEN`); any other
/// value is a conversion error. Columns required by the specific event
/// kind are validated as non-NULL.
fn signet_event_from_row(r: &sqlx::any::AnyRow) -> Result<DbSignetEvent, SqlColdError> {
    // The column is read as i32 (SQL INTEGER) and narrowed to the width of
    // the EVENT_* constants; unknown values fall through to the error arm.
    let event_type = r.get::<i32, _>(COL_EVENT_TYPE) as i16;
    let order = from_i64(r.get(COL_ORDER_INDEX));
    let rollup_chain_id: U256 = r.get(COL_ROLLUP_CHAIN_ID);

    match event_type {
        EVENT_TRANSACT => {
            let sender: Address = r
                .get::<Option<Address>, _>(COL_SENDER)
                .ok_or_else(|| SqlColdError::Convert("Transact requires sender".into()))?;
            let to: Address = r
                .get::<Option<Address>, _>(COL_TO_ADDRESS)
                .ok_or_else(|| SqlColdError::Convert("Transact requires to".into()))?;
            let value: U256 = r
                .get::<Option<U256>, _>(COL_VALUE)
                .ok_or_else(|| SqlColdError::Convert("Transact requires value".into()))?;
            let gas: U256 = r
                .get::<Option<U256>, _>(COL_GAS)
                .ok_or_else(|| SqlColdError::Convert("Transact requires gas".into()))?;
            let max_fee: U256 = r
                .get::<Option<U256>, _>(COL_MAX_FEE_PER_GAS)
                .ok_or_else(|| SqlColdError::Convert("Transact requires max_fee_per_gas".into()))?;
            // The calldata column may be NULL; treat that as empty bytes.
            let data: Bytes = r.get::<Option<Bytes>, _>(COL_DATA).unwrap_or_default();

            Ok(DbSignetEvent::Transact(
                order,
                Transact {
                    rollupChainId: rollup_chain_id,
                    sender,
                    to,
                    value,
                    gas,
                    maxFeePerGas: max_fee,
                    data,
                },
            ))
        }
        EVENT_ENTER => {
            let recipient: Address = r
                .get::<Option<Address>, _>(COL_ROLLUP_RECIPIENT)
                .ok_or_else(|| SqlColdError::Convert("Enter requires rollup_recipient".into()))?;
            let amount: U256 = r
                .get::<Option<U256>, _>(COL_AMOUNT)
                .ok_or_else(|| SqlColdError::Convert("Enter requires amount".into()))?;

            Ok(DbSignetEvent::Enter(
                order,
                Enter { rollupChainId: rollup_chain_id, rollupRecipient: recipient, amount },
            ))
        }
        EVENT_ENTER_TOKEN => {
            // Same shape as Enter plus the ERC-20 token address.
            let token: Address = r
                .get::<Option<Address>, _>(COL_TOKEN)
                .ok_or_else(|| SqlColdError::Convert("EnterToken requires token".into()))?;
            let recipient: Address =
                r.get::<Option<Address>, _>(COL_ROLLUP_RECIPIENT).ok_or_else(|| {
                    SqlColdError::Convert("EnterToken requires rollup_recipient".into())
                })?;
            let amount: U256 = r
                .get::<Option<U256>, _>(COL_AMOUNT)
                .ok_or_else(|| SqlColdError::Convert("EnterToken requires amount".into()))?;

            Ok(DbSignetEvent::EnterToken(
                order,
                EnterToken {
                    rollupChainId: rollup_chain_id,
                    token,
                    rollupRecipient: recipient,
                    amount,
                },
            ))
        }
        _ => Err(SqlColdError::Convert(format!("invalid event_type: {event_type}"))),
    }
}
603
604/// Build a [`DbZenithHeader`] from an [`sqlx::any::AnyRow`].
605fn zenith_header_from_row(r: &sqlx::any::AnyRow) -> Result<DbZenithHeader, SqlColdError> {
606    Ok(DbZenithHeader(Zenith::BlockHeader {
607        hostBlockNumber: r.get(COL_HOST_BLOCK_NUMBER),
608        rollupChainId: r.get(COL_ROLLUP_CHAIN_ID),
609        gasLimit: r.get(COL_GAS_LIMIT),
610        rewardAddress: r.get(COL_REWARD_ADDRESS),
611        blockDataHash: r.get(COL_BLOCK_DATA_HASH),
612    }))
613}
614
615// ============================================================================
616// Domain type → SQL INSERT (write path)
617// ============================================================================
618
/// Write a single block's data into an open SQL transaction.
///
/// Insertion order matters for foreign-key constraints: the header row
/// is written first, then transactions, receipts with their logs,
/// signet events, and finally the optional zenith header. The caller
/// owns the transaction and decides when to commit.
async fn write_block_to_tx(
    tx: &mut sqlx::Transaction<'_, sqlx::Any>,
    data: BlockData,
) -> Result<(), SqlColdError> {
    let bn = to_i64(data.block_number());

    // Insert header
    // The hash is recomputed from the header fields here rather than
    // taken from the input.
    let block_hash = data.header.hash_slow();
    let difficulty = &data.header.difficulty;
    // Bind order follows the $n placeholders positionally.
    sqlx::query(
        "INSERT INTO headers (
            block_number, block_hash, parent_hash, ommers_hash, beneficiary,
            state_root, transactions_root, receipts_root, logs_bloom, difficulty,
            gas_limit, gas_used, timestamp, extra_data, mix_hash, nonce,
            base_fee_per_gas, withdrawals_root, blob_gas_used, excess_blob_gas,
            parent_beacon_block_root, requests_hash
        ) VALUES (
            $1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
            $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22
        )",
    )
    .bind(bn)
    .bind(block_hash)
    .bind(data.header.parent_hash)
    .bind(data.header.ommers_hash)
    .bind(data.header.beneficiary)
    .bind(data.header.state_root)
    .bind(data.header.transactions_root)
    .bind(data.header.receipts_root)
    .bind(data.header.logs_bloom)
    .bind(difficulty)
    .bind(to_i64(data.header.gas_limit))
    .bind(to_i64(data.header.gas_used))
    .bind(to_i64(data.header.timestamp))
    .bind(&data.header.extra_data)
    .bind(data.header.mix_hash)
    .bind(data.header.nonce)
    .bind(data.header.base_fee_per_gas.map(to_i64))
    .bind(data.header.withdrawals_root.as_ref())
    .bind(data.header.blob_gas_used.map(to_i64))
    .bind(data.header.excess_blob_gas.map(to_i64))
    .bind(data.header.parent_beacon_block_root.as_ref())
    .bind(data.header.requests_hash.as_ref())
    .execute(&mut **tx)
    .await?;

    // Insert transactions
    for (idx, recovered_tx) in data.transactions.iter().enumerate() {
        insert_transaction(tx, bn, to_i64(idx as u64), recovered_tx).await?;
    }

    // Insert receipts and logs, computing first_log_index as a running
    // sum of log counts (same algorithm as the MDBX IndexedReceipt path).
    let mut first_log_index = 0i64;
    for (idx, receipt) in data.receipts.iter().enumerate() {
        let tx_idx = to_i64(idx as u64);
        sqlx::query(
            "INSERT INTO receipts (block_number, tx_index, tx_type, success, cumulative_gas_used, first_log_index)
             VALUES ($1, $2, $3, $4, $5, $6)",
        )
        .bind(bn)
        .bind(tx_idx)
        .bind(receipt.tx_type as i32)
        .bind(receipt.inner.status.coerce_status() as i32)
        .bind(to_i64(receipt.inner.cumulative_gas_used))
        .bind(first_log_index)
        .execute(&mut **tx)
        .await?;
        // Advance the running offset by this receipt's log count so the
        // next receipt's first_log_index is block-relative.
        first_log_index += receipt.inner.logs.len() as i64;

        // Each log row stores up to four topics in separate nullable
        // columns; missing topics bind as NULL via Option.
        for (log_idx, log) in receipt.inner.logs.iter().enumerate() {
            let topics = log.topics();
            sqlx::query(
                "INSERT INTO logs (block_number, tx_index, log_index, address, topic0, topic1, topic2, topic3, data)
                 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
            )
            .bind(bn)
            .bind(tx_idx)
            .bind(to_i64(log_idx as u64))
            .bind(log.address)
            .bind(topics.first())
            .bind(topics.get(1))
            .bind(topics.get(2))
            .bind(topics.get(3))
            .bind(&log.data.data)
            .execute(&mut **tx)
            .await?;
        }
    }

    // Insert signet events
    for (idx, event) in data.signet_events.iter().enumerate() {
        insert_signet_event(tx, bn, to_i64(idx as u64), event).await?;
    }

    // Insert zenith header
    // Optional: only present when the block carries a zenith header.
    if let Some(zh) = &data.zenith_header {
        let h = &zh.0;
        sqlx::query(
            "INSERT INTO zenith_headers (
                block_number, host_block_number, rollup_chain_id,
                gas_limit, reward_address, block_data_hash
            ) VALUES ($1, $2, $3, $4, $5, $6)",
        )
        .bind(bn)
        .bind(h.hostBlockNumber)
        .bind(h.rollupChainId)
        .bind(h.gasLimit)
        .bind(h.rewardAddress)
        .bind(h.blockDataHash)
        .execute(&mut **tx)
        .await?;
    }

    Ok(())
}
736
/// Insert a transaction, binding directly from the source type.
///
/// Every tx-type-specific column (fees, access list, blob hashes,
/// authorization list, `chain_id` for legacy) is bound as `NULL` when the
/// envelope variant does not carry it. u128 fee fields are stored as
/// byte blobs via [`encode_u128`]; u64 counters go through [`to_i64`].
async fn insert_transaction(
    conn: &mut sqlx::AnyConnection,
    bn: i64,
    tx_index: i64,
    recovered: &RecoveredTx,
) -> Result<(), SqlColdError> {
    use alloy::consensus::EthereumTxEnvelope;

    let sender = recovered.signer();
    let tx: &TransactionSigned = recovered;
    let tx_hash = tx.tx_hash();
    let tx_type = tx.tx_type() as i32;

    // Extracts (y_parity, r, s) from any signed envelope variant; a macro
    // because the five `Signed<_>` payload types share no common trait here.
    macro_rules! sig {
        ($s:expr) => {{
            let sig = $s.signature();
            (sig.v() as i32, sig.r(), sig.s())
        }};
    }
    let (sig_y, sig_r, sig_s) = match tx {
        EthereumTxEnvelope::Legacy(s) => sig!(s),
        EthereumTxEnvelope::Eip2930(s) => sig!(s),
        EthereumTxEnvelope::Eip1559(s) => sig!(s),
        EthereumTxEnvelope::Eip4844(s) => sig!(s),
        EthereumTxEnvelope::Eip7702(s) => sig!(s),
    };

    // chain_id is optional only for legacy (pre-EIP-155) transactions.
    let (chain_id, nonce, gas_limit) = match tx {
        EthereumTxEnvelope::Legacy(s) => {
            (s.tx().chain_id.map(to_i64), to_i64(s.tx().nonce), to_i64(s.tx().gas_limit))
        }
        EthereumTxEnvelope::Eip2930(s) => {
            (Some(to_i64(s.tx().chain_id)), to_i64(s.tx().nonce), to_i64(s.tx().gas_limit))
        }
        EthereumTxEnvelope::Eip1559(s) => {
            (Some(to_i64(s.tx().chain_id)), to_i64(s.tx().nonce), to_i64(s.tx().gas_limit))
        }
        EthereumTxEnvelope::Eip4844(s) => {
            (Some(to_i64(s.tx().chain_id)), to_i64(s.tx().nonce), to_i64(s.tx().gas_limit))
        }
        EthereumTxEnvelope::Eip7702(s) => {
            (Some(to_i64(s.tx().chain_id)), to_i64(s.tx().nonce), to_i64(s.tx().gas_limit))
        }
    };

    // Legacy/2930/1559 use a `TxKind`-style `to` (None = contract creation);
    // 4844/7702 always have a concrete destination address.
    let (value, to_addr) = match tx {
        EthereumTxEnvelope::Legacy(s) => (s.tx().value, s.tx().to.to()),
        EthereumTxEnvelope::Eip2930(s) => (s.tx().value, s.tx().to.to()),
        EthereumTxEnvelope::Eip1559(s) => (s.tx().value, s.tx().to.to()),
        EthereumTxEnvelope::Eip4844(s) => (s.tx().value, Some(&s.tx().to)),
        EthereumTxEnvelope::Eip7702(s) => (s.tx().value, Some(&s.tx().to)),
    };

    let input: &[u8] = match tx {
        EthereumTxEnvelope::Legacy(s) => s.tx().input.as_ref(),
        EthereumTxEnvelope::Eip2930(s) => s.tx().input.as_ref(),
        EthereumTxEnvelope::Eip1559(s) => s.tx().input.as_ref(),
        EthereumTxEnvelope::Eip4844(s) => s.tx().input.as_ref(),
        EthereumTxEnvelope::Eip7702(s) => s.tx().input.as_ref(),
    };

    // Pre-1559 types carry a single gas_price; 1559-style types carry the
    // max-fee pair; only 4844 adds a blob fee. Unused columns stay NULL.
    let (gas_price, max_fee, max_priority_fee, max_blob_fee) = match tx {
        EthereumTxEnvelope::Legacy(s) => (Some(encode_u128(s.tx().gas_price)), None, None, None),
        EthereumTxEnvelope::Eip2930(s) => (Some(encode_u128(s.tx().gas_price)), None, None, None),
        EthereumTxEnvelope::Eip1559(s) => (
            None,
            Some(encode_u128(s.tx().max_fee_per_gas)),
            Some(encode_u128(s.tx().max_priority_fee_per_gas)),
            None,
        ),
        EthereumTxEnvelope::Eip4844(s) => (
            None,
            Some(encode_u128(s.tx().max_fee_per_gas)),
            Some(encode_u128(s.tx().max_priority_fee_per_gas)),
            Some(encode_u128(s.tx().max_fee_per_blob_gas)),
        ),
        EthereumTxEnvelope::Eip7702(s) => (
            None,
            Some(encode_u128(s.tx().max_fee_per_gas)),
            Some(encode_u128(s.tx().max_priority_fee_per_gas)),
            None,
        ),
    };

    // Access list: 2930 onward. Blob hashes: 4844 only. Auth list: 7702 only.
    let (access_list, blob_hashes, auth_list) = match tx {
        EthereumTxEnvelope::Legacy(_) => (None, None, None),
        EthereumTxEnvelope::Eip2930(s) => {
            (Some(encode_access_list(&s.tx().access_list)), None, None)
        }
        EthereumTxEnvelope::Eip1559(s) => {
            (Some(encode_access_list(&s.tx().access_list)), None, None)
        }
        EthereumTxEnvelope::Eip4844(s) => (
            Some(encode_access_list(&s.tx().access_list)),
            Some(encode_b256_vec(&s.tx().blob_versioned_hashes)),
            None,
        ),
        EthereumTxEnvelope::Eip7702(s) => (
            Some(encode_access_list(&s.tx().access_list)),
            None,
            Some(encode_authorization_list(&s.tx().authorization_list)),
        ),
    };

    // Bind order must match the column list exactly ($1..$21).
    sqlx::query(
        "INSERT INTO transactions (
            block_number, tx_index, tx_hash, tx_type,
            sig_y_parity, sig_r, sig_s,
            chain_id, nonce, gas_limit, to_address, value, input,
            gas_price, max_fee_per_gas, max_priority_fee_per_gas,
            max_fee_per_blob_gas, blob_versioned_hashes,
            access_list, authorization_list, from_address
        ) VALUES (
            $1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
            $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21
        )",
    )
    .bind(bn)
    .bind(tx_index)
    .bind(tx_hash)
    .bind(tx_type)
    .bind(sig_y)
    .bind(sig_r)
    .bind(sig_s)
    .bind(chain_id)
    .bind(nonce)
    .bind(gas_limit)
    .bind(to_addr)
    .bind(value)
    .bind(input)
    // Encoded u128 blobs are bound as optional byte slices.
    .bind(gas_price.as_ref().map(|v| v.as_slice()))
    .bind(max_fee.as_ref().map(|v| v.as_slice()))
    .bind(max_priority_fee.as_ref().map(|v| v.as_slice()))
    .bind(max_blob_fee.as_ref().map(|v| v.as_slice()))
    .bind(blob_hashes.as_deref())
    .bind(access_list.as_deref())
    .bind(auth_list.as_deref())
    .bind(sender)
    .execute(&mut *conn)
    .await?;

    Ok(())
}
881
882/// Insert a signet event, binding directly from the source type.
883async fn insert_signet_event(
884    conn: &mut sqlx::AnyConnection,
885    block_number: i64,
886    event_index: i64,
887    event: &DbSignetEvent,
888) -> Result<(), SqlColdError> {
889    let (event_type, order, chain_id) = match event {
890        DbSignetEvent::Transact(o, t) => (0i32, to_i64(*o), &t.rollupChainId),
891        DbSignetEvent::Enter(o, e) => (1i32, to_i64(*o), &e.rollupChainId),
892        DbSignetEvent::EnterToken(o, e) => (2i32, to_i64(*o), &e.rollupChainId),
893    };
894
895    let (value, gas, max_fee, amount) = match event {
896        DbSignetEvent::Transact(_, t) => {
897            (Some(&t.value), Some(&t.gas), Some(&t.maxFeePerGas), None)
898        }
899        DbSignetEvent::Enter(_, e) => (None, None, None, Some(&e.amount)),
900        DbSignetEvent::EnterToken(_, e) => (None, None, None, Some(&e.amount)),
901    };
902
903    sqlx::query(
904        "INSERT INTO signet_events (
905            block_number, event_index, event_type, order_index,
906            rollup_chain_id, sender, to_address, value, gas,
907            max_fee_per_gas, data, rollup_recipient, amount, token
908        ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)",
909    )
910    .bind(block_number)
911    .bind(event_index)
912    .bind(event_type)
913    .bind(order)
914    .bind(chain_id)
915    .bind(match event {
916        DbSignetEvent::Transact(_, t) => Some(&t.sender),
917        _ => None,
918    })
919    .bind(match event {
920        DbSignetEvent::Transact(_, t) => Some(&t.to),
921        _ => None,
922    })
923    .bind(value)
924    .bind(gas)
925    .bind(max_fee)
926    .bind(match event {
927        DbSignetEvent::Transact(_, t) => Some(&t.data),
928        _ => None,
929    })
930    .bind(match event {
931        DbSignetEvent::Enter(_, e) => Some(&e.rollupRecipient),
932        DbSignetEvent::EnterToken(_, e) => Some(&e.rollupRecipient),
933        _ => None,
934    })
935    .bind(amount)
936    .bind(match event {
937        DbSignetEvent::EnterToken(_, e) => Some(&e.token),
938        _ => None,
939    })
940    .execute(&mut *conn)
941    .await?;
942
943    Ok(())
944}
945
946// ============================================================================
947// Log filter helpers
948// ============================================================================
949
950/// Append a SQL filter clause for a set of byte-encoded values.
951///
952/// For a single value, generates ` AND {column} = ${idx}`.
953/// For multiple values, generates ` AND {column} IN (${idx}, ...)`.
954/// Returns the next available parameter index.
955fn append_filter_clause<'a>(
956    clause: &mut String,
957    params: &mut Vec<&'a [u8]>,
958    mut idx: u32,
959    column: &str,
960    values: impl ExactSizeIterator<Item = &'a [u8]>,
961) -> u32 {
962    use std::fmt::Write;
963
964    let len = values.len();
965    if len == 1 {
966        write!(clause, " AND {column} = ${idx}").unwrap();
967        values.for_each(|v| params.push(v));
968        return idx + 1;
969    }
970    write!(clause, " AND {column} IN (").unwrap();
971    for (i, v) in values.enumerate() {
972        if i > 0 {
973            clause.push_str(", ");
974        }
975        write!(clause, "${idx}").unwrap();
976        params.push(v);
977        idx += 1;
978    }
979    clause.push(')');
980    idx
981}
982
983fn build_log_filter_clause(filter: &Filter, start_idx: u32) -> (String, Vec<&[u8]>) {
984    let mut clause = String::new();
985    let mut params: Vec<&[u8]> = Vec::new();
986    let mut idx = start_idx;
987
988    if !filter.address.is_empty() {
989        idx = append_filter_clause(
990            &mut clause,
991            &mut params,
992            idx,
993            "l.address",
994            filter.address.iter().map(|a| a.as_slice()),
995        );
996    }
997
998    let topic_cols = ["l.topic0", "l.topic1", "l.topic2", "l.topic3"];
999    for (i, topic_filter) in filter.topics.iter().enumerate() {
1000        if topic_filter.is_empty() {
1001            continue;
1002        }
1003        idx = append_filter_clause(
1004            &mut clause,
1005            &mut params,
1006            idx,
1007            topic_cols[i],
1008            topic_filter.iter().map(|v| v.as_slice()),
1009        );
1010    }
1011
1012    (clause, params)
1013}
1014
1015// ============================================================================
1016// ColdStorage implementation
1017// ============================================================================
1018
impl ColdStorageRead for SqlColdBackend {
    // Resolve the specifier to a concrete block number, then load that
    // header; `None` if the specifier matches nothing.
    async fn get_header(&self, spec: HeaderSpecifier) -> ColdResult<Option<SealedHeader>> {
        let Some(block_num) = self.resolve_header_spec(spec).await? else {
            return Ok(None);
        };
        self.fetch_header_by_number(block_num).await.map_err(ColdStorageError::from)
    }

    // Sequentially resolves each specifier; results keep input order, with
    // `None` for specifiers that match no stored header.
    async fn get_headers(
        &self,
        specs: Vec<HeaderSpecifier>,
    ) -> ColdResult<Vec<Option<SealedHeader>>> {
        let mut results = Vec::with_capacity(specs.len());
        for spec in specs {
            let header = self.get_header(spec).await?;
            results.push(header);
        }
        Ok(results)
    }

    // Fetch one transaction plus its confirmation metadata. All three
    // specifier forms join against `headers` so the block hash is available
    // without a second round trip.
    async fn get_transaction(
        &self,
        spec: TransactionSpecifier,
    ) -> ColdResult<Option<Confirmed<RecoveredTx>>> {
        let row = match spec {
            TransactionSpecifier::Hash(hash) => sqlx::query(
                "SELECT t.*, h.block_hash
                     FROM transactions t
                     JOIN headers h ON t.block_number = h.block_number
                     WHERE t.tx_hash = $1",
            )
            .bind(hash)
            .fetch_optional(&self.pool)
            .await
            .map_err(SqlColdError::from)?,
            TransactionSpecifier::BlockAndIndex { block, index } => sqlx::query(
                "SELECT t.*, h.block_hash
                     FROM transactions t
                     JOIN headers h ON t.block_number = h.block_number
                     WHERE t.block_number = $1 AND t.tx_index = $2",
            )
            .bind(to_i64(block))
            .bind(to_i64(index))
            .fetch_optional(&self.pool)
            .await
            .map_err(SqlColdError::from)?,
            TransactionSpecifier::BlockHashAndIndex { block_hash, index } => sqlx::query(
                "SELECT t.*, h.block_hash
                     FROM transactions t
                     JOIN headers h ON t.block_number = h.block_number
                     WHERE h.block_hash = $1 AND t.tx_index = $2",
            )
            .bind(block_hash)
            .bind(to_i64(index))
            .fetch_optional(&self.pool)
            .await
            .map_err(SqlColdError::from)?,
        };

        let Some(r) = row else {
            return Ok(None);
        };

        // Rebuild the recovered transaction and attach (block, hash, index).
        let block = from_i64(r.get::<i64, _>(COL_BLOCK_NUMBER));
        let index = from_i64(r.get::<i64, _>(COL_TX_INDEX));
        let block_hash = r.get(COL_BLOCK_HASH);
        let recovered = recovered_tx_from_row(&r).map_err(ColdStorageError::from)?;
        let meta = ConfirmationMeta::new(block, block_hash, index);
        Ok(Some(Confirmed::new(recovered, meta)))
    }

    // All transactions of one block, ordered by in-block index.
    async fn get_transactions_in_block(&self, block: BlockNumber) -> ColdResult<Vec<RecoveredTx>> {
        let bn = to_i64(block);
        let rows =
            sqlx::query("SELECT * FROM transactions WHERE block_number = $1 ORDER BY tx_index")
                .bind(bn)
                .fetch_all(&self.pool)
                .await
                .map_err(SqlColdError::from)?;

        // Decoding any row can fail; collect short-circuits on first error.
        rows.iter().map(|r| recovered_tx_from_row(r).map_err(ColdStorageError::from)).collect()
    }

    // COUNT(*) over the block's transactions; 0 for unknown blocks.
    async fn get_transaction_count(&self, block: BlockNumber) -> ColdResult<u64> {
        let bn = to_i64(block);
        let row = sqlx::query("SELECT COUNT(*) as cnt FROM transactions WHERE block_number = $1")
            .bind(bn)
            .fetch_one(&self.pool)
            .await
            .map_err(SqlColdError::from)?;

        Ok(from_i64(row.get::<i64, _>(COL_CNT)))
    }

    async fn get_receipt(&self, spec: ReceiptSpecifier) -> ColdResult<Option<ColdReceipt>> {
        // Resolve to (block, index)
        let (block, index) = match spec {
            ReceiptSpecifier::TxHash(hash) => {
                let row = sqlx::query(
                    "SELECT block_number, tx_index FROM transactions WHERE tx_hash = $1",
                )
                .bind(hash)
                .fetch_optional(&self.pool)
                .await
                .map_err(SqlColdError::from)?;
                let Some(r) = row else { return Ok(None) };
                (
                    from_i64(r.get::<i64, _>(COL_BLOCK_NUMBER)),
                    from_i64(r.get::<i64, _>(COL_TX_INDEX)),
                )
            }
            ReceiptSpecifier::BlockAndIndex { block, index } => (block, index),
        };

        // Combined query: receipt + tx metadata + full header + prior gas.
        // Header columns use standard names (h.*) so header_from_row works.
        // Receipt/tx columns use r_ prefix to avoid name collisions.
        // `prior_gas` is the cumulative gas of the preceding receipt in the
        // same block (MAX is valid because cumulative gas is non-decreasing);
        // COALESCE handles the first transaction, which has no predecessor.
        let combined = sqlx::query(
            "SELECT h.*, \
               r.tx_type AS r_tx_type, r.success AS r_success, \
               r.cumulative_gas_used AS r_cumulative_gas_used, \
               r.first_log_index AS r_first_log_index, \
               t.tx_hash AS r_tx_hash, t.from_address AS r_from_address, \
               COALESCE( \
                 (SELECT CAST(MAX(r2.cumulative_gas_used) AS bigint) \
                  FROM receipts r2 \
                  WHERE r2.block_number = r.block_number \
                    AND r2.tx_index < r.tx_index), \
                 0 \
               ) AS prior_gas \
             FROM receipts r \
             JOIN transactions t ON r.block_number = t.block_number \
               AND r.tx_index = t.tx_index \
             JOIN headers h ON r.block_number = h.block_number \
             WHERE r.block_number = $1 AND r.tx_index = $2",
        )
        .bind(to_i64(block))
        .bind(to_i64(index))
        .fetch_optional(&self.pool)
        .await
        .map_err(SqlColdError::from)?;

        let Some(rr) = combined else {
            return Ok(None);
        };

        // Extract header using existing helper (h.* columns are unaliased).
        let header = header_from_row(&rr).map_err(ColdStorageError::from)?.seal_slow();

        // Extract receipt fields from r_ prefixed aliases.
        let tx_hash = rr.get(COL_R_TX_HASH);
        let sender = rr.get(COL_R_FROM_ADDRESS);
        let tx_type: i32 = rr.get(COL_R_TX_TYPE);
        let success = rr.get::<i32, _>(COL_R_SUCCESS) != 0;
        let cumulative_gas_used: i64 = rr.get(COL_R_CUMULATIVE_GAS_USED);
        let first_log_index: u64 = from_i64(rr.get::<i64, _>(COL_R_FIRST_LOG_INDEX));
        let prior_cumulative_gas: u64 = from_i64(rr.get::<i64, _>(COL_PRIOR_GAS));

        // Logs still require a separate query (variable row count).
        let log_rows = sqlx::query(
            "SELECT * FROM logs WHERE block_number = $1 AND tx_index = $2 ORDER BY log_index",
        )
        .bind(to_i64(block))
        .bind(to_i64(index))
        .fetch_all(&self.pool)
        .await
        .map_err(SqlColdError::from)?;

        let logs = log_rows.iter().map(log_from_row).collect();
        let built = build_receipt(tx_type, success, cumulative_gas_used, logs)
            .map_err(ColdStorageError::from)?;
        // Cumulative gas must be non-decreasing within a block; a violation
        // indicates database corruption.
        debug_assert!(
            built.inner.cumulative_gas_used >= prior_cumulative_gas,
            "cumulative gas decreased: {} < {}",
            built.inner.cumulative_gas_used,
            prior_cumulative_gas,
        );
        // Per-tx gas is the delta between this receipt's cumulative gas and
        // the previous receipt's cumulative gas.
        let gas_used = built.inner.cumulative_gas_used - prior_cumulative_gas;

        let ir = IndexedReceipt { receipt: built, tx_hash, first_log_index, gas_used, sender };
        Ok(Some(ColdReceipt::new(ir, &header, index)))
    }

    async fn get_receipts_in_block(&self, block: BlockNumber) -> ColdResult<Vec<ColdReceipt>> {
        // Missing header means the block is not stored at all.
        let Some(header) =
            self.fetch_header_by_number(block).await.map_err(ColdStorageError::from)?
        else {
            return Ok(Vec::new());
        };

        let bn = to_i64(block);

        // Fetch receipts joined with tx_hash and from_address
        let receipt_rows = sqlx::query(
            "SELECT r.block_number, r.tx_index, r.tx_type, r.success, \
               r.cumulative_gas_used, r.first_log_index, \
               t.tx_hash, t.from_address \
             FROM receipts r \
             JOIN transactions t ON r.block_number = t.block_number \
               AND r.tx_index = t.tx_index \
             WHERE r.block_number = $1 \
             ORDER BY r.tx_index",
        )
        .bind(bn)
        .fetch_all(&self.pool)
        .await
        .map_err(SqlColdError::from)?;

        // One query for every log in the block; grouped in memory below.
        let all_log_rows =
            sqlx::query("SELECT * FROM logs WHERE block_number = $1 ORDER BY tx_index, log_index")
                .bind(bn)
                .fetch_all(&self.pool)
                .await
                .map_err(SqlColdError::from)?;

        // Group logs by tx_index
        let mut logs_by_tx: BTreeMap<i64, Vec<Log>> = BTreeMap::new();
        for r in &all_log_rows {
            let tx_idx: i64 = r.get(COL_TX_INDEX);
            logs_by_tx.entry(tx_idx).or_default().push(log_from_row(r));
        }

        // Running sums over the tx_index-ordered rows: first_log_index is the
        // block-wide log offset; prior_cumulative_gas yields per-tx gas_used.
        let mut first_log_index = 0u64;
        let mut prior_cumulative_gas = 0u64;
        receipt_rows
            .into_iter()
            .enumerate()
            .map(|(idx, rr)| {
                let tx_idx: i64 = rr.get(COL_TX_INDEX);
                let tx_hash = rr.get(COL_TX_HASH);
                let sender = rr.get(COL_FROM_ADDRESS);
                let tx_type = rr.get::<i32, _>(COL_TX_TYPE);
                let success = rr.get::<i32, _>(COL_SUCCESS) != 0;
                let cumulative_gas_used: i64 = rr.get(COL_CUMULATIVE_GAS_USED);
                let logs = logs_by_tx.remove(&tx_idx).unwrap_or_default();
                let receipt = build_receipt(tx_type, success, cumulative_gas_used, logs)
                    .map_err(ColdStorageError::from)?;
                let gas_used = receipt.inner.cumulative_gas_used - prior_cumulative_gas;
                prior_cumulative_gas = receipt.inner.cumulative_gas_used;
                let ir = IndexedReceipt { receipt, tx_hash, first_log_index, gas_used, sender };
                first_log_index += ir.receipt.inner.logs.len() as u64;
                Ok(ColdReceipt::new(ir, &header, idx as u64))
            })
            .collect()
    }

    // Events for a single block or an inclusive block range, in
    // (block_number, event_index) order.
    async fn get_signet_events(
        &self,
        spec: SignetEventsSpecifier,
    ) -> ColdResult<Vec<DbSignetEvent>> {
        let rows = match spec {
            SignetEventsSpecifier::Block(block) => {
                let bn = to_i64(block);
                sqlx::query(
                    "SELECT * FROM signet_events WHERE block_number = $1 ORDER BY event_index",
                )
                .bind(bn)
                .fetch_all(&self.pool)
                .await
                .map_err(SqlColdError::from)?
            }
            SignetEventsSpecifier::BlockRange { start, end } => {
                let s = to_i64(start);
                let e = to_i64(end);
                sqlx::query(
                    "SELECT * FROM signet_events WHERE block_number >= $1 AND block_number <= $2
                     ORDER BY block_number, event_index",
                )
                .bind(s)
                .bind(e)
                .fetch_all(&self.pool)
                .await
                .map_err(SqlColdError::from)?
            }
        };

        rows.iter().map(|r| signet_event_from_row(r).map_err(ColdStorageError::from)).collect()
    }

    // Single zenith header lookup. NOTE: a Range specifier is collapsed to
    // its `start` block here; use `get_zenith_headers` for the full range.
    async fn get_zenith_header(
        &self,
        spec: ZenithHeaderSpecifier,
    ) -> ColdResult<Option<DbZenithHeader>> {
        let block = match spec {
            ZenithHeaderSpecifier::Number(n) => n,
            ZenithHeaderSpecifier::Range { start, .. } => start,
        };
        let bn = to_i64(block);
        let row = sqlx::query("SELECT * FROM zenith_headers WHERE block_number = $1")
            .bind(bn)
            .fetch_optional(&self.pool)
            .await
            .map_err(SqlColdError::from)?;

        row.map(|r| zenith_header_from_row(&r)).transpose().map_err(ColdStorageError::from)
    }

    // All zenith headers for a block or inclusive range, ordered by number.
    async fn get_zenith_headers(
        &self,
        spec: ZenithHeaderSpecifier,
    ) -> ColdResult<Vec<DbZenithHeader>> {
        let rows = match spec {
            ZenithHeaderSpecifier::Number(n) => {
                let bn = to_i64(n);
                sqlx::query("SELECT * FROM zenith_headers WHERE block_number = $1")
                    .bind(bn)
                    .fetch_all(&self.pool)
                    .await
                    .map_err(SqlColdError::from)?
            }
            ZenithHeaderSpecifier::Range { start, end } => {
                let s = to_i64(start);
                let e = to_i64(end);
                sqlx::query(
                    "SELECT * FROM zenith_headers WHERE block_number >= $1 AND block_number <= $2
                     ORDER BY block_number",
                )
                .bind(s)
                .bind(e)
                .fetch_all(&self.pool)
                .await
                .map_err(SqlColdError::from)?
            }
        };

        rows.iter().map(|r| zenith_header_from_row(r).map_err(ColdStorageError::from)).collect()
    }

    async fn get_logs(&self, filter: &Filter, max_logs: usize) -> ColdResult<Vec<RpcLog>> {
        // Unset bounds default to the full range [0, u64::MAX].
        let from = filter.get_from_block().unwrap_or(0);
        let to = filter.get_to_block().unwrap_or(u64::MAX);

        // Build WHERE clause: block range ($1, $2) + address/topic filters.
        // ENG-2036: dynamic filter clause requires format!(); mitigated by
        // sqlx prepared-statement cache.
        let (filter_clause, params) = build_log_filter_clause(filter, 3);
        let where_clause = format!("l.block_number >= $1 AND l.block_number <= $2{filter_clause}");

        // Use LIMIT to cap results. Fetch one extra row to detect overflow
        // without a separate COUNT query. PostgreSQL stops scanning after
        // finding enough rows, making this faster than COUNT in both the
        // under-limit and over-limit cases.
        // `block_log_index` is the block-wide log position: the receipt's
        // first_log_index plus the log's in-tx index.
        let limit_idx = 3 + params.len() as u32;
        let data_sql = format!(
            "SELECT l.*, h.block_hash, h.timestamp AS block_timestamp, t.tx_hash, \
               (r.first_log_index + l.log_index) AS block_log_index \
             FROM logs l \
             JOIN headers h ON l.block_number = h.block_number \
             JOIN transactions t ON l.block_number = t.block_number \
               AND l.tx_index = t.tx_index \
             JOIN receipts r ON l.block_number = r.block_number \
               AND l.tx_index = r.tx_index \
             WHERE {where_clause} \
             ORDER BY l.block_number, l.tx_index, l.log_index \
             LIMIT ${limit_idx}"
        );
        let mut query = sqlx::query(&data_sql).bind(to_i64(from)).bind(to_i64(to));
        for param in &params {
            query = query.bind(*param);
        }
        // Clamp to i64::MAX before converting: SQL LIMIT is an i64, and
        // saturating_add avoids overflow when max_logs is usize::MAX.
        let limit = max_logs.saturating_add(1).min(i64::MAX as usize);
        query = query.bind(to_i64(limit as u64));

        let rows = query.fetch_all(&self.pool).await.map_err(SqlColdError::from)?;

        // The extra sentinel row (if present) means the filter over-matched.
        if rows.len() > max_logs {
            return Err(ColdStorageError::TooManyLogs { limit: max_logs });
        }

        rows.into_iter()
            .map(|r| {
                let log = log_from_row(&r);
                Ok(RpcLog {
                    inner: log,
                    block_hash: Some(r.get(COL_BLOCK_HASH)),
                    block_number: Some(from_i64(r.get::<i64, _>(COL_BLOCK_NUMBER))),
                    block_timestamp: Some(from_i64(r.get::<i64, _>(COL_BLOCK_TIMESTAMP))),
                    transaction_hash: Some(r.get(COL_TX_HASH)),
                    transaction_index: Some(from_i64(r.get::<i64, _>(COL_TX_INDEX))),
                    log_index: Some(from_i64(r.get::<i64, _>(COL_BLOCK_LOG_INDEX))),
                    // Cold storage only holds canonical blocks.
                    removed: false,
                })
            })
            .collect::<ColdResult<Vec<_>>>()
    }

    // Prefer the Postgres-specific streaming path when compiled in and the
    // backend detected Postgres; otherwise fall back to the generic stream.
    async fn produce_log_stream(&self, filter: &Filter, params: signet_cold::StreamParams) {
        #[cfg(feature = "postgres")]
        if self.is_postgres {
            return self.produce_log_stream_pg(filter, params).await;
        }
        signet_cold::produce_log_stream_default(self, filter, params).await;
    }

    // MAX(block_number) over headers; `None` when the store is empty
    // (MAX over an empty table yields SQL NULL).
    async fn get_latest_block(&self) -> ColdResult<Option<BlockNumber>> {
        let row = sqlx::query("SELECT MAX(block_number) as max_bn FROM headers")
            .fetch_one(&self.pool)
            .await
            .map_err(SqlColdError::from)?;
        Ok(row.get::<Option<i64>, _>(COL_MAX_BN).map(from_i64))
    }
}
1425
1426impl ColdStorageWrite for SqlColdBackend {
1427    async fn append_block(&mut self, data: BlockData) -> ColdResult<()> {
1428        self.insert_block(data).await.map_err(ColdStorageError::from)
1429    }
1430
1431    async fn append_blocks(&mut self, data: Vec<BlockData>) -> ColdResult<()> {
1432        let mut tx = self.pool.begin().await.map_err(SqlColdError::from)?;
1433        for block_data in data {
1434            write_block_to_tx(&mut tx, block_data).await.map_err(ColdStorageError::from)?;
1435        }
1436        tx.commit().await.map_err(SqlColdError::from)?;
1437        Ok(())
1438    }
1439
1440    async fn truncate_above(&mut self, block: BlockNumber) -> ColdResult<()> {
1441        let bn = to_i64(block);
1442        let mut tx = self.pool.begin().await.map_err(SqlColdError::from)?;
1443
1444        delete_above_in_tx(&mut tx, bn).await.map_err(ColdStorageError::from)?;
1445
1446        tx.commit().await.map_err(SqlColdError::from)?;
1447        Ok(())
1448    }
1449}
1450
1451impl ColdStorage for SqlColdBackend {
1452    async fn drain_above(&mut self, block: BlockNumber) -> ColdResult<Vec<Vec<ColdReceipt>>> {
1453        let bn = to_i64(block);
1454        let mut tx = self.pool.begin().await.map_err(SqlColdError::from)?;
1455
1456        // 1. Fetch all headers above block.
1457        let header_rows =
1458            sqlx::query("SELECT * FROM headers WHERE block_number > $1 ORDER BY block_number")
1459                .bind(bn)
1460                .fetch_all(&mut *tx)
1461                .await
1462                .map_err(SqlColdError::from)?;
1463
1464        if header_rows.is_empty() {
1465            tx.commit().await.map_err(SqlColdError::from)?;
1466            return Ok(Vec::new());
1467        }
1468
1469        let mut headers: BTreeMap<i64, SealedHeader> = BTreeMap::new();
1470        for r in &header_rows {
1471            let num: i64 = r.get(COL_BLOCK_NUMBER);
1472            let h = header_from_row(r).map_err(ColdStorageError::from)?.seal_slow();
1473            headers.insert(num, h);
1474        }
1475
1476        // 2. Fetch all receipt + tx metadata above block.
1477        let receipt_rows = sqlx::query(
1478            "SELECT r.block_number, r.tx_index, r.tx_type, r.success, \
1479               r.cumulative_gas_used, r.first_log_index, \
1480               t.tx_hash, t.from_address \
1481             FROM receipts r \
1482             JOIN transactions t ON r.block_number = t.block_number \
1483               AND r.tx_index = t.tx_index \
1484             WHERE r.block_number > $1 \
1485             ORDER BY r.block_number, r.tx_index",
1486        )
1487        .bind(bn)
1488        .fetch_all(&mut *tx)
1489        .await
1490        .map_err(SqlColdError::from)?;
1491
1492        // 3. Fetch all logs above block.
1493        let log_rows = sqlx::query(
1494            "SELECT * FROM logs WHERE block_number > $1 \
1495             ORDER BY block_number, tx_index, log_index",
1496        )
1497        .bind(bn)
1498        .fetch_all(&mut *tx)
1499        .await
1500        .map_err(SqlColdError::from)?;
1501
1502        // Group logs by (block_number, tx_index).
1503        let mut logs_by_block_tx: BTreeMap<(i64, i64), Vec<Log>> = BTreeMap::new();
1504        for r in &log_rows {
1505            let block_num: i64 = r.get(COL_BLOCK_NUMBER);
1506            let tx_idx: i64 = r.get(COL_TX_INDEX);
1507            logs_by_block_tx.entry((block_num, tx_idx)).or_default().push(log_from_row(r));
1508        }
1509
1510        // Group receipt rows by block_number.
1511        let mut receipts_by_block: BTreeMap<i64, Vec<&sqlx::any::AnyRow>> = BTreeMap::new();
1512        for r in &receipt_rows {
1513            let block_num: i64 = r.get(COL_BLOCK_NUMBER);
1514            receipts_by_block.entry(block_num).or_default().push(r);
1515        }
1516
1517        // 4. Assemble ColdReceipts per block.
1518        let mut all_receipts = Vec::with_capacity(headers.len());
1519        for (&block_num, header) in &headers {
1520            let block_receipt_rows = receipts_by_block.remove(&block_num).unwrap_or_default();
1521            let mut prior_cumulative_gas = 0u64;
1522            let block_receipts: ColdResult<Vec<ColdReceipt>> = block_receipt_rows
1523                .into_iter()
1524                .map(|rr: &sqlx::any::AnyRow| {
1525                    let tx_idx: i64 = rr.get(COL_TX_INDEX);
1526                    let tx_hash = rr.get(COL_TX_HASH);
1527                    let sender = rr.get(COL_FROM_ADDRESS);
1528                    let tx_type: i32 = rr.get(COL_TX_TYPE);
1529                    let success = rr.get::<i32, _>(COL_SUCCESS) != 0;
1530                    let cumulative_gas_used: i64 = rr.get(COL_CUMULATIVE_GAS_USED);
1531                    let first_log_index = from_i64(rr.get::<i64, _>(COL_FIRST_LOG_INDEX));
1532                    let logs = logs_by_block_tx.remove(&(block_num, tx_idx)).unwrap_or_default();
1533                    let receipt = build_receipt(tx_type, success, cumulative_gas_used, logs)
1534                        .map_err(ColdStorageError::from)?;
1535                    // Cumulative gas must be non-decreasing within a block; a violation
1536                    // indicates database corruption.
1537                    debug_assert!(
1538                        receipt.inner.cumulative_gas_used >= prior_cumulative_gas,
1539                        "cumulative gas decreased: {} < {}",
1540                        receipt.inner.cumulative_gas_used,
1541                        prior_cumulative_gas,
1542                    );
1543                    let gas_used = receipt.inner.cumulative_gas_used - prior_cumulative_gas;
1544                    prior_cumulative_gas = receipt.inner.cumulative_gas_used;
1545                    let ir = IndexedReceipt { receipt, tx_hash, first_log_index, gas_used, sender };
1546                    Ok(ColdReceipt::new(ir, header, from_i64(tx_idx)))
1547                })
1548                .collect();
1549            all_receipts.push(block_receipts?);
1550        }
1551
1552        // 5. Delete from all tables (same order as truncate_above).
1553        delete_above_in_tx(&mut tx, bn).await.map_err(ColdStorageError::from)?;
1554
1555        tx.commit().await.map_err(SqlColdError::from)?;
1556        Ok(all_receipts)
1557    }
1558}
1559
#[cfg(all(test, feature = "test-utils"))]
mod tests {
    use super::*;
    use signet_cold::conformance::conformance;

    /// Runs the shared cold-storage conformance suite against an
    /// in-memory SQLite backend.
    #[tokio::test]
    async fn sqlite_conformance() {
        let backend = SqlColdBackend::connect("sqlite::memory:").await.unwrap();
        conformance(backend).await.unwrap();
    }

    /// Runs the conformance suite against PostgreSQL when `DATABASE_URL`
    /// is set; otherwise logs a notice and passes vacuously.
    #[tokio::test]
    async fn pg_conformance() {
        let url = match std::env::var("DATABASE_URL") {
            Ok(url) => url,
            Err(_) => {
                eprintln!("skipping pg conformance: DATABASE_URL not set");
                return;
            }
        };
        let backend = SqlColdBackend::connect(&url).await.unwrap();
        conformance(backend).await.unwrap();
    }
}