Skip to main content

signet_cold_sql/
backend.rs

1//! Unified SQL backend for cold storage.
2//!
3//! Supports both PostgreSQL and SQLite via [`sqlx::Any`]. The backend
4//! auto-detects the database type at construction time and runs the
5//! appropriate migration.
6
7use crate::SqlColdError;
8use crate::columns::{
9    COL_ACCESS_LIST, COL_ADDRESS, COL_AMOUNT, COL_AUTHORIZATION_LIST, COL_BASE_FEE_PER_GAS,
10    COL_BENEFICIARY, COL_BLOB_GAS_USED, COL_BLOB_VERSIONED_HASHES, COL_BLOCK_DATA_HASH,
11    COL_BLOCK_HASH, COL_BLOCK_LOG_INDEX, COL_BLOCK_NUMBER, COL_BLOCK_TIMESTAMP, COL_CHAIN_ID,
12    COL_CNT, COL_CUMULATIVE_GAS_USED, COL_DATA, COL_DIFFICULTY, COL_EVENT_TYPE,
13    COL_EXCESS_BLOB_GAS, COL_EXTRA_DATA, COL_FROM_ADDRESS, COL_GAS, COL_GAS_LIMIT, COL_GAS_PRICE,
14    COL_GAS_USED, COL_HOST_BLOCK_NUMBER, COL_INPUT, COL_LOG_COUNT, COL_LOGS_BLOOM, COL_MAX_BN,
15    COL_MAX_FEE_PER_BLOB_GAS, COL_MAX_FEE_PER_GAS, COL_MAX_PRIORITY_FEE_PER_GAS, COL_MIX_HASH,
16    COL_NONCE, COL_OMMERS_HASH, COL_ORDER_INDEX, COL_PARENT_BEACON_BLOCK_ROOT, COL_PARENT_HASH,
17    COL_PRIOR_GAS, COL_RECEIPTS_ROOT, COL_REQUESTS_HASH, COL_REWARD_ADDRESS, COL_ROLLUP_CHAIN_ID,
18    COL_ROLLUP_RECIPIENT, COL_SENDER, COL_SIG_R, COL_SIG_S, COL_SIG_Y_PARITY, COL_STATE_ROOT,
19    COL_SUCCESS, COL_TIMESTAMP, COL_TO_ADDRESS, COL_TOKEN, COL_TOPIC0, COL_TOPIC1, COL_TOPIC2,
20    COL_TOPIC3, COL_TRANSACTIONS_ROOT, COL_TX_HASH, COL_TX_INDEX, COL_TX_TYPE, COL_VALUE,
21    COL_WITHDRAWALS_ROOT,
22};
23use crate::convert::{
24    EVENT_ENTER, EVENT_ENTER_TOKEN, EVENT_TRANSACT, build_receipt, decode_access_list_or_empty,
25    decode_authorization_list, decode_b256_vec, decode_u128_required, decode_u256,
26    encode_access_list, encode_authorization_list, encode_b256_vec, encode_u128, encode_u256,
27    from_address, from_i64, to_address, to_i64,
28};
29use alloy::{
30    consensus::{
31        Header, Signed, TxEip1559, TxEip2930, TxEip4844, TxEip7702, TxLegacy, TxType,
32        transaction::Recovered,
33    },
34    primitives::{Address, B256, BlockNumber, Bloom, Bytes, Log, LogData, Sealable, Signature},
35};
36use signet_cold::{
37    BlockData, ColdReceipt, ColdResult, ColdStorage, ColdStorageError, Confirmed, Filter,
38    HeaderSpecifier, ReceiptSpecifier, RpcLog, SignetEventsSpecifier, TransactionSpecifier,
39    ZenithHeaderSpecifier,
40};
41use signet_storage_types::{
42    ConfirmationMeta, DbSignetEvent, DbZenithHeader, IndexedReceipt, RecoveredTx, SealedHeader,
43    TransactionSigned,
44};
45use signet_zenith::{
46    Passage::{Enter, EnterToken},
47    Transactor::Transact,
48    Zenith,
49};
50use sqlx::{AnyPool, Row};
51
52/// SQL-based cold storage backend.
53///
54/// Uses [`sqlx::Any`] for database-agnostic access, supporting both
55/// PostgreSQL and SQLite through a single implementation. The backend
56/// is determined by the connection URL at construction time.
57///
58/// # Example
59///
60/// ```no_run
61/// # async fn example() {
62/// use signet_cold_sql::SqlColdBackend;
63///
64/// // SQLite (in-memory)
65/// let backend = SqlColdBackend::connect("sqlite::memory:").await.unwrap();
66///
67/// // PostgreSQL
68/// let backend = SqlColdBackend::connect("postgres://localhost/signet").await.unwrap();
69/// # }
70/// ```
71#[derive(Debug, Clone)]
72pub struct SqlColdBackend {
73    pool: AnyPool,
74}
75
76impl SqlColdBackend {
77    /// Create a new SQL cold storage backend from an existing [`AnyPool`].
78    ///
79    /// Auto-detects the database backend and creates all tables if they
80    /// do not already exist. Callers must ensure
81    /// [`sqlx::any::install_default_drivers`] has been called before
82    /// constructing the pool.
83    pub async fn new(pool: AnyPool) -> Result<Self, SqlColdError> {
84        // Detect backend from a pooled connection.
85        let conn = pool.acquire().await?;
86        let backend = conn.backend_name().to_owned();
87        drop(conn);
88
89        let migration = match backend.as_str() {
90            "PostgreSQL" => include_str!("../migrations/001_initial_pg.sql"),
91            "SQLite" => include_str!("../migrations/001_initial.sql"),
92            other => {
93                return Err(SqlColdError::Convert(format!(
94                    "unsupported database backend: {other}"
95                )));
96            }
97        };
98        // Execute via pool to ensure the migration uses the same
99        // connection that subsequent queries will use.
100        sqlx::raw_sql(migration).execute(&pool).await?;
101        Ok(Self { pool })
102    }
103
104    /// Connect to a database URL and create the backend.
105    ///
106    /// Installs the default sqlx drivers on the first call. The database
107    /// type is inferred from the URL scheme (`sqlite:` or `postgres:`).
108    ///
109    /// For SQLite in-memory databases (`sqlite::memory:`), the pool is
110    /// limited to one connection to ensure all operations share the same
111    /// database.
112    pub async fn connect(url: &str) -> Result<Self, SqlColdError> {
113        sqlx::any::install_default_drivers();
114        let pool: AnyPool = sqlx::pool::PoolOptions::new().max_connections(1).connect(url).await?;
115        Self::new(pool).await
116    }
117
118    // ========================================================================
119    // Specifier resolution
120    // ========================================================================
121
122    async fn resolve_header_spec(
123        &self,
124        spec: HeaderSpecifier,
125    ) -> Result<Option<BlockNumber>, SqlColdError> {
126        match spec {
127            HeaderSpecifier::Number(n) => Ok(Some(n)),
128            HeaderSpecifier::Hash(hash) => {
129                let hash_bytes = hash.as_slice();
130                let row = sqlx::query("SELECT block_number FROM headers WHERE block_hash = $1")
131                    .bind(hash_bytes)
132                    .fetch_optional(&self.pool)
133                    .await?;
134                Ok(row.map(|r| from_i64(r.get::<i64, _>(COL_BLOCK_NUMBER))))
135            }
136        }
137    }
138
139    // ========================================================================
140    // Read helpers
141    // ========================================================================
142
143    async fn fetch_header_by_number(
144        &self,
145        block_num: BlockNumber,
146    ) -> Result<Option<SealedHeader>, SqlColdError> {
147        let bn = to_i64(block_num);
148        let row = sqlx::query("SELECT * FROM headers WHERE block_number = $1")
149            .bind(bn)
150            .fetch_optional(&self.pool)
151            .await?;
152
153        row.map(|r| header_from_row(&r).map(|h| h.seal_slow())).transpose()
154    }
155
156    // ========================================================================
157    // Write helpers
158    // ========================================================================
159
160    async fn insert_block(&self, data: BlockData) -> Result<(), SqlColdError> {
161        let mut tx = self.pool.begin().await?;
162        write_block_to_tx(&mut tx, data).await?;
163        tx.commit().await?;
164        Ok(())
165    }
166}
167
168// ============================================================================
169// Row → domain type conversion (read path)
170// ============================================================================
171
172/// Extract a required BLOB column from a row.
173fn blob(r: &sqlx::any::AnyRow, col: &str) -> Vec<u8> {
174    r.get(col)
175}
176
177/// Extract an optional BLOB column from a row.
178fn opt_blob(r: &sqlx::any::AnyRow, col: &str) -> Option<Vec<u8>> {
179    r.get(col)
180}
181
182/// Build a [`Header`] from an [`sqlx::any::AnyRow`].
183fn header_from_row(r: &sqlx::any::AnyRow) -> Result<Header, SqlColdError> {
184    Ok(Header {
185        parent_hash: B256::from_slice(&blob(r, COL_PARENT_HASH)),
186        ommers_hash: B256::from_slice(&blob(r, COL_OMMERS_HASH)),
187        beneficiary: Address::from_slice(&blob(r, COL_BENEFICIARY)),
188        state_root: B256::from_slice(&blob(r, COL_STATE_ROOT)),
189        transactions_root: B256::from_slice(&blob(r, COL_TRANSACTIONS_ROOT)),
190        receipts_root: B256::from_slice(&blob(r, COL_RECEIPTS_ROOT)),
191        logs_bloom: Bloom::from_slice(&blob(r, COL_LOGS_BLOOM)),
192        difficulty: decode_u256(&blob(r, COL_DIFFICULTY))?,
193        number: from_i64(r.get(COL_BLOCK_NUMBER)),
194        gas_limit: from_i64(r.get(COL_GAS_LIMIT)),
195        gas_used: from_i64(r.get(COL_GAS_USED)),
196        timestamp: from_i64(r.get(COL_TIMESTAMP)),
197        extra_data: Bytes::from(blob(r, COL_EXTRA_DATA)),
198        mix_hash: B256::from_slice(&blob(r, COL_MIX_HASH)),
199        nonce: alloy::primitives::B64::from_slice(&blob(r, COL_NONCE)),
200        base_fee_per_gas: r.get::<Option<i64>, _>(COL_BASE_FEE_PER_GAS).map(from_i64),
201        withdrawals_root: opt_blob(r, COL_WITHDRAWALS_ROOT).map(|b| B256::from_slice(&b)),
202        blob_gas_used: r.get::<Option<i64>, _>(COL_BLOB_GAS_USED).map(from_i64),
203        excess_blob_gas: r.get::<Option<i64>, _>(COL_EXCESS_BLOB_GAS).map(from_i64),
204        parent_beacon_block_root: opt_blob(r, COL_PARENT_BEACON_BLOCK_ROOT)
205            .map(|b| B256::from_slice(&b)),
206        requests_hash: opt_blob(r, COL_REQUESTS_HASH).map(|b| B256::from_slice(&b)),
207    })
208}
209
210/// Build a [`TransactionSigned`] from an [`sqlx::any::AnyRow`].
211fn tx_from_row(r: &sqlx::any::AnyRow) -> Result<TransactionSigned, SqlColdError> {
212    use alloy::consensus::EthereumTxEnvelope;
213
214    let sig = Signature::new(
215        decode_u256(&r.get::<Vec<u8>, _>(COL_SIG_R))?,
216        decode_u256(&r.get::<Vec<u8>, _>(COL_SIG_S))?,
217        r.get::<i32, _>(COL_SIG_Y_PARITY) != 0,
218    );
219
220    let tx_type_raw = r.get::<i32, _>(COL_TX_TYPE) as u8;
221    let tx_type = TxType::try_from(tx_type_raw)
222        .map_err(|_| SqlColdError::Convert(format!("invalid tx_type: {tx_type_raw}")))?;
223
224    let chain_id: Option<i64> = r.get(COL_CHAIN_ID);
225    let nonce = from_i64(r.get(COL_NONCE));
226    let gas_limit = from_i64(r.get(COL_GAS_LIMIT));
227    let to_addr = opt_blob(r, COL_TO_ADDRESS);
228    let value = decode_u256(&r.get::<Vec<u8>, _>(COL_VALUE))?;
229    let input = Bytes::from(r.get::<Vec<u8>, _>(COL_INPUT));
230
231    match tx_type {
232        TxType::Legacy => {
233            let tx = TxLegacy {
234                chain_id: chain_id.map(from_i64),
235                nonce,
236                gas_price: decode_u128_required(&opt_blob(r, COL_GAS_PRICE), COL_GAS_PRICE)?,
237                gas_limit,
238                to: from_address(to_addr.as_deref()),
239                value,
240                input,
241            };
242            Ok(EthereumTxEnvelope::Legacy(Signed::new_unhashed(tx, sig)))
243        }
244        TxType::Eip2930 => {
245            let tx = TxEip2930 {
246                chain_id: from_i64(
247                    chain_id
248                        .ok_or_else(|| SqlColdError::Convert("EIP2930 requires chain_id".into()))?,
249                ),
250                nonce,
251                gas_price: decode_u128_required(&opt_blob(r, COL_GAS_PRICE), COL_GAS_PRICE)?,
252                gas_limit,
253                to: from_address(to_addr.as_deref()),
254                value,
255                input,
256                access_list: decode_access_list_or_empty(&opt_blob(r, COL_ACCESS_LIST))?,
257            };
258            Ok(EthereumTxEnvelope::Eip2930(Signed::new_unhashed(tx, sig)))
259        }
260        TxType::Eip1559 => {
261            let tx = TxEip1559 {
262                chain_id: from_i64(
263                    chain_id
264                        .ok_or_else(|| SqlColdError::Convert("EIP1559 requires chain_id".into()))?,
265                ),
266                nonce,
267                gas_limit,
268                max_fee_per_gas: decode_u128_required(
269                    &opt_blob(r, COL_MAX_FEE_PER_GAS),
270                    COL_MAX_FEE_PER_GAS,
271                )?,
272                max_priority_fee_per_gas: decode_u128_required(
273                    &opt_blob(r, COL_MAX_PRIORITY_FEE_PER_GAS),
274                    COL_MAX_PRIORITY_FEE_PER_GAS,
275                )?,
276                to: from_address(to_addr.as_deref()),
277                value,
278                input,
279                access_list: decode_access_list_or_empty(&opt_blob(r, COL_ACCESS_LIST))?,
280            };
281            Ok(EthereumTxEnvelope::Eip1559(Signed::new_unhashed(tx, sig)))
282        }
283        TxType::Eip4844 => {
284            let tx =
285                TxEip4844 {
286                    chain_id: from_i64(chain_id.ok_or_else(|| {
287                        SqlColdError::Convert("EIP4844 requires chain_id".into())
288                    })?),
289                    nonce,
290                    gas_limit,
291                    max_fee_per_gas: decode_u128_required(
292                        &opt_blob(r, COL_MAX_FEE_PER_GAS),
293                        COL_MAX_FEE_PER_GAS,
294                    )?,
295                    max_priority_fee_per_gas: decode_u128_required(
296                        &opt_blob(r, COL_MAX_PRIORITY_FEE_PER_GAS),
297                        COL_MAX_PRIORITY_FEE_PER_GAS,
298                    )?,
299                    to: Address::from_slice(to_addr.as_deref().ok_or_else(|| {
300                        SqlColdError::Convert("EIP4844 requires to_address".into())
301                    })?),
302                    value,
303                    input,
304                    access_list: decode_access_list_or_empty(&opt_blob(r, COL_ACCESS_LIST))?,
305                    blob_versioned_hashes: decode_b256_vec(
306                        opt_blob(r, COL_BLOB_VERSIONED_HASHES).as_deref().ok_or_else(|| {
307                            SqlColdError::Convert("EIP4844 requires blob_versioned_hashes".into())
308                        })?,
309                    )?,
310                    max_fee_per_blob_gas: decode_u128_required(
311                        &opt_blob(r, COL_MAX_FEE_PER_BLOB_GAS),
312                        COL_MAX_FEE_PER_BLOB_GAS,
313                    )?,
314                };
315            Ok(EthereumTxEnvelope::Eip4844(Signed::new_unhashed(tx, sig)))
316        }
317        TxType::Eip7702 => {
318            let tx =
319                TxEip7702 {
320                    chain_id: from_i64(chain_id.ok_or_else(|| {
321                        SqlColdError::Convert("EIP7702 requires chain_id".into())
322                    })?),
323                    nonce,
324                    gas_limit,
325                    max_fee_per_gas: decode_u128_required(
326                        &opt_blob(r, COL_MAX_FEE_PER_GAS),
327                        COL_MAX_FEE_PER_GAS,
328                    )?,
329                    max_priority_fee_per_gas: decode_u128_required(
330                        &opt_blob(r, COL_MAX_PRIORITY_FEE_PER_GAS),
331                        COL_MAX_PRIORITY_FEE_PER_GAS,
332                    )?,
333                    to: Address::from_slice(to_addr.as_deref().ok_or_else(|| {
334                        SqlColdError::Convert("EIP7702 requires to_address".into())
335                    })?),
336                    value,
337                    input,
338                    access_list: decode_access_list_or_empty(&opt_blob(r, COL_ACCESS_LIST))?,
339                    authorization_list: decode_authorization_list(
340                        opt_blob(r, COL_AUTHORIZATION_LIST).as_deref().ok_or_else(|| {
341                            SqlColdError::Convert("EIP7702 requires authorization_list".into())
342                        })?,
343                    )?,
344                };
345            Ok(EthereumTxEnvelope::Eip7702(Signed::new_unhashed(tx, sig)))
346        }
347    }
348}
349
350/// Build a [`RecoveredTx`] from a row that includes `from_address`.
351fn recovered_tx_from_row(r: &sqlx::any::AnyRow) -> Result<RecoveredTx, SqlColdError> {
352    let sender = Address::from_slice(&r.get::<Vec<u8>, _>(COL_FROM_ADDRESS));
353    let tx = tx_from_row(r)?;
354    // SAFETY: the sender was recovered at append time and stored in from_address.
355    Ok(Recovered::new_unchecked(tx, sender))
356}
357
358/// Build a [`Log`] from an [`sqlx::any::AnyRow`].
359fn log_from_row(r: &sqlx::any::AnyRow) -> Log {
360    let topics = [COL_TOPIC0, COL_TOPIC1, COL_TOPIC2, COL_TOPIC3]
361        .into_iter()
362        .filter_map(|col| r.get::<Option<Vec<u8>>, _>(col))
363        .map(|t| B256::from_slice(&t))
364        .collect();
365    Log {
366        address: Address::from_slice(&r.get::<Vec<u8>, _>(COL_ADDRESS)),
367        data: LogData::new_unchecked(topics, Bytes::from(r.get::<Vec<u8>, _>(COL_DATA))),
368    }
369}
370
371/// Build a [`DbSignetEvent`] from an [`sqlx::any::AnyRow`].
372fn signet_event_from_row(r: &sqlx::any::AnyRow) -> Result<DbSignetEvent, SqlColdError> {
373    let event_type = r.get::<i32, _>(COL_EVENT_TYPE) as i16;
374    let order = from_i64(r.get(COL_ORDER_INDEX));
375    let rollup_chain_id = decode_u256(&r.get::<Vec<u8>, _>(COL_ROLLUP_CHAIN_ID))?;
376
377    match event_type {
378        EVENT_TRANSACT => {
379            let sender = Address::from_slice(
380                opt_blob(r, COL_SENDER)
381                    .as_deref()
382                    .ok_or_else(|| SqlColdError::Convert("Transact requires sender".into()))?,
383            );
384            let to = Address::from_slice(
385                opt_blob(r, COL_TO_ADDRESS)
386                    .as_deref()
387                    .ok_or_else(|| SqlColdError::Convert("Transact requires to".into()))?,
388            );
389            let value = decode_u256(
390                opt_blob(r, COL_VALUE)
391                    .as_deref()
392                    .ok_or_else(|| SqlColdError::Convert("Transact requires value".into()))?,
393            )?;
394            let gas = decode_u256(
395                opt_blob(r, COL_GAS)
396                    .as_deref()
397                    .ok_or_else(|| SqlColdError::Convert("Transact requires gas".into()))?,
398            )?;
399            let max_fee =
400                decode_u256(opt_blob(r, COL_MAX_FEE_PER_GAS).as_deref().ok_or_else(|| {
401                    SqlColdError::Convert("Transact requires max_fee_per_gas".into())
402                })?)?;
403            let data = Bytes::from(opt_blob(r, COL_DATA).unwrap_or_default());
404
405            Ok(DbSignetEvent::Transact(
406                order,
407                Transact {
408                    rollupChainId: rollup_chain_id,
409                    sender,
410                    to,
411                    value,
412                    gas,
413                    maxFeePerGas: max_fee,
414                    data,
415                },
416            ))
417        }
418        EVENT_ENTER => {
419            let recipient =
420                Address::from_slice(opt_blob(r, COL_ROLLUP_RECIPIENT).as_deref().ok_or_else(
421                    || SqlColdError::Convert("Enter requires rollup_recipient".into()),
422                )?);
423            let amount = decode_u256(
424                opt_blob(r, COL_AMOUNT)
425                    .as_deref()
426                    .ok_or_else(|| SqlColdError::Convert("Enter requires amount".into()))?,
427            )?;
428
429            Ok(DbSignetEvent::Enter(
430                order,
431                Enter { rollupChainId: rollup_chain_id, rollupRecipient: recipient, amount },
432            ))
433        }
434        EVENT_ENTER_TOKEN => {
435            let token = Address::from_slice(
436                opt_blob(r, COL_TOKEN)
437                    .as_deref()
438                    .ok_or_else(|| SqlColdError::Convert("EnterToken requires token".into()))?,
439            );
440            let recipient =
441                Address::from_slice(opt_blob(r, COL_ROLLUP_RECIPIENT).as_deref().ok_or_else(
442                    || SqlColdError::Convert("EnterToken requires rollup_recipient".into()),
443                )?);
444            let amount = decode_u256(
445                opt_blob(r, COL_AMOUNT)
446                    .as_deref()
447                    .ok_or_else(|| SqlColdError::Convert("EnterToken requires amount".into()))?,
448            )?;
449
450            Ok(DbSignetEvent::EnterToken(
451                order,
452                EnterToken {
453                    rollupChainId: rollup_chain_id,
454                    token,
455                    rollupRecipient: recipient,
456                    amount,
457                },
458            ))
459        }
460        _ => Err(SqlColdError::Convert(format!("invalid event_type: {event_type}"))),
461    }
462}
463
464/// Build a [`DbZenithHeader`] from an [`sqlx::any::AnyRow`].
465fn zenith_header_from_row(r: &sqlx::any::AnyRow) -> Result<DbZenithHeader, SqlColdError> {
466    Ok(DbZenithHeader(Zenith::BlockHeader {
467        hostBlockNumber: decode_u256(&blob(r, COL_HOST_BLOCK_NUMBER))?,
468        rollupChainId: decode_u256(&blob(r, COL_ROLLUP_CHAIN_ID))?,
469        gasLimit: decode_u256(&blob(r, COL_GAS_LIMIT))?,
470        rewardAddress: Address::from_slice(&blob(r, COL_REWARD_ADDRESS)),
471        blockDataHash: alloy::primitives::FixedBytes::<32>::from_slice(&blob(
472            r,
473            COL_BLOCK_DATA_HASH,
474        )),
475    }))
476}
477
478// ============================================================================
479// Domain type → SQL INSERT (write path)
480// ============================================================================
481
482/// Write a single block's data into an open SQL transaction.
483async fn write_block_to_tx(
484    tx: &mut sqlx::Transaction<'_, sqlx::Any>,
485    data: BlockData,
486) -> Result<(), SqlColdError> {
487    let bn = to_i64(data.block_number());
488
489    // Insert header
490    let block_hash = data.header.hash_slow();
491    let difficulty = encode_u256(&data.header.difficulty);
492    sqlx::query(
493        "INSERT INTO headers (
494            block_number, block_hash, parent_hash, ommers_hash, beneficiary,
495            state_root, transactions_root, receipts_root, logs_bloom, difficulty,
496            gas_limit, gas_used, timestamp, extra_data, mix_hash, nonce,
497            base_fee_per_gas, withdrawals_root, blob_gas_used, excess_blob_gas,
498            parent_beacon_block_root, requests_hash
499        ) VALUES (
500            $1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
501            $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22
502        )",
503    )
504    .bind(bn)
505    .bind(block_hash.as_slice())
506    .bind(data.header.parent_hash.as_slice())
507    .bind(data.header.ommers_hash.as_slice())
508    .bind(data.header.beneficiary.as_slice())
509    .bind(data.header.state_root.as_slice())
510    .bind(data.header.transactions_root.as_slice())
511    .bind(data.header.receipts_root.as_slice())
512    .bind(data.header.logs_bloom.as_slice())
513    .bind(difficulty.as_slice())
514    .bind(to_i64(data.header.gas_limit))
515    .bind(to_i64(data.header.gas_used))
516    .bind(to_i64(data.header.timestamp))
517    .bind(data.header.extra_data.as_ref())
518    .bind(data.header.mix_hash.as_slice())
519    .bind(data.header.nonce.as_slice())
520    .bind(data.header.base_fee_per_gas.map(to_i64))
521    .bind(data.header.withdrawals_root.as_ref().map(|r| r.as_slice()))
522    .bind(data.header.blob_gas_used.map(to_i64))
523    .bind(data.header.excess_blob_gas.map(to_i64))
524    .bind(data.header.parent_beacon_block_root.as_ref().map(|r| r.as_slice()))
525    .bind(data.header.requests_hash.as_ref().map(|r| r.as_slice()))
526    .execute(&mut **tx)
527    .await?;
528
529    // Insert transactions
530    for (idx, recovered_tx) in data.transactions.iter().enumerate() {
531        insert_transaction(tx, bn, to_i64(idx as u64), recovered_tx).await?;
532    }
533
534    // Insert receipts and logs
535    for (idx, receipt) in data.receipts.iter().enumerate() {
536        let tx_idx = to_i64(idx as u64);
537        sqlx::query(
538            "INSERT INTO receipts (block_number, tx_index, tx_type, success, cumulative_gas_used)
539             VALUES ($1, $2, $3, $4, $5)",
540        )
541        .bind(bn)
542        .bind(tx_idx)
543        .bind(receipt.tx_type as i32)
544        .bind(receipt.inner.status.coerce_status() as i32)
545        .bind(to_i64(receipt.inner.cumulative_gas_used))
546        .execute(&mut **tx)
547        .await?;
548
549        for (log_idx, log) in receipt.inner.logs.iter().enumerate() {
550            let topics = log.topics();
551            sqlx::query(
552                "INSERT INTO logs (block_number, tx_index, log_index, address, topic0, topic1, topic2, topic3, data)
553                 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
554            )
555            .bind(bn)
556            .bind(tx_idx)
557            .bind(to_i64(log_idx as u64))
558            .bind(log.address.as_slice())
559            .bind(topics.first().map(|t| t.as_slice()))
560            .bind(topics.get(1).map(|t| t.as_slice()))
561            .bind(topics.get(2).map(|t| t.as_slice()))
562            .bind(topics.get(3).map(|t| t.as_slice()))
563            .bind(log.data.data.as_ref())
564            .execute(&mut **tx)
565            .await?;
566        }
567    }
568
569    // Insert signet events
570    for (idx, event) in data.signet_events.iter().enumerate() {
571        insert_signet_event(tx, bn, to_i64(idx as u64), event).await?;
572    }
573
574    // Insert zenith header
575    if let Some(zh) = &data.zenith_header {
576        let h = &zh.0;
577        let host_bn = encode_u256(&h.hostBlockNumber);
578        let chain_id = encode_u256(&h.rollupChainId);
579        let gas_limit = encode_u256(&h.gasLimit);
580        sqlx::query(
581            "INSERT INTO zenith_headers (
582                block_number, host_block_number, rollup_chain_id,
583                gas_limit, reward_address, block_data_hash
584            ) VALUES ($1, $2, $3, $4, $5, $6)",
585        )
586        .bind(bn)
587        .bind(host_bn.as_slice())
588        .bind(chain_id.as_slice())
589        .bind(gas_limit.as_slice())
590        .bind(h.rewardAddress.as_slice())
591        .bind(h.blockDataHash.as_slice())
592        .execute(&mut **tx)
593        .await?;
594    }
595
596    Ok(())
597}
598
599/// Insert a transaction, binding directly from the source type.
600async fn insert_transaction(
601    conn: &mut sqlx::AnyConnection,
602    bn: i64,
603    tx_index: i64,
604    recovered: &RecoveredTx,
605) -> Result<(), SqlColdError> {
606    use alloy::consensus::EthereumTxEnvelope;
607
608    let sender = recovered.signer();
609    let tx: &TransactionSigned = recovered;
610    let tx_hash = tx.tx_hash();
611    let tx_type = tx.tx_type() as i32;
612
613    macro_rules! sig {
614        ($s:expr) => {{
615            let sig = $s.signature();
616            (sig.v() as i32, encode_u256(&sig.r()), encode_u256(&sig.s()))
617        }};
618    }
619    let (sig_y, sig_r, sig_s) = match tx {
620        EthereumTxEnvelope::Legacy(s) => sig!(s),
621        EthereumTxEnvelope::Eip2930(s) => sig!(s),
622        EthereumTxEnvelope::Eip1559(s) => sig!(s),
623        EthereumTxEnvelope::Eip4844(s) => sig!(s),
624        EthereumTxEnvelope::Eip7702(s) => sig!(s),
625    };
626
627    let (chain_id, nonce, gas_limit) = match tx {
628        EthereumTxEnvelope::Legacy(s) => {
629            (s.tx().chain_id.map(to_i64), to_i64(s.tx().nonce), to_i64(s.tx().gas_limit))
630        }
631        EthereumTxEnvelope::Eip2930(s) => {
632            (Some(to_i64(s.tx().chain_id)), to_i64(s.tx().nonce), to_i64(s.tx().gas_limit))
633        }
634        EthereumTxEnvelope::Eip1559(s) => {
635            (Some(to_i64(s.tx().chain_id)), to_i64(s.tx().nonce), to_i64(s.tx().gas_limit))
636        }
637        EthereumTxEnvelope::Eip4844(s) => {
638            (Some(to_i64(s.tx().chain_id)), to_i64(s.tx().nonce), to_i64(s.tx().gas_limit))
639        }
640        EthereumTxEnvelope::Eip7702(s) => {
641            (Some(to_i64(s.tx().chain_id)), to_i64(s.tx().nonce), to_i64(s.tx().gas_limit))
642        }
643    };
644
645    let (value, to_addr) = match tx {
646        EthereumTxEnvelope::Legacy(s) => (encode_u256(&s.tx().value), to_address(&s.tx().to)),
647        EthereumTxEnvelope::Eip2930(s) => (encode_u256(&s.tx().value), to_address(&s.tx().to)),
648        EthereumTxEnvelope::Eip1559(s) => (encode_u256(&s.tx().value), to_address(&s.tx().to)),
649        EthereumTxEnvelope::Eip4844(s) => {
650            (encode_u256(&s.tx().value), Some(s.tx().to.as_slice().to_vec()))
651        }
652        EthereumTxEnvelope::Eip7702(s) => {
653            (encode_u256(&s.tx().value), Some(s.tx().to.as_slice().to_vec()))
654        }
655    };
656
657    let input: &[u8] = match tx {
658        EthereumTxEnvelope::Legacy(s) => s.tx().input.as_ref(),
659        EthereumTxEnvelope::Eip2930(s) => s.tx().input.as_ref(),
660        EthereumTxEnvelope::Eip1559(s) => s.tx().input.as_ref(),
661        EthereumTxEnvelope::Eip4844(s) => s.tx().input.as_ref(),
662        EthereumTxEnvelope::Eip7702(s) => s.tx().input.as_ref(),
663    };
664
665    let (gas_price, max_fee, max_priority_fee, max_blob_fee) = match tx {
666        EthereumTxEnvelope::Legacy(s) => (Some(encode_u128(s.tx().gas_price)), None, None, None),
667        EthereumTxEnvelope::Eip2930(s) => (Some(encode_u128(s.tx().gas_price)), None, None, None),
668        EthereumTxEnvelope::Eip1559(s) => (
669            None,
670            Some(encode_u128(s.tx().max_fee_per_gas)),
671            Some(encode_u128(s.tx().max_priority_fee_per_gas)),
672            None,
673        ),
674        EthereumTxEnvelope::Eip4844(s) => (
675            None,
676            Some(encode_u128(s.tx().max_fee_per_gas)),
677            Some(encode_u128(s.tx().max_priority_fee_per_gas)),
678            Some(encode_u128(s.tx().max_fee_per_blob_gas)),
679        ),
680        EthereumTxEnvelope::Eip7702(s) => (
681            None,
682            Some(encode_u128(s.tx().max_fee_per_gas)),
683            Some(encode_u128(s.tx().max_priority_fee_per_gas)),
684            None,
685        ),
686    };
687
688    let (access_list, blob_hashes, auth_list) = match tx {
689        EthereumTxEnvelope::Legacy(_) => (None, None, None),
690        EthereumTxEnvelope::Eip2930(s) => {
691            (Some(encode_access_list(&s.tx().access_list)), None, None)
692        }
693        EthereumTxEnvelope::Eip1559(s) => {
694            (Some(encode_access_list(&s.tx().access_list)), None, None)
695        }
696        EthereumTxEnvelope::Eip4844(s) => (
697            Some(encode_access_list(&s.tx().access_list)),
698            Some(encode_b256_vec(&s.tx().blob_versioned_hashes)),
699            None,
700        ),
701        EthereumTxEnvelope::Eip7702(s) => (
702            Some(encode_access_list(&s.tx().access_list)),
703            None,
704            Some(encode_authorization_list(&s.tx().authorization_list)),
705        ),
706    };
707
708    sqlx::query(
709        "INSERT INTO transactions (
710            block_number, tx_index, tx_hash, tx_type,
711            sig_y_parity, sig_r, sig_s,
712            chain_id, nonce, gas_limit, to_address, value, input,
713            gas_price, max_fee_per_gas, max_priority_fee_per_gas,
714            max_fee_per_blob_gas, blob_versioned_hashes,
715            access_list, authorization_list, from_address
716        ) VALUES (
717            $1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
718            $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21
719        )",
720    )
721    .bind(bn)
722    .bind(tx_index)
723    .bind(tx_hash.as_slice())
724    .bind(tx_type)
725    .bind(sig_y)
726    .bind(sig_r.as_slice())
727    .bind(sig_s.as_slice())
728    .bind(chain_id)
729    .bind(nonce)
730    .bind(gas_limit)
731    .bind(to_addr.as_deref())
732    .bind(value.as_slice())
733    .bind(input)
734    .bind(gas_price.as_ref().map(|v| v.as_slice()))
735    .bind(max_fee.as_ref().map(|v| v.as_slice()))
736    .bind(max_priority_fee.as_ref().map(|v| v.as_slice()))
737    .bind(max_blob_fee.as_ref().map(|v| v.as_slice()))
738    .bind(blob_hashes.as_deref())
739    .bind(access_list.as_deref())
740    .bind(auth_list.as_deref())
741    .bind(sender.as_slice())
742    .execute(&mut *conn)
743    .await?;
744
745    Ok(())
746}
747
748/// Insert a signet event, binding directly from the source type.
749async fn insert_signet_event(
750    conn: &mut sqlx::AnyConnection,
751    block_number: i64,
752    event_index: i64,
753    event: &DbSignetEvent,
754) -> Result<(), SqlColdError> {
755    let (event_type, order, chain_id) = match event {
756        DbSignetEvent::Transact(o, t) => (0i32, to_i64(*o), encode_u256(&t.rollupChainId)),
757        DbSignetEvent::Enter(o, e) => (1i32, to_i64(*o), encode_u256(&e.rollupChainId)),
758        DbSignetEvent::EnterToken(o, e) => (2i32, to_i64(*o), encode_u256(&e.rollupChainId)),
759    };
760
761    let (value, gas, max_fee, amount) = match event {
762        DbSignetEvent::Transact(_, t) => (
763            Some(encode_u256(&t.value)),
764            Some(encode_u256(&t.gas)),
765            Some(encode_u256(&t.maxFeePerGas)),
766            None,
767        ),
768        DbSignetEvent::Enter(_, e) => (None, None, None, Some(encode_u256(&e.amount))),
769        DbSignetEvent::EnterToken(_, e) => (None, None, None, Some(encode_u256(&e.amount))),
770    };
771
772    sqlx::query(
773        "INSERT INTO signet_events (
774            block_number, event_index, event_type, order_index,
775            rollup_chain_id, sender, to_address, value, gas,
776            max_fee_per_gas, data, rollup_recipient, amount, token
777        ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)",
778    )
779    .bind(block_number)
780    .bind(event_index)
781    .bind(event_type)
782    .bind(order)
783    .bind(chain_id.as_slice())
784    .bind(match event {
785        DbSignetEvent::Transact(_, t) => Some(t.sender.as_slice()),
786        _ => None,
787    })
788    .bind(match event {
789        DbSignetEvent::Transact(_, t) => Some(t.to.as_slice()),
790        _ => None,
791    })
792    .bind(value.as_ref().map(|v| v.as_slice()))
793    .bind(gas.as_ref().map(|v| v.as_slice()))
794    .bind(max_fee.as_ref().map(|v| v.as_slice()))
795    .bind(match event {
796        DbSignetEvent::Transact(_, t) => Some(t.data.as_ref()),
797        _ => None,
798    })
799    .bind(match event {
800        DbSignetEvent::Enter(_, e) => Some(e.rollupRecipient.as_slice()),
801        DbSignetEvent::EnterToken(_, e) => Some(e.rollupRecipient.as_slice()),
802        _ => None,
803    })
804    .bind(amount.as_ref().map(|v| v.as_slice()))
805    .bind(match event {
806        DbSignetEvent::EnterToken(_, e) => Some(e.token.as_slice()),
807        _ => None,
808    })
809    .execute(&mut *conn)
810    .await?;
811
812    Ok(())
813}
814
815// ============================================================================
816// ColdStorage implementation
817// ============================================================================
818
819impl ColdStorage for SqlColdBackend {
820    async fn get_header(&self, spec: HeaderSpecifier) -> ColdResult<Option<SealedHeader>> {
821        let Some(block_num) = self.resolve_header_spec(spec).await? else {
822            return Ok(None);
823        };
824        self.fetch_header_by_number(block_num).await.map_err(ColdStorageError::from)
825    }
826
827    async fn get_headers(
828        &self,
829        specs: Vec<HeaderSpecifier>,
830    ) -> ColdResult<Vec<Option<SealedHeader>>> {
831        let mut results = Vec::with_capacity(specs.len());
832        for spec in specs {
833            let header = self.get_header(spec).await?;
834            results.push(header);
835        }
836        Ok(results)
837    }
838
839    async fn get_transaction(
840        &self,
841        spec: TransactionSpecifier,
842    ) -> ColdResult<Option<Confirmed<RecoveredTx>>> {
843        let row = match spec {
844            TransactionSpecifier::Hash(hash) => sqlx::query(
845                "SELECT t.*, h.block_hash
846                     FROM transactions t
847                     JOIN headers h ON t.block_number = h.block_number
848                     WHERE t.tx_hash = $1",
849            )
850            .bind(hash.as_slice())
851            .fetch_optional(&self.pool)
852            .await
853            .map_err(SqlColdError::from)?,
854            TransactionSpecifier::BlockAndIndex { block, index } => sqlx::query(
855                "SELECT t.*, h.block_hash
856                     FROM transactions t
857                     JOIN headers h ON t.block_number = h.block_number
858                     WHERE t.block_number = $1 AND t.tx_index = $2",
859            )
860            .bind(to_i64(block))
861            .bind(to_i64(index))
862            .fetch_optional(&self.pool)
863            .await
864            .map_err(SqlColdError::from)?,
865            TransactionSpecifier::BlockHashAndIndex { block_hash, index } => sqlx::query(
866                "SELECT t.*, h.block_hash
867                     FROM transactions t
868                     JOIN headers h ON t.block_number = h.block_number
869                     WHERE h.block_hash = $1 AND t.tx_index = $2",
870            )
871            .bind(block_hash.as_slice())
872            .bind(to_i64(index))
873            .fetch_optional(&self.pool)
874            .await
875            .map_err(SqlColdError::from)?,
876        };
877
878        let Some(r) = row else {
879            return Ok(None);
880        };
881
882        let block = from_i64(r.get::<i64, _>(COL_BLOCK_NUMBER));
883        let index = from_i64(r.get::<i64, _>(COL_TX_INDEX));
884        let hash_bytes: Vec<u8> = r.get(COL_BLOCK_HASH);
885        let block_hash = B256::from_slice(&hash_bytes);
886        let recovered = recovered_tx_from_row(&r).map_err(ColdStorageError::from)?;
887        let meta = ConfirmationMeta::new(block, block_hash, index);
888        Ok(Some(Confirmed::new(recovered, meta)))
889    }
890
891    async fn get_transactions_in_block(&self, block: BlockNumber) -> ColdResult<Vec<RecoveredTx>> {
892        let bn = to_i64(block);
893        let rows =
894            sqlx::query("SELECT * FROM transactions WHERE block_number = $1 ORDER BY tx_index")
895                .bind(bn)
896                .fetch_all(&self.pool)
897                .await
898                .map_err(SqlColdError::from)?;
899
900        rows.iter().map(|r| recovered_tx_from_row(r).map_err(ColdStorageError::from)).collect()
901    }
902
903    async fn get_transaction_count(&self, block: BlockNumber) -> ColdResult<u64> {
904        let bn = to_i64(block);
905        let row = sqlx::query("SELECT COUNT(*) as cnt FROM transactions WHERE block_number = $1")
906            .bind(bn)
907            .fetch_one(&self.pool)
908            .await
909            .map_err(SqlColdError::from)?;
910
911        Ok(from_i64(row.get::<i64, _>(COL_CNT)))
912    }
913
914    async fn get_receipt(&self, spec: ReceiptSpecifier) -> ColdResult<Option<ColdReceipt>> {
915        // Resolve to (block, index)
916        let (block, index) = match spec {
917            ReceiptSpecifier::TxHash(hash) => {
918                let row = sqlx::query(
919                    "SELECT block_number, tx_index FROM transactions WHERE tx_hash = $1",
920                )
921                .bind(hash.as_slice())
922                .fetch_optional(&self.pool)
923                .await
924                .map_err(SqlColdError::from)?;
925                let Some(r) = row else { return Ok(None) };
926                (
927                    from_i64(r.get::<i64, _>(COL_BLOCK_NUMBER)),
928                    from_i64(r.get::<i64, _>(COL_TX_INDEX)),
929                )
930            }
931            ReceiptSpecifier::BlockAndIndex { block, index } => (block, index),
932        };
933
934        let Some(header) = self.fetch_header_by_number(block).await? else {
935            return Ok(None);
936        };
937
938        // Fetch receipt + tx_hash + from_address
939        let receipt_row = sqlx::query(
940            "SELECT r.*, t.tx_hash, t.from_address
941             FROM receipts r
942             JOIN transactions t ON r.block_number = t.block_number AND r.tx_index = t.tx_index
943             WHERE r.block_number = $1 AND r.tx_index = $2",
944        )
945        .bind(to_i64(block))
946        .bind(to_i64(index))
947        .fetch_optional(&self.pool)
948        .await
949        .map_err(SqlColdError::from)?;
950
951        let Some(rr) = receipt_row else {
952            return Ok(None);
953        };
954
955        let bn: i64 = rr.get(COL_BLOCK_NUMBER);
956        let tx_idx: i64 = rr.get(COL_TX_INDEX);
957        let tx_hash = B256::from_slice(&rr.get::<Vec<u8>, _>(COL_TX_HASH));
958        let sender = Address::from_slice(&rr.get::<Vec<u8>, _>(COL_FROM_ADDRESS));
959        let tx_type = rr.get::<i32, _>(COL_TX_TYPE) as i16;
960        let success = rr.get::<i32, _>(COL_SUCCESS) != 0;
961        let cumulative_gas_used: i64 = rr.get(COL_CUMULATIVE_GAS_USED);
962
963        let log_rows = sqlx::query(
964            "SELECT * FROM logs WHERE block_number = $1 AND tx_index = $2 ORDER BY log_index",
965        )
966        .bind(bn)
967        .bind(tx_idx)
968        .fetch_all(&self.pool)
969        .await
970        .map_err(SqlColdError::from)?;
971
972        let logs = log_rows.iter().map(log_from_row).collect();
973        let built = build_receipt(tx_type, success, cumulative_gas_used, logs)
974            .map_err(ColdStorageError::from)?;
975
976        // Compute gas_used and first_log_index by querying prior receipts
977        let prior = sqlx::query(
978            "SELECT CAST(SUM(
979                (SELECT COUNT(*) FROM logs l WHERE l.block_number = $1 AND l.tx_index = r.tx_index)
980             ) AS bigint) as log_count,
981             CAST(MAX(r.cumulative_gas_used) AS bigint) as prior_gas
982             FROM receipts r WHERE r.block_number = $1 AND r.tx_index < $2",
983        )
984        .bind(to_i64(block))
985        .bind(to_i64(index))
986        .fetch_one(&self.pool)
987        .await
988        .map_err(SqlColdError::from)?;
989
990        let first_log_index: u64 = prior.get::<Option<i64>, _>(COL_LOG_COUNT).unwrap_or(0) as u64;
991        let prior_cumulative_gas: u64 =
992            prior.get::<Option<i64>, _>(COL_PRIOR_GAS).unwrap_or(0) as u64;
993        let gas_used = built.inner.cumulative_gas_used - prior_cumulative_gas;
994
995        let ir = IndexedReceipt { receipt: built, tx_hash, first_log_index, gas_used, sender };
996        Ok(Some(ColdReceipt::new(ir, &header, index)))
997    }
998
999    async fn get_receipts_in_block(&self, block: BlockNumber) -> ColdResult<Vec<ColdReceipt>> {
1000        let Some(header) =
1001            self.fetch_header_by_number(block).await.map_err(ColdStorageError::from)?
1002        else {
1003            return Ok(Vec::new());
1004        };
1005
1006        let bn = to_i64(block);
1007
1008        // Fetch receipts joined with tx_hash and from_address
1009        let receipt_rows = sqlx::query(
1010            "SELECT r.*, t.tx_hash, t.from_address
1011             FROM receipts r
1012             JOIN transactions t ON r.block_number = t.block_number AND r.tx_index = t.tx_index
1013             WHERE r.block_number = $1
1014             ORDER BY r.tx_index",
1015        )
1016        .bind(bn)
1017        .fetch_all(&self.pool)
1018        .await
1019        .map_err(SqlColdError::from)?;
1020
1021        let all_log_rows =
1022            sqlx::query("SELECT * FROM logs WHERE block_number = $1 ORDER BY tx_index, log_index")
1023                .bind(bn)
1024                .fetch_all(&self.pool)
1025                .await
1026                .map_err(SqlColdError::from)?;
1027
1028        // Group logs by tx_index
1029        let mut logs_by_tx: std::collections::BTreeMap<i64, Vec<Log>> =
1030            std::collections::BTreeMap::new();
1031        for r in &all_log_rows {
1032            let tx_idx: i64 = r.get(COL_TX_INDEX);
1033            logs_by_tx.entry(tx_idx).or_default().push(log_from_row(r));
1034        }
1035
1036        let mut first_log_index = 0u64;
1037        let mut prior_cumulative_gas = 0u64;
1038        receipt_rows
1039            .into_iter()
1040            .enumerate()
1041            .map(|(idx, rr)| {
1042                let tx_idx: i64 = rr.get(COL_TX_INDEX);
1043                let tx_hash = B256::from_slice(&rr.get::<Vec<u8>, _>(COL_TX_HASH));
1044                let sender = Address::from_slice(&rr.get::<Vec<u8>, _>(COL_FROM_ADDRESS));
1045                let tx_type = rr.get::<i32, _>(COL_TX_TYPE) as i16;
1046                let success = rr.get::<i32, _>(COL_SUCCESS) != 0;
1047                let cumulative_gas_used: i64 = rr.get(COL_CUMULATIVE_GAS_USED);
1048                let logs = logs_by_tx.remove(&tx_idx).unwrap_or_default();
1049                let receipt = build_receipt(tx_type, success, cumulative_gas_used, logs)
1050                    .map_err(ColdStorageError::from)?;
1051                let gas_used = receipt.inner.cumulative_gas_used - prior_cumulative_gas;
1052                prior_cumulative_gas = receipt.inner.cumulative_gas_used;
1053                let ir = IndexedReceipt { receipt, tx_hash, first_log_index, gas_used, sender };
1054                first_log_index += ir.receipt.inner.logs.len() as u64;
1055                Ok(ColdReceipt::new(ir, &header, idx as u64))
1056            })
1057            .collect()
1058    }
1059
1060    async fn get_signet_events(
1061        &self,
1062        spec: SignetEventsSpecifier,
1063    ) -> ColdResult<Vec<DbSignetEvent>> {
1064        let rows = match spec {
1065            SignetEventsSpecifier::Block(block) => {
1066                let bn = to_i64(block);
1067                sqlx::query(
1068                    "SELECT * FROM signet_events WHERE block_number = $1 ORDER BY event_index",
1069                )
1070                .bind(bn)
1071                .fetch_all(&self.pool)
1072                .await
1073                .map_err(SqlColdError::from)?
1074            }
1075            SignetEventsSpecifier::BlockRange { start, end } => {
1076                let s = to_i64(start);
1077                let e = to_i64(end);
1078                sqlx::query(
1079                    "SELECT * FROM signet_events WHERE block_number >= $1 AND block_number <= $2
1080                     ORDER BY block_number, event_index",
1081                )
1082                .bind(s)
1083                .bind(e)
1084                .fetch_all(&self.pool)
1085                .await
1086                .map_err(SqlColdError::from)?
1087            }
1088        };
1089
1090        rows.iter().map(|r| signet_event_from_row(r).map_err(ColdStorageError::from)).collect()
1091    }
1092
1093    async fn get_zenith_header(
1094        &self,
1095        spec: ZenithHeaderSpecifier,
1096    ) -> ColdResult<Option<DbZenithHeader>> {
1097        let block = match spec {
1098            ZenithHeaderSpecifier::Number(n) => n,
1099            ZenithHeaderSpecifier::Range { start, .. } => start,
1100        };
1101        let bn = to_i64(block);
1102        let row = sqlx::query("SELECT * FROM zenith_headers WHERE block_number = $1")
1103            .bind(bn)
1104            .fetch_optional(&self.pool)
1105            .await
1106            .map_err(SqlColdError::from)?;
1107
1108        row.map(|r| zenith_header_from_row(&r)).transpose().map_err(ColdStorageError::from)
1109    }
1110
1111    async fn get_zenith_headers(
1112        &self,
1113        spec: ZenithHeaderSpecifier,
1114    ) -> ColdResult<Vec<DbZenithHeader>> {
1115        let rows = match spec {
1116            ZenithHeaderSpecifier::Number(n) => {
1117                let bn = to_i64(n);
1118                sqlx::query("SELECT * FROM zenith_headers WHERE block_number = $1")
1119                    .bind(bn)
1120                    .fetch_all(&self.pool)
1121                    .await
1122                    .map_err(SqlColdError::from)?
1123            }
1124            ZenithHeaderSpecifier::Range { start, end } => {
1125                let s = to_i64(start);
1126                let e = to_i64(end);
1127                sqlx::query(
1128                    "SELECT * FROM zenith_headers WHERE block_number >= $1 AND block_number <= $2
1129                     ORDER BY block_number",
1130                )
1131                .bind(s)
1132                .bind(e)
1133                .fetch_all(&self.pool)
1134                .await
1135                .map_err(SqlColdError::from)?
1136            }
1137        };
1138
1139        rows.iter().map(|r| zenith_header_from_row(r).map_err(ColdStorageError::from)).collect()
1140    }
1141
1142    async fn get_logs(&self, filter: Filter, max_logs: usize) -> ColdResult<Vec<RpcLog>> {
1143        let from = filter.get_from_block().unwrap_or(0);
1144        let to = filter.get_to_block().unwrap_or(u64::MAX);
1145
1146        // Build the shared WHERE clause for both count and data queries.
1147        let mut where_clause = String::from("l.block_number >= $1 AND l.block_number <= $2");
1148        let mut params: Vec<Vec<u8>> = Vec::new();
1149        let mut idx = 3u32;
1150
1151        // Address filter
1152        if !filter.address.is_empty() {
1153            let addrs: Vec<_> = filter.address.iter().collect();
1154            if addrs.len() == 1 {
1155                where_clause.push_str(&format!(" AND l.address = ${idx}"));
1156                params.push(addrs[0].as_slice().to_vec());
1157                idx += 1;
1158            } else {
1159                let placeholders: String = addrs
1160                    .iter()
1161                    .enumerate()
1162                    .map(|(i, _)| format!("${}", idx + i as u32))
1163                    .collect::<Vec<_>>()
1164                    .join(", ");
1165                where_clause.push_str(&format!(" AND l.address IN ({placeholders})"));
1166                for addr in &addrs {
1167                    params.push(addr.as_slice().to_vec());
1168                }
1169                idx += addrs.len() as u32;
1170            }
1171        }
1172
1173        // Topic filters
1174        let topic_cols = ["l.topic0", "l.topic1", "l.topic2", "l.topic3"];
1175        for (i, topic_filter) in filter.topics.iter().enumerate() {
1176            if topic_filter.is_empty() {
1177                continue;
1178            }
1179            let values: Vec<_> = topic_filter.iter().collect();
1180            if values.len() == 1 {
1181                where_clause.push_str(&format!(" AND {} = ${idx}", topic_cols[i]));
1182                params.push(values[0].as_slice().to_vec());
1183                idx += 1;
1184            } else {
1185                let placeholders: String = values
1186                    .iter()
1187                    .enumerate()
1188                    .map(|(j, _)| format!("${}", idx + j as u32))
1189                    .collect::<Vec<_>>()
1190                    .join(", ");
1191                where_clause.push_str(&format!(" AND {} IN ({placeholders})", topic_cols[i]));
1192                for v in &values {
1193                    params.push(v.as_slice().to_vec());
1194                }
1195                idx += values.len() as u32;
1196            }
1197        }
1198
1199        // Run a cheap COUNT(*) query first to reject queries that exceed
1200        // the limit without loading any row data.
1201        let count_sql = format!("SELECT COUNT(*) as cnt FROM logs l WHERE {where_clause}");
1202        let mut count_query = sqlx::query(&count_sql).bind(to_i64(from)).bind(to_i64(to));
1203        for param in &params {
1204            count_query = count_query.bind(param.as_slice());
1205        }
1206        let count_row = count_query.fetch_one(&self.pool).await.map_err(SqlColdError::from)?;
1207        let count = from_i64(count_row.get::<i64, _>(COL_CNT)) as usize;
1208        if count > max_logs {
1209            return Err(ColdStorageError::TooManyLogs { limit: max_logs });
1210        }
1211
1212        // Fetch the actual log data with JOINs and the correlated subquery
1213        // for block_log_index (absolute position within block).
1214        let data_sql = format!(
1215            "SELECT l.*, h.block_hash, h.timestamp AS block_timestamp, t.tx_hash, \
1216               (SELECT COUNT(*) FROM logs l2 \
1217                WHERE l2.block_number = l.block_number \
1218                  AND (l2.tx_index < l.tx_index \
1219                       OR (l2.tx_index = l.tx_index AND l2.log_index < l.log_index)) \
1220               ) AS block_log_index \
1221             FROM logs l \
1222             JOIN headers h ON l.block_number = h.block_number \
1223             JOIN transactions t ON l.block_number = t.block_number \
1224               AND l.tx_index = t.tx_index \
1225             WHERE {where_clause} \
1226             ORDER BY l.block_number, l.tx_index, l.log_index"
1227        );
1228        let mut query = sqlx::query(&data_sql).bind(to_i64(from)).bind(to_i64(to));
1229        for param in &params {
1230            query = query.bind(param.as_slice());
1231        }
1232
1233        let rows = query.fetch_all(&self.pool).await.map_err(SqlColdError::from)?;
1234
1235        rows.into_iter()
1236            .map(|r| {
1237                let log = log_from_row(&r);
1238                let block_number = from_i64(r.get::<i64, _>(COL_BLOCK_NUMBER));
1239                let block_hash_bytes: Vec<u8> = r.get(COL_BLOCK_HASH);
1240                let tx_hash_bytes: Vec<u8> = r.get(COL_TX_HASH);
1241                Ok(RpcLog {
1242                    inner: log,
1243                    block_hash: Some(B256::from_slice(&block_hash_bytes)),
1244                    block_number: Some(block_number),
1245                    block_timestamp: Some(from_i64(r.get::<i64, _>(COL_BLOCK_TIMESTAMP))),
1246                    transaction_hash: Some(B256::from_slice(&tx_hash_bytes)),
1247                    transaction_index: Some(from_i64(r.get::<i64, _>(COL_TX_INDEX))),
1248                    log_index: Some(from_i64(r.get::<i64, _>(COL_BLOCK_LOG_INDEX))),
1249                    removed: false,
1250                })
1251            })
1252            .collect::<ColdResult<Vec<_>>>()
1253    }
1254
1255    async fn get_latest_block(&self) -> ColdResult<Option<BlockNumber>> {
1256        let row = sqlx::query("SELECT MAX(block_number) as max_bn FROM headers")
1257            .fetch_one(&self.pool)
1258            .await
1259            .map_err(SqlColdError::from)?;
1260        Ok(row.get::<Option<i64>, _>(COL_MAX_BN).map(from_i64))
1261    }
1262
1263    async fn append_block(&self, data: BlockData) -> ColdResult<()> {
1264        self.insert_block(data).await.map_err(ColdStorageError::from)
1265    }
1266
1267    async fn append_blocks(&self, data: Vec<BlockData>) -> ColdResult<()> {
1268        let mut tx = self.pool.begin().await.map_err(SqlColdError::from)?;
1269        for block_data in data {
1270            write_block_to_tx(&mut tx, block_data).await.map_err(ColdStorageError::from)?;
1271        }
1272        tx.commit().await.map_err(SqlColdError::from)?;
1273        Ok(())
1274    }
1275
1276    async fn truncate_above(&self, block: BlockNumber) -> ColdResult<()> {
1277        let bn = to_i64(block);
1278        let mut tx = self.pool.begin().await.map_err(SqlColdError::from)?;
1279
1280        sqlx::query("DELETE FROM logs WHERE block_number > $1")
1281            .bind(bn)
1282            .execute(&mut *tx)
1283            .await
1284            .map_err(SqlColdError::from)?;
1285        sqlx::query("DELETE FROM transactions WHERE block_number > $1")
1286            .bind(bn)
1287            .execute(&mut *tx)
1288            .await
1289            .map_err(SqlColdError::from)?;
1290        sqlx::query("DELETE FROM receipts WHERE block_number > $1")
1291            .bind(bn)
1292            .execute(&mut *tx)
1293            .await
1294            .map_err(SqlColdError::from)?;
1295        sqlx::query("DELETE FROM signet_events WHERE block_number > $1")
1296            .bind(bn)
1297            .execute(&mut *tx)
1298            .await
1299            .map_err(SqlColdError::from)?;
1300        sqlx::query("DELETE FROM zenith_headers WHERE block_number > $1")
1301            .bind(bn)
1302            .execute(&mut *tx)
1303            .await
1304            .map_err(SqlColdError::from)?;
1305        sqlx::query("DELETE FROM headers WHERE block_number > $1")
1306            .bind(bn)
1307            .execute(&mut *tx)
1308            .await
1309            .map_err(SqlColdError::from)?;
1310
1311        tx.commit().await.map_err(SqlColdError::from)?;
1312        Ok(())
1313    }
1314}
1315
#[cfg(all(test, feature = "test-utils"))]
mod tests {
    use super::*;
    use signet_cold::conformance::conformance;

    /// Runs the shared conformance suite against an in-memory SQLite database.
    #[tokio::test]
    async fn sqlite_conformance() {
        let sqlite = SqlColdBackend::connect("sqlite::memory:").await.unwrap();
        conformance(&sqlite).await.unwrap();
    }

    /// Runs the shared conformance suite against the database named by
    /// `DATABASE_URL`; the test is a no-op when the variable is not set.
    #[tokio::test]
    async fn pg_conformance() {
        let url = match std::env::var("DATABASE_URL") {
            Ok(url) => url,
            Err(_) => {
                eprintln!("skipping pg conformance: DATABASE_URL not set");
                return;
            }
        };
        let postgres = SqlColdBackend::connect(&url).await.unwrap();
        conformance(&postgres).await.unwrap();
    }
}
1336}