1use crate::chunk::{ChunkKey, ChunkPos, ChunkRecordTag, Dimension, LEGACY_TERRAIN_VALUE_LEN};
9use crate::error::{BedrockWorldError, Result};
10use crate::level_dat::read_level_dat_document;
11use crate::nbt::NbtTag;
12use bytes::Bytes;
13use std::collections::BTreeMap;
14use std::fs;
15use std::path::Path;
16use std::sync::{
17 Arc, RwLock,
18 atomic::{AtomicBool, Ordering},
19};
20
/// An owned key/value pair read from world storage.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StorageEntry {
    /// Raw storage key bytes.
    pub key: Bytes,
    /// Raw value bytes stored under `key`.
    pub value: Bytes,
}
29
/// A borrowed key/value pair, handed to zero-copy scan visitors
/// (see [`WorldStorage::for_each_prefix_ref`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct StorageEntryRef<'a> {
    /// Borrowed key bytes.
    pub key: &'a [u8],
    /// Borrowed value bytes.
    pub value: &'a [u8],
}
38
/// A single mutation queued in a [`StorageBatch`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StorageOp {
    /// Insert or overwrite `key` with `value`.
    Put {
        key: Bytes,
        value: Bytes,
    },
    /// Remove `key` if present.
    Delete {
        key: Bytes,
    },
}
55
/// Options controlling storage scans and bulk reads.
#[derive(Debug, Clone)]
pub struct StorageReadOptions {
    /// Worker-thread policy for the scan.
    pub threading: StorageThreadingOptions,
    /// Whether backend tables are scanned sequentially or in parallel.
    pub scan_mode: StorageScanMode,
    /// Tuning knobs forwarded to the backend scan pipeline.
    pub pipeline: StoragePipelineOptions,
    /// Optional cooperative-cancellation flag checked between entries.
    pub cancel: Option<StorageCancelFlag>,
    /// Optional callback invoked with running scan totals.
    pub progress: Option<StorageProgressSink>,
}
70
71impl Default for StorageReadOptions {
72 fn default() -> Self {
73 Self {
74 threading: StorageThreadingOptions::Auto,
75 scan_mode: StorageScanMode::ParallelTables,
76 pipeline: StoragePipelineOptions::default(),
77 cancel: None,
78 progress: None,
79 }
80 }
81}
82
/// Pipeline tuning for storage scans; values are forwarded verbatim to the
/// LevelDB backend's scan pipeline options.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct StoragePipelineOptions {
    /// Depth of the queue between scan stages.
    pub queue_depth: usize,
    /// Number of tables handed to a worker per batch.
    pub table_batch_size: usize,
    /// How often (in entries) progress is reported.
    pub progress_interval: usize,
}
93
/// Worker-thread policy for storage scans.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum StorageThreadingOptions {
    /// Let the backend choose a thread count.
    #[default]
    Auto,
    /// Use exactly this many worker threads.
    Fixed(usize),
    /// Scan on a single thread.
    Single,
}
105
/// Table scan order for storage scans.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum StorageScanMode {
    /// Visit tables one after another.
    #[default]
    Sequential,
    /// Scan tables concurrently (mapped to the backend's parallel mode).
    ParallelTables,
}
115
/// Return value of scan visitors: keep going or end the scan early.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StorageVisitorControl {
    /// Continue with the next entry.
    Continue,
    /// Stop the scan; the resulting outcome is marked `stopped`.
    Stop,
}
124
/// Aggregated statistics for a completed (or stopped) storage scan.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct StorageScanOutcome {
    /// Number of entries handed to the visitor.
    pub visited: usize,
    /// Total bytes of the values seen (saturating).
    pub bytes_read: usize,
    /// True when a visitor ended the scan early.
    pub stopped: bool,
    /// Number of backend tables scanned.
    pub tables_scanned: usize,
    /// Peak worker-thread count observed.
    pub worker_threads: usize,
    /// Milliseconds spent waiting on the scan queue.
    pub queue_wait_ms: u128,
    /// Number of cancellation checks performed.
    pub cancel_checks: usize,
}

impl StorageScanOutcome {
    /// A zeroed outcome; equivalent to `Self::default()` but usable in
    /// `const` contexts.
    #[must_use]
    pub const fn empty() -> Self {
        Self {
            visited: 0,
            bytes_read: 0,
            stopped: false,
            tables_scanned: 0,
            worker_threads: 0,
            queue_wait_ms: 0,
            cancel_checks: 0,
        }
    }

    /// Count one visited entry whose value is `value_len` bytes.
    /// Both counters saturate instead of overflowing.
    pub fn record(&mut self, value_len: usize) {
        self.bytes_read = self.bytes_read.saturating_add(value_len);
        self.visited = self.visited.saturating_add(1);
    }

    /// Fold the totals of `other` into `self`: counters add (saturating),
    /// `stopped` ORs, and `worker_threads` keeps the maximum.
    pub fn merge(&mut self, other: Self) {
        let Self {
            visited,
            bytes_read,
            stopped,
            tables_scanned,
            worker_threads,
            queue_wait_ms,
            cancel_checks,
        } = other;
        self.visited = self.visited.saturating_add(visited);
        self.bytes_read = self.bytes_read.saturating_add(bytes_read);
        self.stopped = self.stopped || stopped;
        self.tables_scanned = self.tables_scanned.saturating_add(tables_scanned);
        self.worker_threads = self.worker_threads.max(worker_threads);
        self.queue_wait_ms = self.queue_wait_ms.saturating_add(queue_wait_ms);
        self.cancel_checks = self.cancel_checks.saturating_add(cancel_checks);
    }
}
176
/// Thread-safe, clonable cancellation flag shared between a scan and its
/// controller; clones observe the same underlying atomic.
#[derive(Debug, Clone, Default)]
pub struct StorageCancelFlag(Arc<AtomicBool>);

impl StorageCancelFlag {
    /// Create a fresh, un-cancelled flag.
    #[must_use]
    pub fn new() -> Self {
        Self(Arc::new(AtomicBool::new(false)))
    }

    /// Wrap an externally owned atomic so cancellation can be driven from
    /// outside this type.
    #[must_use]
    pub fn from_shared(cancelled: Arc<AtomicBool>) -> Self {
        Self(cancelled)
    }

    /// Request cancellation; observers see it via [`Self::is_cancelled`].
    pub fn cancel(&self) {
        self.0.store(true, Ordering::Relaxed);
    }

    /// True once [`Self::cancel`] has been called or the shared atomic was set.
    #[must_use]
    pub fn is_cancelled(&self) -> bool {
        self.0.load(Ordering::Relaxed)
    }
}
205
/// Callback wrapper that receives [`StorageScanProgress`] updates during a
/// scan; clonable, with clones sharing the same underlying callback.
#[derive(Clone)]
pub struct StorageProgressSink {
    inner: Arc<dyn Fn(StorageScanProgress) + Send + Sync>,
}

impl StorageProgressSink {
    /// Wrap `callback` so it can be shared across scan workers.
    #[must_use]
    pub fn new(callback: impl Fn(StorageScanProgress) + Send + Sync + 'static) -> Self {
        let inner: Arc<dyn Fn(StorageScanProgress) + Send + Sync> = Arc::new(callback);
        Self { inner }
    }

    /// Deliver one progress update to the callback.
    pub fn emit(&self, progress: StorageScanProgress) {
        (self.inner)(progress);
    }
}

impl std::fmt::Debug for StorageProgressSink {
    // The callback is opaque, so only the type name is printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("StorageProgressSink").finish_non_exhaustive()
    }
}

/// Running counters reported to a [`StorageProgressSink`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct StorageScanProgress {
    /// Entries visited so far.
    pub entries_seen: usize,
    /// Value bytes read so far.
    pub bytes_read: usize,
}
243
244#[derive(Debug, Clone, Default, PartialEq, Eq)]
245pub struct StorageBatch {
247 ops: Vec<StorageOp>,
248}
249
250impl StorageBatch {
251 #[must_use]
252 pub const fn new() -> Self {
254 Self { ops: Vec::new() }
255 }
256
257 pub fn put(&mut self, key: impl Into<Bytes>, value: impl Into<Bytes>) {
259 self.ops.push(StorageOp::Put {
260 key: key.into(),
261 value: value.into(),
262 });
263 }
264
265 pub fn delete(&mut self, key: impl Into<Bytes>) {
267 self.ops.push(StorageOp::Delete { key: key.into() });
268 }
269
270 #[must_use]
271 pub fn is_empty(&self) -> bool {
273 self.ops.is_empty()
274 }
275
276 #[must_use]
277 pub fn ops(&self) -> &[StorageOp] {
279 &self.ops
280 }
281}
282
/// Abstraction over the key/value store backing a Bedrock world: a LevelDB
/// database, an in-memory map, or a read-only Pocket `chunks.dat` view.
pub trait WorldStorage: Send + Sync {
    /// Fetch the value stored under `key`, or `None` when absent.
    fn get(&self, key: &[u8]) -> Result<Option<Bytes>>;
    /// Fetch several keys; the result vector is parallel to `keys`.
    /// Default implementation calls [`Self::get`] per key.
    fn get_many(&self, keys: &[Bytes]) -> Result<Vec<Option<Bytes>>> {
        keys.iter().map(|key| self.get(key)).collect()
    }
    /// Like [`Self::get_many`], but honors the cancellation flag in
    /// `options` before and between individual lookups.
    fn get_many_ordered_with_control(
        &self,
        keys: &[Bytes],
        options: StorageReadOptions,
    ) -> Result<Vec<Option<Bytes>>> {
        check_cancelled(&options)?;
        let mut values = Vec::with_capacity(keys.len());
        for key in keys {
            check_cancelled(&options)?;
            values.push(self.get(key)?);
        }
        Ok(values)
    }
    /// Insert or overwrite `key` with `value`.
    fn put(&self, key: &[u8], value: &[u8]) -> Result<()>;
    /// Remove `key` if present.
    fn delete(&self, key: &[u8]) -> Result<()>;
    /// Visit every key in the store until the visitor returns `Stop`.
    fn for_each_key(
        &self,
        options: StorageReadOptions,
        visitor: &mut (dyn FnMut(&[u8]) -> Result<StorageVisitorControl> + Send),
    ) -> Result<StorageScanOutcome>;
    /// Visit every entry whose key starts with `prefix`.
    fn for_each_prefix(
        &self,
        prefix: &[u8],
        options: StorageReadOptions,
        visitor: &mut (dyn FnMut(&[u8], &Bytes) -> Result<StorageVisitorControl> + Send),
    ) -> Result<StorageScanOutcome>;
    /// Borrowed-entry variant of [`Self::for_each_prefix`]; backends may
    /// override it to avoid materializing owned values.
    fn for_each_prefix_ref(
        &self,
        prefix: &[u8],
        options: StorageReadOptions,
        visitor: &mut (dyn FnMut(StorageEntryRef<'_>) -> Result<StorageVisitorControl> + Send),
    ) -> Result<StorageScanOutcome> {
        self.for_each_prefix(prefix, options, &mut |key, value| {
            visitor(StorageEntryRef {
                key,
                value: value.as_ref(),
            })
        })
    }
    /// Key-only variant of [`Self::for_each_prefix`].
    fn for_each_prefix_key(
        &self,
        prefix: &[u8],
        options: StorageReadOptions,
        visitor: &mut (dyn FnMut(&[u8]) -> Result<StorageVisitorControl> + Send),
    ) -> Result<StorageScanOutcome> {
        self.for_each_prefix(prefix, options, &mut |key, _value| visitor(key))
    }
    /// Visit every entry in the store (an empty-prefix scan).
    fn for_each_entry(
        &self,
        options: StorageReadOptions,
        visitor: &mut (dyn FnMut(&[u8], &Bytes) -> Result<StorageVisitorControl> + Send),
    ) -> Result<StorageScanOutcome> {
        self.for_each_prefix(b"", options, visitor)
    }
    /// Apply all operations queued in `batch`.
    fn write_batch(&self, batch: &StorageBatch) -> Result<()>;
    /// Flush any buffered writes to the underlying medium.
    fn flush(&self) -> Result<()>;
}
360
/// In-memory [`WorldStorage`] backed by a shared ordered map; clones share
/// the same underlying map.
#[derive(Debug, Clone, Default)]
pub struct MemoryStorage {
    // BTreeMap (not HashMap) so prefix scans can use `range` + `take_while`.
    values: Arc<RwLock<BTreeMap<Vec<u8>, Bytes>>>,
}

impl MemoryStorage {
    /// Create an empty in-memory store.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }
}
374
375impl WorldStorage for MemoryStorage {
376 fn get(&self, key: &[u8]) -> Result<Option<Bytes>> {
377 let values = self.values.read().map_err(|_| {
378 BedrockWorldError::ConcurrentWrite("memory storage poisoned".to_string())
379 })?;
380 Ok(values.get(key).cloned())
381 }
382
383 fn get_many(&self, keys: &[Bytes]) -> Result<Vec<Option<Bytes>>> {
384 let values = self.values.read().map_err(|_| {
385 BedrockWorldError::ConcurrentWrite("memory storage poisoned".to_string())
386 })?;
387 Ok(keys
388 .iter()
389 .map(|key| values.get(key.as_ref()).cloned())
390 .collect())
391 }
392
393 fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
394 let mut values = self.values.write().map_err(|_| {
395 BedrockWorldError::ConcurrentWrite("memory storage poisoned".to_string())
396 })?;
397 values.insert(key.to_vec(), Bytes::copy_from_slice(value));
398 Ok(())
399 }
400
401 fn delete(&self, key: &[u8]) -> Result<()> {
402 let mut values = self.values.write().map_err(|_| {
403 BedrockWorldError::ConcurrentWrite("memory storage poisoned".to_string())
404 })?;
405 values.remove(key);
406 Ok(())
407 }
408
409 fn for_each_key(
410 &self,
411 options: StorageReadOptions,
412 visitor: &mut (dyn FnMut(&[u8]) -> Result<StorageVisitorControl> + Send),
413 ) -> Result<StorageScanOutcome> {
414 let values = self.values.read().map_err(|_| {
415 BedrockWorldError::ConcurrentWrite("memory storage poisoned".to_string())
416 })?;
417 let mut outcome = StorageScanOutcome::empty();
418 for (key, value) in values.iter() {
419 check_cancelled(&options)?;
420 outcome.record(value.len());
421 if visitor(key)? == StorageVisitorControl::Stop {
422 outcome.stopped = true;
423 return Ok(outcome);
424 }
425 emit_progress(&options, outcome);
426 }
427 Ok(outcome)
428 }
429
430 fn for_each_prefix(
431 &self,
432 prefix: &[u8],
433 options: StorageReadOptions,
434 visitor: &mut (dyn FnMut(&[u8], &Bytes) -> Result<StorageVisitorControl> + Send),
435 ) -> Result<StorageScanOutcome> {
436 let values = self.values.read().map_err(|_| {
437 BedrockWorldError::ConcurrentWrite("memory storage poisoned".to_string())
438 })?;
439 let mut outcome = StorageScanOutcome::empty();
440 for (key, value) in values
441 .range(prefix.to_vec()..)
442 .take_while(|(key, _)| key.starts_with(prefix))
443 {
444 check_cancelled(&options)?;
445 outcome.record(value.len());
446 if visitor(key, value)? == StorageVisitorControl::Stop {
447 outcome.stopped = true;
448 return Ok(outcome);
449 }
450 emit_progress(&options, outcome);
451 }
452 Ok(outcome)
453 }
454
455 fn for_each_prefix_key(
456 &self,
457 prefix: &[u8],
458 options: StorageReadOptions,
459 visitor: &mut (dyn FnMut(&[u8]) -> Result<StorageVisitorControl> + Send),
460 ) -> Result<StorageScanOutcome> {
461 let values = self.values.read().map_err(|_| {
462 BedrockWorldError::ConcurrentWrite("memory storage poisoned".to_string())
463 })?;
464 let mut outcome = StorageScanOutcome::empty();
465 for (key, value) in values
466 .range(prefix.to_vec()..)
467 .take_while(|(key, _)| key.starts_with(prefix))
468 {
469 check_cancelled(&options)?;
470 outcome.record(value.len());
471 if visitor(key)? == StorageVisitorControl::Stop {
472 outcome.stopped = true;
473 return Ok(outcome);
474 }
475 emit_progress(&options, outcome);
476 }
477 Ok(outcome)
478 }
479
480 fn write_batch(&self, batch: &StorageBatch) -> Result<()> {
481 let mut values = self.values.write().map_err(|_| {
482 BedrockWorldError::ConcurrentWrite("memory storage poisoned".to_string())
483 })?;
484 for op in batch.ops() {
485 match op {
486 StorageOp::Put { key, value } => {
487 values.insert(key.to_vec(), value.clone());
488 }
489 StorageOp::Delete { key } => {
490 values.remove(key.as_ref());
491 }
492 }
493 }
494 Ok(())
495 }
496
497 fn flush(&self) -> Result<()> {
498 Ok(())
499 }
500}
501
/// Byte length of the terrain payload stored per chunk in Pocket Edition
/// `chunks.dat` files (the legacy length minus the 256 x 4-byte biome table
/// appended by `convert_pocket_terrain_to_legacy`).
pub const POCKET_CHUNKS_DAT_TERRAIN_VALUE_LEN: usize = 82_176;
/// The location table is 32 x 32 entries of 4 bytes each.
const POCKET_CHUNKS_DAT_LOCATION_TABLE_LEN: usize = 4 * 32 * 32;
/// `chunks.dat` offsets and lengths are expressed in 4 KiB sectors.
const POCKET_CHUNKS_DAT_SECTOR_BYTES: usize = 4096;
/// Per-column sample appended when converting Pocket terrain to the legacy
/// layout; presumably a biome id plus color bytes — TODO confirm.
const DEFAULT_LEGACY_BIOME_SAMPLE: [u8; 4] = [1, 0x7f, 0xb2, 0x38];
509
/// Read-only [`WorldStorage`] over a Pocket Edition `chunks.dat` file,
/// exposing its chunks as virtual legacy-terrain records.
#[derive(Debug, Clone)]
pub struct PocketChunksDatStorage {
    // All records are decoded eagerly in `open`; the map is never mutated
    // afterwards, so no lock is needed.
    values: Arc<BTreeMap<Vec<u8>, Bytes>>,
    origin_chunk_x: i32,
    origin_chunk_z: i32,
}

impl PocketChunksDatStorage {
    /// Open `<world>/chunks.dat`, decode every chunk record, and offset
    /// chunk positions by the `LimitedWorldOrigin{X,Z}` values read from
    /// `level.dat` (both default to 0 when unavailable).
    ///
    /// # Errors
    /// Fails when `chunks.dat` cannot be read or is smaller than its
    /// location table.
    pub fn open(world_path: impl AsRef<Path>) -> Result<Self> {
        let world_path = world_path.as_ref();
        let chunks_path = world_path.join("chunks.dat");
        let bytes = fs::read(&chunks_path)?;
        let (origin_chunk_x, origin_chunk_z) = read_limited_world_origin(world_path);
        let values = parse_pocket_chunks_dat(&bytes, origin_chunk_x, origin_chunk_z)?;
        // entities.dat is only logged, never parsed; a read failure here is
        // deliberately non-fatal.
        if world_path.join("entities.dat").is_file() {
            match fs::read(world_path.join("entities.dat")) {
                Ok(bytes) => log::debug!(
                    "legacy entities.dat present (bytes={}, parser=best_effort_skip)",
                    bytes.len()
                ),
                Err(error) => log::warn!("failed to read legacy entities.dat: {error}"),
            }
        }
        log::debug!(
            "opened Pocket chunks.dat storage (chunks={}, origin=({}, {}), path={})",
            values.len(),
            origin_chunk_x,
            origin_chunk_z,
            chunks_path.display()
        );
        Ok(Self {
            values: Arc::new(values),
            origin_chunk_x,
            origin_chunk_z,
        })
    }

    /// X offset (in chunks) applied to decoded chunk positions.
    #[must_use]
    pub const fn origin_chunk_x(&self) -> i32 {
        self.origin_chunk_x
    }

    /// Z offset (in chunks) applied to decoded chunk positions.
    #[must_use]
    pub const fn origin_chunk_z(&self) -> i32 {
        self.origin_chunk_z
    }
}
561
impl WorldStorage for PocketChunksDatStorage {
    fn get(&self, key: &[u8]) -> Result<Option<Bytes>> {
        Ok(self.values.get(key).cloned())
    }

    fn get_many(&self, keys: &[Bytes]) -> Result<Vec<Option<Bytes>>> {
        Ok(keys
            .iter()
            .map(|key| self.values.get(key.as_ref()).cloned())
            .collect())
    }

    // chunks.dat support is read-only: every mutation is rejected.
    fn put(&self, _key: &[u8], _value: &[u8]) -> Result<()> {
        Err(pocket_chunks_dat_read_only_error())
    }

    fn delete(&self, _key: &[u8]) -> Result<()> {
        Err(pocket_chunks_dat_read_only_error())
    }

    fn for_each_key(
        &self,
        options: StorageReadOptions,
        visitor: &mut (dyn FnMut(&[u8]) -> Result<StorageVisitorControl> + Send),
    ) -> Result<StorageScanOutcome> {
        let mut outcome = StorageScanOutcome::empty();
        for (key, value) in self.values.iter() {
            check_cancelled(&options)?;
            // Value length counts toward `bytes_read` even for key-only scans.
            outcome.record(value.len());
            if visitor(key)? == StorageVisitorControl::Stop {
                outcome.stopped = true;
                return Ok(outcome);
            }
            emit_progress(&options, outcome);
        }
        Ok(outcome)
    }

    fn for_each_prefix(
        &self,
        prefix: &[u8],
        options: StorageReadOptions,
        visitor: &mut (dyn FnMut(&[u8], &Bytes) -> Result<StorageVisitorControl> + Send),
    ) -> Result<StorageScanOutcome> {
        let mut outcome = StorageScanOutcome::empty();
        // Sorted-map range + take_while visits exactly the keys sharing
        // `prefix`, in key order.
        for (key, value) in self
            .values
            .range(prefix.to_vec()..)
            .take_while(|(key, _)| key.starts_with(prefix))
        {
            check_cancelled(&options)?;
            outcome.record(value.len());
            if visitor(key, value)? == StorageVisitorControl::Stop {
                outcome.stopped = true;
                return Ok(outcome);
            }
            emit_progress(&options, outcome);
        }
        Ok(outcome)
    }

    fn for_each_prefix_key(
        &self,
        prefix: &[u8],
        options: StorageReadOptions,
        visitor: &mut (dyn FnMut(&[u8]) -> Result<StorageVisitorControl> + Send),
    ) -> Result<StorageScanOutcome> {
        let mut outcome = StorageScanOutcome::empty();
        for (key, value) in self
            .values
            .range(prefix.to_vec()..)
            .take_while(|(key, _)| key.starts_with(prefix))
        {
            check_cancelled(&options)?;
            outcome.record(value.len());
            if visitor(key)? == StorageVisitorControl::Stop {
                outcome.stopped = true;
                return Ok(outcome);
            }
            emit_progress(&options, outcome);
        }
        Ok(outcome)
    }

    fn write_batch(&self, _batch: &StorageBatch) -> Result<()> {
        Err(pocket_chunks_dat_read_only_error())
    }

    fn flush(&self) -> Result<()> {
        // Nothing to flush: the store is immutable.
        Ok(())
    }
}
654
/// Decode a Pocket Edition `chunks.dat` file into a map of legacy-terrain
/// records keyed by encoded [`ChunkKey`]s.
///
/// The file begins with a 32 x 32 location table; each 4-byte entry is one
/// byte of sector count followed by a 24-bit little-endian sector offset.
/// Invalid or empty entries are skipped (with a warning for invalid ones)
/// rather than failing the whole file.
///
/// # Errors
/// Returns `CorruptWorld` when the file is smaller than the location table.
fn parse_pocket_chunks_dat(
    bytes: &[u8],
    origin_chunk_x: i32,
    origin_chunk_z: i32,
) -> Result<BTreeMap<Vec<u8>, Bytes>> {
    if bytes.len() < POCKET_CHUNKS_DAT_LOCATION_TABLE_LEN {
        return Err(BedrockWorldError::CorruptWorld(format!(
            "chunks.dat is too small for its location table: {} bytes",
            bytes.len()
        )));
    }
    let mut values = BTreeMap::new();
    for index in 0..(32 * 32) {
        let entry_offset = index * 4;
        let entry = &bytes[entry_offset..entry_offset + 4];
        // An all-zero entry marks an unused chunk slot.
        if entry == [0, 0, 0, 0] {
            continue;
        }
        let sector_count = usize::from(entry[0]);
        let sector_offset =
            usize::from(entry[1]) | (usize::from(entry[2]) << 8) | (usize::from(entry[3]) << 16);
        if sector_count == 0 || sector_offset == 0 {
            continue;
        }
        let Some(byte_offset) = sector_offset.checked_mul(POCKET_CHUNKS_DAT_SECTOR_BYTES) else {
            continue;
        };
        let Some(payload) = pocket_chunk_payload(bytes, byte_offset, sector_count) else {
            log::warn!(
                "skipping invalid chunks.dat entry (index={index}, sector_offset={sector_offset}, sector_count={sector_count})"
            );
            continue;
        };
        // Table index is laid out with x varying fastest within rows of 32.
        let local_x = i32::try_from(index % 32).unwrap_or(0);
        let local_z = i32::try_from(index / 32).unwrap_or(0);
        let pos = ChunkPos {
            x: origin_chunk_x.saturating_add(local_x),
            z: origin_chunk_z.saturating_add(local_z),
            dimension: Dimension::Overworld,
        };
        values.insert(
            ChunkKey::new(pos, ChunkRecordTag::LegacyTerrain)
                .encode()
                .to_vec(),
            convert_pocket_terrain_to_legacy(payload),
        );
    }
    Ok(values)
}
704
/// Locate the terrain payload for one chunk inside `bytes`.
///
/// A record may carry a 4-byte little-endian length header; when the
/// declared length matches a known terrain size the header is skipped,
/// otherwise the payload is assumed to start at the sector boundary.
/// Returns `None` when the sector range is out of bounds or too short to
/// hold a Pocket-sized terrain payload.
fn pocket_chunk_payload(bytes: &[u8], byte_offset: usize, sector_count: usize) -> Option<&[u8]> {
    let sector_bytes = sector_count.checked_mul(POCKET_CHUNKS_DAT_SECTOR_BYTES)?;
    // Clamp to the end of the file so a truncated final sector still yields
    // whatever data is present.
    let max_end = byte_offset.checked_add(sector_bytes)?.min(bytes.len());
    if byte_offset >= bytes.len() || byte_offset >= max_end {
        return None;
    }
    let available = &bytes[byte_offset..max_end];
    if available.len() >= 4 {
        let declared_len = u32::from_le_bytes(available[0..4].try_into().ok()?) as usize;
        if declared_len == POCKET_CHUNKS_DAT_TERRAIN_VALUE_LEN
            && available.len() >= 4 + declared_len
        {
            return Some(&available[4..4 + declared_len]);
        }
        // NOTE(review): for a legacy-length record only the Pocket-sized
        // prefix is returned, dropping the trailing biome bytes — presumably
        // so the biome table is rebuilt by the caller; confirm intent.
        if declared_len == LEGACY_TERRAIN_VALUE_LEN && available.len() >= 4 + declared_len {
            return Some(&available[4..4 + POCKET_CHUNKS_DAT_TERRAIN_VALUE_LEN]);
        }
    }
    // No recognizable length header: treat the sector data as a raw payload.
    if available.len() >= POCKET_CHUNKS_DAT_TERRAIN_VALUE_LEN {
        return Some(&available[..POCKET_CHUNKS_DAT_TERRAIN_VALUE_LEN]);
    }
    None
}
728
/// Expand a Pocket terrain payload into the legacy-terrain record layout.
///
/// A payload that is already legacy-sized is copied verbatim; otherwise the
/// Pocket-sized terrain prefix is kept and a default 4-byte biome sample is
/// appended for each of the 256 columns.
///
/// The payload must be either exactly `LEGACY_TERRAIN_VALUE_LEN` bytes or at
/// least `POCKET_CHUNKS_DAT_TERRAIN_VALUE_LEN` bytes (as guaranteed by
/// `pocket_chunk_payload`); shorter input would panic on the slice below.
fn convert_pocket_terrain_to_legacy(payload: &[u8]) -> Bytes {
    if payload.len() == LEGACY_TERRAIN_VALUE_LEN {
        return Bytes::copy_from_slice(payload);
    }
    let mut legacy = Vec::with_capacity(LEGACY_TERRAIN_VALUE_LEN);
    legacy.extend_from_slice(&payload[..POCKET_CHUNKS_DAT_TERRAIN_VALUE_LEN]);
    for _ in 0..256 {
        legacy.extend_from_slice(&DEFAULT_LEGACY_BIOME_SAMPLE);
    }
    Bytes::from(legacy)
}
740
741fn read_limited_world_origin(world_path: &Path) -> (i32, i32) {
742 let Ok(document) = read_level_dat_document(&world_path.join("level.dat")) else {
743 return (0, 0);
744 };
745 let NbtTag::Compound(root) = document.root else {
746 return (0, 0);
747 };
748 (
749 nbt_i32(root.get("LimitedWorldOriginX")).unwrap_or(0),
750 nbt_i32(root.get("LimitedWorldOriginZ")).unwrap_or(0),
751 )
752}
753
754fn nbt_i32(tag: Option<&NbtTag>) -> Option<i32> {
755 match tag {
756 Some(NbtTag::Byte(value)) => Some(i32::from(*value)),
757 Some(NbtTag::Short(value)) => Some(i32::from(*value)),
758 Some(NbtTag::Int(value)) => Some(*value),
759 Some(NbtTag::Long(value)) => i32::try_from(*value).ok(),
760 _ => None,
761 }
762}
763
/// Error returned by every mutating method of [`PocketChunksDatStorage`],
/// which is a read-only view of `chunks.dat`.
fn pocket_chunks_dat_read_only_error() -> BedrockWorldError {
    BedrockWorldError::UnsupportedChunkFormat("Pocket chunks.dat storage is read-only".to_string())
}
767
/// Concrete storage backends. The LevelDB-backed implementation is compiled
/// only with the `backend-bedrock-leveldb` feature; without it a stub type
/// with the same name is exported whose every operation returns an error.
pub mod backend {
    use super::*;

    /// [`WorldStorage`] backed by a Bedrock-flavored LevelDB database;
    /// clones share the same database handle.
    #[cfg(feature = "backend-bedrock-leveldb")]
    #[derive(Clone)]
    pub struct BedrockLevelDbStorage {
        db: Arc<bedrock_leveldb::Db>,
    }

    #[cfg(feature = "backend-bedrock-leveldb")]
    impl BedrockLevelDbStorage {
        /// Open an existing database for read/write access.
        pub fn open(path: impl AsRef<Path>) -> Result<Self> {
            Self::open_inner(path, false)
        }

        /// Open an existing database without taking write access.
        pub fn open_read_only(path: impl AsRef<Path>) -> Result<Self> {
            Self::open_inner(path, true)
        }

        // Shared open path; never creates a missing database, and maps a
        // missing path to a NotFound I/O error up front.
        fn open_inner(path: impl AsRef<Path>, read_only: bool) -> Result<Self> {
            let path = path.as_ref().to_path_buf();
            if !path.exists() {
                return Err(BedrockWorldError::Io(std::io::Error::new(
                    std::io::ErrorKind::NotFound,
                    format!("LevelDB path not found: {}", path.display()),
                )));
            }
            let options = bedrock_leveldb::OpenOptions {
                read_only,
                create_if_missing: false,
                error_if_exists: false,
                paranoid_checks: true,
                compression_policy: bedrock_leveldb::CompressionPolicy::Zlib,
                cache_size: 64 * 1024 * 1024,
                write_buffer_size: 4 * 1024 * 1024,
            };
            let db = bedrock_leveldb::Db::open(path, options).map_err(map_leveldb_error)?;
            Ok(Self { db: Arc::new(db) })
        }
    }

    #[cfg(feature = "backend-bedrock-leveldb")]
    impl WorldStorage for BedrockLevelDbStorage {
        fn get(&self, key: &[u8]) -> Result<Option<Bytes>> {
            self.db.get(key).map_err(map_leveldb_error)
        }

        fn get_many(&self, keys: &[Bytes]) -> Result<Vec<Option<Bytes>>> {
            self.db
                .get_many_owned(
                    keys.iter().cloned(),
                    bedrock_leveldb::ReadOptions::default(),
                )
                .map_err(map_leveldb_error)
        }

        fn get_many_ordered_with_control(
            &self,
            keys: &[Bytes],
            options: StorageReadOptions,
        ) -> Result<Vec<Option<Bytes>>> {
            check_cancelled(&options)?;
            self.db
                .get_many_owned(keys.iter().cloned(), to_leveldb_read_options(options))
                .map_err(map_leveldb_error)
        }

        fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
            self.db
                .put(
                    Bytes::copy_from_slice(key),
                    Bytes::copy_from_slice(value),
                    bedrock_leveldb::WriteOptions::default(),
                )
                .map_err(map_leveldb_error)
        }

        fn delete(&self, key: &[u8]) -> Result<()> {
            self.db
                .delete(
                    Bytes::copy_from_slice(key),
                    bedrock_leveldb::WriteOptions::default(),
                )
                .map_err(map_leveldb_error)
        }

        // The LevelDB callback cannot return our error type, so visitor
        // errors are smuggled out via `visitor_error` and the scan is
        // stopped; the error takes precedence over any scan result.
        fn for_each_key(
            &self,
            options: StorageReadOptions,
            visitor: &mut (dyn FnMut(&[u8]) -> Result<StorageVisitorControl> + Send),
        ) -> Result<StorageScanOutcome> {
            let read_options = to_leveldb_read_options(options);
            let mut visitor_error = None;
            let scan_result = self
                .db
                .for_each_key(read_options, |key| match visitor(key) {
                    Ok(StorageVisitorControl::Continue) => {
                        Ok(bedrock_leveldb::VisitorControl::Continue)
                    }
                    Ok(StorageVisitorControl::Stop) => Ok(bedrock_leveldb::VisitorControl::Stop),
                    Err(error) => {
                        visitor_error = Some(error);
                        Ok(bedrock_leveldb::VisitorControl::Stop)
                    }
                });
            match (scan_result, visitor_error) {
                (_, Some(error)) => Err(error),
                (Ok(outcome), None) => Ok(to_storage_outcome(outcome)),
                (Err(error), None) => Err(map_leveldb_error(error)),
            }
        }

        fn for_each_prefix(
            &self,
            prefix: &[u8],
            options: StorageReadOptions,
            visitor: &mut (dyn FnMut(&[u8], &Bytes) -> Result<StorageVisitorControl> + Send),
        ) -> Result<StorageScanOutcome> {
            let read_options = to_leveldb_read_options(options);
            let mut visitor_error = None;
            let scan_result = self.db.for_each_prefix(prefix, read_options, |key, value| {
                match visitor(key, value) {
                    Ok(StorageVisitorControl::Continue) => {
                        Ok(bedrock_leveldb::VisitorControl::Continue)
                    }
                    Ok(StorageVisitorControl::Stop) => Ok(bedrock_leveldb::VisitorControl::Stop),
                    Err(error) => {
                        visitor_error = Some(error);
                        Ok(bedrock_leveldb::VisitorControl::Stop)
                    }
                }
            });
            match (scan_result, visitor_error) {
                (_, Some(error)) => Err(error),
                (Ok(outcome), None) => Ok(to_storage_outcome(outcome)),
                (Err(error), None) => Err(map_leveldb_error(error)),
            }
        }

        // Overrides the trait default to use the backend's borrowed read
        // strategy, avoiding owned value allocations during the scan.
        fn for_each_prefix_ref(
            &self,
            prefix: &[u8],
            options: StorageReadOptions,
            visitor: &mut (dyn FnMut(StorageEntryRef<'_>) -> Result<StorageVisitorControl> + Send),
        ) -> Result<StorageScanOutcome> {
            let mut read_options = to_leveldb_read_options(options);
            read_options.read_strategy = bedrock_leveldb::ReadStrategy::Borrowed;
            let mut visitor_error = None;
            let scan_result = self.db.for_each_prefix_ref(prefix, read_options, |entry| {
                match visitor(StorageEntryRef {
                    key: entry.key.as_bytes(),
                    value: entry.value.as_bytes(),
                }) {
                    Ok(StorageVisitorControl::Continue) => {
                        Ok(bedrock_leveldb::VisitorControl::Continue)
                    }
                    Ok(StorageVisitorControl::Stop) => Ok(bedrock_leveldb::VisitorControl::Stop),
                    Err(error) => {
                        visitor_error = Some(error);
                        Ok(bedrock_leveldb::VisitorControl::Stop)
                    }
                }
            });
            match (scan_result, visitor_error) {
                (_, Some(error)) => Err(error),
                (Ok(outcome), None) => Ok(to_storage_outcome(outcome)),
                (Err(error), None) => Err(map_leveldb_error(error)),
            }
        }

        fn for_each_prefix_key(
            &self,
            prefix: &[u8],
            options: StorageReadOptions,
            visitor: &mut (dyn FnMut(&[u8]) -> Result<StorageVisitorControl> + Send),
        ) -> Result<StorageScanOutcome> {
            let read_options = to_leveldb_read_options(options);
            let mut visitor_error = None;
            let scan_result =
                self.db
                    .for_each_prefix_key(prefix, read_options, |key| match visitor(key) {
                        Ok(StorageVisitorControl::Continue) => {
                            Ok(bedrock_leveldb::VisitorControl::Continue)
                        }
                        Ok(StorageVisitorControl::Stop) => {
                            Ok(bedrock_leveldb::VisitorControl::Stop)
                        }
                        Err(error) => {
                            visitor_error = Some(error);
                            Ok(bedrock_leveldb::VisitorControl::Stop)
                        }
                    });
            match (scan_result, visitor_error) {
                (_, Some(error)) => Err(error),
                (Ok(outcome), None) => Ok(to_storage_outcome(outcome)),
                (Err(error), None) => Err(map_leveldb_error(error)),
            }
        }

        fn write_batch(&self, batch: &StorageBatch) -> Result<()> {
            let mut db_batch = bedrock_leveldb::WriteBatch::new();
            for op in batch.ops() {
                match op {
                    StorageOp::Put { key, value } => db_batch.put(key.clone(), value.clone()),
                    StorageOp::Delete { key } => db_batch.delete(key.clone()),
                }
            }
            self.db
                .write(db_batch, bedrock_leveldb::WriteOptions::default())
                .map_err(map_leveldb_error)
        }

        fn flush(&self) -> Result<()> {
            self.db.flush().map_err(map_leveldb_error)
        }
    }

    /// Map a backend LevelDB error onto this crate's error type, preserving
    /// the cancellation and read-only cases as dedicated variants.
    #[cfg(feature = "backend-bedrock-leveldb")]
    fn map_leveldb_error(error: bedrock_leveldb::LevelDbError) -> BedrockWorldError {
        match error.kind() {
            bedrock_leveldb::ErrorKind::Cancelled => BedrockWorldError::Cancelled {
                operation: "LevelDB scan",
            },
            bedrock_leveldb::ErrorKind::ReadOnly => BedrockWorldError::ReadOnly,
            _ => BedrockWorldError::LevelDb(error.to_string()),
        }
    }

    /// Translate storage-level read options into the backend's options,
    /// bridging the cancel flag and re-wrapping the progress callback.
    #[cfg(feature = "backend-bedrock-leveldb")]
    fn to_leveldb_read_options(options: StorageReadOptions) -> bedrock_leveldb::ReadOptions {
        bedrock_leveldb::ReadOptions {
            checksum: bedrock_leveldb::ChecksumMode::Inherit,
            cache_policy: bedrock_leveldb::CachePolicy::Bypass,
            read_strategy: bedrock_leveldb::ReadStrategy::Shared,
            threading: match options.threading {
                StorageThreadingOptions::Auto => bedrock_leveldb::ThreadingOptions::Auto,
                StorageThreadingOptions::Fixed(threads) => {
                    bedrock_leveldb::ThreadingOptions::Fixed(threads)
                }
                StorageThreadingOptions::Single => bedrock_leveldb::ThreadingOptions::Single,
            },
            scan_mode: match options.scan_mode {
                StorageScanMode::Sequential => bedrock_leveldb::ScanMode::Sequential,
                StorageScanMode::ParallelTables => bedrock_leveldb::ScanMode::ParallelTables,
            },
            pipeline: bedrock_leveldb::ScanPipelineOptions {
                queue_depth: options.pipeline.queue_depth,
                table_batch_size: options.pipeline.table_batch_size,
                progress_interval: options.pipeline.progress_interval,
            },
            // The cancel flag shares its inner atomic with the backend, so
            // cancelling on either side is observed by both.
            cancel: options
                .cancel
                .map(|cancel| bedrock_leveldb::ScanCancelFlag::from_shared(cancel.0)),
            progress: options.progress.map(|progress| {
                bedrock_leveldb::ScanProgressSink::new(move |db_progress| {
                    progress.emit(StorageScanProgress {
                        entries_seen: db_progress.visited,
                        bytes_read: db_progress.bytes_read,
                    });
                })
            }),
        }
    }

    /// Field-for-field translation of the backend scan outcome.
    #[cfg(feature = "backend-bedrock-leveldb")]
    const fn to_storage_outcome(outcome: bedrock_leveldb::ScanOutcome) -> StorageScanOutcome {
        StorageScanOutcome {
            visited: outcome.visited,
            bytes_read: outcome.bytes_read,
            stopped: outcome.stopped,
            tables_scanned: outcome.tables_scanned,
            worker_threads: outcome.worker_threads,
            queue_wait_ms: outcome.queue_wait_ms,
            cancel_checks: outcome.cancel_checks,
        }
    }

    /// Stub exported when the `backend-bedrock-leveldb` feature is off;
    /// keeps downstream code compiling while every operation fails.
    #[cfg(not(feature = "backend-bedrock-leveldb"))]
    #[derive(Debug, Clone, Copy)]
    pub struct BedrockLevelDbStorage;

    #[cfg(not(feature = "backend-bedrock-leveldb"))]
    impl BedrockLevelDbStorage {
        /// Always fails: the LevelDB backend is not compiled in.
        pub fn open(_path: impl AsRef<Path>) -> Result<Self> {
            Err(BedrockWorldError::LevelDb(
                "backend-bedrock-leveldb feature is disabled".to_string(),
            ))
        }

        /// Always fails: the LevelDB backend is not compiled in.
        pub fn open_read_only(_path: impl AsRef<Path>) -> Result<Self> {
            Err(BedrockWorldError::LevelDb(
                "backend-bedrock-leveldb feature is disabled".to_string(),
            ))
        }
    }

    // Every trait method on the stub returns the same "feature disabled"
    // error; note the scan defaults inherited from the trait also end up
    // here via `for_each_prefix`.
    #[cfg(not(feature = "backend-bedrock-leveldb"))]
    impl WorldStorage for BedrockLevelDbStorage {
        fn get(&self, _key: &[u8]) -> Result<Option<Bytes>> {
            Err(BedrockWorldError::LevelDb(
                "backend-bedrock-leveldb feature is disabled".to_string(),
            ))
        }

        fn get_many(&self, _keys: &[Bytes]) -> Result<Vec<Option<Bytes>>> {
            Err(BedrockWorldError::LevelDb(
                "backend-bedrock-leveldb feature is disabled".to_string(),
            ))
        }

        fn put(&self, _key: &[u8], _value: &[u8]) -> Result<()> {
            Err(BedrockWorldError::LevelDb(
                "backend-bedrock-leveldb feature is disabled".to_string(),
            ))
        }

        fn delete(&self, _key: &[u8]) -> Result<()> {
            Err(BedrockWorldError::LevelDb(
                "backend-bedrock-leveldb feature is disabled".to_string(),
            ))
        }

        fn for_each_key(
            &self,
            _options: StorageReadOptions,
            _visitor: &mut (dyn FnMut(&[u8]) -> Result<StorageVisitorControl> + Send),
        ) -> Result<StorageScanOutcome> {
            Err(BedrockWorldError::LevelDb(
                "backend-bedrock-leveldb feature is disabled".to_string(),
            ))
        }

        fn for_each_prefix(
            &self,
            _prefix: &[u8],
            _options: StorageReadOptions,
            _visitor: &mut (dyn FnMut(&[u8], &Bytes) -> Result<StorageVisitorControl> + Send),
        ) -> Result<StorageScanOutcome> {
            Err(BedrockWorldError::LevelDb(
                "backend-bedrock-leveldb feature is disabled".to_string(),
            ))
        }

        fn write_batch(&self, _batch: &StorageBatch) -> Result<()> {
            Err(BedrockWorldError::LevelDb(
                "backend-bedrock-leveldb feature is disabled".to_string(),
            ))
        }

        fn flush(&self) -> Result<()> {
            Err(BedrockWorldError::LevelDb(
                "backend-bedrock-leveldb feature is disabled".to_string(),
            ))
        }
    }
}
1131
1132fn check_cancelled(options: &StorageReadOptions) -> Result<()> {
1133 if options
1134 .cancel
1135 .as_ref()
1136 .is_some_and(StorageCancelFlag::is_cancelled)
1137 {
1138 return Err(BedrockWorldError::Cancelled {
1139 operation: "storage scan",
1140 });
1141 }
1142 Ok(())
1143}
1144
1145fn emit_progress(options: &StorageReadOptions, outcome: StorageScanOutcome) {
1146 if let Some(progress) = &options.progress {
1147 progress.emit(StorageScanProgress {
1148 entries_seen: outcome.visited,
1149 bytes_read: outcome.bytes_read,
1150 });
1151 }
1152}
1153
#[cfg(test)]
mod tests {
    use super::*;
    #[cfg(feature = "backend-bedrock-leveldb")]
    use std::time::{SystemTime, UNIX_EPOCH};

    // Verifies that a prefix scan over the in-memory backend visits exactly
    // the entries whose keys start with the prefix, in key order, and that
    // the visitor receives the stored `Bytes` value (cloning `Bytes` is a
    // cheap refcount bump, hence "without copying").
    #[test]
    fn memory_storage_scans_prefix_without_copying_values() {
        let storage = MemoryStorage::new();
        storage.put(b"abc1", b"one").expect("put");
        storage.put(b"abc2", b"two").expect("put");
        storage.put(b"abd", b"three").expect("put");

        let mut entries = Vec::new();
        storage
            .for_each_prefix(b"abc", StorageReadOptions::default(), &mut |key, value| {
                entries.push(StorageEntry {
                    key: Bytes::copy_from_slice(key),
                    value: value.clone(),
                });
                Ok(StorageVisitorControl::Continue)
            })
            .expect("scan");
        // "abd" must be excluded; only the two "abc"-prefixed keys match.
        assert_eq!(entries.len(), 2);
        assert_eq!(entries[0].key, Bytes::from_static(b"abc1"));
        assert_eq!(entries[1].value, Bytes::from_static(b"two"));
    }

    // Round-trips raw key/value records through the real LevelDB backend:
    // write + flush, reopen the database, then confirm point reads and a
    // prefix scan both see the persisted data.
    #[cfg(feature = "backend-bedrock-leveldb")]
    #[test]
    fn bedrock_leveldb_storage_roundtrips_raw_records() {
        // Unique temp directory keyed by a nanosecond timestamp so parallel
        // test runs do not collide.
        // NOTE(review): the directory leaks if any `expect` below panics
        // before the final cleanup — acceptable for tests, but worth knowing.
        let path = std::env::temp_dir().join(format!(
            "bedrock-world-storage-{}",
            SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .expect("time")
                .as_nanos()
        ));
        std::fs::create_dir_all(&path).expect("create");
        // Create the on-disk database files, then drop the handle so the
        // storage wrapper below can open the same path.
        drop(
            bedrock_leveldb::Db::open(&path, bedrock_leveldb::OpenOptions::default())
                .expect("initialize"),
        );

        let storage = backend::BedrockLevelDbStorage::open(&path).expect("open");
        storage.put(b"player_1", b"one").expect("put");
        storage.put(b"player_2", b"two").expect("put");
        storage.flush().expect("flush");

        // Reopen to prove the records survived the flush, not just the cache.
        let reopened = backend::BedrockLevelDbStorage::open(&path).expect("reopen");
        assert_eq!(
            reopened.get(b"player_1").expect("get"),
            Some(Bytes::from_static(b"one"))
        );
        let mut player_count = 0;
        reopened
            .for_each_prefix(
                b"player_",
                StorageReadOptions::default(),
                &mut |_key, _value| {
                    player_count += 1;
                    Ok(StorageVisitorControl::Continue)
                },
            )
            .expect("scan");
        assert_eq!(player_count, 2);

        std::fs::remove_dir_all(path).expect("cleanup");
    }

    // Builds a synthetic Pocket Edition `chunks.dat` fixture and checks that
    // the storage adapter exposes it as virtual LegacyTerrain records: a
    // present chunk yields a value, a missing chunk yields None, key scans
    // list only the present chunk, and the storage is read-only.
    #[test]
    fn pocket_chunks_dat_exposes_virtual_legacy_terrain_records() {
        let path = std::env::temp_dir().join(format!(
            "bedrock-world-pocket-chunks-{}",
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .expect("time")
                .as_nanos()
        ));
        std::fs::create_dir_all(&path).expect("create world dir");
        let mut terrain = vec![0_u8; POCKET_CHUNKS_DAT_TERRAIN_VALUE_LEN];
        // Bit-packed block offset; matches the (x=1, y=2, z=3) lookup
        // asserted via `block_id_at(1, 2, 3)` below, so the layout here is
        // x<<11 | z<<7 | y.
        let block_index = (1_usize << 11) | (3_usize << 7) | 2_usize;
        // Heightmap column offset; matches `height_at(1, 3)` below (z*16 + x).
        let column_index = 3_usize * 16 + 1_usize;
        terrain[block_index] = 42;
        // Write the height sample past the block-id array plus three
        // half-size nibble arrays (presumably data/skylight/blocklight —
        // TODO confirm against the LegacyTerrain layout).
        terrain[crate::LEGACY_TERRAIN_BLOCK_COUNT
            + (crate::LEGACY_TERRAIN_BLOCK_COUNT / 2) * 3
            + column_index] = 99;
        // Sector 0 of chunks.dat; bytes 0..2 appear to form the index entry
        // locating chunk (0,0) — NOTE(review): confirm the exact index
        // encoding (21 / 1) against the chunks.dat format.
        let mut chunks = vec![0_u8; POCKET_CHUNKS_DAT_SECTOR_BYTES];
        chunks[0] = 21;
        chunks[1] = 1;
        // Chunk payload = little-endian u32 length prefix + terrain blob,
        // appended at the start of sector 1.
        let mut payload = Vec::new();
        payload.extend_from_slice(&(POCKET_CHUNKS_DAT_TERRAIN_VALUE_LEN as u32).to_le_bytes());
        payload.extend_from_slice(&terrain);
        chunks.extend_from_slice(&payload);
        // Pad the file out to a whole number of sectors.
        let padded_len = POCKET_CHUNKS_DAT_SECTOR_BYTES * 22;
        chunks.resize(padded_len, 0);
        std::fs::write(path.join("chunks.dat"), chunks).expect("write chunks.dat");

        let storage = PocketChunksDatStorage::open(&path).expect("open pocket chunks");
        let pos = ChunkPos {
            x: 0,
            z: 0,
            dimension: Dimension::Overworld,
        };
        // Key for the chunk we wrote, and for a neighbor we did not.
        let legacy_key = ChunkKey::new(pos, ChunkRecordTag::LegacyTerrain).encode();
        let missing_key = ChunkKey::new(
            ChunkPos {
                x: 1,
                z: 0,
                dimension: Dimension::Overworld,
            },
            ChunkRecordTag::LegacyTerrain,
        )
        .encode();

        let values = storage
            .get_many(&[missing_key.clone(), legacy_key.clone()])
            .expect("get many");
        assert!(values[0].is_none());
        let Some(value) = &values[1] else {
            panic!("legacy terrain should be present");
        };
        // The adapter widens the pocket-format blob to the standard
        // LegacyTerrain value length; the leading bytes are our fixture.
        assert_eq!(value.len(), LEGACY_TERRAIN_VALUE_LEN);
        assert_eq!(
            &value[..POCKET_CHUNKS_DAT_TERRAIN_VALUE_LEN],
            terrain.as_slice()
        );
        let terrain = crate::LegacyTerrain::parse(value.clone()).expect("legacy terrain");
        assert_eq!(terrain.block_id_at(1, 2, 3), Some(42));
        assert_eq!(terrain.height_at(1, 3), Some(99));

        // Only the one chunk we wrote should be enumerable.
        let mut keys = Vec::new();
        storage
            .for_each_key(StorageReadOptions::default(), &mut |key| {
                keys.push(Bytes::copy_from_slice(key));
                Ok(StorageVisitorControl::Continue)
            })
            .expect("scan keys");
        assert_eq!(keys, vec![legacy_key]);
        // chunks.dat storage is read-only: writes must fail.
        assert!(storage.put(b"x", b"y").is_err());

        std::fs::remove_dir_all(path).expect("cleanup");
    }
}