use super::format::{CompressionType, DUMP_MAGIC_NUMBER, DUMP_VERSION};
use crate::{Result, Error, Tuple, Schema};
use std::path::{Path, PathBuf};
use std::fs::{File, OpenOptions};
use std::io::{BufWriter, BufReader, Write, Read, Seek, SeekFrom};
use std::sync::Arc;
use std::time::{SystemTime, Instant};
use std::collections::HashSet;
use parking_lot::{RwLock, Mutex};
use serde::{Serialize, Deserialize};
use tracing::{info, debug, warn};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum DumpType {
Full,
Incremental,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DumpMetadata {
pub dump_id: u64,
pub created_at: SystemTime,
pub dump_type: DumpType,
pub table_count: u32,
pub total_rows: u64,
pub compressed_size: u64,
pub uncompressed_size: u64,
pub checksum: String,
pub append_count: u32,
}
impl DumpMetadata {
pub fn new(dump_id: u64, dump_type: DumpType) -> Self {
Self {
dump_id,
created_at: SystemTime::now(),
dump_type,
table_count: 0,
total_rows: 0,
compressed_size: 0,
uncompressed_size: 0,
checksum: String::new(),
append_count: 0,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexMetadata {
pub name: String,
pub index_type: String,
pub columns: Vec<String>,
pub is_unique: bool,
}
pub struct DirtyTracker {
last_dump_time: Arc<Mutex<Option<Instant>>>,
dirty: Arc<RwLock<bool>>,
dirty_tables: Arc<RwLock<HashSet<String>>>,
}
impl DirtyTracker {
pub fn new() -> Self {
Self {
last_dump_time: Arc::new(Mutex::new(None)),
dirty: Arc::new(RwLock::new(false)),
dirty_tables: Arc::new(RwLock::new(HashSet::new())),
}
}
pub fn mark_dirty(&self) {
*self.dirty.write() = true;
}
pub fn mark_table_dirty(&self, table: &str) {
self.dirty_tables.write().insert(table.to_string());
self.mark_dirty();
}
pub fn is_dirty(&self) -> bool {
*self.dirty.read()
}
pub fn get_dirty_tables(&self) -> Vec<String> {
self.dirty_tables.read().iter().cloned().collect()
}
pub fn clear_dirty(&self) {
*self.dirty.write() = false;
self.dirty_tables.write().clear();
*self.last_dump_time.lock() = Some(Instant::now());
}
pub fn time_since_last_dump(&self) -> Option<std::time::Duration> {
self.last_dump_time.lock().map(|t| t.elapsed())
}
}
impl Default for DirtyTracker {
fn default() -> Self {
Self::new()
}
}
pub trait DatabaseInterface: Send + Sync {
fn list_tables(&self) -> Result<Vec<String>>;
fn get_table_schema(&self, table: &str) -> Result<Schema>;
fn scan_table(&self, table: &str) -> Result<Vec<Tuple>>;
fn get_table_indexes(&self, table: &str) -> Result<Vec<IndexMetadata>>;
}
pub trait DatabaseRestoreInterface {
fn create_table(&mut self, name: &str, schema: Schema) -> Result<()>;
fn create_index(&mut self, table: &str, index: &IndexMetadata) -> Result<()>;
fn insert_row(&mut self, table: &str, row: Tuple) -> Result<()>;
}
pub struct DumpManager {
dump_history: Arc<RwLock<Vec<DumpMetadata>>>,
last_dump_time: Arc<Mutex<Instant>>,
dirty_tracker: Arc<DirtyTracker>,
compression: CompressionType,
data_dir: PathBuf,
next_dump_id: Arc<Mutex<u64>>,
}
impl DumpManager {
pub fn new(data_dir: PathBuf, compression: CompressionType) -> Self {
Self {
dump_history: Arc::new(RwLock::new(Vec::new())),
last_dump_time: Arc::new(Mutex::new(Instant::now())),
dirty_tracker: Arc::new(DirtyTracker::new()),
compression,
data_dir,
next_dump_id: Arc::new(Mutex::new(1)),
}
}
pub fn get_next_dump_id(&self) -> u64 {
let mut id = self.next_dump_id.lock();
let current = *id;
*id += 1;
current
}
pub fn dump<D: DatabaseInterface>(&self, opts: &DumpOptions, db: &D) -> Result<DumpReport> {
let start_time = Instant::now();
let metadata = match opts.format {
DumpOutputFormat::Binary => {
match opts.mode {
DumpMode::Full => self.create_full_dump(&opts.output_path, db)?,
DumpMode::Incremental => self.create_incremental_dump(&opts.output_path, db, opts.append)?,
}
}
DumpOutputFormat::Sql => {
self.create_sql_dump(&opts.output_path, db)?
}
};
let duration_ms = start_time.elapsed().as_millis() as u64;
let compression_ratio = if metadata.uncompressed_size > 0 {
metadata.compressed_size as f64 / metadata.uncompressed_size as f64
} else {
1.0
};
Ok(DumpReport {
dump_id: metadata.dump_id.to_string(),
tables_dumped: metadata.table_count as usize,
rows_dumped: metadata.total_rows,
bytes_written: metadata.compressed_size,
bytes_uncompressed: metadata.uncompressed_size,
duration_ms,
compression_ratio,
})
}
pub fn create_sql_dump<D: DatabaseInterface>(
&self,
output_path: &Path,
db: &D,
) -> Result<DumpMetadata> {
let start_time = Instant::now();
let dump_id = self.get_next_dump_id();
let mut metadata = DumpMetadata::new(dump_id, DumpType::Full);
info!("Starting SQL dump {} to {}", dump_id, output_path.display());
let file = File::create(output_path)
.map_err(|e| Error::storage(format!("Failed to create SQL dump file: {}", e)))?;
let mut writer = BufWriter::new(file);
writeln!(writer, "-- HeliosDB Lite Database Dump")
.map_err(|e| Error::storage(format!("Failed to write header: {}", e)))?;
writeln!(writer, "-- Generated: {}", chrono::Local::now().format("%Y-%m-%d %H:%M:%S"))
.map_err(|e| Error::storage(format!("Failed to write header: {}", e)))?;
writeln!(writer, "-- Database: heliosdb-nano\n")
.map_err(|e| Error::storage(format!("Failed to write header: {}", e)))?;
let tables = db.list_tables()?;
metadata.table_count = tables.len() as u32;
let mut total_rows = 0;
for table in tables {
let schema = db.get_table_schema(&table)?;
writeln!(writer, "-- Table: {}", table)
.map_err(|e| Error::storage(format!("Failed to write comment: {}", e)))?;
writeln!(writer, "CREATE TABLE IF NOT EXISTS {} (", table)
.map_err(|e| Error::storage(format!("Failed to write create table: {}", e)))?;
for (i, col) in schema.columns.iter().enumerate() {
let suffix = if i < schema.columns.len() - 1 { "," } else { "" };
let type_str = col.data_type.to_string();
let pk_str = if col.primary_key { " PRIMARY KEY" } else { "" };
let null_str = if !col.nullable { " NOT NULL" } else { "" };
writeln!(writer, " {} {}{}{}{}", col.name, type_str, pk_str, null_str, suffix)
.map_err(|e| Error::storage(format!("Failed to write column: {}", e)))?;
}
writeln!(writer, ");\n")
.map_err(|e| Error::storage(format!("Failed to write end table: {}", e)))?;
let rows = db.scan_table(&table)?;
total_rows += rows.len() as u64;
if !rows.is_empty() {
writeln!(writer, "INSERT INTO {} VALUES", table)
.map_err(|e| Error::storage(format!("Failed to write insert header: {}", e)))?;
for (i, row) in rows.iter().enumerate() {
let suffix = if i < rows.len() - 1 { "," } else { ";" };
let values: Vec<String> = row.values.iter().map(Self::format_value_for_sql).collect();
writeln!(writer, " ({}){}", values.join(", "), suffix)
.map_err(|e| Error::storage(format!("Failed to write row: {}", e)))?;
}
writeln!(writer, "\n")
.map_err(|e| Error::storage(format!("Failed to write end insert: {}", e)))?;
}
}
writer.flush().map_err(|e| Error::storage(format!("Failed to flush writer: {}", e)))?;
let file_size = std::fs::metadata(output_path)
.map_err(|e| Error::storage(format!("Failed to get file metadata: {}", e)))?
.len();
metadata.total_rows = total_rows;
metadata.compressed_size = file_size;
metadata.uncompressed_size = file_size;
self.dump_history.write().push(metadata.clone());
Ok(metadata)
}
fn format_value_for_sql(value: &crate::Value) -> String {
match value {
crate::Value::Null => "NULL".to_string(),
crate::Value::Boolean(b) => if *b { "TRUE".to_string() } else { "FALSE".to_string() },
crate::Value::Int2(i) => i.to_string(),
crate::Value::Int4(i) => i.to_string(),
crate::Value::Int8(i) => i.to_string(),
crate::Value::Float4(f) => f.to_string(),
crate::Value::Float8(f) => f.to_string(),
crate::Value::String(s) => format!("'{}'", s.replace('\'', "''")),
crate::Value::Timestamp(ts) => format!("'{}'", ts), _ => format!("'{}'", format!("{:?}", value).replace('\'', "''")),
}
}
pub fn restore<D: DatabaseRestoreInterface>(&self, opts: &RestoreOptions, db: &mut D) -> Result<RestoreReport> {
let start_time = Instant::now();
self.restore_from_dump(&opts.input_path, db)?;
let duration_ms = start_time.elapsed().as_millis() as u64;
Ok(RestoreReport {
tables_restored: 0, rows_restored: 0, duration_ms,
})
}
pub fn create_full_dump<D: DatabaseInterface>(
&self,
output_path: &Path,
db: &D,
) -> Result<DumpMetadata> {
let start_time = Instant::now();
let dump_id = self.get_next_dump_id();
let mut metadata = DumpMetadata::new(dump_id, DumpType::Full);
info!("Starting full dump {} to {}", dump_id, output_path.display());
let file = File::create(output_path)
.map_err(|e| Error::storage(format!("Failed to create dump file: {}", e)))?;
let mut writer = BufWriter::with_capacity(256 * 1024, file);
writer.write_all(DUMP_MAGIC_NUMBER)
.map_err(|e| Error::storage(format!("Failed to write magic bytes: {}", e)))?;
writer.write_all(&DUMP_VERSION.to_le_bytes())
.map_err(|e| Error::storage(format!("Failed to write version: {}", e)))?;
let metadata_pos = writer.stream_position()
.map_err(|e| Error::storage(format!("Failed to get position: {}", e)))?;
let metadata_placeholder = vec![0u8; 8192]; writer.write_all(&metadata_placeholder)
.map_err(|e| Error::storage(format!("Failed to write placeholder: {}", e)))?;
let tables = db.list_tables()?;
metadata.table_count = tables.len() as u32;
let mut total_rows = 0u64;
let mut uncompressed_bytes = 0u64;
for (idx, table) in tables.iter().enumerate() {
debug!("Dumping table {}/{}: {}", idx + 1, tables.len(), table);
writer.write_all(b"TABL")
.map_err(|e| Error::storage(format!("Failed to write table marker: {}", e)))?;
let table_bytes = table.as_bytes();
writer.write_all(&(table_bytes.len() as u32).to_le_bytes())
.map_err(|e| Error::storage(format!("Failed to write table name length: {}", e)))?;
writer.write_all(table_bytes)
.map_err(|e| Error::storage(format!("Failed to write table name: {}", e)))?;
let schema = db.get_table_schema(table)?;
let schema_bytes = bincode::serialize(&schema)
.map_err(|e| Error::storage(format!("Failed to serialize schema: {}", e)))?;
writer.write_all(&(schema_bytes.len() as u32).to_le_bytes())
.map_err(|e| Error::storage(format!("Failed to write schema length: {}", e)))?;
writer.write_all(&schema_bytes)
.map_err(|e| Error::storage(format!("Failed to write schema: {}", e)))?;
uncompressed_bytes += schema_bytes.len() as u64;
let indexes = db.get_table_indexes(table).unwrap_or_default();
let indexes_bytes = bincode::serialize(&indexes)
.map_err(|e| Error::storage(format!("Failed to serialize indexes: {}", e)))?;
writer.write_all(&(indexes_bytes.len() as u32).to_le_bytes())
.map_err(|e| Error::storage(format!("Failed to write indexes length: {}", e)))?;
writer.write_all(&indexes_bytes)
.map_err(|e| Error::storage(format!("Failed to write indexes: {}", e)))?;
uncompressed_bytes += indexes_bytes.len() as u64;
let rows = db.scan_table(table)?;
let row_count = rows.len() as u64;
total_rows += row_count;
writer.write_all(&row_count.to_le_bytes())
.map_err(|e| Error::storage(format!("Failed to write row count: {}", e)))?;
const BATCH_SIZE: usize = 1000;
for batch in rows.chunks(BATCH_SIZE) {
let batch_bytes = bincode::serialize(batch)
.map_err(|e| Error::storage(format!("Failed to serialize batch: {}", e)))?;
uncompressed_bytes += batch_bytes.len() as u64;
let compressed = self.compress_data(&batch_bytes)?;
writer.write_all(&(compressed.len() as u32).to_le_bytes())
.map_err(|e| Error::storage(format!("Failed to write batch length: {}", e)))?;
writer.write_all(&compressed)
.map_err(|e| Error::storage(format!("Failed to write batch: {}", e)))?;
}
writer.write_all(&0u32.to_le_bytes())
.map_err(|e| Error::storage(format!("Failed to write EOT marker: {}", e)))?;
}
metadata.total_rows = total_rows;
metadata.uncompressed_size = uncompressed_bytes;
writer.write_all(b"ENDD")
.map_err(|e| Error::storage(format!("Failed to write end marker: {}", e)))?;
writer.flush()
.map_err(|e| Error::storage(format!("Failed to flush writer: {}", e)))?;
drop(writer);
let checksum = self.calculate_checksum(output_path)?;
metadata.checksum = checksum;
let file_size = std::fs::metadata(output_path)
.map_err(|e| Error::storage(format!("Failed to get file metadata: {}", e)))?
.len();
metadata.compressed_size = file_size;
self.write_metadata_header(output_path, metadata_pos, &metadata)?;
self.dump_history.write().push(metadata.clone());
self.dirty_tracker.clear_dirty();
*self.last_dump_time.lock() = Instant::now();
let elapsed = start_time.elapsed();
let throughput_mbps = (metadata.uncompressed_size as f64 / 1_048_576.0) / elapsed.as_secs_f64();
info!(
"Full dump {} completed: {} tables, {} rows, {:.2} MB in {:.2}s ({:.2} MB/s)",
dump_id,
metadata.table_count,
metadata.total_rows,
metadata.uncompressed_size as f64 / 1_048_576.0,
elapsed.as_secs_f64(),
throughput_mbps
);
Ok(metadata)
}
pub fn create_incremental_dump<D: DatabaseInterface>(
&self,
output_path: &Path,
db: &D,
append: bool,
) -> Result<DumpMetadata> {
let dirty_tables = self.dirty_tracker.get_dirty_tables();
if dirty_tables.is_empty() {
return Err(Error::storage("No dirty tables to dump"));
}
let start_time = Instant::now();
let dump_id = self.get_next_dump_id();
let mut metadata = DumpMetadata::new(dump_id, DumpType::Incremental);
info!("Starting incremental dump {} (append={})", dump_id, append);
let file = if append && output_path.exists() {
OpenOptions::new()
.append(true)
.open(output_path)
.map_err(|e| Error::storage(format!("Failed to open dump file: {}", e)))?
} else {
File::create(output_path)
.map_err(|e| Error::storage(format!("Failed to create dump file: {}", e)))?
};
let mut writer = BufWriter::with_capacity(256 * 1024, file);
if !append || !output_path.exists() {
writer.write_all(DUMP_MAGIC_NUMBER)
.map_err(|e| Error::storage(format!("Failed to write magic bytes: {}", e)))?;
writer.write_all(&DUMP_VERSION.to_le_bytes())
.map_err(|e| Error::storage(format!("Failed to write version: {}", e)))?;
let metadata_placeholder = vec![0u8; 8192];
writer.write_all(&metadata_placeholder)
.map_err(|e| Error::storage(format!("Failed to write placeholder: {}", e)))?;
}
writer.write_all(b"INCR")
.map_err(|e| Error::storage(format!("Failed to write incremental marker: {}", e)))?;
metadata.table_count = dirty_tables.len() as u32;
let mut total_rows = 0u64;
let mut uncompressed_bytes = 0u64;
for table in &dirty_tables {
debug!("Dumping dirty table: {}", table);
writer.write_all(b"TABL")
.map_err(|e| Error::storage(format!("Failed to write table marker: {}", e)))?;
let table_bytes = table.as_bytes();
writer.write_all(&(table_bytes.len() as u32).to_le_bytes())
.map_err(|e| Error::storage(format!("Failed to write table name length: {}", e)))?;
writer.write_all(table_bytes)
.map_err(|e| Error::storage(format!("Failed to write table name: {}", e)))?;
let schema = db.get_table_schema(table)?;
let schema_bytes = bincode::serialize(&schema)
.map_err(|e| Error::storage(format!("Failed to serialize schema: {}", e)))?;
writer.write_all(&(schema_bytes.len() as u32).to_le_bytes())
.map_err(|e| Error::storage(format!("Failed to write schema length: {}", e)))?;
writer.write_all(&schema_bytes)
.map_err(|e| Error::storage(format!("Failed to write schema: {}", e)))?;
uncompressed_bytes += schema_bytes.len() as u64;
let indexes = db.get_table_indexes(table).unwrap_or_default();
let indexes_bytes = bincode::serialize(&indexes)
.map_err(|e| Error::storage(format!("Failed to serialize indexes: {}", e)))?;
writer.write_all(&(indexes_bytes.len() as u32).to_le_bytes())
.map_err(|e| Error::storage(format!("Failed to write indexes length: {}", e)))?;
writer.write_all(&indexes_bytes)
.map_err(|e| Error::storage(format!("Failed to write indexes: {}", e)))?;
uncompressed_bytes += indexes_bytes.len() as u64;
let rows = db.scan_table(table)?;
let row_count = rows.len() as u64;
total_rows += row_count;
writer.write_all(&row_count.to_le_bytes())
.map_err(|e| Error::storage(format!("Failed to write row count: {}", e)))?;
for batch in rows.chunks(1000) {
let batch_bytes = bincode::serialize(batch)
.map_err(|e| Error::storage(format!("Failed to serialize batch: {}", e)))?;
uncompressed_bytes += batch_bytes.len() as u64;
let compressed = self.compress_data(&batch_bytes)?;
writer.write_all(&(compressed.len() as u32).to_le_bytes())
.map_err(|e| Error::storage(format!("Failed to write batch length: {}", e)))?;
writer.write_all(&compressed)
.map_err(|e| Error::storage(format!("Failed to write batch: {}", e)))?;
}
writer.write_all(&0u32.to_le_bytes())
.map_err(|e| Error::storage(format!("Failed to write EOT marker: {}", e)))?;
}
metadata.total_rows = total_rows;
metadata.uncompressed_size = uncompressed_bytes;
metadata.append_count = if append { 1 } else { 0 };
writer.flush()
.map_err(|e| Error::storage(format!("Failed to flush writer: {}", e)))?;
drop(writer);
let checksum = self.calculate_checksum(output_path)?;
metadata.checksum = checksum;
let file_size = std::fs::metadata(output_path)
.map_err(|e| Error::storage(format!("Failed to get file metadata: {}", e)))?
.len();
metadata.compressed_size = file_size;
self.dump_history.write().push(metadata.clone());
self.dirty_tracker.clear_dirty();
let elapsed = start_time.elapsed();
info!(
"Incremental dump {} completed: {} tables, {} rows in {:.2}s",
dump_id,
metadata.table_count,
metadata.total_rows,
elapsed.as_secs_f64()
);
Ok(metadata)
}
pub fn restore_from_dump<D: DatabaseRestoreInterface>(
&self,
input_path: &Path,
db: &mut D,
) -> Result<()> {
info!("Starting restore from {}", input_path.display());
self.validate_dump(input_path)?;
let file = File::open(input_path)
.map_err(|e| Error::storage(format!("Failed to open dump file: {}", e)))?;
let mut reader = BufReader::with_capacity(256 * 1024, file);
let mut magic = [0u8; 8];
reader.read_exact(&mut magic)
.map_err(|e| Error::storage(format!("Failed to read magic bytes: {}", e)))?;
if &magic != DUMP_MAGIC_NUMBER {
return Err(Error::storage("Invalid dump file: bad magic bytes"));
}
let mut version_bytes = [0u8; 4];
reader.read_exact(&mut version_bytes)
.map_err(|e| Error::storage(format!("Failed to read version: {}", e)))?;
let version = u32::from_le_bytes(version_bytes);
if version != DUMP_VERSION {
return Err(Error::storage(format!("Unsupported dump version: {}", version)));
}
reader.seek(SeekFrom::Current(8192))
.map_err(|e| Error::storage(format!("Failed to seek past metadata: {}", e)))?;
let mut total_tables = 0;
let mut total_rows = 0u64;
loop {
let mut marker = [0u8; 4];
if reader.read_exact(&mut marker).is_err() {
break; }
match &marker {
b"ENDD" => {
debug!("Reached end-of-dump marker");
break;
}
b"INCR" => {
debug!("Found incremental marker, continuing...");
continue;
}
b"TABL" => {
}
_ => {
return Err(Error::storage(format!("Invalid marker: {:?}", marker)));
}
}
let mut len_bytes = [0u8; 4];
reader.read_exact(&mut len_bytes)
.map_err(|e| Error::storage(format!("Failed to read table name length: {}", e)))?;
let table_name_len = u32::from_le_bytes(len_bytes);
let mut table_bytes = vec![0u8; table_name_len as usize];
reader.read_exact(&mut table_bytes)
.map_err(|e| Error::storage(format!("Failed to read table name: {}", e)))?;
let table = String::from_utf8(table_bytes)
.map_err(|e| Error::storage(format!("Invalid table name: {}", e)))?;
debug!("Restoring table: {}", table);
let mut schema_len_bytes = [0u8; 4];
reader.read_exact(&mut schema_len_bytes)
.map_err(|e| Error::storage(format!("Failed to read schema length: {}", e)))?;
let schema_len = u32::from_le_bytes(schema_len_bytes);
let mut schema_bytes = vec![0u8; schema_len as usize];
reader.read_exact(&mut schema_bytes)
.map_err(|e| Error::storage(format!("Failed to read schema: {}", e)))?;
let schema: Schema = bincode::deserialize(&schema_bytes)
.map_err(|e| Error::storage(format!("Failed to deserialize schema: {}", e)))?;
let mut indexes_len_bytes = [0u8; 4];
reader.read_exact(&mut indexes_len_bytes)
.map_err(|e| Error::storage(format!("Failed to read indexes length: {}", e)))?;
let indexes_len = u32::from_le_bytes(indexes_len_bytes);
let mut indexes_bytes = vec![0u8; indexes_len as usize];
reader.read_exact(&mut indexes_bytes)
.map_err(|e| Error::storage(format!("Failed to read indexes: {}", e)))?;
let indexes: Vec<IndexMetadata> = bincode::deserialize(&indexes_bytes)
.map_err(|e| Error::storage(format!("Failed to deserialize indexes: {}", e)))?;
db.create_table(&table, schema)?;
for index in indexes {
db.create_index(&table, &index)?;
}
let mut row_count_bytes = [0u8; 8];
reader.read_exact(&mut row_count_bytes)
.map_err(|e| Error::storage(format!("Failed to read row count: {}", e)))?;
let row_count = u64::from_le_bytes(row_count_bytes);
let mut rows_read = 0u64;
loop {
let mut batch_len_bytes = [0u8; 4];
reader.read_exact(&mut batch_len_bytes)
.map_err(|e| Error::storage(format!("Failed to read batch length: {}", e)))?;
let batch_len = u32::from_le_bytes(batch_len_bytes);
if batch_len == 0 {
break;
}
let mut batch_bytes = vec![0u8; batch_len as usize];
reader.read_exact(&mut batch_bytes)
.map_err(|e| Error::storage(format!("Failed to read batch: {}", e)))?;
let decompressed = self.decompress_data(&batch_bytes)?;
let batch: Vec<Tuple> = bincode::deserialize(&decompressed)
.map_err(|e| Error::storage(format!("Failed to deserialize batch: {}", e)))?;
rows_read += batch.len() as u64;
for row in batch {
db.insert_row(&table, row)?;
}
}
if rows_read != row_count {
warn!("Row count mismatch for table {}: expected {}, got {}", table, row_count, rows_read);
}
total_tables += 1;
total_rows += rows_read;
}
info!("Restore completed: {} tables, {} rows", total_tables, total_rows);
Ok(())
}
pub fn list_dumps(&self) -> Vec<DumpMetadata> {
self.dump_history.read().clone()
}
pub fn validate_dump(&self, path: &Path) -> Result<()> {
if !path.exists() {
return Err(Error::storage("Dump file does not exist"));
}
let file = File::open(path)
.map_err(|e| Error::storage(format!("Failed to open dump file: {}", e)))?;
let mut reader = BufReader::new(file);
let mut magic = [0u8; 8];
reader.read_exact(&mut magic)
.map_err(|e| Error::storage(format!("Failed to read magic bytes: {}", e)))?;
if &magic != DUMP_MAGIC_NUMBER {
return Err(Error::storage("Invalid dump file: bad magic bytes"));
}
let mut version_bytes = [0u8; 4];
reader.read_exact(&mut version_bytes)
.map_err(|e| Error::storage(format!("Failed to read version: {}", e)))?;
let version = u32::from_le_bytes(version_bytes);
if version > DUMP_VERSION {
return Err(Error::storage(format!("Unsupported dump version: {}", version)));
}
drop(reader);
let _checksum = self.calculate_checksum(path)?;
debug!("Dump file validation passed: {}", path.display());
Ok(())
}
pub fn get_dump_metadata(&self, dump_id: u64) -> Result<DumpMetadata> {
self.dump_history
.read()
.iter()
.find(|m| m.dump_id == dump_id)
.cloned()
.ok_or_else(|| Error::storage(format!("Dump {} not found", dump_id)))
}
pub fn delete_old_dumps(&self, keep_count: usize) -> Result<()> {
let mut history = self.dump_history.write();
if history.len() <= keep_count {
return Ok(());
}
history.sort_by(|a, b| b.created_at.cmp(&a.created_at));
let removed = history.drain(keep_count..).collect::<Vec<_>>();
info!("Removed {} old dump(s) from history", removed.len());
Ok(())
}
pub fn dirty_tracker(&self) -> &Arc<DirtyTracker> {
&self.dirty_tracker
}
fn compress_data(&self, data: &[u8]) -> Result<Vec<u8>> {
match self.compression {
CompressionType::None => Ok(data.to_vec()),
CompressionType::Zstd => {
zstd::bulk::compress(data, 3)
.map_err(|e| Error::compression(format!("Zstd compression failed: {}", e)))
}
CompressionType::Gzip | CompressionType::Brotli => {
zstd::bulk::compress(data, 3)
.map_err(|e| Error::compression(format!("Compression failed: {}", e)))
}
}
}
fn decompress_data(&self, data: &[u8]) -> Result<Vec<u8>> {
match self.compression {
CompressionType::None => Ok(data.to_vec()),
CompressionType::Zstd => {
zstd::bulk::decompress(data, 100 * 1024 * 1024) .map_err(|e| Error::compression(format!("Zstd decompression failed: {}", e)))
}
CompressionType::Gzip | CompressionType::Brotli => {
zstd::bulk::decompress(data, 100 * 1024 * 1024)
.map_err(|e| Error::compression(format!("Decompression failed: {}", e)))
}
}
}
fn calculate_checksum(&self, path: &Path) -> Result<String> {
let file = File::open(path)
.map_err(|e| Error::storage(format!("Failed to open file for checksum: {}", e)))?;
let mut reader = BufReader::new(file);
let mut buffer = vec![0u8; 8192];
let mut hasher = crc32fast::Hasher::new();
loop {
let bytes_read = reader.read(&mut buffer)
.map_err(|e| Error::storage(format!("Failed to read file: {}", e)))?;
if bytes_read == 0 {
break;
}
if let Some(data) = buffer.get(..bytes_read) {
hasher.update(data);
}
}
Ok(format!("{:08x}", hasher.finalize()))
}
fn write_metadata_header(
&self,
path: &Path,
position: u64,
metadata: &DumpMetadata,
) -> Result<()> {
let file = OpenOptions::new()
.write(true)
.open(path)
.map_err(|e| Error::storage(format!("Failed to open dump file: {}", e)))?;
let mut writer = BufWriter::new(file);
writer.seek(SeekFrom::Start(position))
.map_err(|e| Error::storage(format!("Failed to seek to metadata position: {}", e)))?;
let metadata_bytes = serde_json::to_vec(metadata)
.map_err(|e| Error::storage(format!("Failed to serialize metadata: {}", e)))?;
writer.write_all(&(metadata_bytes.len() as u32).to_le_bytes())
.map_err(|e| Error::storage(format!("Failed to write metadata length: {}", e)))?;
writer.write_all(&metadata_bytes)
.map_err(|e| Error::storage(format!("Failed to write metadata: {}", e)))?;
writer.flush()
.map_err(|e| Error::storage(format!("Failed to flush writer: {}", e)))?;
Ok(())
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DumpMode {
Full,
Incremental,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum DumpOutputFormat {
Binary,
Sql,
}
#[derive(Debug, Clone)]
pub struct DumpOptions {
pub output_path: PathBuf,
pub mode: DumpMode,
pub compression: CompressionType,
pub append: bool,
pub tables: Option<Vec<String>>,
pub verbose: bool,
pub connection: Option<String>,
pub format: DumpOutputFormat,
}
impl Default for DumpOptions {
fn default() -> Self {
Self {
output_path: PathBuf::from("backup.heliodump"),
mode: DumpMode::Full,
compression: CompressionType::Zstd,
append: false,
tables: None,
verbose: false,
connection: None,
format: DumpOutputFormat::Binary,
}
}
}
#[derive(Debug, Clone)]
pub struct RestoreOptions {
pub input_path: PathBuf,
pub target: Option<PathBuf>,
pub tables: Option<Vec<String>>,
pub verify: bool,
pub verbose: bool,
pub connection: Option<String>,
}
impl Default for RestoreOptions {
fn default() -> Self {
Self {
input_path: PathBuf::from("backup.heliodump"),
target: None,
tables: None,
verify: true,
verbose: false,
connection: None,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DumpReport {
pub dump_id: String,
pub tables_dumped: usize,
pub rows_dumped: u64,
pub bytes_written: u64,
pub bytes_uncompressed: u64,
pub duration_ms: u64,
pub compression_ratio: f64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RestoreReport {
pub tables_restored: usize,
pub rows_restored: u64,
pub duration_ms: u64,
}
#[cfg(test)]
mod tests {
use super::*;
use std::collections::HashMap;
use crate::{Column, DataType, Value};
struct MockDatabase {
tables: HashMap<String, (Schema, Vec<Tuple>)>,
indexes: HashMap<String, Vec<IndexMetadata>>,
}
impl MockDatabase {
fn new() -> Self {
Self {
tables: HashMap::new(),
indexes: HashMap::new(),
}
}
fn add_table(&mut self, name: &str, schema: Schema, rows: Vec<Tuple>) {
self.tables.insert(name.to_string(), (schema, rows));
}
fn add_index(&mut self, table: &str, index: IndexMetadata) {
self.indexes.entry(table.to_string())
.or_insert_with(Vec::new)
.push(index);
}
}
impl DatabaseInterface for MockDatabase {
fn list_tables(&self) -> Result<Vec<String>> {
Ok(self.tables.keys().cloned().collect())
}
fn get_table_schema(&self, table: &str) -> Result<Schema> {
self.tables
.get(table)
.map(|(schema, _)| schema.clone())
.ok_or_else(|| Error::storage(format!("Table {} not found", table)))
}
fn scan_table(&self, table: &str) -> Result<Vec<Tuple>> {
self.tables
.get(table)
.map(|(_, rows)| rows.clone())
.ok_or_else(|| Error::storage(format!("Table {} not found", table)))
}
fn get_table_indexes(&self, table: &str) -> Result<Vec<IndexMetadata>> {
Ok(self.indexes.get(table).cloned().unwrap_or_default())
}
}
impl DatabaseRestoreInterface for MockDatabase {
fn create_table(&mut self, name: &str, schema: Schema) -> Result<()> {
self.tables.insert(name.to_string(), (schema, Vec::new()));
Ok(())
}
fn create_index(&mut self, table: &str, index: &IndexMetadata) -> Result<()> {
self.add_index(table, index.clone());
Ok(())
}
fn insert_row(&mut self, table: &str, row: Tuple) -> Result<()> {
if let Some((_, rows)) = self.tables.get_mut(table) {
rows.push(row);
Ok(())
} else {
Err(Error::storage(format!("Table {} not found", table)))
}
}
}
#[test]
fn test_dump_manager_creation() {
let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
let manager = DumpManager::new(temp_dir.path().to_path_buf(), CompressionType::Zstd);
assert_eq!(manager.list_dumps().len(), 0);
}
#[test]
fn test_full_dump_creation() -> Result<()> {
let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
let manager = DumpManager::new(temp_dir.path().to_path_buf(), CompressionType::Zstd);
let mut db = MockDatabase::new();
let schema = Schema::new(vec![
Column::new("id", DataType::Int4),
Column::new("name", DataType::Text),
]);
let rows = vec![
Tuple::new(vec![Value::Int4(1), Value::String("Alice".to_string())]),
Tuple::new(vec![Value::Int4(2), Value::String("Bob".to_string())]),
];
db.add_table("users", schema, rows);
let dump_path = temp_dir.path().join("test.dump");
let metadata = manager.create_full_dump(&dump_path, &db)?;
assert_eq!(metadata.dump_type, DumpType::Full);
assert_eq!(metadata.table_count, 1);
assert_eq!(metadata.total_rows, 2);
assert!(dump_path.exists());
assert!(metadata.compressed_size > 0);
assert!(!metadata.checksum.is_empty());
Ok(())
}
#[test]
fn test_restore_from_dump() -> Result<()> {
let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
let manager = DumpManager::new(temp_dir.path().to_path_buf(), CompressionType::None);
let mut db = MockDatabase::new();
let schema = Schema::new(vec![
Column::new("id", DataType::Int4),
Column::new("value", DataType::Float8),
]);
let rows = vec![
Tuple::new(vec![Value::Int4(1), Value::Float8(1.5)]),
Tuple::new(vec![Value::Int4(2), Value::Float8(2.5)]),
Tuple::new(vec![Value::Int4(3), Value::Float8(3.5)]),
];
db.add_table("data", schema, rows);
let dump_path = temp_dir.path().join("test_restore.dump");
manager.create_full_dump(&dump_path, &db)?;
let mut db2 = MockDatabase::new();
manager.restore_from_dump(&dump_path, &mut db2)?;
let restored_rows = db2.scan_table("data")?;
assert_eq!(restored_rows.len(), 3);
Ok(())
}
#[test]
fn test_compression_roundtrip() -> Result<()> {
let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
let manager = DumpManager::new(temp_dir.path().to_path_buf(), CompressionType::Zstd);
let test_data = b"Hello, World! This is test data for compression.".repeat(100);
let compressed = manager.compress_data(&test_data)?;
let decompressed = manager.decompress_data(&compressed)?;
assert_eq!(test_data.to_vec(), decompressed);
assert!(compressed.len() < test_data.len());
Ok(())
}
#[test]
fn test_dirty_tracker() {
let tracker = DirtyTracker::new();
assert!(!tracker.is_dirty());
tracker.mark_table_dirty("users");
assert!(tracker.is_dirty());
let dirty_tables = tracker.get_dirty_tables();
assert_eq!(dirty_tables.len(), 1);
assert!(dirty_tables.contains(&"users".to_string()));
tracker.clear_dirty();
assert!(!tracker.is_dirty());
assert_eq!(tracker.get_dirty_tables().len(), 0);
}
#[test]
fn test_large_dataset_throughput() -> Result<()> {
let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
let manager = DumpManager::new(temp_dir.path().to_path_buf(), CompressionType::Zstd);
let mut db = MockDatabase::new();
let schema = Schema::new(vec![
Column::new("id", DataType::Int4),
Column::new("data", DataType::Text),
]);
let mut rows = Vec::new();
for i in 0..100_000 {
rows.push(Tuple::new(vec![
Value::Int4(i),
Value::String(format!("Data row {} with some content", i)),
]));
}
db.add_table("large_table", schema, rows);
let start = Instant::now();
let dump_path = temp_dir.path().join("large.dump");
let metadata = manager.create_full_dump(&dump_path, &db)?;
let elapsed = start.elapsed();
let throughput_mbps = (metadata.uncompressed_size as f64 / 1_048_576.0) / elapsed.as_secs_f64();
println!("Dumped {} rows in {:?}", metadata.total_rows, elapsed);
println!("Throughput: {:.2} MB/s", throughput_mbps);
assert!(throughput_mbps > 3.0, "Throughput too low: {:.2} MB/s", throughput_mbps);
Ok(())
}
#[test]
fn test_validate_dump() -> Result<()> {
let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
let manager = DumpManager::new(temp_dir.path().to_path_buf(), CompressionType::None);
let mut db = MockDatabase::new();
let schema = Schema::new(vec![Column::new("id", DataType::Int4)]);
db.add_table("test", schema, vec![]);
let dump_path = temp_dir.path().join("validate.dump");
manager.create_full_dump(&dump_path, &db)?;
manager.validate_dump(&dump_path)?;
let invalid_path = temp_dir.path().join("invalid.dump");
std::fs::write(&invalid_path, b"invalid data")?;
assert!(manager.validate_dump(&invalid_path).is_err());
Ok(())
}
#[test]
fn test_incremental_dump() -> Result<()> {
let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
let manager = DumpManager::new(temp_dir.path().to_path_buf(), CompressionType::None);
manager.dirty_tracker().mark_table_dirty("users");
let mut db = MockDatabase::new();
let schema = Schema::new(vec![Column::new("id", DataType::Int4)]);
db.add_table("users", schema, vec![Tuple::new(vec![Value::Int4(1)])]);
let dump_path = temp_dir.path().join("incremental.dump");
let metadata = manager.create_incremental_dump(&dump_path, &db, false)?;
assert_eq!(metadata.dump_type, DumpType::Incremental);
assert_eq!(metadata.table_count, 1);
Ok(())
}
#[test]
fn test_dump_with_indexes() -> Result<()> {
let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
let manager = DumpManager::new(temp_dir.path().to_path_buf(), CompressionType::None);
let mut db = MockDatabase::new();
let schema = Schema::new(vec![
Column::new("id", DataType::Int4),
Column::new("email", DataType::Text),
]);
db.add_table("users", schema, vec![]);
db.add_index("users", IndexMetadata {
name: "idx_email".to_string(),
index_type: "btree".to_string(),
columns: vec!["email".to_string()],
is_unique: true,
});
let dump_path = temp_dir.path().join("with_indexes.dump");
manager.create_full_dump(&dump_path, &db)?;
let mut db2 = MockDatabase::new();
manager.restore_from_dump(&dump_path, &mut db2)?;
let indexes = db2.get_table_indexes("users")?;
assert_eq!(indexes.len(), 1);
assert_eq!(indexes[0].name, "idx_email");
Ok(())
}
#[test]
fn test_get_next_dump_id() {
let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
let manager = DumpManager::new(temp_dir.path().to_path_buf(), CompressionType::None);
let id1 = manager.get_next_dump_id();
let id2 = manager.get_next_dump_id();
let id3 = manager.get_next_dump_id();
assert_eq!(id1, 1);
assert_eq!(id2, 2);
assert_eq!(id3, 3);
}
#[test]
fn test_delete_old_dumps() -> Result<()> {
let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
let manager = DumpManager::new(temp_dir.path().to_path_buf(), CompressionType::None);
for i in 1..=5 {
let mut metadata = DumpMetadata::new(i, DumpType::Full);
metadata.created_at = SystemTime::UNIX_EPOCH + std::time::Duration::from_secs(i);
manager.dump_history.write().push(metadata);
}
assert_eq!(manager.list_dumps().len(), 5);
manager.delete_old_dumps(3)?;
assert_eq!(manager.list_dumps().len(), 3);
Ok(())
}
#[test]
fn test_checksum_calculation() -> Result<()> {
let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
let manager = DumpManager::new(temp_dir.path().to_path_buf(), CompressionType::None);
let test_file = temp_dir.path().join("test.dat");
std::fs::write(&test_file, b"test data for checksum")?;
let checksum1 = manager.calculate_checksum(&test_file)?;
let checksum2 = manager.calculate_checksum(&test_file)?;
assert_eq!(checksum1, checksum2);
std::fs::write(&test_file, b"different test data")?;
let checksum3 = manager.calculate_checksum(&test_file)?;
assert_ne!(checksum1, checksum3);
Ok(())
}
}