// linera_storage/migration.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use linera_base::{
5    crypto::CryptoHash,
6    identifiers::{BlobId, ChainId, EventId},
7};
8use linera_views::{
9    batch::Batch,
10    store::{KeyValueDatabase, KeyValueStore, ReadableKeyValueStore, WritableKeyValueStore},
11    ViewError,
12};
13use serde::{Deserialize, Serialize};
14use tokio::time::Duration;
15
16use crate::{
17    db_storage::{
18        to_event_key, DbStorage, MultiPartitionBatch, RootKey, BLOB_KEY, BLOB_STATE_KEY, BLOCK_KEY,
19        LITE_CERTIFICATE_KEY, NETWORK_DESCRIPTION_KEY,
20    },
21    Clock,
22};
23
/// The schema version detected in the underlying database.
///
/// Detection is performed by [`DbStorage::get_storage_state`] based on where the
/// network description is stored.
#[derive(Debug)]
enum SchemaVersion {
    /// No schema version detected.
    Uninitialized,
    /// Version 0. All the blobs, certificates, confirmed blocks, events and network
    /// description are on the same partition.
    Version0,
    /// Version 1. New partitions are assigned by chain ID, crypto hash, and blob ID.
    Version1,
}
34
/// How long we should wait (in minutes) before retrying when we detect another migration
/// in progress.
const MIGRATION_WAIT_BEFORE_RETRY_MIN: u64 = 3;

/// Placeholder key used when an entry does not move: only its root key is relevant.
const UNUSED_EMPTY_KEY: &[u8] = &[];
// We choose the ordering of the variants in `BaseKey` and `RootKey` so that
// the root keys corresponding to `ChainState` and `BlockExporter` remain the
// same in their serialization.
// This implies that data on those root keys do not need to be moved.
// For other tag variants, that are on the shared partition, we need to move
// the data into their own partitions.
// The bytes below are the BCS variant tags of the `BaseKey` variants to move:
// Certificate (1), ConfirmedBlock (2), Blob (3), BlobState (4), Event (5) and
// NetworkDescription (7). ChainState (0) and BlockExporterState (6) stay in place.
const MOVABLE_KEYS_0_1: &[u8] = &[1, 2, 3, 4, 5, 7];

/// The total number of keys being migrated in a block.
/// We use chunks to avoid OOM.
const BLOCK_KEY_SIZE: usize = 90;
51
/// Key layout of schema version 0, where all entries share one partition.
///
/// NOTE: the declaration order fixes each variant's BCS tag (0..=7), and those tags
/// are persisted in existing databases and listed in `MOVABLE_KEYS_0_1` — do not
/// reorder the variants.
#[derive(Debug, Serialize, Deserialize)]
enum BaseKey {
    ChainState(ChainId),
    Certificate(CryptoHash),
    ConfirmedBlock(CryptoHash),
    Blob(BlobId),
    BlobState(BlobId),
    Event(EventId),
    BlockExporterState(u32),
    NetworkDescription,
}
63
64// We map a serialized `BaseKey` in the shared partition to a serialized `RootKey`
65// and key in the new schema. For `ChainState` and `BlockExporterState`, there is
66// no need to move so we use `UNUSED_EMPTY_KEY`.
67fn map_base_key(base_key: &[u8]) -> Result<(Vec<u8>, Vec<u8>), ViewError> {
68    let base_key = bcs::from_bytes::<BaseKey>(base_key)?;
69    match base_key {
70        BaseKey::ChainState(chain_id) => {
71            let root_key = RootKey::ChainState(chain_id).bytes();
72            Ok((root_key, UNUSED_EMPTY_KEY.to_vec()))
73        }
74        BaseKey::Certificate(hash) => {
75            let root_key = RootKey::ConfirmedBlock(hash).bytes();
76            Ok((root_key, LITE_CERTIFICATE_KEY.to_vec()))
77        }
78        BaseKey::ConfirmedBlock(hash) => {
79            let root_key = RootKey::ConfirmedBlock(hash).bytes();
80            Ok((root_key, BLOCK_KEY.to_vec()))
81        }
82        BaseKey::Blob(blob_id) => {
83            let root_key = RootKey::Blob(blob_id).bytes();
84            Ok((root_key, BLOB_KEY.to_vec()))
85        }
86        BaseKey::BlobState(blob_id) => {
87            let root_key = RootKey::Blob(blob_id).bytes();
88            Ok((root_key, BLOB_STATE_KEY.to_vec()))
89        }
90        BaseKey::Event(event_id) => {
91            let root_key = RootKey::Event(event_id.chain_id).bytes();
92            let key = to_event_key(&event_id);
93            Ok((root_key, key))
94        }
95        BaseKey::BlockExporterState(index) => {
96            let root_key = RootKey::BlockExporterState(index).bytes();
97            Ok((root_key, UNUSED_EMPTY_KEY.to_vec()))
98        }
99        BaseKey::NetworkDescription => {
100            let root_key = RootKey::NetworkDescription.bytes();
101            Ok((root_key, NETWORK_DESCRIPTION_KEY.to_vec()))
102        }
103    }
104}
105
106impl<Database, C> DbStorage<Database, C>
107where
108    Database: KeyValueDatabase + Clone + Send + Sync + 'static,
109    Database::Store: KeyValueStore + Clone + Send + Sync + 'static,
110    C: Clock + Clone + Send + Sync + 'static,
111    Database::Error: From<bcs::Error> + Send + Sync,
112{
113    async fn migrate_shared_partition(
114        &self,
115        first_byte: &u8,
116        keys: Vec<Vec<u8>>,
117    ) -> Result<(), ViewError> {
118        tracing::info!(
119            "Migrating {} keys of shared DB partition starting with {first_byte}",
120            keys.len()
121        );
122        for (index, chunk_keys) in keys.chunks(BLOCK_KEY_SIZE).enumerate() {
123            tracing::info!("Processing chunk {index} of size {}", chunk_keys.len());
124            let chunk_base_keys = chunk_keys
125                .iter()
126                .map(|key| {
127                    let mut base_key = vec![*first_byte];
128                    base_key.extend(key);
129                    base_key
130                })
131                .collect::<Vec<Vec<u8>>>();
132            let store = self.database.open_shared(&[])?;
133            let values = store.read_multi_values_bytes(&chunk_base_keys).await?;
134            let mut batch = MultiPartitionBatch::new();
135            for (base_key, value) in chunk_base_keys.iter().zip(values) {
136                let value = value.ok_or_else(|| ViewError::MissingEntries("migration".into()))?;
137                let (root_key, key) = map_base_key(base_key)?;
138                batch.put_key_value(root_key, key, value);
139            }
140            self.write_batch(batch).await?;
141            // Now delete the keys
142            let mut batch = Batch::new();
143            for key in chunk_base_keys {
144                batch.delete_key(key.to_vec());
145            }
146            store.write_batch(batch).await?;
147        }
148        Ok(())
149    }
150
151    async fn migrate_v0_to_v1(&self) -> Result<(), ViewError> {
152        for first_byte in MOVABLE_KEYS_0_1 {
153            let store = self.database.open_shared(&[])?;
154            let keys = store.find_keys_by_prefix(&[*first_byte]).await?;
155            self.migrate_shared_partition(first_byte, keys).await?;
156        }
157        Ok(())
158    }
159
160    pub async fn migrate_if_needed(&self) -> Result<(), ViewError> {
161        loop {
162            if matches!(
163                self.get_storage_state().await?,
164                SchemaVersion::Uninitialized | SchemaVersion::Version1
165            ) {
166                // Nothing to do.
167                return Ok(());
168            }
169            let result = self.migrate_v0_to_v1().await;
170            if let Err(ViewError::MissingEntries(_)) = result {
171                tracing::warn!(
172                    "It looks like a migration is already in progress on this database. \
173                     I will wait for {:?} minutes and retry.",
174                    MIGRATION_WAIT_BEFORE_RETRY_MIN
175                );
176                // Duration::from_mins is not yet stable for tokio 1.36.
177                tokio::time::sleep(Duration::from_secs(MIGRATION_WAIT_BEFORE_RETRY_MIN * 60)).await;
178                continue;
179            }
180            return result;
181        }
182    }
183
184    async fn get_storage_state(&self) -> Result<SchemaVersion, ViewError> {
185        let store = self.database.open_shared(&[])?;
186        let key = bcs::to_bytes(&BaseKey::NetworkDescription).unwrap();
187        if store.contains_key(&key).await? {
188            return Ok(SchemaVersion::Version0);
189        }
190
191        let root_key = RootKey::NetworkDescription.bytes();
192        let store = self.database.open_shared(&root_key)?;
193        if store.contains_key(NETWORK_DESCRIPTION_KEY).await? {
194            return Ok(SchemaVersion::Version1);
195        }
196
197        Ok(SchemaVersion::Uninitialized)
198    }
199
200    /// Assert that the storage is at the last version (or not yet initialized).
201    pub async fn assert_is_migrated_storage(&self) -> Result<(), ViewError> {
202        let state = self.get_storage_state().await?;
203        assert!(matches!(
204            state,
205            SchemaVersion::Uninitialized | SchemaVersion::Version1
206        ));
207        Ok(())
208    }
209}
210
#[cfg(test)]
mod tests {
    use std::{
        collections::{BTreeMap, HashMap},
        marker::PhantomData,
        ops::Deref,
    };

    use linera_base::{
        crypto::CryptoHash,
        identifiers::{BlobId, BlobType, ChainId, EventId, StreamId, StreamName},
    };
    #[cfg(feature = "rocksdb")]
    use linera_views::rocks_db::RocksDbDatabase;
    #[cfg(feature = "scylladb")]
    use linera_views::scylla_db::ScyllaDbDatabase;
    use linera_views::{
        batch::Batch,
        memory::MemoryDatabase,
        random::make_deterministic_rng,
        store::{
            KeyValueDatabase, KeyValueStore, ReadableKeyValueStore, TestKeyValueDatabase,
            WritableKeyValueStore,
        },
        ViewError,
    };
    use rand::{distributions, Rng};
    use test_case::test_case;

    use crate::{
        db_storage::RestrictedEventId,
        migration::{
            BaseKey, RootKey, BLOB_KEY, BLOB_STATE_KEY, BLOCK_KEY, LITE_CERTIFICATE_KEY,
            NETWORK_DESCRIPTION_KEY,
        },
        DbStorage, WallClock,
    };

    /// In-memory image of every category of data the migration handles, one field
    /// per `BaseKey` variant. Used both to populate a version-0 database and to
    /// compare against what is read back after migrating to version 1.
    #[derive(Clone, Debug, Eq, PartialEq)]
    #[allow(clippy::type_complexity)]
    struct StorageState {
        chain_ids_key_values: BTreeMap<ChainId, Vec<(Vec<u8>, Vec<u8>)>>,
        certificates: BTreeMap<CryptoHash, Vec<u8>>,
        confirmed_blocks: BTreeMap<CryptoHash, Vec<u8>>,
        blobs: BTreeMap<BlobId, Vec<u8>>,
        blob_states: BTreeMap<BlobId, Vec<u8>>,
        events: HashMap<EventId, Vec<u8>>,
        block_exporter_states: BTreeMap<u32, Vec<(Vec<u8>, Vec<u8>)>>,
        network_description: Option<Vec<u8>>,
    }

    impl StorageState {
        /// Merges `storage_state` into `self`. Panics if both states carry a
        /// network description, since the database holds at most one.
        fn append_storage_state(&mut self, storage_state: StorageState) {
            self.chain_ids_key_values
                .extend(storage_state.chain_ids_key_values);
            self.certificates.extend(storage_state.certificates);
            self.confirmed_blocks.extend(storage_state.confirmed_blocks);
            self.blobs.extend(storage_state.blobs);
            self.blob_states.extend(storage_state.blob_states);
            self.events.extend(storage_state.events);
            self.block_exporter_states
                .extend(storage_state.block_exporter_states);
            if let Some(value) = storage_state.network_description {
                assert!(self.network_description.is_none());
                self.network_description = Some(value);
            }
        }
    }

    /// Returns `len` random bytes from `rng`.
    fn create_vector(rng: &mut impl Rng, len: usize) -> Vec<u8> {
        rng.sample_iter(distributions::Standard).take(len).collect()
    }

    /// Returns a hash derived from a random value drawn from `rng`.
    fn get_hash(rng: &mut impl Rng) -> CryptoHash {
        let rnd_val = rng.gen::<usize>();
        CryptoHash::test_hash(format!("rnd_val={rnd_val}"))
    }

    /// Returns a system stream ID with a random 10-byte stream name.
    fn get_stream_id(rng: &mut impl Rng) -> StreamId {
        let stream_name = StreamName(create_vector(rng, 10));
        StreamId::system(stream_name)
    }

    /// Returns an `EventId` with random chain ID, stream ID and index.
    fn get_event_id(rng: &mut impl Rng) -> EventId {
        let hash = get_hash(rng);
        let chain_id = ChainId(hash);
        let stream_id = get_stream_id(rng);
        let index = rng.gen::<u32>();
        EventId {
            chain_id,
            stream_id,
            index,
        }
    }

    /// Builds a deterministic `StorageState` with a few entries in every category.
    // NOTE(review): `make_deterministic_rng` is re-seeded on each call, so two calls
    // produce identical states — confirm that the "append" scenario below still
    // exercises the intended partial-migration path rather than rewriting the same
    // keys.
    fn get_storage_state() -> StorageState {
        let mut rng = make_deterministic_rng();
        let key_size = 5;
        let value_size = 10;
        // 0: the chain states.
        let chain_id_count = 10;
        let n_key = 1;
        let mut chain_ids_key_values = BTreeMap::new();
        for _i_chain in 0..chain_id_count {
            let hash = get_hash(&mut rng);
            let chain_id = ChainId(hash);
            let mut key_values = Vec::new();
            for _i_key in 0..n_key {
                let key = create_vector(&mut rng, key_size);
                let value = create_vector(&mut rng, value_size);
                key_values.push((key, value));
            }
            key_values.sort_unstable();
            chain_ids_key_values.insert(chain_id, key_values);
        }
        // 1: the certificates
        let certificates_count = 10;
        let mut certificates = BTreeMap::new();
        for _i_certificate in 0..certificates_count {
            let hash = get_hash(&mut rng);
            let value = create_vector(&mut rng, value_size);
            certificates.insert(hash, value);
        }
        // 2: the confirmed blocks (along with certificates)
        let blocks_count = 10;
        let mut confirmed_blocks = BTreeMap::new();
        for _i_block in 0..blocks_count {
            let hash = get_hash(&mut rng);
            let value = create_vector(&mut rng, value_size);
            certificates.insert(hash, value);
            let value = create_vector(&mut rng, value_size);
            confirmed_blocks.insert(hash, value);
        }
        // 3: the blobs
        let blobs_count = 2;
        let mut blobs = BTreeMap::new();
        for _i_blob in 0..blobs_count {
            let hash = get_hash(&mut rng);
            let blob_id = BlobId {
                blob_type: BlobType::Data,
                hash,
            };
            let value = create_vector(&mut rng, value_size);
            blobs.insert(blob_id, value);
        }
        // 4: the blob states
        let blob_states_count = 2;
        let mut blob_states = BTreeMap::new();
        for _i_blob_state in 0..blob_states_count {
            let hash = get_hash(&mut rng);
            let blob_id = BlobId {
                blob_type: BlobType::Data,
                hash,
            };
            let value = create_vector(&mut rng, value_size);
            blob_states.insert(blob_id, value);
        }
        // 5: the events
        let events_count = 2;
        let mut events = HashMap::new();
        for _i_event in 0..events_count {
            let event_id = get_event_id(&mut rng);
            let value = create_vector(&mut rng, value_size);
            events.insert(event_id, value);
        }
        // 6: the block exporters
        let block_exporters_count = 2;
        let n_key = 1;
        let mut block_exporter_states = BTreeMap::new();
        for _i_block_export in 0..block_exporters_count {
            let index = rng.gen::<u32>();
            let mut key_values = Vec::new();
            for _i_key in 0..n_key {
                let key = create_vector(&mut rng, key_size);
                let value = create_vector(&mut rng, value_size);
                key_values.push((key, value));
            }
            key_values.sort_unstable();
            block_exporter_states.insert(index, key_values);
        }
        // 7: network description
        let network_description = Some(create_vector(&mut rng, value_size));
        StorageState {
            chain_ids_key_values,
            certificates,
            confirmed_blocks,
            blobs,
            blob_states,
            events,
            block_exporter_states,
            network_description,
        }
    }

    /// Writes `storage_state` into `database` using the version-0 schema: chain
    /// states and block-exporter states in their own partitions (keyed by the
    /// serialized `BaseKey`), everything else in the shared partition.
    async fn write_storage_state_old_schema<D>(
        database: &D,
        storage_state: StorageState,
    ) -> Result<(), ViewError>
    where
        D: KeyValueDatabase + Clone + Send + Sync + 'static,
        D::Store: KeyValueStore + Clone + Send + Sync + 'static,
        D::Error: Send + Sync,
    {
        for (chain_id, key_values) in storage_state.chain_ids_key_values {
            let root_key = bcs::to_bytes(&BaseKey::ChainState(chain_id))?;
            let store = database.open_shared(&root_key)?;
            let mut batch = Batch::new();
            for (key, value) in key_values {
                batch.put_key_value_bytes(key, value);
            }
            store.write_batch(batch).await?;
        }
        for (index, key_values) in storage_state.block_exporter_states {
            let root_key = bcs::to_bytes(&BaseKey::BlockExporterState(index))?;
            let store = database.open_shared(&root_key)?;
            let mut batch = Batch::new();
            for (key, value) in key_values {
                batch.put_key_value_bytes(key, value);
            }
            store.write_batch(batch).await?;
        }
        // Writing in the shared partition
        let mut batch = Batch::new();
        for (hash, value) in storage_state.certificates {
            let key = bcs::to_bytes(&BaseKey::Certificate(hash))?;
            batch.put_key_value_bytes(key, value);
        }
        for (hash, value) in storage_state.confirmed_blocks {
            let key = bcs::to_bytes(&BaseKey::ConfirmedBlock(hash))?;
            batch.put_key_value_bytes(key, value);
        }
        for (blob_id, value) in storage_state.blobs {
            let key = bcs::to_bytes(&BaseKey::Blob(blob_id))?;
            batch.put_key_value_bytes(key, value);
        }
        for (blob_id, value) in storage_state.blob_states {
            let key = bcs::to_bytes(&BaseKey::BlobState(blob_id))?;
            batch.put_key_value_bytes(key, value);
        }
        for (event_id, value) in storage_state.events {
            let key = bcs::to_bytes(&BaseKey::Event(event_id))?;
            batch.put_key_value_bytes(key, value);
        }
        if let Some(network_description) = storage_state.network_description {
            let key = bcs::to_bytes(&BaseKey::NetworkDescription)?;
            batch.put_key_value_bytes(key, network_description);
        }
        let store = database.open_shared(&[])?;
        store.write_batch(batch).await?;
        Ok(())
    }

    /// Filters out root keys that do not correspond to migrated user data.
    fn is_valid_root_key(root_key: &[u8]) -> bool {
        if root_key.is_empty() {
            // It corresponds to the &[]
            return false;
        }
        if root_key == [4] {
            // It corresponds to the key of the database schema.
            return false;
        }
        true
    }

    /// Reads the whole database back under the version-1 schema and reassembles it
    /// into a `StorageState` for comparison with what was originally written.
    async fn read_storage_state_new_schema<D>(database: &D) -> Result<StorageState, ViewError>
    where
        D: KeyValueDatabase + Clone + Send + Sync + 'static,
        D::Store: KeyValueStore + Clone + Send + Sync + 'static,
        D::Error: Send + Sync,
    {
        let mut chain_ids_key_values = BTreeMap::new();
        let mut certificates = BTreeMap::new();
        let mut confirmed_blocks = BTreeMap::new();
        let mut blobs = BTreeMap::new();
        let mut blob_states = BTreeMap::new();
        let mut events = HashMap::new();
        let mut block_exporter_states = BTreeMap::new();
        let mut network_description = None;
        let bcs_root_keys = database.list_root_keys().await?;
        for bcs_root_key in bcs_root_keys {
            if is_valid_root_key(&bcs_root_key) {
                let root_key = bcs::from_bytes(&bcs_root_key)?;
                match root_key {
                    RootKey::ChainState(chain_id) => {
                        let store = database.open_shared(&bcs_root_key)?;
                        let key_values = store.find_key_values_by_prefix(&[]).await?;
                        chain_ids_key_values.insert(chain_id, key_values);
                    }
                    RootKey::ConfirmedBlock(hash) => {
                        // The partition can hold a lite certificate, a block, or both.
                        let store = database.open_shared(&bcs_root_key)?;
                        let value = store.read_value_bytes(LITE_CERTIFICATE_KEY).await?;
                        if let Some(value) = value {
                            certificates.insert(hash, value);
                        }
                        let value = store.read_value_bytes(BLOCK_KEY).await?;
                        if let Some(value) = value {
                            confirmed_blocks.insert(hash, value);
                        }
                    }
                    RootKey::Blob(blob_id) => {
                        // The partition can hold a blob, a blob state, or both.
                        let store = database.open_shared(&bcs_root_key)?;
                        let value = store.read_value_bytes(BLOB_KEY).await?;
                        if let Some(value) = value {
                            blobs.insert(blob_id, value);
                        }
                        let value = store.read_value_bytes(BLOB_STATE_KEY).await?;
                        if let Some(value) = value {
                            blob_states.insert(blob_id, value);
                        }
                    }
                    RootKey::Event(chain_id) => {
                        // Event partitions are keyed by chain; reconstruct the full
                        // `EventId` from the per-chain `RestrictedEventId` keys.
                        let store = database.open_shared(&bcs_root_key)?;
                        let key_values = store.find_key_values_by_prefix(&[]).await?;
                        for (key, value) in key_values {
                            let restricted_event_id = bcs::from_bytes::<RestrictedEventId>(&key)?;
                            let event_id = EventId {
                                chain_id,
                                stream_id: restricted_event_id.stream_id,
                                index: restricted_event_id.index,
                            };
                            events.insert(event_id, value);
                        }
                    }
                    RootKey::Placeholder => {
                        // Nothing to be done
                    }
                    RootKey::NetworkDescription => {
                        let store = database.open_shared(&bcs_root_key)?;
                        let value = store.read_value_bytes(NETWORK_DESCRIPTION_KEY).await?;
                        if let Some(value) = value {
                            network_description = Some(value);
                        }
                    }
                    RootKey::BlockExporterState(index) => {
                        let store = database.open_shared(&bcs_root_key)?;
                        let key_values = store.find_key_values_by_prefix(&[]).await?;
                        block_exporter_states.insert(index, key_values);
                    }
                    RootKey::BlockByHeight(_) => {
                        // Nothing to be done
                    }
                }
            }
        }
        Ok(StorageState {
            chain_ids_key_values,
            certificates,
            confirmed_blocks,
            blobs,
            blob_states,
            events,
            block_exporter_states,
            network_description,
        })
    }

    /// End-to-end migration test: write a version-0 state, migrate, read back under
    /// version 1 and compare; then write more version-0 data on top of the migrated
    /// database and migrate again to simulate an interrupted/restarted migration.
    async fn test_storage_migration<D>() -> Result<(), ViewError>
    where
        D: TestKeyValueDatabase + Clone + Send + Sync + 'static,
        D::Store: KeyValueStore + Clone + Send + Sync + 'static,
        D::Error: Send + Sync,
    {
        let database = D::connect_test_namespace().await?;
        // Get a storage state and write it.
        let mut storage_state = get_storage_state();
        write_storage_state_old_schema(&database, storage_state.clone()).await?;
        // Creating a storage and migrate to the new database schema.
        let storage = DbStorage::<D, WallClock>::new(database, None, WallClock);
        storage.migrate_if_needed().await?;
        // read the storage state and compare it.
        let read_storage_state = read_storage_state_new_schema(storage.database.deref()).await?;
        assert_eq!(read_storage_state, storage_state);
        // Creates a new storage state, write it and migrate it.
        // That should simulate the partial migration interrupted for some reason and restarted.
        let mut appended_state = get_storage_state();
        appended_state.network_description = None;
        write_storage_state_old_schema(storage.database.deref(), appended_state.clone()).await?;
        storage.migrate_if_needed().await?;
        storage_state.append_storage_state(appended_state);
        let read_storage_state = read_storage_state_new_schema(storage.database.deref()).await?;
        assert_eq!(read_storage_state, storage_state);
        Ok(())
    }

    // NOTE(review): the test cases below are gated on `with_rocksdb`/`with_scylladb`
    // cfgs while the corresponding imports above are gated on
    // `feature = "rocksdb"`/`feature = "scylladb"` — confirm these cfg aliases are
    // defined by the crate's build setup so the conditions actually match.
    #[test_case(PhantomData::<MemoryDatabase>; "MemoryDatabase")]
    #[cfg_attr(with_rocksdb, test_case(PhantomData::<RocksDbDatabase>; "RocksDbDatabase"))]
    #[cfg_attr(with_scylladb, test_case(PhantomData::<ScyllaDbDatabase>; "ScyllaDbDatabase"))]
    #[tokio::test]
    async fn test_storage_migration_cases<D>(_storage_type: PhantomData<D>) -> Result<(), ViewError>
    where
        D: TestKeyValueDatabase + Clone + Send + Sync + 'static,
        D::Store: KeyValueStore + Clone + Send + Sync + 'static,
        D::Error: Send + Sync,
    {
        test_storage_migration::<D>().await
    }
}