use crate::chunk::{ChunkKey, ChunkPos, ChunkRecordTag, Dimension, LEGACY_TERRAIN_VALUE_LEN};
use crate::error::{BedrockWorldError, Result};
use crate::level_dat::read_level_dat_document;
use crate::nbt::NbtTag;
use bytes::Bytes;
use std::collections::BTreeMap;
use std::fs;
use std::path::Path;
use std::sync::{
Arc, RwLock,
atomic::{AtomicBool, Ordering},
};
/// An owned key/value pair read from a [`WorldStorage`] backend.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StorageEntry {
    /// Raw record key as stored by the backend.
    pub key: Bytes,
    /// Raw record value as stored by the backend.
    pub value: Bytes,
}
/// A borrowed key/value pair, used by zero-copy scan visitors
/// (see [`WorldStorage::for_each_prefix_ref`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct StorageEntryRef<'a> {
    /// Borrowed record key.
    pub key: &'a [u8],
    /// Borrowed record value.
    pub value: &'a [u8],
}
/// A single mutation queued in a [`StorageBatch`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StorageOp {
    /// Insert or overwrite `key` with `value`.
    Put {
        key: Bytes,
        value: Bytes,
    },
    /// Remove `key` if present.
    Delete {
        key: Bytes,
    },
}
/// Options controlling reads and scans over a [`WorldStorage`] backend.
///
/// The in-memory and chunks.dat backends honor `cancel` and `progress`;
/// the remaining knobs are forwarded to the LevelDB backend when enabled.
#[derive(Debug, Clone)]
pub struct StorageReadOptions {
    /// Worker-thread policy for parallel scans.
    pub threading: StorageThreadingOptions,
    /// Sequential vs. parallel-table scan strategy.
    pub scan_mode: StorageScanMode,
    /// Queue/batch tuning for the scan pipeline.
    pub pipeline: StoragePipelineOptions,
    /// Optional cooperative-cancellation flag checked during scans.
    pub cancel: Option<StorageCancelFlag>,
    /// Optional sink receiving periodic scan progress snapshots.
    pub progress: Option<StorageProgressSink>,
}
impl Default for StorageReadOptions {
fn default() -> Self {
Self {
threading: StorageThreadingOptions::Auto,
scan_mode: StorageScanMode::ParallelTables,
pipeline: StoragePipelineOptions::default(),
cancel: None,
progress: None,
}
}
}
/// Tuning knobs for the scan pipeline. Zero (the default) means
/// "let the backend pick" — values are forwarded verbatim to the
/// LevelDB backend's `ScanPipelineOptions`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct StoragePipelineOptions {
    /// Depth of the internal work queue.
    pub queue_depth: usize,
    /// Number of tables handed to a worker per batch.
    pub table_batch_size: usize,
    /// Entries between progress callbacks.
    pub progress_interval: usize,
}
/// Worker-thread policy for parallel scans.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum StorageThreadingOptions {
    /// Let the backend choose a thread count.
    #[default]
    Auto,
    /// Use exactly this many worker threads.
    Fixed(usize),
    /// Run single-threaded.
    Single,
}
/// Scan strategy. Note: the enum's own default is `Sequential`, but
/// `StorageReadOptions::default()` deliberately selects `ParallelTables`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum StorageScanMode {
    /// Visit entries in key order on one thread.
    #[default]
    Sequential,
    /// Scan backend tables in parallel (LevelDB backend only).
    ParallelTables,
}
/// Returned by scan visitors to continue or short-circuit a scan.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StorageVisitorControl {
    /// Keep visiting entries.
    Continue,
    /// Stop the scan early; the outcome is marked `stopped`.
    Stop,
}
/// Statistics accumulated over one scan.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct StorageScanOutcome {
    /// Entries handed to the visitor.
    pub visited: usize,
    /// Sum of visited value lengths, in bytes.
    pub bytes_read: usize,
    /// True when the visitor returned [`StorageVisitorControl::Stop`].
    pub stopped: bool,
    /// Backend tables touched (LevelDB backend; 0 elsewhere).
    pub tables_scanned: usize,
    /// Worker threads used (LevelDB backend; 0 elsewhere).
    pub worker_threads: usize,
    /// Milliseconds spent waiting on the scan queue (LevelDB backend).
    pub queue_wait_ms: u128,
    /// Number of cancellation checks performed (LevelDB backend).
    pub cancel_checks: usize,
}
impl StorageScanOutcome {
    /// All-zero outcome, used as the accumulator at the start of a scan.
    #[must_use]
    pub const fn empty() -> Self {
        Self {
            visited: 0,
            bytes_read: 0,
            stopped: false,
            tables_scanned: 0,
            worker_threads: 0,
            queue_wait_ms: 0,
            cancel_checks: 0,
        }
    }
    /// Counts one visited entry whose value is `value_len` bytes long.
    /// Counters saturate rather than overflow.
    pub fn record(&mut self, value_len: usize) {
        self.bytes_read = self.bytes_read.saturating_add(value_len);
        self.visited = self.visited.saturating_add(1);
    }
    /// Folds `other` into `self`: counters add (saturating), `stopped`
    /// ORs, and `worker_threads` keeps the maximum seen.
    pub fn merge(&mut self, other: Self) {
        self.stopped |= other.stopped;
        self.worker_threads = self.worker_threads.max(other.worker_threads);
        self.visited = self.visited.saturating_add(other.visited);
        self.bytes_read = self.bytes_read.saturating_add(other.bytes_read);
        self.tables_scanned = self.tables_scanned.saturating_add(other.tables_scanned);
        self.queue_wait_ms = self.queue_wait_ms.saturating_add(other.queue_wait_ms);
        self.cancel_checks = self.cancel_checks.saturating_add(other.cancel_checks);
    }
}
/// Cloneable cooperative-cancellation flag shared between a scan and its
/// controller; clones observe the same underlying atomic.
#[derive(Debug, Clone, Default)]
pub struct StorageCancelFlag(Arc<AtomicBool>);
impl StorageCancelFlag {
#[must_use]
pub fn new() -> Self {
Self::default()
}
pub fn cancel(&self) {
self.0.store(true, Ordering::Relaxed);
}
#[must_use]
pub fn from_shared(cancelled: Arc<AtomicBool>) -> Self {
Self(cancelled)
}
#[must_use]
pub fn is_cancelled(&self) -> bool {
self.0.load(Ordering::Relaxed)
}
}
/// Cloneable wrapper around a progress callback invoked during scans.
#[derive(Clone)]
pub struct StorageProgressSink {
    // Shared so the sink can be cloned into scan workers.
    inner: Arc<dyn Fn(StorageScanProgress) + Send + Sync>,
}
impl std::fmt::Debug for StorageProgressSink {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The wrapped closure is opaque, so only the type name is shown.
        f.debug_struct("StorageProgressSink").finish_non_exhaustive()
    }
}
impl StorageProgressSink {
#[must_use]
pub fn new(callback: impl Fn(StorageScanProgress) + Send + Sync + 'static) -> Self {
Self {
inner: Arc::new(callback),
}
}
pub fn emit(&self, progress: StorageScanProgress) {
(self.inner)(progress);
}
}
/// Point-in-time progress snapshot delivered to a [`StorageProgressSink`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct StorageScanProgress {
    /// Entries visited so far.
    pub entries_seen: usize,
    /// Value bytes read so far.
    pub bytes_read: usize,
}
/// An ordered collection of mutations applied atomically by
/// [`WorldStorage::write_batch`].
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct StorageBatch {
    // Operations are applied in insertion order.
    ops: Vec<StorageOp>,
}
impl StorageBatch {
    /// Creates a batch with no queued operations.
    #[must_use]
    pub const fn new() -> Self {
        Self { ops: Vec::new() }
    }
    /// Queues an insert/overwrite of `key` with `value`.
    pub fn put(&mut self, key: impl Into<Bytes>, value: impl Into<Bytes>) {
        let op = StorageOp::Put {
            key: key.into(),
            value: value.into(),
        };
        self.ops.push(op);
    }
    /// Queues removal of `key`.
    pub fn delete(&mut self, key: impl Into<Bytes>) {
        let op = StorageOp::Delete { key: key.into() };
        self.ops.push(op);
    }
    /// Ordered view of the queued operations.
    #[must_use]
    pub fn ops(&self) -> &[StorageOp] {
        self.ops.as_slice()
    }
    /// `true` when no operations have been queued.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.ops.is_empty()
    }
}
/// Abstract key/value store backing a Bedrock world.
///
/// Implementations must be thread-safe; default methods build bulk and
/// prefix operations on top of the required primitives.
pub trait WorldStorage: Send + Sync {
    /// Looks up a single record; `Ok(None)` when the key is absent.
    fn get(&self, key: &[u8]) -> Result<Option<Bytes>>;
    /// Looks up many keys, preserving input order. Default: one `get` per key.
    fn get_many(&self, keys: &[Bytes]) -> Result<Vec<Option<Bytes>>> {
        keys.iter().map(|key| self.get(key)).collect()
    }
    /// Like [`Self::get_many`], honoring cancellation between lookups.
    fn get_many_ordered_with_control(
        &self,
        keys: &[Bytes],
        options: StorageReadOptions,
    ) -> Result<Vec<Option<Bytes>>> {
        check_cancelled(&options)?;
        let mut values = Vec::with_capacity(keys.len());
        for key in keys {
            // Re-check so a cancellation takes effect mid-batch.
            check_cancelled(&options)?;
            values.push(self.get(key)?);
        }
        Ok(values)
    }
    /// Inserts or overwrites a single record.
    fn put(&self, key: &[u8], value: &[u8]) -> Result<()>;
    /// Removes a single record.
    fn delete(&self, key: &[u8]) -> Result<()>;
    /// Visits every key; the visitor may stop the scan early.
    fn for_each_key(
        &self,
        options: StorageReadOptions,
        visitor: &mut (dyn FnMut(&[u8]) -> Result<StorageVisitorControl> + Send),
    ) -> Result<StorageScanOutcome>;
    /// Visits every entry whose key starts with `prefix`.
    fn for_each_prefix(
        &self,
        prefix: &[u8],
        options: StorageReadOptions,
        visitor: &mut (dyn FnMut(&[u8], &Bytes) -> Result<StorageVisitorControl> + Send),
    ) -> Result<StorageScanOutcome>;
    /// Borrowed-entry variant of [`Self::for_each_prefix`]; backends may
    /// override to avoid materializing `Bytes` values.
    fn for_each_prefix_ref(
        &self,
        prefix: &[u8],
        options: StorageReadOptions,
        visitor: &mut (dyn FnMut(StorageEntryRef<'_>) -> Result<StorageVisitorControl> + Send),
    ) -> Result<StorageScanOutcome> {
        self.for_each_prefix(prefix, options, &mut |key, value| {
            visitor(StorageEntryRef {
                key,
                value: value.as_ref(),
            })
        })
    }
    /// Key-only variant of [`Self::for_each_prefix`].
    fn for_each_prefix_key(
        &self,
        prefix: &[u8],
        options: StorageReadOptions,
        visitor: &mut (dyn FnMut(&[u8]) -> Result<StorageVisitorControl> + Send),
    ) -> Result<StorageScanOutcome> {
        self.for_each_prefix(prefix, options, &mut |key, _value| visitor(key))
    }
    /// Visits every entry (empty prefix matches all keys).
    fn for_each_entry(
        &self,
        options: StorageReadOptions,
        visitor: &mut (dyn FnMut(&[u8], &Bytes) -> Result<StorageVisitorControl> + Send),
    ) -> Result<StorageScanOutcome> {
        self.for_each_prefix(b"", options, visitor)
    }
    /// Applies all operations in `batch`, in order.
    fn write_batch(&self, batch: &StorageBatch) -> Result<()>;
    /// Persists any buffered writes.
    fn flush(&self) -> Result<()>;
}
/// In-memory [`WorldStorage`] backed by a `BTreeMap`, primarily for tests.
/// Clones share the same underlying map.
#[derive(Debug, Clone, Default)]
pub struct MemoryStorage {
    // BTreeMap keeps keys sorted, enabling efficient prefix range scans.
    values: Arc<RwLock<BTreeMap<Vec<u8>, Bytes>>>,
}
impl MemoryStorage {
#[must_use]
pub fn new() -> Self {
Self::default()
}
}
impl WorldStorage for MemoryStorage {
fn get(&self, key: &[u8]) -> Result<Option<Bytes>> {
let values = self.values.read().map_err(|_| {
BedrockWorldError::ConcurrentWrite("memory storage poisoned".to_string())
})?;
Ok(values.get(key).cloned())
}
fn get_many(&self, keys: &[Bytes]) -> Result<Vec<Option<Bytes>>> {
let values = self.values.read().map_err(|_| {
BedrockWorldError::ConcurrentWrite("memory storage poisoned".to_string())
})?;
Ok(keys
.iter()
.map(|key| values.get(key.as_ref()).cloned())
.collect())
}
fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
let mut values = self.values.write().map_err(|_| {
BedrockWorldError::ConcurrentWrite("memory storage poisoned".to_string())
})?;
values.insert(key.to_vec(), Bytes::copy_from_slice(value));
Ok(())
}
fn delete(&self, key: &[u8]) -> Result<()> {
let mut values = self.values.write().map_err(|_| {
BedrockWorldError::ConcurrentWrite("memory storage poisoned".to_string())
})?;
values.remove(key);
Ok(())
}
fn for_each_key(
&self,
options: StorageReadOptions,
visitor: &mut (dyn FnMut(&[u8]) -> Result<StorageVisitorControl> + Send),
) -> Result<StorageScanOutcome> {
let values = self.values.read().map_err(|_| {
BedrockWorldError::ConcurrentWrite("memory storage poisoned".to_string())
})?;
let mut outcome = StorageScanOutcome::empty();
for (key, value) in values.iter() {
check_cancelled(&options)?;
outcome.record(value.len());
if visitor(key)? == StorageVisitorControl::Stop {
outcome.stopped = true;
return Ok(outcome);
}
emit_progress(&options, outcome);
}
Ok(outcome)
}
fn for_each_prefix(
&self,
prefix: &[u8],
options: StorageReadOptions,
visitor: &mut (dyn FnMut(&[u8], &Bytes) -> Result<StorageVisitorControl> + Send),
) -> Result<StorageScanOutcome> {
let values = self.values.read().map_err(|_| {
BedrockWorldError::ConcurrentWrite("memory storage poisoned".to_string())
})?;
let mut outcome = StorageScanOutcome::empty();
for (key, value) in values
.range(prefix.to_vec()..)
.take_while(|(key, _)| key.starts_with(prefix))
{
check_cancelled(&options)?;
outcome.record(value.len());
if visitor(key, value)? == StorageVisitorControl::Stop {
outcome.stopped = true;
return Ok(outcome);
}
emit_progress(&options, outcome);
}
Ok(outcome)
}
fn for_each_prefix_key(
&self,
prefix: &[u8],
options: StorageReadOptions,
visitor: &mut (dyn FnMut(&[u8]) -> Result<StorageVisitorControl> + Send),
) -> Result<StorageScanOutcome> {
let values = self.values.read().map_err(|_| {
BedrockWorldError::ConcurrentWrite("memory storage poisoned".to_string())
})?;
let mut outcome = StorageScanOutcome::empty();
for (key, value) in values
.range(prefix.to_vec()..)
.take_while(|(key, _)| key.starts_with(prefix))
{
check_cancelled(&options)?;
outcome.record(value.len());
if visitor(key)? == StorageVisitorControl::Stop {
outcome.stopped = true;
return Ok(outcome);
}
emit_progress(&options, outcome);
}
Ok(outcome)
}
fn write_batch(&self, batch: &StorageBatch) -> Result<()> {
let mut values = self.values.write().map_err(|_| {
BedrockWorldError::ConcurrentWrite("memory storage poisoned".to_string())
})?;
for op in batch.ops() {
match op {
StorageOp::Put { key, value } => {
values.insert(key.to_vec(), value.clone());
}
StorageOp::Delete { key } => {
values.remove(key.as_ref());
}
}
}
Ok(())
}
fn flush(&self) -> Result<()> {
Ok(())
}
}
/// Byte length of the raw terrain payload in a Pocket `chunks.dat` record
/// (shorter than `LEGACY_TERRAIN_VALUE_LEN` by the 256 x 4-byte biome table).
pub const POCKET_CHUNKS_DAT_TERRAIN_VALUE_LEN: usize = 82_176;
/// 32 x 32 location table at the start of chunks.dat, 4 bytes per entry.
const POCKET_CHUNKS_DAT_LOCATION_TABLE_LEN: usize = 4 * 32 * 32;
/// chunks.dat addresses data in 4 KiB sectors.
const POCKET_CHUNKS_DAT_SECTOR_BYTES: usize = 4096;
/// 4-byte sample repeated 256 times to synthesize the legacy biome table
/// when converting pocket terrain (see `convert_pocket_terrain_to_legacy`).
const DEFAULT_LEGACY_BIOME_SAMPLE: [u8; 4] = [1, 0x7f, 0xb2, 0x38];
/// Read-only [`WorldStorage`] over a Pocket Edition `chunks.dat` file.
/// The whole file is parsed up-front into virtual legacy-terrain records.
#[derive(Debug, Clone)]
pub struct PocketChunksDatStorage {
    // Immutable after open(); shared cheaply between clones.
    values: Arc<BTreeMap<Vec<u8>, Bytes>>,
    // Chunk-space origin taken from level.dat's LimitedWorldOrigin{X,Z}.
    origin_chunk_x: i32,
    origin_chunk_z: i32,
}
impl PocketChunksDatStorage {
    /// Opens `chunks.dat` inside `world_path` and eagerly parses every chunk
    /// record into an in-memory map keyed by encoded [`ChunkKey`]s.
    ///
    /// # Errors
    /// Fails when `chunks.dat` cannot be read or its location table is
    /// truncated.
    pub fn open(world_path: impl AsRef<Path>) -> Result<Self> {
        let world_path = world_path.as_ref();
        let chunks_path = world_path.join("chunks.dat");
        let bytes = fs::read(&chunks_path)?;
        let (origin_chunk_x, origin_chunk_z) = read_limited_world_origin(world_path);
        let values = parse_pocket_chunks_dat(&bytes, origin_chunk_x, origin_chunk_z)?;
        // Read entities.dat directly instead of probing with `is_file()`
        // first: the probe-then-read pattern stats the file twice and races
        // with concurrent deletion (TOCTOU). A missing file is just skipped.
        match fs::read(world_path.join("entities.dat")) {
            Ok(bytes) => log::debug!(
                "legacy entities.dat present (bytes={}, parser=best_effort_skip)",
                bytes.len()
            ),
            Err(error) if error.kind() == std::io::ErrorKind::NotFound => {}
            Err(error) => log::warn!("failed to read legacy entities.dat: {error}"),
        }
        log::debug!(
            "opened Pocket chunks.dat storage (chunks={}, origin=({}, {}), path={})",
            values.len(),
            origin_chunk_x,
            origin_chunk_z,
            chunks_path.display()
        );
        Ok(Self {
            values: Arc::new(values),
            origin_chunk_x,
            origin_chunk_z,
        })
    }
    /// Chunk-space X origin read from level.dat (`LimitedWorldOriginX`), or 0.
    #[must_use]
    pub const fn origin_chunk_x(&self) -> i32 {
        self.origin_chunk_x
    }
    /// Chunk-space Z origin read from level.dat (`LimitedWorldOriginZ`), or 0.
    #[must_use]
    pub const fn origin_chunk_z(&self) -> i32 {
        self.origin_chunk_z
    }
}
impl WorldStorage for PocketChunksDatStorage {
    fn get(&self, key: &[u8]) -> Result<Option<Bytes>> {
        Ok(self.values.get(key).cloned())
    }
    fn get_many(&self, keys: &[Bytes]) -> Result<Vec<Option<Bytes>>> {
        Ok(keys
            .iter()
            .map(|key| self.values.get(key.as_ref()).cloned())
            .collect())
    }
    /// This backend is read-only; all mutation paths fail.
    fn put(&self, _key: &[u8], _value: &[u8]) -> Result<()> {
        Err(pocket_chunks_dat_read_only_error())
    }
    fn delete(&self, _key: &[u8]) -> Result<()> {
        Err(pocket_chunks_dat_read_only_error())
    }
    fn for_each_key(
        &self,
        options: StorageReadOptions,
        visitor: &mut (dyn FnMut(&[u8]) -> Result<StorageVisitorControl> + Send),
    ) -> Result<StorageScanOutcome> {
        let mut outcome = StorageScanOutcome::empty();
        for (key, value) in self.values.iter() {
            check_cancelled(&options)?;
            outcome.record(value.len());
            if visitor(key)? == StorageVisitorControl::Stop {
                outcome.stopped = true;
                return Ok(outcome);
            }
            emit_progress(&options, outcome);
        }
        Ok(outcome)
    }
    fn for_each_prefix(
        &self,
        prefix: &[u8],
        options: StorageReadOptions,
        visitor: &mut (dyn FnMut(&[u8], &Bytes) -> Result<StorageVisitorControl> + Send),
    ) -> Result<StorageScanOutcome> {
        let mut outcome = StorageScanOutcome::empty();
        // Borrow the prefix as the range bound (Vec<u8>: Borrow<[u8]>)
        // instead of allocating a Vec with `prefix.to_vec()..` per scan.
        for (key, value) in self
            .values
            .range::<[u8], _>((
                std::ops::Bound::Included(prefix),
                std::ops::Bound::Unbounded,
            ))
            .take_while(|(key, _)| key.starts_with(prefix))
        {
            check_cancelled(&options)?;
            outcome.record(value.len());
            if visitor(key, value)? == StorageVisitorControl::Stop {
                outcome.stopped = true;
                return Ok(outcome);
            }
            emit_progress(&options, outcome);
        }
        Ok(outcome)
    }
    fn for_each_prefix_key(
        &self,
        prefix: &[u8],
        options: StorageReadOptions,
        visitor: &mut (dyn FnMut(&[u8]) -> Result<StorageVisitorControl> + Send),
    ) -> Result<StorageScanOutcome> {
        let mut outcome = StorageScanOutcome::empty();
        // Allocation-free range bound, same as for_each_prefix above.
        for (key, value) in self
            .values
            .range::<[u8], _>((
                std::ops::Bound::Included(prefix),
                std::ops::Bound::Unbounded,
            ))
            .take_while(|(key, _)| key.starts_with(prefix))
        {
            check_cancelled(&options)?;
            outcome.record(value.len());
            if visitor(key)? == StorageVisitorControl::Stop {
                outcome.stopped = true;
                return Ok(outcome);
            }
            emit_progress(&options, outcome);
        }
        Ok(outcome)
    }
    fn write_batch(&self, _batch: &StorageBatch) -> Result<()> {
        Err(pocket_chunks_dat_read_only_error())
    }
    /// Nothing to persist for a read-only backend.
    fn flush(&self) -> Result<()> {
        Ok(())
    }
}
/// Parses a Pocket `chunks.dat` blob into a map from encoded legacy-terrain
/// [`ChunkKey`]s to legacy-format terrain values.
///
/// Layout: a 32 x 32 location table of 4-byte entries
/// (`[sector_count, offset_lo, offset_mid, offset_hi]`) followed by
/// sector-aligned chunk payloads. Invalid entries are logged and skipped
/// rather than failing the whole file.
///
/// # Errors
/// Returns `CorruptWorld` when the blob is smaller than the location table.
fn parse_pocket_chunks_dat(
    bytes: &[u8],
    origin_chunk_x: i32,
    origin_chunk_z: i32,
) -> Result<BTreeMap<Vec<u8>, Bytes>> {
    if bytes.len() < POCKET_CHUNKS_DAT_LOCATION_TABLE_LEN {
        return Err(BedrockWorldError::CorruptWorld(format!(
            "chunks.dat is too small for its location table: {} bytes",
            bytes.len()
        )));
    }
    let mut values = BTreeMap::new();
    for index in 0..(32 * 32) {
        let entry_offset = index * 4;
        let entry = &bytes[entry_offset..entry_offset + 4];
        // An all-zero entry means the chunk slot is unused.
        if entry == [0, 0, 0, 0] {
            continue;
        }
        let sector_count = usize::from(entry[0]);
        // 24-bit little-endian sector offset.
        let sector_offset =
            usize::from(entry[1]) | (usize::from(entry[2]) << 8) | (usize::from(entry[3]) << 16);
        if sector_count == 0 || sector_offset == 0 {
            continue;
        }
        let Some(byte_offset) = sector_offset.checked_mul(POCKET_CHUNKS_DAT_SECTOR_BYTES) else {
            continue;
        };
        let Some(payload) = pocket_chunk_payload(bytes, byte_offset, sector_count) else {
            log::warn!(
                "skipping invalid chunks.dat entry (index={index}, sector_offset={sector_offset}, sector_count={sector_count})"
            );
            continue;
        };
        // The table is row-major: x varies fastest, z per row of 32.
        let local_x = i32::try_from(index % 32).unwrap_or(0);
        let local_z = i32::try_from(index / 32).unwrap_or(0);
        let pos = ChunkPos {
            x: origin_chunk_x.saturating_add(local_x),
            z: origin_chunk_z.saturating_add(local_z),
            dimension: Dimension::Overworld,
        };
        values.insert(
            ChunkKey::new(pos, ChunkRecordTag::LegacyTerrain)
                .encode()
                .to_vec(),
            convert_pocket_terrain_to_legacy(payload),
        );
    }
    Ok(values)
}
/// Extracts the terrain payload for one location-table entry, or `None`
/// when the entry points outside the file or the data is too short.
///
/// Payloads normally start with a 4-byte little-endian length header; a
/// headerless fallback accepts raw pocket-length data.
fn pocket_chunk_payload(bytes: &[u8], byte_offset: usize, sector_count: usize) -> Option<&[u8]> {
    let sector_bytes = sector_count.checked_mul(POCKET_CHUNKS_DAT_SECTOR_BYTES)?;
    // Clamp to the file end: trailing sectors may be truncated.
    let max_end = byte_offset.checked_add(sector_bytes)?.min(bytes.len());
    if byte_offset >= bytes.len() || byte_offset >= max_end {
        return None;
    }
    let available = &bytes[byte_offset..max_end];
    if available.len() >= 4 {
        let declared_len = u32::from_le_bytes(available[0..4].try_into().ok()?) as usize;
        // Accept either a pocket-length or a full legacy-length payload and
        // return exactly the declared bytes. Previously the legacy branch
        // verified the full legacy payload was present but sliced only the
        // pocket-length portion, discarding the real biome table that
        // `convert_pocket_terrain_to_legacy` is written to pass through.
        if (declared_len == POCKET_CHUNKS_DAT_TERRAIN_VALUE_LEN
            || declared_len == LEGACY_TERRAIN_VALUE_LEN)
            && available.len() >= 4 + declared_len
        {
            return Some(&available[4..4 + declared_len]);
        }
    }
    // Headerless fallback: treat the sector data as a raw pocket payload.
    if available.len() >= POCKET_CHUNKS_DAT_TERRAIN_VALUE_LEN {
        return Some(&available[..POCKET_CHUNKS_DAT_TERRAIN_VALUE_LEN]);
    }
    None
}
/// Normalizes a chunk payload to the legacy terrain layout: a payload that
/// is already legacy-length is copied verbatim; a pocket-length payload is
/// padded with a synthesized 256-entry biome table.
fn convert_pocket_terrain_to_legacy(payload: &[u8]) -> Bytes {
    if payload.len() == LEGACY_TERRAIN_VALUE_LEN {
        return Bytes::copy_from_slice(payload);
    }
    let mut legacy = Vec::with_capacity(LEGACY_TERRAIN_VALUE_LEN);
    legacy.extend_from_slice(&payload[..POCKET_CHUNKS_DAT_TERRAIN_VALUE_LEN]);
    // Append 256 copies of the default 4-byte sample (1024 bytes total).
    legacy.extend(
        std::iter::repeat(DEFAULT_LEGACY_BIOME_SAMPLE)
            .take(256)
            .flatten(),
    );
    Bytes::from(legacy)
}
/// Reads `LimitedWorldOriginX`/`LimitedWorldOriginZ` from the world's
/// level.dat, falling back to (0, 0) when the file or tags are missing.
/// NOTE(review): the values are used directly as chunk coordinates —
/// confirm the stored tags are chunk-space rather than block-space.
fn read_limited_world_origin(world_path: &Path) -> (i32, i32) {
    let Ok(document) = read_level_dat_document(&world_path.join("level.dat")) else {
        return (0, 0);
    };
    let NbtTag::Compound(root) = document.root else {
        return (0, 0);
    };
    (
        nbt_i32(root.get("LimitedWorldOriginX")).unwrap_or(0),
        nbt_i32(root.get("LimitedWorldOriginZ")).unwrap_or(0),
    )
}
/// Coerces an integral NBT tag to `i32`. Byte/Short widen, Int passes
/// through, and Long converts only when it fits; anything else is `None`.
fn nbt_i32(tag: Option<&NbtTag>) -> Option<i32> {
    let tag = tag?;
    match tag {
        NbtTag::Byte(value) => Some(i32::from(*value)),
        NbtTag::Short(value) => Some(i32::from(*value)),
        NbtTag::Int(value) => Some(*value),
        NbtTag::Long(value) => i32::try_from(*value).ok(),
        _ => None,
    }
}
/// Error returned by every mutating operation on the chunks.dat backend.
fn pocket_chunks_dat_read_only_error() -> BedrockWorldError {
    let message = "Pocket chunks.dat storage is read-only".to_string();
    BedrockWorldError::UnsupportedChunkFormat(message)
}
/// Concrete storage backends. [`backend::BedrockLevelDbStorage`] wraps the
/// `bedrock_leveldb` crate when the `backend-bedrock-leveldb` feature is
/// enabled; otherwise a stub with the same API returns errors, so callers
/// compile either way.
pub mod backend {
    use super::*;
    /// LevelDB-backed [`WorldStorage`]; cloning shares one database handle.
    #[cfg(feature = "backend-bedrock-leveldb")]
    #[derive(Clone)]
    pub struct BedrockLevelDbStorage {
        db: Arc<bedrock_leveldb::Db>,
    }
    #[cfg(feature = "backend-bedrock-leveldb")]
    impl BedrockLevelDbStorage {
        /// Opens an existing database for reading and writing.
        pub fn open(path: impl AsRef<Path>) -> Result<Self> {
            Self::open_inner(path, false)
        }
        /// Opens an existing database for reading only.
        pub fn open_read_only(path: impl AsRef<Path>) -> Result<Self> {
            Self::open_inner(path, true)
        }
        // Shared open path; never creates a missing database.
        fn open_inner(path: impl AsRef<Path>, read_only: bool) -> Result<Self> {
            let path = path.as_ref().to_path_buf();
            // Check up-front so a missing path yields a clear NotFound error
            // rather than whatever the LevelDB layer would report.
            if !path.exists() {
                return Err(BedrockWorldError::Io(std::io::Error::new(
                    std::io::ErrorKind::NotFound,
                    format!("LevelDB path not found: {}", path.display()),
                )));
            }
            let options = bedrock_leveldb::OpenOptions {
                read_only,
                create_if_missing: false,
                error_if_exists: false,
                paranoid_checks: true,
                // Bedrock worlds use zlib-compressed blocks.
                compression_policy: bedrock_leveldb::CompressionPolicy::Zlib,
                cache_size: 64 * 1024 * 1024,
                write_buffer_size: 4 * 1024 * 1024,
            };
            let db = bedrock_leveldb::Db::open(path, options).map_err(map_leveldb_error)?;
            Ok(Self { db: Arc::new(db) })
        }
    }
    #[cfg(feature = "backend-bedrock-leveldb")]
    impl WorldStorage for BedrockLevelDbStorage {
        fn get(&self, key: &[u8]) -> Result<Option<Bytes>> {
            self.db.get(key).map_err(map_leveldb_error)
        }
        fn get_many(&self, keys: &[Bytes]) -> Result<Vec<Option<Bytes>>> {
            self.db
                .get_many_owned(
                    keys.iter().cloned(),
                    bedrock_leveldb::ReadOptions::default(),
                )
                .map_err(map_leveldb_error)
        }
        fn get_many_ordered_with_control(
            &self,
            keys: &[Bytes],
            options: StorageReadOptions,
        ) -> Result<Vec<Option<Bytes>>> {
            check_cancelled(&options)?;
            self.db
                .get_many_owned(keys.iter().cloned(), to_leveldb_read_options(options))
                .map_err(map_leveldb_error)
        }
        fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
            self.db
                .put(
                    Bytes::copy_from_slice(key),
                    Bytes::copy_from_slice(value),
                    bedrock_leveldb::WriteOptions::default(),
                )
                .map_err(map_leveldb_error)
        }
        fn delete(&self, key: &[u8]) -> Result<()> {
            self.db
                .delete(
                    Bytes::copy_from_slice(key),
                    bedrock_leveldb::WriteOptions::default(),
                )
                .map_err(map_leveldb_error)
        }
        fn for_each_key(
            &self,
            options: StorageReadOptions,
            visitor: &mut (dyn FnMut(&[u8]) -> Result<StorageVisitorControl> + Send),
        ) -> Result<StorageScanOutcome> {
            let read_options = to_leveldb_read_options(options);
            // The LevelDB visitor cannot return our error type, so a visitor
            // error is stashed here and the scan is stopped cleanly; the
            // stashed error then takes precedence over the scan result.
            let mut visitor_error = None;
            let scan_result = self
                .db
                .for_each_key(read_options, |key| match visitor(key) {
                    Ok(StorageVisitorControl::Continue) => {
                        Ok(bedrock_leveldb::VisitorControl::Continue)
                    }
                    Ok(StorageVisitorControl::Stop) => Ok(bedrock_leveldb::VisitorControl::Stop),
                    Err(error) => {
                        visitor_error = Some(error);
                        Ok(bedrock_leveldb::VisitorControl::Stop)
                    }
                });
            match (scan_result, visitor_error) {
                (_, Some(error)) => Err(error),
                (Ok(outcome), None) => Ok(to_storage_outcome(outcome)),
                (Err(error), None) => Err(map_leveldb_error(error)),
            }
        }
        fn for_each_prefix(
            &self,
            prefix: &[u8],
            options: StorageReadOptions,
            visitor: &mut (dyn FnMut(&[u8], &Bytes) -> Result<StorageVisitorControl> + Send),
        ) -> Result<StorageScanOutcome> {
            let read_options = to_leveldb_read_options(options);
            // Same error-stashing pattern as for_each_key above.
            let mut visitor_error = None;
            let scan_result = self.db.for_each_prefix(prefix, read_options, |key, value| {
                match visitor(key, value) {
                    Ok(StorageVisitorControl::Continue) => {
                        Ok(bedrock_leveldb::VisitorControl::Continue)
                    }
                    Ok(StorageVisitorControl::Stop) => Ok(bedrock_leveldb::VisitorControl::Stop),
                    Err(error) => {
                        visitor_error = Some(error);
                        Ok(bedrock_leveldb::VisitorControl::Stop)
                    }
                }
            });
            match (scan_result, visitor_error) {
                (_, Some(error)) => Err(error),
                (Ok(outcome), None) => Ok(to_storage_outcome(outcome)),
                (Err(error), None) => Err(map_leveldb_error(error)),
            }
        }
        fn for_each_prefix_ref(
            &self,
            prefix: &[u8],
            options: StorageReadOptions,
            visitor: &mut (dyn FnMut(StorageEntryRef<'_>) -> Result<StorageVisitorControl> + Send),
        ) -> Result<StorageScanOutcome> {
            let mut read_options = to_leveldb_read_options(options);
            // Borrowed reads avoid materializing owned Bytes per entry.
            read_options.read_strategy = bedrock_leveldb::ReadStrategy::Borrowed;
            let mut visitor_error = None;
            let scan_result = self.db.for_each_prefix_ref(prefix, read_options, |entry| {
                match visitor(StorageEntryRef {
                    key: entry.key.as_bytes(),
                    value: entry.value.as_bytes(),
                }) {
                    Ok(StorageVisitorControl::Continue) => {
                        Ok(bedrock_leveldb::VisitorControl::Continue)
                    }
                    Ok(StorageVisitorControl::Stop) => Ok(bedrock_leveldb::VisitorControl::Stop),
                    Err(error) => {
                        visitor_error = Some(error);
                        Ok(bedrock_leveldb::VisitorControl::Stop)
                    }
                }
            });
            match (scan_result, visitor_error) {
                (_, Some(error)) => Err(error),
                (Ok(outcome), None) => Ok(to_storage_outcome(outcome)),
                (Err(error), None) => Err(map_leveldb_error(error)),
            }
        }
        fn for_each_prefix_key(
            &self,
            prefix: &[u8],
            options: StorageReadOptions,
            visitor: &mut (dyn FnMut(&[u8]) -> Result<StorageVisitorControl> + Send),
        ) -> Result<StorageScanOutcome> {
            let read_options = to_leveldb_read_options(options);
            let mut visitor_error = None;
            let scan_result =
                self.db
                    .for_each_prefix_key(prefix, read_options, |key| match visitor(key) {
                        Ok(StorageVisitorControl::Continue) => {
                            Ok(bedrock_leveldb::VisitorControl::Continue)
                        }
                        Ok(StorageVisitorControl::Stop) => {
                            Ok(bedrock_leveldb::VisitorControl::Stop)
                        }
                        Err(error) => {
                            visitor_error = Some(error);
                            Ok(bedrock_leveldb::VisitorControl::Stop)
                        }
                    });
            match (scan_result, visitor_error) {
                (_, Some(error)) => Err(error),
                (Ok(outcome), None) => Ok(to_storage_outcome(outcome)),
                (Err(error), None) => Err(map_leveldb_error(error)),
            }
        }
        fn write_batch(&self, batch: &StorageBatch) -> Result<()> {
            // Translate op-for-op into the LevelDB batch type.
            let mut db_batch = bedrock_leveldb::WriteBatch::new();
            for op in batch.ops() {
                match op {
                    StorageOp::Put { key, value } => db_batch.put(key.clone(), value.clone()),
                    StorageOp::Delete { key } => db_batch.delete(key.clone()),
                }
            }
            self.db
                .write(db_batch, bedrock_leveldb::WriteOptions::default())
                .map_err(map_leveldb_error)
        }
        fn flush(&self) -> Result<()> {
            self.db.flush().map_err(map_leveldb_error)
        }
    }
    /// Maps LevelDB errors onto this crate's error type, preserving the
    /// cancellation and read-only cases as dedicated variants.
    #[cfg(feature = "backend-bedrock-leveldb")]
    fn map_leveldb_error(error: bedrock_leveldb::LevelDbError) -> BedrockWorldError {
        match error.kind() {
            bedrock_leveldb::ErrorKind::Cancelled => BedrockWorldError::Cancelled {
                operation: "LevelDB scan",
            },
            bedrock_leveldb::ErrorKind::ReadOnly => BedrockWorldError::ReadOnly,
            _ => BedrockWorldError::LevelDb(error.to_string()),
        }
    }
    /// Translates [`StorageReadOptions`] into the LevelDB crate's options,
    /// bridging the cancel flag and progress sink across the API boundary.
    #[cfg(feature = "backend-bedrock-leveldb")]
    fn to_leveldb_read_options(options: StorageReadOptions) -> bedrock_leveldb::ReadOptions {
        bedrock_leveldb::ReadOptions {
            checksum: bedrock_leveldb::ChecksumMode::Inherit,
            cache_policy: bedrock_leveldb::CachePolicy::Bypass,
            read_strategy: bedrock_leveldb::ReadStrategy::Shared,
            threading: match options.threading {
                StorageThreadingOptions::Auto => bedrock_leveldb::ThreadingOptions::Auto,
                StorageThreadingOptions::Fixed(threads) => {
                    bedrock_leveldb::ThreadingOptions::Fixed(threads)
                }
                StorageThreadingOptions::Single => bedrock_leveldb::ThreadingOptions::Single,
            },
            scan_mode: match options.scan_mode {
                StorageScanMode::Sequential => bedrock_leveldb::ScanMode::Sequential,
                StorageScanMode::ParallelTables => bedrock_leveldb::ScanMode::ParallelTables,
            },
            pipeline: bedrock_leveldb::ScanPipelineOptions {
                queue_depth: options.pipeline.queue_depth,
                table_batch_size: options.pipeline.table_batch_size,
                progress_interval: options.pipeline.progress_interval,
            },
            // Share the same AtomicBool so cancelling our flag cancels the scan.
            cancel: options
                .cancel
                .map(|cancel| bedrock_leveldb::ScanCancelFlag::from_shared(cancel.0)),
            progress: options.progress.map(|progress| {
                bedrock_leveldb::ScanProgressSink::new(move |db_progress| {
                    progress.emit(StorageScanProgress {
                        entries_seen: db_progress.visited,
                        bytes_read: db_progress.bytes_read,
                    });
                })
            }),
        }
    }
    /// Field-for-field conversion of a LevelDB scan outcome.
    #[cfg(feature = "backend-bedrock-leveldb")]
    const fn to_storage_outcome(outcome: bedrock_leveldb::ScanOutcome) -> StorageScanOutcome {
        StorageScanOutcome {
            visited: outcome.visited,
            bytes_read: outcome.bytes_read,
            stopped: outcome.stopped,
            tables_scanned: outcome.tables_scanned,
            worker_threads: outcome.worker_threads,
            queue_wait_ms: outcome.queue_wait_ms,
            cancel_checks: outcome.cancel_checks,
        }
    }
    /// Stub used when the feature is disabled; every method errors.
    #[cfg(not(feature = "backend-bedrock-leveldb"))]
    #[derive(Debug, Clone, Copy)]
    pub struct BedrockLevelDbStorage;
    #[cfg(not(feature = "backend-bedrock-leveldb"))]
    impl BedrockLevelDbStorage {
        pub fn open(_path: impl AsRef<Path>) -> Result<Self> {
            Err(BedrockWorldError::LevelDb(
                "backend-bedrock-leveldb feature is disabled".to_string(),
            ))
        }
        pub fn open_read_only(_path: impl AsRef<Path>) -> Result<Self> {
            Err(BedrockWorldError::LevelDb(
                "backend-bedrock-leveldb feature is disabled".to_string(),
            ))
        }
    }
    #[cfg(not(feature = "backend-bedrock-leveldb"))]
    impl WorldStorage for BedrockLevelDbStorage {
        fn get(&self, _key: &[u8]) -> Result<Option<Bytes>> {
            Err(BedrockWorldError::LevelDb(
                "backend-bedrock-leveldb feature is disabled".to_string(),
            ))
        }
        fn get_many(&self, _keys: &[Bytes]) -> Result<Vec<Option<Bytes>>> {
            Err(BedrockWorldError::LevelDb(
                "backend-bedrock-leveldb feature is disabled".to_string(),
            ))
        }
        fn put(&self, _key: &[u8], _value: &[u8]) -> Result<()> {
            Err(BedrockWorldError::LevelDb(
                "backend-bedrock-leveldb feature is disabled".to_string(),
            ))
        }
        fn delete(&self, _key: &[u8]) -> Result<()> {
            Err(BedrockWorldError::LevelDb(
                "backend-bedrock-leveldb feature is disabled".to_string(),
            ))
        }
        fn for_each_key(
            &self,
            _options: StorageReadOptions,
            _visitor: &mut (dyn FnMut(&[u8]) -> Result<StorageVisitorControl> + Send),
        ) -> Result<StorageScanOutcome> {
            Err(BedrockWorldError::LevelDb(
                "backend-bedrock-leveldb feature is disabled".to_string(),
            ))
        }
        fn for_each_prefix(
            &self,
            _prefix: &[u8],
            _options: StorageReadOptions,
            _visitor: &mut (dyn FnMut(&[u8], &Bytes) -> Result<StorageVisitorControl> + Send),
        ) -> Result<StorageScanOutcome> {
            Err(BedrockWorldError::LevelDb(
                "backend-bedrock-leveldb feature is disabled".to_string(),
            ))
        }
        fn write_batch(&self, _batch: &StorageBatch) -> Result<()> {
            Err(BedrockWorldError::LevelDb(
                "backend-bedrock-leveldb feature is disabled".to_string(),
            ))
        }
        fn flush(&self) -> Result<()> {
            Err(BedrockWorldError::LevelDb(
                "backend-bedrock-leveldb feature is disabled".to_string(),
            ))
        }
    }
}
/// Fails with `Cancelled` when the options carry a tripped cancel flag.
fn check_cancelled(options: &StorageReadOptions) -> Result<()> {
    match &options.cancel {
        Some(flag) if flag.is_cancelled() => Err(BedrockWorldError::Cancelled {
            operation: "storage scan",
        }),
        _ => Ok(()),
    }
}
/// Forwards a progress snapshot to the configured sink, if any.
fn emit_progress(options: &StorageReadOptions, outcome: StorageScanOutcome) {
    let Some(progress) = options.progress.as_ref() else {
        return;
    };
    progress.emit(StorageScanProgress {
        entries_seen: outcome.visited,
        bytes_read: outcome.bytes_read,
    });
}
#[cfg(test)]
mod tests {
    use super::*;
    #[cfg(feature = "backend-bedrock-leveldb")]
    use std::time::{SystemTime, UNIX_EPOCH};
    // Prefix scan must visit only "abc*" keys, in key order.
    #[test]
    fn memory_storage_scans_prefix_without_copying_values() {
        let storage = MemoryStorage::new();
        storage.put(b"abc1", b"one").expect("put");
        storage.put(b"abc2", b"two").expect("put");
        storage.put(b"abd", b"three").expect("put");
        let mut entries = Vec::new();
        storage
            .for_each_prefix(b"abc", StorageReadOptions::default(), &mut |key, value| {
                entries.push(StorageEntry {
                    key: Bytes::copy_from_slice(key),
                    value: value.clone(),
                });
                Ok(StorageVisitorControl::Continue)
            })
            .expect("scan");
        assert_eq!(entries.len(), 2);
        assert_eq!(entries[0].key, Bytes::from_static(b"abc1"));
        assert_eq!(entries[1].value, Bytes::from_static(b"two"));
    }
    // End-to-end: write, flush, reopen, read back, and prefix-scan.
    #[cfg(feature = "backend-bedrock-leveldb")]
    #[test]
    fn bedrock_leveldb_storage_roundtrips_raw_records() {
        // Unique temp dir per run (nanosecond timestamp).
        let path = std::env::temp_dir().join(format!(
            "bedrock-world-storage-{}",
            SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .expect("time")
                .as_nanos()
        ));
        std::fs::create_dir_all(&path).expect("create");
        // Create the database files, then drop the handle so the storage
        // wrapper (which never creates) can open them.
        drop(
            bedrock_leveldb::Db::open(&path, bedrock_leveldb::OpenOptions::default())
                .expect("initialize"),
        );
        let storage = backend::BedrockLevelDbStorage::open(&path).expect("open");
        storage.put(b"player_1", b"one").expect("put");
        storage.put(b"player_2", b"two").expect("put");
        storage.flush().expect("flush");
        let reopened = backend::BedrockLevelDbStorage::open(&path).expect("reopen");
        assert_eq!(
            reopened.get(b"player_1").expect("get"),
            Some(Bytes::from_static(b"one"))
        );
        let mut player_count = 0;
        reopened
            .for_each_prefix(
                b"player_",
                StorageReadOptions::default(),
                &mut |_key, _value| {
                    player_count += 1;
                    Ok(StorageVisitorControl::Continue)
                },
            )
            .expect("scan");
        assert_eq!(player_count, 2);
        std::fs::remove_dir_all(path).expect("cleanup");
    }
    // Builds a minimal chunks.dat with one chunk record and checks that it
    // surfaces as a virtual legacy-terrain record under the expected key.
    #[test]
    fn pocket_chunks_dat_exposes_virtual_legacy_terrain_records() {
        let path = std::env::temp_dir().join(format!(
            "bedrock-world-pocket-chunks-{}",
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .expect("time")
                .as_nanos()
        ));
        std::fs::create_dir_all(&path).expect("create world dir");
        let mut terrain = vec![0_u8; POCKET_CHUNKS_DAT_TERRAIN_VALUE_LEN];
        // Known block id at (x=1, y=2, z=3) and a height-map sample.
        let block_index = (1_usize << 11) | (3_usize << 7) | 2_usize;
        let column_index = 3_usize * 16 + 1_usize;
        terrain[block_index] = 42;
        terrain[crate::LEGACY_TERRAIN_BLOCK_COUNT
            + (crate::LEGACY_TERRAIN_BLOCK_COUNT / 2) * 3
            + column_index] = 99;
        // Location table: slot 0 -> 21 sectors starting at sector 1.
        let mut chunks = vec![0_u8; POCKET_CHUNKS_DAT_SECTOR_BYTES];
        chunks[0] = 21;
        chunks[1] = 1;
        // Payload = 4-byte little-endian length header + terrain bytes.
        let mut payload = Vec::new();
        payload.extend_from_slice(&(POCKET_CHUNKS_DAT_TERRAIN_VALUE_LEN as u32).to_le_bytes());
        payload.extend_from_slice(&terrain);
        chunks.extend_from_slice(&payload);
        let padded_len = POCKET_CHUNKS_DAT_SECTOR_BYTES * 22;
        chunks.resize(padded_len, 0);
        std::fs::write(path.join("chunks.dat"), chunks).expect("write chunks.dat");
        let storage = PocketChunksDatStorage::open(&path).expect("open pocket chunks");
        let pos = ChunkPos {
            x: 0,
            z: 0,
            dimension: Dimension::Overworld,
        };
        let legacy_key = ChunkKey::new(pos, ChunkRecordTag::LegacyTerrain).encode();
        // Neighboring chunk (1, 0) was never written and must be absent.
        let missing_key = ChunkKey::new(
            ChunkPos {
                x: 1,
                z: 0,
                dimension: Dimension::Overworld,
            },
            ChunkRecordTag::LegacyTerrain,
        )
        .encode();
        let values = storage
            .get_many(&[missing_key.clone(), legacy_key.clone()])
            .expect("get many");
        assert!(values[0].is_none());
        let Some(value) = &values[1] else {
            panic!("legacy terrain should be present");
        };
        // Pocket payload is padded to the legacy length; the terrain bytes
        // are preserved verbatim at the front.
        assert_eq!(value.len(), LEGACY_TERRAIN_VALUE_LEN);
        assert_eq!(
            &value[..POCKET_CHUNKS_DAT_TERRAIN_VALUE_LEN],
            terrain.as_slice()
        );
        let terrain = crate::LegacyTerrain::parse(value.clone()).expect("legacy terrain");
        assert_eq!(terrain.block_id_at(1, 2, 3), Some(42));
        assert_eq!(terrain.height_at(1, 3), Some(99));
        let mut keys = Vec::new();
        storage
            .for_each_key(StorageReadOptions::default(), &mut |key| {
                keys.push(Bytes::copy_from_slice(key));
                Ok(StorageVisitorControl::Continue)
            })
            .expect("scan keys");
        assert_eq!(keys, vec![legacy_key]);
        // The backend is read-only.
        assert!(storage.put(b"x", b"y").is_err());
        std::fs::remove_dir_all(path).expect("cleanup");
    }
}