Skip to main content

fuel_core/state/
historical_rocksdb.rs

1use crate::{
2    database::{
3        Error as DatabaseError,
4        Result as DatabaseResult,
5        database_description::{
6            DatabaseDescription,
7            DatabaseHeight,
8        },
9    },
10    state::{
11        ColumnType,
12        HeightType,
13        IterableKeyValueView,
14        KeyValueView,
15        TransactableStorage,
16        historical_rocksdb::{
17            description::{
18                Column,
19                Historical,
20                historical_duplicate_column_id,
21            },
22            view_at_height::ViewAtHeight,
23        },
24        iterable_key_value_view::IterableKeyValueViewWrapper,
25        key_value_view::KeyValueViewWrapper,
26        rocks_db::RocksDb,
27    },
28};
29use fuel_core_storage::{
30    Error as StorageError,
31    Result as StorageResult,
32    StorageAsMut,
33    StorageAsRef,
34    StorageReadError,
35    iter::{
36        BoxedIter,
37        IterDirection,
38        IterableStore,
39        IteratorOverTable,
40    },
41    kv_store::{
42        KVItem,
43        KeyValueInspect,
44        Value,
45        WriteOperation,
46    },
47    not_found,
48    structured_storage::TableWithBlueprint,
49    transactional::{
50        Changes,
51        ConflictPolicy,
52        ReadTransaction,
53        StorageChanges,
54        StorageTransaction,
55    },
56};
57use itertools::Itertools;
58use modifications_history::{
59    ModificationsHistoryV1,
60    ModificationsHistoryV2,
61};
62use serde::{
63    Deserialize,
64    Serialize,
65};
66use std::{
67    num::NonZeroU64,
68    path::Path,
69};
70
71use super::rocks_db::DatabaseConfig;
72
73pub mod description;
74pub mod modifications_history;
75pub mod view_at_height;
76
/// Policies that decide how much rewind history the database keeps.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum StateRewindPolicy {
    /// Keep a checkpoint for the latest height only; no history is stored.
    #[default]
    NoRewind,
    /// Keep a checkpoint for every height; old history is never cleaned up.
    RewindFullRange,
    /// Keep a checkpoint for every height in the range
    /// `[latest_height-size..latest_height]`.
    RewindRange { size: NonZeroU64 },
}
89
/// A database backed by [`RocksDb`] that, besides the latest state, stores
/// per-height reverse modifications so that a view at an earlier height can
/// be created and committed heights can be rolled back.
#[derive(Debug)]
pub struct HistoricalRocksDB<Description> {
    /// The [`StateRewindPolicy`] used by the historical rocksdb
    state_rewind_policy: StateRewindPolicy,
    /// The underlying RocksDB instance, opened with the [`Historical`]
    /// description wrapped around `Description`.
    db: RocksDb<Historical<Description>>,
    /// Flag indicating if the database has a history of changes stored in `ModificationsHistoryV1`.
    /// It is determined once on construction and reset to `false` when the
    /// V1 table is cleared.
    has_v1_history: core::sync::atomic::AtomicBool,
}
100
101impl<Description> HistoricalRocksDB<Description>
102where
103    Description: DatabaseDescription,
104{
105    pub fn new(
106        db: RocksDb<Historical<Description>>,
107        state_rewind_policy: StateRewindPolicy,
108    ) -> DatabaseResult<Self> {
109        let has_v1_history = db
110            .iter_all::<ModificationsHistoryV1<Description>>(None)
111            .next()
112            .is_some();
113
114        Ok(Self {
115            state_rewind_policy,
116            db,
117            has_v1_history: core::sync::atomic::AtomicBool::new(has_v1_history),
118        })
119    }
120
121    pub fn default_open<P: AsRef<Path>>(
122        path: P,
123        state_rewind_policy: StateRewindPolicy,
124        database_config: DatabaseConfig,
125    ) -> DatabaseResult<Self> {
126        let db = RocksDb::<Historical<Description>>::default_open(path, database_config)?;
127        let has_v1_history = db
128            .iter_all::<ModificationsHistoryV1<Description>>(None)
129            .next()
130            .is_some();
131        Ok(Self {
132            state_rewind_policy,
133            db,
134            has_v1_history: core::sync::atomic::AtomicBool::new(has_v1_history),
135        })
136    }
137
138    fn reverse_history_changes(&self, changes: &Changes) -> StorageResult<Changes> {
139        let mut reverse_changes = Changes::default();
140
141        for (column, column_changes) in changes {
142            let results = self.db.multi_get(*column, column_changes.keys())?;
143
144            let entry = reverse_changes
145                .entry(*column)
146                .or_insert_with(Default::default);
147
148            for (was, (key, became)) in results.into_iter().zip(column_changes.iter()) {
149                match (was, became) {
150                    (None, WriteOperation::Remove) => {
151                        // Do nothing since it was not existing, and it was removed.
152                    }
153                    (None, WriteOperation::Insert(_)) => {
154                        entry.insert(key.clone(), WriteOperation::Remove);
155                    }
156                    (Some(old_value), WriteOperation::Remove) => {
157                        entry.insert(
158                            key.clone(),
159                            WriteOperation::Insert(old_value.into()),
160                        );
161                    }
162                    (Some(old_value), WriteOperation::Insert(new_value)) => {
163                        if *old_value != **new_value {
164                            entry.insert(
165                                key.clone(),
166                                WriteOperation::Insert(old_value.into()),
167                            );
168                        }
169                    }
170                }
171            }
172        }
173        Ok(reverse_changes)
174    }
175
176    pub fn latest_view(&self) -> RocksDb<Description> {
177        self.db.create_snapshot_generic()
178    }
179
180    /// Create a view at a specific height.
181    /// This function relies on the fact that all modifications are sequential
182    /// and monotonically grow the height without gaps.
183    pub fn create_view_at(
184        &self,
185        height: &Description::Height,
186    ) -> StorageResult<ViewAtHeight<Description>> {
187        // Each height stores reverse modification caused by the corresponding
188        // block at the same height. Applying reverse changes at height `X`
189        // gives us a state at height `X - 1`. If we want a state at height `X`,
190        // we need to apply all modifications up to `X + 1`.
191        let height_for_the_state = height.as_u64();
192        let rollback_height = height_for_the_state.saturating_add(1);
193        let has_v1_history = self.has_v1_history();
194        let tx = self.db.read_transaction();
195        let mut contains = multiversion_contains(&tx, rollback_height, has_v1_history)?;
196
197        // If we don't have any modifications on the height after us,
198        // maybe we are at the end of the history.
199        // We can check it by checking our height.
200        // If modifications exist, then we are the last element of the history
201        // that contains an actual state.
202        if !contains {
203            contains = multiversion_contains(&tx, height_for_the_state, has_v1_history)?;
204        }
205
206        if !contains {
207            return Err(DatabaseError::NoHistoryForRequestedHeight {
208                requested_height: height_for_the_state,
209            }
210            .into());
211        }
212        let latest_view = self.db.create_snapshot_generic::<Historical<Description>>();
213
214        Ok(ViewAtHeight::new(rollback_height, latest_view))
215    }
216
217    fn store_modifications_history<T>(
218        &self,
219        storage_transaction: &mut StorageTransaction<T>,
220        height: &Description::Height,
221    ) -> StorageResult<()>
222    where
223        T: KeyValueInspect<Column = Column<Description>>,
224    {
225        if self.state_rewind_policy == StateRewindPolicy::NoRewind {
226            return Ok(());
227        }
228        let height_u64 = height.as_u64();
229
230        let reverse_changes =
231            self.reverse_history_changes(storage_transaction.changes())?;
232
233        cleanup_old_changes(
234            self,
235            &height_u64,
236            storage_transaction,
237            &self.state_rewind_policy,
238        )?;
239
240        let old_changes = storage_transaction
241            .storage_as_mut::<ModificationsHistoryV2<Description>>()
242            .replace(&height_u64, &reverse_changes)?;
243
244        if let Some(old_changes) = old_changes {
245            tracing::warn!(
246                "Historical database committed twice the same height: {:?}",
247                height
248            );
249            remove_historical_modifications(
250                &height_u64,
251                storage_transaction,
252                &old_changes,
253            )?;
254        }
255
256        let historical_changes = reverse_changes
257            .into_iter()
258            .map(|(column, reverse_column_changes)| {
259                let historical_column_changes = reverse_column_changes
260                    .into_iter()
261                    .map(|(key, reverse_operation)| {
262                        let height_key = height_key(&key, &height_u64).into();
263                        // We want to store the operation that we want
264                        // to apply during rollback as a value.
265                        let operation =
266                            WriteOperation::Insert(serialize(&reverse_operation)?);
267                        Ok::<_, StorageError>((height_key, operation))
268                    })
269                    .try_collect()?;
270
271                let historical_duplicate_column = historical_duplicate_column_id(column);
272                Ok::<_, StorageError>((
273                    historical_duplicate_column,
274                    historical_column_changes,
275                ))
276            })
277            .try_collect()?;
278
279        // Combine removed old changes, all modifications for
280        // the current height and historical changes.
281        StorageTransaction::transaction(
282            storage_transaction,
283            ConflictPolicy::Overwrite,
284            historical_changes,
285        )
286        .commit()?;
287        Ok(())
288    }
289
290    fn remove_v1_entries(&self) -> StorageResult<()> {
291        if !self.has_v1_history() {
292            return Ok(())
293        }
294
295        self.db
296            .clear_table(ModificationsHistoryV1::<Description>::column())?;
297        self.has_v1_history
298            .store(false, core::sync::atomic::Ordering::Release);
299
300        Ok(())
301    }
302
303    #[cfg(test)]
304    fn multiversion_changes_heights(
305        &self,
306        direction: IterDirection,
307        has_v1_history: bool,
308    ) -> (Option<StorageResult<u64>>, Option<StorageResult<u64>>) {
309        let v2_changes = self
310            .db
311            .iter_all_keys::<ModificationsHistoryV2<Description>>(Some(direction))
312            .next();
313        let v1_changes = has_v1_history
314            .then(|| {
315                self.db
316                    .iter_all_keys::<ModificationsHistoryV1<Description>>(Some(direction))
317                    .next()
318            })
319            .flatten();
320
321        (v2_changes, v1_changes)
322    }
323
324    #[cfg(test)]
325    fn rollback_last_block(&self) -> StorageResult<u64> {
326        let has_v1_history = self.has_v1_history();
327
328        let (v2_latest_height, v1_latest_height) =
329            self.multiversion_changes_heights(IterDirection::Reverse, has_v1_history);
330
331        let latest_height = match (v2_latest_height, v1_latest_height) {
332            (None, None) => Err(DatabaseError::ReachedEndOfHistory)?,
333            (Some(Ok(v1)), Some(Ok(v2))) => v1.max(v2),
334            (_, Some(v1_res)) => v1_res?,
335            (Some(v2_res), _) => v2_res?,
336        };
337
338        self.rollback_block_to(latest_height)?;
339
340        Ok(latest_height)
341    }
342
343    fn rollback_block_to(&self, height_to_rollback: u64) -> StorageResult<()> {
344        let mut storage_transaction = self.db.read_transaction();
345
346        let last_changes = multiversion_take(
347            &mut storage_transaction,
348            height_to_rollback,
349            self.has_v1_history(),
350        )?
351        .ok_or(not_found!(ModificationsHistoryV2<Description>))?;
352
353        remove_historical_modifications(
354            &height_to_rollback,
355            &mut storage_transaction,
356            &last_changes,
357        )?;
358
359        StorageTransaction::transaction(
360            &mut storage_transaction,
361            ConflictPolicy::Overwrite,
362            last_changes,
363        )
364        .commit()?;
365
366        self.db
367            .commit_changes(&storage_transaction.into_changes().into())?;
368
369        Ok(())
370    }
371
372    fn has_v1_history(&self) -> bool {
373        use core::sync::atomic::Ordering;
374
375        self.has_v1_history.load(Ordering::Acquire)
376    }
377}
378
379fn multiversion_contains<Description, T>(
380    storage_transaction: &StorageTransaction<T>,
381    height: u64,
382    has_v1_history: bool,
383) -> StorageResult<bool>
384where
385    Description: DatabaseDescription,
386    T: KeyValueInspect<Column = Column<Description>>,
387{
388    let contains_v2 = storage_transaction
389        .storage_as_ref::<ModificationsHistoryV2<Description>>()
390        .contains_key(&height)?;
391
392    if !contains_v2 && has_v1_history {
393        let contains_v1 = storage_transaction
394            .storage_as_ref::<ModificationsHistoryV1<Description>>()
395            .contains_key(&height)?;
396        Ok(contains_v1)
397    } else {
398        Ok(contains_v2)
399    }
400}
401
402// Try to take the value from `ModificationsHistoryV2`, or return value from
403// `ModificationsHistoryV1`, if database still has v1 entries.
404fn multiversion_take<Description, T>(
405    storage_transaction: &mut StorageTransaction<T>,
406    height: u64,
407    has_v1_history: bool,
408) -> StorageResult<Option<Changes>>
409where
410    Description: DatabaseDescription,
411    T: KeyValueInspect<Column = Column<Description>>,
412{
413    let v2_last_changes = storage_transaction
414        .storage_as_mut::<ModificationsHistoryV2<Description>>()
415        .take(&height)?;
416
417    if v2_last_changes.is_none() && has_v1_history {
418        let v1_last_changes = storage_transaction
419            .storage_as_mut::<ModificationsHistoryV1<Description>>()
420            .take(&height)?;
421        Ok(v1_last_changes)
422    } else {
423        Ok(v2_last_changes)
424    }
425}
426
427fn cleanup_old_changes<Description, T>(
428    db: &HistoricalRocksDB<Description>,
429    height: &u64,
430    storage_transaction: &mut StorageTransaction<T>,
431    state_rewind_policy: &StateRewindPolicy,
432) -> StorageResult<()>
433where
434    Description: DatabaseDescription,
435    T: KeyValueInspect<Column = Column<Description>>,
436{
437    match state_rewind_policy {
438        StateRewindPolicy::NoRewind => {
439            // Do nothing since we do not store any history.
440        }
441        StateRewindPolicy::RewindFullRange => {
442            // Do nothing since we store all history.
443        }
444        StateRewindPolicy::RewindRange { size } => {
445            let old_height = height.saturating_sub(size.get());
446
447            let old_changes = storage_transaction
448                .storage_as_mut::<ModificationsHistoryV2<Description>>()
449                .take(&old_height)?;
450
451            if let Some(old_changes) = old_changes {
452                remove_historical_modifications(
453                    &old_height,
454                    storage_transaction,
455                    &old_changes,
456                )?;
457                // We found end of the V2 history, so we can remove V1 history entirely.
458                db.remove_v1_entries()?;
459            }
460        }
461    }
462    Ok(())
463}
464
465fn remove_historical_modifications<Description, T>(
466    old_height: &u64,
467    storage_transaction: &mut StorageTransaction<T>,
468    reverse_changes: &Changes,
469) -> StorageResult<()>
470where
471    Description: DatabaseDescription,
472    T: KeyValueInspect<Column = Column<Description>>,
473{
474    let changes = reverse_changes
475        .iter()
476        .map(|(column, column_changes)| {
477            let historical_column_changes = column_changes
478                .keys()
479                .map(|key| {
480                    let height_key = height_key(key, old_height).into();
481                    let operation = WriteOperation::Remove;
482                    (height_key, operation)
483                })
484                .collect();
485            let historical_duplicate_column = historical_duplicate_column_id(*column);
486            (historical_duplicate_column, historical_column_changes)
487        })
488        .collect();
489
490    StorageTransaction::transaction(
491        storage_transaction,
492        ConflictPolicy::Overwrite,
493        changes,
494    )
495    .commit()?;
496
497    Ok(())
498}
499
500impl<Description> KeyValueInspect for HistoricalRocksDB<Description>
501where
502    Description: DatabaseDescription,
503{
504    type Column = Description::Column;
505
506    fn exists(&self, key: &[u8], column: Self::Column) -> StorageResult<bool> {
507        self.db.exists(key, Column::OriginalColumn(column))
508    }
509
510    fn size_of_value(
511        &self,
512        key: &[u8],
513        column: Self::Column,
514    ) -> StorageResult<Option<usize>> {
515        self.db.size_of_value(key, Column::OriginalColumn(column))
516    }
517
518    fn get(&self, key: &[u8], column: Self::Column) -> StorageResult<Option<Value>> {
519        self.db.get(key, Column::OriginalColumn(column))
520    }
521
522    fn read_exact(
523        &self,
524        key: &[u8],
525        column: Self::Column,
526        offset: usize,
527        buf: &mut [u8],
528    ) -> StorageResult<Result<usize, StorageReadError>> {
529        self.db
530            .read_exact(key, Column::OriginalColumn(column), offset, buf)
531    }
532
533    fn read_zerofill(
534        &self,
535        key: &[u8],
536        column: Self::Column,
537        offset: usize,
538        buf: &mut [u8],
539    ) -> StorageResult<Result<usize, StorageReadError>> {
540        self.db
541            .read_zerofill(key, Column::OriginalColumn(column), offset, buf)
542    }
543}
544
545impl<Description> IterableStore for HistoricalRocksDB<Description>
546where
547    Description: DatabaseDescription,
548{
549    fn iter_store(
550        &self,
551        column: Self::Column,
552        prefix: Option<&[u8]>,
553        start: Option<&[u8]>,
554        direction: IterDirection,
555    ) -> BoxedIter<'_, KVItem> {
556        self.db
557            .iter_store(Column::OriginalColumn(column), prefix, start, direction)
558    }
559
560    fn iter_store_keys(
561        &self,
562        column: Self::Column,
563        prefix: Option<&[u8]>,
564        start: Option<&[u8]>,
565        direction: IterDirection,
566    ) -> BoxedIter<'_, fuel_core_storage::kv_store::KeyItem> {
567        self.db
568            .iter_store_keys(Column::OriginalColumn(column), prefix, start, direction)
569    }
570}
571
572impl<Description> TransactableStorage<Description::Height>
573    for HistoricalRocksDB<Description>
574where
575    Description: DatabaseDescription,
576{
577    fn commit_changes(
578        &self,
579        height: Option<Description::Height>,
580        mut changes: StorageChanges,
581    ) -> StorageResult<()> {
582        // When the history need to be process we need to have all the changes in one
583        // transaction to be able to write their reverse changes.
584        if let Some(height) = height
585            && self.state_rewind_policy != StateRewindPolicy::NoRewind
586        {
587            let all_changes = match changes {
588                StorageChanges::Changes(changes) => changes,
589                StorageChanges::ChangesList(list) => list.into_iter().flatten().collect(),
590            };
591            let mut storage_transaction = StorageTransaction::transaction(
592                &self.db,
593                ConflictPolicy::Overwrite,
594                all_changes,
595            );
596            self.store_modifications_history(&mut storage_transaction, &height)?;
597            changes = StorageChanges::Changes(storage_transaction.into_changes());
598        }
599
600        self.db.commit_changes(&changes)?;
601
602        Ok(())
603    }
604
605    fn view_at_height(
606        &self,
607        height: &Description::Height,
608    ) -> StorageResult<KeyValueView<ColumnType<Description>, HeightType<Description>>>
609    {
610        let view = self.create_view_at(height)?;
611        Ok(KeyValueView::from_storage_and_metadata(
612            KeyValueViewWrapper::new(view),
613            Some(*height),
614        ))
615    }
616
617    fn latest_view(
618        &self,
619    ) -> StorageResult<
620        IterableKeyValueView<ColumnType<Description>, HeightType<Description>>,
621    > {
622        let view = self.latest_view();
623        Ok(IterableKeyValueView::from_storage_and_metadata(
624            IterableKeyValueViewWrapper::new(view),
625            None,
626        ))
627    }
628
629    fn rollback_block_to(&self, height: &Description::Height) -> StorageResult<()> {
630        self.rollback_block_to(height.as_u64())
631    }
632
633    fn shutdown(&self) {
634        self.db.shutdown()
635    }
636}
637
/// Builds the key used in the historical duplicate columns: the original
/// `key` followed by `height` encoded as 8 big-endian bytes.
pub fn height_key(key: &[u8], height: &u64) -> Vec<u8> {
    let height_suffix = height.to_be_bytes();
    let mut versioned_key = Vec::with_capacity(key.len().saturating_add(8));
    versioned_key.extend_from_slice(key);
    versioned_key.extend_from_slice(&height_suffix);
    versioned_key
}
645
646pub fn serialize<T>(t: &T) -> StorageResult<Value>
647where
648    T: Serialize + ?Sized,
649{
650    Ok(postcard::to_allocvec(&t)
651        .map_err(|err| StorageError::Codec(err.into()))?
652        .into())
653}
654
655pub fn deserialize<'a, T>(bytes: &'a [u8]) -> StorageResult<T>
656where
657    T: Deserialize<'a>,
658{
659    postcard::from_bytes(bytes).map_err(|err| StorageError::Codec(err.into()))
660}
661
662#[cfg(test)]
663#[allow(non_snake_case)]
664#[allow(clippy::cast_possible_truncation)]
665mod tests {
666    use super::*;
667    use crate::database::database_description::on_chain::OnChain;
668    use fuel_core_storage::{
669        ContractsAssetKey,
670        StorageAsMut,
671        StorageAsRef,
672        tables::ContractsAssets,
673        transactional::{
674            IntoTransaction,
675            ReadTransaction,
676        },
677    };
678
679    #[test]
680    fn test_height_key() {
681        let key = b"key";
682        let height = 42;
683        let expected = b"key\x00\x00\x00\x00\x00\x00\x00\x2a";
684        assert_eq!(height_key(key, &height), expected);
685    }
686
687    fn key() -> ContractsAssetKey {
688        ContractsAssetKey::new(&[123; 32].into(), &[213; 32].into())
689    }
690
691    #[test]
692    fn historical_rocksdb_read_original_database_works() {
693        // Given
694        let rocks_db = RocksDb::<Historical<OnChain>>::default_open_temp().unwrap();
695        let historical_rocks_db =
696            HistoricalRocksDB::new(rocks_db, StateRewindPolicy::RewindFullRange).unwrap();
697
698        // Set the value at height 1 to be 123.
699        let mut transaction = historical_rocks_db.read_transaction();
700        transaction
701            .storage_as_mut::<ContractsAssets>()
702            .insert(&key(), &123)
703            .unwrap();
704        historical_rocks_db
705            .commit_changes(Some(1u32.into()), transaction.into_changes().into())
706            .unwrap();
707
708        // Set the value at height 2 to be 321.
709        let mut transaction = historical_rocks_db.read_transaction();
710        transaction
711            .storage_as_mut::<ContractsAssets>()
712            .insert(&key(), &321)
713            .unwrap();
714        historical_rocks_db
715            .commit_changes(Some(2u32.into()), transaction.into_changes().into())
716            .unwrap();
717
718        // When
719        let read_view = historical_rocks_db.read_transaction();
720        let latest_balance = read_view
721            .storage_as_ref::<ContractsAssets>()
722            .get(&key())
723            .unwrap()
724            .unwrap()
725            .into_owned();
726
727        // Then
728        assert_eq!(latest_balance, 321);
729    }
730
731    #[test]
732    fn historical_rocksdb_read_latest_view_works() {
733        // Given
734        let rocks_db = RocksDb::<Historical<OnChain>>::default_open_temp().unwrap();
735        let historical_rocks_db =
736            HistoricalRocksDB::new(rocks_db, StateRewindPolicy::RewindFullRange).unwrap();
737
738        // Set the value at height 1 to be 123.
739        let mut transaction = historical_rocks_db.read_transaction();
740        transaction
741            .storage_as_mut::<ContractsAssets>()
742            .insert(&key(), &123)
743            .unwrap();
744        historical_rocks_db
745            .commit_changes(Some(1u32.into()), transaction.into_changes().into())
746            .unwrap();
747
748        // Set the value at height 2 to be 321.
749        let mut transaction = historical_rocks_db.read_transaction();
750        transaction
751            .storage_as_mut::<ContractsAssets>()
752            .insert(&key(), &321)
753            .unwrap();
754        historical_rocks_db
755            .commit_changes(Some(2u32.into()), transaction.into_changes().into())
756            .unwrap();
757
758        // When
759        let latest_view = historical_rocks_db.latest_view().into_transaction();
760        let latest_balance = latest_view
761            .storage_as_ref::<ContractsAssets>()
762            .get(&key())
763            .unwrap()
764            .unwrap()
765            .into_owned();
766
767        // Then
768        assert_eq!(latest_balance, 321);
769    }
770
771    #[test]
772    fn state_rewind_policy__no_rewind__create_view_at__fails() {
773        // Given
774        let rocks_db = RocksDb::<Historical<OnChain>>::default_open_temp().unwrap();
775        let historical_rocks_db =
776            HistoricalRocksDB::new(rocks_db, StateRewindPolicy::NoRewind).unwrap();
777
778        let mut transaction = historical_rocks_db.read_transaction();
779        transaction
780            .storage_as_mut::<ContractsAssets>()
781            .insert(&key(), &123)
782            .unwrap();
783        historical_rocks_db
784            .commit_changes(Some(1u32.into()), transaction.into_changes().into())
785            .unwrap();
786
787        // When
788        let view_at_height_1 =
789            historical_rocks_db.create_view_at(&1u32.into()).map(|_| ());
790
791        // Then
792        assert_eq!(
793            view_at_height_1,
794            Err(DatabaseError::NoHistoryForRequestedHeight {
795                requested_height: 1,
796            }
797            .into())
798        );
799    }
800
801    #[test]
802    fn state_rewind_policy__no_rewind__rollback__fails() {
803        // Given
804        let rocks_db = RocksDb::<Historical<OnChain>>::default_open_temp().unwrap();
805        let historical_rocks_db =
806            HistoricalRocksDB::new(rocks_db, StateRewindPolicy::NoRewind).unwrap();
807
808        let mut transaction = historical_rocks_db.read_transaction();
809        transaction
810            .storage_as_mut::<ContractsAssets>()
811            .insert(&key(), &123)
812            .unwrap();
813        historical_rocks_db
814            .commit_changes(Some(1u32.into()), transaction.into_changes().into())
815            .unwrap();
816
817        // When
818        let result = historical_rocks_db.rollback_last_block();
819
820        // Then
821        assert_eq!(result, Err(DatabaseError::ReachedEndOfHistory.into()));
822    }
823
824    #[test]
825    fn state_rewind_policy__rewind_range_1__cleanup_in_range_works() {
826        // Given
827        let rocks_db = RocksDb::<Historical<OnChain>>::default_open_temp().unwrap();
828        let historical_rocks_db = HistoricalRocksDB::new(
829            rocks_db,
830            StateRewindPolicy::RewindRange {
831                size: NonZeroU64::new(1).unwrap(),
832            },
833        )
834        .unwrap();
835
836        let mut transaction = historical_rocks_db.read_transaction();
837        transaction
838            .storage_as_mut::<ContractsAssets>()
839            .insert(&key(), &123)
840            .unwrap();
841        historical_rocks_db
842            .commit_changes(Some(1u32.into()), transaction.into_changes().into())
843            .unwrap();
844
845        // When
846        let mut transaction = historical_rocks_db.read_transaction();
847        transaction
848            .storage_as_mut::<ContractsAssets>()
849            .insert(&key(), &321)
850            .unwrap();
851        historical_rocks_db
852            .commit_changes(Some(2u32.into()), transaction.into_changes().into())
853            .unwrap();
854
855        // Then
856        let view_at_height_1 =
857            historical_rocks_db.create_view_at(&1u32.into()).map(|_| ());
858        let view_at_height_0 =
859            historical_rocks_db.create_view_at(&0u32.into()).map(|_| ());
860        assert_eq!(view_at_height_1, Ok(()));
861        assert_eq!(
862            view_at_height_0,
863            Err(DatabaseError::NoHistoryForRequestedHeight {
864                requested_height: 0,
865            }
866            .into())
867        );
868    }
869
870    #[test]
871    fn state_rewind_policy__rewind_range_1__rollback_works() {
872        // Given
873        let rocks_db = RocksDb::<Historical<OnChain>>::default_open_temp().unwrap();
874        let historical_rocks_db = HistoricalRocksDB::new(
875            rocks_db,
876            StateRewindPolicy::RewindRange {
877                size: NonZeroU64::new(1).unwrap(),
878            },
879        )
880        .unwrap();
881
882        let mut transaction = historical_rocks_db.read_transaction();
883        transaction
884            .storage_as_mut::<ContractsAssets>()
885            .insert(&key(), &123)
886            .unwrap();
887        historical_rocks_db
888            .commit_changes(Some(1u32.into()), transaction.into_changes().into())
889            .unwrap();
890        let entries = historical_rocks_db
891            .db
892            .iter_all::<ModificationsHistoryV2<OnChain>>(None)
893            .collect::<Vec<_>>();
894        assert_eq!(entries.len(), 1);
895
896        // When
897        let result = historical_rocks_db.rollback_last_block();
898
899        // Then
900        assert_eq!(result, Ok(1));
901        let entries = historical_rocks_db
902            .db
903            .iter_all::<ModificationsHistoryV2<OnChain>>(None)
904            .collect::<Vec<_>>();
905        assert_eq!(entries.len(), 0);
906    }
907
908    #[test]
909    fn state_rewind_policy__rewind_range_1__rollback_uses_v2() {
910        // Given
911        let rocks_db = RocksDb::<Historical<OnChain>>::default_open_temp().unwrap();
912        let historical_rocks_db = HistoricalRocksDB::new(
913            rocks_db,
914            StateRewindPolicy::RewindRange {
915                size: NonZeroU64::new(1).unwrap(),
916            },
917        )
918        .unwrap();
919
920        // When
921        let mut transaction = historical_rocks_db.read_transaction();
922        transaction
923            .storage_as_mut::<ContractsAssets>()
924            .insert(&key(), &123)
925            .unwrap();
926        historical_rocks_db
927            .commit_changes(Some(1u32.into()), transaction.into_changes().into())
928            .unwrap();
929        let v2_entries = historical_rocks_db
930            .db
931            .iter_all::<ModificationsHistoryV2<OnChain>>(None)
932            .collect::<Vec<_>>();
933        let v1_entries = historical_rocks_db
934            .db
935            .iter_all::<ModificationsHistoryV1<OnChain>>(None)
936            .collect::<Vec<_>>();
937
938        // Then
939        assert_eq!(v2_entries.len(), 1);
940        assert_eq!(v1_entries.len(), 0);
941    }
942
943    #[test]
944    fn state_rewind_policy__rewind_range_1__rollback_during_migration_works() {
945        // Given
946        let temp_dir = tempfile::tempdir().unwrap();
947        let rocks_db = RocksDb::<Historical<OnChain>>::default_open(
948            &temp_dir,
949            DatabaseConfig::config_for_tests(),
950        )
951        .unwrap();
952        let historical_rocks_db = HistoricalRocksDB::new(
953            rocks_db,
954            StateRewindPolicy::RewindRange {
955                size: NonZeroU64::new(1).unwrap(),
956            },
957        )
958        .unwrap();
959
960        // When
961        let mut transaction = historical_rocks_db.read_transaction();
962        transaction
963            .storage_as_mut::<ContractsAssets>()
964            .insert(&key(), &123)
965            .unwrap();
966        historical_rocks_db
967            .commit_changes(Some(1u32.into()), transaction.into_changes().into())
968            .unwrap();
969
970        // Migrate the changes from V2 to V1.
971
972        let mut migration_transaction = StorageTransaction::transaction(
973            &historical_rocks_db.db,
974            ConflictPolicy::Overwrite,
975            Changes::default(),
976        );
977
978        let v2_changes = migration_transaction
979            .storage_as_mut::<ModificationsHistoryV2<OnChain>>()
980            .take(&1u64)
981            .unwrap()
982            .unwrap();
983        migration_transaction
984            .storage_as_mut::<ModificationsHistoryV1<OnChain>>()
985            .insert(&1u64, &v2_changes)
986            .unwrap();
987
988        historical_rocks_db
989            .db
990            .commit_changes(&migration_transaction.into_changes().into())
991            .unwrap();
992
993        // Check that the history has indeed been written to V1
994        let v2_entries = historical_rocks_db
995            .db
996            .iter_all::<ModificationsHistoryV2<OnChain>>(None)
997            .collect::<Vec<_>>();
998        let v1_entries = historical_rocks_db
999            .db
1000            .iter_all::<ModificationsHistoryV1<OnChain>>(None)
1001            .collect::<Vec<_>>();
1002
1003        assert_eq!(v2_entries.len(), 0);
1004        assert_eq!(v1_entries.len(), 1);
1005
1006        drop(historical_rocks_db);
1007
1008        // Open the database again with fetched V1 entries status.
1009        let rocks_db = RocksDb::<Historical<OnChain>>::default_open(
1010            &temp_dir,
1011            DatabaseConfig::config_for_tests(),
1012        )
1013        .unwrap();
1014        let historical_rocks_db = HistoricalRocksDB::new(
1015            rocks_db,
1016            StateRewindPolicy::RewindRange {
1017                size: NonZeroU64::new(1).unwrap(),
1018            },
1019        )
1020        .unwrap();
1021        let result = historical_rocks_db.rollback_last_block();
1022
1023        // Then
1024        assert_eq!(result, Ok(1));
1025        let v2_entries = historical_rocks_db
1026            .db
1027            .iter_all::<ModificationsHistoryV2<OnChain>>(None)
1028            .collect::<Vec<_>>();
1029        let v1_entries = historical_rocks_db
1030            .db
1031            .iter_all::<ModificationsHistoryV1<OnChain>>(None)
1032            .collect::<Vec<_>>();
1033        assert_eq!(v2_entries.len(), 0);
1034        assert_eq!(v1_entries.len(), 0);
1035    }
1036
1037    #[test]
1038    fn state_rewind_policy__rewind_range_1__migration_removes_v1() {
1039        let temp_dir = tempfile::tempdir().unwrap();
1040        let rocks_db = RocksDb::<Historical<OnChain>>::default_open(
1041            &temp_dir,
1042            DatabaseConfig::config_for_tests(),
1043        )
1044        .unwrap();
1045        let historical_rocks_db =
1046            HistoricalRocksDB::new(rocks_db, StateRewindPolicy::RewindFullRange).unwrap();
1047
1048        const BLOCKS: u8 = 10;
1049
1050        // Given
1051        // Create 10 blocks with some changes.
1052        // And migrate all changes to V1 history.
1053        for i in 1..=BLOCKS {
1054            let height = i as u32;
1055            let height_64 = i as u64;
1056            let mut transaction = historical_rocks_db.read_transaction();
1057            let key = ContractsAssetKey::new(&[i; 32].into(), &[213; 32].into());
1058            transaction
1059                .storage_as_mut::<ContractsAssets>()
1060                .insert(&key, &123)
1061                .unwrap();
1062            historical_rocks_db
1063                .commit_changes(Some(height.into()), transaction.into_changes().into())
1064                .unwrap();
1065
1066            // Migrate the changes from V2 to V1.
1067            let mut migration_transaction = StorageTransaction::transaction(
1068                &historical_rocks_db.db,
1069                ConflictPolicy::Overwrite,
1070                Changes::default(),
1071            );
1072
1073            let v2_changes = migration_transaction
1074                .storage_as_mut::<ModificationsHistoryV2<OnChain>>()
1075                .take(&height_64)
1076                .unwrap()
1077                .unwrap();
1078            migration_transaction
1079                .storage_as_mut::<ModificationsHistoryV1<OnChain>>()
1080                .insert(&height_64, &v2_changes)
1081                .unwrap();
1082
1083            historical_rocks_db
1084                .db
1085                .commit_changes(&migration_transaction.into_changes().into())
1086                .unwrap();
1087
1088            // Check that the history has indeed been written to V1
1089            let v2_entries = historical_rocks_db
1090                .db
1091                .iter_all::<ModificationsHistoryV2<OnChain>>(None)
1092                .collect::<Vec<_>>();
1093            let v1_entries = historical_rocks_db
1094                .db
1095                .iter_all::<ModificationsHistoryV1<OnChain>>(None)
1096                .collect::<Vec<_>>();
1097
1098            assert_eq!(v2_entries.len(), 0);
1099            assert_eq!(v1_entries.len(), i as usize);
1100        }
1101
1102        drop(historical_rocks_db);
1103
1104        // When
1105        // Open the database again, but with the rewind range of 1.
1106        // Committing 2 new blocks, should add new entries to V2 history.
1107        // And because of the rewind range of 1, the V1 history should be removed.
1108        let rocks_db = RocksDb::<Historical<OnChain>>::default_open(
1109            &temp_dir,
1110            DatabaseConfig::config_for_tests(),
1111        )
1112        .unwrap();
1113        let historical_rocks_db = HistoricalRocksDB::new(
1114            rocks_db,
1115            StateRewindPolicy::RewindRange {
1116                size: NonZeroU64::new(1).unwrap(),
1117            },
1118        )
1119        .unwrap();
1120        historical_rocks_db
1121            .commit_changes(Some((BLOCKS as u32 + 1).into()), StorageChanges::default())
1122            .unwrap();
1123        historical_rocks_db
1124            .commit_changes(Some((BLOCKS as u32 + 2).into()), StorageChanges::default())
1125            .unwrap();
1126
1127        // Then
1128        let v2_entries = historical_rocks_db
1129            .db
1130            .iter_all::<ModificationsHistoryV2<OnChain>>(None)
1131            .collect::<Vec<_>>();
1132        let v1_entries = historical_rocks_db
1133            .db
1134            .iter_all::<ModificationsHistoryV1<OnChain>>(None)
1135            .collect::<Vec<_>>();
1136        assert_eq!(v2_entries.len(), 1);
1137        assert_eq!(v1_entries.len(), 0);
1138    }
1139
1140    #[test]
1141    fn rollback_last_block_works_with_v2() {
1142        // Given
1143        let rocks_db = RocksDb::<Historical<OnChain>>::default_open_temp().unwrap();
1144
1145        let historical_rocks_db =
1146            HistoricalRocksDB::new(rocks_db, StateRewindPolicy::RewindFullRange).unwrap();
1147
1148        // When
1149        // Commit 1000 blocks
1150        for i in 1..=1000u32 {
1151            let mut transaction = historical_rocks_db.read_transaction();
1152            transaction
1153                .storage_as_mut::<ContractsAssets>()
1154                .insert(&key(), &(123 + i as u64))
1155                .unwrap();
1156            historical_rocks_db
1157                .commit_changes(Some(i.into()), transaction.into_changes().into())
1158                .unwrap();
1159        }
1160        // We can now rollback the last block 1000 times.
1161        let results: Vec<Result<u64, _>> = (0..1000u32)
1162            .map(|_| historical_rocks_db.rollback_last_block())
1163            .collect();
1164
1165        // Then
1166        // If the rollback fails at some point, then we have unintentionally rollbacked to
1167        // a block that was not the last.
1168        for (i, result) in results.iter().enumerate() {
1169            assert_eq!(result, &Ok(1000 - i as u64));
1170        }
1171    }
1172
1173    #[test]
1174    fn state_rewind_policy__rewind_range_1__second_rollback_fails() {
1175        // Given
1176        let rocks_db = RocksDb::<Historical<OnChain>>::default_open_temp().unwrap();
1177        let historical_rocks_db = HistoricalRocksDB::new(
1178            rocks_db,
1179            StateRewindPolicy::RewindRange {
1180                size: NonZeroU64::new(1).unwrap(),
1181            },
1182        )
1183        .unwrap();
1184
1185        let mut transaction = historical_rocks_db.read_transaction();
1186        transaction
1187            .storage_as_mut::<ContractsAssets>()
1188            .insert(&key(), &123)
1189            .unwrap();
1190        historical_rocks_db
1191            .commit_changes(Some(1u32.into()), transaction.into_changes().into())
1192            .unwrap();
1193        historical_rocks_db.rollback_last_block().unwrap();
1194
1195        // When
1196        let result = historical_rocks_db.rollback_last_block();
1197
1198        // Then
1199        assert_eq!(result, Err(DatabaseError::ReachedEndOfHistory.into()));
1200    }
1201
1202    #[test]
1203    fn state_rewind_policy__rewind_range_10__rollbacks_work() {
1204        const ITERATIONS: usize = 100;
1205
1206        let rocks_db = RocksDb::<Historical<OnChain>>::default_open_temp().unwrap();
1207        let historical_rocks_db = HistoricalRocksDB::new(
1208            rocks_db,
1209            StateRewindPolicy::RewindRange {
1210                size: NonZeroU64::new(ITERATIONS as u64).unwrap(),
1211            },
1212        )
1213        .unwrap();
1214
1215        fn key(height: u32) -> ContractsAssetKey {
1216            ContractsAssetKey::new(&[height as u8; 32].into(), &[213; 32].into())
1217        }
1218
1219        for height in 1..=ITERATIONS {
1220            let height = height as u32;
1221            let key = key(height);
1222
1223            let mut transaction = historical_rocks_db.read_transaction();
1224            transaction
1225                .storage_as_mut::<ContractsAssets>()
1226                .insert(&key, &123)
1227                .unwrap();
1228            historical_rocks_db
1229                .commit_changes(Some(height.into()), transaction.into_changes().into())
1230                .unwrap();
1231        }
1232
1233        for height in (1..=ITERATIONS).rev() {
1234            // Given
1235            let entries = historical_rocks_db
1236                .db
1237                .iter_all::<ModificationsHistoryV2<OnChain>>(None)
1238                .collect::<Vec<_>>();
1239            assert_eq!(entries.len(), height);
1240
1241            // When
1242            let result = historical_rocks_db.rollback_last_block();
1243
1244            // Then
1245            assert_eq!(result, Ok(height as u64));
1246            let entries = historical_rocks_db
1247                .db
1248                .iter_all::<ModificationsHistoryV2<OnChain>>(None)
1249                .collect::<Vec<_>>();
1250            assert_eq!(entries.len(), height - 1);
1251        }
1252    }
1253}