Skip to main content

zebra_state/service/finalized_state/
disk_db.rs

1//! Provides low-level access to RocksDB using some database-specific types.
2//!
3//! This module makes sure that:
4//! - all disk writes happen inside a RocksDB transaction
5//!   ([`rocksdb::WriteBatch`]), and
6//! - format-specific invariants are maintained.
7//!
8//! # Correctness
9//!
10//! [`crate::constants::state_database_format_version_in_code()`] must be incremented
11//! each time the database format (column, serialization, etc) changes.
12
13use std::{
14    collections::{BTreeMap, HashMap},
15    fmt::{Debug, Write},
16    fs,
17    ops::RangeBounds,
18    path::Path,
19    sync::{
20        atomic::{self, AtomicBool},
21        Arc,
22    },
23};
24
25use itertools::Itertools;
26use rlimit::increase_nofile_limit;
27
28use rocksdb::{ColumnFamilyDescriptor, ErrorKind, Options, ReadOptions};
29use semver::Version;
30use zebra_chain::{parameters::Network, primitives::byte_array::increment_big_endian};
31
32use crate::{
33    database_format_version_on_disk,
34    service::finalized_state::disk_format::{FromDisk, IntoDisk},
35    write_database_format_version_to_disk, Config,
36};
37
38use super::zebra_db::transparent::{
39    fetch_add_balance_and_received, BALANCE_BY_TRANSPARENT_ADDR,
40    BALANCE_BY_TRANSPARENT_ADDR_MERGE_OP,
41};
42// Doc-only imports
43#[allow(unused_imports)]
44use super::{TypedColumnFamily, WriteTypedBatch};
45
46#[cfg(any(test, feature = "proptest-impl"))]
47mod tests;
48
49/// The [`rocksdb::ThreadMode`] used by the database.
50pub type DBThreadMode = rocksdb::SingleThreaded;
51
52/// The [`rocksdb`] database type, including thread mode.
53///
54/// Also the [`rocksdb::DBAccess`] used by database iterators.
55pub type DB = rocksdb::DBWithThreadMode<DBThreadMode>;
56
57/// Wrapper struct to ensure low-level database access goes through the correct API.
58///
59/// `rocksdb` allows concurrent writes through a shared reference,
60/// so database instances are cloneable. When the final clone is dropped,
61/// the database is closed.
62///
63/// # Correctness
64///
65/// Reading transactions from the database using RocksDB iterators causes hangs.
66/// But creating iterators and reading the tip height works fine.
67///
68/// So these hangs are probably caused by holding column family locks to read:
69/// - multiple values, or
70/// - large values.
71///
72/// This bug might be fixed by moving database operations to blocking threads (#2188),
73/// so that they don't block the tokio executor.
74/// (Or it might be fixed by future RocksDB upgrades.)
75#[derive(Clone, Debug)]
76pub struct DiskDb {
77    // Configuration
78    //
79    // This configuration cannot be modified after the database is initialized,
80    // because some clones would have different values.
81    //
82    /// The configured database kind for this database.
83    db_kind: String,
84
85    /// The format version of the running Zebra code.
86    format_version_in_code: Version,
87
88    /// The configured network for this database.
89    network: Network,
90
91    /// The configured temporary database setting.
92    ///
93    /// If true, the database files are deleted on drop.
94    ephemeral: bool,
95
96    /// A boolean flag indicating whether the db format change task has finished
97    /// applying any format changes that may have been required.
98    finished_format_upgrades: Arc<AtomicBool>,
99
100    // Owned State
101    //
102    // Everything contained in this state must be shared by all clones, or read-only.
103    //
104    /// The shared inner RocksDB database.
105    ///
106    /// RocksDB allows reads and writes via a shared reference.
107    ///
108    /// In [`SingleThreaded`](rocksdb::SingleThreaded) mode,
109    /// column family changes and [`Drop`] require exclusive access.
110    ///
111    /// In [`MultiThreaded`](rocksdb::MultiThreaded) mode,
112    /// only [`Drop`] requires exclusive access.
113    db: Arc<DB>,
114}
115
116/// Wrapper struct to ensure low-level database writes go through the correct API.
117///
118/// [`rocksdb::WriteBatch`] is a batched set of database updates,
119/// which must be written to the database using `DiskDb::write(batch)`.
120#[must_use = "batches must be written to the database"]
121#[derive(Default)]
122pub struct DiskWriteBatch {
123    /// The inner RocksDB write batch.
124    batch: rocksdb::WriteBatch,
125}
126
127impl Debug for DiskWriteBatch {
128    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
129        f.debug_struct("DiskWriteBatch")
130            .field("batch", &format!("{} bytes", self.batch.size_in_bytes()))
131            .finish()
132    }
133}
134
135impl PartialEq for DiskWriteBatch {
136    fn eq(&self, other: &Self) -> bool {
137        self.batch.data() == other.batch.data()
138    }
139}
140
141impl Eq for DiskWriteBatch {}
142
143/// Helper trait for inserting serialized typed (Key, Value) pairs into rocksdb.
144///
145/// # Deprecation
146///
147/// This trait should not be used in new code, use [`WriteTypedBatch`] instead.
148//
149// TODO: replace uses of this trait with WriteTypedBatch,
150//       implement these methods directly on WriteTypedBatch, and delete the trait.
151pub trait WriteDisk {
152    /// Serialize and insert the given key and value into a rocksdb column family,
153    /// overwriting any existing `value` for `key`.
154    fn zs_insert<C, K, V>(&mut self, cf: &C, key: K, value: V)
155    where
156        C: rocksdb::AsColumnFamilyRef,
157        K: IntoDisk + Debug,
158        V: IntoDisk;
159
160    /// Serialize and merge the given key and value into a rocksdb column family,
161    /// merging with any existing `value` for `key`.
162    fn zs_merge<C, K, V>(&mut self, cf: &C, key: K, value: V)
163    where
164        C: rocksdb::AsColumnFamilyRef,
165        K: IntoDisk + Debug,
166        V: IntoDisk;
167
168    /// Remove the given key from a rocksdb column family, if it exists.
169    fn zs_delete<C, K>(&mut self, cf: &C, key: K)
170    where
171        C: rocksdb::AsColumnFamilyRef,
172        K: IntoDisk + Debug;
173
174    /// Delete the given key range from a rocksdb column family, if it exists, including `from`
175    /// and excluding `until_strictly_before`.
176    //
177    // TODO: convert zs_delete_range() to take std::ops::RangeBounds
178    //       see zs_range_iter() for an example of the edge cases
179    fn zs_delete_range<C, K>(&mut self, cf: &C, from: K, until_strictly_before: K)
180    where
181        C: rocksdb::AsColumnFamilyRef,
182        K: IntoDisk + Debug;
183}
184
185/// # Deprecation
186///
187/// These impls should not be used in new code, use [`WriteTypedBatch`] instead.
188//
189// TODO: replace uses of these impls with WriteTypedBatch,
190//       implement these methods directly on WriteTypedBatch, and delete the trait.
191impl WriteDisk for DiskWriteBatch {
192    fn zs_insert<C, K, V>(&mut self, cf: &C, key: K, value: V)
193    where
194        C: rocksdb::AsColumnFamilyRef,
195        K: IntoDisk + Debug,
196        V: IntoDisk,
197    {
198        let key_bytes = key.as_bytes();
199        let value_bytes = value.as_bytes();
200        self.batch.put_cf(cf, key_bytes, value_bytes);
201    }
202
203    fn zs_merge<C, K, V>(&mut self, cf: &C, key: K, value: V)
204    where
205        C: rocksdb::AsColumnFamilyRef,
206        K: IntoDisk + Debug,
207        V: IntoDisk,
208    {
209        let key_bytes = key.as_bytes();
210        let value_bytes = value.as_bytes();
211        self.batch.merge_cf(cf, key_bytes, value_bytes);
212    }
213
214    fn zs_delete<C, K>(&mut self, cf: &C, key: K)
215    where
216        C: rocksdb::AsColumnFamilyRef,
217        K: IntoDisk + Debug,
218    {
219        let key_bytes = key.as_bytes();
220        self.batch.delete_cf(cf, key_bytes);
221    }
222
223    // TODO: convert zs_delete_range() to take std::ops::RangeBounds
224    //       see zs_range_iter() for an example of the edge cases
225    fn zs_delete_range<C, K>(&mut self, cf: &C, from: K, until_strictly_before: K)
226    where
227        C: rocksdb::AsColumnFamilyRef,
228        K: IntoDisk + Debug,
229    {
230        let from_bytes = from.as_bytes();
231        let until_strictly_before_bytes = until_strictly_before.as_bytes();
232        self.batch
233            .delete_range_cf(cf, from_bytes, until_strictly_before_bytes);
234    }
235}
236
237// Allow &mut DiskWriteBatch as well as owned DiskWriteBatch
238impl<T> WriteDisk for &mut T
239where
240    T: WriteDisk,
241{
242    fn zs_insert<C, K, V>(&mut self, cf: &C, key: K, value: V)
243    where
244        C: rocksdb::AsColumnFamilyRef,
245        K: IntoDisk + Debug,
246        V: IntoDisk,
247    {
248        (*self).zs_insert(cf, key, value)
249    }
250
251    fn zs_merge<C, K, V>(&mut self, cf: &C, key: K, value: V)
252    where
253        C: rocksdb::AsColumnFamilyRef,
254        K: IntoDisk + Debug,
255        V: IntoDisk,
256    {
257        (*self).zs_merge(cf, key, value)
258    }
259
260    fn zs_delete<C, K>(&mut self, cf: &C, key: K)
261    where
262        C: rocksdb::AsColumnFamilyRef,
263        K: IntoDisk + Debug,
264    {
265        (*self).zs_delete(cf, key)
266    }
267
268    fn zs_delete_range<C, K>(&mut self, cf: &C, from: K, until_strictly_before: K)
269    where
270        C: rocksdb::AsColumnFamilyRef,
271        K: IntoDisk + Debug,
272    {
273        (*self).zs_delete_range(cf, from, until_strictly_before)
274    }
275}
276
277/// Helper trait for retrieving and deserializing values from rocksdb column families.
278///
279/// # Deprecation
280///
281/// This trait should not be used in new code, use [`TypedColumnFamily`] instead.
282//
283// TODO: replace uses of this trait with TypedColumnFamily,
284//       implement these methods directly on DiskDb, and delete the trait.
285pub trait ReadDisk {
286    /// Returns true if a rocksdb column family `cf` does not contain any entries.
287    fn zs_is_empty<C>(&self, cf: &C) -> bool
288    where
289        C: rocksdb::AsColumnFamilyRef;
290
291    /// Returns the value for `key` in the rocksdb column family `cf`, if present.
292    fn zs_get<C, K, V>(&self, cf: &C, key: &K) -> Option<V>
293    where
294        C: rocksdb::AsColumnFamilyRef,
295        K: IntoDisk,
296        V: FromDisk;
297
298    /// Check if a rocksdb column family `cf` contains the serialized form of `key`.
299    fn zs_contains<C, K>(&self, cf: &C, key: &K) -> bool
300    where
301        C: rocksdb::AsColumnFamilyRef,
302        K: IntoDisk;
303
304    /// Returns the lowest key in `cf`, and the corresponding value.
305    ///
306    /// Returns `None` if the column family is empty.
307    fn zs_first_key_value<C, K, V>(&self, cf: &C) -> Option<(K, V)>
308    where
309        C: rocksdb::AsColumnFamilyRef,
310        K: IntoDisk + FromDisk,
311        V: FromDisk;
312
313    /// Returns the highest key in `cf`, and the corresponding value.
314    ///
315    /// Returns `None` if the column family is empty.
316    fn zs_last_key_value<C, K, V>(&self, cf: &C) -> Option<(K, V)>
317    where
318        C: rocksdb::AsColumnFamilyRef,
319        K: IntoDisk + FromDisk,
320        V: FromDisk;
321
322    /// Returns the first key greater than or equal to `lower_bound` in `cf`,
323    /// and the corresponding value.
324    ///
325    /// Returns `None` if there are no keys greater than or equal to `lower_bound`.
326    fn zs_next_key_value_from<C, K, V>(&self, cf: &C, lower_bound: &K) -> Option<(K, V)>
327    where
328        C: rocksdb::AsColumnFamilyRef,
329        K: IntoDisk + FromDisk,
330        V: FromDisk;
331
332    /// Returns the first key strictly greater than `lower_bound` in `cf`,
333    /// and the corresponding value.
334    ///
335    /// Returns `None` if there are no keys greater than `lower_bound`.
336    fn zs_next_key_value_strictly_after<C, K, V>(&self, cf: &C, lower_bound: &K) -> Option<(K, V)>
337    where
338        C: rocksdb::AsColumnFamilyRef,
339        K: IntoDisk + FromDisk,
340        V: FromDisk;
341
342    /// Returns the first key less than or equal to `upper_bound` in `cf`,
343    /// and the corresponding value.
344    ///
345    /// Returns `None` if there are no keys less than or equal to `upper_bound`.
346    fn zs_prev_key_value_back_from<C, K, V>(&self, cf: &C, upper_bound: &K) -> Option<(K, V)>
347    where
348        C: rocksdb::AsColumnFamilyRef,
349        K: IntoDisk + FromDisk,
350        V: FromDisk;
351
352    /// Returns the first key strictly less than `upper_bound` in `cf`,
353    /// and the corresponding value.
354    ///
355    /// Returns `None` if there are no keys less than `upper_bound`.
356    fn zs_prev_key_value_strictly_before<C, K, V>(&self, cf: &C, upper_bound: &K) -> Option<(K, V)>
357    where
358        C: rocksdb::AsColumnFamilyRef,
359        K: IntoDisk + FromDisk,
360        V: FromDisk;
361
362    /// Returns the keys and values in `cf` in `range`, in an ordered `BTreeMap`.
363    ///
364    /// Holding this iterator open might delay block commit transactions.
365    fn zs_items_in_range_ordered<C, K, V, R>(&self, cf: &C, range: R) -> BTreeMap<K, V>
366    where
367        C: rocksdb::AsColumnFamilyRef,
368        K: IntoDisk + FromDisk + Ord,
369        V: FromDisk,
370        R: RangeBounds<K>;
371
372    /// Returns the keys and values in `cf` in `range`, in an unordered `HashMap`.
373    ///
374    /// Holding this iterator open might delay block commit transactions.
375    fn zs_items_in_range_unordered<C, K, V, R>(&self, cf: &C, range: R) -> HashMap<K, V>
376    where
377        C: rocksdb::AsColumnFamilyRef,
378        K: IntoDisk + FromDisk + Eq + std::hash::Hash,
379        V: FromDisk,
380        R: RangeBounds<K>;
381}
382
383impl PartialEq for DiskDb {
384    fn eq(&self, other: &Self) -> bool {
385        if self.db.path() == other.db.path() {
386            assert_eq!(
387                self.network, other.network,
388                "database with same path but different network configs",
389            );
390            assert_eq!(
391                self.ephemeral, other.ephemeral,
392                "database with same path but different ephemeral configs",
393            );
394
395            return true;
396        }
397
398        false
399    }
400}
401
402impl Eq for DiskDb {}
403
404/// # Deprecation
405///
406/// These impls should not be used in new code, use [`TypedColumnFamily`] instead.
407//
408// TODO: replace uses of these impls with TypedColumnFamily,
409//       implement these methods directly on DiskDb, and delete the trait.
410impl ReadDisk for DiskDb {
411    fn zs_is_empty<C>(&self, cf: &C) -> bool
412    where
413        C: rocksdb::AsColumnFamilyRef,
414    {
415        // Empty column families return invalid forward iterators.
416        //
417        // Checking iterator validity does not seem to cause database hangs.
418        let iterator = self.db.iterator_cf(cf, rocksdb::IteratorMode::Start);
419        let raw_iterator: rocksdb::DBRawIteratorWithThreadMode<DB> = iterator.into();
420
421        !raw_iterator.valid()
422    }
423
424    #[allow(clippy::unwrap_in_result)]
425    fn zs_get<C, K, V>(&self, cf: &C, key: &K) -> Option<V>
426    where
427        C: rocksdb::AsColumnFamilyRef,
428        K: IntoDisk,
429        V: FromDisk,
430    {
431        let key_bytes = key.as_bytes();
432
433        // We use `get_pinned_cf` to avoid taking ownership of the serialized
434        // value, because we're going to deserialize it anyways, which avoids an
435        // extra copy
436        let value_bytes = self
437            .db
438            .get_pinned_cf(cf, key_bytes)
439            .expect("unexpected database failure");
440
441        value_bytes.map(V::from_bytes)
442    }
443
444    fn zs_contains<C, K>(&self, cf: &C, key: &K) -> bool
445    where
446        C: rocksdb::AsColumnFamilyRef,
447        K: IntoDisk,
448    {
449        let key_bytes = key.as_bytes();
450
451        // We use `get_pinned_cf` to avoid taking ownership of the serialized
452        // value, because we don't use the value at all. This avoids an extra copy.
453        self.db
454            .get_pinned_cf(cf, key_bytes)
455            .expect("unexpected database failure")
456            .is_some()
457    }
458
459    fn zs_first_key_value<C, K, V>(&self, cf: &C) -> Option<(K, V)>
460    where
461        C: rocksdb::AsColumnFamilyRef,
462        K: IntoDisk + FromDisk,
463        V: FromDisk,
464    {
465        // Reading individual values from iterators does not seem to cause database hangs.
466        self.zs_forward_range_iter(cf, ..).next()
467    }
468
469    fn zs_last_key_value<C, K, V>(&self, cf: &C) -> Option<(K, V)>
470    where
471        C: rocksdb::AsColumnFamilyRef,
472        K: IntoDisk + FromDisk,
473        V: FromDisk,
474    {
475        // Reading individual values from iterators does not seem to cause database hangs.
476        self.zs_reverse_range_iter(cf, ..).next()
477    }
478
479    fn zs_next_key_value_from<C, K, V>(&self, cf: &C, lower_bound: &K) -> Option<(K, V)>
480    where
481        C: rocksdb::AsColumnFamilyRef,
482        K: IntoDisk + FromDisk,
483        V: FromDisk,
484    {
485        self.zs_forward_range_iter(cf, lower_bound..).next()
486    }
487
488    fn zs_next_key_value_strictly_after<C, K, V>(&self, cf: &C, lower_bound: &K) -> Option<(K, V)>
489    where
490        C: rocksdb::AsColumnFamilyRef,
491        K: IntoDisk + FromDisk,
492        V: FromDisk,
493    {
494        use std::ops::Bound::*;
495
496        // There is no standard syntax for an excluded start bound.
497        self.zs_forward_range_iter(cf, (Excluded(lower_bound), Unbounded))
498            .next()
499    }
500
501    fn zs_prev_key_value_back_from<C, K, V>(&self, cf: &C, upper_bound: &K) -> Option<(K, V)>
502    where
503        C: rocksdb::AsColumnFamilyRef,
504        K: IntoDisk + FromDisk,
505        V: FromDisk,
506    {
507        self.zs_reverse_range_iter(cf, ..=upper_bound).next()
508    }
509
510    fn zs_prev_key_value_strictly_before<C, K, V>(&self, cf: &C, upper_bound: &K) -> Option<(K, V)>
511    where
512        C: rocksdb::AsColumnFamilyRef,
513        K: IntoDisk + FromDisk,
514        V: FromDisk,
515    {
516        self.zs_reverse_range_iter(cf, ..upper_bound).next()
517    }
518
519    fn zs_items_in_range_ordered<C, K, V, R>(&self, cf: &C, range: R) -> BTreeMap<K, V>
520    where
521        C: rocksdb::AsColumnFamilyRef,
522        K: IntoDisk + FromDisk + Ord,
523        V: FromDisk,
524        R: RangeBounds<K>,
525    {
526        self.zs_forward_range_iter(cf, range).collect()
527    }
528
529    fn zs_items_in_range_unordered<C, K, V, R>(&self, cf: &C, range: R) -> HashMap<K, V>
530    where
531        C: rocksdb::AsColumnFamilyRef,
532        K: IntoDisk + FromDisk + Eq + std::hash::Hash,
533        V: FromDisk,
534        R: RangeBounds<K>,
535    {
536        self.zs_forward_range_iter(cf, range).collect()
537    }
538}
539
540impl DiskWriteBatch {
541    /// Creates and returns a new transactional batch write.
542    ///
543    /// # Correctness
544    ///
545    /// Each block must be written to the state inside a batch, so that:
546    /// - concurrent `ReadStateService` queries don't see half-written blocks, and
547    /// - if Zebra calls `exit`, panics, or crashes, half-written blocks are rolled back.
548    pub fn new() -> Self {
549        DiskWriteBatch {
550            batch: rocksdb::WriteBatch::default(),
551        }
552    }
553}
554
555impl DiskDb {
556    /// Prints rocksdb metrics for each column family along with total database disk size, live data disk size and database memory size.
557    pub fn print_db_metrics(&self) {
558        let mut total_size_on_disk = 0;
559        let mut total_live_size_on_disk = 0;
560        let mut total_size_in_mem = 0;
561        let db: &Arc<DB> = &self.db;
562        let db_options = DiskDb::options();
563        let column_families = DiskDb::construct_column_families(db_options, db.path(), []);
564        let mut column_families_log_string = String::from("");
565
566        write!(column_families_log_string, "Column families and sizes: ").unwrap();
567
568        for cf_descriptor in column_families {
569            let cf_name = &cf_descriptor.name();
570            let cf_handle = db
571                .cf_handle(cf_name)
572                .expect("Column family handle must exist");
573            let live_data_size = db
574                .property_int_value_cf(cf_handle, "rocksdb.estimate-live-data-size")
575                .unwrap_or(Some(0));
576            let total_sst_files_size = db
577                .property_int_value_cf(cf_handle, "rocksdb.total-sst-files-size")
578                .unwrap_or(Some(0));
579            let cf_disk_size = total_sst_files_size.unwrap_or(0);
580            total_size_on_disk += cf_disk_size;
581            total_live_size_on_disk += live_data_size.unwrap_or(0);
582            let mem_table_size = db
583                .property_int_value_cf(cf_handle, "rocksdb.size-all-mem-tables")
584                .unwrap_or(Some(0));
585            total_size_in_mem += mem_table_size.unwrap_or(0);
586
587            write!(
588                column_families_log_string,
589                "{} (Disk: {}, Memory: {})",
590                cf_name,
591                human_bytes::human_bytes(cf_disk_size as f64),
592                human_bytes::human_bytes(mem_table_size.unwrap_or(0) as f64)
593            )
594            .unwrap();
595        }
596
597        debug!("{}", column_families_log_string);
598        info!(
599            "Total Database Disk Size: {}",
600            human_bytes::human_bytes(total_size_on_disk as f64)
601        );
602        info!(
603            "Total Live Data Disk Size: {}",
604            human_bytes::human_bytes(total_live_size_on_disk as f64)
605        );
606        info!(
607            "Total Database Memory Size: {}",
608            human_bytes::human_bytes(total_size_in_mem as f64)
609        );
610    }
611
612    /// Exports RocksDB metrics to Prometheus.
613    ///
614    /// This function collects database statistics and exposes them as Prometheus metrics.
615    /// Call this periodically (e.g., every 30 seconds) from a background task.
616    pub(crate) fn export_metrics(&self) {
617        let db: &Arc<DB> = &self.db;
618        let db_options = DiskDb::options();
619        let column_families = DiskDb::construct_column_families(db_options, db.path(), []);
620
621        let mut total_disk: u64 = 0;
622        let mut total_live: u64 = 0;
623        let mut total_mem: u64 = 0;
624
625        for cf_descriptor in column_families {
626            let cf_name = cf_descriptor.name().to_string();
627            if let Some(cf_handle) = db.cf_handle(&cf_name) {
628                let disk = db
629                    .property_int_value_cf(cf_handle, "rocksdb.total-sst-files-size")
630                    .ok()
631                    .flatten()
632                    .unwrap_or(0);
633                let live = db
634                    .property_int_value_cf(cf_handle, "rocksdb.estimate-live-data-size")
635                    .ok()
636                    .flatten()
637                    .unwrap_or(0);
638                let mem = db
639                    .property_int_value_cf(cf_handle, "rocksdb.size-all-mem-tables")
640                    .ok()
641                    .flatten()
642                    .unwrap_or(0);
643
644                total_disk += disk;
645                total_live += live;
646                total_mem += mem;
647
648                metrics::gauge!("zebra.state.rocksdb.cf_disk_size_bytes", "cf" => cf_name.clone())
649                    .set(disk as f64);
650                metrics::gauge!("zebra.state.rocksdb.cf_memory_size_bytes", "cf" => cf_name)
651                    .set(mem as f64);
652            }
653        }
654
655        metrics::gauge!("zebra.state.rocksdb.total_disk_size_bytes").set(total_disk as f64);
656        metrics::gauge!("zebra.state.rocksdb.live_data_size_bytes").set(total_live as f64);
657        metrics::gauge!("zebra.state.rocksdb.total_memory_size_bytes").set(total_mem as f64);
658
659        // Compaction metrics - these use database-wide properties (not per-column-family)
660        if let Ok(Some(pending)) = db.property_int_value("rocksdb.compaction-pending") {
661            metrics::gauge!("zebra.state.rocksdb.compaction.pending_bytes").set(pending as f64);
662        }
663
664        if let Ok(Some(running)) = db.property_int_value("rocksdb.num-running-compactions") {
665            metrics::gauge!("zebra.state.rocksdb.compaction.running").set(running as f64);
666        }
667
668        if let Ok(Some(cache)) = db.property_int_value("rocksdb.block-cache-usage") {
669            metrics::gauge!("zebra.state.rocksdb.block_cache_usage_bytes").set(cache as f64);
670        }
671
672        // Level-by-level file counts (RocksDB typically has up to 7 levels)
673        for level in 0..7 {
674            let prop = format!("rocksdb.num-files-at-level{}", level);
675            if let Ok(Some(count)) = db.property_int_value(&prop) {
676                metrics::gauge!("zebra.state.rocksdb.num_files_at_level", "level" => level.to_string())
677                    .set(count as f64);
678            }
679        }
680    }
681
682    /// Returns the estimated total disk space usage of the database.
683    pub fn size(&self) -> u64 {
684        let db: &Arc<DB> = &self.db;
685        let db_options = DiskDb::options();
686        let mut total_size_on_disk = 0;
687        for cf_descriptor in DiskDb::construct_column_families(db_options, db.path(), []) {
688            let cf_name = &cf_descriptor.name();
689            let cf_handle = db
690                .cf_handle(cf_name)
691                .expect("Column family handle must exist");
692
693            total_size_on_disk += db
694                .property_int_value_cf(cf_handle, "rocksdb.total-sst-files-size")
695                .ok()
696                .flatten()
697                .unwrap_or(0);
698        }
699
700        total_size_on_disk
701    }
702
703    /// Sets `finished_format_upgrades` to true to indicate that Zebra has
704    /// finished applying any required db format upgrades.
705    pub fn mark_finished_format_upgrades(&self) {
706        self.finished_format_upgrades
707            .store(true, atomic::Ordering::SeqCst);
708    }
709
710    /// Returns true if the `finished_format_upgrades` flag has been set to true to
711    /// indicate that Zebra has finished applying any required db format upgrades.
712    pub fn finished_format_upgrades(&self) -> bool {
713        self.finished_format_upgrades.load(atomic::Ordering::SeqCst)
714    }
715
716    /// When called with a secondary DB instance, tries to catch up with the primary DB instance
717    pub fn try_catch_up_with_primary(&self) -> Result<(), rocksdb::Error> {
718        self.db.try_catch_up_with_primary()
719    }
720
721    /// Returns a forward iterator over the items in `cf` in `range`.
722    ///
723    /// Holding this iterator open might delay block commit transactions.
724    pub fn zs_forward_range_iter<C, K, V, R>(
725        &self,
726        cf: &C,
727        range: R,
728    ) -> impl Iterator<Item = (K, V)> + '_
729    where
730        C: rocksdb::AsColumnFamilyRef,
731        K: IntoDisk + FromDisk,
732        V: FromDisk,
733        R: RangeBounds<K>,
734    {
735        self.zs_range_iter_with_direction(cf, range, false)
736    }
737
738    /// Returns a reverse iterator over the items in `cf` in `range`.
739    ///
740    /// Holding this iterator open might delay block commit transactions.
741    pub fn zs_reverse_range_iter<C, K, V, R>(
742        &self,
743        cf: &C,
744        range: R,
745    ) -> impl Iterator<Item = (K, V)> + '_
746    where
747        C: rocksdb::AsColumnFamilyRef,
748        K: IntoDisk + FromDisk,
749        V: FromDisk,
750        R: RangeBounds<K>,
751    {
752        self.zs_range_iter_with_direction(cf, range, true)
753    }
754
755    /// Returns an iterator over the items in `cf` in `range`.
756    ///
757    /// RocksDB iterators are ordered by increasing key bytes by default.
758    /// Otherwise, if `reverse` is `true`, the iterator is ordered by decreasing key bytes.
759    ///
760    /// Holding this iterator open might delay block commit transactions.
761    fn zs_range_iter_with_direction<C, K, V, R>(
762        &self,
763        cf: &C,
764        range: R,
765        reverse: bool,
766    ) -> impl Iterator<Item = (K, V)> + '_
767    where
768        C: rocksdb::AsColumnFamilyRef,
769        K: IntoDisk + FromDisk,
770        V: FromDisk,
771        R: RangeBounds<K>,
772    {
773        use std::ops::Bound::{self, *};
774
775        // Replace with map() when it stabilises:
776        // https://github.com/rust-lang/rust/issues/86026
777        let map_to_vec = |bound: Bound<&K>| -> Bound<Vec<u8>> {
778            match bound {
779                Unbounded => Unbounded,
780                Included(x) => Included(x.as_bytes().as_ref().to_vec()),
781                Excluded(x) => Excluded(x.as_bytes().as_ref().to_vec()),
782            }
783        };
784
785        let start_bound = map_to_vec(range.start_bound());
786        let end_bound = map_to_vec(range.end_bound());
787        let range = (start_bound, end_bound);
788
789        let mode = Self::zs_iter_mode(&range, reverse);
790        let opts = Self::zs_iter_opts(&range);
791
792        // Reading multiple items from iterators has caused database hangs,
793        // in previous RocksDB versions
794        self.db
795            .iterator_cf_opt(cf, opts, mode)
796            .map(|result| result.expect("unexpected database failure"))
797            .map(|(key, value)| (key.to_vec(), value))
798            // Skip excluded "from" bound and empty ranges. The `mode` already skips keys
799            // strictly before the "from" bound.
800            .skip_while({
801                let range = range.clone();
802                move |(key, _value)| !range.contains(key)
803            })
804            // Take until the excluded "to" bound is reached,
805            // or we're after the included "to" bound.
806            .take_while(move |(key, _value)| range.contains(key))
807            .map(|(key, value)| (K::from_bytes(key), V::from_bytes(value)))
808    }
809
810    /// Returns the RocksDB ReadOptions with a lower and upper bound for a range.
811    fn zs_iter_opts<R>(range: &R) -> ReadOptions
812    where
813        R: RangeBounds<Vec<u8>>,
814    {
815        let mut opts = ReadOptions::default();
816        let (lower_bound, upper_bound) = Self::zs_iter_bounds(range);
817
818        if let Some(bound) = lower_bound {
819            opts.set_iterate_lower_bound(bound);
820        };
821
822        if let Some(bound) = upper_bound {
823            opts.set_iterate_upper_bound(bound);
824        };
825
826        opts
827    }
828
829    /// Returns a lower and upper iterate bounds for a range.
830    ///
831    /// Note: Since upper iterate bounds are always exclusive in RocksDB, this method
832    ///       will increment the upper bound by 1 if the end bound of the provided range
833    ///       is inclusive.
834    fn zs_iter_bounds<R>(range: &R) -> (Option<Vec<u8>>, Option<Vec<u8>>)
835    where
836        R: RangeBounds<Vec<u8>>,
837    {
838        use std::ops::Bound::*;
839
840        let lower_bound = match range.start_bound() {
841            Included(bound) | Excluded(bound) => Some(bound.clone()),
842            Unbounded => None,
843        };
844
845        let upper_bound = match range.end_bound().cloned() {
846            Included(mut bound) => {
847                // Increment the last byte in the upper bound that is less than u8::MAX, and
848                // clear any bytes after it to increment the next key in lexicographic order
849                // (next big-endian number). RocksDB uses lexicographic order for keys.
850                let is_wrapped_overflow = increment_big_endian(&mut bound);
851
852                if is_wrapped_overflow {
853                    bound.insert(0, 0x01)
854                }
855
856                Some(bound)
857            }
858            Excluded(bound) => Some(bound),
859            Unbounded => None,
860        };
861
862        (lower_bound, upper_bound)
863    }
864
865    /// Returns the RocksDB iterator "from" mode for `range`.
866    ///
867    /// RocksDB iterators are ordered by increasing key bytes by default.
868    /// Otherwise, if `reverse` is `true`, the iterator is ordered by decreasing key bytes.
869    fn zs_iter_mode<R>(range: &R, reverse: bool) -> rocksdb::IteratorMode<'_>
870    where
871        R: RangeBounds<Vec<u8>>,
872    {
873        use std::ops::Bound::*;
874
875        let from_bound = if reverse {
876            range.end_bound()
877        } else {
878            range.start_bound()
879        };
880
881        match from_bound {
882            Unbounded => {
883                if reverse {
884                    // Reversed unbounded iterators start from the last item
885                    rocksdb::IteratorMode::End
886                } else {
887                    // Unbounded iterators start from the first item
888                    rocksdb::IteratorMode::Start
889                }
890            }
891
892            Included(bound) | Excluded(bound) => {
893                let direction = if reverse {
894                    rocksdb::Direction::Reverse
895                } else {
896                    rocksdb::Direction::Forward
897                };
898
899                rocksdb::IteratorMode::From(bound.as_slice(), direction)
900            }
901        }
902    }
903
904    /// The ideal open file limit for Zebra
905    const IDEAL_OPEN_FILE_LIMIT: u64 = 1024;
906
907    /// The minimum number of open files for Zebra to operate normally. Also used
908    /// as the default open file limit, when the OS doesn't tell us how many
909    /// files we can use.
910    ///
911    /// We want 100+ file descriptors for peers, and 100+ for the database.
912    ///
913    /// On Windows, the default limit is 512 high-level I/O files, and 8192
914    /// low-level I/O files:
915    /// <https://docs.microsoft.com/en-us/cpp/c-runtime-library/reference/setmaxstdio?view=msvc-160#remarks>
916    const MIN_OPEN_FILE_LIMIT: u64 = 512;
917
918    /// The number of files used internally by Zebra.
919    ///
920    /// Zebra uses file descriptors for OS libraries (10+), polling APIs (10+),
921    /// stdio (3), and other OS facilities (2+).
922    const RESERVED_FILE_COUNT: u64 = 48;
923
924    /// The size of the database memtable RAM cache in megabytes.
925    ///
926    /// <https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ#configuration-and-tuning>
927    const MEMTABLE_RAM_CACHE_MEGABYTES: usize = 128;
928
929    /// Build a vector of current column families on the disk and optionally any new column families.
930    /// Returns an iterable collection of all column families.
931    fn construct_column_families(
932        db_options: Options,
933        path: &Path,
934        column_families_in_code: impl IntoIterator<Item = String>,
935    ) -> impl Iterator<Item = ColumnFamilyDescriptor> {
936        // When opening the database in read/write mode, all column families must be opened.
937        //
938        // To make Zebra forward-compatible with databases updated by later versions,
939        // we read any existing column families off the disk, then add any new column families
940        // from the current implementation.
941        //
942        // <https://github.com/facebook/rocksdb/wiki/Column-Families#reference>
943        let column_families_on_disk = DB::list_cf(&db_options, path).unwrap_or_default();
944        let column_families_in_code = column_families_in_code.into_iter();
945
946        column_families_on_disk
947            .into_iter()
948            .chain(column_families_in_code)
949            .unique()
950            .map(move |cf_name: String| {
951                let mut cf_options = db_options.clone();
952
953                if cf_name == BALANCE_BY_TRANSPARENT_ADDR {
954                    cf_options.set_merge_operator_associative(
955                        BALANCE_BY_TRANSPARENT_ADDR_MERGE_OP,
956                        fetch_add_balance_and_received,
957                    );
958                }
959
960                rocksdb::ColumnFamilyDescriptor::new(cf_name, cf_options.clone())
961            })
962    }
963
964    /// Opens or creates the database at a path based on the kind, major version and network,
965    /// with the supplied column families, preserving any existing column families,
966    /// and returns a shared low-level database wrapper.
967    ///
968    /// # Panics
969    ///
970    /// - If the cache directory does not exist and can't be created.
971    /// - If the database cannot be opened for whatever reason.
972    pub fn new(
973        config: &Config,
974        db_kind: impl AsRef<str>,
975        format_version_in_code: &Version,
976        network: &Network,
977        column_families_in_code: impl IntoIterator<Item = String>,
978        read_only: bool,
979    ) -> DiskDb {
980        // If the database is ephemeral, we don't need to check the cache directory.
981        if !config.ephemeral {
982            DiskDb::validate_cache_dir(&config.cache_dir);
983        }
984
985        let db_kind = db_kind.as_ref();
986        let path = config.db_path(db_kind, format_version_in_code.major, network);
987
988        let db_options = DiskDb::options();
989
990        let column_families =
991            DiskDb::construct_column_families(db_options.clone(), &path, column_families_in_code);
992
993        let db_result = if read_only {
994            // Use a tempfile for the secondary instance cache directory
995            let secondary_config = Config {
996                ephemeral: true,
997                ..config.clone()
998            };
999            let secondary_path =
1000                secondary_config.db_path("secondary_state", format_version_in_code.major, network);
1001            let create_dir_result = std::fs::create_dir_all(&secondary_path);
1002
1003            info!(?create_dir_result, "creating secondary db directory");
1004
1005            DB::open_cf_descriptors_as_secondary(
1006                &db_options,
1007                &path,
1008                &secondary_path,
1009                column_families,
1010            )
1011        } else {
1012            DB::open_cf_descriptors(&db_options, &path, column_families)
1013        };
1014
1015        match db_result {
1016            Ok(db) => {
1017                info!("Opened Zebra state cache at {}", path.display());
1018
1019                let db = DiskDb {
1020                    db_kind: db_kind.to_string(),
1021                    format_version_in_code: format_version_in_code.clone(),
1022                    network: network.clone(),
1023                    ephemeral: config.ephemeral,
1024                    db: Arc::new(db),
1025                    finished_format_upgrades: Arc::new(AtomicBool::new(false)),
1026                };
1027
1028                db.assert_default_cf_is_empty();
1029
1030                db
1031            }
1032
1033            Err(e) if matches!(e.kind(), ErrorKind::Busy | ErrorKind::IOError) => panic!(
1034                "Database likely already open {path:?} \
1035                         Hint: Check if another zebrad process is running."
1036            ),
1037
1038            Err(e) => panic!(
1039                "Opening database {path:?} failed. \
1040                        Hint: Try changing the state cache_dir in the Zebra config. \
1041                        Error: {e}",
1042            ),
1043        }
1044    }
1045
1046    // Accessor methods
1047
1048    /// Returns the configured database kind for this database.
1049    pub fn db_kind(&self) -> String {
1050        self.db_kind.clone()
1051    }
1052
1053    /// Returns the format version of the running code that created this `DiskDb` instance in memory.
1054    pub fn format_version_in_code(&self) -> Version {
1055        self.format_version_in_code.clone()
1056    }
1057
1058    /// Returns the fixed major version for this database.
1059    pub fn major_version(&self) -> u64 {
1060        self.format_version_in_code().major
1061    }
1062
1063    /// Returns the configured network for this database.
1064    pub fn network(&self) -> Network {
1065        self.network.clone()
1066    }
1067
1068    /// Returns the `Path` where the files used by this database are located.
1069    pub fn path(&self) -> &Path {
1070        self.db.path()
1071    }
1072
1073    /// Returns the low-level rocksdb inner database.
1074    #[allow(dead_code)]
1075    fn inner(&self) -> &Arc<DB> {
1076        &self.db
1077    }
1078
1079    /// Returns the column family handle for `cf_name`.
1080    pub fn cf_handle(&self, cf_name: &str) -> Option<rocksdb::ColumnFamilyRef<'_>> {
1081        // Note: the lifetime returned by this method is subtly wrong. As of December 2023 it is
1082        // the shorter of &self and &str, but RocksDB clones column family names internally, so it
1083        // should just be &self. To avoid this restriction, clone the string before passing it to
1084        // this method. Currently Zebra uses static strings, so this doesn't matter.
1085        self.db.cf_handle(cf_name)
1086    }
1087
1088    // Read methods are located in the ReadDisk trait
1089
1090    // Write methods
1091    // Low-level write methods are located in the WriteDisk trait
1092
1093    /// Writes `batch` to the database.
1094    pub(crate) fn write(&self, batch: DiskWriteBatch) -> Result<(), rocksdb::Error> {
1095        self.db.write(batch.batch)
1096    }
1097
1098    // Private methods
1099
1100    /// Tries to reuse an existing db after a major upgrade.
1101    ///
1102    /// If the current db version belongs to `restorable_db_versions`, the function moves a previous
1103    /// db to a new path so it can be used again. It does so by merely trying to rename the path
1104    /// corresponding to the db version directly preceding the current version to the path that is
1105    /// used by the current db. If successful, it also deletes the db version file.
1106    ///
1107    /// Returns the old disk version if one existed and the db directory was renamed, or None otherwise.
1108    // TODO: Update this function to rename older major db format version to the current version (#9565).
1109    #[allow(clippy::unwrap_in_result)]
1110    pub(crate) fn try_reusing_previous_db_after_major_upgrade(
1111        restorable_db_versions: &[u64],
1112        format_version_in_code: &Version,
1113        config: &Config,
1114        db_kind: impl AsRef<str>,
1115        network: &Network,
1116    ) -> Option<Version> {
1117        if let Some(&major_db_ver) = restorable_db_versions
1118            .iter()
1119            .find(|v| **v == format_version_in_code.major)
1120        {
1121            let db_kind = db_kind.as_ref();
1122
1123            let old_major_db_ver = major_db_ver - 1;
1124            let old_path = config.db_path(db_kind, old_major_db_ver, network);
1125            // Exit early if the path doesn't exist or there's an error checking it.
1126            if !fs::exists(&old_path).unwrap_or(false) {
1127                return None;
1128            }
1129
1130            let new_path = config.db_path(db_kind, major_db_ver, network);
1131
1132            let old_path = match fs::canonicalize(&old_path) {
1133                Ok(canonicalized_old_path) => canonicalized_old_path,
1134                Err(e) => {
1135                    warn!("could not canonicalize {old_path:?}: {e}");
1136                    return None;
1137                }
1138            };
1139
1140            let cache_path = match fs::canonicalize(&config.cache_dir) {
1141                Ok(canonicalized_cache_path) => canonicalized_cache_path,
1142                Err(e) => {
1143                    warn!("could not canonicalize {:?}: {e}", config.cache_dir);
1144                    return None;
1145                }
1146            };
1147
1148            // # Correctness
1149            //
1150            // Check that the path we're about to move is inside the cache directory.
1151            //
1152            // If the user has symlinked the state directory to a non-cache directory, we don't want
1153            // to move it, because it might contain other files.
1154            //
1155            // We don't attempt to guard against malicious symlinks created by attackers
1156            // (TOCTOU attacks). Zebra should not be run with elevated privileges.
1157            if !old_path.starts_with(&cache_path) {
1158                info!("skipped reusing previous state cache: state is outside cache directory");
1159                return None;
1160            }
1161
1162            let opts = DiskDb::options();
1163            let old_db_exists = DB::list_cf(&opts, &old_path).is_ok_and(|cf| !cf.is_empty());
1164            let new_db_exists = DB::list_cf(&opts, &new_path).is_ok_and(|cf| !cf.is_empty());
1165
1166            if old_db_exists && !new_db_exists {
1167                // Create the parent directory for the new db. This is because we can't directly
1168                // rename e.g. `state/v25/mainnet/` to `state/v26/mainnet/` with `fs::rename()` if
1169                // `state/v26/` does not exist.
1170                match fs::create_dir_all(
1171                    new_path
1172                        .parent()
1173                        .expect("new state cache must have a parent path"),
1174                ) {
1175                    Ok(()) => info!("created new directory for state cache at {new_path:?}"),
1176                    Err(e) => {
1177                        warn!(
1178                            "could not create new directory for state cache at {new_path:?}: {e}"
1179                        );
1180                        return None;
1181                    }
1182                };
1183
1184                match fs::rename(&old_path, &new_path) {
1185                    Ok(()) => {
1186                        info!("moved state cache from {old_path:?} to {new_path:?}");
1187
1188                        let mut disk_version =
1189                            database_format_version_on_disk(config, db_kind, major_db_ver, network)
1190                                .expect("unable to read database format version file")
1191                                .expect("unable to parse database format version");
1192
1193                        disk_version.major = old_major_db_ver;
1194
1195                        write_database_format_version_to_disk(
1196                            config,
1197                            db_kind,
1198                            major_db_ver,
1199                            &disk_version,
1200                            network,
1201                        )
1202                        .expect("unable to write database format version file to disk");
1203
1204                        // Get the parent of the old path, e.g. `state/v25/` and delete it if it is
1205                        // empty.
1206                        let old_path = old_path
1207                            .parent()
1208                            .expect("old state cache must have parent path");
1209
1210                        if fs::read_dir(old_path)
1211                            .expect("cached state dir needs to be readable")
1212                            .next()
1213                            .is_none()
1214                        {
1215                            match fs::remove_dir_all(old_path) {
1216                                Ok(()) => {
1217                                    info!("removed empty old state cache directory at {old_path:?}")
1218                                }
1219                                Err(e) => {
1220                                    warn!(
1221                                        "could not remove empty old state cache directory \
1222                                           at {old_path:?}: {e}"
1223                                    )
1224                                }
1225                            }
1226                        }
1227
1228                        return Some(disk_version);
1229                    }
1230                    Err(e) => {
1231                        warn!("could not move state cache from {old_path:?} to {new_path:?}: {e}");
1232                    }
1233                };
1234            }
1235        };
1236
1237        None
1238    }
1239
1240    /// Returns the database options for the finalized state database.
1241    fn options() -> rocksdb::Options {
1242        let mut opts = rocksdb::Options::default();
1243        let mut block_based_opts = rocksdb::BlockBasedOptions::default();
1244
1245        const ONE_MEGABYTE: usize = 1024 * 1024;
1246
1247        opts.create_if_missing(true);
1248        opts.create_missing_column_families(true);
1249
1250        // Use the recommended Ribbon filter setting for all column families.
1251        //
1252        // Ribbon filters are faster than Bloom filters in Zebra, as of April 2022.
1253        // (They aren't needed for single-valued column families, but they don't hurt either.)
1254        block_based_opts.set_ribbon_filter(9.9);
1255
1256        // Use the recommended LZ4 compression type.
1257        //
1258        // https://github.com/facebook/rocksdb/wiki/Compression#configuration
1259        opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
1260
1261        // Tune level-style database file compaction.
1262        //
1263        // This improves Zebra's initial sync speed slightly, as of April 2022.
1264        opts.optimize_level_style_compaction(Self::MEMTABLE_RAM_CACHE_MEGABYTES * ONE_MEGABYTE);
1265
1266        // Increase the process open file limit if needed,
1267        // then use it to set RocksDB's limit.
1268        let open_file_limit = DiskDb::increase_open_file_limit();
1269        let db_file_limit = DiskDb::get_db_open_file_limit(open_file_limit);
1270
1271        // If the current limit is very large, set the DB limit using the ideal limit
1272        let ideal_limit = DiskDb::get_db_open_file_limit(DiskDb::IDEAL_OPEN_FILE_LIMIT)
1273            .try_into()
1274            .expect("ideal open file limit fits in a c_int");
1275        let db_file_limit = db_file_limit.try_into().unwrap_or(ideal_limit);
1276
1277        opts.set_max_open_files(db_file_limit);
1278
1279        // Set the block-based options
1280        opts.set_block_based_table_factory(&block_based_opts);
1281
1282        opts
1283    }
1284
1285    /// Calculate the database's share of `open_file_limit`
1286    fn get_db_open_file_limit(open_file_limit: u64) -> u64 {
1287        // Give the DB half the files, and reserve half the files for peers
1288        (open_file_limit - DiskDb::RESERVED_FILE_COUNT) / 2
1289    }
1290
1291    /// Increase the open file limit for this process to `IDEAL_OPEN_FILE_LIMIT`.
1292    /// If that fails, try `MIN_OPEN_FILE_LIMIT`.
1293    ///
1294    /// If the current limit is above `IDEAL_OPEN_FILE_LIMIT`, leaves it
1295    /// unchanged.
1296    ///
1297    /// Returns the current limit, after any successful increases.
1298    ///
1299    /// # Panics
1300    ///
1301    /// If the open file limit can not be increased to `MIN_OPEN_FILE_LIMIT`.
1302    fn increase_open_file_limit() -> u64 {
1303        // Zebra mainly uses TCP sockets (`zebra-network`) and low-level files
1304        // (`zebra-state` database).
1305        //
1306        // On Unix-based platforms, `increase_nofile_limit` changes the limit for
1307        // both database files and TCP connections.
1308        //
1309        // But it doesn't do anything on Windows in rlimit 0.7.0.
1310        //
1311        // On Windows, the default limits are:
1312        // - 512 high-level stream I/O files (via the C standard functions),
1313        // - 8192 low-level I/O files (via the Unix C functions), and
1314        // - 1000 TCP Control Block entries (network connections).
1315        //
1316        // https://docs.microsoft.com/en-us/cpp/c-runtime-library/reference/setmaxstdio?view=msvc-160#remarks
1317        // http://smallvoid.com/article/winnt-tcpip-max-limit.html
1318        //
1319        // `zebra-state`'s `IDEAL_OPEN_FILE_LIMIT` is much less than
1320        // the Windows low-level I/O file limit.
1321        //
1322        // The [`setmaxstdio` and `getmaxstdio`](https://docs.rs/rlimit/latest/rlimit/#windows)
1323        // functions from the `rlimit` crate only change the high-level I/O file limit.
1324        //
1325        // `zebra-network`'s default connection limit is much less than
1326        // the TCP Control Block limit on Windows.
1327
1328        // We try setting the ideal limit, then the minimum limit.
1329        let current_limit = match increase_nofile_limit(DiskDb::IDEAL_OPEN_FILE_LIMIT) {
1330            Ok(current_limit) => current_limit,
1331            Err(limit_error) => {
1332                // These errors can happen due to sandboxing or unsupported system calls,
1333                // even if the file limit is high enough.
1334                info!(
1335                    ?limit_error,
1336                    min_limit = ?DiskDb::MIN_OPEN_FILE_LIMIT,
1337                    ideal_limit = ?DiskDb::IDEAL_OPEN_FILE_LIMIT,
1338                    "unable to increase the open file limit, \
1339                     assuming Zebra can open a minimum number of files"
1340                );
1341
1342                return DiskDb::MIN_OPEN_FILE_LIMIT;
1343            }
1344        };
1345
1346        if current_limit < DiskDb::MIN_OPEN_FILE_LIMIT {
1347            panic!(
1348                "open file limit too low: \
1349                 unable to set the number of open files to {}, \
1350                 the minimum number of files required by Zebra. \
1351                 Current limit is {:?}. \
1352                 Hint: Increase the open file limit to {} before launching Zebra",
1353                DiskDb::MIN_OPEN_FILE_LIMIT,
1354                current_limit,
1355                DiskDb::IDEAL_OPEN_FILE_LIMIT
1356            );
1357        } else if current_limit < DiskDb::IDEAL_OPEN_FILE_LIMIT {
1358            warn!(
1359                ?current_limit,
1360                min_limit = ?DiskDb::MIN_OPEN_FILE_LIMIT,
1361                ideal_limit = ?DiskDb::IDEAL_OPEN_FILE_LIMIT,
1362                "the maximum number of open files is below Zebra's ideal limit. \
1363                 Hint: Increase the open file limit to {} before launching Zebra",
1364                DiskDb::IDEAL_OPEN_FILE_LIMIT
1365            );
1366        } else if cfg!(windows) {
1367            // This log is verbose during tests.
1368            #[cfg(not(test))]
1369            info!(
1370                min_limit = ?DiskDb::MIN_OPEN_FILE_LIMIT,
1371                ideal_limit = ?DiskDb::IDEAL_OPEN_FILE_LIMIT,
1372                "assuming the open file limit is high enough for Zebra",
1373            );
1374            #[cfg(test)]
1375            debug!(
1376                min_limit = ?DiskDb::MIN_OPEN_FILE_LIMIT,
1377                ideal_limit = ?DiskDb::IDEAL_OPEN_FILE_LIMIT,
1378                "assuming the open file limit is high enough for Zebra",
1379            );
1380        } else {
1381            #[cfg(not(test))]
1382            debug!(
1383                ?current_limit,
1384                min_limit = ?DiskDb::MIN_OPEN_FILE_LIMIT,
1385                ideal_limit = ?DiskDb::IDEAL_OPEN_FILE_LIMIT,
1386                "the open file limit is high enough for Zebra",
1387            );
1388            #[cfg(test)]
1389            debug!(
1390                ?current_limit,
1391                min_limit = ?DiskDb::MIN_OPEN_FILE_LIMIT,
1392                ideal_limit = ?DiskDb::IDEAL_OPEN_FILE_LIMIT,
1393                "the open file limit is high enough for Zebra",
1394            );
1395        }
1396
1397        current_limit
1398    }
1399
1400    // Cleanup methods
1401
1402    /// Returns the number of shared instances of this database.
1403    ///
1404    /// # Concurrency
1405    ///
1406    /// The actual number of owners can be higher or lower than the returned value,
1407    /// because databases can simultaneously be cloned or dropped in other threads.
1408    ///
1409    /// However, if the number of owners is 1, and the caller has exclusive access,
1410    /// the count can't increase unless that caller clones the database.
1411    pub(crate) fn shared_database_owners(&self) -> usize {
1412        Arc::strong_count(&self.db) + Arc::weak_count(&self.db)
1413    }
1414
1415    /// Shut down the database, cleaning up background tasks and ephemeral data.
1416    ///
1417    /// If `force` is true, clean up regardless of any shared references.
1418    /// `force` can cause errors accessing the database from other shared references.
1419    /// It should only be used in debugging or test code, immediately before a manual shutdown.
1420    ///
1421    /// TODO: make private after the stop height check has moved to the syncer (#3442)
1422    ///       move shutting down the database to a blocking thread (#2188)
1423    pub(crate) fn shutdown(&mut self, force: bool) {
1424        // # Correctness
1425        //
1426        // If we're the only owner of the shared database instance,
1427        // then there are no other threads that can increase the strong or weak count.
1428        //
1429        // ## Implementation Requirements
1430        //
1431        // This function and all functions that it calls should avoid cloning the shared database
1432        // instance. If they do, they must drop it before:
1433        // - shutting down database threads, or
1434        // - deleting database files.
1435
1436        if self.shared_database_owners() > 1 {
1437            let path = self.path();
1438
1439            let mut ephemeral_note = "";
1440
1441            if force {
1442                if self.ephemeral {
1443                    ephemeral_note = " and removing ephemeral files";
1444                }
1445
1446                // This log is verbose during tests.
1447                #[cfg(not(test))]
1448                info!(
1449                    ?path,
1450                    "forcing shutdown{} of a state database with multiple active instances",
1451                    ephemeral_note,
1452                );
1453                #[cfg(test)]
1454                debug!(
1455                    ?path,
1456                    "forcing shutdown{} of a state database with multiple active instances",
1457                    ephemeral_note,
1458                );
1459            } else {
1460                if self.ephemeral {
1461                    ephemeral_note = " and files";
1462                }
1463
1464                debug!(
1465                    ?path,
1466                    "dropping DiskDb clone, \
1467                     but keeping shared database instance{} until the last reference is dropped",
1468                    ephemeral_note,
1469                );
1470                return;
1471            }
1472        }
1473
1474        self.assert_default_cf_is_empty();
1475
1476        // Drop isn't guaranteed to run, such as when we panic, or if the tokio shutdown times out.
1477        //
1478        // Zebra's data should be fine if we don't clean up, because:
1479        // - the database flushes regularly anyway
1480        // - Zebra commits each block in a database transaction, any incomplete blocks get rolled back
1481        // - ephemeral files are placed in the os temp dir and should be cleaned up automatically eventually
1482        let path = self.path();
1483        debug!(?path, "flushing database to disk");
1484
1485        // These flushes can fail during forced shutdown or during Drop after a shutdown,
1486        // particularly in tests. If they fail, there's nothing we can do about it anyway.
1487        if let Err(error) = self.db.flush() {
1488            let error = format!("{error:?}");
1489            if error.to_ascii_lowercase().contains("shutdown in progress") {
1490                debug!(
1491                    ?error,
1492                    ?path,
1493                    "expected shutdown error flushing database SST files to disk"
1494                );
1495            } else {
1496                info!(
1497                    ?error,
1498                    ?path,
1499                    "unexpected error flushing database SST files to disk during shutdown"
1500                );
1501            }
1502        }
1503
1504        if let Err(error) = self.db.flush_wal(true) {
1505            let error = format!("{error:?}");
1506            if error.to_ascii_lowercase().contains("shutdown in progress") {
1507                debug!(
1508                    ?error,
1509                    ?path,
1510                    "expected shutdown error flushing database WAL buffer to disk"
1511                );
1512            } else {
1513                info!(
1514                    ?error,
1515                    ?path,
1516                    "unexpected error flushing database WAL buffer to disk during shutdown"
1517                );
1518            }
1519        }
1520
1521        // # Memory Safety
1522        //
1523        // We'd like to call `cancel_all_background_work()` before Zebra exits,
1524        // but when we call it, we get memory, thread, or C++ errors when the process exits.
1525        // (This seems to be a bug in RocksDB: cancel_all_background_work() should wait until
1526        // all the threads have cleaned up.)
1527        //
1528        // # Change History
1529        //
1530        // We've changed this setting multiple times since 2021, in response to new RocksDB
1531        // and Rust compiler behaviour.
1532        //
1533        // We enabled cancel_all_background_work() due to failures on:
1534        // - Rust 1.57 on Linux
1535        //
1536        // We disabled cancel_all_background_work() due to failures on:
1537        // - Rust 1.64 on Linux
1538        //
1539        // We tried enabling cancel_all_background_work() due to failures on:
1540        // - Rust 1.70 on macOS 12.6.5 on x86_64
1541        // but it didn't stop the aborts happening (PR #6820).
1542        //
1543        // There weren't any failures with cancel_all_background_work() disabled on:
1544        // - Rust 1.69 or earlier
1545        // - Linux with Rust 1.70
1546        // And with cancel_all_background_work() enabled or disabled on:
1547        // - macOS 13.2 on aarch64 (M1), native and emulated x86_64, with Rust 1.70
1548        //
1549        // # Detailed Description
1550        //
1551        // We see these kinds of errors:
1552        // ```
1553        // pthread lock: Invalid argument
1554        // pure virtual method called
1555        // terminate called without an active exception
1556        // pthread destroy mutex: Device or resource busy
1557        // Aborted (core dumped)
1558        // signal: 6, SIGABRT: process abort signal
1559        // signal: 11, SIGSEGV: invalid memory reference
1560        // ```
1561        //
1562        // # Reference
1563        //
1564        // The RocksDB wiki says:
1565        // > Q: Is it safe to close RocksDB while another thread is issuing read, write or manual compaction requests?
1566        // >
1567        // > A: No. The users of RocksDB need to make sure all functions have finished before they close RocksDB.
1568        // > You can speed up the waiting by calling CancelAllBackgroundWork().
1569        //
1570        // <https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ>
1571        //
1572        // > rocksdb::DB instances need to be destroyed before your main function exits.
1573        // > RocksDB instances usually depend on some internal static variables.
1574        // > Users need to make sure rocksdb::DB instances are destroyed before those static variables.
1575        //
1576        // <https://github.com/facebook/rocksdb/wiki/Known-Issues>
1577        //
1578        // # TODO
1579        //
1580        // Try re-enabling this code and fixing the underlying concurrency bug.
1581        //
1582        //info!(?path, "stopping background database tasks");
1583        //self.db.cancel_all_background_work(true);
1584
1585        // We'd like to drop the database before deleting its files,
1586        // because that closes the column families and the database correctly.
1587        // But Rust's ownership rules make that difficult,
1588        // so we just flush and delete ephemeral data instead.
1589        //
1590        // This implementation doesn't seem to cause any issues,
1591        // and the RocksDB Drop implementation handles any cleanup.
1592        self.delete_ephemeral();
1593    }
1594
1595    /// If the database is `ephemeral`, delete its files.
1596    fn delete_ephemeral(&mut self) {
1597        // # Correctness
1598        //
1599        // This function and all functions that it calls should avoid cloning the shared database
1600        // instance. See `shutdown()` for details.
1601
1602        if !self.ephemeral {
1603            return;
1604        }
1605
1606        let path = self.path();
1607
1608        // This log is verbose during tests.
1609        #[cfg(not(test))]
1610        info!(?path, "removing temporary database files");
1611        #[cfg(test)]
1612        debug!(?path, "removing temporary database files");
1613
1614        // We'd like to use `rocksdb::Env::mem_env` for ephemeral databases,
1615        // but the Zcash blockchain might not fit in memory. So we just
1616        // delete the database files instead.
1617        //
1618        // We'd also like to call `DB::destroy` here, but calling destroy on a
1619        // live DB is undefined behaviour:
1620        // https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ#basic-readwrite
1621        //
1622        // So we assume that all the database files are under `path`, and
1623        // delete them using standard filesystem APIs. Deleting open files
1624        // might cause errors on non-Unix platforms, so we ignore the result.
1625        // (The OS will delete them eventually anyway, if they are in a temporary directory.)
1626        let result = std::fs::remove_dir_all(path);
1627
1628        if result.is_err() {
1629            // This log is verbose during tests.
1630            #[cfg(not(test))]
1631            info!(
1632                ?result,
1633                ?path,
1634                "removing temporary database files caused an error",
1635            );
1636            #[cfg(test)]
1637            debug!(
1638                ?result,
1639                ?path,
1640                "removing temporary database files caused an error",
1641            );
1642        } else {
1643            debug!(
1644                ?result,
1645                ?path,
1646                "successfully removed temporary database files",
1647            );
1648        }
1649    }
1650
1651    /// Check that the "default" column family is empty.
1652    ///
1653    /// # Panics
1654    ///
1655    /// If Zebra has a bug where it is storing data in the wrong column family.
1656    fn assert_default_cf_is_empty(&self) {
1657        // # Correctness
1658        //
1659        // This function and all functions that it calls should avoid cloning the shared database
1660        // instance. See `shutdown()` for details.
1661
1662        if let Some(default_cf) = self.cf_handle("default") {
1663            assert!(
1664                self.zs_is_empty(&default_cf),
1665                "Zebra should not store data in the 'default' column family"
1666            );
1667        }
1668    }
1669
1670    // Validates a cache directory and creates it if it doesn't exist.
1671    // If the directory cannot be created, it panics with a specific error message.
1672    fn validate_cache_dir(cache_dir: &std::path::PathBuf) {
1673        if let Err(e) = fs::create_dir_all(cache_dir) {
1674            match e.kind() {
1675                std::io::ErrorKind::PermissionDenied => panic!(
1676                    "Permission denied creating {cache_dir:?}. \
1677                     Hint: check if cache directory exist and has write permissions."
1678                ),
1679                std::io::ErrorKind::StorageFull => panic!(
1680                    "No space left on device creating {cache_dir:?}. \
1681                     Hint: check if the disk is full."
1682                ),
1683                _ => panic!("Could not create cache dir {cache_dir:?}: {e}"),
1684            }
1685        }
1686    }
1687}
1688
1689impl Drop for DiskDb {
1690    fn drop(&mut self) {
1691        let path = self.path();
1692        debug!(?path, "dropping DiskDb instance");
1693
1694        self.shutdown(false);
1695    }
1696}