use serde::{Deserialize, Serialize};
use thiserror::Error;
/// Sync policy selectable through `Config::sync_mode`.
///
/// NOTE(review): this file only declares the variants and uses
/// `SyncMode::Periodic` as the `Config` default; what each mode actually does
/// is decided by the code that consumes the setting. The per-variant notes
/// below are inferred from the names — confirm against that code.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum SyncMode {
    /// Presumably: no explicit syncing is requested — TODO confirm.
    None,
    /// Presumably: syncing happens on a timer — TODO confirm. This is the
    /// value `Config::default()` uses.
    Periodic,
    /// Presumably: every write is synced — TODO confirm.
    Always,
    /// Presumably: only metadata is synced — TODO confirm.
    MetadataOnly,
}
/// Logical record address. `get_page`, `get_offset` and `make_address` treat
/// the low `OFFSET_BITS` bits as an in-page offset and the bits above them as
/// a page index.
pub type Address = u64;

/// Owned key bytes.
pub type Key = Vec<u8>;

/// Owned value bytes.
pub type Value = Vec<u8>;

/// Default page size in bytes: 32 MiB, i.e. exactly `1 << OFFSET_BITS`.
pub const PAGE_SIZE: u32 = 32u32 << 20;

/// Number of bits of an [`Address`] that are used in total.
pub const ADDRESS_BITS: u64 = 48;

/// Number of low address bits that hold the offset inside a page.
pub const OFFSET_BITS: u64 = 25;

/// Number of address bits that hold the page index (the remainder of
/// `ADDRESS_BITS` once the offset bits are taken out).
pub const PAGE_BITS: u64 = ADDRESS_BITS - OFFSET_BITS;

/// Largest offset representable in `OFFSET_BITS` bits.
pub const MAX_OFFSET: u32 = (1u32 << OFFSET_BITS) - 1;

/// Largest page index representable in `PAGE_BITS` bits.
pub const MAX_PAGE: u32 = (1u32 << PAGE_BITS) - 1;

/// Reserved address value.
/// NOTE(review): presumably a sentinel for "no valid record"; this file never
/// reads it (`RecordInfo::is_null` compares against 0) — confirm with callers.
pub const INVALID_ADDRESS: Address = 1;
/// Extracts the page index from `address`.
///
/// The address is shifted right by `OFFSET_BITS` and the result is masked to
/// `PAGE_BITS` bits, so any bits above the page field are discarded.
#[inline]
pub fn get_page(address: Address) -> u32 {
    let page_mask = (1u64 << PAGE_BITS) - 1;
    let page_index = (address >> OFFSET_BITS) & page_mask;
    // Lossless: the mask leaves at most PAGE_BITS (< 32) significant bits.
    page_index as u32
}
/// Extracts the in-page offset from `address`, i.e. its low `OFFSET_BITS`
/// bits.
#[inline]
pub fn get_offset(address: Address) -> u32 {
    let offset_mask = (1u64 << OFFSET_BITS) - 1;
    let in_page = address & offset_mask;
    // Lossless: the mask leaves at most OFFSET_BITS (< 32) significant bits.
    in_page as u32
}
/// Packs `page` and `offset` into a single [`Address`]: the page index is
/// placed above the low `OFFSET_BITS` bits and the offset is OR-ed in below.
///
/// Neither argument is range-checked. An `offset` wider than `OFFSET_BITS`
/// bits overlaps the page field, and a `page` wider than `PAGE_BITS` bits
/// produces an address that uses more than `ADDRESS_BITS` bits.
#[inline]
pub fn make_address(page: u32, offset: u32) -> Address {
    let page_field = u64::from(page) << OFFSET_BITS;
    let offset_field = u64::from(offset);
    page_field | offset_field
}
/// Error type carried by this module's `Result` alias.
///
/// The `#[error("...")]` attribute on each variant is its `Display` text.
/// Variants marked `#[from]` can be produced from their source error with `?`.
/// See `is_recoverable`, `is_corruption`, `is_user_error` and `category` for
/// coarse classification of a value.
#[derive(Error, Debug)]
pub enum RsKvError {
    /// Wraps a `std::io::Error`.
    #[error("IO Error: {0}")]
    Io(#[from] std::io::Error),
    /// Wraps a `bincode::Error` raised while encoding or decoding.
    #[error("Serialization Error: {0}")]
    Serialization(#[from] bincode::Error),
    /// A lookup found no entry for the key.
    #[error("Key not found")]
    KeyNotFound,
    /// `address` lies outside the range the reporting code accepts.
    #[error("Address out of bounds: {address}")]
    AddressOutOfBounds { address: Address },
    /// No page exists for page index `page`.
    #[error("Page not found: {page}")]
    PageNotFound { page: u32 },
    /// An allocation request of `size` could not be satisfied.
    /// NOTE(review): `size` is presumably in bytes — confirm at the call sites.
    #[error("Allocation failed: size {size}")]
    AllocationFailed { size: u32 },
    /// A checkpoint operation failed; `message` carries the details.
    #[error("Checkpoint operation failed: {message}")]
    CheckpointFailed { message: String },
    /// A recovery operation failed; `message` carries the details.
    #[error("Recovery operation failed: {message}")]
    RecoveryFailed { message: String },
    /// Garbage collection failed; `message` carries the details.
    #[error("Garbage collection failed: {message}")]
    GarbageCollectionFailed { message: String },
    /// Configuration problem. Not produced anywhere in this file; classified
    /// together with `InvalidConfig` by `category` and `is_user_error`.
    #[error("Configuration error: {message}")]
    Configuration { message: String },
    /// A configuration value was rejected; this is what `Config::validate`
    /// returns.
    #[error("Invalid configuration: {message}")]
    InvalidConfig { message: String },
    /// A key of `size` bytes exceeds the `max_size` byte limit.
    #[error("Key size {size} bytes exceeds maximum allowed size {max_size} bytes")]
    KeyTooLarge { size: usize, max_size: usize },
    /// A value of `size` bytes exceeds the `max_size` byte limit.
    #[error("Value size {size} bytes exceeds maximum allowed size {max_size} bytes")]
    ValueTooLarge { size: usize, max_size: usize },
    /// Failure reported for the storage device.
    #[error("Storage device error: {message}")]
    StorageError { message: String },
    /// Failure related to memory mapping.
    #[error("Memory mapping error: {message}")]
    MmapError { message: String },
    /// Stored data was found to be corrupt.
    #[error("Data corruption detected: {message}")]
    Corruption { message: String },
    /// The resource named by `resource` ran out.
    #[error("Resource exhausted: {resource}")]
    ResourceExhausted { resource: String },
    /// An operation gave up after `duration_ms` milliseconds.
    #[error("Operation timed out after {duration_ms} ms")]
    Timeout { duration_ms: u64 },
    /// A concurrent operation conflicted with this one.
    #[error("Concurrent operation conflict: {message}")]
    Conflict { message: String },
    /// Internal failure; also the target of the `TryFromIntError` conversion
    /// defined in this file.
    #[error("Internal error: {message}")]
    Internal { message: String },
}
impl RsKvError {
pub fn is_recoverable(&self) -> bool {
matches!(
self,
RsKvError::Io(_)
| RsKvError::Timeout { .. }
| RsKvError::Conflict { .. }
| RsKvError::ResourceExhausted { .. }
| RsKvError::StorageError { .. }
| RsKvError::MmapError { .. }
)
}
pub fn is_corruption(&self) -> bool {
matches!(self, RsKvError::Corruption { .. })
}
pub fn is_user_error(&self) -> bool {
matches!(
self,
RsKvError::KeyNotFound
| RsKvError::KeyTooLarge { .. }
| RsKvError::ValueTooLarge { .. }
| RsKvError::InvalidConfig { .. }
| RsKvError::Configuration { .. }
)
}
pub fn category(&self) -> &'static str {
match self {
RsKvError::Io(_) => "io",
RsKvError::Serialization(_) => "serialization",
RsKvError::AddressOutOfBounds { .. } => "addressing",
RsKvError::PageNotFound { .. } => "addressing",
RsKvError::AllocationFailed { .. } => "allocation",
RsKvError::KeyNotFound => "not_found",
RsKvError::KeyTooLarge { .. } | RsKvError::ValueTooLarge { .. } => "size_limit",
RsKvError::CheckpointFailed { .. } => "checkpoint",
RsKvError::RecoveryFailed { .. } => "recovery",
RsKvError::GarbageCollectionFailed { .. } => "garbage_collection",
RsKvError::Configuration { .. } | RsKvError::InvalidConfig { .. } => "configuration",
RsKvError::StorageError { .. } => "storage",
RsKvError::MmapError { .. } => "memory_mapping",
RsKvError::Corruption { .. } => "corruption",
RsKvError::ResourceExhausted { .. } => "resource_exhausted",
RsKvError::Timeout { .. } => "timeout",
RsKvError::Conflict { .. } => "conflict",
RsKvError::Internal { .. } => "internal",
}
}
}
impl From<std::num::TryFromIntError> for RsKvError {
fn from(err: std::num::TryFromIntError) -> Self {
RsKvError::Internal {
message: format!("Integer conversion error: {err}"),
}
}
}
/// Result alias whose error type is fixed to [`RsKvError`].
pub type Result<T> = std::result::Result<T, RsKvError>;
/// Metadata attached to a record.
///
/// NOTE(review): this file only stores these fields and tests the first two
/// in `is_null`; how they are interpreted is defined by the code that reads
/// records. The field notes below are inferred from the names — confirm there.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct RecordInfo {
    /// Presumably the address of the previous record in the same chain —
    /// TODO confirm. `is_null` treats 0 here as part of the "null" state.
    pub previous_address: Address,
    /// Presumably the checkpoint version the record was written under —
    /// TODO confirm. `is_null` treats 0 here as part of the "null" state.
    pub checkpoint_version: u16,
    /// Presumably marks the record as invalid — TODO confirm.
    pub invalid: bool,
    /// Presumably marks the record as a deletion marker — TODO confirm.
    pub tombstone: bool,
    /// NOTE(review): meaning not evident from this file — confirm with users.
    pub final_bit: bool,
}
impl RecordInfo {
pub fn new(
previous_address: Address,
checkpoint_version: u16,
final_bit: bool,
tombstone: bool,
invalid: bool,
) -> Self {
Self {
previous_address,
checkpoint_version,
invalid,
tombstone,
final_bit,
}
}
pub fn is_null(&self) -> bool {
self.previous_address == 0 && self.checkpoint_version == 0
}
}
/// Store configuration.
///
/// `Config::default()` supplies the defaults quoted below and
/// `Config::validate` enforces the limits quoted below. Fields without a
/// stated limit are not checked by `validate`.
///
/// NOTE(review): how each setting is consumed is outside this file; units
/// that are not spelled out in a field name are assumptions to confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Config {
    /// Memory budget; `validate` requires 1 MiB ..= 64 GiB, so presumably
    /// bytes. Default: 1 GiB.
    pub memory_size: u64,
    /// Page size; `validate` requires at least 4096, a power of two, and no
    /// larger than `memory_size`. Default: `PAGE_SIZE`.
    pub page_size: u32,
    /// Storage directory; `validate` rejects an empty string.
    /// Default: `"./rskv_data"`.
    pub storage_dir: String,
    /// Checkpointing switch. Default: `true`.
    pub enable_checkpointing: bool,
    /// Checkpoint interval in milliseconds; `validate` requires at least 100.
    /// Default: 5000.
    pub checkpoint_interval_ms: u64,
    /// Garbage-collection switch. Default: `true`.
    pub enable_gc: bool,
    /// GC interval in milliseconds; `validate` requires at least 1000.
    /// Default: 10000.
    pub gc_interval_ms: u64,
    /// Background thread limit; `validate` requires 1 ..= 32. Default: 4.
    pub max_background_threads: usize,
    /// Memory-mapping switch. Default: `true`.
    pub use_mmap: bool,
    /// Read-ahead switch. Default: `true`.
    pub enable_readahead: bool,
    /// Read-ahead size (presumably bytes — TODO confirm). Default: 1 MiB.
    pub readahead_size: usize,
    /// Write-batching switch. Default: `true`.
    pub enable_write_batching: bool,
    /// Write batch size (presumably bytes — TODO confirm). Default: 64 KiB.
    pub write_batch_size: usize,
    /// Compression switch. Default: `false`.
    pub enable_compression: bool,
    /// Sync policy. Default: `SyncMode::Periodic`.
    pub sync_mode: SyncMode,
    /// Log preallocation switch. Default: `true`.
    pub preallocate_log: bool,
    /// Log preallocation size (presumably bytes — TODO confirm).
    /// Default: 100 MiB.
    pub log_prealloc_size: u64,
}
impl Config {
    /// Checks this configuration against the supported limits.
    ///
    /// The rules are evaluated in a fixed order and only the first violated
    /// rule is reported. A checkpoint interval longer than the GC interval is
    /// not an error; it only emits a `log::warn!` message.
    ///
    /// # Errors
    ///
    /// Returns [`RsKvError::InvalidConfig`] describing the first violated
    /// rule.
    pub fn validate(&self) -> Result<()> {
        const MIB: u64 = 1024 * 1024;
        const GIB: u64 = 1024 * MIB;

        // (rule violated?, message) pairs, listed in reporting order. All
        // conditions are side-effect free, so evaluating them up front gives
        // the same outcome as checking them one at a time.
        let rules: [(bool, &str); 10] = [
            (self.memory_size < MIB, "Memory size must be at least 1MB"),
            (self.memory_size > 64 * GIB, "Memory size cannot exceed 64GB"),
            (self.page_size < 4096, "Page size must be at least 4KB"),
            (
                !self.page_size.is_power_of_two(),
                "Page size must be a power of 2",
            ),
            (
                u64::from(self.page_size) > self.memory_size,
                "Page size cannot be larger than memory size",
            ),
            (
                self.storage_dir.is_empty(),
                "Storage directory cannot be empty",
            ),
            (
                self.checkpoint_interval_ms < 100,
                "Checkpoint interval must be at least 100ms",
            ),
            (
                self.gc_interval_ms < 1000,
                "GC interval must be at least 1000ms",
            ),
            (
                self.max_background_threads == 0,
                "Maximum background threads must be at least 1",
            ),
            (
                self.max_background_threads > 32,
                "Maximum background threads cannot exceed 32",
            ),
        ];

        if let Some(&(_, message)) = rules.iter().find(|rule| rule.0) {
            return Err(RsKvError::InvalidConfig {
                message: message.to_string(),
            });
        }

        let (checkpoint_ms, gc_ms) = (self.checkpoint_interval_ms, self.gc_interval_ms);
        if checkpoint_ms > gc_ms {
            log::warn!(
                "Checkpoint interval ({} ms) is longer than GC interval ({} ms), this might cause \
                 performance issues",
                checkpoint_ms,
                gc_ms
            );
        }

        Ok(())
    }

    /// Builds a default configuration with the given `memory_size` and a
    /// page size chosen from it:
    ///
    /// | `memory_size`   | `page_size` |
    /// |-----------------|-------------|
    /// | >= 8 GiB        | 64 MiB      |
    /// | >= 1 GiB        | 32 MiB      |
    /// | >= 256 MiB      | 16 MiB      |
    /// | anything lower  | 8 MiB       |
    ///
    /// NOTE(review): the 64 MiB tier is larger than `1 << OFFSET_BITS`
    /// (32 MiB); confirm that in-page offsets can address such pages.
    ///
    /// # Errors
    ///
    /// Returns whatever [`Config::validate`] reports for the result, e.g.
    /// when `memory_size` is smaller than the 8 MiB page picked for it.
    pub fn with_memory_size(memory_size: u64) -> Result<Self> {
        const MIB: u64 = 1024 * 1024;
        const GIB: u64 = 1024 * MIB;

        let page_size: u32 = match memory_size {
            size if size >= 8 * GIB => 64 * 1024 * 1024,
            size if size >= GIB => 32 * 1024 * 1024,
            size if size >= 256 * MIB => 16 * 1024 * 1024,
            _ => 8 * 1024 * 1024,
        };

        let sized = Self {
            memory_size,
            page_size,
            ..Self::default()
        };
        sized.validate().map(|()| sized)
    }

    /// Preset: 4 GiB of memory, 64 MiB pages, checkpoints every 30 s, GC
    /// every 60 s, up to 8 background threads; everything else as in
    /// `Config::default()`.
    ///
    /// NOTE(review): the 64 MiB page size is larger than `1 << OFFSET_BITS`
    /// (32 MiB); confirm that in-page offsets can address such pages.
    ///
    /// # Errors
    ///
    /// Returns whatever [`Config::validate`] reports for the preset.
    pub fn high_performance() -> Result<Self> {
        let preset = Self {
            memory_size: 4 * 1024 * 1024 * 1024,
            page_size: 64 * 1024 * 1024,
            checkpoint_interval_ms: 30000,
            gc_interval_ms: 60000,
            max_background_threads: 8,
            ..Self::default()
        };
        preset.validate().map(|()| preset)
    }

    /// Preset: 64 MiB of memory, 4 MiB pages, checkpoints every 2 s, GC every
    /// 5 s, up to 2 background threads; everything else as in
    /// `Config::default()`.
    ///
    /// # Errors
    ///
    /// Returns whatever [`Config::validate`] reports for the preset.
    pub fn low_memory() -> Result<Self> {
        let preset = Self {
            memory_size: 64 * 1024 * 1024,
            page_size: 4 * 1024 * 1024,
            checkpoint_interval_ms: 2000,
            gc_interval_ms: 5000,
            max_background_threads: 2,
            ..Self::default()
        };
        preset.validate().map(|()| preset)
    }
}
impl Default for Config {
fn default() -> Self {
Self {
memory_size: 1024 * 1024 * 1024, page_size: PAGE_SIZE,
storage_dir: "./rskv_data".to_string(),
enable_checkpointing: true,
checkpoint_interval_ms: 5000, enable_gc: true,
gc_interval_ms: 10000, max_background_threads: 4,
use_mmap: true, enable_readahead: true,
readahead_size: 1024 * 1024, enable_write_batching: true,
write_batch_size: 64 * 1024, enable_compression: false, sync_mode: SyncMode::Periodic,
preallocate_log: true,
log_prealloc_size: 100 * 1024 * 1024, }
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// An address built from a page/offset pair decodes back to that pair.
    #[test]
    fn test_address_utilities() {
        let (page, offset) = (100, 1024);
        let packed = make_address(page, offset);

        assert_eq!(get_page(packed), page);
        assert_eq!(get_offset(packed), offset);
    }

    /// `RecordInfo::new` stores each argument in the matching field, and a
    /// record with non-zero address/version is not null.
    #[test]
    fn test_record_info() {
        let info = RecordInfo::new(42, 1, true, false, false);

        assert_eq!(info.previous_address, 42);
        assert_eq!(info.checkpoint_version, 1);
        assert!(info.final_bit);
        assert!(!info.tombstone);
        assert!(!info.invalid);
        assert!(!info.is_null());
    }

    /// A record with zero address and zero version is null.
    #[test]
    fn test_null_record_info() {
        let empty = RecordInfo::new(0, 0, false, false, false);

        assert!(empty.is_null());
    }
}