Skip to main content

ethrex_storage/
migrations.rs

1use std::io::Write;
2use std::path::Path;
3use std::time::{Duration, Instant};
4
5use ethrex_common::H256;
6use ethrex_common::types::{BlockHash, BlockNumber, Index};
7use ethrex_rlp::decode::RLPDecode;
8use ethrex_rlp::encode::RLPEncode;
9
10use crate::api::tables::{RECEIPTS, RECEIPTS_V2, TRANSACTION_LOCATIONS};
11use crate::api::{StorageBackend, StorageWriteBatch};
12use crate::error::StoreError;
13use crate::store::receipt_key;
14use crate::{STORE_METADATA_FILENAME, STORE_SCHEMA_VERSION};
15
16use super::store::StoreMetadata;
17
/// A migration function that upgrades the database schema by one version.
///
/// Receives a reference to the storage backend so it can read/write data
/// as needed for the migration, and reports failure as a [`StoreError`].
/// Entries of [`MIGRATIONS`] have this type.
pub type MigrationFn = fn(backend: &dyn StorageBackend) -> Result<(), StoreError>;
23
/// Migration functions indexed by source version.
///
/// `MIGRATIONS[i]` upgrades the schema from version `(i + 1)` to `(i + 2)`.
/// For example:
/// - `MIGRATIONS[0]` upgrades v1 → v2
/// - `MIGRATIONS[1]` upgrades v2 → v3
///
/// **Invariant**: `MIGRATIONS.len() == (STORE_SCHEMA_VERSION - 1) as usize`
/// (empty when `STORE_SCHEMA_VERSION == 1`, one entry when it's 2, etc.)
///
/// When bumping `STORE_SCHEMA_VERSION`, append the new migration function
/// here; the compile-time assertion below rejects a mismatch.
pub const MIGRATIONS: &[MigrationFn] = &[migrate_1_to_2, migrate_2_to_3];

// Compile-time check: the number of migration functions must match the number
// of version gaps (i.e. STORE_SCHEMA_VERSION - 1). Evaluated in a const
// context, so a mismatch is a build error rather than a runtime failure.
const _: () = assert!(
    MIGRATIONS.len() == (STORE_SCHEMA_VERSION - 1) as usize,
    "MIGRATIONS length must equal STORE_SCHEMA_VERSION - 1"
);
41
42/// Returns the migration function that upgrades from `version` to `version + 1`.
43fn migration_for_version(version: u64) -> MigrationFn {
44    MIGRATIONS[(version - 1) as usize]
45}
46
/// Minimum interval between migration progress log lines.
///
/// Long-running migrations compare the time since their last progress line
/// against this value, so progress is reported at most once per interval.
const PROGRESS_LOG_INTERVAL: Duration = Duration::from_secs(10);
49
/// Computes the throughput (entries per second) shown in progress logs.
///
/// A zero `elapsed` yields `0.0` rather than dividing by zero, so the result
/// is never `inf` or `NaN`.
fn entries_per_second(count: u64, elapsed: Duration) -> f64 {
    match elapsed.as_secs_f64() {
        seconds if seconds > 0.0 => count as f64 / seconds,
        _ => 0.0,
    }
}
56
57/// Runs all pending migrations from `current_version` up to `STORE_SCHEMA_VERSION`.
58///
59/// Each migration is applied one version at a time, and the metadata file is
60/// updated (with fsync) after each successful step for crash safety.
61///
62/// Returns `Ok(())` if `current_version == STORE_SCHEMA_VERSION` (no-op).
63/// If `current_version > STORE_SCHEMA_VERSION` (older binary against a newer
64/// database), it warns and returns `Ok(())` without migrating.
65pub fn run_pending_migrations(
66    backend: &dyn StorageBackend,
67    db_path: &Path,
68    current_version: u64,
69) -> Result<(), StoreError> {
70    if current_version > STORE_SCHEMA_VERSION {
71        tracing::warn!(
72            "Database schema is at v{current_version}, ahead of this binary's v{STORE_SCHEMA_VERSION}; \
73             running an older binary against a newer database is unsupported. Upgrade the binary"
74        );
75    }
76
77    let pending = STORE_SCHEMA_VERSION.saturating_sub(current_version);
78    if pending == 0 {
79        return Ok(());
80    }
81
82    tracing::info!(
83        "Database schema is at v{current_version}, latest is v{STORE_SCHEMA_VERSION}; running {pending} migration(s). This may take a while on large databases"
84    );
85
86    for version in current_version..STORE_SCHEMA_VERSION {
87        let target = version + 1;
88
89        tracing::info!("Running schema migration v{version} → v{target}");
90        let start = Instant::now();
91
92        migration_for_version(version)(backend).map_err(|e| StoreError::MigrationFailed {
93            from: version,
94            to: target,
95            reason: e.to_string(),
96        })?;
97
98        // Persist the new version to metadata.json after each migration step
99        write_metadata_version(db_path, target).map_err(|e| StoreError::MigrationFailed {
100            from: version,
101            to: target,
102            reason: format!("failed to write metadata: {e}"),
103        })?;
104
105        tracing::info!(
106            "Schema migration v{version} → v{target} completed in {:.1}s",
107            start.elapsed().as_secs_f64()
108        );
109    }
110
111    Ok(())
112}
113
114/// Writes the schema version to metadata.json using write-to-temp-then-rename
115/// for crash safety. On POSIX filesystems `rename` is atomic, so the metadata
116/// file is never left in a partial/truncated state.
117// TODO: move metadata persistence into the StorageBackend abstraction so we
118// don't need to pass `db_path` around.
119fn write_metadata_version(db_path: &Path, version: u64) -> Result<(), StoreError> {
120    let metadata_path = db_path.join(STORE_METADATA_FILENAME);
121    let tmp_path = db_path.join(format!("{}.tmp", STORE_METADATA_FILENAME));
122    let metadata = StoreMetadata::new(version);
123    let serialized = serde_json::to_string_pretty(&metadata)?;
124
125    let mut file = std::fs::File::create(&tmp_path)?;
126    file.write_all(serialized.as_bytes())?;
127    file.sync_all()?;
128    std::fs::rename(&tmp_path, &metadata_path)?;
129
130    Ok(())
131}
132
133/// Migrates the RECEIPTS table from RLP-encoded `(BlockHash, u64)` keys
134/// to raw `block_hash (32B) || index (8B big-endian u64)` keys in a new
135/// `receipts_v2` column family.
136///
137/// This two-CF approach copies entries from the old `receipts` CF to
138/// `receipts_v2` with the new key format. The old `receipts` CF is **not**
139/// deleted here — `Store::new()` calls `drop_obsolete_cfs()` right after
140/// this migration returns, which drops it in the same startup.
141///
142/// Crash safety: if interrupted, metadata still says v1, so the migration
143/// restarts from scratch on next boot. Duplicate puts to `receipts_v2` are
144/// idempotent.
145fn migrate_1_to_2(backend: &dyn StorageBackend) -> Result<(), StoreError> {
146    const BATCH_SIZE: usize = 10_000;
147
148    let txn = backend.begin_read()?;
149    let iter = txn.prefix_iterator(RECEIPTS, &[])?;
150
151    let mut batch: Vec<(Vec<u8>, Vec<u8>)> = Vec::with_capacity(BATCH_SIZE);
152    let mut migrated: u64 = 0;
153    let start = Instant::now();
154    let mut last_progress_log = Instant::now();
155
156    for result in iter {
157        let (key, value) = result?;
158
159        let (block_hash, index) = match <(H256, u64)>::decode(&key) {
160            Ok(decoded) => decoded,
161            Err(_) => {
162                tracing::warn!(
163                    "Schema migration v1 → v2: skipping receipts key that failed RLP decode (len={})",
164                    key.len()
165                );
166                continue;
167            }
168        };
169
170        let new_key = receipt_key(&block_hash, index);
171        batch.push((new_key, value.to_vec()));
172
173        if batch.len() >= BATCH_SIZE {
174            let count = batch.len() as u64;
175            let mut tx = backend.begin_write()?;
176            tx.put_batch(RECEIPTS_V2, std::mem::take(&mut batch))?;
177            tx.commit()?;
178            migrated += count;
179            if last_progress_log.elapsed() >= PROGRESS_LOG_INTERVAL {
180                let rate = entries_per_second(migrated, start.elapsed());
181                tracing::info!(
182                    "Schema migration v1 → v2: {migrated} receipt entries migrated so far ({rate:.0} entries/s)"
183                );
184                last_progress_log = Instant::now();
185            }
186        }
187    }
188
189    // Flush remaining entries.
190    if !batch.is_empty() {
191        let count = batch.len() as u64;
192        let mut tx = backend.begin_write()?;
193        tx.put_batch(RECEIPTS_V2, batch)?;
194        tx.commit()?;
195        migrated += count;
196    }
197
198    tracing::info!("Schema migration v1 → v2: migrated {migrated} receipt entries in total");
199    Ok(())
200}
201
/// Where a transaction was included: `(block_number, block_hash, index)`.
///
/// This is the value stored per v2 `TRANSACTION_LOCATIONS` entry; the v3
/// schema stores a `Vec` of these per transaction hash.
type TxLocation = (BlockNumber, BlockHash, Index);
203
204/// Rewrites `TRANSACTION_LOCATIONS` from the v2 composite-key schema
205/// (`key = tx_hash || block_hash`, `value = (block_number, block_hash, index)`)
206/// to the v3 schema (`key = tx_hash`, `value = Vec<(block_number, block_hash, index)>`).
207///
208/// Streams the old table in lex order, grouping consecutive entries by tx_hash
209/// (composite keys with the same 32-byte prefix are adjacent — both backends
210/// iterate sorted). Flushes each group as an atomic write batch (merge the new
211/// key + delete the old composite keys), chunking commits to bound memory.
212/// Skips any already-migrated 32-byte keys it encounters.
213///
214/// Crash-resume is safe by construction: the new value is written with `merge`,
215/// not `put`, so if a tx_hash ever has both a v3 value and leftover v2 siblings
216/// (e.g. a non-atomic backend), the resumed run unions them (deduping by
217/// block_hash) instead of overwriting. The merge operator that already backs
218/// the live write path makes this free.
219fn migrate_2_to_3(backend: &dyn StorageBackend) -> Result<(), StoreError> {
220    const GROUPS_PER_COMMIT: usize = 50_000;
221
222    let read = backend.begin_read()?;
223    // Empty prefix → full-table scan. Both backends yield keys in sorted order,
224    // which the same-prefix grouping below relies on.
225    let iter = read.prefix_iterator(TRANSACTION_LOCATIONS, &[])?;
226
227    let mut write_batch = backend.begin_write()?;
228    let mut groups_in_batch: usize = 0;
229    let mut current: Option<(H256, Vec<TxLocation>, Vec<Vec<u8>>)> = None;
230    let mut total_groups: u64 = 0;
231    let mut total_old_entries: u64 = 0;
232    let start = Instant::now();
233    let mut last_progress_log = Instant::now();
234
235    for result in iter {
236        let (key, value) = result?;
237
238        // Already-migrated entries (32-byte tx_hash keys, from a prior partial run): skip.
239        if key.len() == 32 {
240            continue;
241        }
242        if key.len() != 64 {
243            return Err(StoreError::Custom(format!(
244                "unexpected TRANSACTION_LOCATIONS key length {} during migration",
245                key.len()
246            )));
247        }
248
249        total_old_entries += 1;
250        if last_progress_log.elapsed() >= PROGRESS_LOG_INTERVAL {
251            let rate = entries_per_second(total_old_entries, start.elapsed());
252            tracing::info!(
253                "Schema migration v2 → v3: {total_old_entries} transaction location entries processed so far ({rate:.0} entries/s)"
254            );
255            last_progress_log = Instant::now();
256        }
257
258        let tx_hash = H256::from_slice(&key[..32]);
259        let location = TxLocation::decode(&value)?;
260        let key_vec = key.into_vec();
261
262        match &mut current {
263            Some((h, locs, keys_to_delete)) if *h == tx_hash => {
264                locs.push(location);
265                keys_to_delete.push(key_vec);
266            }
267            _ => {
268                if let Some((h, locs, keys_to_delete)) = current.take() {
269                    flush_tx_location_group(&mut *write_batch, h, locs, keys_to_delete)?;
270                    total_groups += 1;
271                    groups_in_batch += 1;
272                    if groups_in_batch >= GROUPS_PER_COMMIT {
273                        write_batch.commit()?;
274                        // Re-acquire instead of relying on post-commit reuse
275                        // of the trait object (works today via mem::take in
276                        // RocksDB and a no-op InMemory commit, but it's not
277                        // a documented contract on `StorageWriteBatch`).
278                        write_batch = backend.begin_write()?;
279                        groups_in_batch = 0;
280                    }
281                }
282                current = Some((tx_hash, vec![location], vec![key_vec]));
283            }
284        }
285    }
286
287    if let Some((h, locs, keys_to_delete)) = current {
288        flush_tx_location_group(&mut *write_batch, h, locs, keys_to_delete)?;
289        total_groups += 1;
290    }
291
292    // Final commit. `groups_in_batch` is not bumped/reset here intentionally
293    // — the post-loop flush is followed immediately by a commit, after which
294    // the variable goes out of scope.
295    write_batch.commit()?;
296
297    tracing::info!(
298        "Schema migration v2 → v3: rewrote {} transaction location entries into {} transaction records",
299        total_old_entries,
300        total_groups
301    );
302    Ok(())
303}
304
305fn flush_tx_location_group(
306    write_batch: &mut dyn StorageWriteBatch,
307    tx_hash: H256,
308    locations: Vec<TxLocation>,
309    composite_keys: Vec<Vec<u8>>,
310) -> Result<(), StoreError> {
311    // Use `merge`, not `put`: the operand is the same `Vec` type as the value,
312    // so a re-processed group unions with any existing v3 value (dedup by
313    // block_hash) instead of overwriting it. The composite-key deletes ride in
314    // the same batch, so the group is applied atomically.
315    write_batch.merge(
316        TRANSACTION_LOCATIONS,
317        tx_hash.as_bytes(),
318        &locations.encode_to_vec(),
319    )?;
320    for key in composite_keys {
321        write_batch.delete(TRANSACTION_LOCATIONS, &key)?;
322    }
323    Ok(())
324}
325
#[cfg(test)]
mod tests {
    use super::*;

    use crate::backend::in_memory::InMemoryBackend;

    /// Opens an empty in-memory backend for a test.
    fn empty_backend() -> InMemoryBackend {
        InMemoryBackend::open().unwrap()
    }

    /// Shorthand for a hash whose low byte is `byte` and the rest zero.
    fn h256(byte: u8) -> H256 {
        H256::from_low_u64_be(u64::from(byte))
    }

    /// Seeds the backend with one entry under the v2 composite-key schema:
    /// `key = tx_hash || block_hash`, `value = (block_number, block_hash, index)`.
    fn seed_old_entry(
        backend: &dyn StorageBackend,
        tx_hash: H256,
        block_number: BlockNumber,
        block_hash: BlockHash,
        index: Index,
    ) {
        let composite_key = [tx_hash.as_bytes(), block_hash.as_bytes()].concat();
        let encoded_location = (block_number, block_hash, index).encode_to_vec();

        let mut writer = backend.begin_write().unwrap();
        writer
            .put(TRANSACTION_LOCATIONS, &composite_key, &encoded_location)
            .unwrap();
        writer.commit().unwrap();
    }

    /// Reads and decodes the v3 value stored under the bare `tx_hash` key.
    fn read_new_entry(
        backend: &dyn StorageBackend,
        tx_hash: H256,
    ) -> Option<Vec<(BlockNumber, BlockHash, Index)>> {
        let reader = backend.begin_read().unwrap();
        let stored = reader
            .get(TRANSACTION_LOCATIONS, tx_hash.as_bytes())
            .unwrap();
        stored.map(|bytes| <Vec<(BlockNumber, BlockHash, Index)>>::decode(&bytes).unwrap())
    }

    /// Collects every key currently present in `TRANSACTION_LOCATIONS`.
    fn transaction_location_keys(backend: &dyn StorageBackend) -> Vec<Vec<u8>> {
        let reader = backend.begin_read().unwrap();
        let entries = reader.prefix_iterator(TRANSACTION_LOCATIONS, &[]).unwrap();
        let keys = entries.map(|entry| entry.unwrap().0.to_vec()).collect();
        keys
    }

    #[test]
    fn migrations_length_matches_schema_version() {
        let version_gaps = (STORE_SCHEMA_VERSION - 1) as usize;
        assert_eq!(
            MIGRATIONS.len(),
            version_gaps,
            "MIGRATIONS array length must be STORE_SCHEMA_VERSION - 1"
        );
    }

    #[test]
    fn run_pending_migrations_noop_when_current() {
        // With current_version == STORE_SCHEMA_VERSION no migration runs, so an
        // empty in-memory backend is all that is needed.
        let backend = empty_backend();
        let db_dir = tempfile::tempdir().unwrap();

        // Metadata as it would look for an up-to-date store.
        write_metadata_version(db_dir.path(), STORE_SCHEMA_VERSION).unwrap();

        let outcome = run_pending_migrations(&backend, db_dir.path(), STORE_SCHEMA_VERSION);
        assert!(outcome.is_ok());
    }

    #[test]
    fn fresh_store_creates_correct_metadata() {
        let db_dir = tempfile::tempdir().unwrap();

        write_metadata_version(db_dir.path(), STORE_SCHEMA_VERSION).unwrap();

        let raw = std::fs::read_to_string(db_dir.path().join(STORE_METADATA_FILENAME)).unwrap();
        let parsed: StoreMetadata = serde_json::from_str(&raw).unwrap();
        assert_eq!(parsed.schema_version, STORE_SCHEMA_VERSION);
    }

    #[test]
    fn migrate_1_to_2_converts_rlp_keys_to_fixed_width() {
        use crate::api::StorageBackend;
        use ethrex_common::types::{Receipt, TxType};
        use ethrex_rlp::encode::RLPEncode;

        let backend = empty_backend();

        let block_hash = H256::random();
        let receipts: Vec<Receipt> = (0..5)
            .map(|i| Receipt::new(TxType::Legacy, true, (i + 1) * 21000, vec![]))
            .collect();

        // Seed old-format RLP keys: (BlockHash, u64).encode_to_vec()
        {
            let seeded: Vec<(Vec<u8>, Vec<u8>)> = receipts
                .iter()
                .zip(0u64..)
                .map(|(receipt, index)| {
                    (
                        (block_hash, index).encode_to_vec(),
                        receipt.encode_to_vec(),
                    )
                })
                .collect();
            let mut writer = backend.begin_write().unwrap();
            writer.put_batch(RECEIPTS, seeded).unwrap();
            writer.commit().unwrap();
        }

        // Sanity check: the seed really landed under the old key format.
        {
            let reader = backend.begin_read().unwrap();
            let first_old_key = (block_hash, 0u64).encode_to_vec();
            assert!(reader.get(RECEIPTS, &first_old_key).unwrap().is_some());
        }

        migrate_1_to_2(&backend).unwrap();

        // Every receipt must now be readable under its fixed-width key in
        // RECEIPTS_V2, with an unchanged value.
        let reader = backend.begin_read().unwrap();
        for (index, expected) in (0u64..).zip(&receipts) {
            let stored = reader
                .get(RECEIPTS_V2, &receipt_key(&block_hash, index))
                .unwrap()
                .expect("new key should exist in RECEIPTS_V2 after migration");
            let decoded = Receipt::decode(stored.as_ref()).unwrap();
            assert_eq!(&decoded, expected);

            // Old keys should still be in RECEIPTS (drop_obsolete_cfs runs after migration)
            let old_key = (block_hash, index).encode_to_vec();
            assert!(
                reader.get(RECEIPTS, &old_key).unwrap().is_some(),
                "old key should still exist in RECEIPTS (dropped after migration)"
            );
        }
    }

    #[test]
    fn migrate_2_to_3_empty_table() {
        let backend = empty_backend();
        migrate_2_to_3(&backend).unwrap();
        // Success on an empty table, and no entry appears out of thin air.
        assert!(read_new_entry(&backend, h256(1)).is_none());
    }

    #[test]
    fn migrate_2_to_3_single_entry_per_hash() {
        let backend = empty_backend();
        // (tx_hash, block_number, block_hash, index)
        let cases = [
            (h256(1), 100u64, h256(0x10), 0u64),
            (h256(2), 101u64, h256(0x11), 5u64),
            (h256(3), 102u64, h256(0x12), 7u64),
        ];
        for &(tx_hash, block_number, block_hash, index) in &cases {
            seed_old_entry(&backend, tx_hash, block_number, block_hash, index);
        }

        migrate_2_to_3(&backend).unwrap();

        for &(tx_hash, block_number, block_hash, index) in &cases {
            assert_eq!(
                read_new_entry(&backend, tx_hash).unwrap(),
                vec![(block_number, block_hash, index)]
            );
        }

        // Old composite-key entries are gone.
        for key in transaction_location_keys(&backend) {
            assert_eq!(key.len(), 32, "leftover non-migrated key: {:?}", key);
        }
    }

    #[test]
    fn migrate_2_to_3_multi_block_per_hash() {
        let backend = empty_backend();
        let tx_hash = h256(0xAA);
        // Same tx hash appears in three different blocks (reorg scenario).
        let mut expected = vec![
            (100u64, h256(0x10), 3u64),
            (100u64, h256(0x11), 4u64),
            (101u64, h256(0x12), 5u64),
        ];
        for &(block_number, block_hash, index) in &expected {
            seed_old_entry(&backend, tx_hash, block_number, block_hash, index);
        }

        migrate_2_to_3(&backend).unwrap();

        let mut actual = read_new_entry(&backend, tx_hash).unwrap();
        actual.sort();
        expected.sort();
        assert_eq!(actual, expected);
    }

    #[test]
    fn migrate_2_to_3_is_idempotent_on_partial_state() {
        // Simulate a crash-resume: the backend already has a v3 32-byte entry
        // for one tx hash (from a previously-completed chunk), and v2 composite
        // entries for another tx hash (still pending).
        let backend = empty_backend();

        // Already-migrated v3 entry for h256(1).
        let already_migrated: Vec<(BlockNumber, BlockHash, Index)> =
            vec![(100, h256(0x10), 0), (100, h256(0x11), 0)];
        {
            let mut writer = backend.begin_write().unwrap();
            writer
                .put(
                    TRANSACTION_LOCATIONS,
                    h256(1).as_bytes(),
                    &already_migrated.encode_to_vec(),
                )
                .unwrap();
            writer.commit().unwrap();
        }
        // Pending v2 entries for h256(2).
        seed_old_entry(&backend, h256(2), 200, h256(0x20), 0);
        seed_old_entry(&backend, h256(2), 200, h256(0x21), 1);

        migrate_2_to_3(&backend).unwrap();

        // h256(1)'s already-migrated entry is unchanged.
        assert_eq!(
            read_new_entry(&backend, h256(1)).unwrap(),
            vec![(100u64, h256(0x10), 0u64), (100u64, h256(0x11), 0u64)]
        );

        // h256(2) is now migrated.
        let mut actual = read_new_entry(&backend, h256(2)).unwrap();
        actual.sort();
        let mut expected = vec![(200u64, h256(0x20), 0u64), (200u64, h256(0x21), 1u64)];
        expected.sort();
        assert_eq!(actual, expected);

        // No leftover 64-byte keys.
        for key in transaction_location_keys(&backend) {
            assert_eq!(key.len(), 32);
        }
    }

    /// The pathological case flagged in review: a single tx_hash has BOTH a v3
    /// value (from a prior partial run) AND leftover v2 composite keys. Because
    /// `flush_tx_location_group` uses `merge` (not `put`), the resumed migration
    /// must UNION the leftover entries into the existing v3 value, not overwrite
    /// it — no locations may be lost.
    #[test]
    fn migrate_2_to_3_unions_same_hash_mixed_state() {
        let backend = empty_backend();
        let tx = h256(0x42);

        // Pre-existing v3 value for `tx` (one block already migrated).
        let already_migrated: Vec<(BlockNumber, BlockHash, Index)> = vec![(100, h256(0x10), 0)];
        {
            let mut writer = backend.begin_write().unwrap();
            writer
                .merge(
                    TRANSACTION_LOCATIONS,
                    tx.as_bytes(),
                    &already_migrated.encode_to_vec(),
                )
                .unwrap();
            writer.commit().unwrap();
        }
        // Leftover v2 composite entries for the SAME tx (different blocks).
        seed_old_entry(&backend, tx, 101, h256(0x11), 3);
        seed_old_entry(&backend, tx, 102, h256(0x12), 7);

        migrate_2_to_3(&backend).unwrap();

        let mut actual = read_new_entry(&backend, tx).unwrap();
        actual.sort();
        let mut expected = vec![
            (100u64, h256(0x10), 0u64), // pre-existing v3 entry survives
            (101u64, h256(0x11), 3u64),
            (102u64, h256(0x12), 7u64),
        ];
        expected.sort();
        assert_eq!(
            actual, expected,
            "merge must union, not overwrite, on mixed state"
        );
    }
}