zksync_state 0.1.0

ZKsync state keeper state
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
use std::{
    mem,
    sync::{Arc, RwLock},
    time::Duration,
};

use anyhow::Context as _;
use backon::{BlockingRetryable, ConstantBuilder};
use tokio::{
    runtime::Handle,
    sync::{
        mpsc::{self, UnboundedReceiver},
        watch,
    },
};
use zksync_dal::{Connection, ConnectionPool, Core, CoreDal};
use zksync_types::{L1BatchNumber, L2BlockNumber, StorageKey, StorageValue, H256};

use self::metrics::{Method, ValuesUpdateStage, CACHE_METRICS, STORAGE_METRICS};
use crate::{
    cache::{lru_cache::LruCache, CacheValue},
    ReadStorage,
};

mod metrics;
#[cfg(test)]
mod tests;

/// Smart contract bytecode together with the L2 block at which it appeared in storage.
///
/// The timestamp allows filtering out bytecodes "from the future" when this cache is used
/// to serve the state of a past L2 block (see `load_factory_dep()`).
#[derive(Debug, Clone, PartialEq, Eq)]
struct TimestampedFactoryDep {
    // Raw deployed bytecode.
    bytecode: Vec<u8>,
    // L2 block at which the bytecode was inserted (as reported by `get_factory_dep()`).
    inserted_at: L2BlockNumber,
}

/// Type alias for smart contract source code cache.
type FactoryDepsCache = LruCache<H256, TimestampedFactoryDep>;

impl CacheValue<H256> for TimestampedFactoryDep {
    /// Weight of a cached entry: the bytecode length plus the fixed-size block-number tag.
    fn cache_weight(&self) -> u32 {
        let weight = self.bytecode.len() + mem::size_of::<L2BlockNumber>();
        u32::try_from(weight).expect("Cached bytes are too large")
    }
}

/// Type alias for initial writes caches.
type InitialWritesCache = LruCache<H256, L1BatchNumber>;

impl CacheValue<H256> for L1BatchNumber {
    #[allow(clippy::cast_possible_truncation)] // doesn't happen in practice
    fn cache_weight(&self) -> u32 {
        // Values are tiny, so the key size is included to better reflect real memory use.
        (mem::size_of::<L1BatchNumber>() + mem::size_of::<H256>()) as u32
    }
}

/// [`StorageValue`] together with an L2 block "timestamp" starting from which it is known to be valid.
///
/// Using timestamped values in [`ValuesCache`] enables using it for past L2 block states. As long as
/// a cached value has a "timestamp" older or equal than the requested L2 block, the value can be used.
///
/// Timestamp is assigned to equal the latest L2 block when a value is fetched from the storage.
/// A value may be valid for earlier L2 blocks, but fetching the actual modification "timestamp"
/// would make the relevant Postgres query more complex.
#[derive(Debug, Clone, Copy)]
struct TimestampedStorageValue {
    // Cached storage slot value.
    value: StorageValue,
    // L2 block at which this value was fetched from Postgres (see type-level docs).
    loaded_at: L2BlockNumber,
}

impl CacheValue<H256> for TimestampedStorageValue {
    #[allow(clippy::cast_possible_truncation)] // doesn't happen in practice
    fn cache_weight(&self) -> u32 {
        // Entries are small; the key size is included so the weight reflects real memory use.
        (mem::size_of::<TimestampedStorageValue>() + mem::size_of::<H256>()) as u32
    }
}

#[derive(Debug)]
struct ValuesCacheInner {
    /// L2 block up to which `self.values` are valid. Has the same meaning as `l2_block_number`
    /// in `PostgresStorage` (i.e., the latest sealed L2 block for which storage logs should
    /// be taken into account).
    valid_for: L2BlockNumber,
    /// LRU-cached storage values, each tagged with the L2 block it was loaded at.
    values: LruCache<H256, TimestampedStorageValue>,
}

/// Cache for the VM storage. Only caches values for a single VM storage snapshot, which logically
/// corresponds to the latest sealed L2 block in Postgres.
///
/// The cached snapshot can be updated, which will load changed storage keys from Postgres and remove
/// the (potentially stale) cached values for these keys.
///
/// # Why wrap the cache in `RwLock`?
///
/// We need to be sure that `valid_for` L2 block of the values cache has not changed while we are
/// loading or storing values in it. This is easiest to achieve using an `RwLock`. Note that
/// almost all cache ops require only shared access to the lock (including cache updates!); we only
/// need exclusive access when we are updating the `valid_for` L2 block. Further, the update itself
/// doesn't grab the lock until *after* the Postgres data has been loaded. (This works because we
/// know statically that there is a single thread updating the cache; hence, we have no contention
/// over updating the cache.) To summarize, `RwLock` should see barely any contention.
#[derive(Debug, Clone)]
struct ValuesCache(Arc<RwLock<ValuesCacheInner>>);

impl ValuesCache {
    /// Creates a cache with the given weight capacity; initially valid for the genesis L2 block.
    fn new(capacity: u64) -> Self {
        let inner = ValuesCacheInner {
            valid_for: L2BlockNumber(0),
            values: LruCache::new("values_cache", capacity),
        };
        Self(Arc::new(RwLock::new(inner)))
    }

    /// *NB.* The returned value should be considered immediately stale; at best, it can be
    /// the lower boundary on the current `valid_for` value.
    fn valid_for(&self) -> L2BlockNumber {
        self.0.read().expect("values cache is poisoned").valid_for
    }

    /// Gets the cached value for `key` provided that the cache currently holds values
    /// for `l2_block_number`.
    fn get(&self, l2_block_number: L2BlockNumber, hashed_key: H256) -> Option<StorageValue> {
        let lock = self.0.read().expect("values cache is poisoned");
        if lock.valid_for < l2_block_number {
            // The request is from the future; we cannot say which values in the cache remain valid,
            // so we don't return *any* cached values.
            return None;
        }

        let timestamped_value = lock.values.get(&hashed_key)?;
        if timestamped_value.loaded_at <= l2_block_number {
            Some(timestamped_value.value)
        } else {
            None // The value is from the future
        }
    }

    /// Caches `value` for `key`, but only if the cache currently holds values for `l2_block_number`.
    fn insert(&self, l2_block_number: L2BlockNumber, hashed_key: H256, value: StorageValue) {
        // A shared lock suffices here (see the type-level docs): we only need `valid_for` to stay
        // constant while inserting; the inner LRU cache handles concurrent insertions itself.
        let lock = self.0.read().expect("values cache is poisoned");
        if lock.valid_for == l2_block_number {
            lock.values.insert(
                hashed_key,
                TimestampedStorageValue {
                    value,
                    loaded_at: l2_block_number,
                },
            );
        } else {
            // The cache snapshot moved on while the value was being loaded; drop it instead of
            // caching potentially stale data.
            CACHE_METRICS.stale_values.inc();
        }
    }

    /// Advances the cache snapshot from `from_l2_block` to `to_l2_block` by evicting keys modified
    /// in between, or by resetting the entire cache if the lag is too large.
    async fn update(
        &self,
        from_l2_block: L2BlockNumber,
        to_l2_block: L2BlockNumber,
        connection: &mut Connection<'_, Core>,
    ) -> anyhow::Result<()> {
        // If the cache lags by more than this many L2 blocks, a wholesale reset is cheaper
        // than loading every modified key from Postgres.
        const MAX_L2_BLOCKS_LAG: u32 = 5;

        tracing::debug!(
            "Updating storage values cache from L2 block {from_l2_block} to {to_l2_block}"
        );

        if to_l2_block.0 - from_l2_block.0 > MAX_L2_BLOCKS_LAG {
            // We can spend too much time loading data from Postgres, so we opt for an easier "update" route:
            // evict *everything* from cache and call it a day. This should not happen too often in practice.
            tracing::info!(
                "Storage values cache is too far behind (current L2 block is {from_l2_block}; \
                 requested update to {to_l2_block}); resetting the cache"
            );
            let mut lock = self
                .0
                .write()
                .map_err(|_| anyhow::anyhow!("values cache is poisoned"))?;
            anyhow::ensure!(
                lock.valid_for == from_l2_block,
                "sanity check failed: values cache was expected to be valid for L2 block #{from_l2_block}, but it's actually \
                 valid for L2 block #{}",
                lock.valid_for
            );
            lock.valid_for = to_l2_block;
            lock.values.clear();

            CACHE_METRICS.values_emptied.inc();
        } else {
            // Incremental route: load the keys modified in `(from_l2_block, to_l2_block]` and
            // evict only those from the cache.
            let update_latency = CACHE_METRICS.values_update[&ValuesUpdateStage::LoadKeys].start();
            let l2_blocks = (from_l2_block + 1)..=to_l2_block;
            let modified_keys = connection
                .storage_logs_dal()
                .modified_keys_in_l2_blocks(l2_blocks.clone())
                .await?;

            let elapsed = update_latency.observe();
            CACHE_METRICS
                .values_update_modified_keys
                .observe(modified_keys.len());
            tracing::debug!(
                "Loaded {modified_keys_len} modified storage keys from L2 blocks {l2_blocks:?}; \
                 took {elapsed:?}",
                modified_keys_len = modified_keys.len()
            );

            let update_latency =
                CACHE_METRICS.values_update[&ValuesUpdateStage::RemoveStaleKeys].start();
            let mut lock = self
                .0
                .write()
                .map_err(|_| anyhow::anyhow!("values cache is poisoned"))?;
            // The code below holding onto the write `lock` is the only code that can theoretically poison the `RwLock`
            // (other than emptying the cache above). Thus, it's kept as simple and tight as possible.
            // E.g., we load data from Postgres beforehand.
            anyhow::ensure!(
                lock.valid_for == from_l2_block,
                "sanity check failed: values cache was expected to be valid for L2 block #{from_l2_block}, but it's actually \
                 valid for L2 block #{}",
                lock.valid_for
            );
            lock.valid_for = to_l2_block;
            for modified_key in &modified_keys {
                lock.values.remove(modified_key);
            }
            lock.values.report_size();
            drop(lock);
            update_latency.observe();
        }

        CACHE_METRICS
            .values_valid_for_miniblock
            .set(u64::from(to_l2_block.0));
        Ok(())
    }
}

/// Values cache paired with the channel used to schedule its updates.
#[derive(Debug, Clone)]
struct ValuesCacheAndUpdater {
    // Shared cache queried during VM execution.
    cache: ValuesCache,
    // Sends "update to this L2 block" commands to `PostgresStorageCachesTask`.
    command_sender: mpsc::UnboundedSender<L2BlockNumber>,
}

/// Caches used during VM execution.
///
/// Currently, this struct includes the following caches:
///
/// - Cache for smart contract bytecodes (never invalidated, since it is content-addressable)
/// - Cache for L1 batch numbers of initial writes for storage keys (never invalidated, except after
///   reverting L1 batch execution)
/// - Cache of the VM storage snapshot corresponding to the latest sealed L2 block
#[derive(Debug, Clone)]
pub struct PostgresStorageCaches {
    // Content-addressed bytecode cache keyed by bytecode hash.
    factory_deps: FactoryDepsCache,
    // Maps hashed storage keys to the L1 batch of their initial write.
    initial_writes: InitialWritesCache,
    // Besides L1 batch numbers for initial writes, we also cache information that a certain key
    // was not written to before the certain L1 batch (i.e., this lower boundary is the cached value).
    //
    // This is caused by the observation that a significant part of `is_write_initial()` queries returns `true`
    // (i.e., the corresponding key was not written to).
    // If we don't cache this information, we'll query Postgres multiple times for the same key even if we know
    // it wasn't written to at the point that interests us.
    negative_initial_writes: InitialWritesCache,
    // Values cache; `None` until `configure_storage_values_cache()` is called.
    values: Option<ValuesCacheAndUpdater>,
}

impl PostgresStorageCaches {
    /// Creates caches with the specified capacities measured in bytes.
    pub fn new(factory_deps_capacity: u64, initial_writes_capacity: u64) -> Self {
        tracing::debug!(
            "Initialized VM execution cache with {factory_deps_capacity}B capacity for factory deps, \
             {initial_writes_capacity}B capacity for initial writes"
        );

        // The initial-writes budget is shared evenly by the positive and negative caches.
        let half_of_initial_writes = initial_writes_capacity / 2;
        Self {
            factory_deps: FactoryDepsCache::new("factory_deps_cache", factory_deps_capacity),
            initial_writes: InitialWritesCache::new("initial_writes_cache", half_of_initial_writes),
            negative_initial_writes: InitialWritesCache::new(
                "negative_initial_writes_cache",
                half_of_initial_writes,
            ),
            values: None,
        }
    }

    /// Configures the VM storage values cache. The returned closure is the background task that will update
    /// the cache according to [`Self::schedule_values_update()`] calls. It should be spawned on a separate thread
    /// or a blocking Tokio task.
    ///
    /// # Panics
    ///
    /// Panics if provided `capacity` is zero. (Check on the caller side beforehand if there is
    /// such possibility.)
    pub fn configure_storage_values_cache(
        &mut self,
        capacity: u64,
        connection_pool: ConnectionPool<Core>,
    ) -> PostgresStorageCachesTask {
        assert!(
            capacity > 0,
            "Storage values cache capacity must be positive"
        );
        tracing::debug!("Initializing VM storage values cache with {capacity}B capacity");

        let values_cache = ValuesCache::new(capacity);
        let (command_sender, command_receiver) = mpsc::unbounded_channel();
        self.values = Some(ValuesCacheAndUpdater {
            cache: values_cache.clone(),
            command_sender,
        });

        // Updates run in a separate task so that VM execution isn't blocked on them and
        // contention over the `ValuesCache` lock stays as low as possible. The trade-off is
        // that concurrently executing VM instances may produce some no-op update commands via
        // `Self::schedule_values_update()`; built-in filtering keeps this manageable.
        PostgresStorageCachesTask {
            connection_pool,
            values_cache,
            command_receiver,
        }
    }

    /// Schedules an update of the VM storage values cache to the specified L2 block. If the values cache is not configured,
    /// this is a no-op.
    ///
    /// # Panics
    ///
    /// - Panics if the cache update task returned from `configure_storage_values_cache()` has panicked.
    pub fn schedule_values_update(&self, to_l2_block: L2BlockNumber) {
        if let Some(values) = &self.values {
            // Drop no-op updates eagerly so they don't accumulate in the unbounded channel.
            // The updater task (`PostgresStorageCachesTask`) is cancel-aware and may stop
            // before processing some of these commands; losing them is fine.
            if values.cache.valid_for() < to_l2_block {
                values.command_sender.send(to_l2_block).ok();
            }
        }
    }
}

/// An asynchronous task that updates the VM storage values cache.
#[derive(Debug)]
pub struct PostgresStorageCachesTask {
    // Pool used to load modified keys from Postgres during cache updates.
    connection_pool: ConnectionPool<Core>,
    // Cache being kept up to date.
    values_cache: ValuesCache,
    // Receives target L2 block numbers from `PostgresStorageCaches::schedule_values_update()`.
    command_receiver: UnboundedReceiver<L2BlockNumber>,
}

impl PostgresStorageCachesTask {
    /// Runs the task.
    ///
    /// ## Errors
    ///
    /// - Propagates Postgres errors.
    /// - Propagates errors from the cache update task.
    pub async fn run(mut self, mut stop_receiver: watch::Receiver<bool>) -> anyhow::Result<()> {
        // Tracks the L2 block the cache is currently valid for, so that stale commands
        // can be skipped without touching the cache.
        let mut current_l2_block = self.values_cache.valid_for();
        loop {
            tokio::select! {
                _ = stop_receiver.changed() => {
                    break;
                }
                Some(to_l2_block) = self.command_receiver.recv() => {
                    // Commands produced by concurrent VM instances may duplicate or lag;
                    // ignore those that don't advance the cache.
                    if to_l2_block <= current_l2_block {
                        continue;
                    }
                    let mut connection = self
                        .connection_pool
                        .connection_tagged("values_cache_updater")
                        .await?;
                    self.values_cache
                        .update(current_l2_block, to_l2_block, &mut connection)
                        .await?;
                    current_l2_block = to_l2_block;
                }
                else => {
                    // The command sender has been dropped, which means that we must receive the stop signal soon.
                    stop_receiver.changed().await?;
                    break;
                }
            }
        }
        Ok(())
    }
}

/// [`ReadStorage`] implementation backed by the Postgres database.
#[derive(Debug)]
pub struct PostgresStorage<'a> {
    // Tokio runtime handle used to block on async DB queries from the sync `ReadStorage` methods.
    rt_handle: Handle,
    connection: Connection<'a, Core>,
    // L2 block whose state this storage represents.
    l2_block_number: L2BlockNumber,
    // L1 batch corresponding to `l2_block_number`, resolved once in `new_async()`.
    l1_batch_number_for_l2_block: L1BatchNumber,
    // Number of the L1 batch pending at construction time; used as a conservative lower bound
    // for the negative initial-writes cache.
    pending_l1_batch_number: L1BatchNumber,
    // Whether writes in `l1_batch_number_for_l2_block` itself count (see `write_counts()`).
    consider_new_l1_batch: bool,
    // Optional shared caches; `None` disables caching entirely.
    caches: Option<PostgresStorageCaches>,
}

impl<'a> PostgresStorage<'a> {
    /// Creates a new storage using the specified connection.
    ///
    /// # Panics
    ///
    /// Panics on Postgres errors.
    pub fn new(
        rt_handle: Handle,
        connection: Connection<'a, Core>,
        block_number: L2BlockNumber,
        consider_new_l1_batch: bool,
    ) -> Self {
        // The handle is cloned because `new_async()` takes ownership of `rt_handle`.
        rt_handle
            .clone()
            .block_on(Self::new_async(
                rt_handle,
                connection,
                block_number,
                consider_new_l1_batch,
            ))
            // `expect` instead of a bare `unwrap` so that a panic carries a meaningful message
            // in addition to the underlying error chain.
            .expect("failed initializing Postgres-backed VM storage")
    }

    /// Asynchronous version of [`Self::new()`] that also propagates errors instead of panicking.
    ///
    /// # Errors
    ///
    /// Propagates Postgres errors.
    pub async fn new_async(
        rt_handle: Handle,
        mut connection: Connection<'a, Core>,
        block_number: L2BlockNumber,
        consider_new_l1_batch: bool,
    ) -> anyhow::Result<PostgresStorage<'a>> {
        // Resolve the L2 block -> L1 batch mapping once up front; all subsequent
        // `write_counts()` checks rely on it.
        let resolved = connection
            .storage_web3_dal()
            .resolve_l1_batch_number_of_l2_block(block_number)
            .await
            .with_context(|| {
                format!("failed resolving L1 batch number for L2 block #{block_number}")
            })?;
        Ok(Self {
            rt_handle,
            connection,
            l2_block_number: block_number,
            l1_batch_number_for_l2_block: resolved.expected_l1_batch(),
            pending_l1_batch_number: resolved.pending_l1_batch,
            consider_new_l1_batch,
            caches: None,
        })
    }

    /// Sets the caches to use with the storage.
    #[must_use]
    pub fn with_caches(self, caches: PostgresStorageCaches) -> Self {
        Self {
            caches: Some(caches),
            ..self
        }
    }

    /// This method is expected to be called for each write that was found in the database, and it decides
    /// whether the change is initial or not. Even if a change is present in the DB, in some cases we would not consider it.
    /// For example, in API we always represent the state at the beginning of an L1 batch, so we discard all the writes
    /// that happened at the same batch or later (for historical `eth_call` requests).
    fn write_counts(&self, write_l1_batch_number: L1BatchNumber) -> bool {
        if self.consider_new_l1_batch {
            self.l1_batch_number_for_l2_block >= write_l1_batch_number
        } else {
            self.l1_batch_number_for_l2_block > write_l1_batch_number
        }
    }

    /// Returns the values cache, provided that caches are configured *and* include a values cache.
    fn values_cache(&self) -> Option<&ValuesCache> {
        Some(&self.caches.as_ref()?.values.as_ref()?.cache)
    }
}

impl ReadStorage for PostgresStorage<'_> {
    /// Reads a storage slot value, consulting the values cache first and falling back to Postgres.
    fn read_value(&mut self, key: &StorageKey) -> StorageValue {
        let hashed_key = key.hashed_key();
        let latency = STORAGE_METRICS.storage[&Method::ReadValue].start();
        let values_cache = self.values_cache();
        let cached_value =
            values_cache.and_then(|cache| cache.get(self.l2_block_number, hashed_key));

        let value = cached_value.unwrap_or_else(|| {
            const RETRY_INTERVAL: Duration = Duration::from_millis(500);
            const MAX_TRIES: usize = 20;

            let mut dal = self.connection.storage_web3_dal();
            // Blocking retry loop: the query is retried only on Postgres statement timeouts
            // (see the `when` predicate below); any other error panics via `expect`.
            let value = (|| {
                self.rt_handle
                    .block_on(dal.get_historical_value_unchecked(hashed_key, self.l2_block_number))
            })
            .retry(
                &ConstantBuilder::default()
                    .with_delay(RETRY_INTERVAL)
                    .with_max_times(MAX_TRIES),
            )
            .when(|e| {
                e.inner()
                    .as_database_error()
                    .is_some_and(|e| e.message() == "canceling statement due to statement timeout")
            })
            .call()
            .expect("Failed executing `read_value`");
            // Populate the cache; `insert` is a no-op if the cache snapshot has moved on.
            if let Some(cache) = self.values_cache() {
                cache.insert(self.l2_block_number, hashed_key, value);
            }
            value
        });

        latency.observe();
        value
    }

    /// Returns `true` if `key` was not yet written to as of this storage's L1 batch.
    fn is_write_initial(&mut self, key: &StorageKey) -> bool {
        let hashed_key = key.hashed_key();
        let latency = STORAGE_METRICS.storage[&Method::IsWriteInitial].start();
        let caches = self.caches.as_ref();
        let cached_value = caches.and_then(|caches| caches.initial_writes.get(&hashed_key));

        if cached_value.is_none() {
            // Write is absent in positive cache, check whether it's present in the negative cache.
            let cached_value =
                caches.and_then(|caches| caches.negative_initial_writes.get(&hashed_key));
            if let Some(min_l1_batch_for_initial_write) = cached_value {
                // We know that this slot was certainly not touched before `min_l1_batch_for_initial_write`.
                // Try to use this knowledge to decide if the change is certainly initial.
                // This is based on the hypothetical worst-case scenario, in which the key was
                // written to at the earliest possible L1 batch (i.e., `min_l1_batch_for_initial_write`).
                if !self.write_counts(min_l1_batch_for_initial_write) {
                    CACHE_METRICS.effective_values.inc();
                    return true;
                }
            }
        }

        // Fall back to Postgres when neither cache settles the question.
        let l1_batch_number = cached_value.or_else(|| {
            let mut dal = self.connection.storage_web3_dal();
            let value = self
                .rt_handle
                .block_on(dal.get_l1_batch_number_for_initial_write(hashed_key))
                .expect("Failed executing `is_write_initial`");

            if let Some(caches) = &self.caches {
                if let Some(l1_batch_number) = value {
                    // The key was written to at `l1_batch_number`: record this in the positive
                    // cache and drop any stale negative-cache entry.
                    caches.negative_initial_writes.remove(&hashed_key);
                    caches.initial_writes.insert(hashed_key, l1_batch_number);
                } else {
                    caches
                        .negative_initial_writes
                        .insert(hashed_key, self.pending_l1_batch_number);
                    // The pending L1 batch might have been sealed since its number was requested from Postgres
                    // in `Self::new()`, so this is a somewhat conservative estimate.
                }
            }
            value
        });
        latency.observe();

        let contains_key = l1_batch_number.map_or(false, |initial_write_l1_batch_number| {
            self.write_counts(initial_write_l1_batch_number)
        });
        !contains_key
    }

    /// Loads contract bytecode by hash, hiding bytecodes inserted after this storage's L2 block.
    fn load_factory_dep(&mut self, hash: H256) -> Option<Vec<u8>> {
        let latency = STORAGE_METRICS.storage[&Method::LoadFactoryDep].start();

        let cached_value = self
            .caches
            .as_ref()
            .and_then(|caches| caches.factory_deps.get(&hash));

        let value = cached_value.or_else(|| {
            let mut dal = self.connection.storage_web3_dal();
            let value = self
                .rt_handle
                .block_on(dal.get_factory_dep(hash))
                .expect("Failed executing `load_factory_dep`")
                .map(|(bytecode, inserted_at)| TimestampedFactoryDep {
                    bytecode,
                    inserted_at,
                });

            if let Some(caches) = &self.caches {
                // If we receive None, we won't cache it.
                if let Some(value) = value.clone() {
                    caches.factory_deps.insert(hash, value);
                }
            };

            value
        });

        latency.observe();
        // Filter out deps inserted after our L2 block: the cache is shared, so it may hold
        // bytecodes that are "from the future" relative to this storage snapshot.
        Some(
            value
                .filter(|dep| dep.inserted_at <= self.l2_block_number)?
                .bytecode,
        )
    }

    /// Returns the enumeration index of `key` within this storage's L1 batch, if it has one.
    fn get_enumeration_index(&mut self, key: &StorageKey) -> Option<u64> {
        let hashed_key = key.hashed_key();
        let mut dal = self.connection.storage_logs_dedup_dal();
        let value = self.rt_handle.block_on(
            dal.get_enumeration_index_in_l1_batch(hashed_key, self.l1_batch_number_for_l2_block),
        );
        value.expect("failed getting enumeration index for key")
    }
}