1use crate::{
2 database::{
3 Error as DatabaseError,
4 Result as DatabaseResult,
5 convert_to_rocksdb_direction,
6 database_description::DatabaseDescription,
7 },
8 state::IterDirection,
9};
10
11use super::rocks_db_key_iterator::{
12 ExtractItem,
13 RocksDBKeyIterator,
14};
15use core::ops::Deref;
16use fuel_core_metrics::core_metrics::DatabaseMetrics;
17use fuel_core_storage::{
18 Result as StorageResult,
19 StorageReadError,
20 iter::{
21 BoxedIter,
22 IntoBoxedIter,
23 IterableStore,
24 },
25 kv_store::{
26 KVItem,
27 KeyItem,
28 KeyValueInspect,
29 StorageColumn,
30 Value,
31 WriteOperation,
32 },
33 transactional::{
34 Changes,
35 ReferenceBytesKey,
36 StorageChanges,
37 },
38};
39use itertools::Itertools;
40use rocksdb::{
41 BlockBasedOptions,
42 BoundColumnFamily,
43 Cache,
44 ColumnFamilyDescriptor,
45 DBAccess,
46 DBCompressionType,
47 DBRawIteratorWithThreadMode,
48 DBWithThreadMode,
49 IteratorMode,
50 MultiThreaded,
51 Options,
52 ReadOptions,
53 SliceTransform,
54 WriteBatch,
55};
56use std::{
57 cmp,
58 collections::{
59 BTreeMap,
60 HashSet,
61 },
62 fmt,
63 fmt::Formatter,
64 iter,
65 path::{
66 Path,
67 PathBuf,
68 },
69 sync::{
70 Arc,
71 Mutex,
72 },
73};
74use tempfile::TempDir;
75
/// Newtype around the multi-threaded RocksDB handle.
///
/// It dereferences to the wrapped handle and cancels the database's
/// background work when it is dropped.
#[derive(Debug)]
struct PrimaryInstance(DBWithThreadMode<MultiThreaded>);
78
79impl Deref for PrimaryInstance {
80 type Target = DBWithThreadMode<MultiThreaded>;
81
82 fn deref(&self) -> &Self::Target {
83 &self.0
84 }
85}
86
87impl Drop for PrimaryInstance {
88 fn drop(&mut self) {
89 self.cancel_all_background_work(true);
90 }
91}
92
/// Shorthand for the multi-threaded RocksDB handle used throughout this file.
type DB = DBWithThreadMode<MultiThreaded>;

/// Boxed one-shot cleanup closure that is `Send + Sync`.
type DropFn = Box<dyn FnOnce() + Send + Sync>;
/// Guard that runs an optional cleanup closure exactly once when dropped.
///
/// The default value holds no closure and therefore does nothing on drop.
#[derive(Default)]
struct DropResources {
    // Cleanup closure; taken out of the option when the guard is dropped.
    drop: Option<DropFn>,
}
101
102impl fmt::Debug for DropResources {
103 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
104 write!(f, "DropResources")
105 }
106}
107
108impl<F: 'static + FnOnce() + Send + Sync> From<F> for DropResources {
109 fn from(closure: F) -> Self {
110 Self {
111 drop: Option::Some(Box::new(closure)),
112 }
113 }
114}
115
116impl Drop for DropResources {
117 fn drop(&mut self) {
118 if let Some(drop) = self.drop.take() {
119 (drop)()
120 }
121 }
122}
123
/// Controls when column families that do not exist yet are created.
#[derive(Clone, Copy, Default, Debug, PartialEq, Eq)]
pub enum ColumnsPolicy {
    /// Missing column families are created the first time they are accessed.
    /// This is the default unless the `rocksdb-production` feature is enabled.
    #[cfg_attr(not(feature = "rocksdb-production"), default)]
    Lazy,
    /// Missing column families are created while the database is being opened.
    /// This is the default when the `rocksdb-production` feature is enabled.
    #[cfg_attr(feature = "rocksdb-production", default)]
    OnCreation,
}
134
/// Tunables applied when a [`RocksDb`] instance is opened.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct DatabaseConfig {
    /// Cache budget. When set, a third of it is given to the block cache and
    /// another third to the row cache; when `None`, the block cache is disabled
    /// and no row cache is configured.
    pub cache_capacity: Option<usize>,
    /// Value forwarded to RocksDB's "max open files" option.
    pub max_fds: i32,
    /// When missing column families should be created.
    pub columns_policy: ColumnsPolicy,
}
142
#[cfg(feature = "test-helpers")]
impl DatabaseConfig {
    /// Configuration used by tests: no cache, 512 file descriptors and
    /// lazily created column families.
    pub fn config_for_tests() -> Self {
        let cache_capacity = None;
        let max_fds = 512;
        let columns_policy = ColumnsPolicy::Lazy;

        Self {
            cache_capacity,
            max_fds,
            columns_policy,
        }
    }
}
153
/// RocksDB-backed key-value store for the database described by `Description`.
pub struct RocksDb<Description> {
    // Read options built when the handle is constructed; used by the point
    // lookups (`get`, `size_of_value`, `read_*`, `multi_get`).
    read_options: ReadOptions,
    // Shared handle to the underlying database.
    db: Arc<PrimaryInstance>,
    // Block-based table options, reused when a column family is re-created.
    block_opts: Arc<BlockBasedOptions>,
    // With the `Lazy` policy: options of the column families that still have to
    // be created, keyed by column family name. `None` with `OnCreation`.
    create_family: Option<Arc<Mutex<BTreeMap<String, Options>>>>,
    // Snapshot this handle reads through; `None` for a handle returned by `open_with`.
    snapshot: Option<rocksdb::SnapshotWithThreadMode<'static, DB>>,
    // Read/write counters shared between all handles of the same database.
    metrics: Arc<DatabaseMetrics>,
    // Cleanup guard shared with every handle created from this one.
    _drop: Arc<DropResources>,
    // Ties the handle to its database description without storing a value.
    _marker: core::marker::PhantomData<Description>,
}
165
166impl<Description> Drop for RocksDb<Description> {
167 fn drop(&mut self) {
168 self.snapshot = None;
171 }
172}
173
174impl<Description> std::fmt::Debug for RocksDb<Description> {
175 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> core::fmt::Result {
176 f.debug_struct("RocksDb").field("db", &self.db).finish()
177 }
178}
179
180impl<Description> RocksDb<Description>
181where
182 Description: DatabaseDescription,
183{
184 pub fn default_open_temp() -> DatabaseResult<Self> {
185 Self::default_open_temp_with_params(DatabaseConfig {
186 cache_capacity: None,
187 max_fds: 512,
188 columns_policy: ColumnsPolicy::Lazy,
189 })
190 }
191
192 pub fn default_open_temp_with_params(
193 database_config: DatabaseConfig,
194 ) -> DatabaseResult<Self> {
195 let tmp_dir = TempDir::new().unwrap();
196 let path = tmp_dir.path();
197 let result = Self::open(
198 path,
199 enum_iterator::all::<Description::Column>().collect::<Vec<_>>(),
200 database_config,
201 );
202 let mut db = result?;
203
204 db._drop = Arc::new(
205 {
206 move || {
207 drop(tmp_dir);
209 }
210 }
211 .into(),
212 );
213
214 Ok(db)
215 }
216
217 pub fn default_open<P: AsRef<Path>>(
218 path: P,
219 database_config: DatabaseConfig,
220 ) -> DatabaseResult<Self> {
221 Self::open(
222 path,
223 enum_iterator::all::<Description::Column>().collect::<Vec<_>>(),
224 database_config,
225 )
226 }
227
228 pub fn prune(path: &Path) -> DatabaseResult<()> {
229 let path = path.join(Description::name());
230 DB::destroy(&Options::default(), path)
231 .map_err(|e| DatabaseError::Other(e.into()))?;
232 Ok(())
233 }
234
235 pub fn open<P: AsRef<Path>>(
236 path: P,
237 columns: Vec<Description::Column>,
238 database_config: DatabaseConfig,
239 ) -> DatabaseResult<Self> {
240 Self::open_with(DB::open_cf_descriptors, path, columns, database_config)
241 }
242
243 pub fn open_read_only<P: AsRef<Path>>(
244 path: P,
245 columns: Vec<Description::Column>,
246 error_if_log_file_exist: bool,
247 database_config: DatabaseConfig,
248 ) -> DatabaseResult<Self> {
249 Self::open_with(
250 |options, primary_path, cfs| {
251 DB::open_cf_descriptors_read_only(
252 options,
253 primary_path,
254 cfs,
255 error_if_log_file_exist,
256 )
257 },
258 path,
259 columns,
260 database_config,
261 )
262 }
263
264 pub fn open_secondary<PrimaryPath, SecondaryPath>(
265 path: PrimaryPath,
266 secondary_path: SecondaryPath,
267 columns: Vec<Description::Column>,
268 database_config: DatabaseConfig,
269 ) -> DatabaseResult<Self>
270 where
271 PrimaryPath: AsRef<Path>,
272 SecondaryPath: AsRef<Path>,
273 {
274 Self::open_with(
275 |options, primary_path, cfs| {
276 DB::open_cf_descriptors_as_secondary(
277 options,
278 primary_path,
279 secondary_path.as_ref().to_path_buf(),
280 cfs,
281 )
282 },
283 path,
284 columns,
285 database_config,
286 )
287 }
288
289 pub fn open_with<F, P>(
290 opener: F,
291 path: P,
292 columns: Vec<Description::Column>,
293 database_config: DatabaseConfig,
294 ) -> DatabaseResult<Self>
295 where
296 F: Fn(
297 &Options,
298 PathBuf,
299 Vec<ColumnFamilyDescriptor>,
300 ) -> Result<DB, rocksdb::Error>,
301 P: AsRef<Path>,
302 {
303 let original_path = path.as_ref().to_path_buf();
304 let path = original_path.join(Description::name());
305 let metric_columns = columns
306 .iter()
307 .map(|column| (column.id(), column.name()))
308 .collect::<Vec<_>>();
309 let metrics = Arc::new(DatabaseMetrics::new(
310 Description::name().as_str(),
311 &metric_columns,
312 ));
313 let mut block_opts = BlockBasedOptions::default();
314 block_opts.set_format_version(5);
316
317 if let Some(capacity) = database_config.cache_capacity {
318 let block_cache_size = capacity / 3;
321 let cache = Cache::new_lru_cache(block_cache_size);
322 block_opts.set_block_cache(&cache);
323 block_opts.set_cache_index_and_filter_blocks(true);
326 block_opts.set_pin_l0_filter_and_index_blocks_in_cache(true);
328 } else {
329 block_opts.disable_cache();
330 }
331 block_opts.set_bloom_filter(10.0, true);
332 block_opts.set_block_size(16 * 1024);
333
334 let mut opts = Options::default();
335 opts.set_compression_type(DBCompressionType::Lz4);
336 opts.set_max_total_wal_size(64 * 1024 * 1024);
338 let cpu_number =
339 i32::try_from(num_cpus::get()).expect("The number of CPU can't exceed `i32`");
340 opts.increase_parallelism(cmp::max(1, cpu_number / 2));
341 if let Some(capacity) = database_config.cache_capacity {
342 let row_cache_size = capacity / 3;
347 let cache = Cache::new_lru_cache(row_cache_size);
348 opts.set_row_cache(&cache);
349 }
350 opts.set_max_background_jobs(6);
351 opts.set_bytes_per_sync(1048576);
352 opts.set_max_open_files(database_config.max_fds);
353
354 let existing_column_families = DB::list_cf(&opts, &path).unwrap_or_default();
355
356 let mut cf_descriptors_to_open = BTreeMap::new();
357 let mut cf_descriptors_to_create = BTreeMap::new();
358 for column in columns.clone() {
359 let column_name = Self::col_name(column.id());
360 let opts = Self::cf_opts(column, &block_opts);
361 if existing_column_families.contains(&column_name) {
362 cf_descriptors_to_open.insert(column_name, opts);
363 } else {
364 cf_descriptors_to_create.insert(column_name, opts);
365 }
366 }
367
368 if database_config.columns_policy == ColumnsPolicy::OnCreation
369 || (database_config.columns_policy == ColumnsPolicy::Lazy
370 && cf_descriptors_to_open.is_empty())
371 {
372 opts.create_if_missing(true);
373 }
374
375 let unknown_columns_to_open: BTreeMap<_, _> = existing_column_families
376 .iter()
377 .filter(|column_name| {
378 !cf_descriptors_to_open.contains_key(*column_name)
379 && !cf_descriptors_to_create.contains_key(*column_name)
380 })
381 .map(|unknown_column_name| {
382 let unknown_column_options = Self::default_opts(&block_opts);
383 (unknown_column_name.clone(), unknown_column_options)
384 })
385 .collect();
386 cf_descriptors_to_open.extend(unknown_columns_to_open);
387
388 let iterator = cf_descriptors_to_open
389 .clone()
390 .into_iter()
391 .map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts))
392 .collect::<Vec<_>>();
393
394 let db = match opener(&opts, path.clone(), iterator) {
395 Ok(db) => {
396 Ok(db)
397 },
398 Err(err) => {
399 tracing::error!("Couldn't open the database with an error: {}. \nTrying to reopen the database", err);
400
401 let iterator = cf_descriptors_to_open
402 .clone()
403 .into_iter()
404 .map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts))
405 .collect::<Vec<_>>();
406
407 opener(&opts, path.clone(), iterator)
408 },
409 }
410 .map_err(|e| DatabaseError::Other(e.into()))?;
411
412 let create_family = match database_config.columns_policy {
413 ColumnsPolicy::OnCreation => {
414 for (name, opt) in cf_descriptors_to_create {
415 db.create_cf(name, &opt)
416 .map_err(|e| DatabaseError::Other(e.into()))?;
417 }
418 None
419 }
420 ColumnsPolicy::Lazy => Some(Arc::new(Mutex::new(cf_descriptors_to_create))),
421 };
422 let db = Arc::new(PrimaryInstance(db));
423
424 let rocks_db = RocksDb {
425 read_options: Self::generate_read_options(&None),
426 block_opts: Arc::new(block_opts),
427 snapshot: None,
428 db,
429 metrics,
430 create_family,
431 _drop: Default::default(),
432 _marker: Default::default(),
433 };
434
435 Ok(rocks_db)
436 }
437
438 fn generate_read_options(
439 snapshot: &Option<rocksdb::SnapshotWithThreadMode<DB>>,
440 ) -> ReadOptions {
441 let mut opts = ReadOptions::default();
442 opts.set_verify_checksums(false);
443 if let Some(snapshot) = &snapshot {
444 opts.set_snapshot(snapshot);
445 }
446 opts
447 }
448
    /// Builds a fresh set of read options for this handle, bound to its
    /// snapshot when it has one.
    fn read_options(&self) -> ReadOptions {
        Self::generate_read_options(&self.snapshot)
    }
452
453 pub fn create_snapshot(&self) -> Self {
454 self.create_snapshot_generic()
455 }
456
    /// Creates a new handle that shares this instance's database, table
    /// options, pending column families, metrics and drop guard, and that
    /// reads through a freshly taken snapshot.
    ///
    /// The returned handle may be typed with a different `TargetDescription`.
    pub fn create_snapshot_generic<TargetDescription>(
        &self,
    ) -> RocksDb<TargetDescription> {
        let db = self.db.clone();
        let block_opts = self.block_opts.clone();
        let create_family = self.create_family.clone();
        let metrics = self.metrics.clone();
        let _drop = self._drop.clone();

        #[allow(clippy::missing_transmute_annotations)]
        // SAFETY (review note): the transmute target is inferred from the
        // `snapshot` field, i.e. the snapshot's borrow of `db` is extended to
        // `'static`. The returned struct keeps its own `Arc` clone of `db`, and
        // `Drop for RocksDb` clears `snapshot` explicitly, so the snapshot appears
        // never to outlive the database it was taken from — confirm before
        // changing the fields or the drop logic.
        let snapshot = unsafe {
            let snapshot = db.snapshot();
            core::mem::transmute(snapshot)
        };
        let snapshot = Some(snapshot);

        RocksDb {
            read_options: Self::generate_read_options(&snapshot),
            block_opts,
            snapshot,
            db,
            create_family,
            metrics,
            _drop,
            _marker: Default::default(),
        }
    }
489
490 fn cf(&self, column: Description::Column) -> Arc<BoundColumnFamily<'_>> {
491 self.cf_u32(column.id())
492 }
493
494 fn cf_u32(&self, column: u32) -> Arc<BoundColumnFamily<'_>> {
495 let family = self.db.cf_handle(&Self::col_name(column));
496
497 match family {
498 None => match &self.create_family {
499 Some(create_family) => {
500 let mut lock = create_family
501 .lock()
502 .expect("The create family lock should be available");
503
504 let name = Self::col_name(column);
505 let Some(family) = lock.remove(&name) else {
506 return self
507 .db
508 .cf_handle(&Self::col_name(column))
509 .expect("No column family found");
510 };
511
512 self.db
513 .create_cf(&name, &family)
514 .expect("Couldn't create column family");
515
516 self.db.cf_handle(&name).expect("invalid column state")
517 }
518 _ => {
519 panic!("Columns in the DB should have been created on DB opening");
520 }
521 },
522 Some(family) => family,
523 }
524 }
525
526 fn col_name(column: u32) -> String {
527 format!("col-{}", column)
528 }
529
530 fn default_opts(block_opts: &BlockBasedOptions) -> Options {
531 let mut opts = Options::default();
532 opts.create_if_missing(true);
533 opts.set_compression_type(DBCompressionType::Lz4);
534 opts.set_block_based_table_factory(block_opts);
535
536 opts
537 }
538
539 fn cf_opts(column: Description::Column, block_opts: &BlockBasedOptions) -> Options {
540 let mut opts = Self::default_opts(block_opts);
541
542 if let Some(size) = Description::prefix(&column) {
544 opts.set_prefix_extractor(SliceTransform::create_fixed_prefix(size))
545 }
546
547 opts
548 }
549
550 fn reverse_prefix_iter<T>(
555 &self,
556 prefix: &[u8],
557 column: Description::Column,
558 ) -> impl Iterator<Item = StorageResult<T::Item>> + '_ + use<'_, T, Description>
559 where
560 T: ExtractItem,
561 {
562 let reverse_iterator = next_prefix(prefix.to_vec()).map(|next_prefix| {
563 let mut opts = self.read_options();
564 opts.set_total_order_seek(true);
570 self.iterator::<T>(
571 column,
572 opts,
573 IteratorMode::From(next_prefix.as_slice(), rocksdb::Direction::Reverse),
574 )
575 });
576
577 match reverse_iterator {
578 Some(iterator) => {
579 let prefix = prefix.to_vec();
580 iterator
581 .take_while(move |item| {
582 if let Ok(item) = item {
583 T::starts_with(item, prefix.as_slice())
584 } else {
585 true
586 }
587 })
588 .into_boxed()
589 }
590 _ => {
591 let prefix = prefix.to_vec();
593 self.iterator::<T>(column, self.read_options(), IteratorMode::End)
594 .take_while(move |item| {
595 if let Ok(item) = item {
596 T::starts_with(item, prefix.as_slice())
597 } else {
598 true
599 }
600 })
601 .into_boxed()
602 }
603 }
604 }
605
606 pub(crate) fn iterator<T>(
607 &self,
608 column: Description::Column,
609 opts: ReadOptions,
610 iter_mode: IteratorMode,
611 ) -> impl Iterator<Item = StorageResult<T::Item>> + '_ + use<'_, T, Description>
612 where
613 T: ExtractItem,
614 {
615 let column_metrics = self.metrics.columns_read_statistic.get(&column.id());
616
617 RocksDBKeyIterator::<_, T>::new(
618 self.db.raw_iterator_cf_opt(&self.cf(column), opts),
619 iter_mode,
620 )
621 .map(move |item| {
622 item.inspect(|item| {
623 self.metrics.read_meter.inc();
624 column_metrics.map(|metric| metric.inc());
625 self.metrics.bytes_read.inc_by(T::size(item));
626 })
627 .map_err(|e| DatabaseError::Other(e.into()).into())
628 })
629 }
630
631 pub fn clear_table(&self, column: Description::Column) -> DatabaseResult<()> {
633 let column_name = Self::col_name(column.id());
635 self.db
636 .drop_cf(&column_name)
637 .map_err(|e| DatabaseError::Other(e.into()))?;
638
639 let column_name = Self::col_name(column.id());
641 let opts = Self::cf_opts(column, self.block_opts.as_ref());
642 self.db
643 .create_cf(&column_name, &opts)
644 .map_err(|e| DatabaseError::Other(e.into()))?;
645
646 Ok(())
647 }
648
649 pub fn multi_get<K, I>(
650 &self,
651 column: u32,
652 iterator: I,
653 ) -> DatabaseResult<Vec<Option<Vec<u8>>>>
654 where
655 I: Iterator<Item = K>,
656 K: AsRef<[u8]>,
657 {
658 let column_metrics = self.metrics.columns_read_statistic.get(&column);
659 let cl = self.cf_u32(column);
660 let results = self
661 .db
662 .multi_get_cf_opt(iterator.map(|k| (&cl, k)), &self.read_options)
663 .into_iter()
664 .map(|el| {
665 self.metrics.read_meter.inc();
666 column_metrics.map(|metric| metric.inc());
667 el.map(|value| {
668 value.inspect(|vec| {
669 self.metrics.bytes_read.inc_by(vec.len() as u64);
670 })
671 })
672 .map_err(|err| DatabaseError::Other(err.into()))
673 })
674 .try_collect()?;
675 Ok(results)
676 }
677
    /// Builds a boxed iterator over `column`, optionally restricted to keys
    /// starting with `prefix` and/or positioned at `start`, walking in `direction`.
    ///
    /// `T` selects what every yielded item contains (see [`ExtractItem`]).
    /// When a `prefix` is given, iteration ends at the first successfully read
    /// item that does not match it; read errors are passed through.
    fn _iter_store<T>(
        &self,
        column: Description::Column,
        prefix: Option<&[u8]>,
        start: Option<&[u8]>,
        direction: IterDirection,
    ) -> BoxedIter<'_, StorageResult<T::Item>>
    where
        T: ExtractItem,
    {
        match (prefix, start) {
            // No constraints: walk the whole column from the matching end.
            (None, None) => {
                let iter_mode =
                    match direction {
                        IterDirection::Forward => IteratorMode::Start,
                        IterDirection::Reverse => IteratorMode::End,
                    };
                self.iterator::<T>(column, self.read_options(), iter_mode)
                    .into_boxed()
            }
            // Prefix only.
            (Some(prefix), None) => {
                if direction == IterDirection::Reverse {
                    // Reverse iteration needs a start point after the prefix section.
                    self.reverse_prefix_iter::<T>(prefix, column).into_boxed()
                } else {
                    let iter_mode = IteratorMode::From(
                        prefix,
                        convert_to_rocksdb_direction(direction),
                    );

                    let mut opts = self.read_options();
                    // Ask RocksDB to stay within the prefix the seek started in.
                    opts.set_prefix_same_as_start(true);

                    let prefix = prefix.to_vec();
                    self.iterator::<T>(column, opts, iter_mode)
                        .take_while(move |item| {
                            // Stop at the first key outside of the prefix; keep errors.
                            if let Ok(item) = item {
                                T::starts_with(item, prefix.as_slice())
                            } else {
                                true
                            }
                        })
                        .into_boxed()
                }
            }
            // Start key only: iterate from `start` across the whole column.
            (None, Some(start)) => {
                let iter_mode =
                    IteratorMode::From(start, convert_to_rocksdb_direction(direction));
                let mut opts = self.read_options();
                // Iteration may cross prefix sections, so total-order seek is requested.
                opts.set_total_order_seek(true);
                self.iterator::<T>(column, opts, iter_mode).into_boxed()
            }
            // Prefix and start key.
            (Some(prefix), Some(start)) => {
                // A start key outside of the prefix can never yield a match.
                if !start.starts_with(prefix) {
                    return iter::empty().into_boxed();
                }

                let prefix = prefix.to_vec();
                let iter_mode =
                    IteratorMode::From(start, convert_to_rocksdb_direction(direction));
                self.iterator::<T>(column, self.read_options(), iter_mode)
                    .take_while(move |item| {
                        // Stop at the first key outside of the prefix; keep errors.
                        if let Ok(item) = item {
                            T::starts_with(item, prefix.as_slice())
                        } else {
                            true
                        }
                    })
                    .into_boxed()
            }
        }
    }
764
765 #[cfg(feature = "backup")]
766 fn backup_engine<P: AsRef<Path> + ?Sized>(
767 backup_dir: &P,
768 ) -> DatabaseResult<rocksdb::backup::BackupEngine> {
769 use rocksdb::{
770 Env,
771 backup::{
772 BackupEngine,
773 BackupEngineOptions,
774 },
775 };
776
777 let backup_dir = backup_dir.as_ref().join(Description::name());
778 let backup_dir_path = backup_dir.as_path();
779
780 let mut backup_engine_options = BackupEngineOptions::new(backup_dir_path)
781 .map_err(|e| {
782 DatabaseError::BackupEngineInitError(anyhow::anyhow!(
783 "Couldn't create backup engine options for path `{}`: {}",
784 backup_dir_path.display(),
785 e
786 ))
787 })?;
788
789 let cpu_number =
790 i32::try_from(num_cpus::get()).expect("The number of CPU can't exceed `i32`");
791
792 backup_engine_options.set_max_background_operations(cmp::max(1, cpu_number / 4));
793
794 let env = Env::new().map_err(|e| {
795 DatabaseError::BackupEngineInitError(anyhow::anyhow!(
796 "Couldn't create environment for backup: {}",
797 e
798 ))
799 })?;
800
801 let backup_engine =
802 BackupEngine::open(&backup_engine_options, &env).map_err(|e| {
803 DatabaseError::BackupEngineInitError(anyhow::anyhow!(
804 "Couldn't open backup engine for path `{}`: {}",
805 backup_dir.display(),
806 e
807 ))
808 })?;
809
810 Ok(backup_engine)
811 }
812
813 #[cfg(feature = "backup")]
814 pub fn backup<P: AsRef<Path> + ?Sized>(
815 db_dir: &P,
816 backup_dir: &P,
817 ) -> DatabaseResult<()> {
818 let mut backup_engine = Self::backup_engine(backup_dir)?;
819
820 let db_config = DatabaseConfig {
821 cache_capacity: None,
822 max_fds: -1,
823 columns_policy: ColumnsPolicy::Lazy,
824 };
825
826 let db = Self::open_read_only(
827 db_dir,
828 enum_iterator::all::<Description::Column>().collect::<Vec<_>>(),
829 false,
830 db_config,
831 )?;
832
833 backup_engine
834 .create_new_backup_flush(&db.db, true)
835 .map_err(|e| {
836 DatabaseError::BackupError(anyhow::anyhow!(
837 "Couldn't create new backup for path `{}`: {}",
838 backup_dir.as_ref().display(),
839 e
840 ))
841 })?;
842
843 Ok(())
844 }
845
846 #[cfg(feature = "backup")]
848 pub fn restore<P: AsRef<Path> + ?Sized>(
849 db_dir: &P,
850 backup_dir: &P,
851 ) -> DatabaseResult<()> {
852 use rocksdb::backup::RestoreOptions;
853
854 let mut backup_engine = Self::backup_engine(backup_dir)?;
855 let restore_option = RestoreOptions::default();
856
857 let db_dir = db_dir.as_ref().join(Description::name());
858 let db_dir_path = db_dir.as_path();
859 backup_engine
861 .restore_from_latest_backup(db_dir_path, db_dir_path, &restore_option)
862 .map_err(|e| {
863 DatabaseError::RestoreError(anyhow::anyhow!(
864 "Couldn't restore from latest backup for path `{}`: {}",
865 db_dir_path.display(),
866 e
867 ))
868 })?;
869
870 Ok(())
871 }
872
873 pub fn shutdown(&self) {
874 self.db.cancel_all_background_work(false);
882
883 #[cfg(feature = "test-helpers")]
891 while Arc::strong_count(&self.db) > 1 {}
892 }
893}
894
895pub(crate) struct KeyOnly;
896
897impl ExtractItem for KeyOnly {
898 type Item = Vec<u8>;
899
900 fn extract_item<D>(
901 raw_iterator: &DBRawIteratorWithThreadMode<D>,
902 ) -> Option<Self::Item>
903 where
904 D: DBAccess,
905 {
906 raw_iterator.key().map(|key| key.to_vec())
907 }
908
909 fn size(item: &Self::Item) -> u64 {
910 item.len() as u64
911 }
912
913 fn starts_with(item: &Self::Item, prefix: &[u8]) -> bool {
914 item.starts_with(prefix)
915 }
916}
917
918pub(crate) struct KeyAndValue;
919
920impl ExtractItem for KeyAndValue {
921 type Item = (Vec<u8>, Value);
922
923 fn extract_item<D>(
924 raw_iterator: &DBRawIteratorWithThreadMode<D>,
925 ) -> Option<Self::Item>
926 where
927 D: DBAccess,
928 {
929 raw_iterator
930 .item()
931 .map(|(key, value)| (key.to_vec(), Value::from(value)))
932 }
933
934 fn size(item: &Self::Item) -> u64 {
935 item.0.len().saturating_add(item.1.len()) as u64
936 }
937
938 fn starts_with(item: &Self::Item, prefix: &[u8]) -> bool {
939 item.0.starts_with(prefix)
940 }
941}
942
943impl<Description> KeyValueInspect for RocksDb<Description>
944where
945 Description: DatabaseDescription,
946{
947 type Column = Description::Column;
948
949 fn size_of_value(
950 &self,
951 key: &[u8],
952 column: Self::Column,
953 ) -> StorageResult<Option<usize>> {
954 self.metrics.read_meter.inc();
955 let column_metrics = self.metrics.columns_read_statistic.get(&column.id());
956 column_metrics.map(|metric| metric.inc());
957
958 Ok(self
959 .db
960 .get_pinned_cf_opt(&self.cf(column), key, &self.read_options)
961 .map_err(|e| DatabaseError::Other(e.into()))?
962 .map(|value| value.len()))
963 }
964
965 fn get(&self, key: &[u8], column: Self::Column) -> StorageResult<Option<Value>> {
966 self.metrics.read_meter.inc();
967 let column_metrics = self.metrics.columns_read_statistic.get(&column.id());
968 column_metrics.map(|metric| metric.inc());
969
970 let value = self
971 .db
972 .get_cf_opt(&self.cf(column), key, &self.read_options)
973 .map_err(|e| DatabaseError::Other(e.into()))?;
974
975 if let Some(value) = &value {
976 self.metrics.bytes_read.inc_by(value.len() as u64);
977 }
978
979 Ok(value.map(Arc::from))
980 }
981
982 fn read_exact(
983 &self,
984 key: &[u8],
985 column: Self::Column,
986 offset: usize,
987 buf: &mut [u8],
988 ) -> StorageResult<Result<usize, StorageReadError>> {
989 self.metrics.read_meter.inc();
990 let column_metrics = self.metrics.columns_read_statistic.get(&column.id());
991 column_metrics.map(|metric| metric.inc());
992
993 let Some(value) = self
994 .db
995 .get_pinned_cf_opt(&self.cf(column), key, &self.read_options)
996 .map_err(|e| DatabaseError::Other(e.into()))?
997 else {
998 return Ok(Err(StorageReadError::KeyNotFound));
999 };
1000
1001 let bytes_len = value.len();
1002 let start = offset;
1003 let buf_len = buf.len();
1004 let end = offset.saturating_add(buf_len);
1005
1006 if end > bytes_len {
1007 return Ok(Err(StorageReadError::OutOfBounds));
1008 }
1009
1010 let starting_from_offset = &value[start..end];
1011 buf[..].copy_from_slice(starting_from_offset);
1012
1013 self.metrics
1014 .bytes_read
1015 .inc_by(starting_from_offset.len() as u64);
1016
1017 Ok(Ok(buf_len))
1018 }
1019
1020 fn read_zerofill(
1021 &self,
1022 key: &[u8],
1023 column: Self::Column,
1024 offset: usize,
1025 buf: &mut [u8],
1026 ) -> StorageResult<Result<usize, StorageReadError>> {
1027 self.metrics.read_meter.inc();
1028 let column_metrics = self.metrics.columns_read_statistic.get(&column.id());
1029 column_metrics.map(|metric| metric.inc());
1030
1031 let Some(value) = self
1032 .db
1033 .get_pinned_cf_opt(&self.cf(column), key, &self.read_options)
1034 .map_err(|e| DatabaseError::Other(e.into()))?
1035 else {
1036 return Ok(Err(StorageReadError::KeyNotFound));
1037 };
1038
1039 let bytes_len = value.len();
1040 let buf_len = buf.len();
1041
1042 let Some((_, after)) = value.split_at_checked(offset) else {
1043 return Ok(Err(StorageReadError::OutOfBounds));
1044 };
1045
1046 let (dst, rest) = buf.split_at_mut(buf_len.min(after.len()));
1047 dst.copy_from_slice(&after[..dst.len()]);
1048 rest.fill(0);
1049
1050 self.metrics.bytes_read.inc_by(dst.len() as u64);
1051
1052 Ok(Ok(bytes_len))
1053 }
1054}
1055
/// Iteration over a column; both methods delegate to `_iter_store` and only
/// differ in what every yielded item contains.
impl<Description> IterableStore for RocksDb<Description>
where
    Description: DatabaseDescription,
{
    /// Iterates over key-value pairs of `column`.
    fn iter_store(
        &self,
        column: Self::Column,
        prefix: Option<&[u8]>,
        start: Option<&[u8]>,
        direction: IterDirection,
    ) -> BoxedIter<'_, KVItem> {
        self._iter_store::<KeyAndValue>(column, prefix, start, direction)
    }

    /// Iterates over the keys of `column` only.
    fn iter_store_keys(
        &self,
        column: Self::Column,
        prefix: Option<&[u8]>,
        start: Option<&[u8]>,
        direction: IterDirection,
    ) -> BoxedIter<'_, KeyItem> {
        self._iter_store::<KeyOnly>(column, prefix, start, direction)
    }
}
1080
1081impl<Description> RocksDb<Description>
1082where
1083 Description: DatabaseDescription,
1084{
1085 pub fn commit_changes<'a>(&self, changes: &'a StorageChanges) -> StorageResult<()> {
1086 let instant = std::time::Instant::now();
1087 let mut batch = WriteBatch::default();
1088 let mut conflict_finder = HashSet::<(&'a u32, &'a ReferenceBytesKey)>::new();
1089
1090 match changes {
1091 StorageChanges::Changes(changes) => {
1092 self._populate_batch(&mut batch, &mut conflict_finder, changes)?;
1093 }
1094 StorageChanges::ChangesList(changes_list) => {
1095 for changes in changes_list {
1096 self._populate_batch(&mut batch, &mut conflict_finder, changes)?;
1097 }
1098 }
1099 }
1100
1101 self.db
1102 .write(batch)
1103 .map_err(|e| DatabaseError::Other(e.into()))?;
1104 self.metrics.database_commit_time.inc_by(
1106 u64::try_from(instant.elapsed().as_nanos())
1107 .expect("The commit shouldn't take longer than `u64`"),
1108 );
1109
1110 Ok(())
1111 }
1112
1113 fn _populate_batch<'a>(
1114 &self,
1115 batch: &mut WriteBatch,
1116 conflict_finder: &mut HashSet<(&'a u32, &'a ReferenceBytesKey)>,
1117 changes: &'a Changes,
1118 ) -> DatabaseResult<()> {
1119 for (column, ops) in changes {
1120 let cf = self.cf_u32(*column);
1121 let column_metrics = self.metrics.columns_write_statistic.get(column);
1122 for (key, op) in ops {
1123 self.metrics.write_meter.inc();
1124 column_metrics.map(|metric| metric.inc());
1125
1126 if !conflict_finder.insert((column, key)) {
1127 return Err(DatabaseError::ConflictingChanges {
1128 column: *column,
1129 key: key.clone(),
1130 });
1131 }
1132
1133 match op {
1134 WriteOperation::Insert(value) => {
1135 self.metrics.bytes_written.inc_by(value.len() as u64);
1136 batch.put_cf(&cf, key, value.as_ref());
1137 }
1138 WriteOperation::Remove => {
1139 batch.delete_cf(&cf, key);
1140 }
1141 }
1142 }
1143 }
1144 Ok(())
1145 }
1146}
1147
/// Returns the smallest byte string that is greater than every key starting
/// with `prefix`, or `None` when no such string exists (the prefix is empty
/// or consists only of `0xFF` bytes).
///
/// Trailing `0xFF` bytes cannot be incremented, so they are removed before the
/// last remaining byte is incremented: `[1, 0xFF]` becomes `[2]`. Keeping them
/// would produce a bound (`[2, 0xFF]`) that lies past keys such as `[2, 0]`,
/// which do not belong to the prefix.
fn next_prefix(mut prefix: Vec<u8>) -> Option<Vec<u8>> {
    while let Some(last) = prefix.last_mut() {
        if let Some(incremented) = last.checked_add(1) {
            *last = incremented;
            return Some(prefix);
        }
        // The last byte is `0xFF`: drop it and carry into the previous byte.
        prefix.pop();
    }
    None
}
1158
/// Mutation helpers that are only available with the `test-helpers` feature.
#[cfg(feature = "test-helpers")]
pub mod test_helpers {
    use super::*;
    use fuel_core_storage::{
        kv_store::KeyValueMutate,
        transactional::ReadTransaction,
    };

    /// Every mutation runs in its own transaction that is committed right away.
    impl<Description> KeyValueMutate for RocksDb<Description>
    where
        Description: DatabaseDescription,
    {
        /// Writes `buf` under `key` in `column` and returns the value reported
        /// by the transaction's `write`.
        fn write(
            &mut self,
            key: &[u8],
            column: Self::Column,
            buf: &[u8],
        ) -> StorageResult<usize> {
            let mut transaction = self.read_transaction();
            let written = transaction.write(key, column, buf)?;
            let changes = StorageChanges::Changes(transaction.into_changes());
            self.commit_changes(&changes)?;

            Ok(written)
        }

        /// Removes the value stored under `key` in `column`.
        fn delete(&mut self, key: &[u8], column: Self::Column) -> StorageResult<()> {
            let mut transaction = self.read_transaction();
            transaction.delete(key, column)?;
            let changes = StorageChanges::Changes(transaction.into_changes());
            self.commit_changes(&changes)?;

            Ok(())
        }
    }
}
1194
1195#[allow(non_snake_case)]
1196#[cfg(test)]
1197mod tests {
1198 use super::*;
1199 use crate::database::database_description::on_chain::OnChain;
1200 use fuel_core_storage::{
1201 column::Column,
1202 kv_store::KeyValueMutate,
1203 };
1204 use std::collections::{
1205 BTreeMap,
1206 HashMap,
1207 };
1208 use tempfile::TempDir;
1209
1210 fn create_db() -> (RocksDb<OnChain>, TempDir) {
1211 let tmp_dir = TempDir::new().unwrap();
1212 (
1213 RocksDb::default_open(tmp_dir.path(), DatabaseConfig::config_for_tests())
1214 .unwrap(),
1215 tmp_dir,
1216 )
1217 }
1218
1219 #[test]
1220 fn open_new_columns() {
1221 let tmp_dir = TempDir::new().unwrap();
1222
1223 let old_columns =
1225 vec![Column::Coins, Column::Messages, Column::UploadedBytecodes];
1226 let database_with_old_columns = RocksDb::<OnChain>::open(
1227 tmp_dir.path(),
1228 old_columns.clone(),
1229 DatabaseConfig::config_for_tests(),
1230 )
1231 .expect("Failed to open database with old columns");
1232 drop(database_with_old_columns);
1233
1234 let mut new_columns = old_columns;
1236 new_columns.push(Column::ContractsAssets);
1237 new_columns.push(Column::Metadata);
1238 let database_with_new_columns = RocksDb::<OnChain>::open(
1239 tmp_dir.path(),
1240 new_columns,
1241 DatabaseConfig::config_for_tests(),
1242 )
1243 .map(|_| ());
1244
1245 assert_eq!(Ok(()), database_with_new_columns);
1247 }
1248
1249 #[test]
1250 fn can_put_and_read() {
1251 let key = vec![0xA, 0xB, 0xC];
1252
1253 let (mut db, _tmp) = create_db();
1254 let expected = Value::from([1, 2, 3]);
1255 db.put(&key, Column::Metadata, expected.clone()).unwrap();
1256
1257 assert_eq!(db.get(&key, Column::Metadata).unwrap().unwrap(), expected)
1258 }
1259
1260 #[test]
1261 fn put_returns_previous_value() {
1262 let key = vec![0xA, 0xB, 0xC];
1263
1264 let (mut db, _tmp) = create_db();
1265 let expected = Value::from([1, 2, 3]);
1266 db.put(&key, Column::Metadata, expected.clone()).unwrap();
1267 let prev = db
1268 .replace(&key, Column::Metadata, Arc::new([2, 4, 6]))
1269 .unwrap();
1270
1271 assert_eq!(prev, Some(expected));
1272 }
1273
1274 #[test]
1275 fn delete_and_get() {
1276 let key = vec![0xA, 0xB, 0xC];
1277
1278 let (mut db, _tmp) = create_db();
1279 let expected = Value::from([1, 2, 3]);
1280 db.put(&key, Column::Metadata, expected.clone()).unwrap();
1281 assert_eq!(db.get(&key, Column::Metadata).unwrap().unwrap(), expected);
1282
1283 db.delete(&key, Column::Metadata).unwrap();
1284 assert_eq!(db.get(&key, Column::Metadata).unwrap(), None);
1285 }
1286
1287 #[test]
1288 fn key_exists() {
1289 let key = vec![0xA, 0xB, 0xC];
1290
1291 let (mut db, _tmp) = create_db();
1292 let expected = Arc::new([1, 2, 3]);
1293 db.put(&key, Column::Metadata, expected).unwrap();
1294 assert!(db.exists(&key, Column::Metadata).unwrap());
1295 }
1296
/// Committing a change set that contains an `Insert` makes the value readable
/// through `get`.
#[test]
fn commit_changes_inserts() {
    // Given
    let (db, _tmp) = create_db();
    let key = vec![0xA, 0xB, 0xC];
    let value = Value::from([1, 2, 3]);

    // A single insert operation, keyed by the id of the `Metadata` column.
    let column_changes = BTreeMap::from_iter([(
        key.clone().into(),
        WriteOperation::Insert(value.clone()),
    )]);
    let changes = HashMap::from_iter([(Column::Metadata.id(), column_changes)]);

    // When
    db.commit_changes(&StorageChanges::Changes(changes)).unwrap();

    // Then
    let fetched = db.get(&key, Column::Metadata).unwrap().unwrap();
    assert_eq!(fetched, value);
}
1315
/// Committing a change set that contains a `Remove` erases a previously
/// stored value.
#[test]
fn commit_changes_removes() {
    // Given
    let (mut db, _tmp) = create_db();
    let key = vec![0xA, 0xB, 0xC];
    let value = Arc::new([1, 2, 3]);
    db.put(&key, Column::Metadata, value).unwrap();

    // A single remove operation, keyed by the id of the `Metadata` column.
    let column_changes =
        BTreeMap::from_iter([(key.clone().into(), WriteOperation::Remove)]);
    let changes = HashMap::from_iter([(Column::Metadata.id(), column_changes)]);

    // When
    db.commit_changes(&StorageChanges::Changes(changes)).unwrap();

    // Then
    let fetched = db.get(&key, Column::Metadata).unwrap();
    assert_eq!(fetched, None);
}
1333
/// An empty value can be written, read back, found by `exists`, listed by
/// iteration, and removed again with `take`.
#[test]
fn can_use_unit_value() {
    let (mut db, _tmp) = create_db();
    let key = vec![0x00];
    let empty_value = Value::from([]);

    db.put(&key, Column::Metadata, empty_value.clone()).unwrap();

    // Point lookups see the entry.
    let fetched = db.get(&key, Column::Metadata).unwrap().unwrap();
    assert_eq!(fetched, empty_value);
    assert!(db.exists(&key, Column::Metadata).unwrap());

    // Iteration over the column yields the entry first.
    let entries = db
        .iter_store(Column::Metadata, None, None, IterDirection::Forward)
        .collect::<Result<Vec<_>, _>>()
        .unwrap();
    assert_eq!(entries[0], (key.clone(), empty_value.clone()));

    // `take` returns the value and removes the entry.
    let taken = db.take(&key, Column::Metadata).unwrap().unwrap();
    assert_eq!(taken, empty_value);
    assert!(!db.exists(&key, Column::Metadata).unwrap());
}
1357
/// An empty key can be written, read back, found by `exists`, listed by
/// iteration, and removed again with `take`.
#[test]
fn can_use_unit_key() {
    let (mut db, _tmp) = create_db();
    let empty_key: Vec<u8> = Vec::with_capacity(0);
    let value = Value::from([1, 2, 3]);

    db.put(&empty_key, Column::Metadata, value.clone()).unwrap();

    // Point lookups see the entry.
    let fetched = db.get(&empty_key, Column::Metadata).unwrap().unwrap();
    assert_eq!(fetched, value);
    assert!(db.exists(&empty_key, Column::Metadata).unwrap());

    // Iteration over the column yields the entry first.
    let entries = db
        .iter_store(Column::Metadata, None, None, IterDirection::Forward)
        .collect::<Result<Vec<_>, _>>()
        .unwrap();
    assert_eq!(entries[0], (empty_key.clone(), value.clone()));

    // `take` returns the value and removes the entry.
    let taken = db.take(&empty_key, Column::Metadata).unwrap().unwrap();
    assert_eq!(taken, value);
    assert!(!db.exists(&empty_key, Column::Metadata).unwrap());
}
1381
/// An entry whose key and value are both empty can be written, read back,
/// found by `exists`, listed by iteration, and removed again with `take`.
#[test]
fn can_use_unit_key_and_value() {
    let (mut db, _tmp) = create_db();
    let empty_key: Vec<u8> = Vec::with_capacity(0);
    let empty_value = Value::from([]);

    db.put(&empty_key, Column::Metadata, empty_value.clone())
        .unwrap();

    // Point lookups see the entry.
    let fetched = db.get(&empty_key, Column::Metadata).unwrap().unwrap();
    assert_eq!(fetched, empty_value);
    assert!(db.exists(&empty_key, Column::Metadata).unwrap());

    // Iteration over the column yields the entry first.
    let entries = db
        .iter_store(Column::Metadata, None, None, IterDirection::Forward)
        .collect::<Result<Vec<_>, _>>()
        .unwrap();
    assert_eq!(entries[0], (empty_key.clone(), empty_value.clone()));

    // `take` returns the value and removes the entry.
    let taken = db.take(&empty_key, Column::Metadata).unwrap().unwrap();
    assert_eq!(taken, empty_value);
    assert!(!db.exists(&empty_key, Column::Metadata).unwrap());
}
1405
/// While a database opened through `create_db` is still alive, calling `open`
/// again on the same directory returns an error.
#[test]
fn open_primary_db_second_time_fails() {
    // Given
    // The first instance is kept alive for the whole test.
    let (_primary_db, tmp_dir) = create_db();
    let all_columns: Vec<_> =
        enum_iterator::all::<<OnChain as DatabaseDescription>::Column>().collect();

    // When
    let second_open = RocksDb::<OnChain>::open(
        tmp_dir.path(),
        all_columns,
        DatabaseConfig::config_for_tests(),
    );

    // Then
    assert!(second_open.is_err());
}
1423
/// While a database opened through `create_db` is still alive, the same
/// directory can additionally be opened with `open_read_only`.
#[test]
fn open_second_read_only_db() {
    // Given
    // The first instance is kept alive for the whole test.
    let (_primary_db, tmp_dir) = create_db();
    let columns = vec![Column::Coins, Column::Messages, Column::UploadedBytecodes];

    // When
    let outcome = RocksDb::<OnChain>::open_read_only(
        tmp_dir.path(),
        columns,
        false,
        DatabaseConfig::config_for_tests(),
    )
    // Only success or failure matters here; the opened handle is discarded.
    .map(drop);

    // Then
    assert_eq!(Ok(()), outcome);
}
1443
/// While a database opened through `create_db` is still alive, `open_secondary`
/// succeeds when given that database's directory plus a separate temporary
/// directory.
#[test]
fn open_secondary_db() {
    // Given
    // The first instance is kept alive for the whole test.
    let (_primary_db, tmp_dir) = create_db();
    let secondary_temp = TempDir::new().unwrap();
    let columns = vec![Column::Coins, Column::Messages, Column::UploadedBytecodes];

    // When
    let outcome = RocksDb::<OnChain>::open_secondary(
        tmp_dir.path(),
        secondary_temp.path(),
        columns,
        DatabaseConfig::config_for_tests(),
    )
    // Only success or failure matters here; the opened handle is discarded.
    .map(drop);

    // Then
    assert_eq!(Ok(()), outcome);
}
1464
/// A snapshot taken before a delete still returns the entry, while the
/// database itself no longer does.
#[test]
fn snapshot_allows_get_entry_after_it_was_removed() {
    // Given
    let (mut db, _tmp) = create_db();
    let key = [1; 32];
    let stored = Value::from([1, 2, 3]);
    db.put(&key, Column::Metadata, stored.clone()).unwrap();
    let snapshot = db.create_snapshot();

    // When
    db.delete(&key, Column::Metadata).unwrap();

    // Then
    let from_db = db.get(&key, Column::Metadata).unwrap();
    assert!(from_db.is_none());

    let from_snapshot = snapshot.get(&key, Column::Metadata).unwrap();
    assert_eq!(from_snapshot, Some(stored));
}
1485
/// A snapshot taken before all entries are deleted still iterates over every
/// entry in key order, while iterating the database itself yields nothing.
#[test]
fn snapshot_allows_correct_iteration_even_after_all_elements_where_removed() {
    // Given
    let (mut db, _tmp) = create_db();
    let value = Value::from([1, 2, 3]);
    let key_1: Vec<u8> = vec![1; 32];
    let key_2: Vec<u8> = vec![2; 32];
    let key_3: Vec<u8> = vec![3; 32];
    for key in [&key_1, &key_2, &key_3] {
        db.put(key, Column::Metadata, value.clone()).unwrap();
    }
    let snapshot = db.create_snapshot();

    // When
    for key in [&key_1, &key_2, &key_3] {
        db.delete(key, Column::Metadata).unwrap();
    }

    // Then
    let live_entries: Vec<_> = db
        .iter_store(Column::Metadata, None, None, IterDirection::Forward)
        .collect();
    assert!(live_entries.is_empty());

    let snapshot_entries: Vec<_> = snapshot
        .iter_store(Column::Metadata, None, None, IterDirection::Forward)
        .collect();
    let expected_entries = vec![
        Ok((key_1, value.clone())),
        Ok((key_2, value.clone())),
        Ok((key_3, value)),
    ];
    assert_eq!(snapshot_entries, expected_entries);
}
1523
/// Dropping a snapshot after the database it was created from has already
/// been dropped must complete without panicking.
#[test]
fn drop_snapshot_after_dropping_main_database_shouldn_panic() {
    // Given
    let (primary, _tmp) = create_db();
    let view = primary.create_snapshot();

    // When
    // The drop order is the point of the test: database first, snapshot second.
    drop(primary);

    // Then
    drop(view);
}
1537
/// A directory that was first opened through `create_db` can be reopened with
/// a column list that omits the first column.
#[test]
fn open__opens_subset_of_columns_after_opening_all_columns() {
    // Given
    let (first_open, tmp_dir) = create_db();
    // Close the first instance before opening the directory again.
    drop(first_open);

    // When
    let columns_without_first: Vec<_> =
        enum_iterator::all::<<OnChain as DatabaseDescription>::Column>()
            .skip(1)
            .collect();
    let reopened = RocksDb::<OnChain>::open(
        tmp_dir.path(),
        columns_without_first,
        DatabaseConfig::config_for_tests(),
    );

    // Then
    let _ = reopened.expect("Should open the database with shorter number of columns");
}
1559
/// Reverse iteration with a prefix that matches none of the stored keys
/// yields no items, even though keys exist on both sides of the prefix.
#[test]
fn iter_store__reverse_iterator__no_target_prefix() {
    // Given
    let (mut db, _tmp) = create_db();
    let value = Value::from([]);
    for key in [[1, 1], [2, 2], [9, 3], [10, 0]] {
        db.put(&key, Column::Metadata, value.clone()).unwrap();
    }

    // When
    let prefix: Vec<u8> = vec![5];
    let keys = db
        .iter_store(
            Column::Metadata,
            Some(prefix.as_slice()),
            None,
            IterDirection::Reverse,
        )
        .map(|entry| entry.map(|(key, _value)| key))
        .collect::<Vec<_>>();

    // Then
    assert_eq!(keys, vec![]);
}
1588
/// Reverse iteration over a prefix whose keys sit between other stored keys
/// returns only the matching keys, highest first.
#[test]
fn iter_store__reverse_iterator__target_prefix_at_the_middle() {
    // Given
    let (mut db, _tmp) = create_db();
    let value = Value::from([]);
    let below_prefix = [1, 1];
    let first_match = [2, 2];
    let second_match = [2, 3];
    let above_prefix = [10, 0];
    for key in [below_prefix, first_match, second_match, above_prefix] {
        db.put(&key, Column::Metadata, value.clone()).unwrap();
    }

    // When
    let prefix: Vec<u8> = vec![2];
    let keys = db
        .iter_store(
            Column::Metadata,
            Some(prefix.as_slice()),
            None,
            IterDirection::Reverse,
        )
        .map(|entry| entry.map(|(key, _value)| key))
        .collect::<Vec<_>>();

    // Then
    let expected = vec![Ok(second_match.to_vec()), Ok(first_match.to_vec())];
    assert_eq!(keys, expected);
}
1617
/// Reverse iteration over a prefix whose keys are the last ones stored
/// returns only the matching keys, highest first.
#[test]
fn iter_store__reverse_iterator__target_prefix_at_the_end() {
    // Given
    let (mut db, _tmp) = create_db();
    let value = Value::from([]);
    let below_prefix = [1, 1];
    let first_match = [2, 2];
    let second_match = [2, 3];
    for key in [below_prefix, first_match, second_match] {
        db.put(&key, Column::Metadata, value.clone()).unwrap();
    }

    // When
    let prefix: Vec<u8> = vec![2];
    let keys = db
        .iter_store(
            Column::Metadata,
            Some(prefix.as_slice()),
            None,
            IterDirection::Reverse,
        )
        .map(|entry| entry.map(|(key, _value)| key))
        .collect::<Vec<_>>();

    // Then
    let expected = vec![Ok(second_match.to_vec()), Ok(first_match.to_vec())];
    assert_eq!(keys, expected);
}
1644
/// Reverse iteration over the prefix `[255]`, the largest single-byte prefix,
/// returns the matching keys, highest first.
#[test]
fn iter_store__reverse_iterator__target_prefix_at_the_end__overflow() {
    // Given
    let (mut db, _tmp) = create_db();
    let value = Value::from([]);
    let below_prefix = [1, 1];
    let first_match = [255, 254];
    let second_match = [255, 255];
    for key in [below_prefix, first_match, second_match] {
        db.put(&key, Column::Metadata, value.clone()).unwrap();
    }

    // When
    let prefix: Vec<u8> = vec![255];
    let keys = db
        .iter_store(
            Column::Metadata,
            Some(prefix.as_slice()),
            None,
            IterDirection::Reverse,
        )
        .map(|entry| entry.map(|(key, _value)| key))
        .collect::<Vec<_>>();

    // Then
    let expected = vec![Ok(second_match.to_vec()), Ok(first_match.to_vec())];
    assert_eq!(keys, expected);
}
1671
/// After `clear_table`, the column can still be iterated and yields no keys.
#[test]
fn clear_table__keeps_column_accessible_after_marking_for_deletion() {
    // Given
    let (mut db, _tmp) = create_db();
    let value = Value::from([]);
    for key in [[1, 1], [255, 254], [255, 255]] {
        db.put(&key, Column::Metadata, value.clone()).unwrap();
    }

    // When
    db.clear_table(Column::Metadata).unwrap();

    // Then
    let remaining_keys = db
        .iter_store_keys(Column::Metadata, None, None, IterDirection::Forward)
        .collect::<Vec<_>>();
    assert_eq!(remaining_keys, vec![]);
}
1693}