use std::{collections::HashMap, mem, time::Duration};
use rand::{
rngs::StdRng,
seq::{IteratorRandom, SliceRandom},
Rng, SeedableRng,
};
use zksync_dal::ConnectionPool;
use zksync_types::StorageLog;
use super::*;
use crate::test_utils::{create_l1_batch, create_l2_block, gen_storage_logs, prepare_postgres};
/// Smoke-tests `PostgresStorage::is_write_initial()` across L2 block / L1 batch boundaries.
///
/// # Arguments
///
/// * `pool` - test Postgres connection pool.
/// * `rt_handle` - Tokio runtime handle used to block on async DB calls.
/// * `cache_initial_writes` - whether to attach `PostgresStorageCaches` so that the
///   initial writes cache is exercised as well.
fn test_postgres_storage_basics(
    pool: &ConnectionPool<Core>,
    rt_handle: Handle,
    cache_initial_writes: bool,
) {
    let mut connection = rt_handle.block_on(pool.connection()).unwrap();
    rt_handle.block_on(prepare_postgres(&mut connection));
    let mut storage = PostgresStorage::new(rt_handle, connection, L2BlockNumber(0), true);
    if cache_initial_writes {
        let caches = PostgresStorageCaches::new(1_024, 1_024);
        storage = storage.with_caches(caches);
    }
    assert_eq!(storage.l1_batch_number_for_l2_block, L1BatchNumber(0));

    // Keys in the 0..20 range are expected to be seeded by `prepare_postgres()`
    // (TODO confirm against `test_utils`), so they are not initial writes.
    let existing_logs = gen_storage_logs(0..20);
    for log in &existing_logs {
        assert!(!storage.is_write_initial(&log.key));
    }

    // Keys in the 20..30 range were never written, hence they are initial writes.
    let non_existing_logs = gen_storage_logs(20..30);
    for log in &non_existing_logs {
        assert!(storage.is_write_initial(&log.key));
    }
    if cache_initial_writes {
        // The queries above should have populated the initial writes cache.
        let caches = storage.caches.as_ref().unwrap();
        assert!(caches.initial_writes.estimated_len() > 0);
    }

    // Insert the "missing" keys into L2 block #1. The block is not yet covered by a
    // sealed L1 batch, so the keys must still be reported as initial writes.
    storage.rt_handle.block_on(create_l2_block(
        &mut storage.connection,
        L2BlockNumber(1),
        non_existing_logs.clone(),
    ));
    for log in &non_existing_logs {
        assert!(storage.is_write_initial(&log.key));
    }

    // Recreate the storage at L2 block #1, carrying the caches over (`PostgresStorage::new`
    // consumes the runtime handle and connection, so we move them out of the old instance).
    let caches = mem::take(&mut storage.caches);
    let mut storage = PostgresStorage::new(
        storage.rt_handle,
        storage.connection,
        L2BlockNumber(1),
        true,
    );
    storage.caches = caches;
    assert_eq!(storage.l1_batch_number_for_l2_block, L1BatchNumber(1));
    // Still initial writes: L1 batch #1 is not sealed yet.
    for log in &non_existing_logs {
        assert!(storage.is_write_initial(&log.key));
    }

    // Seal L1 batch #1 containing the new logs.
    storage.rt_handle.block_on(create_l1_batch(
        &mut storage.connection,
        L1BatchNumber(1),
        &non_existing_logs,
    ));

    // At L2 block #0 (i.e., L1 batch #0), the writes sealed in batch #1 are in the
    // future, so the keys are still initial writes from this storage's point of view.
    let caches = mem::take(&mut storage.caches);
    let mut storage = PostgresStorage::new(
        storage.rt_handle,
        storage.connection,
        L2BlockNumber(0),
        true,
    );
    storage.caches = caches;
    assert_eq!(storage.l1_batch_number_for_l2_block, L1BatchNumber(0));
    for log in &non_existing_logs {
        assert!(storage.is_write_initial(&log.key));
    }

    // At L2 block #1 with `consider_new_l1_batch == true`, the sealed batch #1 counts,
    // so the keys are no longer initial writes.
    let caches = mem::take(&mut storage.caches);
    let mut storage = PostgresStorage::new(
        storage.rt_handle,
        storage.connection,
        L2BlockNumber(1),
        true,
    );
    storage.caches = caches;
    assert_eq!(storage.l1_batch_number_for_l2_block, L1BatchNumber(1));
    for log in &non_existing_logs {
        assert!(!storage.is_write_initial(&log.key));
    }

    // Same block, but with `consider_new_l1_batch == false`: batch #1 is ignored and the
    // keys are initial writes again. Previously seeded keys stay non-initial either way.
    let caches = mem::take(&mut storage.caches);
    let mut storage = PostgresStorage::new(
        storage.rt_handle,
        storage.connection,
        L2BlockNumber(1),
        false,
    );
    storage.caches = caches;
    assert_eq!(storage.l1_batch_number_for_l2_block, L1BatchNumber(1));
    for log in &non_existing_logs {
        assert!(storage.is_write_initial(&log.key));
    }
    for log in &existing_logs {
        assert!(!storage.is_write_initial(&log.key));
    }
}
/// Runs the basic storage checks without the initial writes cache.
#[tokio::test]
async fn postgres_storage_basics() {
    let pool = ConnectionPool::<Core>::test_pool().await;
    let handle = Handle::current();
    // `PostgresStorage` blocks on DB calls, so it must run on a blocking thread.
    let task = tokio::task::spawn_blocking(move || {
        test_postgres_storage_basics(&pool, handle, false);
    });
    task.await.unwrap();
}
/// Runs the basic storage checks with the initial writes cache enabled.
#[tokio::test]
async fn postgres_storage_with_initial_writes_cache() {
    let pool = ConnectionPool::<Core>::test_pool().await;
    let handle = Handle::current();
    // Blocking DB interactions must not run directly on the async runtime.
    let task = tokio::task::spawn_blocking(move || {
        test_postgres_storage_basics(&pool, handle, true);
    });
    task.await.unwrap();
}
/// Checks `is_write_initial()` behavior when an L1 batch gets sealed *after* the storage
/// instance was created, depending on the `consider_new_l1_batch` flag.
fn test_postgres_storage_after_sealing_l2_block(
    pool: &ConnectionPool<Core>,
    rt_handle: Handle,
    consider_new_l1_batch: bool,
) {
    let mut connection = rt_handle.block_on(pool.connection()).unwrap();
    rt_handle.block_on(prepare_postgres(&mut connection));

    // Seal L2 block #1 with fresh logs before creating the storage.
    let new_logs = gen_storage_logs(20..30);
    rt_handle.block_on(create_l2_block(
        &mut connection,
        L2BlockNumber(1),
        new_logs.clone(),
    ));
    let mut storage = PostgresStorage::new(
        rt_handle,
        connection,
        L2BlockNumber(1),
        consider_new_l1_batch,
    );
    assert_eq!(storage.l1_batch_number_for_l2_block, L1BatchNumber(1));

    // Seal L1 batch #1 while the storage instance already exists.
    storage.rt_handle.block_on(create_l1_batch(
        &mut storage.connection,
        L1BatchNumber(1),
        &new_logs,
    ));
    // If the newly sealed batch is considered, the keys are no longer initial writes;
    // otherwise they still are.
    for log in &new_logs {
        assert_eq!(storage.is_write_initial(&log.key), !consider_new_l1_batch);
    }

    // A storage recreated after the batch was sealed must behave identically.
    let mut storage = PostgresStorage::new(
        storage.rt_handle,
        storage.connection,
        L2BlockNumber(1),
        consider_new_l1_batch,
    );
    assert_eq!(storage.l1_batch_number_for_l2_block, L1BatchNumber(1));
    for log in &new_logs {
        assert_eq!(storage.is_write_initial(&log.key), !consider_new_l1_batch);
    }
}
/// Exercises post-sealing behavior both with and without considering the new L1 batch.
#[tokio::test]
async fn postgres_storage_after_sealing_l2_block() {
    let pool = ConnectionPool::<Core>::test_pool().await;
    tokio::task::spawn_blocking(move || {
        let scenarios = [
            ("Considering new L1 batch", true),
            ("Not considering new L1 batch", false),
        ];
        for (label, consider_new_l1_batch) in scenarios {
            println!("{label}");
            test_postgres_storage_after_sealing_l2_block(
                &pool,
                Handle::current(),
                consider_new_l1_batch,
            );
        }
    })
    .await
    .unwrap();
}
/// Checks that factory dependencies are cached on load together with the L2 block they were
/// inserted at, and that the cache is filtered by the storage's block number.
fn test_factory_deps_cache(pool: &ConnectionPool<Core>, rt_handle: Handle) {
    let mut connection = rt_handle.block_on(pool.connection()).unwrap();
    rt_handle.block_on(prepare_postgres(&mut connection));
    let caches = PostgresStorageCaches::new(128 * 1_024 * 1_024, 1_024);
    let mut storage = PostgresStorage::new(rt_handle, connection, L2BlockNumber(1), true)
        .with_caches(caches.clone());

    // NOTE(review): despite the name, this is a bytecode hash (`H256`), not an address.
    let zero_addr = H256::zero();
    // A miss returns `None` and must not create a cache entry.
    let dep = storage.load_factory_dep(zero_addr);
    assert_eq!(dep, None);
    assert_eq!(caches.factory_deps.get(&zero_addr), None);

    // Insert one dep at L2 block #0...
    let mut contracts = HashMap::new();
    contracts.insert(H256::zero(), vec![1, 2, 3]);
    storage
        .rt_handle
        .block_on(
            storage
                .connection
                .factory_deps_dal()
                .insert_factory_deps(L2BlockNumber(0), &contracts),
        )
        .unwrap();
    // ...and another one at L2 block #1.
    let mut contracts = HashMap::new();
    contracts.insert(H256::from_low_u64_be(1), vec![1, 2, 3, 4]);
    storage
        .rt_handle
        .block_on(
            storage
                .connection
                .factory_deps_dal()
                .insert_factory_deps(L2BlockNumber(1), &contracts),
        )
        .unwrap();

    // Recreate the storage sharing the same cache. Both deps are visible at block #1,
    // and each gets cached together with its insertion block.
    let mut storage = PostgresStorage::new(
        storage.rt_handle,
        storage.connection,
        L2BlockNumber(1),
        true,
    )
    .with_caches(caches.clone());
    let dep = storage.load_factory_dep(zero_addr);
    assert_eq!(dep, Some(vec![1, 2, 3]));
    assert_eq!(
        caches.factory_deps.get(&zero_addr),
        Some(TimestampedFactoryDep {
            bytecode: vec![1, 2, 3],
            inserted_at: L2BlockNumber(0)
        })
    );
    let dep = storage.load_factory_dep(H256::from_low_u64_be(1));
    assert_eq!(dep, Some(vec![1, 2, 3, 4]));
    assert_eq!(
        caches.factory_deps.get(&H256::from_low_u64_be(1)),
        Some(TimestampedFactoryDep {
            bytecode: vec![1, 2, 3, 4],
            inserted_at: L2BlockNumber(1)
        })
    );

    // At block #0, the dep inserted at block #1 must be filtered out even though it is
    // present in the (shared) cache.
    let mut storage = PostgresStorage::new(
        storage.rt_handle,
        storage.connection,
        L2BlockNumber(0),
        true,
    )
    .with_caches(caches.clone());
    let dep = storage.load_factory_dep(zero_addr);
    assert_eq!(dep, Some(vec![1, 2, 3]));
    let dep = storage.load_factory_dep(H256::from_low_u64_be(1));
    assert!(dep.is_none());
}
/// Entry point for the factory deps cache test; offloads blocking DB work.
#[tokio::test]
async fn using_factory_deps_cache() {
    let pool = ConnectionPool::<Core>::test_pool().await;
    let handle = Handle::current();
    let task = tokio::task::spawn_blocking(move || test_factory_deps_cache(&pool, handle));
    task.await.unwrap();
}
/// Checks the positive (`initial_writes`) and negative (`negative_initial_writes`) caches
/// backing `is_write_initial()`.
fn test_initial_writes_cache(pool: &ConnectionPool<Core>, rt_handle: Handle) {
    let connection = rt_handle.block_on(pool.connection()).unwrap();
    let caches = PostgresStorageCaches::new(1_024, 4 * 1_024 * 1_024);
    let mut storage = PostgresStorage::new(rt_handle, connection, L2BlockNumber(0), false)
        .with_caches(caches.clone());
    // The storage is created before `prepare_postgres()`, so the pending batch is 0.
    assert_eq!(storage.pending_l1_batch_number, L1BatchNumber(0));
    storage
        .rt_handle
        .block_on(prepare_postgres(&mut storage.connection));

    let mut logs = gen_storage_logs(100..120);
    let non_existing_key = logs[19].key;
    logs.truncate(10);
    // Both keys are unwritten; the misses land in the negative cache tagged with the
    // pending L1 batch number captured at storage creation (0).
    assert!(storage.is_write_initial(&logs[0].key));
    assert!(storage.is_write_initial(&non_existing_key));
    assert_eq!(
        caches
            .negative_initial_writes
            .get(&logs[0].key.hashed_key()),
        Some(L1BatchNumber(0))
    );
    assert_eq!(
        caches
            .negative_initial_writes
            .get(&non_existing_key.hashed_key()),
        Some(L1BatchNumber(0))
    );
    // Repeated queries (now served from the cache) must agree.
    assert!(storage.is_write_initial(&logs[0].key));
    assert!(storage.is_write_initial(&non_existing_key));

    // Seal L2 block #1 and L1 batch #1 containing the first 10 logs.
    storage.rt_handle.block_on(create_l2_block(
        &mut storage.connection,
        L2BlockNumber(1),
        logs.clone(),
    ));
    storage.rt_handle.block_on(create_l1_batch(
        &mut storage.connection,
        L1BatchNumber(1),
        &logs,
    ));

    let mut storage = PostgresStorage::new(
        storage.rt_handle,
        storage.connection,
        L2BlockNumber(1),
        false,
    )
    .with_caches(caches.clone());
    // `consider_new_l1_batch == false`, so batch #1 is ignored and both keys still read
    // as initial writes...
    assert!(storage.is_write_initial(&logs[0].key));
    assert!(storage.is_write_initial(&non_existing_key));
    // ...but the caches are updated: `logs[0]` moves from the negative to the positive
    // cache (initial write in batch #1), while the missing key's negative entry is bumped
    // to the new pending batch (2).
    assert_eq!(
        caches.initial_writes.get(&logs[0].key.hashed_key()),
        Some(L1BatchNumber(1))
    );
    assert_eq!(
        caches
            .negative_initial_writes
            .get(&logs[0].key.hashed_key()),
        None
    );
    assert_eq!(
        caches
            .negative_initial_writes
            .get(&non_existing_key.hashed_key()),
        Some(L1BatchNumber(2))
    );
    assert!(storage.is_write_initial(&logs[0].key));
    assert!(storage.is_write_initial(&non_existing_key));

    // With `consider_new_l1_batch == true`, the write in batch #1 counts.
    let mut storage = PostgresStorage::new(
        storage.rt_handle,
        storage.connection,
        L2BlockNumber(1),
        true,
    )
    .with_caches(caches.clone());
    assert!(!storage.is_write_initial(&logs[0].key));
    assert!(storage.is_write_initial(&non_existing_key));
    // Cache contents are unchanged by these reads.
    assert_eq!(
        caches.initial_writes.get(&logs[0].key.hashed_key()),
        Some(L1BatchNumber(1))
    );
    assert_eq!(
        caches
            .negative_initial_writes
            .get(&non_existing_key.hashed_key()),
        Some(L1BatchNumber(2))
    );

    // At (pending) L2 block #2, batch #1 is strictly in the past, so the write counts
    // even without considering the new L1 batch.
    let mut storage = PostgresStorage::new(
        storage.rt_handle,
        storage.connection,
        L2BlockNumber(2),
        false,
    )
    .with_caches(caches);
    assert!(!storage.is_write_initial(&logs[0].key));
    assert!(storage.is_write_initial(&non_existing_key));
}
/// Entry point for the initial writes cache test; offloads blocking DB work.
#[tokio::test]
async fn using_initial_writes_cache() {
    let pool = ConnectionPool::<Core>::test_pool().await;
    let handle = Handle::current();
    let task = tokio::task::spawn_blocking(move || test_initial_writes_cache(&pool, handle));
    task.await.unwrap();
}
/// Test helper checking `ValuesCache` contents at a fixed L2 block number.
#[derive(Debug)]
struct ValueCacheAssertions<'a> {
    /// Cache under test.
    cache: &'a ValuesCache,
    /// L2 block number that all assertions are pinned to.
    l2_block_number: L2BlockNumber,
}
impl ValueCacheAssertions<'_> {
    /// Asserts that the cache returns `expected_value` for every listed key when queried
    /// at the pinned L2 block number (`None` means "not served from the cache").
    fn assert_entries(&self, expected_entries: &[(StorageKey, Option<StorageValue>)]) {
        expected_entries.iter().for_each(|(key, expected_value)| {
            let cached = self.cache.get(self.l2_block_number, key.hashed_key());
            assert_eq!(cached, *expected_value);
        });
    }
}
impl ValuesCache {
    /// Creates an assertion helper pinned to the specified L2 block number.
    fn assertions(&self, l2_block_number: L2BlockNumber) -> ValueCacheAssertions<'_> {
        ValueCacheAssertions {
            cache: self,
            l2_block_number,
        }
    }
}
/// Polls `values_cache` until its `valid_for` block reaches `target_l2_block`.
///
/// # Panics
///
/// Panics if the cache overshoots the target block, or if it doesn't reach the target
/// within 5 seconds.
async fn wait_for_cache_update(values_cache: &ValuesCache, target_l2_block: L2BlockNumber) {
    const POLL_INTERVAL: Duration = Duration::from_millis(50);
    const MAX_WAIT: Duration = Duration::from_secs(5);

    let poll_loop = async {
        loop {
            let valid_for = values_cache.0.read().unwrap().valid_for;
            // The cache must never be updated past the requested block.
            assert!(valid_for <= target_l2_block, "{valid_for:?}");
            if valid_for == target_l2_block {
                return;
            }
            tokio::time::sleep(POLL_INTERVAL).await;
        }
    };
    tokio::time::timeout(MAX_WAIT, poll_loop)
        .await
        .expect("timed out waiting for cache update");
}
/// Checks the storage values cache and its background update task: population on reads,
/// invalidation of modified keys, and retention of unmodified ones across block updates.
fn test_values_cache(pool: &ConnectionPool<Core>, rt_handle: Handle) {
    let mut caches = PostgresStorageCaches::new(1_024, 1_024);
    // The values cache is updated asynchronously by a dedicated task.
    let task = caches.configure_storage_values_cache(1_024 * 1_024, pool.clone());
    let (stop_sender, stop_receiver) = watch::channel(false);
    let update_task_handle = tokio::task::spawn(task.run(stop_receiver));

    let values_cache = caches.values.as_ref().unwrap().cache.clone();
    let old_l2_block_assertions = values_cache.assertions(L2BlockNumber(0));
    let new_l2_block_assertions = values_cache.assertions(L2BlockNumber(1));

    let mut connection = rt_handle.block_on(pool.connection()).unwrap();
    rt_handle.block_on(prepare_postgres(&mut connection));
    let mut storage = PostgresStorage::new(rt_handle, connection, L2BlockNumber(0), false)
        .with_caches(caches.clone());

    let initial_logs = gen_storage_logs(0..20);
    let existing_key = initial_logs[1].key;
    let unmodified_key = initial_logs[2].key;
    let initial_value = storage.read_value(&existing_key);
    assert!(!initial_value.is_zero());
    let unmodified_value = storage.read_value(&unmodified_key);
    assert!(!unmodified_value.is_zero());
    let non_existing_key = gen_storage_logs(100..120)[0].key;
    let value = storage.read_value(&non_existing_key);
    assert_eq!(value, StorageValue::zero());
    // The reads above should have populated the cache (a zero value is cached for the
    // missing key as well).
    old_l2_block_assertions.assert_entries(&[
        (existing_key, Some(initial_value)),
        (unmodified_key, Some(unmodified_value)),
        (non_existing_key, Some(H256::zero())),
    ]);

    // Modify two of the keys in L2 block #1.
    let logs = vec![
        StorageLog::new_write_log(existing_key, H256::repeat_byte(1)),
        StorageLog::new_write_log(non_existing_key, H256::repeat_byte(2)),
    ];
    storage.rt_handle.block_on(create_l2_block(
        &mut storage.connection,
        L2BlockNumber(1),
        logs,
    ));

    let mut storage = PostgresStorage::new(
        storage.rt_handle,
        storage.connection,
        L2BlockNumber(1),
        true,
    )
    .with_caches(caches.clone());
    // Reads at block #1 bypass the cache (it is still only valid for block #0), so the
    // new values come straight from Postgres.
    assert_eq!(storage.read_value(&existing_key), H256::repeat_byte(1));
    assert_eq!(storage.read_value(&non_existing_key), H256::repeat_byte(2));
    assert_eq!(storage.read_value(&unmodified_key), unmodified_value);
    // Nothing is served from the cache for block #1 yet...
    new_l2_block_assertions.assert_entries(&[
        (existing_key, None),
        (unmodified_key, None),
        (non_existing_key, None),
    ]);
    // ...while the block-#0 entries are untouched.
    old_l2_block_assertions.assert_entries(&[
        (existing_key, Some(initial_value)),
        (non_existing_key, Some(H256::zero())),
    ]);

    // Ask the background task to advance the cache to block #1 and wait for it.
    caches.schedule_values_update(L2BlockNumber(1));
    storage
        .rt_handle
        .block_on(wait_for_cache_update(&values_cache, L2BlockNumber(1)));

    assert_eq!(storage.read_value(&existing_key), H256::repeat_byte(1));
    assert_eq!(storage.read_value(&non_existing_key), H256::repeat_byte(2));
    assert_eq!(storage.read_value(&unmodified_key), unmodified_value);
    // After the update: modified keys are served at block #1 (and no longer at block #0),
    // while the unmodified key is retained for both blocks.
    let assert_final_cache = || {
        new_l2_block_assertions.assert_entries(&[
            (existing_key, Some(H256::repeat_byte(1))),
            (non_existing_key, Some(H256::repeat_byte(2))),
            (unmodified_key, Some(unmodified_value)),
        ]);
        old_l2_block_assertions.assert_entries(&[
            (existing_key, None),
            (non_existing_key, None),
            (unmodified_key, Some(unmodified_value)),
        ]);
    };
    assert_final_cache();

    // Reads at the old block #0 return pre-modification values from Postgres and must
    // not overwrite the now-newer cache.
    let mut storage = PostgresStorage::new(
        storage.rt_handle,
        storage.connection,
        L2BlockNumber(0),
        true,
    )
    .with_caches(caches.clone());
    assert_eq!(storage.read_value(&existing_key), initial_value);
    assert_eq!(storage.read_value(&non_existing_key), StorageValue::zero());
    assert_eq!(storage.read_value(&unmodified_key), unmodified_value);
    assert_final_cache();

    // Shut down the update task and check that it exited cleanly; scheduling an update
    // afterwards must not panic.
    stop_sender.send_replace(true);
    storage
        .rt_handle
        .block_on(update_task_handle)
        .expect("update task panicked")
        .unwrap();
    caches.schedule_values_update(L2BlockNumber(2));
}
/// Entry point for the values cache test; offloads blocking DB work.
#[tokio::test]
async fn using_values_cache() {
    let pool = ConnectionPool::<Core>::test_pool().await;
    let handle = Handle::current();
    let task = tokio::task::spawn_blocking(move || test_values_cache(&pool, handle));
    task.await.unwrap();
}
/// Mini-fuzz: for a growing chain of L2 blocks, reads a fixed key set through both a
/// cached and an uncached `PostgresStorage` (at randomly ordered block numbers, with the
/// values cache updated at a random point) and checks that the outputs always match.
///
/// NB: the exact sequence of `rng` calls is part of the test's reproducibility (seeded
/// RNG); don't reorder them.
fn mini_fuzz_values_cache_inner(
    rng: &mut impl Rng,
    pool: &ConnectionPool<Core>,
    mut rt_handle: Handle,
) {
    let mut caches = PostgresStorageCaches::new(1_024, 1_024);
    // The update task is intentionally not run; the cache is updated manually below.
    let _ = caches.configure_storage_values_cache(1_024 * 1_024, pool.clone());
    let values_cache = caches.values.as_ref().unwrap().cache.clone();

    let mut connection = rt_handle.block_on(pool.connection()).unwrap();
    rt_handle.block_on(prepare_postgres(&mut connection));
    let queried_keys: Vec<_> = gen_storage_logs(0..100)
        .into_iter()
        .map(|log| log.key)
        .collect();

    for latest_block_number in 0..=10 {
        // Visit all existing blocks in random order.
        let mut all_block_numbers: Vec<_> = (0..=latest_block_number).map(L2BlockNumber).collect();
        all_block_numbers.shuffle(rng);
        // At block 0 there is nothing to update, so treat the cache as current.
        let mut cache_updated = latest_block_number == 0;
        for block_number in all_block_numbers {
            // With probability ~1/3 per iteration, fast-forward the values cache to the
            // latest block (at most once per outer iteration).
            if !cache_updated && rng.gen_range(0..3) == 0 {
                let cache_valid_for = values_cache.valid_for();
                assert!(cache_valid_for < L2BlockNumber(latest_block_number));
                rt_handle
                    .block_on(values_cache.update(
                        cache_valid_for,
                        L2BlockNumber(latest_block_number),
                        &mut connection,
                    ))
                    .unwrap();
                cache_updated = true;
            }

            // Query the keys in random order through an uncached storage...
            let mut queried_keys = queried_keys.clone();
            queried_keys.shuffle(rng);
            let mut uncached_storage =
                PostgresStorage::new(rt_handle, connection, block_number, false);
            let uncached_storage_output: Vec<_> = queried_keys
                .iter()
                .map(|key| uncached_storage.read_value(key))
                .collect();
            // `PostgresStorage` consumes the handle and connection; move them back out.
            rt_handle = uncached_storage.rt_handle;
            connection = uncached_storage.connection;

            // ...and through a cached one; the cache must never change the results.
            let mut cached_storage =
                PostgresStorage::new(rt_handle, connection, block_number, false)
                    .with_caches(caches.clone());
            let cached_storage_output: Vec<_> = queried_keys
                .iter()
                .map(|key| cached_storage.read_value(key))
                .collect();
            rt_handle = cached_storage.rt_handle;
            connection = cached_storage.connection;

            assert_eq!(
                uncached_storage_output, cached_storage_output,
                "Outputs differ for {block_number:?} with latest {latest_block_number:?}"
            );
        }

        // Seal the next L2 block with 20 randomly chosen keys, each rewritten to a value
        // derived from the block number.
        let next_block_number = L2BlockNumber(latest_block_number) + 1;
        let logs = queried_keys
            .iter()
            .choose_multiple(rng, 20)
            .into_iter()
            .map(|&key| {
                let new_value = H256::from_low_u64_be(next_block_number.0.into());
                StorageLog::new_write_log(key, new_value)
            })
            .collect();
        rt_handle.block_on(create_l2_block(&mut connection, next_block_number, logs));
    }
}
/// Entry point for the values cache mini-fuzz; uses a fixed seed for reproducibility.
#[tokio::test]
async fn mini_fuzz_values_cache() {
    const RNG_SEED: u64 = 123;

    let pool = ConnectionPool::<Core>::test_pool().await;
    let handle = Handle::current();
    let mut rng = StdRng::seed_from_u64(RNG_SEED);
    let task =
        tokio::task::spawn_blocking(move || mini_fuzz_values_cache_inner(&mut rng, &pool, handle));
    task.await.unwrap();
}