Skip to main content

bedrock_world/
storage.rs

//! Storage abstraction used by [`crate::BedrockWorld`].
//!
//! The trait in this module keeps world parsing independent of any concrete
//! LevelDB implementation. [`MemoryStorage`] is useful for tests and synthetic
//! tools, while [`BedrockLevelDbStorage`](crate::BedrockLevelDbStorage) adapts
//! the `bedrock-leveldb` crate.
7
8use crate::chunk::{ChunkKey, ChunkPos, ChunkRecordTag, Dimension, LEGACY_TERRAIN_VALUE_LEN};
9use crate::error::{BedrockWorldError, Result};
10use crate::level_dat::read_level_dat_document;
11use crate::nbt::NbtTag;
12use bytes::Bytes;
13use std::collections::BTreeMap;
14use std::fs;
15use std::path::Path;
16use std::sync::{
17    Arc, RwLock,
18    atomic::{AtomicBool, Ordering},
19};
20
21#[derive(Debug, Clone, PartialEq, Eq)]
22/// Owned raw key/value storage entry.
23pub struct StorageEntry {
24    /// Decoded storage key for this record.
25    pub key: Bytes,
26    /// Parsed or raw value associated with this record.
27    pub value: Bytes,
28}
29
/// Borrowed view of a raw key/value record.
///
/// The view is `Copy`; both slices borrow for the same lifetime `'a`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct StorageEntryRef<'a> {
    /// Raw key bytes identifying this record.
    pub key: &'a [u8],
    /// Raw value bytes stored under `key`.
    pub value: &'a [u8],
}
38
39#[derive(Debug, Clone, PartialEq, Eq)]
40/// Raw storage mutation operation.
41pub enum StorageOp {
42    /// Writes or replaces a raw storage value.
43    Put {
44        /// Decoded storage key for this record.
45        key: Bytes,
46        /// Raw value bytes written for the key.
47        value: Bytes,
48    },
49    /// Delete operation.
50    Delete {
51        /// Decoded storage key for this record.
52        key: Bytes,
53    },
54}
55
56#[derive(Debug, Clone)]
57/// Options controlling raw storage reads and scans.
58pub struct StorageReadOptions {
59    /// Threading policy for this operation.
60    pub threading: StorageThreadingOptions,
61    /// Scan strategy requested from the backend.
62    pub scan_mode: StorageScanMode,
63    /// Bounded pipeline settings for this operation.
64    pub pipeline: StoragePipelineOptions,
65    /// Optional cancellation flag checked during long-running work.
66    pub cancel: Option<StorageCancelFlag>,
67    /// Optional progress sink invoked during long-running work.
68    pub progress: Option<StorageProgressSink>,
69}
70
71impl Default for StorageReadOptions {
72    fn default() -> Self {
73        Self {
74            threading: StorageThreadingOptions::Auto,
75            scan_mode: StorageScanMode::ParallelTables,
76            pipeline: StoragePipelineOptions::default(),
77            cancel: None,
78            progress: None,
79        }
80    }
81}
82
/// Bounded pipeline settings for storage scans.
///
/// Every field treats zero as "let the backend pick a default", and the
/// derived `Default` sets all of them to zero.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct StoragePipelineOptions {
    /// Maximum number of queued work items; zero selects an automatic default.
    pub queue_depth: usize,
    /// Number of tables per batch; zero selects an automatic default.
    pub table_batch_size: usize,
    /// Interval between progress callbacks; zero selects an automatic default.
    pub progress_interval: usize,
}
93
/// Threading policy for storage operations.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum StorageThreadingOptions {
    /// Let the backend choose how many workers to use (the default).
    #[default]
    Auto,
    /// Use the given fixed number of workers.
    Fixed(usize),
    /// Use a single worker.
    Single,
}
105
/// Scan strategy requested from a storage backend.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum StorageScanMode {
    /// Scan records one after another (the default).
    #[default]
    Sequential,
    /// Scan backend tables in parallel when the backend supports it.
    ParallelTables,
}
115
/// Control-flow signal returned by a scan visitor after each record.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum StorageVisitorControl {
    /// Keep visiting further records.
    Continue,
    /// End the scan after the current record.
    Stop,
}
124
/// Diagnostics collected by a storage scan.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct StorageScanOutcome {
    /// Number of entries visited by the scan.
    pub visited: usize,
    /// Number of value bytes read while scanning.
    pub bytes_read: usize,
    /// Whether a visitor requested early termination.
    pub stopped: bool,
    /// Number of backend tables scanned.
    pub tables_scanned: usize,
    /// Number of worker threads used by the operation.
    pub worker_threads: usize,
    /// Milliseconds spent waiting for bounded pipeline capacity.
    pub queue_wait_ms: u128,
    /// Number of cancellation checks performed.
    pub cancel_checks: usize,
}

impl StorageScanOutcome {
    /// Returns an outcome with every counter at zero and `stopped` unset.
    ///
    /// Usable in `const` contexts, unlike the derived `Default`.
    #[must_use]
    pub const fn empty() -> Self {
        Self {
            cancel_checks: 0,
            queue_wait_ms: 0,
            worker_threads: 0,
            tables_scanned: 0,
            stopped: false,
            bytes_read: 0,
            visited: 0,
        }
    }

    /// Counts one more visited entry whose value is `value_len` bytes long.
    ///
    /// Both counters saturate instead of overflowing.
    pub fn record(&mut self, value_len: usize) {
        let Self {
            visited,
            bytes_read,
            ..
        } = self;
        *visited = visited.saturating_add(1);
        *bytes_read = bytes_read.saturating_add(value_len);
    }

    /// Folds `other` into this outcome.
    ///
    /// Counters are added with saturation, `stopped` is OR-ed, and
    /// `worker_threads` keeps the larger of the two values rather than a sum.
    pub fn merge(&mut self, other: Self) {
        let Self {
            visited,
            bytes_read,
            stopped,
            tables_scanned,
            worker_threads,
            queue_wait_ms,
            cancel_checks,
        } = other;
        self.visited = self.visited.saturating_add(visited);
        self.bytes_read = self.bytes_read.saturating_add(bytes_read);
        self.stopped = self.stopped || stopped;
        self.tables_scanned = self.tables_scanned.saturating_add(tables_scanned);
        self.worker_threads = std::cmp::max(self.worker_threads, worker_threads);
        self.queue_wait_ms = self.queue_wait_ms.saturating_add(queue_wait_ms);
        self.cancel_checks = self.cancel_checks.saturating_add(cancel_checks);
    }
}
176
/// Shareable cancellation flag for storage operations.
///
/// The flag wraps an `Arc<AtomicBool>`, so clones observe the same state.
#[derive(Clone, Debug, Default)]
pub struct StorageCancelFlag(Arc<AtomicBool>);

impl StorageCancelFlag {
    /// Creates a fresh flag that has not been cancelled.
    #[must_use]
    pub fn new() -> Self {
        Self(Arc::new(AtomicBool::new(false)))
    }

    /// Requests cancellation for every operation sharing this flag.
    pub fn cancel(&self) {
        let Self(flag) = self;
        flag.store(true, Ordering::Relaxed);
    }

    /// Wraps an existing shared atomic boolean as a cancellation flag.
    ///
    /// The flag reflects whatever value `cancelled` holds, including later
    /// stores made through other handles to the same atomic.
    #[must_use]
    pub fn from_shared(cancelled: Arc<AtomicBool>) -> Self {
        StorageCancelFlag(cancelled)
    }

    /// Returns `true` once cancellation has been requested.
    #[must_use]
    pub fn is_cancelled(&self) -> bool {
        let Self(flag) = self;
        flag.load(Ordering::Relaxed)
    }
}
205
206#[derive(Clone)]
207/// Callback sink for storage progress updates.
208pub struct StorageProgressSink {
209    inner: Arc<dyn Fn(StorageScanProgress) + Send + Sync>,
210}
211
212impl std::fmt::Debug for StorageProgressSink {
213    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
214        formatter
215            .debug_struct("StorageProgressSink")
216            .finish_non_exhaustive()
217    }
218}
219
220impl StorageProgressSink {
221    #[must_use]
222    /// Creates a new value.
223    pub fn new(callback: impl Fn(StorageScanProgress) + Send + Sync + 'static) -> Self {
224        Self {
225            inner: Arc::new(callback),
226        }
227    }
228
229    /// Emits a progress update to the callback.
230    pub fn emit(&self, progress: StorageScanProgress) {
231        (self.inner)(progress);
232    }
233}
234
/// Progress snapshot emitted by a storage backend during a scan.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct StorageScanProgress {
    /// Number of entries observed at the moment the update was emitted.
    pub entries_seen: usize,
    /// Number of value bytes read so far.
    pub bytes_read: usize,
}
243
244#[derive(Debug, Clone, Default, PartialEq, Eq)]
245/// Buffered batch of raw storage operations.
246pub struct StorageBatch {
247    ops: Vec<StorageOp>,
248}
249
250impl StorageBatch {
251    #[must_use]
252    /// Creates a new value.
253    pub const fn new() -> Self {
254        Self { ops: Vec::new() }
255    }
256
257    /// Adds a raw put operation to this batch.
258    pub fn put(&mut self, key: impl Into<Bytes>, value: impl Into<Bytes>) {
259        self.ops.push(StorageOp::Put {
260            key: key.into(),
261            value: value.into(),
262        });
263    }
264
265    /// Adds a raw delete operation to this batch.
266    pub fn delete(&mut self, key: impl Into<Bytes>) {
267        self.ops.push(StorageOp::Delete { key: key.into() });
268    }
269
270    #[must_use]
271    /// Returns whether this batch contains no operations.
272    pub fn is_empty(&self) -> bool {
273        self.ops.is_empty()
274    }
275
276    #[must_use]
277    /// Returns the buffered operations.
278    pub fn ops(&self) -> &[StorageOp] {
279        &self.ops
280    }
281}
282
283/// Raw key/value storage abstraction used by [`BedrockWorld`](crate::BedrockWorld).
284pub trait WorldStorage: Send + Sync {
285    /// Looks up a raw value by exact key.
286    fn get(&self, key: &[u8]) -> Result<Option<Bytes>>;
287    /// Looks up raw values by exact key, preserving input order.
288    fn get_many(&self, keys: &[Bytes]) -> Result<Vec<Option<Bytes>>> {
289        keys.iter().map(|key| self.get(key)).collect()
290    }
291    /// Looks up raw values by exact key with read options and cancellation, preserving input order.
292    fn get_many_ordered_with_control(
293        &self,
294        keys: &[Bytes],
295        options: StorageReadOptions,
296    ) -> Result<Vec<Option<Bytes>>> {
297        check_cancelled(&options)?;
298        let mut values = Vec::with_capacity(keys.len());
299        for key in keys {
300            check_cancelled(&options)?;
301            values.push(self.get(key)?);
302        }
303        Ok(values)
304    }
305    /// Writes a raw key/value pair.
306    fn put(&self, key: &[u8], value: &[u8]) -> Result<()>;
307    /// Deletes a raw key.
308    fn delete(&self, key: &[u8]) -> Result<()>;
309    /// Visits keys without forcing value materialization when the backend can
310    /// support key-only scans.
311    fn for_each_key(
312        &self,
313        options: StorageReadOptions,
314        visitor: &mut (dyn FnMut(&[u8]) -> Result<StorageVisitorControl> + Send),
315    ) -> Result<StorageScanOutcome>;
316    /// Visits key/value records whose key starts with `prefix`.
317    fn for_each_prefix(
318        &self,
319        prefix: &[u8],
320        options: StorageReadOptions,
321        visitor: &mut (dyn FnMut(&[u8], &Bytes) -> Result<StorageVisitorControl> + Send),
322    ) -> Result<StorageScanOutcome>;
323    /// Visits key/value records as borrowed byte views.
324    fn for_each_prefix_ref(
325        &self,
326        prefix: &[u8],
327        options: StorageReadOptions,
328        visitor: &mut (dyn FnMut(StorageEntryRef<'_>) -> Result<StorageVisitorControl> + Send),
329    ) -> Result<StorageScanOutcome> {
330        self.for_each_prefix(prefix, options, &mut |key, value| {
331            visitor(StorageEntryRef {
332                key,
333                value: value.as_ref(),
334            })
335        })
336    }
337    /// Visits keys whose key starts with `prefix` without requiring value
338    /// materialization when the backend can support key-only scans.
339    fn for_each_prefix_key(
340        &self,
341        prefix: &[u8],
342        options: StorageReadOptions,
343        visitor: &mut (dyn FnMut(&[u8]) -> Result<StorageVisitorControl> + Send),
344    ) -> Result<StorageScanOutcome> {
345        self.for_each_prefix(prefix, options, &mut |key, _value| visitor(key))
346    }
347    /// Visits all key/value records.
348    fn for_each_entry(
349        &self,
350        options: StorageReadOptions,
351        visitor: &mut (dyn FnMut(&[u8], &Bytes) -> Result<StorageVisitorControl> + Send),
352    ) -> Result<StorageScanOutcome> {
353        self.for_each_prefix(b"", options, visitor)
354    }
355    /// Applies a batch of raw operations atomically when supported by the backend.
356    fn write_batch(&self, batch: &StorageBatch) -> Result<()>;
357    /// Flushes pending writes to durable storage when supported by the backend.
358    fn flush(&self) -> Result<()>;
359}
360
361#[derive(Debug, Clone, Default)]
362/// In-memory storage backend for tests and synthetic tools.
363pub struct MemoryStorage {
364    values: Arc<RwLock<BTreeMap<Vec<u8>, Bytes>>>,
365}
366
367impl MemoryStorage {
368    #[must_use]
369    /// Creates a new value.
370    pub fn new() -> Self {
371        Self::default()
372    }
373}
374
375impl WorldStorage for MemoryStorage {
376    fn get(&self, key: &[u8]) -> Result<Option<Bytes>> {
377        let values = self.values.read().map_err(|_| {
378            BedrockWorldError::ConcurrentWrite("memory storage poisoned".to_string())
379        })?;
380        Ok(values.get(key).cloned())
381    }
382
383    fn get_many(&self, keys: &[Bytes]) -> Result<Vec<Option<Bytes>>> {
384        let values = self.values.read().map_err(|_| {
385            BedrockWorldError::ConcurrentWrite("memory storage poisoned".to_string())
386        })?;
387        Ok(keys
388            .iter()
389            .map(|key| values.get(key.as_ref()).cloned())
390            .collect())
391    }
392
393    fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
394        let mut values = self.values.write().map_err(|_| {
395            BedrockWorldError::ConcurrentWrite("memory storage poisoned".to_string())
396        })?;
397        values.insert(key.to_vec(), Bytes::copy_from_slice(value));
398        Ok(())
399    }
400
401    fn delete(&self, key: &[u8]) -> Result<()> {
402        let mut values = self.values.write().map_err(|_| {
403            BedrockWorldError::ConcurrentWrite("memory storage poisoned".to_string())
404        })?;
405        values.remove(key);
406        Ok(())
407    }
408
409    fn for_each_key(
410        &self,
411        options: StorageReadOptions,
412        visitor: &mut (dyn FnMut(&[u8]) -> Result<StorageVisitorControl> + Send),
413    ) -> Result<StorageScanOutcome> {
414        let values = self.values.read().map_err(|_| {
415            BedrockWorldError::ConcurrentWrite("memory storage poisoned".to_string())
416        })?;
417        let mut outcome = StorageScanOutcome::empty();
418        for (key, value) in values.iter() {
419            check_cancelled(&options)?;
420            outcome.record(value.len());
421            if visitor(key)? == StorageVisitorControl::Stop {
422                outcome.stopped = true;
423                return Ok(outcome);
424            }
425            emit_progress(&options, outcome);
426        }
427        Ok(outcome)
428    }
429
430    fn for_each_prefix(
431        &self,
432        prefix: &[u8],
433        options: StorageReadOptions,
434        visitor: &mut (dyn FnMut(&[u8], &Bytes) -> Result<StorageVisitorControl> + Send),
435    ) -> Result<StorageScanOutcome> {
436        let values = self.values.read().map_err(|_| {
437            BedrockWorldError::ConcurrentWrite("memory storage poisoned".to_string())
438        })?;
439        let mut outcome = StorageScanOutcome::empty();
440        for (key, value) in values
441            .range(prefix.to_vec()..)
442            .take_while(|(key, _)| key.starts_with(prefix))
443        {
444            check_cancelled(&options)?;
445            outcome.record(value.len());
446            if visitor(key, value)? == StorageVisitorControl::Stop {
447                outcome.stopped = true;
448                return Ok(outcome);
449            }
450            emit_progress(&options, outcome);
451        }
452        Ok(outcome)
453    }
454
455    fn for_each_prefix_key(
456        &self,
457        prefix: &[u8],
458        options: StorageReadOptions,
459        visitor: &mut (dyn FnMut(&[u8]) -> Result<StorageVisitorControl> + Send),
460    ) -> Result<StorageScanOutcome> {
461        let values = self.values.read().map_err(|_| {
462            BedrockWorldError::ConcurrentWrite("memory storage poisoned".to_string())
463        })?;
464        let mut outcome = StorageScanOutcome::empty();
465        for (key, value) in values
466            .range(prefix.to_vec()..)
467            .take_while(|(key, _)| key.starts_with(prefix))
468        {
469            check_cancelled(&options)?;
470            outcome.record(value.len());
471            if visitor(key)? == StorageVisitorControl::Stop {
472                outcome.stopped = true;
473                return Ok(outcome);
474            }
475            emit_progress(&options, outcome);
476        }
477        Ok(outcome)
478    }
479
480    fn write_batch(&self, batch: &StorageBatch) -> Result<()> {
481        let mut values = self.values.write().map_err(|_| {
482            BedrockWorldError::ConcurrentWrite("memory storage poisoned".to_string())
483        })?;
484        for op in batch.ops() {
485            match op {
486                StorageOp::Put { key, value } => {
487                    values.insert(key.to_vec(), value.clone());
488                }
489                StorageOp::Delete { key } => {
490                    values.remove(key.as_ref());
491                }
492            }
493        }
494        Ok(())
495    }
496
497    fn flush(&self) -> Result<()> {
498        Ok(())
499    }
500}
501
/// Terrain payload length used by old Pocket Edition `chunks.dat` files before
/// the 1024-byte `[biome_id, red, green, blue]` tail was added to `LevelDB`
/// `LegacyTerrain`.
pub const POCKET_CHUNKS_DAT_TERRAIN_VALUE_LEN: usize = 82_176;
/// Byte length of the location table at the start of `chunks.dat`: one 4-byte
/// entry for each slot of a 32 x 32 chunk grid.
const POCKET_CHUNKS_DAT_LOCATION_TABLE_LEN: usize = 32 * 32 * 4;
/// Byte length of one `chunks.dat` sector; location-table offsets and counts
/// are expressed in this unit.
const POCKET_CHUNKS_DAT_SECTOR_BYTES: usize = 4096;
/// Four-byte sample appended 256 times when a Pocket payload is widened to the
/// legacy terrain length. Following the tail layout described above, this is
/// presumably biome id 1 with colour `0x7f, 0xb2, 0x38` (confirm against the
/// renderer).
const DEFAULT_LEGACY_BIOME_SAMPLE: [u8; 4] = [1, 0x7f, 0xb2, 0x38];
509
510#[derive(Debug, Clone)]
511/// Read-only backend for pre-LevelDB Pocket Edition chunks.dat worlds.
512pub struct PocketChunksDatStorage {
513    values: Arc<BTreeMap<Vec<u8>, Bytes>>,
514    origin_chunk_x: i32,
515    origin_chunk_z: i32,
516}
517
518impl PocketChunksDatStorage {
519    /// Opens a read-only backend for a pre-`LevelDB` Pocket Edition world folder.
520    pub fn open(world_path: impl AsRef<Path>) -> Result<Self> {
521        let world_path = world_path.as_ref();
522        let chunks_path = world_path.join("chunks.dat");
523        let bytes = fs::read(&chunks_path)?;
524        let (origin_chunk_x, origin_chunk_z) = read_limited_world_origin(world_path);
525        let values = parse_pocket_chunks_dat(&bytes, origin_chunk_x, origin_chunk_z)?;
526        if world_path.join("entities.dat").is_file() {
527            match fs::read(world_path.join("entities.dat")) {
528                Ok(bytes) => log::debug!(
529                    "legacy entities.dat present (bytes={}, parser=best_effort_skip)",
530                    bytes.len()
531                ),
532                Err(error) => log::warn!("failed to read legacy entities.dat: {error}"),
533            }
534        }
535        log::debug!(
536            "opened Pocket chunks.dat storage (chunks={}, origin=({}, {}), path={})",
537            values.len(),
538            origin_chunk_x,
539            origin_chunk_z,
540            chunks_path.display()
541        );
542        Ok(Self {
543            values: Arc::new(values),
544            origin_chunk_x,
545            origin_chunk_z,
546        })
547    }
548
549    #[must_use]
550    /// Origin chunk x.
551    pub const fn origin_chunk_x(&self) -> i32 {
552        self.origin_chunk_x
553    }
554
555    #[must_use]
556    /// Origin chunk z.
557    pub const fn origin_chunk_z(&self) -> i32 {
558        self.origin_chunk_z
559    }
560}
561
562impl WorldStorage for PocketChunksDatStorage {
563    fn get(&self, key: &[u8]) -> Result<Option<Bytes>> {
564        Ok(self.values.get(key).cloned())
565    }
566
567    fn get_many(&self, keys: &[Bytes]) -> Result<Vec<Option<Bytes>>> {
568        Ok(keys
569            .iter()
570            .map(|key| self.values.get(key.as_ref()).cloned())
571            .collect())
572    }
573
574    fn put(&self, _key: &[u8], _value: &[u8]) -> Result<()> {
575        Err(pocket_chunks_dat_read_only_error())
576    }
577
578    fn delete(&self, _key: &[u8]) -> Result<()> {
579        Err(pocket_chunks_dat_read_only_error())
580    }
581
582    fn for_each_key(
583        &self,
584        options: StorageReadOptions,
585        visitor: &mut (dyn FnMut(&[u8]) -> Result<StorageVisitorControl> + Send),
586    ) -> Result<StorageScanOutcome> {
587        let mut outcome = StorageScanOutcome::empty();
588        for (key, value) in self.values.iter() {
589            check_cancelled(&options)?;
590            outcome.record(value.len());
591            if visitor(key)? == StorageVisitorControl::Stop {
592                outcome.stopped = true;
593                return Ok(outcome);
594            }
595            emit_progress(&options, outcome);
596        }
597        Ok(outcome)
598    }
599
600    fn for_each_prefix(
601        &self,
602        prefix: &[u8],
603        options: StorageReadOptions,
604        visitor: &mut (dyn FnMut(&[u8], &Bytes) -> Result<StorageVisitorControl> + Send),
605    ) -> Result<StorageScanOutcome> {
606        let mut outcome = StorageScanOutcome::empty();
607        for (key, value) in self
608            .values
609            .range(prefix.to_vec()..)
610            .take_while(|(key, _)| key.starts_with(prefix))
611        {
612            check_cancelled(&options)?;
613            outcome.record(value.len());
614            if visitor(key, value)? == StorageVisitorControl::Stop {
615                outcome.stopped = true;
616                return Ok(outcome);
617            }
618            emit_progress(&options, outcome);
619        }
620        Ok(outcome)
621    }
622
623    fn for_each_prefix_key(
624        &self,
625        prefix: &[u8],
626        options: StorageReadOptions,
627        visitor: &mut (dyn FnMut(&[u8]) -> Result<StorageVisitorControl> + Send),
628    ) -> Result<StorageScanOutcome> {
629        let mut outcome = StorageScanOutcome::empty();
630        for (key, value) in self
631            .values
632            .range(prefix.to_vec()..)
633            .take_while(|(key, _)| key.starts_with(prefix))
634        {
635            check_cancelled(&options)?;
636            outcome.record(value.len());
637            if visitor(key)? == StorageVisitorControl::Stop {
638                outcome.stopped = true;
639                return Ok(outcome);
640            }
641            emit_progress(&options, outcome);
642        }
643        Ok(outcome)
644    }
645
646    fn write_batch(&self, _batch: &StorageBatch) -> Result<()> {
647        Err(pocket_chunks_dat_read_only_error())
648    }
649
650    fn flush(&self) -> Result<()> {
651        Ok(())
652    }
653}
654
655fn parse_pocket_chunks_dat(
656    bytes: &[u8],
657    origin_chunk_x: i32,
658    origin_chunk_z: i32,
659) -> Result<BTreeMap<Vec<u8>, Bytes>> {
660    if bytes.len() < POCKET_CHUNKS_DAT_LOCATION_TABLE_LEN {
661        return Err(BedrockWorldError::CorruptWorld(format!(
662            "chunks.dat is too small for its location table: {} bytes",
663            bytes.len()
664        )));
665    }
666    let mut values = BTreeMap::new();
667    for index in 0..(32 * 32) {
668        let entry_offset = index * 4;
669        let entry = &bytes[entry_offset..entry_offset + 4];
670        if entry == [0, 0, 0, 0] {
671            continue;
672        }
673        let sector_count = usize::from(entry[0]);
674        let sector_offset =
675            usize::from(entry[1]) | (usize::from(entry[2]) << 8) | (usize::from(entry[3]) << 16);
676        if sector_count == 0 || sector_offset == 0 {
677            continue;
678        }
679        let Some(byte_offset) = sector_offset.checked_mul(POCKET_CHUNKS_DAT_SECTOR_BYTES) else {
680            continue;
681        };
682        let Some(payload) = pocket_chunk_payload(bytes, byte_offset, sector_count) else {
683            log::warn!(
684                "skipping invalid chunks.dat entry (index={index}, sector_offset={sector_offset}, sector_count={sector_count})"
685            );
686            continue;
687        };
688        let local_x = i32::try_from(index % 32).unwrap_or(0);
689        let local_z = i32::try_from(index / 32).unwrap_or(0);
690        let pos = ChunkPos {
691            x: origin_chunk_x.saturating_add(local_x),
692            z: origin_chunk_z.saturating_add(local_z),
693            dimension: Dimension::Overworld,
694        };
695        values.insert(
696            ChunkKey::new(pos, ChunkRecordTag::LegacyTerrain)
697                .encode()
698                .to_vec(),
699            convert_pocket_terrain_to_legacy(payload),
700        );
701    }
702    Ok(values)
703}
704
705fn pocket_chunk_payload(bytes: &[u8], byte_offset: usize, sector_count: usize) -> Option<&[u8]> {
706    let sector_bytes = sector_count.checked_mul(POCKET_CHUNKS_DAT_SECTOR_BYTES)?;
707    let max_end = byte_offset.checked_add(sector_bytes)?.min(bytes.len());
708    if byte_offset >= bytes.len() || byte_offset >= max_end {
709        return None;
710    }
711    let available = &bytes[byte_offset..max_end];
712    if available.len() >= 4 {
713        let declared_len = u32::from_le_bytes(available[0..4].try_into().ok()?) as usize;
714        if declared_len == POCKET_CHUNKS_DAT_TERRAIN_VALUE_LEN
715            && available.len() >= 4 + declared_len
716        {
717            return Some(&available[4..4 + declared_len]);
718        }
719        if declared_len == LEGACY_TERRAIN_VALUE_LEN && available.len() >= 4 + declared_len {
720            return Some(&available[4..4 + POCKET_CHUNKS_DAT_TERRAIN_VALUE_LEN]);
721        }
722    }
723    if available.len() >= POCKET_CHUNKS_DAT_TERRAIN_VALUE_LEN {
724        return Some(&available[..POCKET_CHUNKS_DAT_TERRAIN_VALUE_LEN]);
725    }
726    None
727}
728
729fn convert_pocket_terrain_to_legacy(payload: &[u8]) -> Bytes {
730    if payload.len() == LEGACY_TERRAIN_VALUE_LEN {
731        return Bytes::copy_from_slice(payload);
732    }
733    let mut legacy = Vec::with_capacity(LEGACY_TERRAIN_VALUE_LEN);
734    legacy.extend_from_slice(&payload[..POCKET_CHUNKS_DAT_TERRAIN_VALUE_LEN]);
735    for _ in 0..256 {
736        legacy.extend_from_slice(&DEFAULT_LEGACY_BIOME_SAMPLE);
737    }
738    Bytes::from(legacy)
739}
740
741fn read_limited_world_origin(world_path: &Path) -> (i32, i32) {
742    let Ok(document) = read_level_dat_document(&world_path.join("level.dat")) else {
743        return (0, 0);
744    };
745    let NbtTag::Compound(root) = document.root else {
746        return (0, 0);
747    };
748    (
749        nbt_i32(root.get("LimitedWorldOriginX")).unwrap_or(0),
750        nbt_i32(root.get("LimitedWorldOriginZ")).unwrap_or(0),
751    )
752}
753
754fn nbt_i32(tag: Option<&NbtTag>) -> Option<i32> {
755    match tag {
756        Some(NbtTag::Byte(value)) => Some(i32::from(*value)),
757        Some(NbtTag::Short(value)) => Some(i32::from(*value)),
758        Some(NbtTag::Int(value)) => Some(*value),
759        Some(NbtTag::Long(value)) => i32::try_from(*value).ok(),
760        _ => None,
761    }
762}
763
764fn pocket_chunks_dat_read_only_error() -> BedrockWorldError {
765    BedrockWorldError::UnsupportedChunkFormat("Pocket chunks.dat storage is read-only".to_string())
766}
767
768/// Backend module.
769pub mod backend {
770    use super::*;
771
772    #[cfg(feature = "backend-bedrock-leveldb")]
773    #[derive(Clone)]
774    /// Bedrock level db storage data model.
775    pub struct BedrockLevelDbStorage {
776        db: Arc<bedrock_leveldb::Db>,
777    }
778
779    #[cfg(feature = "backend-bedrock-leveldb")]
780    impl BedrockLevelDbStorage {
781        /// Opens a `LevelDB` directory for read/write access.
782        pub fn open(path: impl AsRef<Path>) -> Result<Self> {
783            Self::open_inner(path, false)
784        }
785
786        /// Opens a `LevelDB` directory without allowing backend writes.
787        pub fn open_read_only(path: impl AsRef<Path>) -> Result<Self> {
788            Self::open_inner(path, true)
789        }
790
791        fn open_inner(path: impl AsRef<Path>, read_only: bool) -> Result<Self> {
792            let path = path.as_ref().to_path_buf();
793            if !path.exists() {
794                return Err(BedrockWorldError::Io(std::io::Error::new(
795                    std::io::ErrorKind::NotFound,
796                    format!("LevelDB path not found: {}", path.display()),
797                )));
798            }
799            let options = bedrock_leveldb::OpenOptions {
800                read_only,
801                create_if_missing: false,
802                error_if_exists: false,
803                paranoid_checks: true,
804                compression_policy: bedrock_leveldb::CompressionPolicy::Zlib,
805                cache_size: 64 * 1024 * 1024,
806                write_buffer_size: 4 * 1024 * 1024,
807            };
808            let db = bedrock_leveldb::Db::open(path, options).map_err(map_leveldb_error)?;
809            Ok(Self { db: Arc::new(db) })
810        }
811    }
812
813    #[cfg(feature = "backend-bedrock-leveldb")]
814    impl WorldStorage for BedrockLevelDbStorage {
815        fn get(&self, key: &[u8]) -> Result<Option<Bytes>> {
816            self.db.get(key).map_err(map_leveldb_error)
817        }
818
819        fn get_many(&self, keys: &[Bytes]) -> Result<Vec<Option<Bytes>>> {
820            self.db
821                .get_many_owned(
822                    keys.iter().cloned(),
823                    bedrock_leveldb::ReadOptions::default(),
824                )
825                .map_err(map_leveldb_error)
826        }
827
828        fn get_many_ordered_with_control(
829            &self,
830            keys: &[Bytes],
831            options: StorageReadOptions,
832        ) -> Result<Vec<Option<Bytes>>> {
833            check_cancelled(&options)?;
834            self.db
835                .get_many_owned(keys.iter().cloned(), to_leveldb_read_options(options))
836                .map_err(map_leveldb_error)
837        }
838
839        fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
840            self.db
841                .put(
842                    Bytes::copy_from_slice(key),
843                    Bytes::copy_from_slice(value),
844                    bedrock_leveldb::WriteOptions::default(),
845                )
846                .map_err(map_leveldb_error)
847        }
848
849        fn delete(&self, key: &[u8]) -> Result<()> {
850            self.db
851                .delete(
852                    Bytes::copy_from_slice(key),
853                    bedrock_leveldb::WriteOptions::default(),
854                )
855                .map_err(map_leveldb_error)
856        }
857
858        fn for_each_key(
859            &self,
860            options: StorageReadOptions,
861            visitor: &mut (dyn FnMut(&[u8]) -> Result<StorageVisitorControl> + Send),
862        ) -> Result<StorageScanOutcome> {
863            let read_options = to_leveldb_read_options(options);
864            let mut visitor_error = None;
865            let scan_result = self
866                .db
867                .for_each_key(read_options, |key| match visitor(key) {
868                    Ok(StorageVisitorControl::Continue) => {
869                        Ok(bedrock_leveldb::VisitorControl::Continue)
870                    }
871                    Ok(StorageVisitorControl::Stop) => Ok(bedrock_leveldb::VisitorControl::Stop),
872                    Err(error) => {
873                        visitor_error = Some(error);
874                        Ok(bedrock_leveldb::VisitorControl::Stop)
875                    }
876                });
877            match (scan_result, visitor_error) {
878                (_, Some(error)) => Err(error),
879                (Ok(outcome), None) => Ok(to_storage_outcome(outcome)),
880                (Err(error), None) => Err(map_leveldb_error(error)),
881            }
882        }
883
884        fn for_each_prefix(
885            &self,
886            prefix: &[u8],
887            options: StorageReadOptions,
888            visitor: &mut (dyn FnMut(&[u8], &Bytes) -> Result<StorageVisitorControl> + Send),
889        ) -> Result<StorageScanOutcome> {
890            let read_options = to_leveldb_read_options(options);
891            let mut visitor_error = None;
892            let scan_result = self.db.for_each_prefix(prefix, read_options, |key, value| {
893                match visitor(key, value) {
894                    Ok(StorageVisitorControl::Continue) => {
895                        Ok(bedrock_leveldb::VisitorControl::Continue)
896                    }
897                    Ok(StorageVisitorControl::Stop) => Ok(bedrock_leveldb::VisitorControl::Stop),
898                    Err(error) => {
899                        visitor_error = Some(error);
900                        Ok(bedrock_leveldb::VisitorControl::Stop)
901                    }
902                }
903            });
904            match (scan_result, visitor_error) {
905                (_, Some(error)) => Err(error),
906                (Ok(outcome), None) => Ok(to_storage_outcome(outcome)),
907                (Err(error), None) => Err(map_leveldb_error(error)),
908            }
909        }
910
911        fn for_each_prefix_ref(
912            &self,
913            prefix: &[u8],
914            options: StorageReadOptions,
915            visitor: &mut (dyn FnMut(StorageEntryRef<'_>) -> Result<StorageVisitorControl> + Send),
916        ) -> Result<StorageScanOutcome> {
917            let mut read_options = to_leveldb_read_options(options);
918            read_options.read_strategy = bedrock_leveldb::ReadStrategy::Borrowed;
919            let mut visitor_error = None;
920            let scan_result = self.db.for_each_prefix_ref(prefix, read_options, |entry| {
921                match visitor(StorageEntryRef {
922                    key: entry.key.as_bytes(),
923                    value: entry.value.as_bytes(),
924                }) {
925                    Ok(StorageVisitorControl::Continue) => {
926                        Ok(bedrock_leveldb::VisitorControl::Continue)
927                    }
928                    Ok(StorageVisitorControl::Stop) => Ok(bedrock_leveldb::VisitorControl::Stop),
929                    Err(error) => {
930                        visitor_error = Some(error);
931                        Ok(bedrock_leveldb::VisitorControl::Stop)
932                    }
933                }
934            });
935            match (scan_result, visitor_error) {
936                (_, Some(error)) => Err(error),
937                (Ok(outcome), None) => Ok(to_storage_outcome(outcome)),
938                (Err(error), None) => Err(map_leveldb_error(error)),
939            }
940        }
941
942        fn for_each_prefix_key(
943            &self,
944            prefix: &[u8],
945            options: StorageReadOptions,
946            visitor: &mut (dyn FnMut(&[u8]) -> Result<StorageVisitorControl> + Send),
947        ) -> Result<StorageScanOutcome> {
948            let read_options = to_leveldb_read_options(options);
949            let mut visitor_error = None;
950            let scan_result =
951                self.db
952                    .for_each_prefix_key(prefix, read_options, |key| match visitor(key) {
953                        Ok(StorageVisitorControl::Continue) => {
954                            Ok(bedrock_leveldb::VisitorControl::Continue)
955                        }
956                        Ok(StorageVisitorControl::Stop) => {
957                            Ok(bedrock_leveldb::VisitorControl::Stop)
958                        }
959                        Err(error) => {
960                            visitor_error = Some(error);
961                            Ok(bedrock_leveldb::VisitorControl::Stop)
962                        }
963                    });
964            match (scan_result, visitor_error) {
965                (_, Some(error)) => Err(error),
966                (Ok(outcome), None) => Ok(to_storage_outcome(outcome)),
967                (Err(error), None) => Err(map_leveldb_error(error)),
968            }
969        }
970
971        fn write_batch(&self, batch: &StorageBatch) -> Result<()> {
972            let mut db_batch = bedrock_leveldb::WriteBatch::new();
973            for op in batch.ops() {
974                match op {
975                    StorageOp::Put { key, value } => db_batch.put(key.clone(), value.clone()),
976                    StorageOp::Delete { key } => db_batch.delete(key.clone()),
977                }
978            }
979            self.db
980                .write(db_batch, bedrock_leveldb::WriteOptions::default())
981                .map_err(map_leveldb_error)
982        }
983
984        fn flush(&self) -> Result<()> {
985            self.db.flush().map_err(map_leveldb_error)
986        }
987    }
988
989    #[cfg(feature = "backend-bedrock-leveldb")]
990    fn map_leveldb_error(error: bedrock_leveldb::LevelDbError) -> BedrockWorldError {
991        match error.kind() {
992            bedrock_leveldb::ErrorKind::Cancelled => BedrockWorldError::Cancelled {
993                operation: "LevelDB scan",
994            },
995            bedrock_leveldb::ErrorKind::ReadOnly => BedrockWorldError::ReadOnly,
996            _ => BedrockWorldError::LevelDb(error.to_string()),
997        }
998    }
999
1000    #[cfg(feature = "backend-bedrock-leveldb")]
1001    fn to_leveldb_read_options(options: StorageReadOptions) -> bedrock_leveldb::ReadOptions {
1002        bedrock_leveldb::ReadOptions {
1003            checksum: bedrock_leveldb::ChecksumMode::Inherit,
1004            cache_policy: bedrock_leveldb::CachePolicy::Bypass,
1005            read_strategy: bedrock_leveldb::ReadStrategy::Shared,
1006            threading: match options.threading {
1007                StorageThreadingOptions::Auto => bedrock_leveldb::ThreadingOptions::Auto,
1008                StorageThreadingOptions::Fixed(threads) => {
1009                    bedrock_leveldb::ThreadingOptions::Fixed(threads)
1010                }
1011                StorageThreadingOptions::Single => bedrock_leveldb::ThreadingOptions::Single,
1012            },
1013            scan_mode: match options.scan_mode {
1014                StorageScanMode::Sequential => bedrock_leveldb::ScanMode::Sequential,
1015                StorageScanMode::ParallelTables => bedrock_leveldb::ScanMode::ParallelTables,
1016            },
1017            pipeline: bedrock_leveldb::ScanPipelineOptions {
1018                queue_depth: options.pipeline.queue_depth,
1019                table_batch_size: options.pipeline.table_batch_size,
1020                progress_interval: options.pipeline.progress_interval,
1021            },
1022            cancel: options
1023                .cancel
1024                .map(|cancel| bedrock_leveldb::ScanCancelFlag::from_shared(cancel.0)),
1025            progress: options.progress.map(|progress| {
1026                bedrock_leveldb::ScanProgressSink::new(move |db_progress| {
1027                    progress.emit(StorageScanProgress {
1028                        entries_seen: db_progress.visited,
1029                        bytes_read: db_progress.bytes_read,
1030                    });
1031                })
1032            }),
1033        }
1034    }
1035
1036    #[cfg(feature = "backend-bedrock-leveldb")]
1037    const fn to_storage_outcome(outcome: bedrock_leveldb::ScanOutcome) -> StorageScanOutcome {
1038        StorageScanOutcome {
1039            visited: outcome.visited,
1040            bytes_read: outcome.bytes_read,
1041            stopped: outcome.stopped,
1042            tables_scanned: outcome.tables_scanned,
1043            worker_threads: outcome.worker_threads,
1044            queue_wait_ms: outcome.queue_wait_ms,
1045            cancel_checks: outcome.cancel_checks,
1046        }
1047    }
1048
1049    #[cfg(not(feature = "backend-bedrock-leveldb"))]
1050    #[derive(Debug, Clone, Copy)]
1051    /// Placeholder backend returned when `backend-bedrock-leveldb` is disabled.
1052    pub struct BedrockLevelDbStorage;
1053
1054    #[cfg(not(feature = "backend-bedrock-leveldb"))]
1055    impl BedrockLevelDbStorage {
1056        /// Returns an error because the LevelDB backend feature is disabled.
1057        pub fn open(_path: impl AsRef<Path>) -> Result<Self> {
1058            Err(BedrockWorldError::LevelDb(
1059                "backend-bedrock-leveldb feature is disabled".to_string(),
1060            ))
1061        }
1062
1063        /// Returns an error because the LevelDB backend feature is disabled.
1064        pub fn open_read_only(_path: impl AsRef<Path>) -> Result<Self> {
1065            Err(BedrockWorldError::LevelDb(
1066                "backend-bedrock-leveldb feature is disabled".to_string(),
1067            ))
1068        }
1069    }
1070
1071    #[cfg(not(feature = "backend-bedrock-leveldb"))]
1072    impl WorldStorage for BedrockLevelDbStorage {
1073        fn get(&self, _key: &[u8]) -> Result<Option<Bytes>> {
1074            Err(BedrockWorldError::LevelDb(
1075                "backend-bedrock-leveldb feature is disabled".to_string(),
1076            ))
1077        }
1078
1079        fn get_many(&self, _keys: &[Bytes]) -> Result<Vec<Option<Bytes>>> {
1080            Err(BedrockWorldError::LevelDb(
1081                "backend-bedrock-leveldb feature is disabled".to_string(),
1082            ))
1083        }
1084
1085        fn put(&self, _key: &[u8], _value: &[u8]) -> Result<()> {
1086            Err(BedrockWorldError::LevelDb(
1087                "backend-bedrock-leveldb feature is disabled".to_string(),
1088            ))
1089        }
1090
1091        fn delete(&self, _key: &[u8]) -> Result<()> {
1092            Err(BedrockWorldError::LevelDb(
1093                "backend-bedrock-leveldb feature is disabled".to_string(),
1094            ))
1095        }
1096
1097        fn for_each_key(
1098            &self,
1099            _options: StorageReadOptions,
1100            _visitor: &mut (dyn FnMut(&[u8]) -> Result<StorageVisitorControl> + Send),
1101        ) -> Result<StorageScanOutcome> {
1102            Err(BedrockWorldError::LevelDb(
1103                "backend-bedrock-leveldb feature is disabled".to_string(),
1104            ))
1105        }
1106
1107        fn for_each_prefix(
1108            &self,
1109            _prefix: &[u8],
1110            _options: StorageReadOptions,
1111            _visitor: &mut (dyn FnMut(&[u8], &Bytes) -> Result<StorageVisitorControl> + Send),
1112        ) -> Result<StorageScanOutcome> {
1113            Err(BedrockWorldError::LevelDb(
1114                "backend-bedrock-leveldb feature is disabled".to_string(),
1115            ))
1116        }
1117
1118        fn write_batch(&self, _batch: &StorageBatch) -> Result<()> {
1119            Err(BedrockWorldError::LevelDb(
1120                "backend-bedrock-leveldb feature is disabled".to_string(),
1121            ))
1122        }
1123
1124        fn flush(&self) -> Result<()> {
1125            Err(BedrockWorldError::LevelDb(
1126                "backend-bedrock-leveldb feature is disabled".to_string(),
1127            ))
1128        }
1129    }
1130}
1131
1132fn check_cancelled(options: &StorageReadOptions) -> Result<()> {
1133    if options
1134        .cancel
1135        .as_ref()
1136        .is_some_and(StorageCancelFlag::is_cancelled)
1137    {
1138        return Err(BedrockWorldError::Cancelled {
1139            operation: "storage scan",
1140        });
1141    }
1142    Ok(())
1143}
1144
1145fn emit_progress(options: &StorageReadOptions, outcome: StorageScanOutcome) {
1146    if let Some(progress) = &options.progress {
1147        progress.emit(StorageScanProgress {
1148            entries_seen: outcome.visited,
1149            bytes_read: outcome.bytes_read,
1150        });
1151    }
1152}
1153
1154#[cfg(test)]
1155mod tests {
1156    use super::*;
1157    #[cfg(feature = "backend-bedrock-leveldb")]
1158    use std::time::{SystemTime, UNIX_EPOCH};
1159
1160    #[test]
1161    fn memory_storage_scans_prefix_without_copying_values() {
1162        let storage = MemoryStorage::new();
1163        storage.put(b"abc1", b"one").expect("put");
1164        storage.put(b"abc2", b"two").expect("put");
1165        storage.put(b"abd", b"three").expect("put");
1166
1167        let mut entries = Vec::new();
1168        storage
1169            .for_each_prefix(b"abc", StorageReadOptions::default(), &mut |key, value| {
1170                entries.push(StorageEntry {
1171                    key: Bytes::copy_from_slice(key),
1172                    value: value.clone(),
1173                });
1174                Ok(StorageVisitorControl::Continue)
1175            })
1176            .expect("scan");
1177        assert_eq!(entries.len(), 2);
1178        assert_eq!(entries[0].key, Bytes::from_static(b"abc1"));
1179        assert_eq!(entries[1].value, Bytes::from_static(b"two"));
1180    }
1181
1182    #[cfg(feature = "backend-bedrock-leveldb")]
1183    #[test]
1184    fn bedrock_leveldb_storage_roundtrips_raw_records() {
1185        let path = std::env::temp_dir().join(format!(
1186            "bedrock-world-storage-{}",
1187            SystemTime::now()
1188                .duration_since(UNIX_EPOCH)
1189                .expect("time")
1190                .as_nanos()
1191        ));
1192        std::fs::create_dir_all(&path).expect("create");
1193        drop(
1194            bedrock_leveldb::Db::open(&path, bedrock_leveldb::OpenOptions::default())
1195                .expect("initialize"),
1196        );
1197
1198        let storage = backend::BedrockLevelDbStorage::open(&path).expect("open");
1199        storage.put(b"player_1", b"one").expect("put");
1200        storage.put(b"player_2", b"two").expect("put");
1201        storage.flush().expect("flush");
1202
1203        let reopened = backend::BedrockLevelDbStorage::open(&path).expect("reopen");
1204        assert_eq!(
1205            reopened.get(b"player_1").expect("get"),
1206            Some(Bytes::from_static(b"one"))
1207        );
1208        let mut player_count = 0;
1209        reopened
1210            .for_each_prefix(
1211                b"player_",
1212                StorageReadOptions::default(),
1213                &mut |_key, _value| {
1214                    player_count += 1;
1215                    Ok(StorageVisitorControl::Continue)
1216                },
1217            )
1218            .expect("scan");
1219        assert_eq!(player_count, 2);
1220
1221        std::fs::remove_dir_all(path).expect("cleanup");
1222    }
1223
1224    #[test]
1225    fn pocket_chunks_dat_exposes_virtual_legacy_terrain_records() {
1226        let path = std::env::temp_dir().join(format!(
1227            "bedrock-world-pocket-chunks-{}",
1228            std::time::SystemTime::now()
1229                .duration_since(std::time::UNIX_EPOCH)
1230                .expect("time")
1231                .as_nanos()
1232        ));
1233        std::fs::create_dir_all(&path).expect("create world dir");
1234        let mut terrain = vec![0_u8; POCKET_CHUNKS_DAT_TERRAIN_VALUE_LEN];
1235        let block_index = (1_usize << 11) | (3_usize << 7) | 2_usize;
1236        let column_index = 3_usize * 16 + 1_usize;
1237        terrain[block_index] = 42;
1238        terrain[crate::LEGACY_TERRAIN_BLOCK_COUNT
1239            + (crate::LEGACY_TERRAIN_BLOCK_COUNT / 2) * 3
1240            + column_index] = 99;
1241        let mut chunks = vec![0_u8; POCKET_CHUNKS_DAT_SECTOR_BYTES];
1242        chunks[0] = 21;
1243        chunks[1] = 1;
1244        let mut payload = Vec::new();
1245        payload.extend_from_slice(&(POCKET_CHUNKS_DAT_TERRAIN_VALUE_LEN as u32).to_le_bytes());
1246        payload.extend_from_slice(&terrain);
1247        chunks.extend_from_slice(&payload);
1248        let padded_len = POCKET_CHUNKS_DAT_SECTOR_BYTES * 22;
1249        chunks.resize(padded_len, 0);
1250        std::fs::write(path.join("chunks.dat"), chunks).expect("write chunks.dat");
1251
1252        let storage = PocketChunksDatStorage::open(&path).expect("open pocket chunks");
1253        let pos = ChunkPos {
1254            x: 0,
1255            z: 0,
1256            dimension: Dimension::Overworld,
1257        };
1258        let legacy_key = ChunkKey::new(pos, ChunkRecordTag::LegacyTerrain).encode();
1259        let missing_key = ChunkKey::new(
1260            ChunkPos {
1261                x: 1,
1262                z: 0,
1263                dimension: Dimension::Overworld,
1264            },
1265            ChunkRecordTag::LegacyTerrain,
1266        )
1267        .encode();
1268
1269        let values = storage
1270            .get_many(&[missing_key.clone(), legacy_key.clone()])
1271            .expect("get many");
1272        assert!(values[0].is_none());
1273        let Some(value) = &values[1] else {
1274            panic!("legacy terrain should be present");
1275        };
1276        assert_eq!(value.len(), LEGACY_TERRAIN_VALUE_LEN);
1277        assert_eq!(
1278            &value[..POCKET_CHUNKS_DAT_TERRAIN_VALUE_LEN],
1279            terrain.as_slice()
1280        );
1281        let terrain = crate::LegacyTerrain::parse(value.clone()).expect("legacy terrain");
1282        assert_eq!(terrain.block_id_at(1, 2, 3), Some(42));
1283        assert_eq!(terrain.height_at(1, 3), Some(99));
1284
1285        let mut keys = Vec::new();
1286        storage
1287            .for_each_key(StorageReadOptions::default(), &mut |key| {
1288                keys.push(Bytes::copy_from_slice(key));
1289                Ok(StorageVisitorControl::Continue)
1290            })
1291            .expect("scan keys");
1292        assert_eq!(keys, vec![legacy_key]);
1293        assert!(storage.put(b"x", b"y").is_err());
1294
1295        std::fs::remove_dir_all(path).expect("cleanup");
1296    }
1297}