1use crate::{
2 database::{
3 Error as DatabaseError,
4 Result as DatabaseResult,
5 database_description::{
6 DatabaseDescription,
7 DatabaseHeight,
8 },
9 },
10 state::{
11 ColumnType,
12 HeightType,
13 IterableKeyValueView,
14 KeyValueView,
15 TransactableStorage,
16 historical_rocksdb::{
17 description::{
18 Column,
19 Historical,
20 historical_duplicate_column_id,
21 },
22 view_at_height::ViewAtHeight,
23 },
24 iterable_key_value_view::IterableKeyValueViewWrapper,
25 key_value_view::KeyValueViewWrapper,
26 rocks_db::RocksDb,
27 },
28};
29use fuel_core_storage::{
30 Error as StorageError,
31 Result as StorageResult,
32 StorageAsMut,
33 StorageAsRef,
34 iter::{
35 BoxedIter,
36 IterDirection,
37 IterableStore,
38 IteratorOverTable,
39 },
40 kv_store::{
41 KVItem,
42 KeyValueInspect,
43 Value,
44 WriteOperation,
45 },
46 not_found,
47 structured_storage::TableWithBlueprint,
48 transactional::{
49 Changes,
50 ConflictPolicy,
51 ReadTransaction,
52 StorageChanges,
53 StorageTransaction,
54 },
55};
56use itertools::Itertools;
57use modifications_history::{
58 ModificationsHistoryV1,
59 ModificationsHistoryV2,
60};
61use serde::{
62 Deserialize,
63 Serialize,
64};
65use std::{
66 num::NonZeroU64,
67 path::Path,
68};
69
70use super::rocks_db::DatabaseConfig;
71
72pub mod description;
73pub mod modifications_history;
74pub mod view_at_height;
75
/// Controls how much modification history the historical database keeps
/// for rewinding the state.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum StateRewindPolicy {
    /// No modification history is recorded. This is the default.
    #[default]
    NoRewind,
    /// Modification history is recorded and never pruned by the cleanup
    /// that runs on commit.
    RewindFullRange,
    /// Modification history is recorded, and committing height `h` prunes
    /// the record stored at `h - size` (saturating at zero).
    RewindRange { size: NonZeroU64 },
}
88
89#[derive(Debug)]
91pub struct HistoricalRocksDB<Description> {
92 state_rewind_policy: StateRewindPolicy,
94 db: RocksDb<Historical<Description>>,
96 has_v1_history: core::sync::atomic::AtomicBool,
98}
99
100impl<Description> HistoricalRocksDB<Description>
101where
102 Description: DatabaseDescription,
103{
104 pub fn new(
105 db: RocksDb<Historical<Description>>,
106 state_rewind_policy: StateRewindPolicy,
107 ) -> DatabaseResult<Self> {
108 let has_v1_history = db
109 .iter_all::<ModificationsHistoryV1<Description>>(None)
110 .next()
111 .is_some();
112
113 Ok(Self {
114 state_rewind_policy,
115 db,
116 has_v1_history: core::sync::atomic::AtomicBool::new(has_v1_history),
117 })
118 }
119
120 pub fn default_open<P: AsRef<Path>>(
121 path: P,
122 state_rewind_policy: StateRewindPolicy,
123 database_config: DatabaseConfig,
124 ) -> DatabaseResult<Self> {
125 let db = RocksDb::<Historical<Description>>::default_open(path, database_config)?;
126 let has_v1_history = db
127 .iter_all::<ModificationsHistoryV1<Description>>(None)
128 .next()
129 .is_some();
130 Ok(Self {
131 state_rewind_policy,
132 db,
133 has_v1_history: core::sync::atomic::AtomicBool::new(has_v1_history),
134 })
135 }
136
137 fn reverse_history_changes(&self, changes: &Changes) -> StorageResult<Changes> {
138 let mut reverse_changes = Changes::default();
139
140 for (column, column_changes) in changes {
141 let results = self.db.multi_get(*column, column_changes.keys())?;
142
143 let entry = reverse_changes
144 .entry(*column)
145 .or_insert_with(Default::default);
146
147 for (was, (key, became)) in results.into_iter().zip(column_changes.iter()) {
148 match (was, became) {
149 (None, WriteOperation::Remove) => {
150 }
152 (None, WriteOperation::Insert(_)) => {
153 entry.insert(key.clone(), WriteOperation::Remove);
154 }
155 (Some(old_value), WriteOperation::Remove) => {
156 entry.insert(
157 key.clone(),
158 WriteOperation::Insert(old_value.into()),
159 );
160 }
161 (Some(old_value), WriteOperation::Insert(new_value)) => {
162 if *old_value != **new_value {
163 entry.insert(
164 key.clone(),
165 WriteOperation::Insert(old_value.into()),
166 );
167 }
168 }
169 }
170 }
171 }
172 Ok(reverse_changes)
173 }
174
175 pub fn latest_view(&self) -> RocksDb<Description> {
176 self.db.create_snapshot_generic()
177 }
178
179 pub fn create_view_at(
183 &self,
184 height: &Description::Height,
185 ) -> StorageResult<ViewAtHeight<Description>> {
186 let height_for_the_state = height.as_u64();
191 let rollback_height = height_for_the_state.saturating_add(1);
192 let has_v1_history = self.has_v1_history();
193 let tx = self.db.read_transaction();
194 let mut contains = multiversion_contains(&tx, rollback_height, has_v1_history)?;
195
196 if !contains {
202 contains = multiversion_contains(&tx, height_for_the_state, has_v1_history)?;
203 }
204
205 if !contains {
206 return Err(DatabaseError::NoHistoryForRequestedHeight {
207 requested_height: height_for_the_state,
208 }
209 .into());
210 }
211 let latest_view = self.db.create_snapshot_generic::<Historical<Description>>();
212
213 Ok(ViewAtHeight::new(rollback_height, latest_view))
214 }
215
216 fn store_modifications_history<T>(
217 &self,
218 storage_transaction: &mut StorageTransaction<T>,
219 height: &Description::Height,
220 ) -> StorageResult<()>
221 where
222 T: KeyValueInspect<Column = Column<Description>>,
223 {
224 if self.state_rewind_policy == StateRewindPolicy::NoRewind {
225 return Ok(());
226 }
227 let height_u64 = height.as_u64();
228
229 let reverse_changes =
230 self.reverse_history_changes(storage_transaction.changes())?;
231
232 cleanup_old_changes(
233 self,
234 &height_u64,
235 storage_transaction,
236 &self.state_rewind_policy,
237 )?;
238
239 let old_changes = storage_transaction
240 .storage_as_mut::<ModificationsHistoryV2<Description>>()
241 .replace(&height_u64, &reverse_changes)?;
242
243 if let Some(old_changes) = old_changes {
244 tracing::warn!(
245 "Historical database committed twice the same height: {:?}",
246 height
247 );
248 remove_historical_modifications(
249 &height_u64,
250 storage_transaction,
251 &old_changes,
252 )?;
253 }
254
255 let historical_changes = reverse_changes
256 .into_iter()
257 .map(|(column, reverse_column_changes)| {
258 let historical_column_changes = reverse_column_changes
259 .into_iter()
260 .map(|(key, reverse_operation)| {
261 let height_key = height_key(&key, &height_u64).into();
262 let operation =
265 WriteOperation::Insert(serialize(&reverse_operation)?);
266 Ok::<_, StorageError>((height_key, operation))
267 })
268 .try_collect()?;
269
270 let historical_duplicate_column = historical_duplicate_column_id(column);
271 Ok::<_, StorageError>((
272 historical_duplicate_column,
273 historical_column_changes,
274 ))
275 })
276 .try_collect()?;
277
278 StorageTransaction::transaction(
281 storage_transaction,
282 ConflictPolicy::Overwrite,
283 historical_changes,
284 )
285 .commit()?;
286 Ok(())
287 }
288
289 fn remove_v1_entries(&self) -> StorageResult<()> {
290 if !self.has_v1_history() {
291 return Ok(())
292 }
293
294 self.db
295 .clear_table(ModificationsHistoryV1::<Description>::column())?;
296 self.has_v1_history
297 .store(false, core::sync::atomic::Ordering::Release);
298
299 Ok(())
300 }
301
302 #[cfg(test)]
303 fn multiversion_changes_heights(
304 &self,
305 direction: IterDirection,
306 has_v1_history: bool,
307 ) -> (Option<StorageResult<u64>>, Option<StorageResult<u64>>) {
308 let v2_changes = self
309 .db
310 .iter_all_keys::<ModificationsHistoryV2<Description>>(Some(direction))
311 .next();
312 let v1_changes = has_v1_history
313 .then(|| {
314 self.db
315 .iter_all_keys::<ModificationsHistoryV1<Description>>(Some(direction))
316 .next()
317 })
318 .flatten();
319
320 (v2_changes, v1_changes)
321 }
322
323 #[cfg(test)]
324 fn rollback_last_block(&self) -> StorageResult<u64> {
325 let has_v1_history = self.has_v1_history();
326
327 let (v2_latest_height, v1_latest_height) =
328 self.multiversion_changes_heights(IterDirection::Reverse, has_v1_history);
329
330 let latest_height = match (v2_latest_height, v1_latest_height) {
331 (None, None) => Err(DatabaseError::ReachedEndOfHistory)?,
332 (Some(Ok(v1)), Some(Ok(v2))) => v1.max(v2),
333 (_, Some(v1_res)) => v1_res?,
334 (Some(v2_res), _) => v2_res?,
335 };
336
337 self.rollback_block_to(latest_height)?;
338
339 Ok(latest_height)
340 }
341
342 fn rollback_block_to(&self, height_to_rollback: u64) -> StorageResult<()> {
343 let mut storage_transaction = self.db.read_transaction();
344
345 let last_changes = multiversion_take(
346 &mut storage_transaction,
347 height_to_rollback,
348 self.has_v1_history(),
349 )?
350 .ok_or(not_found!(ModificationsHistoryV2<Description>))?;
351
352 remove_historical_modifications(
353 &height_to_rollback,
354 &mut storage_transaction,
355 &last_changes,
356 )?;
357
358 StorageTransaction::transaction(
359 &mut storage_transaction,
360 ConflictPolicy::Overwrite,
361 last_changes,
362 )
363 .commit()?;
364
365 self.db
366 .commit_changes(&storage_transaction.into_changes().into())?;
367
368 Ok(())
369 }
370
371 fn has_v1_history(&self) -> bool {
372 use core::sync::atomic::Ordering;
373
374 self.has_v1_history.load(Ordering::Acquire)
375 }
376}
377
378fn multiversion_contains<Description, T>(
379 storage_transaction: &StorageTransaction<T>,
380 height: u64,
381 has_v1_history: bool,
382) -> StorageResult<bool>
383where
384 Description: DatabaseDescription,
385 T: KeyValueInspect<Column = Column<Description>>,
386{
387 let contains_v2 = storage_transaction
388 .storage_as_ref::<ModificationsHistoryV2<Description>>()
389 .contains_key(&height)?;
390
391 if !contains_v2 && has_v1_history {
392 let contains_v1 = storage_transaction
393 .storage_as_ref::<ModificationsHistoryV1<Description>>()
394 .contains_key(&height)?;
395 Ok(contains_v1)
396 } else {
397 Ok(contains_v2)
398 }
399}
400
401fn multiversion_take<Description, T>(
404 storage_transaction: &mut StorageTransaction<T>,
405 height: u64,
406 has_v1_history: bool,
407) -> StorageResult<Option<Changes>>
408where
409 Description: DatabaseDescription,
410 T: KeyValueInspect<Column = Column<Description>>,
411{
412 let v2_last_changes = storage_transaction
413 .storage_as_mut::<ModificationsHistoryV2<Description>>()
414 .take(&height)?;
415
416 if v2_last_changes.is_none() && has_v1_history {
417 let v1_last_changes = storage_transaction
418 .storage_as_mut::<ModificationsHistoryV1<Description>>()
419 .take(&height)?;
420 Ok(v1_last_changes)
421 } else {
422 Ok(v2_last_changes)
423 }
424}
425
426fn cleanup_old_changes<Description, T>(
427 db: &HistoricalRocksDB<Description>,
428 height: &u64,
429 storage_transaction: &mut StorageTransaction<T>,
430 state_rewind_policy: &StateRewindPolicy,
431) -> StorageResult<()>
432where
433 Description: DatabaseDescription,
434 T: KeyValueInspect<Column = Column<Description>>,
435{
436 match state_rewind_policy {
437 StateRewindPolicy::NoRewind => {
438 }
440 StateRewindPolicy::RewindFullRange => {
441 }
443 StateRewindPolicy::RewindRange { size } => {
444 let old_height = height.saturating_sub(size.get());
445
446 let old_changes = storage_transaction
447 .storage_as_mut::<ModificationsHistoryV2<Description>>()
448 .take(&old_height)?;
449
450 if let Some(old_changes) = old_changes {
451 remove_historical_modifications(
452 &old_height,
453 storage_transaction,
454 &old_changes,
455 )?;
456 db.remove_v1_entries()?;
458 }
459 }
460 }
461 Ok(())
462}
463
464fn remove_historical_modifications<Description, T>(
465 old_height: &u64,
466 storage_transaction: &mut StorageTransaction<T>,
467 reverse_changes: &Changes,
468) -> StorageResult<()>
469where
470 Description: DatabaseDescription,
471 T: KeyValueInspect<Column = Column<Description>>,
472{
473 let changes = reverse_changes
474 .iter()
475 .map(|(column, column_changes)| {
476 let historical_column_changes = column_changes
477 .keys()
478 .map(|key| {
479 let height_key = height_key(key, old_height).into();
480 let operation = WriteOperation::Remove;
481 (height_key, operation)
482 })
483 .collect();
484 let historical_duplicate_column = historical_duplicate_column_id(*column);
485 (historical_duplicate_column, historical_column_changes)
486 })
487 .collect();
488
489 StorageTransaction::transaction(
490 storage_transaction,
491 ConflictPolicy::Overwrite,
492 changes,
493 )
494 .commit()?;
495
496 Ok(())
497}
498
499impl<Description> KeyValueInspect for HistoricalRocksDB<Description>
500where
501 Description: DatabaseDescription,
502{
503 type Column = Description::Column;
504
505 fn exists(&self, key: &[u8], column: Self::Column) -> StorageResult<bool> {
506 self.db.exists(key, Column::OriginalColumn(column))
507 }
508
509 fn size_of_value(
510 &self,
511 key: &[u8],
512 column: Self::Column,
513 ) -> StorageResult<Option<usize>> {
514 self.db.size_of_value(key, Column::OriginalColumn(column))
515 }
516
517 fn get(&self, key: &[u8], column: Self::Column) -> StorageResult<Option<Value>> {
518 self.db.get(key, Column::OriginalColumn(column))
519 }
520
521 fn read(
522 &self,
523 key: &[u8],
524 column: Self::Column,
525 offset: usize,
526 buf: &mut [u8],
527 ) -> StorageResult<bool> {
528 self.db
529 .read(key, Column::OriginalColumn(column), offset, buf)
530 }
531}
532
533impl<Description> IterableStore for HistoricalRocksDB<Description>
534where
535 Description: DatabaseDescription,
536{
537 fn iter_store(
538 &self,
539 column: Self::Column,
540 prefix: Option<&[u8]>,
541 start: Option<&[u8]>,
542 direction: IterDirection,
543 ) -> BoxedIter<'_, KVItem> {
544 self.db
545 .iter_store(Column::OriginalColumn(column), prefix, start, direction)
546 }
547
548 fn iter_store_keys(
549 &self,
550 column: Self::Column,
551 prefix: Option<&[u8]>,
552 start: Option<&[u8]>,
553 direction: IterDirection,
554 ) -> BoxedIter<'_, fuel_core_storage::kv_store::KeyItem> {
555 self.db
556 .iter_store_keys(Column::OriginalColumn(column), prefix, start, direction)
557 }
558}
559
560impl<Description> TransactableStorage<Description::Height>
561 for HistoricalRocksDB<Description>
562where
563 Description: DatabaseDescription,
564{
565 fn commit_changes(
566 &self,
567 height: Option<Description::Height>,
568 mut changes: StorageChanges,
569 ) -> StorageResult<()> {
570 if let Some(height) = height
573 && self.state_rewind_policy != StateRewindPolicy::NoRewind
574 {
575 let all_changes = match changes {
576 StorageChanges::Changes(changes) => changes,
577 StorageChanges::ChangesList(list) => list.into_iter().flatten().collect(),
578 };
579 let mut storage_transaction = StorageTransaction::transaction(
580 &self.db,
581 ConflictPolicy::Overwrite,
582 all_changes,
583 );
584 self.store_modifications_history(&mut storage_transaction, &height)?;
585 changes = StorageChanges::Changes(storage_transaction.into_changes());
586 }
587
588 self.db.commit_changes(&changes)?;
589
590 Ok(())
591 }
592
593 fn view_at_height(
594 &self,
595 height: &Description::Height,
596 ) -> StorageResult<KeyValueView<ColumnType<Description>, HeightType<Description>>>
597 {
598 let view = self.create_view_at(height)?;
599 Ok(KeyValueView::from_storage_and_metadata(
600 KeyValueViewWrapper::new(view),
601 Some(*height),
602 ))
603 }
604
605 fn latest_view(
606 &self,
607 ) -> StorageResult<
608 IterableKeyValueView<ColumnType<Description>, HeightType<Description>>,
609 > {
610 let view = self.latest_view();
611 Ok(IterableKeyValueView::from_storage_and_metadata(
612 IterableKeyValueViewWrapper::new(view),
613 None,
614 ))
615 }
616
617 fn rollback_block_to(&self, height: &Description::Height) -> StorageResult<()> {
618 self.rollback_block_to(height.as_u64())
619 }
620
621 fn shutdown(&self) {
622 self.db.shutdown()
623 }
624}
625
/// Builds the key of a per-height history entry: the bytes of `key`
/// followed by `height` encoded as eight big-endian bytes.
pub fn height_key(key: &[u8], height: &u64) -> Vec<u8> {
    let suffix = height.to_be_bytes();
    let mut composite = Vec::with_capacity(key.len().saturating_add(suffix.len()));
    composite.extend_from_slice(key);
    composite.extend_from_slice(&suffix);
    composite
}
633
634pub fn serialize<T>(t: &T) -> StorageResult<Value>
635where
636 T: Serialize + ?Sized,
637{
638 Ok(postcard::to_allocvec(&t)
639 .map_err(|err| StorageError::Codec(err.into()))?
640 .into())
641}
642
643pub fn deserialize<'a, T>(bytes: &'a [u8]) -> StorageResult<T>
644where
645 T: Deserialize<'a>,
646{
647 postcard::from_bytes(bytes).map_err(|err| StorageError::Codec(err.into()))
648}
649
#[cfg(test)]
#[allow(non_snake_case)]
#[allow(clippy::cast_possible_truncation)]
mod tests {
    use super::*;
    use crate::database::database_description::on_chain::OnChain;
    use fuel_core_storage::{
        ContractsAssetKey,
        StorageAsMut,
        StorageAsRef,
        tables::ContractsAssets,
        transactional::{
            IntoTransaction,
            ReadTransaction,
        },
    };

    type TestDb = HistoricalRocksDB<OnChain>;

    /// Asset key whose contract id consists of 32 copies of `seed`.
    fn key_for(seed: u8) -> ContractsAssetKey {
        ContractsAssetKey::new(&[seed; 32].into(), &[213; 32].into())
    }

    /// The key shared by most tests.
    fn key() -> ContractsAssetKey {
        key_for(123)
    }

    /// Rewind policy that keeps `size` heights.
    fn rewind_range(size: u64) -> StateRewindPolicy {
        StateRewindPolicy::RewindRange {
            size: NonZeroU64::new(size).unwrap(),
        }
    }

    /// Historical database on top of a temporary RocksDB.
    fn temp_db(policy: StateRewindPolicy) -> TestDb {
        let rocks_db = RocksDb::<Historical<OnChain>>::default_open_temp().unwrap();
        HistoricalRocksDB::new(rocks_db, policy).unwrap()
    }

    /// Historical database on top of the RocksDB stored at `path`.
    fn open_db_at(path: impl AsRef<Path>, policy: StateRewindPolicy) -> TestDb {
        let rocks_db = RocksDb::<Historical<OnChain>>::default_open(
            path,
            DatabaseConfig::config_for_tests(),
        )
        .unwrap();
        HistoricalRocksDB::new(rocks_db, policy).unwrap()
    }

    /// Commits `balance` under `key` as the block at `height`.
    fn commit_balance(db: &TestDb, height: u32, key: &ContractsAssetKey, balance: u64) {
        let mut transaction = db.read_transaction();
        transaction
            .storage_as_mut::<ContractsAssets>()
            .insert(key, &balance)
            .unwrap();
        db.commit_changes(Some(height.into()), transaction.into_changes().into())
            .unwrap();
    }

    /// Number of records in the V2 history table.
    fn v2_entry_count(db: &TestDb) -> usize {
        db.db
            .iter_all::<ModificationsHistoryV2<OnChain>>(None)
            .collect::<Vec<_>>()
            .len()
    }

    /// Number of records in the V1 history table.
    fn v1_entry_count(db: &TestDb) -> usize {
        db.db
            .iter_all::<ModificationsHistoryV1<OnChain>>(None)
            .collect::<Vec<_>>()
            .len()
    }

    /// Moves the history record of `height` from the V2 table into the V1
    /// table, writing directly to the inner database.
    fn move_v2_record_to_v1(db: &TestDb, height: u64) {
        let mut migration_transaction = StorageTransaction::transaction(
            &db.db,
            ConflictPolicy::Overwrite,
            Changes::default(),
        );

        let v2_changes = migration_transaction
            .storage_as_mut::<ModificationsHistoryV2<OnChain>>()
            .take(&height)
            .unwrap()
            .unwrap();
        migration_transaction
            .storage_as_mut::<ModificationsHistoryV1<OnChain>>()
            .insert(&height, &v2_changes)
            .unwrap();

        db.db
            .commit_changes(&migration_transaction.into_changes().into())
            .unwrap();
    }

    /// The error expected when no history exists for `requested_height`.
    fn no_history_error(requested_height: u64) -> StorageError {
        DatabaseError::NoHistoryForRequestedHeight { requested_height }.into()
    }

    /// The balance currently readable under `key()` through `db` itself.
    fn current_balance(db: &TestDb) -> u64 {
        let read_view = db.read_transaction();
        read_view
            .storage_as_ref::<ContractsAssets>()
            .get(&key())
            .unwrap()
            .unwrap()
            .into_owned()
    }

    #[test]
    fn test_height_key() {
        let key = b"key";
        let height = 42;
        let expected = b"key\x00\x00\x00\x00\x00\x00\x00\x2a";
        assert_eq!(height_key(key, &height), expected);
    }

    #[test]
    fn historical_rocksdb_read_original_database_works() {
        // Given
        let db = temp_db(StateRewindPolicy::RewindFullRange);
        commit_balance(&db, 1, &key(), 123);
        commit_balance(&db, 2, &key(), 321);

        // When
        let latest_balance = current_balance(&db);

        // Then
        assert_eq!(latest_balance, 321);
    }

    #[test]
    fn historical_rocksdb_read_latest_view_works() {
        // Given
        let db = temp_db(StateRewindPolicy::RewindFullRange);
        commit_balance(&db, 1, &key(), 123);
        commit_balance(&db, 2, &key(), 321);

        // When
        let latest_view = db.latest_view().into_transaction();
        let latest_balance = latest_view
            .storage_as_ref::<ContractsAssets>()
            .get(&key())
            .unwrap()
            .unwrap()
            .into_owned();

        // Then
        assert_eq!(latest_balance, 321);
    }

    #[test]
    fn state_rewind_policy__no_rewind__create_view_at__fails() {
        // Given
        let db = temp_db(StateRewindPolicy::NoRewind);
        commit_balance(&db, 1, &key(), 123);

        // When
        let view_at_height_1 = db.create_view_at(&1u32.into()).map(|_| ());

        // Then
        assert_eq!(view_at_height_1, Err(no_history_error(1)));
    }

    #[test]
    fn state_rewind_policy__no_rewind__rollback__fails() {
        // Given
        let db = temp_db(StateRewindPolicy::NoRewind);
        commit_balance(&db, 1, &key(), 123);

        // When
        let result = db.rollback_last_block();

        // Then
        assert_eq!(result, Err(DatabaseError::ReachedEndOfHistory.into()));
    }

    #[test]
    fn state_rewind_policy__rewind_range_1__cleanup_in_range_works() {
        // Given
        let db = temp_db(rewind_range(1));
        commit_balance(&db, 1, &key(), 123);

        // When
        commit_balance(&db, 2, &key(), 321);

        // Then
        let view_at_height_1 = db.create_view_at(&1u32.into()).map(|_| ());
        let view_at_height_0 = db.create_view_at(&0u32.into()).map(|_| ());
        assert_eq!(view_at_height_1, Ok(()));
        assert_eq!(view_at_height_0, Err(no_history_error(0)));
    }

    #[test]
    fn state_rewind_policy__rewind_range_1__rollback_works() {
        // Given
        let db = temp_db(rewind_range(1));
        commit_balance(&db, 1, &key(), 123);
        assert_eq!(v2_entry_count(&db), 1);

        // When
        let result = db.rollback_last_block();

        // Then
        assert_eq!(result, Ok(1));
        assert_eq!(v2_entry_count(&db), 0);
    }

    #[test]
    fn state_rewind_policy__rewind_range_1__rollback_uses_v2() {
        // Given
        let db = temp_db(rewind_range(1));

        // When
        commit_balance(&db, 1, &key(), 123);
        let v2_entries = v2_entry_count(&db);
        let v1_entries = v1_entry_count(&db);

        // Then
        assert_eq!(v2_entries, 1);
        assert_eq!(v1_entries, 0);
    }

    #[test]
    fn state_rewind_policy__rewind_range_1__rollback_during_migration_works() {
        // Given
        let temp_dir = tempfile::tempdir().unwrap();
        let db = open_db_at(&temp_dir, rewind_range(1));
        commit_balance(&db, 1, &key(), 123);

        // Pretend the record was written by the V1 format.
        move_v2_record_to_v1(&db, 1);

        let v2_entries = v2_entry_count(&db);
        let v1_entries = v1_entry_count(&db);
        assert_eq!(v2_entries, 0);
        assert_eq!(v1_entries, 1);

        drop(db);

        // When: reopen so the V1 flag is detected again, then roll back.
        let db = open_db_at(&temp_dir, rewind_range(1));
        let result = db.rollback_last_block();

        // Then
        assert_eq!(result, Ok(1));
        let v2_entries = v2_entry_count(&db);
        let v1_entries = v1_entry_count(&db);
        assert_eq!(v2_entries, 0);
        assert_eq!(v1_entries, 0);
    }

    #[test]
    fn state_rewind_policy__rewind_range_1__migration_removes_v1() {
        // Given
        let temp_dir = tempfile::tempdir().unwrap();
        let db = open_db_at(&temp_dir, StateRewindPolicy::RewindFullRange);

        const BLOCKS: u8 = 10;

        for i in 1..=BLOCKS {
            commit_balance(&db, u32::from(i), &key_for(i), 123);

            // Pretend the record was written by the V1 format.
            move_v2_record_to_v1(&db, u64::from(i));

            let v2_entries = v2_entry_count(&db);
            let v1_entries = v1_entry_count(&db);
            assert_eq!(v2_entries, 0);
            assert_eq!(v1_entries, usize::from(i));
        }

        drop(db);

        // When: reopen with a range of one and commit two empty blocks.
        let db = open_db_at(&temp_dir, rewind_range(1));
        db.commit_changes(Some((BLOCKS as u32 + 1).into()), StorageChanges::default())
            .unwrap();
        db.commit_changes(Some((BLOCKS as u32 + 2).into()), StorageChanges::default())
            .unwrap();

        // Then
        let v2_entries = v2_entry_count(&db);
        let v1_entries = v1_entry_count(&db);
        assert_eq!(v2_entries, 1);
        assert_eq!(v1_entries, 0);
    }

    #[test]
    fn rollback_last_block_works_with_v2() {
        // Given
        let db = temp_db(StateRewindPolicy::RewindFullRange);

        for i in 1..=1000u32 {
            commit_balance(&db, i, &key(), 123 + i as u64);
        }

        // When
        let results: Vec<Result<u64, _>> =
            (0..1000u32).map(|_| db.rollback_last_block()).collect();

        // Then: heights come back from the newest to the oldest.
        for (i, result) in results.iter().enumerate() {
            assert_eq!(result, &Ok(1000 - i as u64));
        }
    }

    #[test]
    fn state_rewind_policy__rewind_range_1__second_rollback_fails() {
        // Given
        let db = temp_db(rewind_range(1));
        commit_balance(&db, 1, &key(), 123);
        db.rollback_last_block().unwrap();

        // When
        let result = db.rollback_last_block();

        // Then
        assert_eq!(result, Err(DatabaseError::ReachedEndOfHistory.into()));
    }

    #[test]
    fn state_rewind_policy__rewind_range_10__rollbacks_work() {
        const ITERATIONS: usize = 100;

        // Given
        let db = temp_db(rewind_range(ITERATIONS as u64));

        for height in 1..=ITERATIONS {
            commit_balance(&db, height as u32, &key_for(height as u8), 123);
        }

        for height in (1..=ITERATIONS).rev() {
            assert_eq!(v2_entry_count(&db), height);

            // When
            let result = db.rollback_last_block();

            // Then
            assert_eq!(result, Ok(height as u64));
            assert_eq!(v2_entry_count(&db), height - 1);
        }
    }
}