use std::{error, fmt, time::Instant};
use anyhow::Context;
use tokio::sync::watch;
use zksync_dal::{ConnectionPool, Core};
use zksync_shared_metrics::{SnapshotRecoveryStage, APP_METRICS};
use zksync_storage::RocksDB;
use zksync_types::L1BatchNumber;
use crate::{RocksdbStorage, RocksdbStorageOptions, StateKeeperColumnFamily};
/// State of the RocksDB cache captured by [`AsyncCatchupTask::run()`] right after the storage builder
/// has been created, i.e. before any recovery or synchronization is attempted.
///
/// Handed out to callers by [`RocksdbCell::ensure_initialized()`].
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct InitialRocksdbState {
/// L1 batch number reported by the RocksDB storage builder at the start of the catch-up.
/// The tests in this module observe `None` for a freshly created cache.
// NOTE(review): presumably this is the next batch the cache is ready to process rather than the last
// processed one (the tests observe `Some(2)` after batch #1 has been applied) — confirm against
// the builder's `l1_batch_number()`.
pub l1_batch_number: Option<L1BatchNumber>,
}
/// Error returned from [`RocksdbCell`] methods when the awaited value can no longer arrive,
/// i.e. the corresponding catch-up task has failed or was canceled.
///
/// The private unit field prevents construction outside of this module.
#[derive(Debug)]
pub struct AsyncCatchupFailed(());

impl fmt::Display for AsyncCatchupFailed {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Fixed, parameterless message; written verbatim (formatter flags are not applied).
        const MESSAGE: &str = "Async RocksDB cache catchup failed or was canceled";
        f.write_str(MESSAGE)
    }
}

// No underlying source error is exposed; the default `Error` methods are sufficient.
impl error::Error for AsyncCatchupFailed {}
/// Awaitable set-once cell, represented as the receiving half of a `watch` channel.
///
/// In this file the channel is created holding `None` (see [`AsyncCatchupTask::new()`]) and is switched
/// to `Some(_)` by [`AsyncCatchupTask::run()`]. Readers wait for the `Some(_)` value; waiting fails if
/// the sender is dropped without a value having been set.
type AsyncOnceCell<T> = watch::Receiver<Option<T>>;
/// Read-side handle to the RocksDB cache that is being prepared by an [`AsyncCatchupTask`].
///
/// Created together with the task by [`AsyncCatchupTask::new()`].
#[derive(Debug)]
pub struct RocksdbCell {
/// Set by the task once the storage builder has been created and its batch number has been read.
initial_state: AsyncOnceCell<InitialRocksdbState>,
/// Set by the task once synchronization has finished without being interrupted.
db: AsyncOnceCell<RocksDB<StateKeeperColumnFamily>>,
}
impl RocksdbCell {
    /// Waits until the catch-up task publishes the RocksDB instance and returns a clone of it.
    ///
    /// # Errors
    ///
    /// Returns [`AsyncCatchupFailed`] if the task's sender is dropped before an instance is published
    /// (e.g., the task failed, was canceled, or was never run).
    // The `unwrap()` below cannot fire: `wait_for` only yields values satisfying `Option::is_some`.
    #[allow(clippy::missing_panics_doc)]
    pub async fn wait(&self) -> Result<RocksDB<StateKeeperColumnFamily>, AsyncCatchupFailed> {
        // `wait_for` needs `&mut Receiver`, hence the local clone of the receiver.
        let mut receiver = self.db.clone();
        let published = receiver
            .wait_for(Option::is_some)
            .await
            .map_err(|_| AsyncCatchupFailed(()))?;
        let db = (*published).clone();
        Ok(db.unwrap())
    }

    /// Returns the RocksDB instance if it has already been published; does not wait.
    pub fn get(&self) -> Option<RocksDB<StateKeeperColumnFamily>> {
        let current = self.db.borrow();
        (*current).clone()
    }

    /// Waits until the catch-up task publishes the **initial** RocksDB state (the state captured
    /// before any recovery / synchronization) and returns a clone of it.
    ///
    /// # Errors
    ///
    /// Returns [`AsyncCatchupFailed`] if the task's sender is dropped before the state is published.
    // The `unwrap()` below cannot fire: `wait_for` only yields values satisfying `Option::is_some`.
    #[allow(clippy::missing_panics_doc)]
    pub async fn ensure_initialized(&self) -> Result<InitialRocksdbState, AsyncCatchupFailed> {
        let mut receiver = self.initial_state.clone();
        let published = receiver
            .wait_for(Option::is_some)
            .await
            .map_err(|_| AsyncCatchupFailed(()))?;
        let state = (*published).clone();
        Ok(state.unwrap())
    }
}
/// Task that opens the state keeper RocksDB cache and catches it up with Postgres, publishing its
/// results through the [`RocksdbCell`] returned from [`Self::new()`].
#[derive(Debug)]
pub struct AsyncCatchupTask {
/// Postgres connection pool; `run()` takes a single connection tagged `state_keeper` from it.
pool: ConnectionPool<Core>,
/// Path handed to the RocksDB storage builder (the tests in this module use a temporary directory).
state_keeper_db_path: String,
/// Options handed to the RocksDB storage builder; defaults unless overridden via `with_db_options()`.
state_keeper_db_options: RocksdbStorageOptions,
/// Publishes the initial cache state to the paired `RocksdbCell`.
initial_state_sender: watch::Sender<Option<InitialRocksdbState>>,
/// Publishes the synchronized RocksDB instance to the paired `RocksdbCell`.
db_sender: watch::Sender<Option<RocksDB<StateKeeperColumnFamily>>>,
/// Optional target forwarded to `synchronize()`; `None` unless set via `with_target_l1_batch_number()`.
// NOTE(review): presumably `None` means "catch up to the latest batch" — confirm against `synchronize()`.
to_l1_batch_number: Option<L1BatchNumber>,
}
impl AsyncCatchupTask {
    /// Creates a catch-up task together with the [`RocksdbCell`] through which its results are observed.
    ///
    /// The task starts out with default RocksDB options and no target L1 batch; both can be adjusted
    /// with the `with_*` methods before calling [`Self::run()`].
    pub fn new(pool: ConnectionPool<Core>, state_keeper_db_path: String) -> (Self, RocksdbCell) {
        // Both cells start empty (`None`) and are filled in by `run()`.
        let (initial_state_sender, initial_state_receiver) = watch::channel(None);
        let (db_sender, db_receiver) = watch::channel(None);

        let cell = RocksdbCell {
            initial_state: initial_state_receiver,
            db: db_receiver,
        };
        let task = Self {
            pool,
            state_keeper_db_path,
            state_keeper_db_options: RocksdbStorageOptions::default(),
            initial_state_sender,
            db_sender,
            to_l1_batch_number: None,
        };
        (task, cell)
    }

    /// Replaces the RocksDB options used when opening the cache.
    #[must_use]
    pub fn with_db_options(self, options: RocksdbStorageOptions) -> Self {
        Self {
            state_keeper_db_options: options,
            ..self
        }
    }

    /// Sets the L1 batch number that is forwarded to the synchronization step as its target.
    #[must_use]
    pub fn with_target_l1_batch_number(self, number: L1BatchNumber) -> Self {
        Self {
            to_l1_batch_number: Some(number),
            ..self
        }
    }

    /// Runs the catch-up: opens RocksDB, publishes its initial state, makes sure the cache is ready
    /// (recording the snapshot recovery latency if a recovery took place), synchronizes it with Postgres
    /// and, unless synchronization was interrupted, publishes the resulting RocksDB instance.
    ///
    /// Returns `Ok(())` also when synchronization was interrupted; in that case no instance is published.
    ///
    /// # Errors
    ///
    /// Propagates errors from creating the storage builder, acquiring the Postgres connection,
    /// preparing the cache, and synchronizing it.
    #[tracing::instrument(name = "catch_up", skip_all, fields(target_l1_batch = ?self.to_l1_batch_number))]
    pub async fn run(self, stop_receiver: watch::Receiver<bool>) -> anyhow::Result<()> {
        let started_at = Instant::now();
        tracing::info!("Catching up RocksDB asynchronously");

        let mut builder = RocksdbStorage::builder_with_options(
            self.state_keeper_db_path.as_ref(),
            self.state_keeper_db_options,
        )
        .await
        .context("Failed creating RocksDB storage builder")?;

        // Publish the pre-catch-up state first so that `ensure_initialized()` callers are unblocked
        // before the (potentially long) recovery / synchronization starts.
        let l1_batch_number = builder.l1_batch_number().await;
        let initial_state = InitialRocksdbState { l1_batch_number };
        tracing::info!("Initialized RocksDB catchup from state: {initial_state:?}");
        self.initial_state_sender.send_replace(Some(initial_state));

        let mut conn = self.pool.connection_tagged("state_keeper").await?;
        let was_recovered_from_snapshot = builder
            .ensure_ready(&mut conn, &stop_receiver)
            .await
            .context("failed initializing state keeper RocksDB from snapshot or scratch")?;
        if was_recovered_from_snapshot {
            let elapsed = started_at.elapsed();
            APP_METRICS.snapshot_recovery_latency[&SnapshotRecoveryStage::StateKeeperCache]
                .set(elapsed);
            tracing::info!("Recovered state keeper RocksDB from snapshot in {elapsed:?}");
        }

        let synchronized = builder
            .synchronize(&mut conn, &stop_receiver, self.to_l1_batch_number)
            .await
            .context("Failed to catch up RocksDB to Postgres")?;
        // Release the Postgres connection before publishing the result.
        drop(conn);

        match synchronized {
            Some(storage) => {
                self.db_sender.send_replace(Some(storage.into_rocksdb()));
            }
            None => {
                tracing::info!("Synchronizing RocksDB interrupted");
            }
        }
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use tempfile::TempDir;
    use test_casing::test_casing;
    use zksync_types::L2BlockNumber;

    use super::*;
    use crate::{
        test_utils::{create_l1_batch, create_l2_block, gen_storage_logs, prepare_postgres},
        RocksdbStorageBuilder,
    };

    /// Creates a test pool whose Postgres has been prepared and filled with L2 block #1 and
    /// L1 batch #1 built from the same generated storage logs.
    async fn pool_with_single_batch() -> ConnectionPool<Core> {
        let pool = ConnectionPool::<Core>::test_pool().await;
        let mut conn = pool.connection().await.unwrap();
        prepare_postgres(&mut conn).await;
        let storage_logs = gen_storage_logs(20..40);
        create_l2_block(&mut conn, L2BlockNumber(1), storage_logs.clone()).await;
        create_l1_batch(&mut conn, L1BatchNumber(1), &storage_logs).await;
        drop(conn);
        pool
    }

    /// Converts the temporary directory path into the owned string expected by `AsyncCatchupTask::new()`.
    fn db_path(dir: &TempDir) -> String {
        dir.path().to_str().unwrap().to_owned()
    }

    #[tokio::test]
    async fn catching_up_basics() {
        let pool = pool_with_single_batch().await;
        let temp_dir = TempDir::new().unwrap();

        // First run: the cache is empty and must be caught up.
        let (task, cell) = AsyncCatchupTask::new(pool.clone(), db_path(&temp_dir));
        let (_stop_sender, stop_receiver) = watch::channel(false);
        let handle = tokio::spawn(task.run(stop_receiver));

        let initial_state = cell.ensure_initialized().await.unwrap();
        assert_eq!(initial_state.l1_batch_number, None);

        let db = cell.wait().await.unwrap();
        let caught_up_batch = RocksdbStorageBuilder::from_rocksdb(db)
            .l1_batch_number()
            .await;
        assert_eq!(caught_up_batch, Some(L1BatchNumber(2)));
        handle.await.unwrap().unwrap();
        drop(cell);

        // Second run over the same directory: the initial state must reflect the previous catch-up.
        let (task, cell) = AsyncCatchupTask::new(pool, db_path(&temp_dir));
        let (_stop_sender, stop_receiver) = watch::channel(false);
        let handle = tokio::spawn(task.run(stop_receiver));

        let initial_state = cell.ensure_initialized().await.unwrap();
        assert_eq!(initial_state.l1_batch_number, Some(L1BatchNumber(2)));
        handle.await.unwrap().unwrap();
        // The task has completed, so the DB must be available without waiting.
        cell.get().unwrap();
    }

    /// Ways in which a catch-up can end without publishing a RocksDB instance.
    #[derive(Debug)]
    enum CancellationScenario {
        /// The task is dropped without ever being run.
        DropTask,
        /// The task is run with the stop signal already set.
        CancelTask,
    }

    impl CancellationScenario {
        const ALL: [Self; 2] = [Self::DropTask, Self::CancelTask];
    }

    #[test_casing(2, CancellationScenario::ALL)]
    #[tokio::test]
    async fn catching_up_cancellation(scenario: CancellationScenario) {
        let pool = pool_with_single_batch().await;
        let temp_dir = TempDir::new().unwrap();

        let (task, cell) = AsyncCatchupTask::new(pool.clone(), db_path(&temp_dir));
        let (stop_sender, stop_receiver) = watch::channel(false);
        match scenario {
            CancellationScenario::DropTask => drop(task),
            CancellationScenario::CancelTask => {
                stop_sender.send_replace(true);
                task.run(stop_receiver).await.unwrap();
            }
        }

        // In both scenarios no DB is published and waiting for it must fail.
        assert!(cell.get().is_none());
        cell.wait().await.unwrap_err();
    }
}