fuel_core/state/
rocks_db.rs

1use crate::{
2    database::{
3        Error as DatabaseError,
4        Result as DatabaseResult,
5        convert_to_rocksdb_direction,
6        database_description::DatabaseDescription,
7    },
8    state::IterDirection,
9};
10
11use super::rocks_db_key_iterator::{
12    ExtractItem,
13    RocksDBKeyIterator,
14};
15use core::ops::Deref;
16use fuel_core_metrics::core_metrics::DatabaseMetrics;
17use fuel_core_storage::{
18    Error as StorageError,
19    Result as StorageResult,
20    iter::{
21        BoxedIter,
22        IntoBoxedIter,
23        IterableStore,
24    },
25    kv_store::{
26        KVItem,
27        KeyItem,
28        KeyValueInspect,
29        StorageColumn,
30        Value,
31        WriteOperation,
32    },
33    transactional::{
34        Changes,
35        ReferenceBytesKey,
36        StorageChanges,
37    },
38};
39use itertools::Itertools;
40use rocksdb::{
41    BlockBasedOptions,
42    BoundColumnFamily,
43    Cache,
44    ColumnFamilyDescriptor,
45    DBAccess,
46    DBCompressionType,
47    DBRawIteratorWithThreadMode,
48    DBWithThreadMode,
49    IteratorMode,
50    MultiThreaded,
51    Options,
52    ReadOptions,
53    SliceTransform,
54    WriteBatch,
55};
56use std::{
57    cmp,
58    collections::{
59        BTreeMap,
60        HashSet,
61    },
62    fmt,
63    fmt::Formatter,
64    iter,
65    path::{
66        Path,
67        PathBuf,
68    },
69    sync::{
70        Arc,
71        Mutex,
72    },
73};
74use tempfile::TempDir;
75
76#[derive(Debug)]
77struct PrimaryInstance(DBWithThreadMode<MultiThreaded>);
78
79impl Deref for PrimaryInstance {
80    type Target = DBWithThreadMode<MultiThreaded>;
81
82    fn deref(&self) -> &Self::Target {
83        &self.0
84    }
85}
86
87impl Drop for PrimaryInstance {
88    fn drop(&mut self) {
89        self.cancel_all_background_work(true);
90    }
91}
92
93type DB = DBWithThreadMode<MultiThreaded>;
94
95type DropFn = Box<dyn FnOnce() + Send + Sync>;
96#[derive(Default)]
97struct DropResources {
98    // move resources into this closure to have them dropped when db drops
99    drop: Option<DropFn>,
100}
101
102impl fmt::Debug for DropResources {
103    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
104        write!(f, "DropResources")
105    }
106}
107
108impl<F: 'static + FnOnce() + Send + Sync> From<F> for DropResources {
109    fn from(closure: F) -> Self {
110        Self {
111            drop: Option::Some(Box::new(closure)),
112        }
113    }
114}
115
116impl Drop for DropResources {
117    fn drop(&mut self) {
118        if let Some(drop) = self.drop.take() {
119            (drop)()
120        }
121    }
122}
123
124#[derive(Clone, Copy, Default, Debug, PartialEq, Eq)]
125/// Defined behaviour for opening the columns of the database.
126pub enum ColumnsPolicy {
127    #[cfg_attr(not(feature = "rocksdb-production"), default)]
128    // Open a new column only when a database interaction is done with it.
129    Lazy,
130    #[cfg_attr(feature = "rocksdb-production", default)]
131    // Open all columns on creation on the service.
132    OnCreation,
133}
134
135/// Configuration to create a database
136#[derive(Debug, Copy, Clone, PartialEq, Eq)]
137pub struct DatabaseConfig {
138    pub cache_capacity: Option<usize>,
139    pub max_fds: i32,
140    pub columns_policy: ColumnsPolicy,
141}
142
143#[cfg(feature = "test-helpers")]
144impl DatabaseConfig {
145    pub fn config_for_tests() -> Self {
146        Self {
147            cache_capacity: None,
148            max_fds: 512,
149            columns_policy: ColumnsPolicy::Lazy,
150        }
151    }
152}
153
154pub struct RocksDb<Description> {
155    read_options: ReadOptions,
156    db: Arc<PrimaryInstance>,
157    block_opts: Arc<BlockBasedOptions>,
158    create_family: Option<Arc<Mutex<BTreeMap<String, Options>>>>,
159    snapshot: Option<rocksdb::SnapshotWithThreadMode<'static, DB>>,
160    metrics: Arc<DatabaseMetrics>,
161    // used for RAII
162    _drop: Arc<DropResources>,
163    _marker: core::marker::PhantomData<Description>,
164}
165
166impl<Description> Drop for RocksDb<Description> {
167    fn drop(&mut self) {
168        // Drop the snapshot before the db.
169        // Dropping the snapshot after the db will cause a sigsegv.
170        self.snapshot = None;
171    }
172}
173
174impl<Description> std::fmt::Debug for RocksDb<Description> {
175    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> core::fmt::Result {
176        f.debug_struct("RocksDb").field("db", &self.db).finish()
177    }
178}
179
180impl<Description> RocksDb<Description>
181where
182    Description: DatabaseDescription,
183{
184    pub fn default_open_temp() -> DatabaseResult<Self> {
185        Self::default_open_temp_with_params(DatabaseConfig {
186            cache_capacity: None,
187            max_fds: 512,
188            columns_policy: ColumnsPolicy::Lazy,
189        })
190    }
191
192    pub fn default_open_temp_with_params(
193        database_config: DatabaseConfig,
194    ) -> DatabaseResult<Self> {
195        let tmp_dir = TempDir::new().unwrap();
196        let path = tmp_dir.path();
197        let result = Self::open(
198            path,
199            enum_iterator::all::<Description::Column>().collect::<Vec<_>>(),
200            database_config,
201        );
202        let mut db = result?;
203
204        db._drop = Arc::new(
205            {
206                move || {
207                    // cleanup temp dir
208                    drop(tmp_dir);
209                }
210            }
211            .into(),
212        );
213
214        Ok(db)
215    }
216
217    pub fn default_open<P: AsRef<Path>>(
218        path: P,
219        database_config: DatabaseConfig,
220    ) -> DatabaseResult<Self> {
221        Self::open(
222            path,
223            enum_iterator::all::<Description::Column>().collect::<Vec<_>>(),
224            database_config,
225        )
226    }
227
228    pub fn prune(path: &Path) -> DatabaseResult<()> {
229        let path = path.join(Description::name());
230        DB::destroy(&Options::default(), path)
231            .map_err(|e| DatabaseError::Other(e.into()))?;
232        Ok(())
233    }
234
235    pub fn open<P: AsRef<Path>>(
236        path: P,
237        columns: Vec<Description::Column>,
238        database_config: DatabaseConfig,
239    ) -> DatabaseResult<Self> {
240        Self::open_with(DB::open_cf_descriptors, path, columns, database_config)
241    }
242
243    pub fn open_read_only<P: AsRef<Path>>(
244        path: P,
245        columns: Vec<Description::Column>,
246        error_if_log_file_exist: bool,
247        database_config: DatabaseConfig,
248    ) -> DatabaseResult<Self> {
249        Self::open_with(
250            |options, primary_path, cfs| {
251                DB::open_cf_descriptors_read_only(
252                    options,
253                    primary_path,
254                    cfs,
255                    error_if_log_file_exist,
256                )
257            },
258            path,
259            columns,
260            database_config,
261        )
262    }
263
264    pub fn open_secondary<PrimaryPath, SecondaryPath>(
265        path: PrimaryPath,
266        secondary_path: SecondaryPath,
267        columns: Vec<Description::Column>,
268        database_config: DatabaseConfig,
269    ) -> DatabaseResult<Self>
270    where
271        PrimaryPath: AsRef<Path>,
272        SecondaryPath: AsRef<Path>,
273    {
274        Self::open_with(
275            |options, primary_path, cfs| {
276                DB::open_cf_descriptors_as_secondary(
277                    options,
278                    primary_path,
279                    secondary_path.as_ref().to_path_buf(),
280                    cfs,
281                )
282            },
283            path,
284            columns,
285            database_config,
286        )
287    }
288
289    pub fn open_with<F, P>(
290        opener: F,
291        path: P,
292        columns: Vec<Description::Column>,
293        database_config: DatabaseConfig,
294    ) -> DatabaseResult<Self>
295    where
296        F: Fn(
297            &Options,
298            PathBuf,
299            Vec<ColumnFamilyDescriptor>,
300        ) -> Result<DB, rocksdb::Error>,
301        P: AsRef<Path>,
302    {
303        let original_path = path.as_ref().to_path_buf();
304        let path = original_path.join(Description::name());
305        let metric_columns = columns
306            .iter()
307            .map(|column| (column.id(), column.name()))
308            .collect::<Vec<_>>();
309        let metrics = Arc::new(DatabaseMetrics::new(
310            Description::name().as_str(),
311            &metric_columns,
312        ));
313        let mut block_opts = BlockBasedOptions::default();
314        // See https://github.com/facebook/rocksdb/blob/a1523efcdf2f0e8133b9a9f6e170a0dad49f928f/include/rocksdb/table.h#L246-L271 for details on what the format versions are/do.
315        block_opts.set_format_version(5);
316
317        if let Some(capacity) = database_config.cache_capacity {
318            // Set cache size 1/3 of the capacity as recommended by
319            // https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning#block-cache-size
320            let block_cache_size = capacity / 3;
321            let cache = Cache::new_lru_cache(block_cache_size);
322            block_opts.set_block_cache(&cache);
323            // "index and filter blocks will be stored in block cache, together with all other data blocks."
324            // See: https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB#indexes-and-filter-blocks
325            block_opts.set_cache_index_and_filter_blocks(true);
326            // Don't evict L0 filter/index blocks from the cache
327            block_opts.set_pin_l0_filter_and_index_blocks_in_cache(true);
328        } else {
329            block_opts.disable_cache();
330        }
331        block_opts.set_bloom_filter(10.0, true);
332        block_opts.set_block_size(16 * 1024);
333
334        let mut opts = Options::default();
335        opts.set_compression_type(DBCompressionType::Lz4);
336        // TODO: Make it customizable https://github.com/FuelLabs/fuel-core/issues/1666
337        opts.set_max_total_wal_size(64 * 1024 * 1024);
338        let cpu_number =
339            i32::try_from(num_cpus::get()).expect("The number of CPU can't exceed `i32`");
340        opts.increase_parallelism(cmp::max(1, cpu_number / 2));
341        if let Some(capacity) = database_config.cache_capacity {
342            // Set cache size 1/3 of the capacity. Another 1/3 is
343            // used by block cache and the last 1 / 3 remains for other purposes:
344            //
345            // https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning#block-cache-size
346            let row_cache_size = capacity / 3;
347            let cache = Cache::new_lru_cache(row_cache_size);
348            opts.set_row_cache(&cache);
349        }
350        opts.set_max_background_jobs(6);
351        opts.set_bytes_per_sync(1048576);
352        opts.set_max_open_files(database_config.max_fds);
353
354        let existing_column_families = DB::list_cf(&opts, &path).unwrap_or_default();
355
356        let mut cf_descriptors_to_open = BTreeMap::new();
357        let mut cf_descriptors_to_create = BTreeMap::new();
358        for column in columns.clone() {
359            let column_name = Self::col_name(column.id());
360            let opts = Self::cf_opts(column, &block_opts);
361            if existing_column_families.contains(&column_name) {
362                cf_descriptors_to_open.insert(column_name, opts);
363            } else {
364                cf_descriptors_to_create.insert(column_name, opts);
365            }
366        }
367
368        if database_config.columns_policy == ColumnsPolicy::OnCreation
369            || (database_config.columns_policy == ColumnsPolicy::Lazy
370                && cf_descriptors_to_open.is_empty())
371        {
372            opts.create_if_missing(true);
373        }
374
375        let unknown_columns_to_open: BTreeMap<_, _> = existing_column_families
376            .iter()
377            .filter(|column_name| {
378                !cf_descriptors_to_open.contains_key(*column_name)
379                    && !cf_descriptors_to_create.contains_key(*column_name)
380            })
381            .map(|unknown_column_name| {
382                let unknown_column_options = Self::default_opts(&block_opts);
383                (unknown_column_name.clone(), unknown_column_options)
384            })
385            .collect();
386        cf_descriptors_to_open.extend(unknown_columns_to_open);
387
388        let iterator = cf_descriptors_to_open
389            .clone()
390            .into_iter()
391            .map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts))
392            .collect::<Vec<_>>();
393
394        let db = match opener(&opts, path.clone(), iterator) {
395            Ok(db) => {
396                Ok(db)
397            },
398            Err(err) => {
399                tracing::error!("Couldn't open the database with an error: {}. \nTrying to reopen the database", err);
400
401                let iterator = cf_descriptors_to_open
402                    .clone()
403                    .into_iter()
404                    .map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts))
405                    .collect::<Vec<_>>();
406
407                opener(&opts, path.clone(), iterator)
408            },
409        }
410        .map_err(|e| DatabaseError::Other(e.into()))?;
411
412        let create_family = match database_config.columns_policy {
413            ColumnsPolicy::OnCreation => {
414                for (name, opt) in cf_descriptors_to_create {
415                    db.create_cf(name, &opt)
416                        .map_err(|e| DatabaseError::Other(e.into()))?;
417                }
418                None
419            }
420            ColumnsPolicy::Lazy => Some(Arc::new(Mutex::new(cf_descriptors_to_create))),
421        };
422        let db = Arc::new(PrimaryInstance(db));
423
424        let rocks_db = RocksDb {
425            read_options: Self::generate_read_options(&None),
426            block_opts: Arc::new(block_opts),
427            snapshot: None,
428            db,
429            metrics,
430            create_family,
431            _drop: Default::default(),
432            _marker: Default::default(),
433        };
434
435        Ok(rocks_db)
436    }
437
438    fn generate_read_options(
439        snapshot: &Option<rocksdb::SnapshotWithThreadMode<DB>>,
440    ) -> ReadOptions {
441        let mut opts = ReadOptions::default();
442        opts.set_verify_checksums(false);
443        if let Some(snapshot) = &snapshot {
444            opts.set_snapshot(snapshot);
445        }
446        opts
447    }
448
449    fn read_options(&self) -> ReadOptions {
450        Self::generate_read_options(&self.snapshot)
451    }
452
453    pub fn create_snapshot(&self) -> Self {
454        self.create_snapshot_generic()
455    }
456
457    pub fn create_snapshot_generic<TargetDescription>(
458        &self,
459    ) -> RocksDb<TargetDescription> {
460        let db = self.db.clone();
461        let block_opts = self.block_opts.clone();
462        let create_family = self.create_family.clone();
463        let metrics = self.metrics.clone();
464        let _drop = self._drop.clone();
465
466        // Safety: We are transmuting the snapshot to 'static lifetime, but it's safe
467        // because we are not going to use it after the RocksDb is dropped.
468        // We control the lifetime of the `Self` - RocksDb, so we can guarantee that
469        // the snapshot will be dropped before the RocksDb.
470        #[allow(clippy::missing_transmute_annotations)]
471        // Remove this and see for yourself
472        let snapshot = unsafe {
473            let snapshot = db.snapshot();
474            core::mem::transmute(snapshot)
475        };
476        let snapshot = Some(snapshot);
477
478        RocksDb {
479            read_options: Self::generate_read_options(&snapshot),
480            block_opts,
481            snapshot,
482            db,
483            create_family,
484            metrics,
485            _drop,
486            _marker: Default::default(),
487        }
488    }
489
490    fn cf(&self, column: Description::Column) -> Arc<BoundColumnFamily<'_>> {
491        self.cf_u32(column.id())
492    }
493
494    fn cf_u32(&self, column: u32) -> Arc<BoundColumnFamily<'_>> {
495        let family = self.db.cf_handle(&Self::col_name(column));
496
497        match family {
498            None => match &self.create_family {
499                Some(create_family) => {
500                    let mut lock = create_family
501                        .lock()
502                        .expect("The create family lock should be available");
503
504                    let name = Self::col_name(column);
505                    let Some(family) = lock.remove(&name) else {
506                        return self
507                            .db
508                            .cf_handle(&Self::col_name(column))
509                            .expect("No column family found");
510                    };
511
512                    self.db
513                        .create_cf(&name, &family)
514                        .expect("Couldn't create column family");
515
516                    self.db.cf_handle(&name).expect("invalid column state")
517                }
518                _ => {
519                    panic!("Columns in the DB should have been created on DB opening");
520                }
521            },
522            Some(family) => family,
523        }
524    }
525
526    fn col_name(column: u32) -> String {
527        format!("col-{}", column)
528    }
529
530    fn default_opts(block_opts: &BlockBasedOptions) -> Options {
531        let mut opts = Options::default();
532        opts.create_if_missing(true);
533        opts.set_compression_type(DBCompressionType::Lz4);
534        opts.set_block_based_table_factory(block_opts);
535
536        opts
537    }
538
539    fn cf_opts(column: Description::Column, block_opts: &BlockBasedOptions) -> Options {
540        let mut opts = Self::default_opts(block_opts);
541
542        // All double-keys should be configured here
543        if let Some(size) = Description::prefix(&column) {
544            opts.set_prefix_extractor(SliceTransform::create_fixed_prefix(size))
545        }
546
547        opts
548    }
549
550    /// RocksDB prefix iteration doesn't support reverse order,
551    /// so we need to to force the RocksDB iterator to order
552    /// all the prefix in the iterator so that we can take the next prefix
553    /// as start of iterator and iterate in reverse.
554    fn reverse_prefix_iter<T>(
555        &self,
556        prefix: &[u8],
557        column: Description::Column,
558    ) -> impl Iterator<Item = StorageResult<T::Item>> + '_ + use<'_, T, Description>
559    where
560        T: ExtractItem,
561    {
562        let reverse_iterator = next_prefix(prefix.to_vec()).map(|next_prefix| {
563            let mut opts = self.read_options();
564            // We need this option because our iterator start in the `next_prefix` prefix section
565            // and continue in `prefix` section. Without this option the correct
566            // iteration between prefix section isn't guaranteed
567            // Source : https://github.com/facebook/rocksdb/wiki/Prefix-Seek#how-to-ignore-prefix-bloom-filters-in-read
568            // and https://github.com/facebook/rocksdb/wiki/Prefix-Seek#general-prefix-seek-api
569            opts.set_total_order_seek(true);
570            self.iterator::<T>(
571                column,
572                opts,
573                IteratorMode::From(next_prefix.as_slice(), rocksdb::Direction::Reverse),
574            )
575        });
576
577        match reverse_iterator {
578            Some(iterator) => {
579                let prefix = prefix.to_vec();
580                iterator
581                    .take_while(move |item| {
582                        if let Ok(item) = item {
583                            T::starts_with(item, prefix.as_slice())
584                        } else {
585                            true
586                        }
587                    })
588                    .into_boxed()
589            }
590            _ => {
591                // No next item, so we can start backward iteration from the end.
592                let prefix = prefix.to_vec();
593                self.iterator::<T>(column, self.read_options(), IteratorMode::End)
594                    .take_while(move |item| {
595                        if let Ok(item) = item {
596                            T::starts_with(item, prefix.as_slice())
597                        } else {
598                            true
599                        }
600                    })
601                    .into_boxed()
602            }
603        }
604    }
605
606    pub(crate) fn iterator<T>(
607        &self,
608        column: Description::Column,
609        opts: ReadOptions,
610        iter_mode: IteratorMode,
611    ) -> impl Iterator<Item = StorageResult<T::Item>> + '_ + use<'_, T, Description>
612    where
613        T: ExtractItem,
614    {
615        let column_metrics = self.metrics.columns_read_statistic.get(&column.id());
616
617        RocksDBKeyIterator::<_, T>::new(
618            self.db.raw_iterator_cf_opt(&self.cf(column), opts),
619            iter_mode,
620        )
621        .map(move |item| {
622            item.inspect(|item| {
623                self.metrics.read_meter.inc();
624                column_metrics.map(|metric| metric.inc());
625                self.metrics.bytes_read.inc_by(T::size(item));
626            })
627            .map_err(|e| DatabaseError::Other(e.into()).into())
628        })
629    }
630
631    /// The fast way to remove all data from the column.
632    pub fn clear_table(&self, column: Description::Column) -> DatabaseResult<()> {
633        // Mark column for deletion
634        let column_name = Self::col_name(column.id());
635        self.db
636            .drop_cf(&column_name)
637            .map_err(|e| DatabaseError::Other(e.into()))?;
638
639        // Insert new fresh column without data
640        let column_name = Self::col_name(column.id());
641        let opts = Self::cf_opts(column, self.block_opts.as_ref());
642        self.db
643            .create_cf(&column_name, &opts)
644            .map_err(|e| DatabaseError::Other(e.into()))?;
645
646        Ok(())
647    }
648
649    pub fn multi_get<K, I>(
650        &self,
651        column: u32,
652        iterator: I,
653    ) -> DatabaseResult<Vec<Option<Vec<u8>>>>
654    where
655        I: Iterator<Item = K>,
656        K: AsRef<[u8]>,
657    {
658        let column_metrics = self.metrics.columns_read_statistic.get(&column);
659        let cl = self.cf_u32(column);
660        let results = self
661            .db
662            .multi_get_cf_opt(iterator.map(|k| (&cl, k)), &self.read_options)
663            .into_iter()
664            .map(|el| {
665                self.metrics.read_meter.inc();
666                column_metrics.map(|metric| metric.inc());
667                el.map(|value| {
668                    value.inspect(|vec| {
669                        self.metrics.bytes_read.inc_by(vec.len() as u64);
670                    })
671                })
672                .map_err(|err| DatabaseError::Other(err.into()))
673            })
674            .try_collect()?;
675        Ok(results)
676    }
677
678    fn _iter_store<T>(
679        &self,
680        column: Description::Column,
681        prefix: Option<&[u8]>,
682        start: Option<&[u8]>,
683        direction: IterDirection,
684    ) -> BoxedIter<'_, StorageResult<T::Item>>
685    where
686        T: ExtractItem,
687    {
688        match (prefix, start) {
689            (None, None) => {
690                let iter_mode =
691                    // if no start or prefix just start iterating over entire keyspace
692                    match direction {
693                        IterDirection::Forward => IteratorMode::Start,
694                        // end always iterates in reverse
695                        IterDirection::Reverse => IteratorMode::End,
696                    };
697                self.iterator::<T>(column, self.read_options(), iter_mode)
698                    .into_boxed()
699            }
700            (Some(prefix), None) => {
701                if direction == IterDirection::Reverse {
702                    self.reverse_prefix_iter::<T>(prefix, column).into_boxed()
703                } else {
704                    // start iterating in a certain direction within the keyspace
705                    let iter_mode = IteratorMode::From(
706                        prefix,
707                        convert_to_rocksdb_direction(direction),
708                    );
709
710                    // Setting prefix on the RocksDB level to optimize iteration.
711                    let mut opts = self.read_options();
712                    opts.set_prefix_same_as_start(true);
713
714                    let prefix = prefix.to_vec();
715                    self.iterator::<T>(column, opts, iter_mode)
716                        // Not all tables has a prefix set, so we need to filter out the keys.
717                        .take_while(move |item| {
718                            if let Ok(item) = item {
719                                T::starts_with(item, prefix.as_slice())
720                            } else {
721                                true
722                            }
723                        })
724                        .into_boxed()
725                }
726            }
727            (None, Some(start)) => {
728                // start iterating in a certain direction from the start key
729                let iter_mode =
730                    IteratorMode::From(start, convert_to_rocksdb_direction(direction));
731                let mut opts = self.read_options();
732                // We need this option because our iterator start in the `start` prefix section
733                // and continue in next sections. Without this option the correct
734                // iteration between prefix section isn't guaranteed
735                // Source : https://github.com/facebook/rocksdb/wiki/Prefix-Seek#how-to-ignore-prefix-bloom-filters-in-read
736                // and https://github.com/facebook/rocksdb/wiki/Prefix-Seek#general-prefix-seek-api
737                opts.set_total_order_seek(true);
738                self.iterator::<T>(column, opts, iter_mode).into_boxed()
739            }
740            (Some(prefix), Some(start)) => {
741                // TODO: Maybe we want to allow the `start` to be without a `prefix` in the future.
742                // If the `start` doesn't have the same `prefix`, return nothing.
743                if !start.starts_with(prefix) {
744                    return iter::empty().into_boxed();
745                }
746
747                // start iterating in a certain direction from the start key
748                // and end iterating when we've gone outside the prefix
749                let prefix = prefix.to_vec();
750                let iter_mode =
751                    IteratorMode::From(start, convert_to_rocksdb_direction(direction));
752                self.iterator::<T>(column, self.read_options(), iter_mode)
753                    .take_while(move |item| {
754                        if let Ok(item) = item {
755                            T::starts_with(item, prefix.as_slice())
756                        } else {
757                            true
758                        }
759                    })
760                    .into_boxed()
761            }
762        }
763    }
764
765    #[cfg(feature = "backup")]
766    fn backup_engine<P: AsRef<Path> + ?Sized>(
767        backup_dir: &P,
768    ) -> DatabaseResult<rocksdb::backup::BackupEngine> {
769        use rocksdb::{
770            Env,
771            backup::{
772                BackupEngine,
773                BackupEngineOptions,
774            },
775        };
776
777        let backup_dir = backup_dir.as_ref().join(Description::name());
778        let backup_dir_path = backup_dir.as_path();
779
780        let mut backup_engine_options = BackupEngineOptions::new(backup_dir_path)
781            .map_err(|e| {
782                DatabaseError::BackupEngineInitError(anyhow::anyhow!(
783                    "Couldn't create backup engine options for path `{}`: {}",
784                    backup_dir_path.display(),
785                    e
786                ))
787            })?;
788
789        let cpu_number =
790            i32::try_from(num_cpus::get()).expect("The number of CPU can't exceed `i32`");
791
792        backup_engine_options.set_max_background_operations(cmp::max(1, cpu_number / 4));
793
794        let env = Env::new().map_err(|e| {
795            DatabaseError::BackupEngineInitError(anyhow::anyhow!(
796                "Couldn't create environment for backup: {}",
797                e
798            ))
799        })?;
800
801        let backup_engine =
802            BackupEngine::open(&backup_engine_options, &env).map_err(|e| {
803                DatabaseError::BackupEngineInitError(anyhow::anyhow!(
804                    "Couldn't open backup engine for path `{}`: {}",
805                    backup_dir.display(),
806                    e
807                ))
808            })?;
809
810        Ok(backup_engine)
811    }
812
813    #[cfg(feature = "backup")]
814    pub fn backup<P: AsRef<Path> + ?Sized>(
815        db_dir: &P,
816        backup_dir: &P,
817    ) -> DatabaseResult<()> {
818        let mut backup_engine = Self::backup_engine(backup_dir)?;
819
820        let db_config = DatabaseConfig {
821            cache_capacity: None,
822            max_fds: -1,
823            columns_policy: ColumnsPolicy::Lazy,
824        };
825
826        let db = Self::open_read_only(
827            db_dir,
828            enum_iterator::all::<Description::Column>().collect::<Vec<_>>(),
829            false,
830            db_config,
831        )?;
832
833        backup_engine.create_new_backup(&db.db).map_err(|e| {
834            DatabaseError::BackupError(anyhow::anyhow!(
835                "Couldn't create new backup for path `{}`: {}",
836                backup_dir.as_ref().display(),
837                e
838            ))
839        })?;
840
841        Ok(())
842    }
843
844    /// We delegate opening of restored db to consumer, so they can apply their own options
845    #[cfg(feature = "backup")]
846    pub fn restore<P: AsRef<Path> + ?Sized>(
847        db_dir: &P,
848        backup_dir: &P,
849    ) -> DatabaseResult<()> {
850        use rocksdb::backup::RestoreOptions;
851
852        let mut backup_engine = Self::backup_engine(backup_dir)?;
853        let restore_option = RestoreOptions::default();
854
855        let db_dir = db_dir.as_ref().join(Description::name());
856        let db_dir_path = db_dir.as_path();
857        // we use the default wal directory, which is same as db path
858        backup_engine
859            .restore_from_latest_backup(db_dir_path, db_dir_path, &restore_option)
860            .map_err(|e| {
861                DatabaseError::RestoreError(anyhow::anyhow!(
862                    "Couldn't restore from latest backup for path `{}`: {}",
863                    db_dir_path.display(),
864                    e
865                ))
866            })?;
867
868        Ok(())
869    }
870
871    pub fn shutdown(&self) {
872        while Arc::strong_count(&self.db) > 1 {}
873    }
874}
875
876pub(crate) struct KeyOnly;
877
878impl ExtractItem for KeyOnly {
879    type Item = Vec<u8>;
880
881    fn extract_item<D>(
882        raw_iterator: &DBRawIteratorWithThreadMode<D>,
883    ) -> Option<Self::Item>
884    where
885        D: DBAccess,
886    {
887        raw_iterator.key().map(|key| key.to_vec())
888    }
889
890    fn size(item: &Self::Item) -> u64 {
891        item.len() as u64
892    }
893
894    fn starts_with(item: &Self::Item, prefix: &[u8]) -> bool {
895        item.starts_with(prefix)
896    }
897}
898
899pub(crate) struct KeyAndValue;
900
901impl ExtractItem for KeyAndValue {
902    type Item = (Vec<u8>, Value);
903
904    fn extract_item<D>(
905        raw_iterator: &DBRawIteratorWithThreadMode<D>,
906    ) -> Option<Self::Item>
907    where
908        D: DBAccess,
909    {
910        raw_iterator
911            .item()
912            .map(|(key, value)| (key.to_vec(), Value::from(value)))
913    }
914
915    fn size(item: &Self::Item) -> u64 {
916        item.0.len().saturating_add(item.1.len()) as u64
917    }
918
919    fn starts_with(item: &Self::Item, prefix: &[u8]) -> bool {
920        item.0.starts_with(prefix)
921    }
922}
923
924impl<Description> KeyValueInspect for RocksDb<Description>
925where
926    Description: DatabaseDescription,
927{
928    type Column = Description::Column;
929
930    fn size_of_value(
931        &self,
932        key: &[u8],
933        column: Self::Column,
934    ) -> StorageResult<Option<usize>> {
935        self.metrics.read_meter.inc();
936        let column_metrics = self.metrics.columns_read_statistic.get(&column.id());
937        column_metrics.map(|metric| metric.inc());
938
939        Ok(self
940            .db
941            .get_pinned_cf_opt(&self.cf(column), key, &self.read_options)
942            .map_err(|e| DatabaseError::Other(e.into()))?
943            .map(|value| value.len()))
944    }
945
946    fn get(&self, key: &[u8], column: Self::Column) -> StorageResult<Option<Value>> {
947        self.metrics.read_meter.inc();
948        let column_metrics = self.metrics.columns_read_statistic.get(&column.id());
949        column_metrics.map(|metric| metric.inc());
950
951        let value = self
952            .db
953            .get_cf_opt(&self.cf(column), key, &self.read_options)
954            .map_err(|e| DatabaseError::Other(e.into()))?;
955
956        if let Some(value) = &value {
957            self.metrics.bytes_read.inc_by(value.len() as u64);
958        }
959
960        Ok(value.map(Arc::from))
961    }
962
963    fn read(
964        &self,
965        key: &[u8],
966        column: Self::Column,
967        offset: usize,
968        buf: &mut [u8],
969    ) -> StorageResult<bool> {
970        self.metrics.read_meter.inc();
971        let column_metrics = self.metrics.columns_read_statistic.get(&column.id());
972        column_metrics.map(|metric| metric.inc());
973
974        let Some(value) = self
975            .db
976            .get_pinned_cf_opt(&self.cf(column), key, &self.read_options)
977            .map_err(|e| DatabaseError::Other(e.into()))?
978        else {
979            return Ok(false);
980        };
981
982        let bytes_len = value.len();
983        let start = offset;
984        let buf_len = buf.len();
985        let end = offset.saturating_add(buf_len);
986
987        if end > bytes_len {
988            return Err(StorageError::Other(anyhow::anyhow!(
989                "Offset `{offset}` + buf_len `{buf_len}` read until {end} which is out of bounds `{bytes_len}` for key `{:?}` and column `{column:?}`",
990                key
991            )));
992        }
993
994        let starting_from_offset = &value[start..end];
995        buf[..].copy_from_slice(starting_from_offset);
996
997        self.metrics
998            .bytes_read
999            .inc_by(starting_from_offset.len() as u64);
1000
1001        Ok(true)
1002    }
1003}
1004
1005impl<Description> IterableStore for RocksDb<Description>
1006where
1007    Description: DatabaseDescription,
1008{
1009    fn iter_store(
1010        &self,
1011        column: Self::Column,
1012        prefix: Option<&[u8]>,
1013        start: Option<&[u8]>,
1014        direction: IterDirection,
1015    ) -> BoxedIter<'_, KVItem> {
1016        self._iter_store::<KeyAndValue>(column, prefix, start, direction)
1017    }
1018
1019    fn iter_store_keys(
1020        &self,
1021        column: Self::Column,
1022        prefix: Option<&[u8]>,
1023        start: Option<&[u8]>,
1024        direction: IterDirection,
1025    ) -> BoxedIter<'_, KeyItem> {
1026        self._iter_store::<KeyOnly>(column, prefix, start, direction)
1027    }
1028}
1029
1030impl<Description> RocksDb<Description>
1031where
1032    Description: DatabaseDescription,
1033{
1034    pub fn commit_changes<'a>(&self, changes: &'a StorageChanges) -> StorageResult<()> {
1035        let instant = std::time::Instant::now();
1036        let mut batch = WriteBatch::default();
1037        let mut conflict_finder = HashSet::<(&'a u32, &'a ReferenceBytesKey)>::new();
1038
1039        match changes {
1040            StorageChanges::Changes(changes) => {
1041                self._populate_batch(&mut batch, &mut conflict_finder, changes)?;
1042            }
1043            StorageChanges::ChangesList(changes_list) => {
1044                for changes in changes_list {
1045                    self._populate_batch(&mut batch, &mut conflict_finder, changes)?;
1046                }
1047            }
1048        }
1049
1050        self.db
1051            .write(batch)
1052            .map_err(|e| DatabaseError::Other(e.into()))?;
1053        // TODO: Use `u128` when `AtomicU128` is stable.
1054        self.metrics.database_commit_time.inc_by(
1055            u64::try_from(instant.elapsed().as_nanos())
1056                .expect("The commit shouldn't take longer than `u64`"),
1057        );
1058
1059        Ok(())
1060    }
1061
1062    fn _populate_batch<'a>(
1063        &self,
1064        batch: &mut WriteBatch,
1065        conflict_finder: &mut HashSet<(&'a u32, &'a ReferenceBytesKey)>,
1066        changes: &'a Changes,
1067    ) -> DatabaseResult<()> {
1068        for (column, ops) in changes {
1069            let cf = self.cf_u32(*column);
1070            let column_metrics = self.metrics.columns_write_statistic.get(column);
1071            for (key, op) in ops {
1072                self.metrics.write_meter.inc();
1073                column_metrics.map(|metric| metric.inc());
1074
1075                if !conflict_finder.insert((column, key)) {
1076                    return Err(DatabaseError::ConflictingChanges {
1077                        column: *column,
1078                        key: key.clone(),
1079                    });
1080                }
1081
1082                match op {
1083                    WriteOperation::Insert(value) => {
1084                        self.metrics.bytes_written.inc_by(value.len() as u64);
1085                        batch.put_cf(&cf, key, value.as_ref());
1086                    }
1087                    WriteOperation::Remove => {
1088                        batch.delete_cf(&cf, key);
1089                    }
1090                }
1091            }
1092        }
1093        Ok(())
1094    }
1095}
1096
1097/// The `None` means overflow, so there is not following prefix.
1098fn next_prefix(mut prefix: Vec<u8>) -> Option<Vec<u8>> {
1099    for byte in prefix.iter_mut().rev() {
1100        if let Some(new_byte) = byte.checked_add(1) {
1101            *byte = new_byte;
1102            return Some(prefix);
1103        }
1104    }
1105    None
1106}
1107
1108#[cfg(feature = "test-helpers")]
1109pub mod test_helpers {
1110    use super::*;
1111    use fuel_core_storage::{
1112        kv_store::KeyValueMutate,
1113        transactional::ReadTransaction,
1114    };
1115
1116    impl<Description> KeyValueMutate for RocksDb<Description>
1117    where
1118        Description: DatabaseDescription,
1119    {
1120        fn write(
1121            &mut self,
1122            key: &[u8],
1123            column: Self::Column,
1124            buf: &[u8],
1125        ) -> StorageResult<usize> {
1126            let mut transaction = self.read_transaction();
1127            let len = transaction.write(key, column, buf)?;
1128            let changes = transaction.into_changes();
1129            self.commit_changes(&StorageChanges::Changes(changes))?;
1130
1131            Ok(len)
1132        }
1133
1134        fn delete(&mut self, key: &[u8], column: Self::Column) -> StorageResult<()> {
1135            let mut transaction = self.read_transaction();
1136            transaction.delete(key, column)?;
1137            let changes = transaction.into_changes();
1138            self.commit_changes(&StorageChanges::Changes(changes))?;
1139            Ok(())
1140        }
1141    }
1142}
1143
1144#[allow(non_snake_case)]
1145#[cfg(test)]
1146mod tests {
1147    use super::*;
1148    use crate::database::database_description::on_chain::OnChain;
1149    use fuel_core_storage::{
1150        column::Column,
1151        kv_store::KeyValueMutate,
1152    };
1153    use std::collections::{
1154        BTreeMap,
1155        HashMap,
1156    };
1157    use tempfile::TempDir;
1158
1159    fn create_db() -> (RocksDb<OnChain>, TempDir) {
1160        let tmp_dir = TempDir::new().unwrap();
1161        (
1162            RocksDb::default_open(tmp_dir.path(), DatabaseConfig::config_for_tests())
1163                .unwrap(),
1164            tmp_dir,
1165        )
1166    }
1167
1168    #[test]
1169    fn open_new_columns() {
1170        let tmp_dir = TempDir::new().unwrap();
1171
1172        // Given
1173        let old_columns =
1174            vec![Column::Coins, Column::Messages, Column::UploadedBytecodes];
1175        let database_with_old_columns = RocksDb::<OnChain>::open(
1176            tmp_dir.path(),
1177            old_columns.clone(),
1178            DatabaseConfig::config_for_tests(),
1179        )
1180        .expect("Failed to open database with old columns");
1181        drop(database_with_old_columns);
1182
1183        // When
1184        let mut new_columns = old_columns;
1185        new_columns.push(Column::ContractsAssets);
1186        new_columns.push(Column::Metadata);
1187        let database_with_new_columns = RocksDb::<OnChain>::open(
1188            tmp_dir.path(),
1189            new_columns,
1190            DatabaseConfig::config_for_tests(),
1191        )
1192        .map(|_| ());
1193
1194        // Then
1195        assert_eq!(Ok(()), database_with_new_columns);
1196    }
1197
1198    #[test]
1199    fn can_put_and_read() {
1200        let key = vec![0xA, 0xB, 0xC];
1201
1202        let (mut db, _tmp) = create_db();
1203        let expected = Value::from([1, 2, 3]);
1204        db.put(&key, Column::Metadata, expected.clone()).unwrap();
1205
1206        assert_eq!(db.get(&key, Column::Metadata).unwrap().unwrap(), expected)
1207    }
1208
1209    #[test]
1210    fn put_returns_previous_value() {
1211        let key = vec![0xA, 0xB, 0xC];
1212
1213        let (mut db, _tmp) = create_db();
1214        let expected = Value::from([1, 2, 3]);
1215        db.put(&key, Column::Metadata, expected.clone()).unwrap();
1216        let prev = db
1217            .replace(&key, Column::Metadata, Arc::new([2, 4, 6]))
1218            .unwrap();
1219
1220        assert_eq!(prev, Some(expected));
1221    }
1222
1223    #[test]
1224    fn delete_and_get() {
1225        let key = vec![0xA, 0xB, 0xC];
1226
1227        let (mut db, _tmp) = create_db();
1228        let expected = Value::from([1, 2, 3]);
1229        db.put(&key, Column::Metadata, expected.clone()).unwrap();
1230        assert_eq!(db.get(&key, Column::Metadata).unwrap().unwrap(), expected);
1231
1232        db.delete(&key, Column::Metadata).unwrap();
1233        assert_eq!(db.get(&key, Column::Metadata).unwrap(), None);
1234    }
1235
1236    #[test]
1237    fn key_exists() {
1238        let key = vec![0xA, 0xB, 0xC];
1239
1240        let (mut db, _tmp) = create_db();
1241        let expected = Arc::new([1, 2, 3]);
1242        db.put(&key, Column::Metadata, expected).unwrap();
1243        assert!(db.exists(&key, Column::Metadata).unwrap());
1244    }
1245
1246    #[test]
1247    fn commit_changes_inserts() {
1248        let key = vec![0xA, 0xB, 0xC];
1249        let value = Value::from([1, 2, 3]);
1250
1251        let (db, _tmp) = create_db();
1252        let ops = vec![(
1253            Column::Metadata.id(),
1254            BTreeMap::from_iter(vec![(
1255                key.clone().into(),
1256                WriteOperation::Insert(value.clone()),
1257            )]),
1258        )];
1259
1260        db.commit_changes(&StorageChanges::Changes(HashMap::from_iter(ops)))
1261            .unwrap();
1262        assert_eq!(db.get(&key, Column::Metadata).unwrap().unwrap(), value)
1263    }
1264
1265    #[test]
1266    fn commit_changes_removes() {
1267        let key = vec![0xA, 0xB, 0xC];
1268        let value = Arc::new([1, 2, 3]);
1269
1270        let (mut db, _tmp) = create_db();
1271        db.put(&key, Column::Metadata, value).unwrap();
1272
1273        let ops = vec![(
1274            Column::Metadata.id(),
1275            BTreeMap::from_iter(vec![(key.clone().into(), WriteOperation::Remove)]),
1276        )];
1277        db.commit_changes(&StorageChanges::Changes(HashMap::from_iter(ops)))
1278            .unwrap();
1279
1280        assert_eq!(db.get(&key, Column::Metadata).unwrap(), None);
1281    }
1282
1283    #[test]
1284    fn can_use_unit_value() {
1285        let key = vec![0x00];
1286
1287        let (mut db, _tmp) = create_db();
1288        let expected = Value::from([]);
1289        db.put(&key, Column::Metadata, expected.clone()).unwrap();
1290
1291        assert_eq!(db.get(&key, Column::Metadata).unwrap().unwrap(), expected);
1292
1293        assert!(db.exists(&key, Column::Metadata).unwrap());
1294
1295        assert_eq!(
1296            db.iter_store(Column::Metadata, None, None, IterDirection::Forward)
1297                .collect::<Result<Vec<_>, _>>()
1298                .unwrap()[0],
1299            (key.clone(), expected.clone())
1300        );
1301
1302        assert_eq!(db.take(&key, Column::Metadata).unwrap().unwrap(), expected);
1303
1304        assert!(!db.exists(&key, Column::Metadata).unwrap());
1305    }
1306
1307    #[test]
1308    fn can_use_unit_key() {
1309        let key: Vec<u8> = Vec::with_capacity(0);
1310
1311        let (mut db, _tmp) = create_db();
1312        let expected = Value::from([1, 2, 3]);
1313        db.put(&key, Column::Metadata, expected.clone()).unwrap();
1314
1315        assert_eq!(db.get(&key, Column::Metadata).unwrap().unwrap(), expected);
1316
1317        assert!(db.exists(&key, Column::Metadata).unwrap());
1318
1319        assert_eq!(
1320            db.iter_store(Column::Metadata, None, None, IterDirection::Forward)
1321                .collect::<Result<Vec<_>, _>>()
1322                .unwrap()[0],
1323            (key.clone(), expected.clone())
1324        );
1325
1326        assert_eq!(db.take(&key, Column::Metadata).unwrap().unwrap(), expected);
1327
1328        assert!(!db.exists(&key, Column::Metadata).unwrap());
1329    }
1330
1331    #[test]
1332    fn can_use_unit_key_and_value() {
1333        let key: Vec<u8> = Vec::with_capacity(0);
1334
1335        let (mut db, _tmp) = create_db();
1336        let expected = Value::from([]);
1337        db.put(&key, Column::Metadata, expected.clone()).unwrap();
1338
1339        assert_eq!(db.get(&key, Column::Metadata).unwrap().unwrap(), expected);
1340
1341        assert!(db.exists(&key, Column::Metadata).unwrap());
1342
1343        assert_eq!(
1344            db.iter_store(Column::Metadata, None, None, IterDirection::Forward)
1345                .collect::<Result<Vec<_>, _>>()
1346                .unwrap()[0],
1347            (key.clone(), expected.clone())
1348        );
1349
1350        assert_eq!(db.take(&key, Column::Metadata).unwrap().unwrap(), expected);
1351
1352        assert!(!db.exists(&key, Column::Metadata).unwrap());
1353    }
1354
1355    #[test]
1356    fn open_primary_db_second_time_fails() {
1357        // Given
1358        let (_primary_db, tmp_dir) = create_db();
1359
1360        // When
1361        let columns = enum_iterator::all::<<OnChain as DatabaseDescription>::Column>()
1362            .collect::<Vec<_>>();
1363        let result = RocksDb::<OnChain>::open(
1364            tmp_dir.path(),
1365            columns,
1366            DatabaseConfig::config_for_tests(),
1367        );
1368
1369        // Then
1370        assert!(result.is_err());
1371    }
1372
1373    #[test]
1374    fn open_second_read_only_db() {
1375        // Given
1376        let (_primary_db, tmp_dir) = create_db();
1377
1378        // When
1379        let old_columns =
1380            vec![Column::Coins, Column::Messages, Column::UploadedBytecodes];
1381        let result = RocksDb::<OnChain>::open_read_only(
1382            tmp_dir.path(),
1383            old_columns.clone(),
1384            false,
1385            DatabaseConfig::config_for_tests(),
1386        )
1387        .map(|_| ());
1388
1389        // Then
1390        assert_eq!(Ok(()), result);
1391    }
1392
1393    #[test]
1394    fn open_secondary_db() {
1395        // Given
1396        let (_primary_db, tmp_dir) = create_db();
1397        let secondary_temp = TempDir::new().unwrap();
1398
1399        // When
1400        let old_columns =
1401            vec![Column::Coins, Column::Messages, Column::UploadedBytecodes];
1402        let result = RocksDb::<OnChain>::open_secondary(
1403            tmp_dir.path(),
1404            secondary_temp.path(),
1405            old_columns.clone(),
1406            DatabaseConfig::config_for_tests(),
1407        )
1408        .map(|_| ());
1409
1410        // Then
1411        assert_eq!(Ok(()), result);
1412    }
1413
1414    #[test]
1415    fn snapshot_allows_get_entry_after_it_was_removed() {
1416        let (mut db, _tmp) = create_db();
1417        let value = Value::from([1, 2, 3]);
1418
1419        // Given
1420        let key_1 = [1; 32];
1421        db.put(&key_1, Column::Metadata, value.clone()).unwrap();
1422        let snapshot = db.create_snapshot();
1423
1424        // When
1425        db.delete(&key_1, Column::Metadata).unwrap();
1426
1427        // Then
1428        let db_get = db.get(&key_1, Column::Metadata).unwrap();
1429        assert!(db_get.is_none());
1430
1431        let snapshot_get = snapshot.get(&key_1, Column::Metadata).unwrap();
1432        assert_eq!(snapshot_get, Some(value));
1433    }
1434
1435    #[test]
1436    fn snapshot_allows_correct_iteration_even_after_all_elements_where_removed() {
1437        let (mut db, _tmp) = create_db();
1438        let value = Value::from([1, 2, 3]);
1439
1440        // Given
1441        let key_1 = vec![1; 32];
1442        let key_2 = vec![2; 32];
1443        let key_3 = vec![3; 32];
1444        db.put(&key_1, Column::Metadata, value.clone()).unwrap();
1445        db.put(&key_2, Column::Metadata, value.clone()).unwrap();
1446        db.put(&key_3, Column::Metadata, value.clone()).unwrap();
1447        let snapshot = db.create_snapshot();
1448
1449        // When
1450        db.delete(&key_1, Column::Metadata).unwrap();
1451        db.delete(&key_2, Column::Metadata).unwrap();
1452        db.delete(&key_3, Column::Metadata).unwrap();
1453
1454        // Then
1455        let db_iter = db
1456            .iter_store(Column::Metadata, None, None, IterDirection::Forward)
1457            .collect::<Vec<_>>();
1458        assert!(db_iter.is_empty());
1459
1460        let snapshot_iter = snapshot
1461            .iter_store(Column::Metadata, None, None, IterDirection::Forward)
1462            .collect::<Vec<_>>();
1463        assert_eq!(
1464            snapshot_iter,
1465            vec![
1466                Ok((key_1, value.clone())),
1467                Ok((key_2, value.clone())),
1468                Ok((key_3, value))
1469            ]
1470        );
1471    }
1472
1473    #[test]
1474    fn drop_snapshot_after_dropping_main_database_shouldn_panic() {
1475        let (db, _tmp) = create_db();
1476
1477        // Given
1478        let snapshot = db.create_snapshot();
1479
1480        // When
1481        drop(db);
1482
1483        // Then
1484        drop(snapshot);
1485    }
1486
1487    #[test]
1488    fn open__opens_subset_of_columns_after_opening_all_columns() {
1489        // Given
1490        let (first_open_with_all_columns, tmp_dir) = create_db();
1491
1492        // When
1493        drop(first_open_with_all_columns);
1494        let part_of_columns =
1495            enum_iterator::all::<<OnChain as DatabaseDescription>::Column>()
1496                .skip(1)
1497                .collect::<Vec<_>>();
1498        let open_with_part_of_columns = RocksDb::<OnChain>::open(
1499            tmp_dir.path(),
1500            part_of_columns,
1501            DatabaseConfig::config_for_tests(),
1502        );
1503
1504        // Then
1505        let _ = open_with_part_of_columns
1506            .expect("Should open the database with shorter number of columns");
1507    }
1508
1509    #[test]
1510    fn iter_store__reverse_iterator__no_target_prefix() {
1511        // Given
1512        let (mut db, _tmp) = create_db();
1513        let value = Value::from([]);
1514        let key_1 = [1, 1];
1515        let key_2 = [2, 2];
1516        let key_3 = [9, 3];
1517        let key_4 = [10, 0];
1518        db.put(&key_1, Column::Metadata, value.clone()).unwrap();
1519        db.put(&key_2, Column::Metadata, value.clone()).unwrap();
1520        db.put(&key_3, Column::Metadata, value.clone()).unwrap();
1521        db.put(&key_4, Column::Metadata, value.clone()).unwrap();
1522
1523        // When
1524        let db_iter = db
1525            .iter_store(
1526                Column::Metadata,
1527                Some(vec![5].as_slice()),
1528                None,
1529                IterDirection::Reverse,
1530            )
1531            .map(|item| item.map(|(key, _)| key))
1532            .collect::<Vec<_>>();
1533
1534        // Then
1535        assert_eq!(db_iter, vec![]);
1536    }
1537
1538    #[test]
1539    fn iter_store__reverse_iterator__target_prefix_at_the_middle() {
1540        // Given
1541        let (mut db, _tmp) = create_db();
1542        let value = Value::from([]);
1543        let key_1 = [1, 1];
1544        let key_2 = [2, 2];
1545        let key_3 = [2, 3];
1546        let key_4 = [10, 0];
1547        db.put(&key_1, Column::Metadata, value.clone()).unwrap();
1548        db.put(&key_2, Column::Metadata, value.clone()).unwrap();
1549        db.put(&key_3, Column::Metadata, value.clone()).unwrap();
1550        db.put(&key_4, Column::Metadata, value.clone()).unwrap();
1551
1552        // When
1553        let db_iter = db
1554            .iter_store(
1555                Column::Metadata,
1556                Some(vec![2].as_slice()),
1557                None,
1558                IterDirection::Reverse,
1559            )
1560            .map(|item| item.map(|(key, _)| key))
1561            .collect::<Vec<_>>();
1562
1563        // Then
1564        assert_eq!(db_iter, vec![Ok(key_3.to_vec()), Ok(key_2.to_vec())]);
1565    }
1566
1567    #[test]
1568    fn iter_store__reverse_iterator__target_prefix_at_the_end() {
1569        // Given
1570        let (mut db, _tmp) = create_db();
1571        let value = Value::from([]);
1572        let key_1 = [1, 1];
1573        let key_2 = [2, 2];
1574        let key_3 = [2, 3];
1575        db.put(&key_1, Column::Metadata, value.clone()).unwrap();
1576        db.put(&key_2, Column::Metadata, value.clone()).unwrap();
1577        db.put(&key_3, Column::Metadata, value.clone()).unwrap();
1578
1579        // When
1580        let db_iter = db
1581            .iter_store(
1582                Column::Metadata,
1583                Some(vec![2].as_slice()),
1584                None,
1585                IterDirection::Reverse,
1586            )
1587            .map(|item| item.map(|(key, _)| key))
1588            .collect::<Vec<_>>();
1589
1590        // Then
1591        assert_eq!(db_iter, vec![Ok(key_3.to_vec()), Ok(key_2.to_vec())]);
1592    }
1593
1594    #[test]
1595    fn iter_store__reverse_iterator__target_prefix_at_the_end__overflow() {
1596        // Given
1597        let (mut db, _tmp) = create_db();
1598        let value = Value::from([]);
1599        let key_1 = [1, 1];
1600        let key_2 = [255, 254];
1601        let key_3 = [255, 255];
1602        db.put(&key_1, Column::Metadata, value.clone()).unwrap();
1603        db.put(&key_2, Column::Metadata, value.clone()).unwrap();
1604        db.put(&key_3, Column::Metadata, value.clone()).unwrap();
1605
1606        // When
1607        let db_iter = db
1608            .iter_store(
1609                Column::Metadata,
1610                Some(vec![255].as_slice()),
1611                None,
1612                IterDirection::Reverse,
1613            )
1614            .map(|item| item.map(|(key, _)| key))
1615            .collect::<Vec<_>>();
1616
1617        // Then
1618        assert_eq!(db_iter, vec![Ok(key_3.to_vec()), Ok(key_2.to_vec())]);
1619    }
1620
1621    #[test]
1622    fn clear_table__keeps_column_accessible_after_marking_for_deletion() {
1623        // Given
1624        let (mut db, _tmp) = create_db();
1625        let value = Value::from([]);
1626        let key_1 = [1, 1];
1627        let key_2 = [255, 254];
1628        let key_3 = [255, 255];
1629        db.put(&key_1, Column::Metadata, value.clone()).unwrap();
1630        db.put(&key_2, Column::Metadata, value.clone()).unwrap();
1631        db.put(&key_3, Column::Metadata, value.clone()).unwrap();
1632
1633        // When
1634        db.clear_table(Column::Metadata).unwrap();
1635
1636        // Then
1637        let db_iter = db
1638            .iter_store_keys(Column::Metadata, None, None, IterDirection::Forward)
1639            .collect::<Vec<_>>();
1640        assert_eq!(db_iter, vec![]);
1641    }
1642}