zebra_state/service/finalized_state/disk_db.rs
//! Provides low-level access to RocksDB using some database-specific types.
//!
//! This module makes sure that:
//! - all disk writes happen inside a RocksDB transaction
//!   ([`rocksdb::WriteBatch`]), and
//! - format-specific invariants are maintained.
//!
//! # Correctness
//!
//! [`crate::constants::state_database_format_version_in_code()`] must be incremented
//! each time the database format (column families, serialization, etc.) changes.
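//!
//! # Example
//!
//! A minimal sketch of the write path, assuming `db` is a [`DiskDb`] and `cf`,
//! `key`, and `value` are placeholders (illustrative only, not a doctest):
//!
//! ```ignore
//! let mut batch = DiskWriteBatch::new();
//! batch.zs_insert(&cf, key, value); // stage updates in memory
//! db.write(batch)?; // commit them atomically
//! ```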

use std::{
    collections::{BTreeMap, HashMap},
    fmt::{Debug, Write},
    fs,
    ops::RangeBounds,
    path::Path,
    sync::{
        atomic::{self, AtomicBool},
        Arc,
    },
};

use itertools::Itertools;
use rlimit::increase_nofile_limit;

use rocksdb::{ColumnFamilyDescriptor, ErrorKind, Options, ReadOptions};
use semver::Version;
use zebra_chain::{parameters::Network, primitives::byte_array::increment_big_endian};

use crate::{
    database_format_version_on_disk,
    service::finalized_state::disk_format::{FromDisk, IntoDisk},
    write_database_format_version_to_disk, Config,
};

use super::zebra_db::transparent::{
    fetch_add_balance_and_received, BALANCE_BY_TRANSPARENT_ADDR,
    BALANCE_BY_TRANSPARENT_ADDR_MERGE_OP,
};
// Doc-only imports
#[allow(unused_imports)]
use super::{TypedColumnFamily, WriteTypedBatch};

#[cfg(any(test, feature = "proptest-impl"))]
mod tests;

/// The [`rocksdb::ThreadMode`] used by the database.
pub type DBThreadMode = rocksdb::SingleThreaded;

/// The [`rocksdb`] database type, including thread mode.
///
/// Also the [`rocksdb::DBAccess`] used by database iterators.
pub type DB = rocksdb::DBWithThreadMode<DBThreadMode>;

/// Wrapper struct to ensure low-level database access goes through the correct API.
///
/// `rocksdb` allows concurrent writes through a shared reference,
/// so database instances are cloneable. When the final clone is dropped,
/// the database is closed.
///
/// # Correctness
///
/// Reading transactions from the database using RocksDB iterators causes hangs.
/// But creating iterators and reading the tip height work fine.
///
/// So these hangs are probably caused by holding column family locks to read:
/// - multiple values, or
/// - large values.
///
/// This bug might be fixed by moving database operations to blocking threads (#2188),
/// so that they don't block the tokio executor.
/// (Or it might be fixed by future RocksDB upgrades.)
#[derive(Clone, Debug)]
pub struct DiskDb {
    // Configuration
    //
    // This configuration cannot be modified after the database is initialized,
    // because some clones would have different values.
    //
    /// The configured database kind for this database.
    db_kind: String,

    /// The format version of the running Zebra code.
    format_version_in_code: Version,

    /// The configured network for this database.
    network: Network,

    /// The configured temporary database setting.
    ///
    /// If true, the database files are deleted on drop.
    ephemeral: bool,

    /// A boolean flag indicating whether the db format change task has finished
    /// applying any format changes that may have been required.
    finished_format_upgrades: Arc<AtomicBool>,

    // Owned State
    //
    // Everything contained in this state must be shared by all clones, or read-only.
    //
    /// The shared inner RocksDB database.
    ///
    /// RocksDB allows reads and writes via a shared reference.
    ///
    /// In [`SingleThreaded`](rocksdb::SingleThreaded) mode,
    /// column family changes and [`Drop`] require exclusive access.
    ///
    /// In [`MultiThreaded`](rocksdb::MultiThreaded) mode,
    /// only [`Drop`] requires exclusive access.
    db: Arc<DB>,
}

/// Wrapper struct to ensure low-level database writes go through the correct API.
///
/// [`rocksdb::WriteBatch`] is a batched set of database updates,
/// which must be written to the database using `DiskDb::write(batch)`.
#[must_use = "batches must be written to the database"]
#[derive(Default)]
pub struct DiskWriteBatch {
    /// The inner RocksDB write batch.
    batch: rocksdb::WriteBatch,
}

impl Debug for DiskWriteBatch {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DiskWriteBatch")
            .field("batch", &format!("{} bytes", self.batch.size_in_bytes()))
            .finish()
    }
}

impl PartialEq for DiskWriteBatch {
    fn eq(&self, other: &Self) -> bool {
        self.batch.data() == other.batch.data()
    }
}

impl Eq for DiskWriteBatch {}

/// Helper trait for inserting serialized typed (Key, Value) pairs into rocksdb.
///
/// # Deprecation
///
/// This trait should not be used in new code, use [`WriteTypedBatch`] instead.
//
// TODO: replace uses of this trait with WriteTypedBatch,
// implement these methods directly on WriteTypedBatch, and delete the trait.
pub trait WriteDisk {
    /// Serialize and insert the given key and value into a rocksdb column family,
    /// overwriting any existing `value` for `key`.
    fn zs_insert<C, K, V>(&mut self, cf: &C, key: K, value: V)
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + Debug,
        V: IntoDisk;

    /// Serialize and merge the given key and value into a rocksdb column family,
    /// merging with any existing `value` for `key`.
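    ///
    /// The merge semantics depend on the merge operator configured for the column
    /// family. In this module, only [`BALANCE_BY_TRANSPARENT_ADDR`] has one:
    /// `fetch_add_balance_and_received`, set up in `construct_column_families()`.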
    fn zs_merge<C, K, V>(&mut self, cf: &C, key: K, value: V)
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + Debug,
        V: IntoDisk;

    /// Remove the given key from a rocksdb column family, if it exists.
    fn zs_delete<C, K>(&mut self, cf: &C, key: K)
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + Debug;

    /// Delete the given key range from a rocksdb column family, if it exists, including `from`
    /// and excluding `until_strictly_before`.
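    ///
    /// For example (a sketch with integer-like keys): `zs_delete_range(&cf, 10, 20)`
    /// removes keys `10` through `19`, and leaves key `20` in place.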
    //
    // TODO: convert zs_delete_range() to take std::ops::RangeBounds
    // see zs_range_iter() for an example of the edge cases
    fn zs_delete_range<C, K>(&mut self, cf: &C, from: K, until_strictly_before: K)
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + Debug;
}

/// # Deprecation
///
/// These impls should not be used in new code, use [`WriteTypedBatch`] instead.
//
// TODO: replace uses of these impls with WriteTypedBatch,
// implement these methods directly on WriteTypedBatch, and delete the trait.
impl WriteDisk for DiskWriteBatch {
    fn zs_insert<C, K, V>(&mut self, cf: &C, key: K, value: V)
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + Debug,
        V: IntoDisk,
    {
        let key_bytes = key.as_bytes();
        let value_bytes = value.as_bytes();
        self.batch.put_cf(cf, key_bytes, value_bytes);
    }

    fn zs_merge<C, K, V>(&mut self, cf: &C, key: K, value: V)
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + Debug,
        V: IntoDisk,
    {
        let key_bytes = key.as_bytes();
        let value_bytes = value.as_bytes();
        self.batch.merge_cf(cf, key_bytes, value_bytes);
    }

    fn zs_delete<C, K>(&mut self, cf: &C, key: K)
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + Debug,
    {
        let key_bytes = key.as_bytes();
        self.batch.delete_cf(cf, key_bytes);
    }

    // TODO: convert zs_delete_range() to take std::ops::RangeBounds
    // see zs_range_iter() for an example of the edge cases
    fn zs_delete_range<C, K>(&mut self, cf: &C, from: K, until_strictly_before: K)
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + Debug,
    {
        let from_bytes = from.as_bytes();
        let until_strictly_before_bytes = until_strictly_before.as_bytes();
        self.batch
            .delete_range_cf(cf, from_bytes, until_strictly_before_bytes);
    }
}

// Allow &mut DiskWriteBatch as well as owned DiskWriteBatch
impl<T> WriteDisk for &mut T
where
    T: WriteDisk,
{
    fn zs_insert<C, K, V>(&mut self, cf: &C, key: K, value: V)
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + Debug,
        V: IntoDisk,
    {
        (*self).zs_insert(cf, key, value)
    }

    fn zs_merge<C, K, V>(&mut self, cf: &C, key: K, value: V)
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + Debug,
        V: IntoDisk,
    {
        (*self).zs_merge(cf, key, value)
    }

    fn zs_delete<C, K>(&mut self, cf: &C, key: K)
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + Debug,
    {
        (*self).zs_delete(cf, key)
    }

    fn zs_delete_range<C, K>(&mut self, cf: &C, from: K, until_strictly_before: K)
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + Debug,
    {
        (*self).zs_delete_range(cf, from, until_strictly_before)
    }
}

/// Helper trait for retrieving and deserializing values from rocksdb column families.
///
/// # Deprecation
///
/// This trait should not be used in new code, use [`TypedColumnFamily`] instead.
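///
/// # Example
///
/// A sketch of typical reads, assuming `db: DiskDb` and hypothetical column
/// family, key, and value names (illustrative only, not a doctest):
///
/// ```ignore
/// let tip: Option<(Height, block::Hash)> = db.zs_last_key_value(&hash_by_height_cf);
/// let exists: bool = db.zs_contains(&hash_by_height_cf, &height);
/// ```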
282//
283// TODO: replace uses of this trait with TypedColumnFamily,
284// implement these methods directly on DiskDb, and delete the trait.
285pub trait ReadDisk {
286 /// Returns true if a rocksdb column family `cf` does not contain any entries.
287 fn zs_is_empty<C>(&self, cf: &C) -> bool
288 where
289 C: rocksdb::AsColumnFamilyRef;
290
291 /// Returns the value for `key` in the rocksdb column family `cf`, if present.
292 fn zs_get<C, K, V>(&self, cf: &C, key: &K) -> Option<V>
293 where
294 C: rocksdb::AsColumnFamilyRef,
295 K: IntoDisk,
296 V: FromDisk;
297
298 /// Check if a rocksdb column family `cf` contains the serialized form of `key`.
299 fn zs_contains<C, K>(&self, cf: &C, key: &K) -> bool
300 where
301 C: rocksdb::AsColumnFamilyRef,
302 K: IntoDisk;
303
304 /// Returns the lowest key in `cf`, and the corresponding value.
305 ///
306 /// Returns `None` if the column family is empty.
307 fn zs_first_key_value<C, K, V>(&self, cf: &C) -> Option<(K, V)>
308 where
309 C: rocksdb::AsColumnFamilyRef,
310 K: IntoDisk + FromDisk,
311 V: FromDisk;
312
313 /// Returns the highest key in `cf`, and the corresponding value.
314 ///
315 /// Returns `None` if the column family is empty.
316 fn zs_last_key_value<C, K, V>(&self, cf: &C) -> Option<(K, V)>
317 where
318 C: rocksdb::AsColumnFamilyRef,
319 K: IntoDisk + FromDisk,
320 V: FromDisk;
321
322 /// Returns the first key greater than or equal to `lower_bound` in `cf`,
323 /// and the corresponding value.
324 ///
325 /// Returns `None` if there are no keys greater than or equal to `lower_bound`.
326 fn zs_next_key_value_from<C, K, V>(&self, cf: &C, lower_bound: &K) -> Option<(K, V)>
327 where
328 C: rocksdb::AsColumnFamilyRef,
329 K: IntoDisk + FromDisk,
330 V: FromDisk;
331
332 /// Returns the first key strictly greater than `lower_bound` in `cf`,
333 /// and the corresponding value.
334 ///
335 /// Returns `None` if there are no keys greater than `lower_bound`.
336 fn zs_next_key_value_strictly_after<C, K, V>(&self, cf: &C, lower_bound: &K) -> Option<(K, V)>
337 where
338 C: rocksdb::AsColumnFamilyRef,
339 K: IntoDisk + FromDisk,
340 V: FromDisk;
341
342 /// Returns the first key less than or equal to `upper_bound` in `cf`,
343 /// and the corresponding value.
344 ///
345 /// Returns `None` if there are no keys less than or equal to `upper_bound`.
346 fn zs_prev_key_value_back_from<C, K, V>(&self, cf: &C, upper_bound: &K) -> Option<(K, V)>
347 where
348 C: rocksdb::AsColumnFamilyRef,
349 K: IntoDisk + FromDisk,
350 V: FromDisk;
351
352 /// Returns the first key strictly less than `upper_bound` in `cf`,
353 /// and the corresponding value.
354 ///
355 /// Returns `None` if there are no keys less than `upper_bound`.
356 fn zs_prev_key_value_strictly_before<C, K, V>(&self, cf: &C, upper_bound: &K) -> Option<(K, V)>
357 where
358 C: rocksdb::AsColumnFamilyRef,
359 K: IntoDisk + FromDisk,
360 V: FromDisk;
361
362 /// Returns the keys and values in `cf` in `range`, in an ordered `BTreeMap`.
363 ///
364 /// Holding this iterator open might delay block commit transactions.
365 fn zs_items_in_range_ordered<C, K, V, R>(&self, cf: &C, range: R) -> BTreeMap<K, V>
366 where
367 C: rocksdb::AsColumnFamilyRef,
368 K: IntoDisk + FromDisk + Ord,
369 V: FromDisk,
370 R: RangeBounds<K>;
371
372 /// Returns the keys and values in `cf` in `range`, in an unordered `HashMap`.
373 ///
374 /// Holding this iterator open might delay block commit transactions.
375 fn zs_items_in_range_unordered<C, K, V, R>(&self, cf: &C, range: R) -> HashMap<K, V>
376 where
377 C: rocksdb::AsColumnFamilyRef,
378 K: IntoDisk + FromDisk + Eq + std::hash::Hash,
379 V: FromDisk,
380 R: RangeBounds<K>;
381}

impl PartialEq for DiskDb {
    fn eq(&self, other: &Self) -> bool {
        if self.db.path() == other.db.path() {
            assert_eq!(
                self.network, other.network,
                "database with same path but different network configs",
            );
            assert_eq!(
                self.ephemeral, other.ephemeral,
                "database with same path but different ephemeral configs",
            );

            return true;
        }

        false
    }
}

impl Eq for DiskDb {}

/// # Deprecation
///
/// These impls should not be used in new code, use [`TypedColumnFamily`] instead.
//
// TODO: replace uses of these impls with TypedColumnFamily,
// implement these methods directly on DiskDb, and delete the trait.
impl ReadDisk for DiskDb {
    fn zs_is_empty<C>(&self, cf: &C) -> bool
    where
        C: rocksdb::AsColumnFamilyRef,
    {
        // Empty column families return invalid forward iterators.
        //
        // Checking iterator validity does not seem to cause database hangs.
        let iterator = self.db.iterator_cf(cf, rocksdb::IteratorMode::Start);
        let raw_iterator: rocksdb::DBRawIteratorWithThreadMode<DB> = iterator.into();

        !raw_iterator.valid()
    }

    #[allow(clippy::unwrap_in_result)]
    fn zs_get<C, K, V>(&self, cf: &C, key: &K) -> Option<V>
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk,
        V: FromDisk,
    {
        let key_bytes = key.as_bytes();

        // We use `get_pinned_cf` to avoid taking ownership of the serialized
        // value, because we're going to deserialize it anyway. This avoids an
        // extra copy.
        let value_bytes = self
            .db
            .get_pinned_cf(cf, key_bytes)
            .expect("unexpected database failure");

        value_bytes.map(V::from_bytes)
    }

    fn zs_contains<C, K>(&self, cf: &C, key: &K) -> bool
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk,
    {
        let key_bytes = key.as_bytes();

        // We use `get_pinned_cf` to avoid taking ownership of the serialized
        // value, because we don't use the value at all. This avoids an extra copy.
        self.db
            .get_pinned_cf(cf, key_bytes)
            .expect("unexpected database failure")
            .is_some()
    }

    fn zs_first_key_value<C, K, V>(&self, cf: &C) -> Option<(K, V)>
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + FromDisk,
        V: FromDisk,
    {
        // Reading individual values from iterators does not seem to cause database hangs.
        self.zs_forward_range_iter(cf, ..).next()
    }

    fn zs_last_key_value<C, K, V>(&self, cf: &C) -> Option<(K, V)>
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + FromDisk,
        V: FromDisk,
    {
        // Reading individual values from iterators does not seem to cause database hangs.
        self.zs_reverse_range_iter(cf, ..).next()
    }

    fn zs_next_key_value_from<C, K, V>(&self, cf: &C, lower_bound: &K) -> Option<(K, V)>
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + FromDisk,
        V: FromDisk,
    {
        self.zs_forward_range_iter(cf, lower_bound..).next()
    }

    fn zs_next_key_value_strictly_after<C, K, V>(&self, cf: &C, lower_bound: &K) -> Option<(K, V)>
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + FromDisk,
        V: FromDisk,
    {
        use std::ops::Bound::*;

        // There is no standard syntax for an excluded start bound.
        self.zs_forward_range_iter(cf, (Excluded(lower_bound), Unbounded))
            .next()
    }

    fn zs_prev_key_value_back_from<C, K, V>(&self, cf: &C, upper_bound: &K) -> Option<(K, V)>
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + FromDisk,
        V: FromDisk,
    {
        self.zs_reverse_range_iter(cf, ..=upper_bound).next()
    }

    fn zs_prev_key_value_strictly_before<C, K, V>(&self, cf: &C, upper_bound: &K) -> Option<(K, V)>
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + FromDisk,
        V: FromDisk,
    {
        self.zs_reverse_range_iter(cf, ..upper_bound).next()
    }

    fn zs_items_in_range_ordered<C, K, V, R>(&self, cf: &C, range: R) -> BTreeMap<K, V>
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + FromDisk + Ord,
        V: FromDisk,
        R: RangeBounds<K>,
    {
        self.zs_forward_range_iter(cf, range).collect()
    }

    fn zs_items_in_range_unordered<C, K, V, R>(&self, cf: &C, range: R) -> HashMap<K, V>
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + FromDisk + Eq + std::hash::Hash,
        V: FromDisk,
        R: RangeBounds<K>,
    {
        self.zs_forward_range_iter(cf, range).collect()
    }
}

impl DiskWriteBatch {
    /// Creates and returns a new transactional batch write.
    ///
    /// # Correctness
    ///
    /// Each block must be written to the state inside a batch, so that:
    /// - concurrent `ReadStateService` queries don't see half-written blocks, and
    /// - if Zebra calls `exit`, panics, or crashes, half-written blocks are rolled back.
    pub fn new() -> Self {
        DiskWriteBatch {
            batch: rocksdb::WriteBatch::default(),
        }
    }
}

impl DiskDb {
    /// Prints rocksdb metrics for each column family, along with the total database
    /// disk size, live data disk size, and database memory size.
    pub fn print_db_metrics(&self) {
        let mut total_size_on_disk = 0;
        let mut total_live_size_on_disk = 0;
        let mut total_size_in_mem = 0;
        let db: &Arc<DB> = &self.db;
        let db_options = DiskDb::options();
        let column_families = DiskDb::construct_column_families(db_options, db.path(), []);
        let mut column_families_log_string = String::from("");

        write!(column_families_log_string, "Column families and sizes: ").unwrap();

        for cf_descriptor in column_families {
            let cf_name = &cf_descriptor.name();
            let cf_handle = db
                .cf_handle(cf_name)
                .expect("Column family handle must exist");
            let live_data_size = db
                .property_int_value_cf(cf_handle, "rocksdb.estimate-live-data-size")
                .unwrap_or(Some(0));
            let total_sst_files_size = db
                .property_int_value_cf(cf_handle, "rocksdb.total-sst-files-size")
                .unwrap_or(Some(0));
            let cf_disk_size = total_sst_files_size.unwrap_or(0);
            total_size_on_disk += cf_disk_size;
            total_live_size_on_disk += live_data_size.unwrap_or(0);
            let mem_table_size = db
                .property_int_value_cf(cf_handle, "rocksdb.size-all-mem-tables")
                .unwrap_or(Some(0));
            total_size_in_mem += mem_table_size.unwrap_or(0);

            // Separate each column family entry with a trailing space,
            // so the log line stays readable.
            write!(
                column_families_log_string,
                "{} (Disk: {}, Memory: {}) ",
                cf_name,
                human_bytes::human_bytes(cf_disk_size as f64),
                human_bytes::human_bytes(mem_table_size.unwrap_or(0) as f64)
            )
            .unwrap();
        }

        debug!("{}", column_families_log_string);
        info!(
            "Total Database Disk Size: {}",
            human_bytes::human_bytes(total_size_on_disk as f64)
        );
        info!(
            "Total Live Data Disk Size: {}",
            human_bytes::human_bytes(total_live_size_on_disk as f64)
        );
        info!(
            "Total Database Memory Size: {}",
            human_bytes::human_bytes(total_size_in_mem as f64)
        );
    }

    /// Exports RocksDB metrics to Prometheus.
    ///
    /// This function collects database statistics and exposes them as Prometheus metrics.
    /// Call this periodically (e.g., every 30 seconds) from a background task.
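    ///
    /// A sketch of a periodic caller, assuming a `tokio` runtime and a cloned
    /// [`DiskDb`] handle (illustrative only, not a doctest):
    ///
    /// ```ignore
    /// let db = db.clone();
    /// tokio::spawn(async move {
    ///     let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
    ///     loop {
    ///         interval.tick().await;
    ///         db.export_metrics();
    ///     }
    /// });
    /// ```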
    pub(crate) fn export_metrics(&self) {
        let db: &Arc<DB> = &self.db;
        let db_options = DiskDb::options();
        let column_families = DiskDb::construct_column_families(db_options, db.path(), []);

        let mut total_disk: u64 = 0;
        let mut total_live: u64 = 0;
        let mut total_mem: u64 = 0;

        for cf_descriptor in column_families {
            let cf_name = cf_descriptor.name().to_string();
            if let Some(cf_handle) = db.cf_handle(&cf_name) {
                let disk = db
                    .property_int_value_cf(cf_handle, "rocksdb.total-sst-files-size")
                    .ok()
                    .flatten()
                    .unwrap_or(0);
                let live = db
                    .property_int_value_cf(cf_handle, "rocksdb.estimate-live-data-size")
                    .ok()
                    .flatten()
                    .unwrap_or(0);
                let mem = db
                    .property_int_value_cf(cf_handle, "rocksdb.size-all-mem-tables")
                    .ok()
                    .flatten()
                    .unwrap_or(0);

                total_disk += disk;
                total_live += live;
                total_mem += mem;

                metrics::gauge!("zebra.state.rocksdb.cf_disk_size_bytes", "cf" => cf_name.clone())
                    .set(disk as f64);
                metrics::gauge!("zebra.state.rocksdb.cf_memory_size_bytes", "cf" => cf_name)
                    .set(mem as f64);
            }
        }

        metrics::gauge!("zebra.state.rocksdb.total_disk_size_bytes").set(total_disk as f64);
        metrics::gauge!("zebra.state.rocksdb.live_data_size_bytes").set(total_live as f64);
        metrics::gauge!("zebra.state.rocksdb.total_memory_size_bytes").set(total_mem as f64);

        // Compaction metrics - these use database-wide properties (not per-column-family)
        if let Ok(Some(pending)) = db.property_int_value("rocksdb.compaction-pending") {
            metrics::gauge!("zebra.state.rocksdb.compaction.pending_bytes").set(pending as f64);
        }

        if let Ok(Some(running)) = db.property_int_value("rocksdb.num-running-compactions") {
            metrics::gauge!("zebra.state.rocksdb.compaction.running").set(running as f64);
        }

        if let Ok(Some(cache)) = db.property_int_value("rocksdb.block-cache-usage") {
            metrics::gauge!("zebra.state.rocksdb.block_cache_usage_bytes").set(cache as f64);
        }

        // Level-by-level file counts (RocksDB typically has up to 7 levels)
        for level in 0..7 {
            let prop = format!("rocksdb.num-files-at-level{}", level);
            if let Ok(Some(count)) = db.property_int_value(&prop) {
                metrics::gauge!("zebra.state.rocksdb.num_files_at_level", "level" => level.to_string())
                    .set(count as f64);
            }
        }
    }

    /// Returns the estimated total disk space usage of the database.
    pub fn size(&self) -> u64 {
        let db: &Arc<DB> = &self.db;
        let db_options = DiskDb::options();
        let mut total_size_on_disk = 0;
        for cf_descriptor in DiskDb::construct_column_families(db_options, db.path(), []) {
            let cf_name = &cf_descriptor.name();
            let cf_handle = db
                .cf_handle(cf_name)
                .expect("Column family handle must exist");

            total_size_on_disk += db
                .property_int_value_cf(cf_handle, "rocksdb.total-sst-files-size")
                .ok()
                .flatten()
                .unwrap_or(0);
        }

        total_size_on_disk
    }

    /// Sets `finished_format_upgrades` to true to indicate that Zebra has
    /// finished applying any required db format upgrades.
    pub fn mark_finished_format_upgrades(&self) {
        self.finished_format_upgrades
            .store(true, atomic::Ordering::SeqCst);
    }

    /// Returns true if the `finished_format_upgrades` flag has been set to true to
    /// indicate that Zebra has finished applying any required db format upgrades.
    pub fn finished_format_upgrades(&self) -> bool {
        self.finished_format_upgrades.load(atomic::Ordering::SeqCst)
    }

    /// When called on a secondary DB instance, tries to catch up with the primary DB instance.
    pub fn try_catch_up_with_primary(&self) -> Result<(), rocksdb::Error> {
        self.db.try_catch_up_with_primary()
    }

    /// Returns a forward iterator over the items in `cf` in `range`.
    ///
    /// Holding this iterator open might delay block commit transactions.
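    ///
    /// For example (a sketch with placeholder types): `db.zs_forward_range_iter(&cf, low..high)`
    /// yields `(key, value)` pairs in increasing key order, starting at `low` and
    /// stopping strictly before `high`.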
    pub fn zs_forward_range_iter<C, K, V, R>(
        &self,
        cf: &C,
        range: R,
    ) -> impl Iterator<Item = (K, V)> + '_
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + FromDisk,
        V: FromDisk,
        R: RangeBounds<K>,
    {
        self.zs_range_iter_with_direction(cf, range, false)
    }

    /// Returns a reverse iterator over the items in `cf` in `range`.
    ///
    /// Holding this iterator open might delay block commit transactions.
    pub fn zs_reverse_range_iter<C, K, V, R>(
        &self,
        cf: &C,
        range: R,
    ) -> impl Iterator<Item = (K, V)> + '_
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + FromDisk,
        V: FromDisk,
        R: RangeBounds<K>,
    {
        self.zs_range_iter_with_direction(cf, range, true)
    }

    /// Returns an iterator over the items in `cf` in `range`.
    ///
    /// RocksDB iterators are ordered by increasing key bytes by default.
    /// Otherwise, if `reverse` is `true`, the iterator is ordered by decreasing key bytes.
    ///
    /// Holding this iterator open might delay block commit transactions.
    fn zs_range_iter_with_direction<C, K, V, R>(
        &self,
        cf: &C,
        range: R,
        reverse: bool,
    ) -> impl Iterator<Item = (K, V)> + '_
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + FromDisk,
        V: FromDisk,
        R: RangeBounds<K>,
    {
        use std::ops::Bound::{self, *};

        // Replace with map() when it stabilises:
        // https://github.com/rust-lang/rust/issues/86026
        let map_to_vec = |bound: Bound<&K>| -> Bound<Vec<u8>> {
            match bound {
                Unbounded => Unbounded,
                Included(x) => Included(x.as_bytes().as_ref().to_vec()),
                Excluded(x) => Excluded(x.as_bytes().as_ref().to_vec()),
            }
        };

        let start_bound = map_to_vec(range.start_bound());
        let end_bound = map_to_vec(range.end_bound());
        let range = (start_bound, end_bound);

        let mode = Self::zs_iter_mode(&range, reverse);
        let opts = Self::zs_iter_opts(&range);

        // Reading multiple items from iterators has caused database hangs
        // in previous RocksDB versions.
        self.db
            .iterator_cf_opt(cf, opts, mode)
            .map(|result| result.expect("unexpected database failure"))
            .map(|(key, value)| (key.to_vec(), value))
            // Skip the excluded "from" bound and empty ranges. The `mode` already skips keys
            // strictly before the "from" bound.
            .skip_while({
                let range = range.clone();
                move |(key, _value)| !range.contains(key)
            })
            // Take until the excluded "to" bound is reached,
            // or we're after the included "to" bound.
            .take_while(move |(key, _value)| range.contains(key))
            .map(|(key, value)| (K::from_bytes(key), V::from_bytes(value)))
    }

    /// Returns the RocksDB ReadOptions with a lower and upper bound for a range.
    fn zs_iter_opts<R>(range: &R) -> ReadOptions
    where
        R: RangeBounds<Vec<u8>>,
    {
        let mut opts = ReadOptions::default();
        let (lower_bound, upper_bound) = Self::zs_iter_bounds(range);

        if let Some(bound) = lower_bound {
            opts.set_iterate_lower_bound(bound);
        };

        if let Some(bound) = upper_bound {
            opts.set_iterate_upper_bound(bound);
        };

        opts
    }

    /// Returns the lower and upper iterate bounds for a range.
    ///
    /// Note: since upper iterate bounds are always exclusive in RocksDB, this method
    /// increments the upper bound by 1 if the end bound of the provided range
    /// is inclusive.
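    ///
    /// For example, with the fixed-length keys this database typically uses, an
    /// inclusive upper bound of `[0x01, 0xff]` becomes the exclusive upper bound
    /// `[0x02, 0x00]` (the next key in big-endian order).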
    fn zs_iter_bounds<R>(range: &R) -> (Option<Vec<u8>>, Option<Vec<u8>>)
    where
        R: RangeBounds<Vec<u8>>,
    {
        use std::ops::Bound::*;

        let lower_bound = match range.start_bound() {
            Included(bound) | Excluded(bound) => Some(bound.clone()),
            Unbounded => None,
        };

        let upper_bound = match range.end_bound().cloned() {
            Included(mut bound) => {
                // Increment the last byte in the upper bound that is less than u8::MAX, and
                // clear any bytes after it, to get the next key in lexicographic order
                // (the next big-endian number). RocksDB uses lexicographic order for keys.
                let is_wrapped_overflow = increment_big_endian(&mut bound);

                if is_wrapped_overflow {
                    bound.insert(0, 0x01)
                }

                Some(bound)
            }
            Excluded(bound) => Some(bound),
            Unbounded => None,
        };

        (lower_bound, upper_bound)
    }

    /// Returns the RocksDB iterator "from" mode for `range`.
    ///
    /// RocksDB iterators are ordered by increasing key bytes by default.
    /// Otherwise, if `reverse` is `true`, the iterator is ordered by decreasing key bytes.
    fn zs_iter_mode<R>(range: &R, reverse: bool) -> rocksdb::IteratorMode<'_>
    where
        R: RangeBounds<Vec<u8>>,
    {
        use std::ops::Bound::*;

        let from_bound = if reverse {
            range.end_bound()
        } else {
            range.start_bound()
        };

        match from_bound {
            Unbounded => {
                if reverse {
                    // Reversed unbounded iterators start from the last item
                    rocksdb::IteratorMode::End
                } else {
                    // Unbounded iterators start from the first item
                    rocksdb::IteratorMode::Start
                }
            }

            Included(bound) | Excluded(bound) => {
                let direction = if reverse {
                    rocksdb::Direction::Reverse
                } else {
                    rocksdb::Direction::Forward
                };

                rocksdb::IteratorMode::From(bound.as_slice(), direction)
            }
        }
    }

    /// The ideal open file limit for Zebra.
    const IDEAL_OPEN_FILE_LIMIT: u64 = 1024;

    /// The minimum number of open files for Zebra to operate normally. Also used
    /// as the default open file limit, when the OS doesn't tell us how many
    /// files we can use.
    ///
    /// We want 100+ file descriptors for peers, and 100+ for the database.
    ///
    /// On Windows, the default limit is 512 high-level I/O files, and 8192
    /// low-level I/O files:
    /// <https://docs.microsoft.com/en-us/cpp/c-runtime-library/reference/setmaxstdio?view=msvc-160#remarks>
    const MIN_OPEN_FILE_LIMIT: u64 = 512;

    /// The number of files used internally by Zebra.
    ///
    /// Zebra uses file descriptors for OS libraries (10+), polling APIs (10+),
    /// stdio (3), and other OS facilities (2+).
    const RESERVED_FILE_COUNT: u64 = 48;

    /// The size of the database memtable RAM cache in megabytes.
    ///
    /// <https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ#configuration-and-tuning>
    const MEMTABLE_RAM_CACHE_MEGABYTES: usize = 128;

    /// Builds the list of column families currently on disk, plus any new column
    /// families defined in code, and returns an iterable collection of their descriptors.
    fn construct_column_families(
        db_options: Options,
        path: &Path,
        column_families_in_code: impl IntoIterator<Item = String>,
    ) -> impl Iterator<Item = ColumnFamilyDescriptor> {
        // When opening the database in read/write mode, all column families must be opened.
        //
        // To make Zebra forward-compatible with databases updated by later versions,
        // we read any existing column families off the disk, then add any new column families
        // from the current implementation.
        //
        // <https://github.com/facebook/rocksdb/wiki/Column-Families#reference>
        let column_families_on_disk = DB::list_cf(&db_options, path).unwrap_or_default();
        let column_families_in_code = column_families_in_code.into_iter();

        column_families_on_disk
            .into_iter()
            .chain(column_families_in_code)
            .unique()
            .map(move |cf_name: String| {
                let mut cf_options = db_options.clone();

                if cf_name == BALANCE_BY_TRANSPARENT_ADDR {
                    cf_options.set_merge_operator_associative(
                        BALANCE_BY_TRANSPARENT_ADDR_MERGE_OP,
                        fetch_add_balance_and_received,
                    );
                }

                rocksdb::ColumnFamilyDescriptor::new(cf_name, cf_options.clone())
            })
    }

    /// Opens or creates the database at a path based on the kind, major version, and network,
    /// with the supplied column families, preserving any existing column families,
    /// and returns a shared low-level database wrapper.
    ///
    /// # Panics
    ///
    /// - If the cache directory does not exist and can't be created.
    /// - If the database cannot be opened for whatever reason.
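    ///
    /// # Example
    ///
    /// A sketch of opening the state database, assuming `config`, `version`,
    /// `network`, and `column_families` come from the caller (illustrative only,
    /// not a doctest):
    ///
    /// ```ignore
    /// let db = DiskDb::new(&config, "state", &version, &network, column_families, false);
    /// assert_eq!(db.db_kind(), "state");
    /// ```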
971 /// - If the database cannot be opened for whatever reason.
972 pub fn new(
973 config: &Config,
974 db_kind: impl AsRef<str>,
975 format_version_in_code: &Version,
976 network: &Network,
977 column_families_in_code: impl IntoIterator<Item = String>,
978 read_only: bool,
979 ) -> DiskDb {
980 // If the database is ephemeral, we don't need to check the cache directory.
981 if !config.ephemeral {
982 DiskDb::validate_cache_dir(&config.cache_dir);
983 }
984
985 let db_kind = db_kind.as_ref();
986 let path = config.db_path(db_kind, format_version_in_code.major, network);
987
988 let db_options = DiskDb::options();
989
990 let column_families =
991 DiskDb::construct_column_families(db_options.clone(), &path, column_families_in_code);
992
993 let db_result = if read_only {
994 // Use a tempfile for the secondary instance cache directory
995 let secondary_config = Config {
996 ephemeral: true,
997 ..config.clone()
998 };
999 let secondary_path =
1000 secondary_config.db_path("secondary_state", format_version_in_code.major, network);
1001 let create_dir_result = std::fs::create_dir_all(&secondary_path);
1002
1003 info!(?create_dir_result, "creating secondary db directory");
1004
1005 DB::open_cf_descriptors_as_secondary(
1006 &db_options,
1007 &path,
1008 &secondary_path,
1009 column_families,
1010 )
1011 } else {
1012 DB::open_cf_descriptors(&db_options, &path, column_families)
1013 };
1014
1015 match db_result {
1016 Ok(db) => {
1017 info!("Opened Zebra state cache at {}", path.display());
1018
1019 let db = DiskDb {
1020 db_kind: db_kind.to_string(),
1021 format_version_in_code: format_version_in_code.clone(),
1022 network: network.clone(),
1023 ephemeral: config.ephemeral,
1024 db: Arc::new(db),
1025 finished_format_upgrades: Arc::new(AtomicBool::new(false)),
1026 };
1027
1028 db.assert_default_cf_is_empty();
1029
1030 db
1031 }
1032
1033 Err(e) if matches!(e.kind(), ErrorKind::Busy | ErrorKind::IOError) => panic!(
1034 "Database likely already open {path:?} \
1035 Hint: Check if another zebrad process is running."
1036 ),
1037
1038 Err(e) => panic!(
1039 "Opening database {path:?} failed. \
1040 Hint: Try changing the state cache_dir in the Zebra config. \
1041 Error: {e}",
1042 ),
1043 }
1044 }

    // Accessor methods

    /// Returns the configured database kind for this database.
    pub fn db_kind(&self) -> String {
        self.db_kind.clone()
    }

    /// Returns the format version of the running code that created this `DiskDb` instance in memory.
    pub fn format_version_in_code(&self) -> Version {
        self.format_version_in_code.clone()
    }

    /// Returns the fixed major version for this database.
    pub fn major_version(&self) -> u64 {
        self.format_version_in_code().major
    }

    /// Returns the configured network for this database.
    pub fn network(&self) -> Network {
        self.network.clone()
    }

    /// Returns the `Path` where the files used by this database are located.
    pub fn path(&self) -> &Path {
        self.db.path()
    }

    /// Returns the low-level rocksdb inner database.
    #[allow(dead_code)]
    fn inner(&self) -> &Arc<DB> {
        &self.db
    }

    /// Returns the column family handle for `cf_name`.
    pub fn cf_handle(&self, cf_name: &str) -> Option<rocksdb::ColumnFamilyRef<'_>> {
        // Note: the lifetime returned by this method is subtly wrong. As of December 2023 it is
        // the shorter of &self and &str, but RocksDB clones column family names internally, so it
        // should just be &self. To avoid this restriction, clone the string before passing it to
        // this method. Currently Zebra uses static strings, so this doesn't matter.
        self.db.cf_handle(cf_name)
    }

    // Read methods are located in the ReadDisk trait

    // Write methods
    // Low-level write methods are located in the WriteDisk trait

    /// Writes `batch` to the database.
    pub(crate) fn write(&self, batch: DiskWriteBatch) -> Result<(), rocksdb::Error> {
        self.db.write(batch.batch)
    }

    // Private methods

    /// Tries to reuse an existing db after a major upgrade.
    ///
    /// If the current db version belongs to `restorable_db_versions`, the function moves a previous
    /// db to a new path so it can be used again. It does so by merely trying to rename the path
    /// corresponding to the db version directly preceding the current version to the path that is
    /// used by the current db. If successful, it also updates the db version file to match.
    ///
    /// Returns the old disk version if one existed and the db directory was renamed, or `None` otherwise.
    // TODO: Update this function to rename older major db format versions to the current version (#9565).
    #[allow(clippy::unwrap_in_result)]
    pub(crate) fn try_reusing_previous_db_after_major_upgrade(
        restorable_db_versions: &[u64],
        format_version_in_code: &Version,
        config: &Config,
        db_kind: impl AsRef<str>,
        network: &Network,
    ) -> Option<Version> {
        if let Some(&major_db_ver) = restorable_db_versions
            .iter()
            .find(|v| **v == format_version_in_code.major)
        {
            let db_kind = db_kind.as_ref();

            let old_major_db_ver = major_db_ver - 1;
            let old_path = config.db_path(db_kind, old_major_db_ver, network);
            // Exit early if the path doesn't exist or there's an error checking it.
            if !fs::exists(&old_path).unwrap_or(false) {
                return None;
            }

            let new_path = config.db_path(db_kind, major_db_ver, network);

            let old_path = match fs::canonicalize(&old_path) {
                Ok(canonicalized_old_path) => canonicalized_old_path,
                Err(e) => {
                    warn!("could not canonicalize {old_path:?}: {e}");
                    return None;
                }
            };

            let cache_path = match fs::canonicalize(&config.cache_dir) {
                Ok(canonicalized_cache_path) => canonicalized_cache_path,
                Err(e) => {
                    warn!("could not canonicalize {:?}: {e}", config.cache_dir);
                    return None;
                }
            };

            // # Correctness
            //
            // Check that the path we're about to move is inside the cache directory.
            //
            // If the user has symlinked the state directory to a non-cache directory, we don't want
            // to move it, because it might contain other files.
            //
            // We don't attempt to guard against malicious symlinks created by attackers
            // (TOCTOU attacks). Zebra should not be run with elevated privileges.
            if !old_path.starts_with(&cache_path) {
                info!("skipped reusing previous state cache: state is outside cache directory");
                return None;
            }

            let opts = DiskDb::options();
            let old_db_exists = DB::list_cf(&opts, &old_path).is_ok_and(|cf| !cf.is_empty());
            let new_db_exists = DB::list_cf(&opts, &new_path).is_ok_and(|cf| !cf.is_empty());

            if old_db_exists && !new_db_exists {
                // Create the parent directory for the new db. This is because we can't directly
                // rename e.g. `state/v25/mainnet/` to `state/v26/mainnet/` with `fs::rename()` if
                // `state/v26/` does not exist.
                match fs::create_dir_all(
                    new_path
                        .parent()
                        .expect("new state cache must have a parent path"),
                ) {
                    Ok(()) => info!("created new directory for state cache at {new_path:?}"),
                    Err(e) => {
                        warn!(
                            "could not create new directory for state cache at {new_path:?}: {e}"
                        );
                        return None;
                    }
                };

                match fs::rename(&old_path, &new_path) {
                    Ok(()) => {
                        info!("moved state cache from {old_path:?} to {new_path:?}");

                        let mut disk_version =
                            database_format_version_on_disk(config, db_kind, major_db_ver, network)
                                .expect("unable to read database format version file")
                                .expect("unable to parse database format version");

                        disk_version.major = old_major_db_ver;

                        write_database_format_version_to_disk(
                            config,
                            db_kind,
                            major_db_ver,
                            &disk_version,
                            network,
                        )
                        .expect("unable to write database format version file to disk");

                        // Get the parent of the old path, e.g. `state/v25/`, and delete it if it
                        // is empty.
                        let old_path = old_path
                            .parent()
                            .expect("old state cache must have a parent path");

                        if fs::read_dir(old_path)
                            .expect("cached state dir needs to be readable")
                            .next()
                            .is_none()
                        {
                            match fs::remove_dir_all(old_path) {
                                Ok(()) => {
                                    info!("removed empty old state cache directory at {old_path:?}")
                                }
                                Err(e) => {
                                    warn!(
                                        "could not remove empty old state cache directory \
                                         at {old_path:?}: {e}"
                                    )
                                }
                            }
                        }

                        return Some(disk_version);
                    }
                    Err(e) => {
                        warn!("could not move state cache from {old_path:?} to {new_path:?}: {e}");
                    }
                };
            }
        };

        None
    }

    /// Returns the database options for the finalized state database.
    fn options() -> rocksdb::Options {
        let mut opts = rocksdb::Options::default();
        let mut block_based_opts = rocksdb::BlockBasedOptions::default();

        const ONE_MEGABYTE: usize = 1024 * 1024;

        opts.create_if_missing(true);
        opts.create_missing_column_families(true);

        // Use the recommended Ribbon filter setting for all column families.
        //
        // Ribbon filters are faster than Bloom filters in Zebra, as of April 2022.
        // (They aren't needed for single-valued column families, but they don't hurt either.)
        block_based_opts.set_ribbon_filter(9.9);

        // Use the recommended LZ4 compression type.
        //
        // https://github.com/facebook/rocksdb/wiki/Compression#configuration
        opts.set_compression_type(rocksdb::DBCompressionType::Lz4);

        // Tune level-style database file compaction.
        //
        // This improves Zebra's initial sync speed slightly, as of April 2022.
        opts.optimize_level_style_compaction(Self::MEMTABLE_RAM_CACHE_MEGABYTES * ONE_MEGABYTE);

        // Increase the process open file limit if needed,
        // then use it to set RocksDB's limit.
        let open_file_limit = DiskDb::increase_open_file_limit();
        let db_file_limit = DiskDb::get_db_open_file_limit(open_file_limit);

        // If the current limit is very large, set the DB limit using the ideal limit
        let ideal_limit = DiskDb::get_db_open_file_limit(DiskDb::IDEAL_OPEN_FILE_LIMIT)
            .try_into()
            .expect("ideal open file limit fits in a c_int");
        let db_file_limit = db_file_limit.try_into().unwrap_or(ideal_limit);

        opts.set_max_open_files(db_file_limit);

        // Set the block-based options
        opts.set_block_based_table_factory(&block_based_opts);

        opts
    }

    /// Calculate the database's share of `open_file_limit`.
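    ///
    /// For example, with the ideal process limit of 1024 open files and 48 reserved
    /// files, the database's share is `(1024 - 48) / 2 = 488` files.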
    fn get_db_open_file_limit(open_file_limit: u64) -> u64 {
        // Give the DB half the files, and reserve half the files for peers
        (open_file_limit - DiskDb::RESERVED_FILE_COUNT) / 2
    }

    /// Increase the open file limit for this process to `IDEAL_OPEN_FILE_LIMIT`.
    /// If that fails, try `MIN_OPEN_FILE_LIMIT`.
    ///
    /// If the current limit is above `IDEAL_OPEN_FILE_LIMIT`, leaves it
    /// unchanged.
    ///
    /// Returns the current limit, after any successful increases.
    ///
    /// # Panics
    ///
    /// If the open file limit can not be increased to `MIN_OPEN_FILE_LIMIT`.
    fn increase_open_file_limit() -> u64 {
        // Zebra mainly uses TCP sockets (`zebra-network`) and low-level files
        // (`zebra-state` database).
        //
        // On Unix-based platforms, `increase_nofile_limit` changes the limit for
        // both database files and TCP connections.
        //
        // But it doesn't do anything on Windows in rlimit 0.7.0.
        //
        // On Windows, the default limits are:
        // - 512 high-level stream I/O files (via the C standard functions),
        // - 8192 low-level I/O files (via the Unix C functions), and
        // - 1000 TCP Control Block entries (network connections).
        //
        // https://docs.microsoft.com/en-us/cpp/c-runtime-library/reference/setmaxstdio?view=msvc-160#remarks
        // http://smallvoid.com/article/winnt-tcpip-max-limit.html
        //
        // `zebra-state`'s `IDEAL_OPEN_FILE_LIMIT` is much less than
        // the Windows low-level I/O file limit.
        //
        // The [`setmaxstdio` and `getmaxstdio`](https://docs.rs/rlimit/latest/rlimit/#windows)
        // functions from the `rlimit` crate only change the high-level I/O file limit.
        //
        // `zebra-network`'s default connection limit is much less than
        // the TCP Control Block limit on Windows.

        // We try setting the ideal limit, then the minimum limit.
        let current_limit = match increase_nofile_limit(DiskDb::IDEAL_OPEN_FILE_LIMIT) {
            Ok(current_limit) => current_limit,
            Err(limit_error) => {
                // These errors can happen due to sandboxing or unsupported system calls,
                // even if the file limit is high enough.
                info!(
                    ?limit_error,
                    min_limit = ?DiskDb::MIN_OPEN_FILE_LIMIT,
                    ideal_limit = ?DiskDb::IDEAL_OPEN_FILE_LIMIT,
                    "unable to increase the open file limit, \
                     assuming Zebra can open a minimum number of files"
                );

                return DiskDb::MIN_OPEN_FILE_LIMIT;
            }
        };

        if current_limit < DiskDb::MIN_OPEN_FILE_LIMIT {
            panic!(
                "open file limit too low: \
                 unable to set the number of open files to {}, \
                 the minimum number of files required by Zebra. \
                 Current limit is {:?}. \
                 Hint: Increase the open file limit to {} before launching Zebra",
                DiskDb::MIN_OPEN_FILE_LIMIT,
                current_limit,
                DiskDb::IDEAL_OPEN_FILE_LIMIT
            );
        } else if current_limit < DiskDb::IDEAL_OPEN_FILE_LIMIT {
            warn!(
                ?current_limit,
                min_limit = ?DiskDb::MIN_OPEN_FILE_LIMIT,
                ideal_limit = ?DiskDb::IDEAL_OPEN_FILE_LIMIT,
                "the maximum number of open files is below Zebra's ideal limit. \
                 Hint: Increase the open file limit to {} before launching Zebra",
                DiskDb::IDEAL_OPEN_FILE_LIMIT
            );
        } else if cfg!(windows) {
            // This log is verbose during tests.
            #[cfg(not(test))]
            info!(
                min_limit = ?DiskDb::MIN_OPEN_FILE_LIMIT,
                ideal_limit = ?DiskDb::IDEAL_OPEN_FILE_LIMIT,
                "assuming the open file limit is high enough for Zebra",
            );
            #[cfg(test)]
            debug!(
                min_limit = ?DiskDb::MIN_OPEN_FILE_LIMIT,
                ideal_limit = ?DiskDb::IDEAL_OPEN_FILE_LIMIT,
                "assuming the open file limit is high enough for Zebra",
            );
        } else {
            // Both the test and non-test variants logged at debug level,
            // so they are collapsed into a single call.
            debug!(
                ?current_limit,
                min_limit = ?DiskDb::MIN_OPEN_FILE_LIMIT,
                ideal_limit = ?DiskDb::IDEAL_OPEN_FILE_LIMIT,
                "the open file limit is high enough for Zebra",
            );
        }

        current_limit
    }

    // Cleanup methods

    /// Returns the number of shared instances of this database.
    ///
    /// # Concurrency
    ///
    /// The actual number of owners can be higher or lower than the returned value,
    /// because databases can simultaneously be cloned or dropped in other threads.
    ///
    /// However, if the number of owners is 1, and the caller has exclusive access,
    /// the count can't increase unless that caller clones the database.
    pub(crate) fn shared_database_owners(&self) -> usize {
        Arc::strong_count(&self.db) + Arc::weak_count(&self.db)
    }

    /// Shut down the database, cleaning up background tasks and ephemeral data.
    ///
    /// If `force` is true, clean up regardless of any shared references.
    /// `force` can cause errors accessing the database from other shared references.
    /// It should only be used in debugging or test code, immediately before a manual shutdown.
    ///
    /// TODO: make private after the stop height check has moved to the syncer (#3442),
    ///       and move shutting down the database to a blocking thread (#2188).
    pub(crate) fn shutdown(&mut self, force: bool) {
        // # Correctness
        //
        // If we're the only owner of the shared database instance,
        // then there are no other threads that can increase the strong or weak count.
        //
        // ## Implementation Requirements
        //
        // This function and all functions that it calls should avoid cloning the shared database
        // instance. If they do, they must drop it before:
        // - shutting down database threads, or
        // - deleting database files.

        if self.shared_database_owners() > 1 {
            let path = self.path();

            let mut ephemeral_note = "";

            if force {
                if self.ephemeral {
                    ephemeral_note = " and removing ephemeral files";
                }

                // This log is verbose during tests.
                #[cfg(not(test))]
                info!(
                    ?path,
                    "forcing shutdown{} of a state database with multiple active instances",
                    ephemeral_note,
                );
                #[cfg(test)]
                debug!(
                    ?path,
                    "forcing shutdown{} of a state database with multiple active instances",
                    ephemeral_note,
                );
            } else {
                if self.ephemeral {
                    ephemeral_note = " and files";
                }

                debug!(
                    ?path,
                    "dropping DiskDb clone, \
                     but keeping shared database instance{} until the last reference is dropped",
                    ephemeral_note,
                );
                return;
            }
        }

        self.assert_default_cf_is_empty();

        // Drop isn't guaranteed to run, such as when we panic, or if the tokio shutdown times out.
        //
        // Zebra's data should be fine if we don't clean up, because:
        // - the database flushes regularly anyway,
        // - Zebra commits each block in a database transaction, so any incomplete blocks get
        //   rolled back, and
        // - ephemeral files are placed in the OS temp dir and should eventually be cleaned up
        //   automatically.
        let path = self.path();
        debug!(?path, "flushing database to disk");

        // These flushes can fail during forced shutdown or during Drop after a shutdown,
        // particularly in tests. If they fail, there's nothing we can do about it anyway.
        if let Err(error) = self.db.flush() {
            let error = format!("{error:?}");
            if error.to_ascii_lowercase().contains("shutdown in progress") {
                debug!(
                    ?error,
                    ?path,
                    "expected shutdown error flushing database SST files to disk"
                );
            } else {
                info!(
                    ?error,
                    ?path,
                    "unexpected error flushing database SST files to disk during shutdown"
                );
            }
        }

        if let Err(error) = self.db.flush_wal(true) {
            let error = format!("{error:?}");
            if error.to_ascii_lowercase().contains("shutdown in progress") {
                debug!(
                    ?error,
                    ?path,
                    "expected shutdown error flushing database WAL buffer to disk"
                );
            } else {
                info!(
                    ?error,
                    ?path,
                    "unexpected error flushing database WAL buffer to disk during shutdown"
                );
            }
        }

        // # Memory Safety
        //
        // We'd like to call `cancel_all_background_work()` before Zebra exits,
        // but when we call it, we get memory, thread, or C++ errors when the process exits.
        // (This seems to be a bug in RocksDB: cancel_all_background_work() should wait until
        // all the threads have cleaned up.)
        //
        // # Change History
        //
        // We've changed this setting multiple times since 2021, in response to new RocksDB
        // and Rust compiler behaviour.
        //
        // We enabled cancel_all_background_work() due to failures on:
        // - Rust 1.57 on Linux
        //
        // We disabled cancel_all_background_work() due to failures on:
        // - Rust 1.64 on Linux
        //
        // We tried enabling cancel_all_background_work() due to failures on:
        // - Rust 1.70 on macOS 12.6.5 on x86_64
        // but it didn't stop the aborts happening (PR #6820).
        //
        // There weren't any failures with cancel_all_background_work() disabled on:
        // - Rust 1.69 or earlier
        // - Linux with Rust 1.70
        // And with cancel_all_background_work() enabled or disabled on:
        // - macOS 13.2 on aarch64 (M1), native and emulated x86_64, with Rust 1.70
        //
        // # Detailed Description
        //
        // We see these kinds of errors:
        // ```
        // pthread lock: Invalid argument
        // pure virtual method called
        // terminate called without an active exception
        // pthread destroy mutex: Device or resource busy
        // Aborted (core dumped)
        // signal: 6, SIGABRT: process abort signal
        // signal: 11, SIGSEGV: invalid memory reference
        // ```
        //
        // # Reference
        //
        // The RocksDB wiki says:
        // > Q: Is it safe to close RocksDB while another thread is issuing read, write or manual compaction requests?
        // >
        // > A: No. The users of RocksDB need to make sure all functions have finished before they close RocksDB.
        // > You can speed up the waiting by calling CancelAllBackgroundWork().
        //
        // <https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ>
        //
        // > rocksdb::DB instances need to be destroyed before your main function exits.
        // > RocksDB instances usually depend on some internal static variables.
        // > Users need to make sure rocksdb::DB instances are destroyed before those static variables.
        //
        // <https://github.com/facebook/rocksdb/wiki/Known-Issues>
        //
        // # TODO
        //
        // Try re-enabling this code and fixing the underlying concurrency bug.
        //
        //info!(?path, "stopping background database tasks");
        //self.db.cancel_all_background_work(true);

        // We'd like to drop the database before deleting its files,
        // because that closes the column families and the database correctly.
        // But Rust's ownership rules make that difficult,
        // so we just flush and delete ephemeral data instead.
        //
        // This implementation doesn't seem to cause any issues,
        // and the RocksDB Drop implementation handles any cleanup.
        self.delete_ephemeral();
    }

    /// If the database is `ephemeral`, delete its files.
    fn delete_ephemeral(&mut self) {
        // # Correctness
        //
        // This function and all functions that it calls should avoid cloning the shared database
        // instance. See `shutdown()` for details.

        if !self.ephemeral {
            return;
        }

        let path = self.path();

        // This log is verbose during tests.
        #[cfg(not(test))]
        info!(?path, "removing temporary database files");
        #[cfg(test)]
        debug!(?path, "removing temporary database files");

        // We'd like to use `rocksdb::Env::mem_env` for ephemeral databases,
        // but the Zcash blockchain might not fit in memory. So we just
        // delete the database files instead.
        //
        // We'd also like to call `DB::destroy` here, but calling destroy on a
        // live DB is undefined behaviour:
        // https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ#basic-readwrite
        //
        // So we assume that all the database files are under `path`, and
        // delete them using standard filesystem APIs. Deleting open files
        // might cause errors on non-Unix platforms, so we ignore the result.
        // (The OS will delete them eventually anyway, if they are in a temporary directory.)
        let result = std::fs::remove_dir_all(path);

        if result.is_err() {
            // This log is verbose during tests.
            #[cfg(not(test))]
            info!(
                ?result,
                ?path,
                "removing temporary database files caused an error",
            );
            #[cfg(test)]
            debug!(
                ?result,
                ?path,
                "removing temporary database files caused an error",
            );
        } else {
            debug!(
                ?result,
                ?path,
                "successfully removed temporary database files",
            );
        }
    }

    /// Check that the "default" column family is empty.
    ///
    /// # Panics
    ///
    /// If Zebra has a bug where it is storing data in the wrong column family.
    fn assert_default_cf_is_empty(&self) {
        // # Correctness
        //
        // This function and all functions that it calls should avoid cloning the shared database
        // instance. See `shutdown()` for details.

        if let Some(default_cf) = self.cf_handle("default") {
            assert!(
                self.zs_is_empty(&default_cf),
                "Zebra should not store data in the 'default' column family"
            );
        }
    }

    /// Validates the cache directory, creating it if it doesn't exist.
    ///
    /// # Panics
    ///
    /// If the directory cannot be created, with a specific error message.
    fn validate_cache_dir(cache_dir: &std::path::PathBuf) {
        if let Err(e) = fs::create_dir_all(cache_dir) {
            match e.kind() {
                std::io::ErrorKind::PermissionDenied => panic!(
                    "Permission denied creating {cache_dir:?}. \
                     Hint: check if the cache directory exists and has write permissions."
                ),
                std::io::ErrorKind::StorageFull => panic!(
                    "No space left on device creating {cache_dir:?}. \
                     Hint: check if the disk is full."
                ),
                _ => panic!("Could not create cache dir {cache_dir:?}: {e}"),
            }
        }
    }
}

impl Drop for DiskDb {
    fn drop(&mut self) {
        let path = self.path();
        debug!(?path, "dropping DiskDb instance");

        self.shutdown(false);
    }
}