1use crate::{
2 database::{
3 Error as DatabaseError,
4 Result as DatabaseResult,
5 database_description::{
6 DatabaseDescription,
7 DatabaseHeight,
8 },
9 },
10 state::{
11 ColumnType,
12 HeightType,
13 IterableKeyValueView,
14 KeyValueView,
15 TransactableStorage,
16 historical_rocksdb::{
17 description::{
18 Column,
19 Historical,
20 historical_duplicate_column_id,
21 },
22 view_at_height::ViewAtHeight,
23 },
24 iterable_key_value_view::IterableKeyValueViewWrapper,
25 key_value_view::KeyValueViewWrapper,
26 rocks_db::RocksDb,
27 },
28};
29use fuel_core_storage::{
30 Error as StorageError,
31 Result as StorageResult,
32 StorageAsMut,
33 StorageAsRef,
34 StorageReadError,
35 iter::{
36 BoxedIter,
37 IterDirection,
38 IterableStore,
39 IteratorOverTable,
40 },
41 kv_store::{
42 KVItem,
43 KeyValueInspect,
44 Value,
45 WriteOperation,
46 },
47 not_found,
48 structured_storage::TableWithBlueprint,
49 transactional::{
50 Changes,
51 ConflictPolicy,
52 ReadTransaction,
53 StorageChanges,
54 StorageTransaction,
55 },
56};
57use itertools::Itertools;
58use modifications_history::{
59 ModificationsHistoryV1,
60 ModificationsHistoryV2,
61};
62use serde::{
63 Deserialize,
64 Serialize,
65};
66use std::{
67 num::NonZeroU64,
68 path::Path,
69};
70
71use super::rocks_db::DatabaseConfig;
72
73pub mod description;
74pub mod modifications_history;
75pub mod view_at_height;
76
/// Controls whether `HistoricalRocksDB` records per-height modification
/// history on commit, and when previously recorded history is pruned.
#[derive(Copy, Clone, Default, Debug, Eq, PartialEq)]
pub enum StateRewindPolicy {
    /// No modification history is recorded when changes are committed.
    #[default]
    NoRewind,
    /// Modification history is recorded for every committed height and the
    /// commit path never prunes it.
    RewindFullRange,
    /// Modification history is recorded on every commit; when height `h` is
    /// committed, the history stored for height `h - size` (saturating at
    /// zero) is removed.
    RewindRange { size: NonZeroU64 },
}
89
/// Wrapper around a `RocksDb` opened with the `Historical` description that,
/// depending on the configured [`StateRewindPolicy`], stores the reverse
/// changes of each committed height so the state can be viewed at, or rolled
/// back to, an earlier height.
#[derive(Debug)]
pub struct HistoricalRocksDB<Description> {
    /// Decides whether history is recorded and when old entries are pruned.
    state_rewind_policy: StateRewindPolicy,
    /// The underlying database holding both the original columns and the
    /// history tables.
    db: RocksDb<Historical<Description>>,
    /// Set at construction when the `ModificationsHistoryV1` table yields at
    /// least one entry; cleared once that table has been wiped by
    /// `remove_v1_entries`.
    has_v1_history: core::sync::atomic::AtomicBool,
}
100
101impl<Description> HistoricalRocksDB<Description>
102where
103 Description: DatabaseDescription,
104{
105 pub fn new(
106 db: RocksDb<Historical<Description>>,
107 state_rewind_policy: StateRewindPolicy,
108 ) -> DatabaseResult<Self> {
109 let has_v1_history = db
110 .iter_all::<ModificationsHistoryV1<Description>>(None)
111 .next()
112 .is_some();
113
114 Ok(Self {
115 state_rewind_policy,
116 db,
117 has_v1_history: core::sync::atomic::AtomicBool::new(has_v1_history),
118 })
119 }
120
121 pub fn default_open<P: AsRef<Path>>(
122 path: P,
123 state_rewind_policy: StateRewindPolicy,
124 database_config: DatabaseConfig,
125 ) -> DatabaseResult<Self> {
126 let db = RocksDb::<Historical<Description>>::default_open(path, database_config)?;
127 let has_v1_history = db
128 .iter_all::<ModificationsHistoryV1<Description>>(None)
129 .next()
130 .is_some();
131 Ok(Self {
132 state_rewind_policy,
133 db,
134 has_v1_history: core::sync::atomic::AtomicBool::new(has_v1_history),
135 })
136 }
137
138 fn reverse_history_changes(&self, changes: &Changes) -> StorageResult<Changes> {
139 let mut reverse_changes = Changes::default();
140
141 for (column, column_changes) in changes {
142 let results = self.db.multi_get(*column, column_changes.keys())?;
143
144 let entry = reverse_changes
145 .entry(*column)
146 .or_insert_with(Default::default);
147
148 for (was, (key, became)) in results.into_iter().zip(column_changes.iter()) {
149 match (was, became) {
150 (None, WriteOperation::Remove) => {
151 }
153 (None, WriteOperation::Insert(_)) => {
154 entry.insert(key.clone(), WriteOperation::Remove);
155 }
156 (Some(old_value), WriteOperation::Remove) => {
157 entry.insert(
158 key.clone(),
159 WriteOperation::Insert(old_value.into()),
160 );
161 }
162 (Some(old_value), WriteOperation::Insert(new_value)) => {
163 if *old_value != **new_value {
164 entry.insert(
165 key.clone(),
166 WriteOperation::Insert(old_value.into()),
167 );
168 }
169 }
170 }
171 }
172 }
173 Ok(reverse_changes)
174 }
175
176 pub fn latest_view(&self) -> RocksDb<Description> {
177 self.db.create_snapshot_generic()
178 }
179
180 pub fn create_view_at(
184 &self,
185 height: &Description::Height,
186 ) -> StorageResult<ViewAtHeight<Description>> {
187 let height_for_the_state = height.as_u64();
192 let rollback_height = height_for_the_state.saturating_add(1);
193 let has_v1_history = self.has_v1_history();
194 let tx = self.db.read_transaction();
195 let mut contains = multiversion_contains(&tx, rollback_height, has_v1_history)?;
196
197 if !contains {
203 contains = multiversion_contains(&tx, height_for_the_state, has_v1_history)?;
204 }
205
206 if !contains {
207 return Err(DatabaseError::NoHistoryForRequestedHeight {
208 requested_height: height_for_the_state,
209 }
210 .into());
211 }
212 let latest_view = self.db.create_snapshot_generic::<Historical<Description>>();
213
214 Ok(ViewAtHeight::new(rollback_height, latest_view))
215 }
216
    /// Extends `storage_transaction` with the history needed to undo the
    /// changes it currently holds, keyed by `height`.
    ///
    /// Does nothing under `StateRewindPolicy::NoRewind`. Otherwise it
    /// computes the reverse changes, prunes expired history according to the
    /// policy, stores the reverse changes in `ModificationsHistoryV2`, and
    /// writes one serialized reverse operation per `(key, height)` into the
    /// historical duplicate of each affected column.
    fn store_modifications_history<T>(
        &self,
        storage_transaction: &mut StorageTransaction<T>,
        height: &Description::Height,
    ) -> StorageResult<()>
    where
        T: KeyValueInspect<Column = Column<Description>>,
    {
        if self.state_rewind_policy == StateRewindPolicy::NoRewind {
            return Ok(());
        }
        let height_u64 = height.as_u64();

        // Computed from the values currently stored in `self.db`, before the
        // transaction's changes are committed.
        let reverse_changes =
            self.reverse_history_changes(storage_transaction.changes())?;

        cleanup_old_changes(
            self,
            &height_u64,
            storage_transaction,
            &self.state_rewind_policy,
        )?;

        let old_changes = storage_transaction
            .storage_as_mut::<ModificationsHistoryV2<Description>>()
            .replace(&height_u64, &reverse_changes)?;

        // History already existed for this height: drop the per-key entries
        // derived from it before writing the new ones.
        if let Some(old_changes) = old_changes {
            tracing::warn!(
                "Historical database committed twice the same height: {:?}",
                height
            );
            remove_historical_modifications(
                &height_u64,
                storage_transaction,
                &old_changes,
            )?;
        }

        // Re-key every reverse operation by `key ++ height` and target the
        // historical duplicate of the original column.
        let historical_changes = reverse_changes
            .into_iter()
            .map(|(column, reverse_column_changes)| {
                let historical_column_changes = reverse_column_changes
                    .into_iter()
                    .map(|(key, reverse_operation)| {
                        let height_key = height_key(&key, &height_u64).into();
                        // The stored value is the serialized reverse operation.
                        let operation =
                            WriteOperation::Insert(serialize(&reverse_operation)?);
                        Ok::<_, StorageError>((height_key, operation))
                    })
                    .try_collect()?;

                let historical_duplicate_column = historical_duplicate_column_id(column);
                Ok::<_, StorageError>((
                    historical_duplicate_column,
                    historical_column_changes,
                ))
            })
            .try_collect()?;

        // Fold the historical writes into the caller's transaction.
        StorageTransaction::transaction(
            storage_transaction,
            ConflictPolicy::Overwrite,
            historical_changes,
        )
        .commit()?;
        Ok(())
    }
289
290 fn remove_v1_entries(&self) -> StorageResult<()> {
291 if !self.has_v1_history() {
292 return Ok(())
293 }
294
295 self.db
296 .clear_table(ModificationsHistoryV1::<Description>::column())?;
297 self.has_v1_history
298 .store(false, core::sync::atomic::Ordering::Release);
299
300 Ok(())
301 }
302
303 #[cfg(test)]
304 fn multiversion_changes_heights(
305 &self,
306 direction: IterDirection,
307 has_v1_history: bool,
308 ) -> (Option<StorageResult<u64>>, Option<StorageResult<u64>>) {
309 let v2_changes = self
310 .db
311 .iter_all_keys::<ModificationsHistoryV2<Description>>(Some(direction))
312 .next();
313 let v1_changes = has_v1_history
314 .then(|| {
315 self.db
316 .iter_all_keys::<ModificationsHistoryV1<Description>>(Some(direction))
317 .next()
318 })
319 .flatten();
320
321 (v2_changes, v1_changes)
322 }
323
324 #[cfg(test)]
325 fn rollback_last_block(&self) -> StorageResult<u64> {
326 let has_v1_history = self.has_v1_history();
327
328 let (v2_latest_height, v1_latest_height) =
329 self.multiversion_changes_heights(IterDirection::Reverse, has_v1_history);
330
331 let latest_height = match (v2_latest_height, v1_latest_height) {
332 (None, None) => Err(DatabaseError::ReachedEndOfHistory)?,
333 (Some(Ok(v1)), Some(Ok(v2))) => v1.max(v2),
334 (_, Some(v1_res)) => v1_res?,
335 (Some(v2_res), _) => v2_res?,
336 };
337
338 self.rollback_block_to(latest_height)?;
339
340 Ok(latest_height)
341 }
342
343 fn rollback_block_to(&self, height_to_rollback: u64) -> StorageResult<()> {
344 let mut storage_transaction = self.db.read_transaction();
345
346 let last_changes = multiversion_take(
347 &mut storage_transaction,
348 height_to_rollback,
349 self.has_v1_history(),
350 )?
351 .ok_or(not_found!(ModificationsHistoryV2<Description>))?;
352
353 remove_historical_modifications(
354 &height_to_rollback,
355 &mut storage_transaction,
356 &last_changes,
357 )?;
358
359 StorageTransaction::transaction(
360 &mut storage_transaction,
361 ConflictPolicy::Overwrite,
362 last_changes,
363 )
364 .commit()?;
365
366 self.db
367 .commit_changes(&storage_transaction.into_changes().into())?;
368
369 Ok(())
370 }
371
372 fn has_v1_history(&self) -> bool {
373 use core::sync::atomic::Ordering;
374
375 self.has_v1_history.load(Ordering::Acquire)
376 }
377}
378
379fn multiversion_contains<Description, T>(
380 storage_transaction: &StorageTransaction<T>,
381 height: u64,
382 has_v1_history: bool,
383) -> StorageResult<bool>
384where
385 Description: DatabaseDescription,
386 T: KeyValueInspect<Column = Column<Description>>,
387{
388 let contains_v2 = storage_transaction
389 .storage_as_ref::<ModificationsHistoryV2<Description>>()
390 .contains_key(&height)?;
391
392 if !contains_v2 && has_v1_history {
393 let contains_v1 = storage_transaction
394 .storage_as_ref::<ModificationsHistoryV1<Description>>()
395 .contains_key(&height)?;
396 Ok(contains_v1)
397 } else {
398 Ok(contains_v2)
399 }
400}
401
402fn multiversion_take<Description, T>(
405 storage_transaction: &mut StorageTransaction<T>,
406 height: u64,
407 has_v1_history: bool,
408) -> StorageResult<Option<Changes>>
409where
410 Description: DatabaseDescription,
411 T: KeyValueInspect<Column = Column<Description>>,
412{
413 let v2_last_changes = storage_transaction
414 .storage_as_mut::<ModificationsHistoryV2<Description>>()
415 .take(&height)?;
416
417 if v2_last_changes.is_none() && has_v1_history {
418 let v1_last_changes = storage_transaction
419 .storage_as_mut::<ModificationsHistoryV1<Description>>()
420 .take(&height)?;
421 Ok(v1_last_changes)
422 } else {
423 Ok(v2_last_changes)
424 }
425}
426
427fn cleanup_old_changes<Description, T>(
428 db: &HistoricalRocksDB<Description>,
429 height: &u64,
430 storage_transaction: &mut StorageTransaction<T>,
431 state_rewind_policy: &StateRewindPolicy,
432) -> StorageResult<()>
433where
434 Description: DatabaseDescription,
435 T: KeyValueInspect<Column = Column<Description>>,
436{
437 match state_rewind_policy {
438 StateRewindPolicy::NoRewind => {
439 }
441 StateRewindPolicy::RewindFullRange => {
442 }
444 StateRewindPolicy::RewindRange { size } => {
445 let old_height = height.saturating_sub(size.get());
446
447 let old_changes = storage_transaction
448 .storage_as_mut::<ModificationsHistoryV2<Description>>()
449 .take(&old_height)?;
450
451 if let Some(old_changes) = old_changes {
452 remove_historical_modifications(
453 &old_height,
454 storage_transaction,
455 &old_changes,
456 )?;
457 db.remove_v1_entries()?;
459 }
460 }
461 }
462 Ok(())
463}
464
465fn remove_historical_modifications<Description, T>(
466 old_height: &u64,
467 storage_transaction: &mut StorageTransaction<T>,
468 reverse_changes: &Changes,
469) -> StorageResult<()>
470where
471 Description: DatabaseDescription,
472 T: KeyValueInspect<Column = Column<Description>>,
473{
474 let changes = reverse_changes
475 .iter()
476 .map(|(column, column_changes)| {
477 let historical_column_changes = column_changes
478 .keys()
479 .map(|key| {
480 let height_key = height_key(key, old_height).into();
481 let operation = WriteOperation::Remove;
482 (height_key, operation)
483 })
484 .collect();
485 let historical_duplicate_column = historical_duplicate_column_id(*column);
486 (historical_duplicate_column, historical_column_changes)
487 })
488 .collect();
489
490 StorageTransaction::transaction(
491 storage_transaction,
492 ConflictPolicy::Overwrite,
493 changes,
494 )
495 .commit()?;
496
497 Ok(())
498}
499
500impl<Description> KeyValueInspect for HistoricalRocksDB<Description>
501where
502 Description: DatabaseDescription,
503{
504 type Column = Description::Column;
505
506 fn exists(&self, key: &[u8], column: Self::Column) -> StorageResult<bool> {
507 self.db.exists(key, Column::OriginalColumn(column))
508 }
509
510 fn size_of_value(
511 &self,
512 key: &[u8],
513 column: Self::Column,
514 ) -> StorageResult<Option<usize>> {
515 self.db.size_of_value(key, Column::OriginalColumn(column))
516 }
517
518 fn get(&self, key: &[u8], column: Self::Column) -> StorageResult<Option<Value>> {
519 self.db.get(key, Column::OriginalColumn(column))
520 }
521
522 fn read_exact(
523 &self,
524 key: &[u8],
525 column: Self::Column,
526 offset: usize,
527 buf: &mut [u8],
528 ) -> StorageResult<Result<usize, StorageReadError>> {
529 self.db
530 .read_exact(key, Column::OriginalColumn(column), offset, buf)
531 }
532
533 fn read_zerofill(
534 &self,
535 key: &[u8],
536 column: Self::Column,
537 offset: usize,
538 buf: &mut [u8],
539 ) -> StorageResult<Result<usize, StorageReadError>> {
540 self.db
541 .read_zerofill(key, Column::OriginalColumn(column), offset, buf)
542 }
543}
544
545impl<Description> IterableStore for HistoricalRocksDB<Description>
546where
547 Description: DatabaseDescription,
548{
549 fn iter_store(
550 &self,
551 column: Self::Column,
552 prefix: Option<&[u8]>,
553 start: Option<&[u8]>,
554 direction: IterDirection,
555 ) -> BoxedIter<'_, KVItem> {
556 self.db
557 .iter_store(Column::OriginalColumn(column), prefix, start, direction)
558 }
559
560 fn iter_store_keys(
561 &self,
562 column: Self::Column,
563 prefix: Option<&[u8]>,
564 start: Option<&[u8]>,
565 direction: IterDirection,
566 ) -> BoxedIter<'_, fuel_core_storage::kv_store::KeyItem> {
567 self.db
568 .iter_store_keys(Column::OriginalColumn(column), prefix, start, direction)
569 }
570}
571
572impl<Description> TransactableStorage<Description::Height>
573 for HistoricalRocksDB<Description>
574where
575 Description: DatabaseDescription,
576{
577 fn commit_changes(
578 &self,
579 height: Option<Description::Height>,
580 mut changes: StorageChanges,
581 ) -> StorageResult<()> {
582 if let Some(height) = height
585 && self.state_rewind_policy != StateRewindPolicy::NoRewind
586 {
587 let all_changes = match changes {
588 StorageChanges::Changes(changes) => changes,
589 StorageChanges::ChangesList(list) => list.into_iter().flatten().collect(),
590 };
591 let mut storage_transaction = StorageTransaction::transaction(
592 &self.db,
593 ConflictPolicy::Overwrite,
594 all_changes,
595 );
596 self.store_modifications_history(&mut storage_transaction, &height)?;
597 changes = StorageChanges::Changes(storage_transaction.into_changes());
598 }
599
600 self.db.commit_changes(&changes)?;
601
602 Ok(())
603 }
604
605 fn view_at_height(
606 &self,
607 height: &Description::Height,
608 ) -> StorageResult<KeyValueView<ColumnType<Description>, HeightType<Description>>>
609 {
610 let view = self.create_view_at(height)?;
611 Ok(KeyValueView::from_storage_and_metadata(
612 KeyValueViewWrapper::new(view),
613 Some(*height),
614 ))
615 }
616
617 fn latest_view(
618 &self,
619 ) -> StorageResult<
620 IterableKeyValueView<ColumnType<Description>, HeightType<Description>>,
621 > {
622 let view = self.latest_view();
623 Ok(IterableKeyValueView::from_storage_and_metadata(
624 IterableKeyValueViewWrapper::new(view),
625 None,
626 ))
627 }
628
629 fn rollback_block_to(&self, height: &Description::Height) -> StorageResult<()> {
630 self.rollback_block_to(height.as_u64())
631 }
632
633 fn shutdown(&self) {
634 self.db.shutdown()
635 }
636}
637
/// Builds the key used in historical columns: the original `key` followed by
/// the eight big-endian bytes of `height`.
pub fn height_key(key: &[u8], height: &u64) -> Vec<u8> {
    [key, height.to_be_bytes().as_slice()].concat()
}
645
646pub fn serialize<T>(t: &T) -> StorageResult<Value>
647where
648 T: Serialize + ?Sized,
649{
650 Ok(postcard::to_allocvec(&t)
651 .map_err(|err| StorageError::Codec(err.into()))?
652 .into())
653}
654
655pub fn deserialize<'a, T>(bytes: &'a [u8]) -> StorageResult<T>
656where
657 T: Deserialize<'a>,
658{
659 postcard::from_bytes(bytes).map_err(|err| StorageError::Codec(err.into()))
660}
661
662#[cfg(test)]
663#[allow(non_snake_case)]
664#[allow(clippy::cast_possible_truncation)]
665mod tests {
666 use super::*;
667 use crate::database::database_description::on_chain::OnChain;
668 use fuel_core_storage::{
669 ContractsAssetKey,
670 StorageAsMut,
671 StorageAsRef,
672 tables::ContractsAssets,
673 transactional::{
674 IntoTransaction,
675 ReadTransaction,
676 },
677 };
678
679 #[test]
680 fn test_height_key() {
681 let key = b"key";
682 let height = 42;
683 let expected = b"key\x00\x00\x00\x00\x00\x00\x00\x2a";
684 assert_eq!(height_key(key, &height), expected);
685 }
686
    /// Fixed asset key shared by the tests that write a single entry.
    fn key() -> ContractsAssetKey {
        ContractsAssetKey::new(&[123; 32].into(), &[213; 32].into())
    }
690
    /// Reading through a transaction on the historical database returns the
    /// most recently committed value.
    #[test]
    fn historical_rocksdb_read_original_database_works() {
        // Given
        let rocks_db = RocksDb::<Historical<OnChain>>::default_open_temp().unwrap();
        let historical_rocks_db =
            HistoricalRocksDB::new(rocks_db, StateRewindPolicy::RewindFullRange).unwrap();

        // Commit the value 123 at height 1.
        let mut transaction = historical_rocks_db.read_transaction();
        transaction
            .storage_as_mut::<ContractsAssets>()
            .insert(&key(), &123)
            .unwrap();
        historical_rocks_db
            .commit_changes(Some(1u32.into()), transaction.into_changes().into())
            .unwrap();

        // Overwrite it with 321 at height 2.
        let mut transaction = historical_rocks_db.read_transaction();
        transaction
            .storage_as_mut::<ContractsAssets>()
            .insert(&key(), &321)
            .unwrap();
        historical_rocks_db
            .commit_changes(Some(2u32.into()), transaction.into_changes().into())
            .unwrap();

        // When
        let read_view = historical_rocks_db.read_transaction();
        let latest_balance = read_view
            .storage_as_ref::<ContractsAssets>()
            .get(&key())
            .unwrap()
            .unwrap()
            .into_owned();

        // Then
        assert_eq!(latest_balance, 321);
    }
730
    /// The snapshot returned by `latest_view` exposes the most recently
    /// committed value.
    #[test]
    fn historical_rocksdb_read_latest_view_works() {
        // Given
        let rocks_db = RocksDb::<Historical<OnChain>>::default_open_temp().unwrap();
        let historical_rocks_db =
            HistoricalRocksDB::new(rocks_db, StateRewindPolicy::RewindFullRange).unwrap();

        // Commit the value 123 at height 1.
        let mut transaction = historical_rocks_db.read_transaction();
        transaction
            .storage_as_mut::<ContractsAssets>()
            .insert(&key(), &123)
            .unwrap();
        historical_rocks_db
            .commit_changes(Some(1u32.into()), transaction.into_changes().into())
            .unwrap();

        // Overwrite it with 321 at height 2.
        let mut transaction = historical_rocks_db.read_transaction();
        transaction
            .storage_as_mut::<ContractsAssets>()
            .insert(&key(), &321)
            .unwrap();
        historical_rocks_db
            .commit_changes(Some(2u32.into()), transaction.into_changes().into())
            .unwrap();

        // When
        let latest_view = historical_rocks_db.latest_view().into_transaction();
        let latest_balance = latest_view
            .storage_as_ref::<ContractsAssets>()
            .get(&key())
            .unwrap()
            .unwrap()
            .into_owned();

        // Then
        assert_eq!(latest_balance, 321);
    }
770
    /// With `NoRewind` no history is stored, so a view at a committed height
    /// cannot be created.
    #[test]
    fn state_rewind_policy__no_rewind__create_view_at__fails() {
        // Given
        let rocks_db = RocksDb::<Historical<OnChain>>::default_open_temp().unwrap();
        let historical_rocks_db =
            HistoricalRocksDB::new(rocks_db, StateRewindPolicy::NoRewind).unwrap();

        let mut transaction = historical_rocks_db.read_transaction();
        transaction
            .storage_as_mut::<ContractsAssets>()
            .insert(&key(), &123)
            .unwrap();
        historical_rocks_db
            .commit_changes(Some(1u32.into()), transaction.into_changes().into())
            .unwrap();

        // When
        let view_at_height_1 =
            historical_rocks_db.create_view_at(&1u32.into()).map(|_| ());

        // Then
        assert_eq!(
            view_at_height_1,
            Err(DatabaseError::NoHistoryForRequestedHeight {
                requested_height: 1,
            }
            .into())
        );
    }
800
    /// With `NoRewind` no history is stored, so there is nothing to roll back.
    #[test]
    fn state_rewind_policy__no_rewind__rollback__fails() {
        // Given
        let rocks_db = RocksDb::<Historical<OnChain>>::default_open_temp().unwrap();
        let historical_rocks_db =
            HistoricalRocksDB::new(rocks_db, StateRewindPolicy::NoRewind).unwrap();

        let mut transaction = historical_rocks_db.read_transaction();
        transaction
            .storage_as_mut::<ContractsAssets>()
            .insert(&key(), &123)
            .unwrap();
        historical_rocks_db
            .commit_changes(Some(1u32.into()), transaction.into_changes().into())
            .unwrap();

        // When
        let result = historical_rocks_db.rollback_last_block();

        // Then
        assert_eq!(result, Err(DatabaseError::ReachedEndOfHistory.into()));
    }
823
    /// With a rewind range of 1, committing height 2 prunes the history of
    /// height 1: a view at height 1 is still available, a view at height 0 is
    /// not.
    #[test]
    fn state_rewind_policy__rewind_range_1__cleanup_in_range_works() {
        // Given
        let rocks_db = RocksDb::<Historical<OnChain>>::default_open_temp().unwrap();
        let historical_rocks_db = HistoricalRocksDB::new(
            rocks_db,
            StateRewindPolicy::RewindRange {
                size: NonZeroU64::new(1).unwrap(),
            },
        )
        .unwrap();

        let mut transaction = historical_rocks_db.read_transaction();
        transaction
            .storage_as_mut::<ContractsAssets>()
            .insert(&key(), &123)
            .unwrap();
        historical_rocks_db
            .commit_changes(Some(1u32.into()), transaction.into_changes().into())
            .unwrap();

        // When
        let mut transaction = historical_rocks_db.read_transaction();
        transaction
            .storage_as_mut::<ContractsAssets>()
            .insert(&key(), &321)
            .unwrap();
        historical_rocks_db
            .commit_changes(Some(2u32.into()), transaction.into_changes().into())
            .unwrap();

        // Then
        let view_at_height_1 =
            historical_rocks_db.create_view_at(&1u32.into()).map(|_| ());
        let view_at_height_0 =
            historical_rocks_db.create_view_at(&0u32.into()).map(|_| ());
        assert_eq!(view_at_height_1, Ok(()));
        assert_eq!(
            view_at_height_0,
            Err(DatabaseError::NoHistoryForRequestedHeight {
                requested_height: 0,
            }
            .into())
        );
    }
869
    /// Rolling back the only committed height succeeds and removes its V2
    /// history entry.
    #[test]
    fn state_rewind_policy__rewind_range_1__rollback_works() {
        // Given
        let rocks_db = RocksDb::<Historical<OnChain>>::default_open_temp().unwrap();
        let historical_rocks_db = HistoricalRocksDB::new(
            rocks_db,
            StateRewindPolicy::RewindRange {
                size: NonZeroU64::new(1).unwrap(),
            },
        )
        .unwrap();

        let mut transaction = historical_rocks_db.read_transaction();
        transaction
            .storage_as_mut::<ContractsAssets>()
            .insert(&key(), &123)
            .unwrap();
        historical_rocks_db
            .commit_changes(Some(1u32.into()), transaction.into_changes().into())
            .unwrap();
        let entries = historical_rocks_db
            .db
            .iter_all::<ModificationsHistoryV2<OnChain>>(None)
            .collect::<Vec<_>>();
        assert_eq!(entries.len(), 1);

        // When
        let result = historical_rocks_db.rollback_last_block();

        // Then
        assert_eq!(result, Ok(1));
        let entries = historical_rocks_db
            .db
            .iter_all::<ModificationsHistoryV2<OnChain>>(None)
            .collect::<Vec<_>>();
        assert_eq!(entries.len(), 0);
    }
907
    /// A commit writes its history to the V2 table only; the V1 table stays
    /// empty.
    #[test]
    fn state_rewind_policy__rewind_range_1__rollback_uses_v2() {
        // Given
        let rocks_db = RocksDb::<Historical<OnChain>>::default_open_temp().unwrap();
        let historical_rocks_db = HistoricalRocksDB::new(
            rocks_db,
            StateRewindPolicy::RewindRange {
                size: NonZeroU64::new(1).unwrap(),
            },
        )
        .unwrap();

        // When
        let mut transaction = historical_rocks_db.read_transaction();
        transaction
            .storage_as_mut::<ContractsAssets>()
            .insert(&key(), &123)
            .unwrap();
        historical_rocks_db
            .commit_changes(Some(1u32.into()), transaction.into_changes().into())
            .unwrap();
        let v2_entries = historical_rocks_db
            .db
            .iter_all::<ModificationsHistoryV2<OnChain>>(None)
            .collect::<Vec<_>>();
        let v1_entries = historical_rocks_db
            .db
            .iter_all::<ModificationsHistoryV1<OnChain>>(None)
            .collect::<Vec<_>>();

        // Then
        assert_eq!(v2_entries.len(), 1);
        assert_eq!(v1_entries.len(), 0);
    }
942
    /// A history entry that only exists in the V1 table can still be rolled
    /// back after the database is reopened.
    #[test]
    fn state_rewind_policy__rewind_range_1__rollback_during_migration_works() {
        // Given
        let temp_dir = tempfile::tempdir().unwrap();
        let rocks_db = RocksDb::<Historical<OnChain>>::default_open(
            &temp_dir,
            DatabaseConfig::config_for_tests(),
        )
        .unwrap();
        let historical_rocks_db = HistoricalRocksDB::new(
            rocks_db,
            StateRewindPolicy::RewindRange {
                size: NonZeroU64::new(1).unwrap(),
            },
        )
        .unwrap();

        // Commit one block; its history lands in V2.
        let mut transaction = historical_rocks_db.read_transaction();
        transaction
            .storage_as_mut::<ContractsAssets>()
            .insert(&key(), &123)
            .unwrap();
        historical_rocks_db
            .commit_changes(Some(1u32.into()), transaction.into_changes().into())
            .unwrap();

        // Simulate a not-yet-migrated database by moving the entry from the
        // V2 table to the V1 table, writing directly to the inner database.
        let mut migration_transaction = StorageTransaction::transaction(
            &historical_rocks_db.db,
            ConflictPolicy::Overwrite,
            Changes::default(),
        );

        let v2_changes = migration_transaction
            .storage_as_mut::<ModificationsHistoryV2<OnChain>>()
            .take(&1u64)
            .unwrap()
            .unwrap();
        migration_transaction
            .storage_as_mut::<ModificationsHistoryV1<OnChain>>()
            .insert(&1u64, &v2_changes)
            .unwrap();

        historical_rocks_db
            .db
            .commit_changes(&migration_transaction.into_changes().into())
            .unwrap();

        // Check that the history has indeed been moved to V1.
        let v2_entries = historical_rocks_db
            .db
            .iter_all::<ModificationsHistoryV2<OnChain>>(None)
            .collect::<Vec<_>>();
        let v1_entries = historical_rocks_db
            .db
            .iter_all::<ModificationsHistoryV1<OnChain>>(None)
            .collect::<Vec<_>>();

        assert_eq!(v2_entries.len(), 0);
        assert_eq!(v1_entries.len(), 1);

        drop(historical_rocks_db);

        // When
        // Reopen so the constructor re-probes the V1 table.
        let rocks_db = RocksDb::<Historical<OnChain>>::default_open(
            &temp_dir,
            DatabaseConfig::config_for_tests(),
        )
        .unwrap();
        let historical_rocks_db = HistoricalRocksDB::new(
            rocks_db,
            StateRewindPolicy::RewindRange {
                size: NonZeroU64::new(1).unwrap(),
            },
        )
        .unwrap();
        let result = historical_rocks_db.rollback_last_block();

        // Then
        assert_eq!(result, Ok(1));
        let v2_entries = historical_rocks_db
            .db
            .iter_all::<ModificationsHistoryV2<OnChain>>(None)
            .collect::<Vec<_>>();
        let v1_entries = historical_rocks_db
            .db
            .iter_all::<ModificationsHistoryV1<OnChain>>(None)
            .collect::<Vec<_>>();
        assert_eq!(v2_entries.len(), 0);
        assert_eq!(v1_entries.len(), 0);
    }
1036
    /// Once a commit under `RewindRange` prunes a V2 entry, the whole V1
    /// table is cleared as well.
    #[test]
    fn state_rewind_policy__rewind_range_1__migration_removes_v1() {
        // Given
        let temp_dir = tempfile::tempdir().unwrap();
        let rocks_db = RocksDb::<Historical<OnChain>>::default_open(
            &temp_dir,
            DatabaseConfig::config_for_tests(),
        )
        .unwrap();
        let historical_rocks_db =
            HistoricalRocksDB::new(rocks_db, StateRewindPolicy::RewindFullRange).unwrap();

        const BLOCKS: u8 = 10;

        // Commit `BLOCKS` heights and move each history entry from V2 to V1
        // right after it is written, so only V1 history remains.
        for i in 1..=BLOCKS {
            let height = i as u32;
            let height_64 = i as u64;
            let mut transaction = historical_rocks_db.read_transaction();
            let key = ContractsAssetKey::new(&[i; 32].into(), &[213; 32].into());
            transaction
                .storage_as_mut::<ContractsAssets>()
                .insert(&key, &123)
                .unwrap();
            historical_rocks_db
                .commit_changes(Some(height.into()), transaction.into_changes().into())
                .unwrap();

            // Write directly to the inner database to bypass history handling.
            let mut migration_transaction = StorageTransaction::transaction(
                &historical_rocks_db.db,
                ConflictPolicy::Overwrite,
                Changes::default(),
            );

            let v2_changes = migration_transaction
                .storage_as_mut::<ModificationsHistoryV2<OnChain>>()
                .take(&height_64)
                .unwrap()
                .unwrap();
            migration_transaction
                .storage_as_mut::<ModificationsHistoryV1<OnChain>>()
                .insert(&height_64, &v2_changes)
                .unwrap();

            historical_rocks_db
                .db
                .commit_changes(&migration_transaction.into_changes().into())
                .unwrap();

            // Check that the history has indeed been moved to V1.
            let v2_entries = historical_rocks_db
                .db
                .iter_all::<ModificationsHistoryV2<OnChain>>(None)
                .collect::<Vec<_>>();
            let v1_entries = historical_rocks_db
                .db
                .iter_all::<ModificationsHistoryV1<OnChain>>(None)
                .collect::<Vec<_>>();

            assert_eq!(v2_entries.len(), 0);
            assert_eq!(v1_entries.len(), i as usize);
        }

        drop(historical_rocks_db);

        // When
        // Reopen with a rewind range of 1 and commit two more heights. The
        // second commit prunes the V2 entry written by the first one, which
        // also triggers the removal of the V1 entries.
        let rocks_db = RocksDb::<Historical<OnChain>>::default_open(
            &temp_dir,
            DatabaseConfig::config_for_tests(),
        )
        .unwrap();
        let historical_rocks_db = HistoricalRocksDB::new(
            rocks_db,
            StateRewindPolicy::RewindRange {
                size: NonZeroU64::new(1).unwrap(),
            },
        )
        .unwrap();
        historical_rocks_db
            .commit_changes(Some((BLOCKS as u32 + 1).into()), StorageChanges::default())
            .unwrap();
        historical_rocks_db
            .commit_changes(Some((BLOCKS as u32 + 2).into()), StorageChanges::default())
            .unwrap();

        // Then
        let v2_entries = historical_rocks_db
            .db
            .iter_all::<ModificationsHistoryV2<OnChain>>(None)
            .collect::<Vec<_>>();
        let v1_entries = historical_rocks_db
            .db
            .iter_all::<ModificationsHistoryV1<OnChain>>(None)
            .collect::<Vec<_>>();
        assert_eq!(v2_entries.len(), 1);
        assert_eq!(v1_entries.len(), 0);
    }
1139
    /// After 1000 commits, 1000 consecutive rollbacks return the heights in
    /// descending order from 1000 down to 1.
    #[test]
    fn rollback_last_block_works_with_v2() {
        // Given
        let rocks_db = RocksDb::<Historical<OnChain>>::default_open_temp().unwrap();

        let historical_rocks_db =
            HistoricalRocksDB::new(rocks_db, StateRewindPolicy::RewindFullRange).unwrap();

        // When
        // Every height writes a different value so each commit has a
        // non-empty reverse change.
        for i in 1..=1000u32 {
            let mut transaction = historical_rocks_db.read_transaction();
            transaction
                .storage_as_mut::<ContractsAssets>()
                .insert(&key(), &(123 + i as u64))
                .unwrap();
            historical_rocks_db
                .commit_changes(Some(i.into()), transaction.into_changes().into())
                .unwrap();
        }
        let results: Vec<Result<u64, _>> = (0..1000u32)
            .map(|_| historical_rocks_db.rollback_last_block())
            .collect();

        // Then
        for (i, result) in results.iter().enumerate() {
            assert_eq!(result, &Ok(1000 - i as u64));
        }
    }
1172
    /// After the only stored height has been rolled back, a second rollback
    /// reports the end of the history.
    #[test]
    fn state_rewind_policy__rewind_range_1__second_rollback_fails() {
        // Given
        let rocks_db = RocksDb::<Historical<OnChain>>::default_open_temp().unwrap();
        let historical_rocks_db = HistoricalRocksDB::new(
            rocks_db,
            StateRewindPolicy::RewindRange {
                size: NonZeroU64::new(1).unwrap(),
            },
        )
        .unwrap();

        let mut transaction = historical_rocks_db.read_transaction();
        transaction
            .storage_as_mut::<ContractsAssets>()
            .insert(&key(), &123)
            .unwrap();
        historical_rocks_db
            .commit_changes(Some(1u32.into()), transaction.into_changes().into())
            .unwrap();
        historical_rocks_db.rollback_last_block().unwrap();

        // When
        let result = historical_rocks_db.rollback_last_block();

        // Then
        assert_eq!(result, Err(DatabaseError::ReachedEndOfHistory.into()));
    }
1201
    /// With a rewind range as large as the number of commits, every height
    /// can be rolled back one by one, and each rollback removes exactly one
    /// V2 history entry.
    ///
    /// NOTE(review): the test name says "range_10" but the range used is
    /// `ITERATIONS` (100) — confirm which one is intended.
    #[test]
    fn state_rewind_policy__rewind_range_10__rollbacks_work() {
        const ITERATIONS: usize = 100;

        // Given
        let rocks_db = RocksDb::<Historical<OnChain>>::default_open_temp().unwrap();
        let historical_rocks_db = HistoricalRocksDB::new(
            rocks_db,
            StateRewindPolicy::RewindRange {
                size: NonZeroU64::new(ITERATIONS as u64).unwrap(),
            },
        )
        .unwrap();

        // A distinct key per height, derived from the height's low byte.
        fn key(height: u32) -> ContractsAssetKey {
            ContractsAssetKey::new(&[height as u8; 32].into(), &[213; 32].into())
        }

        for height in 1..=ITERATIONS {
            let height = height as u32;
            let key = key(height);

            let mut transaction = historical_rocks_db.read_transaction();
            transaction
                .storage_as_mut::<ContractsAssets>()
                .insert(&key, &123)
                .unwrap();
            historical_rocks_db
                .commit_changes(Some(height.into()), transaction.into_changes().into())
                .unwrap();
        }

        // Roll back from the highest height down to 1.
        for height in (1..=ITERATIONS).rev() {
            // Before the rollback there is one entry per remaining height.
            let entries = historical_rocks_db
                .db
                .iter_all::<ModificationsHistoryV2<OnChain>>(None)
                .collect::<Vec<_>>();
            assert_eq!(entries.len(), height);

            // When
            let result = historical_rocks_db.rollback_last_block();

            // Then
            assert_eq!(result, Ok(height as u64));
            let entries = historical_rocks_db
                .db
                .iter_all::<ModificationsHistoryV2<OnChain>>(None)
                .collect::<Vec<_>>();
            assert_eq!(entries.len(), height - 1);
        }
    }
1253}