// signet_cold_sql/backend.rs
1//! Unified SQL backend for cold storage.
2//!
3//! Supports both PostgreSQL and SQLite via [`sqlx::Any`]. The backend
4//! auto-detects the database type at construction time and runs the
5//! appropriate migration.
6
7use crate::SqlColdError;
8use crate::columns::{
9    COL_ACCESS_LIST, COL_ADDRESS, COL_AMOUNT, COL_AUTHORIZATION_LIST, COL_BASE_FEE_PER_GAS,
10    COL_BENEFICIARY, COL_BLOB_GAS_USED, COL_BLOB_VERSIONED_HASHES, COL_BLOCK_DATA_HASH,
11    COL_BLOCK_HASH, COL_BLOCK_LOG_INDEX, COL_BLOCK_NUMBER, COL_BLOCK_TIMESTAMP, COL_CHAIN_ID,
12    COL_CNT, COL_CUMULATIVE_GAS_USED, COL_DATA, COL_DIFFICULTY, COL_EVENT_TYPE,
13    COL_EXCESS_BLOB_GAS, COL_EXTRA_DATA, COL_FIRST_LOG_INDEX, COL_FROM_ADDRESS, COL_GAS,
14    COL_GAS_LIMIT, COL_GAS_PRICE, COL_GAS_USED, COL_HOST_BLOCK_NUMBER, COL_INPUT, COL_LOGS_BLOOM,
15    COL_MAX_BN, COL_MAX_FEE_PER_BLOB_GAS, COL_MAX_FEE_PER_GAS, COL_MAX_PRIORITY_FEE_PER_GAS,
16    COL_MIX_HASH, COL_NONCE, COL_OMMERS_HASH, COL_ORDER_INDEX, COL_PARENT_BEACON_BLOCK_ROOT,
17    COL_PARENT_HASH, COL_PRIOR_GAS, COL_R_CUMULATIVE_GAS_USED, COL_R_FIRST_LOG_INDEX,
18    COL_R_FROM_ADDRESS, COL_R_SUCCESS, COL_R_TX_HASH, COL_R_TX_TYPE, COL_RECEIPTS_ROOT,
19    COL_REQUESTS_HASH, COL_REWARD_ADDRESS, COL_ROLLUP_CHAIN_ID, COL_ROLLUP_RECIPIENT, COL_SENDER,
20    COL_SIG_R, COL_SIG_S, COL_SIG_Y_PARITY, COL_STATE_ROOT, COL_SUCCESS, COL_TIMESTAMP,
21    COL_TO_ADDRESS, COL_TOKEN, COL_TOPIC0, COL_TOPIC1, COL_TOPIC2, COL_TOPIC3,
22    COL_TRANSACTIONS_ROOT, COL_TX_HASH, COL_TX_INDEX, COL_TX_TYPE, COL_VALUE, COL_WITHDRAWALS_ROOT,
23};
24use crate::convert::{
25    EVENT_ENTER, EVENT_ENTER_TOKEN, EVENT_TRANSACT, build_receipt, decode_access_list_or_empty,
26    decode_authorization_list, decode_b256_vec, decode_u128_required, encode_access_list,
27    encode_authorization_list, encode_b256_vec, encode_u128, from_i64, to_i64,
28};
29use alloy::{
30    consensus::{
31        Header, Signed, TxEip1559, TxEip2930, TxEip4844, TxEip7702, TxLegacy, TxType,
32        transaction::Recovered,
33    },
34    primitives::{
35        Address, B256, BlockNumber, Bloom, Bytes, Log, LogData, Sealable, Signature, TxKind, U256,
36    },
37};
38use signet_cold::{
39    BlockData, ColdReceipt, ColdResult, ColdStorage, ColdStorageError, ColdStorageRead,
40    ColdStorageWrite, Confirmed, Filter, HeaderSpecifier, ReceiptSpecifier, RpcLog,
41    SignetEventsSpecifier, TransactionSpecifier, ZenithHeaderSpecifier,
42};
43use signet_storage_types::{
44    ConfirmationMeta, DbSignetEvent, DbZenithHeader, IndexedReceipt, RecoveredTx, SealedHeader,
45    TransactionSigned,
46};
47use signet_zenith::{
48    Passage::{Enter, EnterToken},
49    Transactor::Transact,
50    Zenith,
51};
52use sqlx::{AnyPool, Row};
53use std::collections::BTreeMap;
54
/// SQL-based cold storage backend.
///
/// Uses [`sqlx::Any`] for database-agnostic access, supporting both
/// PostgreSQL and SQLite through a single implementation. The backend
/// is determined by the connection URL at construction time.
///
/// # Example
///
/// ```no_run
/// # async fn example() {
/// use signet_cold_sql::SqlColdBackend;
///
/// // SQLite (in-memory)
/// let backend = SqlColdBackend::connect("sqlite::memory:").await.unwrap();
///
/// // PostgreSQL
/// let backend = SqlColdBackend::connect("postgres://localhost/signet").await.unwrap();
/// # }
/// ```
#[derive(Debug, Clone)]
pub struct SqlColdBackend {
    // Shared connection pool over the database-agnostic `Any` driver.
    // Cloning the backend clones the pool handle, not the connections.
    pool: AnyPool,
    // True when the detected backend is PostgreSQL (set in `new` from
    // `backend_name()`). NOTE(review): not read anywhere in this chunk —
    // presumably consulted by backend-specific paths elsewhere; confirm.
    is_postgres: bool,
}
79
/// Returns `true` if the URL refers to an in-memory SQLite database.
///
/// In-memory SQLite databases are per-connection, so the pool must be
/// limited to a single connection to ensure all queries see the same
/// state.
fn is_in_memory_sqlite(url: &str) -> bool {
    // The bare in-memory URL is an exact match; file-backed URLs may
    // still request memory mode via a query parameter.
    if url == "sqlite::memory:" {
        return true;
    }
    url.contains("mode=memory")
}
88
89/// Delete all rows with `block_number > bn` from every table, in
90/// child-first order to preserve foreign-key constraints.
91async fn delete_above_in_tx(
92    tx: &mut sqlx::Transaction<'_, sqlx::Any>,
93    bn: i64,
94) -> Result<(), SqlColdError> {
95    for table in ["logs", "transactions", "receipts", "signet_events", "zenith_headers", "headers"]
96    {
97        sqlx::query(&format!("DELETE FROM {table} WHERE block_number > $1"))
98            .bind(bn)
99            .execute(&mut **tx)
100            .await?;
101    }
102    Ok(())
103}
104
impl SqlColdBackend {
    /// Create a new SQL cold storage backend from an existing [`AnyPool`].
    ///
    /// Auto-detects the database backend and creates all tables if they
    /// do not already exist. Callers must ensure
    /// [`sqlx::any::install_default_drivers`] has been called before
    /// constructing the pool.
    ///
    /// # Errors
    ///
    /// Returns [`SqlColdError::Convert`] if the pool's backend is neither
    /// PostgreSQL nor SQLite, or propagates any database error from
    /// acquiring a connection or running the migration.
    pub async fn new(pool: AnyPool) -> Result<Self, SqlColdError> {
        // Detect backend from a pooled connection.
        let conn = pool.acquire().await?;
        let backend = conn.backend_name().to_owned();
        // Return the connection to the pool before running the migration.
        drop(conn);

        // Pick the dialect-specific migration script, embedded at
        // compile time.
        let migration = match backend.as_str() {
            "PostgreSQL" => include_str!("../migrations/001_initial_pg.sql"),
            "SQLite" => include_str!("../migrations/001_initial.sql"),
            other => {
                return Err(SqlColdError::Convert(format!(
                    "unsupported database backend: {other}"
                )));
            }
        };
        // Execute via pool to ensure the migration uses the same
        // connection that subsequent queries will use.
        // NOTE(review): with max_connections > 1 the pool may hand later
        // queries a different physical connection; this relies on the
        // migration's effects being persistent/idempotent — confirm.
        let is_postgres = backend == "PostgreSQL";
        sqlx::raw_sql(migration).execute(&pool).await?;
        Ok(Self { pool, is_postgres })
    }

    /// Connect to a database URL with explicit pool options.
    ///
    /// Installs the default sqlx drivers on the first call. The database
    /// type is inferred from the URL scheme (`sqlite:` or `postgres:`).
    ///
    /// For in-memory SQLite URLs (`sqlite::memory:` or `mode=memory`),
    /// `max_connections` is forced to 1 regardless of the provided
    /// options, because in-memory databases are per-connection.
    pub async fn connect_with(
        url: &str,
        pool_opts: sqlx::pool::PoolOptions<sqlx::Any>,
    ) -> Result<Self, SqlColdError> {
        // Idempotent: safe to call on every connect.
        sqlx::any::install_default_drivers();
        let pool_opts =
            if is_in_memory_sqlite(url) { pool_opts.max_connections(1) } else { pool_opts };
        let pool: AnyPool = pool_opts.connect(url).await?;
        Self::new(pool).await
    }

    /// Connect to a database URL with default pool settings.
    ///
    /// Convenience wrapper around [`connect_with`](Self::connect_with).
    pub async fn connect(url: &str) -> Result<Self, SqlColdError> {
        Self::connect_with(url, sqlx::pool::PoolOptions::new()).await
    }

    // ========================================================================
    // Specifier resolution
    // ========================================================================

    /// Resolve a [`HeaderSpecifier`] to a block number.
    ///
    /// A numeric specifier resolves immediately; a hash specifier is
    /// looked up in the `headers` table. Returns `Ok(None)` when the
    /// hash is unknown.
    async fn resolve_header_spec(
        &self,
        spec: HeaderSpecifier,
    ) -> Result<Option<BlockNumber>, SqlColdError> {
        match spec {
            HeaderSpecifier::Number(n) => Ok(Some(n)),
            HeaderSpecifier::Hash(hash) => {
                let row = sqlx::query("SELECT block_number FROM headers WHERE block_hash = $1")
                    .bind(hash)
                    .fetch_optional(&self.pool)
                    .await?;
                Ok(row.map(|r| from_i64(r.get::<i64, _>(COL_BLOCK_NUMBER))))
            }
        }
    }

    // ========================================================================
    // Read helpers
    // ========================================================================

    /// Fetch and seal the header stored for `block_num`, if any.
    async fn fetch_header_by_number(
        &self,
        block_num: BlockNumber,
    ) -> Result<Option<SealedHeader>, SqlColdError> {
        let bn = to_i64(block_num);
        let row = sqlx::query("SELECT * FROM headers WHERE block_number = $1")
            .bind(bn)
            .fetch_optional(&self.pool)
            .await?;

        // seal_slow recomputes the hash rather than reading the stored
        // block_hash column.
        row.map(|r| header_from_row(&r).map(|h| h.seal_slow())).transpose()
    }

    // ========================================================================
    // Write helpers
    // ========================================================================

    /// Insert one block's worth of data atomically (single transaction).
    async fn insert_block(&self, data: BlockData) -> Result<(), SqlColdError> {
        let mut tx = self.pool.begin().await?;
        write_block_to_tx(&mut tx, data).await?;
        tx.commit().await?;
        Ok(())
    }

    // ========================================================================
    // Streaming helpers
    // ========================================================================

    /// Stream logs using a PostgreSQL REPEATABLE READ transaction.
    ///
    /// The transaction provides a consistent snapshot across all per-block
    /// queries, eliminating the need for anchor-hash reorg detection.
    /// Rows are streamed individually rather than materialised per block.
    ///
    /// NOTE(review): the `cfg` gate only controls compilation; presumably
    /// the caller dispatches on `is_postgres` before invoking this — the
    /// `SET TRANSACTION` statement is PostgreSQL syntax. Confirm.
    #[cfg(feature = "postgres")]
    async fn produce_log_stream_pg(&self, filter: &Filter, params: signet_cold::StreamParams) {
        use tokio_stream::StreamExt;

        /// Unwrap a `Result` or send the error through the stream and return.
        macro_rules! try_stream {
            ($sender:expr, $expr:expr) => {
                match $expr {
                    Ok(v) => v,
                    Err(e) => {
                        let _ = $sender
                            .send(Err(ColdStorageError::backend(SqlColdError::from(e))))
                            .await;
                        return;
                    }
                }
            };
        }

        let signet_cold::StreamParams { from, to, max_logs, sender, deadline } = params;

        // Open a REPEATABLE READ transaction so all per-block queries see a
        // consistent snapshot. This makes reorg detection unnecessary — if a
        // reorg lands mid-stream the transaction still reads the old data.
        let mut tx = try_stream!(sender, self.pool.begin().await);
        try_stream!(
            sender,
            sqlx::query("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ").execute(&mut *tx).await
        );

        // Build the parameterised query once. $1 is the block number
        // (bound per iteration); remaining parameters are the address
        // and topic filters from the user's request.
        let (filter_clause, filter_params) = build_log_filter_clause(filter, 2);
        let data_sql = format!(
            "SELECT l.*, h.block_hash, h.timestamp AS block_timestamp, t.tx_hash, \
               (r.first_log_index + l.log_index) AS block_log_index \
             FROM logs l \
             JOIN headers h ON l.block_number = h.block_number \
             JOIN transactions t ON l.block_number = t.block_number \
               AND l.tx_index = t.tx_index \
             JOIN receipts r ON l.block_number = r.block_number \
               AND l.tx_index = r.tx_index \
             WHERE l.block_number = $1{filter_clause} \
             ORDER BY l.tx_index, l.log_index"
        );

        // Running count of logs sent, enforced against max_logs across
        // the whole range.
        let mut total = 0usize;

        // Walk through blocks one at a time, streaming matching log rows
        // from each block directly to the channel.
        for block_num in from..=to {
            // Check the deadline before starting each block so we
            // don't begin a new query after the caller's timeout.
            if tokio::time::Instant::now() > deadline {
                let _ = sender.send(Err(ColdStorageError::StreamDeadlineExceeded)).await;
                return;
            }

            let mut query = sqlx::query(&data_sql).bind(to_i64(block_num));
            for param in &filter_params {
                query = query.bind(*param);
            }

            // Stream rows from this block's query. Each row is converted
            // to an RpcLog and sent over the channel immediately rather
            // than being collected into a Vec first.
            let mut stream = query.fetch(&mut *tx);
            while let Some(row_result) = stream.next().await {
                let r = try_stream!(sender, row_result);

                // Enforce the global log limit across all blocks.
                total += 1;
                if total > max_logs {
                    let _ =
                        sender.send(Err(ColdStorageError::TooManyLogs { limit: max_logs })).await;
                    return;
                }

                let log = log_from_row(&r);
                let rpc_log = RpcLog {
                    inner: log,
                    block_hash: Some(r.get(COL_BLOCK_HASH)),
                    block_number: Some(from_i64(r.get::<i64, _>(COL_BLOCK_NUMBER))),
                    block_timestamp: Some(from_i64(r.get::<i64, _>(COL_BLOCK_TIMESTAMP))),
                    transaction_hash: Some(r.get(COL_TX_HASH)),
                    transaction_index: Some(from_i64(r.get::<i64, _>(COL_TX_INDEX))),
                    // block_log_index is computed in SQL as
                    // first_log_index + log_index (block-wide position).
                    log_index: Some(from_i64(r.get::<i64, _>(COL_BLOCK_LOG_INDEX))),
                    removed: false,
                };
                // Send the log to the caller. The timeout ensures we
                // stop if the deadline passes while back-pressured.
                match tokio::time::timeout_at(deadline, sender.send(Ok(rpc_log))).await {
                    Ok(Ok(())) => {}
                    Ok(Err(_)) => return, // receiver dropped
                    Err(_) => {
                        let _ = sender.send(Err(ColdStorageError::StreamDeadlineExceeded)).await;
                        return;
                    }
                }
            }

            // Early exit if we've already hit the limit — no need to
            // query the next block.
            // NOTE(review): returning at `total == max_logs` means a
            // request that yields exactly max_logs succeeds silently,
            // whereas one more matching log would have produced
            // TooManyLogs. Confirm this asymmetry is intended.
            if total >= max_logs {
                return;
            }
        }
    }
}
327
328// ============================================================================
329// Row → domain type conversion (read path)
330// ============================================================================
331
332/// Extract a required BLOB column from a row as a borrowed slice.
333fn blob<'r>(r: &'r sqlx::any::AnyRow, col: &str) -> &'r [u8] {
334    r.get(col)
335}
336
337/// Extract an optional BLOB column from a row as a borrowed slice.
338fn opt_blob<'r>(r: &'r sqlx::any::AnyRow, col: &str) -> Option<&'r [u8]> {
339    r.get(col)
340}
341
/// Build a [`Header`] from an [`sqlx::any::AnyRow`].
///
/// Every `from_i64` call reverses the signed encoding used on the write
/// path (`to_i64`); columns for post-merge / post-Cancun fields are
/// nullable and map to `Option`.
fn header_from_row(r: &sqlx::any::AnyRow) -> Result<Header, SqlColdError> {
    Ok(Header {
        parent_hash: r.get(COL_PARENT_HASH),
        ommers_hash: r.get(COL_OMMERS_HASH),
        beneficiary: r.get(COL_BENEFICIARY),
        state_root: r.get(COL_STATE_ROOT),
        transactions_root: r.get(COL_TRANSACTIONS_ROOT),
        receipts_root: r.get(COL_RECEIPTS_ROOT),
        // The bloom is stored as a raw 256-byte BLOB.
        logs_bloom: Bloom::from_slice(blob(r, COL_LOGS_BLOOM)),
        difficulty: r.get(COL_DIFFICULTY),
        number: from_i64(r.get(COL_BLOCK_NUMBER)),
        gas_limit: from_i64(r.get(COL_GAS_LIMIT)),
        gas_used: from_i64(r.get(COL_GAS_USED)),
        timestamp: from_i64(r.get(COL_TIMESTAMP)),
        extra_data: r.get(COL_EXTRA_DATA),
        mix_hash: r.get(COL_MIX_HASH),
        nonce: r.get(COL_NONCE),
        // Nullable columns: absent for blocks predating the relevant fork.
        base_fee_per_gas: r.get::<Option<i64>, _>(COL_BASE_FEE_PER_GAS).map(from_i64),
        withdrawals_root: r.get(COL_WITHDRAWALS_ROOT),
        blob_gas_used: r.get::<Option<i64>, _>(COL_BLOB_GAS_USED).map(from_i64),
        excess_blob_gas: r.get::<Option<i64>, _>(COL_EXCESS_BLOB_GAS).map(from_i64),
        parent_beacon_block_root: r.get(COL_PARENT_BEACON_BLOCK_ROOT),
        requests_hash: r.get(COL_REQUESTS_HASH),
    })
}
368
/// Build a [`TransactionSigned`] from an [`sqlx::any::AnyRow`].
///
/// The row stores a normalized superset of all transaction-type fields;
/// this function re-materialises the correct envelope variant based on
/// the stored `tx_type` discriminant.
///
/// # Errors
///
/// Returns [`SqlColdError::Convert`] when `tx_type` is out of range or
/// when a column required by the decoded type is NULL (e.g. `chain_id`
/// for typed transactions, `to_address` for EIP-4844/7702).
fn tx_from_row(r: &sqlx::any::AnyRow) -> Result<TransactionSigned, SqlColdError> {
    use alloy::consensus::EthereumTxEnvelope;

    // y_parity is stored as an integer; any non-zero value means odd parity.
    let sig =
        Signature::new(r.get(COL_SIG_R), r.get(COL_SIG_S), r.get::<i32, _>(COL_SIG_Y_PARITY) != 0);

    // Two-stage validation: first narrow i32 -> u8, then u8 -> TxType,
    // each with its own diagnostic.
    let tx_type_raw: i32 = r.get(COL_TX_TYPE);
    let tx_type_u8: u8 = tx_type_raw
        .try_into()
        .map_err(|_| SqlColdError::Convert(format!("tx_type out of u8 range: {tx_type_raw}")))?;
    let tx_type = TxType::try_from(tx_type_u8)
        .map_err(|_| SqlColdError::Convert(format!("invalid tx_type: {tx_type_u8}")))?;

    // Fields common to every transaction type.
    let chain_id: Option<i64> = r.get(COL_CHAIN_ID);
    let nonce = from_i64(r.get(COL_NONCE));
    let gas_limit = from_i64(r.get(COL_GAS_LIMIT));
    let to_addr: Option<Address> = r.get(COL_TO_ADDRESS);
    let value: U256 = r.get(COL_VALUE);
    let input: Bytes = r.get(COL_INPUT);

    match tx_type {
        // Legacy: chain_id is optional (pre-EIP-155 txs have none);
        // a NULL to_address means contract creation.
        TxType::Legacy => {
            let tx = TxLegacy {
                chain_id: chain_id.map(from_i64),
                nonce,
                gas_price: decode_u128_required(opt_blob(r, COL_GAS_PRICE), COL_GAS_PRICE)?,
                gas_limit,
                to: to_addr.map_or(TxKind::Create, TxKind::Call),
                value,
                input,
            };
            Ok(EthereumTxEnvelope::Legacy(Signed::new_unhashed(tx, sig)))
        }
        // EIP-2930: adds a mandatory chain_id and an access list.
        TxType::Eip2930 => {
            let tx = TxEip2930 {
                chain_id: from_i64(
                    chain_id
                        .ok_or_else(|| SqlColdError::Convert("EIP2930 requires chain_id".into()))?,
                ),
                nonce,
                gas_price: decode_u128_required(opt_blob(r, COL_GAS_PRICE), COL_GAS_PRICE)?,
                gas_limit,
                to: to_addr.map_or(TxKind::Create, TxKind::Call),
                value,
                input,
                access_list: decode_access_list_or_empty(opt_blob(r, COL_ACCESS_LIST))?,
            };
            Ok(EthereumTxEnvelope::Eip2930(Signed::new_unhashed(tx, sig)))
        }
        // EIP-1559: gas_price is replaced by the max-fee pair.
        TxType::Eip1559 => {
            let tx = TxEip1559 {
                chain_id: from_i64(
                    chain_id
                        .ok_or_else(|| SqlColdError::Convert("EIP1559 requires chain_id".into()))?,
                ),
                nonce,
                gas_limit,
                max_fee_per_gas: decode_u128_required(
                    opt_blob(r, COL_MAX_FEE_PER_GAS),
                    COL_MAX_FEE_PER_GAS,
                )?,
                max_priority_fee_per_gas: decode_u128_required(
                    opt_blob(r, COL_MAX_PRIORITY_FEE_PER_GAS),
                    COL_MAX_PRIORITY_FEE_PER_GAS,
                )?,
                to: to_addr.map_or(TxKind::Create, TxKind::Call),
                value,
                input,
                access_list: decode_access_list_or_empty(opt_blob(r, COL_ACCESS_LIST))?,
            };
            Ok(EthereumTxEnvelope::Eip1559(Signed::new_unhashed(tx, sig)))
        }
        // EIP-4844: blob txs cannot create contracts, so to_address and
        // blob_versioned_hashes are mandatory.
        TxType::Eip4844 => {
            let tx = TxEip4844 {
                chain_id: from_i64(
                    chain_id
                        .ok_or_else(|| SqlColdError::Convert("EIP4844 requires chain_id".into()))?,
                ),
                nonce,
                gas_limit,
                max_fee_per_gas: decode_u128_required(
                    opt_blob(r, COL_MAX_FEE_PER_GAS),
                    COL_MAX_FEE_PER_GAS,
                )?,
                max_priority_fee_per_gas: decode_u128_required(
                    opt_blob(r, COL_MAX_PRIORITY_FEE_PER_GAS),
                    COL_MAX_PRIORITY_FEE_PER_GAS,
                )?,
                to: to_addr
                    .ok_or_else(|| SqlColdError::Convert("EIP4844 requires to_address".into()))?,
                value,
                input,
                access_list: decode_access_list_or_empty(opt_blob(r, COL_ACCESS_LIST))?,
                blob_versioned_hashes: decode_b256_vec(
                    opt_blob(r, COL_BLOB_VERSIONED_HASHES).ok_or_else(|| {
                        SqlColdError::Convert("EIP4844 requires blob_versioned_hashes".into())
                    })?,
                )?,
                max_fee_per_blob_gas: decode_u128_required(
                    opt_blob(r, COL_MAX_FEE_PER_BLOB_GAS),
                    COL_MAX_FEE_PER_BLOB_GAS,
                )?,
            };
            Ok(EthereumTxEnvelope::Eip4844(Signed::new_unhashed(tx, sig)))
        }
        // EIP-7702: like 1559 but with a mandatory to_address and
        // authorization list.
        TxType::Eip7702 => {
            let tx = TxEip7702 {
                chain_id: from_i64(
                    chain_id
                        .ok_or_else(|| SqlColdError::Convert("EIP7702 requires chain_id".into()))?,
                ),
                nonce,
                gas_limit,
                max_fee_per_gas: decode_u128_required(
                    opt_blob(r, COL_MAX_FEE_PER_GAS),
                    COL_MAX_FEE_PER_GAS,
                )?,
                max_priority_fee_per_gas: decode_u128_required(
                    opt_blob(r, COL_MAX_PRIORITY_FEE_PER_GAS),
                    COL_MAX_PRIORITY_FEE_PER_GAS,
                )?,
                to: to_addr
                    .ok_or_else(|| SqlColdError::Convert("EIP7702 requires to_address".into()))?,
                value,
                input,
                access_list: decode_access_list_or_empty(opt_blob(r, COL_ACCESS_LIST))?,
                authorization_list: decode_authorization_list(
                    opt_blob(r, COL_AUTHORIZATION_LIST).ok_or_else(|| {
                        SqlColdError::Convert("EIP7702 requires authorization_list".into())
                    })?,
                )?,
            };
            Ok(EthereumTxEnvelope::Eip7702(Signed::new_unhashed(tx, sig)))
        }
    }
}
506
507/// Build a [`RecoveredTx`] from a row that includes `from_address`.
508fn recovered_tx_from_row(r: &sqlx::any::AnyRow) -> Result<RecoveredTx, SqlColdError> {
509    let sender: Address = r.get(COL_FROM_ADDRESS);
510    let tx = tx_from_row(r)?;
511    // SAFETY: the sender was recovered at append time and stored in from_address.
512    Ok(Recovered::new_unchecked(tx, sender))
513}
514
515/// Build a [`Log`] from an [`sqlx::any::AnyRow`].
516fn log_from_row(r: &sqlx::any::AnyRow) -> Log {
517    let topics = [COL_TOPIC0, COL_TOPIC1, COL_TOPIC2, COL_TOPIC3]
518        .into_iter()
519        .filter_map(|col| r.get::<Option<B256>, _>(col))
520        .collect();
521    Log { address: r.get(COL_ADDRESS), data: LogData::new_unchecked(topics, r.get(COL_DATA)) }
522}
523
/// Build a [`DbSignetEvent`] from an [`sqlx::any::AnyRow`].
///
/// The `signet_events` table stores a superset of all event fields;
/// `event_type` selects the variant, and the columns that variant
/// requires must be non-NULL.
///
/// # Errors
///
/// Returns [`SqlColdError::Convert`] on an unknown `event_type` or when
/// a column required by the decoded variant is NULL.
fn signet_event_from_row(r: &sqlx::any::AnyRow) -> Result<DbSignetEvent, SqlColdError> {
    // The column is stored as INTEGER; the discriminator constants are i16.
    let event_type = r.get::<i32, _>(COL_EVENT_TYPE) as i16;
    let order = from_i64(r.get(COL_ORDER_INDEX));
    let rollup_chain_id: U256 = r.get(COL_ROLLUP_CHAIN_ID);

    match event_type {
        EVENT_TRANSACT => {
            let sender: Address = r
                .get::<Option<Address>, _>(COL_SENDER)
                .ok_or_else(|| SqlColdError::Convert("Transact requires sender".into()))?;
            let to: Address = r
                .get::<Option<Address>, _>(COL_TO_ADDRESS)
                .ok_or_else(|| SqlColdError::Convert("Transact requires to".into()))?;
            let value: U256 = r
                .get::<Option<U256>, _>(COL_VALUE)
                .ok_or_else(|| SqlColdError::Convert("Transact requires value".into()))?;
            let gas: U256 = r
                .get::<Option<U256>, _>(COL_GAS)
                .ok_or_else(|| SqlColdError::Convert("Transact requires gas".into()))?;
            let max_fee: U256 = r
                .get::<Option<U256>, _>(COL_MAX_FEE_PER_GAS)
                .ok_or_else(|| SqlColdError::Convert("Transact requires max_fee_per_gas".into()))?;
            // Calldata is the one optional field: NULL decodes to empty bytes.
            let data: Bytes = r.get::<Option<Bytes>, _>(COL_DATA).unwrap_or_default();

            Ok(DbSignetEvent::Transact(
                order,
                Transact {
                    rollupChainId: rollup_chain_id,
                    sender,
                    to,
                    value,
                    gas,
                    maxFeePerGas: max_fee,
                    data,
                },
            ))
        }
        EVENT_ENTER => {
            let recipient: Address = r
                .get::<Option<Address>, _>(COL_ROLLUP_RECIPIENT)
                .ok_or_else(|| SqlColdError::Convert("Enter requires rollup_recipient".into()))?;
            let amount: U256 = r
                .get::<Option<U256>, _>(COL_AMOUNT)
                .ok_or_else(|| SqlColdError::Convert("Enter requires amount".into()))?;

            Ok(DbSignetEvent::Enter(
                order,
                Enter { rollupChainId: rollup_chain_id, rollupRecipient: recipient, amount },
            ))
        }
        EVENT_ENTER_TOKEN => {
            let token: Address = r
                .get::<Option<Address>, _>(COL_TOKEN)
                .ok_or_else(|| SqlColdError::Convert("EnterToken requires token".into()))?;
            let recipient: Address =
                r.get::<Option<Address>, _>(COL_ROLLUP_RECIPIENT).ok_or_else(|| {
                    SqlColdError::Convert("EnterToken requires rollup_recipient".into())
                })?;
            let amount: U256 = r
                .get::<Option<U256>, _>(COL_AMOUNT)
                .ok_or_else(|| SqlColdError::Convert("EnterToken requires amount".into()))?;

            Ok(DbSignetEvent::EnterToken(
                order,
                EnterToken {
                    rollupChainId: rollup_chain_id,
                    token,
                    rollupRecipient: recipient,
                    amount,
                },
            ))
        }
        _ => Err(SqlColdError::Convert(format!("invalid event_type: {event_type}"))),
    }
}
600
/// Build a [`DbZenithHeader`] from an [`sqlx::any::AnyRow`].
///
/// All five columns are decoded directly into the [`Zenith::BlockHeader`]
/// fields; unlike the other row decoders there are no nullable columns
/// here, so this cannot fail with a `Convert` error.
fn zenith_header_from_row(r: &sqlx::any::AnyRow) -> Result<DbZenithHeader, SqlColdError> {
    Ok(DbZenithHeader(Zenith::BlockHeader {
        hostBlockNumber: r.get(COL_HOST_BLOCK_NUMBER),
        rollupChainId: r.get(COL_ROLLUP_CHAIN_ID),
        gasLimit: r.get(COL_GAS_LIMIT),
        rewardAddress: r.get(COL_REWARD_ADDRESS),
        blockDataHash: r.get(COL_BLOCK_DATA_HASH),
    }))
}
611
612// ============================================================================
613// Domain type → SQL INSERT (write path)
614// ============================================================================
615
/// Write a single block's data into an open SQL transaction.
///
/// Inserts, in order: the header, all transactions, receipts with their
/// logs, signet events, and (if present) the zenith header. Parent rows
/// (headers) are written before child rows so foreign-key constraints
/// hold at every statement.
async fn write_block_to_tx(
    tx: &mut sqlx::Transaction<'_, sqlx::Any>,
    data: BlockData,
) -> Result<(), SqlColdError> {
    let bn = to_i64(data.block_number());

    // Insert header. The hash is computed here rather than stored on
    // BlockData.
    let block_hash = data.header.hash_slow();
    let difficulty = &data.header.difficulty;
    sqlx::query(
        "INSERT INTO headers (
            block_number, block_hash, parent_hash, ommers_hash, beneficiary,
            state_root, transactions_root, receipts_root, logs_bloom, difficulty,
            gas_limit, gas_used, timestamp, extra_data, mix_hash, nonce,
            base_fee_per_gas, withdrawals_root, blob_gas_used, excess_blob_gas,
            parent_beacon_block_root, requests_hash
        ) VALUES (
            $1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
            $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22
        )",
    )
    .bind(bn)
    .bind(block_hash)
    .bind(data.header.parent_hash)
    .bind(data.header.ommers_hash)
    .bind(data.header.beneficiary)
    .bind(data.header.state_root)
    .bind(data.header.transactions_root)
    .bind(data.header.receipts_root)
    .bind(data.header.logs_bloom)
    .bind(difficulty)
    .bind(to_i64(data.header.gas_limit))
    .bind(to_i64(data.header.gas_used))
    .bind(to_i64(data.header.timestamp))
    .bind(&data.header.extra_data)
    .bind(data.header.mix_hash)
    .bind(data.header.nonce)
    // Optional post-fork fields bind as NULL when absent.
    .bind(data.header.base_fee_per_gas.map(to_i64))
    .bind(data.header.withdrawals_root.as_ref())
    .bind(data.header.blob_gas_used.map(to_i64))
    .bind(data.header.excess_blob_gas.map(to_i64))
    .bind(data.header.parent_beacon_block_root.as_ref())
    .bind(data.header.requests_hash.as_ref())
    .execute(&mut **tx)
    .await?;

    // Insert transactions, keyed by (block_number, tx_index).
    for (idx, recovered_tx) in data.transactions.iter().enumerate() {
        insert_transaction(tx, bn, to_i64(idx as u64), recovered_tx).await?;
    }

    // Insert receipts and logs, computing first_log_index as a running
    // sum of log counts (same algorithm as the MDBX IndexedReceipt path).
    let mut first_log_index = 0i64;
    for (idx, receipt) in data.receipts.iter().enumerate() {
        let tx_idx = to_i64(idx as u64);
        sqlx::query(
            "INSERT INTO receipts (block_number, tx_index, tx_type, success, cumulative_gas_used, first_log_index)
             VALUES ($1, $2, $3, $4, $5, $6)",
        )
        .bind(bn)
        .bind(tx_idx)
        .bind(receipt.tx_type as i32)
        // Status is coerced to a bool-like integer (0/1).
        .bind(receipt.inner.status.coerce_status() as i32)
        .bind(to_i64(receipt.inner.cumulative_gas_used))
        .bind(first_log_index)
        .execute(&mut **tx)
        .await?;
        // Advance the block-wide log offset for the next receipt.
        first_log_index += receipt.inner.logs.len() as i64;

        // Logs are keyed by (block_number, tx_index, log_index), with
        // log_index local to the transaction. Up to four topics are
        // stored in separate nullable columns; missing ones bind NULL.
        for (log_idx, log) in receipt.inner.logs.iter().enumerate() {
            let topics = log.topics();
            sqlx::query(
                "INSERT INTO logs (block_number, tx_index, log_index, address, topic0, topic1, topic2, topic3, data)
                 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
            )
            .bind(bn)
            .bind(tx_idx)
            .bind(to_i64(log_idx as u64))
            .bind(log.address)
            .bind(topics.first())
            .bind(topics.get(1))
            .bind(topics.get(2))
            .bind(topics.get(3))
            .bind(&log.data.data)
            .execute(&mut **tx)
            .await?;
        }
    }

    // Insert signet events in their original ordering (order_index).
    for (idx, event) in data.signet_events.iter().enumerate() {
        insert_signet_event(tx, bn, to_i64(idx as u64), event).await?;
    }

    // Insert zenith header, when the block carries one.
    if let Some(zh) = &data.zenith_header {
        let h = &zh.0;
        sqlx::query(
            "INSERT INTO zenith_headers (
                block_number, host_block_number, rollup_chain_id,
                gas_limit, reward_address, block_data_hash
            ) VALUES ($1, $2, $3, $4, $5, $6)",
        )
        .bind(bn)
        .bind(h.hostBlockNumber)
        .bind(h.rollupChainId)
        .bind(h.gasLimit)
        .bind(h.rewardAddress)
        .bind(h.blockDataHash)
        .execute(&mut **tx)
        .await?;
    }

    Ok(())
}
733
/// Insert a transaction, binding directly from the source type.
///
/// Flattens the [`EthereumTxEnvelope`] variants into one wide `transactions`
/// row. Columns that do not exist for the given transaction type (e.g.
/// `gas_price` for EIP-1559, `authorization_list` for anything but
/// EIP-7702) are bound as NULL via `Option`.
async fn insert_transaction(
    conn: &mut sqlx::AnyConnection,
    bn: i64,
    tx_index: i64,
    recovered: &RecoveredTx,
) -> Result<(), SqlColdError> {
    use alloy::consensus::EthereumTxEnvelope;

    let sender = recovered.signer();
    let tx: &TransactionSigned = recovered;
    let tx_hash = tx.tx_hash();
    let tx_type = tx.tx_type() as i32;

    // Extract (y_parity, r, s) from the signed variant. A macro is used
    // because the five envelope variants are distinct types; this avoids
    // repeating the same three-line extraction in every match arm.
    macro_rules! sig {
        ($s:expr) => {{
            let sig = $s.signature();
            (sig.v() as i32, sig.r(), sig.s())
        }};
    }
    let (sig_y, sig_r, sig_s) = match tx {
        EthereumTxEnvelope::Legacy(s) => sig!(s),
        EthereumTxEnvelope::Eip2930(s) => sig!(s),
        EthereumTxEnvelope::Eip1559(s) => sig!(s),
        EthereumTxEnvelope::Eip4844(s) => sig!(s),
        EthereumTxEnvelope::Eip7702(s) => sig!(s),
    };

    // Fields common to every variant. Legacy is the only type whose
    // `chain_id` is optional (pre-EIP-155 transactions have none).
    let (chain_id, nonce, gas_limit) = match tx {
        EthereumTxEnvelope::Legacy(s) => {
            (s.tx().chain_id.map(to_i64), to_i64(s.tx().nonce), to_i64(s.tx().gas_limit))
        }
        EthereumTxEnvelope::Eip2930(s) => {
            (Some(to_i64(s.tx().chain_id)), to_i64(s.tx().nonce), to_i64(s.tx().gas_limit))
        }
        EthereumTxEnvelope::Eip1559(s) => {
            (Some(to_i64(s.tx().chain_id)), to_i64(s.tx().nonce), to_i64(s.tx().gas_limit))
        }
        EthereumTxEnvelope::Eip4844(s) => {
            (Some(to_i64(s.tx().chain_id)), to_i64(s.tx().nonce), to_i64(s.tx().gas_limit))
        }
        EthereumTxEnvelope::Eip7702(s) => {
            (Some(to_i64(s.tx().chain_id)), to_i64(s.tx().nonce), to_i64(s.tx().gas_limit))
        }
    };

    // Recipient: Legacy/2930/1559 carry a `TxKind` (`.to()` is `None` for
    // contract creation); 4844/7702 always have a concrete address.
    let (value, to_addr) = match tx {
        EthereumTxEnvelope::Legacy(s) => (s.tx().value, s.tx().to.to()),
        EthereumTxEnvelope::Eip2930(s) => (s.tx().value, s.tx().to.to()),
        EthereumTxEnvelope::Eip1559(s) => (s.tx().value, s.tx().to.to()),
        EthereumTxEnvelope::Eip4844(s) => (s.tx().value, Some(&s.tx().to)),
        EthereumTxEnvelope::Eip7702(s) => (s.tx().value, Some(&s.tx().to)),
    };

    let input: &[u8] = match tx {
        EthereumTxEnvelope::Legacy(s) => s.tx().input.as_ref(),
        EthereumTxEnvelope::Eip2930(s) => s.tx().input.as_ref(),
        EthereumTxEnvelope::Eip1559(s) => s.tx().input.as_ref(),
        EthereumTxEnvelope::Eip4844(s) => s.tx().input.as_ref(),
        EthereumTxEnvelope::Eip7702(s) => s.tx().input.as_ref(),
    };

    // Fee columns. Pre-1559 types carry a single gas_price; 1559+ types
    // carry max/priority fees; only 4844 has a blob fee. u128 fees are
    // stored as byte blobs via `encode_u128` (bound as slices below).
    let (gas_price, max_fee, max_priority_fee, max_blob_fee) = match tx {
        EthereumTxEnvelope::Legacy(s) => (Some(encode_u128(s.tx().gas_price)), None, None, None),
        EthereumTxEnvelope::Eip2930(s) => (Some(encode_u128(s.tx().gas_price)), None, None, None),
        EthereumTxEnvelope::Eip1559(s) => (
            None,
            Some(encode_u128(s.tx().max_fee_per_gas)),
            Some(encode_u128(s.tx().max_priority_fee_per_gas)),
            None,
        ),
        EthereumTxEnvelope::Eip4844(s) => (
            None,
            Some(encode_u128(s.tx().max_fee_per_gas)),
            Some(encode_u128(s.tx().max_priority_fee_per_gas)),
            Some(encode_u128(s.tx().max_fee_per_blob_gas)),
        ),
        EthereumTxEnvelope::Eip7702(s) => (
            None,
            Some(encode_u128(s.tx().max_fee_per_gas)),
            Some(encode_u128(s.tx().max_priority_fee_per_gas)),
            None,
        ),
    };

    // Variant-specific lists: access list (2930+), blob hashes (4844 only),
    // authorization list (7702 only). Each is pre-encoded to bytes.
    let (access_list, blob_hashes, auth_list) = match tx {
        EthereumTxEnvelope::Legacy(_) => (None, None, None),
        EthereumTxEnvelope::Eip2930(s) => {
            (Some(encode_access_list(&s.tx().access_list)), None, None)
        }
        EthereumTxEnvelope::Eip1559(s) => {
            (Some(encode_access_list(&s.tx().access_list)), None, None)
        }
        EthereumTxEnvelope::Eip4844(s) => (
            Some(encode_access_list(&s.tx().access_list)),
            Some(encode_b256_vec(&s.tx().blob_versioned_hashes)),
            None,
        ),
        EthereumTxEnvelope::Eip7702(s) => (
            Some(encode_access_list(&s.tx().access_list)),
            None,
            Some(encode_authorization_list(&s.tx().authorization_list)),
        ),
    };

    // Single INSERT with 21 positional parameters; the bind order below
    // must match the column list exactly.
    sqlx::query(
        "INSERT INTO transactions (
            block_number, tx_index, tx_hash, tx_type,
            sig_y_parity, sig_r, sig_s,
            chain_id, nonce, gas_limit, to_address, value, input,
            gas_price, max_fee_per_gas, max_priority_fee_per_gas,
            max_fee_per_blob_gas, blob_versioned_hashes,
            access_list, authorization_list, from_address
        ) VALUES (
            $1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
            $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21
        )",
    )
    .bind(bn)
    .bind(tx_index)
    .bind(tx_hash)
    .bind(tx_type)
    .bind(sig_y)
    .bind(sig_r)
    .bind(sig_s)
    .bind(chain_id)
    .bind(nonce)
    .bind(gas_limit)
    .bind(to_addr)
    .bind(value)
    .bind(input)
    .bind(gas_price.as_ref().map(|v| v.as_slice()))
    .bind(max_fee.as_ref().map(|v| v.as_slice()))
    .bind(max_priority_fee.as_ref().map(|v| v.as_slice()))
    .bind(max_blob_fee.as_ref().map(|v| v.as_slice()))
    .bind(blob_hashes.as_deref())
    .bind(access_list.as_deref())
    .bind(auth_list.as_deref())
    .bind(sender)
    .execute(&mut *conn)
    .await?;

    Ok(())
}
878
879/// Insert a signet event, binding directly from the source type.
880async fn insert_signet_event(
881    conn: &mut sqlx::AnyConnection,
882    block_number: i64,
883    event_index: i64,
884    event: &DbSignetEvent,
885) -> Result<(), SqlColdError> {
886    let (event_type, order, chain_id) = match event {
887        DbSignetEvent::Transact(o, t) => (0i32, to_i64(*o), &t.rollupChainId),
888        DbSignetEvent::Enter(o, e) => (1i32, to_i64(*o), &e.rollupChainId),
889        DbSignetEvent::EnterToken(o, e) => (2i32, to_i64(*o), &e.rollupChainId),
890    };
891
892    let (value, gas, max_fee, amount) = match event {
893        DbSignetEvent::Transact(_, t) => {
894            (Some(&t.value), Some(&t.gas), Some(&t.maxFeePerGas), None)
895        }
896        DbSignetEvent::Enter(_, e) => (None, None, None, Some(&e.amount)),
897        DbSignetEvent::EnterToken(_, e) => (None, None, None, Some(&e.amount)),
898    };
899
900    sqlx::query(
901        "INSERT INTO signet_events (
902            block_number, event_index, event_type, order_index,
903            rollup_chain_id, sender, to_address, value, gas,
904            max_fee_per_gas, data, rollup_recipient, amount, token
905        ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)",
906    )
907    .bind(block_number)
908    .bind(event_index)
909    .bind(event_type)
910    .bind(order)
911    .bind(chain_id)
912    .bind(match event {
913        DbSignetEvent::Transact(_, t) => Some(&t.sender),
914        _ => None,
915    })
916    .bind(match event {
917        DbSignetEvent::Transact(_, t) => Some(&t.to),
918        _ => None,
919    })
920    .bind(value)
921    .bind(gas)
922    .bind(max_fee)
923    .bind(match event {
924        DbSignetEvent::Transact(_, t) => Some(&t.data),
925        _ => None,
926    })
927    .bind(match event {
928        DbSignetEvent::Enter(_, e) => Some(&e.rollupRecipient),
929        DbSignetEvent::EnterToken(_, e) => Some(&e.rollupRecipient),
930        _ => None,
931    })
932    .bind(amount)
933    .bind(match event {
934        DbSignetEvent::EnterToken(_, e) => Some(&e.token),
935        _ => None,
936    })
937    .execute(&mut *conn)
938    .await?;
939
940    Ok(())
941}
942
943// ============================================================================
944// Log filter helpers
945// ============================================================================
946
/// Append a SQL filter clause for a set of byte-encoded values.
///
/// For a single value, generates ` AND {column} = ${idx}`.
/// For multiple values, generates ` AND {column} IN (${idx}, ...)`.
/// An empty `values` iterator appends nothing and leaves the parameter
/// index unchanged — previously it emitted ` AND {column} IN ()`, which
/// is invalid SQL on both PostgreSQL and SQLite. Callers currently guard
/// against empty sets, but this makes the helper safe on its own.
///
/// Returns the next available parameter index.
fn append_filter_clause<'a>(
    clause: &mut String,
    params: &mut Vec<&'a [u8]>,
    mut idx: u32,
    column: &str,
    values: impl ExactSizeIterator<Item = &'a [u8]>,
) -> u32 {
    use std::fmt::Write;

    match values.len() {
        // Nothing to filter on: emit no clause at all.
        0 => idx,
        // Single value: a plain equality predicate reads better and lets
        // the planner use a point lookup.
        1 => {
            write!(clause, " AND {column} = ${idx}").unwrap();
            params.extend(values);
            idx + 1
        }
        // Multiple values: an IN list with one placeholder per value.
        _ => {
            write!(clause, " AND {column} IN (").unwrap();
            for (i, v) in values.enumerate() {
                if i > 0 {
                    clause.push_str(", ");
                }
                write!(clause, "${idx}").unwrap();
                params.push(v);
                idx += 1;
            }
            clause.push(')');
            idx
        }
    }
}
979
980fn build_log_filter_clause(filter: &Filter, start_idx: u32) -> (String, Vec<&[u8]>) {
981    let mut clause = String::new();
982    let mut params: Vec<&[u8]> = Vec::new();
983    let mut idx = start_idx;
984
985    if !filter.address.is_empty() {
986        idx = append_filter_clause(
987            &mut clause,
988            &mut params,
989            idx,
990            "l.address",
991            filter.address.iter().map(|a| a.as_slice()),
992        );
993    }
994
995    let topic_cols = ["l.topic0", "l.topic1", "l.topic2", "l.topic3"];
996    for (i, topic_filter) in filter.topics.iter().enumerate() {
997        if topic_filter.is_empty() {
998            continue;
999        }
1000        idx = append_filter_clause(
1001            &mut clause,
1002            &mut params,
1003            idx,
1004            topic_cols[i],
1005            topic_filter.iter().map(|v| v.as_slice()),
1006        );
1007    }
1008
1009    (clause, params)
1010}
1011
1012// ============================================================================
1013// ColdStorage implementation
1014// ============================================================================
1015
impl ColdStorageRead for SqlColdBackend {
    /// Resolve the specifier to a block number, then load that header.
    async fn get_header(&self, spec: HeaderSpecifier) -> ColdResult<Option<SealedHeader>> {
        let Some(block_num) = self.resolve_header_spec(spec).await? else {
            return Ok(None);
        };
        self.fetch_header_by_number(block_num).await.map_err(ColdStorageError::from)
    }

    /// Fetch one header per specifier, sequentially, preserving input order.
    ///
    /// A miss yields `None` in the corresponding slot; the first database
    /// error aborts the whole call.
    async fn get_headers(
        &self,
        specs: Vec<HeaderSpecifier>,
    ) -> ColdResult<Vec<Option<SealedHeader>>> {
        let mut results = Vec::with_capacity(specs.len());
        for spec in specs {
            let header = self.get_header(spec).await?;
            results.push(header);
        }
        Ok(results)
    }

    /// Look up a single transaction together with its confirmation metadata.
    ///
    /// All three specifier forms join `headers` so the block hash needed
    /// for [`ConfirmationMeta`] comes back in the same round trip.
    async fn get_transaction(
        &self,
        spec: TransactionSpecifier,
    ) -> ColdResult<Option<Confirmed<RecoveredTx>>> {
        let row = match spec {
            TransactionSpecifier::Hash(hash) => sqlx::query(
                "SELECT t.*, h.block_hash
                     FROM transactions t
                     JOIN headers h ON t.block_number = h.block_number
                     WHERE t.tx_hash = $1",
            )
            .bind(hash)
            .fetch_optional(&self.pool)
            .await
            .map_err(SqlColdError::from)?,
            TransactionSpecifier::BlockAndIndex { block, index } => sqlx::query(
                "SELECT t.*, h.block_hash
                     FROM transactions t
                     JOIN headers h ON t.block_number = h.block_number
                     WHERE t.block_number = $1 AND t.tx_index = $2",
            )
            .bind(to_i64(block))
            .bind(to_i64(index))
            .fetch_optional(&self.pool)
            .await
            .map_err(SqlColdError::from)?,
            TransactionSpecifier::BlockHashAndIndex { block_hash, index } => sqlx::query(
                "SELECT t.*, h.block_hash
                     FROM transactions t
                     JOIN headers h ON t.block_number = h.block_number
                     WHERE h.block_hash = $1 AND t.tx_index = $2",
            )
            .bind(block_hash)
            .bind(to_i64(index))
            .fetch_optional(&self.pool)
            .await
            .map_err(SqlColdError::from)?,
        };

        let Some(r) = row else {
            return Ok(None);
        };

        let block = from_i64(r.get::<i64, _>(COL_BLOCK_NUMBER));
        let index = from_i64(r.get::<i64, _>(COL_TX_INDEX));
        let block_hash = r.get(COL_BLOCK_HASH);
        let recovered = recovered_tx_from_row(&r).map_err(ColdStorageError::from)?;
        let meta = ConfirmationMeta::new(block, block_hash, index);
        Ok(Some(Confirmed::new(recovered, meta)))
    }

    /// Return all transactions in `block`, ordered by their index.
    async fn get_transactions_in_block(&self, block: BlockNumber) -> ColdResult<Vec<RecoveredTx>> {
        let bn = to_i64(block);
        let rows =
            sqlx::query("SELECT * FROM transactions WHERE block_number = $1 ORDER BY tx_index")
                .bind(bn)
                .fetch_all(&self.pool)
                .await
                .map_err(SqlColdError::from)?;

        rows.iter().map(|r| recovered_tx_from_row(r).map_err(ColdStorageError::from)).collect()
    }

    /// Count the transactions stored for `block` (0 if the block is absent).
    async fn get_transaction_count(&self, block: BlockNumber) -> ColdResult<u64> {
        let bn = to_i64(block);
        let row = sqlx::query("SELECT COUNT(*) as cnt FROM transactions WHERE block_number = $1")
            .bind(bn)
            .fetch_one(&self.pool)
            .await
            .map_err(SqlColdError::from)?;

        Ok(from_i64(row.get::<i64, _>(COL_CNT)))
    }

    /// Reconstruct one receipt, including its logs and per-tx gas usage.
    ///
    /// Per-transaction `gas_used` is not stored; it is recomputed as this
    /// receipt's cumulative gas minus the preceding transaction's
    /// cumulative gas (the `prior_gas` subquery below).
    async fn get_receipt(&self, spec: ReceiptSpecifier) -> ColdResult<Option<ColdReceipt>> {
        // Resolve to (block, index)
        let (block, index) = match spec {
            ReceiptSpecifier::TxHash(hash) => {
                let row = sqlx::query(
                    "SELECT block_number, tx_index FROM transactions WHERE tx_hash = $1",
                )
                .bind(hash)
                .fetch_optional(&self.pool)
                .await
                .map_err(SqlColdError::from)?;
                let Some(r) = row else { return Ok(None) };
                (
                    from_i64(r.get::<i64, _>(COL_BLOCK_NUMBER)),
                    from_i64(r.get::<i64, _>(COL_TX_INDEX)),
                )
            }
            ReceiptSpecifier::BlockAndIndex { block, index } => (block, index),
        };

        // Combined query: receipt + tx metadata + full header + prior gas.
        // Header columns use standard names (h.*) so header_from_row works.
        // Receipt/tx columns use r_ prefix to avoid name collisions.
        let combined = sqlx::query(
            "SELECT h.*, \
               r.tx_type AS r_tx_type, r.success AS r_success, \
               r.cumulative_gas_used AS r_cumulative_gas_used, \
               r.first_log_index AS r_first_log_index, \
               t.tx_hash AS r_tx_hash, t.from_address AS r_from_address, \
               COALESCE( \
                 (SELECT CAST(MAX(r2.cumulative_gas_used) AS bigint) \
                  FROM receipts r2 \
                  WHERE r2.block_number = r.block_number \
                    AND r2.tx_index < r.tx_index), \
                 0 \
               ) AS prior_gas \
             FROM receipts r \
             JOIN transactions t ON r.block_number = t.block_number \
               AND r.tx_index = t.tx_index \
             JOIN headers h ON r.block_number = h.block_number \
             WHERE r.block_number = $1 AND r.tx_index = $2",
        )
        .bind(to_i64(block))
        .bind(to_i64(index))
        .fetch_optional(&self.pool)
        .await
        .map_err(SqlColdError::from)?;

        let Some(rr) = combined else {
            return Ok(None);
        };

        // Extract header using existing helper (h.* columns are unaliased).
        let header = header_from_row(&rr).map_err(ColdStorageError::from)?.seal_slow();

        // Extract receipt fields from r_ prefixed aliases.
        let tx_hash = rr.get(COL_R_TX_HASH);
        let sender = rr.get(COL_R_FROM_ADDRESS);
        let tx_type: i32 = rr.get(COL_R_TX_TYPE);
        let success = rr.get::<i32, _>(COL_R_SUCCESS) != 0;
        let cumulative_gas_used: i64 = rr.get(COL_R_CUMULATIVE_GAS_USED);
        let first_log_index: u64 = from_i64(rr.get::<i64, _>(COL_R_FIRST_LOG_INDEX));
        let prior_cumulative_gas: u64 = from_i64(rr.get::<i64, _>(COL_PRIOR_GAS));

        // Logs still require a separate query (variable row count).
        let log_rows = sqlx::query(
            "SELECT * FROM logs WHERE block_number = $1 AND tx_index = $2 ORDER BY log_index",
        )
        .bind(to_i64(block))
        .bind(to_i64(index))
        .fetch_all(&self.pool)
        .await
        .map_err(SqlColdError::from)?;

        let logs = log_rows.iter().map(log_from_row).collect();
        let built = build_receipt(tx_type, success, cumulative_gas_used, logs)
            .map_err(ColdStorageError::from)?;
        // Cumulative gas must be non-decreasing within a block; a violation
        // indicates database corruption.
        debug_assert!(
            built.inner.cumulative_gas_used >= prior_cumulative_gas,
            "cumulative gas decreased: {} < {}",
            built.inner.cumulative_gas_used,
            prior_cumulative_gas,
        );
        let gas_used = built.inner.cumulative_gas_used - prior_cumulative_gas;

        let ir = IndexedReceipt { receipt: built, tx_hash, first_log_index, gas_used, sender };
        Ok(Some(ColdReceipt::new(ir, &header, index)))
    }

    /// Reconstruct every receipt in `block`, in transaction order.
    ///
    /// Uses three queries total (header, receipts+tx metadata, logs) and
    /// regroups logs in memory; `first_log_index` and per-tx `gas_used` are
    /// recomputed by running accumulators over the ordered rows.
    async fn get_receipts_in_block(&self, block: BlockNumber) -> ColdResult<Vec<ColdReceipt>> {
        let Some(header) =
            self.fetch_header_by_number(block).await.map_err(ColdStorageError::from)?
        else {
            return Ok(Vec::new());
        };

        let bn = to_i64(block);

        // Fetch receipts joined with tx_hash and from_address
        let receipt_rows = sqlx::query(
            "SELECT r.block_number, r.tx_index, r.tx_type, r.success, \
               r.cumulative_gas_used, r.first_log_index, \
               t.tx_hash, t.from_address \
             FROM receipts r \
             JOIN transactions t ON r.block_number = t.block_number \
               AND r.tx_index = t.tx_index \
             WHERE r.block_number = $1 \
             ORDER BY r.tx_index",
        )
        .bind(bn)
        .fetch_all(&self.pool)
        .await
        .map_err(SqlColdError::from)?;

        let all_log_rows =
            sqlx::query("SELECT * FROM logs WHERE block_number = $1 ORDER BY tx_index, log_index")
                .bind(bn)
                .fetch_all(&self.pool)
                .await
                .map_err(SqlColdError::from)?;

        // Group logs by tx_index
        let mut logs_by_tx: BTreeMap<i64, Vec<Log>> = BTreeMap::new();
        for r in &all_log_rows {
            let tx_idx: i64 = r.get(COL_TX_INDEX);
            logs_by_tx.entry(tx_idx).or_default().push(log_from_row(r));
        }

        let mut first_log_index = 0u64;
        let mut prior_cumulative_gas = 0u64;
        receipt_rows
            .into_iter()
            .enumerate()
            .map(|(idx, rr)| {
                let tx_idx: i64 = rr.get(COL_TX_INDEX);
                let tx_hash = rr.get(COL_TX_HASH);
                let sender = rr.get(COL_FROM_ADDRESS);
                let tx_type = rr.get::<i32, _>(COL_TX_TYPE);
                let success = rr.get::<i32, _>(COL_SUCCESS) != 0;
                let cumulative_gas_used: i64 = rr.get(COL_CUMULATIVE_GAS_USED);
                // `remove` hands ownership of this tx's logs to the receipt
                // without cloning.
                let logs = logs_by_tx.remove(&tx_idx).unwrap_or_default();
                let receipt = build_receipt(tx_type, success, cumulative_gas_used, logs)
                    .map_err(ColdStorageError::from)?;
                let gas_used = receipt.inner.cumulative_gas_used - prior_cumulative_gas;
                prior_cumulative_gas = receipt.inner.cumulative_gas_used;
                let ir = IndexedReceipt { receipt, tx_hash, first_log_index, gas_used, sender };
                first_log_index += ir.receipt.inner.logs.len() as u64;
                Ok(ColdReceipt::new(ir, &header, idx as u64))
            })
            .collect()
    }

    /// Fetch signet events for a single block or an inclusive block range,
    /// ordered by block then event index.
    async fn get_signet_events(
        &self,
        spec: SignetEventsSpecifier,
    ) -> ColdResult<Vec<DbSignetEvent>> {
        let rows = match spec {
            SignetEventsSpecifier::Block(block) => {
                let bn = to_i64(block);
                sqlx::query(
                    "SELECT * FROM signet_events WHERE block_number = $1 ORDER BY event_index",
                )
                .bind(bn)
                .fetch_all(&self.pool)
                .await
                .map_err(SqlColdError::from)?
            }
            SignetEventsSpecifier::BlockRange { start, end } => {
                let s = to_i64(start);
                let e = to_i64(end);
                sqlx::query(
                    "SELECT * FROM signet_events WHERE block_number >= $1 AND block_number <= $2
                     ORDER BY block_number, event_index",
                )
                .bind(s)
                .bind(e)
                .fetch_all(&self.pool)
                .await
                .map_err(SqlColdError::from)?
            }
        };

        rows.iter().map(|r| signet_event_from_row(r).map_err(ColdStorageError::from)).collect()
    }

    /// Fetch a single zenith header.
    ///
    /// NOTE: a `Range` specifier is collapsed to its `start` block here;
    /// use [`Self::get_zenith_headers`] to retrieve the whole range.
    async fn get_zenith_header(
        &self,
        spec: ZenithHeaderSpecifier,
    ) -> ColdResult<Option<DbZenithHeader>> {
        let block = match spec {
            ZenithHeaderSpecifier::Number(n) => n,
            ZenithHeaderSpecifier::Range { start, .. } => start,
        };
        let bn = to_i64(block);
        let row = sqlx::query("SELECT * FROM zenith_headers WHERE block_number = $1")
            .bind(bn)
            .fetch_optional(&self.pool)
            .await
            .map_err(SqlColdError::from)?;

        row.map(|r| zenith_header_from_row(&r)).transpose().map_err(ColdStorageError::from)
    }

    /// Fetch zenith headers for a single block or an inclusive range,
    /// ordered by block number.
    async fn get_zenith_headers(
        &self,
        spec: ZenithHeaderSpecifier,
    ) -> ColdResult<Vec<DbZenithHeader>> {
        let rows = match spec {
            ZenithHeaderSpecifier::Number(n) => {
                let bn = to_i64(n);
                sqlx::query("SELECT * FROM zenith_headers WHERE block_number = $1")
                    .bind(bn)
                    .fetch_all(&self.pool)
                    .await
                    .map_err(SqlColdError::from)?
            }
            ZenithHeaderSpecifier::Range { start, end } => {
                let s = to_i64(start);
                let e = to_i64(end);
                sqlx::query(
                    "SELECT * FROM zenith_headers WHERE block_number >= $1 AND block_number <= $2
                     ORDER BY block_number",
                )
                .bind(s)
                .bind(e)
                .fetch_all(&self.pool)
                .await
                .map_err(SqlColdError::from)?
            }
        };

        rows.iter().map(|r| zenith_header_from_row(r).map_err(ColdStorageError::from)).collect()
    }

    /// Run a log filter query, joining headers/transactions/receipts so
    /// each RPC log carries its block, tx, and block-wide log index.
    ///
    /// Fails with [`ColdStorageError::TooManyLogs`] if more than `max_logs`
    /// rows match.
    async fn get_logs(&self, filter: &Filter, max_logs: usize) -> ColdResult<Vec<RpcLog>> {
        let from = filter.get_from_block().unwrap_or(0);
        let to = filter.get_to_block().unwrap_or(u64::MAX);

        // Build WHERE clause: block range ($1, $2) + address/topic filters.
        let (filter_clause, params) = build_log_filter_clause(filter, 3);
        let where_clause = format!("l.block_number >= $1 AND l.block_number <= $2{filter_clause}");

        // Use LIMIT to cap results. Fetch one extra row to detect overflow
        // without a separate COUNT query. PostgreSQL stops scanning after
        // finding enough rows, making this faster than COUNT in both the
        // under-limit and over-limit cases.
        let limit_idx = 3 + params.len() as u32;
        let data_sql = format!(
            "SELECT l.*, h.block_hash, h.timestamp AS block_timestamp, t.tx_hash, \
               (r.first_log_index + l.log_index) AS block_log_index \
             FROM logs l \
             JOIN headers h ON l.block_number = h.block_number \
             JOIN transactions t ON l.block_number = t.block_number \
               AND l.tx_index = t.tx_index \
             JOIN receipts r ON l.block_number = r.block_number \
               AND l.tx_index = r.tx_index \
             WHERE {where_clause} \
             ORDER BY l.block_number, l.tx_index, l.log_index \
             LIMIT ${limit_idx}"
        );
        let mut query = sqlx::query(&data_sql).bind(to_i64(from)).bind(to_i64(to));
        for param in &params {
            query = query.bind(*param);
        }
        // Clamp to i64::MAX before converting: SQL LIMIT is an i64, and
        // saturating_add avoids overflow when max_logs is usize::MAX.
        let limit = max_logs.saturating_add(1).min(i64::MAX as usize);
        query = query.bind(to_i64(limit as u64));

        let rows = query.fetch_all(&self.pool).await.map_err(SqlColdError::from)?;

        // More than max_logs rows means the extra sentinel row came back:
        // the filter matched too much.
        if rows.len() > max_logs {
            return Err(ColdStorageError::TooManyLogs { limit: max_logs });
        }

        rows.into_iter()
            .map(|r| {
                let log = log_from_row(&r);
                Ok(RpcLog {
                    inner: log,
                    block_hash: Some(r.get(COL_BLOCK_HASH)),
                    block_number: Some(from_i64(r.get::<i64, _>(COL_BLOCK_NUMBER))),
                    block_timestamp: Some(from_i64(r.get::<i64, _>(COL_BLOCK_TIMESTAMP))),
                    transaction_hash: Some(r.get(COL_TX_HASH)),
                    transaction_index: Some(from_i64(r.get::<i64, _>(COL_TX_INDEX))),
                    log_index: Some(from_i64(r.get::<i64, _>(COL_BLOCK_LOG_INDEX))),
                    removed: false,
                })
            })
            .collect::<ColdResult<Vec<_>>>()
    }

    /// Stream logs: use the PostgreSQL-specific path when available,
    /// otherwise fall back to the generic implementation.
    async fn produce_log_stream(&self, filter: &Filter, params: signet_cold::StreamParams) {
        #[cfg(feature = "postgres")]
        if self.is_postgres {
            return self.produce_log_stream_pg(filter, params).await;
        }
        signet_cold::produce_log_stream_default(self, filter, params).await;
    }

    /// Highest block number in `headers`, or `None` for an empty store.
    async fn get_latest_block(&self) -> ColdResult<Option<BlockNumber>> {
        let row = sqlx::query("SELECT MAX(block_number) as max_bn FROM headers")
            .fetch_one(&self.pool)
            .await
            .map_err(SqlColdError::from)?;
        Ok(row.get::<Option<i64>, _>(COL_MAX_BN).map(from_i64))
    }
}
1420
1421impl ColdStorageWrite for SqlColdBackend {
1422    async fn append_block(&mut self, data: BlockData) -> ColdResult<()> {
1423        self.insert_block(data).await.map_err(ColdStorageError::from)
1424    }
1425
1426    async fn append_blocks(&mut self, data: Vec<BlockData>) -> ColdResult<()> {
1427        let mut tx = self.pool.begin().await.map_err(SqlColdError::from)?;
1428        for block_data in data {
1429            write_block_to_tx(&mut tx, block_data).await.map_err(ColdStorageError::from)?;
1430        }
1431        tx.commit().await.map_err(SqlColdError::from)?;
1432        Ok(())
1433    }
1434
1435    async fn truncate_above(&mut self, block: BlockNumber) -> ColdResult<()> {
1436        let bn = to_i64(block);
1437        let mut tx = self.pool.begin().await.map_err(SqlColdError::from)?;
1438
1439        delete_above_in_tx(&mut tx, bn).await.map_err(ColdStorageError::from)?;
1440
1441        tx.commit().await.map_err(SqlColdError::from)?;
1442        Ok(())
1443    }
1444}
1445
1446impl ColdStorage for SqlColdBackend {
1447    async fn drain_above(&mut self, block: BlockNumber) -> ColdResult<Vec<Vec<ColdReceipt>>> {
1448        let bn = to_i64(block);
1449        let mut tx = self.pool.begin().await.map_err(SqlColdError::from)?;
1450
1451        // 1. Fetch all headers above block.
1452        let header_rows =
1453            sqlx::query("SELECT * FROM headers WHERE block_number > $1 ORDER BY block_number")
1454                .bind(bn)
1455                .fetch_all(&mut *tx)
1456                .await
1457                .map_err(SqlColdError::from)?;
1458
1459        if header_rows.is_empty() {
1460            tx.commit().await.map_err(SqlColdError::from)?;
1461            return Ok(Vec::new());
1462        }
1463
1464        let mut headers: BTreeMap<i64, SealedHeader> = BTreeMap::new();
1465        for r in &header_rows {
1466            let num: i64 = r.get(COL_BLOCK_NUMBER);
1467            let h = header_from_row(r).map_err(ColdStorageError::from)?.seal_slow();
1468            headers.insert(num, h);
1469        }
1470
1471        // 2. Fetch all receipt + tx metadata above block.
1472        let receipt_rows = sqlx::query(
1473            "SELECT r.block_number, r.tx_index, r.tx_type, r.success, \
1474               r.cumulative_gas_used, r.first_log_index, \
1475               t.tx_hash, t.from_address \
1476             FROM receipts r \
1477             JOIN transactions t ON r.block_number = t.block_number \
1478               AND r.tx_index = t.tx_index \
1479             WHERE r.block_number > $1 \
1480             ORDER BY r.block_number, r.tx_index",
1481        )
1482        .bind(bn)
1483        .fetch_all(&mut *tx)
1484        .await
1485        .map_err(SqlColdError::from)?;
1486
1487        // 3. Fetch all logs above block.
1488        let log_rows = sqlx::query(
1489            "SELECT * FROM logs WHERE block_number > $1 \
1490             ORDER BY block_number, tx_index, log_index",
1491        )
1492        .bind(bn)
1493        .fetch_all(&mut *tx)
1494        .await
1495        .map_err(SqlColdError::from)?;
1496
1497        // Group logs by (block_number, tx_index).
1498        let mut logs_by_block_tx: BTreeMap<(i64, i64), Vec<Log>> = BTreeMap::new();
1499        for r in &log_rows {
1500            let block_num: i64 = r.get(COL_BLOCK_NUMBER);
1501            let tx_idx: i64 = r.get(COL_TX_INDEX);
1502            logs_by_block_tx.entry((block_num, tx_idx)).or_default().push(log_from_row(r));
1503        }
1504
1505        // Group receipt rows by block_number.
1506        let mut receipts_by_block: BTreeMap<i64, Vec<&sqlx::any::AnyRow>> = BTreeMap::new();
1507        for r in &receipt_rows {
1508            let block_num: i64 = r.get(COL_BLOCK_NUMBER);
1509            receipts_by_block.entry(block_num).or_default().push(r);
1510        }
1511
1512        // 4. Assemble ColdReceipts per block.
1513        let mut all_receipts = Vec::with_capacity(headers.len());
1514        for (&block_num, header) in &headers {
1515            let block_receipt_rows = receipts_by_block.remove(&block_num).unwrap_or_default();
1516            let mut prior_cumulative_gas = 0u64;
1517            let block_receipts: ColdResult<Vec<ColdReceipt>> = block_receipt_rows
1518                .into_iter()
1519                .map(|rr: &sqlx::any::AnyRow| {
1520                    let tx_idx: i64 = rr.get(COL_TX_INDEX);
1521                    let tx_hash = rr.get(COL_TX_HASH);
1522                    let sender = rr.get(COL_FROM_ADDRESS);
1523                    let tx_type: i32 = rr.get(COL_TX_TYPE);
1524                    let success = rr.get::<i32, _>(COL_SUCCESS) != 0;
1525                    let cumulative_gas_used: i64 = rr.get(COL_CUMULATIVE_GAS_USED);
1526                    let first_log_index = from_i64(rr.get::<i64, _>(COL_FIRST_LOG_INDEX));
1527                    let logs = logs_by_block_tx.remove(&(block_num, tx_idx)).unwrap_or_default();
1528                    let receipt = build_receipt(tx_type, success, cumulative_gas_used, logs)
1529                        .map_err(ColdStorageError::from)?;
1530                    // Cumulative gas must be non-decreasing within a block; a violation
1531                    // indicates database corruption.
1532                    debug_assert!(
1533                        receipt.inner.cumulative_gas_used >= prior_cumulative_gas,
1534                        "cumulative gas decreased: {} < {}",
1535                        receipt.inner.cumulative_gas_used,
1536                        prior_cumulative_gas,
1537                    );
1538                    let gas_used = receipt.inner.cumulative_gas_used - prior_cumulative_gas;
1539                    prior_cumulative_gas = receipt.inner.cumulative_gas_used;
1540                    let ir = IndexedReceipt { receipt, tx_hash, first_log_index, gas_used, sender };
1541                    Ok(ColdReceipt::new(ir, header, from_i64(tx_idx)))
1542                })
1543                .collect();
1544            all_receipts.push(block_receipts?);
1545        }
1546
1547        // 5. Delete from all tables (same order as truncate_above).
1548        delete_above_in_tx(&mut tx, bn).await.map_err(ColdStorageError::from)?;
1549
1550        tx.commit().await.map_err(SqlColdError::from)?;
1551        Ok(all_receipts)
1552    }
1553}
1554
#[cfg(all(test, feature = "test-utils"))]
mod tests {
    use super::*;
    use signet_cold::conformance::conformance;

    /// Runs the shared cold-storage conformance suite against an in-memory
    /// SQLite backend.
    #[tokio::test]
    async fn sqlite_conformance() {
        let backend = SqlColdBackend::connect("sqlite::memory:").await.unwrap();
        conformance(backend).await.unwrap();
    }

    /// Runs the shared cold-storage conformance suite against PostgreSQL.
    /// Skipped (with a note on stderr) when `DATABASE_URL` is unset.
    #[tokio::test]
    async fn pg_conformance() {
        let url = match std::env::var("DATABASE_URL") {
            Ok(url) => url,
            Err(_) => {
                eprintln!("skipping pg conformance: DATABASE_URL not set");
                return;
            }
        };
        let backend = SqlColdBackend::connect(&url).await.unwrap();
        conformance(backend).await.unwrap();
    }
}