#![allow(dead_code)]
use std::borrow::Cow;
use std::fs::{self, File};
use std::io::{BufReader, IoSlice, Read, Write};
use std::path::{Path, PathBuf};
use std::time::{SystemTime, UNIX_EPOCH};

use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use uuid::Uuid;
use xxhash_rust::xxh3::xxh3_128;

use crate::storage::{Result, StorageError, validate_segment_length};
/// Magic bytes identifying a segment file; checked by `SegmentReader::open`.
const SEGMENT_MAGIC: &[u8; 8] = b"MINDSEG0";
/// On-disk format version written into, and validated against, the header.
const SEGMENT_VERSION: u32 = 1;
/// Page granularity (4 KiB) used for checksummed page descriptors and for
/// padding the payload in `SegmentWriter::finish`.
const PAGE_SIZE: usize = 1 << 12;
/// Compression applied to a segment's on-disk payload.
///
/// Serialized with serde adjacent tagging (`type` / `level`), so the JSON
/// representation carries the variant name in `type` and the zstd level (when
/// present) in `level`.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
#[serde(tag = "type", content = "level")]
pub enum CompressionCodec {
    /// Store the payload verbatim. This is the default codec.
    #[default]
    None,
    /// Compress with zstd at the given compression level.
    Zstd(i32),
}
impl CompressionCodec {
    /// Encodes `data` with this codec, always returning an owned buffer.
    ///
    /// # Errors
    /// Returns `StorageError::Compression` when the zstd encoder fails.
    fn compress(&self, data: &[u8]) -> Result<Vec<u8>> {
        match self {
            CompressionCodec::None => Ok(data.to_vec()),
            CompressionCodec::Zstd(level) => {
                zstd::stream::encode_all(std::io::Cursor::new(data), *level)
                    .map_err(|err| StorageError::Compression(err.to_string()))
            }
        }
    }

    /// Decodes `data` previously produced by [`CompressionCodec::compress`].
    ///
    /// # Errors
    /// Returns `StorageError::Compression` when the zstd decoder fails.
    fn decompress(&self, data: &[u8]) -> Result<Vec<u8>> {
        match self {
            CompressionCodec::None => Ok(data.to_vec()),
            CompressionCodec::Zstd(_) => {
                zstd::stream::decode_all(std::io::Cursor::new(data))
                    .map_err(|err| StorageError::Compression(err.to_string()))
            }
        }
    }
}
/// Location and integrity information for one page within a segment's
/// uncompressed payload.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PageDescriptor {
    /// Byte offset of the page from the start of the uncompressed payload.
    pub offset: u64,
    /// Page length in bytes; at most `PAGE_SIZE` (pages are not split across
    /// append calls, so a page can be shorter).
    pub length: u32,
    /// xxh3-128 checksum over the page's bytes.
    pub checksum: u128,
}
/// Descriptive and integrity metadata for a sealed segment, stored as JSON in
/// the segment file header.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SegmentMetadata {
    /// Random UUID (v4) assigned when the segment is finished.
    pub id: String,
    /// Path of the backing file; refreshed to the actually-opened path by
    /// `SegmentReader::open`.
    pub path: PathBuf,
    /// Size of the uncompressed, page-padded payload in bytes.
    pub size_bytes: u64,
    /// Size of the payload as written to disk (equal to `size_bytes` when
    /// compression is `None`).
    pub compressed_size_bytes: u64,
    /// Codec used to encode the on-disk payload.
    pub compression: CompressionCodec,
    /// Checksummed page descriptors covering the uncompressed payload.
    pub pages: Vec<PageDescriptor>,
    /// Lowercase hex SHA-256 digest of the uncompressed, padded payload.
    pub sha256: String,
    /// Creation time in milliseconds since the Unix epoch.
    pub created_at_ms: u128,
}
impl SegmentMetadata {
pub fn page_count(&self) -> usize {
self.pages.len()
}
}
/// Handle to a sealed, on-disk segment, identified by its metadata.
#[derive(Clone, Debug)]
pub struct FileSegment {
    metadata: SegmentMetadata,
}

impl FileSegment {
    /// Wraps previously-built metadata into a segment handle.
    pub fn new(metadata: SegmentMetadata) -> Self {
        FileSegment { metadata }
    }
}
impl crate::storage::Segment for FileSegment {
    /// Returns the segment's descriptive metadata.
    fn metadata(&self) -> &SegmentMetadata {
        &self.metadata
    }
    /// Opens the backing file; `SegmentReader::open` loads and verifies the
    /// whole payload eagerly.
    fn open_reader(&self) -> Result<SegmentReader> {
        SegmentReader::open(&self.metadata.path)
    }
}
/// Accumulates segment data in memory and persists it to disk when
/// `finish` is called.
pub struct SegmentWriter {
    /// Destination file path for the finished segment.
    path: PathBuf,
    /// Codec applied to the payload at finish time.
    codec: CompressionCodec,
    /// Uncompressed payload accumulated so far.
    buffer: Vec<u8>,
    /// Page descriptors covering `buffer`.
    pages: Vec<PageDescriptor>,
}
impl SegmentWriter {
pub fn new<P: Into<PathBuf>>(path: P, codec: CompressionCodec) -> Self {
Self {
path: path.into(),
codec,
buffer: Vec::new(),
pages: Vec::new(),
}
}
pub fn with_capacity<P: Into<PathBuf>>(path: P, codec: CompressionCodec, capacity: usize) -> Self {
Self {
path: path.into(),
codec,
buffer: Vec::with_capacity(capacity),
pages: Vec::new(),
}
}
pub fn append_block(&mut self, block: &[u8]) {
let start_offset = self.buffer.len() as u64;
self.buffer.extend_from_slice(block);
let mut local_offset = 0usize;
while local_offset < block.len() {
let remaining = block.len() - local_offset;
let len = remaining.min(PAGE_SIZE);
let slice = &block[local_offset..local_offset + len];
let checksum = xxh3_128(slice);
self.pages.push(PageDescriptor {
offset: start_offset + local_offset as u64,
length: len as u32,
checksum,
});
local_offset += len;
}
}
pub fn append_entry(&mut self, key: &[u8], value: &[u8], sequence: u64, tombstone: bool) {
let start_offset = self.buffer.len() as u64;
self.buffer.extend_from_slice(&(key.len() as u32).to_le_bytes());
self.buffer.extend_from_slice(&(value.len() as u32).to_le_bytes());
self.buffer.extend_from_slice(&sequence.to_le_bytes());
self.buffer.push(if tombstone { 1 } else { 0 });
self.buffer.extend_from_slice(key);
self.buffer.extend_from_slice(value);
let appended_len = 4 + 4 + 8 + 1 + key.len() + value.len();
let mut local_offset = 0usize;
while local_offset < appended_len {
let remaining = appended_len - local_offset;
let len = remaining.min(PAGE_SIZE);
let start = (start_offset as usize) + local_offset;
let end = start + len;
let slice = &self.buffer[start..end];
let checksum = xxh3_128(slice);
self.pages.push(PageDescriptor {
offset: start_offset + local_offset as u64,
length: len as u32,
checksum,
});
local_offset += len;
}
}
pub fn bytes_written(&self) -> usize {
self.buffer.len()
}
pub fn finish(mut self) -> Result<FileSegment> {
if !self.buffer.is_empty() {
let remainder = self.buffer.len() % PAGE_SIZE;
if remainder != 0 {
let pad_len = PAGE_SIZE - remainder;
let new_len = self.buffer.len() + pad_len;
self.buffer.resize(new_len, 0);
if let Some(last_page) = self.pages.last_mut() {
last_page.length += pad_len as u32;
let start = last_page.offset as usize;
let end = start + last_page.length as usize;
last_page.checksum = xxh3_128(&self.buffer[start..end]);
}
}
}
validate_segment_length(self.buffer.len() as u64)?;
if let Some(parent) = self.path.parent() {
fs::create_dir_all(parent)?;
}
let mut sha = Sha256::new();
sha.update(&self.buffer);
let digest = sha.finalize();
let sha256 = format!("{:x}", digest);
let mut compressed_vec: Option<Vec<u8>> = None;
let compressed_size_bytes: u64 = match &self.codec {
CompressionCodec::None => self.buffer.len() as u64,
CompressionCodec::Zstd(level) => {
let reader = std::io::Cursor::new(&self.buffer);
let compressed = zstd::stream::encode_all(reader, *level)
.map_err(|err| StorageError::Compression(err.to_string()))?;
let len = compressed.len() as u64;
compressed_vec = Some(compressed);
len
}
};
let metadata = SegmentMetadata {
id: Uuid::new_v4().to_string(),
path: self.path.clone(),
size_bytes: self.buffer.len() as u64,
compressed_size_bytes,
compression: self.codec.clone(),
pages: self.pages.clone(),
sha256,
created_at_ms: SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis(),
};
let metadata_json = serde_json::to_vec(&metadata)?;
let mut file = File::create(&self.path)?;
let version_bytes = SEGMENT_VERSION.to_le_bytes();
let metadata_len_bytes = (metadata_json.len() as u32).to_le_bytes();
let mut header = Vec::with_capacity(
SEGMENT_MAGIC.len() + version_bytes.len() + metadata_len_bytes.len() + metadata_json.len(),
);
header.extend_from_slice(SEGMENT_MAGIC);
header.extend_from_slice(&version_bytes);
header.extend_from_slice(&metadata_len_bytes);
header.extend_from_slice(&metadata_json);
file.write_all(&header)?;
match &self.codec {
CompressionCodec::None => file.write_all(&self.buffer)?,
CompressionCodec::Zstd(_) => file.write_all(compressed_vec.as_ref().unwrap())?,
}
Ok(FileSegment::new(metadata))
}
}
/// In-memory view of a segment file after header validation, decompression,
/// and digest verification.
pub struct SegmentReader {
    /// Metadata parsed from the file header (path refreshed on open).
    metadata: SegmentMetadata,
    /// The full, uncompressed, digest-verified payload.
    buffer: Vec<u8>,
}
impl SegmentReader {
    /// Opens a segment file, validates its header, decompresses the payload,
    /// and verifies the whole-segment SHA-256 digest. The entire payload is
    /// held in memory after this returns.
    ///
    /// # Errors
    /// Returns `StorageError::InvalidFormat` on a bad magic, unsupported
    /// version, non-page-aligned payload, or digest mismatch, and propagates
    /// I/O, JSON, and decompression errors.
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
        let file = File::open(path.as_ref())?;
        let mut reader = BufReader::new(file);
        // Header layout (must match SegmentWriter::finish):
        // magic | version (u32 LE) | metadata_len (u32 LE) | metadata JSON | payload.
        let mut magic = [0u8; 8];
        reader.read_exact(&mut magic)?;
        if &magic != SEGMENT_MAGIC {
            return Err(StorageError::InvalidFormat("invalid segment magic".into()));
        }
        let mut version = [0u8; 4];
        reader.read_exact(&mut version)?;
        if u32::from_le_bytes(version) != SEGMENT_VERSION {
            return Err(StorageError::InvalidFormat(
                "unsupported segment version".into(),
            ));
        }
        let mut metadata_len = [0u8; 4];
        reader.read_exact(&mut metadata_len)?;
        // NOTE(review): metadata_len comes straight from the file with no
        // upper bound; a corrupt header can trigger a very large allocation.
        let metadata_len = u32::from_le_bytes(metadata_len) as usize;
        let mut metadata_buf = vec![0u8; metadata_len];
        reader.read_exact(&mut metadata_buf)?;
        let mut metadata: SegmentMetadata = serde_json::from_slice(&metadata_buf)?;
        // The path stored at write time may be stale (file moved or copied),
        // so it is replaced with the path that was actually opened.
        metadata.path = path.as_ref().to_path_buf();
        let mut compressed = Vec::new();
        reader.read_to_end(&mut compressed)?;
        // NOTE(review): a mismatch between the recorded and actual compressed
        // size is silently repaired rather than rejected — confirm intended.
        if compressed.len() as u64 != metadata.compressed_size_bytes {
            metadata.compressed_size_bytes = compressed.len() as u64;
        }
        let buffer = metadata.compression.decompress(&compressed)?;
        // The writer pads payloads to PAGE_SIZE, so anything else is corrupt.
        if !buffer.is_empty() {
            let remainder = buffer.len() % PAGE_SIZE;
            if remainder != 0 {
                return Err(StorageError::InvalidFormat(
                    "segment buffer is not page aligned".into(),
                ));
            }
        }
        validate_segment_length(buffer.len() as u64)?;
        // The digest covers the uncompressed, padded payload.
        let mut sha = Sha256::new();
        sha.update(&buffer);
        let computed = format!("{:x}", sha.finalize());
        if computed != metadata.sha256 {
            return Err(StorageError::InvalidFormat(
                "segment digest mismatch".into(),
            ));
        }
        Ok(Self { metadata, buffer })
    }
    /// Returns the metadata parsed from the segment header.
    pub fn metadata(&self) -> &SegmentMetadata {
        &self.metadata
    }
    /// Number of page descriptors in the segment.
    pub fn page_count(&self) -> usize {
        self.metadata.pages.len()
    }
    /// Returns the bytes of page `index` after verifying its xxh3-128 checksum.
    ///
    /// # Errors
    /// `InvalidFormat` if `index` or the page's byte range is out of bounds;
    /// `ChecksumMismatch` if the stored checksum does not match the data.
    pub fn read_page(&self, index: usize) -> Result<&[u8]> {
        let page = self
            .metadata
            .pages
            .get(index)
            .ok_or_else(|| StorageError::InvalidFormat("page index out of bounds".into()))?;
        let start = page.offset as usize;
        let end = start + page.length as usize;
        let slice = self
            .buffer
            .get(start..end)
            .ok_or_else(|| StorageError::InvalidFormat("page slice outside of bounds".into()))?;
        let checksum = xxh3_128(slice);
        if checksum != page.checksum {
            return Err(StorageError::ChecksumMismatch {
                segment: self.metadata.id.clone(),
                page: index,
            });
        }
        Ok(slice)
    }
    /// The full, verified, uncompressed payload.
    pub fn as_bytes(&self) -> &[u8] {
        &self.buffer
    }
}