zebra-state/src/service/finalized_state/disk_db.rs
//! Provides low-level access to RocksDB using some database-specific types.
//!
//! This module makes sure that:
//! - all disk writes happen inside a RocksDB transaction
//!   ([`rocksdb::WriteBatch`]), and
//! - format-specific invariants are maintained.
//!
//! # Correctness
//!
//! [`crate::constants::state_database_format_version_in_code()`] must be incremented
//! each time the database format (column families, serialization, etc.) changes.

use std::{
    collections::{BTreeMap, HashMap},
    fmt::{Debug, Write},
    fs,
    ops::RangeBounds,
    path::Path,
    sync::{
        atomic::{self, AtomicBool},
        Arc,
    },
};

use itertools::Itertools;
use rlimit::increase_nofile_limit;

use rocksdb::{ColumnFamilyDescriptor, ErrorKind, Options, ReadOptions};
use semver::Version;
use zebra_chain::{parameters::Network, primitives::byte_array::increment_big_endian};

use crate::{
    database_format_version_on_disk,
    service::finalized_state::disk_format::{FromDisk, IntoDisk},
    write_database_format_version_to_disk, Config,
};

use super::zebra_db::transparent::{
    fetch_add_balance_and_received, BALANCE_BY_TRANSPARENT_ADDR,
    BALANCE_BY_TRANSPARENT_ADDR_MERGE_OP,
};
// Doc-only imports
#[allow(unused_imports)]
use super::{TypedColumnFamily, WriteTypedBatch};

#[cfg(any(test, feature = "proptest-impl"))]
mod tests;

/// The [`rocksdb::ThreadMode`] used by the database.
pub type DBThreadMode = rocksdb::SingleThreaded;

/// The [`rocksdb`] database type, including thread mode.
///
/// Also the [`rocksdb::DBAccess`] used by database iterators.
pub type DB = rocksdb::DBWithThreadMode<DBThreadMode>;

/// Wrapper struct to ensure low-level database access goes through the correct API.
///
/// `rocksdb` allows concurrent writes through a shared reference,
/// so database instances are cloneable. When the final clone is dropped,
/// the database is closed.
///
/// # Correctness
///
/// Reading transactions from the database using RocksDB iterators causes hangs.
/// But creating iterators and reading the tip height works fine.
///
/// So these hangs are probably caused by holding column family locks to read:
/// - multiple values, or
/// - large values.
///
/// This bug might be fixed by moving database operations to blocking threads (#2188),
/// so that they don't block the tokio executor.
/// (Or it might be fixed by future RocksDB upgrades.)
#[derive(Clone, Debug)]
pub struct DiskDb {
    // Configuration
    //
    // This configuration cannot be modified after the database is initialized,
    // because some clones would have different values.
    //
    /// The configured database kind for this database.
    db_kind: String,

    /// The format version of the running Zebra code.
    format_version_in_code: Version,

    /// The configured network for this database.
    network: Network,

    /// The configured temporary database setting.
    ///
    /// If true, the database files are deleted on drop.
    ephemeral: bool,

    /// A boolean flag indicating whether the db format change task has finished
    /// applying any format changes that may have been required.
    finished_format_upgrades: Arc<AtomicBool>,

    // Owned State
    //
    // Everything contained in this state must be shared by all clones, or read-only.
    //
    /// The shared inner RocksDB database.
    ///
    /// RocksDB allows reads and writes via a shared reference.
    ///
    /// In [`SingleThreaded`](rocksdb::SingleThreaded) mode,
    /// column family changes and [`Drop`] require exclusive access.
    ///
    /// In [`MultiThreaded`](rocksdb::MultiThreaded) mode,
    /// only [`Drop`] requires exclusive access.
    db: Arc<DB>,
}

/// Wrapper struct to ensure low-level database writes go through the correct API.
///
/// [`rocksdb::WriteBatch`] is a batched set of database updates,
/// which must be written to the database using `DiskDb::write(batch)`.
#[must_use = "batches must be written to the database"]
#[derive(Default)]
pub struct DiskWriteBatch {
    /// The inner RocksDB write batch.
    batch: rocksdb::WriteBatch,
}

impl Debug for DiskWriteBatch {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DiskWriteBatch")
            .field("batch", &format!("{} bytes", self.batch.size_in_bytes()))
            .finish()
    }
}

impl PartialEq for DiskWriteBatch {
    fn eq(&self, other: &Self) -> bool {
        self.batch.data() == other.batch.data()
    }
}

impl Eq for DiskWriteBatch {}

/// Helper trait for inserting serialized typed (Key, Value) pairs into rocksdb.
///
/// # Deprecation
///
/// This trait should not be used in new code, use [`WriteTypedBatch`] instead.
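///
/// # Example
///
/// A minimal usage sketch (not compiled as a doc-test): it assumes a `DiskDb`
/// named `db`, a column family that already exists, and key and value types
/// that implement [`IntoDisk`]. The column family name and the `hash`/`height`
/// values are illustrative, not items defined in this module.
///
/// ```ignore
/// let cf = db.cf_handle("height_by_hash").expect("column family exists");
///
/// let mut batch = DiskWriteBatch::new();
/// batch.zs_insert(&cf, hash, height);
/// batch.zs_delete(&cf, stale_hash);
///
/// // The batch is applied atomically: either every update is written, or none are.
/// db.write(batch)?;
/// ```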
//
// TODO: replace uses of this trait with WriteTypedBatch,
// implement these methods directly on WriteTypedBatch, and delete the trait.
pub trait WriteDisk {
    /// Serialize and insert the given key and value into a rocksdb column family,
    /// overwriting any existing `value` for `key`.
    fn zs_insert<C, K, V>(&mut self, cf: &C, key: K, value: V)
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + Debug,
        V: IntoDisk;

    /// Serialize and merge the given key and value into a rocksdb column family,
    /// merging with any existing `value` for `key`.
    fn zs_merge<C, K, V>(&mut self, cf: &C, key: K, value: V)
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + Debug,
        V: IntoDisk;

    /// Remove the given key from a rocksdb column family, if it exists.
    fn zs_delete<C, K>(&mut self, cf: &C, key: K)
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + Debug;

    /// Delete the given key range from a rocksdb column family, if it exists, including `from`
    /// and excluding `until_strictly_before`.
    //
    // TODO: convert zs_delete_range() to take std::ops::RangeBounds
    // see zs_range_iter() for an example of the edge cases
    fn zs_delete_range<C, K>(&mut self, cf: &C, from: K, until_strictly_before: K)
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + Debug;
}

/// # Deprecation
///
/// These impls should not be used in new code, use [`WriteTypedBatch`] instead.
//
// TODO: replace uses of these impls with WriteTypedBatch,
// implement these methods directly on WriteTypedBatch, and delete the trait.
impl WriteDisk for DiskWriteBatch {
    fn zs_insert<C, K, V>(&mut self, cf: &C, key: K, value: V)
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + Debug,
        V: IntoDisk,
    {
        let key_bytes = key.as_bytes();
        let value_bytes = value.as_bytes();
        self.batch.put_cf(cf, key_bytes, value_bytes);
    }

    fn zs_merge<C, K, V>(&mut self, cf: &C, key: K, value: V)
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + Debug,
        V: IntoDisk,
    {
        let key_bytes = key.as_bytes();
        let value_bytes = value.as_bytes();
        self.batch.merge_cf(cf, key_bytes, value_bytes);
    }

    fn zs_delete<C, K>(&mut self, cf: &C, key: K)
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + Debug,
    {
        let key_bytes = key.as_bytes();
        self.batch.delete_cf(cf, key_bytes);
    }

    // TODO: convert zs_delete_range() to take std::ops::RangeBounds
    // see zs_range_iter() for an example of the edge cases
    fn zs_delete_range<C, K>(&mut self, cf: &C, from: K, until_strictly_before: K)
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + Debug,
    {
        let from_bytes = from.as_bytes();
        let until_strictly_before_bytes = until_strictly_before.as_bytes();
        self.batch
            .delete_range_cf(cf, from_bytes, until_strictly_before_bytes);
    }
}

// Allow &mut DiskWriteBatch as well as owned DiskWriteBatch
impl<T> WriteDisk for &mut T
where
    T: WriteDisk,
{
    fn zs_insert<C, K, V>(&mut self, cf: &C, key: K, value: V)
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + Debug,
        V: IntoDisk,
    {
        (*self).zs_insert(cf, key, value)
    }

    fn zs_merge<C, K, V>(&mut self, cf: &C, key: K, value: V)
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + Debug,
        V: IntoDisk,
    {
        (*self).zs_merge(cf, key, value)
    }

    fn zs_delete<C, K>(&mut self, cf: &C, key: K)
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + Debug,
    {
        (*self).zs_delete(cf, key)
    }

    fn zs_delete_range<C, K>(&mut self, cf: &C, from: K, until_strictly_before: K)
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + Debug,
    {
        (*self).zs_delete_range(cf, from, until_strictly_before)
    }
}

/// Helper trait for retrieving and deserializing values from rocksdb column families.
///
/// # Deprecation
///
/// This trait should not be used in new code, use [`TypedColumnFamily`] instead.
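///
/// # Example
///
/// A minimal read sketch (not compiled as a doc-test): it assumes a `DiskDb`
/// named `db` and key/value types implementing [`IntoDisk`] and [`FromDisk`].
/// The column family name and the `Height` type are illustrative.
///
/// ```ignore
/// let cf = db.cf_handle("height_by_hash").expect("column family exists");
///
/// if db.zs_contains(&cf, &hash) {
///     let height: Height = db.zs_get(&cf, &hash).expect("checked above");
/// }
/// ```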
//
// TODO: replace uses of this trait with TypedColumnFamily,
// implement these methods directly on DiskDb, and delete the trait.
pub trait ReadDisk {
    /// Returns true if a rocksdb column family `cf` does not contain any entries.
    fn zs_is_empty<C>(&self, cf: &C) -> bool
    where
        C: rocksdb::AsColumnFamilyRef;

    /// Returns the value for `key` in the rocksdb column family `cf`, if present.
    fn zs_get<C, K, V>(&self, cf: &C, key: &K) -> Option<V>
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk,
        V: FromDisk;

    /// Check if a rocksdb column family `cf` contains the serialized form of `key`.
    fn zs_contains<C, K>(&self, cf: &C, key: &K) -> bool
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk;

    /// Returns the lowest key in `cf`, and the corresponding value.
    ///
    /// Returns `None` if the column family is empty.
    fn zs_first_key_value<C, K, V>(&self, cf: &C) -> Option<(K, V)>
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + FromDisk,
        V: FromDisk;

    /// Returns the highest key in `cf`, and the corresponding value.
    ///
    /// Returns `None` if the column family is empty.
    fn zs_last_key_value<C, K, V>(&self, cf: &C) -> Option<(K, V)>
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + FromDisk,
        V: FromDisk;

    /// Returns the first key greater than or equal to `lower_bound` in `cf`,
    /// and the corresponding value.
    ///
    /// Returns `None` if there are no keys greater than or equal to `lower_bound`.
    fn zs_next_key_value_from<C, K, V>(&self, cf: &C, lower_bound: &K) -> Option<(K, V)>
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + FromDisk,
        V: FromDisk;

    /// Returns the first key strictly greater than `lower_bound` in `cf`,
    /// and the corresponding value.
    ///
    /// Returns `None` if there are no keys greater than `lower_bound`.
    fn zs_next_key_value_strictly_after<C, K, V>(&self, cf: &C, lower_bound: &K) -> Option<(K, V)>
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + FromDisk,
        V: FromDisk;

    /// Returns the first key less than or equal to `upper_bound` in `cf`,
    /// and the corresponding value.
    ///
    /// Returns `None` if there are no keys less than or equal to `upper_bound`.
    fn zs_prev_key_value_back_from<C, K, V>(&self, cf: &C, upper_bound: &K) -> Option<(K, V)>
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + FromDisk,
        V: FromDisk;

    /// Returns the first key strictly less than `upper_bound` in `cf`,
    /// and the corresponding value.
    ///
    /// Returns `None` if there are no keys less than `upper_bound`.
    fn zs_prev_key_value_strictly_before<C, K, V>(&self, cf: &C, upper_bound: &K) -> Option<(K, V)>
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + FromDisk,
        V: FromDisk;

    /// Returns the keys and values in `cf` in `range`, in an ordered `BTreeMap`.
    ///
    /// Holding this iterator open might delay block commit transactions.
    fn zs_items_in_range_ordered<C, K, V, R>(&self, cf: &C, range: R) -> BTreeMap<K, V>
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + FromDisk + Ord,
        V: FromDisk,
        R: RangeBounds<K>;

    /// Returns the keys and values in `cf` in `range`, in an unordered `HashMap`.
    ///
    /// Holding this iterator open might delay block commit transactions.
    fn zs_items_in_range_unordered<C, K, V, R>(&self, cf: &C, range: R) -> HashMap<K, V>
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + FromDisk + Eq + std::hash::Hash,
        V: FromDisk,
        R: RangeBounds<K>;
}

impl PartialEq for DiskDb {
    fn eq(&self, other: &Self) -> bool {
        if self.db.path() == other.db.path() {
            assert_eq!(
                self.network, other.network,
                "database with same path but different network configs",
            );
            assert_eq!(
                self.ephemeral, other.ephemeral,
                "database with same path but different ephemeral configs",
            );

            return true;
        }

        false
    }
}

impl Eq for DiskDb {}

/// # Deprecation
///
/// These impls should not be used in new code, use [`TypedColumnFamily`] instead.
//
// TODO: replace uses of these impls with TypedColumnFamily,
// implement these methods directly on DiskDb, and delete the trait.
impl ReadDisk for DiskDb {
    fn zs_is_empty<C>(&self, cf: &C) -> bool
    where
        C: rocksdb::AsColumnFamilyRef,
    {
        // Empty column families return invalid forward iterators.
        //
        // Checking iterator validity does not seem to cause database hangs.
        let iterator = self.db.iterator_cf(cf, rocksdb::IteratorMode::Start);
        let raw_iterator: rocksdb::DBRawIteratorWithThreadMode<DB> = iterator.into();

        !raw_iterator.valid()
    }

    #[allow(clippy::unwrap_in_result)]
    fn zs_get<C, K, V>(&self, cf: &C, key: &K) -> Option<V>
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk,
        V: FromDisk,
    {
        let key_bytes = key.as_bytes();

        // We use `get_pinned_cf` to avoid taking ownership of the serialized
        // value, because we're going to deserialize it anyway. This avoids an
        // extra copy.
        let value_bytes = self
            .db
            .get_pinned_cf(cf, key_bytes)
            .expect("unexpected database failure");

        value_bytes.map(V::from_bytes)
    }

    fn zs_contains<C, K>(&self, cf: &C, key: &K) -> bool
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk,
    {
        let key_bytes = key.as_bytes();

        // We use `get_pinned_cf` to avoid taking ownership of the serialized
        // value, because we don't use the value at all. This avoids an extra copy.
        self.db
            .get_pinned_cf(cf, key_bytes)
            .expect("unexpected database failure")
            .is_some()
    }

    fn zs_first_key_value<C, K, V>(&self, cf: &C) -> Option<(K, V)>
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + FromDisk,
        V: FromDisk,
    {
        // Reading individual values from iterators does not seem to cause database hangs.
        self.zs_forward_range_iter(cf, ..).next()
    }

    fn zs_last_key_value<C, K, V>(&self, cf: &C) -> Option<(K, V)>
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + FromDisk,
        V: FromDisk,
    {
        // Reading individual values from iterators does not seem to cause database hangs.
        self.zs_reverse_range_iter(cf, ..).next()
    }

    fn zs_next_key_value_from<C, K, V>(&self, cf: &C, lower_bound: &K) -> Option<(K, V)>
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + FromDisk,
        V: FromDisk,
    {
        self.zs_forward_range_iter(cf, lower_bound..).next()
    }

    fn zs_next_key_value_strictly_after<C, K, V>(&self, cf: &C, lower_bound: &K) -> Option<(K, V)>
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + FromDisk,
        V: FromDisk,
    {
        use std::ops::Bound::*;

        // There is no standard syntax for an excluded start bound.
        self.zs_forward_range_iter(cf, (Excluded(lower_bound), Unbounded))
            .next()
    }

    fn zs_prev_key_value_back_from<C, K, V>(&self, cf: &C, upper_bound: &K) -> Option<(K, V)>
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + FromDisk,
        V: FromDisk,
    {
        self.zs_reverse_range_iter(cf, ..=upper_bound).next()
    }

    fn zs_prev_key_value_strictly_before<C, K, V>(&self, cf: &C, upper_bound: &K) -> Option<(K, V)>
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + FromDisk,
        V: FromDisk,
    {
        self.zs_reverse_range_iter(cf, ..upper_bound).next()
    }

    fn zs_items_in_range_ordered<C, K, V, R>(&self, cf: &C, range: R) -> BTreeMap<K, V>
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + FromDisk + Ord,
        V: FromDisk,
        R: RangeBounds<K>,
    {
        self.zs_forward_range_iter(cf, range).collect()
    }

    fn zs_items_in_range_unordered<C, K, V, R>(&self, cf: &C, range: R) -> HashMap<K, V>
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + FromDisk + Eq + std::hash::Hash,
        V: FromDisk,
        R: RangeBounds<K>,
    {
        self.zs_forward_range_iter(cf, range).collect()
    }
}

impl DiskWriteBatch {
    /// Creates and returns a new transactional batch write.
    ///
    /// # Correctness
    ///
    /// Each block must be written to the state inside a batch, so that:
    /// - concurrent `ReadStateService` queries don't see half-written blocks, and
    /// - if Zebra calls `exit`, panics, or crashes, half-written blocks are rolled back.
    pub fn new() -> Self {
        DiskWriteBatch {
            batch: rocksdb::WriteBatch::default(),
        }
    }
}

impl DiskDb {
    /// Prints rocksdb metrics for each column family, along with the total database
    /// disk size, live data disk size, and database memory size.
    pub fn print_db_metrics(&self) {
        let mut total_size_on_disk = 0;
        let mut total_live_size_on_disk = 0;
        let mut total_size_in_mem = 0;
        let db: &Arc<DB> = &self.db;
        let db_options = DiskDb::options();
        let column_families = DiskDb::construct_column_families(db_options, db.path(), []);
        let mut column_families_log_string = String::from("");

        write!(column_families_log_string, "Column families and sizes: ").unwrap();

        for cf_descriptor in column_families {
            let cf_name = &cf_descriptor.name();
            let cf_handle = db
                .cf_handle(cf_name)
                .expect("Column family handle must exist");
            let live_data_size = db
                .property_int_value_cf(cf_handle, "rocksdb.estimate-live-data-size")
                .unwrap_or(Some(0));
            let total_sst_files_size = db
                .property_int_value_cf(cf_handle, "rocksdb.total-sst-files-size")
                .unwrap_or(Some(0));
            let cf_disk_size = total_sst_files_size.unwrap_or(0);
            total_size_on_disk += cf_disk_size;
            total_live_size_on_disk += live_data_size.unwrap_or(0);
            let mem_table_size = db
                .property_int_value_cf(cf_handle, "rocksdb.size-all-mem-tables")
                .unwrap_or(Some(0));
            total_size_in_mem += mem_table_size.unwrap_or(0);

            write!(
                column_families_log_string,
                "{} (Disk: {}, Memory: {})",
                cf_name,
                human_bytes::human_bytes(cf_disk_size as f64),
                human_bytes::human_bytes(mem_table_size.unwrap_or(0) as f64)
            )
            .unwrap();
        }

        debug!("{}", column_families_log_string);
        info!(
            "Total Database Disk Size: {}",
            human_bytes::human_bytes(total_size_on_disk as f64)
        );
        info!(
            "Total Live Data Disk Size: {}",
            human_bytes::human_bytes(total_live_size_on_disk as f64)
        );
        info!(
            "Total Database Memory Size: {}",
            human_bytes::human_bytes(total_size_in_mem as f64)
        );
    }

    /// Returns the estimated total disk space usage of the database.
    pub fn size(&self) -> u64 {
        let db: &Arc<DB> = &self.db;
        let db_options = DiskDb::options();
        let mut total_size_on_disk = 0;
        for cf_descriptor in DiskDb::construct_column_families(db_options, db.path(), []) {
            let cf_name = &cf_descriptor.name();
            let cf_handle = db
                .cf_handle(cf_name)
                .expect("Column family handle must exist");

            total_size_on_disk += db
                .property_int_value_cf(cf_handle, "rocksdb.total-sst-files-size")
                .ok()
                .flatten()
                .unwrap_or(0);
        }

        total_size_on_disk
    }

    /// Sets `finished_format_upgrades` to true to indicate that Zebra has
    /// finished applying any required db format upgrades.
    pub fn mark_finished_format_upgrades(&self) {
        self.finished_format_upgrades
            .store(true, atomic::Ordering::SeqCst);
    }

    /// Returns true if the `finished_format_upgrades` flag has been set,
    /// indicating that Zebra has finished applying any required db format upgrades.
    pub fn finished_format_upgrades(&self) -> bool {
        self.finished_format_upgrades.load(atomic::Ordering::SeqCst)
    }

    /// When called on a secondary DB instance, tries to catch up with the primary DB instance.
    pub fn try_catch_up_with_primary(&self) -> Result<(), rocksdb::Error> {
        self.db.try_catch_up_with_primary()
    }

    /// Returns a forward iterator over the items in `cf` in `range`.
    ///
    /// Holding this iterator open might delay block commit transactions.
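    ///
    /// # Example
    ///
    /// A hedged sketch of iterating over a height range (the column family name
    /// and the `Height`/`block::Hash` types are illustrative; both must
    /// implement the disk serialization traits):
    ///
    /// ```ignore
    /// let cf = db.cf_handle("hash_by_height").expect("column family exists");
    ///
    /// // Collect the hashes for heights 100..200, in increasing key order.
    /// let hashes: Vec<(Height, block::Hash)> = db
    ///     .zs_forward_range_iter(&cf, Height(100)..Height(200))
    ///     .collect();
    /// ```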
    pub fn zs_forward_range_iter<C, K, V, R>(
        &self,
        cf: &C,
        range: R,
    ) -> impl Iterator<Item = (K, V)> + '_
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + FromDisk,
        V: FromDisk,
        R: RangeBounds<K>,
    {
        self.zs_range_iter_with_direction(cf, range, false)
    }

    /// Returns a reverse iterator over the items in `cf` in `range`.
    ///
    /// Holding this iterator open might delay block commit transactions.
    pub fn zs_reverse_range_iter<C, K, V, R>(
        &self,
        cf: &C,
        range: R,
    ) -> impl Iterator<Item = (K, V)> + '_
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + FromDisk,
        V: FromDisk,
        R: RangeBounds<K>,
    {
        self.zs_range_iter_with_direction(cf, range, true)
    }

    /// Returns an iterator over the items in `cf` in `range`.
    ///
    /// RocksDB iterators are ordered by increasing key bytes by default.
    /// Otherwise, if `reverse` is `true`, the iterator is ordered by decreasing key bytes.
    ///
    /// Holding this iterator open might delay block commit transactions.
    fn zs_range_iter_with_direction<C, K, V, R>(
        &self,
        cf: &C,
        range: R,
        reverse: bool,
    ) -> impl Iterator<Item = (K, V)> + '_
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + FromDisk,
        V: FromDisk,
        R: RangeBounds<K>,
    {
        use std::ops::Bound::{self, *};

        // Replace with map() when it stabilises:
        // https://github.com/rust-lang/rust/issues/86026
        let map_to_vec = |bound: Bound<&K>| -> Bound<Vec<u8>> {
            match bound {
                Unbounded => Unbounded,
                Included(x) => Included(x.as_bytes().as_ref().to_vec()),
                Excluded(x) => Excluded(x.as_bytes().as_ref().to_vec()),
            }
        };

        let start_bound = map_to_vec(range.start_bound());
        let end_bound = map_to_vec(range.end_bound());
        let range = (start_bound, end_bound);

        let mode = Self::zs_iter_mode(&range, reverse);
        let opts = Self::zs_iter_opts(&range);

        // Reading multiple items from iterators has caused database hangs
        // in previous RocksDB versions.
        self.db
            .iterator_cf_opt(cf, opts, mode)
            .map(|result| result.expect("unexpected database failure"))
            .map(|(key, value)| (key.to_vec(), value))
            // Skip the excluded "from" bound and empty ranges. The `mode` already skips keys
            // strictly before the "from" bound.
            .skip_while({
                let range = range.clone();
                move |(key, _value)| !range.contains(key)
            })
            // Take until the excluded "to" bound is reached,
            // or we're after the included "to" bound.
            .take_while(move |(key, _value)| range.contains(key))
            .map(|(key, value)| (K::from_bytes(key), V::from_bytes(value)))
    }

    /// Returns the RocksDB ReadOptions with a lower and upper bound for a range.
    fn zs_iter_opts<R>(range: &R) -> ReadOptions
    where
        R: RangeBounds<Vec<u8>>,
    {
        let mut opts = ReadOptions::default();
        let (lower_bound, upper_bound) = Self::zs_iter_bounds(range);

        if let Some(bound) = lower_bound {
            opts.set_iterate_lower_bound(bound);
        };

        if let Some(bound) = upper_bound {
            opts.set_iterate_upper_bound(bound);
        };

        opts
    }

    /// Returns the lower and upper iterate bounds for a range.
    ///
    /// Note: Since upper iterate bounds are always exclusive in RocksDB, this method
    /// increments the upper bound by 1 if the end bound of the provided range
    /// is inclusive.
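    ///
    /// For example, the inclusive upper bound `[0x01, 0xff]` becomes the
    /// exclusive bound `[0x02, 0x00]` (the next big-endian number). If every
    /// byte is `0xff`, the increment wraps to all zeroes, and a `0x01` byte is
    /// prepended, producing a bound one byte longer.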
    fn zs_iter_bounds<R>(range: &R) -> (Option<Vec<u8>>, Option<Vec<u8>>)
    where
        R: RangeBounds<Vec<u8>>,
    {
        use std::ops::Bound::*;

        let lower_bound = match range.start_bound() {
            Included(bound) | Excluded(bound) => Some(bound.clone()),
            Unbounded => None,
        };

        let upper_bound = match range.end_bound().cloned() {
            Included(mut bound) => {
                // Increment the last byte in the upper bound that is less than u8::MAX, and
                // clear any bytes after it, to produce the next key in lexicographic order
                // (the next big-endian number). RocksDB uses lexicographic order for keys.
                let is_wrapped_overflow = increment_big_endian(&mut bound);

                if is_wrapped_overflow {
                    bound.insert(0, 0x01)
                }

                Some(bound)
            }
            Excluded(bound) => Some(bound),
            Unbounded => None,
        };

        (lower_bound, upper_bound)
    }

    /// Returns the RocksDB iterator "from" mode for `range`.
    ///
    /// RocksDB iterators are ordered by increasing key bytes by default.
    /// Otherwise, if `reverse` is `true`, the iterator is ordered by decreasing key bytes.
    fn zs_iter_mode<R>(range: &R, reverse: bool) -> rocksdb::IteratorMode<'_>
    where
        R: RangeBounds<Vec<u8>>,
    {
        use std::ops::Bound::*;

        let from_bound = if reverse {
            range.end_bound()
        } else {
            range.start_bound()
        };

        match from_bound {
            Unbounded => {
                if reverse {
                    // Reversed unbounded iterators start from the last item
                    rocksdb::IteratorMode::End
                } else {
                    // Unbounded iterators start from the first item
                    rocksdb::IteratorMode::Start
                }
            }

            Included(bound) | Excluded(bound) => {
                let direction = if reverse {
                    rocksdb::Direction::Reverse
                } else {
                    rocksdb::Direction::Forward
                };

                rocksdb::IteratorMode::From(bound.as_slice(), direction)
            }
        }
    }

    /// The ideal open file limit for Zebra.
    const IDEAL_OPEN_FILE_LIMIT: u64 = 1024;

    /// The minimum number of open files for Zebra to operate normally. Also used
    /// as the default open file limit, when the OS doesn't tell us how many
    /// files we can use.
    ///
    /// We want 100+ file descriptors for peers, and 100+ for the database.
    ///
    /// On Windows, the default limit is 512 high-level I/O files, and 8192
    /// low-level I/O files:
    /// <https://docs.microsoft.com/en-us/cpp/c-runtime-library/reference/setmaxstdio?view=msvc-160#remarks>
    const MIN_OPEN_FILE_LIMIT: u64 = 512;

    /// The number of files used internally by Zebra.
    ///
    /// Zebra uses file descriptors for OS libraries (10+), polling APIs (10+),
    /// stdio (3), and other OS facilities (2+).
    const RESERVED_FILE_COUNT: u64 = 48;

    /// The size of the database memtable RAM cache in megabytes.
    ///
    /// <https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ#configuration-and-tuning>
    const MEMTABLE_RAM_CACHE_MEGABYTES: usize = 128;

    /// Builds a list of the column families currently on disk, plus any new column
    /// families in the current implementation.
    /// Returns an iterable collection of all column family descriptors.
    fn construct_column_families(
        db_options: Options,
        path: &Path,
        column_families_in_code: impl IntoIterator<Item = String>,
    ) -> impl Iterator<Item = ColumnFamilyDescriptor> {
        // When opening the database in read/write mode, all column families must be opened.
        //
        // To make Zebra forward-compatible with databases updated by later versions,
        // we read any existing column families off the disk, then add any new column families
        // from the current implementation.
        //
        // <https://github.com/facebook/rocksdb/wiki/Column-Families#reference>
        let column_families_on_disk = DB::list_cf(&db_options, path).unwrap_or_default();
        let column_families_in_code = column_families_in_code.into_iter();

        column_families_on_disk
            .into_iter()
            .chain(column_families_in_code)
            .unique()
            .map(move |cf_name: String| {
                let mut cf_options = db_options.clone();

                if cf_name == BALANCE_BY_TRANSPARENT_ADDR {
                    cf_options.set_merge_operator_associative(
                        BALANCE_BY_TRANSPARENT_ADDR_MERGE_OP,
                        fetch_add_balance_and_received,
                    );
                }

                rocksdb::ColumnFamilyDescriptor::new(cf_name, cf_options.clone())
            })
    }

    /// Opens or creates the database at a path based on the kind, major version and network,
    /// with the supplied column families, preserving any existing column families,
    /// and returns a shared low-level database wrapper.
    ///
    /// # Panics
    ///
    /// - If the cache directory does not exist and can't be created.
    /// - If the database cannot be opened for whatever reason.
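    ///
    /// # Example
    ///
    /// A hedged sketch of opening a read/write state database (the column
    /// family list here is illustrative; real callers pass the full list of
    /// state column families, and `config` is a hypothetical [`Config`] value):
    ///
    /// ```ignore
    /// let db = DiskDb::new(
    ///     &config,
    ///     "state",
    ///     &state_database_format_version_in_code(),
    ///     &Network::Mainnet,
    ///     ["hash_by_height".to_string(), "height_by_hash".to_string()],
    ///     false, // read_only
    /// );
    /// ```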
901 /// - If the database cannot be opened for whatever reason.
902 pub fn new(
903 config: &Config,
904 db_kind: impl AsRef<str>,
905 format_version_in_code: &Version,
906 network: &Network,
907 column_families_in_code: impl IntoIterator<Item = String>,
908 read_only: bool,
909 ) -> DiskDb {
910 // If the database is ephemeral, we don't need to check the cache directory.
911 if !config.ephemeral {
912 DiskDb::validate_cache_dir(&config.cache_dir);
913 }
914
915 let db_kind = db_kind.as_ref();
916 let path = config.db_path(db_kind, format_version_in_code.major, network);
917
918 let db_options = DiskDb::options();
919
920 let column_families =
921 DiskDb::construct_column_families(db_options.clone(), &path, column_families_in_code);
922
923 let db_result = if read_only {
924 // Use a tempfile for the secondary instance cache directory
925 let secondary_config = Config {
926 ephemeral: true,
927 ..config.clone()
928 };
929 let secondary_path =
930 secondary_config.db_path("secondary_state", format_version_in_code.major, network);
931 let create_dir_result = std::fs::create_dir_all(&secondary_path);
932
933 info!(?create_dir_result, "creating secondary db directory");
934
935 DB::open_cf_descriptors_as_secondary(
936 &db_options,
937 &path,
938 &secondary_path,
939 column_families,
940 )
941 } else {
942 DB::open_cf_descriptors(&db_options, &path, column_families)
943 };
944
945 match db_result {
946 Ok(db) => {
947 info!("Opened Zebra state cache at {}", path.display());
948
949 let db = DiskDb {
950 db_kind: db_kind.to_string(),
951 format_version_in_code: format_version_in_code.clone(),
952 network: network.clone(),
953 ephemeral: config.ephemeral,
954 db: Arc::new(db),
955 finished_format_upgrades: Arc::new(AtomicBool::new(false)),
956 };
957
958 db.assert_default_cf_is_empty();
959
960 db
961 }
962
963 Err(e) if matches!(e.kind(), ErrorKind::Busy | ErrorKind::IOError) => panic!(
964 "Database likely already open {path:?} \
965 Hint: Check if another zebrad process is running."
966 ),
967
968 Err(e) => panic!(
969 "Opening database {path:?} failed. \
970 Hint: Try changing the state cache_dir in the Zebra config. \
971 Error: {e}",
972 ),
973 }
974 }
975
976 // Accessor methods
977
978 /// Returns the configured database kind for this database.
979 pub fn db_kind(&self) -> String {
980 self.db_kind.clone()
981 }
982
983 /// Returns the format version of the running code that created this `DiskDb` instance in memory.
984 pub fn format_version_in_code(&self) -> Version {
985 self.format_version_in_code.clone()
986 }
987
988 /// Returns the fixed major version for this database.
989 pub fn major_version(&self) -> u64 {
990 self.format_version_in_code().major
991 }
992
993 /// Returns the configured network for this database.
994 pub fn network(&self) -> Network {
995 self.network.clone()
996 }
997
998 /// Returns the `Path` where the files used by this database are located.
999 pub fn path(&self) -> &Path {
1000 self.db.path()
1001 }
1002
1003 /// Returns the low-level rocksdb inner database.
1004 #[allow(dead_code)]
1005 fn inner(&self) -> &Arc<DB> {
1006 &self.db
1007 }
1008
1009 /// Returns the column family handle for `cf_name`.
1010 pub fn cf_handle(&self, cf_name: &str) -> Option<rocksdb::ColumnFamilyRef<'_>> {
1011 // Note: the lifetime returned by this method is subtly wrong. As of December 2023 it is
1012 // the shorter of &self and &str, but RocksDB clones column family names internally, so it
1013 // should just be &self. To avoid this restriction, clone the string before passing it to
1014 // this method. Currently Zebra uses static strings, so this doesn't matter.
1015 self.db.cf_handle(cf_name)
1016 }
1017
1018 // Read methods are located in the ReadDisk trait
1019
1020 // Write methods
1021 // Low-level write methods are located in the WriteDisk trait
1022
1023 /// Writes `batch` to the database.
1024 pub(crate) fn write(&self, batch: DiskWriteBatch) -> Result<(), rocksdb::Error> {
1025 self.db.write(batch.batch)
1026 }
1027
1028 // Private methods
1029
1030 /// Tries to reuse an existing db after a major upgrade.
1031 ///
1032 /// If the current db version belongs to `restorable_db_versions`, the function moves a previous
1033 /// db to a new path so it can be used again. It does so by merely trying to rename the path
1034 /// corresponding to the db version directly preceding the current version to the path that is
1035 /// used by the current db. If successful, it also deletes the db version file.
1036 ///
1037 /// Returns the old disk version if one existed and the db directory was renamed, or None otherwise.
1038 // TODO: Update this function to rename older major db format version to the current version (#9565).
1039 #[allow(clippy::unwrap_in_result)]
1040 pub(crate) fn try_reusing_previous_db_after_major_upgrade(
1041 restorable_db_versions: &[u64],
1042 format_version_in_code: &Version,
1043 config: &Config,
1044 db_kind: impl AsRef<str>,
1045 network: &Network,
1046 ) -> Option<Version> {
1047 if let Some(&major_db_ver) = restorable_db_versions
1048 .iter()
1049 .find(|v| **v == format_version_in_code.major)
1050 {
1051 let db_kind = db_kind.as_ref();
1052
1053 let old_major_db_ver = major_db_ver - 1;
1054 let old_path = config.db_path(db_kind, old_major_db_ver, network);
1055 // Exit early if the path doesn't exist or there's an error checking it.
1056 if !fs::exists(&old_path).unwrap_or(false) {
1057 return None;
1058 }
1059
1060 let new_path = config.db_path(db_kind, major_db_ver, network);
1061
1062 let old_path = match fs::canonicalize(&old_path) {
1063 Ok(canonicalized_old_path) => canonicalized_old_path,
1064 Err(e) => {
1065 warn!("could not canonicalize {old_path:?}: {e}");
1066 return None;
1067 }
1068 };
1069
1070 let cache_path = match fs::canonicalize(&config.cache_dir) {
1071 Ok(canonicalized_cache_path) => canonicalized_cache_path,
1072 Err(e) => {
1073 warn!("could not canonicalize {:?}: {e}", config.cache_dir);
1074 return None;
1075 }
1076 };
1077
1078 // # Correctness
1079 //
1080 // Check that the path we're about to move is inside the cache directory.
1081 //
1082 // If the user has symlinked the state directory to a non-cache directory, we don't want
1083 // to move it, because it might contain other files.
1084 //
1085 // We don't attempt to guard against malicious symlinks created by attackers
1086 // (TOCTOU attacks). Zebra should not be run with elevated privileges.
1087 if !old_path.starts_with(&cache_path) {
1088 info!("skipped reusing previous state cache: state is outside cache directory");
1089 return None;
1090 }
1091
1092 let opts = DiskDb::options();
1093 let old_db_exists = DB::list_cf(&opts, &old_path).is_ok_and(|cf| !cf.is_empty());
1094 let new_db_exists = DB::list_cf(&opts, &new_path).is_ok_and(|cf| !cf.is_empty());
1095
1096 if old_db_exists && !new_db_exists {
1097 // Create the parent directory for the new db. This is because we can't directly
1098 // rename e.g. `state/v25/mainnet/` to `state/v26/mainnet/` with `fs::rename()` if
1099 // `state/v26/` does not exist.
1100 match fs::create_dir_all(
1101 new_path
1102 .parent()
1103 .expect("new state cache must have a parent path"),
1104 ) {
1105 Ok(()) => info!("created new directory for state cache at {new_path:?}"),
1106 Err(e) => {
1107 warn!(
1108 "could not create new directory for state cache at {new_path:?}: {e}"
1109 );
1110 return None;
1111 }
1112 };
1113
1114 match fs::rename(&old_path, &new_path) {
1115 Ok(()) => {
1116 info!("moved state cache from {old_path:?} to {new_path:?}");
1117
1118 let mut disk_version =
1119 database_format_version_on_disk(config, db_kind, major_db_ver, network)
1120 .expect("unable to read database format version file")
1121 .expect("unable to parse database format version");
1122
1123 disk_version.major = old_major_db_ver;
1124
1125 write_database_format_version_to_disk(
1126 config,
1127 db_kind,
1128 major_db_ver,
1129 &disk_version,
1130 network,
1131 )
1132 .expect("unable to write database format version file to disk");
1133
1134 // Get the parent of the old path, e.g. `state/v25/` and delete it if it is
1135 // empty.
1136 let old_path = old_path
1137 .parent()
1138 .expect("old state cache must have parent path");
1139
1140 if fs::read_dir(old_path)
1141 .expect("cached state dir needs to be readable")
1142 .next()
1143 .is_none()
1144 {
1145 match fs::remove_dir_all(old_path) {
1146 Ok(()) => {
1147 info!("removed empty old state cache directory at {old_path:?}")
1148 }
1149 Err(e) => {
1150 warn!(
1151 "could not remove empty old state cache directory \
1152 at {old_path:?}: {e}"
1153 )
1154 }
1155 }
1156 }
1157
1158 return Some(disk_version);
1159 }
1160 Err(e) => {
1161 warn!("could not move state cache from {old_path:?} to {new_path:?}: {e}");
1162 }
1163 };
1164 }
1165 };
1166
1167 None
1168 }
1169
1170 /// Returns the database options for the finalized state database.
1171 fn options() -> rocksdb::Options {
1172 let mut opts = rocksdb::Options::default();
1173 let mut block_based_opts = rocksdb::BlockBasedOptions::default();
1174
1175 const ONE_MEGABYTE: usize = 1024 * 1024;
1176
1177 opts.create_if_missing(true);
1178 opts.create_missing_column_families(true);
1179
1180 // Use the recommended Ribbon filter setting for all column families.
1181 //
1182 // Ribbon filters are faster than Bloom filters in Zebra, as of April 2022.
1183 // (They aren't needed for single-valued column families, but they don't hurt either.)
1184 block_based_opts.set_ribbon_filter(9.9);
1185
1186 // Use the recommended LZ4 compression type.
1187 //
1188 // https://github.com/facebook/rocksdb/wiki/Compression#configuration
1189 opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
1190
1191 // Tune level-style database file compaction.
1192 //
1193 // This improves Zebra's initial sync speed slightly, as of April 2022.
1194 opts.optimize_level_style_compaction(Self::MEMTABLE_RAM_CACHE_MEGABYTES * ONE_MEGABYTE);
1195
1196 // Increase the process open file limit if needed,
1197 // then use it to set RocksDB's limit.
1198 let open_file_limit = DiskDb::increase_open_file_limit();
1199 let db_file_limit = DiskDb::get_db_open_file_limit(open_file_limit);
1200
1201 // If the current limit is very large, set the DB limit using the ideal limit
1202 let ideal_limit = DiskDb::get_db_open_file_limit(DiskDb::IDEAL_OPEN_FILE_LIMIT)
1203 .try_into()
1204 .expect("ideal open file limit fits in a c_int");
1205 let db_file_limit = db_file_limit.try_into().unwrap_or(ideal_limit);
1206
1207 opts.set_max_open_files(db_file_limit);
1208
1209 // Set the block-based options
1210 opts.set_block_based_table_factory(&block_based_opts);
1211
1212 opts
1213 }
1214
1215 /// Calculate the database's share of `open_file_limit`
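    ///
    /// For example, with the ideal limit of 1024 open files, the database gets
    /// `(1024 - 48) / 2 = 488` files, and the rest is reserved for peer
    /// connections and Zebra's internal use.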
    fn get_db_open_file_limit(open_file_limit: u64) -> u64 {
        // Give the DB half the files, and reserve half the files for peers
        (open_file_limit - DiskDb::RESERVED_FILE_COUNT) / 2
    }

    /// Increase the open file limit for this process to `IDEAL_OPEN_FILE_LIMIT`.
    /// If that fails, assume `MIN_OPEN_FILE_LIMIT`.
    ///
    /// If the current limit is above `IDEAL_OPEN_FILE_LIMIT`, leaves it
    /// unchanged.
    ///
    /// Returns the current limit, after any successful increases.
    ///
    /// # Panics
    ///
    /// If the open file limit cannot be increased to `MIN_OPEN_FILE_LIMIT`.
    fn increase_open_file_limit() -> u64 {
        // Zebra mainly uses TCP sockets (`zebra-network`) and low-level files
        // (`zebra-state` database).
        //
        // On Unix-based platforms, `increase_nofile_limit` changes the limit for
        // both database files and TCP connections.
        //
        // But it doesn't do anything on Windows in rlimit 0.7.0.
        //
        // On Windows, the default limits are:
        // - 512 high-level stream I/O files (via the C standard functions),
        // - 8192 low-level I/O files (via the Unix C functions), and
        // - 1000 TCP Control Block entries (network connections).
        //
        // https://docs.microsoft.com/en-us/cpp/c-runtime-library/reference/setmaxstdio?view=msvc-160#remarks
        // http://smallvoid.com/article/winnt-tcpip-max-limit.html
        //
        // `zebra-state`'s `IDEAL_OPEN_FILE_LIMIT` is much less than
        // the Windows low-level I/O file limit.
        //
        // The [`setmaxstdio` and `getmaxstdio`](https://docs.rs/rlimit/latest/rlimit/#windows)
        // functions from the `rlimit` crate only change the high-level I/O file limit.
        //
        // `zebra-network`'s default connection limit is much less than
        // the TCP Control Block limit on Windows.

        // We try setting the ideal limit, then the minimum limit.
        let current_limit = match increase_nofile_limit(DiskDb::IDEAL_OPEN_FILE_LIMIT) {
            Ok(current_limit) => current_limit,
            Err(limit_error) => {
                // These errors can happen due to sandboxing or unsupported system calls,
                // even if the file limit is high enough.
                info!(
                    ?limit_error,
                    min_limit = ?DiskDb::MIN_OPEN_FILE_LIMIT,
                    ideal_limit = ?DiskDb::IDEAL_OPEN_FILE_LIMIT,
                    "unable to increase the open file limit, \
                     assuming Zebra can open a minimum number of files"
                );

                return DiskDb::MIN_OPEN_FILE_LIMIT;
            }
        };

        if current_limit < DiskDb::MIN_OPEN_FILE_LIMIT {
            panic!(
                "open file limit too low: \
                 unable to set the number of open files to {}, \
                 the minimum number of files required by Zebra. \
                 Current limit is {:?}. \
                 Hint: Increase the open file limit to {} before launching Zebra",
                DiskDb::MIN_OPEN_FILE_LIMIT,
                current_limit,
                DiskDb::IDEAL_OPEN_FILE_LIMIT
            );
        } else if current_limit < DiskDb::IDEAL_OPEN_FILE_LIMIT {
            warn!(
                ?current_limit,
                min_limit = ?DiskDb::MIN_OPEN_FILE_LIMIT,
                ideal_limit = ?DiskDb::IDEAL_OPEN_FILE_LIMIT,
                "the maximum number of open files is below Zebra's ideal limit. \
                 Hint: Increase the open file limit to {} before launching Zebra",
                DiskDb::IDEAL_OPEN_FILE_LIMIT
            );
        } else if cfg!(windows) {
            // This log is verbose during tests.
            #[cfg(not(test))]
            info!(
                min_limit = ?DiskDb::MIN_OPEN_FILE_LIMIT,
                ideal_limit = ?DiskDb::IDEAL_OPEN_FILE_LIMIT,
                "assuming the open file limit is high enough for Zebra",
            );
            #[cfg(test)]
            debug!(
                min_limit = ?DiskDb::MIN_OPEN_FILE_LIMIT,
                ideal_limit = ?DiskDb::IDEAL_OPEN_FILE_LIMIT,
                "assuming the open file limit is high enough for Zebra",
            );
        } else {
            #[cfg(not(test))]
            debug!(
                ?current_limit,
                min_limit = ?DiskDb::MIN_OPEN_FILE_LIMIT,
                ideal_limit = ?DiskDb::IDEAL_OPEN_FILE_LIMIT,
                "the open file limit is high enough for Zebra",
            );
            #[cfg(test)]
            debug!(
                ?current_limit,
                min_limit = ?DiskDb::MIN_OPEN_FILE_LIMIT,
                ideal_limit = ?DiskDb::IDEAL_OPEN_FILE_LIMIT,
                "the open file limit is high enough for Zebra",
            );
        }

        current_limit
    }

    // Cleanup methods

    /// Returns the number of shared instances of this database.
    ///
    /// # Concurrency
    ///
    /// The actual number of owners can be higher or lower than the returned value,
    /// because databases can simultaneously be cloned or dropped in other threads.
    ///
    /// However, if the number of owners is 1, and the caller has exclusive access,
    /// the count can't increase unless that caller clones the database.
    pub(crate) fn shared_database_owners(&self) -> usize {
        Arc::strong_count(&self.db) + Arc::weak_count(&self.db)
    }

    /// Shut down the database, cleaning up background tasks and ephemeral data.
    ///
    /// If `force` is true, clean up regardless of any shared references.
    /// `force` can cause errors accessing the database from other shared references.
    /// It should only be used in debugging or test code, immediately before a manual shutdown.
    ///
    /// TODO: make private after the stop height check has moved to the syncer (#3442)
    ///       move shutting down the database to a blocking thread (#2188)
    pub(crate) fn shutdown(&mut self, force: bool) {
        // # Correctness
        //
        // If we're the only owner of the shared database instance,
        // then there are no other threads that can increase the strong or weak count.
        //
        // ## Implementation Requirements
        //
        // This function and all functions that it calls should avoid cloning the shared database
        // instance. If they do, they must drop it before:
        // - shutting down database threads, or
        // - deleting database files.

        if self.shared_database_owners() > 1 {
            let path = self.path();

            let mut ephemeral_note = "";

            if force {
                if self.ephemeral {
                    ephemeral_note = " and removing ephemeral files";
                }

                // This log is verbose during tests.
                #[cfg(not(test))]
                info!(
                    ?path,
                    "forcing shutdown{} of a state database with multiple active instances",
                    ephemeral_note,
                );
                #[cfg(test)]
                debug!(
                    ?path,
                    "forcing shutdown{} of a state database with multiple active instances",
                    ephemeral_note,
                );
            } else {
                if self.ephemeral {
                    ephemeral_note = " and files";
                }

                debug!(
                    ?path,
                    "dropping DiskDb clone, \
                     but keeping shared database instance{} until the last reference is dropped",
                    ephemeral_note,
                );
                return;
            }
        }

        self.assert_default_cf_is_empty();

        // Drop isn't guaranteed to run, such as when we panic, or if the tokio shutdown times out.
        //
        // Zebra's data should be fine if we don't clean up, because:
        // - the database flushes regularly anyway
        // - Zebra commits each block in a database transaction, any incomplete blocks get rolled back
        // - ephemeral files are placed in the os temp dir and should be cleaned up automatically eventually
        let path = self.path();
        debug!(?path, "flushing database to disk");

        // These flushes can fail during forced shutdown or during Drop after a shutdown,
        // particularly in tests. If they fail, there's nothing we can do about it anyway.
        if let Err(error) = self.db.flush() {
            let error = format!("{error:?}");
            if error.to_ascii_lowercase().contains("shutdown in progress") {
                debug!(
                    ?error,
                    ?path,
                    "expected shutdown error flushing database SST files to disk"
                );
            } else {
                info!(
                    ?error,
                    ?path,
                    "unexpected error flushing database SST files to disk during shutdown"
                );
            }
        }

        if let Err(error) = self.db.flush_wal(true) {
            let error = format!("{error:?}");
            if error.to_ascii_lowercase().contains("shutdown in progress") {
                debug!(
                    ?error,
                    ?path,
                    "expected shutdown error flushing database WAL buffer to disk"
                );
            } else {
                info!(
                    ?error,
                    ?path,
                    "unexpected error flushing database WAL buffer to disk during shutdown"
                );
            }
        }

        // # Memory Safety
        //
        // We'd like to call `cancel_all_background_work()` before Zebra exits,
        // but when we call it, we get memory, thread, or C++ errors when the process exits.
        // (This seems to be a bug in RocksDB: cancel_all_background_work() should wait until
        // all the threads have cleaned up.)
        //
        // # Change History
        //
        // We've changed this setting multiple times since 2021, in response to new RocksDB
        // and Rust compiler behaviour.
        //
        // We enabled cancel_all_background_work() due to failures on:
        // - Rust 1.57 on Linux
        //
        // We disabled cancel_all_background_work() due to failures on:
        // - Rust 1.64 on Linux
        //
        // We tried enabling cancel_all_background_work() due to failures on:
        // - Rust 1.70 on macOS 12.6.5 on x86_64
        // but it didn't stop the aborts happening (PR #6820).
        //
        // There weren't any failures with cancel_all_background_work() disabled on:
        // - Rust 1.69 or earlier
        // - Linux with Rust 1.70
        // And with cancel_all_background_work() enabled or disabled on:
        // - macOS 13.2 on aarch64 (M1), native and emulated x86_64, with Rust 1.70
        //
        // # Detailed Description
        //
        // We see these kinds of errors:
        // ```
        // pthread lock: Invalid argument
        // pure virtual method called
        // terminate called without an active exception
        // pthread destroy mutex: Device or resource busy
        // Aborted (core dumped)
        // signal: 6, SIGABRT: process abort signal
        // signal: 11, SIGSEGV: invalid memory reference
        // ```
        //
        // # Reference
        //
        // The RocksDB wiki says:
        // > Q: Is it safe to close RocksDB while another thread is issuing read, write or manual compaction requests?
        // >
        // > A: No. The users of RocksDB need to make sure all functions have finished before they close RocksDB.
        // > You can speed up the waiting by calling CancelAllBackgroundWork().
        //
        // <https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ>
        //
        // > rocksdb::DB instances need to be destroyed before your main function exits.
        // > RocksDB instances usually depend on some internal static variables.
        // > Users need to make sure rocksdb::DB instances are destroyed before those static variables.
        //
        // <https://github.com/facebook/rocksdb/wiki/Known-Issues>
        //
        // # TODO
        //
        // Try re-enabling this code and fixing the underlying concurrency bug.
        //
        //info!(?path, "stopping background database tasks");
        //self.db.cancel_all_background_work(true);

        // We'd like to drop the database before deleting its files,
        // because that closes the column families and the database correctly.
        // But Rust's ownership rules make that difficult,
        // so we just flush and delete ephemeral data instead.
        //
        // This implementation doesn't seem to cause any issues,
        // and the RocksDB Drop implementation handles any cleanup.
        self.delete_ephemeral();
    }

    /// If the database is `ephemeral`, delete its files.
    fn delete_ephemeral(&mut self) {
        // # Correctness
        //
        // This function and all functions that it calls should avoid cloning the shared database
        // instance. See `shutdown()` for details.

        if !self.ephemeral {
            return;
        }

        let path = self.path();

        // This log is verbose during tests.
        #[cfg(not(test))]
        info!(?path, "removing temporary database files");
        #[cfg(test)]
        debug!(?path, "removing temporary database files");

        // We'd like to use `rocksdb::Env::mem_env` for ephemeral databases,
        // but the Zcash blockchain might not fit in memory. So we just
        // delete the database files instead.
        //
        // We'd also like to call `DB::destroy` here, but calling destroy on a
        // live DB is undefined behaviour:
        // https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ#basic-readwrite
        //
        // So we assume that all the database files are under `path`, and
        // delete them using standard filesystem APIs. Deleting open files
        // might cause errors on non-Unix platforms, so we ignore the result.
        // (The OS will delete them eventually anyway, if they are in a temporary directory.)
        let result = std::fs::remove_dir_all(path);

        if result.is_err() {
            // This log is verbose during tests.
            #[cfg(not(test))]
            info!(
                ?result,
                ?path,
                "removing temporary database files caused an error",
            );
            #[cfg(test)]
            debug!(
                ?result,
                ?path,
                "removing temporary database files caused an error",
            );
        } else {
            debug!(
                ?result,
                ?path,
                "successfully removed temporary database files",
            );
        }
    }

    /// Check that the "default" column family is empty.
    ///
    /// # Panics
    ///
    /// If Zebra has a bug where it is storing data in the wrong column family.
    fn assert_default_cf_is_empty(&self) {
        // # Correctness
        //
        // This function and all functions that it calls should avoid cloning the shared database
        // instance. See `shutdown()` for details.

        if let Some(default_cf) = self.cf_handle("default") {
            assert!(
                self.zs_is_empty(&default_cf),
                "Zebra should not store data in the 'default' column family"
            );
        }
    }

    /// Validates the cache directory, creating it if it doesn't exist.
    ///
    /// # Panics
    ///
    /// If the directory cannot be created, with a specific error message.
    fn validate_cache_dir(cache_dir: &std::path::PathBuf) {
        if let Err(e) = fs::create_dir_all(cache_dir) {
            match e.kind() {
                std::io::ErrorKind::PermissionDenied => panic!(
                    "Permission denied creating {cache_dir:?}. \
                     Hint: check that the cache directory exists and has write permissions."
                ),
                std::io::ErrorKind::StorageFull => panic!(
                    "No space left on device creating {cache_dir:?}. \
                     Hint: check if the disk is full."
                ),
                _ => panic!("Could not create cache dir {cache_dir:?}: {e}"),
            }
        }
    }
}

impl Drop for DiskDb {
    fn drop(&mut self) {
        let path = self.path();
        debug!(?path, "dropping DiskDb instance");

        self.shutdown(false);
    }
}