use std::collections::BTreeMap;
use std::collections::HashMap;
use std::fs::File;
use std::fs::OpenOptions;
use std::fs::{self};
use std::io::Read;
use std::io::Seek;
use std::io::SeekFrom;
use std::io::Write;
use std::ops::RangeInclusive;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use d_engine_core::Error;
use d_engine_core::HardState;
use d_engine_core::LogStore;
use d_engine_core::MetaStore;
use d_engine_core::StorageEngine;
use d_engine_core::StorageError;
use d_engine_proto::common::Entry;
use d_engine_proto::common::LogId;
use prost::Message;
use tonic::async_trait;
use tracing::info;
/// File name (inside the meta directory) used to persist the hard state.
const HARD_STATE_FILE_NAME: &str = "hard_state.bin";
/// Key under which the serialized hard state is cached in `FileMetaStore::data`.
pub(crate) const HARD_STATE_KEY: &[u8] = b"hard_state";
/// File-backed log store.
///
/// Entries are kept both on disk (`log.data`, a sequence of 8-byte
/// big-endian length prefixes followed by prost-encoded `Entry` payloads)
/// and in an in-memory `BTreeMap` cache rebuilt from the file on startup.
#[derive(Debug)]
pub struct FileLogStore {
#[allow(unused)]
data_dir: PathBuf,
// In-memory cache of all log entries, keyed by log index.
entries: Mutex<BTreeMap<u64, Entry>>,
// Highest log index seen; 0 when the log is empty.
last_index: AtomicU64,
// Handle to the backing `log.data` file.
file_handle: Mutex<File>,
// Byte offset of each entry's length prefix within the log file.
index_positions: Mutex<BTreeMap<u64, u64>>, }
/// File-backed metadata store; currently persists only the hard state.
#[derive(Debug)]
pub struct FileMetaStore {
// Directory holding `hard_state.bin`.
data_dir: PathBuf,
// In-memory cache of serialized values, keyed by raw byte key
// (only `HARD_STATE_KEY` is used in this file).
data: Mutex<HashMap<Vec<u8>, Vec<u8>>>,
}
/// Storage engine that wires a file-backed log store and meta store
/// rooted under a common data directory.
#[derive(Debug)]
pub struct FileStorageEngine {
log_store: Arc<FileLogStore>,
meta_store: Arc<FileMetaStore>,
data_dir: PathBuf,
}
impl StorageEngine for FileStorageEngine {
    type LogStore = FileLogStore;
    type MetaStore = FileMetaStore;

    /// Returns a shared handle to the log store.
    #[inline]
    fn log_store(&self) -> Arc<Self::LogStore> {
        Arc::clone(&self.log_store)
    }

    /// Returns a shared handle to the meta store.
    #[inline]
    fn meta_store(&self) -> Arc<Self::MetaStore> {
        Arc::clone(&self.meta_store)
    }
}
impl FileStorageEngine {
    /// Creates the engine rooted at `data_dir`, initializing the `logs/`
    /// and `meta/` subdirectories and their backing stores.
    pub fn new(data_dir: PathBuf) -> Result<Self, Error> {
        fs::create_dir_all(&data_dir)?;
        let logs = FileLogStore::new(data_dir.join("logs"))?;
        let meta = FileMetaStore::new(data_dir.join("meta"))?;
        Ok(Self {
            log_store: Arc::new(logs),
            meta_store: Arc::new(meta),
            data_dir,
        })
    }

    /// Root directory this engine stores its data under.
    pub fn data_dir(&self) -> &Path {
        &self.data_dir
    }
}
impl FileLogStore {
pub fn new(data_dir: PathBuf) -> Result<Self, Error> {
fs::create_dir_all(&data_dir)?;
let log_file_path = data_dir.join("log.data");
let file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(false)
.open(log_file_path)?;
let entries = Mutex::new(BTreeMap::new());
let last_index = AtomicU64::new(0);
let index_positions = Mutex::new(BTreeMap::new());
let store = Self {
data_dir,
entries,
last_index,
file_handle: Mutex::new(file),
index_positions,
};
store.load_from_file()?;
Ok(store)
}
fn load_from_file(&self) -> Result<(), Error> {
let mut file = self.file_handle.lock().unwrap();
file.seek(SeekFrom::Start(0))?;
let mut entries = self.entries.lock().unwrap();
let mut index_positions = self.index_positions.lock().unwrap();
let mut buffer = Vec::new();
file.read_to_end(&mut buffer)?;
let mut pos = 0;
let mut max_index = 0;
while pos < buffer.len() {
let entry_position = pos as u64;
if pos + 8 > buffer.len() {
break;
}
let len_bytes = &buffer[pos..pos + 8];
let entry_len = u64::from_be_bytes([
len_bytes[0],
len_bytes[1],
len_bytes[2],
len_bytes[3],
len_bytes[4],
len_bytes[5],
len_bytes[6],
len_bytes[7],
]) as usize;
pos += 8;
if pos + entry_len > buffer.len() {
break;
}
let entry_data = &buffer[pos..pos + entry_len];
match Entry::decode(entry_data) {
Ok(entry) => {
entries.insert(entry.index, entry.clone());
index_positions.insert(entry.index, entry_position);
max_index = max_index.max(entry.index);
}
Err(e) => {
eprintln!("Failed to decode entry: {e}",);
}
}
pos += entry_len;
}
self.last_index.store(max_index, Ordering::SeqCst);
Ok(())
}
fn append_to_file(
&self,
entry: &Entry,
) -> Result<(), Error> {
let mut file = self.file_handle.lock().unwrap();
let position = file.seek(SeekFrom::End(0))?;
let encoded = entry.encode_to_vec();
let len = encoded.len() as u64;
file.write_all(&len.to_be_bytes())?;
file.write_all(&encoded)?;
file.flush()?;
let mut index_positions = self.index_positions.lock().unwrap();
index_positions.insert(entry.index, position);
Ok(())
}
#[allow(dead_code)]
#[cfg(test)]
pub(crate) fn reset_sync(&self) -> Result<(), Error> {
{
let mut file = self.file_handle.lock().unwrap();
file.set_len(0)?;
file.seek(SeekFrom::Start(0))?;
file.flush()?;
}
{
let mut store = self.entries.lock().unwrap();
store.clear();
}
{
let mut index_positions = self.index_positions.lock().unwrap();
index_positions.clear();
}
self.last_index.store(0, Ordering::SeqCst);
Ok(())
}
}
#[async_trait]
impl LogStore for FileLogStore {
    /// Appends a batch of entries, writing each to disk before caching it,
    /// then advances `last_index` to the highest index in the batch.
    async fn persist_entries(
        &self,
        entries: Vec<Entry>,
    ) -> Result<(), Error> {
        let mut max_index = 0;
        for entry in entries {
            // Disk first, so the cache never claims an entry the file lacks.
            self.append_to_file(&entry)?;
            max_index = max_index.max(entry.index);
            // Move the entry into the cache instead of cloning it.
            self.entries.lock().unwrap().insert(entry.index, entry);
        }
        if max_index > 0 {
            self.last_index.store(max_index, Ordering::SeqCst);
        }
        Ok(())
    }

    /// Looks up a single entry by index in the in-memory cache.
    async fn entry(
        &self,
        index: u64,
    ) -> Result<Option<Entry>, Error> {
        let store = self.entries.lock().unwrap();
        Ok(store.get(&index).cloned())
    }

    /// Returns clones of all cached entries whose index lies in `range`.
    fn get_entries(
        &self,
        range: RangeInclusive<u64>,
    ) -> Result<Vec<Entry>, Error> {
        let store = self.entries.lock().unwrap();
        Ok(store.range(range).map(|(_, entry)| entry.clone()).collect())
    }

    /// Drops every entry with index <= `cutoff_index.index` by rewriting
    /// the log file with only the surviving entries, then syncing it.
    async fn purge(
        &self,
        cutoff_index: LogId,
    ) -> Result<(), Error> {
        // Snapshot the survivors before touching the file.
        let entries_to_keep: Vec<Entry> = {
            let entries = self.entries.lock().unwrap();
            entries
                .range((cutoff_index.index + 1)..)
                .map(|(_, entry)| entry.clone())
                .collect()
        };
        {
            let mut file = self.file_handle.lock().unwrap();
            file.set_len(0)?;
            file.seek(SeekFrom::Start(0))?;
            // Rewrite survivors and rebuild the byte-offset index as we go.
            let mut new_positions = BTreeMap::new();
            for entry in &entries_to_keep {
                let position = file.stream_position()?;
                let encoded = entry.encode_to_vec();
                let len = encoded.len() as u64;
                file.write_all(&len.to_be_bytes())?;
                file.write_all(&encoded)?;
                new_positions.insert(entry.index, position);
            }
            file.flush()?;
            file.sync_all()?;
            let mut index_positions = self.index_positions.lock().unwrap();
            *index_positions = new_positions;
        }
        {
            let mut entries = self.entries.lock().unwrap();
            entries.retain(|&index, _| index > cutoff_index.index);
        }
        Ok(())
    }

    /// Removes every entry with index >= `from_index`, shrinking the log
    /// file so it ends right after the last surviving entry.
    async fn truncate(
        &self,
        from_index: u64,
    ) -> Result<(), Error> {
        let indexes_to_remove: Vec<u64> = {
            let index_positions = self.index_positions.lock().unwrap();
            index_positions.range(from_index..).map(|(k, _)| *k).collect()
        };
        {
            let mut entries = self.entries.lock().unwrap();
            for index in &indexes_to_remove {
                entries.remove(index);
            }
        }
        {
            let mut index_positions = self.index_positions.lock().unwrap();
            for index in &indexes_to_remove {
                index_positions.remove(index);
            }
        }
        // Physically shrink the file to end just past the last kept entry:
        // its recorded offset + 8-byte length prefix + payload length.
        if let Some(last_keep_position) = self
            .index_positions
            .lock()
            .unwrap()
            .range(..from_index)
            .next_back()
            .map(|(_, pos)| *pos)
        {
            let mut file = self.file_handle.lock().unwrap();
            file.seek(SeekFrom::Start(last_keep_position))?;
            let mut len_buffer = [0u8; 8];
            file.read_exact(&mut len_buffer)?;
            let entry_len = u64::from_be_bytes(len_buffer);
            let truncate_pos = last_keep_position + 8 + entry_len;
            file.set_len(truncate_pos)?;
        } else {
            // Nothing survives: empty the file entirely.
            let file = self.file_handle.lock().unwrap();
            file.set_len(0)?;
        }
        // Recompute last_index from whatever offsets remain.
        if let Some(new_last_index) = self.index_positions.lock().unwrap().keys().next_back() {
            self.last_index.store(*new_last_index, Ordering::SeqCst);
        } else {
            self.last_index.store(0, Ordering::SeqCst);
        }
        Ok(())
    }

    /// Flushes buffered writes and fsyncs the log file for durability.
    fn flush(&self) -> Result<(), Error> {
        let mut file = self.file_handle.lock().unwrap();
        file.flush()?;
        file.sync_all()?;
        Ok(())
    }

    /// Async wrapper over the synchronous `flush`.
    async fn flush_async(&self) -> Result<(), Error> {
        self.flush()
    }

    /// Clears the file and all in-memory state, returning the store to its
    /// freshly-created empty state.
    async fn reset(&self) -> Result<(), Error> {
        {
            let mut file = self.file_handle.lock().unwrap();
            file.set_len(0)?;
            file.seek(SeekFrom::Start(0))?;
            file.flush()?;
        }
        self.entries.lock().unwrap().clear();
        self.index_positions.lock().unwrap().clear();
        self.last_index.store(0, Ordering::SeqCst);
        Ok(())
    }

    /// Highest index ever persisted (0 if the log is empty).
    fn last_index(&self) -> u64 {
        self.last_index.load(Ordering::SeqCst)
    }
}
impl Drop for FileLogStore {
fn drop(&mut self) {
if let Err(e) = self.flush() {
tracing::error!("Failed to flush FileLogStore on drop: {}", e);
} else {
tracing::debug!("FileLogStore flushed successfully on drop");
}
}
}
impl FileMetaStore {
pub fn new(data_dir: PathBuf) -> Result<Self, Error> {
fs::create_dir_all(&data_dir)?;
let store = Self {
data_dir,
data: Mutex::new(HashMap::new()),
};
store.load_from_file()?;
Ok(store)
}
fn load_from_file(&self) -> Result<(), Error> {
let hard_state_path = self.data_dir.join(HARD_STATE_FILE_NAME);
if hard_state_path.exists() {
let mut file = File::open(hard_state_path)?;
let mut buffer = Vec::new();
file.read_to_end(&mut buffer)?;
match bincode::deserialize::<HardState>(&buffer) {
Ok(_hard_state) => {
let mut data = self.data.lock().unwrap();
data.insert(HARD_STATE_KEY.to_vec(), buffer);
info!("Loaded hard state from file");
}
Err(e) => {
eprintln!("Failed to decode hard state: {e}",);
}
}
}
Ok(())
}
fn save_to_file(
&self,
key: &[u8],
value: &[u8],
) -> Result<(), Error> {
if key == HARD_STATE_KEY {
let hard_state_path = self.data_dir.join(HARD_STATE_FILE_NAME);
let mut file = File::create(hard_state_path)?;
file.write_all(value)?;
file.flush()?;
}
Ok(())
}
}
#[async_trait]
impl MetaStore for FileMetaStore {
    /// Serializes `state`, persists it to `hard_state.bin`, then caches the
    /// serialized bytes in memory.
    ///
    /// The file is written before the cache is updated so that, on an I/O
    /// failure, the in-memory cache still matches what is on disk; the data
    /// mutex is also no longer held across file I/O.
    fn save_hard_state(
        &self,
        state: &HardState,
    ) -> Result<(), Error> {
        let serialized = bincode::serialize(state).map_err(StorageError::BincodeError)?;
        self.save_to_file(HARD_STATE_KEY, &serialized)?;
        // Move (not clone) the bytes into the cache after a successful write.
        self.data
            .lock()
            .unwrap()
            .insert(HARD_STATE_KEY.to_vec(), serialized);
        info!("Persisted hard state to file");
        Ok(())
    }

    /// Deserializes the cached hard state, if any.
    fn load_hard_state(&self) -> Result<Option<HardState>, Error> {
        let data = self.data.lock().unwrap();
        match data.get(HARD_STATE_KEY) {
            Some(bytes) => {
                let state = bincode::deserialize(bytes).map_err(StorageError::BincodeError)?;
                info!("Loaded hard state from memory");
                Ok(Some(state))
            }
            None => {
                info!("No hard state found");
                Ok(None)
            }
        }
    }

    /// No-op: `save_to_file` already flushes each write.
    fn flush(&self) -> Result<(), Error> {
        Ok(())
    }

    /// Async wrapper over the synchronous `flush`.
    async fn flush_async(&self) -> Result<(), Error> {
        self.flush()
    }
}