use linera_base::{
    crypto::CryptoHash,
    identifiers::{BlobId, ChainId, EventId},
};
use linera_views::{
    batch::Batch,
    store::{KeyValueDatabase, KeyValueStore, ReadableKeyValueStore, WritableKeyValueStore},
    ViewError,
};
use serde::{Deserialize, Serialize};
use tokio::time::Duration;

use crate::{
    db_storage::{
        to_event_key, DbStorage, MultiPartitionBatch, RootKey, BLOB_KEY, BLOB_STATE_KEY, BLOCK_KEY,
        LITE_CERTIFICATE_KEY, NETWORK_DESCRIPTION_KEY,
    },
    Clock,
};

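/// The schema version detected on an existing database.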
#[derive(Debug)]
enum SchemaVersion {
    /// No schema marker was found: the database has not been initialized yet.
    Uninitialized,
    /// The legacy layout, where most data lives in a single shared partition.
    Version0,
    /// The current layout, where data is split into per-`RootKey` partitions.
    Version1,
}

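/// How long to wait, in minutes, before retrying when another migration
/// appears to be in progress on the same database.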
const MIGRATION_WAIT_BEFORE_RETRY_MIN: u64 = 3;

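/// Placeholder sub-key for `BaseKey` variants that already live in their own
/// partition and are therefore never migrated; the value is unused.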
const UNUSED_EMPTY_KEY: &[u8] = &[];
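/// The first bytes of the BCS-serialized [`BaseKey`] variants that must be
/// moved out of the shared partition: `Certificate` (1), `ConfirmedBlock` (2),
/// `Blob` (3), `BlobState` (4), `Event` (5), and `NetworkDescription` (7).
/// `ChainState` (0) and `BlockExporterState` (6) already live in their own
/// partitions.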
const MOVABLE_KEYS_0_1: &[u8] = &[1, 2, 3, 4, 5, 7];

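/// The number of keys read, rewritten, and deleted in one migration chunk.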
const BLOCK_KEY_SIZE: usize = 90;

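/// The database keys of the legacy v0 schema. BCS encodes the variant index as
/// the leading byte, which is the byte that `MOVABLE_KEYS_0_1` matches on.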
#[derive(Debug, Serialize, Deserialize)]
enum BaseKey {
    ChainState(ChainId),
    Certificate(CryptoHash),
    ConfirmedBlock(CryptoHash),
    Blob(BlobId),
    BlobState(BlobId),
    Event(EventId),
    BlockExporterState(u32),
    NetworkDescription,
}

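/// Maps a BCS-serialized v0 `BaseKey` from the shared partition to the
/// `(root_key, key)` pair addressing the same entry in the v1 schema.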
fn map_base_key(base_key: &[u8]) -> Result<(Vec<u8>, Vec<u8>), ViewError> {
    let base_key = bcs::from_bytes::<BaseKey>(base_key)?;
    match base_key {
        BaseKey::ChainState(chain_id) => {
            let root_key = RootKey::ChainState(chain_id).bytes();
            Ok((root_key, UNUSED_EMPTY_KEY.to_vec()))
        }
        BaseKey::Certificate(hash) => {
            let root_key = RootKey::ConfirmedBlock(hash).bytes();
            Ok((root_key, LITE_CERTIFICATE_KEY.to_vec()))
        }
        BaseKey::ConfirmedBlock(hash) => {
            let root_key = RootKey::ConfirmedBlock(hash).bytes();
            Ok((root_key, BLOCK_KEY.to_vec()))
        }
        BaseKey::Blob(blob_id) => {
            let root_key = RootKey::Blob(blob_id).bytes();
            Ok((root_key, BLOB_KEY.to_vec()))
        }
        BaseKey::BlobState(blob_id) => {
            let root_key = RootKey::Blob(blob_id).bytes();
            Ok((root_key, BLOB_STATE_KEY.to_vec()))
        }
        BaseKey::Event(event_id) => {
            let root_key = RootKey::Event(event_id.chain_id).bytes();
            let key = to_event_key(&event_id);
            Ok((root_key, key))
        }
        BaseKey::BlockExporterState(index) => {
            let root_key = RootKey::BlockExporterState(index).bytes();
            Ok((root_key, UNUSED_EMPTY_KEY.to_vec()))
        }
        BaseKey::NetworkDescription => {
            let root_key = RootKey::NetworkDescription.bytes();
            Ok((root_key, NETWORK_DESCRIPTION_KEY.to_vec()))
        }
    }
}

impl<Database, C> DbStorage<Database, C>
where
    Database: KeyValueDatabase + Clone + Send + Sync + 'static,
    Database::Store: KeyValueStore + Clone + Send + Sync + 'static,
    C: Clock + Clone + Send + Sync + 'static,
    Database::Error: From<bcs::Error> + Send + Sync,
{
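    /// Moves the given keys, all sharing the prefix `first_byte`, out of the
    /// shared partition into their per-`RootKey` partitions. Each chunk is
    /// first written to its new location and only then deleted from the shared
    /// partition, so an interrupted migration never loses data.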
    async fn migrate_shared_partition(
        &self,
        first_byte: &u8,
        keys: Vec<Vec<u8>>,
    ) -> Result<(), ViewError> {
        tracing::info!(
            "Migrating {} keys of the shared DB partition starting with {first_byte}",
            keys.len()
        );
        for (index, chunk_keys) in keys.chunks(BLOCK_KEY_SIZE).enumerate() {
            tracing::info!("Processing chunk {index} of size {}", chunk_keys.len());
            let chunk_base_keys = chunk_keys
                .iter()
                .map(|key| {
                    let mut base_key = vec![*first_byte];
                    base_key.extend(key);
                    base_key
                })
                .collect::<Vec<Vec<u8>>>();
            let store = self.database.open_shared(&[])?;
            let values = store.read_multi_values_bytes(&chunk_base_keys).await?;
            // Write the chunk to its new partitions first.
            let mut batch = MultiPartitionBatch::new();
            for (base_key, value) in chunk_base_keys.iter().zip(values) {
                let value = value.ok_or_else(|| ViewError::MissingEntries("migration".into()))?;
                let (root_key, key) = map_base_key(base_key)?;
                batch.put_key_value(root_key, key, value);
            }
            self.write_batch(batch).await?;
            // Only then delete the old keys from the shared partition.
            let mut batch = Batch::new();
            for key in chunk_base_keys {
                batch.delete_key(key);
            }
            store.write_batch(batch).await?;
        }
        Ok(())
    }

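    /// Migrates every movable key group from the v0 shared partition to the
    /// v1 per-`RootKey` partitions.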
    async fn migrate_v0_to_v1(&self) -> Result<(), ViewError> {
        for first_byte in MOVABLE_KEYS_0_1 {
            let store = self.database.open_shared(&[])?;
            let keys = store.find_keys_by_prefix(&[*first_byte]).await?;
            self.migrate_shared_partition(first_byte, keys).await?;
        }
        Ok(())
    }

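    /// Runs the v0 → v1 migration if the database is still on the old schema.
    /// This is a no-op for uninitialized or already-migrated databases, and it
    /// waits and retries if a concurrent migration is detected.
    ///
    /// A minimal usage sketch, assuming `storage` is a connected `DbStorage`
    /// handle:
    ///
    /// ```ignore
    /// storage.migrate_if_needed().await?;
    /// storage.assert_is_migrated_storage().await?;
    /// ```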
    pub async fn migrate_if_needed(&self) -> Result<(), ViewError> {
        loop {
            if matches!(
                self.get_storage_state().await?,
                SchemaVersion::Uninitialized | SchemaVersion::Version1
            ) {
                return Ok(());
            }
            let result = self.migrate_v0_to_v1().await;
            if let Err(ViewError::MissingEntries(_)) = result {
                tracing::warn!(
                    "A migration seems to be already in progress on this database. \
                     Waiting {} minutes before retrying.",
                    MIGRATION_WAIT_BEFORE_RETRY_MIN
                );
                tokio::time::sleep(Duration::from_secs(MIGRATION_WAIT_BEFORE_RETRY_MIN * 60)).await;
                continue;
            }
            return result;
        }
    }

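    /// Infers the schema version from the database contents: a network
    /// description stored under the v0 `BaseKey` means `Version0`, one stored
    /// in the v1 `RootKey` partition means `Version1`, and neither means the
    /// database is uninitialized.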
    async fn get_storage_state(&self) -> Result<SchemaVersion, ViewError> {
        let store = self.database.open_shared(&[])?;
        let key = bcs::to_bytes(&BaseKey::NetworkDescription)?;
        if store.contains_key(&key).await? {
            return Ok(SchemaVersion::Version0);
        }

        let root_key = RootKey::NetworkDescription.bytes();
        let store = self.database.open_shared(&root_key)?;
        if store.contains_key(NETWORK_DESCRIPTION_KEY).await? {
            return Ok(SchemaVersion::Version1);
        }

        Ok(SchemaVersion::Uninitialized)
    }

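    /// Panics if the database is still on the unmigrated v0 schema.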
    pub async fn assert_is_migrated_storage(&self) -> Result<(), ViewError> {
        let state = self.get_storage_state().await?;
        assert!(matches!(
            state,
            SchemaVersion::Uninitialized | SchemaVersion::Version1
        ));
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use std::{
        collections::{BTreeMap, HashMap},
        marker::PhantomData,
        ops::Deref,
    };

    use linera_base::{
        crypto::CryptoHash,
        identifiers::{BlobId, BlobType, ChainId, EventId, StreamId, StreamName},
    };
    #[cfg(feature = "rocksdb")]
    use linera_views::rocks_db::RocksDbDatabase;
    #[cfg(feature = "scylladb")]
    use linera_views::scylla_db::ScyllaDbDatabase;
    use linera_views::{
        batch::Batch,
        memory::MemoryDatabase,
        random::make_deterministic_rng,
        store::{
            KeyValueDatabase, KeyValueStore, ReadableKeyValueStore, TestKeyValueDatabase,
            WritableKeyValueStore,
        },
        ViewError,
    };
    use rand::{distributions, Rng};
    use test_case::test_case;

    use crate::{
        db_storage::RestrictedEventId,
        migration::{
            BaseKey, RootKey, BLOB_KEY, BLOB_STATE_KEY, BLOCK_KEY, LITE_CERTIFICATE_KEY,
            NETWORK_DESCRIPTION_KEY,
        },
        DbStorage, WallClock,
    };

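    /// The logical contents of a storage, independent of the on-disk schema.
    /// Used to check that the migration preserves all data.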
    #[derive(Clone, Debug, Eq, PartialEq)]
    #[allow(clippy::type_complexity)]
    struct StorageState {
        chain_ids_key_values: BTreeMap<ChainId, Vec<(Vec<u8>, Vec<u8>)>>,
        certificates: BTreeMap<CryptoHash, Vec<u8>>,
        confirmed_blocks: BTreeMap<CryptoHash, Vec<u8>>,
        blobs: BTreeMap<BlobId, Vec<u8>>,
        blob_states: BTreeMap<BlobId, Vec<u8>>,
        events: HashMap<EventId, Vec<u8>>,
        block_exporter_states: BTreeMap<u32, Vec<(Vec<u8>, Vec<u8>)>>,
        network_description: Option<Vec<u8>>,
    }

    impl StorageState {
        fn append_storage_state(&mut self, storage_state: StorageState) {
            self.chain_ids_key_values
                .extend(storage_state.chain_ids_key_values);
            self.certificates.extend(storage_state.certificates);
            self.confirmed_blocks.extend(storage_state.confirmed_blocks);
            self.blobs.extend(storage_state.blobs);
            self.blob_states.extend(storage_state.blob_states);
            self.events.extend(storage_state.events);
            self.block_exporter_states
                .extend(storage_state.block_exporter_states);
            if let Some(value) = storage_state.network_description {
                // At most one of the two states may define a network description.
                assert!(self.network_description.is_none());
                self.network_description = Some(value);
            }
        }
    }

    fn create_vector(rng: &mut impl Rng, len: usize) -> Vec<u8> {
        rng.sample_iter(distributions::Standard).take(len).collect()
    }

    fn get_hash(rng: &mut impl Rng) -> CryptoHash {
        let rnd_val = rng.gen::<usize>();
        CryptoHash::test_hash(format!("rnd_val={rnd_val}"))
    }

    fn get_stream_id(rng: &mut impl Rng) -> StreamId {
        let stream_name = StreamName(create_vector(rng, 10));
        StreamId::system(stream_name)
    }

    fn get_event_id(rng: &mut impl Rng) -> EventId {
        let hash = get_hash(rng);
        let chain_id = ChainId(hash);
        let stream_id = get_stream_id(rng);
        let index = rng.gen::<u32>();
        EventId {
            chain_id,
            stream_id,
            index,
        }
    }

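    /// Builds a deterministic pseudo-random `StorageState` exercising every
    /// `BaseKey` variant.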
    fn get_storage_state() -> StorageState {
        let mut rng = make_deterministic_rng();
        let key_size = 5;
        let value_size = 10;
        // Chain states.
        let chain_id_count = 10;
        let n_key = 1;
        let mut chain_ids_key_values = BTreeMap::new();
        for _i_chain in 0..chain_id_count {
            let hash = get_hash(&mut rng);
            let chain_id = ChainId(hash);
            let mut key_values = Vec::new();
            for _i_key in 0..n_key {
                let key = create_vector(&mut rng, key_size);
                let value = create_vector(&mut rng, value_size);
                key_values.push((key, value));
            }
            key_values.sort_unstable();
            chain_ids_key_values.insert(chain_id, key_values);
        }
        // Certificates.
        let certificates_count = 10;
        let mut certificates = BTreeMap::new();
        for _i_certificate in 0..certificates_count {
            let hash = get_hash(&mut rng);
            let value = create_vector(&mut rng, value_size);
            certificates.insert(hash, value);
        }
        // Confirmed blocks, each with a certificate under the same hash, since
        // both end up in the same partition in the new schema.
        let blocks_count = 10;
        let mut confirmed_blocks = BTreeMap::new();
        for _i_block in 0..blocks_count {
            let hash = get_hash(&mut rng);
            let value = create_vector(&mut rng, value_size);
            certificates.insert(hash, value);
            let value = create_vector(&mut rng, value_size);
            confirmed_blocks.insert(hash, value);
        }
        // Blobs.
        let blobs_count = 2;
        let mut blobs = BTreeMap::new();
        for _i_blob in 0..blobs_count {
            let hash = get_hash(&mut rng);
            let blob_id = BlobId {
                blob_type: BlobType::Data,
                hash,
            };
            let value = create_vector(&mut rng, value_size);
            blobs.insert(blob_id, value);
        }
        // Blob states.
        let blob_states_count = 2;
        let mut blob_states = BTreeMap::new();
        for _i_blob_state in 0..blob_states_count {
            let hash = get_hash(&mut rng);
            let blob_id = BlobId {
                blob_type: BlobType::Data,
                hash,
            };
            let value = create_vector(&mut rng, value_size);
            blob_states.insert(blob_id, value);
        }
        // Events.
        let events_count = 2;
        let mut events = HashMap::new();
        for _i_event in 0..events_count {
            let event_id = get_event_id(&mut rng);
            let value = create_vector(&mut rng, value_size);
            events.insert(event_id, value);
        }
        // Block exporter states.
        let block_exporters_count = 2;
        let n_key = 1;
        let mut block_exporter_states = BTreeMap::new();
        for _i_block_export in 0..block_exporters_count {
            let index = rng.gen::<u32>();
            let mut key_values = Vec::new();
            for _i_key in 0..n_key {
                let key = create_vector(&mut rng, key_size);
                let value = create_vector(&mut rng, value_size);
                key_values.push((key, value));
            }
            key_values.sort_unstable();
            block_exporter_states.insert(index, key_values);
        }
        // Network description.
        let network_description = Some(create_vector(&mut rng, value_size));
        StorageState {
            chain_ids_key_values,
            certificates,
            confirmed_blocks,
            blobs,
            blob_states,
            events,
            block_exporter_states,
            network_description,
        }
    }

    async fn write_storage_state_old_schema<D>(
        database: &D,
        storage_state: StorageState,
    ) -> Result<(), ViewError>
    where
        D: KeyValueDatabase + Clone + Send + Sync + 'static,
        D::Store: KeyValueStore + Clone + Send + Sync + 'static,
        D::Error: Send + Sync,
    {
        for (chain_id, key_values) in storage_state.chain_ids_key_values {
            let root_key = bcs::to_bytes(&BaseKey::ChainState(chain_id))?;
            let store = database.open_shared(&root_key)?;
            let mut batch = Batch::new();
            for (key, value) in key_values {
                batch.put_key_value_bytes(key, value);
            }
            store.write_batch(batch).await?;
        }
        for (index, key_values) in storage_state.block_exporter_states {
            let root_key = bcs::to_bytes(&BaseKey::BlockExporterState(index))?;
            let store = database.open_shared(&root_key)?;
            let mut batch = Batch::new();
            for (key, value) in key_values {
                batch.put_key_value_bytes(key, value);
            }
            store.write_batch(batch).await?;
        }
        // Everything else goes into the shared partition (empty root key).
        let mut batch = Batch::new();
        for (hash, value) in storage_state.certificates {
            let key = bcs::to_bytes(&BaseKey::Certificate(hash))?;
            batch.put_key_value_bytes(key, value);
        }
        for (hash, value) in storage_state.confirmed_blocks {
            let key = bcs::to_bytes(&BaseKey::ConfirmedBlock(hash))?;
            batch.put_key_value_bytes(key, value);
        }
        for (blob_id, value) in storage_state.blobs {
            let key = bcs::to_bytes(&BaseKey::Blob(blob_id))?;
            batch.put_key_value_bytes(key, value);
        }
        for (blob_id, value) in storage_state.blob_states {
            let key = bcs::to_bytes(&BaseKey::BlobState(blob_id))?;
            batch.put_key_value_bytes(key, value);
        }
        for (event_id, value) in storage_state.events {
            let key = bcs::to_bytes(&BaseKey::Event(event_id))?;
            batch.put_key_value_bytes(key, value);
        }
        if let Some(network_description) = storage_state.network_description {
            let key = bcs::to_bytes(&BaseKey::NetworkDescription)?;
            batch.put_key_value_bytes(key, network_description);
        }
        let store = database.open_shared(&[])?;
        store.write_batch(batch).await?;
        Ok(())
    }

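    /// Returns whether a root key listed by the database is a `RootKey`
    /// partition holding migrated data.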
    fn is_valid_root_key(root_key: &[u8]) -> bool {
        if root_key.is_empty() {
            // The empty root key is the legacy shared partition.
            return false;
        }
        if root_key == [4] {
            return false;
        }
        true
    }

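    /// Reads the full storage contents back from the v1 (per-partition)
    /// schema.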
    async fn read_storage_state_new_schema<D>(database: &D) -> Result<StorageState, ViewError>
    where
        D: KeyValueDatabase + Clone + Send + Sync + 'static,
        D::Store: KeyValueStore + Clone + Send + Sync + 'static,
        D::Error: Send + Sync,
    {
        let mut chain_ids_key_values = BTreeMap::new();
        let mut certificates = BTreeMap::new();
        let mut confirmed_blocks = BTreeMap::new();
        let mut blobs = BTreeMap::new();
        let mut blob_states = BTreeMap::new();
        let mut events = HashMap::new();
        let mut block_exporter_states = BTreeMap::new();
        let mut network_description = None;
        let bcs_root_keys = database.list_root_keys().await?;
        for bcs_root_key in bcs_root_keys {
            if is_valid_root_key(&bcs_root_key) {
                let root_key = bcs::from_bytes(&bcs_root_key)?;
                match root_key {
                    RootKey::ChainState(chain_id) => {
                        let store = database.open_shared(&bcs_root_key)?;
                        let key_values = store.find_key_values_by_prefix(&[]).await?;
                        chain_ids_key_values.insert(chain_id, key_values);
                    }
                    RootKey::ConfirmedBlock(hash) => {
                        let store = database.open_shared(&bcs_root_key)?;
                        let value = store.read_value_bytes(LITE_CERTIFICATE_KEY).await?;
                        if let Some(value) = value {
                            certificates.insert(hash, value);
                        }
                        let value = store.read_value_bytes(BLOCK_KEY).await?;
                        if let Some(value) = value {
                            confirmed_blocks.insert(hash, value);
                        }
                    }
                    RootKey::Blob(blob_id) => {
                        let store = database.open_shared(&bcs_root_key)?;
                        let value = store.read_value_bytes(BLOB_KEY).await?;
                        if let Some(value) = value {
                            blobs.insert(blob_id, value);
                        }
                        let value = store.read_value_bytes(BLOB_STATE_KEY).await?;
                        if let Some(value) = value {
                            blob_states.insert(blob_id, value);
                        }
                    }
                    RootKey::Event(chain_id) => {
                        let store = database.open_shared(&bcs_root_key)?;
                        let key_values = store.find_key_values_by_prefix(&[]).await?;
                        for (key, value) in key_values {
                            let restricted_event_id = bcs::from_bytes::<RestrictedEventId>(&key)?;
                            let event_id = EventId {
                                chain_id,
                                stream_id: restricted_event_id.stream_id,
                                index: restricted_event_id.index,
                            };
                            events.insert(event_id, value);
                        }
                    }
                    RootKey::Placeholder => {
                        // Holds no migrated data.
                    }
                    RootKey::NetworkDescription => {
                        let store = database.open_shared(&bcs_root_key)?;
                        let value = store.read_value_bytes(NETWORK_DESCRIPTION_KEY).await?;
                        if let Some(value) = value {
                            network_description = Some(value);
                        }
                    }
                    RootKey::BlockExporterState(index) => {
                        let store = database.open_shared(&bcs_root_key)?;
                        let key_values = store.find_key_values_by_prefix(&[]).await?;
                        block_exporter_states.insert(index, key_values);
                    }
                    RootKey::BlockByHeight(_) => {
                        // Not part of the migrated state.
                    }
                }
            }
        }
        Ok(StorageState {
            chain_ids_key_values,
            certificates,
            confirmed_blocks,
            blobs,
            blob_states,
            events,
            block_exporter_states,
            network_description,
        })
    }

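    /// Writes a state in the v0 schema, migrates it, and checks that the same
    /// state is read back under the v1 schema; then adds more v0 data and
    /// checks that nothing is lost when the migration runs again.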
    async fn test_storage_migration<D>() -> Result<(), ViewError>
    where
        D: TestKeyValueDatabase + Clone + Send + Sync + 'static,
        D::Store: KeyValueStore + Clone + Send + Sync + 'static,
        D::Error: Send + Sync,
    {
        let database = D::connect_test_namespace().await?;
        let mut storage_state = get_storage_state();
        // Write a full state in the old schema, migrate, and compare.
        write_storage_state_old_schema(&database, storage_state.clone()).await?;
        let storage = DbStorage::<D, WallClock>::new(database, None, WallClock);
        storage.migrate_if_needed().await?;
        let read_storage_state = read_storage_state_new_schema(storage.database.deref()).await?;
        assert_eq!(read_storage_state, storage_state);
        // Add more old-schema data and run the migration again. The network
        // description is cleared because `append_storage_state` asserts that
        // at most one of the two states defines it.
        let mut appended_state = get_storage_state();
        appended_state.network_description = None;
        write_storage_state_old_schema(storage.database.deref(), appended_state.clone()).await?;
        storage.migrate_if_needed().await?;
        storage_state.append_storage_state(appended_state);
        let read_storage_state = read_storage_state_new_schema(storage.database.deref()).await?;
        assert_eq!(read_storage_state, storage_state);
        Ok(())
    }

    #[test_case(PhantomData::<MemoryDatabase>; "MemoryDatabase")]
    #[cfg_attr(with_rocksdb, test_case(PhantomData::<RocksDbDatabase>; "RocksDbDatabase"))]
    #[cfg_attr(with_scylladb, test_case(PhantomData::<ScyllaDbDatabase>; "ScyllaDbDatabase"))]
    #[tokio::test]
    async fn test_storage_migration_cases<D>(_storage_type: PhantomData<D>) -> Result<(), ViewError>
    where
        D: TestKeyValueDatabase + Clone + Send + Sync + 'static,
        D::Store: KeyValueStore + Clone + Send + Sync + 'static,
        D::Error: Send + Sync,
    {
        test_storage_migration::<D>().await
    }
}