fuel_core/state/
historical_rocksdb.rs

1use crate::{
2    database::{
3        Error as DatabaseError,
4        Result as DatabaseResult,
5        database_description::{
6            DatabaseDescription,
7            DatabaseHeight,
8        },
9    },
10    state::{
11        ColumnType,
12        HeightType,
13        IterableKeyValueView,
14        KeyValueView,
15        TransactableStorage,
16        historical_rocksdb::{
17            description::{
18                Column,
19                Historical,
20                historical_duplicate_column_id,
21            },
22            view_at_height::ViewAtHeight,
23        },
24        iterable_key_value_view::IterableKeyValueViewWrapper,
25        key_value_view::KeyValueViewWrapper,
26        rocks_db::RocksDb,
27    },
28};
29use fuel_core_storage::{
30    Error as StorageError,
31    Result as StorageResult,
32    StorageAsMut,
33    StorageAsRef,
34    iter::{
35        BoxedIter,
36        IterDirection,
37        IterableStore,
38        IteratorOverTable,
39    },
40    kv_store::{
41        KVItem,
42        KeyValueInspect,
43        Value,
44        WriteOperation,
45    },
46    not_found,
47    structured_storage::TableWithBlueprint,
48    transactional::{
49        Changes,
50        ConflictPolicy,
51        ReadTransaction,
52        StorageChanges,
53        StorageTransaction,
54    },
55};
56use itertools::Itertools;
57use modifications_history::{
58    ModificationsHistoryV1,
59    ModificationsHistoryV2,
60};
61use serde::{
62    Deserialize,
63    Serialize,
64};
65use std::{
66    num::NonZeroU64,
67    path::Path,
68};
69
70use super::rocks_db::DatabaseConfig;
71
72pub mod description;
73pub mod modifications_history;
74pub mod view_at_height;
75
76#[derive(Copy, Clone, Default, Debug, Eq, PartialEq)]
77/// Defined policies for the state rewind behaviour of the database.
78pub enum StateRewindPolicy {
79    #[default]
80    /// The checkpoint will be created only for the latest height.
81    NoRewind,
82    /// The checkpoint will be created for each height.
83    RewindFullRange,
84    /// The checkpoint will be created for each height
85    /// in the range `[latest_height-size..latest_height]`.
86    RewindRange { size: NonZeroU64 },
87}
88
89/// Implementation of a database
90#[derive(Debug)]
91pub struct HistoricalRocksDB<Description> {
92    /// The [`StateRewindPolicy`] used by the historical rocksdb
93    state_rewind_policy: StateRewindPolicy,
94    /// The Description of the database.
95    db: RocksDb<Historical<Description>>,
96    /// Flag indicating if the database has a history of changes stored in `ModificationsHistoryV1`.
97    has_v1_history: core::sync::atomic::AtomicBool,
98}
99
100impl<Description> HistoricalRocksDB<Description>
101where
102    Description: DatabaseDescription,
103{
104    pub fn new(
105        db: RocksDb<Historical<Description>>,
106        state_rewind_policy: StateRewindPolicy,
107    ) -> DatabaseResult<Self> {
108        let has_v1_history = db
109            .iter_all::<ModificationsHistoryV1<Description>>(None)
110            .next()
111            .is_some();
112
113        Ok(Self {
114            state_rewind_policy,
115            db,
116            has_v1_history: core::sync::atomic::AtomicBool::new(has_v1_history),
117        })
118    }
119
120    pub fn default_open<P: AsRef<Path>>(
121        path: P,
122        state_rewind_policy: StateRewindPolicy,
123        database_config: DatabaseConfig,
124    ) -> DatabaseResult<Self> {
125        let db = RocksDb::<Historical<Description>>::default_open(path, database_config)?;
126        let has_v1_history = db
127            .iter_all::<ModificationsHistoryV1<Description>>(None)
128            .next()
129            .is_some();
130        Ok(Self {
131            state_rewind_policy,
132            db,
133            has_v1_history: core::sync::atomic::AtomicBool::new(has_v1_history),
134        })
135    }
136
137    fn reverse_history_changes(&self, changes: &Changes) -> StorageResult<Changes> {
138        let mut reverse_changes = Changes::default();
139
140        for (column, column_changes) in changes {
141            let results = self.db.multi_get(*column, column_changes.keys())?;
142
143            let entry = reverse_changes
144                .entry(*column)
145                .or_insert_with(Default::default);
146
147            for (was, (key, became)) in results.into_iter().zip(column_changes.iter()) {
148                match (was, became) {
149                    (None, WriteOperation::Remove) => {
150                        // Do nothing since it was not existing, and it was removed.
151                    }
152                    (None, WriteOperation::Insert(_)) => {
153                        entry.insert(key.clone(), WriteOperation::Remove);
154                    }
155                    (Some(old_value), WriteOperation::Remove) => {
156                        entry.insert(
157                            key.clone(),
158                            WriteOperation::Insert(old_value.into()),
159                        );
160                    }
161                    (Some(old_value), WriteOperation::Insert(new_value)) => {
162                        if *old_value != **new_value {
163                            entry.insert(
164                                key.clone(),
165                                WriteOperation::Insert(old_value.into()),
166                            );
167                        }
168                    }
169                }
170            }
171        }
172        Ok(reverse_changes)
173    }
174
175    pub fn latest_view(&self) -> RocksDb<Description> {
176        self.db.create_snapshot_generic()
177    }
178
179    /// Create a view at a specific height.
180    /// This function relies on the fact that all modifications are sequential
181    /// and monotonically grow the height without gaps.
182    pub fn create_view_at(
183        &self,
184        height: &Description::Height,
185    ) -> StorageResult<ViewAtHeight<Description>> {
186        // Each height stores reverse modification caused by the corresponding
187        // block at the same height. Applying reverse changes at height `X`
188        // gives us a state at height `X - 1`. If we want a state at height `X`,
189        // we need to apply all modifications up to `X + 1`.
190        let height_for_the_state = height.as_u64();
191        let rollback_height = height_for_the_state.saturating_add(1);
192        let has_v1_history = self.has_v1_history();
193        let tx = self.db.read_transaction();
194        let mut contains = multiversion_contains(&tx, rollback_height, has_v1_history)?;
195
196        // If we don't have any modifications on the height after us,
197        // maybe we are at the end of the history.
198        // We can check it by checking our height.
199        // If modifications exist, then we are the last element of the history
200        // that contains an actual state.
201        if !contains {
202            contains = multiversion_contains(&tx, height_for_the_state, has_v1_history)?;
203        }
204
205        if !contains {
206            return Err(DatabaseError::NoHistoryForRequestedHeight {
207                requested_height: height_for_the_state,
208            }
209            .into());
210        }
211        let latest_view = self.db.create_snapshot_generic::<Historical<Description>>();
212
213        Ok(ViewAtHeight::new(rollback_height, latest_view))
214    }
215
216    fn store_modifications_history<T>(
217        &self,
218        storage_transaction: &mut StorageTransaction<T>,
219        height: &Description::Height,
220    ) -> StorageResult<()>
221    where
222        T: KeyValueInspect<Column = Column<Description>>,
223    {
224        if self.state_rewind_policy == StateRewindPolicy::NoRewind {
225            return Ok(());
226        }
227        let height_u64 = height.as_u64();
228
229        let reverse_changes =
230            self.reverse_history_changes(storage_transaction.changes())?;
231
232        cleanup_old_changes(
233            self,
234            &height_u64,
235            storage_transaction,
236            &self.state_rewind_policy,
237        )?;
238
239        let old_changes = storage_transaction
240            .storage_as_mut::<ModificationsHistoryV2<Description>>()
241            .replace(&height_u64, &reverse_changes)?;
242
243        if let Some(old_changes) = old_changes {
244            tracing::warn!(
245                "Historical database committed twice the same height: {:?}",
246                height
247            );
248            remove_historical_modifications(
249                &height_u64,
250                storage_transaction,
251                &old_changes,
252            )?;
253        }
254
255        let historical_changes = reverse_changes
256            .into_iter()
257            .map(|(column, reverse_column_changes)| {
258                let historical_column_changes = reverse_column_changes
259                    .into_iter()
260                    .map(|(key, reverse_operation)| {
261                        let height_key = height_key(&key, &height_u64).into();
262                        // We want to store the operation that we want
263                        // to apply during rollback as a value.
264                        let operation =
265                            WriteOperation::Insert(serialize(&reverse_operation)?);
266                        Ok::<_, StorageError>((height_key, operation))
267                    })
268                    .try_collect()?;
269
270                let historical_duplicate_column = historical_duplicate_column_id(column);
271                Ok::<_, StorageError>((
272                    historical_duplicate_column,
273                    historical_column_changes,
274                ))
275            })
276            .try_collect()?;
277
278        // Combine removed old changes, all modifications for
279        // the current height and historical changes.
280        StorageTransaction::transaction(
281            storage_transaction,
282            ConflictPolicy::Overwrite,
283            historical_changes,
284        )
285        .commit()?;
286        Ok(())
287    }
288
289    fn remove_v1_entries(&self) -> StorageResult<()> {
290        if !self.has_v1_history() {
291            return Ok(())
292        }
293
294        self.db
295            .clear_table(ModificationsHistoryV1::<Description>::column())?;
296        self.has_v1_history
297            .store(false, core::sync::atomic::Ordering::Release);
298
299        Ok(())
300    }
301
302    #[cfg(test)]
303    fn multiversion_changes_heights(
304        &self,
305        direction: IterDirection,
306        has_v1_history: bool,
307    ) -> (Option<StorageResult<u64>>, Option<StorageResult<u64>>) {
308        let v2_changes = self
309            .db
310            .iter_all_keys::<ModificationsHistoryV2<Description>>(Some(direction))
311            .next();
312        let v1_changes = has_v1_history
313            .then(|| {
314                self.db
315                    .iter_all_keys::<ModificationsHistoryV1<Description>>(Some(direction))
316                    .next()
317            })
318            .flatten();
319
320        (v2_changes, v1_changes)
321    }
322
323    #[cfg(test)]
324    fn rollback_last_block(&self) -> StorageResult<u64> {
325        let has_v1_history = self.has_v1_history();
326
327        let (v2_latest_height, v1_latest_height) =
328            self.multiversion_changes_heights(IterDirection::Reverse, has_v1_history);
329
330        let latest_height = match (v2_latest_height, v1_latest_height) {
331            (None, None) => Err(DatabaseError::ReachedEndOfHistory)?,
332            (Some(Ok(v1)), Some(Ok(v2))) => v1.max(v2),
333            (_, Some(v1_res)) => v1_res?,
334            (Some(v2_res), _) => v2_res?,
335        };
336
337        self.rollback_block_to(latest_height)?;
338
339        Ok(latest_height)
340    }
341
342    fn rollback_block_to(&self, height_to_rollback: u64) -> StorageResult<()> {
343        let mut storage_transaction = self.db.read_transaction();
344
345        let last_changes = multiversion_take(
346            &mut storage_transaction,
347            height_to_rollback,
348            self.has_v1_history(),
349        )?
350        .ok_or(not_found!(ModificationsHistoryV2<Description>))?;
351
352        remove_historical_modifications(
353            &height_to_rollback,
354            &mut storage_transaction,
355            &last_changes,
356        )?;
357
358        StorageTransaction::transaction(
359            &mut storage_transaction,
360            ConflictPolicy::Overwrite,
361            last_changes,
362        )
363        .commit()?;
364
365        self.db
366            .commit_changes(&storage_transaction.into_changes().into())?;
367
368        Ok(())
369    }
370
371    fn has_v1_history(&self) -> bool {
372        use core::sync::atomic::Ordering;
373
374        self.has_v1_history.load(Ordering::Acquire)
375    }
376}
377
378fn multiversion_contains<Description, T>(
379    storage_transaction: &StorageTransaction<T>,
380    height: u64,
381    has_v1_history: bool,
382) -> StorageResult<bool>
383where
384    Description: DatabaseDescription,
385    T: KeyValueInspect<Column = Column<Description>>,
386{
387    let contains_v2 = storage_transaction
388        .storage_as_ref::<ModificationsHistoryV2<Description>>()
389        .contains_key(&height)?;
390
391    if !contains_v2 && has_v1_history {
392        let contains_v1 = storage_transaction
393            .storage_as_ref::<ModificationsHistoryV1<Description>>()
394            .contains_key(&height)?;
395        Ok(contains_v1)
396    } else {
397        Ok(contains_v2)
398    }
399}
400
401// Try to take the value from `ModificationsHistoryV2`, or return value from
402// `ModificationsHistoryV1`, if database still has v1 entries.
403fn multiversion_take<Description, T>(
404    storage_transaction: &mut StorageTransaction<T>,
405    height: u64,
406    has_v1_history: bool,
407) -> StorageResult<Option<Changes>>
408where
409    Description: DatabaseDescription,
410    T: KeyValueInspect<Column = Column<Description>>,
411{
412    let v2_last_changes = storage_transaction
413        .storage_as_mut::<ModificationsHistoryV2<Description>>()
414        .take(&height)?;
415
416    if v2_last_changes.is_none() && has_v1_history {
417        let v1_last_changes = storage_transaction
418            .storage_as_mut::<ModificationsHistoryV1<Description>>()
419            .take(&height)?;
420        Ok(v1_last_changes)
421    } else {
422        Ok(v2_last_changes)
423    }
424}
425
426fn cleanup_old_changes<Description, T>(
427    db: &HistoricalRocksDB<Description>,
428    height: &u64,
429    storage_transaction: &mut StorageTransaction<T>,
430    state_rewind_policy: &StateRewindPolicy,
431) -> StorageResult<()>
432where
433    Description: DatabaseDescription,
434    T: KeyValueInspect<Column = Column<Description>>,
435{
436    match state_rewind_policy {
437        StateRewindPolicy::NoRewind => {
438            // Do nothing since we do not store any history.
439        }
440        StateRewindPolicy::RewindFullRange => {
441            // Do nothing since we store all history.
442        }
443        StateRewindPolicy::RewindRange { size } => {
444            let old_height = height.saturating_sub(size.get());
445
446            let old_changes = storage_transaction
447                .storage_as_mut::<ModificationsHistoryV2<Description>>()
448                .take(&old_height)?;
449
450            if let Some(old_changes) = old_changes {
451                remove_historical_modifications(
452                    &old_height,
453                    storage_transaction,
454                    &old_changes,
455                )?;
456                // We found end of the V2 history, so we can remove V1 history entirely.
457                db.remove_v1_entries()?;
458            }
459        }
460    }
461    Ok(())
462}
463
464fn remove_historical_modifications<Description, T>(
465    old_height: &u64,
466    storage_transaction: &mut StorageTransaction<T>,
467    reverse_changes: &Changes,
468) -> StorageResult<()>
469where
470    Description: DatabaseDescription,
471    T: KeyValueInspect<Column = Column<Description>>,
472{
473    let changes = reverse_changes
474        .iter()
475        .map(|(column, column_changes)| {
476            let historical_column_changes = column_changes
477                .keys()
478                .map(|key| {
479                    let height_key = height_key(key, old_height).into();
480                    let operation = WriteOperation::Remove;
481                    (height_key, operation)
482                })
483                .collect();
484            let historical_duplicate_column = historical_duplicate_column_id(*column);
485            (historical_duplicate_column, historical_column_changes)
486        })
487        .collect();
488
489    StorageTransaction::transaction(
490        storage_transaction,
491        ConflictPolicy::Overwrite,
492        changes,
493    )
494    .commit()?;
495
496    Ok(())
497}
498
499impl<Description> KeyValueInspect for HistoricalRocksDB<Description>
500where
501    Description: DatabaseDescription,
502{
503    type Column = Description::Column;
504
505    fn exists(&self, key: &[u8], column: Self::Column) -> StorageResult<bool> {
506        self.db.exists(key, Column::OriginalColumn(column))
507    }
508
509    fn size_of_value(
510        &self,
511        key: &[u8],
512        column: Self::Column,
513    ) -> StorageResult<Option<usize>> {
514        self.db.size_of_value(key, Column::OriginalColumn(column))
515    }
516
517    fn get(&self, key: &[u8], column: Self::Column) -> StorageResult<Option<Value>> {
518        self.db.get(key, Column::OriginalColumn(column))
519    }
520
521    fn read(
522        &self,
523        key: &[u8],
524        column: Self::Column,
525        offset: usize,
526        buf: &mut [u8],
527    ) -> StorageResult<bool> {
528        self.db
529            .read(key, Column::OriginalColumn(column), offset, buf)
530    }
531}
532
533impl<Description> IterableStore for HistoricalRocksDB<Description>
534where
535    Description: DatabaseDescription,
536{
537    fn iter_store(
538        &self,
539        column: Self::Column,
540        prefix: Option<&[u8]>,
541        start: Option<&[u8]>,
542        direction: IterDirection,
543    ) -> BoxedIter<'_, KVItem> {
544        self.db
545            .iter_store(Column::OriginalColumn(column), prefix, start, direction)
546    }
547
548    fn iter_store_keys(
549        &self,
550        column: Self::Column,
551        prefix: Option<&[u8]>,
552        start: Option<&[u8]>,
553        direction: IterDirection,
554    ) -> BoxedIter<'_, fuel_core_storage::kv_store::KeyItem> {
555        self.db
556            .iter_store_keys(Column::OriginalColumn(column), prefix, start, direction)
557    }
558}
559
560impl<Description> TransactableStorage<Description::Height>
561    for HistoricalRocksDB<Description>
562where
563    Description: DatabaseDescription,
564{
565    fn commit_changes(
566        &self,
567        height: Option<Description::Height>,
568        mut changes: StorageChanges,
569    ) -> StorageResult<()> {
570        // When the history need to be process we need to have all the changes in one
571        // transaction to be able to write their reverse changes.
572        if let Some(height) = height
573            && self.state_rewind_policy != StateRewindPolicy::NoRewind
574        {
575            let all_changes = match changes {
576                StorageChanges::Changes(changes) => changes,
577                StorageChanges::ChangesList(list) => list.into_iter().flatten().collect(),
578            };
579            let mut storage_transaction = StorageTransaction::transaction(
580                &self.db,
581                ConflictPolicy::Overwrite,
582                all_changes,
583            );
584            self.store_modifications_history(&mut storage_transaction, &height)?;
585            changes = StorageChanges::Changes(storage_transaction.into_changes());
586        }
587
588        self.db.commit_changes(&changes)?;
589
590        Ok(())
591    }
592
593    fn view_at_height(
594        &self,
595        height: &Description::Height,
596    ) -> StorageResult<KeyValueView<ColumnType<Description>, HeightType<Description>>>
597    {
598        let view = self.create_view_at(height)?;
599        Ok(KeyValueView::from_storage_and_metadata(
600            KeyValueViewWrapper::new(view),
601            Some(*height),
602        ))
603    }
604
605    fn latest_view(
606        &self,
607    ) -> StorageResult<
608        IterableKeyValueView<ColumnType<Description>, HeightType<Description>>,
609    > {
610        let view = self.latest_view();
611        Ok(IterableKeyValueView::from_storage_and_metadata(
612            IterableKeyValueViewWrapper::new(view),
613            None,
614        ))
615    }
616
617    fn rollback_block_to(&self, height: &Description::Height) -> StorageResult<()> {
618        self.rollback_block_to(height.as_u64())
619    }
620
621    fn shutdown(&self) {
622        self.db.shutdown()
623    }
624}
625
626pub fn height_key(key: &[u8], height: &u64) -> Vec<u8> {
627    let mut bytes = Vec::with_capacity(key.len().saturating_add(8));
628    let height_bytes = height.to_be_bytes();
629    bytes.extend_from_slice(key);
630    bytes.extend_from_slice(&height_bytes);
631    bytes
632}
633
634pub fn serialize<T>(t: &T) -> StorageResult<Value>
635where
636    T: Serialize + ?Sized,
637{
638    Ok(postcard::to_allocvec(&t)
639        .map_err(|err| StorageError::Codec(err.into()))?
640        .into())
641}
642
643pub fn deserialize<'a, T>(bytes: &'a [u8]) -> StorageResult<T>
644where
645    T: Deserialize<'a>,
646{
647    postcard::from_bytes(bytes).map_err(|err| StorageError::Codec(err.into()))
648}
649
650#[cfg(test)]
651#[allow(non_snake_case)]
652#[allow(clippy::cast_possible_truncation)]
653mod tests {
654    use super::*;
655    use crate::database::database_description::on_chain::OnChain;
656    use fuel_core_storage::{
657        ContractsAssetKey,
658        StorageAsMut,
659        StorageAsRef,
660        tables::ContractsAssets,
661        transactional::{
662            IntoTransaction,
663            ReadTransaction,
664        },
665    };
666
667    #[test]
668    fn test_height_key() {
669        let key = b"key";
670        let height = 42;
671        let expected = b"key\x00\x00\x00\x00\x00\x00\x00\x2a";
672        assert_eq!(height_key(key, &height), expected);
673    }
674
675    fn key() -> ContractsAssetKey {
676        ContractsAssetKey::new(&[123; 32].into(), &[213; 32].into())
677    }
678
679    #[test]
680    fn historical_rocksdb_read_original_database_works() {
681        // Given
682        let rocks_db = RocksDb::<Historical<OnChain>>::default_open_temp().unwrap();
683        let historical_rocks_db =
684            HistoricalRocksDB::new(rocks_db, StateRewindPolicy::RewindFullRange).unwrap();
685
686        // Set the value at height 1 to be 123.
687        let mut transaction = historical_rocks_db.read_transaction();
688        transaction
689            .storage_as_mut::<ContractsAssets>()
690            .insert(&key(), &123)
691            .unwrap();
692        historical_rocks_db
693            .commit_changes(Some(1u32.into()), transaction.into_changes().into())
694            .unwrap();
695
696        // Set the value at height 2 to be 321.
697        let mut transaction = historical_rocks_db.read_transaction();
698        transaction
699            .storage_as_mut::<ContractsAssets>()
700            .insert(&key(), &321)
701            .unwrap();
702        historical_rocks_db
703            .commit_changes(Some(2u32.into()), transaction.into_changes().into())
704            .unwrap();
705
706        // When
707        let read_view = historical_rocks_db.read_transaction();
708        let latest_balance = read_view
709            .storage_as_ref::<ContractsAssets>()
710            .get(&key())
711            .unwrap()
712            .unwrap()
713            .into_owned();
714
715        // Then
716        assert_eq!(latest_balance, 321);
717    }
718
719    #[test]
720    fn historical_rocksdb_read_latest_view_works() {
721        // Given
722        let rocks_db = RocksDb::<Historical<OnChain>>::default_open_temp().unwrap();
723        let historical_rocks_db =
724            HistoricalRocksDB::new(rocks_db, StateRewindPolicy::RewindFullRange).unwrap();
725
726        // Set the value at height 1 to be 123.
727        let mut transaction = historical_rocks_db.read_transaction();
728        transaction
729            .storage_as_mut::<ContractsAssets>()
730            .insert(&key(), &123)
731            .unwrap();
732        historical_rocks_db
733            .commit_changes(Some(1u32.into()), transaction.into_changes().into())
734            .unwrap();
735
736        // Set the value at height 2 to be 321.
737        let mut transaction = historical_rocks_db.read_transaction();
738        transaction
739            .storage_as_mut::<ContractsAssets>()
740            .insert(&key(), &321)
741            .unwrap();
742        historical_rocks_db
743            .commit_changes(Some(2u32.into()), transaction.into_changes().into())
744            .unwrap();
745
746        // When
747        let latest_view = historical_rocks_db.latest_view().into_transaction();
748        let latest_balance = latest_view
749            .storage_as_ref::<ContractsAssets>()
750            .get(&key())
751            .unwrap()
752            .unwrap()
753            .into_owned();
754
755        // Then
756        assert_eq!(latest_balance, 321);
757    }
758
759    #[test]
760    fn state_rewind_policy__no_rewind__create_view_at__fails() {
761        // Given
762        let rocks_db = RocksDb::<Historical<OnChain>>::default_open_temp().unwrap();
763        let historical_rocks_db =
764            HistoricalRocksDB::new(rocks_db, StateRewindPolicy::NoRewind).unwrap();
765
766        let mut transaction = historical_rocks_db.read_transaction();
767        transaction
768            .storage_as_mut::<ContractsAssets>()
769            .insert(&key(), &123)
770            .unwrap();
771        historical_rocks_db
772            .commit_changes(Some(1u32.into()), transaction.into_changes().into())
773            .unwrap();
774
775        // When
776        let view_at_height_1 =
777            historical_rocks_db.create_view_at(&1u32.into()).map(|_| ());
778
779        // Then
780        assert_eq!(
781            view_at_height_1,
782            Err(DatabaseError::NoHistoryForRequestedHeight {
783                requested_height: 1,
784            }
785            .into())
786        );
787    }
788
789    #[test]
790    fn state_rewind_policy__no_rewind__rollback__fails() {
791        // Given
792        let rocks_db = RocksDb::<Historical<OnChain>>::default_open_temp().unwrap();
793        let historical_rocks_db =
794            HistoricalRocksDB::new(rocks_db, StateRewindPolicy::NoRewind).unwrap();
795
796        let mut transaction = historical_rocks_db.read_transaction();
797        transaction
798            .storage_as_mut::<ContractsAssets>()
799            .insert(&key(), &123)
800            .unwrap();
801        historical_rocks_db
802            .commit_changes(Some(1u32.into()), transaction.into_changes().into())
803            .unwrap();
804
805        // When
806        let result = historical_rocks_db.rollback_last_block();
807
808        // Then
809        assert_eq!(result, Err(DatabaseError::ReachedEndOfHistory.into()));
810    }
811
812    #[test]
813    fn state_rewind_policy__rewind_range_1__cleanup_in_range_works() {
814        // Given
815        let rocks_db = RocksDb::<Historical<OnChain>>::default_open_temp().unwrap();
816        let historical_rocks_db = HistoricalRocksDB::new(
817            rocks_db,
818            StateRewindPolicy::RewindRange {
819                size: NonZeroU64::new(1).unwrap(),
820            },
821        )
822        .unwrap();
823
824        let mut transaction = historical_rocks_db.read_transaction();
825        transaction
826            .storage_as_mut::<ContractsAssets>()
827            .insert(&key(), &123)
828            .unwrap();
829        historical_rocks_db
830            .commit_changes(Some(1u32.into()), transaction.into_changes().into())
831            .unwrap();
832
833        // When
834        let mut transaction = historical_rocks_db.read_transaction();
835        transaction
836            .storage_as_mut::<ContractsAssets>()
837            .insert(&key(), &321)
838            .unwrap();
839        historical_rocks_db
840            .commit_changes(Some(2u32.into()), transaction.into_changes().into())
841            .unwrap();
842
843        // Then
844        let view_at_height_1 =
845            historical_rocks_db.create_view_at(&1u32.into()).map(|_| ());
846        let view_at_height_0 =
847            historical_rocks_db.create_view_at(&0u32.into()).map(|_| ());
848        assert_eq!(view_at_height_1, Ok(()));
849        assert_eq!(
850            view_at_height_0,
851            Err(DatabaseError::NoHistoryForRequestedHeight {
852                requested_height: 0,
853            }
854            .into())
855        );
856    }
857
858    #[test]
859    fn state_rewind_policy__rewind_range_1__rollback_works() {
860        // Given
861        let rocks_db = RocksDb::<Historical<OnChain>>::default_open_temp().unwrap();
862        let historical_rocks_db = HistoricalRocksDB::new(
863            rocks_db,
864            StateRewindPolicy::RewindRange {
865                size: NonZeroU64::new(1).unwrap(),
866            },
867        )
868        .unwrap();
869
870        let mut transaction = historical_rocks_db.read_transaction();
871        transaction
872            .storage_as_mut::<ContractsAssets>()
873            .insert(&key(), &123)
874            .unwrap();
875        historical_rocks_db
876            .commit_changes(Some(1u32.into()), transaction.into_changes().into())
877            .unwrap();
878        let entries = historical_rocks_db
879            .db
880            .iter_all::<ModificationsHistoryV2<OnChain>>(None)
881            .collect::<Vec<_>>();
882        assert_eq!(entries.len(), 1);
883
884        // When
885        let result = historical_rocks_db.rollback_last_block();
886
887        // Then
888        assert_eq!(result, Ok(1));
889        let entries = historical_rocks_db
890            .db
891            .iter_all::<ModificationsHistoryV2<OnChain>>(None)
892            .collect::<Vec<_>>();
893        assert_eq!(entries.len(), 0);
894    }
895
896    #[test]
897    fn state_rewind_policy__rewind_range_1__rollback_uses_v2() {
898        // Given
899        let rocks_db = RocksDb::<Historical<OnChain>>::default_open_temp().unwrap();
900        let historical_rocks_db = HistoricalRocksDB::new(
901            rocks_db,
902            StateRewindPolicy::RewindRange {
903                size: NonZeroU64::new(1).unwrap(),
904            },
905        )
906        .unwrap();
907
908        // When
909        let mut transaction = historical_rocks_db.read_transaction();
910        transaction
911            .storage_as_mut::<ContractsAssets>()
912            .insert(&key(), &123)
913            .unwrap();
914        historical_rocks_db
915            .commit_changes(Some(1u32.into()), transaction.into_changes().into())
916            .unwrap();
917        let v2_entries = historical_rocks_db
918            .db
919            .iter_all::<ModificationsHistoryV2<OnChain>>(None)
920            .collect::<Vec<_>>();
921        let v1_entries = historical_rocks_db
922            .db
923            .iter_all::<ModificationsHistoryV1<OnChain>>(None)
924            .collect::<Vec<_>>();
925
926        // Then
927        assert_eq!(v2_entries.len(), 1);
928        assert_eq!(v1_entries.len(), 0);
929    }
930
931    #[test]
932    fn state_rewind_policy__rewind_range_1__rollback_during_migration_works() {
933        // Given
934        let temp_dir = tempfile::tempdir().unwrap();
935        let rocks_db = RocksDb::<Historical<OnChain>>::default_open(
936            &temp_dir,
937            DatabaseConfig::config_for_tests(),
938        )
939        .unwrap();
940        let historical_rocks_db = HistoricalRocksDB::new(
941            rocks_db,
942            StateRewindPolicy::RewindRange {
943                size: NonZeroU64::new(1).unwrap(),
944            },
945        )
946        .unwrap();
947
948        // When
949        let mut transaction = historical_rocks_db.read_transaction();
950        transaction
951            .storage_as_mut::<ContractsAssets>()
952            .insert(&key(), &123)
953            .unwrap();
954        historical_rocks_db
955            .commit_changes(Some(1u32.into()), transaction.into_changes().into())
956            .unwrap();
957
958        // Migrate the changes from V2 to V1.
959
960        let mut migration_transaction = StorageTransaction::transaction(
961            &historical_rocks_db.db,
962            ConflictPolicy::Overwrite,
963            Changes::default(),
964        );
965
966        let v2_changes = migration_transaction
967            .storage_as_mut::<ModificationsHistoryV2<OnChain>>()
968            .take(&1u64)
969            .unwrap()
970            .unwrap();
971        migration_transaction
972            .storage_as_mut::<ModificationsHistoryV1<OnChain>>()
973            .insert(&1u64, &v2_changes)
974            .unwrap();
975
976        historical_rocks_db
977            .db
978            .commit_changes(&migration_transaction.into_changes().into())
979            .unwrap();
980
981        // Check that the history has indeed been written to V1
982        let v2_entries = historical_rocks_db
983            .db
984            .iter_all::<ModificationsHistoryV2<OnChain>>(None)
985            .collect::<Vec<_>>();
986        let v1_entries = historical_rocks_db
987            .db
988            .iter_all::<ModificationsHistoryV1<OnChain>>(None)
989            .collect::<Vec<_>>();
990
991        assert_eq!(v2_entries.len(), 0);
992        assert_eq!(v1_entries.len(), 1);
993
994        drop(historical_rocks_db);
995
996        // Open the database again with fetched V1 entries status.
997        let rocks_db = RocksDb::<Historical<OnChain>>::default_open(
998            &temp_dir,
999            DatabaseConfig::config_for_tests(),
1000        )
1001        .unwrap();
1002        let historical_rocks_db = HistoricalRocksDB::new(
1003            rocks_db,
1004            StateRewindPolicy::RewindRange {
1005                size: NonZeroU64::new(1).unwrap(),
1006            },
1007        )
1008        .unwrap();
1009        let result = historical_rocks_db.rollback_last_block();
1010
1011        // Then
1012        assert_eq!(result, Ok(1));
1013        let v2_entries = historical_rocks_db
1014            .db
1015            .iter_all::<ModificationsHistoryV2<OnChain>>(None)
1016            .collect::<Vec<_>>();
1017        let v1_entries = historical_rocks_db
1018            .db
1019            .iter_all::<ModificationsHistoryV1<OnChain>>(None)
1020            .collect::<Vec<_>>();
1021        assert_eq!(v2_entries.len(), 0);
1022        assert_eq!(v1_entries.len(), 0);
1023    }
1024
1025    #[test]
1026    fn state_rewind_policy__rewind_range_1__migration_removes_v1() {
1027        let temp_dir = tempfile::tempdir().unwrap();
1028        let rocks_db = RocksDb::<Historical<OnChain>>::default_open(
1029            &temp_dir,
1030            DatabaseConfig::config_for_tests(),
1031        )
1032        .unwrap();
1033        let historical_rocks_db =
1034            HistoricalRocksDB::new(rocks_db, StateRewindPolicy::RewindFullRange).unwrap();
1035
1036        const BLOCKS: u8 = 10;
1037
1038        // Given
1039        // Create 10 blocks with some changes.
1040        // And migrate all changes to V1 history.
1041        for i in 1..=BLOCKS {
1042            let height = i as u32;
1043            let height_64 = i as u64;
1044            let mut transaction = historical_rocks_db.read_transaction();
1045            let key = ContractsAssetKey::new(&[i; 32].into(), &[213; 32].into());
1046            transaction
1047                .storage_as_mut::<ContractsAssets>()
1048                .insert(&key, &123)
1049                .unwrap();
1050            historical_rocks_db
1051                .commit_changes(Some(height.into()), transaction.into_changes().into())
1052                .unwrap();
1053
1054            // Migrate the changes from V2 to V1.
1055            let mut migration_transaction = StorageTransaction::transaction(
1056                &historical_rocks_db.db,
1057                ConflictPolicy::Overwrite,
1058                Changes::default(),
1059            );
1060
1061            let v2_changes = migration_transaction
1062                .storage_as_mut::<ModificationsHistoryV2<OnChain>>()
1063                .take(&height_64)
1064                .unwrap()
1065                .unwrap();
1066            migration_transaction
1067                .storage_as_mut::<ModificationsHistoryV1<OnChain>>()
1068                .insert(&height_64, &v2_changes)
1069                .unwrap();
1070
1071            historical_rocks_db
1072                .db
1073                .commit_changes(&migration_transaction.into_changes().into())
1074                .unwrap();
1075
1076            // Check that the history has indeed been written to V1
1077            let v2_entries = historical_rocks_db
1078                .db
1079                .iter_all::<ModificationsHistoryV2<OnChain>>(None)
1080                .collect::<Vec<_>>();
1081            let v1_entries = historical_rocks_db
1082                .db
1083                .iter_all::<ModificationsHistoryV1<OnChain>>(None)
1084                .collect::<Vec<_>>();
1085
1086            assert_eq!(v2_entries.len(), 0);
1087            assert_eq!(v1_entries.len(), i as usize);
1088        }
1089
1090        drop(historical_rocks_db);
1091
1092        // When
1093        // Open the database again, but with the rewind range of 1.
1094        // Committing 2 new blocks, should add new entries to V2 history.
1095        // And because of the rewind range of 1, the V1 history should be removed.
1096        let rocks_db = RocksDb::<Historical<OnChain>>::default_open(
1097            &temp_dir,
1098            DatabaseConfig::config_for_tests(),
1099        )
1100        .unwrap();
1101        let historical_rocks_db = HistoricalRocksDB::new(
1102            rocks_db,
1103            StateRewindPolicy::RewindRange {
1104                size: NonZeroU64::new(1).unwrap(),
1105            },
1106        )
1107        .unwrap();
1108        historical_rocks_db
1109            .commit_changes(Some((BLOCKS as u32 + 1).into()), StorageChanges::default())
1110            .unwrap();
1111        historical_rocks_db
1112            .commit_changes(Some((BLOCKS as u32 + 2).into()), StorageChanges::default())
1113            .unwrap();
1114
1115        // Then
1116        let v2_entries = historical_rocks_db
1117            .db
1118            .iter_all::<ModificationsHistoryV2<OnChain>>(None)
1119            .collect::<Vec<_>>();
1120        let v1_entries = historical_rocks_db
1121            .db
1122            .iter_all::<ModificationsHistoryV1<OnChain>>(None)
1123            .collect::<Vec<_>>();
1124        assert_eq!(v2_entries.len(), 1);
1125        assert_eq!(v1_entries.len(), 0);
1126    }
1127
1128    #[test]
1129    fn rollback_last_block_works_with_v2() {
1130        // Given
1131        let rocks_db = RocksDb::<Historical<OnChain>>::default_open_temp().unwrap();
1132
1133        let historical_rocks_db =
1134            HistoricalRocksDB::new(rocks_db, StateRewindPolicy::RewindFullRange).unwrap();
1135
1136        // When
1137        // Commit 1000 blocks
1138        for i in 1..=1000u32 {
1139            let mut transaction = historical_rocks_db.read_transaction();
1140            transaction
1141                .storage_as_mut::<ContractsAssets>()
1142                .insert(&key(), &(123 + i as u64))
1143                .unwrap();
1144            historical_rocks_db
1145                .commit_changes(Some(i.into()), transaction.into_changes().into())
1146                .unwrap();
1147        }
1148        // We can now rollback the last block 1000 times.
1149        let results: Vec<Result<u64, _>> = (0..1000u32)
1150            .map(|_| historical_rocks_db.rollback_last_block())
1151            .collect();
1152
1153        // Then
1154        // If the rollback fails at some point, then we have unintentionally rollbacked to
1155        // a block that was not the last.
1156        for (i, result) in results.iter().enumerate() {
1157            assert_eq!(result, &Ok(1000 - i as u64));
1158        }
1159    }
1160
1161    #[test]
1162    fn state_rewind_policy__rewind_range_1__second_rollback_fails() {
1163        // Given
1164        let rocks_db = RocksDb::<Historical<OnChain>>::default_open_temp().unwrap();
1165        let historical_rocks_db = HistoricalRocksDB::new(
1166            rocks_db,
1167            StateRewindPolicy::RewindRange {
1168                size: NonZeroU64::new(1).unwrap(),
1169            },
1170        )
1171        .unwrap();
1172
1173        let mut transaction = historical_rocks_db.read_transaction();
1174        transaction
1175            .storage_as_mut::<ContractsAssets>()
1176            .insert(&key(), &123)
1177            .unwrap();
1178        historical_rocks_db
1179            .commit_changes(Some(1u32.into()), transaction.into_changes().into())
1180            .unwrap();
1181        historical_rocks_db.rollback_last_block().unwrap();
1182
1183        // When
1184        let result = historical_rocks_db.rollback_last_block();
1185
1186        // Then
1187        assert_eq!(result, Err(DatabaseError::ReachedEndOfHistory.into()));
1188    }
1189
1190    #[test]
1191    fn state_rewind_policy__rewind_range_10__rollbacks_work() {
1192        const ITERATIONS: usize = 100;
1193
1194        let rocks_db = RocksDb::<Historical<OnChain>>::default_open_temp().unwrap();
1195        let historical_rocks_db = HistoricalRocksDB::new(
1196            rocks_db,
1197            StateRewindPolicy::RewindRange {
1198                size: NonZeroU64::new(ITERATIONS as u64).unwrap(),
1199            },
1200        )
1201        .unwrap();
1202
1203        fn key(height: u32) -> ContractsAssetKey {
1204            ContractsAssetKey::new(&[height as u8; 32].into(), &[213; 32].into())
1205        }
1206
1207        for height in 1..=ITERATIONS {
1208            let height = height as u32;
1209            let key = key(height);
1210
1211            let mut transaction = historical_rocks_db.read_transaction();
1212            transaction
1213                .storage_as_mut::<ContractsAssets>()
1214                .insert(&key, &123)
1215                .unwrap();
1216            historical_rocks_db
1217                .commit_changes(Some(height.into()), transaction.into_changes().into())
1218                .unwrap();
1219        }
1220
1221        for height in (1..=ITERATIONS).rev() {
1222            // Given
1223            let entries = historical_rocks_db
1224                .db
1225                .iter_all::<ModificationsHistoryV2<OnChain>>(None)
1226                .collect::<Vec<_>>();
1227            assert_eq!(entries.len(), height);
1228
1229            // When
1230            let result = historical_rocks_db.rollback_last_block();
1231
1232            // Then
1233            assert_eq!(result, Ok(height as u64));
1234            let entries = historical_rocks_db
1235                .db
1236                .iter_all::<ModificationsHistoryV2<OnChain>>(None)
1237                .collect::<Vec<_>>();
1238            assert_eq!(entries.len(), height - 1);
1239        }
1240    }
1241}