use std::collections::{HashMap, HashSet};
use std::fs::{File, OpenOptions};
use std::io::{Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use once_cell::sync::Lazy;
use parking_lot::Mutex;
use crate::compression::{compress, decode_delta, decompress, encode_delta, should_compress};
use crate::error::{Result, SynaError};
use crate::types::{Atom, LogHeader, HEADER_SIZE, IS_COMPRESSED, IS_DELTA, IS_TOMBSTONE};
/// Process-wide registry mapping canonical path strings to open database
/// handles, guarded by a `parking_lot` mutex so the free functions
/// (`open_db` / `close_db` / `with_db`) can share one instance per path.
static DB_REGISTRY: Lazy<Mutex<HashMap<String, SynaDB>>> = Lazy::new(|| Mutex::new(HashMap::new()));
/// Open (or create) the database at `path` with default options and place
/// it in the global registry. Idempotent when the path is already open.
pub fn open_db(path: &str) -> Result<()> {
    let config = DbConfig::default();
    open_db_with_config(path, config)
}
/// Open (or create) the database at `path` with the supplied options and
/// register it under its canonical path. If that path is already
/// registered, this is a no-op (the existing handle keeps its config).
pub fn open_db_with_config(path: &str, config: DbConfig) -> Result<()> {
    let key = canonicalize_path(path)?;
    let mut registry = DB_REGISTRY.lock();
    if !registry.contains_key(&key) {
        // Open only when absent; the raw path is used for the file itself.
        let db = SynaDB::with_config(path, config)?;
        registry.insert(key, db);
    }
    Ok(())
}
/// Remove the database registered under `path` from the registry and
/// flush it to disk. Returns `NotFound` if the path is not open.
pub fn close_db(path: &str) -> Result<()> {
    let key = canonicalize_path(path)?;
    match DB_REGISTRY.lock().remove(&key) {
        Some(db) => db.close(),
        None => Err(SynaError::NotFound(path.to_string())),
    }
}
/// Run `f` with mutable access to the database registered under `path`.
///
/// The global registry lock is held for the duration of `f`, so long
/// callbacks block all other registry users.
pub fn with_db<F, R>(path: &str, f: F) -> Result<R>
where
    F: FnOnce(&mut SynaDB) -> Result<R>,
{
    let key = canonicalize_path(path)?;
    let mut registry = DB_REGISTRY.lock();
    match registry.get_mut(&key) {
        Some(db) => f(db),
        None => Err(SynaError::NotFound(path.to_string())),
    }
}
/// Produce a canonical string form of `path` for use as a registry key.
///
/// If the file exists it is canonicalized directly. Otherwise the parent
/// directory is canonicalized (it must exist) and the file name is
/// re-attached, so not-yet-created database files still get stable keys.
fn canonicalize_path(path: &str) -> Result<String> {
    let raw = PathBuf::from(path);
    // Fast path: the target already exists on disk.
    if let Ok(resolved) = std::fs::canonicalize(&raw) {
        return Ok(resolved.to_string_lossy().into_owned());
    }
    let file_name = raw
        .file_name()
        .ok_or_else(|| SynaError::InvalidPath(path.to_string()))?;
    let parent = raw.parent().unwrap_or(Path::new("."));
    // An empty or "." parent means "relative to the current directory".
    let resolved_parent = if parent.as_os_str().is_empty() || parent == Path::new(".") {
        std::env::current_dir().map_err(|_| SynaError::InvalidPath(path.to_string()))?
    } else {
        std::fs::canonicalize(parent).map_err(|_| {
            SynaError::InvalidPath(format!("Parent directory not found: {}", parent.display()))
        })?
    };
    Ok(resolved_parent.join(file_name).to_string_lossy().into_owned())
}
/// Tunable options controlling how a database encodes and persists entries.
// `Debug` added: public config types should be debug-printable.
#[derive(Clone, Debug)]
pub struct DbConfig {
    /// Compress serialized payloads when `should_compress` deems it worthwhile.
    pub enable_compression: bool,
    /// Store successive float values as deltas from the previous value.
    pub enable_delta: bool,
    /// Call `sync_data` after every write for durability (default: on).
    pub sync_on_write: bool,
}
impl Default for DbConfig {
    /// Conservative defaults: no compression, no delta encoding,
    /// sync on every write.
    fn default() -> Self {
        Self {
            enable_compression: false,
            enable_delta: false,
            sync_on_write: true,
        }
    }
}
/// An append-only, log-structured key/value store.
///
/// Everything below is an in-memory view rebuilt from the log file on
/// open; the file itself is the single source of truth.
pub struct SynaDB {
    /// Location of the backing log file.
    pub(crate) path: PathBuf,
    /// Open handle to the log file (read + append via explicit seeks).
    pub(crate) file: File,
    /// Every entry offset ever written per key, in write order.
    pub(crate) index: HashMap<String, Vec<u64>>,
    /// Offset of the most recent entry per key (including tombstones).
    pub(crate) latest: HashMap<String, u64>,
    /// Last absolute value per key; the base for delta encoding.
    /// NOTE(review): not repopulated by `rebuild_index`, so after reopen
    /// the first write per key is stored absolute — readers handle both.
    pub(crate) previous_values: HashMap<String, Atom>,
    /// Keys whose most recent entry is a tombstone.
    pub(crate) deleted: HashSet<String>,
    /// Current logical length of the file (next write offset).
    pub(crate) file_len: u64,
    /// Encoding/durability options supplied at open time.
    pub(crate) config: DbConfig,
    /// Serializes the write path. NOTE(review): every method already takes
    /// `&mut self`, so this guard looks redundant — confirm before removing.
    pub(crate) write_lock: Mutex<()>,
}
impl SynaDB {
/// Open (or create) a database at `path` with default options.
pub fn new(path: impl AsRef<Path>) -> Result<Self> {
    let config = DbConfig::default();
    Self::with_config(path, config)
}
pub fn with_config(path: impl AsRef<Path>, config: DbConfig) -> Result<Self> {
let path = path.as_ref().to_path_buf();
let file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(false)
.open(&path)?;
let file_len = file.metadata()?.len();
let mut db = Self {
path,
file,
index: HashMap::new(),
latest: HashMap::new(),
previous_values: HashMap::new(),
deleted: HashSet::new(),
file_len,
config,
write_lock: Mutex::new(()),
};
if file_len > 0 {
let (_recovered, _skipped) = db.rebuild_index()?;
}
Ok(db)
}
/// Append `value` for `key` to the log, returning the byte offset of the
/// new entry.
///
/// Encoding: when delta encoding is enabled and both this value and the
/// previously appended value for `key` are `Atom::Float`, the payload is
/// `encode_delta(current, previous)` flagged `IS_DELTA`; when compression
/// is enabled and `should_compress` approves, the serialized payload is
/// compressed and flagged `IS_COMPRESSED`.
///
/// # Errors
/// `EmptyKey` for an empty key, `KeyTooLong` when the key exceeds
/// `u16::MAX` bytes (the header stores key length as a u16), plus any
/// serialization or I/O failure.
pub fn append(&mut self, key: &str, value: Atom) -> Result<u64> {
    if key.is_empty() {
        return Err(SynaError::EmptyKey);
    }
    if key.len() > u16::MAX as usize {
        return Err(SynaError::KeyTooLong(key.len()));
    }
    let _guard = self.write_lock.lock();
    // Decide whether to store a delta or the absolute value.
    let (value_to_store, mut flags) = if self.config.enable_delta {
        if let Atom::Float(current) = &value {
            if let Some(Atom::Float(previous)) = self.previous_values.get(key) {
                let delta = encode_delta(*current, *previous);
                (Atom::Float(delta), IS_DELTA)
            } else {
                // No float predecessor: store absolute.
                (value.clone(), 0u8)
            }
        } else {
            (value.clone(), 0u8)
        }
    } else {
        (value.clone(), 0u8)
    };
    let value_bytes = bincode::serialize(&value_to_store)?;
    let final_bytes = if self.config.enable_compression && should_compress(&value_bytes) {
        flags |= IS_COMPRESSED;
        compress(&value_bytes)
    } else {
        value_bytes
    };
    // Write header, key, then payload at the end of the log.
    let header = LogHeader::new(key.len() as u16, final_bytes.len() as u32, flags);
    let offset = self.file_len;
    self.file.seek(SeekFrom::End(0))?;
    self.file.write_all(&header.to_bytes())?;
    self.file.write_all(key.as_bytes())?;
    self.file.write_all(&final_bytes)?;
    if self.config.sync_on_write {
        self.file.sync_data()?;
    }
    // In-memory bookkeeping happens only after the write (and optional
    // sync) succeeded, so the index never points at unwritten data.
    self.latest.insert(key.to_string(), offset);
    self.index.entry(key.to_string()).or_default().push(offset);
    self.deleted.remove(key);
    self.file_len += HEADER_SIZE as u64 + key.len() as u64 + final_bytes.len() as u64;
    // Remember the absolute (not delta) value as the next delta base.
    self.previous_values.insert(key.to_string(), value);
    Ok(offset)
}
/// Append a batch of float values for `key`, returning how many were
/// written.
///
/// Unlike repeated `append` calls, `sync_data` runs once after the whole
/// batch, so a crash mid-batch may persist only a prefix of the values.
/// Delta encoding chains through the batch: each value's delta base is
/// the value written immediately before it.
///
/// # Errors
/// `EmptyKey` / `KeyTooLong` for invalid keys, plus serialization or I/O
/// failures.
pub fn append_floats_batch(&mut self, key: &str, values: &[f64]) -> Result<usize> {
    if key.is_empty() {
        return Err(SynaError::EmptyKey);
    }
    if key.len() > u16::MAX as usize {
        return Err(SynaError::KeyTooLong(key.len()));
    }
    // Nothing to write; also skips the seek and sync below.
    if values.is_empty() {
        return Ok(0);
    }
    let _guard = self.write_lock.lock();
    // Position at end once; all batch writes are sequential from here.
    self.file.seek(SeekFrom::End(0))?;
    for &value in values {
        let atom = Atom::Float(value);
        let (value_to_store, mut flags) = if self.config.enable_delta {
            if let Some(Atom::Float(previous)) = self.previous_values.get(key) {
                let delta = encode_delta(value, *previous);
                (Atom::Float(delta), IS_DELTA)
            } else {
                // First float for this key: store absolute.
                (atom.clone(), 0u8)
            }
        } else {
            (atom.clone(), 0u8)
        };
        let value_bytes = bincode::serialize(&value_to_store)?;
        let final_bytes = if self.config.enable_compression && should_compress(&value_bytes) {
            flags |= IS_COMPRESSED;
            compress(&value_bytes)
        } else {
            value_bytes
        };
        let header = LogHeader::new(key.len() as u16, final_bytes.len() as u32, flags);
        let offset = self.file_len;
        self.file.write_all(&header.to_bytes())?;
        self.file.write_all(key.as_bytes())?;
        self.file.write_all(&final_bytes)?;
        self.latest.insert(key.to_string(), offset);
        self.index.entry(key.to_string()).or_default().push(offset);
        self.deleted.remove(key);
        self.file_len += HEADER_SIZE as u64 + key.len() as u64 + final_bytes.len() as u64;
        // Track the absolute value as the delta base for the next element.
        self.previous_values.insert(key.to_string(), atom);
    }
    // One durability barrier for the whole batch.
    if self.config.sync_on_write {
        self.file.sync_data()?;
    }
    Ok(values.len())
}
/// Return the most recent value for `key`, or `None` when the key is
/// absent or tombstoned. Delta-encoded floats are reconstructed by
/// replaying the key's full entry chain.
pub fn get(&mut self, key: &str) -> Result<Option<Atom>> {
    // This body was previously a byte-for-byte duplicate of
    // `get_value_internal`; delegate so the delta-reconstruction rules
    // live in exactly one place.
    self.get_value_internal(key)
}
/// Read and decode the single log entry beginning at `offset`, returning
/// its key, value, and raw header flags. Compressed payloads are
/// transparently decompressed; delta decoding is left to the caller.
fn read_entry_at(&mut self, offset: u64) -> Result<(String, Atom, u8)> {
    self.file.seek(SeekFrom::Start(offset))?;
    // Fixed-size header first.
    let mut raw_header = [0u8; HEADER_SIZE];
    self.file.read_exact(&mut raw_header)?;
    let header = LogHeader::from_bytes(&raw_header);
    if !header.is_valid() {
        return Err(SynaError::CorruptedEntry(offset));
    }
    // Key bytes, validated as UTF-8.
    let mut raw_key = vec![0u8; header.key_len as usize];
    self.file.read_exact(&mut raw_key)?;
    let key = String::from_utf8(raw_key).map_err(|_| SynaError::CorruptedEntry(offset))?;
    // Value payload, decompressed when the entry was stored compressed.
    let mut payload = vec![0u8; header.val_len as usize];
    self.file.read_exact(&mut payload)?;
    let plain = if header.flags & IS_COMPRESSED == 0 {
        payload
    } else {
        decompress(&payload)?
    };
    let atom: Atom = bincode::deserialize(&plain)?;
    Ok((key, atom, header.flags))
}
/// Scan the log from the beginning and rebuild `index`, `latest`, and
/// `deleted`. Corrupted entries are skipped: on a bad header or invalid
/// key the scan resynchronizes byte-by-byte via
/// `scan_for_next_valid_entry`; truncation at end-of-file ends the scan.
///
/// Returns `(entries_recovered, entries_skipped)`.
///
/// NOTE(review): `previous_values` is not repopulated here, so the first
/// post-recovery write per key is stored absolute rather than as a delta.
pub(crate) fn rebuild_index(&mut self) -> Result<(usize, usize)> {
    self.file.seek(SeekFrom::Start(0))?;
    let mut offset = 0u64;
    let mut entries_recovered = 0usize;
    let mut entries_skipped = 0usize;
    loop {
        let entry_offset = offset;
        // Read the fixed-size header; a clean EOF here ends the scan.
        let mut header_buf = [0u8; HEADER_SIZE];
        match self.file.read_exact(&mut header_buf) {
            Ok(_) => {}
            Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
            Err(e) => return Err(e.into()),
        }
        let header = LogHeader::from_bytes(&header_buf);
        if !header.is_valid() {
            eprintln!(
                "Warning: Invalid header at offset {}, attempting to find next valid entry",
                entry_offset
            );
            entries_skipped += 1;
            // Resynchronize: hunt for the next plausible header.
            if let Some(next_offset) = self.scan_for_next_valid_entry(entry_offset + 1)? {
                offset = next_offset;
                self.file.seek(SeekFrom::Start(offset))?;
                continue;
            } else {
                break;
            }
        }
        let mut key_buf = vec![0u8; header.key_len as usize];
        match self.file.read_exact(&mut key_buf) {
            Ok(_) => {}
            Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
                // Key cut off by truncation: nothing further to recover.
                eprintln!("Warning: Truncated key at offset {}", entry_offset);
                entries_skipped += 1;
                break;
            }
            Err(e) => return Err(e.into()),
        }
        let key = match String::from_utf8(key_buf) {
            Ok(k) => k,
            Err(_) => {
                eprintln!("Warning: Invalid UTF-8 key at offset {}", entry_offset);
                entries_skipped += 1;
                if let Some(next_offset) =
                    self.scan_for_next_valid_entry(entry_offset + HEADER_SIZE as u64)?
                {
                    offset = next_offset;
                    self.file.seek(SeekFrom::Start(offset))?;
                    continue;
                } else {
                    break;
                }
            }
        };
        // Skip the value bytes without reading them; a position past
        // `file_len` means the value was truncated mid-write.
        let value_len = header.val_len as u64;
        match self.file.seek(SeekFrom::Current(value_len as i64)) {
            Ok(new_pos) => {
                if new_pos > self.file_len {
                    eprintln!("Warning: Truncated value at offset {}", entry_offset);
                    entries_skipped += 1;
                    break;
                }
            }
            Err(_) => {
                eprintln!("Warning: Truncated value at offset {}", entry_offset);
                entries_skipped += 1;
                break;
            }
        }
        // Entry accepted: update the in-memory view.
        self.latest.insert(key.clone(), entry_offset);
        self.index
            .entry(key.clone())
            .or_default()
            .push(entry_offset);
        if header.flags & IS_TOMBSTONE != 0 {
            self.deleted.insert(key);
        } else {
            self.deleted.remove(&key);
        }
        entries_recovered += 1;
        offset = entry_offset + HEADER_SIZE as u64 + header.key_len as u64 + value_len;
    }
    if entries_skipped > 0 {
        eprintln!(
            "Recovery complete: {} entries recovered, {} entries skipped",
            entries_recovered, entries_skipped
        );
    }
    Ok((entries_recovered, entries_skipped))
}
/// Scan forward byte-by-byte from `start_offset` for the next plausible
/// entry header. Returns its offset, or `None` when end-of-file is
/// reached first. Used by `rebuild_index` to resynchronize after
/// corruption.
///
/// NOTE(review): a one-byte stride with a seek + read per position is
/// O(file size) syscalls in the worst case; acceptable for a recovery
/// path, but a buffered scan would be faster on large files.
fn scan_for_next_valid_entry(&mut self, start_offset: u64) -> Result<Option<u64>> {
    if start_offset >= self.file_len {
        return Ok(None);
    }
    let mut offset = start_offset;
    let mut header_buf = [0u8; HEADER_SIZE];
    while offset + HEADER_SIZE as u64 <= self.file_len {
        // The loop seeks at every position, so the redundant initial
        // seek that preceded the loop has been removed.
        self.file.seek(SeekFrom::Start(offset))?;
        match self.file.read_exact(&mut header_buf) {
            Ok(_) => {}
            Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(None),
            Err(e) => return Err(e.into()),
        }
        let header = LogHeader::from_bytes(&header_buf);
        if self.is_likely_valid_header(&header, offset) {
            return Ok(Some(offset));
        }
        offset += 1;
    }
    Ok(None)
}
/// Heuristic check that `header` plausibly describes a real entry at
/// `offset`: structurally valid, non-empty key, timestamp in a sane
/// window, entry fits within the file, and no unknown flag bits set.
fn is_likely_valid_header(&self, header: &LogHeader, offset: u64) -> bool {
    // Sanity window: 2000-01-01 .. 2100-01-01 (epoch microseconds,
    // presumably — confirm against LogHeader's timestamp units).
    const MIN_TIMESTAMP: u64 = 946_684_800_000_000;
    const MAX_TIMESTAMP: u64 = 4_102_444_800_000_000;
    const VALID_FLAGS_MASK: u8 = IS_DELTA | IS_COMPRESSED | IS_TOMBSTONE;

    if !header.is_valid() || header.key_len == 0 {
        return false;
    }
    // A zero timestamp is tolerated; anything else must be in-window.
    let ts = header.timestamp;
    if ts != 0 && !(MIN_TIMESTAMP..=MAX_TIMESTAMP).contains(&ts) {
        return false;
    }
    // The whole entry must fit inside the file, and only known flag bits
    // may be set.
    let entry_size = HEADER_SIZE as u64 + header.key_len as u64 + header.val_len as u64;
    offset + entry_size <= self.file_len && header.flags & !VALID_FLAGS_MASK == 0
}
/// Flush all buffered data and metadata to disk, consuming the handle.
pub fn close(self) -> Result<()> {
    // `sync_all` also flushes file metadata, unlike `sync_data`.
    Ok(self.file.sync_all()?)
}
/// Mark `key` as deleted by appending a tombstone entry (`IS_TOMBSTONE`
/// flag, `Atom::Null` payload). Prior history stays in the log until
/// `compact` runs.
///
/// NOTE(review): a tombstone is written even when the key was never
/// present — confirm that is intended.
///
/// # Errors
/// `EmptyKey` / `KeyTooLong` for invalid keys, plus serialization or I/O
/// failures.
pub fn delete(&mut self, key: &str) -> Result<()> {
    if key.is_empty() {
        return Err(SynaError::EmptyKey);
    }
    if key.len() > u16::MAX as usize {
        return Err(SynaError::KeyTooLong(key.len()));
    }
    let _guard = self.write_lock.lock();
    let value_bytes = bincode::serialize(&Atom::Null)?;
    let header = LogHeader::new(key.len() as u16, value_bytes.len() as u32, IS_TOMBSTONE);
    let offset = self.file_len;
    self.file.seek(SeekFrom::End(0))?;
    self.file.write_all(&header.to_bytes())?;
    self.file.write_all(key.as_bytes())?;
    self.file.write_all(&value_bytes)?;
    if self.config.sync_on_write {
        self.file.sync_data()?;
    }
    // Bookkeeping mirrors `append`, but the key also joins `deleted`.
    self.latest.insert(key.to_string(), offset);
    self.index.entry(key.to_string()).or_default().push(offset);
    self.deleted.insert(key.to_string());
    self.file_len += HEADER_SIZE as u64 + key.len() as u64 + value_bytes.len() as u64;
    Ok(())
}
/// Return every key that currently has a live (non-tombstoned) entry.
/// Order is unspecified (hash-map iteration order).
pub fn keys(&self) -> Vec<String> {
    let mut live = Vec::new();
    for key in self.latest.keys() {
        if !self.deleted.contains(key) {
            live.push(key.clone());
        }
    }
    live
}
/// True when `key` has an entry and has not been tombstoned.
pub fn exists(&self, key: &str) -> bool {
    !self.deleted.contains(key) && self.latest.contains_key(key)
}
/// Return every non-tombstone value ever written for `key`, oldest
/// first. Returns an empty vector for unknown keys.
///
/// Delta-encoded entries are rehydrated by carrying the last absolute
/// float forward through the chain; if a chain begins with a delta entry
/// (no absolute base seen yet), the delta value itself is used as the
/// absolute value.
pub fn get_history(&mut self, key: &str) -> Result<Vec<Atom>> {
    let offsets = self.index.get(key).cloned().unwrap_or_default();
    let mut atoms = Vec::with_capacity(offsets.len());
    // Running absolute float used as the base when decoding deltas.
    let mut current_float: Option<f64> = None;
    for offset in offsets {
        let (_key, atom, flags) = self.read_entry_at(offset)?;
        if flags & IS_TOMBSTONE != 0 {
            // Tombstones are not part of the value history.
            continue;
        }
        if flags & IS_DELTA != 0 {
            if let Atom::Float(delta) = atom {
                if let Some(prev) = current_float {
                    let absolute = decode_delta(delta, prev);
                    current_float = Some(absolute);
                    atoms.push(Atom::Float(absolute));
                } else {
                    // No base yet: treat the delta as an absolute value.
                    current_float = Some(delta);
                    atoms.push(Atom::Float(delta));
                }
            } else {
                atoms.push(atom);
            }
        } else {
            if let Atom::Float(f) = &atom {
                current_float = Some(*f);
            }
            atoms.push(atom);
        }
    }
    Ok(atoms)
}
/// Return the history of `key` keeping only float values, oldest first.
/// Non-float atoms in the history are silently dropped.
pub fn get_history_floats(&mut self, key: &str) -> Result<Vec<f64>> {
    Ok(self
        .get_history(key)?
        .into_iter()
        .filter_map(|atom| match atom {
            Atom::Float(f) => Some(f),
            _ => None,
        })
        .collect())
}
/// Return the float history of `key` as a raw heap buffer `(ptr, len)`
/// for FFI consumers. Ownership transfers to the caller, who must
/// release it with `free_tensor(ptr, len)` exactly once.
pub fn get_history_tensor(&mut self, key: &str) -> Result<(*mut f64, usize)> {
    let data = self.get_history_floats(key)?.into_boxed_slice();
    let len = data.len();
    Ok((Box::into_raw(data) as *mut f64, len))
}
/// Rewrite the log so it contains exactly one entry per live key — the
/// current value with deltas collapsed to absolutes — then atomically
/// swap the compacted file into place and rebuild the in-memory state.
///
/// Tombstones and per-key history are discarded.
/// NOTE(review): after compaction `get_history` returns only the latest
/// value per key — confirm callers do not depend on history surviving.
pub fn compact(&mut self) -> Result<()> {
    // Build the compacted image in a sibling temp file first.
    let temp_path = self.path.with_extension("compact.tmp");
    let mut temp_file = OpenOptions::new()
        .read(true)
        .write(true)
        .create(true)
        .truncate(true)
        .open(&temp_path)?;
    // Snapshot live keys up front.
    let keys_to_compact: Vec<String> = self
        .latest
        .keys()
        .filter(|k| !self.deleted.contains(*k))
        .cloned()
        .collect();
    let enable_compression = self.config.enable_compression;
    for key in &keys_to_compact {
        let value = match self.get_value_internal(key)? {
            Some(v) => v,
            None => continue,
        };
        // Re-encode absolute (no IS_DELTA), optionally compressed under
        // the current config.
        let value_bytes = bincode::serialize(&value)?;
        let (final_bytes, flags) = if enable_compression && should_compress(&value_bytes) {
            (compress(&value_bytes), IS_COMPRESSED)
        } else {
            (value_bytes, 0u8)
        };
        let header = LogHeader::new(key.len() as u16, final_bytes.len() as u32, flags);
        temp_file.write_all(&header.to_bytes())?;
        temp_file.write_all(key.as_bytes())?;
        temp_file.write_all(&final_bytes)?;
    }
    // Make the temp file durable before the swap.
    temp_file.sync_all()?;
    drop(temp_file);
    let original_path = self.path.clone();
    if let Err(e) = atomic_replace(&temp_path, &original_path) {
        // Swap failed: remove the temp file; the old log stays in place.
        let _ = std::fs::remove_file(&temp_path);
        return Err(SynaError::Io(e));
    }
    // Reopen the now-compacted file and rebuild all in-memory state.
    self.file = OpenOptions::new()
        .read(true)
        .write(true)
        .open(&original_path)?;
    self.file_len = self.file.metadata()?.len();
    self.index.clear();
    self.latest.clear();
    self.deleted.clear();
    self.previous_values.clear();
    self.rebuild_index()?;
    Ok(())
}
/// Resolve the current value of `key` by replaying its full entry chain,
/// decoding deltas against the running absolute float. Returns `None`
/// for unknown or tombstoned keys.
///
/// NOTE(review): this duplicates the body of `get`; keep the two in sync
/// (or have `get` delegate here).
fn get_value_internal(&mut self, key: &str) -> Result<Option<Atom>> {
    if self.deleted.contains(key) {
        return Ok(None);
    }
    if !self.latest.contains_key(key) {
        return Ok(None);
    }
    let offsets = self.index.get(key).cloned().unwrap_or_default();
    if offsets.is_empty() {
        return Ok(None);
    }
    // Running absolute float used as the base when decoding deltas.
    let mut current_float: Option<f64> = None;
    let mut last_atom: Option<Atom> = None;
    for offset in offsets {
        let (_key, atom, flags) = self.read_entry_at(offset)?;
        if flags & IS_DELTA != 0 {
            if let Atom::Float(delta) = atom {
                if let Some(prev) = current_float {
                    let absolute = decode_delta(delta, prev);
                    current_float = Some(absolute);
                    last_atom = Some(Atom::Float(absolute));
                } else {
                    // No base yet: treat the delta as an absolute value.
                    current_float = Some(delta);
                    last_atom = Some(Atom::Float(delta));
                }
            } else {
                last_atom = Some(atom);
            }
        } else {
            if let Atom::Float(f) = &atom {
                current_float = Some(*f);
            }
            last_atom = Some(atom);
        }
    }
    Ok(last_atom)
}
}
/// Reclaim a float buffer previously handed out by `get_history_tensor`.
/// Null pointers and zero lengths are ignored.
///
/// # Safety
/// `ptr` and `len` must come from exactly one prior `get_history_tensor`
/// call, and the pair must not be freed more than once.
#[allow(clippy::cast_slice_from_raw_parts)]
pub unsafe fn free_tensor(ptr: *mut f64, len: usize) {
    if ptr.is_null() || len == 0 {
        return;
    }
    // SAFETY: caller guarantees `ptr`/`len` describe a live boxed slice
    // produced by `Box::into_raw`.
    drop(Box::from_raw(std::slice::from_raw_parts_mut(ptr, len)));
}
/// Atomically replace `dst` with `src`.
///
/// On Unix, `rename(2)` replaces an existing destination in one atomic
/// filesystem operation, so no retry logic is needed.
#[cfg(unix)]
fn atomic_replace(src: &Path, dst: &Path) -> std::io::Result<()> {
    std::fs::rename(src, dst)
}
/// Atomically replace `dst` with `src` (Windows).
///
/// Windows' `rename` can fail when the destination exists or is briefly
/// held open (antivirus, indexers), so retry a few times, removing the
/// destination before each retry.
///
/// WARNING(review): deleting `dst` before a retry means a persistent
/// rename failure can lose the original file; callers (`compact`) should
/// treat an error here as potentially destructive.
#[cfg(windows)]
fn atomic_replace(src: &Path, dst: &Path) -> std::io::Result<()> {
    const MAX_RETRIES: u32 = 3;
    for attempt in 0..MAX_RETRIES {
        match std::fs::rename(src, dst) {
            Ok(()) => return Ok(()),
            // Not the last attempt: clear the destination and back off.
            Err(_) if attempt < MAX_RETRIES - 1 => {
                let _ = std::fs::remove_file(dst);
                std::thread::sleep(std::time::Duration::from_millis(10));
            }
            // Last attempt: surface the error. (A trailing rename after
            // this loop was unreachable and has been removed.)
            Err(e) => return Err(e),
        }
    }
    unreachable!("retry loop always returns")
}