1use std::io::Write;
2use std::path::Path;
3use std::time::{Duration, Instant};
4
5use ethrex_common::H256;
6use ethrex_common::types::{BlockHash, BlockNumber, Index};
7use ethrex_rlp::decode::RLPDecode;
8use ethrex_rlp::encode::RLPEncode;
9
10use crate::api::tables::{RECEIPTS, RECEIPTS_V2, TRANSACTION_LOCATIONS};
11use crate::api::{StorageBackend, StorageWriteBatch};
12use crate::error::StoreError;
13use crate::store::receipt_key;
14use crate::{STORE_METADATA_FILENAME, STORE_SCHEMA_VERSION};
15
16use super::store::StoreMetadata;
17
/// Signature shared by every schema migration step: it receives the storage
/// backend and performs the data rewrite for exactly one version bump.
pub type MigrationFn = fn(backend: &dyn StorageBackend) -> Result<(), StoreError>;

/// Ordered list of schema migrations.
///
/// Entry `i` upgrades the schema from v`i + 1` to v`i + 2`; it is looked up as
/// `MIGRATIONS[version - 1]` when migrating away from `version`.
pub const MIGRATIONS: &[MigrationFn] = &[migrate_1_to_2, migrate_2_to_3];

// Compile-time guard: bumping STORE_SCHEMA_VERSION without registering the
// corresponding migration (or vice versa) fails the build.
const _: () = assert!(
    MIGRATIONS.len() == (STORE_SCHEMA_VERSION - 1) as usize,
    "MIGRATIONS length must equal STORE_SCHEMA_VERSION - 1"
);
41
42fn migration_for_version(version: u64) -> MigrationFn {
44 MIGRATIONS[(version - 1) as usize]
45}
46
/// Minimum wall-clock time between two progress log lines emitted by a
/// long-running migration.
const PROGRESS_LOG_INTERVAL: Duration = Duration::from_secs(10);

/// Throughput of `count` processed entries over `elapsed`, in entries per second.
///
/// A zero elapsed time yields `0.0` instead of dividing by zero.
fn entries_per_second(count: u64, elapsed: Duration) -> f64 {
    match elapsed.as_secs_f64() {
        seconds if seconds > 0.0 => count as f64 / seconds,
        _ => 0.0,
    }
}
56
57pub fn run_pending_migrations(
66 backend: &dyn StorageBackend,
67 db_path: &Path,
68 current_version: u64,
69) -> Result<(), StoreError> {
70 if current_version > STORE_SCHEMA_VERSION {
71 tracing::warn!(
72 "Database schema is at v{current_version}, ahead of this binary's v{STORE_SCHEMA_VERSION}; \
73 running an older binary against a newer database is unsupported. Upgrade the binary"
74 );
75 }
76
77 let pending = STORE_SCHEMA_VERSION.saturating_sub(current_version);
78 if pending == 0 {
79 return Ok(());
80 }
81
82 tracing::info!(
83 "Database schema is at v{current_version}, latest is v{STORE_SCHEMA_VERSION}; running {pending} migration(s). This may take a while on large databases"
84 );
85
86 for version in current_version..STORE_SCHEMA_VERSION {
87 let target = version + 1;
88
89 tracing::info!("Running schema migration v{version} → v{target}");
90 let start = Instant::now();
91
92 migration_for_version(version)(backend).map_err(|e| StoreError::MigrationFailed {
93 from: version,
94 to: target,
95 reason: e.to_string(),
96 })?;
97
98 write_metadata_version(db_path, target).map_err(|e| StoreError::MigrationFailed {
100 from: version,
101 to: target,
102 reason: format!("failed to write metadata: {e}"),
103 })?;
104
105 tracing::info!(
106 "Schema migration v{version} → v{target} completed in {:.1}s",
107 start.elapsed().as_secs_f64()
108 );
109 }
110
111 Ok(())
112}
113
114fn write_metadata_version(db_path: &Path, version: u64) -> Result<(), StoreError> {
120 let metadata_path = db_path.join(STORE_METADATA_FILENAME);
121 let tmp_path = db_path.join(format!("{}.tmp", STORE_METADATA_FILENAME));
122 let metadata = StoreMetadata::new(version);
123 let serialized = serde_json::to_string_pretty(&metadata)?;
124
125 let mut file = std::fs::File::create(&tmp_path)?;
126 file.write_all(serialized.as_bytes())?;
127 file.sync_all()?;
128 std::fs::rename(&tmp_path, &metadata_path)?;
129
130 Ok(())
131}
132
133fn migrate_1_to_2(backend: &dyn StorageBackend) -> Result<(), StoreError> {
146 const BATCH_SIZE: usize = 10_000;
147
148 let txn = backend.begin_read()?;
149 let iter = txn.prefix_iterator(RECEIPTS, &[])?;
150
151 let mut batch: Vec<(Vec<u8>, Vec<u8>)> = Vec::with_capacity(BATCH_SIZE);
152 let mut migrated: u64 = 0;
153 let start = Instant::now();
154 let mut last_progress_log = Instant::now();
155
156 for result in iter {
157 let (key, value) = result?;
158
159 let (block_hash, index) = match <(H256, u64)>::decode(&key) {
160 Ok(decoded) => decoded,
161 Err(_) => {
162 tracing::warn!(
163 "Schema migration v1 → v2: skipping receipts key that failed RLP decode (len={})",
164 key.len()
165 );
166 continue;
167 }
168 };
169
170 let new_key = receipt_key(&block_hash, index);
171 batch.push((new_key, value.to_vec()));
172
173 if batch.len() >= BATCH_SIZE {
174 let count = batch.len() as u64;
175 let mut tx = backend.begin_write()?;
176 tx.put_batch(RECEIPTS_V2, std::mem::take(&mut batch))?;
177 tx.commit()?;
178 migrated += count;
179 if last_progress_log.elapsed() >= PROGRESS_LOG_INTERVAL {
180 let rate = entries_per_second(migrated, start.elapsed());
181 tracing::info!(
182 "Schema migration v1 → v2: {migrated} receipt entries migrated so far ({rate:.0} entries/s)"
183 );
184 last_progress_log = Instant::now();
185 }
186 }
187 }
188
189 if !batch.is_empty() {
191 let count = batch.len() as u64;
192 let mut tx = backend.begin_write()?;
193 tx.put_batch(RECEIPTS_V2, batch)?;
194 tx.commit()?;
195 migrated += count;
196 }
197
198 tracing::info!("Schema migration v1 → v2: migrated {migrated} receipt entries in total");
199 Ok(())
200}
201
/// One transaction location as carried in `TRANSACTION_LOCATIONS` values:
/// a `(block number, block hash, index)` tuple, RLP-encoded. The index is
/// presumably the transaction's position within the block; confirm against
/// the writers of this table.
type TxLocation = (BlockNumber, BlockHash, Index);
203
/// Schema migration v2 → v3: collapses per-block transaction locations into one record per hash.
///
/// In the v2 layout, `TRANSACTION_LOCATIONS` keys are 64 bytes. The first 32
/// bytes are the transaction hash, and each value is a single RLP-encoded
/// `TxLocation`. This migration gathers the locations for one hash and merges
/// them, as an RLP list, under the bare 32-byte hash key. It deletes the old
/// 64-byte keys in the same write batch.
///
/// Keys that are already 32 bytes long are skipped, which lets a re-run over a
/// partially migrated table finish the job. Because the new record is written
/// with `merge`, locations already present under the 32-byte key are combined
/// with the migrated ones rather than overwritten (as the mixed-state test
/// asserts). Any other key length aborts the migration with an error.
///
/// Consecutive entries with the same hash prefix are grouped. NOTE(review):
/// this assumes `prefix_iterator` yields keys in sorted order, so all entries
/// for a hash are adjacent. If they were not, the result would still be merged
/// correctly, but `total_groups` would over-count.
fn migrate_2_to_3(backend: &dyn StorageBackend) -> Result<(), StoreError> {
    // Number of tx-hash groups staged in one write batch before it is committed.
    const GROUPS_PER_COMMIT: usize = 50_000;

    let read = backend.begin_read()?;
    let iter = read.prefix_iterator(TRANSACTION_LOCATIONS, &[])?;

    let mut write_batch = backend.begin_write()?;
    let mut groups_in_batch: usize = 0;
    // Group currently being accumulated: (tx hash, its locations, old composite keys to delete).
    let mut current: Option<(H256, Vec<TxLocation>, Vec<Vec<u8>>)> = None;
    let mut total_groups: u64 = 0;
    let mut total_old_entries: u64 = 0;
    let start = Instant::now();
    let mut last_progress_log = Instant::now();

    for result in iter {
        let (key, value) = result?;

        // Already in the v3 layout (bare 32-byte tx hash); leave it untouched.
        if key.len() == 32 {
            continue;
        }
        if key.len() != 64 {
            return Err(StoreError::Custom(format!(
                "unexpected TRANSACTION_LOCATIONS key length {} during migration",
                key.len()
            )));
        }

        total_old_entries += 1;
        if last_progress_log.elapsed() >= PROGRESS_LOG_INTERVAL {
            let rate = entries_per_second(total_old_entries, start.elapsed());
            tracing::info!(
                "Schema migration v2 → v3: {total_old_entries} transaction location entries processed so far ({rate:.0} entries/s)"
            );
            last_progress_log = Instant::now();
        }

        // The first 32 bytes of a v2 composite key are the transaction hash.
        let tx_hash = H256::from_slice(&key[..32]);
        let location = TxLocation::decode(&value)?;
        let key_vec = key.into_vec();

        match &mut current {
            // Same hash as the group in progress: extend it.
            Some((h, locs, keys_to_delete)) if *h == tx_hash => {
                locs.push(location);
                keys_to_delete.push(key_vec);
            }
            // A new hash starts: stage the finished group (if any), then open a new one.
            _ => {
                if let Some((h, locs, keys_to_delete)) = current.take() {
                    flush_tx_location_group(&mut *write_batch, h, locs, keys_to_delete)?;
                    total_groups += 1;
                    groups_in_batch += 1;
                    // Commit periodically to bound the size of one write batch.
                    // Whole groups are staged, so a group's merge and its
                    // deletes always land in the same commit.
                    if groups_in_batch >= GROUPS_PER_COMMIT {
                        write_batch.commit()?;
                        write_batch = backend.begin_write()?;
                        groups_in_batch = 0;
                    }
                }
                current = Some((tx_hash, vec![location], vec![key_vec]));
            }
        }
    }

    // The last group is never followed by a different hash, so stage it here.
    if let Some((h, locs, keys_to_delete)) = current {
        flush_tx_location_group(&mut *write_batch, h, locs, keys_to_delete)?;
        total_groups += 1;
    }

    write_batch.commit()?;

    tracing::info!(
        "Schema migration v2 → v3: rewrote {} transaction location entries into {} transaction records",
        total_old_entries,
        total_groups
    );
    Ok(())
}
304
305fn flush_tx_location_group(
306 write_batch: &mut dyn StorageWriteBatch,
307 tx_hash: H256,
308 locations: Vec<TxLocation>,
309 composite_keys: Vec<Vec<u8>>,
310) -> Result<(), StoreError> {
311 write_batch.merge(
316 TRANSACTION_LOCATIONS,
317 tx_hash.as_bytes(),
318 &locations.encode_to_vec(),
319 )?;
320 for key in composite_keys {
321 write_batch.delete(TRANSACTION_LOCATIONS, &key)?;
322 }
323 Ok(())
324}
325
// Unit tests for the schema-migration machinery, run against the in-memory backend.
#[cfg(test)]
mod tests {
    use super::*;

    // Runtime mirror of the compile-time `const _` assertion on MIGRATIONS.
    #[test]
    fn migrations_length_matches_schema_version() {
        assert_eq!(
            MIGRATIONS.len(),
            (STORE_SCHEMA_VERSION - 1) as usize,
            "MIGRATIONS array length must be STORE_SCHEMA_VERSION - 1"
        );
    }

    // A database already at the latest schema version needs no migration and must return Ok.
    #[test]
    fn run_pending_migrations_noop_when_current() {
        let backend = crate::backend::in_memory::InMemoryBackend::open().unwrap();
        let temp_dir = tempfile::tempdir().unwrap();

        write_metadata_version(temp_dir.path(), STORE_SCHEMA_VERSION).unwrap();

        let result = run_pending_migrations(&backend, temp_dir.path(), STORE_SCHEMA_VERSION);
        assert!(result.is_ok());
    }

    // write_metadata_version must produce a JSON file that deserializes back
    // into StoreMetadata carrying the requested version.
    #[test]
    fn fresh_store_creates_correct_metadata() {
        let temp_dir = tempfile::tempdir().unwrap();

        write_metadata_version(temp_dir.path(), STORE_SCHEMA_VERSION).unwrap();

        let metadata_path = temp_dir.path().join(STORE_METADATA_FILENAME);
        let contents = std::fs::read_to_string(&metadata_path).unwrap();
        let metadata: StoreMetadata = serde_json::from_str(&contents).unwrap();
        assert_eq!(metadata.schema_version, STORE_SCHEMA_VERSION);
    }

    // Seeds five receipts under the old RLP `(block_hash, index)` keys, runs the
    // v1 → v2 migration, and checks that:
    // - each receipt is readable under `receipt_key` in RECEIPTS_V2;
    // - the original RECEIPTS entries are still present.
    #[test]
    fn migrate_1_to_2_converts_rlp_keys_to_fixed_width() {
        use crate::api::StorageBackend;
        use ethrex_common::types::{Receipt, TxType};
        use ethrex_rlp::encode::RLPEncode;

        let backend = crate::backend::in_memory::InMemoryBackend::open().unwrap();

        let block_hash = H256::random();
        let receipts: Vec<Receipt> = (0..5)
            .map(|i| Receipt::new(TxType::Legacy, true, (i + 1) * 21000, vec![]))
            .collect();

        {
            // Write the receipts in the v1 key format.
            let mut tx = backend.begin_write().unwrap();
            let batch: Vec<(Vec<u8>, Vec<u8>)> = receipts
                .iter()
                .enumerate()
                .map(|(i, r)| {
                    let old_key = (block_hash, i as u64).encode_to_vec();
                    let value = r.encode_to_vec();
                    (old_key, value)
                })
                .collect();
            tx.put_batch(RECEIPTS, batch).unwrap();
            tx.commit().unwrap();
        }

        {
            // Sanity check: the seed data is visible before migrating.
            let txn = backend.begin_read().unwrap();
            let old_key = (block_hash, 0u64).encode_to_vec();
            assert!(txn.get(RECEIPTS, &old_key).unwrap().is_some());
        }

        migrate_1_to_2(&backend).unwrap();

        let txn = backend.begin_read().unwrap();
        for i in 0..5u64 {
            let new_key = receipt_key(&block_hash, i);
            let value = txn
                .get(RECEIPTS_V2, &new_key)
                .unwrap()
                .expect("new key should exist in RECEIPTS_V2 after migration");
            let decoded = Receipt::decode(value.as_ref()).unwrap();
            assert_eq!(decoded, receipts[i as usize]);

            // migrate_1_to_2 itself never deletes from RECEIPTS.
            let old_key = (block_hash, i).encode_to_vec();
            assert!(
                txn.get(RECEIPTS, &old_key).unwrap().is_some(),
                "old key should still exist in RECEIPTS (dropped after migration)"
            );
        }
    }

    // Writes one v2-format TRANSACTION_LOCATIONS entry. The key is 64 bytes
    // (`tx_hash ‖ block_hash`); the value is the RLP-encoded
    // `(block_number, block_hash, index)`.
    fn seed_old_entry(
        backend: &dyn StorageBackend,
        tx_hash: H256,
        block_number: BlockNumber,
        block_hash: BlockHash,
        index: Index,
    ) {
        let mut composite_key = Vec::with_capacity(64);
        composite_key.extend_from_slice(tx_hash.as_bytes());
        composite_key.extend_from_slice(block_hash.as_bytes());
        let value = (block_number, block_hash, index).encode_to_vec();

        let mut batch = backend.begin_write().unwrap();
        batch
            .put(TRANSACTION_LOCATIONS, &composite_key, &value)
            .unwrap();
        batch.commit().unwrap();
    }

    // Reads the v3-format record stored under the bare 32-byte tx hash and
    // decodes its list of locations. Returns None when no such key exists.
    fn read_new_entry(
        backend: &dyn StorageBackend,
        tx_hash: H256,
    ) -> Option<Vec<(BlockNumber, BlockHash, Index)>> {
        let read = backend.begin_read().unwrap();
        let bytes = read
            .get(TRANSACTION_LOCATIONS, tx_hash.as_bytes())
            .unwrap()?;
        Some(<Vec<(BlockNumber, BlockHash, Index)>>::decode(&bytes).unwrap())
    }

    // Deterministic H256 fixture whose low-order bytes encode `byte` (big-endian).
    fn h256(byte: u8) -> H256 {
        H256::from_low_u64_be(byte as u64)
    }

    // Migrating an empty table succeeds and creates nothing.
    #[test]
    fn migrate_2_to_3_empty_table() {
        let backend = crate::backend::in_memory::InMemoryBackend::open().unwrap();
        migrate_2_to_3(&backend).unwrap();
        assert!(read_new_entry(&backend, h256(1)).is_none());
    }

    // Each hash with exactly one old entry becomes a single-element record,
    // and no 64-byte keys remain afterwards.
    #[test]
    fn migrate_2_to_3_single_entry_per_hash() {
        let backend = crate::backend::in_memory::InMemoryBackend::open().unwrap();
        seed_old_entry(&backend, h256(1), 100, h256(0x10), 0);
        seed_old_entry(&backend, h256(2), 101, h256(0x11), 5);
        seed_old_entry(&backend, h256(3), 102, h256(0x12), 7);

        migrate_2_to_3(&backend).unwrap();

        assert_eq!(
            read_new_entry(&backend, h256(1)).unwrap(),
            vec![(100u64, h256(0x10), 0u64)]
        );
        assert_eq!(
            read_new_entry(&backend, h256(2)).unwrap(),
            vec![(101u64, h256(0x11), 5u64)]
        );
        assert_eq!(
            read_new_entry(&backend, h256(3)).unwrap(),
            vec![(102u64, h256(0x12), 7u64)]
        );

        // Every surviving key must be in the 32-byte v3 layout.
        let read = backend.begin_read().unwrap();
        let iter = read.prefix_iterator(TRANSACTION_LOCATIONS, &[]).unwrap();
        for entry in iter {
            let (key, _) = entry.unwrap();
            assert_eq!(key.len(), 32, "leftover non-migrated key: {:?}", key);
        }
    }

    // Several old entries for one hash (different blocks) are collapsed into
    // one record. Order is not asserted, so both sides are sorted first.
    #[test]
    fn migrate_2_to_3_multi_block_per_hash() {
        let backend = crate::backend::in_memory::InMemoryBackend::open().unwrap();
        seed_old_entry(&backend, h256(0xAA), 100, h256(0x10), 3);
        seed_old_entry(&backend, h256(0xAA), 100, h256(0x11), 4);
        seed_old_entry(&backend, h256(0xAA), 101, h256(0x12), 5);

        migrate_2_to_3(&backend).unwrap();

        let mut got = read_new_entry(&backend, h256(0xAA)).unwrap();
        got.sort();
        let mut expected = vec![
            (100u64, h256(0x10), 3u64),
            (100u64, h256(0x11), 4u64),
            (101u64, h256(0x12), 5u64),
        ];
        expected.sort();
        assert_eq!(got, expected);
    }

    // Simulates an interrupted earlier run:
    // - h256(1) is already in v3 form and must survive unchanged;
    // - h256(2) is still in v2 form and must be migrated.
    #[test]
    fn migrate_2_to_3_is_idempotent_on_partial_state() {
        let backend = crate::backend::in_memory::InMemoryBackend::open().unwrap();

        {
            let v3_value: Vec<(BlockNumber, BlockHash, Index)> =
                vec![(100, h256(0x10), 0), (100, h256(0x11), 0)];
            let mut batch = backend.begin_write().unwrap();
            batch
                .put(
                    TRANSACTION_LOCATIONS,
                    h256(1).as_bytes(),
                    &v3_value.encode_to_vec(),
                )
                .unwrap();
            batch.commit().unwrap();
        }
        seed_old_entry(&backend, h256(2), 200, h256(0x20), 0);
        seed_old_entry(&backend, h256(2), 200, h256(0x21), 1);

        migrate_2_to_3(&backend).unwrap();

        assert_eq!(
            read_new_entry(&backend, h256(1)).unwrap(),
            vec![(100u64, h256(0x10), 0u64), (100u64, h256(0x11), 0u64)]
        );

        let mut got = read_new_entry(&backend, h256(2)).unwrap();
        got.sort();
        let mut expected = vec![(200u64, h256(0x20), 0u64), (200u64, h256(0x21), 1u64)];
        expected.sort();
        assert_eq!(got, expected);

        // No 64-byte keys may remain.
        let read = backend.begin_read().unwrap();
        let iter = read.prefix_iterator(TRANSACTION_LOCATIONS, &[]).unwrap();
        for entry in iter {
            let (key, _) = entry.unwrap();
            assert_eq!(key.len(), 32);
        }
    }

    // The same hash exists in both forms: one v3 record plus two v2 entries.
    // The migration's merge must union all three locations, not overwrite
    // the existing v3 record.
    #[test]
    fn migrate_2_to_3_unions_same_hash_mixed_state() {
        let backend = crate::backend::in_memory::InMemoryBackend::open().unwrap();
        let tx = h256(0x42);

        {
            let v3_value: Vec<(BlockNumber, BlockHash, Index)> = vec![(100, h256(0x10), 0)];
            let mut batch = backend.begin_write().unwrap();
            batch
                .merge(
                    TRANSACTION_LOCATIONS,
                    tx.as_bytes(),
                    &v3_value.encode_to_vec(),
                )
                .unwrap();
            batch.commit().unwrap();
        }
        seed_old_entry(&backend, tx, 101, h256(0x11), 3);
        seed_old_entry(&backend, tx, 102, h256(0x12), 7);

        migrate_2_to_3(&backend).unwrap();

        let mut got = read_new_entry(&backend, tx).unwrap();
        got.sort();
        let mut expected = vec![
            (100u64, h256(0x10), 0u64),
            (101u64, h256(0x11), 3u64),
            (102u64, h256(0x12), 7u64),
        ];
        expected.sort();
        assert_eq!(
            got, expected,
            "merge must union, not overwrite, on mixed state"
        );
    }
}