zebra_state/service/finalized_state/
disk_db.rs

1//! Provides low-level access to RocksDB using some database-specific types.
2//!
3//! This module makes sure that:
4//! - all disk writes happen inside a RocksDB transaction
5//!   ([`rocksdb::WriteBatch`]), and
6//! - format-specific invariants are maintained.
7//!
8//! # Correctness
9//!
10//! [`crate::constants::state_database_format_version_in_code()`] must be incremented
11//! each time the database format (column, serialization, etc) changes.
12
13use std::{
14    collections::{BTreeMap, HashMap},
15    fmt::{Debug, Write},
16    fs,
17    ops::RangeBounds,
18    path::Path,
19    sync::{
20        atomic::{self, AtomicBool},
21        Arc,
22    },
23};
24
25use itertools::Itertools;
26use rlimit::increase_nofile_limit;
27
28use rocksdb::{ColumnFamilyDescriptor, ErrorKind, Options, ReadOptions};
29use semver::Version;
30use zebra_chain::{parameters::Network, primitives::byte_array::increment_big_endian};
31
32use crate::{
33    database_format_version_on_disk,
34    service::finalized_state::disk_format::{FromDisk, IntoDisk},
35    write_database_format_version_to_disk, Config,
36};
37
38use super::zebra_db::transparent::{
39    fetch_add_balance_and_received, BALANCE_BY_TRANSPARENT_ADDR,
40    BALANCE_BY_TRANSPARENT_ADDR_MERGE_OP,
41};
42// Doc-only imports
43#[allow(unused_imports)]
44use super::{TypedColumnFamily, WriteTypedBatch};
45
46#[cfg(any(test, feature = "proptest-impl"))]
47mod tests;
48
/// The [`rocksdb::ThreadMode`] used by the database.
///
/// This alias selects [`rocksdb::SingleThreaded`] mode for the [`DB`] type.
pub type DBThreadMode = rocksdb::SingleThreaded;
51
/// The [`rocksdb`] database type, including thread mode.
///
/// This is [`rocksdb::DBWithThreadMode`] specialised to [`DBThreadMode`].
///
/// Also the [`rocksdb::DBAccess`] used by database iterators.
pub type DB = rocksdb::DBWithThreadMode<DBThreadMode>;
56
/// Wrapper struct to ensure low-level database access goes through the correct API.
///
/// `rocksdb` allows concurrent writes through a shared reference,
/// so database instances are cloneable. When the final clone is dropped,
/// the database is closed.
///
/// Cloning copies the configuration fields, and shares the inner database and
/// the format upgrade flag, because both of those are held in an [`Arc`].
///
/// # Correctness
///
/// Reading transactions from the database using RocksDB iterators causes hangs.
/// But creating iterators and reading the tip height works fine.
///
/// So these hangs are probably caused by holding column family locks to read:
/// - multiple values, or
/// - large values.
///
/// This bug might be fixed by moving database operations to blocking threads (#2188),
/// so that they don't block the tokio executor.
/// (Or it might be fixed by future RocksDB upgrades.)
#[derive(Clone, Debug)]
pub struct DiskDb {
    // Configuration
    //
    // This configuration cannot be modified after the database is initialized,
    // because some clones would have different values.
    //
    /// The configured database kind for this database.
    db_kind: String,

    /// The format version of the running Zebra code.
    format_version_in_code: Version,

    /// The configured network for this database.
    network: Network,

    /// The configured temporary database setting.
    ///
    /// If true, the database files are deleted on drop.
    ephemeral: bool,

    /// A boolean flag indicating whether the db format change task has finished
    /// applying any format changes that may have been required.
    ///
    /// The flag is stored behind an [`Arc`], so every clone of this [`DiskDb`]
    /// observes the same value.
    finished_format_upgrades: Arc<AtomicBool>,

    // Owned State
    //
    // Everything contained in this state must be shared by all clones, or read-only.
    //
    /// The shared inner RocksDB database.
    ///
    /// RocksDB allows reads and writes via a shared reference.
    ///
    /// In [`SingleThreaded`](rocksdb::SingleThreaded) mode,
    /// column family changes and [`Drop`] require exclusive access.
    ///
    /// In [`MultiThreaded`](rocksdb::MultiThreaded) mode,
    /// only [`Drop`] requires exclusive access.
    db: Arc<DB>,
}
115
/// Wrapper struct to ensure low-level database writes go through the correct API.
///
/// [`rocksdb::WriteBatch`] is a batched set of database updates,
/// which must be written to the database using `DiskDb::write(batch)`.
///
/// The `must_use` attribute makes the compiler warn about batches that are
/// created but never used.
#[must_use = "batches must be written to the database"]
#[derive(Default)]
pub struct DiskWriteBatch {
    /// The inner RocksDB write batch.
    batch: rocksdb::WriteBatch,
}
126
127impl Debug for DiskWriteBatch {
128    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
129        f.debug_struct("DiskWriteBatch")
130            .field("batch", &format!("{} bytes", self.batch.size_in_bytes()))
131            .finish()
132    }
133}
134
135impl PartialEq for DiskWriteBatch {
136    fn eq(&self, other: &Self) -> bool {
137        self.batch.data() == other.batch.data()
138    }
139}
140
141impl Eq for DiskWriteBatch {}
142
/// Helper trait for inserting serialized typed (Key, Value) pairs into rocksdb.
///
/// Keys and values are converted to bytes using their [`IntoDisk`] implementations.
///
/// # Deprecation
///
/// This trait should not be used in new code, use [`WriteTypedBatch`] instead.
//
// TODO: replace uses of this trait with WriteTypedBatch,
//       implement these methods directly on WriteTypedBatch, and delete the trait.
pub trait WriteDisk {
    /// Serialize and insert the given key and value into a rocksdb column family,
    /// overwriting any existing `value` for `key`.
    fn zs_insert<C, K, V>(&mut self, cf: &C, key: K, value: V)
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + Debug,
        V: IntoDisk;

    /// Serialize and merge the given key and value into a rocksdb column family,
    /// merging with any existing `value` for `key`.
    fn zs_merge<C, K, V>(&mut self, cf: &C, key: K, value: V)
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + Debug,
        V: IntoDisk;

    /// Remove the given key from a rocksdb column family, if it exists.
    fn zs_delete<C, K>(&mut self, cf: &C, key: K)
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + Debug;

    /// Delete the given key range from a rocksdb column family, if it exists, including `from`
    /// and excluding `until_strictly_before`.
    ///
    /// In other words, the deleted range is the half-open interval
    /// `from..until_strictly_before`.
    //
    // TODO: convert zs_delete_range() to take std::ops::RangeBounds
    //       see zs_range_iter() for an example of the edge cases
    fn zs_delete_range<C, K>(&mut self, cf: &C, from: K, until_strictly_before: K)
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + Debug;
}
184
185/// # Deprecation
186///
187/// These impls should not be used in new code, use [`WriteTypedBatch`] instead.
188//
189// TODO: replace uses of these impls with WriteTypedBatch,
190//       implement these methods directly on WriteTypedBatch, and delete the trait.
191impl WriteDisk for DiskWriteBatch {
192    fn zs_insert<C, K, V>(&mut self, cf: &C, key: K, value: V)
193    where
194        C: rocksdb::AsColumnFamilyRef,
195        K: IntoDisk + Debug,
196        V: IntoDisk,
197    {
198        let key_bytes = key.as_bytes();
199        let value_bytes = value.as_bytes();
200        self.batch.put_cf(cf, key_bytes, value_bytes);
201    }
202
203    fn zs_merge<C, K, V>(&mut self, cf: &C, key: K, value: V)
204    where
205        C: rocksdb::AsColumnFamilyRef,
206        K: IntoDisk + Debug,
207        V: IntoDisk,
208    {
209        let key_bytes = key.as_bytes();
210        let value_bytes = value.as_bytes();
211        self.batch.merge_cf(cf, key_bytes, value_bytes);
212    }
213
214    fn zs_delete<C, K>(&mut self, cf: &C, key: K)
215    where
216        C: rocksdb::AsColumnFamilyRef,
217        K: IntoDisk + Debug,
218    {
219        let key_bytes = key.as_bytes();
220        self.batch.delete_cf(cf, key_bytes);
221    }
222
223    // TODO: convert zs_delete_range() to take std::ops::RangeBounds
224    //       see zs_range_iter() for an example of the edge cases
225    fn zs_delete_range<C, K>(&mut self, cf: &C, from: K, until_strictly_before: K)
226    where
227        C: rocksdb::AsColumnFamilyRef,
228        K: IntoDisk + Debug,
229    {
230        let from_bytes = from.as_bytes();
231        let until_strictly_before_bytes = until_strictly_before.as_bytes();
232        self.batch
233            .delete_range_cf(cf, from_bytes, until_strictly_before_bytes);
234    }
235}
236
237// Allow &mut DiskWriteBatch as well as owned DiskWriteBatch
238impl<T> WriteDisk for &mut T
239where
240    T: WriteDisk,
241{
242    fn zs_insert<C, K, V>(&mut self, cf: &C, key: K, value: V)
243    where
244        C: rocksdb::AsColumnFamilyRef,
245        K: IntoDisk + Debug,
246        V: IntoDisk,
247    {
248        (*self).zs_insert(cf, key, value)
249    }
250
251    fn zs_merge<C, K, V>(&mut self, cf: &C, key: K, value: V)
252    where
253        C: rocksdb::AsColumnFamilyRef,
254        K: IntoDisk + Debug,
255        V: IntoDisk,
256    {
257        (*self).zs_merge(cf, key, value)
258    }
259
260    fn zs_delete<C, K>(&mut self, cf: &C, key: K)
261    where
262        C: rocksdb::AsColumnFamilyRef,
263        K: IntoDisk + Debug,
264    {
265        (*self).zs_delete(cf, key)
266    }
267
268    fn zs_delete_range<C, K>(&mut self, cf: &C, from: K, until_strictly_before: K)
269    where
270        C: rocksdb::AsColumnFamilyRef,
271        K: IntoDisk + Debug,
272    {
273        (*self).zs_delete_range(cf, from, until_strictly_before)
274    }
275}
276
/// Helper trait for retrieving and deserializing values from rocksdb column families.
///
/// Keys are serialized using [`IntoDisk`], and keys and values read from the
/// database are deserialized using [`FromDisk`].
///
/// # Deprecation
///
/// This trait should not be used in new code, use [`TypedColumnFamily`] instead.
//
// TODO: replace uses of this trait with TypedColumnFamily,
//       implement these methods directly on DiskDb, and delete the trait.
pub trait ReadDisk {
    /// Returns true if a rocksdb column family `cf` does not contain any entries.
    fn zs_is_empty<C>(&self, cf: &C) -> bool
    where
        C: rocksdb::AsColumnFamilyRef;

    /// Returns the value for `key` in the rocksdb column family `cf`, if present.
    fn zs_get<C, K, V>(&self, cf: &C, key: &K) -> Option<V>
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk,
        V: FromDisk;

    /// Check if a rocksdb column family `cf` contains the serialized form of `key`.
    fn zs_contains<C, K>(&self, cf: &C, key: &K) -> bool
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk;

    /// Returns the lowest key in `cf`, and the corresponding value.
    ///
    /// Returns `None` if the column family is empty.
    fn zs_first_key_value<C, K, V>(&self, cf: &C) -> Option<(K, V)>
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + FromDisk,
        V: FromDisk;

    /// Returns the highest key in `cf`, and the corresponding value.
    ///
    /// Returns `None` if the column family is empty.
    fn zs_last_key_value<C, K, V>(&self, cf: &C) -> Option<(K, V)>
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + FromDisk,
        V: FromDisk;

    /// Returns the first key greater than or equal to `lower_bound` in `cf`,
    /// and the corresponding value.
    ///
    /// Returns `None` if there are no keys greater than or equal to `lower_bound`.
    fn zs_next_key_value_from<C, K, V>(&self, cf: &C, lower_bound: &K) -> Option<(K, V)>
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + FromDisk,
        V: FromDisk;

    /// Returns the first key strictly greater than `lower_bound` in `cf`,
    /// and the corresponding value.
    ///
    /// Returns `None` if there are no keys greater than `lower_bound`.
    fn zs_next_key_value_strictly_after<C, K, V>(&self, cf: &C, lower_bound: &K) -> Option<(K, V)>
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + FromDisk,
        V: FromDisk;

    /// Returns the greatest key less than or equal to `upper_bound` in `cf`,
    /// and the corresponding value.
    ///
    /// This is the first key reached when searching backwards from `upper_bound`.
    ///
    /// Returns `None` if there are no keys less than or equal to `upper_bound`.
    fn zs_prev_key_value_back_from<C, K, V>(&self, cf: &C, upper_bound: &K) -> Option<(K, V)>
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + FromDisk,
        V: FromDisk;

    /// Returns the greatest key strictly less than `upper_bound` in `cf`,
    /// and the corresponding value.
    ///
    /// This is the first key reached when searching backwards from just before `upper_bound`.
    ///
    /// Returns `None` if there are no keys less than `upper_bound`.
    fn zs_prev_key_value_strictly_before<C, K, V>(&self, cf: &C, upper_bound: &K) -> Option<(K, V)>
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + FromDisk,
        V: FromDisk;

    /// Returns the keys and values in `cf` in `range`, in an ordered `BTreeMap`.
    ///
    /// The items are read using a database iterator.
    /// Holding this iterator open might delay block commit transactions.
    fn zs_items_in_range_ordered<C, K, V, R>(&self, cf: &C, range: R) -> BTreeMap<K, V>
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + FromDisk + Ord,
        V: FromDisk,
        R: RangeBounds<K>;

    /// Returns the keys and values in `cf` in `range`, in an unordered `HashMap`.
    ///
    /// The items are read using a database iterator.
    /// Holding this iterator open might delay block commit transactions.
    fn zs_items_in_range_unordered<C, K, V, R>(&self, cf: &C, range: R) -> HashMap<K, V>
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + FromDisk + Eq + std::hash::Hash,
        V: FromDisk,
        R: RangeBounds<K>;
}
382
383impl PartialEq for DiskDb {
384    fn eq(&self, other: &Self) -> bool {
385        if self.db.path() == other.db.path() {
386            assert_eq!(
387                self.network, other.network,
388                "database with same path but different network configs",
389            );
390            assert_eq!(
391                self.ephemeral, other.ephemeral,
392                "database with same path but different ephemeral configs",
393            );
394
395            return true;
396        }
397
398        false
399    }
400}
401
402impl Eq for DiskDb {}
403
404/// # Deprecation
405///
406/// These impls should not be used in new code, use [`TypedColumnFamily`] instead.
407//
408// TODO: replace uses of these impls with TypedColumnFamily,
409//       implement these methods directly on DiskDb, and delete the trait.
410impl ReadDisk for DiskDb {
411    fn zs_is_empty<C>(&self, cf: &C) -> bool
412    where
413        C: rocksdb::AsColumnFamilyRef,
414    {
415        // Empty column families return invalid forward iterators.
416        //
417        // Checking iterator validity does not seem to cause database hangs.
418        let iterator = self.db.iterator_cf(cf, rocksdb::IteratorMode::Start);
419        let raw_iterator: rocksdb::DBRawIteratorWithThreadMode<DB> = iterator.into();
420
421        !raw_iterator.valid()
422    }
423
424    #[allow(clippy::unwrap_in_result)]
425    fn zs_get<C, K, V>(&self, cf: &C, key: &K) -> Option<V>
426    where
427        C: rocksdb::AsColumnFamilyRef,
428        K: IntoDisk,
429        V: FromDisk,
430    {
431        let key_bytes = key.as_bytes();
432
433        // We use `get_pinned_cf` to avoid taking ownership of the serialized
434        // value, because we're going to deserialize it anyways, which avoids an
435        // extra copy
436        let value_bytes = self
437            .db
438            .get_pinned_cf(cf, key_bytes)
439            .expect("unexpected database failure");
440
441        value_bytes.map(V::from_bytes)
442    }
443
444    fn zs_contains<C, K>(&self, cf: &C, key: &K) -> bool
445    where
446        C: rocksdb::AsColumnFamilyRef,
447        K: IntoDisk,
448    {
449        let key_bytes = key.as_bytes();
450
451        // We use `get_pinned_cf` to avoid taking ownership of the serialized
452        // value, because we don't use the value at all. This avoids an extra copy.
453        self.db
454            .get_pinned_cf(cf, key_bytes)
455            .expect("unexpected database failure")
456            .is_some()
457    }
458
459    fn zs_first_key_value<C, K, V>(&self, cf: &C) -> Option<(K, V)>
460    where
461        C: rocksdb::AsColumnFamilyRef,
462        K: IntoDisk + FromDisk,
463        V: FromDisk,
464    {
465        // Reading individual values from iterators does not seem to cause database hangs.
466        self.zs_forward_range_iter(cf, ..).next()
467    }
468
469    fn zs_last_key_value<C, K, V>(&self, cf: &C) -> Option<(K, V)>
470    where
471        C: rocksdb::AsColumnFamilyRef,
472        K: IntoDisk + FromDisk,
473        V: FromDisk,
474    {
475        // Reading individual values from iterators does not seem to cause database hangs.
476        self.zs_reverse_range_iter(cf, ..).next()
477    }
478
479    fn zs_next_key_value_from<C, K, V>(&self, cf: &C, lower_bound: &K) -> Option<(K, V)>
480    where
481        C: rocksdb::AsColumnFamilyRef,
482        K: IntoDisk + FromDisk,
483        V: FromDisk,
484    {
485        self.zs_forward_range_iter(cf, lower_bound..).next()
486    }
487
488    fn zs_next_key_value_strictly_after<C, K, V>(&self, cf: &C, lower_bound: &K) -> Option<(K, V)>
489    where
490        C: rocksdb::AsColumnFamilyRef,
491        K: IntoDisk + FromDisk,
492        V: FromDisk,
493    {
494        use std::ops::Bound::*;
495
496        // There is no standard syntax for an excluded start bound.
497        self.zs_forward_range_iter(cf, (Excluded(lower_bound), Unbounded))
498            .next()
499    }
500
501    fn zs_prev_key_value_back_from<C, K, V>(&self, cf: &C, upper_bound: &K) -> Option<(K, V)>
502    where
503        C: rocksdb::AsColumnFamilyRef,
504        K: IntoDisk + FromDisk,
505        V: FromDisk,
506    {
507        self.zs_reverse_range_iter(cf, ..=upper_bound).next()
508    }
509
510    fn zs_prev_key_value_strictly_before<C, K, V>(&self, cf: &C, upper_bound: &K) -> Option<(K, V)>
511    where
512        C: rocksdb::AsColumnFamilyRef,
513        K: IntoDisk + FromDisk,
514        V: FromDisk,
515    {
516        self.zs_reverse_range_iter(cf, ..upper_bound).next()
517    }
518
519    fn zs_items_in_range_ordered<C, K, V, R>(&self, cf: &C, range: R) -> BTreeMap<K, V>
520    where
521        C: rocksdb::AsColumnFamilyRef,
522        K: IntoDisk + FromDisk + Ord,
523        V: FromDisk,
524        R: RangeBounds<K>,
525    {
526        self.zs_forward_range_iter(cf, range).collect()
527    }
528
529    fn zs_items_in_range_unordered<C, K, V, R>(&self, cf: &C, range: R) -> HashMap<K, V>
530    where
531        C: rocksdb::AsColumnFamilyRef,
532        K: IntoDisk + FromDisk + Eq + std::hash::Hash,
533        V: FromDisk,
534        R: RangeBounds<K>,
535    {
536        self.zs_forward_range_iter(cf, range).collect()
537    }
538}
539
540impl DiskWriteBatch {
541    /// Creates and returns a new transactional batch write.
542    ///
543    /// # Correctness
544    ///
545    /// Each block must be written to the state inside a batch, so that:
546    /// - concurrent `ReadStateService` queries don't see half-written blocks, and
547    /// - if Zebra calls `exit`, panics, or crashes, half-written blocks are rolled back.
548    pub fn new() -> Self {
549        DiskWriteBatch {
550            batch: rocksdb::WriteBatch::default(),
551        }
552    }
553}
554
555impl DiskDb {
556    /// Prints rocksdb metrics for each column family along with total database disk size, live data disk size and database memory size.
557    pub fn print_db_metrics(&self) {
558        let mut total_size_on_disk = 0;
559        let mut total_live_size_on_disk = 0;
560        let mut total_size_in_mem = 0;
561        let db: &Arc<DB> = &self.db;
562        let db_options = DiskDb::options();
563        let column_families = DiskDb::construct_column_families(db_options, db.path(), []);
564        let mut column_families_log_string = String::from("");
565
566        write!(column_families_log_string, "Column families and sizes: ").unwrap();
567
568        for cf_descriptor in column_families {
569            let cf_name = &cf_descriptor.name();
570            let cf_handle = db
571                .cf_handle(cf_name)
572                .expect("Column family handle must exist");
573            let live_data_size = db
574                .property_int_value_cf(cf_handle, "rocksdb.estimate-live-data-size")
575                .unwrap_or(Some(0));
576            let total_sst_files_size = db
577                .property_int_value_cf(cf_handle, "rocksdb.total-sst-files-size")
578                .unwrap_or(Some(0));
579            let cf_disk_size = total_sst_files_size.unwrap_or(0);
580            total_size_on_disk += cf_disk_size;
581            total_live_size_on_disk += live_data_size.unwrap_or(0);
582            let mem_table_size = db
583                .property_int_value_cf(cf_handle, "rocksdb.size-all-mem-tables")
584                .unwrap_or(Some(0));
585            total_size_in_mem += mem_table_size.unwrap_or(0);
586
587            write!(
588                column_families_log_string,
589                "{} (Disk: {}, Memory: {})",
590                cf_name,
591                human_bytes::human_bytes(cf_disk_size as f64),
592                human_bytes::human_bytes(mem_table_size.unwrap_or(0) as f64)
593            )
594            .unwrap();
595        }
596
597        debug!("{}", column_families_log_string);
598        info!(
599            "Total Database Disk Size: {}",
600            human_bytes::human_bytes(total_size_on_disk as f64)
601        );
602        info!(
603            "Total Live Data Disk Size: {}",
604            human_bytes::human_bytes(total_live_size_on_disk as f64)
605        );
606        info!(
607            "Total Database Memory Size: {}",
608            human_bytes::human_bytes(total_size_in_mem as f64)
609        );
610    }
611
612    /// Returns the estimated total disk space usage of the database.
613    pub fn size(&self) -> u64 {
614        let db: &Arc<DB> = &self.db;
615        let db_options = DiskDb::options();
616        let mut total_size_on_disk = 0;
617        for cf_descriptor in DiskDb::construct_column_families(db_options, db.path(), []) {
618            let cf_name = &cf_descriptor.name();
619            let cf_handle = db
620                .cf_handle(cf_name)
621                .expect("Column family handle must exist");
622
623            total_size_on_disk += db
624                .property_int_value_cf(cf_handle, "rocksdb.total-sst-files-size")
625                .ok()
626                .flatten()
627                .unwrap_or(0);
628        }
629
630        total_size_on_disk
631    }
632
633    /// Sets `finished_format_upgrades` to true to indicate that Zebra has
634    /// finished applying any required db format upgrades.
635    pub fn mark_finished_format_upgrades(&self) {
636        self.finished_format_upgrades
637            .store(true, atomic::Ordering::SeqCst);
638    }
639
640    /// Returns true if the `finished_format_upgrades` flag has been set to true to
641    /// indicate that Zebra has finished applying any required db format upgrades.
642    pub fn finished_format_upgrades(&self) -> bool {
643        self.finished_format_upgrades.load(atomic::Ordering::SeqCst)
644    }
645
    /// When called with a secondary DB instance, tries to catch up with the primary DB instance.
    ///
    /// This delegates directly to the inner RocksDB database.
    ///
    /// # Errors
    ///
    /// Returns the [`rocksdb::Error`] from the inner database call, if it fails.
    pub fn try_catch_up_with_primary(&self) -> Result<(), rocksdb::Error> {
        self.db.try_catch_up_with_primary()
    }
650
651    /// Returns a forward iterator over the items in `cf` in `range`.
652    ///
653    /// Holding this iterator open might delay block commit transactions.
654    pub fn zs_forward_range_iter<C, K, V, R>(
655        &self,
656        cf: &C,
657        range: R,
658    ) -> impl Iterator<Item = (K, V)> + '_
659    where
660        C: rocksdb::AsColumnFamilyRef,
661        K: IntoDisk + FromDisk,
662        V: FromDisk,
663        R: RangeBounds<K>,
664    {
665        self.zs_range_iter_with_direction(cf, range, false)
666    }
667
668    /// Returns a reverse iterator over the items in `cf` in `range`.
669    ///
670    /// Holding this iterator open might delay block commit transactions.
671    pub fn zs_reverse_range_iter<C, K, V, R>(
672        &self,
673        cf: &C,
674        range: R,
675    ) -> impl Iterator<Item = (K, V)> + '_
676    where
677        C: rocksdb::AsColumnFamilyRef,
678        K: IntoDisk + FromDisk,
679        V: FromDisk,
680        R: RangeBounds<K>,
681    {
682        self.zs_range_iter_with_direction(cf, range, true)
683    }
684
685    /// Returns an iterator over the items in `cf` in `range`.
686    ///
687    /// RocksDB iterators are ordered by increasing key bytes by default.
688    /// Otherwise, if `reverse` is `true`, the iterator is ordered by decreasing key bytes.
689    ///
690    /// Holding this iterator open might delay block commit transactions.
691    fn zs_range_iter_with_direction<C, K, V, R>(
692        &self,
693        cf: &C,
694        range: R,
695        reverse: bool,
696    ) -> impl Iterator<Item = (K, V)> + '_
697    where
698        C: rocksdb::AsColumnFamilyRef,
699        K: IntoDisk + FromDisk,
700        V: FromDisk,
701        R: RangeBounds<K>,
702    {
703        use std::ops::Bound::{self, *};
704
705        // Replace with map() when it stabilises:
706        // https://github.com/rust-lang/rust/issues/86026
707        let map_to_vec = |bound: Bound<&K>| -> Bound<Vec<u8>> {
708            match bound {
709                Unbounded => Unbounded,
710                Included(x) => Included(x.as_bytes().as_ref().to_vec()),
711                Excluded(x) => Excluded(x.as_bytes().as_ref().to_vec()),
712            }
713        };
714
715        let start_bound = map_to_vec(range.start_bound());
716        let end_bound = map_to_vec(range.end_bound());
717        let range = (start_bound, end_bound);
718
719        let mode = Self::zs_iter_mode(&range, reverse);
720        let opts = Self::zs_iter_opts(&range);
721
722        // Reading multiple items from iterators has caused database hangs,
723        // in previous RocksDB versions
724        self.db
725            .iterator_cf_opt(cf, opts, mode)
726            .map(|result| result.expect("unexpected database failure"))
727            .map(|(key, value)| (key.to_vec(), value))
728            // Skip excluded "from" bound and empty ranges. The `mode` already skips keys
729            // strictly before the "from" bound.
730            .skip_while({
731                let range = range.clone();
732                move |(key, _value)| !range.contains(key)
733            })
734            // Take until the excluded "to" bound is reached,
735            // or we're after the included "to" bound.
736            .take_while(move |(key, _value)| range.contains(key))
737            .map(|(key, value)| (K::from_bytes(key), V::from_bytes(value)))
738    }
739
740    /// Returns the RocksDB ReadOptions with a lower and upper bound for a range.
741    fn zs_iter_opts<R>(range: &R) -> ReadOptions
742    where
743        R: RangeBounds<Vec<u8>>,
744    {
745        let mut opts = ReadOptions::default();
746        let (lower_bound, upper_bound) = Self::zs_iter_bounds(range);
747
748        if let Some(bound) = lower_bound {
749            opts.set_iterate_lower_bound(bound);
750        };
751
752        if let Some(bound) = upper_bound {
753            opts.set_iterate_upper_bound(bound);
754        };
755
756        opts
757    }
758
759    /// Returns a lower and upper iterate bounds for a range.
760    ///
761    /// Note: Since upper iterate bounds are always exclusive in RocksDB, this method
762    ///       will increment the upper bound by 1 if the end bound of the provided range
763    ///       is inclusive.
764    fn zs_iter_bounds<R>(range: &R) -> (Option<Vec<u8>>, Option<Vec<u8>>)
765    where
766        R: RangeBounds<Vec<u8>>,
767    {
768        use std::ops::Bound::*;
769
770        let lower_bound = match range.start_bound() {
771            Included(bound) | Excluded(bound) => Some(bound.clone()),
772            Unbounded => None,
773        };
774
775        let upper_bound = match range.end_bound().cloned() {
776            Included(mut bound) => {
777                // Increment the last byte in the upper bound that is less than u8::MAX, and
778                // clear any bytes after it to increment the next key in lexicographic order
779                // (next big-endian number). RocksDB uses lexicographic order for keys.
780                let is_wrapped_overflow = increment_big_endian(&mut bound);
781
782                if is_wrapped_overflow {
783                    bound.insert(0, 0x01)
784                }
785
786                Some(bound)
787            }
788            Excluded(bound) => Some(bound),
789            Unbounded => None,
790        };
791
792        (lower_bound, upper_bound)
793    }
794
795    /// Returns the RocksDB iterator "from" mode for `range`.
796    ///
797    /// RocksDB iterators are ordered by increasing key bytes by default.
798    /// Otherwise, if `reverse` is `true`, the iterator is ordered by decreasing key bytes.
799    fn zs_iter_mode<R>(range: &R, reverse: bool) -> rocksdb::IteratorMode<'_>
800    where
801        R: RangeBounds<Vec<u8>>,
802    {
803        use std::ops::Bound::*;
804
805        let from_bound = if reverse {
806            range.end_bound()
807        } else {
808            range.start_bound()
809        };
810
811        match from_bound {
812            Unbounded => {
813                if reverse {
814                    // Reversed unbounded iterators start from the last item
815                    rocksdb::IteratorMode::End
816                } else {
817                    // Unbounded iterators start from the first item
818                    rocksdb::IteratorMode::Start
819                }
820            }
821
822            Included(bound) | Excluded(bound) => {
823                let direction = if reverse {
824                    rocksdb::Direction::Reverse
825                } else {
826                    rocksdb::Direction::Forward
827                };
828
829                rocksdb::IteratorMode::From(bound.as_slice(), direction)
830            }
831        }
832    }
833
    /// The ideal open file limit for Zebra, as a number of open files.
    const IDEAL_OPEN_FILE_LIMIT: u64 = 1024;

    /// The minimum number of open files for Zebra to operate normally. Also used
    /// as the default open file limit, when the OS doesn't tell us how many
    /// files we can use.
    ///
    /// We want 100+ file descriptors for peers, and 100+ for the database.
    ///
    /// On Windows, the default limit is 512 high-level I/O files, and 8192
    /// low-level I/O files:
    /// <https://docs.microsoft.com/en-us/cpp/c-runtime-library/reference/setmaxstdio?view=msvc-160#remarks>
    const MIN_OPEN_FILE_LIMIT: u64 = 512;

    /// The number of files used internally by Zebra.
    ///
    /// Zebra uses file descriptors for OS libraries (10+), polling APIs (10+),
    /// stdio (3), and other OS facilities (2+).
    const RESERVED_FILE_COUNT: u64 = 48;

    /// The size of the database memtable RAM cache in megabytes.
    ///
    /// See the RocksDB FAQ for configuration and tuning advice:
    /// <https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ#configuration-and-tuning>
    const MEMTABLE_RAM_CACHE_MEGABYTES: usize = 128;
858
859    /// Build a vector of current column families on the disk and optionally any new column families.
860    /// Returns an iterable collection of all column families.
861    fn construct_column_families(
862        db_options: Options,
863        path: &Path,
864        column_families_in_code: impl IntoIterator<Item = String>,
865    ) -> impl Iterator<Item = ColumnFamilyDescriptor> {
866        // When opening the database in read/write mode, all column families must be opened.
867        //
868        // To make Zebra forward-compatible with databases updated by later versions,
869        // we read any existing column families off the disk, then add any new column families
870        // from the current implementation.
871        //
872        // <https://github.com/facebook/rocksdb/wiki/Column-Families#reference>
873        let column_families_on_disk = DB::list_cf(&db_options, path).unwrap_or_default();
874        let column_families_in_code = column_families_in_code.into_iter();
875
876        column_families_on_disk
877            .into_iter()
878            .chain(column_families_in_code)
879            .unique()
880            .map(move |cf_name: String| {
881                let mut cf_options = db_options.clone();
882
883                if cf_name == BALANCE_BY_TRANSPARENT_ADDR {
884                    cf_options.set_merge_operator_associative(
885                        BALANCE_BY_TRANSPARENT_ADDR_MERGE_OP,
886                        fetch_add_balance_and_received,
887                    );
888                }
889
890                rocksdb::ColumnFamilyDescriptor::new(cf_name, cf_options.clone())
891            })
892    }
893
894    /// Opens or creates the database at a path based on the kind, major version and network,
895    /// with the supplied column families, preserving any existing column families,
896    /// and returns a shared low-level database wrapper.
897    ///
898    /// # Panics
899    ///
900    /// - If the cache directory does not exist and can't be created.
901    /// - If the database cannot be opened for whatever reason.
902    pub fn new(
903        config: &Config,
904        db_kind: impl AsRef<str>,
905        format_version_in_code: &Version,
906        network: &Network,
907        column_families_in_code: impl IntoIterator<Item = String>,
908        read_only: bool,
909    ) -> DiskDb {
910        // If the database is ephemeral, we don't need to check the cache directory.
911        if !config.ephemeral {
912            DiskDb::validate_cache_dir(&config.cache_dir);
913        }
914
915        let db_kind = db_kind.as_ref();
916        let path = config.db_path(db_kind, format_version_in_code.major, network);
917
918        let db_options = DiskDb::options();
919
920        let column_families =
921            DiskDb::construct_column_families(db_options.clone(), &path, column_families_in_code);
922
923        let db_result = if read_only {
924            // Use a tempfile for the secondary instance cache directory
925            let secondary_config = Config {
926                ephemeral: true,
927                ..config.clone()
928            };
929            let secondary_path =
930                secondary_config.db_path("secondary_state", format_version_in_code.major, network);
931            let create_dir_result = std::fs::create_dir_all(&secondary_path);
932
933            info!(?create_dir_result, "creating secondary db directory");
934
935            DB::open_cf_descriptors_as_secondary(
936                &db_options,
937                &path,
938                &secondary_path,
939                column_families,
940            )
941        } else {
942            DB::open_cf_descriptors(&db_options, &path, column_families)
943        };
944
945        match db_result {
946            Ok(db) => {
947                info!("Opened Zebra state cache at {}", path.display());
948
949                let db = DiskDb {
950                    db_kind: db_kind.to_string(),
951                    format_version_in_code: format_version_in_code.clone(),
952                    network: network.clone(),
953                    ephemeral: config.ephemeral,
954                    db: Arc::new(db),
955                    finished_format_upgrades: Arc::new(AtomicBool::new(false)),
956                };
957
958                db.assert_default_cf_is_empty();
959
960                db
961            }
962
963            Err(e) if matches!(e.kind(), ErrorKind::Busy | ErrorKind::IOError) => panic!(
964                "Database likely already open {path:?} \
965                         Hint: Check if another zebrad process is running."
966            ),
967
968            Err(e) => panic!(
969                "Opening database {path:?} failed. \
970                        Hint: Try changing the state cache_dir in the Zebra config. \
971                        Error: {e}",
972            ),
973        }
974    }
975
976    // Accessor methods
977
978    /// Returns the configured database kind for this database.
979    pub fn db_kind(&self) -> String {
980        self.db_kind.clone()
981    }
982
983    /// Returns the format version of the running code that created this `DiskDb` instance in memory.
984    pub fn format_version_in_code(&self) -> Version {
985        self.format_version_in_code.clone()
986    }
987
988    /// Returns the fixed major version for this database.
989    pub fn major_version(&self) -> u64 {
990        self.format_version_in_code().major
991    }
992
993    /// Returns the configured network for this database.
994    pub fn network(&self) -> Network {
995        self.network.clone()
996    }
997
998    /// Returns the `Path` where the files used by this database are located.
999    pub fn path(&self) -> &Path {
1000        self.db.path()
1001    }
1002
1003    /// Returns the low-level rocksdb inner database.
1004    #[allow(dead_code)]
1005    fn inner(&self) -> &Arc<DB> {
1006        &self.db
1007    }
1008
1009    /// Returns the column family handle for `cf_name`.
1010    pub fn cf_handle(&self, cf_name: &str) -> Option<rocksdb::ColumnFamilyRef<'_>> {
1011        // Note: the lifetime returned by this method is subtly wrong. As of December 2023 it is
1012        // the shorter of &self and &str, but RocksDB clones column family names internally, so it
1013        // should just be &self. To avoid this restriction, clone the string before passing it to
1014        // this method. Currently Zebra uses static strings, so this doesn't matter.
1015        self.db.cf_handle(cf_name)
1016    }
1017
1018    // Read methods are located in the ReadDisk trait
1019
1020    // Write methods
1021    // Low-level write methods are located in the WriteDisk trait
1022
1023    /// Writes `batch` to the database.
1024    pub(crate) fn write(&self, batch: DiskWriteBatch) -> Result<(), rocksdb::Error> {
1025        self.db.write(batch.batch)
1026    }
1027
1028    // Private methods
1029
1030    /// Tries to reuse an existing db after a major upgrade.
1031    ///
1032    /// If the current db version belongs to `restorable_db_versions`, the function moves a previous
1033    /// db to a new path so it can be used again. It does so by merely trying to rename the path
1034    /// corresponding to the db version directly preceding the current version to the path that is
1035    /// used by the current db. If successful, it also deletes the db version file.
1036    ///
1037    /// Returns the old disk version if one existed and the db directory was renamed, or None otherwise.
1038    // TODO: Update this function to rename older major db format version to the current version (#9565).
1039    #[allow(clippy::unwrap_in_result)]
1040    pub(crate) fn try_reusing_previous_db_after_major_upgrade(
1041        restorable_db_versions: &[u64],
1042        format_version_in_code: &Version,
1043        config: &Config,
1044        db_kind: impl AsRef<str>,
1045        network: &Network,
1046    ) -> Option<Version> {
1047        if let Some(&major_db_ver) = restorable_db_versions
1048            .iter()
1049            .find(|v| **v == format_version_in_code.major)
1050        {
1051            let db_kind = db_kind.as_ref();
1052
1053            let old_major_db_ver = major_db_ver - 1;
1054            let old_path = config.db_path(db_kind, old_major_db_ver, network);
1055            // Exit early if the path doesn't exist or there's an error checking it.
1056            if !fs::exists(&old_path).unwrap_or(false) {
1057                return None;
1058            }
1059
1060            let new_path = config.db_path(db_kind, major_db_ver, network);
1061
1062            let old_path = match fs::canonicalize(&old_path) {
1063                Ok(canonicalized_old_path) => canonicalized_old_path,
1064                Err(e) => {
1065                    warn!("could not canonicalize {old_path:?}: {e}");
1066                    return None;
1067                }
1068            };
1069
1070            let cache_path = match fs::canonicalize(&config.cache_dir) {
1071                Ok(canonicalized_cache_path) => canonicalized_cache_path,
1072                Err(e) => {
1073                    warn!("could not canonicalize {:?}: {e}", config.cache_dir);
1074                    return None;
1075                }
1076            };
1077
1078            // # Correctness
1079            //
1080            // Check that the path we're about to move is inside the cache directory.
1081            //
1082            // If the user has symlinked the state directory to a non-cache directory, we don't want
1083            // to move it, because it might contain other files.
1084            //
1085            // We don't attempt to guard against malicious symlinks created by attackers
1086            // (TOCTOU attacks). Zebra should not be run with elevated privileges.
1087            if !old_path.starts_with(&cache_path) {
1088                info!("skipped reusing previous state cache: state is outside cache directory");
1089                return None;
1090            }
1091
1092            let opts = DiskDb::options();
1093            let old_db_exists = DB::list_cf(&opts, &old_path).is_ok_and(|cf| !cf.is_empty());
1094            let new_db_exists = DB::list_cf(&opts, &new_path).is_ok_and(|cf| !cf.is_empty());
1095
1096            if old_db_exists && !new_db_exists {
1097                // Create the parent directory for the new db. This is because we can't directly
1098                // rename e.g. `state/v25/mainnet/` to `state/v26/mainnet/` with `fs::rename()` if
1099                // `state/v26/` does not exist.
1100                match fs::create_dir_all(
1101                    new_path
1102                        .parent()
1103                        .expect("new state cache must have a parent path"),
1104                ) {
1105                    Ok(()) => info!("created new directory for state cache at {new_path:?}"),
1106                    Err(e) => {
1107                        warn!(
1108                            "could not create new directory for state cache at {new_path:?}: {e}"
1109                        );
1110                        return None;
1111                    }
1112                };
1113
1114                match fs::rename(&old_path, &new_path) {
1115                    Ok(()) => {
1116                        info!("moved state cache from {old_path:?} to {new_path:?}");
1117
1118                        let mut disk_version =
1119                            database_format_version_on_disk(config, db_kind, major_db_ver, network)
1120                                .expect("unable to read database format version file")
1121                                .expect("unable to parse database format version");
1122
1123                        disk_version.major = old_major_db_ver;
1124
1125                        write_database_format_version_to_disk(
1126                            config,
1127                            db_kind,
1128                            major_db_ver,
1129                            &disk_version,
1130                            network,
1131                        )
1132                        .expect("unable to write database format version file to disk");
1133
1134                        // Get the parent of the old path, e.g. `state/v25/` and delete it if it is
1135                        // empty.
1136                        let old_path = old_path
1137                            .parent()
1138                            .expect("old state cache must have parent path");
1139
1140                        if fs::read_dir(old_path)
1141                            .expect("cached state dir needs to be readable")
1142                            .next()
1143                            .is_none()
1144                        {
1145                            match fs::remove_dir_all(old_path) {
1146                                Ok(()) => {
1147                                    info!("removed empty old state cache directory at {old_path:?}")
1148                                }
1149                                Err(e) => {
1150                                    warn!(
1151                                        "could not remove empty old state cache directory \
1152                                           at {old_path:?}: {e}"
1153                                    )
1154                                }
1155                            }
1156                        }
1157
1158                        return Some(disk_version);
1159                    }
1160                    Err(e) => {
1161                        warn!("could not move state cache from {old_path:?} to {new_path:?}: {e}");
1162                    }
1163                };
1164            }
1165        };
1166
1167        None
1168    }
1169
1170    /// Returns the database options for the finalized state database.
1171    fn options() -> rocksdb::Options {
1172        let mut opts = rocksdb::Options::default();
1173        let mut block_based_opts = rocksdb::BlockBasedOptions::default();
1174
1175        const ONE_MEGABYTE: usize = 1024 * 1024;
1176
1177        opts.create_if_missing(true);
1178        opts.create_missing_column_families(true);
1179
1180        // Use the recommended Ribbon filter setting for all column families.
1181        //
1182        // Ribbon filters are faster than Bloom filters in Zebra, as of April 2022.
1183        // (They aren't needed for single-valued column families, but they don't hurt either.)
1184        block_based_opts.set_ribbon_filter(9.9);
1185
1186        // Use the recommended LZ4 compression type.
1187        //
1188        // https://github.com/facebook/rocksdb/wiki/Compression#configuration
1189        opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
1190
1191        // Tune level-style database file compaction.
1192        //
1193        // This improves Zebra's initial sync speed slightly, as of April 2022.
1194        opts.optimize_level_style_compaction(Self::MEMTABLE_RAM_CACHE_MEGABYTES * ONE_MEGABYTE);
1195
1196        // Increase the process open file limit if needed,
1197        // then use it to set RocksDB's limit.
1198        let open_file_limit = DiskDb::increase_open_file_limit();
1199        let db_file_limit = DiskDb::get_db_open_file_limit(open_file_limit);
1200
1201        // If the current limit is very large, set the DB limit using the ideal limit
1202        let ideal_limit = DiskDb::get_db_open_file_limit(DiskDb::IDEAL_OPEN_FILE_LIMIT)
1203            .try_into()
1204            .expect("ideal open file limit fits in a c_int");
1205        let db_file_limit = db_file_limit.try_into().unwrap_or(ideal_limit);
1206
1207        opts.set_max_open_files(db_file_limit);
1208
1209        // Set the block-based options
1210        opts.set_block_based_table_factory(&block_based_opts);
1211
1212        opts
1213    }
1214
1215    /// Calculate the database's share of `open_file_limit`
1216    fn get_db_open_file_limit(open_file_limit: u64) -> u64 {
1217        // Give the DB half the files, and reserve half the files for peers
1218        (open_file_limit - DiskDb::RESERVED_FILE_COUNT) / 2
1219    }
1220
1221    /// Increase the open file limit for this process to `IDEAL_OPEN_FILE_LIMIT`.
1222    /// If that fails, try `MIN_OPEN_FILE_LIMIT`.
1223    ///
1224    /// If the current limit is above `IDEAL_OPEN_FILE_LIMIT`, leaves it
1225    /// unchanged.
1226    ///
1227    /// Returns the current limit, after any successful increases.
1228    ///
1229    /// # Panics
1230    ///
1231    /// If the open file limit can not be increased to `MIN_OPEN_FILE_LIMIT`.
1232    fn increase_open_file_limit() -> u64 {
1233        // Zebra mainly uses TCP sockets (`zebra-network`) and low-level files
1234        // (`zebra-state` database).
1235        //
1236        // On Unix-based platforms, `increase_nofile_limit` changes the limit for
1237        // both database files and TCP connections.
1238        //
1239        // But it doesn't do anything on Windows in rlimit 0.7.0.
1240        //
1241        // On Windows, the default limits are:
1242        // - 512 high-level stream I/O files (via the C standard functions),
1243        // - 8192 low-level I/O files (via the Unix C functions), and
1244        // - 1000 TCP Control Block entries (network connections).
1245        //
1246        // https://docs.microsoft.com/en-us/cpp/c-runtime-library/reference/setmaxstdio?view=msvc-160#remarks
1247        // http://smallvoid.com/article/winnt-tcpip-max-limit.html
1248        //
1249        // `zebra-state`'s `IDEAL_OPEN_FILE_LIMIT` is much less than
1250        // the Windows low-level I/O file limit.
1251        //
1252        // The [`setmaxstdio` and `getmaxstdio`](https://docs.rs/rlimit/latest/rlimit/#windows)
1253        // functions from the `rlimit` crate only change the high-level I/O file limit.
1254        //
1255        // `zebra-network`'s default connection limit is much less than
1256        // the TCP Control Block limit on Windows.
1257
1258        // We try setting the ideal limit, then the minimum limit.
1259        let current_limit = match increase_nofile_limit(DiskDb::IDEAL_OPEN_FILE_LIMIT) {
1260            Ok(current_limit) => current_limit,
1261            Err(limit_error) => {
1262                // These errors can happen due to sandboxing or unsupported system calls,
1263                // even if the file limit is high enough.
1264                info!(
1265                    ?limit_error,
1266                    min_limit = ?DiskDb::MIN_OPEN_FILE_LIMIT,
1267                    ideal_limit = ?DiskDb::IDEAL_OPEN_FILE_LIMIT,
1268                    "unable to increase the open file limit, \
1269                     assuming Zebra can open a minimum number of files"
1270                );
1271
1272                return DiskDb::MIN_OPEN_FILE_LIMIT;
1273            }
1274        };
1275
1276        if current_limit < DiskDb::MIN_OPEN_FILE_LIMIT {
1277            panic!(
1278                "open file limit too low: \
1279                 unable to set the number of open files to {}, \
1280                 the minimum number of files required by Zebra. \
1281                 Current limit is {:?}. \
1282                 Hint: Increase the open file limit to {} before launching Zebra",
1283                DiskDb::MIN_OPEN_FILE_LIMIT,
1284                current_limit,
1285                DiskDb::IDEAL_OPEN_FILE_LIMIT
1286            );
1287        } else if current_limit < DiskDb::IDEAL_OPEN_FILE_LIMIT {
1288            warn!(
1289                ?current_limit,
1290                min_limit = ?DiskDb::MIN_OPEN_FILE_LIMIT,
1291                ideal_limit = ?DiskDb::IDEAL_OPEN_FILE_LIMIT,
1292                "the maximum number of open files is below Zebra's ideal limit. \
1293                 Hint: Increase the open file limit to {} before launching Zebra",
1294                DiskDb::IDEAL_OPEN_FILE_LIMIT
1295            );
1296        } else if cfg!(windows) {
1297            // This log is verbose during tests.
1298            #[cfg(not(test))]
1299            info!(
1300                min_limit = ?DiskDb::MIN_OPEN_FILE_LIMIT,
1301                ideal_limit = ?DiskDb::IDEAL_OPEN_FILE_LIMIT,
1302                "assuming the open file limit is high enough for Zebra",
1303            );
1304            #[cfg(test)]
1305            debug!(
1306                min_limit = ?DiskDb::MIN_OPEN_FILE_LIMIT,
1307                ideal_limit = ?DiskDb::IDEAL_OPEN_FILE_LIMIT,
1308                "assuming the open file limit is high enough for Zebra",
1309            );
1310        } else {
1311            #[cfg(not(test))]
1312            debug!(
1313                ?current_limit,
1314                min_limit = ?DiskDb::MIN_OPEN_FILE_LIMIT,
1315                ideal_limit = ?DiskDb::IDEAL_OPEN_FILE_LIMIT,
1316                "the open file limit is high enough for Zebra",
1317            );
1318            #[cfg(test)]
1319            debug!(
1320                ?current_limit,
1321                min_limit = ?DiskDb::MIN_OPEN_FILE_LIMIT,
1322                ideal_limit = ?DiskDb::IDEAL_OPEN_FILE_LIMIT,
1323                "the open file limit is high enough for Zebra",
1324            );
1325        }
1326
1327        current_limit
1328    }
1329
1330    // Cleanup methods
1331
1332    /// Returns the number of shared instances of this database.
1333    ///
1334    /// # Concurrency
1335    ///
1336    /// The actual number of owners can be higher or lower than the returned value,
1337    /// because databases can simultaneously be cloned or dropped in other threads.
1338    ///
1339    /// However, if the number of owners is 1, and the caller has exclusive access,
1340    /// the count can't increase unless that caller clones the database.
1341    pub(crate) fn shared_database_owners(&self) -> usize {
1342        Arc::strong_count(&self.db) + Arc::weak_count(&self.db)
1343    }
1344
1345    /// Shut down the database, cleaning up background tasks and ephemeral data.
1346    ///
1347    /// If `force` is true, clean up regardless of any shared references.
1348    /// `force` can cause errors accessing the database from other shared references.
1349    /// It should only be used in debugging or test code, immediately before a manual shutdown.
1350    ///
1351    /// TODO: make private after the stop height check has moved to the syncer (#3442)
1352    ///       move shutting down the database to a blocking thread (#2188)
1353    pub(crate) fn shutdown(&mut self, force: bool) {
1354        // # Correctness
1355        //
1356        // If we're the only owner of the shared database instance,
1357        // then there are no other threads that can increase the strong or weak count.
1358        //
1359        // ## Implementation Requirements
1360        //
1361        // This function and all functions that it calls should avoid cloning the shared database
1362        // instance. If they do, they must drop it before:
1363        // - shutting down database threads, or
1364        // - deleting database files.
1365
1366        if self.shared_database_owners() > 1 {
1367            let path = self.path();
1368
1369            let mut ephemeral_note = "";
1370
1371            if force {
1372                if self.ephemeral {
1373                    ephemeral_note = " and removing ephemeral files";
1374                }
1375
1376                // This log is verbose during tests.
1377                #[cfg(not(test))]
1378                info!(
1379                    ?path,
1380                    "forcing shutdown{} of a state database with multiple active instances",
1381                    ephemeral_note,
1382                );
1383                #[cfg(test)]
1384                debug!(
1385                    ?path,
1386                    "forcing shutdown{} of a state database with multiple active instances",
1387                    ephemeral_note,
1388                );
1389            } else {
1390                if self.ephemeral {
1391                    ephemeral_note = " and files";
1392                }
1393
1394                debug!(
1395                    ?path,
1396                    "dropping DiskDb clone, \
1397                     but keeping shared database instance{} until the last reference is dropped",
1398                    ephemeral_note,
1399                );
1400                return;
1401            }
1402        }
1403
1404        self.assert_default_cf_is_empty();
1405
1406        // Drop isn't guaranteed to run, such as when we panic, or if the tokio shutdown times out.
1407        //
1408        // Zebra's data should be fine if we don't clean up, because:
1409        // - the database flushes regularly anyway
1410        // - Zebra commits each block in a database transaction, any incomplete blocks get rolled back
1411        // - ephemeral files are placed in the os temp dir and should be cleaned up automatically eventually
1412        let path = self.path();
1413        debug!(?path, "flushing database to disk");
1414
1415        // These flushes can fail during forced shutdown or during Drop after a shutdown,
1416        // particularly in tests. If they fail, there's nothing we can do about it anyway.
1417        if let Err(error) = self.db.flush() {
1418            let error = format!("{error:?}");
1419            if error.to_ascii_lowercase().contains("shutdown in progress") {
1420                debug!(
1421                    ?error,
1422                    ?path,
1423                    "expected shutdown error flushing database SST files to disk"
1424                );
1425            } else {
1426                info!(
1427                    ?error,
1428                    ?path,
1429                    "unexpected error flushing database SST files to disk during shutdown"
1430                );
1431            }
1432        }
1433
1434        if let Err(error) = self.db.flush_wal(true) {
1435            let error = format!("{error:?}");
1436            if error.to_ascii_lowercase().contains("shutdown in progress") {
1437                debug!(
1438                    ?error,
1439                    ?path,
1440                    "expected shutdown error flushing database WAL buffer to disk"
1441                );
1442            } else {
1443                info!(
1444                    ?error,
1445                    ?path,
1446                    "unexpected error flushing database WAL buffer to disk during shutdown"
1447                );
1448            }
1449        }
1450
1451        // # Memory Safety
1452        //
1453        // We'd like to call `cancel_all_background_work()` before Zebra exits,
1454        // but when we call it, we get memory, thread, or C++ errors when the process exits.
1455        // (This seems to be a bug in RocksDB: cancel_all_background_work() should wait until
1456        // all the threads have cleaned up.)
1457        //
1458        // # Change History
1459        //
1460        // We've changed this setting multiple times since 2021, in response to new RocksDB
1461        // and Rust compiler behaviour.
1462        //
1463        // We enabled cancel_all_background_work() due to failures on:
1464        // - Rust 1.57 on Linux
1465        //
1466        // We disabled cancel_all_background_work() due to failures on:
1467        // - Rust 1.64 on Linux
1468        //
1469        // We tried enabling cancel_all_background_work() due to failures on:
1470        // - Rust 1.70 on macOS 12.6.5 on x86_64
1471        // but it didn't stop the aborts happening (PR #6820).
1472        //
1473        // There weren't any failures with cancel_all_background_work() disabled on:
1474        // - Rust 1.69 or earlier
1475        // - Linux with Rust 1.70
1476        // And with cancel_all_background_work() enabled or disabled on:
1477        // - macOS 13.2 on aarch64 (M1), native and emulated x86_64, with Rust 1.70
1478        //
1479        // # Detailed Description
1480        //
1481        // We see these kinds of errors:
1482        // ```
1483        // pthread lock: Invalid argument
1484        // pure virtual method called
1485        // terminate called without an active exception
1486        // pthread destroy mutex: Device or resource busy
1487        // Aborted (core dumped)
1488        // signal: 6, SIGABRT: process abort signal
1489        // signal: 11, SIGSEGV: invalid memory reference
1490        // ```
1491        //
1492        // # Reference
1493        //
1494        // The RocksDB wiki says:
1495        // > Q: Is it safe to close RocksDB while another thread is issuing read, write or manual compaction requests?
1496        // >
1497        // > A: No. The users of RocksDB need to make sure all functions have finished before they close RocksDB.
1498        // > You can speed up the waiting by calling CancelAllBackgroundWork().
1499        //
1500        // <https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ>
1501        //
1502        // > rocksdb::DB instances need to be destroyed before your main function exits.
1503        // > RocksDB instances usually depend on some internal static variables.
1504        // > Users need to make sure rocksdb::DB instances are destroyed before those static variables.
1505        //
1506        // <https://github.com/facebook/rocksdb/wiki/Known-Issues>
1507        //
1508        // # TODO
1509        //
1510        // Try re-enabling this code and fixing the underlying concurrency bug.
1511        //
1512        //info!(?path, "stopping background database tasks");
1513        //self.db.cancel_all_background_work(true);
1514
1515        // We'd like to drop the database before deleting its files,
1516        // because that closes the column families and the database correctly.
1517        // But Rust's ownership rules make that difficult,
1518        // so we just flush and delete ephemeral data instead.
1519        //
1520        // This implementation doesn't seem to cause any issues,
1521        // and the RocksDB Drop implementation handles any cleanup.
1522        self.delete_ephemeral();
1523    }
1524
1525    /// If the database is `ephemeral`, delete its files.
1526    fn delete_ephemeral(&mut self) {
1527        // # Correctness
1528        //
1529        // This function and all functions that it calls should avoid cloning the shared database
1530        // instance. See `shutdown()` for details.
1531
1532        if !self.ephemeral {
1533            return;
1534        }
1535
1536        let path = self.path();
1537
1538        // This log is verbose during tests.
1539        #[cfg(not(test))]
1540        info!(?path, "removing temporary database files");
1541        #[cfg(test)]
1542        debug!(?path, "removing temporary database files");
1543
1544        // We'd like to use `rocksdb::Env::mem_env` for ephemeral databases,
1545        // but the Zcash blockchain might not fit in memory. So we just
1546        // delete the database files instead.
1547        //
1548        // We'd also like to call `DB::destroy` here, but calling destroy on a
1549        // live DB is undefined behaviour:
1550        // https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ#basic-readwrite
1551        //
1552        // So we assume that all the database files are under `path`, and
1553        // delete them using standard filesystem APIs. Deleting open files
1554        // might cause errors on non-Unix platforms, so we ignore the result.
1555        // (The OS will delete them eventually anyway, if they are in a temporary directory.)
1556        let result = std::fs::remove_dir_all(path);
1557
1558        if result.is_err() {
1559            // This log is verbose during tests.
1560            #[cfg(not(test))]
1561            info!(
1562                ?result,
1563                ?path,
1564                "removing temporary database files caused an error",
1565            );
1566            #[cfg(test)]
1567            debug!(
1568                ?result,
1569                ?path,
1570                "removing temporary database files caused an error",
1571            );
1572        } else {
1573            debug!(
1574                ?result,
1575                ?path,
1576                "successfully removed temporary database files",
1577            );
1578        }
1579    }
1580
1581    /// Check that the "default" column family is empty.
1582    ///
1583    /// # Panics
1584    ///
1585    /// If Zebra has a bug where it is storing data in the wrong column family.
1586    fn assert_default_cf_is_empty(&self) {
1587        // # Correctness
1588        //
1589        // This function and all functions that it calls should avoid cloning the shared database
1590        // instance. See `shutdown()` for details.
1591
1592        if let Some(default_cf) = self.cf_handle("default") {
1593            assert!(
1594                self.zs_is_empty(&default_cf),
1595                "Zebra should not store data in the 'default' column family"
1596            );
1597        }
1598    }
1599
1600    // Validates a cache directory and creates it if it doesn't exist.
1601    // If the directory cannot be created, it panics with a specific error message.
1602    fn validate_cache_dir(cache_dir: &std::path::PathBuf) {
1603        if let Err(e) = fs::create_dir_all(cache_dir) {
1604            match e.kind() {
1605                std::io::ErrorKind::PermissionDenied => panic!(
1606                    "Permission denied creating {cache_dir:?}. \
1607                     Hint: check if cache directory exist and has write permissions."
1608                ),
1609                std::io::ErrorKind::StorageFull => panic!(
1610                    "No space left on device creating {cache_dir:?}. \
1611                     Hint: check if the disk is full."
1612                ),
1613                _ => panic!("Could not create cache dir {cache_dir:?}: {e}"),
1614            }
1615        }
1616    }
1617}
1618
1619impl Drop for DiskDb {
1620    fn drop(&mut self) {
1621        let path = self.path();
1622        debug!(?path, "dropping DiskDb instance");
1623
1624        self.shutdown(false);
1625    }
1626}