use std::{fmt::Debug, path::Path, sync::Arc, time::Duration};
use tokio::sync::Mutex;
use crate::{RollupStoreError, api::StoreEngineRollup};
use ethereum_types::U256;
use ethrex_common::{
H256,
types::{
AccountUpdate, Blob, BlockNumber, balance_diff::AssetDiff, balance_diff::BalanceDiff,
batch::Batch, fee_config::FeeConfig,
},
};
use ethrex_l2_common::prover::{ProverInputData, ProverOutput, ProverType};
use libsql::{
Builder, Connection, Row, Rows, Transaction, Value,
params::{IntoParams, Params},
};
pub struct SQLStore {
read_conn: Connection,
write_conn: Arc<Mutex<Connection>>,
}
impl Debug for SQLStore {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str("SQLStore")
}
}
const DB_SCHEMA: [&str; 22] = [
"CREATE TABLE IF NOT EXISTS blocks (block_number INT PRIMARY KEY, batch INT)",
"CREATE TABLE IF NOT EXISTS l1_messages (batch INT, idx INT, message_hash BLOB, PRIMARY KEY (batch, idx))",
"CREATE TABLE IF NOT EXISTS l2_rolling_hashes (batch INT PRIMARY KEY, value BLOB)",
"CREATE TABLE IF NOT EXISTS balance_diffs (batch INT, chain_id BLOB, value BLOB, message_hashes BLOB, value_per_token BLOB, PRIMARY KEY (batch, chain_id))",
"CREATE TABLE IF NOT EXISTS privileged_transactions (batch INT PRIMARY KEY, transactions_hash BLOB)",
"CREATE TABLE IF NOT EXISTS non_privileged_transactions (batch INT PRIMARY KEY, transactions INT)",
"CREATE TABLE IF NOT EXISTS state_roots (batch INT PRIMARY KEY, state_root BLOB)",
"CREATE TABLE IF NOT EXISTS blob_bundles (batch INT, idx INT, blob_bundle BLOB, PRIMARY KEY (batch, idx))",
"CREATE TABLE IF NOT EXISTS account_updates (block_number INT PRIMARY KEY, updates BLOB)",
"CREATE TABLE IF NOT EXISTS commit_txs (batch INT PRIMARY KEY, commit_tx BLOB)",
"CREATE TABLE IF NOT EXISTS verify_txs (batch INT PRIMARY KEY, verify_tx BLOB)",
"CREATE TABLE IF NOT EXISTS operation_count (_id INT PRIMARY KEY, transactions INT, privileged_transactions INT, messages INT)",
"INSERT INTO operation_count VALUES (0, 0, 0, 0) ON CONFLICT(_id) DO NOTHING",
"CREATE TABLE IF NOT EXISTS latest_sent (_id INT PRIMARY KEY, batch INT, verified_at INT DEFAULT 0)",
"INSERT INTO latest_sent (_id, batch) VALUES (0, 0) ON CONFLICT(_id) DO NOTHING",
"CREATE TABLE IF NOT EXISTS batch_proofs (batch INT, prover_type INT, proof BLOB, PRIMARY KEY (batch, prover_type))",
"CREATE TABLE IF NOT EXISTS block_signatures (block_hash BLOB PRIMARY KEY, signature BLOB)",
"CREATE TABLE IF NOT EXISTS batch_signatures (batch INT PRIMARY KEY, signature BLOB)",
"CREATE TABLE IF NOT EXISTS batch_prover_input (batch INT, prover_version TEXT, prover_input BLOB, PRIMARY KEY (batch, prover_version))",
"CREATE TABLE IF NOT EXISTS fee_config (block_number INT PRIMARY KEY, fee_config BLOB)",
"CREATE TABLE IF NOT EXISTS latest_sent_to_aligned (_id INT PRIMARY KEY, batch INT, sent_at INT)",
"INSERT INTO latest_sent_to_aligned VALUES (0, 0, 0) ON CONFLICT(_id) DO NOTHING",
];
impl SQLStore {
pub fn new(path: impl AsRef<Path>) -> Result<Self, RollupStoreError> {
futures::executor::block_on(async {
let db = Builder::new_local(path).build().await?;
let write_conn = db.connect()?;
write_conn.busy_timeout(Duration::from_millis(5000))?;
let store = SQLStore {
read_conn: db.connect()?,
write_conn: Arc::new(Mutex::new(write_conn)),
};
store.init_db().await?;
Ok(store)
})
}
async fn execute<T: IntoParams>(&self, sql: &str, params: T) -> Result<(), RollupStoreError> {
let conn = self.write_conn.lock().await;
conn.execute(sql, params).await?;
Ok(())
}
#[doc(hidden)]
pub async fn query<T: IntoParams>(
&self,
sql: &str,
params: T,
) -> Result<Rows, RollupStoreError> {
Ok(self.read_conn.query(sql, params).await?)
}
async fn init_db(&self) -> Result<(), RollupStoreError> {
self.query("PRAGMA journal_mode=WAL;", ()).await?;
let empty_param = ().into_params()?;
let queries = DB_SCHEMA
.iter()
.map(|v| (*v, empty_param.clone()))
.collect();
self.execute_in_tx(queries, None).await?;
let _ = self
.execute(
"ALTER TABLE latest_sent ADD COLUMN verified_at INT DEFAULT 0",
(),
)
.await;
Ok(())
}
async fn execute_in_tx(
&self,
queries: Vec<(&str, Params)>,
db_tx: Option<&Transaction>,
) -> Result<(), RollupStoreError> {
if let Some(existing_tx) = db_tx {
for (query, params) in queries {
existing_tx.execute(query, params).await?;
}
} else {
let conn = self.write_conn.lock().await;
let tx = conn.transaction().await?;
for (query, params) in queries {
tx.execute(query, params).await?;
}
tx.commit().await?;
}
Ok(())
}
async fn store_batch_number_by_block_in_tx(
&self,
block_number: u64,
batch_number: u64,
db_tx: Option<&Transaction>,
) -> Result<(), RollupStoreError> {
let queries = vec![
(
"DELETE FROM blocks WHERE block_number = ?1",
vec![block_number].into_params()?,
),
(
"INSERT INTO blocks VALUES (?1, ?2)",
vec![block_number, batch_number].into_params()?,
),
];
self.execute_in_tx(queries, db_tx).await
}
async fn store_l1_message_hashes_by_batch_in_tx(
&self,
batch_number: u64,
message_hashes: Vec<H256>,
db_tx: Option<&Transaction>,
) -> Result<(), RollupStoreError> {
let mut queries = vec![(
"DELETE FROM l1_messages WHERE batch = ?1",
vec![batch_number].into_params()?,
)];
for (index, hash) in message_hashes.iter().enumerate() {
let index = u64::try_from(index)
.map_err(|e| RollupStoreError::Custom(format!("conversion error: {e}")))?;
queries.push((
"INSERT INTO l1_messages VALUES (?1, ?2, ?3)",
(batch_number, index, Vec::from(hash.to_fixed_bytes())).into_params()?,
));
}
self.execute_in_tx(queries, db_tx).await
}
async fn store_non_privileged_transactions_by_batch_in_tx(
&self,
batch_number: u64,
non_privileged_transactions: u64,
db_tx: Option<&Transaction>,
) -> Result<(), RollupStoreError> {
let queries = vec![(
"INSERT OR REPLACE INTO non_privileged_transactions VALUES (?1, ?2)",
(batch_number, non_privileged_transactions).into_params()?,
)];
self.execute_in_tx(queries, db_tx).await
}
async fn store_balance_diffs_by_batch_in_tx(
&self,
batch_number: u64,
balance_diffs: Vec<BalanceDiff>,
db_tx: Option<&Transaction>,
) -> Result<(), RollupStoreError> {
let mut queries = vec![(
"DELETE FROM balance_diffs WHERE batch = ?1",
vec![batch_number].into_params()?,
)];
for balance_diff in balance_diffs {
queries.push((
"INSERT INTO balance_diffs VALUES (?1, ?2, ?3, ?4, ?5)",
(
batch_number,
Vec::from(balance_diff.chain_id.to_big_endian()),
Vec::from(balance_diff.value.to_big_endian()),
balance_diff
.message_hashes
.iter()
.flat_map(|h| h.to_fixed_bytes())
.collect::<Vec<u8>>(),
bincode::serialize(&balance_diff.value_per_token).map_err(|e| {
RollupStoreError::Custom(format!(
"Failed to serialize balance_diff value_per_token: {e}"
))
})?,
)
.into_params()?,
));
}
self.execute_in_tx(queries, db_tx).await
}
async fn store_l2_rolling_hashes_by_batch_in_tx(
&self,
batch_number: u64,
l2_rolling_hashes: Vec<(u64, H256)>,
db_tx: Option<&Transaction>,
) -> Result<(), RollupStoreError> {
let serialized = bincode::serialize(&l2_rolling_hashes)?;
let queries = vec![
(
"DELETE FROM l2_rolling_hashes WHERE batch = ?1",
vec![batch_number].into_params()?,
),
(
"INSERT INTO l2_rolling_hashes VALUES (?1, ?2)",
(batch_number, serialized).into_params()?,
),
];
self.execute_in_tx(queries, db_tx).await
}
async fn store_l1_in_messages_hash_by_batch_number_in_tx(
&self,
batch_number: u64,
l1_in_messages_hash: H256,
db_tx: Option<&Transaction>,
) -> Result<(), RollupStoreError> {
let queries = vec![
(
"DELETE FROM privileged_transactions WHERE batch = ?1",
vec![batch_number].into_params()?,
),
(
"INSERT INTO privileged_transactions VALUES (?1, ?2)",
(
batch_number,
Vec::from(l1_in_messages_hash.to_fixed_bytes()),
)
.into_params()?,
),
];
self.execute_in_tx(queries, db_tx).await
}
async fn store_state_root_by_batch_number_in_tx(
&self,
batch_number: u64,
state_root: H256,
db_tx: Option<&Transaction>,
) -> Result<(), RollupStoreError> {
let queries = vec![
(
"DELETE FROM state_roots WHERE batch = ?1",
vec![batch_number].into_params()?,
),
(
"INSERT INTO state_roots VALUES (?1, ?2)",
(batch_number, Vec::from(state_root.to_fixed_bytes())).into_params()?,
),
];
self.execute_in_tx(queries, db_tx).await
}
async fn store_blob_bundle_by_batch_number_in_tx(
&self,
batch_number: u64,
blobs: Vec<Blob>,
db_tx: Option<&Transaction>,
) -> Result<(), RollupStoreError> {
let mut queries = vec![(
"DELETE FROM blob_bundles WHERE batch = ?1",
vec![batch_number].into_params()?,
)];
for (index, blob) in blobs.iter().enumerate() {
let index = u64::try_from(index)
.map_err(|e| RollupStoreError::Custom(format!("conversion error: {e}")))?;
queries.push((
"INSERT INTO blob_bundles VALUES (?1, ?2, ?3)",
(batch_number, index, blob.to_vec()).into_params()?,
));
}
self.execute_in_tx(queries, db_tx).await
}
async fn store_commit_tx_by_batch_in_tx(
&self,
batch_number: u64,
commit_tx: H256,
db_tx: Option<&Transaction>,
) -> Result<(), RollupStoreError> {
let queries = vec![(
"INSERT OR REPLACE INTO commit_txs VALUES (?1, ?2)",
(batch_number, Vec::from(commit_tx.to_fixed_bytes())).into_params()?,
)];
self.execute_in_tx(queries, db_tx).await
}
async fn store_verify_tx_by_batch_in_tx(
&self,
batch_number: u64,
verify_tx: H256,
db_tx: Option<&Transaction>,
) -> Result<(), RollupStoreError> {
let queries = vec![(
"INSERT OR REPLACE INTO verify_txs VALUES (?1, ?2)",
(batch_number, Vec::from(verify_tx.to_fixed_bytes())).into_params()?,
)];
self.execute_in_tx(queries, db_tx).await
}
async fn store_account_updates_by_block_number_in_tx(
&self,
block_number: BlockNumber,
account_updates: Vec<AccountUpdate>,
db_tx: Option<&Transaction>,
) -> Result<(), RollupStoreError> {
let serialized = bincode::serialize(&account_updates)?;
let queries = vec![
(
"DELETE FROM account_updates WHERE block_number = ?1",
vec![block_number].into_params()?,
),
(
"INSERT INTO account_updates VALUES (?1, ?2)",
(block_number, serialized).into_params()?,
),
];
self.execute_in_tx(queries, db_tx).await
}
async fn store_block_numbers_by_batch_in_tx(
&self,
batch_number: u64,
block_numbers: Vec<BlockNumber>,
db_tx: Option<&Transaction>,
) -> Result<(), RollupStoreError> {
for block_number in block_numbers {
self.store_batch_number_by_block_in_tx(block_number, batch_number, db_tx)
.await?;
}
Ok(())
}
async fn store_prover_input_by_batch_and_version_in_tx(
&self,
batch_number: u64,
prover_version: &str,
prover_input: ProverInputData,
db_tx: Option<&Transaction>,
) -> Result<(), RollupStoreError> {
let prover_input_bytes = rkyv::to_bytes::<rkyv::rancor::Error>(&prover_input)
.map_err(|e| {
RollupStoreError::Custom(format!("Failed to serialize prover input: {e}"))
})?
.to_vec();
let queries = vec![(
"INSERT OR REPLACE INTO batch_prover_input VALUES (?1, ?2, ?3)",
(batch_number, prover_version, prover_input_bytes).into_params()?,
)];
self.execute_in_tx(queries, db_tx).await
}
async fn seal_batch_in_tx(
&self,
batch: Batch,
transaction: &Transaction,
) -> Result<(), RollupStoreError> {
let blocks: Vec<u64> = (batch.first_block..=batch.last_block).collect();
for block_number in blocks.iter() {
self.store_batch_number_by_block_in_tx(*block_number, batch.number, Some(transaction))
.await?;
}
self.store_block_numbers_by_batch_in_tx(batch.number, blocks, Some(transaction))
.await?;
self.store_l1_message_hashes_by_batch_in_tx(
batch.number,
batch.l1_out_message_hashes,
Some(transaction),
)
.await?;
self.store_balance_diffs_by_batch_in_tx(
batch.number,
batch.balance_diffs,
Some(transaction),
)
.await?;
self.store_l1_in_messages_hash_by_batch_number_in_tx(
batch.number,
batch.l1_in_messages_rolling_hash,
Some(transaction),
)
.await?;
self.store_non_privileged_transactions_by_batch_in_tx(
batch.number,
batch.non_privileged_transactions,
Some(transaction),
)
.await?;
self.store_l2_rolling_hashes_by_batch_in_tx(
batch.number,
batch.l2_in_message_rolling_hashes,
Some(transaction),
)
.await?;
self.store_blob_bundle_by_batch_number_in_tx(
batch.number,
batch.blobs_bundle.blobs,
Some(transaction),
)
.await?;
self.store_state_root_by_batch_number_in_tx(
batch.number,
batch.state_root,
Some(transaction),
)
.await?;
if let Some(commit_tx) = batch.commit_tx {
self.store_commit_tx_by_batch_in_tx(batch.number, commit_tx, Some(transaction))
.await?;
}
if let Some(verify_tx) = batch.verify_tx {
self.store_verify_tx_by_batch_in_tx(batch.number, verify_tx, Some(transaction))
.await?;
}
Ok(())
}
}
fn read_from_row_int(row: &Row, index: i32) -> Result<u64, RollupStoreError> {
match row.get_value(index)? {
Value::Integer(i) => {
let val = i
.try_into()
.map_err(|e| RollupStoreError::Custom(format!("conversion error: {e}")))?;
Ok(val)
}
_ => Err(RollupStoreError::SQLInvalidTypeError),
}
}
fn read_from_row_blob(row: &Row, index: i32) -> Result<Vec<u8>, RollupStoreError> {
match row.get_value(index)? {
Value::Blob(vec) => Ok(vec),
_ => Err(RollupStoreError::SQLInvalidTypeError),
}
}
#[async_trait::async_trait]
impl StoreEngineRollup for SQLStore {
async fn get_batch_number_by_block(
&self,
block_number: BlockNumber,
) -> Result<Option<u64>, RollupStoreError> {
let mut rows = self
.query(
"SELECT * from blocks WHERE block_number = ?1",
vec![block_number],
)
.await?;
if let Some(row) = rows.next().await? {
return Ok(Some(read_from_row_int(&row, 1)?));
}
Ok(None)
}
async fn get_l1_out_message_hashes_by_batch(
&self,
batch_number: u64,
) -> Result<Option<Vec<H256>>, RollupStoreError> {
let mut hashes = vec![];
let mut rows = self
.query(
"SELECT * from l1_messages WHERE batch = ?1 ORDER BY idx ASC",
vec![batch_number],
)
.await?;
while let Some(row) = rows.next().await? {
let vec = read_from_row_blob(&row, 2)?;
hashes.push(H256::from_slice(&vec));
}
if hashes.is_empty() {
Ok(None)
} else {
Ok(Some(hashes))
}
}
async fn get_balance_diffs_by_batch(
&self,
batch_number: u64,
) -> Result<Option<Vec<BalanceDiff>>, RollupStoreError> {
let mut balance_diffs = vec![];
let mut rows = self
.query(
"SELECT * from balance_diffs WHERE batch = ?1 ORDER BY chain_id ASC",
vec![batch_number],
)
.await?;
while let Some(row) = rows.next().await? {
let chain_id = U256::from_big_endian(&read_from_row_blob(&row, 1)?);
let value = U256::from_big_endian(&read_from_row_blob(&row, 2)?);
let blob = read_from_row_blob(&row, 3)?;
let message_hashes = blob.chunks(32).map(H256::from_slice).collect::<Vec<_>>();
let value_per_token: Vec<AssetDiff> =
bincode::deserialize(&read_from_row_blob(&row, 4)?).map_err(|e| {
RollupStoreError::Custom(format!(
"Failed to deserialize balance diff value_per_token: {e}"
))
})?;
balance_diffs.push(BalanceDiff {
chain_id,
value,
value_per_token,
message_hashes,
});
}
if balance_diffs.is_empty() {
Ok(None)
} else {
Ok(Some(balance_diffs))
}
}
async fn get_block_numbers_by_batch(
&self,
batch_number: u64,
) -> Result<Option<Vec<BlockNumber>>, RollupStoreError> {
let mut blocks = Vec::new();
let mut rows = self
.query("SELECT * from blocks WHERE batch = ?1", vec![batch_number])
.await?;
while let Some(row) = rows.next().await? {
let val = read_from_row_int(&row, 0)?;
blocks.push(val);
}
if blocks.is_empty() {
Ok(None)
} else {
Ok(Some(blocks))
}
}
async fn get_l2_in_message_rolling_hashes_by_batch(
&self,
batch_number: u64,
) -> Result<Option<Vec<(u64, H256)>>, RollupStoreError> {
let mut rows = self
.query(
"SELECT * FROM l2_rolling_hashes WHERE batch = ?1",
vec![batch_number],
)
.await?;
if let Some(row) = rows.next().await? {
let vec = read_from_row_blob(&row, 1)?;
let l2_rolling_hashes: Vec<(u64, H256)> = bincode::deserialize(&vec).map_err(|e| {
RollupStoreError::Custom(format!("error deserializing l2 rolling hashes: {e}"))
})?;
return Ok(Some(l2_rolling_hashes));
}
Ok(None)
}
async fn get_l1_in_messages_rolling_hash_by_batch_number(
&self,
batch_number: u64,
) -> Result<Option<H256>, RollupStoreError> {
let mut rows = self
.query(
"SELECT * from privileged_transactions WHERE batch = ?1",
vec![batch_number],
)
.await?;
if let Some(row) = rows.next().await? {
let vec = read_from_row_blob(&row, 1)?;
return Ok(Some(H256::from_slice(&vec)));
}
Ok(None)
}
async fn get_non_privileged_transactions_by_batch(
&self,
batch_number: u64,
) -> Result<Option<u64>, RollupStoreError> {
let mut rows = self
.query(
"SELECT * from non_privileged_transactions WHERE batch = ?1",
vec![batch_number],
)
.await?;
if let Some(row) = rows.next().await? {
let val = read_from_row_int(&row, 1)?;
return Ok(Some(val));
}
Ok(None)
}
async fn get_state_root_by_batch_number(
&self,
batch_number: u64,
) -> Result<Option<H256>, RollupStoreError> {
let mut rows = self
.query(
"SELECT * FROM state_roots WHERE batch = ?1",
vec![batch_number],
)
.await?;
if let Some(row) = rows.next().await? {
let vec = read_from_row_blob(&row, 1)?;
return Ok(Some(H256::from_slice(&vec)));
}
Ok(None)
}
async fn get_blob_bundle_by_batch_number(
&self,
batch_number: u64,
) -> Result<Option<Vec<Blob>>, RollupStoreError> {
let mut bundles = Vec::new();
let mut rows = self
.query(
"SELECT * FROM blob_bundles WHERE batch = ?1 ORDER BY idx ASC",
vec![batch_number],
)
.await?;
while let Some(row) = rows.next().await? {
let val = read_from_row_blob(&row, 2)?;
bundles.push(
Blob::try_from(val).map_err(|_| {
RollupStoreError::Custom("error converting to Blob".to_string())
})?,
);
}
if bundles.is_empty() {
Ok(None)
} else {
Ok(Some(bundles))
}
}
async fn store_commit_tx_by_batch(
&self,
batch_number: u64,
commit_tx: H256,
) -> Result<(), RollupStoreError> {
self.store_commit_tx_by_batch_in_tx(batch_number, commit_tx, None)
.await
}
async fn get_commit_tx_by_batch(
&self,
batch_number: u64,
) -> Result<Option<H256>, RollupStoreError> {
let mut rows = self
.query(
"SELECT commit_tx FROM commit_txs WHERE batch = ?1",
vec![batch_number],
)
.await?;
if let Some(row) = rows.next().await? {
let vec = read_from_row_blob(&row, 0)?;
return Ok(Some(H256::from_slice(&vec)));
}
Ok(None)
}
async fn store_verify_tx_by_batch(
&self,
batch_number: u64,
verify_tx: H256,
) -> Result<(), RollupStoreError> {
self.store_verify_tx_by_batch_in_tx(batch_number, verify_tx, None)
.await
}
async fn get_verify_tx_by_batch(
&self,
batch_number: u64,
) -> Result<Option<H256>, RollupStoreError> {
let mut rows = self
.query(
"SELECT verify_tx FROM verify_txs WHERE batch = ?1",
vec![batch_number],
)
.await?;
if let Some(row) = rows.next().await? {
let vec = read_from_row_blob(&row, 0)?;
return Ok(Some(H256::from_slice(&vec)));
}
Ok(None)
}
async fn update_operations_count(
&self,
transaction_inc: u64,
privileged_transactions_inc: u64,
messages_inc: u64,
) -> Result<(), RollupStoreError> {
self.execute(
"UPDATE operation_count SET transactions = transactions + ?1, privileged_transactions = privileged_transactions + ?2, messages = messages + ?3",
(transaction_inc, privileged_transactions_inc, messages_inc)).await?;
Ok(())
}
async fn get_operations_count(&self) -> Result<[u64; 3], RollupStoreError> {
let mut rows = self.query("SELECT * from operation_count", ()).await?;
if let Some(row) = rows.next().await? {
return Ok([
read_from_row_int(&row, 1)?,
read_from_row_int(&row, 2)?,
read_from_row_int(&row, 3)?,
]);
}
Err(RollupStoreError::Custom(
"missing operation_count row".to_string(),
))
}
async fn contains_batch(&self, batch_number: &u64) -> Result<bool, RollupStoreError> {
let mut row = self
.query("SELECT * from blocks WHERE batch = ?1", vec![*batch_number])
.await?;
Ok(row.next().await?.is_some())
}
async fn get_latest_verified_batch_proof(&self) -> Result<(u64, u64), RollupStoreError> {
let mut rows = self
.query("SELECT batch, verified_at FROM latest_sent", ())
.await?;
if let Some(row) = rows.next().await? {
let batch = read_from_row_int(&row, 0)?;
let verified_at = read_from_row_int(&row, 1)?;
return Ok((batch, verified_at));
}
Err(RollupStoreError::Custom(
"missing latest_sent row".to_string(),
))
}
async fn set_latest_verified_batch_proof(
&self,
batch_number: u64,
verified_at: u64,
) -> Result<(), RollupStoreError> {
self.execute(
"INSERT OR REPLACE INTO latest_sent (_id, batch, verified_at) VALUES (0, ?1, ?2)",
(batch_number, verified_at),
)
.await?;
Ok(())
}
async fn get_latest_sent_to_aligned(&self) -> Result<u64, RollupStoreError> {
let mut rows = self
.query("SELECT batch FROM latest_sent_to_aligned WHERE _id = 0", ())
.await?;
if let Some(row) = rows.next().await? {
return read_from_row_int(&row, 0);
}
Err(RollupStoreError::Custom(
"missing latest_sent_to_aligned row".to_string(),
))
}
async fn set_latest_sent_to_aligned(&self, batch_number: u64) -> Result<(), RollupStoreError> {
self.execute(
"INSERT OR REPLACE INTO latest_sent_to_aligned (_id, batch, sent_at) VALUES (0, ?1, 0)",
[batch_number],
)
.await?;
Ok(())
}
async fn get_account_updates_by_block_number(
&self,
block_number: BlockNumber,
) -> Result<Option<Vec<AccountUpdate>>, RollupStoreError> {
let mut rows = self
.query(
"SELECT * FROM account_updates WHERE block_number = ?1",
vec![block_number],
)
.await?;
if let Some(row) = rows.next().await? {
let vec = read_from_row_blob(&row, 1)?;
return Ok(Some(bincode::deserialize(&vec)?));
}
Ok(None)
}
async fn store_account_updates_by_block_number(
&self,
block_number: BlockNumber,
account_updates: Vec<AccountUpdate>,
) -> Result<(), RollupStoreError> {
self.store_account_updates_by_block_number_in_tx(block_number, account_updates, None)
.await
}
async fn revert_to_batch(&self, batch_number: u64) -> Result<(), RollupStoreError> {
let queries = vec![
(
"DELETE FROM blocks WHERE batch > ?1",
[batch_number].into_params()?,
),
(
"DELETE FROM l1_messages WHERE batch > ?1",
[batch_number].into_params()?,
),
(
"DELETE FROM l2_rolling_hashes WHERE batch > ?1",
[batch_number].into_params()?,
),
(
"DELETE FROM privileged_transactions WHERE batch > ?1",
[batch_number].into_params()?,
),
(
"DELETE FROM non_privileged_transactions WHERE batch > ?1",
[batch_number].into_params()?,
),
(
"DELETE FROM state_roots WHERE batch > ?1",
[batch_number].into_params()?,
),
(
"DELETE FROM blob_bundles WHERE batch > ?1",
[batch_number].into_params()?,
),
(
"DELETE FROM batch_proofs WHERE batch > ?1",
[batch_number].into_params()?,
),
(
"DELETE FROM batch_prover_input WHERE batch > ?1",
[batch_number].into_params()?,
),
];
self.execute_in_tx(queries, None).await
}
async fn store_proof_by_batch_and_type(
&self,
batch_number: u64,
prover_type: ProverType,
proof: ProverOutput,
) -> Result<(), RollupStoreError> {
let serialized_proof = bincode::serialize(&proof)?;
let prover_type: u32 = prover_type.into();
self.execute_in_tx(
vec![
(
"DELETE FROM batch_proofs WHERE batch = ?1 AND prover_type = ?2",
(batch_number, prover_type).into_params()?,
),
(
"INSERT INTO batch_proofs VALUES (?1, ?2, ?3)",
(batch_number, prover_type, serialized_proof).into_params()?,
),
],
None,
)
.await
}
async fn get_proof_by_batch_and_type(
&self,
batch_number: u64,
prover_type: ProverType,
) -> Result<Option<ProverOutput>, RollupStoreError> {
let prover_type: u32 = prover_type.into();
let mut rows = self
.query(
"SELECT proof from batch_proofs WHERE batch = ?1 AND prover_type = ?2",
(batch_number, prover_type),
)
.await?;
if let Some(row) = rows.next().await? {
let vec = read_from_row_blob(&row, 0)?;
return Ok(Some(bincode::deserialize(&vec)?));
}
Ok(None)
}
async fn seal_batch(&self, batch: Batch) -> Result<(), RollupStoreError> {
let conn = self.write_conn.lock().await;
let transaction = conn.transaction().await?;
self.seal_batch_in_tx(batch, &transaction).await?;
transaction.commit().await.map_err(RollupStoreError::from)
}
async fn seal_batch_with_prover_input(
&self,
batch: Batch,
prover_version: &str,
prover_input: ProverInputData,
) -> Result<(), RollupStoreError> {
let conn = self.write_conn.lock().await;
let transaction = conn.transaction().await?;
self.store_prover_input_by_batch_and_version_in_tx(
batch.number,
prover_version,
prover_input,
Some(&transaction),
)
.await?;
self.seal_batch_in_tx(batch, &transaction).await?;
transaction.commit().await.map_err(RollupStoreError::from)
}
async fn store_signature_by_block(
&self,
block_hash: H256,
signature: ethereum_types::Signature,
) -> Result<(), RollupStoreError> {
self.execute_in_tx(
vec![
(
"DELETE FROM block_signatures WHERE block_hash = ?1",
vec![Vec::from(block_hash.to_fixed_bytes())].into_params()?,
),
(
"INSERT INTO block_signatures VALUES (?1, ?2)",
(
Vec::from(block_hash.to_fixed_bytes()),
Vec::from(signature.as_fixed_bytes()),
)
.into_params()?,
),
],
None,
)
.await
}
async fn get_signature_by_block(
&self,
block_hash: H256,
) -> Result<Option<ethereum_types::Signature>, RollupStoreError> {
let mut rows = self
.query(
"SELECT signature FROM block_signatures WHERE block_hash = ?1",
vec![Vec::from(block_hash.to_fixed_bytes())],
)
.await?;
rows.next()
.await?
.map(|row| {
read_from_row_blob(&row, 0)
.map(|vec| ethereum_types::Signature::from_slice(vec.as_slice()))
})
.transpose()
}
async fn store_signature_by_batch(
&self,
batch_number: u64,
signature: ethereum_types::Signature,
) -> Result<(), RollupStoreError> {
self.execute_in_tx(
vec![
(
"DELETE FROM batch_signatures WHERE batch = ?1",
vec![batch_number].into_params()?,
),
(
"INSERT INTO batch_signatures VALUES (?1, ?2)",
(batch_number, Vec::from(signature.to_fixed_bytes())).into_params()?,
),
],
None,
)
.await
}
async fn get_signature_by_batch(
&self,
batch_number: u64,
) -> Result<Option<ethereum_types::Signature>, RollupStoreError> {
let mut rows = self
.query(
"SELECT signature FROM batch_signatures WHERE batch = ?1",
vec![batch_number],
)
.await?;
rows.next()
.await?
.map(|row| {
read_from_row_blob(&row, 0)
.map(|vec| ethereum_types::Signature::from_slice(vec.as_slice()))
})
.transpose()
}
async fn delete_proof_by_batch_and_type(
&self,
batch_number: u64,
proof_type: ProverType,
) -> Result<(), RollupStoreError> {
let prover_type: u32 = proof_type.into();
self.execute_in_tx(
vec![(
"DELETE FROM batch_proofs WHERE batch = ?1 AND prover_type = ?2",
(batch_number, prover_type).into_params()?,
)],
None,
)
.await
}
async fn get_last_batch_number(&self) -> Result<Option<u64>, RollupStoreError> {
let mut rows = self.query("SELECT MAX(batch) FROM state_roots", ()).await?;
rows.next()
.await?
.map(|row| read_from_row_int(&row, 0))
.transpose()
}
async fn store_prover_input_by_batch_and_version(
&self,
batch_number: u64,
prover_version: &str,
prover_input: ProverInputData,
) -> Result<(), RollupStoreError> {
self.store_prover_input_by_batch_and_version_in_tx(
batch_number,
prover_version,
prover_input,
None,
)
.await
}
async fn get_prover_input_by_batch_and_version(
&self,
batch_number: u64,
prover_version: &str,
) -> Result<Option<ProverInputData>, RollupStoreError> {
let mut rows = self
.query(
"SELECT prover_input FROM batch_prover_input WHERE batch = ?1 AND prover_version = ?2",
(batch_number, prover_version),
)
.await?;
if let Some(row) = rows.next().await? {
let vec = read_from_row_blob(&row, 0)?;
let prover_input = rkyv::from_bytes::<ProverInputData, rkyv::rancor::Error>(&vec)
.map_err(|e| {
RollupStoreError::Custom(format!(
"Failed to deserialize prover input for batch {batch_number} and version {prover_version}: {e}",
))
})?;
return Ok(Some(prover_input));
}
Ok(None)
}
async fn store_fee_config_by_block(
&self,
block_number: BlockNumber,
fee_config: FeeConfig,
) -> Result<(), RollupStoreError> {
let serialized = bincode::serialize(&fee_config)?;
let queries = vec![
(
"DELETE FROM fee_config WHERE block_number = ?1",
vec![block_number].into_params()?,
),
(
"INSERT INTO fee_config VALUES (?1, ?2)",
(block_number, serialized).into_params()?,
),
];
self.execute_in_tx(queries, None).await
}
async fn get_fee_config_by_block(
&self,
block_number: BlockNumber,
) -> Result<Option<FeeConfig>, RollupStoreError> {
let mut rows = self
.query(
"SELECT * FROM fee_config WHERE block_number = ?1",
vec![block_number],
)
.await?;
if let Some(row) = rows.next().await? {
let vec = read_from_row_blob(&row, 1)?;
return Ok(Some(bincode::deserialize(&vec)?));
}
Ok(None)
}
}