//! RocksDB-backed state storage implementation.
//! (fuel_core/state/rocks_db.rs)
1use crate::{
2    database::{
3        Error as DatabaseError,
4        Result as DatabaseResult,
5        convert_to_rocksdb_direction,
6        database_description::DatabaseDescription,
7    },
8    state::IterDirection,
9};
10
11use super::rocks_db_key_iterator::{
12    ExtractItem,
13    RocksDBKeyIterator,
14};
15use core::ops::Deref;
16use fuel_core_metrics::core_metrics::DatabaseMetrics;
17use fuel_core_storage::{
18    Result as StorageResult,
19    StorageReadError,
20    iter::{
21        BoxedIter,
22        IntoBoxedIter,
23        IterableStore,
24    },
25    kv_store::{
26        KVItem,
27        KeyItem,
28        KeyValueInspect,
29        StorageColumn,
30        Value,
31        WriteOperation,
32    },
33    transactional::{
34        Changes,
35        ReferenceBytesKey,
36        StorageChanges,
37    },
38};
39use itertools::Itertools;
40use rocksdb::{
41    BlockBasedOptions,
42    BoundColumnFamily,
43    Cache,
44    ColumnFamilyDescriptor,
45    DBAccess,
46    DBCompressionType,
47    DBRawIteratorWithThreadMode,
48    DBWithThreadMode,
49    IteratorMode,
50    MultiThreaded,
51    Options,
52    ReadOptions,
53    SliceTransform,
54    WriteBatch,
55};
56use std::{
57    cmp,
58    collections::{
59        BTreeMap,
60        HashSet,
61    },
62    fmt,
63    fmt::Formatter,
64    iter,
65    path::{
66        Path,
67        PathBuf,
68    },
69    sync::{
70        Arc,
71        Mutex,
72    },
73};
74use tempfile::TempDir;
75
/// Wrapper around the primary RocksDB handle so extra teardown behaviour
/// (stopping background work on drop) can be attached to it.
#[derive(Debug)]
struct PrimaryInstance(DBWithThreadMode<MultiThreaded>);
78
impl Deref for PrimaryInstance {
    type Target = DBWithThreadMode<MultiThreaded>;

    /// Expose the inner RocksDB handle so callers can use it transparently.
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
86
impl Drop for PrimaryInstance {
    fn drop(&mut self) {
        // Stop compactions/flushes and wait for them to finish (`true`)
        // before the underlying database handle is destroyed.
        self.cancel_all_background_work(true);
    }
}
92
/// Alias for the multi-threaded RocksDB handle used throughout this module.
type DB = DBWithThreadMode<MultiThreaded>;

/// Cleanup closure executed when the owning database is dropped (RAII).
type DropFn = Box<dyn FnOnce() + Send + Sync>;
#[derive(Default)]
struct DropResources {
    // move resources into this closure to have them dropped when db drops
    drop: Option<DropFn>,
}
101
102impl fmt::Debug for DropResources {
103    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
104        write!(f, "DropResources")
105    }
106}
107
108impl<F: 'static + FnOnce() + Send + Sync> From<F> for DropResources {
109    fn from(closure: F) -> Self {
110        Self {
111            drop: Option::Some(Box::new(closure)),
112        }
113    }
114}
115
116impl Drop for DropResources {
117    fn drop(&mut self) {
118        if let Some(drop) = self.drop.take() {
119            (drop)()
120        }
121    }
122}
123
#[derive(Clone, Copy, Default, Debug, PartialEq, Eq)]
/// Defined behaviour for opening the columns of the database.
pub enum ColumnsPolicy {
    #[cfg_attr(not(feature = "rocksdb-production"), default)]
    /// Open a new column only when a database interaction is done with it.
    Lazy,
    #[cfg_attr(feature = "rocksdb-production", default)]
    /// Open all columns on creation on the service.
    OnCreation,
}
134
/// Configuration to create a database
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct DatabaseConfig {
    /// Total cache budget in bytes; `None` disables the block cache entirely.
    pub cache_capacity: Option<usize>,
    /// Passed to RocksDB `max_open_files` (-1 lets RocksDB open an unlimited
    /// number of files).
    pub max_fds: i32,
    /// Whether column families are opened eagerly or created on first use.
    pub columns_policy: ColumnsPolicy,
}
142
#[cfg(feature = "test-helpers")]
impl DatabaseConfig {
    /// Small, lazily-opened configuration suitable for unit tests:
    /// no cache, a modest file-descriptor budget, lazy column creation.
    pub fn config_for_tests() -> Self {
        Self {
            cache_capacity: None,
            max_fds: 512,
            columns_policy: ColumnsPolicy::Lazy,
        }
    }
}
153
/// RocksDB-backed key-value store parameterised by a `Description` that
/// defines the set of available columns.
pub struct RocksDb<Description> {
    read_options: ReadOptions,
    db: Arc<PrimaryInstance>,
    block_opts: Arc<BlockBasedOptions>,
    // Options for column families that still have to be created lazily
    // (`ColumnsPolicy::Lazy`); `None` when all columns were opened eagerly.
    create_family: Option<Arc<Mutex<BTreeMap<String, Options>>>>,
    // NOTE: the 'static lifetime is obtained by transmute in
    // `create_snapshot_generic`; the snapshot must be dropped before `db`.
    snapshot: Option<rocksdb::SnapshotWithThreadMode<'static, DB>>,
    metrics: Arc<DatabaseMetrics>,
    // used for RAII
    _drop: Arc<DropResources>,
    _marker: core::marker::PhantomData<Description>,
}
165
166impl<Description> Drop for RocksDb<Description> {
167    fn drop(&mut self) {
168        // Drop the snapshot before the db.
169        // Dropping the snapshot after the db will cause a sigsegv.
170        self.snapshot = None;
171    }
172}
173
174impl<Description> std::fmt::Debug for RocksDb<Description> {
175    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> core::fmt::Result {
176        f.debug_struct("RocksDb").field("db", &self.db).finish()
177    }
178}
179
180impl<Description> RocksDb<Description>
181where
182    Description: DatabaseDescription,
183{
184    pub fn default_open_temp() -> DatabaseResult<Self> {
185        Self::default_open_temp_with_params(DatabaseConfig {
186            cache_capacity: None,
187            max_fds: 512,
188            columns_policy: ColumnsPolicy::Lazy,
189        })
190    }
191
192    pub fn default_open_temp_with_params(
193        database_config: DatabaseConfig,
194    ) -> DatabaseResult<Self> {
195        let tmp_dir = TempDir::new().unwrap();
196        let path = tmp_dir.path();
197        let result = Self::open(
198            path,
199            enum_iterator::all::<Description::Column>().collect::<Vec<_>>(),
200            database_config,
201        );
202        let mut db = result?;
203
204        db._drop = Arc::new(
205            {
206                move || {
207                    // cleanup temp dir
208                    drop(tmp_dir);
209                }
210            }
211            .into(),
212        );
213
214        Ok(db)
215    }
216
217    pub fn default_open<P: AsRef<Path>>(
218        path: P,
219        database_config: DatabaseConfig,
220    ) -> DatabaseResult<Self> {
221        Self::open(
222            path,
223            enum_iterator::all::<Description::Column>().collect::<Vec<_>>(),
224            database_config,
225        )
226    }
227
228    pub fn prune(path: &Path) -> DatabaseResult<()> {
229        let path = path.join(Description::name());
230        DB::destroy(&Options::default(), path)
231            .map_err(|e| DatabaseError::Other(e.into()))?;
232        Ok(())
233    }
234
    /// Open the database at `path` in primary (read-write) mode with the
    /// given columns.
    pub fn open<P: AsRef<Path>>(
        path: P,
        columns: Vec<Description::Column>,
        database_config: DatabaseConfig,
    ) -> DatabaseResult<Self> {
        Self::open_with(DB::open_cf_descriptors, path, columns, database_config)
    }
242
    /// Open the database in read-only mode: no writes or compactions happen
    /// through this handle. When `error_if_log_file_exist` is set, opening
    /// fails if a WAL file is present.
    pub fn open_read_only<P: AsRef<Path>>(
        path: P,
        columns: Vec<Description::Column>,
        error_if_log_file_exist: bool,
        database_config: DatabaseConfig,
    ) -> DatabaseResult<Self> {
        Self::open_with(
            |options, primary_path, cfs| {
                DB::open_cf_descriptors_read_only(
                    options,
                    primary_path,
                    cfs,
                    error_if_log_file_exist,
                )
            },
            path,
            columns,
            database_config,
        )
    }
263
    /// Open the database as a secondary instance that follows the primary at
    /// `path`, keeping its own bookkeeping files under `secondary_path`.
    pub fn open_secondary<PrimaryPath, SecondaryPath>(
        path: PrimaryPath,
        secondary_path: SecondaryPath,
        columns: Vec<Description::Column>,
        database_config: DatabaseConfig,
    ) -> DatabaseResult<Self>
    where
        PrimaryPath: AsRef<Path>,
        SecondaryPath: AsRef<Path>,
    {
        Self::open_with(
            |options, primary_path, cfs| {
                DB::open_cf_descriptors_as_secondary(
                    options,
                    primary_path,
                    secondary_path.as_ref().to_path_buf(),
                    cfs,
                )
            },
            path,
            columns,
            database_config,
        )
    }
288
    /// Shared open routine used by the primary/read-only/secondary
    /// constructors. `opener` performs the actual RocksDB open call given the
    /// final options, path and column-family descriptors.
    ///
    /// # Errors
    /// Returns `DatabaseError::Other` if RocksDB fails to open (after one
    /// retry) or a required column family cannot be created.
    pub fn open_with<F, P>(
        opener: F,
        path: P,
        columns: Vec<Description::Column>,
        database_config: DatabaseConfig,
    ) -> DatabaseResult<Self>
    where
        F: Fn(
            &Options,
            PathBuf,
            Vec<ColumnFamilyDescriptor>,
        ) -> Result<DB, rocksdb::Error>,
        P: AsRef<Path>,
    {
        // Each database description lives in its own subdirectory.
        let original_path = path.as_ref().to_path_buf();
        let path = original_path.join(Description::name());
        let metric_columns = columns
            .iter()
            .map(|column| (column.id(), column.name()))
            .collect::<Vec<_>>();
        let metrics = Arc::new(DatabaseMetrics::new(
            Description::name().as_str(),
            &metric_columns,
        ));
        let mut block_opts = BlockBasedOptions::default();
        // See https://github.com/facebook/rocksdb/blob/a1523efcdf2f0e8133b9a9f6e170a0dad49f928f/include/rocksdb/table.h#L246-L271 for details on what the format versions are/do.
        block_opts.set_format_version(5);

        if let Some(capacity) = database_config.cache_capacity {
            // Set cache size 1/3 of the capacity as recommended by
            // https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning#block-cache-size
            let block_cache_size = capacity / 3;
            let cache = Cache::new_lru_cache(block_cache_size);
            block_opts.set_block_cache(&cache);
            // "index and filter blocks will be stored in block cache, together with all other data blocks."
            // See: https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB#indexes-and-filter-blocks
            block_opts.set_cache_index_and_filter_blocks(true);
            // Don't evict L0 filter/index blocks from the cache
            block_opts.set_pin_l0_filter_and_index_blocks_in_cache(true);
        } else {
            block_opts.disable_cache();
        }
        block_opts.set_bloom_filter(10.0, true);
        block_opts.set_block_size(16 * 1024);

        let mut opts = Options::default();
        opts.set_compression_type(DBCompressionType::Lz4);
        // TODO: Make it customizable https://github.com/FuelLabs/fuel-core/issues/1666
        opts.set_max_total_wal_size(64 * 1024 * 1024);
        let cpu_number =
            i32::try_from(num_cpus::get()).expect("The number of CPU can't exceed `i32`");
        opts.increase_parallelism(cmp::max(1, cpu_number / 2));
        if let Some(capacity) = database_config.cache_capacity {
            // Set cache size 1/3 of the capacity. Another 1/3 is
            // used by block cache and the last 1 / 3 remains for other purposes:
            //
            // https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning#block-cache-size
            let row_cache_size = capacity / 3;
            let cache = Cache::new_lru_cache(row_cache_size);
            opts.set_row_cache(&cache);
        }
        opts.set_max_background_jobs(6);
        opts.set_bytes_per_sync(1048576);
        opts.set_max_open_files(database_config.max_fds);

        // Partition the requested columns into families that already exist on
        // disk and families that still have to be created.
        let existing_column_families = DB::list_cf(&opts, &path).unwrap_or_default();

        let mut cf_descriptors_to_open = BTreeMap::new();
        let mut cf_descriptors_to_create = BTreeMap::new();
        for column in columns.clone() {
            let column_name = Self::col_name(column.id());
            let opts = Self::cf_opts(column, &block_opts);
            if existing_column_families.contains(&column_name) {
                cf_descriptors_to_open.insert(column_name, opts);
            } else {
                cf_descriptors_to_create.insert(column_name, opts);
            }
        }

        // Allow creating the database only for the eager policy, or when the
        // database looks freshly created (no existing families to open).
        if database_config.columns_policy == ColumnsPolicy::OnCreation
            || (database_config.columns_policy == ColumnsPolicy::Lazy
                && cf_descriptors_to_open.is_empty())
        {
            opts.create_if_missing(true);
        }

        // Families present on disk but unknown to `Description` are opened
        // with default options (RocksDB requires all existing families to be
        // listed when opening).
        let unknown_columns_to_open: BTreeMap<_, _> = existing_column_families
            .iter()
            .filter(|column_name| {
                !cf_descriptors_to_open.contains_key(*column_name)
                    && !cf_descriptors_to_create.contains_key(*column_name)
            })
            .map(|unknown_column_name| {
                let unknown_column_options = Self::default_opts(&block_opts);
                (unknown_column_name.clone(), unknown_column_options)
            })
            .collect();
        cf_descriptors_to_open.extend(unknown_columns_to_open);

        let iterator = cf_descriptors_to_open
            .clone()
            .into_iter()
            .map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts))
            .collect::<Vec<_>>();

        let db = match opener(&opts, path.clone(), iterator) {
            Ok(db) => {
                Ok(db)
            },
            Err(err) => {
                tracing::error!("Couldn't open the database with an error: {}. \nTrying to reopen the database", err);

                // NOTE(review): the retry rebuilds identical arguments and
                // calls `opener` again — presumably aimed at transient open
                // failures; confirm intent before changing.
                let iterator = cf_descriptors_to_open
                    .clone()
                    .into_iter()
                    .map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts))
                    .collect::<Vec<_>>();

                opener(&opts, path.clone(), iterator)
            },
        }
        .map_err(|e| DatabaseError::Other(e.into()))?;

        // Eager policy: create all missing families now. Lazy policy: stash
        // their options so `cf_u32` can create each family on first use.
        let create_family = match database_config.columns_policy {
            ColumnsPolicy::OnCreation => {
                for (name, opt) in cf_descriptors_to_create {
                    db.create_cf(name, &opt)
                        .map_err(|e| DatabaseError::Other(e.into()))?;
                }
                None
            }
            ColumnsPolicy::Lazy => Some(Arc::new(Mutex::new(cf_descriptors_to_create))),
        };
        let db = Arc::new(PrimaryInstance(db));

        let rocks_db = RocksDb {
            read_options: Self::generate_read_options(&None),
            block_opts: Arc::new(block_opts),
            snapshot: None,
            db,
            metrics,
            create_family,
            _drop: Default::default(),
            _marker: Default::default(),
        };

        Ok(rocks_db)
    }
437
438    fn generate_read_options(
439        snapshot: &Option<rocksdb::SnapshotWithThreadMode<DB>>,
440    ) -> ReadOptions {
441        let mut opts = ReadOptions::default();
442        opts.set_verify_checksums(false);
443        if let Some(snapshot) = &snapshot {
444            opts.set_snapshot(snapshot);
445        }
446        opts
447    }
448
    /// Read options bound to this instance's snapshot (if any).
    fn read_options(&self) -> ReadOptions {
        Self::generate_read_options(&self.snapshot)
    }
452
    /// Create a read view of the database pinned at the current point in
    /// time, keeping the same `Description`.
    pub fn create_snapshot(&self) -> Self {
        self.create_snapshot_generic()
    }
456
    /// Create a read view over the same underlying database, possibly viewed
    /// through a different `TargetDescription` column mapping. The returned
    /// instance shares the db handle, metrics and RAII resources.
    pub fn create_snapshot_generic<TargetDescription>(
        &self,
    ) -> RocksDb<TargetDescription> {
        let db = self.db.clone();
        let block_opts = self.block_opts.clone();
        let create_family = self.create_family.clone();
        let metrics = self.metrics.clone();
        let _drop = self._drop.clone();

        // Safety: We are transmuting the snapshot to 'static lifetime, but it's safe
        // because we are not going to use it after the RocksDb is dropped.
        // We control the lifetime of the `Self` - RocksDb, so we can guarantee that
        // the snapshot will be dropped before the RocksDb.
        #[allow(clippy::missing_transmute_annotations)]
        // The returned `RocksDb` owns an `Arc` to the same `PrimaryInstance`,
        // and its `Drop` impl releases the snapshot before the db handle.
        let snapshot = unsafe {
            let snapshot = db.snapshot();
            core::mem::transmute(snapshot)
        };
        let snapshot = Some(snapshot);

        RocksDb {
            read_options: Self::generate_read_options(&snapshot),
            block_opts,
            snapshot,
            db,
            create_family,
            metrics,
            _drop,
            _marker: Default::default(),
        }
    }
489
    /// Column family handle for a typed column (see [`Self::cf_u32`]).
    fn cf(&self, column: Description::Column) -> Arc<BoundColumnFamily<'_>> {
        self.cf_u32(column.id())
    }
493
    /// Resolve the column family handle for a raw column id, creating the
    /// family on first use under `ColumnsPolicy::Lazy`.
    ///
    /// # Panics
    /// Panics if the column was never registered (eager policy), the lazy
    /// creation lock is poisoned, or RocksDB fails to create the family.
    fn cf_u32(&self, column: u32) -> Arc<BoundColumnFamily<'_>> {
        let family = self.db.cf_handle(&Self::col_name(column));

        match family {
            None => match &self.create_family {
                Some(create_family) => {
                    let mut lock = create_family
                        .lock()
                        .expect("The create family lock should be available");

                    let name = Self::col_name(column);
                    // If the options were already removed from the map,
                    // another caller created the family while we waited for
                    // the lock; re-fetch the handle.
                    let Some(family) = lock.remove(&name) else {
                        return self
                            .db
                            .cf_handle(&Self::col_name(column))
                            .expect("No column family found");
                    };

                    self.db
                        .create_cf(&name, &family)
                        .expect("Couldn't create column family");

                    self.db.cf_handle(&name).expect("invalid column state")
                }
                _ => {
                    panic!("Columns in the DB should have been created on DB opening");
                }
            },
            Some(family) => family,
        }
    }
525
526    fn col_name(column: u32) -> String {
527        format!("col-{}", column)
528    }
529
530    fn default_opts(block_opts: &BlockBasedOptions) -> Options {
531        let mut opts = Options::default();
532        opts.create_if_missing(true);
533        opts.set_compression_type(DBCompressionType::Lz4);
534        opts.set_block_based_table_factory(block_opts);
535
536        opts
537    }
538
539    fn cf_opts(column: Description::Column, block_opts: &BlockBasedOptions) -> Options {
540        let mut opts = Self::default_opts(block_opts);
541
542        // All double-keys should be configured here
543        if let Some(size) = Description::prefix(&column) {
544            opts.set_prefix_extractor(SliceTransform::create_fixed_prefix(size))
545        }
546
547        opts
548    }
549
    /// RocksDB prefix iteration doesn't support reverse order,
    /// so we need to force the RocksDB iterator to order
    /// all the prefix in the iterator so that we can take the next prefix
    /// as start of iterator and iterate in reverse.
    fn reverse_prefix_iter<T>(
        &self,
        prefix: &[u8],
        column: Description::Column,
    ) -> impl Iterator<Item = StorageResult<T::Item>> + '_ + use<'_, T, Description>
    where
        T: ExtractItem,
    {
        let reverse_iterator = next_prefix(prefix.to_vec()).map(|next_prefix| {
            let mut opts = self.read_options();
            // We need this option because our iterator start in the `next_prefix` prefix section
            // and continue in `prefix` section. Without this option the correct
            // iteration between prefix section isn't guaranteed
            // Source : https://github.com/facebook/rocksdb/wiki/Prefix-Seek#how-to-ignore-prefix-bloom-filters-in-read
            // and https://github.com/facebook/rocksdb/wiki/Prefix-Seek#general-prefix-seek-api
            opts.set_total_order_seek(true);
            self.iterator::<T>(
                column,
                opts,
                IteratorMode::From(next_prefix.as_slice(), rocksdb::Direction::Reverse),
            )
        });

        match reverse_iterator {
            Some(iterator) => {
                let prefix = prefix.to_vec();
                iterator
                    // Stop once a key leaves the requested prefix; errors are
                    // passed through to the caller unchanged.
                    .take_while(move |item| {
                        if let Ok(item) = item {
                            T::starts_with(item, prefix.as_slice())
                        } else {
                            true
                        }
                    })
                    .into_boxed()
            }
            _ => {
                // No next item, so we can start backward iteration from the end.
                let prefix = prefix.to_vec();
                self.iterator::<T>(column, self.read_options(), IteratorMode::End)
                    .take_while(move |item| {
                        if let Ok(item) = item {
                            T::starts_with(item, prefix.as_slice())
                        } else {
                            true
                        }
                    })
                    .into_boxed()
            }
        }
    }
605
    /// Raw iterator over `column` that extracts items with strategy `T`,
    /// updating read metrics (count, per-column count, bytes) for every
    /// successfully yielded item.
    pub(crate) fn iterator<T>(
        &self,
        column: Description::Column,
        opts: ReadOptions,
        iter_mode: IteratorMode,
    ) -> impl Iterator<Item = StorageResult<T::Item>> + '_ + use<'_, T, Description>
    where
        T: ExtractItem,
    {
        let column_metrics = self.metrics.columns_read_statistic.get(&column.id());

        RocksDBKeyIterator::<_, T>::new(
            self.db.raw_iterator_cf_opt(&self.cf(column), opts),
            iter_mode,
        )
        .map(move |item| {
            item.inspect(|item| {
                // Metrics are only bumped for Ok items; errors are converted
                // to the storage error type below.
                self.metrics.read_meter.inc();
                column_metrics.map(|metric| metric.inc());
                self.metrics.bytes_read.inc_by(T::size(item));
            })
            .map_err(|e| DatabaseError::Other(e.into()).into())
        })
    }
630
631    /// The fast way to remove all data from the column.
632    pub fn clear_table(&self, column: Description::Column) -> DatabaseResult<()> {
633        // Mark column for deletion
634        let column_name = Self::col_name(column.id());
635        self.db
636            .drop_cf(&column_name)
637            .map_err(|e| DatabaseError::Other(e.into()))?;
638
639        // Insert new fresh column without data
640        let column_name = Self::col_name(column.id());
641        let opts = Self::cf_opts(column, self.block_opts.as_ref());
642        self.db
643            .create_cf(&column_name, &opts)
644            .map_err(|e| DatabaseError::Other(e.into()))?;
645
646        Ok(())
647    }
648
    /// Fetch many keys from `column` with a single RocksDB `multi_get` call.
    /// Returns one entry per requested key, in input order; `None` marks a
    /// missing key.
    ///
    /// # Errors
    /// Returns `DatabaseError::Other` on the first failed individual read.
    pub fn multi_get<K, I>(
        &self,
        column: u32,
        iterator: I,
    ) -> DatabaseResult<Vec<Option<Vec<u8>>>>
    where
        I: Iterator<Item = K>,
        K: AsRef<[u8]>,
    {
        let column_metrics = self.metrics.columns_read_statistic.get(&column);
        let cl = self.cf_u32(column);
        let results = self
            .db
            .multi_get_cf_opt(iterator.map(|k| (&cl, k)), &self.read_options)
            .into_iter()
            .map(|el| {
                // Every requested key counts as a read, found or not.
                self.metrics.read_meter.inc();
                column_metrics.map(|metric| metric.inc());
                el.map(|value| {
                    value.inspect(|vec| {
                        self.metrics.bytes_read.inc_by(vec.len() as u64);
                    })
                })
                .map_err(|err| DatabaseError::Other(err.into()))
            })
            .try_collect()?;
        Ok(results)
    }
677
    /// Build a boxed iterator over `column` combining an optional `prefix`
    /// filter with an optional `start` key and a direction, dispatching to
    /// the cheapest RocksDB iteration strategy for each combination.
    fn _iter_store<T>(
        &self,
        column: Description::Column,
        prefix: Option<&[u8]>,
        start: Option<&[u8]>,
        direction: IterDirection,
    ) -> BoxedIter<'_, StorageResult<T::Item>>
    where
        T: ExtractItem,
    {
        match (prefix, start) {
            (None, None) => {
                let iter_mode =
                    // if no start or prefix just start iterating over entire keyspace
                    match direction {
                        IterDirection::Forward => IteratorMode::Start,
                        // end always iterates in reverse
                        IterDirection::Reverse => IteratorMode::End,
                    };
                self.iterator::<T>(column, self.read_options(), iter_mode)
                    .into_boxed()
            }
            (Some(prefix), None) => {
                if direction == IterDirection::Reverse {
                    // Reverse prefix iteration needs special handling; see
                    // `reverse_prefix_iter`.
                    self.reverse_prefix_iter::<T>(prefix, column).into_boxed()
                } else {
                    // start iterating in a certain direction within the keyspace
                    let iter_mode = IteratorMode::From(
                        prefix,
                        convert_to_rocksdb_direction(direction),
                    );

                    // Setting prefix on the RocksDB level to optimize iteration.
                    let mut opts = self.read_options();
                    opts.set_prefix_same_as_start(true);

                    let prefix = prefix.to_vec();
                    self.iterator::<T>(column, opts, iter_mode)
                        // Not all tables has a prefix set, so we need to filter out the keys.
                        .take_while(move |item| {
                            if let Ok(item) = item {
                                T::starts_with(item, prefix.as_slice())
                            } else {
                                true
                            }
                        })
                        .into_boxed()
                }
            }
            (None, Some(start)) => {
                // start iterating in a certain direction from the start key
                let iter_mode =
                    IteratorMode::From(start, convert_to_rocksdb_direction(direction));
                let mut opts = self.read_options();
                // We need this option because our iterator start in the `start` prefix section
                // and continue in next sections. Without this option the correct
                // iteration between prefix section isn't guaranteed
                // Source : https://github.com/facebook/rocksdb/wiki/Prefix-Seek#how-to-ignore-prefix-bloom-filters-in-read
                // and https://github.com/facebook/rocksdb/wiki/Prefix-Seek#general-prefix-seek-api
                opts.set_total_order_seek(true);
                self.iterator::<T>(column, opts, iter_mode).into_boxed()
            }
            (Some(prefix), Some(start)) => {
                // TODO: Maybe we want to allow the `start` to be without a `prefix` in the future.
                // If the `start` doesn't have the same `prefix`, return nothing.
                if !start.starts_with(prefix) {
                    return iter::empty().into_boxed();
                }

                // start iterating in a certain direction from the start key
                // and end iterating when we've gone outside the prefix
                let prefix = prefix.to_vec();
                let iter_mode =
                    IteratorMode::From(start, convert_to_rocksdb_direction(direction));
                self.iterator::<T>(column, self.read_options(), iter_mode)
                    .take_while(move |item| {
                        if let Ok(item) = item {
                            T::starts_with(item, prefix.as_slice())
                        } else {
                            true
                        }
                    })
                    .into_boxed()
            }
        }
    }
764
    /// Construct a RocksDB `BackupEngine` rooted at
    /// `backup_dir/<Description::name()>`.
    ///
    /// # Errors
    /// Returns `DatabaseError::BackupEngineInitError` if the engine options,
    /// the environment, or the engine itself cannot be created.
    #[cfg(feature = "backup")]
    fn backup_engine<P: AsRef<Path> + ?Sized>(
        backup_dir: &P,
    ) -> DatabaseResult<rocksdb::backup::BackupEngine> {
        use rocksdb::{
            Env,
            backup::{
                BackupEngine,
                BackupEngineOptions,
            },
        };

        let backup_dir = backup_dir.as_ref().join(Description::name());
        let backup_dir_path = backup_dir.as_path();

        let mut backup_engine_options = BackupEngineOptions::new(backup_dir_path)
            .map_err(|e| {
                DatabaseError::BackupEngineInitError(anyhow::anyhow!(
                    "Couldn't create backup engine options for path `{}`: {}",
                    backup_dir_path.display(),
                    e
                ))
            })?;

        let cpu_number =
            i32::try_from(num_cpus::get()).expect("The number of CPU can't exceed `i32`");

        // Use a quarter of the cores (at least one) for background backup work.
        backup_engine_options.set_max_background_operations(cmp::max(1, cpu_number / 4));

        let env = Env::new().map_err(|e| {
            DatabaseError::BackupEngineInitError(anyhow::anyhow!(
                "Couldn't create environment for backup: {}",
                e
            ))
        })?;

        let backup_engine =
            BackupEngine::open(&backup_engine_options, &env).map_err(|e| {
                DatabaseError::BackupEngineInitError(anyhow::anyhow!(
                    "Couldn't open backup engine for path `{}`: {}",
                    backup_dir.display(),
                    e
                ))
            })?;

        Ok(backup_engine)
    }
812
    /// Create a new backup of the database at `db_dir` into `backup_dir`.
    ///
    /// The database is opened read-only for the duration of the backup, and
    /// the backup is created with `flush = true` so in-memory data is
    /// included.
    ///
    /// # Errors
    /// Fails if the backup engine cannot be initialised, the database cannot
    /// be opened, or the backup itself fails.
    #[cfg(feature = "backup")]
    pub fn backup<P: AsRef<Path> + ?Sized>(
        db_dir: &P,
        backup_dir: &P,
    ) -> DatabaseResult<()> {
        let mut backup_engine = Self::backup_engine(backup_dir)?;

        // max_fds = -1: no file-descriptor limit for the read-only pass.
        let db_config = DatabaseConfig {
            cache_capacity: None,
            max_fds: -1,
            columns_policy: ColumnsPolicy::Lazy,
        };

        let db = Self::open_read_only(
            db_dir,
            enum_iterator::all::<Description::Column>().collect::<Vec<_>>(),
            false,
            db_config,
        )?;

        backup_engine
            .create_new_backup_flush(&db.db, true)
            .map_err(|e| {
                DatabaseError::BackupError(anyhow::anyhow!(
                    "Couldn't create new backup for path `{}`: {}",
                    backup_dir.as_ref().display(),
                    e
                ))
            })?;

        Ok(())
    }
845
    /// We delegate opening of restored db to consumer, so they can apply their own options
    ///
    /// Restores `db_dir/<Description::name()>` from the latest backup found
    /// in `backup_dir`.
    ///
    /// # Errors
    /// Fails if the backup engine cannot be initialised or the restore fails.
    #[cfg(feature = "backup")]
    pub fn restore<P: AsRef<Path> + ?Sized>(
        db_dir: &P,
        backup_dir: &P,
    ) -> DatabaseResult<()> {
        use rocksdb::backup::RestoreOptions;

        let mut backup_engine = Self::backup_engine(backup_dir)?;
        let restore_option = RestoreOptions::default();

        let db_dir = db_dir.as_ref().join(Description::name());
        let db_dir_path = db_dir.as_path();
        // we use the default wal directory, which is same as db path
        backup_engine
            .restore_from_latest_backup(db_dir_path, db_dir_path, &restore_option)
            .map_err(|e| {
                DatabaseError::RestoreError(anyhow::anyhow!(
                    "Couldn't restore from latest backup for path `{}`: {}",
                    db_dir_path.display(),
                    e
                ))
            })?;

        Ok(())
    }
872
873    pub fn shutdown(&self) {
874        while Arc::strong_count(&self.db) > 1 {}
875    }
876}
877
878pub(crate) struct KeyOnly;
879
880impl ExtractItem for KeyOnly {
881    type Item = Vec<u8>;
882
883    fn extract_item<D>(
884        raw_iterator: &DBRawIteratorWithThreadMode<D>,
885    ) -> Option<Self::Item>
886    where
887        D: DBAccess,
888    {
889        raw_iterator.key().map(|key| key.to_vec())
890    }
891
892    fn size(item: &Self::Item) -> u64 {
893        item.len() as u64
894    }
895
896    fn starts_with(item: &Self::Item, prefix: &[u8]) -> bool {
897        item.starts_with(prefix)
898    }
899}
900
901pub(crate) struct KeyAndValue;
902
903impl ExtractItem for KeyAndValue {
904    type Item = (Vec<u8>, Value);
905
906    fn extract_item<D>(
907        raw_iterator: &DBRawIteratorWithThreadMode<D>,
908    ) -> Option<Self::Item>
909    where
910        D: DBAccess,
911    {
912        raw_iterator
913            .item()
914            .map(|(key, value)| (key.to_vec(), Value::from(value)))
915    }
916
917    fn size(item: &Self::Item) -> u64 {
918        item.0.len().saturating_add(item.1.len()) as u64
919    }
920
921    fn starts_with(item: &Self::Item, prefix: &[u8]) -> bool {
922        item.0.starts_with(prefix)
923    }
924}
925
926impl<Description> KeyValueInspect for RocksDb<Description>
927where
928    Description: DatabaseDescription,
929{
930    type Column = Description::Column;
931
932    fn size_of_value(
933        &self,
934        key: &[u8],
935        column: Self::Column,
936    ) -> StorageResult<Option<usize>> {
937        self.metrics.read_meter.inc();
938        let column_metrics = self.metrics.columns_read_statistic.get(&column.id());
939        column_metrics.map(|metric| metric.inc());
940
941        Ok(self
942            .db
943            .get_pinned_cf_opt(&self.cf(column), key, &self.read_options)
944            .map_err(|e| DatabaseError::Other(e.into()))?
945            .map(|value| value.len()))
946    }
947
948    fn get(&self, key: &[u8], column: Self::Column) -> StorageResult<Option<Value>> {
949        self.metrics.read_meter.inc();
950        let column_metrics = self.metrics.columns_read_statistic.get(&column.id());
951        column_metrics.map(|metric| metric.inc());
952
953        let value = self
954            .db
955            .get_cf_opt(&self.cf(column), key, &self.read_options)
956            .map_err(|e| DatabaseError::Other(e.into()))?;
957
958        if let Some(value) = &value {
959            self.metrics.bytes_read.inc_by(value.len() as u64);
960        }
961
962        Ok(value.map(Arc::from))
963    }
964
965    fn read_exact(
966        &self,
967        key: &[u8],
968        column: Self::Column,
969        offset: usize,
970        buf: &mut [u8],
971    ) -> StorageResult<Result<usize, StorageReadError>> {
972        self.metrics.read_meter.inc();
973        let column_metrics = self.metrics.columns_read_statistic.get(&column.id());
974        column_metrics.map(|metric| metric.inc());
975
976        let Some(value) = self
977            .db
978            .get_pinned_cf_opt(&self.cf(column), key, &self.read_options)
979            .map_err(|e| DatabaseError::Other(e.into()))?
980        else {
981            return Ok(Err(StorageReadError::KeyNotFound));
982        };
983
984        let bytes_len = value.len();
985        let start = offset;
986        let buf_len = buf.len();
987        let end = offset.saturating_add(buf_len);
988
989        if end > bytes_len {
990            return Ok(Err(StorageReadError::OutOfBounds));
991        }
992
993        let starting_from_offset = &value[start..end];
994        buf[..].copy_from_slice(starting_from_offset);
995
996        self.metrics
997            .bytes_read
998            .inc_by(starting_from_offset.len() as u64);
999
1000        Ok(Ok(buf_len))
1001    }
1002
1003    fn read_zerofill(
1004        &self,
1005        key: &[u8],
1006        column: Self::Column,
1007        offset: usize,
1008        buf: &mut [u8],
1009    ) -> StorageResult<Result<usize, StorageReadError>> {
1010        self.metrics.read_meter.inc();
1011        let column_metrics = self.metrics.columns_read_statistic.get(&column.id());
1012        column_metrics.map(|metric| metric.inc());
1013
1014        let Some(value) = self
1015            .db
1016            .get_pinned_cf_opt(&self.cf(column), key, &self.read_options)
1017            .map_err(|e| DatabaseError::Other(e.into()))?
1018        else {
1019            return Ok(Err(StorageReadError::KeyNotFound));
1020        };
1021
1022        let bytes_len = value.len();
1023        let buf_len = buf.len();
1024
1025        let Some((_, after)) = value.split_at_checked(offset) else {
1026            return Ok(Err(StorageReadError::OutOfBounds));
1027        };
1028
1029        let (dst, rest) = buf.split_at_mut(buf_len.min(after.len()));
1030        dst.copy_from_slice(&after[..dst.len()]);
1031        rest.fill(0);
1032
1033        self.metrics.bytes_read.inc_by(dst.len() as u64);
1034
1035        Ok(Ok(bytes_len))
1036    }
1037}
1038
impl<Description> IterableStore for RocksDb<Description>
where
    Description: DatabaseDescription,
{
    fn iter_store(
        &self,
        column: Self::Column,
        prefix: Option<&[u8]>,
        start: Option<&[u8]>,
        direction: IterDirection,
    ) -> BoxedIter<'_, KVItem> {
        // Delegate to the shared iteration helper, extracting full
        // key/value pairs for each entry.
        self._iter_store::<KeyAndValue>(column, prefix, start, direction)
    }

    fn iter_store_keys(
        &self,
        column: Self::Column,
        prefix: Option<&[u8]>,
        start: Option<&[u8]>,
        direction: IterDirection,
    ) -> BoxedIter<'_, KeyItem> {
        // Same traversal as `iter_store`, but extracts only the keys,
        // avoiding the cost of materializing values.
        self._iter_store::<KeyOnly>(column, prefix, start, direction)
    }
}
1063
1064impl<Description> RocksDb<Description>
1065where
1066    Description: DatabaseDescription,
1067{
1068    pub fn commit_changes<'a>(&self, changes: &'a StorageChanges) -> StorageResult<()> {
1069        let instant = std::time::Instant::now();
1070        let mut batch = WriteBatch::default();
1071        let mut conflict_finder = HashSet::<(&'a u32, &'a ReferenceBytesKey)>::new();
1072
1073        match changes {
1074            StorageChanges::Changes(changes) => {
1075                self._populate_batch(&mut batch, &mut conflict_finder, changes)?;
1076            }
1077            StorageChanges::ChangesList(changes_list) => {
1078                for changes in changes_list {
1079                    self._populate_batch(&mut batch, &mut conflict_finder, changes)?;
1080                }
1081            }
1082        }
1083
1084        self.db
1085            .write(batch)
1086            .map_err(|e| DatabaseError::Other(e.into()))?;
1087        // TODO: Use `u128` when `AtomicU128` is stable.
1088        self.metrics.database_commit_time.inc_by(
1089            u64::try_from(instant.elapsed().as_nanos())
1090                .expect("The commit shouldn't take longer than `u64`"),
1091        );
1092
1093        Ok(())
1094    }
1095
1096    fn _populate_batch<'a>(
1097        &self,
1098        batch: &mut WriteBatch,
1099        conflict_finder: &mut HashSet<(&'a u32, &'a ReferenceBytesKey)>,
1100        changes: &'a Changes,
1101    ) -> DatabaseResult<()> {
1102        for (column, ops) in changes {
1103            let cf = self.cf_u32(*column);
1104            let column_metrics = self.metrics.columns_write_statistic.get(column);
1105            for (key, op) in ops {
1106                self.metrics.write_meter.inc();
1107                column_metrics.map(|metric| metric.inc());
1108
1109                if !conflict_finder.insert((column, key)) {
1110                    return Err(DatabaseError::ConflictingChanges {
1111                        column: *column,
1112                        key: key.clone(),
1113                    });
1114                }
1115
1116                match op {
1117                    WriteOperation::Insert(value) => {
1118                        self.metrics.bytes_written.inc_by(value.len() as u64);
1119                        batch.put_cf(&cf, key, value.as_ref());
1120                    }
1121                    WriteOperation::Remove => {
1122                        batch.delete_cf(&cf, key);
1123                    }
1124                }
1125            }
1126        }
1127        Ok(())
1128    }
1129}
1130
/// The `None` means overflow, so there is not following prefix.
fn next_prefix(mut prefix: Vec<u8>) -> Option<Vec<u8>> {
    // Find the rightmost byte that can be incremented without overflow;
    // any trailing `0xFF` bytes are left untouched. If every byte is
    // `0xFF` (or the prefix is empty), there is no following prefix.
    let idx = prefix.iter().rposition(|byte| *byte != u8::MAX)?;
    prefix[idx] += 1;
    Some(prefix)
}
1141
#[cfg(feature = "test-helpers")]
pub mod test_helpers {
    use super::*;
    use fuel_core_storage::{
        kv_store::KeyValueMutate,
        transactional::ReadTransaction,
    };

    impl<Description> KeyValueMutate for RocksDb<Description>
    where
        Description: DatabaseDescription,
    {
        /// Writes `buf` under `key` by staging the change in a read
        /// transaction and committing it as a single batch.
        fn write(
            &mut self,
            key: &[u8],
            column: Self::Column,
            buf: &[u8],
        ) -> StorageResult<usize> {
            let mut tx = self.read_transaction();
            let written = tx.write(key, column, buf)?;
            self.commit_changes(&StorageChanges::Changes(tx.into_changes()))?;

            Ok(written)
        }

        /// Removes `key` from `column` via a staged-and-committed change.
        fn delete(&mut self, key: &[u8], column: Self::Column) -> StorageResult<()> {
            let mut tx = self.read_transaction();
            tx.delete(key, column)?;
            self.commit_changes(&StorageChanges::Changes(tx.into_changes()))?;
            Ok(())
        }
    }
}
1177
#[allow(non_snake_case)]
#[cfg(test)]
mod tests {
    use super::*;
    use crate::database::database_description::on_chain::OnChain;
    use fuel_core_storage::{
        column::Column,
        kv_store::KeyValueMutate,
    };
    use std::collections::{
        BTreeMap,
        HashMap,
    };
    use tempfile::TempDir;

    // Opens a fresh `RocksDb<OnChain>` in a temporary directory. The
    // `TempDir` is returned alongside the handle so the directory outlives
    // the database.
    fn create_db() -> (RocksDb<OnChain>, TempDir) {
        let tmp_dir = TempDir::new().unwrap();
        (
            RocksDb::default_open(tmp_dir.path(), DatabaseConfig::config_for_tests())
                .unwrap(),
            tmp_dir,
        )
    }

    // Reopening a database with columns that did not exist at creation time
    // must succeed (new column families are created on demand).
    #[test]
    fn open_new_columns() {
        let tmp_dir = TempDir::new().unwrap();

        // Given
        let old_columns =
            vec![Column::Coins, Column::Messages, Column::UploadedBytecodes];
        let database_with_old_columns = RocksDb::<OnChain>::open(
            tmp_dir.path(),
            old_columns.clone(),
            DatabaseConfig::config_for_tests(),
        )
        .expect("Failed to open database with old columns");
        drop(database_with_old_columns);

        // When
        let mut new_columns = old_columns;
        new_columns.push(Column::ContractsAssets);
        new_columns.push(Column::Metadata);
        let database_with_new_columns = RocksDb::<OnChain>::open(
            tmp_dir.path(),
            new_columns,
            DatabaseConfig::config_for_tests(),
        )
        .map(|_| ());

        // Then
        assert_eq!(Ok(()), database_with_new_columns);
    }

    #[test]
    fn can_put_and_read() {
        let key = vec![0xA, 0xB, 0xC];

        let (mut db, _tmp) = create_db();
        let expected = Value::from([1, 2, 3]);
        db.put(&key, Column::Metadata, expected.clone()).unwrap();

        assert_eq!(db.get(&key, Column::Metadata).unwrap().unwrap(), expected)
    }

    #[test]
    fn put_returns_previous_value() {
        let key = vec![0xA, 0xB, 0xC];

        let (mut db, _tmp) = create_db();
        let expected = Value::from([1, 2, 3]);
        db.put(&key, Column::Metadata, expected.clone()).unwrap();
        let prev = db
            .replace(&key, Column::Metadata, Arc::new([2, 4, 6]))
            .unwrap();

        assert_eq!(prev, Some(expected));
    }

    #[test]
    fn delete_and_get() {
        let key = vec![0xA, 0xB, 0xC];

        let (mut db, _tmp) = create_db();
        let expected = Value::from([1, 2, 3]);
        db.put(&key, Column::Metadata, expected.clone()).unwrap();
        assert_eq!(db.get(&key, Column::Metadata).unwrap().unwrap(), expected);

        db.delete(&key, Column::Metadata).unwrap();
        assert_eq!(db.get(&key, Column::Metadata).unwrap(), None);
    }

    #[test]
    fn key_exists() {
        let key = vec![0xA, 0xB, 0xC];

        let (mut db, _tmp) = create_db();
        let expected = Arc::new([1, 2, 3]);
        db.put(&key, Column::Metadata, expected).unwrap();
        assert!(db.exists(&key, Column::Metadata).unwrap());
    }

    #[test]
    fn commit_changes_inserts() {
        let key = vec![0xA, 0xB, 0xC];
        let value = Value::from([1, 2, 3]);

        let (db, _tmp) = create_db();
        let ops = vec![(
            Column::Metadata.id(),
            BTreeMap::from_iter(vec![(
                key.clone().into(),
                WriteOperation::Insert(value.clone()),
            )]),
        )];

        db.commit_changes(&StorageChanges::Changes(HashMap::from_iter(ops)))
            .unwrap();
        assert_eq!(db.get(&key, Column::Metadata).unwrap().unwrap(), value)
    }

    #[test]
    fn commit_changes_removes() {
        let key = vec![0xA, 0xB, 0xC];
        let value = Arc::new([1, 2, 3]);

        let (mut db, _tmp) = create_db();
        db.put(&key, Column::Metadata, value).unwrap();

        let ops = vec![(
            Column::Metadata.id(),
            BTreeMap::from_iter(vec![(key.clone().into(), WriteOperation::Remove)]),
        )];
        db.commit_changes(&StorageChanges::Changes(HashMap::from_iter(ops)))
            .unwrap();

        assert_eq!(db.get(&key, Column::Metadata).unwrap(), None);
    }

    // Zero-length values must round-trip through put/get/iterate/take.
    #[test]
    fn can_use_unit_value() {
        let key = vec![0x00];

        let (mut db, _tmp) = create_db();
        let expected = Value::from([]);
        db.put(&key, Column::Metadata, expected.clone()).unwrap();

        assert_eq!(db.get(&key, Column::Metadata).unwrap().unwrap(), expected);

        assert!(db.exists(&key, Column::Metadata).unwrap());

        assert_eq!(
            db.iter_store(Column::Metadata, None, None, IterDirection::Forward)
                .collect::<Result<Vec<_>, _>>()
                .unwrap()[0],
            (key.clone(), expected.clone())
        );

        assert_eq!(db.take(&key, Column::Metadata).unwrap().unwrap(), expected);

        assert!(!db.exists(&key, Column::Metadata).unwrap());
    }

    // Zero-length keys must round-trip as well.
    #[test]
    fn can_use_unit_key() {
        let key: Vec<u8> = Vec::with_capacity(0);

        let (mut db, _tmp) = create_db();
        let expected = Value::from([1, 2, 3]);
        db.put(&key, Column::Metadata, expected.clone()).unwrap();

        assert_eq!(db.get(&key, Column::Metadata).unwrap().unwrap(), expected);

        assert!(db.exists(&key, Column::Metadata).unwrap());

        assert_eq!(
            db.iter_store(Column::Metadata, None, None, IterDirection::Forward)
                .collect::<Result<Vec<_>, _>>()
                .unwrap()[0],
            (key.clone(), expected.clone())
        );

        assert_eq!(db.take(&key, Column::Metadata).unwrap().unwrap(), expected);

        assert!(!db.exists(&key, Column::Metadata).unwrap());
    }

    #[test]
    fn can_use_unit_key_and_value() {
        let key: Vec<u8> = Vec::with_capacity(0);

        let (mut db, _tmp) = create_db();
        let expected = Value::from([]);
        db.put(&key, Column::Metadata, expected.clone()).unwrap();

        assert_eq!(db.get(&key, Column::Metadata).unwrap().unwrap(), expected);

        assert!(db.exists(&key, Column::Metadata).unwrap());

        assert_eq!(
            db.iter_store(Column::Metadata, None, None, IterDirection::Forward)
                .collect::<Result<Vec<_>, _>>()
                .unwrap()[0],
            (key.clone(), expected.clone())
        );

        assert_eq!(db.take(&key, Column::Metadata).unwrap().unwrap(), expected);

        assert!(!db.exists(&key, Column::Metadata).unwrap());
    }

    // RocksDB allows only one primary (read-write) instance per path.
    #[test]
    fn open_primary_db_second_time_fails() {
        // Given
        let (_primary_db, tmp_dir) = create_db();

        // When
        let columns = enum_iterator::all::<<OnChain as DatabaseDescription>::Column>()
            .collect::<Vec<_>>();
        let result = RocksDb::<OnChain>::open(
            tmp_dir.path(),
            columns,
            DatabaseConfig::config_for_tests(),
        );

        // Then
        assert!(result.is_err());
    }

    // Read-only instances may coexist with a primary instance.
    #[test]
    fn open_second_read_only_db() {
        // Given
        let (_primary_db, tmp_dir) = create_db();

        // When
        let old_columns =
            vec![Column::Coins, Column::Messages, Column::UploadedBytecodes];
        let result = RocksDb::<OnChain>::open_read_only(
            tmp_dir.path(),
            old_columns.clone(),
            false,
            DatabaseConfig::config_for_tests(),
        )
        .map(|_| ());

        // Then
        assert_eq!(Ok(()), result);
    }

    // A secondary instance (with its own catch-up directory) may coexist
    // with a primary instance.
    #[test]
    fn open_secondary_db() {
        // Given
        let (_primary_db, tmp_dir) = create_db();
        let secondary_temp = TempDir::new().unwrap();

        // When
        let old_columns =
            vec![Column::Coins, Column::Messages, Column::UploadedBytecodes];
        let result = RocksDb::<OnChain>::open_secondary(
            tmp_dir.path(),
            secondary_temp.path(),
            old_columns.clone(),
            DatabaseConfig::config_for_tests(),
        )
        .map(|_| ());

        // Then
        assert_eq!(Ok(()), result);
    }

    // A snapshot must keep serving the state as of its creation, even after
    // the underlying entry is deleted from the live database.
    #[test]
    fn snapshot_allows_get_entry_after_it_was_removed() {
        let (mut db, _tmp) = create_db();
        let value = Value::from([1, 2, 3]);

        // Given
        let key_1 = [1; 32];
        db.put(&key_1, Column::Metadata, value.clone()).unwrap();
        let snapshot = db.create_snapshot();

        // When
        db.delete(&key_1, Column::Metadata).unwrap();

        // Then
        let db_get = db.get(&key_1, Column::Metadata).unwrap();
        assert!(db_get.is_none());

        let snapshot_get = snapshot.get(&key_1, Column::Metadata).unwrap();
        assert_eq!(snapshot_get, Some(value));
    }

    #[test]
    fn snapshot_allows_correct_iteration_even_after_all_elements_where_removed() {
        let (mut db, _tmp) = create_db();
        let value = Value::from([1, 2, 3]);

        // Given
        let key_1 = vec![1; 32];
        let key_2 = vec![2; 32];
        let key_3 = vec![3; 32];
        db.put(&key_1, Column::Metadata, value.clone()).unwrap();
        db.put(&key_2, Column::Metadata, value.clone()).unwrap();
        db.put(&key_3, Column::Metadata, value.clone()).unwrap();
        let snapshot = db.create_snapshot();

        // When
        db.delete(&key_1, Column::Metadata).unwrap();
        db.delete(&key_2, Column::Metadata).unwrap();
        db.delete(&key_3, Column::Metadata).unwrap();

        // Then
        let db_iter = db
            .iter_store(Column::Metadata, None, None, IterDirection::Forward)
            .collect::<Vec<_>>();
        assert!(db_iter.is_empty());

        let snapshot_iter = snapshot
            .iter_store(Column::Metadata, None, None, IterDirection::Forward)
            .collect::<Vec<_>>();
        assert_eq!(
            snapshot_iter,
            vec![
                Ok((key_1, value.clone())),
                Ok((key_2, value.clone())),
                Ok((key_3, value))
            ]
        );
    }

    // Drop order between a snapshot and its parent database must be safe in
    // either direction.
    #[test]
    fn drop_snapshot_after_dropping_main_database_shouldn_panic() {
        let (db, _tmp) = create_db();

        // Given
        let snapshot = db.create_snapshot();

        // When
        drop(db);

        // Then
        drop(snapshot);
    }

    #[test]
    fn open__opens_subset_of_columns_after_opening_all_columns() {
        // Given
        let (first_open_with_all_columns, tmp_dir) = create_db();

        // When
        drop(first_open_with_all_columns);
        let part_of_columns =
            enum_iterator::all::<<OnChain as DatabaseDescription>::Column>()
                .skip(1)
                .collect::<Vec<_>>();
        let open_with_part_of_columns = RocksDb::<OnChain>::open(
            tmp_dir.path(),
            part_of_columns,
            DatabaseConfig::config_for_tests(),
        );

        // Then
        let _ = open_with_part_of_columns
            .expect("Should open the database with shorter number of columns");
    }

    // Reverse iteration with a prefix that matches no keys yields nothing.
    #[test]
    fn iter_store__reverse_iterator__no_target_prefix() {
        // Given
        let (mut db, _tmp) = create_db();
        let value = Value::from([]);
        let key_1 = [1, 1];
        let key_2 = [2, 2];
        let key_3 = [9, 3];
        let key_4 = [10, 0];
        db.put(&key_1, Column::Metadata, value.clone()).unwrap();
        db.put(&key_2, Column::Metadata, value.clone()).unwrap();
        db.put(&key_3, Column::Metadata, value.clone()).unwrap();
        db.put(&key_4, Column::Metadata, value.clone()).unwrap();

        // When
        let db_iter = db
            .iter_store(
                Column::Metadata,
                Some(vec![5].as_slice()),
                None,
                IterDirection::Reverse,
            )
            .map(|item| item.map(|(key, _)| key))
            .collect::<Vec<_>>();

        // Then
        assert_eq!(db_iter, vec![]);
    }

    #[test]
    fn iter_store__reverse_iterator__target_prefix_at_the_middle() {
        // Given
        let (mut db, _tmp) = create_db();
        let value = Value::from([]);
        let key_1 = [1, 1];
        let key_2 = [2, 2];
        let key_3 = [2, 3];
        let key_4 = [10, 0];
        db.put(&key_1, Column::Metadata, value.clone()).unwrap();
        db.put(&key_2, Column::Metadata, value.clone()).unwrap();
        db.put(&key_3, Column::Metadata, value.clone()).unwrap();
        db.put(&key_4, Column::Metadata, value.clone()).unwrap();

        // When
        let db_iter = db
            .iter_store(
                Column::Metadata,
                Some(vec![2].as_slice()),
                None,
                IterDirection::Reverse,
            )
            .map(|item| item.map(|(key, _)| key))
            .collect::<Vec<_>>();

        // Then
        assert_eq!(db_iter, vec![Ok(key_3.to_vec()), Ok(key_2.to_vec())]);
    }

    #[test]
    fn iter_store__reverse_iterator__target_prefix_at_the_end() {
        // Given
        let (mut db, _tmp) = create_db();
        let value = Value::from([]);
        let key_1 = [1, 1];
        let key_2 = [2, 2];
        let key_3 = [2, 3];
        db.put(&key_1, Column::Metadata, value.clone()).unwrap();
        db.put(&key_2, Column::Metadata, value.clone()).unwrap();
        db.put(&key_3, Column::Metadata, value.clone()).unwrap();

        // When
        let db_iter = db
            .iter_store(
                Column::Metadata,
                Some(vec![2].as_slice()),
                None,
                IterDirection::Reverse,
            )
            .map(|item| item.map(|(key, _)| key))
            .collect::<Vec<_>>();

        // Then
        assert_eq!(db_iter, vec![Ok(key_3.to_vec()), Ok(key_2.to_vec())]);
    }

    // Prefix [255] exercises the `next_prefix` overflow path (no following
    // prefix exists), so reverse iteration must start from the column end.
    #[test]
    fn iter_store__reverse_iterator__target_prefix_at_the_end__overflow() {
        // Given
        let (mut db, _tmp) = create_db();
        let value = Value::from([]);
        let key_1 = [1, 1];
        let key_2 = [255, 254];
        let key_3 = [255, 255];
        db.put(&key_1, Column::Metadata, value.clone()).unwrap();
        db.put(&key_2, Column::Metadata, value.clone()).unwrap();
        db.put(&key_3, Column::Metadata, value.clone()).unwrap();

        // When
        let db_iter = db
            .iter_store(
                Column::Metadata,
                Some(vec![255].as_slice()),
                None,
                IterDirection::Reverse,
            )
            .map(|item| item.map(|(key, _)| key))
            .collect::<Vec<_>>();

        // Then
        assert_eq!(db_iter, vec![Ok(key_3.to_vec()), Ok(key_2.to_vec())]);
    }

    #[test]
    fn clear_table__keeps_column_accessible_after_marking_for_deletion() {
        // Given
        let (mut db, _tmp) = create_db();
        let value = Value::from([]);
        let key_1 = [1, 1];
        let key_2 = [255, 254];
        let key_3 = [255, 255];
        db.put(&key_1, Column::Metadata, value.clone()).unwrap();
        db.put(&key_2, Column::Metadata, value.clone()).unwrap();
        db.put(&key_3, Column::Metadata, value.clone()).unwrap();

        // When
        db.clear_table(Column::Metadata).unwrap();

        // Then
        let db_iter = db
            .iter_store_keys(Column::Metadata, None, None, IterDirection::Forward)
            .collect::<Vec<_>>();
        assert_eq!(db_iter, vec![]);
    }
}