signet_cold_sql/backend.rs

1//! Unified SQL backend for cold storage.
2//!
3//! Supports both PostgreSQL and SQLite via [`sqlx::Any`]. The backend
4//! auto-detects the database type at construction time and runs the
5//! appropriate migration.
6
7use crate::SqlColdError;
8use crate::columns::{
9    COL_ACCESS_LIST, COL_ADDRESS, COL_AMOUNT, COL_AUTHORIZATION_LIST, COL_BASE_FEE_PER_GAS,
10    COL_BENEFICIARY, COL_BLOB_GAS_USED, COL_BLOB_VERSIONED_HASHES, COL_BLOCK_DATA_HASH,
11    COL_BLOCK_HASH, COL_BLOCK_LOG_INDEX, COL_BLOCK_NUMBER, COL_BLOCK_TIMESTAMP, COL_CHAIN_ID,
12    COL_CNT, COL_CUMULATIVE_GAS_USED, COL_DATA, COL_DIFFICULTY, COL_EVENT_TYPE,
13    COL_EXCESS_BLOB_GAS, COL_EXTRA_DATA, COL_FIRST_LOG_INDEX, COL_FROM_ADDRESS, COL_GAS,
14    COL_GAS_LIMIT, COL_GAS_PRICE, COL_GAS_USED, COL_HOST_BLOCK_NUMBER, COL_INPUT, COL_LOGS_BLOOM,
15    COL_MAX_BN, COL_MAX_FEE_PER_BLOB_GAS, COL_MAX_FEE_PER_GAS, COL_MAX_PRIORITY_FEE_PER_GAS,
16    COL_MIX_HASH, COL_NONCE, COL_OMMERS_HASH, COL_ORDER_INDEX, COL_PARENT_BEACON_BLOCK_ROOT,
17    COL_PARENT_HASH, COL_PRIOR_GAS, COL_RECEIPTS_ROOT, COL_REQUESTS_HASH, COL_REWARD_ADDRESS,
18    COL_ROLLUP_CHAIN_ID, COL_ROLLUP_RECIPIENT, COL_SENDER, COL_SIG_R, COL_SIG_S, COL_SIG_Y_PARITY,
19    COL_STATE_ROOT, COL_SUCCESS, COL_TIMESTAMP, COL_TO_ADDRESS, COL_TOKEN, COL_TOPIC0, COL_TOPIC1,
20    COL_TOPIC2, COL_TOPIC3, COL_TRANSACTIONS_ROOT, COL_TX_HASH, COL_TX_INDEX, COL_TX_TYPE,
21    COL_VALUE, COL_WITHDRAWALS_ROOT,
22};
23use crate::convert::{
24    EVENT_ENTER, EVENT_ENTER_TOKEN, EVENT_TRANSACT, build_receipt, decode_access_list_or_empty,
25    decode_authorization_list, decode_b256_vec, decode_u128_required, encode_access_list,
26    encode_authorization_list, encode_b256_vec, encode_u128, from_i64, to_i64,
27};
28use alloy::{
29    consensus::{
30        Header, Signed, TxEip1559, TxEip2930, TxEip4844, TxEip7702, TxLegacy, TxType,
31        transaction::Recovered,
32    },
33    primitives::{
34        Address, B256, BlockNumber, Bloom, Bytes, Log, LogData, Sealable, Signature, TxKind, U256,
35    },
36};
37use signet_cold::{
38    BlockData, ColdReceipt, ColdResult, ColdStorage, ColdStorageError, Confirmed, Filter,
39    HeaderSpecifier, ReceiptSpecifier, RpcLog, SignetEventsSpecifier, TransactionSpecifier,
40    ZenithHeaderSpecifier,
41};
42use signet_storage_types::{
43    ConfirmationMeta, DbSignetEvent, DbZenithHeader, IndexedReceipt, RecoveredTx, SealedHeader,
44    TransactionSigned,
45};
46use signet_zenith::{
47    Passage::{Enter, EnterToken},
48    Transactor::Transact,
49    Zenith,
50};
51use sqlx::{AnyPool, Row};
52
53/// SQL-based cold storage backend.
54///
55/// Uses [`sqlx::Any`] for database-agnostic access, supporting both
56/// PostgreSQL and SQLite through a single implementation. The concrete
57/// database is detected at construction time from the pool or URL.
58///
59/// # Example
60///
61/// ```no_run
62/// # async fn example() {
63/// use signet_cold_sql::SqlColdBackend;
64///
65/// // SQLite (in-memory)
66/// let backend = SqlColdBackend::connect("sqlite::memory:").await.unwrap();
67///
68/// // PostgreSQL
69/// let backend = SqlColdBackend::connect("postgres://localhost/signet").await.unwrap();
70/// # }
71/// ```
72#[derive(Debug, Clone)]
73pub struct SqlColdBackend {
74    pool: AnyPool,
75    is_postgres: bool,
76}
77
78impl SqlColdBackend {
79    /// Create a new SQL cold storage backend from an existing [`AnyPool`].
80    ///
81    /// Auto-detects the database backend and creates all tables if they
82    /// do not already exist. Callers must ensure
83    /// [`sqlx::any::install_default_drivers`] has been called before
84    /// constructing the pool.
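    ///
    /// # Example
    ///
    /// A minimal sketch of building the pool by hand; the pool options here
    /// simply mirror [`Self::connect`] and are not required.
    ///
    /// ```no_run
    /// # async fn example() {
    /// use signet_cold_sql::SqlColdBackend;
    /// use sqlx::AnyPool;
    ///
    /// // Drivers must be installed before the pool is constructed.
    /// sqlx::any::install_default_drivers();
    /// let pool: AnyPool = sqlx::pool::PoolOptions::new()
    ///     .max_connections(1)
    ///     .connect("sqlite::memory:")
    ///     .await
    ///     .unwrap();
    /// let backend = SqlColdBackend::new(pool).await.unwrap();
    /// # }
    /// ```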
85    pub async fn new(pool: AnyPool) -> Result<Self, SqlColdError> {
86        // Detect backend from a pooled connection.
87        let conn = pool.acquire().await?;
88        let backend = conn.backend_name().to_owned();
89        drop(conn);
90
91        let migration = match backend.as_str() {
92            "PostgreSQL" => include_str!("../migrations/001_initial_pg.sql"),
93            "SQLite" => include_str!("../migrations/001_initial.sql"),
94            other => {
95                return Err(SqlColdError::Convert(format!(
96                    "unsupported database backend: {other}"
97                )));
98            }
99        };
100        // Run the migration via the pool so that, for single-connection pools
101        // (e.g. in-memory SQLite), it lands on the connection later queries reuse.
102        let is_postgres = backend == "PostgreSQL";
103        sqlx::raw_sql(migration).execute(&pool).await?;
104        Ok(Self { pool, is_postgres })
105    }
106
107    /// Connect to a database URL and create the backend.
108    ///
109    /// Installs the default sqlx drivers on the first call. The database
110    /// type is inferred from the URL scheme (`sqlite:` or `postgres:`).
111    ///
112    /// The pool is limited to a single connection so that SQLite in-memory
113    /// databases (`sqlite::memory:`) always operate on the same database.
114    /// Construct a pool yourself and use [`Self::new`] for more connections.
115    pub async fn connect(url: &str) -> Result<Self, SqlColdError> {
116        sqlx::any::install_default_drivers();
117        let pool: AnyPool = sqlx::pool::PoolOptions::new().max_connections(1).connect(url).await?;
118        Self::new(pool).await
119    }
120
121    // ========================================================================
122    // Specifier resolution
123    // ========================================================================
124
125    async fn resolve_header_spec(
126        &self,
127        spec: HeaderSpecifier,
128    ) -> Result<Option<BlockNumber>, SqlColdError> {
129        match spec {
130            HeaderSpecifier::Number(n) => Ok(Some(n)),
131            HeaderSpecifier::Hash(hash) => {
132                let row = sqlx::query("SELECT block_number FROM headers WHERE block_hash = $1")
133                    .bind(hash)
134                    .fetch_optional(&self.pool)
135                    .await?;
136                Ok(row.map(|r| from_i64(r.get::<i64, _>(COL_BLOCK_NUMBER))))
137            }
138        }
139    }
140
141    // ========================================================================
142    // Read helpers
143    // ========================================================================
144
145    async fn fetch_header_by_number(
146        &self,
147        block_num: BlockNumber,
148    ) -> Result<Option<SealedHeader>, SqlColdError> {
149        let bn = to_i64(block_num);
150        let row = sqlx::query("SELECT * FROM headers WHERE block_number = $1")
151            .bind(bn)
152            .fetch_optional(&self.pool)
153            .await?;
154
155        row.map(|r| header_from_row(&r).map(|h| h.seal_slow())).transpose()
156    }
157
158    // ========================================================================
159    // Write helpers
160    // ========================================================================
161
162    async fn insert_block(&self, data: BlockData) -> Result<(), SqlColdError> {
163        let mut tx = self.pool.begin().await?;
164        write_block_to_tx(&mut tx, data).await?;
165        tx.commit().await?;
166        Ok(())
167    }
168
169    // ========================================================================
170    // Streaming helpers
171    // ========================================================================
172
173    /// Stream logs using a PostgreSQL REPEATABLE READ transaction.
174    ///
175    /// The transaction provides a consistent snapshot across all per-block
176    /// queries, eliminating the need for anchor-hash reorg detection.
177    /// Rows are streamed individually rather than materialised per block.
178    #[cfg(feature = "postgres")]
179    async fn produce_log_stream_pg(&self, filter: &Filter, params: signet_cold::StreamParams) {
180        use tokio_stream::StreamExt;
181
182        /// Unwrap a `Result` or send the error through the stream and return.
183        macro_rules! try_stream {
184            ($sender:expr, $expr:expr) => {
185                match $expr {
186                    Ok(v) => v,
187                    Err(e) => {
188                        let _ = $sender
189                            .send(Err(ColdStorageError::backend(SqlColdError::from(e))))
190                            .await;
191                        return;
192                    }
193                }
194            };
195        }
196
197        let signet_cold::StreamParams { from, to, max_logs, sender, deadline } = params;
198
199        // Open a REPEATABLE READ transaction so all per-block queries see a
200        // consistent snapshot. This makes reorg detection unnecessary — if a
201        // reorg lands mid-stream the transaction still reads the old data.
202        let mut tx = try_stream!(sender, self.pool.begin().await);
203        try_stream!(
204            sender,
205            sqlx::query("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ").execute(&mut *tx).await
206        );
207
208        // Build the parameterised query once. $1 is the block number
209        // (bound per iteration); remaining parameters are the address
210        // and topic filters from the user's request.
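        // For a single-address filter, for instance, filter_clause comes out
        // roughly as " AND l.address = $2".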
211        let (filter_clause, filter_params) = build_log_filter_clause(filter, 2);
212        let data_sql = format!(
213            "SELECT l.*, h.block_hash, h.timestamp AS block_timestamp, t.tx_hash, \
214               (r.first_log_index + l.log_index) AS block_log_index \
215             FROM logs l \
216             JOIN headers h ON l.block_number = h.block_number \
217             JOIN transactions t ON l.block_number = t.block_number \
218               AND l.tx_index = t.tx_index \
219             JOIN receipts r ON l.block_number = r.block_number \
220               AND l.tx_index = r.tx_index \
221             WHERE l.block_number = $1{filter_clause} \
222             ORDER BY l.tx_index, l.log_index"
223        );
224
225        let mut total = 0usize;
226
227        // Walk through blocks one at a time, streaming matching log rows
228        // from each block directly to the channel.
229        for block_num in from..=to {
230            // Check the deadline before starting each block so we
231            // don't begin a new query after the caller's timeout.
232            if tokio::time::Instant::now() > deadline {
233                let _ = sender.send(Err(ColdStorageError::StreamDeadlineExceeded)).await;
234                return;
235            }
236
237            let mut query = sqlx::query(&data_sql).bind(to_i64(block_num));
238            for param in &filter_params {
239                query = query.bind(*param);
240            }
241
242            // Stream rows from this block's query. Each row is converted
243            // to an RpcLog and sent over the channel immediately rather
244            // than being collected into a Vec first.
245            let mut stream = query.fetch(&mut *tx);
246            while let Some(row_result) = stream.next().await {
247                let r = try_stream!(sender, row_result);
248
249                // Enforce the global log limit across all blocks.
250                total += 1;
251                if total > max_logs {
252                    let _ =
253                        sender.send(Err(ColdStorageError::TooManyLogs { limit: max_logs })).await;
254                    return;
255                }
256
257                let log = log_from_row(&r);
258                let rpc_log = RpcLog {
259                    inner: log,
260                    block_hash: Some(r.get(COL_BLOCK_HASH)),
261                    block_number: Some(from_i64(r.get::<i64, _>(COL_BLOCK_NUMBER))),
262                    block_timestamp: Some(from_i64(r.get::<i64, _>(COL_BLOCK_TIMESTAMP))),
263                    transaction_hash: Some(r.get(COL_TX_HASH)),
264                    transaction_index: Some(from_i64(r.get::<i64, _>(COL_TX_INDEX))),
265                    log_index: Some(from_i64(r.get::<i64, _>(COL_BLOCK_LOG_INDEX))),
266                    removed: false,
267                };
268                // Send the log to the caller. The timeout ensures we
269                // stop if the deadline passes while back-pressured.
270                match tokio::time::timeout_at(deadline, sender.send(Ok(rpc_log))).await {
271                    Ok(Ok(())) => {}
272                    Ok(Err(_)) => return, // receiver dropped
273                    Err(_) => {
274                        let _ = sender.send(Err(ColdStorageError::StreamDeadlineExceeded)).await;
275                        return;
276                    }
277                }
278            }
279
280            // Early exit if we've already hit the limit — no need to
281            // query the next block.
282            if total >= max_logs {
283                return;
284            }
285        }
286    }
287}
288
289// ============================================================================
290// Row → domain type conversion (read path)
291// ============================================================================
292
293/// Extract a required BLOB column from a row as a borrowed slice.
294fn blob<'r>(r: &'r sqlx::any::AnyRow, col: &str) -> &'r [u8] {
295    r.get(col)
296}
297
298/// Extract an optional BLOB column from a row as a borrowed slice.
299fn opt_blob<'r>(r: &'r sqlx::any::AnyRow, col: &str) -> Option<&'r [u8]> {
300    r.get(col)
301}
302
303/// Build a [`Header`] from an [`sqlx::any::AnyRow`].
304fn header_from_row(r: &sqlx::any::AnyRow) -> Result<Header, SqlColdError> {
305    Ok(Header {
306        parent_hash: r.get(COL_PARENT_HASH),
307        ommers_hash: r.get(COL_OMMERS_HASH),
308        beneficiary: r.get(COL_BENEFICIARY),
309        state_root: r.get(COL_STATE_ROOT),
310        transactions_root: r.get(COL_TRANSACTIONS_ROOT),
311        receipts_root: r.get(COL_RECEIPTS_ROOT),
312        logs_bloom: Bloom::from_slice(blob(r, COL_LOGS_BLOOM)),
313        difficulty: r.get(COL_DIFFICULTY),
314        number: from_i64(r.get(COL_BLOCK_NUMBER)),
315        gas_limit: from_i64(r.get(COL_GAS_LIMIT)),
316        gas_used: from_i64(r.get(COL_GAS_USED)),
317        timestamp: from_i64(r.get(COL_TIMESTAMP)),
318        extra_data: r.get(COL_EXTRA_DATA),
319        mix_hash: r.get(COL_MIX_HASH),
320        nonce: r.get(COL_NONCE),
321        base_fee_per_gas: r.get::<Option<i64>, _>(COL_BASE_FEE_PER_GAS).map(from_i64),
322        withdrawals_root: r.get(COL_WITHDRAWALS_ROOT),
323        blob_gas_used: r.get::<Option<i64>, _>(COL_BLOB_GAS_USED).map(from_i64),
324        excess_blob_gas: r.get::<Option<i64>, _>(COL_EXCESS_BLOB_GAS).map(from_i64),
325        parent_beacon_block_root: r.get(COL_PARENT_BEACON_BLOCK_ROOT),
326        requests_hash: r.get(COL_REQUESTS_HASH),
327    })
328}
329
330/// Build a [`TransactionSigned`] from an [`sqlx::any::AnyRow`].
331fn tx_from_row(r: &sqlx::any::AnyRow) -> Result<TransactionSigned, SqlColdError> {
332    use alloy::consensus::EthereumTxEnvelope;
333
334    let sig =
335        Signature::new(r.get(COL_SIG_R), r.get(COL_SIG_S), r.get::<i32, _>(COL_SIG_Y_PARITY) != 0);
336
337    let tx_type_raw = r.get::<i32, _>(COL_TX_TYPE) as u8;
338    let tx_type = TxType::try_from(tx_type_raw)
339        .map_err(|_| SqlColdError::Convert(format!("invalid tx_type: {tx_type_raw}")))?;
340
341    let chain_id: Option<i64> = r.get(COL_CHAIN_ID);
342    let nonce = from_i64(r.get(COL_NONCE));
343    let gas_limit = from_i64(r.get(COL_GAS_LIMIT));
344    let to_addr: Option<Address> = r.get(COL_TO_ADDRESS);
345    let value: U256 = r.get(COL_VALUE);
346    let input: Bytes = r.get(COL_INPUT);
347
348    match tx_type {
349        TxType::Legacy => {
350            let tx = TxLegacy {
351                chain_id: chain_id.map(from_i64),
352                nonce,
353                gas_price: decode_u128_required(opt_blob(r, COL_GAS_PRICE), COL_GAS_PRICE)?,
354                gas_limit,
355                to: to_addr.map_or(TxKind::Create, TxKind::Call),
356                value,
357                input,
358            };
359            Ok(EthereumTxEnvelope::Legacy(Signed::new_unhashed(tx, sig)))
360        }
361        TxType::Eip2930 => {
362            let tx = TxEip2930 {
363                chain_id: from_i64(
364                    chain_id
365                        .ok_or_else(|| SqlColdError::Convert("EIP2930 requires chain_id".into()))?,
366                ),
367                nonce,
368                gas_price: decode_u128_required(opt_blob(r, COL_GAS_PRICE), COL_GAS_PRICE)?,
369                gas_limit,
370                to: to_addr.map_or(TxKind::Create, TxKind::Call),
371                value,
372                input,
373                access_list: decode_access_list_or_empty(opt_blob(r, COL_ACCESS_LIST))?,
374            };
375            Ok(EthereumTxEnvelope::Eip2930(Signed::new_unhashed(tx, sig)))
376        }
377        TxType::Eip1559 => {
378            let tx = TxEip1559 {
379                chain_id: from_i64(
380                    chain_id
381                        .ok_or_else(|| SqlColdError::Convert("EIP1559 requires chain_id".into()))?,
382                ),
383                nonce,
384                gas_limit,
385                max_fee_per_gas: decode_u128_required(
386                    opt_blob(r, COL_MAX_FEE_PER_GAS),
387                    COL_MAX_FEE_PER_GAS,
388                )?,
389                max_priority_fee_per_gas: decode_u128_required(
390                    opt_blob(r, COL_MAX_PRIORITY_FEE_PER_GAS),
391                    COL_MAX_PRIORITY_FEE_PER_GAS,
392                )?,
393                to: to_addr.map_or(TxKind::Create, TxKind::Call),
394                value,
395                input,
396                access_list: decode_access_list_or_empty(opt_blob(r, COL_ACCESS_LIST))?,
397            };
398            Ok(EthereumTxEnvelope::Eip1559(Signed::new_unhashed(tx, sig)))
399        }
400        TxType::Eip4844 => {
401            let tx = TxEip4844 {
402                chain_id: from_i64(
403                    chain_id
404                        .ok_or_else(|| SqlColdError::Convert("EIP4844 requires chain_id".into()))?,
405                ),
406                nonce,
407                gas_limit,
408                max_fee_per_gas: decode_u128_required(
409                    opt_blob(r, COL_MAX_FEE_PER_GAS),
410                    COL_MAX_FEE_PER_GAS,
411                )?,
412                max_priority_fee_per_gas: decode_u128_required(
413                    opt_blob(r, COL_MAX_PRIORITY_FEE_PER_GAS),
414                    COL_MAX_PRIORITY_FEE_PER_GAS,
415                )?,
416                to: to_addr
417                    .ok_or_else(|| SqlColdError::Convert("EIP4844 requires to_address".into()))?,
418                value,
419                input,
420                access_list: decode_access_list_or_empty(opt_blob(r, COL_ACCESS_LIST))?,
421                blob_versioned_hashes: decode_b256_vec(
422                    opt_blob(r, COL_BLOB_VERSIONED_HASHES).ok_or_else(|| {
423                        SqlColdError::Convert("EIP4844 requires blob_versioned_hashes".into())
424                    })?,
425                )?,
426                max_fee_per_blob_gas: decode_u128_required(
427                    opt_blob(r, COL_MAX_FEE_PER_BLOB_GAS),
428                    COL_MAX_FEE_PER_BLOB_GAS,
429                )?,
430            };
431            Ok(EthereumTxEnvelope::Eip4844(Signed::new_unhashed(tx, sig)))
432        }
433        TxType::Eip7702 => {
434            let tx = TxEip7702 {
435                chain_id: from_i64(
436                    chain_id
437                        .ok_or_else(|| SqlColdError::Convert("EIP7702 requires chain_id".into()))?,
438                ),
439                nonce,
440                gas_limit,
441                max_fee_per_gas: decode_u128_required(
442                    opt_blob(r, COL_MAX_FEE_PER_GAS),
443                    COL_MAX_FEE_PER_GAS,
444                )?,
445                max_priority_fee_per_gas: decode_u128_required(
446                    opt_blob(r, COL_MAX_PRIORITY_FEE_PER_GAS),
447                    COL_MAX_PRIORITY_FEE_PER_GAS,
448                )?,
449                to: to_addr
450                    .ok_or_else(|| SqlColdError::Convert("EIP7702 requires to_address".into()))?,
451                value,
452                input,
453                access_list: decode_access_list_or_empty(opt_blob(r, COL_ACCESS_LIST))?,
454                authorization_list: decode_authorization_list(
455                    opt_blob(r, COL_AUTHORIZATION_LIST).ok_or_else(|| {
456                        SqlColdError::Convert("EIP7702 requires authorization_list".into())
457                    })?,
458                )?,
459            };
460            Ok(EthereumTxEnvelope::Eip7702(Signed::new_unhashed(tx, sig)))
461        }
462    }
463}
464
465/// Build a [`RecoveredTx`] from a row that includes `from_address`.
466fn recovered_tx_from_row(r: &sqlx::any::AnyRow) -> Result<RecoveredTx, SqlColdError> {
467    let sender: Address = r.get(COL_FROM_ADDRESS);
468    let tx = tx_from_row(r)?;
469    // SAFETY: the sender was recovered at append time and stored in from_address.
470    Ok(Recovered::new_unchecked(tx, sender))
471}
472
473/// Build a [`Log`] from an [`sqlx::any::AnyRow`].
474fn log_from_row(r: &sqlx::any::AnyRow) -> Log {
475    let topics = [COL_TOPIC0, COL_TOPIC1, COL_TOPIC2, COL_TOPIC3]
476        .into_iter()
477        .filter_map(|col| r.get::<Option<B256>, _>(col))
478        .collect();
479    Log { address: r.get(COL_ADDRESS), data: LogData::new_unchecked(topics, r.get(COL_DATA)) }
480}
481
482/// Build a [`DbSignetEvent`] from an [`sqlx::any::AnyRow`].
483fn signet_event_from_row(r: &sqlx::any::AnyRow) -> Result<DbSignetEvent, SqlColdError> {
484    let event_type = r.get::<i32, _>(COL_EVENT_TYPE) as i16;
485    let order = from_i64(r.get(COL_ORDER_INDEX));
486    let rollup_chain_id: U256 = r.get(COL_ROLLUP_CHAIN_ID);
487
488    match event_type {
489        EVENT_TRANSACT => {
490            let sender: Address = r
491                .get::<Option<Address>, _>(COL_SENDER)
492                .ok_or_else(|| SqlColdError::Convert("Transact requires sender".into()))?;
493            let to: Address = r
494                .get::<Option<Address>, _>(COL_TO_ADDRESS)
495                .ok_or_else(|| SqlColdError::Convert("Transact requires to".into()))?;
496            let value: U256 = r
497                .get::<Option<U256>, _>(COL_VALUE)
498                .ok_or_else(|| SqlColdError::Convert("Transact requires value".into()))?;
499            let gas: U256 = r
500                .get::<Option<U256>, _>(COL_GAS)
501                .ok_or_else(|| SqlColdError::Convert("Transact requires gas".into()))?;
502            let max_fee: U256 = r
503                .get::<Option<U256>, _>(COL_MAX_FEE_PER_GAS)
504                .ok_or_else(|| SqlColdError::Convert("Transact requires max_fee_per_gas".into()))?;
505            let data: Bytes = r.get::<Option<Bytes>, _>(COL_DATA).unwrap_or_default();
506
507            Ok(DbSignetEvent::Transact(
508                order,
509                Transact {
510                    rollupChainId: rollup_chain_id,
511                    sender,
512                    to,
513                    value,
514                    gas,
515                    maxFeePerGas: max_fee,
516                    data,
517                },
518            ))
519        }
520        EVENT_ENTER => {
521            let recipient: Address = r
522                .get::<Option<Address>, _>(COL_ROLLUP_RECIPIENT)
523                .ok_or_else(|| SqlColdError::Convert("Enter requires rollup_recipient".into()))?;
524            let amount: U256 = r
525                .get::<Option<U256>, _>(COL_AMOUNT)
526                .ok_or_else(|| SqlColdError::Convert("Enter requires amount".into()))?;
527
528            Ok(DbSignetEvent::Enter(
529                order,
530                Enter { rollupChainId: rollup_chain_id, rollupRecipient: recipient, amount },
531            ))
532        }
533        EVENT_ENTER_TOKEN => {
534            let token: Address = r
535                .get::<Option<Address>, _>(COL_TOKEN)
536                .ok_or_else(|| SqlColdError::Convert("EnterToken requires token".into()))?;
537            let recipient: Address =
538                r.get::<Option<Address>, _>(COL_ROLLUP_RECIPIENT).ok_or_else(|| {
539                    SqlColdError::Convert("EnterToken requires rollup_recipient".into())
540                })?;
541            let amount: U256 = r
542                .get::<Option<U256>, _>(COL_AMOUNT)
543                .ok_or_else(|| SqlColdError::Convert("EnterToken requires amount".into()))?;
544
545            Ok(DbSignetEvent::EnterToken(
546                order,
547                EnterToken {
548                    rollupChainId: rollup_chain_id,
549                    token,
550                    rollupRecipient: recipient,
551                    amount,
552                },
553            ))
554        }
555        _ => Err(SqlColdError::Convert(format!("invalid event_type: {event_type}"))),
556    }
557}
558
559/// Build a [`DbZenithHeader`] from an [`sqlx::any::AnyRow`].
560fn zenith_header_from_row(r: &sqlx::any::AnyRow) -> Result<DbZenithHeader, SqlColdError> {
561    Ok(DbZenithHeader(Zenith::BlockHeader {
562        hostBlockNumber: r.get(COL_HOST_BLOCK_NUMBER),
563        rollupChainId: r.get(COL_ROLLUP_CHAIN_ID),
564        gasLimit: r.get(COL_GAS_LIMIT),
565        rewardAddress: r.get(COL_REWARD_ADDRESS),
566        blockDataHash: r.get(COL_BLOCK_DATA_HASH),
567    }))
568}
569
570// ============================================================================
571// Domain type → SQL INSERT (write path)
572// ============================================================================
573
574/// Write a single block's data into an open SQL transaction.
575async fn write_block_to_tx(
576    tx: &mut sqlx::Transaction<'_, sqlx::Any>,
577    data: BlockData,
578) -> Result<(), SqlColdError> {
579    let bn = to_i64(data.block_number());
580
581    // Insert header
582    let block_hash = data.header.hash_slow();
583    let difficulty = &data.header.difficulty;
584    sqlx::query(
585        "INSERT INTO headers (
586            block_number, block_hash, parent_hash, ommers_hash, beneficiary,
587            state_root, transactions_root, receipts_root, logs_bloom, difficulty,
588            gas_limit, gas_used, timestamp, extra_data, mix_hash, nonce,
589            base_fee_per_gas, withdrawals_root, blob_gas_used, excess_blob_gas,
590            parent_beacon_block_root, requests_hash
591        ) VALUES (
592            $1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
593            $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22
594        )",
595    )
596    .bind(bn)
597    .bind(block_hash)
598    .bind(data.header.parent_hash)
599    .bind(data.header.ommers_hash)
600    .bind(data.header.beneficiary)
601    .bind(data.header.state_root)
602    .bind(data.header.transactions_root)
603    .bind(data.header.receipts_root)
604    .bind(data.header.logs_bloom)
605    .bind(difficulty)
606    .bind(to_i64(data.header.gas_limit))
607    .bind(to_i64(data.header.gas_used))
608    .bind(to_i64(data.header.timestamp))
609    .bind(&data.header.extra_data)
610    .bind(data.header.mix_hash)
611    .bind(data.header.nonce)
612    .bind(data.header.base_fee_per_gas.map(to_i64))
613    .bind(data.header.withdrawals_root.as_ref())
614    .bind(data.header.blob_gas_used.map(to_i64))
615    .bind(data.header.excess_blob_gas.map(to_i64))
616    .bind(data.header.parent_beacon_block_root.as_ref())
617    .bind(data.header.requests_hash.as_ref())
618    .execute(&mut **tx)
619    .await?;
620
621    // Insert transactions
622    for (idx, recovered_tx) in data.transactions.iter().enumerate() {
623        insert_transaction(tx, bn, to_i64(idx as u64), recovered_tx).await?;
624    }
625
626    // Insert receipts and logs, computing first_log_index as a running
627    // sum of log counts (same algorithm as the MDBX IndexedReceipt path).
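    // For example, receipts with 2, 0, and 3 logs get first_log_index values
    // of 0, 2, and 2 respectively.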
628    let mut first_log_index = 0i64;
629    for (idx, receipt) in data.receipts.iter().enumerate() {
630        let tx_idx = to_i64(idx as u64);
631        sqlx::query(
632            "INSERT INTO receipts (block_number, tx_index, tx_type, success, cumulative_gas_used, first_log_index)
633             VALUES ($1, $2, $3, $4, $5, $6)",
634        )
635        .bind(bn)
636        .bind(tx_idx)
637        .bind(receipt.tx_type as i32)
638        .bind(receipt.inner.status.coerce_status() as i32)
639        .bind(to_i64(receipt.inner.cumulative_gas_used))
640        .bind(first_log_index)
641        .execute(&mut **tx)
642        .await?;
643        first_log_index += receipt.inner.logs.len() as i64;
644
645        for (log_idx, log) in receipt.inner.logs.iter().enumerate() {
646            let topics = log.topics();
647            sqlx::query(
648                "INSERT INTO logs (block_number, tx_index, log_index, address, topic0, topic1, topic2, topic3, data)
649                 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
650            )
651            .bind(bn)
652            .bind(tx_idx)
653            .bind(to_i64(log_idx as u64))
654            .bind(log.address)
655            .bind(topics.first())
656            .bind(topics.get(1))
657            .bind(topics.get(2))
658            .bind(topics.get(3))
659            .bind(&log.data.data)
660            .execute(&mut **tx)
661            .await?;
662        }
663    }
664
665    // Insert signet events
666    for (idx, event) in data.signet_events.iter().enumerate() {
667        insert_signet_event(tx, bn, to_i64(idx as u64), event).await?;
668    }
669
670    // Insert zenith header
671    if let Some(zh) = &data.zenith_header {
672        let h = &zh.0;
673        sqlx::query(
674            "INSERT INTO zenith_headers (
675                block_number, host_block_number, rollup_chain_id,
676                gas_limit, reward_address, block_data_hash
677            ) VALUES ($1, $2, $3, $4, $5, $6)",
678        )
679        .bind(bn)
680        .bind(h.hostBlockNumber)
681        .bind(h.rollupChainId)
682        .bind(h.gasLimit)
683        .bind(h.rewardAddress)
684        .bind(h.blockDataHash)
685        .execute(&mut **tx)
686        .await?;
687    }
688
689    Ok(())
690}
691
692/// Insert a transaction, binding directly from the source type.
693async fn insert_transaction(
694    conn: &mut sqlx::AnyConnection,
695    bn: i64,
696    tx_index: i64,
697    recovered: &RecoveredTx,
698) -> Result<(), SqlColdError> {
699    use alloy::consensus::EthereumTxEnvelope;
700
701    let sender = recovered.signer();
702    let tx: &TransactionSigned = recovered;
703    let tx_hash = tx.tx_hash();
704    let tx_type = tx.tx_type() as i32;
705
706    macro_rules! sig {
707        ($s:expr) => {{
708            let sig = $s.signature();
709            (sig.v() as i32, sig.r(), sig.s())
710        }};
711    }
712    let (sig_y, sig_r, sig_s) = match tx {
713        EthereumTxEnvelope::Legacy(s) => sig!(s),
714        EthereumTxEnvelope::Eip2930(s) => sig!(s),
715        EthereumTxEnvelope::Eip1559(s) => sig!(s),
716        EthereumTxEnvelope::Eip4844(s) => sig!(s),
717        EthereumTxEnvelope::Eip7702(s) => sig!(s),
718    };
719
720    let (chain_id, nonce, gas_limit) = match tx {
721        EthereumTxEnvelope::Legacy(s) => {
722            (s.tx().chain_id.map(to_i64), to_i64(s.tx().nonce), to_i64(s.tx().gas_limit))
723        }
724        EthereumTxEnvelope::Eip2930(s) => {
725            (Some(to_i64(s.tx().chain_id)), to_i64(s.tx().nonce), to_i64(s.tx().gas_limit))
726        }
727        EthereumTxEnvelope::Eip1559(s) => {
728            (Some(to_i64(s.tx().chain_id)), to_i64(s.tx().nonce), to_i64(s.tx().gas_limit))
729        }
730        EthereumTxEnvelope::Eip4844(s) => {
731            (Some(to_i64(s.tx().chain_id)), to_i64(s.tx().nonce), to_i64(s.tx().gas_limit))
732        }
733        EthereumTxEnvelope::Eip7702(s) => {
734            (Some(to_i64(s.tx().chain_id)), to_i64(s.tx().nonce), to_i64(s.tx().gas_limit))
735        }
736    };
737
738    let (value, to_addr) = match tx {
739        EthereumTxEnvelope::Legacy(s) => (s.tx().value, s.tx().to.to()),
740        EthereumTxEnvelope::Eip2930(s) => (s.tx().value, s.tx().to.to()),
741        EthereumTxEnvelope::Eip1559(s) => (s.tx().value, s.tx().to.to()),
742        EthereumTxEnvelope::Eip4844(s) => (s.tx().value, Some(&s.tx().to)),
743        EthereumTxEnvelope::Eip7702(s) => (s.tx().value, Some(&s.tx().to)),
744    };
745
746    let input: &[u8] = match tx {
747        EthereumTxEnvelope::Legacy(s) => s.tx().input.as_ref(),
748        EthereumTxEnvelope::Eip2930(s) => s.tx().input.as_ref(),
749        EthereumTxEnvelope::Eip1559(s) => s.tx().input.as_ref(),
750        EthereumTxEnvelope::Eip4844(s) => s.tx().input.as_ref(),
751        EthereumTxEnvelope::Eip7702(s) => s.tx().input.as_ref(),
752    };
753
754    let (gas_price, max_fee, max_priority_fee, max_blob_fee) = match tx {
755        EthereumTxEnvelope::Legacy(s) => (Some(encode_u128(s.tx().gas_price)), None, None, None),
756        EthereumTxEnvelope::Eip2930(s) => (Some(encode_u128(s.tx().gas_price)), None, None, None),
757        EthereumTxEnvelope::Eip1559(s) => (
758            None,
759            Some(encode_u128(s.tx().max_fee_per_gas)),
760            Some(encode_u128(s.tx().max_priority_fee_per_gas)),
761            None,
762        ),
763        EthereumTxEnvelope::Eip4844(s) => (
764            None,
765            Some(encode_u128(s.tx().max_fee_per_gas)),
766            Some(encode_u128(s.tx().max_priority_fee_per_gas)),
767            Some(encode_u128(s.tx().max_fee_per_blob_gas)),
768        ),
769        EthereumTxEnvelope::Eip7702(s) => (
770            None,
771            Some(encode_u128(s.tx().max_fee_per_gas)),
772            Some(encode_u128(s.tx().max_priority_fee_per_gas)),
773            None,
774        ),
775    };
776
777    let (access_list, blob_hashes, auth_list) = match tx {
778        EthereumTxEnvelope::Legacy(_) => (None, None, None),
779        EthereumTxEnvelope::Eip2930(s) => {
780            (Some(encode_access_list(&s.tx().access_list)), None, None)
781        }
782        EthereumTxEnvelope::Eip1559(s) => {
783            (Some(encode_access_list(&s.tx().access_list)), None, None)
784        }
785        EthereumTxEnvelope::Eip4844(s) => (
786            Some(encode_access_list(&s.tx().access_list)),
787            Some(encode_b256_vec(&s.tx().blob_versioned_hashes)),
788            None,
789        ),
790        EthereumTxEnvelope::Eip7702(s) => (
791            Some(encode_access_list(&s.tx().access_list)),
792            None,
793            Some(encode_authorization_list(&s.tx().authorization_list)),
794        ),
795    };
796
797    sqlx::query(
798        "INSERT INTO transactions (
799            block_number, tx_index, tx_hash, tx_type,
800            sig_y_parity, sig_r, sig_s,
801            chain_id, nonce, gas_limit, to_address, value, input,
802            gas_price, max_fee_per_gas, max_priority_fee_per_gas,
803            max_fee_per_blob_gas, blob_versioned_hashes,
804            access_list, authorization_list, from_address
805        ) VALUES (
806            $1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
807            $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21
808        )",
809    )
810    .bind(bn)
811    .bind(tx_index)
812    .bind(tx_hash)
813    .bind(tx_type)
814    .bind(sig_y)
815    .bind(sig_r)
816    .bind(sig_s)
817    .bind(chain_id)
818    .bind(nonce)
819    .bind(gas_limit)
820    .bind(to_addr)
821    .bind(value)
822    .bind(input)
823    .bind(gas_price.as_ref().map(|v| v.as_slice()))
824    .bind(max_fee.as_ref().map(|v| v.as_slice()))
825    .bind(max_priority_fee.as_ref().map(|v| v.as_slice()))
826    .bind(max_blob_fee.as_ref().map(|v| v.as_slice()))
827    .bind(blob_hashes.as_deref())
828    .bind(access_list.as_deref())
829    .bind(auth_list.as_deref())
830    .bind(sender)
831    .execute(&mut *conn)
832    .await?;
833
834    Ok(())
835}
836
837/// Insert a signet event, binding directly from the source type.
838async fn insert_signet_event(
839    conn: &mut sqlx::AnyConnection,
840    block_number: i64,
841    event_index: i64,
842    event: &DbSignetEvent,
843) -> Result<(), SqlColdError> {
844    let (event_type, order, chain_id) = match event {
845        DbSignetEvent::Transact(o, t) => (0i32, to_i64(*o), &t.rollupChainId),
846        DbSignetEvent::Enter(o, e) => (1i32, to_i64(*o), &e.rollupChainId),
847        DbSignetEvent::EnterToken(o, e) => (2i32, to_i64(*o), &e.rollupChainId),
848    };
849
850    let (value, gas, max_fee, amount) = match event {
851        DbSignetEvent::Transact(_, t) => {
852            (Some(&t.value), Some(&t.gas), Some(&t.maxFeePerGas), None)
853        }
854        DbSignetEvent::Enter(_, e) => (None, None, None, Some(&e.amount)),
855        DbSignetEvent::EnterToken(_, e) => (None, None, None, Some(&e.amount)),
856    };
857
858    sqlx::query(
859        "INSERT INTO signet_events (
860            block_number, event_index, event_type, order_index,
861            rollup_chain_id, sender, to_address, value, gas,
862            max_fee_per_gas, data, rollup_recipient, amount, token
863        ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)",
864    )
865    .bind(block_number)
866    .bind(event_index)
867    .bind(event_type)
868    .bind(order)
869    .bind(chain_id)
870    .bind(match event {
871        DbSignetEvent::Transact(_, t) => Some(&t.sender),
872        _ => None,
873    })
874    .bind(match event {
875        DbSignetEvent::Transact(_, t) => Some(&t.to),
876        _ => None,
877    })
878    .bind(value)
879    .bind(gas)
880    .bind(max_fee)
881    .bind(match event {
882        DbSignetEvent::Transact(_, t) => Some(&t.data),
883        _ => None,
884    })
885    .bind(match event {
886        DbSignetEvent::Enter(_, e) => Some(&e.rollupRecipient),
887        DbSignetEvent::EnterToken(_, e) => Some(&e.rollupRecipient),
888        _ => None,
889    })
890    .bind(amount)
891    .bind(match event {
892        DbSignetEvent::EnterToken(_, e) => Some(&e.token),
893        _ => None,
894    })
895    .execute(&mut *conn)
896    .await?;
897
898    Ok(())
899}
900
901// ============================================================================
902// Log filter helpers
903// ============================================================================
904
905/// Append a SQL filter clause for a set of byte-encoded values.
906///
907/// For a single value, generates ` AND {column} = ${idx}`.
908/// For multiple values, generates ` AND {column} IN (${idx}, ...)`.
909/// Returns the next available parameter index.
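///
/// For example, with `column = "l.address"`, `idx = 2`, and two values, the
/// appended clause looks roughly like ` AND l.address IN ($2, $3)` and the
/// returned index is `4`.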
910fn append_filter_clause<'a>(
911    clause: &mut String,
912    params: &mut Vec<&'a [u8]>,
913    mut idx: u32,
914    column: &str,
915    values: impl ExactSizeIterator<Item = &'a [u8]>,
916) -> u32 {
917    use std::fmt::Write;
918
919    let len = values.len();
920    if len == 1 {
921        write!(clause, " AND {column} = ${idx}").unwrap();
922        values.for_each(|v| params.push(v));
923        return idx + 1;
924    }
925    write!(clause, " AND {column} IN (").unwrap();
926    for (i, v) in values.enumerate() {
927        if i > 0 {
928            clause.push_str(", ");
929        }
930        write!(clause, "${idx}").unwrap();
931        params.push(v);
932        idx += 1;
933    }
934    clause.push(')');
935    idx
936}
937
938fn build_log_filter_clause(filter: &Filter, start_idx: u32) -> (String, Vec<&[u8]>) {
939    let mut clause = String::new();
940    let mut params: Vec<&[u8]> = Vec::new();
941    let mut idx = start_idx;
942
943    if !filter.address.is_empty() {
944        idx = append_filter_clause(
945            &mut clause,
946            &mut params,
947            idx,
948            "l.address",
949            filter.address.iter().map(|a| a.as_slice()),
950        );
951    }
952
953    let topic_cols = ["l.topic0", "l.topic1", "l.topic2", "l.topic3"];
954    for (i, topic_filter) in filter.topics.iter().enumerate() {
955        if topic_filter.is_empty() {
956            continue;
957        }
958        idx = append_filter_clause(
959            &mut clause,
960            &mut params,
961            idx,
962            topic_cols[i],
963            topic_filter.iter().map(|v| v.as_slice()),
964        );
965    }
966
967    (clause, params)
968}
969
970// ============================================================================
971// ColdStorage implementation
972// ============================================================================
973
974impl ColdStorage for SqlColdBackend {
975    async fn get_header(&self, spec: HeaderSpecifier) -> ColdResult<Option<SealedHeader>> {
976        let Some(block_num) = self.resolve_header_spec(spec).await? else {
977            return Ok(None);
978        };
979        self.fetch_header_by_number(block_num).await.map_err(ColdStorageError::from)
980    }
981
982    async fn get_headers(
983        &self,
984        specs: Vec<HeaderSpecifier>,
985    ) -> ColdResult<Vec<Option<SealedHeader>>> {
986        let mut results = Vec::with_capacity(specs.len());
987        for spec in specs {
988            let header = self.get_header(spec).await?;
989            results.push(header);
990        }
991        Ok(results)
992    }
993
994    async fn get_transaction(
995        &self,
996        spec: TransactionSpecifier,
997    ) -> ColdResult<Option<Confirmed<RecoveredTx>>> {
998        let row = match spec {
999            TransactionSpecifier::Hash(hash) => sqlx::query(
1000                "SELECT t.*, h.block_hash
1001                     FROM transactions t
1002                     JOIN headers h ON t.block_number = h.block_number
1003                     WHERE t.tx_hash = $1",
1004            )
1005            .bind(hash)
1006            .fetch_optional(&self.pool)
1007            .await
1008            .map_err(SqlColdError::from)?,
1009            TransactionSpecifier::BlockAndIndex { block, index } => sqlx::query(
1010                "SELECT t.*, h.block_hash
1011                     FROM transactions t
1012                     JOIN headers h ON t.block_number = h.block_number
1013                     WHERE t.block_number = $1 AND t.tx_index = $2",
1014            )
1015            .bind(to_i64(block))
1016            .bind(to_i64(index))
1017            .fetch_optional(&self.pool)
1018            .await
1019            .map_err(SqlColdError::from)?,
1020            TransactionSpecifier::BlockHashAndIndex { block_hash, index } => sqlx::query(
1021                "SELECT t.*, h.block_hash
1022                     FROM transactions t
1023                     JOIN headers h ON t.block_number = h.block_number
1024                     WHERE h.block_hash = $1 AND t.tx_index = $2",
1025            )
1026            .bind(block_hash)
1027            .bind(to_i64(index))
1028            .fetch_optional(&self.pool)
1029            .await
1030            .map_err(SqlColdError::from)?,
1031        };
1032
1033        let Some(r) = row else {
1034            return Ok(None);
1035        };
1036
1037        let block = from_i64(r.get::<i64, _>(COL_BLOCK_NUMBER));
1038        let index = from_i64(r.get::<i64, _>(COL_TX_INDEX));
1039        let block_hash = r.get(COL_BLOCK_HASH);
1040        let recovered = recovered_tx_from_row(&r).map_err(ColdStorageError::from)?;
1041        let meta = ConfirmationMeta::new(block, block_hash, index);
1042        Ok(Some(Confirmed::new(recovered, meta)))
1043    }
1044
1045    async fn get_transactions_in_block(&self, block: BlockNumber) -> ColdResult<Vec<RecoveredTx>> {
1046        let bn = to_i64(block);
1047        let rows =
1048            sqlx::query("SELECT * FROM transactions WHERE block_number = $1 ORDER BY tx_index")
1049                .bind(bn)
1050                .fetch_all(&self.pool)
1051                .await
1052                .map_err(SqlColdError::from)?;
1053
1054        rows.iter().map(|r| recovered_tx_from_row(r).map_err(ColdStorageError::from)).collect()
1055    }
1056
1057    async fn get_transaction_count(&self, block: BlockNumber) -> ColdResult<u64> {
1058        let bn = to_i64(block);
1059        let row = sqlx::query("SELECT COUNT(*) as cnt FROM transactions WHERE block_number = $1")
1060            .bind(bn)
1061            .fetch_one(&self.pool)
1062            .await
1063            .map_err(SqlColdError::from)?;
1064
1065        Ok(from_i64(row.get::<i64, _>(COL_CNT)))
1066    }
1067
1068    async fn get_receipt(&self, spec: ReceiptSpecifier) -> ColdResult<Option<ColdReceipt>> {
1069        // Resolve to (block, index)
1070        let (block, index) = match spec {
1071            ReceiptSpecifier::TxHash(hash) => {
1072                let row = sqlx::query(
1073                    "SELECT block_number, tx_index FROM transactions WHERE tx_hash = $1",
1074                )
1075                .bind(hash)
1076                .fetch_optional(&self.pool)
1077                .await
1078                .map_err(SqlColdError::from)?;
1079                let Some(r) = row else { return Ok(None) };
1080                (
1081                    from_i64(r.get::<i64, _>(COL_BLOCK_NUMBER)),
1082                    from_i64(r.get::<i64, _>(COL_TX_INDEX)),
1083                )
1084            }
1085            ReceiptSpecifier::BlockAndIndex { block, index } => (block, index),
1086        };
1087
1088        let Some(header) = self.fetch_header_by_number(block).await? else {
1089            return Ok(None);
1090        };
1091
1092        // Fetch receipt + tx_hash + from_address
1093        let receipt_row = sqlx::query(
1094            "SELECT r.*, t.tx_hash, t.from_address
1095             FROM receipts r
1096             JOIN transactions t ON r.block_number = t.block_number AND r.tx_index = t.tx_index
1097             WHERE r.block_number = $1 AND r.tx_index = $2",
1098        )
1099        .bind(to_i64(block))
1100        .bind(to_i64(index))
1101        .fetch_optional(&self.pool)
1102        .await
1103        .map_err(SqlColdError::from)?;
1104
1105        let Some(rr) = receipt_row else {
1106            return Ok(None);
1107        };
1108
1109        let bn: i64 = rr.get(COL_BLOCK_NUMBER);
1110        let tx_idx: i64 = rr.get(COL_TX_INDEX);
1111        let tx_hash = rr.get(COL_TX_HASH);
1112        let sender = rr.get(COL_FROM_ADDRESS);
1113        let tx_type = rr.get::<i32, _>(COL_TX_TYPE) as i16;
1114        let success = rr.get::<i32, _>(COL_SUCCESS) != 0;
1115        let cumulative_gas_used: i64 = rr.get(COL_CUMULATIVE_GAS_USED);
1116
1117        let log_rows = sqlx::query(
1118            "SELECT * FROM logs WHERE block_number = $1 AND tx_index = $2 ORDER BY log_index",
1119        )
1120        .bind(bn)
1121        .bind(tx_idx)
1122        .fetch_all(&self.pool)
1123        .await
1124        .map_err(SqlColdError::from)?;
1125
1126        let logs = log_rows.iter().map(log_from_row).collect();
1127        let built = build_receipt(tx_type, success, cumulative_gas_used, logs)
1128            .map_err(ColdStorageError::from)?;
1129
1130        // Read first_log_index directly from the receipt row; compute
1131        // gas_used from the prior receipt's cumulative gas.
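        // For example, if the prior receipt's cumulative gas is 21_000 and this
        // receipt's is 63_000, gas_used works out to 42_000.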
1132        let first_log_index: u64 = from_i64(rr.get::<i64, _>(COL_FIRST_LOG_INDEX));
1133        let prior = sqlx::query(
1134            "SELECT CAST(MAX(r.cumulative_gas_used) AS bigint) as prior_gas
1135             FROM receipts r WHERE r.block_number = $1 AND r.tx_index < $2",
1136        )
1137        .bind(to_i64(block))
1138        .bind(to_i64(index))
1139        .fetch_one(&self.pool)
1140        .await
1141        .map_err(SqlColdError::from)?;
1142        let prior_cumulative_gas: u64 =
1143            prior.get::<Option<i64>, _>(COL_PRIOR_GAS).unwrap_or(0) as u64;
1144        let gas_used = built.inner.cumulative_gas_used - prior_cumulative_gas;
1145
1146        let ir = IndexedReceipt { receipt: built, tx_hash, first_log_index, gas_used, sender };
1147        Ok(Some(ColdReceipt::new(ir, &header, index)))
1148    }
1149
1150    async fn get_receipts_in_block(&self, block: BlockNumber) -> ColdResult<Vec<ColdReceipt>> {
1151        let Some(header) =
1152            self.fetch_header_by_number(block).await.map_err(ColdStorageError::from)?
1153        else {
1154            return Ok(Vec::new());
1155        };
1156
1157        let bn = to_i64(block);
1158
1159        // Fetch receipts joined with tx_hash and from_address
1160        let receipt_rows = sqlx::query(
1161            "SELECT r.*, t.tx_hash, t.from_address
1162             FROM receipts r
1163             JOIN transactions t ON r.block_number = t.block_number AND r.tx_index = t.tx_index
1164             WHERE r.block_number = $1
1165             ORDER BY r.tx_index",
1166        )
1167        .bind(bn)
1168        .fetch_all(&self.pool)
1169        .await
1170        .map_err(SqlColdError::from)?;
1171
1172        let all_log_rows =
1173            sqlx::query("SELECT * FROM logs WHERE block_number = $1 ORDER BY tx_index, log_index")
1174                .bind(bn)
1175                .fetch_all(&self.pool)
1176                .await
1177                .map_err(SqlColdError::from)?;
1178
1179        // Group logs by tx_index
1180        let mut logs_by_tx: std::collections::BTreeMap<i64, Vec<Log>> =
1181            std::collections::BTreeMap::new();
1182        for r in &all_log_rows {
1183            let tx_idx: i64 = r.get(COL_TX_INDEX);
1184            logs_by_tx.entry(tx_idx).or_default().push(log_from_row(r));
1185        }
1186
1187        let mut first_log_index = 0u64;
1188        let mut prior_cumulative_gas = 0u64;
1189        receipt_rows
1190            .into_iter()
1191            .enumerate()
1192            .map(|(idx, rr)| {
1193                let tx_idx: i64 = rr.get(COL_TX_INDEX);
1194                let tx_hash = rr.get(COL_TX_HASH);
1195                let sender = rr.get(COL_FROM_ADDRESS);
1196                let tx_type = rr.get::<i32, _>(COL_TX_TYPE) as i16;
1197                let success = rr.get::<i32, _>(COL_SUCCESS) != 0;
1198                let cumulative_gas_used: i64 = rr.get(COL_CUMULATIVE_GAS_USED);
1199                let logs = logs_by_tx.remove(&tx_idx).unwrap_or_default();
1200                let receipt = build_receipt(tx_type, success, cumulative_gas_used, logs)
1201                    .map_err(ColdStorageError::from)?;
1202                let gas_used = receipt.inner.cumulative_gas_used - prior_cumulative_gas;
1203                prior_cumulative_gas = receipt.inner.cumulative_gas_used;
1204                let ir = IndexedReceipt { receipt, tx_hash, first_log_index, gas_used, sender };
1205                first_log_index += ir.receipt.inner.logs.len() as u64;
1206                Ok(ColdReceipt::new(ir, &header, idx as u64))
1207            })
1208            .collect()
1209    }
1210
1211    async fn get_signet_events(
1212        &self,
1213        spec: SignetEventsSpecifier,
1214    ) -> ColdResult<Vec<DbSignetEvent>> {
1215        let rows = match spec {
1216            SignetEventsSpecifier::Block(block) => {
1217                let bn = to_i64(block);
1218                sqlx::query(
1219                    "SELECT * FROM signet_events WHERE block_number = $1 ORDER BY event_index",
1220                )
1221                .bind(bn)
1222                .fetch_all(&self.pool)
1223                .await
1224                .map_err(SqlColdError::from)?
1225            }
1226            SignetEventsSpecifier::BlockRange { start, end } => {
1227                let s = to_i64(start);
1228                let e = to_i64(end);
1229                sqlx::query(
1230                    "SELECT * FROM signet_events WHERE block_number >= $1 AND block_number <= $2
1231                     ORDER BY block_number, event_index",
1232                )
1233                .bind(s)
1234                .bind(e)
1235                .fetch_all(&self.pool)
1236                .await
1237                .map_err(SqlColdError::from)?
1238            }
1239        };
1240
1241        rows.iter().map(|r| signet_event_from_row(r).map_err(ColdStorageError::from)).collect()
1242    }
1243
1244    async fn get_zenith_header(
1245        &self,
1246        spec: ZenithHeaderSpecifier,
1247    ) -> ColdResult<Option<DbZenithHeader>> {
1248        let block = match spec {
1249            ZenithHeaderSpecifier::Number(n) => n,
1250            ZenithHeaderSpecifier::Range { start, .. } => start,
1251        };
1252        let bn = to_i64(block);
1253        let row = sqlx::query("SELECT * FROM zenith_headers WHERE block_number = $1")
1254            .bind(bn)
1255            .fetch_optional(&self.pool)
1256            .await
1257            .map_err(SqlColdError::from)?;
1258
1259        row.map(|r| zenith_header_from_row(&r)).transpose().map_err(ColdStorageError::from)
1260    }
1261
1262    async fn get_zenith_headers(
1263        &self,
1264        spec: ZenithHeaderSpecifier,
1265    ) -> ColdResult<Vec<DbZenithHeader>> {
1266        let rows = match spec {
1267            ZenithHeaderSpecifier::Number(n) => {
1268                let bn = to_i64(n);
1269                sqlx::query("SELECT * FROM zenith_headers WHERE block_number = $1")
1270                    .bind(bn)
1271                    .fetch_all(&self.pool)
1272                    .await
1273                    .map_err(SqlColdError::from)?
1274            }
1275            ZenithHeaderSpecifier::Range { start, end } => {
1276                let s = to_i64(start);
1277                let e = to_i64(end);
1278                sqlx::query(
1279                    "SELECT * FROM zenith_headers WHERE block_number >= $1 AND block_number <= $2
1280                     ORDER BY block_number",
1281                )
1282                .bind(s)
1283                .bind(e)
1284                .fetch_all(&self.pool)
1285                .await
1286                .map_err(SqlColdError::from)?
1287            }
1288        };
1289
1290        rows.iter().map(|r| zenith_header_from_row(r).map_err(ColdStorageError::from)).collect()
1291    }
1292
1293    async fn get_logs(&self, filter: &Filter, max_logs: usize) -> ColdResult<Vec<RpcLog>> {
1294        let from = filter.get_from_block().unwrap_or(0);
1295        let to = filter.get_to_block().unwrap_or(u64::MAX);
1296
1297        // Build WHERE clause: block range ($1, $2) + address/topic filters.
1298        let (filter_clause, params) = build_log_filter_clause(filter, 3);
1299        let where_clause = format!("l.block_number >= $1 AND l.block_number <= $2{filter_clause}");
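        // For a single topic0 filter, for instance, this comes out roughly as
        // "l.block_number >= $1 AND l.block_number <= $2 AND l.topic0 = $3".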
1300
1301        // Run a cheap COUNT(*) query first to reject queries that exceed
1302        // the limit without loading any row data.
1303        let count_sql = format!("SELECT COUNT(*) as cnt FROM logs l WHERE {where_clause}");
1304        let mut count_query = sqlx::query(&count_sql).bind(to_i64(from)).bind(to_i64(to));
1305        for param in &params {
1306            count_query = count_query.bind(*param);
1307        }
1308        let count_row = count_query.fetch_one(&self.pool).await.map_err(SqlColdError::from)?;
1309        let count = from_i64(count_row.get::<i64, _>(COL_CNT)) as usize;
1310        if count > max_logs {
1311            return Err(ColdStorageError::TooManyLogs { limit: max_logs });
1312        }
1313
1314        // Fetch the actual log data with JOINs, computing block_log_index
1315        // (absolute position within the block) as first_log_index + log_index.
1316        let data_sql = format!(
1317            "SELECT l.*, h.block_hash, h.timestamp AS block_timestamp, t.tx_hash, \
1318               (r.first_log_index + l.log_index) AS block_log_index \
1319             FROM logs l \
1320             JOIN headers h ON l.block_number = h.block_number \
1321             JOIN transactions t ON l.block_number = t.block_number \
1322               AND l.tx_index = t.tx_index \
1323             JOIN receipts r ON l.block_number = r.block_number \
1324               AND l.tx_index = r.tx_index \
1325             WHERE {where_clause} \
1326             ORDER BY l.block_number, l.tx_index, l.log_index"
1327        );
1328        let mut query = sqlx::query(&data_sql).bind(to_i64(from)).bind(to_i64(to));
1329        for param in &params {
1330            query = query.bind(*param);
1331        }
1332
1333        let rows = query.fetch_all(&self.pool).await.map_err(SqlColdError::from)?;
1334
1335        rows.into_iter()
1336            .map(|r| {
1337                let log = log_from_row(&r);
1338                Ok(RpcLog {
1339                    inner: log,
1340                    block_hash: Some(r.get(COL_BLOCK_HASH)),
1341                    block_number: Some(from_i64(r.get::<i64, _>(COL_BLOCK_NUMBER))),
1342                    block_timestamp: Some(from_i64(r.get::<i64, _>(COL_BLOCK_TIMESTAMP))),
1343                    transaction_hash: Some(r.get(COL_TX_HASH)),
1344                    transaction_index: Some(from_i64(r.get::<i64, _>(COL_TX_INDEX))),
1345                    log_index: Some(from_i64(r.get::<i64, _>(COL_BLOCK_LOG_INDEX))),
1346                    removed: false,
1347                })
1348            })
1349            .collect::<ColdResult<Vec<_>>>()
1350    }
1351
1352    async fn produce_log_stream(&self, filter: &Filter, params: signet_cold::StreamParams) {
1353        #[cfg(feature = "postgres")]
1354        if self.is_postgres {
1355            return self.produce_log_stream_pg(filter, params).await;
1356        }
1357        signet_cold::produce_log_stream_default(self, filter, params).await;
1358    }
1359
1360    async fn get_latest_block(&self) -> ColdResult<Option<BlockNumber>> {
1361        let row = sqlx::query("SELECT MAX(block_number) as max_bn FROM headers")
1362            .fetch_one(&self.pool)
1363            .await
1364            .map_err(SqlColdError::from)?;
1365        Ok(row.get::<Option<i64>, _>(COL_MAX_BN).map(from_i64))
1366    }
1367
1368    async fn append_block(&self, data: BlockData) -> ColdResult<()> {
1369        self.insert_block(data).await.map_err(ColdStorageError::from)
1370    }
1371
1372    async fn append_blocks(&self, data: Vec<BlockData>) -> ColdResult<()> {
1373        let mut tx = self.pool.begin().await.map_err(SqlColdError::from)?;
1374        for block_data in data {
1375            write_block_to_tx(&mut tx, block_data).await.map_err(ColdStorageError::from)?;
1376        }
1377        tx.commit().await.map_err(SqlColdError::from)?;
1378        Ok(())
1379    }
1380
1381    async fn truncate_above(&self, block: BlockNumber) -> ColdResult<()> {
1382        let bn = to_i64(block);
1383        let mut tx = self.pool.begin().await.map_err(SqlColdError::from)?;
1384
1385        // Delete rows from child tables first, then headers (preserves FK ordering).
1386        for table in
1387            ["logs", "transactions", "receipts", "signet_events", "zenith_headers", "headers"]
1388        {
1389            sqlx::query(&format!("DELETE FROM {table} WHERE block_number > $1"))
1390                .bind(bn)
1391                .execute(&mut *tx)
1392                .await
1393                .map_err(SqlColdError::from)?;
1394        }
1395
1396        tx.commit().await.map_err(SqlColdError::from)?;
1397        Ok(())
1398    }
1399}
1400
1401#[cfg(all(test, feature = "test-utils"))]
1402mod tests {
1403    use super::*;
1404    use signet_cold::conformance::conformance;
1405
1406    #[tokio::test]
1407    async fn sqlite_conformance() {
1408        let backend = SqlColdBackend::connect("sqlite::memory:").await.unwrap();
1409        conformance(backend).await.unwrap();
1410    }
1411
1412    #[tokio::test]
1413    async fn pg_conformance() {
1414        let Ok(url) = std::env::var("DATABASE_URL") else {
1415            eprintln!("skipping pg conformance: DATABASE_URL not set");
1416            return;
1417        };
1418        let backend = SqlColdBackend::connect(&url).await.unwrap();
1419        conformance(backend).await.unwrap();
1420    }
1421}