1use crate::{
2 database::{
3 Error as DatabaseError,
4 Result as DatabaseResult,
5 convert_to_rocksdb_direction,
6 database_description::DatabaseDescription,
7 },
8 state::IterDirection,
9};
10
11use super::rocks_db_key_iterator::{
12 ExtractItem,
13 RocksDBKeyIterator,
14};
15use core::ops::Deref;
16use fuel_core_metrics::core_metrics::DatabaseMetrics;
17use fuel_core_storage::{
18 Error as StorageError,
19 Result as StorageResult,
20 iter::{
21 BoxedIter,
22 IntoBoxedIter,
23 IterableStore,
24 },
25 kv_store::{
26 KVItem,
27 KeyItem,
28 KeyValueInspect,
29 StorageColumn,
30 Value,
31 WriteOperation,
32 },
33 transactional::{
34 Changes,
35 ReferenceBytesKey,
36 StorageChanges,
37 },
38};
39use itertools::Itertools;
40use rocksdb::{
41 BlockBasedOptions,
42 BoundColumnFamily,
43 Cache,
44 ColumnFamilyDescriptor,
45 DBAccess,
46 DBCompressionType,
47 DBRawIteratorWithThreadMode,
48 DBWithThreadMode,
49 IteratorMode,
50 MultiThreaded,
51 Options,
52 ReadOptions,
53 SliceTransform,
54 WriteBatch,
55};
56use std::{
57 cmp,
58 collections::{
59 BTreeMap,
60 HashSet,
61 },
62 fmt,
63 fmt::Formatter,
64 iter,
65 path::{
66 Path,
67 PathBuf,
68 },
69 sync::{
70 Arc,
71 Mutex,
72 },
73};
74use tempfile::TempDir;
75
/// Newtype around the multi-threaded RocksDB handle so we can hook `Drop`
/// (see the `Drop` impl below) while still exposing the full DB API.
#[derive(Debug)]
struct PrimaryInstance(DBWithThreadMode<MultiThreaded>);

impl Deref for PrimaryInstance {
    type Target = DBWithThreadMode<MultiThreaded>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
86
impl Drop for PrimaryInstance {
    fn drop(&mut self) {
        // Stop background compactions/flushes; `true` asks RocksDB to wait
        // for them to finish before the handle is destroyed.
        self.cancel_all_background_work(true);
    }
}
92
/// Alias for the multi-threaded RocksDB type used throughout this module.
type DB = DBWithThreadMode<MultiThreaded>;

/// Cleanup callback run when the database handle (and all of its snapshot
/// clones, which share the same `Arc<DropResources>`) is gone.
type DropFn = Box<dyn FnOnce() + Send + Sync>;

/// Holder for an optional cleanup callback, executed at most once on drop.
#[derive(Default)]
struct DropResources {
    // `None` once the callback has been taken and executed.
    drop: Option<DropFn>,
}
101
102impl fmt::Debug for DropResources {
103 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
104 write!(f, "DropResources")
105 }
106}
107
108impl<F: 'static + FnOnce() + Send + Sync> From<F> for DropResources {
109 fn from(closure: F) -> Self {
110 Self {
111 drop: Option::Some(Box::new(closure)),
112 }
113 }
114}
115
116impl Drop for DropResources {
117 fn drop(&mut self) {
118 if let Some(drop) = self.drop.take() {
119 (drop)()
120 }
121 }
122}
123
/// Policy describing *when* RocksDB column families are created.
#[derive(Clone, Copy, Default, Debug, PartialEq, Eq)]
pub enum ColumnsPolicy {
    /// Create a column family on first use (default for non-production builds).
    #[cfg_attr(not(feature = "rocksdb-production"), default)]
    Lazy,
    /// Create all column families when the database is opened
    /// (default when the `rocksdb-production` feature is enabled).
    #[cfg_attr(feature = "rocksdb-production", default)]
    OnCreation,
}
134
/// Tuning knobs used when opening a RocksDB instance.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct DatabaseConfig {
    /// Overall cache budget in bytes, split between the block and row caches
    /// (a third each — see `open_with`); `None` disables caching entirely.
    pub cache_capacity: Option<usize>,
    /// Maximum open file descriptors handed to RocksDB
    /// (`-1` follows RocksDB's "no limit" convention).
    pub max_fds: i32,
    /// When column families get created; see [`ColumnsPolicy`].
    pub columns_policy: ColumnsPolicy,
}
142
#[cfg(feature = "test-helpers")]
impl DatabaseConfig {
    /// Small, cache-less configuration suitable for unit tests.
    pub fn config_for_tests() -> Self {
        Self {
            cache_capacity: None,
            max_fds: 512,
            columns_policy: ColumnsPolicy::Lazy,
        }
    }
}
153
/// RocksDB wrapper parameterised over a database description, which defines
/// the column set. Cheap to snapshot: all heavy state is behind `Arc`s.
pub struct RocksDb<Description> {
    read_options: ReadOptions,
    db: Arc<PrimaryInstance>,
    block_opts: Arc<BlockBasedOptions>,
    // Options for column families not created yet (lazy policy);
    // `None` when every family was created at open time.
    create_family: Option<Arc<Mutex<BTreeMap<String, Options>>>>,
    // NOTE: the `'static` lifetime is a manually-upheld lie — the snapshot
    // really borrows from `db` (see the transmute in
    // `create_snapshot_generic`) and is cleared in `Drop` before `db` can go.
    snapshot: Option<rocksdb::SnapshotWithThreadMode<'static, DB>>,
    metrics: Arc<DatabaseMetrics>,
    // Keeps external resources (e.g. a temp dir) alive as long as any handle.
    _drop: Arc<DropResources>,
    _marker: core::marker::PhantomData<Description>,
}
165
impl<Description> Drop for RocksDb<Description> {
    fn drop(&mut self) {
        // The snapshot borrows from `db` behind a transmuted `'static`
        // lifetime, so it must be released before the `Arc<PrimaryInstance>`
        // field can potentially destroy the database it points into.
        self.snapshot = None;
    }
}
173
impl<Description> std::fmt::Debug for RocksDb<Description> {
    // Only the inner DB handle is interesting for diagnostics.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> core::fmt::Result {
        f.debug_struct("RocksDb").field("db", &self.db).finish()
    }
}
179
180impl<Description> RocksDb<Description>
181where
182 Description: DatabaseDescription,
183{
184 pub fn default_open_temp() -> DatabaseResult<Self> {
185 Self::default_open_temp_with_params(DatabaseConfig {
186 cache_capacity: None,
187 max_fds: 512,
188 columns_policy: ColumnsPolicy::Lazy,
189 })
190 }
191
192 pub fn default_open_temp_with_params(
193 database_config: DatabaseConfig,
194 ) -> DatabaseResult<Self> {
195 let tmp_dir = TempDir::new().unwrap();
196 let path = tmp_dir.path();
197 let result = Self::open(
198 path,
199 enum_iterator::all::<Description::Column>().collect::<Vec<_>>(),
200 database_config,
201 );
202 let mut db = result?;
203
204 db._drop = Arc::new(
205 {
206 move || {
207 drop(tmp_dir);
209 }
210 }
211 .into(),
212 );
213
214 Ok(db)
215 }
216
217 pub fn default_open<P: AsRef<Path>>(
218 path: P,
219 database_config: DatabaseConfig,
220 ) -> DatabaseResult<Self> {
221 Self::open(
222 path,
223 enum_iterator::all::<Description::Column>().collect::<Vec<_>>(),
224 database_config,
225 )
226 }
227
228 pub fn prune(path: &Path) -> DatabaseResult<()> {
229 let path = path.join(Description::name());
230 DB::destroy(&Options::default(), path)
231 .map_err(|e| DatabaseError::Other(e.into()))?;
232 Ok(())
233 }
234
    /// Opens the database in read-write (primary) mode.
    pub fn open<P: AsRef<Path>>(
        path: P,
        columns: Vec<Description::Column>,
        database_config: DatabaseConfig,
    ) -> DatabaseResult<Self> {
        Self::open_with(DB::open_cf_descriptors, path, columns, database_config)
    }
242
    /// Opens the database in read-only mode; a live read-write instance may
    /// exist concurrently.
    pub fn open_read_only<P: AsRef<Path>>(
        path: P,
        columns: Vec<Description::Column>,
        error_if_log_file_exist: bool,
        database_config: DatabaseConfig,
    ) -> DatabaseResult<Self> {
        Self::open_with(
            // The closure captures `error_if_log_file_exist` to adapt the
            // read-only constructor to the common `open_with` signature.
            |options, primary_path, cfs| {
                DB::open_cf_descriptors_read_only(
                    options,
                    primary_path,
                    cfs,
                    error_if_log_file_exist,
                )
            },
            path,
            columns,
            database_config,
        )
    }
263
    /// Opens the database as a secondary instance following the primary at
    /// `path`, keeping its own state under `secondary_path`.
    pub fn open_secondary<PrimaryPath, SecondaryPath>(
        path: PrimaryPath,
        secondary_path: SecondaryPath,
        columns: Vec<Description::Column>,
        database_config: DatabaseConfig,
    ) -> DatabaseResult<Self>
    where
        PrimaryPath: AsRef<Path>,
        SecondaryPath: AsRef<Path>,
    {
        Self::open_with(
            |options, primary_path, cfs| {
                DB::open_cf_descriptors_as_secondary(
                    options,
                    primary_path,
                    secondary_path.as_ref().to_path_buf(),
                    cfs,
                )
            },
            path,
            columns,
            database_config,
        )
    }
288
    /// Shared open routine: configures caches and options, reconciles the
    /// requested columns against the families already on disk, then opens the
    /// database via `opener` (primary / read-only / secondary constructor).
    pub fn open_with<F, P>(
        opener: F,
        path: P,
        columns: Vec<Description::Column>,
        database_config: DatabaseConfig,
    ) -> DatabaseResult<Self>
    where
        F: Fn(
            &Options,
            PathBuf,
            Vec<ColumnFamilyDescriptor>,
        ) -> Result<DB, rocksdb::Error>,
        P: AsRef<Path>,
    {
        // Each database description lives in its own subdirectory.
        let original_path = path.as_ref().to_path_buf();
        let path = original_path.join(Description::name());
        let metric_columns = columns
            .iter()
            .map(|column| (column.id(), column.name()))
            .collect::<Vec<_>>();
        let metrics = Arc::new(DatabaseMetrics::new(
            Description::name().as_str(),
            &metric_columns,
        ));
        let mut block_opts = BlockBasedOptions::default();
        block_opts.set_format_version(5);

        if let Some(capacity) = database_config.cache_capacity {
            // A third of the budget goes to the block cache; another third is
            // used for the row cache below.
            let block_cache_size = capacity / 3;
            let cache = Cache::new_lru_cache(block_cache_size);
            block_opts.set_block_cache(&cache);
            block_opts.set_cache_index_and_filter_blocks(true);
            block_opts.set_pin_l0_filter_and_index_blocks_in_cache(true);
        } else {
            block_opts.disable_cache();
        }
        block_opts.set_bloom_filter(10.0, true);
        block_opts.set_block_size(16 * 1024);

        let mut opts = Options::default();
        opts.set_compression_type(DBCompressionType::Lz4);
        opts.set_max_total_wal_size(64 * 1024 * 1024);
        let cpu_number =
            i32::try_from(num_cpus::get()).expect("The number of CPU can't exceed `i32`");
        opts.increase_parallelism(cmp::max(1, cpu_number / 2));
        if let Some(capacity) = database_config.cache_capacity {
            let row_cache_size = capacity / 3;
            let cache = Cache::new_lru_cache(row_cache_size);
            opts.set_row_cache(&cache);
        }
        opts.set_max_background_jobs(6);
        opts.set_bytes_per_sync(1048576);
        opts.set_max_open_files(database_config.max_fds);

        // Families already present on disk; empty for a brand-new database.
        let existing_column_families = DB::list_cf(&opts, &path).unwrap_or_default();

        // Split the requested columns into families we can open directly and
        // families that still have to be created.
        let mut cf_descriptors_to_open = BTreeMap::new();
        let mut cf_descriptors_to_create = BTreeMap::new();
        for column in columns.clone() {
            let column_name = Self::col_name(column.id());
            let opts = Self::cf_opts(column, &block_opts);
            if existing_column_families.contains(&column_name) {
                cf_descriptors_to_open.insert(column_name, opts);
            } else {
                cf_descriptors_to_create.insert(column_name, opts);
            }
        }

        // Allow creating the database only when families will be created now,
        // or when the database is empty under the lazy policy.
        if database_config.columns_policy == ColumnsPolicy::OnCreation
            || (database_config.columns_policy == ColumnsPolicy::Lazy
                && cf_descriptors_to_open.is_empty())
        {
            opts.create_if_missing(true);
        }

        // Families on disk that this description does not know about must
        // still be opened (with default options) or RocksDB refuses to open.
        let unknown_columns_to_open: BTreeMap<_, _> = existing_column_families
            .iter()
            .filter(|column_name| {
                !cf_descriptors_to_open.contains_key(*column_name)
                    && !cf_descriptors_to_create.contains_key(*column_name)
            })
            .map(|unknown_column_name| {
                let unknown_column_options = Self::default_opts(&block_opts);
                (unknown_column_name.clone(), unknown_column_options)
            })
            .collect();
        cf_descriptors_to_open.extend(unknown_columns_to_open);

        let iterator = cf_descriptors_to_open
            .clone()
            .into_iter()
            .map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts))
            .collect::<Vec<_>>();

        let db = match opener(&opts, path.clone(), iterator) {
            Ok(db) => {
                Ok(db)
            },
            Err(err) => {
                tracing::error!("Couldn't open the database with an error: {}. \nTrying to reopen the database", err);

                // Retry once with freshly built descriptors.
                let iterator = cf_descriptors_to_open
                    .clone()
                    .into_iter()
                    .map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts))
                    .collect::<Vec<_>>();

                opener(&opts, path.clone(), iterator)
            },
        }
        .map_err(|e| DatabaseError::Other(e.into()))?;

        let create_family = match database_config.columns_policy {
            ColumnsPolicy::OnCreation => {
                // Eagerly create every missing family right now.
                for (name, opt) in cf_descriptors_to_create {
                    db.create_cf(name, &opt)
                        .map_err(|e| DatabaseError::Other(e.into()))?;
                }
                None
            }
            // Defer creation to first use; see `cf_u32`.
            ColumnsPolicy::Lazy => Some(Arc::new(Mutex::new(cf_descriptors_to_create))),
        };
        let db = Arc::new(PrimaryInstance(db));

        let rocks_db = RocksDb {
            read_options: Self::generate_read_options(&None),
            block_opts: Arc::new(block_opts),
            snapshot: None,
            db,
            metrics,
            create_family,
            _drop: Default::default(),
            _marker: Default::default(),
        };

        Ok(rocks_db)
    }
437
438 fn generate_read_options(
439 snapshot: &Option<rocksdb::SnapshotWithThreadMode<DB>>,
440 ) -> ReadOptions {
441 let mut opts = ReadOptions::default();
442 opts.set_verify_checksums(false);
443 if let Some(snapshot) = &snapshot {
444 opts.set_snapshot(snapshot);
445 }
446 opts
447 }
448
    /// Read options bound to this instance's snapshot, if any.
    fn read_options(&self) -> ReadOptions {
        Self::generate_read_options(&self.snapshot)
    }

    /// Creates a read-only view of the database at the current point in time.
    pub fn create_snapshot(&self) -> Self {
        self.create_snapshot_generic()
    }
456
    /// Creates a snapshot view, possibly re-typed to a different database
    /// description. NOTE(review): the column-set compatibility between the
    /// two descriptions is not checked here — confirm callers guarantee it.
    pub fn create_snapshot_generic<TargetDescription>(
        &self,
    ) -> RocksDb<TargetDescription> {
        let db = self.db.clone();
        let block_opts = self.block_opts.clone();
        let create_family = self.create_family.clone();
        let metrics = self.metrics.clone();
        let _drop = self._drop.clone();

        #[allow(clippy::missing_transmute_annotations)]
        // SAFETY: the snapshot borrows from `db`, but the lifetime is
        // transmuted to `'static` so both can be stored in the same struct.
        // Soundness relies on `RocksDb::drop` clearing `snapshot` before the
        // `Arc<PrimaryInstance>` it borrows from can be destroyed.
        let snapshot = unsafe {
            let snapshot = db.snapshot();
            core::mem::transmute(snapshot)
        };
        let snapshot = Some(snapshot);

        RocksDb {
            read_options: Self::generate_read_options(&snapshot),
            block_opts,
            snapshot,
            db,
            create_family,
            metrics,
            _drop,
            _marker: Default::default(),
        }
    }
489
    /// Handle to the column family backing `column`.
    fn cf(&self, column: Description::Column) -> Arc<BoundColumnFamily<'_>> {
        self.cf_u32(column.id())
    }

    /// Handle to the column family with raw id `column`, creating it on first
    /// use under the lazy columns policy.
    ///
    /// Panics when the family neither exists nor is pending creation, or when
    /// creation itself fails.
    fn cf_u32(&self, column: u32) -> Arc<BoundColumnFamily<'_>> {
        let family = self.db.cf_handle(&Self::col_name(column));

        match family {
            None => match &self.create_family {
                Some(create_family) => {
                    let mut lock = create_family
                        .lock()
                        .expect("The create family lock should be available");

                    let name = Self::col_name(column);
                    // Another thread may have created this family while we
                    // waited for the lock; in that case it is no longer in the
                    // pending map and a live handle must exist.
                    let Some(family) = lock.remove(&name) else {
                        return self
                            .db
                            .cf_handle(&Self::col_name(column))
                            .expect("No column family found");
                    };

                    self.db
                        .create_cf(&name, &family)
                        .expect("Couldn't create column family");

                    self.db.cf_handle(&name).expect("invalid column state")
                }
                _ => {
                    panic!("Columns in the DB should have been created on DB opening");
                }
            },
            Some(family) => family,
        }
    }
525
526 fn col_name(column: u32) -> String {
527 format!("col-{}", column)
528 }
529
530 fn default_opts(block_opts: &BlockBasedOptions) -> Options {
531 let mut opts = Options::default();
532 opts.create_if_missing(true);
533 opts.set_compression_type(DBCompressionType::Lz4);
534 opts.set_block_based_table_factory(block_opts);
535
536 opts
537 }
538
    /// Column-family options: the defaults plus a fixed-size prefix extractor
    /// when the description declares a key prefix length for this column.
    fn cf_opts(column: Description::Column, block_opts: &BlockBasedOptions) -> Options {
        let mut opts = Self::default_opts(block_opts);

        if let Some(size) = Description::prefix(&column) {
            opts.set_prefix_extractor(SliceTransform::create_fixed_prefix(size))
        }

        opts
    }
549
    /// Iterates all keys sharing `prefix` in reverse order: seeks to the next
    /// prefix and walks backwards while keys still match.
    fn reverse_prefix_iter<T>(
        &self,
        prefix: &[u8],
        column: Description::Column,
    ) -> impl Iterator<Item = StorageResult<T::Item>> + '_ + use<'_, T, Description>
    where
        T: ExtractItem,
    {
        let reverse_iterator = next_prefix(prefix.to_vec()).map(|next_prefix| {
            let mut opts = self.read_options();
            // Total-order seek is required because the seek key (the *next*
            // prefix) does not itself share the column's configured prefix.
            opts.set_total_order_seek(true);
            self.iterator::<T>(
                column,
                opts,
                IteratorMode::From(next_prefix.as_slice(), rocksdb::Direction::Reverse),
            )
        });

        match reverse_iterator {
            Some(iterator) => {
                let prefix = prefix.to_vec();
                iterator
                    // Errors pass through; iteration stops at the first key
                    // that no longer starts with the prefix.
                    .take_while(move |item| {
                        if let Ok(item) = item {
                            T::starts_with(item, prefix.as_slice())
                        } else {
                            true
                        }
                    })
                    .into_boxed()
            }
            _ => {
                // No next prefix exists (prefix is empty or all `0xFF`): the
                // range extends to the end of the keyspace, so start there.
                let prefix = prefix.to_vec();
                self.iterator::<T>(column, self.read_options(), IteratorMode::End)
                    .take_while(move |item| {
                        if let Ok(item) = item {
                            T::starts_with(item, prefix.as_slice())
                        } else {
                            true
                        }
                    })
                    .into_boxed()
            }
        }
    }
605
    /// Raw iterator over `column` that records read metrics for every yielded
    /// item and converts RocksDB errors into storage errors.
    pub(crate) fn iterator<T>(
        &self,
        column: Description::Column,
        opts: ReadOptions,
        iter_mode: IteratorMode,
    ) -> impl Iterator<Item = StorageResult<T::Item>> + '_ + use<'_, T, Description>
    where
        T: ExtractItem,
    {
        let column_metrics = self.metrics.columns_read_statistic.get(&column.id());

        RocksDBKeyIterator::<_, T>::new(
            self.db.raw_iterator_cf_opt(&self.cf(column), opts),
            iter_mode,
        )
        .map(move |item| {
            // Metrics are only bumped for successful reads.
            item.inspect(|item| {
                self.metrics.read_meter.inc();
                column_metrics.map(|metric| metric.inc());
                self.metrics.bytes_read.inc_by(T::size(item));
            })
            .map_err(|e| DatabaseError::Other(e.into()).into())
        })
    }
630
    /// Removes all data from `column` by dropping and re-creating its
    /// column family.
    pub fn clear_table(&self, column: Description::Column) -> DatabaseResult<()> {
        let column_name = Self::col_name(column.id());
        // NOTE(review): under the lazy policy a family that was never
        // materialised will make `drop_cf` fail — confirm callers only clear
        // columns that already exist.
        self.db
            .drop_cf(&column_name)
            .map_err(|e| DatabaseError::Other(e.into()))?;

        let column_name = Self::col_name(column.id());
        let opts = Self::cf_opts(column, self.block_opts.as_ref());
        self.db
            .create_cf(&column_name, &opts)
            .map_err(|e| DatabaseError::Other(e.into()))?;

        Ok(())
    }
648
    /// Fetches many keys from one column in a single RocksDB call, preserving
    /// input order; absent keys yield `None`.
    pub fn multi_get<K, I>(
        &self,
        column: u32,
        iterator: I,
    ) -> DatabaseResult<Vec<Option<Vec<u8>>>>
    where
        I: Iterator<Item = K>,
        K: AsRef<[u8]>,
    {
        let column_metrics = self.metrics.columns_read_statistic.get(&column);
        let cl = self.cf_u32(column);
        let results = self
            .db
            .multi_get_cf_opt(iterator.map(|k| (&cl, k)), &self.read_options)
            .into_iter()
            .map(|el| {
                self.metrics.read_meter.inc();
                column_metrics.map(|metric| metric.inc());
                el.map(|value| {
                    value.inspect(|vec| {
                        self.metrics.bytes_read.inc_by(vec.len() as u64);
                    })
                })
                .map_err(|err| DatabaseError::Other(err.into()))
            })
            // Short-circuits on the first RocksDB error.
            .try_collect()?;
        Ok(results)
    }
677
    /// Backing implementation for `iter_store`/`iter_store_keys`: dispatches
    /// on the `(prefix, start)` combination to the cheapest iteration mode.
    fn _iter_store<T>(
        &self,
        column: Description::Column,
        prefix: Option<&[u8]>,
        start: Option<&[u8]>,
        direction: IterDirection,
    ) -> BoxedIter<'_, StorageResult<T::Item>>
    where
        T: ExtractItem,
    {
        match (prefix, start) {
            (None, None) => {
                // Full scan in the requested direction.
                let iter_mode =
                    match direction {
                        IterDirection::Forward => IteratorMode::Start,
                        IterDirection::Reverse => IteratorMode::End,
                    };
                self.iterator::<T>(column, self.read_options(), iter_mode)
                    .into_boxed()
            }
            (Some(prefix), None) => {
                if direction == IterDirection::Reverse {
                    self.reverse_prefix_iter::<T>(prefix, column).into_boxed()
                } else {
                    let iter_mode = IteratorMode::From(
                        prefix,
                        convert_to_rocksdb_direction(direction),
                    );

                    let mut opts = self.read_options();
                    opts.set_prefix_same_as_start(true);

                    // Guard against columns without a configured prefix
                    // extractor: stop once keys leave the prefix range.
                    let prefix = prefix.to_vec();
                    self.iterator::<T>(column, opts, iter_mode)
                        .take_while(move |item| {
                            if let Ok(item) = item {
                                T::starts_with(item, prefix.as_slice())
                            } else {
                                true
                            }
                        })
                        .into_boxed()
                }
            }
            (None, Some(start)) => {
                let iter_mode =
                    IteratorMode::From(start, convert_to_rocksdb_direction(direction));
                let mut opts = self.read_options();
                // The start key may not share the column's configured prefix,
                // so force a total-order seek.
                opts.set_total_order_seek(true);
                self.iterator::<T>(column, opts, iter_mode).into_boxed()
            }
            (Some(prefix), Some(start)) => {
                // A start key outside the prefix range cannot yield anything.
                if !start.starts_with(prefix) {
                    return iter::empty().into_boxed();
                }

                let prefix = prefix.to_vec();
                let iter_mode =
                    IteratorMode::From(start, convert_to_rocksdb_direction(direction));
                self.iterator::<T>(column, self.read_options(), iter_mode)
                    .take_while(move |item| {
                        if let Ok(item) = item {
                            T::starts_with(item, prefix.as_slice())
                        } else {
                            true
                        }
                    })
                    .into_boxed()
            }
        }
    }
764
    /// Builds a `BackupEngine` rooted at `backup_dir/<description name>`.
    #[cfg(feature = "backup")]
    fn backup_engine<P: AsRef<Path> + ?Sized>(
        backup_dir: &P,
    ) -> DatabaseResult<rocksdb::backup::BackupEngine> {
        use rocksdb::{
            Env,
            backup::{
                BackupEngine,
                BackupEngineOptions,
            },
        };

        let backup_dir = backup_dir.as_ref().join(Description::name());
        let backup_dir_path = backup_dir.as_path();

        let mut backup_engine_options = BackupEngineOptions::new(backup_dir_path)
            .map_err(|e| {
                DatabaseError::BackupEngineInitError(anyhow::anyhow!(
                    "Couldn't create backup engine options for path `{}`: {}",
                    backup_dir_path.display(),
                    e
                ))
            })?;

        let cpu_number =
            i32::try_from(num_cpus::get()).expect("The number of CPU can't exceed `i32`");

        // Dedicate a quarter of the cores (at least one) to backup work.
        backup_engine_options.set_max_background_operations(cmp::max(1, cpu_number / 4));

        let env = Env::new().map_err(|e| {
            DatabaseError::BackupEngineInitError(anyhow::anyhow!(
                "Couldn't create environment for backup: {}",
                e
            ))
        })?;

        let backup_engine =
            BackupEngine::open(&backup_engine_options, &env).map_err(|e| {
                DatabaseError::BackupEngineInitError(anyhow::anyhow!(
                    "Couldn't open backup engine for path `{}`: {}",
                    backup_dir.display(),
                    e
                ))
            })?;

        Ok(backup_engine)
    }
812
    /// Creates a new backup of the database at `db_dir` into `backup_dir`.
    ///
    /// The database is opened read-only, so it may be in use elsewhere.
    #[cfg(feature = "backup")]
    pub fn backup<P: AsRef<Path> + ?Sized>(
        db_dir: &P,
        backup_dir: &P,
    ) -> DatabaseResult<()> {
        let mut backup_engine = Self::backup_engine(backup_dir)?;

        // `-1` follows RocksDB's "no file-descriptor limit" convention.
        let db_config = DatabaseConfig {
            cache_capacity: None,
            max_fds: -1,
            columns_policy: ColumnsPolicy::Lazy,
        };

        let db = Self::open_read_only(
            db_dir,
            enum_iterator::all::<Description::Column>().collect::<Vec<_>>(),
            false,
            db_config,
        )?;

        backup_engine.create_new_backup(&db.db).map_err(|e| {
            DatabaseError::BackupError(anyhow::anyhow!(
                "Couldn't create new backup for path `{}`: {}",
                backup_dir.as_ref().display(),
                e
            ))
        })?;

        Ok(())
    }
843
    /// Restores the most recent backup from `backup_dir` into `db_dir`.
    #[cfg(feature = "backup")]
    pub fn restore<P: AsRef<Path> + ?Sized>(
        db_dir: &P,
        backup_dir: &P,
    ) -> DatabaseResult<()> {
        use rocksdb::backup::RestoreOptions;

        let mut backup_engine = Self::backup_engine(backup_dir)?;
        let restore_option = RestoreOptions::default();

        let db_dir = db_dir.as_ref().join(Description::name());
        let db_dir_path = db_dir.as_path();
        // DB and WAL are restored to the same directory.
        backup_engine
            .restore_from_latest_backup(db_dir_path, db_dir_path, &restore_option)
            .map_err(|e| {
                DatabaseError::RestoreError(anyhow::anyhow!(
                    "Couldn't restore from latest backup for path `{}`: {}",
                    db_dir_path.display(),
                    e
                ))
            })?;

        Ok(())
    }
870
871 pub fn shutdown(&self) {
872 while Arc::strong_count(&self.db) > 1 {}
873 }
874}
875
876pub(crate) struct KeyOnly;
877
878impl ExtractItem for KeyOnly {
879 type Item = Vec<u8>;
880
881 fn extract_item<D>(
882 raw_iterator: &DBRawIteratorWithThreadMode<D>,
883 ) -> Option<Self::Item>
884 where
885 D: DBAccess,
886 {
887 raw_iterator.key().map(|key| key.to_vec())
888 }
889
890 fn size(item: &Self::Item) -> u64 {
891 item.len() as u64
892 }
893
894 fn starts_with(item: &Self::Item, prefix: &[u8]) -> bool {
895 item.starts_with(prefix)
896 }
897}
898
899pub(crate) struct KeyAndValue;
900
901impl ExtractItem for KeyAndValue {
902 type Item = (Vec<u8>, Value);
903
904 fn extract_item<D>(
905 raw_iterator: &DBRawIteratorWithThreadMode<D>,
906 ) -> Option<Self::Item>
907 where
908 D: DBAccess,
909 {
910 raw_iterator
911 .item()
912 .map(|(key, value)| (key.to_vec(), Value::from(value)))
913 }
914
915 fn size(item: &Self::Item) -> u64 {
916 item.0.len().saturating_add(item.1.len()) as u64
917 }
918
919 fn starts_with(item: &Self::Item, prefix: &[u8]) -> bool {
920 item.0.starts_with(prefix)
921 }
922}
923
impl<Description> KeyValueInspect for RocksDb<Description>
where
    Description: DatabaseDescription,
{
    type Column = Description::Column;

    /// Size of the stored value in bytes, or `None` when the key is absent.
    fn size_of_value(
        &self,
        key: &[u8],
        column: Self::Column,
    ) -> StorageResult<Option<usize>> {
        self.metrics.read_meter.inc();
        let column_metrics = self.metrics.columns_read_statistic.get(&column.id());
        column_metrics.map(|metric| metric.inc());

        // A pinned get avoids copying the value just to measure its length.
        Ok(self
            .db
            .get_pinned_cf_opt(&self.cf(column), key, &self.read_options)
            .map_err(|e| DatabaseError::Other(e.into()))?
            .map(|value| value.len()))
    }

    /// Fetches the value stored under `key`, updating read metrics.
    fn get(&self, key: &[u8], column: Self::Column) -> StorageResult<Option<Value>> {
        self.metrics.read_meter.inc();
        let column_metrics = self.metrics.columns_read_statistic.get(&column.id());
        column_metrics.map(|metric| metric.inc());

        let value = self
            .db
            .get_cf_opt(&self.cf(column), key, &self.read_options)
            .map_err(|e| DatabaseError::Other(e.into()))?;

        if let Some(value) = &value {
            self.metrics.bytes_read.inc_by(value.len() as u64);
        }

        Ok(value.map(Arc::from))
    }

    /// Copies `buf.len()` bytes of the value starting at `offset` into `buf`.
    ///
    /// Returns `Ok(false)` when the key does not exist; errors when the
    /// requested window extends past the end of the stored value.
    fn read(
        &self,
        key: &[u8],
        column: Self::Column,
        offset: usize,
        buf: &mut [u8],
    ) -> StorageResult<bool> {
        self.metrics.read_meter.inc();
        let column_metrics = self.metrics.columns_read_statistic.get(&column.id());
        column_metrics.map(|metric| metric.inc());

        let Some(value) = self
            .db
            .get_pinned_cf_opt(&self.cf(column), key, &self.read_options)
            .map_err(|e| DatabaseError::Other(e.into()))?
        else {
            return Ok(false);
        };

        let bytes_len = value.len();
        let start = offset;
        let buf_len = buf.len();
        // Saturating add keeps the bounds check well-defined on overflow.
        let end = offset.saturating_add(buf_len);

        if end > bytes_len {
            return Err(StorageError::Other(anyhow::anyhow!(
                "Offset `{offset}` + buf_len `{buf_len}` read until {end} which is out of bounds `{bytes_len}` for key `{:?}` and column `{column:?}`",
                key
            )));
        }

        let starting_from_offset = &value[start..end];
        buf[..].copy_from_slice(starting_from_offset);

        self.metrics
            .bytes_read
            .inc_by(starting_from_offset.len() as u64);

        Ok(true)
    }
}
1004
impl<Description> IterableStore for RocksDb<Description>
where
    Description: DatabaseDescription,
{
    /// Iterates key-value pairs; see `_iter_store` for prefix/start semantics.
    fn iter_store(
        &self,
        column: Self::Column,
        prefix: Option<&[u8]>,
        start: Option<&[u8]>,
        direction: IterDirection,
    ) -> BoxedIter<'_, KVItem> {
        self._iter_store::<KeyAndValue>(column, prefix, start, direction)
    }

    /// Iterates keys only, without materialising values.
    fn iter_store_keys(
        &self,
        column: Self::Column,
        prefix: Option<&[u8]>,
        start: Option<&[u8]>,
        direction: IterDirection,
    ) -> BoxedIter<'_, KeyItem> {
        self._iter_store::<KeyOnly>(column, prefix, start, direction)
    }
}
1029
1030impl<Description> RocksDb<Description>
1031where
1032 Description: DatabaseDescription,
1033{
    /// Atomically writes one or more change sets in a single RocksDB batch.
    ///
    /// Fails with `ConflictingChanges` when two change sets touch the same
    /// `(column, key)` pair, since the outcome would depend on ordering.
    pub fn commit_changes<'a>(&self, changes: &'a StorageChanges) -> StorageResult<()> {
        let instant = std::time::Instant::now();
        let mut batch = WriteBatch::default();
        let mut conflict_finder = HashSet::<(&'a u32, &'a ReferenceBytesKey)>::new();

        match changes {
            StorageChanges::Changes(changes) => {
                self._populate_batch(&mut batch, &mut conflict_finder, changes)?;
            }
            StorageChanges::ChangesList(changes_list) => {
                for changes in changes_list {
                    self._populate_batch(&mut batch, &mut conflict_finder, changes)?;
                }
            }
        }

        self.db
            .write(batch)
            .map_err(|e| DatabaseError::Other(e.into()))?;
        // Record the full commit duration (batch building + write) in ns.
        self.metrics.database_commit_time.inc_by(
            u64::try_from(instant.elapsed().as_nanos())
                .expect("The commit shouldn't take longer than `u64`"),
        );

        Ok(())
    }
1061
    /// Appends one change set to `batch`, tracking `(column, key)` pairs in
    /// `conflict_finder` so duplicate writes across change sets are rejected.
    fn _populate_batch<'a>(
        &self,
        batch: &mut WriteBatch,
        conflict_finder: &mut HashSet<(&'a u32, &'a ReferenceBytesKey)>,
        changes: &'a Changes,
    ) -> DatabaseResult<()> {
        for (column, ops) in changes {
            let cf = self.cf_u32(*column);
            let column_metrics = self.metrics.columns_write_statistic.get(column);
            for (key, op) in ops {
                self.metrics.write_meter.inc();
                column_metrics.map(|metric| metric.inc());

                // Keys are unique within one `Changes` map, so an insertion
                // failure means another change set already wrote this key.
                if !conflict_finder.insert((column, key)) {
                    return Err(DatabaseError::ConflictingChanges {
                        column: *column,
                        key: key.clone(),
                    });
                }

                match op {
                    WriteOperation::Insert(value) => {
                        self.metrics.bytes_written.inc_by(value.len() as u64);
                        batch.put_cf(&cf, key, value.as_ref());
                    }
                    WriteOperation::Remove => {
                        batch.delete_cf(&cf, key);
                    }
                }
            }
        }
        Ok(())
    }
1095}
1096
/// Returns a key that sorts after every key starting with `prefix`, obtained
/// by incrementing the right-most byte that is not `0xFF` (any trailing
/// `0xFF` bytes are left in place). Returns `None` for an empty or
/// all-`0xFF` prefix, where no such bound exists.
fn next_prefix(mut prefix: Vec<u8>) -> Option<Vec<u8>> {
    for index in (0..prefix.len()).rev() {
        if prefix[index] != u8::MAX {
            prefix[index] = prefix[index].wrapping_add(1);
            return Some(prefix);
        }
    }
    None
}
1107
#[cfg(feature = "test-helpers")]
pub mod test_helpers {
    use super::*;
    use fuel_core_storage::{
        kv_store::KeyValueMutate,
        transactional::ReadTransaction,
    };

    // Direct mutation helpers for tests: each call runs through a one-off
    // transaction that is committed immediately.
    impl<Description> KeyValueMutate for RocksDb<Description>
    where
        Description: DatabaseDescription,
    {
        fn write(
            &mut self,
            key: &[u8],
            column: Self::Column,
            buf: &[u8],
        ) -> StorageResult<usize> {
            let mut transaction = self.read_transaction();
            let len = transaction.write(key, column, buf)?;
            let changes = transaction.into_changes();
            self.commit_changes(&StorageChanges::Changes(changes))?;

            Ok(len)
        }

        fn delete(&mut self, key: &[u8], column: Self::Column) -> StorageResult<()> {
            let mut transaction = self.read_transaction();
            transaction.delete(key, column)?;
            let changes = transaction.into_changes();
            self.commit_changes(&StorageChanges::Changes(changes))?;
            Ok(())
        }
    }
}
1143
1144#[allow(non_snake_case)]
1145#[cfg(test)]
1146mod tests {
1147 use super::*;
1148 use crate::database::database_description::on_chain::OnChain;
1149 use fuel_core_storage::{
1150 column::Column,
1151 kv_store::KeyValueMutate,
1152 };
1153 use std::collections::{
1154 BTreeMap,
1155 HashMap,
1156 };
1157 use tempfile::TempDir;
1158
    /// Opens a fresh on-chain database in a temp dir; the caller must keep
    /// the returned `TempDir` alive for the database's lifetime.
    fn create_db() -> (RocksDb<OnChain>, TempDir) {
        let tmp_dir = TempDir::new().unwrap();
        (
            RocksDb::default_open(tmp_dir.path(), DatabaseConfig::config_for_tests())
                .unwrap(),
            tmp_dir,
        )
    }
1167
    #[test]
    fn open_new_columns() {
        // Re-opening an existing database with extra columns must succeed.
        let tmp_dir = TempDir::new().unwrap();

        let old_columns =
            vec![Column::Coins, Column::Messages, Column::UploadedBytecodes];
        let database_with_old_columns = RocksDb::<OnChain>::open(
            tmp_dir.path(),
            old_columns.clone(),
            DatabaseConfig::config_for_tests(),
        )
        .expect("Failed to open database with old columns");
        drop(database_with_old_columns);

        let mut new_columns = old_columns;
        new_columns.push(Column::ContractsAssets);
        new_columns.push(Column::Metadata);
        let database_with_new_columns = RocksDb::<OnChain>::open(
            tmp_dir.path(),
            new_columns,
            DatabaseConfig::config_for_tests(),
        )
        .map(|_| ());

        assert_eq!(Ok(()), database_with_new_columns);
    }
1197
    #[test]
    fn can_put_and_read() {
        // Round-trip: a written value is returned unchanged by `get`.
        let key = vec![0xA, 0xB, 0xC];

        let (mut db, _tmp) = create_db();
        let expected = Value::from([1, 2, 3]);
        db.put(&key, Column::Metadata, expected.clone()).unwrap();

        assert_eq!(db.get(&key, Column::Metadata).unwrap().unwrap(), expected)
    }

    #[test]
    fn put_returns_previous_value() {
        // `replace` yields the value previously stored under the key.
        let key = vec![0xA, 0xB, 0xC];

        let (mut db, _tmp) = create_db();
        let expected = Value::from([1, 2, 3]);
        db.put(&key, Column::Metadata, expected.clone()).unwrap();
        let prev = db
            .replace(&key, Column::Metadata, Arc::new([2, 4, 6]))
            .unwrap();

        assert_eq!(prev, Some(expected));
    }

    #[test]
    fn delete_and_get() {
        // After `delete`, `get` returns `None`.
        let key = vec![0xA, 0xB, 0xC];

        let (mut db, _tmp) = create_db();
        let expected = Value::from([1, 2, 3]);
        db.put(&key, Column::Metadata, expected.clone()).unwrap();
        assert_eq!(db.get(&key, Column::Metadata).unwrap().unwrap(), expected);

        db.delete(&key, Column::Metadata).unwrap();
        assert_eq!(db.get(&key, Column::Metadata).unwrap(), None);
    }

    #[test]
    fn key_exists() {
        // `exists` reports `true` for a stored key.
        let key = vec![0xA, 0xB, 0xC];

        let (mut db, _tmp) = create_db();
        let expected = Arc::new([1, 2, 3]);
        db.put(&key, Column::Metadata, expected).unwrap();
        assert!(db.exists(&key, Column::Metadata).unwrap());
    }
1245
    #[test]
    fn commit_changes_inserts() {
        // An `Insert` committed via `commit_changes` becomes readable.
        let key = vec![0xA, 0xB, 0xC];
        let value = Value::from([1, 2, 3]);

        let (db, _tmp) = create_db();
        let ops = vec![(
            Column::Metadata.id(),
            BTreeMap::from_iter(vec![(
                key.clone().into(),
                WriteOperation::Insert(value.clone()),
            )]),
        )];

        db.commit_changes(&StorageChanges::Changes(HashMap::from_iter(ops)))
            .unwrap();
        assert_eq!(db.get(&key, Column::Metadata).unwrap().unwrap(), value)
    }

    #[test]
    fn commit_changes_removes() {
        // A `Remove` committed via `commit_changes` deletes the stored value.
        let key = vec![0xA, 0xB, 0xC];
        let value = Arc::new([1, 2, 3]);

        let (mut db, _tmp) = create_db();
        db.put(&key, Column::Metadata, value).unwrap();

        let ops = vec![(
            Column::Metadata.id(),
            BTreeMap::from_iter(vec![(key.clone().into(), WriteOperation::Remove)]),
        )];
        db.commit_changes(&StorageChanges::Changes(HashMap::from_iter(ops)))
            .unwrap();

        assert_eq!(db.get(&key, Column::Metadata).unwrap(), None);
    }
1282
    #[test]
    fn can_use_unit_value() {
        // An empty (zero-length) value is stored, iterated, and taken back.
        let key = vec![0x00];

        let (mut db, _tmp) = create_db();
        let expected = Value::from([]);
        db.put(&key, Column::Metadata, expected.clone()).unwrap();

        assert_eq!(db.get(&key, Column::Metadata).unwrap().unwrap(), expected);

        assert!(db.exists(&key, Column::Metadata).unwrap());

        assert_eq!(
            db.iter_store(Column::Metadata, None, None, IterDirection::Forward)
                .collect::<Result<Vec<_>, _>>()
                .unwrap()[0],
            (key.clone(), expected.clone())
        );

        assert_eq!(db.take(&key, Column::Metadata).unwrap().unwrap(), expected);

        assert!(!db.exists(&key, Column::Metadata).unwrap());
    }

    #[test]
    fn can_use_unit_key() {
        // An empty (zero-length) key behaves like any other key.
        let key: Vec<u8> = Vec::with_capacity(0);

        let (mut db, _tmp) = create_db();
        let expected = Value::from([1, 2, 3]);
        db.put(&key, Column::Metadata, expected.clone()).unwrap();

        assert_eq!(db.get(&key, Column::Metadata).unwrap().unwrap(), expected);

        assert!(db.exists(&key, Column::Metadata).unwrap());

        assert_eq!(
            db.iter_store(Column::Metadata, None, None, IterDirection::Forward)
                .collect::<Result<Vec<_>, _>>()
                .unwrap()[0],
            (key.clone(), expected.clone())
        );

        assert_eq!(db.take(&key, Column::Metadata).unwrap().unwrap(), expected);

        assert!(!db.exists(&key, Column::Metadata).unwrap());
    }
1330
1331 #[test]
1332 fn can_use_unit_key_and_value() {
1333 let key: Vec<u8> = Vec::with_capacity(0);
1334
1335 let (mut db, _tmp) = create_db();
1336 let expected = Value::from([]);
1337 db.put(&key, Column::Metadata, expected.clone()).unwrap();
1338
1339 assert_eq!(db.get(&key, Column::Metadata).unwrap().unwrap(), expected);
1340
1341 assert!(db.exists(&key, Column::Metadata).unwrap());
1342
1343 assert_eq!(
1344 db.iter_store(Column::Metadata, None, None, IterDirection::Forward)
1345 .collect::<Result<Vec<_>, _>>()
1346 .unwrap()[0],
1347 (key.clone(), expected.clone())
1348 );
1349
1350 assert_eq!(db.take(&key, Column::Metadata).unwrap().unwrap(), expected);
1351
1352 assert!(!db.exists(&key, Column::Metadata).unwrap());
1353 }
1354
1355 #[test]
1356 fn open_primary_db_second_time_fails() {
1357 let (_primary_db, tmp_dir) = create_db();
1359
1360 let columns = enum_iterator::all::<<OnChain as DatabaseDescription>::Column>()
1362 .collect::<Vec<_>>();
1363 let result = RocksDb::<OnChain>::open(
1364 tmp_dir.path(),
1365 columns,
1366 DatabaseConfig::config_for_tests(),
1367 );
1368
1369 assert!(result.is_err());
1371 }
1372
1373 #[test]
1374 fn open_second_read_only_db() {
1375 let (_primary_db, tmp_dir) = create_db();
1377
1378 let old_columns =
1380 vec![Column::Coins, Column::Messages, Column::UploadedBytecodes];
1381 let result = RocksDb::<OnChain>::open_read_only(
1382 tmp_dir.path(),
1383 old_columns.clone(),
1384 false,
1385 DatabaseConfig::config_for_tests(),
1386 )
1387 .map(|_| ());
1388
1389 assert_eq!(Ok(()), result);
1391 }
1392
1393 #[test]
1394 fn open_secondary_db() {
1395 let (_primary_db, tmp_dir) = create_db();
1397 let secondary_temp = TempDir::new().unwrap();
1398
1399 let old_columns =
1401 vec![Column::Coins, Column::Messages, Column::UploadedBytecodes];
1402 let result = RocksDb::<OnChain>::open_secondary(
1403 tmp_dir.path(),
1404 secondary_temp.path(),
1405 old_columns.clone(),
1406 DatabaseConfig::config_for_tests(),
1407 )
1408 .map(|_| ());
1409
1410 assert_eq!(Ok(()), result);
1412 }
1413
1414 #[test]
1415 fn snapshot_allows_get_entry_after_it_was_removed() {
1416 let (mut db, _tmp) = create_db();
1417 let value = Value::from([1, 2, 3]);
1418
1419 let key_1 = [1; 32];
1421 db.put(&key_1, Column::Metadata, value.clone()).unwrap();
1422 let snapshot = db.create_snapshot();
1423
1424 db.delete(&key_1, Column::Metadata).unwrap();
1426
1427 let db_get = db.get(&key_1, Column::Metadata).unwrap();
1429 assert!(db_get.is_none());
1430
1431 let snapshot_get = snapshot.get(&key_1, Column::Metadata).unwrap();
1432 assert_eq!(snapshot_get, Some(value));
1433 }
1434
1435 #[test]
1436 fn snapshot_allows_correct_iteration_even_after_all_elements_where_removed() {
1437 let (mut db, _tmp) = create_db();
1438 let value = Value::from([1, 2, 3]);
1439
1440 let key_1 = vec![1; 32];
1442 let key_2 = vec![2; 32];
1443 let key_3 = vec![3; 32];
1444 db.put(&key_1, Column::Metadata, value.clone()).unwrap();
1445 db.put(&key_2, Column::Metadata, value.clone()).unwrap();
1446 db.put(&key_3, Column::Metadata, value.clone()).unwrap();
1447 let snapshot = db.create_snapshot();
1448
1449 db.delete(&key_1, Column::Metadata).unwrap();
1451 db.delete(&key_2, Column::Metadata).unwrap();
1452 db.delete(&key_3, Column::Metadata).unwrap();
1453
1454 let db_iter = db
1456 .iter_store(Column::Metadata, None, None, IterDirection::Forward)
1457 .collect::<Vec<_>>();
1458 assert!(db_iter.is_empty());
1459
1460 let snapshot_iter = snapshot
1461 .iter_store(Column::Metadata, None, None, IterDirection::Forward)
1462 .collect::<Vec<_>>();
1463 assert_eq!(
1464 snapshot_iter,
1465 vec![
1466 Ok((key_1, value.clone())),
1467 Ok((key_2, value.clone())),
1468 Ok((key_3, value))
1469 ]
1470 );
1471 }
1472
1473 #[test]
1474 fn drop_snapshot_after_dropping_main_database_shouldn_panic() {
1475 let (db, _tmp) = create_db();
1476
1477 let snapshot = db.create_snapshot();
1479
1480 drop(db);
1482
1483 drop(snapshot);
1485 }
1486
1487 #[test]
1488 fn open__opens_subset_of_columns_after_opening_all_columns() {
1489 let (first_open_with_all_columns, tmp_dir) = create_db();
1491
1492 drop(first_open_with_all_columns);
1494 let part_of_columns =
1495 enum_iterator::all::<<OnChain as DatabaseDescription>::Column>()
1496 .skip(1)
1497 .collect::<Vec<_>>();
1498 let open_with_part_of_columns = RocksDb::<OnChain>::open(
1499 tmp_dir.path(),
1500 part_of_columns,
1501 DatabaseConfig::config_for_tests(),
1502 );
1503
1504 let _ = open_with_part_of_columns
1506 .expect("Should open the database with shorter number of columns");
1507 }
1508
1509 #[test]
1510 fn iter_store__reverse_iterator__no_target_prefix() {
1511 let (mut db, _tmp) = create_db();
1513 let value = Value::from([]);
1514 let key_1 = [1, 1];
1515 let key_2 = [2, 2];
1516 let key_3 = [9, 3];
1517 let key_4 = [10, 0];
1518 db.put(&key_1, Column::Metadata, value.clone()).unwrap();
1519 db.put(&key_2, Column::Metadata, value.clone()).unwrap();
1520 db.put(&key_3, Column::Metadata, value.clone()).unwrap();
1521 db.put(&key_4, Column::Metadata, value.clone()).unwrap();
1522
1523 let db_iter = db
1525 .iter_store(
1526 Column::Metadata,
1527 Some(vec![5].as_slice()),
1528 None,
1529 IterDirection::Reverse,
1530 )
1531 .map(|item| item.map(|(key, _)| key))
1532 .collect::<Vec<_>>();
1533
1534 assert_eq!(db_iter, vec![]);
1536 }
1537
1538 #[test]
1539 fn iter_store__reverse_iterator__target_prefix_at_the_middle() {
1540 let (mut db, _tmp) = create_db();
1542 let value = Value::from([]);
1543 let key_1 = [1, 1];
1544 let key_2 = [2, 2];
1545 let key_3 = [2, 3];
1546 let key_4 = [10, 0];
1547 db.put(&key_1, Column::Metadata, value.clone()).unwrap();
1548 db.put(&key_2, Column::Metadata, value.clone()).unwrap();
1549 db.put(&key_3, Column::Metadata, value.clone()).unwrap();
1550 db.put(&key_4, Column::Metadata, value.clone()).unwrap();
1551
1552 let db_iter = db
1554 .iter_store(
1555 Column::Metadata,
1556 Some(vec![2].as_slice()),
1557 None,
1558 IterDirection::Reverse,
1559 )
1560 .map(|item| item.map(|(key, _)| key))
1561 .collect::<Vec<_>>();
1562
1563 assert_eq!(db_iter, vec![Ok(key_3.to_vec()), Ok(key_2.to_vec())]);
1565 }
1566
1567 #[test]
1568 fn iter_store__reverse_iterator__target_prefix_at_the_end() {
1569 let (mut db, _tmp) = create_db();
1571 let value = Value::from([]);
1572 let key_1 = [1, 1];
1573 let key_2 = [2, 2];
1574 let key_3 = [2, 3];
1575 db.put(&key_1, Column::Metadata, value.clone()).unwrap();
1576 db.put(&key_2, Column::Metadata, value.clone()).unwrap();
1577 db.put(&key_3, Column::Metadata, value.clone()).unwrap();
1578
1579 let db_iter = db
1581 .iter_store(
1582 Column::Metadata,
1583 Some(vec![2].as_slice()),
1584 None,
1585 IterDirection::Reverse,
1586 )
1587 .map(|item| item.map(|(key, _)| key))
1588 .collect::<Vec<_>>();
1589
1590 assert_eq!(db_iter, vec![Ok(key_3.to_vec()), Ok(key_2.to_vec())]);
1592 }
1593
1594 #[test]
1595 fn iter_store__reverse_iterator__target_prefix_at_the_end__overflow() {
1596 let (mut db, _tmp) = create_db();
1598 let value = Value::from([]);
1599 let key_1 = [1, 1];
1600 let key_2 = [255, 254];
1601 let key_3 = [255, 255];
1602 db.put(&key_1, Column::Metadata, value.clone()).unwrap();
1603 db.put(&key_2, Column::Metadata, value.clone()).unwrap();
1604 db.put(&key_3, Column::Metadata, value.clone()).unwrap();
1605
1606 let db_iter = db
1608 .iter_store(
1609 Column::Metadata,
1610 Some(vec![255].as_slice()),
1611 None,
1612 IterDirection::Reverse,
1613 )
1614 .map(|item| item.map(|(key, _)| key))
1615 .collect::<Vec<_>>();
1616
1617 assert_eq!(db_iter, vec![Ok(key_3.to_vec()), Ok(key_2.to_vec())]);
1619 }
1620
1621 #[test]
1622 fn clear_table__keeps_column_accessible_after_marking_for_deletion() {
1623 let (mut db, _tmp) = create_db();
1625 let value = Value::from([]);
1626 let key_1 = [1, 1];
1627 let key_2 = [255, 254];
1628 let key_3 = [255, 255];
1629 db.put(&key_1, Column::Metadata, value.clone()).unwrap();
1630 db.put(&key_2, Column::Metadata, value.clone()).unwrap();
1631 db.put(&key_3, Column::Metadata, value.clone()).unwrap();
1632
1633 db.clear_table(Column::Metadata).unwrap();
1635
1636 let db_iter = db
1638 .iter_store_keys(Column::Metadata, None, None, IterDirection::Forward)
1639 .collect::<Vec<_>>();
1640 assert_eq!(db_iter, vec![]);
1641 }
1642}