Skip to main content

fuel_core/state/
rocks_db.rs

1use crate::{
2    database::{
3        Error as DatabaseError,
4        Result as DatabaseResult,
5        convert_to_rocksdb_direction,
6        database_description::DatabaseDescription,
7    },
8    state::IterDirection,
9};
10
11use super::rocks_db_key_iterator::{
12    ExtractItem,
13    RocksDBKeyIterator,
14};
15use core::ops::Deref;
16use fuel_core_metrics::core_metrics::DatabaseMetrics;
17use fuel_core_storage::{
18    Result as StorageResult,
19    StorageReadError,
20    iter::{
21        BoxedIter,
22        IntoBoxedIter,
23        IterableStore,
24    },
25    kv_store::{
26        KVItem,
27        KeyItem,
28        KeyValueInspect,
29        StorageColumn,
30        Value,
31        WriteOperation,
32    },
33    transactional::{
34        Changes,
35        ReferenceBytesKey,
36        StorageChanges,
37    },
38};
39use itertools::Itertools;
40use rocksdb::{
41    BlockBasedOptions,
42    BoundColumnFamily,
43    Cache,
44    ColumnFamilyDescriptor,
45    DBAccess,
46    DBCompressionType,
47    DBRawIteratorWithThreadMode,
48    DBWithThreadMode,
49    IteratorMode,
50    MultiThreaded,
51    Options,
52    ReadOptions,
53    SliceTransform,
54    WriteBatch,
55};
56use std::{
57    cmp,
58    collections::{
59        BTreeMap,
60        HashSet,
61    },
62    fmt,
63    fmt::Formatter,
64    iter,
65    path::{
66        Path,
67        PathBuf,
68    },
69    sync::{
70        Arc,
71        Mutex,
72    },
73};
74use tempfile::TempDir;
75
76#[derive(Debug)]
77struct PrimaryInstance(DBWithThreadMode<MultiThreaded>);
78
79impl Deref for PrimaryInstance {
80    type Target = DBWithThreadMode<MultiThreaded>;
81
82    fn deref(&self) -> &Self::Target {
83        &self.0
84    }
85}
86
87impl Drop for PrimaryInstance {
88    fn drop(&mut self) {
89        self.cancel_all_background_work(true);
90    }
91}
92
93type DB = DBWithThreadMode<MultiThreaded>;
94
/// One-shot callback executed when the owning [`DropResources`] is dropped.
type DropFn = Box<dyn FnOnce() + Send + Sync>;

/// Keeps arbitrary resources alive for as long as this value lives.
///
/// Resources are moved into a closure; the closure runs (and thereby drops
/// whatever it captured) when this value is dropped. The default value holds
/// nothing and does nothing on drop.
#[derive(Default)]
struct DropResources {
    drop: Option<DropFn>,
}

impl fmt::Debug for DropResources {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.write_str("DropResources")
    }
}

impl<F> From<F> for DropResources
where
    F: FnOnce() + Send + Sync + 'static,
{
    fn from(closure: F) -> Self {
        let callback: DropFn = Box::new(closure);
        Self {
            drop: Some(callback),
        }
    }
}

impl Drop for DropResources {
    fn drop(&mut self) {
        // `take` leaves `None` behind, so the callback can run at most once.
        if let Some(callback) = self.drop.take() {
            callback();
        }
    }
}
123
/// Defined behaviour for opening the columns of the database.
///
/// The default is [`ColumnsPolicy::Lazy`], unless the `rocksdb-production`
/// feature is enabled, in which case it is [`ColumnsPolicy::OnCreation`].
#[derive(Clone, Copy, Default, Debug, PartialEq, Eq)]
pub enum ColumnsPolicy {
    /// Open a new column only when a database interaction is done with it.
    #[cfg_attr(not(feature = "rocksdb-production"), default)]
    Lazy,
    /// Open all columns on creation of the service.
    #[cfg_attr(feature = "rocksdb-production", default)]
    OnCreation,
}

/// Configuration to create a database.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct DatabaseConfig {
    /// Memory budget for the RocksDB caches: a third is given to the block
    /// cache and a third to the row cache. `None` disables the block cache and
    /// configures no row cache.
    pub cache_capacity: Option<usize>,
    /// Maximum number of open files, forwarded to RocksDB's
    /// `set_max_open_files`.
    pub max_fds: i32,
    /// How column families are opened; see [`ColumnsPolicy`].
    pub columns_policy: ColumnsPolicy,
}

#[cfg(feature = "test-helpers")]
impl DatabaseConfig {
    /// Configuration used by tests: no cache, 512 file descriptors and lazily
    /// opened columns.
    pub fn config_for_tests() -> Self {
        let columns_policy = ColumnsPolicy::Lazy;
        Self {
            columns_policy,
            max_fds: 512,
            cache_capacity: None,
        }
    }
}
153
/// Key-value storage backed by RocksDB, with one column family per column of
/// `Description`.
///
/// Snapshots created with `create_snapshot`/`create_snapshot_generic` share
/// the same underlying database handle, metrics and drop resources.
pub struct RocksDb<Description> {
    /// Cached read options; when `snapshot` is set they read from it.
    read_options: ReadOptions,
    /// Shared handle to the primary database instance.
    db: Arc<PrimaryInstance>,
    /// Block-based table options reused when (re)creating column families.
    block_opts: Arc<BlockBasedOptions>,
    /// Column families still waiting to be created on first use (lazy
    /// policy); `None` when every column was created while opening.
    create_family: Option<Arc<Mutex<BTreeMap<String, Options>>>>,
    /// Point-in-time snapshot. Its `'static` lifetime comes from a transmute in
    /// `create_snapshot_generic`; the `Drop` impl clears it before `db` goes.
    snapshot: Option<rocksdb::SnapshotWithThreadMode<'static, DB>>,
    metrics: Arc<DatabaseMetrics>,
    // used for RAII
    // NOTE: fields drop in declaration order, so this instance releases its
    // reference to `db` before its reference to these resources (e.g. a
    // temporary directory). Keep `_drop` after `db`.
    _drop: Arc<DropResources>,
    _marker: core::marker::PhantomData<Description>,
}
165
166impl<Description> Drop for RocksDb<Description> {
167    fn drop(&mut self) {
168        // Drop the snapshot before the db.
169        // Dropping the snapshot after the db will cause a sigsegv.
170        self.snapshot = None;
171    }
172}
173
174impl<Description> std::fmt::Debug for RocksDb<Description> {
175    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> core::fmt::Result {
176        f.debug_struct("RocksDb").field("db", &self.db).finish()
177    }
178}
179
180impl<Description> RocksDb<Description>
181where
182    Description: DatabaseDescription,
183{
184    pub fn default_open_temp() -> DatabaseResult<Self> {
185        Self::default_open_temp_with_params(DatabaseConfig {
186            cache_capacity: None,
187            max_fds: 512,
188            columns_policy: ColumnsPolicy::Lazy,
189        })
190    }
191
192    pub fn default_open_temp_with_params(
193        database_config: DatabaseConfig,
194    ) -> DatabaseResult<Self> {
195        let tmp_dir = TempDir::new().unwrap();
196        let path = tmp_dir.path();
197        let result = Self::open(
198            path,
199            enum_iterator::all::<Description::Column>().collect::<Vec<_>>(),
200            database_config,
201        );
202        let mut db = result?;
203
204        db._drop = Arc::new(
205            {
206                move || {
207                    // cleanup temp dir
208                    drop(tmp_dir);
209                }
210            }
211            .into(),
212        );
213
214        Ok(db)
215    }
216
217    pub fn default_open<P: AsRef<Path>>(
218        path: P,
219        database_config: DatabaseConfig,
220    ) -> DatabaseResult<Self> {
221        Self::open(
222            path,
223            enum_iterator::all::<Description::Column>().collect::<Vec<_>>(),
224            database_config,
225        )
226    }
227
228    pub fn prune(path: &Path) -> DatabaseResult<()> {
229        let path = path.join(Description::name());
230        DB::destroy(&Options::default(), path)
231            .map_err(|e| DatabaseError::Other(e.into()))?;
232        Ok(())
233    }
234
235    pub fn open<P: AsRef<Path>>(
236        path: P,
237        columns: Vec<Description::Column>,
238        database_config: DatabaseConfig,
239    ) -> DatabaseResult<Self> {
240        Self::open_with(DB::open_cf_descriptors, path, columns, database_config)
241    }
242
243    pub fn open_read_only<P: AsRef<Path>>(
244        path: P,
245        columns: Vec<Description::Column>,
246        error_if_log_file_exist: bool,
247        database_config: DatabaseConfig,
248    ) -> DatabaseResult<Self> {
249        Self::open_with(
250            |options, primary_path, cfs| {
251                DB::open_cf_descriptors_read_only(
252                    options,
253                    primary_path,
254                    cfs,
255                    error_if_log_file_exist,
256                )
257            },
258            path,
259            columns,
260            database_config,
261        )
262    }
263
264    pub fn open_secondary<PrimaryPath, SecondaryPath>(
265        path: PrimaryPath,
266        secondary_path: SecondaryPath,
267        columns: Vec<Description::Column>,
268        database_config: DatabaseConfig,
269    ) -> DatabaseResult<Self>
270    where
271        PrimaryPath: AsRef<Path>,
272        SecondaryPath: AsRef<Path>,
273    {
274        Self::open_with(
275            |options, primary_path, cfs| {
276                DB::open_cf_descriptors_as_secondary(
277                    options,
278                    primary_path,
279                    secondary_path.as_ref().to_path_buf(),
280                    cfs,
281                )
282            },
283            path,
284            columns,
285            database_config,
286        )
287    }
288
289    pub fn open_with<F, P>(
290        opener: F,
291        path: P,
292        columns: Vec<Description::Column>,
293        database_config: DatabaseConfig,
294    ) -> DatabaseResult<Self>
295    where
296        F: Fn(
297            &Options,
298            PathBuf,
299            Vec<ColumnFamilyDescriptor>,
300        ) -> Result<DB, rocksdb::Error>,
301        P: AsRef<Path>,
302    {
303        let original_path = path.as_ref().to_path_buf();
304        let path = original_path.join(Description::name());
305        let metric_columns = columns
306            .iter()
307            .map(|column| (column.id(), column.name()))
308            .collect::<Vec<_>>();
309        let metrics = Arc::new(DatabaseMetrics::new(
310            Description::name().as_str(),
311            &metric_columns,
312        ));
313        let mut block_opts = BlockBasedOptions::default();
314        // See https://github.com/facebook/rocksdb/blob/a1523efcdf2f0e8133b9a9f6e170a0dad49f928f/include/rocksdb/table.h#L246-L271 for details on what the format versions are/do.
315        block_opts.set_format_version(5);
316
317        if let Some(capacity) = database_config.cache_capacity {
318            // Set cache size 1/3 of the capacity as recommended by
319            // https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning#block-cache-size
320            let block_cache_size = capacity / 3;
321            let cache = Cache::new_lru_cache(block_cache_size);
322            block_opts.set_block_cache(&cache);
323            // "index and filter blocks will be stored in block cache, together with all other data blocks."
324            // See: https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB#indexes-and-filter-blocks
325            block_opts.set_cache_index_and_filter_blocks(true);
326            // Don't evict L0 filter/index blocks from the cache
327            block_opts.set_pin_l0_filter_and_index_blocks_in_cache(true);
328        } else {
329            block_opts.disable_cache();
330        }
331        block_opts.set_bloom_filter(10.0, true);
332        block_opts.set_block_size(16 * 1024);
333
334        let mut opts = Options::default();
335        opts.set_compression_type(DBCompressionType::Lz4);
336        // TODO: Make it customizable https://github.com/FuelLabs/fuel-core/issues/1666
337        opts.set_max_total_wal_size(64 * 1024 * 1024);
338        let cpu_number =
339            i32::try_from(num_cpus::get()).expect("The number of CPU can't exceed `i32`");
340        opts.increase_parallelism(cmp::max(1, cpu_number / 2));
341        if let Some(capacity) = database_config.cache_capacity {
342            // Set cache size 1/3 of the capacity. Another 1/3 is
343            // used by block cache and the last 1 / 3 remains for other purposes:
344            //
345            // https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning#block-cache-size
346            let row_cache_size = capacity / 3;
347            let cache = Cache::new_lru_cache(row_cache_size);
348            opts.set_row_cache(&cache);
349        }
350        opts.set_max_background_jobs(6);
351        opts.set_bytes_per_sync(1048576);
352        opts.set_max_open_files(database_config.max_fds);
353
354        let existing_column_families = DB::list_cf(&opts, &path).unwrap_or_default();
355
356        let mut cf_descriptors_to_open = BTreeMap::new();
357        let mut cf_descriptors_to_create = BTreeMap::new();
358        for column in columns.clone() {
359            let column_name = Self::col_name(column.id());
360            let opts = Self::cf_opts(column, &block_opts);
361            if existing_column_families.contains(&column_name) {
362                cf_descriptors_to_open.insert(column_name, opts);
363            } else {
364                cf_descriptors_to_create.insert(column_name, opts);
365            }
366        }
367
368        if database_config.columns_policy == ColumnsPolicy::OnCreation
369            || (database_config.columns_policy == ColumnsPolicy::Lazy
370                && cf_descriptors_to_open.is_empty())
371        {
372            opts.create_if_missing(true);
373        }
374
375        let unknown_columns_to_open: BTreeMap<_, _> = existing_column_families
376            .iter()
377            .filter(|column_name| {
378                !cf_descriptors_to_open.contains_key(*column_name)
379                    && !cf_descriptors_to_create.contains_key(*column_name)
380            })
381            .map(|unknown_column_name| {
382                let unknown_column_options = Self::default_opts(&block_opts);
383                (unknown_column_name.clone(), unknown_column_options)
384            })
385            .collect();
386        cf_descriptors_to_open.extend(unknown_columns_to_open);
387
388        let iterator = cf_descriptors_to_open
389            .clone()
390            .into_iter()
391            .map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts))
392            .collect::<Vec<_>>();
393
394        let db = match opener(&opts, path.clone(), iterator) {
395            Ok(db) => {
396                Ok(db)
397            },
398            Err(err) => {
399                tracing::error!("Couldn't open the database with an error: {}. \nTrying to reopen the database", err);
400
401                let iterator = cf_descriptors_to_open
402                    .clone()
403                    .into_iter()
404                    .map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts))
405                    .collect::<Vec<_>>();
406
407                opener(&opts, path.clone(), iterator)
408            },
409        }
410        .map_err(|e| DatabaseError::Other(e.into()))?;
411
412        let create_family = match database_config.columns_policy {
413            ColumnsPolicy::OnCreation => {
414                for (name, opt) in cf_descriptors_to_create {
415                    db.create_cf(name, &opt)
416                        .map_err(|e| DatabaseError::Other(e.into()))?;
417                }
418                None
419            }
420            ColumnsPolicy::Lazy => Some(Arc::new(Mutex::new(cf_descriptors_to_create))),
421        };
422        let db = Arc::new(PrimaryInstance(db));
423
424        let rocks_db = RocksDb {
425            read_options: Self::generate_read_options(&None),
426            block_opts: Arc::new(block_opts),
427            snapshot: None,
428            db,
429            metrics,
430            create_family,
431            _drop: Default::default(),
432            _marker: Default::default(),
433        };
434
435        Ok(rocks_db)
436    }
437
438    fn generate_read_options(
439        snapshot: &Option<rocksdb::SnapshotWithThreadMode<DB>>,
440    ) -> ReadOptions {
441        let mut opts = ReadOptions::default();
442        opts.set_verify_checksums(false);
443        if let Some(snapshot) = &snapshot {
444            opts.set_snapshot(snapshot);
445        }
446        opts
447    }
448
449    fn read_options(&self) -> ReadOptions {
450        Self::generate_read_options(&self.snapshot)
451    }
452
453    pub fn create_snapshot(&self) -> Self {
454        self.create_snapshot_generic()
455    }
456
457    pub fn create_snapshot_generic<TargetDescription>(
458        &self,
459    ) -> RocksDb<TargetDescription> {
460        let db = self.db.clone();
461        let block_opts = self.block_opts.clone();
462        let create_family = self.create_family.clone();
463        let metrics = self.metrics.clone();
464        let _drop = self._drop.clone();
465
466        // Safety: We are transmuting the snapshot to 'static lifetime, but it's safe
467        // because we are not going to use it after the RocksDb is dropped.
468        // We control the lifetime of the `Self` - RocksDb, so we can guarantee that
469        // the snapshot will be dropped before the RocksDb.
470        #[allow(clippy::missing_transmute_annotations)]
471        // Remove this and see for yourself
472        let snapshot = unsafe {
473            let snapshot = db.snapshot();
474            core::mem::transmute(snapshot)
475        };
476        let snapshot = Some(snapshot);
477
478        RocksDb {
479            read_options: Self::generate_read_options(&snapshot),
480            block_opts,
481            snapshot,
482            db,
483            create_family,
484            metrics,
485            _drop,
486            _marker: Default::default(),
487        }
488    }
489
490    fn cf(&self, column: Description::Column) -> Arc<BoundColumnFamily<'_>> {
491        self.cf_u32(column.id())
492    }
493
494    fn cf_u32(&self, column: u32) -> Arc<BoundColumnFamily<'_>> {
495        let family = self.db.cf_handle(&Self::col_name(column));
496
497        match family {
498            None => match &self.create_family {
499                Some(create_family) => {
500                    let mut lock = create_family
501                        .lock()
502                        .expect("The create family lock should be available");
503
504                    let name = Self::col_name(column);
505                    let Some(family) = lock.remove(&name) else {
506                        return self
507                            .db
508                            .cf_handle(&Self::col_name(column))
509                            .expect("No column family found");
510                    };
511
512                    self.db
513                        .create_cf(&name, &family)
514                        .expect("Couldn't create column family");
515
516                    self.db.cf_handle(&name).expect("invalid column state")
517                }
518                _ => {
519                    panic!("Columns in the DB should have been created on DB opening");
520                }
521            },
522            Some(family) => family,
523        }
524    }
525
526    fn col_name(column: u32) -> String {
527        format!("col-{}", column)
528    }
529
530    fn default_opts(block_opts: &BlockBasedOptions) -> Options {
531        let mut opts = Options::default();
532        opts.create_if_missing(true);
533        opts.set_compression_type(DBCompressionType::Lz4);
534        opts.set_block_based_table_factory(block_opts);
535
536        opts
537    }
538
539    fn cf_opts(column: Description::Column, block_opts: &BlockBasedOptions) -> Options {
540        let mut opts = Self::default_opts(block_opts);
541
542        // All double-keys should be configured here
543        if let Some(size) = Description::prefix(&column) {
544            opts.set_prefix_extractor(SliceTransform::create_fixed_prefix(size))
545        }
546
547        opts
548    }
549
550    /// RocksDB prefix iteration doesn't support reverse order,
551    /// so we need to to force the RocksDB iterator to order
552    /// all the prefix in the iterator so that we can take the next prefix
553    /// as start of iterator and iterate in reverse.
554    fn reverse_prefix_iter<T>(
555        &self,
556        prefix: &[u8],
557        column: Description::Column,
558    ) -> impl Iterator<Item = StorageResult<T::Item>> + '_ + use<'_, T, Description>
559    where
560        T: ExtractItem,
561    {
562        let reverse_iterator = next_prefix(prefix.to_vec()).map(|next_prefix| {
563            let mut opts = self.read_options();
564            // We need this option because our iterator start in the `next_prefix` prefix section
565            // and continue in `prefix` section. Without this option the correct
566            // iteration between prefix section isn't guaranteed
567            // Source : https://github.com/facebook/rocksdb/wiki/Prefix-Seek#how-to-ignore-prefix-bloom-filters-in-read
568            // and https://github.com/facebook/rocksdb/wiki/Prefix-Seek#general-prefix-seek-api
569            opts.set_total_order_seek(true);
570            self.iterator::<T>(
571                column,
572                opts,
573                IteratorMode::From(next_prefix.as_slice(), rocksdb::Direction::Reverse),
574            )
575        });
576
577        match reverse_iterator {
578            Some(iterator) => {
579                let prefix = prefix.to_vec();
580                iterator
581                    .take_while(move |item| {
582                        if let Ok(item) = item {
583                            T::starts_with(item, prefix.as_slice())
584                        } else {
585                            true
586                        }
587                    })
588                    .into_boxed()
589            }
590            _ => {
591                // No next item, so we can start backward iteration from the end.
592                let prefix = prefix.to_vec();
593                self.iterator::<T>(column, self.read_options(), IteratorMode::End)
594                    .take_while(move |item| {
595                        if let Ok(item) = item {
596                            T::starts_with(item, prefix.as_slice())
597                        } else {
598                            true
599                        }
600                    })
601                    .into_boxed()
602            }
603        }
604    }
605
606    pub(crate) fn iterator<T>(
607        &self,
608        column: Description::Column,
609        opts: ReadOptions,
610        iter_mode: IteratorMode,
611    ) -> impl Iterator<Item = StorageResult<T::Item>> + '_ + use<'_, T, Description>
612    where
613        T: ExtractItem,
614    {
615        let column_metrics = self.metrics.columns_read_statistic.get(&column.id());
616
617        RocksDBKeyIterator::<_, T>::new(
618            self.db.raw_iterator_cf_opt(&self.cf(column), opts),
619            iter_mode,
620        )
621        .map(move |item| {
622            item.inspect(|item| {
623                self.metrics.read_meter.inc();
624                column_metrics.map(|metric| metric.inc());
625                self.metrics.bytes_read.inc_by(T::size(item));
626            })
627            .map_err(|e| DatabaseError::Other(e.into()).into())
628        })
629    }
630
631    /// The fast way to remove all data from the column.
632    pub fn clear_table(&self, column: Description::Column) -> DatabaseResult<()> {
633        // Mark column for deletion
634        let column_name = Self::col_name(column.id());
635        self.db
636            .drop_cf(&column_name)
637            .map_err(|e| DatabaseError::Other(e.into()))?;
638
639        // Insert new fresh column without data
640        let column_name = Self::col_name(column.id());
641        let opts = Self::cf_opts(column, self.block_opts.as_ref());
642        self.db
643            .create_cf(&column_name, &opts)
644            .map_err(|e| DatabaseError::Other(e.into()))?;
645
646        Ok(())
647    }
648
649    pub fn multi_get<K, I>(
650        &self,
651        column: u32,
652        iterator: I,
653    ) -> DatabaseResult<Vec<Option<Vec<u8>>>>
654    where
655        I: Iterator<Item = K>,
656        K: AsRef<[u8]>,
657    {
658        let column_metrics = self.metrics.columns_read_statistic.get(&column);
659        let cl = self.cf_u32(column);
660        let results = self
661            .db
662            .multi_get_cf_opt(iterator.map(|k| (&cl, k)), &self.read_options)
663            .into_iter()
664            .map(|el| {
665                self.metrics.read_meter.inc();
666                column_metrics.map(|metric| metric.inc());
667                el.map(|value| {
668                    value.inspect(|vec| {
669                        self.metrics.bytes_read.inc_by(vec.len() as u64);
670                    })
671                })
672                .map_err(|err| DatabaseError::Other(err.into()))
673            })
674            .try_collect()?;
675        Ok(results)
676    }
677
678    fn _iter_store<T>(
679        &self,
680        column: Description::Column,
681        prefix: Option<&[u8]>,
682        start: Option<&[u8]>,
683        direction: IterDirection,
684    ) -> BoxedIter<'_, StorageResult<T::Item>>
685    where
686        T: ExtractItem,
687    {
688        match (prefix, start) {
689            (None, None) => {
690                let iter_mode =
691                    // if no start or prefix just start iterating over entire keyspace
692                    match direction {
693                        IterDirection::Forward => IteratorMode::Start,
694                        // end always iterates in reverse
695                        IterDirection::Reverse => IteratorMode::End,
696                    };
697                self.iterator::<T>(column, self.read_options(), iter_mode)
698                    .into_boxed()
699            }
700            (Some(prefix), None) => {
701                if direction == IterDirection::Reverse {
702                    self.reverse_prefix_iter::<T>(prefix, column).into_boxed()
703                } else {
704                    // start iterating in a certain direction within the keyspace
705                    let iter_mode = IteratorMode::From(
706                        prefix,
707                        convert_to_rocksdb_direction(direction),
708                    );
709
710                    // Setting prefix on the RocksDB level to optimize iteration.
711                    let mut opts = self.read_options();
712                    opts.set_prefix_same_as_start(true);
713
714                    let prefix = prefix.to_vec();
715                    self.iterator::<T>(column, opts, iter_mode)
716                        // Not all tables has a prefix set, so we need to filter out the keys.
717                        .take_while(move |item| {
718                            if let Ok(item) = item {
719                                T::starts_with(item, prefix.as_slice())
720                            } else {
721                                true
722                            }
723                        })
724                        .into_boxed()
725                }
726            }
727            (None, Some(start)) => {
728                // start iterating in a certain direction from the start key
729                let iter_mode =
730                    IteratorMode::From(start, convert_to_rocksdb_direction(direction));
731                let mut opts = self.read_options();
732                // We need this option because our iterator start in the `start` prefix section
733                // and continue in next sections. Without this option the correct
734                // iteration between prefix section isn't guaranteed
735                // Source : https://github.com/facebook/rocksdb/wiki/Prefix-Seek#how-to-ignore-prefix-bloom-filters-in-read
736                // and https://github.com/facebook/rocksdb/wiki/Prefix-Seek#general-prefix-seek-api
737                opts.set_total_order_seek(true);
738                self.iterator::<T>(column, opts, iter_mode).into_boxed()
739            }
740            (Some(prefix), Some(start)) => {
741                // TODO: Maybe we want to allow the `start` to be without a `prefix` in the future.
742                // If the `start` doesn't have the same `prefix`, return nothing.
743                if !start.starts_with(prefix) {
744                    return iter::empty().into_boxed();
745                }
746
747                // start iterating in a certain direction from the start key
748                // and end iterating when we've gone outside the prefix
749                let prefix = prefix.to_vec();
750                let iter_mode =
751                    IteratorMode::From(start, convert_to_rocksdb_direction(direction));
752                self.iterator::<T>(column, self.read_options(), iter_mode)
753                    .take_while(move |item| {
754                        if let Ok(item) = item {
755                            T::starts_with(item, prefix.as_slice())
756                        } else {
757                            true
758                        }
759                    })
760                    .into_boxed()
761            }
762        }
763    }
764
765    #[cfg(feature = "backup")]
766    fn backup_engine<P: AsRef<Path> + ?Sized>(
767        backup_dir: &P,
768    ) -> DatabaseResult<rocksdb::backup::BackupEngine> {
769        use rocksdb::{
770            Env,
771            backup::{
772                BackupEngine,
773                BackupEngineOptions,
774            },
775        };
776
777        let backup_dir = backup_dir.as_ref().join(Description::name());
778        let backup_dir_path = backup_dir.as_path();
779
780        let mut backup_engine_options = BackupEngineOptions::new(backup_dir_path)
781            .map_err(|e| {
782                DatabaseError::BackupEngineInitError(anyhow::anyhow!(
783                    "Couldn't create backup engine options for path `{}`: {}",
784                    backup_dir_path.display(),
785                    e
786                ))
787            })?;
788
789        let cpu_number =
790            i32::try_from(num_cpus::get()).expect("The number of CPU can't exceed `i32`");
791
792        backup_engine_options.set_max_background_operations(cmp::max(1, cpu_number / 4));
793
794        let env = Env::new().map_err(|e| {
795            DatabaseError::BackupEngineInitError(anyhow::anyhow!(
796                "Couldn't create environment for backup: {}",
797                e
798            ))
799        })?;
800
801        let backup_engine =
802            BackupEngine::open(&backup_engine_options, &env).map_err(|e| {
803                DatabaseError::BackupEngineInitError(anyhow::anyhow!(
804                    "Couldn't open backup engine for path `{}`: {}",
805                    backup_dir.display(),
806                    e
807                ))
808            })?;
809
810        Ok(backup_engine)
811    }
812
813    #[cfg(feature = "backup")]
814    pub fn backup<P: AsRef<Path> + ?Sized>(
815        db_dir: &P,
816        backup_dir: &P,
817    ) -> DatabaseResult<()> {
818        let mut backup_engine = Self::backup_engine(backup_dir)?;
819
820        let db_config = DatabaseConfig {
821            cache_capacity: None,
822            max_fds: -1,
823            columns_policy: ColumnsPolicy::Lazy,
824        };
825
826        let db = Self::open_read_only(
827            db_dir,
828            enum_iterator::all::<Description::Column>().collect::<Vec<_>>(),
829            false,
830            db_config,
831        )?;
832
833        backup_engine
834            .create_new_backup_flush(&db.db, true)
835            .map_err(|e| {
836                DatabaseError::BackupError(anyhow::anyhow!(
837                    "Couldn't create new backup for path `{}`: {}",
838                    backup_dir.as_ref().display(),
839                    e
840                ))
841            })?;
842
843        Ok(())
844    }
845
846    /// We delegate opening of restored db to consumer, so they can apply their own options
847    #[cfg(feature = "backup")]
848    pub fn restore<P: AsRef<Path> + ?Sized>(
849        db_dir: &P,
850        backup_dir: &P,
851    ) -> DatabaseResult<()> {
852        use rocksdb::backup::RestoreOptions;
853
854        let mut backup_engine = Self::backup_engine(backup_dir)?;
855        let restore_option = RestoreOptions::default();
856
857        let db_dir = db_dir.as_ref().join(Description::name());
858        let db_dir_path = db_dir.as_path();
859        // we use the default wal directory, which is same as db path
860        backup_engine
861            .restore_from_latest_backup(db_dir_path, db_dir_path, &restore_option)
862            .map_err(|e| {
863                DatabaseError::RestoreError(anyhow::anyhow!(
864                    "Couldn't restore from latest backup for path `{}`: {}",
865                    db_dir_path.display(),
866                    e
867                ))
868            })?;
869
870        Ok(())
871    }
872
873    pub fn shutdown(&self) {
874        // Signal rocksdb's background compaction/flush threads to stop. We
875        // use `wait=false` so this returns immediately — each `CombinedDatabase`
876        // has several layered DBs and `wait=true` per-DB serialises into a
877        // shutdown that blows past tight per-service timeouts in tests
878        // (`tests/tests/poa.rs` budgets 1s for `send_stop_signal_and_await_shutdown`).
879        // The work is still cancelled; rocksdb's own destructor finishes the
880        // teardown when the last `Arc<PrimaryInstance>` clone is dropped.
881        self.db.cancel_all_background_work(false);
882
883        // Belt-and-suspenders for the test suite only: spin until no other
884        // `Arc<PrimaryInstance>` clone is alive. CI runs surface a residual
885        // atexit-phase race ("pthread lock: Invalid argument") that doesn't
886        // reproduce reliably; blocking shutdown here until readers are
887        // released gives the process exit path a clean baseline. Disabled
888        // in production builds because a stuck reader would hang shutdown
889        // indefinitely.
890        #[cfg(feature = "test-helpers")]
891        while Arc::strong_count(&self.db) > 1 {}
892    }
893}
894
895pub(crate) struct KeyOnly;
896
897impl ExtractItem for KeyOnly {
898    type Item = Vec<u8>;
899
900    fn extract_item<D>(
901        raw_iterator: &DBRawIteratorWithThreadMode<D>,
902    ) -> Option<Self::Item>
903    where
904        D: DBAccess,
905    {
906        raw_iterator.key().map(|key| key.to_vec())
907    }
908
909    fn size(item: &Self::Item) -> u64 {
910        item.len() as u64
911    }
912
913    fn starts_with(item: &Self::Item, prefix: &[u8]) -> bool {
914        item.starts_with(prefix)
915    }
916}
917
918pub(crate) struct KeyAndValue;
919
920impl ExtractItem for KeyAndValue {
921    type Item = (Vec<u8>, Value);
922
923    fn extract_item<D>(
924        raw_iterator: &DBRawIteratorWithThreadMode<D>,
925    ) -> Option<Self::Item>
926    where
927        D: DBAccess,
928    {
929        raw_iterator
930            .item()
931            .map(|(key, value)| (key.to_vec(), Value::from(value)))
932    }
933
934    fn size(item: &Self::Item) -> u64 {
935        item.0.len().saturating_add(item.1.len()) as u64
936    }
937
938    fn starts_with(item: &Self::Item, prefix: &[u8]) -> bool {
939        item.0.starts_with(prefix)
940    }
941}
942
943impl<Description> KeyValueInspect for RocksDb<Description>
944where
945    Description: DatabaseDescription,
946{
947    type Column = Description::Column;
948
949    fn size_of_value(
950        &self,
951        key: &[u8],
952        column: Self::Column,
953    ) -> StorageResult<Option<usize>> {
954        self.metrics.read_meter.inc();
955        let column_metrics = self.metrics.columns_read_statistic.get(&column.id());
956        column_metrics.map(|metric| metric.inc());
957
958        Ok(self
959            .db
960            .get_pinned_cf_opt(&self.cf(column), key, &self.read_options)
961            .map_err(|e| DatabaseError::Other(e.into()))?
962            .map(|value| value.len()))
963    }
964
965    fn get(&self, key: &[u8], column: Self::Column) -> StorageResult<Option<Value>> {
966        self.metrics.read_meter.inc();
967        let column_metrics = self.metrics.columns_read_statistic.get(&column.id());
968        column_metrics.map(|metric| metric.inc());
969
970        let value = self
971            .db
972            .get_cf_opt(&self.cf(column), key, &self.read_options)
973            .map_err(|e| DatabaseError::Other(e.into()))?;
974
975        if let Some(value) = &value {
976            self.metrics.bytes_read.inc_by(value.len() as u64);
977        }
978
979        Ok(value.map(Arc::from))
980    }
981
982    fn read_exact(
983        &self,
984        key: &[u8],
985        column: Self::Column,
986        offset: usize,
987        buf: &mut [u8],
988    ) -> StorageResult<Result<usize, StorageReadError>> {
989        self.metrics.read_meter.inc();
990        let column_metrics = self.metrics.columns_read_statistic.get(&column.id());
991        column_metrics.map(|metric| metric.inc());
992
993        let Some(value) = self
994            .db
995            .get_pinned_cf_opt(&self.cf(column), key, &self.read_options)
996            .map_err(|e| DatabaseError::Other(e.into()))?
997        else {
998            return Ok(Err(StorageReadError::KeyNotFound));
999        };
1000
1001        let bytes_len = value.len();
1002        let start = offset;
1003        let buf_len = buf.len();
1004        let end = offset.saturating_add(buf_len);
1005
1006        if end > bytes_len {
1007            return Ok(Err(StorageReadError::OutOfBounds));
1008        }
1009
1010        let starting_from_offset = &value[start..end];
1011        buf[..].copy_from_slice(starting_from_offset);
1012
1013        self.metrics
1014            .bytes_read
1015            .inc_by(starting_from_offset.len() as u64);
1016
1017        Ok(Ok(buf_len))
1018    }
1019
1020    fn read_zerofill(
1021        &self,
1022        key: &[u8],
1023        column: Self::Column,
1024        offset: usize,
1025        buf: &mut [u8],
1026    ) -> StorageResult<Result<usize, StorageReadError>> {
1027        self.metrics.read_meter.inc();
1028        let column_metrics = self.metrics.columns_read_statistic.get(&column.id());
1029        column_metrics.map(|metric| metric.inc());
1030
1031        let Some(value) = self
1032            .db
1033            .get_pinned_cf_opt(&self.cf(column), key, &self.read_options)
1034            .map_err(|e| DatabaseError::Other(e.into()))?
1035        else {
1036            return Ok(Err(StorageReadError::KeyNotFound));
1037        };
1038
1039        let bytes_len = value.len();
1040        let buf_len = buf.len();
1041
1042        let Some((_, after)) = value.split_at_checked(offset) else {
1043            return Ok(Err(StorageReadError::OutOfBounds));
1044        };
1045
1046        let (dst, rest) = buf.split_at_mut(buf_len.min(after.len()));
1047        dst.copy_from_slice(&after[..dst.len()]);
1048        rest.fill(0);
1049
1050        self.metrics.bytes_read.inc_by(dst.len() as u64);
1051
1052        Ok(Ok(bytes_len))
1053    }
1054}
1055
1056impl<Description> IterableStore for RocksDb<Description>
1057where
1058    Description: DatabaseDescription,
1059{
1060    fn iter_store(
1061        &self,
1062        column: Self::Column,
1063        prefix: Option<&[u8]>,
1064        start: Option<&[u8]>,
1065        direction: IterDirection,
1066    ) -> BoxedIter<'_, KVItem> {
1067        self._iter_store::<KeyAndValue>(column, prefix, start, direction)
1068    }
1069
1070    fn iter_store_keys(
1071        &self,
1072        column: Self::Column,
1073        prefix: Option<&[u8]>,
1074        start: Option<&[u8]>,
1075        direction: IterDirection,
1076    ) -> BoxedIter<'_, KeyItem> {
1077        self._iter_store::<KeyOnly>(column, prefix, start, direction)
1078    }
1079}
1080
1081impl<Description> RocksDb<Description>
1082where
1083    Description: DatabaseDescription,
1084{
1085    pub fn commit_changes<'a>(&self, changes: &'a StorageChanges) -> StorageResult<()> {
1086        let instant = std::time::Instant::now();
1087        let mut batch = WriteBatch::default();
1088        let mut conflict_finder = HashSet::<(&'a u32, &'a ReferenceBytesKey)>::new();
1089
1090        match changes {
1091            StorageChanges::Changes(changes) => {
1092                self._populate_batch(&mut batch, &mut conflict_finder, changes)?;
1093            }
1094            StorageChanges::ChangesList(changes_list) => {
1095                for changes in changes_list {
1096                    self._populate_batch(&mut batch, &mut conflict_finder, changes)?;
1097                }
1098            }
1099        }
1100
1101        self.db
1102            .write(batch)
1103            .map_err(|e| DatabaseError::Other(e.into()))?;
1104        // TODO: Use `u128` when `AtomicU128` is stable.
1105        self.metrics.database_commit_time.inc_by(
1106            u64::try_from(instant.elapsed().as_nanos())
1107                .expect("The commit shouldn't take longer than `u64`"),
1108        );
1109
1110        Ok(())
1111    }
1112
1113    fn _populate_batch<'a>(
1114        &self,
1115        batch: &mut WriteBatch,
1116        conflict_finder: &mut HashSet<(&'a u32, &'a ReferenceBytesKey)>,
1117        changes: &'a Changes,
1118    ) -> DatabaseResult<()> {
1119        for (column, ops) in changes {
1120            let cf = self.cf_u32(*column);
1121            let column_metrics = self.metrics.columns_write_statistic.get(column);
1122            for (key, op) in ops {
1123                self.metrics.write_meter.inc();
1124                column_metrics.map(|metric| metric.inc());
1125
1126                if !conflict_finder.insert((column, key)) {
1127                    return Err(DatabaseError::ConflictingChanges {
1128                        column: *column,
1129                        key: key.clone(),
1130                    });
1131                }
1132
1133                match op {
1134                    WriteOperation::Insert(value) => {
1135                        self.metrics.bytes_written.inc_by(value.len() as u64);
1136                        batch.put_cf(&cf, key, value.as_ref());
1137                    }
1138                    WriteOperation::Remove => {
1139                        batch.delete_cf(&cf, key);
1140                    }
1141                }
1142            }
1143        }
1144        Ok(())
1145    }
1146}
1147
/// Returns the smallest byte string that is greater than every key starting
/// with `prefix`, i.e. the tight exclusive upper bound of a prefix scan.
///
/// Trailing `0xFF` bytes cannot be incremented, so they are dropped and the
/// carry moves to the previous byte: `[1, 255]` becomes `[2]`. Keeping the
/// trailing `0xFF` (yielding `[2, 255]`) would produce a bound that is too
/// loose. A key such as `[2, 0]` sorts below it without having the prefix, and
/// a reverse scan that starts from that bound and stops at the first
/// non-matching key would then miss every real match.
///
/// The `None` means overflow: `prefix` is empty or consists only of `0xFF`
/// bytes, so there is no following prefix.
fn next_prefix(mut prefix: Vec<u8>) -> Option<Vec<u8>> {
    while let Some(last) = prefix.pop() {
        if let Some(incremented) = last.checked_add(1) {
            prefix.push(incremented);
            return Some(prefix);
        }
    }
    None
}
1158
#[cfg(feature = "test-helpers")]
pub mod test_helpers {
    use super::*;
    use fuel_core_storage::{
        kv_store::KeyValueMutate,
        transactional::ReadTransaction,
    };

    /// Direct mutation of `RocksDb` for tests: every call opens its own read
    /// transaction, applies the single operation to it, and commits the
    /// resulting changes immediately via `commit_changes`.
    impl<Description> KeyValueMutate for RocksDb<Description>
    where
        Description: DatabaseDescription,
    {
        fn write(
            &mut self,
            key: &[u8],
            column: Self::Column,
            buf: &[u8],
        ) -> StorageResult<usize> {
            let mut tx = self.read_transaction();
            let written = tx.write(key, column, buf)?;
            self.commit_changes(&StorageChanges::Changes(tx.into_changes()))?;
            Ok(written)
        }

        fn delete(&mut self, key: &[u8], column: Self::Column) -> StorageResult<()> {
            let mut tx = self.read_transaction();
            tx.delete(key, column)?;
            self.commit_changes(&StorageChanges::Changes(tx.into_changes()))
        }
    }
}
1194
// Test names use `subject__scenario` double underscores, which the
// `non_snake_case` lint rejects.
#[allow(non_snake_case)]
#[cfg(test)]
mod tests {
    use super::*;
    use crate::database::database_description::on_chain::OnChain;
    use fuel_core_storage::{
        column::Column,
        kv_store::KeyValueMutate,
    };
    use std::collections::{
        BTreeMap,
        HashMap,
    };
    use tempfile::TempDir;

    /// Opens a fresh on-chain database in a new temporary directory.
    ///
    /// The `TempDir` is returned alongside the database so the caller can keep
    /// the directory alive for as long as the database is used.
    fn create_db() -> (RocksDb<OnChain>, TempDir) {
        let tmp_dir = TempDir::new().unwrap();
        (
            RocksDb::default_open(tmp_dir.path(), DatabaseConfig::config_for_tests())
                .unwrap(),
            tmp_dir,
        )
    }

    /// Writes `value` under `key` in `Column::Metadata` of a fresh database and
    /// checks that it is visible through `get`, `exists` and `iter_store`, and
    /// that `take` returns it and removes the key.
    fn assert_round_trip(key: Vec<u8>, value: Value) {
        let (mut db, _tmp) = create_db();
        db.put(&key, Column::Metadata, value.clone()).unwrap();

        assert_eq!(db.get(&key, Column::Metadata).unwrap().unwrap(), value);

        assert!(db.exists(&key, Column::Metadata).unwrap());

        assert_eq!(
            db.iter_store(Column::Metadata, None, None, IterDirection::Forward)
                .collect::<Result<Vec<_>, _>>()
                .unwrap()[0],
            (key.clone(), value.clone())
        );

        assert_eq!(db.take(&key, Column::Metadata).unwrap().unwrap(), value);

        assert!(!db.exists(&key, Column::Metadata).unwrap());
    }

    #[test]
    fn open_new_columns() {
        let tmp_dir = TempDir::new().unwrap();

        // Given
        let old_columns =
            vec![Column::Coins, Column::Messages, Column::UploadedBytecodes];
        let database_with_old_columns = RocksDb::<OnChain>::open(
            tmp_dir.path(),
            old_columns.clone(),
            DatabaseConfig::config_for_tests(),
        )
        .expect("Failed to open database with old columns");
        drop(database_with_old_columns);

        // When
        let mut new_columns = old_columns;
        new_columns.push(Column::ContractsAssets);
        new_columns.push(Column::Metadata);
        let database_with_new_columns = RocksDb::<OnChain>::open(
            tmp_dir.path(),
            new_columns,
            DatabaseConfig::config_for_tests(),
        )
        .map(|_| ());

        // Then
        assert_eq!(Ok(()), database_with_new_columns);
    }

    #[test]
    fn can_put_and_read() {
        let key = vec![0xA, 0xB, 0xC];

        let (mut db, _tmp) = create_db();
        let expected = Value::from([1, 2, 3]);
        db.put(&key, Column::Metadata, expected.clone()).unwrap();

        assert_eq!(db.get(&key, Column::Metadata).unwrap().unwrap(), expected)
    }

    #[test]
    fn put_returns_previous_value() {
        let key = vec![0xA, 0xB, 0xC];

        let (mut db, _tmp) = create_db();
        let expected = Value::from([1, 2, 3]);
        db.put(&key, Column::Metadata, expected.clone()).unwrap();
        let prev = db
            .replace(&key, Column::Metadata, Arc::new([2, 4, 6]))
            .unwrap();

        assert_eq!(prev, Some(expected));
    }

    #[test]
    fn delete_and_get() {
        let key = vec![0xA, 0xB, 0xC];

        let (mut db, _tmp) = create_db();
        let expected = Value::from([1, 2, 3]);
        db.put(&key, Column::Metadata, expected.clone()).unwrap();
        assert_eq!(db.get(&key, Column::Metadata).unwrap().unwrap(), expected);

        db.delete(&key, Column::Metadata).unwrap();
        assert_eq!(db.get(&key, Column::Metadata).unwrap(), None);
    }

    #[test]
    fn key_exists() {
        let key = vec![0xA, 0xB, 0xC];

        let (mut db, _tmp) = create_db();
        let expected = Arc::new([1, 2, 3]);
        db.put(&key, Column::Metadata, expected).unwrap();
        assert!(db.exists(&key, Column::Metadata).unwrap());
    }

    #[test]
    fn commit_changes_inserts() {
        let key = vec![0xA, 0xB, 0xC];
        let value = Value::from([1, 2, 3]);

        let (db, _tmp) = create_db();
        let ops = vec![(
            Column::Metadata.id(),
            BTreeMap::from_iter(vec![(
                key.clone().into(),
                WriteOperation::Insert(value.clone()),
            )]),
        )];

        db.commit_changes(&StorageChanges::Changes(HashMap::from_iter(ops)))
            .unwrap();
        assert_eq!(db.get(&key, Column::Metadata).unwrap().unwrap(), value)
    }

    #[test]
    fn commit_changes_removes() {
        let key = vec![0xA, 0xB, 0xC];
        let value = Arc::new([1, 2, 3]);

        let (mut db, _tmp) = create_db();
        db.put(&key, Column::Metadata, value).unwrap();

        let ops = vec![(
            Column::Metadata.id(),
            BTreeMap::from_iter(vec![(key.clone().into(), WriteOperation::Remove)]),
        )];
        db.commit_changes(&StorageChanges::Changes(HashMap::from_iter(ops)))
            .unwrap();

        assert_eq!(db.get(&key, Column::Metadata).unwrap(), None);
    }

    #[test]
    fn can_use_unit_value() {
        assert_round_trip(vec![0x00], Value::from([]));
    }

    #[test]
    fn can_use_unit_key() {
        assert_round_trip(Vec::new(), Value::from([1, 2, 3]));
    }

    #[test]
    fn can_use_unit_key_and_value() {
        assert_round_trip(Vec::new(), Value::from([]));
    }

    #[test]
    fn open_primary_db_second_time_fails() {
        // Given
        let (_primary_db, tmp_dir) = create_db();

        // When
        let columns = enum_iterator::all::<<OnChain as DatabaseDescription>::Column>()
            .collect::<Vec<_>>();
        let result = RocksDb::<OnChain>::open(
            tmp_dir.path(),
            columns,
            DatabaseConfig::config_for_tests(),
        );

        // Then
        assert!(result.is_err());
    }

    #[test]
    fn open_second_read_only_db() {
        // Given
        let (_primary_db, tmp_dir) = create_db();

        // When
        let columns = vec![Column::Coins, Column::Messages, Column::UploadedBytecodes];
        let result = RocksDb::<OnChain>::open_read_only(
            tmp_dir.path(),
            columns,
            false,
            DatabaseConfig::config_for_tests(),
        )
        .map(|_| ());

        // Then
        assert_eq!(Ok(()), result);
    }

    #[test]
    fn open_secondary_db() {
        // Given
        let (_primary_db, tmp_dir) = create_db();
        let secondary_temp = TempDir::new().unwrap();

        // When
        let columns = vec![Column::Coins, Column::Messages, Column::UploadedBytecodes];
        let result = RocksDb::<OnChain>::open_secondary(
            tmp_dir.path(),
            secondary_temp.path(),
            columns,
            DatabaseConfig::config_for_tests(),
        )
        .map(|_| ());

        // Then
        assert_eq!(Ok(()), result);
    }

    #[test]
    fn snapshot_allows_get_entry_after_it_was_removed() {
        let (mut db, _tmp) = create_db();
        let value = Value::from([1, 2, 3]);

        // Given
        let key_1 = [1; 32];
        db.put(&key_1, Column::Metadata, value.clone()).unwrap();
        let snapshot = db.create_snapshot();

        // When
        db.delete(&key_1, Column::Metadata).unwrap();

        // Then
        let db_get = db.get(&key_1, Column::Metadata).unwrap();
        assert!(db_get.is_none());

        let snapshot_get = snapshot.get(&key_1, Column::Metadata).unwrap();
        assert_eq!(snapshot_get, Some(value));
    }

    #[test]
    fn snapshot_allows_correct_iteration_even_after_all_elements_were_removed() {
        let (mut db, _tmp) = create_db();
        let value = Value::from([1, 2, 3]);

        // Given
        let key_1 = vec![1; 32];
        let key_2 = vec![2; 32];
        let key_3 = vec![3; 32];
        db.put(&key_1, Column::Metadata, value.clone()).unwrap();
        db.put(&key_2, Column::Metadata, value.clone()).unwrap();
        db.put(&key_3, Column::Metadata, value.clone()).unwrap();
        let snapshot = db.create_snapshot();

        // When
        db.delete(&key_1, Column::Metadata).unwrap();
        db.delete(&key_2, Column::Metadata).unwrap();
        db.delete(&key_3, Column::Metadata).unwrap();

        // Then
        let db_iter = db
            .iter_store(Column::Metadata, None, None, IterDirection::Forward)
            .collect::<Vec<_>>();
        assert!(db_iter.is_empty());

        let snapshot_iter = snapshot
            .iter_store(Column::Metadata, None, None, IterDirection::Forward)
            .collect::<Vec<_>>();
        assert_eq!(
            snapshot_iter,
            vec![
                Ok((key_1, value.clone())),
                Ok((key_2, value.clone())),
                Ok((key_3, value))
            ]
        );
    }

    #[test]
    fn drop_snapshot_after_dropping_main_database_should_not_panic() {
        let (db, _tmp) = create_db();

        // Given
        let snapshot = db.create_snapshot();

        // When
        drop(db);

        // Then
        drop(snapshot);
    }

    #[test]
    fn open__opens_subset_of_columns_after_opening_all_columns() {
        // Given
        let (first_open_with_all_columns, tmp_dir) = create_db();

        // When
        drop(first_open_with_all_columns);
        let part_of_columns =
            enum_iterator::all::<<OnChain as DatabaseDescription>::Column>()
                .skip(1)
                .collect::<Vec<_>>();
        let open_with_part_of_columns = RocksDb::<OnChain>::open(
            tmp_dir.path(),
            part_of_columns,
            DatabaseConfig::config_for_tests(),
        );

        // Then
        let _ = open_with_part_of_columns
            .expect("Should open the database with shorter number of columns");
    }

    #[test]
    fn iter_store__reverse_iterator__no_target_prefix() {
        // Given
        let (mut db, _tmp) = create_db();
        let value = Value::from([]);
        let key_1 = [1, 1];
        let key_2 = [2, 2];
        let key_3 = [9, 3];
        let key_4 = [10, 0];
        db.put(&key_1, Column::Metadata, value.clone()).unwrap();
        db.put(&key_2, Column::Metadata, value.clone()).unwrap();
        db.put(&key_3, Column::Metadata, value.clone()).unwrap();
        db.put(&key_4, Column::Metadata, value.clone()).unwrap();

        // When
        let db_iter = db
            .iter_store(
                Column::Metadata,
                Some(vec![5].as_slice()),
                None,
                IterDirection::Reverse,
            )
            .map(|item| item.map(|(key, _)| key))
            .collect::<Vec<_>>();

        // Then
        assert_eq!(db_iter, vec![]);
    }

    #[test]
    fn iter_store__reverse_iterator__target_prefix_at_the_middle() {
        // Given
        let (mut db, _tmp) = create_db();
        let value = Value::from([]);
        let key_1 = [1, 1];
        let key_2 = [2, 2];
        let key_3 = [2, 3];
        let key_4 = [10, 0];
        db.put(&key_1, Column::Metadata, value.clone()).unwrap();
        db.put(&key_2, Column::Metadata, value.clone()).unwrap();
        db.put(&key_3, Column::Metadata, value.clone()).unwrap();
        db.put(&key_4, Column::Metadata, value.clone()).unwrap();

        // When
        let db_iter = db
            .iter_store(
                Column::Metadata,
                Some(vec![2].as_slice()),
                None,
                IterDirection::Reverse,
            )
            .map(|item| item.map(|(key, _)| key))
            .collect::<Vec<_>>();

        // Then
        assert_eq!(db_iter, vec![Ok(key_3.to_vec()), Ok(key_2.to_vec())]);
    }

    #[test]
    fn iter_store__reverse_iterator__target_prefix_at_the_end() {
        // Given
        let (mut db, _tmp) = create_db();
        let value = Value::from([]);
        let key_1 = [1, 1];
        let key_2 = [2, 2];
        let key_3 = [2, 3];
        db.put(&key_1, Column::Metadata, value.clone()).unwrap();
        db.put(&key_2, Column::Metadata, value.clone()).unwrap();
        db.put(&key_3, Column::Metadata, value.clone()).unwrap();

        // When
        let db_iter = db
            .iter_store(
                Column::Metadata,
                Some(vec![2].as_slice()),
                None,
                IterDirection::Reverse,
            )
            .map(|item| item.map(|(key, _)| key))
            .collect::<Vec<_>>();

        // Then
        assert_eq!(db_iter, vec![Ok(key_3.to_vec()), Ok(key_2.to_vec())]);
    }

    #[test]
    fn iter_store__reverse_iterator__target_prefix_at_the_end__overflow() {
        // Given
        let (mut db, _tmp) = create_db();
        let value = Value::from([]);
        let key_1 = [1, 1];
        let key_2 = [255, 254];
        let key_3 = [255, 255];
        db.put(&key_1, Column::Metadata, value.clone()).unwrap();
        db.put(&key_2, Column::Metadata, value.clone()).unwrap();
        db.put(&key_3, Column::Metadata, value.clone()).unwrap();

        // When
        let db_iter = db
            .iter_store(
                Column::Metadata,
                Some(vec![255].as_slice()),
                None,
                IterDirection::Reverse,
            )
            .map(|item| item.map(|(key, _)| key))
            .collect::<Vec<_>>();

        // Then
        assert_eq!(db_iter, vec![Ok(key_3.to_vec()), Ok(key_2.to_vec())]);
    }

    #[test]
    fn clear_table__keeps_column_accessible_after_marking_for_deletion() {
        // Given
        let (mut db, _tmp) = create_db();
        let value = Value::from([]);
        let key_1 = [1, 1];
        let key_2 = [255, 254];
        let key_3 = [255, 255];
        db.put(&key_1, Column::Metadata, value.clone()).unwrap();
        db.put(&key_2, Column::Metadata, value.clone()).unwrap();
        db.put(&key_3, Column::Metadata, value.clone()).unwrap();

        // When
        db.clear_table(Column::Metadata).unwrap();

        // Then
        let db_iter = db
            .iter_store_keys(Column::Metadata, None, None, IterDirection::Forward)
            .collect::<Vec<_>>();
        assert_eq!(db_iter, vec![]);
    }
}