1use crate::{
2 database::{
3 Error as DatabaseError,
4 Result as DatabaseResult,
5 convert_to_rocksdb_direction,
6 database_description::DatabaseDescription,
7 },
8 state::IterDirection,
9};
10
11use super::rocks_db_key_iterator::{
12 ExtractItem,
13 RocksDBKeyIterator,
14};
15use core::ops::Deref;
16use fuel_core_metrics::core_metrics::DatabaseMetrics;
17use fuel_core_storage::{
18 Result as StorageResult,
19 StorageReadError,
20 iter::{
21 BoxedIter,
22 IntoBoxedIter,
23 IterableStore,
24 },
25 kv_store::{
26 KVItem,
27 KeyItem,
28 KeyValueInspect,
29 StorageColumn,
30 Value,
31 WriteOperation,
32 },
33 transactional::{
34 Changes,
35 ReferenceBytesKey,
36 StorageChanges,
37 },
38};
39use itertools::Itertools;
40use rocksdb::{
41 BlockBasedOptions,
42 BoundColumnFamily,
43 Cache,
44 ColumnFamilyDescriptor,
45 DBAccess,
46 DBCompressionType,
47 DBRawIteratorWithThreadMode,
48 DBWithThreadMode,
49 IteratorMode,
50 MultiThreaded,
51 Options,
52 ReadOptions,
53 SliceTransform,
54 WriteBatch,
55};
56use std::{
57 cmp,
58 collections::{
59 BTreeMap,
60 HashSet,
61 },
62 fmt,
63 fmt::Formatter,
64 iter,
65 path::{
66 Path,
67 PathBuf,
68 },
69 sync::{
70 Arc,
71 Mutex,
72 },
73};
74use tempfile::TempDir;
75
/// Newtype around the multi-threaded RocksDB handle.
///
/// The wrapper exists so that a `Drop` implementation can be attached to the
/// handle; every other operation reaches the inner database through `Deref`.
#[derive(Debug)]
struct PrimaryInstance(DBWithThreadMode<MultiThreaded>);

impl Deref for PrimaryInstance {
    type Target = DBWithThreadMode<MultiThreaded>;

    /// Exposes the wrapped database handle.
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl Drop for PrimaryInstance {
    /// Cancels RocksDB background work before the handle itself is released.
    fn drop(&mut self) {
        // NOTE(review): the `true` argument presumably asks RocksDB to wait for
        // running background jobs to finish - confirm against the `rocksdb` docs.
        self.cancel_all_background_work(true);
    }
}
92
/// Shorthand for the multi-threaded RocksDB handle type used in this file.
type DB = DBWithThreadMode<MultiThreaded>;
94
/// Boxed, thread-safe callback that can be invoked at most once.
type DropFn = Box<dyn FnOnce() + Send + Sync>;

/// Guard that runs an optional callback when it is dropped.
///
/// The default value carries no callback, so dropping it does nothing.
#[derive(Default)]
struct DropResources {
    drop: Option<DropFn>,
}

impl fmt::Debug for DropResources {
    /// The callback is opaque, so only the type name is printed.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.write_str("DropResources")
    }
}

impl<F: 'static + FnOnce() + Send + Sync> From<F> for DropResources {
    /// Wraps `closure` so that it is executed when the guard is dropped.
    fn from(closure: F) -> Self {
        let callback: DropFn = Box::new(closure);
        Self {
            drop: Some(callback),
        }
    }
}

impl Drop for DropResources {
    /// Takes the callback out of the guard (if any) and runs it.
    fn drop(&mut self) {
        let Some(callback) = self.drop.take() else {
            return;
        };
        callback();
    }
}
123
/// Decides when the column families that are missing on disk get created.
///
/// The default variant depends on the `rocksdb-production` feature flag.
#[derive(Clone, Copy, Default, Debug, PartialEq, Eq)]
pub enum ColumnsPolicy {
    /// Missing column families are kept aside when the database is opened and
    /// created the first time they are accessed.
    #[cfg_attr(not(feature = "rocksdb-production"), default)]
    Lazy,
    /// Missing column families are all created while the database is opened.
    #[cfg_attr(feature = "rocksdb-production", default)]
    OnCreation,
}
134
/// Settings used when opening a `RocksDb` instance.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct DatabaseConfig {
    /// Overall cache budget. When set, a third of it is given to the block
    /// cache and another third to the row cache; when `None` the block cache
    /// is disabled and no row cache is configured.
    pub cache_capacity: Option<usize>,
    /// Value forwarded to RocksDB's "max open files" option.
    pub max_fds: i32,
    /// When missing column families are created.
    pub columns_policy: ColumnsPolicy,
}
142
#[cfg(feature = "test-helpers")]
impl DatabaseConfig {
    /// Configuration used by tests: no cache, 512 file descriptors and lazily
    /// created column families.
    pub fn config_for_tests() -> Self {
        Self {
            cache_capacity: None,
            max_fds: 512,
            columns_policy: ColumnsPolicy::Lazy,
        }
    }
}
153
/// RocksDB-backed key-value store for the database described by `Description`.
pub struct RocksDb<Description> {
    /// Read options built once at construction time (bound to `snapshot` when
    /// one exists); used by the point-lookup methods.
    read_options: ReadOptions,
    /// Shared handle to the underlying database.
    db: Arc<PrimaryInstance>,
    /// Block-based table options shared by all column families.
    block_opts: Arc<BlockBasedOptions>,
    /// Column families that still have to be created, keyed by name.
    /// `Some` under `ColumnsPolicy::Lazy`, `None` under `ColumnsPolicy::OnCreation`.
    create_family: Option<Arc<Mutex<BTreeMap<String, Options>>>>,
    /// Snapshot this instance reads from; `None` for a regular instance.
    snapshot: Option<rocksdb::SnapshotWithThreadMode<'static, DB>>,
    /// Read/write counters.
    metrics: Arc<DatabaseMetrics>,
    /// Cleanup callback shared between this instance and its snapshots.
    _drop: Arc<DropResources>,
    /// Ties the instance to its `Description` without storing a value of it.
    _marker: core::marker::PhantomData<Description>,
}
165
impl<Description> Drop for RocksDb<Description> {
    /// Releases the snapshot before any other field is dropped.
    fn drop(&mut self) {
        // Fields are dropped in declaration order, which would release `db`
        // before `snapshot`. Clearing the snapshot here guarantees that it goes
        // away while this instance still holds its database handle.
        self.snapshot = None;
    }
}
173
impl<Description> std::fmt::Debug for RocksDb<Description> {
    /// Prints only the `db` field; the remaining fields are omitted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> core::fmt::Result {
        f.debug_struct("RocksDb").field("db", &self.db).finish()
    }
}
179
180impl<Description> RocksDb<Description>
181where
182 Description: DatabaseDescription,
183{
184 pub fn default_open_temp() -> DatabaseResult<Self> {
185 Self::default_open_temp_with_params(DatabaseConfig {
186 cache_capacity: None,
187 max_fds: 512,
188 columns_policy: ColumnsPolicy::Lazy,
189 })
190 }
191
192 pub fn default_open_temp_with_params(
193 database_config: DatabaseConfig,
194 ) -> DatabaseResult<Self> {
195 let tmp_dir = TempDir::new().unwrap();
196 let path = tmp_dir.path();
197 let result = Self::open(
198 path,
199 enum_iterator::all::<Description::Column>().collect::<Vec<_>>(),
200 database_config,
201 );
202 let mut db = result?;
203
204 db._drop = Arc::new(
205 {
206 move || {
207 drop(tmp_dir);
209 }
210 }
211 .into(),
212 );
213
214 Ok(db)
215 }
216
217 pub fn default_open<P: AsRef<Path>>(
218 path: P,
219 database_config: DatabaseConfig,
220 ) -> DatabaseResult<Self> {
221 Self::open(
222 path,
223 enum_iterator::all::<Description::Column>().collect::<Vec<_>>(),
224 database_config,
225 )
226 }
227
228 pub fn prune(path: &Path) -> DatabaseResult<()> {
229 let path = path.join(Description::name());
230 DB::destroy(&Options::default(), path)
231 .map_err(|e| DatabaseError::Other(e.into()))?;
232 Ok(())
233 }
234
/// Opens the database under `path` in regular (read-write) mode with the
/// given `columns`.
pub fn open<P: AsRef<Path>>(
    path: P,
    columns: Vec<Description::Column>,
    database_config: DatabaseConfig,
) -> DatabaseResult<Self> {
    Self::open_with(DB::open_cf_descriptors, path, columns, database_config)
}
242
/// Opens the database under `path` in read-only mode with the given `columns`.
///
/// `error_if_log_file_exist` is forwarded unchanged to
/// `DB::open_cf_descriptors_read_only`.
pub fn open_read_only<P: AsRef<Path>>(
    path: P,
    columns: Vec<Description::Column>,
    error_if_log_file_exist: bool,
    database_config: DatabaseConfig,
) -> DatabaseResult<Self> {
    Self::open_with(
        |options, primary_path, cfs| {
            DB::open_cf_descriptors_read_only(
                options,
                primary_path,
                cfs,
                error_if_log_file_exist,
            )
        },
        path,
        columns,
        database_config,
    )
}
263
/// Opens the database under `path` as a secondary instance.
///
/// `secondary_path` is handed to `DB::open_cf_descriptors_as_secondary`
/// as the location used by the secondary instance.
pub fn open_secondary<PrimaryPath, SecondaryPath>(
    path: PrimaryPath,
    secondary_path: SecondaryPath,
    columns: Vec<Description::Column>,
    database_config: DatabaseConfig,
) -> DatabaseResult<Self>
where
    PrimaryPath: AsRef<Path>,
    SecondaryPath: AsRef<Path>,
{
    Self::open_with(
        |options, primary_path, cfs| {
            DB::open_cf_descriptors_as_secondary(
                options,
                primary_path,
                // The opener may run twice (see the retry in `open_with`), so
                // the path is copied on every call.
                secondary_path.as_ref().to_path_buf(),
                cfs,
            )
        },
        path,
        columns,
        database_config,
    )
}
288
/// Builds the RocksDB options and opens the database through `opener`.
///
/// The database lives in the `Description::name()` sub-directory of `path`.
/// Column families that already exist on disk are opened (including ones
/// that are not listed in `columns`); the missing ones are either created
/// right away or remembered for lazy creation, depending on
/// `database_config.columns_policy`.
///
/// # Errors
///
/// Returns `DatabaseError::Other` when the database can't be opened (after
/// one retry) or when a column family can't be created.
///
/// # Panics
///
/// Panics if the number of CPUs doesn't fit into `i32`.
pub fn open_with<F, P>(
    opener: F,
    path: P,
    columns: Vec<Description::Column>,
    database_config: DatabaseConfig,
) -> DatabaseResult<Self>
where
    F: Fn(
        &Options,
        PathBuf,
        Vec<ColumnFamilyDescriptor>,
    ) -> Result<DB, rocksdb::Error>,
    P: AsRef<Path>,
{
    let original_path = path.as_ref().to_path_buf();
    let path = original_path.join(Description::name());
    let metric_columns = columns
        .iter()
        .map(|column| (column.id(), column.name()))
        .collect::<Vec<_>>();
    let metrics = Arc::new(DatabaseMetrics::new(
        Description::name().as_str(),
        &metric_columns,
    ));
    let mut block_opts = BlockBasedOptions::default();
    block_opts.set_format_version(5);

    if let Some(capacity) = database_config.cache_capacity {
        // One third of the configured capacity goes to the block cache.
        let block_cache_size = capacity / 3;
        let cache = Cache::new_lru_cache(block_cache_size);
        block_opts.set_block_cache(&cache);
        // Keep index and filter blocks in the block cache as well.
        block_opts.set_cache_index_and_filter_blocks(true);
        // Pin the L0 filter and index blocks in the cache.
        block_opts.set_pin_l0_filter_and_index_blocks_in_cache(true);
    } else {
        block_opts.disable_cache();
    }
    block_opts.set_bloom_filter(10.0, true);
    block_opts.set_block_size(16 * 1024);

    let mut opts = Options::default();
    opts.set_compression_type(DBCompressionType::Lz4);
    // The WAL size limit is hard-coded to 64 MiB.
    opts.set_max_total_wal_size(64 * 1024 * 1024);
    let cpu_number =
        i32::try_from(num_cpus::get()).expect("The number of CPU can't exceed `i32`");
    opts.increase_parallelism(cmp::max(1, cpu_number / 2));
    if let Some(capacity) = database_config.cache_capacity {
        // Another third of the configured capacity goes to the row cache.
        let row_cache_size = capacity / 3;
        let cache = Cache::new_lru_cache(row_cache_size);
        opts.set_row_cache(&cache);
    }
    opts.set_max_background_jobs(6);
    opts.set_bytes_per_sync(1048576);
    opts.set_max_open_files(database_config.max_fds);

    // A failure to list the column families (e.g. the database doesn't exist
    // yet) is treated as "no column families".
    let existing_column_families = DB::list_cf(&opts, &path).unwrap_or_default();

    // Split the requested columns into the ones already on disk and the ones
    // that still have to be created.
    let mut cf_descriptors_to_open = BTreeMap::new();
    let mut cf_descriptors_to_create = BTreeMap::new();
    for column in columns.clone() {
        let column_name = Self::col_name(column.id());
        let opts = Self::cf_opts(column, &block_opts);
        if existing_column_families.contains(&column_name) {
            cf_descriptors_to_open.insert(column_name, opts);
        } else {
            cf_descriptors_to_create.insert(column_name, opts);
        }
    }

    // Allow RocksDB to create the database when columns are created eagerly,
    // or when none of the requested columns exists on disk yet.
    if database_config.columns_policy == ColumnsPolicy::OnCreation
        || (database_config.columns_policy == ColumnsPolicy::Lazy
            && cf_descriptors_to_open.is_empty())
    {
        opts.create_if_missing(true);
    }

    // Column families that exist on disk but were not requested are opened
    // too, with the default column options.
    let unknown_columns_to_open: BTreeMap<_, _> = existing_column_families
        .iter()
        .filter(|column_name| {
            !cf_descriptors_to_open.contains_key(*column_name)
                && !cf_descriptors_to_create.contains_key(*column_name)
        })
        .map(|unknown_column_name| {
            let unknown_column_options = Self::default_opts(&block_opts);
            (unknown_column_name.clone(), unknown_column_options)
        })
        .collect();
    cf_descriptors_to_open.extend(unknown_columns_to_open);

    let iterator = cf_descriptors_to_open
        .clone()
        .into_iter()
        .map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts))
        .collect::<Vec<_>>();

    let db = match opener(&opts, path.clone(), iterator) {
        Ok(db) => {
            Ok(db)
        },
        Err(err) => {
            tracing::error!("Couldn't open the database with an error: {}. \nTrying to reopen the database", err);

            // NOTE(review): the second attempt uses exactly the same options,
            // path and descriptors as the first one.
            let iterator = cf_descriptors_to_open
                .clone()
                .into_iter()
                .map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts))
                .collect::<Vec<_>>();

            opener(&opts, path.clone(), iterator)
        },
    }
    .map_err(|e| DatabaseError::Other(e.into()))?;

    let create_family = match database_config.columns_policy {
        ColumnsPolicy::OnCreation => {
            // Create every missing column family right now.
            for (name, opt) in cf_descriptors_to_create {
                db.create_cf(name, &opt)
                    .map_err(|e| DatabaseError::Other(e.into()))?;
            }
            None
        }
        // Remember the missing column families; `cf_u32` creates them on
        // first access.
        ColumnsPolicy::Lazy => Some(Arc::new(Mutex::new(cf_descriptors_to_create))),
    };
    let db = Arc::new(PrimaryInstance(db));

    let rocks_db = RocksDb {
        read_options: Self::generate_read_options(&None),
        block_opts: Arc::new(block_opts),
        snapshot: None,
        db,
        metrics,
        create_family,
        _drop: Default::default(),
        _marker: Default::default(),
    };

    Ok(rocks_db)
}
437
438 fn generate_read_options(
439 snapshot: &Option<rocksdb::SnapshotWithThreadMode<DB>>,
440 ) -> ReadOptions {
441 let mut opts = ReadOptions::default();
442 opts.set_verify_checksums(false);
443 if let Some(snapshot) = &snapshot {
444 opts.set_snapshot(snapshot);
445 }
446 opts
447 }
448
/// Creates fresh read options bound to this instance's snapshot (if any).
///
/// Used where the options have to be customised or handed over by value.
fn read_options(&self) -> ReadOptions {
    Self::generate_read_options(&self.snapshot)
}
452
/// Creates a snapshot view of the database with the same `Description`.
pub fn create_snapshot(&self) -> Self {
    self.create_snapshot_generic()
}
456
/// Creates a snapshot view of the database typed with `TargetDescription`.
///
/// The returned instance shares the database handle, block options, pending
/// column families, metrics and cleanup guard with `self`, and reads through
/// a snapshot taken at the time of the call.
pub fn create_snapshot_generic<TargetDescription>(
    &self,
) -> RocksDb<TargetDescription> {
    let db = self.db.clone();
    let block_opts = self.block_opts.clone();
    let create_family = self.create_family.clone();
    let metrics = self.metrics.clone();
    let _drop = self._drop.clone();

    // SAFETY: the transmute only changes the lifetime of the snapshot to
    // `'static`, the type stored in the `snapshot` field. The snapshot borrows
    // from `db`, and the returned instance owns a clone of that `Arc`. The
    // `Drop` implementation of `RocksDb` clears the snapshot before the
    // remaining fields (including `db`) are dropped.
    #[allow(clippy::missing_transmute_annotations)]
    let snapshot = unsafe {
        let snapshot = db.snapshot();
        core::mem::transmute(snapshot)
    };
    let snapshot = Some(snapshot);

    RocksDb {
        read_options: Self::generate_read_options(&snapshot),
        block_opts,
        snapshot,
        db,
        create_family,
        metrics,
        _drop,
        _marker: Default::default(),
    }
}
489
/// Returns the column family handle of `column`; see `cf_u32`.
fn cf(&self, column: Description::Column) -> Arc<BoundColumnFamily<'_>> {
    self.cf_u32(column.id())
}
493
494 fn cf_u32(&self, column: u32) -> Arc<BoundColumnFamily<'_>> {
495 let family = self.db.cf_handle(&Self::col_name(column));
496
497 match family {
498 None => match &self.create_family {
499 Some(create_family) => {
500 let mut lock = create_family
501 .lock()
502 .expect("The create family lock should be available");
503
504 let name = Self::col_name(column);
505 let Some(family) = lock.remove(&name) else {
506 return self
507 .db
508 .cf_handle(&Self::col_name(column))
509 .expect("No column family found");
510 };
511
512 self.db
513 .create_cf(&name, &family)
514 .expect("Couldn't create column family");
515
516 self.db.cf_handle(&name).expect("invalid column state")
517 }
518 _ => {
519 panic!("Columns in the DB should have been created on DB opening");
520 }
521 },
522 Some(family) => family,
523 }
524 }
525
/// Name of the column family that stores the column with the given id.
fn col_name(column: u32) -> String {
    format!("col-{column}")
}
529
/// Options shared by every column family: creation if missing, LZ4
/// compression and the given block-based table configuration.
fn default_opts(block_opts: &BlockBasedOptions) -> Options {
    let mut opts = Options::default();
    opts.create_if_missing(true);
    opts.set_compression_type(DBCompressionType::Lz4);
    opts.set_block_based_table_factory(block_opts);

    opts
}
538
/// Options for the column family of `column`: the defaults plus a
/// fixed-size prefix extractor when `Description` declares a prefix length
/// for that column.
fn cf_opts(column: Description::Column, block_opts: &BlockBasedOptions) -> Options {
    let mut opts = Self::default_opts(block_opts);

    if let Some(size) = Description::prefix(&column) {
        opts.set_prefix_extractor(SliceTransform::create_fixed_prefix(size))
    }

    opts
}
549
550 fn reverse_prefix_iter<T>(
555 &self,
556 prefix: &[u8],
557 column: Description::Column,
558 ) -> impl Iterator<Item = StorageResult<T::Item>> + '_ + use<'_, T, Description>
559 where
560 T: ExtractItem,
561 {
562 let reverse_iterator = next_prefix(prefix.to_vec()).map(|next_prefix| {
563 let mut opts = self.read_options();
564 opts.set_total_order_seek(true);
570 self.iterator::<T>(
571 column,
572 opts,
573 IteratorMode::From(next_prefix.as_slice(), rocksdb::Direction::Reverse),
574 )
575 });
576
577 match reverse_iterator {
578 Some(iterator) => {
579 let prefix = prefix.to_vec();
580 iterator
581 .take_while(move |item| {
582 if let Ok(item) = item {
583 T::starts_with(item, prefix.as_slice())
584 } else {
585 true
586 }
587 })
588 .into_boxed()
589 }
590 _ => {
591 let prefix = prefix.to_vec();
593 self.iterator::<T>(column, self.read_options(), IteratorMode::End)
594 .take_while(move |item| {
595 if let Ok(item) = item {
596 T::starts_with(item, prefix.as_slice())
597 } else {
598 true
599 }
600 })
601 .into_boxed()
602 }
603 }
604 }
605
/// Creates an iterator over `column` positioned according to `iter_mode`.
///
/// Every successfully read item increments the global read counter, the
/// per-column read counter (when one is registered) and the number of bytes
/// read as reported by `T::size`. RocksDB errors are converted into storage
/// errors.
pub(crate) fn iterator<T>(
    &self,
    column: Description::Column,
    opts: ReadOptions,
    iter_mode: IteratorMode,
) -> impl Iterator<Item = StorageResult<T::Item>> + '_ + use<'_, T, Description>
where
    T: ExtractItem,
{
    let column_metrics = self.metrics.columns_read_statistic.get(&column.id());

    RocksDBKeyIterator::<_, T>::new(
        self.db.raw_iterator_cf_opt(&self.cf(column), opts),
        iter_mode,
    )
    .map(move |item| {
        // Metrics are only updated for items that were read successfully.
        item.inspect(|item| {
            self.metrics.read_meter.inc();
            column_metrics.map(|metric| metric.inc());
            self.metrics.bytes_read.inc_by(T::size(item));
        })
        .map_err(|e| DatabaseError::Other(e.into()).into())
    })
}
630
631 pub fn clear_table(&self, column: Description::Column) -> DatabaseResult<()> {
633 let column_name = Self::col_name(column.id());
635 self.db
636 .drop_cf(&column_name)
637 .map_err(|e| DatabaseError::Other(e.into()))?;
638
639 let column_name = Self::col_name(column.id());
641 let opts = Self::cf_opts(column, self.block_opts.as_ref());
642 self.db
643 .create_cf(&column_name, &opts)
644 .map_err(|e| DatabaseError::Other(e.into()))?;
645
646 Ok(())
647 }
648
649 pub fn multi_get<K, I>(
650 &self,
651 column: u32,
652 iterator: I,
653 ) -> DatabaseResult<Vec<Option<Vec<u8>>>>
654 where
655 I: Iterator<Item = K>,
656 K: AsRef<[u8]>,
657 {
658 let column_metrics = self.metrics.columns_read_statistic.get(&column);
659 let cl = self.cf_u32(column);
660 let results = self
661 .db
662 .multi_get_cf_opt(iterator.map(|k| (&cl, k)), &self.read_options)
663 .into_iter()
664 .map(|el| {
665 self.metrics.read_meter.inc();
666 column_metrics.map(|metric| metric.inc());
667 el.map(|value| {
668 value.inspect(|vec| {
669 self.metrics.bytes_read.inc_by(vec.len() as u64);
670 })
671 })
672 .map_err(|err| DatabaseError::Other(err.into()))
673 })
674 .try_collect()?;
675 Ok(results)
676 }
677
/// Shared implementation of the store iterators.
///
/// * no `prefix`, no `start`: iterates over the whole column;
/// * `prefix` only: iterates over the keys starting with `prefix`;
/// * `start` only: iterates from `start` in the given direction;
/// * both: iterates from `start` while the keys start with `prefix`; yields
///   nothing when `start` itself doesn't start with `prefix`.
///
/// Errors are passed through to the caller and do not stop the iteration.
fn _iter_store<T>(
    &self,
    column: Description::Column,
    prefix: Option<&[u8]>,
    start: Option<&[u8]>,
    direction: IterDirection,
) -> BoxedIter<'_, StorageResult<T::Item>>
where
    T: ExtractItem,
{
    match (prefix, start) {
        (None, None) => {
            let iter_mode =
                match direction {
                    IterDirection::Forward => IteratorMode::Start,
                    IterDirection::Reverse => IteratorMode::End,
                };
            self.iterator::<T>(column, self.read_options(), iter_mode)
                .into_boxed()
        }
        (Some(prefix), None) => {
            if direction == IterDirection::Reverse {
                self.reverse_prefix_iter::<T>(prefix, column).into_boxed()
            } else {
                // Start at the first key of the prefix section.
                let iter_mode = IteratorMode::From(
                    prefix,
                    convert_to_rocksdb_direction(direction),
                );

                let mut opts = self.read_options();
                // The iteration stays within the prefix it started in.
                opts.set_prefix_same_as_start(true);

                let prefix = prefix.to_vec();
                self.iterator::<T>(column, opts, iter_mode)
                    .take_while(move |item| {
                        // Stop at the first key outside of the prefix.
                        if let Ok(item) = item {
                            T::starts_with(item, prefix.as_slice())
                        } else {
                            true
                        }
                    })
                    .into_boxed()
            }
        }
        (None, Some(start)) => {
            let iter_mode =
                IteratorMode::From(start, convert_to_rocksdb_direction(direction));
            let mut opts = self.read_options();
            // Without a prefix the iteration may cross prefix sections, so
            // total-order seek is requested.
            opts.set_total_order_seek(true);
            self.iterator::<T>(column, opts, iter_mode).into_boxed()
        }
        (Some(prefix), Some(start)) => {
            // A `start` outside of `prefix` yields nothing.
            if !start.starts_with(prefix) {
                return iter::empty().into_boxed();
            }

            // Iterate from `start` and stop once the keys leave the prefix.
            let prefix = prefix.to_vec();
            let iter_mode =
                IteratorMode::From(start, convert_to_rocksdb_direction(direction));
            self.iterator::<T>(column, self.read_options(), iter_mode)
                .take_while(move |item| {
                    if let Ok(item) = item {
                        T::starts_with(item, prefix.as_slice())
                    } else {
                        true
                    }
                })
                .into_boxed()
        }
    }
}
764
/// Opens the backup engine for the `Description::name()` sub-directory of
/// `backup_dir`.
///
/// # Errors
///
/// Returns `DatabaseError::BackupEngineInitError` when the options, the
/// environment or the engine itself can't be created.
///
/// # Panics
///
/// Panics if the number of CPUs doesn't fit into `i32`.
#[cfg(feature = "backup")]
fn backup_engine<P: AsRef<Path> + ?Sized>(
    backup_dir: &P,
) -> DatabaseResult<rocksdb::backup::BackupEngine> {
    use rocksdb::{
        Env,
        backup::{
            BackupEngine,
            BackupEngineOptions,
        },
    };

    let backup_dir = backup_dir.as_ref().join(Description::name());
    let backup_dir_path = backup_dir.as_path();

    let mut backup_engine_options = BackupEngineOptions::new(backup_dir_path)
        .map_err(|e| {
            DatabaseError::BackupEngineInitError(anyhow::anyhow!(
                "Couldn't create backup engine options for path `{}`: {}",
                backup_dir_path.display(),
                e
            ))
        })?;

    let cpu_number =
        i32::try_from(num_cpus::get()).expect("The number of CPU can't exceed `i32`");

    // Use a quarter of the CPUs (at least one) for background operations.
    backup_engine_options.set_max_background_operations(cmp::max(1, cpu_number / 4));

    let env = Env::new().map_err(|e| {
        DatabaseError::BackupEngineInitError(anyhow::anyhow!(
            "Couldn't create environment for backup: {}",
            e
        ))
    })?;

    let backup_engine =
        BackupEngine::open(&backup_engine_options, &env).map_err(|e| {
            DatabaseError::BackupEngineInitError(anyhow::anyhow!(
                "Couldn't open backup engine for path `{}`: {}",
                backup_dir.display(),
                e
            ))
        })?;

    Ok(backup_engine)
}
812
/// Creates a new backup of the database stored in `db_dir` inside
/// `backup_dir`.
///
/// The database is opened in read-only mode with all columns of
/// `Description` for the duration of the backup.
///
/// # Errors
///
/// Returns an error when the backup engine or the database can't be opened,
/// or `DatabaseError::BackupError` when the backup itself fails.
#[cfg(feature = "backup")]
pub fn backup<P: AsRef<Path> + ?Sized>(
    db_dir: &P,
    backup_dir: &P,
) -> DatabaseResult<()> {
    let mut backup_engine = Self::backup_engine(backup_dir)?;

    let db_config = DatabaseConfig {
        cache_capacity: None,
        // NOTE(review): `-1` presumably means "no limit on open files" -
        // confirm against the RocksDB `max_open_files` documentation.
        max_fds: -1,
        columns_policy: ColumnsPolicy::Lazy,
    };

    let db = Self::open_read_only(
        db_dir,
        enum_iterator::all::<Description::Column>().collect::<Vec<_>>(),
        false,
        db_config,
    )?;

    backup_engine
        .create_new_backup_flush(&db.db, true)
        .map_err(|e| {
            DatabaseError::BackupError(anyhow::anyhow!(
                "Couldn't create new backup for path `{}`: {}",
                backup_dir.as_ref().display(),
                e
            ))
        })?;

    Ok(())
}
845
/// Restores the latest backup found in `backup_dir` into the
/// `Description::name()` sub-directory of `db_dir`.
///
/// # Errors
///
/// Returns an error when the backup engine can't be opened, or
/// `DatabaseError::RestoreError` when the restore fails.
#[cfg(feature = "backup")]
pub fn restore<P: AsRef<Path> + ?Sized>(
    db_dir: &P,
    backup_dir: &P,
) -> DatabaseResult<()> {
    use rocksdb::backup::RestoreOptions;

    let mut backup_engine = Self::backup_engine(backup_dir)?;
    let restore_option = RestoreOptions::default();

    let db_dir = db_dir.as_ref().join(Description::name());
    let db_dir_path = db_dir.as_path();
    backup_engine
        // The same directory is used for the database files and for the WAL.
        .restore_from_latest_backup(db_dir_path, db_dir_path, &restore_option)
        .map_err(|e| {
            DatabaseError::RestoreError(anyhow::anyhow!(
                "Couldn't restore from latest backup for path `{}`: {}",
                db_dir_path.display(),
                e
            ))
        })?;

    Ok(())
}
872
873 pub fn shutdown(&self) {
874 while Arc::strong_count(&self.db) > 1 {}
875 }
876}
877
878pub(crate) struct KeyOnly;
879
880impl ExtractItem for KeyOnly {
881 type Item = Vec<u8>;
882
883 fn extract_item<D>(
884 raw_iterator: &DBRawIteratorWithThreadMode<D>,
885 ) -> Option<Self::Item>
886 where
887 D: DBAccess,
888 {
889 raw_iterator.key().map(|key| key.to_vec())
890 }
891
892 fn size(item: &Self::Item) -> u64 {
893 item.len() as u64
894 }
895
896 fn starts_with(item: &Self::Item, prefix: &[u8]) -> bool {
897 item.starts_with(prefix)
898 }
899}
900
901pub(crate) struct KeyAndValue;
902
903impl ExtractItem for KeyAndValue {
904 type Item = (Vec<u8>, Value);
905
906 fn extract_item<D>(
907 raw_iterator: &DBRawIteratorWithThreadMode<D>,
908 ) -> Option<Self::Item>
909 where
910 D: DBAccess,
911 {
912 raw_iterator
913 .item()
914 .map(|(key, value)| (key.to_vec(), Value::from(value)))
915 }
916
917 fn size(item: &Self::Item) -> u64 {
918 item.0.len().saturating_add(item.1.len()) as u64
919 }
920
921 fn starts_with(item: &Self::Item, prefix: &[u8]) -> bool {
922 item.0.starts_with(prefix)
923 }
924}
925
926impl<Description> KeyValueInspect for RocksDb<Description>
927where
928 Description: DatabaseDescription,
929{
930 type Column = Description::Column;
931
932 fn size_of_value(
933 &self,
934 key: &[u8],
935 column: Self::Column,
936 ) -> StorageResult<Option<usize>> {
937 self.metrics.read_meter.inc();
938 let column_metrics = self.metrics.columns_read_statistic.get(&column.id());
939 column_metrics.map(|metric| metric.inc());
940
941 Ok(self
942 .db
943 .get_pinned_cf_opt(&self.cf(column), key, &self.read_options)
944 .map_err(|e| DatabaseError::Other(e.into()))?
945 .map(|value| value.len()))
946 }
947
948 fn get(&self, key: &[u8], column: Self::Column) -> StorageResult<Option<Value>> {
949 self.metrics.read_meter.inc();
950 let column_metrics = self.metrics.columns_read_statistic.get(&column.id());
951 column_metrics.map(|metric| metric.inc());
952
953 let value = self
954 .db
955 .get_cf_opt(&self.cf(column), key, &self.read_options)
956 .map_err(|e| DatabaseError::Other(e.into()))?;
957
958 if let Some(value) = &value {
959 self.metrics.bytes_read.inc_by(value.len() as u64);
960 }
961
962 Ok(value.map(Arc::from))
963 }
964
/// Fills `buf` with `buf.len()` bytes of the value stored under `key`,
/// starting at `offset`.
///
/// The inner result is `Err(StorageReadError::KeyNotFound)` when the key
/// doesn't exist and `Err(StorageReadError::OutOfBounds)` when the requested
/// range doesn't fit into the value; otherwise it is the number of bytes
/// copied (`buf.len()`).
fn read_exact(
    &self,
    key: &[u8],
    column: Self::Column,
    offset: usize,
    buf: &mut [u8],
) -> StorageResult<Result<usize, StorageReadError>> {
    self.metrics.read_meter.inc();
    let column_metrics = self.metrics.columns_read_statistic.get(&column.id());
    column_metrics.map(|metric| metric.inc());

    let Some(value) = self
        .db
        .get_pinned_cf_opt(&self.cf(column), key, &self.read_options)
        .map_err(|e| DatabaseError::Other(e.into()))?
    else {
        return Ok(Err(StorageReadError::KeyNotFound));
    };

    let bytes_len = value.len();
    let start = offset;
    let buf_len = buf.len();
    // Saturating: an overflowing range ends up out of bounds below.
    let end = offset.saturating_add(buf_len);

    if end > bytes_len {
        return Ok(Err(StorageReadError::OutOfBounds));
    }

    let starting_from_offset = &value[start..end];
    buf[..].copy_from_slice(starting_from_offset);

    self.metrics
        .bytes_read
        .inc_by(starting_from_offset.len() as u64);

    Ok(Ok(buf_len))
}
1002
/// Copies the value stored under `key`, starting at `offset`, into `buf`
/// and fills the part of `buf` that the value doesn't cover with zeros.
///
/// The inner result is `Err(StorageReadError::KeyNotFound)` when the key
/// doesn't exist and `Err(StorageReadError::OutOfBounds)` when `offset` is
/// past the end of the value; otherwise it is the total length of the
/// stored value (not the number of bytes copied).
fn read_zerofill(
    &self,
    key: &[u8],
    column: Self::Column,
    offset: usize,
    buf: &mut [u8],
) -> StorageResult<Result<usize, StorageReadError>> {
    self.metrics.read_meter.inc();
    let column_metrics = self.metrics.columns_read_statistic.get(&column.id());
    column_metrics.map(|metric| metric.inc());

    let Some(value) = self
        .db
        .get_pinned_cf_opt(&self.cf(column), key, &self.read_options)
        .map_err(|e| DatabaseError::Other(e.into()))?
    else {
        return Ok(Err(StorageReadError::KeyNotFound));
    };

    let bytes_len = value.len();
    let buf_len = buf.len();

    // `offset == value.len()` is accepted and results in a fully zeroed buffer.
    let Some((_, after)) = value.split_at_checked(offset) else {
        return Ok(Err(StorageReadError::OutOfBounds));
    };

    // `dst` receives the available bytes, `rest` is zeroed.
    let (dst, rest) = buf.split_at_mut(buf_len.min(after.len()));
    dst.copy_from_slice(&after[..dst.len()]);
    rest.fill(0);

    // Only the bytes actually copied from the value are counted.
    self.metrics.bytes_read.inc_by(dst.len() as u64);

    Ok(Ok(bytes_len))
}
1037}
1038
impl<Description> IterableStore for RocksDb<Description>
where
    Description: DatabaseDescription,
{
    /// Iterates over the key-value pairs of `column`; see `_iter_store` for
    /// the meaning of `prefix`, `start` and `direction`.
    fn iter_store(
        &self,
        column: Self::Column,
        prefix: Option<&[u8]>,
        start: Option<&[u8]>,
        direction: IterDirection,
    ) -> BoxedIter<'_, KVItem> {
        self._iter_store::<KeyAndValue>(column, prefix, start, direction)
    }

    /// Iterates over the keys of `column` only; see `_iter_store` for the
    /// meaning of `prefix`, `start` and `direction`.
    fn iter_store_keys(
        &self,
        column: Self::Column,
        prefix: Option<&[u8]>,
        start: Option<&[u8]>,
        direction: IterDirection,
    ) -> BoxedIter<'_, KeyItem> {
        self._iter_store::<KeyOnly>(column, prefix, start, direction)
    }
}
1063
1064impl<Description> RocksDb<Description>
1065where
1066 Description: DatabaseDescription,
1067{
1068 pub fn commit_changes<'a>(&self, changes: &'a StorageChanges) -> StorageResult<()> {
1069 let instant = std::time::Instant::now();
1070 let mut batch = WriteBatch::default();
1071 let mut conflict_finder = HashSet::<(&'a u32, &'a ReferenceBytesKey)>::new();
1072
1073 match changes {
1074 StorageChanges::Changes(changes) => {
1075 self._populate_batch(&mut batch, &mut conflict_finder, changes)?;
1076 }
1077 StorageChanges::ChangesList(changes_list) => {
1078 for changes in changes_list {
1079 self._populate_batch(&mut batch, &mut conflict_finder, changes)?;
1080 }
1081 }
1082 }
1083
1084 self.db
1085 .write(batch)
1086 .map_err(|e| DatabaseError::Other(e.into()))?;
1087 self.metrics.database_commit_time.inc_by(
1089 u64::try_from(instant.elapsed().as_nanos())
1090 .expect("The commit shouldn't take longer than `u64`"),
1091 );
1092
1093 Ok(())
1094 }
1095
/// Adds every operation of `changes` to `batch`.
///
/// `conflict_finder` collects the `(column, key)` pairs seen so far, across
/// calls; a pair that shows up twice results in
/// `DatabaseError::ConflictingChanges`. Write metrics are updated for every
/// processed operation.
fn _populate_batch<'a>(
    &self,
    batch: &mut WriteBatch,
    conflict_finder: &mut HashSet<(&'a u32, &'a ReferenceBytesKey)>,
    changes: &'a Changes,
) -> DatabaseResult<()> {
    for (column, ops) in changes {
        let cf = self.cf_u32(*column);
        let column_metrics = self.metrics.columns_write_statistic.get(column);
        for (key, op) in ops {
            // The counters are updated before the conflict check, so a
            // conflicting operation is counted as well.
            self.metrics.write_meter.inc();
            column_metrics.map(|metric| metric.inc());

            // `insert` returns `false` when the pair was already present.
            if !conflict_finder.insert((column, key)) {
                return Err(DatabaseError::ConflictingChanges {
                    column: *column,
                    key: key.clone(),
                });
            }

            match op {
                WriteOperation::Insert(value) => {
                    self.metrics.bytes_written.inc_by(value.len() as u64);
                    batch.put_cf(&cf, key, value.as_ref());
                }
                WriteOperation::Remove => {
                    batch.delete_cf(&cf, key);
                }
            }
        }
    }
    Ok(())
}
1129}
1130
/// Returns the smallest byte string that is greater than every key starting
/// with `prefix`.
///
/// Trailing `0xFF` bytes can't be incremented, so they are removed and the
/// byte in front of them is incremented instead (`[1, 0xFF]` becomes `[2]`).
/// Keeping them would produce a bound that is greater than keys of the
/// following prefix (`[2, 0x00]` is less than `[2, 0xFF]`).
///
/// `None` means overflow: the prefix is empty or consists of `0xFF` bytes
/// only, so there is no following prefix.
fn next_prefix(mut prefix: Vec<u8>) -> Option<Vec<u8>> {
    while let Some(last) = prefix.pop() {
        if let Some(incremented) = last.checked_add(1) {
            prefix.push(incremented);
            return Some(prefix);
        }
        // `last` was `0xFF`: leave it out and carry over to the byte before it.
    }
    None
}
1141
/// Mutation helpers that are only available with the `test-helpers` feature.
#[cfg(feature = "test-helpers")]
pub mod test_helpers {
    use super::*;
    use fuel_core_storage::{
        kv_store::KeyValueMutate,
        transactional::ReadTransaction,
    };

    impl<Description> KeyValueMutate for RocksDb<Description>
    where
        Description: DatabaseDescription,
    {
        /// Writes `buf` under `key` by recording the write in a transaction
        /// and committing the resulting changes right away.
        fn write(
            &mut self,
            key: &[u8],
            column: Self::Column,
            buf: &[u8],
        ) -> StorageResult<usize> {
            let mut transaction = self.read_transaction();
            let len = transaction.write(key, column, buf)?;
            let changes = transaction.into_changes();
            self.commit_changes(&StorageChanges::Changes(changes))?;

            Ok(len)
        }

        /// Deletes `key` by recording the removal in a transaction and
        /// committing the resulting changes right away.
        fn delete(&mut self, key: &[u8], column: Self::Column) -> StorageResult<()> {
            let mut transaction = self.read_transaction();
            transaction.delete(key, column)?;
            let changes = transaction.into_changes();
            self.commit_changes(&StorageChanges::Changes(changes))?;
            Ok(())
        }
    }
}
1177
1178#[allow(non_snake_case)]
1179#[cfg(test)]
1180mod tests {
1181 use super::*;
1182 use crate::database::database_description::on_chain::OnChain;
1183 use fuel_core_storage::{
1184 column::Column,
1185 kv_store::KeyValueMutate,
1186 };
1187 use std::collections::{
1188 BTreeMap,
1189 HashMap,
1190 };
1191 use tempfile::TempDir;
1192
/// Opens an on-chain database in a fresh temporary directory.
///
/// The directory guard is returned as well so that the caller keeps the
/// directory alive for as long as the database is used.
fn create_db() -> (RocksDb<OnChain>, TempDir) {
    let tmp_dir = TempDir::new().unwrap();
    (
        RocksDb::default_open(tmp_dir.path(), DatabaseConfig::config_for_tests())
            .unwrap(),
        tmp_dir,
    )
}
1201
/// Reopening an existing database with additional columns must succeed.
#[test]
fn open_new_columns() {
    let tmp_dir = TempDir::new().unwrap();

    // Given: a database that was created with the old set of columns.
    let old_columns =
        vec![Column::Coins, Column::Messages, Column::UploadedBytecodes];
    let database_with_old_columns = RocksDb::<OnChain>::open(
        tmp_dir.path(),
        old_columns.clone(),
        DatabaseConfig::config_for_tests(),
    )
    .expect("Failed to open database with old columns");
    drop(database_with_old_columns);

    // When: the same database is opened with two more columns.
    let mut new_columns = old_columns;
    new_columns.push(Column::ContractsAssets);
    new_columns.push(Column::Metadata);
    let database_with_new_columns = RocksDb::<OnChain>::open(
        tmp_dir.path(),
        new_columns,
        DatabaseConfig::config_for_tests(),
    )
    .map(|_| ());

    // Then: opening succeeds.
    assert_eq!(Ok(()), database_with_new_columns);
}
1231
/// A value that was put under a key can be read back.
#[test]
fn can_put_and_read() {
    let (mut db, _tmp) = create_db();
    let key = vec![0xA, 0xB, 0xC];
    let expected = Value::from([1, 2, 3]);

    db.put(&key, Column::Metadata, expected.clone()).unwrap();

    let stored = db.get(&key, Column::Metadata).unwrap().unwrap();
    assert_eq!(stored, expected)
}
1242
/// Replacing an existing value returns the value that was stored before.
#[test]
fn put_returns_previous_value() {
    let key = vec![0xA, 0xB, 0xC];

    let (mut db, _tmp) = create_db();
    let expected = Value::from([1, 2, 3]);
    db.put(&key, Column::Metadata, expected.clone()).unwrap();
    // Overwrite the value and capture what `replace` hands back.
    let prev = db
        .replace(&key, Column::Metadata, Arc::new([2, 4, 6]))
        .unwrap();

    assert_eq!(prev, Some(expected));
}
1256
/// A deleted key can no longer be read.
#[test]
fn delete_and_get() {
    let (mut db, _tmp) = create_db();
    let key = vec![0xA, 0xB, 0xC];
    let expected = Value::from([1, 2, 3]);

    // The value is readable after it was put.
    db.put(&key, Column::Metadata, expected.clone()).unwrap();
    let before_delete = db.get(&key, Column::Metadata).unwrap().unwrap();
    assert_eq!(before_delete, expected);

    // The value is gone after the delete.
    db.delete(&key, Column::Metadata).unwrap();
    let after_delete = db.get(&key, Column::Metadata).unwrap();
    assert_eq!(after_delete, None);
}
1269
/// `exists` reports a key that has a value.
#[test]
fn key_exists() {
    let (mut db, _tmp) = create_db();
    let key = vec![0xA, 0xB, 0xC];
    let value = Arc::new([1, 2, 3]);

    db.put(&key, Column::Metadata, value).unwrap();

    let found = db.exists(&key, Column::Metadata).unwrap();
    assert!(found);
}
1279
/// An insert operation committed through `commit_changes` becomes readable.
#[test]
fn commit_changes_inserts() {
    let key = vec![0xA, 0xB, 0xC];
    let value = Value::from([1, 2, 3]);

    let (db, _tmp) = create_db();
    // A single insert into the metadata column.
    let ops = vec![(
        Column::Metadata.id(),
        BTreeMap::from_iter(vec![(
            key.clone().into(),
            WriteOperation::Insert(value.clone()),
        )]),
    )];

    db.commit_changes(&StorageChanges::Changes(HashMap::from_iter(ops)))
        .unwrap();
    assert_eq!(db.get(&key, Column::Metadata).unwrap().unwrap(), value)
}
1298
/// Committing a change set with a `Remove` operation deletes a previously stored value.
#[test]
fn commit_changes_removes() {
    // Given
    let (mut db, _tmp_dir) = create_db();
    let key = vec![0xA, 0xB, 0xC];
    db.put(&key, Column::Metadata, Arc::new([1, 2, 3]))
        .unwrap();

    // When
    let column_ops =
        BTreeMap::from_iter([(key.clone().into(), WriteOperation::Remove)]);
    let changes = HashMap::from_iter([(Column::Metadata.id(), column_ops)]);
    db.commit_changes(&StorageChanges::Changes(changes))
        .unwrap();

    // Then
    let remaining = db.get(&key, Column::Metadata).unwrap();
    assert_eq!(remaining, None);
}
1316
/// An empty value is a valid value: it can be stored, read, found, iterated and taken.
#[test]
fn can_use_unit_value() {
    let (mut database, _tmp_dir) = create_db();
    let key = vec![0x00];
    let empty_value = Value::from([]);

    database
        .put(&key, Column::Metadata, empty_value.clone())
        .unwrap();

    // Point lookup returns the empty value.
    let fetched = database.get(&key, Column::Metadata).unwrap().unwrap();
    assert_eq!(fetched, empty_value);

    // The key is reported as present.
    let present = database.exists(&key, Column::Metadata).unwrap();
    assert!(present);

    // The first entry yielded by forward iteration is the stored pair.
    let entries = database
        .iter_store(Column::Metadata, None, None, IterDirection::Forward)
        .collect::<Result<Vec<_>, _>>()
        .unwrap();
    assert_eq!(entries[0], (key.clone(), empty_value.clone()));

    // `take` returns the value and removes the entry.
    let taken = database.take(&key, Column::Metadata).unwrap().unwrap();
    assert_eq!(taken, empty_value);

    let present_after_take = database.exists(&key, Column::Metadata).unwrap();
    assert!(!present_after_take);
}
1340
/// An empty key is a valid key: it can be stored, read, found, iterated and taken.
#[test]
fn can_use_unit_key() {
    let (mut database, _tmp_dir) = create_db();
    let empty_key: Vec<u8> = Vec::new();
    let value = Value::from([1, 2, 3]);

    database
        .put(&empty_key, Column::Metadata, value.clone())
        .unwrap();

    // Point lookup returns the stored value.
    let fetched = database
        .get(&empty_key, Column::Metadata)
        .unwrap()
        .unwrap();
    assert_eq!(fetched, value);

    // The key is reported as present.
    let present = database.exists(&empty_key, Column::Metadata).unwrap();
    assert!(present);

    // The first entry yielded by forward iteration is the stored pair.
    let entries = database
        .iter_store(Column::Metadata, None, None, IterDirection::Forward)
        .collect::<Result<Vec<_>, _>>()
        .unwrap();
    assert_eq!(entries[0], (empty_key.clone(), value.clone()));

    // `take` returns the value and removes the entry.
    let taken = database
        .take(&empty_key, Column::Metadata)
        .unwrap()
        .unwrap();
    assert_eq!(taken, value);

    let present_after_take =
        database.exists(&empty_key, Column::Metadata).unwrap();
    assert!(!present_after_take);
}
1364
/// An empty key paired with an empty value can be stored, read, found, iterated and taken.
#[test]
fn can_use_unit_key_and_value() {
    let (mut database, _tmp_dir) = create_db();
    let empty_key: Vec<u8> = Vec::new();
    let empty_value = Value::from([]);

    database
        .put(&empty_key, Column::Metadata, empty_value.clone())
        .unwrap();

    // Point lookup returns the empty value.
    let fetched = database
        .get(&empty_key, Column::Metadata)
        .unwrap()
        .unwrap();
    assert_eq!(fetched, empty_value);

    // The key is reported as present.
    let present = database.exists(&empty_key, Column::Metadata).unwrap();
    assert!(present);

    // The first entry yielded by forward iteration is the stored pair.
    let entries = database
        .iter_store(Column::Metadata, None, None, IterDirection::Forward)
        .collect::<Result<Vec<_>, _>>()
        .unwrap();
    assert_eq!(entries[0], (empty_key.clone(), empty_value.clone()));

    // `take` returns the value and removes the entry.
    let taken = database
        .take(&empty_key, Column::Metadata)
        .unwrap()
        .unwrap();
    assert_eq!(taken, empty_value);

    let present_after_take =
        database.exists(&empty_key, Column::Metadata).unwrap();
    assert!(!present_after_take);
}
1388
/// Opening the same directory through `open` while an earlier instance is still
/// alive must return an error.
#[test]
fn open_primary_db_second_time_fails() {
    // Given: `_primary_db` keeps the first instance alive for the whole test.
    let (_primary_db, tmp_dir) = create_db();

    // When
    let all_columns: Vec<_> =
        enum_iterator::all::<<OnChain as DatabaseDescription>::Column>().collect();
    let second_open = RocksDb::<OnChain>::open(
        tmp_dir.path(),
        all_columns,
        DatabaseConfig::config_for_tests(),
    );

    // Then
    assert!(second_open.is_err());
}
1406
/// A read-only instance can be opened on the same directory while the primary
/// instance is still alive.
#[test]
fn open_second_read_only_db() {
    // Given: `_primary_db` keeps the primary instance alive for the whole test.
    let (_primary_db, tmp_dir) = create_db();

    // When: open read-only with just a few of the columns.
    let columns = vec![Column::Coins, Column::Messages, Column::UploadedBytecodes];
    let outcome = RocksDb::<OnChain>::open_read_only(
        tmp_dir.path(),
        columns,
        false,
        DatabaseConfig::config_for_tests(),
    )
    .map(drop);

    // Then
    assert_eq!(Ok(()), outcome);
}
1426
/// A secondary instance, given its own temporary directory, can be opened on the
/// primary's directory while the primary instance is still alive.
#[test]
fn open_secondary_db() {
    // Given: `_primary_db` keeps the primary instance alive for the whole test.
    let (_primary_db, tmp_dir) = create_db();
    let secondary_temp = TempDir::new().unwrap();

    // When: open as secondary with just a few of the columns.
    let columns = vec![Column::Coins, Column::Messages, Column::UploadedBytecodes];
    let outcome = RocksDb::<OnChain>::open_secondary(
        tmp_dir.path(),
        secondary_temp.path(),
        columns,
        DatabaseConfig::config_for_tests(),
    )
    .map(drop);

    // Then
    assert_eq!(Ok(()), outcome);
}
1447
/// A snapshot taken before a delete still returns the entry, while the live
/// database no longer does.
#[test]
fn snapshot_allows_get_entry_after_it_was_removed() {
    // Given
    let (mut database, _tmp_dir) = create_db();
    let stored = Value::from([1, 2, 3]);
    let key = [1; 32];
    database
        .put(&key, Column::Metadata, stored.clone())
        .unwrap();
    let snapshot = database.create_snapshot();

    // When
    database.delete(&key, Column::Metadata).unwrap();

    // Then: the live database no longer has the entry...
    let from_database = database.get(&key, Column::Metadata).unwrap();
    assert!(from_database.is_none());

    // ...but the snapshot still does.
    let from_snapshot = snapshot.get(&key, Column::Metadata).unwrap();
    assert_eq!(from_snapshot, Some(stored));
}
1468
/// A snapshot taken before all entries are deleted still iterates over every
/// entry in key order, while the live database iterates over nothing.
#[test]
fn snapshot_allows_correct_iteration_even_after_all_elements_where_removed() {
    // Given
    let (mut database, _tmp_dir) = create_db();
    let value = Value::from([1, 2, 3]);
    let key_1 = vec![1; 32];
    let key_2 = vec![2; 32];
    let key_3 = vec![3; 32];
    for key in [&key_1, &key_2, &key_3] {
        database
            .put(key, Column::Metadata, value.clone())
            .unwrap();
    }
    let snapshot = database.create_snapshot();

    // When
    for key in [&key_1, &key_2, &key_3] {
        database.delete(key, Column::Metadata).unwrap();
    }

    // Then: the live database is empty...
    let live_entries: Vec<_> = database
        .iter_store(Column::Metadata, None, None, IterDirection::Forward)
        .collect();
    assert!(live_entries.is_empty());

    // ...while the snapshot still yields all three entries.
    let snapshot_entries: Vec<_> = snapshot
        .iter_store(Column::Metadata, None, None, IterDirection::Forward)
        .collect();
    let expected_entries = vec![
        Ok((key_1, value.clone())),
        Ok((key_2, value.clone())),
        Ok((key_3, value)),
    ];
    assert_eq!(snapshot_entries, expected_entries);
}
1506
/// Dropping a snapshot after the database it was created from has already been
/// dropped must complete without panicking.
#[test]
fn drop_snapshot_after_dropping_main_database_shouldn_panic() {
    // Given
    let (database, _tmp_dir) = create_db();
    let view = database.create_snapshot();

    // When: the database goes away first.
    drop(database);

    // Then: dropping the snapshot afterwards does not panic.
    drop(view);
}
1520
/// A database first opened via `create_db` can later be reopened with the first
/// column left out of the requested column list.
#[test]
fn open__opens_subset_of_columns_after_opening_all_columns() {
    // Given: open once and close again so the directory is free for a new open.
    let (initial_db, tmp_dir) = create_db();
    drop(initial_db);

    // When: request every column except the first one.
    let fewer_columns: Vec<_> =
        enum_iterator::all::<<OnChain as DatabaseDescription>::Column>()
            .skip(1)
            .collect();

    // Then
    let _ = RocksDb::<OnChain>::open(
        tmp_dir.path(),
        fewer_columns,
        DatabaseConfig::config_for_tests(),
    )
    .expect("Should open the database with shorter number of columns");
}
1542
/// Reverse iteration with a prefix that no stored key starts with yields nothing,
/// even though keys exist both below and above that prefix.
#[test]
fn iter_store__reverse_iterator__no_target_prefix() {
    // Given
    let (mut database, _tmp_dir) = create_db();
    let value = Value::from([]);
    let stored_keys: [[u8; 2]; 4] = [[1, 1], [2, 2], [9, 3], [10, 0]];
    for key in &stored_keys {
        database
            .put(key, Column::Metadata, value.clone())
            .unwrap();
    }

    // When
    let prefix = [5u8];
    let found_keys: Vec<_> = database
        .iter_store(
            Column::Metadata,
            Some(prefix.as_slice()),
            None,
            IterDirection::Reverse,
        )
        .map(|entry| entry.map(|(key, _value)| key))
        .collect();

    // Then
    assert_eq!(found_keys, Vec::new());
}
1571
/// Reverse iteration with a prefix yields only the matching keys, highest first,
/// when non-matching keys exist both below and above the prefix range.
#[test]
fn iter_store__reverse_iterator__target_prefix_at_the_middle() {
    // Given
    let (mut database, _tmp_dir) = create_db();
    let value = Value::from([]);
    let below_prefix = [1u8, 1];
    let matching_low = [2u8, 2];
    let matching_high = [2u8, 3];
    let above_prefix = [10u8, 0];
    for key in [&below_prefix, &matching_low, &matching_high, &above_prefix] {
        database
            .put(key, Column::Metadata, value.clone())
            .unwrap();
    }

    // When
    let prefix = [2u8];
    let found_keys: Vec<_> = database
        .iter_store(
            Column::Metadata,
            Some(prefix.as_slice()),
            None,
            IterDirection::Reverse,
        )
        .map(|entry| entry.map(|(key, _value)| key))
        .collect();

    // Then
    let expected_keys = vec![Ok(matching_high.to_vec()), Ok(matching_low.to_vec())];
    assert_eq!(found_keys, expected_keys);
}
1600
/// Reverse iteration with a prefix yields only the matching keys, highest first,
/// when the matching keys are the last ones stored in the column.
#[test]
fn iter_store__reverse_iterator__target_prefix_at_the_end() {
    // Given
    let (mut database, _tmp_dir) = create_db();
    let value = Value::from([]);
    let below_prefix = [1u8, 1];
    let matching_low = [2u8, 2];
    let matching_high = [2u8, 3];
    for key in [&below_prefix, &matching_low, &matching_high] {
        database
            .put(key, Column::Metadata, value.clone())
            .unwrap();
    }

    // When
    let prefix = [2u8];
    let found_keys: Vec<_> = database
        .iter_store(
            Column::Metadata,
            Some(prefix.as_slice()),
            None,
            IterDirection::Reverse,
        )
        .map(|entry| entry.map(|(key, _value)| key))
        .collect();

    // Then
    let expected_keys = vec![Ok(matching_high.to_vec()), Ok(matching_low.to_vec())];
    assert_eq!(found_keys, expected_keys);
}
1627
/// Reverse iteration with the prefix `[255]` (the largest possible byte) yields
/// the matching keys, highest first.
#[test]
fn iter_store__reverse_iterator__target_prefix_at_the_end__overflow() {
    // Given
    let (mut database, _tmp_dir) = create_db();
    let value = Value::from([]);
    let below_prefix = [1u8, 1];
    let matching_low = [255u8, 254];
    let matching_high = [255u8, 255];
    for key in [&below_prefix, &matching_low, &matching_high] {
        database
            .put(key, Column::Metadata, value.clone())
            .unwrap();
    }

    // When
    let prefix = [255u8];
    let found_keys: Vec<_> = database
        .iter_store(
            Column::Metadata,
            Some(prefix.as_slice()),
            None,
            IterDirection::Reverse,
        )
        .map(|entry| entry.map(|(key, _value)| key))
        .collect();

    // Then
    let expected_keys = vec![Ok(matching_high.to_vec()), Ok(matching_low.to_vec())];
    assert_eq!(found_keys, expected_keys);
}
1654
/// After `clear_table`, the column can still be iterated and contains no keys.
#[test]
fn clear_table__keeps_column_accessible_after_marking_for_deletion() {
    // Given
    let (mut database, _tmp_dir) = create_db();
    let value = Value::from([]);
    let stored_keys: [[u8; 2]; 3] = [[1, 1], [255, 254], [255, 255]];
    for key in &stored_keys {
        database
            .put(key, Column::Metadata, value.clone())
            .unwrap();
    }

    // When
    database.clear_table(Column::Metadata).unwrap();

    // Then
    let remaining_keys: Vec<_> = database
        .iter_store_keys(Column::Metadata, None, None, IterDirection::Forward)
        .collect();
    assert_eq!(remaining_keys, Vec::new());
}
1676}